In [2]:
import pandas as pd
import http.client
import enum
import json
import random
import numpy as np
import math
import statistics
import matplotlib.pyplot as plt

In [3]:
Connection = http.client.HTTPSConnection("api.kucoin.com")
Connection.request("GET", "/api/v1/symbols")
AllSymbols = []
for symbol in json.loads(Connection.getresponse().read())['data']:
    AllSymbols.append(symbol['symbol'])

In [4]:
class TimeType:
    def __init__(self, val : str, uSize: int):
        self.val = val
        self.uSize = uSize


class TimeFrame(enum.Enum):
    min1 = TimeType('1min', 1 * 60)
    min3 = TimeType('3min', 3 * 60)
    min5 = TimeType('5min', 5 * 60)
    min15 = TimeType('15min', 15 * 60)
    min30 = TimeType('30min', 30 * 60)
   
    hour1 = TimeType('1hour', 1 * 3600)
    hour2 = TimeType('2hour', 2 * 3600)
    hour4 = TimeType('4hour', 4 * 3600)
    hour6 = TimeType('6hour', 6 * 3600)
    hour8 = TimeType('8hour', 8 * 3600)
    hour12 = TimeType('12hour', 12 * 3600)

    day1 = TimeType('1day', 24 * 3600)

    week1 =  TimeType('1week', 7 * 24 * 3600)


In [5]:
def GetData(type: TimeFrame, symbol: str, fromDate: str, toDate: str = '', pages: int = -1):
    conn = http.client.HTTPSConnection("api.kucoin.com")

    df = []
    totalData = []
        
    startAt: int = int(pd.Timestamp(fromDate).timestamp())
    endAt: int = 0

    if toDate == '':
        conn.request("GET", "/api/v1/timestamp")
        sTime = conn.getresponse()
        endAt = int(json.loads(sTime.read())['data'] / 1000)
    else:
        endAt = int(pd.Timestamp(toDate).timestamp())

    #TODO: Check for last page data  
    if pages == -1:
        duration = endAt - startAt
        pages = np.ceil(duration / (1500 * type.value.uSize))

    #TODO: Check for netwrok error 
    #TODO: Check for empty data
    for page in range(0, pages):
        conn.request("GET", f"/api/v1/market/candles?type={type.value.val}&symbol={symbol}&startAt={startAt}&endAt={endAt}")
        res = conn.getresponse()
        data = res.read()
        print(data)
        totalData.extend(json.loads(data)['data'])
        endAt -= type.value.uSize * 1500
        
    return totalData


In [6]:
def toDataFrame(data: list) -> pd.DataFrame:
    df = pd.DataFrame(data)
    df = df.rename(columns={0: 'Time', 1: 'Open', 2: 'Close', 3: 'High', 4: 'Low', 5: 'Volume', 6: 'Transaction'})
    df[['Open', 'Close', 'High', 'Low']] = df[['Open', 'Close', 'High', 'Low']].astype(float)
    df[['Volume', 'Transaction']] = df[['Volume', 'Transaction']].astype(float)

    # df['Time'] = pd.to_datetime(df['Time'], unit='s')
    # df.set_index('Time', inplace = True)

    return df

In [7]:
def RandomizedSamples(num : int, candleSize: int):
    dfs = []
    for i in range(0,num):
        timeType = TimeFrame[random.choice(TimeFrame._member_names_)]
        #TODO:Randomize starting date with respect to starting date of chosen symbol
        df = GetData(timeType, random.choice(AllSymbols),'2020.1.1', pages= int(np.ceil(candleSize/ 1500)))
        dfs.append(toDataFrame(df))
    return dfs

In [23]:
#Settings

df= toDataFrame(data= GetData(type= TimeFrame.min5, symbol= 'BTC-USDT', fromDate='2020.1.1', pages=1))
Indicators = True

In [24]:
# Indicators

