# Lab notebook to set up tables with the Events dataset

## HOL-EventsData-Lab-Setup: to perform queries on the data qubes created in setup notebook

In this exercise we will be creating four views on the Qube Salessnap with filters for each segment of users. 

### Reset data and recreate the indexes 
1. Drop the database/schema EventsDB and all the tables in the schema to reset the lab
2. Re-create the EventsDB schema/database and create all the external tables
    * users, users_buyer, users_seller, venue, category, ddate, event, listing, sales
3. Define the star schema between sales, users, category, venue and event in SNAP
    * Drop SNAP for 'sales' if it is already there
    * Define the SNAP index for sales
    * Load data into sales SNAP index
4. Define the star schema between listing, event and users_seller in SNAP
    * Drop SNAP index on 'listing' if it is already there
    * Define the SNAP index for the second fact table 'listing' - note that this table's data is stored externally, in an S3 data lake
    * Load data into listing SNAP index
5. Simple queries
    * Find the total tickets sold on Jan 5th 2008
    * Find the listings in a range
    * Find the top 10 events with maximum revenue
    * Find the top 10 customers with the maximum number of tickets
    * Join sales and listing along with other non-indexed external tables

### First, let us set up the notebook environment (Python packages) and connect to the Thrift server

In [None]:
%load_ext autotime

In [None]:
from pyhive import hive
from pprint import pprint
import pandas as pd
import os

import IPython.display
def draw(spec):
    IPython.display.display({
        'application/vnd.vegalite.v1+json': spec.to_dict()
    }, raw=True)

pd.set_option('display.max_colwidth', None) # don't truncate table columns (pandas >= 1.0; older versions used -1)

# Python function to run SQL code and return the result set as a DataFrame
def sql(query, explain=False):
    # `explain` is currently unused; it is kept so existing calls keep working
    # Replace the "{prefix}" placeholder with the data location (cwd) so table paths resolve
    if "{prefix}" in query:
        query = query.replace('{prefix}', cwd)
    df = pd.read_sql(query, thrift_conn)
    return df

# Python function to explain the given query
def explain(query):
    df = sql("explain " + query)
    plan = df['plan'][0]
    pprint(plan)

# Set the directory of the data to ingest into SNAP
cwd="oci://sparkline-hol-data@paasdevbdc"

# Connection to Thrift server
thrift_conn = hive.Connection(host="129.146.118.175",port=10000)
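
The `{prefix}` substitution performed by `sql` can be tried without a Thrift connection. The sketch below isolates that step in a hypothetical helper, `expand_prefix`, which is not part of the lab; it reuses the storage location assigned to `cwd` above.

```python
# Hypothetical helper (not part of the lab): isolates the "{prefix}"
# substitution that sql() applies before sending a statement to the server.
def expand_prefix(query, prefix):
    """Return query with every "{prefix}" placeholder replaced by prefix."""
    return query.replace("{prefix}", prefix)

# Same storage location the notebook assigns to cwd.
demo_prefix = "oci://sparkline-hol-data@paasdevbdc"

# A fragment shaped like the OPTIONS clauses used in the table definitions.
demo_ddl = 'options (path "{prefix}/data/venue_pipe.txt", delimiter "|")'
expanded = expand_prefix(demo_ddl, demo_prefix)
print(expanded)
```

Statements without the placeholder pass through unchanged, which is why the same helper can serve both DDL and plain queries.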

### Drop the database/schema EventsDB and all the tables in the schema to reset the lab
### Re-create the EventsDB schema/database and create all the external tables

Note: These are external tables, so dropping them deletes only the metadata, not the underlying data.

In [None]:
sql("DROP DATABASE IF EXISTS EventsDB Cascade")
sql("CREATE DATABASE EventsDB")
sql("use EventsDB")
sql("show tables")

### Create external table: users

In [None]:
users = """
create table if not exists users (
	userid integer ,
	username string,
	firstname string,
	lastname string,
	city string,
	state string,
	email string,
	phone string,
	likesports string,
	liketheatre string,
	likeconcerts string,
	likejazz string,
	likeclassical string,
	likeopera string,
	likerock string,
	likevegas string,
	likebroadway string,
	likemusicals string)
    using csv
    options (path "{prefix}/data/allusers_pipe.txt", delimiter "|")
"""
sql(users)

### Create external table: users_buyer

In [None]:
users_buyer = """
create table if not exists users_buyer (
	userid integer ,
	username string,
	firstname string,
	lastname string,
	city string,
	state string,
	email string,
	phone string,
	likesports string,
	liketheatre string,
	likeconcerts string,
	likejazz string,
	likeclassical string,
	likeopera string,
	likerock string,
	likevegas string,
	likebroadway string,
	likemusicals string)
    using csv
    options (path "{prefix}/data/allusers_pipe.txt", delimiter "|")
"""
sql(users_buyer)

In [None]:
sql("select count(*) from users_buyer")

### Create external table: users_seller

In [None]:
users_seller = """
create table if not exists users_seller (
	userid integer ,
	username string,
	firstname string,
	lastname string,
	city string,
	state string,
	email string,
	phone string,
	likesports string,
	liketheatre string,
	likeconcerts string,
	likejazz string,
	likeclassical string,
	likeopera string,
	likerock string,
	likevegas string,
	likebroadway string,
	likemusicals string)
    using csv
    options (path "{prefix}/data/allusers_pipe.txt", delimiter "|")
"""
sql(users_seller)

In [None]:
sql("select count(*) from users_seller")
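
The three user tables (users, users_buyer, users_seller) share the same columns and the same source file, so their DDL could be generated rather than repeated. This is a minimal sketch of that idea; `users_ddl` and `USER_STRING_COLUMNS` are hypothetical names that are not part of the lab.

```python
# Hypothetical helper (not part of the lab): builds the shared DDL for
# users, users_buyer and users_seller from one column list.
USER_STRING_COLUMNS = [
    "username", "firstname", "lastname", "city", "state", "email", "phone",
    "likesports", "liketheatre", "likeconcerts", "likejazz", "likeclassical",
    "likeopera", "likerock", "likevegas", "likebroadway", "likemusicals",
]

def users_ddl(table_name):
    """Return the CREATE TABLE statement for one of the user tables."""
    columns = ["userid integer"] + [f"{name} string" for name in USER_STRING_COLUMNS]
    column_sql = ",\n    ".join(columns)
    return (
        f"create table if not exists {table_name} (\n"
        f"    {column_sql})\n"
        "    using csv\n"
        # Plain string, so "{prefix}" stays a literal placeholder for sql().
        '    options (path "{prefix}/data/allusers_pipe.txt", delimiter "|")\n'
    )

print(users_ddl("users_seller"))
```

With a live connection, the three cells above would reduce to a loop such as `for name in ("users", "users_buyer", "users_seller"): sql(users_ddl(name))`.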

### Create external table: venue

In [None]:
venue = """
create table if not exists venue(
	venueid integer,
	venuename string,
	venuecity string,
	venuestate string,
	venueseats integer)
    using csv
    options (path "{prefix}/data/venue_pipe.txt", delimiter "|")
"""
sql(venue)

In [None]:
sql("select count(*) from venue")

### Create external table: category

In [None]:
category="""
create table if not exists category(
	catid integer,
	catgroup string,
	catname string,
	catdesc string)    
    using csv
    options (path "{prefix}/data/category_pipe.txt", delimiter "|")
"""
sql(category)

In [None]:
sql("select count(*) from category")

### Create external table: ddate (date dimension)

In [None]:
ddate = """
create table if not exists ddate(
	dateid integer ,
	caldate date,
	day string,
	week integer,
	month string,
	qtr string,
	year integer,
	holiday string)    
    using csv
    options (path "{prefix}/data/date2008_pipe.txt", delimiter "|")
"""
sql(ddate)

In [None]:
sql("select count(*) from ddate")

### Create external table: event

In [None]:
event = """
create table if not exists event(
	eventid integer ,
	venueid integer,
	catid integer,
	dateid integer ,
	eventname string,
	starttime timestamp)    
    using csv
    options (path "{prefix}/data/allevents_pipe.txt", delimiter "|")
"""
sql(event)

In [None]:
sql("select count(*) from event")

### Create external table: listing

#### Note: this data set comes from a third-party data lake (AWS S3)

In [None]:
listing = """
create table if not exists listing(
	listid integer  ,
	sellerid integer ,
	eventid integer ,
	dateid integer ,
	numtickets integer ,
	priceperticket double, 
	totalprice double,
	listtime timestamp)    
    using csv
    options (path "s3a://snap-samples/redshift/listings_pipe.txt", delimiter "|")
    
"""

sql(listing)

In [None]:
sql("select count(*) from listing")

### Create external table: sales

In [None]:
sales = """
create table if not exists  sales(
	salesid integer ,
	listid integer,
	sellerid integer,
	buyerid integer,
	eventid integer,
	dateid integer,
	qtysold integer,
	pricepaid double,
	commission double,
	saletime timestamp)
    using csv
    options (
        path "{prefix}/data/sales_tab.txt"
        ,delimiter "\t"
        ,timestampFormat "d/MM/yyyy HH:mm:ss"
        ,inferSchema "true"
        )
"""
sql(sales)

In [None]:
sql("select count(*) from sales")

### Define the star schema between sales, users, category, venue and event in SNAP

In [None]:
create_sales_star_schema = """alter star schema on sales with stats as
many_to_one join of sales with event on sales.eventid = event.eventid
many_to_one join of sales with ddate on sales.dateid = ddate.dateid
many_to_one join of sales with users_buyer on sales.buyerid = users_buyer.userid
many_to_one join of sales with users_seller on sales.sellerid = users_seller.userid
many_to_one join of event with category on event.catid = category.catid
many_to_one join of event with venue on event.venueid = venue.venueid
"""
sql(create_sales_star_schema)

### Define the star schema between listing, event and users_seller in SNAP

In [None]:
create_listing_star_schema = """alter star schema on listing with stats as

many_to_one join of listing with event on listing.eventid = event.eventid
many_to_one join of listing with ddate on listing.dateid = ddate.dateid
many_to_one join of listing with users_seller on listing.sellerid = users_seller.userid

"""
sql(create_listing_star_schema)
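
Both star schema statements follow the same pattern: one header line, then one `many_to_one join` line per relationship. The sketch below assembles such a statement from a list of joins. The helper name `star_schema_statement` is hypothetical, and the statement text simply mirrors the syntax used in this notebook; it has not been checked against SNAP itself.

```python
# Hypothetical helper (not part of the lab): assembles the star schema
# statement from (left_table, right_table, left_key, right_key) tuples.
def star_schema_statement(fact_table, joins):
    lines = [f"alter star schema on {fact_table} with stats as"]
    for left, right, left_key, right_key in joins:
        lines.append(
            f"many_to_one join of {left} with {right} "
            f"on {left}.{left_key} = {right}.{right_key}"
        )
    return "\n".join(lines) + "\n"

# The three relationships declared for the listing fact table above.
listing_joins = [
    ("listing", "event", "eventid", "eventid"),
    ("listing", "ddate", "dateid", "dateid"),
    ("listing", "users_seller", "sellerid", "userid"),
]

statement = star_schema_statement("listing", listing_joins)
print(statement)
```

Keeping the joins in a list makes it easy to see that the sales schema snowflakes through event (to category and venue), while the listing schema only joins dimensions directly.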

### Drop SNAP for sales if it is already there

In [None]:
salessnap="""
drop olap index salessnap on sales
"""
sql(salessnap)

### Define the SNAP index for sales

In [None]:
salessnap="""

create olap index salessnap on sales
timestamp dimension starttime 
timestamp dimension saletime
timestamp dimension caldate
metric qtysold aggregator longSum is nullable nullvalue "0"
metric pricepaid aggregator doubleSum is nullable nullvalue "0.0"
metric commission aggregator doubleSum is nullable nullvalue "0.0"
dimension holiday is not nullable 
dimensions "users_buyer.username,users_buyer.city, users_buyer.state, users_buyer.likesports, users_buyer.liketheatre,users_buyer.likeconcerts, users_buyer.likejazz , \
users_buyer.likeclassical, users_buyer.likeopera, users_buyer.likerock, users_buyer.likevegas, users_buyer.likebroadway, users_buyer.likemusicals, \
venuename, venuecity, venuestate, catgroup, catname, catdesc \
, day, week, month, qtr, year, eventname , \
sales.eventid, sales.buyerid, sales.listid, sales.salesid" 

OPTIONS (        
    path "{prefix}/snapindex/salessnap",
    avgSizePerPartition  "40mb",
    preferredSegmentSize "20mb",
    rowFlushBoundary "100000",
    defaultNullValue "0"
)
"""

sql(salessnap)

### Load data into sales SNAP index

In [None]:
insert=""" insert olap index salessnap of sales """

sql(insert)

In [None]:
sql(""" REFRESH TABLE sales""")

In [None]:
sql(""" select count(*) from sales""")

### Drop SNAP index on listing if it is already there

In [None]:
drop_listingsnap="""
drop olap index listingsnap on listing
"""
sql(drop_listingsnap)

### Define the SNAP index for the second fact table: listing

In [None]:
listingsnap="""

create olap index listingsnap on listing
timestamp dimension caldate
metric numtickets aggregator longSum is nullable nullvalue "0"
metric priceperticket aggregator doubleSum is nullable nullvalue "0.0"
metric totalprice aggregator doubleSum is nullable nullvalue "0.0"
dimension holiday is not nullable 
dimensions "users_seller.username, users_seller.city, users_seller.state \
, day, week, month, qtr, year, eventname , \
listing.eventid, listing.sellerid, listing.listid" 

OPTIONS (        
    path "{prefix}/snapindex/listingsnap",
    avgSizePerPartition  "40mb",
    preferredSegmentSize "20mb",
    rowFlushBoundary "100000",
    defaultNullValue "0"
)
"""

sql(listingsnap)

### Load data into listing SNAP index

In [None]:
insert=""" insert olap index listingsnap of listing """

sql(insert)

### We have created the indexes and loaded data into them; let us run a simple query

In [None]:
select_listing = """ 
    SELECT count(*) 
    FROM listing WHERE listid between 1 AND 10000 
    OR 
    listid between 5000 AND 25000
"""
sql(select_listing)
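
The two `BETWEEN` ranges in this query overlap (5000 to 10000). Because `OR` matches each row at most once, the predicate is equivalent to `listid between 1 AND 25000`. The following sketch checks this on a hypothetical set of consecutive ids; the real listing table may have gaps, so the actual count can differ.

```python
# Hypothetical ids 1..30000 standing in for listing.listid.
listids = range(1, 30001)

def in_either_range(listid):
    """Mirror of the WHERE clause: two overlapping BETWEEN ranges joined by OR."""
    return 1 <= listid <= 10000 or 5000 <= listid <= 25000

# Count rows matched by the original predicate and by the merged range.
matched_by_or = sum(1 for listid in listids if in_either_range(listid))
matched_by_single_range = sum(1 for listid in listids if 1 <= listid <= 25000)

print(matched_by_or, matched_by_single_range)
```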

### Query: Find the total tickets sold on Jan 5th 2008

##### Note: after loading data into SNAP, you can still query the original tables (sales, ddate) using the join keys. The query is rewritten to use the SNAP index.

In [None]:
tickets_sold_onJan5th = """
SELECT sum(qtysold) 
FROM   sales as sales, ddate
WHERE  sales.dateid = ddate.dateid 
AND    ddate.caldate = '2008-01-05'
"""
sql(tickets_sold_onJan5th)
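
To see what this query computes without a Thrift server, the same SQL can be run against an in-memory SQLite database. This is only a sketch: the rows are made up for illustration, only the columns the query touches are created, and SQLite stands in for the lab's SQL engine.

```python
import sqlite3

# In-memory database with tiny, made-up rows (not the lab data).
conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE ddate (dateid INTEGER, caldate TEXT);
    CREATE TABLE sales (salesid INTEGER, dateid INTEGER, qtysold INTEGER);
    INSERT INTO ddate VALUES (1, '2008-01-04'), (2, '2008-01-05');
    INSERT INTO sales VALUES (10, 2, 2), (11, 2, 4), (12, 1, 1);
""")

# Same join and filter as the notebook query.
total = conn.execute("""
    SELECT sum(qtysold)
    FROM   sales AS sales, ddate
    WHERE  sales.dateid = ddate.dateid
    AND    ddate.caldate = '2008-01-05'
""").fetchone()[0]
conn.close()

print(total)  # only the two sales on dateid 2 are summed: 2 + 4
```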

### Query: Find the top ten customers with the maximum number of tickets

In [None]:
top_10_customers = """
SELECT firstname, lastname, total_quantity 
FROM   (SELECT buyerid, sum(qtysold) total_quantity
        FROM  sales
        GROUP BY buyerid
        ORDER BY total_quantity desc limit 10) Q, users
WHERE Q.buyerid = userid
ORDER BY Q.total_quantity desc
"""
sql(top_10_customers)
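
The query first aggregates and limits inside the subquery, then joins the small result to users to pick up the names. A minimal sketch of that shape on made-up rows, using an in-memory SQLite database as a stand-in for the lab's engine and `LIMIT 2` instead of 10 to keep the data small:

```python
import sqlite3

# In-memory database with made-up rows (not the lab data).
conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE users (userid INTEGER, firstname TEXT, lastname TEXT);
    CREATE TABLE sales (buyerid INTEGER, qtysold INTEGER);
    INSERT INTO users VALUES (1, 'Ann', 'Lee'), (2, 'Bob', 'Ray'), (3, 'Cy', 'Fox');
    INSERT INTO sales VALUES (1, 3), (1, 2), (2, 4), (3, 1);
""")

# Aggregate and limit first, then join to users for the names.
rows = conn.execute("""
    SELECT firstname, lastname, total_quantity
    FROM   (SELECT buyerid, sum(qtysold) total_quantity
            FROM  sales
            GROUP BY buyerid
            ORDER BY total_quantity DESC LIMIT 2) Q, users
    WHERE Q.buyerid = userid
    ORDER BY Q.total_quantity DESC
""").fetchall()
conn.close()

print(rows)
```

The outer `ORDER BY` is still needed because a join does not guarantee that the subquery's ordering is preserved.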

### Find the top 10 events with maximum revenue

In [None]:
top_10_events="""
select sales.eventid, sum(sales.pricepaid)
from sales, event
where sales.eventid = event.eventid
and sales.pricepaid > 30
group by sales.eventid
order by 2 desc
limit 10
"""

sql(top_10_events)
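
Note that the `pricepaid > 30` filter is applied before aggregation, so the revenue ranking only sums sales above 30. The sketch below shows the effect on made-up rows in an in-memory SQLite database, which stands in for the lab's engine.

```python
import sqlite3

# In-memory database with made-up rows (not the lab data).
conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE event (eventid INTEGER, eventname TEXT);
    CREATE TABLE sales (eventid INTEGER, pricepaid REAL);
    INSERT INTO event VALUES (1, 'A'), (2, 'B'), (3, 'C');
    INSERT INTO sales VALUES (1, 100.0), (1, 20.0), (2, 50.0), (2, 40.0), (3, 25.0);
""")

# Same query as the notebook cell.
rows = conn.execute("""
    select sales.eventid, sum(sales.pricepaid)
    from sales, event
    where sales.eventid = event.eventid
    and sales.pricepaid > 30
    group by sales.eventid
    order by 2 desc
    limit 10
""").fetchall()
conn.close()

print(rows)
```

Event 1 is credited with 100.0 rather than 120.0, and event 3 drops out entirely, because sales of 30 or less are excluded before the sum.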

### Join sales and listing along with other non-indexed external tables

In [None]:
query3=""" 
SELECT SUM(`sales`.`qtysold`) AS `sum_qty_sold`, 
COUNT(1) AS `x__alias__0` 
FROM `EventsDB`.`salessnap` `sales` 
  JOIN `EventsDB`.`users` `users` ON (`sales`.`buyerid` = `users`.`userid`) 
  JOIN `EventsDB`.`event` `event` ON (`sales`.`sales_eventid` = `event`.`eventid`) 
  JOIN `EventsDB`.`listing` `listing` ON (`sales`.`listid` = `listing`.`listid`) 
  JOIN `EventsDB`.`ddate` `ddate` ON (`sales`.`caldate` = `ddate`.`caldate`) 
  JOIN `EventsDB`.`category` `category` ON (`event`.`catid` = `category`.`catid`) 
  JOIN `EventsDB`.`venue` `venue` ON (`event`.`venueid` = `venue`.`venueid`) 
  HAVING (COUNT(1) > 0)
"""
sql(query3)

## Let us stop here and continue the exercise in the query notebooks

### Describe the ddate table

In [None]:
sql(""" desc ddate""")