# 5 Data frame gymnastics: Joining and grouping

In [1]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

# Create the SparkSession for this chapter (getOrCreate() returns an existing
# session if one is already running), using every local core, then raise
# Spark's console log threshold to WARN so only warnings and errors appear.

spark = (
    SparkSession.builder
    .appName("chapter_5")
    .master("local[*]")
    .getOrCreate()
)

spark.sparkContext.setLogLevel("WARN")

# Path: DataAnalysisWithPythonAndPySpark/code/Ch05/chapter_5.ipynb

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/07/19 08:08:11 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [4]:
import os
DIRECTORY = "../../../DataAnalysisWithPythonAndPySpark-Data/broadcast_logs"

# Duration is parsed by fixed character position (assumed "HH:MM:SS..."):
# characters 1-2 are hours, 4-5 minutes and 7-8 seconds (substr is 1-based).
duration_hours = F.col("Duration").substr(1, 2).cast("int")
duration_minutes = F.col("Duration").substr(4, 2).cast("int")
duration_secs = F.col("Duration").substr(7, 2).cast("int")

logs = spark.read.csv(
    os.path.join(DIRECTORY, "BroadcastLogs_2018_Q3_M8.CSV"),
    sep="|",
    header=True,
    inferSchema=True,
    timestampFormat="yyyy-MM-dd",
)
# Remove the log/sequence identifier columns and add the total duration.
logs = logs.drop("BroadcastLogID", "SequenceNO").withColumn(
    "duration_seconds",
    duration_hours * 60 * 60 + duration_minutes * 60 + duration_secs,
)


                                                                                

In [7]:
logs.show(5)

23/07/07 08:08:23 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


+------------+----------+-------------------+----------------------+----------+---------------+-----------------+----------------+---------------+------------------+--------------+--------------------+------------+----------------+----------------+------------+------------+--------------------+----------------+--------+--------------------+------------------+----------------------+-------------+---------+---------+---------+---------+----------------+
|LogServiceID|   LogDate|AudienceTargetAgeID|AudienceTargetEthnicID|CategoryID|ClosedCaptionID|CountryOfOriginID|DubDramaCreditID|EthnicProgramID|ProductionSourceID|ProgramClassID|FilmClassificationID|ExhibitionID|        Duration|         EndTime|LogEntryDate|ProductionNO|        ProgramTitle|       StartTime|Subtitle|NetworkAffiliationID|SpecialAttentionID|BroadcastOriginPointID|CompositionID|Producer1|Producer2|Language1|Language2|duration_seconds|
+------------+----------+-------------------+----------------------+----------+---------

In [8]:
# Load the channel-identifier reference table (pipe-delimited, with header).
log_identifier_path = os.path.join(DIRECTORY, "ReferenceTables/LogIdentifier.csv")
log_identifier = spark.read.csv(
    log_identifier_path, sep="|", header=True, inferSchema=True
)

log_identifier.show(5)
log_identifier.printSchema()

+---------------+------------+---------+
|LogIdentifierID|LogServiceID|PrimaryFG|
+---------------+------------+---------+
|           13ST|        3157|        1|
|         2000SM|        3466|        1|
|           70SM|        3883|        1|
|           80SM|        3590|        1|
|           90SM|        3470|        1|
+---------------+------------+---------+
only showing top 5 rows

root
 |-- LogIdentifierID: string (nullable = true)
 |-- LogServiceID: integer (nullable = true)
 |-- PrimaryFG: integer (nullable = true)



In [9]:
# Keep only the rows flagged with PrimaryFG == 1, then report how many remain.
primary_only = F.col("PrimaryFG") == 1
log_identifier = log_identifier.where(primary_only)
print(log_identifier.count())

log_identifier.show(5)

758
+---------------+------------+---------+
|LogIdentifierID|LogServiceID|PrimaryFG|
+---------------+------------+---------+
|           13ST|        3157|        1|
|         2000SM|        3466|        1|
|           70SM|        3883|        1|
|           80SM|        3590|        1|
|           90SM|        3470|        1|
+---------------+------------+---------+
only showing top 5 rows



In [10]:
# Inner join: only logs whose LogServiceID exists in log_identifier are kept.
logs.join(log_identifier, "LogServiceID", "inner").show(5)

+------------+----------+-------------------+----------------------+----------+---------------+-----------------+----------------+---------------+------------------+--------------+--------------------+------------+----------------+----------------+------------+------------+--------------------+----------------+--------+--------------------+------------------+----------------------+-------------+---------+---------+---------+---------+----------------+---------------+---------+
|LogServiceID|   LogDate|AudienceTargetAgeID|AudienceTargetEthnicID|CategoryID|ClosedCaptionID|CountryOfOriginID|DubDramaCreditID|EthnicProgramID|ProductionSourceID|ProgramClassID|FilmClassificationID|ExhibitionID|        Duration|         EndTime|LogEntryDate|ProductionNO|        ProgramTitle|       StartTime|Subtitle|NetworkAffiliationID|SpecialAttentionID|BroadcastOriginPointID|CompositionID|Producer1|Producer2|Language1|Language2|duration_seconds|LogIdentifierID|PrimaryFG|
+------------+----------+-----------

A left (also called left outer) join keeps every record from the left table: records with no match in the right table are still added to the joined table, with the columns coming from the right table filled with null.

In [7]:
# Left (outer) join: every log is kept; unmatched rows get nulls on the right.
logs.join(other=log_identifier, on="LogServiceID", how="left_outer").show(n=5)

+------------+----------+-------------------+----------------------+----------+---------------+-----------------+----------------+---------------+------------------+--------------+--------------------+------------+----------------+----------------+------------+------------+--------------------+----------------+--------+--------------------+------------------+----------------------+-------------+---------+---------+---------+---------+----------------+---------------+---------+
|LogServiceID|   LogDate|AudienceTargetAgeID|AudienceTargetEthnicID|CategoryID|ClosedCaptionID|CountryOfOriginID|DubDramaCreditID|EthnicProgramID|ProductionSourceID|ProgramClassID|FilmClassificationID|ExhibitionID|        Duration|         EndTime|LogEntryDate|ProductionNO|        ProgramTitle|       StartTime|Subtitle|NetworkAffiliationID|SpecialAttentionID|BroadcastOriginPointID|CompositionID|Producer1|Producer2|Language1|Language2|duration_seconds|LogIdentifierID|PrimaryFG|
+------------+----------+-----------

A left semi-join (how="left_semi") behaves like an inner join, but keeps only the columns of the left table. It also won't duplicate the records in the left table if they fulfill the predicate with more than one record in the right table. Its main purpose is to filter records from a table based on a predicate that depends on another table.

In [8]:
# Left semi join: filter logs to those with a match, keeping only log columns.
logs.join(log_identifier, "LogServiceID", "left_semi").show(n=5)

+------------+----------+-------------------+----------------------+----------+---------------+-----------------+----------------+---------------+------------------+--------------+--------------------+------------+----------------+----------------+------------+------------+--------------------+----------------+--------+--------------------+------------------+----------------------+-------------+---------+---------+---------+---------+----------------+
|LogServiceID|   LogDate|AudienceTargetAgeID|AudienceTargetEthnicID|CategoryID|ClosedCaptionID|CountryOfOriginID|DubDramaCreditID|EthnicProgramID|ProductionSourceID|ProgramClassID|FilmClassificationID|ExhibitionID|        Duration|         EndTime|LogEntryDate|ProductionNO|        ProgramTitle|       StartTime|Subtitle|NetworkAffiliationID|SpecialAttentionID|BroadcastOriginPointID|CompositionID|Producer1|Producer2|Language1|Language2|duration_seconds|
+------------+----------+-------------------+----------------------+----------+---------

A left anti-join (how="left_anti") is the opposite of an inner join. It will keep only the records from the left table that do not match the predicate with any record in the right table. If a record from the left table matches a record from the right table, it gets dropped from the join operation. Note that the example below puts log_identifier on the left, so it returns the channel identifiers that have no matching record in logs.

In [11]:
# Left anti join with log_identifier on the left: identifiers with no log rows.
log_identifier.join(logs, "LogServiceID", "left_anti").show(n=5)

[Stage 25:>                                                         (0 + 1) / 1]

+------------+---------------+---------+
|LogServiceID|LogIdentifierID|PrimaryFG|
+------------+---------------+---------+
|        3158|         9DAPTN|        1|
|        3159|         9DCFCF|        1|
|        3160|         9DCFRN|        1|
|        3161|         9DCHRO|        1|
|        3162|         9DCIVI|        1|
+------------+---------------+---------+
only showing top 5 rows



                                                                                

A full outer (how="outer", how="full", or how="full_outer") join is simply the fusion of a left and right join. It will add the unmatched records from the left and the right table, padding with null. It serves a similar purpose to the left and right join but is not as popular since you’ll generally have one (and only one) anchor table where you want to preserve all records.

In [11]:
# Inner-join the logs with the filtered channel identifiers; the cells below
# reuse this frame.
logs_and_channels = logs.join(log_identifier, "LogServiceID", "inner")

A cross join (how="cross") is the nuclear option. It returns a record for every record pair, regardless of the value the predicates return. 

5.1.5 Naming conventions in the joining world

In [12]:
# Peek at the first rows of the joined frame.
logs_and_channels.show(n=5)

+------------+----------+-------------------+----------------------+----------+---------------+-----------------+----------------+---------------+------------------+--------------+--------------------+------------+----------------+----------------+------------+------------+--------------------+----------------+--------+--------------------+------------------+----------------------+-------------+---------+---------+---------+---------+----------------+---------------+---------+
|LogServiceID|   LogDate|AudienceTargetAgeID|AudienceTargetEthnicID|CategoryID|ClosedCaptionID|CountryOfOriginID|DubDramaCreditID|EthnicProgramID|ProductionSourceID|ProgramClassID|FilmClassificationID|ExhibitionID|        Duration|         EndTime|LogEntryDate|ProductionNO|        ProgramTitle|       StartTime|Subtitle|NetworkAffiliationID|SpecialAttentionID|BroadcastOriginPointID|CompositionID|Producer1|Producer2|Language1|Language2|duration_seconds|LogIdentifierID|PrimaryFG|
+------------+----------+-----------

In [14]:
# Joining on an explicit equality expression keeps BOTH LogServiceID columns
# (one from each side), so a bare "LogServiceID" reference becomes ambiguous.
join_predicate = logs["LogServiceID"] == log_identifier["LogServiceID"]
logs_and_channels_verbose = logs.join(log_identifier, on=join_predicate)

In [17]:
# Print the schema of the expression-based join; it contains a LogServiceID
# column from each side, which is what makes selecting it by name ambiguous.
logs_and_channels_verbose.printSchema()

root
 |-- LogServiceID: integer (nullable = true)
 |-- LogDate: date (nullable = true)
 |-- AudienceTargetAgeID: integer (nullable = true)
 |-- AudienceTargetEthnicID: integer (nullable = true)
 |-- CategoryID: integer (nullable = true)
 |-- ClosedCaptionID: integer (nullable = true)
 |-- CountryOfOriginID: integer (nullable = true)
 |-- DubDramaCreditID: integer (nullable = true)
 |-- EthnicProgramID: integer (nullable = true)
 |-- ProductionSourceID: integer (nullable = true)
 |-- ProgramClassID: integer (nullable = true)
 |-- FilmClassificationID: integer (nullable = true)
 |-- ExhibitionID: integer (nullable = true)
 |-- Duration: string (nullable = true)
 |-- EndTime: string (nullable = true)
 |-- LogEntryDate: date (nullable = true)
 |-- ProductionNO: string (nullable = true)
 |-- ProgramTitle: string (nullable = true)
 |-- StartTime: string (nullable = true)
 |-- Subtitle: string (nullable = true)
 |-- NetworkAffiliationID: integer (nullable = true)
 |-- SpecialAttentionID: inte

In [22]:
from pyspark.sql.utils import AnalysisException

# Selecting the duplicated join column by bare name is expected to fail with an
# AMBIGUOUS_REFERENCE analysis error; catch it and display the message.
try:
    logs_and_channels_verbose.select("LogServiceID")
except AnalysisException as ambiguous_ref_error:
    print(ambiguous_ref_error)

[AMBIGUOUS_REFERENCE] Reference `LogServiceID` is ambiguous, could be: [`LogServiceID`, `LogServiceID`].


First, when performing an equi-join, I prefer using the simplified syntax, since it takes care of removing the second instance of the predicate column. This only works for equality comparisons: because both predicate columns hold identical data, keeping just one of them loses no information.

In [13]:
# Passing the column name (instead of an expression) performs an equi-join that
# keeps a single LogServiceID column in the result.
logs_and_channels = logs.join(log_identifier, on="LogServiceID")
logs_and_channels.printSchema()

root
 |-- LogServiceID: integer (nullable = true)
 |-- LogDate: date (nullable = true)
 |-- AudienceTargetAgeID: integer (nullable = true)
 |-- AudienceTargetEthnicID: integer (nullable = true)
 |-- CategoryID: integer (nullable = true)
 |-- ClosedCaptionID: integer (nullable = true)
 |-- CountryOfOriginID: integer (nullable = true)
 |-- DubDramaCreditID: integer (nullable = true)
 |-- EthnicProgramID: integer (nullable = true)
 |-- ProductionSourceID: integer (nullable = true)
 |-- ProgramClassID: integer (nullable = true)
 |-- FilmClassificationID: integer (nullable = true)
 |-- ExhibitionID: integer (nullable = true)
 |-- Duration: string (nullable = true)
 |-- EndTime: string (nullable = true)
 |-- LogEntryDate: date (nullable = true)
 |-- ProductionNO: string (nullable = true)
 |-- ProgramTitle: string (nullable = true)
 |-- StartTime: string (nullable = true)
 |-- Subtitle: string (nullable = true)
 |-- NetworkAffiliationID: integer (nullable = true)
 |-- SpecialAttentionID: inte

The second approach relies on the fact that PySpark-joined data frames remember the origin of the columns. Because of this, we can refer to the LogServiceID columns using the same nomenclature as before (i.e., log_identifier["LogServiceID"]). We can then rename this column or delete it, and thus solve our issue. 

In [25]:
logs_and_channels_verbose = logs.join(
    log_identifier,
    on=(logs["LogServiceID"] == log_identifier["LogServiceID"]),
)

# The joined frame remembers which side each column came from, so the
# right-hand copy can be dropped and the remaining one selected unambiguously.
right_service_id = log_identifier["LogServiceID"]
logs_and_channels_verbose.drop(right_service_id).select(["LogServiceID"])

DataFrame[LogServiceID: int]

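The previous approach is convenient when you reference columns through their DataFrame, using Column objects such as log_identifier["LogServiceID"]. PySpark, however, will not resolve the origin of a column when you rely on F.col() to work with columns. To solve this in the most general way, we alias() our tables when performing the join and then qualify column names with the alias (e.g., F.col("right.LogServiceID")), as shown in the following listing.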

In [28]:
# Alias both sides of the join so that every column can be addressed as
# "<alias>.<column>" through F.col(), which does not track which DataFrame a
# column came from. The predicate uses the aliases too, so nothing in this cell
# depends on DataFrame-bound column references such as logs["LogServiceID"].
logs_and_channels_verbose = logs.alias("left").join(
    log_identifier.alias("right"),
    F.col("left.LogServiceID") == F.col("right.LogServiceID"),
)

# Drop the right-hand copy of the key; the remaining LogServiceID is unambiguous.
logs_and_channels_verbose.drop(F.col("right.LogServiceID")).select(
    "LogServiceID"
)

DataFrame[LogServiceID: int]

In [14]:
DIRECTORY = '../../../DataAnalysisWithPythonAndPySpark-Data/broadcast_logs'

# Category lookup: CategoryID -> code and English description.
cd_category = spark.read.csv(
    os.path.join(DIRECTORY, "ReferenceTables/CD_Category.csv"),
    sep="|",
    header=True,
    inferSchema=True,
).select(
    "CategoryID",
    "CategoryCD",
    F.col("EnglishDescription").alias("Category_Description"),
)

# DataFrame.show() prints the rows itself and returns None, so it is called
# directly; wrapping it in print() would also print a stray "None".
cd_category.show(3)

# Program-class lookup: ProgramClassID -> code and English description.
cd_program_class = spark.read.csv(
    os.path.join(DIRECTORY, "ReferenceTables/CD_ProgramClass.csv"),
    sep="|",
    header=True,
    inferSchema=True,
).select(
    "ProgramClassID",
    "ProgramClassCD",
    F.col("EnglishDescription").alias("ProgramClass_Description"),
)

cd_program_class.show(3)

# Left joins keep every log row even when a lookup entry is missing.
full_log = logs_and_channels.join(cd_category, on="CategoryID", how="left").join(
    cd_program_class, on="ProgramClassID", how="left"
)

full_log.show(3)

+----------+----------+--------------------+
|CategoryID|CategoryCD|Category_Description|
+----------+----------+--------------------+
|         1|       010|                NEWS|
|         2|       02 |CANREC  ANALYSIS ...|
|         3|       02A|ANALYSIS AND INTE...|
+----------+----------+--------------------+
only showing top 3 rows

None
+--------------+--------------+------------------------+
|ProgramClassID|ProgramClassCD|ProgramClass_Description|
+--------------+--------------+------------------------+
|             1|          AUT |           AUTOPROMOTION|
|             2|          BAL |     BALANCE PROGRAMMING|
|             3|          COM |      COMMERCIAL MESSAGE|
+--------------+--------------+------------------------+
only showing top 3 rows

None
+--------------+----------+------------+----------+-------------------+----------------------+---------------+-----------------+----------------+---------------+------------------+--------------------+------------+------------

In [15]:
# Inspect the fully joined schema: the keys of the two left joins
# (ProgramClassID, CategoryID) come first, followed by the remaining columns.
full_log.printSchema()

root
 |-- ProgramClassID: integer (nullable = true)
 |-- CategoryID: integer (nullable = true)
 |-- LogServiceID: integer (nullable = true)
 |-- LogDate: date (nullable = true)
 |-- AudienceTargetAgeID: integer (nullable = true)
 |-- AudienceTargetEthnicID: integer (nullable = true)
 |-- ClosedCaptionID: integer (nullable = true)
 |-- CountryOfOriginID: integer (nullable = true)
 |-- DubDramaCreditID: integer (nullable = true)
 |-- EthnicProgramID: integer (nullable = true)
 |-- ProductionSourceID: integer (nullable = true)
 |-- FilmClassificationID: integer (nullable = true)
 |-- ExhibitionID: integer (nullable = true)
 |-- Duration: string (nullable = true)
 |-- EndTime: string (nullable = true)
 |-- LogEntryDate: date (nullable = true)
 |-- ProductionNO: string (nullable = true)
 |-- ProgramTitle: string (nullable = true)
 |-- StartTime: string (nullable = true)
 |-- Subtitle: string (nullable = true)
 |-- NetworkAffiliationID: integer (nullable = true)
 |-- SpecialAttentionID: inte

5.2 Summarizing the data via groupby and GroupedData

In [16]:
# Total airtime (in seconds) per program class, largest first.
total_duration_by_class = (
    full_log.groupBy("ProgramClassCD", "ProgramClass_Description")
    .agg(F.sum("duration_seconds").alias("total_duration_seconds"))
    .orderBy(F.col("total_duration_seconds").desc())
)
total_duration_by_class.show(100, truncate=False)



+--------------+--------------------------------------+----------------------+
|ProgramClassCD|ProgramClass_Description              |total_duration_seconds|
+--------------+--------------------------------------+----------------------+
|PGR           |PROGRAM                               |652802250             |
|COM           |COMMERCIAL MESSAGE                    |106810189             |
|PFS           |PROGRAM FIRST SEGMENT                 |38817891              |
|SEG           |SEGMENT OF A PROGRAM                  |34891264              |
|PRC           |PROMOTION OF UPCOMING CANADIAN PROGRAM|27017583              |
|PGI           |PROGRAM INFOMERCIAL                   |23196392              |
|PRO           |PROMOTION OF NON-CANADIAN PROGRAM     |10213461              |
|OFF           |SCHEDULED OFF AIR TIME PERIOD         |4537071               |
|ID            |NETWORK IDENTIFICATION MESSAGE        |2179067               |
|NRN           |No recognized nationality           

                                                                                

AGGREGATING FOR THE LAZY
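agg() also accepts a dictionary in the form {column_name: aggregation_function}, where both are strings. The resulting column is named aggregation_function(column_name), e.g., sum(duration_seconds), so you will usually want to rename it.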

In [40]:
# use agg with a dictionary
(
    full_log.groupBy("ProgramClassCD", "ProgramClass_Description")
    .agg({"duration_seconds": "sum"}).withColumnRenamed("sum(duration_seconds)", "total_duration_seconds")
    .orderBy(F.desc("sum(duration_seconds)"))
    .show(5, truncate=False)
)



+--------------+--------------------------------------+----------------------+
|ProgramClassCD|ProgramClass_Description              |total_duration_seconds|
+--------------+--------------------------------------+----------------------+
|PGR           |PROGRAM                               |652802250             |
|COM           |COMMERCIAL MESSAGE                    |106810189             |
|PFS           |PROGRAM FIRST SEGMENT                 |38817891              |
|SEG           |SEGMENT OF A PROGRAM                  |34891264              |
|PRC           |PROMOTION OF UPCOMING CANADIAN PROGRAM|27017583              |
+--------------+--------------------------------------+----------------------+
only showing top 5 rows



                                                                                

5.2.2 A column is a column: Using agg() with custom column definitions

In [42]:
# Column expression: the row's duration when its trimmed program-class code is
# in the list below, otherwise 0. Displaying it shows the generated CASE WHEN.
# NOTE(review): this list includes "PSA" and "MAG", which the aggregation cells
# that compute commercial_ratio omit — confirm which set of codes is intended.
commercial_codes = ["COM", "PRC", "PGI", "PRO", "PSA", "MAG", "LOC", "SPO", "MER", "SOL"]
is_commercial = F.trim(F.col("ProgramClassCD")).isin(commercial_codes)
F.when(is_commercial, F.col("duration_seconds")).otherwise(0)

Column<'CASE WHEN (trim(ProgramClassCD) IN (COM, PRC, PGI, PRO, PSA, MAG, LOC, SPO, MER, SOL)) THEN duration_seconds ELSE 0 END'>

In [17]:
# Program-class codes whose airtime is counted as commercial time.
commercial_class_codes = ["COM", "PRC", "PGI", "PRO", "LOC", "SPO", "MER", "SOL"]

# Per-row commercial seconds: the duration for commercial classes, else 0.
commercial_seconds = F.when(
    F.trim(F.col("ProgramClassCD")).isin(commercial_class_codes),
    F.col("duration_seconds"),
).otherwise(0)

# Per channel: commercial seconds, total seconds, and their ratio rounded to
# 2 decimals. The ratio can come out null; the next section deals with that.
answer = (
    full_log.groupby("LogIdentifierID")
    .agg(
        F.sum(commercial_seconds).alias("duration_commercial"),
        F.sum("duration_seconds").alias("duration_total"),
    )
    .withColumn(
        "commercial_ratio",
        F.round(F.col("duration_commercial") / F.col("duration_total"), 2),
    )
)

answer.orderBy(F.col("commercial_ratio").desc()).show(1000, False)

                                                                                

+---------------+-------------------+--------------+----------------+
|LogIdentifierID|duration_commercial|duration_total|commercial_ratio|
+---------------+-------------------+--------------+----------------+
|CIMT           |19935              |19935         |1.0             |
|MSET           |101670             |101670        |1.0             |
|TLNSP          |234455             |234455        |1.0             |
|TELENO         |545255             |545255        |1.0             |
|TANG           |271468             |271468        |1.0             |
|HPITV          |403                |403           |1.0             |
|CANALD         |660132             |673746        |0.98            |
|ZT�L�          |669624             |682023        |0.98            |
|CANALVIE       |639073             |655108        |0.98            |
|MMAX           |701438             |719026        |0.98            |
|INVST          |623057             |633659        |0.98            |
|ONEBMS         |563

5.3 Taking care of null values: Drop and fill

5.3.1 Dropping it like it’s hot: Using dropna() to remove records with null values

dropna() is pretty easy to use. This data frame method takes three parameters:

how, which can take the value any or all. If any is selected, PySpark will drop records where at least one of the fields is null. In the case of all, only the records where all fields are null will be removed. By default, PySpark will take the any mode.
thresh takes an integer value. If set (its default is None), PySpark will ignore the how parameter and only drop the records with fewer than thresh non-null values.
subset will take an optional list of columns that dropna() will use to make its decision.

In [18]:
# Drop the channels whose commercial_ratio is null.
answer_no_null = answer.dropna(how="any", subset=["commercial_ratio"])

# Equivalent descending sorts: orderBy("commercial_ratio", ascending=False) or
# orderBy(F.col("commercial_ratio").desc()); use F.asc(...) / .asc() to reverse.
answer_no_null.orderBy(F.desc("commercial_ratio")).show(1000, False)

print(answer_no_null.count())

                                                                                

+---------------+-------------------+--------------+----------------+
|LogIdentifierID|duration_commercial|duration_total|commercial_ratio|
+---------------+-------------------+--------------+----------------+
|CIMT           |19935              |19935         |1.0             |
|MSET           |101670             |101670        |1.0             |
|TLNSP          |234455             |234455        |1.0             |
|TELENO         |545255             |545255        |1.0             |
|TANG           |271468             |271468        |1.0             |
|HPITV          |403                |403           |1.0             |
|CANALD         |660132             |673746        |0.98            |
|ZT�L�          |669624             |682023        |0.98            |
|CANALVIE       |639073             |655108        |0.98            |
|MMAX           |701438             |719026        |0.98            |
|INVST          |623057             |633659        |0.98            |
|ONEBMS         |563



322


                                                                                

5.3.2 Filling values to our heart’s content using fillna()

fillna() is even simpler than dropna(). This data frame method takes two parameters:

The value, which is a Python int, float, string, or bool. PySpark will only fill the compatible columns; for instance, if we were to fillna("zero"), our commercial_ratio, being a double, would not be filled.
The same subset parameter we encountered in dropna(). We can limit the scope of our filling to only the columns we want.

In [26]:
# Replace a null commercial_ratio with 0 instead of dropping the row.
answer_no_null = answer.fillna(value=0, subset=["commercial_ratio"])

answer_no_null.orderBy(F.col("commercial_ratio").desc()).show(1000, False)

print("\n", answer_no_null.count())

                                                                                

+---------------+-------------------+--------------+----------------+
|LogIdentifierID|duration_commercial|duration_total|commercial_ratio|
+---------------+-------------------+--------------+----------------+
|CIMT           |19935              |19935         |1.0             |
|MSET           |101670             |101670        |1.0             |
|TLNSP          |234455             |234455        |1.0             |
|TELENO         |545255             |545255        |1.0             |
|TANG           |271468             |271468        |1.0             |
|HPITV          |403                |403           |1.0             |
|CANALD         |660132             |673746        |0.98            |
|ZT�L�          |669624             |682023        |0.98            |
|CANALVIE       |639073             |655108        |0.98            |
|MMAX           |701438             |719026        |0.98            |
|INVST          |623057             |633659        |0.98            |
|ONEBMS         |563




 324


                                                                                

You can also pass a dict to the fillna() method, with column names as keys and fill values as dict values. Using this form for our filling, the code would look like this:

In [None]:
# Fill nulls with 0 in each numeric result column, passing a
# {column_name: fill_value} mapping to fillna().
answer_no_null = answer.fillna(
    {column: 0 for column in ("commercial_ratio", "duration_commercial", "duration_total")}
)

Final code that we can spark-submit:

In [31]:
import os
 
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
 
spark = SparkSession.builder.appName(
    "Getting the Canadian TV channels with the highest/lowest proportion of commercials."
).getOrCreate()

spark.sparkContext.setLogLevel("WARN")

# Reading all the relevant data sources

# Data location. The default is relative to this chapter's code directory;
# set BROADCAST_LOGS_DIR when spark-submit is launched from somewhere else.
DIRECTORY = os.environ.get(
    "BROADCAST_LOGS_DIR",
    "../../../DataAnalysisWithPythonAndPySpark-Data/broadcast_logs",
)

# Program-class codes whose airtime is counted as commercial time.
COMMERCIAL_PROGRAM_CLASS_CODES = ["COM", "PRC", "PGI", "PRO", "LOC", "SPO", "MER", "SOL"]


def read_pipe_csv(relative_path, **options):
    """Read a pipe-delimited CSV that has a header row, inferring its schema.

    Args:
        relative_path: path of the file relative to DIRECTORY.
        **options: extra keyword arguments forwarded to ``spark.read.csv``.

    Returns:
        The loaded DataFrame.
    """
    return spark.read.csv(
        os.path.join(DIRECTORY, relative_path),
        sep="|",
        header=True,
        inferSchema=True,
        **options,
    )


def hms_to_seconds(column_name):
    """Convert a fixed-position "HH:MM:SS..." string column to total seconds.

    Characters 1-2 are read as hours, 4-5 as minutes and 7-8 as seconds
    (``substr`` is 1-based); anything after character 8 is ignored.
    """
    column = F.col(column_name)
    return (
        column.substr(1, 2).cast("int") * 60 * 60
        + column.substr(4, 2).cast("int") * 60
        + column.substr(7, 2).cast("int")
    )


logs = read_pipe_csv("BroadcastLogs_2018_Q3_M8.CSV", timestampFormat="yyyy-MM-dd")

log_identifier = read_pipe_csv("ReferenceTables/LogIdentifier.csv")

cd_category = read_pipe_csv("ReferenceTables/CD_Category.csv").select(
    "CategoryID",
    "CategoryCD",
    F.col("EnglishDescription").alias("Category_Description"),
)

cd_program_class = read_pipe_csv("ReferenceTables/CD_ProgramClass.csv").select(
    "ProgramClassID",
    "ProgramClassCD",
    F.col("EnglishDescription").alias("ProgramClass_Description"),
)

# Data processing

logs = logs.drop("BroadcastLogID", "SequenceNO").withColumn(
    "duration_seconds", hms_to_seconds("Duration")
)

# Keep only the identifier rows flagged with PrimaryFG == 1.
log_identifier = log_identifier.where(F.col("PrimaryFG") == 1)

logs_and_channels = logs.join(log_identifier, "LogServiceID")

# Left joins keep every log row even when a lookup entry is missing.
full_log = logs_and_channels.join(cd_category, "CategoryID", how="left").join(
    cd_program_class, "ProgramClassID", how="left"
)

answer = (
    full_log.groupby("LogIdentifierID")
    .agg(
        F.sum(
            F.when(
                F.trim(F.col("ProgramClassCD")).isin(COMMERCIAL_PROGRAM_CLASS_CODES),
                F.col("duration_seconds"),
            ).otherwise(0)
        ).alias("duration_commercial"),
        F.sum("duration_seconds").alias("duration_total"),
    )
    .withColumn(
        "commercial_ratio",
        F.round(F.col("duration_commercial") / F.col("duration_total"), 2),
    )
    # Replace any remaining nulls in the numeric columns (e.g. a ratio that
    # could not be computed) with 0.
    .fillna(0)
)

answer.orderBy("commercial_ratio", ascending=False).show(1000, False)



+---------------+-------------------+--------------+----------------+
|LogIdentifierID|duration_commercial|duration_total|commercial_ratio|
+---------------+-------------------+--------------+----------------+
|CIMT           |19935              |19935         |1.0             |
|MSET           |101670             |101670        |1.0             |
|TLNSP          |234455             |234455        |1.0             |
|TELENO         |545255             |545255        |1.0             |
|TANG           |271468             |271468        |1.0             |
|HPITV          |403                |403           |1.0             |
|CANALD         |660132             |673746        |0.98            |
|ZT�L�          |669624             |682023        |0.98            |
|CANALVIE       |639073             |655108        |0.98            |
|MMAX           |701438             |719026        |0.98            |
|INVST          |623057             |633659        |0.98            |
|ONEBMS         |563

                                                                                