# Pipelines Exploration

A pipeline chains together the steps needed to address several recurring problems in the machine learning process:

* Data might not be in $R^d$
* Features need to be engineered
* Transformations need to be applied
* Hyperparameters need to be tuned
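
As a minimal sketch of how a pipeline bundles these steps, the toy example below chains a transformation with a classifier and tunes one hyperparameter through the pipeline. The data, the choice of `StandardScaler` and `LogisticRegression`, and the grid values are assumptions for illustration only; they are not the models used later in this notebook.

```python
import numpy as np
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import GridSearchCV
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler

# Made-up, well-separated toy data: one numeric feature, two classes.
X_toy = np.array([[0.0], [1.0], [2.0], [3.0], [10.0], [11.0], [12.0], [13.0]])
y_toy = np.array([0, 0, 0, 0, 1, 1, 1, 1])

toy_pipeline = Pipeline([
    ("scale", StandardScaler()),    # transformation step
    ("clf", LogisticRegression()),  # final estimator
])

# Hyperparameters are addressed as <step name>__<parameter name>.
search = GridSearchCV(toy_pipeline, param_grid={"clf__C": [0.1, 1.0]}, cv=2)
search.fit(X_toy, y_toy)

print(search.best_params_)
print(search.predict(np.array([[0.5], [12.5]])))
```

The `clf__C` naming is what lets a single grid search reach into any step of the pipeline.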

In [1]:
from dao import DataAccess

from sklearn.pipeline import Pipeline, FeatureUnion
from sklearn.base import BaseEstimator, TransformerMixin

In [9]:
import numpy as np
import pandas as pd

First we call `DataAccess.as_dataframe()` to load the data into a DataFrame.

In [3]:
X = DataAccess.as_dataframe()

In [4]:
X.head(2)

| _id | created_at | labels | predict | text | user |
|---|---|---|---|---|---|
| 556e0ee3d6dfbb462880f0a5 | Tue Jun 02 20:16:08 +0000 2015 | {'alcohol': 0} | 0.52605 | Impatiently waiting to get our hands on the ne... | {'created_at': 'Thu Jun 12 22:14:05 +0000 2014... |
| 556e128ad6dfbb46288111e4 | Tue Jun 02 20:31:44 +0000 2015 | {'alcohol': 1} | 0.516649 | Beer fans need their @ColumbusBrewing Bodhi. I... | {'created_at': 'Mon Oct 06 21:00:38 +0000 2008... |


----

# Helpers and Utilities

    class ExplodingRecordJoiner()
    class ItemGetter()


### Expanding Nested Dictionary Columns

We use `ExplodingRecordJoiner` to fix the problem we see with the `user` column. Notice that each entry is a dictionary:

In [5]:
X.user.head()

_id
556e0ee3d6dfbb462880f0a5    {'created_at': 'Thu Jun 12 22:14:05 +0000 2014...
556e128ad6dfbb46288111e4    {'created_at': 'Mon Oct 06 21:00:38 +0000 2008...
556e1464d6dfbb4628812330    {'created_at': 'Sun Mar 11 08:22:56 +0000 2012...
556e15f1d6dfbb4628813236    {'created_at': 'Thu Jan 14 03:03:33 +0000 2010...
556e1adcd6dfbb50e34a1ed6    {'created_at': 'Sun Oct 24 23:02:03 +0000 2010...
Name: user, dtype: object

In [6]:
class ExplodingRecordJoiner(BaseEstimator, TransformerMixin):
    """
    ExplodingRecordJoiner
    ~~~~~~~~~~~~~~~~~~~~~
    
    ExplodingRecordJoiner is a Transformer for Pipeline Objects
    
    Usage:
        Working with DataFrames is more convenient than parsing
        JSON by hand.
    
        However, the incoming data is nested JSON, so this exploder
        lets us select a `col` holding one-level nested dictionaries
        (taken from JSON), pick out the `subcol` keys, and join
        them to the original DataFrame.
    """
    def __init__(self, **kwargs):
        self.cols = kwargs
        
    def fit(self, X, y=None):
        # Stateless transformer: nothing to learn, but return self
        # so that the scikit-learn fit contract holds.
        return self
    
    def transform(self, X, y=None):
        # Extract the column of dicts and apply from_records,
        # match indices, select the `subcol` keys we want,
        # then join with the existing DataFrame.
        for col, subcol in self.cols.items():
            new_cols = ["{}.{}".format(col, c) for c in subcol]
            # .tolist() hands from_records a plain list of dicts
            sub = pd.DataFrame.from_records(X[col].tolist(), index=X.index)[subcol]
            sub.columns = new_cols
            X = X.join(sub)
        return X
    
    def fit_transform(self, X, y=None):
        return self.transform(X)
    
    def __repr__(self):
        st = [k+"="+ str(v) for k,v in self.cols.items()]
        return "ExplodingRecordJoiner({})".format(", ".join(st))
    
    def get_params(self, deep=True):
        # scikit-learn calls get_params(deep=...), so accept the argument
        return self.cols
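
The core of `transform` can be tried without the `dao` data source. The sketch below repeats the explode-and-join step on a made-up two-row frame; the toy values and the `wanted` list are assumptions for illustration.

```python
import pandas as pd

# Toy frame with a made-up `user` column of one-level dictionaries.
toy = pd.DataFrame(
    {
        "text": ["first tweet", "second tweet"],
        "user": [
            {"followers_count": 407, "verified": False},
            {"followers_count": 1006, "verified": False},
        ],
    },
    index=["a", "b"],
)

wanted = ["followers_count"]

# Build a frame from the dicts, aligned on the original index,
# keep only the wanted keys, and prefix them with the column name.
sub = pd.DataFrame.from_records(toy["user"].tolist(), index=toy.index)[wanted]
sub.columns = ["user.{}".format(c) for c in wanted]

joined = toy.join(sub)
print(joined.columns.tolist())
```

The original `user` column is kept alongside the new `user.followers_count` column, which matches what the full example shows.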

#### Example of an ExplodingRecordJoiner

In [10]:
ExplodingRecordJoiner(
    labels=[
        "alcohol"
    ],
    user=[
        'created_at', 
        'favourites_count', 
        'followers_count', 
        'friends_count', 
        'statuses_count',
        'verified'
    ]
).fit_transform(X).head(4)

| _id | created_at | labels | predict | text | user | user.created_at | user.favourites_count | user.followers_count | user.friends_count | user.statuses_count | user.verified | labels.alcohol |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 556e0ee3d6dfbb462880f0a5 | Tue Jun 02 20:16:08 +0000 2015 | {'alcohol': 0} | 0.52605 | Impatiently waiting to get our hands on the ne... | {'created_at': 'Thu Jun 12 22:14:05 +0000 2014... | Thu Jun 12 22:14:05 +0000 2014 | 394 | 407 | 1997 | 823 | False | 0 |
| 556e128ad6dfbb46288111e4 | Tue Jun 02 20:31:44 +0000 2015 | {'alcohol': 1} | 0.516649 | Beer fans need their @ColumbusBrewing Bodhi. I... | {'created_at': 'Mon Oct 06 21:00:38 +0000 2008... | Mon Oct 06 21:00:38 +0000 2008 | 806 | 1006 | 960 | 10442 | False | 1 |
| 556e1464d6dfbb4628812330 | Tue Jun 02 20:39:37 +0000 2015 | {'alcohol': 1} | 0.502633 | Stone Cold use to be the baddest MF in my book... | {'created_at': 'Sun Mar 11 08:22:56 +0000 2012... | Sun Mar 11 08:22:56 +0000 2012 | 860 | 703 | 684 | 89573 | False | 1 |
| 556e15f1d6dfbb4628813236 | Tue Jun 02 20:46:14 +0000 2015 | {'alcohol': 1} | 0.535758 | Now @iamjohnoliver has to drink a Bud Light Li... | {'created_at': 'Thu Jan 14 03:03:33 +0000 2010... | Thu Jan 14 03:03:33 +0000 2010 | 3473 | 9414 | 1486 | 16435 | True | 1 |


### Selecting Text Column for Text Pipeline

In [11]:
class ItemGetter(BaseEstimator, TransformerMixin):
    """
    ItemGetter
    ~~~~~~~~~~
    
    ItemGetter is a Transformer for Pipeline objects.
    
    Usage:
        Initialize the ItemGetter with a `key` and its 
        transform call will select a column out of the 
        specified DataFrame.
    """
    
    def __init__(self, key):
        self.key = key
        
    def fit(self, X, y=None):
        # Stateless: return self to follow the scikit-learn convention
        return self
    
    def transform(self, X, y=None):
        return X[self.key]
    
    def fit_transform(self, X, y=None):
        return self.transform(X)

#### Example of ItemGetter

In [12]:
ItemGetter(
    key="text"
).fit_transform(X).head(4)

_id
556e0ee3d6dfbb462880f0a5    Impatiently waiting to get our hands on the ne...
556e128ad6dfbb46288111e4    Beer fans need their @ColumbusBrewing Bodhi. I...
556e1464d6dfbb4628812330    Stone Cold use to be the baddest MF in my book...
556e15f1d6dfbb4628813236    Now @iamjohnoliver has to drink a Bud Light Li...
Name: text, dtype: object

----

# Text Features

First on our list is a simple text pipeline that uses `TfidfVectorizer` and `TruncatedSVD` (LSI).
We also import Twokenize from [brendano/tweetmotif](https://github.com/brendano/tweetmotif).

    Brendan O'Connor, Michel Krieger, and David Ahn. TweetMotif: Exploratory Search and Topic Summarization for Twitter. ICWSM-2010.
    
Our basic pipeline is made up of TFIDF with LSI implemented by TruncatedSVD. Another Pipeline will be created using Gensim tools.

In [13]:
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.decomposition import TruncatedSVD

from twokenize import tokenize

text_pipe = []

text_pipe.append(
    ("text", 
     ItemGetter("text")
    )
)

text_pipe.append(
    ("tfidf", 
     TfidfVectorizer(
            analyzer="char",
            ngram_range=(2,8),
            min_df = 10,
            max_df = .98
        )
    )
)

text_pipe.append(
    ("lsi",
    TruncatedSVD(
            n_components=3000
        )
    )
)

# TruncatedSVD is annoyingly expensive, so for now only the first two steps (text, tfidf) are used.
text_pipeline = Pipeline(text_pipe[:2])
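
To see what the TF-IDF and LSI steps produce, here is a small sketch on a made-up corpus. The three documents, the shorter `ngram_range`, and `n_components=2` are assumptions chosen so the example runs instantly; the notebook's own settings are the ones in the cell above.

```python
from sklearn.decomposition import TruncatedSVD
from sklearn.feature_extraction.text import TfidfVectorizer

# Made-up toy corpus standing in for the tweet text column.
docs = ["beer fans", "bud light", "light beer"]

# Character n-grams of length 2 to 3 (the notebook uses 2 to 8).
vectorizer = TfidfVectorizer(analyzer="char", ngram_range=(2, 3))
tfidf = vectorizer.fit_transform(docs)  # sparse matrix, one row per document

# LSI: project the sparse TF-IDF rows onto a few dense components.
lsi = TruncatedSVD(n_components=2, random_state=0)
reduced = lsi.fit_transform(tfidf)

print(tfidf.shape, reduced.shape)
print("be" in vectorizer.vocabulary_)
```

The TF-IDF matrix has one column per distinct character n-gram, while the LSI output has only `n_components` dense columns per document.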

----

# Time Pipeline

### Vectorizers

I'll describe the vectorization process a bit later, as I've designed it so that it can be easily modified
for future implementations.

### Transformers

`Timestamp2DatetimeIndex` takes the `created_at` selection and converts it into a `pandas.DatetimeIndex`, which exposes date and time components as attributes.

Currently I am using the `dayofweek`, `hour`, and `hourofweek` features.
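
As a quick illustration of these three features, the sketch below computes them for a single timestamp using only the standard library. The helper name `time_features` and the format constant are hypothetical, and parsing `%a`/`%b` assumes an English (C) locale.

```python
from datetime import datetime

# Twitter-style timestamp format, as seen in the `created_at` column.
TWITTER_FORMAT = "%a %b %d %H:%M:%S %z %Y"

def time_features(created_at):
    """Return (dayofweek, hour, hourofweek) for one timestamp string."""
    ts = datetime.strptime(created_at, TWITTER_FORMAT)
    dayofweek = ts.weekday()            # Monday is 0, as in pandas
    hour = ts.hour
    hourofweek = dayofweek * 24 + hour  # hours since Monday 00:00, 0..167
    return dayofweek, hour, hourofweek

print(time_features("Tue Jun 02 20:16:08 +0000 2015"))  # (1, 20, 44)
```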

In [33]:
import pandas as pd

class Timestamp2DatetimeIndex(BaseEstimator, TransformerMixin):
    """
    Timestamp2DatetimeIndex
    ~~~~~~~~~~~~~~~~~~~~~~~
    
    This consumes a timestamp series and applies `pandas.DatetimeIndex`
    to return a DatetimeIndex object
    """
    def fit(self, X, y=None):
        # Stateless: return self to follow the scikit-learn convention
        return self
    
    def transform(self, X, y=None):
        return pd.DatetimeIndex(X)
    
    def fit_transform(self, X, y=None):
        return self.transform(X)

#### Example of Timestamp2DatetimeIndex

In [34]:
Timestamp2DatetimeIndex().fit_transform(X.created_at)

DatetimeIndex(['2015-06-02 20:16:08+00:00', '2015-06-02 20:31:44+00:00',
               '2015-06-02 20:39:37+00:00', '2015-06-02 20:46:14+00:00',
               '2015-06-02 21:07:13+00:00', '2015-06-02 21:22:06+00:00',
               '2015-06-02 21:34:17+00:00', '2015-06-02 21:48:26+00:00',
               '2015-06-02 23:33:23+00:00', '2015-06-02 23:37:05+00:00', 
               ...
               '2015-06-29 20:14:54+00:00', '2015-06-29 20:24:24+00:00',
               '2015-06-29 20:37:51+00:00', '2015-06-29 20:52:28+00:00',
               '2015-06-29 21:05:21+00:00', '2015-06-29 21:10:59+00:00',
               '2015-06-29 21:11:55+00:00', '2015-06-29 21:21:59+00:00',
               '2015-06-13 21:50:17+00:00', '2015-06-11 03:49:52+00:00'],
              dtype='datetime64[ns]', length=3165, freq=None, tz='UTC')

### DatetimeIndexAttr

Once something is a DatetimeIndex we need to access the relevant attributes.

In [35]:
from scipy.sparse import csc_matrix

import pandas as pd

class DatetimeIndexAttr(BaseEstimator, TransformerMixin):
    """
    DatetimeIndexAttr
    ~~~~~~~~~~~~~~~~~
    
    Accesses all of the available `pandas.DatetimeIndex` attributes when initialized.
    Also provides a new attribute called "hourofweek".
    
    Usage:
        Initialize it with kind=`attribute` that you want, for example `hour` or `dayofweek`
    """
    
    def __init__(self, kind):
        self.kind = kind
        
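    def fit(self, X, y=None):
        # Stateless: return self to follow the scikit-learn convention
        return self
    
    def transform(self, X, y=None):
        if self.kind == "hourofweek":
            # Hours elapsed since Monday 00:00, in the range 0..167
            col = X.dayofweek * 24 + X.hour
        else:
            col = getattr(X, self.kind)
        return pd.DataFrame(col)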
    
    def fit_transform(self, X, y=None):
        return self.transform(X)

#### Example of DatetimeIndexAttr

In [32]:
Pipeline([
        ("to_index", Timestamp2DatetimeIndex()),
        ("to_hour", DatetimeIndexAttr("hour"))
    ]).fit_transform(X.created_at).head()

|  | 0 |
|---|---|
| 0 | 20 |
| 1 | 20 |
| 2 | 20 |
| 3 | 20 |
| 4 | 21 |


Everything is going to be one-hot encoded. Am I ashamed? A little...

In [37]:
from sklearn.preprocessing import OneHotEncoder

time_pipe = list()

time_pipe.append(
    ("get_created_at", 
     ItemGetter("created_at")
    )
)

time_pipe.append(
    ("to_datetimeindex",
    Timestamp2DatetimeIndex()
    )
)

time_pipe.append(
    ("features",
    FeatureUnion([
        ("dayofweek", 
         Pipeline(
                    [("index", DatetimeIndexAttr("dayofweek")),
                     ("onehot", OneHotEncoder())])),
        ("hour", 
         Pipeline(
                    [("index", DatetimeIndexAttr("hour")),
                     ("onehot", OneHotEncoder())])),
        ("hourofweek", 
         Pipeline(
                    [("index", DatetimeIndexAttr("hourofweek")),
                     ("onehot", OneHotEncoder())]))
        ])
    )
)
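
To make the shape of the `FeatureUnion` output concrete, here is a sketch on a made-up array where column 0 plays the role of `dayofweek` and column 1 the role of `hour`. The `select_column` helper is hypothetical and stands in for `DatetimeIndexAttr`; it is not part of this notebook.

```python
import numpy as np
from sklearn.pipeline import FeatureUnion, Pipeline
from sklearn.preprocessing import FunctionTransformer, OneHotEncoder

# Made-up values: column 0 is dayofweek, column 1 is hour.
toy = np.array([[1, 20], [1, 21], [5, 20]])

def select_column(i):
    # Hypothetical helper: keeps column i as a 2-D array.
    return FunctionTransformer(lambda A: A[:, [i]])

union = FeatureUnion([
    ("dayofweek", Pipeline([("pick", select_column(0)),
                            ("onehot", OneHotEncoder())])),
    ("hour", Pipeline([("pick", select_column(1)),
                       ("onehot", OneHotEncoder())])),
])

# Each branch is one-hot encoded separately, then the blocks are
# stacked side by side into one sparse matrix.
features = union.fit_transform(toy)
print(features.shape)
print(features.toarray())
```

Two distinct days and two distinct hours give four columns in total, one per observed category.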

### Future Work

Notice that right now every feature goes through `OneHotEncoder()`. This will change later. A lot of this information is periodic, so we can probably include features like the difference between phases rather than the time itself.

This will probably work better than collapsing it into larger semantic intervals like `Afternoon` or `Sunday Afternoon`.

Moreover, instead of using prior densities based on our other data, we could also make that a part of the fit process...
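
One common way to represent periodic values is to map them onto a circle with sine and cosine. The sketch below is an assumption about what a phase-based feature could look like, not the approach this notebook has settled on; both function names are hypothetical.

```python
import math

def cyclic_encode(value, period):
    """Map a periodic value onto the unit circle as (sin, cos)."""
    angle = 2.0 * math.pi * (value % period) / period
    return math.sin(angle), math.cos(angle)

def circular_distance(a, b, period):
    """Euclidean distance between two encoded values."""
    sa, ca = cyclic_encode(a, period)
    sb, cb = cyclic_encode(b, period)
    return math.hypot(sa - sb, ca - cb)

# 23:00 and 00:00 are one hour apart, and the encoding reflects that,
# whereas the raw integers 23 and 0 look far apart.
print(circular_distance(23, 0, period=24) < circular_distance(12, 0, period=24))
```

With `period=168` the same encoding would apply to `hourofweek`.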

----

# User Features

In [70]:
class UserEgoVectorizer(BaseEstimator, TransformerMixin):
    
    def __init__(self, log=True, mean=True):
        self.log = log
        self.mean = mean
        
        self.features = [
            'user.favourites_count',
            'user.followers_count', 
            'user.friends_count', 
            'user.statuses_count',
            'user.verified'
        ]

    
    def fit(self, X, y=None):
        # Stateless: return self to follow the scikit-learn convention
        return self
    
    def transform(self, X, y=None):
        U = X[self.features].copy()
        # Computed from the raw counts, before any log scaling
        U["user.normality"] = U["user.friends_count"] \
                    / ((U["user.followers_count"] + U["user.friends_count"]) + 1)
        
        if self.log:
            # all features omitting user.verified
            for feature in self.features[:-1]:
                # Adding one fixes the log(0) problem
                U[feature] = np.log(U[feature]+1)
            
        if self.mean:
            for feature in self.features[:-1]:
                # "_mean" is the mean-centered value; "_std" is the
                # squared deviation from the mean (not a standard deviation)
                U[feature+"_mean"] = U[feature] - np.mean(U[feature])
                U[feature+"_std"] = (U[feature] - np.mean(U[feature]))**2
        return U
    
    def fit_transform(self, X, y=None):
        return self.transform(X)

#### Example of UserEgoVectorizer

In [71]:
exploder = ExplodingRecordJoiner(
    labels=[
        "alcohol"
    ],
    user=[
        'created_at', 
        'favourites_count', 
        'followers_count', 
        'friends_count', 
        'statuses_count',
        'verified'
    ]
)


Pipeline([
    ("exploder", exploder),
    ("user", UserEgoVectorizer())    
]).fit_transform(X).head(3)

| _id | user.favourites_count | user.followers_count | user.friends_count | user.statuses_count | user.verified | user.normality | user.favourites_count_mean | user.favourites_count_std | user.followers_count_mean | user.followers_count_std | user.friends_count_mean | user.friends_count_std | user.statuses_count_mean | user.statuses_count_std |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 556e0ee3d6dfbb462880f0a5 | 5.978886 | 6.011267 | 7.599902 | 6.714171 | False | 0.830353 | -1.490446 | 2.221428 | -0.023291 | 0.000542 | 1.696399 | 2.877768 | -2.237507 | 5.006437 |
| 556e128ad6dfbb46288111e4 | 6.693324 | 6.914731 | 6.867974 | 9.253687 | False | 0.488053 | -0.776008 | 0.602188 | 0.880172 | 0.774703 | 0.964471 | 0.930204 | 0.30201 | 0.09121 |
| 556e1464d6dfbb4628812330 | 6.758095 | 6.556778 | 6.529419 | 11.40282 | False | 0.492795 | -0.711237 | 0.505858 | 0.52222 | 0.272714 | 0.625916 | 0.39177 | 2.451143 | 6.008102 |
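
The first few values of the first row can be checked by hand. The sketch below redoes the arithmetic with the raw counts for that user (394 favourites, 407 followers, 1997 friends), taken from the `ExplodingRecordJoiner` example earlier in this notebook; the variable names are only for illustration.

```python
import math

# Raw counts for the first user, from the ExplodingRecordJoiner example.
favourites, followers, friends = 394, 407, 1997

# user.normality is computed from the raw counts, before the log step.
normality = friends / ((followers + friends) + 1)

# log(x + 1) keeps zero counts finite.
log_favourites = math.log(favourites + 1)
log_followers = math.log(followers + 1)

print(round(normality, 6), round(log_favourites, 6), round(log_followers, 6))
```

These should agree with the `user.normality`, `user.favourites_count`, and `user.followers_count` entries of the first output row.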


In [78]:
class UserAgeMonths(BaseEstimator, TransformerMixin):
    """
    UserAgeMonths
    ~~~~~~~~~~~~~
    
    Calculates difference in months between user creation time and tweet creation
    """
    def __init__(self):
        self.to_index = Timestamp2DatetimeIndex()
    
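    def fit(self, X, y=None):
        # Stateless: return self to follow the scikit-learn convention
        return self
    
    def transform(self, X, y=None):
        tweet_time = pd.to_datetime(X["created_at"])
        user_time = pd.to_datetime(X["user.created_at"])
        # 2.62974e6 seconds is one average month (about 30.44 days)
        return (tweet_time - user_time).dt.total_seconds() // 2.62974e6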
    
    def fit_transform(self, X, y=None):
        return self.transform(X)

#### Example of UserAgeMonths

In [79]:
exploder = ExplodingRecordJoiner(
    labels=[
        "alcohol"
    ],
    user=[
        'created_at', 
        'favourites_count', 
        'followers_count', 
        'friends_count', 
        'statuses_count',
        'verified'
    ]
)


Pipeline([
    ("exploder", exploder),
    ("user_months", UserAgeMonths())    
]).fit_transform(X).head(3)

_id
556e0ee3d6dfbb462880f0a5    11
556e128ad6dfbb46288111e4    79
556e1464d6dfbb4628812330    38
dtype: float64
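
The month length used in `UserAgeMonths` corresponds to about 30.44 days (2.62974e6 seconds, or 2.62974e15 nanoseconds). As a sanity check, the sketch below redoes the calculation for the first row with the standard library only; the helper name `age_in_months` is hypothetical, and parsing `%a`/`%b` assumes an English (C) locale.

```python
from datetime import datetime

TWITTER_FORMAT = "%a %b %d %H:%M:%S %z %Y"
SECONDS_PER_MONTH = 2.62974e6  # one average month, about 30.44 days

def age_in_months(tweet_created_at, user_created_at):
    """Whole months between account creation and tweet creation."""
    tweet_time = datetime.strptime(tweet_created_at, TWITTER_FORMAT)
    user_time = datetime.strptime(user_created_at, TWITTER_FORMAT)
    return (tweet_time - user_time).total_seconds() // SECONDS_PER_MONTH

print(age_in_months("Tue Jun 02 20:16:08 +0000 2015",
                    "Thu Jun 12 22:14:05 +0000 2014"))  # 11.0
```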

# The End

The next notebook will import all of these tools and show how the pipelines work together.