## Import lib

In [1]:
import os
import math
from typing import Optional, List, Tuple, Dict, Any

import numpy as np
import pandas as pd
import torch

from sklearn.model_selection import train_test_split
from sklearn.metrics import (
    accuracy_score,
    mean_squared_error,
    mean_absolute_error,
    r2_score,
)

# Choose device for tensors
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Using device:", DEVICE)

# For reproducibility
RANDOM_SEED = 42
np.random.seed(RANDOM_SEED)
torch.manual_seed(RANDOM_SEED)


Using device: cpu


<torch._C.Generator at 0x7c15b05c4f70>

## Custom impurity functions + regression metrics

In [5]:
# Compute Gini impurity for classification target y.
def gini_impurity(y: torch.Tensor) -> float:
    # If node is empty, impurity is zero by definition
    if y.numel() == 0:
        return 0.0

    # Get unique classes and their counts
    classes, counts = torch.unique(y, return_counts=True)
    probs = counts.float() / y.numel()

    gini = 1.0 - torch.sum(probs ** 2)
    return float(gini.item())

# Compute entropy impurity for classification target y.
def entropy_impurity(y: torch.Tensor, eps: float = 1e-12) -> float:
    if y.numel() == 0:
        return 0.0

    classes, counts = torch.unique(y, return_counts=True)
    probs = counts.float() / y.numel()
    probs = torch.clamp(probs, min=eps)

    ent = -torch.sum(probs * torch.log2(probs))
    return float(ent.item())

# Compute MSE-based impurity for regression node.
def mse_impurity(y: torch.Tensor) -> float:
    if y.numel() == 0:
        return 0.0

    y = y.float()
    mean_y = torch.mean(y)
    mse = torch.mean((y - mean_y) ** 2)
    return float(mse.item())


In [6]:
def regression_metrics(y_true, y_pred):
    y_true = np.array(y_true)
    y_pred = np.array(y_pred)

    mse = np.mean((y_true - y_pred) ** 2)
    mae = np.mean(np.abs(y_true - y_pred))
    rmse = np.sqrt(mse)

    ss_res = np.sum((y_true - y_pred) ** 2)
    ss_tot = np.sum((y_true - np.mean(y_true)) ** 2)
    r2 = 1.0 - ss_res / ss_tot if ss_tot > 0 else 0.0

    return {
        "MAE": float(mae),
        "MSE": float(mse),
        "RMSE": float(rmse),
        "R2": float(r2),
    }


### Test functions

In [7]:
# Quick test for impurity functions
y_cls = torch.tensor([0, 0, 1, 1, 1])
y_reg = torch.tensor([1.0, 2.0, 3.0])

print("Gini:", gini_impurity(y_cls))
print("Entropy:", entropy_impurity(y_cls))
print("MSE:", mse_impurity(y_reg))


Gini: 0.47999995946884155
Entropy: 0.9709506034851074
MSE: 0.6666666865348816


## Decision tree:

