In [1]:
import pandas as pd
import numpy as np
import psycopg2
import psycopg2.extras as extras
import re

In [2]:
df_calendar = pd.read_csv('Contoso/Calendar.csv')
df_channel = pd.read_csv('Contoso/Channel.csv')
df_geo = pd.read_csv('Contoso/Geography.csv')
df_product = pd.read_csv('Contoso/Product.csv', engine='python') #to read strings with double quotes
df_product_cat = pd.read_csv('Contoso/ProductCategory.csv')
df_product_subcat = pd.read_csv('Contoso/ProductSubcategory.csv')
df_promotion = pd.read_csv('Contoso/Promotion.csv')
df_sales = pd.read_csv('Contoso/Sales.csv', sep='\t')
df_stores = pd.read_csv('Contoso/Stores.csv')

# DB tables creation and filling with info

### Connection Master user to query the DB

In [3]:
conn = psycopg2.connect(user = "u3972050qf4sr4",
                            password = "paeabf277d5b1409f382e5d572959b8593cf5547814ea3c693fedf93aff477fcb",
                            host = "ec2-18-203-128-102.eu-west-1.compute.amazonaws.com",
                            port = "5432",
                            database = "d1e1c0r564nkc9")

## Creation of tables

### DROP ALL tables in database (need to run query all tables first). BE CAREFUL

In [325]:
# table_schema: This stores whether the table is designated as public or not.
# table_name: The name of the table being referenced.
s = """SELECT table_schema, table_name 
FROM information_schema.tables 
WHERE ( table_schema = 'public' ) 
ORDER BY table_schema, table_name;"""
df_aux = pd.read_sql(s, conn)

tables_to_drop = df_aux[df_aux['table_name']!= 'pg_stat_statements'].table_name
#cursor = conn.cursor()
#cursor.execute(f"DROP TABLE {','.join(tables_to_drop)};")
#conn.commit()
print("Tables deleted successfully in PostgreSQL ")

Tables deleted successfully in PostgreSQL 


### Generate query to create CALENDAR dimension table in star schema

In [326]:
# delete parts of the date that give problems
df_calendar['DateKey'] = df_calendar['DateKey'].map(lambda x: x[:10])

In [327]:
# customer table
query_str = """DROP TABLE IF EXISTS calendar;
CREATE TABLE calendar
(DateKey DATE PRIMARY KEY NOT NULL,
DateInt INT,
MonthName TEXT,
DayOfWeekName TEXT,
Year INT,
QuarterOfYear INT,
MonthOfYear INT,
DayOfMonth INT
)"""
cursor = conn.cursor()
cursor.execute(query_str)
conn.commit()

### Generate query to create CHANNEL dimension table in star schema

In [328]:
df_channel.dropna(inplace=True)
df_channel.rename(columns={'Channel':'ChannelKey'}, inplace=True)

In [329]:
# channel table
query_str = """DROP TABLE IF EXISTS channel;
CREATE TABLE channel
(ChannelKey INT PRIMARY KEY NOT NULL,
ChannelName TEXT
)"""
cursor = conn.cursor()
cursor.execute(query_str)
conn.commit()

### Generate query to create GEOGRAPHY dimension table in star schema

In [330]:
df_geo.dropna(inplace=True)

In [331]:
df_geo.head(1)

Unnamed: 0,GeographyKey,GeographyType,ContinentName,RegionCountryName
0,424.0,City,North America,United States


In [332]:
# geography table
query_str = """DROP TABLE IF EXISTS geography;
CREATE TABLE geography
(GeographyKey INT PRIMARY KEY NOT NULL,
GeographyType TEXT,
ContinentName TEXT,
RegionCountryName TEXT
)"""
cursor = conn.cursor()
cursor.execute(query_str)
conn.commit()

### Generate query to create PRODUCT dimension table in star schema

In [404]:
df_product.dropna(inplace=True)

In [405]:
df_product.head(1)

