In [1]:
import pandas as pd
import numpy as np
import pyspark
from pyspark.sql import functions as f

from pydataset import data

import pyspark

spark = pyspark.sql.SparkSession.builder.getOrCreate()

import warnings
warnings.filterwarnings('ignore')

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/05/19 15:38:37 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
22/05/19 15:38:38 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


#### 1.) Create a spark data frame that contains your favorite programming languages.

 - The name of the column should be language
 - View the schema of the dataframe
 - Output the shape of the dataframe
 - Show the first 5 records in the dataframe

In [2]:
df = pd.DataFrame()
df['language'] = ['Python', 'SQL', 'R', 'C++', 'Java', 'C#']
df = spark.createDataFrame(df)

In [3]:
# The name of the column should be language
df.select(df.language).show()

                                                                                

+--------+
|language|
+--------+
|  Python|
|     SQL|
|       R|
|     C++|
|    Java|
|      C#|
+--------+



In [4]:
# View the schema of the dataframe
df.printSchema()

root
 |-- language: string (nullable = true)



In [5]:
# Output the shape of the dataframe
df.toPandas().shape

(6, 1)

In [6]:
# Show the first 5 records in the dataframe
df.show(5)

+--------+
|language|
+--------+
|  Python|
|     SQL|
|       R|
|     C++|
|    Java|
+--------+
only showing top 5 rows



#### 2.) Load the mpg dataset as a spark dataframe.

 a.) Create 1 column of output that contains a message like the one below:
 
     - The 1999 audi a4 has a 4 cylinder engine.
 For each vehicle.

 b.) Transform the trans column so that it only contains either manual or auto.

In [7]:
mpg = spark.createDataFrame(data('mpg'))
mpg.show(3)

+------------+-----+-----+----+---+----------+---+---+---+---+-------+
|manufacturer|model|displ|year|cyl|     trans|drv|cty|hwy| fl|  class|
+------------+-----+-----+----+---+----------+---+---+---+---+-------+
|        audi|   a4|  1.8|1999|  4|  auto(l5)|  f| 18| 29|  p|compact|
|        audi|   a4|  1.8|1999|  4|manual(m5)|  f| 21| 29|  p|compact|
|        audi|   a4|  2.0|2008|  4|manual(m6)|  f| 20| 31|  p|compact|
+------------+-----+-----+----+---+----------+---+---+---+---+-------+
only showing top 3 rows



In [8]:
# Create the column with messages
mpg.select(
    f.concat(
        f.lit('The '),
        mpg.year,
        f.lit(' '),
        mpg.manufacturer,
        f.lit(' '),
        mpg.model,
        f.lit(' has a(n) '),
        mpg.cyl,
        f.lit(' cyliner engine.')
        ).alias('Model Explanation')
).show(20, False)

+----------------------------------------------------------------+
|Model Explanation                                               |
+----------------------------------------------------------------+
|The 1999 audi a4 has a(n) 4 cyliner engine.                     |
|The 1999 audi a4 has a(n) 4 cyliner engine.                     |
|The 2008 audi a4 has a(n) 4 cyliner engine.                     |
|The 2008 audi a4 has a(n) 4 cyliner engine.                     |
|The 1999 audi a4 has a(n) 6 cyliner engine.                     |
|The 1999 audi a4 has a(n) 6 cyliner engine.                     |
|The 2008 audi a4 has a(n) 6 cyliner engine.                     |
|The 1999 audi a4 quattro has a(n) 4 cyliner engine.             |
|The 1999 audi a4 quattro has a(n) 4 cyliner engine.             |
|The 2008 audi a4 quattro has a(n) 4 cyliner engine.             |
|The 2008 audi a4 quattro has a(n) 4 cyliner engine.             |
|The 1999 audi a4 quattro has a(n) 6 cyliner engine.          

In [9]:
# Transform the trans column so that it only contains either manual or auto.
mpg.select(
    mpg.trans,
    f.regexp_extract('trans', r'\w+',0).alias("manual_or_auto")
).show()

+----------+--------------+
|     trans|manual_or_auto|
+----------+--------------+
|  auto(l5)|          auto|
|manual(m5)|        manual|
|manual(m6)|        manual|
|  auto(av)|          auto|
|  auto(l5)|          auto|
|manual(m5)|        manual|
|  auto(av)|          auto|
|manual(m5)|        manual|
|  auto(l5)|          auto|
|manual(m6)|        manual|
|  auto(s6)|          auto|
|  auto(l5)|          auto|
|manual(m5)|        manual|
|  auto(s6)|          auto|
|manual(m6)|        manual|
|  auto(l5)|          auto|
|  auto(s6)|          auto|
|  auto(s6)|          auto|
|  auto(l4)|          auto|
|  auto(l4)|          auto|
+----------+--------------+
only showing top 20 rows



#### 3.) Load the tips dataset as a spark dataframe.

 - What percentage of observations are smokers?
 - Create a column that contains the tip percentage
 - Calculate the average tip percentage for each combination of sex and smoker.

In [10]:
tips = spark.createDataFrame(data('tips'))

In [11]:
# What percentage of observations are smokers?
tips.agg(f.mean(f.when(tips.smoker == 'Yes',1)
                .otherwise(0))
         .alias("Percent_smokers")
        ).show()


+-------------------+
|    Percent_smokers|
+-------------------+
|0.38114754098360654|
+-------------------+



In [12]:
# Create a column that contains the tip percentage
tips.select(
    tips.tip,
    tips.total_bill,
    f.round(
        (tips.tip / tips.total_bill)
        ,2)
    .alias('tip_percentage')
).show()

