In [None]:
# Import python packages
import streamlit as st
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt


# We can also use Snowpark for our analyses!
from snowflake.snowpark.context import get_active_session
from snowflake.snowpark import functions as F
from snowflake.snowpark.window import Window
# Reuse the Snowpark session that the hosting notebook environment provides.
session = get_active_session()


# One catalogue table per streaming platform.
# Fix: df_amazon used to read "hulu_MOVIES_TV", which (unquoted identifiers are
# case-insensitive) is the same table as df_hulu, so Hulu was loaded twice and
# Amazon never. The name below follows the "<platform>_MOVIES_TV" pattern of the
# other three tables.
# NOTE(review): confirm the Amazon table really is named AMAZON_MOVIES_TV.
df_amazon = session.table("amazon_MOVIES_TV")
df_disney = session.table("disney_MOVIES_TV")
df_hulu = session.table("HULU_MOVIES_TV")
df_netflix = session.table("netflix_MOVIES_TV")

# Reference tables that are joined on NAME further down in the notebook.
df_industry = session.table("INDUSTRY_MOVIES")
df_IMBD = session.table("IMDB_MOVIE_REVIEWS")



In [None]:

# Columns kept from every platform catalogue; the combined table has exactly
# these columns plus the surrogate key added below.
CATALOG_COLUMNS = [
    "TYPE", "TITLE", "DIRECTOR", "CAST", "COUNTRY", "DATE_ADDED",
    "RELEASE_YEAR", "RATING", "DURATION", "LISTED_IN", "DESCRIPTION",
]


def select_catalog_columns(platform_df):
    """Project one platform DataFrame onto the shared catalogue columns.

    Args:
        platform_df: Snowpark DataFrame that contains every name listed in
            CATALOG_COLUMNS.

    Returns:
        A Snowpark DataFrame with exactly CATALOG_COLUMNS, in that order.
    """
    return platform_df.select(*CATALOG_COLUMNS)


# The previous version added NULL "amazon"/"disney"/"hulu"/"netflix" columns to
# each frame and then dropped them again in the select, so they never reached
# the output; that dead step is removed here.
# NOTE(review): if per-platform availability flags are wanted in the final
# table, they have to be set per source and kept through the aggregation.
df_amazon_with_columns = select_catalog_columns(df_amazon)
df_disney_with_columns = select_catalog_columns(df_disney)
df_hulu_with_columns = select_catalog_columns(df_hulu)
df_netflix_with_columns = select_catalog_columns(df_netflix)

# Stack the four catalogues. union_all is sufficient because the group-by below
# collapses duplicates anyway (MAX over repeated rows gives the same value).
df_streaming = df_amazon_with_columns \
    .union_all(df_disney_with_columns) \
    .union_all(df_hulu_with_columns) \
    .union_all(df_netflix_with_columns)

# Keep one row per TITLE. MAX ignores NULLs, so each column takes the largest
# non-null value seen for that title across platforms (NULL if none has one).
# The former COALESCE(MAX(x), NULL) wrapper was a no-op and has been dropped.
df_streaming = df_streaming.group_by("TITLE").agg(
    [F.max(name).alias(name) for name in CATALOG_COLUMNS if name != "TITLE"]
)

# Add a surrogate key. TITLE is unique after the group-by, so ordering the
# window by it makes the numbering reproducible; with the previous constant
# ordering the keys written to the table and the keys printed by show() could
# differ, because each action re-runs the query.
df_streaming = df_streaming.with_column(
    "streaming_key",
    F.row_number().over(Window.order_by(F.col("TITLE"))),
)

# Persist the combined catalogue, replacing any existing table of that name.
df_streaming.write \
    .mode("overwrite") \
    .save_as_table("STREAMING_MOVIES_TV")

df_streaming.show()

In [None]:
# Count the NULLs in every column before any cleaning is applied.
missing_values = df_streaming.select([
    F.sum(F.when(F.col(column_name).is_null(), 1).otherwise(0)).alias(column_name)
    for column_name in df_streaming.columns
])
missing_values.show()

# Most frequent non-null RATING (the mode).
# Fix: the previous query ordered by RATING descending instead of by the group
# count, so it returned the largest rating value (or NULL) rather than the mode.
# Ties are broken by RATING so the result is reproducible.
rating_counts = (
    df_streaming
    .filter(F.col("RATING").is_not_null())
    .group_by("RATING")
    .agg(F.count(F.lit(1)).alias("N_TITLES"))
    .sort(F.col("N_TITLES").desc(), F.col("RATING").asc())
)
top_rating_row = rating_counts.first()
mode_value = top_rating_row["RATING"] if top_rating_row is not None else None

# Replacement values for missing text fields. RATING is only filled when at
# least one non-null rating exists to take the mode from.
fill_values = {
    "COUNTRY": "Unknown",
    "DIRECTOR": "Not available",
    "CAST": "Not available",
}
if mode_value is not None:
    fill_values["RATING"] = mode_value
df_streaming = df_streaming.fillna(fill_values)

# Fix 'date_added': parse DATE_ADDED as a date; where that yields NULL, fall
# back to January 1st of RELEASE_YEAR.
df_streaming = df_streaming.with_column(
    "date_added",
    F.coalesce(
        F.to_date(F.col("DATE_ADDED"), "MMMM dd, yyyy"),
        F.to_date(F.concat(F.col("RELEASE_YEAR"), F.lit("-01-01")))
    )
)

df_streaming.show()

In [None]:

# Preview the first rows of both reference DataFrames.
for reference_df in (df_industry, df_IMBD):
    reference_df.show()


In [None]:
# Requires the Snowpark DataFrames df_IMBD and df_industry to be loaded already.

# Inner-join the two reference tables on NAME and sort the matched rows.
# NOTE(review): the sort key is spelled "IMBD_ID" here; confirm it matches the
# actual column name in df_IMBD (it may be IMDB_ID).
df_matches = (
    df_IMBD
    .join(df_industry, on="NAME", how="inner")
    .sort("IMBD_ID")
)

# Display the first 10 matched rows.
df_matches.show(10)

# Row counts: matched rows first, then each input table.
total_matches = df_matches.count()
total_records_IMBD = df_IMBD.count()
total_records_industry = df_industry.count()

# Report the three counts.
print(f"Total matching records: {total_matches}")
print(f"Total records in df_IMBD: {total_records_IMBD}")
print(f"Total records in df_industry: {total_records_industry}")
