In [1]:
import numpy as np
import pandas as pd

from sklearn.base import BaseEstimator
from sklearn.base import ClassifierMixin

from sklearn.model_selection import BaseCrossValidator

from sklearn.utils.validation import check_X_y, check_is_fitted
from sklearn.utils.validation import check_array
from sklearn.utils.multiclass import check_classification_targets
from sklearn.metrics.pairwise import pairwise_distances

In [2]:
from sklearn.utils.multiclass import unique_labels
from scipy import stats

In [3]:
import pytest

from numpy.testing import assert_array_equal

from sklearn.utils.estimator_checks import check_estimator
from sklearn.model_selection import train_test_split
from sklearn.utils import shuffle
from sklearn.datasets import make_classification
from sklearn.neighbors import KNeighborsClassifier

from sklearn_questions import KNearestNeighbors
from sklearn_questions import MonthlySplit

In [4]:
class KNearestNeighbors(BaseEstimator, ClassifierMixin):
    """K-nearest-neighbours classifier with majority voting.

    Distances are computed with ``pairwise_distances`` using its default
    metric. Each test sample is assigned the most frequent label among its
    ``n_neighbors`` closest training samples; ties are resolved in favour of
    the smallest label in ``classes_``.

    Parameters
    ----------
    n_neighbors : int, default=1
        Number of nearest training samples that vote on each prediction.
    """

    def __init__(self, n_neighbors=1):  # noqa: D107
        self.n_neighbors = n_neighbors

    def fit(self, X, y):
        """Store the training data.

        Parameters
        ----------
        X : ndarray, shape (n_samples, n_features)
            Data to train the model.
        y : ndarray, shape (n_samples,)
            Labels associated with the training data.

        Returns
        -------
        self : instance of KNearestNeighbors
            The current instance of the classifier.

        Raises
        ------
        ValueError
            If ``n_neighbors`` is not a positive integer, or if the inputs
            are rejected by the scikit-learn validators.
        """
        if (isinstance(self.n_neighbors, bool)
                or not isinstance(self.n_neighbors, (int, np.integer))
                or self.n_neighbors < 1):
            raise ValueError(
                "n_neighbors must be a positive integer, got "
                f"{self.n_neighbors!r}."
            )
        X, y = check_X_y(X, y)
        # Reject regression-style targets before computing the label set.
        check_classification_targets(y)
        self.X_, self.y_ = X, y
        self.classes_ = unique_labels(y)
        self.n_features_in_ = X.shape[1]
        return self

    def predict(self, X):
        """Predict the class label of each sample in X.

        Parameters
        ----------
        X : ndarray, shape (n_test_samples, n_features)
            Data to predict on.

        Returns
        -------
        y : ndarray, shape (n_test_samples,)
            Predicted class labels for each test data sample.

        Raises
        ------
        ValueError
            If X has a different number of features than the training data
            or if ``n_neighbors`` exceeds the number of training samples.
        """
        # Fitted-state check first, so an unfitted estimator reports that
        # rather than an input-validation problem.
        check_is_fitted(self)
        X = check_array(X)
        if X.shape[1] != self.n_features_in_:
            raise ValueError(
                f"X has {X.shape[1]} features, but this estimator was "
                f"fitted with {self.n_features_in_} features."
            )
        n_train = self.X_.shape[0]
        if self.n_neighbors > n_train:
            raise ValueError(
                f"Expected n_neighbors <= n_samples_fit, but n_neighbors = "
                f"{self.n_neighbors}, n_samples_fit = {n_train}."
            )

        distances = pairwise_distances(X, self.X_)
        # kth is a zero-based position: partitioning around n_neighbors - 1
        # puts the n_neighbors smallest distances in the leading columns and
        # stays in bounds when n_neighbors == n_train.
        nearest = np.argpartition(
            distances, self.n_neighbors - 1, axis=1
        )[:, :self.n_neighbors]

        # Vote on integer class codes so that non-numeric labels work too.
        # classes_ is sorted, hence searchsorted maps labels to positions.
        neighbor_codes = np.searchsorted(self.classes_, self.y_[nearest])
        votes = np.apply_along_axis(
            np.bincount, 1, neighbor_codes, minlength=len(self.classes_)
        )
        # argmax returns the first maximum, i.e. the smallest label on ties.
        return self.classes_[np.argmax(votes, axis=1)]

    def score(self, X, y):
        """Calculate the accuracy of the prediction.

        Parameters
        ----------
        X : ndarray, shape (n_samples, n_features)
            Data to score on.
        y : ndarray, shape (n_samples,)
            Target values.

        Returns
        -------
        score : float
            Accuracy of the model computed for the (X, y) pairs.

        Raises
        ------
        ValueError
            If X and y do not contain the same number of samples.
        """
        y_pred = self.predict(X)
        # Compare positionally: converting to a flat array avoids
        # label-based lookups when y is a pandas object with its own index.
        y_true = np.asarray(y).reshape(-1)
        if y_true.shape[0] != y_pred.shape[0]:
            raise ValueError(
                f"X and y have inconsistent numbers of samples: "
                f"{y_pred.shape[0]} != {y_true.shape[0]}."
            )
        return float(np.mean(y_pred == y_true))

