#### Loading

In [3]:
import os

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

In [4]:
# Attach to the active Spark session, or create one if none exists yet.
spark = (
    SparkSession
    .builder
    .getOrCreate()
)

In [2]:
# Location of the sampled broadcast-log CSV loaded below.
broadcast_sample = (
    "/sparkdata/DataAnalysisWithPythonAndPySpark-Data/broadcast_logs/"
    "BroadcastLogs_2018_Q3_M8_sample.CSV"
)

In [124]:
# Read the pipe-delimited sample file, letting Spark infer the column types.
raw_logs = spark.read.csv(
    broadcast_sample,
    sep="|",
    header=True,
    inferSchema=True,
    timestampFormat="yyyy-MM-dd",
)

# Duration is read as three 2-character fields starting at positions 1, 4 and 7
# (hours, minutes, seconds -- presumably an "HH:MM:SS" string; TODO confirm).
duration_hours = F.col("Duration").substr(1, 2).cast("int")
duration_minutes = F.col("Duration").substr(4, 2).cast("int")
duration_secs = F.col("Duration").substr(7, 2).cast("int")

# Drop the two identifier columns and add the duration expressed in seconds.
logs = raw_logs.drop("BroadcastLogID", "SequenceNO").withColumn(
    "duration_seconds",
    duration_hours * 60 * 60 + duration_minutes * 60 + duration_secs,
)

In [125]:
# Directory that holds the reference (lookup) tables.
log_id_path = (
    "/sparkdata/DataAnalysisWithPythonAndPySpark-Data/broadcast_logs/"
    "ReferenceTables"
)

In [126]:
# Load the channel-identifier reference table (pipe-delimited, with header).
log_identifier = spark.read.csv(
    f"{log_id_path}/LogIdentifier.csv",
    header=True,
    inferSchema=True,
    sep="|",
)

# Show the inferred column names and types.
log_identifier.printSchema()

root
 |-- LogIdentifierID: string (nullable = true)
 |-- LogServiceID: integer (nullable = true)
 |-- PrimaryFG: integer (nullable = true)



In [127]:
# Keep only the identifiers whose PrimaryFG flag equals 1, then report how many remain.
is_primary = F.col("PrimaryFG") == 1
log_identifier = log_identifier.filter(is_primary)
print(log_identifier.count())

758


In [128]:
# Peek at the first five filtered identifiers.
log_identifier.show(n=5)

+---------------+------------+---------+
|LogIdentifierID|LogServiceID|PrimaryFG|
+---------------+------------+---------+
|           13ST|        3157|        1|
|         2000SM|        3466|        1|
|           70SM|        3883|        1|
|           80SM|        3590|        1|
|           90SM|        3470|        1|
+---------------+------------+---------+
only showing top 5 rows



#### Join operation
* Two tables, left and right
* One or more _predicates_, series of conditions to determine how the records between tables are joined
* A _method_ to indicate how we perform the join

In [None]:
# Bare-bones join recipe (template only: replace each bracketed placeholder).
[LEFT].join(
    [RIGHT],
    on=[PREDICATES],
    how=[METHOD],
)

#### <b>predicates</b>
* If one record in the left table resolves the predicate with more than one record
in the right table (or vice versa), this record will be duplicated in the joined
table.
* If one record in the left or right table does not resolve the predicate with any
record in the other table, it will not be present in the resulting table unless the
join method specifies a protocol for failed predicates.

We can use more than one test in our predicate. Multiple conditions can be separated by Boolean operators such as | (or) and & (and).

In [None]:
# Examples of predicates that combine several conditions. Illustrative only:
# "left_col"/"right_col" and the `left`/`right` data frames are placeholders.
(logs["LogServiceID"] == log_identifier["LogServiceID"]) & (logs["left_col"] < log_identifier["right_col"])

(logs["LogServiceID"] == log_identifier["LogServiceID"]) | (logs["left_col"] > log_identifier["right_col"])

