In [None]:
#required installations
!pip install wikipedia
!pip install wikipedia-api
!pip install transformers accelerate bitsandbytes -q

In [None]:
#setting up libraries
import pandas as pd
import requests
from bs4 import BeautifulSoup
import wikipedia
import time
import urllib.parse
import re
import os
from google.colab import userdata
import json
import torch
from transformers import pipeline
import asyncio
import aiohttp
import wikipediaapi
import seaborn as sns
import matplotlib.pyplot as plt


# Copy the API credentials from the Colab secret store into environment variables.
# TMDB_TOKEN is the bearer token read by the TMDB request helpers defined in this notebook.
os.environ['TMDB_TOKEN'] = userdata.get("TMDB-READ-ACCESS-TOKEN")
os.environ['GROQ_API_KEY'] = userdata.get("GROQ_API_KEY")
os.environ['OMDB_API_KEY'] = userdata.get("OMDB_API_KEY")

In [None]:
wikipedia.set_lang("en") # set the language used by the wikipedia package to English


In [None]:
# Load the movie sample. Later cells read the columns title_x, imdb_id, wiki_link,
# story, summary, poster_path, directors_name and year_of_release from this frame.
df_movie_sample = pd.read_csv("sampled_100_movies.csv")

# Movie Material Collection

In [None]:
# Series holding the `title_x` column (the movie titles) of the sample.
movie_title = df_movie_sample['title_x']

Authenticate For TMDB

In [None]:
def authenticate():
  """Check that the TMDB read-access token is accepted by the API.

  Sends a GET request to the TMDB ``/3/authentication`` endpoint using the
  bearer token stored in the ``TMDB_TOKEN`` environment variable.

  Returns:
    None when the response body reports a truthy ``"success"`` value.

  Raises:
    Exception: with the message "Authentication failed" when the response
      body is not valid JSON, is not a JSON object, or does not report success.
    KeyError: if the ``TMDB_TOKEN`` environment variable is not set.
  """
  url = "https://api.themoviedb.org/3/authentication"
  headers = {
    "accept": "application/json",
    "Authorization": f"Bearer {os.environ['TMDB_TOKEN']}"
  }
  # A timeout keeps the notebook from hanging forever on a stalled connection.
  response = requests.get(url, headers=headers, timeout=10)
  try:
    payload = json.loads(response.text)
  except ValueError:
    # Non-JSON body (e.g. an HTML error page): treat as a failed check.
    payload = None
  # Use .get so a payload without a 'success' key is reported as a failed
  # authentication instead of surfacing as an unrelated KeyError.
  if isinstance(payload, dict) and payload.get('success'):
    return
  raise Exception("Authentication failed")

Extracting Movie Synopsis From Wikipedia and TMDB

In [None]:
def _is_top_level_section_heading(tag):
    """Return True when `tag` starts a new top-level (h2) article section."""
    # Older Wikipedia HTML: the <h2> itself is a sibling of the paragraphs.
    if tag.name == 'h2':
        return True
    # Current Wikipedia HTML wraps each heading:
    #   <div class="mw-heading mw-heading2"><h2 id="...">...</h2></div>
    # so the sibling of the paragraphs is the wrapper <div>, not the <h2>.
    if tag.name == 'div':
        return 'mw-heading2' in (tag.get('class') or [])
    return False


