# spark sql

In [1]:
import requests

In [2]:
import findspark
findspark.init()
import pyspark
myConf=pyspark.SparkConf()
spark = pyspark.sql.SparkSession.builder\
    .master("local")\
    .appName("myApp")\
    .config(conf=myConf)\
    .getOrCreate()

In [3]:
import os

In [4]:
r=requests.get("https://raw.githubusercontent.com/jokecamp/FootballData/master/World%20Cups/all-world-cup-players.json")

In [5]:
wc=r.json()

In [6]:
wc[0]

{'Competition': 'World Cup',
 'Year': 1930,
 'Team': 'Argentina',
 'Number': '',
 'Position': 'GK',
 'FullName': 'Ãngel Bossio',
 'Club': 'Club AtlÃ©tico Talleres de Remedios de Escalada',
 'ClubCountry': 'Argentina',
 'DateOfBirth': '1905-5-5',
 'IsCaptain': False}

In [7]:
_wcDf=spark.createDataFrame(wc)

In [9]:
from pyspark.sql import Row
wcDf = spark.createDataFrame(Row(**x) for x in wc)

In [10]:
wcDf.printSchema()


root
 |-- Competition: string (nullable = true)
 |-- Year: long (nullable = true)
 |-- Team: string (nullable = true)
 |-- Number: string (nullable = true)
 |-- Position: string (nullable = true)
 |-- FullName: string (nullable = true)
 |-- Club: string (nullable = true)
 |-- ClubCountry: string (nullable = true)
 |-- DateOfBirth: string (nullable = true)
 |-- IsCaptain: boolean (nullable = true)



In [13]:
# 임시테이블 생성
wcDf.createOrReplaceTempView("wc")
spark.sql("select Club,Team,Year from wc").show(2)

+--------------------+---------+----+
|                Club|     Team|Year|
+--------------------+---------+----+
|Club AtlÃ©tico Ta...|Argentina|1930|
|Quilmes AtlÃ©tico...|Argentina|1930|
+--------------------+---------+----+
only showing top 2 rows



In [14]:
wcPlayers=spark.sql("select FullName,Club,Team,Year from wc")
wcPlayers.show(1)

+------------+--------------------+---------+----+
|    FullName|                Club|     Team|Year|
+------------+--------------------+---------+----+
|Ãngel Bossio|Club AtlÃ©tico Ta...|Argentina|1930|
+------------+--------------------+---------+----+
only showing top 1 row



In [15]:
spark.catalog.listTables()

[Table(name='wc', database=None, description=None, tableType='TEMPORARY', isTemporary=True)]

In [16]:
namesRdd=wcPlayers.rdd.map(lambda x: "Full name: "+x[0])
for e in namesRdd.take(5):
    print (e)

Full name: Ãngel Bossio
Full name: Juan Botasso
Full name: Roberto Cherro
Full name: Alberto Chividini
Full name: 


In [17]:
bucketDf=spark.createDataFrame([[1,["orange", "apple", "pineapple"]],
                                [2,["watermelon","apple","bananas"]]],
                               ["bucketId","items"])

In [18]:
bucketDf.show(bucketDf.count(), truncate=False)

+--------+----------------------------+
|bucketId|items                       |
+--------+----------------------------+
|1       |[orange, apple, pineapple]  |
|2       |[watermelon, apple, bananas]|
+--------+----------------------------+



In [19]:
from pyspark.sql.functions import explode
bDf=bucketDf.select(bucketDf.bucketId, explode(bucketDf.items).alias('item'))

In [20]:
bDf.show()

+--------+----------+
|bucketId|      item|
+--------+----------+
|       1|    orange|
|       1|     apple|
|       1| pineapple|
|       2|watermelon|
|       2|     apple|
|       2|   bananas|
+--------+----------+



In [21]:
fDf=spark.createDataFrame([["orange", "F1"],
                            ["", "F2"],
                            ["pineapple","F3"],
                            ["watermelon","F4"],
                            ["bananas","F5"]],
                            ["item","itemId"])

In [22]:
fDf.show()

+----------+------+
|      item|itemId|
+----------+------+
|    orange|    F1|
|          |    F2|
| pineapple|    F3|
|watermelon|    F4|
|   bananas|    F5|
+----------+------+



