# Model experiments
In this notebook we will try to write a couple of models to use for our experiments. Specifically, we will implement them so that they are `sklearn`-compatible, and we will use two models for comparison. Our end goal is to have the tools to plug a model into an `sklearn` pipeline and reliably measure its performance.

In [42]:
%matplotlib inline
# standard library
import itertools
import sys, os
import re
import glob

from collections import OrderedDict
from urllib.parse import urlparse

# pandas
import pandas as pd
import dask.dataframe as dd
import dask.array as da
import dask

from dask import persist


# numpy, matplotlib, seaborn
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns


# sklearn
import sklearn
from sklearn.tree import DecisionTreeClassifier, DecisionTreeRegressor
from sklearn.model_selection import LeaveOneOut
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, precision_recall_fscore_support
from sklearn.neural_network import MLPClassifier
from sklearn.linear_model import SGDClassifier
from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import Pipeline


# local imports
sys.path.append(os.path.join(os.getcwd(), "../src"))

# this styling is purely my preference
# less chartjunk
# `sns.set` re-applies the default plotting context, so it has to run BEFORE
# `set_context`; otherwise the custom font scale / line width are reset.
sns.set(style='ticks', palette='Set2')
# the rc key is 'lines.linewidth' (plural); unknown keys are silently ignored
sns.set_context('notebook', font_scale=1.5, rc={'lines.linewidth': 2.5})

In [2]:
# Set dask's `temporary_directory` option to a fixed location.
# NOTE(review): this is an absolute path; adjust it when running the
# notebook outside of the original environment.
dask.set_options(temporary_directory='/home/jovyan/work/partd/')

<dask.context.set_options at 0x7fbdb6f79518>

In [3]:
# Read every feature CSV matched by the glob pattern into one dask dataframe
# and preview the first rows.
feat_ddf = dd.read_csv('../data/final/dragnet/dom-full-*.csv')
feat_ddf.head()

Unnamed: 0,depth,sibling_pos,no_classes,id_len,class_len,no_children,text_len,descendant1_no_nodes,descendant1_no_children_avg,descendant1_id_len_avg,...,ancestor5_tag_h3,ancestor5_tag_maxamineignore,ancestor5_tag_a,ancestor5_tag_ifcommentsaccepted,ancestor5_tag_noindex,ancestor5_tag_property,ancestor5_tag_iframe,ancestor5_tag_http:,ancestor5_tag_bodyonload,content_label
0,3,12,0,0,0,0,0,0,0.0,0.0,...,0,0,0,0,0,0,0,0,0,False
1,3,0,0,6,0,2,30,2,1.0,2.0,...,0,0,0,0,0,0,0,0,0,False
2,8,1,0,0,0,0,0,0,0.0,0.0,...,0,0,0,0,0,0,0,0,0,False
3,8,56,0,0,0,1,0,1,1.0,0.0,...,0,0,0,0,0,0,0,0,0,False
4,9,2,1,0,4,0,254,0,0.0,0.0,...,0,0,0,0,0,0,0,0,0,False


## Scaling
In order to use our data with different models, we should normalize it first so that every feature is centered at 0 and has unit variance (most values then fall roughly between -1 and 1). This way we will ensure faster convergence for most gradient-descent-based models.

In [4]:
class StandardScaler(sklearn.preprocessing.StandardScaler):
    """Standard scaler whose statistics are computed through dask.

    Subclasses ``sklearn.preprocessing.StandardScaler`` and overrides
    ``fit``, ``transform`` and ``inverse_transform`` so that they only rely on
    ``mean``/``std`` reductions and element-wise arithmetic of the input.
    Incremental fitting is not supported.

    Note: this class shadows the ``StandardScaler`` name imported from
    ``sklearn.preprocessing`` at the top of the notebook.
    """

    def fit(self, X, y=None):
        """Compute the statistics later used for scaling.

        Parameters
        ----------
        X : collection exposing ``mean(0)``, ``std(0)`` and ``len``
            The data used to compute the per-column mean and standard
            deviation.
        y : ignored

        Returns
        -------
        self
        """
        self._reset()
        # insertion order matters: values are persisted positionally below
        to_persist = OrderedDict()

        if self.with_mean:
            to_persist['mean_'] = X.mean(0)
        if self.with_std:
            # NOTE(review): the default ddof of ``std`` depends on the type
            # of X and may differ from sklearn's own scaler -- confirm.
            to_persist['scale_'] = X.std(0)

        to_persist['n_samples_seen_'] = len(X)

        # run all reductions in a single ``persist`` call
        values = persist(*to_persist.values())

        for k, v in zip(to_persist, values):
            setattr(self, k, v)
        return self

    def partial_fit(self, X, y=None):
        """Not supported; always raises ``NotImplementedError``."""
        raise NotImplementedError()

    def transform(self, X, y=None, copy=None):
        """Center and scale ``X`` with the fitted statistics.

        The input is never modified: a new object is returned. ``y`` and
        ``copy`` are accepted for signature compatibility and ignored.
        """
        # Out-of-place arithmetic: the augmented operators (``-=``, ``/=``)
        # would overwrite the caller's data for numpy/pandas inputs and fail
        # on integer arrays.
        if self.with_mean:
            X = X - self.mean_
        if self.with_std:
            X = X / self.scale_
        return X

    def inverse_transform(self, X, copy=None):
        """Undo ``transform``: scale back, then re-add the mean.

        The input is never modified: a new object is returned. ``copy`` is
        accepted for signature compatibility and ignored.
        """
        if self.with_std:
            X = X * self.scale_
        if self.with_mean:
            X = X + self.mean_
        return X

