# Spark API Exercises

## Testing Installation

In [1]:
import pandas as pd
import numpy as np

In [2]:
from pyspark.sql.functions import *

In [3]:
import pyspark
spark = pyspark.sql.SparkSession.builder.getOrCreate()
spark.range(5).show()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/09/15 21:34:26 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
+---+
| id|
+---+
|  0|
|  1|
|  2|
|  3|
|  4|
+---+



In [4]:
import pyspark

spark = pyspark.sql.SparkSession.builder.getOrCreate()

In [5]:
import multiprocessing
import pyspark

nprocs = multiprocessing.cpu_count()

spark = (pyspark.sql.SparkSession.builder
 .master('local')
 .config('spark.jars.packages', 'mysql:mysql-connector-java:8.0.16')
 .config('spark.driver.memory', '4G')
 .config('spark.driver.cores', nprocs)
 .config('spark.sql.shuffle.partitions', nprocs)
 .appName('MySparkApplication')
 .getOrCreate())

22/09/15 21:34:31 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


## 1. Create a spark data frame that contains your favorite programming languages.

- a. The name of the column should be language
- b. View the schema of the dataframe
- c. Output the shape of the dataframe
- d. Show the first 5 records in the dataframe

In [6]:
# Creating dataframe with my favorite languages
pandas_dataframe = pd.DataFrame({'Python', 'SQL', 'HTML', 'CSS', 'JavaScript', 'R', 'Markdown'}, columns = ['languages'])

In [7]:
pandas_dataframe

Unnamed: 0,languages
0,SQL
1,Python
2,R
3,CSS
4,JavaScript
5,HTML
6,Markdown


In [8]:
# Convert pandas dataframe to spark dataframe
language = spark.createDataFrame(pandas_dataframe)
language

DataFrame[languages: string]

In [9]:
# Output schema of dataframe
language.printSchema()

root
 |-- languages: string (nullable = true)



In [10]:
# Viewing the top 5 records
language.show(5)

[Stage 1:>                                                          (0 + 1) / 1]                                                                                

+----------+
| languages|
+----------+
|       SQL|
|    Python|
|         R|
|       CSS|
|JavaScript|
+----------+
only showing top 5 rows



By default spark will show the first 20 rows, but we can specify how many we want by passing a number to .show.

In [11]:
language.describe().show()

+-------+---------+
|summary|languages|
+-------+---------+
|  count|        7|
|   mean|     null|
| stddev|     null|
|    min|      CSS|
|    max|      SQL|
+-------+---------+



## 2. Load the mpg dataset as a spark dataframe.
### a. Create 1 column of output that contains a message like the one below, for each vehicle.
            The 1999 audi a4 has a 4 cylinder engine.

In [12]:
from pydataset import data

mpg = spark.createDataFrame(data("mpg"))
mpg.show(5)

+------------+-----+-----+----+---+----------+---+---+---+---+-------+
|manufacturer|model|displ|year|cyl|     trans|drv|cty|hwy| fl|  class|
+------------+-----+-----+----+---+----------+---+---+---+---+-------+
|        audi|   a4|  1.8|1999|  4|  auto(l5)|  f| 18| 29|  p|compact|
|        audi|   a4|  1.8|1999|  4|manual(m5)|  f| 21| 29|  p|compact|
|        audi|   a4|  2.0|2008|  4|manual(m6)|  f| 20| 31|  p|compact|
|        audi|   a4|  2.0|2008|  4|  auto(av)|  f| 21| 30|  p|compact|
|        audi|   a4|  2.8|1999|  6|  auto(l5)|  f| 16| 26|  p|compact|
+------------+-----+-----+----+---+----------+---+---+---+---+-------+
only showing top 5 rows



In [13]:
# Register a table with spark
mpg.createOrReplaceTempView("mpg")

In [14]:
spark.sql(
'''
SELECT CONCAT("The ", year, " ", manufacturer, " ", model, "has a ", cyl, " cylinder engine.") as summary
FROM mpg
'''
).show(5)

+--------------------+
|             summary|
+--------------------+
|The 1999 audi a4h...|
|The 1999 audi a4h...|
|The 2008 audi a4h...|
|The 2008 audi a4h...|
|The 1999 audi a4h...|
+--------------------+
only showing top 5 rows



### b. Transform the `trans` column so that it only contains either `manual` or `auto`. 

In [15]:
mpg.select(
    'trans',
    regexp_extract("trans", r"^(\w+)\(", 1).alias("regexp_extract"),
    regexp_replace("trans", r"\(.+$", "").alias("regexp_replace"),
    when(
        mpg.trans.like("auto%"), "auto"
    ).otherwise("manual").alias("when + like")
).show()

