In [101]:
# strategy.py

import requests
import pandas as pd
import time
from datetime import datetime
import numpy as np

def strategy(df, config_dict):
    """Build mean-reversion target weights from freshly downloaded Bitget candles.

    The function downloads its own market data: it asks the coin-info service
    for the coin list, pulls daily candles for every ``<coin>USDT`` symbol from
    Bitget, and then runs the pipeline below on the downloaded frame.

    Pipeline:
        1. Price z-score against a rolling median (mean-reversion core).
        2. Volume gate: the price z-score is kept only while the absolute
           volume z-score is below ``volume_z_threshold``; otherwise it is 0.
        3. Cross-sectional percentile ranking; only the two tails are kept.
        4. Inverse-volatility scaling, clipped to ``+/- weight_clip``.
        5. Stateless drawdown guard that halves every weight when triggered.
        6. Gross exposure (sum of absolute weights) capped at 1.0.

    Args:
        df: Ignored. The parameter is kept for interface compatibility; the
            data used by the strategy is always re-fetched from Bitget.
        config_dict (dict): Settings are read from its ``"strategy_config"``
            entry; every missing key falls back to the default used below.

    Returns:
        dict: ``{symbol: weight}`` for the selected symbols (positive weights
        come from the low-rank "long" tail, negative ones from the "short"
        tail). An empty dict is returned when nothing qualifies.

    Raises:
        TypeError: If ``config_dict`` is not a dict (raised before any network
            access), or if no candle data could be fetched for any symbol.
        ValueError: If the fetched frame is empty or lacks MultiIndex columns.
        requests.exceptions.RequestException: If the coin-list request fails
            or times out.
    """
    # --- Validate the config first so a bad call fails before any network I/O ---
    if not isinstance(config_dict, dict):
        raise TypeError("'config_dict' must be a dictionary.")

    # --- Load strategy-specific config ---
    strategy_specific_config = config_dict.get("strategy_config", {})
    median_lookback = strategy_specific_config.get("median_lookback", 60)
    std_lookback = strategy_specific_config.get("std_lookback", 60)
    volume_z_lookback = strategy_specific_config.get("volume_z_lookback", 20)
    volume_z_threshold = strategy_specific_config.get("volume_z_threshold", 1.5)
    long_percentile = strategy_specific_config.get("long_percentile", 0.01)
    short_percentile = strategy_specific_config.get("short_percentile", 0.01)
    vol_lookback = strategy_specific_config.get("vol_lookback", 30)
    vol_target_scaler = strategy_specific_config.get("vol_target_scaler", 0.0005)
    weight_clip = strategy_specific_config.get("weight_clip", 0.1)
    dd_lookback = strategy_specific_config.get("dd_lookback", 5)
    dd_threshold = strategy_specific_config.get("dd_threshold", 0.02)

    # Seconds to wait on any single HTTP call; without a timeout a stalled
    # connection would block the strategy forever.
    request_timeout = 10

    # --- Universe: every coin reported by the coin-info service, quoted in USDT ---
    data_url = "https://crypto.fin.cloud.ainode.ai/a71eaf04-802f-40be-93c2-5bee2548f4db/get/info/coin"
    response = requests.get(data_url, timeout=request_timeout)
    response.raise_for_status()
    data = response.json()

    symbols = [item['coin_nm'] + 'USDT' for item in data['data']]

    def get_bitget_history_ohlcv(symbols, days_to_fetch, granularity='1D', product_type='umcbl'):
        """
        Fetch OHLCV candles for several symbols from the Bitget API.

        Pages backwards in time (200 candles per request) until at least
        ``days_to_fetch`` candles are collected or the API has nothing more.

        Args:
            symbols (list): Symbols to fetch (e.g. ['BTCUSDT', 'ETHUSDT']).
            days_to_fetch (int): Number of candles to collect per symbol.
            granularity (str): Candle interval ('1D' is daily).
            product_type (str): Product type ('umcbl' is USDT futures).

        Returns:
            pandas.DataFrame: OHLCV frame with MultiIndex columns
                (level 0: symbol, level 1: open/high/low/close/volume),
                limited to the last ``days_to_fetch`` rows; ``None`` when no
                symbol produced any data.
        """
        url = "https://api.bitget.com" + "/api/v2/mix/market/history-candles"

        all_symbols_data = []

        for symbol in symbols:
            all_candles = []
            seen_timestamps = set()
            # Bitget queries backwards from endTime, so start from "now".
            end_time_ms = int(datetime.now().timestamp() * 1000)

            while len(all_candles) < days_to_fetch:
                params = {
                    'symbol': symbol,
                    'granularity': granularity,
                    'productType': product_type,
                    'limit': 200,  # maximum page size accepted by the API
                    'endTime': end_time_ms
                }

                try:
                    response = requests.get(url, params=params, timeout=request_timeout)
                    response.raise_for_status()
                    payload = response.json()
                except requests.exceptions.RequestException:
                    # Best effort: keep whatever was collected for this symbol.
                    break

                if payload.get('code') != '00000':
                    break

                candles = payload.get('data', [])
                if not candles:
                    break

                # Keep only candles whose timestamp has not been stored yet.
                new_candles = []
                for candle in candles:
                    if candle[0] not in seen_timestamps:
                        seen_timestamps.add(candle[0])
                        new_candles.append(candle)
                if not new_candles:
                    # The page brought nothing new; stop instead of looping forever.
                    break
                all_candles.extend(new_candles)

                # Continue just before the oldest candle of this page (1 ms
                # earlier to avoid re-fetching it). Taking the minimum avoids
                # depending on the ordering of the page.
                end_time_ms = min(int(candle[0]) for candle in candles) - 1

                # Short pause to stay under the API rate limit.
                time.sleep(0.2)

            if not all_candles:
                continue

            symbol_df = pd.DataFrame(
                all_candles,
                columns=['timestamp', 'open', 'high', 'low', 'close', 'base_volume', 'volume'],
            )

            # Keep the columns the strategy needs and convert them to numbers.
            symbol_df = symbol_df[['timestamp', 'open', 'high', 'low', 'close', 'volume']]
            symbol_df = symbol_df.astype(float)

            # Millisecond epoch timestamps become the datetime index.
            symbol_df['timestamp'] = pd.to_datetime(symbol_df['timestamp'], unit='ms')
            symbol_df.set_index('timestamp', inplace=True)

            # Column layout expected by the strategy: (symbol, metric).
            symbol_df.columns = pd.MultiIndex.from_product([[symbol], symbol_df.columns])

            all_symbols_data.append(symbol_df)

        if not all_symbols_data:
            return None

        # Merge all symbols side by side, order by time, keep the requested tail.
        final_df = pd.concat(all_symbols_data, axis=1)
        final_df.sort_index(inplace=True)
        return final_df.tail(days_to_fetch)

    # Fetch enough history for the longest configured window. 60 stays the
    # minimum so the default configuration requests exactly what it did before;
    # the "+ 1" terms account for the row lost to pct_change().
    days = max(60, median_lookback, std_lookback, volume_z_lookback,
               vol_lookback + 1, dd_lookback + 1)
    df = get_bitget_history_ohlcv(symbols=symbols, days_to_fetch=days)

    # Expected df format:
    # - Index: datetime
    # - Columns: pandas MultiIndex with level 0 as symbol (e.g., 'BTCUSDT') and
    #            level 1 as metric (e.g., 'close', 'volume').
    # --- Validation of the fetched data ---
    if not isinstance(df, pd.DataFrame):
        raise TypeError("Input 'df' must be a pandas DataFrame.")
    if df.empty:
        raise ValueError("Input DataFrame 'df' is empty.")
    if not isinstance(df.columns, pd.MultiIndex):
        raise ValueError("Input DataFrame 'df' must have MultiIndex columns.")

    # --- Signal Calculation ---
    symbols = df.columns.get_level_values(0).unique()
    min_required_data = max(median_lookback, std_lookback, vol_lookback, volume_z_lookback)
    final_z_scores = {}

    for symbol in symbols:
        try:
            close_prices = df[(symbol, 'close')].ffill()
            total_volume = df[(symbol, 'volume')].ffill()
        except KeyError:
            continue

        # Skip symbols with too little history for the longest window.
        if len(close_prices.dropna()) < min_required_data or len(total_volume.dropna()) < min_required_data:
            continue

        # 1. Price-based mean-reversion core (zero std is masked to avoid /0).
        median_price = close_prices.rolling(window=median_lookback).median()
        std_price = close_prices.rolling(window=std_lookback).std().replace(0, np.nan)
        z_price = (close_prices - median_price) / std_price

        # 2. Volume gate
        mean_volume = total_volume.rolling(window=volume_z_lookback).mean()
        std_volume = total_volume.rolling(window=volume_z_lookback).std().replace(0, np.nan)
        z_volume = (total_volume - mean_volume) / std_volume

        gated_z_price = z_price.where(z_volume.abs() < volume_z_threshold, 0)

        latest_z = gated_z_price.iloc[-1]
        if pd.notna(latest_z):
            final_z_scores[symbol] = latest_z

    if not final_z_scores:
        return {}

    # --- Cross-sectional ranking and selection ---
    z_series = pd.Series(final_z_scores).dropna()
    if z_series.empty:
        return {}

    ranks = z_series.rank(pct=True)
    raw_weights = 0.5 - ranks

    long_candidates = raw_weights[ranks <= long_percentile]
    short_candidates = raw_weights[ranks >= (1.0 - short_percentile)]
    selected_raw_weights = pd.concat([long_candidates, short_candidates])

    if selected_raw_weights.empty:
        return {}

    # --- Volatility-targeted sizing & Risk Caps ---
    final_weights = {}
    for symbol, raw_weight in selected_raw_weights.items():
        close_prices = df[(symbol, 'close')].ffill()
        daily_returns = close_prices.pct_change(1)

        # Floor the volatility so a flat or missing series cannot blow up the scale.
        vol_30 = daily_returns.rolling(window=vol_lookback).std().iloc[-1]
        vol_30_safe = max(vol_30, 1e-6) if pd.notna(vol_30) else 1e-6

        scaled_weight = raw_weight * (vol_target_scaler / vol_30_safe)
        final_weights[symbol] = np.clip(scaled_weight, -weight_clip, weight_clip)

    if not final_weights:
        return {}

    # --- Drawdown Guard (Stateless Proxy) ---
    # Replays today's weights over the last dd_lookback returns as a proxy for
    # the recent performance of the book.
    recent_returns = df.xs('close', axis=1, level=1).pct_change().iloc[-(dd_lookback):]

    if not recent_returns.empty:
        simulated_pnl = pd.Series(0.0, index=recent_returns.index)
        for symbol, weight in final_weights.items():
            if symbol in recent_returns.columns:
                simulated_pnl += recent_returns[symbol].fillna(0) * weight

        equity_curve = (1 + simulated_pnl).cumprod()
        rolling_max = equity_curve.cummax()
        drawdown = (equity_curve - rolling_max) / rolling_max

        if not drawdown.empty and drawdown.min() < -dd_threshold:
            final_weights = {symbol: weight / 2.0 for symbol, weight in final_weights.items()}

    # --- Final Normalization ---
    # Only scale down: gross exposure above 1.0 is brought back to exactly 1.0.
    total_abs_weight = sum(abs(w) for w in final_weights.values())
    if total_abs_weight > 1.0:
        final_weights = {symbol: weight / total_abs_weight for symbol, weight in final_weights.items()}

    return final_weights

