### Goals

- What are the tickers available in PolygonIO?
- What's the correlation between two industries? Where does the 'rotation' take place?

### Imports

In [4]:
import sys
import os
import requests

# sys.path.append("C:\\Users\\SHIRAM\\Documents\\streams")
sys.path.append("/mnt/c/Users/SHIRAM/Documents/streams/")

from connections import Connections
from multi_thread_streams import get_distinct_col_values_from_equities_info, get_multiple_distinct_col_values_from_equities_info
from multi_thread_streams import establish_ssh_tunnel

import pandas as pd
import numpy as np 
from functools import reduce

from sqlalchemy import create_engine
from sqlalchemy.engine.url import URL

from math import pi
import psycopg2

import plotly.graph_objects as go
import plotly.express as px
from plotly.subplots import make_subplots

### Functions

In [9]:
if "read_db" in locals():
    del read_db
    
def read_db(
    table_name:str=None,
    columns:list=None,
    condition:str=None,
    ssh_conn_params:dict=None,
    db_conn_params:dict=None,
    query:str=None,
    use_sqlalchemy:bool=True
):
    """
    Run a single SQL query through the SSH tunnel and return its result.

    A tunnel is set up on every call, because queries are expected to be
    infrequent.

    Parameters
    ----------
    table_name, columns, condition
        Used to build ``SELECT <columns> FROM <table_name> [WHERE <condition>];``
        when ``query`` is not given. ``columns`` may be ``["*"]`` (or omitted)
        to select everything; ``condition`` may be omitted to skip the WHERE
        clause. These values are interpolated into the SQL text verbatim, so
        they must come from trusted code, never from user input.
    ssh_conn_params
        Passed unchanged to ``establish_ssh_tunnel``.
    db_conn_params
        Connection keyword arguments. The dict is copied before being adapted,
        so the caller's object is left untouched.
    query
        Complete SQL statement; when given, it is executed as-is.
    use_sqlalchemy
        When True the query runs through ``pd.read_sql_query`` and a DataFrame
        is returned; otherwise it runs on a psycopg2 cursor and the
        ``fetchall()`` rows are returned.

    Raises
    ------
    ValueError
        If neither ``query`` nor ``table_name`` is provided.
    """
    if query is None:
        if table_name is None:
            raise ValueError("Either `query` or `table_name` must be provided.")

        # No column list, or a leading "*", both mean "select everything".
        if not columns or columns[0] == "*":
            select_list = "*"
        else:
            select_list = ", ".join(columns)

        query = f"SELECT {select_list} FROM {table_name}"
        if condition:
            query += f" WHERE {condition}"
        query += ";"

    tunnel = establish_ssh_tunnel(ssh_conn_params=ssh_conn_params)
    tunnel.daemon_transport = True
    tunnel.daemon_forward_servers = True
    # NOTE(review): the tunnel is neither started nor stopped here (both calls
    # were commented out in the original) — confirm establish_ssh_tunnel owns
    # its lifecycle.

    # Work on a copy: the adjustments below (port, user -> username, dropped
    # keepalive options, drivername) must not leak into the caller's dict,
    # which is shared with code that connects through psycopg2 directly.
    conn_params = dict(db_conn_params)

    # The local bind port is used by both the psycopg2 and sqlalchemy paths.
    # It is fixed at 5433 rather than read from tunnel.local_bind_port.
    conn_params["port"] = 5433

    if use_sqlalchemy:
        # sqlalchemy's URL names this field 'username' rather than 'user'.
        if "user" in conn_params:
            conn_params["username"] = conn_params.pop("user")

        # libpq keepalive options are not accepted by the URL constructor.
        for option in ("keepalives", "keepalives_idle", "keepalives_interval", "keepalives_count"):
            conn_params.pop(option, None)

        conn_params["drivername"] = "postgresql+psycopg2"
        # NOTE(review): newer SQLAlchemy releases expect URL.create(**params)
        # instead of calling URL(...) directly — confirm the installed version.
        engine = create_engine(URL(**conn_params))
        try:
            result = pd.read_sql_query(sql=query, con=engine)
        finally:
            # Release pooled connections; a new engine is built on every call.
            engine.dispose()

    else:
        conn = psycopg2.connect(**conn_params)
        try:
            # `with conn` only wraps the transaction (commit / rollback); it
            # does not close the connection, hence the explicit close() below.
            with conn, conn.cursor() as cur:
                cur.execute(query)
                result = cur.fetchall()
        finally:
            conn.close()

    return result

