In [41]:
#Import the packages that I will use 
import os
import json
import numpy as np
import datetime
import pandas as pd
import pprint 
import mysql.connector
import pymongo
import pymysql
from sqlalchemy import create_engine
import matplotlib.pyplot as plt
import requests

In [42]:
# To create my datamart that focuses on film data specifically information that would be useful for film studios, producers and investors I extracted data from MongoDB, an API call, and a CSV file then following the ELT process filled the datamart.
# The first part of ETL process is data extraction I pull data from three data source Mongo, an API, and a csv file
# The second step is transform because I plan on performing calculations with the data in SQL I transformed the data to allow to perform those operations
# The final step is load, I load my the data I have extracted into tables in the SQL database I created 

In [43]:
# Database Creation: 
# I first connect to a MySQL database, drops a database named 'films' (if it exists), and then creates it again.
connection = mysql.connector.connect(
    host="localhost",
    user="root",
    password="Uva!1819",
    port="3306"
)

cursor = connection.cursor()

database_drop = 'films'
try:
    cursor.execute(f"DROP DATABASE {database_drop}")
    print(f"Database '{database_drop}' dropped successfully!")
except mysql.connector.Error as err:
    print(f"Error: {err}")

try:
    cursor.execute("CREATE DATABASE films")
    print("Database 'films' created successfully!")
except mysql.connector.Error as err:
    print(f"Error: {err}")
finally:
    cursor.close()
    connection.close()



Database 'films' dropped successfully!
Database 'films' created successfully!


In [44]:
#Extration: I connect to a NOSQL database (MongoDB) and then extract data that contains information about movies
def extract_from_mongodb():
    host_name = "localhost"
    port = "27017"
    atlas_user_name = "ds2002"
    atlas_password = "UVA1819"
    atlas_cluster_name = "cluster0.xuibg2h.mongodb.net"
    
    connection_string = f"mongodb+srv://{atlas_user_name}:{atlas_password}@{atlas_cluster_name}/"
    
    try:
        client = pymongo.MongoClient(connection_string)
        db = client.sample_mflix
        collection = db.movies
        documents = list(collection.find())  # Convert the cursor to a list for easier handling
        return documents
        
    except pymongo.errors.ConnectionFailure as e:
        print(f"MongoDB connection failed: {str(e)}")
        return []

# Extract data
documents = extract_from_mongodb()
for doc in documents[:1]:
    print(doc)
data_list = []

# Transform Mongo data: I select relevant fields (e.g., title, year, directors, ratings),clean the data to prepare it for SQL calculations by droping the missing data and making sure the data is the correct type (e.g., title to string, year to integer, and ratings to floats) 

for document in documents:
    try:
        data = {
            'title': document.get('title', np.nan),
            'year': document.get('year', np.nan),
            'directors': ', '.join(document.get('directors', [])), 
            'imdb_rating': document.get('imdb', {}).get('rating', np.nan), 
            'rottentomatoes': document.get('tomatoes', {}).get('viewer', {}).get('rating', np.nan) 
        }
        data_list.append(data)
    except Exception as e:
        print(f"Error processing document {document}: {e}")

# Create DataFrame
try:
    mongodb_df = pd.DataFrame(data_list)
except Exception as e:
    print(f"Error creating DataFrame: {e}")

# Clean data
try:
    mongodb_df.dropna(inplace=True)
    mongodb_df['title'] = mongodb_df['title'].astype(str)
    mongodb_df = mongodb_df[mongodb_df['year'].astype(str).str.isnumeric()]
    mongodb_df['year'] = mongodb_df['year'].astype(int)
    mongodb_df['directors'] = mongodb_df['directors'].astype(str)
    mongodb_df['rottentomatoes'] = mongodb_df['rottentomatoes'].astype(float)
    mongodb_df['imdb_rating'].replace('', np.nan, inplace=True)
    mongodb_df['imdb_rating'] = mongodb_df['imdb_rating'].astype(float)
except Exception as e:
    print(f"Failure cleaning data: {e}")

mongodb_df.head()


