In [55]:
# notebook dependencies
import pyspark
from pyspark.sql.functions import col, expr
from pyspark.sql.functions import regexp_extract, regexp_replace

# note: the pyspark avg and mean functions are aliases of eachother
from pyspark.sql.functions import concat, sum, avg, min, max, count, mean, lit

# note: the following import, imports all pyspark sql functions similar to above
from pyspark.sql.functions import *

# creating the spark instance
spark = pyspark.sql.SparkSession.builder.getOrCreate()

# pandas, numpy, and matplotlib imports
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt 

# pydatasets
from pydataset import data

# tqdm loading bar library
from tqdm.notebook import tqdm, trange
import time # to be used in loop iterations

# disabling warnings
# import warnings
# warnings.filterwarnings('ignore')

In [56]:
# Create a spark data frame that contains your favorite programming languages.

# The name of the column should be language
# View the schema of the dataframe
# Output the shape of the dataframe
# Show the first 5 records in the dataframe

In [57]:
# creating a pyspark dataframe

df = pd.DataFrame({ 
    "n": np.arange(20), 
    "languages": np.random.choice(["Python", "SQL", "JavaScript", "Julia", "Java"], 20)
})

df = spark.createDataFrame(df)
df.show()

+---+----------+
|  n| languages|
+---+----------+
|  0|    Python|
|  1|     Julia|
|  2|    Python|
|  3|      Java|
|  4|    Python|
|  5|       SQL|
|  6|     Julia|
|  7|JavaScript|
|  8|     Julia|
|  9|      Java|
| 10|     Julia|
| 11|    Python|
| 12|    Python|
| 13|     Julia|
| 14|       SQL|
| 15|    Python|
| 16|       SQL|
| 17|JavaScript|
| 18|       SQL|
| 19|     Julia|
+---+----------+



In [58]:
# printing the pyspark dataframe shape

print(f'pyspark df shape: {(df.count(), len(df.columns))}')

pyspark df shape: (20, 2)


In [59]:
# df schema

df.printSchema()

root
 |-- n: long (nullable = true)
 |-- languages: string (nullable = true)



In [60]:
# describing the pyspark df

df.describe().show()

+-------+-----------------+---------+
|summary|                n|languages|
+-------+-----------------+---------+
|  count|               20|       20|
|   mean|              9.5|     null|
| stddev|5.916079783099616|     null|
|    min|                0|     Java|
|    max|               19|      SQL|
+-------+-----------------+---------+



In [61]:
# showing the first 5 records in the dataframe

df.show(5)

+---+---------+
|  n|languages|
+---+---------+
|  0|   Python|
|  1|    Julia|
|  2|   Python|
|  3|     Java|
|  4|   Python|
+---+---------+
only showing top 5 rows



In [62]:
# exercise number 2: Load the mpg dataset as a spark dataframe

mpg = data("mpg")
mpg = spark.createDataFrame(mpg)

mpg.show(5)

+------------+-----+-----+----+---+----------+---+---+---+---+-------+
|manufacturer|model|displ|year|cyl|     trans|drv|cty|hwy| fl|  class|
+------------+-----+-----+----+---+----------+---+---+---+---+-------+
|        audi|   a4|  1.8|1999|  4|  auto(l5)|  f| 18| 29|  p|compact|
|        audi|   a4|  1.8|1999|  4|manual(m5)|  f| 21| 29|  p|compact|
|        audi|   a4|  2.0|2008|  4|manual(m6)|  f| 20| 31|  p|compact|
|        audi|   a4|  2.0|2008|  4|  auto(av)|  f| 21| 30|  p|compact|
|        audi|   a4|  2.8|1999|  6|  auto(l5)|  f| 16| 26|  p|compact|
+------------+-----+-----+----+---+----------+---+---+---+---+-------+
only showing top 5 rows



In [63]:
# Create 1 column of output that contains a message like the one below:
# using a combination of pyspark functions (concat, expr, and lit?)

