<a href="https://colab.research.google.com/github/thimotyb/spark-notebooks/blob/main/Data_Analysis_with_PySpark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

#Data Analysis with PySpark in Google Colab

PySpark is Python interface for Apache Spark. The primary use cases for PySpark are to work with huge amounts of data and for creating data pipelines.

You don't need to work with big data to benefit from PySpark. I find that the SparkSQL is a great tool for performing routine data anlysis. Pandas can get slow and you may find yourself writing a lot of code for data cleaning whereas the same actions take much less code in SQL. Let's get started!

See more here! http://spark.apache.org/docs/latest/api/python/

# 1. Installing PySpark in Google Colab

In [1]:
!sudo apt update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
#Check this site for the latest download link https://www.apache.org/dyn/closer.lua/spark/spark-3.2.1/spark-3.2.1-bin-hadoop3.2.tgz
#  https://archive.apache.org/dist/spark/spark-3.2.1/spark-3.2.1-bin-hadoop3.2.tgz
!wget https://archive.apache.org/dist/spark/spark-3.2.1/spark-3.2.1-bin-hadoop3.2.tgz
!tar xf spark-3.2.1-bin-hadoop3.2.tgz
!pip install -q findspark
!pip install pyspark
!pip install py4j

import os
import sys
# os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
# os.environ["SPARK_HOME"] = "/content/spark-3.2.1-bin-hadoop3.2"


import findspark
findspark.init()
findspark.find()

import pyspark

from pyspark.sql import DataFrame, SparkSession
from typing import List
import pyspark.sql.types as T
import pyspark.sql.functions as F

spark= SparkSession \
       .builder \
       .appName("Our First Spark Example") \
       .getOrCreate()

spark

