In [1]:
import os
os.getcwd()

'/root/miniconda3/BINANCE'

In [None]:
import websocket
import threading
import json
from collections import deque
import numpy as np
import pandas as pd
from pprint import pprint
from time import sleep
from IPython.display import clear_output
from datetime import datetime
from tokens import token_list

# lower-cased copies of the entries in token_list; each one is substituted
# into BINANCE_URL to build that symbol's stream address
symbols = [s.lower() for s in token_list]

# only for testing
# symbols = ['ethusdt']

# symbol -> worker thread, filled in when the workers are started
sockets = {}

# ========= [MAIN CONFIG] =============
# default settings
INTERVAL = '1m'  # kline interval, appended to the stream name in BINANCE_URL
SHOW_FUTURES = True  # True: printed links use the futures URL template, False: the trade one
COLLECT_EVERY_MINUTE = True  # True: buffer only klines flagged as closed; False: buffer every update
MAX_KLINE_COUNT = 10  # per-symbol buffer length; analysis starts once the buffer holds this many entries
COEF_TRADE = 2 # multiplier applied to the mean buffered trade count before comparing the last bar to it
COEF_RANGE = 3 # multiplier applied to the mean non-zero (high - low) range of the buffered bars
PCT_THRESHOLD = 80 # % minimum change of the last trade count versus the scaled average
HIGH_THRESHOLD = 50 # % minimum upper shadow, as a share of the bar's high-low range
LOW_THRESHOLD = 50 # % minimum lower shadow, as a share of the bar's high-low range
# =====================================

# stream address template; '{}' is filled with the lower-cased symbol
BINANCE_URL = 'wss://stream.binance.com:9443/ws/{}@kline_'+INTERVAL
# link template printed with each signal; '{}' is filled with the symbol from the message
URL = 'https://binance.com/en/futures/{}' if SHOW_FUTURES else 'https://binance.com/en/trade/{}'


def pct_change(old_value, new_value):
    """Return the change from old_value to new_value as a percentage of old_value.

    Raises ZeroDivisionError when old_value is zero.
    """
    ratio = (new_value - old_value) / old_value
    return ratio * 100

def on_message(ws, message, kline_buffer):
    """Process one stream message and print a BUY/SELL line for a spike bar.

    The message is parsed as JSON. Messages without a 'k' key are ignored.
    For kline messages the bar's trade count ('n') and OHLC prices
    ('o', 'h', 'l', 'c') are appended to ``kline_buffer``; when
    COLLECT_EVERY_MINUTE is set, only bars flagged closed ('x') are kept.

    Once the buffer holds MAX_KLINE_COUNT entries, a signal is printed when
    all of the following hold for the newest bar:

    * its trade count exceeds COEF_TRADE times the buffered mean by at
      least PCT_THRESHOLD percent;
    * its high-low range is larger than COEF_RANGE times the mean non-zero
      range of the buffered bars;
    * its upper or lower shadow reaches HIGH_THRESHOLD / LOW_THRESHOLD
      percent of its range;
    * its high (SELL) or low (BUY) is beyond every earlier buffered bar.

    Args:
        ws: the WebSocket app that delivered the message (unused).
        message: raw JSON text of the event.
        kline_buffer: dict with 'trades' and 'ohlc' sequences; mutated in place.

    Returns:
        None. The only outputs are the buffer mutation and the printed line.
    """
    data = json.loads(message)

    # only kline events carry a 'k' payload
    if 'k' not in data:
        return

    kline_data = data['k']

    # in per-minute mode, updates of a still-open bar are skipped entirely
    if COLLECT_EVERY_MINUTE and not bool(kline_data['x']):
        return

    # parse everything first so a malformed field cannot leave the two
    # buffers with different lengths
    trade_count = int(kline_data['n'])
    bar = {'open': float(kline_data['o']),
           'high': float(kline_data['h']),
           'low': float(kline_data['l']),
           'close': float(kline_data['c'])}
    kline_buffer['trades'].append(trade_count)
    kline_buffer['ohlc'].append(bar)

    # wait until the buffer is full
    if len(kline_buffer['trades']) < MAX_KLINE_COUNT:
        return

    # scaled average trade count; the newest bar is part of the average
    average = int(COEF_TRADE * float(np.mean(np.array(kline_buffer['trades']))))
    if not (average and trade_count):  # check for non zero
        return

    pct = round(pct_change(average, trade_count))  # calc percent change
    if pct < PCT_THRESHOLD:
        return

    # convert buffer to dataframe
    df = pd.DataFrame(kline_buffer['ohlc'])

    # ohlc data from the last bar
    last_row = df.iloc[-1]
    bar_open = last_row['open']
    bar_high = last_row['high']
    bar_low = last_row['low']
    bar_close = last_row['close']

    # average non-zero 'range' * coef
    all_ranges = df['high'] - df['low']
    mean_range = all_ranges[all_ranges != 0].mean() * COEF_RANGE

    # keep only bars with a long 'range'; written as `not (a > b)` so that a
    # NaN mean (every buffered range is zero) is rejected as well
    bar_range = bar_high - bar_low
    if not (bar_range > mean_range):
        return

    body_top = max(bar_close, bar_open)
    body_bottom = min(bar_close, bar_open)
    high_shadow = round((abs(bar_high - body_top) / bar_range) * 100)
    low_shadow = round((abs(bar_low - body_bottom) / bar_range) * 100)

    # filter by high&low shadow size
    if high_shadow < HIGH_THRESHOLD and low_shadow < LOW_THRESHOLD:
        return

    # equal shadows give no direction
    if high_shadow == low_shadow:
        return

    # compare the last bar against all earlier buffered bars
    earlier = df[:-1]
    if high_shadow > low_shadow:
        signal = 'SELL'
        is_needed_bar = (bar_high > earlier['high']).all()
    else:
        signal = 'BUY'
        is_needed_bar = (bar_low < earlier['low']).all()

    if not is_needed_bar:
        return

    # the timestamp is formatted only when a line is actually printed;
    # timezone-aware conversion replaces the deprecated utcfromtimestamp()
    from datetime import timezone
    event_time = datetime.fromtimestamp(data['E'] / 1000, tz=timezone.utc).strftime('%H:%M:%S')
    symbol = data['s']
    print(f"{event_time} ┇ signal: {signal} ┇ {URL.format(symbol)}")



