Big Data for Marketing
## Final Project

by

In [0]:
# Importing functions that we will use

from pyspark.sql import functions as f
from pyspark.sql.types import StringType, ArrayType, LongType, DateType, BooleanType, StructType, StructField
from pyspark.sql.window import Window
from pyspark.sql import types as tp
from pyspark.sql.functions import col, udf, explode, collect_list, element_at, to_timestamp, mean, array_contains, when, date_format
from pyspark.sql.functions import col
from pyspark.ml import Pipeline
from pyspark.ml.feature import Imputer, VectorAssembler, OneHotEncoder, StringIndexer, RobustScaler
from pyspark.sql.functions import countDistinct, avg, stddev

First we download the dataset

In [0]:
spark.sparkContext.setLogLevel("WARN")

In [0]:
%sh

wget https://www.dropbox.com/s/guri31tlfjnjb89/bdm_data.zip --quiet
unzip -d ./bdm_data/ bdm_data.zip

Archive:  bdm_data.zip
   creating: ./bdm_data/cust_df/
  inflating: ./bdm_data/cust_df/_SUCCESS  
  inflating: ./bdm_data/cust_df/_committed_7878371389005906564  
  inflating: ./bdm_data/cust_df/_committed_5076822134895256271  
  inflating: ./bdm_data/cust_df/_committed_3520508812338357534  
  inflating: ./bdm_data/cust_df/_started_5076822134895256271  
  inflating: ./bdm_data/cust_df/part-00000-tid-5076822134895256271-fa5ebda2-8174-4bce-b5ef-fec6a0376257-23668-1-c000.csv  
   creating: ./bdm_data/orders_df/
  inflating: ./bdm_data/orders_df/_committed_1749678841354862380  
  inflating: ./bdm_data/orders_df/_committed_4173638812266093034  
  inflating: ./bdm_data/orders_df/_committed_4196442019492113282  
  inflating: ./bdm_data/orders_df/_committed_1102646042990821830  
  inflating: ./bdm_data/orders_df/_started_568900438245366187  
  inflating: ./bdm_data/orders_df/_committed_568900438245366187  
  inflating: ./bdm_data/orders_df/part-00004-tid-568900438245366187-3cd9d01f-3962-4d69-

In [0]:
dbutils.fs.mv("file:/databricks/driver/bdm_data/", "dbfs:/FileStore/bdm_data/", True)

Out[5]: True

##### Importing our Datasets

In [0]:
orders_df = \
    spark.read.format("parquet") \
    .option("inferSchema", "true") \
    .load("dbfs:/FileStore/bdm_data/orders_df")

cust_df = (
    spark.read
    .format("csv")
    .option("inferSchema", "true")
    .option("header", "true")
    .load("dbfs:/FileStore/bdm_data/cust_df/")
)

# For faster reading time, lets define the schema of Sessions_df
sessions_df_schema = StructType([
    StructField("customer_id",StringType(),True),
    StructField(
        "session_events",
        ArrayType(
            StructType([
                StructField("datetime", StringType(),True),
                StructField("event", StringType(),True)
            ]), True)
    ),
    StructField("session_id", StringType() ,True),
    StructField("session_rank", LongType(), True)
])

sessions_df = \
    spark.read.format("json") \
    .schema(sessions_df_schema) \
    .load("dbfs:/FileStore/bdm_data/sessions_df")

### Part 0

##### Labelling our data - Create the target ( View list 3 and a Callbackpurchase event)

First lets create columns so we can get the times from events and sessions

In [0]:
sessions_df1 = (
    sessions_df
        .withColumn("datetime", f.col('session_events.datetime'))
        .withColumn('session_tmsp', f.col('session_events')[0].getItem('datetime'))
        .withColumn('month', f.date_trunc('month', f.col('session_tmsp')).astype('date') )
        .withColumn('start_sess', f.to_timestamp(f.col('datetime')[0]))
        .withColumn('end_sess', f.to_timestamp(f.array_max(f.col('datetime'))))
        .withColumn('Time_sess', f.col('end_sess').cast('short') - f.col('start_sess').cast('short')))

First we filter our dataset from the last 3 months.

Lets now create different columns to be used as variables for the predictions models, columns created:
- View_list3_and_bought
- Target (label)
- Convertion Rate
- Time_Session

In [0]:
sessions_df2 = (
      sessions_df1
     .withColumn("event", f.col('session_events.event'))
     .withColumn('month', f.date_trunc('month', f.col('session_tmsp')).astype('date').astype('string'))
     .filter(f.col('month')>='2022-02-01')
     .filter(f.col('month')<='2022-04-01')
     .withColumn('view_home_variant', f.when(f.array_contains(f.col('event'), 'ViewHomeVariant'), f.lit(1)).otherwise(f.lit(0)))
     .filter(f.col('view_home_variant')==1)
     .withColumn('viewList3', f.when(f.array_contains(f.col('event'), 'ViewList3'), f.lit(1)).otherwise(f.lit(0)))
     .filter(f.col('viewList3')==1)
     .withColumn('converted', f.when(f.array_contains(f.col('event'), 'CallbackPurchase'), f.lit(1)).otherwise(f.lit(0)))
     .groupBy(f.col('month'), 'customer_id')
     .agg(
        f.sum(f.when( (f.col('viewList3')==1), f.lit(1)).otherwise(f.lit(0)) ).alias('View_list3'),
        f.sum(f.when( (f.col('ViewList3')==1) & (f.col('converted')==1), f.lit(1)).otherwise(f.lit(0)) ).alias('View_list3_and_bought'),
        f.avg(f.col('Time_sess')).alias('AVG_Session_Time'))
     .withColumn('target', f.when(f.col('view_list3_and_bought')>0, f.lit(1)).otherwise(f.lit(0)))
     .withColumn('Convertion_rate', f.col('view_list3_and_bought') / f.col('View_List3') * 100)
     .drop('view_home_variant')
     .drop('session_tmsp')
     .drop('viewList3')
     .drop('View_List3')
    )

In [0]:
sessions_df2.count()

Out[187]: 205380

After we filter our data we got around 205380 rows.

### Part 1

#### Creating a dataframe with the features for training

In this Part we will create different variables in the different datasets and then executes joins so we can procede to the modeling phase.

In [0]:
orders_df1= (
    orders_df
    .toDF('order_id', 'session_id', 'order_timestamp', 'customer_id', 'total_value', 'discount_value', 'order_category')
    .withColumn('order_timestamp', f.col('order_timestamp').astype('timestamp'))
    .withColumn('month', f.date_trunc('month', f.col('order_timestamp')).astype('date') )
)

Since the cust_df does not have times and orders columns we will create two binary columns, Paid and Free Install. This will tell us the origin from the installation on the customer device.

In [0]:
cust_df1 = (
    cust_df
    .withColumn('Paid_Install', f.when((f.col('install_origin') == "Meta") | (f.col('install_origin') == "SMS" ), True).astype('int'))
    .withColumn('Free_Install', f.when((f.col('install_origin') == "Organic") | (f.col('install_origin') == "Email" ), True).astype('int'))
    .drop("city")
    .drop("install_origin")
)

Lets fill the null values with a 0.

In [0]:
cust_df2 = (
    cust_df1
    .fillna({"Free_Install": '0'})
    .fillna({"Paid_Install": '0'})   
)


Now lets create all the features in the Orders dataset. This will be the "principal" ds since is the one that will give us most of the features to use in the modelling. So in this one we will try to understand, the spendings of our customers, the weight of the products that they buy and at what time they do it.

