In [1]:
import json
import logging
import mintapi
import os

from typing import Dict

from airflow.hooks.base import BaseHook
from airflow.macros import ds_format
from airflow.models import Variable

from sqlalchemy.orm import sessionmaker

from timhealz.common.utils import get_mysql_db_engine
from timhealz.common.data_models.spendy import Transaction
from timhealz.spendy import parse_transaction

log = logging.getLogger(__name__)

log.info("Fetching data directory")
DATA_DIR = Variable.get("MINT_DATA_DIRECTORY")
TRANSACTIONS_FP = os.path.join(DATA_DIR, "transactions/{ds}.json")

[[34m2021-12-30 13:20:09,132[0m] {[34m96420819.py:[0m20} INFO[0m - Fetching data directory[0m


In [2]:
log.info("Fetching intuit credentials")
intuit_creds = BaseHook.get_connection("intuit-credentials")

log.info("Initializing Mint session")
mint = mintapi.Mint(
    intuit_creds.login,
    intuit_creds.password,
    headless=True,
    use_chromedriver_on_path=True,
)

[[34m2021-12-30 13:20:09,139[0m] {[34m2277532274.py:[0m1} INFO[0m - Fetching intuit credentials[0m
[[34m2021-12-30 13:20:09,144[0m] {[34m2277532274.py:[0m4} INFO[0m - Initializing Mint session[0m


In [3]:
log.info("Getting transactions")

start_date = ds_format("2021-01-01", "%Y-%m-%d", "%m/%d/%y")
end_date = ds_format("2021-12-28", "%Y-%m-%d", "%m/%d/%y")

data = mint.get_transactions_json(
    start_date=start_date, end_date=end_date
)

[[34m2021-12-30 13:20:23,743[0m] {[34m1128300975.py:[0m1} INFO[0m - Getting transactions[0m


In [4]:
transactions_clean = {}
for i,transaction in enumerate(data):
    try:
        month, day = transaction["date"].split()
        ds = ds_format(f"2021-{month}-{day}","%Y-%b-%d", "%Y-%m-%d")
    except:
        ds = ds_format(transaction["date"], "%m/%d/%y", "%Y-%m-%d")

    transactions_clean.setdefault(ds,[]).append(transaction)

In [5]:
log.info(f"Dumping transactions json for {start_date} - {end_date}")
for ds, ds_transactions in transactions_clean.items():
    fp = TRANSACTIONS_FP.format(ds=ds)    
    with open(fp, "w") as out:
        json.dump(ds_transactions, out, indent=4)

[[34m2021-12-30 13:20:35,630[0m] {[34m549145441.py:[0m1} INFO[0m - Dumping transactions json for 01/01/21 - 12/28/21[0m


In [6]:
def insert_mint_transactions(transactions_clean):
    Session = sessionmaker(bind=get_mysql_db_engine())
    session = Session()
    
    for ds, ds_transactions in transactions_clean.items():
        
        log.info(f"Clearing out data - {ds}")
        (
            session.query(Transaction)
                .filter(Transaction.ds == ds)
                .delete(synchronize_session=False)
        )
        
        log.info(f"Loading transactions - {ds}")
        for transaction in ds_transactions:
            session.add(parse_transaction(transaction=transaction, ds=ds))

    session.commit()

In [None]:
insert_mint_transactions(transactions_clean)