In [1]:
! pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.2.tar.gz (317.3 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.3/317.3 MB[0m [31m4.1 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.2-py2.py3-none-any.whl size=317812365 sha256=22d761d995e6fcb66a242c47ceb39bdb8420dc85c191d008bd3c2849406af514
  Stored in directory: /root/.cache/pip/wheels/34/34/bd/03944534c44b677cd5859f248090daa9fb27b3c8f8e5f49574
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.2


In [3]:
# Create a CSV file with movie ratings data
csv_data = """UserID,MovieID,Rating,Timestamp
U001,M001,4,2024-05-01 14:30:00
U002,M002,5,2024-05-01 16:00:00
U003,M001,3,2024-05-02 10:15:00
U001,M003,2,2024-05-02 13:45:00
U004,M002,4,2024-05-03 18:30:00
"""

# Write the data to a CSV file
with open('/content/movie_ratings.csv', 'w') as f:
    f.write(csv_data)

print("CSV file created at /content/movie_ratings.csv")


CSV file created at /content/movie_ratings.csv


In [4]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType
import os
from pyspark.sql import functions as F

# Initialize Spark session with Delta support
spark = SparkSession.builder \
    .appName("MovieRatingsDataIngestion") \
    .config("spark.jars.packages", "io.delta:delta-core_2.12:1.0.0").getOrCreate()

# Define schema for movie ratings data
schema = StructType([
    StructField("UserID", StringType(), True),
    StructField("MovieID", StringType(), True),
    StructField("Rating", IntegerType(), True),
    StructField("Timestamp", TimestampType(), True)
])

# Define paths
raw_data_path = "/content/movie_ratings.csv"
delta_table_path = "/content/movie_ratings_delta"

# Ingest data
if os.path.exists(raw_data_path):
    try:
        # Read the CSV data
        ratings_df = spark.read.csv(raw_data_path, schema=schema, header=True)

        # Check for invalid ratings
        invalid_ratings_df = ratings_df.filter((F.col("Rating") < 1) | (F.col("Rating") > 5))
        if invalid_ratings_df.count() > 0:
            print(f"Invalid ratings found:\n{invalid_ratings_df.show()}")

        # Write to Delta table
        ratings_df.write.format("delta").mode("overwrite").save(delta_table_path)
        print("Data loaded and saved as Delta table.")
    except Exception as e:
        print(f"Error during data ingestion: {e}")
else:
    print(f"File not found: {raw_data_path}")


Error during data ingestion: An error occurred while calling o52.save.
: java.lang.AbstractMethodError: Receiver class org.apache.spark.sql.delta.commands.WriteIntoDelta does not define or inherit an implementation of the resolved method 'abstract void org$apache$spark$sql$catalyst$plans$logical$Command$_setter_$nodePatterns_$eq(scala.collection.Seq)' of interface org.apache.spark.sql.catalyst.plans.logical.Command.
	at org.apache.spark.sql.catalyst.plans.logical.Command.$init$(Command.scala:38)
	at org.apache.spark.sql.delta.commands.WriteIntoDelta.<init>(WriteIntoDelta.scala:53)
	at org.apache.spark.sql.delta.sources.DeltaDataSource.createRelation(DeltaDataSource.scala:154)
	at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:48)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.sca

Task 2: Data Cleaning

In [None]:
# Load the existing Delta table
ratings_df = spark.read.format("delta").load(delta_table_path)

# Clean the data
cleaned_df = ratings_df.filter((F.col("Rating") >= 1) & (F.col("Rating") <= 5)) \
                        .dropDuplicates(["UserID", "MovieID"])

# Save cleaned data to a new Delta table
cleaned_delta_table_path = "/content/movie_ratings_cleaned_delta"
cleaned_df.write.format("delta").mode("overwrite").save(cleaned_delta_table_path)
print("Cleaned data saved to Delta table.")


Task 3: Movie Rating Analysis

In [None]:
# Load cleaned data
cleaned_df = spark.read.format("delta").load(cleaned_delta_table_path)

# Analyze the ratings
analysis_df = cleaned_df.groupBy("MovieID").agg(
    F.avg("Rating").alias("AverageRating"),
    F.count("Rating").alias("NumberOfRatings")
)

# Identify highest and lowest rated movies
highest_rated = analysis_df.orderBy(F.desc("AverageRating")).limit(1)
lowest_rated = analysis_df.orderBy("AverageRating").limit(1)

# Save analysis results to a Delta table
analysis_delta_table_path = "/content/movie_rating_analysis_delta"
analysis_df.write.format("delta").mode("overwrite").save(analysis_delta_table_path)

# Show results
highest_rated.show()
lowest_rated.show()


Task 4: Time Travel and Delta Lake History

In [None]:
# Update some ratings for demonstration
updated_df = cleaned_df.withColumn("Rating", F.when(F.col("UserID") == "U001", 5).otherwise(F.col("Rating")))
updated_df.write.format("delta").mode("overwrite").save(delta_table_path)

# Rollback to the previous version
# Get the current version number
current_version = spark.sql(f"DESCRIBE HISTORY delta.`{delta_table_path}`").count() - 1

# Rollback to the previous version
rollback_df = spark.read.format("delta").option("versionAsOf", current_version - 1).load(delta_table_path)

# Show the rolled-back ratings
rollback_df.show()

# Describe history
history_df = spark.sql(f"DESCRIBE HISTORY delta.`{delta_table_path}`")
history_df.show(truncate=False)


Task 5: Optimize Delta Table

In [None]:
# Optimize the Delta table with Z-ordering on the MovieID column
spark.sql(f"OPTIMIZE delta.`{delta_table_path}` ZORDER BY (MovieID)")

# Clean up older versions
spark.sql(f"VACUUM delta.`{delta_table_path}` RETAIN 0 HOURS")
print("Optimization and cleanup completed.")
