In [None]:
import boto3
import configparser
import matplotlib.pyplot as plt
import pandas as pd
from time import time
import psycopg2

In [None]:
config = configparser.ConfigParser()
config.read_file(open('dwh.cfg'))

KEY=config.get('AWS','key')
SECRET=config.get('AWS','secret')
REGION=config.get('AWS','REGION')

DWH_DB=config.get("DWH","DWH_DB")
DWH_DB_USER=config.get("DWH","DWH_DB_USER")
DWH_DB_PASSWORD=config.get("DWH","DWH_DB_PASSWORD")
DWH_PORT=config.get("DWH","DWH_PORT")

DWH_ROLE_ARN=config.get("DWH","DWH_ROLE_ARN")
DWH_ENDPOINT=config.get("DWH","DWH_ENDPOINT")

In [None]:
conn = psycopg2.connect(
    host=DWH_ENDPOINT,
    dbname=DWH_DB,
    user=DWH_DB_USER,
    password=DWH_DB_PASSWORD,
    port=DWH_PORT
)
print("Connection successful!")

cur = conn.cursor()

def execute_query(query, params=None):
    try:
        if params:
            cur.execute(query, params)
        else:
            cur.execute(query)
        conn.commit()
        print("Query executed successfully")
        return cur.fetchall()
    except Exception as e:
        print(f"An error occurred: {e}")
        conn.rollback()
        return None

In [None]:
create_nodist_tables = """

create schema if not exists nodist;
set search_path to nodist;



drop table if exists part cascade;
drop table if exists supplier;
drop table if exists supplier;
drop table if exists customer;
drop table if exists dwdate;
drop table if exists lineorder;

CREATE TABLE part 
(
  p_partkey     integer     not null,
  p_name        varchar(22) not null,
  p_mfgr        varchar(6)  not null,
  p_category    varchar(7)  not null,
  p_brand1      varchar(9)  not null,
  p_color       varchar(11) not null,
  p_type        varchar(25) not null,
  p_size        integer     not null,
  p_container   varchar(10) not null
);

CREATE TABLE supplier 
(   
    s_suppkey   integer     not null,
    s_name      varchar(25) not null,
    s_address   varchar(25) not null,
    s_city      varchar(10) not null,
    s_nation    varchar(15) not null,
    s_region    varchar(12) not null,
    s_phone     varchar(15) not null
);

CREATE TABLE customer 
(
    c_custkey     integer     not null,
    c_name        varchar(25) not null,
    c_address     varchar(25) not null,
    c_city        varchar(10) not null,
    c_nation      varchar(15) not null,
    c_region      varchar(12) not null,
    c_phone       varchar(15) not null,
    c_mktsegment  varchar(10) not null
);

CREATE TABLE dwdate 
(
  d_datekey            integer     not null,
  d_date               varchar(19) not null,
  d_dayofweek          varchar(10) not null,
  d_month              varchar(10) not null,
  d_year               integer     not null,
  d_yearmonthnum       integer     not null,
  d_yearmonth          varchar(8)  not null,
  d_daynuminweek       integer     not null,
  d_daynuminmonth      integer     not null,
  d_daynuminyear       integer     not null,
  d_monthnuminyear     integer     not null,
  d_weeknuminyear      integer     not null,
  d_sellingseason      varchar(13) not null,
  d_lastdayinweekfl    varchar(1)  not null,
  d_lastdayinmonthfl   varchar(1)  not null,
  d_holidayfl          varchar(1)  not null,
  d_weekdayfl          varchar(1)  not null
);
CREATE TABLE lineorder 
(
  lo_orderkey          integer     not null,
  lo_linenumber        integer     not null,
  lo_custkey           integer     not null,
  lo_partkey           integer     not null,
  lo_suppkey           integer     not null,
  lo_orderdate         integer     not null,
  lo_orderpriority     varchar(15) not null,
  lo_shippriority      varchar(1)  not null,
  lo_quantity          integer     not null,
  lo_extendedprice     integer     not null,
  lo_ordertotalprice   integer     not null,
  lo_discount          integer     not null,
  lo_revenue           integer     not null,
  lo_supplycost        integer     not null,
  lo_tax               integer     not null,
  lo_commitdate        integer     not null,
  lo_shipmode          varchar(10) not null
    
);

"""

