In [5]:
#Imports 

import sys
import os
import yaml

# Make the project packages imported below resolvable by adding two directories,
# read from environment variables, to the module search path.
# NOTE(review): os.getenv returns None when a variable is unset, and load_dotenv
# is imported in this cell but not called in the visible cells - confirm that
# CODE_PATH and FIN_DATABASE_PATH are exported before the kernel starts.
sys.path.append(os.getenv("CODE_PATH"))
sys.path.append(os.getenv("FIN_DATABASE_PATH"))


import plotly.graph_objects as go
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import logging
import matplotlib.pyplot as plt
from dotenv import load_dotenv
from Data.connect import engine, DailyStockData, HourlyStockData, OneMinuteStockData, FiveMinuteStockData,FifteenMinuteStockData, StockSplits, StockNews, CompanyFinancials
from Pre_Processing.pre_processing import PreProcessing
from data_fetcher import DataFetcher
from Feature_Engineering.feature_engineering import TechnicalIndicators
from pipeline import Pipeline
import json
from pandas import json_normalize

### A balance sheet is a snapshot of a company's assets, liabilities, and shareholder equity at a specific point in time.
### The three main sections of a balance sheet are:
- Assets
- Liabilities
- Shareholder Equity

In [6]:
# Filing-level metadata columns that are kept as-is next to the flattened
# statement sections.
GENERAL_COLUMNS = [
    'company_name',
    'start_date',
    'end_date',
    'filing_date',
    'fiscal_period',
    'fiscal_year',
    'acceptance_datetime',
    'timeframe',
    'tickers',
    'sic',
]

# Top-level keys of the `financials` payload that get flattened into columns.
SECTIONS = [
    'balance_sheet',
    'income_statement',
    'cash_flow_statement',
    'comprehensive_income',
]


In [7]:
# Tickers analysed throughout the notebook.
tickers = [
    'AAPL',
    'MSFT',
]

In [8]:
# Fetch the company financials for the selected tickers from our SQL database.
# NOTE(review): an identical fetch is executed again in a later cell of this
# notebook; one of the two is presumably redundant - confirm before removing.
fetch_data = DataFetcher(tickers)
company_data = fetch_data.get_company_data()

