# Import the data from /databricks-datasets/definitive-guide/data

In [0]:
#change /data to /databricks-datasets/definitive-guide/data

flightData2015 = spark\
.read\
.option("inferSchema", "true")\
.option("header", "true")\
.csv("/databricks-datasets/definitive-guide/data/flight-data/csv/2015-summary.csv")


In [0]:
display(flightData2015)

# Chapter 5 - Basic Structured Operations

In [0]:
# Import data
df=spark.read.format("json").load("/databricks-datasets/definitive-guide/data/flight-data/json/2015-summary.json")

# A) Schema

In [0]:
# CHECK SCHEMA
spark.read.format("json").load("/databricks-datasets/definitive-guide/data/flight-data/json/2015-summary.json").schema

In [0]:
# LETS CREATE CUSTOM SCHEMA OR USER DEFINED SCHEMA ON THE EXISTING DATASET

from pyspark.sql.types import StructField, StructType, StringType, LongType

myManualSchema = StructType([StructField("DEST_COUNTRY_NAME",StringType(), True),StructField("ORIGIN_COUNTRY_NAME", StringType(),True),\
                              StructField("count",LongType(),False,metadata={"hello":"world"})])


df=spark.read.format("json").schema(myManualSchema).load("/databricks-datasets/definitive-guide/data/flight-data/json/2015-summary.json")

# B) Columns and expression

In [0]:
# use method col("col_name") on a dataframe inside select
from pyspark.sql.functions import col, column, expr
display(df.select("count"))

count
15
1
344
15
62
1
62
588
40
1


In [0]:
# for expression you can creae complex transformation by using 1 or more columns of a dataframe

display(df.select(expr("(count+10)*2")))

((count + 10) * 2)
50
22
708
50
144
22
144
1196
100
22


In [0]:
# to check dataframe columns use column/printSchema:
spark.read.format("json").schema(myManualSchema).load("/databricks-datasets/definitive-guide/data/flight-data/json/2015-summary.json").columns

# C) Records and Rows

In [0]:
# calling first row in a dataframe which is a record
df.first()

In [0]:
# creating rows, note that row has no schema unlike dataFrame

from pyspark.sql import Row
myRow=Row("hello",None,1, False)

#lets access data in the Row just like accessing list

myRow[0]
#myRow[2]

# D) DataFrame Transformations

1.) We can add rows or columns
 2.) We can remove rows or columns
 3.) We can transform a row into a column (or vice versa)
 4.) We can change the order of rows based on the values in columns

I.) Creating DataFrames and Registering it for SQL access

In [0]:
df=spark.read.format("json").load("/databricks-datasets/definitive-guide/data/flight-data/json/2015-summary.json")
df.createOrReplaceTempView("dfTable")

In [0]:
#creating a dataframe on the fly using Row (mostly for quick analysis)

from pyspark.sql import Row
from pyspark.sql.types import StructField, StructType, StringType, LongType

myManualSchema=StructType([StructField("some", StringType(),True),StructField("col", StringType(),True), StructField("names", LongType(),False)])

myRow=Row("Hello",None,1)
myDf=spark.createDataFrame([myRow],myManualSchema)
myDf.show()

II.) select and selectExpr

In [0]:
# you can select one or more columns of a dataframe using select

df.select("DEST_COUNTRY_NAME","ORIGIN_COUNTRY_NAME").show(2)

In [0]:
# you can select a column or do a string manipulation of column using selectExpr

df.selectExpr("DEST_COUNTRY_NAME as newcolumnname","DEST_COUNTRY_NAME").show(2)

In [0]:
# complex example of selectExpr

df.selectExpr(
"*", # all original columns
"(DEST_COUNTRY_NAME = ORIGIN_COUNTRY_NAME) as withinCountry")\
.show(2)


In [0]:
# another exmple of selectExpr

df.selectExpr("avg(count)","count(distinct(DEST_COUNTRY_NAME))").show(2)

III.) Literals in spark (like a constant value as a column in SQL)

In [0]:
from pyspark.sql.functions import lit, expr
df.select(expr("*"),lit(1).alias("One")).show(2)

IV). Adding Columns (withColumn)

In [0]:
# columns can be added to dataframe using "withColumn"

df.withColumn("numberOne",lit(1)).show(2)

In [0]:
# one more example (filling new column based on dataframe columns comparison)

df.withColumn("withinCountry",expr("ORIGIN_COUNTRY_NAME==DEST_COUNTRY_NAME")).show(6)

V). Renaming columns

In [0]:
#columns are renamed using "withColumnRenamed"

df.withColumnRenamed("DEST_COUNTRY_NAME","dest").columns

VI.) Reserved Characters and Keywords

In [0]:
#lets create a new column with escape character like dash
dfWithLongColName=df.withColumn("This Long Column-Name",expr("ORIGIN_COUNTRY_NAME"))

In [0]:
# to select the column with escape character use `` in between and also to rename to new column with space in between then use ``
dfWithLongColName.selectExpr("`This Long Column-Name`","`This Long Column-Name` as `new col`").show(2)

VII.) Case sensitivity

In [0]:
# By default Spark is case insensitive. To turn on case sensitivity use:
# set spark.sql.caseSensitive true

VIII.) Removing Columns

In [0]:
# to remove one or more columns from a dataframe use drop method

dfWithLongColName.drop("ORIGIN_COUNTRY_NAME","DEST_COUNTRY_NAME").columns

IX.) Changing Column Type (Cast)

In [0]:
# lets convert a column from string to integer type long as a new column
from pyspark.sql.functions import col
df.withColumn("count", col("count").cast("long"))

X.) Filtering Rows

In [0]:
# you can leverage filter or where method to do this. use where because of its familarity with SQL

df.where(col("count")<2).show(6)

In [0]:
#more complex filtering criteria can be created in a sequnatial fashion (tip: use col or expr to get to the column name):

df.where("count<2").where(col("ORIGIN_COUNTRY_NAME")!="Croatia").show(6)


XI.) Getting Unique rows (deduplication)

In [0]:
# to do this, we use distinct method (returns new dataframe)
#tip: you can select multiple columns as well

df.select("ORIGIN_COUNTRY_NAME").distinct().count()

In [0]:
# compare with original dataframe column
df.select(col("ORIGIN_COUNTRY_NAME")).count()

XII.) Random Samples

In [0]:
# sometime you need to sample random records from your dataframe(sample method):
seed=5
withReplacement=False
fraction=0.5
df.sample(withReplacement,fraction,seed).count()

XIII.) Concatenating and Appending Rows(Union)

In [0]:
# since dataframes are immutable so we have to union new dataframe with original dataframe. Just make sure that schema and column are same in both the dataframe

from pyspark.sql import Row
from pyspark.sql.functions import col,expr

schema=df.schema

newRows=[Row("New country","Other Country","5L"),
        Row("New Country 2", "Other Country 3","1L")]

