# Spark 101

## Import Libraries

In [5]:
import pandas as pd
import numpy as np
import pyspark #Import Spark
spark = pyspark.sql.SparkSession.builder.getOrCreate()

# Problem 1

Create a spark data frame that contains your favorite programming languages.

 - The name of the column should be `language`
 - View the schema of the dataframe
 - Output the shape of the dataframe
 - Show the first 5 records in the dataframe

In [12]:
np.random.seed(13)

df = spark.createDataFrame(pd.DataFrame({
    "language": np.random.choice(["Python", "Java", "Javascript", "SQL" ,"C++", "Scala"], 40)

}))

In [13]:
df.show()

+----------+
|  language|
+----------+
|Javascript|
|    Python|
|Javascript|
|    Python|
|Javascript|
|       C++|
|      Java|
|       C++|
|Javascript|
|       SQL|
|Javascript|
|       C++|
|Javascript|
|     Scala|
|     Scala|
|Javascript|
|      Java|
|       SQL|
|       C++|
|Javascript|
+----------+
only showing top 20 rows



In [15]:
df.printSchema()

root
 |-- language: string (nullable = true)



In [25]:
print(f"The shape of the DataFrame is (" + str(df.count()) + ", " + str(len(df.columns)) + ").")

The shape of the DataFrame is (40, 1).


In [26]:
df.show(5)

+----------+
|  language|
+----------+
|Javascript|
|    Python|
|Javascript|
|    Python|
|Javascript|
+----------+
only showing top 5 rows



## Problem 2

Load the mpg dataset as a spark dataframe.

 - Create 1 column of output that contains a message like the one below:

    `The 1999 audi a4 has a 4 cylinder engine.`
    
    For each vehicle.
    
    

 - Transform the trans column so that it only contains either manual or auto.

In [27]:
from pydataset import data

mpg = spark.createDataFrame(data("mpg"))
mpg.show(5)

+------------+-----+-----+----+---+----------+---+---+---+---+-------+
|manufacturer|model|displ|year|cyl|     trans|drv|cty|hwy| fl|  class|
+------------+-----+-----+----+---+----------+---+---+---+---+-------+
|        audi|   a4|  1.8|1999|  4|  auto(l5)|  f| 18| 29|  p|compact|
|        audi|   a4|  1.8|1999|  4|manual(m5)|  f| 21| 29|  p|compact|
|        audi|   a4|  2.0|2008|  4|manual(m6)|  f| 20| 31|  p|compact|
|        audi|   a4|  2.0|2008|  4|  auto(av)|  f| 21| 30|  p|compact|
|        audi|   a4|  2.8|1999|  6|  auto(l5)|  f| 16| 26|  p|compact|
+------------+-----+-----+----+---+----------+---+---+---+---+-------+
only showing top 5 rows



In [32]:
from  pyspark.sql.functions import concat, lit

In [42]:
mpg.select(concat(lit('The '), 'year', lit(" "), 'manufacturer', lit(" "), 'model', lit(' has a '), 'cyl', lit(' cylinder engine.')).alias("Description")).show(20, False)

+--------------------------------------------------------------+
|Description                                                   |
+--------------------------------------------------------------+
|The 1999 audi a4 has a 4 cylinder engine.                     |
|The 1999 audi a4 has a 4 cylinder engine.                     |
|The 2008 audi a4 has a 4 cylinder engine.                     |
|The 2008 audi a4 has a 4 cylinder engine.                     |
|The 1999 audi a4 has a 6 cylinder engine.                     |
|The 1999 audi a4 has a 6 cylinder engine.                     |
|The 2008 audi a4 has a 6 cylinder engine.                     |
|The 1999 audi a4 quattro has a 4 cylinder engine.             |
|The 1999 audi a4 quattro has a 4 cylinder engine.             |
|The 2008 audi a4 quattro has a 4 cylinder engine.             |
|The 2008 audi a4 quattro has a 4 cylinder engine.             |
|The 1999 audi a4 quattro has a 6 cylinder engine.             |
|The 1999 audi a4 quattro

In [43]:
mpg.select('trans').show(20)

+----------+
|     trans|
+----------+
|  auto(l5)|
|manual(m5)|
|manual(m6)|
|  auto(av)|
|  auto(l5)|
|manual(m5)|
|  auto(av)|
|manual(m5)|
|  auto(l5)|
|manual(m6)|
|  auto(s6)|
|  auto(l5)|
|manual(m5)|
|  auto(s6)|
|manual(m6)|
|  auto(l5)|
|  auto(s6)|
|  auto(s6)|
|  auto(l4)|
|  auto(l4)|
+----------+
only showing top 20 rows



