# Project 3, Part 1: Create Tables from Movie Lens Small Data Set 

University of California Berkeley 

Master of Information and Data Science (MIDS) program

w205 - Fundamentals of Data Engineering

Section 009 

In [2]:
# import statements

import csv

import math
import numpy as np
import pandas as pd

import psycopg2

In [3]:
# Starter Code 
# function to run a select query and return rows in a pandas dataframe
# pandas puts all numeric values from postgres to float
# if it will fit in an integer, change it to integer
#

def my_select_query_pandas(query, rollback_before_flag, rollback_after_flag):
    "function to run a select query and return rows in a pandas dataframe"
    
    if rollback_before_flag:
        connection.rollback()
    
    df = pd.read_sql_query(query, connection)
    
    if rollback_after_flag:
        connection.rollback()
    
    # fix the float columns that really should be integers
    
    for column in df:
    
        if df[column].dtype == "float64":

            fraction_flag = False

            for value in df[column].values:
                
                if not np.isnan(value):
                    if value - math.floor(value) != 0:
                        fraction_flag = True

            if not fraction_flag:
                df[column] = df[column].astype('Int64')
    
    return(df)

In [4]:
connection = psycopg2.connect(
    user = "postgres",
    password = "ucb",
    host = "postgres",
    port = "5432",
    database = "postgres"
)
cursor = connection.cursor()

In [5]:
# starter code to read a csv file 

def my_read_csv_file(file_name, limit):
    "read the csv file and print only the first limit rows"
    
    csv_file = open(file_name, "r")
    
    csv_data = csv.reader(csv_file)
    
    i = 0
    
    for row in csv_data:
        i += 1
        if i <= limit:
            print(row)
            
    print("\nPrinted ", min(limit, i), "lines of ", i, "total lines.")

# drop tables if exist 

In [6]:
# drop the movies table if it exits 
# for clean up if needed
# dropping movies table

connection.rollback()

query = """

drop table if exists movies

"""

cursor.execute(query)

connection.commit()

In [8]:
# drop the ratings table if it exits 
# for clean up if needed 
# dropping ratings table

connection.rollback()

query = """

drop table if exists ratings

"""

cursor.execute(query)

connection.commit()

In [9]:
# drop the movies table if it exits 
# for clean up if needed
# dropping optional genres table

connection.rollback()

query = """

drop table if exists genre

"""

cursor.execute(query)

connection.commit()

In [10]:
# drop the movies table if it exits 
# for clean up if needed
# dropping average_ratings

connection.rollback()

query = """

drop table if exists average_ratings

"""

cursor.execute(query)

connection.commit()

# Create Tables for Movies and Load Data 


In [11]:
# create a table for movies 
# set primary key as movieID
# note that genres is a pipe separated list of genres 
# include movieid, title, and generes(pipe separated)

connection.rollback()

query = """

create table movies (
  movieid numeric,
  title varchar,
  genres varchar,
  primary key (movieid)
);

"""

cursor.execute(query)

connection.commit()

In [12]:
# display the first 10 rows of the movies.csv file to check it is as expected 

my_read_csv_file('MovieLens_small/movies.csv', limit=10)

['movieId', 'title', 'genres']
['1', 'Toy Story (1995)', 'Adventure|Animation|Children|Comedy|Fantasy']
['2', 'Jumanji (1995)', 'Adventure|Children|Fantasy']
['3', 'Grumpier Old Men (1995)', 'Comedy|Romance']
['4', 'Waiting to Exhale (1995)', 'Comedy|Drama|Romance']
['5', 'Father of the Bride Part II (1995)', 'Comedy']
['6', 'Heat (1995)', 'Action|Crime|Thriller']
['7', 'Sabrina (1995)', 'Comedy|Romance']
['8', 'Tom and Huck (1995)', 'Adventure|Children']
['9', 'Sudden Death (1995)', 'Action']

Printed  10 lines of  9743 total lines.


In [13]:
# load the csv file movies.csv into the movies database table

