Читаем файл databento и собираем стакан на каждые 15 мин

In [None]:
import dask.dataframe as dd
import pandas as pd

df = dd.read_csv(file)
# Удаление строк, где symbol != 'NQZ4'
df = df[df["symbol"] == "NQZ4"]

# Удаление строк с NaN
df = df.dropna()

# Преобразование временной метки в datetime
df["datetime"] = dd.to_datetime(df["ts_recv"] // 10**9, unit="s", utc=True)


def process_orders_15min(df):
    """
    Обрабатывает заявки каждые 15 минут с учетом добавления, отмены и модификации.
    """
    # Упрощение временных интервалов до 15-минутных биннингов
    df["15min_bin"] = df["datetime"].dt.floor("15T")

    # Группировка данных по временным интервалам и обработка заявок
    def aggregate_orders(block):
        current_orders = {}
        results = []

        for _, row in block.iterrows():
            order_id = row["order_id"]
            if row["action"] == "A":  # Add
                current_orders[order_id] = row
            elif row["action"] == "C":  # Cancel
                current_orders.pop(order_id, None)
            elif row["action"] == "M":  # Modify
                if order_id in current_orders:
                    current_orders[order_id] = row

        if current_orders:
            orders_df = pd.DataFrame.from_dict(current_orders, orient="index")
            aggregated = (
                orders_df.groupby(["side", "price"])["size"].sum().reset_index()
            )
            aggregated["15min_bin"] = row[
                "15min_bin"
            ]  # Вставляем текущий интервал времени
            results.append(aggregated)

        return pd.concat(results, ignore_index=True) if results else pd.DataFrame()

    grouped = df.map_partitions(
        aggregate_orders,
        meta={
            "side": "object",
            "price": "float64",
            "size": "float64",
            "15min_bin": "datetime64[ns]",
        },
    )

    # Группировка итогов по интервалам времени
    summary = (
        grouped.groupby(["15min_bin", "side", "price"])["size"].sum().reset_index()
    )
    return summary


# Применяем обработку
summary_15min = process_orders_15min(df)

# Выполняем вычисления
result = summary_15min.compute()

# Вывод результата
print("Summary every 15 minutes:")
print(result)


Запись в файл (если необходимо)

result.to_csv(name)