# Import Pytest

In [8]:
import numpy as np
import pandas as pd
%matplotlib inline
import matplotlib.pyplot as plt

from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler, OneHotEncoder, LabelEncoder
from sklearn.compose import ColumnTransformer
from sklearn.model_selection import train_test_split

In [9]:
# Which versions are installed?
import sys
print("Python version")
print (sys.version)
print("\nPandas info")
print (pd.__version__)

Python version
3.7.4 (default, Aug  9 2019, 18:34:13) [MSC v.1915 64 bit (AMD64)]

Pandas info
0.25.3


In [10]:
# We use the assert statement to identify bugs in our programs
b = 3

assert b > 5, 'the variable b should have a value of at least 5 , but it is only %s' % b

AssertionError: the variable b should have a value of at least 5 , but it is only 3

In [11]:
# Here, we use the assert statement to validate that our data was successfully loaded

def get_data():
    df_1 = pd.read_csv("https://archive.ics.uci.edu/ml/machine-learning-databases/adult/adult.data", 
                   names=['age', 'workclass', 'fnlwgt', 'education', 'education-num', 
                   'marital-status', 'occupation', 'relationship', 'race', 'sex', 
                   'capital-gain', 'capital-loss', 'hours-per-week', 'native-country', 
                   '>=50K'], skipinitialspace=True)
    df_2 = pd.read_csv("https://archive.ics.uci.edu/ml/machine-learning-databases/adult/adult.test", 
                   names=['age', 'workclass', 'fnlwgt', 'education', 'education-num', 
                   'marital-status', 'occupation', 'relationship', 'race', 'sex', 
                   'capital-gain', 'capital-loss', 'hours-per-week', 'native-country', 
                   '>=50K'], skipinitialspace=True, skiprows=1)
    df_combined = df_1.append(df_2, ignore_index=True, sort=True)
    
    # Check this out -  
    assert df_combined.shape == (48842, 15), 'Expected data frame shape:%s; actual:%s' % ((48842, 16), df_combined.shape)

    X = df_combined.drop(columns=['>=50K'])
    y = df_combined['>=50K']
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.25, random_state=0)
    return X_train, X_test, y_train, y_test

X_train, X_test, y_train, y_test = get_data()

In [12]:
class CharacterStripper(BaseEstimator, TransformerMixin):
    def __init__(self, character_to_strip='.'):
        self.character_to_strip = character_to_strip

    def fit(self, X, y=None):
        return self

    def transform(self, X, y=None):
        X_transformed = X
        X_transformed = X_transformed.str.strip(self.character_to_strip)
        return X_transformed       

In [13]:
def test_on_normal_input():
    data = pd.Series(['a.', 'b', 'c...', 'd'])
    char_stripper = CharacterStripper('.')
    expected = pd.Series(['a', 'b', 'c', 'd'])
    actual = char_stripper.fit_transform(data)
    pd.testing.assert_series_equal(expected, actual)
    
test_on_normal_input()

In [43]:
class TestCountry2ContinentConverter():
    def test_on_normal_data_1(self):
        data = pd.DataFrame({'number':[1, 2, 3, 4, 5],
                            'country':['U.S.', 'China', 'India', 'U.S.', 'Mexico']})
        conversion_rules = {'U.S.':'America',
                           'Mexico':'America',
                           'China':'Asia',
                           'India':'Asia'}
        country_2_continent = Country2ContinentConverter(country_col='country', 
                                                     continent_col='continent', 
                                                     conversion_rules=conversion_rules)
        expected = pd.DataFrame({'number':[1, 2, 3, 4, 5],
                            'country':['U.S.', 'China', 'India', 'U.S.', 'Mexico'],
                             'continent':['America', 'Asia', 'Asia', 'America', 'America']})
        actual = country_2_continent.fit_transform(data)
        pd.testing.assert_frame_equal(expected, actual)

    def test_on_normal_data_2(self):
        pass
    
    def test_on_missing_value(self):
        data = pd.DataFrame({'number':[1, 2, 3, 4, 5],
                            'country':['U.S.', np.nan, 'India', 'U.S.', 'Mexico']})
        conversion_rules = {'U.S.':'America',
                           'Mexico':'America',
                           'China':'Asia',
                           'India':'Asia'}
        country_2_continent = Country2ContinentConverter(country_col='country', 
                                                     continent_col='continent', 
                                                     conversion_rules=conversion_rules)
        expected = pd.DataFrame({'number':[1, 2, 3, 4, 5],
                            'country':['U.S.', '', 'India', 'U.S.', 'Mexico'],
                             'continent':['America', '', 'Asia', 'America', 'America']})
        actual = country_2_continent.fit_transform(data)
        pd.testing.assert_frame_equal(expected, actual)
    
    def test_on_unknown_value(self):
        data = pd.DataFrame({'number':[1, 2, 3, 4, 5],
                            'country':['U.S.', 'Argentina', 'India', 'U.S.', 'Mexico']})
        conversion_rules = {'U.S.':'America',
                           'Mexico':'America',
                           'China':'Asia',
                           'India':'Asia'}
        country_2_continent = Country2ContinentConverter(country_col='country', 
                                                     continent_col='continent', 
                                                     conversion_rules=conversion_rules)
        expected = pd.DataFrame({'number':[1, 2, 3, 4, 5],
                            'country':['U.S.', 'Argentina', 'India', 'U.S.', 'Mexico'],
                             'continent':['America', '', 'Asia', 'America', 'America']})
        actual = country_2_continent.fit_transform(data)
        pd.testing.assert_frame_equal(expected, actual)
    
    def test_on_missing_column(self):
        pass

