# Imports

In [1]:
# Make the repository root and its src/ package importable from this notebook.
# Entries are pushed to the front one after another, so "../src" ends up ahead
# of "../" in the final lookup order.
import sys

for extra_path in ("../", "../src"):
    sys.path.insert(0, extra_path)

In [2]:
import time
import pandas as pd
from google.cloud import bigquery
import plaid

from sql.bq_table_schemas import BqTableSchemas
from utils.bq_utils import BqUtils

# from utils.plaid_utils import PlaidUtils
from utils.plaid_utils import PlaidUtils
from utils.financial_accounts import FinancialAccounts
from utils.plaid_transactions import PlaidTransactions
from utils.plaid_investments import PlaidInvestments


# constants
# Plaid credentials and item access tokens are secrets, so they are read from the
# environment instead of being stored in the notebook. Export them before starting
# the kernel, for example:
#   PLAID_CLIENT_ID=...  PLAID_SECRET=...  PLAID_ACCESS_TOKENS=token-1,token-2
# PLAID_SECRET must be the secret that belongs to the environment selected by
# PLAID_HOST below. Any credential that was ever committed in this cell should be
# treated as leaked and rotated.
import os

PLAID_CLIENT_ID = os.environ["PLAID_CLIENT_ID"]  # KeyError here means the variable is not exported
PLAID_SECRET = os.environ["PLAID_SECRET"]
PLAID_HOST = plaid.Environment.Development
# Comma-separated list of item access tokens; blank entries are ignored and the
# list is empty when the variable is unset.
PLAID_ACCESS_TOKENS = [
    token.strip() for token in os.environ.get("PLAID_ACCESS_TOKENS", "").split(",") if token.strip()
]
# PLAID_ENV = "sandbox"
PLAID_PRODUCTS = ["liabilities", "transactions", "investments"]
# PLAID_COUNTRY_CODES = ["US"]
# PLAID_REDIRECT_URI="https://localhost:3000/"

# initialize clients
# One BigQuery client is shared by every helper below.
bq_client = bigquery.Client()
bq = BqUtils(bq_client=bq_client)
plaid_client = PlaidUtils(bq_client, PLAID_CLIENT_ID, PLAID_SECRET, PLAID_HOST)
financial_accounts = FinancialAccounts(bq_client, plaid_client)
plaid_transactions = PlaidTransactions(bq_client, plaid_client)
plaid_investments = PlaidInvestments(bq_client, plaid_client)
bq_tables = BqTableSchemas()

# Show full cell contents and every column when displaying DataFrames.
pd.set_option("display.max_colwidth", None)
pd.set_option("display.max_columns", None)

In [3]:
from plaid.model.products import Products

# Wrap each configured product name in the Plaid SDK's Products model.
products = [Products(product_name) for product_name in PLAID_PRODUCTS]

In [None]:
### START HERE
backfill = True
write_disposition = "WRITE_TRUNCATE"
offset_days = -1

# Frames that later cells inspect. They start out empty so that a run with nothing
# to upload still leaves both names defined instead of raising NameError downstream.
concat_transactions_df = pd.DataFrame()
concat_removed_df = pd.DataFrame()

# only create new financial_accounts table and plaid_cursors_YYYYMMDD table if starting with initial backfill
if backfill:
    add_test_transaction = True  # to add a removed transaction or not in generate_transactions_dfs()
    print("STARTING HISTORICAL DATA PULL")
    # Create a new plaid_cursors_YYYYMMDD table with access_token, item_id, and next_cursor
    plaid_transactions.create_cursors_bq_table(offset_days=offset_days, write_disposition=write_disposition)

else:
    add_test_transaction = False
    print("STARTING DAILY DATA PULL")

# create empty temp cursor table to upload cursors to for the current run.
# When job finishes running, this table will become the latest plaid_cursors_YYYYMMDD partitions
plaid_transactions.create_temp_cursors_bq_table(write_disposition="WRITE_TRUNCATE")

# grab latest cursors for each access token / item
latest_cursors_df = plaid_transactions.get_latest_cursors()

# Call generate_transactions_dfs() once per item: added/modified transactions come
# back in transactions_df and removed transactions in removed_df (either may be None).
transactions_df_list = []
removed_df_list = []
for _, row in latest_cursors_df.iterrows():

    transactions_df, removed_df = plaid_transactions.generate_transactions_dfs(
        access_token=row["access_token"],
        item_id=row["item_id"],
        next_cursor=row["next_cursor"],
        offset_days=offset_days,
        add_test_transaction=add_test_transaction,
    )

    if transactions_df is not None:
        transactions_df_list.append(transactions_df)

    if removed_df is not None:
        removed_df_list.append(removed_df)

# only upload transactions_df to BQ if there is at least one non-null df
if len(transactions_df_list) > 0:
    concat_transactions_df = pd.concat(transactions_df_list)
    plaid_transactions.create_empty_transactions_bq_table(offset_days=offset_days, write_disposition=write_disposition)
    print("SLEEP 5 SECONDS TO WAIT FOR plaid_transactions_YYYYMMDD creation\n")
    time.sleep(5)

    plaid_transactions.upload_transactions_df_to_bq(concat_transactions_df, offset_days)
else:
    print("No transactions present in concat_transactions_df")

# only upload removed_df to BQ if there is at least one non-null df
if len(removed_df_list) > 0:
    concat_removed_df = pd.concat(removed_df_list)
    plaid_transactions.create_empty_removed_bq_table(offset_days=offset_days, write_disposition=write_disposition)
    print("SLEEP 5 SECONDS TO WAIT FOR plaid_removed_transactions_YYYYMMDD creation\n")
    time.sleep(5)
    plaid_transactions.upload_removed_df_to_bq(concat_removed_df, offset_days)
