# Main Program

This notebook orchestrates a data pipeline with four stages. It loads the datasets, engineers features, trains a Random Forest classifier to predict whether an order receives a tip, and merges the predictions into a template file. Prefect handles workflow orchestration.

In [None]:
import numpy as np
import pandas as pd
import features
from prefect import task, flow
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, classification_report
import logging


  # NOTE(review): a stray four-statement scratch fragment used to live here and
  # has been removed because it broke this notebook cell:
  #   * it was indented at module level, which raises IndentationError for the
  #     whole cell, so none of the imports above were ever executed;
  #   * it operated on a DataFrame named `merged` that is not defined anywhere in
  #     this notebook;
  #   * it computed per-department `order_count` (cumcount within group) and
  #     `tip_cumsum_before` (cumulative tip sum minus the current row's tip), then
  #     immediately overwrote both columns with the per-aisle versions.
  # The prior-order tip-rate features (`order_dept_tip_rate`,
  # `order_aisle_tip_rate`) are presumably produced by
  # `features.combine_all_features`. If this logic is needed, move it there and
  # give the department and aisle variants distinct column names.


In [None]:
# Logging setup: INFO-and-above records, each prefixed with a timestamp and level.
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger(name=__name__)

@task
def load_data(orders_path: str, products_path: str, tips_path: str) -> tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]:
    """Read the three raw input tables and normalise their dtypes.

    Args:
        orders_path (str): Parquet file holding the orders table.
        products_path (str): CSV file holding the denormalised order-product rows.
        tips_path (str): CSV file holding the public tip labels.

    Returns:
        tuple: ``(orders, order_products_denormalized, tips_public)``. Any
        ``Unnamed: 0`` column is dropped from the tips table when present.

    Raises:
        FileNotFoundError: If any of the paths does not exist (logged, then re-raised).
    """
    logger.info("Loading datasets...")
    try:
        orders = pd.read_parquet(orders_path)
        products = pd.read_csv(products_path, dtype={'order_id': 'int64'})
        tips = pd.read_csv(tips_path, dtype={'order_id': 'int64'}).drop(columns=["Unnamed: 0"], errors='ignore')

        # Categorical dtype stores each repeated department/aisle label only once.
        for label_col in ('department', 'aisle'):
            products[label_col] = products[label_col].astype('category')

        # Force 64-bit integer keys so later joins line up across the tables.
        for key_col in ('order_id', 'user_id'):
            orders[key_col] = orders[key_col].astype('int64')
        products['product_id'] = products['product_id'].astype('int64')
        tips['order_id'] = tips['order_id'].astype('int64')

        logger.info(f"Datasets loaded: orders={orders.shape}, products={products.shape}, tips={tips.shape}")
        return orders, products, tips
    except FileNotFoundError as exc:
        logger.error(f"File not found: {exc}")
        raise

@task
def engineer_features(orders: pd.DataFrame, order_products_denormalized: pd.DataFrame, tips_public: pd.DataFrame) -> pd.DataFrame:
    """Generate engineered features using the features module.

    Thin logging wrapper around ``features.combine_all_features``.

    Args:
        orders (pd.DataFrame): Orders dataset.
        order_products_denormalized (pd.DataFrame): Order products dataset.
        tips_public (pd.DataFrame): Tips dataset.

    Returns:
        pd.DataFrame: The DataFrame returned by ``features.combine_all_features``.

    Raises:
        Exception: Whatever ``features.combine_all_features`` raises is logged
            with its traceback and re-raised unchanged. It is not wrapped in a
            ``ValueError``.
    """
    logger.info("Engineering features...")
    try:
        all_features_df = features.combine_all_features(orders, order_products_denormalized, tips_public)
    except Exception as e:
        # logger.exception keeps the traceback, which logger.error would drop.
        logger.exception(f"Feature engineering failed: {e}")
        raise
    logger.info(f"Features engineered: {all_features_df.shape}, columns={all_features_df.columns.tolist()}")
    return all_features_df

