# <center>Learnings Of Spark Framework</center>


In [None]:
#Install Java JDK of version greater than 8
#Get the spark zip file from apache organiztion 
#Unzipping the file after getting it
#From that Install findspark 
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-3.0.1/spark-3.0.1-bin-hadoop2.7.tgz
!tar xf spark-3.0.1-bin-hadoop2.7.tgz
!pip install -q findspark

In [None]:
#Creating an environment for Java and spark to work with it
import os
import sys
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content//spark-3.0.1-bin-hadoop2.7"

In [None]:
#Initalizing the findspark 
import findspark
findspark.init()

#Creating an entry point for spark using the spark session
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()

In [None]:
#Creating a Dataframe using the Range Function
rdf = spark.range(50).toDF("Number")

In [None]:
#to view the dataframe and cheking the type of dataframe created
rdf.show(5)
type(rdf)

+------+
|Number|
+------+
|     0|
|     1|
|     2|
|     3|
|     4|
+------+
only showing top 5 rows



pyspark.sql.dataframe.DataFrame

In [None]:
#to see the number of partions/ splits made
rdf.rdd.getNumPartitions()

2

In [None]:
#Creating a Dataframe with multiple columns
teams = spark.createDataFrame([[1,'Amith','HR',560081],
                           [2,'Soorya','DataScientist',560085],
                           [3,'Vishwas','DataScientist',560082],
                           [4,'Manoj','DataScientist',560084],
                           [5,'Krishna','Faculty',560085],
                           [6,'Yashas','DataScientist',560081],
                           [7,'Harsha','DataScientist',560082],
                           [8,'Girish','HR',560083],
                           [9,'Chandan','Manager',560083],
                          [10,'Punith','Manager',560081]],['id','Name','Position','State'])

In [None]:
##cheking the type of dataframe created and to view the dataframe
print(type(teams))
teams.show()

<class 'pyspark.sql.dataframe.DataFrame'>
+---+-------+-------------+------+
| id|   Name|     Position| State|
+---+-------+-------------+------+
|  1|  Amith|           HR|560081|
|  2| Soorya|DataScientist|560085|
|  3|Vishwas|DataScientist|560082|
|  4|  Manoj|DataScientist|560084|
|  5|Krishna|      Faculty|560085|
|  6| Yashas|DataScientist|560081|
|  7| Harsha|DataScientist|560082|
|  8| Girish|           HR|560083|
|  9|Chandan|      Manager|560083|
| 10| Punith|      Manager|560081|
+---+-------+-------------+------+



In [None]:
#to view the schema and datatype of each columns
teams.printSchema()
teams.dtypes

root
 |-- id: long (nullable = true)
 |-- Name: string (nullable = true)
 |-- Position: string (nullable = true)
 |-- State: long (nullable = true)



[('id', 'bigint'),
 ('Name', 'string'),
 ('Position', 'string'),
 ('State', 'bigint')]

In [None]:
#number of rows in a dataframes
teams.count()

10

In [None]:
teams.rdd.getNumPartitions()

2

In [None]:
#Transformation -- filtering datframe based on some conditions 
#teams.where("State == 560081").show()
teams.filter("State == 560081").show()

+---+------+-------------+------+
| id|  Name|     Position| State|
+---+------+-------------+------+
|  1| Amith|           HR|560081|
|  6|Yashas|DataScientist|560081|
| 10|Punith|      Manager|560081|
+---+------+-------------+------+



In [None]:
#Creating a spark context to deal with RDD's
sc = spark.sparkContext

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
# Create an RDD from a source
rdd1 = sc.textFile("/content/drive/MyDrive/Datasets/toy_dataset.csv") 

In [None]:
#by defalut the csv file is read as text file and since we used spark context its read as rdd
rdd1.take(5)

['Number,City,Gender,Age,Income,Illness',
 '1,Dallas,Male,41,40367.0,No',
 '2,Dallas,Male,54,45084.0,No',
 '3,Dallas,Male,42,52483.0,No',
 '4,Dallas,Male,40,40941.0,No']

In [None]:
rdd1.getNumPartitions()

2

In [None]:
rdd1 = rdd1.map(lambda x: x.split(","))
rdd1.take(5)

[['Number', 'City', 'Gender', 'Age', 'Income', 'Illness'],
 ['1', 'Dallas', 'Male', '41', '40367.0', 'No'],
 ['2', 'Dallas', 'Male', '54', '45084.0', 'No'],
 ['3', 'Dallas', 'Male', '42', '52483.0', 'No'],
 ['4', 'Dallas', 'Male', '40', '40941.0', 'No']]

In [None]:
# Infer the schema, and register the DataFrame as a table.
tempDF = spark.createDataFrame(rdd1)
tempDF.show(3)

+------+------+------+---+-------+-------+
|    _1|    _2|    _3| _4|     _5|     _6|
+------+------+------+---+-------+-------+
|Number|  City|Gender|Age| Income|Illness|
|     1|Dallas|  Male| 41|40367.0|     No|
|     2|Dallas|  Male| 54|45084.0|     No|
+------+------+------+---+-------+-------+
only showing top 3 rows



the column names is not take by default, so the bootstarp way of assinging the column name is as below

In [None]:
header = rdd1.first()
rows = rdd1.filter(lambda line: line!=header)
rows.take(3)

[['1', 'Dallas', 'Male', '41', '40367.0', 'No'],
 ['2', 'Dallas', 'Male', '54', '45084.0', 'No'],
 ['3', 'Dallas', 'Male', '42', '52483.0', 'No']]

In [None]:
header

['Number', 'City', 'Gender', 'Age', 'Income', 'Illness']

In [None]:
from pyspark.sql import Row
rdd2 = rows.map(lambda line: Row(Number=line[0], City=line[1], Gender=line[2], Age=line[3], Income=line[4], Illness=line[5]))
rdd2.take(5)

[Row(Number='1', City='Dallas', Gender='Male', Age='41', Income='40367.0', Illness='No'),
 Row(Number='2', City='Dallas', Gender='Male', Age='54', Income='45084.0', Illness='No'),
 Row(Number='3', City='Dallas', Gender='Male', Age='42', Income='52483.0', Illness='No'),
 Row(Number='4', City='Dallas', Gender='Male', Age='40', Income='40941.0', Illness='No'),
 Row(Number='5', City='Dallas', Gender='Male', Age='46', Income='50289.0', Illness='No')]

In [None]:
rdd2.toDF().show(5)

+------+------+------+---+-------+-------+
|Number|  City|Gender|Age| Income|Illness|
+------+------+------+---+-------+-------+
|     1|Dallas|  Male| 41|40367.0|     No|
|     2|Dallas|  Male| 54|45084.0|     No|
|     3|Dallas|  Male| 42|52483.0|     No|
|     4|Dallas|  Male| 40|40941.0|     No|
|     5|Dallas|  Male| 46|50289.0|     No|
+------+------+------+---+-------+-------+
only showing top 5 rows



In [None]:
# Infer the schema, and register the DataFrame as a table.
tempDF = spark.createDataFrame(rdd2)
tempDF.show(5)

+------+------+------+---+-------+-------+
|Number|  City|Gender|Age| Income|Illness|
+------+------+------+---+-------+-------+
|     1|Dallas|  Male| 41|40367.0|     No|
|     2|Dallas|  Male| 54|45084.0|     No|
|     3|Dallas|  Male| 42|52483.0|     No|
|     4|Dallas|  Male| 40|40941.0|     No|
|     5|Dallas|  Male| 46|50289.0|     No|
+------+------+------+---+-------+-------+
only showing top 5 rows



In [None]:
tempDF.printSchema()

root
 |-- Number: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- Income: string (nullable = true)
 |-- Illness: string (nullable = true)



In [None]:
tempDF.count()

150000

In [None]:
tempDF.select(tempDF.Illness).distinct().show()

+-------+
|Illness|
+-------+
|     No|
|    Yes|
+-------+



In [None]:
tempDF.where("Age > 60").count()

16948

In [None]:
#Sampling 20% from dataframe
dfsample = tempDF.sample(False, 0.2, 1234)
dfsample.show(5)

+------+------+------+---+-------+-------+
|Number|  City|Gender|Age| Income|Illness|
+------+------+------+---+-------+-------+
|    15|Dallas|Female| 61|38429.0|     No|
|    17|Dallas|  Male| 27|50398.0|     No|
|    31|Dallas|Female| 44|40353.0|     No|
|    36|Dallas|  Male| 41|50312.0|     No|
|    38|Dallas|Female| 41|29538.0|     No|
+------+------+------+---+-------+-------+
only showing top 5 rows



In [None]:
dfsamplePD = dfsample.toPandas()
type(dfsamplePD)

pandas.core.frame.DataFrame

Reading a CSV file into a DataFrame 

In [None]:
sdf = spark.read.format("csv")\
      .option("header","true")\
      .option("inferSchema","true")\
      .load("/content/drive/MyDrive/Datasets/toy_dataset.csv")

In [None]:
sdf.show(5)

+------+------+------+---+-------+-------+
|Number|  City|Gender|Age| Income|Illness|
+------+------+------+---+-------+-------+
|     1|Dallas|  Male| 41|40367.0|     No|
|     2|Dallas|  Male| 54|45084.0|     No|
|     3|Dallas|  Male| 42|52483.0|     No|
|     4|Dallas|  Male| 40|40941.0|     No|
|     5|Dallas|  Male| 46|50289.0|     No|
+------+------+------+---+-------+-------+
only showing top 5 rows