In [5]:
# Generate a seeded synthetic classification dataset (200 samples,
# 20 features) and hold part of it out for testing.
X, y = make_classification(n_samples=200, n_features=20,
                           random_state=42)
X_train, X_test, y_train, y_test = \
    train_test_split(X, y, random_state=42)

# Fit the custom classifier with 5 neighbours; the last expression is the
# cell output (test accuracy), to be compared with the scikit-learn
# reference computed in the next cell.
onn = KNearestNeighbors(5)
y_pred_me = onn.fit(X_train, y_train).predict(X_test)
onn.score(X_test, y_test)

0.78

In [6]:
# Reference run: scikit-learn's KNeighborsClassifier on the same split and
# with the same number of neighbours as the custom classifier above.
knn = KNeighborsClassifier(n_neighbors=5)
knn.fit(X_train, y_train)
knn.predict(X_test)
knn.score(X_test, y_test)

0.78

In [7]:
# Run scikit-learn's estimator compliance checks against the custom
# classifier; the call raises if a check fails.
check_estimator(KNearestNeighbors(n_neighbors=5))



In [8]:
from pandas.core.dtypes.common import is_datetime64_any_dtype

In [16]:
class MonthlySplit(BaseCrossValidator):
    """CrossValidator based on monthly split.

    Split data based on the given `time_col` (or default to index). Each split
    corresponds to one month of data for the training and the next month of
    data for the test. "Next" means the next month that is present in the
    data: months are taken from the observed timestamps, sorted, and paired
    consecutively.

    Parameters
    ----------
    time_col : str, defaults to 'index'
        Column of the input DataFrame that will be used to split the data. This
        column should be of type datetime. If split is called with a DataFrame
        for which this column is not a datetime, it will raise a ValueError.
        To use the index as column just set `time_col` to `'index'`.
    """

    def __init__(self, time_col='index'):  # noqa: D107
        self.time_col = time_col

    def _get_periods(self, X):
        """Return the monthly period of every row of X, in row order.

        Raises
        ------
        ValueError
            If the selected index/column does not have a datetime dtype.
        """
        if self.time_col == 'index':
            # Read the index directly instead of going through reset_index,
            # so a named index or an existing 'index' column cannot interfere.
            times = pd.Series(X.index)
        else:
            frame = X.to_frame() if isinstance(X, pd.Series) else X
            times = frame[self.time_col]
        if not pd.api.types.is_datetime64_any_dtype(times):
            raise ValueError(
                f"'{self.time_col}' must be of datetime type to split by "
                f"month, got dtype {times.dtype}."
            )
        return times.dt.to_period('M')

    def get_n_splits(self, X, y=None, groups=None):
        """Return the number of splitting iterations in the cross-validator.

        Parameters
        ----------
        X : array-like of shape (n_samples, n_features)
            Training data, where `n_samples` is the number of samples
            and `n_features` is the number of features.
        y : array-like of shape (n_samples,)
            Always ignored, exists for compatibility.
        groups : array-like of shape (n_samples,)
            Always ignored, exists for compatibility.

        Returns
        -------
        n_splits : int
            The number of splits, i.e. the number of distinct months minus
            one (never negative).

        Raises
        ------
        ValueError
            If the time column is not of datetime type.
        """
        n_months = self._get_periods(X).nunique()
        return max(n_months - 1, 0)

    def split(self, X, y=None, groups=None):
        """Generate indices to split data into training and test set.

        Parameters
        ----------
        X : array-like of shape (n_samples, n_features)
            Training data, where `n_samples` is the number of samples
            and `n_features` is the number of features.
        y : array-like of shape (n_samples,)
            Always ignored, exists for compatibility.
        groups : array-like of shape (n_samples,)
            Always ignored, exists for compatibility.

        Yields
        ------
        idx_train : ndarray
            The training set indices for that split.
        idx_test : ndarray
            The testing set indices for that split.

        Raises
        ------
        ValueError
            If the time column is not of datetime type.
        """
        periods = self._get_periods(X)
        # Missing timestamps belong to no month; they are excluded here just
        # as nunique() excludes them in get_n_splits.
        months = sorted(periods.dropna().unique())
        for train_month, test_month in zip(months, months[1:]):
            # Positional (iloc-style) indices, independent of X's own index.
            idx_train = np.flatnonzero((periods == train_month).to_numpy())
            idx_test = np.flatnonzero((periods == test_month).to_numpy())
            yield idx_train, idx_test

In [17]:
# Notebook-level fixture for experimenting with MonthlySplit: daily data
# from 2020-01-01 through `end_date`, indexed by date.
end_date = '2021-01-31'
shuffle_data = True
date = pd.date_range(start='2020-01-01', end=end_date, freq='D')
n_samples = len(date)
X = pd.DataFrame(range(n_samples), index=date, columns=['val'])
# Alternating 0/1 target sharing X's index.
y = pd.DataFrame(
    np.array([i % 2 for i in range(n_samples)]),
    index=date
)

