## 1. Load packages and set global variables

In [18]:
import os
import pandas as pd
from datetime import datetime as dt
import requests
import json
import database as db # our script for simpler db integration
from config import config # pull sensitive data from .ini files

# directory holding local data files, resolved against the current working directory
data_path = "{}/Data/".format(os.getcwd())

# open the Postgres connection that is passed to every db helper call
conn = db.connect()

## 2. Create tables for database

Think about schema design, primary/foreign keys, datatypes, etc.

#### Endpoint tracker

Track the next pagination offset (`next_start`) for each API endpoint. This lets every call resume from that position instead of reloading the full dataset.

In [325]:
# Rebuild the tracker table from scratch. IF EXISTS keeps the cell runnable on a
# fresh database where the table has not been created yet.
db.execute_query(conn, "drop table if exists endpoint_tracker")
db.execute_query(conn, "create table endpoint_tracker(endpoint_id serial primary key, endpoint_nme varchar(30), next_start int, update_dtm timestamp default current_timestamp)")

0

In [326]:
# seed one tracker row per endpoint, each starting at offset 0
endpoint_names = ['deals', 'organizations', 'persons', 'products']
product_tracker = pd.DataFrame({
    'endpoint_nme': endpoint_names,
    'next_start': [0] * len(endpoint_names),
})
db.execute_values(conn, product_tracker, 'endpoint_tracker')

# read the rows back to confirm the tracker contents
db.execute_query(conn, "select * from endpoint_tracker")

execute_values() done


[(1, 'deals', 0, datetime.datetime(2020, 9, 17, 22, 46, 21, 86157)),
 (2, 'organizations', 0, datetime.datetime(2020, 9, 17, 22, 46, 21, 86157)),
 (3, 'persons', 0, datetime.datetime(2020, 9, 17, 22, 46, 21, 86157)),
 (4, 'products', 0, datetime.datetime(2020, 9, 17, 22, 46, 21, 86157))]

#### Products

In [327]:
# Rebuild the products table. IF EXISTS keeps the cell runnable on a fresh
# database where the table has not been created yet.
db.execute_query(conn, "drop table if exists products")
db.execute_query(conn, "create table products(product_id int primary key, product_nme varchar(50), product_cde varchar(10), product_dsc varchar(100), active_ind boolean, owner_id int, owner_nme varchar(50), insert_dte date, update_dte date, price_id int, price_amt numeric(9,2))")

0

#### Persons

In [355]:
# Rebuild the persons table. IF EXISTS keeps the cell runnable on a fresh
# database where the table has not been created yet.
db.execute_query(conn, "drop table if exists persons")
db.execute_query(conn, """
create table persons (
    persons_id int primary key,
    first_nme varchar(100),
    last_nme varchar(100),
    open_cnt int,
    closed_cnt int,
    emails_cnt int,
    activities_cnt int,
    won_cnt int,
    lost_cnt int,
    active_ind boolean,
    update_dte date,
    insert_dte date,
    last_activity_dte date,
    last_inmail_dte date,
    last_outmail_dte date,
    org_nme varchar(100),
    owner_id int,
    owner_nme varchar(100),
    owner_active_ind boolean,
    org_active_ind boolean,
    org_id int
)
""")

0

#### Organizations

#### Deals

## 3. Extract data from Pipedrive

deals

organizations

persons

products

In [252]:
def get_data(endpoint, start, limit):
    """Fetch one page of records for a Pipedrive endpoint.

    Parameters
    ----------
    endpoint : str
        Endpoint name appended to the configured ``company_domain`` base URL.
    start : int
        Pagination offset to read from. A one-element tuple/list (a raw
        tracker row) is also accepted.
    limit : int
        Maximum number of records requested for the page.

    Returns
    -------
    tuple
        ``(data, next_start)``. ``data`` is the ``data`` entry of the JSON
        response (``None`` when the API returned no records). ``next_start``
        is the offset to resume from: the API-provided ``next_start`` when
        more items remain, ``start`` unchanged when nothing was returned,
        otherwise ``start`` advanced by the number of records received.

    Raises
    ------
    requests.HTTPError
        If the response status code is not 200.
    """
    params = config(section='pipedrive')

    # company_domain is the base URL, not an API parameter, so it is kept out
    # of the query string; every other config entry is still sent as before.
    query = dict(params)
    query.pop('company_domain', None)
    query.update({'start': start, 'limit': limit})

    # a timeout prevents the notebook from hanging forever on a stalled connection
    response = requests.get(params['company_domain'] + endpoint, params=query, timeout=30)

    if response.status_code != 200:
        print(response.status_code)
        # fail loudly instead of falling through to an undefined json_msg
        raise requests.HTTPError(
            'Pipedrive request for {} failed with status {}'.format(endpoint, response.status_code),
            response=response,
        )

    json_msg = response.json()
    print('Extraction of {} data from Pipedrive complete.'.format(endpoint))

    data = json_msg.get('data')
    pagination = (json_msg.get('additional_data') or {}).get('pagination') or {}

    if pagination.get('more_items_in_collection'):
        next_start = pagination['next_start']
    elif data is None:
        # nothing new: hand the caller's position back untouched
        next_start = start
    else:
        # last page: continue from the starting offset rather than restarting
        # the count at len(data); unwrap a one-element row before the arithmetic
        offset = start[0] if isinstance(start, (tuple, list)) else start
        next_start = offset + len(data)

    return data, next_start

