In [0]:
pip install azure

In [0]:
import ast
import json
import os
import warnings

import azure.storage.blob as azureblob
import numpy as np
import pandas as pd
from pyspark.sql.types import *
from sklearn.impute import KNNImputer
from sklearn.preprocessing import StandardScaler

# Silence library warnings for the rest of the notebook.
warnings.filterwarnings('ignore')

# Location of the raw movies metadata CSV in Azure Blob Storage.
account_name = 'rohitk0810deprojects'
# The account key is a secret, so it is read from the environment instead of
# being stored in the notebook. Any key that was previously committed with this
# notebook should be treated as compromised and rotated.
account_key = os.environ.get('AZURE_STORAGE_ACCOUNT_KEY')
if not account_key:
    raise RuntimeError(
        "Set the AZURE_STORAGE_ACCOUNT_KEY environment variable to the access "
        "key of the storage account before running this cell.")
container_name = 'blob'
blob_name = 'movies_metadata.csv'
blob_service = azureblob.BlockBlobService(account_name, account_key)

# Download the blob to a local file, then load that file into pandas.
blob_service.get_blob_to_path(container_name, blob_name, 'temp.csv')

df = pd.read_csv('temp.csv')

In [0]:
def safe_json_loads(s):
    """Parse a serialized list/dict cell, returning ``[]`` when it cannot be parsed.

    The text is first tried as JSON: as-is when it is wrapped in braces,
    otherwise after swapping single quotes for double quotes. When that fails
    the text is parsed as a Python literal instead, which copes with values the
    quote swap corrupts (apostrophes or double quotes inside a string, ``None``).

    Args:
        s: The cell content as a string.

    Returns:
        The parsed value from the JSON attempt, or the list/dict produced by
        the Python-literal fallback. ``[]`` is returned when ``s`` is not a
        string, when neither parser accepts it, or when the fallback yields
        something other than a list or dict (for example ``'False'``).
    """
    if not isinstance(s, str):
        return []

    try:
        if s.startswith('{') and s.endswith('}'):
            return json.loads(s)
        return json.loads(s.replace("'", '"'))
    except json.JSONDecodeError:
        pass

    # literal_eval only evaluates literals, so it is safe on untrusted text.
    try:
        parsed = ast.literal_eval(s)
    except (ValueError, SyntaxError, TypeError, MemoryError, RecursionError):
        return []

    # Callers iterate over the result, so bare scalars are treated as "no data".
    return parsed if isinstance(parsed, (list, dict)) else []

def safe_json_loads_2(s):
    """Decode ``s`` as JSON after swapping single quotes for double quotes.

    Returns the decoded value, or an empty dict when the converted text is not
    valid JSON.
    """
    double_quoted = s.replace("'", '"')
    try:
        decoded = json.loads(double_quoted)
    except json.JSONDecodeError:
        return {}
    return decoded

def unpack_json(og_column_name, sub_column_names, frame=None):
    """Split a column of serialized dict lists into one list column per key.

    For every name in ``sub_column_names`` a column called
    ``<og_column_name>_<sub_column_name>`` is added; each cell holds the list of
    values stored under that key in the row's dicts (``None`` where a dict has
    no such key).

    Args:
        og_column_name: Name of the column holding the serialized data.
        sub_column_names: Keys to extract from every dict.
        frame: DataFrame to modify in place. Defaults to the notebook-level
            ``df`` so existing two-argument calls behave as before.

    Returns:
        The DataFrame that was modified.
    """
    target = df if frame is None else frame

    # Parse each cell once and reuse the result for every requested key.
    parsed_cells = target[og_column_name].apply(lambda cell: safe_json_loads(str(cell)))

    for sub_column_name in sub_column_names:
        new_column_name = og_column_name + "_" + sub_column_name
        target[new_column_name] = parsed_cells.apply(
            lambda items, key=sub_column_name: [item.get(key, None) for item in items])
    return target

# Expand each serialized column into one list column per extracted key.
for source_column, wanted_keys in (
    ('spoken_languages', ['iso_639_1', 'name']),
    ('genres', ['id', 'name']),
    ('production_companies', ['id', 'name']),
):
    df = unpack_json(source_column, wanted_keys)

# Remove the raw serialized columns, the free-text/URL fields and the extracted
# id lists; none of them are read by the later cells shown in this notebook.
cols_to_drop = [
    'belongs_to_collection',
    'production_companies',
    'genres',
    'homepage',
    'overview',
    'poster_path',
    'production_countries',
    'spoken_languages',
    'tagline',
    'genres_id',
    'production_companies_id',
]
df = df.drop(columns=cols_to_drop)