In [37]:
test_country_2_continent = TestCountry2ContinentConverter()
test_country_2_continent.test_on_normal_data_1()

In [41]:
test_country_2_continent.test_on_missing_value()

In [42]:
test_country_2_continent.test_on_unknown_value()

In [26]:
!pytest

platform win32 -- Python 3.7.4, pytest-5.3.5, py-1.8.1, pluggy-0.13.1
rootdir: C:\Users\tyifat\Workspace\python-for-dna\Season 2\lesson 12
collected 1 item

tests\test_custom_transformers.py F                                      [100%]

_________________ TestCharacterStripper.test_on_normal_input __________________

self = <test_custom_transformers.TestCharacterStripper object at 0x000001F10FD8C308>

    def test_on_normal_input(self):
        data = pd.Series(['a.', 'b', 'c...', 'd'])
        char_stripper = CharacterStripper(character_to_strip='.')
        expected = pd.Series(['a', 'b', 'c', 'dc'])
        actual = char_stripper.fit_transform(data)
>       pd.testing.assert_series_equal(expected, actual)

tests\test_custom_transformers.py:16: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
pandas/_libs/testing.pyx:65: in pandas._libs.testing.assert_almost_equal
    ???
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _



In [3]:
# A class that changes category names according to specified rules
class CategoryReviser(BaseEstimator, TransformerMixin):
    ''' This transformer revises categories according to a dictionary with rules '''
    def __init__(self, cat_change_rules={}):
        self.cat_change_rules = cat_change_rules

    def fit(self, X, y=None):
        return self

    def transform(self, X, y=None):
        X_transformed = X.copy()
        for feature in self.cat_change_rules:
            for cat in self.cat_change_rules[feature]:
                X_transformed.loc[X[feature]==cat, feature] = self.cat_change_rules[feature][cat]
        return X_transformed

In [24]:
# A class that changes category names according to specified rules
class Country2ContinentConverter(BaseEstimator, TransformerMixin):
    ''' This transformer revises categories according to a dictionary with rules '''
    def __init__(self, country_col='native-country', continent_col='continent', conversion_rules={}):
        self.country_col = country_col
        self.continent_col = continent_col
        self.conversion_rules = conversion_rules
        
    def fit(self, X, y=None):
        return self

    def transform(self, X, y=None):
        X_transformed = X.copy()
        X_transformed[self.continent_col] = ""
        for country in self.conversion_rules:
            X_transformed.loc[X[self.country_col]==country, self.continent_col] = self.conversion_rules[country]
        return X_transformed

In [5]:
class TargetEncoder(BaseEstimator, TransformerMixin):
    def __init__(self, features, reg_weight=20):
        self.features = features
        self.reg_weight = reg_weight
        self.mapping = {}

    def fit(self, X, y=None):
        X_positive_rate = y.mean()
        X_y = X.copy()
        X_y['target'] = y
        for feat in self.features:
            cat_positive_rates = {}
            for feat in self.features:
                self.mapping[feat] = {}
                positive_rates = X_y.groupby(feat)['target'].mean()
                value_counts = X_y[feat].value_counts()
                for cat in X_y[feat].unique():
                    n = value_counts.loc[cat]
                    rate = positive_rates.loc[cat]
                    regularized_rate = (rate * n + X_positive_rate * self.reg_weight) / (n + self.reg_weight)
                    self.mapping[feat][cat] = regularized_rate
        return self
    
    def transform(self, X, y=None):
        X_transformed = X.copy()
        for feat in self.mapping:
            for cat in self.mapping[feat]:
                X_transformed.loc[X[feat]==cat, feat] = self.mapping[feat][cat]
        return X_transformed