In [13]:
class DecisionTree:
    def __init__(
        self,
        classification: bool = True,
        max_depth: int = 10,
        min_samples_split: int = 2,
        min_samples_leaf: int = 1,
        criterion: str = "gini",   # or "entropy" or "mse"
        verbose: bool = False,     # debug flag
        max_features_split: int | None = None,   # limit features per node
      max_thresholds: int = 32,                # limit thresholds per feature
    ):
        self.classification = classification
        self.max_depth = max_depth
        self.min_samples_split = min_samples_split
        self.min_samples_leaf = min_samples_leaf
        self.criterion = criterion
        self.tree_ = None
        self.device = DEVICE
        self.n_features_ = None

        # Debug-related fields
        self.verbose = verbose       # if True, print debug info
        self._node_count = 0         # count number of nodes in tree

        self.max_features_split = max_features_split
        self.max_thresholds = max_thresholds

    # Impurity dispatch
    def _impurity(self, y: torch.Tensor) -> float:
        if self.classification:
            if self.criterion == "gini":
                return gini_impurity(y)
            elif self.criterion == "entropy":
                return entropy_impurity(y)
            else:
                raise ValueError("Unknown criterion for classification.")
        else:
            # regression
            return mse_impurity(y)

    # Stopping condition: check if node is pure
    def _is_pure(self, y: torch.Tensor) -> bool:
        if self.classification:
            return torch.unique(y).numel() == 1
        else:
            return torch.var(y.float()) < 1e-8

    # Leaf value (prediction stored in leaf)
    def _leaf_value(self, y: torch.Tensor):
        if self.classification:
            classes, counts = torch.unique(y, return_counts=True)
            idx = torch.argmax(counts)
            return int(classes[idx].item())
        else:
            return float(torch.mean(y).item())

    # Find best split across all features & thresholds
    def _best_split(self, X: torch.Tensor, y: torch.Tensor):
        num_samples, num_features = X.shape
        if num_samples < self.min_samples_split:
          return None, None, None, None

        parent_impurity = self._impurity(y)

        best_gain = 0.0
        best_feat = None
        best_thresh = None
        best_left_mask = None
        best_right_mask = None

        # Decide which features to try at this node
        if self.max_features_split is not None and self.max_features_split < num_features:
            # random subset of features
            feat_indices = torch.randperm(num_features, device=X.device)[: self.max_features_split]
        else:
            feat_indices = torch.arange(num_features, device=X.device)

        for feat_idx in feat_indices:
            feature_values = X[:, feat_idx]

            thresholds = torch.unique(feature_values)

            # Limit number of thresholds to test (for speed)
            if thresholds.numel() > self.max_thresholds:
                # random sample of thresholds
                perm = torch.randperm(thresholds.numel(), device=X.device)[: self.max_thresholds]
                thresholds = thresholds[perm]

            for t in thresholds:
                left_mask = feature_values <= t
                right_mask = ~left_mask

                if left_mask.sum().item() < self.min_samples_leaf:
                    continue
                if right_mask.sum().item() < self.min_samples_leaf:
                    continue

                y_left = y[left_mask]
                y_right = y[right_mask]

                n_left = y_left.numel()
                n_right = y_right.numel()

                impur_left = self._impurity(y_left)
                impur_right = self._impurity(y_right)

                weighted_imp = (
                    n_left / num_samples * impur_left
                    + n_right / num_samples * impur_right
                )

                gain = parent_impurity - weighted_imp

                if gain > best_gain:
                    best_gain = gain
                    best_feat = int(feat_idx.item())  # feat_idx is tensor
                    best_thresh = float(t.item())
                    best_left_mask = left_mask
                    best_right_mask = right_mask

            return best_feat, best_thresh, best_left_mask, best_right_mask


    # Recursively build the tree
    def _build_tree(self, X: torch.Tensor, y: torch.Tensor, depth: int):
        num_samples = X.shape[0]

        # Count node creation for debug
        self._node_count += 1

        # stopping conditions
        if (
            depth >= self.max_depth
            or num_samples < self.min_samples_split
            or self._is_pure(y)
        ):
            return {
                "is_leaf": True,
                "value": self._leaf_value(y),
            }

        feat_idx, thresh, left_mask, right_mask = self._best_split(X, y)

        # if no valid split
        if feat_idx is None:
            return {
                "is_leaf": True,
                "value": self._leaf_value(y),
            }

        X_left, y_left = X[left_mask], y[left_mask]
        X_right, y_right = X[right_mask], y[right_mask]

        return {
            "is_leaf": False,
            "feature": feat_idx,
            "threshold": thresh,
            "left": self._build_tree(X_left, y_left, depth + 1),
            "right": self._build_tree(X_right, y_right, depth + 1),
        }

    # Public API: fit model
    def fit(self, X: torch.Tensor, y: torch.Tensor):
        X = X.to(self.device)
        y = y.to(self.device)

        self.n_features_ = X.shape[1]
        self._node_count = 0  # reset counter

        if self.verbose:
            print(f"[DT] Start building tree: n_samples={X.shape[0]}, "
                  f"n_features={self.n_features_}")

        self.tree_ = self._build_tree(X, y, depth=0)

        if self.verbose:
            print(f"[DT] Finished building tree, total nodes: {self._node_count}")

        return self

    # Predict one sample
    def _predict_one(self, x: torch.Tensor):
        """Traverse the tree for a single sample."""
        node = self.tree_
        while not node["is_leaf"]:
            feat = node["feature"]
            thresh = node["threshold"]
            if x[feat] <= thresh:
                node = node["left"]
            else:
                node = node["right"]
        return node["value"]

    # Predict batch
    def predict(self, X: torch.Tensor):
        """
        Predict for a batch X: [n_samples, n_features].
        Returns numpy array of predictions.
        """
        X = X.to(self.device)
        preds = []
        for i in range(X.shape[0]):
            preds.append(self._predict_one(X[i]))
        return np.array(preds)


## Random Forest

