In [1]:
import pyspark
from pyspark.sql.functions import *
import pandas as pd
import numpy as np

spark = pyspark.sql.SparkSession.builder.getOrCreate()

1. Create a spark data frame that contains your favorite programming languages.

- The name of the column should be language
- View the schema of the dataframe
- Output the shape of the dataframe
- Show the first 5 records in the dataframe

In [2]:
# Create pandas df
fav_lang_df = pd.DataFrame(
        {
            "languages": ['python','sql','go','c','java','scala']
        })

In [3]:
# verify it worked
fav_lang_df

Unnamed: 0,languages
0,python
1,sql
2,go
3,c
4,java
5,scala


In [4]:
# put into spark df
df = spark.createDataFrame(fav_lang_df)

In [5]:
# view schema
df.describe

<bound method DataFrame.describe of DataFrame[languages: string]>

In [6]:
# output df shape
print((df.count(), len(df.columns)))

(6, 1)


In [7]:
# output of first 5 rows
df.show(5)

+---------+
|languages|
+---------+
|   python|
|      sql|
|       go|
|        c|
|     java|
+---------+
only showing top 5 rows



1. Load the mpg dataset as a spark dataframe.

    1. Create 1 column of output that contains a message like the one below for each vehicle:
        ```The 1999 audi a4 has a 4 cylinder engine.```

    1. Transform the trans column so that it only contains either manual or auto.

In [8]:
from pydataset import data

In [9]:
mpg = spark.createDataFrame(data("mpg"))

In [10]:
mpg.show(10)

+------------+----------+-----+----+---+----------+---+---+---+---+-------+
|manufacturer|     model|displ|year|cyl|     trans|drv|cty|hwy| fl|  class|
+------------+----------+-----+----+---+----------+---+---+---+---+-------+
|        audi|        a4|  1.8|1999|  4|  auto(l5)|  f| 18| 29|  p|compact|
|        audi|        a4|  1.8|1999|  4|manual(m5)|  f| 21| 29|  p|compact|
|        audi|        a4|  2.0|2008|  4|manual(m6)|  f| 20| 31|  p|compact|
|        audi|        a4|  2.0|2008|  4|  auto(av)|  f| 21| 30|  p|compact|
|        audi|        a4|  2.8|1999|  6|  auto(l5)|  f| 16| 26|  p|compact|
|        audi|        a4|  2.8|1999|  6|manual(m5)|  f| 18| 26|  p|compact|
|        audi|        a4|  3.1|2008|  6|  auto(av)|  f| 18| 27|  p|compact|
|        audi|a4 quattro|  1.8|1999|  4|manual(m5)|  4| 18| 26|  p|compact|
|        audi|a4 quattro|  1.8|1999|  4|  auto(l5)|  4| 16| 25|  p|compact|
|        audi|a4 quattro|  2.0|2008|  4|manual(m6)|  4| 20| 28|  p|compact|
+-----------

In [11]:
mpg.select(concat(lit('The '), 
                 mpg.year, 
                 lit(' '), 
                 mpg.manufacturer, 
                 lit(' '), 
                 mpg.model, 
                 lit(' has a '), 
                 mpg.cyl, 
                 lit(' cylinder engine.')
                ).alias('Make/model/PU')
         ).show(5)

+--------------------+
|       Make/model/PU|
+--------------------+
|The 1999 audi a4 ...|
|The 1999 audi a4 ...|
|The 2008 audi a4 ...|
|The 2008 audi a4 ...|
|The 1999 audi a4 ...|
+--------------------+
only showing top 5 rows



In [39]:
mpg.where(mpg["trans"] == "auto(l6)").show()