else:
    print("No removed transactions present in concat_removed_df")

# Copy temp_cursors to plaid_cursors_YYYYMMDD
# NOTE(review): this step is currently disabled, so the temp cursors written by this
# run are presumably never promoted -- confirm that is intended before relying on it.
# plaid_transactions.copy_temp_cursors_to_cursors_bq_table(offset_days=offset_days, write_disposition="WRITE_TRUNCATE")

In [None]:
# Preview the first rows of the removed-transactions frame built by the
# transactions pull cell above (that cell must have been run first).
concat_removed_df.head()

In [None]:
# Column subset used to project the per-item transaction frames in the comparison
# cells below. The commented-out names are currently excluded from that projection;
# uncomment an entry to include it.
columns = [
    "item_id",
    "account_id",
    "transaction_id",
    "pending_transaction_id",
    "is_pending",
    "account_owner",
    "status",
    "date",
    # 'datetime',
    # 'authorized_date',
    # 'authorized_datetime',
    # 'amount',
    # 'currency_code',
    # 'unofficial_currency_code',
    # 'personal_finance_category',
    # 'payment_channel',
    # 'merchant',
    # 'counterparties',
    # 'location',
    # 'check_number',
    # 'payment_meta',
    # 'transaction_code'
]

In [None]:
# Project the first two per-item transaction frames onto the selected columns,
# then stack them row-wise into a single frame.
df_1 = transactions_df_list[0].loc[:, columns]
df_2 = transactions_df_list[1].loc[:, columns]
final_df = pd.concat((df_1, df_2))

In [None]:
# Sanity check: the stacked frame should have as many rows as its two inputs combined.
print(len(final_df))
print(len(df_1) + len(df_2))

In [None]:
# resp = plaid_client.remove_item(access_token="access-development-258fd242-5274-4456-964e-8e364f2301da")
# print(resp)

In [4]:
from plaid.model.link_token_transactions import LinkTokenTransactions

# Test Investments

In [None]:
write_disposition = "WRITE_TRUNCATE"
offset_days = -1
start_date = "2024-01-01"
end_date = "2024-04-25"

print("STARTING main_plaid_investments.py")

# Per-token results of generate_investments_dfs(): one holdings frame and one
# investment-transactions frame per access token.
holdings_df_list = []
investment_transactions_df_list = []

# get investments access_tokens
access_tokens = list(plaid_client.get_access_tokens(products=["investments"])["access_token"].unique())

# generate investment dfs for investment holdings and investment transactions
for token in access_tokens:
    holdings_df, investment_transactions_df = plaid_investments.generate_investments_dfs(start_date, end_date, token)
    holdings_df_list.append(holdings_df)
    investment_transactions_df_list.append(investment_transactions_df)

# pd.concat raises ValueError for an empty list and for a list that holds only None,
# so concatenate only when at least one real frame came back. The fallback empty
# frames keep both names defined for later inspection.
has_holdings = any(df is not None for df in holdings_df_list)
has_investment_transactions = any(df is not None for df in investment_transactions_df_list)

concat_holdings_df = pd.concat(holdings_df_list) if has_holdings else pd.DataFrame()
concat_investment_transactions_df = (
    pd.concat(investment_transactions_df_list) if has_investment_transactions else pd.DataFrame()
)

# The holdings upload is currently disabled; holdings are only collected above.
# only upload holdings_df to BQ if there is at least one non-null df
# if has_holdings:
#     # create empty plaid_investment_holdings_YYYYMMDD to upload holdings to
#     plaid_investments.create_empty_investment_holdings_bq_table(
#         offset_days=offset_days,
#         write_disposition=write_disposition,
#     )
#     print("SLEEP 5 SECONDS TO WAIT FOR plaid_investment_holdings_YYYYMMDD creation\n")
#     time.sleep(5)
#     plaid_investments.upload_investment_holdings_df_to_bq(concat_holdings_df, offset_days)
# else:
#     print("No investment holdings present in concat_investment_holdings_df")

# only upload investment_transactions_df to BQ if there is at least one non-null df
if has_investment_transactions:
    # create empty plaid_investment_transactions_YYYYMMDD to upload transactions to
    plaid_investments.create_empty_investment_transactions_bq_table(
        offset_days=offset_days, write_disposition=write_disposition
    )
    print("SLEEP 5 SECONDS TO WAIT FOR plaid_investment_transactions_YYYYMMDD creation\n")
    time.sleep(5)
    plaid_investments.upload_investment_transactions_df_to_bq(concat_investment_transactions_df, offset_days)
    # Report success only when an upload actually happened.
    print("SUCCESS: investment data uploaded to BQ")
else:
    print("No investment transactions present in concat_investment_transactions_df")

# Test Delete all Partitions

In [None]:
# Fetch the table descriptors returned by BqTableSchemas for the removed-transactions,
# transactions and financial-accounts tables.
removed_transaction_bq = bq_tables.plaid_removed_transactions_YYYYMMDD()
transaction_bq = bq_tables.plaid_transactions_YYYYMMDD()
accounts_bq = bq_tables.financial_accounts_YYYYMMDD()

# NOTE(review): the call below is left commented out on purpose. Judging by its name it
# presumably deletes every partition of the transactions table -- confirm the target
# project/dataset/table before uncommenting and running it.
# bq.delete_all_partitions(
#     project_id=transaction_bq["project_id"],
#     dataset_id=transaction_bq["dataset_id"],
#     table_id=transaction_bq["table_id"],
#     confirm=True,
# )