In [None]:
# rows count
sdf.count()

150000

In [None]:
sdf.printSchema()

root
 |-- Number: integer (nullable = true)
 |-- City: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Income: double (nullable = true)
 |-- Illness: string (nullable = true)



In [None]:
sdf.take(2)

[Row(Number=1, City='Dallas', Gender='Male', Age=41, Income=40367.0, Illness='No'),
 Row(Number=2, City='Dallas', Gender='Male', Age=54, Income=45084.0, Illness='No')]

In [None]:
sdf.where("Age > 40").count()

91745

In [None]:
sdf.filter("Illness = 'Yes'").count()

12139

In [None]:
# number of columns 
print(len(sdf.columns))
sdf.columns

6


['Number', 'City', 'Gender', 'Age', 'Income', 'Illness']

In [None]:
sdf.describe().show()

+-------+------------------+---------------+------+------------------+------------------+-------+
|summary|            Number|           City|Gender|               Age|            Income|Illness|
+-------+------------------+---------------+------+------------------+------------------+-------+
|  count|            150000|         150000|150000|            150000|            150000| 150000|
|   mean|           75000.5|           null|  null|           44.9502| 91252.79827333333|   null|
| stddev|43301.414526548666|           null|  null|11.572485735982012|24989.500948354525|   null|
|    min|                 1|         Austin|Female|                25|            -654.0|     No|
|    max|            150000|Washington D.C.|  Male|                65|          177157.0|    Yes|
+-------+------------------+---------------+------+------------------+------------------+-------+



In [None]:
sdf.describe(["Income","Illness"]).show()

+-------+------------------+-------+
|summary|            Income|Illness|
+-------+------------------+-------+
|  count|            150000| 150000|
|   mean| 91252.79827333333|   null|
| stddev|24989.500948354525|   null|
|    min|            -654.0|     No|
|    max|          177157.0|    Yes|
+-------+------------------+-------+



# Spark SQL 

In [None]:
sdf.createOrReplaceTempView("sdfsql")

In [None]:
spark.sql("select count(*) from sdfsql").show()

+--------+
|count(1)|
+--------+
|  150000|
+--------+



In [None]:
spark.sql("select * from sdfsql limit 5").show()

+------+------+------+---+-------+-------+
|Number|  City|Gender|Age| Income|Illness|
+------+------+------+---+-------+-------+
|     1|Dallas|  Male| 41|40367.0|     No|
|     2|Dallas|  Male| 54|45084.0|     No|
|     3|Dallas|  Male| 42|52483.0|     No|
|     4|Dallas|  Male| 40|40941.0|     No|
|     5|Dallas|  Male| 46|50289.0|     No|
+------+------+------+---+-------+-------+



In [None]:
spark.sql("select * from sdfsql where Illness == 'Yes'").count()

12139

In [None]:
#column referncing
from pyspark.sql.functions import expr, col, column
sdf.select(col("City")).distinct().show()

+---------------+
|           City|
+---------------+
|         Dallas|
|    Los Angeles|
|      San Diego|
|         Austin|
|  New York City|
|Washington D.C.|
|  Mountain View|
|         Boston|
+---------------+



In [None]:
sdf.filter(col("City") == "Dallas").count()

19707

In [None]:
# spark.sql("select * from sdfsql where City =='Dallas'").count()
spark.sql("select count(*) from sdfsql where City =='Dallas'").show()

+--------+
|count(1)|
+--------+
|   19707|
+--------+



In [None]:
sdf.select(expr("City AS Location"),"Gender",expr("Illness AS Target")).show(5)

+--------+------+------+
|Location|Gender|Target|
+--------+------+------+
|  Dallas|  Male|    No|
|  Dallas|  Male|    No|
|  Dallas|  Male|    No|
|  Dallas|  Male|    No|
|  Dallas|  Male|    No|
+--------+------+------+
only showing top 5 rows



In [None]:
spark.sql("select City AS Location, Gender, Illness as Target from sdfsql limit 5").show()

+--------+------+------+
|Location|Gender|Target|
+--------+------+------+
|  Dallas|  Male|    No|
|  Dallas|  Male|    No|
|  Dallas|  Male|    No|
|  Dallas|  Male|    No|
|  Dallas|  Male|    No|
+--------+------+------+



In [None]:
from pyspark.sql.functions import lit
sdftemp = sdf.select("*",lit(10000).alias("Constant"))

In [None]:
sdftemp.show(5)

+------+------+------+---+-------+-------+--------+
|Number|  City|Gender|Age| Income|Illness|Constant|
+------+------+------+---+-------+-------+--------+
|     1|Dallas|  Male| 41|40367.0|     No|   10000|
|     2|Dallas|  Male| 54|45084.0|     No|   10000|
|     3|Dallas|  Male| 42|52483.0|     No|   10000|
|     4|Dallas|  Male| 40|40941.0|     No|   10000|
|     5|Dallas|  Male| 46|50289.0|     No|   10000|
+------+------+------+---+-------+-------+--------+
only showing top 5 rows



In [None]:
sdftemp.printSchema()

root
 |-- Number: integer (nullable = true)
 |-- City: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Income: double (nullable = true)
 |-- Illness: string (nullable = true)
 |-- Constant: integer (nullable = false)



In [None]:
spark.sql("select 10000 as Constant,* from sdfsql limit 5").show()

+--------+------+------+------+---+-------+-------+
|Constant|Number|  City|Gender|Age| Income|Illness|
+--------+------+------+------+---+-------+-------+
|   10000|     1|Dallas|  Male| 41|40367.0|     No|
|   10000|     2|Dallas|  Male| 54|45084.0|     No|
|   10000|     3|Dallas|  Male| 42|52483.0|     No|
|   10000|     4|Dallas|  Male| 40|40941.0|     No|
|   10000|     5|Dallas|  Male| 46|50289.0|     No|
+--------+------+------+------+---+-------+-------+



In [None]:
sdftemp.dtypes

[('Number', 'int'),
 ('City', 'string'),
 ('Gender', 'string'),
 ('Age', 'int'),
 ('Income', 'double'),
 ('Illness', 'string'),
 ('Constant', 'int')]

# Adding column in a formal way

In [None]:
sdf.withColumn("Constant", lit(10000)).show(5)

+------+------+------+---+-------+-------+--------+
|Number|  City|Gender|Age| Income|Illness|Constant|
+------+------+------+---+-------+-------+--------+
|     1|Dallas|  Male| 41|40367.0|     No|   10000|
|     2|Dallas|  Male| 54|45084.0|     No|   10000|
|     3|Dallas|  Male| 42|52483.0|     No|   10000|
|     4|Dallas|  Male| 40|40941.0|     No|   10000|
|     5|Dallas|  Male| 46|50289.0|     No|   10000|
+------+------+------+---+-------+-------+--------+
only showing top 5 rows



In [None]:
#updating the dataframe with new column
sdf = sdf.withColumn("Constant", lit(10000))

In [None]:
sdf.show(5)

+------+------+------+---+-------+-------+--------+
|Number|  City|Gender|Age| Income|Illness|Constant|
+------+------+------+---+-------+-------+--------+
|     1|Dallas|  Male| 41|40367.0|     No|   10000|
|     2|Dallas|  Male| 54|45084.0|     No|   10000|
|     3|Dallas|  Male| 42|52483.0|     No|   10000|
|     4|Dallas|  Male| 40|40941.0|     No|   10000|
|     5|Dallas|  Male| 46|50289.0|     No|   10000|
+------+------+------+---+-------+-------+--------+
only showing top 5 rows



In [None]:
sdf.printSchema()

root
 |-- Number: integer (nullable = true)
 |-- City: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Income: double (nullable = true)
 |-- Illness: string (nullable = true)
 |-- Constant: integer (nullable = false)



In [None]:
sdf.select("City").distinct().show()

+---------------+
|           City|
+---------------+
|         Dallas|
|    Los Angeles|
|      San Diego|
|         Austin|
|  New York City|
|Washington D.C.|
|  Mountain View|
|         Boston|
+---------------+



In [None]:
spark.sql("select distinct City from sdfsql").show()

+---------------+
|           City|
+---------------+
|         Dallas|
|    Los Angeles|
|      San Diego|
|         Austin|
|  New York City|
|Washington D.C.|
|  Mountain View|
|         Boston|
+---------------+



In [None]:
#creating new column as Income_extra, by adding two different columns Income, Constant 
sdf.withColumn("Income_extra", col("Income")+col("Constant")).show(10)

+------+------+------+---+-------+-------+--------+------------+
|Number|  City|Gender|Age| Income|Illness|Constant|Income_extra|
+------+------+------+---+-------+-------+--------+------------+
|     1|Dallas|  Male| 41|40367.0|     No|   10000|     50367.0|
|     2|Dallas|  Male| 54|45084.0|     No|   10000|     55084.0|
|     3|Dallas|  Male| 42|52483.0|     No|   10000|     62483.0|
|     4|Dallas|  Male| 40|40941.0|     No|   10000|     50941.0|
|     5|Dallas|  Male| 46|50289.0|     No|   10000|     60289.0|
|     6|Dallas|Female| 36|50786.0|     No|   10000|     60786.0|
|     7|Dallas|Female| 32|33155.0|     No|   10000|     43155.0|
|     8|Dallas|  Male| 39|30914.0|     No|   10000|     40914.0|
|     9|Dallas|  Male| 51|68667.0|     No|   10000|     78667.0|
|    10|Dallas|Female| 30|50082.0|     No|   10000|     60082.0|
+------+------+------+---+-------+-------+--------+------------+
only showing top 10 rows



