In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

In [2]:
# Build the Spark session used by every cell below; getOrCreate() hands back
# an already-running session instead of starting a second one.
spark = (
    SparkSession.builder
    .appName('Chapter 05 Examples')
    .getOrCreate()
)

# Leave the session as the cell's last expression so the notebook renders it.
spark

22/12/01 14:51:01 WARN Utils: Your hostname, karlos-300E5M-300E5L resolves to a loopback address: 127.0.1.1; using 10.0.0.89 instead (on interface wlp2s0)
22/12/01 14:51:01 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/12/01 14:51:02 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
# Load the pipe-delimited broadcast log extract. The header row supplies the
# column names, column types are inferred from the data, and timestamps are
# parsed with the 'yyyy-MM-dd' pattern.
logs = (
    spark.read
    .options(
        sep='|',
        header=True,
        inferSchema=True,
        timestampFormat='yyyy-MM-dd',
    )
    .csv('../../data/broadcast_logs/BroadcastLogs_2018_Q3_M8.CSV')
)

                                                                                

In [4]:
# Echo the DataFrame so the notebook shows its column names and inferred types.
logs

DataFrame[BroadcastLogID: int, LogServiceID: int, LogDate: timestamp, SequenceNO: int, AudienceTargetAgeID: int, AudienceTargetEthnicID: int, CategoryID: int, ClosedCaptionID: int, CountryOfOriginID: int, DubDramaCreditID: int, EthnicProgramID: int, ProductionSourceID: int, ProgramClassID: int, FilmClassificationID: int, ExhibitionID: int, Duration: string, EndTime: string, LogEntryDate: timestamp, ProductionNO: string, ProgramTitle: string, StartTime: string, Subtitle: string, NetworkAffiliationID: int, SpecialAttentionID: int, BroadcastOriginPointID: int, CompositionID: int, Producer1: string, Producer2: string, Language1: int, Language2: int]

In [5]:
# `Duration` is a string shaped like 'HH:MM:SS.fffffff'. Slice out the hour,
# minute and second fields (1-based positions) and fold them into one integer
# number of seconds; the fractional part is ignored.
logs = logs.withColumn(
    'DurationInSeconds',
    (
        F.substring('Duration', 1, 2).cast('int') * 60 * 60  # hours -> seconds
        + F.substring('Duration', 4, 2).cast('int') * 60  # minutes -> seconds
        + F.substring('Duration', 7, 2).cast('int')  # seconds
    ),
)

In [6]:
# Spot-check the conversion: list distinct (Duration, DurationInSeconds) pairs.
(
    logs
    .select('Duration', 'DurationInSeconds')
    .distinct()
    .show()
)



+----------------+-----------------+
|        Duration|DurationInSeconds|
+----------------+-----------------+
|00:10:30.0000000|              630|
|00:25:52.0000000|             1552|
|00:28:08.0000000|             1688|
|06:00:00.0000000|            21600|
|00:32:08.0000000|             1928|
|00:33:52.0000000|             2032|
|01:59:30.0000000|             7170|
|00:56:32.0000000|             3392|
|00:44:15.0000000|             2655|
|00:31:00.0000000|             1860|
|00:12:04.0000000|              724|
|00:32:00.0000000|             1920|
|00:11:08.0000000|              668|
|00:13:56.0000000|              836|
|00:24:58.0000000|             1498|
|00:24:36.0000000|             1476|
|01:53:09.0000000|             6789|
|00:07:10.0000000|              430|
|00:18:23.0000000|             1103|
|00:38:31.0000000|             2311|
+----------------+-----------------+
only showing top 20 rows



                                                                                

In [7]:
# Load the channel-identifier reference table (pipe-delimited, with header,
# types inferred).
# NOTE(review): channel names rendered further down contain U+FFFD replacement
# characters, which suggests this file may not be UTF-8 -- confirm the source
# encoding and pass an explicit `encoding` option if so.
logs_identifier = (
    spark.read
    .options(sep='|', header=True, inferSchema=True)
    .csv('../../data/broadcast_logs/ReferenceTables/LogIdentifier.csv')
)

In [8]:
# Echo the DataFrame so the notebook shows its column names and inferred types.
logs_identifier

DataFrame[LogIdentifierID: string, LogServiceID: int, PrimaryFG: int]

In [9]:
# Keep only the rows flagged as primary channels (PrimaryFG equal to 1).
logs_identifier = logs_identifier.filter(
    F.col('PrimaryFG') == 1
)

