- environment setup

In [1]:
import pyspark

spark = pyspark.sql.SparkSession.builder.getOrCreate()

# Creating Dataframes

In [2]:
# create pandas dataframe
import pandas as pd
import numpy as np

np.random.seed(456)

pandas_dataframe = pd.DataFrame(
    dict(n=np.arange(20), group=np.random.choice(list("abc"), 20))
)
pandas_dataframe

Unnamed: 0,n,group
0,0,b
1,1,b
2,2,c
3,3,a
4,4,c
5,5,c
6,6,a
7,7,b
8,8,a
9,9,b


In [3]:
# spark can convert any pandas dataframe into a spark dataframe
df = spark.createDataFrame(pandas_dataframe)
df

DataFrame[n: bigint, group: string]

In [4]:
# spark is 'lazy', in that it will not display data until explicitly told to do so
df.show(5)

+---+-----+
|  n|group|
+---+-----+
|  0|    b|
|  1|    b|
|  2|    c|
|  3|    a|
|  4|    c|
+---+-----+
only showing top 5 rows



In [5]:
# .describe will give you info on the column dtypes
df.describe()

DataFrame[summary: string, n: string, group: string]

In [6]:
# once again you must summon the data with .show() if you wish to see it though
df.describe().show()

+-------+-----------------+-----+
|summary|                n|group|
+-------+-----------------+-----+
|  count|               20|   20|
|   mean|              9.5| null|
| stddev|5.916079783099616| null|
|    min|                0|    a|
|    max|               19|    c|
+-------+-----------------+-----+



In [7]:
from pydataset import data

# make spark dataframe from mpg data
mpg = spark.createDataFrame(data("mpg"))
mpg.show(5)

+------------+-----+-----+----+---+----------+---+---+---+---+-------+
|manufacturer|model|displ|year|cyl|     trans|drv|cty|hwy| fl|  class|
+------------+-----+-----+----+---+----------+---+---+---+---+-------+
|        audi|   a4|  1.8|1999|  4|  auto(l5)|  f| 18| 29|  p|compact|
|        audi|   a4|  1.8|1999|  4|manual(m5)|  f| 21| 29|  p|compact|
|        audi|   a4|  2.0|2008|  4|manual(m6)|  f| 20| 31|  p|compact|
|        audi|   a4|  2.0|2008|  4|  auto(av)|  f| 21| 30|  p|compact|
|        audi|   a4|  2.8|1999|  6|  auto(l5)|  f| 16| 26|  p|compact|
+------------+-----+-----+----+---+----------+---+---+---+---+-------+
only showing top 5 rows



In [8]:
# this is an invalid way to select a column, unlike pandas
mpg.hwy

Column<'hwy'>

In [9]:
# .select is needed to call on any specific column
mpg.select(mpg.hwy, mpg.cty, mpg.model)

DataFrame[hwy: bigint, cty: bigint, model: string]

In [10]:
# data still won't appear until you use .show()
mpg.select(mpg.hwy, mpg.cty, mpg.model).show(10)

+---+---+----------+
|hwy|cty|     model|
+---+---+----------+
| 29| 18|        a4|
| 29| 21|        a4|
| 31| 20|        a4|
| 30| 21|        a4|
| 26| 16|        a4|
| 26| 18|        a4|
| 27| 18|        a4|
| 26| 18|a4 quattro|
| 25| 16|a4 quattro|
| 28| 20|a4 quattro|
+---+---+----------+
only showing top 10 rows



In [11]:
# can do simple arithmetic
mpg.hwy + 1

Column<'(hwy + 1)'>

In [12]:
# this will return a new column with the the number 1 added to the number in the hwy column
mpg.select(mpg.hwy, mpg.hwy + 1).show(5)

+---+---------+
|hwy|(hwy + 1)|
+---+---------+
| 29|       30|
| 29|       30|
| 31|       32|
| 30|       31|
| 26|       27|
+---+---------+
only showing top 5 rows



In [13]:
# .alias will allow you to rename a column in a spark dataframe
mpg.select(mpg.hwy.alias("highway_mileage")).show(5)

+---------------+
|highway_mileage|
+---------------+
|             29|
|             29|
|             31|
|             30|
|             26|
+---------------+
only showing top 5 rows



In [14]:
# columns can also be stored as variables to be called on later
col1 = mpg.hwy.alias("highway_mileage")
col2 = (mpg.hwy / 2).alias("highway_mileage_halved")
mpg.select(col1, col2).show(5)

+---------------+----------------------+
|highway_mileage|highway_mileage_halved|
+---------------+----------------------+
|             29|                  14.5|
|             29|                  14.5|
|             31|                  15.5|
|             30|                  15.0|
|             26|                  13.0|
+---------------+----------------------+
only showing top 5 rows



# Other ways to create columns

## col

In [15]:
from pyspark.sql.functions import col, expr

In [16]:
# col(column) can be used to summon a specific column as well
col("hwy")

Column<'hwy'>

In [17]:
# make a new feature by perfrom algebra on 2 or more columns, save to variable
avg_column = (col("hwy") + col("cty")) / 2

# use variable as a feature in new dataframe
mpg.select(
    col("hwy").alias("highway_mileage"),
    mpg.cty.alias("city_mileage"),
    avg_column.alias("avg_mileage"),
).show(5)

+---------------+------------+-----------+
|highway_mileage|city_mileage|avg_mileage|
+---------------+------------+-----------+
|             29|          18|       23.5|
|             29|          21|       25.0|
|             31|          20|       25.5|
|             30|          21|       25.5|
|             26|          16|       21.0|
+---------------+------------+-----------+
only showing top 5 rows



## expr

In [18]:
# expr(column) is essentially the same as col(), however it allows for manipulation of the data
mpg.select(
    expr("hwy"),  # the same as `col`
    expr("hwy + 1"),  # an arithmetic expression
    expr("hwy AS highway_mileage"),  # using an alias
    expr("hwy + 1 AS highway_incremented"),  # a combination of the above
).show(5)

+---+---------+---------------+-------------------+
|hwy|(hwy + 1)|highway_mileage|highway_incremented|
+---+---------+---------------+-------------------+
| 29|       30|             29|                 30|
| 29|       30|             29|                 30|
| 31|       32|             31|                 32|
| 30|       31|             30|                 31|
| 26|       27|             26|                 27|
+---+---------+---------------+-------------------+
only showing top 5 rows



In [19]:
# all methods are valid for returning the same data
mpg.select(
    mpg.hwy.alias("highway"),
    col("hwy").alias("highway"),
    expr("hwy").alias("highway"),
    expr("hwy AS highway"),
).show(5)