In [9]:
class PreProcessingFinancials:
    """Flatten and tidy raw company financials, one DataFrame per ticker.

    Each ticker's frame is expected to carry a ``financials`` column holding a
    dict (or its JSON string) keyed by statement section, plus the
    ``filing_date``, ``fiscal_period`` and ``end_date`` columns used below.
    """

    # Days added to a period's end date to approximate a missing filing date
    # (the original author's estimate of the average 10-K/10-Q filing delay).
    FILING_LAG_DAYS = 37
    # Fiscal periods whose missing filing dates are replaced by that proxy.
    PERIODS_WITHOUT_FILING_DATE = ['TTM', 'Q4', 'FY']

    def __init__(self, data, sections, tickers):
        """
        args:
        data: dictionary mapping ticker -> DataFrame, or a single DataFrame
        sections: list with sections from a Financial Statement
        tickers: list with tickers we want to analyse (a single ticker, or a
                 one-element list, when `data` is a single DataFrame)

        raises:
        ValueError: if a single DataFrame is passed with more than one ticker.
        """
        if isinstance(data, pd.DataFrame):
            # A lone DataFrame is wrapped into the {ticker: frame} shape used
            # everywhere else. A list is unhashable, so unwrap it first.
            if isinstance(tickers, (list, tuple)):
                if len(tickers) != 1:
                    raise ValueError(
                        "A single DataFrame requires exactly one ticker, "
                        f"got {len(tickers)}."
                    )
                tickers = tickers[0]
            self.data = {tickers: data}
            self.tickers = [tickers]
        else:
            self.data = data
            self.tickers = tickers
        self.sections = sections

    @staticmethod
    def _section_columns(data, section):
        """Return the flattened columns of `section`, i.e. those prefixed '<section>_'."""
        prefix = f"{section}_"
        # Prefix match rather than substring match: a substring test would also
        # pick up another section's column that merely contains this name.
        return [col for col in data.columns if isinstance(col, str) and col.startswith(prefix)]

    @staticmethod
    def _field_name(column, section):
        """Strip the '.value' marker and the leading '<section>_' prefix from a column name."""
        name = column.replace('.value', '')
        prefix = f"{section}_"
        # Only the leading prefix is removed; replacing every occurrence would
        # mangle fields such as 'comprehensive_income_comprehensive_income_loss'.
        return name[len(prefix):] if name.startswith(prefix) else name

    def adjust_data(self, data):
        """Fill missing filing dates for TTM, Q4 and FY rows.

        The proxy is the period end date plus `FILING_LAG_DAYS` days. Rows of
        other fiscal periods, and rows that already have a filing date, are
        left untouched. `data` is modified in place and returned; its
        `filing_date` column comes back as a datetime column.
        """
        conditions = (
            data['filing_date'].isna()
            & data['fiscal_period'].isin(self.PERIODS_WITHOUT_FILING_DATE)
        )
        filing_dates = pd.to_datetime(data['filing_date'])
        proxy_dates = pd.to_datetime(data['end_date']) + timedelta(days=self.FILING_LAG_DAYS)

        # Series.mask keeps the datetime dtype; np.where on mixed inputs can
        # degrade the column to a generic object array.
        data['filing_date'] = filing_dates.mask(conditions, proxy_dates)

        return data

    def replacing_nan(self, data):
        """Fill missing `fiscal_year` values from the year of `end_date`.

        NOTE(review): the calendar year of the period end is used as an
        approximation of the fiscal year - confirm this matches the data
        provider's convention. `data` is modified in place and returned.
        """
        fiscal_year = pd.to_numeric(data['fiscal_year'], errors='coerce')
        end_year = pd.to_datetime(data['end_date'], errors='coerce').dt.year
        data['fiscal_year'] = fiscal_year.fillna(end_year)

        return data

    def flatten_json_section(self):
        """Flatten the `financials` payload of every ticker into columns.

        For each ticker: decode JSON strings, append one set of flattened
        columns per section, normalise/adjust `filing_date`, and sort by filing
        date (newest first). The result replaces `self.data[ticker]`; the frame
        that was passed in is not modified.
        """
        for ticker in self.tickers:
            # Work on a copy so the caller's frame is not mutated.
            data = self.data[ticker].copy()

            # Converting JSON strings into Python dictionaries if necessary
            data['financials'] = data['financials'].apply(
                lambda x: json.loads(x) if isinstance(x, str) else x
            )

            # Flattening each section and concatenating with the main dataframe
            flattened_sections = [
                self.flattening(data, 'financials', section) for section in self.sections
            ]
            data = pd.concat([data, *flattened_sections], axis=1)

            # Handle filing dates and fiscal periods
            data['filing_date'] = pd.to_datetime(data['filing_date'])
            data = self.adjust_data(data)

            # Sort by filing date, newest first
            data = data.sort_values(by='filing_date', ascending=False)

            self.data[ticker] = data

    def flattening(self, data, json_col, section):
        """Helper function to flatten one JSON section of the financials dataframe.

        Returns a DataFrame with one row per row of `data` (same index), whose
        columns are the flattened keys of `section` prefixed with '<section>_'.
        Rows whose payload is missing, not a dict, or lacks the section yield
        an all-NaN row.
        """
        section_data = data[json_col].apply(
            lambda x: (x.get(section) or {}) if isinstance(x, dict) else {}
        )
        flattened_section = json_normalize(section_data.tolist())  # Flatten the section
        # Restore the original index so a later concat(axis=1) aligns row by row.
        flattened_section.index = data.index
        flattened_section.columns = [f"{section}_{col}" for col in flattened_section.columns]  # Add prefix to columns

        return flattened_section

    def removing_cols(self):
        """Clean the flattened section columns of every ticker.

        Drops every '.order' and '.label' column, drops '.unit' columns that
        hold a single unique value, and converts '.value' columns to numeric
        values expressed in millions. Each section's columns end up at the end
        of the frame, in section order.
        """
        for ticker in self.tickers:
            data = self.data[ticker]
            for section in self.sections:
                section_columns = self._section_columns(data, section)
                section_df = data[section_columns].copy()

                def is_redundant(col, frame=section_df):
                    if '.order' in col or '.label' in col:
                        return True
                    # A unit column is only informative when units actually vary.
                    return '.unit' in col and frame[col].nunique() == 1

                redundant = [col for col in section_df.columns if is_redundant(col)]
                section_df = section_df.drop(columns=redundant)

                # Converting numeric values to millions
                value_columns = [col for col in section_df.columns if '.value' in col]
                for col in value_columns:
                    section_df[col] = pd.to_numeric(section_df[col]) / 1_000_000

                # Swap the original section columns for the cleaned ones
                data = pd.concat([data.drop(columns=section_columns), section_df], axis=1)

            self.data[ticker] = data

    def transform_columns(self, data):
        """Cast the general columns of `data` to their working dtypes.

        Empty strings become NaN, `fiscal_year` becomes numeric,
        `fiscal_period` categorical, and `start_date` / `end_date` datetimes
        (unparseable values become NaN/NaT). `data` is modified in place, which
        `preprocess_financials` relies on, and is also returned.
        """
        # Replacing empty strings with np.nan in the entire dataframe at once
        data.replace('', np.nan, inplace=True)

        # Convert specific columns to required types
        data['fiscal_year'] = pd.to_numeric(data['fiscal_year'], errors='coerce')
        data['fiscal_period'] = data['fiscal_period'].astype('category')
        data['start_date'] = pd.to_datetime(data['start_date'], errors='coerce')
        data['end_date'] = pd.to_datetime(data['end_date'], errors='coerce')

        return data

    def preprocess_financials(self):
        """Orchestrates the entire pre-processing of financials.

        Returns a tuple `(processed_data, combined_data)`:
        processed_data: dict ticker -> DataFrame indexed by filing date, with
            the general columns followed by (section, field) columns
        combined_data: all tickers concatenated with an outer 'ticker' index
            level, after `transform_columns` has been applied
        """
        self.flatten_json_section()
        self.removing_cols()

        processed_data = {}

        for ticker in self.tickers:
            data = self.data[ticker]

            # Extract general columns from the data
            general_df = data[GENERAL_COLUMNS].copy()

            # If 'filing_date' is not already an index, set it as the index in general_df
            if 'filing_date' in general_df.columns:
                general_df.set_index('filing_date', inplace=True)

            section_dataframes = []
            for section in self.sections:
                # Columns of the current section, indexed by filing date so
                # they line up with general_df
                section_columns = self._section_columns(data, section)
                section_df = data[['filing_date'] + section_columns].set_index('filing_date')

                # (section, field) column labels, e.g. ('balance_sheet', 'assets')
                field_names = [self._field_name(col, section) for col in section_df.columns]
                section_df.columns = pd.MultiIndex.from_product([[section], field_names])

                section_dataframes.append(section_df)

            # Concatenate all section DataFrames into one DataFrame (financial data)
            financial_data = pd.concat(section_dataframes, axis=1)

            # Concatenate general_df (with general columns) and financial_data (with sections)
            processed_data[ticker] = pd.concat([general_df, financial_data], axis=1)

        combined_data = pd.concat(
            list(processed_data.values()),
            keys=list(processed_data.keys()),
            names=['ticker'],
        )
        self.transform_columns(combined_data)

        return processed_data, combined_data

