# PySpark Environment Setup

In [1]:
import os
# Set the three PySpark interpreter-related environment variables in one call
os.environ.update({
    'PYSPARK_DRIVER_PYTHON': 'jupyter',
    'PYSPARK_PYTHON': 'python',
    'PYSPARK_DRIVER_PYTHON_OPTS': 'lab',
})

# Getting Started with a First SparkSession Program

In [2]:
# SparkSession is the entry point used throughout this notebook
from pyspark.sql import SparkSession

# Build (or reuse) a session named "Sparksession" on the "local" master
spark = (
    SparkSession.builder
    .appName("Sparksession")
    .master("local")
    .getOrCreate()
)

In [3]:
# Build a tiny DataFrame from in-memory (name, age) rows to check the session
data = [
    ("Alice", 23),
    ("Bob", 30),
    ("Mahesh", 21),
]
df = spark.createDataFrame(data, schema=["Name", "Age"])

# Render the rows as a text table
df.show()

+------+---+
|  Name|Age|
+------+---+
| Alice| 23|
|   Bob| 30|
|Mahesh| 21|
+------+---+



In [4]:
# Shut down the Spark session created above
spark.stop()

# Difference Between SparkContext and SparkSession

Spark Context :
- Was the main entry point for Spark in earlier versions (1.x)
- Represents connection to spark cluster
- Coordinates task execution across the cluster
- Creates RDDs (Resilient Distributed Datasets)
- Performs Transformations and defines actions.

Spark Session :
- The unified entry point for Spark since version 2.0, providing a simpler way to interact with Spark.
- Combines the functionality of HiveContext, SparkContext, SQLContext and StreamingContext.
- Supports multiple Programming Languages like : Scala, Java, R and Python
- Extends the functionality of Spark Context.
- Supports Advanced abstractions like Datasets and DataFrames
- Provides Data Source APIs, Machine Learning Algorithms and streaming capabilities

## Creating PySpark Context

In [5]:
from pyspark import SparkConf, SparkContext

# Create (or reuse) the SparkContext.
# Calling SparkContext(...) directly raises ValueError when a context is
# already active in this process (for example when this cell is re-run),
# so go through getOrCreate(), which returns the active context if one exists
# and otherwise builds a new one from the given configuration.
sc = SparkContext.getOrCreate(
    SparkConf().setAppName("MySparkContext-Application")
)

In [6]:
# Display the SparkContext object as the cell output
sc

In [7]:
# Shut down the SparkContext created above
sc.stop()

# Creating Spark Session with Manual Configurations

In [8]:
from pyspark.sql import SparkSession

# Build a session with explicit settings: "2g" of executor memory and
# "4" shuffle partitions for Spark SQL
spark = (
    SparkSession.builder
    .appName("Spark-Session-Manual-Config")
    .config("spark.executor.memory", "2g")
    .config("spark.sql.shuffle.partitions", "4")
    .getOrCreate()
)

In [9]:
# Display the SparkSession object as the cell output
spark

In [10]:
# Shut down the Spark session created with the manual configuration
spark.stop()

# RDDs (Resilient Distributed Datasets)

- Backbone of data processing in Spark
- Distributed, Fault-tolerant, parallelizable data structure
- Efficiently processes large datasets across the cluster
- RDDs are immutable, distributed, resilient, lazily evaluated, fault-tolerant
- Fault-tolerant operations include map, filter, reduce, collect, count, save, etc.

## Transformations

- Create new RDDs by applying a computation or manipulation to an existing RDD
- Lazy Evaluation, Lineage Graph
- Examples like map, filter, flatMap, reduceByKey, sortBy and join

## Actions

- Return Results or perform actions on RDD, triggering execution
- Eager evaluation, data movement/ computation
- Examples like collect, count, first, take, save, foreach.

In [11]:
# Start (or reuse) a SparkSession for the RDD examples below
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("RDD-SparkSession")
    .getOrCreate()
)

## How to Create RDDs

