# Day 1 - Data Collection

In [None]:
# Import required libraries
import datetime
import random
import sqlite3
import time

import pandas as pd
from numpy import mean
from polygon import RESTClient
from sqlalchemy import create_engine, text

# Importing API Key
from polygon_client import RestClient

### SQLite storage functions

In [None]:
def reset_raw_data_tables(engine, currency_pairs):
    """
    Empty the raw tick tables by dropping and recreating them.

    After this call every ``<pair>_raw`` table exists and is empty, so it only
    accumulates ticks for the next 6-minute aggregation window. All pairs are
    handled inside the single transaction opened by ``engine.begin()``.
    """
    with engine.begin() as connection:
        for pair in currency_pairs:
            table = pair + "_raw"
            drop_sql = f"DROP TABLE IF EXISTS {table};"
            create_sql = (
                f"CREATE TABLE IF NOT EXISTS {table}"
                "(ticktime text, fxrate  numeric, inserttime text);"
            )
            connection.execute(text(drop_sql))
            connection.execute(text(create_sql))


def initialize_raw_data_tables(engine, currency_pairs):
    """
    Make sure a ``<pair>_raw`` table exists for every currency pair.

    Each table has the columns ``ticktime``, ``fxrate`` and ``inserttime``.
    ``IF NOT EXISTS`` leaves any table that is already present untouched.
    """
    template = (
        "CREATE TABLE IF NOT EXISTS {}"
        "_raw(ticktime text, fxrate  numeric, inserttime text);"
    )
    with engine.begin() as connection:
        for pair in currency_pairs:
            statement = template.format(pair + "")
            connection.execute(text(statement))


def initialize_aggregate_tables(engine, currency_pairs):
    """
    Make sure a ``<pair>_agg`` table exists for every currency pair.

    One row is stored per aggregation period: insert time, period number,
    max/min/mean rate, volatility, fractal dimension and return.
    """
    column_defs = ", ".join(
        [
            "inserttime text",
            "period numeric",
            "max numeric",
            "min numeric",
            "mean numeric",
            "vol numeric",
            "fd numeric",
            "return_val numeric",
        ]
    )
    with engine.begin() as connection:
        for pair in currency_pairs:
            table = pair + "_agg"
            connection.execute(
                text(f"CREATE TABLE IF NOT EXISTS {table} ({column_defs});")
            )


def initialize_output_tables(engine, currency_pairs):
    """
    Create a ``<pair>_output`` table for the hourly balance/return results.

    Each row records the hourly window number, the running balance, the
    summed return for that window and the position ("LONG"/"SHORT").

    ``IF NOT EXISTS`` matches the other initializers. The database file
    persists between runs, so a plain ``CREATE TABLE`` would raise
    "table already exists" when the notebook is run a second time.
    """
    with engine.begin() as conn:
        for curr in currency_pairs:
            conn.execute(
                text(
                    "CREATE TABLE IF NOT EXISTS "
                    + curr
                    + "_output (window numeric, balance numeric, return_val numeric, position text);"
                )
            )

### Utility functions for calculations

In [None]:
def get_keltner_channel(mean_val, vol, n_bands=100, band_step=0.025) -> list:
    """
    Build a symmetric ladder of Keltner bands around ``mean_val``.

    Band ``i`` (for ``i`` in ``1..n_bands``) lies at ``mean_val ± i * band_step * vol``.

    Args:
        mean_val: Centre of the channel (the period's mean rate).
        vol: Volatility that scales the spacing between bands.
        n_bands: Number of bands on each side of the mean (default 100).
        band_step: Fraction of ``vol`` between neighbouring bands (default 0.025).

    Returns:
        A list of ``2 * n_bands`` band values: the lower bands from farthest
        to nearest, then the upper bands from nearest to farthest. For a
        non-negative ``vol`` this is sorted ascending. ``mean_val`` itself is
        not included.
    """
    offsets = [i * band_step * vol for i in range(1, n_bands + 1)]
    # Reverse the lower offsets so the combined list runs from lowest to highest band.
    lower = [mean_val - offset for offset in reversed(offsets)]
    upper = [mean_val + offset for offset in offsets]
    return lower + upper


def calculate_keltner_intersection(last_price, current_price, kbands) -> int:
    """
    Count the band levels crossed when moving from ``last_price`` to ``current_price``.

    A level counts when it lies strictly between the two prices, whichever
    direction the price moved. Levels equal to either price are not counted,
    so an unchanged price yields 0.
    """
    return sum(
        1
        for level in kbands
        if (last_price < level < current_price) or (last_price > level > current_price)
    )