result = execute_query(create_nodist_tables)
print(result)


In [None]:
create_dist_tables = """

create schema if not exists dist;
set search_path to dist;

drop table if exists part cascade;
drop table if exists supplier;
drop table if exists supplier;
drop table if exists customer;
drop table if exists dwdate;
drop table if exists lineorder;

CREATE TABLE part (
  p_partkey     	integer     	not null sortkey distkey,
  p_name        	varchar(22) 	not null,
  p_mfgr        	varchar(6)    not null,
  p_category    	varchar(7)    not null,
  p_brand1      	varchar(9)    not null,
  p_color       	varchar(11) 	not null,
  p_type        	varchar(25) 	not null,
  p_size        	integer     	not null,
  p_container   	varchar(10)   not null
);

CREATE TABLE supplier (
  s_suppkey     	integer        not null sortkey,
  s_name        	varchar(25)    not null,
  s_address     	varchar(25)    not null,
  s_city        	varchar(10)    not null,
  s_nation      	varchar(15)    not null,
  s_region      	varchar(12)    not null,
  s_phone       	varchar(15)    not null)
diststyle all;

CREATE TABLE customer (
  c_custkey     	integer        not null sortkey,
  c_name        	varchar(25)    not null,
  c_address     	varchar(25)    not null,
  c_city        	varchar(10)    not null,
  c_nation      	varchar(15)    not null,
  c_region      	varchar(12)    not null,
  c_phone       	varchar(15)    not null,
  c_mktsegment    varchar(10)    not null)
diststyle all;

CREATE TABLE dwdate (
  d_datekey            integer       not null sortkey,
  d_date               varchar(19)   not null,
  d_dayofweek	         varchar(10)   not null,
  d_month      	       varchar(10)   not null,
  d_year               integer       not null,
  d_yearmonthnum       integer  	   not null,
  d_yearmonth          varchar(8)	   not null,
  d_daynuminweek       integer       not null,
  d_daynuminmonth      integer       not null,
  d_daynuminyear       integer       not null,
  d_monthnuminyear     integer       not null,
  d_weeknuminyear      integer       not null,
  d_sellingseason      varchar(13)   not null,
  d_lastdayinweekfl    varchar(1)    not null,
  d_lastdayinmonthfl   varchar(1)    not null,
  d_holidayfl          varchar(1)    not null,
  d_weekdayfl          varchar(1)    not null)
diststyle all;

CREATE TABLE lineorder (
  lo_orderkey      	    integer     	not null,
  lo_linenumber        	integer     	not null,
  lo_custkey           	integer     	not null,
  lo_partkey           	integer     	not null distkey,
  lo_suppkey           	integer     	not null,
  lo_orderdate         	integer     	not null sortkey,
  lo_orderpriority     	varchar(15)   not null,
  lo_shippriority      	varchar(1)    not null,
  lo_quantity          	integer     	not null,
  lo_extendedprice     	integer     	not null,
  lo_ordertotalprice   	integer     	not null,
  lo_discount          	integer     	not null,
  lo_revenue           	integer     	not null,
  lo_supplycost        	integer     	not null,
  lo_tax               	integer     	not null,
  lo_commitdate         integer       not null,
  lo_shipmode          	varchar(10)   not null
);

"""

result = execute_query(create_dist_tables)
print(result)

In [None]:
#automating copy 

