<h1>Table of Contents<span class="tocSkip"></span></h1>
<div class="toc"><ul class="toc-item"><li><span><a href="#CTR-prediction" data-toc-modified-id="CTR-prediction-1"><span class="toc-item-num">1&nbsp;&nbsp;</span>CTR-prediction</a></span><ul class="toc-item"><li><span><a href="#Problem-Formulation" data-toc-modified-id="Problem-Formulation-1.1"><span class="toc-item-num">1.1&nbsp;&nbsp;</span>Problem Formulation</a></span></li><li><span><a href="#Dataset" data-toc-modified-id="Dataset-1.2"><span class="toc-item-num">1.2&nbsp;&nbsp;</span>Dataset</a></span><ul class="toc-item"><li><span><a href="#Dataset-construction:" data-toc-modified-id="Dataset-construction:-1.2.1"><span class="toc-item-num">1.2.1&nbsp;&nbsp;</span>Dataset construction:</a></span></li><li><span><a href="#Format:" data-toc-modified-id="Format:-1.2.2"><span class="toc-item-num">1.2.2&nbsp;&nbsp;</span>Format:</a></span></li></ul></li><li><span><a href="#Metrics" data-toc-modified-id="Metrics-1.3"><span class="toc-item-num">1.3&nbsp;&nbsp;</span>Metrics</a></span></li></ul></li><li><span><a href="#Dataset-preprocessing" data-toc-modified-id="Dataset-preprocessing-2"><span class="toc-item-num">2&nbsp;&nbsp;</span>Dataset preprocessing</a></span><ul class="toc-item"><li><span><a href="#ML-Pipelines-(Transformers,-Estimators)" data-toc-modified-id="ML-Pipelines-(Transformers,-Estimators)-2.1"><span class="toc-item-num">2.1&nbsp;&nbsp;</span><a href="https://spark.apache.org/docs/latest/ml-pipeline.html#pipeline-components" target="_blank">ML Pipelines (Transformers, Estimators)</a></a></span><ul class="toc-item"><li><span><a href="#Prepare-stages-of-pipeline" data-toc-modified-id="Prepare-stages-of-pipeline-2.1.1"><span class="toc-item-num">2.1.1&nbsp;&nbsp;</span>Prepare stages of pipeline</a></span></li><li><span><a href="#Fit-and-save-pipeline" data-toc-modified-id="Fit-and-save-pipeline-2.1.2"><span class="toc-item-num">2.1.2&nbsp;&nbsp;</span>Fit and save pipeline</a></span></li><li><span><a href="#Load-fitted-pipeline" 
data-toc-modified-id="Load-fitted-pipeline-2.1.3"><span class="toc-item-num">2.1.3&nbsp;&nbsp;</span>Load fitted pipeline</a></span></li><li><span><a href="#Transform-dataset-using-pipeline" data-toc-modified-id="Transform-dataset-using-pipeline-2.1.4"><span class="toc-item-num">2.1.4&nbsp;&nbsp;</span>Transform dataset using pipeline</a></span></li><li><span><a href="#Make-dataset-split" data-toc-modified-id="Make-dataset-split-2.1.5"><span class="toc-item-num">2.1.5&nbsp;&nbsp;</span>Make dataset split</a></span></li></ul></li></ul></li><li><span><a href="#Classification" data-toc-modified-id="Classification-3"><span class="toc-item-num">3&nbsp;&nbsp;</span><a href="https://spark.apache.org/docs/latest/ml-classification-regression.html" target="_blank">Classification</a></a></span><ul class="toc-item"><li><span><a href="#Logistic-Regression" data-toc-modified-id="Logistic-Regression-3.1"><span class="toc-item-num">3.1&nbsp;&nbsp;</span><a href="https://spark.apache.org/docs/latest/ml-classification-regression.html#logistic-regression" target="_blank">Logistic Regression</a></a></span><ul class="toc-item"><li><span><a href="#Define-and-Train-model" data-toc-modified-id="Define-and-Train-model-3.1.1"><span class="toc-item-num">3.1.1&nbsp;&nbsp;</span>Define and Train model</a></span></li></ul></li></ul></li><li><span><a href="#Evaluation" data-toc-modified-id="Evaluation-4"><span class="toc-item-num">4&nbsp;&nbsp;</span><a href="https://spark.apache.org/docs/2.1.0/mllib-evaluation-metrics.html" target="_blank">Evaluation</a></a></span><ul class="toc-item"><li><span><a href="#Binary-classification-metrics" data-toc-modified-id="Binary-classification-metrics-4.1"><span class="toc-item-num">4.1&nbsp;&nbsp;</span><a href="https://spark.apache.org/docs/2.1.0/mllib-evaluation-metrics.html#binary-classification" target="_blank">Binary classification metrics</a></a></span></li><li><span><a href="#Make-submission" data-toc-modified-id="Make-submission-4.2"><span 
class="toc-item-num">4.2&nbsp;&nbsp;</span>Make submission</a></span></li></ul></li></ul></div>

# CTR-prediction

## Problem Formulation

$\newcommand{\vecw}{{\bf w}}$
$\newcommand{\vecx}{{\bf x}}$

* Dataset: $X^N = \{ z_i \}^N_{i=1}$, where $z_i = (\vecx_i, y_i) \sim P(z), y_i \in \{0,1\}$
* Prediction: $$ \hat{y}_i = f_{\vecw}(\vecx_i) =  \mathbb{P} \left\{ y = 1 \mid \vecx_i \right\} $$
* Loss function (Binary Cross-Entropy with $L_2$ regularization): $$ \min\limits_{\vecw} \quad \frac{\lambda}{2}\| \vecw \|^2_2 - \frac{1}{N} \sum\limits_{i=1}^{N} \left[ y_i \log \hat{y}_i + (1-y_i) \log(1-\hat{y}_i) \right] $$
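
As a rough illustration of the objective above, here is a minimal NumPy sketch. It assumes that $f_{\vecw}$ is a logistic model, $\hat{y}_i = \sigma(\vecw^\top \vecx_i)$, which the formulation itself does not fix; the function names `sigmoid` and `regularized_bce` are made up for this example.

```python
import numpy as np

def sigmoid(t):
    # Logistic function, maps a real-valued score to (0, 1)
    return 1.0 / (1.0 + np.exp(-t))

def regularized_bce(w, X, y, lam):
    # Assumption: predictions come from a logistic model sigmoid(X @ w)
    y_hat = sigmoid(X @ w)
    # Average binary cross-entropy over the N examples
    data_term = -np.mean(y * np.log(y_hat) + (1 - y) * np.log(1 - y_hat))
    # L2 penalty: (lambda / 2) * ||w||^2
    penalty = 0.5 * lam * np.dot(w, w)
    return float(penalty + data_term)

X = np.array([[1.0, 2.0], [3.0, -1.0]])
y = np.array([1.0, 0.0])
print(regularized_bce(np.zeros(2), X, y, lam=0.1))
```

With $\vecw = 0$ every prediction is $0.5$ and the penalty vanishes, so the printed value is $\log 2$.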

## Dataset
$ $
<details>
  <summary>Click here to see the details</summary>

For more details see `/data/criteo/readme.txt`

### Dataset construction:


>There are 13 features taking **integer** values and 26
**categorical** features. The values of the categorical features have been hashed
onto 32 bits for anonymization purposes. 
Some features may have missing values.

> The rows are chronologically ordered by `id` column.

> The test set corresponds to events on the day following the training period. 
The first column (`label`) has been removed.


### Format:

> The columns are comma-separated with the following schema:
`<label>,<integer feature 1>, ... <integer feature 13>,<categorical feature 1>, ... <categorical feature 26>,<id>`

> When a value is missing, the field is "". There is no `label` field in the test set.

</details>
    
## Metrics

The evaluation metrics for this task are
* ROC AUC
* LogLoss
* [Normalized Entropy](https://quinonero.net/Publications/predicting-clicks-facebook.pdf)
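
To make the three metrics concrete, here is a small NumPy sketch on in-memory arrays. It is only an illustration with hypothetical function names, not the Spark code used later; Normalized Entropy is taken here to be the log loss divided by the log loss of always predicting the average label.

```python
import numpy as np

def log_loss(y_true, p):
    y_true = np.asarray(y_true, dtype=float)
    p = np.asarray(p, dtype=float)
    return float(-np.mean(y_true * np.log(p) + (1 - y_true) * np.log(1 - p)))

def normalized_entropy(y_true, p):
    # Baseline: log loss of a model that always predicts the average label
    base_rate = float(np.mean(y_true))
    baseline = -(base_rate * np.log(base_rate) + (1 - base_rate) * np.log(1 - base_rate))
    return log_loss(y_true, p) / baseline

def roc_auc(y_true, p):
    # Fraction of (positive, negative) pairs ranked correctly; ties count as half
    y_true = np.asarray(y_true)
    p = np.asarray(p, dtype=float)
    pos, neg = p[y_true == 1], p[y_true == 0]
    diff = pos[:, None] - neg[None, :]
    return float(np.mean((diff > 0) + 0.5 * (diff == 0)))

y = [1, 0, 0, 0]
scores = [0.9, 0.1, 0.2, 0.3]
print(roc_auc(y, scores), log_loss(y, scores), normalized_entropy(y, scores))
```

A Normalized Entropy below 1 means the model beats the constant base-rate prediction. The pairwise AUC above is quadratic in the number of rows, so it is suitable only for toy inputs.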

In [1]:
%matplotlib inline
%config InlineBackend.figure_format ='retina'

import os
import sys
import shutil
import glob
import pickle
import seaborn as sns
import matplotlib.pyplot as plt
import numpy as np

import pyspark
import pyspark.sql.functions as F
from pyspark.conf import SparkConf
from pyspark.sql import SQLContext
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql.types import FloatType

from pyspark.ml import PipelineModel, Pipeline
from pyspark.ml.feature import  MinMaxScaler, StringIndexer, OneHotEncoderEstimator, VectorAssembler
from pyspark.ml.classification import LogisticRegression

spark = SparkSession \
    .builder \
    .master('local[*]') \
    .appName("spark_sql_examples") \
    .config("spark.executor.memory", "6g") \
    .getOrCreate()

sc = spark.sparkContext
sqlContext = SQLContext(sc)

In [2]:
DATA_PATH = '/workspace/data/criteo'

TRAIN_PATH = os.path.join(DATA_PATH, 'train.csv')
TEST_PATH  = os.path.join(DATA_PATH, 'test.csv')

Let's begin our introduction to Spark [MLlib](https://spark.apache.org/docs/latest/ml-guide.html).

---
# Dataset preprocessing

Before we can train any prediction model on our dataset, we need to convert each row into a real-valued feature vector ($\vecx \in \mathbb{R}^n$).

Spark MLlib provides easy-to-use tools for preprocessing raw features and turning them into a suitable format.

In [3]:
df = sqlContext.read.format("com.databricks.spark.csv") \
    .option("delimiter", ",") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load('file:///' + TRAIN_PATH)

In [4]:
df = df.sample(False, 0.5)

In [5]:
df.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- _c1: integer (nullable = true)
 |-- _c2: integer (nullable = true)
 |-- _c3: integer (nullable = true)
 |-- _c4: integer (nullable = true)
 |-- _c5: integer (nullable = true)
 |-- _c6: integer (nullable = true)
 |-- _c7: integer (nullable = true)
 |-- _c8: integer (nullable = true)
 |-- _c9: integer (nullable = true)
 |-- _c10: integer (nullable = true)
 |-- _c11: integer (nullable = true)
 |-- _c12: integer (nullable = true)
 |-- _c13: integer (nullable = true)
 |-- _c14: string (nullable = true)
 |-- _c15: string (nullable = true)
 |-- _c16: string (nullable = true)
 |-- _c17: string (nullable = true)
 |-- _c18: string (nullable = true)
 |-- _c19: string (nullable = true)
 |-- _c20: string (nullable = true)
 |-- _c21: string (nullable = true)
 |-- _c22: string (nullable = true)
 |-- _c23: string (nullable = true)
 |-- _c24: string (nullable = true)
 |-- _c25: string (nullable = true)
 |-- _c26: string (nullable = true)
 |-- _c27: string (

In [6]:
pandas_df = df.limit(5).toPandas()

pandas_df.loc[:, :'_c13']

Unnamed: 0,_c0,_c1,_c2,_c3,_c4,_c5,_c6,_c7,_c8,_c9,_c10,_c11,_c12,_c13
0,1,0.0,1,20,16,1548.0,93.0,42,32,912,0.0,15,1.0,16
1,0,8.0,0,15,20,115.0,24.0,8,23,24,2.0,2,,20
2,0,,5,30,4,,,0,5,4,,0,,5
3,0,,0,17,3,19811.0,,0,3,54,,0,,3
4,0,0.0,18,15,9,4494.0,,0,9,8,0.0,0,,9


In [7]:
pandas_df.loc[:, '_c14':'_c26']

Unnamed: 0,_c14,_c15,_c16,_c17,_c18,_c19,_c20,_c21,_c22,_c23,_c24,_c25,_c26
0,8cf07265,942f9a8d,a8e40bcf,0365276a,25c83c98,7e0ccccf,3f4ec687,1f89b562,a73ee510,726f00fd,c4adf918,27c604a6,85dbe138
1,5a9ed9b0,c66fca21,78171040,373c404a,25c83c98,,8ff6f5af,0b153874,a73ee510,5ba575e7,b5a9f90e,6766a7f0,949ea585
2,68fd1e64,207b2d81,74e1a23a,9a6888fb,25c83c98,7e0ccccf,d356c7e6,5b392875,7cc72ec2,3b08e48b,727af3e2,fb8fab62,49fe3d4e
3,05db9164,f0cf0024,6f67f7e5,41274cd7,25c83c98,fbad5c96,9b6a4cc9,0b153874,a73ee510,a5aa06c8,8e3de34d,623049e6,b50e2ed0
4,05db9164,2c16a946,0d427480,1b69e68d,25c83c98,7e0ccccf,ade953a9,0b153874,a73ee510,4072f40f,29e4ad33,6be9ae06,80467802


In [8]:
pandas_df.loc[:, '_c27':'_c39']

Unnamed: 0,_c27,_c28,_c29,_c30,_c31,_c32,_c33,_c34,_c35,_c36,_c37,_c38,_c39
0,07d13a8f,a8e962af,c449f783,27c07bd6,1f868fdd,21ddcdc9,a458ea53,7eee76d1,,32c7478e,9af06ad9,9d93af03,cdfe5ab7
1,1adce6ef,8736735c,59974c9c,8efede7f,1304f63b,21ddcdc9,b1252a9d,07b2853e,,32c7478e,94bde4f2,010f6491,09b76f8d
2,b28479f6,231f3923,c6b1e1b2,2005abd1,25935396,21ddcdc9,5840adea,99c09e97,,be7c41b4,335a6a1e,001f3601,8d8eb391
3,b28479f6,e6c5b5cd,c92f3b61,1e88c74f,b04e4670,21ddcdc9,5840adea,60f6221e,,32c7478e,43f13e8b,ea9a246c,731c3655
4,b28479f6,3628a186,acfad74a,07c540c4,e4ca448c,,,f973405d,,3a171ecb,9117a34a,,


For simplicity, we keep only the first two categorical features.

In [9]:
num_columns = ['_c{}'.format(i) for i in range(1, 14)]
cat_columns = ['_c{}'.format(i) for i in range(14, 40)][:2]
len(num_columns), len(cat_columns)

(13, 2)

In [10]:
df = df.fillna(0, subset=num_columns) # TODO: try different default value

In [11]:
df.limit(5).toPandas()

Unnamed: 0,_c0,_c1,_c2,_c3,_c4,_c5,_c6,_c7,_c8,_c9,...,_c31,_c32,_c33,_c34,_c35,_c36,_c37,_c38,_c39,id
0,1,0,1,20,16,1548,93,42,32,912,...,1f868fdd,21ddcdc9,a458ea53,7eee76d1,,32c7478e,9af06ad9,9d93af03,cdfe5ab7,26
1,0,8,0,15,20,115,24,8,23,24,...,1304f63b,21ddcdc9,b1252a9d,07b2853e,,32c7478e,94bde4f2,010f6491,09b76f8d,39
2,0,0,5,30,4,0,0,0,5,4,...,25935396,21ddcdc9,5840adea,99c09e97,,be7c41b4,335a6a1e,001f3601,8d8eb391,108
3,0,0,0,17,3,19811,0,0,3,54,...,b04e4670,21ddcdc9,5840adea,60f6221e,,32c7478e,43f13e8b,ea9a246c,731c3655,135
4,0,0,18,15,9,4494,0,0,9,8,...,e4ca448c,,,f973405d,,3a171ecb,9117a34a,,,175


## [ML Pipelines (Transformers, Estimators)](https://spark.apache.org/docs/latest/ml-pipeline.html#pipeline-components)


MLlib standardizes APIs for machine learning algorithms to make it easier to combine multiple algorithms into a single pipeline, or workflow.

* `Transformer`: A Transformer is an algorithm that can transform one DataFrame into another DataFrame. E.g., an ML model is a Transformer which transforms a DataFrame with features into a DataFrame with predictions.


* `Estimator`: An Estimator is an algorithm that can be fit on a DataFrame to produce a Transformer. E.g., a learning algorithm is an Estimator which trains on a DataFrame and produces a model.


* `Pipeline`: A Pipeline chains multiple Transformers and Estimators together to specify an ML workflow.

---
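Roughly speaking, a `Transformer` is an instance of a class that implements a `transform` method, while an `Estimator` implements a `fit` method that returns a `Transformer`. A `Pipeline` is itself an `Estimator`: calling `fit` on it returns a `PipelineModel`, which is a `Transformer`.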

---
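
The relationship between these roles can be sketched in plain Python. The classes below are toy stand-ins invented for this illustration and operate on lists, not DataFrames; they are not Spark's API.

```python
class FillMissing:
    """Transformer: stateless, only implements transform()."""
    def __init__(self, default):
        self.default = default

    def transform(self, values):
        return [self.default if v is None else v for v in values]


class MinMaxModel:
    """Transformer produced by MinMax.fit(); holds the learned statistics."""
    def __init__(self, lo, hi):
        self.lo, self.hi = lo, hi

    def transform(self, values):
        span = (self.hi - self.lo) or 1.0  # toy choice to avoid division by zero
        return [(v - self.lo) / span for v in values]


class MinMax:
    """Estimator: fit() learns from data and returns a Transformer."""
    def fit(self, values):
        return MinMaxModel(min(values), max(values))


class ToyPipelineModel:
    """Transformer: applies the fitted stages in order."""
    def __init__(self, stages):
        self.stages = stages

    def transform(self, values):
        for stage in self.stages:
            values = stage.transform(values)
        return values


class ToyPipeline:
    """Estimator: fits each stage on the output of the previous ones."""
    def __init__(self, stages):
        self.stages = stages

    def fit(self, values):
        fitted = []
        for stage in self.stages:
            model = stage.fit(values) if hasattr(stage, "fit") else stage
            values = model.transform(values)
            fitted.append(model)
        return ToyPipelineModel(fitted)


model = ToyPipeline([FillMissing(0), MinMax()]).fit([None, 2, 4])
print(model.transform([1, None]))
```

The fitted object exposes only `transform`, which is why a fitted pipeline can be saved once and reused on new data without refitting.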

### Prepare stages of pipeline

We might benefit from using `StringIndexer, OneHotEncoderEstimator, VectorAssembler` (see [doc](https://spark.apache.org/docs/latest/ml-features) for details) 

In [12]:
# Categorical columns to encode, plus names for the intermediate columns
columns = cat_columns[:2]
columns_indexed = [col + '_indexed' for col in columns]
columns_vectorized = [col + '_vec' for col in columns]

# Numeric branch: assemble the integer features into one vector, then scale it to [0, 1]
vector_assembler = VectorAssembler(inputCols=num_columns, outputCol="num_features")
scaler = MinMaxScaler(inputCol="num_features", outputCol="scaled_features")

# Categorical branch: string -> index (unseen values go to an extra bucket) -> one-hot vector
string_indexers = [StringIndexer(inputCol=col, outputCol=col_ind, handleInvalid="keep") for col, col_ind in zip(columns, columns_indexed)]
one_hot_encoder = OneHotEncoderEstimator(inputCols=columns_indexed, outputCols=columns_vectorized)

# Concatenate both branches into the final feature vector
final_assembler = VectorAssembler(inputCols=['scaled_features'] + columns_vectorized, outputCol="features")

pipeline = Pipeline(stages=string_indexers + [one_hot_encoder, vector_assembler, scaler, final_assembler])
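
To give an intuition for the categorical stages, here is a plain-Python sketch of what indexing and one-hot encoding do to a single column. It only approximates the behaviour of `StringIndexer` with `handleInvalid="keep"` followed by a one-hot encoder that drops the last category; the helper names and the alphabetical tie-break are assumptions made for this example.

```python
from collections import Counter

def fit_string_index(values):
    # Most frequent label gets index 0; ties are broken alphabetically (assumption)
    counts = Counter(values)
    labels = sorted(counts, key=lambda s: (-counts[s], s))
    return {label: i for i, label in enumerate(labels)}

def index_value(mapping, value):
    # Unseen labels share one extra index placed after all known labels
    return mapping.get(value, len(mapping))

def one_hot(index, size, drop_last=True):
    # With drop_last, the last category is encoded as the all-zeros vector
    width = size - 1 if drop_last else size
    vec = [0.0] * width
    if index < width:
        vec[index] = 1.0
    return vec

mapping = fit_string_index(["a", "b", "a", "c", "a", "b"])
size = len(mapping) + 1  # known labels plus the "unseen" bucket
for value in ["a", "c", "never_seen"]:
    print(value, one_hot(index_value(mapping, value), size))
```

In this sketch the "unseen" bucket is the dropped last category, so a value that was not present at fit time becomes an all-zeros vector.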

### Fit and save pipeline

In [13]:
PIPELINE_MODEL_PATH = "transforming_pipeline"

# Fit and persist the pipeline only if no saved copy exists yet
if not os.path.exists(PIPELINE_MODEL_PATH):
    pipeline_model = pipeline.fit(df)
    pipeline_model.save(PIPELINE_MODEL_PATH)

### Load fitted pipeline

In [14]:
pipeline_model = PipelineModel.load(PIPELINE_MODEL_PATH)

### Transform dataset using pipeline

In [15]:
transformed_df = pipeline_model.transform(df).select('_c0', 'features', 'id')
transformed_df.limit(5).toPandas()

Unnamed: 0,_c0,features,id
0,1,"(0.0, 0.00013594344752582924, 0.00030518043793...",26
1,0,"(0.0013852813852813853, 9.062896501721951e-05,...",39
2,0,"(0.0, 0.0003172013775602683, 0.000457770656900...",108
3,0,"(0.0, 9.062896501721951e-05, 0.000259403372243...",135
4,0,"(0.0, 0.0009062896501721951, 0.000228885328450...",175


### Make dataset split

Spark provides the [randomSplit](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.randomSplit) method.

It is not the best choice for our task: the rows are chronologically ordered and the test set comes from the day after the training period, so a random split would let the model train on events that happen later than the ones it is validated on.

Instead, we implement our own split function that cuts the data into consecutive parts in chronological order.

In [16]:
from pyspark.sql.window import Window
import pyspark.sql.functions as F

def split_by_col(df, split_col, part_fractions):
    """
    Split df into consecutive parts ordered by split_col.

    df - DataFrame
    split_col - total order column
    part_fractions - fractions of the resulting parts
    """

    # All rows share one partition so that percent_rank is computed globally
    window = Window.orderBy(split_col).partitionBy('const')
    ranks = df \
        .select(split_col) \
        .distinct() \
        .withColumn('const', F.lit(0)) \
        .withColumn('percent_rank', F.percent_rank().over(window)) \
        .select(split_col, 'percent_rank')

    df = df.join(ranks, split_col)

    parts = []
    s = 0
    for part_fraction in part_fractions:
        l = s
        r = s + part_fraction
        # Half-open interval [l, r): a row with percent_rank == 1.0 is not matched when r == 1.0
        part = df \
            .filter((l <= F.col('percent_rank')) & (F.col('percent_rank') < r)) \
            .drop('percent_rank')

        parts.append(part)
        s = r

    return parts

In [17]:
train_df, val_df, test_df = split_by_col(df, 'id', [0.8, 0.1, 0.1])

In [18]:
N = df.count()
N

1834224

In [19]:
train_df.count() / N, val_df.count() / N, test_df.count() / N

(0.7999998909620635, 0.09999978192412705, 0.09999978192412705)
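
The three fractions add up to slightly less than 1: multiplied by `N`, the gap corresponds to a single row. This appears consistent with the half-open `percent_rank` filter in `split_by_col`, which would leave out the row with the largest `id` (its `percent_rank` is exactly 1.0). The following plain-Python sketch reproduces the same logic on a short list; `percent_rank` is computed here as `(rank - 1) / (n - 1)` and the helper name is hypothetical.

```python
def split_sorted_ids(ids, part_fractions):
    # Plain-Python imitation of the percent_rank-based split, for illustration only
    ordered = sorted(set(ids))
    n = len(ordered)
    percent_rank = {v: i / (n - 1) for i, v in enumerate(ordered)}

    parts = []
    s = 0
    for fraction in part_fractions:
        l, r = s, s + fraction
        # Same half-open interval [l, r) as in the Spark filter
        parts.append([v for v in ordered if l <= percent_rank[v] < r])
        s = r
    return parts

parts = split_sorted_ids([10, 20, 30, 40, 50], [0.5, 0.5])
print(parts)
```

Here the value 50 has `percent_rank` 1.0 and ends up in no part. If that last row matters, one option is to make the final interval closed on the right.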

---
# [Classification](https://spark.apache.org/docs/latest/ml-classification-regression.html)

## [Logistic Regression](https://spark.apache.org/docs/latest/ml-classification-regression.html#logistic-regression)

### Define and Train model

In [20]:
%%time

# Wrapper for raw DataFrames: applies the saved preprocessing pipeline and then
# trains / applies logistic regression (a classifier, despite the class name)
class LogisticRegressor:
    def __init__(self):
        self.pipeline_model = PipelineModel.load(PIPELINE_MODEL_PATH)
        self.linear_model = LogisticRegression(featuresCol='features', labelCol='_c0', maxIter=10)
        self.num_columns = ['_c{}'.format(i) for i in range(1, 14)]

    def fit(self, train_df):
        train_df = self.__prepare_df(train_df).select('_c0', 'features', 'id')
        # After fitting, linear_model holds the trained model instead of the estimator
        self.linear_model = self.linear_model.fit(train_df)

    def predict(self, df, keep_true_labels=False):
        df = self.__prepare_df(df)
        df = df.select('_c0', 'features', 'id') if keep_true_labels else df.select('features', 'id')

        predictions = self.linear_model.transform(df)
        # 'probability' is a vector [P(y=0), P(y=1)]; extract P(y=1) as a plain float column
        get_second_element = F.udf(lambda v: float(v[1]), FloatType())

        return predictions \
          .withColumn('proba', get_second_element(F.col('probability')))

    def __prepare_df(self, df):
        # Same missing-value handling as was used when the pipeline was fitted
        df = df.fillna(0, subset=self.num_columns)
        return self.pipeline_model.transform(df)


regressor = LogisticRegressor()
regressor.fit(train_df)

CPU times: user 85.1 ms, sys: 11.7 ms, total: 96.8 ms
Wall time: 49.9 s


In [21]:
regressor.predict(val_df).first()

Row(features=SparseVector(1941, {0: 0.0043, 1: 0.0001, 2: 0.0004, 3: 0.0273, 4: 0.0, 5: 0.0002, 6: 0.0097, 7: 0.0026, 8: 0.0049, 9: 0.2222, 10: 0.0321, 12: 0.0026, 19: 1.0, 1429: 1.0}), id=455266598959, rawPrediction=DenseVector([-0.3922, 0.3922]), probability=DenseVector([0.4032, 0.5968]), prediction=1.0, proba=0.5968092083930969)
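
In the row above, `rawPrediction` holds the linear score with both signs and `probability` is its logistic transform, so `proba` should equal the sigmoid of the second component of `rawPrediction`. A quick check in plain Python, using the rounded numbers printed in this notebook:

```python
import math

def sigmoid(t):
    # Logistic function: maps the linear score (margin) to P(y = 1)
    return 1.0 / (1.0 + math.exp(-t))

# Rounded values copied from the printed Row objects in this notebook
print(sigmoid(0.3922))   # validation row, compare with proba=0.5968...
print(sigmoid(-1.1083))  # test row from the submission section, compare with proba=0.2481...
```

Small differences in the last digits are expected because the printed `rawPrediction` values are rounded to four decimals.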

---
# [Evaluation](https://spark.apache.org/docs/2.1.0/mllib-evaluation-metrics.html)

## [Binary classification metrics](https://spark.apache.org/docs/2.1.0/mllib-evaluation-metrics.html#binary-classification)

* ROC AUC
* LogLoss
* Normalized Entropy

In [27]:
from pyspark.mllib.evaluation import BinaryClassificationMetrics

# Returns RDD[(proba, true_label)]
def get_prediction_labels(regressor, df):
    predictions = regressor.predict(df, keep_true_labels=True).select('id', 'proba', '_c0')
    prediction_labels = predictions.rdd.map(lambda row: (float(row.proba), float(row._c0)))
    return prediction_labels


def rocauc(regressor, df):
    prediction_labels = get_prediction_labels(regressor, df)
    return BinaryClassificationMetrics(prediction_labels).areaUnderROC


# Per-example binary cross-entropy; infinite if p is exactly 0 or 1
def loss_part(p, true_label):
    return - (true_label * np.log(p) + (1 - true_label) * np.log(1 - p))

def logloss(regressor, df):
    prediction_labels = get_prediction_labels(regressor, df)
    return prediction_labels.map(lambda p: loss_part(p[0], p[1])).mean()


# Normalized Entropy: log loss divided by the log loss of always predicting the average label
def ne(regressor, df):
    prediction_labels = get_prediction_labels(regressor, df)
    true_proba = prediction_labels.map(lambda p: p[1]).mean()
    random_log_loss = - (true_proba * np.log(true_proba) + (1 - true_proba) * np.log(1 - true_proba))
    return logloss(regressor, df) / random_log_loss
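
One practical caveat: `loss_part` is infinite when a predicted probability is exactly 0 or 1. A common workaround is to clip probabilities away from the boundaries first. The sketch below shows one way to do that; the function name and the value of `eps` are arbitrary choices for this example, not part of the notebook.

```python
import numpy as np

def safe_loss_part(p, true_label, eps=1e-15):
    # Keep p inside [eps, 1 - eps] so that both logarithms stay finite
    p = min(max(p, eps), 1 - eps)
    return -(true_label * np.log(p) + (1 - true_label) * np.log(1 - p))

print(safe_loss_part(0.0, 1.0))  # large but finite
print(safe_loss_part(0.5, 1.0))  # unaffected by clipping
```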

In [30]:
rocauc(regressor, val_df)

0.7030486326003856

In [31]:
ne(regressor, val_df)

0.9187740459664768

In [32]:
rocauc(regressor, test_df)

0.7029653856425887

In [33]:
ne(regressor, test_df)

0.9188876197542192

## Make submission

Join the [competition](https://www.kaggle.com/c/mlbd-20-ctr-prediction-1) and make a submission

In [34]:
train_df = sqlContext.read.format("com.databricks.spark.csv") \
    .option("delimiter", ",") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load('file:///' + TRAIN_PATH)

test_df = sqlContext.read.format("com.databricks.spark.csv") \
    .option("delimiter", ",") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load('file:///' + TEST_PATH)

In [35]:
train_df.limit(5).toPandas()

Unnamed: 0,_c0,_c1,_c2,_c3,_c4,_c5,_c6,_c7,_c8,_c9,...,_c31,_c32,_c33,_c34,_c35,_c36,_c37,_c38,_c39,id
0,1,0,-1,,,1465,0,17,0,4,...,e5f8f18f,,,f3ddd519,,32c7478e,b34f3128,,,12
1,1,0,1,20.0,16.0,1548,93,42,32,912,...,1f868fdd,21ddcdc9,a458ea53,7eee76d1,,32c7478e,9af06ad9,9d93af03,cdfe5ab7,26
2,0,8,0,15.0,20.0,115,24,8,23,24,...,1304f63b,21ddcdc9,b1252a9d,07b2853e,,32c7478e,94bde4f2,010f6491,09b76f8d,39
3,1,88,319,,4.0,5,4,89,40,88,...,bbf70d82,,,16e2e3b3,,32c7478e,d859b4dd,,,41
4,0,0,53,,10.0,6550,98,34,11,349,...,fa0643ee,21ddcdc9,b1252a9d,0094bc78,,32c7478e,29ece3ed,001f3601,402185f3,85


In [36]:
test_df.limit(5).toPandas()

Unnamed: 0,_c1,_c2,_c3,_c4,_c5,_c6,_c7,_c8,_c9,_c10,...,_c31,_c32,_c33,_c34,_c35,_c36,_c37,_c38,_c39,id
0,,19,2,4.0,4576,6.0,6,5,15,,...,43de85d3,,,b64021bd,,32c7478e,f1a27f66,,,566935904713
1,,1,1,,5688,,0,2,10,,...,e7e991cb,efa3470f,a458ea53,6ef75f1d,78e2e389,32c7478e,f53ea242,cb079c2d,e9b68fcc,566935904715
2,,445,2,2.0,8579,26.0,1,2,26,,...,e5f8f18f,,,f3ddd519,ad3062eb,32c7478e,b34f3128,,,566935904727
3,0.0,172,7,1.0,2008,143.0,24,28,430,0.0,...,eef7297e,,,8ae05402,ad3062eb,423fab69,8d4a9014,,,566935904737
4,,11,4,4.0,14,,0,4,6,,...,7181ccc8,,,2265e99d,,32c7478e,5dc43b96,,,566935904741


In [37]:
%%time

regressor = LogisticRegressor()
regressor.fit(train_df)
test_predictions = regressor.predict(test_df)
test_predictions.first()

CPU times: user 123 ms, sys: 20.3 ms, total: 144 ms
Wall time: 1min 7s


Row(features=SparseVector(1941, {1: 0.001, 2: 0.0, 3: 0.0073, 4: 0.0017, 5: 0.0001, 6: 0.0006, 7: 0.001, 8: 0.0008, 10: 0.0128, 12: 0.0009, 13: 1.0, 1419: 1.0}), id=566935904713, rawPrediction=DenseVector([1.1083, -1.1083]), probability=DenseVector([0.7518, 0.2482]), prediction=0.0, proba=0.24818068742752075)

In [38]:
submission_path = os.path.join(DATA_PATH, 'submission.csv')
test_predictions.select('id', 'proba').toPandas().to_csv(submission_path, index=False)