In [12]:
# Turn a plain Python list (1 through 5) into an RDD
numbers = list(range(1, 6))
rdd = spark.sparkContext.parallelize(numbers)

In [13]:
# collect() is an action that retrieves every element of the RDD
rdd.collect()

[1, 2, 3, 4, 5]

In [14]:
# Build an RDD from a list of (name, experience) tuples;
# "Raman" appears twice on purpose so per-key operations have something to merge
employees = [
    ("Ajay", 8),
    ("Raman", 7),
    ("Pratap", 5),
    ("Mohan", 6),
    ("Raman", 3),
]
employees_rdd = spark.sparkContext.parallelize(employees)

In [15]:
print("All the employee tuples :")

# Print each tuple on its own line
for employee in employees_rdd.collect():
    print(employee)

All the employee tuples :
('Ajay', 8)
('Raman', 7)
('Pratap', 5)
('Mohan', 6)
('Raman', 3)


## RDDs - Actions

In [16]:
# count() is an action that returns the number of items in the RDD.
rdd_count = rdd.count()  # kept in a variable so it can be reported below
print("The total number of items in the RDD :", rdd_count)

The total number of items in the RDD : 5


In [17]:
# Same count() action, this time on the employee RDD
employee_rdd_count = employees_rdd.count()
print(
    "The total number of items in the Employee RDD is : ",
    employee_rdd_count,
)

The total number of items in the Employee RDD is :  5


In [18]:
# first() is an action that returns the first element of the RDD.
first_item = employees_rdd.first()
print("The first item in the RDD is : ", first_item)

The first item in the RDD is :  ('Ajay', 8)


In [19]:
# take(n) is an action that retrieves n elements from the RDD
elements_needed = employees_rdd.take(3)
print("The elements from the RDD are : ")
for element in elements_needed:
    print(element)

The elements from the RDD are : 
('Ajay', 8)
('Raman', 7)
('Pratap', 5)


In [20]:
# foreach() is an action that applies a function to every element of the RDD.
# The function runs inside the Spark worker processes, so anything it prints
# goes to the workers' stdout rather than to this notebook's output area.
employees_rdd.foreach(print)

# To actually see the elements here, bring them back to the driver first.
for employee in employees_rdd.collect():
    print(employee)

## RDDs - Transformations

In [21]:
# Transformations are lazy: they only describe a new RDD, and a result is
# produced once an action is performed on it.

# map(): upper-case each name and keep the second tuple field unchanged
mapped_rdd = employees_rdd.map(
    lambda employee: (employee[0].upper(), employee[1])
)

In [22]:
# collect() is the action that materialises the mapped RDD
map_result = mapped_rdd.collect()
print("RDD in Upper case :", map_result)

RDD in Upper case : [('AJAY', 8), ('RAMAN', 7), ('PRATAP', 5), ('MOHAN', 6), ('RAMAN', 3)]


In [23]:
# filter(): keep only the records that satisfy a condition
# (here: the second tuple field equals 7)
filtered_rdd = employees_rdd.filter(lambda employee: employee[1] == 7)

filtered_result = filtered_rdd.collect()
print("RDD with Experience of 7 years :", filtered_result)

RDD with Experience of 7 years : [('Raman', 7)]


In [24]:
# reduceByKey(): add up the experience values of all records sharing a name
reduced_rdd = employees_rdd.reduceByKey(
    lambda total, experience: total + experience
)
reduced_rdd.collect()

[('Ajay', 8), ('Raman', 10), ('Pratap', 5), ('Mohan', 6)]

In [25]:
# sortBy(): order the records by a key function, ascending or descending.
# Both runs below sort on the second tuple field.
sorted_rdd_asc = employees_rdd.sortBy(lambda employee: employee[1], ascending=True)
print("The RDD in Ascending Order :", sorted_rdd_asc.collect())

sorted_rdd_desc = employees_rdd.sortBy(lambda employee: employee[1], ascending=False)
print("The RDD in Descending Order :", sorted_rdd_desc.collect())

