In [None]:
#This code is used to build strategy based on historical data collected

In [40]:
import requests
import time
import pandas as pd
import json
import sqlalchemy as db
import matplotlib.pyplot as plt
import pylab
column = db.Column
integer = db.Integer
floats = db.Float
string = db.String


In [46]:
#create db and connect 
engine = db.create_engine('sqlite:///forex.db')
conn = engine.connect()
metadata = db.MetaData(engine)
#print("Opened database successfully")

#this class is used to build strategy for automated live investment decisions of real-time data using historical data
#Instances of this class can be created for different currency pairs 
class CurrencyClass:
    #this method defines empty lists to store data
    def __init__(self):
        #data is the 24 hours historical data collected for currency pairs
        self.data = None
        #saves the list of aggregated 6 minutes data
        self.aggData = []
        #saves the list of consecutive positive returns used to build strategy
        self.positive_entries_collection = []
    
    #this method is used to read 24 hours currency historical data from json file and returns a list of objects
    def readData(self, pair):
        with open(pair +'.json', 'r') as f:
              self.data = json.load(f)
              return self.data
        
    #this method creates a table and stores the data read from json file in the table
    def storeData(self, pair):
        #create table
        self.table_name = pair
        print(self.table_name)
        self.table = db.Table(
          self.table_name, metadata, 
          column('ID', integer, primary_key = True, nullable=False),  
          column('RATE_AS_BID', floats, nullable=False), 
          column('RATE_AS_ASK', floats, nullable=False), 
          column('CURRENCY_PAIR', string, nullable=False), 
          column('TIMESTAMP_FROM_SOURCE', integer, nullable=False), 
          column('TIMESTAMP_AT_INSERTION', integer, nullable=False), 
        )
        metadata.create_all(engine)
