In [None]:
# import nest_asyncio
# nest_asyncio.apply()

import math
import uuid
import asyncio
import websockets
import json
import pandas as pd
import numpy as np
from datetime import timedelta, datetime
from scipy.signal import argrelextrema

import config
from phemex import *

In [None]:
# ---------------------------------------------------------------------------
# Module-level state shared by the coroutines in the cells below.
# Most of these names are rebound or mutated through `global` statements.
# ---------------------------------------------------------------------------
all_clients = []
# cid -> {'client_socket', 'interval', 'strategy', 'reset'}; filled by register_new_client.
clients = {}
# Single payload dict reused for every message sent to the local clients.
sendMessage = {}
# Raw trade buffer; getTradePrices replaces this empty frame with a plain list of trade rows.
trades = pd.DataFrame(columns=['timestamp','side','priceEp','volume'])
klines = []
# Candles accumulated by getOhlc (a 'color' column is added there).
ohlcvData = pd.DataFrame(columns=['open','high','low','close','volume','epoch'])
# Rebound by send_message to the frame returned by getCandlestickData.
candlestickData = []
# Aggregated order book rows maintained by process_orderbook_data.
orderbookData = pd.DataFrame(columns=['price', 'amount', 'side', 'epoch'])
# Ticker rows appended by process_mbtcData.
mbtcTickerData = pd.DataFrame(columns=['last', 'scale', 'symbol', 'timestamp','epoch'])
stoplossData = []
# Defaults copied into each newly registered client entry.
interval = "1min"
strategy = "short"
error_allowed = 5.0/100
# Timestamps for which getCandlestickData has already emitted a record.
seconds_processed = {}
# Records produced by getCandlestickData (append-only in the visible code).
second_candlesticks = []
candlesticks = []
moves = list()
# Last trade price seen by getCandlestickData (priceEp / 10000).
current_priceEp = None
# Aggregated trades returned by getTradePrices; rebound by send_message.
tradesPrices = []
# Reference volume used by getVolumeColor / getVolumeColorDf; 0 means "not seen yet".
prevVolume = 0
init = 0

difference = None
stoploss = 'None'
prevControl = None
control = None
# Exchange handle, created lazily through initExchange() in getStopLossIndicator.
exchange = None
# Stop price computed by the previous getStopLossIndicator call.
prevStoploss = 0
# Leverage -> fraction table. In the visible cells it is only mentioned in a
# commented-out expression inside getStopLossIndicator.
# NOTE(review): key 85 sits between 70 and 80 and breaks the otherwise regular
# 10-step sequence - possibly a typo; confirm the intended key.
leverageList = {
    0:0.040,
    10:0.035,
    20:0.030,
    30:0.025,
    40:0.020,
    50:0.015,
    60:0.010,
    70:0.005,
    85:0.004,
    80:0.003,
    90:0.001,
    100:0.001
}

In [None]:
def printText(msg,show=True):
    """Print ``msg`` unless the caller switched output off.

    Args:
        msg: Object handed to ``print``.
        show: Printing happens only when this value compares equal to ``True``.
    """
    if not (show == True):
        return
    print(msg)

In [None]:
def getTimeformat(interval):
    """Return the strftime pattern used to truncate timestamps for ``interval``.

    The interval label (for example ``'1S'`` or ``'1min'``) is searched for
    the markers below in order; the first marker found decides the pattern.
    Labels without any marker fall back to second resolution.

    Args:
        interval: Interval label; only substring membership is tested.

    Returns:
        A ``strftime`` format string.
    """
    patterns = (
        ('S', "%Y-%m-%d, %H:%M:%S"),
        ('min', "%Y-%m-%d, %H:%M"),
        ('H', "%Y-%m-%d, %H"),
        ('D', "%Y-%m-%d"),
        ('M', "%Y-%m"),
        ('Y', "%Y"),
    )
    for marker, timeformat in patterns:
        if marker in interval:
            return timeformat
    return "%Y-%m-%d, %H:%M:%S"

