# RDD上のデータ集約

[Introduction to Spark with Python, by Jose A. Dianes](https://github.com/jadianes/spark-py-notebooks)

削減、折りたたみ、および集約の3つの異なるアクションを使用して、SparkでRDDデータを集計できます。最後のものはより一般的なもので、あるものは最初の2つを含んでいます。

## データの取得とRDDの作成

最初のノートブックで行ったように、KDD Cup 1999に提供された縮小データセット（10％）を使用します。これには約50万のニュークリン相互作用が含まれています。このファイルは、ローカルでダウンロードするGzipファイルとして提供されています。
 [KDD Cup 1999](http://kdd.ics.uci.edu/databases/kddcup99/kddcup99.html)

In [1]:
import urllib
f = urllib.urlretrieve ("http://kdd.ics.uci.edu/databases/kddcup99/kddcup.data_10_percent.gz", "kddcup.data_10_percent.gz")

In [1]:
data_file = "./kddcup.data_10_percent.gz"
raw_data = sc.textFile(data_file)

## タグによるインタラクションの継続時間の検査

フォールドとリダクションの両方は、RDDの2つの要素に適用される引数としての関数をとります。 折り畳み動作は、それが取得し、最初の呼び出しで使用される追加の初期ゼロ値があるという点で、reduceと異なります。 この値は、提供される関数の識別要素でなければなりません。

例として、通常と攻撃の相互作用のための相互作用の合計時間を知りたいとします。 reduceは以下のように使用できます。

In [2]:
# parse data
csv_data = raw_data.map(lambda x: x.split(","))

# separate into different RDDs
normal_csv_data = csv_data.filter(lambda x: x[41]=="normal.")
attack_csv_data = csv_data.filter(lambda x: x[41]!="normal.")

reduceに渡す関数は、同じタイプのRDDの要素を取得して返します。期間を合計したい場合は、その要素を新しいRDDに抽出する必要があります。

In [3]:
normal_duration_data = normal_csv_data.map(lambda x: int(x[0]))
attack_duration_data = attack_csv_data.map(lambda x: int(x[0]))

これで新しいRDDを減らすことができます。

In [4]:
total_normal_duration = normal_duration_data.reduce(lambda x, y: x + y)
total_attack_duration = attack_duration_data.reduce(lambda x, y: x + y)

print("Total duration for 'normal' interactions is {}".\
    format(total_normal_duration))
print("Total duration for 'attack' interactions is {}".\
    format(total_attack_duration))

Total duration for 'normal' interactions is 21075991
Total duration for 'attack' interactions is 2626792


私たちはさらに進み、カウントを使って持続時間の平均を計算することができます。

In [5]:
normal_count = normal_duration_data.count()
attack_count = attack_duration_data.count()

print("Mean duration for 'normal' interactions is {}".\
    format(round(total_normal_duration/float(normal_count),3)))
print("Mean duration for 'attack' interactions is {}".\
    format(round(total_attack_duration/float(attack_count),3)))

Mean duration for 'normal' interactions is 216.657
Mean duration for 'attack' interactions is 6.621


We have a first (and too simplistic) approach to identify attack interactions.

## A better way, using `aggregate`  

The `aggregate` action frees us from the constraint of having the return be the same type as the RDD we are working on. Like with `fold`, we supply an initial zero value of the type we want to return. Then we provide two functions. The first one is used to combine the elements from our RDD with the accumulator. The second function is needed to merge two accumulators. Let's see it in action calculating the mean we did before.  

In [6]:
normal_sum_count = normal_duration_data.aggregate(
    (0,0), # the initial value
    (lambda acc, value: (acc[0] + value, acc[1] + 1)), # combine value with acc
    (lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1])) # combine accumulators
)

print("Mean duration for 'normal' interactions is {}".\
    format(round(normal_sum_count[0]/float(normal_sum_count[1]),3)))

Mean duration for 'normal' interactions is 216.657


In the previous aggregation, the accumulator first element keeps the total sum, while the second element keeps the count. Combining an accumulator with an RDD element consists in summing up the value and incrementing the count. Combining two accumulators requires just a pairwise sum.  

We can do the same with attack type interactions.  

In [7]:
attack_sum_count = attack_duration_data.aggregate(
    (0,0), # the initial value
    (lambda acc, value: (acc[0] + value, acc[1] + 1)), # combine value with acc
    (lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1])) # combine accumulators
)

print("Mean duration for 'attack' interactions is {}".\
    format(round(attack_sum_count[0]/float(attack_sum_count[1]),3)))

Mean duration for 'attack' interactions is 6.621