### Products

In [64]:
def products_to_df(json_data, dtm_to_dte=True):
    """Flatten Pipedrive product records into a frame shaped like the products table.

    Parameters
    ----------
    json_data : list of dict
        Product records; each carries a nested ``owner_id`` dict and a
        ``prices`` list.
    dtm_to_dte : bool, default True
        When True the insert/update timestamps are reduced to plain dates.

    Returns
    -------
    pandas.DataFrame
        One row per product/price pair, using the database column names.
    """
    # source field -> database column, in output order
    product_columns = {
        'id': 'product_id',
        'name': 'product_nme',
        'code': 'product_cde',
        'description': 'product_dsc',
        'active_flag': 'active_ind',
        'owner_id.id': 'owner_id',
        'owner_id.name': 'owner_nme',
        'add_time': 'insert_dte',
        'update_time': 'update_dte',
    }
    price_columns = {'id': 'price_id', 'price': 'price_amt'}

    # product-level attributes
    flat = pd.json_normalize(json_data)
    products = flat[list(product_columns)].rename(columns=product_columns)

    # price records live in the nested 'prices' list of each product
    price_rows = pd.json_normalize(json_data, 'prices')[['id', 'price', 'product_id']]
    price_rows = price_rows.rename(columns=price_columns)

    # attach the price columns to their product via product_id
    products = products.join(price_rows.set_index('product_id'), on='product_id')

    # parse the timestamps, optionally keeping only the date part
    for col in ('insert_dte', 'update_dte'):
        parsed = pd.to_datetime(products[col])
        products[col] = parsed.dt.date if dtm_to_dte else parsed

    return products

In [262]:
def etl(endpoint, limit=500):
    """Extract one page from a Pipedrive endpoint, transform it and load it.

    The endpoint's tracker position is advanced only after the transform and
    load calls have run, so an exception raised earlier leaves the stored
    position untouched.

    Parameters
    ----------
    endpoint : str
        One of 'products', 'persons', 'deals', 'organizations'. Only
        'products' and 'persons' have a transform defined.
    limit : int, default 500
        Page size passed through to ``get_data``.

    Returns
    -------
    bool
        True when a page was fetched and handed to the database loader,
        False when nothing was loaded (invalid or unsupported endpoint,
        missing tracker row, or no more data).
    """
    if endpoint not in ['products','persons','deals','organizations']:
        print('{} is not a valid endpoint. Please try again.'.format(endpoint))
        return False

    # bail out before fetching or moving the tracker for endpoints without a transform
    if endpoint not in ('products', 'persons'):
        print('no other workflows defined right now...')
        return False

    # find last start position (endpoint is restricted to the fixed list above,
    # so formatting it into the SQL text cannot inject arbitrary input)
    rows = db.execute_query(conn, "select next_start from endpoint_tracker where endpoint_nme = '{endpoint}'".format(endpoint=endpoint))
    if not rows:
        print('No tracker row found for {}.'.format(endpoint))
        return False
    # first column of the first row: the scalar offset, not the row tuple
    start_pos = rows[0][0]

    # read up to `limit` rows starting from the stored position
    json_data, next_pos = get_data(endpoint, start=start_pos, limit=limit)

    if json_data is None:
        print('No more {} data - tracker at position {}.'.format(endpoint, start_pos))
        return False

    # convert to df; the transform name is looked up only for the endpoint in use
    if endpoint == 'products':
        data = products_to_df(json_data)
    else:
        data = persons_to_df(json_data)

    print('Transform complete.')

    # write data to database
    # NOTE(review): execute_values appears to report errors by printing rather
    # than raising; if it returns a status, check it here before advancing.
    db.execute_values(conn, data, endpoint)

    print('Load to database complete.')

    # increment tracker to new start only once the page has been processed
    db.execute_query(conn, "update endpoint_tracker set next_start = {pos}, update_dtm = current_timestamp where endpoint_nme = '{endpoint}'".format(pos=next_pos, endpoint=endpoint))

    return True

