#### Imports

In [1]:
#import spark for python! 
import pyspark
from pyspark.sql.functions import col
from pyspark.sql.functions import expr
from pyspark.sql.functions import min, max, sum, count, mean, avg, round
from pyspark.sql.functions import concat, lit
from pyspark.sql.functions import regexp_extract, regexp_replace
from pyspark.sql.functions import when
from pyspark.sql.functions import month, year, quarter

import pandas as pd
import numpy as np

from pydataset import data
from vega_datasets import data as data2

In [2]:
#create the spark session
spark = pyspark.sql.SparkSession.builder.getOrCreate()
spark

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/06/30 12:11:38 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/06/30 12:11:39 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


# Exercises

#### 1. Create a spark data frame that contains your favorite programming languages.

- The name of the column should be language


In [9]:
language_df = spark.createDataFrame(
    pd.DataFrame(
        {
            "language": [
                "python",
                "java",
                "java script",
                "r",
                "jupyter notebook",
                "perl",
                "swift",
                "sql"
            ]
        }
    )
)
language_df.show(truncate=False)

+----------------+
|language        |
+----------------+
|python          |
|java            |
|java script     |
|r               |
|jupyter notebook|
|perl            |
|swift           |
|sql             |
+----------------+



- View the schema of the dataframe


In [12]:
language_df.printSchema()

root
 |-- language: string (nullable = true)



- Output the shape of the dataframe


In [15]:
language_df.count(), len(language_df.columns)

(8, 1)

- Show the first 5 records in the dataframe

In [16]:
language_df.show(5)

+----------------+
|        language|
+----------------+
|          python|
|            java|
|     java script|
|               r|
|jupyter notebook|
+----------------+
only showing top 5 rows



#### 2. Load the mpg dataset as a spark dataframe.

- Create 1 column of output that contains a message like the one below:
    - For each vehicle.

The 1999 audi a4 has a 4 cylinder engine.



In [19]:
df = spark.createDataFrame(data('mpg'))

In [20]:
df.show()

+------------+------------------+-----+----+---+----------+---+---+---+---+-------+
|manufacturer|             model|displ|year|cyl|     trans|drv|cty|hwy| fl|  class|
+------------+------------------+-----+----+---+----------+---+---+---+---+-------+
|        audi|                a4|  1.8|1999|  4|  auto(l5)|  f| 18| 29|  p|compact|
|        audi|                a4|  1.8|1999|  4|manual(m5)|  f| 21| 29|  p|compact|
|        audi|                a4|  2.0|2008|  4|manual(m6)|  f| 20| 31|  p|compact|
|        audi|                a4|  2.0|2008|  4|  auto(av)|  f| 21| 30|  p|compact|
|        audi|                a4|  2.8|1999|  6|  auto(l5)|  f| 16| 26|  p|compact|
|        audi|                a4|  2.8|1999|  6|manual(m5)|  f| 18| 26|  p|compact|
|        audi|                a4|  3.1|2008|  6|  auto(av)|  f| 18| 27|  p|compact|
|        audi|        a4 quattro|  1.8|1999|  4|manual(m5)|  4| 18| 26|  p|compact|
|        audi|        a4 quattro|  1.8|1999|  4|  auto(l5)|  4| 16| 25|  p|c

In [34]:
df.select(
    concat(
        lit('The '),
        df.year,
        lit(' '),
        df.manufacturer,
        lit(' '),
        df.model,
        lit(' has a '),
        df.cyl,
        lit(' cylinder engine')
    ).alias('year_make_model_engine_size')
).show(5, truncate=False)

+----------------------------------------+
|year_make_model_engine_size             |
+----------------------------------------+
|The 1999 audi a4 has a 4 cylinder engine|
|The 1999 audi a4 has a 4 cylinder engine|
|The 2008 audi a4 has a 4 cylinder engine|
|The 2008 audi a4 has a 4 cylinder engine|
|The 1999 audi a4 has a 6 cylinder engine|
+----------------------------------------+
only showing top 5 rows



In [42]:
df = df.withColumn(
    'year_make_model_engine_size', # new column name
    (
    concat(
        lit('The '),
        df.year,
        lit(' '),
        df.manufacturer,
        lit(' '),
        df.model,
        lit(' has a '),
        df.cyl,
        lit(' cylinder engine')
    )
) #how the new column is created
)

In [44]:
df.show(10, truncate=False)

