In [1]:
from pyspark import SparkContext,SparkConf
from pyspark.sql import SparkSession
import pandas as pd

In [1]:
df = spark.read.csv('/malware/MicrosoftMalware.csv',header=True,inferSchema=True)

In [2]:
df.show(5)

+---+--------------------+------------+-------------+---------------+------------+------+----------------+----------------+-------------------------+-------------------+-----------------+------+-----------------+--------------+----------------------+-----------------+---------------------------+---------+---------+--------+-------+-------+--------------------+--------------------+----------+-----------+---------------+-----+---------------+------------+--------+------------+---------------------+-------------------+------------------------+-------------------------+-------------------------+--------------------------------------+-------------------------------+-------------------------------+--------------------------+--------------------------------+--------------------------+-----------------------+----------------------+-------------------------------------------------+-------------------------------------------------+-----------------------------------------------+-----------------

In [3]:
df.show()

+---+--------------------+------------+-------------+----------------+------------+------+----------------+----------------+-------------------------+-------------------+-----------------+------+-----------------+--------------+----------------------+-----------------+---------------------------+---------+---------+--------+-------+-------+--------------------+--------------------+----------+-----------+---------------+-----+---------------+------------+--------+------------+---------------------+-------------------+------------------------+-------------------------+-------------------------+--------------------------------------+-------------------------------+-------------------------------+--------------------------+--------------------------------+--------------------------+-----------------------+----------------------+-------------------------------------------------+-------------------------------------------------+-----------------------------------------------+----------------

