In [16]:
import numpy as np
import pandas as pd
import pyspark
spark = pyspark.sql.SparkSession.builder.getOrCreate()

from pyspark.sql.functions import col, expr

from pydataset import data
from pyspark.sql.functions import regexp_extract, regexp_replace
from pyspark.sql.functions import *

### 1. Create a spark data frame that contains your favorite programming languages.
- The name of the column should be language
- View the schema of the dataframe
- Output the shape of the dataframe
- Show the first 5 records in the dataframe

In [17]:
pd_df = pd.DataFrame({'language': ['Python', 'Java', 'JavaScript', 'C++', 'Go', 'Darwin', 'Grasshopper']})

In [18]:
pd_df

Unnamed: 0,language
0,Python
1,Java
2,JavaScript
3,C++
4,Go
5,Darwin
6,Grasshopper


In [19]:
df = spark.createDataFrame(pd_df)

In [20]:
df.printSchema()

root
 |-- language: string (nullable = true)



In [21]:
print((df.count(), len(df.columns)))

(7, 1)


In [22]:
df.show(5)

+----------+
|  language|
+----------+
|    Python|
|      Java|
|JavaScript|
|       C++|
|        Go|
+----------+
only showing top 5 rows



### Load the mpg dataset as a spark dataframe.

- Create 1 column of output that contains a message like the one below:
  - The 1999 audi a4 has a 4 cylinder engine.
  - For each vehicle.

- Transform the trans column so that it only contains either manual or auto.

In [23]:
mpg = spark.createDataFrame(data("mpg"))
mpg.show(5)

+------------+-----+-----+----+---+----------+---+---+---+---+-------+
|manufacturer|model|displ|year|cyl|     trans|drv|cty|hwy| fl|  class|
+------------+-----+-----+----+---+----------+---+---+---+---+-------+
|        audi|   a4|  1.8|1999|  4|  auto(l5)|  f| 18| 29|  p|compact|
|        audi|   a4|  1.8|1999|  4|manual(m5)|  f| 21| 29|  p|compact|
|        audi|   a4|  2.0|2008|  4|manual(m6)|  f| 20| 31|  p|compact|
|        audi|   a4|  2.0|2008|  4|  auto(av)|  f| 21| 30|  p|compact|
|        audi|   a4|  2.8|1999|  6|  auto(l5)|  f| 16| 26|  p|compact|
+------------+-----+-----+----+---+----------+---+---+---+---+-------+
only showing top 5 rows



In [39]:
mpg.select(
    concat(
        lit("The "), 
        col("year"), 
        lit(" "),
        col("manufacturer"), 
        lit(' '), 
        col("model"),
        lit(" has a "), 
        col("cyl"),
        lit(" cylinder engine."),
    ).alias("vericle_cylinder_desc")).show(truncate=False)

+--------------------------------------------------------------+
|vericle_cylinder_desc                                         |
+--------------------------------------------------------------+
|The 1999 audi a4 has a 4 cylinder engine.                     |
|The 1999 audi a4 has a 4 cylinder engine.                     |
|The 2008 audi a4 has a 4 cylinder engine.                     |
|The 2008 audi a4 has a 4 cylinder engine.                     |
|The 1999 audi a4 has a 6 cylinder engine.                     |
|The 1999 audi a4 has a 6 cylinder engine.                     |
|The 2008 audi a4 has a 6 cylinder engine.                     |
|The 1999 audi a4 quattro has a 4 cylinder engine.             |
|The 1999 audi a4 quattro has a 4 cylinder engine.             |
|The 2008 audi a4 quattro has a 4 cylinder engine.             |
|The 2008 audi a4 quattro has a 4 cylinder engine.             |
|The 1999 audi a4 quattro has a 6 cylinder engine.             |
|The 1999 audi a4 quattro

In [40]:
# new_mpg = mpg.select('*', concat(lit('The '), mpg.year, lit(' '), mpg.manufacturer, lit(' '), mpg.model, lit('has a'), mpg.cyl, lit(' cylinder engine.')).alias('message'))

In [41]:
#new_mpg.show(5)

#### Transform the trans column so that it only contains either manual or auto.