In [102]:
# strategy_config.py

# Parameters for the Bitget mean-reversion strategy; the strategy falls back to
# the same values when a key is missing.
strategy_config = {
    # 1. Price-based mean-reversion core
    "median_lookback": 60,       # window of the rolling median of the close
    "std_lookback": 60,          # window of the rolling standard deviation of the close

    # 2. Volume gate (volume filter)
    "volume_z_lookback": 20,       # window used for the volume z-score
    "volume_z_threshold": 1.5,     # the price signal is kept only while |volume z| is below this

    # 3. Cross-sectional ranking and selection
    "long_percentile": 0.01,     # bottom rank fraction eligible for long positions
    "short_percentile": 0.01,    # top rank fraction eligible for short positions

    # 4. Volatility-targeted sizing
    "vol_lookback": 30,          # window of the return volatility
    "vol_target_scaler": 0.0005, # scaler divided by the volatility to size positions

    # 5. Risk caps and turnover control
    "weight_clip": 0.1,        # max absolute weight per asset; NOTE(review): 0.1 is 10%, the earlier note said 0.5% - confirm the intended cap

    # 5b. Draw-down guard
    "dd_lookback": 5,            # number of recent returns inspected by the guard
    "dd_threshold": 0.02         # drawdown that triggers the guard (2%)
}