In [5]:
# Fit the scaler on the feature columns only: 'url', 'path' and the
# 'content_label' column are dropped before the statistics are computed.
scaler = StandardScaler()
scaler.fit(feat_ddf.drop(['url', 'path', 'content_label'], axis=1))

StandardScaler(copy=True, with_mean=True, with_std=True)

In [22]:
# Apply the fitted scaler to the same feature columns that were used for
# fitting, then preview the scaled values.
scaled_ddf = scaler.transform(feat_ddf.drop(['url', 'path', 'content_label'], axis=1))
scaled_ddf.head()

Unnamed: 0,depth,sibling_pos,no_classes,id_len,class_len,no_children,text_len,descendant1_no_nodes,descendant1_no_children_avg,descendant1_id_len_avg,...,ancestor5_tag_dt,ancestor5_tag_h3,ancestor5_tag_maxamineignore,ancestor5_tag_a,ancestor5_tag_ifcommentsaccepted,ancestor5_tag_noindex,ancestor5_tag_property,ancestor5_tag_iframe,ancestor5_tag_http:,ancestor5_tag_bodyonload
0,-1.665306,0.064887,-0.58817,-0.322564,-0.502271,-0.238585,-0.096867,-0.238585,-0.185236,-0.203909,...,-0.006146,-0.002146,-0.018391,-0.006439,-0.004894,-0.001357,-0.001357,-0.001357,-0.00096,-0.008745
1,-1.665306,-0.175504,-0.58817,0.587575,-0.502271,0.239191,-0.087393,0.239191,0.250009,0.289981,...,-0.006146,-0.002146,-0.018391,-0.006439,-0.004894,-0.001357,-0.001357,-0.001357,-0.00096,-0.008745
2,-0.611792,-0.155472,-0.58817,-0.322564,-0.502271,-0.238585,-0.096867,-0.238585,-0.185236,-0.203909,...,-0.006146,-0.002146,-0.018391,-0.006439,-0.004894,-0.001357,-0.001357,-0.001357,-0.00096,-0.008745
3,-0.611792,0.946323,-0.58817,-0.322564,-0.502271,0.000303,-0.096867,0.000303,0.250009,-0.203909,...,-0.006146,-0.002146,-0.018391,-0.006439,-0.004894,-0.001357,-0.001357,-0.001357,-0.00096,-0.008745
4,-0.401089,-0.135439,0.492541,-0.322564,-0.146124,-0.238585,-0.016653,-0.238585,-0.185236,-0.203909,...,-0.006146,-0.002146,-0.018391,-0.006439,-0.004894,-0.001357,-0.001357,-0.001357,-0.00096,-0.008745


## da.learn
We will try to use `da.learn` to actually fit an `sklearn` model that supports partial fitting. If this is successful, we will use it to implement our wrapper/mixin so that we can add it to any `sklearn` model (including the one we plan to implement in `tf`) and use it in our pipeline.

In [7]:
# we will be using a stochastic gradient descent classifier for this test
# NOTE(review): no `random_state` is passed, so results may differ between
# runs -- consider fixing a seed.
model = SGDClassifier()  
model

In [8]:
scaled_ddf.values  # we will be passing this to the model

dask.array<values, shape=(nan, 1324), dtype=float64, chunksize=(nan, 1324)>

In [9]:
# Fit on the array of scaled features; the labels come from the
# 'content_label' column and the possible classes are listed up front.
# presumably da.learn.fit feeds the data to the model block by block -- confirm
da.learn.fit(model, scaled_ddf.values, y=feat_ddf['content_label'].values, classes=[0, 1])  # train the model

SGDClassifier(alpha=0.0001, average=False, class_weight=None, epsilon=0.1,
       eta0=0.0, fit_intercept=True, l1_ratio=0.15,
       learning_rate='optimal', loss='hinge', n_iter=5, n_jobs=1,
       penalty='l2', power_t=0.5, random_state=None, shuffle=True,
       verbose=0, warm_start=False)

In [20]:
# do a couple of predictions with the distributed predict
pred_ddf =  da.learn.predict(model, scaled_ddf.values)
# evaluate the deferred predictions
pred_arr = pred_ddf.compute()

In [26]:
# peek at the first ten predictions
pred_arr[:10]

array([0, 0, 0, 0, 0, 0, 0, 0, 0, 0])

Now that we have proof of it actually working without overflowing our memory, the next thing to do is to try to implement it as a mixin or decorator for models, so that we can use them in a `Pipeline` where they accept `dask` dataframes and defer the computation.

