In [315]:
import pandas as pd
import numpy as np
import holoviews as hv
from holoviews import opts,dim
import matplotlib as plt 
import colorcet as cc
import functools as ft
import operator as ops 
import os 
import json 

# NOTE(review): `pn` (panel) is not imported in this cell; presumably it was
# imported in an earlier notebook cell - confirm, otherwise this raises NameError.
pn.extension()
# Select bokeh as the holoviews plotting backend.
hv.extension('bokeh')

# Crypto market analysis (futures vs. spot trades) using python.
# Root directory of the raw market data; `get_fname` builds paths beneath it.
datadir = "crypto_market_data_1"
# Root directory for the on-disk pickle cache used by `extract_tbinned_data_from_file`.
cachedir = "cache"
# Top-level entries of the data directory (presumably one per day, given the
# `datadir/day/...` layout used by `get_fname`). Runs at import time and raises
# if `datadir` does not exist.
folders = os.listdir(datadir)

def get_fname(day, pair, futures_or_spot, ftype):
    """
    Build the path of one raw data file.

    The layout is ``<datadir>/<day>/<pair>/<futures_or_spot>/<pair>_<ftype>.json``,
    joined with forward slashes.
    """
    return f"{datadir}/{day}/{pair}/{futures_or_spot}/{pair}_{ftype}.json"

def read_text_file(fname):
    """
    Return the full contents of the text file ``fname`` as a string.

    The file is decoded as UTF-8 explicitly. JSON is exchanged as UTF-8, and
    relying on the platform's locale encoding would make decoding
    machine-dependent.
    """
    with open(fname, 'r', encoding='utf-8') as file:
        return file.read()

def read_dictionary_string(s):
    """
    Parse one JSON trade record into a flat dict of typed fields.

    Only the keys ``p``, ``q``, ``m`` and ``T`` are read; any other keys in the
    record are ignored. ``p`` and ``q`` are converted to float. ``m`` is passed
    through unchanged. ``T`` is converted to float and stored as ``epoch_time_ms``.
    """
    raw = json.loads(s)
    return {
        'p': float(raw['p']),
        'q': float(raw['q']),
        'm': raw['m'],
        'epoch_time_ms': float(raw['T']),
    }

def extract_data_from_file(day, pair, futures_or_spot, ftype):
    """
    Load one newline-delimited JSON trade file into a DataFrame.

    Each non-blank line is parsed with ``read_dictionary_string``.

    Returns
    -------
    pandas.DataFrame
        Columns ``p``, ``q``, ``m`` and ``epoch_time_ms``, plus ``tms``, which is
        ``epoch_time_ms`` converted to a timestamp. An empty file yields an
        empty frame with the same columns instead of raising ``KeyError``.
    """
    fstr = read_text_file(get_fname(day, pair, futures_or_spot, ftype))
    # Skip whitespace-only lines too: a lone "\r" (blank line in a CRLF file)
    # or trailing spaces would otherwise make json.loads raise.
    records = [read_dictionary_string(line) for line in fstr.split("\n") if line.strip()]
    # Explicit columns keep the schema stable even when there are no records.
    data = pd.DataFrame(records, columns=['p', 'q', 'm', 'epoch_time_ms'])
    # Epoch times are milliseconds (see the 'epoch_time_ms' name) -> timestamps.
    return data.assign(tms=pd.to_datetime(data['epoch_time_ms'], unit='ms'))

def extract_data_from_file_with_tbins(day, pair, futures_or_spot, ftype, aggregate="T"):
    """
    Load raw trades for one file and tag each trade with a time bin.

    ``tbin`` is ``tms`` rounded to the *nearest* multiple of ``aggregate``, a
    pandas frequency string; the default "T" is one minute. Rounding is not
    flooring, so a bin labelled 12:01 collects trades from about 12:00:30 to
    12:01:30.

    NOTE: newer pandas versions deprecate the "T" alias in favour of "min".
    The default is kept as-is for compatibility with existing cache keys.
    """
    data = extract_data_from_file(day, pair, futures_or_spot, ftype)
    # Datetime frequency rounding lives on the `.dt` accessor; `Series.round`
    # is the numeric "round to N decimals" API and is not meant for datetimes.
    return data.assign(tbin=data['tms'].dt.round(aggregate))