if "get_data" in locals():
    del get_data
    
def get_data(industries:list = None, sectors:list = None) -> pd.DataFrame:
    """
    Fetch candles joined with their industry / sector, optionally filtered.

    Parameters
    ----------
    industries
        Industry names to keep (``B.industry IN (...)``); ``None`` disables
        the filter.
    sectors
        Sector names to keep (``B.sector IN (...)``); ``None`` disables the
        filter.

    Returns
    -------
    Whatever ``read_db`` returns for the assembled query when called with
    ``use_sqlalchemy=True``.

    Notes
    -----
    Connection settings are read from the module-level ``conns`` object.
    """

    def _sql_in_list(values) -> str:
        # Render values as a quoted, comma separated SQL list. Embedded single
        # quotes are doubled so that a name containing an apostrophe cannot
        # terminate the string literal early.
        return ", ".join("'{}'".format(str(val).replace("'", "''")) for val in values)

    clauses = [
        "SELECT A.*, B.industry, B.sector FROM polygon_stocks_agg_candles A "
        "LEFT JOIN equities_info B ON B.symbol = A.ticker"
    ]

    if industries is not None:
        clauses.append(f"WHERE B.industry in ({_sql_in_list(industries)})")

    if sectors is not None:
        # Continue an existing WHERE clause with AND, otherwise open one.
        keyword = "AND" if len(clauses) > 1 else "WHERE"
        # NOTE(review): the daily-timespan restriction is only applied when
        # `sectors` is given; an industries-only call returns every timespan.
        # Kept as in the original — confirm whether that is intended.
        clauses.append(f"{keyword} B.sector in ({_sql_in_list(sectors)}) AND A.timespan = 'day'")

    query = " ".join(clauses) + ";"

    return read_db(
        query=query,
        ssh_conn_params=conns.ssh_conn_params,
        db_conn_params=conns.db_conn_params,
        use_sqlalchemy=True
    )

if "process_data" in locals():
    del process_data
    
def process_data(df:pd.DataFrame, close:bool=True, ohlc:bool=False) -> pd.DataFrame:
    """
    Keep the price columns of interest, rename ``timestamp`` to ``date``,
    convert it to datetimes and sort the rows chronologically.

    Parameters
    ----------
    df
        Frame holding at least the selected columns (see below).
    close
        Select ``timestamp, ticker, close, vwap, industry, sector``.
    ohlc
        Select the same columns plus ``open, high, low``. Takes precedence
        over ``close`` when both are True.

    Returns
    -------
    A new DataFrame; the input frame is not modified. The original index
    labels are kept (rows are reordered, not renumbered).

    Raises
    ------
    ValueError
        If both ``close`` and ``ohlc`` are False.
    """
    if ohlc:
        cols = ["timestamp", "ticker", "open", "high", "low", "close", "vwap", "industry", "sector"]
    elif close:
        cols = ["timestamp", "ticker", "close", "vwap", "industry", "sector"]
    else:
        raise ValueError("At least one of `close` or `ohlc` must be True.")

    # Selecting with a list and renaming both produce new frames, so the
    # assignment below never writes into (a view of) the caller's data.
    out = df[cols].rename(columns={"timestamp": "date"})

    # Plain column assignment replaces the column and therefore its dtype;
    # `out.loc[:, "date"] = ...` may instead write the values into the
    # existing column and leave its dtype unchanged.
    out["date"] = pd.to_datetime(out["date"])

    return out.sort_values(by="date")

### Calls

In [10]:
# Pull everything from the cloud database.
conns = Connections()

# Distinct values of the industry and sector columns of equities_info.
results = get_multiple_distinct_col_values_from_equities_info(
    logger=conns.logger,
    ssh_conn_params=conns.ssh_conn_params,
    db_conn_params=conns.db_conn_params,
    col_names=["industry", "sector"],
)

# One processed close-price frame per sector, fetched in this order.
energy_df, finance_df = (
    process_data(df=get_data(sectors=[sector_name]), close=True)
    for sector_name in ("Energy", "Finance")
)