parallelizedRows=spark.sparkContext.parallelize(newRows)
newDF=spark.createDataFrame(parallelizedRows,schema)

#join the two dataset (union)

df.union(newDF).where("count=1").where(col("ORIGIN_COUNTRY_NAME")!=("Croatia")).show()
 

XIV). Sorting

In [0]:
# sorting can be done using sort or orderBy ethod on a dataframe
# 'asc_nulls_first', 'desc_nulls_first', 'asc_nulls_last', or 'desc_nulls_last' to specify where you would like your null values to appear in an ordered DataFrame

df.sort("count").show(6)

In [0]:
from pyspark.sql.functions import desc, asc

df.orderBy(expr("count desc")).show(2)
df.orderBy(col("count").desc(),col("DEST_COUNTRY_NAME").asc()).show(2)

XV). Limit

In [0]:
# to limit the data that you want to extract from a dataframe. exmaple top 10 of some dataframe
df.limit(6).show()

df.orderBy(expr("count desc")).limit(6).show()

XVI.) Repartition and coalesce

In [0]:
# you can partition the data across cluster using repartition method and specify how many partitions you need
# tip: if you know that you would need to partiotion the data based on a column then you can use the column in the repartition method and optionally specify the partitions  # # needed, note that repartition causes full shufle of the data regardless of if it's required or not
df.rdd.getNumPartitions()

df.repartition(5)

df.repartition(5,col("DEST_COUNTRY_NAME"))

# coalesce on the ther hand does not causes reshuffle. Following code will shuffle the data into 5 partitions based on the column name specified and then
# coalesce them (without shuffly)

df.repartition(5, col("DEST_COUNTRY_NAME")).coalesce(2)



XVII). Collecting Rows to the Driver

In [0]:
# Often we have to collect rows of dataframe on a local driver to perform data manipulaton. For this we can use collect() method
# note that if you have a large dataset and call collect method there is a high probabality that you would crash the driver
collectDF=df.limit(10)
collectDF.take(5) # prints first N rows
collectDF.show() 
collectDF.collect()

# Chapter 6 - Working with Different datatypes

In [0]:
# Spark contains plethora of API to transform data. Please use the link to the spark documentation for your specific data transformation needs.
# pyspark.sql.functions library has lot more methods which can used for specific data manipulation requirement

#  http://spark.apache.org/docs/latest/

# API reference guide --> https://spark.apache.org/docs/latest/api/python/reference/index.html
  


In [0]:
# let's import the retail data for this chapter and perform some transformation

df=spark.read.format("csv").option("header", "true").option("inferSchema","true")\
  .load("/databricks-datasets/definitive-guide/data/retail-data/by-day/2010-12-01.csv")

#print schema
df.printSchema()

#create a view for SQL code
df.createOrReplaceTempView("dfTable")

I).Converting to Spark Types --> lit() method

In [0]:
# we can convert native data type like string, integer etc to spark types using lit() method

from pyspark.sql.functions import lit
df.select(lit(2),lit("two"),lit(2.0))

# SQL equivalent
# SELECT 5, "five", 5.0


II). working with Booleans --> and, or, true, false

In [0]:
df.where("InvoiceNo != 536365")\
.select("InvoiceNo","Description")\
.show(5, False)

In [0]:
# complex exmaple. Best practice is to chain all the filters together for code readibility as well as for spark optimization
from pyspark.sql.functions import col, instr
priceFilter=col("UnitPrice")>600
descripFilter=instr(df.Description,"POSTAGE")>=1 # checks is "POSTAGE" is present in the Description column and retruns 1 if true else null
df.where(df.StockCode.isin("DOT")).where(priceFilter | descripFilter).show()

In [0]:
# you can create a boolean column and then use it to filer the dataframe
from pyspark.sql.functions import instr
DOTCodeFilter=col("StockCode")=="DOT" 
priceFilter=col("UnitPrice")>600
descripFilter=instr(col("Description"),"POSTAGE")>=1
df.withColumn("isExpensive",DOTCodeFilter & (priceFilter| descripFilter) )\
.where("isExpensive")\
.select("unitPrice","isExpensive").show(5)

In [0]:
# one more example
from pyspark.sql.functions import expr
df.withColumn("isExpensive", expr("NOT UnitPrice <= 250"))\
.where("isExpensive")\
.select("Description", "UnitPrice").show(5)


In [0]:
# if Nulls present in  the dataset, then use the following code for nul-safe equivalence test
df.where((col("Description")).eqNullSafe("hello")).show()

In [0]:
df.columns

III). Working with Numbers

In [0]:
# often times in big data , we will be working with numbers and perform some mathematical computation like power. exmaple below

from pyspark.sql.functions import pow, expr,col
fabricatedQuantity=pow(col("Quantity")*col("UnitPrice"),2)+5
df.select(expr("CustomerId"), fabricatedQuantity.alias("realQuantity")).show(10)

In [0]:
# We can also use the round function to round the numbers after decimal to desire length
# you can also use bround to round down example bround(lit("3.5")) = 3

from pyspark.sql.functions import pow, expr,col, round
fabricatedQuantity=round((pow(col("Quantity")*col("UnitPrice"),2)+5),2)
df.select(expr("CustomerId"), fabricatedQuantity.alias("realQuantity")).show(10)

In [0]:
# correlation (pearson) of two columns can be done using the following function

from pyspark.sql.functions import corr
df.select(corr("Quantity","UnitPrice")).show()


In [0]:
# summary statistics function
df.describe().show()

# in case you want to use the individual functions as in the below O/P, use the library:
# from pyspark.sql.functions import count, mean, min, max, stddev_pop

# also refer to statFunctions package (use stat) to check out the different stats method which can be used for statistical calculation

In [0]:
# tip: to add a unique id as a column to the dataframe use the 'monotonically_increasing_id' function
from pyspark.sql.functions import monotonically_increasing_id
df.select("*",monotonically_increasing_id()).show(2)


III). Working with Strings

In [0]:
# there are variety of use cases for string manipulation
# example:  regular expression extraction from log files, checing for string existence, upper and lower cases conversion

# first letter of every word, separated by space, is converted to upper case

from pyspark.sql.functions import initcap
display(df.select(initcap(col("Description"))))

initcap(Description)
White Hanging Heart T-light Holder
White Metal Lantern
Cream Cupid Hearts Coat Hanger
Knitted Union Flag Hot Water Bottle
Red Woolly Hottie White Heart.
Set 7 Babushka Nesting Boxes
Glass Star Frosted T-light Holder
Hand Warmer Union Jack
Hand Warmer Red Polka Dot
Assorted Colour Bird Ornament


In [0]:
# upper and lower case

from pyspark.sql.functions import upper, lower
df.select(col("Description"), lower(col("Description")), upper(lower(col("Description")))).show()

In [0]:
# Add or remove spaces around string
# Note that if lpad or rpad takes a number less than the length of the string, it will always remove values from the right side of the string.