In [42]:
# mpg.select('trans',regexp_replace('trans', r'\(\w+\)$', '').alias('transmission')).show()

multiple ways to do this

In [44]:
mpg.select(
    'trans',
    regexp_extract("trans", r"^(\w+)\(", 1).alias("regexp_extract")).show()

+----------+--------------+
|     trans|regexp_extract|
+----------+--------------+
|  auto(l5)|          auto|
|manual(m5)|        manual|
|manual(m6)|        manual|
|  auto(av)|          auto|
|  auto(l5)|          auto|
|manual(m5)|        manual|
|  auto(av)|          auto|
|manual(m5)|        manual|
|  auto(l5)|          auto|
|manual(m6)|        manual|
|  auto(s6)|          auto|
|  auto(l5)|          auto|
|manual(m5)|        manual|
|  auto(s6)|          auto|
|manual(m6)|        manual|
|  auto(l5)|          auto|
|  auto(s6)|          auto|
|  auto(s6)|          auto|
|  auto(l4)|          auto|
|  auto(l4)|          auto|
+----------+--------------+
only showing top 20 rows



In [47]:
mpg.select(
    'trans',
    regexp_replace("trans", r"\(.+$", "").alias("regexp_replace")).show()

+----------+--------------+
|     trans|regexp_replace|
+----------+--------------+
|  auto(l5)|          auto|
|manual(m5)|        manual|
|manual(m6)|        manual|
|  auto(av)|          auto|
|  auto(l5)|          auto|
|manual(m5)|        manual|
|  auto(av)|          auto|
|manual(m5)|        manual|
|  auto(l5)|          auto|
|manual(m6)|        manual|
|  auto(s6)|          auto|
|  auto(l5)|          auto|
|manual(m5)|        manual|
|  auto(s6)|          auto|
|manual(m6)|        manual|
|  auto(l5)|          auto|
|  auto(s6)|          auto|
|  auto(s6)|          auto|
|  auto(l4)|          auto|
|  auto(l4)|          auto|
+----------+--------------+
only showing top 20 rows



In [48]:
# Method without regex using (WHEN)
mpg.select(
    'trans',
    when(
        mpg.trans.like("auto%"), "auto"
    ).otherwise("manual").alias("when + like")
).show()

+----------+-----------+
|     trans|when + like|
+----------+-----------+
|  auto(l5)|       auto|
|manual(m5)|     manual|
|manual(m6)|     manual|
|  auto(av)|       auto|
|  auto(l5)|       auto|
|manual(m5)|     manual|
|  auto(av)|       auto|
|manual(m5)|     manual|
|  auto(l5)|       auto|
|manual(m6)|     manual|
|  auto(s6)|       auto|
|  auto(l5)|       auto|
|manual(m5)|     manual|
|  auto(s6)|       auto|
|manual(m6)|     manual|
|  auto(l5)|       auto|
|  auto(s6)|       auto|
|  auto(s6)|       auto|
|  auto(l4)|       auto|
|  auto(l4)|       auto|
+----------+-----------+
only showing top 20 rows



### 3. Load the tips dataset as a spark dataframe.

- What percentage of observations are smokers?
- Create a column that contains the tip percentage
- Calculate the average tip percentage for each combination of sex and smoker.

In [36]:
tips = spark.createDataFrame(data("tips"))
tips.show(5)

+----------+----+------+------+---+------+----+
|total_bill| tip|   sex|smoker|day|  time|size|
+----------+----+------+------+---+------+----+
|     16.99|1.01|Female|    No|Sun|Dinner|   2|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|
|     21.01| 3.5|  Male|    No|Sun|Dinner|   3|
|     23.68|3.31|  Male|    No|Sun|Dinner|   2|
|     24.59|3.61|Female|    No|Sun|Dinner|   4|
+----------+----+------+------+---+------+----+
only showing top 5 rows



In [49]:
tips.groupBy("smoker").count().show()

+------+-----+
|smoker|count|
+------+-----+
|    No|  151|
|   Yes|   93|
+------+-----+



In [50]:
tips.groupBy("smoker").count().withColumn(
    "percent",
    concat(round((col("count") / tips.count() * 100), 0).cast("int"), lit("%")),
).show()