In [31]:
class RandomForest:
    def __init__(
        self,
        classification: bool = True,
        n_trees: int = 10,
        max_depth: int = 10,
        min_samples_split: int = 2,
        min_samples_leaf: int = 1,
        max_features: Optional[int] = None,  # number of features per tree
        bootstrap: bool = True,
        random_state: int = 42,
        verbose: bool = False,              # debug flag
    ):
        self.classification = classification
        self.n_trees = n_trees
        self.max_depth = max_depth
        self.min_samples_split = min_samples_split
        self.min_samples_leaf = min_samples_leaf
        self.max_features = max_features
        self.bootstrap = bootstrap
        self.random_state = random_state
        self.verbose = verbose

        self.trees: List[DecisionTree] = []
        self.features_per_tree: List[torch.Tensor] = []  # store feature indices
        self.device = DEVICE

        # Set seeds for reproducibility
        np.random.seed(self.random_state)
        torch.manual_seed(self.random_state)

    # Internal: create bootstrap sample
    def _bootstrap_sample(
        self, X: torch.Tensor, y: torch.Tensor
    ) -> Tuple[torch.Tensor, torch.Tensor]:
        n_samples = X.shape[0]

        if self.bootstrap:
            # Sample with replacement
            indices = torch.randint(
                low=0,
                high=n_samples,
                size=(n_samples,),
                device=X.device,
            )
        else:
            # Use all samples without resampling
            indices = torch.arange(n_samples, device=X.device)

        return X[indices], y[indices]

    # Fit the forest
    def fit(self, X: torch.Tensor, y: torch.Tensor):
        X = X.to(self.device)
        y = y.to(self.device)

        n_samples, n_features = X.shape

        # Decide how many features each tree will see
        if self.max_features is None:
            max_feats = n_features
        else:
            max_feats = min(self.max_features, n_features)

        self.trees = []
        self.features_per_tree = []

        if self.verbose:
            print(f"[RF] Start training forest with {self.n_trees} trees, "
                  f"{n_samples} samples, {n_features} features, "
                  f"max_features per tree = {max_feats}")

        for i in range(self.n_trees):
            if self.verbose:
                print(f"[RF] Training tree {i+1}/{self.n_trees} ...")

            # Sample feature indices for this tree
            feat_indices = torch.randperm(n_features, device=X.device)[:max_feats]
            self.features_per_tree.append(feat_indices)

            # Bootstrap samples for this tree
            X_boot, y_boot = self._bootstrap_sample(X[:, feat_indices], y)

            # Create and train a DecisionTree
            tree = DecisionTree(
                classification=self.classification,
                max_depth=self.max_depth,
                min_samples_split=self.min_samples_split,
                min_samples_leaf=self.min_samples_leaf,
                criterion="entropy" if self.classification else "mse",
                verbose=False,
                max_features_split=128,
                max_thresholds=48,
            )

            tree.fit(X_boot, y_boot)
            self.trees.append(tree)

            if self.verbose:
                print(f"[RF] Done tree {i+1}/{self.n_trees}")

        if self.verbose:
            print("[RF] Finished training all trees.")

        return self

    # Predict for batch
    def predict(self, X: torch.Tensor):
        X = X.to(self.device)
        n_samples = X.shape[0]

        if len(self.trees) == 0:
            raise RuntimeError("RandomForest has not been fitted yet.")

        # Collect predictions of all trees
        all_preds = []

        for tree, feat_idx in zip(self.trees, self.features_per_tree):
            X_sub = X[:, feat_idx]              # same feature subset as training
            preds = tree.predict(X_sub)         # numpy array
            all_preds.append(preds)

        # Shape: [n_trees, n_samples]
        all_preds = np.stack(all_preds, axis=0)

        if self.classification:
            # Majority vote along axis=0
            final_preds = []
            for i in range(n_samples):
                values, counts = np.unique(all_preds[:, i], return_counts=True)
                final_class = values[np.argmax(counts)]
                final_preds.append(final_class)
            return np.array(final_preds)
        else:
            # Regression: average predictions
            return np.mean(all_preds, axis=0)

## Gradient Boosting

In [44]:
class GradientBoostingRegressor:
    def __init__(
        self,
        n_estimators: int = 50,
        learning_rate: float = 0.1,
        max_depth: int = 3,
        min_samples_split: int = 2,
        min_samples_leaf: int = 1,
        max_features_split: Optional[int] = None,  # pass to inner DT
        max_thresholds: int = 32,                 # pass to inner DT
        random_state: int = 42,
        verbose: bool = False,                    # debug flag
    ):
        self.n_estimators = n_estimators
        self.learning_rate = learning_rate
        self.max_depth = max_depth
        self.min_samples_split = min_samples_split
        self.min_samples_leaf = min_samples_leaf
        self.max_features_split = max_features_split
        self.max_thresholds = max_thresholds
        self.random_state = random_state
        self.verbose = verbose

        self.device = DEVICE
        self.trees: List[DecisionTree] = []
        self.init_value_: float = 0.0  # F0(x) = mean(y)

        np.random.seed(self.random_state)
        torch.manual_seed(self.random_state)

    # Fit the boosting model
    def fit(self, X: torch.Tensor, y: torch.Tensor):
        X = X.to(self.device)
        y = y.to(self.device).float()

        n_samples = X.shape[0]

        # Initial prediction: mean of y
        self.init_value_ = float(torch.mean(y).item())

        # Current prediction F(x) for all training samples
        F_current = torch.full((n_samples,), self.init_value_, device=self.device)

        self.trees = []

        if self.verbose:
            print(f"[GB] Start training: n_estimators={self.n_estimators}, "
                  f"max_depth={self.max_depth}, lr={self.learning_rate}")

        for m in range(self.n_estimators):
            # Compute residuals: r = y - F_{m-1}(x)
            residuals = y - F_current

            # Train a shallow regression tree to predict residuals
            tree = DecisionTree(
                classification=False,
                max_depth=self.max_depth,
                min_samples_split=self.min_samples_split,
                min_samples_leaf=self.min_samples_leaf,
                criterion="mse",
                verbose=False,
                max_features_split=self.max_features_split,
                max_thresholds=self.max_thresholds,
            )
            tree.fit(X, residuals)

            self.trees.append(tree)

            # Update F_current = F_current + lr * tree(X)
            pred_residuals_np = tree.predict(X)  # numpy
            pred_residuals = torch.from_numpy(pred_residuals_np).float().to(self.device)

            F_current = F_current + self.learning_rate * pred_residuals

            if self.verbose:
                mse = torch.mean((y - F_current) ** 2).item()
                print(f"[GB] Stage {m+1}/{self.n_estimators}, train MSE={mse:.4f}")

        if self.verbose:
            print("[GB] Finished training all stages.")

        return self

    # Predict for batch
    def predict(self, X: torch.Tensor):
        X = X.to(self.device)
        n_samples = X.shape[0]

        # Start from initial prediction
        F = torch.full((n_samples,), self.init_value_, device=self.device)

        # Add contribution of each tree
        for tree in self.trees:
            pred_np = tree.predict(X)  # numpy
            pred = torch.from_numpy(pred_np).float().to(self.device)
            F = F + self.learning_rate * pred

        return F.cpu().numpy()


