# DB2-Salesforce connector: Basic user information updates

In [1]:
# Parameters
import os

hours_range = 24 # number of hours to look back (used as timedelta(hours=...))

# API settings
bulk_api_threshold = 100 # if more records than this, use Bulk API instead
disallow_bulk_api = False # set to True to force the Composite API regardless of record count

composite_api_limit = 25 # maximum number of records being passed via Composite API
composite_url = '/services/data/v43.0/sobjects/Contact/nanoHUB_user_ID__c/'

api_url = '/services/data/v43.0/sobjects'
external_id = 'nanoHUB_user_ID__c'
object_id = 'Contact'

# Salesforce login parameters
# Secrets are read from environment variables instead of being stored in the notebook.
# The OAuth "password" value is the account password immediately followed by the
# account's security token, so the two are kept in separate variables and joined here.
sf_credential_env_vars = ("SF_CLIENT_ID", "SF_CLIENT_SECRET", "SF_USERNAME",
                          "SF_PASSWORD", "SF_SECURITY_TOKEN")

sf_login_params = {
    "grant_type": "password",
    "client_id": os.environ.get("SF_CLIENT_ID", ""),
    "client_secret": os.environ.get("SF_CLIENT_SECRET", ""),
    "username": os.environ.get("SF_USERNAME", ""),
    "password": os.environ.get("SF_PASSWORD", "") + os.environ.get("SF_SECURITY_TOKEN", ""),
}

# Warn early (rather than failing later with an opaque login error) when a variable is unset.
sf_missing_env_vars = [name for name in sf_credential_env_vars if not os.environ.get(name)]
if sf_missing_env_vars:
    print('[WARN] Missing Salesforce credential environment variables: %s'
          % ', '.join(sf_missing_env_vars))

# 

In [2]:
import pandas as pd
import datetime

## Obtain contacts from DB2 
that have a registration date within the range of interest specified by `hours_range`

In [3]:
# Depending on the task, use different query and cutoff

# Hourly update for new registrations
# The look-back is subtracted from the current *datetime* and only then truncated to a
# date. Subtracting a timedelta from a plain date ignores its sub-day part, which made
# any look-back shorter than a day (or one that crosses midnight) start at today's date
# and miss registrations from late on the previous day. The cutoff stays date-granular,
# so the query may return somewhat more than `hours_range` hours of rows, never fewer.
date_cutoff = (datetime.datetime.today() - datetime.timedelta(hours=hours_range))\
                    .date().strftime('%Y-%m-%d')
sql_query = "select id, name, username, block, email, sendEmail, registerDate, lastvisitDate \
                    from jos_users where registerDate >= '%s'" % date_cutoff

# TEST (whole table, no cutoff)
#sql_query = "select id, name, username, block, email, sendEmail, registerDate, lastvisitDate \
#                    from jos_users"
# display
print(sql_query)

select id, name, username, block, email, sendEmail, registerDate, lastvisitDate                     from jos_users where registerDate >= '2020-05-01'


In [4]:
# connect with DB2
import os
from urllib.parse import quote_plus

import sqlalchemy as sql

# Database credentials are read from the environment so that no password is stored in
# the notebook. Host and schema default to the values this notebook has always used.
db_user = os.environ.get('DB2_USER', '')
db_password = os.environ.get('DB2_PASSWORD', '')
db_host = os.environ.get('DB2_HOST', '127.0.0.1')
db_name = os.environ.get('DB2_NAME', 'nanohub')

# quote_plus keeps special characters in the user name / password from breaking the URL
engine = sql.create_engine('mysql+pymysql://%s:%s@%s/%s'
                           % (quote_plus(db_user), quote_plus(db_password), db_host, db_name))

# run the query built above and load the result into a DataFrame
df = pd.read_sql_query(sql_query, engine)

In [5]:
# display the first three fetched rows as a sanity check
# NOTE(review): the rendered output shows real names and e-mail addresses; clear it before sharing
df.head(3)

