# Spark 101

In [1]:
import pyspark
spark = pyspark.sql.SparkSession.builder.getOrCreate()

import pandas as pd
import numpy as np



from pyspark.sql.functions import *

# Note: The pyspark avg and mean functions are aliases of eachother
# from pyspark.sql.functions import concat, sum, avg, min, max, count, mean
from pyspark.sql.functions import col, expr
from pyspark.sql.functions import lit





## Exercise 1:
Within your codeup-data-science directory, create a new repo named spark-exercises. This will be where you do your work for this module. Create a repository on GitHub with the same name, and link your local repository to GitHub.

Save this work in your spark-exercises repo. Then add, commit, and push your changes.

Create a jupyter notebook or python script named spark101 for this exercise.

Create a spark data frame that contains your favorite programming languages.

Create a dataframe with one column named language

Hint: Start with a pandas dataframe. Maybe use a dictionary?
View the schema of the dataframe
Output the shape of the dataframe
Show the first 5 records in the dataframe

In [2]:
np.random.seed(123)
pandas_dataframe = pd.DataFrame(
    dict(language=['english','chicken','python','java','java_script']))
pandas_dataframe

Unnamed: 0,language
0,english
1,chicken
2,python
3,java
4,java_script


In [3]:
df = spark.createDataFrame(pandas_dataframe)
df.show(5)

+-----------+
|   language|
+-----------+
|    english|
|    chicken|
|     python|
|       java|
|java_script|
+-----------+



In [4]:
df.describe().show()

+-------+--------+
|summary|language|
+-------+--------+
|  count|       5|
|   mean|    null|
| stddev|    null|
|    min| chicken|
|    max|  python|
+-------+--------+



In [5]:
print("DataFrame shape: ", df.count(), " x ", len(df.columns))

DataFrame shape:  5  x  1


In [6]:
df.printSchema()

root
 |-- language: string (nullable = true)



# Exercise 2:
Load the mpg dataset as a spark dataframe.
a. Create 1 column of output that contains a message like the one below for each record:
The 1999 audi a4 has a 4 cylinder engine.
Hint: You will need to concatenate values that already exist in the data with string literals
b. Transform the trans column so that it only contains either manual or auto.
Hint: Consider spark string methods and when().otherwise() chaining

In [7]:
import pyspark
spark = pyspark.sql.SparkSession.builder.getOrCreate()

import pandas as pd
import numpy as np

from pydataset import data

mpg = spark.createDataFrame(data("mpg"))
mpg.show(5)

+------------+-----+-----+----+---+----------+---+---+---+---+-------+
|manufacturer|model|displ|year|cyl|     trans|drv|cty|hwy| fl|  class|
+------------+-----+-----+----+---+----------+---+---+---+---+-------+
|        audi|   a4|  1.8|1999|  4|  auto(l5)|  f| 18| 29|  p|compact|
|        audi|   a4|  1.8|1999|  4|manual(m5)|  f| 21| 29|  p|compact|
|        audi|   a4|  2.0|2008|  4|manual(m6)|  f| 20| 31|  p|compact|
|        audi|   a4|  2.0|2008|  4|  auto(av)|  f| 21| 30|  p|compact|
|        audi|   a4|  2.8|1999|  6|  auto(l5)|  f| 16| 26|  p|compact|
+------------+-----+-----+----+---+----------+---+---+---+---+-------+
only showing top 5 rows



In [8]:
mpg.createOrReplaceTempView("mpg")

### a. Create 1 column of output that contains a message like the one below for each record:
The 1999 audi a4 has a 4 cylinder engine.

In [9]:
spark.sql(
    """
SELECT hwy, cty, (hwy + cty) / 2 AS avg
FROM mpg
"""
).show(5)

+---+---+----+
|hwy|cty| avg|
+---+---+----+
| 29| 18|23.5|
| 29| 21|25.0|
| 31| 20|25.5|
| 30| 21|25.5|
| 26| 16|21.0|
+---+---+----+
only showing top 5 rows



In [10]:
# Note: The pyspark avg and mean functions are aliases of eachother
from pyspark.sql.functions import concat, sum, avg, min, max, count, mean
from pyspark.sql.functions import lit
from pyspark.sql.functions import regexp_extract, regexp_replace
from pyspark.sql.functions import when

In [11]:
mpg.select(concat(lit('The '), 
                  mpg.year,
                  lit(' '),
                  mpg.manufacturer,
                  lit(' '),
                  mpg.model,
                  lit(' '),
                 lit('has a '),
                 mpg.cyl,
                 lit(' cylinder egine.')).alias('message')).show(5, truncate=False)

