RDDs in practice, based on https://github.com/jadianes/spark-py-notebooks

In [1]:
from pyspark import SparkConf, SparkContext


In [2]:
# Build the configuration object first, then name the application on it.
conf = SparkConf()
conf.setAppName("RDD_doc_notes")
# Attach to an already-running SparkContext if there is one; otherwise start one.
sc = SparkContext.getOrCreate(conf)
sc

Exception: Java gateway process exited before sending its port number

In [3]:
# `import urllib` alone does not load the `request` submodule, so
# `urllib.request` may be missing on a fresh kernel. Import it explicitly.
import urllib.request

# Download the gzipped 10% KDD Cup 1999 sample into the working directory.
# urlretrieve returns a (local_filename, headers) tuple, kept in `f`.
f = urllib.request.urlretrieve(
    "http://kdd.ics.uci.edu/databases/kddcup99/kddcup.data_10_percent.gz",
    "kddcup.data_10_percent.gz",
)

In [35]:
# Path of the gzipped data file, relative to the working directory.
data_file = "./kddcup.data_10_percent.gz"
# RDD of text lines; each element is one comma-separated record string.
raw_data = sc.textFile(data_file)

In [36]:
# Action: count the number of lines (records) in the file.
raw_data.count()

494021

In [37]:
# Action: bring the first 10 raw lines back to the driver to inspect the format.
raw_data.take(10)

['0,tcp,http,SF,181,5450,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,8,8,0.00,0.00,0.00,0.00,1.00,0.00,0.00,9,9,1.00,0.00,0.11,0.00,0.00,0.00,0.00,0.00,normal.',
 '0,tcp,http,SF,239,486,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,8,8,0.00,0.00,0.00,0.00,1.00,0.00,0.00,19,19,1.00,0.00,0.05,0.00,0.00,0.00,0.00,0.00,normal.',
 '0,tcp,http,SF,235,1337,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,8,8,0.00,0.00,0.00,0.00,1.00,0.00,0.00,29,29,1.00,0.00,0.03,0.00,0.00,0.00,0.00,0.00,normal.',
 '0,tcp,http,SF,219,1337,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,6,6,0.00,0.00,0.00,0.00,1.00,0.00,0.00,39,39,1.00,0.00,0.03,0.00,0.00,0.00,0.00,0.00,normal.',
 '0,tcp,http,SF,217,2032,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,6,6,0.00,0.00,0.00,0.00,1.00,0.00,0.00,49,49,1.00,0.00,0.02,0.00,0.00,0.00,0.00,0.00,normal.',
 '0,tcp,http,SF,217,2032,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,6,6,0.00,0.00,0.00,0.00,1.00,0.00,0.00,59,59,1.00,0.00,0.02,0.00,0.00,0.00,0.00,0.00,normal.',
 '0,tcp,http,SF,212,1940,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,1,2,0.00,0.00,0.00,0.00,1.00,0.00,

The filter transformation

In [46]:
# Keep only the lines whose text contains "normal." (substring test on the raw
# string). filter is a transformation: nothing runs until an action is called.
normal_raw_data = raw_data.filter(lambda line: "normal." in line)
# Optional extra action, used only for the timing comparison noted below:
# normal_raw_data.take(2)

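Timing notes:
- Original notebook (5 years ago): filter --> timed count action: 5.951 seconds
- This notebook:
  - filter -> take(2) -> timed count action: 1.068 s
  - filter -> timed count action: 1.016 s
--> Open question: I still do not understand what difference it makes to run one action before the next one. Here the two timings are essentially the same, which is what one would expect if each action recomputes the filter from the source file because the RDD is not cached.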

In [47]:
from time import perf_counter

# Time only the count() action; the filter defined earlier is evaluated as part of it.
t0 = perf_counter()
normal_count = normal_raw_data.count()
tt = perf_counter() - t0  # elapsed wall-clock seconds
print("There are {} 'normal' interactions".format(normal_count))
print("Count completed in {} seconds".format(round(tt, 3)))

There are 97278 'normal' interactions
Count completed in 1.016 seconds



The map transformation

In [49]:
from pprint import pprint

# Transformation: split every line on commas into a list of string fields.
csv_data = raw_data.map(lambda x: x.split(","))
# Time how long it takes to obtain just the first 5 parsed rows.
t0 = perf_counter()
head_rows = csv_data.take(5)
tt = perf_counter() - t0  # elapsed wall-clock seconds
print("Parse completed in {} seconds".format(round(tt, 3)))
# Show the first parsed row, one field per line.
pprint(head_rows[0])

Parse completed in 0.076 seconds
['0',
 'tcp',
 'http',
 'SF',
 '181',
 '5450',
 '0',
 '0',
 '0',
 '0',
 '0',
 '1',
 '0',
 '0',
 '0',
 '0',
 '0',
 '0',
 '0',
 '0',
 '0',
 '0',
 '8',
 '8',
 '0.00',
 '0.00',
 '0.00',
 '0.00',
 '1.00',
 '0.00',
 '0.00',
 '9',
 '9',
 '1.00',
 '0.00',
 '0.11',
 '0.00',
 '0.00',
 '0.00',
 '0.00',
 '0.00',
 'normal.']


In [50]:
# Same measurement with a much larger prefix (100000 parsed rows instead of 5).
t0 = perf_counter()
head_rows = csv_data.take(100000)
tt = perf_counter() - t0  # elapsed wall-clock seconds
print("Parse completed in {} seconds".format(round(tt, 3)))

Parse completed in 1.642 seconds


In [51]:
def parse_interaction(line, tag_index=41):
    """Split one comma-separated record and pair it with its label.

    Args:
        line: A single record as a comma-separated string.
        tag_index: Position of the label field after splitting. Defaults to
            41, which is the last field of the records shown in this notebook.

    Returns:
        A ``(tag, elems)`` tuple, where ``elems`` is the list of all string
        fields (label included) and ``tag`` is ``elems[tag_index]``.

    Raises:
        IndexError: If the record has no field at ``tag_index``.
    """
    elems = line.split(",")
    return (elems[tag_index], elems)


# Transformation: turn every raw line into a (tag, fields) pair.
key_csv_data = raw_data.map(parse_interaction)
# Fetch the first 5 pairs and pretty-print the first one.
head_rows = key_csv_data.take(5)
pprint(head_rows[0])

('normal.',
 ['0',
  'tcp',
  'http',
  'SF',
  '181',
  '5450',
  '0',
  '0',
  '0',
  '0',
  '0',
  '1',
  '0',
  '0',
  '0',
  '0',
  '0',
  '0',
  '0',
  '0',
  '0',
  '0',
  '8',
  '8',
  '0.00',
  '0.00',
  '0.00',
  '0.00',
  '1.00',
  '0.00',
  '0.00',
  '9',
  '9',
  '1.00',
  '0.00',
  '0.11',
  '0.00',
  '0.00',
  '0.00',
  '0.00',
  '0.00',
  'normal.'])


In [52]:
# Time collect(), which returns every line of the RDD to the driver as a Python list.
t0 = perf_counter()
all_raw_data = raw_data.collect()
tt = perf_counter() - t0  # elapsed wall-clock seconds
print("Data collected in {} seconds".format(round(tt, 3)))

Data collected in 2.936 seconds


In [53]:
# Keep only the (tag, fields) pairs whose key is the "normal." label.


normal_key_interactions = key_csv_data.filter(lambda x: x[0] == "normal.")

# Collect all matching pairs to the driver and time the collect() action.
t0 = perf_counter()
all_normal = normal_key_interactions.collect()
tt = perf_counter() - t0  # elapsed wall-clock seconds
# The count is taken from the collected Python list, not from an RDD action.
normal_count = len(all_normal)
print("Data collected in {} seconds".format(round(tt, 3)))
print("There are {} 'normal' interactions".format(normal_count))

Data collected in 3.534 seconds
There are 97278 'normal' interactions


# Sampling RDDs

## The sample transformation: applying successive transformations to a sample of the data

In [56]:
# Draw roughly 10% of the lines without replacement; the fixed seed keeps the
# sample repeatable between runs.
raw_data_sample = raw_data.sample(withReplacement=False, fraction=0.1, seed=1234)
# Two separate count() actions: one on the sample, one on the full dataset.
sample_size = raw_data_sample.count()
total_size = raw_data.count()
print("Sample size is {} of {}".format(sample_size, total_size))

Sample size is 49493 of 494021


In [58]:
# Transformations (lazy): split each sampled line into fields, then keep the
# rows in which some field equals "normal." exactly (list membership test).
raw_data_sample_items = raw_data_sample.map(lambda x: x.split(","))
sample_normal_tags = raw_data_sample_items.filter(lambda x: "normal." in x)

# Action: only the count() call is timed.
t0 = perf_counter()
sample_normal_tags_count = sample_normal_tags.count()
tt = perf_counter() - t0  # elapsed wall-clock seconds

# Share of 'normal' rows within the sample (sample_size comes from the sampling cell).
sample_normal_ratio = sample_normal_tags_count / float(sample_size)
print("The ratio of 'normal' interactions is {}".format(round(sample_normal_ratio, 3)))
print("Count done in {} seconds".format(round(tt, 3)))

The ratio of 'normal' interactions is 0.195
Count done in 1.198 seconds


In [63]:
# Same computation on the full dataset (no sampling), for comparison.
# Open question: does sample() preserve the ratio between the different labels?
# NOTE(review): presumably only approximately, since the sample is random — confirm.

# The transformations are chained inline with the action below.


# Action: the whole map -> filter -> count chain is timed.
t0 = perf_counter()
normal_tags_count = (
    raw_data.map(lambda x: x.split(",")).filter(lambda x: "normal." in x).count()
)
tt = perf_counter() - t0  # elapsed wall-clock seconds

# Share of 'normal' rows in the full dataset (total_size comes from the sampling cell).
normal_ratio = normal_tags_count / float(total_size)
print("The ratio of 'normal' interactions is {}".format(round(normal_ratio, 3)))
print("Count done in {} seconds".format(round(tt, 3)))

The ratio of 'normal' interactions is 0.197
Count done in 2.156 seconds


## The takeSample action

In [64]:
# Requested number of rows, defined once so the takeSample call and the ratio
# below cannot drift apart.
take_sample_size = 400000

# takeSample is an action: it returns a Python list of sampled lines to the
# driver. The split/filter below therefore runs locally, and is timed with it.
# NOTE: this rebinds raw_data_sample from an RDD (sampling cell) to a list.
t0 = perf_counter()
raw_data_sample = raw_data.takeSample(False, take_sample_size, 1234)
normal_data_sample = [x.split(",") for x in raw_data_sample if "normal." in x]
tt = perf_counter() - t0  # elapsed wall-clock seconds

normal_sample_size = len(normal_data_sample)

# Divide by the number of rows actually returned rather than a repeated literal;
# an empty sample yields 0.0 instead of a division error.
sampled_rows = len(raw_data_sample)
normal_ratio = normal_sample_size / float(sampled_rows) if sampled_rows else 0.0
print("The ratio of 'normal' interactions is {}".format(normal_ratio))
print("Count done in {} seconds".format(round(tt, 3)))

The ratio of 'normal' interactions is 0.1967175
Count done in 4.016 seconds


# Set operations on RDDs

In [77]:
# Lines whose text contains "normal." ...
normal_raw_data = raw_data.filter(lambda line: "normal." in line)

# ... and everything else, obtained as a set difference against the full data.
attack_raw_data = raw_data.subtract(normal_raw_data)

In [78]:
# Count all records and time the count() action.
t0 = perf_counter()
raw_data_count = raw_data.count()
tt = perf_counter() - t0  # elapsed wall-clock seconds
print("All count in {} secs".format(round(tt, 3)))

All count in 0.796 secs


In [79]:
# Count the 'normal' records and time the count() action.
t0 = perf_counter()
normal_data_count = normal_raw_data.count()
tt = perf_counter() - t0  # elapsed wall-clock seconds
# Label the timing as the normal-only count (the message used to say "All").
print("Normal count in {} secs".format(round(tt, 3)))

All count in 1.021 secs


In [80]:
# Count the attack records (result of the subtract) and time the count() action.
t0 = perf_counter()
attack_data_count = attack_raw_data.count()
tt = perf_counter() - t0  # elapsed wall-clock seconds
# Label the timing as the attack-only count (the message used to say "All").
print("Attack count in {} secs".format(round(tt, 3)))

All count in 4.764 secs


In [82]:
# One-line summary of the three counts computed in the preceding cells.
summary_template = (
    "There are {} normal interactions and {} attacks, "
    "from a total of {} interactions"
)
print(summary_template.format(normal_data_count, attack_data_count, raw_data_count))

There are 97278 normal interactions and 396743 attacks, from a total of 494021 interactions


# Data aggregations on RDDs


In [83]:
# Shared first step: split every record into its list of string fields.
split_rows = raw_data.map(lambda line: line.split(","))

# Field 41 is the label; field 0 is converted to int and used as the duration.
normal_duration_data = split_rows.filter(
    lambda fields: fields[41] == "normal."
).map(lambda fields: int(fields[0]))

# Same projection for every record whose label is anything other than "normal.".
attack_duration_data = split_rows.filter(
    lambda fields: fields[41] != "normal."
).map(lambda fields: int(fields[0]))

In [90]:
def _fold_duration(acc, value):
    """Add one duration to a (count, total) accumulator."""
    return (acc[0] + 1, acc[1] + value)


def _merge_accumulators(acc1, acc2):
    """Combine two (count, total) accumulators element-wise."""
    return (acc1[0] + acc2[0], acc1[1] + acc2[1])


# Despite its name, the result is ordered (count, total duration).
normal_sum_count = normal_duration_data.aggregate(
    (0, 0), _fold_duration, _merge_accumulators
)

_normal_n, _normal_total = normal_sum_count
_normal_mean = round(_normal_total / float(_normal_n), 3)
print("Mean duration for 'normal' interactions is {}".format(_normal_mean))

Mean duration for 'normal' interactions is 216.657


(97278, 21075991)

In [93]:
# Aggregate the attack durations into a (count, total duration) pair:
# the first lambda folds one value into an accumulator, the second merges two
# accumulators.
attack_sum_count = attack_duration_data.aggregate(
    (0, 0),
    lambda acc, value: (acc[0] + 1, acc[1] + value),
    lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1]),
)

# This is the mean over attack records, so the message must say 'attack'
# (it previously repeated the 'normal' label from the cell above).
print(
    "Mean duration for 'attack' interactions is {}".format(
        round(attack_sum_count[1] / float(attack_sum_count[0]), 3)
    )
)

Mean duration for 'normal' interactions is 6.621


# Working with key/value pair RDDs

In [94]:
def _tag_and_duration(line):
    """Return (label, duration) for one comma-separated record.

    The label is field 41 and the duration is field 0 converted to float.
    """
    fields = line.split(",")
    return (fields[41], float(fields[0]))


# Key/value RDD keyed by label.
tag_duration_kv_pair = raw_data.map(_tag_and_duration)

In [95]:
# Action: number of records per label, returned to the driver as a dict.
tag_duration_kv_pair.countByKey()

defaultdict(int,
            {'normal.': 97278,
             'buffer_overflow.': 30,
             'loadmodule.': 9,
             'perl.': 3,
             'neptune.': 107201,
             'smurf.': 280790,
             'guess_passwd.': 53,
             'pod.': 264,
             'teardrop.': 979,
             'portsweep.': 1040,
             'ipsweep.': 1247,
             'land.': 21,
             'ftp_write.': 8,
             'back.': 2203,
             'imap.': 12,
             'satan.': 1589,
             'phf.': 4,
             'nmap.': 231,
             'multihop.': 7,
             'warezmaster.': 20,
             'warezclient.': 1020,
             'spy.': 2,
             'rootkit.': 10})

In [97]:
# Sum the durations per label, then collect the (label, total) pairs to the driver.
tag_duration_kv_pair.reduceByKey(lambda x, y: x + y).collect()

[('normal.', 21075991.0),
 ('buffer_overflow.', 2751.0),
 ('loadmodule.', 326.0),
 ('perl.', 124.0),
 ('neptune.', 0.0),
 ('smurf.', 0.0),
 ('guess_passwd.', 144.0),
 ('pod.', 0.0),
 ('teardrop.', 0.0),
 ('portsweep.', 1991911.0),
 ('ipsweep.', 43.0),
 ('land.', 0.0),
 ('ftp_write.', 259.0),
 ('back.', 284.0),
 ('imap.', 72.0),
 ('satan.', 64.0),
 ('phf.', 18.0),
 ('nmap.', 0.0),
 ('multihop.', 1288.0),
 ('warezmaster.', 301.0),
 ('warezclient.', 627563.0),
 ('spy.', 636.0),
 ('rootkit.', 1008.0)]

In [105]:
# Per-label (count, total duration) computed in a single pass.
# - createCombiner: the first value seen for a key becomes (1, value). It must
#   be a tuple like the other two functions return; a list here would leave
#   mixed list/tuple combiners for keys that are never merged.
# - mergeValue: fold one more value into an existing combiner.
# - mergeCombiners: merge the combiners built for the same key separately.
tag_count_duration = tag_duration_kv_pair.combineByKey(
    lambda x: (1, x),
    lambda acc, value: (acc[0] + 1, acc[1] + value),
    lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1]),
)
tag_count_duration.collect()

[('normal.', (97278, 21075991.0)),
 ('buffer_overflow.', (30, 2751.0)),
 ('loadmodule.', (9, 326.0)),
 ('perl.', (3, 124.0)),
 ('neptune.', (107201, 0.0)),
 ('smurf.', (280790, 0.0)),
 ('guess_passwd.', (53, 144.0)),
 ('pod.', (264, 0.0)),
 ('teardrop.', (979, 0.0)),
 ('portsweep.', (1040, 1991911.0)),
 ('ipsweep.', (1247, 43.0)),
 ('land.', (21, 0.0)),
 ('ftp_write.', (8, 259.0)),
 ('back.', (2203, 284.0)),
 ('imap.', (12, 72.0)),
 ('satan.', (1589, 64.0)),
 ('phf.', (4, 18.0)),
 ('nmap.', (231, 0.0)),
 ('multihop.', (7, 1288.0)),
 ('warezmaster.', (20, 301.0)),
 ('warezclient.', (1020, 627563.0)),
 ('spy.', (2, 636.0)),
 ('rootkit.', (10, 1008.0))]

In [107]:
from pyspark.sql import SparkSession
# Wrap the existing SparkContext in a SparkSession so DataFrames can be created.
# NOTE(review): SparkSession.builder.getOrCreate() is the usual entry point — confirm whether the direct constructor is intended.
spark = SparkSession(sc)
# Two columns: the label, and the (count, duration) pair as a single nested column.
df = spark.createDataFrame(tag_count_duration, ["tag", "(count,duration)"])
# Print the first rows as a text table.
df.show()

+----------------+--------------------+
|             tag|    (count,duration)|
+----------------+--------------------+
|         normal.|{97278, 2.1075991E7}|
|buffer_overflow.|        {30, 2751.0}|
|     loadmodule.|          {9, 326.0}|
|           perl.|          {3, 124.0}|
|        neptune.|       {107201, 0.0}|
|          smurf.|       {280790, 0.0}|
|   guess_passwd.|         {53, 144.0}|
|            pod.|          {264, 0.0}|
|       teardrop.|          {979, 0.0}|
|      portsweep.|   {1040, 1991911.0}|
|        ipsweep.|        {1247, 43.0}|
|           land.|           {21, 0.0}|
|      ftp_write.|          {8, 259.0}|
|           back.|       {2203, 284.0}|
|           imap.|          {12, 72.0}|
|          satan.|        {1589, 64.0}|
|            phf.|           {4, 18.0}|
|           nmap.|          {231, 0.0}|
|       multihop.|         {7, 1288.0}|
|    warezmaster.|         {20, 301.0}|
+----------------+--------------------+
only showing top 20 rows

