In [1]:
# ETL project Bootcamp
# Estefanía González
# Paolo Vega
# 20-may-2020 Version 1.0.1

In [4]:
# Modules needed
import pandas as pd
import json
from pandas.io.json import json_normalize
import sqlalchemy
from sqlalchemy.ext.automap import automap_base
from sqlalchemy.orm import Session
from sqlalchemy import create_engine, func, inspect
import os
import glob
from datetime import datetime

In [5]:
# Read JSON file
file = 'Resources/DataSources/json/CA_category_id.json'
# Opemn JSON File
data = json.load(open(file))
# Normalize 'items' element inside json tree (where our information is)
items_df = json_normalize(data['items'])
# Remove unwanted columns
items_df = items_df[["id","snippet.title"]].copy()
items_df.head()

Unnamed: 0,id,snippet.title
0,1,Film & Animation
1,2,Autos & Vehicles
2,10,Music
3,15,Pets & Animals
4,17,Sports


In [6]:
# Rename columns
items_df = items_df.rename(columns={"id":"CategoryID","snippet.title":"CategoryName"})
# Set index to ID column
#items_df = items_df.set_index("CategoryID")
items_df

Unnamed: 0,CategoryID,CategoryName
0,1,Film & Animation
1,2,Autos & Vehicles
2,10,Music
3,15,Pets & Animals
4,17,Sports
5,18,Short Movies
6,19,Travel & Events
7,20,Gaming
8,21,Videoblogging
9,22,People & Blogs


In [7]:
# import variables from config file
from credentials import host
from credentials import pwd
from credentials import usr
from credentials import dialect
from credentials import port
from credentials import db

engine = create_engine(f'{dialect}://{usr}:{pwd}@{host}:{port}/{db}')

# reflect an existing database into a new model
Base = automap_base()
# reflect the tables
Base.prepare(engine, reflect=True)
# display tables/classes
display(Base.classes.keys())

# Save references to each table
Category = Base.classes.Category
Country = Base.classes.Country
Video = Base.classes.Video

# Start session
session = Session(engine)

['Category', 'Country', 'Video']

In [8]:
# Read Tables to determine what task to perform in each
category_data = pd.read_sql_table("Category",engine)
country_data = pd.read_sql_table("Country",engine)
video_data = pd.read_sql_table("Video",engine)
engine.dispose()

In [23]:
# Cleaunup rules for dataframe columns (improve performance)
# remove leading spaces and capitalize the string
items_df.applymap(lambda x: x.strip().capitalize() if isinstance(x, str) else x)
try:
    items_df.to_sql('Category',con=engine,index=False,if_exists='append',chunksize=len(items_df))
    print("Success!")
except:
    print("Error in the DB")

Error in the DB


In [12]:
# Validate data into DB
category_data = pd.read_sql_table("Category",engine)
category_data.head()

Unnamed: 0,CategoryID,CategoryName
0,1,Film & Animation
1,2,Autos & Vehicles
2,10,Music
3,15,Pets & Animals
4,17,Sports


In [13]:
#----- Columns to use in the csv file

columns = [
    'video_id',
    'trending_date',
    'title',
    'channel_title',
    'category_id',
    'views',
    'likes',
    'dislikes',
    'comment_count'
]
#----- All the countries that are used
countries = ['CA',
            'DE',
            'FR',
            'GB',
            'IN',
            'JP',
            'KR',
            'MX',
            'RU',
            'US'
]

In [15]:
#------ Readind all the csv files in the folder
#------ Changing the encoding to latin1 in order to read the special characters
dataframes = []

for country in countries:
    df = [pd.read_csv(f'./Resources/DataSources/csv/{country}videos.csv',encoding='latin1',index_col=None, header=0, 
                     usecols=columns)]
    dataframes.append(df)

In [16]:
#------ Datarframes of all countries availables 

ca = dataframes[0][0]
de = dataframes[1][0]
fr = dataframes[2][0]
gb = dataframes[3][0]
in_ = dataframes[4][0]
jp = dataframes[5][0]
kr = dataframes[6][0]
mx = dataframes[7][0]
ru = dataframes[8][0]
us = dataframes[9][0]

#----- Adding the CountryID to each DF
ca['CountryID'] = 1
de['CountryID'] = 2
fr['CountryID'] = 3
gb['CountryID'] = 4
in_['CountryID'] = 5
jp['CountryID'] = 6
kr['CountryID'] = 7
mx['CountryID'] = 8
ru['CountryID'] = 9
us['CountryID'] = 10

#----- to read all the special character correctly


In [17]:
#----- Creating the DF with all the countries

