# Historical Data Aggregation

In order to fine-tune the model specifically on shorter time frame data, we want to store historical data for the past year for the set of tickers that we have provided to the client side. The two most important values to define are the sets of tickers as well as the shape of the data for fine-tuning. For convience, we have defined a `_reshape` method on the MoiRai model class, which receives a list of aggregate bars from Polygon, and creates a dataframe of the shape required for fine-tuning. We can use this method in the data aggregation pipeline, in order to create dataframes of the shape we require for fine-tuning.

In [24]:
import sys

sys.path.append('..')

from src.model import MoiRai


In [25]:
from datetime import datetime
from typing import List

from polygon import RESTClient
from polygon.rest.models import Agg

from src._data import AggregateInterval
from src.config import SERVER_CONFIG

model_class = MoiRai(interval=AggregateInterval.ONE_MINUTE)
rest_client = RESTClient(api_key=SERVER_CONFIG.POLYGON_API_KEY)

def _fetch_data(ticker: str, date: datetime) -> List[Agg]:
    try:
        end_date = date.replace(day=date.day + 1)
    
    # If the date is out of range, return empty list
    except ValueError:
        return []

    start_ts = int(date.timestamp() * 1000)
    end_ts = int(end_date.timestamp() * 1000)

    result = rest_client.get_aggs(
        ticker=ticker,
        multiplier=10, # Training on 10 second interval data
        timespan="second",
        from_=start_ts,
        to=end_ts,
        sort="asc",
    )

    return result

def get_ticker_data(ticker: str, month: int, year: int) -> List[Agg]:

    aggregates = []
    for i in range(1, 32):
        try: 
            date = datetime(year, month, i)

        # Catch an exception if the day is out of range
        except ValueError:
            continue

        data = _fetch_data(ticker, date)
        aggregates.extend(data)
    return aggregates

def process_result(ticker: str, aggregates: List[Agg]):
    df = model_class._reshape(aggregates)
    df['ticker'] = ticker
    return df

In [26]:
## Pipeline for processing a single month
import concurrent.futures
import pandas as pd

def aggregate_ticker_historical(ticker: str, year: int):
    aggregates = []
    for i in range(1, 13):
        data = get_ticker_data(ticker, i, year)
        aggregates.extend(data)

    df = process_result(ticker, aggregates)
    return df

## Pipeline for processing a single year for all 10 tickers
def aggregate_dataset(year: int):
    tickers = ["AAPL", "MSFT", "GOOGL", 'AMZN', "TSLA", "NVDA", "META", "BAC", "V", "NFLX"]
    dfs = []

    with concurrent.futures.ThreadPoolExecutor() as executor:
        results = executor.map(aggregate_ticker_historical, tickers, [year] * len(tickers))
        for result in results:
            dfs.append(result)

    df = pd.concat(dfs)
    return df


df = aggregate_dataset(2021)

In [54]:
from typing import Generator, Any
from datasets import Features, Sequence, Value

def multivar_example_gen_func() -> Generator[dict[str, Any], None, None]:
    # Group the dataframe by the different ticker symbols
    for item_id, ticker_df in df.groupby("ticker"):
        ticker_df.drop(columns=["ticker"], inplace=True)
        

        # Group the ticker_df dataframe by day
        for date, daily_df in ticker_df.groupby(ticker_df.index.date):

            daily_df = daily_df.resample("10s").last()
            # Make sure the daily_df has at least 3 data points
            try:
                freq = pd.infer_freq(daily_df.index)
            except ValueError:
                freq = None

            yield {
                "target": daily_df.to_numpy().T,  # array of shape (var, time)
                "start": daily_df.index[0],
                "freq": "10s",
                "item_id": f"{item_id}_{date}",
            }

def aapl_example_gen_func() -> Generator[dict[str, Any], None, None]:
    # Filter the dataset down to just the AAPL ticker
    aapl_df = df[df["ticker"] == "AAPL"]
    aapl_df.drop(columns=["ticker"], inplace=True)

    aapl_df = aapl_df.resample("10s").last()

    freq = pd.infer_freq(aapl_df.index)

    print(freq)
    yield {
        "target": aapl_df.to_numpy().T,  # array of shape (var, time)
        "start": aapl_df.index[0],
        "freq": freq,
        "item_id": "AAPL",
    }

In [55]:
features = Features(
        dict(
            item_id=Value("string"),
            start=Value("timestamp[s]"),
            freq=Value("string"),
            target=Sequence(Sequence(Value("float32")), length=len(df.columns) - 1),
        )
    )

In [56]:
import datasets

hf_dataset = datasets.Dataset.from_generator(
    multivar_example_gen_func, features=features
)

# aapl_dataset = datasets.Dataset.from_generator(
#     aapl_example_gen_func, features=features
# )

# aapl_dataset.save_to_disk("datasets/candles_10sec_aapl")
# Keep 80% of the data for training, 10% for validation, and 10% for testing
train_testvalid = hf_dataset.train_test_split(test_size=0.2)

# Keep the 80% split for training
train_set = train_testvalid['train']

# Split the remaining 20% in half for validation and testing
val_test = train_testvalid['test'].train_test_split(test_size=0.5)

# Assign the validation and testing sets
val_set = val_test['train']
test_set = val_test['test']

train_set.save_to_disk("datasets/candles_10sec_04222024_train")
val_set.save_to_disk("datasets/candles_10sec_04222024_val")
test_set.save_to_disk("datasets/candles_10sec_04222024_test")



Generating train split: 2371 examples [00:27, 87.63 examples/s] 
Saving the dataset (1/1 shards): 100%|██████████| 1896/1896 [00:00<00:00, 21149.83 examples/s]
Saving the dataset (1/1 shards): 100%|██████████| 237/237 [00:00<00:00, 27459.19 examples/s]
Saving the dataset (1/1 shards): 100%|██████████| 238/238 [00:00<00:00, 28522.90 examples/s]


# Second Approach For Dataset

In order to reproduce the dataset used for finetuning, we will attempt to reproduce the steps used in the finetuning example provided in the `README.md` of the `uni2ts` repository. This will include:
- Fetching 1 month of intraday data from AAPL
- Resampling the dataframe, to adhere to a 10s frequency
- Cleaning the data, to fill in the NaN values, with logical fillers
    - Should have some way of logically filling in NaN values, likely just by taking the average between the two candles, and having no change in the candle (same next candle), making sure the close value corresponds to the next open value
- Store data in CSV file
- Mimic steps used in the 