@task
def save_features(all_features_df: pd.DataFrame, output_path: str) -> None:
    """Persist the engineered feature table as CSV, without the index column.

    Args:
        all_features_df (pd.DataFrame): Feature table to write.
        output_path (str): Destination CSV path.

    Raises:
        IOError: If writing fails; the error is logged before being re-raised.
    """
    logger.info(f"Saving features to {output_path}...")
    try:
        all_features_df.to_csv(output_path, index=False)
    except IOError as write_err:
        logger.error(f"Failed to save features: {write_err}")
        raise
    else:
        logger.info(f"Features saved successfully to {output_path}")

@task
def split_data(all_features_df: pd.DataFrame) -> tuple[pd.DataFrame, pd.DataFrame]:
    """Partition the feature table by whether each row's ``tip`` label is known.

    Args:
        all_features_df (pd.DataFrame): Feature table containing a ``tip`` column.

    Returns:
        tuple: ``(train_df, predict_df)``. The first holds rows whose ``tip`` is
        present, the second rows whose ``tip`` is missing (NaN).
    """
    logger.info("Splitting data...")
    tip_missing = all_features_df['tip'].isna()
    train_df, predict_df = all_features_df[~tip_missing], all_features_df[tip_missing]
    logger.info(f"Train data: {train_df.shape}, Predict data: {predict_df.shape}")
    return train_df, predict_df

@task
def train_model(train_df: pd.DataFrame, feature_columns: list[str]) -> tuple[RandomForestClassifier, float, str]:
    """Fit a random forest on the labelled rows and score it on a 20 % hold-out split.

    Args:
        train_df (pd.DataFrame): Labelled rows. Must contain ``tip`` and every
            name listed in ``feature_columns``.
        feature_columns (list[str]): Columns used as model inputs.

    Returns:
        tuple: ``(model, accuracy, report)``. ``accuracy`` and the
        ``classification_report`` text are computed on the hold-out split.

    Raises:
        ValueError: If ``train_df`` is empty or lacks a feature column. Any other
            error from splitting or fitting is logged and re-raised.
    """
    logger.info("Training model...")
    try:
        if train_df.empty:
            raise ValueError("Training DataFrame is empty.")
        absent = [name for name in feature_columns if name not in train_df.columns]
        if absent:
            raise ValueError(f"Missing feature columns: {absent}")

        inputs = train_df[feature_columns]
        labels = train_df['tip'].astype('int')  # tip is used as a 0/1 class label

        inputs_fit, inputs_eval, labels_fit, labels_eval = train_test_split(
            inputs, labels, test_size=0.2, random_state=42
        )
        classifier = RandomForestClassifier(n_estimators=100, random_state=42, n_jobs=-1)
        classifier.fit(inputs_fit, labels_fit)

        predicted = classifier.predict(inputs_eval)
        accuracy = accuracy_score(labels_eval, predicted)
        report = classification_report(labels_eval, predicted)

        logger.info(f"Model trained. Accuracy: {accuracy:.4f}")
        logger.info(f"Classification Report:\n{report}")
        return classifier, accuracy, report
    except Exception as err:
        logger.error(f"Model training failed: {err}")
        raise

