# Spark 101 Exercises

In [60]:
import pyspark

spark = pyspark.sql.SparkSession.builder.getOrCreate()

import pandas as pd
import numpy as np
from pydataset import data

from pyspark.sql.functions import sum, mean, concat, lit, regexp_extract, regexp_replace, when, asc, desc, col, expr
from pyspark.sql.functions import month, year, quarter

## 1. Create a spark data frame that contains your favorite programming languages.

- The name of the column should be language
- View the schema of the dataframe
- Output the shape of the dataframe
- Show the first 5 records in the dataframe

In [2]:
#First create a pandas dataframe
languages = ['Python', 'SQL', 'Spark', 'HTML', 'Julia', 'R', 'Java']
pandas_dataframe = pd.DataFrame({
    "language": languages
})

#Convert Dataframe to a Spark Onject
df = spark.createDataFrame(pandas_dataframe)

In [3]:
df

DataFrame[language: string]

In [4]:
df.describe().show()

+-------+--------+
|summary|language|
+-------+--------+
|  count|       7|
|   mean|    null|
| stddev|    null|
|    min|    HTML|
|    max|   Spark|
+-------+--------+



In [5]:
df.show(5)

+--------+
|language|
+--------+
|  Python|
|     SQL|
|   Spark|
|    HTML|
|   Julia|
+--------+
only showing top 5 rows



## 2. Load the mpg dataset as a spark dataframe.

- a.Create 1 column of output that contains a message like the one below:

```
The 1999 audi a4 has a 4 cylinder engine.

```
For each vehicle.
- b. Transform the trans column so that it only contains either manual or auto.

In [6]:
mpg = spark.createDataFrame(data("mpg"))

In [7]:
mpg.show(3)

+------------+-----+-----+----+---+----------+---+---+---+---+-------+
|manufacturer|model|displ|year|cyl|     trans|drv|cty|hwy| fl|  class|
+------------+-----+-----+----+---+----------+---+---+---+---+-------+
|        audi|   a4|  1.8|1999|  4|  auto(l5)|  f| 18| 29|  p|compact|
|        audi|   a4|  1.8|1999|  4|manual(m5)|  f| 21| 29|  p|compact|
|        audi|   a4|  2.0|2008|  4|manual(m6)|  f| 20| 31|  p|compact|
+------------+-----+-----+----+---+----------+---+---+---+---+-------+
only showing top 3 rows



In [9]:
#desc_col = mpg.manufacturer.The 1999 audi a4 has a 4 cylinder engine."
#mpg.select('*', expr(desc_col).alias('desc')).show(1)

In [10]:
mpg = mpg.select("*", mpg.trans, regexp_extract("trans", r"(.\w+)", 1).alias('simple_trans'))
mpg.show(5)

+------------+-----+-----+----+---+----------+---+---+---+---+-------+----------+------------+
|manufacturer|model|displ|year|cyl|     trans|drv|cty|hwy| fl|  class|     trans|simple_trans|
+------------+-----+-----+----+---+----------+---+---+---+---+-------+----------+------------+
|        audi|   a4|  1.8|1999|  4|  auto(l5)|  f| 18| 29|  p|compact|  auto(l5)|        auto|
|        audi|   a4|  1.8|1999|  4|manual(m5)|  f| 21| 29|  p|compact|manual(m5)|      manual|
|        audi|   a4|  2.0|2008|  4|manual(m6)|  f| 20| 31|  p|compact|manual(m6)|      manual|
|        audi|   a4|  2.0|2008|  4|  auto(av)|  f| 21| 30|  p|compact|  auto(av)|        auto|
|        audi|   a4|  2.8|1999|  6|  auto(l5)|  f| 16| 26|  p|compact|  auto(l5)|        auto|
+------------+-----+-----+----+---+----------+---+---+---+---+-------+----------+------------+
only showing top 5 rows



## 3. Load the tips dataset as a spark dataframe.