In [None]:
async def getVolumeColorDf(df):
    """Add a ``color`` column that marks how each row's volume moved.

    Rows are visited in order. A row with a positive volume is coloured
    against the previous positive volume stored in the module-level
    ``prevVolume``: ``'red'`` when the volume grew, ``'green'`` otherwise,
    and grey (``'#DCDCDC'``) when no previous volume is known yet. Rows whose
    volume is not positive are grey and do not change ``prevVolume``.

    Previously ``prevVolume`` was written only while it was 0, so every row
    ended up compared with the very first volume instead of its predecessor.

    Args:
        df: Frame with a ``volume`` column; it is modified in place.

    Returns:
        The same frame, with the ``color`` column filled in.
    """
    global prevVolume
    for index, row in df.iterrows():
        volume = row['volume']
        if not volume > 0:
            df.at[index,'color'] = '#DCDCDC' # grey
            continue
        if prevVolume == 0:
            color = '#DCDCDC' # grey: nothing to compare with yet
        elif prevVolume < volume:
            color = 'red'
        else:
            color = 'green'
        df.at[index,'color'] = color
        # Advance the reference so the next row is compared with this one.
        prevVolume = volume
    return df

In [None]:
async def getVolumeColor(trades):
    """Return the colour label for one trade row based on its volume.

    The volume is compared with the previous positive volume kept in the
    module-level ``prevVolume``: ``"red"`` when it grew, ``"green"``
    otherwise, ``"grey"`` when there is no previous volume or the volume is
    not positive. ``prevVolume`` is advanced on every positive volume;
    before, it was only ever set once, so all later trades were compared
    with the first one.

    Args:
        trades: Mapping or row exposing a ``'volume'`` entry convertible to int.

    Returns:
        ``"grey"``, ``"red"`` or ``"green"``.
    """
    global prevVolume
    volume = int(trades['volume'])
    if volume <= 0:
        return "grey"
    if prevVolume == 0:
        color = "grey"
    elif int(prevVolume) < volume:
        color = "red"
    else:
        color = "green"
    # Remember this volume so the next trade is compared with it.
    prevVolume = volume
    return color

In [None]:
def percentage(part, whole, leverage=1):
    """Return the leveraged percentage change from ``part`` to ``whole``.

    Args:
        part: Reference value (the denominator).
        whole: Value compared against ``part``.
        leverage: Multiplier applied to the change; must not be 0.

    Returns:
        ``1`` when either value is 0, otherwise the change in percent,
        scaled by ``leverage`` and rounded to two decimals.
    """
    if part == 0 or whole == 0:
        return 1
    relative = 100 * float(whole) / float(part)
    scaled = (relative - 100) / (100 / leverage) * 100
    return round(scaled, 2)

In [None]:
async def getTradePrices(message,interval):
    """Buffer the trades of one feed message and aggregate the buffer.

    Every trade row of the message is added to the module-level ``trades``
    list (previously only the first row of each message was kept, silently
    dropping the rest of a batch). A message flagged ``"type": "snapshot"``
    replaces the buffer, so a snapshot received after a reconnect is not
    counted on top of trades that are already buffered.

    Args:
        message: JSON text with a ``trades`` list of
            ``[timestamp_ns, side, priceEp, volume]`` rows.
        interval: Frequency string handed to ``pd.Grouper``.

    Returns:
        A frame indexed by the shifted timestamp with ``side`` (list of
        sides), ``priceEp`` (max) and ``volume`` (sum) per bucket, with empty
        buckets removed. ``None`` when anything fails; the error is printed.
    """
    global trades
    try:
        payload = json.loads(message)
        incoming = payload['trades']
        # The exchange marks the initial dump of a subscription as a snapshot
        # (per its public websocket docs); later messages are increments.
        is_snapshot = payload.get('type') == 'snapshot'
        if isinstance(trades, list) and len(trades) > 0 and not is_snapshot:
            trades.extend(incoming)
        else:
            trades = list(incoming)
        df = pd.DataFrame(trades,columns=['timestamp','side','priceEp','volume'])
        tick_datetime_object = pd.to_datetime(df['timestamp'], unit='ns',utc=True)
        # Fixed +1h shift kept from the original implementation.
        df['timestamp'] = tick_datetime_object + timedelta(hours=1)
        df = df.set_index('timestamp')
        tradesAgg = df.groupby([pd.Grouper(level='timestamp', freq=interval)]).agg({'side':lambda x: list(x),'priceEp':'max','volume':'sum'})

        return tradesAgg.dropna()
    except Exception as e:
        print('Foutmelding in functie getTradePrices: {}'.format(e))
        return None

