In [1]:
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
sc = SparkContext('local')
spark = SparkSession(sc)

# 문제 S-1: 네트워크에 불법적으로 침입하는 사용자의 분석
네트워크에 불법적으로 침입하는 시도는 허용되어서는 안된다. 1998년 MIT Lincoln Labs에서 DARPA Intrusion Detection Evaluation Program을 연구하였다. 이 데이터의 일부가 1999년 KDD로 만들어져 배포되고 있다. https://kdd.ics.uci.edu/databases/kddcup99/kddcup99.html

---

# 해결
## 파일 내려받기
KDD 파일은 gz 압축되어 있다. 파일 확장자 'gz'은 'gzip'이라는 압축 도구에서 생성된 파일이다. 지금은 WinZip에서 읽을 수 있다.

- web에서 json은 request 사용했었음

In [2]:
import os
_url = 'http://kdd.ics.uci.edu/databases/kddcup99/kddcup.data_10_percent.gz'
_fname = os.path.join(os.getcwd(),'data','kddcup.data_10_percent.gz') #os.getcwd(): 현재 디렉토리

In [3]:
from urllib.request import urlretrieve

if(not os.path.exists(_fname)):
    print ("{} data does not exist! retrieving..".format(_fname))
    _f=urlretrieve(_url,_fname)

C:\Users\user\Bigdata\data\kddcup.data_10_percent.gz data does not exist! retrieving..


## RDD 생성
RDD는 gz와 같은 압축파일에서 데이터를 읽어서 생성할 수 있다.

반면, DataFrame은 구조schema를 정의해야 하기 때문에 쉽지 않다. 여기서는 오류가 발생한다. 따라서 RDD를 생성하고 난 후, 그로부터 DataFrame을 생성하고, Sql을 사용한다.

textFile() 함수로 RDD를 생성한다. count()는 행의 수를 돌려주는 action 함수이다. action 함수는 바로 실행되므로 시간이 좀 걸린다.

In [4]:
_rdd = spark.sparkContext.textFile(_fname)

In [5]:
_rdd.count()

494021

In [6]:
_rdd.take(1)
#nomar뒤에 .이있음에 유의

['0,tcp,http,SF,181,5450,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,8,8,0.00,0.00,0.00,0.00,1.00,0.00,0.00,9,9,1.00,0.00,0.11,0.00,0.00,0.00,0.00,0.00,normal.']

In [7]:
#map() 함수를 사용하여 csv 형식으로 구성된 파일을 컴마(,)로 분리
_allRdd=_rdd.map(lambda x: x.split(','))

In [8]:
_allRdd.take(1)

[['0',
  'tcp',
  'http',
  'SF',
  '181',
  '5450',
  '0',
  '0',
  '0',
  '0',
  '0',
  '1',
  '0',
  '0',
  '0',
  '0',
  '0',
  '0',
  '0',
  '0',
  '0',
  '0',
  '8',
  '8',
  '0.00',
  '0.00',
  '0.00',
  '0.00',
  '1.00',
  '0.00',
  '0.00',
  '9',
  '9',
  '1.00',
  '0.00',
  '0.11',
  '0.00',
  '0.00',
  '0.00',
  '0.00',
  '0.00',
  'normal.']]

## 정상, 공격 건수
데이터가 normal인 경우와 아닌 경우로 구분하자. filter()는 41번째 행을 조건에 따라 데이터를 구분한다. count() 함수로 건수를 계산하면 'normal' 97,278, 'attack'은 396,743 건이다.

In [41]:
_normalRdd=_allRdd.filter(lambda x: x[41]=="normal.") #normal인거
_attackRdd=_allRdd.filter(lambda x: x[41]!="normal.") #normal 아닌거

In [42]:
_normalRdd.count()

97278

In [43]:
_attackRdd.count()

396743

## attack별 건수
attack 종류는 41번째 열에 구분되어 있다. 총 494,021건을 정상 'noraml'과 나머지는 'attack'으로 구분한다. 'attack'은 크게 4종류로 나눈다. DOS는 서비스 거부, R2L 원격침입, U2R은 루트권한침입, probing은 탐지이다.


