<h1>Table of Contents<span class="tocSkip"></span></h1>
<div class="toc"><ul class="toc-item"><li><span><a href="#Joining-Data" data-toc-modified-id="Joining-Data-1">Joining Data</a></span><ul class="toc-item"><li><span><a href="#Understanding-the-join-recipe" data-toc-modified-id="Understanding-the-join-recipe-1.1">Understanding the <code>join</code> recipe</a></span><ul class="toc-item"><li><span><a href="#Important-points" data-toc-modified-id="Important-points-1.1.1">Important points</a></span></li><li><span><a href="#Pyspark-helpers-in-join-logic" data-toc-modified-id="Pyspark-helpers-in-join-logic-1.1.2">Pyspark helpers in join logic</a></span></li><li><span><a href="#Setting-up-join-logic-with-how" data-toc-modified-id="Setting-up-join-logic-with-how-1.1.3">Setting up join logic with <code>how</code></a></span></li></ul></li><li><span><a href="#Warning:-What-happens-when-joining-columns-in-a-distributed-environment" data-toc-modified-id="Warning:-What-happens-when-joining-columns-in-a-distributed-environment-1.2">Warning: What happens when joining columns in a distributed environment</a></span></li><li><span><a href="#Warning:-Joining-tables-with-identically-named-columns-leads-to-errors-downstream" data-toc-modified-id="Warning:-Joining-tables-with-identically-named-columns-leads-to-errors-downstream-1.3">Warning: Joining tables with identically named columns leads to errors downstream</a></span></li><li><span><a href="#Solutions-for-preventing-ambiguous-column-references" data-toc-modified-id="Solutions-for-preventing-ambiguous-column-references-1.4">Solutions for preventing ambiguous column references</a></span></li></ul></li><li><span><a href="#Advanced-groupby-with-GroupedData" data-toc-modified-id="Advanced-groupby-with-GroupedData-2">Advanced <code>groupby</code> with <code>GroupedData</code></a></span><ul class="toc-item"><li><span><a href="#groupby-on-multiple-columns" data-toc-modified-id="groupby-on-multiple-columns-2.1"><code>groupby</code> on multiple columns</a></span></li><li><span><a 
href="#agg()-vs-sum()" data-toc-modified-id="agg()-vs-sum()-2.2"><code>agg()</code> vs <code>sum()</code></a></span></li><li><span><a href="#Using-agg-with-custom-column-definitions" data-toc-modified-id="Using-agg-with-custom-column-definitions-2.3">Using agg with custom column definitions</a></span></li></ul></li><li><span><a href="#Dropping-unwanted-records---dropna-+-fillna" data-toc-modified-id="Dropping-unwanted-records---dropna-+-fillna-3">Dropping unwanted records - <code>dropna</code> + <code>fillna</code></a></span><ul class="toc-item"><li><span><a href="#dropna" data-toc-modified-id="dropna-3.1"><code>dropna</code></a></span><ul class="toc-item"><li><span><a href="#params" data-toc-modified-id="params-3.1.1">params</a></span></li></ul></li><li><span><a href="#fillna" data-toc-modified-id="fillna-3.2"><code>fillna</code></a></span><ul class="toc-item"><li><span><a href="#params" data-toc-modified-id="params-3.2.1">params</a></span></li></ul></li></ul></li><li><span><a href="#Pulling-it-all-together" data-toc-modified-id="Pulling-it-all-together-4">Pulling it all together</a></span></li></ul></div>

## Joining Data

In [59]:
# Set up
import os
import numpy as np
from pyspark.sql import SparkSession
from pyspark.sql.utils import AnalysisException
import pyspark.sql.functions as F


spark = SparkSession.builder.getOrCreate()

# Read the data
DIRECTORY = "./data/Ch04"
logs = spark.read.csv(
    "./output/ch04/logs.csv", # read in data transformed in Ch04
    sep=",",  # default is ","
    quote='"',  # default is double quote.
    header=True,  # set first row as column names
    inferSchema=True,  # infer column types from the data; default is False
)
logs.printSchema()


