In [1]:
from alpha_vantage.timeseries import TimeSeries
import pandas as p
import csv
import os

In [2]:
class ScriptData(dict):

    def __getitem__(self,key:str) -> p.DataFrame:
        if not self.__contains__(key):
            print("Invalid key or the data is not present for the key : "+key)
            return
        return super().__getitem__(key)

    def __setitem__(self,key:str,val:p.DataFrame) -> None:
        return super().__setitem__(key,val)

    def __contains__(self,obj) -> bool:
        return super().__contains__(obj)

    def fetch_intraday_data(self,script) -> None:
        try:
            self.__setitem__(script,TimeSeries(key='XKUTMFECRW800M5A',output_format='pandas').get_intraday(script, interval = '60min'))
        except:
            print("Something went wrong while fetching the data")
            return

    def convert_intraday_data(self,script:str) -> None:

        try:
            df=p.DataFrame(self[script][0])
        except:
            print("Invalid key or data not fetched")
            return

        #correcting the order of data
        df.to_csv("file.csv")
        df=p.read_csv("file.csv")
        os.remove("file.csv")

        #changing column headings
        df.rename(columns={'date':'timestamp','1. open':'open','2. high':'high','3. low':'low','4. close':'close','5. volume':'volume'},inplace=True)

        #reversing the data
        df.to_csv("reverse.csv")
        f=open("reverse.csv",'r')
        read=csv.reader(f)
        f.readline()
        l=[]
        for i in read:
            l.append(i)
        l=l[::-1]
        f.close()
        f=open("reverse.csv",'w+')
        write=csv.writer(f)
        write.writerow(["timestamp","open","high","low","close","volume"])
        x=0
        for i in l:
            i[0]=x
            write.writerow(i)
            x+=1
        f.close()
        df=p.read_csv("reverse.csv")
        os.remove("reverse.csv")
        
        #changing data type
        ctype={'open':float,'high':float,'low':float,'close':float,'volume':int}
        df=df.astype(ctype)
        
        self.__setitem__(script,df)


In [3]:
sd=ScriptData()

In [4]:
sd.fetch_intraday_data('GOOGL')
sd.convert_intraday_data('GOOGL')
sd['GOOGL']

Unnamed: 0,timestamp,open,high,low,close,volume
0,2023-02-03 17:00:00,104.7800,105.01,104.70,104.9467,2113880
1,2023-02-03 18:00:00,104.7800,104.98,104.78,104.9100,45878
2,2023-02-03 19:00:00,104.9400,104.94,104.76,104.7600,9995
3,2023-02-03 20:00:00,104.7600,104.80,104.65,104.7400,12817
4,2023-02-06 05:00:00,104.4000,104.60,103.16,103.2400,22840
...,...,...,...,...,...,...
95,2023-02-13 16:00:00,94.2701,94.65,94.10,94.6300,6317935
96,2023-02-13 17:00:00,94.6200,94.70,94.49,94.5600,2027623
97,2023-02-13 18:00:00,94.5700,94.63,94.50,94.5100,52206
98,2023-02-13 19:00:00,94.5100,94.53,94.20,94.2700,36885


In [5]:
sd.fetch_intraday_data('AAPL')
sd.convert_intraday_data('AAPL')
sd['AAPL']

Unnamed: 0,timestamp,open,high,low,close,volume
0,2023-02-03 17:00:00,154.2650,154.5346,153.8756,154.2001,4422622
1,2023-02-03 18:00:00,154.1951,154.2650,153.7958,153.9655,155453
2,2023-02-03 19:00:00,153.9156,154.1153,153.8157,154.0454,49726
3,2023-02-03 20:00:00,154.0554,154.1552,154.0154,154.1552,39258
4,2023-02-06 05:00:00,153.2765,153.2765,152.6375,152.7274,118275
...,...,...,...,...,...,...
95,2023-02-13 16:00:00,153.0700,153.9800,152.7600,153.8400,10885106
96,2023-02-13 17:00:00,153.8400,153.8500,153.6600,153.7100,2873954
97,2023-02-13 18:00:00,153.7400,153.8900,153.7100,153.8000,205821
98,2023-02-13 19:00:00,153.8300,153.8400,153.6300,153.7100,24978


In [6]:
'GOOGL' in sd

True