+-------+-------+-------+-------+
|highway|highway|highway|highway|
+-------+-------+-------+-------+
|     29|     29|     29|     29|
|     29|     29|     29|     29|
|     31|     31|     31|     31|
|     30|     30|     30|     30|
|     26|     26|     26|     26|
+-------+-------+-------+-------+
only showing top 5 rows



# Spark SQL

In [20]:
mpg.createOrReplaceTempView("mpg")

In [21]:
# .sql('query') will allow you to use syntactically correct sql queries on the data in your df
spark.sql(
    """
SELECT hwy, cty, (hwy + cty) / 2 AS avg
FROM mpg
"""
)

DataFrame[hwy: bigint, cty: bigint, avg: double]

In [22]:
# we must still use the .show() command
spark.sql(
    """
SELECT hwy, cty, (hwy + cty) / 2 AS avg
FROM mpg
"""
).show(5)

+---+---+----+
|hwy|cty| avg|
+---+---+----+
| 29| 18|23.5|
| 29| 21|25.0|
| 31| 20|25.5|
| 30| 21|25.5|
| 26| 16|21.0|
+---+---+----+
only showing top 5 rows



# Type Casting

In [23]:
# show column dtypes
mpg.dtypes

[('manufacturer', 'string'),
 ('model', 'string'),
 ('displ', 'double'),
 ('year', 'bigint'),
 ('cyl', 'bigint'),
 ('trans', 'string'),
 ('drv', 'string'),
 ('cty', 'bigint'),
 ('hwy', 'bigint'),
 ('fl', 'string'),
 ('class', 'string')]

In [24]:
# will also display column dtypes
mpg.printSchema()

root
 |-- manufacturer: string (nullable = true)
 |-- model: string (nullable = true)
 |-- displ: double (nullable = true)
 |-- year: long (nullable = true)
 |-- cyl: long (nullable = true)
 |-- trans: string (nullable = true)
 |-- drv: string (nullable = true)
 |-- cty: long (nullable = true)
 |-- hwy: long (nullable = true)
 |-- fl: string (nullable = true)
 |-- class: string (nullable = true)



In [25]:
# .cast('string') will convert the dtype of the column to a string
mpg.select(mpg.hwy.cast("string")).printSchema()

root
 |-- hwy: string (nullable = true)



In [26]:
# you can convert to any kind of dtype

# however if the dtypes are incompatible, it will return the data as null
mpg.select(mpg.model, mpg.model.cast("int")).show(5)

+-----+-----+
|model|model|
+-----+-----+
|   a4| null|
|   a4| null|
|   a4| null|
|   a4| null|
|   a4| null|
+-----+-----+
only showing top 5 rows



# Basic Built-in Functions

In [27]:
# Note: The pyspark avg and mean functions are aliases of eachother
from pyspark.sql.functions import concat, sum, avg, min, max, count, mean

In [28]:
# from pyspark.sql.functions import * can be used to import all functions

In [29]:
# functions can be used to manipulate data as well
mpg.select(
    sum(mpg.hwy) / count(mpg.hwy).alias("average_1"),
    avg(mpg.hwy).alias("average_2"),
    min(mpg.hwy),
    max(mpg.hwy),
).show()

+--------------------------------------+-----------------+--------+--------+
|(sum(hwy) / count(hwy) AS `average_1`)|        average_2|min(hwy)|max(hwy)|
+--------------------------------------+-----------------+--------+--------+
|                     23.44017094017094|23.44017094017094|      12|      44|
+--------------------------------------+-----------------+--------+--------+



In [30]:
# concat() can be used to combine 2 or more columns into 1
mpg.select(concat(mpg.manufacturer, mpg.model)).show(5)

+---------------------------+
|concat(manufacturer, model)|
+---------------------------+
|                     audia4|
|                     audia4|
|                     audia4|
|                     audia4|
|                     audia4|
+---------------------------+
only showing top 5 rows



In [31]:
from pyspark.sql.functions import lit

In [32]:
# lit('string') will allow you to add a custom string to the concat 
mpg.select(concat(mpg.cyl, lit(" cylinders"))).show(5)

+-----------------------+
|concat(cyl,  cylinders)|
+-----------------------+
|            4 cylinders|
|            4 cylinders|
|            4 cylinders|
|            4 cylinders|
|            6 cylinders|
+-----------------------+
only showing top 5 rows



# More pyspark Functions for String Manipulation

In [33]:
from pyspark.sql.functions import regexp_extract, regexp_replace

In [34]:
textdf = spark.createDataFrame(
    pd.DataFrame(
        {
            "address": [
                "600 Navarro St ste 600, San Antonio, TX 78205",
                "3130 Broadway St, San Antonio, TX 78209",
                "303 Pearl Pkwy, San Antonio, TX 78215",
                "1255 SW Loop 410, San Antonio, TX 78227",
            ]
        }
    )
)

textdf.show(truncate=False)

+---------------------------------------------+
|address                                      |
+---------------------------------------------+
|600 Navarro St ste 600, San Antonio, TX 78205|
|3130 Broadway St, San Antonio, TX 78209      |
|303 Pearl Pkwy, San Antonio, TX 78215        |
|1255 SW Loop 410, San Antonio, TX 78227      |
+---------------------------------------------+



In [35]:
# regexp_extract('column', regex_expression) allows us to use regex to look for specific information to extract
textdf.select(
    "address",
    regexp_extract("address", r"^(\d+)", 1).alias("street_no"),
    regexp_extract("address", r"^\d+\s([\w\s]+?),", 1).alias("street"),
).show(truncate=False)

+---------------------------------------------+---------+------------------+
|address                                      |street_no|street            |
+---------------------------------------------+---------+------------------+
|600 Navarro St ste 600, San Antonio, TX 78205|600      |Navarro St ste 600|
|3130 Broadway St, San Antonio, TX 78209      |3130     |Broadway St       |
|303 Pearl Pkwy, San Antonio, TX 78215        |303      |Pearl Pkwy        |
|1255 SW Loop 410, San Antonio, TX 78227      |1255     |SW Loop 410       |
+---------------------------------------------+---------+------------------+



In [36]:
# regexp_replace('column', regex, 'repacement') allows us to make substitutions

# in this case everything before the first comma is replaced by an empty space
textdf.select(
    "address",
    regexp_replace("address", r"^.*?,\s*", "").alias("city_state_zip"),
).show(truncate=False)

+---------------------------------------------+---------------------+
|address                                      |city_state_zip       |
+---------------------------------------------+---------------------+
|600 Navarro St ste 600, San Antonio, TX 78205|San Antonio, TX 78205|
|3130 Broadway St, San Antonio, TX 78209      |San Antonio, TX 78209|
|303 Pearl Pkwy, San Antonio, TX 78215        |San Antonio, TX 78215|
|1255 SW Loop 410, San Antonio, TX 78227      |San Antonio, TX 78227|
+---------------------------------------------+---------------------+



