# Download Email Events from Hubspot API

This app downloads email events from the HubSpot API. It uses the stored campaign statistics to decide the point in time from which each download should start.

Resources:
* MSSQL - Run DDL in the project against your database
* Schema named "hs" in database

Put the event types you want to download in the hs.DownloadableEventType table

In [1]:
import requests, json, time
import pandas as pd
import sqlalchemy
import pyodbc as odbc

import os
import configparser, json

In [2]:
# config.ini lives one directory above the notebook. os.path.join picks the
# separator for the current platform, so no per-OS branch (and no backslash
# inside a string literal) is needed.
configPath = os.path.join(os.pardir, u'config.ini')

# ConfigParser.read() skips files it cannot open without raising; a missing
# file shows up later as an error from parser.get().
parser = configparser.ConfigParser()
parser.read(configPath)

In [3]:
# Root of every HubSpot REST endpoint called below.
baseURL = "https://api.hubapi.com"

# The API key is read from the [Hubspot] section of the config file and sent
# as the first query-string parameter of each request.
key = parser.get('Hubspot', 'APIKEY')
apiKey = "?hapikey=" + key

# Page size requested from the paged endpoints; firstParm is the key plus
# the page size, ready to be appended to an endpoint path.
limit = "&limit=1000"
firstParm = apiKey + limit

# Running total of API calls made so far.
runningCallCount = 0

In [4]:
# SQLAlchemy engine; pandas uses it for read_sql / to_sql in the cells below
meta_engine = sqlalchemy.create_engine(parser.get('Database', 'HubspotSqlAlc'))

# Test the connection. The with-block hands the connection back to the pool
# as soon as the check is done instead of keeping it checked out for the
# rest of the session.
with meta_engine.connect() as connection:
    result = connection.execute("SELECT @@ServerName ServerName, DB_NAME() AS DatabaseName, CURRENT_USER AS LoggedInAs")
    for row in result:
        print (row['ServerName'], row['DatabaseName'], row['LoggedInAs'])

(u'WIN10', u'Dev', u'mac')


In [5]:
# PYODBC Connection
# Opened with autocommit=True, so no explicit commit is issued anywhere below.
pyodbcConnString = parser.get('Database', 'HubspotPy')
conn = odbc.connect(pyodbcConnString, autocommit=True)
curs = conn.cursor()

# Sanity check: show the server, database and login this connection uses.
curs.execute("SELECT @@ServerName AS ServerName, DB_NAME() AS DatabaseName, CURRENT_USER AS LoggedInAs")
print(curs.fetchone())

(u'WIN10', u'Dev', u'mac')


## Database table structures
Pandas uses these dictionaries to know how to put the data into the database

In [6]:
# Column name -> SQL type for the CampaignStatistics table; passed to
# DataFrame.to_sql(dtype=...). Counts are INTEGER, text fields VARCHAR(255).
# 'lastUpdatedTime' (BIGINT) is deliberately not part of the mapping.
columnsCampaignStatistics = {
    name: sqlalchemy.types.INTEGER()
    for name in (
        'appId', 'bounce', 'campaignId', 'click', 'deferred', 'delivered',
        'dropped', 'forward', 'mta_dropped', 'numIncluded', 'numQueued',
        'open', 'print', 'processed', 'sent', 'spamreport', 'statuschange',
        'unsubscribed',
    )
}
columnsCampaignStatistics.update({
    name: sqlalchemy.types.VARCHAR(length=255)
    for name in ('appName', 'campaignName', 'processingState', 'subject', 'type')
})

# Column name -> SQL type for the CampaignStatisticsHistory table; passed to
# DataFrame.to_sql(dtype=...). Same columns as the statistics mapping plus the
# RunID of the download run. 'lastUpdatedTime' (BIGINT) is deliberately omitted.
columnsCampaignStatisticsHistory = {
    name: sqlalchemy.types.INTEGER()
    for name in (
        'RunID', 'appId', 'bounce', 'campaignId', 'click', 'deferred',
        'delivered', 'dropped', 'forward', 'mta_dropped', 'numIncluded',
        'numQueued', 'open', 'print', 'processed', 'sent', 'spamreport',
        'statuschange', 'unsubscribed',
    )
}
columnsCampaignStatisticsHistory.update({
    name: sqlalchemy.types.VARCHAR(length=255)
    for name in ('appName', 'campaignName', 'processingState', 'subject', 'type')
})