+------------+---------------+-----+----+---+--------+---+---+---+---+-------+
|manufacturer|          model|displ|year|cyl|   trans|drv|cty|hwy| fl|  class|
+------------+---------------+-----+----+---+--------+---+---+---+---+-------+
|       dodge|    caravan 2wd|  3.8|2008|  6|auto(l6)|  f| 16| 23|  r|minivan|
|       dodge|    caravan 2wd|  4.0|2008|  6|auto(l6)|  f| 16| 23|  r|minivan|
|        ford| expedition 2wd|  5.4|2008|  8|auto(l6)|  r| 12| 18|  r|    suv|
|        ford|   explorer 4wd|  4.6|2008|  8|auto(l6)|  4| 13| 19|  r|    suv|
|     lincoln|  navigator 2wd|  5.4|2008|  8|auto(l6)|  r| 12| 18|  r|    suv|
|     mercury|mountaineer 4wd|  4.6|2008|  8|auto(l6)|  4| 13| 19|  r|    suv|
+------------+---------------+-----+----+---+--------+---+---+---+---+-------+



In [37]:
mpg.select(
    'trans',
    regexp_extract("trans", r"^(\w+)\(", 1).alias("regexp_extract")).show()

+----------+--------------+
|     trans|regexp_extract|
+----------+--------------+
|  auto(l5)|          auto|
|manual(m5)|        manual|
|manual(m6)|        manual|
|  auto(av)|          auto|
|  auto(l5)|          auto|
|manual(m5)|        manual|
|  auto(av)|          auto|
|manual(m5)|        manual|
|  auto(l5)|          auto|
|manual(m6)|        manual|
|  auto(s6)|          auto|
|  auto(l5)|          auto|
|manual(m5)|        manual|
|  auto(s6)|          auto|
|manual(m6)|        manual|
|  auto(l5)|          auto|
|  auto(s6)|          auto|
|  auto(s6)|          auto|
|  auto(l4)|          auto|
|  auto(l4)|          auto|
+----------+--------------+
only showing top 20 rows



1. Load the tips dataset as a spark dataframe.

    1. What percentage of observations are smokers?
    1. Create a column that contains the tip percentage
    1. Calculate the average tip percentage for each combination of sex and smoker.


In [14]:
tips = spark.createDataFrame(data('tips'))

In [15]:
tips.show()

+----------+----+------+------+---+------+----+
|total_bill| tip|   sex|smoker|day|  time|size|
+----------+----+------+------+---+------+----+
|     16.99|1.01|Female|    No|Sun|Dinner|   2|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|
|     21.01| 3.5|  Male|    No|Sun|Dinner|   3|
|     23.68|3.31|  Male|    No|Sun|Dinner|   2|
|     24.59|3.61|Female|    No|Sun|Dinner|   4|
|     25.29|4.71|  Male|    No|Sun|Dinner|   4|
|      8.77| 2.0|  Male|    No|Sun|Dinner|   2|
|     26.88|3.12|  Male|    No|Sun|Dinner|   4|
|     15.04|1.96|  Male|    No|Sun|Dinner|   2|
|     14.78|3.23|  Male|    No|Sun|Dinner|   2|
|     10.27|1.71|  Male|    No|Sun|Dinner|   2|
|     35.26| 5.0|Female|    No|Sun|Dinner|   4|
|     15.42|1.57|  Male|    No|Sun|Dinner|   2|
|     18.43| 3.0|  Male|    No|Sun|Dinner|   4|
|     14.83|3.02|Female|    No|Sun|Dinner|   2|
|     21.58|3.92|  Male|    No|Sun|Dinner|   2|
|     10.33|1.67|Female|    No|Sun|Dinner|   3|
|     16.29|3.71|  Male|    No|Sun|Dinne

In [16]:
tips.createOrReplaceTempView('tips')

In [17]:
spark.sql('''SELECT * FROM tips''').show(5)

+----------+----+------+------+---+------+----+
|total_bill| tip|   sex|smoker|day|  time|size|
+----------+----+------+------+---+------+----+
|     16.99|1.01|Female|    No|Sun|Dinner|   2|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|
|     21.01| 3.5|  Male|    No|Sun|Dinner|   3|
|     23.68|3.31|  Male|    No|Sun|Dinner|   2|
|     24.59|3.61|Female|    No|Sun|Dinner|   4|
+----------+----+------+------+---+------+----+
only showing top 5 rows



In [18]:
spark.sql('''SELECT((SELECT COUNT(smoker)
FROM tips
WHERE smoker = "Yes") / (SELECT COUNT(smoker)
FROM tips)) as smoker_pct
FROM tips''').show(1)

