In [2]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import VectorAssembler

from pyspark.sql import SparkSession

import pandas as pd

In [3]:
# Create (or reuse) a local Spark session for this notebook.
# "local[1]" runs Spark in-process with a single worker thread.
spark = (
    SparkSession.builder
    .master("local[1]")
    .appName("SparkSes")
    .getOrCreate()
)

22/04/06 15:37:33 WARN Utils: Your hostname, SweetCard resolves to a loopback address: 127.0.1.1; using 192.168.0.249 instead (on interface wlp2s0)
22/04/06 15:37:33 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/04/06 15:37:34 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
22/04/06 15:37:35 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


Reading data

In [5]:
# Load the raw interaction data. The first CSV row supplies the column names
# (header=True) and Spark samples the file to infer column types
# (inferSchema=True) instead of reading everything as strings.
df_in = spark.read.csv(
    path='dataframe.csv',
    header=True,
    inferSchema=True,
)

# Print the first rows as a quick sanity check of the parsed schema.
df_in.show()

+--------+-----------------------+----------------------+---------------------+---------------------+
| user_id|month_interaction_count|week_interaction_count|day_interaction_count|cancelled_within_week|
+--------+-----------------------+----------------------+---------------------+---------------------+
|66860ae6|                     41|                     9|                    0|                    1|
|249803f8|                     25|                     9|                    2|                    0|
|32ed74cc|                     21|                     2|                    1|                    1|
|7ed76e6a|                     22|                     5|                    2|                    0|
|46c81f43|                     32|                     8|                    2|                    0|
|cf0f185e|                     26|                     4|                    0|                    1|
|568275b3|                     29|                     5|                    1|   

Vectorizing data and preparing Train and Test dataframes

In [8]:
# Pack the three interaction counters into a single vector column named
# "features", which is the column the classifier below reads as its input.
assembler = VectorAssembler(
    inputCols=[
        "month_interaction_count",
        "week_interaction_count",
        "day_interaction_count",
    ],
    outputCol="features",
)
df = assembler.transform(df_in)

# NOTE: both frames are projections of the *same* rows; no hold-out split is
# made here. The "test" frame simply drops the label column.
df_test = df.select("user_id", "features")
df_train = df.select("user_id", "cancelled_within_week", "features")

Defining the model

In [9]:
# Binary logistic-regression estimator (not yet fitted).
#   maxIter   - upper bound on optimizer iterations
#   regParam  - regularization strength
#   threshold - class 1 is predicted only when its probability exceeds 0.6
lrgen = LogisticRegression(
    featuresCol="features",
    labelCol="cancelled_within_week",
    maxIter=10,
    regParam=0.1,
    threshold=0.6,
)

Training the model

In [10]:
# Fit the estimator on the labelled frame; the returned fitted model is what
# the prediction cell uses to score rows.
linearModelgen = lrgen.fit(dataset=df_train)

22/04/06 15:40:35 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
22/04/06 15:40:35 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.ForeignLinkerBLAS


Making prediction on the test set

In [16]:
# Score every row of the test frame; the fitted model appends the
# rawPrediction, probability and prediction columns to its input.
predictions_res = linearModelgen.transform(df_test)

# Keep only the user identifier and the model outputs for reporting/export.
predictions_out = predictions_res.select(
    'user_id',
    'rawPrediction',
    'probability',
    'prediction',
)

# Preview the scored rows.
predictions_out.show()

+--------+--------------------+--------------------+----------+
| user_id|       rawPrediction|         probability|prediction|
+--------+--------------------+--------------------+----------+
|66860ae6|[-0.6913927322203...|[0.33372332467871...|       1.0|
|249803f8|[1.99534219815040...|[0.88030717060971...|       0.0|
|32ed74cc|[-1.5519845889443...|[0.17479981666924...|       1.0|
|7ed76e6a|[0.55912506936656...|[0.63625007405661...|       0.0|
|46c81f43|[1.28102716568289...|[0.78262457201960...|       0.0|
|cf0f185e|[-1.9709630540962...|[0.12228548294156...|       1.0|
|568275b3|[-0.7384022990095...|[0.32335361572998...|       1.0|
|86a060ec|[-0.1348939766407...|[0.46632755009981...|       0.0|
|c0c07290|[-0.0229177965620...|[0.49427080161733...|       0.0|
|709dc1da|[1.30132315214311...|[0.78605758316099...|       0.0|
+--------+--------------------+--------------------+----------+



Writing the predictions to the file `predictions.csv`

In [17]:
# Export the predictions to a ';'-separated CSV file.
# toPandas() collects the whole Spark DataFrame onto the driver, which is fine
# for a result set this small.
# index=False: without it pandas writes its auto-generated 0..N-1 row index as
# an extra, nameless first column, which is not part of the prediction data.
predictions_out.toPandas().to_csv('predictions.csv', sep=';', index=False)

### Note
The case above is a very simplified approach to building an ML model.

In a more realistic case we should split the data into Train and Test parts, do some feature engineering and hyperparameter tuning, and pick the best features and hyperparameters using cross-validation on the Train set. After that we should train the model on the whole Train set and check its performance by evaluating it on the Test set, to make sure that it has not overfitted the Train data. As a final step we may combine the Train and Test sets, retrain the model on the combined dataset, and use it on new, unseen data.