In [0]:
w_lag = Window.partitionBy(f.col("customer_id")).orderBy(f.col("order_timestamp"))
time_window = Window.partitionBy('customer_id')

orders_Features = (
    orders_df1
     .filter(f.col('month')>='2022-02-01')
     .filter(f.col('month')<='2022-04-01')
    .withColumn('Monetary', f.col('total_value') - f.col('discount_value'))
    .withColumn("last_order", f.max("order_timestamp").over(w_lag))
    .withColumn("Recency", f.datediff(f.current_date(), f.col('last_order')))
    .withColumn("order_date_lag", f.lag("order_timestamp", offset=1, default=None).over(w_lag))
    .withColumn("days_since_last_order", f.datediff(f.col('order_timestamp'), f.col('order_date_lag')))
    .withColumn('order_hour', f.hour(col('order_timestamp')) )
    .withColumn('shift', f.when(f.col('order_hour') <= 10, 'breakfast').when(f.col('order_hour')<=17, 'lunch' ).otherwise('dinner'))
    .withColumn('discount_percentage', f.round(f.col('discount_value') / f.col('total_value'), 2) )
    .withColumn('discount_range', f.when(f.col('discount_percentage') <= 0.10, '0-10%').when(f.col('discount_percentage') <= 0.20, '10-20%').otherwise('30%+'))
    .withColumn('japanese', f.when(f.col('order_category') == 'Japanese', f.col('Monetary')).otherwise(0))
    .withColumn('Pizza', f.when(f.col('order_category') == 'Pizza', f.col('Monetary')).otherwise(0))
    .withColumn('Burger', f.when(f.col('order_category') == 'Burger', f.col('Monetary')).otherwise(0))
    .withColumn('Vegetarian', f.when(f.col('order_category') == 'Vegetarian', f.col('Monetary')).otherwise(0))
    .withColumn('Alc Beverages', f.when(f.col('order_category') == 'Alc Beverages', f.col('Monetary')).otherwise(0))
    .withColumn('first_order_timestamp', f.min(f.col('order_timestamp')).over(time_window))
    .withColumn('days_since_first_order', f.datediff(f.col('order_timestamp'), f.col('first_order_timestamp')))
)

In [0]:
input_data = (
    orders_Features
    .groupBy('month', 'customer_id')
    .agg(
        f.countDistinct('order_id').alias('frequency'),
        f.round(f.avg('days_since_last_order'),1).alias('avg_time_to_reorder'),
        f.round(f.sum('Monetary'),1).alias('Monetary'),
        f.max('Recency').alias('Recency'),
        f.round(f.avg('order_hour'),0).alias('AVG_Order_Hour'),
        f.round(f.avg('discount_percentage'),3).alias('AVG_Discount_Percentage'),
        f.max('discount_percentage').alias('Best_Discount_perMonth'),     
        f.round(f.sum("japanese")/ f.sum('total_value'),2).alias('japanese_ratio'),
        f.round(f.sum("Pizza")/ f.sum('Monetary'),2).alias('Pizza_ratio'),
        f.round(f.sum("Burger")/ f.sum('Monetary'),2).alias('Burger_ratio'),
        f.round(f.sum("Vegetarian")/ f.sum('Monetary'),2).alias('Vegetarian_ratio'),
        f.round(f.sum("Alc Beverages")/ f.sum('Monetary'),2).alias('Alc Beverages_ratio'),
        f.sum(f.when( (f.col('shift') == "breakfast"), f.lit(1)).otherwise(f.lit(0)) ).alias('breakfast'),
        f.sum(f.when( (f.col('shift') == "lunch"), f.lit(1)).otherwise(f.lit(0)) ).alias('lunch'),
        f.sum(f.when( (f.col('shift') == "dinner"), f.lit(1)).otherwise(f.lit(0)) ).alias('dinner'))
     .withColumn("breakfast_ratio",  f.round(f.col("breakfast")/ f.col('frequency'), 2))
     .withColumn("lunch_ratio",  f.round(f.col("lunch")/ f.col('frequency'), 2))
     .withColumn("dinner_ratio",  f.round(f.col("dinner")/ f.col('frequency'), 2))
     .withColumn("AVG_Spent", f.round(f.col('Monetary') / f.col('frequency'), 2))
    .drop('breakfast')
    .drop('dinner')
    .drop('lunch')
)

Colocamos a 0 ou deixamos a null e inputamos com a media no modeling ? Perguntar ao prof

In [0]:
input_data1 = (
    input_data
    .fillna({"avg_time_to_reorder": '0'})
)

In [0]:
input_data2 = (
    input_data1
     .join(sessions_df2, ['customer_id', 'month'], 'inner')
)

In [0]:
predictions_Data = (
    input_data2
    .join(cust_df2, ['customer_id'], 'inner')
    .drop()
)

Deixamos a variavel viewlist3_and_bough ou nao ? (redundancia com o target)
Temos de enviar ao prof para ele confirmar as variaveis, mas devemos começar ja os testes do modeling

In [0]:
predictions_Data.display()

##### After creating all the features lets split the dataframe in two, one for training ( the first two months) and the test(last month)

In [0]:
train_data = (
    predictions_Data
     .filter(f.col('month')>='2022-02-01')
     .filter(f.col('month')<'2022-04-01')
     .drop('month')
)

In [0]:
test_data = (
    predictions_Data
     .filter(f.col('month')>='2022-04-01')
     .filter(f.col('month')<'2022-05-01')
     .drop('month')
)

### Part 2 - Continuar

### Code for testing - Not finish ( missing features)

In [0]:
IDENTIFIERS = ["customer_id"]
 
CONTINUOUS_COLUMNS = [
    'frequency',
    'avg_time_to_reorder',
    'monetary',
    'recency',
    "AVG_Basket"
]
 
TARGET_COLUMN = ['target']

CATEGORICAL_COLUMNS = ['is_referee', 'device_type']

In [0]:
from pyspark.ml.feature import Imputer

imputer_estimator = Imputer(
    strategy="mean",
    inputCols=CONTINUOUS_COLUMNS,
    outputCols=CONTINUOUS_COLUMNS,
)

In [0]:
imputer_transformer = imputer_estimator.fit(train_data) #aula 6