def trade_aggregator(df):
    """
    Summarize the trades in one group (e.g. one time bin) as a single-row DataFrame.

    The interval length is not checked; every row of ``df`` is summarized.
    New summary statistics can be added here, and they propagate to every
    caller automatically.

    Parameters
    ----------
    df : pandas.DataFrame
        Non-empty set of trades with these columns:

        - ``p``: price.
        - ``q``: quantity.
        - ``m``: the "buyer is market maker" flag. ``True`` rows are counted
          as sells and ``False`` rows as buys.

        The first and last rows are used as open and close, so rows are
        assumed to be in trade order. TODO: confirm the upstream ordering.

    Returns
    -------
    pandas.DataFrame
        One row with columns ``p_avg``, ``p_max``, ``p_min``, ``p_range``,
        ``p_open``, ``p_close``, ``p_change``, ``p_change_percent``,
        ``p_range_percent``, ``v_sell_tot``, ``v_sell_max``, ``v_buy_tot``,
        ``v_buy_max``, ``v_tot`` and ``v_net``.

        - Both ``*_percent`` columns are percentages of the opening price.
        - A side (buy or sell) with no trades gets a total of 0 and a max of NaN.
    """
    p = df['p']
    q = df['q']

    p_open = p.iloc[0]
    p_close = p.iloc[-1]
    p_max = p.max()
    p_min = p.min()
    p_range = p_max - p_min
    p_change = p_close - p_open

    # m == True means "buyer is market maker"; those trades are treated as sells.
    q_sell = q[df['m'].eq(True)]
    q_buy = q[df['m'].eq(False)]
    v_sell_tot = q_sell.sum()
    v_buy_tot = q_buy.sum()

    return pd.DataFrame([{
        'p_avg': p.mean(),
        'p_max': p_max,
        'p_min': p_min,
        'p_range': p_range,
        'p_open': p_open,
        'p_close': p_close,
        'p_change': p_change,
        'p_change_percent': 100 * p_change / p_open,
        # Scaled by 100 so it really is a percentage, consistent with
        # p_change_percent. Open question: should the denominator be open or min?
        'p_range_percent': 100 * p_range / p_open,
        'v_sell_tot': v_sell_tot,
        'v_sell_max': q_sell.max(),
        'v_buy_tot': v_buy_tot,
        'v_buy_max': q_buy.max(),
        # Total traded quantity regardless of side.
        'v_tot': q.sum(),
        'v_net': v_buy_tot - v_sell_tot,
    }])

def extract_tbinned_data_from_file(day, pair, futures_or_spot, ftype, aggregate="T"):
    """
    Load per-time-bin trade summaries for one file, using an on-disk pickle cache.

    Caching
    -------
    Results are cached at
    ``<cachedir>/extract_tbinned_data_from_file/<day>_<pair>_<futures_or_spot>_<ftype>_<aggregate>.pkl``.
    On a hit, the cached frame is returned. On a miss, it is computed from the
    raw file and written before returning.

    The cache key does not capture the code of ``trade_aggregator``. If that
    function changes, delete the cache directory to force recomputation.
    Older versions of this function always binned at one minute regardless of
    ``aggregate``, so cache files for any other ``aggregate`` written before
    this fix contain minute bins and should be deleted.

    Returns
    -------
    pandas.DataFrame
        One row per ``tbin`` with the ``trade_aggregator`` columns, plus the
        running totals ``v_buy_cumulative``, ``v_sell_cumulative`` and
        ``v_tot_cumulative``.
    """
    # Each unique request maps to its own cache file, identified by access_id.
    access_id = "_".join([day, pair, futures_or_spot, ftype, aggregate]) + ".pkl"
    access_namespace = "extract_tbinned_data_from_file"

    dirname = os.path.join(cachedir, access_namespace)
    os.makedirs(dirname, exist_ok=True)
    fname = os.path.join(dirname, access_id)
    print("cache req: {}".format(fname))

    if os.path.exists(fname):
        print("cache hit!")
        # Unpickling is acceptable only because these files are written by this
        # function itself; never point cachedir at untrusted data.
        return pd.read_pickle(fname)

    print("cache miss! ~> proceeding with request ")
    # Pass the requested bin size through; it was previously hard-coded to "T",
    # so every aggregate silently produced one-minute bins.
    data = extract_data_from_file_with_tbins(day, pair, futures_or_spot, ftype, aggregate=aggregate)
    data_tbinned = data.groupby(['tbin']).apply(trade_aggregator).reset_index()

    # Running totals over the day.
    data_tbinned = data_tbinned.assign(
        v_buy_cumulative=data_tbinned['v_buy_tot'].cumsum(),
        v_sell_cumulative=data_tbinned['v_sell_tot'].cumsum(),
        v_tot_cumulative=data_tbinned['v_tot'].cumsum(),
    )

    # Store the result so the next identical request is a cache hit.
    data_tbinned.to_pickle(fname)
    print("cache: wrote: {}".format(fname))

    return data_tbinned

