# MVP - JSON in sqlite

Instead of shoehorning the JSON responses into a traditional relational schema, store them as-is and use SQLite's JSON functions to extract the delay data.

1. call aviationstack API
2. save json response to sqlite3 db
3. extract data from response
4. tweet data

In [1]:
import os
import requests
from urllib3.util import Retry
from requests import Session, HTTPError
from requests.adapters import HTTPAdapter
from requests.exceptions import ReadTimeout
import sqlite3
import json
from pathlib import Path
from dotenv import load_dotenv
from datetime import datetime, timezone, timedelta
from time import sleep
import logging
from sys import stdout
import tomllib
import jinja2

In [2]:
# Root logging: one stdout handler; each line carries a timestamp, the level
# and the name of the function that logged it.
logging.basicConfig(
    handlers=[logging.StreamHandler(stream=stdout)],
    datefmt="%Y/%m/%d %H:%M:%S",
    format="%(asctime)s [%(levelname)s] %(funcName)s: %(message)s",
)
# Module logger at DEBUG so the notebook's debug messages reach the handler.
logger = logging.getLogger(name=__name__)
logger.setLevel(level=logging.DEBUG)

In [3]:
# Credentials are read from a .env file one level up (relative to the
# notebook's working directory).
env_path = Path("../.env")
load_dotenv(dotenv_path=env_path)
AV_API_KEY = os.environ.get("AVIATION_API_KEY", "")
# NOTE(review): plain HTTP, presumably because the aviationstack plan in use
# does not offer HTTPS; the access key is then sent unencrypted — confirm.
AV_API_URL = "http://api.aviationstack.com/v1/"
FLIGHT_API_URL = f"{AV_API_URL}flights"

In [4]:
# Database, table and JSON-column names come from the [sqlite] table of
# pyproject.toml (path relative to the notebook's working directory).
toml_path = Path("../pyproject.toml")
with toml_path.open("rb") as f:  # tomllib only reads binary file handles
    config = tomllib.load(f)

DB_NAME, TBL_NAME, JSON_COL = (
    config["sqlite"][key] for key in ("db_name", "tbl_name", "json_col")
)

In [5]:
def write_local_json(
    api_response: dict,
    json_dir: Path,
    str_date: str | None = None,
    offset: int = 0,
    limit: int = 100,
) -> Path:
    """
    Save one page of the flight API response as JSON, to be uploaded to a data lake.

    The file is named ``flight-{str_date}-{offset}-{offset+limit}.json``.

    api_response: decoded JSON body of one API page
    json_dir: output directory (str or Path); created, with parents, if missing
    str_date: date label for the file name; defaults to the current UTC date,
        computed when the function is called
    offset: index of the first record in this page
    limit: page size, used as the upper bound in the file name

    Returns the path of the written file.
    """
    json_dir = Path(json_dir)
    if str_date is None:
        # Computed per call: a default in the signature would be frozen at
        # definition time and go stale in a long-running kernel.
        str_date = str(datetime.now(tz=timezone.utc).date())
    # exist_ok avoids the check-then-create race of a separate exists() test
    json_dir.mkdir(parents=True, exist_ok=True)
    local_json_path = json_dir / f"flight-{str_date}-{offset}-{offset + limit}.json"
    logger.info(f"saving to {local_json_path}")
    with open(local_json_path, "w", encoding="utf-8") as j:
        json.dump(api_response, j)
    # logged after the file is closed, i.e. once the data is actually flushed
    logger.debug(f"saved to {local_json_path}")
    return local_json_path

