# Spark Exercises

### 1.) Create a spark data frame that contains your favorite programming languages.

- The name of the column should be language
- View the schema of the dataframe
- Output the shape of the dataframe
- Show the first 5 records in the dataframe

In [1]:
import pyspark

import pandas as pd
import numpy as np

In [2]:
spark = pyspark.sql.SparkSession.builder.getOrCreate()

In [3]:
np.random.seed(456)

pandas_dataframe = pd.DataFrame(
    dict(language=np.random.choice(("Java","Python", "HTML", "C++",".Net"), 20))
)
pandas_dataframe

Unnamed: 0,language
0,C++
1,C++
2,Python
3,HTML
4,.Net
5,HTML
6,.Net
7,.Net
8,.Net
9,Python


In [4]:
df = spark.createDataFrame(pandas_dataframe)
df

DataFrame[language: string]

In [5]:
df.show(5)

+--------+
|language|
+--------+
|     C++|
|     C++|
|  Python|
|    HTML|
|    .Net|
+--------+
only showing top 5 rows



In [6]:
df.describe()

DataFrame[summary: string, language: string]

In [7]:
df.describe().show()

+-------+--------+
|summary|language|
+-------+--------+
|  count|      20|
|   mean|    null|
| stddev|    null|
|    min|    .Net|
|    max|  Python|
+-------+--------+



In [8]:
len(df.columns), df.count()

(1, 20)

### 2.) Load the mpg dataset as a spark dataframe.

- Create 1 column of output that contains a message like the one below:


       - The 1999 audi a4 has a 4 cylinder engine.
       - For each vehicle.

- Transform the trans column so that it only contains either manual or auto.

In [9]:
from pydataset import data

mpg = spark.createDataFrame(data("mpg"))
mpg.show(5)

+------------+-----+-----+----+---+----------+---+---+---+---+-------+
|manufacturer|model|displ|year|cyl|     trans|drv|cty|hwy| fl|  class|
+------------+-----+-----+----+---+----------+---+---+---+---+-------+
|        audi|   a4|  1.8|1999|  4|  auto(l5)|  f| 18| 29|  p|compact|
|        audi|   a4|  1.8|1999|  4|manual(m5)|  f| 21| 29|  p|compact|
|        audi|   a4|  2.0|2008|  4|manual(m6)|  f| 20| 31|  p|compact|
|        audi|   a4|  2.0|2008|  4|  auto(av)|  f| 21| 30|  p|compact|
|        audi|   a4|  2.8|1999|  6|  auto(l5)|  f| 16| 26|  p|compact|
+------------+-----+-----+----+---+----------+---+---+---+---+-------+
only showing top 5 rows



In [10]:
from pyspark.sql.functions import *

In [11]:
mpg.select(concat(lit("The "), mpg.year,lit(" "),  mpg.manufacturer,lit(" "), mpg.model, lit(" has a "), mpg.cyl, lit(" cylinder engine.")).alias("Description")).show(5)

+--------------------+
|         Description|
+--------------------+
|The 1999 audi a4 ...|
|The 1999 audi a4 ...|
|The 2008 audi a4 ...|
|The 2008 audi a4 ...|
|The 1999 audi a4 ...|
+--------------------+
only showing top 5 rows



In [12]:
mpg.select(
    'trans',
    regexp_replace('trans', r'.{4}$', '').alias('trans')
).show(10)

+----------+------+
|     trans| trans|
+----------+------+
|  auto(l5)|  auto|
|manual(m5)|manual|
|manual(m6)|manual|
|  auto(av)|  auto|
|  auto(l5)|  auto|
|manual(m5)|manual|
|  auto(av)|  auto|
|manual(m5)|manual|
|  auto(l5)|  auto|
|manual(m6)|manual|
+----------+------+
only showing top 10 rows



### 3.) Load the tips dataset as a spark dataframe.

- What percentage of observations are smokers?
- Create a column that contains the tip percentage
- Calculate the average tip percentage for each combination of sex and smoker.

In [13]:
import pydataset
tips = pydataset.data('tips')
tips.head()

Unnamed: 0,total_bill,tip,sex,smoker,day,time,size
1,16.99,1.01,Female,No,Sun,Dinner,2
2,10.34,1.66,Male,No,Sun,Dinner,3
3,21.01,3.5,Male,No,Sun,Dinner,3
4,23.68,3.31,Male,No,Sun,Dinner,2
5,24.59,3.61,Female,No,Sun,Dinner,4


