# Overview

This is for the second round of testing.  **Updated 18 August 2022**. 

- Next: We now have functions to match emails in Airtable to emails in Presto.  Some emails match multiple Presto records, so we still need to decide how to handle those multiple matches.
    


In [1]:
import pandas as pd
import numpy as np
import os #not sure if this works on the Airflow directory structure.  It should.  But, untested.
import json 
import requests
from airtable import airtable #You need to do a pip install airtable.  Page at https://pypi.org/project/airtable/
from collections import Counter #For getting unique values from a list.

#For presto access
import getpass
import prestodb

In [2]:
#Read the secrets file
def read_secrets(fn) -> dict:
    """Load a JSON secrets file and return its parsed contents.

    Returns an empty dict when the file does not exist.  Any other problem
    (for example malformed JSON) is raised to the caller.
    """
    path = os.path.join(fn)
    try:
        with open(path, mode='r') as handle:
            contents = handle.read()
    except FileNotFoundError:
        # A missing secrets file is treated as "no secrets".
        return {}
    return json.loads(contents)

In [3]:
#Note: the API base id is specific to the Airtable "base".  So, adjust the secrets and the ID to fit the specific Airtable base.
#The API key is an account-wide value so it shouldn't change.
#read_secrets returns an empty dict if the file is missing, so a missing file only shows up later as a KeyError.
secret = read_secrets('secrets2.json')

#secret3 is the username and password for the Presto connection.  Will be unique to the environment.
secret3 = read_secrets('secrets3.json')

In [4]:
#Start by downloading data from Airtable
#https://towardsdatascience.com/downloading-airtable-data-into-python-89e5c7107a24

#Get a table in the Airtable base
#see https://github.com/josephbestjames/airtable.py

#Airtable client for the base; the base id and API key are read from the 'secret' dict.
at = airtable.Airtable(secret['AIRTABLE_BASE_ID'], secret['AIRTABLE_API_KEY'])

In [5]:
#Note: each request returns only 1 page = 100 records.  So, we paginate by repeatedly calling 'get' until no 'offset' value is returned, which marks the last page.

def get_all_records(pages=None, offset=None, count=0):
    """Download every page of the Airtable table and collect the raw responses.

    Each call to ``at.get`` returns one page.  While the response contains an
    'offset' key there are more pages, so the request is repeated with that
    offset until a response arrives without one.

    Args:
        pages: list that each page (the dict returned by ``at.get``) is
            appended to, in order.  A new list is created when omitted.
        offset: Airtable offset token to start from; ``None`` starts at the
            first page.
        count: number of pages already fetched; only used for the progress
            output.

    Returns:
        The ``pages`` list with one entry appended per page fetched.
    """
    if pages is None:
        pages = []

    # Loop instead of recursing once per page, so a table with many pages
    # cannot run into Python's recursion limit.
    while True:
        count += 1
        at_dict = at.get(secret['AIRTABLE_TABLE_ID'], offset=offset)
        pages.append(at_dict)

        if 'offset' not in at_dict:  # no offset means this was the last page
            print(count, offset)
            return pages

        offset = at_dict['offset']
        if count % 10 == 0:
            print(count, offset)  # this is for debugging.  Remove in production version.

In [6]:
#For now, need to pass this blank list.  Might be a better way, like tuples or something.
#get_all_records appends every page to 'pages' and returns that same list,
#so 'all_pages' and 'pages' refer to the same object afterwards.
pages = []
all_pages = get_all_records(pages=pages)

10 itrmZ321S4i0qMOtO/rec5CIPLxq6bYu481
20 itrmZ321S4i0qMOtO/recAF6s9I2j3mzhvS
30 itrmZ321S4i0qMOtO/recEwzqnOjgu2o0KG
40 itrmZ321S4i0qMOtO/recJjydwQWSRIbgiI
50 itrmZ321S4i0qMOtO/recOcfKDipC4bsfHy
60 itrmZ321S4i0qMOtO/recTL79eQvCaxxuyl
70 itrmZ321S4i0qMOtO/recYDsbriissrNlUZ
80 itrmZ321S4i0qMOtO/recdNvox9c8RmC5oR
90 itrmZ321S4i0qMOtO/reciOH1mNu64vljT6
100 itrmZ321S4i0qMOtO/recnTkESbLehVRbdx
110 itrmZ321S4i0qMOtO/recsBWxnI0rkacrM7
120 itrmZ321S4i0qMOtO/recxECmofMwounhKc
126 itrmZ321S4i0qMOtO/reczl4Q8FE1JF0r91


