In [9]:
# Imports & setup
import os
import sys
import pandas as pd
import numpy as np

from datetime import datetime
from pathlib import Path

sys.path.insert(0, str(Path.cwd().parent / 'utils'))

from data_loader import StockDataLoader # type: ignore

print("✓ Libraries imported")
print(f"Execution time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")

✓ Libraries imported
Execution time: 2025-12-04 13:47:34


## Step 1: Load Bronze Data

In [10]:
config_dir = str(Path.cwd().parent / 'config')
loader = StockDataLoader(config_path=os.path.join(config_dir, 'config.json'))
LAKEHOUSE_PATH = str(Path.cwd().parent / 'data')  # local; in Fabric use Lakehouse Files root

df = loader.read_from_bronze(LAKEHOUSE_PATH, ticker='MSFT')
df = df.sort_values('date').reset_index(drop=True)

print(f"Loaded {len(df):,} rows from Bronze")
df.head()

Reading data from Bronze layer: bronze/stocks/stock_data
✓ Read 1489 records from Bronze layer
Loaded 1,489 rows from Bronze


Unnamed: 0,date,open,high,low,close,volume,dividends,stock_splits,Ticker,FetchTimestamp,year,month
0,2020-01-02 00:00:00-05:00,150.758664,152.61015,150.331401,152.505707,22622100,0.0,0.0,MSFT,2025-12-04 13:44:25.424026,2020,1
1,2020-01-03 00:00:00-05:00,150.321872,151.869516,150.074998,150.606705,21116200,0.0,0.0,MSFT,2025-12-04 13:44:25.424026,2020,1
2,2020-01-06 00:00:00-05:00,149.144562,151.062519,148.60335,150.996048,20813700,0.0,0.0,MSFT,2025-12-04 13:44:25.424026,2020,1
3,2020-01-07 00:00:00-05:00,151.271365,151.603675,149.372403,149.619263,21634100,0.0,0.0,MSFT,2025-12-04 13:44:25.424026,2020,1
4,2020-01-08 00:00:00-05:00,150.90104,152.676579,149.970552,152.002441,27746500,0.0,0.0,MSFT,2025-12-04 13:44:25.424026,2020,1


## Step 2: Enforce Schema & Types

In [11]:
# Expected columns
expected_cols = ['date','open','high','low','close','volume','dividends','stock_splits','Ticker','FetchTimestamp','year','month']
missing = [c for c in expected_cols if c not in df.columns]
if missing:
    print('Missing columns:', missing)
    # Create placeholders if absent
    for c in missing:
        df[c] = np.nan

# Types
df['date'] = pd.to_datetime(df['date'])
for c in ['open','high','low','close','volume','dividends','stock_splits']:
    df[c] = pd.to_numeric(df[c], errors='coerce')
df['Ticker'] = df['Ticker'].astype(str)

print('✓ Schema enforced')
df.dtypes

✓ Schema enforced


date              datetime64[ns, America/New_York]
open                                       float64
high                                       float64
low                                        float64
close                                      float64
volume                                       int64
dividends                                  float64
stock_splits                               float64
Ticker                                      object
FetchTimestamp                      datetime64[us]
year                                         int32
month                                        int32
dtype: object

## Step 3: Handle Missing Values & Anomalies

In [12]:
# Remove duplicates by date
before = len(df)
df = df.drop_duplicates(subset=['date'])
dup_removed = before - len(df)

# Forward-fill small gaps
df[['open','high','low','close','volume']] = df[['open','high','low','close','volume']].ffill()

# Ensure logical bounds: high >= low
mask = df['high'] < df['low']
anomalies = mask.sum()
df.loc[mask, ['high','low']] = df.loc[mask, ['low','high']].values

print(f"✓ Cleaned data (duplicates removed: {dup_removed}, anomalies fixed: {anomalies})")
df[['date','open','high','low','close','volume']].tail()

✓ Cleaned data (duplicates removed: 0, anomalies fixed: 0)


Unnamed: 0,date,open,high,low,close,volume
1484,2025-11-26 00:00:00-05:00,486.309998,488.309998,481.200012,485.5,25709100
1485,2025-11-28 00:00:00-05:00,487.600006,492.630005,486.649994,492.01001,14386700
1486,2025-12-01 00:00:00-05:00,488.440002,489.859985,484.649994,486.73999,23964000
1487,2025-12-02 00:00:00-05:00,486.720001,493.5,486.320007,490.0,19562700
1488,2025-12-03 00:00:00-05:00,476.320007,484.23999,475.200012,477.730011,34562900


## Step 4: Optional Adjustments (Splits/Dividends)

In [13]:
# If you prefer adjusted close, compute returns based on yfinance's adjusted prices
# Here we keep raw OHLC and add daily return column
df['daily_return'] = df['close'].pct_change()
df['rolling_vol_20'] = df['daily_return'].rolling(20).std()

print('✓ Added derived columns: daily_return, rolling_vol_20')
df[['date','close','daily_return','rolling_vol_20']].tail()

✓ Added derived columns: daily_return, rolling_vol_20


Unnamed: 0,date,close,daily_return,rolling_vol_20
1484,2025-11-26 00:00:00-05:00,485.5,0.017841,0.013958
1485,2025-11-28 00:00:00-05:00,492.01001,0.013409,0.013355
1486,2025-12-01 00:00:00-05:00,486.73999,-0.010711,0.013182
1487,2025-12-02 00:00:00-05:00,490.0,0.006698,0.013355
1488,2025-12-03 00:00:00-05:00,477.730011,-0.025041,0.014271


## Step 5: Write to Silver

In [14]:
silver_dir = os.path.join(LAKEHOUSE_PATH, 'silver/stocks')
os.makedirs(silver_dir, exist_ok=True)
silver_file = os.path.join(silver_dir, 'msft_silver.parquet')

df.to_parquet(silver_file, engine='pyarrow', compression='snappy', index=False)

print('='*60)
print('DATA TRANSFORMATION COMPLETE')
print('='*60)
print(f'Rows written: {len(df):,}')
print(f'Silver path: {Path(silver_file).name}')
print('Ready for Feature Engineering (Notebook 03)')
print('='*60)

DATA TRANSFORMATION COMPLETE
Rows written: 1,489
Silver path: msft_silver.parquet
Ready for Feature Engineering (Notebook 03)
