# Spark DataFrame

Import and Install

In [1]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m2.1 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=54e5fdd1217d6bf493cb33a2151d1f18d157b202b695a3d71b61d3ec4095ef98
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


In [2]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
from pyspark.sql.functions import sum, desc, year, asc
from pyspark.sql import functions as F

Download the data

In [3]:
from google.colab import drive
drive.mount('/content/gdrive')

Mounted at /content/gdrive


# Reading the Data


In [5]:
payment_df = spark.read.format("csv").option("header", "true").load(r'/content/gdrive/My Drive/models/rental/payment.csv')
payment_df = payment_df.alias('payment_df')
payment_df.show(3)

+----------+-----------+--------+---------+------+-------------------+-------------------+
|payment_id|customer_id|staff_id|rental_id|amount|       payment_date|        last_update|
+----------+-----------+--------+---------+------+-------------------+-------------------+
|         1|          1|       1|       76|  2.99|2005-05-25 11:30:37|2006-02-15 22:12:30|
|         2|          1|       1|      573|  0.99|2005-05-28 10:35:23|2006-02-15 22:12:30|
|         3|          1|       1|     1185|  5.99|2005-06-15 00:54:12|2006-02-15 22:12:30|
+----------+-----------+--------+---------+------+-------------------+-------------------+
only showing top 3 rows



In [6]:
rental_df = spark.read.format("csv").option("header", "true").load(r'/content/gdrive/My Drive/models/rental/rental.csv')
rental_df = rental_df.alias('rental_df')
rental_df.show(3)

+---------+-------------------+------------+-----------+-------------------+--------+-------------------+
|rental_id|        rental_date|inventory_id|customer_id|        return_date|staff_id|        last_update|
+---------+-------------------+------------+-----------+-------------------+--------+-------------------+
|        1|2005-05-24 22:53:30|         367|        130|2005-05-26 22:04:30|       1|2006-02-15 21:30:53|
|        2|2005-05-24 22:54:33|        1525|        459|2005-05-28 19:40:33|       1|2006-02-15 21:30:53|
|        3|2005-05-24 23:03:39|        1711|        408|2005-06-01 22:12:39|       1|2006-02-15 21:30:53|
+---------+-------------------+------------+-----------+-------------------+--------+-------------------+
only showing top 3 rows



In [7]:
staff_df = spark.read.format("csv").option("header", "true").load(r'/content/gdrive/My Drive/models/rental/staff.csv')
staff_df = staff_df.alias('staff_df')
staff_df.show(3)

+--------+----------+---------+----------+-------+--------------------+--------+------+--------+--------------------+--------------+
|staff_id|first_name|last_name|address_id|picture|               email|store_id|active|username|            password|   last_update|
+--------+----------+---------+----------+-------+--------------------+--------+------+--------+--------------------+--------------+
|       1|      Mike|  Hillyer|         3|   NULL|Mike.Hillyer@saki...|       1|     1|    Mike|8cb2237d0679ca88d...|2/15/2006 3:57|
|       2|       Jon| Stephens|         4|   NULL|Jon.Stephens@saki...|       2|     1|     Jon|                NULL|2/15/2006 3:57|
+--------+----------+---------+----------+-------+--------------------+--------+------+--------+--------------------+--------------+



In [8]:
inventory_df = spark.read.format("csv").option("header", "true").load(r'/content/gdrive/My Drive/models/rental/inventory.csv')
inventory_df = inventory_df.alias('inventory_df')
inventory_df.show(3)

+------------+-------+--------+-------------------+
|inventory_id|film_id|store_id|        last_update|
+------------+-------+--------+-------------------+
|           1|      1|       1|2006-02-15 05:09:17|
|           2|      1|       1|2006-02-15 05:09:17|
|           3|      1|       1|2006-02-15 05:09:17|
+------------+-------+--------+-------------------+
only showing top 3 rows



In [9]:
store_df = spark.read.format("csv").option("header", "true").load(r'/content/gdrive/My Drive/models/rental/store.csv')
store_df = store_df.alias('store_df')
store_df.show(3)