+----------------------------------------+
|message                                 |
+----------------------------------------+
|The 1999 audi a4 has a 4 cylinder egine.|
|The 1999 audi a4 has a 4 cylinder egine.|
|The 2008 audi a4 has a 4 cylinder egine.|
|The 2008 audi a4 has a 4 cylinder egine.|
|The 1999 audi a4 has a 6 cylinder egine.|
+----------------------------------------+
only showing top 5 rows



## b. Transform the trans column so that it only contains either manual or auto.
Hint: Consider spark string methods and when().otherwise() chaining

In [12]:
mpg.show(5)

+------------+-----+-----+----+---+----------+---+---+---+---+-------+
|manufacturer|model|displ|year|cyl|     trans|drv|cty|hwy| fl|  class|
+------------+-----+-----+----+---+----------+---+---+---+---+-------+
|        audi|   a4|  1.8|1999|  4|  auto(l5)|  f| 18| 29|  p|compact|
|        audi|   a4|  1.8|1999|  4|manual(m5)|  f| 21| 29|  p|compact|
|        audi|   a4|  2.0|2008|  4|manual(m6)|  f| 20| 31|  p|compact|
|        audi|   a4|  2.0|2008|  4|  auto(av)|  f| 21| 30|  p|compact|
|        audi|   a4|  2.8|1999|  6|  auto(l5)|  f| 16| 26|  p|compact|
+------------+-----+-----+----+---+----------+---+---+---+---+-------+
only showing top 5 rows



In [13]:
mpg.select(mpg.trans,when(mpg.trans.like('auto%'), 'auto')
          .otherwise('manual').alias('manual_auto')).show(5)

+----------+-----------+
|     trans|manual_auto|
+----------+-----------+
|  auto(l5)|       auto|
|manual(m5)|     manual|
|manual(m6)|     manual|
|  auto(av)|       auto|
|  auto(l5)|       auto|
+----------+-----------+
only showing top 5 rows



## Exercise 3:
Load the tips dataset as a spark dataframe.
### a. What percentage of observations are smokers?
Hint: .groupBy() and .withColumn() are useful functions here
### b. Create a column that contains the tip percentage
Hint: .withColumn() is useful here
### c. Calculate the average tip percentage for each combination of sex and smoker.
Hint: Chain additional functions off the answer to part b

In [14]:
from pydataset import data
tips = spark.createDataFrame(data("tips"))
tips

DataFrame[total_bill: double, tip: double, sex: string, smoker: string, day: string, time: string, size: bigint]

### a. What percentage of observations are smokers?
Hint: .groupBy() and .withColumn() are useful functions here

In [15]:
tips.show(5)

+----------+----+------+------+---+------+----+
|total_bill| tip|   sex|smoker|day|  time|size|
+----------+----+------+------+---+------+----+
|     16.99|1.01|Female|    No|Sun|Dinner|   2|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|
|     21.01| 3.5|  Male|    No|Sun|Dinner|   3|
|     23.68|3.31|  Male|    No|Sun|Dinner|   2|
|     24.59|3.61|Female|    No|Sun|Dinner|   4|
+----------+----+------+------+---+------+----+
only showing top 5 rows



In [16]:
tips.groupBy("smoker").count().withColumn("percent", col("count") / tips.count()).show()

+------+-----+-------------------+
|smoker|count|            percent|
+------+-----+-------------------+
|    No|  151| 0.6188524590163934|
|   Yes|   93|0.38114754098360654|
+------+-----+-------------------+



In [17]:
tips.groupBy(tips.smoker).agg(count(tips.smoker).alias('counter')).withColumn('percent', col('counter') / tips.count()).show()

+------+-------+-------------------+
|smoker|counter|            percent|
+------+-------+-------------------+
|    No|    151| 0.6188524590163934|
|   Yes|     93|0.38114754098360654|
+------+-------+-------------------+



### b. Create a column that contains the tip percentage
Hint: .withColumn() is useful here

In [18]:
tips.show(5)

+----------+----+------+------+---+------+----+
|total_bill| tip|   sex|smoker|day|  time|size|
+----------+----+------+------+---+------+----+
|     16.99|1.01|Female|    No|Sun|Dinner|   2|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|
|     21.01| 3.5|  Male|    No|Sun|Dinner|   3|
|     23.68|3.31|  Male|    No|Sun|Dinner|   2|
|     24.59|3.61|Female|    No|Sun|Dinner|   4|
+----------+----+------+------+---+------+----+
only showing top 5 rows



