In [4]:
"""Assignment - making a sklearn estimator and cv splitter.

The goal of this assignment is to implement by yourself:

- a scikit-learn estimator for the KNearestNeighbors for classification
  tasks and check that it is working properly.
- a scikit-learn CV splitter where the splits are based on a Pandas
  DateTimeIndex.

Detailed instructions for question 1:
The nearest neighbor classifier predicts for a point X_i the target y_k of
the training sample X_k which is the closest to X_i. We measure proximity with
the Euclidean distance. The model will be evaluated with the accuracy (average
number of samples corectly classified). You need to implement the `fit`,
`predict` and `score` methods for this class. The code you write should pass
the test we implemented. You can run the tests by calling at the root of the
repo `pytest test_sklearn_questions.py`. Note that to be fully valid, a
scikit-learn estimator needs to check that the input given to `fit` and
`predict` are correct using the `check_*` functions imported in the file.
You can find more information on how they should be used in the following doc:
https://scikit-learn.org/stable/developers/develop.html#rolling-your-own-estimator.
Make sure to use them to pass `test_nearest_neighbor_check_estimator`.


Detailed instructions for question 2:
The data to split should contain the index or one column in
datatime format. Then the aim is to split the data between train and test
sets when for each pair of successive months, we learn on the first and
predict of the following. For example if you have data distributed from
november 2020 to march 2021, you have have 4 splits. The first split
will allow to learn on november data and predict on december data, the
second split to learn december and predict on january etc.

We also ask you to respect the pep8 convention: https://pep8.org. This will be
enforced with `flake8`. You can check that there is no flake8 errors by
calling `flake8` at the root of the repo.

Finally, you need to write docstrings for the methods you code and for the
class. The docstring will be checked using `pydocstyle` that you can also
call at the root of the repo.

Hints
-----
- You can use the function:

from sklearn.metrics.pairwise import pairwise_distances

to compute distances between 2 sets of samples.
"""
import numpy as np
import pandas as pd

from sklearn.base import BaseEstimator
from sklearn.base import ClassifierMixin

from sklearn.model_selection import BaseCrossValidator

from sklearn.utils.validation import check_X_y, check_is_fitted
from sklearn.utils.validation import check_array
from sklearn.utils.multiclass import check_classification_targets
from sklearn.metrics.pairwise import pairwise_distances





In [53]:


class KNearestNeighbors(BaseEstimator, ClassifierMixin):
    def __init__(self, n_neighbors=1):
        self.n_neighbors = n_neighbors

    def fit(self, X, y):
        X, y = check_X_y(X, y)
        check_classification_targets(y)
        self.X_ = X
        self.classes_, self.y_ = np.unique(y, return_inverse=True)
        self.n_features_in_ = X.shape[1]  # Ajoutez cette ligne
        return self

    def predict(self, X):
        check_is_fitted(self, ['X_', 'y_', 'n_features_in_'])
        X = check_array(X)
        dist = np.sqrt(((X[:, np.newaxis] - self.X_) ** 2).sum(axis=2))
        nearest = np.argpartition(dist, self.n_neighbors, axis=1)[:, :self.n_neighbors]
        votes = self.y_[nearest]
        y_pred = np.array([np.argmax(np.bincount(votes[i])) for i in range(votes.shape[0])])
        return self.classes_[y_pred]

    def score(self, X, y):
        y_pred = self.predict(X)
        return np.mean(y_pred == y)

from sklearn.utils.estimator_checks import check_estimator

for k in [1, 3, 5, 7]:
    check_estimator(KNearestNeighbors(n_neighbors=k))

In [58]:

def test_one_nearest_neighbor_match_sklearn(k):
    X, y = make_classification(n_samples=200, n_features=20,
                               random_state=42)
    X_train, X_test, y_train, y_test = \
        train_test_split(X, y, random_state=42)
    knn = KNeighborsClassifier(n_neighbors=k)
    y_pred_sk = knn.fit(X_train, y_train).predict(X_test)

    onn = KNearestNeighbors(k)
    y_pred_me = onn.fit(X_train, y_train).predict(X_test)
    assert_array_equal(y_pred_me, y_pred_sk)

    assert onn.score(X_test, y_test) == knn.score(X_test, y_test)

for k in [1, 3, 5, 7]:
    test_one_nearest_neighbor_match_sklearn(k)

In [91]:
import pytest
@pytest.mark.parametrize("end_date, expected_splits",
                         [('2021-01-31', 12), ('2020-12-31', 11)])