connection.rollback()

query = """

copy movies
from '/user/projects/project-3-RebeccaBaugh/code/MovieLens_small/movies.csv' delimiter ',' NULL '' csv header;

"""

cursor.execute(query)

connection.commit()

In [16]:
# verify movies loaded correctly 

rollback_before_flag = True
rollback_after_flag = True

query = """

select *
from movies
order by movieid
limit 40
"""

my_select_query_pandas(query, rollback_before_flag, rollback_after_flag)

Unnamed: 0,movieid,title,genres
0,1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy
1,2,Jumanji (1995),Adventure|Children|Fantasy
2,3,Grumpier Old Men (1995),Comedy|Romance
3,4,Waiting to Exhale (1995),Comedy|Drama|Romance
4,5,Father of the Bride Part II (1995),Comedy
5,6,Heat (1995),Action|Crime|Thriller
6,7,Sabrina (1995),Comedy|Romance
7,8,Tom and Huck (1995),Adventure|Children
8,9,Sudden Death (1995),Action
9,10,GoldenEye (1995),Action|Adventure|Thriller


### number of movies watched in common 

In [None]:
connection.rollback()
query = """

drop table if exists incommon

"""
cursor.execute(query)
connection.commit()

In [None]:
connection.rollback()
query = """

create table incommon (
  movieid numeric,
  primary key (movieid)
);

"""
cursor.execute(query)
connection.commit()

In [41]:
rollback_before_flag = True
rollback_after_flag = True

query = """

with sample as (
select userid
from ratings
where movieid in (1)
order by random()
limit 10
),
r as (
select *
from ratings
where userid in (select userid from sample)
),
pairspermovie as (
select ra.userid as uid1,
        rb.userid as uid2,
        ra.movieid as mid
from r as ra
join r as rb on ra.movieid = rb.movieid
where ra.userid != rb.userid
)
select uid1,
        uid2,
        count(*)
from pairspermovie
group by 1,2

"""

my_select_query_pandas(query, rollback_before_flag, rollback_after_flag)

Unnamed: 0,uid1,uid2,count
0,232,282,124
1,334,134,9
2,608,282,148
3,608,334,83
4,573,216,21
...,...,...,...
85,334,608,83
86,216,570,25
87,608,573,171
88,232,573,188


In [46]:
## validate above query. those two users watched that many movies in common

rollback_before_flag = True
rollback_after_flag = True

query = """

select  movieid,
        count(distinct userid)
from ratings
where userid in (334,134)
group by 1 
order by 2 desc
limit 40
"""

my_select_query_pandas(query, rollback_before_flag, rollback_after_flag)

Unnamed: 0,movieid,count
0,231,2
1,316,2
2,47,2
3,364,2
4,553,2
5,1,2
6,595,2
7,344,2
8,296,2
9,153,1


# Create Tables for Ratings and Load Data 

In [13]:
# create a table for ratings 
# set primary key as movieid and userid composite key 

connection.rollback()

query = """

create table ratings (
  userid numeric,
  movieid numeric,
  rating float,
  timestamp varchar,
  primary key (userid, movieid)
);

"""

cursor.execute(query)

connection.commit()

In [14]:
# display the first 10 rows of the ratings.csv file to check it is as expected 

my_read_csv_file('MovieLens_small/ratings.csv', limit=10)

['userId', 'movieId', 'rating', 'timestamp']
['1', '1', '4.0', '964982703']
['1', '3', '4.0', '964981247']
['1', '6', '4.0', '964982224']
['1', '47', '5.0', '964983815']
['1', '50', '5.0', '964982931']
['1', '70', '3.0', '964982400']
['1', '101', '5.0', '964980868']
['1', '110', '4.0', '964982176']
['1', '151', '5.0', '964984041']

Printed  10 lines of  100837 total lines.


In [15]:
# load the csv file rating.csv into the ratings database table

connection.rollback()

query = """

copy ratings
from '/user/projects/project-3-RebeccaBaugh/code/MovieLens_small/ratings.csv' delimiter ',' NULL '' csv header;

"""