# using the "concatenate with separator method"

mpg = mpg.withColumn("concat_example", concat_ws(
    " ",
    lit("The"),
    mpg.year,
    mpg.model, 
    lit("has a"),
    mpg.cyl,
    lit("cylinder enginge."))
)

# printing the transformation/added column 
mpg.show(5, truncate = False) # checks out!

+------------+-----+-----+----+---+----------+---+---+---+---+-------+-------------------------------------+
|manufacturer|model|displ|year|cyl|trans     |drv|cty|hwy|fl |class  |concat_example                       |
+------------+-----+-----+----+---+----------+---+---+---+---+-------+-------------------------------------+
|audi        |a4   |1.8  |1999|4  |auto(l5)  |f  |18 |29 |p  |compact|The 1999 a4 has a 4 cylinder enginge.|
|audi        |a4   |1.8  |1999|4  |manual(m5)|f  |21 |29 |p  |compact|The 1999 a4 has a 4 cylinder enginge.|
|audi        |a4   |2.0  |2008|4  |manual(m6)|f  |20 |31 |p  |compact|The 2008 a4 has a 4 cylinder enginge.|
|audi        |a4   |2.0  |2008|4  |auto(av)  |f  |21 |30 |p  |compact|The 2008 a4 has a 4 cylinder enginge.|
|audi        |a4   |2.8  |1999|6  |auto(l5)  |f  |16 |26 |p  |compact|The 1999 a4 has a 6 cylinder enginge.|
+------------+-----+-----+----+---+----------+---+---+---+---+-------+-------------------------------------+
only showing top 5 

In [64]:
#  2b. Transform the trans column so that it only contains either manual or auto.
# will use reg expression to accomplish this task

mpg.select("trans", regexp_extract("trans", r'[a-z:]{4,6}', 0).alias("trans_cleaned")).show()

+----------+-------------+
|     trans|trans_cleaned|
+----------+-------------+
|  auto(l5)|         auto|
|manual(m5)|       manual|
|manual(m6)|       manual|
|  auto(av)|         auto|
|  auto(l5)|         auto|
|manual(m5)|       manual|
|  auto(av)|         auto|
|manual(m5)|       manual|
|  auto(l5)|         auto|
|manual(m6)|       manual|
|  auto(s6)|         auto|
|  auto(l5)|         auto|
|manual(m5)|       manual|
|  auto(s6)|         auto|
|manual(m6)|       manual|
|  auto(l5)|         auto|
|  auto(s6)|         auto|
|  auto(s6)|         auto|
|  auto(l4)|         auto|
|  auto(l4)|         auto|
+----------+-------------+
only showing top 20 rows



In [65]:
# printing the transformation

mpg = mpg.withColumn("trans", regexp_extract("trans", r'[a-z:]{4,6}', 0))

mpg.show(5) # checks out!

+------------+-----+-----+----+---+------+---+---+---+---+-------+--------------------+
|manufacturer|model|displ|year|cyl| trans|drv|cty|hwy| fl|  class|      concat_example|
+------------+-----+-----+----+---+------+---+---+---+---+-------+--------------------+
|        audi|   a4|  1.8|1999|  4|  auto|  f| 18| 29|  p|compact|The 1999 a4 has a...|
|        audi|   a4|  1.8|1999|  4|manual|  f| 21| 29|  p|compact|The 1999 a4 has a...|
|        audi|   a4|  2.0|2008|  4|manual|  f| 20| 31|  p|compact|The 2008 a4 has a...|
|        audi|   a4|  2.0|2008|  4|  auto|  f| 21| 30|  p|compact|The 2008 a4 has a...|
|        audi|   a4|  2.8|1999|  6|  auto|  f| 16| 26|  p|compact|The 1999 a4 has a...|
+------------+-----+-----+----+---+------+---+---+---+---+-------+--------------------+
only showing top 5 rows