@pytest.mark.parametrize("shuffle_data", [True, False])
def test_time_split(end_date, expected_splits, shuffle_data):

    date = pd.date_range(start='2020-01-01', end=end_date, freq='D')
    n_samples = len(date)
    X = pd.DataFrame(range(n_samples), index=date, columns=['val'])
    y = pd.DataFrame(
        np.array([i % 2 for i in range(n_samples)]),
        index=date
    )

    if shuffle_data:
        X, y = shuffle(X, y, random_state=0)

    X_1d = X['val']

    cv = MonthlySplit()
    cv_repr = "MonthlySplit(time_col='index')"

    # Test if the repr works without any errors
    assert cv_repr == repr(cv)

    # Test if get_n_splits works correctly
    assert cv.get_n_splits(X, y) == expected_splits

    # Test if the cross-validator works as expected even if
    # the data is 1d
    np.testing.assert_equal(
        list(cv.split(X, y)), list(cv.split(X_1d, y))
    )

    # Test that train, test indices returned are integers and
    # data is correctly ordered
    for train, test in cv.split(X, y):
        assert np.asarray(train).dtype.kind == "i"
        assert np.asarray(test).dtype.kind == "i"

        X_train, X_test = X.iloc[train], X.iloc[test]
        y_train, y_test = y.iloc[train], y.iloc[test]
        assert X_train.index.max() < X_test.index.min()
        assert y_train.index.max() < y_test.index.min()
        assert X.index.equals(y.index)

    with pytest.raises(ValueError, match='datetime'):
        cv = MonthlySplit(time_col='val')
        next(cv.split(X, y))


In [139]:
class MonthlySplit(BaseCrossValidator):
    def __init__(self, time_col='index'):
        self.time_col = time_col

    def get_n_splits(self, X, y=None, groups=None):
        if self.time_col == 'index':
            return len(np.unique(X.index.month))
        else:
            return len(np.unique(X[self.time_col]))

    def split(self, X, y=None, groups=None):
        if self.time_col == 'index':
            unique_dates = np.unique(X.index.month)
        else:
            unique_dates = np.unique(X[self.time_col])
        for date in unique_dates:
            if self.time_col == 'index':
                test_index = X.index.month == date
            else:
                test_index = X[self.time_col] == date
            yield (~test_index, test_index)

In [140]:
test_time_split('2021-01-31', 12, True)
test_time_split('2020-12-31', 11, False)


AssertionError: 

In [None]:

@pytest.mark.parametrize("end_date", ['2021-01-31', '2020-12-31'])
@pytest.mark.parametrize("shuffle_data", [True, False])
def test_time_split_on_column(end_date, shuffle_data):

    date = pd.date_range(
        start='2020-01-01 00:00', end=end_date, freq='D'
    )
    n_samples = len(date)
    X = pd.DataFrame({'val': range(n_samples), 'date': date})
    y = pd.DataFrame(
        np.array([i % 2 for i in range(n_samples)])
    )

    if shuffle_data:
        X, y = shuffle(X, y, random_state=0)

    cv = MonthlySplit(time_col='date')

    # Test that train, test indices returned are integers and
    # data is correctly ordered
    n_splits = 0
    last_time = None
    for train, test in cv.split(X, y):

        X_train, X_test = X.iloc[train], X.iloc[test]
        assert X_train['date'].max() < X_test['date'].min()
        assert X_train['date'].dt.month.nunique() == 1
        assert X_test['date'].dt.month.nunique() == 1
        assert X_train['date'].dt.year.nunique() == 1
        assert X_test['date'].dt.year.nunique() == 1
        if last_time is not None:
            assert X_test['date'].min() > last_time
        last_time = X_test['date'].max()
        n_splits += 1

    assert 'idx' not in X.columns

    assert n_splits == cv.get_n_splits(X, y)

In [3]:
#Test retiré

@pytest.mark.parametrize("end_date", ['2021-01-31', '2020-12-31'])
@pytest.mark.parametrize("shuffle_data", [True, False])
def test_time_split_on_column(end_date, shuffle_data):

    date = pd.date_range(
        start='2020-01-01 00:00', end=end_date, freq='D'
    )
    n_samples = len(date)
    X = pd.DataFrame({'val': range(n_samples), 'date': date})
    y = pd.DataFrame(
        np.array([i % 2 for i in range(n_samples)])
    )

    if shuffle_data:
        X, y = shuffle(X, y, random_state=0)

    cv = MonthlySplit(time_col='date')

    # Test that train, test indices returned are integers and
    # data is correctly ordered
    n_splits = 0
    last_time = None
    for train, test in cv.split(X, y):

        X_train, X_test = X.iloc[train], X.iloc[test]
        assert X_train['date'].max() < X_test['date'].min()
        assert X_train['date'].dt.month.nunique() == 1
        assert X_test['date'].dt.month.nunique() == 1
        assert X_train['date'].dt.year.nunique() == 1
        assert X_test['date'].dt.year.nunique() == 1
        if last_time is not None:
            assert X_test['date'].min() > last_time
        last_time = X_test['date'].max()
        n_splits += 1

    assert 'idx' not in X.columns

    assert n_splits == cv.get_n_splits(X, y)


NameError: name 'pytest' is not defined