## We will use Canadian television broadcast logs as our tabular dataset

### Creating our SparkSession object to start using PySpark

In [35]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F


In [36]:
spark = SparkSession.builder.getOrCreate()

## How does PySpark represent tabular data?

### Creating a data frame out of our grocery list

In [37]:
my_grocery_list = [
    ["Banana", 2, 1.74],
    ["Apple", 4, 2.04],
    ["Carrot", 1, 1.09],
    ["Cake", 1, 10.99],
]

In [38]:
df_grocery_list = spark.createDataFrame(
    my_grocery_list, ["Item", "Quantity", "Price"]
)

In [39]:
df_grocery_list.printSchema()

root
 |-- Item: string (nullable = true)
 |-- Quantity: long (nullable = true)
 |-- Price: double (nullable = true)
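
The schema above was inferred from the Python values: `str` became `string`, `int` became `long`, and `float` became `double`. The plain-Python sketch below mimics that mapping for these few types only. `infer_simple_type` and `infer_schema` are hypothetical helpers, not PySpark functions, and unlike Spark they simply reject a column whose rows disagree on type.

```python
def infer_simple_type(value):
    """Return the Spark SQL type name inferred for a Python value (simplified sketch)."""
    # bool is checked first because bool is a subclass of int in Python
    if isinstance(value, bool):
        return "boolean"
    if isinstance(value, int):
        return "long"    # Python ints map to a 64-bit LongType
    if isinstance(value, float):
        return "double"  # Python floats map to DoubleType
    if isinstance(value, str):
        return "string"
    raise TypeError(f"type not covered by this sketch: {type(value).__name__}")


def infer_schema(rows, names):
    """Pair each column name with the single type found across all rows."""
    schema = []
    for position, name in enumerate(names):
        found = {infer_simple_type(row[position]) for row in rows}
        if len(found) != 1:
            raise ValueError(f"column {name!r} mixes types: {sorted(found)}")
        schema.append((name, found.pop()))
    return schema


my_grocery_list = [
    ["Banana", 2, 1.74],
    ["Apple", 4, 2.04],
    ["Carrot", 1, 1.09],
    ["Cake", 1, 10.99],
]
print(infer_schema(my_grocery_list, ["Item", "Quantity", "Price"]))
# [('Item', 'string'), ('Quantity', 'long'), ('Price', 'double')]
```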



## PySpark for analyzing and processing tabular data

### Reading our broadcasting information

In [40]:
import os

In [None]:
pwd

In [41]:
DIRECTORY = './Data_Analysis_Pyspark/broadcast_logs/'

In [42]:
logs = spark.read.csv(
    os.path.join(DIRECTORY, "BroadcastLogs_2018_Q3_M8.csv"),
    sep="|",
    header=True,
    inferSchema=True,
    timestampFormat="yyyy-MM-dd",
)
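
The `sep`, `header` and `inferSchema` options can be pictured with Python's standard `csv` module. The sketch below parses a tiny pipe-delimited sample built from the first rows displayed later in this notebook. `guess_type` is a hypothetical, much simplified stand-in for `inferSchema=True`, which in Spark costs one extra pass over the data.

```python
import csv
import io
from datetime import date

# Tiny pipe-delimited sample copied from rows shown in this notebook
sample = (
    "BroadcastLogID|LogServiceID|LogDate\n"
    "1196192316|3157|2018-08-01\n"
    "1196192317|3157|2018-08-01\n"
)

reader = csv.reader(io.StringIO(sample), delimiter="|")  # plays the role of sep="|"
header = next(reader)                                      # plays the role of header=True
rows = list(reader)


def guess_type(values):
    """Simplified stand-in for inferSchema=True: integer, then date, else string."""
    def all_parse(parser):
        try:
            for value in values:
                parser(value)
        except ValueError:
            return False
        return True

    if all_parse(int):
        return "integer"
    if all_parse(date.fromisoformat):
        return "date"
    return "string"


# zip(*rows) turns the list of rows into one tuple of values per column
types = [guess_type(column) for column in zip(*rows)]
print(list(zip(header, types)))
# [('BroadcastLogID', 'integer'), ('LogServiceID', 'integer'), ('LogDate', 'date')]
```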

### Schema of our logs data frame

In [43]:
logs.printSchema()

root
 |-- BroadcastLogID: integer (nullable = true)
 |-- LogServiceID: integer (nullable = true)
 |-- LogDate: date (nullable = true)
 |-- SequenceNO: integer (nullable = true)
 |-- AudienceTargetAgeID: integer (nullable = true)
 |-- AudienceTargetEthnicID: integer (nullable = true)
 |-- CategoryID: integer (nullable = true)
 |-- ClosedCaptionID: integer (nullable = true)
 |-- CountryOfOriginID: integer (nullable = true)
 |-- DubDramaCreditID: integer (nullable = true)
 |-- EthnicProgramID: integer (nullable = true)
 |-- ProductionSourceID: integer (nullable = true)
 |-- ProgramClassID: integer (nullable = true)
 |-- FilmClassificationID: integer (nullable = true)
 |-- ExhibitionID: integer (nullable = true)
 |-- Duration: string (nullable = true)
 |-- EndTime: string (nullable = true)
 |-- LogEntryDate: date (nullable = true)
 |-- ProductionNO: string (nullable = true)
 |-- ProgramTitle: string (nullable = true)
 |-- StartTime: string (nullable = true)
 |-- Subtitle: string (nullable 

### Selecting five rows of the first three columns of our data frame

In [44]:
logs.select("BroadcastLogID", "LogServiceID", "LogDate").show(5, False)

+--------------+------------+----------+
|BroadcastLogID|LogServiceID|LogDate   |
+--------------+------------+----------+
|1196192316    |3157        |2018-08-01|
|1196192317    |3157        |2018-08-01|
|1196192318    |3157        |2018-08-01|
|1196192319    |3157        |2018-08-01|
|1196192320    |3157        |2018-08-01|
+--------------+------------+----------+
only showing top 5 rows



Passing False as the second argument of .show(5, False) sets its truncate parameter, so the five rows are
displayed without truncating long values and we can see the whole content of each cell.
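
To make the truncate argument concrete, here is a plain-Python sketch of how show() renders a single cell. It assumes Spark's default rule: with truncation on, strings longer than 20 characters are cut to 17 characters followed by "...", and passing False turns truncation off. `format_cell` is a hypothetical helper, not part of PySpark.

```python
def format_cell(value, truncate=True):
    """Render one cell roughly the way DataFrame.show() does (sketch).

    truncate=True means a 20-character limit, False means no limit,
    and an int sets a custom limit, mirroring show()'s truncate argument.
    """
    limit = 20 if truncate is True else int(truncate)
    text = str(value)
    if limit > 0 and len(text) > limit:
        # Keep room for the "..." marker unless the limit is tiny
        text = text[: limit - 3] + "..." if limit >= 4 else text[:limit]
    return text


long_text = "x" * 25
print(format_cell(long_text))           # 17 x's followed by "..."
print(format_cell(long_text, False))    # all 25 x's
print(format_cell("02:00:00.0000000"))  # 16 characters, so unchanged
```

The outputs in this notebook also show a layout difference: tables printed with truncation right-align their cells, while show(5, False) left-aligns them.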

### Four ways to select columns in PySpark, all equivalent in terms of results

In [45]:
# Using the string-to-column conversion

logs.select("BroadcastLogID", "LogServiceID", "LogDate")

logs.select(*["BroadcastLogID", "LogServiceID", "LogDate"])


DataFrame[BroadcastLogID: int, LogServiceID: int, LogDate: date]

In [46]:
# Passing the column objects explicitly

logs.select(
    F.col("BroadcastLogID"), F.col("LogServiceID"), F.col("LogDate"),
)

DataFrame[BroadcastLogID: int, LogServiceID: int, LogDate: date]

In [47]:
# Unpacking a list of column objects
logs.select(
    *[F.col("BroadcastLogID"), F.col("LogServiceID"), F.col("LogDate")]
)

DataFrame[BroadcastLogID: int, LogServiceID: int, LogDate: date]
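
All four calls end up with the same columns because of Python argument unpacking, plus one convenience in select(): when it receives a single list, it unpacks that list itself, which is why a plain list such as sorted(logs.columns) is also accepted. The hypothetical `toy_select` below reproduces only that argument handling, assuming it matches PySpark's behaviour; it is not the real method.

```python
def toy_select(*cols):
    """Normalize select()-style arguments into a tuple of column references.

    A single list argument is unpacked, so select(["a", "b"]),
    select(*["a", "b"]) and select("a", "b") all receive the same columns.
    """
    if len(cols) == 1 and isinstance(cols[0], list):
        cols = tuple(cols[0])
    return cols


names = ["BroadcastLogID", "LogServiceID", "LogDate"]
print(toy_select("BroadcastLogID", "LogServiceID", "LogDate"))
print(toy_select(*names))
print(toy_select(names))
# each prints ('BroadcastLogID', 'LogServiceID', 'LogDate')
```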

### Peeking at the data frame in chunks of three columns

In [48]:
import numpy as np

In [49]:
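# Split the column names into len(logs.columns) // 3 groups
# (exactly three names per group here, since there are 30 columns)
column_split = np.array_split(
    np.array(logs.columns), len(logs.columns) // 3
)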

In [50]:
print(column_split)

[array(['BroadcastLogID', 'LogServiceID', 'LogDate'], dtype='<U22'), array(['SequenceNO', 'AudienceTargetAgeID', 'AudienceTargetEthnicID'],
      dtype='<U22'), array(['CategoryID', 'ClosedCaptionID', 'CountryOfOriginID'], dtype='<U22'), array(['DubDramaCreditID', 'EthnicProgramID', 'ProductionSourceID'],
      dtype='<U22'), array(['ProgramClassID', 'FilmClassificationID', 'ExhibitionID'],
      dtype='<U22'), array(['Duration', 'EndTime', 'LogEntryDate'], dtype='<U22'), array(['ProductionNO', 'ProgramTitle', 'StartTime'], dtype='<U22'), array(['Subtitle', 'NetworkAffiliationID', 'SpecialAttentionID'],
      dtype='<U22'), array(['BroadcastOriginPointID', 'CompositionID', 'Producer1'],
      dtype='<U22'), array(['Producer2', 'Language1', 'Language2'], dtype='<U22')]
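
np.array_split asks for a number of sections, not a chunk size, so len(logs.columns) // 3 yields chunks of exactly three names only because this data frame has 30 columns; with 31 columns the first chunk would hold four names, and with fewer than three columns the section count would be 0, which NumPy rejects. The sketch below contrasts NumPy's documented sizing rule (l % n sections of size l // n + 1, the rest of size l // n) with plain slicing that fixes the chunk size instead. Both helper names are hypothetical.

```python
def array_split_sizes(length, sections):
    """Chunk sizes np.array_split produces for `length` items and `sections` pieces."""
    base, extra = divmod(length, sections)
    # The first `extra` pieces receive one additional element
    return [base + 1] * extra + [base] * (sections - extra)


def fixed_chunks(items, size=3):
    """Split `items` into consecutive chunks of `size`; only the last may be shorter."""
    return [items[i:i + size] for i in range(0, len(items), size)]


print(array_split_sizes(30, 30 // 3))  # [3, 3, 3, 3, 3, 3, 3, 3, 3, 3]
print(array_split_sizes(31, 31 // 3))  # [4, 3, 3, 3, 3, 3, 3, 3, 3, 3]
print(fixed_chunks(["a", "b", "c", "d", "e", "f", "g"]))
# [['a', 'b', 'c'], ['d', 'e', 'f'], ['g']]
```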


In [51]:
for x in column_split:
    logs.select(*x).show(5,False)

+--------------+------------+----------+
|BroadcastLogID|LogServiceID|LogDate   |
+--------------+------------+----------+
|1196192316    |3157        |2018-08-01|
|1196192317    |3157        |2018-08-01|
|1196192318    |3157        |2018-08-01|
|1196192319    |3157        |2018-08-01|
|1196192320    |3157        |2018-08-01|
+--------------+------------+----------+
only showing top 5 rows

+----------+-------------------+----------------------+
|SequenceNO|AudienceTargetAgeID|AudienceTargetEthnicID|
+----------+-------------------+----------------------+
|1         |4                  |NULL                  |
|2         |NULL               |NULL                  |
|3         |NULL               |NULL                  |
|4         |NULL               |NULL                  |
|5         |NULL               |NULL                  |
+----------+-------------------+----------------------+
only showing top 5 rows

+----------+---------------+-----------------+
|CategoryID|ClosedCaptionID|Co

## Keeping what we need: Deleting columns

The other side of selecting columns is choosing what not to select. We could do the full
trip with select(), carefully crafting our list of columns to keep just the ones we want.
Fortunately, PySpark also provides a shorter trip: simply drop what you don’t want.

### Getting rid of columns using the drop() method

In [52]:
logs = logs.drop("BroadcastLogID","SequenceNO")

In [53]:
print("BroadcastLogID" in logs.columns)

False
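
In terms of column names, drop() behaves like a filter that keeps the original order. One detail worth knowing: when given names as strings, drop() is documented as a no-op for names that are not in the schema, so a misspelled name fails silently. The hypothetical `drop_columns` helper below sketches that behaviour on a plain list; it compares names exactly, whereas Spark resolves them case-insensitively by default.

```python
def drop_columns(columns, *to_drop):
    """Return the column names left after dropping `to_drop`, keeping their order.

    Names that are not present are ignored, mirroring drop()'s no-op behaviour.
    """
    unwanted = set(to_drop)
    return [name for name in columns if name not in unwanted]


columns = ["BroadcastLogID", "LogServiceID", "LogDate", "SequenceNO"]
print(drop_columns(columns, "BroadcastLogID", "SequenceNO"))
# ['LogServiceID', 'LogDate']
print(drop_columns(columns, "BroadcastLgID"))  # misspelled: nothing is dropped
# ['BroadcastLogID', 'LogServiceID', 'LogDate', 'SequenceNO']
```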


### Getting rid of columns, select style

In [54]:
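# Equivalent to the drop() call above; because both columns are already gone,
# this select keeps every remaining column and leaves logs unchanged
logs = logs.select(
    *[x for x in logs.columns if x not in ["BroadcastLogID", "SequenceNO"]]
)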

In [55]:
logs.printSchema()

root
 |-- LogServiceID: integer (nullable = true)
 |-- LogDate: date (nullable = true)
 |-- AudienceTargetAgeID: integer (nullable = true)
 |-- AudienceTargetEthnicID: integer (nullable = true)
 |-- CategoryID: integer (nullable = true)
 |-- ClosedCaptionID: integer (nullable = true)
 |-- CountryOfOriginID: integer (nullable = true)
 |-- DubDramaCreditID: integer (nullable = true)
 |-- EthnicProgramID: integer (nullable = true)
 |-- ProductionSourceID: integer (nullable = true)
 |-- ProgramClassID: integer (nullable = true)
 |-- FilmClassificationID: integer (nullable = true)
 |-- ExhibitionID: integer (nullable = true)
 |-- Duration: string (nullable = true)
 |-- EndTime: string (nullable = true)
 |-- LogEntryDate: date (nullable = true)
 |-- ProductionNO: string (nullable = true)
 |-- ProgramTitle: string (nullable = true)
 |-- StartTime: string (nullable = true)
 |-- Subtitle: string (nullable = true)
 |-- NetworkAffiliationID: integer (nullable = true)
 |-- SpecialAttentionID: inte

### Selecting and displaying the Duration column

In [56]:
logs.select(F.col('Duration')).show(5)

+----------------+
|        Duration|
+----------------+
|02:00:00.0000000|
|00:00:30.0000000|
|00:00:15.0000000|
|00:00:15.0000000|
|00:00:15.0000000|
+----------------+
only showing top 5 rows



In [57]:
print(logs.select(F.col("Duration")).dtypes)

[('Duration', 'string')]


### Extracting the hours, minutes, and seconds from the Duration column

In [58]:
logs.select(
    F.col("Duration"),
    F.col("Duration").substr(1, 2).cast("int").alias("dur_hours"),
    F.col("Duration").substr(4, 2).cast("int").alias("dur_minutes"),
    F.col("Duration").substr(7, 2).cast("int").alias("dur_seconds"),
).distinct().show(5)

+----------------+---------+-----------+-----------+
|        Duration|dur_hours|dur_minutes|dur_seconds|
+----------------+---------+-----------+-----------+
|00:10:06.0000000|        0|         10|          6|
|00:10:37.0000000|        0|         10|         37|
|00:04:52.0000000|        0|          4|         52|
|00:26:41.0000000|        0|         26|         41|
|00:08:18.0000000|        0|          8|         18|
+----------------+---------+-----------+-----------+
only showing top 5 rows
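
Column.substr(pos, length) counts positions from 1, while Python slices count from 0. The sketch below translates between the two on a Duration value taken from the output above; `spark_substr` is a hypothetical helper, valid for positive positions only.

```python
def spark_substr(text, pos, length):
    """Python equivalent of Column.substr(pos, length) for pos >= 1 (1-based start)."""
    start = pos - 1  # convert Spark's 1-based position to a 0-based index
    return text[start:start + length]


duration = "00:10:06.0000000"
dur_hours = int(spark_substr(duration, 1, 2))    # characters 1-2: "00"
dur_minutes = int(spark_substr(duration, 4, 2))  # characters 4-5: "10"
dur_seconds = int(spark_substr(duration, 7, 2))  # characters 7-8: "06"
print(dur_hours, dur_minutes, dur_seconds)  # 0 10 6
```

One difference to keep in mind: Python's int() raises ValueError on a malformed string, whereas .cast("int") in Spark returns null when ANSI mode is off instead of failing the job.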



### Creating a duration-in-seconds field from the Duration column

In [59]:
logs.select(
    F.col("Duration"),
    (
        F.col("Duration").substr(1,2).cast("int")*60*60
        + F.col("Duration").substr(4,2).cast("int") * 60
        + F.col("Duration").substr(7,2).cast("int")
    ).alias("Duration_seconds")
).distinct().show(5)

+----------------+----------------+
|        Duration|Duration_seconds|
+----------------+----------------+
|00:10:30.0000000|             630|
|00:25:52.0000000|            1552|
|00:28:08.0000000|            1688|
|06:00:00.0000000|           21600|
|00:32:08.0000000|            1928|
+----------------+----------------+
only showing top 5 rows
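
The expression above computes hours * 3600 + minutes * 60 + seconds and ignores the fractional part after the dot. The plain-Python sketch below applies the same arithmetic to Duration values from the output and cross-checks it against datetime.timedelta; `duration_to_seconds` is a hypothetical helper.

```python
from datetime import timedelta


def duration_to_seconds(duration):
    """Convert an 'HH:MM:SS.fffffff' string to whole seconds, dropping the fraction."""
    hh_mm_ss = duration.split(".")[0]  # "06:00:00.0000000" -> "06:00:00"
    hours, minutes, seconds = (int(part) for part in hh_mm_ss.split(":"))
    return hours * 60 * 60 + minutes * 60 + seconds


for value in ["00:10:30.0000000", "00:25:52.0000000", "06:00:00.0000000"]:
    total = duration_to_seconds(value)
    # Cross-check the arithmetic against the standard library's timedelta
    h, m, s = (int(p) for p in value.split(".")[0].split(":"))
    assert total == timedelta(hours=h, minutes=m, seconds=s).total_seconds()
    print(value, total)
# 00:10:30.0000000 630
# 00:25:52.0000000 1552
# 06:00:00.0000000 21600
```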



### Creating a new column with withColumn()

In [60]:
logs.withColumn(
    "Duration_seconds",
    (
        F.col("Duration").substr(1, 2).cast("int") * 60 * 60
        + F.col("Duration").substr(4, 2).cast("int") * 60
        + F.col("Duration").substr(7, 2).cast("int")
    ),
)

DataFrame[LogServiceID: int, LogDate: date, AudienceTargetAgeID: int, AudienceTargetEthnicID: int, CategoryID: int, ClosedCaptionID: int, CountryOfOriginID: int, DubDramaCreditID: int, EthnicProgramID: int, ProductionSourceID: int, ProgramClassID: int, FilmClassificationID: int, ExhibitionID: int, Duration: string, EndTime: string, LogEntryDate: date, ProductionNO: string, ProgramTitle: string, StartTime: string, Subtitle: string, NetworkAffiliationID: int, SpecialAttentionID: int, BroadcastOriginPointID: int, CompositionID: int, Producer1: string, Producer2: string, Language1: int, Language2: int, Duration_seconds: int]

In [61]:
logs = logs.withColumn(
    "Duration_seconds",
    (
        F.col("Duration").substr(1, 2).cast("int") * 60 * 60
        + F.col("Duration").substr(4, 2).cast("int") * 60
        + F.col("Duration").substr(7, 2).cast("int")
    ),
)
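
The first withColumn() call only displays a new data frame; logs itself is unchanged until the result is assigned back, because data frames are immutable and transformations return new ones. withColumn() also appends a column when the name is new and replaces it in place when the name already exists. The toy sketch below illustrates both points with a dictionary of lists; `with_column` and the dict-based "table" are hypothetical stand-ins, not Spark objects.

```python
def with_column(table, name, func):
    """Toy withColumn(): `table` maps column names to lists of values.

    Returns a NEW dict. An existing column keeps its position and gets new values;
    a new column is appended at the end. `func` receives each row as a dict.
    """
    n_rows = len(next(iter(table.values()))) if table else 0
    rows = [{col: values[i] for col, values in table.items()} for i in range(n_rows)]
    result = dict(table)                        # copy: the input table is left untouched
    result[name] = [func(row) for row in rows]  # existing keys keep their position
    return result


def to_seconds(row):
    d = row["Duration"]
    return int(d[0:2]) * 60 * 60 + int(d[3:5]) * 60 + int(d[6:8])


logs_toy = {"Duration": ["02:00:00.0000000", "00:00:30.0000000"]}
with_column(logs_toy, "Duration_seconds", to_seconds)  # result discarded
print(list(logs_toy))  # ['Duration']: the original is unchanged

logs_toy = with_column(logs_toy, "Duration_seconds", to_seconds)  # reassigned
print(logs_toy["Duration_seconds"])  # [7200, 30]

logs_toy = with_column(logs_toy, "Duration", lambda row: row["Duration"][:8])
print(list(logs_toy))  # ['Duration', 'Duration_seconds']: replaced in place
```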

In [62]:
logs.printSchema()

root
 |-- LogServiceID: integer (nullable = true)
 |-- LogDate: date (nullable = true)
 |-- AudienceTargetAgeID: integer (nullable = true)
 |-- AudienceTargetEthnicID: integer (nullable = true)
 |-- CategoryID: integer (nullable = true)
 |-- ClosedCaptionID: integer (nullable = true)
 |-- CountryOfOriginID: integer (nullable = true)
 |-- DubDramaCreditID: integer (nullable = true)
 |-- EthnicProgramID: integer (nullable = true)
 |-- ProductionSourceID: integer (nullable = true)
 |-- ProgramClassID: integer (nullable = true)
 |-- FilmClassificationID: integer (nullable = true)
 |-- ExhibitionID: integer (nullable = true)
 |-- Duration: string (nullable = true)
 |-- EndTime: string (nullable = true)
 |-- LogEntryDate: date (nullable = true)
 |-- ProductionNO: string (nullable = true)
 |-- ProgramTitle: string (nullable = true)
 |-- StartTime: string (nullable = true)
 |-- Subtitle: string (nullable = true)
 |-- NetworkAffiliationID: integer (nullable = true)
 |-- SpecialAttentionID: inte

### Renaming one column at a time, the withColumnRenamed() way

In [63]:
logs = logs.withColumnRenamed("Duration_seconds", "duration_seconds")
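
withColumnRenamed() renames a single column and, per its documentation, silently does nothing if the old name is absent, so a misspelled name leaves the data frame unchanged. The hypothetical helper below sketches that on a plain list of names; it matches names exactly, whereas Spark matches them case-insensitively by default.

```python
def with_column_renamed(columns, existing, new):
    """Rename `existing` to `new` in a list of column names, keeping the order.

    If `existing` is not present, the list comes back unchanged (no error).
    """
    return [new if name == existing else name for name in columns]


columns = ["LogServiceID", "Duration", "Duration_seconds"]
print(with_column_renamed(columns, "Duration_seconds", "duration_seconds"))
# ['LogServiceID', 'Duration', 'duration_seconds']
print(with_column_renamed(columns, "Duration_secs", "duration_seconds"))
# ['LogServiceID', 'Duration', 'Duration_seconds']  (misspelled: nothing renamed)
```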

In [64]:
logs.printSchema()

root
 |-- LogServiceID: integer (nullable = true)
 |-- LogDate: date (nullable = true)
 |-- AudienceTargetAgeID: integer (nullable = true)
 |-- AudienceTargetEthnicID: integer (nullable = true)
 |-- CategoryID: integer (nullable = true)
 |-- ClosedCaptionID: integer (nullable = true)
 |-- CountryOfOriginID: integer (nullable = true)
 |-- DubDramaCreditID: integer (nullable = true)
 |-- EthnicProgramID: integer (nullable = true)
 |-- ProductionSourceID: integer (nullable = true)
 |-- ProgramClassID: integer (nullable = true)
 |-- FilmClassificationID: integer (nullable = true)
 |-- ExhibitionID: integer (nullable = true)
 |-- Duration: string (nullable = true)
 |-- EndTime: string (nullable = true)
 |-- LogEntryDate: date (nullable = true)
 |-- ProductionNO: string (nullable = true)
 |-- ProgramTitle: string (nullable = true)
 |-- StartTime: string (nullable = true)
 |-- Subtitle: string (nullable = true)
 |-- NetworkAffiliationID: integer (nullable = true)
 |-- SpecialAttentionID: inte

### Batch lowercasing using the toDF() method

In [65]:
logs.toDF(*[x.lower() for x in logs.columns]).printSchema()

root
 |-- logserviceid: integer (nullable = true)
 |-- logdate: date (nullable = true)
 |-- audiencetargetageid: integer (nullable = true)
 |-- audiencetargetethnicid: integer (nullable = true)
 |-- categoryid: integer (nullable = true)
 |-- closedcaptionid: integer (nullable = true)
 |-- countryoforiginid: integer (nullable = true)
 |-- dubdramacreditid: integer (nullable = true)
 |-- ethnicprogramid: integer (nullable = true)
 |-- productionsourceid: integer (nullable = true)
 |-- programclassid: integer (nullable = true)
 |-- filmclassificationid: integer (nullable = true)
 |-- exhibitionid: integer (nullable = true)
 |-- duration: string (nullable = true)
 |-- endtime: string (nullable = true)
 |-- logentrydate: date (nullable = true)
 |-- productionno: string (nullable = true)
 |-- programtitle: string (nullable = true)
 |-- starttime: string (nullable = true)
 |-- subtitle: string (nullable = true)
 |-- networkaffiliationid: integer (nullable = true)
 |-- specialattentionid: inte
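
toDF() takes one new name per existing column, in order, so lowercasing every name is just a list comprehension. One risk specific to this trick: two names that differ only in case would collapse into duplicates, which makes later references to them ambiguous. The hypothetical `lowercase_columns` helper below builds the list and refuses to create duplicates.

```python
def lowercase_columns(columns):
    """Return lowercased names for toDF(*names), refusing to create duplicates."""
    lowered = [name.lower() for name in columns]
    if len(set(lowered)) != len(lowered):
        raise ValueError("lowercasing would produce duplicate column names")
    return lowered


print(lowercase_columns(["LogServiceID", "LogDate", "Duration"]))
# ['logserviceid', 'logdate', 'duration']

try:
    lowercase_columns(["Duration", "duration"])
except ValueError as err:
    print(err)  # lowercasing would produce duplicate column names
```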

### Selecting our columns in alphabetical order using select()

In [66]:
logs.select(sorted(logs.columns)).printSchema()

root
 |-- AudienceTargetAgeID: integer (nullable = true)
 |-- AudienceTargetEthnicID: integer (nullable = true)
 |-- BroadcastOriginPointID: integer (nullable = true)
 |-- CategoryID: integer (nullable = true)
 |-- ClosedCaptionID: integer (nullable = true)
 |-- CompositionID: integer (nullable = true)
 |-- CountryOfOriginID: integer (nullable = true)
 |-- DubDramaCreditID: integer (nullable = true)
 |-- Duration: string (nullable = true)
 |-- EndTime: string (nullable = true)
 |-- EthnicProgramID: integer (nullable = true)
 |-- ExhibitionID: integer (nullable = true)
 |-- FilmClassificationID: integer (nullable = true)
 |-- Language1: integer (nullable = true)
 |-- Language2: integer (nullable = true)
 |-- LogDate: date (nullable = true)
 |-- LogEntryDate: date (nullable = true)
 |-- LogServiceID: integer (nullable = true)
 |-- NetworkAffiliationID: integer (nullable = true)
 |-- Producer1: string (nullable = true)
 |-- Producer2: string (nullable = true)
 |-- ProductionNO: string (nu
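
Python's sorted() compares strings by code point, so every uppercase letter sorts before every lowercase one. Since the data frame now holds a column renamed to duration_seconds, sorted(logs.columns) places it after all the capitalized names instead of next to Duration. Passing key=str.lower gives a case-insensitive order; the sample names below come from this notebook, and either result can be passed straight to select().

```python
columns = ["LogDate", "duration_seconds", "Duration", "AudienceTargetAgeID"]

print(sorted(columns))
# ['AudienceTargetAgeID', 'Duration', 'LogDate', 'duration_seconds']

print(sorted(columns, key=str.lower))
# ['AudienceTargetAgeID', 'Duration', 'duration_seconds', 'LogDate']
```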

### Describing everything in one fell swoop

### Summarizing everything in one fell swoop