if Indicators == True:
    # Simple Moving Average(SMA):
    class Indicator_SMA():
        def main(self, df: pd.DataFrame, period: int) -> pd.DataFrame:
            SMA = []

            for i in range(0, len(df)-period):
                SMA.append((df['Close'][i:i+period]).mean())
            for n in range(0, period):
                SMA.append(0)

            df[f'SMA_{period}'] = SMA
            return df

        def eval(self, df: pd.DataFrame, period: int):
            df = self.main(df, period)
            plt.figure(figsize=(15, 5), dpi=300)
            plt.plot(df.index, df.SMA_12, label="SMA")
            plt.plot(df.index, df.Close, label="Price")
            plt.xlim(df.index[200], 0)
            plt.ylim(bottom=42000, top=44000)
            plt.grid()
            plt.legend()
            plt.show()

    # Exponential Moving Average(EMA):

    class Indicator_EMA():
        def main(self, df: pd.DataFrame, period: int):
            EMA = []

            k = 2/(period+1)

            for i in range(0, period):
                EMA.append(0)

            EMA.append((df['Close'][(len(df)-period):len(df)]).mean())

            for i in range(period, len(df)-1):
                EMA.append(
                    ((df['Close'][(len(df)-1)-i])*k) + ((EMA[-1])*(1-k)))

            EMA.reverse()
            df[f'EMA_{period}'] = EMA
            return df

        def eval(self, df: pd.DataFrame):
            pass

    # Moving Average Convergence/Divergence(MACD):

    class Indicator_MACD():
        def main(self, df: pd.DataFrame, macdPeriod1: int, macdPeriod2: int, signalPeriod: int):
            df[f'MACD_{macdPeriod1},{macdPeriod2}'] = Indicator_EMA.main(self, df, macdPeriod1)[
                f'EMA_{macdPeriod1}'] - Indicator_EMA.main(self, df, macdPeriod2)[f'EMA_{macdPeriod2}']

            signal = []
            k = 2/(signalPeriod+1)

            for i in range(0, signalPeriod):
                signal.append(0)

            signal.append((df[f'MACD_{macdPeriod1},{macdPeriod2}'][(
                len(df)-signalPeriod):len(df)]).mean())

            for i in range(signalPeriod, len(df)-1):
                signal.append(
                    ((df[f'MACD_{macdPeriod1},{macdPeriod2}'][(len(df)-1)-i])*k) + ((signal[-1])*(1-k)))

            signal.reverse()
            df[f'MACDsignal_{signalPeriod}'] = signal
            return df

        def eval(self, df: pd.DataFrame):
            pass

    # RSI Indecator

    class Indicator_RSI():
        def main(self, df: pd.DataFrame, period: int):
            RSI = []
            for i in range(0, period):
                RSI.append(0)

            sumGain = 0
            sumLoss = 0
            for i in range(0, period):
                if df['Close'][(len(df)-2)-i] > df["Close"][(len(df)-1)-i]:
                    sumGain += (df['Close'][(len(df)-2)-i] -
                                df["Close"][(len(df)-1)-i])
                elif df['Close'][(len(df)-2)-i] == df["Close"][(len(df)-1)-i]:
                    pass
                else:
                    sumLoss += (df["Close"][(len(df)-1)-i] -
                                df['Close'][(len(df)-2)-i])
            averageGain = sumGain/period
            averageLoss = sumLoss/period
            RS = averageGain/averageLoss
            RSI.append(100-(100/(1+RS)))

            for i in range(period, len(df)-1):
                if df['Close'][(len(df)-2)-i] > df["Close"][(len(df)-1)-i]:
                    averageGain = ((averageGain*(period-1)) + (
                        df['Close'][(len(df)-2)-i] - df["Close"][(len(df)-1)-i]))/period
                elif df['Close'][(len(df)-2)-i] == df["Close"][(len(df)-1)-i]:
                    pass
                else:
                    averageLoss = ((averageLoss*(period-1)) + (
                        df["Close"][(len(df)-1)-i] - df['Close'][(len(df)-2)-i]))/period
                RS = averageGain/averageLoss
                RSI.append(100-(100/(1+RS)))

            RSI.reverse()
            df[f'RSI_{period}'] = RSI
            return df

        def eval(self, df: pd.DataFrame):
            pass

    # On-Balance Volume(OBV):
    class Indicator_OBV():
        def main(self, df: pd.DataFrame):
            OBV = []
            OBV.append(df['Volume'][len(df)-1])
            for i in range(1, len(df)):
                if df['Close'][(len(df)-1)-i] > df["Close"][(len(df))-i]:
                    OBV.append(OBV[-1]+(df['Volume'][len(df)-i]))
                elif df['Close'][(len(df)-1)-i] == df["Close"][(len(df))-i]:
                    OBV.append(OBV[-1])
                else:
                    OBV.append(OBV[-1]-(df['Volume'][len(df)-i]))
            OBV.reverse()
            df['OBV'] = OBV
        
        def eval(self, df: pd.DataFrame):
            pass
    
    # Accumulation/Distribution(A_D):
    class Indicator_A_D():
        def main(self, df: pd.DataFrame):
            A_D = []

            MFM = (((df['Close'][len(df)-1] - df['Low'][len(df)-1])-(df['High'][len(df)-1] -
                df['Close'][len(df)-1]))/(df['High'][len(df)-1] - df['Low'][len(df)-1]))
            MFV = MFM * df['Volume'][len(df)-1]
            A_D.append(MFV)

            for i in range(1, len(df)):
                MFM = (((df['Close'][(len(df)-1)-i] - df['Low'][(len(df)-1)-i])-(df['High'][(len(df)-1)-i] -
                    df['Close'][(len(df)-1)-i]))/(df['High'][(len(df)-1)-i] - df['Low'][(len(df)-1)-i]))
                MFV = MFM * df['Volume'][(len(df)-1)-i]
                A_D.append(A_D[-1]+MFV)

            A_D.reverse()
            df["A/D"] = A_D
            return df
        def eval(self, df: pd.DataFrame):
            pass

    # Average Directional Index(ADX):

    class Indicator_ADX():
        def main(self, df: pd.DataFrame, period: int):
            TR = [0]
            DM_positive = [0]
            DM_negative = [0]

            for i in range(1, len(df)):
                TR.append(df['High'][(len(df)-1)-i] -
                        df['Low'][(len(df)-1)-i])
                if (df['High'][(len(df)-1)-i] - df['High'][(len(df))-i]) > (df['Low'][(len(df))-i] - df['Low'][(len(df)-1)-i]):
                    DM_positive.append(
                        df['High'][(len(df)-1)-i] - df['High'][(len(df))-i])
                    DM_negative.append(0)
                elif (df['High'][(len(df)-1)-i] - df['High'][(len(df))-i]) < (df['Low'][(len(df))-i] - df['Low'][(len(df)-1)-i]):
                    DM_positive.append(0)
                    DM_negative.append(
                        df['Low'][(len(df))-i] - df['Low'][(len(df)-1)-i])
                else:
                    DM_positive.append(0)
                    DM_negative.append(0)

            STR = []
            SDM_positive = []
            SDM_negative = []
            for i in range(0, period):
                STR.append(0)
                SDM_positive.append(0)
                SDM_negative.append(0)

            STR.append(sum(TR[1:(period+1)]))
            SDM_positive.append(sum(DM_positive[1:(period+1)]))
            SDM_negative.append(sum(DM_negative[1:(period+1)]))

            for i in range(period+1, len(df)):
                SDM_positive.append(
                    SDM_positive[-1]-(SDM_positive[-1]/period) + DM_positive[i])
                SDM_negative.append(
                    SDM_negative[-1]-(SDM_negative[-1]/period) + DM_negative[i])
                STR.append(STR[-1]-(STR[-1]/period) + TR[i])

            DI_positive = []
            DI_negative = []
            DX = []

            for i in range(0, period):
                DI_positive.append(0)
                DI_negative.append(0)
                DX.append(0)

            DI_positive = DI_positive + \
                [(SDM_positive[i]/STR[i])*100 for i in range(period, len(df))]
            DI_negative = DI_negative + \
                [(SDM_negative[i]/STR[i])*100 for i in range(period, len(df))]

            DX = DX + [abs((DI_positive[i]-DI_negative[i])/(DI_positive[i] +
                        DI_negative[i]))*100 for i in range(period, len(df))]

            ADX = []
            for i in range(0, (period*2)-1):
                ADX.append(50)

            ADX.append(statistics.mean(DX[period:period*2]))
            for i in range(period*2, len(df)):
                ADX.append(((ADX[-1]*(period-1))+DX[i])/period)

            ADX.reverse()
            DI_positive.reverse()
            DI_negative.reverse()

            df[f'ADX_{period}'] = ADX
            df[f'+DI_{period}'] = DI_positive
            df[f'-DI_{period}'] = DI_negative

        def eval(self, df: pd.DataFrame):
            pass

    # Aroon Indicator/Oscillator:
    class Indicator_Aroon():
        def main(self, df: pd.DataFrame, period: int):
            aroonUp = []
            aroonDown = []
            aroonOsc = []

            for i in range(0, period):
                aroonUp.append(50)
                aroonDown.append(50)

            for i in range(period, len(df)):
                High_max = max(df['High'][len(df)-i:(len(df)-i)+period])
                index_of_high_max = df[df['High'] == High_max].index.values
                aroonUp.append(((period-(index_of_high_max[0]-(len(df)-i)))/period)*100)

                Low_min = min(df['Low'][len(df)-i:(len(df)-i)+period])
                index_of_low_min = df[df['Low'] == Low_min].index.values
                aroonDown.append(
                    ((period-(index_of_low_min[0]-(len(df)-i)))/period)*100)

            aroonOsc = [aroonUp[i]-aroonDown[i] for i in range(0, len(df))]

            aroonUp.reverse()
            aroonDown.reverse()
            aroonOsc.reverse()

            df[f'AroonUP_{period}'] = aroonUp
            df[f'AroonDown_{period}'] = aroonDown
            df[f'AroonOsc_{period}'] = aroonOsc
        
        def eval(self, df: pd.DataFrame):
            pass

    # Stochastic Oscillator(%k and %D)
    
    class Indicator_Stoc():
        def main(self, df: pd.DataFrame, period: int, subPeriod = 3):
            Stoc = []
            
            for i in range(0, period):
                Stoc.append(50)
            for i in range(period, len(df)):
                High_max = max(df['High'][len(df)-i:(len(df)-i)+period])
                Low_min = min(df['Low'][len(df)-i:(len(df)-i)+period])
                Stoc.append(
                    ((df['Close'][len(df)-i]-Low_min)/(High_max-Low_min))*100)

            Stoc.reverse()

            Stoc_D = []

            for i in range(0, len(df)-subPeriod):
                Stoc_D.append(statistics.mean(Stoc[i:i+subPeriod]))
            for n in range(0, subPeriod):
                Stoc_D.append(0)

            df[f'Stochastic%K_{period}'] = Stoc
            df[f'Stochastic%D_{subPeriod}'] = Stoc_D
            
            return df

        def eval(self, df:pd.DataFrame):
            pass

    # Contribution:
    class Indicator_Contribution():
        def main(self, df: pd.DataFrame):
            contribution_List = []

            for i in range(0, len(df)):
                contribution_List.append(df['Transaction'][i])

            contribution_List = [math.log(contribution_List[x], 10)
                                for x in range(0, len(contribution_List))]
            contribution_List = [(df['Volume'][i])/(contribution_List[i])
                                for i in range(0, len(contribution_List))]
            df['Contribution'] = contribution_List

            return df
        
        def eval(self, df: pd.DataFrame):
            pass




In [25]:
# def BackTest_Ind(dataFrames, indicator: Indicator):
#     for df in dataFrames:
#         print(indicator.eval(df))

# def BackTest_Strat():
#     pass