+------------+----------+-----+----+---+----------+---+---+---+---+-------+------------------------------------------------+
|manufacturer|model     |displ|year|cyl|trans     |drv|cty|hwy|fl |class  |year_make_model_engine_size                     |
+------------+----------+-----+----+---+----------+---+---+---+---+-------+------------------------------------------------+
|audi        |a4        |1.8  |1999|4  |auto(l5)  |f  |18 |29 |p  |compact|The 1999 audi a4 has a 4 cylinder engine        |
|audi        |a4        |1.8  |1999|4  |manual(m5)|f  |21 |29 |p  |compact|The 1999 audi a4 has a 4 cylinder engine        |
|audi        |a4        |2.0  |2008|4  |manual(m6)|f  |20 |31 |p  |compact|The 2008 audi a4 has a 4 cylinder engine        |
|audi        |a4        |2.0  |2008|4  |auto(av)  |f  |21 |30 |p  |compact|The 2008 audi a4 has a 4 cylinder engine        |
|audi        |a4        |2.8  |1999|6  |auto(l5)  |f  |16 |26 |p  |compact|The 1999 audi a4 has a 6 cylinder engine        |


- Transform the trans column so that it only contains either manual or auto.

In [102]:
df.select(
    'trans',
    regexp_extract('trans', r'(\w+)\(.*\)', 1).alias('trans')

).show()

+----------+------+
|     trans| trans|
+----------+------+
|  auto(l5)|  auto|
|manual(m5)|manual|
|manual(m6)|manual|
|  auto(av)|  auto|
|  auto(l5)|  auto|
|manual(m5)|manual|
|  auto(av)|  auto|
|manual(m5)|manual|
|  auto(l5)|  auto|
|manual(m6)|manual|
|  auto(s6)|  auto|
|  auto(l5)|  auto|
|manual(m5)|manual|
|  auto(s6)|  auto|
|manual(m6)|manual|
|  auto(l5)|  auto|
|  auto(s6)|  auto|
|  auto(s6)|  auto|
|  auto(l4)|  auto|
|  auto(l4)|  auto|
+----------+------+
only showing top 20 rows



#### 3. Load the tips dataset as a spark dataframe.

- What percentage of observations are smokers?


In [103]:
df = spark.createDataFrame(data('tips'))

In [145]:
df.show(10)

+----------+----+------+------+---+------+----+
|total_bill| tip|   sex|smoker|day|  time|size|
+----------+----+------+------+---+------+----+
|     16.99|1.01|Female|    No|Sun|Dinner|   2|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|
|     21.01| 3.5|  Male|    No|Sun|Dinner|   3|
|     23.68|3.31|  Male|    No|Sun|Dinner|   2|
|     24.59|3.61|Female|    No|Sun|Dinner|   4|
|     25.29|4.71|  Male|    No|Sun|Dinner|   4|
|      8.77| 2.0|  Male|    No|Sun|Dinner|   2|
|     26.88|3.12|  Male|    No|Sun|Dinner|   4|
|     15.04|1.96|  Male|    No|Sun|Dinner|   2|
|     14.78|3.23|  Male|    No|Sun|Dinner|   2|
+----------+----+------+------+---+------+----+
only showing top 10 rows



In [156]:
(df.filter(df.smoker=='No').count() / df.count()) *100

61.885245901639344

- Create a column that contains the tip percentage


In [166]:
df.select(
    (df.tip / df.total_bill)
).show(5)

+-------------------+
| (tip / total_bill)|
+-------------------+
|0.05944673337257211|
|0.16054158607350097|
|0.16658733936220846|
| 0.1397804054054054|
|0.14680764538430255|
+-------------------+
only showing top 5 rows



In [168]:
df.select(
    .alias('tip_percentage')
).show(5)

+--------------+
|tip_percentage|
+--------------+
|           6.0|
|          16.0|
|          17.0|
|          14.0|
|          15.0|
+--------------+
only showing top 5 rows



In [170]:
df = df.withColumn(
        'tip_percentage', # new column name
        round(((df.tip / df.total_bill)*100))#how the new column is created
    )

In [171]:
df.show()