+--------+----------------+----------+-------------------+
|store_id|manager_staff_id|address_id|        last_update|
+--------+----------------+----------+-------------------+
|       1|               1|         1|2006-02-15 04:57:12|
|       2|               2|         2|2006-02-15 04:57:12|
+--------+----------------+----------+-------------------+



In [10]:
address_df = spark.read.format("csv").option("header", "true").load(r'/content/gdrive/My Drive/models/rental/address.csv')
address_df = address_df.alias('address_df')
address_df.show(3)

+----------+------------------+--------+--------+-------+-----------+-----------+-------------------+
|address_id|           address|address2|district|city_id|postal_code|      phone|        last_update|
+----------+------------------+--------+--------+-------+-----------+-----------+-------------------+
|         1| 47 MySakila Drive|    NULL| Alberta|    300|       NULL|       NULL|2014-09-25 22:30:27|
|         2|28 MySQL Boulevard|    NULL|     QLD|    576|       NULL|       NULL|2014-09-25 22:30:09|
|         3| 23 Workhaven Lane|    NULL| Alberta|    300|       NULL|14033335568|2014-09-25 22:30:27|
+----------+------------------+--------+--------+-------+-----------+-----------+-------------------+
only showing top 3 rows



In [11]:
customer_df = spark.read.format("csv").option("header", "true").load(r'/content/gdrive/My Drive/models/rental/customer.csv')
customer_df = customer_df.alias('customer_df')
customer_df.show(3)

+-----------+--------+----------+---------+--------------------+----------+------+-------------------+-------------------+
|customer_id|store_id|first_name|last_name|               email|address_id|active|        create_date|        last_update|
+-----------+--------+----------+---------+--------------------+----------+------+-------------------+-------------------+
|          1|       1|      MARY|    SMITH|MARY.SMITH@sakila...|         5|     1|2006-02-14 22:04:36|2006-02-15 04:57:20|
|          2|       1|  PATRICIA|  JOHNSON|PATRICIA.JOHNSON@...|         6|     1|2006-02-14 22:04:36|2006-02-15 04:57:20|
|          3|       1|     LINDA| WILLIAMS|LINDA.WILLIAMS@sa...|         7|     1|2006-02-14 22:04:36|2006-02-15 04:57:20|
+-----------+--------+----------+---------+--------------------+----------+------+-------------------+-------------------+
only showing top 3 rows



In [12]:
film_df = spark.read.format("csv").option("header", "true").load(r'/content/gdrive/My Drive/models/rental/film.csv')
film_df = film_df.alias('film_df')
film_df.show(3)

+-------+----------------+--------------------+------------+-----------+--------------------+---------------+-----------+------+----------------+------+--------------------+-------------------+
|film_id|           title|         description|release_year|language_id|original_language_id|rental_duration|rental_rate|length|replacement_cost|rating|    special_features|        last_update|
+-------+----------------+--------------------+------------+-----------+--------------------+---------------+-----------+------+----------------+------+--------------------+-------------------+
|      1|ACADEMY DINOSAUR|A Epic Drama of a...|        2006|          1|                NULL|              6|       0.99|    86|           20.99|    PG|Deleted Scenes,Be...|2006-02-15 05:03:42|
|      2|  ACE GOLDFINGER|A Astounding Epis...|        2006|          1|                NULL|              3|       4.99|    48|           12.99|     G|Trailers,Deleted ...|2006-02-15 05:03:42|
|      3|ADAPTATION HOLES|A As

# Question 1

Create a DataFrame based on the payment DataFrame. It contains a
list of customer_id and total payment amount grouped by customer_id, sorted
by the total amount in descending order. Other columns may be included as
needed.

In [14]:
# First, group the payment DataFrame by customer_id and calculate the sum of payment amount for each customer.
# Then, order the DataFrame by the total payment amount in descending order.
customer_payment_df = payment_df.groupBy("customer_id").agg(sum("amount").alias("total_payment")) \
    .orderBy("total_payment", ascending=False)

# Show top 3 rows for verification purposes
customer_payment_df.show(3)