In [19]:
tips.withColumn('tip_percentage', round((tips.tip / tips.total_bill)*100,2)).show()

+----------+----+------+------+---+------+----+--------------+
|total_bill| tip|   sex|smoker|day|  time|size|tip_percentage|
+----------+----+------+------+---+------+----+--------------+
|     16.99|1.01|Female|    No|Sun|Dinner|   2|          5.94|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|         16.05|
|     21.01| 3.5|  Male|    No|Sun|Dinner|   3|         16.66|
|     23.68|3.31|  Male|    No|Sun|Dinner|   2|         13.98|
|     24.59|3.61|Female|    No|Sun|Dinner|   4|         14.68|
|     25.29|4.71|  Male|    No|Sun|Dinner|   4|         18.62|
|      8.77| 2.0|  Male|    No|Sun|Dinner|   2|         22.81|
|     26.88|3.12|  Male|    No|Sun|Dinner|   4|         11.61|
|     15.04|1.96|  Male|    No|Sun|Dinner|   2|         13.03|
|     14.78|3.23|  Male|    No|Sun|Dinner|   2|         21.85|
|     10.27|1.71|  Male|    No|Sun|Dinner|   2|         16.65|
|     35.26| 5.0|Female|    No|Sun|Dinner|   4|         14.18|
|     15.42|1.57|  Male|    No|Sun|Dinner|   2|        

### c. Calculate the average tip percentage for each combination of sex and smoker.
Hint: Chain additional functions off the answer to part b

In [20]:
tips.show(5)

+----------+----+------+------+---+------+----+
|total_bill| tip|   sex|smoker|day|  time|size|
+----------+----+------+------+---+------+----+
|     16.99|1.01|Female|    No|Sun|Dinner|   2|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|
|     21.01| 3.5|  Male|    No|Sun|Dinner|   3|
|     23.68|3.31|  Male|    No|Sun|Dinner|   2|
|     24.59|3.61|Female|    No|Sun|Dinner|   4|
+----------+----+------+------+---+------+----+
only showing top 5 rows



In [21]:
tips.withColumn('tip_percentage', round((tips.tip / tips.total_bill)*100,2)).groupBy('sex').pivot('smoker').agg(mean('tip_percentage')).show()

+------+-----------------+------------------+
|   sex|               No|               Yes|
+------+-----------------+------------------+
|Female|15.69111111111111|18.214545454545455|
|  Male|16.06659793814433|15.276666666666667|
+------+-----------------+------------------+



# Question 4
Use the seattle weather dataset referenced in the lesson to answer the questions below.

- Convert the temperatures to fahrenheit.
- Which month has the most rain, on average?
- Which year was the windiest?
- What is the most frequent type of weather in January?
- What is the average high and low temperature on sunny days in July in 2013 and 2014?
- What percentage of days were rainy in q3 of 2015?
- For each year, find what percentage of days it rained (had non-zero precipitation).

In [24]:
from vega_datasets import data

In [25]:
weather = data.seattle_weather().assign(date=lambda df: df.date.astype(str))
weather = spark.createDataFrame(weather)
weather.show(6)

+----------+-------------+--------+--------+----+-------+
|      date|precipitation|temp_max|temp_min|wind|weather|
+----------+-------------+--------+--------+----+-------+
|2012-01-01|          0.0|    12.8|     5.0| 4.7|drizzle|
|2012-01-02|         10.9|    10.6|     2.8| 4.5|   rain|
|2012-01-03|          0.8|    11.7|     7.2| 2.3|   rain|
|2012-01-04|         20.3|    12.2|     5.6| 4.7|   rain|
|2012-01-05|          1.3|     8.9|     2.8| 6.1|   rain|
|2012-01-06|          2.5|     4.4|     2.2| 2.2|   rain|
+----------+-------------+--------+--------+----+-------+
only showing top 6 rows



## Convert the temperatures to fahrenheit.

In [45]:
weather.withColumn('temp_max_f', (weather.temp_max * 9/5) +32).show(5)