def extract_plot_from_wikipedia(wiki_url):
    """Scrape the plot text of a film from its Wikipedia article.

    Looks for the element with id ``Plot`` (falling back to ``Synopsis``) and
    joins the text of every ``<p>`` that follows its parent element, stopping
    at the next top-level section heading.

    Args:
        wiki_url: Full URL of the Wikipedia article.

    Returns:
        The paragraphs joined by blank lines, or None when the URL is missing,
        the request fails, no Plot/Synopsis heading exists, or the section
        holds no paragraph text.
    """
    # The URL comes from a DataFrame column, so it may be NaN/None.
    if not isinstance(wiki_url, str) or not wiki_url.strip():
        return None

    headers = {
        'User-Agent': 'Mozilla/5.0'
    }

    try:
        response = requests.get(wiki_url, headers=headers, timeout=15)
        response.raise_for_status()
    except requests.RequestException:
        # Best-effort scrape: callers fall back to other plot sources on None.
        return None

    soup = BeautifulSoup(response.text, 'html.parser')

    plot_heading = soup.find(id='Plot')
    if plot_heading is None:
        plot_heading = soup.find(id='Synopsis')
    if plot_heading is None or plot_heading.parent is None:
        return None

    content = []
    for sibling in plot_heading.parent.find_next_siblings():
        # Without the wrapper-div check the loop would run past the Plot
        # section and collect the paragraphs of every later section too.
        if _is_top_level_section_heading(sibling):
            break
        if sibling.name == 'p':
            text = sibling.get_text(" ", strip=True)
            if text:
                content.append(text)

    plot_text = "\n\n".join(content).strip()
    return plot_text if plot_text else None

In [None]:
def search_wikipedia_url(title):
    """Build the English-Wikipedia article URL for a page title.

    No lookup is performed: spaces in `title` become underscores, the result
    is percent-encoded, and it is appended to the ``/wiki/`` base path.
    """
    slug = "_".join(title.split(" "))
    return f"https://en.wikipedia.org/wiki/{urllib.parse.quote(slug)}"

In [None]:
# Scrape a plot summary for every sampled movie from its Wikipedia page.
# `plot` is whatever extract_plot_from_wikipedia returns for the row's
# `wiki_link`; that helper returns None when no plot text could be extracted.
# NOTE: the loop variables (`index`, `row`, `title`, `imdb_id`, `wiki_link`,
# `plot`) stay defined at notebook scope after the loop finishes.
plots_data = []
for index, row in df_movie_sample.iterrows():
    title = row['title_x']
    imdb_id = row['imdb_id']
    wiki_link = row["wiki_link"]
    plot = extract_plot_from_wikipedia(wiki_link)
    plots_data.append({'title_x': title, 'imdb_id': imdb_id, 'plot': plot})

# One row per movie: title_x, imdb_id and the scraped plot (possibly None).
df_plots = pd.DataFrame(plots_data)
#display(df_plots.head())

Find Movie Plot from TMDB

In [None]:
def get_movie_overview(movie_name):
  """Return the TMDB overview text for a movie title.

  Args:
    movie_name: Title passed to ``search_movie`` to resolve a TMDB movie id.

  Returns:
    The ``overview`` field of the movie record, or None when the title is not
    found or the record carries no ``overview`` field.
  """
  movie_id = search_movie(movie_name)
  if movie_id is None:
    return None
  movie_data = get_movie_data(movie_id)
  # The details call may return a payload without 'overview' (for example an
  # API error object); report that as "no overview" rather than a KeyError.
  if not isinstance(movie_data, dict):
    return None
  return movie_data.get('overview')

Find Movie Poster from TMDB

In [None]:
def search_movie(movie_name):
  """Look up the TMDB id of the movie whose title equals `movie_name`.

  Queries the TMDB ``/3/search/movie`` endpoint and scans the returned
  results for an exact (case-sensitive) title match.

  Args:
    movie_name: Movie title to search for.

  Returns:
    The ``id`` of the first result whose ``title`` equals `movie_name`, or
    None when there is no such result or the response cannot be used.
  """
  response = requests.get(
    url = "https://api.themoviedb.org/3/search/movie",
    # Let requests encode the query: titles containing '&', '#', '+' or '?'
    # would otherwise corrupt a hand-built query string.
    params = {"query": movie_name},
    headers = {
        "accept": "application/json",
        "Authorization": f"Bearer {os.environ['TMDB_TOKEN']}"
    },
    timeout = 10,
  )
  try:
    movie_data = json.loads(response.text)
  except ValueError:
    # Non-JSON body: nothing to match against.
    return None
  # Error payloads have no 'results' list; treat them as "no match".
  results = movie_data.get('results') if isinstance(movie_data, dict) else None
  for result in results or []:
    if result.get('title') == movie_name:
      return result.get('id')
  return None