In [329]:
# run the products workflow with the default limit of 500
etl('products')

Extraction of products data from Pipedrive complete.
Transform complete.
execute_values() done
Load to database complete.


In [356]:
# inspect the tracker rows to see the stored next_start positions
db.execute_query(conn, "select * from endpoint_tracker")

[(1, 'deals', 0, datetime.datetime(2020, 9, 17, 22, 46, 21, 86157)),
 (2, 'organizations', 0, datetime.datetime(2020, 9, 17, 22, 46, 21, 86157)),
 (4, 'products', 7, datetime.datetime(2020, 9, 17, 22, 46, 38, 13641)),
 (3, 'persons', 0, datetime.datetime(2020, 9, 17, 22, 52, 24, 590015))]

In [331]:
# display every row currently stored in the products table
db.execute_query(conn, "select * from products")

[(1,
  'Pro',
  'PRO',
  None,
  True,
  7316834,
  'Kirk Behrendt',
  datetime.date(2018, 12, 7),
  datetime.date(2020, 6, 16),
  1,
  Decimal('3700.00')),
 (3,
  'Inner Circle',
  'IC',
  None,
  True,
  7316834,
  'Kirk Behrendt',
  datetime.date(2019, 2, 5),
  datetime.date(2019, 2, 5),
  3,
  Decimal('0.00')),
 (4,
  'Dental Intel',
  'DI',
  None,
  True,
  7316834,
  'Kirk Behrendt',
  datetime.date(2019, 2, 5),
  datetime.date(2019, 2, 5),
  4,
  Decimal('0.00')),
 (5,
  'Connect',
  'CONNECT',
  None,
  True,
  7316834,
  'Kirk Behrendt',
  datetime.date(2019, 2, 5),
  datetime.date(2019, 2, 5),
  5,
  Decimal('0.00')),
 (6,
  'Academy',
  'ACADEMY',
  None,
  True,
  7316834,
  'Kirk Behrendt',
  datetime.date(2019, 2, 5),
  datetime.date(2019, 2, 5),
  6,
  Decimal('0.00')),
 (8,
  'Practice Assessment - Onsite',
  None,
  None,
  True,
  7316834,
  'Kirk Behrendt',
  datetime.date(2019, 6, 26),
  datetime.date(2019, 6, 26),
  8,
  Decimal('0.00')),
 (9,
  '5964',
  None,
  

### Persons

In [319]:
def persons_to_df(json_data, dtm_to_dte=True):
    """Flatten Pipedrive person records into a frame shaped like the persons table.

    Parameters
    ----------
    json_data : list of dict
        Person records; ``owner_id`` is a nested dict and ``org_id`` is either
        a nested dict or missing/None when the person has no organisation.
    dtm_to_dte : bool, default True
        When True the timestamp columns are reduced to plain dates.

    Returns
    -------
    pandas.DataFrame
        One row per person, using the database column names. Missing
        dates become 1900-01-01, missing ``org_nme`` becomes '', missing
        ``org_active_ind`` becomes False and missing ``org_id`` becomes -1.
    """
    # source field -> database column, in output order
    column_map = {
        'id':'persons_id',
        'first_name':'first_nme',
        'last_name':'last_nme',
        'open_deals_count':'open_cnt',
        'closed_deals_count':'closed_cnt',
        'email_messages_count':'emails_cnt',
        'activities_count':'activities_cnt',
        'won_deals_count':'won_cnt',
        'lost_deals_count':'lost_cnt',
        'active_flag':'active_ind',
        'update_time':'update_dte',
        'add_time':'insert_dte',
        'last_activity_date':'last_activity_dte',
        'last_incoming_mail_time':'last_inmail_dte',
        'last_outgoing_mail_time':'last_outmail_dte',
        'org_name':'org_nme',
        'owner_id.id':'owner_id',
        'owner_id.name':'owner_nme',
        'owner_id.active_flag':'owner_active_ind',
        'org_id.active_flag':'org_active_ind',
        'org_id.value':'org_id'
    }

    flat = pd.json_normalize(json_data)

    # The nested org_id.* columns only appear when at least one record carries
    # an org dict; add empty placeholders so a page without any organisation
    # is filled with the defaults below instead of raising KeyError.
    for optional_col, placeholder in (('org_id.active_flag', None), ('org_id.value', float('nan'))):
        if optional_col not in flat.columns:
            flat[optional_col] = placeholder

    persons = flat[list(column_map)].rename(columns=column_map)

    # convert to timestamps; results are assigned back because an in-place
    # fillna on a selected column does not reliably update the parent frame
    missing_date = pd.to_datetime('1900-01-01')
    for col in ['update_dte','insert_dte','last_activity_dte','last_inmail_dte','last_outmail_dte']:
        parsed = pd.to_datetime(persons[col]).fillna(missing_date)
        # conditionally strip timestamp
        persons[col] = parsed.dt.date if dtm_to_dte else parsed

    # handle missing values
    persons['org_nme'] = persons['org_nme'].fillna('')
    persons['org_active_ind'] = persons['org_active_ind'].fillna(False)
    persons['org_id'] = persons['org_id'].fillna(-1)

    return persons

#### Areas for improvement

Error handling on queries -- maybe return a specific number (-1) on failed query. Before moving on, check for -1.

If we encounter -1 returned at ANY TIME, we should immediately break out of the ETL function.

Should not increment position in endpoint_tracker if query or data pull is unsuccessful. Maybe push the increment later in the ETL?

In [403]:
# run the persons workflow once with an explicit limit of 500
etl('persons',limit=500)

Extraction of persons data from Pipedrive complete.
Transform complete.
Error: duplicate key value violates unique constraint "persons_pkey"
DETAIL:  Key (persons_id)=(359) already exists.

Load to database complete.


In [None]:
# iterate until we have all data
# NOTE(review): the loop never inspects etl()'s return value and has no break,
# so it keeps calling etl('persons') until the kernel is interrupted manually.
while True:
    etl('persons') # return value is not checked here

In [408]:
# highest persons_id currently stored in the persons table
db.execute_query(conn, 'select max(persons_id) from persons')

[(16187,)]

In [411]:
# spot-check a single persons row by its id
db.execute_query(conn, 'select * from persons where persons_id = 16169')

[(16169,
  'kim',
  'lien hoang',
  0,
  0,
  2,
  0,
  0,
  0,
  True,
  datetime.date(2020, 9, 17),
  datetime.date(2020, 9, 17),
  datetime.date(1900, 1, 1),
  datetime.date(2020, 4, 24),
  datetime.date(2020, 4, 24),
  '',
  7316834,
  'Kirk Behrendt',
  True,
  False,
  -1)]

In [407]:
# inspect the tracker rows again to see the stored next_start positions
db.execute_query(conn, 'select * from endpoint_tracker')

[(1, 'deals', 0, datetime.datetime(2020, 9, 17, 22, 46, 21, 86157)),
 (2, 'organizations', 0, datetime.datetime(2020, 9, 17, 22, 46, 21, 86157)),
 (4, 'products', 7, datetime.datetime(2020, 9, 17, 22, 46, 38, 13641)),
 (3, 'persons', 14682, datetime.datetime(2020, 9, 17, 22, 59, 4, 459471))]

### Deals

In [231]:
# pull a single deal (limit=1) from offset 0 and flatten it to inspect the available columns
deals, deals_start = get_data('deals',start=0,limit=1)
pd.json_normalize(deals)

Retrieved deals data from Pipedrive


Unnamed: 0,id,stage_id,title,value,currency,add_time,update_time,stage_change_time,active,deleted,...,e95d6b75fe35d641090f38583a71c3f782c0bb4e.email,e95d6b75fe35d641090f38583a71c3f782c0bb4e.phone,e95d6b75fe35d641090f38583a71c3f782c0bb4e.value,a7357ae350bb916a359a580ce5847b896b79cc97.name,a7357ae350bb916a359a580ce5847b896b79cc97.people_count,a7357ae350bb916a359a580ce5847b896b79cc97.owner_id,a7357ae350bb916a359a580ce5847b896b79cc97.address,a7357ae350bb916a359a580ce5847b896b79cc97.active_flag,a7357ae350bb916a359a580ce5847b896b79cc97.cc_email,a7357ae350bb916a359a580ce5847b896b79cc97.value
0,13040,65,Discovery Study Club Lecture,7900,USD,2019-03-29 02:29:30,2020-09-02 17:29:42,2019-05-31 18:11:53,True,False,...,"[{'label': 'work', 'value': 'dr.mark@realteeth...","[{'label': 'mobile', 'value': '412-996-2023', ...",14908,Discovery Study Club,2,7573109,,True,actdental-c8c401@pipedrivemail.com,459
