In [1]:
import os
import sys
from dateutil import parser
# Make the parent of the current working directory importable so the project
# package (`src.db`, imported below) resolves. The membership check keeps
# re-runs of this cell from appending duplicate entries.
module_path = os.path.abspath(os.pardir)
if module_path not in sys.path:
    sys.path.append(module_path)

In [2]:
from src.db import fetch_profiles, events_for_profile_ids, EVENTS_TABLE, PROFILES_TABLE

# Example Data

Sample records fetched from our NoSQL store (DynamoDB).

In [3]:
# boto3's Table.get_item returns a response envelope; the record itself is under
# 'Item', and that key is absent when nothing matches (so `profile` may be None).
# NOTE(review): assumes PROFILES_TABLE is a boto3 DynamoDB Table resource — confirm in src.db.
EXAMPLE_PROFILE_ID = '156148eb09715c-056eef5e2ee445-e313161-100200-156148eb09aa4'
profile = PROFILES_TABLE.get_item(Key={"$distinct_id": EXAMPLE_PROFILE_ID}).get('Item')

In [4]:
# As above: unwrap the record from the get_item response envelope (None if missing).
# NOTE(review): assumes EVENTS_TABLE is a boto3 DynamoDB Table resource — confirm in src.db.
EXAMPLE_EVENT_ID = '15cac3683c12b5-074efb05ef78e-1d401925-fa000-15cac3683c249e:1498113686'
event = EVENTS_TABLE.get_item(Key={'event_id': EXAMPLE_EVENT_ID}).get('Item')

# SQL

Exploring storing the same data in PostgreSQL tables instead of DynamoDB.

In [1]:
import psycopg2
from psycopg2.extensions import AsIs
import os

# Connection target is configurable through the standard libpq environment
# variables; the defaults are the values this notebook was written against.
DB_NAME = os.environ.get('PGDATABASE', 'conversion_deck')
DB_HOST = os.environ.get('PGHOST', 'localhost')
conn = psycopg2.connect(dbname=DB_NAME, host=DB_HOST)

In [2]:
# One shared cursor for the DDL cells below. The connection above is opened
# without autocommit, so these statements stay in an open transaction until
# conn.commit() is called.
cur = conn.cursor()

In [3]:
# Create the `users` table and its unique index on distinct_id.
# IF NOT EXISTS makes the cell safe to re-run (Restart & Run All) against a
# database that already has the table. Without it the CREATE raises, and the
# error also aborts the open transaction, so every later statement on `cur`
# fails until a rollback. Note: IF NOT EXISTS does not check that an existing
# table matches this schema. CREATE INDEX ... IF NOT EXISTS needs PostgreSQL 9.5+.
query = '''
        CREATE TABLE IF NOT EXISTS users (
            distinct_id varchar(75),
            camp_count int,
            camp_deliveries int,
            email varchar(255),
            first_name varchar(50),
            last_name varchar(50),
            is_paying boolean,
            is_registered boolean,
            signup_at date,
            vertical varchar(255),
            country_code varchar(10),
            subscription_type varchar(255)
        );

        CREATE UNIQUE INDEX IF NOT EXISTS distinct_idx ON users (distinct_id);
        '''
cur.execute(query)

In [8]:
# Create the `events` table and its unique index on event_id.
# IF NOT EXISTS keeps the cell re-runnable; a failing CREATE would otherwise
# abort the open transaction for every later statement on `cur`.
# CREATE INDEX ... IF NOT EXISTS needs PostgreSQL 9.5+.
query = '''
        CREATE TABLE IF NOT EXISTS events (
            type varchar(255),
            time timestamp,
            distinct_id varchar(75),
            event_id varchar(255)
        );

        CREATE UNIQUE INDEX IF NOT EXISTS event_id_idx ON events (event_id);
        '''
cur.execute(query)

In [2]:
# Persist the DDL above. The connection is intentionally left open: the
# read-back cell at the end of the notebook still queries through `conn`, and
# closing it here makes that cell fail on Restart & Run All.
conn.commit()

### Formatting Data

In [9]:
from collections import defaultdict

def format_sql_profile(profile):
    """Flatten a stored profile into a row dict for the ``users`` table.

    Reads ``profile['$distinct_id']`` and the ``profile['$properties']`` mapping.
    Properties that are absent fall back to defaults:

    * ``signup_at`` -- ``None``; otherwise ``signupDate`` parsed with dateutil
    * ``vertical`` / ``subscription_type`` -- ``None``
    * ``camp_count`` / ``camp_deliveries`` -- ``0``; otherwise the length of
      ``$campaigns`` / ``$deliveries``
    * ``is_paying`` / ``is_registered`` -- ``False``
    * ``email`` / ``first_name`` / ``last_name`` -- ``''``

    Present property values are passed through unconverted (e.g. ``isPaying``
    is not coerced to bool). The ``country_code`` column is not populated.
    The input mapping is never modified.
    """
    props = dict(profile['$properties'])  # private copy of the properties

    signup_at = parser.parse(props['signupDate']) if 'signupDate' in props else None
    camp_count = len(props['$campaigns']) if '$campaigns' in props else 0
    camp_deliveries = len(props['$deliveries']) if '$deliveries' in props else 0

    return {
        'distinct_id': profile['$distinct_id'],
        'camp_count': camp_count,
        'camp_deliveries': camp_deliveries,
        'email': props.get('$email', ''),
        'first_name': props.get('$first_name', ''),
        'last_name': props.get('$last_name', ''),
        'is_paying': props.get('isPaying', False),
        'is_registered': props.get('isRegistered', False),
        'signup_at': signup_at,
        'vertical': props.get('vertical'),
        'subscription_type': props.get('subscriptionType'),
    }

