# Plotly Dash and Financial Modeling Prep API Tutorial

## Financial Analysis Companion

In [None]:
import dash
from dash import dcc, html, Dash, Input, Output, State, ctx
import plotly.graph_objs as go
import pandas as pd
import requests
from datetime import datetime, timedelta
import dash_bootstrap_components as dbc
from pmdarima import auto_arima
import numpy as np
from sklearn.metrics import mean_absolute_error
from itertools import count


# Financial Modeling Prep API key; replace the placeholder before running.
api_key = 'YOUR_API_KEY'
# Initialize Dash app with Bootstrap for better styling
app = Dash(__name__, external_stylesheets=[dbc.themes.BOOTSTRAP])

# Define default start and end dates for the date picker
# (a 180-day window ending at the moment this cell is executed)
default_end_date = datetime.now()
default_start_date = default_end_date - timedelta(days=180)
# Counter for generating unique IDs
# (each portfolio row and its delete button share one id drawn from it)
unique_id_generator = count(start=0)

def fetch_stock_list():
    """Fetch the symbol list from the FMP ``stock/list`` endpoint.

    Returns:
        A list of ``{'label': symbol, 'value': symbol}`` dicts suitable for a
        ``dcc.Dropdown``. An empty list is returned when the request fails,
        times out, answers with a non-200 status, or the body is not a JSON
        list, so that building the layout never raises.
    """
    url = f'https://financialmodelingprep.com/api/v3/stock/list?apikey={api_key}'
    try:
        # Without a timeout a stalled connection would block app start-up.
        response = requests.get(url, timeout=30)
    except requests.RequestException:
        return []
    if response.status_code != 200:
        return []
    try:
        data = response.json()
    except ValueError:
        return []
    if not isinstance(data, list):
        # Error payloads arrive as a JSON object rather than a list.
        return []
    return [
        {'label': stock['symbol'], 'value': stock['symbol']}
        for stock in data
        if isinstance(stock, dict) and stock.get('symbol')
    ]
    
# Fetch the list of stocks when initializing the app
# (one request at start-up; the same options feed every symbol dropdown below)
stock_options = fetch_stock_list()

# Define app layout
# Three sections, top to bottom: stock comparison/forecast, portfolio builder,
# and company financial statements.
app.layout = dbc.Container([
    html.H1('Financial Analysis Companion'),
    html.H3('Compare Stocks Performance and Forecasting'),
    # Row 1: multi-select of symbols plus the date range used for price history
    dbc.Row([
        dbc.Col([
            dcc.Dropdown(
                id='stock-selector',
                options=stock_options,  # Set the options to the list of stocks
                value=[],  # Default value can be empty or some default stocks
                multi=True,
                placeholder="Select a stock"
            )
        ], width=6),
        dbc.Col([
            dcc.DatePickerRange(
                id='date-picker',
                start_date=default_start_date,
                end_date=default_end_date
            )
        ], width=6)
    ]),
    # Row 2: button whose click triggers the forecast in the graph callback
    dbc.Row([
        dbc.Col([
            html.Button('Forecast', id='forecast-button', n_clicks=0),
        ], width=12)
    ]),
    # Row 3: price/forecast graph and a loading indicator placeholder
    dbc.Row([
        dbc.Col([
            dcc.Graph(id='stock-graph'),
            dcc.Loading(
                id="loading-1",
                type="default",
                children=html.Div(id="loading-output-1")
            )
        ])
    ]),
    html.H3('Build Your Portfolio'),
    # Portfolio inputs: a single symbol and its weight (displayed as a percentage)
    dbc.Row([
        dbc.Col([
            dcc.Dropdown(
                id='stock-input',
                options=stock_options,  # Set the options to the list of stocks
                value='',
                placeholder="Select a stock"
            ),
        ], width=6),
        dbc.Col([
            dcc.Input(id='weight-input', type='number', placeholder='Enter stock weight'),
        ], width=6)
    ]),
    html.Button('Add to Portfolio', id='add-stock-button', n_clicks=0),
    # 'portfolio-list' holds one generated row per holding; 'portfolio-message'
    # is a placeholder for validation feedback.
    html.Div(id='portfolio-list', children=[]),
    html.Div(id='portfolio-message', children=''),
    dcc.Graph(id='portfolio-performance'),
    html.H3('Company Financials'),
    dcc.Dropdown(
            id='company-selector',
            options=stock_options,  # Set the options to the list of stocks
            value='',
            placeholder="Select a stock"
        ),
    # The option values double as the API path segment used when fetching reports
    dcc.Dropdown(
        id='report-type-selector',
        options=[
            {'label': 'Income Statement', 'value': 'income-statement'},
            {'label': 'Balance Sheet', 'value': 'balance-sheet-statement'},
            {'label': 'Cash Flow Statement', 'value': 'cash-flow-statement'}
        ],
        value='income-statement',
        placeholder="Select a report type"
    ),
    dcc.Graph(id='financial-report-graph')
], fluid=True)
    