# If we have multiple AND predicates...
(left["col1"] == right["colA"]) & (left["col2"] > right["colB"]) & (left["col3"] != right["colC"])

# ...we can put them in a list instead:
[left["col1"] == right["colA"], left["col2"] > right["colB"], left["col3"] != right["colC"]]

#### <b>join methods</b>
A join method boils down to these two questions:
* What happens when the return value of the predicates is True?
* What happens when the return value of the predicates is False?

In [29]:
# Inner join: keep only the log records whose LogServiceID also appears in log_identifier.
logs_and_channels = logs.join(log_identifier, "LogServiceID", "inner")

#### Cross Join
returns a record for every record pair, no matter what the predicates return. Useful when we want a table that contains every single combination.

#### Naming Conventions in joining

PySpark happily joins the two data frames but fails when we try to work with the
ambiguous column.


In [129]:
# Simplified syntax: join on a column name shared by both tables.
logs_and_channels = logs.join(log_identifier, "LogServiceID") # keeps only first referred column

# Explicit predicate syntax: the predicate refers to each table's own
# LogServiceID column. PySpark-joined data frames remember the origin of
# their columns.
logs_and_channels_verbose = logs.join(
    log_identifier, logs["LogServiceID"] == log_identifier["LogServiceID"]
)

# Same join, but with each table aliased ("left" / "right") so that a column
# can later be addressed by its qualified name, e.g. "right.LogServiceID".
logs_and_channels_verbose = logs.alias("left").join(
    log_identifier.alias("right"),
    logs["LogServiceID"] == log_identifier["LogServiceID"],
)

# Drop the right-hand copy of the key; selecting "LogServiceID" is then unambiguous.
logs_and_channels_verbose.drop(F.col("right.LogServiceID")).select(
    "LogServiceID"
)

DataFrame[LogServiceID: int]

In [32]:
# Root directory of the broadcast-log data set.
DIRECTORY = (
    "/sparkdata/DataAnalysisWithPythonAndPySpark-Data/"
    "broadcast_logs"
)

In [130]:
def _read_reference_table(relative_path, id_column, code_column, description_alias):
    """Load one pipe-delimited reference table located under DIRECTORY.

    Only three columns are kept: ``id_column``, ``code_column`` and the
    ``EnglishDescription`` column, renamed to ``description_alias``.
    """
    return spark.read.csv(
        os.path.join(DIRECTORY, relative_path),
        sep="|",
        header=True,
        inferSchema=True,
    ).select(
        id_column,
        code_column,
        F.col("EnglishDescription").alias(description_alias),
    )


cd_category = _read_reference_table(
    "ReferenceTables/CD_Category.csv",
    "CategoryID",
    "CategoryCD",
    "Category_Description",
)

cd_program_class = _read_reference_table(
    "ReferenceTables/CD_ProgramClass.csv",
    "ProgramClassID",
    "ProgramClassCD",
    "ProgramClass_Description",
)

# Left joins: every log record is kept, even when its CategoryID or
# ProgramClassID has no match in the corresponding reference table.
full_log = logs_and_channels.join(cd_category, "CategoryID", how="left").join(
    cd_program_class, "ProgramClassID", how="left"
)

#### Summarizing data via groupby and GroupedData object

In [37]:
# Print the schema (column names, types, nullability) of the joined log table.
full_log.printSchema()

root
 |-- ProgramClassID: integer (nullable = true)
 |-- CategoryID: integer (nullable = true)
 |-- LogServiceID: integer (nullable = true)
 |-- LogDate: date (nullable = true)
 |-- AudienceTargetAgeID: integer (nullable = true)
 |-- AudienceTargetEthnicID: integer (nullable = true)
 |-- ClosedCaptionID: integer (nullable = true)
 |-- CountryOfOriginID: integer (nullable = true)
 |-- DubDramaCreditID: integer (nullable = true)
 |-- EthnicProgramID: integer (nullable = true)
 |-- ProductionSourceID: integer (nullable = true)
 |-- FilmClassificationID: integer (nullable = true)
 |-- ExhibitionID: integer (nullable = true)
 |-- Duration: string (nullable = true)
 |-- EndTime: string (nullable = true)
 |-- LogEntryDate: date (nullable = true)
 |-- ProductionNO: string (nullable = true)
 |-- ProgramTitle: string (nullable = true)
 |-- StartTime: string (nullable = true)
 |-- Subtitle: string (nullable = true)
 |-- NetworkAffiliationID: integer (nullable = true)
 |-- SpecialAttentionID: inte

