In [2]:
from pyspark.sql import SparkSession


In [3]:
# Start (or reuse) the notebook's Spark session.
spark = (
    SparkSession.builder
    .appName("how to read csv file")
    .getOrCreate()
)

In [4]:
# Show the version of the running Spark session (the bare last expression is rendered as the cell output).
spark.version


'3.2.0'

In [5]:
# findspark only helps Python locate a Spark installation if it runs *before*
# pyspark is first imported. In this notebook pyspark is already imported (and a
# SparkSession started) in the first code cell, so calling findspark.init() here
# cannot influence the running session. Skip it in that case. If your environment
# actually needs findspark, move these lines above `from pyspark.sql import SparkSession`.
import sys

if "pyspark" not in sys.modules:
    import findspark
    findspark.init()

import pyspark

In [6]:
import pandas as pd

In [7]:
import os

# Location of the input CSV. The default is the original author's local path.
# To run on another machine, set the DATAFRAME_CSV environment variable instead
# of editing this cell.
DATA_PATH = os.environ.get(
    "DATAFRAME_CSV",
    r"C:\Users\Andrea\Desktop\Flightright Coding Challenge\2. Prediction Model\dataframe.csv",
)

# header=True takes the column names from the first row. inferSchema is not set,
# so every column is loaded as a string; the numeric columns are cast to integers
# in a later cell.
df = spark.read.csv(DATA_PATH, header=True)

In [8]:
# Peek at the first ten rows as Row objects (DataFrame.head(n) simply delegates to take(n)).
df.take(10)