+----------+-------------+--------+--------+----+-------+----------+
|      date|precipitation|temp_max|temp_min|wind|weather|temp_max_f|
+----------+-------------+--------+--------+----+-------+----------+
|2012-01-01|          0.0|    12.8|     5.0| 4.7|drizzle|     55.04|
|2012-01-02|         10.9|    10.6|     2.8| 4.5|   rain|     51.08|
|2012-01-03|          0.8|    11.7|     7.2| 2.3|   rain|     53.06|
|2012-01-04|         20.3|    12.2|     5.6| 4.7|   rain|     53.96|
|2012-01-05|          1.3|     8.9|     2.8| 6.1|   rain|     48.02|
+----------+-------------+--------+--------+----+-------+----------+
only showing top 5 rows



In [44]:
weather.select(((weather.temp_max * 9/5) +32).alias('temp_max_f')).show(5)

+----------+
|temp_max_f|
+----------+
|     55.04|
|     51.08|
|     53.06|
|     53.96|
|     48.02|
+----------+
only showing top 5 rows



## Which month has the most rain, on average?

In [30]:
weather.show(5)

+----------+-------------+--------+--------+----+-------+
|      date|precipitation|temp_max|temp_min|wind|weather|
+----------+-------------+--------+--------+----+-------+
|2012-01-01|          0.0|    12.8|     5.0| 4.7|drizzle|
|2012-01-02|         10.9|    10.6|     2.8| 4.5|   rain|
|2012-01-03|          0.8|    11.7|     7.2| 2.3|   rain|
|2012-01-04|         20.3|    12.2|     5.6| 4.7|   rain|
|2012-01-05|          1.3|     8.9|     2.8| 6.1|   rain|
+----------+-------------+--------+--------+----+-------+
only showing top 5 rows



In [40]:
# extract month and groupby month
from pyspark.sql.functions import month, year, quarter
(
    weather.withColumn("month", month("date"))
.groupBy("month")
    .agg(avg("precipitation")
         .alias("total_rainfall"))
    .sort("total_rainfall").show()
)

+-----+-------------------+
|month|     total_rainfall|
+-----+-------------------+
|    7|0.38870967741935486|
|    6| 1.1075000000000002|
|    8| 1.3201612903225806|
|    5| 1.6733870967741935|
|    9| 1.9624999999999997|
|    4|  3.128333333333333|
|    2|  3.734513274336283|
|    1| 3.7580645161290316|
|   10|  4.059677419354839|
|    3|  4.888709677419355|
|   12|  5.021774193548389|
|   11|  5.354166666666667|
+-----+-------------------+



## Which year was the windiest?

In [43]:
(
weather.withColumn('year', year('date'))
.groupBy('year')
.agg(sum('wind')
    .alias('windiest_year'))).sort('windiest_year').show()

+----+------------------+
|year|     windiest_year|
+----+------------------+
|2013|1100.8000000000006|
|2015|1153.3000000000002|
|2014|1236.5000000000007|
|2012|            1244.7|
+----+------------------+



## What is the most frequent type of weather in January?

In [65]:
(
    weather.withColumn('month', month('date'))
    .filter(col('month') == 1)
    .groupby('weather')
    .count()
    .sort(col('count').desc())
    .show()
)

+-------+-----+
|weather|count|
+-------+-----+
|    fog|   38|
|   rain|   35|
|    sun|   33|
|drizzle|   10|
|   snow|    8|
+-------+-----+



## What is the average high and low temperature on sunny days in July in 2013 and 2014?

In [66]:
(
    weather.filter(month('date') == 7)
    .filter(year('date') > 2012)
    .filter(year('date') < 2015)
    .filter(col('weather') == lit('sun'))
    .agg(
        avg('temp_max').alias('average_high_temp'),
        avg('temp_min').alias('average_min_temp'),).show()
)

+------------------+-----------------+
| average_high_temp| average_min_temp|
+------------------+-----------------+
|26.828846153846158|14.18269230769231|
+------------------+-----------------+



## What percentage of days were rainy in q3 of 2015?

In [67]:
(
    weather.filter(year('date') == 2015)
    .filter(quarter('date') == 3)
    .select(when(col('weather') == 'rain', 1)
            .otherwise(0)
           .alias('rain'))
    .agg(mean('rain'))
    .show()

)

+--------------------+
|           avg(rain)|
+--------------------+
|0.021739130434782608|
+--------------------+



## For each year, find what percentage of days it rained (had non-zero precipitation).

In [68]:
(
    weather.withColumn('year', year('date'))
    .select(when(col('precipitation') > 0, 1)
           .otherwise(0).alias('rain'), 'year')
    .agg(mean('rain'))
    .show()

)

+------------------+
|         avg(rain)|
+------------------+
|0.4264202600958248|
+------------------+