+----------+--------------+--------------+-----------+
|     trans|regexp_extract|regexp_replace|when + like|
+----------+--------------+--------------+-----------+
|  auto(l5)|          auto|          auto|       auto|
|manual(m5)|        manual|        manual|     manual|
|manual(m6)|        manual|        manual|     manual|
|  auto(av)|          auto|          auto|       auto|
|  auto(l5)|          auto|          auto|       auto|
|manual(m5)|        manual|        manual|     manual|
|  auto(av)|          auto|          auto|       auto|
|manual(m5)|        manual|        manual|     manual|
|  auto(l5)|          auto|          auto|       auto|
|manual(m6)|        manual|        manual|     manual|
|  auto(s6)|          auto|          auto|       auto|
|  auto(l5)|          auto|          auto|       auto|
|manual(m5)|        manual|        manual|     manual|
|  auto(s6)|          auto|          auto|       auto|
|manual(m6)|        manual|        manual|     manual|
|  auto(l5

## 3. Load the tips dataset as a spark dataframe.

### a. What percentage of observations are smokers?


In [17]:
# Load the tips dataset

tips = spark.createDataFrame(data("tips"))
tips.show(5)

+----------+----+------+------+---+------+----+
|total_bill| tip|   sex|smoker|day|  time|size|
+----------+----+------+------+---+------+----+
|     16.99|1.01|Female|    No|Sun|Dinner|   2|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|
|     21.01| 3.5|  Male|    No|Sun|Dinner|   3|
|     23.68|3.31|  Male|    No|Sun|Dinner|   2|
|     24.59|3.61|Female|    No|Sun|Dinner|   4|
+----------+----+------+------+---+------+----+
only showing top 5 rows



In [18]:
tips.groupBy("smoker").count().show()

+------+-----+
|smoker|count|
+------+-----+
|    No|  151|
|   Yes|   93|
+------+-----+



In [19]:
tips.count()

244

In [20]:
tips.groupby('smoker').count().withColumn('percent', col('count') / tips.count()).show()

+------+-----+-------------------+
|smoker|count|            percent|
+------+-----+-------------------+
|    No|  151| 0.6188524590163934|
|   Yes|   93|0.38114754098360654|
+------+-----+-------------------+



In [21]:
tips.groupBy("smoker").count().withColumn(
    "percent",
    concat(round((col("count") / tips.count() * 100), 0).cast("int"), lit("%")),
).show()

+------+-----+-------+
|smoker|count|percent|
+------+-----+-------+
|    No|  151|    62%|
|   Yes|   93|    38%|
+------+-----+-------+



### b. Create a column that contains the tip percentage


In [22]:

# tips.withColumn("tip_percentage", col('tip') / col('total_bill')).show()

tips.withColumn("tip_percentage", expr('tip / total_bill')).show(5)

+----------+----+------+------+---+------+----+-------------------+
|total_bill| tip|   sex|smoker|day|  time|size|     tip_percentage|
+----------+----+------+------+---+------+----+-------------------+
|     16.99|1.01|Female|    No|Sun|Dinner|   2|0.05944673337257211|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|0.16054158607350097|
|     21.01| 3.5|  Male|    No|Sun|Dinner|   3|0.16658733936220846|
|     23.68|3.31|  Male|    No|Sun|Dinner|   2| 0.1397804054054054|
|     24.59|3.61|Female|    No|Sun|Dinner|   4|0.14680764538430255|
+----------+----+------+------+---+------+----+-------------------+
only showing top 5 rows



### c. Calculate the average tip percentage for each combination of sex and smoker.

In [23]:
# method chaining - df.method1().method2()....
(
    tips.withColumn("tip_percentage", col('tip') / col('total_bill'))
    .groupby("sex")
    .pivot("smoker") # make a pivot table
    .agg(round(mean("tip_percentage"), 4))
    .show()
)

+------+------+------+
|   sex|    No|   Yes|
+------+------+------+
|  Male|0.1607|0.1528|
|Female|0.1569|0.1822|
+------+------+------+



## 4. Use the seattle weather dataset referenced in the lesson to answer the questions below.

In [26]:
# import Seattle weather dataset
from vega_datasets import data

weather = data.seattle_weather()
weather = spark.createDataFrame(weather)
weather.show(4)

+-------------------+-------------+--------+--------+----+-------+
|               date|precipitation|temp_max|temp_min|wind|weather|
+-------------------+-------------+--------+--------+----+-------+
|2012-01-01 00:00:00|          0.0|    12.8|     5.0| 4.7|drizzle|
|2012-01-02 00:00:00|         10.9|    10.6|     2.8| 4.5|   rain|
|2012-01-03 00:00:00|          0.8|    11.7|     7.2| 2.3|   rain|
|2012-01-04 00:00:00|         20.3|    12.2|     5.6| 4.7|   rain|
+-------------------+-------------+--------+--------+----+-------+
only showing top 4 rows



### a. Convert the temperatures to fahrenheit.


In [27]:
weather = spark.createDataFrame(weather)
weather = weather.withColumn(
    "temp_max", (col("temp_max") * 9 / 5 + 32)
).withColumn("temp_min", (col("temp_min") * 9 / 5 + 32))
weather.show(5)

+-------------------+-------------+--------+--------+----+-------+
|               date|precipitation|temp_max|temp_min|wind|weather|
+-------------------+-------------+--------+--------+----+-------+
|2012-01-01 00:00:00|          0.0|   55.04|    41.0| 4.7|drizzle|
|2012-01-02 00:00:00|         10.9|   51.08|   37.04| 4.5|   rain|
|2012-01-03 00:00:00|          0.8|   53.06|   44.96| 2.3|   rain|
|2012-01-04 00:00:00|         20.3|   53.96|   42.08| 4.7|   rain|
|2012-01-05 00:00:00|          1.3|   48.02|   37.04| 6.1|   rain|
+-------------------+-------------+--------+--------+----+-------+
only showing top 5 rows



### b. Which month has the most rain, on average?


In [28]:
row = (
    weather.withColumn("month", month("date"))
    .withColumn("year", year("date"))
    .groupBy("month", "year")
    .agg(sum("precipitation").alias("total_monthly_precipitation"))
    .groupBy("month")
    .agg(mean("total_monthly_precipitation").alias("avg_monthly_rain"))
    .sort(col("avg_monthly_rain").desc())
    .first()
)
row

Row(month=11, avg_monthly_rain=160.625)

### c. Which year was the windiest?


In [29]:
(
    weather.withColumn("year", year("date"))
    .groupBy("year")
    .agg(sum("wind").alias("total_winds"))
    .sort(col("total_winds").desc())
    .head(5)
)

[Row(year=2012, total_winds=1244.6999999999998),
 Row(year=2014, total_winds=1236.5),
 Row(year=2015, total_winds=1153.3),
 Row(year=2013, total_winds=1100.8000000000002)]

### d. What is the most frequent type of weather in January?


In [30]:
(
    weather.withColumn("month", month("date"))
    .filter(col("month") == 1)
    .groupBy("weather")
    .count()
    .sort(col("count").desc())
    .show()
)

+-------+-----+
|weather|count|
+-------+-----+
|    fog|   38|
|   rain|   35|
|    sun|   33|
|drizzle|   10|
|   snow|    8|
+-------+-----+



In [31]:
# .value_counts for spark
weather.groupBy('weather').count().show()

+-------+-----+
|weather|count|
+-------+-----+
|drizzle|   54|
|   rain|  259|
|    sun|  714|
|   snow|   23|
|    fog|  411|
+-------+-----+



### e. What is the average high and low temperature on sunny days in July in 2013 and 2014?


In [32]:
(
    weather.filter(month("date") == 7)
    .filter(year("date") > 2012)
    .filter(year("date") < 2015)
    .filter(col("weather") == lit("sun"))
    .agg(
        avg("temp_max").alias("average_high_temp"),
        avg("temp_min").alias("average_low_temp"),
    )
    .show()
)

+-----------------+-----------------+
|average_high_temp| average_low_temp|
+-----------------+-----------------+
|80.29192307692308|57.52884615384615|
+-----------------+-----------------+



### f. What percentage of days were rainy in q3 of 2015?


In [33]:
(
    weather.filter(year("date") == 2015)
    .filter(quarter("date") == 3)
    .select(when(col("weather") == "rain", 1).otherwise(0).alias("rain"))
    .agg(mean("rain"))
    .show()
)

+--------------------+
|           avg(rain)|
+--------------------+
|0.021739130434782608|
+--------------------+



### g. For each year, find what percentage of days it rained (had non-zero precipitation).

In [34]:
# measure a rainy day by precipitation > 0
(
    weather.withColumn("year", year("date"))
    .select(when(col("precipitation") > 0, 1).otherwise(0).alias("did_rain"), "year")
    .groupby("year")
    .agg(mean("did_rain"))
    .show()
)

+----+-------------------+
|year|      avg(did_rain)|
+----+-------------------+
|2012|0.48360655737704916|
|2013|0.41643835616438357|
|2014|  0.410958904109589|
|2015|0.39452054794520547|
+----+-------------------+