# Uncomment to cache the frames locally for the CSV-based cell further down.
# energy_df.to_csv("energy_tickers.csv", index=True)
# finance_df.to_csv("finance_tickers.csv", index=True)

2020-11-14 03:31:07 PM-{/mnt/c/Users/SHIRAM/Documents/streams/multi_thread_streams.py:416}-INFO-Making ssh tunnel to get sectors list...
2020-11-14 03:31:07 PM-{/mnt/c/Users/SHIRAM/Documents/streams/multi_thread_streams.py:416}-INFO-Making ssh tunnel to get sectors list...
2020-11-14 03:31:07 PM-{/mnt/c/Users/SHIRAM/Documents/streams/multi_thread_streams.py:416}-INFO-Making ssh tunnel to get sectors list...
2020-11-14 03:31:07 PM-{/mnt/c/Users/SHIRAM/Documents/streams/multi_thread_streams.py:416}-INFO-Making ssh tunnel to get sectors list...
2020-11-14 03:31:08 PM-{/mnt/c/Users/SHIRAM/Documents/streams/multi_thread_streams.py:427}-INFO-Querying db...
2020-11-14 03:31:08 PM-{/mnt/c/Users/SHIRAM/Documents/streams/multi_thread_streams.py:427}-INFO-Querying db...
2020-11-14 03:31:08 PM-{/mnt/c/Users/SHIRAM/Documents/streams/multi_thread_streams.py:427}-INFO-Querying db...
2020-11-14 03:31:08 PM-{/mnt/c/Users/SHIRAM/Documents/streams/multi_thread_streams.py:427}-INFO-Querying db...
2020-11-

In [28]:
from tqdm.notebook import tqdm_notebook

In [29]:
# Page through the Polygon reference-tickers listing and keep every response.
conns.establish_rest_client()
client = conns.rest_client
tickers = []
for page in tqdm_notebook(range(1, 10000)):
    try:
        tkr = client.reference_tickers(page=page)
    except Exception as e:
        # Best effort: report the failed page and carry on with the next one.
        print(f"Exception: {e}")
        continue

    # Stop at the first page that carries no tickers instead of always
    # issuing all 9,999 requests.
    # NOTE(review): assumes an empty page marks the end of the listing —
    # confirm against the Polygon reference-tickers API.
    if not getattr(tkr, "tickers", None):
        break

    tickers.append(tkr)

2020-11-14 03:58:00 PM-{/mnt/c/Users/SHIRAM/Documents/streams/connections.py:241}-INFO-Polygon REST connection established
2020-11-14 03:58:00 PM-{/mnt/c/Users/SHIRAM/Documents/streams/connections.py:241}-INFO-Polygon REST connection established
2020-11-14 03:58:00 PM-{/mnt/c/Users/SHIRAM/Documents/streams/connections.py:241}-INFO-Polygon REST connection established
2020-11-14 03:58:00 PM-{/mnt/c/Users/SHIRAM/Documents/streams/connections.py:241}-INFO-Polygon REST connection established


HBox(children=(HTML(value=''), FloatProgress(value=0.0, max=9999.0), HTML(value='')))

Exception: ('Connection aborted.', RemoteDisconnected('Remote end closed connection without response',))



KeyboardInterrupt: 

In [35]:
# Flatten the per-page responses into a single list of ticker records.
all_tickers = [
    record
    for response in tqdm_notebook(tickers)
    for record in response.tickers
]

HBox(children=(HTML(value=''), FloatProgress(value=0.0, max=4026.0), HTML(value='')))




In [38]:
# One row per ticker record, one column per record key.
all_tickers_df = pd.DataFrame(all_tickers)

In [41]:
# Show only the rows whose market is INDICES.
all_tickers_df.loc[all_tickers_df["market"].eq("INDICES")]

