In [1]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import HiveContext

In [2]:
SparkContext.setSystemProperty('spark.executor.memory','4g')
conf = SparkConf()
conf.set('spark.executor.instances',20)
sc = SparkContext('yarn', 'kdd99', conf=conf)
hc=HiveContext(sc)

In [3]:
hc.sql("USE itv000684_kdd99data")

In [4]:
kdd = hc.table("kdd99")

In [5]:
(train_data, test_data) = kdd.randomSplit([0.7,0.3], seed=42)

In [6]:
train_data.cache()

protocol_type,service,flag,is_anomaly,duration,src_bytes,dst_bytes,wrong_fragment,urgent,hot,num_failed_logins,num_compromised,root_shell,su_attempted,num_root,num_file_creations,num_shells,num_access_files,num_outbound_cmds,count,srv_count,serror_rate,srv_serror_rate,rerror_rate,srv_rerror_rate,same_srv_rate,diff_srv_rate,srv_diff_host_rate,dst_host_count,dst_host_srv_count,dst_host_same_srv_rate,dst_host_diff_srv_rate,dst_host_same_src_port_rate,dst_host_srv_diff_host_rate,dst_host_serror_rate,dst_host_srv_serror_rate,dst_host_rerror_rate,dst_host_srv_rerror_rate
icmp,eco_i,SF,ipsweep.,0,8,0,0,0,0,0,0,0,0,0,0,0,0,0,1,1,0.0,0.0,0.0,0.0,1.0,0.0,0.0,1.0,3.0,1.0,0.0,1.0,0.67,0.0,0.0,0.0,0.0
icmp,eco_i,SF,ipsweep.,0,8,0,0,0,0,0,0,0,0,0,0,0,0,0,1,1,0.0,0.0,0.0,0.0,1.0,0.0,0.0,1.0,15.0,1.0,0.0,1.0,0.53,0.0,0.0,0.0,0.0
icmp,eco_i,SF,ipsweep.,0,8,0,0,0,0,0,0,0,0,0,0,0,0,0,1,1,0.0,0.0,0.0,0.0,1.0,0.0,0.0,1.0,29.0,1.0,0.0,1.0,0.52,0.0,0.0,0.0,0.0
icmp,eco_i,SF,ipsweep.,0,8,0,0,0,0,0,0,0,0,0,0,0,0,0,1,1,0.0,0.0,0.0,0.0,1.0,0.0,0.0,1.0,43.0,1.0,0.0,1.0,0.51,0.0,0.0,0.0,0.0
icmp,eco_i,SF,ipsweep.,0,8,0,0,0,0,0,0,0,0,0,0,0,0,0,1,1,0.0,0.0,0.0,0.0,1.0,0.0,0.0,1.0,53.0,1.0,0.0,1.0,0.51,0.0,0.0,0.0,0.0
icmp,eco_i,SF,ipsweep.,0,8,0,0,0,0,0,0,0,0,0,0,0,0,0,1,1,0.0,0.0,0.0,0.0,1.0,0.0,0.0,1.0,97.0,1.0,0.0,1.0,0.51,0.0,0.0,0.0,0.0
icmp,eco_i,SF,ipsweep.,0,8,0,0,0,0,0,0,0,0,0,0,0,0,0,1,1,0.0,0.0,0.0,0.0,1.0,0.0,0.0,1.0,127.0,1.0,0.0,1.0,0.5,0.0,0.0,0.0,0.0
icmp,eco_i,SF,ipsweep.,0,8,0,0,0,0,0,0,0,0,0,0,0,0,0,1,1,0.0,0.0,0.0,0.0,1.0,0.0,0.0,1.0,131.0,1.0,0.0,1.0,0.5,0.0,0.0,0.0,0.0
icmp,eco_i,SF,ipsweep.,0,8,0,0,0,0,0,0,0,0,0,0,0,0,0,1,1,0.0,0.0,0.0,0.0,1.0,0.0,0.0,1.0,233.0,1.0,0.0,1.0,0.5,0.0,0.0,0.0,0.0
icmp,eco_i,SF,ipsweep.,0,8,0,0,0,0,0,0,0,0,0,0,0,0,0,1,1,0.0,0.0,0.0,0.0,1.0,0.0,0.0,1.0,255.0,1.0,0.0,1.0,0.5,0.0,0.0,0.0,0.0


