## Serialization, Aggregation

In [2]:
#run this code beofre creating any context or session

import os
import sys

os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

In [3]:
#import the necessary libraries

from pyspark.conf import SparkConf
from pyspark.sql.session import SparkSession
from pyspark.context import SparkContext
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark import StorageLevel

In [4]:
#check the Python Version

import sys
print(sys.version)

3.11.4 (main, Jul  5 2023, 14:15:25) [GCC 11.2.0]


In [5]:
#create the spark context and session

conf = SparkConf().setAppName("NewApp").setMaster("local")
sc = SparkContext(conf = conf)
spark = SparkSession.builder.getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/09/25 05:44:16 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/09/25 05:44:17 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [6]:
#create a list

test_df = [("James","Sales","NY",90000,34,10000),
    ("Michael","Sales","NY",86000,56,20000),
    ("Robert","Sales","CA",81000,30,23000),
    ("Maria","Finance","CA",90000,24,23000),
    ("Raman","Finance","CA",99000,40,24000),
    ("Scott","Finance","NY",83000,36,19000),
    ("Jen","Finance","NY",79000,53,15000),
    ("Jeff","Marketing","CA",80000,25,18000),
    ("Kumar","Marketing","NY",91000,50,21000)
  ]


In [7]:
#create the schema

ud_schema = ["employee_name","department","state","salary","age","bonus"]

In [8]:
#create a dataframe

df = spark.createDataFrame(data=test_df,schema = ud_schema)

In [9]:
#show the dataframe

df.show()

[Stage 0:>                                                          (0 + 1) / 1]

+-------------+----------+-----+------+---+-----+
|employee_name|department|state|salary|age|bonus|
+-------------+----------+-----+------+---+-----+
|        James|     Sales|   NY| 90000| 34|10000|
|      Michael|     Sales|   NY| 86000| 56|20000|
|       Robert|     Sales|   CA| 81000| 30|23000|
|        Maria|   Finance|   CA| 90000| 24|23000|
|        Raman|   Finance|   CA| 99000| 40|24000|
|        Scott|   Finance|   NY| 83000| 36|19000|
|          Jen|   Finance|   NY| 79000| 53|15000|
|         Jeff| Marketing|   CA| 80000| 25|18000|
|        Kumar| Marketing|   NY| 91000| 50|21000|
+-------------+----------+-----+------+---+-----+



                                                                                

In [None]:
#cache the dataframe

df.cache()

In [None]:
#Serialization
#StorageLevel decides how RDD should be stored. In Apache Spark, StorageLevel decides whether RDD should be stored in the memory or should it be stored over the disk, or both. 
#It also decides whether to serialize RDD and whether to replicate RDD partitions.


test = StorageLevel(useDisk=False, useMemory=True, useOffHeap=False,deserialized=False,replication = 1)
print(test)

In [None]:
df.presist(storageLevel=test)

In [None]:
#Aggregation

df.groupBy("department").agg(sum("salary").alias("sum_salary"), avg("salary").alias("avg_salary")).show()

In [15]:
#to use SQL queries on Dataframes, create a tempView first
df.createOrReplaceTempView("dftemp")

In [17]:
df1 = spark.sql("select * from dftemp").show()

+-------------+----------+-----+------+---+-----+
|employee_name|department|state|salary|age|bonus|
+-------------+----------+-----+------+---+-----+
|        James|     Sales|   NY| 90000| 34|10000|
|      Michael|     Sales|   NY| 86000| 56|20000|
|       Robert|     Sales|   CA| 81000| 30|23000|
|        Maria|   Finance|   CA| 90000| 24|23000|
|        Raman|   Finance|   CA| 99000| 40|24000|
|        Scott|   Finance|   NY| 83000| 36|19000|
|          Jen|   Finance|   NY| 79000| 53|15000|
|         Jeff| Marketing|   CA| 80000| 25|18000|
|        Kumar| Marketing|   NY| 91000| 50|21000|
+-------------+----------+-----+------+---+-----+



In [28]:
#check which database is being used
#select database()

In [29]:
#get the number of partitions
df.rdd.getNumPartitions()

1

In [31]:
df.write.format("csv").partitionBy("department").mode("overwrite").save("/home/labuser/Downloads/Partitions")

                                                                                

if the number of partitions are changed using repartition, then shuffling occurs which is not always wanted


hence, if the number of partitions are to be reduced, then coalesce should be used

In [32]:
#foreach
df.foreach(print)

Row(employee_name='James', department='Sales', state='NY', salary=90000, age=34, bonus=10000)
Row(employee_name='Michael', department='Sales', state='NY', salary=86000, age=56, bonus=20000)
Row(employee_name='Robert', department='Sales', state='CA', salary=81000, age=30, bonus=23000)
Row(employee_name='Maria', department='Finance', state='CA', salary=90000, age=24, bonus=23000)
Row(employee_name='Raman', department='Finance', state='CA', salary=99000, age=40, bonus=24000)
Row(employee_name='Scott', department='Finance', state='NY', salary=83000, age=36, bonus=19000)
Row(employee_name='Jen', department='Finance', state='NY', salary=79000, age=53, bonus=15000)
Row(employee_name='Jeff', department='Marketing', state='CA', salary=80000, age=25, bonus=18000)
Row(employee_name='Kumar', department='Marketing', state='NY', salary=91000, age=50, bonus=21000)


## Complex Data Processing

In [37]:
empSchema = StructType([StructField("Department",IntegerType(),True),
                        StructField("Name",StringType(), True),
                        StructField("Age",StringType(), True),
                        StructField("Address", StructType([
                            StructType("City", StringType(), True),
                            StructType("State", StringType(), True)
            ]),True)
])

TypeError: StructType.__init__() takes from 1 to 2 positional arguments but 4 were given

In [None]:
schema = StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True),
    StructField("address", StructType([
        StructField("city", StringType(), True),
        StructField("state", StringType(), True)
    ]), True)
])


df = spark.read.schema(schema).json(spark.sparkContext.parallelize(json_data))

