This notebook replicates the Airflow DAG and its tasks for stock price prediction using a Random Forest model.

In [2]:
!pip install \
    apache-airflow apache-airflow-providers-google \
    pandas numpy scikit-learn joblib holidays \
    pandas-gbq google-auth numpy

from datetime import datetime, timedelta, date
import pandas as pd
import numpy as np
from sklearn.ensemble import RandomForestRegressor
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error, r2_score
import joblib
import os
import logging
import holidays

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.dummy import DummyOperator

Defaulting to user installation because normal site-packages is not writeable
Collecting numpy
  Using cached numpy-1.26.4-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (18.2 MB)
Collecting scikit-learn
  Using cached scikit_learn-1.5.2-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (13.3 MB)
Installing collected packages: numpy, scikit-learn
  Attempting uninstall: numpy
    Found existing installation: numpy 2.2.5
    Uninstalling numpy-2.2.5:
      Successfully uninstalled numpy-2.2.5
  Attempting uninstall: scikit-learn
    Found existing installation: scikit-learn 1.6.1
    Uninstalling scikit-learn-1.6.1:
      Successfully uninstalled scikit-learn-1.6.1
[31mERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
streamlit 1.44.1 requires packaging<25,>=20, but you have packaging 25.0 which is incompatible.[0m[31m
[0mSuccessfully inst

ValueError: numpy.dtype size changed, may indicate binary incompatibility. Expected 96 from C header, got 88 from PyObject

In [None]:
# 1. Remove the broken installs
!pip uninstall -y numpy pandas scipy scikit-learn joblib

# 2. Reinstall cleanly
!pip install numpy pandas scipy scikit-learn joblib

# 3. (Re)install your other deps
!pip install pandas-gbq google-auth holidays apache-airflow-providers-google


Found existing installation: numpy 1.26.4
Uninstalling numpy-1.26.4:
  Successfully uninstalled numpy-1.26.4
Found existing installation: pandas 2.1.4
Uninstalling pandas-2.1.4:
  Successfully uninstalled pandas-2.1.4
Found existing installation: scipy 1.15.2
Uninstalling scipy-1.15.2:
  Successfully uninstalled scipy-1.15.2
Found existing installation: scikit-learn 1.5.2
Uninstalling scikit-learn-1.5.2:
  Successfully uninstalled scikit-learn-1.5.2
Found existing installation: joblib 1.4.2
Uninstalling joblib-1.4.2:
  Successfully uninstalled joblib-1.4.2
Defaulting to user installation because normal site-packages is not writeable
Collecting numpy
  Using cached numpy-2.2.5-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (16.4 MB)
Collecting pandas
  Using cached pandas-2.2.3-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (13.1 MB)
Collecting scipy
  Using cached scipy-1.15.2-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (37.6 MB)
Collecting scikit

In [1]:
import numpy as np, pandas as pd, scipy, sklearn
from sklearn.model_selection import train_test_split
print(np.__version__, pd.__version__, scipy.__version__, sklearn.__version__)


2.2.5 2.2.3 1.15.2 1.6.1


## Default Arguments and Constants

In [3]:

PROJECT_ID = 'terraform-457118'
DATASET_ID = 'stock_market_data'
LOCATION = 'US'
TABLE_ID = 'stock_data_raw'
SERVICE_ACCOUNT_PATH = '/home/jtsarmento/Projects/airflow-dbt-demo/keys/service-account.json'
HISTORY_DAYS = '365'

# Define dataset paths
RAW_TABLE = f"`{PROJECT_ID}.{DATASET_ID}.{TABLE_ID}`"
METRICS_TABLE = f"{DATASET_ID}.model_metrics"
PREDICTIONS_TABLE = f"{DATASET_ID}.price_predictions"
MARTS_DATASET = f"{DATASET_ID}_marts"
STAGING_DATASET = f"{DATASET_ID}_staging"

In [4]:
from google.oauth2 import service_account
import pandas_gbq

creds = service_account.Credentials.from_service_account_file(
    SERVICE_ACCOUNT_PATH,
    scopes=["https://www.googleapis.com/auth/bigquery"],
)

query = f"""
SELECT 
  symbol, date, close,
  LAG(close,1)  OVER(PARTITION BY symbol ORDER BY date) AS prev_close_1,
  volume,
  LAG(volume,1) OVER(PARTITION BY symbol ORDER BY date) AS prev_volume_1
FROM `{PROJECT_ID}.{DATASET_ID}.{TABLE_ID}`
WHERE date >= DATE_SUB(CURRENT_DATE(), INTERVAL {HISTORY_DAYS} DAY)
ORDER BY symbol, date
"""

df_raw = pandas_gbq.read_gbq(query, project_id=PROJECT_ID, credentials=creds)
df_raw.head(), df_raw.info()


<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1757 entries, 0 to 1756
Data columns (total 6 columns):
 #   Column         Non-Null Count  Dtype  
---  ------         --------------  -----  
 0   symbol         1757 non-null   object 
 1   date           1757 non-null   dbdate 
 2   close          1757 non-null   float64
 3   prev_close_1   1750 non-null   float64
 4   volume         1757 non-null   Int64  
 5   prev_volume_1  1750 non-null   Int64  
dtypes: Int64(2), dbdate(1), float64(2), object(1)
memory usage: 85.9+ KB


(  symbol        date       close  prev_close_1    volume  prev_volume_1
 0   AAPL  2024-04-25  169.091568           NaN  50558300           <NA>
 1   AAPL  2024-04-26  168.504349    169.091568  44838400       50558300
 2   AAPL  2024-04-29  172.684601    168.504349  68169400       44838400
 3   AAPL  2024-04-30  169.529510    172.684601  65934800       68169400
 4   AAPL  2024-05-01  168.504349    169.529510  50383100       65934800,
 None)

## prepare_features Task

In [5]:

query = f"""
SELECT 
    symbol,
    date,
    close,
    LAG(close, 1) OVER(PARTITION BY symbol ORDER BY date) as prev_close_1,
    LAG(close, 2) OVER(PARTITION BY symbol ORDER BY date) as prev_close_2,
    LAG(close, 3) OVER(PARTITION BY symbol ORDER BY date) as prev_close_3,
    LAG(close, 4) OVER(PARTITION BY symbol ORDER BY date) as prev_close_4,
    LAG(close, 5) OVER(PARTITION BY symbol ORDER BY date) as prev_close_5,
    volume,
    LAG(volume, 1) OVER(PARTITION BY symbol ORDER BY date) as prev_volume_1
FROM 
    {RAW_TABLE}
WHERE 
    date >= DATE_SUB(CURRENT_DATE(), INTERVAL {HISTORY_DAYS} DAY)
ORDER BY 
    symbol, date
"""
from google.oauth2 import service_account
import pandas_gbq

credentials = service_account.Credentials.from_service_account_file(
    SERVICE_ACCOUNT_PATH,
    scopes=["https://www.googleapis.com/auth/bigquery"]
)

df = pandas_gbq.read_gbq(query, project_id=PROJECT_ID, credentials=credentials)
df = df.dropna()
df['price_change'] = df['close'] - df['prev_close_1']
df['price_change_pct'] = df['price_change'] / df['prev_close_1']
df['volume_change_pct'] = (df['volume'] - df['prev_volume_1']) / df['prev_volume_1']
# df.to_csv('/downloads/prepared_features.csv', index=False)
print(f"Prepared features for {len(df)} rows across {df['symbol'].nunique()} symbols")
print(df.head())






Prepared features for 1722 rows across 7 symbols
  symbol        date       close  prev_close_1  prev_close_2  prev_close_3  \
5   AAPL  2024-05-02  172.216827    168.504349    169.529510    172.684601   
6   AAPL  2024-05-03  182.518188    172.216827    168.504349    169.529510   
7   AAPL  2024-05-06  180.856049    182.518188    172.216827    168.504349   
8   AAPL  2024-05-07  181.542786    180.856049    182.518188    172.216827   
9   AAPL  2024-05-08  181.881195    181.542786    180.856049    182.518188   

   prev_close_4  prev_close_5     volume  prev_volume_1  price_change  \
5    168.504349    169.091568   94214900       50383100      3.712479   
6    172.684601    168.504349  163224100       94214900     10.301361   
7    169.529510    172.684601   78569700      163224100     -1.662140   
8    168.504349    169.529510   77305800       78569700      0.686737   
9    172.216827    168.504349   45057100       77305800      0.338409   

   price_change_pct  volume_change_pct  
5

## train_model Task

In [15]:
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_squared_error, r2_score
import joblib


# Build a records list including date, features, actual, predicted, error
records = []
features = [
    'prev_close_1','prev_close_2','prev_close_3',
    'prev_close_4','prev_close_5','prev_volume_1',
    'price_change','price_change_pct','volume_change_pct'
]

for symbol in df['symbol'].unique():
    # Prepare symbol-specific DataFrame with date
    symbol_df = df[df['symbol'] == symbol].copy()
    data = symbol_df[['date'] + features + ['close']]

    # Split
    train, test = train_test_split(data, test_size=0.2, random_state=42)

    # Train
    model = RandomForestRegressor(n_estimators=100, random_state=42)
    model.fit(train[features], train['close'])

    # Predict
    preds = model.predict(test[features])
    test = test.reset_index(drop=True)

    # Collect results
    for i, row in test.iterrows():
        actual = row['close']
        pred = preds[i]
        rec = {
            'symbol': symbol,
            'date': row['date'],
            'actual': actual,
            'predicted': pred,
            'error': pred - actual
        }
        # Add feature columns
        for feat in features:
            rec[feat] = row[feat]
        records.append(rec)

# Create DataFrame and display
results_df = pd.DataFrame(records)
# …after collecting records…
results_df = pd.DataFrame(records)

# Sort by date ascending (and symbol if you want symbol groups)
results_df = results_df.sort_values(by=['symbol','date'], ascending=[True, True]).reset_index(drop=True)


print(results_df.head())




  symbol        date      actual   predicted     error  prev_close_1  \
0   AAPL  2024-05-10  182.436844  185.253645  2.816801    183.702606   
1   AAPL  2024-05-15  189.084503  187.080103 -2.004401    186.802170   
2   AAPL  2024-05-16  189.204102  188.241727 -0.962375    189.084503   
3   AAPL  2024-05-23  186.254013  191.676177  5.422164    190.260544   
4   AAPL  2024-05-24  189.343628  191.382126  2.038499    186.254013   

   prev_close_2  prev_close_3  prev_close_4  prev_close_5  prev_volume_1  \
0    181.881195    181.542786    180.856049    182.518188       48983000   
1    185.656006    182.436844    183.702606    181.881195       52393600   
2    186.802170    185.656006    182.436844    183.702606       70400000   
3    191.705704    190.400085    189.233994    189.204102       34648500   
4    190.260544    191.705704    190.400085    189.233994       51005900   

   price_change  price_change_pct  volume_change_pct  
0     -1.265762         -0.006890           0.036268  


In [16]:
import numpy as np
from sklearn.metrics import r2_score

# … after you’ve built and sorted results_df …

# 1) Compute summary metrics per symbol
summary = results_df.groupby('symbol').apply(
    lambda grp: pd.Series({
        'count':        len(grp),
        'MAE':          grp['error'].abs().mean(),
        'MSE':          (grp['error']**2).mean(),
        'RMSE':         np.sqrt((grp['error']**2).mean()),
        'Mean Error':   grp['error'].mean(),
        'R2 Score':     r2_score(grp['actual'], grp['predicted'])
    })
).reset_index()

# 2) Rank by RMSE (lowest is best)
summary = summary.sort_values('RMSE').reset_index(drop=True)

# 3) Show the ranking
print("Performance ranking by RMSE:")
print(summary)

# Optional: merge back into results_df if you want symbol-level stats on every row
results_df = results_df.merge(summary[['symbol','MAE','RMSE','R2 Score']], on='symbol', how='left')


Performance ranking by RMSE:
  symbol  count       MAE         MSE       RMSE  Mean Error  R2 Score
0     EU   50.0  0.055768    0.005731   0.075706    0.011212  0.995626
1  GOOGL   50.0  1.246353    3.434206   1.853161    0.157977  0.981409
2   CELH   50.0  1.137653    3.974211   1.993542    0.106089  0.991345
3   AMZN   50.0  1.673389    8.040863   2.835642    0.447877  0.980400
4   MSFT   50.0  2.533694   14.490864   3.806687    0.536989  0.975373
5   AAPL   50.0  2.440495   15.263162   3.906810    0.682667  0.955681
6   META   50.0  8.337671  129.002907  11.357945   -0.815428  0.972362


  summary = results_df.groupby('symbol').apply(


In [17]:

from datetime import timedelta
import holidays

# (Re-define or copy in your trading‐day logic)
def is_trading_day(d):
    if d.weekday() >= 5:  # Saturday=5, Sunday=6
        return False
    us_hols = holidays.US(years=d.year)
    market_hols = {
        "New Year's Day", "Martin Luther King Jr. Day", "Presidents Day",
        "Good Friday", "Memorial Day", "Juneteenth National Independence Day",
        "Independence Day", "Labor Day", "Thanksgiving", "Christmas Day"
    }
    name = us_hols.get(d)
    return not (name and any(h in name for h in market_hols))

def next_trading_day(d):
    nd = d + timedelta(days=1)
    while not is_trading_day(nd):
        nd += timedelta(days=1)
    return nd

# Build predictions
next_preds = []
for symbol, model in models.items():
    sym_df = df[df['symbol']==symbol].sort_values('date')
    last_row = sym_df.iloc[-1]
    
    # feature vector for the most recent date
    X_new = last_row[features].values.reshape(1, -1)
    pred_price = model.predict(X_new)[0]
    
    # compute next trading date
    last_date = pd.to_datetime(last_row['date']).date()
    next_date = next_trading_day(last_date)
    
    next_preds.append({
        'symbol': symbol,
        'last_date':  last_date,
        'next_date':  next_date,
        'predicted_close': pred_price,
        'last_close':      last_row['close'],
        'predicted_change': pred_price - last_row['close'],
        'predicted_pct':   (pred_price - last_row['close'])/last_row['close']*100
    })

# Show as DataFrame
next_pred_df = pd.DataFrame(next_preds)
next_pred_df




Unnamed: 0,symbol,last_date,next_date,predicted_close,last_close,predicted_change,predicted_pct
0,AAPL,2025-04-25,2025-04-28,207.313665,208.080002,-0.766337,-0.36829
1,AMZN,2025-04-25,2025-04-28,188.454303,188.865005,-0.410702,-0.217458
2,CELH,2025-04-25,2025-04-28,36.61156,36.400101,0.21146,0.580932
3,EU,2025-04-25,2025-04-28,1.5253,1.57,-0.0447,-2.847135
4,GOOGL,2025-04-25,2025-04-28,160.668885,161.660004,-0.991118,-0.613088
5,META,2025-04-25,2025-04-28,540.545451,546.914978,-6.369527,-1.164628
6,MSFT,2025-04-25,2025-04-28,388.711951,390.095001,-1.38305,-0.354542
