## Experiment Overview

In this exercise, we've some goals we're seeking to accomplish.

1. Pull in the experimental version of RForestry built for Python
2. Setup a sklearn pipeline which requires the use of a Ray cluster
3. Run distributed work across a Ray cluster which leverages RForestry's fit() method
4. Link to the outputs written to MLFlow

In [3]:
import time
import numpy as np
import pandas as pd
from random import randrange
from random_forestry import RandomForest
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
from sklearn.metrics import r2_score

data = load_iris()
df = pd.DataFrame(data['data'], columns=data['feature_names'])
df['target'] = data['target']
X = df.loc[:, df.columns != 'sepal length (cm)']
y = df['sepal length (cm)']

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.33, random_state=42)

fr = RandomForest(ntree = 100)

start_time = time.time()
fr.fit(X_train, y_train)
forest_preds = fr.predict(X_test)
print(fr.score(X_test, y_test))
print("--- %s seconds ---" % (time.time() - start_time))

-0.11150505389719978
--- 0.010935783386230469 seconds ---


In [4]:
from sklearn.utils.estimator_checks import check_estimator

try:
    check_estimator(RandomForest())
except TypeError:
    print("Ain't gonna work as-is. Food for thought to upstream contributors. Let's fix it.")

Ain't gonna work as-is. Food for thought to upstream contributors. Let's fix it.


## Modified Version of RForestry for sklearn

Below is the scrappy modification of the RForestry package to "work" with sklearn. There are aspects here that are most certainly not correct, but for demonstration purpsoses deemed good enough.

In [5]:
# https://scikit-learn.org/stable/developers/develop.html
import numpy as np
from sklearn.base import BaseEstimator, ClassifierMixin, TransformerMixin
from sklearn.utils.validation import check_X_y, check_array, check_is_fitted
from sklearn.utils.multiclass import unique_labels
from sklearn.metrics import euclidean_distances
from typing import Union, List, Optional

import dataclasses
import math
import os
import pickle  # nosec B403 - 'Consider possible security implications associated with pickle'
import sys
import warnings
from pathlib import Path
from random import randrange
from typing import Any, Dict, List, Optional, Tuple, Union, Final

import numpy as np
import pandas as pd
import statsmodels.api as sm
from pydantic import (  # pylint: disable=no-name-in-module
    ConfigDict,
    StrictBool,
    StrictFloat,
    StrictInt,
    confloat,
    conint,
    validator,
)
from pydantic.dataclasses import dataclass

from dataclasses import dataclass, field

from random_forestry import RandomForest
from random_forestry import extension

@dataclass
class ProcessedDta:  # pylint: disable=too-many-instance-attributes
    processed_x: pd.DataFrame = field(default_factory=pd.DataFrame)
    y: np.ndarray = field(default_factory=lambda: np.array(0))
    categorical_feature_cols: np.ndarray = field(default_factory=lambda: np.array(0))
    categorical_feature_mapping: List[Dict[Any, Any]] = field(default_factory=list)
    feature_weights: Optional[np.ndarray] = None
    feature_weights_variables: Optional[np.ndarray] = None
    deep_feature_weights: Optional[np.ndarray] = None
    deep_feature_weights_variables: Optional[str] = None
    observation_weights: Optional[str] = None
    monotonic_constraints: Optional[str] = None
    linear_feature_cols: np.ndarray = field(default_factory=lambda: np.array(0))
    groups_mapping: Optional[Dict[str, Any]] = None
    groups: Optional[str] = None
    col_means: np.ndarray = field(default_factory=lambda: np.array(0))
    col_sd: np.ndarray = field(default_factory=lambda: np.array(0))
    has_nas: bool = False
    na_direction: bool = False
    n_observations: int = 0
    num_columns: int = 0
    feat_names: Optional[np.ndarray] = None

import math
import sys
import warnings
from typing import Any, Dict, List, Optional, Tuple, Union

import numpy as np
import pandas as pd
import statsmodels.api as sm
from sklearn.model_selection import LeaveOneOut


def has_nas(x: pd.DataFrame) -> bool:
    return x.isnull().values.any()


def get_sampsize(forest, x: pd.DataFrame) -> int:
    nrows, _ = x.shape
    if forest.sampsize is None:
        sampsize = nrows if forest.replace else math.ceil(0.632 * nrows)

    # only if sample.fraction is given, update sampsize
    if forest.sample_fraction is not None:
        sampsize = math.ceil(forest.sample_fraction * nrows)

    return sampsize