In [25]:
joinDf=fDf.join(bDf, fDf.item==bDf.item, "cross")

In [26]:
joinDf.select(fDf.itemId,fDf.item,bDf.bucketId).show()

+------+----------+--------+
|itemId|      item|bucketId|
+------+----------+--------+
|    F5|   bananas|       2|
|    F1|    orange|       1|
|    F3| pineapple|       1|
|    F4|watermelon|       2|
+------+----------+--------+



# 문제 네트워크 불법 침입 시도 분석

In [27]:
_url = 'http://kdd.ics.uci.edu/databases/kddcup99/kddcup.data_10_percent.gz'
_fname = os.path.join(os.getcwd(),'data','kddcup.data_10_percent.gz')

In [28]:
from urllib.request import urlretrieve

if(not os.path.exists(_fname)):
    print ("{} data does not exist! retrieving..".format(_fname))
    _f=urlretrieve(_url,_fname)

C:\Users\user\Desktop\3-2\Bigdata\201710773\data\kddcup.data_10_percent.gz data does not exist! retrieving..


In [29]:
_rdd = spark.sparkContext.textFile(_fname)

In [30]:
_rdd.count()

494021

In [31]:
_rdd.take(1)

['0,tcp,http,SF,181,5450,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,8,8,0.00,0.00,0.00,0.00,1.00,0.00,0.00,9,9,1.00,0.00,0.11,0.00,0.00,0.00,0.00,0.00,normal.']

In [32]:
_allRdd=_rdd.map(lambda x: x.split(','))

In [38]:
_allRdd.take(1)

[['0',
  'tcp',
  'http',
  'SF',
  '181',
  '5450',
  '0',
  '0',
  '0',
  '0',
  '0',
  '1',
  '0',
  '0',
  '0',
  '0',
  '0',
  '0',
  '0',
  '0',
  '0',
  '0',
  '8',
  '8',
  '0.00',
  '0.00',
  '0.00',
  '0.00',
  '1.00',
  '0.00',
  '0.00',
  '9',
  '9',
  '1.00',
  '0.00',
  '0.11',
  '0.00',
  '0.00',
  '0.00',
  '0.00',
  '0.00',
  'normal.']]

In [34]:
_normalRdd=_allRdd.filter(lambda x: x[41]=="normal.")
_attackRdd=_allRdd.filter(lambda x: x[41]!="normal.")

In [35]:
_normalRdd.count()

97278

In [36]:
_attackRdd.count()

396743

In [39]:
_41 = _allRdd.map(lambda x: (x[41], 1))
_41.reduceByKey(lambda x,y: x+y).collect()

[('normal.', 97278),
 ('buffer_overflow.', 30),
 ('loadmodule.', 9),
 ('perl.', 3),
 ('neptune.', 107201),
 ('smurf.', 280790),
 ('guess_passwd.', 53),
 ('pod.', 264),
 ('teardrop.', 979),
 ('portsweep.', 1040),
 ('ipsweep.', 1247),
 ('land.', 21),
 ('ftp_write.', 8),
 ('back.', 2203),
 ('imap.', 12),
 ('satan.', 1589),
 ('phf.', 4),
 ('nmap.', 231),
 ('multihop.', 7),
 ('warezmaster.', 20),
 ('warezclient.', 1020),
 ('spy.', 2),
 ('rootkit.', 10)]

In [40]:
_41 = _allRdd.map(lambda x: (x[41], 1))
def f(x): return len(x)
_41.groupByKey().mapValues(f).collect()

[('normal.', 97278),
 ('buffer_overflow.', 30),
 ('loadmodule.', 9),
 ('perl.', 3),
 ('neptune.', 107201),
 ('smurf.', 280790),
 ('guess_passwd.', 53),
 ('pod.', 264),
 ('teardrop.', 979),
 ('portsweep.', 1040),
 ('ipsweep.', 1247),
 ('land.', 21),
 ('ftp_write.', 8),
 ('back.', 2203),
 ('imap.', 12),
 ('satan.', 1589),
 ('phf.', 4),
 ('nmap.', 231),
 ('multihop.', 7),
 ('warezmaster.', 20),
 ('warezclient.', 1020),
 ('spy.', 2),
 ('rootkit.', 10)]