def on_error(ws, error):
    """Print the error passed to this callback, prefixed with 'Error: '."""
    report = f"Error: {error}"
    print(report)

def on_close(ws, close_status_code, close_msg):
    """Print the close status code; the close message itself is not shown."""
    notice = f"Closed connection: {close_status_code}"
    print(notice)

# worker body: one WebSocket connection for a given symbol
def socket_thread(symbol):
    """Open the kline stream for ``symbol`` and block in run_forever().

    Each call owns its own pair of bounded buffers ('ohlc' and 'trades',
    both capped at MAX_KLINE_COUNT) which are handed to on_message together
    with every incoming message.
    """
    buffers = {name: deque(maxlen=MAX_KLINE_COUNT) for name in ('ohlc', 'trades')}

    def forward(ws, message):
        # bind this symbol's buffers to the shared message handler
        on_message(ws, message, buffers)

    app = websocket.WebSocketApp(
        BINANCE_URL.format(symbol),
        on_message=forward,
        on_error=on_error,
        on_close=on_close,
    )
    app.run_forever()


# start one worker thread per symbol, each running its own WebSocket connection
for symbol in symbols:
    sockets[symbol] = threading.Thread(target=socket_thread, args=(symbol,))
    sockets[symbol].start()

# keep main thread alive until the user interrupts
try:
    print(f'Размер token_list: {len(token_list)}')
    print('Запускаю воркеры ... Ждем результаты ........\n')
    # sleep instead of spinning: a bare `while True: pass` keeps a CPU core
    # busy and competes with the worker threads for the interpreter lock
    while True:
        sleep(1)
except KeyboardInterrupt:
    print("Stopping WebSocket connections...waiting...")
    # NOTE(review): join() waits for each worker to return on its own;
    # nothing here closes the sockets, so this may block until the
    # connections drop — confirm the intended shutdown path.
    for symbol in symbols:
        sockets[symbol].join()
    print("WebSocket connections stopped.")

[{'symb': 'XEMUSDT',
  'pct': 700,
  'trades': 8,
  'avg': 1,
  'url': 'https://binance.com/en/futures/XEMUSDT'},
 {'symb': 'LQTYUSDT',
  'pct': 205,
  'trades': 58,
  'avg': 19,
  'url': 'https://binance.com/en/futures/LQTYUSDT'},
 {'symb': 'DYDXUSDT',
  'pct': 178,
  'trades': 802,
  'avg': 288,
  'url': 'https://binance.com/en/futures/DYDXUSDT'}]