In [6]:
def get_all_delays(
    json_dir: str | Path,
    limit: int = 100,
    airline: str = "Malaysia Airlines",
    min_delay: int = 1,
    str_date: str | None = None,
) -> list[dict]:
    """
    Page through the aviationstack ``flights`` endpoint for delayed arrivals.

    Each page is appended to the returned list and also saved to ``json_dir``
    with ``write_local_json``.

    json_dir: directory for the per-page JSON files (str or Path)
    limit: page size requested per call
    airline: value of the ``airline_name`` filter
    min_delay: value of the ``min_delay_arr`` filter
    str_date: date label used in log messages and file names; defaults to
        yesterday's UTC date, computed when the function is called.
        NOTE(review): it is not sent to the API, so it does not restrict
        which flights are returned — confirm this is intended.

    Returns the decoded JSON pages. Paging stops early, keeping the pages
    fetched so far, if a request fails with an HTTP error, times out, returns
    non-JSON, returns a body without pagination info, or returns an empty
    page. These errors are logged, not raised. Other request errors (e.g.
    connection failures) propagate.
    """
    if str_date is None:
        # Computed per call: a default in the signature would be frozen at
        # definition time and go stale in a long-running kernel.
        str_date = str(datetime.now(tz=timezone.utc).date() - timedelta(days=1))
    json_dir = Path(json_dir)
    # retry transient 5xx responses with a short backoff
    adapter = HTTPAdapter(
        max_retries=Retry(
            total=3,
            backoff_factor=0.1,
            status_forcelist=[500, 502, 503, 504],
        )
    )
    responses = []
    retrieved = total = 0
    logger.info(f"Retrieving delayed flights for {str_date}")
    # `with` closes the session (and its pooled connections) when paging ends
    with Session() as sesh:
        sesh.mount(AV_API_URL, adapter)
        # `not total`: keep going until the first page has reported the total
        while not total or retrieved < total:
            sleep(0.5)  # pause between consecutive API requests
            logger.info(f"retrieving {retrieved}th to {retrieved + limit}th")
            params = {
                "access_key": AV_API_KEY,  # retrieved from .env, global scope
                "offset": retrieved,
                "limit": limit,
                "airline_name": airline,
                "min_delay_arr": min_delay,
            }
            try:
                response = sesh.get(
                    url=FLIGHT_API_URL,
                    params=params,
                    timeout=30.0,
                )
                response.raise_for_status()
                page = response.json()
            except HTTPError as exc:
                # Stop here: continuing would parse an error body (or a stale /
                # undefined `response`) as if it were a page of flights.
                logger.error(f"HTTP Error: \n{exc}")
                break
            except ReadTimeout as e:
                logger.error(
                    f"Timeout retrieving {retrieved}th to {retrieved + limit}th:\n{e}"
                )
                break
            except ValueError as exc:
                # body is not valid JSON (requests' JSONDecodeError is a ValueError)
                logger.error(f"Invalid JSON response: {exc}")
                break
            pagination = page.get("pagination") if isinstance(page, dict) else None
            if not pagination:
                logger.error(f"Response has no pagination info: {page}")
                break
            # save response
            logger.debug(f"retrieved {retrieved}th to {retrieved + limit}th")
            responses.append(page)
            write_local_json(
                page,
                json_dir=json_dir,
                str_date=str_date,
                offset=retrieved,
                limit=limit,
            )
            retrieved += pagination["count"]
            if not total:
                # First request; get total count
                total = pagination["total"]
                logger.info(f"Total records count: {total}")
                if total == 0:
                    # prevent infinite loop if there are no records retrieved
                    logger.error("Zero records retrieved; exiting")
                    break
            if pagination["count"] == 0:
                # an empty page cannot advance the offset; stop instead of looping forever
                logger.error(f"Empty page at offset {retrieved}; stopping")
                break
    return responses

## 1. Fetch the responses

In [8]:
# Fetch every page of delayed flights; each page is also saved under this directory.
responses = get_all_delays(json_dir=Path("../data/responses"))

2023/10/17 09:47:22 [INFO] get_all_delays: Retrieving delayed flights for 2023-10-16
2023/10/17 09:47:23 [INFO] get_all_delays: retrieving 0th to 100th
2023/10/17 09:47:27 [DEBUG] get_all_delays: retrieved 0th to 100th
2023/10/17 09:47:33 [INFO] write_local_json: saving to ../data/responses/flight-2023-10-16-0-100.json
2023/10/17 09:47:33 [DEBUG] write_local_json: saved to ../data/responses/flight-2023-10-16-0-100.json
2023/10/17 09:47:33 [INFO] get_all_delays: Total records count: 0
2023/10/17 09:47:33 [ERROR] get_all_delays: Zero records retrieved; exiting


