In [1]:
%%bigquery df
-- Pull every column of the historical stocks table; the result is stored in the
-- notebook variable df. The forecasting code in this notebook reads the
-- stock_symbol, date and volume_weighted_avg_price columns.
SELECT *
FROM `stockswhatsup.stocks_data_historical.stocks_data_all`

Query is running:   0%|          |

Downloading:   0%|          |

In [2]:
!pip install pmdarima
!pip install sktime

Collecting pmdarima
  Using cached pmdarima-2.0.4-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.manylinux_2_28_x86_64.whl.metadata (7.8 kB)
Using cached pmdarima-2.0.4-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.manylinux_2_28_x86_64.whl (2.1 MB)
Installing collected packages: pmdarima
Successfully installed pmdarima-2.0.4
Collecting sktime
  Using cached sktime-0.25.0-py3-none-any.whl.metadata (29 kB)
Collecting scikit-base<0.7.0 (from sktime)
  Using cached scikit_base-0.6.2-py3-none-any.whl.metadata (8.7 kB)
Using cached sktime-0.25.0-py3-none-any.whl (21.7 MB)
Using cached scikit_base-0.6.2-py3-none-any.whl (122 kB)
Installing collected packages: scikit-base, sktime
Successfully installed scikit-base-0.6.2 sktime-0.25.0


In [3]:
import pandas as pd
import numpy as np
import pmdarima as pm
from sktime.forecasting.base import ForecastingHorizon
from google.cloud import bigquery

In [None]:
def preprocess_data(stock_data, stock_symbol):
    """
    Builds a gap-free daily price series for one stock.

    The observed dates are expanded to every calendar day between the first and
    the last observation, and days without a price are filled by time-weighted
    linear interpolation.

    Args:
        stock_data (DataFrame): Rows of a single stock with 'date' and
            'volume_weighted_avg_price' columns. The frame is not modified.
        stock_symbol (str): Stock symbol; used to name the index of the result
            ('<stock_symbol>_full_date_range').

    Returns:
        DataFrame | None: Single-column frame ('volume_weighted_avg_price')
        indexed by consecutive daily timestamps, or None when preprocessing
        fails (the error is printed, not raised).
    """
    try:
        range_col = f'{stock_symbol}_full_date_range'

        # Parse the dates on a copy: the caller usually passes a slice of a larger
        # frame, and writing into it would mutate the caller's data (and trigger
        # pandas' SettingWithCopyWarning).
        prices = stock_data.assign(date=pd.to_datetime(stock_data['date'])).reset_index(drop=True)
        if prices.empty:
            raise ValueError(f'no rows to preprocess for {stock_symbol}')

        # Min/max are taken after parsing so they compare as dates, not raw values.
        full_date_range_df = pd.DataFrame({
            range_col: pd.date_range(start=prices['date'].min(), end=prices['date'].max(), freq='D')
        })

        # Left-join onto the full calendar: days without an observation get NaN prices.
        continuous_df = pd.merge(full_date_range_df, prices, how='left', left_on=range_col, right_on='date')
        continuous_df = continuous_df.set_index(range_col)
        continuous_df.index = pd.DatetimeIndex(continuous_df.index).to_period('D')

        continuous_df = continuous_df[['volume_weighted_avg_price']].interpolate(method='time')
        # Hand back plain timestamps rather than periods.
        continuous_df.index = continuous_df.index.to_timestamp()
        return continuous_df

    except Exception as e:
        print(f'Failed to preprocess the data: {e}')
        return None

def auto_arima_forecast(preprocessed_stock_data, stock, n_periods=90):
    """
    Fits an Auto ARIMA model on a daily price series and forecasts the days after it.

    The model search is stepwise, seasonal with a weekly period (m=7), and
    selects by BIC. Forecast dates are the `n_periods` calendar days that
    immediately follow the last index entry of the input.

    Args:
        preprocessed_stock_data (DataFrame): Continuous daily price series, as
            returned by `preprocess_data`.
        stock (str): Stock symbol written to the 'stock_symbol' column.
        n_periods (int): Number of days to forecast. Defaults to 90.

    Returns:
        DataFrame | None: One row per forecast day with columns 'index'
        (forecast date), 'stock_symbol', 'forecasted_weighted_avg_price',
        'ci_lower' and 'ci_upper' (95% interval, alpha=0.05), or None when
        forecasting fails (the error is printed, not raised).
    """
    try:
        # A failed preprocessing step hands over None; report that clearly
        # instead of failing later with an attribute error.
        if preprocessed_stock_data is None or len(preprocessed_stock_data) == 0:
            raise ValueError(f'no preprocessed data available for {stock}')

        last_date = preprocessed_stock_data.index[-1]
        forecast_dates = pd.date_range(
            start=last_date + pd.DateOffset(days=1),
            periods=n_periods,
            freq='D',
        )

        arima_model = pm.auto_arima(
            preprocessed_stock_data,
            start_p=1, start_q=1,
            max_p=5, max_q=5,
            seasonal=True, m=7,
            start_P=0, start_Q=0,
            max_P=2, max_Q=2,
            max_D=2,
            max_d=2,
            alpha=0.05,
            test='adf',
            seasonal_test='ocsb',
            trace=True,
            error_action='ignore',
            suppress_warnings=True,
            stepwise=True,
            n_fits=5,
            information_criterion='bic',
            # Hold out the last 20% of the observations.
            out_of_sample_size=round(len(preprocessed_stock_data) * 0.2)
        )

        forecast_result, pred_ci = arima_model.predict(
            n_periods=n_periods,
            return_conf_int=True,
            alpha=0.05)

        forecasted_stock_df = pd.DataFrame({
            'stock_symbol': stock,
            'forecasted_weighted_avg_price': forecast_result,
            'ci_lower': pred_ci[:, 0],
            'ci_upper': pred_ci[:, 1],
        })
        # The unnamed date index becomes a column called 'index' after reset_index.
        forecasted_stock_df.index = forecast_dates
        forecasted_stock_df = forecasted_stock_df.reset_index()
        return forecasted_stock_df

    except Exception as e:
        print(f'Failed to forecast with Auto ARIMA: {e}')
        return None
            
