In [1]:
import numpy as np
from scipy import stats
import math
import pandas as pd

In [2]:
#Using scipy

# Example data
x = np.array([1, 2, 3, 4, 5, 100, 101])  # two large outliers

alpha = 0.3
trimmed = stats.trim_mean(x, proportiontocut=alpha)

In [3]:
def trimmed_mean(x, alpha=0.1):
    """
    Compute the trimmed mean manually, ignoring NaNs.
    alpha: fraction (0 <= α < 0.5) of data to trim from each tail.
    """
    x = np.asarray(x, dtype=float)
    x = x[~np.isnan(x)]               
    n = len(x)
    if n == 0:
        return np.nan
    k = int(np.floor(alpha * n))
    if 2 * k >= n:                    
        return np.mean(x)
    x_sorted = np.sort(x)
    trimmed = x_sorted[k:n - k]
    return np.mean(trimmed)

In [4]:
print(f"Trimmed mean scipy (α={alpha}):", stats.trim_mean(x, proportiontocut=0.3))
print(f"Trimmed mean numpy (α={alpha}):", trimmed_mean(x,0.3))
print("Regular mean:", np.mean(x))

Trimmed mean scipy (α=0.3): 4.0
Trimmed mean numpy (α=0.3): 4.0
Regular mean: 30.857142857142858


In [5]:
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.utils.validation import check_array, check_is_fitted


def _trimmed_mean_1d(x, alpha):
    x = np.asarray(x, dtype=float)
    x = x[~np.isnan(x)]
    n = x.size
    if n == 0:
        return np.nan
    k = int(math.floor(alpha * n))
    if 2 * k >= n:
        return float(np.mean(x))
    xs = np.sort(x)
    return float(np.mean(xs[k:n-k]))

class TrimmedMeanImputer(BaseEstimator, TransformerMixin):
    def __init__(self, trim_fraction=0.1):
        self.trim_fraction = trim_fraction
        self.statistics_ = None

    def fit(self, X, y=None):
        X = check_array(X, dtype=float, ensure_all_finite="allow-nan",ensure_2d=False)
        X = np.asarray(X, dtype=float)
        if X.ndim == 1:
            X = X.reshape(-1, 1)
        stats = []
        for j in range(X.shape[1]):
            stats.append(_trimmed_mean_1d(X[:, j], self.trim_fraction))
        self.statistics_ = np.asarray(stats, dtype=float)
        return self

    def transform(self, X):
        check_is_fitted(self, attributes=["statistics_"])
        X = check_array(X, dtype=float, ensure_all_finite="allow-nan",ensure_2d=False)
        if self.statistics_ is None:
            raise RuntimeError("Not fitted yet.")
        X = np.asarray(X, dtype=float)
        if X.ndim == 1:
            X = X.reshape(-1, 1)
        out = np.tile(self.statistics_, (X.shape[0], 1))
        return out


In [6]:
X1 = np.array([10., np.nan, 12., 1000., 11.])
tmi = TrimmedMeanImputer(trim_fraction=0.2).fit(X1)
print("stats:", tmi.statistics_)
Xt1 = tmi.transform(X1)
print("Xt shape:", Xt1.shape)
print("Xt:\n", Xt1)

stats: [258.25]
Xt shape: (5, 1)
Xt:
 [[258.25]
 [258.25]
 [258.25]
 [258.25]
 [258.25]]


In [7]:
X2 = np.array([
    [10.,   1.2],
    [11.,   np.nan],
    [np.nan,1.1],
    [12.,   1.0],
    [1000., 1.3],
])

tmi2 = TrimmedMeanImputer(trim_fraction=0.2).fit(X2)
print("stats:", tmi2.statistics_)
Xt2 = tmi2.transform(X2)
print("Xt:\n", Xt2)


stats: [258.25   1.15]
Xt:
 [[258.25   1.15]
 [258.25   1.15]
 [258.25   1.15]
 [258.25   1.15]
 [258.25   1.15]]


In [8]:
from sklearn.pipeline import Pipeline
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score

rng = np.random.RandomState(0)
X = rng.normal(size=(200, 2))
X[:5, 0] = [10, 12, 15, -20, 30]
X[::10, 1] = np.nan
y = (X[:, 0] + 0.5 * np.nan_to_num(X[:, 1]) > 0).astype(int)

pipe = Pipeline(steps=[
    ("impute", TrimmedMeanImputer(trim_fraction=0.2)),
    ("clf", LogisticRegression(max_iter=1000))
])

X_tr, X_te, y_tr, y_te = train_test_split(X, y, test_size=0.25, random_state=42, stratify=y)
pipe.fit(X_tr, y_tr)
pred = pipe.predict(X_te)
print("Test accuracy:", accuracy_score(y_te, pred))


Test accuracy: 0.56


In [9]:
from sklearn.model_selection import GridSearchCV

pipe = Pipeline(steps=[
    ("impute", TrimmedMeanImputer()),
    ("clf", LogisticRegression(max_iter=1000))
])

param_grid = {
    "impute__trim_fraction": [0.0, 0.05, 0.1, 0.2, 0.3],
}

grid = GridSearchCV(pipe, param_grid=param_grid, cv=5, scoring="accuracy", n_jobs=-1)
grid.fit(X, y)

print("Best params:", grid.best_params_)
print("Best CV score:", grid.best_score_)
best_pipe = grid.best_estimator_


Best params: {'impute__trim_fraction': 0.0}
Best CV score: 0.55
