In [0]:
from pyspark.sql import functions as F
from pyspark.sql.functions import col, regexp_extract, regexp_replace, explode, split
from pyspark.sql.window import Window

In [0]:
dbutils.fs.ls('/mnt/movie-analytics')

[FileInfo(path='dbfs:/mnt/movie-analytics/Raw_data/', name='Raw_data/', size=0, modificationTime=0),
 FileInfo(path='dbfs:/mnt/movie-analytics/Transformed_data/', name='Transformed_data/', size=0, modificationTime=0)]

In [0]:
input_path = '/mnt/movie-analytics/Raw_data'
dbutils.fs.ls(input_path)

[FileInfo(path='dbfs:/mnt/movie-analytics/Raw_data/movies.dat', name='movies.dat', size=171308, modificationTime=1712230181000),
 FileInfo(path='dbfs:/mnt/movie-analytics/Raw_data/ratings.dat', name='ratings.dat', size=24594131, modificationTime=1712230187000),
 FileInfo(path='dbfs:/mnt/movie-analytics/Raw_data/users.dat', name='users.dat', size=134368, modificationTime=1712230187000)]

In [0]:
table_names = []

for i in dbutils.fs.ls(input_path):
    table_names.append(i.name.split('/')[0])

print(table_names)

['movies.dat', 'ratings.dat', 'users.dat']


### Spark Data Frames
1. Prepare Movies data: Extracting the Year and Genre from the Text
2. Prepare Users data: Loading a double delimited csv file
3. Prepare Ratings data: Programmatically specifying a schema for the data frame
4. Import Data from URL: Python


In [0]:
# Movies data path and schema
movies_file_path = f"{input_path}/movies.dat"
movies_schema = "movie_id INT, title STRING, genres STRING"

# Users data path and schema
users_file_path = f"{input_path}/users.dat"
users_schema = "UserID INT, Gender STRING, Age INT, Occupation INT, ZipCode INT"

# Ratings data path and schema
ratings_schema = "UserID INT, MovieID INT, Rating INT, Timestamp INT"
ratings_file_path = f"{input_path}/ratings.dat"

# Reading the data from the movies.dat file
movies_data = spark.read.csv(movies_file_path, sep="::", schema=movies_schema)

# Reading the data from the users.dat file
users_data = spark.read.csv(users_file_path, sep="::", schema=users_schema)

# Reading the data from the ratings.dat files
ratings_data = spark.read.csv(ratings_file_path, sep = "::", schema=ratings_schema)

## Movies data Transformation

In [0]:
movies_data.show()

+--------+--------------------+--------------------+
|movie_id|               title|              genres|
+--------+--------------------+--------------------+
|       1|    Toy Story (1995)|Animation|Childre...|
|       2|      Jumanji (1995)|Adventure|Childre...|
|       3|Grumpier Old Men ...|      Comedy|Romance|
|       4|Waiting to Exhale...|        Comedy|Drama|
|       5|Father of the Bri...|              Comedy|
|       6|         Heat (1995)|Action|Crime|Thri...|
|       7|      Sabrina (1995)|      Comedy|Romance|
|       8| Tom and Huck (1995)|Adventure|Children's|
|       9| Sudden Death (1995)|              Action|
|      10|    GoldenEye (1995)|Action|Adventure|...|
|      11|American Presiden...|Comedy|Drama|Romance|
|      12|Dracula: Dead and...|       Comedy|Horror|
|      13|        Balto (1995)|Animation|Children's|
|      14|        Nixon (1995)|               Drama|
|      15|Cutthroat Island ...|Action|Adventure|...|
|      16|       Casino (1995)|      Drama|Thr

In [0]:
movies_data.printSchema()

root
 |-- movie_id: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- genres: string (nullable = true)



In [0]:
# Extract the year from the title and create a new column 'year'
movies_data = movies_data.withColumn("release_year", regexp_extract("title", r"\((\d{4})\)", 1))
# Remove the (year) from the title
movies_data = movies_data.withColumn("title", regexp_replace("title", "\s*\(\d{4}\)\s*", ""))
# Extract and explode genres into separate rows
movies_data = movies_data.withColumn("genres", explode(split("genres", "\\|")))
# Rename the column names
movies_data = movies_data.withColumnRenamed("movie_id" , "Movie_id").withColumnRenamed("release_year" , "Release_year")\
    .withColumnRenamed("title" , "Title").withColumnRenamed("genres" , "Genres")
# Adjusting the data types of the columns
movies_data = movies_data.withColumn("Release_year", col('Release_year').cast('integer'))    

In [0]:
movies_data.printSchema()

root
 |-- Movie_id: integer (nullable = true)
 |-- Title: string (nullable = true)
 |-- Genres: string (nullable = false)
 |-- Release_year: integer (nullable = true)