In [45]:
from pyspark.sql.functions import regexp_extract, regexp_replace

In [50]:
mpg.select(
    regexp_replace("trans", r"\([^)]*\)", "").alias("transition_clean"),
).show(truncate=False)

+----------------+
|transition_clean|
+----------------+
|auto            |
|manual          |
|manual          |
|auto            |
|auto            |
|manual          |
|auto            |
|manual          |
|auto            |
|manual          |
|auto            |
|auto            |
|manual          |
|auto            |
|manual          |
|auto            |
|auto            |
|auto            |
|auto            |
|auto            |
+----------------+
only showing top 20 rows



## Problem 3 

Load the tips dataset as a spark dataframe.

 - What percentage of observations are smokers?
 - Create a column that contains the tip percentage
 - Calculate the average tip percentage for each combination of sex and smoker.

In [53]:
tips = data("tips")
tips = spark.createDataFrame(tips)

In [54]:
tips.show()

+----------+----+------+------+---+------+----+
|total_bill| tip|   sex|smoker|day|  time|size|
+----------+----+------+------+---+------+----+
|     16.99|1.01|Female|    No|Sun|Dinner|   2|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|
|     21.01| 3.5|  Male|    No|Sun|Dinner|   3|
|     23.68|3.31|  Male|    No|Sun|Dinner|   2|
|     24.59|3.61|Female|    No|Sun|Dinner|   4|
|     25.29|4.71|  Male|    No|Sun|Dinner|   4|
|      8.77| 2.0|  Male|    No|Sun|Dinner|   2|
|     26.88|3.12|  Male|    No|Sun|Dinner|   4|
|     15.04|1.96|  Male|    No|Sun|Dinner|   2|
|     14.78|3.23|  Male|    No|Sun|Dinner|   2|
|     10.27|1.71|  Male|    No|Sun|Dinner|   2|
|     35.26| 5.0|Female|    No|Sun|Dinner|   4|
|     15.42|1.57|  Male|    No|Sun|Dinner|   2|
|     18.43| 3.0|  Male|    No|Sun|Dinner|   4|
|     14.83|3.02|Female|    No|Sun|Dinner|   2|
|     21.58|3.92|  Male|    No|Sun|Dinner|   2|
|     10.33|1.67|Female|    No|Sun|Dinner|   3|
|     16.29|3.71|  Male|    No|Sun|Dinne

In [60]:
total_smokers = tips.select("smoker").where(tips.smoker == "Yes").count()
total_smokers

93

In [62]:
total_population = tips.count()
total_population

244

In [63]:
percent_smokers = total_smokers / total_population
percent_smokers

0.38114754098360654

In [None]:
print 'Value is "{}"'.format(value)

In [75]:
print('In our population {}% are smokers.'.format(round(percent_smokers*100, 2)))

In our population 38.11% are smokers.


In [None]:
mpg.groupBy("cyl", "class").agg(avg(mpg.cty), avg(mpg.hwy)).show()

In [77]:
from pyspark.sql.functions import avg

In [93]:
tips.groupBy("sex", "smoker").agg(avg(tips.tip / tips.total_bill).alias("Average Tip Percentage")).show()

+------+------+----------------------+
|   sex|smoker|Average Tip Percentage|
+------+------+----------------------+
|  Male|    No|    0.1606687151291298|
|Female|    No|    0.1569209707691836|
|  Male|   Yes|    0.1527711752024851|
|Female|   Yes|   0.18215035269941035|
+------+------+----------------------+



## Problem 4 

Use the seattle weather dataset referenced in the lesson to answer the questions below.

 - Convert the temperatures to fahrenheit.
 - Which month has the most rain, on average?
 - Which year was the windiest?
 - What is the most frequent type of weather in January?
 - What is the average high and low temperature on sunny days in July in 2013 and 2014?
 - What percentage of days were rainy in q3 of 2015?
 - For each year, find what percentage of days it rained (had non-zero precipitation).

In [360]:
from vega_datasets import data

weather = data.seattle_weather().assign(date=lambda df: df.date.astype(str))
weather = spark.createDataFrame(weather)
weather.show(6)

+----------+-------------+--------+--------+----+-------+
|      date|precipitation|temp_max|temp_min|wind|weather|
+----------+-------------+--------+--------+----+-------+
|2012-01-01|          0.0|    12.8|     5.0| 4.7|drizzle|
|2012-01-02|         10.9|    10.6|     2.8| 4.5|   rain|
|2012-01-03|          0.8|    11.7|     7.2| 2.3|   rain|
|2012-01-04|         20.3|    12.2|     5.6| 4.7|   rain|
|2012-01-05|          1.3|     8.9|     2.8| 6.1|   rain|
|2012-01-06|          2.5|     4.4|     2.2| 2.2|   rain|
+----------+-------------+--------+--------+----+-------+
only showing top 6 rows



