# 2. Spark & Python: Working with RDDs (II)

- Dataset: KDD Cup 1999 (http://kdd.ics.uci.edu/databases/kddcup99/kddcup99.html)

This is the data set used for the Third International Knowledge Discovery and Data Mining Tools Competition, which was held in conjunction with KDD-99, the Fifth International Conference on Knowledge Discovery and Data Mining. The competition task was to build a network intrusion detector: a predictive model capable of distinguishing between "bad" connections, called intrusions or attacks, and "good" normal connections. The database contains a standard set of data to be audited, which includes a wide variety of intrusions simulated in a military network environment.

## RDD Creation

### (1) Getting the data files

In [24]:
import urllib.request  # the request submodule must be imported explicitly in Python 3

f = urllib.request.urlretrieve("http://kdd.ics.uci.edu/databases/kddcup99/kddcup.data_10_percent.gz", 
                       "kddcup.data_10_percent.gz")

### (2) Creating an RDD from a file

In [1]:
data_file = "./kddcup.data_10_percent.gz"
raw_data = sc.textFile(data_file)

In [7]:
# parse data
csv_data = raw_data.map(lambda x: x.split(","))

# separate into different RDDs
normal_csv_data = csv_data.filter(lambda x: x[41]=="normal.")
attack_csv_data = csv_data.filter(lambda x: x[41]!="normal.")

In [8]:
# Extract only the first field (duration) and convert it to int

normal_duration_data = normal_csv_data.map(lambda x: int(x[0]))
attack_duration_data = attack_csv_data.map(lambda x: int(x[0]))

##  Inspecting interaction duration by tag

**- normal**  
**- attack**

### (1) reduce

In [9]:
# Total duration for normal and attack interactions
# reduce is an RDD action, so it triggers the computation

total_normal_duration = normal_duration_data.reduce(lambda x, y: x + y)
total_attack_duration = attack_duration_data.reduce(lambda x, y: x + y)

In [90]:
print(total_normal_duration)
print(total_attack_duration)

21075991
2626792


In [127]:
print(sc.parallelize([1, 2, 3, 5]).reduce(lambda x, y: x),
      sc.parallelize([1, 2, 3, 5]).reduce(lambda x, y: y),
      sc.parallelize([1, 2, 3, 5]).reduce(lambda x, y: x + y),
      sc.parallelize([1, 2, 3, 5]).reduce(lambda x, y: x * y),
      sc.parallelize([1, 2, 3, 5]).reduce(lambda x, y: x - y))

1 5 11 30 1


- Order of operations in reduce (reference):  
https://kdb.or.kr/info/info_04_view.html?field=&keyword=&type=techreport&page=9&dbnum=186753&mode=detail&type=techreport
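The subtraction result printed above (1) depends on how the four elements happen to be split into partitions. The following is a minimal plain-Python sketch, not Spark itself, that models a partitioned reduce: each partition is reduced on its own and the partial results are then reduced together. The helper `partitioned_reduce` and the three-way split are assumptions made for illustration.

```python
from functools import reduce


def partitioned_reduce(partitions, op):
    """Reduce each non-empty partition, then reduce the partial results."""
    partials = [reduce(op, part) for part in partitions if part]
    return reduce(op, partials)


def subtract(x, y):
    return x - y


def add(x, y):
    return x + y


one_partition = [[1, 2, 3, 5]]
three_partitions = [[1], [2], [3, 5]]  # hypothetical split of the same data

sub_single = partitioned_reduce(one_partition, subtract)    # ((1 - 2) - 3) - 5
sub_split = partitioned_reduce(three_partitions, subtract)  # (1 - 2) - (3 - 5)
add_single = partitioned_reduce(one_partition, add)
add_split = partitioned_reduce(three_partitions, add)

print(sub_single, sub_split, add_single, add_split)  # -9 1 11 11
```

Addition gives the same answer for both splits because it is commutative and associative. Subtraction is neither, so its result changes with the partitioning, which is why such functions should not be passed to `reduce`.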

### (2) count

In [10]:
normal_count = normal_duration_data.count()
attack_count = attack_duration_data.count()

In [91]:
print(normal_count)
print(attack_count)

97278
396743


### (3) aggregate

In [11]:
normal_sum_count = normal_duration_data.aggregate(
    (0,0), # initial value: (sum, count)
    (lambda acc, value: (acc[0] + value, acc[1] + 1)), # add each value to the running sum and increment the count
    (lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1])) # merge two accumulators
)

In [133]:
# Why (0,0) as the initial value? -> it must be the identity element of the operation
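To see why the initial value should be an identity element, the sketch below models `aggregate` in plain Python under a simplifying assumption: every partition starts folding from the initial value, and the merge of the partial results starts from it as well. The helper `simulated_aggregate` and the two-way split are hypothetical, not part of the Spark API.

```python
from functools import reduce


def simulated_aggregate(partitions, zero, seq_op, comb_op):
    """Fold each partition from `zero`, then merge the partials from `zero`."""
    partials = [reduce(seq_op, part, zero) for part in partitions]
    return reduce(comb_op, partials, zero)


def seq_op(acc, value):
    # Add the value to the running sum and increment the count
    return (acc[0] + value, acc[1] + 1)


def comb_op(acc1, acc2):
    # Merge two (sum, count) accumulators
    return (acc1[0] + acc2[0], acc1[1] + acc2[1])


partitions = [[1, 2], [3, 5]]  # hypothetical split of [1, 2, 3, 5]

with_identity = simulated_aggregate(partitions, (0, 0), seq_op, comb_op)
with_offset = simulated_aggregate(partitions, (10, 0), seq_op, comb_op)

print(with_identity)  # (11, 4)
print(with_offset)    # (41, 4)
```

With `(10, 0)` the extra 10 is added once per partition and once more during the merge, so the sum is inflated by 30 and would change again with a different number of partitions. Only an identity such as `(0, 0)` leaves the result independent of the partitioning.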

In [128]:
normal_sum_count

(21075991, 97278)

In [12]:
attack_sum_count = attack_duration_data.aggregate(
    (0,0), # initial value: (sum, count)
    (lambda acc, value: (acc[0] + value, acc[1] + 1)), 
    (lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1])) 
)

In [117]:
print(normal_sum_count)
print(attack_sum_count)

(21075991, 97278)
(2626792, 396743)
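A natural use of these (sum, count) pairs is the mean duration per tag. The snippet below is a small sketch that hard-codes the two tuples printed above instead of recomputing them with Spark; `mean_duration` is a helper name introduced here for illustration.

```python
# (sum, count) pairs copied from the aggregate output above
normal_sum_count = (21075991, 97278)
attack_sum_count = (2626792, 396743)


def mean_duration(sum_count):
    """Return the mean duration, rounded to three decimals."""
    total, count = sum_count
    return round(total / count, 3)


normal_mean = mean_duration(normal_sum_count)
attack_mean = mean_duration(attack_sum_count)

print("Mean duration for normal interactions:", normal_mean)  # 216.657
print("Mean duration for attack interactions:", attack_mean)  # 6.621
```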


In [150]:
attack_duration_data.getNumPartitions()

1

## Working with key/value pair RDDs

In [13]:
key_value_data = csv_data.map(lambda x: (x[41], x)) # x[41] contains the network interaction tag

In [17]:
key_value_data.take(1)

[('normal.',
  ['0',
   'tcp',
   'http',
   'SF',
   '181',
   '5450',
   '0',
   '0',
   '0',
   '0',
   '0',
   '1',
   '0',
   '0',
   '0',
   '0',
   '0',
   '0',
   '0',
   '0',
   '0',
   '0',
   '8',
   '8',
   '0.00',
   '0.00',
   '0.00',
   '0.00',
   '1.00',
   '0.00',
   '0.00',
   '9',
   '9',
   '1.00',
   '0.00',
   '0.11',
   '0.00',
   '0.00',
   '0.00',
   '0.00',
   '0.00',
   'normal.'])]

### (1) Data aggregations with key/value pair RDDs

In [18]:
key_value_duration = csv_data.map(lambda x: (x[41], float(x[0]))) 
durations_by_key = key_value_duration.reduceByKey(lambda x, y: x + y)

durations_by_key.collect()

[('portsweep.', 1991911.0),
 ('neptune.', 0.0),
 ('satan.', 64.0),
 ('pod.', 0.0),
 ('multihop.', 1288.0),
 ('back.', 284.0),
 ('warezclient.', 627563.0),
 ('nmap.', 0.0),
 ('smurf.', 0.0),
 ('guess_passwd.', 144.0),
 ('ftp_write.', 259.0),
 ('imap.', 72.0),
 ('land.', 0.0),
 ('loadmodule.', 326.0),
 ('buffer_overflow.', 2751.0),
 ('perl.', 124.0),
 ('ipsweep.', 43.0),
 ('rootkit.', 1008.0),
 ('phf.', 18.0),
 ('teardrop.', 0.0),
 ('warezmaster.', 301.0),
 ('normal.', 21075991.0),
 ('spy.', 636.0)]

- reduceByKey applies the reduce operation separately to the values of each key

In [151]:
from datetime import datetime

In [153]:
s = datetime.now()

durations_by_key = key_value_duration.reduceByKey(lambda x, y: x + y)
durations_by_key.collect()

e = datetime.now()
print(e - s)

0:00:04.544392


- Spark picks a suitable number of partitions based on the size of the cluster.
- The level of parallelism can also be set explicitly to improve performance.
- e.g. key_value_duration.reduceByKey(lambda x, y: x + y, 4)

In [19]:
counts_by_key = key_value_data.countByKey()
counts_by_key

defaultdict(int,
            {'back.': 2203,
             'buffer_overflow.': 30,
             'ftp_write.': 8,
             'guess_passwd.': 53,
             'imap.': 12,
             'ipsweep.': 1247,
             'land.': 21,
             'loadmodule.': 9,
             'multihop.': 7,
             'neptune.': 107201,
             'nmap.': 231,
             'normal.': 97278,
             'perl.': 3,
             'phf.': 4,
             'pod.': 264,
             'portsweep.': 1040,
             'rootkit.': 10,
             'satan.': 1589,
             'smurf.': 280790,
             'spy.': 2,
             'teardrop.': 979,
             'warezclient.': 1020,
             'warezmaster.': 20})

- countByKey counts the number of elements for each key

### (2) Using combineByKey

In [168]:
sum_counts = key_value_duration.combineByKey(
    (lambda x: (x, 1)), # the initial value, with value x and count 1
    (lambda acc, value: (acc[0]+value, acc[1]+1)), # how to combine a pair value with the accumulator: sum value, and increment count
    (lambda acc1, acc2: (acc1[0]+acc2[0], acc1[1]+acc2[1])) # combine accumulators
)

# sum_counts.collectAsMap()

In [173]:
sum_counts.collect()

[('portsweep.', (1991911.0, 1040)),
 ('neptune.', (0.0, 107201)),
 ('satan.', (64.0, 1589)),
 ('pod.', (0.0, 264)),
 ('multihop.', (1288.0, 7)),
 ('back.', (284.0, 2203)),
 ('warezclient.', (627563.0, 1020)),
 ('nmap.', (0.0, 231)),
 ('smurf.', (0.0, 280790)),
 ('guess_passwd.', (144.0, 53)),
 ('ftp_write.', (259.0, 8)),
 ('imap.', (72.0, 12)),
 ('land.', (0.0, 21)),
 ('loadmodule.', (326.0, 9)),
 ('buffer_overflow.', (2751.0, 30)),
 ('perl.', (124.0, 3)),
 ('ipsweep.', (43.0, 1247)),
 ('rootkit.', (1008.0, 10)),
 ('phf.', (18.0, 4)),
 ('teardrop.', (0.0, 979)),
 ('warezmaster.', (301.0, 20)),
 ('normal.', (21075991.0, 97278)),
 ('spy.', (636.0, 2))]
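The three functions passed to `combineByKey` can be pictured with a plain-Python model: within each partition the first value seen for a key creates an accumulator, later values are merged into it, and the per-partition accumulators are then combined across partitions. The sketch below is a simplified illustration, not Spark's implementation; `simulated_combine_by_key` and the sample records are hypothetical. It ends by turning each (sum, count) pair into a mean per key.

```python
def simulated_combine_by_key(partitions, create_combiner, merge_value,
                             merge_combiners):
    """Model combineByKey over a list of partitions of (key, value) pairs."""
    merged = {}
    for part in partitions:
        local = {}
        for key, value in part:
            if key in local:
                # Key already seen in this partition: fold the value in
                local[key] = merge_value(local[key], value)
            else:
                # First value for this key in this partition
                local[key] = create_combiner(value)
        for key, acc in local.items():
            if key in merged:
                # Combine accumulators coming from different partitions
                merged[key] = merge_combiners(merged[key], acc)
            else:
                merged[key] = acc
    return merged


# Hypothetical (tag, duration) records split over two partitions
partitions = [
    [("normal.", 10.0), ("back.", 2.0), ("normal.", 30.0)],
    [("normal.", 20.0), ("back.", 4.0)],
]

sample_sum_counts = simulated_combine_by_key(
    partitions,
    lambda x: (x, 1),
    lambda acc, value: (acc[0] + value, acc[1] + 1),
    lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1]),
)

# Mean duration per key from the (sum, count) pairs
sample_means = {key: round(total / count, 3)
                for key, (total, count) in sample_sum_counts.items()}

print(sample_sum_counts)  # {'normal.': (60.0, 3), 'back.': (6.0, 2)}
print(sample_means)       # {'normal.': 20.0, 'back.': 3.0}
```

The same dictionary comprehension could be applied to the result of `sum_counts.collectAsMap()` to obtain the mean duration for every tag in the data set.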