In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import types as tp
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
from pyspark.ml.classification import LogisticRegression, DecisionTreeClassifier, RandomForestClassifier
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml import Pipeline
import pandas as pd
import matplotlib.pyplot as plt

In [2]:
# Build (or reuse) the Spark session.
# The master URL is set through builder.master(); the previous
# config("master", "yarn") call stored a key named "master", which is not the
# "spark.master" setting, so the YARN request had no effect.
# NOTE(review): this now really targets YARN -- confirm the cluster
# configuration is available where this notebook runs.
spark = (
    SparkSession.builder
    .master("yarn")
    .config("spark.sql.shuffle.partitions", "200")
    .config("spark.executor.memory", "4g")
    .getOrCreate()
)

In [3]:
# Show the session object as the cell output to confirm it was created.
spark

In [4]:
# Load the three splits; the first CSV row is the header and column types are
# inferred separately for each file.
train = spark.read.csv(
    path="dataset/ml_project/train.csv",
    header=True,
    inferSchema=True,
)
test = spark.read.csv(
    path="dataset/ml_project/test.csv",
    header=True,
    inferSchema=True,
)
valid = spark.read.csv(
    path="dataset/ml_project/valid.csv",
    header=True,
    inferSchema=True,
)

## Variable Identification - numeric/categorical/temporal/boolean

In [5]:
# Print the inferred column names and types of the training DataFrame.
train.printSchema()

root
 |-- ID: integer (nullable = true)
 |-- Country: string (nullable = true)
 |-- Carrier: double (nullable = true)
 |-- TrafficType: string (nullable = true)
 |-- ClickDate: string (nullable = true)
 |-- Device: string (nullable = true)
 |-- Browser: string (nullable = true)
 |-- OS: string (nullable = true)
 |-- ConversionStatus: boolean (nullable = true)
 |-- publisherId: string (nullable = true)
 |-- advertiserCampaignId: double (nullable = true)
 |-- Fraud: double (nullable = true)



### Type casting target variable in test, train and validation data

In [6]:
# Cast the boolean target column to an integer column in every split.
train = train.withColumn(
    "ConversionStatus", train["ConversionStatus"].cast(tp.IntegerType())
)
test = test.withColumn(
    "ConversionStatus", test["ConversionStatus"].cast(tp.IntegerType())
)
valid = valid.withColumn(
    "ConversionStatus", valid["ConversionStatus"].cast(tp.IntegerType())
)

PIPELINE STEPS:
stage1 : Transformer - fill na values in each column 

stage2: Transform - Reduce categories by udf functions 

stage3: Estimator - Label Encode Country 

stage4: Estimator - Label Encode TrafficType 

stage5: Estimator - Label Encode OS 

stage6: Estimator - Label Encode Browser 

stage7: Estimator - OneHotEncode Country, Browser, OS, TrafficType (Device is already mapped to 0/1) 

stage8: Transform - Create columns total clicks per publisher-id and total clicks per advertiser campaign-id 

stage9: Transform - Create Vector with OS_ohe, Browser_ohe, Country_ohe, TrafficType_ohe, Device, Fraud, total 
clicks per publisher-id and total clicks per advertiser campaign-id as feature_vector 

stage10: Estimator - Predict labels using DecisionTreeClassifier

### Count the number of missing values in each column

In [7]:
from pyspark.ml import Transformer
class fillNaValues(Transformer):
    """Pipeline stage that replaces nulls in selected columns with fixed defaults.

    Parameters
    ----------
    x : optional
        Stored on the instance as ``dataset``; ``_transform`` does not read it.
    """

    def __init__(self, x=None):
        # Run the base-class initialiser so the stage gets the uid/param state
        # PySpark sets up there (relied on when a stage is copied or inspected).
        super(fillNaValues, self).__init__()
        self.dataset = x

    def _transform(self, dataset):
        """Return ``dataset`` with nulls in the listed columns filled in."""
        defaults = {
            "Country": "IN",
            "TrafficType": "U",
            "Device": "Generic",
            "OS": "Android",
            "Fraud": 0,
            "Browser": "chrome",
        }
        return dataset.fillna(defaults)