In [131]:
# Total duration (in seconds) per program class, largest first.
duration_by_class = full_log.groupby(
    "ProgramClassCD", "ProgramClass_Description"
).agg(F.sum("duration_seconds").alias("duration_total"))

duration_by_class.orderBy("duration_total", ascending=False).show(
    n=100, truncate=False
)

+--------------+--------------------------------------+--------------+
|ProgramClassCD|ProgramClass_Description              |duration_total|
+--------------+--------------------------------------+--------------+
|PGR           |PROGRAM                               |20992510      |
|COM           |COMMERCIAL MESSAGE                    |3519163       |
|PFS           |PROGRAM FIRST SEGMENT                 |1344762       |
|SEG           |SEGMENT OF A PROGRAM                  |1205998       |
|PRC           |PROMOTION OF UPCOMING CANADIAN PROGRAM|880600        |
|PGI           |PROGRAM INFOMERCIAL                   |679182        |
|PRO           |PROMOTION OF NON-CANADIAN PROGRAM     |335701        |
|OFF           |SCHEDULED OFF AIR TIME PERIOD         |142279        |
|ID            |NETWORK IDENTIFICATION MESSAGE        |74926         |
|NRN           |No recognized nationality             |59686         |
|MAG           |MAGAZINE PROGRAM                      |57622         |
|PSA  

Once a dataframe is grouped by, it is not a data frame anymore; instead, it becomes a GroupedData object. This object is a transitional object: we can't really inspect it (there is no .show() method), and it’s waiting for further instructions to become showable again. We have the key (or keys, if we groupby() multiple columns), and the rest of the columns are grouped inside some “cell” awaiting a summary function so that they can be promoted to a bona fide column again.

#### agg()
takes one or more aggregate functions from the pyspark.sql.functions module. It also accepts a dictionary of the form {column_name: aggregation_function}. We can also use groupby() + apply() or applyInPandas() (the split-apply-combine pattern).

In [None]:
# Same summary as before, this time passing agg() a {column: function} dict.
summed_by_class = full_log.groupby(
    "ProgramClassCD", "ProgramClass_Description"
).agg({"duration_seconds": "sum"})

# The dict form names the result "sum(duration_seconds)"; rename it before sorting.
renamed_by_class = summed_by_class.withColumnRenamed(
    "sum(duration_seconds)", "duration_total"
)

renamed_by_class.orderBy("duration_total", ascending=False).show(
    n=100, truncate=False
)

#### Custom columns definitions


In [45]:
# Read as: WHEN the value of ProgramClassCD, TRIMMED of leading and trailing
# spaces, IS IN our list of commercial codes, take the value of
# duration_seconds; OTHERWISE use zero.
is_commercial = F.trim(F.col("ProgramClassCD")).isin(
    "COM", "PRC", "PGI", "PRO", "PSA", "MAG", "LOC", "SPO", "MER", "SOL"
)

F.when(is_commercial, F.col("duration_seconds")).otherwise(0)

Column<'CASE WHEN (trim(ProgramClassCD) IN (COM, PRC, PGI, PRO, PSA, MAG, LOC, SPO, MER, SOL)) THEN duration_seconds ELSE 0 END'>

We can also chain multiple _when_ functions:


