# Data Cloud Batch Ingestion API

This notebook walks through ingesting comma-delimited data into a Data Cloud instance using the Bulk Ingestion API, which this notebook also calls the Batch Ingestion API.
- Author: Jose Sifontes
- Email: jsifontes@salesforce.com

**Resources:**
Most of the resources I used to develop this walkthrough:
- [Share Ingestion API Information with your Developer](https://help.salesforce.com/s/articleView?id=sf.c360_a_developer_required_information.htm&type=5)
- [Get Started with Ingestion API](https://developer.salesforce.com/docs/atlas.en-us.c360a_api.meta/c360a_api/c360a_api_get_started.htm)
- [Requirements for Ingestion API Schema File](https://help.salesforce.com/s/articleView?id=sf.c360_a_ingestion_api_schema_req.htm&type=5)
- [Bulk Ingestion Walkthrough](https://developer.salesforce.com/docs/atlas.en-us.c360a_api.meta/c360a_api/c360a_api_bulk_insert_example.htm)
- [Data Cloud Developers: Bulk Ingestion](https://developer.salesforce.com/docs/atlas.en-us.c360a_api.meta/c360a_api/c360a_api_bulk_ingestion.htm)
- [Load Data Programmatically with the Ingestion API](https://developer.salesforce.com/blogs/2023/07/load-data-programmatically-with-the-ingestion-api)
- [Using the Ingestion API to Load Data into Data Cloud](https://www.youtube.com/watch?v=3xWSVGcTORI)

## Ingestion Process

### Authentication Process

##### Import Packages

In [58]:
import requests
import json
import pandas as pd
import config

##### Define Salesforce OAuth endpoints

In [59]:
authorization_url = 'https://login.salesforce.com/services/oauth2/authorize'
token_url = 'https://login.salesforce.com/services/oauth2/token'

##### Define Salesforce Credentials

In [60]:
client_id = config.client_id
client_secret = config.client_secret
username = config.username
password = config.password
security_token = config.security_token
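
The `config` module imported above is not shown in this notebook. One possible way to supply it without hardcoding secrets is to read them from environment variables. The sketch below is only an illustration: the variable names (`SF_CLIENT_ID` and so on) and the `load_credentials` helper are hypothetical and not part of the original walkthrough.

```python
import os

# Hypothetical environment variable names; substitute whatever your setup defines.
REQUIRED_VARS = {
    "client_id": "SF_CLIENT_ID",
    "client_secret": "SF_CLIENT_SECRET",
    "username": "SF_USERNAME",
    "password": "SF_PASSWORD",
    "security_token": "SF_SECURITY_TOKEN",
}


def load_credentials(env=os.environ):
    """Return the credentials as a dict, raising if any variable is unset or empty."""
    missing = [var for var in REQUIRED_VARS.values() if not env.get(var)]
    if missing:
        raise KeyError(f"Missing environment variables: {', '.join(missing)}")
    return {name: env[var] for name, var in REQUIRED_VARS.items()}
```

A `config.py` built this way could then expose `client_id = load_credentials()["client_id"]` and so on, keeping the values themselves out of source control.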

##### Define Parameters Payload

In [61]:
payload = {
    'grant_type': 'password',
    'client_id': client_id,
    'client_secret': client_secret,
    'username': username,
    'password': password + security_token
}


##### Get Access Token

In [62]:
response = requests.post(token_url, data=payload)

##### Save and Print Instance and Access Token

In [63]:
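# Parse the JSON body once and keep the two values used by later cells.
token_response = response.json()
access_token = token_response['access_token']
instance_url = token_response['instance_url']
print("Access Token:", access_token)
print("Instance URL:", instance_url)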

Access Token: 00Dal000003O56w!AQEAQO1ZZscIRB1zgHQBuLne4rsuR5R5sf_Fas8UULN_1c.ZVGvRXtyZMe0un96ymeiiVMjvxs_KTcv2xaVF30FJu5HWYu_n
Instance URL: https://storm-4f9bcb560dac64.my.salesforce.com
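
If the credentials are wrong, the token response has no `access_token` key and the cell above fails with a `KeyError` that hides the real cause. A minimal sketch of a more defensive parse follows. The helper name `parse_token_response` is hypothetical, and the sketch assumes the failure body carries the standard OAuth 2.0 `error` and `error_description` fields.

```python
def parse_token_response(status_code, body):
    """Return (access_token, instance_url), or raise with the server's error text."""
    if status_code != 200 or "access_token" not in body:
        # OAuth 2.0 error bodies normally include 'error' and 'error_description'.
        reason = body.get("error_description") or body.get("error") or "unknown error"
        raise RuntimeError(f"Token request failed ({status_code}): {reason}")
    return body["access_token"], body["instance_url"]
```

With the `response` object from the request above, it could be called as `access_token, instance_url = parse_token_response(response.status_code, response.json())`.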


### Generate Data Cloud Token Exchange

##### Define URL

In [64]:
url = instance_url + '/services/a360/token'

##### Define Headers

In [65]:
headers = {
    'Content-Type': 'application/x-www-form-urlencoded'
}

##### Define Payload

In [66]:
payload = {
    'grant_type': 'urn:salesforce:grant-type:external:cdp',
    'subject_token': access_token,
    'subject_token_type': 'urn:ietf:params:oauth:token-type:access_token'
}

##### Generate and Print Out Token

In [67]:
response = requests.post(url, data=payload, headers=headers)
dc_access_token = response.json()
print(json.dumps(dc_access_token,indent=4, sort_keys=True))

{
    "access_token": "eyJraWQiOiJDT1JFLjAwRGFsMDAwMDAzTzU2dy4xNzEyNTkwNzQ0NjM2IiwidHlwIjoiSldUIiwiYWxnIjoiRVMyNTYifQ.eyJzdWIiOiJodHRwczovL2xvZ2luLnNhbGVzZm9yY2UuY29tL2lkLzAwRGFsMDAwMDAzTzU2d0VBQy8wMDVhbDAwMDAwMTdGelJBQVUiLCJzY3AiOiJjZHBfcHJvZmlsZV9hcGkgY2RwX2luZ2VzdF9hcGkgY2RwX2lkZW50aXR5cmVzb2x1dGlvbl9hcGkgY2RwX3NlZ21lbnRfYXBpIGNkcF9xdWVyeV9hcGkgY2RwX2FwaSIsImlzcyI6Imh0dHBzOi8vbG9naW4uc2FsZXNmb3JjZS5jb20vIiwib3JnSWQiOiIwMERhbDAwMDAwM081NnciLCJpc3N1ZXJUZW5hbnRJZCI6ImNvcmUvcHJvZC8wMERhbDAwMDAwM081NndFQUMiLCJzZmFwcGlkIjoiM01WRzlIX0tWczZWOUxpTnhDU0d6bk9YdXdOV1hoU2N1clRsYTRmaE5maUt0blJFcUNPMDh6M2pndlJqaXhUdHU4M3VwdWlYSXJpREpjbEdueWdBRyIsImF1ZGllbmNlVGVuYW50SWQiOiJhMzYwL3Byb2QvMmRhNjgxYzNhYTYwNDYxYmIxYjRmZjBiNGRhOWNmOGUiLCJjdXN0b21fYXR0cmlidXRlcyI6eyJkYXRhc3BhY2UiOiJkZWZhdWx0In0sImF1ZCI6ImFwaS5hMzYwLnNhbGVzZm9yY2UuY29tIiwibmJmIjoxNzE2MzA4MTkzLCJzZm9pZCI6IjAwRGFsMDAwMDAzTzU2dyIsInNmdWlkIjoiMDA1YWwwMDAwMDE3RnpSIiwiZXhwIjoxNzE2Mzk0NTkwLCJpYXQiOjE3MTYzMDgyNTMsImp0aSI6IjAyYWIwNjFhLTBlNWEtNDIyNy

##### Save Token into a Variable

In [68]:
dc_token = dc_access_token['access_token']
print(dc_access_token['access_token'])

eyJraWQiOiJDT1JFLjAwRGFsMDAwMDAzTzU2dy4xNzEyNTkwNzQ0NjM2IiwidHlwIjoiSldUIiwiYWxnIjoiRVMyNTYifQ.eyJzdWIiOiJodHRwczovL2xvZ2luLnNhbGVzZm9yY2UuY29tL2lkLzAwRGFsMDAwMDAzTzU2d0VBQy8wMDVhbDAwMDAwMTdGelJBQVUiLCJzY3AiOiJjZHBfcHJvZmlsZV9hcGkgY2RwX2luZ2VzdF9hcGkgY2RwX2lkZW50aXR5cmVzb2x1dGlvbl9hcGkgY2RwX3NlZ21lbnRfYXBpIGNkcF9xdWVyeV9hcGkgY2RwX2FwaSIsImlzcyI6Imh0dHBzOi8vbG9naW4uc2FsZXNmb3JjZS5jb20vIiwib3JnSWQiOiIwMERhbDAwMDAwM081NnciLCJpc3N1ZXJUZW5hbnRJZCI6ImNvcmUvcHJvZC8wMERhbDAwMDAwM081NndFQUMiLCJzZmFwcGlkIjoiM01WRzlIX0tWczZWOUxpTnhDU0d6bk9YdXdOV1hoU2N1clRsYTRmaE5maUt0blJFcUNPMDh6M2pndlJqaXhUdHU4M3VwdWlYSXJpREpjbEdueWdBRyIsImF1ZGllbmNlVGVuYW50SWQiOiJhMzYwL3Byb2QvMmRhNjgxYzNhYTYwNDYxYmIxYjRmZjBiNGRhOWNmOGUiLCJjdXN0b21fYXR0cmlidXRlcyI6eyJkYXRhc3BhY2UiOiJkZWZhdWx0In0sImF1ZCI6ImFwaS5hMzYwLnNhbGVzZm9yY2UuY29tIiwibmJmIjoxNzE2MzA4MTkzLCJzZm9pZCI6IjAwRGFsMDAwMDAzTzU2dyIsInNmdWlkIjoiMDA1YWwwMDAwMDE3RnpSIiwiZXhwIjoxNzE2Mzk0NTkwLCJpYXQiOjE3MTYzMDgyNTMsImp0aSI6IjAyYWIwNjFhLTBlNWEtNDIyNy04MDU2LWI3OWZlYjdiYzk3M
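
The Data Cloud token printed above is a JWT, so its claims (for example the `exp` expiry timestamp) can be inspected locally. The sketch below decodes the payload segment using only the standard library. It does not verify the signature, so it is suitable for debugging only, and the helper name `decode_jwt_claims` is hypothetical.

```python
import base64
import json


def decode_jwt_claims(token):
    """Decode the payload segment of a JWT WITHOUT verifying its signature."""
    payload_b64 = token.split(".")[1]
    # JWT segments are base64url-encoded without padding; restore it before decoding.
    payload_b64 += "=" * (-len(payload_b64) % 4)
    return json.loads(base64.urlsafe_b64decode(payload_b64))
```

Calling `decode_jwt_claims(dc_token)["exp"]` on the full token held in the variable (not the truncated text shown in the cell output) would give the expiry as a Unix timestamp.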

### Batch Ingestion API Process

##### Define Header for Batch Ingestion API

In [69]:
headers = {
    'Authorization': f'Bearer {dc_token}',
    'Content-Type': 'application/json'
}

##### Define Batch Ingestion API Endpoint URL

In [70]:
url = "https://gy4tqnr-mqzwcmrvmnsdqyzzh1.c360a.salesforce.com/api/v1/ingest/jobs"
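
The tenant hostname above is hardcoded and repeated in later cells. A small helper can build each endpoint from a single hostname instead. This is a sketch, not part of the original notebook: `build_jobs_url` is a hypothetical name, and it assumes the hostname is available as a string. The token exchange response normally includes an `instance_url` field naming the tenant, but that part of the output is truncated above, so treat this as an assumption to verify.

```python
def build_jobs_url(tenant_host, job_id=None, batches=False):
    """Build the ingest jobs URL, optionally for one job or its batches endpoint."""
    host = tenant_host.strip().rstrip("/")
    # Accept a bare hostname as well as a full URL.
    if not host.startswith(("http://", "https://")):
        host = "https://" + host
    url = f"{host}/api/v1/ingest/jobs"
    if job_id:
        url += f"/{job_id}"
        if batches:
            url += "/batches"
    return url
```

For example, `build_jobs_url(tenant_host)` gives the job creation endpoint, and `build_jobs_url(tenant_host, JobID, batches=True)` gives the upload endpoint used later.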

### Creating Batch Ingestion Job

The Batch Ingestion process consists of 3 parts:
- Create a Job
- Upload to the Job
- Close Job
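
The three parts listed above can be sketched as one function before walking through them cell by cell. This is an illustrative outline under stated assumptions rather than the notebook's own code: `run_bulk_job` is a hypothetical helper, and `session` is assumed to be a `requests.Session()` or any object exposing the same `post`, `put`, and `patch` methods.

```python
import json


def run_bulk_job(session, jobs_url, token, object_name, source_name, csv_bytes):
    """Create a job, upload one CSV batch, then mark the upload complete."""
    auth = {"Authorization": f"Bearer {token}"}
    json_headers = {**auth, "Content-Type": "application/json"}

    # 1. Create the job and read back its ID.
    create = session.post(
        jobs_url,
        headers=json_headers,
        data=json.dumps(
            {"object": object_name, "sourceName": source_name, "operation": "upsert"}
        ),
    )
    create.raise_for_status()
    job_id = create.json()["id"]

    # 2. Upload the CSV content to the job's batches endpoint.
    upload = session.put(
        f"{jobs_url}/{job_id}/batches",
        headers={**auth, "Content-Type": "text/csv"},
        data=csv_bytes,
    )
    upload.raise_for_status()

    # 3. Close the job so that processing can begin.
    close = session.patch(
        f"{jobs_url}/{job_id}",
        headers=json_headers,
        data=json.dumps({"state": "UploadComplete"}),
    )
    close.raise_for_status()
    return job_id
```

Calling `raise_for_status()` after each step stops the flow at the first failed request instead of continuing with a job that was never created or never received data.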

#### Create a Job

##### Define JSON Payload to send to API

In [71]:
payload = json.dumps({
    "object": "Transaction",
    "sourceName": "Transaction_Ingestion",
    "operation": "upsert"
})

##### Post Request

In [72]:
response = requests.request("POST", url, headers=headers, data=payload)

##### Check Response

In [73]:
print(response.text)

{"object":"Transaction","id":"3dda1d3a-0672-48d3-9ee4-bb6ffa55b0cd","operation":"upsert","sourceName":"Transaction_Ingestion","createdById":"005al0000017FzRAAU","createdDate":"2024-05-21T16:19:22.456058Z","systemModstamp":"","state":"Open","contentType":"CSV","apiVersion":"v1","contentUrl":"/api/v1/ingest/jobs/3dda1d3a-0672-48d3-9ee4-bb6ffa55b0cd/batches"}


##### Get Job ID and Store It

In [74]:
JobID = response.json()['id']
print(JobID)

3dda1d3a-0672-48d3-9ee4-bb6ffa55b0cd


#### Upload to Job

##### Establish the endpoint URL, which includes the job ID

In [75]:
url = f'https://gy4tqnr-mqzwcmrvmnsdqyzzh1.c360a.salesforce.com/api/v1/ingest/jobs/{JobID}/batches'

##### Define path for the CSV file with the data we want to pass to the payload

In [76]:
file_path = '/Users/jsifontes/Documents/Dev/Data Science/DataSets/Data Cloud/Transactions2.csv'

##### Read In the CSV File and Post the Request

In [77]:
with open(file_path, 'rb') as csvfile:
    headers = {
        'Authorization': f'Bearer {dc_token}',
        'Content-Type': 'text/csv'
    }
    response = requests.request("PUT", url, headers=headers, data=csvfile)

##### Print the Response

In [78]:
print(response.text)

{"accepted":true}


#### Close the Job

##### Establish the endpoint URL with the job ID at the end

In [79]:
url = f'https://gy4tqnr-mqzwcmrvmnsdqyzzh1.c360a.salesforce.com/api/v1/ingest/jobs/{JobID}'

##### Generate the payload that tells the API the upload is complete

In [80]:
payload = json.dumps({
    "state": "UploadComplete"
})

##### Redefine the Headers for the PATCH Request

In [81]:
headers = {
    'Authorization': f'Bearer {dc_token}',
    'Content-Type': 'application/json'
}

##### Patch the Connection

In [82]:
response = requests.request("PATCH", url, headers=headers, data=payload)

##### Check the response

In [83]:
print(response.text)

{"object":"Transaction","id":"3dda1d3a-0672-48d3-9ee4-bb6ffa55b0cd","operation":"upsert","sourceName":"Transaction_Ingestion","createdById":"005al0000017FzRAAU","createdDate":"2024-05-21T16:19:22.456058Z","systemModstamp":"","state":"UploadComplete","contentType":"CSV","apiVersion":"v1"}


##### Check the Status of the Job

In [85]:
response = requests.get(url, headers=headers)
print(response.text)
print(response.status_code)

{"object":"Transaction","id":"3dda1d3a-0672-48d3-9ee4-bb6ffa55b0cd","operation":"upsert","sourceName":"Transaction_Ingestion","createdById":"005al0000017FzRAAU","createdDate":"2024-05-21T16:19:22.456058Z","systemModstamp":"","state":"InProgress","contentType":"CSV","apiVersion":"v1","contentUrl":"/api/v1/ingest/jobs/3dda1d3a-0672-48d3-9ee4-bb6ffa55b0cd/batches","retries":0,"totalProcessingTime":0}
200
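
The status check above returns `InProgress`, so a single request is usually not enough to learn the final outcome. Below is a minimal polling sketch. The helper `wait_for_job` is hypothetical, and the terminal state names (`JobComplete`, `Failed`, `Aborted`) are an assumption that should be confirmed against the Bulk Ingestion documentation. Only `Open`, `UploadComplete`, and `InProgress` appear in this notebook's output.

```python
import time

# Assumed terminal states; verify these names against the API documentation.
TERMINAL_STATES = {"JobComplete", "Failed", "Aborted"}


def wait_for_job(fetch_state, interval_seconds=10.0, max_attempts=30, sleep=time.sleep):
    """Call fetch_state() repeatedly until it returns a terminal state."""
    for attempt in range(max_attempts):
        state = fetch_state()
        if state in TERMINAL_STATES:
            return state
        # Skip the pause after the final attempt.
        if attempt < max_attempts - 1:
            sleep(interval_seconds)
    raise TimeoutError(f"Job still not finished after {max_attempts} checks")
```

With the `url` and `headers` from the cells above, it could be used as `wait_for_job(lambda: requests.get(url, headers=headers).json()["state"])`.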