In [41]:
from pyspark.sql import Row

_csv = _rdd.map(lambda l: l.split(","))
_csvRdd = _csv.map(lambda p: 
    Row(
        duration=int(p[0]), 
        protocol=p[1],
        service=p[2],
        flag=p[3],
        src_bytes=int(p[4]),
        dst_bytes=int(p[5]),
        attack=p[41]
    )
)

In [60]:
_df=spark.createDataFrame(_csvRdd)

In [61]:
_df.printSchema()
_df.show(5)

root
 |-- duration: long (nullable = true)
 |-- protocol: string (nullable = true)
 |-- service: string (nullable = true)
 |-- flag: string (nullable = true)
 |-- src_bytes: long (nullable = true)
 |-- dst_bytes: long (nullable = true)
 |-- attack: string (nullable = true)

+--------+--------+-------+----+---------+---------+-------+
|duration|protocol|service|flag|src_bytes|dst_bytes| attack|
+--------+--------+-------+----+---------+---------+-------+
|       0|     tcp|   http|  SF|      181|     5450|normal.|
|       0|     tcp|   http|  SF|      239|      486|normal.|
|       0|     tcp|   http|  SF|      235|     1337|normal.|
|       0|     tcp|   http|  SF|      219|     1337|normal.|
|       0|     tcp|   http|  SF|      217|     2032|normal.|
+--------+--------+-------+----+---------+---------+-------+
only showing top 5 rows



In [44]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
attack_udf = udf(lambda x: "normal" if x =="normal." else "attack", StringType())
myDf=_df.withColumn("attackB", attack_udf(_df.attack))

In [45]:

myDf.printSchema()

root
 |-- duration: long (nullable = true)
 |-- protocol: string (nullable = true)
 |-- service: string (nullable = true)
 |-- flag: string (nullable = true)
 |-- src_bytes: long (nullable = true)
 |-- dst_bytes: long (nullable = true)
 |-- attack: string (nullable = true)
 |-- attackB: string (nullable = true)



In [46]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
def classify41(s):
    _5=""
    if s=="normal.":
        _5="normal"
    elif s=="back." or s=="land." or s=="neptune." or s=="pod." or s=="smurf." or s=="teardrop.":
        _5="dos"
    elif s=="ftp_write." or s=="guess_passwd." or s=="imap." or s=="multihop." or s=="phf." or\
        s=="spy." or s=="warezclient." or s=="warezmaster.":
        _5="r2l"
    elif s=="buffer_overflow." or s=="loadmodule." or s=="perl." or s=="rootkit.":
        _5="u2r"
    elif s=="ipsweep." or s=="nmap." or s=="portsweep." or s=="satan.":
        _5="probing"
    return _5

attack5_udf = udf(classify41, StringType())

In [47]:
myDf=myDf.withColumn("attack5", attack5_udf(_df.attack))

In [48]:
myDf.printSchema()

root
 |-- duration: long (nullable = true)
 |-- protocol: string (nullable = true)
 |-- service: string (nullable = true)
 |-- flag: string (nullable = true)
 |-- src_bytes: long (nullable = true)
 |-- dst_bytes: long (nullable = true)
 |-- attack: string (nullable = true)
 |-- attackB: string (nullable = true)
 |-- attack5: string (nullable = true)



In [49]:
myDf.show(5)

+--------+--------+-------+----+---------+---------+-------+-------+-------+
|duration|protocol|service|flag|src_bytes|dst_bytes| attack|attackB|attack5|
+--------+--------+-------+----+---------+---------+-------+-------+-------+
|       0|     tcp|   http|  SF|      181|     5450|normal.| normal| normal|
|       0|     tcp|   http|  SF|      239|      486|normal.| normal| normal|
|       0|     tcp|   http|  SF|      235|     1337|normal.| normal| normal|
|       0|     tcp|   http|  SF|      219|     1337|normal.| normal| normal|
|       0|     tcp|   http|  SF|      217|     2032|normal.| normal| normal|
+--------+--------+-------+----+---------+---------+-------+-------+-------+
only showing top 5 rows



In [50]:
myDf.groupBy('attack5').count().show()