In [None]:
async def getOhlc(trades,interval='1min'):
    """Resample aggregated trades into candles and merge them into ``ohlcvData``.

    Fixes compared with the previous version:
    * the dead ``pd.DataFrame(trades, ...)`` assignment is gone;
    * the result of ``dropna()`` is actually used;
    * candles already stored for the same timestamp are replaced instead of
      being appended again on every call (the input is re-resampled from the
      full trade history each time, so duplicates accumulated);
    * the epoch is derived from time deltas, so it does not depend on the
      resolution pandas picks when parsing the formatted strings;
    * on failure the current ``ohlcvData`` is returned instead of ``None``.

    Args:
        trades: Frame with a datetime index and ``priceEp`` / ``volume`` columns.
        interval: Resampling frequency; also selects the timestamp format.

    Returns:
        The updated module-level ``ohlcvData`` frame.
    """
    global ohlcvData
    try:
        ohlc = trades['priceEp'].resample(interval).ohlc()
        tick_datetime_object = pd.to_datetime(ohlc.index, unit='ns',utc=True)
        timeformat = getTimeformat(interval)
        timenow = pd.to_datetime(pd.to_datetime(tick_datetime_object).strftime(timeformat))
        # Seconds since 1970-01-01, as floats.
        ohlc['epoch'] = (timenow - pd.Timestamp(0)) / pd.Timedelta(seconds=1)
        ohlc['volume'] = trades['volume'].resample(interval).mean()
        ohlc = await getVolumeColorDf(ohlc)
        ohlc = ohlc[ohlc['open'] > 0].dropna()
        if ohlcvData is None or len(ohlcvData) == 0:
            merged = ohlc
        else:
            merged = pd.concat([ohlcvData,ohlc])
        # Keep only the most recent version of each candle.
        ohlcvData = merged[~merged.index.duplicated(keep='last')]
        return ohlcvData
    except Exception as e:
        print('Foutmelding in functie getOhlc: {}'.format(e))
        return ohlcvData

In [None]:
async def getCandlestickData(trades,interval):
    """Turn aggregated trades into per-timestamp records enriched with position data.

    The first entry returned by ``getPositionInfo()`` supplies side, entry
    price, amount and symbol. Each trade row with a positive volume updates
    the module-level ``current_priceEp`` and, unless its truncated timestamp
    was already handled (tracked in ``seconds_processed``), appends one
    record to ``second_candlesticks``.

    Args:
        trades: Frame indexed by timestamp with ``priceEp`` and ``volume`` columns.
        interval: Interval label used to pick the timestamp format.

    Returns:
        A frame built from every record collected so far, or ``None`` when an
        exception was caught (the error is printed).
    """
    try:
        global current_priceEp
        position = getPositionInfo()[0]
        leverage, side, entryPrice, amount, symbol = (
            position['leverage'],
            position['side'],
            position['entryPrice'],
            position['amount'],
            position['symbol'],
        )

        # Position side -> strategy label.
        strategy = 'long' if side == 'Buy' else ('short' if side == 'Sell' else 'not active')

        for timestamp, trade in trades.iterrows():
            if not trade['volume'] > 0:
                continue

            current_priceEp = trade['priceEp']/10000
            volume = trade['volume']
            difference = percentage(entryPrice,current_priceEp,config.LEVERAGE)

            # Truncate the trade time to the resolution of the interval.
            tick_datetime_object = pd.to_datetime(timestamp, unit='ns',utc=True)
            timeformat = getTimeformat(interval)
            formatted = pd.to_datetime(tick_datetime_object + timedelta(hours=0)).strftime(timeformat)
            timenow = pd.to_datetime(formatted)

            if timenow in seconds_processed:
                continue

            seconds_processed[timenow] = True
            color = await getVolumeColor(trade)
            second_candlesticks.append({
                'creation_date': timenow,
                'epoch': timenow.timestamp(),
                'strategy':strategy,
                'side':side,
                'volume': volume,
                'color': color,
                'price':current_priceEp,
                'entryPrice':entryPrice,
                'amount':amount,
                'difference':difference,
                'symbol':symbol
            })

        return pd.DataFrame(second_candlesticks)
    except Exception as e:
        print('Foutmelding in functie websocketServer.getCandlestickData: {}'.format(e))

