In [None]:
!pip install -U -r requirements.txt

In [None]:
import os 
import prestodb 
import pandas 
import sqlalchemy

In [None]:
# Presto connection settings, taken from the process environment.
# Each value is None when the corresponding variable is not set.
PRESTO_SERVER = os.getenv('PRESTO_SERVER')
PRESTO_USER = os.getenv('PRESTO_USER')
PRESTO_PORT = os.getenv('PRESTO_PORT')
PRESTO_CATALOG = os.getenv('PRESTO_CATALOG')

In [None]:
# Build the SQLAlchemy engine for Presto from the environment-driven settings.
# The port comes from PRESTO_PORT (an environment string, hence int()); when the
# variable is unset or empty we fall back to 80, the value used previously.
engine = sqlalchemy.create_engine(
    "presto://%s@%s:%d/%s"
    % (PRESTO_USER, PRESTO_SERVER, int(PRESTO_PORT or 80), PRESTO_CATALOG)
)
# Smoke-test the connection by listing the catalogs the server exposes.
pandas.read_sql('SHOW CATALOGS', engine)

In [None]:
# Basic join between the customer domain and the finance domain,
# similar to what we did in the previous notebook.
# The query sums o.amount over each customer's transactions, grouped by the
# selected customer columns, and sorts so the largest totals come first.
sql = '''

SELECT 
    c.id, 
    c.customername, 
    c.customeraddr, 
    c.mktsegment, 
    c.status, 
    SUM(o.amount) as sum_purchased
FROM "customer-domain".public.customer c 
JOIN "finance-domain".public.transactions o ON c.id = o.customerid
GROUP BY c.id, c.customername, c.customeraddr, c.mktsegment, c.status
ORDER BY sum_purchased DESC

'''

# Execute the query through the engine and load the result into a DataFrame.
df = pandas.read_sql(sql, engine)
# Preview the first rows of the result.
df.head()

In [None]:
# Count the Kafka messages per customer, then attach that count to the
# per-customer purchase totals from the customer and finance domains.
#   * The customer number is extracted from the JSON payload and cast to
#     INTEGER inside the CTE, so the CTE yields one row per customer id and
#     the join below is a plain equality (no per-row cast, no duplicate keys
#     that would fan out the join and inflate SUM).
#   * LEFT JOIN keeps customers without any Kafka message; their
#     message_count is NULL.
#   * GROUP BY already returns one row per group, so no DISTINCT is needed.
sql = '''

WITH
    kafka AS (
        SELECT
            CAST(JSON_EXTRACT(_message, '$.customer_number') AS INTEGER) AS customer_id,
            COUNT(*) AS message_count
        FROM "notebook-test"
        GROUP BY 1
    )

SELECT
    c.id,
    c.customername,
    c.mktsegment,
    SUM(o.amount) AS sum_purchased,
    k.message_count
FROM "customer-domain".public.customer c
JOIN "finance-domain".public.transactions o ON c.id = o.customerid
LEFT JOIN kafka k ON c.id = k.customer_id
GROUP BY c.id, c.customername, c.mktsegment, k.message_count
ORDER BY sum_purchased DESC

'''
# Execute the query through the engine and preview the first rows.
df = pandas.read_sql(sql, engine)
df.head()