# .filter and .where

In [37]:
# .filter and .where both return a df where only the specified conditions are met
mpg.filter(mpg.cyl == 4).where(mpg["class"] == "subcompact").show()

+------------+-----------+-----+----+---+----------+---+---+---+---+----------+
|manufacturer|      model|displ|year|cyl|     trans|drv|cty|hwy| fl|     class|
+------------+-----------+-----+----+---+----------+---+---+---+---+----------+
|       honda|      civic|  1.6|1999|  4|manual(m5)|  f| 28| 33|  r|subcompact|
|       honda|      civic|  1.6|1999|  4|  auto(l4)|  f| 24| 32|  r|subcompact|
|       honda|      civic|  1.6|1999|  4|manual(m5)|  f| 25| 32|  r|subcompact|
|       honda|      civic|  1.6|1999|  4|manual(m5)|  f| 23| 29|  p|subcompact|
|       honda|      civic|  1.6|1999|  4|  auto(l4)|  f| 24| 32|  r|subcompact|
|       honda|      civic|  1.8|2008|  4|manual(m5)|  f| 26| 34|  r|subcompact|
|       honda|      civic|  1.8|2008|  4|  auto(l5)|  f| 25| 36|  r|subcompact|
|       honda|      civic|  1.8|2008|  4|  auto(l5)|  f| 24| 36|  c|subcompact|
|       honda|      civic|  2.0|2008|  4|manual(m6)|  f| 21| 29|  p|subcompact|
|     hyundai|    tiburon|  2.0|1999|  4

# When and Otherwise

In [38]:
from pyspark.sql.functions import when

In [39]:
# when(column = condtion, 'result') will make a new feature based off of the condition
mpg.select(mpg.hwy, when(mpg.hwy > 25, "good_mileage").alias("mpg_desc")).show(
    12
)

+---+------------+
|hwy|    mpg_desc|
+---+------------+
| 29|good_mileage|
| 29|good_mileage|
| 31|good_mileage|
| 30|good_mileage|
| 26|good_mileage|
| 26|good_mileage|
| 27|good_mileage|
| 26|good_mileage|
| 25|        null|
| 28|good_mileage|
| 27|good_mileage|
| 25|        null|
+---+------------+
only showing top 12 rows



In [40]:
# if the condition is not met in a when statement the observation will return null

# otherwise('other_result') can specify a value if the condition is not met
mpg.select(
    mpg.hwy,
    when(mpg.hwy > 25, "good_mileage")
    .otherwise("bad_mileage")
    .alias("mpg_desc"),
).show(12)

+---+------------+
|hwy|    mpg_desc|
+---+------------+
| 29|good_mileage|
| 29|good_mileage|
| 31|good_mileage|
| 30|good_mileage|
| 26|good_mileage|
| 26|good_mileage|
| 27|good_mileage|
| 26|good_mileage|
| 25| bad_mileage|
| 28|good_mileage|
| 27|good_mileage|
| 25| bad_mileage|
+---+------------+
only showing top 12 rows



In [41]:
# can do multiple when() statements for multiple values
mpg.select(
    mpg.displ,
    (
        when(mpg.displ < 2, "small")
        .when(mpg.displ < 3, "medium")
        .otherwise("large")
        .alias("engine_size")
    ),
).show(10)

+-----+-----------+
|displ|engine_size|
+-----+-----------+
|  1.8|      small|
|  1.8|      small|
|  2.0|     medium|
|  2.0|     medium|
|  2.8|     medium|
|  2.8|     medium|
|  3.1|      large|
|  1.8|      small|
|  1.8|      small|
|  2.0|     medium|
+-----+-----------+
only showing top 10 rows



# Sorting and Ordering

In [42]:
# sort(column) will sort the dataframe by the values in specified column
mpg.sort(mpg.hwy).show(8)

+------------+-------------------+-----+----+---+----------+---+---+---+---+------+
|manufacturer|              model|displ|year|cyl|     trans|drv|cty|hwy| fl| class|
+------------+-------------------+-----+----+---+----------+---+---+---+---+------+
|       dodge|ram 1500 pickup 4wd|  4.7|2008|  8|manual(m6)|  4|  9| 12|  e|pickup|
|       dodge|  dakota pickup 4wd|  4.7|2008|  8|  auto(l5)|  4|  9| 12|  e|pickup|
|       dodge|        durango 4wd|  4.7|2008|  8|  auto(l5)|  4|  9| 12|  e|   suv|
|        jeep| grand cherokee 4wd|  4.7|2008|  8|  auto(l5)|  4|  9| 12|  e|   suv|
|       dodge|ram 1500 pickup 4wd|  4.7|2008|  8|  auto(l5)|  4|  9| 12|  e|pickup|
|        jeep| grand cherokee 4wd|  6.1|2008|  8|  auto(l5)|  4| 11| 14|  p|   suv|
|   chevrolet|    k1500 tahoe 4wd|  5.3|2008|  8|  auto(l4)|  4| 11| 14|  e|   suv|
|  land rover|        range rover|  4.0|1999|  8|  auto(l4)|  4| 11| 15|  p|   suv|
+------------+-------------------+-----+----+---+----------+---+---+---+---+

In [43]:
# by default all sort() functions are in ascending order, if you want descending you must import it
from pyspark.sql.functions import asc, desc

In [44]:
## .sort(column.desc()) changes the sort to descending
mpg.sort(mpg.hwy.desc())
# is the same as
mpg.sort(col("hwy").desc())
# is the same as
mpg.sort(desc("hwy")).show(5)

+------------+----------+-----+----+---+----------+---+---+---+---+----------+
|manufacturer|     model|displ|year|cyl|     trans|drv|cty|hwy| fl|     class|
+------------+----------+-----+----+---+----------+---+---+---+---+----------+
|  volkswagen|     jetta|  1.9|1999|  4|manual(m5)|  f| 33| 44|  d|   compact|
|  volkswagen|new beetle|  1.9|1999|  4|manual(m5)|  f| 35| 44|  d|subcompact|
|  volkswagen|new beetle|  1.9|1999|  4|  auto(l4)|  f| 29| 41|  d|subcompact|
|      toyota|   corolla|  1.8|2008|  4|manual(m5)|  f| 28| 37|  r|   compact|
|       honda|     civic|  1.8|2008|  4|  auto(l5)|  f| 24| 36|  c|subcompact|
+------------+----------+-----+----+---+----------+---+---+---+---+----------+
only showing top 5 rows



In [45]:
# you can sort by multiple columns