def calculate_return(last_price, current_price) -> float:
    """
    Return the simple return r_i = (P_i - P_{i-1}) / P_{i-1}.

    A previous price of 0 yields a return of 0 instead of dividing by zero.
    For example, this happens before any earlier aggregate exists.
    """
    return 0 if last_price == 0 else (current_price - last_price) / last_price


def timestamp_to_datetime(ts) -> str:
    """
    Format a millisecond Unix timestamp as ``YYYY-MM-DD HH:MM:SS``.

    ``datetime.fromtimestamp`` interprets the value in the machine's local timezone.
    """
    seconds = ts / 1000.0
    moment = datetime.datetime.fromtimestamp(seconds)
    return moment.strftime("%Y-%m-%d %H:%M:%S")

### Aggregation and trading-decision functions

In [None]:
def aggregate_data(engine, currency_pairs, period, n_values) -> dict:
    """
    Aggregate the current window of raw ticks for every currency pair.

    For each pair this reads ``<pair>_raw`` and computes:
      * the mean, min and max rate over distinct (ticktime, fxrate) rows;
      * volatility ``(max - min) / mean``;
      * fractal dimension ``n_values[pair] / volatility`` (0 when volatility is 0);
      * the return versus the mean stored in the latest ``<pair>_agg`` row.
    It then inserts one row into ``<pair>_agg`` and builds the Keltner bands
    for the next window.

    Args:
        engine: SQLAlchemy engine for the SQLite database.
        currency_pairs: Pair codes such as ``"EURUSD"``.
        period: Sequence number of the window being aggregated.
        n_values: Mapping of pair -> number of Keltner band crossings counted
            during this window. Pairs missing from the mapping count as 0.

    Returns:
        Dict mapping every pair to its list of Keltner bands. If a pair's raw
        table is empty (e.g. every API call failed during the window), no
        aggregate row is written and the pair gets an empty band list. Callers
        can then still look the pair up, and no crossings are counted for it.
    """
    keltner_bands = {}

    with engine.begin() as conn:
        for curr in currency_pairs:
            # DISTINCT because the API sometimes returns the same tick more than once.
            stats = conn.execute(
                text(
                    "SELECT AVG(fxrate) AS mean_val, MIN(fxrate) AS min_val, "
                    "MAX(fxrate) AS max_val, MAX(ticktime) AS last_date FROM "
                    "(SELECT DISTINCT ticktime, fxrate FROM " + curr + "_raw);"
                )
            ).first()

            mean_val = stats.mean_val if stats is not None else None
            if mean_val is None:
                # Aggregates over an empty table are NULL: nothing to record.
                keltner_bands[curr] = []
                continue

            min_val = stats.min_val
            max_val = stats.max_val
            last_date = stats.last_date
            volatility = (max_val - min_val) / mean_val if mean_val != 0 else 0

            # Bands for the next window are centred on this window's mean.
            keltner_bands[curr] = get_keltner_channel(mean_val, volatility)

            fd = n_values.get(curr, 0) / volatility if volatility != 0 else 0

            # The previous aggregate's mean is the reference price for the return.
            previous = conn.execute(
                text("SELECT mean FROM " + curr + "_agg ORDER BY rowid DESC LIMIT 1;")
            ).first()
            last_mean = 0
            if previous is not None and previous.mean is not None:
                last_mean = previous.mean
            return_val = calculate_return(last_mean, mean_val)

            conn.execute(
                text(
                    "INSERT INTO "
                    + curr
                    + "_agg (inserttime, period, max, min, mean, vol, fd, return_val)"
                    + "VALUES (:inserttime, :period, :max, :min, :mean, :vol, :fd, :return_val );"
                ),
                {
                    "inserttime": last_date,
                    "period": period,
                    "max": max_val,
                    "min": min_val,
                    "mean": mean_val,
                    "vol": volatility,
                    "fd": fd,
                    "return_val": return_val,
                },
            )

    return keltner_bands


def _cutoff_for_window(window):
    """Return the return-tolerance cutoff for hourly window ``window``."""
    # T10 -> 0.250%, T20 -> 0.150%, T30 -> 0.100%, T40 and later -> 0.050%
    if window <= 1:
        return 0.0025
    if window <= 2:
        return 0.0015
    if window <= 3:
        return 0.001
    return 0.0005