In [66]:
# exercise number 3: Load the tips dataset as a spark dataframe.

tips_df = data("tips")
tips_df.head()

Unnamed: 0,total_bill,tip,sex,smoker,day,time,size
1,16.99,1.01,Female,No,Sun,Dinner,2
2,10.34,1.66,Male,No,Sun,Dinner,3
3,21.01,3.5,Male,No,Sun,Dinner,3
4,23.68,3.31,Male,No,Sun,Dinner,2
5,24.59,3.61,Female,No,Sun,Dinner,4


In [67]:
# creating the pyspark dataframe

tips = spark.createDataFrame(tips_df)
tips.show(5)

+----------+----+------+------+---+------+----+
|total_bill| tip|   sex|smoker|day|  time|size|
+----------+----+------+------+---+------+----+
|     16.99|1.01|Female|    No|Sun|Dinner|   2|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|
|     21.01| 3.5|  Male|    No|Sun|Dinner|   3|
|     23.68|3.31|  Male|    No|Sun|Dinner|   2|
|     24.59|3.61|Female|    No|Sun|Dinner|   4|
+----------+----+------+------+---+------+----+
only showing top 5 rows



In [68]:
# schema print-out

tips.schema

StructType([StructField('total_bill', DoubleType(), True), StructField('tip', DoubleType(), True), StructField('sex', StringType(), True), StructField('smoker', StringType(), True), StructField('day', StringType(), True), StructField('time', StringType(), True), StructField('size', LongType(), True)])

In [69]:
# df describe 

tips.describe().show()

+-------+------------------+------------------+------+------+----+------+------------------+
|summary|        total_bill|               tip|   sex|smoker| day|  time|              size|
+-------+------------------+------------------+------+------+----+------+------------------+
|  count|               244|               244|   244|   244| 244|   244|               244|
|   mean|19.785942622950813|2.9982786885245907|  null|  null|null|  null| 2.569672131147541|
| stddev| 8.902411954856856| 1.383638189001182|  null|  null|null|  null|0.9510998047322344|
|    min|              3.07|               1.0|Female|    No| Fri|Dinner|                 1|
|    max|             50.81|              10.0|  Male|   Yes|Thur| Lunch|                 6|
+-------+------------------+------------------+------+------+----+------+------------------+



In [70]:
# df dtypes

tips.dtypes

[('total_bill', 'double'),
 ('tip', 'double'),
 ('sex', 'string'),
 ('smoker', 'string'),
 ('day', 'string'),
 ('time', 'string'),
 ('size', 'bigint')]

In [71]:
# summary stats for only numerical cols

num_cols = [item[0] for item in tips.dtypes if item[1].startswith('double') or item[1].startswith("big")]

# running the describe method on only* numerical cols
tips.describe(num_cols).show()

+-------+------------------+------------------+------------------+
|summary|        total_bill|               tip|              size|
+-------+------------------+------------------+------------------+
|  count|               244|               244|               244|
|   mean|19.785942622950813|2.9982786885245907| 2.569672131147541|
| stddev| 8.902411954856856| 1.383638189001182|0.9510998047322344|
|    min|              3.07|               1.0|                 1|
|    max|             50.81|              10.0|                 6|
+-------+------------------+------------------+------------------+



In [72]:
# creating a summary statistics df to work from

summary_stats = tips.describe(num_cols)
summary_stats.show()

+-------+------------------+------------------+------------------+
|summary|        total_bill|               tip|              size|
+-------+------------------+------------------+------------------+
|  count|               244|               244|               244|
|   mean|19.785942622950813|2.9982786885245907| 2.569672131147541|
| stddev| 8.902411954856856| 1.383638189001182|0.9510998047322344|
|    min|              3.07|               1.0|                 1|
|    max|             50.81|              10.0|                 6|
+-------+------------------+------------------+------------------+



In [79]:
# creating a "transpose" function to help manipulate the data and swap cols for rows

