<a id="top_of_page"></a>

# Working with Tabular Data

## Table of Contents

- [CLI tool to view sample of CSV file](#cli)
- [Read CSV file](#read_csv)
- [Viewing 3 Columns at a Time](#view_n_columns_at_a_time)
- [Getting rid of columns using the drop() method](#drop_column)
- [Extracting parts of a string with `substr()`](#substr)
- [Creating new column with withColumn()](#withColumn)
- [Renaming one column at a time, the withColumnRenamed() way](#withColumnRenamed)
- [Changing multiple columns - Batch lowercasing using the toDF() method](#toDF)
- [Selecting our columns in alphabetical order using select()](#sorting_columns)
- [Diagnosing a dataframe with describe() and summary()](#describe_and_summarize)
- [Joining Data](#joining)
- [INNER join](#inner_join)
- [Avoiding duplicate column names](#duplicate_column_names)
- [Using multiple left joins](#multiple_left_joins)
- [Summarizing the data via groupby and GroupedData](#groupby)
- [Using `when()` function](#when)
- [Taking care of null values: `dropna()` and `fillna()`](#null_values)

<a id="cli"></a>

## Useful CLI tool to initially assess or view sample of CSV file

[[back to top]](#top_of_page)

Linux Command Line:
- `head -n 5 BroadcastLogs_2018_Q3_M8_sample.csv`    # To view first 5 rows of the csv file


Windows Command Line:
- In PowerShell, `Get-Content BroadcastLogs_2018_Q3_M8_sample.csv -TotalCount 5` shows the first 5 lines of the file. Another option is pandas `read_csv()` with the `nrows=` parameter.
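
For a shell-independent option, a few lines of standard-library Python can stand in for `head`. The sketch below is only an illustration: the helper name `head_lines` is made up here, and it assumes the file is plain text readable with the chosen encoding.

```python
from itertools import islice
from pathlib import Path


def head_lines(path, n=5, encoding="utf-8"):
    """Return the first n lines of a text file without reading the whole file."""
    with Path(path).open(encoding=encoding, errors="replace") as handle:
        # islice stops after n lines, so a large CSV is never loaded in full
        return [line.rstrip("\n") for line in islice(handle, n)]
```

Calling `head_lines("BroadcastLogs_2018_Q3_M8_sample.csv")` and printing each returned line gives the same kind of preview as `head -n 5`.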

<a id="read_csv"></a>

## Read CSV file

[[back to top]](#top_of_page)

In [1]:
from pathlib import Path
from pyspark.sql import SparkSession
import os
import pyspark.sql.functions as F

In [2]:
spark = (
    SparkSession.builder
    # Enable eager, interactive mode - typically do not do this with production code
    .config("spark.sql.repl.eagerEval.enabled", "True")
    .config("spark.executor.cores", str(os.cpu_count()))
    .appName("Working with Tabular Data")
    .getOrCreate()
)

spark.sparkContext.setLogLevel("ERROR")

In [3]:
DIRECTORY = Path("./data/broadcast_logs")

In [4]:
logs = spark.read.csv(
    str(DIRECTORY / "BroadcastLogs_2018_Q3_M8.CSV"),
    sep="|",
    header=True,
    inferSchema=True,
    timestampFormat="yyyy-MM-dd",
)

In [5]:
logs.printSchema()

root
 |-- BroadcastLogID: integer (nullable = true)
 |-- LogServiceID: integer (nullable = true)
 |-- LogDate: timestamp (nullable = true)
 |-- SequenceNO: integer (nullable = true)
 |-- AudienceTargetAgeID: integer (nullable = true)
 |-- AudienceTargetEthnicID: integer (nullable = true)
 |-- CategoryID: integer (nullable = true)
 |-- ClosedCaptionID: integer (nullable = true)
 |-- CountryOfOriginID: integer (nullable = true)
 |-- DubDramaCreditID: integer (nullable = true)
 |-- EthnicProgramID: integer (nullable = true)
 |-- ProductionSourceID: integer (nullable = true)
 |-- ProgramClassID: integer (nullable = true)
 |-- FilmClassificationID: integer (nullable = true)
 |-- ExhibitionID: integer (nullable = true)
 |-- Duration: string (nullable = true)
 |-- EndTime: string (nullable = true)
 |-- LogEntryDate: timestamp (nullable = true)
 |-- ProductionNO: string (nullable = true)
 |-- ProgramTitle: string (nullable = true)
 |-- StartTime: string (nullable = true)
 |-- Subtitle: string 

<a id="view_n_columns_at_a_time"></a>

## Viewing 3 Columns at a Time

[[back to top]](#top_of_page)

In [6]:
import numpy as np

In [7]:
column_split = np.array_split(
    np.array(logs.columns), len(logs.columns) // 3
)

In [8]:
print(column_split)

[array(['BroadcastLogID', 'LogServiceID', 'LogDate'], dtype='<U22'), array(['SequenceNO', 'AudienceTargetAgeID', 'AudienceTargetEthnicID'],
      dtype='<U22'), array(['CategoryID', 'ClosedCaptionID', 'CountryOfOriginID'], dtype='<U22'), array(['DubDramaCreditID', 'EthnicProgramID', 'ProductionSourceID'],
      dtype='<U22'), array(['ProgramClassID', 'FilmClassificationID', 'ExhibitionID'],
      dtype='<U22'), array(['Duration', 'EndTime', 'LogEntryDate'], dtype='<U22'), array(['ProductionNO', 'ProgramTitle', 'StartTime'], dtype='<U22'), array(['Subtitle', 'NetworkAffiliationID', 'SpecialAttentionID'],
      dtype='<U22'), array(['BroadcastOriginPointID', 'CompositionID', 'Producer1'],
      dtype='<U22'), array(['Producer2', 'Language1', 'Language2'], dtype='<U22')]
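
Note that `np.array_split` takes a number of sections rather than a chunk size, so `len(logs.columns) // 3` produces groups of exactly three only when the column count is a multiple of three (here, 30 columns give 10 groups). A plain-Python alternative that fixes the chunk size instead is sketched below; `chunked` is a hypothetical helper name, and the column list is a shortened stand-in for `logs.columns`.

```python
def chunked(items, size):
    """Yield consecutive slices of `items`, each holding at most `size` elements."""
    for start in range(0, len(items), size):
        yield items[start:start + size]


# Shortened stand-in for logs.columns (seven names, so the last group is partial)
columns = [
    "BroadcastLogID", "LogServiceID", "LogDate",
    "SequenceNO", "AudienceTargetAgeID", "AudienceTargetEthnicID",
    "CategoryID",
]

groups = list(chunked(columns, 3))
for group in groups:
    print(group)
```

Each group is an ordinary list of names, so it can be unpacked into `select()` in the same way as the NumPy arrays.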


In [9]:
for x in column_split:
    logs.select(*x).show(5, False)

+--------------+------------+-------------------+
|BroadcastLogID|LogServiceID|LogDate            |
+--------------+------------+-------------------+
|1196192316    |3157        |2018-08-01 00:00:00|
|1196192317    |3157        |2018-08-01 00:00:00|
|1196192318    |3157        |2018-08-01 00:00:00|
|1196192319    |3157        |2018-08-01 00:00:00|
|1196192320    |3157        |2018-08-01 00:00:00|
+--------------+------------+-------------------+
only showing top 5 rows

+----------+-------------------+----------------------+
|SequenceNO|AudienceTargetAgeID|AudienceTargetEthnicID|
+----------+-------------------+----------------------+
|1         |4                  |null                  |
|2         |null               |null                  |
|3         |null               |null                  |
|4         |null               |null                  |
|5         |null               |null                  |
+----------+-------------------+----------------------+
only showing top 5 ro

<a id="drop_column"></a>

## Getting rid of columns using the drop() method

[[back to top]](#top_of_page)

In [10]:
logs = logs.drop("BroadcastLogID", "SequenceNO")
 
# Testing if we effectively got rid of the columns
print("BroadcastLogID" in logs.columns)  # => False
print("SequenceNO" in logs.columns)  # => False

False
False


#### Another way: select() with a list comprehension, unpacking the resulting list into column-name arguments

In [None]:
logs = logs.select(
    *[x for x in logs.columns if x not in ["BroadcastLogID", "SequenceNO"]]
)

# Testing if we effectively got rid of the columns
print("BroadcastLogID" in logs.columns)  # => False
print("SequenceNO" in logs.columns)  # => False

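## Unfortunate Inconsistency: When Unpacking Is Needed

`select()` accepts either a list of column names or the names as separate arguments, so unpacking is optional there. `drop()` only accepts the names as separate arguments, so a list must be unpacked explicitly with `*`. Because unpacking works in both cases, it is simplest to always unpack a list.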
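
The difference comes down to how Python passes arguments: a function declared with `*names` expects each column name as its own argument, so a list has to be spread out with `*`. The toy function below is a plain-Python stand-in, not PySpark code; `drop_columns` is a name invented for this sketch.

```python
def drop_columns(columns, *names):
    """Return `columns` without any of the given names (variadic, like drop())."""
    for name in names:
        if not isinstance(name, str):
            # A whole list passed as one argument ends up here
            raise TypeError(f"expected a column name, got {type(name).__name__}")
    return [c for c in columns if c not in names]


columns = ["BroadcastLogID", "LogServiceID", "SequenceNO"]
to_drop = ["BroadcastLogID", "SequenceNO"]

# Unpacked: each name becomes its own positional argument
remaining = drop_columns(columns, *to_drop)
print(remaining)

# Not unpacked: the list arrives as a single argument and is rejected
try:
    drop_columns(columns, to_drop)
except TypeError as err:
    print(err)
```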

<a id="substr"></a>

## Extracting parts of a string with `substr()`

[[back to top]](#top_of_page)

In [11]:
logs.select(F.col("Duration")).show(5)

+----------------+
|        Duration|
+----------------+
|02:00:00.0000000|
|00:00:30.0000000|
|00:00:15.0000000|
|00:00:15.0000000|
|00:00:15.0000000|
+----------------+
only showing top 5 rows



#### What data type is the "Duration" column?

In [12]:
print(logs.select(F.col("Duration")).dtypes)

[('Duration', 'string')]


[REFERENCE](https://spark.apache.org/docs/latest/sql-ref-datetime-pattern.html) - Datetime patterns for formatting and parsing

In [13]:
logs.select(
    F.col("Duration"),
    F.col("Duration").substr(1, 2).cast("int").alias("dur_hours"),
    F.col("Duration").substr(4, 2).cast("int").alias("dur_minutes"),
    F.col("Duration").substr(7, 2).cast("int").alias("dur_seconds"),
).distinct().show(5)

+----------------+---------+-----------+-----------+
|        Duration|dur_hours|dur_minutes|dur_seconds|
+----------------+---------+-----------+-----------+
|00:10:06.0000000|        0|         10|          6|
|00:10:37.0000000|        0|         10|         37|
|00:04:52.0000000|        0|          4|         52|
|00:26:41.0000000|        0|         26|         41|
|00:08:18.0000000|        0|          8|         18|
+----------------+---------+-----------+-----------+
only showing top 5 rows



In [14]:
logs.select(
    F.col("Duration"),
    (
        F.col("Duration").substr(1, 2).cast("int") * 60 * 60
        + F.col("Duration").substr(4, 2).cast("int") * 60
        + F.col("Duration").substr(7, 2).cast("int")
    ).alias("Duration_seconds"),
).distinct().show(5)

+----------------+----------------+
|        Duration|Duration_seconds|
+----------------+----------------+
|00:10:30.0000000|             630|
|00:25:52.0000000|            1552|
|00:28:08.0000000|            1688|
|06:00:00.0000000|           21600|
|00:32:08.0000000|            1928|
+----------------+----------------+
only showing top 5 rows
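
`substr(start, length)` counts positions from 1, unlike Python slices, which start at 0. To check the arithmetic outside Spark, the same extraction can be sketched in plain Python; `duration_to_seconds` is a hypothetical helper that assumes the `HH:MM:SS.fffffff` layout shown above.

```python
def duration_to_seconds(duration):
    """Convert an 'HH:MM:SS.fffffff' string to whole seconds."""
    hours = int(duration[0:2])    # same characters as substr(1, 2)
    minutes = int(duration[3:5])  # same characters as substr(4, 2)
    seconds = int(duration[6:8])  # same characters as substr(7, 2)
    return hours * 60 * 60 + minutes * 60 + seconds


print(duration_to_seconds("00:10:30.0000000"))  # 630
print(duration_to_seconds("06:00:00.0000000"))  # 21600
```

Both values agree with the `Duration_seconds` column in the output above.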



<a id="withColumn"></a>

## Creating new column with withColumn()

[[back to top]](#top_of_page)

In [15]:
logs = logs.withColumn(
    "Duration_seconds",
    (
        F.col("Duration").substr(1, 2).cast("int") * 60 * 60
        + F.col("Duration").substr(4, 2).cast("int") * 60
        + F.col("Duration").substr(7, 2).cast("int")
    ),
)

In [16]:
logs.printSchema()

root
 |-- LogServiceID: integer (nullable = true)
 |-- LogDate: timestamp (nullable = true)
 |-- AudienceTargetAgeID: integer (nullable = true)
 |-- AudienceTargetEthnicID: integer (nullable = true)
 |-- CategoryID: integer (nullable = true)
 |-- ClosedCaptionID: integer (nullable = true)
 |-- CountryOfOriginID: integer (nullable = true)
 |-- DubDramaCreditID: integer (nullable = true)
 |-- EthnicProgramID: integer (nullable = true)
 |-- ProductionSourceID: integer (nullable = true)
 |-- ProgramClassID: integer (nullable = true)
 |-- FilmClassificationID: integer (nullable = true)
 |-- ExhibitionID: integer (nullable = true)
 |-- Duration: string (nullable = true)
 |-- EndTime: string (nullable = true)
 |-- LogEntryDate: timestamp (nullable = true)
 |-- ProductionNO: string (nullable = true)
 |-- ProgramTitle: string (nullable = true)
 |-- StartTime: string (nullable = true)
 |-- Subtitle: string (nullable = true)
 |-- NetworkAffiliationID: integer (nullable = true)
 |-- SpecialAttenti

<a id="withColumnRenamed"></a>

## Renaming one column at a time, the withColumnRenamed() way

[[back to top]](#top_of_page)

In [17]:
logs = logs.withColumnRenamed("Duration_seconds", "duration_seconds")

In [18]:
logs.printSchema()

root
 |-- LogServiceID: integer (nullable = true)
 |-- LogDate: timestamp (nullable = true)
 |-- AudienceTargetAgeID: integer (nullable = true)
 |-- AudienceTargetEthnicID: integer (nullable = true)
 |-- CategoryID: integer (nullable = true)
 |-- ClosedCaptionID: integer (nullable = true)
 |-- CountryOfOriginID: integer (nullable = true)
 |-- DubDramaCreditID: integer (nullable = true)
 |-- EthnicProgramID: integer (nullable = true)
 |-- ProductionSourceID: integer (nullable = true)
 |-- ProgramClassID: integer (nullable = true)
 |-- FilmClassificationID: integer (nullable = true)
 |-- ExhibitionID: integer (nullable = true)
 |-- Duration: string (nullable = true)
 |-- EndTime: string (nullable = true)
 |-- LogEntryDate: timestamp (nullable = true)
 |-- ProductionNO: string (nullable = true)
 |-- ProgramTitle: string (nullable = true)
 |-- StartTime: string (nullable = true)
 |-- Subtitle: string (nullable = true)
 |-- NetworkAffiliationID: integer (nullable = true)
 |-- SpecialAttenti

<a id="toDF"></a>

## Changing multiple columns - Batch lowercasing using the toDF() method

[[back to top]](#top_of_page)

In [19]:
logs.toDF(*[x.lower() for x in logs.columns]).printSchema()

root
 |-- logserviceid: integer (nullable = true)
 |-- logdate: timestamp (nullable = true)
 |-- audiencetargetageid: integer (nullable = true)
 |-- audiencetargetethnicid: integer (nullable = true)
 |-- categoryid: integer (nullable = true)
 |-- closedcaptionid: integer (nullable = true)
 |-- countryoforiginid: integer (nullable = true)
 |-- dubdramacreditid: integer (nullable = true)
 |-- ethnicprogramid: integer (nullable = true)
 |-- productionsourceid: integer (nullable = true)
 |-- programclassid: integer (nullable = true)
 |-- filmclassificationid: integer (nullable = true)
 |-- exhibitionid: integer (nullable = true)
 |-- duration: string (nullable = true)
 |-- endtime: string (nullable = true)
 |-- logentrydate: timestamp (nullable = true)
 |-- productionno: string (nullable = true)
 |-- programtitle: string (nullable = true)
 |-- starttime: string (nullable = true)
 |-- subtitle: string (nullable = true)
 |-- networkaffiliationid: integer (nullable = true)
 |-- specialattenti

<a id="sorting_columns"></a>

## Selecting our columns in alphabetical order using select()

[[back to top]](#top_of_page)

In [20]:
logs.select(sorted(logs.columns)).printSchema()

root
 |-- AudienceTargetAgeID: integer (nullable = true)
 |-- AudienceTargetEthnicID: integer (nullable = true)
 |-- BroadcastOriginPointID: integer (nullable = true)
 |-- CategoryID: integer (nullable = true)
 |-- ClosedCaptionID: integer (nullable = true)
 |-- CompositionID: integer (nullable = true)
 |-- CountryOfOriginID: integer (nullable = true)
 |-- DubDramaCreditID: integer (nullable = true)
 |-- Duration: string (nullable = true)
 |-- EndTime: string (nullable = true)
 |-- EthnicProgramID: integer (nullable = true)
 |-- ExhibitionID: integer (nullable = true)
 |-- FilmClassificationID: integer (nullable = true)
 |-- Language1: integer (nullable = true)
 |-- Language2: integer (nullable = true)
 |-- LogDate: timestamp (nullable = true)
 |-- LogEntryDate: timestamp (nullable = true)
 |-- LogServiceID: integer (nullable = true)
 |-- NetworkAffiliationID: integer (nullable = true)
 |-- Producer1: string (nullable = true)
 |-- Producer2: string (nullable = true)
 |-- ProductionNO: 

<a id="describe_and_summarize"></a>

## Diagnosing a dataframe with describe() and summary()

[[back to top]](#top_of_page)

In [None]:
for i in logs.columns:
    logs.describe(i).show()

In [None]:
for i in logs.columns:
    logs.select(i).summary().show()

<a id="joining"></a>

## Joining Data

[[back to top]](#top_of_page)

#### Let's create a 2nd dataframe called "log_identifier"

In [21]:
DIRECTORY = Path("./data/broadcast_logs")
log_identifier = spark.read.csv(
    str(DIRECTORY / "ReferenceTables/LogIdentifier.csv"),
    sep="|",
    header=True,
    inferSchema=True,
)

In [22]:
log_identifier.printSchema()

root
 |-- LogIdentifierID: string (nullable = true)
 |-- LogServiceID: integer (nullable = true)
 |-- PrimaryFG: integer (nullable = true)



In [23]:
log_identifier.show(10)

+---------------+------------+---------+
|LogIdentifierID|LogServiceID|PrimaryFG|
+---------------+------------+---------+
|           13ST|        3157|        1|
|         2000SM|        3466|        1|
|           70SM|        3883|        1|
|           80SM|        3590|        1|
|           90SM|        3470|        1|
|         9DAPTN|        3158|        1|
|         9DCFCF|        3159|        1|
|         9DCFRN|        3160|        1|
|         9DCHRO|        3161|        1|
|         9DCIVI|        3162|        1|
+---------------+------------+---------+
only showing top 10 rows



In [24]:
log_identifier.select(F.col("PrimaryFG")).distinct()

PrimaryFG
1
0


#### Let's do a simple groupBy count

In [25]:
(
    log_identifier
    .select(F.col("PrimaryFG"))
    .groupBy(F.col("PrimaryFG"))
    .count()
)

PrimaryFG,count
1,758
0,162


In [26]:
log_identifier.count()

920

#### Filter or limit our log_identifier to have PrimaryFG equal to 1

In [27]:
log_identifier = log_identifier.where(F.col("PrimaryFG") == 1)
print(log_identifier.count())

758


#### Let's see which column we can use to join `logs` and `log_identifier`. Let's print out the schema for both:

In [28]:
logs.printSchema()

root
 |-- LogServiceID: integer (nullable = true)
 |-- LogDate: timestamp (nullable = true)
 |-- AudienceTargetAgeID: integer (nullable = true)
 |-- AudienceTargetEthnicID: integer (nullable = true)
 |-- CategoryID: integer (nullable = true)
 |-- ClosedCaptionID: integer (nullable = true)
 |-- CountryOfOriginID: integer (nullable = true)
 |-- DubDramaCreditID: integer (nullable = true)
 |-- EthnicProgramID: integer (nullable = true)
 |-- ProductionSourceID: integer (nullable = true)
 |-- ProgramClassID: integer (nullable = true)
 |-- FilmClassificationID: integer (nullable = true)
 |-- ExhibitionID: integer (nullable = true)
 |-- Duration: string (nullable = true)
 |-- EndTime: string (nullable = true)
 |-- LogEntryDate: timestamp (nullable = true)
 |-- ProductionNO: string (nullable = true)
 |-- ProgramTitle: string (nullable = true)
 |-- StartTime: string (nullable = true)
 |-- Subtitle: string (nullable = true)
 |-- NetworkAffiliationID: integer (nullable = true)
 |-- SpecialAttenti

In [29]:
log_identifier.printSchema()

root
 |-- LogIdentifierID: string (nullable = true)
 |-- LogServiceID: integer (nullable = true)
 |-- PrimaryFG: integer (nullable = true)



#### From the data dictionary and the columns listed, it looks like we can join on the `LogServiceID` column

<a id="inner_join"></a>

## Perform INNER join

[[back to top]](#top_of_page)

In [30]:
logs_and_channels = logs.join(
    log_identifier,
    on="LogServiceID",
    how="inner"
)

In [31]:
logs_and_channels.printSchema()

root
 |-- LogServiceID: integer (nullable = true)
 |-- LogDate: timestamp (nullable = true)
 |-- AudienceTargetAgeID: integer (nullable = true)
 |-- AudienceTargetEthnicID: integer (nullable = true)
 |-- CategoryID: integer (nullable = true)
 |-- ClosedCaptionID: integer (nullable = true)
 |-- CountryOfOriginID: integer (nullable = true)
 |-- DubDramaCreditID: integer (nullable = true)
 |-- EthnicProgramID: integer (nullable = true)
 |-- ProductionSourceID: integer (nullable = true)
 |-- ProgramClassID: integer (nullable = true)
 |-- FilmClassificationID: integer (nullable = true)
 |-- ExhibitionID: integer (nullable = true)
 |-- Duration: string (nullable = true)
 |-- EndTime: string (nullable = true)
 |-- LogEntryDate: timestamp (nullable = true)
 |-- ProductionNO: string (nullable = true)
 |-- ProgramTitle: string (nullable = true)
 |-- StartTime: string (nullable = true)
 |-- Subtitle: string (nullable = true)
 |-- NetworkAffiliationID: integer (nullable = true)
 |-- SpecialAttenti

#### Consequences of Joining - Duplicate Column Names

In [32]:
logs_and_channels_verbose = logs.join(
    log_identifier, logs["LogServiceID"] == log_identifier["LogServiceID"]
)

In [36]:
logs_and_channels_verbose.printSchema()

root
 |-- LogServiceID: integer (nullable = true)
 |-- LogDate: timestamp (nullable = true)
 |-- AudienceTargetAgeID: integer (nullable = true)
 |-- AudienceTargetEthnicID: integer (nullable = true)
 |-- CategoryID: integer (nullable = true)
 |-- ClosedCaptionID: integer (nullable = true)
 |-- CountryOfOriginID: integer (nullable = true)
 |-- DubDramaCreditID: integer (nullable = true)
 |-- EthnicProgramID: integer (nullable = true)
 |-- ProductionSourceID: integer (nullable = true)
 |-- ProgramClassID: integer (nullable = true)
 |-- FilmClassificationID: integer (nullable = true)
 |-- ExhibitionID: integer (nullable = true)
 |-- Duration: string (nullable = true)
 |-- EndTime: string (nullable = true)
 |-- LogEntryDate: timestamp (nullable = true)
 |-- ProductionNO: string (nullable = true)
 |-- ProgramTitle: string (nullable = true)
 |-- StartTime: string (nullable = true)
 |-- Subtitle: string (nullable = true)
 |-- NetworkAffiliationID: integer (nullable = true)
 |-- SpecialAttenti

#### We get an error below because "LogServiceID" is listed twice:

In [37]:
from pyspark.sql.utils import AnalysisException

try:
    logs_and_channels_verbose.select("LogServiceID")
except AnalysisException as err:
    # Prints Spark's ambiguous-reference error; the exact wording depends on the Spark version
    print(err)

<a id="duplicate_column_names"></a>

## How to avoid Duplicate Column Names

[[back to top]](#top_of_page)

3 ways:
- Using simplified join syntax on equi-joins
- Using the origin name
- Using aliases

#### Simplified Join Syntax

In [38]:
logs_and_channels = logs.join(log_identifier, "LogServiceID")
 
logs_and_channels.printSchema()

root
 |-- LogServiceID: integer (nullable = true)
 |-- LogDate: timestamp (nullable = true)
 |-- AudienceTargetAgeID: integer (nullable = true)
 |-- AudienceTargetEthnicID: integer (nullable = true)
 |-- CategoryID: integer (nullable = true)
 |-- ClosedCaptionID: integer (nullable = true)
 |-- CountryOfOriginID: integer (nullable = true)
 |-- DubDramaCreditID: integer (nullable = true)
 |-- EthnicProgramID: integer (nullable = true)
 |-- ProductionSourceID: integer (nullable = true)
 |-- ProgramClassID: integer (nullable = true)
 |-- FilmClassificationID: integer (nullable = true)
 |-- ExhibitionID: integer (nullable = true)
 |-- Duration: string (nullable = true)
 |-- EndTime: string (nullable = true)
 |-- LogEntryDate: timestamp (nullable = true)
 |-- ProductionNO: string (nullable = true)
 |-- ProgramTitle: string (nullable = true)
 |-- StartTime: string (nullable = true)
 |-- Subtitle: string (nullable = true)
 |-- NetworkAffiliationID: integer (nullable = true)
 |-- SpecialAttenti

From the printSchema output, we see that "LogServiceID" is listed only once.

#### Using the origin name of the column for unambiguous selection

In [40]:
logs_and_channels_verbose = logs.join(
    log_identifier, logs["LogServiceID"] == log_identifier["LogServiceID"]
)

logs_and_channels_verbose.drop(log_identifier["LogServiceID"]).select(
    "LogServiceID")

LogServiceID
3157
3157
3157
3157
3157
3157
3157
3157
3157
3157


#### Using aliases

In [42]:
logs_and_channels_verbose = logs.alias("left").join(
    log_identifier.alias("right"),
    logs["LogServiceID"] == log_identifier["LogServiceID"],
)
 
logs_and_channels_verbose.drop(F.col("right.LogServiceID")).select(
    "LogServiceID"
)

LogServiceID
3157
3157
3157
3157
3157
3157
3157
3157
3157
3157


In summary: All three approaches are valid. The first one works only in the case of equi-joins, but the two others are mostly interchangeable.

<a id="multiple_left_joins"></a>

## Using 2 Left JOINs

[[back to top]](#top_of_page)

In [43]:
DIRECTORY = "./data/broadcast_logs"
 
cd_category = spark.read.csv(
    os.path.join(DIRECTORY, "ReferenceTables/CD_Category.csv"),
    sep="|",
    header=True,
    inferSchema=True,
).select(
    "CategoryID",
    "CategoryCD",
    F.col("EnglishDescription").alias("Category_Description"),
)
 
cd_program_class = spark.read.csv(
    os.path.join(DIRECTORY, "ReferenceTables/CD_ProgramClass.csv"),
    sep="|",
    header=True,
    inferSchema=True,
).select(
    "ProgramClassID",
    "ProgramClassCD",
    F.col("EnglishDescription").alias("ProgramClass_Description"),
)
 
full_log = logs_and_channels.join(cd_category, "CategoryID", how="left").join(
    cd_program_class, "ProgramClassID", how="left"
)

In [44]:
full_log.printSchema()

root
 |-- ProgramClassID: integer (nullable = true)
 |-- CategoryID: integer (nullable = true)
 |-- LogServiceID: integer (nullable = true)
 |-- LogDate: timestamp (nullable = true)
 |-- AudienceTargetAgeID: integer (nullable = true)
 |-- AudienceTargetEthnicID: integer (nullable = true)
 |-- ClosedCaptionID: integer (nullable = true)
 |-- CountryOfOriginID: integer (nullable = true)
 |-- DubDramaCreditID: integer (nullable = true)
 |-- EthnicProgramID: integer (nullable = true)
 |-- ProductionSourceID: integer (nullable = true)
 |-- FilmClassificationID: integer (nullable = true)
 |-- ExhibitionID: integer (nullable = true)
 |-- Duration: string (nullable = true)
 |-- EndTime: string (nullable = true)
 |-- LogEntryDate: timestamp (nullable = true)
 |-- ProductionNO: string (nullable = true)
 |-- ProgramTitle: string (nullable = true)
 |-- StartTime: string (nullable = true)
 |-- Subtitle: string (nullable = true)
 |-- NetworkAffiliationID: integer (nullable = true)
 |-- SpecialAttenti

<a id="groupby"></a>

## Summarizing the data via groupby and GroupedData

[[back to top]](#top_of_page)

#### Displaying the most popular types of programs

In [45]:
(full_log
    .groupby("ProgramClassCD", "ProgramClass_Description")
    .agg(F.sum("duration_seconds").alias("duration_total"))
    .orderBy("duration_total", ascending=False).show(100, False)
)

+--------------+--------------------------------------+--------------+
|ProgramClassCD|ProgramClass_Description              |duration_total|
+--------------+--------------------------------------+--------------+
|PGR           |PROGRAM                               |652802250     |
|COM           |COMMERCIAL MESSAGE                    |106810189     |
|PFS           |PROGRAM FIRST SEGMENT                 |38817891      |
|SEG           |SEGMENT OF A PROGRAM                  |34891264      |
|PRC           |PROMOTION OF UPCOMING CANADIAN PROGRAM|27017583      |
|PGI           |PROGRAM INFOMERCIAL                   |23196392      |
|PRO           |PROMOTION OF NON-CANADIAN PROGRAM     |10213461      |
|OFF           |SCHEDULED OFF AIR TIME PERIOD         |4537071       |
|ID            |NETWORK IDENTIFICATION MESSAGE        |2179067       |
|NRN           |No recognized nationality             |1704127       |
|PSA           |PUBLIC SERVICE ANNOUNCEMENT           |1622200       |
|MAG  

#### agg() also accepts a dictionary in the form {column_name: aggregation_function}, where both are strings. Because of this, we can rewrite the previous cell like so:

In [46]:
full_log.groupby("ProgramClassCD", "ProgramClass_Description").agg(
    {"duration_seconds": "sum"}
).withColumnRenamed("sum(duration_seconds)", "duration_total").orderBy(
    "duration_total", ascending=False
).show(
    100, False
)

+--------------+--------------------------------------+--------------+
|ProgramClassCD|ProgramClass_Description              |duration_total|
+--------------+--------------------------------------+--------------+
|PGR           |PROGRAM                               |652802250     |
|COM           |COMMERCIAL MESSAGE                    |106810189     |
|PFS           |PROGRAM FIRST SEGMENT                 |38817891      |
|SEG           |SEGMENT OF A PROGRAM                  |34891264      |
|PRC           |PROMOTION OF UPCOMING CANADIAN PROGRAM|27017583      |
|PGI           |PROGRAM INFOMERCIAL                   |23196392      |
|PRO           |PROMOTION OF NON-CANADIAN PROGRAM     |10213461      |
|OFF           |SCHEDULED OFF AIR TIME PERIOD         |4537071       |
|ID            |NETWORK IDENTIFICATION MESSAGE        |2179067       |
|NRN           |No recognized nationality             |1704127       |
|PSA           |PUBLIC SERVICE ANNOUNCEMENT           |1622200       |
|MAG  
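
With the dictionary form, the result column is named after the function and the column, which is why the cell above needs `withColumnRenamed("sum(duration_seconds)", ...)`. The plain-Python sketch below imitates that naming on a few invented rows; `agg_by_dict` and `AGGREGATIONS` are hypothetical names, not part of PySpark.

```python
from collections import defaultdict

AGGREGATIONS = {"sum": sum, "max": max, "min": min}


def agg_by_dict(rows, key, spec):
    """Group `rows` by `key` and apply {column_name: function_name} to each group."""
    groups = defaultdict(list)
    for row in rows:
        groups[row[key]].append(row)
    result = {}
    for group_key, members in groups.items():
        result[group_key] = {
            # Result columns are named "function(column)", e.g. "sum(duration_seconds)"
            f"{func}({column})": AGGREGATIONS[func]([m[column] for m in members])
            for column, func in spec.items()
        }
    return result


# Invented rows for illustration only
rows = [
    {"ProgramClassCD": "COM", "duration_seconds": 30},
    {"ProgramClassCD": "COM", "duration_seconds": 15},
    {"ProgramClassCD": "PGR", "duration_seconds": 7200},
]
totals = agg_by_dict(rows, "ProgramClassCD", {"duration_seconds": "sum"})
print(totals)
```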

<a id="when"></a>

## Using `when()` function

[[back to top]](#top_of_page)

In [None]:
F.when(
    F.trim(F.col("ProgramClassCD")).isin(
        ["COM", "PRC", "PGI", "PRO", "PSA", "MAG", "LOC", "SPO", "MER", "SOL"]
    ),
    F.col("duration_seconds"),
).otherwise(0)

The above can be translated as:

`"When the field of the column ProgramClassCD, trimmed of spaces at the beginning and end of the field, is in our list of commercial codes, then take the value of the field in the column duration_seconds. Otherwise, use zero as a value."`

General `when()` usage:

```
(
    F.when([BOOLEAN TEST], [RESULT IF TRUE])
    .when([ANOTHER BOOLEAN TEST], [RESULT IF TRUE])
    .otherwise([DEFAULT RESULT, WILL DEFAULT TO null IF OMITTED])
)
```
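
Read procedurally, a `when()` chain behaves like an `if`/`elif`/`else` applied to each row. The plain-Python sketch below mirrors the commercial-duration expression from the start of this section; `commercial_seconds` and `COMMERCIAL_CODES` are names made up for the illustration, and the sample values are invented.

```python
COMMERCIAL_CODES = {"COM", "PRC", "PGI", "PRO", "PSA", "MAG", "LOC", "SPO", "MER", "SOL"}


def commercial_seconds(program_class_cd, duration_seconds):
    """Row-level equivalent of when(trim(code).isin(...), duration).otherwise(0)."""
    if program_class_cd is not None and program_class_cd.strip() in COMMERCIAL_CODES:
        return duration_seconds
    return 0  # the otherwise() branch


print(commercial_seconds("COM  ", 30))   # 30
print(commercial_seconds("PGR", 7200))   # 0
print(commercial_seconds(None, 15))      # 0
```

A missing code falls through to the default, matching how a null condition in `when()` is treated as not satisfied.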

**Goal:**
```
"Group by LogIdentifierID. Sum duration_seconds for commercials only and name the result duration_commercial; sum all duration_seconds and name the result duration_total; then add a derived column, commercial_ratio, equal to duration_commercial divided by duration_total."
```

**Answer:**

In [47]:
answer = (
    full_log.groupby("LogIdentifierID")
    .agg(
        F.sum(
            F.when(
                F.trim(F.col("ProgramClassCD")).isin(
                    ["COM", "PRC", "PGI", "PRO", "LOC", "SPO", "MER", "SOL"]
                ),
                F.col("duration_seconds"),
            ).otherwise(0)
        ).alias("duration_commercial"),
        F.sum("duration_seconds").alias("duration_total"),
    )
    .withColumn(
        "commercial_ratio", F.col("duration_commercial") / F.col("duration_total")
    )
)
 
answer.orderBy("commercial_ratio", ascending=False).show(1000, False)

+---------------+-------------------+--------------+---------------------+
|LogIdentifierID|duration_commercial|duration_total|commercial_ratio     |
+---------------+-------------------+--------------+---------------------+
|HPITV          |403                |403           |1.0                  |
|CIMT           |19935              |19935         |1.0                  |
|TELENO         |545255             |545255        |1.0                  |
|TANG           |271468             |271468        |1.0                  |
|MSET           |101670             |101670        |1.0                  |
|TLNSP          |234455             |234455        |1.0                  |
|INVST          |623057             |633659        |0.9832686034602207   |
|ZTÉLÉ          |669624             |682023        |0.9818202611935375   |
|CANALD         |660132             |673746        |0.9797935720583127   |
|ONEBMS         |563058             |576441        |0.9767834002092148   |
|MMAX           |701438  
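
The same aggregation can be traced by hand on a few rows. The sketch below is plain Python with invented data, and `commercial_ratios` is a hypothetical helper; it also shows one way a ratio can end up missing: when ANSI mode is off, Spark returns null for a division by zero, which `None` stands in for here.

```python
from collections import defaultdict

COMMERCIAL_CODES = {"COM", "PRC", "PGI", "PRO", "LOC", "SPO", "MER", "SOL"}


def commercial_ratios(rows):
    """Aggregate (channel, code, seconds) rows into per-channel totals and ratios."""
    totals = defaultdict(lambda: {"duration_commercial": 0, "duration_total": 0})
    for channel, code, seconds in rows:
        entry = totals[channel]
        entry["duration_total"] += seconds
        if code.strip() in COMMERCIAL_CODES:
            entry["duration_commercial"] += seconds
    for entry in totals.values():
        total = entry["duration_total"]
        # None stands in for the null produced when the denominator is zero
        entry["commercial_ratio"] = (
            entry["duration_commercial"] / total if total else None
        )
    return dict(totals)


# Invented rows: (LogIdentifierID, ProgramClassCD, duration_seconds)
rows = [
    ("AAA", "COM", 30),
    ("AAA", "PGR", 90),
    ("BBB", "PGR", 0),
]
result = commercial_ratios(rows)
print(result["AAA"]["commercial_ratio"])  # 0.25
print(result["BBB"]["commercial_ratio"])  # None
```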

<a id="null_values"></a>

## Taking care of null values: `dropna()` and `fillna()`

[[back to top]](#top_of_page)

#### `dropna()`

dropna() has 3 parameters:

- `how`, which can take the value `any` or `all`. If `any` is selected, PySpark will drop records where at least one of the fields is null. In the case of `all`, only the records where all fields are null will be removed. By default, PySpark uses the `any` mode.

- `thresh` takes an integer value. If set (its default is None), PySpark will ignore the `how` parameter and only drop the records with fewer than `thresh` non-null values.

- `subset` will take an optional list of columns that dropna() will use to make its decision.
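
How the three parameters interact is easier to see on a tiny example. The function below is a plain-Python model of the rules just described, not PySpark's implementation; `drop_na` and the sample rows are invented for the sketch.

```python
def drop_na(rows, how="any", thresh=None, subset=None):
    """Keep the rows that a dropna()-style rule would not remove."""
    kept = []
    for row in rows:
        columns = subset if subset is not None else list(row)
        non_null = sum(row[c] is not None for c in columns)
        if thresh is not None:
            keep = non_null >= thresh          # thresh overrides `how`
        elif how == "any":
            keep = non_null == len(columns)    # drop if at least one null
        elif how == "all":
            keep = non_null > 0                # drop only if every field is null
        else:
            raise ValueError("how must be 'any' or 'all'")
        if keep:
            kept.append(row)
    return kept


rows = [
    {"id": "A", "total": 10, "ratio": 0.5},
    {"id": "B", "total": None, "ratio": None},
    {"id": None, "total": None, "ratio": None},
]
print(len(drop_na(rows)))                   # 1
print(len(drop_na(rows, how="all")))        # 2
print(len(drop_na(rows, thresh=1)))         # 2
print(len(drop_na(rows, subset=["id"])))    # 2
```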

<a id="dropna"></a>

**Goal:**

```
We want to keep only the records whose commercial_ratio is non-null.
```

In [48]:
answer_no_null = answer.dropna(subset=["commercial_ratio"])
 
answer_no_null.orderBy(
    "commercial_ratio", ascending=False).show(1000, False)

+---------------+-------------------+--------------+---------------------+
|LogIdentifierID|duration_commercial|duration_total|commercial_ratio     |
+---------------+-------------------+--------------+---------------------+
|HPITV          |403                |403           |1.0                  |
|CIMT           |19935              |19935         |1.0                  |
|TELENO         |545255             |545255        |1.0                  |
|TANG           |271468             |271468        |1.0                  |
|MSET           |101670             |101670        |1.0                  |
|TLNSP          |234455             |234455        |1.0                  |
|INVST          |623057             |633659        |0.9832686034602207   |
|ZTÉLÉ          |669624             |682023        |0.9818202611935375   |
|CANALD         |660132             |673746        |0.9797935720583127   |
|ONEBMS         |563058             |576441        |0.9767834002092148   |
|MMAX           |701438  

In [49]:
print(answer_no_null.count())

322


#### `fillna()`

fillna() takes two parameters:

- The `value`, which is a Python int, float, string, or bool. PySpark will only fill the compatible columns; for instance, if we were to fillna("zero"), our commercial_ratio, being a double, would not be filled.

- The same `subset` parameter we encountered in dropna(). We can limit the scope of our filling to only the columns we want.
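
The "compatible columns only" rule can be modelled in a few lines of plain Python. This is a simplified sketch under stated assumptions, not PySpark's implementation: `fill_na` is a hypothetical helper, and the schema is reduced to three made-up kinds (`string`, `numeric`, `boolean`).

```python
def fill_na(rows, schema, value, subset=None):
    """Fill None values, but only in columns whose kind matches `value`."""
    if isinstance(value, bool):          # check bool first: bool is a subclass of int
        kind = "boolean"
    elif isinstance(value, (int, float)):
        kind = "numeric"
    elif isinstance(value, str):
        kind = "string"
    else:
        raise TypeError("value must be an int, float, str, or bool")
    targets = [
        column for column, column_kind in schema.items()
        if column_kind == kind and (subset is None or column in subset)
    ]
    return [
        {c: (value if c in targets and v is None else v) for c, v in row.items()}
        for row in rows
    ]


# Hypothetical schema and rows, loosely modelled on the `answer` data frame
schema = {
    "LogIdentifierID": "string",
    "duration_total": "numeric",
    "commercial_ratio": "numeric",
}
rows = [{"LogIdentifierID": "AAA", "duration_total": None, "commercial_ratio": None}]

print(fill_na(rows, schema, 0))
print(fill_na(rows, schema, "zero"))
print(fill_na(rows, schema, 0, subset=["commercial_ratio"]))
```

The second call leaves the row unchanged because no string column contains a missing value, which is the situation described for `fillna("zero")` above.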

In [50]:
answer_no_null = answer.fillna(0)
 
answer_no_null.orderBy(
    "commercial_ratio", ascending=False).show(1000, False)

+---------------+-------------------+--------------+---------------------+
|LogIdentifierID|duration_commercial|duration_total|commercial_ratio     |
+---------------+-------------------+--------------+---------------------+
|HPITV          |403                |403           |1.0                  |
|CIMT           |19935              |19935         |1.0                  |
|TELENO         |545255             |545255        |1.0                  |
|TANG           |271468             |271468        |1.0                  |
|MSET           |101670             |101670        |1.0                  |
|TLNSP          |234455             |234455        |1.0                  |
|INVST          |623057             |633659        |0.9832686034602207   |
|ZTÉLÉ          |669624             |682023        |0.9818202611935375   |
|CANALD         |660132             |673746        |0.9797935720583127   |
|ONEBMS         |563058             |576441        |0.9767834002092148   |
|MMAX           |701438  

In [51]:
print(answer_no_null.count())

324


You can also pass a dict to the fillna() method, with the column names as keys and the fill values as values. Using this form, our filling code would look like this:

In [None]:
# Filling our numerical records with zero using the fillna() method and a dict
answer_no_null = answer.fillna(
    {"duration_commercial": 0, "duration_total": 0, "commercial_ratio": 0}
)

## Full Program

**GOAL**: Our full program, ordering channels by decreasing proportion of commercials

In [None]:
from pathlib import Path
 
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
 
spark = SparkSession.builder.appName(
    "Getting the Canadian TV channels with the highest/lowest proportion of commercials."
).getOrCreate()
 
spark.sparkContext.setLogLevel("WARN")
 
# Reading all the relevant data sources
 
DIRECTORY = Path("./data/broadcast_logs")
 
logs = spark.read.csv(
    str(DIRECTORY / "BroadcastLogs_2018_Q3_M8.CSV"),
    sep="|",
    header=True,
    inferSchema=True,
)
 
log_identifier = spark.read.csv(
    str(DIRECTORY / "ReferenceTables/LogIdentifier.csv"),
    sep="|",
    header=True,
    inferSchema=True,
)
cd_category = spark.read.csv(
    str(DIRECTORY / "ReferenceTables/CD_Category.csv"),
    sep="|",
    header=True,
    inferSchema=True,
).select(
    "CategoryID",
    "CategoryCD",
    F.col("EnglishDescription").alias("Category_Description"),
)
 
cd_program_class = spark.read.csv(
    str(DIRECTORY / "ReferenceTables/CD_ProgramClass.csv"),
    sep="|",
    header=True,
    inferSchema=True,
).select(
    "ProgramClassID",
    "ProgramClassCD",
    F.col("EnglishDescription").alias("ProgramClass_Description"),
)
 
# Data processing
 
logs = logs.drop("BroadcastLogID", "SequenceNO")
 
logs = logs.withColumn(
    "duration_seconds",
    (
        F.col("Duration").substr(1, 2).cast("int") * 60 * 60
        + F.col("Duration").substr(4, 2).cast("int") * 60
        + F.col("Duration").substr(7, 2).cast("int")
    ),
)
 
log_identifier = log_identifier.where(F.col("PrimaryFG") == 1)
 
logs_and_channels = logs.join(log_identifier, "LogServiceID")
 
full_log = logs_and_channels.join(cd_category, "CategoryID", how="left").join(
    cd_program_class, "ProgramClassID", how="left"
)
 
answer = (
    full_log.groupby("LogIdentifierID")
    .agg(
        F.sum(
            F.when(
                F.trim(F.col("ProgramClassCD")).isin(
                    ["COM", "PRC", "PGI", "PRO", "LOC", "SPO", "MER", "SOL"]
                ),
                F.col("duration_seconds"),
            ).otherwise(0)
        ).alias("duration_commercial"),
        F.sum("duration_seconds").alias("duration_total"),
    )
    .withColumn(
        "commercial_ratio", F.col("duration_commercial") / F.col("duration_total")
    )
    .fillna(0)
)
 
answer.orderBy("commercial_ratio", ascending=False).show(1000, False)