In [None]:
#making this changes to original Income column 
sdf = sdf.withColumn("Income", col("Income")+col("Constant"))
sdf.show(10)

+------+------+------+---+-------+-------+--------+
|Number|  City|Gender|Age| Income|Illness|Constant|
+------+------+------+---+-------+-------+--------+
|     1|Dallas|  Male| 41|50367.0|     No|   10000|
|     2|Dallas|  Male| 54|55084.0|     No|   10000|
|     3|Dallas|  Male| 42|62483.0|     No|   10000|
|     4|Dallas|  Male| 40|50941.0|     No|   10000|
|     5|Dallas|  Male| 46|60289.0|     No|   10000|
|     6|Dallas|Female| 36|60786.0|     No|   10000|
|     7|Dallas|Female| 32|43155.0|     No|   10000|
|     8|Dallas|  Male| 39|40914.0|     No|   10000|
|     9|Dallas|  Male| 51|78667.0|     No|   10000|
|    10|Dallas|Female| 30|60082.0|     No|   10000|
+------+------+------+---+-------+-------+--------+
only showing top 10 rows



# Renaming Column

In [None]:
sdf.withColumnRenamed("City","Location").show(5)

+------+--------+------+---+-------+-------+--------+
|Number|Location|Gender|Age| Income|Illness|Constant|
+------+--------+------+---+-------+-------+--------+
|     1|  Dallas|  Male| 41|50367.0|     No|   10000|
|     2|  Dallas|  Male| 54|55084.0|     No|   10000|
|     3|  Dallas|  Male| 42|62483.0|     No|   10000|
|     4|  Dallas|  Male| 40|50941.0|     No|   10000|
|     5|  Dallas|  Male| 46|60289.0|     No|   10000|
+------+--------+------+---+-------+-------+--------+
only showing top 5 rows



# Removing Columns

In [None]:
sdf = sdf.drop("Constant")

In [None]:
sdf.show(5)

+------+------+------+---+-------+-------+
|Number|  City|Gender|Age| Income|Illness|
+------+------+------+---+-------+-------+
|     1|Dallas|  Male| 41|50367.0|     No|
|     2|Dallas|  Male| 54|55084.0|     No|
|     3|Dallas|  Male| 42|62483.0|     No|
|     4|Dallas|  Male| 40|50941.0|     No|
|     5|Dallas|  Male| 46|60289.0|     No|
+------+------+------+---+-------+-------+
only showing top 5 rows



# Changing a Column’s Type (cast)

In [None]:
sdf = sdf.withColumn("sample",lit('2022'))

In [None]:
sdf.printSchema()

root
 |-- Number: integer (nullable = true)
 |-- City: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Income: double (nullable = true)
 |-- Illness: string (nullable = true)
 |-- sample: string (nullable = false)



#### Intentionally i have made the sample column as string type by giving value as 2022. Now lets convert that into Integer type.

In [None]:
sdf.withColumn("sample", col('sample').cast("integer")).printSchema()

root
 |-- Number: integer (nullable = true)
 |-- City: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Income: double (nullable = true)
 |-- Illness: string (nullable = true)
 |-- sample: integer (nullable = true)



# Differences in two columns

In [None]:
sampledf = sdf.sample(False,0.2,1234)

In [None]:
sampledf.show(5)

+------+------+------+---+-------+-------+------+
|Number|  City|Gender|Age| Income|Illness|sample|
+------+------+------+---+-------+-------+------+
|    15|Dallas|Female| 61|48429.0|     No|  2022|
|    17|Dallas|  Male| 27|60398.0|     No|  2022|
|    31|Dallas|Female| 44|50353.0|     No|  2022|
|    36|Dallas|  Male| 41|60312.0|     No|  2022|
|    38|Dallas|Female| 41|39538.0|     No|  2022|
+------+------+------+---+-------+-------+------+
only showing top 5 rows



In [None]:
sampledf.count()

30379

In [None]:
sdf.select("Number").subtract(sampledf.select("Number")).count()

119621

In [None]:
sdf.select("Number").subtract(sampledf.select("Number")).show(5)

+------+
|Number|
+------+
|   148|
|   463|
|   471|
|   496|
|   833|
+------+
only showing top 5 rows



# Pair wise Frequencies - Crosstab

In [None]:
sdf = sdf.drop("sample")

In [None]:
sdf.show(5)

+------+------+------+---+-------+-------+
|Number|  City|Gender|Age| Income|Illness|
+------+------+------+---+-------+-------+
|     1|Dallas|  Male| 41|50367.0|     No|
|     2|Dallas|  Male| 54|55084.0|     No|
|     3|Dallas|  Male| 42|62483.0|     No|
|     4|Dallas|  Male| 40|50941.0|     No|
|     5|Dallas|  Male| 46|60289.0|     No|
+------+------+------+---+-------+-------+
only showing top 5 rows



In [None]:
spark.sql("select City,count(*) as cityCount from sdfsql group by City").show()

+---------------+---------+
|           City|cityCount|
+---------------+---------+
|         Dallas|    19707|
|    Los Angeles|    32173|
|      San Diego|     4881|
|         Austin|    12292|
|  New York City|    50307|
|Washington D.C.|     8120|
|  Mountain View|    14219|
|         Boston|     8301|
+---------------+---------+



In [None]:
sdf.crosstab("City","Gender").show()

+---------------+------+-----+
|    City_Gender|Female| Male|
+---------------+------+-----+
|         Boston|  3715| 4586|
|  Mountain View|  6276| 7943|
|      San Diego|  2098| 2783|
|    Los Angeles| 14285|17888|
|         Austin|  5385| 6907|
|  New York City| 22172|28135|
|         Dallas|  8705|11002|
|Washington D.C.|  3564| 4556|
+---------------+------+-----+



In [None]:
sdf.crosstab("City","Illness").show()

+---------------+-----+----+
|   City_Illness|   No| Yes|
+---------------+-----+----+
|         Boston| 7615| 686|
|  Mountain View|13041|1178|
|      San Diego| 4487| 394|
|    Los Angeles|29605|2568|
|         Austin|11281|1011|
|  New York City|46286|4021|
|         Dallas|18094|1613|
|Washington D.C.| 7452| 668|
+---------------+-----+----+



In [None]:
# sdf.groupBy("City","Illness").show() -- error
sdf.groupBy("City","Illness").count().show()

+---------------+-------+-----+
|           City|Illness|count|
+---------------+-------+-----+
|      San Diego|     No| 4487|
|Washington D.C.|     No| 7452|
|         Boston|    Yes|  686|
|    Los Angeles|     No|29605|
|  Mountain View|     No|13041|
|         Dallas|     No|18094|
|    Los Angeles|    Yes| 2568|
|         Austin|    Yes| 1011|
|Washington D.C.|    Yes|  668|
|         Austin|     No|11281|
|  New York City|     No|46286|
|         Dallas|    Yes| 1613|
|  New York City|    Yes| 4021|
|         Boston|     No| 7615|
|  Mountain View|    Yes| 1178|
|      San Diego|    Yes|  394|
+---------------+-------+-----+



# Removing Duplicates

In [None]:
# gives all unique rows based on columns specified
sdf.select("City","Illness").dropDuplicates().show()

+---------------+-------+
|           City|Illness|
+---------------+-------+
|      San Diego|     No|
|Washington D.C.|     No|
|         Boston|    Yes|
|    Los Angeles|     No|
|  Mountain View|     No|
|         Dallas|     No|
|    Los Angeles|    Yes|
|         Austin|    Yes|
|Washington D.C.|    Yes|
|         Austin|     No|
|  New York City|     No|
|         Dallas|    Yes|
|  New York City|    Yes|
|         Boston|     No|
|  Mountain View|    Yes|
|      San Diego|    Yes|
+---------------+-------+



# Working with Nulls in Data

In [None]:
print(sdf.dropna().count())
print(sdf.na.drop().count())

150000
150000


In [None]:
sdf.fillna(-1)

DataFrame[Number: int, City: string, Gender: string, Age: int, Income: double, Illness: string]

In [None]:
## Filling with different values for different columns
fill_cols_vals = {
"Gender": 'Male',
"City" : "Los Angeles",
"Age": 21
}
sdf.na.fill(fill_cols_vals).count()

150000

# Filtering the rows

In [None]:
sdf.filter(col("City")=="Los Angeles").count()

32173

In [None]:
sdf.filter(col('Income')>50000).count()

143708

In [None]:
spark.sql("select count(*) from sdfsql where Income>50000").show()
#Value is change as i have added a 10000 extra for Income column before
spark.sql("select count(*) from sdfsql where Income>40000").show()

+--------+
|count(1)|
+--------+
|  136738|
+--------+

+--------+
|count(1)|
+--------+
|  143708|
+--------+



In [None]:
sdf.where("City == 'Boston'").where("Illness == 'No'").count()