- What percentage of observations are smokers?
- Create a column that contains the tip percentage
- Calculate the average tip percentage for each combination of sex and smoker.

In [11]:
tips = spark.createDataFrame(data("tips"))
tips.show(3)

+----------+----+------+------+---+------+----+
|total_bill| tip|   sex|smoker|day|  time|size|
+----------+----+------+------+---+------+----+
|     16.99|1.01|Female|    No|Sun|Dinner|   2|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|
|     21.01| 3.5|  Male|    No|Sun|Dinner|   3|
+----------+----+------+------+---+------+----+
only showing top 3 rows



In [14]:
tips.createOrReplaceTempView("tips")

In [18]:
#What percentage of observations are smokers?
spark.sql(
    """
SELECT ((SELECT COUNT(smoker)
FROM tips
WHERE smoker = 'Yes') / 
(SELECT COUNT(smoker)
FROM tips)) as pct_smokers
FROM tips
"""
).show(1)

+-------------------+
|        pct_smokers|
+-------------------+
|0.38114754098360654|
+-------------------+
only showing top 1 row



In [19]:
#Create a column that contains the tip percentage
spark.sql(
    """
SELECT *, (tip / total_bill) as tip_pct
FROM tips
    """
).show(5)

+----------+----+------+------+---+------+----+-------------------+
|total_bill| tip|   sex|smoker|day|  time|size|            tip_pct|
+----------+----+------+------+---+------+----+-------------------+
|     16.99|1.01|Female|    No|Sun|Dinner|   2|0.05944673337257211|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|0.16054158607350097|
|     21.01| 3.5|  Male|    No|Sun|Dinner|   3|0.16658733936220846|
|     23.68|3.31|  Male|    No|Sun|Dinner|   2| 0.1397804054054054|
|     24.59|3.61|Female|    No|Sun|Dinner|   4|0.14680764538430255|
+----------+----+------+------+---+------+----+-------------------+
only showing top 5 rows



In [26]:
#Create a column with the tip_pct
tips = tips.select("*", (tips.tip/tips.total_bill).alias('tip_pct'))
tips.show(3)

+----------+----+------+------+---+------+----+-------------------+
|total_bill| tip|   sex|smoker|day|  time|size|            tip_pct|
+----------+----+------+------+---+------+----+-------------------+
|     16.99|1.01|Female|    No|Sun|Dinner|   2|0.05944673337257211|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|0.16054158607350097|
|     21.01| 3.5|  Male|    No|Sun|Dinner|   3|0.16658733936220846|
+----------+----+------+------+---+------+----+-------------------+
only showing top 3 rows



In [49]:
#Calculate the average tip percentage for each combination of sex and smoker.

#Using a groupb and pivot table to efficiently calculate these values
tips.groupby("sex").pivot('smoker').mean('tip_pct').show()

+------+------------------+-------------------+
|   sex|                No|                Yes|
+------+------------------+-------------------+
|Female|0.1569209707691836|0.18215035269941035|
|  Male|0.1606687151291298| 0.1527711752024851|
+------+------------------+-------------------+



## 4. Use the seattle weather dataset referenced in the lesson to answer the questions below.

In [47]:
from vega_datasets import data

weather = data.seattle_weather().assign(date=lambda df: df.date.astype(str))
weather = spark.createDataFrame(weather)
weather.show(6)

+----------+-------------+--------+--------+----+-------+
|      date|precipitation|temp_max|temp_min|wind|weather|
+----------+-------------+--------+--------+----+-------+
|2012-01-01|          0.0|    12.8|     5.0| 4.7|drizzle|
|2012-01-02|         10.9|    10.6|     2.8| 4.5|   rain|
|2012-01-03|          0.8|    11.7|     7.2| 2.3|   rain|
|2012-01-04|         20.3|    12.2|     5.6| 4.7|   rain|
|2012-01-05|          1.3|     8.9|     2.8| 6.1|   rain|
|2012-01-06|          2.5|     4.4|     2.2| 2.2|   rain|
+----------+-------------+--------+--------+----+-------+
only showing top 6 rows