+----------+----+------+------+---+------+----+--------------+
|total_bill| tip|   sex|smoker|day|  time|size|tip_percentage|
+----------+----+------+------+---+------+----+--------------+
|     16.99|1.01|Female|    No|Sun|Dinner|   2|           6.0|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|          16.0|
|     21.01| 3.5|  Male|    No|Sun|Dinner|   3|          17.0|
|     23.68|3.31|  Male|    No|Sun|Dinner|   2|          14.0|
|     24.59|3.61|Female|    No|Sun|Dinner|   4|          15.0|
|     25.29|4.71|  Male|    No|Sun|Dinner|   4|          19.0|
|      8.77| 2.0|  Male|    No|Sun|Dinner|   2|          23.0|
|     26.88|3.12|  Male|    No|Sun|Dinner|   4|          12.0|
|     15.04|1.96|  Male|    No|Sun|Dinner|   2|          13.0|
|     14.78|3.23|  Male|    No|Sun|Dinner|   2|          22.0|
|     10.27|1.71|  Male|    No|Sun|Dinner|   2|          17.0|
|     35.26| 5.0|Female|    No|Sun|Dinner|   4|          14.0|
|     15.42|1.57|  Male|    No|Sun|Dinner|   2|        

- Calculate the average tip percentage for each combination of sex and smoker.

In [173]:
df.crosstab('sex', 'smoker').show()



+----------+---+---+
|sex_smoker| No|Yes|
+----------+---+---+
|    Female| 54| 33|
|      Male| 97| 60|
+----------+---+---+



                                                                                

In [175]:
df.groupBy('smoker').pivot('sex').mean('tip_percentage').show()



+------+------------------+------------------+
|smoker|            Female|              Male|
+------+------------------+------------------+
|    No|15.685185185185185|16.103092783505154|
|   Yes|18.242424242424242|15.283333333333333|
+------+------------------+------------------+



                                                                                

#### 4. Use the seattle weather dataset referenced in the lesson to answer the questions below.

- Convert the temperatures to fahrenheit.


In [177]:
weather = data2.seattle_weather().assign(date=lambda df: df.date.astype(str))
df = spark.createDataFrame(weather)
df.show(6)

+----------+-------------+--------+--------+----+-------+
|      date|precipitation|temp_max|temp_min|wind|weather|
+----------+-------------+--------+--------+----+-------+
|2012-01-01|          0.0|    12.8|     5.0| 4.7|drizzle|
|2012-01-02|         10.9|    10.6|     2.8| 4.5|   rain|
|2012-01-03|          0.8|    11.7|     7.2| 2.3|   rain|
|2012-01-04|         20.3|    12.2|     5.6| 4.7|   rain|
|2012-01-05|          1.3|     8.9|     2.8| 6.1|   rain|
|2012-01-06|          2.5|     4.4|     2.2| 2.2|   rain|
+----------+-------------+--------+--------+----+-------+
only showing top 6 rows



(0°C × 1.8) + 32 = 32°F

In [184]:
df = df.withColumn(
        'temp_max_f', # new column name
        round((df.temp_max * 1.8) + 32, 2)#how the new column is created
    )

In [185]:
df = df.withColumn(
        'temp_min_f', # new column name
        round((df.temp_min * 1.8) + 32, 2)#how the new column is created
    )

In [188]:
df.show(5)

+----------+-------------+--------+--------+----+-------+----------+----------+
|      date|precipitation|temp_max|temp_min|wind|weather|temp_max_f|temp_min_f|
+----------+-------------+--------+--------+----+-------+----------+----------+
|2012-01-01|          0.0|    12.8|     5.0| 4.7|drizzle|     55.04|      41.0|
|2012-01-02|         10.9|    10.6|     2.8| 4.5|   rain|     51.08|     37.04|
|2012-01-03|          0.8|    11.7|     7.2| 2.3|   rain|     53.06|     44.96|
|2012-01-04|         20.3|    12.2|     5.6| 4.7|   rain|     53.96|     42.08|
|2012-01-05|          1.3|     8.9|     2.8| 6.1|   rain|     48.02|     37.04|
+----------+-------------+--------+--------+----+-------+----------+----------+
only showing top 5 rows



In [183]:
df.select(
    round((df.temp_max * 1.8) + 32, 2).alias('temp_max_f'),
    round((df.temp_min * 1.8) + 32, 2).alias('temp_min_f')

).show(5)

+----------+----------+
|temp_max_f|temp_min_f|
+----------+----------+
|     55.04|      41.0|
|     51.08|     37.04|
|     53.06|     44.96|
|     53.96|     42.08|
|     48.02|     37.04|
+----------+----------+
only showing top 5 rows



- Which month has the most rain, on average?