# Function to fetch and process data
def fetch_data(stock, start_date, end_date):
    """Download daily historical prices for one symbol.

    Args:
        stock: Ticker symbol inserted into the request path.
        start_date: First day of the window; anything ``pd.to_datetime``
            accepts (string, ``datetime``, ``Timestamp``).
        end_date: Last day of the window, same accepted types.

    Returns:
        A DataFrame built from the response's ``historical`` records with the
        ``date`` column parsed to datetimes. An empty DataFrame is returned
        when an argument is missing, the request fails or times out, the
        status is not 200, or the body carries no ``historical`` records.
    """
    # A cleared dropdown or date picker yields empty/None values; skip the call.
    if not stock or pd.isna(start_date) or pd.isna(end_date):
        return pd.DataFrame()

    formatted_start_date = pd.to_datetime(start_date).strftime('%Y-%m-%d')
    formatted_end_date = pd.to_datetime(end_date).strftime('%Y-%m-%d')
    url = f'https://financialmodelingprep.com/api/v3/historical-price-full/{stock}?from={formatted_start_date}&to={formatted_end_date}&apikey={api_key}'
    try:
        response = requests.get(url, timeout=30)
    except requests.RequestException:
        return pd.DataFrame()
    if response.status_code != 200:
        return pd.DataFrame()  # Return an empty DataFrame in case of an error
    try:
        data = response.json()
    except ValueError:
        return pd.DataFrame()

    # A 200 response may still lack the 'historical' key (e.g. an empty object),
    # which previously surfaced as a KeyError.
    historical = data.get('historical') if isinstance(data, dict) else None
    if not historical:
        return pd.DataFrame()

    df = pd.DataFrame(historical)
    df['date'] = pd.to_datetime(df['date'])
    return df


def fetch_financial_data(company, report_type):
    """Fetch one financial statement series for a company.

    Args:
        company: Ticker symbol inserted into the request path.
        report_type: API path segment naming the statement, e.g.
            ``'income-statement'``.

    Returns:
        The decoded JSON body on success. An empty dict is returned when an
        argument is missing, the request fails or times out, the status is
        not 200, or the body is not valid JSON.
    """
    # The callback also fires with an empty selection; avoid a pointless request.
    if not company or not report_type:
        return {}

    url = f'https://financialmodelingprep.com/api/v3/{report_type}/{company}?apikey={api_key}'
    try:
        response = requests.get(url, timeout=30)
    except requests.RequestException:
        return {}
    if response.status_code != 200:
        return {}
    try:
        return response.json()
    except ValueError:
        return {}


# Function to split data into training and test sets
def split_data(df, test_size=30):
    """Split a price frame chronologically into train and test parts.

    Args:
        df: DataFrame with a ``date`` column; it is sorted by that column
            before splitting and the input is not modified.
        test_size: Number of most recent rows to hold out.

    Returns:
        ``(train, test)`` where ``test`` holds the last ``test_size`` rows
        and ``train`` everything before them. If the frame has no more than
        ``test_size`` rows, ``train`` is empty and ``test`` is the whole frame.

    Raises:
        ValueError: If ``test_size`` is negative.
    """
    if test_size < 0:
        raise ValueError('test_size must be non-negative')
    ordered = df.sort_values(by='date')
    # Use an explicit split position: slicing with -test_size breaks for
    # test_size == 0, because ordered[:-0] is empty instead of the full frame.
    split_at = max(len(ordered) - test_size, 0)
    train = ordered.iloc[:split_at]
    test = ordered.iloc[split_at:]
    return train, test