In [4]:
df.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- MachineIdentifier: string (nullable = true)
 |-- ProductName: string (nullable = true)
 |-- EngineVersion: string (nullable = true)
 |-- AppVersion: string (nullable = true)
 |-- AvSigVersion: string (nullable = true)
 |-- IsBeta: integer (nullable = true)
 |-- RtpStateBitfield: double (nullable = true)
 |-- IsSxsPassiveMode: integer (nullable = true)
 |-- AVProductStatesIdentifier: double (nullable = true)
 |-- AVProductsInstalled: double (nullable = true)
 |-- AVProductsEnabled: double (nullable = true)
 |-- HasTpm: integer (nullable = true)
 |-- CountryIdentifier: integer (nullable = true)
 |-- CityIdentifier: double (nullable = true)
 |-- OrganizationIdentifier: double (nullable = true)
 |-- GeoNameIdentifier: double (nullable = true)
 |-- LocaleEnglishNameIdentifier: integer (nullable = true)
 |-- Platform: string (nullable = true)
 |-- Processor: string (nullable = true)
 |-- OsVer: string (nullable = true)
 |-- OsBuild: integer (null

In [5]:
type(df)

pyspark.sql.dataframe.DataFrame

In [6]:
df.columns

['_c0',
 'MachineIdentifier',
 'ProductName',
 'EngineVersion',
 'AppVersion',
 'AvSigVersion',
 'IsBeta',
 'RtpStateBitfield',
 'IsSxsPassiveMode',
 'AVProductStatesIdentifier',
 'AVProductsInstalled',
 'AVProductsEnabled',
 'HasTpm',
 'CountryIdentifier',
 'CityIdentifier',
 'OrganizationIdentifier',
 'GeoNameIdentifier',
 'LocaleEnglishNameIdentifier',
 'Platform',
 'Processor',
 'OsVer',
 'OsBuild',
 'OsSuite',
 'OsPlatformSubRelease',
 'OsBuildLab',
 'SkuEdition',
 'IsProtected',
 'AutoSampleOptIn',
 'SMode',
 'IeVerIdentifier',
 'SmartScreen',
 'Firewall',
 'UacLuaenable',
 'Census_MDC2FormFactor',
 'Census_DeviceFamily',
 'Census_OEMNameIdentifier',
 'Census_OEMModelIdentifier',
 'Census_ProcessorCoreCount',
 'Census_ProcessorManufacturerIdentifier',
 'Census_ProcessorModelIdentifier',
 'Census_PrimaryDiskTotalCapacity',
 'Census_PrimaryDiskTypeName',
 'Census_SystemVolumeTotalCapacity',
 'Census_HasOpticalDiskDrive',
 'Census_TotalPhysicalRAM',
 'Census_ChassisTypeName',
 'Cens

In [7]:
data = df.drop('_c0')

In [8]:
print((data.count(), len(data.columns)))

(3259724, 76)


In [9]:
data.show(2)

+--------------------+------------+-------------+---------------+------------+------+----------------+----------------+-------------------------+-------------------+-----------------+------+-----------------+--------------+----------------------+-----------------+---------------------------+---------+---------+--------+-------+-------+--------------------+--------------------+----------+-----------+---------------+-----+---------------+------------+--------+------------+---------------------+-------------------+------------------------+-------------------------+-------------------------+--------------------------------------+-------------------------------+-------------------------------+--------------------------+--------------------------------+--------------------------+-----------------------+----------------------+-------------------------------------------------+-------------------------------------------------+-----------------------------------------------+---------------------

In [10]:
data.groupby('HasDetections').count().collect()

[Row(HasDetections=1, count=1689470), Row(HasDetections=0, count=1570254)]

In [11]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier as RF
from pyspark.ml.feature import StringIndexer, VectorIndexer, VectorAssembler,SQLTransformer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator,BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
import numpy as np
import functools
from pyspark.ml.feature import OneHotEncoder

In [12]:
from matplotlib import pyplot as plt
#matplotlib inline
responses = data.groupBy('HasDetections').count().collect()
categories = [i[0] for i in responses]
counts = [i[1] for i in responses]
ind = np.array(range(len(categories)))
width = 0.20
plt.bar(ind, counts, width=width, color='r')
plt.ylabel('counts')
plt.title('Response distribution')
plt.xticks(ind + width/2., categories)
plt.show()

<Figure size 640x480 with 1 Axes>

In [13]:
import random
import numpy as np
from pyspark.sql import Row
from sklearn import neighbors
from pyspark.ml.feature import VectorAssembler
def vectorizerFunction(dataInput, TargetFieldName):
    if(dataInput.select(TargetFieldName).distinct().count() != 2):
        raise ValueError("Target field must have only 2 distinct classes")
    columnNames = list(dataInput.columns)
    columnNames.remove(TargetFieldName)
    dataInput = dataInput.select((','.join(columnNames)+','+TargetFieldName).split(','))
    assembler=VectorAssembler(inputCols = columnNames, outputCol = 'features')
    pos_vectorized = assembler.transform(dataInput)
    vectorized = pos_vectorized.select('features',TargetFieldName).withColumn('label',pos_vectorized[TargetFieldName]).drop(TargetFieldName)
    return vectorized
def SmoteSampling(vectorized, k =3, minorityClass = 1, majorityClass = 0,percentageOver = 150, percentageUnder = 100):
    if(percentageUnder > 100|percentageUnder < 10):
        raise ValueError("Percentage Under must be in range 10 - 100")
    if(percentageOver < 100):
        raise ValueError("Percentage Over must be in at least 100")
    dataInput_min = vectorized[vectorized['label'] == minorityClass]
    dataInput_maj = vectorized[vectorized['label'] == majorityClass]
    feature = dataInput_min.select('features')
    feature = feature.rdd
    feature = feature.map(lambda x: x[0])
    feature = feature.collect()
    feature = np.asarray(feature)
    nbrs = neighbors.NearestNeighbors(n_neighbors=k, algorithm='auto').fit(feature)
    neighbours = nbrs.kneighbors(feature)
    gap = neighbours[0]
    neighbours = neighbours[1]
    min_rdd = dataInput_min.drop('label').rdd
    pos_rddArray = min_rdd.map(lambda x : list(x))
    pos_ListArray = pos_rddArray.collect()
    min_Array = list(pos_ListArray)
    newRows = []
    nt = len(min_Array)
    nexs = percentageOver/100
    for i in range(nt):
        for j in range(int(nexs)):
            neigh = random.randint(1,k)
            difs = min_Array[neigh][0] - min_Array[i][0]
            newRec = (min_Array[i][0]+random.random()*difs)
            newRows.insert(0,(newRec))
    newData_rdd = sc.parallelize(newRows)
    newData_rdd_new = newData_rdd.map(lambda x: Row(features = x, label = 1))
    new_data = newData_rdd_new.toDF()
    new_data_minor = dataInput_min.unionAll(new_data)
    new_data_major = dataInput_maj.sample(False, (float(percentageUnder)/float(100)))
    return new_data_major.unionAll(new_data_minor)

In [14]:
data.describe()

DataFrame[summary: string, MachineIdentifier: string, ProductName: string, EngineVersion: string, AppVersion: string, AvSigVersion: string, IsBeta: string, RtpStateBitfield: string, IsSxsPassiveMode: string, AVProductStatesIdentifier: string, AVProductsInstalled: string, AVProductsEnabled: string, HasTpm: string, CountryIdentifier: string, CityIdentifier: string, OrganizationIdentifier: string, GeoNameIdentifier: string, LocaleEnglishNameIdentifier: string, Platform: string, Processor: string, OsVer: string, OsBuild: string, OsSuite: string, OsPlatformSubRelease: string, OsBuildLab: string, SkuEdition: string, IsProtected: string, AutoSampleOptIn: string, SMode: string, IeVerIdentifier: string, SmartScreen: string, Firewall: string, UacLuaenable: string, Census_MDC2FormFactor: string, Census_DeviceFamily: string, Census_OEMNameIdentifier: string, Census_OEMModelIdentifier: string, Census_ProcessorCoreCount: string, Census_ProcessorManufacturerIdentifier: string, Census_ProcessorModelId

In [15]:
data.groupBy('HasDetections').count().collect()

[Row(HasDetections=1, count=1689470), Row(HasDetections=0, count=1570254)]

In [16]:
df = SmoteSampling(vectorizerFunction(data, 'HasDetections'), k = 3, minorityClass =1, majorityClass = 0, percentageOver = 101, percentageUnder = 22)

IllegalArgumentException: 'Data type string of column MachineIdentifier is not supported.\nData type string of column ProductName is not supported.\nData type string of column EngineVersion is not supported.\nData type string of column AppVersion is not supported.\nData type string of column AvSigVersion is not supported.\nData type string of column Platform is not supported.\nData type string of column Processor is not supported.\nData type string of column OsVer is not supported.\nData type string of column OsPlatformSubRelease is not supported.\nData type string of column OsBuildLab is not supported.\nData type string of column SkuEdition is not supported.\nData type string of column SmartScreen is not supported.\nData type string of column Census_MDC2FormFactor is not supported.\nData type string of column Census_DeviceFamily is not supported.\nData type string of column Census_PrimaryDiskTypeName is not supported.\nData type string of column Census_ChassisTypeName is not supported.\nData type string of column Census_PowerPlatformRoleName is not supported.\nData type string of column Census_OSVersion is not supported.\nData type string of column Census_OSArchitecture is not supported.\nData type string of column Census_OSBranch is not supported.\nData type string of column Census_OSEdition is not supported.\nData type string of column Census_OSSkuName is not supported.\nData type string of column Census_OSInstallTypeName is not supported.\nData type string of column Census_OSWUAutoUpdateOptionsName is not supported.\nData type string of column Census_GenuineStateName is not supported.\nData type string of column Census_ActivationChannel is not supported.\nData type string of column Census_FlightRing is not supported.'

In [17]:
df.columns

['_c0',
 'MachineIdentifier',
 'ProductName',
 'EngineVersion',
 'AppVersion',
 'AvSigVersion',
 'IsBeta',
 'RtpStateBitfield',
 'IsSxsPassiveMode',
 'AVProductStatesIdentifier',
 'AVProductsInstalled',
 'AVProductsEnabled',
 'HasTpm',
 'CountryIdentifier',
 'CityIdentifier',
 'OrganizationIdentifier',
 'GeoNameIdentifier',
 'LocaleEnglishNameIdentifier',
 'Platform',
 'Processor',
 'OsVer',
 'OsBuild',
 'OsSuite',
 'OsPlatformSubRelease',
 'OsBuildLab',
 'SkuEdition',
 'IsProtected',
 'AutoSampleOptIn',
 'SMode',
 'IeVerIdentifier',
 'SmartScreen',
 'Firewall',
 'UacLuaenable',
 'Census_MDC2FormFactor',
 'Census_DeviceFamily',
 'Census_OEMNameIdentifier',
 'Census_OEMModelIdentifier',
 'Census_ProcessorCoreCount',
 'Census_ProcessorManufacturerIdentifier',
 'Census_ProcessorModelIdentifier',
 'Census_PrimaryDiskTotalCapacity',
 'Census_PrimaryDiskTypeName',
 'Census_SystemVolumeTotalCapacity',
 'Census_HasOpticalDiskDrive',
 'Census_TotalPhysicalRAM',
 'Census_ChassisTypeName',
 'Cens

In [18]:
data = df.drop('_c0','MachineIdentifier','AvSigVersion')
data.columns

['ProductName',
 'EngineVersion',
 'AppVersion',
 'IsBeta',
 'RtpStateBitfield',
 'IsSxsPassiveMode',
 'AVProductStatesIdentifier',
 'AVProductsInstalled',
 'AVProductsEnabled',
 'HasTpm',
 'CountryIdentifier',
 'CityIdentifier',
 'OrganizationIdentifier',
 'GeoNameIdentifier',
 'LocaleEnglishNameIdentifier',
 'Platform',
 'Processor',
 'OsVer',
 'OsBuild',
 'OsSuite',
 'OsPlatformSubRelease',
 'OsBuildLab',
 'SkuEdition',
 'IsProtected',
 'AutoSampleOptIn',
 'SMode',
 'IeVerIdentifier',
 'SmartScreen',
 'Firewall',
 'UacLuaenable',
 'Census_MDC2FormFactor',
 'Census_DeviceFamily',
 'Census_OEMNameIdentifier',
 'Census_OEMModelIdentifier',
 'Census_ProcessorCoreCount',
 'Census_ProcessorManufacturerIdentifier',
 'Census_ProcessorModelIdentifier',
 'Census_PrimaryDiskTotalCapacity',
 'Census_PrimaryDiskTypeName',
 'Census_SystemVolumeTotalCapacity',
 'Census_HasOpticalDiskDrive',
 'Census_TotalPhysicalRAM',
 'Census_ChassisTypeName',
 'Census_InternalPrimaryDiagonalDisplaySizeInInches',

In [19]:
catcolumns = [item[0] for item in data.dtypes if item[1].startswith('string')]
print(str(len(catcolumns)) + ' categorical features')
numcolumns = [item[0] for item in data.dtypes if item[1].startswith('int') | item[1].startswith('double')]
print(str(len(numcolumns)) + ' numerical features')

25 categorical features
49 numerical features


In [20]:
#pyspark
#categorical columns
catcolumns=[item[0] for item in data.dtypes if item[0].startswith('string')] #will select name of column with string data type
print("cateogrical columns:", catcolumns)

cateogrical columns: ['ProductName', 'EngineVersion', 'AppVersion', 'Platform', 'Processor', 'OsVer', 'OsPlatformSubRelease', 'OsBuildLab', 'SkuEdition', 'SmartScreen', 'Census_MDC2FormFactor', 'Census_DeviceFamily', 'Census_PrimaryDiskTypeName', 'Census_ChassisTypeName', 'Census_PowerPlatformRoleName', 'Census_OSVersion', 'Census_OSArchitecture', 'Census_OSBranch', 'Census_OSEdition', 'Census_OSSkuName', 'Census_OSInstallTypeName', 'Census_OSWUAutoUpdateOptionsName', 'Census_GenuineStateName', 'Census_ActivationChannel', 'Census_FlightRing']


In [21]:
#pyspark
### numerical columns
numcolumns = [item[0] for item in data.dtypes if item[1].startswith('int') | item[1].startswith('double')] #will select name of c or double data type
print("numerical columns:", numcolumns)

numerical columns: ['IsBeta', 'RtpStateBitfield', 'IsSxsPassiveMode', 'AVProductStatesIdentifier', 'AVProductsInstalled', 'AVProductsEnabled', 'HasTpm', 'CountryIdentifier', 'CityIdentifier', 'OrganizationIdentifier', 'GeoNameIdentifier', 'LocaleEnglishNameIdentifier', 'OsBuild', 'OsSuite', 'IsProtected', 'AutoSampleOptIn', 'SMode', 'IeVerIdentifier', 'Firewall', 'UacLuaenable', 'Census_OEMNameIdentifier', 'Census_OEMModelIdentifier', 'Census_ProcessorCoreCount', 'Census_ProcessorManufacturerIdentifier', 'Census_ProcessorModelIdentifier', 'Census_PrimaryDiskTotalCapacity', 'Census_SystemVolumeTotalCapacity', 'Census_HasOpticalDiskDrive', 'Census_TotalPhysicalRAM', 'Census_InternalPrimaryDiagonalDisplaySizeInInches', 'Census_InternalPrimaryDisplayResolutionHorizontal', 'Census_InternalPrimaryDisplayResolutionVertical', 'Census_InternalBatteryNumberOfCharges', 'Census_OSBuildNumber', 'Census_OSBuildRevision', 'Census_OSInstallLanguageIdentifier', 'Census_OSUILocaleIdentifier', 'Census_Is

In [22]:
type(numcolumns)

list

In [23]:
categorical = data.select(catcolumns)
categorical.show()

+------------+-------------+----------------+---------+---------+--------+--------------------+--------------------+----------+------------+---------------------+-------------------+--------------------------+----------------------+----------------------------+----------------+---------------------+--------------------+------------------+-------------------+------------------------+--------------------------------+-----------------------+------------------------+-----------------+
| ProductName|EngineVersion|      AppVersion| Platform|Processor|   OsVer|OsPlatformSubRelease|          OsBuildLab|SkuEdition| SmartScreen|Census_MDC2FormFactor|Census_DeviceFamily|Census_PrimaryDiskTypeName|Census_ChassisTypeName|Census_PowerPlatformRoleName|Census_OSVersion|Census_OSArchitecture|     Census_OSBranch|  Census_OSEdition|   Census_OSSkuName|Census_OSInstallTypeName|Census_OSWUAutoUpdateOptionsName|Census_GenuineStateName|Census_ActivationChannel|Census_FlightRing|
+------------+-------------+

In [24]:
for i in categorical:
    print(i,(categorical.select(i).distinct().count()))

Column<b'ProductName'> 2
Column<b'EngineVersion'> 57
Column<b'AppVersion'> 101
Column<b'Platform'> 3
Column<b'Processor'> 3
Column<b'OsVer'> 33
Column<b'OsPlatformSubRelease'> 9
Column<b'OsBuildLab'> 418
Column<b'SkuEdition'> 7
Column<b'SmartScreen'> 16
Column<b'Census_MDC2FormFactor'> 11
Column<b'Census_DeviceFamily'> 3
Column<b'Census_PrimaryDiskTypeName'> 4
Column<b'Census_ChassisTypeName'> 40
Column<b'Census_PowerPlatformRoleName'> 9
Column<b'Census_OSVersion'> 359
Column<b'Census_OSArchitecture'> 3
Column<b'Census_OSBranch'> 15
Column<b'Census_OSEdition'> 24
Column<b'Census_OSSkuName'> 23
Column<b'Census_OSInstallTypeName'> 9
Column<b'Census_OSWUAutoUpdateOptionsName'> 5
Column<b'Census_GenuineStateName'> 4
Column<b'Census_ActivationChannel'> 5
Column<b'Census_FlightRing'> 9


In [25]:
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.feature import OneHotEncoder

In [26]:
stages = []
for i in catcolumns:
    stringIndexer = StringIndexer(inputCol= i, outputCol= i + '_Index')
    #model = stringIndexer.fit(data_cp)
    #indexed = model.transform(data)
    #data_cp = model.transform(data_cp)
    #ind +=[indexed]
    #data_cp.show(1)

    encoder = OneHotEncoder(inputCol= i +'_Index', outputCol= i + "_classVec")
    #data_cp = encoder.transform(data_cp)
    # enc +=[encoded]
    stages += [stringIndexer, encoder]


In [27]:
assemblerInputs = [c + "_classVec" for c in catcolumns] + numcolumns
assemblerr = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
#data_cp = assemblerr.transform(data_cp)
stages += [assemblerr]

In [28]:
from pyspark.ml.feature import MinMaxScaler
scaler = MinMaxScaler(inputCol="features", outputCol="scaledFeatures")
stages += [scaler]

In [29]:
data.columns

['ProductName',
 'EngineVersion',
 'AppVersion',
 'IsBeta',
 'RtpStateBitfield',
 'IsSxsPassiveMode',
 'AVProductStatesIdentifier',
 'AVProductsInstalled',
 'AVProductsEnabled',
 'HasTpm',
 'CountryIdentifier',
 'CityIdentifier',
 'OrganizationIdentifier',
 'GeoNameIdentifier',
 'LocaleEnglishNameIdentifier',
 'Platform',
 'Processor',
 'OsVer',
 'OsBuild',
 'OsSuite',
 'OsPlatformSubRelease',
 'OsBuildLab',
 'SkuEdition',
 'IsProtected',
 'AutoSampleOptIn',
 'SMode',
 'IeVerIdentifier',
 'SmartScreen',
 'Firewall',
 'UacLuaenable',
 'Census_MDC2FormFactor',
 'Census_DeviceFamily',
 'Census_OEMNameIdentifier',
 'Census_OEMModelIdentifier',
 'Census_ProcessorCoreCount',
 'Census_ProcessorManufacturerIdentifier',
 'Census_ProcessorModelIdentifier',
 'Census_PrimaryDiskTotalCapacity',
 'Census_PrimaryDiskTypeName',
 'Census_SystemVolumeTotalCapacity',
 'Census_HasOpticalDiskDrive',
 'Census_TotalPhysicalRAM',
 'Census_ChassisTypeName',
 'Census_InternalPrimaryDiagonalDisplaySizeInInches',

In [30]:
from pyspark.ml import Pipeline
partialPipeline = Pipeline().setStages(stages)
pipeline_data = partialPipeline.fit(data).transform(data)

In [31]:
pipeline_data.columns

['ProductName',
 'EngineVersion',
 'AppVersion',
 'IsBeta',
 'RtpStateBitfield',
 'IsSxsPassiveMode',
 'AVProductStatesIdentifier',
 'AVProductsInstalled',
 'AVProductsEnabled',
 'HasTpm',
 'CountryIdentifier',
 'CityIdentifier',
 'OrganizationIdentifier',
 'GeoNameIdentifier',
 'LocaleEnglishNameIdentifier',
 'Platform',
 'Processor',
 'OsVer',
 'OsBuild',
 'OsSuite',
 'OsPlatformSubRelease',
 'OsBuildLab',
 'SkuEdition',
 'IsProtected',
 'AutoSampleOptIn',
 'SMode',
 'IeVerIdentifier',
 'SmartScreen',
 'Firewall',
 'UacLuaenable',
 'Census_MDC2FormFactor',
 'Census_DeviceFamily',
 'Census_OEMNameIdentifier',
 'Census_OEMModelIdentifier',
 'Census_ProcessorCoreCount',
 'Census_ProcessorManufacturerIdentifier',
 'Census_ProcessorModelIdentifier',
 'Census_PrimaryDiskTotalCapacity',
 'Census_PrimaryDiskTypeName',
 'Census_SystemVolumeTotalCapacity',
 'Census_HasOpticalDiskDrive',
 'Census_TotalPhysicalRAM',
 'Census_ChassisTypeName',
 'Census_InternalPrimaryDiagonalDisplaySizeInInches',

In [32]:
pipeline_data = pipeline_data.withColumnRenamed('HasDetections','label')

In [33]:
pipeline_data.select('scaledFeatures').show(1,False)

+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [34]:
(trainingData, testData) = pipeline_data.randomSplit([0.7, 0.3], seed=100)
print(trainingData.count())
print(testData.count())

2282020
977704


In [35]:
trainingData.groupBy('label').count().show()

+-----+-------+
|label|  count|
+-----+-------+
|    1|1182247|
|    0|1099773|
+-----+-------+



In [36]:
trainingData.columns

['ProductName',
 'EngineVersion',
 'AppVersion',
 'IsBeta',
 'RtpStateBitfield',
 'IsSxsPassiveMode',
 'AVProductStatesIdentifier',
 'AVProductsInstalled',
 'AVProductsEnabled',
 'HasTpm',
 'CountryIdentifier',
 'CityIdentifier',
 'OrganizationIdentifier',
 'GeoNameIdentifier',
 'LocaleEnglishNameIdentifier',
 'Platform',
 'Processor',
 'OsVer',
 'OsBuild',
 'OsSuite',
 'OsPlatformSubRelease',
 'OsBuildLab',
 'SkuEdition',
 'IsProtected',
 'AutoSampleOptIn',
 'SMode',
 'IeVerIdentifier',
 'SmartScreen',
 'Firewall',
 'UacLuaenable',
 'Census_MDC2FormFactor',
 'Census_DeviceFamily',
 'Census_OEMNameIdentifier',
 'Census_OEMModelIdentifier',
 'Census_ProcessorCoreCount',
 'Census_ProcessorManufacturerIdentifier',
 'Census_ProcessorModelIdentifier',
 'Census_PrimaryDiskTotalCapacity',
 'Census_PrimaryDiskTypeName',
 'Census_SystemVolumeTotalCapacity',
 'Census_HasOpticalDiskDrive',
 'Census_TotalPhysicalRAM',
 'Census_ChassisTypeName',
 'Census_InternalPrimaryDiagonalDisplaySizeInInches',

In [37]:
testData.groupBy('label').count().show()

+-----+------+
|label| count|
+-----+------+
|    1|507223|
|    0|470481|
+-----+------+



In [38]:
pipeline_data.select('features').show(5)

+--------------------+
|            features|
+--------------------+
|(1196,[0,1,57,157...|
|(1196,[0,1,57,157...|
|(1196,[0,1,57,157...|
|(1196,[0,2,57,157...|
|(1196,[0,1,57,157...|
+--------------------+
only showing top 5 rows



In [39]:
print(trainingData.count(),len(trainingData.dtypes))
print(testData.count(),len(testData.dtypes))
#print(trainingData.columns,testData.columns)

2282020 126
977704 126


In [40]:
from pyspark.ml.classification import LogisticRegression
# Create initial LogisticRegression model
lr = LogisticRegression(labelCol="label", featuresCol="scaledFeatures", maxIter=10)
# Train model with Training Data
Model = lr.fit(trainingData)
train_prediction = Model.transform(trainingData)
test_prediction = Model.transform(testData)

In [42]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
train_accuracy = evaluator.evaluate(train_prediction)
test_accuracy = evaluator.evaluate(test_prediction)
print("Accuracy of LogisticRegression on train is = %g"% (train_accuracy))
print("Accuracy of LogisticRegression on test is = %g"% (test_accuracy))
#print("Test Error of LogisticRegression = %g " % (1.0 - lr_accuracy_test))

Accuracy of LogisticRegression on train is = 0.873356
Accuracy of LogisticRegression on test is = 0.87315


In [43]:
cm = test_prediction.crosstab('prediction','label')
cm = cm.toPandas()
cm

Unnamed: 0,prediction_label,0,1
0,1.0,63359,446560
1,0.0,407122,60663


In [44]:
TP = cm["1"][0]
FP = cm["0"][0]
TN = cm["0"][1]
FN = cm["1"][1]
print(TP,FP,TN,FN)

446560 63359 407122 60663


In [45]:
Accuracy = (TP+TN)/(TP+TN+FP+FN)
Sensitivity = TP/(TP+FN)
Specificity = TN/(TN+FP)
Precision = TP/(TP+FP)
print('ACCURACY = %0.2f' %Accuracy)
print('SENSITIVITY = %0.2f' %Sensitivity)
print('SPECIFICITY = %0.2f' %Specificity)
print('PRECISION = %0.2f' %Precision)

ACCURACY = 0.87
SENSITIVITY = 0.88
SPECIFICITY = 0.87
PRECISION = 0.88


In [46]:
from pyspark.ml.classification import DecisionTreeClassifier
dt = DecisionTreeClassifier(labelCol="label", featuresCol="scaledFeatures",maxDepth=3)
Model = dt.fit(trainingData)
train_prediction = Model.transform(trainingData)
test_prediction = Model.transform(testData)