In [297]:
import pandas as pd
import numpy as np

import pyspark
from pydataset import data

#### Create a spark data frame that contains your favorite programming languages.

- The name of the column should be language
- View the schema of the dataframe
- Output the shape of the dataframe
- Show the first 5 records in the dataframe

In [282]:
spark = pyspark.sql.SparkSession.builder.getOrCreate()

In [283]:
langs = pd.DataFrame(["python", "r", "java", "spark", "natural", "php"], columns=["language"])

In [284]:
df = spark.createDataFrame(langs)

In [286]:
df.printSchema()

root
 |-- language: string (nullable = true)



In [295]:
df.show(5)

+--------+
|language|
+--------+
|  python|
|       r|
|    java|
|   spark|
| natural|
+--------+
only showing top 5 rows



#### Load the mpg dataset as a spark dataframe.

- Create 1 column of output that contains a message like the one below:
    - The 1999 audi a4 has a 4 cylinder engine.
    - For each vehicle.

- Transform the trans column so that it only contains either manual or auto.

In [298]:
mpg = spark.createDataFrame(data('mpg'))

In [299]:
from pyspark.sql.functions import concat, lit

In [300]:
mpg.show(3)

+------------+-----+-----+----+---+----------+---+---+---+---+-------+
|manufacturer|model|displ|year|cyl|     trans|drv|cty|hwy| fl|  class|
+------------+-----+-----+----+---+----------+---+---+---+---+-------+
|        audi|   a4|  1.8|1999|  4|  auto(l5)|  f| 18| 29|  p|compact|
|        audi|   a4|  1.8|1999|  4|manual(m5)|  f| 21| 29|  p|compact|
|        audi|   a4|  2.0|2008|  4|manual(m6)|  f| 20| 31|  p|compact|
+------------+-----+-----+----+---+----------+---+---+---+---+-------+
only showing top 3 rows



In [301]:
mpg.select(concat(lit('The '), mpg.year, lit(' '), mpg.manufacturer, lit(' '), \
                  mpg.model, lit(' has a '), mpg.cyl, lit(' cylinder engine.') ). alias("description")
          ).show(truncate=False)

+--------------------------------------------------------------+
|description                                                   |
+--------------------------------------------------------------+
|The 1999 audi a4 has a 4 cylinder engine.                     |
|The 1999 audi a4 has a 4 cylinder engine.                     |
|The 2008 audi a4 has a 4 cylinder engine.                     |
|The 2008 audi a4 has a 4 cylinder engine.                     |
|The 1999 audi a4 has a 6 cylinder engine.                     |
|The 1999 audi a4 has a 6 cylinder engine.                     |
|The 2008 audi a4 has a 6 cylinder engine.                     |
|The 1999 audi a4 quattro has a 4 cylinder engine.             |
|The 1999 audi a4 quattro has a 4 cylinder engine.             |
|The 2008 audi a4 quattro has a 4 cylinder engine.             |
|The 2008 audi a4 quattro has a 4 cylinder engine.             |
|The 1999 audi a4 quattro has a 6 cylinder engine.             |
|The 1999 audi a4 quattro

In [302]:
from pyspark.sql.functions import regexp_extract

In [303]:
mpg.show(5)

+------------+-----+-----+----+---+----------+---+---+---+---+-------+
|manufacturer|model|displ|year|cyl|     trans|drv|cty|hwy| fl|  class|
+------------+-----+-----+----+---+----------+---+---+---+---+-------+
|        audi|   a4|  1.8|1999|  4|  auto(l5)|  f| 18| 29|  p|compact|
|        audi|   a4|  1.8|1999|  4|manual(m5)|  f| 21| 29|  p|compact|
|        audi|   a4|  2.0|2008|  4|manual(m6)|  f| 20| 31|  p|compact|
|        audi|   a4|  2.0|2008|  4|  auto(av)|  f| 21| 30|  p|compact|
|        audi|   a4|  2.8|1999|  6|  auto(l5)|  f| 16| 26|  p|compact|
+------------+-----+-----+----+---+----------+---+---+---+---+-------+
only showing top 5 rows



In [304]:
mpg.select(
    regexp_extract(mpg.trans, r'(\w+)', 1).alias('transmission')
    ).show()

