In [1]:
import sys

# Add your local path
sys.path.append('/workspaces/tubular')

In [2]:
from __future__ import annotations

from typing import TYPE_CHECKING

import narwhals as nw
import numpy as np
import pandas as pd
import polars as pl
from sklearn.cluster import KMeans

from tubular.base import BaseTransformer, DataFrameMethodTransformer
from tubular.mixins import (
    CheckNumericMixin,
    DropOriginalMixin,
    NewColumnNameMixin,
    TwoColumnMixin,
)

if TYPE_CHECKING:
    from narwhals.typing import (
        FrameT,
        IntoSeriesT,
    )

In [3]:

class BaseNumericTransformer(BaseTransformer, CheckNumericMixin):
    """
    Extends BaseTransformer for datetime scenarios.

    Parameters
    ----------
    columns : List[str]
        List of columns to be operated on.

    **kwargs
        Arbitrary keyword arguments passed onto BaseTransformer.init method.

    Attributes
    ----------
    columns : List[str]
        List of columns to be operated on

    polars_compatible : bool
        class attribute, indicates whether transformer has been converted to polars/pandas agnostic narwhals framework
    FITS: bool
        class attribute, indicates whether transform requires fit to be run first

    """

    polars_compatible = True

    FITS = False

    def __init__(self, columns: list[str], **kwargs: dict[str, bool]) -> None:
        super().__init__(columns=columns, **kwargs)

    @nw.narwhalify
    def fit(
        self,
        X: FrameT,
        y: nw.Series | None = None,
    ) -> BaseNumericTransformer:
        """Base fit method. Validates data and attributes prior to the child objects fit logic.

        Parameters
        ----------
        X : pd/pl.DataFrame
            A dataframe containing the required columns

        y : pd/pl.Series | None
            Required for pipeline.

        """

        super().fit(X, y)

        CheckNumericMixin.check_numeric_columns(self, X[self.columns])

        return self

    @nw.narwhalify
    def transform(self, X: FrameT) -> FrameT:
        """Base transform method. Validates data and attributes prior to the child objects tranform logic.

        Parameters
        ----------
        X : pd/pl.DataFrame
            Data to transform.

        Returns
        -------
        X : pd/pl.DataFrame
            Validated data

        """

        X = super().transform(X)

        CheckNumericMixin.check_numeric_columns(self, X[self.columns])

        return X


In [4]:
from sklearn import datasets

# Load the iris dataset
iris = datasets.load_iris()
df = iris.data
df = pl.DataFrame(df).select('column_0').rename({'column_0':'a'})
df = nw.from_native(df)

In [5]:
kmeans = KMeans(n_clusters=5)
kmeans.fit_predict(df)

  super()._check_params_vs_input(X, default_n_init=10)


array([2, 2, 2, 2, 2, 0, 2, 2, 2, 2, 0, 2, 2, 2, 0, 0, 0, 2, 0, 2, 0, 2,
       2, 2, 2, 2, 2, 2, 2, 2, 2, 0, 2, 0, 2, 2, 0, 2, 2, 2, 2, 2, 2, 2,
       2, 2, 2, 2, 0, 2, 1, 3, 1, 0, 3, 0, 3, 2, 1, 2, 2, 0, 3, 3, 0, 1,
       0, 0, 3, 0, 0, 3, 3, 3, 3, 1, 1, 1, 3, 0, 0, 0, 0, 3, 0, 3, 1, 3,
       0, 0, 0, 3, 0, 2, 0, 0, 0, 3, 2, 0, 3, 0, 1, 3, 3, 4, 2, 4, 1, 1,
       3, 3, 1, 0, 0, 3, 3, 4, 4, 3, 1, 0, 4, 3, 1, 1, 3, 3, 3, 1, 4, 4,
       3, 3, 3, 4, 3, 3, 3, 1, 1, 1, 0, 1, 1, 1, 3, 3, 3, 0], dtype=int32)

In [6]:
groups = kmeans.predict(df)
groups = nw.from_native(pl.DataFrame(groups)).rename({'column_0':'groups'})
groups = groups.with_row_index()

In [7]:
df2 = nw.from_native(df)
df2 = df2.with_row_index()

In [8]:
results = df2.join(groups, on = 'index')
bins_max = results.group_by('groups').agg(
    nw.col('a').max()
    ).sort("a").select('a').to_numpy().ravel()

In [9]:
bins_max