열41에 대해 건수를 세어보자. reduceByKey()는 인자로 '함수'가 필요. 키별로 '함수를 사용해서' 계산한다.

In [45]:
_41 = _allRdd.map(lambda x: (x[41], 1)) #count할것이므로 1을 넣는다. 2 넣으면 2배로 count되는 구조!
_41.reduceByKey(lambda x,y: x+y).collect() #x: subtotal, y: 하나하나 들어감
#reduceByKey 그대로 사용!

[('normal.', 97278),
 ('buffer_overflow.', 30),
 ('loadmodule.', 9),
 ('perl.', 3),
 ('neptune.', 107201),
 ('smurf.', 280790),
 ('guess_passwd.', 53),
 ('pod.', 264),
 ('teardrop.', 979),
 ('portsweep.', 1040),
 ('ipsweep.', 1247),
 ('land.', 21),
 ('ftp_write.', 8),
 ('back.', 2203),
 ('imap.', 12),
 ('satan.', 1589),
 ('phf.', 4),
 ('nmap.', 231),
 ('multihop.', 7),
 ('warezmaster.', 20),
 ('warezclient.', 1020),
 ('spy.', 2),
 ('rootkit.', 10)]

In [48]:
# groupByKey()는 키별로 group한다.
#위 reduceByKey()와 달리 mapValues()를 사용해 값을 별도로 계산한다는 점에 유의하자.

_41 = _allRdd.map(lambda x: (x[41], 1))
def f(x): return len(x) #위에서 1로 합산했으니까 len으로 해도 상관 없음
_41.groupByKey().mapValues(f).collect()

[('normal.', 97278),
 ('buffer_overflow.', 30),
 ('loadmodule.', 9),
 ('perl.', 3),
 ('neptune.', 107201),
 ('smurf.', 280790),
 ('guess_passwd.', 53),
 ('pod.', 264),
 ('teardrop.', 979),
 ('portsweep.', 1040),
 ('ipsweep.', 1247),
 ('land.', 21),
 ('ftp_write.', 8),
 ('back.', 2203),
 ('imap.', 12),
 ('satan.', 1589),
 ('phf.', 4),
 ('nmap.', 231),
 ('multihop.', 7),
 ('warezmaster.', 20),
 ('warezclient.', 1020),
 ('spy.', 2),
 ('rootkit.', 10)]

## Dataframe 생성
열 0, 1, 2, 3, 4, 5, 41을 선별하여 스키마를 정해서 RDD를 생성한다.

In [14]:
from pyspark.sql import Row

_csv = _rdd.map(lambda l: l.split(","))
_csvRdd = _csv.map(lambda p: 
    Row(
        duration=int(p[0]), 
        protocol=p[1],
        service=p[2],
        flag=p[3],
        src_bytes=int(p[4]),
        dst_bytes=int(p[5]),
        attack=p[41]
    )
)

In [15]:
#RDD를 Dataframe으로 변환한다.
_df=spark.createDataFrame(_csvRdd)

In [16]:
_df.printSchema()
_df.show(5)

root
 |-- duration: long (nullable = true)
 |-- protocol: string (nullable = true)
 |-- service: string (nullable = true)
 |-- flag: string (nullable = true)
 |-- src_bytes: long (nullable = true)
 |-- dst_bytes: long (nullable = true)
 |-- attack: string (nullable = true)

+--------+--------+-------+----+---------+---------+-------+
|duration|protocol|service|flag|src_bytes|dst_bytes| attack|
+--------+--------+-------+----+---------+---------+-------+
|       0|     tcp|   http|  SF|      181|     5450|normal.|
|       0|     tcp|   http|  SF|      239|      486|normal.|
|       0|     tcp|   http|  SF|      235|     1337|normal.|
|       0|     tcp|   http|  SF|      219|     1337|normal.|
|       0|     tcp|   http|  SF|      217|     2032|normal.|
+--------+--------+-------+----+---------+---------+-------+
only showing top 5 rows