In [103]:
# strategy() reads its parameters from config_dict["strategy_config"], so the
# settings must be nested under that key. Passing the bare dict would make the
# strategy silently ignore them and run on its built-in defaults.
weights = strategy(pd.DataFrame(), {"strategy_config": strategy_config})
weights

{'AUCTIONUSDT': np.float64(0.008237956262284397),
 'OXTUSDT': np.float64(0.007616959068619726),
 'SNXUSDT': np.float64(-0.0031540954184424387),
 'XAUTUSDT': np.float64(-0.03909232803415366),
 'IMXUSDT': np.float64(-0.003868639430186473)}

In [108]:
import numpy as np

# Mapping to normalize: the weights returned by the strategy.
data = weights

# Step 1: gross size of the book, i.e. the sum of the absolute weights.
total_abs_sum = sum(map(abs, data.values()))

# Step 2: divide each weight by that total so the absolute weights add up to 1.
normalized_data = {name: weight / total_abs_sum for name, weight in data.items()}

# Show the normalized mapping.
print("정규화된 딕셔너리:")
print(normalized_data)

# Sanity check: the absolute normalized weights should sum to 1.
verification_sum = sum(map(abs, normalized_data.values()))
print("\n정규화된 값들의 절대값 합계:")
print(verification_sum)