array([5.2, 5.9, 6.5, 7.2, 7.9])

In [71]:
class OneDKmeansTransformer(BaseNumericTransformer, DropOriginalMixin):
    """Transformer that generates a new column based on kmeans algorithm.
    Transformer runs the kmean algorithm based on given number of clusters and then identifies the bins' cuts based on the results.
    Finally it passes them into the a cut function.

    Parameters
    ----------
    column : str
        Name of the column to discretise.

    new_column_name : str
        Name given to the new discrete column.

    n_clusters : int, default = 8
        The number of clusters to form as well as the number of centroids to generate.

    n_init "auto" or int, default="auto"
        Number of times the k-means algorithm is run with different centroid seeds.
        The final results is the best output of n_init consecutive runs in terms of inertia.
        Several runs are recommended for sparse high-dimensional problems (see `Clustering sparse data with k-means <https://scikit-learn.org/stable/auto_examples/text/plot_document_clustering.html#kmeans-sparse-high-dim>`__).

        When n_init='auto', the number of runs depends on the value of init: 10 if using init='random' or init is a callable;
        1 if using init='k-means++' or init is an array-like.

    drop_original : bool, default=False
        Should the original columns to be transformed be dropped after applying the
        OneDKmeanstransformer?

    kmeans_kwargs : dict, default = {}
        A dictionary of keyword arguments to be passed to the sklearn KMeans method when it is called in fit.

    **kwargs
        Arbitrary keyword arguments passed onto BaseTransformer.init().

    Attributes
    ----------

    polars_compatible : bool
        class attribute, indicates whether transformer has been converted to polars/pandas agnostic narwhals framework
    FITS: bool
        class attribute, indicates whether transform requires fit to be run first

    """

    polars_compatible = True

    FITS = True

    def __init__(
        self,
        column: str,
        new_column_name: str,
        n_init: str | int = "auto",
        n_clusters: int = 8,
        drop_original: bool = False,
        kmeans_kwargs: dict[str, object] | None = None,
        **kwargs: dict[str, bool],
    ) -> None:
        if not isinstance(new_column_name, str):
            msg = f"{self.classname()}: new_column_name should be a str but got type {type(new_column_name)}"
            raise TypeError(msg)

        if not isinstance(column, str):
            msg = f"{self.classname()}: column arg should be a single str giving the column to group."
            raise TypeError(msg)

        if not isinstance(n_clusters, int):
            msg = f"{self.classname()}: n_clusters should be a int but got type {type(n_clusters)}"
            raise TypeError(msg)

        if not (n_init == "auto" or isinstance(n_init, int)):
            msg = f"{self.classname()}: n_init should be 'auto' or int but got type {type(n_init)}"
            raise TypeError(msg)

        if kmeans_kwargs is None:
            kmeans_kwargs = {}
        else:
            if type(kmeans_kwargs) is not dict:
                msg = f"{self.classname()}: kmeans_kwargs should be a dict but got type {type(kmeans_kwargs)}"
                raise TypeError(msg)

        for i, k in enumerate(kmeans_kwargs.keys()):
            if type(k) is not str:
                msg = f"{self.classname()}: unexpected type ({type(k)}) for kmeans_kwargs key in position {i}, must be str"
                raise TypeError(msg)

        self.n_clusters = n_clusters
        self.new_column_name = new_column_name
        self.n_init = n_init
        self.kmeans_kwargs = kmeans_kwargs

        # This attribute is not for use in any method, use 'columns' instead.
        # Here only as a fix to allow string representation of transformer.
        self.column = column

        super().__init__(columns=[column], **kwargs)
        self.set_drop_original_column(drop_original)

    @nw.narwhalify
    def fit(self, X: FrameT, y: IntoSeriesT | None = None) -> OneDKmeansTransformer:
        """Fir transformer to input data.

        Parameters
        ----------
        X : pd/pl.DataFrame
            Dataframe with columns to learn scaling values from.

        y : None
            Required for pipeline.

        """

        super().fit(X, y)

        #X = nw.from_native(X)

        # Check that X does not contain Nans and return ValueError.
        if (
            X.select(nw.col(self.columns[0]).is_null().any()).to_numpy().ravel()[0]
            or X.select(nw.col(self.columns[0]).is_nan().any()).to_numpy().ravel()[0]
        ):
            msg = f"{self.classname()}: X should not contain missing values."
            raise ValueError(msg)

        kmeans = KMeans(
            n_clusters=self.n_clusters,
            n_init=self.n_init,
            copy_x=False,
            **self.kmeans_kwargs,
        )

        native_namespace = nw.get_native_namespace(X).__name__
        groups = kmeans.fit_predict(X.select(self.columns[0]).to_native())
        print(groups)

        X = X.with_columns(
            nw.new_series(
                name="groups",
                values=groups,
                backend=native_namespace,
            ),
        )

        print(
            X.group_by("groups")
            .agg(
                nw.col(self.columns[0]).max(),
            )
            .sort(self.columns[0])
            .select(self.columns[0])
            .to_numpy()
            .ravel()
        )
        self.bins = (
            X.group_by("groups")
            .agg(
                nw.col(self.columns[0]).max(),
            )
            .sort(self.columns[0])
            .select(self.columns[0])
            .to_numpy()
            .ravel()
        )
        return self

    @nw.narwhalify
    def transform(self, X: FrameT) -> FrameT:
        """Generate from input pd/pl.DataFrame (X) bins based on Kmeans results and add this column or columns in X.

        Parameters
        ----------
        X : pl/pd.DataFrame
            Data to transform.

        Returns
        -------
        X : pl/pd.DataFrame
            Input X with additional cluster column added.
        """
        X = super().transform(X)

        X = nw.from_native(X)
        native_namespace = nw.get_native_namespace(X).__name__

        groups = np.digitize(
            X.select(self.column[0]).to_numpy().ravel(),
            bins=self.bins,
            right=True,
        )

        X = X.with_columns(
            nw.new_series(
                name=self.new_column_name,
                values=groups,
                backend=native_namespace,
            ),
        )
        return self.drop_original_column(X, self.drop_original, self.columns)