def TransposeDF(df, columns, pivotCol):
    '''"Transpose" function to help manipulate the data and swap cols for rows.
    Function takes in a dataframe, columns to tranpose, and specific column to pivot on (does not change).'''

    columnsValue = list(map(lambda x: str("'") + str(x) + str("',")  + str(x), columns))

    stackCols = ','.join(x for x in columnsValue)

    df_1 = df.selectExpr(pivotCol, "stack(" + str(len(columns)) + "," + stackCols + ")")\
           .select(pivotCol, "col0", "col1")

    final_df = df_1.groupBy(col("col0")).pivot(pivotCol).agg(concat_ws("", collect_list(col("col1"))))\
                 .withColumnRenamed("col0", pivotCol)

    return final_df

In [74]:
# let's see if the function works

summary_stats = TransposeDF(summary_stats, num_cols, "summary")
summary_stats.show() # checks out!

+----------+-----+-----+------------------+----+------------------+
|   summary|count|  max|              mean| min|            stddev|
+----------+-----+-----+------------------+----+------------------+
|total_bill|  244|50.81|19.785942622950813|3.07| 8.902411954856856|
|       tip|  244| 10.0|2.9982786885245907| 1.0| 1.383638189001182|
|      size|  244|    6| 2.569672131147541|   1|0.9510998047322344|
+----------+-----+-----+------------------+----+------------------+



In [75]:
# creating a range column 

summary_stats = summary_stats.withColumn("range", summary_stats["max"] - summary_stats["min"])
summary_stats.show() # checks out!

+----------+-----+-----+------------------+----+------------------+-----+
|   summary|count|  max|              mean| min|            stddev|range|
+----------+-----+-----+------------------+----+------------------+-----+
|total_bill|  244|50.81|19.785942622950813|3.07| 8.902411954856856|47.74|
|       tip|  244| 10.0|2.9982786885245907| 1.0| 1.383638189001182|  9.0|
|      size|  244|    6| 2.569672131147541|   1|0.9510998047322344|  5.0|
+----------+-----+-----+------------------+----+------------------+-----+



In [76]:
# What percentage of observations are smokers?

tips.show()

+----------+----+------+------+---+------+----+
|total_bill| tip|   sex|smoker|day|  time|size|
+----------+----+------+------+---+------+----+
|     16.99|1.01|Female|    No|Sun|Dinner|   2|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|
|     21.01| 3.5|  Male|    No|Sun|Dinner|   3|
|     23.68|3.31|  Male|    No|Sun|Dinner|   2|
|     24.59|3.61|Female|    No|Sun|Dinner|   4|
|     25.29|4.71|  Male|    No|Sun|Dinner|   4|
|      8.77| 2.0|  Male|    No|Sun|Dinner|   2|
|     26.88|3.12|  Male|    No|Sun|Dinner|   4|
|     15.04|1.96|  Male|    No|Sun|Dinner|   2|
|     14.78|3.23|  Male|    No|Sun|Dinner|   2|
|     10.27|1.71|  Male|    No|Sun|Dinner|   2|
|     35.26| 5.0|Female|    No|Sun|Dinner|   4|
|     15.42|1.57|  Male|    No|Sun|Dinner|   2|
|     18.43| 3.0|  Male|    No|Sun|Dinner|   4|
|     14.83|3.02|Female|    No|Sun|Dinner|   2|
|     21.58|3.92|  Male|    No|Sun|Dinner|   2|
|     10.33|1.67|Female|    No|Sun|Dinner|   3|
|     16.29|3.71|  Male|    No|Sun|Dinne

In [77]:
# filter for smokers in tips dataframe

smoker_df = tips.filter(tips.smoker == "Yes")
smoker_count = smoker_df.count()
smoker_count

93

In [78]:
# print the percentage of smokers in tips df

print('Percentage of smokers in tips df: {:.2f}'.format(smoker_count / tips.count()))

