# Spark 101 Exercises

In [18]:
import pandas as pd
import numpy as np
import pyspark
from pydataset import data
from pyspark.sql import functions as f

spark = pyspark.sql.SparkSession.builder.getOrCreate()

import warnings
warnings.filterwarnings('ignore')

### 1) Create a spark data frame that contains your favorite programming languages.

In [6]:
fav_lang = pd.DataFrame()
fav_lang['language'] = ['Python','SQL','C','R','Java','HTML']
df = spark.createDataFrame(fav_lang)

- The name of the column should be language

In [8]:
df.select(df.language).show()

                                                                                

+--------+
|language|
+--------+
|  Python|
|     SQL|
|       C|
|       R|
|    Java|
|    HTML|
+--------+



- View the schema of the dataframe

In [16]:
df.printSchema()

root
 |-- language: string (nullable = true)



- Output the shape of the dataframe

In [14]:
df.toPandas().shape

(6, 1)

- Show the first 5 records in the dataframe

In [15]:
df.show(5)

+--------+
|language|
+--------+
|  Python|
|     SQL|
|       C|
|       R|
|    Java|
+--------+
only showing top 5 rows



### 2) Load the mpg dataset as a spark dataframe.

In [20]:
mpg = spark.createDataFrame(data('mpg'))

- Create 1 column of output that contains a message like the one below:

For each vehicle.

In [28]:
mpg.select(
    concat(
        lit('The '),
        mpg.year,
        lit(' '),
        mpg.manufacturer,
        lit(' '),
        mpg.model,
        lit(' has a '),
        mpg.cyl,
        lit(' cyliner engine.')
    ).alias('Model Explanation')
).show(10, False)

+------------------------------------------------+
|Model Explanation                               |
+------------------------------------------------+
|The 1999 audi a4 has a 4 cyliner engine.        |
|The 1999 audi a4 has a 4 cyliner engine.        |
|The 2008 audi a4 has a 4 cyliner engine.        |
|The 2008 audi a4 has a 4 cyliner engine.        |
|The 1999 audi a4 has a 6 cyliner engine.        |
|The 1999 audi a4 has a 6 cyliner engine.        |
|The 2008 audi a4 has a 6 cyliner engine.        |
|The 1999 audi a4 quattro has a 4 cyliner engine.|
|The 1999 audi a4 quattro has a 4 cyliner engine.|
|The 2008 audi a4 quattro has a 4 cyliner engine.|
+------------------------------------------------+
only showing top 10 rows



- Transform the trans column so that it only contains either manual or auto.

In [40]:
mpg.select(
    mpg.trans,
    regexp_extract('trans', r'\w+',0).alias("manual_or_auto")
).show()

+----------+--------------+
|     trans|manual_or_auto|
+----------+--------------+
|  auto(l5)|          auto|
|manual(m5)|        manual|
|manual(m6)|        manual|
|  auto(av)|          auto|
|  auto(l5)|          auto|
|manual(m5)|        manual|
|  auto(av)|          auto|
|manual(m5)|        manual|
|  auto(l5)|          auto|
|manual(m6)|        manual|
|  auto(s6)|          auto|
|  auto(l5)|          auto|
|manual(m5)|        manual|
|  auto(s6)|          auto|
|manual(m6)|        manual|
|  auto(l5)|          auto|
|  auto(s6)|          auto|
|  auto(s6)|          auto|
|  auto(l4)|          auto|
|  auto(l4)|          auto|
+----------+--------------+
only showing top 20 rows



### 3) Load the tips dataset as a spark dataframe.

In [41]:
tips = spark.createDataFrame(data('tips'))

- What percentage of observations are smokers?

In [50]:
tips.agg(mean(when(tips.smoker == 'Yes',1).otherwise(0)).alias("Percent_smokers")).show()

+-------------------+
|    Percent_smokers|
+-------------------+
|0.38114754098360654|
+-------------------+



- Create a column that contains the tip percentage

In [59]:
tips.select(
    tips.tip,
    tips.total_bill,
    f.round(
        (tips.tip / tips.total_bill)
        ,2)
    .alias('tip_percentage')
).show()

+----+----------+--------------+
| tip|total_bill|tip_percentage|
+----+----------+--------------+
|1.01|     16.99|          0.06|
|1.66|     10.34|          0.16|
| 3.5|     21.01|          0.17|
|3.31|     23.68|          0.14|
|3.61|     24.59|          0.15|
|4.71|     25.29|          0.19|
| 2.0|      8.77|          0.23|
|3.12|     26.88|          0.12|
|1.96|     15.04|          0.13|
|3.23|     14.78|          0.22|
|1.71|     10.27|          0.17|
| 5.0|     35.26|          0.14|
|1.57|     15.42|           0.1|
| 3.0|     18.43|          0.16|
|3.02|     14.83|           0.2|
|3.92|     21.58|          0.18|
|1.67|     10.33|          0.16|
|3.71|     16.29|          0.23|
| 3.5|     16.97|          0.21|
|3.35|     20.65|          0.16|
+----+----------+--------------+
only showing top 20 rows



- Calculate the average tip percentage for each combination of sex and smoker.

In [77]:
tips.groupBy(tips.sex, tips.smoker).agg(
    f.round(avg(
            (tips.tip / tips.total_bill)
        ),2
           ).alias('tip_percentage')
).show()

+------+------+--------------+
|   sex|smoker|tip_percentage|
+------+------+--------------+
|  Male|    No|          0.16|
|Female|    No|          0.16|
|  Male|   Yes|          0.15|
|Female|   Yes|          0.18|
+------+------+--------------+



### 4) Use the seattle weather dataset referenced in the lesson to answer the questions below.