+----+----------+--------------+
| tip|total_bill|tip_percentage|
+----+----------+--------------+
|1.01|     16.99|          0.06|
|1.66|     10.34|          0.16|
| 3.5|     21.01|          0.17|
|3.31|     23.68|          0.14|
|3.61|     24.59|          0.15|
|4.71|     25.29|          0.19|
| 2.0|      8.77|          0.23|
|3.12|     26.88|          0.12|
|1.96|     15.04|          0.13|
|3.23|     14.78|          0.22|
|1.71|     10.27|          0.17|
| 5.0|     35.26|          0.14|
|1.57|     15.42|           0.1|
| 3.0|     18.43|          0.16|
|3.02|     14.83|           0.2|
|3.92|     21.58|          0.18|
|1.67|     10.33|          0.16|
|3.71|     16.29|          0.23|
| 3.5|     16.97|          0.21|
|3.35|     20.65|          0.16|
+----+----------+--------------+
only showing top 20 rows



In [13]:
# Calculate the average tip percentage for each combination of sex and smoker.
tips.groupBy(tips.sex, tips.smoker).agg(
    f.avg(tips.tip / tips.total_bill)
).show(3)

[Stage 14:>                                                       (0 + 10) / 10]

+------+------+-----------------------+
|   sex|smoker|avg((tip / total_bill))|
+------+------+-----------------------+
|  Male|    No|     0.1606687151291298|
|Female|    No|    0.15692097076918363|
|  Male|   Yes|     0.1527711752024851|
+------+------+-----------------------+
only showing top 3 rows



                                                                                

#### 4.) Use the seattle weather dataset referenced in the lesson to answer the questions below.

 - Convert the temperatures to fahrenheit.
 - Which month has the most rain, on average?
 - Which year was the windiest?
 - What is the most frequent type of weather in January?
 - What is the average high and low temperature on sunny days in July in 2013 and 2014?
 - What percentage of days were rainy in q3 of 2015?
 - For each year, find what percentage of days it rained (had non-zero precipitation).


In [16]:
from vega_datasets import data as vega_data
weather = spark.createDataFrame(vega_data('seattle_weather'))

In [18]:
# Convert the temperatures to fahrenheit.
weather = weather.withColumn(
    "temp_max", (f.col("temp_max") * 9 / 5 + 32)
).withColumn("temp_min", (f.col("temp_min") * 9 / 5 + 32))

In [19]:
# Which month has the most rain, on average?
weather.withColumn("month",f.month("date")
             ).groupBy("month").agg(
                                f.round(
                                    f.avg("precipitation"),2
                                        ).alias("Average_rainfall_per_month")
                                    ).sort('month').show()

+-----+--------------------------+
|month|Average_rainfall_per_month|
+-----+--------------------------+
|    1|                      3.76|
|    2|                      3.73|
|    3|                      4.89|
|    4|                      3.13|
|    5|                      1.67|
|    6|                      1.11|
|    7|                      0.39|
|    8|                      1.32|
|    9|                      1.96|
|   10|                      4.06|
|   11|                      5.35|
|   12|                      5.02|
+-----+--------------------------+



In [20]:
# Which year was the windiest?
weather.withColumn('year',f.year('date')).groupBy('year')\
                .agg(f.round(f.sum('wind'),2)\
                .alias('Total_wind'))\
                .sort(f.asc('year')).show()
                                              

+----+----------+
|year|Total_wind|
+----+----------+
|2012|    1244.7|
|2013|    1100.8|
|2014|    1236.5|
|2015|    1153.3|
+----+----------+



In [22]:
# What is the most frequent type of weather in January?
weather.withColumn('month',f.month('date')
              ).groupby('month','weather').agg(f.count('weather').alias('count')
).sort(f.asc('month'),f.desc('count')).show(1)

+-----+-------+-----+
|month|weather|count|
+-----+-------+-----+
|    1|    fog|   38|
+-----+-------+-----+
only showing top 1 row



In [26]:
# What is the average high and low temperature on sunny days in July in 2013 and 2014?
(
    weather.filter(f.month("date") == 7)
    .filter(f.year("date") > 2012)
    .filter(f.year("date") < 2015)
    .filter(f.col("weather") == f.lit("sun"))
    .agg(
        f.avg("temp_max").alias("average_high_temp"),
        f.avg("temp_min").alias("average_low_temp"),
    )
    .show()
)

+-----------------+-----------------+
|average_high_temp| average_low_temp|
+-----------------+-----------------+
|80.29192307692308|57.52884615384615|
+-----------------+-----------------+



In [34]:
# What percentage of days were rainy in q3 of 2015?
(
    weather.filter(f.year("date") == 2015)
    .filter(f.quarter("date") == 3)
    .select(f.when(f.col("weather") == "rain", 1).otherwise(0).alias("rain"))
    .agg(f.mean("rain"))
    .show()
)

+--------------------+
|           avg(rain)|
+--------------------+
|0.021739130434782608|
+--------------------+



In [36]:
# For each year, find what percentage of days it rained (had non-zero precipitation).
(weather.withColumn('non_zero',
             (weather.precipitation > 0).cast('int'))
 .groupby(f.year('date'))
 .agg(f.avg('non_zero'))
).show()

+----------+-------------------+
|year(date)|      avg(non_zero)|
+----------+-------------------+
|      2012|0.48360655737704916|
|      2013|0.41643835616438357|
|      2014|  0.410958904109589|
|      2015|0.39452054794520547|
+----------+-------------------+