In [None]:
async def getStopLossIndicator(mbtcPrice,currentPrice,lastCandles,strategy):
    """Compute the stop price for the open position and maintain exchange orders.

    Position data comes from the first entry of ``getPositionInfo()``. The
    stop price is derived from ``mbtcPrice`` with fixed offsets that depend
    on the position side and on how ``mbtcPrice`` relates to the entry and
    current price. When the wall-clock second is 29 or 59 and the stop price
    changed, untriggered orders are cancelled and a new stop order is placed.
    The position is closed with a market order when the leveraged profit is
    at least 2000 or when the leveraged gap between ``mbtcPrice`` and
    ``currentPrice`` is below 1.

    Changes compared with the previous version:
    * closing a position now trades the opposite side (sell for a ``'Buy'``
      position, buy for a ``'Sell'`` position), matching the side logic of
      the stop orders; before, a long position was "closed" with another buy;
    * the exchange handle is created on first use by any order helper; before,
      cancel / take-profit calls made while ``exchange`` was still ``None``
      failed inside their ``except`` and only printed an error;
    * unused locals were removed.

    NOTE(review): send_message in this file passes one-element pandas Series
    for ``mbtcPrice`` / ``currentPrice``. The ``<`` / ``>`` tests below need
    plain numbers, so such calls presumably end in the ``except`` branch -
    confirm, and review the thresholds before passing scalars.

    Args:
        mbtcPrice: Reference (ticker) price in scaled units.
        currentPrice: Last trade price in scaled units.
        lastCandles: Recent candles; accepted for compatibility, not used.
        strategy: Strategy label; accepted for compatibility, not used.

    Returns:
        ``{'stoploss': value}`` where value is ``None`` when the position side
        is ``'None'``; ``None`` when an exception was caught and printed.
    """
    try:
        global prevStoploss,exchange

        def ensureExchange():
            # Create the exchange handle the first time any helper needs it.
            global exchange
            if exchange is None:
                exchange = initExchange()
            return exchange

        def removeStoploss(symbol):
            # Best effort: cancel untriggered (stop) orders for the symbol.
            try:
                ensureExchange().cancel_all_orders(symbol,params={'untriggered':True})
            except Exception as e:
                print('Error {}'.format(e))

        def addStoploss(symbol,stoploss,side,leverage,amount):
            # Place a stop order on the side opposite to the position.
            try:
                params = {'stopPxEp': stoploss}
                if side == 'Buy':
                    params['triggerType'] = 'ByLastPrice'
                    ensureExchange().create_order(symbol, 'Stop', 'Sell', amount, None, params)
                else:
                    params['triggerType'] = 'ByMarkPrice'
                    ensureExchange().create_order(symbol, 'Stop', 'Buy', amount, None, params)
            except Exception as e:
                print('Error {}'.format(e))

        def takeProfit(symbol,side,amount):
            # Close the position: sell a long, buy back a short, else do nothing.
            try:
                if side == 'Buy':
                    ensureExchange().create_market_sell_order(symbol,amount)
                elif side == 'Sell':
                    ensureExchange().create_market_buy_order(symbol,amount)
            except Exception as e:
                print('Error {}'.format(e))

        posInfo = getPositionInfo()[0]
        leverage = posInfo['leverage']
        side = posInfo['side']
        entryPrice = posInfo['entryPrice']
        amount = posInfo['amount']
        symbol = posInfo['symbol']
        stoploss = None

        difference = percentage(int(entryPrice*10000),int(currentPrice),leverage)
        if difference >= 2000:
            removeStoploss(symbol)
            takeProfit(symbol,side,amount)

        if side != 'None':
            is_long = side == 'Buy'
            # NOTE(review): entryPrice is multiplied by 10000 for the profit
            # calculation above but compared unscaled with mbtcPrice here -
            # confirm the units.
            if mbtcPrice < entryPrice:
                stoploss = mbtcPrice-50000 if is_long else mbtcPrice+180000
            elif (mbtcPrice+50000) > currentPrice:
                stoploss = mbtcPrice-150000 if is_long else mbtcPrice+380000
            else:
                stoploss = mbtcPrice-380000 if is_long else mbtcPrice+150000

            # Orders are only refreshed at two fixed seconds of each minute.
            seconds = datetime.now().second
            if seconds in (29,59):
                if prevStoploss != stoploss:
                    removeStoploss(symbol)
                    addStoploss(symbol,stoploss,side,leverage,amount)

            # NOTE(review): this closes the position whenever the leveraged gap
            # is below 1, which also covers every negative gap - confirm.
            mbtcDifference = percentage(int(mbtcPrice),int(currentPrice),leverage)
            if mbtcDifference < 1:
                removeStoploss(symbol)
                takeProfit(symbol,side,amount)
            prevStoploss = stoploss
        return {'stoploss':stoploss}
    except Exception as e:
        print('Error in getStopLossIndicator: {}'.format(e))

