## Examples for Aggregation functions, joins and SQL 

In [4]:
from pyspark.sql import SparkSession
import pandas as pd

spark = SparkSession.builder.appName("SimpleApp2").getOrCreate()
spark.conf.set("spark.sql.shuffle.partitions", "5")
netflix = spark.read.csv("../data/netflix_titles.csv",header='true')
netflix.limit(5).toPandas().head()

Unnamed: 0,show_id,type,title,director,cast,country,date_added,release_year,rating,duration,listed_in,description
0,s1,TV Show,3%,,"João Miguel, Bianca Comparato, Michel Gomes, R...",Brazil,"August 14, 2020",2020,TV-MA,4 Seasons,"International TV Shows, TV Dramas, TV Sci-Fi &...",In a future where the elite inhabit an island ...
1,s2,Movie,7:19,Jorge Michel Grau,"Demián Bichir, Héctor Bonilla, Oscar Serrano, ...",Mexico,"December 23, 2016",2016,TV-MA,93 min,"Dramas, International Movies",After a devastating earthquake hits Mexico Cit...
2,s3,Movie,23:59,Gilbert Chan,"Tedd Chan, Stella Chung, Henley Hii, Lawrence ...",Singapore,"December 20, 2018",2011,R,78 min,"Horror Movies, International Movies","When an army recruit is found dead, his fellow..."
3,s4,Movie,9,Shane Acker,"Elijah Wood, John C. Reilly, Jennifer Connelly...",United States,"November 16, 2017",2009,PG-13,80 min,"Action & Adventure, Independent Movies, Sci-Fi...","In a postapocalyptic world, rag-doll robots hi..."
4,s5,Movie,21,Robert Luketic,"Jim Sturgess, Kevin Spacey, Kate Bosworth, Aar...",United States,"January 1, 2020",2008,PG-13,123 min,Dramas,A brilliant group of students become card-coun...


In [3]:
avocadoprice = spark.read.csv("../data/avocado.csv",header='true')

avocadoprice.cache()
avocadoprice.createOrReplaceTempView("avocadopriceTable")

avocadoprice.limit(5).toPandas().head()

Unnamed: 0,_c0,Date,AveragePrice,Total Volume,4046,4225,4770,Total Bags,Small Bags,Large Bags,XLarge Bags,type,year,region
0,0,2015-12-27,1.33,64236.62,1036.74,54454.85,48.16,8696.87,8603.62,93.25,0.0,conventional,2015,Albany
1,1,2015-12-20,1.35,54876.98,674.28,44638.81,58.33,9505.56,9408.07,97.49,0.0,conventional,2015,Albany
2,2,2015-12-13,0.93,118220.22,794.7,109149.67,130.5,8145.35,8042.21,103.14,0.0,conventional,2015,Albany
3,3,2015-12-06,1.08,78992.15,1132.0,71976.41,72.58,5811.16,5677.4,133.76,0.0,conventional,2015,Albany
4,4,2015-11-29,1.28,51039.6,941.48,43838.39,75.78,6183.95,5986.26,197.69,0.0,conventional,2015,Albany


In [5]:
from pyspark.sql.functions import count
avocadoprice.select(count("AveragePrice")).show()

+-------------------+
|count(AveragePrice)|
+-------------------+
|              18249|
+-------------------+



In [6]:
from pyspark.sql.functions import countDistinct
avocadoprice.select(countDistinct("AveragePrice")).show()

+----------------------------+
|count(DISTINCT AveragePrice)|
+----------------------------+
|                         259|
+----------------------------+



In [9]:
from pyspark.sql.functions import min,max
avocadoprice.select(min("AveragePrice"),max("AveragePrice")).show()

+-----------------+-----------------+
|min(AveragePrice)|max(AveragePrice)|
+-----------------+-----------------+
|             0.44|             3.25|
+-----------------+-----------------+



In [12]:
from pyspark.sql.functions import var_pop, stddev_pop
from pyspark.sql.functions import var_samp, stddev_samp

avocadoprice.select(var_pop("AveragePrice"), var_samp("AveragePrice"),
    stddev_pop("AveragePrice"), stddev_samp("AveragePrice")).show()