def _apply_trade_decision(balance, return_val, position, cutoff):
    """Apply one hourly decision; return ``(new_balance, keep_open)``."""
    if position == "LONG":
        # A long trade profits from a positive return; tolerate down to -cutoff.
        profitable = return_val >= -cutoff
    else:
        # A short trade profits from a negative return; tolerate up to +cutoff.
        profitable = return_val <= cutoff
    if profitable:
        return balance + 100 + return_val, True
    return balance + return_val, False


def calculate_hourly_metrics(
    engine, currency_pairs, window, currencyCheck, long_currency
):
    """
    Run the hourly trading decision for every pair that is still open.

    For each pair with ``currencyCheck[pair]`` True:
      * the ``return_val`` of the last 10 ``<pair>_agg`` rows is summed
        (NULL/0 values are skipped);
      * the starting balance is 100 in window 1, otherwise the balance
        stored for the previous window in ``<pair>_output``;
      * a LONG position stays open if the summed return is >= -cutoff, and a
        SHORT position stays open if it is <= +cutoff. The cutoff is 0.25% in
        window 1, 0.15% in window 2, 0.10% in window 3 and 0.05% afterwards;
      * an open position adds ``100 + return`` to the balance. A position that
        fails the test adds only the return and is closed
        (``currencyCheck[pair] = False``);
      * one row (window, balance, return_val, position) is appended to
        ``<pair>_output``.

    Args:
        engine: SQLAlchemy engine for the SQLite database.
        currency_pairs: Pair codes to evaluate.
        window: 1-based hourly window number.
        currencyCheck: Mapping pair -> still open; updated in place.
        long_currency: Pairs held LONG; every other pair is SHORT.

    Returns:
        The same ``currencyCheck`` mapping, after updating it.
    """
    for curr in currency_pairs:
        if not currencyCheck[curr]:
            continue  # position already closed

        position = "LONG" if curr in long_currency else "SHORT"

        with engine.begin() as conn:
            recent = conn.execute(
                text("SELECT *  FROM " + curr + "_agg ORDER BY rowid DESC LIMIT 10;")
            )
            return_val = sum(row.return_val for row in recent if row.return_val)

            if window == 1:
                balance = 100
            else:
                previous = conn.execute(
                    text(
                        "SELECT balance FROM " + curr + "_output "
                        'WHERE "window" = :prev ORDER BY rowid DESC LIMIT 1;'
                    ),
                    {"prev": window - 1},
                ).first()
                # Fall back to the starting balance instead of reusing a stale
                # value (previously another pair's balance) when no row exists.
                balance = previous.balance if previous is not None else 100

            balance, keep_open = _apply_trade_decision(
                balance, return_val, position, _cutoff_for_window(window)
            )
            if not keep_open:
                currencyCheck[curr] = False

            conn.execute(
                text(
                    "INSERT INTO "
                    + curr
                    + '_output ("window", balance, return_val, position) '
                    + "VALUES (:window, :balance, :return_val, :position);"
                ),
                {
                    "window": window,
                    "balance": balance,
                    "return_val": return_val,
                    "position": position,
                },
            )

    return currencyCheck

### Main Function

In [None]:
def _insert_raw_tick(engine, currency, ticktime, fxrate, inserttime):
    """Append one quote to ``<currency>_raw`` in its own transaction."""
    with engine.begin() as conn:
        conn.execute(
            text(
                "INSERT INTO "
                + currency
                + "_raw(ticktime, fxrate, inserttime) VALUES (:ticktime, :fxrate, :inserttime)"
            ),
            {"ticktime": ticktime, "fxrate": fxrate, "inserttime": inserttime},
        )