In [10]:
class CalculateMetrics:
    """Add common financial ratios as new columns of a combined financials frame.

    Line items are read from columns labelled ``(section, field)``, e.g.
    ``('income_statement', 'revenues')``. The frame passed in is modified in
    place and is the object returned by every public method.
    """

    def __init__(self, data):
        """ data is a dataframe with tickers as index."""
        self.data = data

    def _item(self, section, field):
        """Return the `(section, field)` column, or an all-NaN Series if it is absent.

        Not every company reports every line item (e.g. inventory, R&D); a
        missing one yields NaN ratios instead of a KeyError that would abort
        the whole calculation.
        """
        key = (section, field)
        if key in self.data.columns:
            return self.data[key]
        return pd.Series(np.nan, index=self.data.index, dtype='float64')

    def _income(self, field):
        """Shortcut for an income-statement line item."""
        return self._item('income_statement', field)

    def _balance(self, field):
        """Shortcut for a balance-sheet line item."""
        return self._item('balance_sheet', field)

    def profitability_ratios(self):
        """Calculates profitability ratios.

        Adds `gross_margin`, `operating_margin`, `net_profit_margin`, `ROA`
        and `ROE`, then returns the frame.
        """
        revenues = self._income('revenues')
        net_income = self._income('net_income_loss')

        # Margins are all expressed relative to revenues
        self.data['gross_margin'] = self._income('gross_profit') / revenues
        self.data['operating_margin'] = self._income('operating_income_loss') / revenues
        self.data['net_profit_margin'] = net_income / revenues

        # Returns on assets and on equity
        self.data['ROA'] = net_income / self._balance('assets')
        self.data['ROE'] = net_income / self._balance('equity')

        return self.data

    def liquidity_ratios(self):
        """Calculates liquidity ratios.

        Adds `current_ratio` and `quick_ratio`, then returns the frame.
        """
        current_assets = self._balance('current_assets')
        current_liabilities = self._balance('current_liabilities')

        # Current Ratio
        self.data['current_ratio'] = current_assets / current_liabilities

        # Quick Ratio: current assets excluding inventory
        self.data['quick_ratio'] = (current_assets - self._balance('inventory')) / current_liabilities

        return self.data

    def other_ratios(self):
        """Calculates other ratios.

        Adds `debt_to_equity`, `interest_coverage` and `R&D_ratio`, then
        returns the frame.
        """
        # Debt to Equity
        self.data['debt_to_equity'] = self._balance('liabilities') / self._balance('equity')

        # Interest Coverage
        self.data['interest_coverage'] = (
            self._income('operating_income_loss') / self._income('interest_expense_operating')
        )

        # R&D spend as a share of revenues
        self.data['R&D_ratio'] = self._income('research_and_development') / self._income('revenues')

        return self.data

    def calculate_metrics(self):
        """Orchestrates the calculation of financial metrics and returns the frame."""
        self.profitability_ratios()
        self.liquidity_ratios()
        self.other_ratios()

        return self.data
        