In [45]:
#       One-vs-Rest Gradient Boosting for Classification

class GradientBoostingOVRClassifier:
    """
    One-vs-Rest multiclass classifier based on GradientBoostingRegressor.

    For each class k, trains a separate GB regressor on targets:
        y_k = 1 if (y == k) else 0
    Prediction: choose class with highest regressor output.
    """

    def __init__(
        self,
        n_classes: int,
        n_estimators: int = 30,
        learning_rate: float = 0.1,
        max_depth: int = 2,
        min_samples_split: int = 10,
        min_samples_leaf: int = 5,
        max_features_split: Optional[int] = None,
        max_thresholds: int = 32,
        random_state: int = 42,
        verbose: bool = False,
    ):
        self.n_classes = n_classes
        self.n_estimators = n_estimators
        self.learning_rate = learning_rate
        self.max_depth = max_depth
        self.min_samples_split = min_samples_split
        self.min_samples_leaf = min_samples_leaf
        self.max_features_split = max_features_split
        self.max_thresholds = max_thresholds
        self.random_state = random_state
        self.verbose = verbose

        self.device = DEVICE
        self.models: List[GradientBoostingRegressor] = []

    def fit(self, X: torch.Tensor, y: torch.Tensor):
        """
        X: [n_samples, n_features], torch.float32
        y: [n_samples], torch.long with values in [0..n_classes-1]
        """
        X = X.to(self.device)
        y = y.to(self.device)

        self.models = []

        for k in range(self.n_classes):
            if self.verbose:
                print(f"[GB-OVR] Training class {k}/{self.n_classes-1} ...")

            # Build binary target: 1 for class k, 0 otherwise
            y_k = (y == k).float()

            gb_reg = GradientBoostingRegressor(
                n_estimators=self.n_estimators,
                learning_rate=self.learning_rate,
                max_depth=self.max_depth,
                min_samples_split=self.min_samples_split,
                min_samples_leaf=self.min_samples_leaf,
                max_features_split=self.max_features_split,
                max_thresholds=self.max_thresholds,
                random_state=self.random_state + k,  # different seed per class
                verbose=self.verbose,
            )

            gb_reg.fit(X, y_k)
            self.models.append(gb_reg)

        if self.verbose:
            print("[GB-OVR] Finished training all class models.")

        return self

    def predict(self, X: torch.Tensor):
        """
        X: [n_samples, n_features]
        Returns: numpy array of shape [n_samples], predicted class indices.
        """
        X = X.to(self.device)
        n_samples = X.shape[0]

        if len(self.models) == 0:
            raise RuntimeError("GB-OVR classifier has not been fitted yet.")

        # Collect predictions from each class-regressor
        all_scores = []

        for k, model in enumerate(self.models):
            scores_k = model.predict(X)  # numpy [n_samples]
            all_scores.append(scores_k)

        # Shape: [n_classes, n_samples]
        all_scores = np.stack(all_scores, axis=0)

        # Choose class with highest score
        pred_classes = np.argmax(all_scores, axis=0)
        return pred_classes


## Work with dataset London-Bike

### Preparing steps

Load dataset from file CSV (London Bike)

In [12]:
csv_path = "/content/london_merged.csv"  # adjust if needed

df = pd.read_csv(csv_path)
df.head()

# timestamp - time
# cnt - count of a new bike shares
# t1 - real temperature in C
# t2 - temperature in C "feels like"
# hum - humidity in percentage
# wind_speed - wind speed in km/h
# weather_code - category of the weather
# is_holiday - boolean field - 1 holiday / 0 non holiday
# is_weekend -  boolean field - 1 if the day is weekend
# season - 0-spring ; 1-summer; 2-fall; 3-winter

Unnamed: 0,timestamp,cnt,t1,t2,hum,wind_speed,weather_code,is_holiday,is_weekend,season
0,2015-01-04 00:00:00,182,3.0,2.0,93.0,6.0,3.0,0.0,1.0,3.0
1,2015-01-04 01:00:00,138,3.0,2.5,93.0,5.0,1.0,0.0,1.0,3.0
2,2015-01-04 02:00:00,134,2.5,2.5,96.5,0.0,1.0,0.0,1.0,3.0
3,2015-01-04 03:00:00,72,2.0,2.0,100.0,0.0,1.0,0.0,1.0,3.0
4,2015-01-04 04:00:00,47,2.0,0.0,93.0,6.5,1.0,0.0,1.0,3.0