Unnamed: 0,id,name,username,block,email,sendEmail,registerDate,lastvisitDate
0,286506,Homero Gonzalez,hgonzalez42,0,hgonzalez42@students.tntech.edu,0,2020-05-01 00:13:42,2020-05-01 00:15:05
1,286507,Yejin KIM,yejin808,0,yejin808@gmail.com,0,2020-05-01 01:02:08,2020-05-01 01:02:09
2,286508,Ana Paula Lima Batista,-136577,0,-136577@invalid,0,2020-05-01 01:25:50,2020-05-01 01:25:51


## Match data with Salesforce format

#### Salesforce contact fields
API name : explanation

- firstname
- Middle_name__c
- lastname
- Email
- HasOptedOutOfEmail
- nanoHUB_account_BLOCKED__c
- nanoHUB_last_active_date__c
- nanoHUB_registration_date__c
- nanoHUB_user_ID__c
- nanoHUB_username__c

In [6]:
# split full name into first, middle, and last names
def split_full_name(this_name):
    """Split a full name into first, middle and last name.

    Parameters
    ----------
    this_name : str or None
        Full name; words are separated by whitespace (repeated, leading and
        trailing whitespace is ignored).

    Returns
    -------
    pandas.Series
        Always three positional entries ``[first, middle, last]``:

        * one word   -> ``[word, None, word]`` (the word serves as first and last name)
        * two+ words -> first word, the inner words joined by single spaces
          (``''`` when there are none), last word
        * missing, blank or non-string input -> ``[None, None, None]``, so that
          ``Series.apply`` still yields three columns for every row
    """
    empty_result = [None, None, None]

    # None / NaN / other non-string values cannot be split
    if not isinstance(this_name, str):
        return pd.Series(empty_result)

    name_parts = this_name.split()

    if not name_parts:
        # empty or whitespace-only name
        return pd.Series(empty_result)

    if len(name_parts) == 1:
        # single word name
        return pd.Series([name_parts[0], None, name_parts[0]])

    # multi word name
    return pd.Series([name_parts[0], ' '.join(name_parts[1:-1]), name_parts[-1]])

In [7]:
# Assemble the Salesforce-shaped frame, one API field per column.
# Missing values (NaN / NaT) are normalised while each column is built.
df_sf = pd.DataFrame()

# full name -> first / middle / last
name_fields = ['firstname', 'Middle_name__c', 'lastname']
df_sf[name_fields] = df['name'].apply(split_full_name)

# identifiers are copied through unchanged
df_sf['nanoHUB_user_ID__c'] = df['id']
df_sf['nanoHUB_username__c'] = df['username']

# missing addresses and addresses containing '@invalid' become an empty string
df_sf['Email'] = df['email'].fillna('').apply(lambda addr: addr if '@invalid' not in addr else '')

# sendEmail uses 1 = receive email, 0 = opt-out; HasOptedOutOfEmail is the exact opposite,
# so only sendEmail == 1 maps to 0 and every other value maps to 1
df_sf['HasOptedOutOfEmail'] = df['sendEmail'].apply(lambda flag: 1 if flag != 1 else 0)
df_sf['nanoHUB_account_BLOCKED__c'] = df['block'].fillna(0)

# time-related columns: keep only the date part and store it as a string ('' when missing)
for sf_field, db_field in (('nanoHUB_registration_date__c', 'registerDate'),
                           ('nanoHUB_last_active_date__c', 'lastvisitDate')):
    df_sf[sf_field] = df[db_field].dt.date.fillna('').astype('str')

# remember the field columns before any later cell adds helper columns
sf_original_fields = df_sf.columns

# display
df_sf.head(2).T

Unnamed: 0,0,1
firstname,Homero,Yejin
Middle_name__c,,
lastname,Gonzalez,KIM
nanoHUB_user_ID__c,286506,286507
nanoHUB_username__c,hgonzalez42,yejin808
Email,hgonzalez42@students.tntech.edu,yejin808@gmail.com
HasOptedOutOfEmail,1,1
nanoHUB_account_BLOCKED__c,0,0
nanoHUB_registration_date__c,2020-05-01,2020-05-01
nanoHUB_last_active_date__c,2020-05-01,2020-05-01