In [7]:
services = train_data.withColumnRenamed('service','srvc').select('srvc').distinct()

In [8]:
## filter and remove any rows with a service not trained upon

In [9]:
test_data = test_data.join(services, test_data.service == services.srvc)
#test_data.cache()

In [10]:
print("training set has " + str(train_data.count()) + " instances")

training set has 3429322 instances


In [11]:
print("test set has " + str(test_data.count()) + " instances")

test set has 1469108 instances


In [12]:
from pyspark.ml.feature import StringIndexer, VectorAssembler, OneHotEncoder
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier

In [13]:
index1 = StringIndexer(inputCol="protocol_type", outputCol="protocol-cat")
index2 = StringIndexer(inputCol="service", outputCol="service-cat")
index3 = StringIndexer(inputCol="flag", outputCol="flag-cat")
index4 = StringIndexer(inputCol="is_anomaly", outputCol="label")
onehotencode = OneHotEncoder(inputCol="service-cat", outputCol="service-onehotencode")

feat_columns = [col for col in kdd.columns +
               ['protocol-cat','service-onehotencode','flag-cat','label']
               if col not in ['protocol_type','service','flag','is_anomaly']]
vectorAssembler = VectorAssembler(inputCols = feat_columns, outputCol = 'features')

In [14]:
randomjungle = RandomForestClassifier(numTrees=500, maxDepth=6, maxBins=80,seed=42)
pipeline = Pipeline(stages=[index1,index2,index3,index4,onehotencode, vectorAssembler, randomjungle])

In [15]:
themodel = pipeline.fit(train_data)

In [17]:
themodel.save('/user/itv000684/kdd/model.model2')

In [18]:
from pyspark.ml import PipelineModel
model = PipelineModel.load('/user/itv000684/kdd/model.model2')

In [19]:
results = model.transform(test_data).select("label","prediction").cache()

In [20]:
import pandas as pd

def eval_metrics(lap):
    labels = lap.select("label").distinct().toPandas()['label'].tolist()
    tpos = [lap.filter(lap.label == x).filter(lap.prediction == x).count() for x in labels]
    fpos = [lap.filter(lap.label == x).filter(lap.prediction != x).count() for x in labels]
    fneg = [lap.filter(lap.label != x).filter(lap.prediction != x).count() for x in labels]
    precision = zip(labels,[float(tp)/(tp+fp+1e-50) for (tp,fp) in zip(tpos,fpos)])
    recall = zip(labels, [float(tp)/(tp+fn+1e-50) for (tp,fn) in zip(tpos,fneg)])
    return(precision,recall)

In [21]:
(precision, recall) = eval_metrics(results)
ordered_labels = model.stages[3]._call_java("labels")
df = pd.DataFrame([(x, test_data.filter(test_data.is_anomaly == x).count(),y[1],z[1]) for x,y,z in zip(ordered_labels, sorted(precision, key = lambda x: x[0]), sorted(recall, key=lambda x: x[0]))], columns = ['type','count','precision','recall'])

In [22]:
df

Unnamed: 0,type,count,precision,recall
0,smurf.,842884,1.0,0.573739
1,neptune.,321292,1.0,0.218699
2,normal.,291432,1.0,0.19853
3,satan.,4684,0.944065,0.003011
4,ipsweep.,3635,0.967813,0.002395
5,portsweep.,3161,0.990826,0.002132
6,nmap.,667,0.449775,0.000204
7,back.,670,0.943284,0.00043
8,warezclient.,287,0.0,0.0
9,teardrop.,263,0.034221,6e-06