In [69]:
from vega_datasets import data as vega_data
sw = spark.createDataFrame(vega_data('seattle_weather'))

- Convert the temperatures to fahrenheit.

In [76]:
sw.select(
    f.round((sw.temp_max * (9/5) + 32),2).alias('temp_max_fahrenheit'),
    f.round((sw.temp_min * (9/5) + 32),2).alias('temp_min_fahrenheit')
).show()

+-------------------+-------------------+
|temp_max_fahrenheit|temp_min_fahrenheit|
+-------------------+-------------------+
|              55.04|               41.0|
|              51.08|              37.04|
|              53.06|              44.96|
|              53.96|              42.08|
|              48.02|              37.04|
|              39.92|              35.96|
|              44.96|              37.04|
|               50.0|              37.04|
|              48.92|               41.0|
|              42.98|              33.08|
|              42.98|              30.02|
|              42.98|              28.94|
|               41.0|              26.96|
|              39.92|              33.08|
|              33.98|              26.06|
|              35.06|              26.96|
|              37.94|               32.0|
|               32.0|              26.96|
|              30.02|              26.96|
|              44.96|              30.02|
+-------------------+-------------

- Which month has the most rain, on average?

In [94]:
sw.withColumn("month",f.month("date")
             ).groupBy("month").agg(
                                f.round(
                                    f.avg("precipitation"),2
                                        ).alias("Average_rainfall_per_month")
                                    ).sort('month').show()

+-----+--------------------------+
|month|Average_rainfall_per_month|
+-----+--------------------------+
|    1|                      3.76|
|    2|                      3.73|
|    3|                      4.89|
|    4|                      3.13|
|    5|                      1.67|
|    6|                      1.11|
|    7|                      0.39|
|    8|                      1.32|
|    9|                      1.96|
|   10|                      4.06|
|   11|                      5.35|
|   12|                      5.02|
+-----+--------------------------+



- Which year was the windiest?

In [104]:
sw.withColumn('year',f.year('date')).groupBy('year').agg(f.round(f.sum('wind'),2).alias('Total_wind')).sort(asc('year')).show(1)

+----+----------+
|year|Total_wind|
+----+----------+
|2012|    1244.7|
+----+----------+
only showing top 1 row



- What is the most frequent type of weather in January?

In [117]:
sw.withColumn('month',f.month('date')
             ).groupby('month','weather').agg(f.count('weather').alias('count')
                                             ).sort(asc('month'),desc('count')).show(1)

+-----+-------+-----+
|month|weather|count|
+-----+-------+-----+
|    1|    fog|   38|
+-----+-------+-----+
only showing top 1 row



- What is the average high and low temperature on sunny days in July in 2013 and 2014?

In [136]:
(sw
.filter(((f.year('date') == 2013) | (f.year('date') == 2014)) 
                                  & (f.month('date') == 4) 
                                  & (sw.weather == 'sun'))
.groupby(f.year('date').alias('year'),
         f.month('date').alias('month'),
         'weather')
.agg(f.round(f.avg('temp_min'),2).alias('avg_temp_min'),
     f.round(f.avg('temp_max'),2).alias('avg_temp_max')
    )
).show()

+----+-----+-------+------------+------------+
|year|month|weather|avg_temp_min|avg_temp_max|
+----+-----+-------+------------+------------+
|2013|    4|    sun|        6.79|        15.6|
|2014|    4|    sun|        6.65|        16.8|
+----+-----+-------+------------+------------+



- What percentage of days were rainy in q3 of 2015?

In [187]:
(sw.filter(
    (f.year('date') == 2015) & (f.quarter('date') == 3))
    .withColumn('rain' ,(sw.weather == 'rain').cast('int'))
    .withColumn('drizzle' ,(sw.weather == 'drizzle').cast('int'))
 .groupby(f.year('date'), 
          f.quarter('date'))
 .agg(f.avg('rain') + f.avg('drizzle'))
 .show()
)

+----------+-------------+--------------------------+
|year(date)|quarter(date)|(avg(rain) + avg(drizzle))|
+----------+-------------+--------------------------+
|      2015|            3|       0.07608695652173914|
+----------+-------------+--------------------------+



# For each year, find what percentage of days it rained (had non-zero precipitation).

### If Fog, drizzle, snow and rain all count as non-zero percipiation

In [216]:
(sw.withColumn('rain' ,(sw.weather == 'rain').cast('int'))
   .withColumn('drizzle' ,(sw.weather == 'drizzle').cast('int'))
   .withColumn('fog' ,(sw.weather == 'fog').cast('int'))
   .withColumn('snow' ,(sw.weather == 'snow').cast('int'))
   .groupBy(f.year('date'))
   .agg(f.avg('rain') + f.avg('drizzle') + f.avg('fog') + f.avg('snow'))
   .show()
)

+----------+-----------------------------------------------------+
|year(date)|(((avg(rain) + avg(drizzle)) + avg(fog)) + avg(snow))|
+----------+-----------------------------------------------------+
|      2012|                                   0.6775956284153005|
|      2013|                                   0.4383561643835616|
|      2014|                                   0.4219178082191781|
|      2015|                                   0.5068493150684932|
+----------+-----------------------------------------------------+



### Actual Problem

In [226]:
(sw
 .withColumn('non_zero',
             (sw.precipitation > 0).cast('int'))
 .groupby(f.year('date'))
 .agg(f.avg('non_zero'))
).show()

[Stage 331:>                                                        (0 + 8) / 8]

+----------+-------------------+
|year(date)|      avg(non_zero)|
+----------+-------------------+
|      2012|0.48360655737704916|
|      2013|0.41643835616438357|
|      2014|  0.410958904109589|
|      2015|0.39452054794520547|
+----------+-------------------+



                                                                                