def get_future_spot_compared_data(day, pair, aggregate='T'):
    """
    Load futures and spot trade summaries for one day/pair and compare them bin by bin.

    Returns
    -------
    tuple
        ``(f, s, compared)``:

        - ``f`` and ``s`` are the futures and spot per-bin summaries from
          ``extract_tbinned_data_from_file``.
        - ``compared`` holds futures-minus-spot differences, with one row per
          ``tbin`` present in either market. Bins missing from one market get
          NaN differences.

        ``p_avg_diff_percent`` is the average-price gap as a percentage of the
        midpoint of the two average prices. ``v_buy_tot_biff`` is a misspelled
        duplicate of ``v_buy_tot_diff``, kept for existing callers.
    """
    print("Extracting futures data")
    f = extract_tbinned_data_from_file(day, pair, "FUTURES", "TRADE", aggregate)
    print("Extracting spot data")
    s = extract_tbinned_data_from_file(day, pair, "SPOT", "TRADE", aggregate)

    # Align the two markets on the time bin, not on row position: if either
    # market has no trades in some bin, positional subtraction would compare
    # different times for every subsequent row.
    fi = f.set_index('tbin')
    si = s.set_index('tbin')

    p_avg_diff = fi['p_avg'] - si['p_avg']
    p_mean = (fi['p_avg'] + si['p_avg']) / 2
    v_buy_tot_diff = fi['v_buy_tot'] - si['v_buy_tot']
    compared = pd.DataFrame({
        'p_avg_diff': p_avg_diff,
        'p_avg_diff_percent': 100 * p_avg_diff / p_mean,
        'v_tot_diff': fi['v_tot'] - si['v_tot'],
        'v_sell_tot_diff': fi['v_sell_tot'] - si['v_sell_tot'],
        'v_buy_tot_diff': v_buy_tot_diff,
        'p_percent_diff': fi['p_change_percent'] - si['p_change_percent'],
        'v_net_diff': fi['v_net'] - si['v_net'],
        # Legacy misspelled column name, kept so existing callers keep working.
        'v_buy_tot_biff': v_buy_tot_diff,
    }).rename_axis('tbin').reset_index()
    return (f, s, compared)

def convert_future_spot_compare_to_ds(f, s, c):
    """
    Wrap the futures, spot and comparison frames as holoviews Datasets keyed on ``tbin``.

    Returns ``(f_ds, s_ds, c_ds)``. The futures and spot Datasets share the same
    value dimensions. The comparison Dataset exposes the difference columns.
    """
    compare_vdims = ['p_avg_diff', 'p_avg_diff_percent', 'v_tot_diff', 'p_percent_diff', 'v_net_diff']
    market_vdims = ['p_avg', 'p_change', 'p_change_percent', 'v_sell_tot', 'v_buy_tot', 'v_tot', 'v_net']

    c_ds = hv.Dataset(c, kdims=['tbin'], vdims=compare_vdims)
    # Each Dataset gets its own list so neither can affect the other.
    f_ds = hv.Dataset(f, kdims=['tbin'], vdims=list(market_vdims))
    s_ds = hv.Dataset(s, kdims=['tbin'], vdims=list(market_vdims))
    return (f_ds, s_ds, c_ds)

def rsz(graph, x):
    """Return ``graph.options(...)`` with width ``x[0]`` and height ``x[1]``."""
    width = x[0]
    height = x[1]
    return graph.options(width=width, height=height)

# Base plot width, used as the `width` passed to `rsz` by the plotting helpers
# (half of it for side-by-side plots).
default_width = 1300 