In [None]:
# Template only (not runnable): replace each bracketed placeholder with a real expression.
(
F.when([BOOLEAN TEST], [RESULT IF TRUE])
    .when([ANOTHER BOOLEAN TEST], [RESULT IF TRUE])
    .otherwise([DEFAULT RESULT, WILL DEFAULT TO null IF OMITTED])
)

In [132]:
# Seconds that count as commercial: duration_seconds for the listed program
# classes, zero for everything else.
commercial_codes = ["COM", "PRC", "PGI", "PRO", "LOC", "SPO", "MER", "SOL"]
commercial_seconds = F.when(
    F.trim(F.col("ProgramClassCD")).isin(commercial_codes),
    F.col("duration_seconds"),
).otherwise(0)

# Per-channel totals of commercial and overall air time.
per_channel = full_log.groupby("LogIdentifierID").agg(
    F.sum(commercial_seconds).alias("duration_commercial"),
    F.sum("duration_seconds").alias("duration_total"),
)

# Share of air time that is commercial, channels with the most air time first.
answer = per_channel.withColumn(
    "commercial_ratio",
    F.col("duration_commercial") / F.col("duration_total"),
).orderBy("duration_total", ascending=False)

In [133]:
# Show ten rows, truncating every cell to at most seven characters.
answer.show(n=10, truncate=7)

+---------------+-------------------+--------------+----------------+
|LogIdentifierID|duration_commercial|duration_total|commercial_ratio|
+---------------+-------------------+--------------+----------------+
|           CKAL|              26548|        113368|         0.23...|
|           CKVU|              26647|        113367|         0.23...|
|           CKEM|              26565|        113308|         0.23...|
|           CHMI|              26627|        113305|         0.23...|
|           CJNT|              26130|        112904|         0.23...|
|            DIY|              26147|        112587|         0.23...|
|           CITY|              28987|        112061|         0.25...|
|           COOK|              25161|        111561|         0.22...|
|            WFN|              24600|        111498|         0.22...|
|           FASH|              24800|        111320|         0.22...|
+---------------+-------------------+--------------+----------------+
only showing top 10 

#### Null-values: drop and fill
We can either _dropna()_, or _fillna()_ them.

* _dropna()_: takes three parameters:
    * _how_: takes _any_ or _all_ - if _any_ is selected, PySpark will drop records where at least one of the fields is null. In the case of _all_, only records where all fields are null are dropped.
    * _thresh_: defaults to _None_. If set, PySpark will ignore _how_ and drop records with fewer than _thresh_ non-null values.
    * _subset_: optional list of columns.

In [67]:
# Discard the channels whose commercial_ratio is null.
answer_no_null = answer.na.drop(subset=["commercial_ratio"])

In [68]:
# Highest commercial ratio first; print up to 1000 rows without truncating cells.
ranked_by_ratio = answer_no_null.orderBy("commercial_ratio", ascending=False)
ranked_by_ratio.show(n=1000, truncate=False)

+---------------+-------------------+--------------+---------------------+
|LogIdentifierID|duration_commercial|duration_total|commercial_ratio     |
+---------------+-------------------+--------------+---------------------+
|CIMT           |775                |775           |1.0                  |
|TLNSP          |15480              |15480         |1.0                  |
|TELENO         |17790              |17790         |1.0                  |
|MSET           |2700               |2700          |1.0                  |
|HPITV          |13                 |13            |1.0                  |
|TANG           |8125               |8125          |1.0                  |
|MMAX           |23333              |23582         |0.9894410991434145   |
|MPLU           |20587              |20912         |0.9844586840091814   |
|INVST          |20094              |20470         |0.9816316560820714   |
|ZT�L�          |21542              |21965         |0.9807420896881403   |
|RAPT           |17916   

#### _fillna()_
takes two parameters:
 * _value_: PySpark only fills compatible columns
 * _subset_: same parameter seen in _dropna_

In [69]:
# Replace nulls with 0 in every column that is compatible with that value.
answer_no_null = answer.na.fill(0)

Using dicts