choose features & target; preprocessing data
* target: cnt

In [13]:
# Choose target column (bike count)
target_col = "cnt"

# Choose a set of numeric and categorical features
feature_cols = [
    "t1",           # real temperature
    "t2",           # feels-like temperature
    "hum",          # humidity
    "wind_speed",   # wind speed
    "weather_code", # weather condition code
    "is_holiday",
    "is_weekend",
    "season",
]

# Filter dataframe (drop rows with missing values just in case)
df_model = df[feature_cols + [target_col]].dropna()

X = df_model[feature_cols].values
y = df_model[target_col].values

print("X shape:", X.shape)
print("y shape:", y.shape)


X shape: (17414, 8)
y shape: (17414,)


split train & test samples

In [14]:
#  Train/test split (NumPy level)
X_train_np, X_test_np, y_train_np, y_test_np = train_test_split(
    X,
    y,
    test_size=0.2,
    random_state=42,
)

print("Train size:", X_train_np.shape[0])
print("Test size :", X_test_np.shape[0])
#  Convert to torch tensors

# Features as float32
X_train = torch.from_numpy(X_train_np).float().to(DEVICE)
X_test  = torch.from_numpy(X_test_np).float().to(DEVICE)

# Regression targets as float32
y_train = torch.from_numpy(y_train_np).float().to(DEVICE)
y_test  = torch.from_numpy(y_test_np).float().to(DEVICE)

Train size: 13931
Test size : 3483


### Train with DecisionTree

In [16]:
dt_reg = DecisionTree(
    classification=False,
    max_depth=6,           # you can tune this
    min_samples_split=10,  # to avoid overfitting
    min_samples_leaf=5,
    criterion="mse",
)

dt_reg.fit(X_train, y_train)

# Predictions on train and test (returned as NumPy arrays)
y_train_pred_dt = dt_reg.predict(X_train)
y_test_pred_dt  = dt_reg.predict(X_test)

### Train with RandomForest

In [17]:
rf_reg = RandomForest(
    classification=False,
    n_trees=10,            # can be increased if not too slow
    max_depth=6,
    min_samples_split=10,
    min_samples_leaf=5,
    max_features=None,     # use all features
    bootstrap=True,
    random_state=42,
)

rf_reg.fit(X_train, y_train)

y_train_pred_rf = rf_reg.predict(X_train)
y_test_pred_rf  = rf_reg.predict(X_test)

### Train with GradientBoosting

In [18]:
gb_reg = GradientBoostingRegressor(
    n_estimators=50,       # number of boosting stages
    learning_rate=0.1,
    max_depth=3,           # shallow trees as "stumps"
    min_samples_split=10,
    min_samples_leaf=5,
    random_state=42,
)

gb_reg.fit(X_train, y_train)

y_train_pred_gb = gb_reg.predict(X_train)
y_test_pred_gb  = gb_reg.predict(X_test)

### Metric comparison  

In [19]:
print("=== London Bike Regression: Comparison ===")

print("=== DecisionTree Regression metrics ===")
dt_train_metrics = regression_metrics(y_train_np, y_train_pred_dt)
dt_test_metrics  = regression_metrics(y_test_np,  y_test_pred_dt)
print("Train:", dt_train_metrics)
print("Test :", dt_test_metrics)
print()

print("=== RandomForest Regression metrics ===")
rf_train_metrics = regression_metrics(y_train_np, y_train_pred_rf)
rf_test_metrics  = regression_metrics(y_test_np,  y_test_pred_rf)
print("Train:", rf_train_metrics)
print("Test :", rf_test_metrics)
print()

print("=== GradientBoosting Regression metrics ===")
gb_train_metrics = regression_metrics(y_train_np, y_train_pred_gb)
gb_test_metrics  = regression_metrics(y_test_np,  y_test_pred_gb)
print("Train:", gb_train_metrics)
print("Test :", gb_test_metrics)

=== London Bike Regression: Comparison ===
=== DecisionTree Regression metrics ===
Train: {'MAE': 642.2857025477268, 'MSE': 781641.5865371141, 'RMSE': 884.1049635292826, 'R2': 0.33317057474508704}
Test : {'MAE': 667.2574478924839, 'MSE': 848428.6816567325, 'RMSE': 921.101884514809, 'R2': 0.29193936401733833}

=== RandomForest Regression metrics ===
Train: {'MAE': 633.4912978702639, 'MSE': 760149.9432867651, 'RMSE': 871.8657828397471, 'R2': 0.3515053977167065}
Test : {'MAE': 658.6913439349541, 'MSE': 827007.7889622814, 'RMSE': 909.3996860359483, 'R2': 0.30981628311787146}

=== GradientBoosting Regression metrics ===
Train: {'MAE': 639.0373218876745, 'MSE': 770185.726058651, 'RMSE': 877.6022596020654, 'R2': 0.3429437303578755}
Test : {'MAE': 658.3612724208914, 'MSE': 825637.3093925558, 'RMSE': 908.6458657764068, 'R2': 0.3109600240789222}


