In [33]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName('Joins Exercise').getOrCreate()

logs = spark.read.csv("data/DataAnalysisWithPythonAndPySpark-Data-trunk/broadcast_logs/BroadcastLogs_2018_Q3_M8_sample.CSV",
                       sep = "|",
                       header=True,
                       inferSchema = True,
                       timestampFormat = "yyyy-MM-dd"
                       )

logs.printSchema()
logs.select(F.col("Duration")).show(10,False)

log_identifier = spark.read.csv("data/DataAnalysisWithPythonAndPySpark-Data-trunk/broadcast_logs/ReferenceTables/LogIdentifier.csv",
                                   sep = "|",
                                   header = True,
                                   inferSchema = True,
                                   
                               )

log_identifier.printSchema()



                                                                                

root
 |-- BroadcastLogID: integer (nullable = true)
 |-- LogServiceID: integer (nullable = true)
 |-- LogDate: date (nullable = true)
 |-- SequenceNO: integer (nullable = true)
 |-- AudienceTargetAgeID: integer (nullable = true)
 |-- AudienceTargetEthnicID: integer (nullable = true)
 |-- CategoryID: integer (nullable = true)
 |-- ClosedCaptionID: integer (nullable = true)
 |-- CountryOfOriginID: integer (nullable = true)
 |-- DubDramaCreditID: integer (nullable = true)
 |-- EthnicProgramID: integer (nullable = true)
 |-- ProductionSourceID: integer (nullable = true)
 |-- ProgramClassID: integer (nullable = true)
 |-- FilmClassificationID: integer (nullable = true)
 |-- ExhibitionID: integer (nullable = true)
 |-- Duration: string (nullable = true)
 |-- EndTime: string (nullable = true)
 |-- LogEntryDate: date (nullable = true)
 |-- ProductionNO: string (nullable = true)
 |-- ProgramTitle: string (nullable = true)
 |-- StartTime: string (nullable = true)
 |-- Subtitle: string (nullable 

In [17]:
#JOINS

#Equi inner join because same keys is used and in such cases pyspark does not include duplicate columns
joined_data_inner = logs.join(log_identifier,
         on = "LogServiceID",
         how = "inner")

joined_data_inner.printSchema()

#If joined using condition then there are two occurences of keys. One from left table and one from right table

#Method 1
logs_and_channels_verbose = logs.join(
log_identifier, logs["LogServiceID"] == log_identifier["LogServiceID"]
)
logs_and_channels_verbose.drop(log_identifier["LogServiceID"]).select(
"LogServiceID")

#Method 2 
logs_and_channels_verbose = logs.alias("left").join(
log_identifier.alias("right"),
logs["LogServiceID"] == log_identifier["LogServiceID"],
)
logs_and_channels_verbose.drop(F.col("right.LogServiceID")).select(
"LogServiceID"
)


root
 |-- LogServiceID: integer (nullable = true)
 |-- BroadcastLogID: integer (nullable = true)
 |-- LogDate: date (nullable = true)
 |-- SequenceNO: integer (nullable = true)
 |-- AudienceTargetAgeID: integer (nullable = true)
 |-- AudienceTargetEthnicID: integer (nullable = true)
 |-- CategoryID: integer (nullable = true)
 |-- ClosedCaptionID: integer (nullable = true)
 |-- CountryOfOriginID: integer (nullable = true)
 |-- DubDramaCreditID: integer (nullable = true)
 |-- EthnicProgramID: integer (nullable = true)
 |-- ProductionSourceID: integer (nullable = true)
 |-- ProgramClassID: integer (nullable = true)
 |-- FilmClassificationID: integer (nullable = true)
 |-- ExhibitionID: integer (nullable = true)
 |-- Duration: string (nullable = true)
 |-- EndTime: string (nullable = true)
 |-- LogEntryDate: date (nullable = true)
 |-- ProductionNO: string (nullable = true)
 |-- ProgramTitle: string (nullable = true)
 |-- StartTime: string (nullable = true)
 |-- Subtitle: string (nullable 

DataFrame[LogServiceID: int]

In [38]:
cd_category = spark.read.csv("data/DataAnalysisWithPythonAndPySpark-Data-trunk/broadcast_logs/ReferenceTables/CD_Category.csv",
sep="|",
header=True,
inferSchema=True,
).select(
"CategoryID",
"CategoryCD",
F.col("EnglishDescription").alias("Category_Description")
)
cd_category.printSchema()

cd_program_class = spark.read.csv("data/DataAnalysisWithPythonAndPySpark-Data-trunk/broadcast_logs/ReferenceTables/CD_ProgramClass.csv",
sep="|",
header=True,
inferSchema=True,
).select(
"ProgramClassID",
"ProgramClassCD",
F.col("EnglishDescription").alias("ProgramClass_Description"),
)
cd_program_class.printSchema()


joined_data_inner = joined_data_inner.join(cd_category, "CategoryID", how="left").join(
cd_program_class, "ProgramClassID", how="left"
)

full_log.printSchema()

(full_log
.groupby("ProgramClassCD", "ProgramClass_Description")
.agg(F.sum("SequenceNO").alias("duration_total"))
.orderBy("duration_total", ascending=False).show(100, False)
)

root
 |-- CategoryID: integer (nullable = true)
 |-- CategoryCD: string (nullable = true)
 |-- Category_Description: string (nullable = true)

root
 |-- ProgramClassID: integer (nullable = true)
 |-- ProgramClassCD: string (nullable = true)
 |-- ProgramClass_Description: string (nullable = true)

root
 |-- ProgramClassID: integer (nullable = true)
 |-- CategoryID: integer (nullable = true)
 |-- BroadcastLogID: integer (nullable = true)
 |-- LogServiceID: integer (nullable = true)
 |-- LogDate: date (nullable = true)
 |-- SequenceNO: integer (nullable = true)
 |-- AudienceTargetAgeID: integer (nullable = true)
 |-- AudienceTargetEthnicID: integer (nullable = true)
 |-- ClosedCaptionID: integer (nullable = true)
 |-- CountryOfOriginID: integer (nullable = true)
 |-- DubDramaCreditID: integer (nullable = true)
 |-- EthnicProgramID: integer (nullable = true)
 |-- ProductionSourceID: integer (nullable = true)
 |-- FilmClassificationID: integer (nullable = true)
 |-- ExhibitionID: integer (n



+--------------+--------------------------------------+--------------+
|ProgramClassCD|ProgramClass_Description              |duration_total|
+--------------+--------------------------------------+--------------+
|COM           |COMMERCIAL MESSAGE                    |112916785     |
|PRC           |PROMOTION OF UPCOMING CANADIAN PROGRAM|29053568      |
|PGR           |PROGRAM                               |5773262       |
|PRO           |PROMOTION OF NON-CANADIAN PROGRAM     |5007055       |
|ID            |NETWORK IDENTIFICATION MESSAGE        |3947667       |
|NRN           |No recognized nationality             |1244744       |
|SEG           |SEGMENT OF A PROGRAM                  |983220        |
|PSA           |PUBLIC SERVICE ANNOUNCEMENT           |668888        |
|PFS           |PROGRAM FIRST SEGMENT                 |438362        |
|PGI           |PROGRAM INFOMERCIAL                   |301355        |
|LOC           |LOCAL ADVERTISING                     |227776        |
|SPO  

                                                                                

In [40]:
#Lazy Aggregation
full_log.groupby("ProgramClassCD", "ProgramClass_Description").agg(
{"SequenceNO": "sum"}
).withColumnRenamed("sum(SequenceNO)", "duration_total").orderBy(
"duration_total", ascending=False
).show(
100, False
)

+--------------+--------------------------------------+--------------+
|ProgramClassCD|ProgramClass_Description              |duration_total|
+--------------+--------------------------------------+--------------+
|COM           |COMMERCIAL MESSAGE                    |112916785     |
|PRC           |PROMOTION OF UPCOMING CANADIAN PROGRAM|29053568      |
|PGR           |PROGRAM                               |5773262       |
|PRO           |PROMOTION OF NON-CANADIAN PROGRAM     |5007055       |
|ID            |NETWORK IDENTIFICATION MESSAGE        |3947667       |
|NRN           |No recognized nationality             |1244744       |
|SEG           |SEGMENT OF A PROGRAM                  |983220        |
|PSA           |PUBLIC SERVICE ANNOUNCEMENT           |668888        |
|PFS           |PROGRAM FIRST SEGMENT                 |438362        |
|PGI           |PROGRAM INFOMERCIAL                   |301355        |
|LOC           |LOCAL ADVERTISING                     |227776        |
|SPO  

                                                                                

In [43]:
#When the field of the column ProgramClass, trimmed of spaces at the beginning and
# end of the field, is in our list of commercial codes, then take the value of the field in the
# column duration_seconds. Otherwise, use zero as a value.

answer = (
full_log.groupby("LogIdentifierID") #Grouped Data Object. It is transitional object 
.agg(
    F.sum(
        F.when(
                F.trim(F.col("ProgramClassCD")).isin(["COM", "PRC", "PGI", "PRO", "LOC", "SPO", "MER", "SOL"]),
            F.col("SequenceNO"),
        ).otherwise(0)
    ).alias("duration_commercial"),
    F.sum("SequenceNO").alias("duration_total"),
)
.withColumn(
    "commercial_ratio", F.col(
    "duration_commercial") / F.col("duration_total")
)
)
answer.orderBy("commercial_ratio", ascending=False).show(1000, False)



+---------------+-------------------+--------------+-------------------+
|LogIdentifierID|duration_commercial|duration_total|commercial_ratio   |
+---------------+-------------------+--------------+-------------------+
|CI             |599142             |608856        |0.9840454885884347 |
|13ST           |599142             |608856        |0.9840454885884347 |
|SPACE          |629395             |642411        |0.9797388276352678 |
|OWN            |661124             |675703        |0.9784239525353595 |
|CLT            |661124             |675703        |0.9784239525353595 |
|VIVA           |661124             |675703        |0.9784239525353595 |
|ADIK           |584729             |597871        |0.9780186695792236 |
|MYST           |584729             |597871        |0.9780186695792236 |
|TVADIK         |584729             |597871        |0.9780186695792236 |
|CAVE           |630064             |644680        |0.9773282869020289 |
|MENTV          |630064             |644680        

                                                                                

In [48]:
#NULL Handling
answer_no_null = answer.dropna(subset=["commercial_ratio"])
answer_no_null.orderBy(
"commercial_ratio", ascending=False).show(1000, False)

answer_no_null = answer.fillna(0)
answer_no_null.orderBy(
"commercial_ratio", ascending=False).show(1000, False)


answer_no_null = answer.fillna(
{"duration_commercial": 0, "duration_total": 0, "commercial_ratio": 0}
)


+---------------+-------------------+--------------+-------------------+
|LogIdentifierID|duration_commercial|duration_total|commercial_ratio   |
+---------------+-------------------+--------------+-------------------+
|CI             |599142             |608856        |0.9840454885884347 |
|13ST           |599142             |608856        |0.9840454885884347 |
|SPACE          |629395             |642411        |0.9797388276352678 |
|OWN            |661124             |675703        |0.9784239525353595 |
|CLT            |661124             |675703        |0.9784239525353595 |
|VIVA           |661124             |675703        |0.9784239525353595 |
|ADIK           |584729             |597871        |0.9780186695792236 |
|MYST           |584729             |597871        |0.9780186695792236 |
|TVADIK         |584729             |597871        |0.9780186695792236 |
|CAVE           |630064             |644680        |0.9773282869020289 |
|MENTV          |630064             |644680        

In [22]:
#Full Program
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from IPython.display import display, HTML

spark = (SparkSession.builder.appName("Getting the Canadian TV channels with the highest/lowest proportion of commercials.").getOrCreate())

spark.sparkContext.setLogLevel("WARN")

# Reading all the relevant data sources
logs = spark.read.csv("data/DataAnalysisWithPythonAndPySpark-Data-trunk/broadcast_logs/BroadcastLogs_2018_Q3_M8_sample.CSV",
                        sep="|",
                        header=True,
                        inferSchema=True,
                     )

log_identifier = spark.read.csv("data/DataAnalysisWithPythonAndPySpark-Data-trunk/broadcast_logs/ReferenceTables/LogIdentifier.csv",
                                sep="|",
                                header=True,
                                inferSchema=True,
                                )
print(log_identifier.printSchema())
print(log_identifier.show(5))

cd_category = spark.read.csv("data/DataAnalysisWithPythonAndPySpark-Data-trunk/broadcast_logs/ReferenceTables/CD_Category.csv",
                            sep="|",
                            header=True,
                            inferSchema=True,
                            ).select(
                            "CategoryID",
                            "CategoryCD",
                            F.col("EnglishDescription").alias("Category_Description"),
                            )

cd_program_class = spark.read.csv("data/DataAnalysisWithPythonAndPySpark-Data-trunk/broadcast_logs/ReferenceTables/CD_ProgramClass.csv",
                                    sep="|",
                                    header=True,
                                    inferSchema=True,
                                    ).select(
                                            "ProgramClassID",
                                            "ProgramClassCD",
                                            F.col("EnglishDescription").alias("ProgramClass_Description"),
                                            )

# Data processing
logs = logs.drop("BroadcastLogID", "SequenceNO")
logs = logs.withColumn(
"duration_seconds",
        (
        F.col("Duration").substr(12, 2).cast("int") * 60 * 60
        + F.col("Duration").substr(15, 2).cast("int") * 60
        + F.col("Duration").substr(18, 2).cast("int")
        ),
)

print(logs.select(F.col("duration_seconds")).show())
print(logs.show(truncate=True, vertical=True))

log_identifier = log_identifier.where(F.col("PrimaryFG") == 1)
logs_and_channels = logs.join(log_identifier, "LogServiceID")

full_log = logs_and_channels.join(cd_category, "CategoryID",
how="left").join(
cd_program_class, "ProgramClassID", how="left"
)

#display(full_log.show())
full_log.show(n=20, truncate=True, vertical=True)

answer = (
full_log.groupby("LogIdentifierID")
    .agg(
        F.sum(
                F.when(
                        F.trim(F.col("ProgramClassCD")).isin(["COM", "PRC", "PGI", "PRO", "LOC", "SPO", "MER", "SOL"]),
                        F.col("duration_seconds"),
                      ).otherwise(0)
             ).alias("duration_commercial"),
F.sum("duration_seconds").alias("duration_total"),
).withColumn(
"commercial_ratio", F.col("duration_commercial") /F.col("duration_total")).fillna(0)
)


answer.orderBy("commercial_ratio", ascending=False).show(1000, False)

                                                                                

root
 |-- LogIdentifierID: string (nullable = true)
 |-- LogServiceID: integer (nullable = true)
 |-- PrimaryFG: integer (nullable = true)

None
+---------------+------------+---------+
|LogIdentifierID|LogServiceID|PrimaryFG|
+---------------+------------+---------+
|           13ST|        3157|        1|
|         2000SM|        3466|        1|
|           70SM|        3883|        1|
|           80SM|        3590|        1|
|           90SM|        3470|        1|
+---------------+------------+---------+
only showing top 5 rows

None
+----------------+
|duration_seconds|
+----------------+
|            7200|
|              30|
|              15|
|              15|
|              15|
|              15|
|              30|
|              15|
|              15|
|              15|
|              15|
|              30|
|              30|
|              15|
|              15|
|              15|
|              15|
|              15|
|              60|
|              15|
+----------------+