Convert the temperatures to fahrenheit.

In [361]:
weather.select('temp_max', 'temp_min', ((weather.temp_max * 1.8) + 32.0).alias('temp_max_farenheit'), ((weather.temp_min * 1.8) + 32).alias('temp_min_farenheit')).show()

+--------+--------+------------------+------------------+
|temp_max|temp_min|temp_max_farenheit|temp_min_farenheit|
+--------+--------+------------------+------------------+
|    12.8|     5.0|55.040000000000006|              41.0|
|    10.6|     2.8|             51.08|             37.04|
|    11.7|     7.2|             53.06|             44.96|
|    12.2|     5.6|             53.96|             42.08|
|     8.9|     2.8|48.019999999999996|             37.04|
|     4.4|     2.2|             39.92|             35.96|
|     7.2|     2.8|             44.96|             37.04|
|    10.0|     2.8|              50.0|             37.04|
|     9.4|     5.0|             48.92|              41.0|
|     6.1|     0.6|42.980000000000004|             33.08|
|     6.1|    -1.1|42.980000000000004|             30.02|
|     6.1|    -1.7|42.980000000000004|             28.94|
|     5.0|    -2.8|              41.0|             26.96|
|     4.4|     0.6|             39.92|             33.08|
|     1.1|    

Which month has the most rain, on average?

In [362]:
from pyspark.sql.functions import month, count, max

In [363]:
rainy_days = weather.select('*').where(weather.weather == 'rain')
rainy_days.show()

+----------+-------------+--------+--------+----+-------+
|      date|precipitation|temp_max|temp_min|wind|weather|
+----------+-------------+--------+--------+----+-------+
|2012-01-02|         10.9|    10.6|     2.8| 4.5|   rain|
|2012-01-03|          0.8|    11.7|     7.2| 2.3|   rain|
|2012-01-04|         20.3|    12.2|     5.6| 4.7|   rain|
|2012-01-05|          1.3|     8.9|     2.8| 6.1|   rain|
|2012-01-06|          2.5|     4.4|     2.2| 2.2|   rain|
|2012-01-07|          0.0|     7.2|     2.8| 2.3|   rain|
|2012-01-09|          4.3|     9.4|     5.0| 3.4|   rain|
|2012-01-10|          1.0|     6.1|     0.6| 3.4|   rain|
|2012-01-21|          3.0|     8.3|     3.3| 8.2|   rain|
|2012-01-22|          6.1|     6.7|     2.2| 4.8|   rain|
|2012-01-23|          0.0|     8.3|     1.1| 3.6|   rain|
|2012-01-24|          8.6|    10.0|     2.2| 5.1|   rain|
|2012-01-25|          8.1|     8.9|     4.4| 5.4|   rain|
|2012-01-26|          4.8|     8.9|     1.1| 4.8|   rain|
|2012-01-28|  

In [364]:
rainy_days_by_month = rainy_days.withColumn("month", month("date")).rollup("month").count().sort("count").show()

+-----+-----+
|month|count|
+-----+-----+
|    9|    4|
|    8|    6|
|    7|   14|
|    5|   16|
|    6|   19|
|    4|   20|
|   10|   20|
|   12|   23|
|   11|   25|
|    1|   35|
|    3|   37|
|    2|   40|
| null|  259|
+-----+-----+



February has the most rainy days. 

Which year was the windiest?

In [365]:
from pyspark.sql.functions import year, sum

In [366]:
weather.describe

<bound method DataFrame.describe of DataFrame[date: string, precipitation: double, temp_max: double, temp_min: double, wind: double, weather: string]>

In [367]:
(
    weather.withColumn("year", year("date"))
    .groupBy("year")
    .agg(sum("wind"))
    .sort("year")
    .show()
)

+----+------------------+
|year|         sum(wind)|
+----+------------------+
|2012|1244.7000000000003|
|2013|1100.7999999999997|
|2014|            1236.5|
|2015|            1153.3|
+----+------------------+



In [368]:
from pyspark.sql.functions import asc, desc

In [369]:
weather = weather.withColumn("month", month('date'))
weather.show()