In [83]:
df_dict = {
    'a': [4, 5, 4, 5, 2, 1, 3, 2, 1, 5, 10, 12, 4, 16, 17],
    'b': [43, -77, -61, 29, 84, 29, -24, 40, 84, -96, 10, -4, 15, -12, 15],
    'c': ["a", "b", "a", "b", "a", "b", "b", "a", "c", "b", "a", "c", "a", "c", "a"]
}

In [82]:
df = pl.DataFrame(df_dict)
kmeans = OneDKmeansTransformer(column='b', n_clusters=5, new_column_name='new', drop_original=False, kmeans_kwargs={"random_state":42}).fit(X=df)
df = kmeans.transform(df)

print(df)



[0 1 1 0 2 0 3 0 2 1 4 3 4 3 4]
[-61  -4  15  43  84]
shape: (15, 4)
┌─────┬─────┬─────┬─────┐
│ a   ┆ b   ┆ c   ┆ new │
│ --- ┆ --- ┆ --- ┆ --- │
│ i64 ┆ i64 ┆ str ┆ i64 │
╞═════╪═════╪═════╪═════╡
│ 4   ┆ 43  ┆ a   ┆ 3   │
│ 5   ┆ -77 ┆ b   ┆ 0   │
│ 4   ┆ -61 ┆ a   ┆ 0   │
│ 5   ┆ 29  ┆ b   ┆ 3   │
│ 2   ┆ 84  ┆ a   ┆ 4   │
│ …   ┆ …   ┆ …   ┆ …   │
│ 10  ┆ 10  ┆ a   ┆ 2   │
│ 12  ┆ -4  ┆ c   ┆ 1   │
│ 4   ┆ 15  ┆ a   ┆ 2   │
│ 16  ┆ -12 ┆ c   ┆ 1   │
│ 17  ┆ 15  ┆ a   ┆ 2   │
└─────┴─────┴─────┴─────┘


In [76]:
df2 = pd.DataFrame(df_dict)
print(df2)
kmeans = OneDKmeansTransformer(column='b', n_clusters=2, new_column_name='new', drop_original=False, kmeans_kwargs={"random_state":42}).fit(X=df2)
df2 = kmeans.transform(df2)

print(df2)

   a   b
0  4  43
1  5  77
2  4  61
3  5  29
4  2  84
5  1  29
6  3  24
7  2  40
8  1  84
9  5  96
[0 1 0 0 1 0 0 0 1 1]
[61 96]
   a   b  new
0  4  43    0
1  5  77    1
2  4  61    0
3  5  29    0
4  2  84    1
5  1  29    0
6  3  24    0
7  2  40    0
8  1  84    1
9  5  96    1