# Function to perform forecasting and return predictions
def perform_forecast(train, test, future_periods=30):
    """Fit a non-seasonal ARIMA on the training closes and forecast ahead.

    Args:
        train: DataFrame with ``date`` and ``close`` columns used for fitting.
        test: Held-out rows; only its length is used, to extend the horizon.
        future_periods: Extra periods to forecast past the test window.

    Returns:
        ``(train_pred, future_forecast, future_dates)``: in-sample predictions,
        the forecast for ``len(test) + future_periods`` steps, and a
        DatetimeIndex of the same length starting the day after the last
        training date.
    """
    # auto_arima returns the selected model already fitted on the series, so
    # the former second fit() call only repeated that work.
    model = auto_arima(train['close'], seasonal=False, error_action='ignore', suppress_warnings=True)

    # Predict on training data
    train_pred = model.predict_in_sample()

    # Forecast future data
    horizon = len(test) + future_periods
    future_forecast = model.predict(n_periods=horizon)

    # Generating future dates
    # The `closed` argument of date_range was removed in pandas 2.0; dropping
    # the first generated date gives the same "exclude the start" result on
    # every pandas version.
    # NOTE(review): these are consecutive calendar days, while price rows are
    # presumably trading days only -- confirm whether that offset is acceptable.
    last_date = train['date'].iloc[-1]
    future_dates = pd.date_range(start=last_date, periods=horizon + 1)[1:]

    return train_pred, future_forecast, future_dates

# Function to evaluate model performance
def evaluate_model(test, predictions):
    """Return the mean absolute error of the forecast over the test window.

    Args:
        test: DataFrame whose ``close`` column holds the observed prices.
        predictions: Forecast values; only the first ``len(test)`` are scored.
    """
    observed = test['close']
    scored = predictions[:len(test)]
    return mean_absolute_error(observed, scored)

# Callback to update graph with forecasting
@app.callback(
    Output('stock-graph', 'figure'),
    [Input('stock-selector', 'value'),
     Input('date-picker', 'start_date'),
     Input('date-picker', 'end_date'),
     Input('forecast-button', 'n_clicks')],
    prevent_initial_call=True
)
def update_graph_with_forecast(selected_stocks, start_date, end_date, n_clicks):
    """Redraw the price graph and, on a Forecast click, add forecasts.

    Args:
        selected_stocks: Symbols chosen in the multi-select dropdown.
        start_date: Start of the history window from the date picker.
        end_date: End of the history window from the date picker.
        n_clicks: Click count of the Forecast button (only used as a trigger).

    Returns:
        A figure dict with one historical trace per symbol that returned data.
        When the Forecast button triggered the callback, each symbol with
        enough history also gets a dotted forecast trace and an MAE annotation.
    """
    forecast_requested = ctx.triggered_id == 'forecast-button'

    traces = []
    layout = {
        'title': 'Stock Data and Forecast',
        'xaxis': {'title': 'Date'},
        'yaxis': {'title': 'Price'},
        'annotations': []
    }

    base_y_position = 1.05  # Starting y position for the first MAE annotation
    y_step = 0.05  # Vertical spacing between annotations

    # Cleared inputs arrive as None/empty; show an empty chart instead of failing.
    if not selected_stocks or not start_date or not end_date:
        return {'data': traces, 'layout': layout}

    for stock in selected_stocks:
        df = fetch_data(stock, start_date, end_date)
        if df.empty:
            continue

        historical_trace = go.Scatter(x=df['date'], y=df['close'], mode='lines', name=f'{stock} - Historical')
        traces.append(historical_trace)

        if not forecast_requested:
            continue

        train, test = split_data(df)
        # A window shorter than the hold-out leaves nothing to fit or score on.
        if train.empty or test.empty:
            continue

        train_pred, future_forecast, future_dates = perform_forecast(train, test)
        mae = evaluate_model(test, future_forecast)

        forecast_trace = go.Scatter(x=future_dates, y=future_forecast, mode='lines', name=f'{stock} - Forecast', line={'dash': 'dot'})
        traces.append(forecast_trace)

        # Stack annotations by how many were actually emitted so that a
        # skipped symbol does not leave a gap.
        y_position = base_y_position - (len(layout['annotations']) * y_step)

        layout['annotations'].append(
            dict(
                xref='paper', yref='paper', x=0.5, y=y_position,
                xanchor='center', yanchor='bottom',
                text=f'{stock} MAE: {mae:.2f}',
                showarrow=False,
                font=dict(color="blue", size=12)
            )
        )

    figure = {'data': traces, 'layout': layout}
    return figure


@app.callback(
    Output('stock-selector', 'value'),
    [Input('stock-selector', 'value')]
)
def limit_stock_selection(selected_stocks):
    """Cap the stock selector at two symbols, keeping the earliest picks."""
    max_stocks = 2
    within_limit = len(selected_stocks) <= max_stocks
    # Hand the selection back untouched unless it exceeds the cap.
    return selected_stocks if within_limit else selected_stocks[:max_stocks]