In [None]:
def process_orderbook_side(side, precision, side_name, epoch):
    """Bucket one side of an order book into price bins of width ``precision``.

    Args:
        side: Non-empty sequence of ``[price, amount]`` pairs.
        precision: Width of a price bin.
        side_name: Label stored in the ``side`` column.
        epoch: Value stored in the ``epoch`` column of every output row.

    Returns:
        A frame with one row per price bin holding the summed ``amount``, the
        first ``side`` label, the last ``price`` seen in the bin, and ``epoch``.
    """
    levels = pd.DataFrame(side, columns=['price', 'amount'], dtype=float)
    levels['side'] = side_name

    # Bin edges run from the floor of the lowest price to one bin past the
    # ceiling of the highest price; bins are left-closed.
    lowest = math.floor(min(levels.price) / precision) * precision
    highest = (math.ceil(max(levels.price) / precision) + 1) * precision
    edge_count = int((highest - lowest) / precision) + 1
    level_bounds = [float(lowest + precision * step) for step in range(edge_count)]
    levels['bin'] = pd.cut(levels.price, bins=level_bounds, right=False, precision=10)

    aggregations = {
        'amount': ("amount", "sum"),
        'side': ('side', "first"),
        'price': ("price", "last"),
    }
    binned = levels.groupby('bin').agg(**aggregations).reset_index()
    binned['epoch'] = epoch
    return binned