### Анализ результатов на датасете London-Bike

## DecisionTree

* Одиночное дерево даёт наихудшие метрики: наибольшие MSE/RMSE и наименьший R² на тесте.

* Модель обладает высокой дисперсией и легко подстраивается под шум обучающей выборки.

* Способность к обобщению ограничена — это классический недостаток одиночных решающих деревьев.

## RandomForest

* RandomForest заметно улучшает качество по сравнению с DecisionTree: MSE и RMSE на тесте немного снижаются, R² растёт.

* Усреднение предсказаний множества независимых деревьев снижает дисперсию модели.

* Использование bootstrap-выборок помогает уменьшить переобучение и делает предсказания более стабильными.

## GradientBoosting

* GradientBoosting показывает лучшие результаты: наименьшие MSE/RMSE и наибольший R² на тестовой выборке.

* Boosting обучает последовательность неглубоких деревьев на остатках (residuals), последовательно исправляя ошибки предыдущих моделей.

* При параметрах learning_rate = 0.1, max_depth = 3, n_estimators = 50 достигается хороший баланс между точностью и устойчивостью без явного переобучения.

## Вывод: по качеству на задаче регрессии выполняется соотношение
### DecisionTree < RandomForest < GradientBoosting.


## Work with dataset CIFAR

### Preparing steps

#### Load dataset

In [14]:
!tar -xzf cifar-10-python.tar.gz -C /content/
!ls

cifar-10-batches-py  cifar-10-python.tar.gz  sample_data


function load 1 batch CIFAR-10

In [15]:
import pickle


def load_cifar10_batch(batch_path: str):
    """
    Load a single CIFAR-10 batch file.
    Returns:
        X : np.ndarray of shape [10000, 3072]
        y : np.ndarray of shape [10000]
    """
    with open(batch_path, "rb") as f:
        batch = pickle.load(f, encoding="bytes")

    # b"data" has shape [10000, 3072] (32*32*3)
    X = batch[b"data"]
    # b"labels" is a list of length 10000
    y = np.array(batch[b"labels"], dtype=np.int64)

    return X, y


def load_cifar10_train_test(root_dir: str):
    """
    Load CIFAR-10 train (5 batches) and test set.
    root_dir: directory with cifar-10-batches-py
    """
    # Load training data from batches 1..5
    X_list = []
    y_list = []
    for i in range(1, 6):
        batch_path = os.path.join(root_dir, f"data_batch_{i}")
        X_batch, y_batch = load_cifar10_batch(batch_path)
        X_list.append(X_batch)
        y_list.append(y_batch)

    X_train = np.concatenate(X_list, axis=0)  # [50000, 3072]
    y_train = np.concatenate(y_list, axis=0)  # [50000]

    # Load test batch
    test_batch_path = os.path.join(root_dir, "test_batch")
    X_test, y_test = load_cifar10_batch(test_batch_path)

    return X_train, y_train, X_test, y_test

In [16]:
def accuracy_score(y_true, y_pred):
    """
    Simple accuracy: percentage of correct predictions.
    """
    y_true = np.array(y_true)
    y_pred = np.array(y_pred)
    return np.mean(y_true == y_pred)

Load all train + test CIFAR-10

In [18]:
# Path to CIFAR-10 python version
cifar_root = "/content/cifar-10-batches-py"  # adjust if needed

X_train_full, y_train_full, X_test_full, y_test_full = load_cifar10_train_test(cifar_root)

print("CIFAR-10 full shapes:")
print("X_train_full:", X_train_full.shape)
print("y_train_full:", y_train_full.shape)
print("X_test_full :", X_test_full.shape)
print("y_test_full :", y_test_full.shape)


CIFAR-10 full shapes:
X_train_full: (50000, 3072)
y_train_full: (50000,)
X_test_full : (10000, 3072)
y_test_full : (10000,)


Choose a smaller, balanced CIFAR-10 subset between classes, randomly controlled,

In [36]:
def sample_balanced(X, y, n_per_class, random_state=42):
    """
    Sample n_per_class examples for each class (0..9) from X, y.
    """
    rng = np.random.RandomState(random_state)
    classes = np.unique(y)
    X_out = []
    y_out = []

    for c in classes:
        idx = np.where(y == c)[0]
        if len(idx) < n_per_class:
            raise ValueError(f"Not enough samples for class {c}")
        chosen = rng.choice(idx, size=n_per_class, replace=False)
        X_out.append(X[chosen])
        y_out.append(y[chosen])

    X_out = np.vstack(X_out)
    y_out = np.concatenate(y_out)
    return X_out, y_out


#### Preprocessing data

In [38]:
n_train_per_class = 5000
n_test_per_class  = 1000

X_train_small, y_train_small = sample_balanced(
    X_train_full, y_train_full,
    n_per_class=n_train_per_class,
    random_state=0,
)
X_test_small, y_test_small = sample_balanced(
    X_test_full, y_test_full,
    n_per_class=n_test_per_class,
    random_state=1,
)