[33m0% [Working][0m            Get:1 http://security.ubuntu.com/ubuntu jammy-security InRelease [129 kB]
[33m0% [Connecting to archive.ubuntu.com (185.125.190.83)] [1 InRelease 0 B/129 kB [0m                                                                               Get:2 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ InRelease [3,626 B]
[33m0% [Connecting to archive.ubuntu.com (185.125.190.83)] [1 InRelease 75.0 kB/129[0m                                                                               Get:3 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  InRelease [1,581 B]
[33m0% [Waiting for headers] [1 InRelease 111 kB/129 kB 86%] [Waiting for headers] [0m[33m0% [Waiting for headers] [Waiting for headers] [Connecting to ppa.launchpadcont[0m                                                                               Ign:4 https://r2u.stat.illinois.edu/ubuntu jammy InRelease
[33m0% [Waiting for headers] [Connected to 

In [None]:
spark

In [2]:
# Get more example data
!git clone https://github.com/thimotyb/DataAnalysisWithPythonAndPySpark-Data.git

Cloning into 'DataAnalysisWithPythonAndPySpark-Data'...
remote: Enumerating objects: 71, done.[K
remote: Total 71 (delta 0), reused 0 (delta 0), pack-reused 71 (from 1)[K
Receiving objects: 100% (71/71), 828.81 MiB | 26.14 MiB/s, done.
Updating files: 100% (59/59), done.


In [5]:
!mv DataAnalysisWithPythonAndPySpark-Data /content/data

# PySpark Data Analysis Examples

## WordCount Example

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, split, explode, lower, regexp_extract

spark = SparkSession.builder.getOrCreate()

book = spark.read.text("data/gutenberg_books/1342-0.txt")

lines = book.select(split(book.value, " ").alias("line"))

words = lines.select(explode(col("line")).alias("word"))

words_lower = words.select(lower(col("word")).alias("word_lower"))

words_clean = words_lower.select(
    regexp_extract(col("word_lower"), "[a-z]*", 0).alias("word")
)

words_nonull = words_clean.where(col("word") != "")

In [None]:
words_nonull.show(50)

+------------+
|        word|
+------------+
|         the|
|     project|
|   gutenberg|
|       ebook|
|          of|
|       pride|
|         and|
|   prejudice|
|          by|
|        jane|
|      austen|
|        this|
|       ebook|
|          is|
|         for|
|         the|
|         use|
|          of|
|      anyone|
|    anywhere|
|          at|
|          no|
|        cost|
|         and|
|        with|
|      almost|
|          no|
|restrictions|
|  whatsoever|
|         you|
|         may|
|        copy|
|          it|
|        give|
|          it|
|        away|
|          or|
|          re|
|          it|
|       under|
|         the|
|       terms|
|          of|
|         the|
|     project|
|   gutenberg|
|     license|
|    included|
|        with|
|        this|
+------------+
only showing top 50 rows



In [None]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

spark = SparkSession.builder.getOrCreate()

In [None]:
(
    spark.read.csv("./data/list_of_numbers/sample.csv", header=True)
    .withColumn(
        "new_column", F.when(F.col("old_column") > 10, 10).otherwise(0)
    )
    .where("old_column > 8")
    .groupby("new_column")
    .count()
    .write.csv("updated_frequencies.csv", mode="overwrite")
)

# Using PySpark SQL

In [None]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

spark = SparkSession.builder.getOrCreate()

In [None]:
my_grocery_list = [
    ["Banana", 2, 1.74],
    ["Apple", 4, 2.04],
    ["Carrot", 1, 1.09],
    ["Cake", 1, 10.99],
]

df_grocery_list = spark.createDataFrame(
    my_grocery_list, ["Item", "Quantity", "Price"]
)

df_grocery_list.printSchema()

root
 |-- Item: string (nullable = true)
 |-- Quantity: long (nullable = true)
 |-- Price: double (nullable = true)



In [None]:
df_grocery_list.select("Item", "Price").where("Price > 2").show()

+-----+-----+
| Item|Price|
+-----+-----+
|Apple| 2.04|
| Cake|10.99|
+-----+-----+



In [None]:
DIRECTORY = "data/broadcast_logs"
logs = (
    spark.read.csv(
        os.path.join(DIRECTORY, "BroadcastLogs_2018_Q3_M8_sample.CSV"),
        sep="|",
        header=True,
        inferSchema=True,
        timestampFormat="yyyy-MM-dd",
    ))

In [None]:
logs.select("BroadcastLogID", "LogServiceID", "LogDate").show(5, False)

+--------------+------------+----------+
|BroadcastLogID|LogServiceID|LogDate   |
+--------------+------------+----------+
|1196192316    |3157        |2018-08-01|
|1196192317    |3157        |2018-08-01|
|1196192318    |3157        |2018-08-01|
|1196192319    |3157        |2018-08-01|
|1196192320    |3157        |2018-08-01|
+--------------+------------+----------+
only showing top 5 rows



In [None]:
# Using the string to column conversion
logs.select("BroadCastLogID", "LogServiceID", "LogDate")
logs.select(*["BroadCastLogID", "LogServiceID", "LogDate"])

# Passing the column object explicitly
logs.select(
    F.col("BroadCastLogID"), F.col("LogServiceID"), F.col("LogDate")
)
logs.select(
    *[F.col("BroadCastLogID"), F.col("LogServiceID"), F.col("LogDate")]
)

DataFrame[BroadCastLogID: int, LogServiceID: int, LogDate: date]

In [None]:
import numpy as np

column_split = np.array_split(
    np.array(logs.columns), len(logs.columns) // 3
)

In [None]:
column_split

[array(['BroadcastLogID', 'LogServiceID', 'LogDate'], dtype='<U22'),
 array(['SequenceNO', 'AudienceTargetAgeID', 'AudienceTargetEthnicID'],
       dtype='<U22'),
 array(['CategoryID', 'ClosedCaptionID', 'CountryOfOriginID'], dtype='<U22'),
 array(['DubDramaCreditID', 'EthnicProgramID', 'ProductionSourceID'],
       dtype='<U22'),
 array(['ProgramClassID', 'FilmClassificationID', 'ExhibitionID'],
       dtype='<U22'),
 array(['Duration', 'EndTime', 'LogEntryDate'], dtype='<U22'),
 array(['ProductionNO', 'ProgramTitle', 'StartTime'], dtype='<U22'),
 array(['Subtitle', 'NetworkAffiliationID', 'SpecialAttentionID'],
       dtype='<U22'),
 array(['BroadcastOriginPointID', 'CompositionID', 'Producer1'],
       dtype='<U22'),
 array(['Producer2', 'Language1', 'Language2'], dtype='<U22')]

In [None]:
for x in column_split:
    logs.select(*x).show(5, False)

+--------------+------------+----------+
|BroadcastLogID|LogServiceID|LogDate   |
+--------------+------------+----------+
|1196192316    |3157        |2018-08-01|
|1196192317    |3157        |2018-08-01|
|1196192318    |3157        |2018-08-01|
|1196192319    |3157        |2018-08-01|
|1196192320    |3157        |2018-08-01|
+--------------+------------+----------+
only showing top 5 rows

+----------+-------------------+----------------------+
|SequenceNO|AudienceTargetAgeID|AudienceTargetEthnicID|
+----------+-------------------+----------------------+
|1         |4                  |NULL                  |
|2         |NULL               |NULL                  |
|3         |NULL               |NULL                  |
|4         |NULL               |NULL                  |
|5         |NULL               |NULL                  |
+----------+-------------------+----------------------+
only showing top 5 rows

+----------+---------------+-----------------+
|CategoryID|ClosedCaptionID|Co

In [7]:
DIRECTORY = "data/broadcast_logs"
logs = (
    spark.read.csv(
        os.path.join(DIRECTORY, "BroadcastLogs_2018_Q3_M8_sample.CSV"),
        sep="|",
        header=True,
        inferSchema=True,
        timestampFormat="yyyy-MM-dd",
    )
    .drop("BroadcastLogID", "SequenceNO")
    .withColumn(
        "duration_seconds",
        (
            F.col("Duration").substr(1, 2).cast("int") * 60 * 60
            + F.col("Duration").substr(4, 2).cast("int") * 60
            + F.col("Duration").substr(7, 2).cast("int")
        ),
    )
)

In [None]:
logs.printSchema()

root
 |-- LogServiceID: integer (nullable = true)
 |-- LogDate: date (nullable = true)
 |-- AudienceTargetAgeID: integer (nullable = true)
 |-- AudienceTargetEthnicID: integer (nullable = true)
 |-- CategoryID: integer (nullable = true)
 |-- ClosedCaptionID: integer (nullable = true)
 |-- CountryOfOriginID: integer (nullable = true)
 |-- DubDramaCreditID: integer (nullable = true)
 |-- EthnicProgramID: integer (nullable = true)
 |-- ProductionSourceID: integer (nullable = true)
 |-- ProgramClassID: integer (nullable = true)
 |-- FilmClassificationID: integer (nullable = true)
 |-- ExhibitionID: integer (nullable = true)
 |-- Duration: string (nullable = true)
 |-- EndTime: string (nullable = true)
 |-- LogEntryDate: date (nullable = true)
 |-- ProductionNO: string (nullable = true)
 |-- ProgramTitle: string (nullable = true)
 |-- StartTime: string (nullable = true)
 |-- Subtitle: string (nullable = true)
 |-- NetworkAffiliationID: integer (nullable = true)
 |-- SpecialAttentionID: inte

In [None]:
logs.select(F.col("Duration")).show(5)

+----------------+
|        Duration|
+----------------+
|02:00:00.0000000|
|00:00:30.0000000|
|00:00:15.0000000|
|00:00:15.0000000|
|00:00:15.0000000|
+----------------+
only showing top 5 rows



In [None]:
print(logs.select(F.col("Duration")).dtypes)

[('Duration', 'string')]


In [None]:
logs.select(
    F.col("Duration"),
    F.col("Duration").substr(1, 2).cast("int").alias("dur_hours"),
    F.col("Duration").substr(4, 2).cast("int").alias("dur_minutes"),
    F.col("Duration").substr(7, 2).cast("int").alias("dur_seconds"),
).distinct().show(
    5
)

+----------------+---------+-----------+-----------+
|        Duration|dur_hours|dur_minutes|dur_seconds|
+----------------+---------+-----------+-----------+
|00:04:52.0000000|        0|          4|         52|
|00:10:06.0000000|        0|         10|          6|
|00:26:41.0000000|        0|         26|         41|
|00:05:29.0000000|        0|          5|         29|
|00:08:18.0000000|        0|          8|         18|
+----------------+---------+-----------+-----------+
only showing top 5 rows



In [None]:
logs.select(
    F.col("Duration"),
    (
        F.col("Duration").substr(1, 2).cast("int") * 60 * 60
        + F.col("Duration").substr(4, 2).cast("int") * 60
        + F.col("Duration").substr(7, 2).cast("int")
    ).alias("Duration_seconds"),
).distinct().show(5)

+----------------+----------------+
|        Duration|Duration_seconds|
+----------------+----------------+
|01:59:30.0000000|            7170|
|00:31:00.0000000|            1860|
|00:28:08.0000000|            1688|
|00:10:30.0000000|             630|
|00:32:00.0000000|            1920|
+----------------+----------------+
only showing top 5 rows



In [None]:
#for i in logs.columns:
#    logs.describe(i).show()

for i in logs.columns:
    logs.select(i).summary().show()

+-------+--------------------+
|summary|      BroadcastLogID|
+-------+--------------------+
|  count|              238945|
|   mean|1.2168651122760174E9|
| stddev| 1.496913424143109E7|
|    min|          1195788151|
|    25%|          1204679733|
|    50%|          1213242718|
|    75%|          1226220079|
|    max|          1249431576|
+-------+--------------------+

+-------+------------------+
|summary|      LogServiceID|
+-------+------------------+
|  count|            238945|
|   mean| 3450.890284375065|
| stddev|199.50673962555592|
|    min|              3157|
|    25%|              3287|
|    50%|              3379|
|    75%|              3627|
|    max|              3925|
+-------+------------------+

+-------+
|summary|
+-------+
|  count|
|   mean|
| stddev|
|    min|
|    25%|
|    50%|
|    75%|
|    max|
+-------+

+-------+-----------------+
|summary|       SequenceNO|
+-------+-----------------+
|  count|           238945|
|   mean|466.3855824562138|
| stddev|311.5109

In [10]:
DIRECTORY = "data/broadcast_logs"
log_identifier = spark.read.csv(
    os.path.join(DIRECTORY, "ReferenceTables/LogIdentifier.csv"),
    sep="|",
    header=True,
    inferSchema=True,
)

log_identifier.printSchema()

root
 |-- LogIdentifierID: string (nullable = true)
 |-- LogServiceID: integer (nullable = true)
 |-- PrimaryFG: integer (nullable = true)



In [12]:
log_identifier.show(5)

+---------------+------------+---------+
|LogIdentifierID|LogServiceID|PrimaryFG|
+---------------+------------+---------+
|           13ST|        3157|        1|
|         2000SM|        3466|        1|
|           70SM|        3883|        1|
|           80SM|        3590|        1|
|           90SM|        3470|        1|
+---------------+------------+---------+
only showing top 5 rows



In [11]:
logs_and_channels_verbose = logs.join(
    log_identifier, logs["LogServiceID"] == log_identifier["LogServiceID"]
)

logs_and_channels_verbose.printSchema()

root
 |-- LogServiceID: integer (nullable = true)
 |-- LogDate: date (nullable = true)
 |-- AudienceTargetAgeID: integer (nullable = true)
 |-- AudienceTargetEthnicID: integer (nullable = true)
 |-- CategoryID: integer (nullable = true)
 |-- ClosedCaptionID: integer (nullable = true)
 |-- CountryOfOriginID: integer (nullable = true)
 |-- DubDramaCreditID: integer (nullable = true)
 |-- EthnicProgramID: integer (nullable = true)
 |-- ProductionSourceID: integer (nullable = true)
 |-- ProgramClassID: integer (nullable = true)
 |-- FilmClassificationID: integer (nullable = true)
 |-- ExhibitionID: integer (nullable = true)
 |-- Duration: string (nullable = true)
 |-- EndTime: string (nullable = true)
 |-- LogEntryDate: date (nullable = true)
 |-- ProductionNO: string (nullable = true)
 |-- ProgramTitle: string (nullable = true)
 |-- StartTime: string (nullable = true)
 |-- Subtitle: string (nullable = true)
 |-- NetworkAffiliationID: integer (nullable = true)
 |-- SpecialAttentionID: inte

In [13]:
logs_and_channels = logs.join(log_identifier, "LogServiceID")

logs_and_channels.printSchema()

root
 |-- LogServiceID: integer (nullable = true)
 |-- LogDate: date (nullable = true)
 |-- AudienceTargetAgeID: integer (nullable = true)
 |-- AudienceTargetEthnicID: integer (nullable = true)
 |-- CategoryID: integer (nullable = true)
 |-- ClosedCaptionID: integer (nullable = true)
 |-- CountryOfOriginID: integer (nullable = true)
 |-- DubDramaCreditID: integer (nullable = true)
 |-- EthnicProgramID: integer (nullable = true)
 |-- ProductionSourceID: integer (nullable = true)
 |-- ProgramClassID: integer (nullable = true)
 |-- FilmClassificationID: integer (nullable = true)
 |-- ExhibitionID: integer (nullable = true)
 |-- Duration: string (nullable = true)
 |-- EndTime: string (nullable = true)
 |-- LogEntryDate: date (nullable = true)
 |-- ProductionNO: string (nullable = true)
 |-- ProgramTitle: string (nullable = true)
 |-- StartTime: string (nullable = true)
 |-- Subtitle: string (nullable = true)
 |-- NetworkAffiliationID: integer (nullable = true)
 |-- SpecialAttentionID: inte

In [14]:
DIRECTORY = "data/broadcast_logs"

cd_category = spark.read.csv(
    os.path.join(DIRECTORY, "ReferenceTables/CD_Category.csv"),
    sep="|",
    header=True,
    inferSchema=True,
).select(
    "CategoryID",
    "CategoryCD",
    F.col("EnglishDescription").alias("Category_Description"),
)

cd_program_class = spark.read.csv(
    os.path.join(DIRECTORY, "ReferenceTables/CD_ProgramClass.csv"),
    sep="|",
    header=True,
    inferSchema=True,
).select(
    "ProgramClassID",
    "ProgramClassCD",
    F.col("EnglishDescription").alias("ProgramClass_Description"),
)

full_log = logs_and_channels.join(cd_category, "CategoryID", how="left").join(
    cd_program_class, "ProgramClassID", how="left"
)

In [15]:
full_log.show(10)

+--------------+----------+------------+----------+-------------------+----------------------+---------------+-----------------+----------------+---------------+------------------+--------------------+------------+----------------+----------------+------------+------------+--------------------+----------------+--------+--------------------+------------------+----------------------+-------------+---------+---------+---------+---------+----------------+---------------+---------+----------+--------------------+--------------+------------------------+
|ProgramClassID|CategoryID|LogServiceID|   LogDate|AudienceTargetAgeID|AudienceTargetEthnicID|ClosedCaptionID|CountryOfOriginID|DubDramaCreditID|EthnicProgramID|ProductionSourceID|FilmClassificationID|ExhibitionID|        Duration|         EndTime|LogEntryDate|ProductionNO|        ProgramTitle|       StartTime|Subtitle|NetworkAffiliationID|SpecialAttentionID|BroadcastOriginPointID|CompositionID|Producer1|Producer2|Language1|Language2|duration

# Commercial Ratio Example

In [None]:
#  commercials.py #############################################################
#
# This program computes the commercial ratio for each channel present in the
# dataset.
#
###############################################################################

import os

import pyspark.sql.functions as F
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName(
    "Getting the Canadian TV channels with the highest/lowest proportion of commercials."
).getOrCreate()

spark.sparkContext.setLogLevel("WARN")

###############################################################################
# Reading all the relevant data sources
###############################################################################

DIRECTORY = "data/broadcast_logs"

logs = spark.read.csv(
    os.path.join(DIRECTORY, "BroadcastLogs_2018_Q3_M8_sample.CSV"),
    sep="|",
    header=True,
    inferSchema=True,
)

log_identifier = spark.read.csv(
    "data/broadcast_logs/ReferenceTables/LogIdentifier.csv",
    sep="|",
    header=True,
    inferSchema=True,
)

cd_category = spark.read.csv(
    "data/broadcast_logs/ReferenceTables/CD_Category.csv",
    sep="|",
    header=True,
    inferSchema=True,
).select(
    "CategoryID",
    "CategoryCD",
    F.col("EnglishDescription").alias("Category_Description"),
)

cd_program_class = spark.read.csv(
    "data/broadcast_logs/ReferenceTables/CD_ProgramClass.csv",
    sep="|",
    header=True,
    inferSchema=True,
).select(
    "ProgramClassID",
    "ProgramClassCD",
    F.col("EnglishDescription").alias("ProgramClass_Description"),
)

###############################################################################
# Data processing
###############################################################################

logs = logs.drop("BroadcastLogID", "SequenceNO")

logs = logs.withColumn(
    "duration_seconds",
    (
        F.col("Duration").substr(1, 2).cast("int") * 60 * 60
        + F.col("Duration").substr(4, 2).cast("int") * 60
        + F.col("Duration").substr(7, 2).cast("int")
    ),
)

log_identifier = log_identifier.where(F.col("PrimaryFG") == 1)

logs_and_channels = logs.join(log_identifier, "LogServiceID")

full_log = logs_and_channels.join(cd_category, "CategoryID", how="left").join(
    cd_program_class, "ProgramClassID", how="left"
)

full_log.groupby("LogIdentifierID").agg(
    F.sum(
        F.when(
            F.trim(F.col("ProgramClassCD")).isin(
                ["COM", "PRC", "PGI", "PRO", "LOC", "SPO", "MER", "SOL"]
            ),
            F.col("duration_seconds"),
        ).otherwise(0)
    ).alias("duration_commercial"),
    F.sum("duration_seconds").alias("duration_total"),
).withColumn(
    "commercial_ratio", F.col("duration_commercial") / F.col("duration_total")
).orderBy(
    "commercial_ratio", ascending=False
).show(
    1000, False
)

+---------------+-------------------+--------------+----------------+
|LogIdentifierID|duration_commercial|duration_total|commercial_ratio|
+---------------+-------------------+--------------+----------------+
|CJCO           |0                  |NULL          |NULL            |
|BRAVO          |0                  |NULL          |NULL            |
|CFTF           |0                  |NULL          |NULL            |
|CKCS           |0                  |NULL          |NULL            |
|CJNT           |0                  |NULL          |NULL            |
|CKES           |0                  |NULL          |NULL            |
|CHBX           |0                  |NULL          |NULL            |
|BBCKID         |0                  |NULL          |NULL            |
|BOOK           |0                  |NULL          |NULL            |
|CHAN           |0                  |NULL          |NULL            |
|CEVASI         |0                  |NULL          |NULL            |
|CMT            |0  