async def process_orderbook_data(message, interval):
    """Fold one order book message into the module-level ``orderbookData``.

    Bids and asks are binned with ``process_orderbook_side``, stamped with the
    message time truncated to ``interval``, appended to ``orderbookData`` and
    re-aggregated to one row per epoch. Only the newest 200 rows are kept.

    The previous cap used ``orderbookData.loc[-200:]``, a label slice that
    keeps every row of a freshly reset integer index, so the frame grew
    without bound. The message is now parsed once and empty frames are no
    longer concatenated.

    Args:
        message: JSON text with ``book`` (``bids`` / ``asks`` lists) and a
            nanosecond ``timestamp``.
        interval: Interval label used to pick the timestamp format.

    Returns:
        None. Errors are printed, not raised.
    """
    try:
        global orderbookData
        payload = json.loads(message)
        orderbook = payload['book']
        precision = 10000

        tick_datetime_object = pd.to_datetime(payload['timestamp'], unit='ns', utc=True)
        time_format = getTimeformat(interval)
        # Fixed +1h shift kept from the original implementation.
        time_now = pd.to_datetime(pd.to_datetime(tick_datetime_object + timedelta(hours=1)).strftime(time_format))
        epoch = time_now.timestamp()

        sides = []
        if orderbook.get('bids'):
            sides.append(process_orderbook_side(orderbook['bids'], precision, "bid", epoch))
        if orderbook.get('asks'):
            sides.append(process_orderbook_side(orderbook['asks'], precision, "ask", epoch))
        if not sides:
            # Nothing new to merge.
            return

        result = pd.concat(sides).dropna()
        frames = [result]
        if orderbookData is not None and len(orderbookData) > 0:
            frames.insert(0, orderbookData)
        orderbookData = pd.concat(frames)\
            .groupby('epoch')\
            .agg(amount=("amount", "sum"), side=('side', 'first'), price=("price", "last"))\
            .reset_index()

        if len(orderbookData) > 200:
            # Positional cut: keep the 200 most recent epochs.
            orderbookData = orderbookData.tail(200).reset_index(drop=True)
    except Exception as e:
        print(f'Error in function process_orderbook_data: {e}')

In [None]:
async def process_mbtcData(message,interval):
    """Append the ticker carried by ``message`` to the module-level ``mbtcTickerData``.

    Args:
        message: JSON text whose ``tick`` object provides ``last``, ``scale``,
            ``symbol`` and a nanosecond ``timestamp``.
        interval: Interval label used to pick the timestamp format.

    Returns:
        None; the new row (with an added ``epoch`` column) is concatenated
        onto ``mbtcTickerData``.
    """
    global mbtcTickerData
    tick = json.loads(message)['tick']
    tick_frame = pd.DataFrame(tick,columns=['last', 'scale', 'symbol', 'timestamp'],index=[0])

    # Truncate the tick time (shifted by the fixed +1h) to the interval resolution.
    tick_time = pd.to_datetime(tick_frame['timestamp'].iloc[0], unit='ns',utc=True)
    timeformat = getTimeformat(interval)
    formatted = pd.to_datetime(tick_time + timedelta(hours=1)).strftime(timeformat)
    bucket = pd.to_datetime(formatted)

    tick_frame['epoch'] = bucket.timestamp()
    mbtcTickerData = pd.concat([mbtcTickerData,tick_frame])