+------+-----+-------+
|smoker|count|percent|
+------+-----+-------+
|    No|  151|    62%|
|   Yes|   93|    38%|
+------+-----+-------+



Create a column that contains the tip percentage

In [51]:
# adding the column with the tip percentage
tips.withColumn("tip_percentage", col('tip') / col('total_bill')).show()

+----------+----+------+------+---+------+----+-------------------+
|total_bill| tip|   sex|smoker|day|  time|size|     tip_percentage|
+----------+----+------+------+---+------+----+-------------------+
|     16.99|1.01|Female|    No|Sun|Dinner|   2|0.05944673337257211|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|0.16054158607350097|
|     21.01| 3.5|  Male|    No|Sun|Dinner|   3|0.16658733936220846|
|     23.68|3.31|  Male|    No|Sun|Dinner|   2| 0.1397804054054054|
|     24.59|3.61|Female|    No|Sun|Dinner|   4|0.14680764538430255|
|     25.29|4.71|  Male|    No|Sun|Dinner|   4|0.18623962040332148|
|      8.77| 2.0|  Male|    No|Sun|Dinner|   2|0.22805017103762829|
|     26.88|3.12|  Male|    No|Sun|Dinner|   4|0.11607142857142858|
|     15.04|1.96|  Male|    No|Sun|Dinner|   2|0.13031914893617022|
|     14.78|3.23|  Male|    No|Sun|Dinner|   2| 0.2185385656292287|
|     10.27|1.71|  Male|    No|Sun|Dinner|   2| 0.1665043816942551|
|     35.26| 5.0|Female|    No|Sun|Dinner|   4|0

Calculate the average tip percentage for each combination of sex and smoker

In [53]:
(
    tips.withColumn("tip_percentage", col('tip') / col('total_bill'))
    .groupBy("sex")
    .pivot('smoker')
    .agg(round(mean("tip_percentage"), 4))
    .show()
)

+------+------+------+
|   sex|    No|   Yes|
+------+------+------+
|Female|0.1569|0.1822|
|  Male|0.1607|0.1528|
+------+------+------+



### 4. Use the seattle weather dataset referenced in the lesson to answer the questions below.

- Convert the temperatures to fahrenheit.
- Which month has the most rain, on average?
- Which year was the windiest?
- What is the most frequent type of weather in January?
- What is the average high and low temperature on sunny days in July in 2013 and 2014?
- What percentage of days were rainy in q3 of 2015?
- For each year, find what percentage of days it rained (had non-zero precipitation).

In [38]:
from vega_datasets import data

weather = data.seattle_weather().assign(date=lambda df: df.date.astype(str))
weather = spark.createDataFrame(weather)
weather.show(6)

+----------+-------------+--------+--------+----+-------+
|      date|precipitation|temp_max|temp_min|wind|weather|
+----------+-------------+--------+--------+----+-------+
|2012-01-01|          0.0|    12.8|     5.0| 4.7|drizzle|
|2012-01-02|         10.9|    10.6|     2.8| 4.5|   rain|
|2012-01-03|          0.8|    11.7|     7.2| 2.3|   rain|
|2012-01-04|         20.3|    12.2|     5.6| 4.7|   rain|
|2012-01-05|          1.3|     8.9|     2.8| 6.1|   rain|
|2012-01-06|          2.5|     4.4|     2.2| 2.2|   rain|
+----------+-------------+--------+--------+----+-------+
only showing top 6 rows



Convert the temperatures to fahrenheit

In [54]:
# pandas equilvalent -- df.temp_max = df.temp_max * 9 / 5 + 32

weather = weather.withColumn(
    "temp_max", (col("temp_max") * 9 / 5 + 32)
).withColumn("temp_min", (col("temp_min") * 9 / 5 + 32))

In [55]:
weather.show(4)

+----------+-------------+--------+--------+----+-------+
|      date|precipitation|temp_max|temp_min|wind|weather|
+----------+-------------+--------+--------+----+-------+
|2012-01-01|          0.0|   55.04|    41.0| 4.7|drizzle|
|2012-01-02|         10.9|   51.08|   37.04| 4.5|   rain|
|2012-01-03|          0.8|   53.06|   44.96| 2.3|   rain|
|2012-01-04|         20.3|   53.96|   42.08| 4.7|   rain|
+----------+-------------+--------+--------+----+-------+
only showing top 4 rows



