In [1]:
#!pip install -r requirements.txt

In [2]:
import pandas as pd
import numpy as np
from typing import Union, List, Dict, Any, Optional
from sklearn.preprocessing import StandardScaler
from collections import deque
import pickle
import json
import io


class InMemoryBuffer:
    """Buffer to store recent observations for rolling feature computation."""
    
    def __init__(self, max_size: int):
        self.max_size = max_size
        self.buffer = deque(maxlen=max_size)
        self.observation_count = 0  # Track total observations seen
    
    def add(self, observations: Union[pd.Series, pd.DataFrame]) -> None:
        """Add new observation(s) to buffer."""
        if isinstance(observations, pd.Series):
            self.buffer.append(observations)
            self.observation_count += 1
        elif isinstance(observations, pd.DataFrame):
            for _, row in observations.iterrows():
                self.buffer.append(row)
                self.observation_count += 1
    
    def get_data(self) -> pd.DataFrame:
        """Return buffer contents as DataFrame."""
        if not self.buffer:
            return pd.DataFrame()
        return pd.DataFrame(list(self.buffer))
    
    def size(self) -> int:
        """Return current buffer size."""
        return len(self.buffer)
    
    def get_state(self) -> Dict[str, Any]:
        """Get buffer state for persistence."""
        return {
            'max_size': self.max_size,
            'buffer_data': [obs.to_dict() if hasattr(obs, 'to_dict') else obs for obs in self.buffer],
            'observation_count': self.observation_count
        }
    
    def load_state(self, state: Dict[str, Any]) -> None:
        """Load buffer state from persistence."""
        self.max_size = state['max_size']
        self.observation_count = state['observation_count']
        self.buffer = deque(maxlen=self.max_size)
        
        for obs_dict in state['buffer_data']:
            if isinstance(obs_dict, dict):
                obs = pd.Series(obs_dict)
                self.buffer.append(obs)


class ScalerManager:
    """Manages scaler state and incremental updates with persistence support."""
    
    def __init__(self, scaler, update_enabled: bool = True, update_freq: int = 1):
        self.base_scaler = scaler
        self.update_enabled = update_enabled
        self.update_freq = update_freq
        self.update_counter = 0
        self.scaler_version = 0
        self.last_update_count = 0
        
    def fit(self, X: pd.DataFrame) -> None:
        """Initial fit of the scaler."""
        self.base_scaler.fit(X)
        self.scaler_version += 1
        
    def transform(self, X: pd.DataFrame) -> np.ndarray:
        """Transform data using current scaler state."""
        return self.base_scaler.transform(X)
        
    def partial_update(self, X: pd.DataFrame) -> bool:
        """
        Incrementally update scaler if conditions are met.
        
        Returns:
            bool: True if scaler was updated, False otherwise
        """
        if not self.update_enabled:
            return False
            
        self.update_counter += len(X)
        
        # Check if we should update based on frequency
        if self.update_counter - self.last_update_count >= self.update_freq:
            if hasattr(self.base_scaler, 'partial_fit'):
                self.base_scaler.partial_fit(X)
                self.scaler_version += 1
                self.last_update_count = self.update_counter
                return True
            else:
                print("Warning: Scaler doesn't support partial_fit")
                return False
        
        return False
    
    def get_state(self) -> Dict[str, Any]:
        """Get scaler state for persistence."""
        return {
            'scaler_state': pickle.dumps(self.base_scaler),
            'update_enabled': self.update_enabled,
            'update_freq': self.update_freq,
            'update_counter': self.update_counter,
            'scaler_version': self.scaler_version,
            'last_update_count': self.last_update_count
        }
    
    def load_state(self, state: Dict[str, Any]) -> None:
        """Load scaler state from persistence."""
        self.base_scaler = pickle.loads(state['scaler_state'])
        self.update_enabled = state['update_enabled']
        self.update_freq = state['update_freq']
        self.update_counter = state['update_counter']
        self.scaler_version = state['scaler_version']
        self.last_update_count = state['last_update_count']