@task
def predict_tips(model: RandomForestClassifier, predict_df: pd.DataFrame, feature_columns: list[str]) -> pd.DataFrame:
    """Predict a ``tip`` class for every row whose label is missing.

    Args:
        model (RandomForestClassifier): Fitted classifier.
        predict_df (pd.DataFrame): Rows whose ``tip`` is NaN. Must contain
            ``order_id`` and every name listed in ``feature_columns``.
        feature_columns (list[str]): Columns fed to ``model.predict``.

    Returns:
        pd.DataFrame: ``order_id`` and integer ``tip`` columns, indexed like
        ``predict_df``. If ``predict_df`` is empty, an empty frame with those
        two columns is returned.

    Raises:
        ValueError: If a feature column is missing. Other errors are logged
            and re-raised.
    """
    logger.info("Predicting tips...")
    try:
        if predict_df.empty:
            logger.warning("No rows with missing tips to predict.")
            return pd.DataFrame(columns=['order_id', 'tip'])

        absent = [name for name in feature_columns if name not in predict_df.columns]
        if absent:
            raise ValueError(f"Missing feature columns in predict_df: {absent}")

        predicted = model.predict(predict_df[feature_columns]).astype('int')
        # Build the two-column result directly instead of copying every feature column.
        result_df = predict_df[['order_id']].copy()
        result_df['tip'] = predicted

        logger.info(f"Predictions made: {result_df.shape}")
        return result_df
    except Exception as err:
        logger.error(f"Prediction failed: {err}")
        raise

@task
def merge_with_template(predictions: pd.DataFrame, template_path: str) -> pd.DataFrame:
    """Merge predictions with the template CSV.

    Keeps the template's ``Unnamed: 0`` and ``order_id`` columns in template row
    order and left-joins the predicted ``tip`` onto them by ``order_id``.

    Args:
        predictions (pd.DataFrame): DataFrame with order_id and tip. ``order_id``
            values must be unique.
        template_path (str): Path to the template CSV.

    Returns:
        pd.DataFrame: One row per template row, with ``Unnamed: 0``, ``order_id``
        and ``tip``. ``tip`` is NaN for template orders without a prediction, and
        a warning is logged with their count.

    Raises:
        FileNotFoundError: If template file is missing.
        ValueError: If the template lacks a required column, or if
            ``predictions`` contains duplicate ``order_id`` values
            (``pandas.errors.MergeError`` subclasses ``ValueError``).
    """
    logger.info("Merging predictions with template...")
    try:
        template = pd.read_csv(template_path)
        if 'Unnamed: 0' not in template.columns or 'order_id' not in template.columns:
            raise ValueError("Template missing required columns: 'Unnamed: 0', 'order_id'")

        # validate='many_to_one' rejects duplicate order_ids on the predictions side,
        # which would otherwise silently duplicate template rows in the output.
        merged_df = template[['Unnamed: 0', 'order_id']].merge(
            predictions, on='order_id', how='left', validate='many_to_one'
        )
        if 'tip' in merged_df.columns:
            unmatched = int(merged_df['tip'].isna().sum())
            if unmatched:
                logger.warning(f"{unmatched} template rows have no predicted tip.")
        logger.info(f"Merged template: {merged_df.shape}")
        return merged_df
    except FileNotFoundError as e:
        logger.error(f"Template file not found: {e}")
        raise
    except Exception as e:
        # Keep the traceback so merge/validation failures are diagnosable.
        logger.exception(f"Merge failed: {e}")
        raise

@task
def save_predictions(predictions: pd.DataFrame, output_path: str) -> None:
    """Write the final prediction table to ``output_path`` as CSV (index omitted).

    Args:
        predictions (pd.DataFrame): Table to write.
        output_path (str): Destination CSV path.

    Raises:
        IOError: If writing fails; the error is logged before being re-raised.
    """
    logger.info(f"Saving predictions to {output_path}...")
    try:
        predictions.to_csv(output_path, index=False)
    except IOError as write_err:
        logger.error(f"Failed to save predictions: {write_err}")
        raise
    else:
        logger.info(f"Predictions saved successfully to {output_path}")