Video_df = ca.append([de,fr,gb,in_,jp,kr,mx,ru,us])




In [18]:
#------ Renaming the columns to export the data into the created Database

Video_df.rename(columns={
    'video_id' :'VideoID', 
    'trending_date':'TrendingDate',
    'title':'Title', 
    'channel_title':'Channel', 
    'category_id':'CategoryID',
    'views':'Views', 
    'likes':'Likes', 
    'dislikes' :'Dislikes', 
    'comment_count':'Comments'
}, inplace = True)

Video_df['Title'] = Video_df['Title'].str.encode('latin1').str.decode('utf8')
Video_df['Channel'] = Video_df['Channel'].str.encode('latin1').str.decode('utf8')

In [14]:
Video_df.head()

Unnamed: 0,VideoID,TrendingDate,Title,Channel,CategoryID,Views,Likes,Dislikes,Comments,CountryID
0,n1WpP7iowLc,17.14.11,Eminem - Walk On Water (Audio) ft. Beyoncé,EminemVEVO,10,17158579,787425,43420,125882,1
1,0dBIkQ4Mz1M,17.14.11,PLUSH - Bad Unboxing Fan Mail,iDubbbzTV,23,1014651,127794,1688,13030,1
2,5qpjK5DgCt4,17.14.11,"Racist Superman | Rudy Mancuso, King Bach & Le...",Rudy Mancuso,23,3191434,146035,5339,8181,1
3,d380meD0W0M,17.14.11,I Dare You: GOING BALD!?,nigahiga,24,2095828,132239,1989,17518,1
4,2Vv-BfVoq4g,17.14.11,Ed Sheeran - Perfect (Official Music Video),Ed Sheeran,10,33523622,1634130,21082,85067,1


In [19]:
#------- Creating the table with the countriesID

index=list(range(1,len(countries)+1))
Country_df = pd.DataFrame({'CountryID':index,'CountryName':countries})


In [24]:
# insert Countrty (empty table)
Country_df.applymap(lambda x: x.strip().capitalize() if isinstance(x, str) else x)
try:
    Country_df.to_sql('Country', con=engine, index=False, if_exists='append',chunksize=len(Country_df))
    print("Success!")
except:
    print("Error inserting into the DB")

Error in the DB


In [25]:
#-----  Removing leading spaces and capitalize the string
Video_df["VideoID"] = Video_df["VideoID"].str.strip().str.capitalize()
Video_df['TrendingDate']= Video_df['TrendingDate'].str.replace('.','-')
Video_df['TrendingDate']=pd.to_datetime(Video_df['TrendingDate'], format ="%y-%d-%m")
Video_df["Title"] = Video_df["Title"].str.strip().str.capitalize()
Video_df["Channel"]=Video_df["Channel"].str.strip().str.capitalize()
Video_df["CategoryName"]=Video_df["CategoryName"].str.strip().str.capitalize()
idx = 0
new_col = list(range(1,len(Video_df["VideoID"])+1))  # can be a list, a Series, an array or a scalar   
Video_df.insert(loc=idx, column='VideoSRID', value=new_col)

In [26]:
#---- Extracting the CategoryID from the DF Video_df, in order to see if all categories are inclueded

category_data2=pd.DataFrame(data=Video_df['CategoryID'].drop_duplicates())
category_data2.reset_index(inplace = True)
category_data2.drop(columns=['index'],inplace = True)

In [27]:
#----- Merging with the previous categories that are loaded in the database

categorias_df= pd.merge(category_data2,category_data, on='CategoryID', how='left')


In [28]:
#----- Selecting the ones that are not included in the database
new_cat= categorias_df[categorias_df['CategoryName'].isnull()]

In [29]:
#------ Adding the New category to format purposes
new_cat.fillna("New Category", inplace = True)

In [22]:
new_cat

Unnamed: 0,CategoryID,CategoryName
10,29,New Category


In [30]:
#Adding the new Categories into the table Category
for index, row in new_cat.iterrows():
        # create a new object (category)
        newCategory = Category(CategoryID = row["CategoryID"], CategoryName = row["CategoryName"])
        # insert into DB
        session.add(newCategory)
        print("Done!")
# Commit changes
try:
    session.commit()
    print("Commit!")
except:
    session.rollback()
    print("Rollback :(")

Done!
Commit!


In [33]:
#------ Adding the Video_df to the table Video in sets of 10000 in order to load fastest
try:
    Video_df.to_sql('Video',con=engine,index=False,if_exists='append',chunksize=10000)
    print("Success!")
except:
    print("Error inserting into the DB")

Success!