In [None]:
class FinancialPlots:
    """Container intended for plotting financial metrics of different tickers.

    The stated goal is to plot metrics per ticker, combined across tickers, or
    by industry; no plotting method is implemented yet. For now the class only
    stores the data and offers the `create_period` helper.
    """

    def __init__(self, data):
        """args
        data: dataframe combining multiple tickers"""
        self.data = data

    @staticmethod
    def create_period(row):
        """Return the quarterly `pd.Period` of a row, or NaN for non-quarterly rows.

        `row` must expose `fiscal_period` and `fiscal_year` by key. Rows whose
        fiscal period is not Q1-Q4, or whose fiscal year is missing, yield
        `np.nan`. Declared static so it can be used both as
        `FinancialPlots.create_period` and from an instance, e.g. as the
        function given to `DataFrame.apply(..., axis=1)`.
        """
        fiscal_period = row['fiscal_period']
        fiscal_year = row['fiscal_year']
        # int(NaN) raises, so a missing year is treated like a non-quarterly row.
        if fiscal_period not in ['Q1', 'Q2', 'Q3', 'Q4'] or pd.isna(fiscal_year):
            return np.nan

        quarter = int(fiscal_period.replace('Q', ''))
        return pd.Period(freq='Q', year=int(fiscal_year), quarter=quarter)

        

In [11]:
# Fetch the company financials for the selected tickers from our SQL database.
# NOTE(review): this repeats, statement for statement, the fetch done in an
# earlier cell - presumably to start from fresh data; confirm it is still needed.

fetch_data = DataFetcher(tickers)
company_data = fetch_data.get_company_data()

In [12]:
# Pre-process the fetched data.
# preprocess_financials returns a per-ticker dict (data_dict) and a single
# combined frame with an outer 'ticker' index level (df).
# NOTE(review): `prepocess` is presumably a typo for `preprocess`; the name is
# left unchanged in case other cells refer to it.

prepocess = PreProcessingFinancials(company_data, SECTIONS, tickers)
data_dict, df = prepocess.preprocess_financials()

In [13]:
# Calculate the financial ratios.
# CalculateMetrics keeps a reference to `df` and adds the ratio columns to it,
# so `final_data` and `df` are the same object after this cell.

metrics = CalculateMetrics(df)
final_data = metrics.calculate_metrics()