def analyze_future_spot(day, pair, aggregate="T"):
    """
    Build the futures-vs-spot comparison plots for one day and pair.

    Returns
    -------
    tuple
        ``((price_graph, diff_graphs), f, s, c, f_ds, s_ds, c_ds)``:

        - ``price_graph`` overlays the futures and spot average prices.
        - ``diff_graphs`` stacks two plots in one column: the percentage
          average-price gap, and both markets' net volume.
        - The remaining items are the underlying frames and Datasets.
    """
    f, s, c = get_future_spot_compared_data(day, pair, aggregate)
    f_ds, s_ds, c_ds = convert_future_spot_compare_to_ds(f, s, c)

    diff_size = (default_width, 200)
    price_size = (default_width, 400)

    # Difference panels: price gap on top, net volume of both markets below.
    gap_curve = hv.Curve(c_ds, 'tbin', 'p_avg_diff_percent')
    net_volume = hv.Curve(f_ds, 'tbin', 'v_net', label='futures') * hv.Curve(s_ds, 'tbin', 'v_net', label='spot')

    # Average prices of both markets overlaid on one axis.
    futures_price = f_ds.to(hv.Curve, 'tbin', 'p_avg', label='futures')
    spot_price = s_ds.to(hv.Curve, 'tbin', 'p_avg', label='spot')
    price_graph = rsz(futures_price, price_size) * rsz(spot_price, price_size)

    diff_graphs = (rsz(gap_curve, diff_size) + rsz(net_volume, diff_size)).cols(1)
    return ((price_graph, diff_graphs), f, s, c, f_ds, s_ds, c_ds)

def compare_futures_spot_v_vs_dp(f_ds, s_ds):
    """
    Plot net volume against percent price change per bin, futures and spot side by side.

    Each scatter is half of ``default_width`` wide and 500 tall. The layout has
    two columns.
    """
    half_size = (int(default_width / 2), 500)
    futures_scatter = rsz(hv.Scatter(f_ds, 'v_net', 'p_change_percent', label='futures'), half_size)
    spot_scatter = rsz(hv.Scatter(s_ds, 'v_net', 'p_change_percent', label='spot'), half_size)
    return (futures_scatter + spot_scatter).cols(2)

def epoch_graph_for_df(s, label=None):
    """
    Plot each row's ``tbin`` (as integer epoch nanoseconds) against its row position.

    Presumably this is used to inspect bin spacing: evenly spaced bins give a
    straight line.

    Parameters
    ----------
    s : pandas.DataFrame
        Must have a ``tbin`` column of timestamps. It is NOT modified.
    label : str, optional
        Curve label.

    Returns
    -------
    tuple
        ``(graph, dataset)``. The dataset has ``index`` as its key dimension
        and ``epoch`` as its value dimension.
    """
    # Build the derived columns on a copy so the caller's DataFrame is left untouched.
    s = s.assign(
        epoch=s['tbin'].apply(lambda x: x.value),  # Timestamp.value = ns since epoch
        index=np.arange(len(s)),
    )

    new_ds = hv.Dataset(s, kdims=["index"], vdims=["epoch"])
    g = rsz(hv.Curve(new_ds, 'index', 'epoch', label=label), (default_width, 300))
    return (g, new_ds)


def futures_spot_epoch_graph(f, s):
    """
    Overlay the spot and futures epoch-vs-row-index curves.

    Returns ``(overlay, futures_graph, futures_ds, spot_graph, spot_ds)``.
    """
    spot_graph, spot_ds = epoch_graph_for_df(s, "spot")
    futures_graph, futures_ds = epoch_graph_for_df(f, "futures")
    overlay = spot_graph * futures_graph
    return (overlay, futures_graph, futures_ds, spot_graph, spot_ds)
    
    
def futures_spot_total_v(f_ds, s_ds):
    """Overlay the total traded volume per bin for futures and spot (``default_width`` x 400)."""
    size = (default_width, 400)
    futures_curve = rsz(f_ds.to(hv.Curve, 'tbin', 'v_tot', label='futures'), size)
    spot_curve = rsz(s_ds.to(hv.Curve, 'tbin', 'v_tot', label='spot'), size)
    return futures_curve * spot_curve