<a href="https://colab.research.google.com/github/kuldeep27396/SparkOptimization/blob/main/optimization_google_collab.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
#install pyspark library
!pip install pyspark

In [None]:
#import pyspark
import pyspark

In [None]:
#import sparksession 
from pyspark.sql import SparkSession

In [None]:
#creating a sparksession object and providing appName 
spark=SparkSession.builder.appName("optimization").getOrCreate()

In [None]:
#To create dataframe form External datasets
AirlineDF = spark.read.option("header", "true").csv("/Users/krishnapratap/Desktop/partation/data/*")

# use take() in place of collect() for reduce time


In [None]:
%time AirlineDF.take(5)

In [None]:
%time AirlineDF.show(5)

In [None]:
# calculate total rows in dataframe
%time AirlineDF.count()

In [None]:
#calculate total columns in dataframe
len(AirlineDF.columns)

#  Catalyst optimizer 

Catalyst Optimizer improves developer productivity and the performance of their written queries.

In [None]:
%time AirlineDF.select('OriginCityName') \
.filter("Distance >= 200").filter("ArrDelayMinutes  >= 5").filter("Year  >= 2005").groupBy('OriginCityName') \
.count().withColumnRenamed("count", "Total Delay Flights").orderBy("OriginCityName").show()

In [None]:
AirlineDF.select('OriginCityName') \
.filter("Distance >= 200").filter("ArrDelayMinutes  >= 5").filter("Year  >= 2005").groupBy('OriginCityName') \
.count().withColumnRenamed("count", "Total Delay Flights").orderBy("OriginCityName").explain(mode="formatted")

In [None]:
from pyspark.sql.functions import col

In [None]:
%time AirlineDF.select('OriginCityName') \
.orderBy("OriginCityName").withColumnRenamed("count", "Total Delay Flights").filter((col("Year") >= "2005") & (col("ArrDelayMinutes") >= "5") & (col("Distance") >= "200")).groupBy('OriginCityName') \
.count().orderBy("OriginCityName").withColumnRenamed("count", "Total Delay Flights").show()

In [None]:
AirlineDF.select('OriginCityName') \
.orderBy("OriginCityName").withColumnRenamed("count", "Total Delay Flights").filter((col("Year") >= "2005") & (col("ArrDelayMinutes") >= "5") & (col("Distance") >= "200")).groupBy('OriginCityName') \
.count().orderBy("OriginCityName").withColumnRenamed("count", "Total Delay Flights").explain(mode="formatted")

explain(mode="simple") shows physical plan.

explain(mode="extended") presents physical and logical plans.

explain(mode="codegen") shows the java code planned to be executed.

explain(mode="cost") presents the optimized logical plan and related statistics (if they exist).

explain(mode="formatted") shows a split output created by an optimized physical plan outline, and a section of every node detail.

# When data is huge otherwise not


# use coalesce() in place of repartition() to reduce the no. of partition 

In [None]:
#to check how many partation in current AirlineDF dataframe
AirlineDF.rdd.getNumPartitions()

In [None]:
AirlineDF1 = AirlineDF.repartition(2)

In [None]:
AirlineDF1.rdd.getNumPartitions()

In [None]:
AirlineDF2 = AirlineDF.coalesce(2)

In [None]:
AirlineDF2.rdd.getNumPartitions()

# when we load data from other source like s3, database

# when you dealing with heavy-weighted initialization on larger datasets.

# select needed columns from entire dataset and write into new file then create new dataframe

In [None]:
from pyspark import SparkContext
# Create an RDD
sc = SparkContext()

In [None]:
df=sc.textFile("/Users/krishnapratap/Desktop/partation/data/*")

In [None]:
Ardd = df.map(lambda x: x.replace(', ',' ').split(','))

In [None]:
AirlineRdd=Ardd.map(lambda x: Row(Year=x[0],UniqueCarrier=x[6]))

In [None]:
from pyspark import SQLContext
from pyspark.sql import Row

In [None]:
# setting up the sql context
sqlContex = SQLContext(sc)

In [None]:
AirlineDf1 = sqlContex.createDataFrame(AirlineRdd)

In [None]:
%time AirlineDf1.select("UniqueCarrier").groupby("UniqueCarrier").count().show()

In [None]:
AirlineDf1.write.option("header", "true").csv("flight/airdata")

In [None]:
#import sparksession 
from pyspark.sql import SparkSession

In [None]:
#creating a sparksession object and providing appName 
spark=SparkSession.builder.appName("optimization").getOrCreate()