In [28]:
# Distinct values of the `timeframe` column among the AAPL rows.
final_data.xs('AAPL', level='ticker')['timeframe'].unique()

array(['ttm', 'quarterly', 'annual'], dtype=object)

In [None]:
def create_period(row):
    """Return the quarterly `pd.Period` of a row, or NaN for non-quarterly rows.

    `row` must expose `fiscal_period` and `fiscal_year` by key (a DataFrame row
    passed by `apply(..., axis=1)` does). Rows whose fiscal period is not
    Q1-Q4, or whose fiscal year is missing, yield `np.nan`.
    """
    fiscal_period = row['fiscal_period']
    fiscal_year = row['fiscal_year']
    # int(NaN) raises, so a missing year is treated like a non-quarterly row.
    if fiscal_period not in ['Q1', 'Q2', 'Q3', 'Q4'] or pd.isna(fiscal_year):
        return np.nan

    quarter = int(fiscal_period.replace('Q', ''))
    return pd.Period(freq='Q', year=int(fiscal_year), quarter=quarter)


In [None]:
# Keep only the rows that describe a single fiscal quarter; copy so that
# columns added later do not write into final_data.
quarterly_df = final_data.loc[
    final_data['fiscal_period'].isin(['Q1', 'Q2', 'Q3', 'Q4'])
].copy()

In [None]:
# Build a quarterly pd.Period for every row from its fiscal_year / fiscal_period.
quarterly_df['period'] = quarterly_df.apply(create_period, axis=1)

In [None]:
# Rebuild the column through a quarterly PeriodIndex.
# NOTE(review): presumably done to give the column a period dtype instead of
# generic objects - confirm.
quarterly_df['period'] = pd.PeriodIndex(quarterly_df['period'], freq='Q')

In [None]:
# Preview the first rows only instead of dumping the whole frame into the notebook.
quarterly_df.head()

In [None]:
# AAPL-only slice of the quarterly rows, copied so later edits stay local to it.
aapl_df = quarterly_df.xs('AAPL', level='ticker').copy()

In [None]:
# Order the quarters chronologically so the bars read left to right in time.
aapl_df = aapl_df.sort_values(by='period')

# Bar chart of ROE per quarter, drawn through the explicit Figure/Axes interface.
fig, ax = plt.subplots(figsize=(15, 6))
ax.bar(aapl_df['period'].astype(str), aapl_df['ROE'])
ax.set_title('Financial Data by Quarter')
ax.set_xlabel('Quarter')
ax.set_ylabel('Value')
# Vertical tick labels keep the quarter names from overlapping.
plt.setp(ax.get_xticklabels(), rotation=90, ha='center')
ax.grid(True)
fig.tight_layout()
plt.show()

In [None]:
# Add the quarterly period to every row of the full frame; create_period
# returns NaN for rows that are not Q1-Q4.
# NOTE(review): this writes into final_data in place, and final_data is the
# frame that was handed to CalculateMetrics, so `df` gains the column as well.
final_data['period'] = final_data.apply(create_period, axis=1)

In [None]:
# Re-derive quarterly_df by excluding the TTM and FY rows.
# NOTE(review): this overwrites the quarterly_df built earlier with
# isin(['Q1', 'Q2', 'Q3', 'Q4']). The two filters differ for rows whose
# fiscal_period is missing or has another value, and no .copy() is taken
# here - confirm which definition is intended.
quarterly_df = final_data[(final_data['fiscal_period'] != 'TTM') & (final_data['fiscal_period'] != 'FY')]


In [None]:
# Display the cash-flow-statement column
# 'net_cash_flow_from_operating_activities_continuing' for all tickers.
df[('cash_flow_statement', 'net_cash_flow_from_operating_activities_continuing')]

In [None]:
# Columns intended for plotting.
# NOTE(review): nothing is plotted here. Both lines only select a column, and a
# notebook cell renders only its last expression, so just the revenues column
# is displayed.

df[('income_statement', 'research_and_development')] # R&D expenses (evaluated, not displayed)
df[('income_statement', 'revenues')] # revenues (displayed as the cell output)

In [None]:
# Display the balance-sheet 'equity' column for all tickers.
df['balance_sheet', 'equity']