## Imports

In [None]:
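import dlt
import numpy as np
import pyspark.sql.functions as F
# ALS is used by the als_item_factors table further down
from pyspark.ml.recommendation import ALS
from pyspark.sql.functions import col, lit, udf
from pyspark.sql.types import (
    DoubleType, FloatType, IntegerType, StringType, StructField, StructType
)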

## Widgets For Interaction with Streamlit App

In [None]:
dbutils.widgets.text("movie_id", "", "Movie ID")
movie_id_param = dbutils.widgets.get("movie_id")
movie_id = int(movie_id_param) if movie_id_param.isdigit() else None 
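
A note on the guard above: `str.isdigit()` returns `True` for some characters that `int()` cannot parse (superscript digits, for example), and it returns `False` when the value has surrounding whitespace. The sketch below shows one possible stricter parser. The helper name `parse_movie_id` is hypothetical and is not part of this notebook.

```python
from typing import Optional


def parse_movie_id(raw: str) -> Optional[int]:
    """Return the widget value as an int, or None if it is not a plain non-negative integer."""
    text = raw.strip()
    # isdecimal() accepts only characters that int() can parse as base-10 digits
    if text.isdecimal():
        return int(text)
    return None


print(parse_movie_id("318"))    # 318
print(parse_movie_id(" 318 "))  # 318
print(parse_movie_id(""))       # None
print(parse_movie_id("-5"))     # None
print(parse_movie_id("²"))      # None ("²".isdigit() is True, but int("²") raises ValueError)
```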

## Bronze Tables

The Bronze layer includes two critical datasets: ratings and movies. These datasets are ingested from CSV files and stored as Delta tables for enhanced reliability and performance. Below, we detail the creation of these tables and their respective schemas.

### Ratings

***Objective:*** To store raw user ratings data, capturing how users have rated movies over time.

***Schema Definition:***

- *userId* (IntegerType): Unique identifier for the user who provided the rating;
  
- *movieId* (IntegerType): Unique identifier for the movie that was rated;
  
- *rating* (DoubleType): Numerical rating given to the movie by the user;
  
- *timestamp* (IntegerType): Timestamp indicating when the rating was provided.
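
As a small illustration of the *timestamp* column, the sketch below converts a value to a readable date. It assumes the values are seconds since the Unix epoch in UTC, as documented for the MovieLens datasets; the helper name `rating_time` is hypothetical.

```python
from datetime import datetime, timezone

INT32_MAX = 2**31 - 1  # largest value a 32-bit IntegerType column can hold


def rating_time(epoch_seconds: int) -> datetime:
    """Convert a rating timestamp (assumed seconds since the Unix epoch) to a UTC datetime."""
    return datetime.fromtimestamp(epoch_seconds, tz=timezone.utc)


print(rating_time(1_000_000_000).isoformat())  # 2001-09-09T01:46:40+00:00
print(rating_time(INT32_MAX).isoformat())      # 2038-01-19T03:14:07+00:00
```

The second line shows the upper bound of what an IntegerType timestamp can represent under that assumption.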

In [None]:
path_ratings = 'dbfs:/mnt/bronze/bronze/ratings.csv'

schema_ratings = StructType([
    StructField('userId', IntegerType(), False),
    StructField('movieId', IntegerType(), False),
    StructField('rating', DoubleType(), False),
    StructField('timestamp', IntegerType(), False)
])

In [None]:
@dlt.table
def ratings_bronze():
    return spark.read\
                    .format("csv") \
                    .option("header", "true")\
                    .schema(schema_ratings)\
                    .load(path_ratings)

### Movies

***Objective:*** To catalog movie metadata, including titles and genres, as ingested from the source.


***Schema Definition:***

- *movieId* (IntegerType): Unique identifier for each movie;
  
- *title* (StringType): The title of the movie;
  
- *genres* (StringType): A pipe-separated list of genres associated with the movie.
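
Because *genres* is a single pipe-separated string, downstream code usually needs to split it. The sketch below shows one way to do that in plain Python. The helper name `parse_genres` is hypothetical, and the `(no genres listed)` placeholder is an assumption based on the MovieLens documentation.

```python
from typing import List

# Assumed placeholder that MovieLens uses for movies without genre information
NO_GENRES = "(no genres listed)"


def parse_genres(genres: str) -> List[str]:
    """Split a pipe-separated genres string into a list of genre names."""
    if genres == NO_GENRES:
        return []
    return genres.split("|")


print(parse_genres("Adventure|Animation|Children"))  # ['Adventure', 'Animation', 'Children']
print(parse_genres("Drama"))                         # ['Drama']
print(parse_genres(NO_GENRES))                       # []
```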

In [None]:
path_movies = 'dbfs:/mnt/bronze/bronze/movies.csv'

schema_movies = StructType([
    StructField('movieId', IntegerType(), False),
    StructField('title', StringType(), False),
    StructField('genres', StringType(), False)
])

In [None]:
@dlt.table
def movies_bronze():
    return spark.read\
                .format("csv") \
                .option("header", "true")\
                .schema(schema_movies)\
                .load(path_movies)

## Silver Tables

The Silver layer refines the data from the Bronze layer, adding aggregations and enrichments that support more detailed analysis. The tables below aggregate ratings per user and per movie, and join ratings to movie titles. Like the Bronze tables, they are stored as Delta tables.

### User: mean, count

***Objective:*** Aggregate user data to understand average ratings and the total number of ratings per user. This aggregation helps identify highly active users and their general rating tendencies.

In [None]:
@dlt.table
def user_mean_count_silver():
    return (
        dlt.read('ratings_bronze').\
            groupBy('userId').\
            agg(F.mean('rating').alias('mean'), F.count('rating').alias('count')).\
            orderBy(F.col('mean').desc(), F.col('count').desc())
    )
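
To make the aggregation concrete, the sketch below reproduces the same group, mean, count, and ordering logic in plain Python on a few made-up ratings. It illustrates the logic only and is not the pipeline code; the function name `mean_count_by_user` is hypothetical.

```python
from collections import defaultdict
from statistics import mean

# (userId, movieId, rating) rows with made-up values
ratings = [
    (1, 10, 4.0),
    (1, 20, 5.0),
    (2, 10, 3.0),
    (3, 10, 4.5),
    (3, 20, 4.5),
    (3, 30, 4.5),
]


def mean_count_by_user(rows):
    """Return (userId, mean rating, rating count) tuples, highest mean first."""
    by_user = defaultdict(list)
    for user_id, _movie_id, rating in rows:
        by_user[user_id].append(rating)
    summary = [(user_id, mean(values), len(values)) for user_id, values in by_user.items()]
    # Same ordering as the table: mean descending, ties broken by count descending
    return sorted(summary, key=lambda row: (row[1], row[2]), reverse=True)


for row in mean_count_by_user(ratings):
    print(row)
# (3, 4.5, 3)
# (1, 4.5, 2)
# (2, 3.0, 1)
```

Users 1 and 3 share the same mean, so the count decides their order.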

### Movie: mean, count

***Objective:*** Aggregate movie data to reveal average ratings and total rating counts per movie. This provides insights into overall movie popularity and viewer reception.

In [None]:
@dlt.table
def movie_mean_count_silver():
    return (
        dlt.read('ratings_bronze').\
            groupBy('movieId').\
            agg(F.mean('rating').alias('mean'), F.count('rating').alias('count')).\
            orderBy(F.col('mean').desc(), F.col('count').desc())
    )

### UserId, MovieId, Title, Rating

***Objective:*** Create a comprehensive view that combines user ratings with movie titles. This table is instrumental in user-specific queries and recommendation algorithms.

In [None]:
@dlt.table
def user_movie_title_rating():
    ratings_df = dlt.read("ratings_bronze")
    movies_df = dlt.read("movies_bronze")

    df = ratings_df.join(
        movies_df,
        ratings_df.movieId == movies_df.movieId,
        "left"
    ).select(
        ratings_df.userId,
        ratings_df.movieId,
        movies_df.title,
        ratings_df.rating
    )

    return df
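
The join above is a left join, so every rating is kept even when its movie is missing from movies_bronze; such rows get a null title. The plain-Python sketch below illustrates that behavior with made-up rows. The function name `left_join_titles` and the sample title are hypothetical.

```python
# (userId, movieId, rating) rows with made-up values
ratings = [(1, 10, 4.0), (1, 99, 2.5)]
# movieId -> title; movie 99 is deliberately missing
titles = {10: "Example Movie (2000)"}


def left_join_titles(rating_rows, title_by_movie):
    """Attach a title to each rating, using None when the movie is unknown."""
    return [
        (user_id, movie_id, title_by_movie.get(movie_id), rating)
        for user_id, movie_id, rating in rating_rows
    ]


for row in left_join_titles(ratings, titles):
    print(row)
# (1, 10, 'Example Movie (2000)', 4.0)
# (1, 99, None, 2.5)
```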

## Caching

To enhance query performance and data retrieval speeds, specific tables are cached. This ensures that frequently accessed data is stored in memory, reducing the need for repeated disk reads.

In [None]:
%sql
UNCACHE TABLE IF EXISTS user_mean_count_silver;
CACHE TABLE user_mean_count_silver;

In [None]:
%sql
UNCACHE TABLE IF EXISTS movie_mean_count_silver;
CACHE TABLE movie_mean_count_silver;

In [None]:
%sql
UNCACHE TABLE IF EXISTS user_movie_title_rating;
CACHE TABLE user_movie_title_rating;

## Algorithm

This section builds a movie recommender based on collaborative filtering, using the Alternating Least Squares (ALS) implementation in the Apache Spark ML library. It trains an ALS model, extracts the item factors for movies, and finds similar movies using cosine similarity, a measure of how closely the latent factor vectors of two movies align.

ALS is a widely used collaborative filtering algorithm. It learns latent factor vectors for both users and items (movies, in our case) and uses them to predict how a user would rate an item they have not yet interacted with. Here the factors are learned from the explicit ratings in the ratings data: the model below leaves Spark's `implicitPrefs` option at its default of `False`.
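
As a rough guide to what the model optimizes, the textbook form of the factorization objective is sketched below. The symbols are assumptions introduced for illustration: $\mathbf{x}_u$ is the latent vector of user $u$, $\mathbf{y}_i$ the latent vector of movie $i$, $r_{ui}$ an observed rating, $\mathcal{K}$ the set of observed (user, movie) pairs, and $\lambda$ the regularization strength. Spark's implementation may scale the regularization term differently.

$$
\hat{r}_{ui} = \mathbf{x}_u^{\top} \mathbf{y}_i
$$

$$
\min_{X,\,Y} \sum_{(u,i) \in \mathcal{K}} \left( r_{ui} - \mathbf{x}_u^{\top} \mathbf{y}_i \right)^2 + \lambda \left( \sum_u \lVert \mathbf{x}_u \rVert^2 + \sum_i \lVert \mathbf{y}_i \rVert^2 \right)
$$

ALS alternates between fixing the movie vectors and solving for the user vectors, then fixing the user vectors and solving for the movie vectors. Each half-step is a regularized least-squares problem.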

***Purpose:*** Trains an ALS model using user ratings and extracts item (movie) factors, which represent latent features inferred from user interactions.

In [None]:
@dlt.table(comment="Table storing ALS model item factors")
def als_item_factors():
    df_ratings = dlt.read("ratings_bronze")
    als = ALS(userCol='userId', itemCol='movieId', ratingCol='rating', rank=10, coldStartStrategy='drop')
    als_model = als.fit(df_ratings)
    item_factors = als_model.itemFactors
    item_factors = item_factors.withColumnRenamed('id', 'movieId').withColumn('movieId', col('movieId').cast(IntegerType()))
    return item_factors

***Function: cosine_similarity(vec1, vec2)***

Cosine similarity measures how similar two items are based on the angle between their latent factor vectors. It ranges from -1 (vectors pointing in opposite directions) to 1 (vectors pointing in the same direction); a value of 0 means the vectors are orthogonal.

***Function: get_k_similar_movies(movie_id, k=10)***

Retrieves the k most similar movies to a given movie, excluding the movie itself from the results.

In [None]:
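@udf(returnType=FloatType())
def cosine_similarity(vec1, vec2):
    # Cosine of the angle between two factor vectors; None if either vector has zero length
    norm = np.linalg.norm(vec1) * np.linalg.norm(vec2)
    if norm == 0:
        return None
    return float(np.dot(vec1, vec2) / norm)

def get_k_similar_movies(movie_id, k=10):
    # als_item_factors stores the movie key as 'movieId' (renamed from the ALS 'id' column)
    item_factors = dlt.read("als_item_factors")
    features_collected = item_factors.filter(F.col('movieId') == movie_id).select('features').collect()
    if not features_collected:
        raise ValueError(f"movie_id {movie_id} has no item factors")
    # 'features' is an array column, so the collected value is already a Python list
    to_insert = [float(x) for x in features_collected[0]['features']]

    # Attach the target movie's vector to every row as a literal array column
    item_factors = item_factors.withColumn('features_array', F.array(*[F.lit(x) for x in to_insert]))
    item_factors = item_factors.withColumn("similarity", cosine_similarity(F.col("features"), F.col("features_array")))

    # Exclude the movie itself and keep the k highest similarities; the key is returned as 'id'
    similar_movies = (
        item_factors.filter(F.col("movieId") != F.lit(movie_id))
        .orderBy(F.col("similarity").desc())
        .select(F.col("movieId").alias("id"))
        .limit(k)
    )

    return similar_movies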
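
The lookup can be traced on a toy example without Spark. The sketch below implements cosine similarity and a top-k search in plain Python over made-up two-dimensional factors. The names `cosine` and `k_most_similar` are hypothetical, and real item factors would have `rank` components (10 in the model above).

```python
import heapq
import math


def cosine(a, b):
    """Cosine similarity of two equal-length vectors; 0.0 if either has zero length."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


def k_most_similar(factors, movie_id, k=10):
    """Return the ids of the k movies most similar to movie_id, excluding movie_id itself."""
    target = factors[movie_id]
    scored = ((cosine(vec, target), other) for other, vec in factors.items() if other != movie_id)
    return [other for _score, other in heapq.nlargest(k, scored)]


# movieId -> latent vector, made-up values
factors = {
    1: [1.0, 0.0],
    2: [2.0, 0.0],   # same direction as movie 1, similarity 1.0
    3: [1.0, 1.0],   # 45 degrees from movie 1, similarity about 0.707
    4: [0.0, 1.0],   # orthogonal to movie 1, similarity 0.0
    5: [-1.0, 0.0],  # opposite direction, similarity -1.0
}

print(k_most_similar(factors, 1, k=3))  # [2, 3, 4]
```

Movie 2 ranks first even though its vector is twice as long as movie 1's, because cosine similarity ignores vector length and compares direction only.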

## Calculation for specific Movie

Once similar movies are identified, the results are joined with the movies_bronze table to fetch movie titles and then written in Parquet format to a fixed output path for further analysis or for use by recommendation engines.

In [None]:
if movie_id is not None:
    similar_movies = get_k_similar_movies(movie_id, k=50)
    df_movies = dlt.read('movies_bronze')
    similar_movies_df = similar_movies.join(df_movies, df_movies['movieId'] == similar_movies['id'], how='left').drop(col('id'))
    # Note: despite the .csv suffix, this path is written as a Parquet directory
    output_path = "/mnt/results/similar_movies.csv"
    similar_movies_df.write.format("parquet").mode("overwrite").save(output_path)


## Automation Report

The movielens25m_pipeline was created to automate the ETL process.

The movielens25m_pipeline represents a sophisticated orchestration of data flows within the Databricks environment. By leveraging Delta Live Tables (DLT), we establish an automated, reliable, and maintainable process for continuous data integration and ETL (extract, transform, load) operations. This report outlines the pipeline's structure, its components, and the automation benefits it delivers.

**Pipeline Structure**

The pipeline is composed of several materialized views—movies_bronze, ratings_bronze, user_movie_title_rating, movie_mean_count_silver, and user_mean_count_silver—each representing a stage in the data transformation process.

*Bronze Tables*

Bronze tables (movies_bronze and ratings_bronze) serve as the initial raw data repositories. Each pipeline update reads the source CSV files into these tables, so they reflect the most recent files available at the source.

*Silver Tables*

Data from the bronze layer is transformed and promoted to silver tables. These tables (user_movie_title_rating, movie_mean_count_silver, and user_mean_count_silver) provide a more curated view. They offer insights such as average movie ratings, user engagement levels, and a consolidated view of user ratings with movie titles.




**Automation and Continuous Integration**

*The DLT pipeline automates the entire data lifecycle:*

*Ingestion*: New data is automatically ingested into bronze tables.

*Transformation*: Data is transformed using predefined rules and business logic to populate the silver tables.

*Materialization*: Silver tables are materialized views that are automatically updated as underlying data changes, maintaining data freshness.

*Maintenance*: The pipeline handles data cleaning, error handling, and schema enforcement without manual intervention.

**Benefits of DLT Automation**

*Scalability*: As data volume grows, the pipeline scales to handle increased loads, thanks to the distributed nature of Databricks.

*Reliability*: Continuous integration of data ensures that downstream applications and analytics always have access to the latest data.

*Efficiency*: Reduces the time and effort required for manual ETL tasks, freeing up resources for more value-adding activities.

*Quality*: Consistent application of transformation logic ensures high data quality and trustworthiness.

*Traceability*: The pipeline's execution graph provides clear visibility into the data flow and lineage, simplifying troubleshooting and auditing.

**Conclusion**

The movielens25m_pipeline exemplifies how modern data architecture can simplify complex data workflows. By automating the data integration process with Delta Live Tables, we achieve a high level of operational efficiency and data quality, ensuring our data platform remains agile and responsive to the needs of the business.