def align_data(historical_data):
    """Trim every frame to the index range that all frames have in common.

    Args:
        historical_data: Mapping of symbol to DataFrame.

    Returns:
        A new mapping with each frame restricted to rows whose index lies
        between the latest first index and the earliest last index across all
        frames. An empty dict is returned when the mapping is empty or any
        frame is empty.
    """
    if not historical_data:
        return {}

    frames = list(historical_data.values())
    for frame in frames:
        if frame.empty:
            return {}

    # The shared window starts where the last frame starts and ends where the
    # first frame ends.
    window_start = max(frame.index.min() for frame in frames)
    window_end = min(frame.index.max() for frame in frames)

    aligned_data = {}
    for symbol, frame in historical_data.items():
        inside_window = (frame.index >= window_start) & (frame.index <= window_end)
        aligned_data[symbol] = frame[inside_window]
    return aligned_data



def calculate_portfolio_volatility(portfolio_return, weights):
    """Return the annualized volatility of a weighted portfolio.

    Args:
        portfolio_return: DataFrame with one column of periodic returns per
            asset.
        weights: Asset weights, in the same order as the columns.

    Returns:
        The square root of ``w' * Cov * w`` scaled by ``sqrt(252)``.
    """
    periods_per_year = 252  # scaling factor applied to annualize

    cov = portfolio_return.cov()
    # Quadratic form w' * Cov * w, evaluated right-to-left.
    cov_times_weights = np.dot(cov, weights)
    variance = np.dot(weights, cov_times_weights)

    per_period_volatility = np.sqrt(variance)
    return per_period_volatility * np.sqrt(periods_per_year)


# Function to generate a portfolio item with a delete button
def generate_portfolio_item(symbol, weight, unique_id):
    """Build the row shown for one holding in the portfolio list.

    Args:
        symbol: Ticker symbol; displayed upper-cased.
        weight: Weight shown as a percentage after the symbol.
        unique_id: Index shared by the row id and its delete-button id.

    Returns:
        An ``html.Div`` holding a text span and a Delete button.
    """
    row_id = {'type': 'portfolio-item', 'index': unique_id}
    button_id = {'type': 'delete-stock-button', 'index': unique_id}

    label = html.Span(f'{symbol.upper()}: {weight}%')
    delete_button = html.Button('Delete', id=button_id, n_clicks=0)
    return html.Div([label, delete_button], id=row_id)

# Callback for adding and deleting stocks
@app.callback(
    [Output('portfolio-list', 'children'),
     Output('portfolio-message', 'children')],
    [Input('add-stock-button', 'n_clicks'),
     Input({'type': 'delete-stock-button', 'index': dash.dependencies.ALL}, 'n_clicks')],
    [State('stock-input', 'value'),
     State('weight-input', 'value'),
     State('portfolio-list', 'children')]
)
def update_portfolio(add_n_clicks, delete_n_clicks, stock_symbol, weight, portfolio_children):
    """Add a holding to, or delete a holding from, the portfolio list.

    Args:
        add_n_clicks: Click count of the "Add to Portfolio" button.
        delete_n_clicks: Click counts of all delete buttons (trigger only).
        stock_symbol: Symbol currently chosen in the portfolio dropdown.
        weight: Weight currently entered, as a percentage.
        portfolio_children: Current rows of the portfolio list.

    Returns:
        ``(rows, message)``: the updated list of rows and the text for the
        ``portfolio-message`` element (empty unless the weight limit of 100%
        would be exceeded).
    """
    items = list(portfolio_children or [])
    message = ''
    # `ctx` is the name imported from dash; the former `callback_context`
    # reference was never imported and raised NameError.
    trigger = ctx.triggered_id

    if trigger == 'add-stock-button':
        if add_n_clicks and stock_symbol and weight:
            # Each row's first child is the "<SYMBOL>: <weight>%" span.
            current_total_weight = sum(
                float(child['props']['children'][0]['props']['children'].split(': ')[1].strip('%'))
                for child in items
            )

            if current_total_weight + float(weight) > 100:
                message = 'Total portfolio weight cannot exceed 100%'
            else:
                unique_id = next(unique_id_generator)
                items.append(generate_portfolio_item(stock_symbol, weight, unique_id))

    # Check if any delete button was clicked
    elif isinstance(trigger, dict) and trigger.get('type') == 'delete-stock-button':
        # Rendering a new delete button can also trigger this callback with
        # n_clicks == 0; only a real click may remove a row.
        if ctx.triggered[0]['value']:
            # Read the index from the structured id rather than slicing one
            # character out of the serialized id, which broke for ids >= 10.
            delete_index = trigger['index']
            items = [child for child in items if child['props']['id']['index'] != delete_index]

    return items, message


