In [1]:
import utils

load everything up in spark, nothing interesting here

In [4]:
payments = spark.read.parquet(utils.pay_path)

In [5]:
registrations = spark.read.parquet(utils.reg_path)

In [6]:
demo = spark.read.parquet(utils.demo_path)

register dataframes as views so we can do sql on them

In [11]:
payments.createOrReplaceTempView('payments')
registrations.createOrReplaceTempView('registrations')
demo.createOrReplaceTempView('demo')

In [12]:
demo.printSchema()

root
 |-- CONTACT_WID: integer (nullable = true)
 |-- SEX_MF_CD_I: string (nullable = true)
 |-- Affluence: double (nullable = true)
 |-- age_bands: string (nullable = true)
 |-- _Region: string (nullable = true)
 |-- mosaic_description: string (nullable = true)
 |-- financial_stress: string (nullable = true)
 |-- household_income: string (nullable = true)
 |-- Soft_Match_Key: integer (nullable = true)



In [13]:
payments.printSchema()

root
 |-- CONTACT_WID: integer (nullable = true)
 |-- PAYMENT_DATE: timestamp (nullable = true)
 |-- PAY_AMOUNT: double (nullable = true)
 |-- PRODUCT_CODE: string (nullable = true)
 |-- EVENT_CODE: string (nullable = true)
 |-- CATEGORY: string (nullable = true)
 |-- PRODUCT: string (nullable = true)
 |-- SUB_PRODUCT: string (nullable = true)
 |-- RESTRICTED_CAT: string (nullable = true)



In [14]:
registrations.printSchema()

root
 |-- CONTACT_WID: integer (nullable = true)
 |-- EVENT_CODE: string (nullable = true)
 |-- REGISTRATION_DATE: timestamp (nullable = true)
 |-- PRODUCT: string (nullable = true)
 |-- SUB_PRODUCT: string (nullable = true)



split dataset into halves. We are going to try to predict total payments in 2016 based on behaviour in 2015

In [28]:
payments2016 = payments.filter("PAYMENT_DATE >= '2016-01-01'")
payments2016.createOrReplaceTempView('payments2016')
payments2015 = payments.filter("PAYMENT_DATE < '2016-01-01'")
payments2015.createOrReplaceTempView('payments2015')

In [29]:
customers = payments2015.select('CONTACT_WID').distinct()
customers.createOrReplaceTempView('customers')

extract features - all things that might be predictive of future payments. So: total past payments broken by category, payment counts, average payment; also demographics