7615

In [None]:
sdf.filter("City == 'Boston'").filter("Illness == 'No'").count()

7615

In [None]:
sdf.filter((col("City") == 'Boston') & (col("Illness") == 'No')).count()

7615

In [None]:
sdf.where((col("City")== 'Dallas') & (col('Illness')=='No')).count()

18094

# Aggregations

## Count Distinct

In [None]:
from pyspark.sql.functions import countDistinct
sdf.select(countDistinct("City")).show()

+--------------------+
|count(DISTINCT City)|
+--------------------+
|                   8|
+--------------------+



## Approximate Count Distinct

In [None]:
from pyspark.sql.functions import approx_count_distinct
sdf.select(approx_count_distinct("City", 0.1)).show()

+---------------------------+
|approx_count_distinct(City)|
+---------------------------+
|                          8|
+---------------------------+



## First and Last

In [None]:
from pyspark.sql.functions import first, last
sdf.select(first("City"), last("City")).show()

+-----------+----------+
|first(City)|last(City)|
+-----------+----------+
|     Dallas|    Austin|
+-----------+----------+



## Min and Max

In [None]:
from pyspark.sql.functions import min, max
sdf.select(min("Income"), max("Income")).show()

+-----------+-----------+
|min(Income)|max(Income)|
+-----------+-----------+
|     9346.0|   187157.0|
+-----------+-----------+



## Sum and Average

In [None]:
from pyspark.sql.functions import sum, avg
sdf.select(sum('Income')).show()

+---------------+
|    sum(Income)|
+---------------+
|1.5187919741E10|
+---------------+



In [None]:
sdf.select(expr("avg(Income) AS Average_Income")).show()

+------------------+
|    Average_Income|
+------------------+
|101252.79827333333|
+------------------+



## Grouping with Expressions

In [None]:
sdf.groupBy("City").agg(
  avg("Income").alias("Average income /city"),
  expr("count(Income)")).show(truncate = False)

+---------------+--------------------+-------------+
|City           |Average income /city|count(Income)|
+---------------+--------------------+-------------+
|Dallas         |55252.23118688791   |19707        |
|Los Angeles    |105264.15540981568  |32173        |
|San Diego      |110756.20917844704  |4881         |
|Austin         |100277.51342336478  |12292        |
|New York City  |106857.13139324548  |50307        |
|Washington D.C.|80991.61280788177   |8120         |
|Mountain View  |145078.41578170055  |14219        |
|Boston         |101554.57149740995  |8301         |
+---------------+--------------------+-------------+



In [None]:
sdf.groupBy("City").agg(expr("avg(Income)"),expr("std(Income)")).show()

+---------------+------------------+------------------+
|           City|       avg(Income)|       std(Income)|
+---------------+------------------+------------------+
|         Dallas| 55252.23118688791|11263.867908523564|
|    Los Angeles|105264.15540981568|11187.234579914359|
|      San Diego|110756.20917844704|11149.283188322346|
|         Austin|100277.51342336478| 10996.63609028483|
|  New York City|106857.13139324548|11210.580111095656|
|Washington D.C.| 80991.61280788177|11079.854228060589|
|  Mountain View|145078.41578170055| 11227.30230415256|
|         Boston|101554.57149740995|11216.142107086707|
+---------------+------------------+------------------+



In [None]:
sdf.groupBy("Gender").agg(expr("min(Income)"), expr("max(Income)")).show()

+------+-----------+-----------+
|Gender|min(Income)|max(Income)|
+------+-----------+-----------+
|Female|     9346.0|   178440.0|
|  Male|    22596.0|   187157.0|
+------+-----------+-----------+



In [None]:
## Apply sum, min, max, count with groupby to get different summary insight for each group. 
exprs = {x: "sum" for x in sdf.columns}
sdf.groupBy("City").agg(exprs).show()

+---------------+-----------+--------+------------+---------+-----------+-------------+
|           City|sum(Number)|sum(Age)|sum(Illness)|sum(City)|sum(Gender)|  sum(Income)|
+---------------+-----------+--------+------------+---------+-----------+-------------+
|         Dallas|  194192778|  887973|        null|     null|       null| 1.08885572E9|
|    Los Angeles| 2770127473| 1447595|        null|     null|       null|3.386663672E9|
|      San Diego|  660243108|  219475|        null|     null|       null| 5.40601057E8|
|         Austin| 1768259514|  548853|        null|     null|       null|1.232611195E9|
|  New York City| 2256822327| 2259897|        null|     null|       null|5.375661709E9|
|Washington D.C.| 1045592100|  363798|        null|     null|       null| 6.57651896E8|
|  Mountain View| 1554094043|  639752|        null|     null|       null|2.062869994E9|
|         Boston| 1000743657|  375187|        null|     null|       null| 8.43004498E8|
+---------------+-----------+---

# Joins

In [None]:
# Creating a sample dataframe to perform join operation on them
cstBasic = spark.createDataFrame([
    (112,'Ramesh','heblikar','Male',500,[1,2,3,5]),
    (121,'Raghu','singh','Male',650,[2,4,7]),
    (125,'Anjali','rao','Male',1200,[8,9,2,1,5,4,7]),
    (171,'Raksha','verma','Male',500,[2,3,9,10]),
    (82,'Ganesh','singam','Male',750,[1,5,9,7]),
    (97,'Vishwas','DK','Male',750,[2,10,9]),
    (30,'Manoj','S','Male',750,[2,4,7,9]),
    (88,'Chandrika','Mohan','Male',750,[2,4,7,8,10,9]),
    (56,'Soorya','Ramakrishna','Male',1000,[2,4,7,1,3,4,5,10,9])])\
    .toDF("CustomerID",'Fname','Lname','Gender','AvgMonthlySpending','Products_Id')

products = spark.createDataFrame([
    (1,'Shampoo',10),
    (2,'T-shirt',45),
    (3,'Ginger',7),
    (4,'CoolDrinks',5),
    (5,'Tamato',10),
    (6,'Onion',30),
    (7,'Jeans',60),
    (8,'Cream',100),
    (9,'Trimer',150),
    (10,'Phone',500)])\
    .toDF("ProductID",'ProductName','Price')

cstInfo = spark.createDataFrame([
    (112,'2days',450),
    (121,'week ago',200),
    (125,'12days',560),
    (97,'month ago',560),
    (30,'3days',560),
    (1,'3days',560),
    (3,'3days',560),
    (56,'1days',369)])\
    .toDF("CustomerID","LastPurchaseOn","TotalBill")

In [None]:
cstBasic.show()
cstBasic.count()