+-----------+------------------+
|customer_id|     total_payment|
+-----------+------------------+
|        526| 221.5500000000001|
|        148| 216.5400000000001|
|        144|195.58000000000007|
+-----------+------------------+
only showing top 3 rows



# Question 2

Create a DataFrame based on the rental DataFrame. It contains a list
of rentals from the rental DataFrame that were rented in 2005 ordered on
return_date in ascending order. Other columns may be included as needed.

In [19]:
# First, filter the rental DataFrame to include only rentals from 2005.
# Finally, order the DataFrame by return_date in ascending order.
rentals_2005_df = rental_df.filter(year("rental_date") == 2005) \
    .orderBy("return_date")

# Show top 3 rows for verification purposes
rentals_2005_df.show(3)

+---------+-------------------+------------+-----------+-------------------+--------+-------------------+
|rental_id|        rental_date|inventory_id|customer_id|        return_date|staff_id|        last_update|
+---------+-------------------+------------+-----------+-------------------+--------+-------------------+
|       32|2005-05-25 04:06:21|        3832|        230|2005-05-25 23:55:21|       1|2006-02-15 21:30:53|
|       21|2005-05-25 01:59:46|         146|        388|2005-05-26 01:01:46|       2|2006-02-15 21:30:53|
|       14|2005-05-25 00:31:15|        2701|        446|2005-05-26 02:56:15|       1|2006-02-15 21:30:53|
+---------+-------------------+------------+-----------+-------------------+--------+-------------------+
only showing top 3 rows



# Question 3

Create a DataFrame based on rental DataFrame . It contains a list of
rental_id count from rental DataFrame grouped by staff_id.

In [20]:
# First, group the rental DataFrame by staff_id and count the number of rentals for each staff.
rental_count_by_staff_df = rental_df.groupBy("staff_id").count()

# Show top 3 rows for verification purposes
rental_count_by_staff_df.show(3)

+--------+-----+
|staff_id|count|
+--------+-----+
|       1| 8040|
|       2| 8004|
+--------+-----+



# Question 4

Now add staff first and last name.

In [22]:
# First, select the columns 'staff_id', 'first_name', and 'last_name' from the staff DataFrame.
# Then, join the rental count DataFrame with the staff DataFrame on 'staff_id'.
rental_count_with_staff_name_df = rental_count_by_staff_df.join(staff_df, "staff_id") \
    .select("staff_id", "first_name", "last_name", "count")

# Show top 3 rows for verification purposes
rental_count_with_staff_name_df.show(3)

+--------+----------+---------+-----+
|staff_id|first_name|last_name|count|
+--------+----------+---------+-----+
|       1|      Mike|  Hillyer| 8040|
|       2|       Jon| Stephens| 8004|
+--------+----------+---------+-----+



# Question 5

Create a DataFrame based on rental DataFrame, joined with
inventory, store, and address DataFrames to get store address, joined with
customer DataFrame to get customer first and last name, joined with staff
DataFrame to get staff first and last name, joined with film data set to get film
name and description, and ordered by return_date in the ascending order.

In [24]:
# First, join the rental DataFrame with inventory, store, address, customer, staff, and film DataFrames.
# Finally, order the DataFrame by return_date in ascending order.
rental_full_info_df = rental_df.join(inventory_df, "inventory_id") \
    .join(store_df, "store_id") \
    .join(address_df, "address_id") \
    .join(customer_df, "customer_id") \
    .join(staff_df, "staff_id") \
    .join(film_df, "film_id") \
    .select(
        rental_df["rental_date"].alias("rental_date"),
        rental_df["return_date"].alias("return_date"),
        address_df["address"].alias("store_address"),
        film_df["title"].alias("film_title"),
        film_df["description"].alias("film_description"),
        customer_df["first_name"].alias("customer_first_name"),
        customer_df["last_name"].alias("customer_last_name"),
        staff_df["first_name"].alias("staff_first_name"),
        staff_df["last_name"].alias("staff_last_name")
    ) \
    .orderBy(asc("return_date"))

# Show top 3 rows for verification purposes
rental_full_info_df.show(3)

