In [54]:
import pandas as pd
from sqlalchemy import create_engine

# Database connection setup
amazon_engine = create_engine('mysql+pymysql://root:1234@127.0.0.1/amazon')
hotstar_engine = create_engine('mysql+pymysql://root:1234@127.0.0.1/hotstar')
netflix_engine = create_engine('mysql+pymysql://root:1234@127.0.0.1/netflix')

In [55]:
amazon_file = r'datasources\amazonprime.csv'
hotstar_file = r'datasources\hotstar.csv'
netflix_file = r'datasources\netflix.csv'

# Column mappings and unique identifiers for each dataset
amazon_mappings = {
    "id": "id",
    "title": "title",
    "type": "type",
    "description": "description",
    "release_year": "release_year",
    "age_certification": "age_certification",
    "runtime": "runtime",
    "genres": "genres",
    "production_countries": "production_countries",
    "seasons": "seasons",
    "imdb_id": "imdb_id",
    "imdb_score": "imdb_score",
    "imdb_votes": "imdb_votes",
    "tmdb_popularity": "tmdb_popularity",
    "tmdb_score": "tmdb_score",
}
hotstar_mappings = {
    "hotstar_id": "hotstar_id",
    "title": "title",
    "description": "description",
    "genre": "genre",
    "year": "year",
    "age_rating": "age_rating",
    "running_time": "running_time",
    "seasons": "seasons",
    "episodes": "episodes",
    "type": "type",
}
netflix_mappings = {
    "show_id": "show_id",
    "type": "type",
    "title": "title",
    "director": "director",
    "cast": "cast",
    "country": "country",
    "date_added": "date_added",
    "release_year": "release_year",
    "rating": "rating",
    "duration": "duration",
    "listed_in": "listed_in",
    "description": "description",
}

In [56]:
def preprocess(df, column_mappings, unique_columns, numeric_cols=None, missing_threshold=0.5):
    # Rename columns
    df.rename(columns=column_mappings, inplace=True)
    
    # Trim whitespace from string columns
    df = df.apply(lambda x: x.str.strip() if x.dtype == "object" else x)
    
    # Drop rows with excessive missing values
    df = df[df.isnull().mean(axis=1) <= missing_threshold]
    
    # Drop duplicate rows based on unique movie columns
    df = df.drop_duplicates(subset=unique_columns)
    
    # Handle numeric columns with missing values
    if numeric_cols:
        for col in numeric_cols:
            df[col] = pd.to_numeric(df[col], errors='coerce').fillna(0).astype(int)
    
    return df


In [57]:

# Preprocess datasets
amazon_df = pd.read_csv(amazon_file)
hotstar_df = pd.read_csv(hotstar_file)
netflix_df = pd.read_csv(netflix_file)

In [58]:
amazon_df.columns

Index(['id', 'title', 'type', 'description', 'release_year',
       'age_certification', 'runtime', 'genres', 'production_countries',
       'seasons', 'imdb_id', 'imdb_score', 'imdb_votes', 'tmdb_popularity',
       'tmdb_score'],
      dtype='object')

In [59]:
hotstar_df.columns

Index(['hotstar_id', 'title', 'description', 'genre', 'year', 'age_rating',
       'running_time', 'seasons', 'episodes', 'type'],
      dtype='object')

In [60]:
netflix_df.columns

Index(['show_id', 'type', 'title', 'director', 'cast', 'country', 'date_added',
       'release_year', 'rating', 'duration', 'listed_in', 'description'],
      dtype='object')

In [61]:

amazon_df = preprocess(amazon_df, amazon_mappings, unique_columns=["title", "release_year"], numeric_cols=["runtime", "release_year"])
hotstar_df = preprocess(hotstar_df, hotstar_mappings, unique_columns=["hotstar_id"], numeric_cols=["running_time", "year"])
netflix_df = preprocess(netflix_df, netflix_mappings, unique_columns=["show_id","title", "release_year"], numeric_cols=["release_year"])


In [62]:

# Load data into respective databases
amazon_df.to_sql('amazonprime', con=amazon_engine, if_exists='append', index=False)


10860

In [64]:
from sqlalchemy import text

def upsert_hotstar_data_skip_errors(df, engine, table_name):
    """
    Perform row-by-row upsert for Hotstar data into the specified table,
    skipping rows that cause errors.
    """
    # Convert DataFrame to list of dictionaries for parameter binding
    data_dicts = df.to_dict(orient="records")
    
    # Define the query using text()
    query = text(f"""
    INSERT INTO {table_name} (hotstar_id, title, description, genre, year, age_rating, running_time, seasons, episodes, type)
    VALUES (:hotstar_id, :title, :description, :genre, :year, :age_rating, :running_time, :seasons, :episodes, :type)
    ON DUPLICATE KEY UPDATE
        title = VALUES(title),
        description = VALUES(description),
        genre = VALUES(genre),
        year = VALUES(year),
        age_rating = VALUES(age_rating),
        running_time = VALUES(running_time),
        seasons = VALUES(seasons),
        episodes = VALUES(episodes),
        type = VALUES(type);
    """)
    
    skipped_rows = []  # To log rows causing errors
    with engine.connect() as conn:
        for row in data_dicts:
            try:
                conn.execute(query, row)
            except Exception as e:
                # Log the error and skip this row
                skipped_rows.append({"row": row, "error": str(e)})
                
            
        conn.commit()
    
    # Log skipped rows for review
    # Writing skipped rows to a log file with UTF-8 encoding
    if skipped_rows:
        print(f"Skipped {len(skipped_rows)} rows due to errors.")
        with open("skipped_rows_log.txt", "w", encoding="utf-8") as log_file:
            for skipped in skipped_rows:
                log_file.write(f"Row: {skipped['row']}\nError: {skipped['error']}\n\n")