+------------+
|transmission|
+------------+
|        auto|
|      manual|
|      manual|
|        auto|
|        auto|
|      manual|
|        auto|
|      manual|
|        auto|
|      manual|
|        auto|
|        auto|
|      manual|
|        auto|
|      manual|
|        auto|
|        auto|
|        auto|
|        auto|
|        auto|
+------------+
only showing top 20 rows



#### Load the tips dataset as a spark dataframe.
- What percentage of observations are smokers?
- Create a column that contains the tip percentage
- Calculate the average tip percentage for each combination of sex and smoker

In [305]:
tips = spark.createDataFrame(data('tips'))

In [307]:
tips.show(5)

+----------+----+------+------+---+------+----+
|total_bill| tip|   sex|smoker|day|  time|size|
+----------+----+------+------+---+------+----+
|     16.99|1.01|Female|    No|Sun|Dinner|   2|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|
|     21.01| 3.5|  Male|    No|Sun|Dinner|   3|
|     23.68|3.31|  Male|    No|Sun|Dinner|   2|
|     24.59|3.61|Female|    No|Sun|Dinner|   4|
+----------+----+------+------+---+------+----+
only showing top 5 rows



In [313]:
from pyspark.sql.functions import col

In [315]:
tips.where(tips.smoker == 'Yes').count() / tips.count()

0.38114754098360654

In [316]:
total_n = tips.count()

tips.groupBy('smoker').count().select('*', (col('count')/total_n)).show()

+------+-----+-------------------+
|smoker|count|      (count / 244)|
+------+-----+-------------------+
|    No|  151| 0.6188524590163934|
|   Yes|   93|0.38114754098360654|
+------+-----+-------------------+



In [317]:
from pyspark.sql.functions import round

In [318]:
# Create a column that contains the tip percentage
tips.select(
    round(tips.tip / tips.total_bill,2).alias('percent')
    ).show()

+-------+
|percent|
+-------+
|   0.06|
|   0.16|
|   0.17|
|   0.14|
|   0.15|
|   0.19|
|   0.23|
|   0.12|
|   0.13|
|   0.22|
|   0.17|
|   0.14|
|    0.1|
|   0.16|
|    0.2|
|   0.18|
|   0.16|
|   0.23|
|   0.21|
|   0.16|
+-------+
only showing top 20 rows



In [319]:
from pyspark.sql.functions import mean

In [320]:
# Calculate the average tip percentage for each combination of sex and smoker
tips.groupby('sex', 'smoker').agg(mean(tips.tip / tips.total_bill).alias('tip_percent')).show()

+------+------+-------------------+
|   sex|smoker|        tip_percent|
+------+------+-------------------+
|  Male|    No| 0.1606687151291298|
|  Male|   Yes| 0.1527711752024851|
|Female|    No| 0.1569209707691836|
|Female|   Yes|0.18215035269941035|
+------+------+-------------------+



#### Use the seattle weather dataset referenced in the lesson to answer the questions below.

- Convert the temperatures to farenheight.
- Which month has the most rain, on average?
- Which year was the windiest?
- What is the most frequent type of weather in January?
- What is the average high and low tempurature on sunny days in July in 2013 and 2014?
- What percentage of days were rainy in q3 of 2015?
- For each year, find what percentage of days it rained (had non-zero precipitation).

In [321]:
from vega_datasets import data

In [322]:
weather = data('seattle_weather').assign(date=lambda df: df.date.astype(str))

In [323]:
df = spark.createDataFrame(weather)

In [324]:
df.show(5)

+----------+-------------+--------+--------+----+-------+
|      date|precipitation|temp_max|temp_min|wind|weather|
+----------+-------------+--------+--------+----+-------+
|2012-01-01|          0.0|    12.8|     5.0| 4.7|drizzle|
|2012-01-02|         10.9|    10.6|     2.8| 4.5|   rain|
|2012-01-03|          0.8|    11.7|     7.2| 2.3|   rain|
|2012-01-04|         20.3|    12.2|     5.6| 4.7|   rain|
|2012-01-05|          1.3|     8.9|     2.8| 6.1|   rain|
+----------+-------------+--------+--------+----+-------+
only showing top 5 rows



In [325]:
# Convert the temperatures to farenheight.
df.select(
    (df.temp_max * 9/5 + 32).alias('temp_max_F'),
    (df.temp_min * 9/5 + 32).alias('temp_min_F')
).show(5)