+-------------------+-------------------+------------------+------------------+--------------------+-------------------+------------------+----------------+---------------+
|        rental_date|        return_date|     store_address|        film_title|    film_description|customer_first_name|customer_last_name|staff_first_name|staff_last_name|
+-------------------+-------------------+------------------+------------------+--------------------+-------------------+------------------+----------------+---------------+
|2005-05-25 04:06:21|2005-05-25 23:55:21|28 MySQL Boulevard| STALLION SUNDANCE|A Fast-Paced Tale...|                JOY|            GEORGE|            Mike|        Hillyer|
|2005-05-25 01:59:46|2005-05-26 01:01:46| 47 MySakila Drive|     APACHE DIVINE|A Awe-Inspiring R...|              CRAIG|           MORRELL|             Jon|       Stephens|
|2005-05-25 00:31:15|2005-05-26 02:56:15|28 MySQL Boulevard|MONTEREY LABYRINTH|A Awe-Inspiring D...|           THEODORE|              C

# Question 6

Create a DataFrame that contains a list of customer_id whose sum
amount of payment in 2005 is more than $200.

In [26]:
# First, filter the payment DataFrame to include only payments from 2005.
# Then, group the DataFrame by customer_id and calculate the sum of payment amount for each customer.
# Finally, filter out customers whose total payment amount is more than $200.
customer_payment_2005_df = payment_df.filter(year("payment_date") == 2005) \
    .groupBy("customer_id").agg(sum("amount").alias("total_payment_2005"))

customers_over_200_df = customer_payment_2005_df.filter("total_payment_2005 > 200")

# Show top 3 rows for verification purposes
customers_over_200_df.show()

+-----------+------------------+
|customer_id|total_payment_2005|
+-----------+------------------+
|        526| 221.5500000000001|
|        148| 216.5400000000001|
+-----------+------------------+



# Question 7

Join the result with customer DataFrame set to get their names.

In [28]:
# First, join the DataFrame obtained in Task 6 with the customer DataFrame on 'customer_id'.
customers_over_200_with_names_df = customers_over_200_df.join(customer_df, "customer_id") \
    .select("customer_id", "first_name", "last_name", "total_payment_2005")

# Show top 3 rows for verification purposes
customers_over_200_with_names_df.show()

+-----------+----------+---------+------------------+
|customer_id|first_name|last_name|total_payment_2005|
+-----------+----------+---------+------------------+
|        526|      KARL|     SEAL| 221.5500000000001|
|        148|   ELEANOR|     HUNT| 216.5400000000001|
+-----------+----------+---------+------------------+



# Question 8

Create a DataFrame that contains a list of customer_id that rented
in 2005; Create another DataFrame that contains a list of customer_id that
rented in 2006 (filter using rental_date).

a. (1 point) Now create a DataFrame of customer_id that exist in the 2005
DataFrame but not in the 2006 DataFrame . The count should be 441.

b. (1 point) Now create a DataFrame of customer_id that exist in the 2006
DataFrame but not in the 2005 DataFrame . The count should be 0.

In [30]:
# First, filter the rental DataFrame to include only rentals from 2005 and 2006.
# Then, select distinct customer_ids for each year.
# Finally, find the difference between the two sets of customer_ids to get customers who exist in 2005 but not in 2006, and vice versa.
customers_2005_df = rental_df.filter(year("rental_date") == 2005) \
    .select("customer_id").distinct()

customers_2006_df = rental_df.filter(year("rental_date") == 2006) \
    .select("customer_id").distinct()

# Customers in 2005 but not in 2006
customers_2005_not_2006_df = customers_2005_df.subtract(customers_2006_df)

# Show count rows for verification purposes
print("Count of customers in 2005 but not in 2006:", customers_2005_not_2006_df.count())

# Customers in 2006 but not in 2005
customers_2006_not_2005_df = customers_2006_df.subtract(customers_2005_df)

# Show count for verification purposes
print("Count of customers in 2006 but not in 2005:", customers_2006_not_2005_df.count())

Count of customers in 2005 but not in 2006: 441
Count of customers in 2006 but not in 2005: 0