from pyspark.sql.functions import lit, ltrim,rtrim,trim,lpad,rpad

df.select(ltrim(lit("   hello   ")).alias("ltrim"),
          rtrim(lit("   hello   ")).alias("rtrim"),
          trim(lit("   hello   ")).alias("trim"),
          lpad(lit("hello"),3, " ").alias("lpad"),
          rpad(lit("hello"),10, " ").alias("rpad"),             
         ).show(2)

In [0]:
# Regular expression: extract or replace a string in the column
# regexp_extract, regexp_replace

# following code replace any occurence of regex_string within column description with "COLOR"
from pyspark.sql.functions import regexp_replace

regex_string="WHITE|BLACK|RED|GREEN|BLUE"
display(df.select(col("Description"),regexp_replace(col("Description"),regex_string,"COLOR").alias("color_clean")))

Description,color_clean
WHITE HANGING HEART T-LIGHT HOLDER,COLOR HANGING HEART T-LIGHT HOLDER
WHITE METAL LANTERN,COLOR METAL LANTERN
CREAM CUPID HEARTS COAT HANGER,CREAM CUPID HEARTS COAT HANGER
KNITTED UNION FLAG HOT WATER BOTTLE,KNITTED UNION FLAG HOT WATER BOTTLE
RED WOOLLY HOTTIE WHITE HEART.,COLOR WOOLLY HOTTIE COLOR HEART.
SET 7 BABUSHKA NESTING BOXES,SET 7 BABUSHKA NESTING BOXES
GLASS STAR FROSTED T-LIGHT HOLDER,GLASS STAR FROSTED T-LIGHT HOLDER
HAND WARMER UNION JACK,HAND WARMER UNION JACK
HAND WARMER RED POLKA DOT,HAND WARMER COLOR POLKA DOT
ASSORTED COLOUR BIRD ORNAMENT,ASSORTED COLOUR BIRD ORNAMENT


In [0]:
# replace given characters with other characters. Example below replace the occurance of  each letter of "LEET" in "Description" column by each number in "1337" if present else return "Description" column

from pyspark.sql.functions import translate,col

display(df.select(translate(col("Description"),"LEET","1337").alias("translated"), col("Description")))

translated,Description
WHI73 HANGING H3AR7 7-1IGH7 HO1D3R,WHITE HANGING HEART T-LIGHT HOLDER
WHI73 M37A1 1AN73RN,WHITE METAL LANTERN
CR3AM CUPID H3AR7S COA7 HANG3R,CREAM CUPID HEARTS COAT HANGER
KNI773D UNION F1AG HO7 WA73R BO7713,KNITTED UNION FLAG HOT WATER BOTTLE
R3D WOO11Y HO77I3 WHI73 H3AR7.,RED WOOLLY HOTTIE WHITE HEART.
S37 7 BABUSHKA N3S7ING BOX3S,SET 7 BABUSHKA NESTING BOXES
G1ASS S7AR FROS73D 7-1IGH7 HO1D3R,GLASS STAR FROSTED T-LIGHT HOLDER
HAND WARM3R UNION JACK,HAND WARMER UNION JACK
HAND WARM3R R3D PO1KA DO7,HAND WARMER RED POLKA DOT
ASSOR73D CO1OUR BIRD ORNAM3N7,ASSORTED COLOUR BIRD ORNAMENT


In [0]:
# extract using regexp_extract

from pyspark.sql.functions import regexp_extract

extract_str="(BLACK|WHITE|RED|GREEN|BLUE)"
df.select(regexp_extract(col("Description"),extract_str,1),col("Description")).show(10)

In [0]:
# more complex example--> like checking if the string is present in the column( boolean)

from pyspark.sql.functions import instr

containsBlack=instr(col("Description"),"BLACK")>=1
containsWhite=instr(col("Description"),"WHITE")>=1
df.withColumn("hasSimpleColor", containsBlack|containsWhite)\
.where("hasSimpleColor")\
.select("Description","hasSimpleColor").show(10)


IV). Working with Dates and Timestamps

In [0]:
from pyspark.sql.functions import current_date, current_timestamp

dateDF=spark.range(10).withColumn("today",current_date()).withColumn("now",current_timestamp())

dateDF.createOrReplaceTempView("dateTable")

dateDF.printSchema()

display(dateDF)

id,today,now
0,2021-08-14,2021-08-14T20:24:29.446+0000
1,2021-08-14,2021-08-14T20:24:29.446+0000
2,2021-08-14,2021-08-14T20:24:29.446+0000
3,2021-08-14,2021-08-14T20:24:29.446+0000
4,2021-08-14,2021-08-14T20:24:29.446+0000
5,2021-08-14,2021-08-14T20:24:29.446+0000
6,2021-08-14,2021-08-14T20:24:29.446+0000
7,2021-08-14,2021-08-14T20:24:29.446+0000
8,2021-08-14,2021-08-14T20:24:29.446+0000
9,2021-08-14,2021-08-14T20:24:29.446+0000


In [0]:
#let's do some calculation on dates. Let's add and subtract 5 days from date

from pyspark.sql.functions import date_add, date_sub
dateDF.select(col("today"),date_sub(col("today"),5),date_add(col("today"),5)).show(2)

In [0]:
# let's figure out how to find the number of days or numbers of months between two dates

from pyspark.sql.functions import datediff,months_between,to_date

dateDF.withColumn("week_ago",date_sub(col("today"),7))\
.select(col("today"),col("week_ago"),datediff(col("today"),col("week_ago"))).show(2)


In [0]:
# let's find the months between two dates

from pyspark.sql.functions import lit
dateDF.select(
to_date(lit("2016-01-01")).alias("start"),
to_date(lit("2017-05-22")).alias("end"))\
.select("*",months_between(col("start"), col("end"))).show(1)

In [0]:
# convert a string to actual date. If spark couldn't process the string , it will return Null instead of error
# default conversion format: YYYY-MM-DD. Note that to_date has optional date format requirement

from pyspark.sql.functions import to_date, lit
spark.range(10).withColumn("date",lit("2017-01-01"))\
.select(to_date(col("date"))).show(2)

In [0]:
#To make the spark understand the date format, we need to specify explicitly the format that we are reading the date in

#before specifying the dateFormat:
dateDF.select(to_date(lit("2016-20-12")),to_date(lit("2017-12-11"))).show(1)

#After specifying the date format
dateFormat="yyyy-dd-mm"
cleanDateDF=spark.range(1).select(to_date(lit("2017-12-11"),dateFormat).alias("date"),to_date(lit("2017-20-12"),dateFormat).alias("date2"))

cleanDateDF.show()

In [0]:
# let's to_timestamp function which requires the format to be specified explicitly

from pyspark.sql.functions import to_timestamp
cleanDateDF.select(to_timestamp(col("date"),dateFormat)).show()

# note that once we have the date and timestamp in correct format, comparing between them is quite easy

