# Using the Airtable API to build an OMOP CDM table

## Prerequisite from Airtable 
- [NOTE] Before running this notebook, add your relevant Airtable ``base_id``, ``table_id`` and ``API_KEY``.

- **Airtable Setup:** You need an Airtable base containing the data you want to extract, and an Airtable API key from your Account Settings.
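
Rather than typing credentials into the notebook, they could be read from environment variables. The following is a minimal sketch, not part of the original notebook: the function name `load_airtable_settings` and the three environment variable names are hypothetical and can be renamed freely.

```python
import os

def load_airtable_settings(env=os.environ):
    """Return (base_id, table_id, api_key) read from environment variables.

    The variable names below are hypothetical; rename them to suit your setup.
    """
    names = ("AIRTABLE_BASE_ID", "AIRTABLE_TABLE_ID", "AIRTABLE_API_KEY")
    # Collect every variable that is unset or empty so the error lists them all
    missing = [name for name in names if not env.get(name)]
    if missing:
        raise RuntimeError("Missing environment variables: " + ", ".join(missing))
    return tuple(env[name] for name in names)
```

Calling `AIRTABLE_BASE_ID, TABLE_ID, AIRTABLE_API_KEY = load_airtable_settings()` would then replace the empty strings in the next cell, and the key never needs to be saved in the notebook file.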

In [None]:
# Set your Airtable base ID, table ID and API key
AIRTABLE_BASE_ID = ''
TABLE_ID = ''
AIRTABLE_API_KEY = ''

## Set up request URL and parameters

In [None]:
url = f'https://api.airtable.com/v0/{AIRTABLE_BASE_ID}/{TABLE_ID}'

In [None]:
# Set up parameters for the request (specify the number of records per page)
params = {
    'pageSize': 100
}

## Functions

In [None]:
# Create header function 
def create_header():
    header = {
        'Authorization': 'Bearer ' + str(AIRTABLE_API_KEY),
        'Content-Type': 'application/json',
    }
    return header

In [None]:
# Read a SQL script and execute each statement on the database connection
from sqlite3 import OperationalError

def executeScriptsFromFile(filename):
    # Relies on the global `conn` created in the "Transform data" section
    with open(filename, 'r') as fd:
        sqlFile = fd.read()

    # Split the script into individual statements on ';'
    sqlCommands = sqlFile.split(';')

    # Execute each non-empty statement, reporting any that fail
    for command in sqlCommands:
        if not command.strip():
            continue
        try:
            conn.execute(command)
            conn.commit()
        except OperationalError as msg:
            print("Command skipped: ", msg)

## Import libraries 

- **Python Environment:** Use the ``requests`` library to make API calls to the Airtable endpoint for the desired table, and ``pandas`` to hold the results.

In [None]:
import requests
import pandas as pd
import json

## Extract data 

- **API Calls:** Construct the URL (Uniform Resource Locator) using the Airtable API ``base URL``, ``base ID``, and ``table ID``.

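- **Pagination:** Airtable returns at most 100 records per request. When more records remain, the response includes an ``offset`` value; pass it as the ``offset`` query parameter in the next request and repeat until no ``offset`` is returned.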

- **Data Parsing:** Parse the JSON response from the API call to extract the list of records. Each record will be a dictionary with its fields and an ID.
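
The three steps above can be sketched as one reusable generator. This is an illustrative sketch rather than the notebook's own code: `iter_records`, `fetch_page` and the two-page stand-in data are hypothetical names introduced here, and the stand-in replaces the real API so the example runs without network access.

```python
def iter_records(fetch_page, page_size=100):
    """Yield every record, following the 'offset' value from page to page.

    fetch_page is a hypothetical callable: it takes a params dict and returns
    the decoded JSON body of one list-records response.
    """
    params = {"pageSize": page_size}
    while True:
        data = fetch_page(params)
        yield from data.get("records", [])
        if "offset" not in data:
            break  # last page reached
        params = {**params, "offset": data["offset"]}

# Stand-in for the API: two pages linked by an offset token
_PAGES = {
    None: {"records": [{"id": "rec1", "fields": {"first": "Ada"}}], "offset": "page2"},
    "page2": {"records": [{"id": "rec2", "fields": {"first": "Alan"}}]},
}

def fake_fetch_page(params):
    return _PAGES[params.get("offset")]

all_records = list(iter_records(fake_fetch_page))
# Each record is a dictionary with an ID and its fields
fields = [record["fields"] for record in all_records]
```

Against the real API, `fetch_page` would wrap `requests.get(url, headers=headers, params=params)` and return `response.json()`, ideally after calling `response.raise_for_status()`.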


In [None]:
# List all records 
headers = create_header()

all_records = []

# Make initial request to fetch the first page of records
response = requests.get(url, headers=headers, params=params)
response.raise_for_status()  # stop early on an HTTP error (e.g. bad key or table ID)
data = response.json()
all_records.extend(data['records'])

# While the response carries an 'offset', pass it into the next request to fetch
# the next page, adding each page to all_records until no more records remain
while 'offset' in data:
    params['offset'] = data['offset']
    response = requests.get(url, headers=headers, params=params)
    response.raise_for_status()
    data = response.json()
    all_records.extend(data['records'])

# Now all_records contains all the retrieved records
print(len(all_records))

# Convert all_records list to a DataFrame (optional)
if all_records:
    df = pd.DataFrame(all_records)
    print(df)
else:
    print("No records found in the table.")

In [None]:
print(type(data))
print(data)
print(type(all_records))
print(all_records)
print(type(df))
print(df)

In [None]:
data = json.loads(response.text)
# Print the fields of each record in the most recent response (the last page fetched)
for record in data['records']:
    print(record['fields'])

In [None]:
# Extract fields from records and convert to a pandas dataframe
df_fields = pd.DataFrame.from_records((record['fields'] for record in all_records))
print(df_fields)

## Transform data

- **Extract fields:** Extract the fields from the records and convert them to a dataframe.
- **Temporary databases:**
    - Load the dataframe into a temporary database in memory.
    - Run the SQL script on the temporary database and load the transformed data into a placeholder table in the same in-memory database.
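
The load-then-transform pattern described above can be sketched with the standard library alone. This is a simplified illustration, not the contents of the notebook's SQL script: the sample rows are made up, the tables are cut down to a few columns, rows are inserted with `executemany` instead of `DataFrame.to_sql`, and the concept ID columns are left out because they need a vocabulary mapping.

```python
import sqlite3

conn = sqlite3.connect(":memory:")

# Cut-down source and target tables (illustrative subset of the columns)
conn.execute("CREATE TABLE patients (id TEXT, birthdate TEXT, gender TEXT)")
conn.execute("""
    CREATE TABLE person (
        person_id INTEGER, year_of_birth INTEGER, month_of_birth INTEGER,
        day_of_birth INTEGER, person_source_value TEXT, gender_source_value TEXT
    )
""")

# Made-up sample rows standing in for the Airtable fields
rows = [("a1", "1980-05-17", "F"), ("b2", "1975-11-02", "M")]
conn.executemany("INSERT INTO patients VALUES (?, ?, ?)", rows)

# Transform: split the birth date into parts and keep the source values
conn.execute("""
    INSERT INTO person
    SELECT rowid,
           CAST(strftime('%Y', birthdate) AS INTEGER),
           CAST(strftime('%m', birthdate) AS INTEGER),
           CAST(strftime('%d', birthdate) AS INTEGER),
           id,
           gender
    FROM patients
""")
conn.commit()

person_rows = conn.execute("SELECT * FROM person ORDER BY person_id").fetchall()
conn.close()
```

Using the source table's `rowid` as `person_id` is only a convenience for this sketch; a real mapping would choose its own stable integer key.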


In [None]:
import sqlite3

# Connect to a temporary database in memory
conn = sqlite3.connect(':memory:')

# Create a cursor object
cursor = conn.cursor()

# Define your DataFrame (replace with your actual data)
data_to_insert = df_fields
# data_to_insert

In [None]:
# Define your temporary table schema (replace with your column names and data types)
sql_create_table = """
CREATE TEMPORARY table patients (
            id            varchar(1000),
            birthdate     date,
            deathdate     date,
            ssn           varchar(100),
            drivers       varchar(100),
            passport      varchar(100),
            prefix        varchar(100),
            first         varchar(100),
            last          varchar(100),
            suffix        varchar(100),
            maiden        varchar(100),
            marital       varchar(100),
            race          varchar(100),
            ethnicity     varchar(100),
            gender        varchar(100),
            birthplace    varchar(100),
            address       varchar(100),
            city          varchar(100),
            state         varchar(100),
            zip           varchar(100)
            );
"""

cursor.execute(sql_create_table)


In [None]:
data_to_insert.to_sql('patients', conn, index=False, if_exists='replace')

# Query the temporary table
query_result = pd.read_sql('SELECT * FROM patients', conn)

print(query_result)

In [None]:
# Placeholder table for the OMOP CDM person table
# Temporary table schema (columns and data types follow the CDM person table)
sql_create_CDM_table = """
CREATE TEMPORARY table person (
			person_id integer NOT NULL,
			gender_concept_id integer NOT NULL,
			year_of_birth integer NOT NULL,
			month_of_birth integer NULL,
			day_of_birth integer NULL,
			birth_datetime datetime NULL,
			race_concept_id integer NOT NULL,
			ethnicity_concept_id integer NOT NULL,
			location_id integer NULL,
			provider_id integer NULL,
			care_site_id integer NULL,
			person_source_value varchar(50) NULL,
			gender_source_value varchar(50) NULL,
			gender_source_concept_id integer NULL,
			race_source_value varchar(50) NULL,
			race_source_concept_id integer NULL,
			ethnicity_source_value varchar(50) NULL,
			ethnicity_source_concept_id integer NULL );
"""

cursor.execute(sql_create_CDM_table)


In [None]:
executeScriptsFromFile('/Users/solmazeradat/Downloads/insert_person_table.sql')

In [None]:
# Query the temporary table
query_result = pd.read_sql('SELECT * FROM person', conn)
print(query_result)

In [None]:
type(query_result)
query_result

## Load data

- [NOTE] Before running this cell, add the Airtable ``base_id``, ``table_id`` and ``API_KEY`` for the table that will receive the transformed data.

- **Airtable Setup:** You need an Airtable base with a table to receive the transformed records, and an Airtable API key from your Account Settings.
- **API Calls:** Construct the URL using the Airtable API ``base URL``, ``base ID``, and ``table ID``. Use a POST request to load the records from the transformed table into the Airtable base.
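
Posting one record per request can be slow for larger tables. The sketch below groups rows into request bodies instead, assuming Airtable's documented limit of 10 records per create request; `build_batches` is a hypothetical helper introduced here, and the rows are made-up placeholders.

```python
def build_batches(records, batch_size=10):
    """Split row dictionaries into request bodies of at most batch_size records.

    batch_size defaults to 10, the assumed per-request limit for creating records.
    """
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    batches = []
    for start in range(0, len(records), batch_size):
        chunk = records[start:start + batch_size]
        # Each record is wrapped as {'fields': ...}, as in the single-record POST
        batches.append({"records": [{"fields": row} for row in chunk]})
    return batches

# Made-up rows standing in for query_result.to_dict(orient='records')
rows = [{"person_id": i} for i in range(1, 24)]
payloads = build_batches(rows)
batch_sizes = [len(payload["records"]) for payload in payloads]
```

Each payload would then be sent with `requests.post(url, headers=headers, json=payload)`, checking the status code of every response as in the loop below.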


In [None]:
# Define your Airtable base ID, table ID and API key for the destination table
AIRTABLE_BASE_ID = ''
TABLE_ID = ''
AIRTABLE_API_KEY = ''

# Define the URL for the Airtable endpoint and rebuild the headers with the key above
url = f'https://api.airtable.com/v0/{AIRTABLE_BASE_ID}/{TABLE_ID}'
headers = create_header()

# Convert DataFrame to a list of dictionaries, one per row
records = query_result.to_dict(orient='records')

# Push data to the Airtable table, one record per request
for record in records:
    response = requests.post(url, headers=headers, json={'fields': record})
    if response.status_code == 200:
        print("Record posted successfully.")
    else:
        print(f"Failed to post record with status code {response.status_code}: {response.text}")