In [1]:
import pandas as pd
import numpy as np
import pyspark
import pydataset
from pyspark.sql.functions import sum, mean, concat, lit, regexp_extract, regexp_replace, when
from vega_datasets import data
from pyspark.sql.functions import month, year, quarter
from pyspark.sql.functions import *

np.random.seed(123)


spark = pyspark.sql.SparkSession.builder.getOrCreate()

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/05/23 15:12:29 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
22/05/23 15:12:30 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
22/05/23 15:12:30 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.


## 1. Create a spark data frame that contains your favorite programming languages.
- The name of the column should be language
- View the schema of the dataframe
- Output the shape of the dataframe
- Show the first 5 records in the dataframe

In [2]:
# Create the dataframe
df = spark.createDataFrame(pd.DataFrame({'language': ['Python', 
                                                      'SQL', 
                                                      'R', 
                                                      'C++', 
                                                      'Java',
                                                      'Ruby']}))



In [3]:
# View the schema
df.schema

StructType(List(StructField(language,StringType,true)))

In [4]:
# Output the shape
print((df.count(), len(df.columns)))

[Stage 0:>                                                          (0 + 8) / 8]

(6, 1)


                                                                                

In [5]:
# Show the first five records in the dataframe
df.show(5)

+--------+
|language|
+--------+
|  Python|
|     SQL|
|       R|
|     C++|
|    Java|
+--------+
only showing top 5 rows



## 2. Load the mpg dataset as a spark dataframe.

**a. Create 1 column of output that contains a message like the one below:**

The 1999 audi a4 has a 4 cylinder engine.

**For each vehicle.**

In [6]:
# Load the data
df = spark.createDataFrame(pydataset.data("mpg"))
df.show(5)

+------------+-----+-----+----+---+----------+---+---+---+---+-------+
|manufacturer|model|displ|year|cyl|     trans|drv|cty|hwy| fl|  class|
+------------+-----+-----+----+---+----------+---+---+---+---+-------+
|        audi|   a4|  1.8|1999|  4|  auto(l5)|  f| 18| 29|  p|compact|
|        audi|   a4|  1.8|1999|  4|manual(m5)|  f| 21| 29|  p|compact|
|        audi|   a4|  2.0|2008|  4|manual(m6)|  f| 20| 31|  p|compact|
|        audi|   a4|  2.0|2008|  4|  auto(av)|  f| 21| 30|  p|compact|
|        audi|   a4|  2.8|1999|  6|  auto(l5)|  f| 16| 26|  p|compact|
+------------+-----+-----+----+---+----------+---+---+---+---+-------+
only showing top 5 rows



In [7]:
# Create the column with the message
df.select(
    concat(
        lit("The "),
        col("year"),
        lit(" "),
        col("manufacturer"),
        lit(" "),
        col("model"),
        lit(" has a "),
        col("cyl"),
        lit(" cylinder engine."),
    ).alias("cylinder_description")
).show(truncate=False)

+--------------------------------------------------------------+
|cylinder_description                                          |
+--------------------------------------------------------------+
|The 1999 audi a4 has a 4 cylinder engine.                     |
|The 1999 audi a4 has a 4 cylinder engine.                     |
|The 2008 audi a4 has a 4 cylinder engine.                     |
|The 2008 audi a4 has a 4 cylinder engine.                     |
|The 1999 audi a4 has a 6 cylinder engine.                     |
|The 1999 audi a4 has a 6 cylinder engine.                     |
|The 2008 audi a4 has a 6 cylinder engine.                     |
|The 1999 audi a4 quattro has a 4 cylinder engine.             |
|The 1999 audi a4 quattro has a 4 cylinder engine.             |
|The 2008 audi a4 quattro has a 4 cylinder engine.             |
|The 2008 audi a4 quattro has a 4 cylinder engine.             |
|The 1999 audi a4 quattro has a 6 cylinder engine.             |
|The 1999 audi a4 quattro

**b. Transform the trans column so that it only contains either manual or auto.**

In [8]:
df.select('trans',regexp_replace('trans', r'\(\w+\)$', '').alias('transmission')).show()

+----------+------------+
|     trans|transmission|
+----------+------------+
|  auto(l5)|        auto|
|manual(m5)|      manual|
|manual(m6)|      manual|
|  auto(av)|        auto|
|  auto(l5)|        auto|
|manual(m5)|      manual|
|  auto(av)|        auto|
|manual(m5)|      manual|
|  auto(l5)|        auto|
|manual(m6)|      manual|
|  auto(s6)|        auto|
|  auto(l5)|        auto|
|manual(m5)|      manual|
|  auto(s6)|        auto|
|manual(m6)|      manual|
|  auto(l5)|        auto|
|  auto(s6)|        auto|
|  auto(s6)|        auto|
|  auto(l4)|        auto|
|  auto(l4)|        auto|
+----------+------------+
only showing top 20 rows