+----------+----------+
|temp_max_F|temp_min_F|
+----------+----------+
|     55.04|      41.0|
|     51.08|     37.04|
|     53.06|     44.96|
|     53.96|     42.08|
|     48.02|     37.04|
+----------+----------+
only showing top 5 rows



In [326]:
from pyspark.sql.functions import month, year, quarter

In [328]:
# Which month has the most rain, on average?
(
    df.withColumn('month', month('date'))
    .groupby('month')
    .agg(mean('precipitation'))
    .sort('month')
).show()

+-----+-------------------+
|month| avg(precipitation)|
+-----+-------------------+
|    1| 3.7580645161290316|
|    2|  3.734513274336283|
|    3|  4.888709677419355|
|    4|  3.128333333333333|
|    5| 1.6733870967741935|
|    6| 1.1075000000000002|
|    7|0.38870967741935486|
|    8| 1.3201612903225806|
|    9| 1.9624999999999997|
|   10|  4.059677419354839|
|   11|  5.354166666666667|
|   12|  5.021774193548388|
+-----+-------------------+



In [359]:
from pyspark.sql.functions import sum, min, max, expr, when

In [335]:
# Which year was the windiest?
(
    df.withColumn('year', year('date'))
    .groupby('year')
    .agg(sum('wind'))
    .show()
)

+----+------------------+
|year|         sum(wind)|
+----+------------------+
|2015|            1153.3|
|2013|1100.8000000000002|
|2014|1236.5000000000005|
|2012|1244.6999999999998|
+----+------------------+



In [336]:
# What is the most frequent type of weather in January?
(
    df.filter(month("date") == 1)
    .groupBy("weather")
    .pivot('weather')
    .count()
    .show()
)

+-------+-------+----+----+----+----+
|weather|drizzle| fog|rain|snow| sun|
+-------+-------+----+----+----+----+
|    fog|   null|  38|null|null|null|
|drizzle|     10|null|null|null|null|
|   rain|   null|null|  35|null|null|
|    sun|   null|null|null|null|  33|
|   snow|   null|null|null|   8|null|
+-------+-------+----+----+----+----+



In [347]:
# What is the average high and low tempurature on sunny days in July in 2013 and 2014?
(
    df.filter(month("date") == 7)
    .filter((year("date") == 2013) | (year("date") == 2014))
    .filter(df.weather == 'sun')
    .agg(mean("temp_max"), mean("temp_min"))
    .show()
)

+------------------+-----------------+
|     avg(temp_max)|    avg(temp_min)|
+------------------+-----------------+
|26.828846153846158|14.18269230769231|
+------------------+-----------------+



In [364]:
# What percentage of days were rainy in q3 of 2015?
(
    df.withColumn('quarter', quarter('date'))
    .withColumn('year', year('date'))
    .filter(expr("year = 2015"))
    .filter(expr('quarter = 3'))
    .select(when(col('precipitation') >0, 1).otherwise(0).alias('was_rainy'))
    .agg(mean('was_rainy'))
).show()

+-------------------+
|     avg(was_rainy)|
+-------------------+
|0.18478260869565216|
+-------------------+



In [370]:
# For each year, find what percentage of days it rained (had non-zero precipitation)
(
    df.withColumn('year', year('date'))
    .withColumn('was_rainy', when(col('precipitation') > 0, 1).otherwise(0))
    .groupby('year')
    .agg(mean('was_rainy'))            
).show()

+----+-------------------+
|year|     avg(was_rainy)|
+----+-------------------+
|2015|0.39452054794520547|
|2013|0.41643835616438357|
|2014|  0.410958904109589|
|2012|0.48360655737704916|
+----+-------------------+



In [371]:
df.createOrReplaceTempView('df')

In [381]:
# with sql
# For each year, find what percentage of days it rained (had non-zero precipitation)
spark.sql('''
SELECT year, AVG(was_rainy)
FROM (
    SELECT *, year(date) as year,
    CASE WHEN precipitation > 0 THEN 1 ELSE 0 END as was_rainy
    from df
) as a
GROUP BY year
''').show()

+----+-------------------+
|year|     avg(was_rainy)|
+----+-------------------+
|2015|0.39452054794520547|
|2013|0.41643835616438357|
|2014|  0.410958904109589|
|2012|0.48360655737704916|
+----+-------------------+