In [12]:
# get existing json, for testing
responses = []
str_date = "2023-10-13"
json_dir = Path("../data/responses")
json_paths = json_dir.glob(f"flight-{str_date}-*.json")
for json_file in json_paths:
    logger.debug(f"looking for {json_file}")
    with open(json_file) as j:
        flight_page = json.load(j)
        responses.extend(flight_page["data"])

logger.info(f"{len(responses)} entries on {str_date}")

2023/10/17 10:09:16 [DEBUG] <module>: looking for ../data/responses/flight-2023-10-13-700-800.json
2023/10/17 10:09:19 [DEBUG] <module>: looking for ../data/responses/flight-2023-10-13-800-900.json
2023/10/17 10:09:19 [DEBUG] <module>: looking for ../data/responses/flight-2023-10-13-400-500.json
2023/10/17 10:09:19 [DEBUG] <module>: looking for ../data/responses/flight-2023-10-13-100-200.json
2023/10/17 10:09:19 [DEBUG] <module>: looking for ../data/responses/flight-2023-10-13-300-400.json
2023/10/17 10:09:19 [DEBUG] <module>: looking for ../data/responses/flight-2023-10-13-600-700.json
2023/10/17 10:09:19 [DEBUG] <module>: looking for ../data/responses/flight-2023-10-13-200-300.json
2023/10/17 10:09:19 [DEBUG] <module>: looking for ../data/responses/flight-2023-10-13-0-100.json
2023/10/17 10:09:19 [DEBUG] <module>: looking for ../data/responses/flight-2023-10-13-500-600.json
2023/10/17 10:09:19 [INFO] <module>: 819 entries on 2023-10-13


In [52]:
def execute_template_sql(
    db_conn: sqlite3.Connection,
    env: jinja2.Environment,
    template: str,
    params: dict,
    data: list | None = None,
):
    """
    Render a jinja-templated SQL file and execute it inside a transaction.

    db_conn: open sqlite connection; the ``with db_conn`` block commits on
        success and rolls back if an exception escapes
    env: jinja environment whose loader can find ``template``
    template: template file name, e.g. "insert.sql"
    params: values used to render the template
    data: parameter rows for ``executemany``. When given (even empty), the
        rendered statement runs once per row and None is returned.

    Returns, when ``data`` is None:
    - the fetched rows (built by ``db_conn.row_factory``) if the rendered SQL
      is a single statement;
    - an empty list if it is a multi-statement script, which is run with
      ``executescript`` (that API discards result rows).
    """
    sql = env.get_template(template).render(params)
    logger.debug(f"rendered SQL:\n{sql}")
    with db_conn:
        # `is not None` rather than truthiness: an empty list must not fall
        # through to the script path, where `?` placeholders bind as NULL
        if data is not None:
            db_conn.executemany(sql, data)
            return None
        try:
            cursor = db_conn.execute(sql)
        except (sqlite3.ProgrammingError, sqlite3.Warning) as exc:
            # execute() accepts exactly one statement (older Pythons raise
            # sqlite3.Warning, newer ProgrammingError). Only a multi-statement
            # script is retried with executescript(); any other error propagates.
            if "one statement at a time" not in str(exc):
                raise
            db_conn.executescript(sql)
            return []
        return cursor.fetchall()

## 2. Insert JSON to sqlite

In [54]:
# app args
data_dir = Path("../data")
template_dir = Path("../templates")
params = dict(
    tbl_name=TBL_NAME,
    json_col=JSON_COL,
)
# instantiate db conn and jinja env
env = jinja2.Environment(loader=jinja2.FileSystemLoader(template_dir))
# search for existing db
# db_path = data_dir / f"{DB_NAME}.db"db_path = data_dir / "sample.db"
# check before connecting, which implicitly create
db_exists = db_path.exists()
db_conn = sqlite3.connect(db_path)
if not db_exists:
    logger.info(f"{db_path} does not exist, initializing...")
    execute_template_sql(db_conn, env, "create.sql", params)

# UPSERT data
flights = [(json.dumps(flight),) for flight in responses]
# db_conn.executemany(f"INSERT OR REPLACE INTO {TBL_NAME} ({JSON_COL}) VALUES( ? )", flights)
execute_template_sql(db_conn, env, "insert.sql", params, flights)

