<a href="https://colab.research.google.com/github/jwashb22/crypto_pipline-analysis/blob/main/pipeline.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [41]:
import requests
import pandas as pd
from sqlalchemy import create_engine, Column, DateTime, Float, Integer, String, UniqueConstraint, Boolean, text, BIGINT, ForeignKeyConstraint
from sqlalchemy.orm import declarative_base
import time
from datetime import datetime, timedelta
from functools import wraps
import os
from google.colab import userdata

In [16]:
# Shared SQLAlchemy declarative base. data_retrieval calls
# base.metadata.create_all(engine) on every load, but no ORM models are declared
# on it in the cells shown here, so that call creates no tables -- the actual
# tables are written by DataFrame.to_sql inside data_retrieval.
base = declarative_base()

In [58]:
def normalize_symbol(symbol):
    """Return the USDT trading pair for a base asset symbol.

    ``'BTC'`` -> ``'BTCUSDT'``. A symbol that already ends in ``USDT`` is kept as
    is. Surrounding whitespace is stripped and the result is upper-cased, so
    ``'btc'`` yields ``'BTCUSDT'`` instead of the invalid pair ``'btcUSDT'``
    (pair names such as ``'BTCUSDT'`` are upper-case).
    """
    symbol = symbol.strip().upper()
    return symbol if symbol.endswith('USDT') else f"{symbol}USDT"
def data_retrieval(func):
    """Turn a per-symbol fetcher into a batch loader that writes to a database.

    The decorated ``func`` takes one normalized symbol (e.g. ``'BTCUSDT'``)
    plus optional extra arguments and returns a DataFrame or ``None``. The
    returned wrapper has the signature ``wrapper(symbols, db_url, *args, **kwargs)``:

    1. Cleans ``symbols`` (accepts a single string or an iterable; strips and
       upper-cases each entry), drops the stablecoins USDT/USDC/BUSD, converts
       each to a ``<BASE>USDT`` pair via ``normalize_symbol`` and removes
       duplicates while keeping the original order.
    2. Calls ``func(symbol, *args, **kwargs)`` per symbol, pausing one second
       between calls. A failure for one symbol is printed and skipped.
    3. Concatenates the non-empty results and writes them to the table
       ``<name>_data``, where ``<name>`` is ``func.__name__`` with every
       ``'data'`` substring and every underscore removed (e.g.
       ``trade_stats_data`` -> ``tradestats_data``). An existing table is
       replaced (``if_exists='replace'``).

    Returns the concatenated DataFrame, or ``None`` when nothing was retrieved
    or the database step failed (the error is printed, not raised).
    """
    @wraps(func)
    def wrapper(symbols, db_url, *args, **kwargs):
        # Iterating a bare string would yield single characters.
        if isinstance(symbols, str):
            symbols = [symbols]
        # Normalize case before filtering so 'usdt' is excluded just like 'USDT'.
        cleaned = [str(s).strip().upper() for s in symbols]
        cleaned = [s for s in cleaned if s not in ('USDT', 'USDC', 'BUSD')]
        # dict.fromkeys de-duplicates while preserving order, so 'BTC' and
        # 'BTCUSDT' do not fetch and store the same pair twice.
        symbols = list(dict.fromkeys(normalize_symbol(s) for s in cleaned))

        engine = None
        try:
            engine = create_engine(db_url)
            base.metadata.create_all(engine)
            all_df = []

            for i, symbol in enumerate(symbols):
                if i:
                    # Throttle between API calls whether or not the previous call
                    # succeeded; hitting the API again right after an error (e.g. a
                    # rate-limit response) is exactly when the pause matters.
                    time.sleep(1)
                try:
                    df = func(symbol, *args, **kwargs)
                    if df is not None and len(df) > 0:
                        all_df.append(df)
                except Exception as e:
                    # Best-effort batch: one bad symbol must not abort the others.
                    print(f"Error processing {symbol}: {e}")

            if all_df:
                final_df = pd.concat(all_df, ignore_index=True)
                # Kept as-is: existing table names depend on this exact derivation.
                table_name = func.__name__.replace('data', '').replace('_', '')
                final_df.to_sql(f'{table_name}_data', engine, if_exists='replace', index=False)
                print(f"Successfully loaded {len(final_df)} records to {table_name}_data")
                return final_df
            else:
                print('No data was retrieved')
                return None

        except Exception as e:
            print(f"Error in data retrieval: {e}")
            return None
        finally:
            # A new engine is created per call; release its pooled connections.
            if engine is not None:
                engine.dispose()
    return wrapper