# Read link table and filter to only primary channels (i.e. PrimaryFG == 1)
log_identifier = spark.read.csv(
    os.path.join(DIRECTORY, "ReferenceTables", "LogIdentifier.csv"),
    sep="|",
    header=True,
    inferSchema=True,
)
log_identifier = log_identifier.where(F.col("PrimaryFG") == 1)


# Show results
log_identifier.printSchema()
log_identifier.show(5)
print("Unique primary channels: ", log_identifier.count())

root
 |-- BroadcastLogID: integer (nullable = true)
 |-- LogServiceID: integer (nullable = true)
 |-- LogDate: string (nullable = true)
 |-- SequenceNO: integer (nullable = true)
 |-- AudienceTargetAgeID: integer (nullable = true)
 |-- AudienceTargetEthnicID: integer (nullable = true)
 |-- CategoryID: integer (nullable = true)
 |-- ClosedCaptionID: integer (nullable = true)
 |-- CountryOfOriginID: integer (nullable = true)
 |-- DubDramaCreditID: integer (nullable = true)
 |-- EthnicProgramID: integer (nullable = true)
 |-- ProductionSourceID: integer (nullable = true)
 |-- ProgramClassID: integer (nullable = true)
 |-- FilmClassificationID: integer (nullable = true)
 |-- ExhibitionID: integer (nullable = true)
 |-- Duration: string (nullable = true)
 |-- EndTime: string (nullable = true)
 |-- LogEntryDate: string (nullable = true)
 |-- ProductionNO: string (nullable = true)
 |-- ProgramTitle: string (nullable = true)
 |-- StartTime: string (nullable = true)
 |-- Subtitle: string (nulla

### Understanding the `join` recipe

```py
[LEFT].join(
    [RIGHT],
    on=[PREDICATES],
    how=[METHOD]
)
```

#### Important points

1. If one record in the left table resolves the predicate with more than one record in the right table (or vice versa), __this record will be duplicated in the joined table__.
2. If one record in the left or in the right table does not resolve the predicate with any record in the other table, __it will not be present in the resulting table, unless the join method specifies a protocol for failed predicates__.

#### Pyspark helpers in join logic

- You can pass multiple predicates as a list; PySpark combines them with a logical `and`:
    ```py
    [
        left["col1"] == right["colA"], 
        left["col2"] > right["colB"],  # value on left table is greater than the right
        left["col3"] != right["colC"]
    ]
    ```
- You can test equality just by specifying the column name, or a list of column names, when the columns have the same name in both tables.

#### Setting up join logic with `how`

1. `cross` - returns a record for every possible pair of records. Not common.
2. `inner` - returns a record if the predicate is true, otherwise drops it. Most common; the PySpark `join` default.
3. `left` & `right` - similar to `inner`, except for what happens to records with no match:
    - `left` join adds unmatched records from the left table to the joined table and fills the columns from the right table with `None`.
    - `right` join adds unmatched records from the right table and fills the columns from the left table with `None`.
4. `outer` - adds unmatched records from both the left and right tables, padding with `None`.
5. `left_semi` - same as an inner join, but only keeps the columns from the left table and does not duplicate left records that match more than once.
6. `left_anti` - returns only the left-table records that don't match the predicate with any record in the right table. Opposite of `left_semi`.

In [60]:
# Join `logs` with `log_identifier` using the 'LogServiceID' column
joined = logs.join(log_identifier, on="LogServiceID", how="inner")

In [61]:
# Additionally join CategoryID and ProgramClassID table
# Use left joins since keys may not be available in the link table.

# CategoryID
cd_category = spark.read.csv(
    os.path.join(DIRECTORY, "ReferenceTables", "CD_Category.csv"),
    sep="|",
    header=True,
    inferSchema=True,
).select(
    "CategoryID",
    "CategoryCD",
    F.col("EnglishDescription").alias("Category_Description"),
)

# ProgramClass
cd_program_class = spark.read.csv(
    os.path.join(DIRECTORY, "ReferenceTables", "CD_ProgramClass.csv"),
    sep="|",
    header=True,
    inferSchema=True,
).select(
    "ProgramClassID",
    "ProgramClassCD",
    F.col("EnglishDescription").alias("ProgramClass_Description"),
)


# Join all to joined table
full_log = joined.join(cd_category, "CategoryID", how="left",).join(
    cd_program_class, "ProgramClassID", how="left",
)


# Check if additional columns were joined to original log data frame
full_log.printSchema()

root
 |-- ProgramClassID: integer (nullable = true)
 |-- CategoryID: integer (nullable = true)
 |-- LogServiceID: integer (nullable = true)
 |-- BroadcastLogID: integer (nullable = true)
 |-- LogDate: string (nullable = true)
 |-- SequenceNO: integer (nullable = true)
 |-- AudienceTargetAgeID: integer (nullable = true)
 |-- AudienceTargetEthnicID: integer (nullable = true)
 |-- ClosedCaptionID: integer (nullable = true)
 |-- CountryOfOriginID: integer (nullable = true)
 |-- DubDramaCreditID: integer (nullable = true)
 |-- EthnicProgramID: integer (nullable = true)
 |-- ProductionSourceID: integer (nullable = true)
 |-- FilmClassificationID: integer (nullable = true)
 |-- ExhibitionID: integer (nullable = true)
 |-- Duration: string (nullable = true)
 |-- EndTime: string (nullable = true)
 |-- LogEntryDate: string (nullable = true)
 |-- ProductionNO: string (nullable = true)
 |-- ProgramTitle: string (nullable = true)
 |-- StartTime: string (nullable = true)
 |-- Subtitle: string (nulla

### Warning: What happens when joining columns in a distributed environment

>  To be able to process a comparison between records, the data needs to be on the same machine. If not, PySpark will move the data in an operation called a _shuffle_, which is slow and expensive.  More on join strategies in later chapters.

### Warning: Joining tables with identically named columns leads to errors downstream

PySpark happily joins the two data frames together but fails when we try to work with the ambiguous column.

In [62]:
# Joining two tables with the same LogServiceID column
logs_and_channels_verbose = logs.join(
    log_identifier, logs["LogServiceID"] == log_identifier["LogServiceID"]
)
logs_and_channels_verbose.printSchema()


print(
    'Joined table now has two "LogServiceID" columns: ',
    [col for col in logs_and_channels_verbose.columns if col == "LogServiceID"],
    "\n",
)
print('Selecting "LogServiceID" will now throw an error')


# Selecting "LogServiceID" will throw an error
try:
    logs_and_channels_verbose.select("LogServiceID")
except AnalysisException as err:
    print("AnalysisException: ", err)

root
 |-- BroadcastLogID: integer (nullable = true)
 |-- LogServiceID: integer (nullable = true)
 |-- LogDate: string (nullable = true)
 |-- SequenceNO: integer (nullable = true)
 |-- AudienceTargetAgeID: integer (nullable = true)
 |-- AudienceTargetEthnicID: integer (nullable = true)
 |-- CategoryID: integer (nullable = true)
 |-- ClosedCaptionID: integer (nullable = true)
 |-- CountryOfOriginID: integer (nullable = true)
 |-- DubDramaCreditID: integer (nullable = true)
 |-- EthnicProgramID: integer (nullable = true)
 |-- ProductionSourceID: integer (nullable = true)
 |-- ProgramClassID: integer (nullable = true)
 |-- FilmClassificationID: integer (nullable = true)
 |-- ExhibitionID: integer (nullable = true)
 |-- Duration: string (nullable = true)
 |-- EndTime: string (nullable = true)
 |-- LogEntryDate: string (nullable = true)
 |-- ProductionNO: string (nullable = true)
 |-- ProgramTitle: string (nullable = true)
 |-- StartTime: string (nullable = true)
 |-- Subtitle: string (nulla

### Solutions for preventing ambiguous column references

1. Use the simplified syntax (i.e. pass the name of the join column as a string). PySpark automatically removes the second instance of the predicate column. This only works for equi-joins.
    ```py
    logs_and_channels = logs.join(log_identifier, "LogServiceID")
    ```
2. Refer to the column through the original table it came from.
    ```py
    logs_and_channels_verbose.select(log_identifier["LogServiceID"])
    ```
3. Alias the tables and use the `Column` object directly.
    ```py
    logs_and_channels_verbose = logs.alias("left").join(
        log_identifier.alias("right"),
        logs["LogServiceID"] == log_identifier["LogServiceID"],
    )

    logs_and_channels_verbose.drop(F.col("right.LogServiceID")).select(
        "LogServiceID"
    )
    ```

## Advanced `groupby` with `GroupedData`

Goal: __Which channels have the highest and lowest proportion of commercials?__

Task:
1. Get the number of seconds when the program is a commercial.
2. Get the total number of seconds.

### `groupby` on multiple columns

- The result of `groupby` is a `GroupedData` object, not a data frame, so you can't call `show()` on it.
- To see results, apply an aggregation to it, such as `sum()` or `agg(F.sum(...))`, which returns a data frame again.
- A `GroupedData` object holds all non-key columns in a group cell (see fig 5.7).

![grouped](./notes/img/grouped.png)


### `agg()` vs `sum()`

- `agg` can take an arbitrary number of aggregate functions
- You can alias resulting columns, unlike `sum`

In [65]:
# Group by ProgramClassCD and ProgramClass_Description, sum total duration for each

full_log.groupby("ProgramClassCD", "ProgramClass_Description").agg(
    F.sum("duration_seconds").alias("duration_total")
).orderBy("duration_total", ascending=False).show(100, False)


# Another way by passing dictionary to agg
# full_log.groupby("ProgramClassCD", "ProgramClass_Description").agg(
#     {"duration_seconds": "sum"}
# ).withColumnRenamed("sum(duration_seconds)", "duration_total").orderBy(
#     "duration_total", ascending=False
# ).show(
#     100, False
# )

+--------------+--------------------------------------+--------------+
|ProgramClassCD|ProgramClass_Description              |duration_total|
+--------------+--------------------------------------+--------------+
|PGR           |PROGRAM                               |20992510      |
|COM           |COMMERCIAL MESSAGE                    |3519163       |
|PFS           |PROGRAM FIRST SEGMENT                 |1344762       |
|SEG           |SEGMENT OF A PROGRAM                  |1205998       |
|PRC           |PROMOTION OF UPCOMING CANADIAN PROGRAM|880600        |
|PGI           |PROGRAM INFOMERCIAL                   |679182        |
|PRO           |PROMOTION OF NON-CANADIAN PROGRAM     |335701        |
|OFF           |SCHEDULED OFF AIR TIME PERIOD         |142279        |
|ID            |NETWORK IDENTIFICATION MESSAGE        |74926         |
|NRN           |No recognized nationality             |59686         |
|MAG           |MAGAZINE PROGRAM                      |57622         |
|PSA  

### Using agg with custom column definitions

`when` logic:

```py
(
F.when([BOOLEAN TEST], [RESULT IF TRUE])
 .when([ANOTHER BOOLEAN TEST], [RESULT IF TRUE])
 .otherwise([DEFAULT RESULT, WILL DEFAULT TO null IF OMITTED])
)
```


In [78]:
# Goal: Compute only the commercial time for each program


# Create custom column logic - get duration_seconds if ProgramClassCD matches an item in
# the list
is_commercial = F.when(
    F.trim(F.col("ProgramClassCD")).isin(
        ["COM", "PRC", "PGI", "PRO", "LOC", "SPO", "MER", "SOL"]
    ),
    F.col("duration_seconds"),
).otherwise(0)


# Use custom column logic to build a duration_commercial column,
# along with duration_total
commercial_time = (
    full_log.groupby("LogIdentifierID")
    .agg(
        F.sum(is_commercial).alias("duration_commercial"),
        F.sum("duration_seconds").alias("duration_total"),
    )
    .withColumn(
        "commercial_ratio", F.col("duration_commercial") / F.col("duration_total")
    )
)

commercial_time.orderBy("commercial_ratio", ascending=False).show(20, False)

+---------------+-------------------+--------------+------------------+
|LogIdentifierID|duration_commercial|duration_total|commercial_ratio  |
+---------------+-------------------+--------------+------------------+
|CIMT           |775                |775           |1.0               |
|TELENO         |17790              |17790         |1.0               |
|MSET           |2700               |2700          |1.0               |
|HPITV          |13                 |13            |1.0               |
|TLNSP          |15480              |15480         |1.0               |
|TANG           |8125               |8125          |1.0               |
|MMAX           |23333              |23582         |0.9894410991434145|
|MPLU           |20587              |20912         |0.9844586840091814|
|INVST          |20094              |20470         |0.9816316560820714|
|ZTÉLÉ          |21542              |21965         |0.9807420896881403|
|RAPT           |17916              |18279         |0.9801411455

## Dropping unwanted records - `dropna` + `fillna`

### `dropna`

#### params
1. `how` can take the value `any` or `all`. With `any`, PySpark drops records where at least one field is null. With `all`, only the records where every field is null are removed. The default is `any`.
2. `thresh` takes an integer value. If set (its default is `None`), PySpark ignores the `how` parameter and only drops the records with fewer than `thresh` non-null values.
3. `subset` takes an optional list of columns that `dropna` will use to make its decision.

In [83]:
# Drop records that have a commercial_ratio of null

c_time_no_null = commercial_time.dropna(subset=["commercial_ratio"])
c_time_no_null.orderBy("commercial_ratio", ascending=False).show()


# Check record counts for each
print("Records in commercial_time: ", commercial_time.count())
print("Records in c_time_no_null: ", c_time_no_null.count())

+---------------+-------------------+--------------+------------------+
|LogIdentifierID|duration_commercial|duration_total|  commercial_ratio|
+---------------+-------------------+--------------+------------------+
|          HPITV|                 13|            13|               1.0|
|           CIMT|                775|           775|               1.0|
|           MSET|               2700|          2700|               1.0|
|          TLNSP|              15480|         15480|               1.0|
|         TELENO|              17790|         17790|               1.0|
|           TANG|               8125|          8125|               1.0|
|           MMAX|              23333|         23582|0.9894410991434145|
|           MPLU|              20587|         20912|0.9844586840091814|
|          INVST|              20094|         20470|0.9816316560820714|
|          ZTÉLÉ|              21542|         21965|0.9807420896881403|
|           RAPT|              17916|         18279|0.9801411455

### `fillna`

#### params

1. `value`, either a Python int, float, string or bool, or a dictionary mapping column names to fill values.
2. `subset`, an optional list of the columns to fill.

__Tip__: You can fill nulls differently for each column by passing a dictionary:

```py
answer_no_null = answer.fillna(
    {"duration_commercial": 0, "duration_total": 0, "commercial_ratio": 0}
)
```

In [84]:
# Fill null fields

c_time_fill_null = commercial_time.fillna(0)
c_time_fill_null.orderBy("commercial_ratio", ascending=False).show()


# Check record counts for each
print("Records in commercial_time: ", commercial_time.count())
print("Records in c_time_fill_null: ", c_time_fill_null.count())

+---------------+-------------------+--------------+------------------+
|LogIdentifierID|duration_commercial|duration_total|  commercial_ratio|
+---------------+-------------------+--------------+------------------+
|           CIMT|                775|           775|               1.0|
|           MSET|               2700|          2700|               1.0|
|          TLNSP|              15480|         15480|               1.0|
|          HPITV|                 13|            13|               1.0|
|         TELENO|              17790|         17790|               1.0|
|           TANG|               8125|          8125|               1.0|
|           MMAX|              23333|         23582|0.9894410991434145|
|           MPLU|              20587|         20912|0.9844586840091814|
|          INVST|              20094|         20470|0.9816316560820714|
|          ZTÉLÉ|              21542|         21965|0.9807420896881403|
|           RAPT|              17916|         18279|0.9801411455

## Pulling it all together

[summary code of all the steps taken in this notebook as a spark script](code/Ch04-05/commercials.py)