+----------+-------------+--------+--------+----+-------+-----+
|      date|precipitation|temp_max|temp_min|wind|weather|month|
+----------+-------------+--------+--------+----+-------+-----+
|2012-01-01|          0.0|    12.8|     5.0| 4.7|drizzle|    1|
|2012-01-02|         10.9|    10.6|     2.8| 4.5|   rain|    1|
|2012-01-03|          0.8|    11.7|     7.2| 2.3|   rain|    1|
|2012-01-04|         20.3|    12.2|     5.6| 4.7|   rain|    1|
|2012-01-05|          1.3|     8.9|     2.8| 6.1|   rain|    1|
|2012-01-06|          2.5|     4.4|     2.2| 2.2|   rain|    1|
|2012-01-07|          0.0|     7.2|     2.8| 2.3|   rain|    1|
|2012-01-08|          0.0|    10.0|     2.8| 2.0|    sun|    1|
|2012-01-09|          4.3|     9.4|     5.0| 3.4|   rain|    1|
|2012-01-10|          1.0|     6.1|     0.6| 3.4|   rain|    1|
|2012-01-11|          0.0|     6.1|    -1.1| 5.1|    sun|    1|
|2012-01-12|          0.0|     6.1|    -1.7| 1.9|    sun|    1|
|2012-01-13|          0.0|     5.0|    -

In [370]:
weather.groupBy('month', 'weather').count().orderBy(asc("month"), desc("count")).show(1)

+-----+-------+-----+
|month|weather|count|
+-----+-------+-----+
|    1|    fog|   38|
+-----+-------+-----+
only showing top 1 row



Fog is the most common type of weather in January. 

What is the average high and low temperature on sunny days in July in 2013 and 2014?

In [371]:
from pyspark.sql.functions import max, min

In [372]:
weather = weather.withColumn("year", year('date'))
weather.show()

+----------+-------------+--------+--------+----+-------+-----+----+
|      date|precipitation|temp_max|temp_min|wind|weather|month|year|
+----------+-------------+--------+--------+----+-------+-----+----+
|2012-01-01|          0.0|    12.8|     5.0| 4.7|drizzle|    1|2012|
|2012-01-02|         10.9|    10.6|     2.8| 4.5|   rain|    1|2012|
|2012-01-03|          0.8|    11.7|     7.2| 2.3|   rain|    1|2012|
|2012-01-04|         20.3|    12.2|     5.6| 4.7|   rain|    1|2012|
|2012-01-05|          1.3|     8.9|     2.8| 6.1|   rain|    1|2012|
|2012-01-06|          2.5|     4.4|     2.2| 2.2|   rain|    1|2012|
|2012-01-07|          0.0|     7.2|     2.8| 2.3|   rain|    1|2012|
|2012-01-08|          0.0|    10.0|     2.8| 2.0|    sun|    1|2012|
|2012-01-09|          4.3|     9.4|     5.0| 3.4|   rain|    1|2012|
|2012-01-10|          1.0|     6.1|     0.6| 3.4|   rain|    1|2012|
|2012-01-11|          0.0|     6.1|    -1.1| 5.1|    sun|    1|2012|
|2012-01-12|          0.0|     6.1

In [373]:
(
    weather
    .where((weather.month == 7) & ((weather.year == 2013)|(weather.year ==2014)))
    .groupBy('year')
    .agg(avg('temp_max').alias('Average Maximum Temperature'), avg('temp_min').alias('Average Minimum Temperature'))
    .show()
)

+----+---------------------------+---------------------------+
|year|Average Maximum Temperature|Average Minimum Temperature|
+----+---------------------------+---------------------------+
|2013|         26.093548387096785|          13.93225806451613|
|2014|         26.899999999999995|         14.425806451612905|
+----+---------------------------+---------------------------+



In [374]:
from pyspark.sql.functions import quarter, expr

In [375]:
weather = weather.withColumn('quarter', quarter('date'))
weather.show()

+----------+-------------+--------+--------+----+-------+-----+----+-------+
|      date|precipitation|temp_max|temp_min|wind|weather|month|year|quarter|
+----------+-------------+--------+--------+----+-------+-----+----+-------+
|2012-01-01|          0.0|    12.8|     5.0| 4.7|drizzle|    1|2012|      1|
|2012-01-02|         10.9|    10.6|     2.8| 4.5|   rain|    1|2012|      1|
|2012-01-03|          0.8|    11.7|     7.2| 2.3|   rain|    1|2012|      1|
|2012-01-04|         20.3|    12.2|     5.6| 4.7|   rain|    1|2012|      1|
|2012-01-05|          1.3|     8.9|     2.8| 6.1|   rain|    1|2012|      1|
|2012-01-06|          2.5|     4.4|     2.2| 2.2|   rain|    1|2012|      1|
|2012-01-07|          0.0|     7.2|     2.8| 2.3|   rain|    1|2012|      1|
|2012-01-08|          0.0|    10.0|     2.8| 2.0|    sun|    1|2012|      1|
|2012-01-09|          4.3|     9.4|     5.0| 3.4|   rain|    1|2012|      1|
|2012-01-10|          1.0|     6.1|     0.6| 3.4|   rain|    1|2012|      1|