+---------------------+----------------------+------------------------+-------------------------+
|var_pop(AveragePrice)|var_samp(AveragePrice)|stddev_pop(AveragePrice)|stddev_samp(AveragePrice)|
+---------------------+----------------------+------------------------+-------------------------+
|   0.1621395230146024|   0.16214840834576277|     0.40266552250546905|       0.4026765554955525|
+---------------------+----------------------+------------------------+-------------------------+



In [14]:
from pyspark.sql.functions import skewness, kurtosis

avocadoprice.select(skewness("AveragePrice"), kurtosis("AveragePrice")).show()

+----------------------+----------------------+
|skewness(AveragePrice)|kurtosis(AveragePrice)|
+----------------------+----------------------+
|    0.5802550380696546|   0.32477799072951186|
+----------------------+----------------------+



## Grouping

In [16]:
avocadoprice.groupBy("year","region").count().show()

+----+-------------------+-----+
|year|             region|count|
+----+-------------------+-----+
|2015|             Albany|  104|
|2015|            Chicago|  104|
|2015|           Columbus|  104|
|2015|        GrandRapids|  104|
|2015|         Louisville|  104|
|2015|          Nashville|  104|
|2015|      PhoenixTucson|  104|
|2015|         Pittsburgh|  104|
|2015|    RichmondNorfolk|  104|
|2015|            StLouis|  104|
|2016|BaltimoreWashington|  104|
|2016|             Boston|  104|
|2016| HarrisburgScranton|  104|
|2016|       Indianapolis|  104|
|2016|           Midsouth|  104|
|2016|            Orlando|  104|
|2016|       Philadelphia|  104|
|2016|         Pittsburgh|  104|
|2016|    RichmondNorfolk|  104|
|2016|            Roanoke|  104|
+----+-------------------+-----+
only showing top 20 rows



In [17]:
from pyspark.sql.window import Window
from pyspark.sql.functions import desc

windowSpec = Window\
.partitionBy("_c0", "Date")\
.orderBy(desc("Total Volume"))\
.rowsBetween(Window.unboundedPreceding, Window.currentRow)

In [19]:
from pyspark.sql.functions import max
from pyspark.sql.functions import col
maxPurchaseQuantity = max(col("Quantity")).over(windowSpec)

In [20]:
maxPurchaseQuantity

Column<'max(Quantity) OVER (PARTITION BY _c0, Date ORDER BY Total Volume DESC NULLS LAST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)'>

In [21]:
pivoted = avocadoprice.groupBy("region").pivot("type").sum()

In [24]:
pivoted.limit(6).toPandas().head()


Unnamed: 0,region
0,Charlotte
1,Midsouth
2,Sacramento
3,Southeast
4,TotalUS


## Joins

In [8]:
happy2018 = spark.read.csv("../data/happiness/2018.csv",header='true')
happy2018.limit(5).toPandas().head()

Unnamed: 0,Overall rank,Country or region,Score,GDP per capita,Social support,Healthy life expectancy,Freedom to make life choices,Generosity,Perceptions of corruption
0,1,Finland,7.632,1.305,1.592,0.874,0.681,0.202,0.393
1,2,Norway,7.594,1.456,1.582,0.861,0.686,0.286,0.34
2,3,Denmark,7.555,1.351,1.59,0.868,0.683,0.284,0.408
3,4,Iceland,7.495,1.343,1.644,0.914,0.677,0.353,0.138
4,5,Switzerland,7.487,1.42,1.549,0.927,0.66,0.256,0.357


In [9]:
happy2019 = spark.read.csv("../data/happiness/2019.csv",header='true')
happy2019.limit(5).toPandas().head()

Unnamed: 0,Overall rank,Country or region,Score,GDP per capita,Social support,Healthy life expectancy,Freedom to make life choices,Generosity,Perceptions of corruption
0,1,Finland,7.769,1.34,1.587,0.986,0.596,0.153,0.393
1,2,Denmark,7.6,1.383,1.573,0.996,0.592,0.252,0.41
2,3,Norway,7.554,1.488,1.582,1.028,0.603,0.271,0.341
3,4,Iceland,7.494,1.38,1.624,1.026,0.591,0.354,0.118
4,5,Netherlands,7.488,1.396,1.522,0.999,0.557,0.322,0.298


In [10]:

happy2018.cache()
happy2018.createOrReplaceTempView("happy2018Table")
happy2019.cache()
happy2019.createOrReplaceTempView("happy2019Table")