In [266]:
totals2015 = spark.sql("""
    select
        CONTACT_WID,
        avg(pay_amount) as avg_payment,
        sum(pay_amount) as total_payment,
        count(1) as payment_count,
        sum(case when category = 'Standing Orders' then pay_amount else 0 end) as standing_orders_tot,
        sum(case when category = 'Standing Orders' then 1 else 0 end) as standing_orders_count,
        sum(case when category = 'Sponsorship/Donations' then pay_amount else 0 end) as sponsorship_donations_tot,
        sum(case when category = 'Sponsorship/Donations' then 1 else 0 end) as sponsorship_donations_count,
        sum(case when category = 'Fees' then pay_amount else 0 end) as fees_tot,
        sum(case when category = 'Fees' then 1 else 0 end) as fees_count,
        sum(case when category = 'Direct Debit' then pay_amount else 0 end) as direct_debit_tot,
        sum(case when category = 'Direct Debit' then 1 else 0 end) as direct_debit_count,
        sum(case when category = 'Corporate Partnerships' then pay_amount else 0 end) as corporate_partnerships_tot,
        sum(case when category = 'Corporate Partnerships' then 1 else 0 end) as corporate_partnerships_count,
        sum(case when category = 'Volunteer Fundraising' then pay_amount else 0 end) as volunteer_fundraising_tot,
        sum(case when category = 'Volunteer Fundraising' then 1 else 0 end) as volunteer_fundraising_count,
        sum(case when category = 'Lottery' then pay_amount else 0 end) as lottery_tot,
        sum(case when category = 'Lottery' then 1 else 0 end) as lottery_count,
        sum(case when category = 'Legacy' then pay_amount else 0 end) as legacy_tot,
        sum(case when category = 'Legacy' then 1 else 0 end) as legacy_count,
        sum(case when category = 'Trading' then pay_amount else 0 end) as trading_tot,
        sum(case when category = 'Trading' then 1 else 0 end) as trading_count,
        sum(case when category = 'Admin' then pay_amount else 0 end) as admin_tot,
        sum(case when category = 'Admin' then 1 else 0 end) as admin_count,
        sum(case when category = 'Catering' then pay_amount else 0 end) as catering_tot,
        sum(case when category = 'Catering' then 1 else 0 end) as catering_count,
        sum(case when category = 'Raffle' then pay_amount else 0 end) as raffle_tot,
        sum(case when category = 'Raffle' then 1 else 0 end) as raffle_count,
        sum(case when category = 'Give As You Earn' then pay_amount else 0 end) as give_as_you_earn_tot,
        sum(case when category = 'Give As You Earn' then 1 else 0 end) as give_as_you_earn_count,
        sum(case when category = 'In Memory' then pay_amount else 0 end) as in_memory_tot,
        sum(case when category = 'In Memory' then 1 else 0 end) as in_memory_count,
        sum(case when category = 'IG' then pay_amount else 0 end) as ig_tot,
        sum(case when category = 'IG' then 1 else 0 end) as ig_count,
        sum(case when category = 'Favours' then pay_amount else 0 end) as favours_tot,
        sum(case when category = 'Favours' then 1 else 0 end) as favours_count

        
    from payments2015
    group by CONTACT_WID
""")
totals2015.createOrReplaceTempView('totals2015')
totals2016 = spark.sql("""
    select
        CONTACT_WID,
        avg(pay_amount) as avg_payment,
        sum(pay_amount) as total_payment,
        count(1) as payment_count,
        sum(case when category = 'Standing Orders' then pay_amount else 0 end) as standing_orders_tot,
        sum(case when category = 'Standing Orders' then 1 else 0 end) as standing_orders_count,
        sum(case when category = 'Sponsorship/Donations' then pay_amount else 0 end) as sponsorship_donations_tot,
        sum(case when category = 'Sponsorship/Donations' then 1 else 0 end) as sponsorship_donations_count,
        sum(case when category = 'Fees' then pay_amount else 0 end) as fees_tot,
        sum(case when category = 'Fees' then 1 else 0 end) as fees_count,
        sum(case when category = 'Direct Debit' then pay_amount else 0 end) as direct_debit_tot,
        sum(case when category = 'Direct Debit' then 1 else 0 end) as direct_debit_count,
        sum(case when category = 'Corporate Partnerships' then pay_amount else 0 end) as corporate_partnerships_tot,
        sum(case when category = 'Corporate Partnerships' then 1 else 0 end) as corporate_partnerships_count,
        sum(case when category = 'Volunteer Fundraising' then pay_amount else 0 end) as volunteer_fundraising_tot,
        sum(case when category = 'Volunteer Fundraising' then 1 else 0 end) as volunteer_fundraising_count,
        sum(case when category = 'Lottery' then pay_amount else 0 end) as lottery_tot,
        sum(case when category = 'Lottery' then 1 else 0 end) as lottery_count,
        sum(case when category = 'Legacy' then pay_amount else 0 end) as legacy_tot,
        sum(case when category = 'Legacy' then 1 else 0 end) as legacy_count,
        sum(case when category = 'Trading' then pay_amount else 0 end) as trading_tot,
        sum(case when category = 'Trading' then 1 else 0 end) as trading_count,
        sum(case when category = 'Admin' then pay_amount else 0 end) as admin_tot,
        sum(case when category = 'Admin' then 1 else 0 end) as admin_count,
        sum(case when category = 'Catering' then pay_amount else 0 end) as catering_tot,
        sum(case when category = 'Catering' then 1 else 0 end) as catering_count,
        sum(case when category = 'Raffle' then pay_amount else 0 end) as raffle_tot,
        sum(case when category = 'Raffle' then 1 else 0 end) as raffle_count,
        sum(case when category = 'Give As You Earn' then pay_amount else 0 end) as give_as_you_earn_tot,
        sum(case when category = 'Give As You Earn' then 1 else 0 end) as give_as_you_earn_count,
        sum(case when category = 'In Memory' then pay_amount else 0 end) as in_memory_tot,
        sum(case when category = 'In Memory' then 1 else 0 end) as in_memory_count,
        sum(case when category = 'IG' then pay_amount else 0 end) as ig_tot,
        sum(case when category = 'IG' then 1 else 0 end) as ig_count,
        sum(case when category = 'Favours' then pay_amount else 0 end) as favours_tot,
        sum(case when category = 'Favours' then 1 else 0 end) as favours_count

    from payments2016
    group by CONTACT_WID
""")
totals2016.createOrReplaceTempView('totals2016')