## To Salesforce Sales Cloud CRM

In [8]:
# Salesforce connected-app credentials (username, security token, consumer key and
# consumer secret) are deliberately not recorded in this notebook.
# Keep them in a secret store or in environment variables. Any value that was ever
# written into a saved copy of this notebook should be treated as exposed and rotated.

In [56]:
import requests

# obtain access token
# The login parameters go in the form-encoded POST body (data=) rather than the URL
# query string (params=), so the secrets do not end up in URLs that get logged.
response = requests.post("https://login.salesforce.com/services/oauth2/token", data=sf_login_params)

if not response.ok:
    # Fail with Salesforce's own error text instead of a KeyError on 'access_token'
    raise RuntimeError('[FAIL] Salesforce login failed (HTTP %d): %s'
                       % (response.status_code, response.text))

access_token = response.json()['access_token']

In [None]:
# Ask Salesforce to open an ingest job: CSV upsert into Contact, matched on nanoHUB_user_ID__c
ingest_job_headers = {
    "Authorization": "Bearer %s" % access_token,
    'Content-Type': 'application/json; charset=UTF-8',
    'Accept': 'application/json',
}
ingest_job_spec = {
    "object": "Contact",
    "externalIdFieldName": 'nanoHUB_user_ID__c',
    "contentType": "CSV",
    "operation": "upsert",
}
response = requests.post('https://na172.salesforce.com/services/data/v43.0/jobs/ingest/',
                         headers=ingest_job_headers,
                         json=ingest_job_spec)

In [38]:
# Look up the Contact Id of a single nanoHUB username through the REST query endpoint
contact_lookup_soql = "select id from contact where nanoHUB_username__c = 'wang159'"
response = requests.get('https://na172.salesforce.com/services/data/v43.0/query/',
                        params={"q": contact_lookup_soql},
                        headers={"Authorization": "Bearer {}".format(access_token)})

In [39]:
# show the parsed JSON body of the most recent response
response.json()

{'totalSize': 1,
 'done': True,
 'records': [{'attributes': {'type': 'Contact',
    'url': '/services/data/v43.0/sobjects/Contact/0035w000034JEs4AAG'},
   'Id': '0035w000034JEs4AAG'}]}

In [49]:
# Single-record PATCH against the sobjects endpoint for contact_citation_asso__c,
# addressed by field `name` with value `test_api`; the body links a Contact to a Citation.
# NOTE(review): despite the original label this does not call the /composite resource.
association_headers = {
    "Authorization": "Bearer %s" % access_token,
    'Content-Type': 'application/json; charset=UTF-8',
    'Accept': 'application/json',
}
association_fields = {
    "Contact__c": '0035w000034JEs4AAG',
    "Citation__c": 'a0t5w000008oQILAA2',
}
response = requests.patch('https://na172.salesforce.com/services/data/v43.0/sobjects/contact_citation_asso__c/name/test_api',
                          headers=association_headers,
                          json=association_fields)

In [50]:
# show the parsed JSON body of the most recent response
response.json()

{'id': 'a0u5w00000Rh91pAAB', 'success': True, 'errors': []}

In [111]:
# bulk get
# Open a query job that returns Id and nanoHUB_user_ID__c for every Contact that has a nanoHUB user ID
query_job_headers = {
    "Authorization": "Bearer %s" % access_token,
    'Content-Type': 'application/json; charset=UTF-8',
    'Accept': 'application/json',
}
query_job_spec = {
    "query": "SELECT Id, nanoHUB_user_ID__c FROM Contact where nanoHUB_user_ID__c != NULL",
    "operation": "query",
}
response = requests.post('https://na172.salesforce.com/services/data/v47.0/jobs/query',
                         headers=query_job_headers,
                         json=query_job_spec)