# Shuffle the rows so the splitter cannot rely on chronological row order.
if shuffle_data:
    X, y = shuffle(X, y, random_state=0)

# 1-d (Series) view of the same data, plus the splitter and its expected repr.
X_1d = X['val']
cv = MonthlySplit()
cv_repr = "MonthlySplit(time_col='index')"

In [18]:
def test_time_split(end_date, expected_splits, shuffle_data):
    """Exercise MonthlySplit on daily data indexed by date.

    Parameters
    ----------
    end_date : str
        Last day of the generated range (the range starts on 2020-01-01).
    expected_splits : int
        Number of splits get_n_splits is expected to report.
    shuffle_data : bool
        Whether to shuffle the rows before splitting.
    """
    days = pd.date_range(start='2020-01-01', end=end_date, freq='D')
    n_days = len(days)
    X = pd.DataFrame(range(n_days), index=days, columns=['val'])
    y = pd.DataFrame(np.array([i % 2 for i in range(n_days)]), index=days)

    if shuffle_data:
        X, y = shuffle(X, y, random_state=0)

    X_1d = X['val']
    splitter = MonthlySplit()

    # The repr must render without error and show the default parameter.
    assert "MonthlySplit(time_col='index')" == repr(splitter)

    # The reported number of splits must match the expectation.
    assert splitter.get_n_splits(X, y) == expected_splits

    # Splitting a 1-d Series must give the same result as the DataFrame.
    splits_2d = list(splitter.split(X, y))
    splits_1d = list(splitter.split(X_1d, y))
    np.testing.assert_equal(splits_2d, splits_1d)

    # Indices must be integers and every training sample must precede
    # every test sample in time.
    for train, test in splitter.split(X, y):
        assert np.asarray(train).dtype.kind == "i"
        assert np.asarray(test).dtype.kind == "i"

        X_train, X_test = X.iloc[train], X.iloc[test]
        y_train, y_test = y.iloc[train], y.iloc[test]
        assert X_train.index.max() < X_test.index.min()
        assert y_train.index.max() < y_test.index.min()
        assert X.index.equals(y.index)

    # A non-datetime time column must be rejected with a ValueError whose
    # message mentions 'datetime'.
    non_datetime_splitter = MonthlySplit(time_col='val')
    with pytest.raises(ValueError, match='datetime'):
        next(non_datetime_splitter.split(X, y))

In [19]:
# 2020-01-01 through 2021-01-31 covers 13 calendar months, hence 12
# consecutive-month splits are expected.
test_time_split(end_date, 12, shuffle_data)

In [12]:
def test_time_split_on_column(end_date, shuffle_data):
    """Exercise MonthlySplit when the timestamps live in a 'date' column.

    Parameters
    ----------
    end_date : str
        Last day of the generated range (the range starts on 2020-01-01).
    shuffle_data : bool
        Whether to shuffle the rows before splitting.
    """
    days = pd.date_range(start='2020-01-01 00:00', end=end_date, freq='D')
    n_days = len(days)
    X = pd.DataFrame({'val': range(n_days), 'date': days})
    y = pd.DataFrame(np.array([i % 2 for i in range(n_days)]))

    if shuffle_data:
        X, y = shuffle(X, y, random_state=0)

    splitter = MonthlySplit(time_col='date')

    # Each fold must hold exactly one calendar month on each side, with the
    # training month before the test month, and folds must advance in time.
    seen_splits = 0
    previous_test_end = None
    for train, test in splitter.split(X, y):
        train_dates = X.iloc[train]['date']
        test_dates = X.iloc[test]['date']

        assert train_dates.max() < test_dates.min()
        assert train_dates.dt.month.nunique() == 1
        assert test_dates.dt.month.nunique() == 1
        assert train_dates.dt.year.nunique() == 1
        assert test_dates.dt.year.nunique() == 1
        if previous_test_end is not None:
            assert test_dates.min() > previous_test_end
        previous_test_end = test_dates.max()
        seen_splits += 1

    # Splitting must not leave a helper column behind in the caller's frame.
    assert 'idx' not in X.columns

    assert seen_splits == splitter.get_n_splits(X, y)

In [13]:
# Run the column-based test with the notebook-level settings defined above.
test_time_split_on_column(end_date, shuffle_data)

In [27]:
# Scratch exploration of a groupby-based alternative to the period-based
# split. NOTE(review): this adds 'month' and 'year' columns to the global X
# in place, so cells run after this one see a modified frame.
X['month'], X['year'] = X.index.month, X.index.year
# Per-(year, month) non-null counts, one row per month present in X.
x2 = X.groupby([X['year'], X['month']], as_index = False).count()
# Index labels (timestamps, not positions) of the rows whose year/month
# match the first row of x2 -- presumably the earliest month; TODO confirm.
idx_train_2 = X.loc[(X['year']  == x2.iloc[0]['year']) & (X['month']  == x2.iloc[0]['month'])].index