## the column called first will be sorted first, and each column will be sorted in the order they are called
mpg.sort(desc("class"), mpg.cyl.asc(), col("hwy").desc()).show()

+------------+------------------+-----+----+---+----------+---+---+---+---+-----+
|manufacturer|             model|displ|year|cyl|     trans|drv|cty|hwy| fl|class|
+------------+------------------+-----+----+---+----------+---+---+---+---+-----+
|      subaru|      forester awd|  2.5|2008|  4|manual(m5)|  4| 20| 27|  r|  suv|
|      subaru|      forester awd|  2.5|2008|  4|  auto(l4)|  4| 20| 26|  r|  suv|
|      subaru|      forester awd|  2.5|1999|  4|manual(m5)|  4| 18| 25|  r|  suv|
|      subaru|      forester awd|  2.5|2008|  4|manual(m5)|  4| 19| 25|  p|  suv|
|      subaru|      forester awd|  2.5|1999|  4|  auto(l4)|  4| 18| 24|  r|  suv|
|      subaru|      forester awd|  2.5|2008|  4|  auto(l4)|  4| 18| 23|  p|  suv|
|      toyota|       4runner 4wd|  2.7|1999|  4|manual(m5)|  4| 15| 20|  r|  suv|
|      toyota|       4runner 4wd|  2.7|1999|  4|  auto(l4)|  4| 16| 20|  r|  suv|
|        jeep|grand cherokee 4wd|  3.0|2008|  6|  auto(l5)|  4| 17| 22|  d|  suv|
|        jeep|gr

# Grouping and Aggregating

In [46]:
# .groupby(column) allows you to view the data by making the values of that feature your observations
mpg.groupBy(mpg.cyl)

<pyspark.sql.group.GroupedData at 0x7f98e97a70a0>

In [47]:
# groupby requires aggregate functions to work
mpg.groupBy(mpg.cyl).agg(avg(mpg.cty), avg(mpg.hwy)).show()

+---+------------------+-----------------+
|cyl|          avg(cty)|         avg(hwy)|
+---+------------------+-----------------+
|  6| 16.21518987341772|22.82278481012658|
|  5|              20.5|            28.75|
|  8|12.571428571428571|17.62857142857143|
|  4|21.012345679012345|28.80246913580247|
+---+------------------+-----------------+



In [48]:
# you can groupby multiple features if you wish
mpg.groupBy("cyl", "class").agg(avg(mpg.cty), avg(mpg.hwy)).show()

+---+----------+------------------+------------------+
|cyl|     class|          avg(cty)|          avg(hwy)|
+---+----------+------------------+------------------+
|  5|   compact|              21.0|              29.0|
|  5|subcompact|              20.0|              28.5|
|  6|subcompact|              17.0|24.714285714285715|
|  6|    pickup|              14.5|              17.9|
|  4|subcompact|22.857142857142858| 30.80952380952381|
|  8|       suv|12.131578947368421|16.789473684210527|
|  8|    pickup|              11.8|              15.8|
|  8|   midsize|              16.0|              24.0|
|  4|   midsize|              20.5|           29.1875|
|  8|   2seater|              15.4|              24.8|
|  6|   compact|16.923076923076923|25.307692307692307|
|  6|   minivan|              15.6|              22.2|
|  4|   compact|            21.375|          29.46875|
|  8|subcompact|              14.8|              21.6|
|  6|   midsize|17.782608695652176| 26.26086956521739|
|  4|   mi

In [49]:
# .rollup() will do the same aggregations as groupby, but will also do total
mpg.rollup("cyl").count().sort("cyl").show()

+----+-----+
| cyl|count|
+----+-----+
|null|  234|
|   4|   81|
|   5|    4|
|   6|   79|
|   8|   70|
+----+-----+



In [50]:
mpg.rollup("cyl").agg(expr("avg(hwy)")).sort("cyl").show()

+----+-----------------+
| cyl|         avg(hwy)|
+----+-----------------+
|null|23.44017094017094|
|   4|28.80246913580247|
|   5|            28.75|
|   6|22.82278481012658|
|   8|17.62857142857143|
+----+-----------------+



In [51]:
mpg.rollup("cyl", "class").mean("hwy").sort(col("cyl"), col("class")).show()

+----+----------+------------------+
| cyl|     class|          avg(hwy)|
+----+----------+------------------+
|null|      null| 23.44017094017094|
|   4|      null| 28.80246913580247|
|   4|   compact|          29.46875|
|   4|   midsize|           29.1875|
|   4|   minivan|              24.0|
|   4|    pickup|20.666666666666668|
|   4|subcompact| 30.80952380952381|
|   4|       suv|             23.75|
|   5|      null|             28.75|
|   5|   compact|              29.0|
|   5|subcompact|              28.5|
|   6|      null| 22.82278481012658|
|   6|   compact|25.307692307692307|
|   6|   midsize| 26.26086956521739|
|   6|   minivan|              22.2|
|   6|    pickup|              17.9|
|   6|subcompact|24.714285714285715|
|   6|       suv|              18.5|
|   8|      null| 17.62857142857143|
|   8|   2seater|              24.8|
+----+----------+------------------+
only showing top 20 rows



# Crosstabs and Pivot Tables

In [52]:
# .crosstab(col1, col2) will make a crosstab table of 2 columns
mpg.crosstab("class", "cyl").show()

+----------+---+---+---+---+
| class_cyl|  4|  5|  6|  8|
+----------+---+---+---+---+
|   midsize| 16|  0| 23|  2|
|subcompact| 21|  2|  7|  5|
|   2seater|  0|  0|  0|  5|
|    pickup|  3|  0| 10| 20|
|   minivan|  1|  0| 10|  0|
|       suv|  8|  0| 16| 38|
|   compact| 32|  2| 13|  0|
+----------+---+---+---+---+



In [53]:
# crosstab simply does counting

# pivot(col2).aggregate allows you to change the data to the value of an aggregate functions

## in this case the table will be class and cyl, and the data will be the mean hwy value for each combination
mpg.groupby("class").pivot("cyl").mean("hwy").show()

+----------+------------------+----+------------------+------------------+
|     class|                 4|   5|                 6|                 8|
+----------+------------------+----+------------------+------------------+
|subcompact| 30.80952380952381|28.5|24.714285714285715|              21.6|
|   compact|          29.46875|29.0|25.307692307692307|              null|
|   minivan|              24.0|null|              22.2|              null|
|       suv|             23.75|null|              18.5|16.789473684210527|
|   midsize|           29.1875|null| 26.26086956521739|              24.0|
|    pickup|20.666666666666668|null|              17.9|              15.8|
|   2seater|              null|null|              null|              24.8|
+----------+------------------+----+------------------+------------------+



# Handling Missing Data