In [25]:
@data_retrieval
def price_data(symbol, limit=1000):
    """Fetch daily klines for one symbol and derive price/volume features.

    Parameters
    ----------
    symbol : str
        Trading pair, e.g. ``'BTCUSDT'`` (``data_retrieval`` normalizes it).
    limit : int, default 1000
        Number of daily candles requested from the klines endpoint.

    Returns
    -------
    pandas.DataFrame or None
        One row per day: OHLCV, quote volume, trade count, daily return (%),
        7- and 30-day moving averages of the close, daily high-low range and
        intraday range as % of the open. Rows lacking a full 30-day window are
        dropped. ``None`` if the API returns no candles.

    Raises
    ------
    requests.HTTPError
        On a non-2xx response; ``data_retrieval`` catches and prints it.
    """
    base_url = 'https://api.binance.us/api/v3/klines'
    params = {
        'symbol': symbol,
        'interval': '1d',
        'limit': limit
    }
    # Without a timeout a stalled connection would block the whole batch.
    r = requests.get(base_url, params=params, timeout=30)
    # Fail loudly on HTTP errors instead of parsing an error payload as candles.
    r.raise_for_status()
    data = r.json()

    if not data:
        print(f"No data was received for {symbol}")
        return None
    df = pd.DataFrame(data, columns=['timestamp', 'open', 'high', 'low', 'close', 'volume',
                                     'close_time', 'quote_volume', 'trades', 'taker_buy_base',
                                     'taker_buy_quote', 'ignore'])

    df['date'] = pd.to_datetime(df['timestamp'], unit='ms')
    df['symbol'] = symbol
    # quote_volume arrives as a string like the OHLCV fields; convert it too so it
    # is stored as a number rather than text.
    numeric_cols = ['open', 'high', 'low', 'close', 'volume', 'quote_volume']
    df[numeric_cols] = df[numeric_cols].astype(float)
    df['number_of_trades'] = df['trades'].astype(int)
    df['daily_return'] = df['close'].pct_change() * 100
    df['MA7'] = df['close'].rolling(window=7).mean()
    df['MA30'] = df['close'].rolling(window=30).mean()
    df['daily_range'] = df['high'] - df['low']
    df['volatility'] = ((df['high'] - df['low']) / df['open']) * 100
    # NOTE(review): the newest candle is presumably the still-forming current day;
    # confirm whether it should be excluded from the stored history.

    columns = ['symbol', 'date', 'open', 'high', 'low', 'close', 'volume',
               'quote_volume', 'number_of_trades', 'daily_return',
               'MA7', 'MA30', 'daily_range', 'volatility']

    return df[columns].dropna()

In [26]:
@data_retrieval
def trade_stats_data(symbol):
    """Fetch the 24-hour ticker statistics for one symbol as a single row.

    Parameters
    ----------
    symbol : str
        Trading pair, e.g. ``'BTCUSDT'``.

    Returns
    -------
    pandas.DataFrame or None
        One row with the ticker's close time, price change (absolute and %),
        weighted average, previous close, last trade price/quantity and best
        bid/ask price and quantity, all as floats. ``None`` if the response is
        empty.

    Raises
    ------
    requests.HTTPError
        On a non-2xx response. Without this check an error payload surfaces as
        an opaque ``KeyError: 'priceChange'``.
    """
    base_url = 'https://api.binance.us/api/v3/ticker/24hr'
    r = requests.get(base_url, params={'symbol': symbol}, timeout=30)
    r.raise_for_status()
    data = r.json()

    if not data:
        print(f"No data was received for {symbol}")
        return None

    df = pd.DataFrame([{
        'symbol': symbol,
        # A missing closeTime falls back to the epoch (1970-01-01).
        'timestamp': pd.to_datetime(int(data.get('closeTime', 0)), unit='ms'),
        'price_change': float(data['priceChange']),
        'price_change_percent': float(data['priceChangePercent']),
        'weighted_avg_price': float(data['weightedAvgPrice']),
        'prev_close_price': float(data['prevClosePrice']),
        'last_price': float(data['lastPrice']),
        'last_qty': float(data['lastQty']),
        'bid_price': float(data['bidPrice']),
        'bid_qty': float(data['bidQty']),
        'ask_price': float(data['askPrice']),
        'ask_qty': float(data['askQty'])
    }])

    return df.dropna()