The RDD in Ascending Order : [('Raman', 3), ('Pratap', 5), ('Mohan', 6), ('Raman', 7), ('Ajay', 8)]
The RDD in Descending Order : [('Ajay', 8), ('Raman', 7), ('Mohan', 6), ('Pratap', 5), ('Raman', 3)]


# DataFrames - Introduction

In [26]:
# Load data.txt through the SparkContext as an RDD.
# NOTE(review): this rebinds `rdd`, which earlier held the parallelized
# list of numbers; cells above that use `rdd` will see this one if re-run later.
rdd = spark.sparkContext.textFile("data.txt")

In [27]:
# Word count with the RDD API: split each line on single spaces, pair every
# word with 1, sum the pairs per word, then order by count (highest first)
result_rdd = (
    rdd.flatMap(lambda line: line.split(" "))
    .map(lambda word: (word, 1))
    .reduceByKey(lambda left, right: left + right)
    .sortBy(lambda pair: pair[1], ascending=False)
)

In [28]:
# Show the first 20 (word, count) pairs of the count-sorted RDD
result_rdd.take(20)

[('the', 12),
 ('a', 7),
 ('of', 7),
 ('in', 5),
 ('distributed', 5),
 ('Spark', 4),
 ('is', 3),
 ('API', 3),
 ('as', 3),
 ('on', 3),
 ('Dataset', 3),
 ('RDD', 3),
 ('its', 2),
 ('data', 2),
 ('cluster', 2),
 ('that', 2),
 ('The', 2),
 ('was', 2),
 ('API.', 2),
 ('and', 2)]

In [29]:
# Load the same data.txt file, this time as a DataFrame.
# NOTE(review): this rebinds `df`, which earlier held the Name/Age sample.
df = spark.read.text("data.txt")

In [30]:
# Word count with the DataFrame API: explode each line into one row per word,
# count rows per word, then order by that count (highest first)
result_df = (
    df.selectExpr("explode(split(value, ' ')) as word")
    .groupBy("word")
    .count()
    .orderBy("count", ascending=False)
)

In [31]:
# Register the word counts as a temporary view named "table" so they can be queried with SQL
result_df.createOrReplaceTempView("table")

In [32]:
# Query the temporary view through Spark SQL
df_table = spark.sql("select * from table")

In [33]:
# show() prints the table itself and returns None, so wrapping it in print()
# only adds a stray "None" line after the table.
df_table.show()

+-----------+-----+
|       word|count|
+-----------+-----+
|        the|   12|
|         of|    7|
|          a|    7|
|         in|    5|
|distributed|    5|
|      Spark|    4|
|        API|    3|
|        RDD|    3|
|         is|    3|
|         on|    3|
|    Dataset|    3|
|         as|    3|
|       data|    2|
|   programs|    2|
|        its|    2|
|       API.|    2|
|        and|    2|
|  MapReduce|    2|
|       RDDs|    2|
|        The|    2|
+-----------+-----+
only showing top 20 rows

None


# Dataframe from CSV

In [35]:
# Relative path to the survey CSV, written with a forward slash.
# A backslash here would form "\w", an invalid escape sequence that Python
# warns about (and plans to reject), and the path would only resolve on Windows.
path = "./wholesale-trade-survey-sep-2023-quarter-csv.csv"
dataframe = (
    spark.read.format("csv")
    .option("header", "true")       # take the column names from the header line
    .option("inferSchema", "true")  # let Spark infer the column types
    .load(path)
)

In [37]:
# display() of the Spark DataFrame shows its column names and types, not its rows
display(dataframe)

DataFrame[Series_reference: string, Period: double, Data_value: double, Suppressed: string, STATUS: string, UNITS: string, Magnitude: int, Subject: string, Group: string, Series_title_1: string, Series_title_2: string, Series_title_3: string, Series_title_4: string, Series_title_5: string]

In [39]:
# Register the CSV DataFrame as the temporary view "df_table" for the SQL queries below
dataframe.createOrReplaceTempView("df_table")

In [46]:
# Select five rows from the view through Spark SQL
df_Spark = spark.sql("SELECT * FROM df_table limit 5")