cleanDateDF.where(col("date2")>lit("2017-12-12")).show()

V). Working with NULLs in data

In [0]:
# As a best practice , always use nulls to represent empty or missing data in the columns
# Note: if in the schema you define the column to not have nulls, spark will still allow Nulls to be present in the column

# Coalesce: returns first column value that is non null from a given number of columns

from pyspark.sql.functions import coalesce, col

df.select(coalesce(col("Description"),col("CustomerId"))).show()

In [0]:
%sql

-- ifnull, nullIf, nvl, and nvl2. Check the results and get an idea

SELECT
ifnull(null, 'return_value'),
nullif('value', 'value'),
nvl(null, 'return_value'),
nvl2('not_null', 'return_value', "else_value")
FROM dfTable LIMIT 1



"ifnull(NULL, return_value)","nullif(value, value)","nvl(NULL, return_value)","nvl2(not_null, return_value, else_value)"
return_value,,return_value,return_value


In [0]:
# Use drop to remove rows that contains nulls. note that we interact with null in a dataframe using na

df.na.drop() # drops the row if any column is null

df.na.drop("any") # drops the row if any column is null

df.na.drop("all") # drops the row if all columns are null or NaN

df.na.drop("all",subset=["StockCode","InvoiceNo"]) # we can also apply "all" to a certain sets of columns

########################################

# We can fill the string and integer columns with some data as shown below

df.na.fill("String gets this value") # all columns of type string in a dataframe gets filled with this value
#df.na.fill(5:Integer) # integer columns in a dataframe gets filled
#df.na.fill(5:Double) # double columns in a dataframe gets filled

# we can also specify how a particular column should be filled using key-value style mapping

fill_cols_vals={"StockCode":5,"Description":"No Value"}
df.na.fill(fill_cols_vals)

########################################

# replace is also another fleible option to replace certain value in a column with a new value of the same type as original value

df.na.replace([""],["UNKNOWN"],"Description")


VI). Ordering

In [0]:
# you can use asc_nulls_first, desc_nulls_first, asc_nulls_last, or desc_nulls_last to specify where you would like your null values to
# appear in an ordered DataFrame.


VII). Working with Complex types

A) STRUCT

In [0]:
# There are thre complex types which can help to structure your data for a specific use case

# 1. struct: dataframe within a dataframe.

# lets create struct
from pyspark.sql.functions import struct
complexDF=df.select(struct("Description","InvoiceNo").alias("complex"))
complexDF.createOrReplaceTempView("complexDF")

#let's querry
complexDF.select("complex.Description").show()
#complexDF.select("complex.*") # to select all columns in the complex dataframe




B). ARRAYS

In [0]:
# lets split every single word in column of a dataframe and convert that into a row in our dataframe

# we will split and explode function

from pyspark.sql.functions import split, explode, col
df.withColumn("Splitted",split(col("Description"), " ")).withColumn("exploded",explode(col("Splitted")))\
.select("Description","InvoiceNo","exploded").show()

# note : you can use 'array_contains' and 'size' function to check for a string in an array (split column) and size of array (split column in a dataframe)

C). MAPS

In [0]:
# you can create a map just like a key-value pair. and you querry the created map using key. note that missing returns Null

from pyspark.sql.functions import create_map,explode, col
df.select(create_map(col("Description"),col("InvoiceNo")).alias("complex_map")).show(2)

#lets querry map
df.select(create_map(col("Description"),col("InvoiceNo")).alias("complex_map"))\
.selectExpr("complex_map['WHITE METAL LANTERN']").show(2)

In [0]:
# on a side note , you can also explode map. It will then convert it into columns of key and value
df.select(create_map(col("Description"),col("InvoiceNo")).alias("complex_map"))\
.selectExpr("explode(complex_map)").show(2)

VIII). Working with JSON

In [0]:
# let's create a json object

jsonDF=spark.range(1).selectExpr("""'{"myJSONKey" : {"myJSONValue" : [1, 2, 3]}}'as jsonString""")

# lets extract value '2' and {"myJSONValue" : [1, 2, 3]}} --> this exist as a tuple with one level nesting in the original JSON object

from pyspark.sql.functions import get_json_object,json_tuple

jsonDF.select(get_json_object(col("jsonString"),"$.myJSONKey.myJSONValue[1]").alias("column") , json_tuple(col("jsonString"),"myJSONKey")).show(2)


In [0]:
# you can also use to_json and from_json for converting a struct type to json and conversely converting back from json to the specified schema respectively

from pyspark.sql.functions import to_json,from_json
from pyspark.sql.types import *
parseSchema=StructType([StructField("InvoiceNo",StringType(),True),StructField("Description",StringType(),True)])

df.selectExpr("(InvoiceNo, Description) as myStruct")\
.select(to_json(col("myStruct")).alias("newJSON"))\
.select(from_json(col("newJSON"),parseSchema),col("newJSON")).show(2)

IX). User-Defined Functions

In [0]:
# user defined functions are helpful to do custom calculation on the dataset. in order to use the UDF , make sure that it's registered with spark before we an use it


udfExampleDF = spark.range(5).toDF("num")

def power3(double_value):
  return double_value ** 3

#lets register in python:
from pyspark.sql.functions import udf
power3udf = udf(power3)


# lets register this function to spark sql (sqlfunctionname, originalUDF, return_type)
from pyspark.sql.types import IntegerType, DoubleType
spark.udf.register("power3py", power3, DoubleType())

# let's use this UDF

udfExampleDF.selectExpr("power3py(num)").show(2)


# Chapter 7 - Aggregations

In [0]:
# first, lets get the dataset which we will use for this chapter

# in Python
df = spark.read.format("csv")\
.option("header", "true")\
.option("inferSchema", "true")\
.load("/databricks-datasets/definitive-guide/data/retail-data/all/*.csv")\
.coalesce(5)
df.cache()
df.createOrReplaceTempView("dfTable")


In [0]:
df.show()

I). Count

In [0]:
# note that count can exist both as a method (action to count no. of rows in a dataframe) or as function to be applied on column of a dataframe

# As a method-->df.count()

from pyspark.sql.functions import count
df.select(count("StockCode")).show()

II). countDistinct

In [0]:
# this is just like counting a distinct/unique values in a column

from pyspark.sql.functions import countDistinct
df.select(countDistinct("StockCode")).show()

III). approx_count_distinct

In [0]:
# with large dataset (big data), approximation to certain degree of accuracy work just fine

from pyspark.sql.functions import approx_count_distinct
df.select(approx_count_distinct("StockCode",0.1)).show()

IV). first and last

In [0]:
# frist and last are based on rows in the dataframe and not on the values in the dataframe
from pyspark.sql.functions import first,last

df.select(first("StockCode"),last("StockCode")).show()

V). min and max

In [0]:
from pyspark.sql.functions import min, max

df.select(min("Quantity"),max("Quantity")).show()

VI). sum

In [0]:
from pyspark.sql.functions import sum
df.select(sum("Quantity")).show()

