In [None]:
import ccxt
import time
import pandas as pd
import pyarrow.parquet as pq
import pyarrow as pa
import plotly.graph_objects as go
from datetime import datetime

# Создаем объект биржи
exchange = ccxt.binance()

# Список торговых пар, по которым будем загружать данные
trading_pairs = [
    "BTC/USDT"
]

# Таймфрейм для загрузки данных ('5m' означает 5 минут)
timeframe = '5m'

# Начальная дата для загрузки данных (в формате ISO 8601)
since = exchange.parse8601('2024-01-01T00:00:00Z')
# Текущее время в миллисекундах
now = exchange.milliseconds()

# Функция для загрузки исторических данных (OHLCV) для заданной пары и таймфрейма
def fetch_data(pair, timeframe, since, now):
    all_ohlcv = []  # Список для хранения всех данных
    while since < now:
        try:
            # Загружаем данные OHLCV (Open, High, Low, Close, Volume) с биржи
            ohlcv = exchange.fetch_ohlcv(pair, timeframe, since, limit=1000)
            if not ohlcv:  # Если данных нет, прерываем цикл
                break
            all_ohlcv.extend(ohlcv)  # Добавляем данные в список
            since = ohlcv[-1][0] + 1  # Обновляем начальное время для следующего запроса
            time.sleep(exchange.rateLimit / 1000)  # Пауза для соблюдения лимитов API биржи
        except Exception as e:
            # Обработка ошибок, если что-то пошло не так
            print(f"Ошибка при загрузке данных для {pair}: {e}")
            break
    return all_ohlcv

# Функция для проверки данных перед сохранением
def check_data_integrity(df):
    """
    Проверяет целостность данных: пропуски и аномалии.
    """
    print("Проверка данных на пропуски и аномалии...")
    # Проверяем наличие пропусков в данных
    if df.isnull().values.any():
        print("Обнаружены пропуски в данных! Сохранение невозможно.")
        return False

    # Проверка на наличие выбросов по стандартным значениям
    z_scores = (df[['open', 'high', 'low', 'close']] - df[['open', 'high', 'low', 'close']].mean()) / df[['open', 'high', 'low', 'close']].std()
    if (z_scores.abs() > 3).any().any():
        print("Обнаружены аномалии в данных! Сохранение невозможно.")
        return False

    print("Данные проверены и корректны.")
    return True

# Функция для сохранения данных в формате Parquet (с пятиминутными интервалами)
def save_to_parquet(data, pair):
    # Преобразуем данные в DataFrame для удобного анализа и хранения
    df = pd.DataFrame(data, columns=['timestamp', 'open', 'high', 'low', 'close', 'volume'])
    df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms')  # Конвертируем timestamp в читаемый формат
    
    # Проверяем данные перед сохранением
    if not check_data_integrity(df):
        print(f"Данные для {pair} не сохранены из-за ошибок.")
        return None
    
    # Преобразуем DataFrame в формат Parquet и сохраняем на диск (пятиминутные интервалы)
    df.reset_index(drop=True, inplace=True)  # Сбрасываем индекс перед сохранением
    table = pa.Table.from_pandas(df)
    pq.write_table(table, f"{pair.replace('/', '_')}_5m.parquet")
    print(f"Пятиминутные данные для {pair} успешно сохранены.")
    return df

# Функция для ресемплинга данных в другой таймфрейм (например, 1ч, 4ч, 1д)
def resample_dataset(df: pd.DataFrame, time: str):
    """
    Ресемплинг данных для конвертации таймфрейма (например, 1ч, 4ч, 1д).
    """
    df = df.set_index('timestamp')  # Устанавливаем timestamp в качестве индекса
    curent_df_high = df['high'].resample(time).max()  # Высокая цена за период
    curent_df_low = df['low'].resample(time).min()    # Низкая цена за период
    curent_df_close = df['close'].resample(time).last()  # Последняя цена за период
    curent_df_open = df['open'].resample(time).first()  # Первая цена за период
    curent_df_volume = df['volume'].resample(time).sum()  # Общий объем за период
    
    # Собираем итоговый DataFrame
    final_df = pd.DataFrame({
        'open': curent_df_open,
        'close': curent_df_close,
        'high': curent_df_high,
        'low': curent_df_low,
        'volume': curent_df_volume
    })
    
    # Удаляем строки с отсутствующими данными
    final_df = final_df.dropna()
    
    # Восстанавливаем временные метки для дальнейшей обработки
    final_df.reset_index(inplace=True)
    return final_df