@app.callback(
    Output('portfolio-performance', 'figure'),
    [Input('portfolio-list', 'children')]
)
def display_portfolio_performance(portfolio_children):
    """Plot the cumulative return of the current portfolio.

    Args:
        portfolio_children: Rows of the portfolio list; each row's first child
            is a span whose text reads ``"<SYMBOL>: <weight>%"``.

    Returns:
        A figure with the cumulative portfolio return over the default date
        window and the annualized volatility in the title. An empty figure is
        returned when the portfolio is empty, the weights do not sum to a
        positive number, or price history is missing for any holding.
    """
    if not portfolio_children:
        return go.Figure()  # Return an empty figure if no children

    # Merge repeated symbols so every symbol maps to exactly one return column.
    weight_by_symbol = {}
    for child in portfolio_children:
        # child is a dictionary representing a Dash component
        portfolio_item = child['props']['children'][0]['props']['children']  # Getting the text content
        symbol, weight = portfolio_item.split(': ')
        weight_by_symbol[symbol] = weight_by_symbol.get(symbol, 0.0) + float(weight.strip('%')) / 100

    symbols = list(weight_by_symbol)
    weights = np.array([weight_by_symbol[symbol] for symbol in symbols])

    # Normalize weights to sum to 1
    total_weight = weights.sum()
    if total_weight <= 0:
        return go.Figure()
    weights = weights / total_weight

    historical_data = {}
    for symbol in symbols:
        df = fetch_data(symbol, default_start_date, default_end_date)
        if not df.empty:
            # Index by date in ascending order so that alignment compares
            # dates (not row positions) and pct_change runs forward in time.
            df = df.set_index('date').sort_index()
        historical_data[symbol] = df

    historical_data = align_data(historical_data)
    if not historical_data:
        # align_data returns {} when any holding has no data.
        return go.Figure()

    asset_returns = pd.DataFrame({
        f'{symbol}_return': historical_data[symbol]['close'].pct_change()
        for symbol in symbols
    })
    portfolio_daily_return = asset_returns.dot(weights)
    portfolio_cumulative_return = (1 + portfolio_daily_return).cumprod() - 1

    # Calculate annualized volatility
    annualized_volatility = calculate_portfolio_volatility(asset_returns, weights)

    fig = go.Figure()
    fig.add_trace(go.Scatter(x=portfolio_cumulative_return.index, y=portfolio_cumulative_return, mode='lines', name='Portfolio Cumulative Return'))
    fig.update_layout(title=f'Portfolio Performance (Annualized Volatility: {annualized_volatility:.2%})', xaxis_title='Date', yaxis_title='Cumulative Returns')

    return fig

@app.callback(
    Output('financial-report-graph', 'figure'),
    [Input('company-selector', 'value'),
     Input('report-type-selector', 'value')]
)
def update_financial_report_graph(company, report_type):
    """Plot three headline metrics of the selected financial statement.

    Args:
        company: Symbol chosen in the company dropdown (may be empty or None).
        report_type: Value of the report-type dropdown, also used as the API
            path segment.

    Returns:
        A line chart of the metrics configured for ``report_type`` over the
        report dates, or an empty figure when nothing is selected or no
        usable data came back.
    """
    # Cleared dropdowns deliver ''/None, which would break the title below.
    if not company or not report_type:
        return go.Figure()

    financial_data = fetch_financial_data(company, report_type)

    # Only a non-empty list of records is plottable; anything else (empty
    # result, error object) yields an empty chart.
    if not financial_data or not isinstance(financial_data, list):
        return go.Figure()

    # Convert the data to a pandas DataFrame
    df = pd.DataFrame(financial_data)
    if 'date' not in df.columns:
        return go.Figure()
    df['date'] = pd.to_datetime(df['date'])
    df = df.sort_values(by='date')

    # (column, legend label) pairs plotted for each report type
    metrics_by_report = {
        'income-statement': [
            ('revenue', 'Revenue'),
            ('netIncome', 'Net Income'),
            ('grossProfit', 'Gross Profit'),
        ],
        'balance-sheet-statement': [
            ('totalAssets', 'Total Assets'),
            ('totalLiabilities', 'Total Liabilities'),
            ('totalStockholdersEquity', 'Total Stockholders’ Equity'),
        ],
        'cash-flow-statement': [
            ('netCashProvidedByOperatingActivities', 'Net Cash from Operating Activities'),
            ('capitalExpenditure', 'Capital Expenditure'),
            ('freeCashFlow', 'Free Cash Flow'),
        ],
    }

    # Initialize figure
    fig = go.Figure()
    for column, label in metrics_by_report.get(report_type, []):
        # Skip a metric the response does not carry instead of raising KeyError.
        if column in df.columns:
            fig.add_trace(go.Scatter(x=df['date'], y=df[column], mode='lines', name=label))

    # Update layout
    fig.update_layout(
        title=f'{company.upper()} Financials - {report_type.replace("-", " ").title()}',
        xaxis_title='Date',
        yaxis_title='Amount (USD)',
        hovermode='closest'
    )

    return fig