Unnamed: 0,ProductName,ProductDescription,Manufacturer,BrandName,ClassName,UnitCost,UnitPrice,ProductKey,ProductSubcategoryKey
0,Contoso Wireless Laser Mouse E50 Grey,Advanced 2.4 GHz cordless technology makes fre...,"Contoso, Ltd",Contoso,Economy,$10.69,$20.96,873.0,22.0


In [406]:
# product table
query_str = """DROP TABLE IF EXISTS product;
CREATE TABLE product
(ProductKey INT PRIMARY KEY NOT NULL,
ProductSubcategoryKey INT,
ProductName TEXT,
ProductDescription TEXT,
Manufacturer TEXT,
BrandName TEXT,
ClassName TEXT,
UnitCost MONEY,
UnitPrice MONEY
)"""
cursor = conn.cursor()
cursor.execute(query_str)
conn.commit()

### Generate query to create PRODUCT CATEGORY dimension table in star schema

In [336]:
df_product_cat.dropna(inplace=True)

In [337]:
df_product_cat.head(1)

Unnamed: 0,ProductCategoryKey,ProductCategory
0,1.0,Audio


In [338]:
# product category table
query_str = """DROP TABLE IF EXISTS product_category;
CREATE TABLE product_category
(ProductCategoryKey INT PRIMARY KEY NOT NULL,
ProductCategory TEXT
)"""
cursor = conn.cursor()
cursor.execute(query_str)
conn.commit()

### Generate query to create PRODUCT SUBCATEGORY dimension table in star schema

In [339]:
df_product_subcat.dropna(inplace=True)

In [340]:
df_product_subcat.head(1)

Unnamed: 0,ProductSubcategoryKey,ProductSubcategory,ProductCategoryKey
0,42.0,Refrigerators,8.0


In [341]:
# product subcategory table
query_str = """DROP TABLE IF EXISTS product_subcategory;
CREATE TABLE product_subcategory
(ProductSubcategoryKey INT PRIMARY KEY NOT NULL,
ProductCategoryKey INT,
ProductSubcategory TEXT
)"""
cursor = conn.cursor()
cursor.execute(query_str)
conn.commit()

### Generate query to create PROMOTION dimension table in star schema

In [342]:
df_promotion.dropna(inplace=True)
# delete parts of the date that give problems
df_promotion['StartDate'] = df_promotion['StartDate'].map(lambda x: x[:10])
df_promotion['EndDate'] = df_promotion['EndDate'].map(lambda x: x[:10])

In [343]:
df_promotion.head(1)

Unnamed: 0,PromotionKey,PromotionLabel,PromotionName,DiscountPercent,StartDate,EndDate
0,1.0,1.0,No Discount,0.0,01/01/2007,31/12/2014


In [344]:
# promotionms table
query_str = """DROP TABLE IF EXISTS promotion;
CREATE TABLE promotion
(PromotionKey INT PRIMARY KEY NOT NULL,
PromotionLabel INT,
PromotionName TEXT,
DiscountPercent REAL,
StartDate DATE,
EndDate DATE
)"""
cursor = conn.cursor()
cursor.execute(query_str)
conn.commit()

### Generate query to create SALES fact table in star schema

In [345]:
df_sales.dropna(inplace=True)
df_sales.drop(columns=['UnitCost', 'UnitPrice', 'ReturnAmount', 'DiscountAmount', 'TotalCost'], inplace=True)
# delete parts of the date that give problems
df_sales['DateKey'] = df_sales['DateKey'].map(lambda x: x[:10])
df_sales['SalesAmount'] = df_sales['SalesAmount'].map(lambda x: x.replace('.', ''))
df_sales.rename(columns={'channelKey':'ChannelKey'}, inplace=True)
df_sales.drop('SalesAmount', axis=1, inplace=True)

In [346]:
# sales table
query_str = """DROP TABLE IF EXISTS sales;
CREATE TABLE sales
(SalesKey INT PRIMARY KEY NOT NULL,
DateKey DATE,
ChannelKey INT,
StoreKey INT,
ProductKey INT,
PromotionKey INT,
SalesQuantity INT,
ReturnQuantity INT,
DiscountQuantity INT
)"""
cursor = conn.cursor()
cursor.execute(query_str)
conn.commit()