#         print("Tables created successfully")
#         print(self.table_name)
#         #insert data into table
#         for item in self.data:
#            ins = db.insert(self.table).values(
#            RATE_AS_BID=item['rateAsBid'],
#            RATE_AS_ASK=item['rateAsAsk'],
#            CURRENCY_PAIR=item['currencyPair'],
#            TIMESTAMP_FROM_SOURCE=item['timestampFromSource'],
#            TIMESTAMP_AT_INSERTION=item['timestampAtInsertion']
#            )
#            conn.execute(ins)
#         print("Inserted successfully") 

    #this method selects the 24 hours grannular prices(RATE_AS_BID) from the database. returns a list of prices
    def selectPrices6mins(self):
        s = db.select(self.table.c.RATE_AS_BID)
        result = conn.execute(s).all()
        price_list = [item[0] for item in result]
        return price_list
    
    #this method takes agg_list as a parameter and computes average. Returns avg as floating number
    def calculate_avg(self, agg_list):
        values = agg_list
        sum = 0
        no_of_items = len(values)
        for item in values:
            sum += float(item)
        avg = sum/no_of_items
        return avg
    
    #this method takes agg_list as a parameter and computes average. Returns avg as floating number
    def calculateStdDev(self, mean, list):
        sum = 0
        no_of_items = len(list)
        for item in list:
            #print (type(item))
            dev_mean = float(item) - float(mean)
            dev_mean_squared = dev_mean**2
            sum += dev_mean_squared
            stdev = (sum/no_of_items)**0.5
        return stdev
    
    #this method takes agg_list as a parameter and computes historical returns. Returns 'returns' as floating number
    def calculateHistReturn(self, agg_list):
        self.size_list = len(agg_list)
        previous_item = agg_list[self.size_list - 2]
        latest_item = agg_list[self.size_list - 1]
        if self.size_list > 1:
          hist_return = 100*((latest_item / previous_item) - 1)
          return hist_return
        else:
          return None
    
    #this method takes agg_list as a parameter and calls calculate_avg method to compute moving_average and moving_stdev for 5 data points. Returns moving_average as floating number
    def calculate_moving_average(self, agg_list):
         #average of last 5 data points 
         if self.size_list > 4:
             list_of_last_5_items = agg_list[-5:]
             self.moving_average = self.calculate_avg(list_of_last_5_items)
             self.moving_stdev = self.calculateStdDev(self.moving_average, list_of_last_5_items)
             #print("mov_avg: ", self.moving_average)
             #print("mov_stdev: ", self.moving_stdev)
             return self.moving_average
         else:
             return None
    
    #This method computes bollinger bands. Returns a dictionary of upper and lower bollinger bands as floating numbers  
    def calculate_bollinger_bands(self):
        #use moving average and stdev to calculate the upper and lower bands
        #buy signals when the price hits the lower band. oversold stock #concept of higher supply, lower price
        #sell signals when the price hits the upper band. overbought stock # concept higher demand, higher price
        if self.size_list > 4:
            bollinger_upper = self.moving_average + 1.25*self.moving_stdev
            bollinger_lower = self.moving_average - 1.25*self.moving_stdev

            return {'bollinger_upper': bollinger_upper, 'bollinger_lower': bollinger_lower }
        else:
            return {'bollinger_upper': None, 'bollinger_lower': None }
    
    #This method takes no_of consecutive returns to test and list of items (agg_list) computed every 6 minutes as parameters. 
    #Returns a dictionary of no of consecutive returns used as int,
    #frequency of consecutive returns used in agg list as int,
    #frequency of times average_returns exceeded upper bollinger bands as int, 
    #and % of times times average_returns exceeded upper bollinger bands as int 
    def getStrategy(self, no_of_returns, agg_List):
          #declare an empty list of positive entries collection
          positive_entries_collection = []
          #declare an empty list to store each positive entry while looping through the agggregate list
          positive_entries = []
          #declare counter to count how many times avg_price exceeded upper bollinger
          count = 0
          
          #loop through agg_List from the 5th item since bollinger bands is calculated from 5th item
          for item in agg_List[4:]: 
            #check if returns is greater than zero
            if item['returns'] != None and item['returns'] > 0:
                #apprend to positive entries list
                positive_entries.append(item) 
            else: #reset the list to empty
              positive_entries = []
              continue 
            #if the length of positive entries list equals the no of n returns passed as a parameter
            if len(positive_entries) == no_of_returns:
              #append list positive entries object  to positive entries collection
              positive_entries_collection.append(positive_entries)
              #if correlating average price in positive entry is greater than correlating upper bollinger band
              if positive_entries[len(positive_entries)-1]['bollinger_upper'] != None and positive_entries[len(positive_entries)-1]['avg_price'] > positive_entries[len(positive_entries)-1]['bollinger_upper']:
                 count +=1
              #reset the positive entries list to empty list
              positive_entries = []
          #compute percentage of times avg_price exceeded upper bollinger bands
          perct = round((count / (len(agg_List)-4))*100)
          return {'Number of returns': no_of_returns,
                  'Frequency of consecutive returns': len(positive_entries_collection), 
                  'Frequency of times avg returns exceeded upper bollinger': count,
                  '% of times avg returns exceeded upper bollinger': perct }
                
    #This method aggregates 24 hours historical data and performs computations on each 6 minute worth of data    
    def compute6minsData(self):
        #select all 24 hours historical prices
        data = self.selectPrices6mins()
        #print(len(data))
        #create chunks of 6 minutes (360 entries) of data from 24 hours data
        chunks = [data[x:x+360] for x in range(0, len(data), 360)]
        #print(len(chunks) )
        avg_price_list = []
        returns_list = []
        #loop through each chunk(6 minutes/ 360 entries) worth of data
        for i, chunk in enumerate(chunks):
            #calculate average price 
            avg_price = self.calculate_avg(chunk)
            avg_price_list.append(avg_price)
            #calculate returns
            returns = self.calculateHistReturn(avg_price_list)
            returns_list.append(returns)
            #print(returns)
            #calculate stdev
            stdev = self.calculateStdDev(avg_price, chunk) 
            #calculate moving_average
            moving_average = self.calculate_moving_average(chunk)
            #print("moving_average:", moving_average)
            #calculate bollinger bands
            result = self.calculate_bollinger_bands()
            #print(result)
            #create object of items computed every 6 minutes
            items = {'agg': i+1,
                    'avg_price': avg_price,
                    'std_dev': stdev,  
                    'returns': returns,
                    'moving average' : moving_average,
                    'bollinger_upper': result['bollinger_upper'],
                    'bollinger_lower': result['bollinger_lower'] 
            }
            #print(items)
            #compile objects created every 6 minutes in aggregate data list
            self.aggData.append(items) 
        #use list of compiled agg_data to build strategy
        strategy = self.getStrategy(4, self.aggData)
        print(strategy)

#create an instance of my class
compute = CurrencyClass()

#pass each pair as required
pair =  'EUR_GBP'

# pass pair as a parameter and call method to read data
#compute.readData(pair)

#pass pair as a parameter and call method to store data
compute.storeData(pair)

#build strategy by calling compute6minsData method
build_strategy = compute.compute6minsData()
print(build_strategy)


   
   

EUR_GBP
{'Number of returns': 4, 'Frequency of consecutive returns': 12, 'Frequency of times avg returns exceeded upper bollinger': 3, '% of times avg returns exceeded upper bollinger': 1}
None


In [None]:
['EUR_USD', 'USD_NGN', 'GBP_USD', 'CNY_USD', 'EUR_GBP',
 'CAD_USD', 'EUR_CNY', 'CNY_CAD', 'EGP_USD', 'GBP_INR','INR_USD']

In [None]:
['EUR_USD', 'USD_NGN', 'GBP_USD', 'CNY_USD', 'EUR_GBP', 'CAD_USD', 'EUR_CNY', 'CNY_CAD', 'EGP_USD', 'GBP_INR','INR_USD']