In [1]:
import os

# Move the working directory up one level; the relative paths used later in
# this notebook (".env", "data/binance") are resolved from there.
# NOTE(review): the path is relative, so re-running this cell climbs one more
# directory each time. Restart the kernel before executing it again.
os.chdir("../")

from dotenv import load_dotenv

# Read the ".env" file found in the new working directory.
load_dotenv(".env")

True

In [2]:
import black
import jupyter_black

# Configure the jupyter_black extension: classic-notebook mode (lab=False),
# 79-character lines, Python 3.10 as the target syntax version.
jupyter_black.load(
    lab=False,
    line_length=79,
    verbosity="INFO",
    target_version=black.TargetVersion.PY310,
)

<IPython.core.display.Javascript object>

In [3]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns

%load_ext autoreload
%autoreload 2

# Colour used for all text, axis labels and tick labels in the theme below.
TEXT_COLOR = "#313131"
# Plotly colors
LINE_COLORS = [
    "#636EFA",
    "#EF553B",
    "#00CC96",
    "#AB63FA",
    "#FFA15A",
    "#19D3F3",
    "#FF6692",
    "#B6E880",
    "#FF97FF",
    "#FECB52",
]

# Global seaborn/matplotlib theme: dark grid, white figure and legend
# backgrounds, 6x4 figures at 100 dpi, TEXT_COLOR for every text element.
# NOTE(review): "Microsoft Sans Serif" is presumably only installed on
# Windows; confirm the font is available wherever this notebook is run.
sns.set(
    style="darkgrid",
    rc={
        "figure.figsize": (6, 4),
        "figure.dpi": 100,
        "figure.facecolor": "w",
        "legend.facecolor": "w",
        "text.color": TEXT_COLOR,
        "font.family": "Microsoft Sans Serif",
        "axes.labelcolor": TEXT_COLOR,
        "xtick.color": TEXT_COLOR,
        "ytick.color": TEXT_COLOR,
    },
)

# Use the colours listed in LINE_COLORS as the default line palette.
sns.set_palette(sns.color_palette(LINE_COLORS))

In [5]:
from src.dataloaders import CandleStickDataLoader
import constants

from binance import enums

# Load data for the past 4 years

In [33]:
from typing import *

In [48]:
from datetime import datetime
import pytz