In [70]:
# Fill the numerical columns with zero by handing fillna() a
# {column: replacement} dict. Chaining several fillna() calls, one per
# column, achieves the same result.
zero_by_column = dict.fromkeys(
    ["duration_commercial", "duration_total", "commercial_ratio"], 0
)
answer_no_null = answer.fillna(zero_by_column)

#### Exercises

In [137]:
# Commercial seconds: duration_seconds for the listed program classes, else 0.
commercial_duration = F.when(
    F.trim(F.col("ProgramClassCD")).isin(
        "COM", "PRC", "PGI", "PRO", "LOC", "SPO", "MER", "SOL"
    ),
    F.col("duration_seconds"),
).otherwise(0)

# Per-channel totals, commercial ratio, and nulls replaced by 0.
channel_totals = full_log.groupby("LogIdentifierID").agg(
    F.sum(commercial_duration).alias("duration_commercial"),
    F.sum("duration_seconds").alias("duration_total"),
)
channel_ratios = channel_totals.withColumn(
    "commercial_ratio",
    F.col("duration_commercial") / F.col("duration_total"),
)
answer = channel_ratios.fillna(0)

Write PySpark code that will return the result of the following code block without
using a left anti-join:


In [None]:
# Exercise prompt: distinct my_column values of `left` that have no match in `right`.
left.join(
    right,
    on="my_column",
    how="left_anti",
).select("my_column").distinct()

In [None]:
# Left-join equivalent of the left anti-join: after a left join, the rows of
# `left` without a match carry a null in right["my_column"]. Keep only those
# rows, project the key, and de-duplicate to mirror the prompt's .distinct().
left.join(
    right,
    how="left",
    on=left["my_column"] == right["my_column"],
).where(right["my_column"].isnull()).select(left["my_column"]).distinct()

Using the data from the data/broadcast_logs/Call_Signs.csv (careful: the delimiter
here is the comma, not the pipe!), add the Undertaking_Name to our final table to
display a human-readable description of the channel.

In [77]:
# Call_Signs.csv is comma-delimited, unlike the pipe-delimited reference tables.
call_signs_path = os.path.join(DIRECTORY, "Call_Signs.csv")
call_signs = spark.read.csv(
    call_signs_path,
    sep=",",
    inferSchema=True,
    header=True,
)

In [78]:
# Print the schema (column names, types, nullability) of the call-sign table.
call_signs.printSchema()

root
 |-- LogIdentifierID: string (nullable = true)
 |-- UndertakingNO: integer (nullable = true)
 |-- Undertaking_Name: string (nullable = true)



In [138]:
# Attach the human-readable channel name to each row of the answer table.
# NOTE(review): no `how` is given, so PySpark's default join type applies;
# confirm that dropping channels without a call-sign entry is intended.
channel_names = call_signs.select("LogIdentifierID", "Undertaking_Name")
answer_name = answer.alias("left").join(
    channel_names.alias("right"), "LogIdentifierID"
)

The government of Canada is asking for your analysis, but they’d like the PRC to be
weighted differently. They’d like each PRC second to be considered 0.75 commercial
seconds. Modify the program to account for this change.

In [139]:
# Weighted commercial seconds: full weight for the listed commercial classes,
# 0.75 per second for PRC, zero for every other program class.
trimmed_class = F.trim(F.col("ProgramClassCD"))
full_weight_codes = ["COM", "PGI", "PRO", "LOC", "SPO", "MER", "SOL"]

PRC_vs_Commercial = (
    F.when(trimmed_class.isin(full_weight_codes), F.col("duration_seconds"))
    .when(trimmed_class == "PRC", F.col("duration_seconds") * 0.75)
    .otherwise(0)
)

In [140]:
# Per-channel totals using the PRC-weighted commercial duration.
weighted_totals = full_log.groupby("LogIdentifierID").agg(
    F.sum(PRC_vs_Commercial).alias("duration_commercial"),
    F.sum("duration_seconds").alias("duration_total"),
)