In [0]:
movies_data.show()

+--------+--------------------+----------+------------+
|Movie_id|               Title|    Genres|Release_year|
+--------+--------------------+----------+------------+
|       1|           Toy Story| Animation|        1995|
|       1|           Toy Story|Children's|        1995|
|       1|           Toy Story|    Comedy|        1995|
|       2|             Jumanji| Adventure|        1995|
|       2|             Jumanji|Children's|        1995|
|       2|             Jumanji|   Fantasy|        1995|
|       3|    Grumpier Old Men|    Comedy|        1995|
|       3|    Grumpier Old Men|   Romance|        1995|
|       4|   Waiting to Exhale|    Comedy|        1995|
|       4|   Waiting to Exhale|     Drama|        1995|
|       5|Father of the Bri...|    Comedy|        1995|
|       6|                Heat|    Action|        1995|
|       6|                Heat|     Crime|        1995|
|       6|                Heat|  Thriller|        1995|
|       7|             Sabrina|    Comedy|      

In [0]:
movies_data.printSchema()

root
 |-- Movie_id: integer (nullable = true)
 |-- Title: string (nullable = true)
 |-- Genres: string (nullable = false)
 |-- Release_year: integer (nullable = true)



In [0]:
from pyspark.sql.functions import count

movie_count = movies_data.count()
print("Movie count:" ,movie_count)
count_dis = movies_data.distinct().count()
print("Distinct movie count:" ,count_dis)
duplicate_count = movie_count - count_dis
print("Difference between original and distinct:" ,duplicate_count)

Movie count: 6408
Distinct movie count: 6408
Difference between original and distinct: 0


## Users data Tranformation

In [0]:
users_data.printSchema()