In [8]:
def countries_mapping(x):
    """Return ``x`` when it is one of the 20 listed country codes, else "others"."""
    kept_codes = [
        'IN', 'TH', 'ID', 'BD', 'MX', 'BR', 'RU', 'NG', 'MY', 'US',
        'BO', 'PH', 'ZA', 'VE', 'GT', 'DZ', 'KR', 'CO', 'IQ', 'AE',
    ]
    return x if x in kept_codes else "others"
# Spark UDF wrapper around countries_mapping; yields a string column.
udf_country = F.udf(countries_mapping, tp.StringType())

In [9]:
def device_mapping(x):
    """Return 0 for the "Generic" device and 1 for every other value."""
    return 0 if x in ("Generic",) else 1
# Spark UDF wrapper around device_mapping; yields an integer column.
udf_device = F.udf(device_mapping, tp.IntegerType())

# Browser 
* chrome : convert android_webkit, chrome, 46.0.2490.76(chrome version) and chromium
* safari : iphone, safari
* firefox : firefox_mobile, firefox, firefox_desktop
* others : rest of them

In [10]:
def browser_mapping(x):
    """Collapse a raw browser value into "chrome", "safari", "firefox" or "others".

    Matching ignores letter case, so "Chromium" and "chromium" are treated the
    same. Non-string input (including None) falls through to "others".
    """
    # Only strings can be case-folded; anything else is compared as-is.
    key = x.lower() if isinstance(x, str) else x
    if key in ("android_webkit", "chrome", "46.0.2490.76", "chromium"):
        return "chrome"
    if key in ("iphone", "safari"):
        return "safari"
    # "firefox_desktop" was previously misspelled as "firefoc_desktop".
    if key in ("firefox_mobile", "firefox", "firefox_desktop"):
        return "firefox"
    return "others"
# Spark UDF wrapper around browser_mapping; yields a string column.
udf_browser = F.udf(browser_mapping, tp.StringType())

## OS - 15 categories
* Android: Android
* ios : Mac OS X , IOS
* others : rest of categories

In [11]:
def os_mapping(x):
    """Collapse a raw OS value into "Android", "ios" or "Others"."""
    if x in ("Mac OS X", "IOS"):
        return "ios"
    return "Android" if x in ("Android",) else "Others"
# Spark UDF wrapper around os_mapping; yields a string column.
udf_os = F.udf(os_mapping, tp.StringType())

In [12]:
class reduceCategories(Transformer):
    """Pipeline stage that rewrites Country, OS, Browser and Device via the module-level UDFs.

    Parameters
    ----------
    dataframe : optional
        Stored on the instance; ``_transform`` does not read it.
    """

    def __init__(self, dataframe=None):
        # Run the base-class initialiser so the stage gets the uid/param state
        # PySpark sets up there (relied on when a stage is copied or inspected).
        super(reduceCategories, self).__init__()
        self.dataframe = dataframe

    def _transform(self, dataset):
        """Return ``dataset`` with the four columns replaced by their mapped values."""
        dataset = dataset.withColumn("Country", udf_country(dataset["Country"]))
        dataset = dataset.withColumn("OS", udf_os(dataset["OS"]))
        dataset = dataset.withColumn("Browser", udf_browser(dataset["Browser"]))

        # Device is mapped straight to 0/1, so it needs no label or one-hot encoding.
        dataset = dataset.withColumn("Device", udf_device(dataset["Device"]))
        return dataset

## PublisherId 2000 distinct categories so --> PublisherId/frequency
### groupby publisherId and count on ConversionStatus as total_p_id
### Join this dataframe with original train df
*** This is a numeric column with a fixed number of distinct values, which is why this transformation is applied