In [54]:
# create df that has missing values
df = spark.createDataFrame(
    pd.DataFrame(
        {"x": [1, 2, np.nan, 4, 5, np.nan], "y": [np.nan, 0, 0, 3, 1, np.nan]}
    )
)
df.show()

+---+---+
|  x|  y|
+---+---+
|1.0|NaN|
|2.0|0.0|
|NaN|0.0|
|4.0|3.0|
|5.0|1.0|
|NaN|NaN|
+---+---+



In [55]:
# .na.drop() will remove all observations with missing data
df.na.drop().show()

+---+---+
|  x|  y|
+---+---+
|2.0|0.0|
|4.0|3.0|
|5.0|1.0|
+---+---+



In [56]:
# .na.fill(value) will replace all missing data with the specificed value
df.na.fill(0).show()

+---+---+
|  x|  y|
+---+---+
|1.0|0.0|
|2.0|0.0|
|0.0|0.0|
|4.0|3.0|
|5.0|1.0|
|0.0|0.0|
+---+---+



In [57]:
# subset = 'column' will ensure that only data in the specfied column is replaced if it's null
df.na.fill(0, subset="x").show()

+---+---+
|  x|  y|
+---+---+
|1.0|NaN|
|2.0|0.0|
|0.0|0.0|
|4.0|3.0|
|5.0|1.0|
|0.0|NaN|
+---+---+



In [58]:
df.na.drop(subset="y").show()

+---+---+
|  x|  y|
+---+---+
|2.0|0.0|
|NaN|0.0|
|4.0|3.0|
|5.0|1.0|
+---+---+



# Explaining DataFrame Transformations

In [59]:
# .explain() will demonstrate what spark is thinking about the dataframe
mpg.explain()