In [24]:
spark.sql("select `Country or region` as Country, `Social support` as social  from happy2018Table limit 5").show()

+-----------+------+
|    Country|social|
+-----------+------+
|    Finland| 1.592|
|     Norway| 1.582|
|    Denmark| 1.590|
|    Iceland| 1.644|
|Switzerland| 1.549|
+-----------+------+



In [27]:
joinExpression = happy2018["Country or region"] == happy2019['Country or region']
joinExpression

Column<'(Country or region = Country or region)'>

In [28]:
person = spark.createDataFrame([
    (0, "Bill Chambers", 0, [100]),
    (1, "Matei Zaharia", 1, [500, 250, 100]),
    (2, "Michael Armbrust", 1, [250, 100])])\
    .toDF("id", "name", "graduate_program", "spark_status")

graduateProgram = spark.createDataFrame([
    (0, "Masters", "School of Information", "UC Berkeley"),
    (2, "Masters", "EECS", "UC Berkeley"),
    (1, "Ph.D.", "EECS", "UC Berkeley")])\
    .toDF("id", "degree", "department", "school")

sparkStatus = spark.createDataFrame([
    (500, "Vice President"),
    (250, "PMC Member"),
    (100, "Contributor")])\
    .toDF("id", "status")

In [29]:
person.createOrReplaceTempView("person")
graduateProgram.createOrReplaceTempView("graduateProgram")
sparkStatus.createOrReplaceTempView("sparkStatus")

In [30]:
joinExpression = person["graduate_program"] == graduateProgram['id']

In [33]:
person.join(graduateProgram, joinExpression).show()

+---+----------------+----------------+---------------+---+-------+--------------------+-----------+
| id|            name|graduate_program|   spark_status| id| degree|          department|     school|
+---+----------------+----------------+---------------+---+-------+--------------------+-----------+
|  0|   Bill Chambers|               0|          [100]|  0|Masters|School of Informa...|UC Berkeley|
|  1|   Matei Zaharia|               1|[500, 250, 100]|  1|  Ph.D.|                EECS|UC Berkeley|
|  2|Michael Armbrust|               1|     [250, 100]|  1|  Ph.D.|                EECS|UC Berkeley|
+---+----------------+----------------+---------------+---+-------+--------------------+-----------+



In [34]:
joinType = "inner"
person.join(graduateProgram, joinExpression, joinType).show()

+---+----------------+----------------+---------------+---+-------+--------------------+-----------+
| id|            name|graduate_program|   spark_status| id| degree|          department|     school|
+---+----------------+----------------+---------------+---+-------+--------------------+-----------+
|  0|   Bill Chambers|               0|          [100]|  0|Masters|School of Informa...|UC Berkeley|
|  1|   Matei Zaharia|               1|[500, 250, 100]|  1|  Ph.D.|                EECS|UC Berkeley|
|  2|Michael Armbrust|               1|     [250, 100]|  1|  Ph.D.|                EECS|UC Berkeley|
+---+----------------+----------------+---------------+---+-------+--------------------+-----------+



In [37]:
joinType = "outer"
person.join(graduateProgram, joinExpression, joinType).show()

+----+----------------+----------------+---------------+---+-------+--------------------+-----------+
|  id|            name|graduate_program|   spark_status| id| degree|          department|     school|
+----+----------------+----------------+---------------+---+-------+--------------------+-----------+
|   0|   Bill Chambers|               0|          [100]|  0|Masters|School of Informa...|UC Berkeley|
|null|            null|            null|           null|  2|Masters|                EECS|UC Berkeley|
|   1|   Matei Zaharia|               1|[500, 250, 100]|  1|  Ph.D.|                EECS|UC Berkeley|
|   2|Michael Armbrust|               1|     [250, 100]|  1|  Ph.D.|                EECS|UC Berkeley|
+----+----------------+----------------+---------------+---+-------+--------------------+-----------+



In [38]:
joinType = "left_outer"
graduateProgram.join(person, joinExpression, joinType).show()