In [7]:
# Report how many pages came back and how many records they hold in total.
L = len(all_pages)
print(f'{L} pages returned')

# Every page keeps its rows under the 'records' key.
p = sum(len(page['records']) for page in all_pages)

print(f'{p} records total')

#all_pages[120]['records'][0]['fields']

126 pages returned
12546 records total


In [8]:
#Collect one email address per Airtable record so the unique addresses can be counted.
#Only records that carry a 'Record ID' field are used.  Records without a Record ID are duplicates.

L = len(all_pages) #L is the number of "pages".
emails = {}
for page in all_pages:
    for record in page['records']:
        fields = record['fields']
        if 'Record ID' not in fields:
            continue
        address = fields['Email']
        airtable_id = record['id']
        #Keep the record only when its Record ID is longer than 5 characters.
        if len(fields['Record ID']) > 5:
            emails[airtable_id] = address

In [9]:
# Compare the number of collected emails with the number of distinct addresses.
print(len(emails))
lst = emails.values()
print(len(lst))
email_counts = Counter(lst)
print(len(email_counts))
# Fewer distinct addresses than records means some emails are duplicated.
# Iterating a Counter yields its keys, i.e. the distinct addresses.
unique_emails = tuple(email_counts)

12535
12535
12449


In [10]:
# (email, occurrences) pairs for every address that appears more than once.
dup_emails = [
    (address, occurrences)
    for address, occurrences in email_counts.items()
    if occurrences > 1
]

In [11]:
# Turn the list of (email, count) tuples into a DataFrame.  No column names are
# given, so the columns are labelled 0 (email) and 1 (count).
dup_emails = pd.DataFrame(dup_emails)

In [12]:
# Preview the first rows of the duplicate-email table.
dup_emails.head()

Unnamed: 0,0,1
0,archana.bhakkad@atlanticaviation.com,2
1,jfcmail@embarqmail.com,2
2,engrbilalmehrban@gmail.com,2
3,antunesleonardo12@gmail.com,2
4,fmroque10@gmail.com,3


In [10]:
# Various parameters and secrets needed to connect to Presto

# Host and port are hard-coded here; connect() reads them as module-level names.
presto_host='presto-default.prod.twilio.com'
presto_port=8443
# Credentials come from the 'secret3' dict; a missing key raises KeyError here.
username= secret3['username'] 
password= secret3['password'] 
# Alternative: prompt for the password interactively instead of reading it from the secrets.
#password=getpass.getpass()

In [11]:
def connect(username,password):
    '''Open a Presto connection for the given user and return (cursor, conn).

    Host and port are taken from the module-level presto_host and presto_port.
    The connection is not closed here, so the caller has to close conn.
    '''
    connection_args = dict(
        host=presto_host,
        port=presto_port,
        user=username,
        catalog='hive',
        schema='public',
        http_scheme='https',
        auth=prestodb.auth.BasicAuthentication(username, password),
    )
    conn = prestodb.dbapi.connect(**connection_args)
    return conn.cursor(), conn

In [12]:
# def get_emails(u,username,password,blank):
#     '''Step through a list of emails from Airtable and find them in Presto.
#        Some emails return multiple records which will be in a dataframe.
#        Concatenate those dataframes using the pd.concat function.  Return the concatenated dataframe.
#        This works, but might not be very efficient.
#     '''
#     count = 0
#     try:
#         cursor,conn = connect(username,password) #connect to database.
#     except:
#         print("Presto connection failed")
        
#     for email in u: 
#         count = count + 1
#         sql = "SELECT * FROM public.dim_contact WHERE email = '" + email + "'"
#         cursor.execute(sql)
#         df = pd.DataFrame(cursor.fetchall())
#         print(count, email,len(df))
#         if len(df) > 0:
#             blank = pd.concat([blank,df])
            
#     conn.close() #Close database connection
#     return(blank)
    

In [13]:
def run_sql(u,username,password,sql,returndf = True):
    '''
    Run one SQL statement against Presto.

    u is a collection of emails kept for the email-matching use case.  It is
    accepted so existing calls keep working, but it is not used here.
    username and password are handed to connect().
    sql is the statement to execute.
    returndf selects the result: True returns a DataFrame built from all
    fetched rows, False returns an empty tuple.

    If the connection cannot be opened, "Presto connection failed" is printed
    and the original exception is re-raised.  The connection is always closed
    before returning, also when the statement fails.
    '''
    try:
        cursor,conn = connect(username,password) #connect to database.
    except Exception:
        print("Presto connection failed")
        raise #without a cursor there is nothing to run, so do not continue.

    try:
        cursor.execute(sql)
        if returndf:
            return pd.DataFrame(cursor.fetchall())
        return ()
    finally:
        conn.close() #Close database connection


