# In [None]:
import plotly.io as pio
import plotly.graph_objs as go
from plotly.colors import qualitative as pc
import pandas as pd
import numpy as np
import yfinance as yf
import logging
import time

# Make Plotly's 'notebook' renderer the default used by every fig.show() call below.
pio.renderers.default = 'notebook'


class Stockanalyzer:
    """Screen a list of tickers by valuation and chart the most discounted ones.

    Financial statements and price histories are cached as CSV files in the
    current working directory (``<ticker>_financials_<freq>.csv`` and
    ``<ticker>_<interval>.csv``); files that are missing are fetched through
    yfinance. Calling the instance runs the whole pipeline.

    Args:
        file: Path to a text file holding one ticker symbol per line.
        freq: Statement frequency, ``'quarterly'`` or ``'yearly'``.
        interval: Price bar size, ``'1d'``, ``'1wk'`` or ``'1mo'``.

    An unsupported ``freq`` or ``interval`` is stored as ``None``; every
    method that depends on it then returns ``False`` instead of running.
    """

    _FREQS = ('quarterly', 'yearly')
    _INTERVALS = ('1d', '1wk', '1mo')
    _PRICE_COLUMNS = ['Open', 'High', 'Low', 'Close', 'Volume']
    _PE_COLUMNS = ['PE_ttm', 'PE_forward', 'Discount_pct']
    # Number of price bars per year, used to annualize return statistics.
    _PERIODS_PER_YEAR = {'1d': 252, '1wk': 52, '1mo': 12}
    # Fixed annual risk-free rate (in percent) used for the Sharpe ratio.
    _RISK_FREE_RATE_PCT = 4

    def __init__(self, file=None, freq='yearly', interval='1d'):
        self.file = file
        self.logger = logging.getLogger(__name__)
        self.freq = freq if freq in self._FREQS else None
        self.interval = interval if interval in self._INTERVALS else None

        # Surface the reason why later calls will silently return False.
        if self.freq is None:
            self.logger.warning('Unsupported freq %r, expected one of %s', freq, self._FREQS)
        if self.interval is None:
            self.logger.warning('Unsupported interval %r, expected one of %s', interval, self._INTERVALS)

        self.stocks = self._load_tickers() if file else []

    def _load_tickers(self) -> list:
        """Return the ticker symbols listed in ``self.file`` (one per line).

        Returns an empty list when the file does not exist.
        """
        try:
            with open(self.file, 'r') as f:
                symbols = [line.strip() for line in f]
        except FileNotFoundError:
            self.logger.error('%s not found in cache', self.file)
            return []

        # Blank lines would otherwise become '' tickers and trigger bogus downloads.
        return [symbol for symbol in symbols if symbol]

    def _resolve_tickers(self, tickers):
        """Return the tickers to process as a list, or ``None`` if the input is invalid.

        ``None`` selects every loaded ticker; a single string is treated as one
        ticker rather than being iterated character by character.
        """
        if tickers is None:
            return list(self.stocks)
        if isinstance(tickers, str):
            return [tickers]
        try:
            return list(tickers)
        except TypeError:
            self.logger.error('Input has to be a type series or list')
            return None

    @staticmethod
    def _is_cached(*paths) -> bool:
        """Return True when every path is an existing, non-empty CSV file."""
        try:
            for path in paths:
                # Reading only the header is enough to prove the file is usable.
                pd.read_csv(path, nrows=0)
        except (FileNotFoundError, pd.errors.EmptyDataError):
            return False
        return True

    @classmethod
    def _normalize_prices(cls, dft):
        """Return ``dft`` reduced to flat Open/High/Low/Close/Volume columns.

        Columns are selected by name instead of being relabelled by position,
        because the column order (and whether the columns are a MultiIndex
        carrying the ticker) differs between yfinance releases.

        Raises:
            ValueError: If the frame is empty or lacks one of the columns.
        """
        if dft is None or dft.empty:
            raise ValueError('no price data returned')

        columns = dft.columns
        if isinstance(columns, pd.MultiIndex):
            # Keep the level that carries the price field names.
            for level in range(columns.nlevels):
                labels = columns.get_level_values(level)
                if 'Close' in labels:
                    dft = dft.copy()
                    dft.columns = labels
                    break

        missing = [name for name in cls._PRICE_COLUMNS if name not in dft.columns]
        if missing:
            raise ValueError(f'price data lacks columns: {missing}')
        return dft[cls._PRICE_COLUMNS]

    def _download_financials(self, ticker):
        """Fetch cash-flow and income statements and label columns by period.

        Raises:
            ValueError: If yfinance returned no statement data.
        """
        stock = yf.Ticker(ticker)
        cf = stock.get_cashflow(freq=self.freq)
        inc = stock.get_incomestmt(freq=self.freq)
        dff = pd.concat([cf, inc], join='inner', axis=0)

        # An empty frame has no datetime columns to convert below.
        if dff.empty:
            raise ValueError('no financial statements returned')

        if self.freq == 'quarterly':
            dff.columns = dff.columns.to_period('Q').astype('str')
        else:
            dff.columns = dff.columns.year.astype('str')
        return dff

    def _load_data(self) -> bool:
        """Make sure statements and prices of every ticker are cached as CSV.

        Tickers whose data cannot be fetched are skipped with a warning.

        Returns:
            False when there are no tickers or the configuration is invalid,
            True otherwise.
        """
        if not self.stocks or self.freq is None or self.interval is None:
            return False

        for ticker in self.stocks:
            pathf = f'{ticker}_financials_{self.freq}.csv'
            patht = f'{ticker}_{self.interval}.csv'

            if self._is_cached(pathf, patht):
                self.logger.info('%s found in cache', ticker)
                continue

            try:
                dff = self._download_financials(ticker)
                dft = self._normalize_prices(
                    yf.download(ticker, interval=self.interval, period='max',
                                auto_adjust=True, progress=False)
                )
            except (ValueError, KeyError):
                self.logger.warning('%s not available', ticker)
                continue

            dff.to_csv(pathf)
            dft.to_csv(patht)
            time.sleep(0.1)  # small pause between tickers to go easy on the API

        return True

    def _plot_profitability(self, ticker, df):
        """Show margin bars and the revenue growth line for one ticker."""
        colors = pc.Plotly
        fig = go.Figure()

        for j, i in enumerate(df.index.drop('TotalRevenue')):
            fig.add_trace(
                go.Bar(
                    x=df.columns,
                    y=df.loc[i] / df.loc['TotalRevenue'] * 100,
                    name=i + 'Margin',
                    marker=dict(color=colors[j], line=dict(color='#fff', width=2)),
                    hovertext=i + 'Margin',
                    hovertemplate='<b>%{x}</b><br>%{y:,.0f}%'
                )
            )

        fig.add_trace(
            go.Scatter(
                x=df.columns,
                y=(df.loc['TotalRevenue'].pct_change().fillna(0.0) * 100),
                name='RevenueSlope',
                mode='lines+markers',
                line=dict(color=colors[2], width=3, shape='spline'),
                marker=dict(size=11, symbol='diamond', color=colors[2], line=dict(color='#fff', width=2)),
                hovertext='ChangeInRevenue', hovertemplate='<b>%{x}</b><br>%{y:.2}%'
            )
        )

        fig.update_layout(
            plot_bgcolor='#000',
            paper_bgcolor='#000',
            xaxis=dict(gridcolor='#000'),
            yaxis=dict(gridcolor='#444', title='%'),
            font=dict(color='#fff', size=12),
            title=dict(text=f'{ticker} Profitability & Momentum', font=dict(weight='bold', size=16))
        )

        fig.show()

    def get_profitability(self, tickers=None) -> bool:
        """Plot net-income and free-cash-flow margins plus revenue growth.

        Args:
            tickers: Iterable of tickers (or a single ticker string); defaults
                to all loaded tickers.

        Returns:
            False when there is nothing to do or ``tickers`` is not iterable,
            True otherwise. Tickers without cached statements are skipped.
        """
        if not self.stocks or self.freq is None:
            return False

        stocks = self._resolve_tickers(tickers)
        if stocks is None:
            return False

        for ticker in stocks:
            try:
                df = pd.read_csv(f'{ticker}_financials_{self.freq}.csv', index_col=0)
                # Period labels ('2023' / '2023Q4') sort chronologically as text,
                # so the x axis runs oldest to newest whatever the cache order is.
                df = df.loc[['TotalRevenue', 'NetIncome', 'FreeCashFlow']].dropna(axis=1).sort_index(axis=1)
            except (FileNotFoundError, KeyError):
                self.logger.warning('%s not found in cache', ticker)
                continue

            if df.empty:
                self.logger.warning('%s has no complete profitability data', ticker)
                continue

            self._plot_profitability(ticker, df)

        return True

    @staticmethod
    def _ttm_eps(eps, quarterly):
        """Return the latest annual diluted EPS, or ``None`` if data is missing.

        Args:
            eps: Series of diluted EPS indexed by period label.
            quarterly: True to sum the four latest quarters, False to take the
                latest fiscal year.
        """
        # Sort by period label so "latest" does not depend on the file's column order.
        eps = pd.to_numeric(eps, errors='coerce').dropna().sort_index()
        periods = 4 if quarterly else 1
        if len(eps) < periods:
            return None
        return float(eps.iloc[-periods:].sum())

    @staticmethod
    def _pe_metrics(price, eps_ttm, fpe):
        """Return ``(PE_ttm, PE_forward, Discount_pct)`` for one ticker.

        A ratio that is not strictly positive is reported as NaN, and the
        discount is only computed when both ratios are meaningful.
        """
        pe = round(price / eps_ttm, 2) if eps_ttm > 0 else np.nan
        pe_ttm = pe if pe > 0 else np.nan
        pe_fwd = round(fpe, 2) if fpe > 0 else np.nan

        # Comparisons with NaN are False, so invalid ratios fall through to NaN.
        if pe_ttm > 0 and pe_fwd > 0:
            discount = round((1 - fpe / pe_ttm) * 100, 2)
        else:
            discount = np.nan
        return pe_ttm, pe_fwd, discount

    def _fetch_forward_pe(self, ticker):
        """Return the forward P/E reported by yfinance, or ``None`` if unavailable."""
        self.logger.info('Fetching forward PE of %s', ticker)
        try:
            fpe = float(yf.Ticker(ticker).info['forwardPE'])
        except (ValueError, KeyError, TypeError):
            # Missing key, None, or a non-numeric value.
            fpe = np.nan
        else:
            time.sleep(0.1)

        if not np.isfinite(fpe):
            self.logger.warning('No Forward PE of %s available', ticker)
            return None
        return fpe

    def get_pricetoearnings(self):
        """Rank tickers by the discount of forward P/E against trailing P/E.

        Prints the ranking table and returns the top quartile of the tickers
        for which all three metrics are available.

        Returns:
            False when the configuration is invalid, otherwise a (possibly
            empty) list of ticker symbols.
        """
        if not self.stocks or self.freq is None or self.interval is None:
            return False

        quarterly = self.freq == 'quarterly'
        symbols, rows = [], []

        for ticker in self.stocks:
            # Read the local cache first so no request is spent on tickers
            # that cannot be valued anyway.
            try:
                financials = pd.read_csv(f'{ticker}_financials_{self.freq}.csv', index_col=0)
                prices = pd.read_csv(f'{ticker}_{self.interval}.csv', index_col=0, parse_dates=True)
                close = prices['Close'].dropna()
            except (FileNotFoundError, KeyError):
                self.logger.warning('No sufficient data for %s to calculate P/E', ticker)
                continue

            if close.empty:
                self.logger.warning('No sufficient data for %s to calculate P/E', ticker)
                continue

            try:
                deps_ttm = self._ttm_eps(financials.loc['DilutedEPS'], quarterly)
            except KeyError:
                deps_ttm = None
            if deps_ttm is None:
                self.logger.warning('No Diluted EPS for %s available', ticker)
                continue

            fpe = self._fetch_forward_pe(ticker)
            if fpe is None:
                continue

            symbols.append(ticker)
            rows.append(self._pe_metrics(close.iloc[-1], deps_ttm, fpe))

        # Fixed columns keep the sort below valid even when no ticker qualified.
        peratio = pd.DataFrame(rows, index=symbols, columns=self._PE_COLUMNS, dtype=float)
        peratio.sort_values(by='Discount_pct', ascending=False, inplace=True)

        ranked = peratio.dropna()
        dc = list(ranked.index[:int(len(ranked) * 0.25)])
        print(peratio)

        return dc

    def _portfolio_stats(self, prices):
        """Return ``(cumulative, mean, std, sharpe)`` of an equal-weight portfolio.

        Every price series is rebased to 1 at the first row before averaging,
        so each ticker starts with the same weight regardless of its price
        level. ``cumulative`` is the return since the first row in percent;
        ``mean`` and ``std`` are annualized and in percent.

        Returns ``None`` when fewer than two rows are available.
        """
        if len(prices) < 2:
            return None

        value = (prices / prices.iloc[0]).mean(axis=1)
        cumulative = (value.iloc[1:] / value.iloc[0] - 1) * 100
        periodic = value.pct_change().dropna() * 100

        periods = self._PERIODS_PER_YEAR[self.interval]
        mean = periodic.mean() * periods
        std = periodic.std() * np.sqrt(periods)
        # A zero or undefined volatility has no meaningful Sharpe ratio.
        sharpe = round((mean - self._RISK_FREE_RATE_PCT) / std, 2) if std > 0 else np.nan
        return cumulative, mean, std, sharpe

    def _plot_performance(self, pf_abs_return):
        """Show the cumulative portfolio return as a filled line chart."""
        fig = go.Figure()
        fig.add_trace(
            go.Scatter(
                x=pf_abs_return.index,
                y=pf_abs_return,
                mode='lines',
                line=dict(color='royalblue', width=1),
                fill='tozeroy',
                hovertemplate='<b>%{x}</b><br>%{y:,.0f}%',
                name='Return'
            )
        )

        fig.update_layout(
            paper_bgcolor='#000',
            plot_bgcolor='#000',
            xaxis=dict(gridcolor='#444'),
            yaxis=dict(gridcolor='#444', title='Return(%)'),
            font=dict(color='#fff', size=12),
            title=dict(font=dict(size=16, weight='bold'), text='Historic Equal-Weight Porfolio Return')
        )

        fig.show()

    def get_historic_performance(self, startdate=None, tickers=None) -> bool:
        """Plot and summarize the historic return of an equal-weight portfolio.

        Args:
            startdate: Optional first date to include (anything accepted by
                ``pandas.to_datetime``).
            tickers: Iterable of tickers (or a single ticker string); defaults
                to all loaded tickers.

        Returns:
            True when the chart and statistics were produced, False when the
            input is invalid or no usable price data is cached.
        """
        if not self.stocks or self.interval is None:
            return False

        stocks = self._resolve_tickers(tickers)
        if stocks is None:
            return False

        if startdate is not None:
            try:
                startdate = pd.to_datetime(startdate)
            except (ValueError, TypeError, OverflowError) as e:
                self.logger.error('Invalid Startdate: %s', e)
                return False

        pf = []
        for ticker in stocks:
            try:
                df = pd.read_csv(f'{ticker}_{self.interval}.csv', index_col=0, parse_dates=True)
                close = df['Close']
            except (FileNotFoundError, KeyError):
                self.logger.warning('%s data not found in cache', ticker)
                continue

            if startdate is not None:
                try:
                    close = close.loc[close.index >= startdate]
                except TypeError as e:
                    # The cached index is not comparable with the start date.
                    self.logger.error('Invalid Startdate: %s', e)
                    return False

            pf.append(close.rename(ticker))

        if not pf:
            self.logger.error('No price data available for the requested tickers')
            return False

        # Keep only dates on which every ticker has a price.
        pf_df = pd.concat(pf, join='inner', axis=1).dropna()
        stats = self._portfolio_stats(pf_df)
        if stats is None:
            self.logger.error('Not enough overlapping price data to compute returns')
            return False

        pf_abs_return, mean, std, sr = stats
        self._plot_performance(pf_abs_return)

        print('\nAnnualized Performance:')
        print('='*35)
        # The bracket is the one-sigma band around the expected return.
        print(f'E(x): {round(mean, 2)}%, [{mean - std:.0f};{mean + std:.0f}]%')
        print(f'sigma(x): {round(std, 2)}%')
        print(f'Sharp Ratio: {sr}\n')

        return True

    def __call__(self):
        """Run the pipeline: cache data, rank by P/E, then chart the top picks."""
        if not self._load_data():
            return

        dc = self.get_pricetoearnings()
        if not dc:
            # Nothing was selected, so there is no portfolio to chart.
            self.logger.warning('No discounted tickers found, skipping charts')
            return

        self.get_historic_performance('2021-01-01', dc)
        self.get_profitability(dc)
    
if __name__ == '__main__':
    # Script entry point: show warnings and errors only, then screen the
    # tickers listed in DJI.txt using quarterly statements and weekly bars.
    logging.basicConfig(level=logging.WARNING)
    pf = Stockanalyzer(file='DJI.txt', freq='quarterly', interval='1wk')
    pf()