In [1]:
import yfinance as yf
import numpy as np
import pandas as pd
import statsmodels
from statsmodels.tsa.stattools import coint
import matplotlib.pyplot as plt
import dask
import datetime as dt
import dask.dataframe as dd

In [2]:
# Connect a Dask distributed client configured for 8 workers with one thread
# each and a 6GB memory limit per worker.
# NOTE(review): `progress` is imported but not used in the cells shown here.
from dask.distributed import Client, progress
client = Client(threads_per_worker=1, n_workers=8, memory_limit='6GB')
# Bare expression so the notebook renders the client/cluster summary.
client

0,1
Client  Scheduler: tcp://127.0.0.1:59442  Dashboard: http://127.0.0.1:8787/status,Cluster  Workers: 8  Cores: 8  Memory: 48.00 GB


In [3]:
# Load the company listing; later cells read its 'Sector', 'MarketCap' and
# 'Symbol' columns.
datalist = pd.read_csv('companylist.csv')

In [4]:
# Keep only the rows whose sector is exactly 'Technology'.
datalisttech = datalist.loc[datalist['Sector'] == 'Technology']
# Of those, keep rows whose MarketCap string contains a "B"; missing values are
# treated as non-matching (na=False).
# NOTE(review): presumably "B" marks a market cap quoted in billions - confirm
# against the CSV's formatting.
datalisttechbillion = datalisttech.loc[datalisttech['MarketCap'].str.contains("B", na=False)]

In [5]:
# Plain Python list of the filtered ticker symbols; used as a notebook-level
# global by the download and pairing cells below.
tickerlist = datalisttechbillion['Symbol'].to_list()

In [23]:
# Download 2-minute bars covering the last 60 days for every ticker in one call.
# group_by='ticker' lets later cells select one ticker's frame with
# pandas_data[ticker]. Tickers whose download fails are reported in the output
# rather than raising.
pandas_data = yf.download(tickers = tickerlist, period = '60d', interval = '2m', group_by='ticker', prepost=False)

[*********************100%***********************]  217 of 217 completed

2 Failed downloads:
- API: 2m data not available for startTime=1593178200 and endTime=1598852113. The requested range must be within the last 60 days.
- DADA: 2m data not available for startTime=1591363800 and endTime=1598852115. The requested range must be within the last 60 days.


In [7]:
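# One-off export (kept disabled): writes each ticker's downloaded frame to
# stockdata/<ticker>.csv, the files that single_coint_test reads back.
#for stock in tickerlist:
#    pandas_data[stock].to_csv('stockdata/'+stock+'.csv')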

In [19]:
#tryit = pandas_data['AAPL'].dropna()
#tryit2 = pandas_data['GOOG'].dropna()
#tryit2.columns = ['Open2', 'High2', 'low2', 'close2', 'Adjclose2', 'Volume2']
#tryit.head()

In [18]:
#starttime1 = dt.datetime.now()
#y = tryit.merge(tryit2, how='inner', left_index=True, right_index=True)
#endtime1 = dt.datetime.now()
#finaltime1 = endtime1-starttime1

#starttime = dt.datetime.now()
#s = coint(y['Open'], y['Open2'], autolag='aic')
#endtime = dt.datetime.now()
#finaltime = endtime-starttime
#overalltime = endtime-starttime1

In [10]:
def single_coint_test(stock1, stock2, min_obs=50, significance=0.01):
    """Run a cointegration test on two tickers using their CSV files on disk.

    Reads ``stockdata/<stock1>.csv`` and ``stockdata/<stock2>.csv``, drops rows
    with missing values, inner-joins the two frames on ``Datetime`` and tests
    the two ``Open`` series with ``coint(..., autolag='aic')``.

    Args:
        stock1: Ticker symbol of the first series (also the CSV file stem).
        stock2: Ticker symbol of the second series (also the CSV file stem).
        min_obs: The test is skipped when the joined frame has this many rows
            or fewer. Defaults to 50.
        significance: p-value threshold at or below which the pair is flagged
            with 1. Defaults to 0.01.

    Returns:
        list: A one-element list holding ``[stock1, stock2, pvalue, flag]``.
        ``flag`` is 1 when ``pvalue <= significance``, 0 otherwise, -3 when
        there were too few overlapping rows (p-value reported as 1.0), and -1
        when reading the data or running the test raised (p-value reported
        as 1.0).
    """
    try:
        s1 = pd.read_csv('stockdata/' + stock1 + '.csv').dropna()
        # Rename the second frame's columns so they do not collide with the
        # first frame's after the merge.
        s2 = pd.read_csv(
            'stockdata/' + stock2 + '.csv',
            header=0,
            names=['Datetime', 'Open2', 'High2', 'low2', 'close2',
                   'Adjclose2', 'Volume2'],
        ).dropna()
        both_stocks = s1.merge(s2, on='Datetime')
        if len(both_stocks['Open']) <= min_obs:
            return [[stock1, stock2, 1.0, -3]]
        _score, pvalue, _ = coint(both_stocks['Open'], both_stocks['Open2'], autolag='aic')
    except Exception:
        # Best-effort: one bad pair must not abort a whole scan. The original
        # handler called list.append with four arguments, which itself raised
        # TypeError, so this sentinel row was never actually produced.
        return [[stock1, stock2, 1.0, -1]]
    return [[stock1, stock2, pvalue, 1 if pvalue <= significance else 0]]

In [11]:
def single_coint_test_memory(stock1, stock2, min_obs=50, significance=0.01):
    """Run a cointegration test on two tickers using the in-memory download.

    Selects each ticker's frame from the notebook-level ``pandas_data``, drops
    rows with missing values, inner-joins the two frames on their index and
    tests the two ``Open`` series with ``coint(..., autolag='aic')``.

    Args:
        stock1: Ticker symbol of the first series (a key of ``pandas_data``).
        stock2: Ticker symbol of the second series (a key of ``pandas_data``).
        min_obs: The test is skipped when the joined frame has this many rows
            or fewer. Defaults to 50.
        significance: p-value threshold at or below which the pair is flagged
            with 1. Defaults to 0.01.

    Returns:
        list: A one-element list holding ``[stock1, stock2, pvalue, flag]``.
        ``flag`` is 1 when ``pvalue <= significance``, 0 otherwise, -3 when
        there were too few overlapping rows (p-value reported as 1.0), and -1
        when selecting the data or running the test raised (p-value reported
        as 1.0).
    """
    try:
        s1 = pandas_data[stock1].dropna()
        # dropna() returns a new frame, so renaming its columns does not touch
        # pandas_data. The rename avoids name collisions after the merge.
        s2 = pandas_data[stock2].dropna()
        s2.columns = ['Open2', 'High2', 'low2', 'close2', 'Adjclose2', 'Volume2']
        both_stocks = s1.merge(s2, how='inner', left_index=True, right_index=True)
        if len(both_stocks['Open']) <= min_obs:
            return [[stock1, stock2, 1.0, -3]]
        _score, pvalue, _ = coint(both_stocks['Open'], both_stocks['Open2'], autolag='aic')
    except Exception:
        # Best-effort: one bad pair must not abort a whole scan. The original
        # handler called list.append with four arguments, which itself raised
        # TypeError, so this sentinel row was never actually produced.
        return [[stock1, stock2, 1.0, -1]]
    return [[stock1, stock2, pvalue, 1 if pvalue <= significance else 0]]

In [12]:
def listshift(ticker_list):
    """Return a new list rotated one position to the right.

    The last element moves to the front and every other element shifts back by
    one. The input is not modified. An empty input returns an empty list
    (the original indexed ``ticker_list[-1]`` and raised ``IndexError``).

    Args:
        ticker_list: Sequence of items to rotate.

    Returns:
        list: The rotated copy.
    """
    items = list(ticker_list)
    # Slicing instead of indexing keeps this safe for empty input.
    return items[-1:] + items[:-1]

In [13]:
def dask_coint(tickers, rotations):
    """Build and run the disk-backed cointegration tests as Dask delayed tasks.

    For each rotation the ticker list is shifted one more position with
    ``listshift`` and paired element-wise with the unshifted list; every pair
    becomes one delayed ``single_coint_test`` call.

    Args:
        tickers: Ticker symbols to pair up. The original body ignored this
            argument and read the notebook global ``tickerlist`` instead.
        rotations: Number of successive one-step shifts to pair against.
            A shift count that is a multiple of ``len(tickers)`` pairs each
            ticker with itself.

    Returns:
        tuple: The value of ``dask.compute`` applied to the persisted task
        tuple, i.e. a one-element tuple whose item holds one result per task.
    """
    # Named differently from the enclosing function; the original rebound the
    # name `dask_coint` locally.
    delayed_test = dask.delayed(single_coint_test)
    base = list(tickers)
    shifted = base
    tasks = []
    for _ in range(rotations):
        shifted = listshift(shifted)
        tasks.extend(delayed_test(stock_1, stock_2)
                     for stock_1, stock_2 in zip(base, shifted))
    persisted = dask.persist(*tasks)
    return dask.compute(persisted)

In [20]:
def non_dask_coint(tickers, rotations):
    """Run the in-memory cointegration tests sequentially.

    For each rotation the ticker list is shifted one more position with
    ``listshift`` and paired element-wise with the unshifted list; every pair
    is passed to ``single_coint_test_memory``.

    Args:
        tickers: Ticker symbols to pair up. The original body ignored this
            argument and read the notebook global ``tickerlist`` instead.
        rotations: Number of successive one-step shifts to pair against.
            A shift count that is a multiple of ``len(tickers)`` pairs each
            ticker with itself.

    Returns:
        list: One entry per tested pair, each being the one-row table returned
        by ``single_coint_test_memory``.
    """
    base = list(tickers)
    shifted = base
    results = []
    for rotation in range(1, rotations + 1):
        shifted = listshift(shifted)
        # Progress marker: prints the 1-based rotation number.
        print(rotation)
        results.extend(single_coint_test_memory(stock_1, stock_2)
                       for stock_1, stock_2 in zip(base, shifted))
    return results
        

In [21]:
def tasktime():
    """Time one sequential cointegration scan over the global ticker list.

    Calls ``non_dask_coint(tickerlist, 5)`` and measures the wall-clock time
    around it with ``datetime.now()``.

    Returns:
        tuple: ``(results, elapsed)`` where ``results`` is the scan's return
        value and ``elapsed`` is a ``datetime.timedelta``.
    """
    began = dt.datetime.now()
    scan_results = non_dask_coint(tickerlist, 5)
    finished = dt.datetime.now()
    return scan_results, finished - began

In [24]:
# Run the timed scan: `result` holds the per-pair tables, `timer` the elapsed
# wall-clock time. The numbers printed below are the rotation progress markers.
result, timer = tasktime()

1
2
3
4
5


In [25]:
# Display the elapsed time of the scan as a datetime.timedelta.
timer

datetime.timedelta(seconds=222, microseconds=280923)

In [None]:
# `result` (from tasktime -> non_dask_coint) is a list with one entry per pair,
# each entry being a one-row table [[stock1, stock2, pvalue, flag]]. Flatten all
# of them into rows; `result[0]` alone would keep only the first pair.
# Columns, in order: stock1, stock2, p-value, flag.
wow = pd.DataFrame([row for table in result for row in table])

In [None]:
# Persist the results table to results.csv in the working directory.
wow.to_csv('results.csv')