[0;31m---------------------------------------------------------------------------[0m
[0;31mAttributeError[0m                            Traceback (most recent call last)
[0;32m<command-1637834739458995>[0m in [0;36m<module>[0;34m[0m
[0;32m----> 1[0;31m [0mimputer_transformer[0m [0;34m=[0m [0mimputer_estimator[0m[0;34m.[0m[0mfit[0m[0;34m([0m[0mtrain_data[0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[0m
[0;32m/databricks/python_shell/dbruntime/MLWorkloadsInstrumentation/_pyspark.py[0m in [0;36mpatched_method[0;34m(self, *args, **kwargs)[0m
[1;32m     28[0m             [0mcall_succeeded[0m [0;34m=[0m [0;32mFalse[0m[0;34m[0m[0;34m[0m[0m
[1;32m     29[0m             [0;32mtry[0m[0;34m:[0m[0;34m[0m[0;34m[0m[0m
[0;32m---> 30[0;31m                 [0mresult[0m [0;34m=[0m [0moriginal_method[0m[0;34m([0m[0mself[0m[0;34m,[0m [0;34m*[0m[0margs[0m[0;34m,[0m [0;34m**[0m[0mkwargs[0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[0m[

In [0]:
train_data_transformed = imputer_transformer.transform(train_data) # aula 6

In [0]:
from pyspark.ml.feature import OneHotEncoder, StringIndexer

indexer_estimator = StringIndexer(inputCols=["device_type"], outputCols=["device_type_idx"])
indexer_transformer = indexer_estimator.fit(train_data_transformed)
train_data_transformed = indexer_transformer.transform(train_data_transformed) # aula 6

In [0]:
encoder_estimator = OneHotEncoder(inputCols=["device_type_idx"], outputCols=["device_type_vector"])
train_data_transformed = encoder_estimator.fit(train_data_transformed).transform(train_data_transformed) # chaining
train_data_transformed.select('customer_id', 'device_type_vector').display() # aula 6


In [0]:
from pyspark.ml.feature import VectorAssembler
 
vector_assembler_transformer = VectorAssembler(
    inputCols=CONTINUOUS_COLUMNS,
    outputCol="continuous_features"
)
 
train_data_transformed = vector_assembler_transformer.transform(train_data_transformed)
 
train_data_transformed.select('customer_id', 'continuous_features').display()

In [0]:
from pyspark.ml.stat import Correlation
import pandas as pd

correlation = Correlation.corr(
    train_data_transformed, "continuous_features"
)

correlation_array = correlation.head()[0].toArray()

correlation_pd = pd.DataFrame(
    correlation_array,
    index=CONTINUOUS_COLUMNS,
    columns=CONTINUOUS_COLUMNS,
)

print(correlation_pd)

In [0]:
from pyspark.ml.feature import RobustScaler

scaler_estimator = MinMaxScaler(inputCol="continuous_features", outputCol="continuous_scaled_features")

# Compute summary statistics by fitting the RobustScaler
scaler_transformer = scaler_estimator.fit(train_data_transformed)

# Transform each feature to have unit quantile range.
train_data_transformed = scaler_transformer.transform(train_data_transformed)

In [0]:
preml_assembler = VectorAssembler(
    inputCols=BINARY_COLUMNS
    + ["continuous_scaled_features"]
    + ["device_type_vector"],
    outputCol="features"
)

train_data_transformed = preml_assembler.transform(train_data_transformed)
train_data_transformed.select('customer_id', 'features').display()

#### Logistic Regression

In [0]:
#Aula 7
from pyspark.ml.feature import Imputer, VectorAssembler, StringIndexer, OneHotEncoder, MinMaxScaler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline

impute = Imputer(inputCols=['frequency', 'avg_time_to_reorder'], outputCols=['frequency', 'avg_time_to_reorder'])
assemble = VectorAssembler(inputCols=['frequency', 'avg_time_to_reorder', 'Monetary', 'Recency','AVG_Basket'], outputCol='continuous_features')
index = StringIndexer(inputCols=['device_type'], outputCols=['device_type_idx']) 
one_hot = OneHotEncoder(inputCol='device_type_idx', outputCol='device_type_vector')
scale = MinMaxScaler(inputCol='continuous_features', outputCol='scaled_continuous_features')
final_assemble = VectorAssembler(inputCols=['scaled_continuous_features', 'device_type_vector', 'is_referee'], outputCol='features')
lr = LogisticRegression(featuresCol="features", labelCol="target", predictionCol="prediction")

pipe = Pipeline()
pipe.setStages(
    [
        impute,
        assemble,
        index,
        one_hot,
        scale,
        final_assemble,
        lr
    ]
)

Out[41]: Pipeline_0e131f08c903

In [0]:
pipe_model = pipe.fit(train_data)

In [0]:
fitted_data = pipe_model.transform(train_data)
fitted_data.display()

customer_id,frequency,avg_time_to_reorder,Monetary,Recency,AVG_Basket,View_list3,target,is_referee,device_type,install_origin,continuous_features,device_type_idx,device_type_vector,scaled_continuous_features,features,rawPrediction,probability,prediction
e7f524b3-c4a7-4d17-acc9-f147377d5187,1,24.0,63.79999923706055,85,63.79999923706055,3,0,True,Low-End,Email,"Map(vectorType -> dense, length -> 5, values -> List(1.0, 24.0, 63.79999923706055, 85.0, 63.79999923706055))",1.0,"Map(vectorType -> sparse, length -> 1, indices -> List(), values -> List())","Map(vectorType -> dense, length -> 5, values -> List(0.0, 0.41379310344827586, 0.0013952693631000762, 0.13793103448275862, 0.04501607826910982))","Map(vectorType -> dense, length -> 7, values -> List(0.0, 0.41379310344827586, 0.0013952693631000762, 0.13793103448275862, 0.04501607826910982, 0.0, 1.0))","Map(vectorType -> dense, length -> 2, values -> List(3.228357090601703, -3.228357090601703))","Map(vectorType -> dense, length -> 2, values -> List(0.9618875697621785, 0.038112430237821515))",0.0
10bb2f5f-26c8-43b2-8ce5-5cbd91640754,23,1.3,6935.400054931641,106,301.5391328231148,6,0,True,High-End,Meta,"Map(vectorType -> dense, length -> 5, values -> List(23.0, 1.3, 6935.400054931641, 106.0, 301.5391328231148))",0.0,"Map(vectorType -> sparse, length -> 1, indices -> List(0), values -> List(1.0))","Map(vectorType -> dense, length -> 5, values -> List(0.44, 0.022413793103448276, 0.16445195571666818, 0.5, 0.22702430248682118))","Map(vectorType -> dense, length -> 7, values -> List(0.44, 0.022413793103448276, 0.16445195571666818, 0.5, 0.22702430248682118, 1.0, 1.0))","Map(vectorType -> dense, length -> 2, values -> List(-0.7577356862747595, 0.7577356862747595))","Map(vectorType -> dense, length -> 2, values -> List(0.31913807497501756, 0.6808619250249824))",1.0
c0123e58-37f4-4e5b-91ea-d77c6bd3b1c8,3,7.7,179.9999961853027,107,59.99999872843424,1,0,True,High-End,Email,"Map(vectorType -> dense, length -> 5, values -> List(3.0, 7.7, 179.99999618530273, 107.0, 59.99999872843424))",0.0,"Map(vectorType -> sparse, length -> 1, indices -> List(0), values -> List(1.0))","Map(vectorType -> dense, length -> 5, values -> List(0.04, 0.13275862068965516, 0.004152587353540065, 0.5172413793103449, 0.0421068755048492))","Map(vectorType -> dense, length -> 7, values -> List(0.04, 0.13275862068965516, 0.004152587353540065, 0.5172413793103449, 0.0421068755048492, 1.0, 1.0))","Map(vectorType -> dense, length -> 2, values -> List(1.0546341266169161, -1.0546341266169161))","Map(vectorType -> dense, length -> 2, values -> List(0.7416637852964459, 0.25833621470355406))",0.0
6bafd181-e051-450c-8e9e-a5004da8ad9f,6,5.7,275.0,96,45.833333333333336,2,1,False,High-End,SMS,"Map(vectorType -> dense, length -> 5, values -> List(6.0, 5.7, 275.0, 96.0, 45.833333333333336))",0.0,"Map(vectorType -> sparse, length -> 1, indices -> List(0), values -> List(1.0))","Map(vectorType -> dense, length -> 5, values -> List(0.1, 0.09827586206896552, 0.0064068491994057584, 0.3275862068965517, 0.03126116587027825))","Map(vectorType -> dense, length -> 7, values -> List(0.1, 0.09827586206896552, 0.0064068491994057584, 0.3275862068965517, 0.03126116587027825, 1.0, 0.0))","Map(vectorType -> dense, length -> 2, values -> List(0.3137760453242102, -0.3137760453242102))","Map(vectorType -> dense, length -> 2, values -> List(0.577806682358664, 0.42219331764133605))",0.0
24ca14ba-1601-4d5b-a800-2d5d59e2346c,1,7.2137351663919,108.6999969482422,135,108.6999969482422,1,0,True,High-End,Meta,"Map(vectorType -> dense, length -> 5, values -> List(1.0, 7.2137351663919, 108.69999694824219, 135.0, 108.69999694824219))",0.0,"Map(vectorType -> sparse, length -> 1, indices -> List(0), values -> List(1.0))","Map(vectorType -> dense, length -> 5, values -> List(0.0, 0.1243747442481362, 0.0024607046015786113, 1.0, 0.07939059931460442))","Map(vectorType -> dense, length -> 7, values -> List(0.0, 0.1243747442481362, 0.0024607046015786113, 1.0, 0.07939059931460442, 1.0, 1.0))","Map(vectorType -> dense, length -> 2, values -> List(1.0853790072664773, -1.0853790072664773))","Map(vectorType -> dense, length -> 2, values -> List(0.7475105602019129, 0.25248943979808713))",0.0
88116c72-c703-4a30-abc0-6febf27d920b,1,7.2137351663919,102.5999984741211,134,102.5999984741211,4,0,False,High-End,Organic,"Map(vectorType -> dense, length -> 5, values -> List(1.0, 7.2137351663919, 102.5999984741211, 134.0, 102.5999984741211))",0.0,"Map(vectorType -> sparse, length -> 1, indices -> List(0), values -> List(1.0))","Map(vectorType -> dense, length -> 5, values -> List(0.0, 0.1243747442481362, 0.002315957304021948, 0.9827586206896551, 0.07472056509155274))","Map(vectorType -> dense, length -> 7, values -> List(0.0, 0.1243747442481362, 0.002315957304021948, 0.9827586206896551, 0.07472056509155274, 1.0, 0.0))","Map(vectorType -> dense, length -> 2, values -> List(0.5505857104870111, -0.5505857104870111))","Map(vectorType -> dense, length -> 2, values -> List(0.634271469641366, 0.36572853035863395))",0.0
5e1c3921-8994-452c-8be5-33bdd6f94f32,2,12.0,222.9000015258789,134,111.45000076293944,2,0,False,Low-End,Organic,"Map(vectorType -> dense, length -> 5, values -> List(2.0, 12.0, 222.9000015258789, 134.0, 111.45000076293945))",1.0,"Map(vectorType -> sparse, length -> 1, indices -> List(), values -> List())","Map(vectorType -> dense, length -> 5, values -> List(0.02, 0.20689655172413793, 0.005170564630839225, 0.9827586206896551, 0.08149594605897542))","Map(vectorType -> dense, length -> 7, values -> List(0.02, 0.20689655172413793, 0.005170564630839225, 0.9827586206896551, 0.08149594605897542, 0.0, 0.0))","Map(vectorType -> dense, length -> 2, values -> List(2.0854918006240624, -2.0854918006240624))","Map(vectorType -> dense, length -> 2, values -> List(0.8894850408361961, 0.11051495916380394))",0.0
28459ec2-c117-4cea-a436-f04b4fa4eeb1,12,3.2,1228.9000091552734,104,102.40833409627278,3,1,False,Low-End,Meta,"Map(vectorType -> dense, length -> 5, values -> List(12.0, 3.2, 1228.9000091552734, 104.0, 102.40833409627278))",1.0,"Map(vectorType -> sparse, length -> 1, indices -> List(), values -> List())","Map(vectorType -> dense, length -> 5, values -> List(0.22, 0.05517241379310345, 0.029042010347441347, 0.46551724137931033, 0.07457383075912809))","Map(vectorType -> dense, length -> 7, values -> List(0.22, 0.05517241379310345, 0.029042010347441347, 0.46551724137931033, 0.07457383075912809, 0.0, 0.0))","Map(vectorType -> dense, length -> 2, values -> List(1.3064986269621717, -1.3064986269621717))","Map(vectorType -> dense, length -> 2, values -> List(0.786926659649178, 0.21307334035082204))",0.0
6dda0e0a-c6a0-439d-8d4f-bf6313220f48,17,1.7,17383.000045776367,135,1022.5294144574334,2,1,False,High-End,SMS,"Map(vectorType -> dense, length -> 5, values -> List(17.0, 1.7, 17383.000045776367, 135.0, 1022.5294144574334))",0.0,"Map(vectorType -> sparse, length -> 1, indices -> List(0), values -> List(1.0))","Map(vectorType -> dense, length -> 5, values -> List(0.32, 0.029310344827586206, 0.4123637988168724, 1.0, 0.7789997339569207))","Map(vectorType -> dense, length -> 7, values -> List(0.32, 0.029310344827586206, 0.4123637988168724, 1.0, 0.7789997339569207, 1.0, 0.0))","Map(vectorType -> dense, length -> 2, values -> List(-0.13323882496300898, 0.13323882496300898))","Map(vectorType -> dense, length -> 2, values -> List(0.4667394842164816, 0.5332605157835184))",1.0
614dcd4a-f40d-436d-a6b1-51dfc7ac539e,14,1.8,815.0999908447266,133,58.22142791748047,1,1,False,High-End,Meta,"Map(vectorType -> dense, length -> 5, values -> List(14.0, 1.8, 815.0999908447266, 133.0, 58.22142791748047))",0.0,"Map(vectorType -> sparse, length -> 1, indices -> List(0), values -> List(1.0))","Map(vectorType -> dense, length -> 5, values -> List(0.26, 0.03103448275862069, 0.01922292028808203, 0.9655172413793103, 0.04074523802403464))","Map(vectorType -> dense, length -> 7, values -> List(0.26, 0.03103448275862069, 0.01922292028808203, 0.9655172413793103, 0.04074523802403464, 1.0, 0.0))","Map(vectorType -> dense, length -> 2, values -> List(-0.8084964778940553, 0.8084964778940553))","Map(vectorType -> dense, length -> 2, values -> List(0.30821097965367084, 0.6917890203463292))",1.0


In [0]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
 
evaluator = BinaryClassificationEvaluator(
    labelCol="target",
    rawPredictionCol="rawPrediction",
    metricName="areaUnderROC",
)
 
metric = evaluator.evaluate(fitted_data)
print(f"Area under ROC = {metric} ")

Area under ROC = 0.7757828488221965 


In [0]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

paramGrid = (
    ParamGridBuilder()
    .addGrid(lr.regParam, [0.2, 0.1, 0.01])
    .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])
    .addGrid(lr.aggregationDepth, [2.0, 4.0, 6.0])
    .addGrid(lr.maxIter, [50.0, 100.0, 150.0])
    .build()
)

In [0]:
print(lr.explainParams())

cacheNodeIds: If false, the algorithm will pass trees to executors to match instances with nodes. If true, the algorithm will cache node IDs for each instance. Caching can speed up training of deeper trees. Users can set how often should the cache be checkpointed or disable it by setting checkpointInterval. (default: False)
checkpointInterval: set checkpoint interval (>= 1) or disable checkpoint (-1). E.g. 10 means that the cache will get checkpointed every 10 iterations. Note: this setting will be ignored if the checkpoint directory is not set in the SparkContext. (default: 10)
featuresCol: features column name. (default: features, current: features)
impurity: Criterion used for information gain calculation (case-insensitive). Supported options: entropy, gini (default: gini)
labelCol: label column name. (default: label, current: target)
leafCol: Leaf indices column name. Predicted leaf index of each instance in each tree by preorder. (default: )
maxBins: Max number of bins for discret

In [0]:
!pip install mlflow --quiet

You should consider upgrading via the '/databricks/python3/bin/python -m pip install --upgrade pip' command.[0m


In [0]:
token = dbutils.notebook.entry_point.getDbutils().notebook().getContext().apiToken().get()
dbutils.fs.put("file:///root/.databrickscfg","[DEFAULT]\nhost=https://community.cloud.databricks.com\ntoken = "+token,overwrite=True)

Wrote 98 bytes.
Out[55]: True

In [0]:
from pyspark.ml.tuning import CrossValidator 
import mlflow
from mlflow import spark

mlflow.pyspark.ml.autolog()
 
mlflow.start_run(nested = True)
cv = CrossValidator(
    estimator=pipe,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator,
    numFolds=4
)
 
cv_model = cv.fit(train_data)

In [0]:
mlflow.spark.log_model(cv_model.bestModel, "model-file")# logs model as artifacts
mlflow.end_run()

In [0]:
print(cv_model.avgMetrics)

In [0]:
fitted_test_data = best_model.transform(test_data)

In [0]:
fitted_test_data = best_model.transform(test_data)

train_metric = evaluator.evaluate(fitted_data)
test_metric = evaluator.evaluate(fitted_test_data)

print(f"Area under ROC on TRAIN= {train_metric}")
print(f"Area under ROC on TEST= {test_metric}")

#### Decision tree

In [0]:
from pyspark.ml.feature import Imputer, VectorAssembler, StringIndexer, OneHotEncoder, MinMaxScaler
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml import Pipeline

impute = Imputer(inputCols=['frequency', 'avg_time_to_reorder'], outputCols=['frequency', 'avg_time_to_reorder'])
assemble = VectorAssembler(inputCols=['frequency', 'avg_time_to_reorder', 'Monetary', 'Recency','AVG_Basket'], outputCol='continuous_features')
index = StringIndexer(inputCols=['device_type'], outputCols=['device_type_idx']) 
one_hot = OneHotEncoder(inputCol='device_type_idx', outputCol='device_type_vector')
scale = MinMaxScaler(inputCol='continuous_features', outputCol='scaled_continuous_features')
final_assemble = VectorAssembler(inputCols=['scaled_continuous_features', 'device_type_vector', 'is_referee'], outputCol='features')
Dt = DecisionTreeClassifier (featuresCol="features", labelCol="target", predictionCol="prediction")

pipe1 = Pipeline()
pipe1.setStages(
    [
        impute,
        assemble,
        index,
        one_hot,
        scale,
        final_assemble,
        lr
    ]
)

Out[63]: Pipeline_2a5b2b0afb84

In [0]:
pipe1_model = pipe1.fit(train_data)



In [0]:
fitted_data2 = pipe1_model.transform(train_data)
fitted_data2.display()

customer_id,frequency,avg_time_to_reorder,Monetary,Recency,AVG_Basket,View_list3,target,is_referee,device_type,install_origin,continuous_features,device_type_idx,device_type_vector,scaled_continuous_features,features,rawPrediction,probability,prediction
009a901a-4079-4bde-9f58-833de2edce51,2,20.0,123.89999771118164,105,61.94999885559082,3,0,False,Low-End,Email,"Map(vectorType -> dense, length -> 5, values -> List(2.0, 20.0, 123.89999771118164, 105.0, 61.94999885559082))",1.0,"Map(vectorType -> sparse, length -> 1, indices -> List(), values -> List())","Map(vectorType -> dense, length -> 5, values -> List(0.02, 0.3448275862068966, 0.002821386500537891, 0.48275862068965514, 0.0435997557682477))","Map(vectorType -> dense, length -> 7, values -> List(0.02, 0.3448275862068966, 0.002821386500537891, 0.48275862068965514, 0.0435997557682477, 0.0, 0.0))","Map(vectorType -> dense, length -> 2, values -> List(2.345663355258291, -2.345663355258291))","Map(vectorType -> dense, length -> 2, values -> List(0.9125889101129154, 0.08741108988708457))",0.0
00acd007-767b-427a-a09a-202a27e450d6,4,6.3,243.5000038146973,97,60.87500095367432,1,0,False,Low-End,Email,"Map(vectorType -> dense, length -> 5, values -> List(4.0, 6.3, 243.50000381469727, 97.0, 60.875000953674316))",1.0,"Map(vectorType -> sparse, length -> 1, indices -> List(), values -> List())","Map(vectorType -> dense, length -> 5, values -> List(0.06, 0.10862068965517241, 0.005659383549994309, 0.3448275862068966, 0.04277675933423922))","Map(vectorType -> dense, length -> 7, values -> List(0.06, 0.10862068965517241, 0.005659383549994309, 0.3448275862068966, 0.04277675933423922, 0.0, 0.0))","Map(vectorType -> dense, length -> 2, values -> List(2.069335907983689, -2.069335907983689))","Map(vectorType -> dense, length -> 2, values -> List(0.8878868722437122, 0.11211312775628779))",0.0
021cc105-cf6c-40c0-bae9-b7694764dd6c,16,1.7,18442.09992980957,135,1152.631245613098,3,0,False,High-End,SMS,"Map(vectorType -> dense, length -> 5, values -> List(16.0, 1.7, 18442.09992980957, 135.0, 1152.6312456130981))",0.0,"Map(vectorType -> sparse, length -> 1, indices -> List(0), values -> List(1.0))","Map(vectorType -> dense, length -> 5, values -> List(0.3, 0.029310344827586206, 0.4374952552765348, 1.0, 0.8786030382128596))","Map(vectorType -> dense, length -> 7, values -> List(0.3, 0.029310344827586206, 0.4374952552765348, 1.0, 0.8786030382128596, 1.0, 0.0))","Map(vectorType -> dense, length -> 2, values -> List(0.2289211496149388, -0.2289211496149388))","Map(vectorType -> dense, length -> 2, values -> List(0.556981661322067, 0.44301833867793305))",0.0
032c0db8-790c-4c49-9384-bc32606348f5,1,7.21373516639187,123.8000030517578,115,123.8000030517578,6,0,False,High-End,Organic,"Map(vectorType -> dense, length -> 5, values -> List(1.0, 7.21373516639187, 123.80000305175781, 115.0, 123.80000305175781))",0.0,"Map(vectorType -> sparse, length -> 1, indices -> List(0), values -> List(1.0))","Map(vectorType -> dense, length -> 5, values -> List(0.0, 0.12437474424813569, 0.0028190137201539118, 0.6551724137931034, 0.0909508555295648))","Map(vectorType -> dense, length -> 7, values -> List(0.0, 0.12437474424813569, 0.0028190137201539118, 0.6551724137931034, 0.0909508555295648, 1.0, 0.0))","Map(vectorType -> dense, length -> 2, values -> List(0.8144304485135669, -0.8144304485135669))","Map(vectorType -> dense, length -> 2, values -> List(0.6930528016114085, 0.3069471983885915))",0.0
0342080c-46f5-48d3-b633-b8c9be9e9af8,3,14.7,303.8000030517578,100,101.26666768391928,4,0,True,Low-End,Meta,"Map(vectorType -> dense, length -> 5, values -> List(3.0, 14.7, 303.8000030517578, 100.0, 101.26666768391927))",1.0,"Map(vectorType -> sparse, length -> 1, indices -> List(), values -> List())","Map(vectorType -> dense, length -> 5, values -> List(0.04, 0.253448275862069, 0.00709024651975775, 0.39655172413793105, 0.07369979427541114))","Map(vectorType -> dense, length -> 7, values -> List(0.04, 0.253448275862069, 0.00709024651975775, 0.39655172413793105, 0.07369979427541114, 0.0, 1.0))","Map(vectorType -> dense, length -> 2, values -> List(2.8768736475733157, -2.8768736475733157))","Map(vectorType -> dense, length -> 2, values -> List(0.9466913065186333, 0.05330869348136669))",0.0
03433686-5f00-42e9-b6c3-2241632fa5bf,9,2.6,844.9000091552734,132,93.87777879503038,3,0,True,High-End,Organic,"Map(vectorType -> dense, length -> 5, values -> List(9.0, 2.6, 844.9000091552734, 132.0, 93.87777879503038))",0.0,"Map(vectorType -> sparse, length -> 1, indices -> List(0), values -> List(1.0))","Map(vectorType -> dense, length -> 5, values -> List(0.16, 0.04482758620689655, 0.019930047041619825, 0.9482758620689655, 0.06804301187983698))","Map(vectorType -> dense, length -> 7, values -> List(0.16, 0.04482758620689655, 0.019930047041619825, 0.9482758620689655, 0.06804301187983698, 1.0, 1.0))","Map(vectorType -> dense, length -> 2, values -> List(0.2587356582836242, -0.2587356582836242))","Map(vectorType -> dense, length -> 2, values -> List(0.5643254632440138, 0.4356745367559862))",0.0
038962ab-c0bc-49f5-b4b0-8d56dcb2fd94,4,7.0,241.3000030517578,100,60.32500076293945,4,0,False,Low-End,Email,"Map(vectorType -> dense, length -> 5, values -> List(4.0, 7.0, 241.3000030517578, 100.0, 60.32500076293945))",1.0,"Map(vectorType -> sparse, length -> 1, indices -> List(), values -> List())","Map(vectorType -> dense, length -> 5, values -> List(0.06, 0.12068965517241378, 0.005607179575450862, 0.39655172413793105, 0.042355690423433165))","Map(vectorType -> dense, length -> 7, values -> List(0.06, 0.12068965517241378, 0.005607179575450862, 0.39655172413793105, 0.042355690423433165, 0.0, 0.0))","Map(vectorType -> dense, length -> 2, values -> List(2.045569056011453, -2.045569056011453))","Map(vectorType -> dense, length -> 2, values -> List(0.8854991302538325, 0.11450086974616747))",0.0
03b8085d-9b06-48e6-bece-57ca8152c5e4,2,2.0,141.0,111,70.5,1,0,True,Low-End,Email,"Map(vectorType -> dense, length -> 5, values -> List(2.0, 2.0, 141.0, 111.0, 70.5))",1.0,"Map(vectorType -> sparse, length -> 1, indices -> List(), values -> List())","Map(vectorType -> dense, length -> 5, values -> List(0.02, 0.034482758620689655, 0.0032271536708117893, 0.5862068965517241, 0.05014546198783409))","Map(vectorType -> dense, length -> 7, values -> List(0.02, 0.034482758620689655, 0.0032271536708117893, 0.5862068965517241, 0.05014546198783409, 0.0, 1.0))","Map(vectorType -> dense, length -> 2, values -> List(2.601911606948074, -2.601911606948074))","Map(vectorType -> dense, length -> 2, values -> List(0.9309845061441653, 0.06901549385583472))",0.0
0421e49d-ab66-4aea-9931-45ccdecd144c,9,3.7,352.9000015258789,99,39.21111128065321,1,1,False,High-End,Email,"Map(vectorType -> dense, length -> 5, values -> List(9.0, 3.7, 352.9000015258789, 99.0, 39.21111128065321))",0.0,"Map(vectorType -> sparse, length -> 1, indices -> List(0), values -> List(1.0))","Map(vectorType -> dense, length -> 5, values -> List(0.16, 0.06379310344827586, 0.008255343874997554, 0.3793103448275862, 0.026191327943290956))","Map(vectorType -> dense, length -> 7, values -> List(0.16, 0.06379310344827586, 0.008255343874997554, 0.3793103448275862, 0.026191327943290956, 1.0, 0.0))","Map(vectorType -> dense, length -> 2, values -> List(-0.020421949863761224, 0.020421949863761224))","Map(vectorType -> dense, length -> 2, values -> List(0.49489468996619035, 0.5051053100338097))",1.0
04d789e4-c157-4cbd-82bc-af81be3ed397,1,7.21373516639187,63.5,110,63.5,1,0,False,Low-End,Email,"Map(vectorType -> dense, length -> 5, values -> List(1.0, 7.21373516639187, 63.5, 110.0, 63.5))",1.0,"Map(vectorType -> sparse, length -> 1, indices -> List(), values -> List())","Map(vectorType -> dense, length -> 5, values -> List(0.0, 0.12437474424813569, 0.0013881506598712476, 0.5689655172413793, 0.044786404981500674))","Map(vectorType -> dense, length -> 7, values -> List(0.0, 0.12437474424813569, 0.0013881506598712476, 0.5689655172413793, 0.044786404981500674, 0.0, 0.0))","Map(vectorType -> dense, length -> 2, values -> List(2.2227960916128744, -2.2227960916128744))","Map(vectorType -> dense, length -> 2, values -> List(0.9022780112740493, 0.09772198872595073))",0.0


In [0]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
 
evaluator1 = BinaryClassificationEvaluator(
    labelCol="target",
    rawPredictionCol="rawPrediction",
    metricName="areaUnderROC",
)
 
metric2 = evaluator1.evaluate(fitted_data2)
print(f"Area under ROC = {metric2} ")

Area under ROC = 0.7757881395012022 


In [0]:
print(Dt.explainParams())

cacheNodeIds: If false, the algorithm will pass trees to executors to match instances with nodes. If true, the algorithm will cache node IDs for each instance. Caching can speed up training of deeper trees. Users can set how often should the cache be checkpointed or disable it by setting checkpointInterval. (default: False)
checkpointInterval: set checkpoint interval (>= 1) or disable checkpoint (-1). E.g. 10 means that the cache will get checkpointed every 10 iterations. Note: this setting will be ignored if the checkpoint directory is not set in the SparkContext. (default: 10)
featuresCol: features column name. (default: features, current: features)
impurity: Criterion used for information gain calculation (case-insensitive). Supported options: entropy, gini (default: gini)
labelCol: label column name. (default: label, current: target)
leafCol: Leaf indices column name. Predicted leaf index of each instance in each tree by preorder. (default: )
maxBins: Max number of bins for discret

In [0]:
paramGrid = (
    ParamGridBuilder()
    .addGrid(Dt.minInstancesPerNode, [1 , 2 , 3])
    .addGrid(Dt.labelCol, ['target'])
    .addGrid(Dt.maxBins, [32])
    .addGrid(Dt.maxDepth, [3, 4 , 5 ])
    .addGrid(Dt.minWeightFractionPerNode, [0.1 , 0.2])
    .build()
)

In [0]:
mlflow.pyspark.ml.autolog()
 
mlflow.start_run(nested = True)
cv_DT = CrossValidator(
    estimator=pipe1,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator,
    numFolds=4
)
 
cv_model_DT = cv.fit(train_data)

In [0]:
mlflow.spark.log_model(cv_model.bestModel, "model-file")# logs model as artifacts
mlflow.end_run()

print(cv_model.avgMetrics)

In [0]:
fitted_test_data1 = best_model.transform(test_data)

train_metric2 = evaluator.evaluate(fitted_data)
test_metric2 = evaluator.evaluate(fitted_test_data1)

print(f"Area under ROC on TRAIN= {train_metric2}")
print(f"Area under ROC on TEST= {test_metric2}")

#### Random Forest

In [0]:
from pyspark.ml.feature import Imputer, VectorAssembler, StringIndexer, OneHotEncoder, MinMaxScaler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml import Pipeline

impute = Imputer(inputCols=['frequency', 'avg_time_to_reorder'], outputCols=['frequency', 'avg_time_to_reorder'])
assemble = VectorAssembler(inputCols=['frequency', 'avg_time_to_reorder', 'Monetary', 'Recency','AVG_Basket'], outputCol='continuous_features')
index = StringIndexer(inputCols=['device_type'], outputCols=['device_type_idx'])
one_hot = OneHotEncoder(inputCol='device_type_idx', outputCol='device_type_vector')
scale = MinMaxScaler(inputCol='continuous_features', outputCol='scaled_continuous_features')
final_assemble = VectorAssembler(inputCols=['scaled_continuous_features', 'device_type_vector', 'is_referee'], outputCol='features')
rf = RandomForestClassifier (featuresCol="features", labelCol="target", predictionCol="prediction")

pipe2 = Pipeline()
pipe2.setStages(
    [
        impute,
        assemble,
        index,
        one_hot,
        scale,
        final_assemble,
        lr
    ]
)

Out[82]: Pipeline_edb53d8c5382

In [0]:
pipe2_model = pipe2.fit(train_data)



In [0]:
fitted_data3 = pipe2_model.transform(train_data)
fitted_data3.display()

customer_id,frequency,avg_time_to_reorder,Monetary,Recency,AVG_Basket,View_list3,target,is_referee,device_type,install_origin,continuous_features,device_type_idx,device_type_vector,scaled_continuous_features,features,rawPrediction,probability,prediction
e7f524b3-c4a7-4d17-acc9-f147377d5187,1,24.0,63.79999923706055,85,63.79999923706055,3,0,True,Low-End,Email,"Map(vectorType -> dense, length -> 5, values -> List(1.0, 24.0, 63.79999923706055, 85.0, 63.79999923706055))",1.0,"Map(vectorType -> sparse, length -> 1, indices -> List(), values -> List())","Map(vectorType -> dense, length -> 5, values -> List(0.0, 0.41379310344827586, 0.0013952693631000762, 0.13793103448275862, 0.04501607826910982))","Map(vectorType -> dense, length -> 7, values -> List(0.0, 0.41379310344827586, 0.0013952693631000762, 0.13793103448275862, 0.04501607826910982, 0.0, 1.0))","Map(vectorType -> dense, length -> 2, values -> List(3.228357090601702, -3.228357090601702))","Map(vectorType -> dense, length -> 2, values -> List(0.9618875697621783, 0.03811243023782174))",0.0
10bb2f5f-26c8-43b2-8ce5-5cbd91640754,23,1.3,6935.400054931641,106,301.5391328231148,6,0,True,High-End,Meta,"Map(vectorType -> dense, length -> 5, values -> List(23.0, 1.3, 6935.400054931641, 106.0, 301.5391328231148))",0.0,"Map(vectorType -> sparse, length -> 1, indices -> List(0), values -> List(1.0))","Map(vectorType -> dense, length -> 5, values -> List(0.44, 0.022413793103448276, 0.16445195571666818, 0.5, 0.22702430248682118))","Map(vectorType -> dense, length -> 7, values -> List(0.44, 0.022413793103448276, 0.16445195571666818, 0.5, 0.22702430248682118, 1.0, 1.0))","Map(vectorType -> dense, length -> 2, values -> List(-0.7577356862747595, 0.7577356862747595))","Map(vectorType -> dense, length -> 2, values -> List(0.31913807497501756, 0.6808619250249824))",1.0
c0123e58-37f4-4e5b-91ea-d77c6bd3b1c8,3,7.7,179.9999961853027,107,59.99999872843424,1,0,True,High-End,Email,"Map(vectorType -> dense, length -> 5, values -> List(3.0, 7.7, 179.99999618530273, 107.0, 59.99999872843424))",0.0,"Map(vectorType -> sparse, length -> 1, indices -> List(0), values -> List(1.0))","Map(vectorType -> dense, length -> 5, values -> List(0.04, 0.13275862068965516, 0.004152587353540065, 0.5172413793103449, 0.0421068755048492))","Map(vectorType -> dense, length -> 7, values -> List(0.04, 0.13275862068965516, 0.004152587353540065, 0.5172413793103449, 0.0421068755048492, 1.0, 1.0))","Map(vectorType -> dense, length -> 2, values -> List(1.0546341266169166, -1.0546341266169166))","Map(vectorType -> dense, length -> 2, values -> List(0.7416637852964462, 0.25833621470355383))",0.0
6bafd181-e051-450c-8e9e-a5004da8ad9f,6,5.7,275.0,96,45.833333333333336,2,1,False,High-End,SMS,"Map(vectorType -> dense, length -> 5, values -> List(6.0, 5.7, 275.0, 96.0, 45.833333333333336))",0.0,"Map(vectorType -> sparse, length -> 1, indices -> List(0), values -> List(1.0))","Map(vectorType -> dense, length -> 5, values -> List(0.1, 0.09827586206896552, 0.0064068491994057584, 0.3275862068965517, 0.03126116587027825))","Map(vectorType -> dense, length -> 7, values -> List(0.1, 0.09827586206896552, 0.0064068491994057584, 0.3275862068965517, 0.03126116587027825, 1.0, 0.0))","Map(vectorType -> dense, length -> 2, values -> List(0.3137760453242109, -0.3137760453242109))","Map(vectorType -> dense, length -> 2, values -> List(0.5778066823586642, 0.42219331764133583))",0.0
24ca14ba-1601-4d5b-a800-2d5d59e2346c,1,7.21373516639187,108.6999969482422,135,108.6999969482422,1,0,True,High-End,Meta,"Map(vectorType -> dense, length -> 5, values -> List(1.0, 7.21373516639187, 108.69999694824219, 135.0, 108.69999694824219))",0.0,"Map(vectorType -> sparse, length -> 1, indices -> List(0), values -> List(1.0))","Map(vectorType -> dense, length -> 5, values -> List(0.0, 0.12437474424813569, 0.0024607046015786113, 1.0, 0.07939059931460442))","Map(vectorType -> dense, length -> 7, values -> List(0.0, 0.12437474424813569, 0.0024607046015786113, 1.0, 0.07939059931460442, 1.0, 1.0))","Map(vectorType -> dense, length -> 2, values -> List(1.0853790072664773, -1.0853790072664773))","Map(vectorType -> dense, length -> 2, values -> List(0.7475105602019129, 0.25248943979808713))",0.0
88116c72-c703-4a30-abc0-6febf27d920b,1,7.21373516639187,102.5999984741211,134,102.5999984741211,4,0,False,High-End,Organic,"Map(vectorType -> dense, length -> 5, values -> List(1.0, 7.21373516639187, 102.5999984741211, 134.0, 102.5999984741211))",0.0,"Map(vectorType -> sparse, length -> 1, indices -> List(0), values -> List(1.0))","Map(vectorType -> dense, length -> 5, values -> List(0.0, 0.12437474424813569, 0.002315957304021948, 0.9827586206896551, 0.07472056509155274))","Map(vectorType -> dense, length -> 7, values -> List(0.0, 0.12437474424813569, 0.002315957304021948, 0.9827586206896551, 0.07472056509155274, 1.0, 0.0))","Map(vectorType -> dense, length -> 2, values -> List(0.5505857104870109, -0.5505857104870109))","Map(vectorType -> dense, length -> 2, values -> List(0.634271469641366, 0.36572853035863395))",0.0
5e1c3921-8994-452c-8be5-33bdd6f94f32,2,12.0,222.9000015258789,134,111.45000076293944,2,0,False,Low-End,Organic,"Map(vectorType -> dense, length -> 5, values -> List(2.0, 12.0, 222.9000015258789, 134.0, 111.45000076293945))",1.0,"Map(vectorType -> sparse, length -> 1, indices -> List(), values -> List())","Map(vectorType -> dense, length -> 5, values -> List(0.02, 0.20689655172413793, 0.005170564630839225, 0.9827586206896551, 0.08149594605897542))","Map(vectorType -> dense, length -> 7, values -> List(0.02, 0.20689655172413793, 0.005170564630839225, 0.9827586206896551, 0.08149594605897542, 0.0, 0.0))","Map(vectorType -> dense, length -> 2, values -> List(2.085491800624062, -2.085491800624062))","Map(vectorType -> dense, length -> 2, values -> List(0.8894850408361961, 0.11051495916380394))",0.0
28459ec2-c117-4cea-a436-f04b4fa4eeb1,12,3.2,1228.9000091552734,104,102.40833409627278,3,1,False,Low-End,Meta,"Map(vectorType -> dense, length -> 5, values -> List(12.0, 3.2, 1228.9000091552734, 104.0, 102.40833409627278))",1.0,"Map(vectorType -> sparse, length -> 1, indices -> List(), values -> List())","Map(vectorType -> dense, length -> 5, values -> List(0.22, 0.05517241379310345, 0.029042010347441347, 0.46551724137931033, 0.07457383075912809))","Map(vectorType -> dense, length -> 7, values -> List(0.22, 0.05517241379310345, 0.029042010347441347, 0.46551724137931033, 0.07457383075912809, 0.0, 0.0))","Map(vectorType -> dense, length -> 2, values -> List(1.3064986269621721, -1.3064986269621721))","Map(vectorType -> dense, length -> 2, values -> List(0.7869266596491782, 0.21307334035082182))",0.0
6dda0e0a-c6a0-439d-8d4f-bf6313220f48,17,1.7,17383.000045776367,135,1022.5294144574334,2,1,False,High-End,SMS,"Map(vectorType -> dense, length -> 5, values -> List(17.0, 1.7, 17383.000045776367, 135.0, 1022.5294144574334))",0.0,"Map(vectorType -> sparse, length -> 1, indices -> List(0), values -> List(1.0))","Map(vectorType -> dense, length -> 5, values -> List(0.32, 0.029310344827586206, 0.4123637988168724, 1.0, 0.7789997339569207))","Map(vectorType -> dense, length -> 7, values -> List(0.32, 0.029310344827586206, 0.4123637988168724, 1.0, 0.7789997339569207, 1.0, 0.0))","Map(vectorType -> dense, length -> 2, values -> List(-0.13323882496300854, 0.13323882496300854))","Map(vectorType -> dense, length -> 2, values -> List(0.4667394842164816, 0.5332605157835184))",1.0
614dcd4a-f40d-436d-a6b1-51dfc7ac539e,14,1.8,815.0999908447266,133,58.22142791748047,1,1,False,High-End,Meta,"Map(vectorType -> dense, length -> 5, values -> List(14.0, 1.8, 815.0999908447266, 133.0, 58.22142791748047))",0.0,"Map(vectorType -> sparse, length -> 1, indices -> List(0), values -> List(1.0))","Map(vectorType -> dense, length -> 5, values -> List(0.26, 0.03103448275862069, 0.01922292028808203, 0.9655172413793103, 0.04074523802403464))","Map(vectorType -> dense, length -> 7, values -> List(0.26, 0.03103448275862069, 0.01922292028808203, 0.9655172413793103, 0.04074523802403464, 1.0, 0.0))","Map(vectorType -> dense, length -> 2, values -> List(-0.8084964778940553, 0.8084964778940553))","Map(vectorType -> dense, length -> 2, values -> List(0.30821097965367084, 0.6917890203463292))",1.0


In [0]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
 
evaluator2 = BinaryClassificationEvaluator(
    labelCol="target",
    rawPredictionCol="rawPrediction",
    metricName="areaUnderROC",
)
 
metric3 = evaluator2.evaluate(fitted_data3)
print(f"Area under ROC = {metric3} ")

Area under ROC = 0.7757874843656793 


In [0]:
print(rf.explainParams())

bootstrap: Whether bootstrap samples are used when building trees. (default: True)
cacheNodeIds: If false, the algorithm will pass trees to executors to match instances with nodes. If true, the algorithm will cache node IDs for each instance. Caching can speed up training of deeper trees. Users can set how often should the cache be checkpointed or disable it by setting checkpointInterval. (default: False)
checkpointInterval: set checkpoint interval (>= 1) or disable checkpoint (-1). E.g. 10 means that the cache will get checkpointed every 10 iterations. Note: this setting will be ignored if the checkpoint directory is not set in the SparkContext. (default: 10)
featureSubsetStrategy: The number of features to consider for splits at each tree node. Supported options: 'auto' (choose automatically for task: If numTrees == 1, set to 'all'. If numTrees > 1 (forest), set to 'sqrt' for classification and to 'onethird' for regression), 'all' (use all features), 'onethird' (use 1/3 of the featur

In [0]:
paramGrid = (
    ParamGridBuilder()
    .addGrid(rf.featureSubsetStrategy, ['all'])
    .addGrid(rf.impurity, ['gini'])
    .addGrid(rf.maxBins, [32])
    .addGrid(rf.maxDepth, [6, 7 , 8 ])
    .addGrid(rf.rawPredictionCol, ['rawPrediction'])
    .addGrid(rf.subsamplingRate, [0.1, 0.3 , 0.6, 1])
    .build()
)

In [0]:
mlflow.pyspark.ml.autolog()
 
mlflow.start_run(nested = True)
cv_DT = CrossValidator(
    estimator=pipe1,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator,
    numFolds=4
)
 
cv_model_DT = cv.fit(train_data)

In [0]:
mlflow.spark.log_model(cv_model.bestModel, "model-file")# logs model as artifacts
mlflow.end_run()

print(cv_model.avgMetrics)

In [0]:
fitted_test_data1 = best_model.transform(test_data)

train_metric2 = evaluator.evaluate(fitted_data)
test_metric2 = evaluator.evaluate(fitted_test_data1)

print(f"Area under ROC on TRAIN= {train_metric2}")
print(f"Area under ROC on TEST= {test_metric2}")