In [None]:
# Use SQL to build features across domains without making extracts or copies.
# Limited to active customers for now (status > 0).
#   * tot_days_active: days between effectivedate and today.
#   * tot_max/min/mean/count/sum_prch: aggregates over all of the customer's
#     transaction amounts.
#   * tot_message_count: number of Kafka messages for the customer (NULL when
#     there are none, because of the LEFT JOIN).
# The Kafka customer number is cast to INTEGER inside the CTE so it produces one
# row per customer id and the join is a plain equality; GROUP BY already makes
# the rows unique, so no DISTINCT is needed.
sql = '''

WITH
    kafka AS (
        SELECT
            CAST(JSON_EXTRACT(_message, '$.customer_number') AS INTEGER) AS customer_id,
            COUNT(*) AS message_count
        FROM "notebook-test"
        GROUP BY 1
    )

SELECT
    c.id,
    c.customername,
    c.mktsegment,
    day(current_date - c.effectivedate) AS tot_days_active,
    MAX(o.amount) AS tot_max_prch,
    MIN(o.amount) AS tot_min_prch,
    AVG(o.amount) AS tot_mean_prch,
    COUNT(o.amount) AS tot_count_prch,
    SUM(o.amount) AS tot_sum_prch,
    k.message_count AS tot_message_count
FROM "customer-domain".public.customer c
JOIN "finance-domain".public.transactions o ON c.id = o.customerid
LEFT JOIN kafka k ON c.id = k.customer_id
WHERE c.status > 0
GROUP BY c.id, c.customername, c.mktsegment, k.message_count, day(current_date - c.effectivedate)
ORDER BY tot_sum_prch DESC

'''
# Execute the query through the engine and preview the first rows.
df = pandas.read_sql(sql, engine)
df.head()

In [None]:
# Note: the Hive connector can be used to create views.

In [None]:
# Build two families of per-customer features across domains in one query,
# without making extracts or copies. Limited to active customers (status > 0).
#   * kafka    : Kafka message count per customer id (id cast to INTEGER once,
#                inside the CTE, so there is one row per customer id).
#   * customer : customer rows joined to their transactions.
#   * total    : tot_* features over all transactions, plus the message count.
#   * three    : three_* features over transactions after 2018-01-01 only.
#                NOTE(review): the cutoff date is hard-coded; presumably it is
#                meant as a recent-period window -- confirm, and consider
#                deriving it from current_date.
# The final SELECT returns every tot_* column plus all five three_* columns
# (three_mean_prch used to be computed but never returned).
# NOTE(review): total and three are combined with an inner JOIN, so active
# customers without any transaction after the cutoff are dropped -- confirm
# that this is intended.
# Ordering is applied once, on the final result; ORDER BY inside a CTE has no
# guaranteed effect on the outer query.
sql = '''

WITH
    kafka AS (
        SELECT
            CAST(JSON_EXTRACT(_message, '$.customer_number') AS INTEGER) AS customer_id,
            COUNT(*) AS message_count
        FROM "notebook-test"
        GROUP BY 1
    ),
    customer AS (
        SELECT
            c.id,
            c.customername,
            c.effectivedate,
            c.mktsegment,
            c.status,
            o.transdate,
            o.amount
        FROM "customer-domain".public.customer c
        JOIN "finance-domain".public.transactions o ON c.id = o.customerid
    ),
    total AS (
        SELECT
            c.id,
            c.customername,
            c.mktsegment,
            day(current_date - c.effectivedate) AS tot_days_active,
            MAX(c.amount) AS tot_max_prch,
            MIN(c.amount) AS tot_min_prch,
            AVG(c.amount) AS tot_mean_prch,
            COUNT(c.amount) AS tot_count_prch,
            SUM(c.amount) AS tot_sum_prch,
            k.message_count AS tot_message_count
        FROM customer c
        LEFT JOIN kafka k ON c.id = k.customer_id
        WHERE c.status > 0
        GROUP BY c.id, c.customername, c.mktsegment, k.message_count, day(current_date - c.effectivedate)
    ),
    three AS (
        SELECT
            c.id,
            MAX(c.amount) AS three_max_prch,
            MIN(c.amount) AS three_min_prch,
            AVG(c.amount) AS three_mean_prch,
            COUNT(c.amount) AS three_count_prch,
            SUM(c.amount) AS three_sum_prch
        FROM customer c
        WHERE c.status > 0 AND c.transdate > date '2018-01-01'
        GROUP BY c.id
    )

SELECT
    t.*,
    ttt.three_max_prch,
    ttt.three_min_prch,
    ttt.three_mean_prch,
    ttt.three_count_prch,
    ttt.three_sum_prch
FROM total t
JOIN three ttt ON t.id = ttt.id
ORDER BY tot_sum_prch DESC

'''
# Execute the query through the engine and preview the first rows.
df = pandas.read_sql(sql, engine)
df.head()