cursor.execute(query)

connection.commit()

In [16]:
# verify ratings loaded correctly 

rollback_before_flag = True
rollback_after_flag = True

query = """

select *
from ratings
order by userid, movieid

"""

ratings_df = my_select_query_pandas(query, rollback_before_flag, rollback_after_flag)
ratings_df

Unnamed: 0,userid,movieid,rating,timestamp
0,1,1,4.0,964982703
1,1,3,4.0,964981247
2,1,6,4.0,964982224
3,1,47,5.0,964983815
4,1,50,5.0,964982931
...,...,...,...,...
100831,610,166534,4.0,1493848402
100832,610,168248,5.0,1493850091
100833,610,168250,5.0,1494273047
100834,610,168252,5.0,1493846352


# Create Average_Rating user to user table relationship table

In [17]:
# this query creates has a limit on it for tesitng purposes "where movieid in (1)"
rollback_before_flag = True
rollback_after_flag = True

query = """

with t as (select userid::int as userid,
        round((avg(rating)::int), 0) as average_rating
    from ratings where movieid in (5)
    group by userid
    )
select ta.userid as userid1,
    tb.userid as userid2,
    ta.average_rating
    from t as ta
        join t as tb
            on ta.average_rating = tb.average_rating
    where ta.userid != tb.userid
    order by ta.userid

"""

average_ratings = my_select_query_pandas(query, rollback_before_flag, rollback_after_flag)
average_ratings


Unnamed: 0,userid1,userid2,average_rating
0,6,169,5
1,6,43,5
2,31,604,3
3,31,561,3
4,31,521,3
...,...,...,...
781,604,121,3
782,604,117,3
783,604,84,3
784,604,45,3


In [18]:
#save to csv 
average_ratings.to_csv('average_ratings.csv', index = False)

In [19]:
#creating the table for average_ratings
connection.rollback()

query = """

create table average_ratings (
  userid1 numeric,
  userid2 numeric,
  average_rating float,
  primary key (userid1, average_rating, userid2)
);

"""

cursor.execute(query)

connection.commit()

In [20]:
# copy the csv data into the average_ratings table
connection.rollback()

query = """

copy average_ratings
from '/user/projects/project-3-RebeccaBaugh/code/average_ratings.csv' delimiter ',' NULL '' csv header;

"""

cursor.execute(query)

connection.commit()

In [21]:
# verify average_ratings loaded correctly 

rollback_before_flag = True
rollback_after_flag = True

query = """

select *
from average_ratings
order by userid1

"""

my_select_query_pandas(query, rollback_before_flag, rollback_after_flag)

Unnamed: 0,userid1,userid2,average_rating
0,6,169,5
1,6,43,5
2,31,604,3
3,31,561,3
4,31,521,3
...,...,...,...
781,604,121,3
782,604,117,3
783,604,84,3
784,604,45,3


# this is optional if we use genre 
to create a genres table which has the list of genres not the pipe separated values 
we are not using this currently 

In [22]:
# create a table for genre

connection.rollback()

query = """

create table genre (
  id int,
  genre varchar
);

"""

cursor.execute(query)

connection.commit()

In [23]:
# load the csv file genre.csv into the ratings database table

connection.rollback()

query = """

copy genre
from '/user/projects/project-3-RebeccaBaugh/code/MovieLens_small/genre.csv' delimiter ',' NULL '' csv header;

"""

cursor.execute(query)

connection.commit()

In [24]:
# verify genre loaded correctly 
# this should be a unique list of single genres not pipe separated

rollback_before_flag = True
rollback_after_flag = True

query = """

select genre
from genre
order by genre

"""

my_select_query_pandas(query, rollback_before_flag, rollback_after_flag)

Unnamed: 0,genre
0,Action
1,Adventure
2,Animation
3,Children's
4,Comedy
5,Crime
6,Documentary
7,Drama
8,Fantasy
9,Film-Noir