def get_movie_data(movie_id):
  """Fetch the TMDB details record for `movie_id`.

  Calls ``/3/movie/{movie_id}`` with ``language=en-US`` using the bearer token
  from the ``TMDB_TOKEN`` environment variable and returns the decoded JSON
  body as-is.
  """
  endpoint = f"https://api.themoviedb.org/3/movie/{movie_id}?language=en-US"
  auth_headers = {
    "accept": "application/json",
    "Authorization": f"Bearer {os.environ['TMDB_TOKEN']}"
  }
  return json.loads(requests.get(url = endpoint, headers = auth_headers).text)


def construct_poster_url(poster_path):
  """Turn a TMDB poster path into a full image URL.

  Reads the image ``base_url`` and the list of ``poster_sizes`` from the TMDB
  ``/3/configuration`` endpoint and concatenates base URL, size and path.

  Args:
    poster_path: Path fragment of the poster as found in a TMDB movie record.

  Returns:
    The assembled URL, or None when `poster_path` is missing or empty (a movie
    without a poster must not produce a URL ending in "None").
  """
  if not isinstance(poster_path, str) or not poster_path:
    return None

  # getting the base_url and image size from the configurations API
  response = requests.get(
      url = "https://api.themoviedb.org/3/configuration",
      headers = {
        "accept": "application/json",
        "Authorization": f"Bearer {os.environ['TMDB_TOKEN']}"
      },
      timeout = 10,
  )

  config_data = json.loads(response.text)
  images = config_data['images']
  base_url = images['base_url']
  # selecting the 3rd entry of the size list
  poster_size = images['poster_sizes'][2]
  return f"{base_url}{poster_size}{poster_path}"



def get_movie_poster(movie_name):
  """Return the poster URL of a movie looked up by title on TMDB.

  Args:
    movie_name: Title passed to ``search_movie`` to resolve a TMDB movie id.

  Returns:
    The URL built by ``construct_poster_url``, or None when the title is not
    found or the movie record has no usable ``poster_path``.
  """
  movie_id = search_movie(movie_name)
  if movie_id is None:
    return None
  movie_data = get_movie_data(movie_id)
  # Error payloads lack 'poster_path', and a record may carry a null value;
  # neither should be turned into a URL.
  poster_path = movie_data.get('poster_path') if isinstance(movie_data, dict) else None
  if not poster_path:
    return None
  return construct_poster_url(poster_path)


COLLECTING ALL MOVIE SYNOPSIS

In [None]:
def get_plot(row):
    """Pick the best available synopsis for one row of ``df_plots``.

    Preference order: the scraped ``plot`` of the row, then the ``story`` and
    ``summary`` columns of the first ``df_movie_sample`` row with the same
    ``title_x``, and finally the overview from ``get_movie_overview``.
    """
    scraped = row['plot']
    if pd.notna(scraped):
        return scraped

    name = row['title_x']
    # Columns are checked lazily, in order; the first non-missing value wins.
    for column in ('story', 'summary'):
        candidate = df_movie_sample.loc[df_movie_sample['title_x'] == name, column].iloc[0]
        if pd.notna(candidate):
            return candidate

    return get_movie_overview(name)

# Replace each row's plot with the first available synopsis chosen by get_plot.
df_plots['plot'] = df_plots.apply(get_plot, axis=1)
#display(df_plots)

Collecting all Movie Posters

In [None]:
# Start the poster table as an independent copy of the title column.
movie_poster = df_movie_sample[['title_x']].copy()

def get_poster(row):
    """Return the poster for one movie row.

    Uses the row's own ``poster_path`` when it is present and non-empty;
    otherwise asks ``get_movie_poster`` for the row's ``title_x``.
    """
    existing = row['poster_path']
    if pd.isna(existing) or existing == '':
        return get_movie_poster(row['title_x'])
    return existing