def get_mtry(forest, x: pd.DataFrame) -> int:
    _, ncols = x.shape
    if forest.mtry is None:
        return max((ncols // 3), 1)
    return forest.mtry


def get_feat_names(x: Union[pd.DataFrame, pd.Series, List]) -> Optional[np.ndarray]:
    if isinstance(x, pd.DataFrame):
        return x.columns.values
    if type(x).__module__ == np.__name__ or isinstance(x, (list, pd.Series)):
        print(
            "x does not have column names. ",
            "The check that columns are provided in the same order when training and predicting will be skipped",
            file=sys.stderr,
        )
        return None

    raise AttributeError("x must be a Pandas DataFrame, a numpy array, a Pandas Series, or a regular list")


def find_match(arr_a: Union[np.ndarray, List], arr_b: Union[np.ndarray, List]) -> np.ndarray:
    """
    --------------------------------------
    Helper Function
    @return a nunpy array indicating the indices of first occurunces of
      the elements of arr_a in arr_b
    """

    temp_dict = {}

    for index, element in enumerate(arr_b):
        if str(element) not in temp_dict:
            temp_dict[str(arr_b[index])] = index

    return np.array([temp_dict[str(val)] for val in arr_a])


def forest_checker(forest) -> None:
    """
    Checks if RandomForest object has valid pointer for C++ object.
    @param object a RandomForest object
    @return A message if the forest does not have a valid C++ pointer.
    """

    if (not forest.dataframe) or (not forest.forest):
        raise ValueError("The RandomForest object has invalid ctypes pointers.")


# Given a dataframe with Y and Y.hat at least, fits an OLS and gives the LOO
# predictions on the sample
def loo_pred_helper(data_frame: pd.DataFrame) -> dict:

    Y = data_frame["Y"]
    X = data_frame.loc[:, data_frame.columns != "Y"]
    X = sm.add_constant(X)

    adjust_lm = sm.OLS(Y, X).fit()

    cv = LeaveOneOut()
    cv_pred = np.empty(Y.size)

    for i, (train, test) in enumerate(cv.split(X)):
        # split data
        X_train, X_test = X.iloc[train, :], X.iloc[test, :]
        y_train, _ = Y[train], Y[test]

        # fit model
        model = sm.OLS(y_train, X_train).fit()
        cv_pred[i] = model.predict(X_test)

    return {"insample_preds": cv_pred, "adjustment_model": adjust_lm}


def preprocess_training(x: pd.DataFrame, y) -> Tuple[pd.DataFrame, np.ndarray, List[Dict]]:
    """
    -- Methods for Preprocessing Data --------------------------------------------
    @title preprocess_training
    @description Perform preprocessing for the training data, including
      converting data to dataframe, and encoding categorical data into numerical
      representation.
    @inheritParams RandomForest
    @return A list of two datasets along with necessary information that encodes
      the preprocessing.
    """

    # Check if the input dimension of x matches y
    if len(x.index) != y.size:
        raise ValueError("The dimension of input dataset x doesn't match the output vector y.")

    # Track the order of all features
    feature_names = x.columns.values
    if feature_names.size == 0:
        warnings.warn("No names are given for each column.")

    # Track all categorical features (both factors and characters)
    categorical_feature_cols = np.array((x.select_dtypes("category")).columns)
    feature_character_cols = np.array((x.select_dtypes("object")).columns)

    if feature_character_cols.size != 0:  # convert to a factor
        warnings.warn("Character value features will be cast to categorical data.")
        categorical_feature_cols = np.concatenate((categorical_feature_cols, feature_character_cols), axis=0)

    categorical_feature_cols = x.columns.get_indexer(categorical_feature_cols)

    # For each categorical feature, encode x into numeric representation and
    # save the encoding mapping
    categorical_feature_mapping: List[Dict] = []
    for categorical_feature_col in categorical_feature_cols:
        x.iloc[:, categorical_feature_col] = pd.Series(
            x.iloc[:, categorical_feature_col], dtype="category"
        ).cat.remove_unused_categories()

        categorical_feature_mapping.append(
            {
                "categoricalFeatureCol": categorical_feature_col,
                "uniqueFeatureValues": list(x.iloc[:, categorical_feature_col].cat.categories),
                "numericFeatureValues": np.arange(len(x.iloc[:, categorical_feature_col].cat.categories)),
            }
        )

        x.iloc[:, categorical_feature_col] = pd.Series(x.iloc[:, categorical_feature_col].cat.codes, dtype="category")

    return x, categorical_feature_cols, categorical_feature_mapping


def preprocess_testing(x, categorical_feature_cols: np.ndarray, categorical_feature_mapping: List[Dict]) -> Any:
    """
    @title preprocess_testing
    @description Perform preprocessing for the testing data, including converting
      data to dataframe, and testing if the columns are consistent with the
      training data and encoding categorical data into numerical representation
      in the same way as training data.
    @inheritParams RandomForest
    @param categorical_feature_cols A list of index for all categorical data. Used
      for trees to detect categorical columns.
    @param categorical_feature_mapping A list of encoding details for each
      categorical column, including all unique factor values and their
      corresponding numeric representation.
    @return A preprocessed training dataaset x
    """

    # Track the order of all features
    testing_feature_names = x.columns.values
    if testing_feature_names.size == 0:
        warnings.warn("No names are given for each column.")

    # Track all categorical features (both factors and characters)
    feature_factor_cols = np.array((x.select_dtypes("category")).columns)
    feature_character_cols = np.array((x.select_dtypes("object")).columns)

    testing_categorical_feature_cols = np.concatenate((feature_factor_cols, feature_character_cols), axis=0)
    testing_categorical_feature_cols = x.columns.get_indexer(testing_categorical_feature_cols)

    if (set(categorical_feature_cols) - set(testing_categorical_feature_cols)) or (
        set(testing_categorical_feature_cols) - set(categorical_feature_cols)
    ):
        raise ValueError("Categorical columns are different between testing and training data.")

    # For each categorical feature, encode x into numeric representation
    for categorical_feature_mapping_ in categorical_feature_mapping:
        categorical_feature_col = categorical_feature_mapping_["categoricalFeatureCol"]
        # Get all unique feature values
        testing_unique_feature_values = x.iloc[:, categorical_feature_col].unique()
        unique_feature_values = categorical_feature_mapping_["uniqueFeatureValues"]
        numeric_feature_values = categorical_feature_mapping_["numericFeatureValues"]

        # If testing dataset contains more, adding new factors to the mapping list
        diff_unique_feature_values = set(testing_unique_feature_values) - set(unique_feature_values)
        if diff_unique_feature_values:
            unique_feature_values = np.concatenate(
                (list(unique_feature_values), list(diff_unique_feature_values)), axis=0
            )
            numeric_feature_values = np.arange(unique_feature_values.size)

            # update
            categorical_feature_mapping_["uniqueFeatureValues"] = unique_feature_values
            categorical_feature_mapping_["numericFeatureValues"] = numeric_feature_values

        x.iloc[:, categorical_feature_col] = pd.Series(
            find_match(x.iloc[:, categorical_feature_col], unique_feature_values),
            dtype="category",
        )

    # Return transformed data and encoding information
    return x


def scale_center(
    x: pd.DataFrame, categorical_feature_cols: np.ndarray, col_means: np.ndarray, col_sd: np.ndarray
) -> pd.DataFrame:
    """
    @title scale_center
    @description Given a dataframe, scale and center the continous features
    @param x A dataframe in order to be processed.
    @param categoricalFeatureCols A vector of the categorical features, we
      don't want to scale/center these.
    @param colMeans A vector of the means to center each column.
    @param colSd A vector of the standard deviations to scale each column with.
    @return A scaled and centered  dataset x
    """

    for col_idx in range(len(x.columns)):
        if col_idx not in categorical_feature_cols:
            if col_sd[col_idx] != 0:
                x.iloc[:, col_idx] = (x.iloc[:, col_idx] - col_means[col_idx]) / col_sd[col_idx]
            else:
                x.iloc[:, col_idx] = x.iloc[:, col_idx] - col_means[col_idx]

    return x


def unscale_uncenter(x: pd.DataFrame, categorical_feature_cols: list, col_means: list, col_sd: list) -> pd.DataFrame:
    """
    @title unscale_uncenter
    @description Given a dataframe, un scale and un center the continous features
    @param x A dataframe in order to be processed.
    @param categoricalFeatureCols A vector of the categorical features, we
      don't want to scale/center these. Should be 1-indexed.
    @param colMeans A vector of the means to add to each column.
    @param colSd A vector of the standard deviations to rescale each column with.
    @return A dataset x in it's original scaling
    """

    for index, column in x.columns:
        if column not in categorical_feature_cols:
            if col_sd[index] != 0:
                x.iloc[:, index] = x.iloc[:, index] * col_sd[index] + col_means[index]
            else:
                x.iloc[:, index] = x.iloc[:, index] + col_means[index]

    return x


def scale(x, y, processed_x, categorical_feature_cols):
    _, ncol = x.shape
    col_means = np.repeat(0.0, ncol + 1)
    col_sd = np.repeat(0.0, ncol + 1)

    for col_idx in range(ncol):
        if col_idx not in categorical_feature_cols:
            col_means[col_idx] = np.nanmean(processed_x.iloc[:, col_idx])
            col_sd[col_idx] = np.nanstd(processed_x.iloc[:, col_idx])

    # Scale columns of X
    processed_x = scale_center(processed_x, categorical_feature_cols, col_means, col_sd)

    # Center and scale Y
    col_means[ncol] = np.nanmean(y)
    col_sd[ncol] = np.nanstd(y)
    if col_sd[ncol] != 0:
        y = (y - col_means[ncol]) / col_sd[ncol]
    else:
        y = y - col_means[ncol]

    return processed_x, y, col_means, col_sd




import functools
from abc import ABC, abstractmethod


class BaseValidator(ABC):
    def __init__(self, function):
        self.function = function

    def __get__(self, obj, _):
        return functools.partial(self.__call__, obj)

    @abstractmethod
    def __call__(self, *args, **kwargs):
        ...


class FitValidator(BaseValidator):

    def validate_monotonic_constraints(self, *args, **kwargs):
        _self = args[0]

        x = pd.DataFrame(kwargs.get("x", args[1])).copy()
        _, ncols = x.shape

        if "monotonic_constraints" not in kwargs:
            monotonic_constraints = np.zeros(ncols, dtype=np.intc)
        else:
            monotonic_constraints = np.array(kwargs["monotonic_constraints"], dtype=np.intc)

        if monotonic_constraints.size != ncols:
            raise ValueError("monotonic_constraints must have the size of x")
        if any(i not in (0, 1, -1) for i in monotonic_constraints):
            raise ValueError("monotonic_constraints must be either 1, 0, or -1")
        if any(i != 0 for i in monotonic_constraints) and _self.linear:
            raise ValueError("Cannot use linear splitting with monotonic_constraints")

        return monotonic_constraints

    def validate_observation_weights(self, *args, **kwargs):
        _self = args[0]

        x = pd.DataFrame(kwargs.get("x", args[1])).copy()
        nrows, _ = x.shape

        if not _self.replace:
            observation_weights = np.zeros(nrows, dtype=np.double)
        elif "observation_weights" not in kwargs:
            observation_weights = np.repeat(1.0, nrows)
        else:
            observation_weights = np.array(kwargs["observation_weights"], dtype=np.double)

        if observation_weights.size != nrows:
            raise ValueError("observation_weights must have length len(x)")
        if any(i < 0 for i in observation_weights):
            raise ValueError("The entries in observation_weights must be non negative")
        if _self.replace and np.sum(observation_weights) == 0:
            raise ValueError("There must be at least one non-zero weight in observation_weights")

        return observation_weights

    def validate_lin_feats(self, *args, **kwargs):
        x = pd.DataFrame(kwargs.get("x", args[1])).copy()
        _, ncols = x.shape

        if "lin_feats" not in kwargs:
            lin_feats = np.arange(ncols, dtype=np.ulonglong)
        else:
            lin_feats = pd.unique(np.array(kwargs["lin_feats"], dtype=np.ulonglong))

        if any(i < 0 or i >= ncols for i in lin_feats):
            raise ValueError("lin_feats must contain positive integers less than len(x.columns).")

        return lin_feats

    def validate_feature_weights(self, *args, **kwargs):
        x = pd.DataFrame(kwargs.get("x", args[1])).copy()
        _, ncols = x.shape

        if "feature_weights" not in kwargs:
            feature_weights = np.repeat(1.0, ncols)
            interaction_variables = [] if "interaction_variables" not in kwargs else kwargs["interaction_variables"]
            feature_weights[interaction_variables] = 0.0
        else:
            feature_weights = np.array(kwargs["feature_weights"], dtype=np.double)

        if feature_weights.size != ncols:
            raise ValueError("feature_weights must have length len(x.columns)")

        if any(i < 0 for i in feature_weights):
            raise ValueError("The entries in feature_weights must be non negative")

        if np.sum(feature_weights) == 0:
            raise ValueError("There must be at least one non-zero weight in feature_weights")

        return feature_weights

    def validate_deep_feature_weights(self, *args, **kwargs):
        x = pd.DataFrame(kwargs.get("x", args[1])).copy()
        _, ncols = x.shape

        if "deep_feature_weights" not in kwargs:
            deep_feature_weights = np.repeat(1.0, ncols)
        else:
            deep_feature_weights = np.array(kwargs["deep_feature_weights"], dtype=np.double)

        if deep_feature_weights.size != ncols:
            raise ValueError("deep_feature_weights must have length len(x.columns)")

        if any(i < 0 for i in deep_feature_weights):
            raise ValueError("The entries in deep_feature_weights must be non negative")

        if np.sum(deep_feature_weights) == 0:
            raise ValueError("There must be at least one non-zero weight in deep_feature_weights")

        return deep_feature_weights

    def validate_groups(self, *_, **kwargs):
        if "groups" in kwargs:
            groups = kwargs["groups"]
            if not pd.api.types.is_categorical_dtype(groups):
                raise ValueError(
                    "groups must have a data dtype of categorical. ",
                    'Try using pd.Categorical(...) or pd.Series(..., dtype="category").',
                )
            if len(groups.unique()) == 1:
                raise ValueError("groups must have more than 1 level to be left out from sampling.")

            return pd.Series(groups, dtype="category")

        return None

    def __call__(self, *args, **kwargs):
        _self = args[0]

        x = pd.DataFrame(kwargs.get("x", args[1])).copy()
        y = np.array(kwargs.get("y", args[1] if "x" in kwargs else args[2]), dtype=np.double).copy()
        nrows, ncols = x.shape

        # Check if the input dimension of x matches y
        if nrows != y.size:
            raise ValueError("The dimension of input dataset x doesn't match the output y.")

        if np.isnan(y).any():
            raise ValueError("y contains missing data.")

        if _self.linear and x.isnull().values.any():
            raise ValueError("Cannot do imputation splitting with linear.")

        if not _self.replace and get_sampsize(_self, x) > nrows:
            raise ValueError("You cannot sample without replacement with size more than total number of observations.")
        if get_mtry(_self, x) > ncols:
            raise ValueError("mtry cannot exceed total amount of features in x.")

        kwargs["monotonic_constraints"] = self.validate_monotonic_constraints(*args, **kwargs)

        kwargs["lin_feats"] = self.validate_lin_feats(*args, **kwargs)

        kwargs["feature_weights"] = self.validate_feature_weights(*args, **kwargs)

        kwargs["deep_feature_weights"] = self.validate_deep_feature_weights(*args, **kwargs)

        kwargs["observation_weights"] = self.validate_observation_weights(*args, **kwargs)

        if "groups" in kwargs:
            kwargs["groups"] = self.validate_groups(*args, **kwargs)

        return self.function(*args, **kwargs)

class PredictValidator(BaseValidator):

    DEFAULT_NEWDATA: Final = None
    DEFAULT_AGGREGATION: Final[str] = "average"

    def get_newdata(self, *args, **kwargs) -> Union[pd.DataFrame, pd.Series, List, None]:
        if len(args) > 2:
            raise TypeError(f"predict() takes from 1 to 2 positional arguments but {len(args)} were given")
        if len(args) == 2:
            if "X" in kwargs or "newdata" in kwargs:
                raise AttributeError("newdata specified both in args and kwargs")
            return args[1]
        return kwargs.get("newdata", kwargs.get("X", __class__.DEFAULT_NEWDATA))

    def validate_newdata(self, *args, **kwargs) -> pd.DataFrame:
        _self = args[0]
        newdata = self.get_newdata(*args, **kwargs)

        if newdata is not None:
            if not (isinstance(newdata, (pd.DataFrame, pd.Series, list)) or type(newdata).__module__ == np.__name__):
                raise AttributeError(
                    "newdata must be a Pandas DataFrame, a numpy array, a Pandas Series, or a regular list"
                )

            newdata = (pd.DataFrame(newdata)).copy()
            newdata.reset_index(drop=True, inplace=True)

            if len(newdata.columns) != _self.processed_dta.num_columns:
                raise ValueError(
                    f"newdata has {len(newdata.columns)}, "
                    f"but the forest was trained with {_self.processed_dta.num_columns} columns."
                )

            if _self.processed_dta.feat_names is not None:
                if not set(newdata.columns) == set(_self.processed_dta.feat_names):
                    # TODO FIX THIS
                    newdata.columns = _self.processed_dta.feat_names

                # If linear is true we can't predict observations with some features missing.
                if _self.linear and newdata.isnull().values.any():
                    raise ValueError("linear does not support missing data")

                if not all(newdata.columns == _self.processed_dta.feat_names):
                    warnings.warn("newdata columns have been reordered so that they match the training feature matrix")
                    newdata = newdata[_self.processed_dta.feat_names]

        return newdata

    def validate_exact(self, **kwargs) -> bool:
        if "exact" in kwargs:
            return kwargs["exact"]
        if kwargs["newdata"] is None:
            return True
        if len(kwargs["newdata"].index) > 1e5:
            return False
        return True

    def validate_trees(self, *args, **kwargs) -> None:
        _self = args[0]

        if "trees" in kwargs:
            if not kwargs["exact"] or kwargs["aggregation"] != "average":
                raise ValueError("When using tree indices, we must have exact = True and aggregation = 'average' ")

            if any(
                (not isinstance(i, (int, np.integer))) or (i < -_self.ntree) or (i >= _self.ntree)
                for i in kwargs["trees"]
            ):
                raise ValueError("trees must contain indices which are integers between -ntree and ntree-1")

    def validate_aggregation(self, *args, **kwargs) -> str:
        _self = args[0]

        aggregation = kwargs.get("aggregation", __class__.DEFAULT_AGGREGATION)

        if aggregation == "oob":
            pass
        elif aggregation == "doubleOOB":
            if not _self.double_bootstrap:
                raise ValueError(
                    "Attempting to do double OOB predictions "
                    "with a forest that was not trained with doubleBootstrap = True"
                )
        elif aggregation == "coefs":
            if not _self.linear:
                raise ValueError("Aggregation can only be linear with setting the parameter linear = True.")
            if kwargs["newdata"] is None:
                raise ValueError("When using an aggregation that is not oob or doubleOOB, one must supply newdata")
        else:
            if kwargs["newdata"] is None:
                raise ValueError("When using an aggregation that is not oob or doubleOOB, one must supply newdata")

        return aggregation

    def __call__(self, *args, **kwargs):
        _self = args[0]

        forest_checker(_self)

        kwargs["newdata"] = self.validate_newdata(*args, **kwargs)

        kwargs["exact"] = self.validate_exact(**kwargs)

        kwargs["aggregation"] = self.validate_aggregation(*args, **kwargs)

        kwargs["nthread"] = kwargs.get("nthread", _self.nthread)

        self.validate_trees(*args, **kwargs)

        return self.function(_self, **kwargs)

    

class RandomForestEstimator(ClassifierMixin, BaseEstimator, RandomForest):
    
    def _more_tags(self):
        # https://scikit-learn.org/stable/developers/develop.html#estimator-tags
        return {
            "multioutput_only": True,
            "requires_y": True,
            "X_types": ["1darray"]
        }

    def __init__(
        self,
        ntree: conint(gt=0, strict=True) = 500,
        replace: StrictBool = True,
        sampsize: Optional[conint(gt=0, strict=True)] = None,
        sample_fraction: Optional[Union[conint(gt=0, strict=True), confloat(gt=0, strict=True)]] = None,
        mtry: Optional[conint(gt=0, strict=True)] = None,
        nodesize_spl: conint(gt=0, strict=True) = 5,
        nodesize_avg: conint(gt=0, strict=True) = 5,
        nodesize_strict_spl: conint(gt=0, strict=True) = 1,
        nodesize_strict_avg: conint(gt=0, strict=True) = 1,
        min_split_gain: confloat(ge=0) = 0,
        max_depth: Optional[conint(gt=0, strict=True)] = None,
        interaction_depth: Optional[conint(gt=0, strict=True)] = None ,
        splitratio: confloat(ge=0, le=1) = 1.0,
        oob_honest: StrictBool = False,
        double_bootstrap: Optional[StrictBool] = None,
        seed: conint(ge=0, strict=True) = randrange(1001),
        verbose: StrictBool = False,
        nthread: conint(ge=0, strict=True) = 0,
        splitrule: str = "variance",
        middle_split: StrictBool = False,
        max_obs: Optional[conint(gt=0, strict=True)] = None,
        linear: StrictBool = False,
        min_trees_per_fold: conint(ge=0, strict=True) = 0,
        fold_size: conint(gt=0, strict=True) = 1,
        monotone_avg: StrictBool = False,
        overfit_penalty: Union[StrictInt, StrictFloat] = 1,
        scale: StrictBool = False,
        double_tree: StrictBool = False,
        na_direction: StrictBool = False,
        forest: Optional[pd.DataFrame] = dataclasses.field(default=None, init=False),
        dataframe: Optional[pd.DataFrame] = dataclasses.field(default=None, init=False),
        processed_dta: Optional[ProcessedDta] = dataclasses.field(default=None, init=False),
        saved_forest: List[Dict] = dataclasses.field(default_factory=list, init=False)):

        self.ntree = ntree
        self.replace = replace
        self.sampsize = sampsize
        self.sample_fraction = sample_fraction
        self.mtry = mtry
        self.nodesize_spl = nodesize_spl
        self.nodesize_avg = nodesize_avg
        self.nodesize_strict_spl = nodesize_strict_spl
        self.nodesize_strict_avg = nodesize_strict_avg
        self.min_split_gain = min_split_gain
        self.max_depth = max_depth
        self.interaction_depth = interaction_depth
        self.splitratio = splitratio
        self.oob_honest = oob_honest
        self.double_bootstrap = double_bootstrap
        self.seed = seed
        self.verbose = verbose
        self.nthread = nthread
        self.splitrule = splitrule
        self.middle_split = middle_split
        self.max_obs = max_obs
        self.linear = linear
        self.min_trees_per_fold = min_trees_per_fold
        self.fold_size = fold_size
        self.monotone_avg = monotone_avg
        self.overfit_penalty = overfit_penalty
        self.scale = scale
        self.double_tree = double_tree
        self.na_direction = na_direction
        self.forest = forest
        self.dataframe = dataframe
        self.processed_dta = processed_dta
        self.saved_forest = saved_forest

    def fit(self, x: Union[pd.DataFrame, pd.Series, List], y: np.ndarray):
        check_X_y(x, y)
        self._fit(pd.DataFrame(x), y)
        return self

    @FitValidator
    def _fit(
        self,
        x: Union[pd.DataFrame, pd.Series, List],
        y: np.ndarray,
        *,
        interaction_variables: Optional[List] = None,  # pylint: disable=unused-argument
        feature_weights: Optional[np.ndarray] = None,
        deep_feature_weights: Optional[np.ndarray] = None,
        observation_weights: Optional[np.ndarray] = None,
        lin_feats: Optional[Union[np.ndarray, List]] = None,  # Add a default value.
        monotonic_constraints: Optional[np.ndarray] = None,  # Add a default value.
        groups: Optional[pd.Series] = None,
        seed: Optional[int] = None,
    ) -> None:

        # Make sure that all the parameters exist when passed to RandomForest

        feat_names = get_feat_names(x)

        x = (pd.DataFrame(x)).copy()
        y = (np.array(y, dtype=np.double)).copy()

        nrow, ncol = x.shape

        if self.max_depth is None:
            self.max_depth = round(nrow / 2) + 1

        if self.interaction_depth is None:
            self.interaction_depth = self.max_depth

        if self.max_obs is None:
            self.max_obs = y.size

        self.sampsize = get_sampsize(self, x)
        self.mtry = get_mtry(self, x)

        self._set_nodesize_strict()

        feature_weights_variables = self._get_weights_variables(feature_weights)
        deep_feature_weights_variables = self._get_weights_variables(deep_feature_weights)

        feature_weights /= np.sum(feature_weights)
        deep_feature_weights /= np.sum(deep_feature_weights)
        if self.replace:
            observation_weights /= np.sum(observation_weights)

        groups_mapping, group_vector = self._get_groups_mapping_and_vector(x, groups)

        (
            processed_x,
            categorical_feature_cols,
            categorical_feature_mapping,
        ) = preprocess_training(x, y)

        if categorical_feature_cols.size != 0:
            monotonic_constraints[categorical_feature_cols] = 0

        col_means = col_sd = np.repeat(0.0, ncol + 1)
        if self.scale:
            processed_x, y, col_means, col_sd = scale(x, y, processed_x, categorical_feature_cols)

        # cpp linking
        processed_x.reset_index(drop=True, inplace=True)

        self.dataframe: pd.DataFrame = extension.get_data(
            np.ascontiguousarray(pd.concat([processed_x, pd.Series(y)], axis=1).values[:, :], np.double).ravel(),
            categorical_feature_cols,
            categorical_feature_cols.size,
            lin_feats,
            lin_feats.size,
            feature_weights,
            feature_weights_variables,
            feature_weights_variables.size,
            deep_feature_weights,
            deep_feature_weights_variables,
            deep_feature_weights_variables.size,
            observation_weights,
            monotonic_constraints,
            group_vector,
            self.monotone_avg,
            nrow,
            ncol + 1,
            self._get_seed(seed),
        )

        self.forest: pd.DataFrame = extension.train_forest(
            self.dataframe,
            self.ntree,
            self.replace,
            self.sampsize,
            self.splitratio,
            self.oob_honest,
            self.double_bootstrap,
            self.mtry,
            self.nodesize_spl,
            self.nodesize_avg,
            self.nodesize_strict_spl,
            self.nodesize_strict_avg,
            self.min_split_gain,
            self.max_depth,
            self.interaction_depth,
            self._get_seed(seed),
            self.nthread,
            self.verbose,
            self.middle_split,
            self.max_obs,
            self.min_trees_per_fold,
            self.fold_size,
            x.isnull().values.any(),
            self.na_direction,
            self.linear,
            self.overfit_penalty,
            self.double_tree,
        )

        # Update the fields
        self.processed_dta = ProcessedDta(
            processed_x=processed_x,
            y=y,
            categorical_feature_cols=categorical_feature_cols,
            categorical_feature_mapping=categorical_feature_mapping,
            feature_weights=feature_weights,
            feature_weights_variables=feature_weights_variables,
            deep_feature_weights=deep_feature_weights,
            deep_feature_weights_variables=deep_feature_weights_variables,
            observation_weights=observation_weights,
            monotonic_constraints=monotonic_constraints,
            linear_feature_cols=lin_feats,
            groups_mapping=groups_mapping,
            groups=groups,
            col_means=col_means,
            col_sd=col_sd,
            has_nas=x.isnull().values.any(),
            n_observations=nrow,
            num_columns=ncol,
            feat_names=feat_names,
        )

    def predict(self, X, *args, **kw):
        X = check_array(X)
        return self._predict(newdata=X, *args, **kw)

    @PredictValidator
    def _predict(
        self,
        newdata: Optional[Union[pd.DataFrame, pd.Series, List]] = None,
        *,
        aggregation: str = "average",
        seed: Optional[int] = None,
        nthread: Optional[int] = None,
        exact: Optional[bool] = None,
        trees: Optional[np.ndarray] = None,
        training_idx: Optional[np.ndarray] = None,
        return_weight_matrix: bool = False,
    ) -> Union[np.ndarray, dict]:

        if aggregation == "oob":
            predictions, weight_matrix = self._aggregation_oob(newdata, exact, return_weight_matrix)

        elif aggregation == "doubleOOB":
            predictions, weight_matrix = self._aggregation_double_oob(newdata, exact, return_weight_matrix)

        elif aggregation == "coefs":
            predictions, weight_matrix, coefficients = self._aggregation_coefs(
                newdata, exact, self._get_seed(seed), nthread
            )
            return {
                "predictions": predictions,
                "coef": np.lib.stride_tricks.as_strided(
                    coefficients,
                    shape=(
                        self.processed_dta.n_observations,
                        self.processed_dta.linear_feature_cols.size + 1,
                    ),
                    strides=(
                        coefficients.itemsize * (self.processed_dta.linear_feature_cols.size + 1),
                        coefficients.itemsize,
                    ),
                ),
            }

        else:
            predictions, weight_matrix, _ = self._aggregation_fallback(
                newdata,
                exact,
                self._get_seed(seed),
                nthread,
                return_weight_matrix,
                trees,
            )

        if return_weight_matrix:
            return {
                "predictions": predictions,
                "weightMatrix": np.lib.stride_tricks.as_strided(
                    weight_matrix,
                    shape=(self._get_n_preds(newdata), self.processed_dta.n_observations),
                    strides=(
                        weight_matrix.itemsize * self.processed_dta.n_observations,
                        weight_matrix.itemsize,
                    ),
                ),
            }

        return predictions
    
    def score(self, X: Union[pd.DataFrame, pd.Series, List], y: np.ndarray, sample_weight: Optional[np.ndarray] = None):
        from sklearn.metrics import r2_score
        return r2_score(y, self._predict(newdata=X, aggregation="average"), sample_weight=sample_weight)

    def set_params(self, **params):
        return super().set_parameters(**params)
    
    def get_params(self, deep=True):
        return super().get_parameters()
    
    def __del__(self):
        extension.delete_forestry(self.forest, self.dataframe)
        
    def __getstate__(self):
        state = self.__dict__.copy()
        if "dataframe" in state:
            del state["dataframe"]
        if "forest" in state:
            del state["forest"]
        return state


In [6]:
try:
    check_estimator(RandomForestEstimator())
except TypeError:
    print("Ain't gonna work as-is. Food for thought to upstream contributors. Let's fix it.")
else:
    print("Works!")

Exception ignored in: <function RandomForestEstimator.__del__ at 0x7fce477c4790>
Traceback (most recent call last):
  File "/tmp/ipykernel_1628/2692302211.py", line 932, in __del__
TypeError: delete_forestry(): incompatible function arguments. The following argument types are supported:
    1. (arg0: capsule, arg1: capsule) -> None

Invoked with: Field(name=None,type=None,default=None,default_factory=<dataclasses._MISSING_TYPE object at 0x7fce7c11d700>,init=False,repr=True,hash=None,compare=True,metadata=mappingproxy({}),_field_type=None), Field(name=None,type=None,default=None,default_factory=<dataclasses._MISSING_TYPE object at 0x7fce7c11d700>,init=False,repr=True,hash=None,compare=True,metadata=mappingproxy({}),_field_type=None)


Works!


## Running Modified RForestry Trials w/out Ray

In [7]:
import time
from sklearn import datasets
from sklearn.model_selection import train_test_split
from scipy.stats import randint
import numpy as np
from ray.tune.sklearn import TuneGridSearchCV
from ray.tune.sklearn import TuneSearchCV
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
from sklearn.metrics import r2_score


def tune_search_tuning(n_trials):
    data = load_iris()
    df = pd.DataFrame(data['data'], columns=data['feature_names'])
    df['target'] = data['target']
    X = df.loc[:, df.columns != 'sepal length (cm)']
    y = df['sepal length (cm)']

    x_train, x_test, y_train, y_test = train_test_split(X, y, test_size=0.33, random_state=42)

    clf = RandomForestEstimator()
    param_distributions = {
        "ntree": randint(100, 1000)
    }
    tune_search = TuneSearchCV(clf, param_distributions, n_trials=n_trials, name=f"T{n_trials}")
    tune_search.fit(x_train, y_train)
    pred = tune_search.predict(x_test)
    return tune_search.score(x_test, y_test)


#start_time = time.time()
#best_params = [tune_search_tuning(n_trials) for n_trials in [1, 10, 100]]
#print("Best Params", best_params)
#print("--- %s seconds ---" % (time.time() - start_time))

In [8]:
import time
from sklearn import datasets
from sklearn.model_selection import train_test_split
from scipy.stats import randint
import numpy as np
from ray.tune.sklearn import TuneGridSearchCV
from ray.tune.sklearn import TuneSearchCV
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
from sklearn.metrics import r2_score


def tune_search_tuning(n_trials):
    data = load_iris()
    df = pd.DataFrame(data['data'], columns=data['feature_names'])
    df['target'] = data['target']
    X = df.loc[:, df.columns != 'sepal length (cm)']
    y = df['sepal length (cm)']

    x_train, x_test, y_train, y_test = train_test_split(X, y, test_size=0.33, random_state=42)

    clf = RandomForestEstimator()
    param_distributions = {
        "ntree": randint(100, 1000)
    }
    tune_search = TuneSearchCV(clf, param_distributions, n_trials=n_trials, name=f"T{n_trials}")
    tune_search.fit(x_train, y_train)
    pred = tune_search.predict(x_test)
    return tune_search.score(x_test, y_test)


#start_time = time.time()
#best_params = [tune_search_tuning(n_trials) for n_trials in [1000]]
#print("Best Params", best_params)
#print("--- %s seconds ---" % (time.time() - start_time))

In [10]:
import time
from sklearn import datasets
from sklearn.model_selection import train_test_split
from scipy.stats import randint
import numpy as np
from ray.tune.sklearn import TuneGridSearchCV
from ray.tune.sklearn import TuneSearchCV
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
from sklearn.metrics import r2_score


@ray.remote(num_cpus=4)
def tune_search_tuning(n_trials):
    data = load_iris()
    df = pd.DataFrame(data['data'], columns=data['feature_names'])
    df['target'] = data['target']
    X = df.loc[:, df.columns != 'sepal length (cm)']
    y = df['sepal length (cm)']

    x_train, x_test, y_train, y_test = train_test_split(X, y, test_size=0.33, random_state=42)

    clf = RandomForestEstimator()
    param_distributions = {
        "ntree": randint(100, 1000)
    }
    tune_search = TuneSearchCV(clf, param_distributions, n_trials=n_trials, name=f"T{n_trials}")
    tune_search.fit(x_train, y_train)
    pred = tune_search.predict(x_test)
    return tune_search.score(x_test, y_test)


#start_time = time.time()
#best_params = ray.get([tune_search_tuning.remote(n_trials) for n_trials in [1, 10, 100]])
#print("Best Params", best_params)
#print("--- %s seconds ---" % (time.time() - start_time))

In [11]:
import time
from sklearn import datasets
from sklearn.model_selection import train_test_split
from scipy.stats import randint
import numpy as np
from ray.tune.sklearn import TuneGridSearchCV
from ray.tune.sklearn import TuneSearchCV
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
from sklearn.metrics import r2_score


@ray.remote
def tune_search_tuning(n_trials):
    data = load_iris()
    df = pd.DataFrame(data['data'], columns=data['feature_names'])
    df['target'] = data['target']
    X = df.loc[:, df.columns != 'sepal length (cm)']
    y = df['sepal length (cm)']

    x_train, x_test, y_train, y_test = train_test_split(X, y, test_size=0.33, random_state=42)

    clf = RandomForestEstimator()
    param_distributions = {
        "ntree": randint(100, 1000)
    }
    tune_search = TuneSearchCV(clf, param_distributions, n_trials=n_trials)
    tune_search.fit(x_train, y_train)
    pred = tune_search.predict(x_test)
    return tune_search.score(x_test, y_test)


#start_time = time.time()
#best_params = ray.get([tune_search_tuning.remote(n_trial) for n_trial in [1, 10, 100]])

#print("Best Params", best_params)
#print("--- %s seconds ---" % (time.time() - start_time))

In [12]:
import time
from sklearn import datasets
from sklearn.model_selection import train_test_split
from scipy.stats import randint
import numpy as np
from ray.tune.sklearn import TuneGridSearchCV
from ray.tune.sklearn import TuneSearchCV
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
from sklearn.metrics import r2_score


def tune_search_tuning(n_trials):
    data = load_iris()
    df = pd.DataFrame(data['data'], columns=data['feature_names'])
    df['target'] = data['target']
    X = df.loc[:, df.columns != 'sepal length (cm)']
    y = df['sepal length (cm)']

    x_train, x_test, y_train, y_test = train_test_split(X, y, test_size=0.33, random_state=42)

    clf = RandomForestEstimator()
    param_distributions = {
        "ntree": randint(100, 1000)
    }
    tune_search = TuneSearchCV(clf, param_distributions, n_trials=n_trials)
    tune_search.fit(x_train, y_train)
    pred = tune_search.predict(x_test)
    return tune_search.score(x_test, y_test)


#start_time = time.time()
#best_params = []

#best_params.append(tune_search_tuning(1))
#best_params.append(tune_search_tuning(10))
#best_params.append(tune_search_tuning(100))

#print("Best Params", best_params)
#print("--- %s seconds ---" % (time.time() - start_time))

In [13]:
import time
from sklearn import datasets
from sklearn.model_selection import train_test_split
from scipy.stats import randint
import numpy as np
from ray.tune.sklearn import TuneGridSearchCV
from ray.tune.sklearn import TuneSearchCV
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
from sklearn.metrics import r2_score


def tune_search_tuning(n_trials):
    data = load_iris()
    df = pd.DataFrame(data['data'], columns=data['feature_names'])
    df['target'] = data['target']
    X = df.loc[:, df.columns != 'sepal length (cm)']
    y = df['sepal length (cm)']

    x_train, x_test, y_train, y_test = train_test_split(X, y, test_size=0.33, random_state=42)

    clf = RandomForestEstimator()
    param_distributions = {
        "ntree": randint(100, 1000)
    }
    tune_search = TuneSearchCV(clf, param_distributions, n_trials=n_trials)
    tune_search.fit(x_train, y_train)
    pred = tune_search.predict(x_test)
    return tune_search.score(x_test, y_test)


#start_time = time.time()
#best_params = [tune_search_tuning(1000)]

#print("Best Params", best_params)
#print("--- %s seconds ---" % (time.time() - start_time))

In [14]:
import time
from sklearn import datasets
from sklearn.model_selection import train_test_split
from scipy.stats import randint
import numpy as np
from ray.tune.sklearn import TuneGridSearchCV
from ray.tune.sklearn import TuneSearchCV
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
from sklearn.metrics import r2_score

@ray.remote
def tune_search_tuning(n_trials):
    data = load_iris()
    df = pd.DataFrame(data['data'], columns=data['feature_names'])
    df['target'] = data['target']
    X = df.loc[:, df.columns != 'sepal length (cm)']
    y = df['sepal length (cm)']

    x_train, x_test, y_train, y_test = train_test_split(X, y, test_size=0.33, random_state=42)

    clf = RandomForestEstimator()
    param_distributions = {
        "ntree": randint(100, 1000)
    }
    tune_search = TuneSearchCV(clf, param_distributions, n_trials=n_trials)
    tune_search.fit(x_train, y_train)
    pred = tune_search.predict(x_test)
    return tune_search.score(x_test, y_test)


#start_time = time.time()
#best_params = [ray.get(tune_search_tuning.remote(1000))]

#print("Best Params", best_params)
#print("--- %s seconds ---" % (time.time() - start_time))

## Running SKLearn Random Forest on Ray

In [15]:
from tune_sklearn import TuneSearchCV
from sklearn.ensemble import RandomForestClassifier
from sklearn import datasets
from sklearn.model_selection import train_test_split
from scipy.stats import randint
import numpy as np

def tune_search_tuning():
    digits = datasets.load_digits()
    x = digits.data
    y = digits.target
    x_train, x_test, y_train, y_test = train_test_split(x, y, test_size=.2)

    clf = RandomForestClassifier()
    param_distributions = {
        "n_estimators": randint(20, 80),
        "max_depth": randint(2, 10)
    }

    tune_search = TuneSearchCV(clf, param_distributions, n_trials=1000)

    tune_search.fit(x_train, y_train)

    pred = tune_search.predict(x_test)
    accuracy = np.count_nonzero(np.array(pred) == np.array(y_test)) / len(pred)
    print(accuracy)
    return accuracy

#start_time = time.time()
#remote_clf = tune_search_tuning()
#best_params = remote_clf
#print("Best params", best_params)
#print("--- %s seconds ---" % (time.time() - start_time))