In [10]:
# Load the category reference table and keep only its key, its code, and the
# English description (renamed so it stays unambiguous after joining).
cd_category = (
    spark.read
    .options(sep='|', header=True, inferSchema=True)
    .csv('../../data/broadcast_logs/ReferenceTables/CD_Category.csv')
    .select(
        F.col('CategoryID'),
        F.col('CategoryCD'),
        F.col('EnglishDescription').alias('CategoryDescription'),
    )
)

In [11]:
# Echo the DataFrame so the notebook shows the three retained columns.
cd_category

DataFrame[CategoryID: int, CategoryCD: string, CategoryDescription: string]

In [12]:
# Load the program-class reference table and keep only its key, its code, and
# the English description (renamed so it stays unambiguous after joining).
cd_program_class = (
    spark.read
    .options(sep='|', header=True, inferSchema=True)
    .csv('../../data/broadcast_logs/ReferenceTables/CD_ProgramClass.csv')
    .select(
        F.col('ProgramClassID'),
        F.col('ProgramClassCD'),
        F.col('EnglishDescription').alias('ProgramClassDescription'),
    )
)

In [13]:
# Echo the DataFrame so the notebook shows the three retained columns.
cd_program_class

DataFrame[ProgramClassID: int, ProgramClassCD: string, ProgramClassDescription: string]

In [14]:
# Attach channel identifiers to each log row. The inner join on LogServiceID
# drops log rows whose service has no entry left in `logs_identifier`.
logs_and_channels = logs.join(logs_identifier, 'LogServiceID', 'inner')

In [15]:
# Echo the joined DataFrame so the notebook shows the combined schema.
logs_and_channels

DataFrame[LogServiceID: int, BroadcastLogID: int, LogDate: timestamp, SequenceNO: int, AudienceTargetAgeID: int, AudienceTargetEthnicID: int, CategoryID: int, ClosedCaptionID: int, CountryOfOriginID: int, DubDramaCreditID: int, EthnicProgramID: int, ProductionSourceID: int, ProgramClassID: int, FilmClassificationID: int, ExhibitionID: int, Duration: string, EndTime: string, LogEntryDate: timestamp, ProductionNO: string, ProgramTitle: string, StartTime: string, Subtitle: string, NetworkAffiliationID: int, SpecialAttentionID: int, BroadcastOriginPointID: int, CompositionID: int, Producer1: string, Producer2: string, Language1: int, Language2: int, DurationInSeconds: int, LogIdentifierID: string, PrimaryFG: int]

In [16]:
# Enrich the logs with category and program-class descriptions. Left joins
# keep every log row even when a reference table has no matching key.
full_logs = (
    logs_and_channels
    .join(cd_category, 'CategoryID', 'left')
    .join(cd_program_class, 'ProgramClassID', 'left')
)

In [17]:
# Echo the fully joined DataFrame so the notebook shows the final schema.
full_logs

DataFrame[ProgramClassID: int, CategoryID: int, LogServiceID: int, BroadcastLogID: int, LogDate: timestamp, SequenceNO: int, AudienceTargetAgeID: int, AudienceTargetEthnicID: int, ClosedCaptionID: int, CountryOfOriginID: int, DubDramaCreditID: int, EthnicProgramID: int, ProductionSourceID: int, FilmClassificationID: int, ExhibitionID: int, Duration: string, EndTime: string, LogEntryDate: timestamp, ProductionNO: string, ProgramTitle: string, StartTime: string, Subtitle: string, NetworkAffiliationID: int, SpecialAttentionID: int, BroadcastOriginPointID: int, CompositionID: int, Producer1: string, Producer2: string, Language1: int, Language2: int, DurationInSeconds: int, LogIdentifierID: string, PrimaryFG: int, CategoryCD: string, CategoryDescription: string, ProgramClassCD: string, ProgramClassDescription: string]

#### Displaying the most popular types of programs according to the total duration.

In [18]:
# Total airtime per program class, longest first; truncate=False keeps the
# full description text visible.
(
    full_logs
    .groupBy('ProgramClassCD', 'ProgramClassDescription')
    .agg(
        F.sum('DurationInSeconds').alias('SumOfDurationInSeconds')
    )
    .orderBy('SumOfDurationInSeconds', ascending=False)
    .show(truncate=False)
)