# this is the final training set
train_set = spark.sql("""
    select
        past.CONTACT_WID,
        future.total_payment as FUTURE_TOTAL,
        past.avg_payment,
        log(past.total_payment) as log_total_payment,
        past.total_payment,
        past.payment_count,
        past.standing_orders_tot,
        past.standing_orders_count,
        past.sponsorship_donations_tot,
        past.sponsorship_donations_count,
        past.fees_tot,
        past.fees_count,
        past.direct_debit_tot,
        past.direct_debit_count,
        past.corporate_partnerships_tot,
        past.corporate_partnerships_count,
        past.volunteer_fundraising_tot,
        past.volunteer_fundraising_count,
        past.lottery_tot,
        past.lottery_count,
        past.legacy_tot,
        past.legacy_count,
        past.trading_tot,
        past.trading_count,
        past.admin_tot,
        past.admin_count,
        past.catering_tot,
        past.catering_count,
        past.raffle_tot,
        past.raffle_count,
        past.give_as_you_earn_tot,
        past.give_as_you_earn_count,
        past.in_memory_tot,
        past.in_memory_count,
        past.ig_tot,
        past.ig_count,
        past.favours_tot,
        past.favours_count,
        future.standing_orders_tot as standing_orders_future_tot,
        future.sponsorship_donations_tot as sponsorship_donations_future_tot,
        future.fees_tot as fees_future_tot,
        future.direct_debit_tot as direct_debit_future_tot,
        future.corporate_partnerships_tot as corporate_partnerships_future_tot,
        future.volunteer_fundraising_tot as volunteer_fundraising_future_tot,
        future.lottery_tot as lottery_future_tot,
        future.legacy_tot as legacy_future_tot,
        future.trading_tot as trading_future_tot,
        future.admin_tot as admin_future_tot,
        future.catering_tot as catering_future_tot,
        future.raffle_tot as raffle_future_tot,
        future.give_as_you_earn_tot as give_as_you_earn_future_tot,
        future.in_memory_tot as in_memory_future_tot,
        future.ig_tot as ig_future_tot,
        future.favours_tot as favours_future_tot,

        SEX_MF_CD_I,
        age_bands,
        mosaic_description,
        _Region,
        financial_stress
    from totals2015 past
    inner join totals2016 future on
        past.CONTACT_WID = future.CONTACT_WID
    inner join demo on
        demo.CONTACT_WID = past.CONTACT_WID    
""")
train_set.createOrReplaceTempView('train_set')


convert from spark dataframe to pandas dataframe and shuffle the records just in case

In [267]:
train_df_raw = train_set.toPandas()
train_df_raw = train_df_raw.sample(frac=1).reset_index(drop=True)
# train_df_raw['payment_count'] = train_df_raw.payment_count + 0.0
train_df = train_df_raw.copy()

some columns are strings, so they need to be encoded before training the model

In [268]:
from sklearn.preprocessing import LabelEncoder
from collections import defaultdict
string_columns = ['SEX_MF_CD_I', 'age_bands', 'mosaic_description', '_Region', 'financial_stress']

def encode_string_cols(df, columns, encoder_dict):
    for c in columns:
        df[c] = encoder_dict[c].fit_transform(df[c])

encoders = defaultdict(LabelEncoder)
encode_string_cols(train_df, string_columns, encoders)

In [357]:
# this is how many training examples we have
len(train_df)

1286382

let's use the first million for training and the rest for testing.

**We will be predicting total spend in 2016 (FUTURE_TOTAL) based on all kinds of things measured in 2015**

In [315]:
train_n = 1000000

