In [99]:
from pyspark import * 
from pyspark.sql import SQLContext
import urllib

In [2]:
sc = SparkContext(master="local", appName="test")

In [4]:
sc

In [58]:
#download les data
# http://kdd.ics.uci.edu/databases/kddcup99/kddcup99.html
url = urllib.request.urlretrieve('http://kdd.ics.uci.edu/databases/kddcup99/kddcup.data_10_percent.gz', 'data_10_percent.gz')

In [127]:
#initialiser un SQLContext
path = './data_10_percent.gz'
sqlContext = SQLContext(sc)
datas = sc.textFile(path)
datas = datas.map(lambda x: x.split(','))

In [136]:
#parser les data
from pyspark.sql.types import *
structure = StructType([
    StructField("duration", IntegerType(),     True),
    StructField("protocol_type", StringType(), True),
    StructField("service", StringType(),       True),
    StructField("flag", StringType(),       True),
    StructField("src_bytes", IntegerType(),    True),
    StructField("dest_bytes", IntegerType(),   True),
    StructField("interactions", StringType(),  True)
])
structure

StructType(List(StructField(duration,IntegerType,true),StructField(protocol_type,StringType,true),StructField(service,StringType,true),StructField(flag,StringType,true),StructField(src_bytes,IntegerType,true),StructField(dest_bytes,IntegerType,true),StructField(interactions,StringType,true)))

In [141]:
#a l'aide de la methode StructType structurer un schema de donnees  
datasFormatted = datas.map(lambda key: (int(key[0]), key[1], key[2], key[3], int(key[4]), int(key[5]), key[-1]))

In [140]:
#afficher les 10premieres lignes du DataFrame

df = sqlContext.createDataFrame(datasFormatted, structure)
df.show(10)

+--------+-------------+-------+----+---------+----------+------------+
|duration|protocol_type|service|flag|src_bytes|dest_bytes|interactions|
+--------+-------------+-------+----+---------+----------+------------+
|       0|          tcp|   http|  SF|      181|      5450|     normal.|
|       0|          tcp|   http|  SF|      239|       486|     normal.|
|       0|          tcp|   http|  SF|      235|      1337|     normal.|
|       0|          tcp|   http|  SF|      219|      1337|     normal.|
|       0|          tcp|   http|  SF|      217|      2032|     normal.|
|       0|          tcp|   http|  SF|      217|      2032|     normal.|
|       0|          tcp|   http|  SF|      212|      1940|     normal.|
|       0|          tcp|   http|  SF|      159|      4087|     normal.|
|       0|          tcp|   http|  SF|      210|       151|     normal.|
|       0|          tcp|   http|  SF|      212|       786|     normal.|
+--------+-------------+-------+----+---------+----------+------

In [152]:
#creer une vue 'interactions'
df.createOrReplaceTempView('interactions')

In [167]:
#selectionner les colonnes duration, dst_bytes ou le protocole est tcp et les duration > 1000 avec des dst_bytes nuls
result = df[(df['duration'] > 1000) & (df['protocol_type'] == 'tcp') & (df['dest_bytes'] == 0)]
result.count()

139

In [244]:
#afficher les lignes ci-dessus en utilisant la fonction lambda
result.rdd.map(lambda x : "Duration: {0}, Dest. bytes: {1}".format(x.dest_bytes,x.duration)).take(10)


['Duration: 0, Dest. bytes: 5057',
 'Duration: 0, Dest. bytes: 5059',
 'Duration: 0, Dest. bytes: 5051',
 'Duration: 0, Dest. bytes: 5056',
 'Duration: 0, Dest. bytes: 5051',
 'Duration: 0, Dest. bytes: 5039',
 'Duration: 0, Dest. bytes: 5062',
 'Duration: 0, Dest. bytes: 5041',
 'Duration: 0, Dest. bytes: 5056',
 'Duration: 0, Dest. bytes: 5064']

In [232]:
#compter le nombre d'interaction par protocol
import time
test = df[['protocol_type']].groupby(['protocol_type']).count()
start_time = time.time()
test.show()
print("Temps d execution : %s secondes" % (time.time() - start_time))

+-------------+------+
|protocol_type| count|
+-------------+------+
|          tcp|190065|
|          udp| 20354|
|         icmp|283602|
+-------------+------+

Temps d execution : 8.615044832229614 secondes


In [252]:
#selectionner les colonnes duration, dst_bytes ou le protocole est tcp et les duration > 1000 avec des dst_bytes nuls
# utiliser la methode filter
timeStartFilter = time.time()
filtered = df[(df['duration'] > 1000) & (df['dest_bytes'] ==0)].groupby(['protocol_type']).count()
filtered.show()
print("Temps d execution : %s secondes" % (time.time() - timeStartFilter))

+-------------+-----+
|protocol_type|count|
+-------------+-----+
|          tcp|  139|
+-------------+-----+

Temps d execution : 8.23030686378479 secondes


In [254]:
#afficher votre resultats
withoutGby = df[(df['duration'] > 1000) & (df['dest_bytes'] ==0)]
withoutGby.rdd.map(lambda x : "Duration: {0}, Dest. bytes: {1}".format(x.dest_bytes,x.duration)).take(10)

['Duration: 0, Dest. bytes: 5057',
 'Duration: 0, Dest. bytes: 5059',
 'Duration: 0, Dest. bytes: 5051',
 'Duration: 0, Dest. bytes: 5056',
 'Duration: 0, Dest. bytes: 5051',
 'Duration: 0, Dest. bytes: 5039',
 'Duration: 0, Dest. bytes: 5062',
 'Duration: 0, Dest. bytes: 5041',
 'Duration: 0, Dest. bytes: 5056',
 'Duration: 0, Dest. bytes: 5064']

In [227]:
from pyspark.sql.functions import udf


def attack_or_normal_func(s):
    return "normal" if s == "normal." else "attack"

In [272]:
#à l'aide de la fonction udf ajouter une colonne label au DataFrame 
#afficher le schema de donnees et verifier que la colonne label est bien ajoutee
newLabel = udf(attack_or_normal_func, StringType())
df = df.withColumn('label',newLabel(df['interactions']))
df.printSchema()

root
 |-- duration: integer (nullable = true)
 |-- protocol_type: string (nullable = true)
 |-- service: string (nullable = true)
 |-- flag: string (nullable = true)
 |-- src_bytes: integer (nullable = true)
 |-- dest_bytes: integer (nullable = true)
 |-- interactions: string (nullable = true)
 |-- label: string (nullable = true)



In [280]:
#compter les labels
timeStartgroupbyLabel = time.time()
df.select('label').groupby('label').count().show()
print("Temps d execution : %s secondes" % (time.time() - timeStartgroupbyLabel))

+------+------+
| label| count|
+------+------+
|normal| 97278|
|attack|396743|
+------+------+

Temps d execution : 7.908934116363525 secondes


In [281]:
#compter les labels par protocoles 
timeStartgroupbyLabelProtocol = time.time()
df.select('label', 'protocol_type').groupby('label', 'protocol_type').count().show()
print("Temps d execution : %s secondes" % (time.time() - timeStartgroupbyLabelProtocol))

+------+-------------+------+
| label|protocol_type| count|
+------+-------------+------+
|normal|          udp| 19177|
|normal|         icmp|  1288|
|normal|          tcp| 76813|
|attack|         icmp|282314|
|attack|          tcp|113252|
|attack|          udp|  1177|
+------+-------------+------+

Temps d execution : 7.731747388839722 secondes