In [30]:
#Create a temporary table of emails.
#Each statement is sent on its own: that seems to work better than combining them,
#so the strings carry no trailing semicolons and hold exactly one statement each.

#Sample of the Airtable emails.  It is passed to run_sql, which does not use it yet.
um = unique_emails[0:10]

#NOTE(review): the GRANT statements name dx_analytics_platform_workspace while every
#other statement uses dx_workspace -- confirm which schema is intended.
setup_statements = [
    """
DROP TABLE IF EXISTS dx_workspace.pdb_emails
""",
    """
CREATE TABLE dx_workspace.pdb_emails (
    email varchar
    )
""",
    """
GRANT SELECT ON dx_analytics_platform_workspace.pdb_emails TO PUBLIC
""",
    """
GRANT INSERT ON dx_analytics_platform_workspace.pdb_emails TO PUBLIC
""",
    """
INSERT INTO dx_workspace.pdb_emails (email)
VALUES ('jayson.m.webb@twlo.com')
""",
]
for sql in setup_statements:
    run_sql(um, username, password, sql, returndf=False)

#Read the table back to check the insert.
sql = """
SELECT *
FROM dx_workspace.pdb_emails
"""
df = run_sql(um, username, password, sql, returndf=True)
df.head()

#drop it before session is over.
# sql = """
# DROP TABLE IF EXISTS dx_workspace.pdb_emails
# """
# run_sql(um, username, password, sql, returndf=False)
# rows = [(1,7,3000), (1,8,3500), (1,9,3900)]
# values = ', '.join(map(str, rows))
# sql = "INSERT INTO ... VALUES {}".format(values)

Unnamed: 0,0
0,jayson.m.webb@twlo.com


In [None]:
#Create a temporary table of emails.
#The statements used to be sent as one combined string with no separator between
#DROP/CREATE and INSERT/SELECT.  They are now sent one at a time, which seems to work better.

#Sample of the Airtable emails.  It is passed to run_sql, which does not use it yet.
um = unique_emails[0:10]

#NOTE(review): the GRANT statements name dx_analytics_platform_workspace while every
#other statement uses dx_workspace -- confirm which schema is intended.
statements = (
    "DROP TABLE IF EXISTS dx_workspace.pdb_emails",
    """
CREATE TABLE dx_workspace.pdb_emails (
    email varchar
    )
""",
    "GRANT SELECT ON dx_analytics_platform_workspace.pdb_emails TO PUBLIC",
    "GRANT INSERT ON dx_analytics_platform_workspace.pdb_emails TO PUBLIC",
    """
INSERT INTO dx_workspace.pdb_emails (email)
VALUES ('jayson.m.webb@twlo.com')
""",
)
for sql in statements:
    run_sql(um, username, password, sql, returndf=False)

#Read the table back to check the insert.
sql = """
SELECT *
FROM dx_workspace.pdb_emails
"""
df = run_sql(um, username, password, sql, returndf=True)
df.head()

#drop it before session is over.
# sql = """
# DROP TABLE IF EXISTS dx_workspace.pdb_emails
# """
# run_sql(um, username, password, sql, returndf=False)
# rows = [(1,7,3000), (1,8,3500), (1,9,3900)]
# values = ', '.join(map(str, rows))
# sql = "INSERT INTO ... VALUES {}".format(values)

ConnectTimeout: HTTPSConnectionPool(host='presto-default.prod.twilio.com', port=8443): Max retries exceeded with url: /v1/statement (Caused by ConnectTimeoutError(<urllib3.connection.HTTPSConnection object at 0x7fdd7e3feee0>, 'Connection to presto-default.prod.twilio.com timed out. (connect timeout=30.0)'))

In [39]:
#dup_emails.to_csv('dup_emails.csv')

In [None]:
#A field to update
#update_data = {"Email": "updated@viaAPI.com"}
#update_data

In [None]:
#Update the email address for a given record based on the record id (generated by AirTable)
#at.update(secret['AIRTABLE_TABLE_ID'], id, update_data)