### DataFrame Joining and Grouping

In [2]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

In [3]:
import pandas as pd
import numpy as np

In [4]:
spark = SparkSession.builder.appName('Joining and Grouping').getOrCreate()

In [5]:
logs = (
    spark.read.csv('./DataAnalysisPythonPySpark/data/data/broadcast_logs/ReferenceTables/LogIdentifier.csv',
    sep="|", header=True, inferSchema=True)
    .where(F.col("PrimaryFG")==1)    
)

In [6]:
# printing schema of logs DF
logs.printSchema()

# Count the records where 
# primaryFG is 1
print("Number of Records: ", logs.count())

root
 |-- LogIdentifierID: string (nullable = true)
 |-- LogServiceID: integer (nullable = true)
 |-- PrimaryFG: integer (nullable = true)

Number of Records:  758


In [14]:
csvlogs = (
    spark.read.csv('./DataAnalysisPythonPySpark/data/data/broadcast_logs/BroadcastLogs_2018_Q3_M8_sample.csv', sep='|',
    header=True, inferSchema=True, timestampFormat='yyyy-MM-dd')
    .drop(*['BroadCastLogID', 'SequenceNo'])
    .select(
        F.col("*"),
        F.col("Duration").substr(1,2).cast("int").alias("dur_hours"),
        F.col("Duration").substr(4,2).cast("int").alias("dur_minutes"),
        F.col("Duration").substr(7,2).cast("int").alias("dur_seconds"),
    ).distinct()
)

csvlogs.select(F.col('LogServiceID')).show(5)

+------------+
|LogServiceID|
+------------+
|        3157|
|        3157|
|        3157|
|        3157|
|        3157|
+------------+
only showing top 5 rows



In [39]:
# use of join method.
full_log=(
    csvlogs.join(
        logs,
        on='LogServiceID',
        how='inner'
    ).select(
        F.col('ProgramClassCD'),
        F.col('LogServiceID')
        ,F.col('Subtitle')
        ,F.col('Language1')
        ,F.col('LogIdentifierID')
        ,F.col('PrimaryFG'),
        F.col('dur_seconds')
    )
)

full_log.select('*').where(F.col('Subtitle') != 'null').orderBy('Subtitle', ascending=False).show(5)


