In [None]:
import os
from dotenv import load_dotenv
import warnings
import json
from google.cloud import secretmanager
import functions_framework
import os
import time

In [None]:
# Switch the working directory to the project checkout before importing the
# project modules below (presumably so that the `src` package is importable
# from the notebook -- confirm).
# The location can be overridden with the MONEY_FLOW_ROOT environment
# variable; the original hard-coded path is kept as the fallback so existing
# setups behave exactly as before. This cell runs before load_dotenv(), so the
# override must come from the process environment rather than from .env.
os.chdir(os.getenv("MONEY_FLOW_ROOT", "/Users/eq81tw/Developer/money-flow"))
from src.allocation import FireStore
from src.automate import AutomateAllocations
from src.bunq import BunqLib

In [None]:
def get_secret_value(secret_name, project_id, version="latest"):
    """Read a secret from Google Cloud Secret Manager and return it as text.

    Args:
        secret_name: Identifier of the secret inside the project.
        project_id: Google Cloud project that owns the secret.
        version: Secret version to access. Defaults to ``"latest"``, which is
            what this function always requested before the parameter existed.

    Returns:
        The secret payload decoded as UTF-8.

    Any exception raised by the Secret Manager client is propagated unchanged.
    """
    client = secretmanager.SecretManagerServiceClient()
    # Build the fully qualified resource name for the requested version.
    resource_name = client.secret_version_path(project_id, secret_name, version)
    response = client.access_secret_version(name=resource_name)
    return response.payload.data.decode("UTF-8")

In [None]:
# Suppress every Python warning for the remainder of the session.
warnings.filterwarnings("ignore")
# Load variables from a .env file; override=True lets values from the file
# replace variables that are already set in the process environment.
load_dotenv(override=True)

PROJECT_ID = os.getenv("PROJECT_ID")
if not PROJECT_ID:
    # Without a project id the secret lookups below would be issued against
    # a resource path containing "None" and fail with an opaque API error,
    # so stop here with an actionable message instead.
    raise RuntimeError(
        "PROJECT_ID is not set; define it in the environment or in the .env file."
    )

# Secrets are read from Secret Manager rather than from the environment.
API_KEY = get_secret_value("bunq_api_key", PROJECT_ID)
GOOGLE_FIRESTORE_CONFIG = json.loads(
    get_secret_value("firebase_service_account", PROJECT_ID)
)

# Plain settings come from environment variables; each is None when unset.
API_CONTEXT_FILE_PATH = os.getenv("BUNQ_FILE_NAME")
ENVIRONMENT = os.getenv("ENVIRONMENT")
DEVICE_DESCRIPTION = os.getenv("DESCRIPTION")
# SIMULATE is True only for "true", "1" or "t" (case-insensitive); it is
# False for any other value and when the variable is unset.
SIMULATE = os.getenv("SIMULATE", "False").lower() in ("true", "1", "t")

In [None]:
# Create the Firestore-backed store from the service-account configuration.
# Any failure is reported with a hint and then aborts execution.
try:
    store_ = FireStore(config=GOOGLE_FIRESTORE_CONFIG)
except Exception as error:
    print("An error occurred:", type(error).__name__, "–", error)
    print("Are you on VPN maybe?")
    # Raise SystemExit explicitly instead of calling the interactive `exit`
    # helper: `exit` is meant for the interactive shell and behaves
    # differently under a Jupyter kernel, whereas SystemExit stops execution
    # consistently. Chaining keeps the original error as the cause.
    raise SystemExit(1) from error

In [None]:
# Instantiate the bunq wrapper from the configuration values loaded earlier:
# the API key read from Secret Manager plus the environment-derived settings.
bunq_ = BunqLib(
    environment_type=ENVIRONMENT,
    api_key=API_KEY,
    api_context_file_path=API_CONTEXT_FILE_PATH,
    device_description=DEVICE_DESCRIPTION,
)

In [None]:
# Call connect() on the BunqLib client.
# NOTE(review): presumably this establishes the API session/context used by
# the bunq SDK calls in later cells -- confirm in src/bunq.
bunq_.connect()

In [None]:
from bunq.sdk.http.pagination import Pagination
from bunq.sdk.model.generated.endpoint import (
    MonetaryAccountApiObject,
    PaymentApiObject,
    MonetaryAccountBankApiObject,
    MonetaryAccountJointApiObject,
    MonetaryAccountSavingsApiObject,
)

# Pagination state for listing accounts: a Pagination object with count set
# to 200 (presumably the requested page size -- confirm), an empty result
# list, and a counter.
# NOTE(review): the cell that performs the fetch re-creates all four of these
# names before using them, so this initialisation looks redundant -- confirm
# and consider removing it.
pagination = Pagination()
pagination.count = 200
all_accounts = []
i = 0


In [None]:
def get_some_accounts(pagination, first_time=False, endpoint=None):
    """Fetch a single page of monetary accounts.

    Args:
        pagination: Pagination object that supplies the URL parameters for
            the request.
        first_time: When True, send only the count parameters
            (``pagination.url_params_count_only``); otherwise request the
            previous page (``pagination.url_params_previous_page``).
        endpoint: Object whose ``list(params=...)`` is called to perform the
            request. Defaults to ``MonetaryAccountSavingsApiObject``, the
            endpoint this function always used before the parameter existed.

    Returns:
        A ``(accounts, pagination)`` tuple taken from the response's
        ``value`` and ``pagination`` attributes.
    """
    if endpoint is None:
        # Resolved at call time so the default needs no import at definition.
        endpoint = MonetaryAccountSavingsApiObject
    params = (
        pagination.url_params_count_only
        if first_time
        else pagination.url_params_previous_page
    )
    response = endpoint.list(params=params)
    return response.value, response.pagination

In [None]:
# Pause applied after every request.
# NOTE(review): presumably chosen to stay under the bunq API rate limit --
# confirm the exact limit.
REQUEST_PAUSE_SECONDS = 1.1

# Start from a fresh pagination state and an empty result list so this cell
# can be re-run on its own.
pagination = Pagination()
pagination.count = 200
all_accounts = []

# First request: only the count parameters are sent.
accounts, pagination = get_some_accounts(pagination, first_time=True)
time.sleep(REQUEST_PAUSE_SECONDS)
all_accounts.extend(accounts)

# Keep requesting the previous page until the API reports there is none.
# The None check guards against a response that carries no pagination
# object, which would otherwise raise AttributeError here.
while pagination is not None and pagination.has_previous_page():
    accounts, pagination = get_some_accounts(pagination)
    time.sleep(REQUEST_PAUSE_SECONDS)
    all_accounts.extend(accounts)

# Display everything that was collected.
all_accounts

In [None]:
# Print the description and status of every fetched account, one per line.
# The index from enumerate() was never used, so iterate directly.
for account in all_accounts:
    print(account.description, account.status)

In [None]:
# Select the second-to-last fetched account for inspection in the next cells.
# Raises IndexError when fewer than two accounts were fetched.
account = all_accounts[-2]

In [None]:
# Display the selected account's description.
account.description

In [None]:
# Display the selected account's instance attributes as a dict.
vars(account)

In [None]:
# Display the selected account object's repr.
account

In [None]:
# Build AutomateAllocations from the bunq client, the Firestore store and the
# SIMULATE flag, then call run().
# NOTE(review): with SIMULATE false this presumably acts on real accounts --
# confirm before running.
AutomateAllocations(bunq=bunq_, store=store_, simulate=SIMULATE).run()