# Column name -> SQL type for the EmailEventTemp table; passed to
# DataFrame.to_sql(dtype=...). 'created' is BIGINT, the remaining numeric
# columns are INTEGER and the text columns VARCHAR(255).
columnsEmailEvent = {'created': sqlalchemy.types.BIGINT()}
columnsEmailEvent.update({
    name: sqlalchemy.types.INTEGER()
    for name in ('RunID', 'appId', 'emailCampaignId', 'duration')
})
columnsEmailEvent.update({
    name: sqlalchemy.types.VARCHAR(length=255)
    for name in (
        'deviceType', 'recipient', 'type', 'country', 'state', 'city',
        'browser',
    )
})

## Helper Functions

In [7]:
def pushDFToDB(df, tablename):
    """Append the rows of ``df`` to the table ``tablename`` via ``meta_engine``.

    The DataFrame index is not written as a column.
    """
    df.to_sql(name=tablename, con=meta_engine, if_exists='append', index=False)

def dictToDF(dict):
    """Build a pandas DataFrame from ``dict`` using column orientation.

    The cells below pass a list of row dictionaries; each key becomes a
    column. Dtypes are left for pandas to infer.
    """
    frame = pd.DataFrame.from_dict(dict, dtype=None, orient='columns')
    return frame

def parseDate(datenum):
    """Convert a millisecond epoch value into a local-time ``struct_time``.

    The milliseconds are truncated to whole seconds before conversion.
    """
    wholeSeconds = int(datenum / 1000)
    return time.localtime(round(wholeSeconds))

def stringifyDate(thisDate):
    """Format a ``struct_time`` as e.g. 'Thu, 01 Jan 1970 00:00:00'."""
    layout = "%a, %d %b %Y %H:%M:%S"
    return time.strftime(layout, thisDate)

# parse Single Campaign Statistics from the json received from Hubspot
def parseSingleCampaignStatistics(r):
    """Flatten one campaign JSON object into a flat row dictionary.

    Args:
        r: Decoded campaign JSON (a dict). ``appId`` and ``id`` must be
            present; every other field is optional.

    Returns:
        dict with the keys ``appId``, ``campaignId`` (taken from ``id``),
        ``campaignName`` (taken from ``name``), ``appName``,
        ``processingState``, ``subject``, ``type``, ``numIncluded``,
        ``numQueued`` and one key per counter. Missing text fields become
        ``''`` and missing numbers become ``0``.

    Raises:
        KeyError: if ``appId`` or ``id`` is missing from ``r``.
    """
    row = {
        u'appId': r['appId'],
        u'campaignId': r['id'],
        u'campaignName': r.get('name', ''),
    }

    for field in (u'appName', u'processingState', u'subject', u'type'):
        row[field] = r.get(field, '')

    for field in (u'numIncluded', u'numQueued'):
        row[field] = r.get(field, 0)

    # Every counter is read from the nested 'counters' object (including
    # 'dropped' and 'print') and defaults to 0, so each row carries the same
    # set of keys whether or not the campaign reported counters.
    counters = r.get('counters') or {}
    for field in (u'bounce', u'click', u'deferred', u'delivered', u'dropped',
                  u'forward', u'mta_dropped', u'open', u'print', u'processed',
                  u'sent', u'spamreport', u'statuschange', u'unsubscribed'):
        row[field] = counters.get(field, 0)

    return row

# parse Single Event into dictionary from json received from Hubspot
def parseSingleEvent(r):
    """Flatten one email-event JSON object into a flat row dictionary.

    Args:
        r: Decoded event JSON (a dict). Every field is optional.

    Returns:
        dict with the keys ``recipient``, ``created``, ``emailCampaignId``,
        ``deviceType``, ``type``, ``appId``, ``country``, ``state``, ``city``,
        ``duration`` and ``browser``. Missing text fields become ``''`` and
        missing numbers become ``0``.
    """
    # Nested objects: fall back to an empty dict so that a present-but-partial
    # 'location' or 'browser' still yields every key with its default.
    location = r.get('location') or {}
    browser = r.get('browser') or {}

    return {
        u'recipient': r.get('recipient', ''),
        u'created': r.get('created', 0),
        u'emailCampaignId': r.get('emailCampaignId', 0),
        u'deviceType': r.get('deviceType', ''),
        u'type': r.get('type', ''),
        u'appId': r.get('appId', 0),
        u'country': location.get('country', ''),
        u'state': location.get('state', ''),
        u'city': location.get('city', ''),
        u'duration': r.get('duration', 0),
        u'browser': browser.get('name', ''),
    }

# getContactStats()

* Get Contact Statistics from Hubspot
* Returns: Dictionary of Contact Stats

In [8]:
def getContactStats():
    """Request the contact statistics endpoint and return its decoded JSON body."""
    endpoint = "/contacts/v1/contacts/statistics"
    response = requests.get(baseURL + endpoint + apiKey)
    return response.json()

# getAllCampaignsList()