VII). sumDistinct

In [0]:
from pyspark.sql.functions import sumDistinct
df.select(sumDistinct("Quantity")).show()

VIII). avg

In [0]:
# you can use avg or mean functions to calculate the average. Another way is to divide sum of quantity by count of quantity

from pyspark.sql.functions import avg,mean,col

df.select(avg(col("Quantity")).alias("avg_purchases"),
         mean(col("Quantity").alias("mean_purchases"))).show()

IX). Variance and standard deviation

In [0]:
# you can use population or sample variance and standard deviation respectively

from pyspark.sql.functions import var_pop,stddev_pop,var_samp,stddev_samp

df.select(var_pop("Quantity"),stddev_pop("Quantity"),var_samp("Quantity"),stddev_samp("Quantity")).show()

X). Aggregating to complex types

In [0]:
from pyspark.sql.functions import collect_set,collect_list
df.agg(collect_set("Country"),collect_list("Country")).show()

XI). Grouping based on a categorical column

In [0]:
df.groupBy("InvoiceNo","CustomerId").count().show(10)

#df.groupBy("InvoiceNo","CustomerId").count().orderBy("InvoiceNo").show(10)

XII). Grouping with Expressions

In [0]:
# let's group the quantity based on invoice number. You will have to use agg to do this
from pyspark.sql.functions import count,expr
df.groupBy("InvoiceNo").agg(count("Quantity").alias("quan"),expr("count(Quantity)"),expr("avg(Quantity)")).show()

XIII). Window Functions

In [0]:
# window functions can be used to perform a set of calculation like ranking , fnding max, min, avg etc within a window/bracket of rows based on some partition column. Example would explain the concept more clearly

spark.conf.set("spark.sql.legacy.timeParserPolicy","LEGACY") # this is used in order to parse the the date format

# lets first create a date column in the dataframe
from pyspark.sql.functions import col,to_date,to_timestamp
dfwithDate=df.withColumn("date",to_date(col("InvoiceDate"),"MM/d/YYYY H:mm"))
dfwithDate.createOrReplaceTempView("dfWithDate")
display(dfwithDate)


InvoiceNo,StockCode,Description,Quantity,InvoiceDate,UnitPrice,CustomerID,Country,date
536365,85123A,WHITE HANGING HEART T-LIGHT HOLDER,6,12/1/2010 8:26,2.55,17850.0,United Kingdom,2009-12-27
536365,71053,WHITE METAL LANTERN,6,12/1/2010 8:26,3.39,17850.0,United Kingdom,2009-12-27
536365,84406B,CREAM CUPID HEARTS COAT HANGER,8,12/1/2010 8:26,2.75,17850.0,United Kingdom,2009-12-27
536365,84029G,KNITTED UNION FLAG HOT WATER BOTTLE,6,12/1/2010 8:26,3.39,17850.0,United Kingdom,2009-12-27
536365,84029E,RED WOOLLY HOTTIE WHITE HEART.,6,12/1/2010 8:26,3.39,17850.0,United Kingdom,2009-12-27
536365,22752,SET 7 BABUSHKA NESTING BOXES,2,12/1/2010 8:26,7.65,17850.0,United Kingdom,2009-12-27
536365,21730,GLASS STAR FROSTED T-LIGHT HOLDER,6,12/1/2010 8:26,4.25,17850.0,United Kingdom,2009-12-27
536366,22633,HAND WARMER UNION JACK,6,12/1/2010 8:28,1.85,17850.0,United Kingdom,2009-12-27
536366,22632,HAND WARMER RED POLKA DOT,6,12/1/2010 8:28,1.85,17850.0,United Kingdom,2009-12-27
536367,84879,ASSORTED COLOUR BIRD ORNAMENT,32,12/1/2010 8:34,1.69,13047.0,United Kingdom,2009-12-27


In [0]:
from pyspark.sql.window import Window
from pyspark.sql.functions import desc

windowSpec=Window.partitionBy("CustomerId","date").orderBy(desc("Quantity")).rowsBetween(Window.unboundedPreceding,Window.currentRow)

# find max

from pyspark.sql.functions import max
maxPurchaseQuantity = max(col("Quantity")).over(windowSpec)


# rank

from pyspark.sql.functions import dense_rank, rank
purchaseDenseRank = dense_rank().over(windowSpec)
purchaseRank = rank().over(windowSpec)


#apply to the dataframe
# in Python
from pyspark.sql.functions import col
dfwithDate.where("CustomerID IS NOT NULL").orderBy(desc("CustomerID")).\
select(\
col("CustomerId"),\
col("date"),\
col("Quantity"),\
purchaseRank.alias("quantityRank"),\
purchaseDenseRank.alias("quantityDenseRank"),\
maxPurchaseQuantity.alias("maxPurchaseQuantity")).show()


XIV). Rollups ( for group by multiple columns)

In [0]:
dfNoNull=dfwithDate.drop()

rolledUpDF=dfNoNull.rollup("Date","Country").agg(sum("Quantity").alias("total_quantity")).select("Date","Country","total_quantity").orderBy("Date")
rolledUpDF.show()

# you can filter and check null values
#rolledUpDF.where("Country IS NULL").show()
#rolledUpDF.where("Date IS NULL").show()


XV). Cube (helpful in creating summary table)

In [0]:
# helpful in building summary table as per the following questions in current dataframe
#The total across all dates and countries
#The total for each date across all countries
#The total for each country on each date
#The total for each country across all dates

from pyspark.sql.functions import sum
dfNoNull.cube("Date","Country").agg(sum("Quantity")).select("Date","Country","sum(Quantity)").orderBy("Date").show()

XVI) Grouping metadata

In [0]:
# grouping metadata gives us grouping_id() which specify the level of aggregation that we have in our result set

# 3 -->This will appear for the highest-level aggregation, which will gives us the total quantity regardless of customerId and stockCode.
# 2 --> This will appear for all aggregations of individual stock codes. This gives us the total quantity per stock code, regardless of customer.
# 1 --> This will give us the total quantity on a per-customer basis, regardless of item purchased.
# 0 --> This will give us the total quantity for individual customerId and stockCode combinations.

from pyspark.sql.functions import grouping_id,expr
dfNoNull.cube("CustomerId","stockCode").agg(grouping_id(),sum("Quantity")).orderBy(expr("grouping_id()").desc()).show()

XVII). Pivot

In [0]:
# Helps in converting rows to column

pivoted=dfwithDate.groupBy("date").pivot("Country").sum()
#pivoted.printSchema()
pivoted.where("date>'2000-12-05'").select("date","`USA_sum(Quantity)`").show()

# Chapter 8 - JOINS

In [0]:
# lets first build the datasets which would be used for performing and explaning how different join opertaion work in SPARK
person = spark.createDataFrame([
(0, "Bill Chambers", 0, [100]),
(1, "Matei Zaharia", 1, [500, 250, 100]),
(2, "Michael Armbrust", 1, [250, 100])])\
.toDF("id", "name", "graduate_program", "spark_status")