feat_names = [
    'avg_payment',
    'total_payment',
    'log_total_payment',
    'payment_count',
    'SEX_MF_CD_I',
    'age_bands',
    'mosaic_description',
    '_Region',
    'financial_stress',
    "standing_orders_tot",
    "sponsorship_donations_tot",
    "fees_tot",
    "direct_debit_tot",
    "corporate_partnerships_tot",
    "volunteer_fundraising_tot",
    "lottery_tot",
    "legacy_tot",
    "trading_tot",
    "admin_tot",
    "catering_tot",
    "raffle_tot",
    "give_as_you_earn_tot",
    "in_memory_tot",
    "ig_tot",
    "favours_tot",
]

target_name = 'FUTURE_TOTAL'
X = train_df[feat_names]
y = train_df[target_name]

X_train = X[:train_n]
y_train = y[:train_n]

X_test = X[train_n:]
y_test = y[train_n:]


functions for benchmarking

In [358]:
from sklearn.metrics import mean_absolute_error, mean_squared_error

def model_name(model):
    return str(model).split("(")[0]

def benchmark(model):
    predictions = model.fit(X_train, y_train).predict(X_test)
    score1 = mean_squared_error(y_test, predictions)
    score2 = mean_absolute_error(y_test, predictions)
    print 'MSE:', score1, 'MAD:', score2, model_name(model)

def fbenchmark(model):
    preds = model.fit(X, y).predict(X)
    score1 = mean_squared_error(y, predictions)
    score2 = mean_absolute_error(y, predictions)
    print score1, score2, model_name(model)

In [359]:
# COMPARING A FEW MODELS
from xgboost import XGBRegressor
from sklearn.linear_model import LinearRegression, Ridge
from sklearn.dummy import DummyRegressor
from sklearn.ensemble import RandomForestRegressor

benchmark(LinearRegression())
benchmark(XGBRegressor())
benchmark(DummyRegressor())
benchmark(RandomForestRegressor())

MSE: 11792594.3259 MAD: 262.11615679 LinearRegression
MSE: 11916356.2453 MAD: 82.8346297361 XGBRegressor
MSE: 14834967.977 MAD: 106.064352106 DummyRegressor
MSE: 13334594.2389 MAD: 74.351193782 RandomForestRegressor


<img src="http://s2.quickmeme.com/img/e2/e2494b2dae9d4c537141a47eaad0913f6732f4839e20513804ad74f4fa7d28be.jpg">

### good news everyone!
MAD and MSE scores achieved by the best model (XGB) are significantly better than flat prediction (Dummy Regressor)


Let's now train the model on full data and make an on-line predictor that we can test on handcrafted examples

In [None]:
xgb = XGBRegressor().fit(X, y)

In [360]:
import pandas as pd

def predict(model, 
    avg_payment=4,
    total_payment=48,
    payment_count=12,
    standing_orders_tot=0,
    sponsorship_donations_tot=0,
    fees_tot=0,
    direct_debit_tot=0,
    corporate_partnerships_tot=0,
    volunteer_fundraising_tot=0,
    lottery_tot=0,
    legacy_tot=0,
    trading_tot=0,
    admin_tot=0,
    catering_tot=0,
    raffle_tot=0,
    give_as_you_earn_tot=0,
    in_memory_tot=0,
    ig_tot=0,
    favours_tot=0,
    SEX_MF_CD_I='Female',
    age_bands='46-55',
    mosaic_description='Prestige Positions',
    _Region='SOUTH EAST',
    financial_stress='Low  '):
    
    
    feats = [
        avg_payment,
        total_payment,
        np.log(total_payment),
        payment_count,
        SEX_MF_CD_I,
        age_bands,
        mosaic_description,
        _Region,
        financial_stress,
        standing_orders_tot,
        sponsorship_donations_tot,
        fees_tot,
        direct_debit_tot,
        corporate_partnerships_tot,
        volunteer_fundraising_tot,
        lottery_tot,
        legacy_tot,
        trading_tot,
        admin_tot,
        catering_tot,
        raffle_tot,
        give_as_you_earn_tot,
        in_memory_tot,
        ig_tot,
        favours_tot]
    dataf = pd.DataFrame({
        fname: encoders[fname].transform([feat]) if fname in encoders else [feat]
        for fname, feat in zip(feat_names, feats)
    })
    dataf = dataf[feat_names]
    return model.predict(dataf)