def main(currency_pairs):
    """
    Collect live FX quotes from Polygon and run the aggregation/trading loop.

    The loop runs while ``counter < 36010``, i.e. 36,009 iterations.
    Ordinary iterations sleep 0.65 s so that each takes about one second,
    which makes a full run roughly 10 hours.

    Each iteration:
      * fetches a real-time conversion quote for every pair and stores the
        bid/ask midpoint in ``<pair>_raw``;
      * once Keltner bands exist, adds the number of bands crossed since the
        pair's previous quote to ``n_values[pair]``.

    Every 360 iterations (~6 minutes), ``aggregate_data`` writes mean, min,
    max, volatility, fractal dimension and return to ``<pair>_agg`` and
    returns new Keltner bands. The raw tables are then reset.

    Every 3600 iterations (~1 hour), ``calculate_hourly_metrics`` sums the
    last 10 returns of every still-open pair, records its balance in
    ``<pair>_output`` and closes positions whose return breaks the window's
    tolerance.

    Args:
        currency_pairs: List of six-letter pair codes such as ``"EURUSD"``.
            The first three letters are the from-currency and the last three
            the to-currency.
    """
    # echo=False keeps SQLAlchemy from logging every statement to stdout.
    engine = create_engine("sqlite:///../sqlite/day1.db", echo=False, future=True)
    initialize_raw_data_tables(engine, currency_pairs)
    initialize_aggregate_tables(engine, currency_pairs)
    initialize_output_tables(engine, currency_pairs)

    # Fetch the API key from the local helper library and open a Polygon REST client.
    key = RestClient.fetch_key()
    client = RESTClient(key)

    counter = 1
    period_count = 0
    n_values = {currency: 0 for currency in currency_pairs}
    keltner_bands = {}
    last_price = {}
    currency_check = {currency: True for currency in currency_pairs}

    # Up to 5 *distinct* pairs go long; every other pair is traded short.
    # (random.choices' second positional argument is `weights`, not `k`.)
    long_currency = random.sample(currency_pairs, min(5, len(currency_pairs)))

    try:
        while counter < 36010:
            if counter % 360 == 0:
                period_count += 1

                # Aggregate this window and compute the bands for the next one.
                keltner_bands = aggregate_data(
                    engine, currency_pairs, period_count, n_values
                )
                reset_raw_data_tables(engine, currency_pairs)

                if counter % 3600 == 0:
                    # Integer hour index: 1 after the first hour, 2 after the second, ...
                    window = period_count // 10
                    currency_check = calculate_hourly_metrics(
                        engine, currency_pairs, window, currency_check, long_currency
                    )

                # The next window starts with zero crossings and no previous price.
                n_values = {currency: 0 for currency in currency_pairs}
                last_price = {}
            else:
                # Pace the loop to about one API round per second: the calls
                # below take ~0.35 s, so wait the remaining 0.65 s.
                time.sleep(0.65)

            for currency in currency_pairs:
                from_, to = currency[:3], currency[3:]  # e.g. "USDSGD" -> USD, SGD
                try:
                    resp = client.forex_currencies_real_time_currency_conversion(
                        from_, to, amount=100, precision=2
                    )
                except Exception:
                    # Best effort: skip this pair for this tick if the API call fails.
                    # (A bare `except:` would also swallow KeyboardInterrupt.)
                    continue

                last_trade = resp.last
                dt = timestamp_to_datetime(last_trade["timestamp"])
                insert_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
                avg_price = (last_trade["bid"] + last_trade["ask"]) / 2

                # No bands exist during the very first window, so nothing is counted then.
                if keltner_bands and currency in last_price:
                    n_values[currency] += calculate_keltner_intersection(
                        last_price[currency],
                        avg_price,
                        keltner_bands.get(currency, []),
                    )
                last_price[currency] = avg_price

                _insert_raw_tick(engine, currency, dt, avg_price, insert_time)

            counter += 1
    finally:
        engine.dispose()

### Function to generate CSV

In [None]:
def generate_csv(pairs=None, *, db_path="../sqlite/day1.db", out_dir="results"):
    """
    Export each ``<pair>_agg`` table to ``<out_dir>/<pair>.csv``.

    Args:
        pairs: Currency pairs to export. Defaults to the module-level
            ``currency_pairs`` list, so ``generate_csv()`` keeps working.
        db_path: Path of the SQLite database written by ``main``.
        out_dir: Directory for the CSV files; created if missing.
    """
    from contextlib import closing
    from pathlib import Path

    if pairs is None:
        pairs = currency_pairs  # list defined in the invocation cell

    output_dir = Path(out_dir)
    output_dir.mkdir(parents=True, exist_ok=True)

    # closing() releases the connection even if one of the exports fails.
    with closing(
        sqlite3.connect(
            db_path, isolation_level=None, detect_types=sqlite3.PARSE_COLNAMES
        )
    ) as conn:
        for curr in pairs:
            db_df = pd.read_sql_query("SELECT * FROM " + curr + "_agg", conn)
            db_df.to_csv(output_dir / (curr + ".csv"), index=False)

### Invoking the functions

In [None]:
# List of the currency pairs to pull data for. Each code is
# <from currency><to currency>, e.g. "EURUSD" requests EUR -> USD conversions.
currency_pairs = [
    "EURUSD",
    "GBPUSD",
    "USDCAD",
    "USDCHF",
    "USDHKD",
    "USDAUD",
    "USDNZD",
    "USDSGD",
]

# Run the main data collection loop
main(currency_pairs)

# Export the aggregate tables to CSV files
generate_csv()