graduateProgram = spark.createDataFrame([
(0, "Masters", "School of Information", "UC Berkeley"),
(2, "Masters", "EECS", "UC Berkeley"),
(1, "Ph.D.", "EECS", "UC Berkeley")])\
.toDF("id", "degree", "department", "school")

sparkStatus = spark.createDataFrame([
(500, "Vice President"),
(250, "PMC Member"),
(100, "Contributor")])\
.toDF("id", "status")


#register to SQL views in case you want to do SQL operations
person.createOrReplaceTempView("person")
graduateProgram.createOrReplaceTempView("graduateProgram")
sparkStatus.createOrReplaceTempView("sparkStatus")


I). Inner Joins

In [0]:

person.join(graduateProgram,person["graduate_program"]==graduateProgram['id'],"inner").show()

# another way : 
# joinExpression = person["graduate_program"] == graduateProgram['id']
# person.join(graduateProgram, joinExpression).show()

II). Outer Joins

In [0]:
person.join(graduateProgram,person["graduate_program"]==graduateProgram['id'],"outer").show()

III). left outer join

In [0]:
 graduateProgram.join(person,person["graduate_program"]==graduateProgram['id'],"left_outer").show()

IV). Right Outer Joins

In [0]:
  person.join(graduateProgram,person["graduate_program"]==graduateProgram['id'],"right_outer").show()

V). Left Semi Joins

In [0]:
# This joins all rows from the left dataset only which have matching keys on the right dataset, if not nothing is returned
graduateProgram.join(person,person["graduate_program"]==graduateProgram['id'],"left_semi").show()

VI). Left Anti Joins

In [0]:
# This is opposite of Left semi join. All the values from left dataset is returned excluding matching values from right dataset 
graduateProgram.join(person,person["graduate_program"]==graduateProgram['id'],"left_anti").show()

VII). Natural Joins

In [0]:

%sql
--The following query will give us incorrect results because the two DataFrames/tables share a column name (id), but it means different things in the datasets
SELECT * FROM graduateProgram NATURAL JOIN person


id,degree,department,school,name,graduate_program,spark_status
0,Masters,School of Information,UC Berkeley,Bill Chambers,0,List(100)
1,Ph.D.,EECS,UC Berkeley,Matei Zaharia,1,"List(500, 250, 100)"
2,Masters,EECS,UC Berkeley,Michael Armbrust,1,"List(250, 100)"


VIII). Cross (Cartesian) Joins

In [0]:
#Cross joins will join every single row in the left DataFrame to ever single row in the right DataFrame
graduateProgram.join(person,person["graduate_program"]==graduateProgram['id'],"cross").show()

IX). Handling duplicate column names

In [0]:
# if the two dataframe that you are joining has duplicate column names, there are 3 best ways to handle 
# let's first create a duplicate dataframe
gradProgramDupe = graduateProgram.withColumnRenamed("id", "graduate_program")

# problem will occur when we try to reference the column:
# person.join(gradProgramDupe,  gradProgramDupe["graduate_program"] == person["graduate_program"]).select("graduate_program").show()

#################################################################################################################################################

# SOLUTION

# 1. change the joining condition from to the exact column name (string) instead of boolean. This will automatically drop duplicate column 
# person.join(gradProgramDupe, "graduate_program").select("graduate_program").show()


# 2. Drop the column after join
# person.join(gradProgramDupe,  gradProgramDupe["graduate_program"] == person["graduate_program"]).drop(person["graduate_program"]).select("graduate_program").show()

# 3. Rename column before join
# gradProgram3=graduateProgram.withColumnRenamed("id", "grad_id")
# person.join(gradProgram3,  gradProgram3["grad_id"] == person["graduate_program"]).show()


Note: Partitioning data in the dataframes before join, can improve the efficiency of spark join operation

# Chapter 9 - Data Sources

I). Read API structure

In [0]:
# DataFrameReader.format(...).option("key","value").schema(...).load()

# EXAMPLE

# spark.read.format("csv").option("mode","FAILFAST").option("inferSchema","true").option("path","path/to/file(s)").schema(someSchema).load()
# note:instead of giving path in the option you can also give path in the load()
  
# mode has 3 options:
# 1--> FAILFAST (default) : if datasource has corrupt records, read will fail
# 2--> dropMalformed: rows containing corrupt records are droped
# 3--> permissive: corrupt records are placed in a string "_corrupt_record" with null values

II). Write API structure

In [0]:
# DataFrameWriter.format(...).option(...).partitionBy(...).bucketBy(...).sortBy(...).save()

# note: save() will contain the location where you want to save the file

# format is optional as by default spar writes in parquet format
# partitionBy, bucketBy and sortBy work with file based data sources

# Example:
# dataFrame.write.format("csv").option("mode","OVERWRITE").option("dateFormat","yyyy-MM-dd").option("path","path/to/file(s)").save()

# mode has following options:

# append
# overwrite
# errorIfExists : throws error if file or data already exists (by default)
# ignore: if data or file exist at the locatio, do nothing with the current dataFrame

I). Spark has core datasources:  CSV, JSON, Parquet, ORC, JDBC/ODBC, Plain-text files

II). Spark has a concept called "Query Pushdown" . It means, we just pass sql querry as a paramenter to spark and spark runs the query directly in the database and stores the results in the dataframe

III). To run the spark job efficiently, store the data in parquet format with gzip compression

IV). Further reading, refer to spark definitive guide pg 160-185, into the mechanics of reading, writing data specifically from the datasources

V). Partitioning Data (Spark optimizations)

In [0]:
# Idea is to partition your data/file by some column  and then save it to the storage. SO in your storage files are saved by this column name. column name could be date or anyother. This you could just query based on this column name to get the data faster

# EXAMPLE
# csvFile.limit(10).write.option("mode",overwrite").partitionBy("DEST_COUNTRY_NAME").save("/tmp/partitioned-files.parquet")



VI). Bucketing (Spark Optimization)

In [0]:
# Idea is to bucket similar data together with same bucketID based on how you want to use the data later. This helps in avoiding expensive shuffles when joining and aggregating

# EXAMPLE:
# numberBuckets = 10
# columnToBucketBy = "count"
# csvFile.write.format("parquet").mode("overwrite").bucketBy(numberBuckets, columnToBucketBy).saveAsTable("bucketedFiles")


VII). Managing File Size (Small files problem)

In [0]:
# Problem is Spark doesn't read properly when there are large number of small files in the storage and read can take a lot of time which is not ideal in terms of efficiency.
# you can limit this problem at the time of writing via partition and also limit the number of records written to each file as follows:

# df.write.option("maxRecordsPerFile", 5000)

# Chapter 10 -  Spark SQL

I). Programmatic Interface

