# API Review

## Create the spark session

In [1]:
import pyspark

spark = pyspark.sql.SparkSession.builder.getOrCreate()

## Create Dataframes

In [2]:
import pandas as pd
import numpy as np

np.random.seed(123)

pd_df = pd.DataFrame(dict(n=np.arange(20), group=np.random.choice(list("abc"), 20)))

pd_df

   n group
0  0     c
1  1     b
2  2     c
3  3     c
4  4     a
5  5     c
6  6     c
7  7     b
8  8     c
9  9     b


Convert the pandas DataFrame to a Spark DataFrame:

In [3]:
df = spark.createDataFrame(pd_df)

# .show() is needed to display the rows of a Spark DataFrame
df.show(2)

+---+-----+
|  n|group|
+---+-----+
|  0|    c|
|  1|    b|
+---+-----+
only showing top 2 rows



In [4]:
df.describe().show()

+-------+-----------------+-----+
|summary|                n|group|
+-------+-----------------+-----+
|  count|               20|   20|
|   mean|              9.5| null|
| stddev|5.916079783099616| null|
|    min|                0|    a|
|    max|               19|    c|
+-------+-----------------+-----+



In [5]:
from pydataset import data

mpg = spark.createDataFrame(data("mpg"))
mpg.show(2)

+------------+-----+-----+----+---+----------+---+---+---+---+-------+
|manufacturer|model|displ|year|cyl|     trans|drv|cty|hwy| fl|  class|
+------------+-----+-----+----+---+----------+---+---+---+---+-------+
|        audi|   a4|  1.8|1999|  4|  auto(l5)|  f| 18| 29|  p|compact|
|        audi|   a4|  1.8|1999|  4|manual(m5)|  f| 21| 29|  p|compact|
+------------+-----+-----+----+---+----------+---+---+---+---+-------+
only showing top 2 rows



## Create Columns

This returns a column object:

In [6]:

mpg.hwy

Column<'hwy'>

A Column object doesn't hold any data by itself. To see its values, pass it to .select() and call .show() on the resulting DataFrame. .select() also accepts several columns at once.

In [7]:
# select 3 columns and show 2 rows
mpg.select(mpg.hwy, mpg.cty, mpg.model).show(2)

+---+---+-----+
|hwy|cty|model|
+---+---+-----+
| 29| 18|   a4|
| 29| 21|   a4|
+---+---+-----+
only showing top 2 rows



In [8]:
# select hwy along with a derived column that adds 1 to each value, then show both

mpg.select(mpg.hwy, mpg.hwy + 1).show(2)

+---+---------+
|hwy|(hwy + 1)|
+---+---------+
| 29|       30|
| 29|       30|
+---+---------+
only showing top 2 rows



In [9]:
# select & alias hwy column name
mpg.select(mpg.hwy.alias("highway_mileage")).show(2)

+---------------+
|highway_mileage|
+---------------+
|             29|
|             29|
+---------------+
only showing top 2 rows



In [10]:
# create a var col1 to store the column object of hwy, aliased as highway_mileage
col1 = mpg.hwy.alias("highway_mileage")

# create a var col2 to store the column object of hwy divided by 2, aliased as highway_mileage_halved
col2 = (mpg.hwy/2).alias("highway_mileage_halved")

# select both, referencing the new variables, col1 and col2
mpg.select(col1, col2).show(1)

+---------------+----------------------+
|highway_mileage|highway_mileage_halved|
+---------------+----------------------+
|             29|                  14.5|
+---------------+----------------------+
only showing top 1 row



In [11]:
from pyspark.sql.functions import col, expr

In [12]:
col("hwy")

Column<'hwy'>

In [13]:
avg_col = (col("hwy") + col("cty")) / 2

mpg.select(
    col("hwy").alias("highway_mileage"),
    mpg.cty.alias("city_mileage"),
    avg_col.alias("avg_mileage")
).show(2)

+---------------+------------+-----------+
|highway_mileage|city_mileage|avg_mileage|
+---------------+------------+-----------+
|             29|          18|       23.5|
|             29|          21|       25.0|
+---------------+------------+-----------+
only showing top 2 rows



expr() gives us another way to do what we did above: it parses a SQL expression string into a column.

In [14]:
mpg.select(
    expr("hwy"),  # the same as `col`
    expr("hwy + 1"),  # an arithmetic expression
    expr("hwy AS highway_mileage"),  # using an alias
    expr("hwy + 1 AS highway_incremented"),  # a combination of the above
).show(5)


+---+---------+---------------+-------------------+
|hwy|(hwy + 1)|highway_mileage|highway_incremented|
+---+---------+---------------+-------------------+
| 29|       30|             29|                 30|
| 29|       30|             29|                 30|
| 31|       32|             31|                 32|
| 30|       31|             30|                 31|
| 26|       27|             26|                 27|
+---+---------+---------------+-------------------+
only showing top 5 rows



Bringing together all the different ways to accomplish the same task: select a column and alias it.

In [15]:
mpg.select(
    mpg.hwy.alias("highway"),
    col("hwy").alias("highway"),
    expr("hwy").alias("highway"),
    expr("hwy AS highway"),
).show(5)

+-------+-------+-------+-------+
|highway|highway|highway|highway|
+-------+-------+-------+-------+
|     29|     29|     29|     29|
|     29|     29|     29|     29|
|     31|     31|     31|     31|
|     30|     30|     30|     30|
|     26|     26|     26|     26|
+-------+-------+-------+-------+
only showing top 5 rows



## Spark SQL

In [16]:
# register the DataFrame as a temporary view named "mpg" so SQL queries can reference it
# (the DataFrame doesn't need to have come from a SQL source)
mpg.createOrReplaceTempView("mpg")

In [17]:
spark.sql(
    """
    SELECT hwy, cty, (hwy + cty) / 2 as avg
    FROM mpg
    """
).show(2)

+---+---+----+
|hwy|cty| avg|
+---+---+----+
| 29| 18|23.5|
| 29| 21|25.0|
+---+---+----+
only showing top 2 rows



## Type Casting

In [18]:
mpg.dtypes

[('manufacturer', 'string'),
 ('model', 'string'),
 ('displ', 'double'),
 ('year', 'bigint'),
 ('cyl', 'bigint'),
 ('trans', 'string'),
 ('drv', 'string'),
 ('cty', 'bigint'),
 ('hwy', 'bigint'),
 ('fl', 'string'),
 ('class', 'string')]

In [19]:
mpg.printSchema()

root
 |-- manufacturer: string (nullable = true)
 |-- model: string (nullable = true)
 |-- displ: double (nullable = true)
 |-- year: long (nullable = true)
 |-- cyl: long (nullable = true)
 |-- trans: string (nullable = true)
 |-- drv: string (nullable = true)
 |-- cty: long (nullable = true)
 |-- hwy: long (nullable = true)
 |-- fl: string (nullable = true)
 |-- class: string (nullable = true)



In [20]:
mpg.select(mpg.hwy.cast("string")).printSchema()


root
 |-- hwy: string (nullable = true)



In [21]:
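# model values that can't be converted to int become null (this assumes ANSI mode is off,
# the default before Spark 4.0; with spark.sql.ansi.enabled=true the cast raises an error)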
mpg.select(mpg.model, mpg.model.cast("int")).show(2)

+-----+-----+
|model|model|
+-----+-----+
|   a4| null|
|   a4| null|
+-----+-----+
only showing top 2 rows



## Built in Functions

In [22]:
# avg and mean are aliases of each other
# note: importing sum, min, and max this way shadows Python's built-in functions
from pyspark.sql.functions import concat, sum, avg, min, max, count, mean
# from pyspark.sql.functions import *

In [23]:
mpg.select(
    # wrap the whole expression in parentheses before calling .alias();
    # otherwise the alias attaches only to count(mpg.hwy)
    (sum(mpg.hwy) / count(mpg.hwy)).alias("average_1"),
    avg(mpg.hwy).alias("average_2"),
    min(mpg.hwy),
    max(mpg.hwy),
).show()

+-----------------+-----------------+--------+--------+
|        average_1|        average_2|min(hwy)|max(hwy)|
+-----------------+-----------------+--------+--------+
|23.44017094017094|23.44017094017094|      12|      44|
+-----------------+-----------------+--------+--------+



In [24]:
mpg.select(concat(mpg.manufacturer, mpg.model)).show(5)


+---------------------------+
|concat(manufacturer, model)|
+---------------------------+
|                     audia4|
|                     audia4|
|                     audia4|
|                     audia4|
|                     audia4|
+---------------------------+
only showing top 5 rows



lit() wraps a literal value (here, a string) in a column so it can be combined with other columns:

In [25]:
from pyspark.sql.functions import lit


In [26]:
mpg.select(concat(mpg.cyl, lit(" cylinders"))).show(5)

+-----------------------+
|concat(cyl,  cylinders)|
+-----------------------+
|            4 cylinders|
|            4 cylinders|
|            4 cylinders|
|            4 cylinders|
|            6 cylinders|
+-----------------------+
only showing top 5 rows



More String Manipulation

In [27]:
from pyspark.sql.functions import regexp_extract, regexp_replace

In [28]:
textdf = spark.createDataFrame(
    pd.DataFrame(
        {
            "address": [
                "600 Navarro St ste 600, San Antonio, TX 78205",
                "3130 Broadway St, San Antonio, TX 78209",
                "303 Pearl Pkwy, San Antonio, TX 78215",
                "1255 SW Loop 410, San Antonio, TX 78227",
            ]
        }
    )
)

textdf.show(truncate=False)

+---------------------------------------------+
|address                                      |
+---------------------------------------------+
|600 Navarro St ste 600, San Antonio, TX 78205|
|3130 Broadway St, San Antonio, TX 78209      |
|303 Pearl Pkwy, San Antonio, TX 78215        |
|1255 SW Loop 410, San Antonio, TX 78227      |
+---------------------------------------------+



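regexp_extract(column, pattern, idx) returns the text matched by capture group idx of the pattern, or an empty string if there is no match. Here we use it to create new street_no and street columns: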

In [29]:
textdf.select(
    "address",
    regexp_extract("address", r"^(\d+)", 1).alias("street_no"),
    regexp_extract("address", r"^\d+\s([\w\s]+?),", 1).alias("street"),
).show(truncate=False)

+---------------------------------------------+---------+------------------+
|address                                      |street_no|street            |
+---------------------------------------------+---------+------------------+
|600 Navarro St ste 600, San Antonio, TX 78205|600      |Navarro St ste 600|
|3130 Broadway St, San Antonio, TX 78209      |3130     |Broadway St       |
|303 Pearl Pkwy, San Antonio, TX 78215        |303      |Pearl Pkwy        |
|1255 SW Loop 410, San Antonio, TX 78227      |1255     |SW Loop 410       |
+---------------------------------------------+---------+------------------+
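Because these patterns use only syntax that behaves the same in Python's re module and in the Java regex engine Spark relies on, one way to debug them is to prototype them in plain Python first. The sketch below assumes that equivalence holds for these particular patterns; extract is a hypothetical helper that mimics regexp_extract's search-anywhere matching and its empty-string result when nothing matches.

```python
import re

# The same patterns passed to regexp_extract above
STREET_NO = r"^(\d+)"
STREET = r"^\d+\s([\w\s]+?),"


def extract(pattern: str, text: str, idx: int) -> str:
    """Mimic regexp_extract: return group `idx`, or "" when there is no match."""
    match = re.search(pattern, text)
    if match is None or match.group(idx) is None:
        return ""
    return match.group(idx)


addresses = [
    "600 Navarro St ste 600, San Antonio, TX 78205",
    "3130 Broadway St, San Antonio, TX 78209",
    "303 Pearl Pkwy, San Antonio, TX 78215",
    "1255 SW Loop 410, San Antonio, TX 78227",
]

for address in addresses:
    print(extract(STREET_NO, address, 1), "|", extract(STREET, address, 1))
```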



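regexp_replace replaces the parts of a string that match a regular expression. Here it deletes everything up to and including the first comma (plus the whitespace after it):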

In [30]:
textdf.select(
    "address",
    regexp_replace("address", r"^.*?,\s*", "").alias("city_state_zip"),
).show(truncate=False)

+---------------------------------------------+---------------------+
|address                                      |city_state_zip       |
+---------------------------------------------+---------------------+
|600 Navarro St ste 600, San Antonio, TX 78205|San Antonio, TX 78205|
|3130 Broadway St, San Antonio, TX 78209      |San Antonio, TX 78209|
|303 Pearl Pkwy, San Antonio, TX 78215        |San Antonio, TX 78215|
|1255 SW Loop 410, San Antonio, TX 78227      |San Antonio, TX 78227|
+---------------------------------------------+---------------------+
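The lazy quantifier .*? in that pattern matters. The plain-Python sketch below uses re.sub, which, like regexp_replace, replaces every match, to compare it with a greedy .* that runs on to the last comma instead of the first. The variable names are only for illustration.

```python
import re

address = "303 Pearl Pkwy, San Antonio, TX 78215"

# Lazy: .*? stops at the first comma, as in the regexp_replace call above
lazy = re.sub(r"^.*?,\s*", "", address)

# Greedy: .* consumes as much as possible, so it stops at the last comma
greedy = re.sub(r"^.*,\s*", "", address)

print(lazy)    # San Antonio, TX 78215
print(greedy)  # TX 78215
```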




## Filtering with .filter and .where

In [31]:
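# .where() is an alias for .filter(); `class` is a Python keyword, so mpg.class
# is a syntax error and the column has to be accessed as mpg["class"]
mpg.filter(mpg.cyl == 4).where(mpg["class"] == "subcompact").show()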

+------------+-----------+-----+----+---+----------+---+---+---+---+----------+
|manufacturer|      model|displ|year|cyl|     trans|drv|cty|hwy| fl|     class|
+------------+-----------+-----+----+---+----------+---+---+---+---+----------+
|       honda|      civic|  1.6|1999|  4|manual(m5)|  f| 28| 33|  r|subcompact|
|       honda|      civic|  1.6|1999|  4|  auto(l4)|  f| 24| 32|  r|subcompact|
|       honda|      civic|  1.6|1999|  4|manual(m5)|  f| 25| 32|  r|subcompact|
|       honda|      civic|  1.6|1999|  4|manual(m5)|  f| 23| 29|  p|subcompact|
|       honda|      civic|  1.6|1999|  4|  auto(l4)|  f| 24| 32|  r|subcompact|
|       honda|      civic|  1.8|2008|  4|manual(m5)|  f| 26| 34|  r|subcompact|
|       honda|      civic|  1.8|2008|  4|  auto(l5)|  f| 25| 36|  r|subcompact|
|       honda|      civic|  1.8|2008|  4|  auto(l5)|  f| 24| 36|  c|subcompact|
|       honda|      civic|  2.0|2008|  4|manual(m6)|  f| 21| 29|  p|subcompact|
|     hyundai|    tiburon|  2.0|1999|  4|...

## Conditionals with When and Otherwise

In [32]:
from pyspark.sql.functions import when

In [33]:
# conditions are checked in order, and the first one that is true wins
mpg.select(
    mpg.displ,
    (
        when(mpg.displ < 2, "small")
        .when(mpg.displ < 3, "medium")
        .otherwise("large")
        .alias("engine_size")
    ),
).show(10)

+-----+-----------+
|displ|engine_size|
+-----+-----------+
|  1.8|      small|
|  1.8|      small|
|  2.0|     medium|
|  2.0|     medium|
|  2.8|     medium|
|  2.8|     medium|
|  3.1|      large|
|  1.8|      small|
|  1.8|      small|
|  2.0|     medium|
+-----+-----------+
only showing top 10 rows
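The chained when() calls behave like an if/elif/else ladder: each row takes the value of the first condition that is true, and .otherwise() supplies the fallback. The plain-Python function below is only an analogy for that evaluation order (engine_size is a hypothetical name). One difference worth knowing: if .otherwise() is omitted, Spark returns null for rows that match no condition.

```python
def engine_size(displ: float) -> str:
    # Checked top to bottom; the first true condition wins, so the
    # displ < 3 branch only sees values that already failed displ < 2.
    if displ < 2:
        return "small"
    elif displ < 3:
        return "medium"
    else:  # the equivalent of .otherwise("large")
        return "large"


# displ values taken from the output above
for displ in [1.8, 2.0, 2.8, 3.1]:
    print(displ, engine_size(displ))
```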



## Sorting & Ordering 

In [34]:
# defaults to ascending
mpg.sort(mpg.hwy).show(8)

+------------+-------------------+-----+----+---+----------+---+---+---+---+------+
|manufacturer|              model|displ|year|cyl|     trans|drv|cty|hwy| fl| class|
+------------+-------------------+-----+----+---+----------+---+---+---+---+------+
|       dodge|ram 1500 pickup 4wd|  4.7|2008|  8|manual(m6)|  4|  9| 12|  e|pickup|
|        jeep| grand cherokee 4wd|  4.7|2008|  8|  auto(l5)|  4|  9| 12|  e|   suv|
|       dodge|        durango 4wd|  4.7|2008|  8|  auto(l5)|  4|  9| 12|  e|   suv|
|       dodge|  dakota pickup 4wd|  4.7|2008|  8|  auto(l5)|  4|  9| 12|  e|pickup|
|       dodge|ram 1500 pickup 4wd|  4.7|2008|  8|  auto(l5)|  4|  9| 12|  e|pickup|
|   chevrolet|    k1500 tahoe 4wd|  5.3|2008|  8|  auto(l4)|  4| 11| 14|  e|   suv|
|        jeep| grand cherokee 4wd|  6.1|2008|  8|  auto(l5)|  4| 11| 14|  p|   suv|
|       dodge|  dakota pickup 4wd|  5.2|1999|  8|  auto(l4)|  4| 11| 15|  r|pickup|
+------------+-------------------+-----+----+---+----------+---+---+---+---+------+
only showing top 8 rows

In [35]:
from pyspark.sql.functions import asc, desc

In [36]:
mpg.sort(mpg.hwy.desc())
# is the same as
mpg.sort(col("hwy").desc())
# is the same as
mpg.sort(desc("hwy")).show(5)

+------------+----------+-----+----+---+----------+---+---+---+---+----------+
|manufacturer|     model|displ|year|cyl|     trans|drv|cty|hwy| fl|     class|
+------------+----------+-----+----+---+----------+---+---+---+---+----------+
|  volkswagen|     jetta|  1.9|1999|  4|manual(m5)|  f| 33| 44|  d|   compact|
|  volkswagen|new beetle|  1.9|1999|  4|manual(m5)|  f| 35| 44|  d|subcompact|
|  volkswagen|new beetle|  1.9|1999|  4|  auto(l4)|  f| 29| 41|  d|subcompact|
|      toyota|   corolla|  1.8|2008|  4|manual(m5)|  f| 28| 37|  r|   compact|
|       honda|     civic|  1.8|2008|  4|  auto(l5)|  f| 24| 36|  c|subcompact|
+------------+----------+-----+----+---+----------+---+---+---+---+----------+
only showing top 5 rows



In [None]:
# sort by class descending, then cyl ascending, then hwy descending
mpg.sort(desc("class"), mpg.cyl.asc(), col("hwy").desc()).show()
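Mixing ascending and descending keys can be understood through a plain-Python analogy. Python's sort is stable, so sorting by the least significant key first and the most significant key last gives the same ordering as a multi-key sort like the one above. The rows below are a small sample copied from earlier outputs in this notebook. Spark itself makes no promise about the relative order of rows that tie on every sort key.

```python
# (class, cyl, hwy, model) tuples copied from rows shown earlier
cars = [
    ("compact", 4, 44, "jetta"),
    ("subcompact", 4, 44, "new beetle"),
    ("compact", 4, 37, "corolla"),
    ("subcompact", 4, 36, "civic"),
    ("pickup", 8, 12, "ram 1500 pickup 4wd"),
    ("suv", 8, 14, "k1500 tahoe 4wd"),
]

# Stable sorts applied from the least to the most significant key;
# reverse=True still keeps equal keys in their existing order.
ordered = sorted(cars, key=lambda c: c[2], reverse=True)     # hwy desc
ordered = sorted(ordered, key=lambda c: c[1])                # cyl asc
ordered = sorted(ordered, key=lambda c: c[0], reverse=True)  # class desc

for car in ordered:
    print(car)
```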

## Grouping & Aggregating

In [None]:
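# three equivalent ways to group by cyl; each returns a GroupedData object,
# which needs an aggregation (e.g. .agg() or .count()) before it can be shown
mpg.groupBy(mpg.cyl)
mpg.groupBy(col("cyl"))
mpg.groupBy("cyl")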

In [None]:
mpg.groupBy(mpg.cyl).agg(avg(mpg.cty), avg(mpg.hwy)).show()

In [None]:
mpg.groupBy("cyl", "class").agg(avg(mpg.cty), avg(mpg.hwy)).show()

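Rollup performs the same aggregations, but also adds a subtotal row for each leading subset of the grouping columns plus a grand-total row. The rolled-up columns are null in those extra rows.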

In [None]:
mpg.rollup("cyl").count().sort("cyl").show()

Here the null value in cyl indicates the total count.

In [None]:
mpg.rollup("cyl").agg(expr("avg(hwy)")).sort("cyl").show()

Here the row with a null cyl holds the average highway mileage across all rows.

In [None]:
mpg.rollup("cyl", "class").mean("hwy").sort(col("cyl"), col("class")).show()
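To make the rollup output easier to read, here is a plain-Python sketch of what ROLLUP computes for a mean aggregate: one group for every prefix of the grouping keys, with None standing in for the rolled-up keys (Spark shows these as null). rollup_mean is a hypothetical helper, and the four rows are copied from earlier outputs. Because a genuine null in a key column looks the same as a rolled-up one, pyspark.sql.functions also provides grouping() and grouping_id() to tell them apart.

```python
from collections import defaultdict


def rollup_mean(rows, keys, value):
    """Mean of `value` for every rollup level of `keys`.

    For keys ("cyl", "class") the levels are (cyl, class), (cyl, None),
    and (None, None), the last one being the grand total.
    """
    sums = defaultdict(float)
    counts = defaultdict(int)
    for row in rows:
        # level = how many leading keys keep their real value in this group
        for level in range(len(keys), -1, -1):
            group = tuple(row[k] if i < level else None for i, k in enumerate(keys))
            sums[group] += row[value]
            counts[group] += 1
    return {group: sums[group] / counts[group] for group in sums}


cars = [
    {"cyl": 4, "class": "compact", "hwy": 44},
    {"cyl": 4, "class": "compact", "hwy": 37},
    {"cyl": 4, "class": "subcompact", "hwy": 36},
    {"cyl": 8, "class": "suv", "hwy": 14},
]

for group, mean_hwy in rollup_mean(cars, ["cyl", "class"], "hwy").items():
    print(group, mean_hwy)
```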


## Crosstables & Pivot Tables

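crosstab is a simple way to get counts: it returns one row per value of the first column and one column per value of the second, with the number of rows for each pair in the cells.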

In [None]:
mpg.crosstab("class", "cyl").show()

pivot lets us compute aggregations other than counts; here, the mean highway mileage for each combination of class and cyl.

In [None]:
mpg.groupby("class").pivot("cyl").mean("hwy").show()
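The two calls differ in how they fill empty cells. As documented for DataFrame.crosstab, pairs that never occur get a count of 0 (and the first column of the result is named class_cyl). With pivot and an aggregate such as mean, a combination with no rows comes back as null. The plain-Python sketch below reproduces that difference using hypothetical helpers crosstab_counts and pivot_mean and a few rows copied from earlier outputs.

```python
from collections import defaultdict

cars = [
    {"class": "compact", "cyl": 4, "hwy": 44},
    {"class": "compact", "cyl": 4, "hwy": 37},
    {"class": "subcompact", "cyl": 4, "hwy": 36},
    {"class": "suv", "cyl": 8, "hwy": 14},
]


def crosstab_counts(rows, row_col, col_col):
    """Count rows per (row_col, col_col) pair; pairs that never occur get 0."""
    counts = defaultdict(int)
    for r in rows:
        counts[(r[row_col], r[col_col])] += 1
    row_keys = sorted({r[row_col] for r in rows})
    col_keys = sorted({r[col_col] for r in rows})
    return {rk: {ck: counts[(rk, ck)] for ck in col_keys} for rk in row_keys}


def pivot_mean(rows, row_col, col_col, value):
    """Mean of `value` per pair; pairs with no rows get None (null in Spark)."""
    sums, counts = defaultdict(float), defaultdict(int)
    for r in rows:
        key = (r[row_col], r[col_col])
        sums[key] += r[value]
        counts[key] += 1
    row_keys = sorted({r[row_col] for r in rows})
    col_keys = sorted({r[col_col] for r in rows})
    return {
        rk: {
            ck: sums[(rk, ck)] / counts[(rk, ck)] if counts[(rk, ck)] else None
            for ck in col_keys
        }
        for rk in row_keys
    }


print(crosstab_counts(cars, "class", "cyl"))
print(pivot_mean(cars, "class", "cyl", "hwy"))
```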


## Missing Values

In [None]:
df = spark.createDataFrame(
    pd.DataFrame(
        {"x": [1, 2, np.nan, 4, 5, np.nan], "y": [np.nan, 0, 0, 3, 1, np.nan]}
    )
)
df.show()

In [None]:
df.na.drop().show()

In [None]:
df.na.fill(0).show()

In [None]:
df.na.fill(0, subset="x").show()

In [None]:
df.na.drop(subset="y").show()
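As a rough model of the four calls above, the plain-Python sketch below treats both None and NaN as missing, which matches how Spark's DataFrameNaFunctions documentation describes drop and fill (they act on null or NaN values). na_drop and na_fill are hypothetical helpers, na_drop implements only the default how="any" rule, and the data is the same x/y table built above.

```python
import math

nan = float("nan")
rows = [
    {"x": x, "y": y}
    for x, y in zip([1, 2, nan, 4, 5, nan], [nan, 0, 0, 3, 1, nan])
]


def is_missing(v):
    # both null (None) and NaN count as missing
    return v is None or (isinstance(v, float) and math.isnan(v))


def na_drop(rows, subset=None):
    # how="any": drop a row if any of the checked columns is missing
    cols = subset or list(rows[0])
    return [r for r in rows if not any(is_missing(r[c]) for c in cols)]


def na_fill(rows, value, subset=None):
    # replace missing values in the chosen columns; the input rows are not modified
    cols = subset or list(rows[0])
    return [
        {k: (value if k in cols and is_missing(v) else v) for k, v in r.items()}
        for r in rows
    ]


print(len(na_drop(rows)))                # rows with neither x nor y missing
print(len(na_drop(rows, subset=["y"])))  # rows where y is present
print(na_fill(rows, 0, subset=["x"]))
```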

## Transformations of Dataframes

In [None]:
# print the physical plan Spark will use to compute mpg
mpg.explain()


The plan above has only a single step.

The plan for the next cell adds a second step after "Scan ExistingRDD": a "Project" that lists the columns we selected.

In [None]:
mpg.select(mpg.cyl, mpg.hwy).explain()


Next we do a more advanced calculation inside select, but it still adds only a single Project step.

In [None]:
mpg.select(((mpg.cyl + mpg.hwy) / 2).alias("avg_mpg")).explain()


Likewise, the filter below adds just one step, a Filter, on top of the scan.



In [None]:
mpg.filter(mpg.cyl == 6).explain()

In [None]:
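# the same query with select and filter in opposite orders; because the optimizer
# pushes the filter down, the two plans are expected to match
mpg.select("cyl", "hwy").filter(expr("cyl = 6")).explain()
mpg.filter(expr("cyl = 6")).select("cyl", "hwy").explain()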


## More DF Manipulations

For these examples, we'll work with a dataset of daily weather observations in Seattle.



In [None]:
from vega_datasets import data

weather = data.seattle_weather().assign(date=lambda df: df.date.astype(str))
weather = spark.createDataFrame(weather)
weather.show(6)

In [None]:
# print number of rows & columns
print(weather.count(), "rows", len(weather.columns), "columns")


In [None]:
# get the date range of the dataset. 
min_date, max_date = weather.select(min("date"), max("date")).first()
min_date, max_date

In [None]:
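# average the daily min and max temperatures, rounded to 1 decimal place
# (round after dividing; ROUND(temp_min + temp_max) / 2 would round the sum instead)
weather = weather.withColumn(
    "temp_avg", expr("ROUND((temp_min + temp_max) / 2, 1)")
).drop("temp_max", "temp_min")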

weather.show(6)

Calculate the total rainfall for each month (each month's sum combines all years in the dataset):

In [None]:
from pyspark.sql.functions import month, year, quarter


In [None]:
(
    weather.withColumn("month", month("date"))
    .groupBy("month")
    .agg(sum("precipitation").alias("total_rainfall"))
    .sort("month")
    .show()
)

Let's now take a look at the average temperature for each type of weather in December 2013:

In [None]:
(
    weather.filter(month("date") == 12)
    .filter(year("date") == 2013)
    .groupBy("weather")
    .agg(mean("temp_avg"))
    .show()
)

Let's now find out how many days had freezing temperatures in each month of 2013.

In [None]:
(
    weather.filter(year("date") == 2013)
    .withColumn("freezing_temps", (weather.temp_avg <= 0).cast("int"))
    .withColumn("month", month("date"))
    .groupBy("month")
    .agg(sum("freezing_temps").alias("no_of_days_with_freezing_temps"))
    .sort("month")
    .show()
)

One last example: let's calculate the average temperature for each quarter of each year:



In [None]:
(
    weather.withColumn("quarter", quarter("date"))
    .withColumn("year", year("date"))
    .groupBy("year", "quarter")
    .agg(mean("temp_avg").alias("temp_avg"))
    .sort("year", "quarter")
    .show()
)



We could present the same results as a pivot table instead, with one column per year:

In [None]:
(
    weather.withColumn("quarter", quarter("date"))
    .withColumn("year", year("date"))
    .groupBy("quarter")
    .pivot("year")
    .agg(expr("ROUND(MEAN(temp_avg), 2) AS temp_avg"))
    .sort("quarter")
    .show()
)



## Joins

We'll start by creating some data that we can join together:



In [None]:
users = spark.createDataFrame(
    pd.DataFrame(
        {
            "id": [1, 2, 3, 4, 5, 6],
            "name": ["bob", "joe", "sally", "adam", "jane", "mike"],
            "role_id": [1, 2, 3, 3, np.nan, np.nan],
        }
    )
)
roles = spark.createDataFrame(
    pd.DataFrame(
        {
            "id": [1, 2, 3, 4],
            "name": ["admin", "author", "reviewer", "commenter"],
        }
    )
)
print("--- users ---")
users.show()
print("--- roles ---")
roles.show()

To join two dataframes together, we'll need to call the .join method on one of them and supply the other as an argument. In addition, we'll need to supply the condition on which we are joining. In our case, we are joining where the role_id column on the users table is equal to the id column on the roles table.

In [None]:
users.join(roles, on=users.role_id == roles.id).show()

By default, Spark performs an inner join, which keeps only the records that have a match in the other dataframe. We can also specify a left or a right join, which keeps all of the records from the left or right side, respectively, even if they have no match in the other dataframe.

In [None]:
users.join(roles, on=users.role_id == roles.id, how="left").show()

In [None]:
users.join(roles, on=users.role_id == roles.id, how="right").show()
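The difference between the three join types can be sketched in plain Python with a nested-loop join over the same users and roles data, with None standing in for the two missing role_id values. The join helper is hypothetical and ignores performance entirely; Spark chooses its own join strategy and does not guarantee row order.

```python
users = [
    (1, "bob", 1), (2, "joe", 2), (3, "sally", 3),
    (4, "adam", 3), (5, "jane", None), (6, "mike", None),
]  # (id, name, role_id)
roles = [(1, "admin"), (2, "author"), (3, "reviewer"), (4, "commenter")]  # (id, name)


def join(left, right, left_key, right_key, how="inner"):
    """Equi-join on left[left_key] == right[right_key]; unmatched sides become None."""
    out, matched_right = [], set()
    for l in left:
        hit = False
        for j, r in enumerate(right):
            # a missing key is never equal to anything, so it never matches
            if l[left_key] is not None and l[left_key] == r[right_key]:
                out.append((l, r))
                matched_right.add(j)
                hit = True
        if how == "left" and not hit:
            out.append((l, None))
    if how == "right":
        out.extend((None, r) for j, r in enumerate(right) if j not in matched_right)
    return out


for how in ("inner", "left", "right"):
    print(how, len(join(users, roles, 2, 0, how)))
```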


Notice that the examples above have duplicate id and name columns, because both dataframes contain columns with those names. There are several ways to deal with this:

- alias each dataframe and explicitly select columns after joining (this could also be implemented with Spark SQL)
- rename the duplicated columns before joining
- drop the duplicated columns after joining (for example, .drop(roles.id))



# Wrangling

In this lesson, we will acquire and prepare the data we will use in the rest of this module.

- Acquiring Data
- Data Prep
- Train Test Split

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

spark = SparkSession.builder.getOrCreate()

## Acquisition

Spark lets us read data in from a variety of data sources using what it calls a DataFrameReader. We can access the read property of our spark object and then set various options and read from a data source.

### Using Data Schemas

In [None]:
df = spark.read.csv("data/source.csv", sep=",", header=True, inferSchema=True)


### Writing Data

## Data Prep

### Column Renaming

### Data Types

### Data Transformations

### New Features

### Joining New Dataset

## Data Splitting