+---+-------+--------------------+-----------+----+----------------+----------------+---------------+
| id| degree|          department|     school|  id|            name|graduate_program|   spark_status|
+---+-------+--------------------+-----------+----+----------------+----------------+---------------+
|  0|Masters|School of Informa...|UC Berkeley|   0|   Bill Chambers|               0|          [100]|
|  2|Masters|                EECS|UC Berkeley|null|            null|            null|           null|
|  1|  Ph.D.|                EECS|UC Berkeley|   1|   Matei Zaharia|               1|[500, 250, 100]|
|  1|  Ph.D.|                EECS|UC Berkeley|   2|Michael Armbrust|               1|     [250, 100]|
+---+-------+--------------------+-----------+----+----------------+----------------+---------------+



In [39]:
joinType = "right_outer"
person.join(graduateProgram, joinExpression, joinType).show()

+----+----------------+----------------+---------------+---+-------+--------------------+-----------+
|  id|            name|graduate_program|   spark_status| id| degree|          department|     school|
+----+----------------+----------------+---------------+---+-------+--------------------+-----------+
|   0|   Bill Chambers|               0|          [100]|  0|Masters|School of Informa...|UC Berkeley|
|null|            null|            null|           null|  2|Masters|                EECS|UC Berkeley|
|   1|   Matei Zaharia|               1|[500, 250, 100]|  1|  Ph.D.|                EECS|UC Berkeley|
|   2|Michael Armbrust|               1|     [250, 100]|  1|  Ph.D.|                EECS|UC Berkeley|
+----+----------------+----------------+---------------+---+-------+--------------------+-----------+



In [40]:
joinType = "cross"
graduateProgram.join(person, joinExpression, joinType).show()

+---+-------+--------------------+-----------+---+----------------+----------------+---------------+
| id| degree|          department|     school| id|            name|graduate_program|   spark_status|
+---+-------+--------------------+-----------+---+----------------+----------------+---------------+
|  0|Masters|School of Informa...|UC Berkeley|  0|   Bill Chambers|               0|          [100]|
|  1|  Ph.D.|                EECS|UC Berkeley|  1|   Matei Zaharia|               1|[500, 250, 100]|
|  1|  Ph.D.|                EECS|UC Berkeley|  2|Michael Armbrust|               1|     [250, 100]|
+---+-------+--------------------+-----------+---+----------------+----------------+---------------+



## Spark SQL

In [41]:
avocadoprice.limit(5).toPandas().head()

Unnamed: 0,_c0,Date,AveragePrice,Total Volume,4046,4225,4770,Total Bags,Small Bags,Large Bags,XLarge Bags,type,year,region
0,0,2015-12-27,1.33,64236.62,1036.74,54454.85,48.16,8696.87,8603.62,93.25,0.0,conventional,2015,Albany
1,1,2015-12-20,1.35,54876.98,674.28,44638.81,58.33,9505.56,9408.07,97.49,0.0,conventional,2015,Albany
2,2,2015-12-13,0.93,118220.22,794.7,109149.67,130.5,8145.35,8042.21,103.14,0.0,conventional,2015,Albany
3,3,2015-12-06,1.08,78992.15,1132.0,71976.41,72.58,5811.16,5677.4,133.76,0.0,conventional,2015,Albany
4,4,2015-11-29,1.28,51039.6,941.48,43838.39,75.78,6183.95,5986.26,197.69,0.0,conventional,2015,Albany


In [51]:
spark.sql("""SELECT Date, sum(AveragePrice) FROM avocadopriceTable GROUP BY Date """).show()


+----------+---------------------------------+
|      Date|sum(CAST(AveragePrice AS DOUBLE))|
+----------+---------------------------------+
|2015-11-29|               145.03000000000003|
|2015-10-25|                           149.37|
|2015-10-11|               148.31999999999996|
|2015-08-02|               159.54999999999998|
|2015-07-26|               153.44999999999996|
|2015-05-24|               149.08999999999997|
|2015-02-22|               147.17000000000002|
|2016-11-27|               161.62000000000003|
|2016-11-20|               161.96999999999994|
|2016-10-30|               183.81999999999994|
|2016-10-16|               160.32000000000002|
|2016-10-09|               160.15999999999997|
|2016-09-25|                           165.72|
|2016-08-21|               152.89000000000001|
|2016-05-29|               134.55999999999995|
|2016-05-22|               132.26000000000005|
|2016-03-13|               129.28000000000003|
|2017-12-10|               144.88000000000005|
|2017-11-19| 