# Bybit 1m Candles DB QA Notebook

This notebook helps you validate that **1-minute candles** are being ingested into PostgreSQL correctly.

It provides:
- **Async** DB access (SQLAlchemy Async)
- Per-symbol **counts** of 1m candles in the last **5 days**
- A completeness check (expected **7200** candles per symbol over a 5-day window)
- A helper to list **missing minute timestamps** for a specific symbol
- A Plotly **candlestick chart** for any symbol

## Preconditions
- Your `.env` (or environment) must provide `DATABASE_URL` in async SQLAlchemy format, e.g.  
  `postgresql+asyncpg://user:pass@host:5432/dbname`
- Table names expected:
  - `tokens` with `bybit_symbol` and `is_active`
  - `candles_1m` with columns: `symbol`, `timestamp`, `open`, `high`, `low`, `close`, `volume`, `turnover`, `is_confirmed`


In [1]:
# !pip install pandas "sqlalchemy[asyncio]" asyncpg python-dotenv plotly

In [2]:
import os
import asyncio
from datetime import datetime, timedelta, timezone

import pandas as pd
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker
from sqlalchemy import text

# Optional: load .env if you run locally
try:
    from dotenv import load_dotenv
    load_dotenv()
except Exception:
    pass

DATABASE_URL = os.getenv("DATABASE_URL", "").strip()
if not DATABASE_URL:
    raise RuntimeError("DATABASE_URL is not set. Put it in your environment or .env file.")

engine = create_async_engine(DATABASE_URL, pool_pre_ping=True)
SessionLocal = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)

def utc_floor_minute(dt: datetime) -> datetime:
    dt = dt.astimezone(timezone.utc)
    return dt.replace(second=0, microsecond=0)

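# Render the URL with the password hidden; split('@')[0] alone would still print "user:password".
print("DB URL loaded (masked):", engine.url.render_as_string(hide_password=True).split('@')[0] + '@…')


DB URL loaded (masked): postgresql+asyncpg://postgres:***@…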


## Window definition

To make the check deterministic and avoid partial minutes:
- `end_exclusive` = start of the **current** UTC minute
- `start_inclusive` = `end_exclusive - window_days`

So the interval is **[start_inclusive, end_exclusive)**.

For `window_days=5`, the expected number of 1-minute candles per symbol is:
`5 * 24 * 60 = 7200`
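
To see the half-open interval in action without depending on the wall clock, here is a small sketch with a fixed "now". `window_for` is a hypothetical stand-in that mirrors the definition above; the sample instant is an assumption chosen for illustration.

```python
from datetime import datetime, timedelta, timezone

def window_for(now: datetime, window_days: int = 5):
    """Hypothetical helper: same window rule as above, but with an injected clock."""
    # Floor to the start of the current UTC minute so no partial minute is counted.
    end_exclusive = now.astimezone(timezone.utc).replace(second=0, microsecond=0)
    start_inclusive = end_exclusive - timedelta(days=window_days)
    expected = int((end_exclusive - start_inclusive) / timedelta(minutes=1))
    return start_inclusive, end_exclusive, expected

# Assumed sample instant: 42.5 seconds into the 19:00 UTC minute.
now = datetime(2025, 12, 21, 19, 0, 42, 500000, tzinfo=timezone.utc)
start, end, expected = window_for(now)
last_candle = end - timedelta(minutes=1)  # last minute-start inside [start, end)

print(start.isoformat())        # 2025-12-16T19:00:00+00:00
print(last_candle.isoformat())  # 2025-12-21T18:59:00+00:00
print(expected)                 # 7200
```

Because the end is exclusive, the still-forming 19:00 candle is left out and the last candle that counts is the one that started at 18:59.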


In [3]:
def compute_window(window_days: int = 5) -> tuple[datetime, datetime, int]:
    end_exclusive = utc_floor_minute(datetime.now(timezone.utc))
    start_inclusive = end_exclusive - timedelta(days=window_days)
    expected = int((end_exclusive - start_inclusive).total_seconds() // 60)
    return start_inclusive, end_exclusive, expected

start_inclusive, end_exclusive, expected = compute_window(5)
start_inclusive, end_exclusive, expected


(datetime.datetime(2025, 12, 16, 19, 0, tzinfo=datetime.timezone.utc),
 datetime.datetime(2025, 12, 21, 19, 0, tzinfo=datetime.timezone.utc),
 7200)

## 1) Per-symbol candle counts for last N days (and quick completeness estimate)

This query:
- pulls all **active** symbols from `tokens`
- counts candles in `candles_1m` inside the window
- computes `missing_estimate = expected - count`

If your `candles_1m` table enforces uniqueness on `(symbol, timestamp)` and stores minute-start timestamps,
then `count == expected` is a strong indicator that the window has **no gaps**.
Without the uniqueness constraint, duplicate rows can offset missing minutes and hide a gap.
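
The toy example below illustrates why the uniqueness assumption matters. It uses a made-up 5-minute window and invented sample rows, not data from the real table.

```python
from datetime import datetime, timedelta, timezone

start = datetime(2025, 12, 16, 19, 0, tzinfo=timezone.utc)
expected = 5  # toy window of five minutes, for illustration only

# Invented sample: minute 2 is missing and minute 3 is stored twice.
stored = [start + timedelta(minutes=m) for m in (0, 1, 3, 3, 4)]

row_count = len(stored)            # what COUNT(*) would see
distinct_count = len(set(stored))  # what COUNT(DISTINCT timestamp) would see

print(row_count == expected)       # True: the plain count looks complete
print(distinct_count == expected)  # False: one minute is actually missing
```

If uniqueness is not enforced in your schema, counting distinct timestamps is the safer comparison.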


In [4]:
async def fetch_counts_per_symbol(window_days: int = 5) -> pd.DataFrame:
    start_inclusive, end_exclusive, expected = compute_window(window_days)

    sql = text("""
        WITH active AS (
            SELECT DISTINCT bybit_symbol AS symbol
            FROM tokens
            WHERE is_active = TRUE AND bybit_symbol IS NOT NULL AND bybit_symbol <> ''
        ),
        counts AS (
            SELECT c.symbol, COUNT(*)::bigint AS candle_count,
                   MIN(c.timestamp) AS first_ts,
                   MAX(c.timestamp) AS last_ts
            FROM candles_1m c
            WHERE c.timestamp >= :start_inclusive
              AND c.timestamp <  :end_exclusive
            GROUP BY c.symbol
        )
        SELECT a.symbol,
               COALESCE(cnt.candle_count, 0) AS candle_count,
               cnt.first_ts,
               cnt.last_ts
        FROM active a
        LEFT JOIN counts cnt ON cnt.symbol = a.symbol
        ORDER BY candle_count DESC, a.symbol ASC
    """)

    async with SessionLocal() as session:
        res = await session.execute(sql, {
            "start_inclusive": start_inclusive,
            "end_exclusive": end_exclusive,
        })
        rows = res.mappings().all()

    df = pd.DataFrame(rows)
    df["expected"] = expected
    df["missing_estimate"] = df["expected"] - df["candle_count"]
    df["is_complete"] = df["missing_estimate"] == 0

    # Helpful: sort by missing first
    df = df.sort_values(by=["missing_estimate", "candle_count", "symbol"], ascending=[False, True, True])
    return df

df_counts = await fetch_counts_per_symbol(5)
df_counts.tail(10)


| | candle_count | first_ts | last_ts | symbol | expected | missing_estimate | is_complete |
|---|---|---|---|---|---|---|---|
| 54 | 7200 | 2025-12-16 19:00:00+00:00 | 2025-12-21 18:59:00+00:00 | TUSDT | 7200 | 0 | True |
| 55 | 7200 | 2025-12-16 19:00:00+00:00 | 2025-12-21 18:59:00+00:00 | TWTUSDT | 7200 | 0 | True |
| 56 | 7200 | 2025-12-16 19:00:00+00:00 | 2025-12-21 18:59:00+00:00 | UBUSDT | 7200 | 0 | True |
| 57 | 7200 | 2025-12-16 19:00:00+00:00 | 2025-12-21 18:59:00+00:00 | VELOUSDT | 7200 | 0 | True |
| 58 | 7200 | 2025-12-16 19:00:00+00:00 | 2025-12-21 18:59:00+00:00 | VIRTUALUSDT | 7200 | 0 | True |
| 59 | 7200 | 2025-12-16 19:00:00+00:00 | 2025-12-21 18:59:00+00:00 | WIFUSDT | 7200 | 0 | True |
| 60 | 7200 | 2025-12-16 19:00:00+00:00 | 2025-12-21 18:59:00+00:00 | WUSDT | 7200 | 0 | True |
| 61 | 7200 | 2025-12-16 19:00:00+00:00 | 2025-12-21 18:59:00+00:00 | ZBCNUSDT | 7200 | 0 | True |
| 62 | 7200 | 2025-12-16 19:00:00+00:00 | 2025-12-21 18:59:00+00:00 | ZKUSDT | 7200 | 0 | True |
| 63 | 7200 | 2025-12-16 19:00:00+00:00 | 2025-12-21 18:59:00+00:00 | ZORAUSDT | 7200 | 0 | True |


### Summary statistics

In [5]:
def summarize(df: pd.DataFrame) -> pd.DataFrame:
    total = len(df)
    complete = int(df["is_complete"].sum()) if total else 0
    incomplete = total - complete
    worst_missing = int(df["missing_estimate"].max()) if total else 0

    return pd.DataFrame([{
        "symbols_total": total,
        "symbols_complete": complete,
        "symbols_incomplete": incomplete,
        "worst_missing_minutes": worst_missing,
    }])

summarize(df_counts)


| | symbols_total | symbols_complete | symbols_incomplete | worst_missing_minutes |
|---|---|---|---|---|
| 0 | 207 | 64 | 143 | 7200 |


## 2) List exact missing minute timestamps for a specific symbol

For **one symbol**, we can generate the expected minute grid in SQL (`generate_series`) and left-join to `candles_1m` to find missing timestamps.

Use this only for investigation (not for thousands of symbols at once).
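
The grid-and-anti-join idea can also be sketched in plain Python, which is handy for reasoning about the result or for collapsing a long list of missing minutes into readable gap ranges. Both helpers below (`missing_minutes`, `to_gap_ranges`) are hypothetical and the sample timestamps are invented; this is not the notebook's SQL path.

```python
from datetime import datetime, timedelta, timezone

MINUTE = timedelta(minutes=1)

def missing_minutes(present, start_inclusive, end_exclusive):
    """Hypothetical helper: grid minutes in [start_inclusive, end_exclusive) absent from `present`."""
    present = set(present)
    missing = []
    ts = start_inclusive
    while ts < end_exclusive:
        if ts not in present:
            missing.append(ts)
        ts += MINUTE
    return missing

def to_gap_ranges(missing):
    """Hypothetical helper: collapse sorted missing minutes into (first, last) runs."""
    ranges = []
    for ts in missing:
        if ranges and ts - ranges[-1][1] == MINUTE:
            ranges[-1] = (ranges[-1][0], ts)  # extend the current run
        else:
            ranges.append((ts, ts))           # start a new run
    return ranges

# Invented sample: a 10-minute window where minutes 3, 4, 5 and 7 are absent.
start = datetime(2025, 12, 16, 19, 0, tzinfo=timezone.utc)
end = start + timedelta(minutes=10)
present = [start + i * MINUTE for i in (0, 1, 2, 6, 8, 9)]

gaps = to_gap_ranges(missing_minutes(present, start, end))
for first, last in gaps:
    print(first.strftime("%H:%M"), "->", last.strftime("%H:%M"))
# 19:03 -> 19:05
# 19:07 -> 19:07
```

Gap ranges are usually easier to act on than individual timestamps, for example when deciding which intervals to backfill.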


In [6]:
async def fetch_missing_minutes_for_symbol(symbol: str, window_days: int = 5, limit: int = 500) -> pd.DataFrame:
    start_inclusive, end_exclusive, _ = compute_window(window_days)

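    # CAST(... AS timestamptz) is used instead of a "::" cast so the cast
    # cannot be confused with SQLAlchemy's ":name" bind-parameter syntax.
    sql = text("""
        WITH grid AS (
            SELECT generate_series(
                CAST(:start_inclusive AS timestamptz),
                CAST(:end_exclusive AS timestamptz) - interval '1 minute',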
                interval '1 minute'
            ) AS ts
        )
        SELECT g.ts AS missing_ts
        FROM grid g
        LEFT JOIN candles_1m c
               ON c.symbol = :symbol AND c.timestamp = g.ts
        WHERE c.timestamp IS NULL
        ORDER BY g.ts
        LIMIT :limit
    """)

    async with SessionLocal() as session:
        res = await session.execute(sql, {
            "symbol": symbol,
            "start_inclusive": start_inclusive,
            "end_exclusive": end_exclusive,
            "limit": int(limit),
        })
        rows = res.mappings().all()

    return pd.DataFrame(rows)

# Example (the notebook already runs an event loop, so use top-level await rather than asyncio.run):
# missing = await fetch_missing_minutes_for_symbol("BTCUSDT", 5, 200)
# missing.head(20)


## 3) Plotly candlestick chart for a symbol

This helper fetches candles for a time range and renders a Plotly candlestick chart.


In [9]:
import plotly.graph_objects as go

async def fetch_candles(symbol: str, start: datetime, end: datetime) -> pd.DataFrame:
    sql = text("""
        SELECT timestamp, open, high, low, close, volume, turnover, is_confirmed
        FROM candles_1m
        WHERE symbol = :symbol
          AND timestamp >= :start_ts
          AND timestamp <  :end_ts
        ORDER BY timestamp ASC
    """)

    async with SessionLocal() as session:
        res = await session.execute(sql, {
            "symbol": symbol,
            "start_ts": start,
            "end_ts": end,
        })
        rows = res.mappings().all()

    df = pd.DataFrame(rows)
    if not df.empty:
        df["timestamp"] = pd.to_datetime(df["timestamp"], utc=True)
    return df

def plot_candles(df: pd.DataFrame, title: str = "") -> go.Figure:
    if df.empty:
        raise ValueError("No candles to plot for the specified range.")

    fig = go.Figure(
        data=[go.Candlestick(
            x=df["timestamp"],
            open=df["open"],
            high=df["high"],
            low=df["low"],
            close=df["close"],
            name="1m"
        )]
    )
    fig.update_layout(
        title=title or "Candles",
        xaxis_title="Time (UTC)",
        yaxis_title="Price",
        xaxis_rangeslider_visible=False,
        height=600,
    )
    return fig

symbol = "ETHUSDT"
end = utc_floor_minute(datetime.now(timezone.utc))
start = end - timedelta(hours=6)
df_candles = await fetch_candles(symbol, start, end)
fig = plot_candles(df_candles, f"{symbol} - last 6h (1m)")
fig.show()