* Get Base List of Campaigns from Hubspot
* Returns: List of all campaigns, Number of calls made

In [9]:
def getAllCampaignsList(noisy=False):
    """Download the complete campaign list, following pagination.

    Args:
        noisy: When true, print the number of API calls that were needed.

    Returns:
        Tuple ``(campaigns, callCount)``: the concatenated ``campaigns``
        entries of every page, and the number of requests made.
    """
    firstPageURL = baseURL + "/email/public/v1/campaigns/by-id" + firstParm

    page = requests.get(firstPageURL).json()
    campaigns = page['campaigns']
    callCount = 1

    # Keep requesting pages for as long as the previous one reports more data.
    while page['hasMore']:
        # Pause for a second after every tenth call.
        if callCount % 10 == 0:
            time.sleep(1)

        page = requests.get(firstPageURL + "&offset=" + page['offset']).json()
        callCount += 1
        campaigns = campaigns + page['campaigns']

    if noisy:
        print("Campaign List Call Count: " + str(callCount))

    return campaigns, callCount

# getSingleCampaignStatistics()

* Get Single Campaign Statistic from Hubspot
* Returns: Dictionary of Campaign Metadata

In [10]:
def getSingleCampaignStatistics(camp, noisy=False):
    """Fetch one campaign's details and return them as a parsed row.

    Args:
        camp: Campaign entry providing the ``id`` and ``appId`` keys.
        noisy: When true, print the name of the campaign that was fetched.

    Returns:
        The dictionary produced by ``parseSingleCampaignStatistics`` for the
        JSON body of the response.
    """
    endpoint = "/email/public/v1/campaigns/" + str(camp['id'])
    appFilter = "&appId=" + str(camp['appId'])

    response = requests.get(baseURL + endpoint + apiKey + appFilter)
    parsedCampaign = parseSingleCampaignStatistics(response.json())

    if noisy:
        print("Getting Statistics for " + parsedCampaign['campaignName'])

    return parsedCampaign

# getAllCampaignsStatistics()

* Get All Campaigns' Statistics from Hubspot
* Returns: Number of calls made, List of campaign statistics (one dictionary per campaign)

In [11]:
def getAllCampaignsStatistics(noisy=False):
    """Fetch the statistics of every campaign in the account.

    Args:
        noisy: When true, print the number of campaigns found; the flag is
            also forwarded to ``getAllCampaignsList``.

    Returns:
        Tuple ``(callCount, camps)``: the calls used to list the campaigns
        plus one per campaign, and the list of parsed statistics rows.
    """
    campaigns, listCalls = getAllCampaignsList(noisy=noisy)

    if noisy:
        print("Number of Campaigns: " + str(len(campaigns)))

    # One request per campaign.
    camps = [getSingleCampaignStatistics(camp) for camp in campaigns]

    return listCalls + len(camps), camps

# getCampaignEventsSince()

* Get All of a single Campaign's Specific EventTypes since x/x/xxxx
* Store results in temporary database table
* Returns: Number of Calls Made, Number of Events Downloaded

In [12]:
def getCampaignEventsSince(app, campaign, since, event, identity, noisy=False):
    """Download one campaign's events of one type and stage them in the database.

    Each page of results is parsed and appended to ``hs.EmailEventTemp``
    before the next page is requested.

    Args:
        app: Value sent as the ``appId`` query parameter.
        campaign: Value sent as the ``campaignId`` query parameter.
        since: Value sent as the ``startTimestamp`` query parameter.
        event: Event type name sent as the ``eventType`` query parameter.
        identity: Value written to the ``RunID`` column of every stored row.
        noisy: Accepted but not used by this function.

    Returns:
        Tuple ``(callCount, eventCount)``: requests made and events stored.
    """
    url = (baseURL + "/email/public/v1/events" + firstParm
           + "&appId=" + str(app)
           + "&campaignId=" + str(campaign)
           + "&startTimestamp=" + str(since)
           + "&eventType=" + event)

    callCount = 0
    eventCount = 0
    pageURL = url

    while True:
        callCount += 1

        # Pause for a second before every tenth call.
        if callCount % 10 == 0:
            time.sleep(1)

        page = requests.get(pageURL).json()
        events = [parseSingleEvent(rawEvent) for rawEvent in page['events']]

        # Put events in Database
        if events:
            frame = dictToDF(events)
            frame['RunID'] = identity
            frame.to_sql(name="EmailEventTemp",
                         con=meta_engine,
                         schema="hs",
                         index=False,
                         if_exists='append',
                         dtype=columnsEmailEvent)
            eventCount += len(events)

        if not page['hasMore']:
            break

        # Later pages reuse the first URL plus the offset reported by the
        # page that was just processed.
        pageURL = url + "&offset=" + page['offset']

    return callCount, eventCount