# Run the app
# Starts the server only when this code is executed as the main program;
# debug=True turns on Dash's debug mode.
if __name__ == '__main__':
    app.run_server(debug=True)

## Customizable ARIMA Model

In [None]:
# Initialize Dash app with Bootstrap for better styling
# (rebinds `app`, so the callbacks registered below belong to this new app)
app = Dash(__name__, external_stylesheets=[dbc.themes.BOOTSTRAP])

# Define default start and end dates for the date picker
# (a 180-day window ending at the moment this cell is executed)
default_end_date = datetime.now()
default_start_date = default_end_date - timedelta(days=180)

def fetch_stock_list():
    """Fetch the symbol list from the FMP ``stock/list`` endpoint.

    Returns:
        A list of ``{'label': symbol, 'value': symbol}`` dicts suitable for a
        ``dcc.Dropdown``. An empty list is returned when the request fails,
        times out, answers with a non-200 status, or the body is not a JSON
        list, so that building the layout never raises.
    """
    url = f'https://financialmodelingprep.com/api/v3/stock/list?apikey={api_key}'
    try:
        # Without a timeout a stalled connection would block app start-up.
        response = requests.get(url, timeout=30)
    except requests.RequestException:
        return []
    if response.status_code != 200:
        return []
    try:
        data = response.json()
    except ValueError:
        return []
    if not isinstance(data, list):
        # Error payloads arrive as a JSON object rather than a list.
        return []
    return [
        {'label': stock['symbol'], 'value': stock['symbol']}
        for stock in data
        if isinstance(stock, dict) and stock.get('symbol')
    ]
    
# Fetch the list of stocks when initializing the app
stock_options = fetch_stock_list()

# Define app layout
# Stock/date selection, the Forecast button, the graph, and three numeric
# inputs for the ARIMA order (p, d, q).
app.layout = dbc.Container([
    dbc.Row([
        dbc.Col([
            dcc.Dropdown(
                id='stock-selector',
                options=stock_options,  # Set the options to the list of stocks
                value=[],  # Default value can be empty or some default stocks
                multi=True,
                placeholder="Select a stock"
            )
        ], width=6),
        dbc.Col([
            dcc.DatePickerRange(
                id='date-picker',
                start_date=default_start_date,
                end_date=default_end_date
            )
        ], width=6)
    ]),
    dbc.Row([
        dbc.Col([
            html.Button('Forecast', id='forecast-button', n_clicks=0),
        ], width=12)
    ]),
    dbc.Row([
        dbc.Col([
            dcc.Graph(id='stock-graph'),
            dcc.Loading(
                id="loading-1",
                type="default",
                children=html.Div(id="loading-output-1")
            )
        ])
    ]),
    # ARIMA order terms are counts, so the inputs must not accept negatives
    # (the lower bound used to be -5).
    dbc.Row([
        dbc.Col(dcc.Input(id='arima-p', type='number', value=0, min=0, max=5, step=1, placeholder='p (AR term)')),
        dbc.Col(dcc.Input(id='arima-d', type='number', value=0, min=0, max=5, step=1, placeholder='d (Diff term)')),
        dbc.Col(dcc.Input(id='arima-q', type='number', value=0, min=0, max=5, step=1, placeholder='q (MA term)')),
    ]),
], fluid=True)
    