# Функция для загрузки данных из .parquet файла
def load_data_from_parquet(file_path):
    """
    Загрузка данных из .parquet файла.
    """
    try:
        df = pd.read_parquet(file_path)
        print(f"Данные загружены из {file_path}")
        return df
    except Exception as e:
        print(f"Ошибка при загрузке данных из {file_path}: {e}")
        return None

# Функция для построения графика с ценами открытия, закрытия, максимума и минимума
def plot_parquet_data(df, pair):
    """
    Построение графика цен (открытие, закрытие, максимум, минимум) для определенной пары на основе данных из parquet.
    """
    if df is not None and 'timestamp' in df and 'open' in df and 'high' in df and 'low' in df and 'close' in df:
        fig = go.Figure()

        fig.add_trace(go.Candlestick(x=df['timestamp'],
                                     open=df['open'],
                                     high=df['high'],
                                     low=df['low'],
                                     close=df['close'],
                                     name=f'Price of {pair}'))

        fig.update_layout(title=f'Price of {pair}',
                          yaxis_title='Price (USD)',
                          xaxis_title='Timestamp',
                          xaxis_rangeslider_visible=False)

        fig.show(renderer="browser")  # Открытие графика в браузере
    else:
        print("Данные некорректны или отсутствуют необходимые столбцы.")

# Основной цикл по всем торговым парам из списка
for pair in trading_pairs:
    print(f"Загружаем данные для {pair}...")
    data = fetch_data(pair, timeframe, since, now)  # Загружаем данные для каждой пары
    
    if data:  # Если данные успешно загружены
        print(f"Сохраняем пятиминутные данные для {pair}...")
        df = save_to_parquet(data, pair)  # Сохраняем пятиминутные данные в Parquet
        
        if df is not None:
            print(f"Данные для {pair} успешно сохранены в пятиминутках.")
    else:
        print(f"Данные для {pair} не найдены или загрузка завершилась неудачей.")




In [None]:
# Конвертация данных в другие таймфреймы
for pair in trading_pairs:
    try:
        df_5m = pd.read_parquet(f"{pair.replace('/', '_')}_5m.parquet")
        
        print(f"Преобразуем данные для {pair} в другие таймфреймы...")
        
        # Пример конвертации в 1 час
        resampled_df_1h = resample_dataset(df_5m, '1H')
        pq.write_table(pa.Table.from_pandas(resampled_df_1h), f"{pair.replace('/', '_')}_1h.parquet")
        
        # Пример конвертации в 4 часа
        resampled_df_4h = resample_dataset(df_5m, '4H')
        pq.write_table(pa.Table.from_pandas(resampled_df_4h), f"{pair.replace('/', '_')}_4h.parquet")
        
        # Пример конвертации в 1 день
        resampled_df_1d = resample_dataset(df_5m, '1D')
        pq.write_table(pa.Table.from_pandas(resampled_df_1d), f"{pair.replace('/', '_')}_1d.parquet")
    except Exception as e:
        print(f"Ошибка при конвертации данных для {pair}: {e}")


In [None]:
# Построение графиков для каждого таймфрейма
for pair in trading_pairs:
    for timeframe in ['5m', '1h', '4h', '1d']:
        file_path = f"{pair.replace('/', '_')}_{timeframe}.parquet"
        df = load_data_from_parquet(file_path)
        plot_parquet_data(df, pair)