2023/10/17 14:59:05 [DEBUG] execute_template_sql: rendered SQL:
INSERT OR REPLACE INTO import_flight_records (flights_json) VALUES ( ? );


Testing if the transaction was successful

In [60]:
# Re-open the database and print a few rows arriving at KUL to confirm the insert.
# NOTE(review): `arr_airport_code` is assumed to be a column defined by
# create.sql (not visible here) — confirm.
db_conn = sqlite3.connect(db_path)
kul_rows = db_conn.execute(
    f"select * from {TBL_NAME} where arr_airport_code = 'KUL' LIMIT 3"
)
for kul_row in kul_rows:
    print(kul_row)

('{"flight_date": "2023-10-12", "flight_status": "active", "departure": {"airport": "Seoul (Incheon)", "timezone": "Asia/Seoul", "iata": "ICN", "icao": "RKSI", "terminal": "2", "gate": "235", "delay": 25, "scheduled": "2023-10-12T16:35:00+00:00", "estimated": "2023-10-12T16:35:00+00:00", "actual": "2023-10-12T17:00:00+00:00", "estimated_runway": "2023-10-12T17:00:00+00:00", "actual_runway": "2023-10-12T17:00:00+00:00"}, "arrival": {"airport": "Kuala Lumpur International Airport (klia)", "timezone": "Asia/Kuala_Lumpur", "iata": "KUL", "icao": "WMKK", "terminal": "1", "gate": null, "baggage": null, "delay": 1, "scheduled": "2023-10-12T21:55:00+00:00", "estimated": "2023-10-12T21:55:00+00:00", "actual": null, "estimated_runway": null, "actual_runway": null}, "airline": {"name": "Malaysia Airlines", "iata": "MH", "icao": "MAS"}, "flight": {"number": "5621", "iata": "MH5621", "icao": "MAS5621", "codeshared": {"airline_name": "korean air", "airline_iata": "ke", "airline_icao": "kal", "flight

## Tweeting it out

In [61]:
def dict_factory(cursor, row):
    """
    sqlite3 row factory that maps each column name to its value.

    cursor: the sqlite3 cursor that produced the row
    row: the row as a tuple
    returns a dict of {column name: value}; with duplicate names the last wins
    """
    # cursor.description holds one 7-tuple per column; only its first item
    # is the column name
    column_names = (column[0] for column in cursor.description)
    return dict(zip(column_names, row))

In [77]:
# Spot-check flights whose scheduled departure and arrival fall on different
# calendar dates; rows print as dicts via dict_factory.
# NOTE(review): the output shows an arrival date earlier than its departure,
# so the API timestamps may be local times labelled +00:00 — confirm.
db_conn.row_factory = dict_factory
sql = f"""
SELECT
    json_extract({JSON_COL},'$.flight_date') as date,
    DATE(json_extract({JSON_COL},'$.departure.scheduled')) as leave,
    DATE(json_extract({JSON_COL},'$.arrival.scheduled')) as arrive
FROM {TBL_NAME}
--WHERE CAST(leave AS DATE) != CAST(arrive AS DATE)
WHERE leave != arrive
LIMIT 5;
"""
cursor = db_conn.execute(sql)
for record in cursor:
    print(record)

{'date': '2023-10-12', 'leave': '2023-10-12', 'arrive': '2023-10-13'}
{'date': '2023-10-12', 'leave': '2023-10-12', 'arrive': '2023-10-13'}
{'date': '2023-10-12', 'leave': '2023-10-12', 'arrive': '2023-10-13'}
{'date': '2023-10-12', 'leave': '2023-10-12', 'arrive': '2023-10-13'}
{'date': '2023-10-13', 'leave': '2023-10-13', 'arrive': '2023-10-12'}


In [78]:
def _query_template_rows(
    db_conn: sqlite3.Connection,
    env: jinja2.Environment,
    template: str,
    params: dict,
) -> list:
    """Render a single-statement SQL template and return all rows as sqlite3.Row."""
    sql = env.get_template(template).render(params)
    logger.debug(f"rendered SQL:\n{sql}")
    cursor = db_conn.cursor()
    # Per-cursor row factory: rows can be read by column name or by position
    # whatever row_factory the connection has been given.
    cursor.row_factory = sqlite3.Row
    try:
        return cursor.execute(sql).fetchall()
    finally:
        cursor.close()


def write_flight_tweet(
    db_conn: sqlite3.Connection,
    env: jinja2.Environment,
    str_date: str,
    tbl_name: str = TBL_NAME,
    num_delay: int = 3,
    template_dir: Path = Path("templates"),
    *,
    sep: str = "_",
) -> str:
    """
    Query the flight records database and compose the tweet text.

    The prepared queries (agg.sql, delayed.sql) make some assumptions about
    the table schema:
    - it follows the aviationstack flights endpoint
    - it is flattened, with nested keys joined by ``sep`` in the column names

    db_conn: open sqlite connection holding the flight records
    env: jinja environment that can load ``agg.sql`` and ``delayed.sql``
    str_date: date shown in the tweet and passed to the templates
    tbl_name: table name passed to the templates
    num_delay: passed to the templates as ``num_delay`` (presumably how many
        delayed flights delayed.sql lists — confirm)
    template_dir: unused; templates are resolved through ``env``. Kept for
        backward compatibility.
    sep: separator between nested JSON keys in column names.
        NOTE(review): the original read an undefined global ``sep``; "_" is an
        assumed default — confirm against the templates / table schema.

    Returns the tweet text, truncated to 280 characters.
    """
    # defining column names inside db for populating the tweet
    flight_num = f"flight{sep}iata"
    a_port = f"arrival{sep}airport"
    a_delay = f"arrival{sep}delay"
    a_sched = f"arrival{sep}scheduled"
    d_port = f"departure{sep}airport"

    # params to render the query template
    params = dict(
        flight_num=flight_num,
        a_port=a_port,
        a_delay=a_delay,
        a_sched=a_sched,
        d_port=d_port,
        num_delay=num_delay,
        str_date=str_date,
        tbl_name=tbl_name,
    )
    logger.debug("Querying database...")
    agg_row = _query_template_rows(db_conn, env, "agg.sql", params)[0]
    # assumes agg.sql's first two columns are (delayed flight count, average delay)
    total_delayed, avg_delay = agg_row[0], agg_row[1]
    delays = _query_template_rows(db_conn, env, "delayed.sql", params)
    logger.info("DB query executed")
    logger.debug(f"Query result:\n{[dict(d) for d in delays]}")
    delays_in_sentences = "\n" + "\n".join(
        f"{rank} {d[flight_num]}: {d[d_port]} to {d[a_port]}, {int(d[a_delay])} min"
        for rank, d in enumerate(delays, start=1)
    )
    pt1 = f"{total_delayed} MH flights were late on {str_date}"
    # AVG() over no rows is NULL; report 0 rather than failing on None
    pt2 = f"by an average of {(avg_delay or 0):.0f} min."
    tweet = " ".join([pt1, pt2, delays_in_sentences])
    if (tweet_chars := len(tweet)) > 280:
        logger.warning(f"Truncating tweet from {tweet_chars} to 280 chars")
        tweet = tweet[:280]
    logger.debug(f"tweet length: {len(tweet)}")
    return tweet

## 3. Tweet

In [None]:
# tweet
oauth1_client = tweepy.Client(
    consumer_key=TWITTER_API_KEY,
    consumer_secret=TWITTER_API_SECRET,
    access_token=TWITTER_ACCESS_TOKEN,
    access_token_secret=TWITTER_ACCESS_SECRET,
)
payload = write_flight_tweet(db_conn, str_date=str_date, template_dir=template_dir)
if local_tweet:
    logger.info(f"offline tweet:\n{payload}")
else:
    try:
        t_response = oauth1_client.create_tweet(text=payload, user_auth=True)
        logger.info(f"link: https://twitter.com/user/status/{t_response.data['id']}")
        logger.info(f"text: {t_response.data['text']}")
    except Exception as e:
        logger.error(f"Tweet failed: {e}")