# Function to fetch and process data
def fetch_data(stock, start_date, end_date):
    """Download daily historical prices for one symbol.

    Args:
        stock: Ticker symbol inserted into the request path.
        start_date: First day of the window; anything ``pd.to_datetime``
            accepts (string, ``datetime``, ``Timestamp``).
        end_date: Last day of the window, same accepted types.

    Returns:
        A DataFrame built from the response's ``historical`` records with the
        ``date`` column parsed to datetimes. An empty DataFrame is returned
        when an argument is missing, the request fails or times out, the
        status is not 200, or the body carries no ``historical`` records.
    """
    # A cleared dropdown or date picker yields empty/None values; skip the call.
    if not stock or pd.isna(start_date) or pd.isna(end_date):
        return pd.DataFrame()

    formatted_start_date = pd.to_datetime(start_date).strftime('%Y-%m-%d')
    formatted_end_date = pd.to_datetime(end_date).strftime('%Y-%m-%d')
    url = f'https://financialmodelingprep.com/api/v3/historical-price-full/{stock}?from={formatted_start_date}&to={formatted_end_date}&apikey={api_key}'
    try:
        response = requests.get(url, timeout=30)
    except requests.RequestException:
        return pd.DataFrame()
    if response.status_code != 200:
        return pd.DataFrame()  # Return an empty DataFrame in case of an error
    try:
        data = response.json()
    except ValueError:
        return pd.DataFrame()

    # A 200 response may still lack the 'historical' key (e.g. an empty object),
    # which previously surfaced as a KeyError.
    historical = data.get('historical') if isinstance(data, dict) else None
    if not historical:
        return pd.DataFrame()

    df = pd.DataFrame(historical)
    df['date'] = pd.to_datetime(df['date'])
    return df


# Function to split data into training and test sets
def split_data(df, test_size=30):
    """Split a price frame chronologically into train and test parts.

    Args:
        df: DataFrame with a ``date`` column; it is sorted by that column
            before splitting and the input is not modified.
        test_size: Number of most recent rows to hold out.

    Returns:
        ``(train, test)`` where ``test`` holds the last ``test_size`` rows
        and ``train`` everything before them. If the frame has no more than
        ``test_size`` rows, ``train`` is empty and ``test`` is the whole frame.

    Raises:
        ValueError: If ``test_size`` is negative.
    """
    if test_size < 0:
        raise ValueError('test_size must be non-negative')
    ordered = df.sort_values(by='date')
    # Use an explicit split position: slicing with -test_size breaks for
    # test_size == 0, because ordered[:-0] is empty instead of the full frame.
    split_at = max(len(ordered) - test_size, 0)
    train = ordered.iloc[:split_at]
    test = ordered.iloc[split_at:]
    return train, test

# Function to perform forecasting and return predictions
def perform_forecast(train, test, p, d, q, future_periods=30):
    """Fit an ARIMA(p, d, q) model on the training closes and forecast ahead.

    Args:
        train: DataFrame with ``date`` and ``close`` columns used for fitting.
        test: Held-out rows; only its length is used, to extend the horizon.
        p: Autoregressive order.
        d: Differencing order.
        q: Moving-average order.
        future_periods: Extra periods to forecast past the test window.

    Returns:
        ``(train_pred, future_forecast, future_dates)``: in-sample predictions,
        the forecast for ``len(test) + future_periods`` steps, and a
        DatetimeIndex of the same length starting the day after the last
        training date.

    Raises:
        ValueError: If any of ``p``, ``d``, ``q`` is negative.
    """
    order = (int(p), int(d), int(q))
    if min(order) < 0:
        raise ValueError(f'ARIMA order terms must be non-negative, got {order}')

    # auto_arima searches for an order itself and has no `order` parameter, so
    # the requested (p, d, q) was never applied. Fit the ARIMA estimator from
    # the same package with the explicit order instead. Imported locally
    # because the file-level import only brings in auto_arima.
    from pmdarima import ARIMA

    # Initialize and fit the ARIMA model with the specified parameters
    model = ARIMA(order=order, suppress_warnings=True)
    model.fit(train['close'])

    # Predict on training data
    train_pred = model.predict_in_sample()

    # Forecast future data
    horizon = len(test) + future_periods
    future_forecast = model.predict(n_periods=horizon)

    # Generating future dates
    # The `closed` argument of date_range was removed in pandas 2.0; dropping
    # the first generated date gives the same "exclude the start" result on
    # every pandas version.
    last_date = train['date'].iloc[-1]
    future_dates = pd.date_range(start=last_date, periods=horizon + 1)[1:]

    return train_pred, future_forecast, future_dates

# Function to evaluate model performance
def evaluate_model(test, predictions):
    """Return the mean absolute error between observations and a forecast.

    Args:
        test: Observed values.
        predictions: Forecast values; only the first ``len(test)`` are scored.
    """
    window = len(test)
    scored = predictions[:window]
    return mean_absolute_error(test, scored)