+----------+---------+-----------+------+------------------+--------------------+
|CustomerID|    Fname|      Lname|Gender|AvgMonthlySpending|         Products_Id|
+----------+---------+-----------+------+------------------+--------------------+
|       112|   Ramesh|   heblikar|  Male|               500|        [1, 2, 3, 5]|
|       121|    Raghu|      singh|  Male|               650|           [2, 4, 7]|
|       125|   Anjali|        rao|  Male|              1200|[8, 9, 2, 1, 5, 4...|
|       171|   Raksha|      verma|  Male|               500|       [2, 3, 9, 10]|
|        82|   Ganesh|     singam|  Male|               750|        [1, 5, 9, 7]|
|        97|  Vishwas|         DK|  Male|               750|          [2, 10, 9]|
|        30|    Manoj|          S|  Male|               750|        [2, 4, 7, 9]|
|        88|Chandrika|      Mohan|  Male|               750| [2, 4, 7, 8, 10, 9]|
|        56|   Soorya|Ramakrishna|  Male|              1000|[2, 4, 7, 1, 3, 4...|
+----------+----

9

In [None]:
products.show()
products.count()

+---------+-----------+-----+
|ProductID|ProductName|Price|
+---------+-----------+-----+
|        1|    Shampoo|   10|
|        2|    T-shirt|   45|
|        3|     Ginger|    7|
|        4| CoolDrinks|    5|
|        5|     Tamato|   10|
|        6|      Onion|   30|
|        7|      Jeans|   60|
|        8|      Cream|  100|
|        9|     Trimer|  150|
|       10|      Phone|  500|
+---------+-----------+-----+



10

In [None]:
cstInfo.show()
cstInfo.count()

+----------+--------------+---------+
|CustomerID|LastPurchaseOn|TotalBill|
+----------+--------------+---------+
|       112|         2days|      450|
|       121|      week ago|      200|
|       125|        12days|      560|
|        97|     month ago|      560|
|        30|         3days|      560|
|         1|         3days|      560|
|         3|         3days|      560|
|        56|         1days|      369|
+----------+--------------+---------+



8

In [None]:
#Creating temperory tables to perform sql queries 
cstBasic.createOrReplaceTempView("cstbasicsql")
products.createOrReplaceTempView("productssql")
cstInfo.createOrReplaceTempView("cstinfosql")

### Performing joins on tables created

In [None]:
cstBasic.join(cstInfo,cstBasic['CustomerID']==cstInfo['CustomerID']).show()

+----------+-------+-----------+------+------------------+--------------------+----------+--------------+---------+
|CustomerID|  Fname|      Lname|Gender|AvgMonthlySpending|         Products_Id|CustomerID|LastPurchaseOn|TotalBill|
+----------+-------+-----------+------+------------------+--------------------+----------+--------------+---------+
|       112| Ramesh|   heblikar|  Male|               500|        [1, 2, 3, 5]|       112|         2days|      450|
|        56| Soorya|Ramakrishna|  Male|              1000|[2, 4, 7, 1, 3, 4...|        56|         1days|      369|
|       125| Anjali|        rao|  Male|              1200|[8, 9, 2, 1, 5, 4...|       125|        12days|      560|
|        97|Vishwas|         DK|  Male|               750|          [2, 10, 9]|        97|     month ago|      560|
|        30|  Manoj|          S|  Male|               750|        [2, 4, 7, 9]|        30|         3days|      560|
|       121|  Raghu|      singh|  Male|               650|           [2,

In [None]:
spark.sql("select * from cstbasicsql join cstinfosql on cstbasicsql.CustomerID == cstinfosql.CustomerID").show()

+----------+-------+-----------+------+------------------+--------------------+----------+--------------+---------+
|CustomerID|  Fname|      Lname|Gender|AvgMonthlySpending|         Products_Id|CustomerID|LastPurchaseOn|TotalBill|
+----------+-------+-----------+------+------------------+--------------------+----------+--------------+---------+
|       112| Ramesh|   heblikar|  Male|               500|        [1, 2, 3, 5]|       112|         2days|      450|
|        56| Soorya|Ramakrishna|  Male|              1000|[2, 4, 7, 1, 3, 4...|        56|         1days|      369|
|       125| Anjali|        rao|  Male|              1200|[8, 9, 2, 1, 5, 4...|       125|        12days|      560|
|        97|Vishwas|         DK|  Male|               750|          [2, 10, 9]|        97|     month ago|      560|
|        30|  Manoj|          S|  Male|               750|        [2, 4, 7, 9]|        30|         3days|      560|
|       121|  Raghu|      singh|  Male|               650|           [2,

In [None]:
cstBasic.join(cstInfo, cstBasic['CustomerID'] == cstInfo['CustomerID'], "inner").show()

+----------+-------+-----------+------+------------------+--------------------+----------+--------------+---------+
|CustomerID|  Fname|      Lname|Gender|AvgMonthlySpending|         Products_Id|CustomerID|LastPurchaseOn|TotalBill|
+----------+-------+-----------+------+------------------+--------------------+----------+--------------+---------+
|       112| Ramesh|   heblikar|  Male|               500|        [1, 2, 3, 5]|       112|         2days|      450|
|        56| Soorya|Ramakrishna|  Male|              1000|[2, 4, 7, 1, 3, 4...|        56|         1days|      369|
|       125| Anjali|        rao|  Male|              1200|[8, 9, 2, 1, 5, 4...|       125|        12days|      560|
|        97|Vishwas|         DK|  Male|               750|          [2, 10, 9]|        97|     month ago|      560|
|        30|  Manoj|          S|  Male|               750|        [2, 4, 7, 9]|        30|         3days|      560|
|       121|  Raghu|      singh|  Male|               650|           [2,

In [None]:
#to drop column while joining itself
cstBasic.join(cstInfo, on=cstBasic['CustomerID'] == cstInfo['CustomerID']).drop(cstInfo['CustomerID']).show()

+----------+-------+-----------+------+------------------+--------------------+--------------+---------+
|CustomerID|  Fname|      Lname|Gender|AvgMonthlySpending|         Products_Id|LastPurchaseOn|TotalBill|
+----------+-------+-----------+------+------------------+--------------------+--------------+---------+
|       112| Ramesh|   heblikar|  Male|               500|        [1, 2, 3, 5]|         2days|      450|
|        56| Soorya|Ramakrishna|  Male|              1000|[2, 4, 7, 1, 3, 4...|         1days|      369|
|       125| Anjali|        rao|  Male|              1200|[8, 9, 2, 1, 5, 4...|        12days|      560|
|        97|Vishwas|         DK|  Male|               750|          [2, 10, 9]|     month ago|      560|
|        30|  Manoj|          S|  Male|               750|        [2, 4, 7, 9]|         3days|      560|
|       121|  Raghu|      singh|  Male|               650|           [2, 4, 7]|      week ago|      200|
+----------+-------+-----------+------+----------------

In [None]:
#outer Joins
cstBasic.join(cstInfo,(cstBasic['CustomerID'] == cstInfo['CustomerID']), "outer").drop(cstInfo['CustomerID']).show(truncate=False)

+----------+---------+-----------+------+------------------+----------------------------+--------------+---------+
|CustomerID|Fname    |Lname      |Gender|AvgMonthlySpending|Products_Id                 |LastPurchaseOn|TotalBill|
+----------+---------+-----------+------+------------------+----------------------------+--------------+---------+
|112       |Ramesh   |heblikar   |Male  |500               |[1, 2, 3, 5]                |2days         |450      |
|56        |Soorya   |Ramakrishna|Male  |1000              |[2, 4, 7, 1, 3, 4, 5, 10, 9]|1days         |369      |
|88        |Chandrika|Mohan      |Male  |750               |[2, 4, 7, 8, 10, 9]         |null          |null     |
|null      |null     |null       |null  |null              |null                        |3days         |560      |
|null      |null     |null       |null  |null              |null                        |3days         |560      |
|171       |Raksha   |verma      |Male  |500               |[2, 3, 9, 10]       

In [None]:
spark.sql("select * from cstbasicsql full outer join cstinfosql on cstbasicsql.CustomerID == cstinfosql.CustomerID").show()
spark.sql("select * from cstbasicsql full outer join cstinfosql on cstbasicsql.CustomerID == cstinfosql.CustomerID").count()

+----------+---------+-----------+------+------------------+--------------------+----------+--------------+---------+
|CustomerID|    Fname|      Lname|Gender|AvgMonthlySpending|         Products_Id|CustomerID|LastPurchaseOn|TotalBill|
+----------+---------+-----------+------+------------------+--------------------+----------+--------------+---------+
|       112|   Ramesh|   heblikar|  Male|               500|        [1, 2, 3, 5]|       112|         2days|      450|
|        56|   Soorya|Ramakrishna|  Male|              1000|[2, 4, 7, 1, 3, 4...|        56|         1days|      369|
|        88|Chandrika|      Mohan|  Male|               750| [2, 4, 7, 8, 10, 9]|      null|          null|     null|
|      null|     null|       null|  null|              null|                null|         1|         3days|      560|
|      null|     null|       null|  null|              null|                null|         3|         3days|      560|
|       171|   Raksha|      verma|  Male|               

11

In [None]:
cstBasic.join(cstInfo,(cstBasic['CustomerID'] == cstInfo['CustomerID']), "left_outer").drop(cstInfo['CustomerID']).show(truncate=False)
cstBasic.join(cstInfo,(cstBasic['CustomerID'] == cstInfo['CustomerID']), "left_outer").drop(cstInfo['CustomerID']).count()

+----------+---------+-----------+------+------------------+----------------------------+--------------+---------+
|CustomerID|Fname    |Lname      |Gender|AvgMonthlySpending|Products_Id                 |LastPurchaseOn|TotalBill|
+----------+---------+-----------+------+------------------+----------------------------+--------------+---------+
|112       |Ramesh   |heblikar   |Male  |500               |[1, 2, 3, 5]                |2days         |450      |
|56        |Soorya   |Ramakrishna|Male  |1000              |[2, 4, 7, 1, 3, 4, 5, 10, 9]|1days         |369      |
|88        |Chandrika|Mohan      |Male  |750               |[2, 4, 7, 8, 10, 9]         |null          |null     |
|171       |Raksha   |verma      |Male  |500               |[2, 3, 9, 10]               |null          |null     |
|125       |Anjali   |rao        |Male  |1200              |[8, 9, 2, 1, 5, 4, 7]       |12days        |560      |
|82        |Ganesh   |singam     |Male  |750               |[1, 5, 9, 7]        

9

In [None]:
cstBasic.join(cstInfo,(cstBasic['CustomerID'] == cstInfo['CustomerID']), "right_outer").drop(cstInfo['CustomerID']).show(truncate=False)
cstBasic.join(cstInfo,(cstBasic['CustomerID'] == cstInfo['CustomerID']), "right_outer").drop(cstInfo['CustomerID']).count()

+----------+-------+-----------+------+------------------+----------------------------+--------------+---------+
|CustomerID|Fname  |Lname      |Gender|AvgMonthlySpending|Products_Id                 |LastPurchaseOn|TotalBill|
+----------+-------+-----------+------+------------------+----------------------------+--------------+---------+
|112       |Ramesh |heblikar   |Male  |500               |[1, 2, 3, 5]                |2days         |450      |
|56        |Soorya |Ramakrishna|Male  |1000              |[2, 4, 7, 1, 3, 4, 5, 10, 9]|1days         |369      |
|null      |null   |null       |null  |null              |null                        |3days         |560      |
|null      |null   |null       |null  |null              |null                        |3days         |560      |
|125       |Anjali |rao        |Male  |1200              |[8, 9, 2, 1, 5, 4, 7]       |12days        |560      |
|97        |Vishwas|DK         |Male  |750               |[2, 10, 9]                  |month ago

8

## Natural Joins

In [None]:
spark.sql("select * from cstbasicsql natural join cstinfosql").show()

+----------+-------+-----------+------+------------------+--------------------+--------------+---------+
|CustomerID|  Fname|      Lname|Gender|AvgMonthlySpending|         Products_Id|LastPurchaseOn|TotalBill|
+----------+-------+-----------+------+------------------+--------------------+--------------+---------+
|       112| Ramesh|   heblikar|  Male|               500|        [1, 2, 3, 5]|         2days|      450|
|        56| Soorya|Ramakrishna|  Male|              1000|[2, 4, 7, 1, 3, 4...|         1days|      369|
|       125| Anjali|        rao|  Male|              1200|[8, 9, 2, 1, 5, 4...|        12days|      560|
|        97|Vishwas|         DK|  Male|               750|          [2, 10, 9]|     month ago|      560|
|        30|  Manoj|          S|  Male|               750|        [2, 4, 7, 9]|         3days|      560|
|       121|  Raghu|      singh|  Male|               650|           [2, 4, 7]|      week ago|      200|
+----------+-------+-----------+------+----------------

## Cross (Cartesian) Joins

In [None]:
cstBasic.join(cstInfo,(cstBasic['CustomerID'] == cstInfo['CustomerID']), "cross").drop(cstInfo['CustomerID']).show(truncate=False)
cstBasic.join(cstInfo,(cstBasic['CustomerID'] == cstInfo['CustomerID']), "cross").drop(cstInfo['CustomerID']).count()

+----------+-------+-----------+------+------------------+----------------------------+--------------+---------+
|CustomerID|Fname  |Lname      |Gender|AvgMonthlySpending|Products_Id                 |LastPurchaseOn|TotalBill|
+----------+-------+-----------+------+------------------+----------------------------+--------------+---------+
|112       |Ramesh |heblikar   |Male  |500               |[1, 2, 3, 5]                |2days         |450      |
|56        |Soorya |Ramakrishna|Male  |1000              |[2, 4, 7, 1, 3, 4, 5, 10, 9]|1days         |369      |
|125       |Anjali |rao        |Male  |1200              |[8, 9, 2, 1, 5, 4, 7]       |12days        |560      |
|97        |Vishwas|DK         |Male  |750               |[2, 10, 9]                  |month ago     |560      |
|30        |Manoj  |S          |Male  |750               |[2, 4, 7, 9]                |3days         |560      |
|121       |Raghu  |singh      |Male  |650               |[2, 4, 7]                   |week ago 

6

In [None]:
spark.sql("select * from cstbasicsql cross join cstinfosql on cstbasicsql.CustomerID == cstinfosql.CustomerID").show()

+----------+-------+-----------+------+------------------+--------------------+----------+--------------+---------+
|CustomerID|  Fname|      Lname|Gender|AvgMonthlySpending|         Products_Id|CustomerID|LastPurchaseOn|TotalBill|
+----------+-------+-----------+------+------------------+--------------------+----------+--------------+---------+
|       112| Ramesh|   heblikar|  Male|               500|        [1, 2, 3, 5]|       112|         2days|      450|
|        56| Soorya|Ramakrishna|  Male|              1000|[2, 4, 7, 1, 3, 4...|        56|         1days|      369|
|       125| Anjali|        rao|  Male|              1200|[8, 9, 2, 1, 5, 4...|       125|        12days|      560|
|        97|Vishwas|         DK|  Male|               750|          [2, 10, 9]|        97|     month ago|      560|
|        30|  Manoj|          S|  Male|               750|        [2, 4, 7, 9]|        30|         3days|      560|
|       121|  Raghu|      singh|  Male|               650|           [2,

In [None]:
#cross joins work when there is no condition given while joining 
cstBasic.crossJoin(cstInfo).show()
cstBasic.crossJoin(cstInfo).count()

+----------+------+--------+------+------------------+--------------------+----------+--------------+---------+
|CustomerID| Fname|   Lname|Gender|AvgMonthlySpending|         Products_Id|CustomerID|LastPurchaseOn|TotalBill|
+----------+------+--------+------+------------------+--------------------+----------+--------------+---------+
|       112|Ramesh|heblikar|  Male|               500|        [1, 2, 3, 5]|       112|         2days|      450|
|       112|Ramesh|heblikar|  Male|               500|        [1, 2, 3, 5]|       121|      week ago|      200|
|       112|Ramesh|heblikar|  Male|               500|        [1, 2, 3, 5]|       125|        12days|      560|
|       112|Ramesh|heblikar|  Male|               500|        [1, 2, 3, 5]|        97|     month ago|      560|
|       121| Raghu|   singh|  Male|               650|           [2, 4, 7]|       112|         2days|      450|
|       121| Raghu|   singh|  Male|               650|           [2, 4, 7]|       121|      week ago|   

72

In [None]:
spark.sql("select * from cstbasicsql cross join cstinfosql").show()

+----------+------+--------+------+------------------+--------------------+----------+--------------+---------+
|CustomerID| Fname|   Lname|Gender|AvgMonthlySpending|         Products_Id|CustomerID|LastPurchaseOn|TotalBill|
+----------+------+--------+------+------------------+--------------------+----------+--------------+---------+
|       112|Ramesh|heblikar|  Male|               500|        [1, 2, 3, 5]|       112|         2days|      450|
|       112|Ramesh|heblikar|  Male|               500|        [1, 2, 3, 5]|       121|      week ago|      200|
|       112|Ramesh|heblikar|  Male|               500|        [1, 2, 3, 5]|       125|        12days|      560|
|       112|Ramesh|heblikar|  Male|               500|        [1, 2, 3, 5]|        97|     month ago|      560|
|       121| Raghu|   singh|  Male|               650|           [2, 4, 7]|       112|         2days|      450|
|       121| Raghu|   singh|  Male|               650|           [2, 4, 7]|       121|      week ago|   

## Joins on Complex Types

In [None]:
products.show()

+---------+-----------+-----+
|ProductID|ProductName|Price|
+---------+-----------+-----+
|        1|    Shampoo|   10|
|        2|    T-shirt|   45|
|        3|     Ginger|    7|
|        4| CoolDrinks|    5|
|        5|     Tamato|   10|
|        6|      Onion|   30|
|        7|      Jeans|   60|
|        8|      Cream|  100|
|        9|     Trimer|  150|
|       10|      Phone|  500|
+---------+-----------+-----+



In [None]:
cstBasic.join(products, expr("array_contains(Products_Id, ProductID)")).show(truncate = False)

+----------+------+--------+------+------------------+---------------------+---------+-----------+-----+
|CustomerID|Fname |Lname   |Gender|AvgMonthlySpending|Products_Id          |ProductID|ProductName|Price|
+----------+------+--------+------+------------------+---------------------+---------+-----------+-----+
|112       |Ramesh|heblikar|Male  |500               |[1, 2, 3, 5]         |1        |Shampoo    |10   |
|112       |Ramesh|heblikar|Male  |500               |[1, 2, 3, 5]         |2        |T-shirt    |45   |
|112       |Ramesh|heblikar|Male  |500               |[1, 2, 3, 5]         |3        |Ginger     |7    |
|112       |Ramesh|heblikar|Male  |500               |[1, 2, 3, 5]         |5        |Tamato     |10   |
|121       |Raghu |singh   |Male  |650               |[2, 4, 7]            |2        |T-shirt    |45   |
|121       |Raghu |singh   |Male  |650               |[2, 4, 7]            |4        |CoolDrinks |5    |
|125       |Anjali|rao     |Male  |1200              |[

In [None]:
spark.sql("select * from cstbasicsql outer join productssql on array_contains(Products_Id, ProductID)").show()

+----------+------+--------+------+------------------+--------------------+---------+-----------+-----+
|CustomerID| Fname|   Lname|Gender|AvgMonthlySpending|         Products_Id|ProductID|ProductName|Price|
+----------+------+--------+------+------------------+--------------------+---------+-----------+-----+
|       112|Ramesh|heblikar|  Male|               500|        [1, 2, 3, 5]|        1|    Shampoo|   10|
|       112|Ramesh|heblikar|  Male|               500|        [1, 2, 3, 5]|        2|    T-shirt|   45|
|       112|Ramesh|heblikar|  Male|               500|        [1, 2, 3, 5]|        3|     Ginger|    7|
|       112|Ramesh|heblikar|  Male|               500|        [1, 2, 3, 5]|        5|     Tamato|   10|
|       121| Raghu|   singh|  Male|               650|           [2, 4, 7]|        2|    T-shirt|   45|
|       121| Raghu|   singh|  Male|               650|           [2, 4, 7]|        4| CoolDrinks|    5|
|       125|Anjali|     rao|  Male|              1200|[8, 9, 2, 

# Random Samples

In [None]:
sampleDF1 = sdf.sample(False, 0.2, 1234)
sampleDF2 = sdf.sample(False, 0.2, 4321)
print(sampleDF1.count(), sampleDF2.count())

30379 29994


# Random Splits

In [None]:
splits = sdf.randomSplit([0.7,0.3],seed= 45)
print(splits[0].count())
splits[1].count()

104987


45013

In [None]:
splits[0].show(5)

+------+------+------+---+-------+-------+
|Number|  City|Gender|Age| Income|Illness|
+------+------+------+---+-------+-------+
|     2|Dallas|  Male| 54|55084.0|     No|
|     4|Dallas|  Male| 40|50941.0|     No|
|     5|Dallas|  Male| 46|60289.0|     No|
|     6|Dallas|Female| 36|60786.0|     No|
|     8|Dallas|  Male| 39|40914.0|     No|
+------+------+------+---+-------+-------+
only showing top 5 rows



# Map Transformation

In [None]:
sdf.select(col('City')).rdd.map(lambda x: (x,len(x))).toDF().show(10)


+--------+---+
|      _1| _2|
+--------+---+
|[Dallas]|  1|
|[Dallas]|  1|
|[Dallas]|  1|
|[Dallas]|  1|
|[Dallas]|  1|
|[Dallas]|  1|
|[Dallas]|  1|
|[Dallas]|  1|
|[Dallas]|  1|
|[Dallas]|  1|
+--------+---+
only showing top 10 rows



# Sorting Rows

In [None]:
sdf.show()

+------+------+------+---+-------+-------+
|Number|  City|Gender|Age| Income|Illness|
+------+------+------+---+-------+-------+
|     1|Dallas|  Male| 41|50367.0|     No|
|     2|Dallas|  Male| 54|55084.0|     No|
|     3|Dallas|  Male| 42|62483.0|     No|
|     4|Dallas|  Male| 40|50941.0|     No|
|     5|Dallas|  Male| 46|60289.0|     No|
|     6|Dallas|Female| 36|60786.0|     No|
|     7|Dallas|Female| 32|43155.0|     No|
|     8|Dallas|  Male| 39|40914.0|     No|
|     9|Dallas|  Male| 51|78667.0|     No|
|    10|Dallas|Female| 30|60082.0|     No|
|    11|Dallas|Female| 48|51524.0|    Yes|
|    12|Dallas|  Male| 47|64777.0|     No|
|    13|Dallas|  Male| 46|72749.0|     No|
|    14|Dallas|Female| 42|60894.0|     No|
|    15|Dallas|Female| 61|48429.0|     No|
|    16|Dallas|  Male| 43|44074.0|     No|
|    17|Dallas|  Male| 27|60398.0|     No|
|    18|Dallas|  Male| 38|56373.0|    Yes|
|    19|Dallas|  Male| 47|61137.0|     No|
|    20|Dallas|Female| 35|33688.0|     No|
+------+---

In [None]:
#orderby income in descending order
sdf.orderBy(col('Income').desc()).show()

+------+-------------+------+---+--------+-------+
|Number|         City|Gender|Age|  Income|Illness|
+------+-------------+------+---+--------+-------+
|109351|Mountain View|  Male| 58|187157.0|     No|
|105282|Mountain View|  Male| 41|186746.0|     No|
|109061|Mountain View|  Male| 61|183991.0|     No|
|110878|Mountain View|  Male| 52|183826.0|     No|
|112193|Mountain View|  Male| 58|182825.0|     No|
|102882|Mountain View|  Male| 47|181862.0|     No|
|112341|Mountain View|  Male| 37|180965.0|     No|
|107155|Mountain View|  Male| 31|180937.0|     No|
|106021|Mountain View|  Male| 27|180927.0|     No|
|105905|Mountain View|  Male| 30|180463.0|     No|
|114767|Mountain View|  Male| 48|180289.0|     No|
|109556|Mountain View|  Male| 43|180179.0|     No|
|107486|Mountain View|  Male| 63|179843.0|     No|
|115296|Mountain View|  Male| 46|178918.0|     No|
|102763|Mountain View|  Male| 62|178911.0|     No|
|114626|Mountain View|  Male| 51|178743.0|     No|
|107178|Mountain View|  Male| 4

In [None]:
sdf.orderBy(col("Income"), ascending=True).show()

+------+------+------+---+-------+-------+
|Number|  City|Gender|Age| Income|Illness|
+------+------+------+---+-------+-------+
|   246|Dallas|Female| 40| 9346.0|    Yes|
|  7897|Dallas|Female| 34|10584.0|     No|
| 18485|Dallas|Female| 43|12474.0|     No|
|  9299|Dallas|Female| 31|12667.0|    Yes|
| 17136|Dallas|Female| 39|13842.0|     No|
|  2119|Dallas|Female| 56|14371.0|     No|
|  4247|Dallas|Female| 38|15717.0|     No|
| 18986|Dallas|Female| 40|16722.0|     No|
|   407|Dallas|Female| 65|16733.0|     No|
| 17103|Dallas|Female| 40|18129.0|     No|
|  1826|Dallas|Female| 59|18182.0|     No|
|  2524|Dallas|Female| 36|18279.0|     No|
|  5499|Dallas|Female| 63|18548.0|     No|
|  1038|Dallas|Female| 42|18613.0|     No|
|  7636|Dallas|Female| 26|18719.0|     No|
|  3087|Dallas|Female| 51|18812.0|     No|
| 14204|Dallas|Female| 49|19046.0|     No|
|  6623|Dallas|Female| 40|19531.0|     No|
| 17821|Dallas|Female| 57|19760.0|    Yes|
| 11466|Dallas|Female| 35|20129.0|     No|
+------+---

# Repartition and Coalesce

In [None]:
#to see the number of partitons before performing the repartioning
sdf.rdd.getNumPartitions()

2

In [None]:
#repartition into 5 partitions
tempP = sdf.repartition(5)

tempP.rdd.getNumPartitions()

5

# Unions and condtional append
Joining or appending two tables one below another

In [None]:
prod_one = spark.createDataFrame([
    (1,'Shampoo',10),
    (2,'T-shirt',45),
    (3,'Ginger',7),
    (4,'CoolDrinks',5),
    (5,'Tamato',10)])\
    .toDF("ProductID",'ProductName','Price')

prod_two = spark.createDataFrame([
    (6,'Onion',30),
    (7,'Jeans',60),
    (8,'Cream',100),
    (9,'Trimer',150),
    (10,'Phone',500)])\
    .toDF("ProductID",'ProductName','Price')

In [None]:
prod_one.show()
prod_two.show()

+---------+-----------+-----+
|ProductID|ProductName|Price|
+---------+-----------+-----+
|        1|    Shampoo|   10|
|        2|    T-shirt|   45|
|        3|     Ginger|    7|
|        4| CoolDrinks|    5|
|        5|     Tamato|   10|
+---------+-----------+-----+

+---------+-----------+-----+
|ProductID|ProductName|Price|
+---------+-----------+-----+
|        6|      Onion|   30|
|        7|      Jeans|   60|
|        8|      Cream|  100|
|        9|     Trimer|  150|
|       10|      Phone|  500|
+---------+-----------+-----+



In [None]:
#Unions of these two dataframes
prod_one.union(prod_two).show()

+---------+-----------+-----+
|ProductID|ProductName|Price|
+---------+-----------+-----+
|        1|    Shampoo|   10|
|        2|    T-shirt|   45|
|        3|     Ginger|    7|
|        4| CoolDrinks|    5|
|        5|     Tamato|   10|
|        6|      Onion|   30|
|        7|      Jeans|   60|
|        8|      Cream|  100|
|        9|     Trimer|  150|
|       10|      Phone|  500|
+---------+-----------+-----+



In [None]:
#condtional appending -- here the conditon applies to both the dataframes 
prod_one.union(prod_two).where("price>10").show()

+---------+-----------+-----+
|ProductID|ProductName|Price|
+---------+-----------+-----+
|        2|    T-shirt|   45|
|        6|      Onion|   30|
|        7|      Jeans|   60|
|        8|      Cream|  100|
|        9|     Trimer|  150|
|       10|      Phone|  500|
+---------+-----------+-----+



# User Defined Functions - UDF

In [None]:
sdf.show(5)

+------+------+------+---+-------+-------+
|Number|  City|Gender|Age| Income|Illness|
+------+------+------+---+-------+-------+
|     1|Dallas|  Male| 41|50367.0|     No|
|     2|Dallas|  Male| 54|55084.0|     No|
|     3|Dallas|  Male| 42|62483.0|     No|
|     4|Dallas|  Male| 40|50941.0|     No|
|     5|Dallas|  Male| 46|60289.0|     No|
+------+------+------+---+-------+-------+
only showing top 5 rows



In [None]:
from pyspark.sql.functions import udf

In [None]:
func1 = udf(lambda x: x-10000)

In [None]:
#subtracting 10000 from income 
sdf_1 = sdf.withColumn("Income",func1(sdf['Income']))
sdf_1.show()

+------+------+------+---+-------+-------+
|Number|  City|Gender|Age| Income|Illness|
+------+------+------+---+-------+-------+
|     1|Dallas|  Male| 41|40367.0|     No|
|     2|Dallas|  Male| 54|45084.0|     No|
|     3|Dallas|  Male| 42|52483.0|     No|
|     4|Dallas|  Male| 40|40941.0|     No|
|     5|Dallas|  Male| 46|50289.0|     No|
|     6|Dallas|Female| 36|50786.0|     No|
|     7|Dallas|Female| 32|33155.0|     No|
|     8|Dallas|  Male| 39|30914.0|     No|
|     9|Dallas|  Male| 51|68667.0|     No|
|    10|Dallas|Female| 30|50082.0|     No|
|    11|Dallas|Female| 48|41524.0|    Yes|
|    12|Dallas|  Male| 47|54777.0|     No|
|    13|Dallas|  Male| 46|62749.0|     No|
|    14|Dallas|Female| 42|50894.0|     No|
|    15|Dallas|Female| 61|38429.0|     No|
|    16|Dallas|  Male| 43|34074.0|     No|
|    17|Dallas|  Male| 27|50398.0|     No|
|    18|Dallas|  Male| 38|46373.0|    Yes|
|    19|Dallas|  Male| 47|51137.0|     No|
|    20|Dallas|Female| 35|23688.0|     No|
+------+---

In [None]:
#another function
func2 = udf(lambda x: (x+str(len(x))))

In [None]:
sdf.withColumn("function_columns",func2(sdf['City'])).show(5)

+------+------+------+---+-------+-------+----------------+
|Number|  City|Gender|Age| Income|Illness|function_columns|
+------+------+------+---+-------+-------+----------------+
|     1|Dallas|  Male| 41|50367.0|     No|         Dallas6|
|     2|Dallas|  Male| 54|55084.0|     No|         Dallas6|
|     3|Dallas|  Male| 42|62483.0|     No|         Dallas6|
|     4|Dallas|  Male| 40|50941.0|     No|         Dallas6|
|     5|Dallas|  Male| 46|60289.0|     No|         Dallas6|
+------+------+------+---+-------+-------+----------------+
only showing top 5 rows



In [None]:
avgIncome = sdf.select(expr("avg(Income)"))

def indicator_var(x):
  if x > avgIncome.collect()[0][0]:
    return 1
  else:
    return 0
func3 = udf(indicator_var)

In [None]:
#Could not solve this error 
#sdf.select(func3(col("Income"))).show()

# Correlation between two columns

In [None]:
from pyspark.sql.functions import corr
print(sdf.stat.corr("Income", "Age"))
sdf.select(corr("Income", "Age")).show()

-0.0013181143630557161
+--------------------+
|   corr(Income, Age)|
+--------------------+
|-0.00131811436305...|
+--------------------+



In [None]:
sdf.stat.freqItems(['City'],0.3).show(truncate=False)

+------------------------------------+
|City_freqItems                      |
+------------------------------------+
|[Los Angeles, New York City, Austin]|
+------------------------------------+



# String Manipulations

In [None]:
from pyspark.sql.functions import lit, ltrim, rtrim, rpad, lpad, trim

In [None]:
df = spark.createDataFrame([[1,2,3]])
df.select(ltrim(lit("   Hello    ")).alias("Ltrim")).show()

+---------+
|    Ltrim|
+---------+
|Hello    |
+---------+



In [None]:
df.select(rtrim(lit("   hello    "))).show()

+-------------------+
|rtrim(   hello    )|
+-------------------+
|              hello|
+-------------------+



In [None]:
df.select(trim(lit("    Hello    "))).show()

+-------------------+
|trim(    Hello    )|
+-------------------+
|              Hello|
+-------------------+



In [None]:
df.select(lpad(lit("hello"),10,"@")).show()

+------------------+
|lpad(hello, 10, @)|
+------------------+
|        @@@@@hello|
+------------------+



In [None]:
df.select(rpad(lit("Hello"),20,"@")).show()

+--------------------+
|  rpad(Hello, 20, @)|
+--------------------+
|Hello@@@@@@@@@@@@@@@|
+--------------------+



In [None]:
from pyspark.sql.functions import translate
sdf.select(
translate(col("Gender"), "MaleFemale", "12"),
col("Gender"))\
.show(10)

+---------------------------------+------+
|translate(Gender, MaleFemale, 12)|Gender|
+---------------------------------+------+
|                               12|  Male|
|                               12|  Male|
|                               12|  Male|
|                               12|  Male|
|                               12|  Male|
|                                2|Female|
|                                2|Female|
|                               12|  Male|
|                               12|  Male|
|                                2|Female|
+---------------------------------+------+
only showing top 10 rows



# Working with Date and Time

In [None]:
from pyspark.sql.functions import current_date, current_timestamp, date_add, date_sub

In [None]:
dateDF = spark.range(5)\
.withColumn("today", current_date())\
.withColumn("now", current_timestamp())
dateDF.show(truncate = False)

+---+----------+-----------------------+
|id |today     |now                    |
+---+----------+-----------------------+
|0  |2022-12-12|2022-12-12 15:19:21.775|
|1  |2022-12-12|2022-12-12 15:19:21.775|
|2  |2022-12-12|2022-12-12 15:19:21.775|
|3  |2022-12-12|2022-12-12 15:19:21.775|
|4  |2022-12-12|2022-12-12 15:19:21.775|
+---+----------+-----------------------+



In [None]:
dateDF.select(date_sub(col("today"),5),date_add(col("today"),5)).show()

+------------------+------------------+
|date_sub(today, 5)|date_add(today, 5)|
+------------------+------------------+
|        2022-12-07|        2022-12-17|
|        2022-12-07|        2022-12-17|
|        2022-12-07|        2022-12-17|
|        2022-12-07|        2022-12-17|
|        2022-12-07|        2022-12-17|
+------------------+------------------+



In [None]:
from pyspark.sql.functions import datediff, months_between, to_date

In [None]:
dateDF = dateDF.withColumn("Month-ago",date_sub(col("today"),30))
dateDF.show()

+---+----------+--------------------+----------+
| id|     today|                 now| Month-ago|
+---+----------+--------------------+----------+
|  0|2022-12-12|2022-12-12 15:19:...|2022-11-12|
|  1|2022-12-12|2022-12-12 15:19:...|2022-11-12|
|  2|2022-12-12|2022-12-12 15:19:...|2022-11-12|
|  3|2022-12-12|2022-12-12 15:19:...|2022-11-12|
|  4|2022-12-12|2022-12-12 15:19:...|2022-11-12|
+---+----------+--------------------+----------+



In [None]:
dateDF.select(datediff(col("today"),col("Month-ago")).alias("Datediff")).show()
dateDF.select(datediff(to_date(lit("2020-09-08")),to_date(lit("2020-07-10")))).show()

+--------+
|Datediff|
+--------+
|      30|
|      30|
|      30|
|      30|
|      30|
+--------+

+------------------------------------------------------+
|datediff(to_date('2020-09-08'), to_date('2020-07-10'))|
+------------------------------------------------------+
|                                                    60|
|                                                    60|
|                                                    60|
|                                                    60|
|                                                    60|
+------------------------------------------------------+



In [None]:
dateDF.select(months_between(col("today"),col("Month-ago"))).show()
dateDF.select(months_between(to_date(lit("2020-09-08")),to_date(lit("2020-07-10"))).alias("Month Between")).show()

+--------------------------------------+
|months_between(today, Month-ago, true)|
+--------------------------------------+
|                                   1.0|
|                                   1.0|
|                                   1.0|
|                                   1.0|
|                                   1.0|
+--------------------------------------+

+-------------+
|Month Between|
+-------------+
|   1.93548387|
|   1.93548387|
|   1.93548387|
|   1.93548387|
|   1.93548387|
+-------------+



In [None]:
from pyspark.sql.functions import unix_timestamp, from_unixtime
dateFormat = "yyyy-dd-MM"

In [None]:
dateDF.select(to_date(unix_timestamp(lit("2022-30-12"),dateFormat).cast("timestamp"))).show()

+----------------------------------------------------------------------+
|to_date(CAST(unix_timestamp('2022-30-12', 'yyyy-dd-MM') AS TIMESTAMP))|
+----------------------------------------------------------------------+
|                                                            2022-12-30|
|                                                            2022-12-30|
|                                                            2022-12-30|
|                                                            2022-12-30|
|                                                            2022-12-30|
+----------------------------------------------------------------------+



# Broadcast Variables

In [None]:
sdf.show(5)

+------+------+------+---+-------+-------+
|Number|  City|Gender|Age| Income|Illness|
+------+------+------+---+-------+-------+
|     1|Dallas|  Male| 41|50367.0|     No|
|     2|Dallas|  Male| 54|55084.0|     No|
|     3|Dallas|  Male| 42|62483.0|     No|
|     4|Dallas|  Male| 40|50941.0|     No|
|     5|Dallas|  Male| 46|60289.0|     No|
+------+------+------+---+-------+-------+
only showing top 5 rows



In [None]:
CityBC = {"Dallas":"DA","Los Angeles":"LA","San Diego":"SD","Austin":"AU","New York City":"NYC",
          "Washington D.C.":"WDC","Mountain View":"MV","Boston":"BO"}
cityBroadcasted = sc.broadcast(CityBC)

In [None]:
cityBroadcasted.value

{'Dallas': 'DA',
 'Los Angeles': 'LA',
 'San Diego': 'SD',
 'Austin': 'AU',
 'New York City': 'NYC',
 'Washington D.C.': 'WDC',
 'Mountain View': 'MV',
 'Boston': 'BO'}

In [None]:
my_collection = "Dallas Los Angeles San Diego Austin New York City Washington D.C. Mountain View Boston".split(" ")
    
words = sc.parallelize(my_collection, 2)
words.map(lambda x: (x,cityBroadcasted.value.get(x,0))).collect()

[('Dallas', 'DA'),
 ('Los', 0),
 ('Angeles', 0),
 ('San', 0),
 ('Diego', 0),
 ('Austin', 'AU'),
 ('New', 0),
 ('York', 0),
 ('City', 0),
 ('Washington', 0),
 ('D.C.', 0),
 ('Mountain', 0),
 ('View', 0),
 ('Boston', 'BO')]