class InMemoryBuffer:
    """Buffer to store recent observations for rolling feature computation."""
    
    def __init__(self, max_size: int):
        self.max_size = max_size
        self.buffer = deque(maxlen=max_size)
        self.observation_count = 0  # Track total observations seen
    
    def add(self, observations: Union[pd.Series, pd.DataFrame]) -> None:
        """Add new observation(s) to buffer."""
        if isinstance(observations, pd.Series):
            self.buffer.append(observations)
            self.observation_count += 1
        elif isinstance(observations, pd.DataFrame):
            for _, row in observations.iterrows():
                self.buffer.append(row)
                self.observation_count += 1
    
    def get_data(self) -> pd.DataFrame:
        """Return buffer contents as DataFrame."""
        if not self.buffer:
            return pd.DataFrame()
        return pd.DataFrame(list(self.buffer))
    
    def size(self) -> int:
        """Return current buffer size."""
        return len(self.buffer)
    
    def get_state(self) -> Dict[str, Any]:
        """Get buffer state for persistence."""
        buffer_data = []
        for obs in self.buffer:
            if hasattr(obs, 'to_dict'):
                # Handle pandas Series
                obs_dict = obs.to_dict()
                # Also store the index if it exists
                if hasattr(obs, 'name') and obs.name is not None:
                    obs_dict['_index'] = str(obs.name)
                buffer_data.append(obs_dict)
            else:
                buffer_data.append(obs)
        
        return {
            'max_size': self.max_size,
            'buffer_data': buffer_data,
            'observation_count': self.observation_count
        }
    
    def load_state(self, state: Dict[str, Any]) -> None:
        """Load buffer state from persistence."""
        self.max_size = state['max_size']
        self.observation_count = state['observation_count']
        self.buffer = deque(maxlen=self.max_size)
        
        for obs_dict in state['buffer_data']:
            if isinstance(obs_dict, dict) and '_index' in obs_dict:
                # Reconstruct pandas Series with index
                index_val = obs_dict.pop('_index')
                obs = pd.Series(obs_dict, name=index_val)
                self.buffer.append(obs)
            elif isinstance(obs_dict, dict):
                obs = pd.Series(obs_dict)
                self.buffer.append(obs)
            else:
                self.buffer.append(obs_dict)


