# Exercises

Within your codeup-data-science directory, create a new repo named spark-exercises. This will be where you do your work for this module. Create a repository on GitHub with the same name, and link your local repository to GitHub.

- Save this work in your spark-exercises repo. Then add, commit, and push your changes.

- Create a Jupyter notebook or Python script named spark101 for this exercise.


In [5]:
import pyspark
# once per notebook
spark = pyspark.sql.SparkSession.builder.getOrCreate()

In [6]:
import multiprocessing
import pyspark
import pyspark.sql.functions as F
from pyspark.sql.functions import col, expr
from pyspark.sql.functions import sqrt
# Note: sum, min, and max imported here shadow the Python builtins of the same name.
from pyspark.sql.functions import concat, sum, avg, min, max, count, mean
from pyspark.sql.functions import when

1) Create a spark data frame that contains your favorite programming languages.
   - The name of the column should be language
   - View the schema of the dataframe
   - Output the shape of the dataframe
   - Show the first 5 records in the dataframe

In [7]:
spark.createDataFrame(["Python","Javascript","C++", "R", "Go", "Kotlin", "Swift", "PHP"], "string").toDF("language").printSchema()

root
 |-- language: string (nullable = true)



In [10]:
spark.createDataFrame(["Python","Javascript","C++", "R", "Go", "Kotlin", "Swift", "PHP"], "string").toDF("language")

DataFrame[language: string]

In [11]:
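# Keep the DataFrame in the variable; show() only prints and returns None.
favorite_programming_languages = spark.createDataFrame(["Python","Javascript","C++", "R", "Go", "Kotlin", "Swift", "PHP"], "string").toDF("language")
favorite_programming_languages.show(5)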

+----------+
|  language|
+----------+
|    Python|
|Javascript|
|       C++|
|         R|
|        Go|
+----------+
only showing top 5 rows
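
The "shape" bullet is not answered by the cells above. Spark DataFrames have no `.shape` attribute, so one possible approach is to pair `count()` with the length of `columns`. The helper name `spark_shape` is hypothetical (it is not part of PySpark), and this minimal sketch assumes its argument is a Spark DataFrame.

```python
def spark_shape(df):
    """Return (n_rows, n_columns) for a Spark DataFrame.

    Hypothetical helper: it relies only on DataFrame.count() and
    DataFrame.columns, both of which exist on pyspark.sql.DataFrame.
    """
    # count() triggers a Spark job; columns is only metadata.
    return (df.count(), len(df.columns))
```

Called on the eight-row, one-column languages DataFrame from `In [10]`, `spark_shape(...)` should return `(8, 1)`.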



2) Load the mpg dataset as a spark dataframe.

   - Create 1 column of output that contains a message like the one below:

        - The 1999 audi a4 has a 4 cylinder engine.

        - For each vehicle.

   - Transform the trans column so that it only contains either manual or auto.

In [12]:
from pydataset import data

In [13]:
mpg = spark.createDataFrame(data("mpg"))
mpg.show(5)

+------------+-----+-----+----+---+----------+---+---+---+---+-------+
|manufacturer|model|displ|year|cyl|     trans|drv|cty|hwy| fl|  class|
+------------+-----+-----+----+---+----------+---+---+---+---+-------+
|        audi|   a4|  1.8|1999|  4|  auto(l5)|  f| 18| 29|  p|compact|
|        audi|   a4|  1.8|1999|  4|manual(m5)|  f| 21| 29|  p|compact|
|        audi|   a4|  2.0|2008|  4|manual(m6)|  f| 20| 31|  p|compact|
|        audi|   a4|  2.0|2008|  4|  auto(av)|  f| 21| 30|  p|compact|
|        audi|   a4|  2.8|1999|  6|  auto(l5)|  f| 16| 26|  p|compact|
+------------+-----+-----+----+---+----------+---+---+---+---+-------+
only showing top 5 rows



In [16]:
from pyspark.sql.functions import lit

In [17]:
# The target sentence ends with a period, so it is included in the final literal.
mpg.select((concat(lit("The "),mpg.year,lit(" "), mpg.manufacturer,lit(" "), mpg.model, lit(" has a "), mpg.cyl, lit(" cylinder engine.")).alias("description"))).show(20)

+--------------------+
|         description|
+--------------------+
|The 1999 audi a4 ...|
|The 1999 audi a4 ...|
|The 2008 audi a4 ...|
|The 2008 audi a4 ...|
|The 1999 audi a4 ...|
|The 1999 audi a4 ...|
|The 2008 audi a4 ...|
|The 1999 audi a4 ...|
|The 1999 audi a4 ...|
|The 2008 audi a4 ...|
|The 2008 audi a4 ...|
|The 1999 audi a4 ...|
|The 1999 audi a4 ...|
|The 2008 audi a4 ...|
|The 2008 audi a4 ...|
|The 1999 audi a6 ...|
|The 2008 audi a6 ...|
|The 2008 audi a6 ...|
|The 2008 chevrole...|
|The 2008 chevrole...|
+--------------------+
only showing top 20 rows
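
Because `show()` truncates long strings by default, the output above does not reveal the full sentence. As a quick sanity check on the target format, here is the same concatenation in plain Python for the first row of `mpg`. The function name `describe_vehicle` is illustrative only.

```python
def describe_vehicle(year, manufacturer, model, cyl):
    """Mirror the Spark concat(...) expression for a single row."""
    return f"The {year} {manufacturer} {model} has a {cyl} cylinder engine."

# First row of the mpg output shown earlier: 1999 audi a4, 4 cylinders.
print(describe_vehicle(1999, "audi", "a4", 4))
```

In Spark, passing `truncate=False` to `show()` displays the full string for each row.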



In [18]:
mpg.select(mpg.trans, when(mpg.trans.startswith("a"), "auto").otherwise("manual").alias("transmission")).show(4)

+----------+------------+
|     trans|transmission|
+----------+------------+
|  auto(l5)|        auto|
|manual(m5)|      manual|
|manual(m6)|      manual|
|  auto(av)|        auto|
+----------+------------+
only showing top 4 rows
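
The `startswith("a")` rule labels every value that does not begin with "a" as manual. An alternative, sketched here in plain Python on the four `trans` values shown above, is to keep whatever precedes the opening parenthesis. The function name `simplify_trans` is hypothetical.

```python
def simplify_trans(value):
    """Keep the text before the first '(', e.g. 'auto(l5)' -> 'auto'."""
    return value.split("(", 1)[0]

# The four trans values visible in the output above.
samples = ["auto(l5)", "manual(m5)", "manual(m6)", "auto(av)"]
for raw in samples:
    print(raw, "->", simplify_trans(raw))
```

In Spark the same idea could be expressed with `regexp_extract` or `split` from `pyspark.sql.functions`; that translation is not part of the original notebook.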



3) Load the tips dataset as a spark dataframe.

   - What percentage of observations are smokers?
   - Create a column that contains the tip percentage
   - Calculate the average tip percentage for each combination of sex and smoker.


In [19]:
tips = spark.createDataFrame(data("tips"))
tips.show(5)

+----------+----+------+------+---+------+----+
|total_bill| tip|   sex|smoker|day|  time|size|
+----------+----+------+------+---+------+----+
|     16.99|1.01|Female|    No|Sun|Dinner|   2|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|
|     21.01| 3.5|  Male|    No|Sun|Dinner|   3|
|     23.68|3.31|  Male|    No|Sun|Dinner|   2|
|     24.59|3.61|Female|    No|Sun|Dinner|   4|
+----------+----+------+------+---+------+----+
only showing top 5 rows



In [20]:
(tips.filter(tips.smoker == 'Yes').count() / (tips.count())) * 100

38.114754098360656

In [23]:
tips = tips.select('*', (tips.tip / tips.total_bill).alias('tip_pct'))

In [24]:
tips.groupby('sex').pivot('smoker').mean('tip_pct').show(7)

+------+------------------+-------------------+
|   sex|                No|                Yes|
+------+------------------+-------------------+
|Female|0.1569209707691836|0.18215035269941032|
|  Male|0.1606687151291298|0.15277117520248512|
+------+------------------+-------------------+



4) Use the seattle weather dataset referenced in the lesson to answer the questions below.

   - Convert the temperatures to Fahrenheit.
   - Which month has the most rain, on average?
   - Which year was the windiest?
   - What is the most frequent type of weather in January?
   - What is the average high and low temperature on sunny days in July in 2013 and 2014?
   - What percentage of days were rainy in Q3 of 2015?
   - For each year, find what percentage of days it rained (had non-zero precipitation).

In [54]:
from vega_datasets import data

weather = data.seattle_weather().assign(date=lambda df: df.date.astype(str))
weather = spark.createDataFrame(weather)
weather.show(7)

+----------+-------------+--------+--------+----+-------+
|      date|precipitation|temp_max|temp_min|wind|weather|
+----------+-------------+--------+--------+----+-------+
|2012-01-01|          0.0|    12.8|     5.0| 4.7|drizzle|
|2012-01-02|         10.9|    10.6|     2.8| 4.5|   rain|
|2012-01-03|          0.8|    11.7|     7.2| 2.3|   rain|
|2012-01-04|         20.3|    12.2|     5.6| 4.7|   rain|
|2012-01-05|          1.3|     8.9|     2.8| 6.1|   rain|
|2012-01-06|          2.5|     4.4|     2.2| 2.2|   rain|
|2012-01-07|          0.0|     7.2|     2.8| 2.3|   rain|
+----------+-------------+--------+--------+----+-------+
only showing top 7 rows



In [46]:
temp_max_f = (col("temp_max") * 9/5) + 32

In [47]:
temp_min_f = (col("temp_min") * 9/5) + 32

In [48]:
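# Use the `weather` DataFrame loaded above, and do not assign the result of
# show(), which only prints and returns None.
weather.select(
    "*",
    weather.temp_max.alias("temp_max_c"),
    weather.temp_min.alias("temp_min_c"),
    temp_max_f.alias("temp_max_f"),
    temp_min_f.alias("temp_min_f"),
    weather.wind,
    weather.weather
).show(7)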

+-------------------+-------------+--------+--------+----+-------+----------+----------+----------+----------+----+-------+
|               date|precipitation|temp_max|temp_min|wind|weather|temp_max_c|temp_min_c|temp_max_f|temp_min_f|wind|weather|
+-------------------+-------------+--------+--------+----+-------+----------+----------+----------+----------+----+-------+
|2012-01-01 00:00:00|          0.0|    12.8|     5.0| 4.7|drizzle|      12.8|       5.0|     55.04|      41.0| 4.7|drizzle|
|2012-01-02 00:00:00|         10.9|    10.6|     2.8| 4.5|   rain|      10.6|       2.8|     51.08|     37.04| 4.5|   rain|
|2012-01-03 00:00:00|          0.8|    11.7|     7.2| 2.3|   rain|      11.7|       7.2|     53.06|     44.96| 2.3|   rain|
|2012-01-04 00:00:00|         20.3|    12.2|     5.6| 4.7|   rain|      12.2|       5.6|     53.96|     42.08| 4.7|   rain|
|2012-01-05 00:00:00|          1.3|     8.9|     2.8| 6.1|   rain|       8.9|       2.8|     48.02|     37.04| 6.1|   rain|
|2012-01

In [55]:
def c_to_f(celsius):
    f = (celsius * (9/5)) + 32 # Thx Matt
    return f

In [56]:
from pyspark.sql.functions import to_timestamp

In [57]:
from pyspark.sql.functions import month, year, quarter

In [58]:
weather.select(
    '*',
    c_to_f(weather.temp_max).alias('temp_max_f'),
    c_to_f(weather.temp_min).alias('temp_min_f')
).show(5)

+----------+-------------+--------+--------+----+-------+------------------+----------+
|      date|precipitation|temp_max|temp_min|wind|weather|        temp_max_f|temp_min_f|
+----------+-------------+--------+--------+----+-------+------------------+----------+
|2012-01-01|          0.0|    12.8|     5.0| 4.7|drizzle|55.040000000000006|      41.0|
|2012-01-02|         10.9|    10.6|     2.8| 4.5|   rain|             51.08|     37.04|
|2012-01-03|          0.8|    11.7|     7.2| 2.3|   rain|             53.06|     44.96|
|2012-01-04|         20.3|    12.2|     5.6| 4.7|   rain|             53.96|     42.08|
|2012-01-05|          1.3|     8.9|     2.8| 6.1|   rain|48.019999999999996|     37.04|
+----------+-------------+--------+--------+----+-------+------------------+----------+
only showing top 5 rows
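
The long decimals in `temp_max_f` (for example `55.040000000000006`) are ordinary floating-point rounding noise, not a bug in the conversion. A small plain-Python check of the same formula on the `temp_max` and `temp_min` values from the first row, rounded to two places:

```python
def c_to_f(celsius):
    """Convert degrees Celsius to degrees Fahrenheit."""
    return (celsius * (9 / 5)) + 32

# First row of the weather output: temp_max 12.8 C, temp_min 5.0 C.
print(round(c_to_f(12.8), 2))  # 55.04
print(round(c_to_f(5.0), 2))   # 41.0
```

Inside Spark, wrapping the column in `pyspark.sql.functions.round(..., 2)` should tidy the displayed values in the same way.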



In [61]:
(
    weather.withColumn("month", month("date"))
    .groupBy("month")
    .agg(sum("precipitation").alias("total_rainfall"))
    .sort("month")
    .show()
)

+-----+------------------+
|month|    total_rainfall|
+-----+------------------+
|    1|465.99999999999994|
|    2|             422.0|
|    3|             606.2|
|    4|             375.4|
|    5|             207.5|
|    6|             132.9|
|    7|              48.2|
|    8|             163.7|
|    9|235.49999999999997|
|   10|             503.4|
|   11|             642.5|
|   12| 622.7000000000002|
+-----+------------------+



In [62]:
(
    weather.withColumn("year", year("date"))
    .groupBy("year")
    .agg(mean("wind").alias("Average_Wind"))
    .sort("year")
    .show()
)

+----+------------------+
|year|      Average_Wind|
+----+------------------+
|2012| 3.400819672131148|
|2013|3.0158904109589058|
|2014| 3.387671232876714|
|2015| 3.159726027397261|
+----+------------------+



In [63]:
weather = weather.withColumn('month', month('date')).withColumn('year', year('date')).withColumn('quarter', quarter('date'))

In [64]:
weather.crosstab('month', 'weather').sort('month_weather', ascending=True).show()

+-------------+-------+---+----+----+---+
|month_weather|drizzle|fog|rain|snow|sun|
+-------------+-------+---+----+----+---+
|            1|     10| 38|  35|   8| 33|
|           10|      4| 55|  20|   0| 45|
|           11|      3| 50|  25|   0| 42|
|           12|      2| 54|  23|   5| 40|
|            2|      4| 36|  40|   3| 30|
|            3|      3| 36|  37|   6| 42|
|            4|      4| 34|  20|   1| 61|
|            5|      1| 25|  16|   0| 82|
|            6|      2| 14|  19|   0| 85|
|            7|      8| 13|  14|   0| 89|
|            8|      8| 16|   6|   0| 94|
|            9|      5| 40|   4|   0| 71|
+-------------+-------+---+----+----+---+



In [65]:
weather.filter(expr('(year == 2013 or year == 2014) and month == 07')).groupby('weather').pivot('year').mean('temp_min').show()


+-------+------------------+------------------+
|weather|              2013|              2014|
+-------+------------------+------------------+
|    fog|13.133333333333335|14.440000000000001|
|   rain|              15.0|              15.0|
|    sun|13.981481481481483|14.400000000000002|
+-------+------------------+------------------+



In [66]:
weather.filter(expr('(year == 2013 or year == 2014) and month == 07')).groupby('weather').pivot('year').mean('temp_max').show()

+-------+------------------+------------------+
|weather|              2013|              2014|
+-------+------------------+------------------+
|    fog| 22.96666666666667|25.439999999999998|
|   rain|              22.2|              29.4|
|    sun|26.585185185185193|            27.092|
+-------+------------------+------------------+



In [67]:
q3_15 = weather.filter(expr('year == 2015 and quarter == 3'))

In [68]:
q3_15.where(weather.weather == 'rain').count() / q3_15.count()

0.021739130434782608

In [69]:
(weather.filter(expr('year == 2012')).filter(expr('precipitation > 0')).count()) / (weather.filter(expr('year == 2012')).count())

0.48360655737704916

In [70]:
(weather.filter(expr('year == 2013')).filter(expr('precipitation > 0')).count()) / (weather.filter(expr('year == 2013')).count())

0.41643835616438357

In [71]:
(weather.filter(expr('year == 2014')).filter(expr('precipitation > 0')).count()) / (weather.filter(expr('year == 2014')).count())

0.410958904109589

In [72]:
(weather.filter(expr('year == 2015')).filter(expr('precipitation > 0')).count()) / (weather.filter(expr('year == 2015')).count())

0.39452054794520547
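
The four cells above repeat one calculation per year. The rainy-day share is the mean of a 0/1 "it rained" flag within each group, so a single grouped aggregation would cover every year at once. Below is a plain-Python sketch of that idea. It uses only the seven 2012 rows printed in the weather preview, so the result describes that sample and not the full year. The function name `rainy_share_by_year` is hypothetical.

```python
from collections import defaultdict

def rainy_share_by_year(rows):
    """rows: iterable of (date_string, precipitation) pairs.

    Returns {year: fraction of rows with non-zero precipitation}.
    """
    rainy = defaultdict(int)
    total = defaultdict(int)
    for date, precipitation in rows:
        year = int(date[:4])                # 'YYYY-MM-DD' -> YYYY
        total[year] += 1
        rainy[year] += precipitation > 0    # True counts as 1
    return {year: rainy[year] / total[year] for year in total}

# The seven rows shown by weather.show(7) earlier in the notebook.
sample = [
    ("2012-01-01", 0.0), ("2012-01-02", 10.9), ("2012-01-03", 0.8),
    ("2012-01-04", 20.3), ("2012-01-05", 1.3), ("2012-01-06", 2.5),
    ("2012-01-07", 0.0),
]
print(rainy_share_by_year(sample))
```

In Spark, one way to express the same thing would be `weather.groupBy("year").agg(F.mean((F.col("precipitation") > 0).cast("int")))`, assuming the `year` column added in `In [63]`.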