In [None]:
import numpy as np
import pandas as pd
import requests
import time
from datetime import datetime, timedelta
from statsmodels.tsa.stattools import adfuller
from collections import deque
import warnings
warnings.filterwarnings('ignore')

class StationarityMonitor:
    def __init__(self, symbol='BTCUSDT', interval='1m', window_size=10):
        self.symbol = symbol
        self.interval = interval
        self.window_size = window_size  # размер окна в минутах
        
        # Очереди для хранения данных
        self.prices = deque(maxlen=window_size)
        self.timestamps = deque(maxlen=window_size)
        
        # История ADF статистик для отслеживания динамики
        self.adf_history = {
            'sma_deviation': deque(maxlen=50),
            'bollinger': deque(maxlen=50),
            'return_deviation': deque(maxlen=50)
        }
        
        # Параметры для стратегий
        self.sma_period = 50
        self.bb_period = 20
        self.bb_std = 2
        
        print(f"Инициализирован мониторинг стационарности для {symbol}")
        print(f"Размер окна: {window_size} минут")
        
    def get_klines_data(self, limit=100):
        """Получение данных свечей с Binance API"""
        url = "https://api.binance.com/api/v3/klines"
        params = {
            'symbol': self.symbol,
            'interval': self.interval,
            'limit': limit
        }
        
        try:
            response = requests.get(url, params=params)
            response.raise_for_status()
            data = response.json()
            
            df = pd.DataFrame(data, columns=[
                'timestamp', 'open', 'high', 'low', 'close', 'volume',
                'close_time', 'quote_asset_volume', 'number_of_trades',
                'taker_buy_base_asset_volume', 'taker_buy_quote_asset_volume', 'ignore'
            ])
            
            # Конвертация типов
            numeric_columns = ['open', 'high', 'low', 'close', 'volume']
            for col in numeric_columns:
                df[col] = pd.to_numeric(df[col])
            
            df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms')
            return df
            
        except Exception as e:
            print(f"Ошибка получения данных: {e}")
            return None
    
    def calculate_sma(self, prices, period):
        """Расчет Simple Moving Average"""
        if len(prices) < period:
            return None
        return sum(list(prices)[-period:]) / period
    
    def calculate_bollinger_bands(self, prices, period, std_dev):
        """Расчет Bollinger Bands"""
        if len(prices) < period:
            return None, None, None
        
        recent_prices = list(prices)[-period:]
        sma = sum(recent_prices) / period
        std = np.std(recent_prices)
        
        upper_band = sma + (std_dev * std)
        lower_band = sma - (std_dev * std)
        
        return sma, upper_band, lower_band
    
    def calculate_returns(self, prices):
        """Расчет доходности"""
        if len(prices) < 2:
            return []
        
        prices_list = list(prices)
        returns = []
        for i in range(1, len(prices_list)):
            ret = (prices_list[i] - prices_list[i-1]) / prices_list[i-1]
            returns.append(ret)
        
        return returns
    
    def perform_adf_test(self, series, test_name):
        """Выполнение ADF теста"""
        if len(series) < 10:  # Минимум данных для теста
            return None, None, None
        
        try:
            result = adfuller(series)
            adf_stat = result[0]
            p_value = result[1]
            critical_values = result[4]
            
            # Определение стационарности на основе критических значений
            is_stationary = adf_stat < critical_values['5%']
            
            return adf_stat, p_value, is_stationary
            
        except Exception as e:
            print(f"Ошибка ADF теста для {test_name}: {e}")
            return None, None, None
    
    def check_adf_trend(self, strategy_name, current_adf):
        """Проверка динамики ADF статистики"""
        history = self.adf_history[strategy_name]
        
        if current_adf is not None:
            history.append(current_adf)
        
        if len(history) < 3:
            return "Недостаточно данных"
        
        # Проверяем тренд последних 3 значений
        recent_values = list(history)[-3:]
        
        # ADF статистика снижается (становится менее отрицательной) = хуже для стационарности
        if recent_values[-1] > recent_values[-2] > recent_values[-3]:
            return "⚠️ ADF СНИЖАЕТСЯ"
        elif recent_values[-1] < recent_values[-2] < recent_values[-3]:
            return "✅ ADF улучшается"
        else:
            return "→ ADF стабильна"
    
    def analyze_strategies(self, df):
        """Анализ всех стратегий"""
        if len(df) < max(self.sma_period, self.bb_period):
            print("Недостаточно данных для анализа")
            return
        
        current_price = df['close'].iloc[-1]
        prices = df['close'].values
        
        print(f"\n{'='*80}")
        print(f"Время: {datetime.now().strftime('%H:%M:%S')} | Цена BTC: ${current_price:.2f}")
        print(f"{'='*80}")
        
        # Стратегия 1: Отклонение от SMA
        if len(prices) >= self.sma_period:
            sma_value = self.calculate_sma(prices, self.sma_period)
            if sma_value:
                # Используем только последние window_size точек для ADF
                recent_prices = prices[-self.window_size:] if len(prices) >= self.window_size else prices
                recent_sma = [self.calculate_sma(prices[:i+1], min(self.sma_period, i+1)) 
                             for i in range(len(recent_prices))]
                recent_sma = [x for x in recent_sma if x is not None]
                
                if len(recent_sma) > 0:
                    sma_deviations = recent_prices[-len(recent_sma):] - np.array(recent_sma)
                    adf_stat, p_value, is_stationary = self.perform_adf_test(sma_deviations, "SMA отклонение")
                    trend = self.check_adf_trend('sma_deviation', adf_stat)
                    
                    print(f"\n📊 СТРАТЕГИЯ 1: Отклонение от SMA({self.sma_period})")
                    print(f"   Текущая цена: ${current_price:.2f}")
                    print(f"   SMA: ${sma_value:.2f}")
                    print(f"   Отклонение: ${current_price - sma_value:.2f} ({((current_price/sma_value-1)*100):.2f}%)")
                    if adf_stat is not None:
                        print(f"   ADF статистика: {adf_stat:.4f}")
                        print(f"   P-value: {p_value:.4f}")
                        print(f"   Стационарность: {'✅ ДА' if is_stationary else '❌ НЕТ'}")
                        print(f"   Тренд ADF: {trend}")
        
        # Стратегия 2: Bollinger Bands
        if len(prices) >= self.bb_period:
            bb_sma, upper_band, lower_band = self.calculate_bollinger_bands(
                prices, self.bb_period, self.bb_std
            )
            
            if bb_sma and upper_band and lower_band:
                # BB позиция для последних window_size точек
                recent_prices = prices[-self.window_size:] if len(prices) >= self.window_size else prices
                bb_positions = []
                
                for i, price in enumerate(recent_prices):
                    start_idx = max(0, len(prices) - len(recent_prices) + i - self.bb_period + 1)
                    end_idx = len(prices) - len(recent_prices) + i + 1
                    
                    if end_idx - start_idx >= self.bb_period:
                        period_prices = prices[start_idx:end_idx]
                        sma, upper, lower = self.calculate_bollinger_bands(
                            period_prices, self.bb_period, self.bb_std
                        )
                        if sma and upper and lower:
                            if price > upper:
                                bb_positions.append(1)  # Выше верхней полосы
                            elif price < lower:
                                bb_positions.append(-1)  # Ниже нижней полосы
                            else:
                                bb_positions.append(0)  # Внутри полос
                
                if len(bb_positions) > 5:
                    adf_stat, p_value, is_stationary = self.perform_adf_test(bb_positions, "Bollinger Bands")
                    trend = self.check_adf_trend('bollinger', adf_stat)
                    
                    print(f"\n📊 СТРАТЕГИЯ 2: Bollinger Bands({self.bb_period}, {self.bb_std})")
                    print(f"   Верхняя полоса: ${upper_band:.2f}")
                    print(f"   Средняя линия: ${bb_sma:.2f}")
                    print(f"   Нижняя полоса: ${lower_band:.2f}")
                    
                    current_bb_pos = ""
                    if current_price > upper_band:
                        current_bb_pos = "🔴 Выше верхней полосы"
                    elif current_price < lower_band:
                        current_bb_pos = "🟢 Ниже нижней полосы"
                    else:
                        current_bb_pos = "🟡 Внутри полос"
                    
                    print(f"   Позиция: {current_bb_pos}")
                    if adf_stat is not None:
                        print(f"   ADF статистика: {adf_stat:.4f}")
                        print(f"   P-value: {p_value:.4f}")
                        print(f"   Стационарность: {'✅ ДА' if is_stationary else '❌ НЕТ'}")
                        print(f"   Тренд ADF: {trend}")
        
        # Стратегия 3: Отклонение доходности от средней
        if len(prices) >= 10:
            returns = self.calculate_returns(prices[-self.window_size:] if len(prices) >= self.window_size else prices)
            
            if len(returns) > 5:
                mean_return = np.mean(returns)
                return_deviations = np.array(returns) - mean_return
                
                adf_stat, p_value, is_stationary = self.perform_adf_test(return_deviations, "Отклонение доходности")
                trend = self.check_adf_trend('return_deviation', adf_stat)
                
                print(f"\n📊 СТРАТЕГИЯ 3: Отклонение доходности от средней")
                print(f"   Средняя доходность: {mean_return*100:.4f}%")
                print(f"   Последняя доходность: {returns[-1]*100:.4f}%")
                print(f"   Отклонение: {(returns[-1] - mean_return)*100:.4f}%")
                print(f"   Волатильность: {np.std(returns)*100:.4f}%")
                if adf_stat is not None:
                    print(f"   ADF статистика: {adf_stat:.4f}")
                    print(f"   P-value: {p_value:.4f}")
                    print(f"   Стационарность: {'✅ ДА' if is_stationary else '❌ НЕТ'}")
                    print(f"   Тренд ADF: {trend}")
    
    def run_monitoring(self, update_interval=60):
        """Запуск мониторинга в реальном времени"""
        print(f"🚀 Запуск мониторинга стационарности для {self.symbol}")
        print(f"⏱️ Интервал обновления: {update_interval} секунд")
        print(f"📊 Размер окна для ADF: {self.window_size} минут")
        
        while True:
            try:
                # Получаем данные
                df = self.get_klines_data(limit=200)  # Берем больше данных для SMA
                
                if df is not None and len(df) > 0:
                    self.analyze_strategies(df)
                else:
                    print("❌ Не удалось получить данные")
                
                # Ждем до следующего обновления
                print(f"\n⏳ Следующее обновление через {update_interval} секунд...")
                time.sleep(update_interval)
                
            except KeyboardInterrupt:
                print("\n🛑 Мониторинг остановлен пользователем")
                break
            except Exception as e:
                print(f"❌ Ошибка в мониторинге: {e}")
                time.sleep(update_interval)