@flow(name="Tip-Prediction-Pipeline", log_prints=True)
def tip_prediction_pipeline(
    orders_path: str = "orders.parquet",
    products_path: str = "order_products_denormalized.csv",
    tips_path: str = "tips_public.csv",
    features_output_path: str = "all_features.csv",
    predictions_output_path: str = "predicted_tips.csv",
    template_path: str = "tip_testdaten_template_V2.csv",
) -> None:
    """Orchestrate the tip prediction pipeline using Prefect.

    The pipeline performs the following steps:
    1. Load datasets.
    2. Engineer features.
    3. Save features to CSV.
    4. Split data into training and prediction sets.
    5. Train a Random Forest model.
    6. Predict tips for NaN rows.
    7. Merge predictions with template.
    8. Save final predictions.

    All file locations are parameters. Their defaults are the file names this
    pipeline originally hard-coded, resolved against the working directory, so
    calling ``tip_prediction_pipeline()`` with no arguments behaves as before.

    Args:
        orders_path: Parquet file with the orders table.
        products_path: CSV with the denormalised order-product rows.
        tips_path: CSV with the public tip labels.
        features_output_path: Where the engineered feature table is written.
        predictions_output_path: Where the merged predictions are written.
        template_path: Template CSV that the predictions are merged into.
    """
    # Model inputs; every name must be produced by features.combine_all_features.
    feature_columns = [
        'order_has_alcohol', 'order_product_count', 'order_unique_dept_count',
        'order_unique_aisle_count', 'order_unique_dept_ratio', 'order_unique_aisle_ratio',
        'order_dept_tip_rate', 'order_aisle_tip_rate', 'order_placed_hour',
        'order_placed_dow', 'order_is_weekend', 'order_placed_hour_sin',
        'order_placed_hour_cos', 'order_placed_season_sin', 'order_placed_season_cos',
        'order_time_since_last_hours', 'user_alcohol_purchase_count',
        'user_total_purchase_count', 'user_unique_product_count',
        'user_unique_to_total_ratio', 'user_frequent_purchase_hour',
        'user_frequent_purchase_dow', 'user_avg_order_interval_hours',
        'user_frequent_hour_sin', 'user_frequent_hour_cos',
        'user_frequent_season_sin', 'user_frequent_season_cos',
        'user_total_product_purchase_count', 'user_product_tip_prob'
    ]

    logger.info("Starting tip prediction pipeline...")
    orders, order_products_denormalized, tips_public = load_data(orders_path, products_path, tips_path)
    all_features_df = engineer_features(orders, order_products_denormalized, tips_public)
    save_features(all_features_df, features_output_path)
    train_df, predict_df = split_data(all_features_df)
    # Accuracy and the classification report are already logged inside train_model.
    model, _accuracy, _report = train_model(train_df, feature_columns)
    predictions = predict_tips(model, predict_df, feature_columns)
    merged_predictions = merge_with_template(predictions, template_path)
    save_predictions(merged_predictions, predictions_output_path)

if __name__ == "__main__":
    # Also true inside a Jupyter kernel, so running this cell executes the pipeline.
    tip_prediction_pipeline()

## Feature Overview

The following table lists all features engineered in this pipeline, aggregated to the `order_id` level. User-level features are computed per `user_id` and then merged onto each order via `user_id`.