print("Small train:", X_train_small.shape, y_train_small.shape)
print("Small test :", X_test_small.shape,  y_test_small.shape)

Small train: (50000, 3072) (50000,)
Small test : (10000, 3072) (10000,)


In [39]:
# ======== Normalize to [0, 1] and convert to float32 ========

X_train_small = X_train_small.astype(np.float32) / 255.0
X_test_small  = X_test_small.astype(np.float32) / 255.0

# ======== Convert to torch tensors ========

X_train_cifar = torch.from_numpy(X_train_small).float().to(DEVICE)
X_test_cifar  = torch.from_numpy(X_test_small).float().to(DEVICE)

y_train_cifar = torch.from_numpy(y_train_small).long().to(DEVICE)
y_test_cifar  = torch.from_numpy(y_test_small).long().to(DEVICE)

### Train with DecisionTree

In [40]:
dt_cifar = DecisionTree(
    classification=True,
    max_depth=12,
    min_samples_split=15,
    min_samples_leaf=10,
    criterion="entropy",
    verbose=True,
    max_features_split=128,
    max_thresholds=48,
)

print("Training DecisionTree on CIFAR-10 balanced subset...")
dt_cifar.fit(X_train_cifar, y_train_cifar)

y_train_pred_dt = dt_cifar.predict(X_train_cifar)
y_test_pred_dt  = dt_cifar.predict(X_test_cifar)

acc_train_dt = accuracy_score(y_train_small, y_train_pred_dt)
acc_test_dt  = accuracy_score(y_test_small,  y_test_pred_dt)

print("DecisionTree CIFAR-10 (balanced subset)")
print("Train accuracy:", acc_train_dt)
print("Test  accuracy:", acc_test_dt)

Training DecisionTree on CIFAR-10 balanced subset...
[DT] Start building tree: n_samples=50000, n_features=3072
[DT] Finished building tree, total nodes: 2783
DecisionTree CIFAR-10 (balanced subset)
Train accuracy: 0.3125
Test  accuracy: 0.2458


### Train with RandomForest

In [43]:
# Typical choice for number of features per tree: sqrt(#features)
n_features = X_train_cifar.shape[1]
max_feats_for_rf = int(math.sqrt(n_features))  # ~ 55 for 3072 features

rf_cifar = RandomForest(
    classification=True,
    n_trees=100,
    max_depth=10,              # similar to DT, can try 10–12
    min_samples_split=30,
    min_samples_leaf=10,
    max_features=128,
    bootstrap=True,
    random_state=42,
    verbose=True,
)

print("\nTraining RandomForest on CIFAR-10 balanced subset...")
rf_cifar.fit(X_train_cifar, y_train_cifar)

# Predictions on train and test
y_train_pred_rf = rf_cifar.predict(X_train_cifar)
y_test_pred_rf  = rf_cifar.predict(X_test_cifar)

acc_train_rf = accuracy_score(y_train_small, y_train_pred_rf)
acc_test_rf  = accuracy_score(y_test_small,  y_test_pred_rf)

print("RandomForest CIFAR-10 (balanced subset)")
print("Train accuracy:", acc_train_rf)
print("Test  accuracy:", acc_test_rf)