# get_poster needs the `poster_path` column, so it is applied row-wise to the
# full sample; the resulting Series is assigned to movie_poster by index.
movie_poster['movie_poster'] = df_movie_sample.apply(get_poster, axis=1)
#display(movie_poster)

In [None]:
# Create the "data" directory if it doesn't exist.
# exist_ok=True replaces the separate exists() check with a single call.
os.makedirs('data', exist_ok=True)

# Join synopsis and poster information on the movie title and persist it.
# NOTE(review): the join key is the title, so rows sharing a title would be
# paired many-to-many; confirm that titles are unique in the sample.
df_plot_poster = pd.merge(df_plots, movie_poster, on='title_x')
df_plot_poster.to_csv('data/plot_poster_data.csv', index=False)
#display(df_plot_poster.head())

# DESCRIPTIVE METADATA

Scraping Gender of Director

In [None]:
# Results of earlier lookups, keyed by the stripped director name.
gender_cache = {}

# Whole-word pronoun matchers. Word boundaries also catch pronouns at the very
# start of the text or next to punctuation, which a " she " substring test misses.
_FEMALE_PRONOUNS = re.compile(r"\b(?:she|her|hers)\b")
_MALE_PRONOUNS = re.compile(r"\b(?:he|his|him)\b")

async def get_gender_async(session, name):
    """Guess a person's gender as "female", "male" or another label.

    The lookup first counts gendered pronouns in the person's Wikipedia page
    summary; when that is inconclusive it asks genderize.io about the first
    name. Every answer is stored in ``gender_cache``.

    Args:
        session: Object whose ``get(url, ...)`` returns an async context
            manager yielding a response with ``status`` and ``json()``
            (an ``aiohttp.ClientSession`` in this notebook).
        name: Full name of the person; surrounding whitespace is ignored.

    Returns:
        "female" or "male" from the pronoun count, otherwise the value reported
        by genderize.io, otherwise "unknown". Empty or non-string names yield
        "unknown" without any request being made.
    """
    name = name.strip() if isinstance(name, str) else ""
    if not name:
        # Nothing to look up; also avoids indexing into an empty split() below.
        return "unknown"

    # Return from cache if already found
    if name in gender_cache:
        return gender_cache[name]

    # --- Try Wikipedia first ---
    # Encode the title so characters such as '?', '#' or '/' stay in the path.
    page_title = urllib.parse.quote(name.replace(" ", "_"), safe="")
    try:
        async with session.get(f"https://en.wikipedia.org/api/rest_v1/page/summary/{page_title}") as response:
            if response.status == 200:
                data = await response.json()
                text = (data.get("extract") or "").lower()
                female_hits = len(_FEMALE_PRONOUNS.findall(text))
                male_hits = len(_MALE_PRONOUNS.findall(text))
                # Majority vote; a tie (including 0-0) falls through to genderize.io.
                if female_hits > male_hits:
                    gender_cache[name] = "female"
                    return "female"
                if male_hits > female_hits:
                    gender_cache[name] = "male"
                    return "male"
    except Exception as e:
        print(f"Error fetching from Wikipedia for {name}: {e}")

    # --- Fallback to genderize.io ---
    first_name = name.split()[0]
    try:
        async with session.get("https://api.genderize.io", params={"name": first_name}) as response:
            if response.status == 200:
                data = await response.json()
                gender = data.get("gender", "unknown")
                gender_cache[name] = gender or "unknown"
                return gender or "unknown"
    except Exception as e:
        print(f"Error for {name}: {e}")

    # Default fallback
    gender_cache[name] = "unknown"
    return "unknown"