## attack 분류
네트워크 침입이 'attack' 또는 'normal'에 따라 구분해서 attackB 컬럼을 생성한다.

In [17]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
attack_udf = udf(lambda x: "normal" if x =="normal." else "attack", StringType())
myDf=_df.withColumn("attackB", attack_udf(_df.attack))

In [18]:
myDf.printSchema()

root
 |-- duration: long (nullable = true)
 |-- protocol: string (nullable = true)
 |-- service: string (nullable = true)
 |-- flag: string (nullable = true)
 |-- src_bytes: long (nullable = true)
 |-- dst_bytes: long (nullable = true)
 |-- attack: string (nullable = true)
 |-- attackB: string (nullable = true)



네트워크 침입 attack을 세분화하여 normal, dos, r2l, u2r, probling으로 5종류로 구분한다. 구분 문자열이 점('.')으로 끝난다는 점에 주의하자.

udf() 함수를 사용해서 if문으로 'noraml' 및 'attack'을 총 5가지 종류로 구분한다. 반환 값은 StringType()이다.

In [19]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
def classify41(s): #문자열을 받아서 분류, 강의자료 표 참조
    _5=""
    if s=="normal.":
        _5="normal"
    elif s=="back." or s=="land." or s=="neptune." or s=="pod." or s=="smurf." or s=="teardrop.":
        _5="dos"
    elif s=="ftp_write." or s=="guess_passwd." or s=="imap." or s=="multihop." or s=="phf." or\
        s=="spy." or s=="warezclient." or s=="warezmaster.":
        _5="r2l"
    elif s=="buffer_overflow." or s=="loadmodule." or s=="perl." or s=="rootkit.":
        _5="u2r"
    elif s=="ipsweep." or s=="nmap." or s=="portsweep." or s=="satan.":
        _5="probing"
    return _5

attack5_udf = udf(classify41, StringType()) #udf에 이 함수 등록, 반환타입은 string

In [20]:
myDf=myDf.withColumn("attack5", attack5_udf(_df.attack))

In [21]:
myDf.printSchema()

root
 |-- duration: long (nullable = true)
 |-- protocol: string (nullable = true)
 |-- service: string (nullable = true)
 |-- flag: string (nullable = true)
 |-- src_bytes: long (nullable = true)
 |-- dst_bytes: long (nullable = true)
 |-- attack: string (nullable = true)
 |-- attackB: string (nullable = true)
 |-- attack5: string (nullable = true)



In [22]:
myDf.show(5)

+--------+--------+-------+----+---------+---------+-------+-------+-------+
|duration|protocol|service|flag|src_bytes|dst_bytes| attack|attackB|attack5|
+--------+--------+-------+----+---------+---------+-------+-------+-------+
|       0|     tcp|   http|  SF|      181|     5450|normal.| normal| normal|
|       0|     tcp|   http|  SF|      239|      486|normal.| normal| normal|
|       0|     tcp|   http|  SF|      235|     1337|normal.| normal| normal|
|       0|     tcp|   http|  SF|      219|     1337|normal.| normal| normal|
|       0|     tcp|   http|  SF|      217|     2032|normal.| normal| normal|
+--------+--------+-------+----+---------+---------+-------+-------+-------+
only showing top 5 rows



## attack, normal 특징 분석
attack5 별로 건수를 세어보자.

In [23]:
myDf.groupBy('attack5').count().show()

+-------+------+
|attack5| count|
+-------+------+
|probing|  4107|
|    u2r|    52|
| normal| 97278|
|    r2l|  1126|
|    dos|391458|
+-------+------+



In [24]:
myDf.groupBy("protocol").count().show()

+--------+------+
|protocol| count|
+--------+------+
|     tcp|190065|
|     udp| 20354|
|    icmp|283602|
+--------+------+



In [25]:
myDf.groupBy('attackB','protocol').count().show()