정규화된 딕셔너리:
{'AUCTIONUSDT': np.float64(0.13293463221623275), 'OXTUSDT': np.float64(0.12291369608610647), 'SNXUSDT': np.float64(-0.0508971522882644), 'XAUTUSDT': np.float64(-0.6308268803864082), 'IMXUSDT': np.float64(-0.062427639022988146)}

정규화된 값들의 절대값 합계:
1.0


In [107]:
# Echo the weights as returned by strategy(); the normalization cell writes to
# a separate dict and leaves this one untouched.
weights

{'AUCTIONUSDT': np.float64(0.008237956262284397),
 'OXTUSDT': np.float64(0.007616959068619726),
 'SNXUSDT': np.float64(-0.0031540954184424387),
 'XAUTUSDT': np.float64(-0.03909232803415366),
 'IMXUSDT': np.float64(-0.003868639430186473)}

In [104]:
# Symbols that received a weight (iterating a dict yields its keys).
list(weights)

['AUCTIONUSDT', 'OXTUSDT', 'SNXUSDT', 'XAUTUSDT', 'IMXUSDT']

In [132]:
# strategy.py
import pandas as pd
import numpy as np
from module.data_context import DataContext # Import the necessary module

def strategy(context: DataContext, config_dict: dict) -> dict:
    """Return volatility mean-reversion weights computed from 1-minute closes.

    History is requested from ``context`` for the configured assets. Each
    asset's short-window realized volatility is compared with the rolling mean
    of that same volatility; assets whose volatility sits above the benchmark
    get a negative signal (short), those below it a positive one (long).
    At most ``max_positions`` assets are kept, split between the strongest
    positive and the strongest negative signals, and the kept signals are
    scaled so their absolute values sum to 1.

    Args:
        context: Data provider; only ``get_history`` is called.
        config_dict: Parameters are read from its ``"strategy_config"`` entry
            (``assets``, ``short_vol_window``, ``long_vol_window``,
            ``clip_threshold``, ``max_positions``).

    Returns:
        ``{asset: weight}``; empty when no assets are configured, no history
        is returned, or no asset has a usable non-zero signal.
    """
    # --- 1. Parameters (with defaults) ---
    cfg = config_dict.get("strategy_config", {})
    universe = cfg.get("assets", [])
    short_win = cfg.get("short_vol_window", 20)
    long_win = cfg.get("long_vol_window", 60)
    clip_at = cfg.get("clip_threshold", 2.0)
    position_cap = cfg.get("max_positions", 6)

    if not universe:
        return {}

    # --- 2. History: both windows plus a 5-bar buffer ---
    history = context.get_history(
        assets=universe,
        window=short_win + long_win + 5,
        frequency="1m",
        fields=["close"],
    )
    if history.empty:
        return {}

    # Move the first index level (the asset) into the columns.
    closes = history["close"].unstack(level=0)

    # Restrict to configured assets that actually came back, in config order.
    available = [name for name in universe if name in closes.columns]
    if not available:
        return {}

    # --- 3. Signal ---
    minute_returns = closes[available].pct_change(1)
    realized_vol = minute_returns.rolling(window=short_win, min_periods=short_win).std()
    baseline_vol = realized_vol.rolling(window=long_win, min_periods=long_win).mean()

    # Relative deviation from the benchmark; the small constant guards against /0.
    relative_vol = (realized_vol - baseline_vol) / (baseline_vol + 1e-10)

    # Invert for mean reversion, then bound the magnitude.
    bounded_signal = (-relative_vol).clip(-clip_at, clip_at)

    # --- 4. Selection on the most recent bar ---
    snapshot = bounded_signal.iloc[-1].dropna()
    if snapshot.empty:
        return {}

    ranked = snapshot.sort_values(ascending=False)
    long_slots = position_cap // 2
    short_slots = position_cap - long_slots

    picks = pd.concat([
        ranked[ranked > 0].head(long_slots),   # largest positive signals
        ranked[ranked < 0].tail(short_slots),  # most negative signals
    ])
    if picks.empty:
        return {}

    # --- 5. Weights: signals scaled to unit gross exposure ---
    gross = np.abs(picks).sum()
    if gross > 0:
        return (picks / gross).to_dict()
    return (picks * 0).to_dict()