In [None]:
#To create dataframe form External datasets
AirlineDf2 = spark.read.option("header", "true").csv("flight/airdata/*")

In [None]:
AirlineDf2.show()

In [None]:
%time AirlineDf2.select("UniqueCarrier").groupby("UniqueCarrier").count().show()

# Apache Parquet is a columnar file format that provides optimizations to speed up queries and is a far more efficient file format than CSV or JSON, supported by many data processing systems.

In [None]:
AirlineDf2.write.parquet("parquet/")

In [None]:
NewDfParquet = spark.read.parquet("parquet/*")

In [None]:
%time NewDfParquet.select("UniqueCarrier").groupby("UniqueCarrier").count().show()

# Using cache() and persist() methods, Spark provides an optimization mechanism to store the intermediate computation of a Spark DataFrame so they can be reused in subsequent actions.

# Note - When NOT to use Cache/Persist- When the size of the data is large and there are multiple dfs available for cache.

In [None]:
# before rdd is persisted
%time AirlineDf1.select("UniqueCarrier").groupby("UniqueCarrier").count().show()

In [None]:
%time AirlineDf1.select("Year").groupby("Year").count().show()

In [None]:
from pyspark import StorageLevel

In [None]:
# persist dataframe to memory and disk
AirlineDf1.persist(StorageLevel.MEMORY_AND_DISK)

In [None]:
%time AirlineDf1.select("Year").groupby("Year").count().show()

In [None]:
%time AirlineDf1.select("UniqueCarrier").groupby("UniqueCarrier").count().show()

In [None]:
#unpersist dataframe
AirlineDf1.unpersist()

In [None]:
#persist dataframe to memory 
AirlineDf1.persist(StorageLevel.MEMORY_ONLY)

In [None]:
%time AirlineDf1.select("UniqueCarrier").groupby("UniqueCarrier").count().show()

In [None]:
%time AirlineDf1.select("Year").groupby("Year").count().show()

In [None]:
#unpersist dataframe
AirlineDf1.unpersist()

In [None]:
#persist dataframe to disk
AirlineDf1.persist(StorageLevel.DISK_ONLY)

In [None]:
%time AirlineDf1.select("Year").groupby("Year").count().show()

In [None]:
%time AirlineDf1.select("UniqueCarrier").groupby("UniqueCarrier").count().show()

In [None]:
#unpersist dataframe
AirlineDf1.unpersist()

In [None]:
#cache dataframe
AirlineDf1.cache()

In [None]:
%time AirlineDf1.select("Year").groupby("Year").count().show()

In [None]:
%time AirlineDf1.select("UniqueCarrier").groupby("UniqueCarrier").count().show()

In [None]:
#unchache dataframe 
AirlineDf1.unpersist()

In [None]:
# after rdd is uncached
%time AirlineDf1.select("UniqueCarrier").groupby("UniqueCarrier").count().show()

# use reduceByKey() in place of Groupbykey

In [None]:
rdd1=sc.textFile("/Users/krishnapratap/Desktop/partation/data/*")

In [None]:
# Split words using flatMap
rdd_word = rdd1.flatMap(lambda x: x.split(","))

In [None]:
# Create a paired-rdd
rdd_pair = rdd_word.map(lambda x: (x, 1))

In [None]:
# Count occurence per word using groupbykey()
rdd_group = rdd_pair.groupByKey()
rdd_group_count = rdd_group.map(lambda x:(x[0], len(x[1])))
%time rdd_group_count.collect()

In [None]:
# Count occurence per word using reducebykey()
rdd_reduce = rdd_pair.reduceByKey(lambda x,y: x+y)
%time rdd_reduce.collect()

# use apache arrow to optimize query time 

# create dataframe in pyspark from pandas

In [None]:
!pip install pyarrow

# here we are creating pandas dataframe from multiple csv file
# so we need to import glob

In [None]:
import pandas as pd
import numpy as np

In [None]:
pdf1 = pd.read_csv('/Users/krishnapratap/Desktop/partation/data/On_Time_On_Time_Performance_2005_1.csv')

# how to enable arrow for fast pyspark dataframe creation from pandas dataframe

In [None]:
spark.conf.set("spark.sql.execution.arrow.enabled", "true")

In [None]:
%time df2 = spark.createDataFrame(pdf1)

# how to disable arrow  

In [None]:
spark.conf.set("spark.sql.execution.arrow.enabled", "false") 

In [None]:
#we need to cast all the columns in the pandas df to string type to overcome this datatype issue converting pandas df to spark df
%time df1 = spark.createDataFrame(pdf1.astype(str))