[Row(user_id='66860ae6', month_interaction_count='41', week_interaction_count='9', day_interaction_count='0', cancelled_within_week='1'),
 Row(user_id='249803f8', month_interaction_count='25', week_interaction_count='9', day_interaction_count='2', cancelled_within_week='0'),
 Row(user_id='32ed74cc', month_interaction_count='21', week_interaction_count='2', day_interaction_count='1', cancelled_within_week='1'),
 Row(user_id='7ed76e6a', month_interaction_count='22', week_interaction_count='5', day_interaction_count='2', cancelled_within_week='0'),
 Row(user_id='46c81f43', month_interaction_count='32', week_interaction_count='8', day_interaction_count='2', cancelled_within_week='0'),
 Row(user_id='cf0f185e', month_interaction_count='26', week_interaction_count='4', day_interaction_count='0', cancelled_within_week='1'),
 Row(user_id='568275b3', month_interaction_count='29', week_interaction_count='5', day_interaction_count='1', cancelled_within_week='1'),
 Row(user_id='86a060ec', month_int

In [9]:
# Render the first rows as a text table (n=20 and truncate=True are DataFrame.show's defaults).
df.show(n=20, truncate=True)

+--------+-----------------------+----------------------+---------------------+---------------------+
| user_id|month_interaction_count|week_interaction_count|day_interaction_count|cancelled_within_week|
+--------+-----------------------+----------------------+---------------------+---------------------+
|66860ae6|                     41|                     9|                    0|                    1|
|249803f8|                     25|                     9|                    2|                    0|
|32ed74cc|                     21|                     2|                    1|                    1|
|7ed76e6a|                     22|                     5|                    2|                    0|
|46c81f43|                     32|                     8|                    2|                    0|
|cf0f185e|                     26|                     4|                    0|                    1|
|568275b3|                     29|                     5|                    1|   

In [10]:
# Inspect the schema as read. Only header=True was passed to read.csv (no inferSchema),
# so every column, including the counts and the label, is typed as string.
df.printSchema()

root
 |-- user_id: string (nullable = true)
 |-- month_interaction_count: string (nullable = true)
 |-- week_interaction_count: string (nullable = true)
 |-- day_interaction_count: string (nullable = true)
 |-- cancelled_within_week: string (nullable = true)



In [11]:
from pyspark.sql.types import (StringType, BooleanType, IntegerType, FloatType, DateType)


In [12]:
# Target Spark SQL type for each CSV column. The CSV is read with every column as a
# string, so the id stays a string and all counts plus the 0/1 label become integers.
_integer_columns = (
    "month_interaction_count",
    "week_interaction_count",
    "day_interaction_count",
    "cancelled_within_week",
)
coltype_map = {"user_id": StringType()}
coltype_map.update((column_name, IntegerType()) for column_name in _integer_columns)

In [13]:
from pyspark.sql.functions import col

# Apply the target types declared in `coltype_map` in a single projection, rather
# than one copy-pasted withColumn call per column. Columns not listed in coltype_map
# pass through unchanged, and the original column order is kept.
missing_cols = set(coltype_map) - set(df.columns)
assert not missing_cols, f"columns missing from CSV: {sorted(missing_cols)}"

df = df.select([
    col(name).cast(coltype_map[name]).alias(name) if name in coltype_map else col(name)
    for name in df.columns
])
# NOTE: Spark's cast turns unparseable strings into null instead of raising, so
# malformed values only surface downstream (e.g. when assembling feature vectors).
# Check for nulls here if the input file changes.


df.printSchema()

root
 |-- user_id: string (nullable = true)
 |-- month_interaction_count: integer (nullable = true)
 |-- week_interaction_count: integer (nullable = true)
 |-- day_interaction_count: integer (nullable = true)
 |-- cancelled_within_week: integer (nullable = true)



In [14]:
from pyspark.ml.feature import VectorAssembler

# Pack the three interaction counts into the single vector column ('features')
# that Spark ML estimators consume.
required_features = [
    'month_interaction_count',
    'week_interaction_count',
    'day_interaction_count',
]
assembler = VectorAssembler(outputCol='features', inputCols=required_features)
transformed_data = assembler.transform(df)
transformed_data.show()


+--------+-----------------------+----------------------+---------------------+---------------------+---------------+
| user_id|month_interaction_count|week_interaction_count|day_interaction_count|cancelled_within_week|       features|
+--------+-----------------------+----------------------+---------------------+---------------------+---------------+
|66860ae6|                     41|                     9|                    0|                    1| [41.0,9.0,0.0]|
|249803f8|                     25|                     9|                    2|                    0| [25.0,9.0,2.0]|
|32ed74cc|                     21|                     2|                    1|                    1| [21.0,2.0,1.0]|
|7ed76e6a|                     22|                     5|                    2|                    0| [22.0,5.0,2.0]|
|46c81f43|                     32|                     8|                    2|                    0| [32.0,8.0,2.0]|
|cf0f185e|                     26|                     4

In [22]:
# Split into training and test sets. randomSplit weights are approximate per row,
# and the fixed seed makes the split reproducible.
# NOTE(review): with only ~10 input rows this yields a 1-row test set, which is
# too small to evaluate the model meaningfully.
split_weights = [0.6, 0.4]
training_data, test_data = transformed_data.randomSplit(split_weights, seed=2020)
print(f"Training Dataset Count: {training_data.count()}")
print(f"Test Dataset Count: {test_data.count()}")

Training Dataset Count: 9
Test Dataset Count: 1


In [23]:
from pyspark.ml.classification import LogisticRegression

# Logistic-regression hyperparameters. elasticNetParam=1 makes the regularisation
# pure L1, scaled by regParam. The label is the 0/1 `cancelled_within_week` column.
lr_params = dict(
    featuresCol='features',
    labelCol='cancelled_within_week',
    maxIter=10,
    regParam=0.1,
    elasticNetParam=1,
    threshold=0.5,
)
lr = LogisticRegression(**lr_params)

# Fit on the training split, then score the held-out rows.
lrModel = lr.fit(training_data)
lr_predictions = lrModel.transform(test_data)
lr_predictions.show()

+--------+-----------------------+----------------------+---------------------+---------------------+---------------+--------------------+--------------------+----------+
| user_id|month_interaction_count|week_interaction_count|day_interaction_count|cancelled_within_week|       features|       rawPrediction|         probability|prediction|
+--------+-----------------------+----------------------+---------------------+---------------------+---------------+--------------------+--------------------+----------+
|c0c07290|                     35|                    10|                    0|                    0|[35.0,10.0,0.0]|[-1.7914618395553...|[0.14289359118091...|       1.0|
+--------+-----------------------+----------------------+---------------------+---------------------+---------------+--------------------+--------------------+----------+



In [24]:
# Keep only the user identifier and the model's output columns.
output = lr_predictions.select('user_id', 'rawPrediction', 'probability', 'prediction')

In [25]:
# Render the per-user predictions (n=20 and truncate=True are DataFrame.show's defaults).
output.show(n=20, truncate=True)

+--------+--------------------+--------------------+----------+
| user_id|       rawPrediction|         probability|prediction|
+--------+--------------------+--------------------+----------+
|c0c07290|[-1.7914618395553...|[0.14289359118091...|       1.0|
+--------+--------------------+--------------------+----------+