In [34]:
def partition_timestamps_into_days(
    start: int, end: int
) -> List[Tuple[int, int]]:
    """Split ``[start, end)`` into consecutive chunks cut at day boundaries.

    Both arguments are timestamps in milliseconds. A day boundary is any
    multiple of 86,400,000 ms, so the first and last chunks may be shorter
    than a full day while every chunk in between spans exactly one day.

    Args:
        start: Beginning of the window, in milliseconds.
        end: End of the window, in milliseconds.

    Returns:
        A list of ``(chunk_start, chunk_end)`` tuples in chronological order.
        The list is empty when ``start >= end``.
    """
    # An empty or inverted window yields no chunks at all.
    if start >= end:
        return []

    # hours * minutes * seconds * milliseconds
    day_ms = 24 * 60 * 60 * 1000

    # Collect every cut point: the window start, each day boundary that lies
    # strictly inside the window, and finally the window end.
    cut_points = [start]
    boundary = (start // day_ms + 1) * day_ms
    while boundary < end:
        cut_points.append(boundary)
        boundary += day_ms
    cut_points.append(end)

    # Neighbouring cut points form the chunks.
    return list(zip(cut_points[:-1], cut_points[1:]))

In [41]:
def saved_partitions(path):
    """Return the values of the ``date=<value>`` entries found in ``path``.

    Args:
        path: Directory to list.

    Returns:
        A list with, for every entry whose name starts with ``"date="``, the
        text between the first and second ``"="`` of that name. Entries are
        returned in the order ``os.listdir`` reports them.
    """
    dates = []
    for entry in os.listdir(path):
        # Anything that is not a ``date=...`` partition is ignored.
        if not entry.startswith("date="):
            continue
        dates.append(entry.split("=")[1])
    return dates

In [74]:
def timestamp_to_date(timestamp):
    """Format a millisecond Unix timestamp as a UTC ``YYYY-MM-DD`` string.

    Args:
        timestamp: Milliseconds since the Unix epoch.

    Returns:
        The UTC calendar date of that instant, e.g. ``"2021-04-25"``.
    """
    # Local import: the notebook's import cell only brings in ``datetime``.
    from datetime import timezone

    # ``fromtimestamp`` expects seconds, so drop the millisecond part.
    seconds = int(timestamp / 1000)
    # The standard-library UTC tzinfo is enough here; pytz is not needed.
    dt = datetime.fromtimestamp(seconds, tz=timezone.utc)
    return dt.strftime("%Y-%m-%d")

In [135]:
# Root directory of the raw download; the download loop in this notebook
# writes one "date=<YYYY-MM-DD>" sub-directory per day underneath it.
DATA_DIR = "data/binance"
os.makedirs(DATA_DIR, exist_ok=True)

In [309]:
# Candlestick loader at a 1-minute interval for the traded asset followed by
# the predictor assets. `fiat` comes from constants.FIAT_TO_TRADE
# (presumably the quote currency; confirm in src.dataloaders).
dl = CandleStickDataLoader(
    interval=enums.KLINE_INTERVAL_1MINUTE,
    assets=[constants.ASSET_TO_TRADE] + constants.PREDICTOR_ASSETS,
    fiat=constants.FIAT_TO_TRADE,
)

In [147]:
# Download window. The loader converts the date strings to timestamps, which
# partition_timestamps_into_days treats as milliseconds.
start = dl.date_to_timestamp("2019-02-01 00:00:00")
end = dl.date_to_timestamp("2023-04-28 00:00:00")

In [148]:
# Split the download window into per-day (start, end) chunks, keyed by the
# UTC date of each chunk's start.
date_partitions = partition_timestamps_into_days(start, end)
date_partitions = {
    timestamp_to_date(start): (start, end) for start, end in date_partitions
}

# List the partitions already on disk once, instead of once per date, and
# keep them in a set so every membership test below is O(1).
already_saved = set(saved_partitions(DATA_DIR))

# Keep only the days that have not been downloaded yet
date_partitions_to_load = [
    ts for date, ts in date_partitions.items() if date not in already_saved
]

# Download each missing day and store it as
# <DATA_DIR>/date=<YYYY-MM-DD>/data.parquet
for date_start, date_end in date_partitions_to_load:
    date = timestamp_to_date(date_start)
    data = dl.load_data(date_start, date_end)

    date_dir = os.path.join(DATA_DIR, f"date={date}")
    os.makedirs(date_dir, exist_ok=True)
    data.to_parquet(os.path.join(date_dir, "data.parquet"))

In [311]:
# Spot-check: load a short window (2021-04-25 03:45 to 10:15) straight from
# the loader.
# NOTE(review): this rebinds the notebook-level `start` and `end` that were
# set for the full download window.
start = dl.date_to_timestamp("2021-04-25 03:45:00")
end = dl.date_to_timestamp("2021-04-25 10:15:00")
check = dl.load_data(start, end)

In [167]:
# Read everything stored under DATA_DIR back into a single DataFrame.
# NOTE(review): the "date=<...>" folder names presumably come back as a
# `date` column, which the final save cell partitions on; confirm.
data = pd.read_parquet(DATA_DIR)

In [153]:
# NOTE(review): this cell looks stale. Later cells read data["time"] as a
# column and expect "time" among data.columns when building merged_df; both
# would fail once "time" is moved into the index. Its execution count (153)
# also predates the read_parquet cell (167). Consider removing it.
data = data.set_index(["time"])

In [178]:
# Collapse the data to one row per `open_timestamp`, taking the "first"
# aggregate of every remaining column. `close_timestamp` is left out of
# `values`, so it is not carried over into merged_df.
merged_df = data.pivot_table(
    index="open_timestamp",
    values=data.columns.drop(["open_timestamp", "close_timestamp"]).tolist(),
    aggfunc="first",
).reset_index()

In [189]:
# The pivot that builds merged_df leaves `close_timestamp` out of `values`,
# so the column is expected to be absent already. Tolerate that instead of
# raising KeyError, while still dropping the column whenever it is present.
merged_df = merged_df.drop(columns="close_timestamp", errors="ignore")

In [191]:
# Move `time` to the front; the other columns keep their current order.
sorted_cols = ["time"] + merged_df.columns.drop("time").tolist()
merged_df = merged_df[sorted_cols]

---

# Process missing intervals

In [372]:
# 1. Extend the data frame to have full interval range
min_time = data["time"].min()
max_time = data["time"].max()

# One row per minute between the first and the last observed `time`. The
# "min" alias is used because newer pandas releases deprecate "T".
full_range_df = pd.DataFrame(
    {"time": pd.date_range(min_time, max_time, freq="1min")}
)

# Left-merge onto the full range so that minutes without a candle appear as
# rows whose other columns are all missing
filled_df = full_range_df.merge(merged_df, on="time", how="left")

In [373]:
# 2. Create `service_down` marking: a row is flagged when at least one of
# its columns is missing
row_has_missing = filled_df.isnull().any(axis=1)
filled_df["service_down"] = np.where(row_has_missing, True, False)

In [374]:
# 3. Fill the missing data by carrying the last known values forward.
# `DataFrame.ffill()` replaces `fillna(method="ffill")`, which newer pandas
# releases deprecate.
filled_df = filled_df.ffill()

In [375]:
# Column -> dtype mapping of the loaded data, without `close_timestamp`.
# It is used to cast filled_df back after the merge and fill steps
# (presumably because the missing rows changed some dtypes; confirm).
dtypes = data.dtypes.drop("close_timestamp").to_dict()

In [376]:
# Cast every column listed in `dtypes` to its mapped dtype; columns not in
# the mapping (e.g. `service_down`) are left untouched.
filled_df = filled_df.astype(dtypes)

In [380]:
# Save the dataset
import pyarrow as pa
import pyarrow.parquet as pq

# Convert the gap-filled frame to an Arrow table for writing.
table = pa.Table.from_pandas(filled_df)

output_dir = "data/binance_fixed"
os.makedirs(output_dir, exist_ok=True)

# Write the table partitioned by the "date" column; the callback names every
# written file "data.parquet".
# NOTE(review): the recorded output of this cell looks like a warning pointing
# at this call; `partition_filename_cb` is presumably deprecated in the
# installed pyarrow. Confirm, and check that a replacement keeps the same
# file names before migrating.
pq.write_to_dataset(
    table,
    root_path=output_dir,
    partition_cols=["date"],
    partition_filename_cb=lambda _: "data.parquet",
)

  pq.write_to_dataset(
  pq.write_to_dataset(


---