### Generate query to create Stores dimension table in star schema

In [395]:
df_stores.fillna('Open', inplace=True)

In [396]:
df_stores.head(1)

Unnamed: 0,StoreKey,GeographyKey,StoreType,StoreName,Status,CloseReason,EmployeeCount,SellingAreaSize
0,1,693,Store,Contoso Seattle No.1 Store,On,Open,17,462


In [397]:
# stores table
query_str = """DROP TABLE IF EXISTS stores;
CREATE TABLE stores
(StoreKey INT PRIMARY KEY NOT NULL,
GeographyKey INT,
StoreType TEXT,
StoreName TEXT,
Status TEXT,
CloseReason TEXT,
EmployeeCount INT,
SellingAreaSize INT
)"""
cursor = conn.cursor()
cursor.execute(query_str)
conn.commit()

### Generate query to create REVIEWS from Yelp dimension table in star schema

This part uses the file `yelp_reviews_contoso_stores_with_bussiness_id.csv` which is generated by the notebook `process_yelp_business.ipynb` which assigns the id from the stores to the reviews from Yelp

In [5]:
df_reviews = pd.read_csv('yelp_reviews_contoso_stores_with_bussiness_id.csv')

In [8]:
df_reviews.head(2)

Unnamed: 0,review_id,user_id,business_id,stars,useful,funny,cool,text,date,StoreKey
0,8PuXeGEK1y_y0dry7pt6XQ,9a60KWJRTxHFNRqt4qb7Xw,6QJAT0N9X2dRqAtQyTw7Ag,5,0,0,0,Artie with the business team is awesome. Very ...,2016-07-25 04:50:21,172
1,XxQ-Md8h9ti_xJ1i7oFCMg,msbNF-OYhHH6FJ_oQy1-cA,6QJAT0N9X2dRqAtQyTw7Ag,4,1,0,0,Customer service is awesome! Secret is: Make...,2011-07-28 18:52:43,172


In [21]:
# stores table
query_str = """DROP TABLE IF EXISTS reviews_yelp;
CREATE TABLE reviews_yelp
(review_id TEXT PRIMARY KEY NOT NULL,
user_id TEXT,
business_id TEXT,
stars INT,
useful INT,
funny INT,
cool INT,
text TEXT,
date DATE,
StoreKey INT
)"""
cursor = conn.cursor()
cursor.execute(query_str)
conn.commit()

## Insert of information

In [11]:
def execute_values(conn, df, table, date="default"):
    """
    Using psycopg2.extras.execute_values() to insert the dataframe
    """
    # Create a list of tupples from the dataframe values
    tuples = [tuple(x) for x in df.to_numpy()]
    # Comma-separated dataframe columns
    cols = ','.join(list(df.columns))
    if date != "default":
        # SQL quert to execute
        query  = "set DateStyle='ISO, DMY'; INSERT INTO %s(%s) VALUES %%s" % (table, cols)
    else:
        query = "INSERT INTO %s(%s) VALUES %%s" % (table, cols)
    cursor = conn.cursor()
    try:
        extras.execute_values(cursor, query, tuples)
        conn.commit()
    except (Exception, psycopg2.DatabaseError) as error:
        print("Error: %s" % error)
        conn.rollback()
        cursor.close()
        return 1
    print(f"execute_values table {table} done")

In [351]:
# insert customer data
execute_values(conn, df_calendar, 'calendar')
execute_values(conn, df_channel, 'channel')
execute_values(conn, df_geo, 'geography')
execute_values(conn, df_product, 'product')
execute_values(conn, df_product_cat, 'product_category')
execute_values(conn, df_product_subcat, 'product_subcategory')
execute_values(conn, df_promotion, 'promotion')
#execute_values(conn, df_sales, 'sales')
execute_values(conn, df_stores, 'stores')

execute_values table calendar done
execute_values table channel done
execute_values table geography done
execute_values table product done
execute_values table product_category done
execute_values table product_subcategory done
execute_values table promotion done


In [22]:
execute_values(conn, df_reviews, 'reviews_yelp')

execute_values table reviews_yelp done