upsert_hotstar_data_skip_errors(hotstar_df, hotstar_engine, 'Hotstar')

Skipped 4568 rows due to errors.


In [65]:
import pandas as pd

def preprocess(df, column_mappings, unique_columns, numeric_cols=None, date_cols=None, missing_threshold=0.5):
    """
    Preprocess the DataFrame:
    - Rename columns
    - Drop duplicates
    - Handle numeric and date columns
    """
    # Rename columns
    df.rename(columns=column_mappings, inplace=True)
    
    # Trim whitespace from string columns
    df = df.apply(lambda x: x.str.strip() if x.dtype == "object" else x)
    
    # Drop rows with excessive missing values
    df = df[df.isnull().mean(axis=1) <= missing_threshold]
    
    # Drop duplicate rows based on unique columns
    df = df.drop_duplicates(subset=unique_columns)
    
    # Handle numeric columns with missing values
    if numeric_cols:
        for col in numeric_cols:
            df[col] = pd.to_numeric(df[col], errors='coerce').fillna(0).astype(int)
    
    # Convert date columns to 'YYYY-MM-DD' format
    if date_cols:
        for col in date_cols:
            df[col] = pd.to_datetime(df[col], errors='coerce').dt.date
    
    return df

# Column mappings for Netflix
netflix_mappings = {
    "show_id": "show_id",
    "type": "type",
    "title": "title",
    "director": "director",
    "cast": "cast",
    "country": "country",
    "date_added": "date_added",
    "release_year": "release_year",
    "rating": "rating",
    "duration": "duration",
    "listed_in": "listed_in",
    "description": "description",
}

# Preprocess the Netflix dataset
netflix_df = preprocess(
    netflix_df,
    netflix_mappings,
    unique_columns=["show_id"],
    numeric_cols=["release_year"],
    date_cols=["date_added"]
)


netflix_df.to_sql('netflix', con=netflix_engine, if_exists='append', index=False)


7787

In [None]:
"""SCHEMAS : 

drop netflix.netflix
drop amazon.amazonprime
drop hotstar.hotstar

CREATE TABLE netflix.Netflix (
    show_id VARCHAR(20) PRIMARY KEY,       -- Unique ID for each show or movie
    type VARCHAR(50) NOT NULL,            -- 'Movie' or 'TV Show'
    title VARCHAR(255) NOT NULL,          -- Title of the movie or show
    director VARCHAR(255),                -- Director name(s)
    cast TEXT,                            -- Cast members
    country VARCHAR(255),                 -- Countries where it's available or made
    date_added DATE,                      -- Date the content was added
    year INT,                     -- Year of release
    rating VARCHAR(10),                   -- Content rating (e.g., 'PG-13', 'R')
    duration VARCHAR(50),                 -- Duration (e.g., '90 min', '2 Seasons')
    listed_in TEXT,                       -- Categories/genres (e.g., 'Horror, Drama')
    description TEXT                      -- Description or synopsis
);


CREATE TABLE amazon.AmazonPrime (
    id VARCHAR(20) PRIMARY KEY,           -- Unique ID for each show or movie
    title VARCHAR(255) NOT NULL,          -- Title of the movie or show
    type VARCHAR(50) NOT NULL,            -- 'Movie' or 'TV Show'
    description TEXT,                     -- Description or synopsis
    year INT,                     -- Year of release
    age_certification VARCHAR(10),        -- Age rating (e.g., 'PG', 'U/A 16+')
    runtime INT,                          -- Runtime in minutes
    genres TEXT,                          -- Categories/genres (e.g., 'Drama, Thriller')
    production_countries TEXT,            -- Production countries (e.g., 'US, UK')
    seasons INT,                          -- Number of seasons (for TV shows)
    imdb_id VARCHAR(15),                  -- IMDb ID
    imdb_score DECIMAL(3, 1),             -- IMDb rating (e.g., 8.7)
    imdb_votes INT,                       -- Number of votes on IMDb
    tmdb_popularity FLOAT,                -- TMDb popularity score
    tmdb_score FLOAT                      -- TMDb rating
);

CREATE TABLE hotstar.Hotstar (
    hotstar_id VARCHAR(20) PRIMARY KEY,   -- Unique ID for each show or movie
    title VARCHAR(255) NOT NULL,          -- Title of the movie or show
    description TEXT,                     -- Description or synopsis
    genre VARCHAR(100),                   -- Primary genre (e.g., 'Action')
    year INT,                             -- Year of release
    age_rating VARCHAR(10),               -- Age rating (e.g., 'U/A 16+', 'U')
    running_time FLOAT,                   -- Runtime in minutes
    seasons INT,                          -- Number of seasons (for TV shows)
    episodes INT,                         -- Number of episodes (for TV shows)
    type VARCHAR(50) NOT NULL             -- 'Movie' or 'TV Show'
);

ALTER TABLE netflix.Netflix 
ADD COLUMN updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP;

ALTER TABLE amazon.AmazonPrime 
ADD COLUMN updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP;

ALTER TABLE hotstar.Hotstar 
ADD COLUMN updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP;


"""