+-------------------+
|         smoker_pct|
+-------------------+
|0.38114754098360654|
+-------------------+
only showing top 1 row



In [19]:
spark.sql(
    """
SELECT *, (tip / total_bill) as tip_pct
FROM tips
    """
).show()

+----------+----+------+------+---+------+----+-------------------+
|total_bill| tip|   sex|smoker|day|  time|size|            tip_pct|
+----------+----+------+------+---+------+----+-------------------+
|     16.99|1.01|Female|    No|Sun|Dinner|   2|0.05944673337257211|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|0.16054158607350097|
|     21.01| 3.5|  Male|    No|Sun|Dinner|   3|0.16658733936220846|
|     23.68|3.31|  Male|    No|Sun|Dinner|   2| 0.1397804054054054|
|     24.59|3.61|Female|    No|Sun|Dinner|   4|0.14680764538430255|
|     25.29|4.71|  Male|    No|Sun|Dinner|   4|0.18623962040332148|
|      8.77| 2.0|  Male|    No|Sun|Dinner|   2|0.22805017103762829|
|     26.88|3.12|  Male|    No|Sun|Dinner|   4|0.11607142857142858|
|     15.04|1.96|  Male|    No|Sun|Dinner|   2|0.13031914893617022|
|     14.78|3.23|  Male|    No|Sun|Dinner|   2| 0.2185385656292287|
|     10.27|1.71|  Male|    No|Sun|Dinner|   2| 0.1665043816942551|
|     35.26| 5.0|Female|    No|Sun|Dinner|   4|0

In [20]:
tips = spark.sql(
                    """
                SELECT *, (tip / total_bill) as tip_pct
                FROM tips
                    """
                )

In [21]:
tips.groupby('sex').pivot('smoker').mean('tip_pct').show()

+------+------------------+-------------------+
|   sex|                No|                Yes|
+------+------------------+-------------------+
|Female|0.1569209707691836|0.18215035269941032|
|  Male|0.1606687151291298|0.15277117520248512|
+------+------------------+-------------------+



4. Use the seattle weather dataset referenced in the lesson to answer the questions below.

    1. Convert the temperatures to fahrenheit.
    1. Which month has the most rain, on average?
    1. Which year was the windiest?
    1. What is the most frequent type of weather in January?
    1. What is the average high and low temperature on sunny days in July in 2013 and 2014?
    1. What percentage of days were rainy in q3 of 2015?
    1. For each year, find what percentage of days it rained (had non-zero precipitation).

In [22]:
from vega_datasets import data

weather = data.seattle_weather().assign(date=lambda df: df.date.astype(str))
weather = spark.createDataFrame(weather)
weather.show(5)

+----------+-------------+--------+--------+----+-------+
|      date|precipitation|temp_max|temp_min|wind|weather|
+----------+-------------+--------+--------+----+-------+
|2012-01-01|          0.0|    12.8|     5.0| 4.7|drizzle|
|2012-01-02|         10.9|    10.6|     2.8| 4.5|   rain|
|2012-01-03|          0.8|    11.7|     7.2| 2.3|   rain|
|2012-01-04|         20.3|    12.2|     5.6| 4.7|   rain|
|2012-01-05|          1.3|     8.9|     2.8| 6.1|   rain|
+----------+-------------+--------+--------+----+-------+
only showing top 5 rows



In [23]:
weather = weather.withColumn('f_temp_max', ((weather.temp_max.cast('decimal') * 9 / 5) + 32))\
                 .withColumn('f_temp_min', ((weather.temp_min.cast('decimal') * 9 / 5) + 32))\
                 .drop('temp_min','temp_max')

In [24]:
weather.show()