In [20]:
@data_retrieval
def order_book_data(symbol, limit=1000):
    """Snapshot the order book for one symbol as a single summary row.

    Parameters
    ----------
    symbol : str
        Trading pair, e.g. ``'BTCUSDT'``.
    limit : int, default 1000
        Number of price levels requested per side. ``bid_volume`` and
        ``ask_volume`` sum the quantities over the levels returned.

    Returns
    -------
    pandas.DataFrame or None
        One row with the snapshot time (naive UTC), the book's ``lastUpdateId``,
        the price and quantity at index 0 of each side, and the total bid/ask
        quantity over the returned depth. ``None`` if the response is empty or
        either side of the book has no levels.

    Raises
    ------
    requests.HTTPError
        On a non-2xx response.
    """
    base_url = 'https://api.binance.us/api/v3/depth'
    r = requests.get(base_url, params={'symbol': symbol, 'limit': limit}, timeout=30)
    r.raise_for_status()
    data = r.json()

    if not data:
        print(f"No data was received for {symbol}")
        return None

    bids = data.get('bids') or []
    asks = data.get('asks') or []
    # The [0] lookups below would raise IndexError on an empty side of the book.
    if not bids or not asks:
        print(f"Empty order book for {symbol}")
        return None

    # Naive UTC, matching the other tables whose timestamps are derived from
    # epoch milliseconds; pd.Timestamp.now() would record host-local time.
    snapshot_time = pd.Timestamp.now(tz='UTC').tz_localize(None)

    # Each level is a [price, quantity] pair; index 0 is treated as the best level.
    df = pd.DataFrame([{
        'symbol': symbol,
        'timestamp': snapshot_time,
        'last_update_id': data['lastUpdateId'],
        'bid_price': float(bids[0][0]),
        'bid_quantity': float(bids[0][1]),
        'ask_price': float(asks[0][0]),
        'ask_quantity': float(asks[0][1]),
        'bid_volume': sum(float(bid[1]) for bid in bids),
        'ask_volume': sum(float(ask[1]) for ask in asks)
    }])

    return df

In [39]:
@data_retrieval
def recent_trades_data(symbol, limit=1000):
    """Fetch the most recent public trades for one symbol.

    Parameters
    ----------
    symbol : str
        Trading pair, e.g. ``'BTCUSDT'``.
    limit : int, default 1000
        Number of trades requested from the trades endpoint.

    Returns
    -------
    pandas.DataFrame or None
        One row per trade: trade id, price, quantity, quote quantity, trade time
        (from epoch milliseconds) and the ``isBuyerMaker`` flag. ``None`` if the
        response is empty.

    Raises
    ------
    requests.HTTPError
        On a non-2xx response.
    """
    base_url = 'https://api.binance.us/api/v3/trades'
    r = requests.get(base_url, params={'symbol': symbol, 'limit': limit}, timeout=30)
    r.raise_for_status()
    data = r.json()

    if not data:
        print(f"No data was received for {symbol}")
        return None

    trades_list = [{
        'symbol': symbol,
        'trade_id': trade['id'],
        'price': float(trade['price']),
        'quantity': float(trade['qty']),
        'quote_quantity': float(trade['quoteQty']),
        'timestamp': pd.to_datetime(trade['time'], unit='ms'),
        'is_buyer_maker': trade['isBuyerMaker']
    } for trade in data]

    return pd.DataFrame(trades_list)