In [49]:
from datetime import datetime

def format_sql_event(event, tz=None):
    """Flatten a stored event into a row dict for the ``events`` table.

    Args:
        event: mapping with ``'event'`` (event name), ``'event_id'``,
            ``'profile_id'`` and, optionally, a ``'properties'`` mapping whose
            ``'time'`` entry is an epoch timestamp in seconds.
        tz: optional ``tzinfo`` forwarded to ``datetime.fromtimestamp``. The
            default ``None`` yields a naive datetime in the host's local
            timezone (the original behavior).

    Returns:
        dict with keys ``type``, ``event_id``, ``distinct_id`` and ``time``;
        ``time`` is ``None`` when the event carries no timestamp (including
        when ``properties`` is missing or ``None``).
    """
    properties = event.get('properties') or {}
    ts = None

    if 'time' in properties:
        # Items read from DynamoDB through boto3 carry numbers as decimal.Decimal,
        # which datetime.fromtimestamp rejects with TypeError on recent CPython;
        # float() accepts int, float and Decimal alike.
        # NOTE(review): with tz=None the stored value depends on the timezone of
        # the machine running the import — pass timezone.utc if UTC is intended.
        ts = datetime.fromtimestamp(float(properties['time']), tz)

    return {
        'type': event['event'],
        'event_id': event['event_id'],
        'distinct_id': event['profile_id'],
        'time': ts,
    }
    

In [52]:
def insert_sql_event(cur, event):
    """Insert one event as a row of the ``events`` table.

    The event is flattened with ``format_sql_event``. Its keys become the column
    list, spliced into the SQL verbatim through ``AsIs``. Its values are bound as
    one tuple parameter, which psycopg2 renders as ``(v1, v2, ...)``.
    Returns the result of ``cur.execute``.
    """
    row = format_sql_event(event)
    column_names = list(row)
    params = (AsIs(','.join(column_names)), tuple(row[name] for name in column_names))
    return cur.execute('insert into events (%s) values %s', params)

In [10]:
def insert_sql_profile(cur, profile):
    """Insert one profile as a row of the ``users`` table.

    The profile is flattened with ``format_sql_profile``. Column names (spliced
    in via ``AsIs``) and the value tuple are collected in the same pass, so they
    always line up. Returns the result of ``cur.execute``.
    """
    row = format_sql_profile(profile)
    names, vals = [], []
    for name, val in row.items():
        names.append(name)
        vals.append(val)
    sql = 'insert into users (%s) values %s'
    return cur.execute(sql, (AsIs(','.join(names)), tuple(vals)))

In [33]:
from sqlalchemy.exc import IntegrityError

def import_sql_profiles(cursor):
    """Insert every profile yielded by ``fetch_profiles()`` into ``users``.

    Profiles whose ``distinct_id`` violates the unique index on
    ``users.distinct_id`` are skipped. Each insert runs inside its own
    SAVEPOINT, so a skipped duplicate does not abort the surrounding
    transaction. Nothing is committed here; the caller must ``conn.commit()``.

    Args:
        cursor: psycopg2 cursor on a connection that is NOT in autocommit mode
            (SAVEPOINT is only valid inside a transaction block).
    """
    for profile in fetch_profiles():
        cursor.execute('SAVEPOINT import_profile')
        try:
            insert_sql_profile(cursor, profile)
        except psycopg2.IntegrityError:
            # A raw psycopg2 cursor raises psycopg2's IntegrityError, not
            # SQLAlchemy's. The failed statement poisons the transaction, so
            # roll back to the savepoint before continuing.
            cursor.execute('ROLLBACK TO SAVEPOINT import_profile')
        else:
            cursor.execute('RELEASE SAVEPOINT import_profile')

In [64]:
import pandas as pd
# Read the events table back to check the round trip. Keep the full frame in a
# named variable and display only the first rows so the saved output stays small.
# NOTE(review): recent pandas versions emit a UserWarning for raw psycopg2
# connections (only SQLAlchemy/sqlite3 are officially supported); the query still runs.
events_sql = pd.read_sql_query('select * from events;', conn)
events_sql.head()

Unnamed: 0,type,time,distinct_id,event_id
0,Export PPT,2017-06-21 23:41:26,15cac3683c12b5-074efb05ef78e-1d401925-fa000-15...,15cac3683c12b5-074efb05ef78e-1d401925-fa000-15...
