In [None]:
import numpy as np
import pandas as pd
import pandas_ta as ta
import plotly.graph_objs as go
import plotly.io as pio
from dash import Dash,Input,Output,callback,dcc,html
from pathlib import Path
import yfinance as yf
import logging
pio.renderers.default = 'notebook'


class PortfolioPerformance():
    def __init__(self,file=None,itvl='1d',lookback='10y'):
        self.logger = logging.getLogger(__name__)
        self.itvl = itvl
        self.lookback = lookback
        self.file = file
        self.folder = Path('cache')
        if not self.folder.exists or not self.folder.is_dir():
            self.folder.mkdir()
            
        self.stocks = self._get_stocks()
        self.data = self._fetch_data()
    
    
    def _get_stocks(self):
        if self.file is not None:
            try:
                with open(self.file,'r') as f:
                    stocks = [i.strip() for i in f.readlines()]
                    return stocks
            except FileNotFoundError:
                self.logger.error('File not found.')
        return []
    
    
    def _fetch_data(self):
        if self.stocks:
            path = Path(f'{self.folder}/ohlc.pkl')
            if not path.exists():
                self.logger.info(f'Fetching OHLC data for {len(self.stocks)} stocks... ')
                df = yf.download(self.stocks,interval=self.itvl,period=self.lookback,progress=False,auto_adjust=True)
                df.to_pickle(path)
            else:
                df = pd.read_pickle(path)

            return df
        
        return pd.DataFrame()
    
        
    def pf_return(self,startdate=None):
        if startdate is not None:
            try:
                startdate = pd.to_datetime(startdate)
            except Exception as e:
                self.logger.error(f'Invalid Date {e}.')
                
        fig = go.Figure()
        fig.update_layout(template='plotly_dark',title=dict(text='Portfolio Return',font=dict(size=17,style='italic')),font=dict(size=11),yaxis_title='Return(%)',xaxis_title='Time',height=600,width=1100)
        
        if not self.data.empty:
            if startdate is None:
                returns = self.data['Close'].pct_change().mean(axis=1).fillna(0.0)*100
            else:
                returns = self.data['Close'].loc[self.data['Close'].index >= startdate].pct_change().mean(axis=1).fillna(0.0)*100
            fig.add_trace(go.Scatter(x=returns.index,y=returns.cumsum(),mode='lines',hovertemplate='<b>%{x}</b><br>%{y:.2f}%',name='Portfolio',fill='tozeroy',fillcolor='rgba(70,140,220,0.13)',line=dict(color='rgb(70,140,220)')))
        return fig
        
        
    def corr_matrix(self,startdate=None):
        if startdate is not None:
            try:
                startdate = pd.to_datetime(startdate)
            except Exception as e:
                self.logger.error(f'Invalid Date {e}.')
                
        fig = go.Figure()
        fig.update_layout(template='plotly_dark',title=dict(text='Return Correlation',font=dict(size=17,style='italic')),font=dict(size=9),height=750,width=1100)
        
        if not self.data.empty:
            if startdate is None:
                corr = self.data['Close'].pct_change().corr()
            else:
                corr = self.data['Close'].loc[self.data['Close'].index >= startdate].pct_change().corr()
        
            fig.add_trace(go.Heatmap(z=corr,y=corr.columns,x=corr.columns,colorscale='Blues',zmid=0))
        return fig
    
    
    def adx(self,startdate=None):
        if startdate is not None:
            try:
                startdate = pd.to_datetime(startdate)
            except Exception as e:
                self.logger.error(f'Invalid Date {e}.')
                
        fig = go.Figure()
        fig.update_layout(template='plotly_dark',title=dict(text='DMX (Trend Strength)',font=dict(size=17,style='italic')),font=dict(size=9),height=600,width=1100,xaxis_title='Time')
        
        if not self.data.empty:
            if startdate is None:
                fdata = self.data.copy()
                adx = ta.adx(close=fdata['Close'].mean(axis=1),high=fdata['High'].mean(axis=1),low=fdata['Low'].mean(axis=1),length=14)['ADX_14'].dropna()
            else:
                fdata = self.data.loc[self.data.index >= startdate].copy()
                adx = ta.adx(close=fdata['Close'].mean(axis=1),high=fdata['High'].mean(axis=1),low=fdata['Low'].mean(axis=1),length=14)['ADX_14'].dropna()
            
            fig.add_trace(go.Scatter(x=adx.index,y=adx,mode='lines',line=dict(color='rgb(20,140,220)'),name='ADX',hovertemplate='<b>%{x}</b><br>%{y:.2f}'))
            fig.add_hline(adx.median()+adx.std(),line_dash='dot',line_color='lightgrey',name='median+std')
            fig.add_hline(adx.median()-adx.std(),line_dash='dot',line_color='lightgrey',name='median-std')
        return fig
    
            
app = Dash()        

app.layout = html.Div([
                    html.H2('Portfolio Insights',style={'textAlign':'center','margin':'22px'}),
                    dcc.Dropdown(id='options',options=[{'label':'Return','value':'return'},{'label':'Correlation','value':'corr'},{'label':'Squeeze','value':'squeeze'}],multi=False,value='return',style={'color':'#fff','backgroundColor':'#000','fontColor':'#000','fontSize':'14px','textAlign':'center','padding':'8px','margin':'12px'}),
                    dcc.Graph(id='graph')
])


@app.callback(
        Output('graph','figure'),
        Input('options','value')
)


def update_graph(value):
    pp = PortfolioPerformance('lists/filtered.txt')
    if value == 'return':
        figure = pp.pf_return()
    elif value == 'corr':
        figure =  pp.corr_matrix()
    elif value == 'squeeze':
        figure = pp.adx()
    else:
        figure = go.Figure()
    
    return figure


if __name__ == '__main__':
    logging.basicConfig(level=logging.INFO)
    app.run(debug=True)