In [133]:
# strategy_config.py

# Parameters for the volatility-anomaly strategy. "max_positions" is not set
# here, so the strategy uses its own default of 6 for that setting.
strategy_config = {
    # Assets the strategy requests history for.
    "assets": [ "OXTUSDT", "XRPUSDT", "BCHUSDT", "LTCUSDT", "AUCTIONUSDT",
               "ADAUSDT", "ETCUSDT", "TRXUSDT", "DOTUSDT", "DOGEUSDT"],
    # Defines the lookback period (in minutes) for calculating short-term volatility.
    # This parameter captures the most recent price fluctuation intensity.
    "short_vol_window": 20,

    # Defines the lookback period (in minutes) for the medium-term volatility benchmark.
    # This acts as the baseline to which the short-term volatility is compared.
    "long_vol_window": 60,

    # Sets the maximum absolute value for the raw signal before normalization.
    # This helps to mitigate the impact of extreme volatility spikes on position sizing.
    "clip_threshold": 2.0
}

In [134]:
# Create Fake module
import sys
import types
from abc import ABC, abstractmethod
from datetime import datetime, timedelta
import requests
import pandas as pd

# 1. Create a fake module called 'module' and register it with the system.
module_obj = types.ModuleType('module')
sys.modules['module'] = module_obj

# 2. Create a fake empty module called 'module.data_context' and register it with the system.
data_context_module = types.ModuleType('module.data_context')
sys.modules['module.data_context'] = data_context_module

# Bind the submodule on its parent, as the import system does for real
# packages, so attribute access (`module.data_context`) also works after
# `import module.data_context`.
module_obj.data_context = data_context_module

# 3. Define the DataContext class required by the strategy.
class DataContext(ABC):
    """Abstract data-access interface that the strategy is written against.

    Local stand-in used for testing; presumably mirrors the platform's real
    ``module.data_context.DataContext`` - verify against the platform docs.
    """

    @property
    @abstractmethod
    def current_dt(self) -> datetime:
        """Return the context's current timestamp."""

    @abstractmethod
    def get_history(self, assets: list, window: int, frequency: str,
                    fields: "str | list" = 'close') -> pd.DataFrame:
        """Return historical data for ``assets``.

        Args:
            assets: Asset identifiers to load.
            window: Number of bars requested.
            frequency: Bar frequency string (the strategy passes ``"1m"``).
            fields: A single field name or a list of field names.

        Returns:
            A DataFrame with the requested fields.
        """

# 4. Place the defined DataContext class inside the fake 'module.data_context' module,
#    so that `from module.data_context import DataContext` resolves to this class.
data_context_module.DataContext = DataContext

# Confirmation message for the notebook user.
print("✅ Fake 'module.data_context' module and DataContext class created successfully.")

# 1. Import example strategy, config
# from futures.anomarly_vol.anomarly_vol import strategy     # Replace with your own strategy file
import futures.anomarly_vol.anomarly_vol_config as config  # Replace with your own config file

# 2. Prepare mock objects and data required for testing.
class MockDataContext(DataContext):
    """Test double that serves a pre-built frame instead of real market data."""

    def __init__(self, fake_data):
        # The canned frame handed back on every history request.
        self._fake_data = fake_data

    @property
    def current_dt(self) -> datetime:
        """Wall-clock time at the moment of access."""
        now = datetime.now()
        return now

    def get_history(self, assets, window, frequency, fields):
        """Return the ``fields`` selection of the canned frame.

        ``assets``, ``window`` and ``frequency`` are accepted for interface
        compatibility but are not used: the data is neither filtered by asset
        nor truncated to the requested window.
        """
        selection = self._fake_data[fields]
        return selection