Unnamed: 0,ticker,name,market,locale,type,currency,active,primaryExch,updated,codes,url,attrs
17982,I:AAXJ,iShares MSCI All Country Asia ex Japan Index Fund,INDICES,G,ETF,USD,True,GIDS,2020-11-14,,https://api.polygon.io/v2/tickers/I:AAXJ,{}
17983,I:ABAQ,ABA NASDAQ Community Bank,INDICES,G,INDEX,USD,True,GIDS,2020-11-14,,https://api.polygon.io/v2/tickers/I:ABAQ,"{'holiday': False, 'assettype': 'EQ', 'entitle..."
17984,I:ABQI,NASDAQ OMX ABA Community Bank,INDICES,G,INDEX,USD,True,GIDS,2020-11-14,,https://api.polygon.io/v2/tickers/I:ABQI,"{'holiday': False, 'assettype': 'EQ', 'entitle..."
17985,I:ABQX,NASDAQ OMX ABA Community Bank Total Rtn,INDICES,G,INDEX,USD,True,GIDS,2020-11-14,,https://api.polygon.io/v2/tickers/I:ABQX,"{'holiday': False, 'assettype': 'EQ', 'entitle..."
17986,I:ACWI,iShares MSCI ACWI Index Fund,INDICES,G,ETF,USD,True,GIDS,2020-11-14,,https://api.polygon.io/v2/tickers/I:ACWI,{}
...,...,...,...,...,...,...,...,...,...,...,...,...
100681,I:ZPRS,SPDR MSCI World Small Cap UCITS ETF,INDICES,G,ETF,USD,True,GIDS,2020-01-30,,https://api.polygon.io/v2/tickers/I:ZPRS,{}
100682,I:ZPRU,SPDR MSCI USA Value Weighted UCITS ETF,INDICES,G,ETF,USD,True,GIDS,2020-01-30,,https://api.polygon.io/v2/tickers/I:ZPRU,{}
100683,I:ZPRV,SPDR MSCI USA Small Cap Value Weighted UCITS ETF,INDICES,G,ETF,USD,True,GIDS,2020-01-30,,https://api.polygon.io/v2/tickers/I:ZPRV,{}
100684,I:ZPRW,SPDR MSCI Europe Value Weighted UCITS ETF,INDICES,G,ETF,USD,True,GIDS,2020-01-30,,https://api.polygon.io/v2/tickers/I:ZPRW,{}


In [None]:
from threading import Thread


class Worker(Thread):
    """Daemon thread that runs tasks pulled from a shared queue, forever.

    Each queue item is a ``(func, args, kwargs)`` triple. The thread starts
    itself on construction.
    """

    def __init__(self, tasks):
        # Daemon thread: it never returns from run(), so it must not keep the
        # interpreter alive.
        super().__init__(daemon=True)
        self.tasks = tasks
        self.start()

    def run(self):
        while True:
            func, args, kwargs = self.tasks.get()
            try:
                func(*args, **kwargs)
            except Exception as e:
                # A failing task is reported and must not kill the worker.
                print(f"Exception: {e}")
            finally:
                # Always acknowledge the item, otherwise tasks.join() would
                # wait forever on a task that ended abnormally.
                self.tasks.task_done()


from queue import Queue


class ThreadPool:
    """Fixed-size pool of ``Worker`` threads consuming tasks from one queue."""

    def __init__(self, num_threads):
        """Create the bounded task queue and start ``num_threads`` workers.

        Raises ``ValueError`` when ``num_threads`` is smaller than 1: without
        any worker, queued tasks would never run and ``wait_completion``
        would block forever.
        """
        if num_threads < 1:
            raise ValueError("num_threads must be at least 1")

        # The queue holds at most num_threads pending tasks, so add_tasks()
        # blocks while it is full.
        self.tasks = Queue(num_threads)
        self.workers = [Worker(self.tasks) for _ in range(num_threads)]

    def add_tasks(self, func, *args, **kwargs):
        """Queue ``func(*args, **kwargs)`` for execution by a worker."""
        self.tasks.put((func, args, kwargs))

    def wait_completion(self):
        """Block until every queued task has been marked as done."""
        self.tasks.join()


In [None]:
# NOTE(review): unfinished cell. The `for` statement below passes nothing to
# tqdm(), has no trailing colon and no body, so it is a SyntaxError as written.
# `tqdm` itself is also not imported in the visible cells (only tqdm_notebook is).
pool = ThreadPool(num_threads=100)
for eq in tqdm()