Which month has the most rain, on average?

In [58]:
row = (
    weather.withColumn("month", month("date"))
    .withColumn("year", year("date"))
    .groupBy("month", "year")
    .agg(sum("precipitation").alias("total_monthly_precipitation"))
    .groupBy("month")
    .agg(mean("total_monthly_precipitation").alias("avg_monthly_rain"))
    .sort(col("avg_monthly_rain").desc())
    .first()
)
row

Row(month=11, avg_monthly_rain=160.625)

In [59]:
row?

[0;31mSignature:[0m      [0mrow[0m[0;34m([0m[0;34m*[0m[0margs[0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[0;31mType:[0m           Row
[0;31mString form:[0m    Row(month=11, avg_monthly_rain=160.625)
[0;31mLength:[0m         2
[0;31mFile:[0m           /opt/homebrew/anaconda3/lib/python3.8/site-packages/pyspark/sql/types.py
[0;31mDocstring:[0m     
A row in :class:`DataFrame`.
The fields in it can be accessed:

* like attributes (``row.key``)
* like dictionary values (``row[key]``)

``key in row`` will search through row keys.

Row can be used to create a row object by using named arguments.
It is not allowed to omit a named argument to represent that the value is
None or missing. This should be explicitly set to None in this case.

.. versionchanged:: 3.0.0
    Rows created from named arguments no longer have
    field names sorted alphabetically and will be ordered in the position as
    entered.

Examples
--------
>>> row = Row(name="Alice", age=11)
>>> row
Row(name=

In [60]:
row.avg_monthly_rain

160.625

Which year is the windiest?

In [61]:
(
    weather.withColumn("year", year("date"))
    .groupBy("year")
    .agg(sum("wind").alias("total_winds"))
    .sort(col("total_winds").desc())
    .head(5)
)                   

[Row(year=2012, total_winds=1244.7),
 Row(year=2014, total_winds=1236.5000000000007),
 Row(year=2015, total_winds=1153.3000000000002),
 Row(year=2013, total_winds=1100.8000000000006)]

What is the most frequent type of weather in January?

In [62]:
(
    weather.withColumn("month", month("date"))
    .filter(col("month") == 1)
    .groupBy("weather")
    .count()
    .sort(col("count").desc())
    .show()
)

+-------+-----+
|weather|count|
+-------+-----+
|    fog|   38|
|   rain|   35|
|    sun|   33|
|drizzle|   10|
|   snow|    8|
+-------+-----+



What is the average high and low temperature on sunny days in July in 2013 and 2014?

In [63]:
(
    weather.filter(month("date") == 7)
    .filter(year("date") > 2012)
    .filter(year("date") < 2015)
    .filter(col("weather") == lit("sun"))
    .agg(
        avg("temp_max").alias("average_high_temp"),
        avg("temp_min").alias("average_low_temp"),
    )
    .show()
)

+-----------------+-----------------+
|average_high_temp| average_low_temp|
+-----------------+-----------------+
|80.29192307692308|57.52884615384615|
+-----------------+-----------------+



What percentage of days were rainy in q3 of 2015?

In [64]:
# in pandas -- (df.weather == "rain").mean()
# measure a rainy day by weather == rain
(
    weather.filter(year("date") == 2015)
    .filter(quarter("date") == 3)
    .select(when(col("weather") == "rain", 1).otherwise(0).alias("rain"))
    .agg(mean("rain"))
    .show()
)

+--------------------+
|           avg(rain)|
+--------------------+
|0.021739130434782608|
+--------------------+



For each year, find what percentage of days it rained (had non-zero precipitation).

In [65]:
# measure a rainy day by precipitation > 0
(
    weather.withColumn("year", year("date"))
    .select(when(col("precipitation") > 0, 1).otherwise(0).alias("rain"), "year")
    .groupby("year")
    .agg(mean("rain"))
    .show()
)

+----+-------------------+
|year|          avg(rain)|
+----+-------------------+
|2015|0.39452054794520547|
|2013|0.41643835616438357|
|2014|  0.410958904109589|
|2012|0.48360655737704916|
+----+-------------------+