In [112]:
# Stop here if Salesforce rejected the job request; otherwise remember the job ID.
if not response.ok:
    # job request not successful
    print('[FAIL] Bulk job creation failed ...')
    # A bare `raise` has no active exception to re-raise and only produces
    # "RuntimeError: No active exception to reraise"; raise the same exception type
    # with the HTTP status and Salesforce's error body instead.
    raise RuntimeError('Bulk job creation failed (HTTP %d): %s'
                       % (response.status_code, response.text))
else:
    # job request successful
    print('[Success] Bulk job creation successful. Job ID = %s'%response.json()['id'])

job_id = response.json()['id']

[Success] Bulk job creation successful. Job ID = 7505w00000LMxDsAAL


In [113]:
# show the raw body of the most recent response
response.text

'{"id":"7505w00000LMxDsAAL","operation":"query","object":"Contact","createdById":"0055w00000ArpYvAAJ","createdDate":"2020-05-02T23:00:40.000+0000","systemModstamp":"2020-05-02T23:00:40.000+0000","state":"UploadComplete","concurrencyMode":"Parallel","contentType":"CSV","apiVersion":47.0,"lineEnding":"LF","columnDelimiter":"COMMA"}'

In [119]:
# monitor result
# fetch the job record once and show its current state
query_job_status_url = 'https://na172.salesforce.com/services/data/v47.0/jobs/query/%s' % job_id
response = requests.get(query_job_status_url,
                        headers={"Authorization": "Bearer %s" % access_token})

display(response.json()['state'])

'JobComplete'

In [87]:
# get result
# download the results of the query job identified by job_id
# NOTE(review): the Accept header asks for JSON although the body is later parsed as CSV — confirm
query_results_url = 'https://na172.salesforce.com/services/data/v47.0/jobs/query/%s/results' % job_id
query_results_headers = {
    "Authorization": "Bearer %s" % access_token,
    'Content-Type': 'application/json; charset=UTF-8',
    'Accept': 'application/json',
}
response = requests.get(query_results_url, headers=query_results_headers)

In [104]:
from io import StringIO


# wrap the response text in a file-like object so pandas can read it as CSV
TESTDATA = StringIO(response.text)

# parse the CSV text into a DataFrame
nanoHUB_user_ID_df = pd.read_csv(TESTDATA)

In [107]:
# count how often each nanoHUB user ID occurs (a count above 1 would indicate duplicate contacts)
# NOTE(review): the IDs are displayed as floats (e.g. 145436.0) — confirm that is acceptable downstream
nanoHUB_user_ID_df.nanoHUB_user_ID__c.value_counts()

145436.0    1
118959.0    1
274728.0    1
86051.0     1
90228.0     1
           ..
48729.0     1
16880.0     1
54108.0     1
285012.0    1
65536.0     1
Name: nanoHUB_user_ID__c, Length: 216165, dtype: int64

In [None]:
# Upload df_sf through an already requested ingest job: check the job request, upload
# the CSV, then close the job. `response` must hold the job-creation response on entry.
# Each failure raises RuntimeError with the HTTP status and Salesforce's error body;
# the previous bare `raise` could only report "No active exception to reraise".
if not response.ok:
    # job request not successful
    print('[FAIL] Bulk job creation failed ...')
    raise RuntimeError('Bulk job creation failed (HTTP %d): %s'
                       % (response.status_code, response.text))

# job request successful
job_id = response.json()['id']
print('[Success] Bulk job creation successful. Job ID = %s'%job_id)

# Save dataframe into CSV. Using Salesforce Bulk 2.0 API, CSV file should not exceed 150 MB
# Only the Salesforce field columns are exported, so helper columns added to df_sf by the
# Composite cell cannot leak into the upload. Characters that cannot be encoded as UTF-8
# are dropped (the former encode/decode/encode round-trip did not drop anything).
bulk_csv = df_sf[sf_original_fields].to_csv(index=False).encode('utf-8', 'ignore')

