In [1]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m2.8 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=bc6e8c3d4ede891c227542365f67a508d805f11357f07b48e7466b09cb7695cf
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


In [2]:
# Download the gzip-compressed KDD Cup '99 data file from the UCI KDD archive into the working directory.
!wget http://kdd.ics.uci.edu/databases/kddcup99/kddcup.data.gz

--2024-04-12 05:07:58--  http://kdd.ics.uci.edu/databases/kddcup99/kddcup.data.gz
Resolving kdd.ics.uci.edu (kdd.ics.uci.edu)... 128.195.1.86
Connecting to kdd.ics.uci.edu (kdd.ics.uci.edu)|128.195.1.86|:80... connected.
HTTP request sent, awaiting response... 200 OK
Length: 18115902 (17M) [application/x-gzip]
Saving to: ‘kddcup.data.gz’


2024-04-12 05:09:12 (237 KB/s) - ‘kddcup.data.gz’ saved [18115902/18115902]



In [3]:
!gzip -d kddcup.data.gz

## Import Libraries

In [1]:
import pyspark
from pyspark import SparkContext
from pyspark.sql import SparkSession

## Create SparkSession

In [2]:
# One SparkSession for the notebook; getOrCreate() returns an existing session if there is one.
spark = (
    SparkSession.builder
    .appName('Decision Trees KDD')
    .getOrCreate()
)

## Read the Dataset

In [3]:
# header=False: the file has no header row, so Spark names the columns _c0 ... _c41.
# inferSchema=True: Spark makes an extra pass over the data to detect numeric column types.
# Booleans are passed instead of the strings 'False'/'True' to avoid relying on string parsing.
df = spark.read.csv('kddcup.data', header=False, inferSchema=True)

df.show(5)

+---+---+----+---+---+-----+---+---+---+---+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+-------+
|_c0|_c1| _c2|_c3|_c4|  _c5|_c6|_c7|_c8|_c9|_c10|_c11|_c12|_c13|_c14|_c15|_c16|_c17|_c18|_c19|_c20|_c21|_c22|_c23|_c24|_c25|_c26|_c27|_c28|_c29|_c30|_c31|_c32|_c33|_c34|_c35|_c36|_c37|_c38|_c39|_c40|   _c41|
+---+---+----+---+---+-----+---+---+---+---+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+-------+
|  0|tcp|http| SF|215|45076|  0|  0|  0|  0|   0|   1|   0|   0|   0|   0|   0|   0|   0|   0|   0|   0|   1|   1| 0.0| 0.0| 0.0| 0.0| 1.0| 0.0| 0.0|   0|   0| 0.0| 0.0| 0.0| 0.0| 0.0| 0.0| 0.0| 0.0|normal.|
|  0|tcp|http| SF|162| 4528|  0|  0|  0|  0|   0|   1|   0|   0|   0|   0|   0|   0|   0|   0|   0|   0|   2|   2| 0.0| 0.0| 0.0| 0.0| 1.0| 0.0| 0.0|   1|   1| 1.0| 0.0

In [4]:
# Total number of rows (connection records) in the raw file.
n_datapoints = df.count()
print('Number of datapoints: ', n_datapoints)

Number of datapoints:  4898431


In [5]:
# Names for the 41 feature columns plus the trailing label, in the same order as
# the CSV's positional columns (_c0 ... _c41). toDF(*columns) renames by position,
# so this order matters.
columns = """
    duration protocol_type service flag src_bytes dst_bytes
    land wrong_fragment urgent hot num_failed_logins logged_in
    num_compromised root_shell su_attempted num_root
    num_file_creations num_shells num_access_files num_outbound_cmds
    is_host_login is_guest_login count srv_count serror_rate
    srv_serror_rate rerror_rate srv_rerror_rate same_srv_rate
    diff_srv_rate srv_diff_host_rate dst_host_count
    dst_host_srv_count dst_host_same_srv_rate dst_host_diff_srv_rate
    dst_host_same_src_port_rate dst_host_srv_diff_host_rate
    dst_host_serror_rate dst_host_srv_serror_rate dst_host_rerror_rate
    dst_host_srv_rerror_rate label
""".split()