exo5_6_df = weighted_totals.withColumn(
    "commercial_ratio",
    F.col("duration_commercial") / F.col("duration_total"),
)

# Five channels with the highest weighted commercial ratio, cells untruncated.
exo5_6_df.orderBy("commercial_ratio", ascending=False).show(n=5, truncate=False)

+---------------+-------------------+--------------+------------------+
|LogIdentifierID|duration_commercial|duration_total|commercial_ratio  |
+---------------+-------------------+--------------+------------------+
|CIMT           |775.0              |775           |1.0               |
|HPITV          |13.0               |13            |1.0               |
|MSET           |2651.25            |2700          |0.9819444444444444|
|TELENO         |17291.25           |17790         |0.971964586846543 |
|TLNSP          |14872.5            |15480         |0.9607558139534884|
+---------------+-------------------+--------------+------------------+
only showing top 5 rows



On the data frame returned from commercials.py, return the number of channels in
each bucket based on their commercial_ratio. (Hint: look at the documentation for
round on how to round a value.)

In [142]:
# Rebuild the joined log table for this exercise; these steps mirror the
# filtering and joins performed in earlier cells.
log_identifier = log_identifier.filter(F.col("PrimaryFG") == 1)

logs_and_channels = logs.join(log_identifier, on="LogServiceID")

logs_with_category = logs_and_channels.join(cd_category, "CategoryID", how="left")
full_log = logs_with_category.join(cd_program_class, "ProgramClassID", how="left")


In [145]:
# Commercial seconds: duration_seconds for the listed program classes, else 0.
commercial_class_codes = ["COM", "PRC", "PGI", "PRO", "LOC", "SPO", "MER", "SOL"]
commercial_part = F.when(
    F.trim(F.col("ProgramClassCD")).isin(commercial_class_codes),
    F.col("duration_seconds"),
).otherwise(0)

# Per-channel totals and commercial ratio, highest ratio first.
result = (
    full_log.groupby("LogIdentifierID")
    .agg(
        F.sum(commercial_part).alias("duration_commercial"),
        F.sum("duration_seconds").alias("duration_total"),
    )
    .withColumn(
        "commercial_ratio",
        F.col("duration_commercial") / F.col("duration_total"),
    )
    .orderBy("commercial_ratio", ascending=False)
)

In [146]:
# Display the first rows with show()'s default settings spelled out.
result.show(n=20, truncate=True)

+---------------+-------------------+--------------+------------------+
|LogIdentifierID|duration_commercial|duration_total|  commercial_ratio|
+---------------+-------------------+--------------+------------------+
|           CIMT|                775|           775|               1.0|
|          TLNSP|              15480|         15480|               1.0|
|         TELENO|              17790|         17790|               1.0|
|           MSET|               2700|          2700|               1.0|
|          HPITV|                 13|            13|               1.0|
|           TANG|               8125|          8125|               1.0|
|           MMAX|              23333|         23582|0.9894410991434145|
|           MPLU|              20587|         20912|0.9844586840091814|
|          INVST|              20094|         20470|0.9816316560820714|
|          ZT�L�|              21542|         21965|0.9807420896881403|
|           RAPT|              17916|         18279|0.9801411455

In [150]:
# Bucket each channel by its commercial_ratio rounded to one decimal place,
# count the channels per bucket, and list the five highest buckets.
ratio_bucket = F.round(F.col("commercial_ratio"), 1).alias("commercial_ratio")

channels_per_bucket = result.groupby(ratio_bucket).agg(
    F.count("*").alias("number_of_channels")
)

channels_per_bucket.orderBy("commercial_ratio", ascending=False).show(5)

+----------------+------------------+
|commercial_ratio|number_of_channels|
+----------------+------------------+
|             1.0|                19|
|             0.9|                 5|
|             0.8|                 5|
|             0.5|                 2|
|             0.4|                 3|
+----------------+------------------+
only showing top 5 rows