# getNewEvents
* Gets events for all campaigns


This is where the real work is done.
First we have to figure out which campaigns and events we want to download.
Then we have to figure out when we last downloaded, so that we don't fetch data we already have.

In [13]:
def getNewEvents(eventType, identity):
    """Download the events of one type for every campaign listed for it.

    Reads the matching rows of ``hs.RunStatistics``, empties
    ``hs.EmailEventTemp``, downloads each campaign's events into that table,
    prints one summary line per campaign and finally runs
    ``hs.FinalizeEvent`` for the event type.

    Args:
        eventType: Event type name to download.
        identity: Run identifier passed on to ``getCampaignEventsSince``.
    """
    # Bind eventType as a parameter (as the EXEC below already does) rather
    # than formatting it into the SQL text.
    sql = sqlalchemy.text("SELECT * FROM hs.RunStatistics WHERE eventType = :eventType")
    campaigns = pd.read_sql(sql, meta_engine, params={'eventType': eventType})

    curs.execute("TRUNCATE TABLE hs.EmailEventTemp")

    # loop through the campaigns for this eventType
    for index, row in campaigns.iterrows():
        numCalls, eventCount = getCampaignEventsSince(row['appId'],
                                                      row['campaignId'],
                                                      row['lastUpdatedTime'],
                                                      row['eventType'],
                                                      identity)

        output = "appId:%s\tcampaignId:%s\t%s:%s\texpected:%s\tcalls:%s" % (
            row['appId'],
            row['campaignId'],
            row['eventType'],
            eventCount,
            row['ExpectingAtLeast'],
            numCalls)
        print(output)

    curs.execute("EXEC hs.FinalizeEvent ?", eventType)

## Step 1: Setup the Download Run
I like to keep track of each run and when I did it

In [14]:
# Record this run, then read back the identity value generated for it;
# `ident` is the RunID attached to the data stored in the later steps.
curs.execute("INSERT INTO hs.DownloadRun(RunDate) VALUES(GETDATE())")
curs.execute("SELECT SCOPE_IDENTITY()")
ident = curs.fetchone()[0]

## Step 2: Get New Campaign Stats

In [15]:
# Download the statistics of every campaign (noisy=True prints progress)
numCalls, campaignStats = getAllCampaignsStatistics(noisy=True)
runningCallCount += numCalls

# Replace the contents of hs.CampaignStatistics with the fresh snapshot
statsFrame = dictToDF(campaignStats)
statsFrame.to_sql(name="CampaignStatistics",
                  schema="hs",
                  con=meta_engine,
                  index=False,
                  if_exists="replace",
                  dtype=columnsCampaignStatistics)

Campaign List Call Count: 1
Number of Campaigns: 260


## Step 3: Loop through the events we want to capture and download

In [16]:
# hs.DownloadableEventType lists the event types to download; run the
# download once per listed type, tagging the rows with this run's id.
sql = "SELECT * FROM hs.DownloadableEventType"
eventList = pd.read_sql(sql, meta_engine)

for eventType in eventList['eventType']:
    getNewEvents(eventType, ident)

appId:113	campaignId:48837341	click:1	expected:1	calls:1
appId:22843	campaignId:46643110	click:93	expected:82	calls:1
appId:2286	campaignId:46192935	click:258	expected:238	calls:1
appId:2286	campaignId:46186380	click:398	expected:360	calls:1
appId:113	campaignId:46169860	click:244	expected:224	calls:1
appId:113	campaignId:44396401	click:53	expected:33	calls:1
appId:113	campaignId:44392461	click:224	expected:145	calls:1
appId:22843	campaignId:43337453	click:77	expected:67	calls:1
appId:2286	campaignId:43183550	click:639	expected:584	calls:1
appId:2286	campaignId:43182828	click:81	expected:74	calls:1
appId:113	campaignId:43154241	click:67	expected:50	calls:1
appId:113	campaignId:42714048	click:21	expected:19	calls:1
appId:113	campaignId:42029398	click:182	expected:166	calls:1
appId:113	campaignId:42028889	click:126	expected:116	calls:1
appId:113	campaignId:42028740	click:93	expected:89	calls:1
appId:22843	campaignId:41942094	click:70	expected:60	calls:1
appId:113	campaignId:41681386	clic

## Step 4: Store Stats History
Finalize the data

In [17]:
# Append this run's campaign statistics, tagged with its RunID, to the
# history table.
df = dictToDF(campaignStats).assign(RunID=ident)

df.to_sql(name="CampaignStatisticsHistory",
          schema="hs",
          con=meta_engine,
          index=False,
          if_exists="append",
          dtype=columnsCampaignStatisticsHistory)