def predict_exp(*args, **kwargs):
    return np.exp(predict(*args, **kwargs))

In [361]:
predict(xgb,     
    avg_payment=70,
    total_payment=120,
    payment_count=10,
    fees_tot=0,
    direct_debit_tot=0,
    corporate_partnerships_tot=0,
    volunteer_fundraising_tot=0,
    lottery_tot=0,
    legacy_tot=0,
    trading_tot=0,
    admin_tot=0,
    catering_tot=0,
    raffle_tot=0,
    give_as_you_earn_tot=0,
    in_memory_tot=0,
    ig_tot=0,
    favours_tot=0,
    SEX_MF_CD_I='Male',
    age_bands='18-25',
    mosaic_description='Prestige Positions',
    _Region='SOUTH EAST',
    financial_stress='Very high')[0]

95.013359

In [362]:
predict(xgb,     
    avg_payment=70,
    total_payment=200,
    payment_count=10,
    fees_tot=0,
    direct_debit_tot=0,
    corporate_partnerships_tot=0,
    volunteer_fundraising_tot=0,
    lottery_tot=0,
    legacy_tot=0,
    trading_tot=0,
    admin_tot=0,
    catering_tot=0,
    raffle_tot=0,
    give_as_you_earn_tot=0,
    in_memory_tot=0,
    ig_tot=0,
    favours_tot=0,
    SEX_MF_CD_I='Male',
    age_bands='18-25',
    mosaic_description='Prestige Positions',
    _Region='SOUTH EAST',
    financial_stress='Very high')[0]

144.20738

It works!




Now let's look at feature importances:

In [None]:
rf = RandomForestRegressor().fit(X, y)

In [366]:
sorted(zip(feat_names, rf.feature_importances_), key=lambda (a,b): b, reverse=True)

[('avg_payment', 0.27054952679363731),
 ('payment_count', 0.11109784439428642),
 ('sponsorship_donations_tot', 0.10802542802330488),
 ('log_total_payment', 0.1048016168070571),
 ('_Region', 0.099053940565889897),
 ('legacy_tot', 0.086503913004038063),
 ('mosaic_description', 0.054028815495789806),
 ('admin_tot', 0.048267545236706254),
 ('total_payment', 0.040130037301928684),
 ('age_bands', 0.026213292191812121),
 ('financial_stress', 0.019579193596506096),
 ('SEX_MF_CD_I', 0.015481441372049065),
 ('standing_orders_tot', 0.0079991940341969589),
 ('direct_debit_tot', 0.0039924532688030041),
 ('ig_tot', 0.0034206547034777656),
 ('volunteer_fundraising_tot', 0.00042901843090358325),
 ('fees_tot', 0.00029199990786417021),
 ('raffle_tot', 6.6620369921573377e-05),
 ('trading_tot', 6.0928734863128286e-05),
 ('favours_tot', 2.1661607097923877e-06),
 ('in_memory_tot', 2.1261277350116143e-06),
 ('corporate_partnerships_tot', 1.6143055171113528e-06),
 ('give_as_you_earn_tot', 4.3566778321426624e-

there

Now, let's see if we can predict spend on individual categories the same way

In [368]:
train_n = 1000000


target_name = 'direct_debit_tot'
X = train_df[feat_names]
y = train_df[target_name]

X_train = X[:train_n]
y_train = y[:train_n]

X_test = X[train_n:]
y_test = y[train_n:]


In [369]:
benchmark(LinearRegression())
benchmark(XGBRegressor())
benchmark(DummyRegressor())
benchmark(RandomForestRegressor())

MSE: 2.93272050496e-21 MAD: 4.19898324711e-11 LinearRegression
MSE: 2.37473993394 MAD: 0.102375693874 XGBRegressor
MSE: 4420.77418001 MAD: 43.7260311279 DummyRegressor
MSE: 9.77101453058 MAD: 0.0171098624901 RandomForestRegressor


Apparently we can! Linear regression explodes (I didn't one-hot encode categorical features) but XGBoost saves the day as always

<img src="https://media.giphy.com/media/6O7ybdmHHJfoc/giphy.gif">