In [14]:
tips = spark.createDataFrame(tips) # any pandas dataframe

In [15]:
tips.groupBy("smoker").count().withColumn(
    "percent",
    concat(round((col("count") / tips.count() * 100), 0).cast("int")),
).show()

+------+-----+-------+
|smoker|count|percent|
+------+-----+-------+
|    No|  151|     62|
|   Yes|   93|     38|
+------+-----+-------+



In [16]:
#Calculate tip percentage
tip_percentage = tips.tip / tips.total_bill

In [17]:
tip_percentage.alias('tip_perc')

Column<'(tip / total_bill) AS tip_perc'>

In [18]:
#Create column tip_perc
tips.select('*', tip_percentage.alias('tip_perc')).show()

+----------+----+------+------+---+------+----+-------------------+
|total_bill| tip|   sex|smoker|day|  time|size|           tip_perc|
+----------+----+------+------+---+------+----+-------------------+
|     16.99|1.01|Female|    No|Sun|Dinner|   2|0.05944673337257211|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|0.16054158607350097|
|     21.01| 3.5|  Male|    No|Sun|Dinner|   3|0.16658733936220846|
|     23.68|3.31|  Male|    No|Sun|Dinner|   2| 0.1397804054054054|
|     24.59|3.61|Female|    No|Sun|Dinner|   4|0.14680764538430255|
|     25.29|4.71|  Male|    No|Sun|Dinner|   4|0.18623962040332148|
|      8.77| 2.0|  Male|    No|Sun|Dinner|   2|0.22805017103762829|
|     26.88|3.12|  Male|    No|Sun|Dinner|   4|0.11607142857142858|
|     15.04|1.96|  Male|    No|Sun|Dinner|   2|0.13031914893617022|
|     14.78|3.23|  Male|    No|Sun|Dinner|   2| 0.2185385656292287|
|     10.27|1.71|  Male|    No|Sun|Dinner|   2| 0.1665043816942551|
|     35.26| 5.0|Female|    No|Sun|Dinner|   4|0

In [19]:
#Calculate the average tip percentage for each combination of sex and smoker.
(
    tips.withColumn("tip_percentage", col('tip') / col('total_bill'))
    .groupby("sex")
    .pivot("smoker")
    .agg(round(mean("tip_percentage"), 4))
    .show()
)

+------+------+------+
|   sex|    No|   Yes|
+------+------+------+
|Female|0.1569|0.1822|
|  Male|0.1607|0.1528|
+------+------+------+



### 4.) Use the seattle weather dataset referenced in the lesson to answer the questions below.

   - Convert the temperatures to fahrenheit.
   - Which month has the most rain, on average?
   - Which year was the windiest?
   - What is the most frequent type of weather in January?
   - What is the average high and low temperature on sunny days in July in 2013 and 2014?
   - What percentage of days were rainy in q3 of 2015?
   - For each year, find what percentage of days it rained (had non-zero precipitation).

In [21]:
from vega_datasets import data

weather = data.seattle_weather().assign(date=lambda df: df.date.astype(str))
weather = spark.createDataFrame(weather)
weather.show(6)


+----------+-------------+--------+--------+----+-------+
|      date|precipitation|temp_max|temp_min|wind|weather|
+----------+-------------+--------+--------+----+-------+
|2012-01-01|          0.0|    12.8|     5.0| 4.7|drizzle|
|2012-01-02|         10.9|    10.6|     2.8| 4.5|   rain|
|2012-01-03|          0.8|    11.7|     7.2| 2.3|   rain|
|2012-01-04|         20.3|    12.2|     5.6| 4.7|   rain|
|2012-01-05|          1.3|     8.9|     2.8| 6.1|   rain|
|2012-01-06|          2.5|     4.4|     2.2| 2.2|   rain|
+----------+-------------+--------+--------+----+-------+
only showing top 6 rows



In [22]:
#Convert the temperatures to fahrenheit.

#To convert temperatures in degrees Celsius to Fahrenheit, multiply by 1.8 (or 9/5) and add 32

weather.withColumn(
    "temp_max", (col("temp_max") * 9 / 5 + 32)
).withColumn("temp_min", (col("temp_min") * 9 / 5 + 32)).show()