# Пример использования
if __name__ == "__main__":
    # Создаем монитор
    monitor = StationarityMonitor(
        symbol='BTCUSDT',
        interval='1m',
        window_size=15  # 15-минутное окно для ADF теста
    )
    
    # Запускаем мониторинг (обновление каждые 60 секунд)
    monitor.run_monitoring(update_interval=60)

Инициализирован мониторинг стационарности для BTCUSDT
Размер окна: 15 минут
🚀 Запуск мониторинга стационарности для BTCUSDT
⏱️ Интервал обновления: 60 секунд
📊 Размер окна для ADF: 15 минут

Время: 14:24:06 | Цена BTC: $107293.82

📊 СТРАТЕГИЯ 1: Отклонение от SMA(50)
   Текущая цена: $107293.82
   SMA: $107223.39
   Отклонение: $70.43 (0.07%)
   ADF статистика: -6.4509
   P-value: 0.0000
   Стационарность: ✅ ДА
   Тренд ADF: Недостаточно данных
Ошибка ADF теста для Bollinger Bands: Invalid input, x is constant

📊 СТРАТЕГИЯ 2: Bollinger Bands(20, 2)
   Верхняя полоса: $107421.91
   Средняя линия: $107316.42
   Нижняя полоса: $107210.93
   Позиция: 🟡 Внутри полос

📊 СТРАТЕГИЯ 3: Отклонение доходности от средней
   Средняя доходность: -0.0045%
   Последняя доходность: -0.0070%
   Отклонение: -0.0025%
   Волатильность: 0.0553%
   ADF статистика: -7.2222
   P-value: 0.0000
   Стационарность: ✅ ДА
   Тренд ADF: Недостаточно данных

⏳ Следующее обновление через 60 секунд...

Время: 14:25:06 |