**Connect with Google Drive for loading Data.**

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


**Defining Spark Session**

In [None]:
!pip install pyspark
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.config("spark.driver.memory", "25g").\
config('spark.executor.memory', '25G').\
config('spark.driver.maxResultSize', '25G').\
appName("MapReduce-Project-Fraud-Analysis").getOrCreate()
spark.conf.set('spark.sql.pivotMaxValues', u'1000000')
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
spark.conf.set("spark.sql.inMemoryColumnarStorage.compressed", True)
spark.conf.set("spark.sql.inMemoryColumnarStorage.batchSize",10000)

spark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.1.tar.gz (281.4 MB)
[K     |████████████████████████████████| 281.4 MB 34 kB/s 
[?25hCollecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[K     |████████████████████████████████| 199 kB 55.8 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.3.1-py2.py3-none-any.whl size=281845512 sha256=92463305f8e6c3247f793f527d7f78beb8b89f1e2427d7f6813885f2328edc3e
  Stored in directory: /root/.cache/pip/wheels/43/dc/11/ec201cd671da62fa9c5cc77078235e40722170ceba231d7598
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.5 pyspark-3.3.1


## Libraries

1.   **Installing Visualization Libraries.**
2.   **Importing Spark**

In [None]:
#Libraries for Visualization purposesly
!pip install seaborn
!pip install prettytable

#Imports
from pyspark.sql.functions import row_number, count, isnan, countDistinct
from pyspark.sql.window import *
import random
import numpy as np
from functools import reduce
from pyspark.sql import DataFrame
from pyspark.sql.window import *
from pyspark.sql.window import Window
from pyspark.ml.linalg import VectorUDT
from pyspark.sql.types import ArrayType, DoubleType,FloatType
from pyspark.sql import Row, functions as F
from pyspark.ml.feature import StringIndexer, VectorAssembler,BucketedRandomProjectionLSH, VectorSlicer, VectorAssembler, StringIndexer, MinMaxScaler
from pyspark.sql.functions import col, when, lit, udf, row_number, array, create_map, struct, explode
from pyspark.ml import Pipeline
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.mllib.evaluation import BinaryClassificationMetrics
from pyspark.mllib.evaluation import MulticlassMetrics
from pyspark.ml.classification import *
from pyspark.ml.evaluation import *
from prettytable import PrettyTable

#For visualization purposes only
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


## Data Stats Functions


1.   **checkNullsInData**: Returns percentage of rows with Null against the Total Rows
2.   **checkNullPerTable**: Returns number of Null records full table.
3. **getAttributeCount** : Returns the count for each label.
4. **getCompleteSummary**: Returns the complete summary of the table.

In [None]:
# Percentage of rows with Null against the Total Rows
def checkNullsInData(data):
    # Show how many Null we have in the Dataframe
    totalRows = data.count()
    drop = data.na.drop().count()
    print("Total: ", totalRows)
    print("Left After Dropping:", drop)
    return (totalRows - drop) / totalRows * 100


#Number of Nulls full Table
def checkNullPerTable(data):
    # Show how many Null we have in the Dataframe
    print("% Of Drops: ",checkNullsInData(data))
    # this shows there are a lot of duplicacy in the data.# this shows there are a lot of duplicacy in the data.
    # Lets see the percentage: 
    print("% of drop per column")
    return data.select([(count(when(isnan(c) | col(c).isNull(), c))*100/count(lit(1))).alias(c) for c in data.columns])

# GET COUNT FOR EACH LABEL
def getAttributeCount(data, label="is_attributed"):
    print("Stats for is attributed:")
    data.groupBy(label).count().show()

# GET COMPLETE SUMMARY OF THE DATA
def getCompleteSummary(data, label="is_attributed"):
    print("Summary")
    print("_________")
    data.summary().show()
    print("_________")
    checkNullPerTable(data)
    print("_________")
    print("Unique Values for each column in the table")
    data.agg(*(countDistinct(col(c)).alias(c) for c in data.columns)).show()
    print("_________")
    print("Number of values in is_attributed for each label.")
    print("_________")
    getAttributeCount(data, label)
    print("_________")

## Utility Functions
1. **getFeaturesData**: For returning Vectorized features and label. It also drops exta coulmns if required.
2. **findImbalance** : It is used for finding imbalance ratio between the two labels and returns the data Not Fraud, data Fraud, ratio
3. **vectorizeData** : Returns Vector Assembled feature by merging all feature columns.
4. **stratifiedTrainTestSplit**: It is used for splitting the sampled dataset randomly in 80:20 ratio where 80 is for Training and 20 is for Testing 

In [None]:
def getFeaturesData(
    data, inputColumnsList=["ip", "app", "device", "os", "channel"], drop=False
):
    va = VectorAssembler(inputCols=inputColumnsList, outputCol="features")
    transformedData = va.transform(data)
    if drop:
        return (
            va.transform(data)
            .drop(*inputColumnsList)
            .withColumnRenamed("is_attributed", "label")
        )
    return va.transform(data)

#It is used for finding imbalance ratio between the two labels and returns the data Not Fraud, data Fraud, ratio
def findImbalance(data):
    dataNotFraud = data.filter(col("is_attributed") == 0)
    dataFraud = data.filter(col("is_attributed") == 1)
    countFraud = dataFraud.count()
    countNotFraud = dataNotFraud.count()
    ratio = int(countNotFraud / countFraud)
    print(
        "Count Fraud: {}\nCount Not Fraud: {}\nRatio: {}".format(
            countFraud, countNotFraud, ratio
        )
    )
    return dataNotFraud, dataFraud, ratio


#
def vectorizeData(data, NumericColumns, targetColumn):
    if data.select(targetColumn).distinct().count() != 2:
        raise ValueError("Target col must have exactly 2 classes")
    if targetColumn in NumericColumns:
        NumericColumns.remove(targetColumn)
    assembler = VectorAssembler(inputCols=NumericColumns, outputCol="features")
    vectorizedData = assembler.transform(data)
    keepColumns = [a for a in vectorizedData.columns if a not in NumericColumns]
    return (
        vectorizedData.select(*keepColumns)
        .withColumn("label", vectorizedData[targetColumn])
        .drop(targetColumn)
    )



#It is used for splitting the sampled dataset randomly in 80:20 ratio where 80 is for Training and 20 is for Testing
def stratifiedTrainTestSplit(data, ifprint=False):
    print("\n-----TRAIN TEST SPLIT STARTED----")
    dataNotFraud, dataFraud, ratio= findImbalance(data)
    dataNotFraudTrain,dataNotFraudTest=dataNotFraud.randomSplit([0.8, 0.2])
    dataFraudTrain,dataFraudTest=dataFraud.randomSplit([0.8, 0.2])
    train = dataNotFraudTrain.union(dataFraudTrain)
    test = dataFraudTest.union(dataNotFraudTest)
    if print:
        print("\n----SAMPLES IN TRAIN----")
        dataNotFraud, dataFraud, ratio= findImbalance(train)
        print("\n----SAMPLES IN TEST-----")
        dataNotFraud, dataFraud, ratio= findImbalance(test)
    return train , test


## Sampling Functions
1. **randomOverSample**: It takes the ratio and does random over sampling of the lower count label to match as the higher count label as per the ratio and returns the vectorized data.
2. **randomUnderSamplingWithoutTransformation**:  It takes the ratio and does random over sampling of the lower count label to match as the higher count label as per the ratio and returns the data.
3. **randomUnderSamplingStratified**: It is used for doing undersampling in a stratified way keeping percentage of labels as per rates r1 and r2, It returns the combined data.
4. **randomUnderSampling**:  It takes the ratio and does random under sampling of the decrease the higher count label to match as per the ratio.
5. **randomUnderSamplingWithoutTransformation**: It takes the ratio and does random over sampling of the lower count label to match as per the ratio to the higher count label and returns the data.
6. **randomUnderSamplingStratified** : It is used for undersampled in a stratified way keeping percentage of labels as per rates r1 and r2, It returns the combined data.
7. **randomUnderSampling** : It takes the ratio and does random under sampling of the decrease the higher count label to match as per the ratio.
8. **completeOverSampling** : It duplicates the minority class records to match the passed ratio.

In [None]:
# It takes the ratio and does random over sampling of the lower count label to match as the higher count label as per the ratio.
def randomOverSample(dataNotFraud, dataFraud, ratio):
    dataFraud = dataFraud.sample(True, float(ratio), 24)
    totalData = dataFraud.unionAll(dataNotFraud)
    return getFeaturesData(totalData, drop=True)

#It takes the ratio and does random over sampling of the lower count label to match as per the ratio to the higher count label and returns the data.
def randomUnderSamplingWithoutTransformation(dataNotFraud, dataFraud, ratio):
    dataNotFraud = dataNotFraud.sample(False, 1 / ratio, 24)
    return dataNotFraud.unionAll(dataFraud)

# It is used for undersampled in a stratified way keeping percentage of labels as per rates r1 and r2, It returns the combined data.
def randomUnderSamplingStratified(data, r1=0.1, r2=0.4):
    dataNotFraudSampled = data.filter(col("is_attributed") == 0).sample(False, r1)
    dataFraudSampled = data.filter(col("is_attributed") == 1).sample(False, r2)
    out = dataNotFraudSampled.union(dataFraudSampled)
    return out

# It takes the ratio and does random under sampling of the decrease the higher count label to match as per the ratio.
def randomUnderSampling(dataNotFraud, dataFraud, ratio):
    dataNotFraud = dataNotFraud.sample(False, 1 / ratio, 24)
    totalData = dataNotFraud.unionAll(dataFraud)
    return getFeaturesData(totalData, drop=True)


def completeOverSampling(dataNotFraud, dataFraud, ratio):
    a = range(ratio)
    # duplicate the minority rows
    oversampledData = dataFraud.withColumn(
        "test", explode(array([lit(x) for x in a]))
    ).drop("test")
    # combine both oversampled minority rows and previous majority rows combined_df = major_df.unionAll(oversampled_df)
    totalData = dataNotFraud.unionAll(oversampledData)
    return getFeaturesData(totalData, drop=True)



## SMOTE: Synthetic Minority Over-sampling Technique Implementation

1. **checkValidityOfColumnsCheck**: Checking validity of functions, if all columns are correctly type identified.
2. **getNumericCategoricalColumns**: Returns the lists of numerical and string columns.
3. **smote**: Used above mentioned utlity functions in implementing custom function for SMOTE 

In [None]:
# Utlity functions of SMOTE


#Checking validity of functions, if all columns are correctly type identified.
def checkValidityOfColumnsCheck(allColumns, data):
    if len(set(allColumns)) == len(data.columns):
        print("All columns are been covered.")
    elif len(set(allColumns)) < len(data.columns):
        not_handle_list = list(set(data.columns) - set(allColumns))
        print(
            "Not all columns are covered,The columns missed out: {0}".format(
                not_handle_list
            )
        )
    else:
        mistake_list = list(set(allColumns) - set(data.columns))
        print("The columns been hardcoded wrongly: {0}".format(mistake_list))


#Returns the lists of numerical and string columns.
def getNumericCategoricalColumns(data, excludedList=[]):
    timestampColumns = [
        item[0] for item in data.dtypes if item[1].lower().startswith(("time", "date"))
    ]
    stringColumns = [
        item[0]
        for item in data.dtypes
        if item[1].lower().startswith("string")
        and item[0] not in excludedList + timestampColumns
    ]
    numericColumns = [
        item[0]
        for item in data.dtypes
        if item[1].lower().startswith(("big", "dec", "doub", "int", "float"))
        and item[0] not in excludedList + timestampColumns
    ]
    allColumns = timestampColumns + stringColumns + numericColumns + excludedList
    checkValidityOfColumnsCheck(allColumns, data)
    return numericColumns, stringColumns



# Synthetic Minority Over-sampling Technique Implementation 
def smote(dataInit, seed, bucketLength, k, multiplier):
    NumericColumns, CatColumns = getNumericCategoricalColumns(dataInit)
    data = vectorizeData(dataInit, NumericColumns, targetColumn="is_attributed")
    dataInputFraud = data[data["label"] == 1]

    # LSH, bucketed random projection
    bucketedRandomProjection = BucketedRandomProjectionLSH(
        inputCol="features", outputCol="hashes", seed=seed, bucketLength=bucketLength
    )
    # smote only applies on existing minority instances
    model = bucketedRandomProjection.fit(dataInputFraud)
    model.transform(dataInputFraud)

    # here distance is calculated from bucketedRandomProjection's param inputCol
    selfJoinWithDistance = model.approxSimilarityJoin(
        dataInputFraud, dataInputFraud, float("inf"), distCol="EuclideanDistance"
    )
    # remove self-comparison (distance 0)
    selfJoinWithDistance = selfJoinWithDistance.filter(
        selfJoinWithDistance.EuclideanDistance > 0
    )
    overOriginalRows = Window.partitionBy("datasetA").orderBy("EuclideanDistance")
    selfSimilarity = selfJoinWithDistance.withColumn(
        "r_num", F.row_number().over(overOriginalRows)
    )
    selfSimilaritySelected = selfSimilarity.filter(selfSimilarity.r_num <= k)
    overOriginalRowsNoOrder = Window.partitionBy("datasetA")

    # list to store batches of synthetic data
    res = []
    # two udf for vector add and subtract, subtraction include a random factor [0,1]
    subtractVectorUDF = F.udf(
        lambda arr: random.uniform(0, 1) * (arr[0] - arr[1]), VectorUDT()
    )
    addVectorUDF = F.udf(lambda arr: arr[0] + arr[1], VectorUDT())

    # retain original columns
    originalColumns = dataInputFraud.columns
    print("Generating New Samples")
    for i in range(multiplier):
        # logic to randomly select neighbour: pick the largest random number generated row as the neighbour
        randomSelectedData = (
            selfSimilaritySelected.withColumn("rand", F.rand())
            .withColumn("max_rand", F.max("rand").over(overOriginalRowsNoOrder))
            .where(F.col("rand") == F.col("max_rand"))
            .drop(*["max_rand", "rand", "r_num"])
        )
        # create synthetic feature numerical part
        vecDiff = randomSelectedData.select(
            "*",
            subtractVectorUDF(F.array("datasetA.features", "datasetB.features")).alias(
                "vecdiff"
            ),
        )
        vecModified = vecDiff.select(
            "*", addVectorUDF(F.array("datasetA.features", "vecdiff")).alias("features")
        )
        for c in originalColumns:
            # randomly select neighbour or original data
            colSubsititue = random.choice(["datasetA", "datasetB"])
            val = "{0}.{1}".format(colSubsititue, c)
            if c != "features":
                # do not unpack original numerical features
                vecModified = vecModified.withColumn(c, F.col(val))
        vecModified = vecModified.drop(
            *["datasetA", "datasetB", "vecdiff", "EuclideanDistance"]
        )
        res.append(vecModified)
    print("Samples Generation Complete.")

    unionedData = reduce(DataFrame.unionAll, res)
    # union synthetic instances with original full (both minority and majority) data
    return unionedData.union(data.select(unionedData.columns))

## Machine Learning Models Implementation

1. **LRModel** : Implements Logistic Regression with Cross Validation.
2. **randomForest** : Implements Random Forrest Classifier with cross Validation.
3. **LSVC**: Implents Linear Support Vector Machine with Cross Validation


In [None]:
#Implements Logistic Regression with Cross Validation.
def LRModel(train,test, isCV=False):
    print(">>> LRModel Invoked")
    evaluator=BinaryClassificationEvaluator(labelCol='label')
    lr = LogisticRegression(featuresCol='features',labelCol='label')
    paramGrid = (ParamGridBuilder().addGrid(lr.maxIter, [20]).build())
    fold=3
    if isCV:
        fold=10
        paramGrid = (ParamGridBuilder().addGrid(lr.regParam, [0.1, 0.01]) \
                                 .addGrid(lr.maxIter, [10,20]) \
                                 .build())
    cv = CrossValidator(estimator=lr,
                              estimatorParamMaps=paramGrid,
                              evaluator=evaluator,
                              numFolds=fold) 
    model = cv.fit(train)
    predictions = model.transform(test)
    bestModel = model.bestModel
    return {"predictions":predictions, "bestModel":bestModel}

#Implements Random Forrest Classifier with cross Validation.
def randomForest(train,test, isCV=False):
    print(">>> RandomForest Invoked")
    evaluator=BinaryClassificationEvaluator(labelCol='label')
    rf= RandomForestClassifier(featuresCol='features',labelCol='label')
    paramGrid=ParamGridBuilder().addGrid(rf.maxDepth, [20]).build()
    fold=3
    if isCV:
        fold=10
        paramGrid = (ParamGridBuilder().addGrid(rf.maxDepth, [20]) \
                                          .addGrid(rf.maxBins, [10, 20])\
                                          .addGrid(rf.numTrees, [20, 50])\
                                          .build())
    
    cv = CrossValidator(estimator=rf,
                              estimatorParamMaps=paramGrid,
                              evaluator=evaluator,
                              numFolds=fold) 
    model = cv.fit(train)
    predictions = model.transform(test)
    bestModel = model.bestModel
    return {"predictions":predictions, "bestModel":bestModel}


#Implents Linear Support Vector Machine with Cross Validation
def LSVC(train,test, isCV=False):
    print(">>> LinearSVC Invoked")
    evaluator=BinaryClassificationEvaluator(labelCol='label')
    lsvc = LinearSVC(featuresCol='features',labelCol='label')
    paramGrid=ParamGridBuilder().addGrid(lsvc.regParam, [0.01]).build()
    fold=3
    if isCV:
        fold=10
        paramGrid = (ParamGridBuilder().addGrid(lsvc.maxIter, [10, 15]) \
                                     .addGrid(lsvc.regParam, [0.1, 0.01]) \
                                     .build())
    cv = CrossValidator(estimator=lsvc,
                                  estimatorParamMaps=paramGrid,
                                  evaluator=evaluator,
                                  numFolds=fold) 
    model = cv.fit(train)
    predictions = model.transform(test)
    bestModel = model.bestModel
    return {"predictions":predictions, "bestModel":bestModel}


## Sample Data Creation

**diffSampledData** : Returns the required sampled data upon specification.

In [None]:
def diffSampledData(data, isUnderSample=False,isOverSample=False ,isSMOTE=False, ifprint=False):
    sampledData={}
    print("\n---Comparing data using various Sampling Techniques---")

    # print("\n--NO Sampling--")
    # sampledData['NO_SAMPLING_APPLIED']=getFeaturesData(data, drop=True)

    # Find each class data
    dataNotFraud, dataFraud, ratio= findImbalance(data)
    if isUnderSample:
        print("\n--Undersampling--")
        # Random UnderSample
        underSampledData=randomUnderSampling(dataNotFraud,dataFraud,ratio)
        # getAttributeCount(underSampledData,"label")
        sampledData['underSampledData']=underSampledData
        
    if isOverSample:
        print("\n--Random OverSampling--")
        #Random OverSample
        randomOverSampleddata=randomOverSample(dataNotFraud,dataFraud,int(ratio*0.75))
        # getAttributeCount(randomOverSampleddata,"label")
        sampledData['randomOverSampleddata']=randomOverSampleddata

    # print("\n--Complete OverSampling--")
    # #Complete Oversample
    # completeOversampledData=completeOverSampling(dataNotFraud,dataFraud,ratio)
    # getAttributeCount(completeOversampledData,"label")
    # sampledData['completeOversampledData']=completeOversampledData
    
    if isSMOTE:
        print("\n--SMOTE OverSampling--")
        #SMOTE
        oversampledDataSMOTE= smote(data, seed=24,bucketLength=200,k=3,multiplier=int(ratio*0.75))
        sampledData['oversampledDataSMOTE']=oversampledDataSMOTE
    
    if ifprint:
        if isSMOTE:
            print("\n--SMOTE OverSampling--")
            getAttributeCount(oversampledDataSMOTE,"label")
        if isOverSample:
            print("\n--Random OverSampling--")
            getAttributeCount(randomOverSampleddata,"label")
        # print("\n--Complete OverSampling--")
        # getAttributeCount(completeOversampledData,"label")
        if isUnderSample:
            print("\n--Undersampling--")
            getAttributeCount(underSampledData,"label")
    return sampledData

## Results 

1. **getResults**: Main method to run the specified Machine Learning models. Return Evaluation metrics. In case of Cross Validation, returns Best Model.

2. **filldetails** : Adds all the metrics from different oversampling techniques into table.

3. **printConfusionMatrix** ( ***For Visualization purposes only***): Prints confusion Matrix.

4. **otherMetrics** : Caluclates Precison, Recall, Accuracy and F1 Score.

5. **getEvalutions**: Evaluates predictions with labels and returns the metrics

In [None]:
def filldetails(analysisTable, predictions, sampling, model):
    cf_matrix, ROC, accuracy, F1, precision, recall = getEvalutions(predictions)
    print(sampling, model, ROC, accuracy, F1, precision, recall, cf_matrix)
    analysisTable.add_row([sampling, model, ROC, accuracy, F1, precision, recall, cf_matrix])
    
def getResults(sampledData, test, isLR=False, isRF=False, isLSVC=False, isCatBoost=False, isLightGBM=False,isCV=False):
    # Specify the Column Names while initializing the Table
    analysisTable = PrettyTable(["Sampling", "Model", "ROC", "accuracy", "F1", "precision", "recall", "Matrix"])
    results = {}
    testData = getFeaturesData(test, drop=True)
    testData.cache()
    for sampling in sampledData:
        print(">>>>>>>>>>>>>>>>Started :", sampling)
        train = sampledData[sampling]
        res={}
        if isLR:
        # "LRModel":
            modelDataLR = LRModel(train, testData, isCV=isCV)
            filldetails(analysisTable, modelDataLR["predictions"], sampling,"LR")
            res["LRModel"]=modelDataLR
        if isRF:
            # # "randomForest":
            modelDataRF = randomForest(train, testData, isCV=isCV)
            filldetails(analysisTable, modelDataRF["predictions"], sampling,"randomForest")
            res["randomForest"]=modelDataRF
        if isLSVC:
            #"LSVC":
            modelDataLSVC = LSVC(train, testData, isCV=isCV)
            filldetails(analysisTable, modelDataLSVC["predictions"], sampling, "LSVC")
            res["LSVC"]=modelDataLSVC
        if len(res.keys())>1:
            results[sampling] = res
        print("<<<<<<<<<<<<<<Finished :", sampling)
    return results, analysisTable



def printConfusionMatrix(cf_matrix):
    group_names = ["True Neg","False Pos","False Neg","True Pos"]
    group_counts = ["{0:0.0f}".format(value) for value in
                    cf_matrix.flatten()]
    group_percentages = ["{0:.2%}".format(value) for value in
                         cf_matrix.flatten()/np.sum(cf_matrix)]
    labels = [f"{v1}\n{v2}\n{v3}" for v1, v2, v3 in
              zip(group_names,group_counts,group_percentages)]
    labels = np.asarray(labels).reshape(2,2)
    sns.heatmap(cf_matrix, annot=labels, fmt="", cmap='Blues')

def otherMetrics(cf):
    tp = cf[0][0]
    fp = cf[1][0]
    fn = cf[0][1]
    tn = cf[1][1]
    precision = np.round((tp)/(tp+fp),3)
    recall =  np.round((tp)/(tp+fn),3)
    accuracy= np.round((tp+tn)/(tp+fp+fn+tn),3)
    F1=np.round((2*precision*recall)/(precision+recall),3)
    return accuracy, F1, precision, recall

def getEvalutions(predictions):
    evaluator=BinaryClassificationEvaluator(labelCol='label')
    ROC = evaluator.evaluate(predictions, {evaluator.metricName: "areaUnderROC"})
    preds_and_labels = predictions.withColumn('label', F.col('label').cast(FloatType())).orderBy('prediction').select(['prediction','label'])
    metrics = MulticlassMetrics(preds_and_labels.rdd.map(tuple))
    cf_matrix=metrics.confusionMatrix().toArray()
    # printConfusionMatrix(cf_matrix)
    accuracy, F1, precision, recall= otherMetrics(cf_matrix)
    return cf_matrix, np.round(ROC,3), accuracy, F1, precision, recall

## Demo 
**Training and Testing on train_sample.csv data provided along with actual Dataset.**

In [None]:
def demoData(path="../Data/train_sample.csv"):
    dataDownload = spark.read\
      .format('org.apache.spark.sql.execution.datasources.csv.CSVFileFormat')\
      .option("inferSchema",True)\
      .option('header', 'true')\
      .load(path).drop("attributed_time","click_time").distinct().na.drop()
    dataNotFraud, dataFraud, ratio= findImbalance(dataDownload)
    print("\n--Undersampling to create demo set--")
    # Random UnderSample the big data to form processable ratio for demo.
    underSampledData=randomUnderSamplingWithoutTransformation(dataNotFraud,dataFraud,int(ratio/8))
    getCompleteSummary(underSampledData)
    trainSample,testSample=stratifiedTrainTestSplit(underSampledData, ifprint=False)
    sampledData=diffSampledData(trainSample,isUnderSample=True,isOverSample=False ,isSMOTE=False, ifprint=False)
    results, analysisTable= getResults(sampledData,testSample,isLR=True, isRF=False, isLSVC=False)
    print("\n________________RESULTS______________\n",analysisTable)
    return results, analysisTable

In [None]:
resultsDemo, analysisTableDemo= demoData(path='/content/drive/MyDrive/Final Project CS 657/talkingdata-adtracking-fraud-detection/train_sample.csv')

Count Fraud: 227
Count Not Fraud: 97693
Ratio: 430

--Undersampling to create demo set--
Summary
_________
+-------+-----------------+-----------------+-----------------+------------------+-----------------+-------------------+
|summary|               ip|              app|           device|                os|          channel|      is_attributed|
+-------+-----------------+-----------------+-----------------+------------------+-----------------+-------------------+
|  count|             2094|             2094|             2094|              2094|             2094|               2094|
|   mean|99453.89923591213|14.14660936007641|17.64517669531996| 22.39541547277937|258.7679083094556|0.10840496657115568|
| stddev|78809.85413920505|18.31663940219889| 223.695495718449|56.105595429545566| 130.619592252793| 0.3109654468266364|
|    min|               36|                1|                0|                 0|                3|                  0|
|    25%|            41369|                3| 



underSampledData LR 0.853 0.849 0.912 0.964 0.865 [[296.  46.]
 [ 11.  25.]]
<<<<<<<<<<<<<<Finished : underSampledData

________________RESULTS______________
 +------------------+-------+-------+----------+-------+-----------+--------+---------------+
|     Sampling     | Model |  ROC  | accuracy |   F1  | precision | recall |     Matrix    |
+------------------+-------+-------+----------+-------+-----------+--------+---------------+
| underSampledData |   LR  | 0.853 |  0.849   | 0.912 |   0.964   | 0.865  |  [[296.  46.] |
|                  |       |       |          |       |           |        |  [ 11.  25.]] |
+------------------+-------+-------+----------+-------+-----------+--------+---------------+


## 6 Million Records
**Training and Testing on sampled 6 Million records from train.csv**

In [None]:
def RUN6MTEST(path="../Data/Sampled_data.parquet"):
    dataDownload=spark.read.parquet(path)
    getCompleteSummary(dataDownload)
    trainSample,testSample=stratifiedTrainTestSplit(dataDownload, ifprint=False)
    sampledData=diffSampledData(trainSample, isUnderSample=False,isOverSample=False,isSMOTE=True, ifprint=False)
    results, analysisTable= getResults(sampledData,testSample,isLR=False, isRF=True, isLSVC=False)
    print("\n________________RESULTS______________\n",analysisTable)
    return results, analysisTable

In [None]:
results6M, analysisTable6M= RUN6MTEST()

Summary
_________


                                                                                

+-------+------------------+------------------+------------------+------------------+------------------+-------------------+
|summary|                ip|               app|            device|                os|           channel|      is_attributed|
+-------+------------------+------------------+------------------+------------------+------------------+-------------------+
|  count|            616913|            616913|            616913|            616913|            616913|             616913|
|   mean|124889.97044477909|17.759027610051984|22.148653051564807|24.069484676121267|  260.603418958589|0.27552183208977604|
| stddev| 92713.99720309858|  24.5428670492717|232.85310325314362| 54.08001962606024|135.62297249295284| 0.4467772103566922|
|    min|                 1|                 0|                 0|                 0|                 0|                  0|
|    25%|             50737|                 6|                 1|                13|               135|                  0|


                                                                                

+------+---+------+---+-------+-------------+
|    ip|app|device| os|channel|is_attributed|
+------+---+------+---+-------+-------------+
|155161|326|  1320|224|    174|            2|
+------+---+------+---+-------+-------------+

_________
Number of values in is_attributed for each label.
_________
Stats for is attributed:
+-------------+------+
|is_attributed| count|
+-------------+------+
|            1|169973|
|            0|446940|
+-------------+------+

_________

-----TRAIN TEST SPLIT STARTED----
Count Fraud: 169973
Count Not Fraud: 446940
Ratio: 2

----SAMPLES IN TRAIN----
Count Fraud: 136201
Count Not Fraud: 357669
Ratio: 2

----SAMPLES IN TEST-----
Count Fraud: 33772
Count Not Fraud: 89271
Ratio: 2

---Comparing data using various Sampling Techniques---
Count Fraud: 136201
Count Not Fraud: 357669
Ratio: 2

--SMOTE OverSampling--
All columns are been covered.


                                                                                

Generating New Samples
Samples Generation Complete.
>>>>>>>>>>>>>>>>Started : oversampledDataSMOTE
>>> RandomForest Invoked


                                                                                

22/12/05 21:33:06 WARN DAGScheduler: Broadcasting large task binary with size 1114.9 KiB


                                                                                

22/12/05 21:33:15 WARN DAGScheduler: Broadcasting large task binary with size 1723.4 KiB


                                                                                

22/12/05 21:33:26 WARN DAGScheduler: Broadcasting large task binary with size 2.6 MiB


                                                                                

22/12/05 21:33:39 WARN DAGScheduler: Broadcasting large task binary with size 3.9 MiB


                                                                                

22/12/05 21:33:56 WARN DAGScheduler: Broadcasting large task binary with size 5.8 MiB


                                                                                

22/12/05 21:34:19 WARN DAGScheduler: Broadcasting large task binary with size 8.2 MiB




22/12/05 21:34:37 WARN DAGScheduler: Broadcasting large task binary with size 1201.1 KiB


                                                                                

22/12/05 21:34:47 WARN DAGScheduler: Broadcasting large task binary with size 11.2 MiB




22/12/05 21:35:10 WARN DAGScheduler: Broadcasting large task binary with size 1476.8 KiB


                                                                                

22/12/05 21:35:21 WARN DAGScheduler: Broadcasting large task binary with size 14.9 MiB




22/12/05 21:35:57 WARN DAGScheduler: Broadcasting large task binary with size 1767.9 KiB


                                                                                

22/12/05 21:36:12 WARN DAGScheduler: Broadcasting large task binary with size 19.1 MiB




22/12/05 21:36:53 WARN DAGScheduler: Broadcasting large task binary with size 2037.7 KiB


                                                                                

22/12/05 21:37:09 WARN DAGScheduler: Broadcasting large task binary with size 23.8 MiB




22/12/05 21:37:49 WARN DAGScheduler: Broadcasting large task binary with size 2.2 MiB


[Stage 5384:>                                                     (0 + 0) / 204]

22/12/05 21:38:07 WARN DAGScheduler: Broadcasting large task binary with size 29.1 MiB




22/12/05 21:39:06 WARN DAGScheduler: Broadcasting large task binary with size 2.4 MiB


[Stage 5390:>                                                     (0 + 0) / 204]

22/12/05 21:39:31 WARN DAGScheduler: Broadcasting large task binary with size 34.7 MiB




22/12/05 21:40:24 WARN DAGScheduler: Broadcasting large task binary with size 2.6 MiB




22/12/05 21:42:23 WARN DAGScheduler: Broadcasting large task binary with size 14.4 MiB


                                                                                

22/12/05 21:47:29 WARN DAGScheduler: Broadcasting large task binary with size 1153.1 KiB


                                                                                

22/12/05 21:47:39 WARN DAGScheduler: Broadcasting large task binary with size 1800.1 KiB


                                                                                

22/12/05 21:47:51 WARN DAGScheduler: Broadcasting large task binary with size 2.7 MiB


                                                                                

22/12/05 21:48:06 WARN DAGScheduler: Broadcasting large task binary with size 4.1 MiB


                                                                                

22/12/05 21:48:26 WARN DAGScheduler: Broadcasting large task binary with size 6.1 MiB


                                                                                

22/12/05 21:48:52 WARN DAGScheduler: Broadcasting large task binary with size 8.6 MiB




22/12/05 21:49:17 WARN DAGScheduler: Broadcasting large task binary with size 1250.3 KiB


                                                                                

22/12/05 21:49:30 WARN DAGScheduler: Broadcasting large task binary with size 11.7 MiB




22/12/05 21:49:59 WARN DAGScheduler: Broadcasting large task binary with size 1536.2 KiB


                                                                                

22/12/05 21:50:14 WARN DAGScheduler: Broadcasting large task binary with size 15.4 MiB




22/12/05 21:50:50 WARN DAGScheduler: Broadcasting large task binary with size 1828.0 KiB


                                                                                

22/12/05 21:51:06 WARN DAGScheduler: Broadcasting large task binary with size 19.8 MiB




22/12/05 21:51:51 WARN DAGScheduler: Broadcasting large task binary with size 2.1 MiB


[Stage 5552:>                                                     (0 + 0) / 204]

22/12/05 21:52:12 WARN DAGScheduler: Broadcasting large task binary with size 24.6 MiB




22/12/05 21:53:09 WARN DAGScheduler: Broadcasting large task binary with size 2.3 MiB


[Stage 5558:>                                                     (0 + 0) / 204]

22/12/05 21:53:33 WARN DAGScheduler: Broadcasting large task binary with size 30.0 MiB




22/12/05 21:54:35 WARN DAGScheduler: Broadcasting large task binary with size 2.5 MiB


[Stage 5564:>                                                     (0 + 0) / 204]

22/12/05 21:55:01 WARN DAGScheduler: Broadcasting large task binary with size 35.7 MiB




22/12/05 21:56:17 WARN DAGScheduler: Broadcasting large task binary with size 2.7 MiB




22/12/05 21:58:35 WARN DAGScheduler: Broadcasting large task binary with size 14.6 MiB


                                                                                

22/12/05 22:04:17 WARN DAGScheduler: Broadcasting large task binary with size 1163.3 KiB


                                                                                

22/12/05 22:04:29 WARN DAGScheduler: Broadcasting large task binary with size 1805.9 KiB


                                                                                

22/12/05 22:04:43 WARN DAGScheduler: Broadcasting large task binary with size 2.7 MiB


                                                                                

22/12/05 22:05:01 WARN DAGScheduler: Broadcasting large task binary with size 4.1 MiB


                                                                                

22/12/05 22:05:23 WARN DAGScheduler: Broadcasting large task binary with size 6.1 MiB


                                                                                

22/12/05 22:05:51 WARN DAGScheduler: Broadcasting large task binary with size 8.6 MiB




22/12/05 22:06:10 WARN DAGScheduler: Broadcasting large task binary with size 1233.1 KiB


                                                                                

22/12/05 22:06:20 WARN DAGScheduler: Broadcasting large task binary with size 11.6 MiB




22/12/05 22:06:43 WARN DAGScheduler: Broadcasting large task binary with size 1499.2 KiB


                                                                                

22/12/05 22:06:55 WARN DAGScheduler: Broadcasting large task binary with size 15.3 MiB




22/12/05 22:07:29 WARN DAGScheduler: Broadcasting large task binary with size 1764.9 KiB


                                                                                

22/12/05 22:07:45 WARN DAGScheduler: Broadcasting large task binary with size 19.5 MiB




22/12/05 22:08:28 WARN DAGScheduler: Broadcasting large task binary with size 2024.8 KiB


                                                                                

22/12/05 22:08:46 WARN DAGScheduler: Broadcasting large task binary with size 24.1 MiB




22/12/05 22:09:30 WARN DAGScheduler: Broadcasting large task binary with size 2.2 MiB


[Stage 5732:>                                                     (0 + 0) / 204]

22/12/05 22:09:47 WARN DAGScheduler: Broadcasting large task binary with size 29.2 MiB




22/12/05 22:10:44 WARN DAGScheduler: Broadcasting large task binary with size 2.4 MiB


[Stage 5738:>                                                     (0 + 0) / 204]

22/12/05 22:11:06 WARN DAGScheduler: Broadcasting large task binary with size 34.7 MiB




22/12/05 22:12:05 WARN DAGScheduler: Broadcasting large task binary with size 2.6 MiB


[Stage 5744:>                                                     (0 + 0) / 204]

22/12/05 22:14:11 WARN DAGScheduler: Broadcasting large task binary with size 14.1 MiB


                                                                                

22/12/05 22:26:47 WARN DAGScheduler: Broadcasting large task binary with size 1112.1 KiB


                                                                                

22/12/05 22:26:51 WARN DAGScheduler: Broadcasting large task binary with size 1729.1 KiB


                                                                                

22/12/05 22:26:55 WARN DAGScheduler: Broadcasting large task binary with size 2.6 MiB


                                                                                

22/12/05 22:27:01 WARN DAGScheduler: Broadcasting large task binary with size 4.0 MiB


                                                                                

22/12/05 22:27:07 WARN DAGScheduler: Broadcasting large task binary with size 6.0 MiB




22/12/05 22:27:14 WARN DAGScheduler: Broadcasting large task binary with size 1024.1 KiB


                                                                                

22/12/05 22:27:16 WARN DAGScheduler: Broadcasting large task binary with size 8.6 MiB




22/12/05 22:27:23 WARN DAGScheduler: Broadcasting large task binary with size 1342.8 KiB


                                                                                

22/12/05 22:27:27 WARN DAGScheduler: Broadcasting large task binary with size 11.9 MiB




22/12/05 22:27:37 WARN DAGScheduler: Broadcasting large task binary with size 1688.2 KiB


                                                                                

22/12/05 22:27:41 WARN DAGScheduler: Broadcasting large task binary with size 15.9 MiB




22/12/05 22:27:51 WARN DAGScheduler: Broadcasting large task binary with size 2043.0 KiB


                                                                                

22/12/05 22:27:55 WARN DAGScheduler: Broadcasting large task binary with size 20.7 MiB




22/12/05 22:28:06 WARN DAGScheduler: Broadcasting large task binary with size 2.4 MiB


[Stage 5927:>                                                      (0 + 0) / 27]

22/12/05 22:28:11 WARN DAGScheduler: Broadcasting large task binary with size 26.2 MiB




22/12/05 22:28:25 WARN DAGScheduler: Broadcasting large task binary with size 2.7 MiB


[Stage 5933:>                                                      (0 + 0) / 27]

22/12/05 22:28:31 WARN DAGScheduler: Broadcasting large task binary with size 32.3 MiB




22/12/05 22:28:52 WARN DAGScheduler: Broadcasting large task binary with size 3.0 MiB


[Stage 5939:>                                                      (0 + 0) / 27]

22/12/05 22:28:59 WARN DAGScheduler: Broadcasting large task binary with size 39.0 MiB




22/12/05 22:29:17 WARN DAGScheduler: Broadcasting large task binary with size 3.3 MiB




22/12/05 22:34:50 WARN DAGScheduler: Broadcasting large task binary with size 15.3 MiB


                                                                                

22/12/05 22:34:56 WARN DAGScheduler: Broadcasting large task binary with size 15.3 MiB


                                                                                

22/12/05 22:34:58 WARN DAGScheduler: Broadcasting large task binary with size 15.3 MiB




22/12/05 22:34:59 WARN DAGScheduler: Broadcasting large task binary with size 15.3 MiB


                                                                                

22/12/05 22:35:00 WARN DAGScheduler: Broadcasting large task binary with size 15.4 MiB


[Stage 5975:>                                                       (0 + 1) / 1]

oversampledDataSMOTE randomForest 0.961 0.933 0.955 0.939 0.971 [[86665.  2606.]
 [ 5656. 28116.]]
<<<<<<<<<<<<<<Finished : oversampledDataSMOTE

________________RESULTS______________
 +----------------------+--------------+-------+----------+-------+-----------+--------+-------------------+
|       Sampling       |    Model     |  ROC  | accuracy |   F1  | precision | recall |       Matrix      |
+----------------------+--------------+-------+----------+-------+-----------+--------+-------------------+
| oversampledDataSMOTE | randomForest | 0.961 |  0.933   | 0.955 |   0.939   | 0.971  |  [[86665.  2606.] |
|                      |              |       |          |       |           |        |  [ 5656. 28116.]] |
+----------------------+--------------+-------+----------+-------+-----------+--------+-------------------+


                                                                                

## 26 Million Records
**Training and Testing on sampled 26 Million records from train.csv**


In [None]:
def RUN26MTEST(path="../Data/Sampled25M.parquet"):
    dataDownload=spark.read.parquet(path)
    getCompleteSummary(dataDownload)
    trainSample,testSample=stratifiedTrainTestSplit(dataDownload, ifprint=False)
    sampledData=diffSampledData(trainSample, isUnderSample=False,isOverSample=True, ifprint=False)
    results, analysisTable= getResults(sampledData,testSample,isLR=False, isRF=True, isLSVC=False)
    print("\n________________RESULTS______________\n",analysisTable)
    return results, analysisTable

In [None]:
results26M, analysisTable26M= RUN26MTEST()

Summary
_________


                                                                                

+-------+------------------+-----------------+------------------+------------------+------------------+-------------------+
|summary|                ip|              app|            device|                os|           channel|      is_attributed|
+-------+------------------+-----------------+------------------+------------------+------------------+-------------------+
|  count|           2570962|          2570962|           2570962|           2570962|           2570962|            2570962|
|   mean|117578.32723781993|15.83455609223318|20.505753099423483|24.018647883554873| 269.1028957254133|0.16480679216573407|
| stddev|  88516.5227112953|21.98906805298176|235.46908212340674|  54.1577769104347|137.00831860255715|0.37100615488199506|
|    min|                 1|                0|                 0|                 0|                 0|                  0|
|    25%|             48418|                3|                 1|                13|               135|                  0|
|    50%

                                                                                

+------+---+------+---+-------+-------------+
|    ip|app|device| os|channel|is_attributed|
+------+---+------+---+-------+-------------+
|265843|424|  1999|307|    182|            2|
+------+---+------+---+-------+-------------+

_________
Number of values in is_attributed for each label.
_________
Stats for is attributed:
+-------------+-------+
|is_attributed|  count|
+-------------+-------+
|            1| 423712|
|            0|2147250|
+-------------+-------+

_________

-----TRAIN TEST SPLIT STARTED----
Count Fraud: 423712
Count Not Fraud: 2147250
Ratio: 5

----SAMPLES IN TRAIN----


                                                                                

Count Fraud: 338751
Count Not Fraud: 1716450
Ratio: 5

----SAMPLES IN TEST-----


                                                                                

Count Fraud: 84961
Count Not Fraud: 430800
Ratio: 5

---Comparing data using various Sampling Techniques---


                                                                                

Count Fraud: 338751
Count Not Fraud: 1716450
Ratio: 5

--Random OverSampling--
>>>>>>>>>>>>>>>>Started : randomOverSampleddata
>>> RandomForest Invoked


                                                                                

22/12/05 22:36:32 WARN DAGScheduler: Broadcasting large task binary with size 1398.9 KiB


                                                                                

22/12/05 22:36:42 WARN DAGScheduler: Broadcasting large task binary with size 2.4 MiB


                                                                                

22/12/05 22:36:54 WARN DAGScheduler: Broadcasting large task binary with size 4.0 MiB


                                                                                

22/12/05 22:37:13 WARN DAGScheduler: Broadcasting large task binary with size 6.5 MiB




22/12/05 22:37:29 WARN DAGScheduler: Broadcasting large task binary with size 1438.2 KiB


                                                                                

22/12/05 22:37:31 WARN DAGScheduler: Broadcasting large task binary with size 10.1 MiB




22/12/05 22:37:57 WARN DAGScheduler: Broadcasting large task binary with size 2.0 MiB


                                                                                

22/12/05 22:38:01 WARN DAGScheduler: Broadcasting large task binary with size 15.2 MiB




22/12/05 22:38:26 WARN DAGScheduler: Broadcasting large task binary with size 2.8 MiB


[Stage 6057:>                                                      (0 + 0) / 20]

22/12/05 22:38:33 WARN DAGScheduler: Broadcasting large task binary with size 21.9 MiB




22/12/05 22:39:05 WARN DAGScheduler: Broadcasting large task binary with size 3.6 MiB


[Stage 6059:>                                                      (0 + 0) / 20]

22/12/05 22:39:14 WARN DAGScheduler: Broadcasting large task binary with size 30.3 MiB




22/12/05 22:39:51 WARN DAGScheduler: Broadcasting large task binary with size 4.5 MiB


[Stage 6061:>                                                      (0 + 0) / 20]

22/12/05 22:40:02 WARN DAGScheduler: Broadcasting large task binary with size 40.4 MiB




22/12/05 22:40:47 WARN DAGScheduler: Broadcasting large task binary with size 5.4 MiB


[Stage 6063:>                                                      (0 + 0) / 20]

22/12/05 22:41:00 WARN DAGScheduler: Broadcasting large task binary with size 51.9 MiB




22/12/05 22:41:48 WARN DAGScheduler: Broadcasting large task binary with size 6.0 MiB


[Stage 6065:>                                                      (0 + 0) / 20]

22/12/05 22:42:04 WARN DAGScheduler: Broadcasting large task binary with size 64.2 MiB




22/12/05 22:42:59 WARN DAGScheduler: Broadcasting large task binary with size 6.4 MiB


[Stage 6067:>                                                      (0 + 0) / 20]

22/12/05 22:43:18 WARN DAGScheduler: Broadcasting large task binary with size 76.7 MiB




22/12/05 22:44:14 WARN DAGScheduler: Broadcasting large task binary with size 6.6 MiB


                                                                                

22/12/05 22:44:25 WARN DAGScheduler: Broadcasting large task binary with size 18.9 MiB


                                                                                

22/12/05 22:45:44 WARN DAGScheduler: Broadcasting large task binary with size 1404.2 KiB


                                                                                

22/12/05 22:45:54 WARN DAGScheduler: Broadcasting large task binary with size 2.4 MiB


                                                                                

22/12/05 22:46:07 WARN DAGScheduler: Broadcasting large task binary with size 4.0 MiB


                                                                                

22/12/05 22:46:20 WARN DAGScheduler: Broadcasting large task binary with size 6.4 MiB




22/12/05 22:46:37 WARN DAGScheduler: Broadcasting large task binary with size 1423.7 KiB


                                                                                

22/12/05 22:46:40 WARN DAGScheduler: Broadcasting large task binary with size 10.0 MiB




22/12/05 22:47:01 WARN DAGScheduler: Broadcasting large task binary with size 2044.3 KiB


                                                                                

22/12/05 22:47:04 WARN DAGScheduler: Broadcasting large task binary with size 15.0 MiB




22/12/05 22:47:31 WARN DAGScheduler: Broadcasting large task binary with size 2.7 MiB


                                                                                

22/12/05 22:47:37 WARN DAGScheduler: Broadcasting large task binary with size 21.7 MiB




22/12/05 22:48:02 WARN DAGScheduler: Broadcasting large task binary with size 3.6 MiB


[Stage 6119:>                                                      (0 + 0) / 20]

22/12/05 22:48:10 WARN DAGScheduler: Broadcasting large task binary with size 29.9 MiB




22/12/05 22:48:41 WARN DAGScheduler: Broadcasting large task binary with size 4.4 MiB


[Stage 6121:>                                                      (0 + 0) / 20]

22/12/05 22:48:51 WARN DAGScheduler: Broadcasting large task binary with size 39.9 MiB




22/12/05 22:49:30 WARN DAGScheduler: Broadcasting large task binary with size 5.3 MiB


[Stage 6123:>                                                      (0 + 0) / 20]

22/12/05 22:49:43 WARN DAGScheduler: Broadcasting large task binary with size 51.2 MiB




22/12/05 22:50:29 WARN DAGScheduler: Broadcasting large task binary with size 6.0 MiB


[Stage 6125:>                                                      (0 + 0) / 20]

22/12/05 22:50:46 WARN DAGScheduler: Broadcasting large task binary with size 63.4 MiB




22/12/05 22:51:39 WARN DAGScheduler: Broadcasting large task binary with size 6.4 MiB


[Stage 6127:>                                                      (0 + 0) / 20]

22/12/05 22:51:57 WARN DAGScheduler: Broadcasting large task binary with size 75.8 MiB




22/12/05 22:52:49 WARN DAGScheduler: Broadcasting large task binary with size 6.5 MiB


                                                                                

22/12/05 22:53:03 WARN DAGScheduler: Broadcasting large task binary with size 18.5 MiB


                                                                                

22/12/05 22:54:20 WARN DAGScheduler: Broadcasting large task binary with size 1408.1 KiB


                                                                                

22/12/05 22:54:30 WARN DAGScheduler: Broadcasting large task binary with size 2.4 MiB


                                                                                

22/12/05 22:54:41 WARN DAGScheduler: Broadcasting large task binary with size 3.9 MiB


                                                                                

22/12/05 22:54:56 WARN DAGScheduler: Broadcasting large task binary with size 6.4 MiB




22/12/05 22:55:12 WARN DAGScheduler: Broadcasting large task binary with size 1406.6 KiB


                                                                                

22/12/05 22:55:15 WARN DAGScheduler: Broadcasting large task binary with size 10.0 MiB




22/12/05 22:55:32 WARN DAGScheduler: Broadcasting large task binary with size 2035.3 KiB


                                                                                

22/12/05 22:55:36 WARN DAGScheduler: Broadcasting large task binary with size 15.0 MiB




22/12/05 22:55:53 WARN DAGScheduler: Broadcasting large task binary with size 2.7 MiB


                                                                                

22/12/05 22:55:59 WARN DAGScheduler: Broadcasting large task binary with size 21.6 MiB




22/12/05 22:56:20 WARN DAGScheduler: Broadcasting large task binary with size 3.6 MiB


[Stage 6179:>                                                      (0 + 0) / 20]

22/12/05 22:56:30 WARN DAGScheduler: Broadcasting large task binary with size 30.0 MiB




22/12/05 22:57:02 WARN DAGScheduler: Broadcasting large task binary with size 4.5 MiB


[Stage 6181:>                                                      (0 + 0) / 20]

22/12/05 22:57:14 WARN DAGScheduler: Broadcasting large task binary with size 40.0 MiB




22/12/05 22:57:47 WARN DAGScheduler: Broadcasting large task binary with size 5.3 MiB


[Stage 6183:>                                                      (0 + 0) / 20]

22/12/05 22:57:55 WARN DAGScheduler: Broadcasting large task binary with size 51.4 MiB




22/12/05 22:58:34 WARN DAGScheduler: Broadcasting large task binary with size 6.0 MiB


[Stage 6185:>                                                      (0 + 0) / 20]

22/12/05 22:58:43 WARN DAGScheduler: Broadcasting large task binary with size 63.7 MiB




22/12/05 22:59:30 WARN DAGScheduler: Broadcasting large task binary with size 6.5 MiB


[Stage 6187:>                                                      (0 + 0) / 20]

22/12/05 22:59:42 WARN DAGScheduler: Broadcasting large task binary with size 76.4 MiB




22/12/05 23:00:27 WARN DAGScheduler: Broadcasting large task binary with size 6.6 MiB


                                                                                

22/12/05 23:00:34 WARN DAGScheduler: Broadcasting large task binary with size 19.0 MiB


                                                                                

22/12/05 23:02:19 WARN DAGScheduler: Broadcasting large task binary with size 1399.5 KiB


                                                                                

22/12/05 23:02:33 WARN DAGScheduler: Broadcasting large task binary with size 2.4 MiB


                                                                                

22/12/05 23:02:54 WARN DAGScheduler: Broadcasting large task binary with size 4.0 MiB


                                                                                

22/12/05 23:03:18 WARN DAGScheduler: Broadcasting large task binary with size 6.6 MiB




22/12/05 23:03:45 WARN DAGScheduler: Broadcasting large task binary with size 1481.1 KiB


                                                                                

22/12/05 23:03:47 WARN DAGScheduler: Broadcasting large task binary with size 10.4 MiB




22/12/05 23:04:12 WARN DAGScheduler: Broadcasting large task binary with size 2.1 MiB


                                                                                

22/12/05 23:04:16 WARN DAGScheduler: Broadcasting large task binary with size 15.7 MiB




22/12/05 23:04:48 WARN DAGScheduler: Broadcasting large task binary with size 2.9 MiB


                                                                                

22/12/05 23:04:54 WARN DAGScheduler: Broadcasting large task binary with size 22.9 MiB




22/12/05 23:05:31 WARN DAGScheduler: Broadcasting large task binary with size 3.9 MiB


[Stage 6239:>                                                      (0 + 0) / 20]

22/12/05 23:05:39 WARN DAGScheduler: Broadcasting large task binary with size 31.9 MiB




22/12/05 23:06:15 WARN DAGScheduler: Broadcasting large task binary with size 4.9 MiB


[Stage 6241:>                                                      (0 + 0) / 20]

22/12/05 23:06:26 WARN DAGScheduler: Broadcasting large task binary with size 42.8 MiB




22/12/05 23:07:15 WARN DAGScheduler: Broadcasting large task binary with size 5.8 MiB


[Stage 6243:>                                                      (0 + 0) / 20]

22/12/05 23:07:27 WARN DAGScheduler: Broadcasting large task binary with size 55.3 MiB




22/12/05 23:08:22 WARN DAGScheduler: Broadcasting large task binary with size 6.6 MiB


[Stage 6245:>                                                      (0 + 0) / 20]

22/12/05 23:08:37 WARN DAGScheduler: Broadcasting large task binary with size 68.8 MiB




22/12/05 23:09:44 WARN DAGScheduler: Broadcasting large task binary with size 7.2 MiB


[Stage 6247:>                                                      (0 + 0) / 20]

22/12/05 23:10:06 WARN DAGScheduler: Broadcasting large task binary with size 82.7 MiB




22/12/05 23:11:27 WARN DAGScheduler: Broadcasting large task binary with size 7.3 MiB


                                                                                

22/12/05 23:11:41 WARN DAGScheduler: Broadcasting large task binary with size 19.5 MiB


                                                                                

22/12/05 23:11:53 WARN DAGScheduler: Broadcasting large task binary with size 19.5 MiB


                                                                                

22/12/05 23:11:58 WARN DAGScheduler: Broadcasting large task binary with size 19.5 MiB


                                                                                

22/12/05 23:12:03 WARN DAGScheduler: Broadcasting large task binary with size 19.5 MiB




22/12/05 23:12:09 WARN DAGScheduler: Broadcasting large task binary with size 19.5 MiB


                                                                                

22/12/05 23:12:10 WARN DAGScheduler: Broadcasting large task binary with size 19.5 MiB


[Stage 6266:>                                                       (0 + 1) / 1]

randomOverSampleddata randomForest 0.969 0.95 0.97 0.971 0.969 [[417358.  13442.]
 [ 12558.  72403.]]
<<<<<<<<<<<<<<Finished : randomOverSampleddata

________________RESULTS______________
 +-----------------------+--------------+-------+----------+------+-----------+--------+---------------------+
|        Sampling       |    Model     |  ROC  | accuracy |  F1  | precision | recall |        Matrix       |
+-----------------------+--------------+-------+----------+------+-----------+--------+---------------------+
| randomOverSampleddata | randomForest | 0.969 |   0.95   | 0.97 |   0.971   | 0.969  |  [[417358.  13442.] |
|                       |              |       |          |      |           |        |  [ 12558.  72403.]] |
+-----------------------+--------------+-------+----------+------+-----------+--------+---------------------+


                                                                                