## Data Extraction, Transformation and Loading using Pandas and PostgreSQL

In this project we explore the basics of ETL (extract, transform, load).

The dataset comes from [Kaggle](https://www.kaggle.com/rounakbanik/the-movies-dataset#ratings_small.csv), and a detailed description of the dataset can also be found [here](https://grouplens.org/datasets/movielens/latest/).


### Project tasks include:

* Data extraction from csv files into pandas DataFrame.

* Transformation and grouping of data

* Create database and tables in Postgres.

* Load pandas dataframe into the Database.

* Query Database

In [1]:
import pandas as pd
import matplotlib.pyplot as plt
import psycopg2
from sqlalchemy import create_engine
import os

from datetime import datetime
import numpy as np

%matplotlib inline

## **Extraction and Transformation**

### **First Dataset** 

In [2]:
df_links = pd.read_csv('links.csv')
df_links.head()

Unnamed: 0,movieId,imdbId,tmdbId
0,1,114709,862.0
1,2,113497,8844.0
2,3,113228,15602.0
3,4,114885,31357.0
4,5,113041,11862.0


In [3]:
#Check for missing values
df_links.isnull().sum()

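#dropna() returns a new DataFrame; the result is not assigned here, so df_links is unchanged.
#Missing tmdbId values are filled with 0 in a later cell.
df_links.dropna()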

df_links

Unnamed: 0,movieId,imdbId,tmdbId
0,1,114709,862.0
1,2,113497,8844.0
2,3,113228,15602.0
3,4,114885,31357.0
4,5,113041,11862.0
...,...,...,...
9737,193581,5476944,432131.0
9738,193583,5914996,445030.0
9739,193585,6397426,479308.0
9740,193587,8391976,483455.0


#### Using the identifiers, we generate the movie links by appending each movie's imdbId/tmdbId to the IMDB/TMDB base URL.
For IMDB Link = 'http://www.imdb.com/title/tt***'

The *** (the IMDB ID) in the actual hyperlink must be a 7-digit ID. Wherever the 'imdbId' column holds a 6-digit value, we prepend '0' to the ID in the link. A 7-digit 'imdbId' needs no change.

For TMDB Link= 'https://www.themoviedb.org/movie/***'

Replace the asterisks with the TMDB ID value.
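
As a minimal sketch of the padding rule described above, the helpers below build both links in plain Python. The function names `imdb_link` and `tmdb_link` are illustrative and not part of the notebook; `str.zfill(7)` pads shorter IDs to seven digits and leaves longer ones untouched.

```python
IMDB_BASE = "https://www.imdb.com/title/tt"
TMDB_BASE = "https://www.themoviedb.org/movie/"

def imdb_link(imdb_id):
    """Build an IMDB link, left-padding the ID with zeros to 7 digits."""
    return IMDB_BASE + str(int(imdb_id)).zfill(7)

def tmdb_link(tmdb_id):
    """Build a TMDB link from the numeric ID (floats such as 862.0 are truncated)."""
    return TMDB_BASE + str(int(tmdb_id))

print(imdb_link(114709))   # 6-digit ID, gets one leading zero
print(imdb_link(5476944))  # 7-digit ID, unchanged
print(tmdb_link(862.0))
```

Assuming the `df_links` frame loaded earlier, these helpers could be applied column-wise, for example with `df_links['imdbId'].map(imdb_link)`.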

In [4]:
# Create a copy of the Dataset

df_links_copy = df_links.copy()

In [5]:
# Fill the NA values in the tmdbId column with 0 and convert to integer
df_links['tmdbId'] = df_links['tmdbId'].fillna(0).astype(int)

In [6]:
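# Add a new column with the IMDB link
# Note: the base string already ends in "tt0", so a 7-digit imdbId also gets a leading 0
# (visible in the output below); str(row.imdbId).zfill(7) would pad to exactly 7 digits.
df_links['IMDB_link'] = df_links.apply(lambda row: "https://www.imdb.com/title/tt0" + "0" * (6 - len(str(row.imdbId))) + str(row.imdbId), axis = 1)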
df_links.tail()

Unnamed: 0,movieId,imdbId,tmdbId,IMDB_link
9737,193581,5476944,432131,https://www.imdb.com/title/tt05476944
9738,193583,5914996,445030,https://www.imdb.com/title/tt05914996
9739,193585,6397426,479308,https://www.imdb.com/title/tt06397426
9740,193587,8391976,483455,https://www.imdb.com/title/tt08391976
9741,193609,101726,37891,https://www.imdb.com/title/tt0101726


In [7]:
# Add a new column with the TMDB link
df_links['TMDB_link'] = df_links.apply(lambda row: "https://www.themoviedb.org/movie/" + str(row.tmdbId), axis = 1)
df_links.tail()

Unnamed: 0,movieId,imdbId,tmdbId,IMDB_link,TMDB_link
9737,193581,5476944,432131,https://www.imdb.com/title/tt05476944,https://www.themoviedb.org/movie/432131
9738,193583,5914996,445030,https://www.imdb.com/title/tt05914996,https://www.themoviedb.org/movie/445030
9739,193585,6397426,479308,https://www.imdb.com/title/tt06397426,https://www.themoviedb.org/movie/479308
9740,193587,8391976,483455,https://www.imdb.com/title/tt08391976,https://www.themoviedb.org/movie/483455
9741,193609,101726,37891,https://www.imdb.com/title/tt0101726,https://www.themoviedb.org/movie/37891


### **Second dataset**

In [8]:
df_movies = pd.read_csv('movies.csv')


In [9]:
#Check for missing values
df_movies.isnull().sum()

movieId    0
title      0
genres     0
dtype: int64

In [10]:
# Create a copy of the Dataset
df_movies_copy = df_movies.copy()

Extract the year of release from the title and store it in a new column, 'Year_of_Release'.

In [11]:
#Remove commas from the title column (the data is later loaded as comma-separated text)
df_movies["title"] = df_movies["title"].str.replace(',', '', regex=False)

#Split the title column in two at the last "("
split = df_movies["title"].str.rsplit("(", n = 1, expand = True)
df_movies["Title"] = split[0]
df_movies["Year_of_Release"] = split[1]

#Drop the previous title column
df_movies = df_movies.drop('title', axis = 1)

#Remove the closing parenthesis from the year
df_movies["Year_of_Release"] = df_movies["Year_of_Release"].str.replace(')', '', regex=False)


#Rename and rearrange the columns
df_movies.rename(columns={"genres": "Genres"}, inplace = True)
df_movies = df_movies[['movieId', 'Title', 'Genres', 'Year_of_Release']]

#df_movies = df_movies.set_index('movieId')

df_movies

Unnamed: 0,movieId,Title,Genres,Year_of_Release
0,1,Toy Story,Adventure|Animation|Children|Comedy|Fantasy,1995
1,2,Jumanji,Adventure|Children|Fantasy,1995
2,3,Grumpier Old Men,Comedy|Romance,1995
3,4,Waiting to Exhale,Comedy|Drama|Romance,1995
4,5,Father of the Bride Part II,Comedy,1995
...,...,...,...,...
9737,193581,Black Butler: Book of the Atlantic,Action|Animation|Comedy|Fantasy,2017
9738,193583,No Game No Life: Zero,Animation|Comedy|Fantasy,2017
9739,193585,Flint,Drama,2017
9740,193587,Bungo Stray Dogs: Dead Apple,Action|Animation,2018
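
Splitting on the last "(" leaves a trailing space in `Title` and depends on every title ending in a year. As a hedged alternative, the sketch below uses a regular expression that only accepts a four-digit year in parentheses at the end of the string. The helper name `split_title` is hypothetical and not part of the notebook.

```python
import re

# Title text, optional whitespace, then a 4-digit year in parentheses at the end
TITLE_YEAR = re.compile(r"^(.*?)\s*\((\d{4})\)\s*$")

def split_title(raw):
    """Return (title, year); year is None when no trailing (YYYY) is found."""
    match = TITLE_YEAR.match(raw)
    if match is None:
        return raw.strip(), None
    return match.group(1).strip(), match.group(2)

print(split_title("Toy Story (1995)"))
print(split_title("Untitled Film"))
```

With pandas, a comparable result for the year alone could be obtained with `df_movies['title'].str.extract(r'\((\d{4})\)\s*$')`, applied before the `title` column is dropped.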


In [12]:

df_movies['Title'].head(10)

0                      Toy Story 
1                        Jumanji 
2               Grumpier Old Men 
3              Waiting to Exhale 
4    Father of the Bride Part II 
5                           Heat 
6                        Sabrina 
7                   Tom and Huck 
8                   Sudden Death 
9                      GoldenEye 
Name: Title, dtype: object

### **Third dataset**

In [13]:
df_ratings = pd.read_csv('ratings.csv')
df_ratings


Unnamed: 0,userId,movieId,rating,timestamp
0,1,1,4.0,964982703
1,1,3,4.0,964981247
2,1,6,4.0,964982224
3,1,47,5.0,964983815
4,1,50,5.0,964982931
...,...,...,...,...
100831,610,166534,4.0,1493848402
100832,610,168248,5.0,1493850091
100833,610,168250,5.0,1494273047
100834,610,168252,5.0,1493846352


In [14]:
#Check for missing values
df_ratings.isnull().sum()

userId       0
movieId      0
rating       0
timestamp    0
dtype: int64

In [15]:
# Create a copy of the Dataset
df_ratings_copy = df_ratings.copy()

#Convert the Unix timestamp to a formatted UTC datetime string
df_ratings['timestamp'] = df_ratings['timestamp'].apply(lambda x:datetime.utcfromtimestamp(x ).strftime('%Y-%m-%d %H:%M:%S'))
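
`datetime.utcfromtimestamp` is deprecated as of Python 3.12. A minimal sketch of a timezone-aware equivalent follows; the helper name `to_utc_string` is illustrative only.

```python
from datetime import datetime, timezone

def to_utc_string(ts):
    """Format a Unix timestamp (seconds) as 'YYYY-MM-DD HH:MM:SS' in UTC."""
    return datetime.fromtimestamp(ts, tz=timezone.utc).strftime("%Y-%m-%d %H:%M:%S")

# First timestamp in the ratings sample above
print(to_utc_string(964982703))
```

If a real datetime column is preferred over strings, `pd.to_datetime(df_ratings['timestamp'], unit='s')` converts the raw integer column in one vectorized call.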

Since a movie can be rated by multiple users, and a user can rate multiple movies, we group the ratings by movie ID and calculate the mean rating of each movie.


In [16]:
#Mean rating per movie; selecting 'rating' first keeps userId and the string timestamp out of the mean
mean_df_ratings = df_ratings.groupby('movieId')['rating'].mean().reset_index()

mean_df_ratings

Unnamed: 0,movieId,rating
0,1,3.920930
1,2,3.431818
2,3,3.259615
3,4,2.357143
4,5,3.071429
...,...,...
9719,193581,4.000000
9720,193583,3.500000
9721,193585,3.500000
9722,193587,3.500000


In [17]:
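#Todo: group movieId rating by total number of users that gave it a particular rating
#Note: 'sum' adds up the userId values themselves; 'count' would give the number of users per rating
agg_df_ratings = df_ratings.groupby(['movieId','rating'])['userId'].agg('sum').reset_index()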


agg_df_ratings

Unnamed: 0,movieId,rating,userId
0,1,0.5,76
1,1,1.5,462
2,1,2.0,1454
3,1,2.5,2939
4,1,3.0,9719
...,...,...,...
30412,193581,4.0,184
30413,193583,3.5,184
30414,193585,3.5,184
30415,193587,3.5,184
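
The `userId` column above holds sums of user IDs, not user counts. The sketch below contrasts the two aggregations on a few made-up rows in plain Python; the data and variable names are assumptions for illustration.

```python
from collections import Counter

# Toy (userId, movieId, rating) rows; the values are made up for illustration
ratings = [
    (1, 1, 4.0),
    (5, 1, 4.0),
    (7, 1, 4.5),
    (1, 3, 4.0),
]

# Number of users per (movieId, rating) pair
user_counts = Counter((movie_id, rating) for _, movie_id, rating in ratings)

# Sum of the userId values per pair, which is what a 'sum' aggregation computes
user_id_sums = Counter()
for user_id, movie_id, rating in ratings:
    user_id_sums[(movie_id, rating)] += user_id

print(user_counts[(1, 4.0)], user_id_sums[(1, 4.0)])
```

In pandas, the count would correspond to `df_ratings.groupby(['movieId','rating'])['userId'].count().reset_index()`.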


### **Fourth dataset**

In [18]:
df_tags = pd.read_csv('tags.csv')
df_tags

Unnamed: 0,userId,movieId,tag,timestamp
0,2,60756,funny,1445714994
1,2,60756,Highly quotable,1445714996
2,2,60756,will ferrell,1445714992
3,2,89774,Boxing story,1445715207
4,2,89774,MMA,1445715200
...,...,...,...,...
3678,606,7382,for katie,1171234019
3679,606,7936,austere,1173392334
3680,610,3265,gun fu,1493843984
3681,610,3265,heroic bloodshed,1493843978


In [19]:
#Check for missing values
df_tags.isnull().sum()

#Convert the Unix timestamp to a formatted UTC datetime string
df_tags['timestamp'] = df_tags['timestamp'].apply(lambda x:datetime.utcfromtimestamp(x ).strftime('%Y-%m-%d %H:%M:%S'))

#create a copy
df_tags_copy = df_tags.copy()

In [20]:
#Aggregate the tags associated with each movieId
df_tags_new = df_tags.groupby('movieId')['tag'].agg(set).reset_index()

#Convert tags from a set to a string separated by spaces
df_tags_new['tag_new'] = df_tags_new['tag'].apply(lambda x:' '.join(x))

#Rearrange column
df_tags_new = df_tags_new.drop('tag',axis = 1)
df_tags_new.rename(columns={"tag_new":"Tag"},inplace = True)


df_tags_new

Unnamed: 0,movieId,Tag
0,1,pixar fun
1,2,fantasy game Robin Williams magic board game
2,3,moldy old
3,5,remake pregnancy
4,7,remake
...,...,...
1567,183611,funny Comedy Rachel McAdams
1568,184471,adventure video game adaptation Alicia Vikander
1569,187593,sarcasm Josh Brolin Ryan Reynolds
1570,187595,Emilia Clarke star wars


### Merge all dataframes into a single csv file

In [21]:
#Join all dataframes based on the movieid column
dfs = [df_movies, df_links, mean_df_ratings,df_tags_new]
dfs = [df.set_index('movieId') for df in dfs]


df_final = pd.concat(dfs,axis = 1)

#Replace all NaN with a space
df_final.fillna(' ',inplace = True)

#save as csv file
final_csv = "./df_final.csv"
df_final.to_csv(final_csv)
df_final.tail(10)

movieId,Title,Genres,Year_of_Release,imdbId,tmdbId,IMDB_link,TMDB_link,rating,Tag
193565,Gintama: The Movie,Action|Animation|Comedy|Sci-Fi,2010,1636780,71172,https://www.imdb.com/title/tt01636780,https://www.themoviedb.org/movie/71172,3.5,comedy gintama anime remaster
193567,anohana: The Flower We Saw That Day - The Movie,Animation|Drama,2013,2323836,255413,https://www.imdb.com/title/tt02323836,https://www.themoviedb.org/movie/255413,3.0,
193571,Silver Spoon,Comedy|Drama,2014,3110014,297825,https://www.imdb.com/title/tt03110014,https://www.themoviedb.org/movie/297825,4.0,
193573,Love Live! The School Idol Movie,Animation,2015,3837248,333623,https://www.imdb.com/title/tt03837248,https://www.themoviedb.org/movie/333623,4.0,
193579,Jon Stewart Has Left the Building,Documentary,2015,5342766,360617,https://www.imdb.com/title/tt05342766,https://www.themoviedb.org/movie/360617,3.5,
193581,Black Butler: Book of the Atlantic,Action|Animation|Comedy|Fantasy,2017,5476944,432131,https://www.imdb.com/title/tt05476944,https://www.themoviedb.org/movie/432131,4.0,
193583,No Game No Life: Zero,Animation|Comedy|Fantasy,2017,5914996,445030,https://www.imdb.com/title/tt05914996,https://www.themoviedb.org/movie/445030,3.5,
193585,Flint,Drama,2017,6397426,479308,https://www.imdb.com/title/tt06397426,https://www.themoviedb.org/movie/479308,3.5,
193587,Bungo Stray Dogs: Dead Apple,Action|Animation,2018,8391976,483455,https://www.imdb.com/title/tt08391976,https://www.themoviedb.org/movie/483455,3.5,
193609,Andrew Dice Clay: Dice Rules,Comedy,1991,101726,37891,https://www.imdb.com/title/tt0101726,https://www.themoviedb.org/movie/37891,4.0,
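
To illustrate how the index-aligned `pd.concat(..., axis = 1)` behaves, here is a small sketch on made-up frames. It also fills only the text column, on the assumption that keeping `rating` numeric is desirable; filling the whole frame with a string turns numeric columns containing NaN into object columns.

```python
import pandas as pd

# Toy frames keyed by movieId; the values are made up for illustration
movies = pd.DataFrame({"movieId": [1, 2, 3], "Title": ["A", "B", "C"]})
ratings = pd.DataFrame({"movieId": [1, 2], "rating": [4.0, 3.5]})
tags = pd.DataFrame({"movieId": [1], "Tag": ["fun"]})

frames = [df.set_index("movieId") for df in (movies, ratings, tags)]

# axis=1 aligns rows on the shared index and keeps every movieId (outer join)
merged = pd.concat(frames, axis=1)

# Fill only the text column so that rating stays numeric
merged = merged.fillna({"Tag": ""})
print(merged)
```

Movies without a rating keep NaN in `rating`, which a database load can map to NULL.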


In [None]:
# Drop ALL duplicate rows, if any (keep = False removes every copy of a duplicated row)
df_final.drop_duplicates( keep = False, inplace = True) 

### Loading transformed data into Postgres database

### Prerequisite
Create the database in Postgres by running the command: **CREATE DATABASE moviedb;**


Next:

* Connect to the database using the connection parameters (user, host, password and database name)
* Create the table
* Bulk-load the dataframe into moviedb

In [22]:
#Postgres Connection parameters
params = {
    "host"      : "127.0.0.1",
    "database"  : "moviedb",
    "user"      : "postgres",
    "password"  : "Enter your password "}

In [23]:
#Function to connect to database   
def connect(params):
    """ Connect to the PostgreSQL database server """
    conn = None
    try:
        # connect to the PostgreSQL server
        print('Connecting to the PostgreSQL database...')
        conn = psycopg2.connect(**params)
        # report success only if connect() did not raise
        print("Connection successful")
    except (Exception, psycopg2.DatabaseError) as error:
        print(error)

    # conn is None if the connection failed
    return conn

In [24]:
#Test connection
conn = connect(params)

Connecting to the PostgreSQL database...
Connection successful
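
The connection parameters above keep the password in the notebook. As a hedged alternative, the sketch below reads them from environment variables. The helper name `load_params` is hypothetical; `PGHOST`, `PGDATABASE`, `PGUSER` and `PGPASSWORD` are the variable names libpq itself recognizes, and the defaults mirror the values used in this notebook.

```python
import os

def load_params(env=os.environ):
    """Build psycopg2 connection parameters from environment variables."""
    return {
        "host": env.get("PGHOST", "127.0.0.1"),
        "database": env.get("PGDATABASE", "moviedb"),
        "user": env.get("PGUSER", "postgres"),
        # No default: raise KeyError early if the password is not set
        "password": env["PGPASSWORD"],
    }

# Example with an explicit mapping instead of the real environment
print(load_params({"PGPASSWORD": "example"})["host"])
```

The returned dictionary has the same keys as `params`, so it could be passed to `connect()` unchanged.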


In [104]:
#Function to Close connection

def close_connection(connection):
    # close the connection that was passed in, rather than the global conn
    if connection:
        connection.close()
    print('Connection closed')

In [None]:
close_connection(conn)

In [25]:
#Function to execute query

def execute_query(conn, query):
    """ Execute a single query """
    
    ret = 0 # Return value
    cursor = conn.cursor()
    try:
        cursor.execute(query)
        conn.commit()
    except (Exception, psycopg2.DatabaseError) as error:
        print("Error: %s" % error)
        conn.rollback()
        cursor.close()
        return 1

    # If the query produced rows (e.g. a SELECT), return the result;
    # cursor.description is None for statements that return no rows
    if cursor.description is not None:
        ret = cursor.fetchall()
    cursor.close()
    print ('Query executed')
    return ret

In [26]:
def create_tables():
    """ create tables in the PostgreSQL database"""
    # Column order matches df_final (movieId index first), because copy_from() loads by position
    table = (
        """
        CREATE TABLE Movie_data (
            movieId INTEGER PRIMARY KEY,
            Title VARCHAR,
            Genres VARCHAR,
            Year_of_Release VARCHAR,
            imdbId VARCHAR,
            tmdbId VARCHAR,
            IMDB_link VARCHAR,
            TMDB_link VARCHAR,
            rating VARCHAR,
            Tag VARCHAR
        )
        """)
    conn = None
    try:
        
        # connect to the PostgreSQL server
        conn = psycopg2.connect(**params)
        cur = conn.cursor()
        # create table 
        cur.execute(table)
        # close communication with the PostgreSQL database server
        cur.close()
        # commit the changes
        conn.commit()
        print("Table created successfully in PostgreSQL ")
    except (Exception, psycopg2.DatabaseError) as error:
        print(error)
    finally:
        if conn is not None:
            conn.close()

In [27]:

def copy_from_file(conn, df, table):
    """
    Save the dataframe to disk as a csv file, then read the
    file back and use copy_from() to copy it into the table.
    """
    # Save the dataframe to disk
    tmp_df = "./tmp_df.csv"
    df.to_csv(tmp_df)
    cursor = conn.cursor()
    with open(tmp_df, 'r',encoding='utf-8',errors='ignore') as f:
        try:
            next(f) # Skip the header row.
            cursor.copy_from(f, table, sep=",",null="")
            conn.commit()
        except (Exception, psycopg2.DatabaseError) as error:
            os.remove(tmp_df)
            print("Error: %s" % error)
            conn.rollback()
            cursor.close()
            return 1
        print("File copied into Database")
    
    cursor.close()
    os.remove(tmp_df)
    print("%s has been removed successfully" %tmp_df) 
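
The temporary file can be avoided by writing to an in-memory buffer. The sketch below encodes rows in PostgreSQL's default COPY text format (tab-separated, `\N` for NULL), which also matches the defaults of psycopg2's `copy_from()`. The helper names and the sample rows are assumptions for illustration.

```python
import io

def to_copy_field(value):
    """Encode one value for PostgreSQL's COPY text format."""
    if value is None:
        return r"\N"  # default NULL marker
    text = str(value)
    # Escape the backslash first, then the characters COPY treats specially
    return (text.replace("\\", "\\\\")
                .replace("\t", "\\t")
                .replace("\n", "\\n")
                .replace("\r", "\\r"))

def rows_to_copy_buffer(rows):
    """Return a StringIO of tab-separated lines, rewound to the start."""
    buffer = io.StringIO()
    for row in rows:
        buffer.write("\t".join(to_copy_field(v) for v in row) + "\n")
    buffer.seek(0)
    return buffer

# Made-up rows: commas need no special handling with a tab separator
buf = rows_to_copy_buffer([(1, "Toy Story", None), (2, "Heat, Part 2", "a\tb")])
print(buf.getvalue())
```

With an open connection, the buffer could then be loaded with `cursor.copy_from(buf, 'movie_data')` followed by `conn.commit()`. Missing values would need to be passed as `None` rather than NaN for the NULL marker to apply.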

In [28]:
create_tables()

relation "movie_data" already exists



In [489]:
#Bulk Load csv into the database

copy_from_file(conn, df_final,'movie_data')


File copied into Database
./tmp_df.csv has been removed successfully


### Run some queries on the database

In [495]:
##Select all the data in the table
execute_query(conn, 'SELECT * FROM movie_data;')

Query executed


[(1,
  'Toy Story ',
  'Adventure|Animation|Children|Comedy|Fantasy',
  '1995',
  '114709',
  '862',
  'https://www.imdb.com/title/tt0114709',
  'https://www.themoviedb.org/movie/862',
  '3.9209302325581397',
  'pixar fun'),
 (2,
  'Jumanji ',
  'Adventure|Children|Fantasy',
  '1995',
  '113497',
  '8844',
  'https://www.imdb.com/title/tt0113497',
  'https://www.themoviedb.org/movie/8844',
  '3.4318181818181817',
  'Robin Williams game magic board game fantasy'),
 (3,
  'Grumpier Old Men ',
  'Comedy|Romance',
  '1995',
  '113228',
  '15602',
  'https://www.imdb.com/title/tt0113228',
  'https://www.themoviedb.org/movie/15602',
  '3.2596153846153846',
  'moldy old'),
 (4,
  'Waiting to Exhale ',
  'Comedy|Drama|Romance',
  '1995',
  '114885',
  '31357',
  'https://www.imdb.com/title/tt0114885',
  'https://www.themoviedb.org/movie/31357',
  '2.357142857142857',
  ' '),
 (5,
  'Father of the Bride Part II ',
  'Comedy',
  '1995',
  '113041',
  '11862',
  'https://www.imdb.com/title/tt0113

In [544]:
##Count the movieIds, grouped by year of release
execute_query(conn,'SELECT Year_of_Release,COUNT(movieId) FROM movie_data GROUP BY Year_of_Release ORDER BY Year_of_Release;')

Query executed


[(' ', 12),
 ('1902', 1),
 ('1903', 1),
 ('1908', 1),
 ('1915', 1),
 ('1916', 4),
 ('1917', 1),
 ('1919', 1),
 ('1920', 2),
 ('1921', 1),
 ('1922', 1),
 ('1923', 4),
 ('1924', 5),
 ('1925', 4),
 ('1926', 5),
 ('1927', 7),
 ('1928', 4),
 ('1929', 4),
 ('1930', 5),
 ('1931', 14),
 ('1932', 9),
 ('1933', 12),
 ('1934', 11),
 ('1935', 13),
 ('1936', 18),
 ('1937', 16),
 ('1938', 15),
 ('1939', 23),
 ('1940', 25),
 ('1941', 18),
 ('1942', 23),
 ('1943', 10),
 ('1944', 16),
 ('1945', 17),
 ('1946', 23),
 ('1947', 20),
 ('1948', 20),
 ('1949', 25),
 ('1950', 21),
 ('1951', 22),
 ('1952', 16),
 ('1953', 30),
 ('1954', 23),
 ('1955', 36),
 ('1956', 30),
 ('1957', 33),
 ('1958', 31),
 ('1959', 37),
 ('1960', 37),
 ('1961', 34),
 ('1962', 40),
 ('1963', 39),
 ('1964', 43),
 ('1965', 47),
 ('1966', 42),
 ('1967', 42),
 ('1968', 42),
 ('1969', 35),
 ('1970', 33),
 ('1971', 47),
 ('1972', 39),
 ('1973', 58),
 ('1973 ', 1),
 ('1974', 45),
 ('1975', 42),
 ('1976', 44),
 ('1977', 63),
 ('1978', 59),
 (
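
The output above lists '1973' and '1973 ' as separate groups because of a stray trailing space in the stored value. One way to merge such groups at query time is to trim the column before grouping. In the sketch below, an in-memory SQLite database stands in for PostgreSQL purely so the example runs without a server, and the rows are made up; `TRIM` is available in both systems.

```python
import sqlite3

# In-memory SQLite stands in for PostgreSQL here, only to make the sketch runnable
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE movie_data (movieId INTEGER PRIMARY KEY, Year_of_Release VARCHAR)")
conn.executemany(
    "INSERT INTO movie_data VALUES (?, ?)",
    [(1, "1973"), (2, "1973 "), (3, "1995")],  # made-up rows
)

# Trim the year before grouping so '1973' and '1973 ' fall into one group
query = """
    SELECT TRIM(Year_of_Release) AS year, COUNT(movieId)
    FROM movie_data
    GROUP BY TRIM(Year_of_Release)
    ORDER BY year;
"""
rows = conn.execute(query).fetchall()
conn.close()
print(rows)
```

Against the real table, the same SQL string could be passed to `execute_query(conn, query)`.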