In [0]:
# in case you need to call SPARK sql programtically you need to use spark.sql("sample query").show() . This returns a dataframe which you can use to peform more transformation as you deem fit

II). If you have to use spark sql in databricks then you have to define the context with %sql

III). Tables (managed and unmanaged)

In [0]:
# managed tables are the tables completely managed by spark when you use 'saveAsTable' command and store the table data and metadata within spark environment (database)

# unmanaged tables means that you are defining the tables from files on disk

IV). Creating tables --INTERNAL tables (if database is not specified then by default all the tables are created and stored in 'default' databases)

In [0]:
%sql

CREATE TABLE IF NOT EXISTS flights(DEST_COUNTRY_NAME STRING COMMENT "US WILL BE MOST RELEVANT" , ORIGIN_COUNTRY_NAME STRING, count LONG)
USING JSON OPTIONS (PATH '/databricks-datasets/definitive-guide/data/flight-data/json/2015-summary.json');

-- ANOTHER WAY TO CREATE TABLE USING QUERY
--CREATE TABLE flights_from_select USING parquet AS SELECT * FROM flights




In [0]:
%sql
select * from flights

DEST_COUNTRY_NAME,ORIGIN_COUNTRY_NAME,count
United States,Romania,15
United States,Croatia,1
United States,Ireland,344
Egypt,United States,15
United States,India,62
United States,Singapore,1
United States,Grenada,62
Costa Rica,United States,588
Senegal,United States,40
Moldova,United States,1


In [0]:
%sql

--IF YOU WAN TO CREATE A PARTITIONED TABLE:
CREATE TABLE IF NOT EXISTS PARTITIONED_FLIGHTS USING PARQUET PARTITIONED BY (DEST_COUNTRY_NAME)
AS SELECT DEST_COUNTRY_NAME, ORIGIN_COUNTRY_NAME, COUNT from FLIGHTS LIMIT 5

In [0]:
%sql
select * from PARTITIONED_FLIGHTS

V). CREATING EXTERNAL TABLES (UNMANAGED TABLES)

In [0]:
%sql
-- use case would if you want to port hive sql statements direcly into the saprk.
--spark maintain the metadata of the table but files are not maintained by spark at all


CREATE EXTERNAL TABLE IF NOT EXISTS hive_flights(
DEST_CUNTRY_NAME STRING,ORIGIN_COUNTRY_NAME STRING, COUNT LONG)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LOCATION '/data/FLIGHT-DATA-HIVE'

In [0]:
%sql

select * from hive_flights

DEST_CUNTRY_NAME,ORIGIN_COUNTRY_NAME,COUNT


VI). INSERTING INTO TABLES

In [0]:
%sql

insert into partitioned_flights
PARTITION(DEST_COUNTRY_NAME="UNITED STATES")
select origin_country_name,count  from flights
where DEST_COUNTRY_NAME='UNITED_STATES' limit 20

In [0]:
%sql

select * from partitioned_flights

VII). DESCRIBING TABLE METADATA

In [0]:
%sql
describe table flights


col_name,data_type,comment
DEST_COUNTRY_NAME,string,US WILL BE MOST RELEVANT
ORIGIN_COUNTRY_NAME,string,
count,bigint,


In [0]:
%sql

show partitions partitioned_flights

partition
DEST_COUNTRY_NAME=Egypt
DEST_COUNTRY_NAME=UNITED STATES
DEST_COUNTRY_NAME=United States


VIII). REFRESH TABLE METADATA

In [0]:
%sql
refresh table partitioned_flights

-- below command refreshed the partitioned maintained in the catalog
-- MSCK REPAIR TABLE partitioned_flights

IX). DROPPING TABLES

In [0]:
%sql

--note that you cannot delete tables , you can ONLY drop tables
-- ALSO note that if you drop the unmannaged tables , no data will be removed bt you no longer can access the table

DROP TABLE  IF EXISTS flights

X). CACHING TABLES

In [0]:
%sql

cache table flights

--uncache table flights

XI). CREATING VIEWS

In [0]:
%sql
--You can create view, temp view and global tep view (tem and global views are visible only for the sessions)

create view just_usa_view AS
select * from flights where dest_country_name='United States'


In [0]:
%sql
--temp view

create or replace temp view just_usa_view_temp AS
select * from flights where dest_country_name='United States'

In [0]:
%sql
-- global  view

create global temp view just_usa_global_view_temp as
select * from flights where dest_country_name='Unites States'

In [0]:
%sql
show tables

database,tableName,isTemporary
default,flights,False
default,just_usa_view,False
,just_usa_view_temp,True


XII). Dropping Views

In [0]:
%sql

DROP VIEW IF EXISTS just_usa_view;

XIII). DATABASES

In [0]:
%sql

show databases

databaseName
default
some_db


In [0]:
%sql

-- create databases

create database some_db

In [0]:
%sql
--setting up the databases

use some_db


In [0]:
%sql
select * from flights

In [0]:
%sql

select * from default.flights

DEST_COUNTRY_NAME,ORIGIN_COUNTRY_NAME,count
United States,Romania,15
United States,Croatia,1
United States,Ireland,344
Egypt,United States,15
United States,India,62
United States,Singapore,1
United States,Grenada,62
Costa Rica,United States,588
Senegal,United States,40
Moldova,United States,1


In [0]:
%sql

select current_database()

current_database()
some_db


In [0]:
%sql

use default

XIV). Dropping databases

In [0]:
%sql

DROP DATABASE IF EXISTS some_db;

XV). Select statements

In [0]:
%sql

SELECT
CASE WHEN DEST_COUNTRY_NAME = 'UNITED STATES' THEN 1
WHEN DEST_COUNTRY_NAME = 'Egypt' THEN 0
ELSE -1 END as case_column
FROM flights


case_column
-1
-1
-1
0
-1
-1
-1
-1
-1
-1


XVI). Query complex types

In [0]:
%sql
--You can query complex types in Spark SQL like STRUCS, LISTS and MAPS

-- ## STRUCTS ##

CREATE VIEW IF NOT EXISTS nested_data AS
SELECT (DEST_COUNTRY_NAME, ORIGIN_COUNTRY_NAME) as country, count FROM flights



In [0]:
%sql

select * from nested_data

country,count
"List(United States, Romania)",15
"List(United States, Croatia)",1
"List(United States, Ireland)",344
"List(Egypt, United States)",15
"List(United States, India)",62
"List(United States, Singapore)",1
"List(United States, Grenada)",62
"List(Costa Rica, United States)",588
"List(Senegal, United States)",40
"List(Moldova, United States)",1


In [0]:
%sql

-- ## Query the struct data column

select country.DEST_COUNTRY_NAME, count from nested_data

-- select country.*, count from nested_data

DEST_COUNTRY_NAME,count
United States,15
United States,1
United States,344
Egypt,15
United States,62
United States,1
United States,62
Costa Rica,588
Senegal,40
Moldova,1


In [0]:
%sql