# Put CSV content to bulk job
response = requests.put('https://na172.salesforce.com/services/data/v43.0/jobs/ingest/%s/batches/'%job_id, 
                        headers={"Authorization": "Bearer %s" %access_token, 
                                 'Content-Type': 'text/csv',
                                 'Accept': 'application/json'},
                        data = bulk_csv
                        )

if not response.ok:
    # CSV upload not successful
    print('[FAIL] CSV upload failed ...')
    raise RuntimeError('CSV upload failed for job %s (HTTP %d): %s'
                       % (job_id, response.status_code, response.text))

# CSV upload successful
print('[Success] CSV upload successful. Job ID = %s'%job_id)

# Close the job, so Salesforce can start processing data
response = requests.patch('https://na172.salesforce.com/services/data/v43.0/jobs/ingest/%s'%job_id,
                    headers={"Authorization": "Bearer %s" %access_token, 
                             'Content-Type': 'application/json; charset=UTF-8',
                             'Accept': 'application/json'},
                    json={
                            "state" : "UploadComplete"
                    })  

if not response.ok:
    # job close not successful
    print('[FAIL] Closing job failed ...')
    raise RuntimeError('Closing job %s failed (HTTP %d): %s'
                       % (job_id, response.status_code, response.text))

# job close successful
print('[Success] Closing job successful. Job ID = %s'%job_id)

In [None]:
# NOTE(review): bare name with no definition visible above — evaluating it raises NameError.
# Presumably a deliberate stop so that "Run All" halts before the cells below; confirm or remove.
error

In [503]:
# Pick the upload route: Composite when the batch is at most bulk_api_threshold records
# or when the Bulk API is disallowed; Bulk otherwise.
use_composite = df_sf.shape[0] <= bulk_api_threshold or disallow_bulk_api
api_option = 'composite' if use_composite else 'bulk'

print('%s API is selected' %api_option.capitalize())

Composite API is selected


In [504]:
# Composite API: form JSON composite input, send it in chunks, and report failures
if api_option == 'composite':
    # prepare composite JSON fields: one PATCH sub-request per contact, addressed by its
    # external ID in the URL; the external ID itself is left out of the body
    df_sf['body'] = df_sf[sf_original_fields].drop('nanoHUB_user_ID__c', axis=1)\
                         .fillna('').to_dict(orient='records')
    df_sf['method'] = 'PATCH'
    df_sf['url'] = df_sf.nanoHUB_user_ID__c.apply(lambda x: composite_url+str(x))
    df_sf['referenceId'] = df_sf.nanoHUB_user_ID__c.apply(lambda x: 'Contact_'+str(x))

    record_list = df_sf[['method', 'url', 'referenceId', 'body']].to_dict(orient='records')

    record_total = len(record_list) # total number of records
    print('%d total records are found.' %record_total)

    record_index = 0
    response_list = list()
    failed_reference_ids = list() # referenceIds of sub-requests that were not accepted

    while record_index < record_total:
        # form JSON for composite API call (at most composite_api_limit records per call)
        composite_json = {'compositeRequest': record_list[record_index:(record_index+composite_api_limit)]}
        chunk_reference_ids = [record['referenceId'] for record in composite_json['compositeRequest']]

        # increase record index position
        record_index = record_index + composite_api_limit

        # Composite REST call to Salesforce
        response = requests.post('https://na172.salesforce.com/services/data/v43.0/composite', 
                            headers={"Authorization": "Bearer %s" %access_token},
                            json=composite_json)

        # save response
        response_list.append(response)

        # The upload stays best-effort (later chunks are still sent), but failures are
        # now recorded and reported instead of passing silently.
        if not response.ok:
            # the whole call was rejected: every record of this chunk failed
            print('[FAIL] Composite call failed (HTTP %d): %s' % (response.status_code, response.text))
            failed_reference_ids.extend(chunk_reference_ids)
            continue

        try:
            sub_responses = response.json().get('compositeResponse', [])
        except (ValueError, AttributeError):
            # body is not the expected JSON object; outcome of this chunk is unknown
            print('[FAIL] Composite call returned an unexpected body: %s' % response.text)
            failed_reference_ids.extend(chunk_reference_ids)
            continue

        # a successful call can still carry failed sub-requests
        for sub_response in sub_responses:
            if sub_response.get('httpStatusCode', 0) >= 400:
                failed_reference_ids.append(sub_response.get('referenceId'))

    if failed_reference_ids:
        print('[FAIL] %d of %d records were not accepted, e.g. %s'
              % (len(failed_reference_ids), record_total, failed_reference_ids[:10]))
    else:
        print('[Success] All %d records were accepted.' % record_total)