In [35]:
@data_retrieval
def technical_indicators_data(symbol, limit=1000):
    """Compute daily technical indicators for one symbol from its klines.

    Parameters
    ----------
    symbol : str
        Trading pair, e.g. ``'BTCUSDT'``.
    limit : int, default 1000
        Number of daily candles requested. This was previously ignored in
        favour of a hard-coded 1000.

    Returns
    -------
    pandas.DataFrame or None
        One row per day with:

        - ``rsi_14``: RSI built from a simple 14-day rolling mean of gains and
          losses (Cutler's variant, not Wilder's smoothing).
        - ``macd_line``, ``signal_line``, ``macd_histogram``: EMA12 - EMA26, a
          9-period EMA of that line, and their difference.
        - ``bb_upper``, ``bb_middle``, ``bb_lower``: a 20-day SMA of the close
          +/- 2 rolling standard deviations (Bollinger bands).
        - ``stoch_k``, ``stoch_d``: 14-day stochastic %K and its 3-day mean %D.

        Warm-up rows where any indicator is still undefined are dropped.
        ``None`` if the API returns no candles.

    Raises
    ------
    requests.HTTPError
        On a non-2xx response.
    """
    base_url = 'https://api.binance.us/api/v3/klines'
    params = {
        'symbol': symbol,
        'interval': '1d',
        'limit': limit
    }
    r = requests.get(base_url, params=params, timeout=30)
    r.raise_for_status()
    data = r.json()

    if not data:
        print(f"No data was received for {symbol}")
        return None

    df = pd.DataFrame(data, columns=['timestamp', 'open', 'high', 'low', 'close', 'volume',
                                     'close_time', 'quote_volume', 'trades', 'taker_buy_base',
                                     'taker_buy_quote', 'ignore'])

    df['date'] = pd.to_datetime(df['timestamp'], unit='ms')
    df['symbol'] = symbol
    numeric_cols = ['open', 'high', 'low', 'close']
    df[numeric_cols] = df[numeric_cols].astype(float)

    # RSI: simple rolling means of gains/losses. When loss is 0, rs is inf and
    # the RSI is 100; when gain and loss are both 0 it is NaN and the row is dropped.
    delta = df['close'].diff()
    gain = (delta.where(delta > 0, 0)).rolling(window=14).mean()
    loss = (-delta.where(delta < 0, 0)).rolling(window=14).mean()
    rs = gain / loss
    df['rsi_14'] = 100 - (100 / (1 + rs))
    exp1 = df['close'].ewm(span=12, adjust=False).mean()
    exp2 = df['close'].ewm(span=26, adjust=False).mean()
    df['macd_line'] = exp1 - exp2
    df['signal_line'] = df['macd_line'].ewm(span=9, adjust=False).mean()
    df['macd_histogram'] = df['macd_line'] - df['signal_line']
    df['bb_middle'] = df['close'].rolling(window=20).mean()
    bb_std = df['close'].rolling(window=20).std()
    df['bb_upper'] = df['bb_middle'] + (bb_std * 2)
    df['bb_lower'] = df['bb_middle'] - (bb_std * 2)
    low_14 = df['low'].rolling(window=14).min()
    high_14 = df['high'].rolling(window=14).max()
    df['stoch_k'] = ((df['close'] - low_14) / (high_14 - low_14)) * 100
    df['stoch_d'] = df['stoch_k'].rolling(window=3).mean()

    columns = [
        'symbol', 'date',
        'rsi_14', 'macd_line', 'signal_line', 'macd_histogram',
        'bb_upper', 'bb_middle', 'bb_lower',
        'stoch_k', 'stoch_d'
    ]

    return df[columns].dropna()

In [54]:
# Base assets to load; normalize_symbol later turns each into a <BASE>USDT pair.
symbols = 'BTC ETH BNB SOL XRP ADA DOGE LINK DOT AVAX'.split()
# Database connection string, read from the Colab secret store so it is never
# hard-coded in the notebook.
db_url = userdata.get('DB_URL')

In [57]:
# Run every loader over the same symbols and database. Each loader prints its own
# status line and return values are discarded, as in a cell that ends with ';'.
for load in (price_data, trade_stats_data, technical_indicators_data,
             recent_trades_data, order_book_data):
    load(symbols, db_url)

Successfully loaded 9710 records to price_data
Successfully loaded 10 records to tradestats_data
Successfully loaded 9810 records to technicalindicators_data
Successfully loaded 10000 records to recenttrades_data
Successfully loaded 10 records to orderbook_data