- Convert the temperatures to farenheight.

In [59]:
weather = weather.withColumn('temp_max_F', ((weather.temp_max.cast('double') * 9 / 5) + 32)).drop('temp_max')
weather = weather.withColumn('temp_min_F', ((weather.temp_min.cast('double') * 9 / 5) + 32)).drop('temp_min')
weather.show(5)

+----------+-------------+----+-------+----------+----------+
|      date|precipitation|wind|weather|temp_max_F|temp_min_F|
+----------+-------------+----+-------+----------+----------+
|2012-01-01|          0.0| 4.7|drizzle|     55.04|      41.0|
|2012-01-02|         10.9| 4.5|   rain|     51.08|     37.04|
|2012-01-03|          0.8| 2.3|   rain|     53.06|     44.96|
|2012-01-04|         20.3| 4.7|   rain|     53.96|     42.08|
|2012-01-05|          1.3| 6.1|   rain|     48.02|     37.04|
+----------+-------------+----+-------+----------+----------+
only showing top 5 rows



Which month has the most rain, on average?

In [65]:
(weather.withColumn('month', month('date'))
    .groupBy('month')
    .agg(mean('precipitation').alias('avg_rainfall'))
    .sort(desc('avg_rainfall'))
    .show(1))

+-----+-----------------+
|month|     avg_rainfall|
+-----+-----------------+
|   11|5.354166666666667|
+-----+-----------------+
only showing top 1 row



- Which year was the windiest?

In [66]:
(weather.withColumn('year', year('date'))
     .groupBy('year')
     .agg(mean('wind').alias('avg_wind'))
     .sort(desc('avg_wind'))
     .show(1))

+----+------------------+
|year|          avg_wind|
+----+------------------+
|2012|3.4008196721311483|
+----+------------------+
only showing top 1 row



- What is the most frequent type of weather in January?

In [73]:
(weather.withColumn('month', month('date'))
     .crosstab('month', 'weather')
     .show())

+-------------+-------+---+----+----+---+
|month_weather|drizzle|fog|rain|snow|sun|
+-------------+-------+---+----+----+---+
|            5|      1| 25|  16|   0| 82|
|           10|      4| 55|  20|   0| 45|
|            1|     10| 38|  35|   8| 33|
|            6|      2| 14|  19|   0| 85|
|            9|      5| 40|   4|   0| 71|
|            2|      4| 36|  40|   3| 30|
|           12|      2| 54|  23|   5| 40|
|            7|      8| 13|  14|   0| 89|
|            3|      3| 36|  37|   6| 42|
|           11|      3| 50|  25|   0| 42|
|            8|      8| 16|   6|   0| 94|
|            4|      4| 34|  20|   1| 61|
+-------------+-------+---+----+----+---+



- What is the average high and low temperature on sunny days in July in 2013 and 2014?

In [107]:
(
weather.filter(((year('date') == 2013) | (year('date') == 2014)) & (month('date') == 7) & (weather.weather == 'sun'))
    .agg(mean(weather.temp_max_F).alias('avg_max_temp'), mean(weather.temp_min_F).alias('avg_min_temp'))
    .show()
)

+-----------------+-----------------+
|     avg_max_temp|     avg_min_temp|
+-----------------+-----------------+
|80.29192307692308|57.52884615384615|
+-----------------+-----------------+



- What percentage of days were rainy in q3 of 2015?

In [167]:
(
    (weather.withColumn('quarter', quarter("date"))
        .filter(year('date') == 2015)
        .groupBy('quarter')
        .pivot('weather')
        .count(mean('precipitation'))
        .sort('quarter')).select('quarter', 'rain').show()

)

TypeError: _api() takes 1 positional argument but 2 were given

In [115]:
q3_15.where(weather.weather == 'rain').count() / q3_15.count()

NameError: name 'count' is not defined

- For each year, find what percentage of days it rained (had non-zero precipitation).