async def annotate_director_genders_async(df, column_name='directors_name'):
    """Add a ``Director_Genders`` column with a gender label per director.

    Each cell of `column_name` is read as a ``|``-separated list of names.
    Every distinct name is looked up once through ``get_gender_async`` (all
    lookups share one ``aiohttp.ClientSession``), and each row is rewritten as
    ``"Name (gender)|Name (gender)"``.

    Args:
        df: DataFrame to annotate; it is modified in place.
        column_name: Column holding the ``|``-separated director names.

    Returns:
        The same `df` object with the ``Director_Genders`` column added.
        Cells that are missing or hold no names produce an empty string.
    """
    def split_names(directors_str):
        # Missing cells (NaN/None) and empty fragments such as "A||B" carry no names.
        if not isinstance(directors_str, str):
            return []
        return [part.strip() for part in directors_str.split('|') if part.strip()]

    # Get all unique director names (sorted so the lookup order is stable)
    all_directors = sorted({name for cell in df[column_name] for name in split_names(cell)})

    async with aiohttp.ClientSession() as session:
        genders = await asyncio.gather(*(get_gender_async(session, name) for name in all_directors))
    gender_by_name = dict(zip(all_directors, genders))

    # Map back to DataFrame
    def annotate_row(directors_str):
        return '|'.join(f"{name} ({gender_by_name.get(name, 'unknown')})" for name in split_names(directors_str))

    df['Director_Genders'] = df[column_name].apply(annotate_row)
    return df

Adding Gender to Data Frame

In [None]:
# Await the async annotator directly (top-level await in a notebook cell);
# it adds the `Director_Genders` column and returns the same frame.
df_movie_sample = await annotate_director_genders_async(df_movie_sample)
#display(df_movie_sample.head())

Summary Table and Plot for Director's Gender

In [None]:
# Reduce each annotated "Name (gender)|..." string to one gender label.
# Only the text after the last '(' is kept, so a row listing several
# directors is labelled with the gender of the last one listed.
def _trailing_gender(annotated):
    if '(' not in annotated:
        return 'unknown'
    return annotated.rpartition('(')[2][:-1]

df_movie_sample['gender'] = df_movie_sample['Director_Genders'].apply(_trailing_gender)

# Tally how many rows carry each label
gender_counts = df_movie_sample['gender'].value_counts().reset_index()
gender_counts.columns = ['Gender', 'Count']

# Summary table
display(gender_counts)

# Bar chart of the same tally
fig, ax = plt.subplots(figsize=(8, 6))
sns.barplot(data=gender_counts, x='Gender', y='Count', ax=ax)
ax.set_title('Distribution of Director Genders')
ax.set_xlabel('Gender')
ax.set_ylabel('Count')
plt.show()

GET BOX OFFICE DATA

API Key

In [None]:
OMDB_API_KEY = userdata.get("OMDB_API_KEY")  # OMDb API key read from the Colab secret store; save your personal key under this secret name

In [None]:
def get_gross_revenue_from_omdb(imdb_id):
    """Fetch the ``BoxOffice`` field of a title from the OMDb API.

    Args:
        imdb_id: IMDb identifier sent as the ``i`` query parameter.

    Returns:
        The raw ``BoxOffice`` value of the response, or None when the field is
        absent or the request/decoding fails (the error is printed).
    """
    try:
        response = requests.get(
            # HTTPS keeps the API key out of clear-text traffic.
            "https://www.omdbapi.com/",
            # requests builds and encodes the query string.
            params={"i": imdb_id, "apikey": OMDB_API_KEY},
            timeout=10,
        )
        data = response.json()
        return data.get("BoxOffice", None)
    except Exception as e:
        # Best-effort lookup: one failing title must not abort the whole column.
        print(f"Error for {imdb_id}: {e}")
        return None

In [None]:
# One row per movie: its IMDb id and the raw BoxOffice value returned by OMDb.
# The id column is named with the string 'imdb_id'; using a bare `imdb_id`
# variable here would name the column after whatever value that variable holds.
box_office = pd.DataFrame({
    'imdb_id': df_movie_sample['imdb_id'],
    'box_office': df_movie_sample['imdb_id'].apply(get_gross_revenue_from_omdb),
})