Training RandomForest on CIFAR-10 balanced subset...
[RF] Start training forest with 100 trees, 50000 samples, 3072 features, max_features per tree = 128
[RF] Training tree 1/100 ...
[RF] Done tree 1/100
[RF] Training tree 2/100 ...
[RF] Done tree 2/100
[RF] Training tree 3/100 ...
[RF] Done tree 3/100
[RF] Training tree 4/100 ...
[RF] Done tree 4/100
[RF] Training tree 5/100 ...
[RF] Done tree 5/100
[RF] Training tree 6/100 ...
[RF] Done tree 6/100
[RF] Training tree 7/100 ...
[RF] Done tree 7/100
[RF] Training tree 8/100 ...
[RF] Done tree 8/100
[RF] Training tree 9/100 ...
[RF] Done tree 9/100
[RF] Training tree 10/100 ...
[RF] Done tree 10/100
[RF] Training tree 11/100 ...
[RF] Done tree 11/100
[RF] Training tree 12/100 ...
[RF] Done tree 12/100
[RF] Training tree 13/100 ...
[RF] Done tree 13/100
[RF] Training tree 14/100 ...
[RF] Done tree 14/100
[RF] Training tree 15/100 ...
[RF] Done tree 15/100
[RF] Training tree 16/100 ...
[RF] Done tree 16/100
[RF] Training tree 17/100 ...
[

### Train with Gradient Boostring

In [46]:
n_classes = 10

gb_cifar = GradientBoostingOVRClassifier(
    n_classes=n_classes,
    n_estimators=20,        # keep small for speed
    learning_rate=0.1,
    max_depth=2,            # shallow trees like "stumps"
    min_samples_split=20,
    min_samples_leaf=10,
    max_features_split=64,  # subset of features per split
    max_thresholds=32,      # limit thresholds per feature
    random_state=42,
    verbose=True,
)

print("\nTraining GradientBoosting OVR on CIFAR-10 balanced subset...")
gb_cifar.fit(X_train_cifar, y_train_cifar)

y_train_pred_gb = gb_cifar.predict(X_train_cifar)
y_test_pred_gb  = gb_cifar.predict(X_test_cifar)

acc_train_gb = accuracy_score(y_train_small, y_train_pred_gb)
acc_test_gb  = accuracy_score(y_test_small,  y_test_pred_gb)

print("GradientBoosting OVR CIFAR-10")
print("Train accuracy:", acc_train_gb)
print("Test  accuracy:", acc_test_gb)



Training GradientBoosting OVR on CIFAR-10 balanced subset...
[GB-OVR] Training class 0/9 ...
[GB] Start training: n_estimators=20, max_depth=2, lr=0.1
[GB] Stage 1/20, train MSE=0.0898
[GB] Stage 2/20, train MSE=0.0884
[GB] Stage 3/20, train MSE=0.0881
[GB] Stage 4/20, train MSE=0.0881
[GB] Stage 5/20, train MSE=0.0877
[GB] Stage 6/20, train MSE=0.0868
[GB] Stage 7/20, train MSE=0.0866
[GB] Stage 8/20, train MSE=0.0865
[GB] Stage 9/20, train MSE=0.0859
[GB] Stage 10/20, train MSE=0.0856
[GB] Stage 11/20, train MSE=0.0854
[GB] Stage 12/20, train MSE=0.0853
[GB] Stage 13/20, train MSE=0.0852
[GB] Stage 14/20, train MSE=0.0852
[GB] Stage 15/20, train MSE=0.0850
[GB] Stage 16/20, train MSE=0.0848
[GB] Stage 17/20, train MSE=0.0845
[GB] Stage 18/20, train MSE=0.0842
[GB] Stage 19/20, train MSE=0.0837
[GB] Stage 20/20, train MSE=0.0831
[GB] Finished training all stages.
[GB-OVR] Training class 1/9 ...
[GB] Start training: n_estimators=20, max_depth=2, lr=0.1
[GB] Stage 1/20, train MSE=0.089

### Metric comparison

In [47]:
print("=== CIFAR-10 Accuracy Comparison (balanced subset) ===")

acc_train_dt = accuracy_score(y_train_small, y_train_pred_dt)
acc_test_dt  = accuracy_score(y_test_small,  y_test_pred_dt)

acc_train_rf = accuracy_score(y_train_small, y_train_pred_rf)
acc_test_rf  = accuracy_score(y_test_small,  y_test_pred_rf)

acc_train_gb = accuracy_score(y_train_small, y_train_pred_gb)
acc_test_gb  = accuracy_score(y_test_small,  y_test_pred_gb)

print(f"DecisionTree   - Train: {acc_train_dt:.4f} | Test: {acc_test_dt:.4f}")
print(f"RandomForest   - Train: {acc_train_rf:.4f} | Test: {acc_test_rf:.4f}")
print(f"GradientBoost. - Train: {acc_train_gb:.4f} | Test: {acc_test_gb:.4f}")

=== CIFAR-10 Accuracy Comparison (balanced subset) ===
DecisionTree   - Train: 0.3125 | Test: 0.2458
RandomForest   - Train: 0.2595 | Test: 0.2013
GradientBoost. - Train: 0.2932 | Test: 0.2908




### **Анализ результатов на датасете CIFAR-10**


#### DecisionTree (0.31 train / 0.25 test)

* Одиночное дерево немного подстраивается под обучающую выборку, но при этом обобщающая способность ограничена.

* Пространство признаков у CIFAR-10 очень большое (3072 признака), а структура дерева жадная и кусочно-постоянная, поэтому модель не может хорошо описать сложные границы между 10 классами.

* В результате точность на тесте невысокая, но при этом train > test, что показывает лёгкое переобучение.

### RandomForest (0.26 train / 0.20 test)

* В нашей реализации лес состоит из ограниченного числа относительно неглубоких деревьев, каждое дерево обучается на бутстреп-выборке и подмножестве признаков.

* Отдельные деревья в таком режиме оказываются слишком слабыми (underfitting): даже на обучающей выборке точность ниже, чем у одиночного дерева.

* Ансамбль из слабых деревьев даёт ещё более сглаженный, но грубый классификатор → качество на тесте падает до ~0.20. Это не противоречит теории: при сильных ограничениях по глубине, числу деревьев и признаков RandomForest может работать хуже, чем одно достаточно «сильное» дерево.

### GradientBoosting (0.29 train / 0.29 test)

* Градиентный бустинг обучается последовательно, на каждом шаге исправляя ошибки предыдущей композиции (моделируя residuals).

* В схеме One-vs-Rest для каждого класса строится отдельный регрессионный бустинг по выходу 0/1, что позволяет лучше подстраиваться под сложные нелинейные зависимости в данных.

* При умеренном числе итераций (n_estimators) и неглубоких деревьях модель не успевает сильно переобучиться, поэтому train и test accuracy близки, а качество на тесте (≈0.29) оказывается максимальным среди трёх алгоритмов.


### **Вывод**: RandomForest < DecisionTree < GradientBoosting