== Physical Plan ==
*(1) Scan ExistingRDD[manufacturer#266,model#267,displ#268,year#269L,cyl#270L,trans#271,drv#272,cty#273L,hwy#274L,fl#275,class#276]




In [60]:
mpg.select(mpg.cyl, mpg.hwy).explain()

== Physical Plan ==
*(1) Project [cyl#270L, hwy#274L]
+- *(1) Scan ExistingRDD[manufacturer#266,model#267,displ#268,year#269L,cyl#270L,trans#271,drv#272,cty#273L,hwy#274L,fl#275,class#276]




In [61]:
mpg.select(((mpg.cyl + mpg.hwy) / 2).alias("avg_mpg")).explain()

== Physical Plan ==
*(1) Project [(cast((cyl#270L + hwy#274L) as double) / 2.0) AS avg_mpg#1619]
+- *(1) Scan ExistingRDD[manufacturer#266,model#267,displ#268,year#269L,cyl#270L,trans#271,drv#272,cty#273L,hwy#274L,fl#275,class#276]




In [62]:
mpg.filter(mpg.cyl == 6).explain()

== Physical Plan ==
*(1) Filter (isnotnull(cyl#270L) AND (cyl#270L = 6))
+- *(1) Scan ExistingRDD[manufacturer#266,model#267,displ#268,year#269L,cyl#270L,trans#271,drv#272,cty#273L,hwy#274L,fl#275,class#276]




In [63]:
mpg.select("cyl", "hwy").filter(expr("cyl = 6")).explain()
mpg.filter(expr("cyl = 6")).select("cyl", "hwy").explain()

== Physical Plan ==
*(1) Project [cyl#270L, hwy#274L]
+- *(1) Filter (isnotnull(cyl#270L) AND (cyl#270L = 6))
   +- *(1) Scan ExistingRDD[manufacturer#266,model#267,displ#268,year#269L,cyl#270L,trans#271,drv#272,cty#273L,hwy#274L,fl#275,class#276]


== Physical Plan ==
*(1) Project [cyl#270L, hwy#274L]
+- *(1) Filter (isnotnull(cyl#270L) AND (cyl#270L = 6))
   +- *(1) Scan ExistingRDD[manufacturer#266,model#267,displ#268,year#269L,cyl#270L,trans#271,drv#272,cty#273L,hwy#274L,fl#275,class#276]




In [64]:
mpg.selectExpr("cyl + 3 * 16 / 4 + 19 AS unused", "hwy").select(
    "hwy"
).explain()

== Physical Plan ==
*(1) Project [hwy#274L]
+- *(1) Scan ExistingRDD[manufacturer#266,model#267,displ#268,year#269L,cyl#270L,trans#271,drv#272,cty#273L,hwy#274L,fl#275,class#276]




In [65]:
mpg.select(min(mpg.cyl)).explain()

== Physical Plan ==
*(2) HashAggregate(keys=[], functions=[min(cyl#270L)])
+- Exchange SinglePartition, ENSURE_REQUIREMENTS, [id=#737]
   +- *(1) HashAggregate(keys=[], functions=[partial_min(cyl#270L)])
      +- *(1) Project [cyl#270L]
         +- *(1) Scan ExistingRDD[manufacturer#266,model#267,displ#268,year#269L,cyl#270L,trans#271,drv#272,cty#273L,hwy#274L,fl#275,class#276]




In [66]:
mpg.groupby(mpg.cyl).agg(min(mpg.hwy), max(mpg.hwy)).explain()

== Physical Plan ==
*(2) HashAggregate(keys=[cyl#270L], functions=[min(hwy#274L), max(hwy#274L)])
+- Exchange hashpartitioning(cyl#270L, 200), ENSURE_REQUIREMENTS, [id=#758]
   +- *(1) HashAggregate(keys=[cyl#270L], functions=[partial_min(hwy#274L), partial_max(hwy#274L)])
      +- *(1) Project [cyl#270L, hwy#274L]
         +- *(1) Scan ExistingRDD[manufacturer#266,model#267,displ#268,year#269L,cyl#270L,trans#271,drv#272,cty#273L,hwy#274L,fl#275,class#276]




In [67]:
(
    mpg.select(col("cyl"), expr("(cty + hwy) / 2 AS avg_mpg"))
    .filter(expr('class == "compact"'))
    .groupby("cyl")
    .agg(min("avg_mpg"), avg("avg_mpg"), max("avg_mpg"))
    .explain()
)

== Physical Plan ==
*(2) HashAggregate(keys=[cyl#270L], functions=[min(avg_mpg#1656), avg(avg_mpg#1656), max(avg_mpg#1656)])
+- Exchange hashpartitioning(cyl#270L, 200), ENSURE_REQUIREMENTS, [id=#783]
   +- *(1) HashAggregate(keys=[cyl#270L], functions=[partial_min(avg_mpg#1656), partial_avg(avg_mpg#1656), partial_max(avg_mpg#1656)])
      +- *(1) Project [cyl#270L, (cast((cty#273L + hwy#274L) as double) / 2.0) AS avg_mpg#1656]
         +- *(1) Filter (isnotnull(class#276) AND (class#276 = compact))
            +- *(1) Scan ExistingRDD[manufacturer#266,model#267,displ#268,year#269L,cyl#270L,trans#271,drv#272,cty#273L,hwy#274L,fl#275,class#276]




# More Dataframe Manipulation Examples

In [68]:
from vega_datasets import data

weather = data.seattle_weather().assign(date=lambda df: df.date.astype(str))
weather = spark.createDataFrame(weather)
weather.show(6)

+----------+-------------+--------+--------+----+-------+
|      date|precipitation|temp_max|temp_min|wind|weather|
+----------+-------------+--------+--------+----+-------+
|2012-01-01|          0.0|    12.8|     5.0| 4.7|drizzle|
|2012-01-02|         10.9|    10.6|     2.8| 4.5|   rain|
|2012-01-03|          0.8|    11.7|     7.2| 2.3|   rain|
|2012-01-04|         20.3|    12.2|     5.6| 4.7|   rain|
|2012-01-05|          1.3|     8.9|     2.8| 6.1|   rain|
|2012-01-06|          2.5|     4.4|     2.2| 2.2|   rain|
+----------+-------------+--------+--------+----+-------+
only showing top 6 rows



In [69]:
# unlike pandas there is no .shape command, you have to print a count of rows and columns
print(weather.count(), "rows", len(weather.columns), "columns")

1461 rows 6 columns


In [70]:
# find the date range of the first and last date in the dataframe
min_date, max_date = weather.select(min("date"), max("date")).first()
min_date, max_date

('2012-01-01', '2015-12-31')

In [71]:
weather = weather.withColumn(
    "temp_avg", expr("ROUND(temp_min + temp_max) / 2")
)

In [72]:
weather.show(6)

+----------+-------------+--------+--------+----+-------+--------+
|      date|precipitation|temp_max|temp_min|wind|weather|temp_avg|
+----------+-------------+--------+--------+----+-------+--------+
|2012-01-01|          0.0|    12.8|     5.0| 4.7|drizzle|     9.0|
|2012-01-02|         10.9|    10.6|     2.8| 4.5|   rain|     6.5|
|2012-01-03|          0.8|    11.7|     7.2| 2.3|   rain|     9.5|
|2012-01-04|         20.3|    12.2|     5.6| 4.7|   rain|     9.0|
|2012-01-05|          1.3|     8.9|     2.8| 6.1|   rain|     6.0|
|2012-01-06|          2.5|     4.4|     2.2| 2.2|   rain|     3.5|
+----------+-------------+--------+--------+----+-------+--------+
only showing top 6 rows



In [73]:
from pyspark.sql.functions import month, year, quarter

In [74]:
(
    weather.withColumn("month", month("date"))
    .groupBy("month")
    .agg(sum("precipitation").alias("total_rainfall"))
    .sort("month")
    .show()
)

+-----+------------------+
|month|    total_rainfall|
+-----+------------------+
|    1|465.99999999999994|
|    2|             422.0|
|    3|             606.2|
|    4|             375.4|
|    5|             207.5|
|    6|             132.9|
|    7|              48.2|
|    8|             163.7|
|    9|235.49999999999997|
|   10|             503.4|
|   11|             642.5|
|   12|             622.7|
+-----+------------------+



In [75]:
(
    weather.filter(month("date") == 12)
    .filter(year("date") == 2013)
    .groupBy("weather")
    .agg(mean("temp_avg"))
    .show()
)

+-------+-----------------+
|weather|    avg(temp_avg)|
+-------+-----------------+
|    fog|7.555555555555555|
|    sun|2.977272727272727|
+-------+-----------------+



In [76]:
(
    weather.filter(year("date") == 2013)
    .withColumn("freezing_temps", (weather.temp_avg <= 0).cast("int"))
    .withColumn("month", month("date"))
    .groupBy("month")
    .agg(sum("freezing_temps").alias("no_of_days_with_freezing_temps"))
    .sort("month")
    .show()
)

+-----+------------------------------+
|month|no_of_days_with_freezing_temps|
+-----+------------------------------+
|    1|                             3|
|    2|                             0|
|    3|                             0|
|    4|                             0|
|    5|                             0|
|    6|                             0|
|    7|                             0|
|    8|                             0|
|    9|                             0|
|   10|                             0|
|   11|                             0|
|   12|                             5|
+-----+------------------------------+



In [77]:
(
    weather.withColumn("quarter", quarter("date"))
    .withColumn("year", year("date"))
    .groupBy("year", "quarter")
    .agg(mean("temp_avg").alias("temp_avg"))
    .sort("year", "quarter")
    .show()
)

+----+-------+------------------+
|year|quarter|          temp_avg|
+----+-------+------------------+
|2012|      1| 5.587912087912088|
|2012|      2|12.675824175824175|
|2012|      3|            18.375|
|2012|      4| 8.581521739130435|
|2013|      1| 6.405555555555556|
|2013|      2|14.505494505494505|
|2013|      3| 19.47826086956522|
|2013|      4| 8.032608695652174|
|2014|      1| 7.205555555555556|
|2014|      2|14.296703296703297|
|2014|      3|19.858695652173914|
|2014|      4|  9.88586956521739|
|2015|      1| 8.972222222222221|
|2015|      2|15.258241758241759|
|2015|      3|19.407608695652176|
|2015|      4| 8.956521739130435|
+----+-------+------------------+



In [78]:
(
    weather.withColumn("quarter", quarter("date"))
    .withColumn("year", year("date"))
    .groupBy("quarter")
    .pivot("year")
    .agg(expr("ROUND(MEAN(temp_avg), 2) AS temp_avg"))
    .sort("quarter")
    .show()
)

+-------+-----+-----+-----+-----+
|quarter| 2012| 2013| 2014| 2015|
+-------+-----+-----+-----+-----+
|      1| 5.59| 6.41| 7.21| 8.97|
|      2|12.68|14.51| 14.3|15.26|
|      3|18.38|19.48|19.86|19.41|
|      4| 8.58| 8.03| 9.89| 8.96|
+-------+-----+-----+-----+-----+



# Joins

In [79]:
users = spark.createDataFrame(
    pd.DataFrame(
        {
            "id": [1, 2, 3, 4, 5, 6],
            "name": ["bob", "joe", "sally", "adam", "jane", "mike"],
            "role_id": [1, 2, 3, 3, np.nan, np.nan],
        }
    )
)
roles = spark.createDataFrame(
    pd.DataFrame(
        {
            "id": [1, 2, 3, 4],
            "name": ["admin", "author", "reviewer", "commenter"],
        }
    )
)
print("--- users ---")
users.show()
print("--- roles ---")
roles.show()

--- users ---
+---+-----+-------+
| id| name|role_id|
+---+-----+-------+
|  1|  bob|    1.0|
|  2|  joe|    2.0|
|  3|sally|    3.0|
|  4| adam|    3.0|
|  5| jane|    NaN|
|  6| mike|    NaN|
+---+-----+-------+

--- roles ---
+---+---------+
| id|     name|
+---+---------+
|  1|    admin|
|  2|   author|
|  3| reviewer|
|  4|commenter|
+---+---------+



In [80]:
# join tables by specifying which column on each table is the same
users.join(roles, on=users.role_id == roles.id).show()

+---+-----+-------+---+--------+
| id| name|role_id| id|    name|
+---+-----+-------+---+--------+
|  1|  bob|    1.0|  1|   admin|
|  3|sally|    3.0|  3|reviewer|
|  4| adam|    3.0|  3|reviewer|
|  2|  joe|    2.0|  2|  author|
+---+-----+-------+---+--------+



In [81]:
# by default spark will perform an inner join unless specified otherwise
users.join(roles, on=users.role_id == roles.id, how="left").show()

+---+-----+-------+----+--------+
| id| name|role_id|  id|    name|
+---+-----+-------+----+--------+
|  5| jane|    NaN|null|    null|
|  6| mike|    NaN|null|    null|
|  1|  bob|    1.0|   1|   admin|
|  3|sally|    3.0|   3|reviewer|
|  4| adam|    3.0|   3|reviewer|
|  2|  joe|    2.0|   2|  author|
+---+-----+-------+----+--------+



In [82]:
users.join(roles, on=users.role_id == roles.id, how="right").show()

+----+-----+-------+---+---------+
|  id| name|role_id| id|     name|
+----+-----+-------+---+---------+
|   1|  bob|    1.0|  1|    admin|
|null| null|   null|  4|commenter|
|   3|sally|    3.0|  3| reviewer|
|   4| adam|    3.0|  3| reviewer|
|   2|  joe|    2.0|  2|   author|
+----+-----+-------+---+---------+



# Visualization (or Lack Therof)

- there is no visualizations for spark

- if you wish to make visualizations you must convert the dataframe to pandas using .toPandas and then make pandas visualizations 

# Exercises

## 1. Create a spark data frame that contains your favorite programming languages.

In [83]:
languages = {'languages': ['python', 'java', 'html', 'javascript', 'typescript', 'c#', 'c++', 'c', 'sql']}

- The name of the column should be language

In [84]:
df = spark.createDataFrame(pd.DataFrame(languages))

In [85]:
df.show()

+----------+
| languages|
+----------+
|    python|
|      java|
|      html|
|javascript|
|typescript|
|        c#|
|       c++|
|         c|
|       sql|
+----------+



- View the schema of the dataframe

In [86]:
df.schema

StructType(List(StructField(languages,StringType,true)))

- Output the shape of the dataframe

In [87]:
print((df.count(), len(df.columns)))

(9, 1)


- Show the first 5 records in the dataframe

In [88]:
df.show(5)

+----------+
| languages|
+----------+
|    python|
|      java|
|      html|
|javascript|
|typescript|
+----------+
only showing top 5 rows



## 2. Load the mpg dataset as a spark dataframe.

In [89]:
mpg.show(5)

+------------+-----+-----+----+---+----------+---+---+---+---+-------+
|manufacturer|model|displ|year|cyl|     trans|drv|cty|hwy| fl|  class|
+------------+-----+-----+----+---+----------+---+---+---+---+-------+
|        audi|   a4|  1.8|1999|  4|  auto(l5)|  f| 18| 29|  p|compact|
|        audi|   a4|  1.8|1999|  4|manual(m5)|  f| 21| 29|  p|compact|
|        audi|   a4|  2.0|2008|  4|manual(m6)|  f| 20| 31|  p|compact|
|        audi|   a4|  2.0|2008|  4|  auto(av)|  f| 21| 30|  p|compact|
|        audi|   a4|  2.8|1999|  6|  auto(l5)|  f| 16| 26|  p|compact|
+------------+-----+-----+----+---+----------+---+---+---+---+-------+
only showing top 5 rows



- A. Create 1 column of output that contains a message like the one below:
    
    'The 1999 audi a4 has a 4 cylinder engine.'

In [140]:
mpg.select(concat(lit('The '), mpg.year, lit(' '), mpg.manufacturer, lit(' has a '), mpg.cyl, lit(' cylinder engine')).alias('message')).show(truncate=False)

+------------------------------------------+
|message                                   |
+------------------------------------------+
|The 1999 audi has a 4 cylinder engine     |
|The 1999 audi has a 4 cylinder engine     |
|The 2008 audi has a 4 cylinder engine     |
|The 2008 audi has a 4 cylinder engine     |
|The 1999 audi has a 6 cylinder engine     |
|The 1999 audi has a 6 cylinder engine     |
|The 2008 audi has a 6 cylinder engine     |
|The 1999 audi has a 4 cylinder engine     |
|The 1999 audi has a 4 cylinder engine     |
|The 2008 audi has a 4 cylinder engine     |
|The 2008 audi has a 4 cylinder engine     |
|The 1999 audi has a 6 cylinder engine     |
|The 1999 audi has a 6 cylinder engine     |
|The 2008 audi has a 6 cylinder engine     |
|The 2008 audi has a 6 cylinder engine     |
|The 1999 audi has a 6 cylinder engine     |
|The 2008 audi has a 6 cylinder engine     |
|The 2008 audi has a 8 cylinder engine     |
|The 2008 chevrolet has a 8 cylinder engine|
|The 2008 

- B. Transform the trans column so that it only contains either manual or auto.

In [91]:
mpg.show(5)

+------------+-----+-----+----+---+----------+---+---+---+---+-------+
|manufacturer|model|displ|year|cyl|     trans|drv|cty|hwy| fl|  class|
+------------+-----+-----+----+---+----------+---+---+---+---+-------+
|        audi|   a4|  1.8|1999|  4|  auto(l5)|  f| 18| 29|  p|compact|
|        audi|   a4|  1.8|1999|  4|manual(m5)|  f| 21| 29|  p|compact|
|        audi|   a4|  2.0|2008|  4|manual(m6)|  f| 20| 31|  p|compact|
|        audi|   a4|  2.0|2008|  4|  auto(av)|  f| 21| 30|  p|compact|
|        audi|   a4|  2.8|1999|  6|  auto(l5)|  f| 16| 26|  p|compact|
+------------+-----+-----+----+---+----------+---+---+---+---+-------+
only showing top 5 rows



In [92]:
mpg.select(mpg.trans,
          when((mpg.trans.contains('auto')), 'auto')
          .otherwise('manual')
          .alias('trans')).show(20)

+----------+------+
|     trans| trans|
+----------+------+
|  auto(l5)|  auto|
|manual(m5)|manual|
|manual(m6)|manual|
|  auto(av)|  auto|
|  auto(l5)|  auto|
|manual(m5)|manual|
|  auto(av)|  auto|
|manual(m5)|manual|
|  auto(l5)|  auto|
|manual(m6)|manual|
|  auto(s6)|  auto|
|  auto(l5)|  auto|
|manual(m5)|manual|
|  auto(s6)|  auto|
|manual(m6)|manual|
|  auto(l5)|  auto|
|  auto(s6)|  auto|
|  auto(s6)|  auto|
|  auto(l4)|  auto|
|  auto(l4)|  auto|
+----------+------+
only showing top 20 rows



## 3. Load the tips dataset as a spark dataframe.

In [93]:
from pydataset import data

In [94]:
tips = spark.createDataFrame(data("tips"))

tips.show(5)

+----------+----+------+------+---+------+----+
|total_bill| tip|   sex|smoker|day|  time|size|
+----------+----+------+------+---+------+----+
|     16.99|1.01|Female|    No|Sun|Dinner|   2|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|
|     21.01| 3.5|  Male|    No|Sun|Dinner|   3|
|     23.68|3.31|  Male|    No|Sun|Dinner|   2|
|     24.59|3.61|Female|    No|Sun|Dinner|   4|
+----------+----+------+------+---+------+----+
only showing top 5 rows



- A. What percentage of observations are smokers?

In [95]:
tips.groupby(tips.smoker).count().show()

+------+-----+
|smoker|count|
+------+-----+
|    No|  151|
|   Yes|   93|
+------+-----+



- B. Create a column that contains the tip percentage

In [96]:
tips2 = tips.select(tips.smoker, tips.sex, tips.total_bill, tips.tip, (tips.tip / tips.total_bill).alias('tip_percent'))

In [139]:
tips2.show(5)

+------+------+----------+----+-------------------+
|smoker|   sex|total_bill| tip|        tip_percent|
+------+------+----------+----+-------------------+
|    No|Female|     16.99|1.01|0.05944673337257211|
|    No|  Male|     10.34|1.66|0.16054158607350097|
|    No|  Male|     21.01| 3.5|0.16658733936220846|
|    No|  Male|     23.68|3.31| 0.1397804054054054|
|    No|Female|     24.59|3.61|0.14680764538430255|
+------+------+----------+----+-------------------+
only showing top 5 rows



- C. Calculate the average tip percentage for each combination of sex and smoker.

In [97]:
from pyspark.sql.functions import concat, sum, avg, min, max, count, mean

In [98]:
tips2.groupby('sex').pivot('smoker').mean('tip_percent').show()

+------+------------------+-------------------+
|   sex|                No|                Yes|
+------+------------------+-------------------+
|Female|0.1569209707691836|0.18215035269941035|
|  Male|0.1606687151291298| 0.1527711752024851|
+------+------------------+-------------------+



## 4. Use the seattle weather dataset referenced in the lesson to answer the questions below.

In [99]:
weather.show(5)

+----------+-------------+--------+--------+----+-------+--------+
|      date|precipitation|temp_max|temp_min|wind|weather|temp_avg|
+----------+-------------+--------+--------+----+-------+--------+
|2012-01-01|          0.0|    12.8|     5.0| 4.7|drizzle|     9.0|
|2012-01-02|         10.9|    10.6|     2.8| 4.5|   rain|     6.5|
|2012-01-03|          0.8|    11.7|     7.2| 2.3|   rain|     9.5|
|2012-01-04|         20.3|    12.2|     5.6| 4.7|   rain|     9.0|
|2012-01-05|          1.3|     8.9|     2.8| 6.1|   rain|     6.0|
+----------+-------------+--------+--------+----+-------+--------+
only showing top 5 rows



- Convert the temperatures to farenheight.

In [100]:
weather.select(((weather.temp_avg * (9/5)) + 32).alias('temp_f')).show(5)

+------+
|temp_f|
+------+
|  48.2|
|  43.7|
|  49.1|
|  48.2|
|  42.8|
+------+
only showing top 5 rows



- Which month has the most rain, on average?

In [101]:
from pyspark.sql.functions import asc, desc

In [102]:
(
    weather.withColumn("month", month("date"))
    .groupBy("month")
    .agg(mean("precipitation").alias("avg_rainfall"))
    .sort(desc("avg_rainfall"))
    .show(1)
)

+-----+-----------------+
|month|     avg_rainfall|
+-----+-----------------+
|   11|5.354166666666667|
+-----+-----------------+
only showing top 1 row



- Which year was the windiest?

In [103]:
(
    weather.withColumn("year", year("date"))
    .groupBy("year")
    .agg(mean("wind").alias("avg_wind"))
    .sort(desc("avg_wind"))
    .show(1)
)

+----+-----------------+
|year|         avg_wind|
+----+-----------------+
|2012|3.400819672131147|
+----+-----------------+
only showing top 1 row



- What is the most frequent type of weather in January?

In [104]:
(
    weather.filter(month("date") == 1).
    groupby('weather').count().sort(desc('count'))
    .show(1)
)

+-------+-----+
|weather|count|
+-------+-----+
|    fog|   38|
+-------+-----+
only showing top 1 row



- What is the average high and low temperature on sunny days in July in 2013 and 2014?

In [111]:
(
    weather.filter(month("date") == 7)
    .filter((year("date") == 2013) | (year("date") == 2014))
    .filter(weather.weather == 'sun')
    .agg(mean("temp_max"), mean("temp_min"))
    .show()
)

+------------------+-----------------+
|     avg(temp_max)|    avg(temp_min)|
+------------------+-----------------+
|26.828846153846158|14.18269230769231|
+------------------+-----------------+



- What percentage of days were rainy in q3 of 2015?

In [125]:
(
    
    weather.filter(quarter('date') == 3).filter(year('date') == 2015)
    .select(when(col("weather") == "rain", 1).otherwise(0).alias("rain"))
    .agg(mean("rain")).show()

)

+--------------------+
|           avg(rain)|
+--------------------+
|0.021739130434782608|
+--------------------+



- For each year, find what percentage of days it rained (had non-zero precipitation).

In [137]:
weather.where(weather.precipitation > 0).groupby(year('date')).count().show()

+----------+-----+
|year(date)|count|
+----------+-----+
|      2015|  144|
|      2013|  152|
|      2014|  150|
|      2012|  177|
+----------+-----+