In [191]:
(
    df.withColumn('the_month', month('date'))
    .groupBy('the_month')
    .agg(round(avg('precipitation')).alias('avg_rain'))
    .sort('avg_rain')
).show()

+---------+--------+
|the_month|avg_rain|
+---------+--------+
|        7|     0.0|
|        6|     1.0|
|        8|     1.0|
|        5|     2.0|
|        9|     2.0|
|        4|     3.0|
|        1|     4.0|
|        2|     4.0|
|       10|     4.0|
|        3|     5.0|
|       12|     5.0|
|       11|     5.0|
+---------+--------+



- Which year was the windiest?


In [198]:
(
    df.withColumn('the_year', year('date'))
    .groupBy('the_year')
    .agg((avg('wind')).alias('avg_wind'))
    .sort('avg_wind')
).show()

[Stage 254:>                                                        (0 + 8) / 8]

+--------+------------------+
|the_year|          avg_wind|
+--------+------------------+
|    2013|3.0158904109589058|
|    2015| 3.159726027397261|
|    2014| 3.387671232876714|
|    2012| 3.400819672131148|
+--------+------------------+



                                                                                

- What is the most frequent type of weather in January?


In [223]:
df = df.withColumn('the_month', month('date'))

In [228]:
df.crosstab('the_month', 'weather').show()

[Stage 295:>                                                        (0 + 8) / 8]

+-----------------+-------+---+----+----+---+
|the_month_weather|drizzle|fog|rain|snow|sun|
+-----------------+-------+---+----+----+---+
|                7|      8| 13|  14|   0| 89|
|               11|      3| 50|  25|   0| 42|
|                3|      3| 36|  37|   6| 42|
|                8|      8| 16|   6|   0| 94|
|                5|      1| 25|  16|   0| 82|
|                6|      2| 14|  19|   0| 85|
|                9|      5| 40|   4|   0| 71|
|                1|     10| 38|  35|   8| 33|
|               10|      4| 55|  20|   0| 45|
|                4|      4| 34|  20|   1| 61|
|               12|      2| 54|  23|   5| 40|
|                2|      4| 36|  40|   3| 30|
+-----------------+-------+---+----+----+---+



                                                                                

- What is the average high and low temperature on sunny days in July in 2013 and 2014?


In [231]:
df.show(3)

+----------+-------------+--------+--------+----+-------+----------+----------+---------+
|      date|precipitation|temp_max|temp_min|wind|weather|temp_max_f|temp_min_f|the_month|
+----------+-------------+--------+--------+----+-------+----------+----------+---------+
|2012-01-01|          0.0|    12.8|     5.0| 4.7|drizzle|     55.04|      41.0|        1|
|2012-01-02|         10.9|    10.6|     2.8| 4.5|   rain|     51.08|     37.04|        1|
|2012-01-03|          0.8|    11.7|     7.2| 2.3|   rain|     53.06|     44.96|        1|
+----------+-------------+--------+--------+----+-------+----------+----------+---------+
only showing top 3 rows



In [246]:
df = df.withColumn('the_year', year('date'))

In [250]:
filtered_df = df.filter((col('the_month') == 7) &
            (col('the_year').isin([2013, 2014])) &
            (col('weather') == 'sun'))

In [251]:
filtered_df.select(
    avg('temp_max'),
    avg('temp_min')


).show()

[Stage 325:>                                                        (0 + 8) / 8]

+------------------+-----------------+
|     avg(temp_max)|    avg(temp_min)|
+------------------+-----------------+
|26.828846153846158|14.18269230769231|
+------------------+-----------------+



                                                                                

- What percentage of days were rainy in q3 of 2015?


- For each year, find what percentage of days it rained (had non-zero precipitation).

23/06/30 17:20:47 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 992277 ms exceeds timeout 120000 ms
23/06/30 17:20:47 WARN SparkContext: Killing executors is not supported by current scheduler.
23/06/30 17:36:07 ERROR Inbox: Ignoring error
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:322)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:102)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRef(RpcEnv.scala:110)
	at org.apache.spark.util.RpcUtils$.makeDriverRef(RpcUtils.scala:36)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.driverEndpoint$lzycompute(BlockManagerMasterEndpoint.scala:117)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.org$apache$spark$storage$BlockManagerMasterEndpoint$$driverEndpoint(BlockManagerMasterEndpoint.scala:116)
	at org.apache.spark.storage.B