In [0]:
numeric_columns = ['budget', 'revenue', 'popularity', 'vote_average', 'vote_count', 'runtime']

# Values that cannot be parsed as numbers become NaN.
for col in numeric_columns:
    df[col] = pd.to_numeric(df[col], errors='coerce')

# A budget or revenue of 0 is treated as missing so that it is imputed below.
# The result is assigned back to the frame: calling replace(..., inplace=True)
# on `df[col]` acts on a temporary Series and leaves `df` unchanged when pandas
# Copy-on-Write is active.
df['budget'] = df['budget'].replace(0, np.nan)
df['revenue'] = df['revenue'].replace(0, np.nan)

# Columns that take part in the KNN imputation (runtime is not one of them).
imputed_columns = ['popularity', 'vote_average', 'vote_count', 'budget', 'revenue']

# Standardize first so every feature contributes on a comparable scale to the
# neighbour distances, impute, then map the values back to their original units.
scaler = StandardScaler()
scaled_features = scaler.fit_transform(df[imputed_columns])

knn_imputer = KNNImputer(n_neighbors=5)
imputed_data = knn_imputer.fit_transform(scaled_features)

df[imputed_columns] = scaler.inverse_transform(imputed_data)

# NOTE(review): this fills every remaining NaN in the frame with 0, including
# non-numeric columns such as original_language and release_date - confirm that
# is intended.
df = df.fillna(0)

# Flatten the extracted list columns into comma-separated strings. Entries that
# are None (key missing in the source dict) are skipped so the join cannot fail.
list_columns = [
    'spoken_languages_iso_639_1',
    'spoken_languages_name',
    'production_companies_name',
    'genres_name',
]
for list_column in list_columns:
    df[list_column] = df[list_column].apply(
        lambda x: ','.join(str(v) for v in x if v is not None) if isinstance(x, list) else x)

In [0]:
# Keep only rows whose language value is present and is not a bare number.
language_text = df['original_language'].astype(str)
is_numeric_text = language_text.str.match(r'^(\d+\.?\d*|\.\d+)$')
has_language = df['original_language'].notna()
df2 = df[~is_numeric_text & has_language]

# One row per language: movie count plus mean budget, revenue and popularity.
language_analysis = (
    df2.groupby('original_language')
    .agg(
        number_of_movies=('id', 'count'),
        average_budget=('budget', 'mean'),
        average_revenue=('revenue', 'mean'),
        average_popularity=('popularity', 'mean'),
    )
    .reset_index()
)

In [0]:
# Unparseable release dates become NaT, so their year/month are NaN.
df['release_date'] = pd.to_datetime(df['release_date'], errors='coerce')

df['release_year'] = df['release_date'].dt.year
df['release_month'] = df['release_date'].dt.month

df_after_1995 = df[df['release_year'] > 1995]

movies_per_year_after_1995 = df_after_1995['release_year'].value_counts().sort_index()

# `genres_name` may already have been flattened into a comma-separated string,
# and explode() leaves plain strings untouched. Split such strings back into
# lists of individual genre names first; an empty string becomes an empty list,
# which explode() turns into NaN and the groupby then skips.
genre_lists_after_1995 = df_after_1995['genres_name'].apply(
    lambda value: [name for name in value.split(',') if name] if isinstance(value, str) else value)

# Rows are years, columns are genres, cells are movie counts.
genres_per_year_after_1995 = (
    df_after_1995.assign(genres_name=genre_lists_after_1995)
    .explode('genres_name')
    .groupby(['release_year', 'genres_name'])
    .size()
    .unstack(fill_value=0)
)

top_genres_per_year_after_1995 = genres_per_year_after_1995.apply(lambda x: list(x.nlargest(5).index), axis=1)

top_genres_list_after_1995 = top_genres_per_year_after_1995.apply(lambda x: ', '.join(x))

# Align the genre summary to the yearly counts by year rather than by position,
# so a year without any genre information cannot shift or break the table.
top_genres_list_after_1995 = top_genres_list_after_1995.reindex(
    movies_per_year_after_1995.index, fill_value='')

release_date_analysis_after_1995 = pd.DataFrame({
    'Year': movies_per_year_after_1995.index,
    'Number of Movies': movies_per_year_after_1995.values,
    'Top 5 Genres': top_genres_list_after_1995.values
})