AnalysisException: Column 'ProgramClassCD' does not exist. Did you mean one of the following? [ProgramClassID, ProgramTitle, PrimaryFG, DubDramaCreditID, LogDate, LogServiceID, Producer1, Producer2, ProductionNO, CategoryID, CompositionID, Duration, EthnicProgramID, Language1, Language2, LogEntryDate, dur_hours, dur_minutes, ClosedCaptionID, EndTime, ExhibitionID, LogIdentifierID, StartTime, Subtitle, dur_seconds, ProductionSourceID, CountryOfOriginID, FilmClassificationID, NetworkAffiliationID, SpecialAttentionID, AudienceTargetAgeID, BroadcastOriginPointID, AudienceTargetEthnicID];
'Project ['ProgramClassCD, LogServiceID#527, Subtitle#547, Language1#554, LogIdentifierID#17, PrimaryFG#19, dur_seconds#617]
+- Project [LogServiceID#527, LogDate#528, AudienceTargetAgeID#530, AudienceTargetEthnicID#531, CategoryID#532, ClosedCaptionID#533, CountryOfOriginID#534, DubDramaCreditID#535, EthnicProgramID#536, ProductionSourceID#537, ProgramClassID#538, FilmClassificationID#539, ExhibitionID#540, Duration#541, EndTime#542, LogEntryDate#543, ProductionNO#544, ProgramTitle#545, StartTime#546, Subtitle#547, NetworkAffiliationID#548, SpecialAttentionID#549, BroadcastOriginPointID#550, CompositionID#551, ... 9 more fields]
   +- Join Inner, (LogServiceID#527 = LogServiceID#18)
      :- Deduplicate [NetworkAffiliationID#548, SpecialAttentionID#549, ProgramClassID#538, Producer2#553, Duration#541, ClosedCaptionID#533, CategoryID#532, EthnicProgramID#536, ProductionNO#544, CompositionID#551, Subtitle#547, LogEntryDate#543, StartTime#546, EndTime#542, LogDate#528, ExhibitionID#540, AudienceTargetEthnicID#531, Language1#554, dur_minutes#616, ProductionSourceID#537, CountryOfOriginID#534, dur_hours#615, ProgramTitle#545, BroadcastOriginPointID#550, ... 7 more fields]
      :  +- Project [LogServiceID#527, LogDate#528, AudienceTargetAgeID#530, AudienceTargetEthnicID#531, CategoryID#532, ClosedCaptionID#533, CountryOfOriginID#534, DubDramaCreditID#535, EthnicProgramID#536, ProductionSourceID#537, ProgramClassID#538, FilmClassificationID#539, ExhibitionID#540, Duration#541, EndTime#542, LogEntryDate#543, ProductionNO#544, ProgramTitle#545, StartTime#546, Subtitle#547, NetworkAffiliationID#548, SpecialAttentionID#549, BroadcastOriginPointID#550, CompositionID#551, ... 7 more fields]
      :     +- Project [LogServiceID#527, LogDate#528, AudienceTargetAgeID#530, AudienceTargetEthnicID#531, CategoryID#532, ClosedCaptionID#533, CountryOfOriginID#534, DubDramaCreditID#535, EthnicProgramID#536, ProductionSourceID#537, ProgramClassID#538, FilmClassificationID#539, ExhibitionID#540, Duration#541, EndTime#542, LogEntryDate#543, ProductionNO#544, ProgramTitle#545, StartTime#546, Subtitle#547, NetworkAffiliationID#548, SpecialAttentionID#549, BroadcastOriginPointID#550, CompositionID#551, ... 4 more fields]
      :        +- Relation [BroadcastLogID#526,LogServiceID#527,LogDate#528,SequenceNO#529,AudienceTargetAgeID#530,AudienceTargetEthnicID#531,CategoryID#532,ClosedCaptionID#533,CountryOfOriginID#534,DubDramaCreditID#535,EthnicProgramID#536,ProductionSourceID#537,ProgramClassID#538,FilmClassificationID#539,ExhibitionID#540,Duration#541,EndTime#542,LogEntryDate#543,ProductionNO#544,ProgramTitle#545,StartTime#546,Subtitle#547,NetworkAffiliationID#548,SpecialAttentionID#549,... 6 more fields] csv
      +- Filter (PrimaryFG#19 = 1)
         +- Relation [LogIdentifierID#17,LogServiceID#18,PrimaryFG#19] csv


#### Join cases
- inner join
- left_outer
- right_outer
- outer
- full
- full_outer
- left_semi
- left_anti
- cross

In [9]:
cdcat = spark.read.csv('./DataAnalysisPythonPySpark/data/data/broadcast_logs/ReferenceTables/CD_Category.csv',
    sep='|', header=True, inferSchema=True
    ).select(
        "CategoryID", "CategoryCD", 
        F.col('EnglishDescription').alias('Category_Description')
    )

cdprog = spark.read.csv('./DataAnalysisPythonPySpark/data/data/broadcast_logs/ReferenceTables/CD_ProgramClass.csv',
    sep="|", header=True, inferSchema=True
).select(
    "ProgramClassID", "ProgramClassCD",
    F.col("EnglishDescription").alias('ProgramClass_Desciption')
)

In [10]:
print("CD Category: \n", cdcat.columns)
cdcat.select('*').show(5, False)
print("CD programm: \n",cdprog.columns)
cdprog.select('*').show(5, False)

CD Category: 
 ['CategoryID', 'CategoryCD', 'Category_Description']
+----------+----------+---------------------------+
|CategoryID|CategoryCD|Category_Description       |
+----------+----------+---------------------------+
|1         |010       |NEWS                       |
|2         |02        |CANREC  ANALYSIS (old)     |
|3         |02A       |ANALYSIS AND INTERPRETATION|
|4         |02B       |LONG-FORM DOCUMENTARY      |
|5         |030       |REPORTING & ACTUALITIES    |
+----------+----------+---------------------------+
only showing top 5 rows

CD programm: 
 ['ProgramClassID', 'ProgramClassCD', 'ProgramClass_Desciption']
+--------------+--------------+-----------------------+
|ProgramClassID|ProgramClassCD|ProgramClass_Desciption|
+--------------+--------------+-----------------------+
|1             |AUT           |AUTOPROMOTION          |
|2             |BAL           |BALANCE PROGRAMMING    |
|3             |COM           |COMMERCIAL MESSAGE     |
|4             |COR     

In [12]:
(cdcat.join(cdprog,on=cdcat['CategoryID']==cdprog['ProgramClassID'], how='inner').select(
        F.col('CategoryID').alias('ID'),
        F.col('ProgramClassID'),
        F.col('CategoryCD'),
        F.col('Category_Description'),
        F.col('ProgramClassCD'),
        F.col('ProgramClass_Desciption'),

    ) 
    .groupBy("ProgramClassCD", "ProgramClass_Desciption")
    .agg(F.sum('ID').alias('ID sum'))
    .orderBy('ID sum', ascending=False)
    .show(5,truncate=False)
)

+--------------+-------------------------------------+------+
|ProgramClassCD|ProgramClass_Desciption              |ID sum|
+--------------+-------------------------------------+------+
|MOS           |Mosaic                               |30    |
|SPO           |SPONSORSHIP MESSAGE                  |29    |
|SOL           |SOLICITATION MESSAGE                 |28    |
|SO            |MAY IDENTIFY THE SIGN ON\OFF OF A DAY|27    |
|SEG           |SEGMENT OF A PROGRAM                 |26    |
+--------------+-------------------------------------+------+
only showing top 5 rows



In [34]:
full_log.columns

AttributeError: 'NoneType' object has no attribute 'columns'

In [37]:
answer = (
    full_log.groupby('LogIdentifierID')
    .agg(
        F.sum(
            F.when(
                F.trim(F.col('ProgramClassCD')).isin(
                    ['COM', "PRC", "PGI", "PRO", "LOC", "SPO", "MER", "SOL"]
                ),
                F.col('dur_seconds')
            ).otherwise(0)
        ).alias('duration_commercial'),
        F.sum('dur_seconds').alias('duration_total'),
    )
    .withColumn('commercial_ratio', F.col('duration_commercial')/F.col('duration_total'))
)

answer.orderBy('commercial_ratio', ascending=False).show(5,)

AnalysisException: Column 'ProgramClassCD' does not exist. Did you mean one of the following? [PrimaryFG, LogServiceID, Language1, LogIdentifierID, Subtitle, dur_seconds];
'Aggregate [LogIdentifierID#17], [LogIdentifierID#17, sum(CASE WHEN trim('ProgramClassCD, None) IN (COM,PRC,PGI,PRO,LOC,SPO,MER,SOL) THEN dur_seconds#617 ELSE 0 END) AS duration_commercial#1927, sum(dur_seconds#617) AS duration_total#1929L]
+- Project [LogServiceID#527, Subtitle#547, Language1#554, LogIdentifierID#17, PrimaryFG#19, dur_seconds#617]
   +- Project [LogServiceID#527, LogDate#528, AudienceTargetAgeID#530, AudienceTargetEthnicID#531, CategoryID#532, ClosedCaptionID#533, CountryOfOriginID#534, DubDramaCreditID#535, EthnicProgramID#536, ProductionSourceID#537, ProgramClassID#538, FilmClassificationID#539, ExhibitionID#540, Duration#541, EndTime#542, LogEntryDate#543, ProductionNO#544, ProgramTitle#545, StartTime#546, Subtitle#547, NetworkAffiliationID#548, SpecialAttentionID#549, BroadcastOriginPointID#550, CompositionID#551, ... 9 more fields]
      +- Join Inner, (LogServiceID#527 = LogServiceID#18)
         :- Deduplicate [NetworkAffiliationID#548, SpecialAttentionID#549, ProgramClassID#538, Producer2#553, Duration#541, ClosedCaptionID#533, CategoryID#532, EthnicProgramID#536, ProductionNO#544, CompositionID#551, Subtitle#547, LogEntryDate#543, StartTime#546, EndTime#542, LogDate#528, ExhibitionID#540, AudienceTargetEthnicID#531, Language1#554, dur_minutes#616, ProductionSourceID#537, CountryOfOriginID#534, dur_hours#615, ProgramTitle#545, BroadcastOriginPointID#550, ... 7 more fields]
         :  +- Project [LogServiceID#527, LogDate#528, AudienceTargetAgeID#530, AudienceTargetEthnicID#531, CategoryID#532, ClosedCaptionID#533, CountryOfOriginID#534, DubDramaCreditID#535, EthnicProgramID#536, ProductionSourceID#537, ProgramClassID#538, FilmClassificationID#539, ExhibitionID#540, Duration#541, EndTime#542, LogEntryDate#543, ProductionNO#544, ProgramTitle#545, StartTime#546, Subtitle#547, NetworkAffiliationID#548, SpecialAttentionID#549, BroadcastOriginPointID#550, CompositionID#551, ... 7 more fields]
         :     +- Project [LogServiceID#527, LogDate#528, AudienceTargetAgeID#530, AudienceTargetEthnicID#531, CategoryID#532, ClosedCaptionID#533, CountryOfOriginID#534, DubDramaCreditID#535, EthnicProgramID#536, ProductionSourceID#537, ProgramClassID#538, FilmClassificationID#539, ExhibitionID#540, Duration#541, EndTime#542, LogEntryDate#543, ProductionNO#544, ProgramTitle#545, StartTime#546, Subtitle#547, NetworkAffiliationID#548, SpecialAttentionID#549, BroadcastOriginPointID#550, CompositionID#551, ... 4 more fields]
         :        +- Relation [BroadcastLogID#526,LogServiceID#527,LogDate#528,SequenceNO#529,AudienceTargetAgeID#530,AudienceTargetEthnicID#531,CategoryID#532,ClosedCaptionID#533,CountryOfOriginID#534,DubDramaCreditID#535,EthnicProgramID#536,ProductionSourceID#537,ProgramClassID#538,FilmClassificationID#539,ExhibitionID#540,Duration#541,EndTime#542,LogEntryDate#543,ProductionNO#544,ProgramTitle#545,StartTime#546,Subtitle#547,NetworkAffiliationID#548,SpecialAttentionID#549,... 6 more fields] csv
         +- Filter (PrimaryFG#19 = 1)
            +- Relation [LogIdentifierID#17,LogServiceID#18,PrimaryFG#19] csv