def loadTables(schema, tables):
    loadTimes = []
    SQL_SET_SCHEMA = "SET search_path TO {};".format(schema)
    execute_query(SQL_SET_SCHEMA)
    
    for table in tables:
        SQL_COPY = """
copy {} from 's3://awssampledbuswest2/ssbgz/{}' 
credentials 'aws_iam_role={}'
gzip region 'us-west-2';
        """.format(table,table, DWH_ROLE_ARN)

        print("======= LOADING TABLE: ** {} ** IN SCHEMA ==> {} =======".format(table, schema))
        print(SQL_COPY)

        t0 = time()
        execute_query(SQL_COPY)
        loadTime = time()-t0
        loadTimes.append(loadTime)

        print("=== DONE IN: {0:.2f} sec\n".format(loadTime))
    return pd.DataFrame({"table":tables, "loadtime_"+schema:loadTimes}).set_index('table')

In [None]:
#-- List of the tables to be loaded
tables = ["customer","dwdate","supplier", "part", "lineorder"]

# inserting for each schema 
nodistStats = loadTables("nodist", tables)
distStats = loadTables("dist", tables)

In [None]:
#-- Plotting of the timing results comparing loadtime 
stats = distStats.join(nodistStats)
stats.plot.bar()
plt.show()

In [None]:
#Comparing query performance 

oneDim_SQL ="""
set enable_result_cache_for_session to off;
SET search_path TO {};

select sum(lo_extendedprice*lo_discount) as revenue
from lineorder, dwdate
where lo_orderdate = d_datekey
and d_year = 1997 
and lo_discount between 1 and 3 
and lo_quantity < 24;
"""

twoDim_SQL="""
set enable_result_cache_for_session to off;
SET search_path TO {};

select sum(lo_revenue), d_year, p_brand1
from lineorder, dwdate, part, supplier
where lo_orderdate = d_datekey
and lo_partkey = p_partkey
and lo_suppkey = s_suppkey
and p_category = 'MFGR#12'
and s_region = 'AMERICA'
group by d_year, p_brand1
"""

drill_SQL = """
set enable_result_cache_for_session to off;
SET search_path TO {};

select c_city, s_city, d_year, sum(lo_revenue) as revenue 
from customer, lineorder, supplier, dwdate
where lo_custkey = c_custkey
and lo_suppkey = s_suppkey
and lo_orderdate = d_datekey
and (c_city='UNITED KI1' or
c_city='UNITED KI5')
and (s_city='UNITED KI1' or
s_city='UNITED KI5')
and d_yearmonth = 'Dec1997'
group by c_city, s_city, d_year
order by d_year asc, revenue desc;
"""


oneDimSameDist_SQL ="""
set enable_result_cache_for_session to off;
SET search_path TO {};

select lo_orderdate, sum(lo_extendedprice*lo_discount) as revenue  
from lineorder, part
where lo_partkey  = p_partkey
group by lo_orderdate
order by lo_orderdate
"""

def compareQueryTimes(schema):
    queryTimes  =[] 
    for i,query in enumerate([oneDim_SQL, twoDim_SQL, drill_SQL, oneDimSameDist_SQL]):
        t0 = time()
        q = query.format(schema)
        execute_query(q)
        queryTime = time()-t0
        queryTimes.append(queryTime)
    return pd.DataFrame({"query":["oneDim","twoDim", "drill", "oneDimSameDist"], "queryTime_"+schema:queryTimes}).set_index('query')

In [None]:
noDistQueryTimes = compareQueryTimes("nodist")
distQueryTimes   = compareQueryTimes("dist") 

In [None]:
queryTimeDF =noDistQueryTimes.join(distQueryTimes)
queryTimeDF.plot.bar()
plt.show()

In [None]:
improvementDF = queryTimeDF["distImprovement"] =100.0*(queryTimeDF['queryTime_nodist']-queryTimeDF['queryTime_dist'])/queryTimeDF['queryTime_nodist']
improvementDF.plot.bar(title="% dist Improvement by query")
plt.show()