## 3. Load the tips dataset as a spark dataframe.

**a. What percentage of observations are smokers?**

In [9]:
# load data set
df = spark.createDataFrame(pydataset.data("tips"))
df.show(5)

+----------+----+------+------+---+------+----+
|total_bill| tip|   sex|smoker|day|  time|size|
+----------+----+------+------+---+------+----+
|     16.99|1.01|Female|    No|Sun|Dinner|   2|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|
|     21.01| 3.5|  Male|    No|Sun|Dinner|   3|
|     23.68|3.31|  Male|    No|Sun|Dinner|   2|
|     24.59|3.61|Female|    No|Sun|Dinner|   4|
+----------+----+------+------+---+------+----+
only showing top 5 rows



In [10]:
# Calculate percentage of smokers by creating a filtered subset and taking the mean
smokers = when(df.smoker == 'Yes', 1).otherwise(0)
df.select(mean(smokers)).show()

+-----------------------------------------------+
|avg(CASE WHEN (smoker = Yes) THEN 1 ELSE 0 END)|
+-----------------------------------------------+
|                            0.38114754098360654|
+-----------------------------------------------+



**b. Create a column that contains the tip percentage**

In [11]:
tip_percent = (df.tip / df.total_bill).alias('tip_percent')
df.select(tip_percent).show(5)

+-------------------+
|        tip_percent|
+-------------------+
|0.05944673337257211|
|0.16054158607350097|
|0.16658733936220846|
| 0.1397804054054054|
|0.14680764538430255|
+-------------------+
only showing top 5 rows



**c. Calculate the average tip percentage for each combination of sex and smoker.**

In [12]:
df.groupby('sex', 'smoker').agg(mean(tip_percent)).show()

[Stage 14:>                                                         (0 + 8) / 8]

+------+------+--------------------------------------+
|   sex|smoker|avg((tip / total_bill) AS tip_percent)|
+------+------+--------------------------------------+
|  Male|    No|                    0.1606687151291298|
|Female|    No|                    0.1569209707691836|
|  Male|   Yes|                   0.15277117520248512|
|Female|   Yes|                   0.18215035269941032|
+------+------+--------------------------------------+



                                                                                

## 4. Use the seattle weather dataset referenced in the lesson to answer the questions below.

- Convert the temperatures to fahrenheit.

In [13]:
# Load the data set
df = data.seattle_weather().assign(date=lambda df: df.date.astype(str))
df = spark.createDataFrame(df)
df.show(5)

+----------+-------------+--------+--------+----+-------+
|      date|precipitation|temp_max|temp_min|wind|weather|
+----------+-------------+--------+--------+----+-------+
|2012-01-01|          0.0|    12.8|     5.0| 4.7|drizzle|
|2012-01-02|         10.9|    10.6|     2.8| 4.5|   rain|
|2012-01-03|          0.8|    11.7|     7.2| 2.3|   rain|
|2012-01-04|         20.3|    12.2|     5.6| 4.7|   rain|
|2012-01-05|          1.3|     8.9|     2.8| 6.1|   rain|
+----------+-------------+--------+--------+----+-------+
only showing top 5 rows



In [14]:
# Converting temps to fahrenheit
df = df.withColumn('temp_max', df.temp_max * (9/5) + 32)
df = df.withColumn('temp_min', df.temp_min * (9/5) + 32)
df.show(5)

+----------+-------------+------------------+--------+----+-------+
|      date|precipitation|          temp_max|temp_min|wind|weather|
+----------+-------------+------------------+--------+----+-------+
|2012-01-01|          0.0|55.040000000000006|    41.0| 4.7|drizzle|
|2012-01-02|         10.9|             51.08|   37.04| 4.5|   rain|
|2012-01-03|          0.8|             53.06|   44.96| 2.3|   rain|
|2012-01-04|         20.3|             53.96|   42.08| 4.7|   rain|
|2012-01-05|          1.3|48.019999999999996|   37.04| 6.1|   rain|
+----------+-------------+------------------+--------+----+-------+
only showing top 5 rows



- Which month has the most rain, on average?

In [15]:
(df.withColumn('month', month('date'))
    .groupBy('month')
    .agg(mean('precipitation').alias('avg_rain'))
    .sort(desc('avg_rain'))
    .show(1))

[Stage 19:>                                                         (0 + 8) / 8]

+-----+-----------------+
|month|         avg_rain|
+-----+-----------------+
|   11|5.354166666666667|
+-----+-----------------+
only showing top 1 row



                                                                                

- Which year was the windiest?

In [16]:
(df.withColumn("year", year("date"))
    .groupBy("year")
    .agg(sum("wind").alias("total_wind"))
    .sort("total_wind")
    .show()
) # 2012 is windiest



+----+------------------+
|year|        total_wind|
+----+------------------+
|2013|1100.8000000000006|
|2015|1153.3000000000002|
|2014|1236.5000000000007|
|2012|            1244.7|
+----+------------------+



                                                                                