--LISTS. you can create list of values using collect_list and with collect_set (removes duplicates)

select DEST_COUNTRY_NAME as new_name, collect_list(count) as flights_counts, collect_set(ORIGIN_COUNTRY_NAME) as origin_set
from flights group BY DEST_COUNTRY_NAME

new_name,flights_counts,origin_set
Algeria,List(4),List(United States)
Angola,List(15),List(United States)
Anguilla,List(41),List(United States)
Antigua and Barbuda,List(126),List(United States)
Argentina,List(180),List(United States)
Aruba,List(346),List(United States)
Australia,List(329),List(United States)
Austria,List(62),List(United States)
Azerbaijan,List(21),List(United States)
Bahrain,List(19),List(United States)


In [0]:
%sql
SELECT DEST_COUNTRY_NAME as new_name, collect_list(count)[0]
FROM flights GROUP BY DEST_COUNTRY_NAME


new_name,collect_list(count)[0]
Algeria,4
Angola,15
Anguilla,41
Antigua and Barbuda,126
Argentina,180
Aruba,346
Australia,329
Austria,62
Azerbaijan,21
Bahrain,19


In [0]:
%sql

CREATE OR REPLACE TEMP VIEW flights_agg AS
SELECT DEST_COUNTRY_NAME, collect_list(count) as collected_counts
FROM flights GROUP BY DEST_COUNTRY_NAME


In [0]:
%sql

-- you can explode the list of value columns into individual col
SELECT explode(collected_counts), DEST_COUNTRY_NAME FROM flights_agg


col,DEST_COUNTRY_NAME
4,Algeria
15,Angola
41,Anguilla
126,Antigua and Barbuda
180,Argentina
346,Aruba
329,Australia
62,Austria
21,Azerbaijan
19,Bahrain


XII). Functions

In [0]:
%sql

show functions

-- show user functions
-- show system functions
-- show functions "s*" -> shows all functions starting with s

function
getargument


XIII). Setting configuration values

In [0]:
# Refer to page 203 for the list of configuration setting to be done based on the needs

# Chapter 11 - Datasets

In [0]:
# Datasets are another structured API apart from Spark SQL and dataframe
# It is useful when you need a bulletproof code and type-safety(trying to subtract strings) at the compile time rather than runtime
# also useful of your business logic is quite large and want to encapsulate in one specific function

## REFER datasets in the book page 205 - 212. 

# Production Applications

# chapter 15 - How to Run Spark on a cluster

In [0]:
Points to Note

* Structured APIs converts spark code to logical plan-->physical plan (RDD)---> executes on cluster of machines

ARCHITECTURE OF SPARK APPLICATION:

1. Spark Driver: Maintains all state of spark cluster. Interface with cluster manager to get physical resources and launch executors. Runs on a physical machine
2. Spark executors: runs the tasks assigned by spark driver and report back the state (success or failure) and results
3. Cluster manager: consists of driver and worker(executors) nodes which are tied to physical machines.
                    Currently there are 3 cluster managers (list will evolve over time)--> built-in stand alone cluster manager, Apache mesos, Hadoop YARN
    

5. Execution modes:
              a). Cluster mode: cluster manager places driver and executors on a cluster
              b). Client mode: driver remains on the client machine which is outside of the cluster. cluster manager maintains the executors within the cluster and client                       machine manages the driver
              c). Local Mode: Entire spark application runs on a single machine

In [0]:
LIFE CYCLE OF A SPARK APPLICATION (OUTSIDE SPARK)

1. Client Request: client process submit a pre compiled JAR or library to the cluster manager and request for resources for spark driver process. Cluster manager accepts this request and places driver on the nodes.  Client process then exits and application starts to run on the cluster
  
2. Launch: Spark driver runs the code on the clutser. For this , code should have SparkSession that initiliazes spark cluster (Driver+executors). Sparksession will subsequently communincate with cluster manager asking it to launch spark executors process across the cluster. No. of executors can be set by the user via spark-submit. Cluster manager then launches executors and sends relevant information about their locations to the driver process
  
3. Execution: Now since the cluster is setup, driver and workers communicates with each other execution the code. Driver schedules the tasks onto each worker and each worker responds with the results and status back to the driver (success or failure)
  
4. Completion: After the spark application is done, driver exits with success or failure. Cluster manager shuts the executors for that driver in the cluster. We can ask the cluster manager finally for the status of the spark application (success or failure)
  
  


In [0]:
LIFE CYCLE OF SPARK APPLICATION (INSIDE SPARK)

1. SparkSession: This is the first step in a spark application. It instantiates the spark and sql contexts.Now you have spark session so you can run your spark code and even access low-level (RDD) and legacy context (sparkcontext and sql context).
  
Note: If you like to access the spark context specifically you can instantiate it from the spark session and access low level API such as RDDs
  
2. Logical Instructions: Spark code consists of transformations and actions. Spark takes your code and converts it into physical plan ultimately. When you call an action on your dataframe , it triggers the execution of the spark jobs or jobs which consists of stages and tasks
  
3. Physical execution of the spark job consists of how actually the spark will execute the code on the cluster by building the physical plan

4.  Spark job: When your actions triggers in the code, spark job gets triggered. Spark job consists of stages and tasks

5. Stages: stages consists of group of tasks that can be executed to compute and do the actual work on a cluster. spark engine will try to fit as many transformations as possible inside one stage but new stage gets created for every shuffle (like  sorting, grouping) 
  
6. Tasks: it consists of block of data and set of transformation which will run on a single executor. Number of tasks depends on number of partition you have configured in the code

In [0]:
EXECUTION DETAILS:
  
1. Pipelining: Spark groups your transformation like 'select' then 'filter' and then 'select' into a stage of tasks. This is called pipelining. You can see it in the spark UI

2. Shuffle Persistence: When spark needs to move the data across nodes it performs cross-network shuffle. It first saves the shuffle files from the source tasks and then runs the stage that groups and fetches the corresponding records from each shuffle file and performs that computation   


In [0]:
df1 = spark.range(2, 10000000, 2)
df2 = spark.range(2, 10000000, 4)
step1 = df1.repartition(5)
step12 = df2.repartition(6)
step2 = step1.selectExpr("id * 5 as id")
step3 = step2.join(step12, ["id"])
step4 = step3.selectExpr("sum(id)")
step4.collect() # 2500000000000
step4.explain()


# Chapter 16 - Developing Spark Applications

In [0]:
WRITING THE SPARK APPLICAITONS

Basically you can write the spark applications (your code) in either Scala, pyspark or java 


In [0]:
TESTING SPARK APPLICATIONS

Data pipelines and spark applications should be resilent to future changes and keep in mind the following fundamentals when designing the test cases
  
1. Input data resilence
2. Business logic and evolution
3. Resilience in output and atomicity


Tactical approach to Testing Spark Applications:

In general, document the results of tests if input and output types of each function in your code. This would ensure that your code adheres to the above threee funcdamentals

