# Movie Data ETL Pipeline - Load

With the data extracted and transformed in the two previous notebooks, this notebook loads it into a PostgreSQL database. Before loading, we also merge an aggregate of the ratings data into the combined movie data.

### Dependencies

In [1]:
%matplotlib inline

import datetime as dt
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sqlalchemy import create_engine

from config import PSQL_PW

### Data

In [2]:
# Path to data directory
data_path = '../data/'

In [3]:
# Combined movie data
movies_df = pd.read_pickle(data_path + 'movies.pkl')
print(movies_df.info())
movies_df.head(2)

<class 'pandas.core.frame.DataFrame'>
Int64Index: 5982 entries, 0 to 5982
Data columns (total 30 columns):
 #   Column                Non-Null Count  Dtype         
---  ------                --------------  -----         
 0   movie_id              5982 non-null   int64         
 1   imdb_id               5982 non-null   object        
 2   imdb_link             5982 non-null   object        
 3   url                   5982 non-null   object        
 4   poster_path           5981 non-null   object        
 5   title                 5982 non-null   object        
 6   overview              5977 non-null   object        
 7   release_date          5982 non-null   datetime64[ns]
 8   year                  5982 non-null   int64         
 9   runtime               5982 non-null   float64       
 10  budget                4600 non-null   float64       
 11  revenue               5149 non-null   float64       
 12  genres                5982 non-null   object        
 13  country           

| | movie_id | imdb_id | imdb_link | url | poster_path | title | overview | release_date | year | runtime | ... | producers | writers | director | cinematographers | editors | composers | stars | production_companies | production_countries | distributor |
| --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- |
| 0 | 9548 | tt0098987 | https://www.imdb.com/title/tt0098987/ | https://en.wikipedia.org/wiki/The_Adventures_o... | /yLeX2QLkHeRlYQRcbU8BKgMaYYD.jpg | The Adventures of Ford Fairlane | Ford "Mr. Rock n' Roll Detective" Fairlane is ... | 1990-07-11 | 1990 | 104.0 | ... | [Steve Perry, Joel Silver] | [David Arnott, James Cappe] | Renny Harlin | Oliver Wood | Michael Tronick | [Cliff Eidelman, Yello] | [Andrew Dice Clay, Wayne Newton, Priscilla Pre... | [{'name': 'Twentieth Century Fox Film Corporat... | [{'iso_3166_1': 'US', 'name': 'United States o... | 20th Century Fox |
| 1 | 25501 | tt0098994 | https://www.imdb.com/title/tt0098994/ | https://en.wikipedia.org/wiki/After_Dark,_My_S... | /3hjcHNtWn9T6jVGXgNXyCsMWBdj.jpg | After Dark, My Sweet | The intriguing relationship between three desp... | 1990-08-24 | 1990 | 114.0 | ... | [Ric Kidney, Robert Redlin] | [James Foley, Robert Redlin] | James Foley | Mark Plummer | Howard E. Smith | Maurice Jarre | [Jason Patric, Rachel Ward, Bruce Dern, George... | [{'name': 'Avenue Pictures Productions', 'id':... | [{'iso_3166_1': 'US', 'name': 'United States o... | Avenue Pictures |


In [4]:
# Rating data
ratings_df = pd.read_csv(data_path + 'raw/ratings.csv')
print(ratings_df.info(show_counts=True))  # show_counts replaces the deprecated null_counts (pandas 1.2+)
ratings_df.head(2)

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 26024289 entries, 0 to 26024288
Data columns (total 4 columns):
 #   Column     Non-Null Count     Dtype  
---  ------     --------------     -----  
 0   userId     26024289 non-null  int64  
 1   movieId    26024289 non-null  int64  
 2   rating     26024289 non-null  float64
 3   timestamp  26024289 non-null  int64  
dtypes: float64(1), int64(3)
memory usage: 794.2 MB
None


| | userId | movieId | rating | timestamp |
| --- | --- | --- | --- | --- |
| 0 | 1 | 110 | 1.0 | 1425941529 |
| 1 | 1 | 147 | 4.5 | 1425942435 |
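The `info()` output above reports about 794 MB for the default 64-bit dtypes. As a hedged sketch, narrower dtypes can be requested at read time. The `compact_dtypes` mapping below is an assumption: it presumes every `userId` and `movieId` fits in a 32-bit integer, which should be checked against the real file before relying on it. The inline CSV text only mimics the two rows shown above.

```python
import io

import pandas as pd

# Two rows copied from the preview above, used as a stand-in for ratings.csv
csv_text = (
    "userId,movieId,rating,timestamp\n"
    "1,110,1.0,1425941529\n"
    "1,147,4.5,1425942435\n"
)

# Assumed mapping: ids fit in int32; half-star ratings are exact in float32
compact_dtypes = {'userId': 'int32', 'movieId': 'int32', 'rating': 'float32'}

default_df = pd.read_csv(io.StringIO(csv_text))
compact_df = pd.read_csv(io.StringIO(csv_text), dtype=compact_dtypes)

# Compare the bytes used by the column data (index excluded)
default_bytes = default_df.memory_usage(index=False).sum()
compact_bytes = compact_df.memory_usage(index=False).sum()
print(default_bytes, compact_bytes)
```

The `timestamp` column is left at its default dtype here because it is converted to datetimes in the next cell anyway.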


In [5]:
# Convert timestamp to datetime type
ratings_df['timestamp'] = pd.to_datetime(ratings_df['timestamp'], unit='s')
ratings_df.head(2)

| | userId | movieId | rating | timestamp |
| --- | --- | --- | --- | --- |
| 0 | 1 | 110 | 1.0 | 2015-03-09 22:52:09 |
| 1 | 1 | 147 | 4.5 | 2015-03-09 23:07:15 |
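For reference, a small standalone sketch of the same conversion on the two timestamps shown above. With `unit='s'` the integers are read as seconds since the Unix epoch, and the result is timezone-naive but represents UTC. Attaching the timezone explicitly, as in `localized` below, is optional and is not something the notebook itself does.

```python
import pandas as pd

# The two raw timestamps from the preview above (seconds since 1970-01-01 UTC)
raw = pd.Series([1425941529, 1425942435])

# Same call as in the notebook: interpret the integers as epoch seconds
converted = pd.to_datetime(raw, unit='s')

# Optional: make the implied UTC timezone explicit
localized = converted.dt.tz_localize('UTC')

print(converted.iloc[0])
print(localized.iloc[0])
```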


### Aggregate ratings by movie

With about 26 million rows in the ratings data, it is more practical to summarize it with an aggregate and include that summary in the combined movie data. We will create a pivot table that counts, for each movie, how many times it received each rating value (0.5 to 5.0 in half-star steps).

In [6]:
# Count ratings by movie
ratings_count_df = pd.pivot_table(data=ratings_df, index='movieId', columns='rating',
                                  values='timestamp', aggfunc='count', fill_value=0).reset_index()
ratings_count_df

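| rating | movieId | 0.5 | 1.0 | 1.5 | 2.0 | 2.5 | 3.0 | 3.5 | 4.0 | 4.5 | 5.0 |
| --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- |
| 0 | 1 | 441 | 804 | 438 | 2083 | 1584 | 11577 | 5741 | 22020 | 5325 | 15995 |
| 1 | 2 | 263 | 797 | 525 | 2479 | 1810 | 8510 | 2916 | 6035 | 690 | 2035 |
| 2 | 3 | 169 | 772 | 233 | 1665 | 616 | 6213 | 759 | 3433 | 154 | 1483 |
| 3 | 4 | 47 | 351 | 31 | 496 | 77 | 1133 | 66 | 557 | 12 | 211 |
| 4 | 5 | 237 | 785 | 270 | 1716 | 664 | 6608 | 723 | 2959 | 166 | 1130 |
| ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... |
| 45110 | 176267 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 1 | 0 | 0 |
| 45111 | 176269 | 0 | 0 | 0 | 0 | 0 | 0 | 1 | 0 | 0 | 0 |
| 45112 | 176271 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 1 |
| 45113 | 176273 | 0 | 1 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 |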
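To make the reshaping concrete, here is a toy sketch of the same pivot on four made-up ratings. The `toy` frame is invented for illustration. `pd.crosstab` is shown only as an assumed alternative that should yield the same counts; it is not what the notebook uses.

```python
import pandas as pd

# Four invented ratings: movie 1 rated 4.0 twice and 5.0 once, movie 2 rated 3.5 once
toy = pd.DataFrame({
    'movieId': [1, 1, 1, 2],
    'rating': [4.0, 4.0, 5.0, 3.5],
    'timestamp': [0, 1, 2, 3],
})

# Same pattern as the notebook: count rows per (movie, rating) pair
counts = pd.pivot_table(data=toy, index='movieId', columns='rating',
                        values='timestamp', aggfunc='count', fill_value=0).reset_index()

# Alternative: crosstab counts co-occurrences directly, no values column needed
alt = pd.crosstab(toy['movieId'], toy['rating'])

print(counts)
print(alt)
```

Any fully populated column could serve as `values`, since `aggfunc='count'` only counts non-null entries; `timestamp` works here because it has no missing values.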


In [7]:
# Add prefix to column names
ratings_count_df.columns = ['movie_id'] + ['rating_' + str(rating) for rating in ratings_count_df.columns[1:]]
ratings_count_df.head(2)

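| | movie_id | rating_0.5 | rating_1.0 | rating_1.5 | rating_2.0 | rating_2.5 | rating_3.0 | rating_3.5 | rating_4.0 | rating_4.5 | rating_5.0 |
| --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- |
| 0 | 1 | 441 | 804 | 438 | 2083 | 1584 | 11577 | 5741 | 22020 | 5325 | 15995 |
| 1 | 2 | 263 | 797 | 525 | 2479 | 1810 | 8510 | 2916 | 6035 | 690 | 2035 |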


### Combine movie and rating data

In [8]:
# Merge aggregate rating data with movie data
df = pd.merge(movies_df, ratings_count_df, on='movie_id', how='left')
df.head(2)

| | movie_id | imdb_id | imdb_link | url | poster_path | title | overview | release_date | year | runtime | ... | rating_0.5 | rating_1.0 | rating_1.5 | rating_2.0 | rating_2.5 | rating_3.0 | rating_3.5 | rating_4.0 | rating_4.5 | rating_5.0 |
| --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- |
| 0 | 9548 | tt0098987 | https://www.imdb.com/title/tt0098987/ | https://en.wikipedia.org/wiki/The_Adventures_o... | /yLeX2QLkHeRlYQRcbU8BKgMaYYD.jpg | The Adventures of Ford Fairlane | Ford "Mr. Rock n' Roll Detective" Fairlane is ... | 1990-07-11 | 1990 | 104.0 | ... | NaN | NaN | NaN | NaN | NaN | NaN | NaN | NaN | NaN | NaN |
| 1 | 25501 | tt0098994 | https://www.imdb.com/title/tt0098994/ | https://en.wikipedia.org/wiki/After_Dark,_My_S... | /3hjcHNtWn9T6jVGXgNXyCsMWBdj.jpg | After Dark, My Sweet | The intriguing relationship between three desp... | 1990-08-24 | 1990 | 114.0 | ... | NaN | NaN | NaN | NaN | NaN | NaN | NaN | NaN | NaN | NaN |
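Both preview rows come back with no rating counts, so it can be useful to check how many movies found a match at all. A minimal sketch on invented data, using the `indicator=True` option of `merge`; the toy frames and the `match_summary` name are assumptions for illustration.

```python
import pandas as pd

# Invented stand-ins for movies_df and ratings_count_df
movies = pd.DataFrame({'movie_id': [10, 20, 30]})
counts = pd.DataFrame({'movie_id': [10, 30, 40], 'rating_5.0': [3, 1, 8]})

# indicator=True adds a '_merge' column saying where each row was found
checked = movies.merge(counts, on='movie_id', how='left', indicator=True)

# 'both' = movie has rating counts, 'left_only' = movie had no ratings
match_summary = checked['_merge'].value_counts()
print(match_summary)
```

With `how='left'` every movie is kept, so `left_only` rows are exactly the ones whose rating columns are missing after the merge.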


In [9]:
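# Fill missing rating counts with 0 (movies that have no ratings in the ratings data)
# Assigning the result back avoids relying on inplace fillna on a column selection
rating_cols = df.columns[-10:]
df[rating_cols] = df[rating_cols].fillna(0)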
df.info()

<class 'pandas.core.frame.DataFrame'>
Int64Index: 5982 entries, 0 to 5981
Data columns (total 40 columns):
 #   Column                Non-Null Count  Dtype         
---  ------                --------------  -----         
 0   movie_id              5982 non-null   int64         
 1   imdb_id               5982 non-null   object        
 2   imdb_link             5982 non-null   object        
 3   url                   5982 non-null   object        
 4   poster_path           5981 non-null   object        
 5   title                 5982 non-null   object        
 6   overview              5977 non-null   object        
 7   release_date          5982 non-null   datetime64[ns]
 8   year                  5982 non-null   int64         
 9   runtime               5982 non-null   float64       
 10  budget                4600 non-null   float64       
 11  revenue               5149 non-null   float64       
 12  genres                5982 non-null   object        
 13  country           
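One side effect of the left merge is that the count columns become floats, because missing values force a float dtype. A hedged sketch on invented data of filling the gaps and restoring an integer dtype in one step; selecting columns by the `rating_` prefix is an assumption that relies on the renaming done earlier, and the notebook itself selects the last ten columns instead.

```python
import pandas as pd

# Invented stand-ins: only movie 1 has rating counts
movies = pd.DataFrame({'movie_id': [1, 2, 3], 'title': ['A', 'B', 'C']})
counts = pd.DataFrame({'movie_id': [1], 'rating_4.0': [7], 'rating_5.0': [2]})

merged = movies.merge(counts, on='movie_id', how='left')

# Select the count columns by name rather than by position
rating_cols = [col for col in merged.columns if col.startswith('rating_')]

# Fill the gaps with 0, then cast back to integers (counts are whole numbers)
merged[rating_cols] = merged[rating_cols].fillna(0).astype('int64')
print(merged.dtypes)
```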

### Connect to PostgreSQL database

An empty PostgreSQL database named `movie_db` was created in pgAdmin. We connect to it here so that the movie data can be loaded in.

In [10]:
# Connection string format: "postgresql://[user]:[password]@[host]:[port]/[database]"
# (SQLAlchemy 1.4+ no longer accepts the older "postgres://" scheme)
db_string = f'postgresql://postgres:{PSQL_PW}@127.0.0.1:5432/movie_db'

# Create engine
engine = create_engine(db_string)
engine

Engine(postgresql://postgres:***@127.0.0.1:5432/movie_db)
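If the password contains characters such as `@` or `/`, pasting it directly into the connection string can break URL parsing. A minimal sketch, assuming the URL is assembled by hand: the `build_db_string` helper is hypothetical (not part of the notebook or of SQLAlchemy) and simply percent-encodes the credentials with the standard library.

```python
from urllib.parse import quote_plus


def build_db_string(user, password, host, port, database):
    """Hypothetical helper: assemble a PostgreSQL URL with escaped credentials."""
    # quote_plus percent-encodes characters that have special meaning in URLs
    return (
        f"postgresql://{quote_plus(user)}:{quote_plus(password)}"
        f"@{host}:{port}/{database}"
    )


# Example with a made-up password containing '@' and '/'
example_url = build_db_string('postgres', 'p@ss/word', '127.0.0.1', 5432, 'movie_db')
print(example_url)
```

The resulting string can be passed to `create_engine` in place of `db_string`.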

### Load movie data into database

In [11]:
# Create table for movie data
df.to_sql('movies', engine, if_exists='replace')
pd.read_sql('SELECT * FROM movies', engine).info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 5982 entries, 0 to 5981
Data columns (total 41 columns):
 #   Column                Non-Null Count  Dtype         
---  ------                --------------  -----         
 0   index                 5982 non-null   int64         
 1   movie_id              5982 non-null   int64         
 2   imdb_id               5982 non-null   object        
 3   imdb_link             5982 non-null   object        
 4   url                   5982 non-null   object        
 5   poster_path           5981 non-null   object        
 6   title                 5982 non-null   object        
 7   overview              5977 non-null   object        
 8   release_date          5982 non-null   datetime64[ns]
 9   year                  5982 non-null   int64         
 10  runtime               5982 non-null   float64       
 11  budget                4600 non-null   float64       
 12  revenue               5149 non-null   float64       
 13  genres            
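Several columns in the movie data hold Python lists or lists of dicts (for example `stars` and `production_countries`). How such values are stored depends on the database driver, so one option is to serialize them to JSON text before loading. The sketch below is an assumption about how that could be done, not what the notebook does: it uses an invented two-row frame and an in-memory SQLite database as a stand-in for PostgreSQL.

```python
import json
import sqlite3

import pandas as pd

# Invented rows that mimic the nested columns in the movie data
toy = pd.DataFrame({
    'movie_id': [1, 2],
    'stars': [['A', 'B'], ['C']],
    'production_countries': [[{'iso_3166_1': 'US'}], [{'iso_3166_1': 'FR'}]],
})


def is_nested(value):
    """True for values that cannot be stored in a plain scalar column."""
    return isinstance(value, (list, dict))


# Find columns that contain at least one list or dict
nested_cols = [col for col in toy.columns if toy[col].map(is_nested).any()]

# Serialize only the nested values, leaving plain strings untouched
for col in nested_cols:
    toy[col] = toy[col].map(lambda v: json.dumps(v) if is_nested(v) else v)

# In-memory SQLite stands in for the PostgreSQL engine
conn = sqlite3.connect(':memory:')
toy.to_sql('movies', conn, index=False, if_exists='replace')

# Reading back: json.loads restores the original structure
back = pd.read_sql('SELECT * FROM movies', conn)
restored = json.loads(back.loc[0, 'stars'])
print(restored)
```

Checking each value individually matters because some columns mix plain strings and lists, as the `composers` column does in the preview at the top of the notebook.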

### Load rating data into database

The ratings data itself will also be loaded into the database. However, it contains ratings for many movies that are not in the movie data. Keeping those ratings serves no purpose and would make the load unnecessarily slow.

We will therefore reduce the rating data to ratings of movies that appear in the movie data. Even after this reduction the data may still have a large number of rows, so the reduced data is saved to disk and then read back in chunks, with each chunk loaded into the database in turn.

In [13]:
# Filter the data to only the movies in the movie data
ratings_reduced_df = ratings_df[ratings_df.movieId.isin(df.movie_id.values)]
ratings_reduced_df.shape

(4265986, 4)
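The filter keeps 4,265,986 of the 26,024,289 original rows, roughly 16%. A toy sketch of the same `isin` filter on invented data, with the kept share computed explicitly; the frames and the `share_kept` name are made up for illustration.

```python
import pandas as pd

# Invented ratings: movie 999 is not in the movie data
ratings = pd.DataFrame({
    'userId': [1, 1, 2, 3],
    'movieId': [110, 147, 110, 999],
    'rating': [1.0, 4.5, 3.0, 5.0],
})
movie_ids = pd.Series([110, 200])

# Keep only ratings whose movieId appears in the movie data
kept = ratings[ratings['movieId'].isin(movie_ids)]

# Fraction of the original rows that survive the filter
share_kept = len(kept) / len(ratings)
print(kept.shape, share_kept)
```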

In [14]:
# Save reduced rating data
ratings_reduced_df.to_csv(data_path + 'ratings_reduced.csv', index=False)
pd.read_csv(data_path + 'ratings_reduced.csv').info(show_counts=True)  # show_counts replaces the deprecated null_counts

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 4265986 entries, 0 to 4265985
Data columns (total 4 columns):
 #   Column     Dtype  
---  ------     -----  
 0   userId     int64  
 1   movieId    int64  
 2   rating     float64
 3   timestamp  object 
dtypes: float64(1), int64(2), object(1)
memory usage: 130.2+ MB


In [None]:
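# Load data in chunks
loaded, chunksize = 0, 500_000  # chunksize must be an integer
start = dt.datetime.now()
# parse_dates restores the datetime dtype that was lost in the CSV round trip
for chunk in pd.read_csv(data_path + 'ratings_reduced.csv', chunksize=chunksize, parse_dates=['timestamp']):
    print('Loading rows', loaded, 'to', loaded + len(chunk), end=' | ')
    chunk.to_sql('ratings', engine, if_exists='append')
    loaded += len(chunk)  # the last chunk may be shorter than chunksize
    print((dt.datetime.now() - start), 'elapsed')

Loading rows 0 to 500000 | 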
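The chunked pattern can be tried end to end without a PostgreSQL server. The sketch below is an assumption-laden stand-in: it uses an in-memory SQLite connection instead of the `engine`, a ten-row CSV built in memory instead of `ratings_reduced.csv`, and a chunk size of 4 so that the final chunk is partial.

```python
import io
import sqlite3

import pandas as pd

# Ten invented rows standing in for ratings_reduced.csv
csv_text = 'userId,movieId,rating\n' + ''.join(
    f'{i},{i % 3},3.0\n' for i in range(10)
)

# In-memory SQLite stands in for the PostgreSQL engine
conn = sqlite3.connect(':memory:')

loaded = 0
for chunk in pd.read_csv(io.StringIO(csv_text), chunksize=4):
    # Each chunk is an ordinary DataFrame appended to the same table
    chunk.to_sql('ratings', conn, if_exists='append', index=False)
    loaded += len(chunk)  # chunks arrive as 4, 4 and 2 rows

# Confirm that every row arrived
row_count = conn.execute('SELECT COUNT(*) FROM ratings').fetchone()[0]
print(loaded, row_count)
```

Because `if_exists='append'` adds to whatever is already in the table, rerunning such a loop against a persistent database would duplicate rows unless the table is dropped or emptied first.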

In [None]:
pd.read_sql('SELECT * FROM ratings', engine).info()
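Pulling several million rows back into memory just to confirm the load is expensive. A lighter check is to let the database do the counting. The sketch below again uses an in-memory SQLite table with invented rows as a stand-in. The double quotes around `"movieId"` are there because a mixed-case column name created by `to_sql` has to be quoted in PostgreSQL; SQLite accepts the same syntax.

```python
import sqlite3

import pandas as pd

# Invented rows standing in for the loaded ratings table
conn = sqlite3.connect(':memory:')
pd.DataFrame({
    'userId': [1, 1, 2],
    'movieId': [110, 147, 110],
    'rating': [1.0, 4.5, 3.0],
}).to_sql('ratings', conn, index=False)

# Aggregate in the database instead of fetching every row
summary = pd.read_sql(
    'SELECT COUNT(*) AS n_rows, COUNT(DISTINCT "movieId") AS n_movies FROM ratings',
    conn,
)
print(summary)
```

Against the real database, `n_rows` would be expected to match the 4,265,986 rows of the reduced ratings data.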