# Callback to update graph with forecasting
@app.callback(
    Output('stock-graph', 'figure'),
    [
        Input('stock-selector', 'value'),
        Input('date-picker', 'start_date'),
        Input('date-picker', 'end_date'),
        Input('forecast-button', 'n_clicks'),
        Input('arima-p', 'value'),
        Input('arima-d', 'value'),
        Input('arima-q', 'value')
    ],
    prevent_initial_call=True
)
def update_graph_with_forecast(selected_stocks, start_date, end_date, n_clicks, p, d, q):
    """Redraw the price graph and, when requested, ARIMA(p, d, q) forecasts.

    Args:
        selected_stocks: Symbols chosen in the multi-select dropdown.
        start_date: Start of the history window from the date picker.
        end_date: End of the history window from the date picker.
        n_clicks: Click count of the Forecast button.
        p: Autoregressive order from the numeric input (None when cleared).
        d: Differencing order from the numeric input (None when cleared).
        q: Moving-average order from the numeric input (None when cleared).

    Returns:
        A figure dict with one historical trace per symbol that returned data.
        Forecast traces and MAE annotations are added once the Forecast button
        has been clicked and the trigger was that button or an order input.
    """
    # Get the ID of the input that triggered the callback. `ctx` is the name
    # imported from dash; `callback_context` was never imported here.
    triggered_id = ctx.triggered_id or ''

    # Convert string dates to datetime objects
    start_date = pd.to_datetime(start_date)
    end_date = pd.to_datetime(end_date)

    # Set default values for ARIMA parameters if none are provided, and clamp
    # negatives to zero because an ARIMA order cannot be negative.
    p = max(int(p), 0) if p is not None else 0
    d = max(int(d), 0) if d is not None else 0
    q = max(int(q), 0) if q is not None else 0

    traces = []  # Initialize an empty list for traces
    annotations = []  # Initialize an empty list for annotations
    layout = go.Layout(
        title='Stock Data and Forecast',
        xaxis=dict(title='Date'),
        yaxis=dict(title='Price'),
        hovermode='closest'
    )

    base_y_position = 1.05  # y position of the first MAE annotation
    y_step = 0.05  # vertical spacing so annotations do not overlap

    # Only forecast if the button has been clicked at least once and the
    # trigger was the button or one of the order inputs.
    forecast_triggers = {'arima-p', 'arima-d', 'arima-q', 'forecast-button'}
    forecast_requested = bool(n_clicks) and triggered_id in forecast_triggers

    # Cleared inputs arrive as None/empty; show an empty chart instead of failing.
    if not selected_stocks or pd.isna(start_date) or pd.isna(end_date):
        layout['annotations'] = annotations
        return {'data': traces, 'layout': layout}

    for stock in selected_stocks:
        df = fetch_data(stock, start_date, end_date)
        if df.empty:
            continue

        # Create a trace for the historical data
        historical_trace = go.Scatter(
            x=df['date'],
            y=df['close'],
            mode='lines',
            name=f'{stock} - Historical'
        )
        traces.append(historical_trace)

        if not forecast_requested:
            continue

        # Split data into train and test sets
        train, test = split_data(df)
        # A window shorter than the hold-out leaves nothing to fit or score on.
        if train.empty or test.empty:
            continue

        # Perform forecasting using the specified ARIMA parameters
        train_pred, future_forecast, future_dates = perform_forecast(train, test, p, d, q)

        # Create a trace for the forecast data
        forecast_trace = go.Scatter(
            x=future_dates,
            y=future_forecast,
            mode='lines',
            name=f'{stock} - Forecast',
            line=dict(dash='dot')
        )
        traces.append(forecast_trace)

        # Evaluate the model and add the MAE annotation
        mae = evaluate_model(test['close'], future_forecast[:len(test)])
        annotations.append(
            dict(
                xref='paper',
                yref='paper',
                x=0.5,
                # Step down per annotation; a fixed y stacked them on top of
                # each other when two symbols were forecast.
                y=base_y_position - (len(annotations) * y_step),
                xanchor='center',
                yanchor='bottom',
                text=f'{stock} MAE: {mae:.2f}',
                showarrow=False,
                font=dict(color="blue", size=12)
            )
        )

    # After the loop, add all annotations to the layout
    layout['annotations'] = annotations

    return {'data': traces, 'layout': layout}


@app.callback(
    Output('stock-selector', 'value'),
    [Input('stock-selector', 'value')]
)
def limit_stock_selection(selected_stocks):
    """Cap the stock selector at two symbols, keeping the earliest picks."""
    max_stocks = 2
    over_limit = len(selected_stocks) > max_stocks
    # Only trim when the cap is exceeded; otherwise echo the selection back.
    return selected_stocks[:max_stocks] if over_limit else selected_stocks


# Run the app
# Starts the server only when this code is executed as the main program;
# debug=True turns on Dash's debug mode.
if __name__ == '__main__':
    app.run_server(debug=True)