In [6]:
# Replace the positional names (_c0 ... _c41) with the names in `columns`;
# toDF maps names by position, so `columns` must follow the file's column order.
df = df.toDF(*columns)

df.show(5)

+--------+-------------+-------+----+---------+---------+----+--------------+------+---+-----------------+---------+---------------+----------+------------+--------+------------------+----------+----------------+-----------------+-------------+--------------+-----+---------+-----------+---------------+-----------+---------------+-------------+-------------+------------------+--------------+------------------+----------------------+----------------------+---------------------------+---------------------------+--------------------+------------------------+--------------------+------------------------+-------+
|duration|protocol_type|service|flag|src_bytes|dst_bytes|land|wrong_fragment|urgent|hot|num_failed_logins|logged_in|num_compromised|root_shell|su_attempted|num_root|num_file_creations|num_shells|num_access_files|num_outbound_cmds|is_host_login|is_guest_login|count|srv_count|serror_rate|srv_serror_rate|rerror_rate|srv_rerror_rate|same_srv_rate|diff_srv_rate|srv_diff_host_rate|dst_hos

## Convert String Features to Numeric Indices

In [7]:
# Inspect the inferred column types to find which columns are strings and need indexing.
df.printSchema()

root
 |-- duration: integer (nullable = true)
 |-- protocol_type: string (nullable = true)
 |-- service: string (nullable = true)
 |-- flag: string (nullable = true)
 |-- src_bytes: integer (nullable = true)
 |-- dst_bytes: integer (nullable = true)
 |-- land: integer (nullable = true)
 |-- wrong_fragment: integer (nullable = true)
 |-- urgent: integer (nullable = true)
 |-- hot: integer (nullable = true)
 |-- num_failed_logins: integer (nullable = true)
 |-- logged_in: integer (nullable = true)
 |-- num_compromised: integer (nullable = true)
 |-- root_shell: integer (nullable = true)
 |-- su_attempted: integer (nullable = true)
 |-- num_root: integer (nullable = true)
 |-- num_file_creations: integer (nullable = true)
 |-- num_shells: integer (nullable = true)
 |-- num_access_files: integer (nullable = true)
 |-- num_outbound_cmds: integer (nullable = true)
 |-- is_host_login: integer (nullable = true)
 |-- is_guest_login: integer (nullable = true)
 |-- count: integer (nullable = true

In [8]:
from pyspark.sql.types import StringType

all_cols = df.schema.fields

# Collect the names of every column whose inferred type is string; these get indexed next.
string_cols = []
for field in all_cols:
    if field.dataType == StringType():
        string_cols.append(field.name)

print(string_cols)

['protocol_type', 'service', 'flag', 'label']


In [9]:
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline

# One StringIndexer per string column, writing <name>_index. By default StringIndexer
# assigns indices in descending order of value frequency.
indexers = [StringIndexer(inputCol=name, outputCol=f'{name}_index') for name in string_cols]

pipeline = Pipeline(stages=indexers)
pipeline_model = pipeline.fit(df)

In [10]:
# Append the *_index columns produced by the fitted indexers; the original string columns are kept for now.
df = pipeline_model.transform(df)

df.show(5)

+--------+-------------+-------+----+---------+---------+----+--------------+------+---+-----------------+---------+---------------+----------+------------+--------+------------------+----------+----------------+-----------------+-------------+--------------+-----+---------+-----------+---------------+-----------+---------------+-------------+-------------+------------------+--------------+------------------+----------------------+----------------------+---------------------------+---------------------------+--------------------+------------------------+--------------------+------------------------+-------+-------------------+-------------+----------+-----------+
|duration|protocol_type|service|flag|src_bytes|dst_bytes|land|wrong_fragment|urgent|hot|num_failed_logins|logged_in|num_compromised|root_shell|su_attempted|num_root|num_file_creations|num_shells|num_access_files|num_outbound_cmds|is_host_login|is_guest_login|count|srv_count|serror_rate|srv_serror_rate|rerror_rate|srv_rerror_ra

In [11]:
# Swap each string column name for its indexed counterpart, preserving column order.
columns = [name + '_index' if name in string_cols else name for name in columns]

print(columns)

['duration', 'protocol_type_index', 'service_index', 'flag_index', 'src_bytes', 'dst_bytes', 'land', 'wrong_fragment', 'urgent', 'hot', 'num_failed_logins', 'logged_in', 'num_compromised', 'root_shell', 'su_attempted', 'num_root', 'num_file_creations', 'num_shells', 'num_access_files', 'num_outbound_cmds', 'is_host_login', 'is_guest_login', 'count', 'srv_count', 'serror_rate', 'srv_serror_rate', 'rerror_rate', 'srv_rerror_rate', 'same_srv_rate', 'diff_srv_rate', 'srv_diff_host_rate', 'dst_host_count', 'dst_host_srv_count', 'dst_host_same_srv_rate', 'dst_host_diff_srv_rate', 'dst_host_same_src_port_rate', 'dst_host_srv_diff_host_rate', 'dst_host_serror_rate', 'dst_host_srv_serror_rate', 'dst_host_rerror_rate', 'dst_host_srv_rerror_rate', 'label_index']


In [12]:
# Keep only the numeric columns; the *_index versions replace the raw string columns.
df = df.select(columns)

df.show(5)

+--------+-------------------+-------------+----------+---------+---------+----+--------------+------+---+-----------------+---------+---------------+----------+------------+--------+------------------+----------+----------------+-----------------+-------------+--------------+-----+---------+-----------+---------------+-----------+---------------+-------------+-------------+------------------+--------------+------------------+----------------------+----------------------+---------------------------+---------------------------+--------------------+------------------------+--------------------+------------------------+-----------+
|duration|protocol_type_index|service_index|flag_index|src_bytes|dst_bytes|land|wrong_fragment|urgent|hot|num_failed_logins|logged_in|num_compromised|root_shell|su_attempted|num_root|num_file_creations|num_shells|num_access_files|num_outbound_cmds|is_host_login|is_guest_login|count|srv_count|serror_rate|srv_serror_rate|rerror_rate|srv_rerror_rate|same_srv_rate|

## Create a Single Feature Vector

In [13]:
from pyspark.ml.feature import VectorAssembler

# Every column except the label becomes a feature. Excluding the label by name,
# rather than assuming it is the last entry, keeps this correct if `columns` is
# reordered. The order of input_cols is the feature order of featureVector, and
# it is reused later to label featureImportances.
input_cols = [name for name in columns if name != 'label_index']

vector_assembler = VectorAssembler(inputCols=input_cols, outputCol='featureVector')
df = vector_assembler.transform(df)

df.select('featureVector').show(5)

+--------------------+
|       featureVector|
+--------------------+
|(41,[1,2,4,5,11,2...|
|(41,[1,2,4,5,11,2...|
|(41,[1,2,4,5,11,2...|
|(41,[1,2,4,5,11,2...|
|(41,[1,2,4,5,11,2...|
+--------------------+
only showing top 5 rows



In [14]:
# Reduce to the two columns the classifiers consume: features and label.
df = df.select(['featureVector', 'label_index'])

df.show(5)

+--------------------+-----------+
|       featureVector|label_index|
+--------------------+-----------+
|(41,[1,2,4,5,11,2...|        2.0|
|(41,[1,2,4,5,11,2...|        2.0|
|(41,[1,2,4,5,11,2...|        2.0|
|(41,[1,2,4,5,11,2...|        2.0|
|(41,[1,2,4,5,11,2...|        2.0|
+--------------------+-----------+
only showing top 5 rows



## Split into Training and Test Sets

In [15]:
# 80/20 random split. A fixed seed makes the split, and every metric computed on it, reproducible.
train_df, test_df = df.randomSplit([0.8, 0.2], seed=42)

## Decision Trees

In [17]:
from pyspark.ml.classification import DecisionTreeClassifier

# maxBins is raised above Spark's default (32). Tree learners require maxBins to be
# at least the number of distinct values of every categorical feature, and the
# StringIndexer-derived columns are treated as categorical here (see the
# "feature 3 in {...}" splits in the debug output). service_index presumably has
# more than 32 values -- TODO confirm the exact count.
classifier = DecisionTreeClassifier(
    featuresCol='featureVector',
    labelCol='label_index',
    predictionCol='prediction',
    maxBins=100,
)

model = classifier.fit(train_df)

In [18]:
# Text dump of the learned tree's split rules; "feature i" is position i in input_cols.
print(model.toDebugString)

DecisionTreeClassificationModel: uid=DecisionTreeClassifier_7614bd0cbf3b, depth=5, numNodes=41, numClasses=23, numFeatures=41
  If (feature 23 <= 65.5)
   If (feature 28 <= 0.315)
    If (feature 34 <= 0.16)
     If (feature 3 in {1.0,2.0,4.0,6.0,7.0,8.0})
      If (feature 35 <= 0.07500000000000001)
       Predict: 1.0
      Else (feature 35 > 0.07500000000000001)
       Predict: 5.0
     Else (feature 3 not in {1.0,2.0,4.0,6.0,7.0,8.0})
      If (feature 4 <= 6.5)
       Predict: 3.0
      Else (feature 4 > 6.5)
       Predict: 2.0
    Else (feature 34 > 0.16)
     If (feature 4 <= 7.5)
      If (feature 29 <= 0.635)
       Predict: 5.0
      Else (feature 29 > 0.635)
       Predict: 3.0
     Else (feature 4 > 7.5)
      If (feature 22 <= 28.5)
       Predict: 2.0
      Else (feature 22 > 28.5)
       Predict: 9.0
   Else (feature 28 > 0.315)
    If (feature 36 <= 0.45)
     If (feature 3 in {0.0,2.0,5.0,6.0,8.0})
      If (feature 34 <= 0.92)
       Predict: 2.0
      Else (feature 

In [19]:
import pandas as pd

# Feature importances from the fitted tree, labelled by feature name, highest first.
importance_df = pd.DataFrame(
    {'importance': model.featureImportances.toArray()},
    index=input_cols,
)
importance_df.sort_values(by='importance', ascending=False)

Unnamed: 0,importance
srv_count,0.615878
same_srv_rate,0.338573
src_bytes,0.013365
dst_host_diff_srv_rate,0.011713
flag_index,0.009436
dst_host_srv_diff_host_rate,0.006831
dst_host_serror_rate,0.00282
service_index,0.000912
diff_srv_rate,0.000279
dst_host_same_src_port_rate,0.0001


In [20]:
# Score the held-out test set and peek at label vs. prediction.
predictions = model.transform(test_df)
predictions.select(['label_index', 'prediction', 'probability']).show(n=5)

+-----------+----------+--------------------+
|label_index|prediction|         probability|
+-----------+----------+--------------------+
|        2.0|       5.0|[0.0,6.8662455369...|
|        2.0|       5.0|[0.0,6.8662455369...|
|        2.0|       5.0|[0.0,6.8662455369...|
|        2.0|       5.0|[0.0,6.8662455369...|
|        2.0|       5.0|[0.0,6.8662455369...|
+-----------+----------+--------------------+
only showing top 5 rows



### Evaluate Outputs

In [21]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

evaluator = MulticlassClassificationEvaluator(
    predictionCol='prediction',
    labelCol='label_index',
)

# setMetricName() mutates the evaluator and returns it, so each call re-targets the
# same object before evaluating. Spark's multiclass 'f1' is the class-weighted F1.
acc = evaluator.setMetricName('accuracy').evaluate(predictions)
f1 = evaluator.setMetricName('f1').evaluate(predictions)

for title, score in (('Accuracy: ', acc), ('F1 Score: ', f1)):
    print(title, round(score * 100, 2))

Accuracy:  99.75
F1 Score:  99.7


### Hyperparameter Tuning

In [None]:
from pyspark.ml.tuning import ParamGridBuilder

classifier = DecisionTreeClassifier(
    labelCol='label_index',
    featuresCol='featureVector',
    predictionCol='prediction',
)

# 2 x 4 x 2 x 4 = 64 candidate parameter combinations.
paramGrid = (
    ParamGridBuilder()
    .addGrid(classifier.impurity, ['gini', 'entropy'])
    .addGrid(classifier.maxDepth, [1, 5, 10, 20])
    .addGrid(classifier.maxBins, [100, 200])
    .addGrid(classifier.minInfoGain, [0.0, 0.05, 0.1, 0.5])
    .build()
)

# Metric the validator uses to pick the best candidate.
evaluator = MulticlassClassificationEvaluator(
    labelCol='label_index',
    predictionCol='prediction',
    metricName='accuracy',
)

In [None]:
from pyspark.ml.tuning import TrainValidationSplit

# Each candidate in paramGrid is trained on one random train/validation split of
# train_df (default trainRatio=0.75) and scored with `evaluator`. An explicit seed
# makes that split, and therefore the selected bestModel, reproducible.
validator = TrainValidationSplit(
    estimator=classifier,
    evaluator=evaluator,
    estimatorParamMaps=paramGrid,
    seed=42,
)

validator_model = validator.fit(train_df)

In [None]:
best_model = validator_model.bestModel

# Report the hyperparameters of the winning decision tree.
print('Best Parameters: ')
for title, getter in (
    ('\tImpurity: ', best_model.getImpurity),
    ('\tMax Depth: ', best_model.getMaxDepth),
    ('\tMax Bins: ', best_model.getMaxBins),
    ('\tMin Info Gain: ', best_model.getMinInfoGain),
):
    print(title, getter())

In [None]:
# Score the held-out test set with the tuned decision tree. The label column in this
# notebook is 'label_index'; selecting 'income_index' would raise an AnalysisException.
predictions = best_model.transform(test_df)
predictions.select('label_index', 'prediction', 'probability').show(5)

In [None]:
# Pass the metric as a per-call param override instead of calling setMetricName().
# `evaluator` is the same object given to `validator`, so mutating it to 'f1' would
# make a re-run of the tuning cell select models by F1 instead of accuracy.
acc = evaluator.evaluate(predictions, {evaluator.metricName: 'accuracy'})
f1 = evaluator.evaluate(predictions, {evaluator.metricName: 'f1'})

print('Accuracy: ', round(acc * 100, 2))
print('F1 Score: ', round(f1 * 100, 2))

## Random Forests

In [22]:
from pyspark.ml.classification import RandomForestClassifier

# Random-forest training is randomized (row bootstrapping and random feature subsets
# for splits), so fix the seed for reproducible results. maxBins matches the
# decision tree above.
classifier = RandomForestClassifier(
    labelCol='label_index',
    featuresCol='featureVector',
    predictionCol='prediction',
    maxBins=100,
    seed=42,
)

model = classifier.fit(train_df)

In [23]:
# Forest-level feature importances, labelled by feature name, highest first.
rf_importances = pd.DataFrame(
    {'importance': model.featureImportances.toArray()},
    index=input_cols,
)
rf_importances.sort_values(by='importance', ascending=False)

Unnamed: 0,importance
dst_host_same_src_port_rate,0.1797296
same_srv_rate,0.1594905
count,0.1233215
srv_count,0.09303557
protocol_type_index,0.09124643
service_index,0.09086988
diff_srv_rate,0.06304451
src_bytes,0.05123403
dst_host_srv_count,0.04578603
dst_host_diff_srv_rate,0.02616706


In [24]:
# Score the held-out test set with the random forest and peek at label vs. prediction.
predictions = model.transform(test_df)
predictions.select(['label_index', 'prediction', 'probability']).show(n=5)

+-----------+----------+--------------------+
|label_index|prediction|         probability|
+-----------+----------+--------------------+
|        2.0|       2.0|[0.00127372791918...|
|        2.0|       2.0|[0.00123814103855...|
|        2.0|       2.0|[7.39955841850548...|
|        2.0|       2.0|[0.00115888468411...|
|        2.0|       2.0|[0.00115888468411...|
+-----------+----------+--------------------+
only showing top 5 rows



### Evaluation

In [25]:
evaluator = MulticlassClassificationEvaluator(
    predictionCol='prediction',
    labelCol='label_index',
)

# setMetricName() mutates the evaluator and returns it, so each call re-targets the
# same object before evaluating. Spark's multiclass 'f1' is the class-weighted F1.
acc = evaluator.setMetricName('accuracy').evaluate(predictions)
f1 = evaluator.setMetricName('f1').evaluate(predictions)

for title, score in (('Accuracy: ', acc), ('F1 Score: ', f1)):
    print(title, round(score * 100, 2))

Accuracy:  99.74
F1 Score:  99.68


### Hyperparameter Tuning

In [None]:
# Fixed seed so the forest's randomized training is reproducible for every candidate.
classifier = RandomForestClassifier(
    labelCol='label_index',
    featuresCol='featureVector',
    predictionCol='prediction',
    seed=42,
)

# maxBins must be >= the number of distinct values of every categorical feature
# (the StringIndexer outputs are treated as categorical). The untuned models above
# use maxBins=100, and 50 is presumably below the category count of service_index
# (TODO confirm). A single failing candidate aborts the whole TrainValidationSplit
# fit, so only values >= 100 are searched.
# 2 x 4 x 2 x 3 x 4 = 192 candidate parameter combinations.
paramGrid = (
    ParamGridBuilder()
    .addGrid(classifier.impurity, ['gini', 'entropy'])
    .addGrid(classifier.maxDepth, [1, 5, 10, 20])
    .addGrid(classifier.maxBins, [100, 200])
    .addGrid(classifier.numTrees, [20, 50, 100])
    .addGrid(classifier.minInfoGain, [0.0, 0.05, 0.1, 0.5])
    .build()
)

# Metric the validator uses to pick the best candidate.
evaluator = MulticlassClassificationEvaluator(
    labelCol='label_index',
    predictionCol='prediction',
    metricName='accuracy',
)

In [None]:
# Each candidate in paramGrid is trained on one random train/validation split of
# train_df (default trainRatio=0.75) and scored with `evaluator`. An explicit seed
# makes that split, and therefore the selected bestModel, reproducible.
validator = TrainValidationSplit(
    estimator=classifier,
    evaluator=evaluator,
    estimatorParamMaps=paramGrid,
    seed=42,
)

validator_model = validator.fit(train_df)

In [None]:
best_model = validator_model.bestModel

# Report the hyperparameters of the winning random forest.
print('Best Parameters: ')
print('\tImpurity: ', best_model.getImpurity())
print('\tMax Depth: ', best_model.getMaxDepth())
max_bins = best_model.getMaxBins()
print('\tMax Bins: ', max_bins)
# Accessed without parentheses, exactly as before; on PySpark tree-ensemble models
# getNumTrees appears to be a property -- verify against the installed version.
num_trees = best_model.getNumTrees
print('\tNum Trees: ', num_trees)
print('\tMin Info Gain: ', best_model.getMinInfoGain())

In [None]:
# Score the held-out test set with the tuned random forest. The label column in this
# notebook is 'label_index'; selecting 'income_index' would raise an AnalysisException.
predictions = best_model.transform(test_df)
predictions.select('label_index', 'prediction', 'probability').show(5)

In [None]:
# Pass the metric as a per-call param override instead of calling setMetricName().
# `evaluator` is the same object given to `validator`, so mutating it to 'f1' would
# make a re-run of the tuning cell select models by F1 instead of accuracy.
acc = evaluator.evaluate(predictions, {evaluator.metricName: 'accuracy'})
f1 = evaluator.evaluate(predictions, {evaluator.metricName: 'f1'})

print('Accuracy: ', round(acc * 100, 2))
print('F1 Score: ', round(f1 * 100, 2))