+-------+------+
|attack5| count|
+-------+------+
|probing|  4107|
|    u2r|    52|
| normal| 97278|
|    r2l|  1126|
|    dos|391458|
+-------+------+



In [51]:
myDf.groupBy("protocol").count().show()

+--------+------+
|protocol| count|
+--------+------+
|     tcp|190065|
|     udp| 20354|
|    icmp|283602|
+--------+------+



In [52]:
myDf.groupBy('attackB','protocol').count().show()

+-------+--------+------+
|attackB|protocol| count|
+-------+--------+------+
| normal|     udp| 19177|
| normal|    icmp|  1288|
| normal|     tcp| 76813|
| attack|    icmp|282314|
| attack|     tcp|113252|
| attack|     udp|  1177|
+-------+--------+------+



In [53]:
myDf.groupBy('attackB').pivot('protocol').count().show()

+-------+------+------+-----+
|attackB|  icmp|   tcp|  udp|
+-------+------+------+-----+
| normal|  1288| 76813|19177|
| attack|282314|113252| 1177|
+-------+------+------+-----+



In [54]:
myDf.groupBy('attack5').pivot('protocol').avg('src_bytes').show()

+-------+------------------+------------------+------------------+
|attack5|              icmp|               tcp|               udp|
+-------+------------------+------------------+------------------+
|probing|10.700793650793651| 261454.6003016591|25.235897435897435|
|    u2r|              null| 960.8979591836735|13.333333333333334|
| normal| 91.47049689440993|1439.3120305156679| 98.01220211711947|
|    r2l|              null|271972.57460035523|              null|
|    dos| 936.2672084368129| 1090.303422435458|              28.0|
+-------+------------------+------------------+------------------+



In [55]:
myDf.groupBy('attack5').avg('duration').show()

+-------+--------------------+
|attack5|       avg(duration)|
+-------+--------------------+
|probing|   485.0299488677867|
|    u2r|    80.9423076923077|
| normal|  216.65732231336992|
|    r2l|   559.7522202486679|
|    dos|7.254929008986916E-4|
+-------+--------------------+



In [56]:
from pyspark.sql import functions as F
myDf.groupBy('attackB').pivot('protocol').agg(F.max('dst_bytes')).show()

+-------+----+-------+---+
|attackB|icmp|    tcp|udp|
+-------+----+-------+---+
| normal|   0|5134218|516|
| attack|   0|5155468| 74|
+-------+----+-------+---+



In [57]:
myDf.select("protocol", "duration", "dst_bytes")\
    .filter(_df.duration>1000)\
    .filter(_df.dst_bytes==0)\
    .groupBy("protocol")\
    .count()\
    .show()

+--------+-----+
|protocol|count|
+--------+-----+
|     tcp|  139|
+--------+-----+



In [62]:
_df.registerTempTable("_tab")

In [63]:
tcp_interactions = spark.sql(
"""
    SELECT duration, dst_bytes FROM _tab
    WHERE protocol = 'tcp' AND duration > 1000 AND dst_bytes = 0
""")

In [64]:
tcp_interactions.show(5)

+--------+---------+
|duration|dst_bytes|
+--------+---------+
|    5057|        0|
|    5059|        0|
|    5051|        0|
|    5056|        0|
|    5051|        0|
+--------+---------+
only showing top 5 rows



In [65]:
tcp_interactions_out = tcp_interactions.rdd\
    .map(lambda p: "Duration: {}, Dest. bytes: {}".format(p.duration, p.dst_bytes))

In [67]:
for i,ti_out in enumerate(tcp_interactions_out.collect()):
    if(i%10==0):
        print (ti_out)

Duration: 5057, Dest. bytes: 0
Duration: 5043, Dest. bytes: 0
Duration: 5046, Dest. bytes: 0
Duration: 5051, Dest. bytes: 0
Duration: 5057, Dest. bytes: 0
Duration: 5063, Dest. bytes: 0
Duration: 42448, Dest. bytes: 0
Duration: 40121, Dest. bytes: 0
Duration: 31709, Dest. bytes: 0
Duration: 30619, Dest. bytes: 0
Duration: 22616, Dest. bytes: 0
Duration: 21455, Dest. bytes: 0
Duration: 13998, Dest. bytes: 0
Duration: 12933, Dest. bytes: 0
