# Savor Data

> Taking advantage of my own big data.

A data-driven project by [Tobias Reaper](https://github.com/tobias-fyi/)

## Part 2: Pipelines

Here are the general steps in the pipeline:

1. Extract current data from Airtable
2. Apply any needed transformations
    * Fixing column datatypes
3. Save a copy to CSV, for posterity
4. Insert into local Postgres database
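
The steps above can be sketched as one function that wires the stages together. This is a rough outline rather than the project's actual code: `run_pipeline` and its arguments are hypothetical names, and the extract and load stages are passed in as callables so the Airtable and Postgres specifics stay out of the sketch.

```python
import io

import pandas as pd


def run_pipeline(extract, transforms, csv_target, load) -> pd.DataFrame:
    """Run extract -> transform -> save CSV -> load, in that order.

    All four arguments are hypothetical stand-ins for the real stages.
    """
    df = extract()  # 1. Extract current data
    for transform in transforms:  # 2. Apply each transformation in turn
        df = transform(df)
    df.to_csv(csv_target, index=False)  # 3. Save a copy to CSV
    load(df)  # 4. Hand the frame to the database loader
    return df


# Toy run: an in-memory buffer and a plain list stand in for the CSV file and Postgres
buffer = io.StringIO()
loaded = []
result = run_pipeline(
    extract=lambda: pd.DataFrame({"id_num": ["1", "2"]}),
    transforms=[lambda df: df.assign(id_num=df["id_num"].astype("int64"))],
    csv_target=buffer,
    load=loaded.append,
)
```

Keeping each stage as a plain function makes it easy to swap the toy pieces for the real Airtable and SQLAlchemy calls later.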

---

## Airtable

Data extraction from Airtable using [airtable-python-wrapper](https://github.com/gtalarico/airtable-python-wrapper).

In [5]:
# === Some initial imports and config === #
%load_ext autoreload
%autoreload

from os import environ
from pprint import pprint

from airtable import Airtable
import pandas as pd
import janitor

pd.options.display.max_rows = 100
pd.options.display.max_columns = 50

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


In [2]:
# === Set up environment variables === #
from dotenv import load_dotenv
from pathlib import Path

env_path = Path.cwd().parents[0] / ".env"
load_dotenv(dotenv_path=env_path)

# === Airtable keys === #
base_key = environ.get("AIRTABLE_BASE_KEY")
api_key = environ.get("AIRTABLE_API_KEY")

In [3]:
# === engage_log === #
table_name = "engage_log"
airtable = Airtable(base_key, table_name, api_key=api_key)
print(airtable)

<Airtable table:engage_log>


In [4]:
# === Get all engagement records, sorted by time_in === #
engage_log_records = airtable.get_all(sort=["time_in"])
pprint(engage_log_records[0])

{'createdTime': '2020-09-07T20:14:40.000Z',
 'fields': {'created': '2020-09-07T20:14:40.000Z',
            'duration': {'specialValue': 'NaN'},
            'id_num': 12344,
            'mental': ['recVFgOi7povUvYjF', 'recPkaZr3nxxYyS37'],
            'mental_note': 'Savor Data EDA and Viz - from CSV ::\n\n',
            'modified': '2020-09-07T20:14:54.000Z',
            'name': '12344-Sit-Cod',
            'physical': ['recAEH30q7XSj0DCG'],
            'project_location': ['recioKUrLNgcouGZW'],
            'project_log': ['recpZPk1EhkKVds9P'],
            'subloc': ['recSSXv8D15gISw4Y'],
            'tags': ['recRjlzORPPL9M6qe',
                     'recuxV21zzIxFJUCc',
                     'rec6NWSQTIDn6rBRY',
                     'recbDqHY61m2rocNM']},
 'id': 'receSicSHvRJm4DDI'}


In [9]:
# === mental === #
table_name = "mental"
airtable = Airtable(base_key, table_name, api_key)
mental_records = airtable.get_all()
# Output is long: linked-record fields come back as lists of Airtable record ids
# pprint(mental_records[0])
# We only need the `airtable_id` and `name`

In [None]:
# === physical === #
table_name = "physical"
airtable = Airtable(base_key, table_name, api_key)
physical_records = airtable.get_all()
# Output is long: linked-record fields come back as lists of Airtable record ids
# pprint(physical_records[0])
# We only need the `airtable_id` and `name`

### Primary Keys

I can't simply load the `fields` item into a dataframe and call it a day. Airtable assigns each record a primary key on the back end (e.g. `rec8GZsE62hEBtAst`), and I'll need it to link records from different tables. That key sits at the top level of each record under `id`, not inside `fields`, so it doesn't get brought into the dataframe without some extra processing.

In [6]:
def extract_and_concat_airtable_data(records: list) -> pd.DataFrame:
    """Extracts fields from the airtable data and concatenates them with airtable id.
    Uses pyjanitor to clean up column names.
    """
    df = (  # Load and clean/fix names
        pd.DataFrame.from_records(records)
        .clean_names()
        .rename_column("id", "airtable_id")
    )
    df2 = pd.concat(  # Extract `fields` and concat to `airtable_id`
        [df["airtable_id"], df["fields"].apply(pd.Series)], axis=1
    )
    return df2
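
For comparison, the same id-plus-fields merge can be done on the raw records before pandas gets involved. This is a minimal sketch, assuming every record has the `id` and `fields` keys shown in the `pprint` output earlier; `flatten_records` is a hypothetical helper, not part of the Airtable wrapper.

```python
def flatten_records(records):
    """Merge each record's top-level Airtable id into its `fields` dict."""
    return [{"airtable_id": record["id"], **record["fields"]} for record in records]


# One record trimmed down from the engage_log output shown earlier
sample_records = [
    {
        "id": "receSicSHvRJm4DDI",
        "createdTime": "2020-09-07T20:14:40.000Z",
        "fields": {
            "id_num": 12344,
            "name": "12344-Sit-Cod",
            "physical": ["recAEH30q7XSj0DCG"],
        },
    },
]

rows = flatten_records(sample_records)
```

The resulting list of flat dicts can be passed straight to `pd.DataFrame(rows)`.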

In [7]:
# === Use function to load and do initial transformations === #
df_engage_1 = extract_and_concat_airtable_data(engage_log_records)
df_engage_1.shape

(12272, 23)

In [8]:
# === Write to CSV to save on API calls === #
df_engage_1.to_csv("../assets/data_/20-09-06-engage_log.csv", index=False)

In [3]:
# === Test out loading from csv === #
df_engage_2 = pd.read_csv("../assets/data_/20-09-06-engage_log.csv")

In [4]:
df_engage_2.head(3)

Unnamed: 0,airtable_id,time_in,mental_note,physical_note,project_log,time_out,subloc,id_num,mental,physical,id,created,modified,duration,project_location,location,dose,task,tags,who,idea,task 2,money
0,recT93azq9mX8ecGC,2019-12-03T14:00:00.000Z,Full Stack Radio - Evan Yue \\ Vue 3.0 + new e...,Cardio - elliptical,['rec8GZsE62hEBtAst'],2019-12-03T14:19:00.000Z,['recptpk82lROLNhrS'],1,['rec04WWDmwUYsOfVR'],['recVFqFwGgz4dsAd6'],1-Exe-Pod,2019-11-24T21:58:14.000Z,2020-08-11T23:35:08.000Z,1140,['recyYL97lMDyqhG4M'],,,,,,,,
1,recGJ5ynXurXT87My,2019-12-03T14:19:00.000Z,Full Stack Radio with Evan Yue \\ Vue 3.0 - fi...,Cardio - stairs,['rec8GZsE62hEBtAst'],2019-12-03T14:37:00.000Z,['recQxGqlDnhl6JSQT'],2,['rec04WWDmwUYsOfVR'],['recVFqFwGgz4dsAd6'],2-Exe-Pod,2019-12-03T14:11:24.000Z,2020-08-11T23:35:08.000Z,1080,['recyYL97lMDyqhG4M'],,,,,,,,
2,recXMT29xVuku6sdD,2019-12-03T14:37:00.000Z,Django Chat \\ Caching - something to read up ...,Weights - hip abduction in / out (machine) - k...,['rec8GZsE62hEBtAst'],2019-12-03T15:02:00.000Z,['recdQfkyyrbP9E3T7'],3,['rec04WWDmwUYsOfVR'],['recVFqFwGgz4dsAd6'],3-Exe-Pod,2019-12-03T14:12:34.000Z,2020-08-11T23:35:08.000Z,1500,['recyYL97lMDyqhG4M'],,,,,,,,


---

## Transform

* Column data types
  * [ ] Date columns

In [None]:
# Cells holding lists come back from the CSV as strings;
# use literal_eval to convert them into actual python lists
from ast import literal_eval

# Convert to python list
df_engage_2["mental"] = df_engage_2["mental"].apply(literal_eval)
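
One catch with `literal_eval` is that it raises on anything that isn't a string, including the `NaN` floats pandas uses for empty cells. Below is a small sketch of a more forgiving converter; `parse_list_cell` is a hypothetical helper, and treating empty cells as empty lists is an assumption about what the pipeline should do with them.

```python
from ast import literal_eval


def parse_list_cell(value):
    """Convert a stringified list from the CSV into a real python list.

    Assumption: missing cells (NaN, None, empty string) become an empty list.
    """
    if isinstance(value, list):  # Already converted, nothing to do
        return value
    if not isinstance(value, str) or not value.strip():
        return []
    parsed = literal_eval(value)
    # Wrap a lone scalar so the column always holds lists
    return list(parsed) if isinstance(parsed, (list, tuple)) else [parsed]


cell = "['recVFgOi7povUvYjF', 'recPkaZr3nxxYyS37']"
converted = parse_list_cell(cell)
missing = parse_list_cell(float("nan"))
```

It would be applied the same way as above, e.g. `df_engage_2["mental"].apply(parse_list_cell)`.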

In [28]:
df_engage_2.dtypes

airtable_id         object
time_in             object
mental_note         object
physical_note       object
project_log         object
time_out            object
subloc              object
code                 int64
mental              object
physical            object
id                  object
created             object
modified            object
duration            object
project_location    object
location            object
dose                object
task                object
tags                object
who                 object
idea                object
task 2              object
$dojo               object
dtype: object

In [42]:
pd.to_datetime(df_engage_2["time_in"])

0       2019-12-03 14:00:00+00:00
1       2019-12-03 14:19:00+00:00
2       2019-12-03 14:37:00+00:00
3       2019-12-03 15:02:00+00:00
4       2019-12-03 15:08:00+00:00
                   ...           
12261   2020-09-06 22:51:00+00:00
12262   2020-09-06 23:16:00+00:00
12263   2020-09-06 23:22:00+00:00
12264   2020-09-06 23:29:00+00:00
12265   2020-09-06 23:35:00+00:00
Name: time_in, Length: 12266, dtype: datetime64[ns, UTC]

In [32]:
# === Automate datetime conversion in pipeline === #
datetime_cols = [
    "time_in",
    "time_out",
    "created",
    "modified",
    "date",
]

def convert_datetime_cols(data: pd.DataFrame, dt_cols: list) -> pd.DataFrame:
    """If datetime columns exist in dataframe, convert them to datetime.

    :param data (pd.DataFrame) : DataFrame with datetime cols to be converted.
    :param dt_cols (list) : List of potential datetime cols.
    :return (pd.DataFrame) : DataFrame with datetime cols converted.
    """
    data = data.copy()  # Don't change original dataframe
    for col in dt_cols:
        if col in data.columns:  # Make sure column exists
            data[col] = pd.to_datetime(data[col])
    return data

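
A quick check of the conversion on a couple of rows helps confirm it behaves as expected. The sketch below repeats a compact copy of the function so it runs on its own; the two rows reuse timestamps from the `head` output earlier, and the `elapsed` calculation is only there as a sanity check, not part of the pipeline.

```python
import pandas as pd


def convert_datetime_cols(data: pd.DataFrame, dt_cols: list) -> pd.DataFrame:
    """Compact copy of the function above, repeated to keep this snippet standalone."""
    data = data.copy()
    for col in dt_cols:
        if col in data.columns:
            data[col] = pd.to_datetime(data[col])
    return data


raw = pd.DataFrame(
    {
        "id_num": [1, 2],
        "time_in": ["2019-12-03T14:00:00.000Z", "2019-12-03T14:19:00.000Z"],
        "time_out": ["2019-12-03T14:19:00.000Z", "2019-12-03T14:37:00.000Z"],
    }
)

# "created", "modified", and "date" are absent here and are simply skipped
converted = convert_datetime_cols(
    raw, ["time_in", "time_out", "created", "modified", "date"]
)

# Once both columns are datetimes, durations fall out of a subtraction
elapsed = (converted["time_out"] - converted["time_in"]).dt.total_seconds()
```

The two elapsed values come out to 1140 and 1080 seconds, matching the `duration` column for those rows.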

In [9]:
df_engage_2.iloc[12268, 8]

"['recVFgOi7povUvYjF', 'recPkaZr3nxxYyS37', 'recJjXRBEtra3FAmr']"

---

## Postgres

Inserting the extracted data into a local Postgres instance using SQLAlchemy.

The SQLAlchemy `create_engine` function uses the following connection string format:

    dialect+driver://username:password@host:port/database
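
Since the username and password are embedded in a URL, any special characters in them (such as `@` or `/`) need to be URL-encoded or the string will be parsed incorrectly. Here is a minimal sketch of a builder that handles this; `build_db_uri` is a hypothetical helper and the credentials are made up for illustration.

```python
from urllib.parse import quote_plus


def build_db_uri(
    user: str,
    password: str,
    host: str = "localhost",
    port: int = 5432,
    database: str = "savor",
    dialect: str = "postgresql",
    driver: str = "psycopg2",
) -> str:
    """Assemble a connection string, URL-encoding the credentials."""
    return (
        f"{dialect}+{driver}://{quote_plus(user)}:{quote_plus(password)}"
        f"@{host}:{port}/{database}"
    )


# Made-up credentials; the "@" and "/" in the password get escaped
example_uri = build_db_uri("savor_user", "p@ss/word")
print(example_uri)
# -> postgresql+psycopg2://savor_user:p%40ss%2Fword@localhost:5432/savor
```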

In [27]:
# === Set up connection to postgres db === #
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

pg_user = environ.get("PG_USER")
pg_pass = environ.get("PG_PASS")
db_uri = f"postgresql+psycopg2://{pg_user}:{pg_pass}@localhost:5432/savor"
engine = create_engine(db_uri, echo=True)

# Instantiate new session
Session = sessionmaker(bind=engine)
session = Session()

### Creating the tables

The first time this pipeline is run, the tables will have to be created in the Postgres database.



In [None]:
# === Define the declarative base class === #
from sqlalchemy.ext.declarative import declarative_base

Base = declarative_base()

In [1]:
# === Data model === #
from savor_code.models import Project, Engagement, Moment
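
With the models defined, the tables can be created from the declarative metadata. The sketch below shows the general idea under several assumptions: the `Engagement` class is a hypothetical stand-in (the real one lives in `savor_code.models` and its columns aren't shown here), an in-memory SQLite engine replaces the local Postgres instance so the snippet runs anywhere, and the `sqlalchemy.orm` import path assumes SQLAlchemy 1.4 or later.

```python
from sqlalchemy import Column, DateTime, Integer, String, create_engine, inspect
from sqlalchemy.orm import declarative_base  # Assumes SQLAlchemy 1.4+

Base = declarative_base()


class Engagement(Base):
    """Hypothetical stand-in for the real model; column choices are guesses."""

    __tablename__ = "engage_log"

    id_num = Column(Integer, primary_key=True)
    airtable_id = Column(String, unique=True, nullable=False)
    time_in = Column(DateTime(timezone=True))
    time_out = Column(DateTime(timezone=True))


# In-memory SQLite as a stand-in for the Postgres engine created earlier
engine = create_engine("sqlite://")

# Emits CREATE TABLE for every model registered on Base;
# tables that already exist are skipped, so reruns are safe
Base.metadata.create_all(engine)

table_names = inspect(engine).get_table_names()
```

Because `create_all` checks for existing tables first, it can stay in the pipeline after the first run without causing errors.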
    
    