Summary Table and Plot for Box Office Data

In [None]:
# Clean the 'box_office' column: drop "$" and thousands separators, then
# convert to numbers. Values that are still not numeric become NaN.
# The pattern is a raw string so that "\$" is not parsed as a string escape.
box_office['box_office_numeric'] = box_office['box_office'].replace({r'\$': '', ',': ''}, regex=True)
box_office['box_office_numeric'] = pd.to_numeric(box_office['box_office_numeric'], errors='coerce')

# Create a summary table
box_office_stats = box_office['box_office_numeric'].describe()
box_office_summary = box_office_stats.to_frame()

# Format the summary table: 'count' is a number of rows, not an amount of
# money, so only the remaining statistics get the dollar formatting.
box_office_summary['box_office_numeric'] = [
    f"{value:,.0f}" if stat == 'count' else f"${value:,.2f}"
    for stat, value in box_office_stats.items()
]


display(box_office_summary)

# Create a summary plot; values are scaled to millions to match the axis label.
plt.figure(figsize=(10, 6))
sns.histplot(box_office['box_office_numeric'].dropna() / 1e6, bins=20, kde=True)
plt.title('Distribution of Box Office Revenue')
plt.xlabel('Box Office Revenue (in millions)')
plt.ylabel('Frequency')
plt.show()

# THEMATIC CODING

In [None]:
def analyze_theme_sentiment_llm_batched(plots, candidate_labels, classifier):
    """Label every plot with the classifier's top-ranked candidate label.

    Plots are processed one at a time. ``classifier(plot, candidate_labels)``
    must return a mapping whose ``'labels'`` entry is ordered best-first; the
    first entry is kept. A progress line is printed per plot, and a plot whose
    classification raises is reported and recorded as ``"Error"``.

    Returns:
        A list with one label (or ``"Error"``) per input plot, in input order.
    """
    top_labels = []
    for position, text in enumerate(plots, start=1):
        print(f"Processing plot {position}/{len(plots)}...")
        try:
            # Input handling (e.g. long texts) is left entirely to the classifier.
            top = classifier(text, candidate_labels)['labels'][0]
        except Exception as exc:
            print(f"An error occurred while processing plot {position}: {exc}")
            top = "Error"
        top_labels.append(top)

    return top_labels

Using Pre-trained model from HuggingFace

Note: For time efficiency, use a GPU runtime when running on Google Colab.

In [None]:
# Zero-shot classifier; placed on the first GPU when one is available so the
# run does not fall back to the CPU on a GPU runtime.
classifier = pipeline(
    "zero-shot-classification",
    model="cross-encoder/nli-distilroberta-base",
    device=0 if torch.cuda.is_available() else -1,
)

# Each thematic axis maps to the pair of candidate labels offered to the classifier.
axes = {
    'hindu_muslim': ['Secular','Exclusionary'],
    'feminist_misogynistic': ['Feminist', 'Misogynistic'],
    'nationalism': ['Tolerant', 'Jingoistic'],
    'caste_dynamics': ['Egalitarian','casteist']
}

# The plot list is the same for every axis, so build it once.
plot_texts = df_plots['plot'].tolist()
for axis_name, labels in axes.items():
    df_plots[f"{axis_name}_llm_label"] = analyze_theme_sentiment_llm_batched(plot_texts, labels, classifier)

# Drop the model reference and release cached GPU memory.
del classifier
torch.cuda.empty_cache()

In [None]:
# Attach each movie's release year to the classified plots via its IMDb id.
# A left join keeps every row of df_plots even when no year is found.
release_years = df_movie_sample[['imdb_id', 'year_of_release']]
df_plots = df_plots.merge(release_years, how='left', on='imdb_id')
#display(df_plots.head())

In [None]:
# Persist the classified plots. index=False matches the other CSV export in
# this notebook and keeps the positional index out of the file.
df_plots.to_csv('data/classified_data.csv', index=False)