In [12]:
import requests
import pandas as pd
from datetime import datetime, timedelta
import time
import pytz

BACKTEST_DATA_FOLDER = "backtest_data"


def get_binance_klines(symbol, interval, start_time, end_time):
    url = "https://api.binance.com/api/v3/klines"
    params = {
        "symbol": symbol,
        "interval": interval,
        "startTime": int(start_time.timestamp() * 1000),
        "endTime": int(end_time.timestamp() * 1000),
        "limit": 1000,
    }
    response = requests.get(url, params=params)
    data = response.json()
    df = pd.DataFrame(
        data,
        columns=[
            "timestamp",
            "open",
            "high",
            "low",
            "close",
            "volume",
            "close_time",
            "quote_asset_volume",
            "number_of_trades",
            "taker_buy_base_asset_volume",
            "taker_buy_quote_asset_volume",
            "ignore",
        ],
    )
    df["timestamp"] = pd.to_datetime(df["timestamp"], unit="ms", utc=True)
    df["timestamp"] = df["timestamp"].dt.tz_convert("Asia/Tokyo")  # UTCからJSTに変換
    return df[["timestamp", "open", "high", "low", "close", "volume"]]


def get_historical_data(symbol, interval, start_date, end_date):
    all_data = []
    current_date = start_date
    while current_date < end_date:
        next_date = min(current_date + timedelta(days=1), end_date)
        print(f"Fetching data from {current_date} to {next_date}")
        df = get_binance_klines(symbol, interval, current_date, next_date)
        if df.empty:
            print(f"No data found for {symbol} in the specified time range.")
            exit()
        all_data.append(df)
        current_date = next_date
        time.sleep(1)  # APIレート制限を考慮
    return pd.concat(all_data)


# パラメータ設定
symbol = "SOLUSDT"
interval = "5m"
end_date = datetime(
    2024, 8, 11, tzinfo=pytz.timezone("Asia/Tokyo")
)  # 固定の終了日（JST）
start_date = end_date - timedelta(days=180)

# データ取得
historical_data = get_historical_data(symbol, interval, start_date, end_date)

# CSVに保存
historical_data.to_csv(
    f"{BACKTEST_DATA_FOLDER}/{symbol}_{interval}_{start_date.date()}_{end_date.date()}_JST.csv", index=False
)

print(f"Total rows: {len(historical_data)}")

Fetching data from 2024-02-13 00:00:00+09:19 to 2024-02-14 00:00:00+09:19
Fetching data from 2024-02-14 00:00:00+09:19 to 2024-02-15 00:00:00+09:19
Fetching data from 2024-02-15 00:00:00+09:19 to 2024-02-16 00:00:00+09:19
Fetching data from 2024-02-16 00:00:00+09:19 to 2024-02-17 00:00:00+09:19
Fetching data from 2024-02-17 00:00:00+09:19 to 2024-02-18 00:00:00+09:19
Fetching data from 2024-02-18 00:00:00+09:19 to 2024-02-19 00:00:00+09:19
Fetching data from 2024-02-19 00:00:00+09:19 to 2024-02-20 00:00:00+09:19
Fetching data from 2024-02-20 00:00:00+09:19 to 2024-02-21 00:00:00+09:19
Fetching data from 2024-02-21 00:00:00+09:19 to 2024-02-22 00:00:00+09:19
Fetching data from 2024-02-22 00:00:00+09:19 to 2024-02-23 00:00:00+09:19
Fetching data from 2024-02-23 00:00:00+09:19 to 2024-02-24 00:00:00+09:19
Fetching data from 2024-02-24 00:00:00+09:19 to 2024-02-25 00:00:00+09:19
Fetching data from 2024-02-25 00:00:00+09:19 to 2024-02-26 00:00:00+09:19
Fetching data from 2024-02-26 00:00:00

In [33]:
historical_data = pd.read_csv("backtest_data/SOLUSDT_5m_2024-02-13_2024-08-11_JST.csv")

In [28]:
historical_data["timestamp"] = pd.to_datetime(historical_data["timestamp"])
historical_data = historical_data[historical_data.timestamp.dt.date > pd.to_datetime("2024-07-21").date()]

In [39]:
import pandas as pd
import numpy as np
from src.utils.indicators import calculate_rsi, calculate_bollinger_bands


def calculate_performance(trades):
    if not trades:
        return 0, 0, 0, 0
    total_return = sum(trades)
    avg_return = total_return / len(trades)
    win_rate = sum(1 for t in trades if t > 0) / len(trades)
    returns = np.array(trades)
    sharpe_ratio = (returns.mean() / returns.std()) * np.sqrt(252)  # Annualized
    return total_return, avg_return, win_rate, sharpe_ratio