| **Feature Name** | **Level** | **Output Columns** | **Data Type** | **Description** |
|------------------|-----------|--------------------|---------------|-----------------|
| `user_alcohol_purchase_count` | User | `[user_id, user_alcohol_purchase_count]` | Integer | Counts the total number of alcohol products purchased by each user across all orders, merged via user_id. |
| `user_total_purchase_count` | User | `[user_id, user_total_purchase_count]` | Integer | Counts the total number of products purchased by each user across all orders, merged via user_id. |
| `user_unique_product_count` | User | `[user_id, user_unique_product_count]` | Integer | Counts the number of unique products purchased by each user, merged via user_id. |
| `user_unique_to_total_ratio` | User | `[user_id, user_unique_to_total_ratio]` | Float | Calculates the ratio of unique products to total products purchased by each user, merged via user_id. |
| `user_frequent_purchase_hour` | User | `[user_id, user_frequent_purchase_hour]` | Integer (0–23) | Identifies the hour of the day when the user places the most orders, defaulting to 12 (noon) if missing, merged via user_id. |
| `user_frequent_purchase_dow` | User | `[user_id, user_frequent_purchase_dow]` | Integer (0–6) | Identifies the day of the week (0=Monday, 6=Sunday) when the user places the most orders, defaulting to 0 (Monday), merged via user_id. |
| `user_avg_order_interval_hours` | User | `[user_id, user_avg_order_interval_hours]` | Float | Calculates the average time (in hours) between consecutive orders for each user, using the dataset median for users with one order, merged via user_id. |
| `user_frequent_hour_sin`, `user_frequent_hour_cos` | User | `[user_id, user_frequent_hour_sin, user_frequent_hour_cos]` | Float (-1 to 1) | Applies sine-cosine transformation to the most frequent purchase hour to capture its cyclical nature, merged via user_id. |
| `user_frequent_season_sin`, `user_frequent_season_cos` | User | `[user_id, user_frequent_season_sin, user_frequent_season_cos]` | Float (-1 to 1) | Applies sine-cosine transformation to the most frequent purchase month to capture seasonal cyclicality, defaulting to January, merged via user_id. |
| `order_has_alcohol` | Order | `[order_id, order_has_alcohol]` | Integer (0 or 1) | Flags whether an order contains any alcohol products (1 if yes, 0 if no). |
| `order_product_count` | Order | `[order_id, order_product_count]` | Integer | Counts the total number of items (products) in each order. |
| `order_unique_dept_count` | Order | `[order_id, order_unique_dept_count]` | Integer | Counts the number of unique departments in each order. |
| `order_unique_aisle_count` | Order | `[order_id, order_unique_aisle_count]` | Integer | Counts the number of unique aisles in each order. |
| `order_unique_dept_ratio` | Order | `[order_id, order_unique_dept_ratio]` | Float | Calculates the ratio of unique departments to total items in each order. |
| `order_unique_aisle_ratio` | Order | `[order_id, order_unique_aisle_ratio]` | Float | Calculates the ratio of unique aisles to total items in each order. |
| `order_dept_tip_rate` | Order | `[order_id, order_dept_tip_rate]` | Float (0 to 1) | Computes the average tip rate for the departments in an order based on prior orders, defaulting to 0.500111 for no history. |
| `order_aisle_tip_rate` | Order | `[order_id, order_aisle_tip_rate]` | Float (0 to 1) | Computes the average tip rate for the aisles in an order based on prior orders, defaulting to 0.500111 for no history. |
| `order_placed_hour` | Order | `[order_id, order_placed_hour]` | Integer (0–23) | Extracts the hour of the day when the order was placed. |
| `order_placed_dow` | Order | `[order_id, order_placed_dow]` | Integer (0–6) | Extracts the day of the week (0=Monday, 6=Sunday) when the order was placed. |
| `order_is_weekend` | Order | `[order_id, order_is_weekend]` | Integer (0 or 1) | Flags whether the order was placed on a weekend (Saturday or Sunday). |
| `order_placed_hour_sin`, `order_placed_hour_cos` | Order | `[order_id, order_placed_hour_sin, order_placed_hour_cos]` | Float (-1 to 1) | Applies sine-cosine transformation to the order’s hour to capture its cyclical nature. |
| `order_placed_season_sin`, `order_placed_season_cos` | Order | `[order_id, order_placed_season_sin, order_placed_season_cos]` | Float (-1 to 1) | Applies sine-cosine transformation to the order’s month to capture seasonal cyclicality. |
| `order_time_since_last_hours` | Order | `[order_id, order_time_since_last_hours]` | Float | Calculates the time (in hours) since the user’s previous order, using the dataset median for first orders. |
| `user_total_product_purchase_count` | User | `[user_id, user_total_product_purchase_count]` | Integer | Total count of products purchased by each user, aggregated from user-product level, merged via user_id. |
| `user_product_tip_prob` | Order | `[order_id, user_product_tip_prob]` | Float (0 to 1) | Average tip probability for user-product pairs in an order, aggregated to order_id, defaulting to 0.500111 for no history. |
