In [1]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import os
import pandas as pd

In [2]:
# Start a Spark session for this notebook, or reuse one that is already running.
spark = (
    SparkSession.builder
    .appName("Working with tabular data")
    .getOrCreate()
)

In [3]:
# Directory holding the broadcast-log CSV extracts.
# Set the BROADCAST_LOGS_DIR environment variable to point the notebook at a
# different location; the original local path is kept as the fallback so
# existing runs behave exactly as before.
PATH = os.environ.get("BROADCAST_LOGS_DIR", r"C:\Users\oluwa\Documents\broadcast_logs")

In [4]:
# Load the pipe-delimited broadcast log extract: the first row is the header,
# column types are inferred, and timestamps are parsed with the yyyy-MM-dd
# pattern.
logs = (
    spark.read
    .options(
        header=True,
        sep="|",
        inferSchema=True,
        timestampFormat="yyyy-MM-dd",
    )
    .csv(os.path.join(PATH, 'BroadcastLogs_2018_Q3_M8.csv'))
)

In [5]:
# Print the schema of the loaded frame to check the types Spark inferred.
logs.printSchema()

root
 |-- BroadcastLogID: integer (nullable = true)
 |-- LogServiceID: integer (nullable = true)
 |-- LogDate: timestamp (nullable = true)
 |-- SequenceNO: integer (nullable = true)
 |-- AudienceTargetAgeID: integer (nullable = true)
 |-- AudienceTargetEthnicID: integer (nullable = true)
 |-- CategoryID: integer (nullable = true)
 |-- ClosedCaptionID: integer (nullable = true)
 |-- CountryOfOriginID: integer (nullable = true)
 |-- DubDramaCreditID: integer (nullable = true)
 |-- EthnicProgramID: integer (nullable = true)
 |-- ProductionSourceID: integer (nullable = true)
 |-- ProgramClassID: integer (nullable = true)
 |-- FilmClassificationID: integer (nullable = true)
 |-- ExhibitionID: integer (nullable = true)
 |-- Duration: string (nullable = true)
 |-- EndTime: string (nullable = true)
 |-- LogEntryDate: timestamp (nullable = true)
 |-- ProductionNO: string (nullable = true)
 |-- ProgramTitle: string (nullable = true)
 |-- StartTime: string (nullable = true)
 |-- Subtitle: string 

In [None]:
# Display the column names of the loaded frame.
logs.columns

In [None]:
# Preview the first five rows of three columns, without truncating values.
(
    logs
    .select('BroadcastLogID', 'LogServiceID', 'LogDate')
    .show(5, truncate=False)
)

In [None]:
# Exclude the 'BroadcastLogID', 'SequenceNO' and 'LogServiceID' columns.
# DataFrame.drop keeps the remaining columns in their original order.
logs_copy = logs.drop('BroadcastLogID', 'SequenceNO', 'LogServiceID')

In [None]:
# Display the column names that remain in logs_copy.
logs_copy.columns

In [None]:
# Show the first five LogDate values without truncating them.
logs_copy.select('LogDate').show(n=5, truncate=False)

In [None]:
# Show up to five distinct values of the first four characters of LogDate
# (presumably the year part of the timestamp's string form).
(
    logs_copy
    .select(F.substring('LogDate', 1, 4))
    .distinct()
    .show(5)
)

In [None]:
# Show the first five Duration values under the column name "full_duration".
(
    logs_copy
    .select(F.col('Duration').alias("full_duration"))
    .show(n=5, truncate=False)
)

In [None]:
# Split the Duration string into its hour / min / sec parts (characters 1-2,
# 4-5 and 7-8, each cast to int) and combine them into a total number of
# seconds; show five distinct rows without truncation.
(
    logs_copy
    .select(
        F.col('Duration').alias('full'),
        F.col('Duration').substr(1, 2).cast('int').alias('hour'),
        F.col('Duration').substr(4, 2).cast('int').alias('min'),
        F.col('Duration').substr(7, 2).cast('int').alias('sec'),
        (
            F.col('Duration').substr(1, 2).cast('int') * 60 * 60
            + F.col('Duration').substr(4, 2).cast('int') * 60
            + F.col('Duration').substr(7, 2).cast('int')
        ).alias('duration_secs'),
    )
    .distinct()
    .show(n=5, truncate=False)
)

In [None]:
# Add the duration expressed in seconds. Duration is a string column;
# characters 1-2, 4-5 and 7-8 are cast to int and combined as
# hours * 60 * 60 + minutes * 60 + seconds (presumably an HH:MM:SS layout).
#
# Build on logs_copy rather than the raw `logs` frame so the columns that were
# excluded when logs_copy was created stay excluded. withColumn replaces an
# existing column of the same name, so re-running this cell is harmless.
logs_copy = logs_copy.withColumn(
    'duration_secs',
    F.col('Duration').substr(1, 2).cast('int') * 60 * 60
    + F.col('Duration').substr(4, 2).cast('int') * 60
    + F.col('Duration').substr(7, 2).cast('int'),
)

In [None]:
# Print the schema of logs_copy to check that duration_secs was added.
logs_copy.printSchema()

In [None]:
# Compare the original Duration string with the derived number of seconds.
logs_copy.select('Duration', 'duration_secs').show(n=5, truncate=False)

In [None]:
# Rename every column to its lower-case form.
# toDF() renames columns positionally, so this does not rely on select()
# resolving e.g. 'duration' against 'Duration', which only works while
# spark.sql.caseSensitive is left at its default (false).
logs_copy = logs_copy.toDF(*[x.lower() for x in logs_copy.columns])

In [None]:
# Print the schema again to check the column names after lower-casing.
logs_copy.printSchema()

In [None]:
# Display the current column names of logs_copy.
logs_copy.columns

In [None]:
# Print summary statistics one column at a time, so each table stays narrow
# enough to read.
for column_name in logs_copy.columns:
    column_summary = logs_copy.describe(column_name)
    column_summary.show()

In [None]:
# Display the column names of logs_copy once more.
logs_copy.columns