def prepare_data(df):
    # Reset index to ensure it's unique
    df = df.reset_index(drop=True)

    # Convert relevant columns to numeric types
    numeric_columns = ["open", "high", "low", "close", "volume"]
    for col in numeric_columns:
        df[col] = pd.to_numeric(df[col], errors="coerce")

    # Remove existing RSI column if it exists
    if "rsi" in df.columns:
        df = df.drop("rsi", axis=1)

    # Drop any rows with NaN values after conversion
    df = df.dropna(subset=numeric_columns)

    return df


def backtest_rsi_bollinger_strategy(
    df, rsi_period=14, bb_period=20, bb_std=2, rsi_oversold=35, rsi_overbought=70
):
    # Prepare the data
    df = prepare_data(df)

    # Calculate indicators
    df["rsi"] = calculate_rsi(df["close"].values, period=rsi_period)

    # Calculate Bollinger Bands
    bb_upper, bb_middle, bb_lower = calculate_bollinger_bands(
        df["close"].values, period=bb_period, num_std_dev=bb_std
    )

    # Add Bollinger Bands to the DataFrame, aligning with the index
    df["bb_upper"] = pd.Series(bb_upper, index=df.index[: len(bb_upper)])
    df["bb_middle"] = pd.Series(bb_middle, index=df.index[: len(bb_middle)])
    df["bb_lower"] = pd.Series(bb_lower, index=df.index[: len(bb_lower)])

    # Drop rows with NaN values
    df = df.dropna()

    # Initialize variables
    position = None
    entry_price = 0
    trades = []

    for i in range(len(df)):
        if i < max(rsi_period, bb_period):
            continue  # Skip until we have enough data for indicators

        row = df.iloc[i]
        if position is None:
            if row["rsi"] < rsi_oversold and row["close"] < row["bb_lower"]:
                position = "long"
                entry_price = row["close"]
        elif position == "long":
            if row["rsi"] > rsi_overbought or row["close"] > row["bb_upper"]:
                profit = (row["close"] - entry_price) / entry_price
                trades.append(profit)
                position = None

    return trades


# Use the function
trades = backtest_rsi_bollinger_strategy(historical_data)

# Calculate performance metrics
total_return, avg_return, win_rate, sharpe_ratio = calculate_performance(trades)

print(historical_data.describe())

# トレード詳細の表示
print("\nFirst 10 trades:")
print(trades[:10])
print("\nLast 10 trades:")
print(trades[-10:])

print(f"\nBacktesting Results:")
print(f"Total Return: {total_return:.2%}")
print(f"Average Trade Return: {avg_return:.2%}")
print(f"Win Rate: {win_rate:.2%}")
print(f"Sharpe Ratio: {sharpe_ratio:.2f}")
print(f"Number of Trades: {len(trades)}")

               open          high           low         close         volume
count  51840.000000  51840.000000  51840.000000  51840.000000   51840.000000
mean     152.750731    153.058877    152.434954    152.751953   17452.659700
std       22.944563     22.978588     22.912077     22.943828   22053.676687
min       98.570000     98.850000     98.480000     98.570000     379.726000
25%      138.140000    138.470000    137.830000    138.140000    5977.100250
50%      149.870000    150.200000    149.500000    149.870000   10547.949000
75%      171.650000    171.950000    171.302500    171.650000   20292.988500
max      209.740000    210.180000    208.970000    209.750000  796188.610000