In [None]:
async def send_message(message):
    """Process one exchange message and broadcast the derived data to all clients.

    Trade messages refresh ``tradesPrices``. Once more than 300 aggregated
    rows exist, each registered client gets candles, candlestick records,
    order book, ticker and stop-loss data for its own interval.

    Changes compared with the previous version:
    * a client reset now clears the global ``ohlcvData``; before, it assigned
      an unused local named ``ohlcv``, so candles of the old interval survived;
    * ``None`` results from the helpers (their error path) no longer replace
      the globals, which used to break the following ``len()`` / ``.to_json()``.

    NOTE(review): the stop-loss helper still receives one-element Series
    (``.tail(1)[...]``), exactly as before. Passing plain numbers would
    activate its order logic and should be reviewed first.

    Args:
        message: Raw JSON text received from the exchange, or ``None``.

    Returns:
        None. Errors are printed, not raised.
    """
    try:
        global ohlcvData, tradesPrices,mbtcTickerData,clients,sendMessage,orderbookData,candlestickData
        if message is not None:
            if 'trades' in message:
                aggregated = await getTradePrices(message,'1S')
                if aggregated is not None:
                    tradesPrices = aggregated
            if len(tradesPrices) > 300:
                # Iterate over a copy: clients may register or leave meanwhile.
                clientsCopy = clients.copy()
                for cid in clientsCopy.keys():
                    client = clientsCopy[cid]
                    try:
                        candles_ohlc = await getOhlc(tradesPrices,client['interval'])
                        if candles_ohlc is not None:
                            ohlcvData = candles_ohlc
                        candles = await getCandlestickData(tradesPrices,client['interval'])
                        if candles is not None:
                            candlestickData = candles
                        if 'book' in message:
                            await process_orderbook_data(message,client['interval'])
                        if 'tick' in message:
                            await process_mbtcData(message,client['interval'])

                        if sendMessage is not None and candles is not None and ohlcvData is not None:
                            if len(mbtcTickerData) > 0 and len(ohlcvData) > 3:
                                sendMessage['data'] = candlestickData.to_json(orient='records')
                                sendMessage['interval'] = client['interval']
                                sendMessage['ohlc'] = ohlcvData.to_json(orient='records')
                                sendMessage['mbtc'] = mbtcTickerData.to_json(orient='records')
                                sendMessage['orderbook'] = orderbookData.to_json(orient="records")
                                strategy = candlestickData.tail(1)['strategy']
                                lastMbtc = mbtcTickerData.tail(1)['last']
                                lastTrade = tradesPrices.tail(1)['priceEp']
                                lastOhlcv = ohlcvData.tail(3)
                                stoploss = await getStopLossIndicator(lastMbtc,lastTrade,lastOhlcv,strategy)
                                sendMessage['stoploss'] = json.dumps(stoploss,default=str)
                                await client['client_socket'].send(json.dumps(sendMessage,default=str))

                        if client['reset'] == True:
                            print('Client Reset')
                            tradesPrices = []
                            ohlcvData = pd.DataFrame(columns=['open','high','low','close','volume','epoch'])
                            orderbookData = pd.DataFrame(columns=['price', 'amount', 'side', 'epoch'])
                            mbtcTickerData = pd.DataFrame(columns=['last', 'scale', 'symbol', 'epoch'])
                            # The entry dict is shared with `clients`, so this clears the real flag.
                            client['reset'] = False
                    except websockets.ConnectionClosed:
                        continue
    except Exception as e:
        print('Error in send_message: {}'.format(e))

In [None]:
async def receive_messages(cid):
    """Apply settings sent by client ``cid`` until its connection closes.

    Each JSON message may carry ``interval`` (stored and flagged with
    ``reset = True``) and/or ``strategy``. When the socket closes, normally
    or abnormally, the client is removed from ``clients`` and the coroutine
    returns.

    Before, only a normal close was handled and the loop kept running after
    the client had been removed, so the next iteration failed with
    ``KeyError``; an abnormal close left a stale entry behind. Messages that
    are not valid JSON objects are now skipped instead of ending the handler.

    Args:
        cid: Key of the client in the module-level ``clients`` dict.
    """
    global clients
    while True:
        client = clients.get(cid)
        if client is None:
            # The client was removed elsewhere; nothing left to read from.
            return
        try:
            raw = await client['client_socket'].recv()
        except websockets.ConnectionClosed:
            # Base class of both the normal and the abnormal close exception.
            if clients.pop(cid, None) is not None:
                print('Remove Client {}'.format(cid))
            return

        try:
            response = json.loads(raw)
        except (TypeError, ValueError) as e:
            print('receive_messages - Invalid client message: {}'.format(e))
            continue
        print('receive_messages - Response client',response)
        if not isinstance(response, dict):
            continue

        if 'interval' in response:
            print('Interval found!',response['interval'])
            client['interval'] = response['interval']
            client['reset'] = True
        if 'strategy' in response:
            client['strategy'] = response['strategy']