def create_fake_data(assets, num_periods):
    """Build synthetic 1-minute close prices for the given assets.

    Each asset starts at 100 and compounds at a constant per-bar factor that
    depends on its position in ``assets`` (the first half drifts down, the
    second half drifts up). The timestamps end at the current wall-clock time.

    Args:
        assets (list): Asset names; one price series is generated per name.
        num_periods (int): Number of one-minute bars per asset.

    Returns:
        pandas.DataFrame: A single ``close`` column indexed by a MultiIndex
        with levels ``asset`` and ``datetime``.
    """
    # 'min' is the minute alias; the older 'T' spelling is deprecated in
    # recent pandas releases and triggers a FutureWarning.
    dates = pd.date_range(end=datetime.now(), periods=num_periods, freq='min')

    frames = []
    for i, asset in enumerate(assets):
        # Constant per-bar growth factor, centred on 1 across the asset list.
        trend = 1 + (i - len(assets) / 2) * 0.0001
        prices = [100 * (trend ** j) for j in range(num_periods)]
        frame = pd.DataFrame({'close': prices}, index=dates)
        frame['asset'] = asset
        frames.append(frame)

    # Stack the per-asset frames and move (asset, datetime) into the index.
    df_concat = pd.concat(frames).reset_index().rename(columns={'index': 'datetime'})
    df_final = df_concat.set_index(['asset', 'datetime'])

    # Preview the frame when running under IPython/Jupyter, where `display`
    # is available; plain Python has no such name, so the preview is skipped.
    try:
        show = display
    except NameError:
        show = None
    if show is not None:
        show(df_final)

    return df_final

# 3. Run verify test
# Wrap the imported settings under the key the strategy reads them from.
test_config = dict(strategy_config=config.strategy_config)

# 1440 one-minute bars (24 hours) of synthetic prices per configured asset.
fake_data = create_fake_data(assets=test_config["strategy_config"]["assets"], num_periods=1440)

# Data context that serves the synthetic frame.
mock_context = MockDataContext(fake_data)

# Run the strategy end to end and show the resulting weights.
print("\n🚀 Starting strategy function testing...")
weights = strategy(config_dict=test_config, context=mock_context)
print("✅ Strategy function execution complete!")
print("\n[Final weighted result]")
weights

✅ Fake 'module.data_context' module and DataContext class created successfully.


  dates = pd.date_range(end=datetime.now(), periods=num_periods, freq='T')


Unnamed: 0_level_0,Unnamed: 1_level_0,close
asset,datetime,Unnamed: 2_level_1
OXTUSDT,2025-09-28 17:43:02.183134,100.000000
OXTUSDT,2025-09-28 17:44:02.183134,99.950000
OXTUSDT,2025-09-28 17:45:02.183134,99.900025
OXTUSDT,2025-09-28 17:46:02.183134,99.850075
OXTUSDT,2025-09-28 17:47:02.183134,99.800150
...,...,...
DOGEUSDT,2025-09-29 17:38:02.183134,177.515054
DOGEUSDT,2025-09-29 17:39:02.183134,177.586060
DOGEUSDT,2025-09-29 17:40:02.183134,177.657094
DOGEUSDT,2025-09-29 17:41:02.183134,177.728157



🚀 Starting strategy function testing...
✅ Strategy function execution complete!

[Final weighted result]


{'AUCTIONUSDT': 0.24739267395226772,
 'LTCUSDT': 0.16221299802390304,
 'OXTUSDT': 0.1431721793685802,
 'DOTUSDT': -0.05556640486227585,
 'XRPUSDT': -0.06046527983979275,
 'BCHUSDT': -0.3311904639531804}

In [126]:
fake_data

Unnamed: 0_level_0,Unnamed: 1_level_0,close
asset,datetime,Unnamed: 2_level_1
OXTUSDT,2025-09-28 17:36:13.251379,100.000000
OXTUSDT,2025-09-28 17:37:13.251379,99.950000
OXTUSDT,2025-09-28 17:38:13.251379,99.900025
OXTUSDT,2025-09-28 17:39:13.251379,99.850075
OXTUSDT,2025-09-28 17:40:13.251379,99.800150
...,...,...
DOGEUSDT,2025-09-29 17:31:13.251379,177.515054
DOGEUSDT,2025-09-29 17:32:13.251379,177.586060
DOGEUSDT,2025-09-29 17:33:13.251379,177.657094
DOGEUSDT,2025-09-29 17:34:13.251379,177.728157