What percentage of days were rainy in q3 of 2015?

In [376]:
(
    weather
    .where((weather.quarter == 3) & (weather.year == 2015) & (weather.weather == 'rain'))
    .count()
)

2

In [377]:
(
    weather
    .where((weather.quarter == 3) & (weather.year == 2015))
    .count()
)

92

In [378]:
percent_rainy_days = (
    weather
    .where((weather.quarter == 3) & (weather.year == 2015) & (weather.weather == 'rain'))
    .count()
) / (
    weather
    .where((weather.quarter == 3) & (weather.year == 2015))
    .count()
)

In [379]:
percent_rainy_days

0.021739130434782608

In [380]:
print("The percent of rainy days in Q3 of 2015 was {}%.".format(round(percent_rainy_days*100, 2)))

The percent of rainy days in Q3 of 2015 was 2.17%.


 For each year, find what percentage of days it rained (had non-zero precipitation).

In [381]:
weather = weather.withColumn("rained", weather.precipitation > 0)

In [382]:
weather.show()

+----------+-------------+--------+--------+----+-------+-----+----+-------+------+
|      date|precipitation|temp_max|temp_min|wind|weather|month|year|quarter|rained|
+----------+-------------+--------+--------+----+-------+-----+----+-------+------+
|2012-01-01|          0.0|    12.8|     5.0| 4.7|drizzle|    1|2012|      1| false|
|2012-01-02|         10.9|    10.6|     2.8| 4.5|   rain|    1|2012|      1|  true|
|2012-01-03|          0.8|    11.7|     7.2| 2.3|   rain|    1|2012|      1|  true|
|2012-01-04|         20.3|    12.2|     5.6| 4.7|   rain|    1|2012|      1|  true|
|2012-01-05|          1.3|     8.9|     2.8| 6.1|   rain|    1|2012|      1|  true|
|2012-01-06|          2.5|     4.4|     2.2| 2.2|   rain|    1|2012|      1|  true|
|2012-01-07|          0.0|     7.2|     2.8| 2.3|   rain|    1|2012|      1| false|
|2012-01-08|          0.0|    10.0|     2.8| 2.0|    sun|    1|2012|      1| false|
|2012-01-09|          4.3|     9.4|     5.0| 3.4|   rain|    1|2012|      1|

In [383]:
weather = weather.select('*', weather.rained.cast('int').alias("rain2"))

In [384]:
weather.show()

+----------+-------------+--------+--------+----+-------+-----+----+-------+------+-----+
|      date|precipitation|temp_max|temp_min|wind|weather|month|year|quarter|rained|rain2|
+----------+-------------+--------+--------+----+-------+-----+----+-------+------+-----+
|2012-01-01|          0.0|    12.8|     5.0| 4.7|drizzle|    1|2012|      1| false|    0|
|2012-01-02|         10.9|    10.6|     2.8| 4.5|   rain|    1|2012|      1|  true|    1|
|2012-01-03|          0.8|    11.7|     7.2| 2.3|   rain|    1|2012|      1|  true|    1|
|2012-01-04|         20.3|    12.2|     5.6| 4.7|   rain|    1|2012|      1|  true|    1|
|2012-01-05|          1.3|     8.9|     2.8| 6.1|   rain|    1|2012|      1|  true|    1|
|2012-01-06|          2.5|     4.4|     2.2| 2.2|   rain|    1|2012|      1|  true|    1|
|2012-01-07|          0.0|     7.2|     2.8| 2.3|   rain|    1|2012|      1| false|    0|
|2012-01-08|          0.0|    10.0|     2.8| 2.0|    sun|    1|2012|      1| false|    0|
|2012-01-0

In [393]:
(
    weather
    .groupby('year')
    .agg((sum('rain2')/count('rained')*100).alias('Percent Rainy Days')).show()
)

+----+------------------+
|year|Percent Rainy Days|
+----+------------------+
|2012| 48.36065573770492|
|2013| 41.64383561643836|
|2014|  41.0958904109589|
|2015| 39.45205479452055|
+----+------------------+