+-------+--------+------+
|attackB|protocol| count|
+-------+--------+------+
| normal|     udp| 19177|
| normal|    icmp|  1288|
| normal|     tcp| 76813|
| attack|    icmp|282314|
| attack|     tcp|113252|
| attack|     udp|  1177|
+-------+--------+------+



In [28]:
myDf.groupBy('attackB').pivot('protocol').count().show() #프로토콜별로

+-------+------+------+-----+
|attackB|  icmp|   tcp|  udp|
+-------+------+------+-----+
| normal|  1288| 76813|19177|
| attack|282314|113252| 1177|
+-------+------+------+-----+



In [29]:
myDf.groupBy('attack5').pivot('protocol').avg('src_bytes').show()

+-------+------------------+------------------+------------------+
|attack5|              icmp|               tcp|               udp|
+-------+------------------+------------------+------------------+
|probing|10.700793650793651| 261454.6003016591|25.235897435897435|
|    u2r|              null| 960.8979591836735|13.333333333333334|
| normal| 91.47049689440993|1439.3120305156679| 98.01220211711947|
|    r2l|              null|271972.57460035523|              null|
|    dos| 936.2672084368129| 1090.303422435458|              28.0|
+-------+------------------+------------------+------------------+



In [30]:
myDf.groupBy('attack5').avg('duration').show()

+-------+--------------------+
|attack5|       avg(duration)|
+-------+--------------------+
|probing|   485.0299488677867|
|    u2r|    80.9423076923077|
| normal|  216.65732231336992|
|    r2l|   559.7522202486679|
|    dos|7.254929008986916E-4|
+-------+--------------------+



In [31]:
from pyspark.sql import functions as F
myDf.groupBy('attackB').pivot('protocol').agg(F.max('dst_bytes')).show()

+-------+----+-------+---+
|attackB|icmp|    tcp|udp|
+-------+----+-------+---+
| normal|   0|5134218|516|
| attack|   0|5155468| 74|
+-------+----+-------+---+



In [32]:
# duration>1000), dst_bytes==0인 경우의 건수를 계산
myDf.select("protocol", "duration", "dst_bytes")\
    .filter(_df.duration>1000)\
    .filter(_df.dst_bytes==0)\
    .groupBy("protocol")\
    .count()\
    .show()

+--------+-----+
|protocol|count|
+--------+-----+
|     tcp|  139|
+--------+-----+



## SQL
SQL을 사용해보자. 위에 사용했던 _df에서 임시 테이블 _tab을 생성한다

In [33]:
_df.registerTempTable("_tab")

In [34]:
tcp_interactions = spark.sql(
"""
    SELECT duration, dst_bytes FROM _tab
    WHERE protocol = 'tcp' AND duration > 1000 AND dst_bytes = 0
""")

In [35]:
tcp_interactions.show(5)

+--------+---------+
|duration|dst_bytes|
+--------+---------+
|    5057|        0|
|    5059|        0|
|    5051|        0|
|    5056|        0|
|    5051|        0|
+--------+---------+
only showing top 5 rows



In [36]:
tcp_interactions_out = tcp_interactions.rdd\
    .map(lambda p: "Duration: {}, Dest. bytes: {}".format(p.duration, p.dst_bytes))

In [38]:
for i,ti_out in enumerate(tcp_interactions_out.collect()):
    if(i%10==0):
        print(ti_out)

Duration: 5057, Dest. bytes: 0
Duration: 5043, Dest. bytes: 0
Duration: 5046, Dest. bytes: 0
Duration: 5051, Dest. bytes: 0
Duration: 5057, Dest. bytes: 0
Duration: 5063, Dest. bytes: 0
Duration: 42448, Dest. bytes: 0
Duration: 40121, Dest. bytes: 0
Duration: 31709, Dest. bytes: 0
Duration: 30619, Dest. bytes: 0
Duration: 22616, Dest. bytes: 0
Duration: 21455, Dest. bytes: 0
Duration: 13998, Dest. bytes: 0
Duration: 12933, Dest. bytes: 0