+----------+-------------+----+-------+----------+----------+
|      date|precipitation|wind|weather|f_temp_max|f_temp_min|
+----------+-------------+----+-------+----------+----------+
|2012-01-01|          0.0| 4.7|drizzle| 55.400000| 41.000000|
|2012-01-02|         10.9| 4.5|   rain| 51.800000| 37.400000|
|2012-01-03|          0.8| 2.3|   rain| 53.600000| 44.600000|
|2012-01-04|         20.3| 4.7|   rain| 53.600000| 42.800000|
|2012-01-05|          1.3| 6.1|   rain| 48.200000| 37.400000|
|2012-01-06|          2.5| 2.2|   rain| 39.200000| 35.600000|
|2012-01-07|          0.0| 2.3|   rain| 44.600000| 37.400000|
|2012-01-08|          0.0| 2.0|    sun| 50.000000| 37.400000|
|2012-01-09|          4.3| 3.4|   rain| 48.200000| 41.000000|
|2012-01-10|          1.0| 3.4|   rain| 42.800000| 33.800000|
|2012-01-11|          0.0| 5.1|    sun| 42.800000| 30.200000|
|2012-01-12|          0.0| 1.9|    sun| 42.800000| 28.400000|
|2012-01-13|          0.0| 1.3|    sun| 41.000000| 26.600000|
|2012-01

In [25]:
weather.withColumn('month', month('date'))\
    .groupBy('month')\
    .agg(mean('precipitation').alias('avg_rain'))\
    .sort(desc('avg_rain'))\
    .show(1)

+-----+-----------------+
|month|         avg_rain|
+-----+-----------------+
|   11|5.354166666666667|
+-----+-----------------+
only showing top 1 row



In [26]:
weather.withColumn('year', year('date'))\
    .groupBy('year')\
    .agg(mean('wind').alias('avg_wind'))\
    .sort(desc('avg_wind'))\
    .show(1)

+----+-----------------+
|year|         avg_wind|
+----+-----------------+
|2012|3.400819672131148|
+----+-----------------+
only showing top 1 row



In [27]:
weather.withColumn('month', month('date'))\
     .crosstab('month', 'weather').sort(asc('month_weather')).show(1)

+-------------+-------+---+----+----+---+
|month_weather|drizzle|fog|rain|snow|sun|
+-------------+-------+---+----+----+---+
|            1|     10| 38|  35|   8| 33|
+-------------+-------+---+----+----+---+
only showing top 1 row



In [28]:
weather.filter(((year('date') == 2013) | (year('date') == 2014)) & (month('date') == 7) & (weather.weather == 'sun'))\
    .agg(mean(weather.f_temp_max).alias('avg_max_temp'), mean(weather.f_temp_min).alias('avg_min_temp'))\
    .show()

+-------------+-------------+
| avg_max_temp| avg_min_temp|
+-------------+-------------+
|80.3230769231|57.4423076923|
+-------------+-------------+



In [29]:
quart = weather.withColumn("quarter", quarter("date")).withColumn("year", year("date"))

In [30]:
quart_rain = quart.filter(expr('year == 2015 AND quarter == 3'))

In [31]:
quart_rain.where(quart_rain.weather == 'rain').count() / quart_rain.count()

0.021739130434782608

In [32]:
years = weather.withColumn('year', year('date'))

In [33]:
(years.filter(expr('year == 2012')).filter(expr('precipitation > 0')).count()) / (years.filter(expr('year == 2012')).count())

0.48360655737704916

In [34]:
(years.filter(expr('year == 2013')).filter(expr('precipitation > 0')).count()) / (years.filter(expr('year == 2013')).count())

0.41643835616438357

In [35]:
(years.filter(expr('year == 2014')).filter(expr('precipitation > 0')).count()) / (years.filter(expr('year == 2014')).count())

0.410958904109589

In [36]:
(years.filter(expr('year == 2015')).filter(expr('precipitation > 0')).count()) / (years.filter(expr('year == 2015')).count())

0.39452054794520547

In [40]:
# instructor solution
(
    weather.withColumn('year', year('date'))
    .select(when(col('precipitation') > 0, 1).otherwise(0).alias('rain'), 'year')
    .groupby('year')
    .agg(mean('rain'))
    .show()
)

+----+-------------------+
|year|          avg(rain)|
+----+-------------------+
|2015|0.39452054794520547|
|2013|0.41643835616438357|
|2014|  0.410958904109589|
|2012|0.48360655737704916|
+----+-------------------+