class TimeSeriesVW:
    """
    Time Series feature engineering class with Vowpal Wabbit output format.
    Supports incremental learning and online inference with evolving scaler.
    """
    
    def __init__(self, scaler_manager, buffer, max_window: int = 6, horizon: int = 6, 
                 target_col: str = 'target'):
        """
        Initialize TimeSeriesVW with dependency injection.
        
        Args:
            scaler_manager: ScalerManager instance for handling scaler evolution
            buffer: Buffer instance for storing recent observations
            max_window: Maximum window size for rolling features
            horizon: Forecasting horizon
            target_col: Name of target column
        """
        self.scaler_manager = scaler_manager
        self.buffer = buffer
        self.max_window = max_window
        self.horizon = horizon
        self.target_col = target_col
        self.feature_names = None
        self.is_fitted = False
    
    def _create_rolling_features(self, df: pd.DataFrame) -> pd.DataFrame:
        """Create rolling features for given DataFrame."""
        df_features = df.copy()
        
        # Create rolling features for different lag windows
        for lag in range(1, self.max_window + 1):
            # Rolling mean
            df_features[f'rolling_mean_{lag}'] = df[self.target_col].rolling(window=lag, min_periods=1).mean()
            
            # Rolling std
            df_features[f'rolling_std_{lag}'] = df[self.target_col].rolling(window=lag, min_periods=1).std().fillna(0)
            
            # Percentage change
            df_features[f'pct_{lag}'] = df[self.target_col].pct_change(periods=lag).fillna(0)
        
        return df_features
    
    def _create_temporal_features(self, df: pd.DataFrame) -> pd.DataFrame:
        """Create temporal features (month and quarter one-hot encoding)."""
        df_features = df.copy()
        
        # Extract month and quarter from datetime index
        df_features['month_num'] = df_features.index.month
        df_features['quarter'] = df_features.index.quarter
        
        # One-hot encoding for month (1-12)
        for month in range(1, 13):
            df_features[f'month_{month}'] = (df_features['month_num'] == month).astype(int)
        
        # One-hot encoding for quarter (1-4)
        for quarter in range(1, 5):
            df_features[f'quarter_{quarter}'] = (df_features['quarter'] == quarter).astype(int)
        
        # Drop intermediate columns
        df_features = df_features.drop(['month_num', 'quarter'], axis=1)
        
        return df_features
    
    def _create_features(self, df: pd.DataFrame) -> pd.DataFrame:
        """Create all features for the dataset."""
        # Create rolling features
        df_features = self._create_rolling_features(df)
        
        # Create temporal features
        df_features = self._create_temporal_features(df_features)
        
        return df_features
    
    def _get_feature_columns(self, df: pd.DataFrame) -> List[str]:
        """Get feature column names (excluding target)."""
        return [col for col in df.columns if col != self.target_col]
    
    def fit_features(self, df: pd.DataFrame) -> 'TimeSeriesVW':
        """
        Fit the scaler on training data only.
        
        Args:
            df: Training DataFrame with datetime index
            
        Returns:
            self for method chaining
        """
        # Create features
        df_features = self._create_features(df)
        
        # Get feature columns
        feature_cols = self._get_feature_columns(df_features)
        self.feature_names = feature_cols
        
        # Fit scaler on training features only
        self.scaler_manager.fit(df_features[feature_cols])
        self.is_fitted = True
        
        return self
    
    def fit_transform_features(self, df: pd.DataFrame) -> pd.DataFrame:
        """
        Fit scaler and transform features for the full dataset.
        
        Args:
            df: Full DataFrame with datetime index
            
        Returns:
            DataFrame with engineered and scaled features
        """
        # Fit on the data
        self.fit_features(df)
        
        # Transform features
        transformed_df = self.transform_features(df)
        
        # CRITICAL: Update buffer with the last max_window-1 observations
        # This ensures rolling features work correctly for future incremental data
        last_obs = df.tail(self.max_window - 1)
        self.buffer.add(last_obs)
        
        return transformed_df
    
    def transform_features(self, df: pd.DataFrame) -> pd.DataFrame:
        """
        Transform features using fitted scaler and buffer for rolling features.
        
        Args:
            df: DataFrame to transform (can be single row or batch)
            
        Returns:
            DataFrame with transformed features
        """
        if not self.is_fitted:
            raise ValueError("Must call fit_features() before transform_features()")
        
        # For incremental processing, combine buffer with new data for rolling features
        if self.buffer.size() > 0:
            buffer_df = self.buffer.get_data()
            # Combine buffer with new data for rolling calculations
            combined_df = pd.concat([buffer_df, df], ignore_index=False)
            # Take only the new data rows after computing rolling features
            df_features = self._create_features(combined_df)
            df_features = df_features.iloc[-len(df):]
        else:
            df_features = self._create_features(df)
        
        # Create temporal features (these don't depend on buffer)
        df_features = self._create_temporal_features(df_features)
        
        # Scale features
        feature_cols = self.feature_names
        df_features[feature_cols] = self.scaler_manager.transform(df_features[feature_cols])
        
        return df_features
    
    def _update_scaler_incremental(self, new_features: pd.DataFrame) -> None:
        """
        Update scaler incrementally with new data using partial_fit if available.
        Falls back to full refit if partial_fit not supported.
        """
        feature_cols = self.feature_names
        new_feature_data = new_features[feature_cols]
        
        # Use ScalerManager to handle incremental updates
        updated = self.scaler_manager.partial_update(new_feature_data)
        if updated:
            print(f"Scaler updated to version {self.scaler_manager.scaler_version}")
    
    def update(self, new_obs: Union[pd.Series, pd.DataFrame]) -> List[str]:
        """
        Update buffer with new observation(s), update scaler, and generate VW format lines.
        
        Args:
            new_obs: New observation(s) to add to buffer
            
        Returns:
            List of VW format strings
        """
        if not self.is_fitted:
            raise ValueError("Must call fit_features() before update()")
        
        # Convert Series to DataFrame if needed
        if isinstance(new_obs, pd.Series):
            new_obs = new_obs.to_frame().T
        
        # Transform features (using current scaler state)
        transformed_obs = self.transform_features(new_obs)
        
        # Update scaler with new transformed features
        self._update_scaler_incremental(transformed_obs)
        
        # Update buffer with original observations (for future rolling calculations)
        self.buffer.add(new_obs)
        
        # Generate VW lines
        vw_lines = []
        for idx, row in transformed_obs.iterrows():
            # Create ID from index (timestamp)
            obs_id = str(idx) if hasattr(idx, 'strftime') else str(idx)
            vw_line = self._create_vw_line(row, obs_id)
            vw_lines.append(vw_line)
        
        return vw_lines
    
    def save_state(self, path: str) -> None:
        """Save complete model state for persistence (K8s readiness)."""
        state = {
            'scaler_manager_state': self.scaler_manager.get_state(),
            'buffer_state': self.buffer.get_state(),
            'max_window': self.max_window,
            'horizon': self.horizon,
            'target_col': self.target_col,
            'feature_names': self.feature_names,
            'is_fitted': self.is_fitted
        }
        
        with open(path, 'wb') as f:
            pickle.dump(state, f)
        
        print(f"Model state saved to {path}")
    
    def load_state(self, path: str) -> None:
        """Load complete model state from persistence."""
        with open(path, 'rb') as f:
            state = pickle.load(f)
        
        self.scaler_manager.load_state(state['scaler_manager_state'])
        self.buffer.load_state(state['buffer_state'])
        self.max_window = state['max_window']
        self.horizon = state['horizon']
        self.target_col = state['target_col']
        self.feature_names = state['feature_names']
        self.is_fitted = state['is_fitted']
        
        print(f"Model state loaded from {path}")
        print(f"Scaler version: {self.scaler_manager.scaler_version}")
        print(f"Buffer size: {self.buffer.size()}")
    
    def _create_vw_line(self, row: pd.Series, obs_id: str) -> str:
        """Create a single VW format line from a row."""
        target_value = row[self.target_col]
        feature_cols = [col for col in row.index if col != self.target_col]
        
        # Build feature string
        features = []
        for col in feature_cols:
            value = row[col]
            # Handle NaN values
            if pd.isna(value):
                value = 0.0
            features.append(f"{col}:{value:.6f}")
        
        feature_str = " ".join(features)
        vw_line = f"{target_value:.6f} |features {feature_str} ID:{obs_id}"
        
        return vw_line
    
    def to_vw(self, df: pd.DataFrame, path: str) -> None:
        """
        Save DataFrame to Vowpal Wabbit format.
        
        Args:
            df: DataFrame with features and target
            path: Output file path
        """
        vw_lines = []
        
        for idx, row in df.iterrows():
            obs_id = str(idx) if hasattr(idx, 'strftime') else str(idx)
            vw_line = self._create_vw_line(row, obs_id)
            vw_lines.append(vw_line)
        
        # Write to file
        with open(path, 'w') as f:
            f.write('\n'.join(vw_lines))
        
        print(f"Saved {len(vw_lines)} observations to {path}")