In [7]:
'AAPL' in sd

True

In [8]:
'NVDA' in sd

False

In [9]:
#function for creating indicator    
def indicator1(df:p.DataFrame,n:int) -> p.DataFrame :

    #creating list of the required data
    ts=list(df["timestamp"])
    close=list(df["close"])

    #creating file to store data
    f=open("new_data.csv","a",newline="")
    write=csv.writer(f)
    write.writerow(["timestamp","indicator"]) #adding the header

    #calculating and adding data in the file
    i=0
    b=False
    while i<len(ts):
        sum=0
        for x in range(n+1):
            if i==len(close):
                b=True
                break
            sum+=close[i]
            i+=1
        if b:
            write.writerow([ts[i-1],sum/x])
            break
        write.writerow([ts[i-1],sum/(x+1)])
    f.close()

    #saving the data as required and deleting csv files
    df=p.read_csv("new_data.csv")
    os.remove("new_data.csv")
    df=df.astype({'indicator':float})

    return df


In [10]:
indicator1(sd['AAPL'],5)

Unnamed: 0,timestamp,indicator
0,2023-02-06 06:00:00,153.631833
1,2023-02-06 12:00:00,152.323
2,2023-02-06 18:00:00,151.6452
3,2023-02-07 08:00:00,151.597433
4,2023-02-07 14:00:00,152.474367
5,2023-02-07 20:00:00,153.9971
6,2023-02-08 10:00:00,153.502883
7,2023-02-08 16:00:00,151.980067
8,2023-02-09 06:00:00,152.555983
9,2023-02-09 12:00:00,153.212467


In [11]:
class Strategy:

    def __init__(self,script:str) -> None:
        self.script=script
        self.signal=p.DataFrame()
    
    def get_script_data(self) -> None:

        #creating script data file
        try:
            obj=ScriptData()
            obj.fetch_intraday_data(self.script)
            obj.convert_intraday_data(self.script)
            obj[self.script].to_csv("script_data.csv")
        except:
            print("cannot fetch the intraday data")
            return

        #creating int indicator file
        t=5  #timeperiod
        indicator1(obj[self.script],t).to_csv("indicator.csv")

        #creting a data.csv file to storing insections
        f=open("intersection.csv",'a',newline='')
        write=csv.writer(f)
        write.writerow(["timestamp","signal"])

        #finding and storing intersection
        ind=open("indicator.csv",'r')
        ind_read=csv.reader(ind)
        ind.readline()
        sdata=open("script_data.csv",'r')
        sdata_read=list(csv.reader(sdata))
        t+=1
        pos=t
        for i in ind_read:
            if sdata_read[pos][5].split('.')[0] == i[2].split('.')[0]:
                if float(sdata_read[pos][5])<float(sdata_read[pos+1][5]):
                    write.writerow([i[1],'BUY'])
                else:
                    write.writerow([i[1],'SELL'])
            else:
                write.writerow([i[1],'NO_SIGNAL'])
            pos+=t
            if pos>=len(sdata_read)-1:
                break
        f.close()
        ind.close()
        sdata.close()

        self.signal=p.read_csv("intersection.csv")  

        #deleting all the csv files
        os.remove("intersection.csv")
        os.remove("indicator.csv")
        os.remove("script_data.csv")

    def get_signal(self) -> p.DataFrame:

        #converting Dataframe to csv file
        self.signal.to_csv("signal.csv")
        f=open("signal.csv",'r')
        read=csv.reader(f)
        f.readline()  #skipping the header

        #creating file to store required data
        new_f=open("new_signal.csv",'a',newline='')
        write=csv.writer(new_f)
        write.writerow(["timespan","signals"]) #adding header to the file
        
        #storing required data
        for i in read:
            if i[2]=='NO_SIGNAL':
                continue
            else:
                write.writerow(i)
        f.close()
        new_f.close()

        #saving the data and deleting the csv files
        self.signal=p.read_csv("new_signal.csv")
        os.remove("new_signal.csv")
        os.remove("signal.csv")

        return self.signal

In [12]:
st=Strategy('NVDA')

In [13]:
st.get_script_data()

In [14]:
st.get_signal()

Unnamed: 0,timespan,signals
2,2023-02-06 18:00:00,BUY
13,2023-02-10 20:00:00,BUY