In [47]:
# Print the selected rows as a text table
df_Spark.show()

+----------------+-------+----------+----------+------+-------+---------+--------------------+--------------------+--------------------+--------------------+--------------+--------------+--------------+
|Series_reference| Period|Data_value|Suppressed|STATUS|  UNITS|Magnitude|             Subject|               Group|      Series_title_1|      Series_title_2|Series_title_3|Series_title_4|Series_title_5|
+----------------+-------+----------+----------+------+-------+---------+--------------------+--------------------+--------------------+--------------------+--------------+--------------+--------------+
|     WTSQ.SFA1CA|1995.03|   2368.69|      NULL|     F|Dollars|        6|Wholesale Trade S...|Industry by varia...|Basic material wh...|Sales (operating ...|Current prices|    Unadjusted|          NULL|
|     WTSQ.SFA1CA|1995.06|   2100.44|      NULL|     F|Dollars|        6|Wholesale Trade S...|Industry by varia...|Basic material wh...|Sales (operating ...|Current prices|    Unadjusted| 

In [57]:
# Distinct values of the Suppressed column.
# NOTE(review): the variable name says STATUS but the query selects Suppressed —
# confirm which column was intended.
ds_unique_STATUS = spark.sql("SELECT distinct(Suppressed) from df_table")

In [58]:
# Print the distinct values found by the query above
ds_unique_STATUS.show()

+----------+
|Suppressed|
+----------+
|      NULL|
+----------+



In [59]:
# Three rows with the largest Period values (sorted descending, then limited)
df_highest_Period = spark.sql("SELECT * from df_table order by Period desc LIMIT 3")

In [60]:
# Print the three rows selected above
df_highest_Period.show()

+----------------+-------+----------+----------+------+-------+---------+--------------------+--------------------+--------------------+--------------------+--------------+-------------------+--------------+
|Series_reference| Period|Data_value|Suppressed|STATUS|  UNITS|Magnitude|             Subject|               Group|      Series_title_1|      Series_title_2|Series_title_3|     Series_title_4|Series_title_5|
+----------------+-------+----------+----------+------+-------+---------+--------------------+--------------------+--------------------+--------------------+--------------+-------------------+--------------+
|     WTSQ.SFA1CS|2023.09| 11323.483|      NULL|     F|Dollars|        6|Wholesale Trade S...|Industry by varia...|Basic material wh...|Sales (operating ...|Current prices|Seasonally adjusted|          NULL|
|     WTSQ.SFA9CA|2023.09|  4015.207|      NULL|     F|Dollars|        6|Wholesale Trade S...|Industry by varia...|Basic material wh...|        Total stocks|Current pri

# Renaming Columns from the DataFrame

In [61]:
# Give two columns shorter names; the renamed result is bound back to `dataframe`
dataframe = (
    dataframe
    .withColumnRenamed("Series_reference", "Reference")
    .withColumnRenamed("Series_title_1", "title_1")
)

In [62]:
# Print the DataFrame to confirm the two renamed columns
dataframe.show()

+-----------+-------+----------+----------+------+-------+---------+--------------------+--------------------+--------------------+--------------------+--------------+--------------+--------------+
|  Reference| Period|Data_value|Suppressed|STATUS|  UNITS|Magnitude|             Subject|               Group|             title_1|      Series_title_2|Series_title_3|Series_title_4|Series_title_5|
+-----------+-------+----------+----------+------+-------+---------+--------------------+--------------------+--------------------+--------------------+--------------+--------------+--------------+
|WTSQ.SFA1CA|1995.03|   2368.69|      NULL|     F|Dollars|        6|Wholesale Trade S...|Industry by varia...|Basic material wh...|Sales (operating ...|Current prices|    Unadjusted|          NULL|
|WTSQ.SFA1CA|1995.06|   2100.44|      NULL|     F|Dollars|        6|Wholesale Trade S...|Industry by varia...|Basic material wh...|Sales (operating ...|Current prices|    Unadjusted|          NULL|
|WTSQ.SFA1