100 total records are found.


In [505]:
# Bulk API: create an ingest job, upload df_sf as CSV, then close the job.
# Each failure raises RuntimeError with the HTTP status and Salesforce's error body;
# the previous bare `raise` could only report "No active exception to reraise".
if api_option == 'bulk':
    
    # Issuing a job request
    response = requests.post('https://na172.salesforce.com/services/data/v43.0/jobs/ingest/', 
                        headers={"Authorization": "Bearer %s" %access_token, 
                                 'Content-Type': 'application/json; charset=UTF-8',
                                 'Accept': 'application/json'},
                        json={
                                "object" : "Contact",
                                "externalIdFieldName" : 'nanoHUB_user_ID__c',
                                "contentType" : "CSV",
                                "operation" : "upsert"
                        })    
    
    if not response.ok:
        # job request not successful
        print('[FAIL] Bulk job creation failed ...')
        raise RuntimeError('Bulk job creation failed (HTTP %d): %s'
                           % (response.status_code, response.text))
    
    # job request successful
    job_id = response.json()['id']
    print('[Success] Bulk job creation successful. Job ID = %s'%job_id)
    
    # Save dataframe into CSV. Using Salesforce Bulk 2.0 API, CSV file should not exceed 150 MB
    # Only the Salesforce field columns are exported, so helper columns added to df_sf by
    # the Composite cell cannot leak into the upload. Characters that cannot be encoded as
    # UTF-8 are dropped (the former encode/decode/encode round-trip did not drop anything).
    bulk_csv = df_sf[sf_original_fields].to_csv(index=False).encode('utf-8', 'ignore')
    
    # Put CSV content to bulk job
    response = requests.put('https://na172.salesforce.com/services/data/v43.0/jobs/ingest/%s/batches/'%job_id, 
                            headers={"Authorization": "Bearer %s" %access_token, 
                                     'Content-Type': 'text/csv',
                                     'Accept': 'application/json'},
                            data = bulk_csv
                            )
    
    if not response.ok:
        # CSV upload not successful
        print('[FAIL] CSV upload failed ...')
        raise RuntimeError('CSV upload failed for job %s (HTTP %d): %s'
                           % (job_id, response.status_code, response.text))
    
    # CSV upload successful
    print('[Success] CSV upload successful. Job ID = %s'%job_id)
    
    # Close the job, so Salesforce can start processing data
    response = requests.patch('https://na172.salesforce.com/services/data/v43.0/jobs/ingest/%s'%job_id,
                        headers={"Authorization": "Bearer %s" %access_token, 
                                 'Content-Type': 'application/json; charset=UTF-8',
                                 'Accept': 'application/json'},
                        json={
                                "state" : "UploadComplete"
                        })  
    
    if not response.ok:
        # job close not successful
        print('[FAIL] Closing job failed ...')
        raise RuntimeError('Closing job %s failed (HTTP %d): %s'
                           % (job_id, response.status_code, response.text))
    
    # job close successful
    print('[Success] Closing job successful. Job ID = %s'%job_id)
    

In [506]:
# Bulk API: fetch the ingest job record once and show it
if api_option == 'bulk':

    # check status
    ingest_job_status_url = 'https://na172.salesforce.com/services/data/v43.0/jobs/ingest/%s' % job_id
    response = requests.get(ingest_job_status_url,
                            headers={"Authorization": "Bearer %s" % access_token})

    display(response.json())