In [3]:

# Example usage
if __name__ == "__main__":
    # Load and prepare data
    df = pd.read_csv('air.csv')
    df['month'] = pd.to_datetime(df['month'])
    df = df.set_index('month')
    df['target'] = df['passengers']  # Create target column
    
    print("Dataset shape:", df.shape)
    print("Dataset head:")
    print(df.head())
    
    # Initialize dependencies with proper scaler management
    scaler = StandardScaler()  # Base scaler
    scaler_manager = ScalerManager(
        scaler=scaler,
        update_enabled=True,  # Enable incremental updates
        update_freq=1  # Update every observation
    )
    buffer = InMemoryBuffer(max_size=5)  # max_window - 1
    
    # Initialize TimeSeriesVW with ScalerManager
    ts_vw = TimeSeriesVW(
        scaler_manager=scaler_manager,
        buffer=buffer,
        max_window=6,
        horizon=6,
        target_col='target'
    )
    
    # Train/validation/test split
    total_len = len(df)
    train_size = total_len - 12  # Last 12 for validation + test
    val_size = 6
    test_size = 6
    
    train_df = df.iloc[:train_size]
    val_df = df.iloc[train_size:train_size + val_size]
    test_df = df.iloc[train_size + val_size:]
    
    print(f"\nData splits:")
    print(f"Train: {len(train_df)} observations")
    print(f"Validation: {len(val_df)} observations") 
    print(f"Test: {len(test_df)} observations")
    
    # Fit and transform training data
    train_transformed = ts_vw.fit_transform_features(train_df)
    print(f"\nTrain features shape: {train_transformed.shape}")
    print("Feature columns:", [col for col in train_transformed.columns if col != 'target'])
    
    # Transform validation and test data
    val_transformed = ts_vw.transform_features(val_df)
    test_transformed = ts_vw.transform_features(test_df)
    
    print(f"Validation features shape: {val_transformed.shape}")
    print(f"Test features shape: {test_transformed.shape}")
    print(f"Buffer size after validation transform: {ts_vw.buffer.size()}")  # Should still be 5
    
    # Save to VW format
    ts_vw.to_vw(train_transformed, 'train.vw')
    ts_vw.to_vw(val_transformed, 'validation.vw')
    ts_vw.to_vw(test_transformed, 'test.vw')
    
    # Demonstrate incremental learning
    print(f"\n=== Incremental Learning Demo ===")
    
    # Incremental learning demo with proper state management
    print(f"\n=== Incremental Learning Demo ===")
    
    # Save state after training (K8s checkpoint)
    ts_vw.save_state('model_checkpoint.pkl')
    
    # Simulate loading state (like K8s pod restart)
    scaler_fresh = StandardScaler()
    scaler_manager_fresh = ScalerManager(scaler=scaler_fresh, update_enabled=True, update_freq=1)
    buffer_fresh = InMemoryBuffer(max_size=5)
    
    ts_vw_reloaded = TimeSeriesVW(
        scaler_manager=scaler_manager_fresh,
        buffer=buffer_fresh,
        max_window=6,
        horizon=6,
        target_col='target'
    )
    
    # Load saved state
    ts_vw_reloaded.load_state('model_checkpoint.pkl')
    
    print(f"State reloaded - Buffer size: {ts_vw_reloaded.buffer.size()}")
    print(f"Scaler version: {ts_vw_reloaded.scaler_manager.scaler_version}")
    
    # Incrementally process validation data with scaler updates
    print("\nIncremental processing with scaler updates:")
    print("Scaler mean before updates:", ts_vw_reloaded.scaler_manager.base_scaler.mean_[:3])
    
    for i, (idx, row) in enumerate(val_df.iterrows()):
        vw_lines = ts_vw_reloaded.update(row)
        print(f"Observation {i+1} ({idx.strftime('%Y-%m')}):")
        print(f"  Target: {row['target']:.2f}")
        print(f"  VW line: {vw_lines[0][:80]}...")
        print(f"  Buffer size: {ts_vw_reloaded.buffer.size()}")
        print(f"  Scaler version: {ts_vw_reloaded.scaler_manager.scaler_version}")
        if i == 0:  # Show scaler evolution after first update
            print(f"  Scaler mean after update: {ts_vw_reloaded.scaler_manager.base_scaler.mean_[:3]}")
    
    print(f"Final scaler mean: {ts_vw_reloaded.scaler_manager.base_scaler.mean_[:3]}")
    print(f"Total scaler updates: {ts_vw_reloaded.scaler_manager.update_counter}")
    
    # Save final state
    ts_vw_reloaded.save_state('model_final_state.pkl')
    
    # Batch incremental processing
    print(f"\n=== Batch Incremental Processing ===")
    
    # Reset buffer and process test data in batch with different update frequency
    buffer_batch = InMemoryBuffer(max_size=5)
    ts_vw_batch = TimeSeriesVW(
        scaler=StandardScaler(),
        buffer=buffer_batch,
        max_window=6,
        horizon=6,
        target_col='target',
        update_scaler=True,
        scaler_update_freq=3  # Update scaler every 3 observations (batch strategy)
    )
    ts_vw_batch.fit_features(train_df)
    
    # For batch demo, we need to populate the buffer first
    last_train_obs = train_df.tail(ts_vw_batch.max_window - 1)
    ts_vw_batch.buffer.add(last_train_obs)
    
    # Process validation data as batch
    print(f"Scaler mean before batch update: {ts_vw_batch.scaler.mean_[:3]}")
    vw_lines_batch = ts_vw_batch.update(val_df)
    print(f"Processed {len(vw_lines_batch)} observations in batch")
    print(f"Scaler mean after batch update: {ts_vw_batch.scaler.mean_[:3]}")
    print("First VW line:", vw_lines_batch[0][:100] + "...")
    print("Last VW line:", vw_lines_batch[-1][:100] + "...")
    
    # Show feature engineering example
    print(f"\n=== Feature Engineering Example ===")
    sample_transformed = train_transformed.head(3)
    print("Sample transformed data (first 3 rows):")
    feature_cols = [col for col in sample_transformed.columns if col != 'target']
    print(f"Features created: {len(feature_cols)} features")
    
    # Show some key features
    key_features = [col for col in feature_cols if any(x in col for x in ['rolling_mean', 'month_', 'quarter_'])][:10]
    print(f"Sample key features: {key_features}")
    
    for idx, row in sample_transformed.head(2).iterrows():
        print(f"\nDate: {idx.strftime('%Y-%m-%d')}")
        print(f"Target: {row[ts_vw.target_col]:.2f}")
        for feat in key_features[:5]:
            print(f"  {feat}: {row[feat]:.4f}")
    
    print(f"\n=== Summary ===")
    print(f"✓ Created {len(feature_cols)} features including rolling stats and temporal features")
    print(f"✓ Scaler fitted on {len(train_df)} training observations")
    print(f"✓ Generated VW files for train/validation/test")
    print(f"✓ Implemented incremental scaler updates with versioning")
    print(f"✓ Added state persistence for K8s pod restarts")
    print(f"✓ Buffer properly maintains rolling window context")
    print(f"✓ Ready for Redis migration (state serialization included)")
    print(f"✓ Production-ready for K8s cluster with online learning")

Dataset shape: (144, 2)
Dataset head:
            passengers  target
month                         
1949-01-01         112     112
1949-02-01         118     118
1949-03-01         132     132
1949-04-01         129     129
1949-05-01         121     121

Data splits:
Train: 132 observations
Validation: 6 observations
Test: 6 observations

Train features shape: (132, 36)
Feature columns: ['passengers', 'rolling_mean_1', 'rolling_std_1', 'pct_1', 'rolling_mean_2', 'rolling_std_2', 'pct_2', 'rolling_mean_3', 'rolling_std_3', 'pct_3', 'rolling_mean_4', 'rolling_std_4', 'pct_4', 'rolling_mean_5', 'rolling_std_5', 'pct_5', 'rolling_mean_6', 'rolling_std_6', 'pct_6', 'month_1', 'month_2', 'month_3', 'month_4', 'month_5', 'month_6', 'month_7', 'month_8', 'month_9', 'month_10', 'month_11', 'month_12', 'quarter_1', 'quarter_2', 'quarter_3', 'quarter_4']
Validation features shape: (6, 36)
Test features shape: (6, 36)
Buffer size after validation transform: 5
Saved 132 observations to train.vw
Sa

AttributeError: 'InMemoryBuffer' object has no attribute 'get_state'