In [13]:
# Per-key counts of non-null ConversionStatus rows in the training data.
total_p_id = train.groupBy("PublisherId").agg(
    F.count(F.col("ConversionStatus")).alias("pub-id")
)
total_c_id = train.groupBy("advertiserCampaignId").agg(
    F.count(F.col("ConversionStatus")).alias("camp-id")
)

In [14]:
class frequencyEncoding(Transformer):
    """Pipeline stage that attaches the "camp-id" and "pub-id" count columns.

    The counts come from the module-level ``total_c_id`` and ``total_p_id``
    DataFrames, which must exist before the stage is run.

    Parameters
    ----------
    dataframe : optional
        Stored on the instance; ``_transform`` does not read it.
    """

    def __init__(self, dataframe=None):
        # Run the base-class initialiser so the stage gets the uid/param state
        # PySpark sets up there (relied on when a stage is copied or inspected).
        super(frequencyEncoding, self).__init__()
        self.dataframe = dataframe

    def _transform(self, dataset):
        """Return ``dataset`` with both count columns added and missing counts set to 0."""
        # Left joins keep every input row. The default inner join dropped rows
        # whose key was null or absent from the count tables, which also made
        # the fillna below unreachable.
        dataset = dataset.join(total_c_id, on="advertiserCampaignId", how="left")
        dataset = dataset.join(total_p_id, on="PublisherId", how="left")

        # Keys without a match get a count of 0.
        dataset = dataset.fillna({'pub-id': 0,
                                  'camp-id': 0})
        return dataset

In [15]:
# Custom transformers: null filling, then category reduction.
stage1 = fillNaValues()
stage2 = reduceCategories()

# String-index each categorical column into a "*_le" column.
stage3 = StringIndexer(inputCol="Country", outputCol="Country_le")
stage4 = StringIndexer(inputCol="TrafficType", outputCol="TrafficType_le")
stage5 = StringIndexer(inputCol="OS", outputCol="OS_le")
stage6 = StringIndexer(inputCol="Browser", outputCol="Browser_le")

# One-hot encode the four indexed columns into "*_ohe" columns.
stage7 = OneHotEncoder(
    inputCols=["Country_le", "Browser_le", "OS_le", "TrafficType_le"],
    outputCols=["Country_ohe", "Browser_ohe", "OS_ohe", "TrafficType_ohe"],
)

# Add the per-publisher and per-campaign count columns.
stage8 = frequencyEncoding()

# Assemble all model inputs into a single vector column.
stage9 = VectorAssembler(
    inputCols=[
        "Country_ohe",
        "Browser_ohe",
        "OS_ohe",
        "TrafficType_ohe",
        "Device",
        "Fraud",
        "pub-id",
        "camp-id",
    ],
    outputCol="feature_vector",
)

# Classifier trained on the assembled vector against the integer target.
stage10 = DecisionTreeClassifier(
    featuresCol="feature_vector",
    labelCol="ConversionStatus",
)

# Pipeline

In [17]:
# Chain the ten stages in order and fit the whole pipeline on the training split.
pipeline = Pipeline(
    stages=[
        stage1, stage2, stage3, stage4, stage5,
        stage6, stage7, stage8, stage9, stage10,
    ]
)

pipeline_model = pipeline.fit(train)


In [18]:
# Despite the name, this holds the result of applying the fitted pipeline to
# `train` (the data later passed to the evaluator), not a model object.
final_model = pipeline_model.transform(train)

In [19]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
# Score the transformed training data with area under the ROC curve.
evaluator = BinaryClassificationEvaluator(
    labelCol="ConversionStatus",
    metricName="areaUnderROC",
)
evaluator.evaluate(final_model)

0.5

In [20]:
# Same metric on the test split.
evaluator.evaluate(pipeline_model.transform(test))

0.5

In [21]:
# Same metric on the validation split.
evaluator.evaluate(pipeline_model.transform(valid))

0.5