In [21]:
# NOTE(review): executed as In [21], i.e. before the In [29] cell that rebinds
# `tickers` to a list of page responses; presumably `tickers` was a single
# response object at that point. With the list version this attribute access
# raises AttributeError.
tickers.tickers

[{'ticker': 'AAMC',
  'name': 'Altisource Asset Mgmt Corp',
  'market': 'STOCKS',
  'locale': 'US',
  'currency': 'USD',
  'active': True,
  'primaryExch': 'AMX',
  'updated': '2020-06-23',
  'codes': {'cik': '0001555074',
   'figiuid': 'EQ0000000027544942',
   'scfigi': 'BBG003PNL145',
   'cfigi': 'BBG003PNL136',
   'figi': 'BBG003Q1GJL0'},
  'url': 'https://api.polygon.io/v2/tickers/AAMC'},
 {'ticker': 'AAME',
  'name': 'Atlantic American Corporation Common Stock',
  'market': 'STOCKS',
  'locale': 'US',
  'currency': 'USD',
  'active': True,
  'primaryExch': 'NASDAQ',
  'updated': '2020-06-23',
  'codes': {'cik': '0000008177',
   'figiuid': 'EQ0010179600001000',
   'scfigi': 'BBG001S5N8T1',
   'cfigi': 'BBG000B9XB24',
   'figi': 'BBG000B9XMQ4'},
  'url': 'https://api.polygon.io/v2/tickers/AAME'},
 {'ticker': 'AAMMF',
  'name': 'Almadex Minerals Ltd Ordinary Shares (Canada)',
  'market': 'STOCKS',
  'locale': 'US',
  'currency': 'USD',
  'active': True,
  'primaryExch': 'OTC',
  'upd

In [93]:
# to fetch data from local files
energy_df = pd.read_csv("energy_tickers.csv", index_col=0, parse_dates=True)
finance_df = pd.read_csv("finance_tickers.csv", index_col=0, parse_dates=True)

energy_df.loc[:, "date"] = pd.to_datetime(energy_df["date"])
finance_df.loc[:, "date"] = pd.to_datetime(finance_df["date"])

### Change to returns

In [7]:
# cpi that seasonally adjusted
cpi_sl_df = pd.read_csv("../CPIAUCSL.csv", parse_dates=True)
cpi_sl_df.columns = ["date", "cpi_all_urban_custormers_seasonally_adjusted"]
cpi_sl_df["date"] = pd.to_datetime(cpi_sl_df["date"])
cpi_sl_df[f"{cpi_sl_df.columns[1]}_pct_change"] = cpi_sl_df[cpi_sl_df.columns[1]].pct_change()

# cpi not seasonally adjusted
cpi_df = pd.read_csv("../CPIAUCNS.csv", parse_dates=True)
cpi_df.columns = ["date", "cpi_all_urban_customers"]
cpi_df["date"] = pd.to_datetime(cpi_df["date"])
cpi_df[f"{cpi_df.columns[1]}_pct_change"] = cpi_df[cpi_df.columns[1]].pct_change()

# historical fed interest rates
interest_rates_df = pd.read_excel("../fed-funds-rate-historical-chart.xlsx", sheet_name="Sheet1")
interest_rates_df.columns = ["date", "value"]
interest_rates_df["date"] = pd.to_datetime(interest_rates_df["date"])
interest_rates_df["value"] = interest_rates_df["value"].ffill()

In [98]:
# merge all this cpi stuff with the prices that you have
energy_df = pd.merge(
    energy_df.assign(grouper=energy_df["date"].dt.to_period("M")),
    cpi_df.assign(grouper=cpi_df["date"].dt.to_period("M")),
    how="outer", 
    on="grouper"
)

In [235]:
from scipy.signal import detrend

In [242]:
def _detrended_close_returns(dff, cpi_adj):
    """Return the linearly detrended close-return values for one ticker's rows."""
    if cpi_adj is None:
        # Cumulative sum of the forward one-step returns.
        raw = ((dff["close"].shift(-1) - dff["close"]) / dff["close"]).cumsum().fillna(0.0)
    else:
        # Per-step return deflated by the CPI change column (not cumulated,
        # unlike the branch above).
        raw = (dff["close"].pct_change() + 1).astype(float) / (dff[f"{cpi_adj}_pct_change"] + 1.0) - 1

    return detrend(raw.ffill().fillna(0.0))


def get_returns_plot(
    df:pd.DataFrame,
    tickers:list,
    colors:list,
    cpi_adj:str=None,
    as_scatter:bool=False,
    as_bars:bool=False,
    as_line:bool=False,
    corr_window:int=7
) -> go.Figure:
    """
    Plot detrended close returns per ticker and their rolling correlation.

    The figure has three rows: a histogram of the returns per ticker, the
    returns over time, and the rolling correlation between the first two
    tickers.

    Parameters
    ----------
    df
        Frame with the columns ``ticker``, ``date_x`` (datetime-like) and
        ``close``; when ``cpi_adj`` is given, ``f"{cpi_adj}_pct_change"`` too.
    tickers
        Tickers to plot; at least two are required. Only the first two enter
        the correlation trace.
    colors
        One colour per ticker, paired positionally; tickers without a colour
        are not plotted.
    cpi_adj
        Name of the CPI column whose ``_pct_change`` companion deflates the
        returns. ``None`` plots unadjusted returns.
    as_scatter, as_bars, as_line
        Style of the middle row, checked in this order; markers are used when
        none is set.
    corr_window
        Number of rows in the rolling correlation window.

    Raises
    ------
    ValueError
        If fewer than two (ticker, colour) pairs are available.
    """
    pairs = list(zip(tickers, colors))
    if len(pairs) < 2:
        raise ValueError("At least two tickers (each with a colour) are needed to compute a correlation.")

    fig = make_subplots(rows=3, cols=1, row_heights=[1, 2, 1], vertical_spacing=0.05)
    # Layout does not depend on the tickers, so it is set once up front.
    fig.update_layout(xaxis={"side": "top", "title": {"standoff": 1}}, height=800, width=1800)

    returns = []
    for ticker, color in pairs:
        dff = df[df["ticker"] == ticker].sort_values(by="date_x")
        dff = dff.rename(columns={"date_x": "date"})
        dff["date"] = dff["date"].dt.date
        dff = dff.set_index("date", drop=True)

        dff["close_returns"] = _detrended_close_returns(dff, cpi_adj)
        returns.append(dff["close_returns"])

        fig.add_trace(
            go.Histogram(x=dff["close_returns"], name=f"{ticker}_histogram", marker={"color": color}, legendgroup=ticker),
            row=1,
            col=1,
        )

        trace_kwargs = dict(
            x=dff.index,
            y=dff["close_returns"],
            row=2,
            col=1,
            name=ticker,
            legendgroup=ticker,
            marker={"color": color},
        )
        if as_scatter:
            fig.add_scatter(mode="markers", **trace_kwargs)
        elif as_bars:
            fig.add_bar(**trace_kwargs)
        elif as_line:
            # No explicit mode: plotly's default scatter mode applies.
            fig.add_scatter(**trace_kwargs)
        else:
            fig.add_scatter(mode="markers", **trace_kwargs)

    # Align the first two return series on their common dates (inner join).
    aligned = pd.merge(
        returns[0].rename("left").to_frame(),
        returns[1].rename("right").to_frame(),
        left_index=True,
        right_index=True,
    ).fillna(0.0)

    # Series-vs-series rolling correlation yields exactly one value per date.
    # The pairwise form returns a (date, column) indexed matrix whose column
    # slice is twice as long as the x axis and interleaves the 1.0
    # self-correlations.
    rolling_corr = aligned["left"].rolling(corr_window).corr(aligned["right"])

    fig.add_scatter(
        x=aligned.index,
        y=rolling_corr,
        row=3,
        col=1,
        name=f"Corr b/w {tickers[0]} & {tickers[1]}"
    )

    return fig

# Distinct ticker values found in the merged energy frame.
energy_tickers = list(energy_df["ticker"].unique())
ticker = energy_tickers[1]
# Name of the CPI value column, for the optional CPI adjustment below.
cpi_col = cpi_df.columns[1]
colors = px.colors.sequential.Blackbody

# Compare the third and fourth tickers, drawn in the two colours at either
# end of the palette.
get_returns_plot(
    df=energy_df,
    tickers=energy_tickers[2:4],
    colors=[colors[0], colors[-1]],
    as_line=True,
#     cpi_adj=cpi_col,
)