In [None]:
async def register_new_client(client_socket,path=None):
    """Websocket handler: register a connection and serve it until it closes.

    A fresh UUID is assigned, announced to the client through ``sendMessage``
    (key ``clientId``) and stored in ``clients`` together with the default
    interval and strategy. The coroutine then blocks in ``receive_messages``.

    Changes compared with the previous version:
    * the "already registered" check compares sockets with sockets (it used
      to compare the socket with the client dicts, which never matched);
    * ``path`` is optional, so the handler works whether the server calls it
      with ``(connection, path)`` or with the connection only;
    * the client entry is always removed when the handler ends, so a dropped
      connection no longer leaves a stale entry behind.

    Args:
        client_socket: Connection object providing ``send`` / ``recv``.
        path: Request path passed by older server versions; unused.
    """
    global interval,strategy,clients,sendMessage
    cid = None
    try:
        print('clients',clients)
        known_sockets = [entry['client_socket'] for entry in clients.copy().values()]
        if any(known is client_socket for known in known_sockets):
            return

        # Draw ids until one is free.
        new_id = str(uuid.uuid4())
        while new_id in clients:
            new_id = str(uuid.uuid4())

        print("registered new client with id {}".format(new_id))
        sendMessage['clientId'] = new_id
        await client_socket.send(json.dumps(sendMessage,default=str))
        clients[new_id] = {'client_socket':client_socket,'interval':interval,'strategy': strategy,'reset':False}
        cid = new_id
        await receive_messages(cid)
    except websockets.ConnectionClosed:
        pass
    finally:
        # Leaving the handler means the connection is finished.
        if cid is not None:
            clients.pop(cid, None)

In [None]:
# Connect to the Phemex websocket server
async def connectPhemexWS():
    """Stream exchange messages into ``send_message``, reconnecting on close.

    For every connection yielded by ``websockets.connect`` the ticker, trade
    and order book subscriptions are sent (in that order, all with id 0) and
    each received message is forwarded to ``send_message``. A closed
    connection moves on to the next connection attempt.
    """
    async def subscribe(socket, method, params):
        # One subscription request in the exchange's JSON format.
        await socket.send(json.dumps({
            "id": 0,
            "method": method,
            "params": params
        }))

    async for websocket in websockets.connect('wss://vapi.phemex.com/ws'):
        try:
            print('Phemex Websocket Server is connected!')
            await subscribe(websocket, "tick.subscribe", [config.MBTC_SYMBOL])
            await subscribe(websocket, "trade.subscribe", [config.TRADE_SYMBOL])
            await subscribe(websocket, "orderbook.subscribe", [config.TRADE_SYMBOL,True])

            while True:
                incoming = await websocket.recv()
                await send_message(incoming)
        except websockets.ConnectionClosed:
            continue

In [None]:
# start python localhost websocket server
async def start_server():
    """Start the local websocket server, then run the exchange feed.

    The server listens on ``localhost:3000`` with ``register_new_client`` as
    handler. Any exception raised while starting it or while running
    ``connectPhemexWS`` is printed instead of propagated.
    """
    print('Server started')
    try:
        local_server = websockets.serve(register_new_client,"localhost",3000)
        await local_server
        await connectPhemexWS()
    except Exception as failure:
        print('Error {}'.format(failure))

In [19]:
if __name__ == '__main__':
    # Entry point: run the local server and the exchange feed on one event loop.
    # start_server() awaits connectPhemexWS(), whose reconnect loop has no exit
    # in the visible code, so run_until_complete() is not expected to return
    # during normal operation. run_forever() is only reached if start_server()
    # finishes (for example after its except branch printed an error); it then
    # keeps the already started local server alive.
    # NOTE(review): inside a Jupyter kernel an event loop is normally already
    # running, in which case run_until_complete() raises RuntimeError -
    # presumably why the nest_asyncio lines in the import cell exist as
    # comments. Confirm how this cell is meant to be executed.
    event_loop = asyncio.get_event_loop()
    event_loop.run_until_complete(start_server())
    event_loop.run_forever()