In [0]:
# `genres_name` may already have been flattened into a comma-separated string,
# and explode() leaves plain strings untouched. Split such strings back into
# lists of individual genre names first; an empty string becomes an empty list,
# which explode() turns into NaN and the aggregations below skip.
genre_lists = df['genres_name'].apply(
    lambda value: [name for name in value.split(',') if name] if isinstance(value, str) else value)

# One row per (movie, genre) pair.
df_exploded_genres = df.assign(genres_name=genre_lists).explode('genres_name')

# Number of movies tagged with each genre.
genre_distribution = df_exploded_genres['genres_name'].value_counts()

avg_budget_revenue_by_genre = df_exploded_genres.groupby('genres_name').agg(
    average_budget=('budget', 'mean'),
    average_revenue=('revenue', 'mean')).reset_index()

avg_rating_popularity_by_genre = df_exploded_genres.groupby('genres_name').agg(
    average_rating=('vote_average', 'mean'),
    average_popularity=('popularity', 'mean')).reset_index()

# Resulting columns, in order: genres_name, average_budget, average_revenue,
# average_rating, average_popularity, number_of_movies.
genre_analysis_table = pd.merge(
    avg_budget_revenue_by_genre,
    avg_rating_popularity_by_genre,
    on='genres_name')
genre_analysis_table['number_of_movies'] = genre_analysis_table['genres_name'].map(genre_distribution)

In [0]:
# The data lake account key is a secret, so it is read from the environment
# (a secret scope would serve equally well) instead of being stored in the
# notebook. Any key that was previously committed with this notebook should be
# treated as compromised and rotated.
adls_account_key = os.environ.get("ADLS_ACCOUNT_KEY")
if not adls_account_key:
    raise RuntimeError(
        "Set the ADLS_ACCOUNT_KEY environment variable to the access key of "
        "the moviedatanalytics storage account before running this cell.")

spark.conf.set(
    "fs.azure.account.key.moviedatanalytics.dfs.core.windows.net", adls_account_key)

In [0]:
# Spark schema for the per-language summary; the fields are listed in the same
# order as the columns of `language_analysis`.
lang_schema = StructType([
    StructField(field_name, field_type)
    for field_name, field_type in (
        ("original_language", StringType()),
        ("number_of_movies", LongType()),
        ("average_budget", DoubleType()),
        ("average_revenue", DoubleType()),
        ("average_popularity", DoubleType()),
    )
])

lang_analysis = spark.createDataFrame(language_analysis, schema=lang_schema)

# Overwrite any earlier export of this table with the fresh result.
lang_output_path = 'abfss://movieanalytics@moviedatanalytics.dfs.core.windows.net/Processed_Data/lang_analysis/'
lang_analysis.write.mode("overwrite").parquet(path=lang_output_path)

In [0]:
# Spark schema for the yearly release summary. The field names differ from the
# pandas column labels (underscores instead of spaces); the fields are listed in
# the same order as the pandas columns.
release_date_schema = StructType([
    StructField(field_name, field_type)
    for field_name, field_type in (
        ("Year", DoubleType()),
        ("Number_of_Movies", LongType()),
        ("Top_5_Genres", StringType()),
    )
])

release_analysis_post_1995 = spark.createDataFrame(release_date_analysis_after_1995, schema=release_date_schema)

# Overwrite any earlier export of this table with the fresh result.
release_output_path = 'abfss://movieanalytics@moviedatanalytics.dfs.core.windows.net/Processed_Data/release_date_analysis/'
release_analysis_post_1995.write.mode("overwrite").parquet(path=release_output_path)

In [0]:
# Spark schema for the per-genre summary. It has one field per column of
# `genre_analysis_table`, including `average_rating`, which the table carries
# between average_revenue and average_popularity.
genre_schema = StructType([
    StructField("genres_name", StringType()),
    StructField("average_budget", DoubleType()),
    StructField("average_revenue", DoubleType()),
    StructField("average_rating", DoubleType()),
    StructField("average_popularity", DoubleType()),
    StructField("number_of_movies", LongType())
])

# Select the pandas columns by name, in schema order, so every column lands in
# the field it belongs to regardless of the order in which the table was built.
genre_columns = [field.name for field in genre_schema.fields]
genre_analysis = spark.createDataFrame(genre_analysis_table[genre_columns], schema=genre_schema)

# Overwrite any earlier export of this table with the fresh result.
genre_analysis.write.mode("overwrite").parquet(path='abfss://movieanalytics@moviedatanalytics.dfs.core.windows.net/Processed_Data/genre_analysis/')