In [None]:
import datetime as dt
import json
import time
import traceback
from typing import Any, Dict, Hashable, Optional, List

from kafka import (
    KafkaConsumer,
    TopicPartition
)
from kafka.errors import NoBrokersAvailable
from river import (
    base,
    compose,
    metrics,
    drift,
    forest,
    cluster,
    preprocessing,
    time_series,
    linear_model,
)

In [28]:
# Notebook-wide configuration: the project being worked on and the Kafka
# bootstrap address assembled from host and port.
PROJECT_NAME = "Transaction Fraud Detection"
KAFKA_HOST = "localhost"
KAFKA_PORT = 9092  # Change to 9093 if skaffold port-forward uses that port
KAFKA_BROKERS = "{}:{}".format(KAFKA_HOST, KAFKA_PORT)

In [29]:
# Exploratory check: list the names exposed by river's `metrics` module.
dir(metrics)

['Accuracy',
 'AdjustedMutualInfo',
 'AdjustedRand',
 'BalancedAccuracy',
 'ClassificationReport',
 'CohenKappa',
 'Completeness',
 'ConfusionMatrix',
 'CrossEntropy',
 'F1',
 'FBeta',
 'FowlkesMallows',
 'GeometricMean',
 'Homogeneity',
 'Jaccard',
 'LogLoss',
 'MAE',
 'MAPE',
 'MCC',
 'MSE',
 'MacroF1',
 'MacroFBeta',
 'MacroJaccard',
 'MacroPrecision',
 'MacroRecall',
 'MicroF1',
 'MicroFBeta',
 'MicroJaccard',
 'MicroPrecision',
 'MicroRecall',
 'MultiFBeta',
 'MutualInfo',
 'NormalizedMutualInfo',
 'Precision',
 'R2',
 'RMSE',
 'RMSLE',
 'ROCAUC',
 'Rand',
 'Recall',
 'RollingROCAUC',
 'SMAPE',
 'Silhouette',
 'VBeta',
 'WeightedF1',
 'WeightedFBeta',
 'WeightedJaccard',
 'WeightedPrecision',
 'WeightedRecall',
 '__all__',
 '__builtins__',
 '__cached__',
 '__doc__',
 '__file__',
 '__loader__',
 '__name__',
 '__package__',
 '__path__',
 '__spec__',
 'accuracy',
 'annotations',
 'balanced_accuracy',
 'base',
 'confusion',
 'cross_entropy',
 'efficient_rollingrocauc',
 'expected_mutu

In [30]:
def create_consumer(project_name, max_retries: int = 5, retry_delay: float = 5.0, start_offset: Optional[int] = None):
    """Create and return Kafka consumer with manual partition assignment.

    Args:
        project_name: Name of the project for topic selection. Must be one of
            the keys of the topic table below.
        max_retries: Maximum connection retry attempts.
        retry_delay: Delay between retries in seconds.
        start_offset: If provided, seek to this offset + 1 to continue from last processed.
                     If None, seeks to the earliest available offset (handles log compaction).

    Returns:
        A ``KafkaConsumer`` assigned to partition 0 of the project's topic, or
        ``None`` when the broker could not be reached after ``max_retries``
        attempts or any other error occurred while setting the consumer up.

    Raises:
        KeyError: If ``project_name`` is not a known project.

    Note: Uses manual partition assignment instead of group-based subscription
    due to Kafka 4.0 compatibility issues with kafka-python's consumer group protocol.
    """
    consumer_name_dict = {
        "Transaction Fraud Detection": "transaction_fraud_detection",
        "Estimated Time of Arrival": "estimated_time_of_arrival",
        "E-Commerce Customer Interactions": "e_commerce_customer_interactions",
        "Sales Forecasting": "sales_forecasting"
    }
    KAFKA_TOPIC = consumer_name_dict[project_name]

    for attempt in range(max_retries):
        # Tracked outside the try block so a half-configured consumer can be
        # closed if assignment or seeking fails after the connection was opened.
        consumer = None
        try:
            # Create consumer without topic subscription (manual assignment)
            consumer = KafkaConsumer(
                bootstrap_servers = KAFKA_BROKERS,
                value_deserializer = lambda v: json.loads(v.decode('utf-8')),
                consumer_timeout_ms = 1000,  # 1 second timeout for graceful shutdown checks
                api_version = (3, 7),  # Force API version for Kafka 4.0 compatibility
            )

            # Manually assign partition 0 of the topic
            tp = TopicPartition(KAFKA_TOPIC, 0)
            consumer.assign([tp])

            # Seek to appropriate offset
            if start_offset is not None:
                # Continue from the next message after last processed
                next_offset = start_offset + 1
                consumer.seek(tp, next_offset)
                print(f"Kafka consumer seeking to offset {next_offset} (continuing from {start_offset})")
            else:
                # No stored offset - seek to earliest AVAILABLE offset
                # Using seek_to_beginning + poll + position as workaround for Kafka 4.0
                # (beginning_offsets() blocks indefinitely with kafka-python + Kafka 4.0)
                consumer.seek_to_beginning(tp)
                consumer.poll(timeout_ms=100)  # Trigger the seek
                begin_offset = consumer.position(tp)
                consumer.seek(tp, begin_offset)  # Ensure we're at the detected position
                print(f"Kafka consumer seeking to earliest available offset {begin_offset}")

            print(f"Kafka consumer created for {project_name} (manual assignment)")
            return consumer

        except NoBrokersAvailable:
            if attempt < max_retries - 1:
                print(f"Kafka not available for {project_name}, retrying in {retry_delay}s... (attempt {attempt + 1}/{max_retries})")
                time.sleep(retry_delay)
            else:
                print(f"Failed to connect to Kafka for {project_name} after {max_retries} attempts. Continuing without consumer.")
                return None
        except Exception as e:
            print(f"Error creating Kafka consumer for {project_name}: {e}")
            if consumer is not None:
                # The constructor succeeded but a later step failed: release
                # the connection instead of leaking it. Closing is best-effort;
                # the caller only learns that no consumer is available.
                try:
                    consumer.close()
                except Exception as close_error:
                    print(f"Error closing Kafka consumer for {project_name}: {close_error}")
            return None

    # Only reached when max_retries <= 0 (the loop body never ran).
    return None

In [31]:
class CustomOrdinalEncoder:
    """
    An incremental ordinal encoder that is picklable and processes dictionaries.
    Assigns a unique integer ID to each unique category encountered for each feature.

    IDs are handed out per feature, starting at 0, in the order in which new
    category values are first passed to ``learn_one``.
    """
    def __init__(self):
        # feature name -> {category value -> integer ID}
        self._feature_mappings: Dict[Hashable, Dict[Any, int]] = {}
        # feature name -> next integer ID to hand out for that feature
        self._feature_next_ids: Dict[Hashable, int] = {}

    @staticmethod
    def _is_hashable(value: Any) -> bool:
        """Return True when ``hash(value)`` succeeds.

        An ``isinstance(value, Hashable)`` check is not enough: a tuple that
        contains a list passes it but still cannot be used as a dict key.
        """
        try:
            hash(value)
        except TypeError:
            return False
        return True

    def learn_one(self, x: Dict[Hashable, Any]):
        """
        Learns categories from a single sample dictionary.

        Args:
            x: A dictionary representing a single sample.
               Keys are feature names, values are feature values.
               Values that cannot be hashed are skipped with a warning.
        """
        for feature_name, category_value in x.items():
            if not self._is_hashable(category_value):
                print(f"Warning: Skipping unhashable value for feature '{feature_name}': {category_value}")
                continue # Skip this feature for learning
            # First sighting of a feature creates its (empty) mapping and counter.
            feature_map = self._feature_mappings.setdefault(feature_name, {})
            next_id = self._feature_next_ids.setdefault(feature_name, 0)
            if category_value not in feature_map:
                feature_map[category_value] = next_id
                self._feature_next_ids[feature_name] = next_id + 1

    def transform_one(self, x: Dict[Hashable, Any]) -> Dict[Hashable, int]:
        """
        Transforms categorical features in a single sample dictionary into integer IDs.

        Args:
            x: A dictionary representing a single sample.
               Keys are feature names, values are feature values.
        Returns:
            A new dictionary containing the integer IDs for the features the
            encoder has seen. Features never seen by the encoder are left out,
            and so are unhashable values (``learn_one`` skips them, so they can
            never have an ID); the latter are reported with a warning.
        Raises:
            KeyError: If a feature is known but the given (hashable) category
                      value was not seen for it during learning.
        """
        transformed_sample: Dict[Hashable, int] = {}
        for feature_name, category_value in x.items():
            feature_map = self._feature_mappings.get(feature_name)
            if feature_map is None:
                # Unknown feature: this encoder only outputs encoded features.
                continue
            if not self._is_hashable(category_value):
                # Mirror learn_one instead of failing with TypeError on lookup.
                print(f"Warning: Skipping unhashable value for feature '{feature_name}': {category_value}")
                continue
            if category_value not in feature_map:
                raise KeyError(f"Unseen category '{category_value}' for feature '{feature_name}' during transform.")
            transformed_sample[feature_name] = feature_map[category_value]
        return transformed_sample

    def get_feature_mappings(self) -> Dict[Hashable, Dict[Any, int]]:
        """Returns the current mappings for all features."""
        return self._feature_mappings

    def get_feature_next_ids(self) -> Dict[Hashable, int]:
        """Returns the next available IDs for all features."""
        return self._feature_next_ids

    def __repr__(self) -> str:
        """String representation of the encoder, using the actual class name."""
        num_features = len(self._feature_mappings)
        feature_details = ", ".join([f"{name}: {len(mapping)} categories" for name, mapping in self._feature_mappings.items()])
        return f"{type(self).__name__}(features={num_features} [{feature_details}])"
    


In [32]:
class DictImputer(base.Transformer):
    """
    Fills selected features of a dictionary with a constant when they are
    absent or set to None.

    Parameters
    ----------
    on
        List of feature names to impute.
    fill_value
        The value to use for imputation.
    """
    def __init__(self, on: list, fill_value):
        self.fill_value = fill_value
        self.on = on

    def transform_one(self, x: dict):
        # Work on a shallow copy so the caller's dictionary is left untouched.
        filled = x.copy()
        gaps = [name for name in self.on if filled.get(name) is None]
        filled.update(dict.fromkeys(gaps, self.fill_value))
        return filled

In [33]:
def extract_device_info(x):
    """Return the ``os`` and ``browser`` entries of ``x['device_info']`` as a flat dict."""
    device = x['device_info']
    return {field: device[field] for field in ('os', 'browser')}

def extract_timestamp_info(x):
    """Split the ISO-8601 string in ``x['timestamp']`` into calendar/clock parts.

    The timestamp must carry a UTC offset. Fractional seconds are optional:
    the primary format expects them, and a fallback format accepts timestamps
    without them (``datetime.isoformat()`` omits the fraction when the
    microsecond component is 0).

    Returns:
        Dict with integer ``year``, ``month``, ``day``, ``hour``, ``minute``
        and ``second`` taken from the parsed value as written (no timezone
        conversion is applied).

    Raises:
        KeyError: If ``x`` has no ``'timestamp'`` entry.
        ValueError: If the string matches neither accepted format.
    """
    raw = x['timestamp']
    try:
        parsed = dt.datetime.strptime(raw, "%Y-%m-%dT%H:%M:%S.%f%z")
    except ValueError:
        # Same layout without the ".%f" fraction; raises ValueError if this fails too.
        parsed = dt.datetime.strptime(raw, "%Y-%m-%dT%H:%M:%S%z")
    return {
        'year': parsed.year,
        'month': parsed.month,
        'day': parsed.day,
        'hour': parsed.hour,
        'minute': parsed.minute,
        'second': parsed.second
    }

def extract_coordinates(x):
    """Return the ``lat`` and ``lon`` entries of ``x['location']`` as a flat dict."""
    location = x['location']
    return {axis: location[axis] for axis in ('lat', 'lon')}

In [34]:
def _learn_then_transform(transformer, x):
    """Call ``learn_one`` and then ``transform_one`` on ``transformer`` with sample ``x``."""
    transformer.learn_one(x)
    return transformer.transform_one(x)


def _select_features(x, *keys):
    """Run ``x`` through a fresh ``compose.Select(*keys)``."""
    return _learn_then_transform(compose.Select(*keys), x)


def _expand_feature(x, key, func):
    """Select ``key`` from ``x`` and pass the result through ``compose.FuncTransformer(func)``."""
    selected = _select_features(x, key)
    return _learn_then_transform(compose.FuncTransformer(func), selected)


def _ordinal_encode(x_passthrough, x_to_encode, encoders):
    """Update the ordinal encoder with ``x_to_encode`` and merge its output with ``x_passthrough``."""
    ordinal_encoder = encoders["ordinal_encoder"]
    ordinal_encoder.learn_one(x_to_encode)
    x_encoded = ordinal_encoder.transform_one(x_to_encode)
    return x_passthrough | x_encoded, {"ordinal_encoder": ordinal_encoder}


def _process_transaction_fraud(x, encoders):
    """Feature pipeline for "Transaction Fraud Detection"."""
    x_plain = _select_features(
        x,
        "amount",
        "account_age_days",
        "cvv_provided",
        "billing_address_match",
    )
    x_categories = _select_features(
        x,
        "currency",
        "merchant_id",
        "payment_method",
        "product_category",
        "transaction_type",
    )
    x_device = _expand_feature(x, "device_info", extract_device_info)
    x_time = _expand_feature(x, "timestamp", extract_timestamp_info)
    return _ordinal_encode(x_plain, x_categories | x_device | x_time, encoders)


def _process_estimated_time_of_arrival(x, encoders):
    """Feature pipeline for "Estimated Time of Arrival"."""
    x_plain = _select_features(
        x,
        'estimated_distance_km',
        'temperature_celsius',
        'hour_of_day',
        'driver_rating',
        'initial_estimated_travel_time_seconds',
        'debug_traffic_factor',
        'debug_weather_factor',
        'debug_incident_delay_seconds',
        'debug_driver_factor',
    )
    x_categories = _select_features(
        x,
        'driver_id',
        'vehicle_id',
        'weather',
        'vehicle_type',
    )
    x_time = _expand_feature(x, "timestamp", extract_timestamp_info)
    return _ordinal_encode(x_plain, x_categories | x_time, encoders)


def _process_e_commerce(x, encoders):
    """Feature pipeline for "E-Commerce Customer Interactions"."""
    x_plain = _select_features(
        x,
        'price',
        'quantity',
        'session_event_sequence',
        'time_on_page_seconds',
    )
    x_categories = _select_features(
        x,
        'event_type',
        'product_category',
        'product_id',
        'referrer_url',
    )
    x_device = _expand_feature(x, "device_info", extract_device_info)
    x_time = _expand_feature(x, "timestamp", extract_timestamp_info)
    x_coords = _expand_feature(x, "location", extract_coordinates)
    x_to_prep = x_plain | x_categories | x_device | x_time | x_coords
    # Replace every None entry with False before scaling / hashing.
    x_to_prep = DictImputer(
        fill_value = False,
        on = list(x_to_prep.keys())).transform_one(
            x_to_prep)
    numerical_features = [
        'price',
        'session_event_sequence',
        'time_on_page_seconds',
        'quantity'
    ]
    # 'lat' / 'lon' are extracted above but appear in neither list, so they
    # do not reach the output (unchanged from the original pipeline).
    categorical_features = [
        'event_type',
        'product_category',
        'product_id',
        'referrer_url',
        'os',
        'browser',
        'year',
        'month',
        'day',
        'hour',
        'minute',
        'second'
    ]
    x_num = _select_features(x_to_prep, *numerical_features)
    x_cat = _select_features(x_to_prep, *categorical_features)
    standard_scaler = encoders["standard_scaler"]
    x_scaled = _learn_then_transform(standard_scaler, x_num)
    feature_hasher = encoders["feature_hasher"]
    x_hashed = _learn_then_transform(feature_hasher, x_cat)
    return x_scaled | x_hashed, {
        "standard_scaler": standard_scaler,
        "feature_hasher": feature_hasher
    }


def _process_sales_forecasting(x, encoders):
    """Feature pipeline for "Sales Forecasting"."""
    x_plain = _select_features(
        x,
        'concept_drift_stage',
        'day_of_week',
        'is_holiday',
        'is_promotion_active',
        'month',
        #'total_sales_amount',
        'unit_price',
    )
    x_time = _expand_feature(x, "timestamp", extract_timestamp_info)
    x_ids = _select_features(
        x,
        'product_id',
        'promotion_id',
        'store_id',
    )
    # On key clashes the right-hand operand wins, so 'month' from the parsed
    # timestamp replaces the 'month' selected from the raw sample.
    x_to_process = x_plain | x_time | x_ids
    numerical_features = [
        'unit_price',
        #'total_sales_amount',
    ]
    categorical_features = [
        'is_promotion_active',
        'is_holiday',
        'day_of_week',
        'concept_drift_stage',
        'year',
        'month',
        'day',
        #'hour',
        #'minute',
        #'second',
        'product_id',
        'promotion_id',
        'store_id',
    ]
    x_num = _select_features(x_to_process, *numerical_features)
    x_cat = _select_features(x_to_process, *categorical_features)
    standard_scaler = encoders["standard_scaler"]
    x_num = _learn_then_transform(standard_scaler, x_num)
    one_hot_encoder = encoders["one_hot_encoder"]
    x_cat = _learn_then_transform(one_hot_encoder, x_cat)
    return x_num | x_cat, {
        "one_hot_encoder": one_hot_encoder,
        "standard_scaler": standard_scaler,
    }


def process_sample(x, encoders, project_name):
    """Process a single sample for River incremental learning.

    Args:
        x: Raw sample dictionary for the given project.
        encoders: Dict of stateful encoders for the project (the keys read
            are "ordinal_encoder", or "standard_scaler" together with
            "feature_hasher" / "one_hot_encoder", depending on the project).
            The encoders are updated with this sample before transforming it.
        project_name: One of "Transaction Fraud Detection",
            "Estimated Time of Arrival", "E-Commerce Customer Interactions",
            "Sales Forecasting".

    Returns:
        Tuple ``(features, used_encoders)``: the processed feature dict and a
        dict holding the encoder objects that were used.

    Raises:
        ValueError: If ``project_name`` is not one of the known projects
            (previously this case silently returned ``None``).
    """
    if project_name == "Transaction Fraud Detection":
        return _process_transaction_fraud(x, encoders)
    if project_name == "Estimated Time of Arrival":
        return _process_estimated_time_of_arrival(x, encoders)
    if project_name == "E-Commerce Customer Interactions":
        return _process_e_commerce(x, encoders)
    if project_name == "Sales Forecasting":
        return _process_sales_forecasting(x, encoders)
    raise ValueError(f"Unknown project: {project_name}")

In [35]:
def _create_default_model(project_name):
    """Create default model based on project type.

    Models are configured based on River ML documentation and best practices.
    All parameters are documented with their River ML defaults and rationale.

    See: https://riverml.xyz/latest/
    """
    if project_name == "Transaction Fraud Detection":
        # =================================================================
        # ARFClassifier - Adaptive Random Forest Classifier
        # For fraud detection with concept drift handling
        # =================================================================
        # OLD CONFIGURATION:
        # return forest.ARFClassifier(
        #     n_models = 10,
        #     drift_detector = drift.ADWIN(),
        #     warning_detector = drift.ADWIN(),
        #     metric = metrics.ROCAUC(),
        #     max_features = "sqrt",
        #     lambda_value = 6,
        #     seed = 42
        # )

        # CONFIGURATION based on River ML documentation:
        # Reference: https://riverml.xyz/latest/api/forest/ARFClassifier/
        # Reference: https://riverml.xyz/latest/examples/imbalanced-learning/
        #
        # - n_models=10: Default number of trees in ensemble
        # - max_features="sqrt": Default, sqrt of features per split
        # - lambda_value=6: Default Leveraging Bagging parameter
        # - metric=ROCAUC(): RECOMMENDED by River for imbalanced fraud detection
        #   (River's imbalanced-learning guide uses ROCAUC for fraud detection)
        # - disable_weighted_vote=False: Enable weighted voting for better accuracy
        # - drift_detector ADWIN(delta=0.002): Default sensitivity (0.002)
        # - warning_detector ADWIN(delta=0.01): Default warning sensitivity
        # - grace_period=50: Default observations between split attempts
        # - max_depth=None: Default, unlimited tree depth
        # - split_criterion="info_gain": Default, information gain criterion
        # - delta=0.01: Default allowed error in split decision
        # - tau=0.05: Default tie-breaking threshold
        # - leaf_prediction="nba": Default, Naive Bayes Adaptive
        # - nb_threshold=0: Default, enable NB immediately
        # - binary_split=False: Default, allow multi-way splits
        # - min_branch_fraction=0.01: Default minimum data per branch
        # - max_share_to_split=0.99: Default majority class proportion
        # - max_size=100.0: Default max memory in MiB
        # - memory_estimate_period=2000000: Default instances between memory checks
        # - merit_preprune=True: Default merit-based pre-pruning
        return forest.ARFClassifier(
            n_models = 10,
            max_features = "sqrt",
            lambda_value = 6,
            metric = metrics.ROCAUC(),
            disable_weighted_vote = False,
            drift_detector = drift.ADWIN(delta = 0.002),
            warning_detector = drift.ADWIN(delta = 0.01),
            grace_period = 50,
            max_depth = None,
            split_criterion = "info_gain",
            delta = 0.01,
            tau = 0.05,
            leaf_prediction = "nba",
            nb_threshold = 0,
            nominal_attributes = None,
            binary_split = False,
            min_branch_fraction = 0.01,
            max_share_to_split = 0.99,
            max_size = 100.0,
            memory_estimate_period = 2000000,
            stop_mem_management = False,
            remove_poor_attrs = False,
            merit_preprune = True,
            seed = 42,
        )
    elif project_name == "Estimated Time of Arrival":
        # =================================================================
        # ARFRegressor - Adaptive Random Forest Regressor
        # For ETA prediction with continuous drift handling
        # =================================================================
        # OLD CONFIGURATION:
        # return forest.ARFRegressor(
        #     n_models = 10,
        #     drift_detector = drift.ADWIN(),
        #     warning_detector = drift.ADWIN(),
        #     metric = metrics.RMSE(),
        #     max_features = "sqrt",
        #     lambda_value = 6,
        #     seed = 42
        # )

        # CONFIGURATION based on River ML documentation:
        # Reference: https://riverml.xyz/latest/api/forest/ARFRegressor/
        #
        # - n_models=10: Default number of trees
        # - max_features="sqrt": Default feature selection
        # - aggregation_method="median": Default, robust to outliers
        # - lambda_value=6: Default Leveraging Bagging parameter
        # - metric=MAE(): Using MAE as it's common for ETA prediction
        # - disable_weighted_vote=True: Default for regressor
        # - drift_detector ADWIN(delta=0.002): Default sensitivity
        # - warning_detector ADWIN(delta=0.01): Default warning sensitivity
        # - grace_period=50: Default observations between split attempts
        # - max_depth=None: Default unlimited depth
        # - delta=0.01: Default allowed error
        # - tau=0.05: Default tie-breaking threshold
        # - leaf_prediction="adaptive": Default, dynamically chooses mean/model
        # - model_selector_decay=0.95: Default decay for leaf model selection
        # - min_samples_split=5: Default minimum samples for split
        # - binary_split=False: Default multi-way splits
        # - max_size=500.0: Default max memory in MiB
        return forest.ARFRegressor(
            n_models=10,
            max_features="sqrt",
            aggregation_method="median",
            lambda_value=6,
            metric=metrics.MAE(),
            disable_weighted_vote=True,
            drift_detector=drift.ADWIN(delta=0.002),
            warning_detector=drift.ADWIN(delta=0.01),
            grace_period=50,
            max_depth=None,
            delta=0.01,
            tau=0.05,
            leaf_prediction="adaptive",
            leaf_model=None,
            model_selector_decay=0.95,
            min_samples_split=5,
            binary_split=False,
            max_size=500.0,
            memory_estimate_period=2000000,
            nominal_attributes=None,
            seed=42,
        )
    elif project_name == "E-Commerce Customer Interactions":
        # =================================================================
        # DBSTREAM - Density-Based Stream Clustering
        # For customer behavior clustering with arbitrary shapes
        # =================================================================
        # OLD CONFIGURATION:
        # return cluster.DBSTREAM(
        #     clustering_threshold = 1.0,
        #     fading_factor = 0.01,
        #     cleanup_interval = 2,
        # )

        # CONFIGURATION based on River ML documentation example:
        # Reference: https://riverml.xyz/latest/api/cluster/DBSTREAM/
        #
        # The River documentation provides this exact example configuration:
        # - clustering_threshold=1.5: Micro-cluster radius
        # - fading_factor=0.05: Historical data importance (must be > 0)
        # - cleanup_interval=4: Time between cleanup processes
        # - intersection_factor=0.5: Cluster overlap ratio for connectivity
        # - minimum_weight=1.0: Threshold for non-noisy cluster classification
        return cluster.DBSTREAM(
            clustering_threshold=1.5,
            fading_factor=0.05,
            cleanup_interval=4,
            intersection_factor=0.5,
            minimum_weight=1.0,
        )
    elif project_name == "Sales Forecasting":
        # =================================================================
        # SNARIMAX - Seasonal Non-linear Auto-Regressive Integrated
        # Moving Average with eXogenous inputs
        # For sales forecasting with weekly seasonality
        # =================================================================
        # OLD CONFIGURATION:
        # regressor_snarimax = linear_model.PARegressor(
        #     C = 0.01,
        #     mode = 1)
        # return time_series.SNARIMAX(
        #     p = 2,
        #     d = 1,
        #     q = 1,
        #     m = 7,
        #     sp = 1,
        #     sd = 0,
        #     sq = 1,
        #     regressor = regressor_snarimax
        # )

        # CONFIGURATION based on River ML documentation:
        # Reference: https://riverml.xyz/latest/api/time-series/SNARIMAX/
        # Reference: https://riverml.xyz/latest/api/linear-model/PARegressor/
        #
        # SNARIMAX parameters for weekly sales data:
        # - p=7: Past 7 days of target values (full week)
        # - d=1: First-order differencing for trend removal
        # - q=2: Past error terms for noise handling
        # - m=7: Weekly seasonality period
        # - sp=1: Seasonal autoregressive order
        # - sd=1: Seasonal differencing (recommended for seasonal data)
        # - sq=1: Seasonal moving average order
        #
        # PARegressor parameters (defaults from River docs):
        # - C=1.0: Default regularization strength
        # - mode=1: Default algorithm mode
        # - eps=0.1: Default tolerance parameter
        # - learn_intercept=True: Default bias learning
        regressor_snarimax = linear_model.PARegressor(
            C=1.0,
            mode=1,
            eps=0.1,
            learn_intercept=True,
        )
        return time_series.SNARIMAX(
            p=7,
            d=1,
            q=2,
            m=7,
            sp=1,
            sd=1,
            sq=1,
            regressor=regressor_snarimax,
        )
    else:
        raise ValueError(f"Unknown project: {project_name}")

In [36]:
def _create_default_encoders(project_name):
    """Create default encoders based on project type."""
    if project_name in ["Transaction Fraud Detection", "Estimated Time of Arrival"]:
        return {"ordinal_encoder": CustomOrdinalEncoder()}
    elif project_name == "E-Commerce Customer Interactions":
        return {
            "standard_scaler": preprocessing.StandardScaler(),
            "feature_hasher": preprocessing.FeatureHasher()
        }
    elif project_name == "Sales Forecasting":
        return {
            "one_hot_encoder": preprocessing.OneHotEncoder(),
            "standard_scaler": preprocessing.StandardScaler(),
        }
    else:
        raise ValueError(f"Unknown project: {project_name}")

In [37]:
# start_offset=-1 makes create_consumer seek to offset 0 (it seeks to start_offset + 1).
# NOTE(review): the connectivity-test cell below records a valid offset range starting
# at 467546, so offset 0 may no longer exist on the topic — confirm, or pass
# start_offset=None to seek to the earliest available offset instead.
# create_consumer may return None when Kafka is unreachable.
consumer = create_consumer(PROJECT_NAME, start_offset = -1)

Kafka consumer seeking to offset 0 (continuing from -1)
Kafka consumer created for Transaction Fraud Detection (manual assignment)


In [38]:
# Test Kafka connectivity - hardcoded offset (known valid: 467546-674409)
#
# This cell uses its own `test_consumer` variable. Rebinding the notebook-wide
# `consumer` here (as the cell used to do) replaced the consumer built by
# create_consumer() with one that this cell closes at the end, so the later
# message-collection cell was left iterating an already-closed consumer.

print(f"Testing connection to {KAFKA_BROKERS}...")

test_consumer = None
try:
    print("Creating consumer...")
    test_consumer = KafkaConsumer(
        bootstrap_servers=KAFKA_BROKERS,
        value_deserializer=lambda v: json.loads(v.decode('utf-8')),
        api_version=(3, 7),
    )
    print("Consumer created!")

    tp = TopicPartition('transaction_fraud_detection', 0)
    test_consumer.assign([tp])
    print("Partition assigned")

    # Use known valid offset from earlier check
    # Topic range: 467546 to ~676000
    OFFSET = 670000
    print(f"Seeking to offset {OFFSET}...")
    test_consumer.seek(tp, OFFSET)
    print("Seek complete")

    print("Polling...")
    # Up to 5 polls of 3 s each; stop at the first poll that returns records.
    for i in range(5):
        records = test_consumer.poll(timeout_ms=3000, max_records=5)
        print(f"Poll {i+1}: {len(records)} partitions, {sum(len(m) for m in records.values())} msgs")
        if records:
            for p, msgs in records.items():
                for m in msgs[:2]:
                    print(f"  offset={m.offset} keys={list(m.value.keys())[:3]}")
            print("SUCCESS!")
            break
except Exception:
    # Diagnostic cell: show the full traceback instead of aborting the run.
    traceback.print_exc()
finally:
    # Release the connection even when assignment, seeking or polling failed.
    if test_consumer is not None:
        test_consumer.close()

Testing connection to localhost:9092...
Creating consumer...
Consumer created!
Partition assigned
Seeking to offset 670000...
Seek complete
Polling...
Poll 1: 0 partitions, 0 msgs
Poll 2: 0 partitions, 0 msgs
Poll 3: 0 partitions, 0 msgs
Poll 4: 0 partitions, 0 msgs
Poll 5: 0 partitions, 0 msgs


In [39]:
# River metric objects keyed by display name; only accuracy is tracked for now.
binary_classification_metrics_dict = dict(Accuracy=metrics.Accuracy())

In [40]:
# Collect at most MAX_MESSAGES message payloads from the consumer.
MAX_MESSAGES = 1000
message_count = 0
messages = []

if consumer is None:
    # create_consumer() returns None when no Kafka connection could be set up;
    # iterating None would raise TypeError, so report it and collect nothing.
    print("No Kafka consumer available; skipping message collection.")
else:
    # Iteration ends when the consumer stops yielding messages or the cap is hit.
    for message in consumer:
        messages.append(message.value)
        message_count += 1
        if message_count % 100 == 0:
            print(f"Processed {message_count} messages...")
        if message_count >= MAX_MESSAGES:
            break

print(f"\nTotal messages collected: {len(messages)}")
if messages:
    print(f"First message sample: {messages[0]}")


Total messages collected: 0