{'_id': ObjectId('573a1391f29313caabcd9458'), 'plot': 'A young artist draws a face at a canvas on his easel. Suddenly the mouth on the drawing comes into life and starts talking. The artist tries to wipe it away with his hand, but when he looks...', 'runtime': 55, 'rated': 'UNRATED', 'cast': ['Enrique Rivero', 'Elizabeth Lee Miller', 'Pauline Carton', 'Odette Talazac'], 'num_mflix_comments': 0, 'poster': 'https://m.media-amazon.com/images/M/MV5BYWY3ODE5ZWEtYjlmYi00NjA4LTk4ZWYtMzBhZDE5MjY0YTYxXkEyXkFqcGdeQXVyNzI4MDMyMTU@._V1_SY1000_SX677_AL_.jpg', 'title': 'The Blood of a Poet', 'lastupdated': '2015-09-16 13:13:05.537000000', 'languages': ['French'], 'released': datetime.datetime(2010, 5, 20, 0, 0), 'directors': ['Jean Cocteau'], 'writers': ['Jean Cocteau'], 'awards': {'wins': 1, 'nominations': 0, 'text': '1 win.'}, 'year': 1932, 'imdb': {'rating': 7.5, 'votes': 3903, 'id': 21331}, 'countries': ['France'], 'type': 'movie', 'tomatoes': {'viewer': {'rating': 3.9, 'numReviews': 2865, 'mete

Unnamed: 0,title,year,directors,imdb_rating,rottentomatoes
0,The Blood of a Poet,1932,Jean Cocteau,7.5,3.9
1,A Woman of Paris: A Drama of Fate,1923,Charles Chaplin,7.1,3.7
2,Where Are My Children?,1916,"Phillips Smalley, Lois Weber",5.9,3.1
3,The Birth of a Nation,1915,D.W. Griffith,6.8,3.2
4,Salomè,1922,"Charles Bryant, Alla Nazimova",6.6,3.9


In [48]:
#Extraction: connect to trakt.tv database using api call to extract data on popular movies, I have to fetch multiple pages otherwise I only get 10 datapoints
def get_api_response(url, headers, params=None):
    try:
        response = requests.get(url, headers=headers, params=params)
        response.raise_for_status()
    except requests.exceptions.HTTPError as errh:
        return "An HTTP Error occurred: " + repr(errh)
    except requests.exceptions.ConnectionError as errc:
        return "An Error Connecting to the API occurred: " + repr(errc)
    except requests.exceptions.Timeout as errt:
        return "A Timeout Error occurred: " + repr(errt)
    except requests.exceptions.InvalidHeader as erri:
        return "A Header Error occurred: " + repr(erri)
    except requests.exceptions.RequestException as err:
        return "An Unknown Error occurred: " + repr(err)

    return response.json()

def extract_from_api(url, headers, limit=100):
    all_data = []
    for page in range(1, 5):  
        params = {
            'extended': 'full',
            'page': page,
            'limit': limit
        }
        response_data = get_api_response(url, headers, params)
        if isinstance(response_data, str):
            print(response_data)
            continue
        all_data.extend(response_data)
    return all_data

url = 'https://api.trakt.tv/movies/popular'
headers = {
    'Content-Type': 'application/json',
    'trakt-api-version': '2',
    'trakt-api-key': '6f89cb83d7ab0e50bd524a4cc4de508b11bdd9c89b73166cb1c5c8cd489dc188'
}

api_data = extract_from_api(url, headers)

if isinstance(api_data, str):
    print(api_data)
elif len(api_data) > 1:
    print(api_data[1])
else:
    print("Failed to extract data from API")


{'title': 'Guardians of the Galaxy', 'year': 2014, 'ids': {'trakt': 82405, 'slug': 'guardians-of-the-galaxy-2014', 'imdb': 'tt2015381', 'tmdb': 118340}, 'tagline': 'All heroes start somewhere.', 'overview': 'Light years from Earth, 26 years after being abducted, Peter Quill finds himself the prime target of a manhunt after discovering an orb wanted by Ronan the Accuser.', 'released': '2014-08-01', 'runtime': 121, 'country': 'us', 'trailer': 'https://youtube.com/watch?v=3CqymRQ1uUU', 'homepage': 'http://marvel.com/guardians', 'status': 'released', 'rating': 8.31525, 'votes': 91743, 'comment_count': 151, 'updated_at': '2023-10-31T08:02:35.000Z', 'language': 'en', 'available_translations': ['ar', 'az', 'be', 'bg', 'bs', 'ca', 'cn', 'cs', 'da', 'de', 'el', 'en', 'es', 'fa', 'fi', 'fr', 'he', 'hr', 'hu', 'is', 'it', 'ja', 'ka', 'ko', 'lt', 'lv', 'nl', 'no', 'pl', 'pt', 'ro', 'ru', 'sk', 'sl', 'sr', 'sv', 'th', 'tr', 'uk', 'uz', 'vi', 'zh'], 'genres': ['adventure', 'science-fiction', 'action

In [49]:
# Transform Data: I normalized the JSON data from the API, I select relevant fields (e.g., title, year, rating, certification),clean the data to prepare it for SQL calculations by droping the missing data and making sure the data is the correct type (e.g., title to string, year to integer, and rating to floats) 
try:
    api_df = pd.json_normalize(api_data)
    selected_data = [{'title': movie.get('title', 'N/A'), 
                      'year': movie.get('year', 'N/A'),
                      'rating': movie.get('rating', 'N/A'),
                      'certification': movie.get('certification', 'N/A')} for movie in api_data]
    api_df = pd.DataFrame(selected_data)
except Exception as e:
    print(f"Failure to transform data: {e}")

# Clean Data
try:
    api_df.dropna(inplace=True)
    api_df['title'] = api_df['title'].astype(str)
    api_df['year'] = api_df['year'].astype(int)
    api_df['rating'] = api_df['rating'].astype(float)
    api_df['certification'] = api_df['certification'].astype(str)
except Exception as e:
    print(f"Failure to clean data: {e}")

api_df.head()


Unnamed: 0,title,year,rating,certification
0,Deadpool,2016,8.29604,R
1,Guardians of the Galaxy,2014,8.31525,PG-13
2,The Dark Knight,2008,8.88482,PG-13
3,Inception,2010,8.65032,PG-13
4,Interstellar,2014,8.55333,PG-13


In [50]:
# Extraction: I extracted data that contained movie information from an online CSV file
try:
    url = "https://raw.githubusercontent.com/katedriebe/filmdata/main/HollywoodMovies.csv"
    df = pd.read_csv(url)
    
except Exception as e:
    print(f"Failure extracting data from CSV: {e}")

df.head()

Unnamed: 0,Movie,LeadStudio,RottenTomatoes,AudienceScore,Story,Genre,TheatersOpenWeek,OpeningWeekend,BOAvgOpenWeekend,DomesticGross,ForeignGross,WorldGross,Budget,Profitability,OpenProfit,Year
0,Spider-Man 3,Sony,61.0,54.0,Metamorphosis,Action,4252.0,151.1,35540.0,336.53,554.34,890.87,258.0,345.3,58.57,2007
1,Shrek the Third,Paramount,42.0,57.0,Quest,Animation,4122.0,121.6,29507.0,322.72,476.24,798.96,160.0,499.35,76.0,2007
2,Transformers,Paramount,57.0,89.0,Monster Force,Action,4011.0,70.5,17577.0,319.25,390.46,709.71,150.0,473.14,47.0,2007
3,Pirates of the Caribbean: At World's End,Disney,45.0,74.0,Rescue,Action,4362.0,114.7,26302.0,309.42,654.0,963.42,300.0,321.14,38.23,2007
4,Harry Potter and the Order of the Phoenix,Warner Bros,78.0,82.0,Quest,Adventure,4285.0,77.1,17998.0,292.0,647.88,939.89,150.0,626.59,51.4,2007


In [51]:
#Transform: I select relevant fields (e.g., title, year, genre, budget),clean the data to prepare it for SQL calculations by droping the missing data and making sure the data is the correct type (e.g., movie to string, year to integer, and budget to float), and then I standarized the column titles to make the joinging process easier 
try:
    selected_columns = ['Movie', 'Year', 'Genre', 'Budget', 'DomesticGross', 'ForeignGross', 'WorldGross', 'RottenTomatoes']
    csv_df = df[selected_columns]
    csv_df = csv_df.rename(columns={
        'Movie': 'title',
        'Year': 'year',
        'Genre': 'genres',
        'Budget': 'budget',
        'DomesticGross': 'domesticgross',
        'ForeignGross': 'foreigngross',
        'WorldGross': 'worldgross',
        'RottenTomatoes': 'rottentomatoes'
    })
except Exception as e:
    print(f"Failure transforming data: {e}")

try:
    csv_df.dropna(inplace=True)
    csv_df['title'] = csv_df['title'].astype(str)
    csv_df['year'] = csv_df['year'].astype(int)
    csv_df['genres'] = csv_df['genres'].astype(str)
    csv_df['domesticgross'] = csv_df['domesticgross'].astype(float)
    csv_df['foreigngross'] = csv_df['foreigngross'].astype(float)
    csv_df['worldgross'] = csv_df['worldgross'].astype(float)
    csv_df['rottentomatoes'] = csv_df['rottentomatoes'].astype(float)
except Exception as e:
    print(f"Failure cleaning data: {e}")


csv_df.head()

Unnamed: 0,title,year,genres,budget,domesticgross,foreigngross,worldgross,rottentomatoes
0,Spider-Man 3,2007,Action,258.0,336.53,554.34,890.87,61.0
1,Shrek the Third,2007,Animation,160.0,322.72,476.24,798.96,42.0
2,Transformers,2007,Action,150.0,319.25,390.46,709.71,57.0
3,Pirates of the Caribbean: At World's End,2007,Action,300.0,309.42,654.0,963.42,45.0
4,Harry Potter and the Order of the Phoenix,2007,Adventure,150.0,292.0,647.88,939.89,78.0


In [52]:
# Load:Connected to MySQL then loaded the transformed data to SQL tables to a datamart focused on fims.
host_name = "localhost"
host_ip = "127.0.0.1"
port = "3306"
user_id = "root"
pwd = "Uva!1819"
db_name = "films"
conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}/{db_name}"
try:
    engine = create_engine(conn_str)
except Exception as e:
    print(f"Failure connecting to database: {e}")

# 'csv_df' dataframe to the 'profit' table in SQL films database
try:
    csv_df.to_sql(name='profit', con=engine, if_exists='replace', index=False)
except Exception as e:
    print(f"Failure saving 'csv_df' to 'profit' table: {e}")

# 'mongodb_df' dataframe to the 'ratings' table in SQL films database
try:
    mongodb_df.to_sql(name='ratings', con=engine, if_exists='replace', index=False)
except Exception as e:
    print(f"Failure saving 'mongodb_df' to 'ratings' table: {e}")

# 'api_df' dataframe to the 'popular' table in SQL films database
try:
    api_df.to_sql(name='popular', con=engine, if_exists='replace', index=False)   
except Exception as e:
    print(f"Failure saving 'api_df' to 'popular' table: {e}")