+--------------+--------------------------------------+----------------------+
|ProgramClassCD|ProgramClassDescription               |SumOfDurationInSeconds|
+--------------+--------------------------------------+----------------------+
|PGR           |PROGRAM                               |652802250             |
|COM           |COMMERCIAL MESSAGE                    |106810189             |
|PFS           |PROGRAM FIRST SEGMENT                 |38817891              |
|SEG           |SEGMENT OF A PROGRAM                  |34891264              |
|PRC           |PROMOTION OF UPCOMING CANADIAN PROGRAM|27017583              |
|PGI           |PROGRAM INFOMERCIAL                   |23196392              |
|PRO           |PROMOTION OF NON-CANADIAN PROGRAM     |10213461              |
|OFF           |SCHEDULED OFF AIR TIME PERIOD         |4537071               |
|ID            |NETWORK IDENTIFICATION MESSAGE        |2179067               |
|NRN           |No recognized nationality           

                                                                                

#### Compute final answer: What are the channels with the greatest and least proportion of commercials?

In [22]:
# Program class codes whose airtime is counted as commercial.
commercial_codes = ['COM', 'PRC', 'PGI', 'PRO', 'PSA', 'MAG', 'LOC',
    'SPO', 'MER', 'SOL']

# Per channel: commercial seconds, total seconds, and their ratio, sorted from
# the most to the least commercial-heavy channel.
# The chain is wrapped in parentheses rather than using backslash
# continuations, so no trailing `\` can splice the next statement into it.
commercial_ratio_by_channel = (
    full_logs
    .groupBy('LogIdentifierID')
    .agg(
        F.sum(
            # Codes are trimmed before matching; rows outside the commercial
            # list contribute 0 seconds.
            F.when(
                F.trim(F.col('ProgramClassCD')).isin(commercial_codes),
                F.col('DurationInSeconds')
            ).otherwise(0)
        ).alias('DurationCommercial'),
        F.sum('DurationInSeconds').alias('SumOfDurationInSeconds')
    )
    .withColumn(
        'CommercialRatio',
        # Divide only when the total is non-zero. A zero (or NULL) total gives
        # a NULL ratio explicitly, rather than relying on non-ANSI division
        # semantics (with ANSI mode on, dividing by zero raises an error).
        F.when(
            F.col('SumOfDurationInSeconds') != 0,
            F.col('DurationCommercial') / F.col('SumOfDurationInSeconds')
        )
    )
    .orderBy('CommercialRatio', ascending=False)
)

commercial_ratio_by_channel.show(1000, truncate=False)



+---------------+------------------+----------------------+---------------------+
|LogIdentifierID|DurationCommercial|SumOfDurationInSeconds|CommercialRatio      |
+---------------+------------------+----------------------+---------------------+
|HPITV          |403               |403                   |1.0                  |
|CIMT           |19935             |19935                 |1.0                  |
|TLNSP          |234455            |234455                |1.0                  |
|TELENO         |545255            |545255                |1.0                  |
|MSET           |101670            |101670                |1.0                  |
|TANG           |271468            |271468                |1.0                  |
|INVST          |623057            |633659                |0.9832686034602207   |
|ZT�L�          |669624            |682023                |0.9818202611935375   |
|CANALD         |660132            |673746                |0.9797935720583127   |
|ONEBMS         

                                                                                

#### Dropping only the records that have a null `CommercialRatio` value.

In [28]:
# Discard channels whose ratio is null; only the CommercialRatio column is
# inspected, so nulls in other columns do not cause a row to be dropped.
commercial_ratio_by_channel = commercial_ratio_by_channel.dropna(
    how='any',
    subset=['CommercialRatio'],
)

commercial_ratio_by_channel.show(n=1000, truncate=False)

                                                                                

+---------------+------------------+----------------------+---------------------+
|LogIdentifierID|DurationCommercial|SumOfDurationInSeconds|CommercialRatio      |
+---------------+------------------+----------------------+---------------------+
|HPITV          |403               |403                   |1.0                  |
|CIMT           |19935             |19935                 |1.0                  |
|TLNSP          |234455            |234455                |1.0                  |
|TELENO         |545255            |545255                |1.0                  |
|MSET           |101670            |101670                |1.0                  |
|TANG           |271468            |271468                |1.0                  |
|INVST          |623057            |633659                |0.9832686034602207   |
|ZT�L�          |669624            |682023                |0.9818202611935375   |
|CANALD         |660132            |673746                |0.9797935720583127   |
|ONEBMS         