In [34]:
class _BigPartialFitMixin(object):
    """ Wraps a partial_fit enabled estimator for use with Dask arrays

    Meant to be listed before an sklearn estimator class in the bases of a
    subclass. Subclasses configure it through two class attributes:

    * ``_init_kwargs`` -- names of keyword arguments that are required by the
      constructor and stored on the instance instead of being forwarded to
      the wrapped estimator's ``__init__``.
    * ``_fit_kwargs`` -- names of instance attributes that are passed as
      keyword arguments to ``da.learn.fit``.
    """

    _init_kwargs = []
    _fit_kwargs = []

    def __init__(self, **kwargs):
        """Store the required keyword arguments and forward the rest.

        Raises
        ------
        TypeError
            If any name listed in ``_init_kwargs`` is missing from ``kwargs``.
        """
        missing = set(self._init_kwargs) - set(kwargs)

        if missing:
            raise TypeError("{} requires the keyword arguments {}".format(
                type(self), missing)
            )
        for kwarg in self._init_kwargs:
            setattr(self, kwarg, kwargs.pop(kwarg))
        super(_BigPartialFitMixin, self).__init__(**kwargs)

    @classmethod
    def _get_param_names(cls):
        """Return this class' extra parameters followed by the wrapped ones.

        Raises
        ------
        AttributeError
            If no class in the MRO is defined in an ``sklearn`` module.
        """
        # Evil hack to make sure repr, get_params work
        # We could also try rewriting __init__ once the class is created
        # walk bases until you hit an sklearn class.
        sklearn_base = next(
            (base for base in cls.mro()
             if base.__module__.startswith("sklearn")),
            None,
        )
        if sklearn_base is None:
            # Without this guard the lookup would silently fall through to
            # ``object`` and fail with an unrelated-looking error.
            raise AttributeError(
                "{} has no sklearn base class to take parameter names "
                "from".format(cls.__name__)
            )

        # merge the inits
        return list(cls._init_kwargs) + list(sklearn_base._get_param_names())

    def fit(self, X, y=None, get=None):
        """Fit the estimator through ``da.learn.fit`` and return ``self``.

        Parameters
        ----------
        X, y
            Passed through to ``da.learn.fit``.
        get : callable, optional
            Passed to ``da.learn.fit`` as ``get``; defaults to
            ``dask.threaded.get``.
        """
        if get is None:
            get = dask.threaded.get

        fit_kwargs = {k: getattr(self, k) for k in self._fit_kwargs}
        result = da.learn.fit(self, X, y, get=get, **fit_kwargs)

        # Copy the learned attributes over to self
        # It should go without saying that this is *not* threadsafe
        attrs = {k: v for k, v in vars(result).items() if k.endswith('_')}
        for k, v in attrs.items():
            setattr(self, k, v)
        return self

    def predict(self, X, dtype=None):
        """Map the wrapped estimator's ``predict`` over the blocks of ``X``.

        Parameters
        ----------
        X
            Object exposing ``map_blocks`` (and ``shape``/``dtype`` when
            ``dtype`` is not given).
        dtype : optional
            Output dtype; when omitted it is probed with
            ``_get_predict_dtype``.
        """
        predict = super(_BigPartialFitMixin, self).predict
        if dtype is None:
            dtype = self._get_predict_dtype(X)
        return X.map_blocks(predict, dtype=dtype, drop_axis=1)

    def _get_predict_dtype(self, X):
        """Return the dtype of a prediction made on a single all-zero row."""
        xx = np.zeros((1, X.shape[1]), dtype=X.dtype)
        return super(_BigPartialFitMixin, self).predict(xx).dtype
    
class DaskSGDClassifier(_BigPartialFitMixin, SGDClassifier):
    """`SGDClassifier` combined with `_BigPartialFitMixin`.

    `classes` must be given as a constructor keyword argument and is passed
    on as a keyword argument when fitting.
    """
    # keyword arguments the constructor requires and stores on the instance
    _init_kwargs = ['classes']
    # instance attributes forwarded as keyword arguments to the fit call
    _fit_kwargs = ['classes']

In [37]:
# Repeat the experiment through the wrapper class; `classes` is now given
# once at construction time.
model = DaskSGDClassifier(classes=[0, 1])
model.fit(scaled_ddf.values, y=feat_ddf['content_label'].values)  # train the model

DaskSGDClassifier(alpha=0.0001, average=False, class_weight=None,
         classes=[0, 1], epsilon=0.1, eta0=0.0, fit_intercept=True,
         l1_ratio=0.15, learning_rate='optimal', loss='hinge', n_iter=5,
         n_jobs=1, penalty='l2', power_t=0.5, random_state=None,
         shuffle=True, verbose=0, warm_start=False)

In [41]:
# build the block-wise prediction, then compute it
model.predict(scaled_ddf.values).compute()

array([0, 0, 0, ..., 0, 0, 0])

## Conclusion
In the meantime the package `dask-ml` has been released, which pretty much implements these desired features in a far better tested manner. We will be exploring that package in another notebook, where we will also try to implement our `sklearn`-compatible (and even `dask-ml`-compatible) `tensorflow` estimator to use as our model.