def upload_forecast_to_bq(data):
    """
    Uploads forecasted stock data to BigQuery.

    The destination table `stockswhatsup.stocks_data_historical.stocks_forecast`
    is overwritten on every call (WRITE_TRUNCATE). Failures are printed, not raised.

    Args:
        data (DataFrame): Forecasted stock data with 'date', 'stock_symbol', 'forecasted_weighted_avg_price', 'lower_limit', and 'upper_limit' columns.
    """
    client = bigquery.Client()
    forecast_table_id = "stockswhatsup.stocks_data_historical.stocks_forecast"

    forecast_table_config = bigquery.LoadJobConfig(
          schema=[
              bigquery.SchemaField("date", bigquery.enums.SqlTypeNames.DATE),
              bigquery.SchemaField("stock_symbol", bigquery.enums.SqlTypeNames.STRING),
              bigquery.SchemaField("forecasted_weighted_avg_price", bigquery.enums.SqlTypeNames.FLOAT),
              bigquery.SchemaField("lower_limit", bigquery.enums.SqlTypeNames.FLOAT),
              bigquery.SchemaField("upper_limit", bigquery.enums.SqlTypeNames.FLOAT),
          ],
          write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
      )

    try:
        forecast_table_job = client.load_table_from_dataframe(
            data, forecast_table_id, job_config=forecast_table_config
        )
        # load_table_from_dataframe only submits the job; result() blocks until it
        # finishes and raises if the load failed, so success is reported only once
        # the data is actually in the table.
        forecast_table_job.result()
        print('Success: Uploaded Forecasted data to BigQuery')

    except Exception as e:
        print(f'Failed to upload to BigQuery: {e}')

def main(df):
    """
    Runs the forecasting pipeline for every stock in `df` and uploads the result to BigQuery.

    For each distinct stock symbol the rows are preprocessed into a continuous
    daily series and forecast with Auto ARIMA. Stocks whose preprocessing or
    forecasting fails are skipped with a message; the remaining forecasts are
    concatenated and uploaded in one call. Errors are printed, not raised.

    Args:
        df (DataFrame): Input DataFrame containing stock data with 'stock_symbol', 'date', and 'volume_weighted_avg_price' columns.
    """
    try:
        combined_forecasts = []

        for stock in df['stock_symbol'].unique():
            # Copy the per-stock rows so later steps never write into a view of `df`.
            stock_df = df.loc[df['stock_symbol'] == stock, ['volume_weighted_avg_price', 'date']].copy()

            preprocessed_stock_df = preprocess_data(stock_df, stock)
            if preprocessed_stock_df is None:
                print(f'Skipping {stock}: preprocessing failed')
                continue

            forecasted_stock_df = auto_arima_forecast(preprocessed_stock_df, stock)
            if forecasted_stock_df is None:
                print(f'Skipping {stock}: forecasting failed')
                continue

            combined_forecasts.append(forecasted_stock_df)

        # pd.concat raises on an empty list; report the real cause instead.
        if not combined_forecasts:
            print('Failed to run the job: no forecasts were produced')
            return

        # Align column names with the BigQuery forecast table schema.
        combined_forecasts_df = pd.concat(combined_forecasts, ignore_index=True).rename(
            columns={'index': 'date', 'ci_lower': 'lower_limit', 'ci_upper': 'upper_limit'}
        )
        upload_forecast_to_bq(combined_forecasts_df)

        print('Job finished successfully')
    except Exception as e:
        print(f'Failed to run the job: {e}')
            
# Run the whole pipeline on `df`, the DataFrame populated by the %%bigquery cell.
main(df)

Performing stepwise search to minimize bic
 ARIMA(1,1,1)(0,0,0)[7] intercept   : BIC=3887.552, Time=0.40 sec
 ARIMA(0,1,0)(0,0,0)[7] intercept   : BIC=3920.072, Time=0.11 sec
 ARIMA(1,1,0)(1,0,0)[7] intercept   : BIC=3889.040, Time=0.99 sec
 ARIMA(0,1,1)(0,0,1)[7] intercept   : BIC=3887.527, Time=1.13 sec
 ARIMA(0,1,0)(0,0,0)[7]             : BIC=3926.363, Time=0.12 sec
 ARIMA(0,1,1)(0,0,0)[7] intercept   : BIC=3881.006, Time=0.18 sec
 ARIMA(0,1,1)(1,0,0)[7] intercept   : BIC=3887.528, Time=1.01 sec
 ARIMA(0,1,1)(1,0,1)[7] intercept   : BIC=3894.141, Time=2.47 sec
 ARIMA(0,1,2)(0,0,0)[7] intercept   : BIC=3887.565, Time=0.70 sec
 ARIMA(1,1,0)(0,0,0)[7] intercept   : BIC=3882.505, Time=0.25 sec
 ARIMA(1,1,2)(0,0,0)[7] intercept   : BIC=3894.067, Time=0.56 sec
 ARIMA(0,1,1)(0,0,0)[7]             : BIC=3883.153, Time=0.09 sec

Best model:  ARIMA(0,1,1)(0,0,0)[7] intercept
Total fit time: 8.065 seconds
Performing stepwise search to minimize bic
 ARIMA(1,1,1)(0,0,0)[7] intercept   : BIC=304