In [9]:
def get_data():
    df_1 = pd.read_csv("https://archive.ics.uci.edu/ml/machine-learning-databases/adult/adult.data", 
                   names=['age', 'workclass', 'fnlwgt', 'education', 'education-num', 
                   'marital-status', 'occupation', 'relationship', 'race', 'sex', 
                   'capital-gain', 'capital-loss', 'hours-per-week', 'native-country', 
                   '>=50K'], skipinitialspace=True)
    df_2 = pd.read_csv("https://archive.ics.uci.edu/ml/machine-learning-databases/adult/adult.test", 
                   names=['age', 'workclass', 'fnlwgt', 'education', 'education-num', 
                   'marital-status', 'occupation', 'relationship', 'race', 'sex', 
                   'capital-gain', 'capital-loss', 'hours-per-week', 'native-country', 
                   '>=50K'], skipinitialspace=True, skiprows=1)
    df_combined = df_1.append(df_2, ignore_index=True, sort=True)
    X = df_combined.drop(columns=['>=50K'])
    y = df_combined['>=50K']
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.25, random_state=0)
    return X_train, X_test, y_train, y_test

In [37]:
def preprocess_y(y):
    character_stripper = CharacterStripper(character_to_strip='.')
    y_stripped = character_stripper.fit_transform(y_train)
    label_encoder = LabelEncoder()
    y_train_prepared = label_encoder.fit_transform(y_stripped)
    return y_train_prepared

In [33]:
def build_X_pipeline():
    num_features = ['age', 'fnlwgt', 'education-num', 'capital-gain', 'capital-loss', 'hours-per-week', 'native-country',]
    cat_features = ['workclass', 'marital-status', 'occupation', 'relationship', 'race', 'sex', 'continent']

    cat_change_rules = {'marital-status':{'Married-AF-spouse':'Married', 'Married-civ-spouse':'Married'}, 
                        'workclass':{'Without-pay':'?', 'Never-worked':'?'},
                        'occupation':{'Armed-Forces':'Prof-specialty'}}
    country_2_contonent_rules = {'United-States':'N-America',
                         'Germany':'Europe',
                         'Mexico':'LatAm',
                         'Scotland':'Europe',
                         'Peru':'LatAm',
                         'Honduras':'LatAm',
                         'Ecuador':'LatAm',
                         'Poland':'Europe',
                         'China':'Asia',
                         'Nicaragua':'LatAm',
                         'India':'Asia',
                         'Philippines':'Asia',
                         'Iran':'Asia',
                         'Japan':'Asia',
                         'Vietnam':'Asia',
                         'Dominican-Republic':'LatAm',
                         'Ireland':'Europe',
                         'Laos':'Asia',
                         'Jamaica':'LatAm',
                         'England':'Europe',
                         'Hong':'Asia',
                         'Puerto-Rico':'LatAm',
                         'Cuba':'LatAm',
                         'Haiti':'LatAm',
                         'Guatemala':'LatAm',
                         'El-Salvador':'LatAm',
                         'Columbia':'LatAm',
                         'Italy':'Europe',
                         'Taiwan':'Asia',
                         'Canada':'N-America',
                         'Portugal':'Europe',
                         'Thailand':'Asia',
                         'Cambodia':'Asia',
                         'France':'Europe',
                         'Greece':'Europe',
                         'Trinadad&Tobago':'LatAm',
                         'Yugoslavia':'Europe',
                         'Hungary':'Europe',
                         'Holand-Netherlands':'Europe',
                        }

    # Create and parametatrize data transformers
    scaler = StandardScaler()
    one_hot_encoder = OneHotEncoder(handle_unknown='ignore')
    category_reviser = CategoryReviser(cat_change_rules=cat_change_rules)
    country_2_continent = Country2ContinentConverter(country_col='native-country', 
                                                     continent_col='continent', 
                                                     conversion_rules=country_2_contonent_rules)
    target_encoder = TargetEncoder(['native-country'])

    column_transformer = ColumnTransformer([('Scaler', scaler, num_features), 
                                            ('One Hot Encoder', one_hot_encoder, cat_features)])

    # Define the data transformation pipeline
    X_pipeline = Pipeline([('Category Reviser', category_reviser), 
                           ('Country to Continent', country_2_continent), 
                           ('Target Encoder', target_encoder), 
                           ('ColumnTransformer', column_transformer)])
    return X_pipeline

In [38]:
X_train, X_test, y_train, y_test = get_data()
y_train_prepared = preprocess_y(y_train)
y_test_prepared = preprocess_y(y_test)
X_pipeline = X_pipeline
X_train_prepared = X_pipeline.fit_transform(X_train, y_train_prepared)
X_test_prepared = X_pipeline.transform(X_test)

X_train_prepared.shape, y_train_prepared.shape, X_test_prepared.shape, y_test_prepared.shape

((36631, 52), (36631,), (12211, 52), (36631,))