In [1]:
"""Checkpoint code for the book Data Analysis with Python and PySpark, Chapter 4."""

import os
import numpy as np
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

In [15]:
def split_table(df, sections, records):
    """Show a wide data frame as several narrower tables, one per column group.

    The columns of ``df`` are partitioned, in their original order, into
    ``sections`` groups of near-equal size with ``np.array_split``. Each
    non-empty group is selected and printed with ``show`` (truncation of cell
    values disabled).

    Args:
        df: Object exposing ``columns``, ``select(*cols)`` and, on the
            selected result, ``show(n, truncate)``.
        sections: Number of column groups, forwarded to ``np.array_split``.
        records: Number of rows forwarded to ``show`` for every group.

    Returns:
        None. The tables are printed as a side effect of ``show``.
    """
    column_groups = np.array_split(np.array(df.columns), sections)
    for group in column_groups:
        # When `sections` exceeds the number of columns, array_split pads its
        # result with empty arrays; selecting zero columns would only print an
        # empty table, so those groups are skipped.
        if group.size == 0:
            continue
        # tolist() hands plain Python strings (not numpy scalars) to select().
        df.select(*group.tolist()).show(records, False)

In [3]:
# Obtain the Spark session used by every cell below, named after the analysis.
spark = (
    SparkSession.builder
    .appName(
        "Getting the Canadian TV channels with the highest/lowest proportion of commercials."
    )
    .getOrCreate()
)

In [4]:
# Relative path to the broadcast-log data folder; the CSV file names read in
# the cells below are joined onto it.
DIRECTORY = "../../data/broadcast_logs"

In [5]:
# Read the pipe-delimited broadcast log, drop two identifier columns, and add
# `duration_seconds` computed from fixed character positions of `Duration`.
# NOTE(review): the arithmetic assumes `Duration` is laid out as HH:MM:SS… —
# confirm against the data.
logs = (
    spark.read.csv(
        os.path.join(DIRECTORY, "BroadcastLogs_2018_Q3_M8.CSV"),
        sep="|",
        header=True,
        inferSchema=True,
        timestampFormat="yyyy-MM-dd",
    )
    .drop("BroadcastLogID", "SequenceNO")
    .withColumn(
        "duration_seconds",
        # characters 1-2, scaled by 60 * 60
        F.substring("Duration", 1, 2).cast("int") * 60 * 60
        # characters 4-5, scaled by 60
        + F.substring("Duration", 4, 2).cast("int") * 60
        # characters 7-8, taken as-is
        + F.substring("Duration", 7, 2).cast("int"),
    )
)

In [6]:
# Read the LogIdentifier reference table (pipe-delimited, header row, inferred types).
log_identifier = (
    spark.read
    .options(sep="|", header=True, inferSchema=True)
    .csv(os.path.join(DIRECTORY, "ReferenceTables/LogIdentifier.csv"))
)

In [7]:
# Keep only the rows whose PrimaryFG column equals 1.
log_identifier = log_identifier.where(log_identifier["PrimaryFG"] == 1)

In [8]:
# Inner-join the logs with the filtered identifier table on LogServiceID.
logs_and_channels = logs.join(log_identifier, "LogServiceID", "inner")

In [9]:
# Join `logs` to `log_identifier` with an explicit equality predicate on
# LogServiceID rather than a column name. No `how` is passed, so the join type
# is PySpark's default (inner according to the DataFrame.join documentation —
# confirm for the installed version).
logs_and_channels_verbose = (
    # Both sides are aliased so the two LogServiceID columns can be told apart.
    logs.alias("left")
    .join(
        log_identifier.alias("right"),
        on=logs["LogServiceID"] == log_identifier["LogServiceID"],
    )
    # Remove the LogServiceID copy addressed through the "right" alias.
    .drop(F.col("right.LogServiceID"))
)

In [10]:
# Read the CD_Category reference table and keep its two key columns plus the
# English description, renamed to Category_Description.
cd_category = (
    spark.read
    .options(sep="|", header=True, inferSchema=True)
    .csv(os.path.join(DIRECTORY, "ReferenceTables/CD_Category.csv"))
    .select(
        "CategoryID",
        "CategoryCD",
        F.col("EnglishDescription").alias("Category_Description"),
    )
)

In [11]:
# Read the CD_ProgramClass reference table and keep its two key columns plus
# the English description, renamed to ProgramClass_Description.
cd_program_class = (
    spark.read
    .options(sep="|", header=True, inferSchema=True)
    .csv(os.path.join(DIRECTORY, "ReferenceTables/CD_ProgramClass.csv"))
    .select(
        "ProgramClassID",
        "ProgramClassCD",
        F.col("EnglishDescription").alias("ProgramClass_Description"),
    )
)

In [21]:
# Left-join both code tables onto the channel-enriched logs so every log row
# is kept even when it has no matching category or program class.
full_log = (
    logs_and_channels
    .join(cd_category, on="CategoryID", how="left")
    .join(cd_program_class, on="ProgramClassID", how="left")
)

In [17]:
# Print the first 10 rows of the joined table in 4 column groups.
split_table(full_log, sections=4, records=10)

+--------------+----------+------------+----------+-------------------+----------------------+---------------+-----------------+----------------+
|ProgramClassID|CategoryID|LogServiceID|LogDate   |AudienceTargetAgeID|AudienceTargetEthnicID|ClosedCaptionID|CountryOfOriginID|DubDramaCreditID|
+--------------+----------+------------+----------+-------------------+----------------------+---------------+-----------------+----------------+
|19            |13        |3157        |2018-08-01|4                  |NULL                  |3              |3                |NULL            |
|20            |NULL      |3157        |2018-08-01|NULL               |NULL                  |1              |NULL             |NULL            |
|3             |NULL      |3157        |2018-08-01|NULL               |NULL                  |1              |NULL             |NULL            |
|3             |NULL      |3157        |2018-08-01|NULL               |NULL                  |1              |NULL          