First 10 trades:
[np.float64(0.017722640673460348), np.float64(-0.0015040254799610873), np.float64(0.013197619252997384), np.float64(0.0157679240351946), np.float64(0.015124712542013035), np.float64(0.02374977594551), np.float64(0.009431395665185511), np.float64(0.01770564367392108), np.float64(0.01590219

In [10]:
print(historical_data.info())
print(historical_data.head())
print(historical_data.index.is_unique)

<class 'pandas.core.frame.DataFrame'>
Index: 51840 entries, 0 to 287
Data columns (total 7 columns):
 #   Column     Non-Null Count  Dtype                     
---  ------     --------------  -----                     
 0   timestamp  51840 non-null  datetime64[ns, Asia/Tokyo]
 1   open       51840 non-null  object                    
 2   high       51840 non-null  object                    
 3   low        51840 non-null  object                    
 4   close      51840 non-null  float64                   
 5   volume     51840 non-null  object                    
 6   rsi        51840 non-null  float64                   
dtypes: datetime64[ns, Asia/Tokyo](1), float64(2), object(4)
memory usage: 3.2+ MB
None
                  timestamp            open            high             low  \
0 2024-02-12 23:45:00+09:00  48556.30000000  48719.17000000  48455.51000000   
1 2024-02-12 23:50:00+09:00  48606.20000000  48663.20000000  48520.00000000   
2 2024-02-12 23:55:00+09:00  48623.66000000

In [25]:
import yaml
import os

# from src.utils.encryption import decrypt_file
from dotenv import load_dotenv

load_dotenv()


class ConfigLoader:
    def __init__(self):
        self.config = None
        self.secrets = None
        self.config_mtime = 0
        self.secrets_mtime = 0

    def load_config(self):
        current_mtime = os.path.getmtime("config/config.yaml")
        if current_mtime > self.config_mtime:
            with open("config/config.yaml", "r") as file:
                self.config = yaml.safe_load(file)
            self.config_mtime = current_mtime
        return self.config

    def load_secrets(self):
        secrets_path = os.path.join("config", "secrets.yaml")
        with open(secrets_path, "r") as file:
            self.secrets = yaml.safe_load(file)
        return self.secrets

In [26]:
config_loader = ConfigLoader()
config = config_loader.load_config()
secrets = config_loader.load_secrets()
print(config)
print(secrets)

{'trading': {'pair': 'BTCUSDT', 'amount': 0.001, 'interval_seconds': 300, 'rsi_period': 14, 'bollinger_period': 20, 'bollinger_std_dev': 2}}
{'bybit': {'api_key': 'Jg22qo1LSji45UIsjA', 'api_secret': '27OvNMgl3twzyRbcytP9lw4vpWiOWULQ1Z5p'}}


In [69]:
import requests


def get_real_time_price(symbol):
    url = f"https://api.binance.com/api/v3/ticker/price"
    params = {"symbol": symbol}
    response = requests.get(url, params=params)
    data = response.json()
    return data["price"]


# 使用例
symbol = "BTCUSDT"
price = get_real_time_price(symbol)
print(f"The current price of {symbol} is: {price}")

The current price of BTCUSDT is: 58597.24000000


In [72]:
import websocket
import json


def on_message(ws, message):
    data = json.loads(message)
    print(f"The current price of {data['s']} is: {data['c']}")


def on_error(ws, error):
    print(error)


def on_close(ws):
    print("### closed ###")


def on_open(ws):
    # Subscribe to the ticker for a specific symbol
    ws.send(json.dumps({"method": "SUBSCRIBE", "params": ["btcusdt@ticker"], "id": 1}))


if __name__ == "__main__":
    websocket_url = "wss://stream.binance.com:9443/ws"
    ws = websocket.WebSocketApp(
        websocket_url,
        on_open=on_open,
        on_message=on_message,
        on_error=on_error,
        on_close=on_close,
    )
    ws.run_forever()

's'
The current price of BTCUSDT is: 58563.86000000
The current price of BTCUSDT is: 58563.86000000
The current price of BTCUSDT is: 58563.85000000
The current price of BTCUSDT is: 58563.86000000
The current price of BTCUSDT is: 58556.00000000
The current price of BTCUSDT is: 58556.00000000
The current price of BTCUSDT is: 58547.59000000
The current price of BTCUSDT is: 58544.98000000

on_close() takes 1 positional argument but 3 were given


In [68]:
import time


def get_real_time_price_bybit(symbol):
    url = f"https://api.bybit.com/v2/public/tickers"
    params = {"symbol": symbol}

    for attempt in range(5):  # 最大5回リトライ
        response = requests.get(url, params=params)

        # ステータスコードを確認
        if response.status_code == 200:
            try:
                data = response.json()
                if data["result"]:
                    return data["result"][0]["last_price"]
                else:
                    raise Exception("No result found in response")
            except ValueError:
                raise Exception(f"Error decoding JSON response: {response.text}")
        elif response.status_code == 429:
            print("Too many requests. Waiting before retrying...")
            time.sleep(5)  # 5秒待機
        else:
            raise Exception(
                f"Error fetching data: {response.status_code} - {response.text}"
            )

    raise Exception("Max retries exceeded")


# 使用例
symbol = "BTCUSDT"
price = get_real_time_price_bybit(symbol)
print(f"The current price of {symbol} on Bybit is: {price}")

Too many requests. Waiting before retrying...
Too many requests. Waiting before retrying...
Too many requests. Waiting before retrying...
Too many requests. Waiting before retrying...
Too many requests. Waiting before retrying...


Exception: Max retries exceeded

In [None]:
async def main():
    klines = await api.get_klines("BTCUSDT", "5", 100)
    latest_kline = klines["result"]["list"][-1]
    latest_timestamp = latest_kline[0]
    latest_open = latest_kline[1]
    latest_high = latest_kline[2]
    latest_low = latest_kline[3]
    latest_close = latest_kline[4]
    latest_volume = latest_kline[5]
    print("Latest Kline Data:")
    print(f"Timestamp: {latest_timestamp}")
    print(f"Open Price: {latest_open}")
    print(f"High Price: {latest_high}")
    print(f"Low Price: {latest_low}")
    print(f"Close Price: {latest_close}")
    print(f"Volume: {latest_volume}")