Percentage of smokers in tips df: 0.38


In [80]:
# calculating tip percentage

tips.withColumn("tip_percentage", expr("tip / total_bill")).show(5)

+----------+----+------+------+---+------+----+-------------------+
|total_bill| tip|   sex|smoker|day|  time|size|     tip_percentage|
+----------+----+------+------+---+------+----+-------------------+
|     16.99|1.01|Female|    No|Sun|Dinner|   2|0.05944673337257211|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|0.16054158607350097|
|     21.01| 3.5|  Male|    No|Sun|Dinner|   3|0.16658733936220846|
|     23.68|3.31|  Male|    No|Sun|Dinner|   2| 0.1397804054054054|
|     24.59|3.61|Female|    No|Sun|Dinner|   4|0.14680764538430255|
+----------+----+------+------+---+------+----+-------------------+
only showing top 5 rows



In [82]:
# avg. tip percentage for ea. combination of male/female custoemrs

tips.withColumn("tip_percentage", col("tip") / col("total_bill"))\
    .groupby("sex")\
        .pivot("smoker")\
            .agg((round(mean("tip_percentage"), 2)))\
                .show() 

+------+----+----+
|   sex|  No| Yes|
+------+----+----+
|Female|0.16|0.18|
|  Male|0.16|0.15|
+------+----+----+



In [84]:
# exercise number 4: weather datasets

import vega_datasets
weather = spark.createDataFrame(vega_datasets.data.seattle_weather())

In [86]:
# showing the dataframe head
weather.show(5)

+-------------------+-------------+--------+--------+----+-------+
|               date|precipitation|temp_max|temp_min|wind|weather|
+-------------------+-------------+--------+--------+----+-------+
|2012-01-01 00:00:00|          0.0|    12.8|     5.0| 4.7|drizzle|
|2012-01-02 00:00:00|         10.9|    10.6|     2.8| 4.5|   rain|
|2012-01-03 00:00:00|          0.8|    11.7|     7.2| 2.3|   rain|
|2012-01-04 00:00:00|         20.3|    12.2|     5.6| 4.7|   rain|
|2012-01-05 00:00:00|          1.3|     8.9|     2.8| 6.1|   rain|
+-------------------+-------------+--------+--------+----+-------+
only showing top 5 rows



In [90]:
# convert celcius to fahrenheit using an equation

weather = weather.withColumn(
    "temp_max", round((col("temp_max") * (9/5) + 32), 2))\
        .withColumn("temp_min", round(col("temp_min") * (9/5) + 32, 2))

weather.show(5)

+-------------------+-------------+--------+--------+----+-------+
|               date|precipitation|temp_max|temp_min|wind|weather|
+-------------------+-------------+--------+--------+----+-------+
|2012-01-01 00:00:00|          0.0|  131.07|   105.8| 4.7|drizzle|
|2012-01-02 00:00:00|         10.9|  123.94|   98.67| 4.5|   rain|
|2012-01-03 00:00:00|          0.8|  127.51|  112.93| 2.3|   rain|
|2012-01-04 00:00:00|         20.3|  129.13|  107.74| 4.7|   rain|
|2012-01-05 00:00:00|          1.3|  118.44|   98.67| 6.1|   rain|
+-------------------+-------------+--------+--------+----+-------+
only showing top 5 rows



In [92]:
# which year was the windiest?

weather.withColumn("year", year("date"))\
    .groupby("year")\
        .agg(sum("wind")).show()

+----+------------------+
|year|         sum(wind)|
+----+------------------+
|2012|            1244.7|
|2013|1100.8000000000006|
|2014|1236.5000000000007|
|2015|1153.3000000000002|
+----+------------------+



22/09/12 23:51:49 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 7199034 ms exceeds timeout 120000 ms
22/09/12 23:51:49 WARN SparkContext: Killing executors is not supported by current scheduler.