+----------+-------------+------------------+------------------+----+-------+
|      date|precipitation|          temp_max|          temp_min|wind|weather|
+----------+-------------+------------------+------------------+----+-------+
|2012-01-01|          0.0|             55.04|              41.0| 4.7|drizzle|
|2012-01-02|         10.9|             51.08|             37.04| 4.5|   rain|
|2012-01-03|          0.8|             53.06|             44.96| 2.3|   rain|
|2012-01-04|         20.3|             53.96|             42.08| 4.7|   rain|
|2012-01-05|          1.3|             48.02|             37.04| 6.1|   rain|
|2012-01-06|          2.5|             39.92|             35.96| 2.2|   rain|
|2012-01-07|          0.0|             44.96|             37.04| 2.3|   rain|
|2012-01-08|          0.0|              50.0|             37.04| 2.0|    sun|
|2012-01-09|          4.3|             48.92|              41.0| 3.4|   rain|
|2012-01-10|          1.0|42.980000000000004|             33.08|

In [23]:
# Which month has the most rain, on average?

from pyspark.sql.functions import month

(
    weather.withColumn("month", month("date"))
    .groupBy('month')
    .agg(sum("precipitation").alias("total_rainfall"))
    .sort("total_rainfall")
    .show()
)

+-----+------------------+
|month|    total_rainfall|
+-----+------------------+
|    7|              48.2|
|    6|             132.9|
|    8|             163.7|
|    5|             207.5|
|    9|235.49999999999997|
|    4|             375.4|
|    2|             422.0|
|    1|465.99999999999994|
|   10|             503.4|
|    3|             606.2|
|   12| 622.7000000000002|
|   11|             642.5|
+-----+------------------+



In [24]:
#Which year was the windiest?

(
    weather.withColumn("year", year("date"))
    .groupBy('year')
    .agg(mean("wind").alias("avg_wind"))
    .sort("avg_wind")
    .show()
)

+----+------------------+
|year|          avg_wind|
+----+------------------+
|2013|3.0158904109589058|
|2015| 3.159726027397261|
|2014| 3.387671232876714|
|2012| 3.400819672131148|
+----+------------------+



In [25]:
#What is the most frequent type of weather in January?

(
    weather.withColumn("month", month("date"))
    .filter(col("month") == 1)
    .groupBy("weather")
    .count()
    .sort(col("count").desc())
    .show()
)

+-------+-----+
|weather|count|
+-------+-----+
|    fog|   38|
|   rain|   35|
|    sun|   33|
|drizzle|   10|
|   snow|    8|
+-------+-----+



In [26]:
# What is the average high and low temperature on sunny days in July in 2013 and 2014?

(
    weather.filter(month("date") == 7)
    .filter(year("date") > 2012)
    .filter(year("date") < 2015)
    .filter(col("weather") == lit("sun"))
    .agg(
        avg("temp_max").alias("average_high_temp"),
        avg("temp_min").alias("average_low_temp"),
    )
    .show()
)

+------------------+-----------------+
| average_high_temp| average_low_temp|
+------------------+-----------------+
|26.828846153846158|14.18269230769231|
+------------------+-----------------+



In [27]:
weather.createOrReplaceTempView("weather")

In [28]:
spark.sql(
    """
SELECT avg(temp_max) AS avg_max_temp, avg(temp_min) AS avg_min_temp
FROM weather
WHERE date LIKE '2013-07%' 
OR  date LIKE '2014-07%'
AND weather = 'sun'
"""
).show(5)



+------------------+------------------+
|      avg_max_temp|      avg_min_temp|
+------------------+------------------+
|26.539285714285718|14.141071428571431|
+------------------+------------------+



In [29]:
#  What percentage of days were rainy in q3 of 2015?

# measure a rainy day by weather == rain
(
    weather.filter(year("date") == 2015)
    .filter(quarter("date") == 3)
    .select(when(col("weather") == "rain", 1).otherwise(0).alias("rain"))
    .agg(mean("rain"))
    .show()
)

+--------------------+
|           avg(rain)|
+--------------------+
|0.021739130434782608|
+--------------------+



In [30]:
#For each year, find what percentage of days it rained (had non-zero precipitation).

# measure a rainy day by precipitation > 0
(
    weather.withColumn("year", year("date"))
    .select(when(col("precipitation") > 0, 1).otherwise(0).alias("rain"), "year")
    .groupby("year")
    .agg(mean("rain"))
    .show()
)

+----+-------------------+
|year|          avg(rain)|
+----+-------------------+
|2012|0.48360655737704916|
|2013|0.41643835616438357|
|2014|  0.410958904109589|
|2015|0.39452054794520547|
+----+-------------------+