- What is the most frequent type of weather in January?

In [17]:
# add year column to df
df = df.withColumn("year", year("date"))
df.show()

+----------+-------------+------------------+------------------+----+-------+----+
|      date|precipitation|          temp_max|          temp_min|wind|weather|year|
+----------+-------------+------------------+------------------+----+-------+----+
|2012-01-01|          0.0|55.040000000000006|              41.0| 4.7|drizzle|2012|
|2012-01-02|         10.9|             51.08|             37.04| 4.5|   rain|2012|
|2012-01-03|          0.8|             53.06|             44.96| 2.3|   rain|2012|
|2012-01-04|         20.3|             53.96|             42.08| 4.7|   rain|2012|
|2012-01-05|          1.3|48.019999999999996|             37.04| 6.1|   rain|2012|
|2012-01-06|          2.5|             39.92|             35.96| 2.2|   rain|2012|
|2012-01-07|          0.0|             44.96|             37.04| 2.3|   rain|2012|
|2012-01-08|          0.0|              50.0|             37.04| 2.0|    sun|2012|
|2012-01-09|          4.3|             48.92|              41.0| 3.4|   rain|2012|
|201

In [18]:
january = df.filter(month('date') == '01').groupby('weather').count()
january = january.withColumnRenamed('count', 'n_days')
january.show()
# Fog wins most frequent

[Stage 26:>                                                         (0 + 8) / 8]

+-------+------+
|weather|n_days|
+-------+------+
|drizzle|    10|
|   rain|    35|
|    sun|    33|
|   snow|     8|
|    fog|    38|
+-------+------+



                                                                                

- What is the average high and low temperature on sunny days in July in 2013 and 2014?

In [19]:
# Calculate high
df_13_14 = df.filter(((df.year == 2013) | (df.year == 2014)) & (df.weather == 'sun'))
df_13_14.select(mean('temp_max')).show()

+-----------------+
|    avg(temp_max)|
+-----------------+
|65.37874999999998|
+-----------------+



In [20]:
# Calculate low
df_13_14.select(mean('temp_min')).show()

+-----------------+
|    avg(temp_min)|
+-----------------+
|48.28913461538461|
+-----------------+



- What percentage of days were rainy in q3 of 2015?

In [21]:
# add quarter column to df
df = df.withColumn('quarter', quarter('date'))
df.show(5)

+----------+-------------+------------------+--------+----+-------+----+-------+
|      date|precipitation|          temp_max|temp_min|wind|weather|year|quarter|
+----------+-------------+------------------+--------+----+-------+----+-------+
|2012-01-01|          0.0|55.040000000000006|    41.0| 4.7|drizzle|2012|      1|
|2012-01-02|         10.9|             51.08|   37.04| 4.5|   rain|2012|      1|
|2012-01-03|          0.8|             53.06|   44.96| 2.3|   rain|2012|      1|
|2012-01-04|         20.3|             53.96|   42.08| 4.7|   rain|2012|      1|
|2012-01-05|          1.3|48.019999999999996|   37.04| 6.1|   rain|2012|      1|
+----------+-------------+------------------+--------+----+-------+----+-------+
only showing top 5 rows



In [22]:
# Subset the data with our paramaters
rain_q3 = df.filter(expr('year == 2015 AND quarter == 3'))
rain_q3.show(5)

+----------+-------------+-----------------+-----------------+----+-------+----+-------+
|      date|precipitation|         temp_max|         temp_min|wind|weather|year|quarter|
+----------+-------------+-----------------+-----------------+----+-------+----+-------+
|2015-07-01|          0.0|89.96000000000001|            62.96| 4.3|    sun|2015|      3|
|2015-07-02|          0.0|            93.02|64.03999999999999| 3.4|    sun|2015|      3|
|2015-07-03|          0.0|            91.94|64.03999999999999| 2.6|    sun|2015|      3|
|2015-07-04|          0.0|            91.94|             59.0| 2.9|    sun|2015|      3|
|2015-07-05|          0.0|91.03999999999999|            62.06| 2.1|    sun|2015|      3|
+----------+-------------+-----------------+-----------------+----+-------+----+-------+
only showing top 5 rows



In [23]:
# Calculate the percentage of whole
rain_q3.where(rain_q3.weather=='rain').count() / rain_q3.count()

0.021739130434782608

- For each year, find what percentage of days it rained (had non-zero precipitation).

In [24]:
df = df.withColumn('had_precip', when((df.precipitation > 0), 1)
                                    .otherwise(0))

In [25]:
df.groupby('year').agg(mean('had_precip')).show()

+----+-------------------+
|year|    avg(had_precip)|
+----+-------------------+
|2012|0.48360655737704916|
|2013|0.41643835616438357|
|2014|  0.410958904109589|
|2015|0.39452054794520547|
+----+-------------------+

