# Spark Exercises

## Exercise 1:

Create a Jupyter notebook or Python script named `spark101` for this exercise.

Create a Spark DataFrame that contains your favorite programming languages.

- Create a DataFrame with one column named `language`
> Hint: Start with a pandas DataFrame. Maybe use a dictionary?
- View the schema of the DataFrame
- Output the shape of the DataFrame
- Show the first 5 records in the DataFrame

In [2]:
import pyspark

spark = pyspark.sql.SparkSession.builder.getOrCreate()

In [3]:
import pandas as pd

pandas_dataframe = pd.DataFrame(
    {'languages':['python','c++','c','java','javascript','typescript','ruby','r','scala']}
)
pandas_dataframe

    languages
0      python
1         c++
2           c
3        java
4  javascript
5  typescript
6        ruby
7           r
8       scala


In [4]:
df = spark.createDataFrame(pandas_dataframe)
df.show(5)

+----------+
| languages|
+----------+
|    python|
|       c++|
|         c|
|      java|
|javascript|
+----------+
only showing top 5 rows



In [5]:
df.describe().show()

+-------+----------+
|summary| languages|
+-------+----------+
|  count|         9|
|   mean|      null|
| stddev|      null|
|    min|         c|
|    max|typescript|
+-------+----------+



In [6]:
print("DataFrame shape: ", df.count(), " x ", len(df.columns))

DataFrame shape:  9  x  1


## Exercise 2:

Load the `mpg` dataset as a Spark DataFrame.

a. Create one column of output that contains a message like the one below for each record:

    The 1999 audi a4 has a 4 cylinder engine.

> Hint: You will need to concatenate values that already exist in the data with string literals

b. Transform the `trans` column so that it contains only `manual` or `auto`.

> Hint: Consider Spark string methods and `when().otherwise()` chaining

In [7]:
from pydataset import data

mpg = spark.createDataFrame(data("mpg"))
mpg.show(5)

+------------+-----+-----+----+---+----------+---+---+---+---+-------+
|manufacturer|model|displ|year|cyl|     trans|drv|cty|hwy| fl|  class|
+------------+-----+-----+----+---+----------+---+---+---+---+-------+
|        audi|   a4|  1.8|1999|  4|  auto(l5)|  f| 18| 29|  p|compact|
|        audi|   a4|  1.8|1999|  4|manual(m5)|  f| 21| 29|  p|compact|
|        audi|   a4|  2.0|2008|  4|manual(m6)|  f| 20| 31|  p|compact|
|        audi|   a4|  2.0|2008|  4|  auto(av)|  f| 21| 30|  p|compact|
|        audi|   a4|  2.8|1999|  6|  auto(l5)|  f| 16| 26|  p|compact|
+------------+-----+-----+----+---+----------+---+---+---+---+-------+
only showing top 5 rows



In [8]:
# Wildcard import for brevity; it shadows Python builtins such as sum and round
from pyspark.sql.functions import *

mpg.select(
    concat(
        lit("The "),
        col("year"),
        lit(" "),
        col("manufacturer"),
        lit(" "),
        col("model"),
        lit(" has a "),
        col("cyl"),
        lit(" cylinder engine."),
    ).alias("vehicle_cylinder_desc")
).show(truncate=False)

+--------------------------------------------------------------+
|vehicle_cylinder_desc                                         |
+--------------------------------------------------------------+
|The 1999 audi a4 has a 4 cylinder engine.                     |
|The 1999 audi a4 has a 4 cylinder engine.                     |
|The 2008 audi a4 has a 4 cylinder engine.                     |
|The 2008 audi a4 has a 4 cylinder engine.                     |
|The 1999 audi a4 has a 6 cylinder engine.                     |
|The 1999 audi a4 has a 6 cylinder engine.                     |
|The 2008 audi a4 has a 6 cylinder engine.                     |
|The 1999 audi a4 quattro has a 4 cylinder engine.             |
|The 1999 audi a4 quattro has a 4 cylinder engine.             |
|The 2008 audi a4 quattro has a 4 cylinder engine.             |
|The 2008 audi a4 quattro has a 4 cylinder engine.             |
|The 1999 audi a4 quattro has a 6 cylinder engine.             |
...

In [9]:
mpg.select(
    'trans',
    regexp_extract("trans", r"^(\w+)", 1).alias("regexp_extract"),
    regexp_replace("trans", r"\(.+$", "").alias("regexp_replace"),
    when(mpg.trans.like("a%"), "auto").otherwise("manual").alias("when + like"),
).show()

+----------+--------------+--------------+-----------+
|     trans|regexp_extract|regexp_replace|when + like|
+----------+--------------+--------------+-----------+
|  auto(l5)|          auto|          auto|       auto|
|manual(m5)|        manual|        manual|     manual|
|manual(m6)|        manual|        manual|     manual|
|  auto(av)|          auto|          auto|       auto|
|  auto(l5)|          auto|          auto|       auto|
|manual(m5)|        manual|        manual|     manual|
|  auto(av)|          auto|          auto|       auto|
|manual(m5)|        manual|        manual|     manual|
|  auto(l5)|          auto|          auto|       auto|
|manual(m6)|        manual|        manual|     manual|
|  auto(s6)|          auto|          auto|       auto|
|  auto(l5)|          auto|          auto|       auto|
|manual(m5)|        manual|        manual|     manual|
|  auto(s6)|          auto|          auto|       auto|
|manual(m6)|        manual|        manual|     manual|
...

## Exercise 3: 

Load the `tips` dataset as a Spark DataFrame.

a. What percentage of observations are smokers?
> Hint: `.groupBy()` and `.withColumn()` are useful functions here

b. Create a column that contains the tip percentage
> Hint: `.withColumn()` is useful here

c. Calculate the average tip percentage for each combination of sex and smoker.
> Hint: Chain additional methods onto your answer to part b

In [10]:
tips = spark.createDataFrame(data("tips"))

In [11]:
(
    tips.groupBy("smoker")
    .count()
    .withColumn("percent", round(col("count") / tips.count() * 100, 0))
    .show()
)

+------+-----+-------+
|smoker|count|percent|
+------+-----+-------+
|    No|  151|   62.0|
|   Yes|   93|   38.0|
+------+-----+-------+



In [12]:
# tip / total_bill is a ratio (0.15 means a 15% tip)
tips.withColumn("tip_percentage", col('tip') / col('total_bill')).show()

+----------+----+------+------+---+------+----+-------------------+
|total_bill| tip|   sex|smoker|day|  time|size|     tip_percentage|
+----------+----+------+------+---+------+----+-------------------+
|     16.99|1.01|Female|    No|Sun|Dinner|   2|0.05944673337257211|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|0.16054158607350097|
|     21.01| 3.5|  Male|    No|Sun|Dinner|   3|0.16658733936220846|
|     23.68|3.31|  Male|    No|Sun|Dinner|   2| 0.1397804054054054|
|     24.59|3.61|Female|    No|Sun|Dinner|   4|0.14680764538430255|
|     25.29|4.71|  Male|    No|Sun|Dinner|   4|0.18623962040332148|
|      8.77| 2.0|  Male|    No|Sun|Dinner|   2|0.22805017103762829|
|     26.88|3.12|  Male|    No|Sun|Dinner|   4|0.11607142857142858|
|     15.04|1.96|  Male|    No|Sun|Dinner|   2|0.13031914893617022|
|     14.78|3.23|  Male|    No|Sun|Dinner|   2| 0.2185385656292287|
|     10.27|1.71|  Male|    No|Sun|Dinner|   2| 0.1665043816942551|
...

In [13]:
(
    tips.withColumn("tip_percentage", col('tip') / col('total_bill'))
    .groupby("sex")
    .pivot("smoker")
    .agg(mean("tip_percentage"))
    .show()
)

+------+------------------+-------------------+
|   sex|                No|                Yes|
+------+------------------+-------------------+
|Female|0.1569209707691836|0.18215035269941032|
|  Male|0.1606687151291298|0.15277117520248512|
+------+------------------+-------------------+



## Exercise 4:

Use the Seattle weather dataset referenced in the lesson to answer the questions below.

- Convert the temperatures to Fahrenheit.
- Which month has the most rain, on average?
- Which year was the windiest?
- What is the most frequent type of weather in January?
- What are the average high and low temperatures on sunny days in July in 2013 and 2014?
- What percentage of days were rainy in Q3 of 2015?
- For each year, find what percentage of days it rained (had non-zero precipitation).

In [27]:
from vega_datasets import data
weather = data.seattle_weather().assign(date=lambda df: df.date.astype(str))
weather = spark.createDataFrame(weather)
weather.show(5)

+----------+-------------+--------+--------+----+-------+
|      date|precipitation|temp_max|temp_min|wind|weather|
+----------+-------------+--------+--------+----+-------+
|2012-01-01|          0.0|    12.8|     5.0| 4.7|drizzle|
|2012-01-02|         10.9|    10.6|     2.8| 4.5|   rain|
|2012-01-03|          0.8|    11.7|     7.2| 2.3|   rain|
|2012-01-04|         20.3|    12.2|     5.6| 4.7|   rain|
|2012-01-05|          1.3|     8.9|     2.8| 6.1|   rain|
+----------+-------------+--------+--------+----+-------+
only showing top 5 rows



In [72]:
# Celsius to Fahrenheit: F = C * 9 / 5 + 32
(
    weather.withColumn('temp_max', col('temp_max') * 9 / 5 + 32)
    .withColumn('temp_min', col('temp_min') * 9 / 5 + 32)
    .show()
)

+----------+-------------+------------------+-----------------+----+-------+
|      date|precipitation|          temp_max|         temp_min|wind|weather|
+----------+-------------+------------------+-----------------+----+-------+
|2012-01-01|          0.0|           131.072|            105.8| 4.7|drizzle|
|2012-01-02|         10.9|123.94399999999999|           98.672| 4.5|   rain|
|2012-01-03|          0.8|127.50800000000001|          112.928| 2.3|   rain|
|2012-01-04|         20.3|           129.128|          107.744| 4.7|   rain|
|2012-01-05|          1.3|           118.436|           98.672| 6.1|   rain|
|2012-01-06|          2.5|103.85600000000001|           96.728| 2.2|   rain|
|2012-01-07|          0.0|           112.928|           98.672| 2.3|   rain|
|2012-01-08|          0.0|             122.0|           98.672| 2.0|    sun|
|2012-01-09|          4.3|120.05600000000001|            105.8| 3.4|   rain|
|2012-01-10|          1.0|           109.364|           91.544| 3.4|   rain|

In [45]:
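(
    weather.withColumn('month', month('date'))
    .groupBy('month')
    .agg(sum('precipitation').alias('total_rainfall'))
    # Each month now has exactly one row, so this second groupBy/mean leaves the
    # totals unchanged: the values are total precipitation per calendar month over
    # 2012-2015. Every month occurs in all four years, so dividing by 4 gives the
    # per-year average without changing the ranking (November is the wettest).
    .groupBy('month')
    .agg(round(mean("total_rainfall")).alias("average_rainfall"))
    .sort("month")
    .show()
)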

+-----+----------------+
|month|average_rainfall|
+-----+----------------+
|    1|           466.0|
|    2|           422.0|
|    3|           606.0|
|    4|           375.0|
|    5|           208.0|
|    6|           133.0|
|    7|            48.0|
|    8|           164.0|
|    9|           235.0|
|   10|           503.0|
|   11|           643.0|
|   12|           623.0|
+-----+----------------+



In [24]:
(
    weather.withColumn("year", year("date"))
    .groupBy("year")
    .agg(mean("wind").alias("average_wind"))
    .sort("year")
    .show()
)

+----+------------------+
|year|      average_wind|
+----+------------------+
|2012| 3.400819672131148|
|2013|3.0158904109589058|
|2014| 3.387671232876714|
|2015| 3.159726027397261|
+----+------------------+



In [48]:
(
    # Temperatures here are still in degrees Celsius
    weather.filter(month('date') == 7)
    .filter(year('date') > 2012)
    .filter(year('date') < 2015)
    .filter(col('weather') == lit('sun'))
    .agg(
        avg('temp_max').alias('average_high_temp'),
        avg('temp_min').alias('average_low_temp'),
    )
    .show()
)

+------------------+-----------------+
| average_high_temp| average_low_temp|
+------------------+-----------------+
|26.828846153846158|14.18269230769231|
+------------------+-----------------+



In [54]:
(
    weather.filter(year('date') == 2015)
    .filter(quarter('date') == 3)
    # 1 for days labelled 'rain', else 0; the mean is the fraction of rainy days
    .select(when(col('weather') == 'rain', 1).otherwise(0).alias('rain'))
    .agg(mean('rain'))
    .show()
)

+--------------------+
|           avg(rain)|
+--------------------+
|0.021739130434782608|
+--------------------+



In [55]:
(
    weather.withColumn('year', year('date'))
    # 1 when any precipitation fell that day, else 0
    .select(when(col('precipitation') > 0, 1).otherwise(0).alias('rain'), 'year')
    .groupby('year')
    .agg(mean('rain'))
    .sort('year')
    .show()
)

+----+-------------------+
|year|          avg(rain)|
+----+-------------------+
|2012|0.48360655737704916|
|2013|0.41643835616438357|
|2014|  0.410958904109589|
|2015|0.39452054794520547|
+----+-------------------+