root
 |-- UserID: integer (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Occupation: integer (nullable = true)
 |-- ZipCode: integer (nullable = true)



In [0]:
users_data = users_data.withColumnRenamed("UserID" , "User_id").withColumnRenamed("ZipCode" , "Zip_code")

In [0]:
users_data.printSchema()

root
 |-- User_id: integer (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Occupation: integer (nullable = true)
 |-- Zip_code: integer (nullable = true)



In [0]:
users_count = users_data.count()
print("Users count:" ,users_count)
users_dist_count = users_data.distinct().count()
print("Distinct users count:" ,users_dist_count)
dist = users_count - users_dist_count
print("Difference between original and distinct:" ,dist)

Users count: 6040
Distinct users count: 6040
Difference between original and distinct: 0


In [0]:
users_data.show()

+-------+------+---+----------+--------+
|User_id|Gender|Age|Occupation|Zip_code|
+-------+------+---+----------+--------+
|      1|     F|  1|        10|   48067|
|      2|     M| 56|        16|   70072|
|      3|     M| 25|        15|   55117|
|      4|     M| 45|         7|    2460|
|      5|     M| 25|        20|   55455|
|      6|     F| 50|         9|   55117|
|      7|     M| 35|         1|    6810|
|      8|     M| 25|        12|   11413|
|      9|     M| 25|        17|   61614|
|     10|     F| 35|         1|   95370|
|     11|     F| 25|         1|    4093|
|     12|     M| 25|        12|   32793|
|     13|     M| 45|         1|   93304|
|     14|     M| 35|         0|   60126|
|     15|     M| 25|         7|   22903|
|     16|     F| 35|         0|   20670|
|     17|     M| 50|         1|   95350|
|     18|     F| 18|         3|   95825|
|     19|     M|  1|        10|   48073|
|     20|     M| 25|        14|   55113|
+-------+------+---+----------+--------+
only showing top

## Ratings data Transformation

In [0]:
ratings_data.show()

+------+-------+------+---------+
|UserID|MovieID|Rating|Timestamp|
+------+-------+------+---------+
|     1|   1193|     5|978300760|
|     1|    661|     3|978302109|
|     1|    914|     3|978301968|
|     1|   3408|     4|978300275|
|     1|   2355|     5|978824291|
|     1|   1197|     3|978302268|
|     1|   1287|     5|978302039|
|     1|   2804|     5|978300719|
|     1|    594|     4|978302268|
|     1|    919|     4|978301368|
|     1|    595|     5|978824268|
|     1|    938|     4|978301752|
|     1|   2398|     4|978302281|
|     1|   2918|     4|978302124|
|     1|   1035|     5|978301753|
|     1|   2791|     4|978302188|
|     1|   2687|     3|978824268|
|     1|   2018|     4|978301777|
|     1|   3105|     5|978301713|
|     1|   2797|     4|978302039|
+------+-------+------+---------+
only showing top 20 rows



In [0]:
ratings_data.printSchema()

root
 |-- UserID: integer (nullable = true)
 |-- MovieID: integer (nullable = true)
 |-- Rating: integer (nullable = true)
 |-- Timestamp: integer (nullable = true)



In [0]:
from pyspark.sql.types import TimestampType
# Adjusting the data types of columns
ratings_data = ratings_data.withColumn("Timestamp", col("Timestamp").cast(TimestampType()))
# Rename the column names
ratings_data = ratings_data.withColumnRenamed("UserID" , "User_id").withColumnRenamed("MovieID" , "Movie_id")

In [0]:
ratings_data.printSchema()

root
 |-- User_id: integer (nullable = true)
 |-- Movie_id: integer (nullable = true)
 |-- Rating: integer (nullable = true)
 |-- Timestamp: timestamp (nullable = true)



In [0]:
ratings_data.show()

+-------+--------+------+-------------------+
|User_id|Movie_id|Rating|          Timestamp|
+-------+--------+------+-------------------+
|      1|    1193|     5|2000-12-31 22:12:40|
|      1|     661|     3|2000-12-31 22:35:09|
|      1|     914|     3|2000-12-31 22:32:48|
|      1|    3408|     4|2000-12-31 22:04:35|
|      1|    2355|     5|2001-01-06 23:38:11|
|      1|    1197|     3|2000-12-31 22:37:48|
|      1|    1287|     5|2000-12-31 22:33:59|
|      1|    2804|     5|2000-12-31 22:11:59|
|      1|     594|     4|2000-12-31 22:37:48|
|      1|     919|     4|2000-12-31 22:22:48|
|      1|     595|     5|2001-01-06 23:37:48|
|      1|     938|     4|2000-12-31 22:29:12|
|      1|    2398|     4|2000-12-31 22:38:01|
|      1|    2918|     4|2000-12-31 22:35:24|
|      1|    1035|     5|2000-12-31 22:29:13|
|      1|    2791|     4|2000-12-31 22:36:28|
|      1|    2687|     3|2001-01-06 23:37:48|
|      1|    2018|     4|2000-12-31 22:29:37|
|      1|    3105|     5|2000-12-3

In [0]:
ratings_count = ratings_data.count()
print("Ratings count:" ,ratings_count)
ratings_dist_count = ratings_data.distinct().count()
print("Distinct ratings count:" ,ratings_dist_count)
diff = ratings_count - ratings_dist_count
print("Difference between original and distinct:" ,diff)

Ratings count: 1000209
Distinct ratings count: 1000209
Difference between original and distinct: 0


In [0]:
duplicate_rows = ratings_data.groupBy(ratings_data.columns).count().filter("count > 1")
duplicate_rows.count()

0

### SPARK SQL 
1. Create tables for movies.dat, users.dat and ratings.dat: Saving Tables from Spark SQL

In [0]:
movies_data.createOrReplaceTempView("movies")
users_data.createOrReplaceTempView("users")
ratings_data.createOrReplaceTempView("ratings")

In [0]:
spark.sql("CREATE TABLE IF NOT EXISTS movies using DELTA AS SELECT * FROM movies")
spark.sql("CREATE TABLE IF NOT EXISTS users using DELTA AS SELECT * FROM users")
spark.sql("CREATE TABLE IF NOT EXISTS ratings using DELTA AS SELECT * FROM ratings")

DataFrame[num_affected_rows: bigint, num_inserted_rows: bigint]

In [0]:
spark.catalog.listTables()

[Table(name='movies', catalog='spark_catalog', namespace=['default'], description=None, tableType='MANAGED', isTemporary=False),
 Table(name='ratings', catalog='spark_catalog', namespace=['default'], description=None, tableType='MANAGED', isTemporary=False),
 Table(name='users', catalog='spark_catalog', namespace=['default'], description=None, tableType='MANAGED', isTemporary=False),
 Table(name='movies', catalog=None, namespace=[], description=None, tableType='TEMPORARY', isTemporary=True),
 Table(name='ratings', catalog=None, namespace=[], description=None, tableType='TEMPORARY', isTemporary=True),
 Table(name='users', catalog=None, namespace=[], description=None, tableType='TEMPORARY', isTemporary=True)]

### Spark Data Frames
5. Save table without defining DDL in Hive

In [0]:

movies_data.write.saveAsTable("movies" , mode="overwrite")
users_data.write.saveAsTable("users" , mode="overwrite")
ratings_data.write.saveAsTable("ratings" , mode="overwrite")

## Saving the Transformed data to Data lake storage at 'movie-analytics/Transformed_data/

In [0]:
output_path = '/mnt/movie-analytics/Transformed_data/'
# saving data to data lake 
movies_data.write.format('parquet').mode('overwrite').save(f'{output_path}/movies_data/')
users_data.write.format('parquet').mode('overwrite').save(f'{output_path}/users_data/')
ratings_data.write.format('parquet').mode('overwrite').save(f'{output_path}/ratings_data/')