### 기본적인 DataFrame 다루기

In [3]:
#creating spark session
import pyspark
myConf=pyspark.SparkConf()
spark = pyspark.sql.SparkSession\
    .builder\
    .master("local")\
    .appName("myApp")\
    .config(conf=myConf)\
    .getOrCreate()

## spark DataFrame
RDD + schema = Dataframe\
structured data, for big data 

### creating schema automatically
- spark.createDataFrame(list)

In [2]:
myList=[('1','kim, js', 170),
        ('1','lee, sm', 175),
        ('2','lim, yg',180),
        ('2','lee', 170)]

In [3]:
myDf = spark.createDataFrame(myList)

In [6]:
myDf.columns

['_1', '_2', '_3']

In [7]:
myDf.printSchema()

root
 |-- _1: string (nullable = true)
 |-- _2: string (nullable = true)
 |-- _3: long (nullable = true)



In [8]:
print (myDf.take(1))

[Row(_1='1', _2='kim, js', _3=170)]


In [4]:
#defining column names beforehand
cols = ['year','name','height']
_myDf = spark.createDataFrame(myList, cols)
_myDf.columns

['year', 'name', 'height']

#### using Row to create DataFrame
- spark.createDataFrame(Row)

In [6]:
from pyspark.sql import Row
Person = Row('year','name', 'height') #naming column
row1=Person('1','kim, js',170)

In [7]:
#saving Row as Dictionary
row1.asDict()

{'year': '1', 'name': 'kim, js', 'height': 170}

In [8]:
#dict.keys
row1.asDict().keys()

dict_keys(['year', 'name', 'height'])

In [9]:
#dict.values
row1.asDict().values()

dict_values(['1', 'kim, js', 170])

In [10]:
myRows = [row1,
          Person('1','lee, sm', 175),
          Person('2','lim, yg',180),
          Person('2','lee',170)]

In [11]:
myDf=spark.createDataFrame(myRows)

In [16]:
print (myDf.printSchema())
myDf.show()

root
 |-- year: string (nullable = true)
 |-- name: string (nullable = true)
 |-- height: long (nullable = true)

None
+----+-------+------+
|year|   name|height|
+----+-------+------+
|   1|kim, js|   170|
|   1|lee, sm|   175|
|   2|lim, yg|   180|
|   2|    lee|   170|
+----+-------+------+



### Defining Schema before creating Dataframe
- StructType([\
    StructField(columnName, Type(), true),\
    )]

In [12]:
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import StringType, IntegerType
mySchema=StructType([
    StructField("year", StringType(), True),
    StructField("name", StringType(), True),
    StructField("height", IntegerType(), True)
])

In [13]:
myDf = spark.createDataFrame(myRows, mySchema)

In [19]:
myDf.printSchema()

root
 |-- year: string (nullable = true)
 |-- name: string (nullable = true)
 |-- height: integer (nullable = true)



In [20]:
myDf_ = spark.createDataFrame(myList, mySchema)

In [22]:
myDf_.printSchema()

root
 |-- year: string (nullable = true)
 |-- name: string (nullable = true)
 |-- height: integer (nullable = true)



### Creating Dataframe from RDD
Rdd is unstructured data that does not require shcema, schema will be created automatically


In [15]:
myRdd = spark.sparkContext.parallelize(myList)

- .toDF()

In [24]:
rddDf=myRdd.toDF()
rddDf.printSchema()

root
 |-- _1: string (nullable = true)
 |-- _2: string (nullable = true)
 |-- _3: long (nullable = true)



- .createDataFrame()

In [25]:
rddDf = spark.createDataFrame(myRdd)
rddDf.printSchema()

root
 |-- _1: string (nullable = true)
 |-- _2: string (nullable = true)
 |-- _3: long (nullable = true)



#### using Row to cast datatype 

In [16]:
from pyspark.sql import Row
_myRdd=myRdd.map(lambda x:Row(year=int(x[0]), name=x[1], height=int(x[2])))
_myDf=spark.createDataFrame(_myRdd)

_myDf.printSchema()

root
 |-- year: long (nullable = true)
 |-- name: string (nullable = true)
 |-- height: long (nullable = true)



In [27]:
#making Rdd using Row()
from pyspark.sql import Row

r1=Row(name="js1", age=10)
r2=Row(name="js2", age=20)
_myRdd=spark.sparkContext.parallelize([r1,r2])

In [28]:
myDF=spark.createDataFrame(_myRdd)

In [29]:
myDF.show()

+----+---+
|name|age|
+----+---+
| js1| 10|
| js2| 20|
+----+---+



using StructType() to define schema

In [30]:
schema=StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True),
    #StructField("created", TimestampType(), True)
])
_myDf=spark.createDataFrame(_myRdd, schema)
_myDf.printSchema()

root
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)



In [31]:
_myDf.show()

+----+---+
|name|age|
+----+---+
| js1| 10|
| js2| 20|
+----+---+



#### Rdd to Dataframe defining schema beforehand

In [17]:
from pyspark.sql.types import *

myRdd=spark.sparkContext.parallelize([(1, 'kim', 50.0), (2, 'lee', 60.0), (3, 'park', 70.0)])
schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("name", StringType(), True),
    StructField("height", DoubleType(), True)
])
_myDf = spark.createDataFrame(myRdd, schema)
_myDf.show()

+---+----+------+
| id|name|height|
+---+----+------+
|  1| kim|  50.0|
|  2| lee|  60.0|
|  3|park|  70.0|
+---+----+------+



## Pandas <-> Spark
spark is more suitable for big data than pandas

### Dataframe to Pandas
- myDf.toPands()

In [20]:
_myDf.toPandas()

In [None]:
#dataframe to csv
myDf.write.format('com.databricks.spark.csv').save(os.path.join('data','_myDf.csv'))

**csv file to dataframe**

In [22]:
#1
df = spark\
        .read\
        .format('com.databricks.spark.csv')\
        .options(header='true', inferschema='true', delimiter=',')\
        .load(os.path.join('data','ds_spark.csv'))

In [23]:
df.printSchema()

root
 |-- 1: integer (nullable = true)
 |-- 2: integer (nullable = true)
 |-- 3: integer (nullable = true)
 |-- 4: integer (nullable = true)



inferschema를 제외하면, string으로 자동인식한다

In [24]:
df = spark\
        .read\
        .format('com.databricks.spark.csv')\
        .options(header='true', delimiter=',')\
        .load(os.path.join('data','ds_spark.csv'))

In [26]:
df.printSchema()

root
 |-- 1: string (nullable = true)
 |-- 2: string (nullable = true)
 |-- 3: string (nullable = true)
 |-- 4: string (nullable = true)



In [27]:
#2
df = spark\
        .read\
        .options(header='true', inferschema='true', delimiter=',')\
        .csv(os.path.join('data', 'ds_spark.csv'))
df.printSchema()

root
 |-- 1: integer (nullable = true)
 |-- 2: integer (nullable = true)
 |-- 3: integer (nullable = true)
 |-- 4: integer (nullable = true)



**reading tsv file**

In [29]:
from pyspark.sql.types import *
_tRdd=spark.sparkContext\
    .textFile(os.path.join('data','ds_spark_heightweight.txt'))

tsv파일을 float 형변환하고, split함수로 \t로 분리

In [30]:
#rdd로 읽기
#myRdd=rdd.map(lambda line:np.array([float(x) for x in line.split('\t')]))
tRdd=_tRdd.map(lambda line:[float(x) for x in line.split('\t')])
tRdd.take(1)

[[1.0, 65.78, 112.99]]

In [33]:
tDfNamed = spark.createDataFrame(tRdd, ["id","weight","height"])

In [34]:

tDfNamed.printSchema()

root
 |-- id: double (nullable = true)
 |-- weight: double (nullable = true)
 |-- height: double (nullable = true)



reading txt file

In [35]:
tDftxt = spark.read.text(os.path.join('data','ds_spark_heightweight.txt'))
tDftxt.printSchema()

root
 |-- value: string (nullable = true)



In [None]:
#reading tsv file with csv
tDf = spark\
    .read\
    .options(header='false', inferschema='true', delimiter='\t')\
    .csv(os.path.join('data', 'ds_spark_heightweight.txt'))
tDf.show()

In [None]:
#rading tweet by dataframe

In [None]:
jfile= os.path.join('src','ds_twitter_seoul_3.json')

tweetDf= spark.read.json(jfile)
tweetDf.printSchema()

In [36]:
import requests
r=requests.get("https://raw.githubusercontent.com/jokecamp/FootballData/master/World%20Cups/all-world-cup-players.json")
#앞서 읽은 텍스트를 json으로 
wc=r.json()
wc[0]

{'Competition': 'World Cup',
 'Year': 1930,
 'Team': 'Argentina',
 'Number': '',
 'Position': 'GK',
 'FullName': 'Ãngel Bossio',
 'Club': 'Club AtlÃ©tico Talleres de Remedios de Escalada',
 'ClubCountry': 'Argentina',
 'DateOfBirth': '1905-5-5',
 'IsCaptain': False}

In [37]:
#wc는 리스트이고 그 요소는 dictionary 형식임.
print (type(wc), type(wc[0]))

<class 'list'> <class 'dict'>


dictionary구조를 풀어서 Row에 넘겨주어야 한다.

### 별표 1개 *: 리스트에서 인자 풀어내기
### 별표 2개 **: dictionary에서 인자 풀어내기

**Row를 이용해서 dictionary to dataframe **

In [38]:
from pyspark.sql import Row

wcDf = spark.createDataFrame(Row(**x) for x in wc)

In [39]:
wcDf.printSchema()

root
 |-- Competition: string (nullable = true)
 |-- Year: long (nullable = true)
 |-- Team: string (nullable = true)
 |-- Number: string (nullable = true)
 |-- Position: string (nullable = true)
 |-- FullName: string (nullable = true)
 |-- Club: string (nullable = true)
 |-- ClubCountry: string (nullable = true)
 |-- DateOfBirth: string (nullable = true)
 |-- IsCaptain: boolean (nullable = true)



**using rdd to dataframe**

In [41]:
wcRdd=spark.sparkContext.parallelize(wc)

In [42]:
wcDfFromDf = spark.createDataFrame(wcRdd)
wcDfFromDf.printSchema()

root
 |-- Club: string (nullable = true)
 |-- ClubCountry: string (nullable = true)
 |-- Competition: string (nullable = true)
 |-- DateOfBirth: string (nullable = true)
 |-- FullName: string (nullable = true)
 |-- IsCaptain: boolean (nullable = true)
 |-- Number: string (nullable = true)
 |-- Position: string (nullable = true)
 |-- Team: string (nullable = true)
 |-- Year: long (nullable = true)





### 결측값
null = no value, nothing\
Nan = Not a Number\

- df.na.fill(0) 모든 컬럼의 na를 0으로 교체
- df.fillna( { 'c0':0, 'c1':0 } ) 컬럼 c0, c1의 na를 0으로 교체
- df.na.drop(subset=["c0"]) 결측값 삭제

In [None]:
from pyspark.sql import functions as F
myDf.where(F.col("height").isNull())

In [None]:
from pyspark.sql.functions import isnan, when, count, col
myDf.select([count(when(isnan(c), c)).alias(c) for c in myDf.columns]).show()

In [43]:
cols = wcDf.columns
cols.remove('IsCaptain')

### datatype cast

In [44]:
from pyspark.sql.functions import to_date

_wcDfCasted=wcDf.withColumn('date2', to_date(wcDf['DateOfBirth'], 'yyyy-MM-dd'))

In [45]:
from pyspark.sql.types import DateType

wcDfCasted = _wcDfCasted.withColumn('date3', _wcDfCasted['DateOfBirth'].cast(DateType()))
wcDfCasted = wcDfCasted.withColumn('NumberInt', wcDfCasted['Number'].cast("integer"))

### Add Column, Delete Column

- df.withColumn("new col name", df['original col name'].cast("double"))\
- df.Drop('col name')

In [None]:
tDf = tDf.withColumn("weight", tDf['_c2'].cast("double"))
tDf = tDf.drop('_c0').drop('_c1').drop('_c2')

### UDF : User Defined Functions
cannot call udf directly.\
always need to import udf
-  pyspark.sql.functions import udf

In [46]:

myDf = spark\
        .read.format('com.databricks.spark.csv')\
        .options(header='true', inferschema='true')\
        .load(os.path.join('data','myDf.csv'))

In [47]:
#소문자 -> 대문자
def uppercase(s):
    return s.upper()

In [48]:
from pyspark.sql.types import StringType
from pyspark.sql.functions import udf

upperUdf = udf(uppercase, StringType())

In [50]:
myDf = myDf.withColumn("nameUpper", upperUdf(myDf['name']))

In [51]:
myDf.show()

+---+----+-------+------+---------+
|_c0|year|   name|height|nameUpper|
+---+----+-------+------+---------+
|  0|   1|kim, js|   170|  KIM, JS|
|  1|   1|lee, sm|   175|  LEE, SM|
|  2|   2|lim, yg|   180|  LIM, YG|
|  3|   2|    lee|   170|      LEE|
+---+----+-------+------+---------+



In [52]:
#Casting with Udf
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType
#to double
toDoublefunc = udf(lambda x: float(x), DoubleType())
myDf = myDf.withColumn("heightD", toDoublefunc(myDf.height))
#to int
toint=udf(lambda x:int(x), IntegerType())
#if-string
height_udf = udf(lambda height: "taller" if height >=175 else "shorter", StringType())
heightDf=myDf.withColumn("height>175", height_udf(myDf.heightD))


### Rename Column
- df.withColumnRenamed('originalcolname', 'newcolname')

In [None]:
tDf=tDf.withColumnRenamed('id','ID')
tDf.show(3)

- df.select('colname').show()

In [54]:
_name=myDf.select('name', 'height').show()

+-------+------+
|   name|height|
+-------+------+
|kim, js|   170|
|lee, sm|   175|
|lim, yg|   180|
|    lee|   170|
+-------+------+



In [56]:
#how to show more than two columns with list
cols = ['name', 'height']
myDf.select(*cols).show()
myDf.show()

+-------+------+
|   name|height|
+-------+------+
|kim, js|   170|
|lee, sm|   175|
|lim, yg|   180|
|    lee|   170|
+-------+------+

+---+----+-------+------+---------+-------+
|_c0|year|   name|height|nameUpper|heightD|
+---+----+-------+------+---------+-------+
|  0|   1|kim, js|   170|  KIM, JS|  170.0|
|  1|   1|lee, sm|   175|  LEE, SM|  175.0|
|  2|   2|lim, yg|   180|  LIM, YG|  180.0|
|  3|   2|    lee|   170|      LEE|  170.0|
+---+----+-------+------+---------+-------+



In [57]:
#select like
myDf.select("name", "height", myDf.name.like("%lee%")).show()
#select startswith
myDf.select("name", "height", myDf.name.startswith("kim")).show()
#select endsiwth
myDf.select("name", "height", myDf.name.endswith("lee")).show()
#where.select()
myDf.where(myDf['height'] < 175).select(myDf['name'], myDf['height']).show()

+-------+------+---------------+
|   name|height|name LIKE %lee%|
+-------+------+---------------+
|kim, js|   170|          false|
|lee, sm|   175|           true|
|lim, yg|   180|          false|
|    lee|   170|           true|
+-------+------+---------------+



### Alias
- DF = Df.alias("newDfName")

In [58]:
#changing Df name
myDf1 = myDf.alias("myDf1")

In [59]:
#changing df Col Name
myDf1.select(myDf1.name.substr(1,3).alias("short name")).show(3)

+----------+
|short name|
+----------+
|       kim|
|       lee|
|       lim|
+----------+
only showing top 3 rows



In [60]:
from pyspark.sql.functions import when
myDf.select("height", when(myDf.height < 175, 1).otherwise(0)).show()

+------+------------------------------------------+
|height|CASE WHEN (height < 175) THEN 1 ELSE 0 END|
+------+------------------------------------------+
|   170|                                         1|
|   175|                                         0|
|   180|                                         0|
|   170|                                         1|
+------+------------------------------------------+



In [61]:

from pyspark.sql.functions import when
myDf.select("height", (when(myDf.height < 175, 1).otherwise(0)).alias('<175')).show()

+------+----+
|height|<175|
+------+----+
|   170|   1|
|   175|   0|
|   180|   0|
|   170|   1|
+------+----+



#### filter

In [62]:

myDf.filter(myDf['height'] > 175).show()

+---+----+-------+------+---------+-------+
|_c0|year|   name|height|nameUpper|heightD|
+---+----+-------+------+---------+-------+
|  2|   2|lim, yg|   180|  LIM, YG|  180.0|
+---+----+-------+------+---------+-------+



### regexp_replace 컬럼의 내용 변경

In [63]:
from pyspark.sql.functions import *

_heightDf = myDf.withColumn('nameNew', regexp_replace('name', 'lee', 'lim'))
_heightDf.show()

+---+----+-------+------+---------+-------+-------+
|_c0|year|   name|height|nameUpper|heightD|nameNew|
+---+----+-------+------+---------+-------+-------+
|  0|   1|kim, js|   170|  KIM, JS|  170.0|kim, js|
|  1|   1|lee, sm|   175|  LEE, SM|  175.0|lim, sm|
|  2|   2|lim, yg|   180|  LIM, YG|  180.0|lim, yg|
|  3|   2|    lee|   170|      LEE|  170.0|    lim|
+---+----+-------+------+---------+-------+-------+



### GroupBy()

In [64]:
myDf.groupby(myDf['year']).max().show()
#avg(), max(), min(), min(), count()

+----+--------+---------+-----------+------------+
|year|max(_c0)|max(year)|max(height)|max(heightD)|
+----+--------+---------+-----------+------------+
|   1|       1|        1|        175|       175.0|
|   2|       3|        2|        180|       180.0|
+----+--------+---------+-----------+------------+



In [66]:
myDf.groupBy('year').agg({"heightD":"avg"}).show()
#alias("average height") 한다고 컬럼명 안바뀌네

+----+------------+
|year|avg(heightD)|
+----+------------+
|   1|       172.5|
|   2|       175.0|
+----+------------+



In [67]:
wcDf.groupBy(wcDf.ClubCountry).count().show()

+-----------+-----+
|ClubCountry|count|
+-----------+-----+
|   England |    4|
|   Paraguay|   93|
|     Russia|   51|
|        POL|   11|
|        BRA|   27|
|    Senegal|    1|
|     Sweden|  154|
|   Colombia|    1|
|        FRA|  155|
|        ALG|    8|
|   England |    1|
|       RUS |    1|
|     Turkey|   65|
|      Zaire|   22|
|       Iraq|   22|
|    Germany|  206|
|        RSA|   16|
|        ITA|  224|
|        UKR|   38|
|        GHA|    8|
+-----------+-----+
only showing top 20 rows



In [None]:
wcDf.groupBy('ClubCountry').pivot('Position').count().show()

#### F function

In [68]:
from pyspark.sql import functions as F

myDf.agg(F.min(myDf.heightD),F.max(myDf.heightD),F.avg(myDf.heightD),F.sum(myDf.heightD)).show()

+------------+------------+------------+------------+
|min(heightD)|max(heightD)|avg(heightD)|sum(heightD)|
+------------+------------+------------+------------+
|       170.0|       180.0|      173.75|       695.0|
+------------+------------+------------+------------+



Adding Row
this Row should also be a dataframe
- df.union(dfname)

In [None]:
toAppendDf = spark.createDataFrame([Row(4, 1, "choi, js", 177)])
_myDf = myDf.union(toAppendDf)
_myDf.show()

#### partitions

In [None]:
#parition 개수 확인
myDf.rdd.getNumPartitions()
#partition 개수 늘리기
_myDf = myDf.repartition(4)
print(_myDf.rdd.getNumPartitions())
#parition 개수 줄이기
_myDf2 = _myDf.coalesce(2)
print(_myDf2.rdd.getNumPartitions())

### [과제] 년별, 월별 대여건수

In [74]:
!dir data

 C 드라이브의 볼륨에는 이름이 없습니다.
 볼륨 일련 번호: 9C4F-6B21

 C:\Users\joann\Code\201710754\data 디렉터리

2020-10-11  오후 04:40    <DIR>          .
2020-10-11  오후 04:40    <DIR>          ..
2020-10-03  오후 05:36    <DIR>          .ipynb_checkpoints
2020-09-20  오후 05:20            15,287 ds_bigdata_wiki.txt
2020-10-03  오전 06:07                39 ds_spark.csv
2020-10-11  오후 04:40               147 ds_spark_2cols.csv
2020-10-03  오전 06:17               840 ds_spark_heightweight.txt
2020-09-20  오후 05:09               583 ds_spark_wiki.txt
2020-10-09  오후 07:35         2,144,903 kddcup.data_10_percent.gz
2020-10-03  오후 03:59                89 myDf.csv
2020-10-03  오후 03:16    <DIR>          people.parquet
2020-10-02  오전 01:59    <DIR>          _myDf.csv
2020-10-03  오후 05:50             8,113 서울특별시_공공자전거 일별 대여건수_(2018_2019.03).csv
               8개 파일           2,170,001 바이트
               5개 디렉터리  69,451,026,432 바이트 남음


In [76]:
_bicycle = spark.read.format('com.databricks.spark.csv')\
    .options(header='true', inferschema='true').load('data/서울특별시_공공자전거 일별 대여건수_(2018_2019.03).csv')

In [77]:
_bicycle.show(5)

+----------+-----+
|      date|count|
+----------+-----+
|2018-01-01| 4950|
|2018-01-02| 7136|
|2018-01-03| 7156|
|2018-01-04| 7102|
|2018-01-05| 7705|
+----------+-----+
only showing top 5 rows



In [78]:

bicycle=_bicycle\
    .withColumnRenamed("date", "Date")\
    .withColumnRenamed(" count", "Count")

year, month 추가 방법1

In [80]:
bicycle=bicycle.withColumn("year",bicycle.Date.substr(1, 4))

In [83]:
bicycle=bicycle.withColumn("month",bicycle.Date.substr(6, 2))
bicycle.printSchema()

root
 |-- Date: string (nullable = true)
 |-- count: integer (nullable = true)
 |-- year: string (nullable = true)
 |-- month: string (nullable = true)



year, month 추가 방법2

In [84]:
columns_to_drop = ['year','month']
df = bicycle.drop(*columns_to_drop)

In [85]:
import pyspark.sql.functions as F
bicycle = bicycle\
    .withColumn('year', F.year('date'))\
    .withColumn('month', F.month('date'))

In [86]:
bicycle.printSchema()
#String 으로 들어갔던 위의 결과와 달리 얘는 integer로 들어감.

root
 |-- Date: string (nullable = true)
 |-- count: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)



#### 년도별 대여건수 합계

In [87]:
bicycle.groupBy('year').agg({"count":"sum"}).show()

+----+----------+
|year|sum(count)|
+----+----------+
|2018|  10124874|
|2019|   1871935|
+----+----------+



#### 년도별, 월별 (분기별) 대여건수 합계

In [88]:
bicycle.groupBy('year').pivot('month').agg({"count":"sum"}).show()

+----+------+------+------+------+------+-------+-------+-------+-------+-------+------+------+
|year|     1|     2|     3|     4|     5|      6|      7|      8|      9|     10|    11|    12|
+----+------+------+------+------+------+-------+-------+-------+-------+-------+------+------+
|2018|164367|168741|462661|687885|965609|1207123|1100015|1037505|1447993|1420621|961532|500822|
|2019|495573|471543|904819|  null|  null|   null|   null|   null|   null|   null|  null|  null|
+----+------+------+------+------+------+-------+-------+-------+-------+-------+------+------+



#### Pandas 로 해결

In [None]:
import pandas as pd
import numpy as np
bicycleP = bicycle.toPandas()

In [None]:
#bicycleP.groupby('year').aggregate({'Count':np.sum})
bicycleP.groupby('year').aggregate({'Count':'sum'})

In [None]:
#01
pd.pivot_table(bicycleP, values = 'Count', index = ['year'], columns = ['month'], aggfunc= 'sum')

In [None]:
#02
sumMonthly=bicycle.groupBy('year').pivot('month').agg({"count":"sum"})
pdf=sumMonthly.toPandas()
pdf.head()

In [None]:
my=pdf.drop('year', 1).transpose()

In [None]:
my.plot(kind='line')

### SPARK & SQL 

In [89]:
wcDf.printSchema()

root
 |-- Competition: string (nullable = true)
 |-- Year: long (nullable = true)
 |-- Team: string (nullable = true)
 |-- Number: string (nullable = true)
 |-- Position: string (nullable = true)
 |-- FullName: string (nullable = true)
 |-- Club: string (nullable = true)
 |-- ClubCountry: string (nullable = true)
 |-- DateOfBirth: string (nullable = true)
 |-- IsCaptain: boolean (nullable = true)



In [90]:
wcDf.createOrReplaceTempView("wc") #임시테이블 만들기
spark.sql("select Club,Team,Year from wc").show(1) #데이터 조회

+--------------------+---------+----+
|                Club|     Team|Year|
+--------------------+---------+----+
|Club AtlÃ©tico Ta...|Argentina|1930|
+--------------------+---------+----+
only showing top 1 row



In [91]:
wcPlayers=spark.sql("select FullName,Club,Team,Year from wc")
wcPlayers.show(1)

+------------+--------------------+---------+----+
|    FullName|                Club|     Team|Year|
+------------+--------------------+---------+----+
|Ãngel Bossio|Club AtlÃ©tico Ta...|Argentina|1930|
+------------+--------------------+---------+----+
only showing top 1 row



In [92]:
#wcPlayes를 RDD로 변환해서 이름만 출력
namesRdd=wcPlayers.rdd.map(lambda x: "Full name: "+x[0])
for e in namesRdd.take(5):
    print (e)

Full name: Ãngel Bossio
Full name: Juan Botasso
Full name: Roberto Cherro
Full name: Alberto Chividini
Full name: 


In [108]:
bucketDf=spark.createDataFrame([[1,["orange", "apple", "pineapple"]],
                                [2,["watermelon","apple","bananas"]]],
                               ["bucketId","items"])

In [109]:
bucketDf.show(bucketDf.count(), truncate=False)
#truncate=False 값을 잘라내지 않고 모두 출력

+--------+----------------------------+
|bucketId|items                       |
+--------+----------------------------+
|1       |[orange, apple, pineapple]  |
|2       |[watermelon, apple, bananas]|
+--------+----------------------------+



- explode
컬럼에 List 또는 배열이 포함된 경우 explode() 함수는 이를 flat해서 새로운 컬럼을 생성하게 된다.

In [105]:
from pyspark.sql.functions import explode
bDf=bucketDf.select(bucketDf.bucketId, explode(bucketDf.items).alias('item'))

In [106]:
bDf=bucketDf.select(bucketDf.bucketId, bucketDf.items)
bDf.show()

+--------+--------------------+
|bucketId|               items|
+--------+--------------------+
|       1|[orange, apple, p...|
|       2|[watermelon, appl...|
+--------+--------------------+



spark join

In [98]:
fDf=spark.createDataFrame([["orange", "F1"],
                            ["", "F2"],
                            ["pineapple","F3"],
                            ["watermelon","F4"],
                            ["bananas","F5"]],
                            ["item","itemId"])
fDf.show()

+----------+------+
|      item|itemId|
+----------+------+
|    orange|    F1|
|          |    F2|
| pineapple|    F3|
|watermelon|    F4|
|   bananas|    F5|
+----------+------+



In [112]:
joinDf=fDf.join(bDf, fDf.item==bDf.item, "inner")

AttributeError: 'DataFrame' object has no attribute 'item'

### 불법 네트워크 attack

In [1]:
import os
_url = 'http://kdd.ics.uci.edu/databases/kddcup99/kddcup.data_10_percent.gz'
_fname = os.path.join(os.getcwd(),'data','kddcup.data_10_percent.gz')

In [None]:
#파일존재 확인
# from urllib.request import urlretrieve

# if(not os.path.exists(_fname)):
#     print ("{} data does not exist! retrieving..".format(_fname))
#     _f=urlretrieve(_url,_fname)

In [4]:
_rdd = spark.sparkContext.textFile(_fname)

In [5]:
#map() 함수를 사용하여 csv 형식으로 구성된 파일을 컴마(,)로 분리한다.
_allRdd=_rdd.map(lambda x: x.split(','))

In [6]:
_normalRdd=_allRdd.filter(lambda x: x[41]=="normal.")
_attackRdd=_allRdd.filter(lambda x: x[41]!="normal.")

In [7]:
print(_allRdd.count())
print(_normalRdd.count())
print(_attackRdd.count())

494021
97278
396743


In [8]:
_41 = _allRdd.map(lambda x: (x[41], 1))
_41.reduceByKey(lambda x,y: x+y).collect()
# def f(x): return len(X)
# _41.groupByKey().mapValues().collect()

[('normal.', 97278),
 ('buffer_overflow.', 30),
 ('loadmodule.', 9),
 ('perl.', 3),
 ('neptune.', 107201),
 ('smurf.', 280790),
 ('guess_passwd.', 53),
 ('pod.', 264),
 ('teardrop.', 979),
 ('portsweep.', 1040),
 ('ipsweep.', 1247),
 ('land.', 21),
 ('ftp_write.', 8),
 ('back.', 2203),
 ('imap.', 12),
 ('satan.', 1589),
 ('phf.', 4),
 ('nmap.', 231),
 ('multihop.', 7),
 ('warezmaster.', 20),
 ('warezclient.', 1020),
 ('spy.', 2),
 ('rootkit.', 10)]

In [9]:
#열 0, 1, 2, 3, 4, 5, 41을 선별하여 스키마를 정해서 RDD를 생성한다.
from pyspark.sql import Row

_csv = _rdd.map(lambda l: l.split(","))
_csvRdd = _csv.map(lambda p: 
    Row(
        duration=int(p[0]), 
        protocol=p[1],
        service=p[2],
        flag=p[3],
        src_bytes=int(p[4]),
        dst_bytes=int(p[5]),
        attack=p[41]
    )
)

In [10]:
_df=spark.createDataFrame(_csvRdd)

In [11]:
_df.printSchema()
_df.show(4)

root
 |-- duration: long (nullable = true)
 |-- protocol: string (nullable = true)
 |-- service: string (nullable = true)
 |-- flag: string (nullable = true)
 |-- src_bytes: long (nullable = true)
 |-- dst_bytes: long (nullable = true)
 |-- attack: string (nullable = true)

+--------+--------+-------+----+---------+---------+-------+
|duration|protocol|service|flag|src_bytes|dst_bytes| attack|
+--------+--------+-------+----+---------+---------+-------+
|       0|     tcp|   http|  SF|      181|     5450|normal.|
|       0|     tcp|   http|  SF|      239|      486|normal.|
|       0|     tcp|   http|  SF|      235|     1337|normal.|
|       0|     tcp|   http|  SF|      219|     1337|normal.|
+--------+--------+-------+----+---------+---------+-------+
only showing top 4 rows



In [14]:

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
attack_udf = udf(lambda x: "normal" if x =="normal." else "attack", StringType())
myDf=_df.withColumn("attackB", attack_udf(_df.attack))

In [16]:
myDf.show()

+--------+--------+-------+----+---------+---------+-------+-------+
|duration|protocol|service|flag|src_bytes|dst_bytes| attack|attackB|
+--------+--------+-------+----+---------+---------+-------+-------+
|       0|     tcp|   http|  SF|      181|     5450|normal.| normal|
|       0|     tcp|   http|  SF|      239|      486|normal.| normal|
|       0|     tcp|   http|  SF|      235|     1337|normal.| normal|
|       0|     tcp|   http|  SF|      219|     1337|normal.| normal|
|       0|     tcp|   http|  SF|      217|     2032|normal.| normal|
|       0|     tcp|   http|  SF|      217|     2032|normal.| normal|
|       0|     tcp|   http|  SF|      212|     1940|normal.| normal|
|       0|     tcp|   http|  SF|      159|     4087|normal.| normal|
|       0|     tcp|   http|  SF|      210|      151|normal.| normal|
|       0|     tcp|   http|  SF|      212|      786|normal.| normal|
|       0|     tcp|   http|  SF|      210|      624|normal.| normal|
|       0|     tcp|   http|  SF|  

In [12]:
#udf함수를 사용해서 attack 종류 구분
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
def classify41(s):
    _5=""
    if s=="normal.":
        _5="normal"
    elif s=="back." or s=="land." or s=="neptune." or s=="pod." or s=="smurf." or s=="teardrop.":
        _5="dos"
    elif s=="ftp_write." or s=="guess_passwd." or s=="imap." or s=="multihop." or s=="phf." or\
        s=="spy." or s=="warezclient." or s=="warezmaster.":
        _5="r2l"
    elif s=="buffer_overflow." or s=="loadmodule." or s=="perl." or s=="rootkit.":
        _5="u2r"
    elif s=="ipsweep." or s=="nmap." or s=="portsweep." or s=="satan.":
        _5="probing"
    return _5

attack5_udf = udf(classify41, StringType())

In [17]:
myDf=myDf.withColumn("attack5", attack5_udf(_df.attack))

In [18]:
#attack 종류별로 건수 세기
myDf.groupBy('attack5').count().show()

+-------+------+
|attack5| count|
+-------+------+
|probing|  4107|
|    u2r|    52|
| normal| 97278|
|    r2l|  1126|
|    dos|391458|
+-------+------+



In [19]:
#네트워크 종류별로 건수 세기
myDf.groupBy('protocol').count().show()

+--------+------+
|protocol| count|
+--------+------+
|     tcp|190065|
|     udp| 20354|
|    icmp|283602|
+--------+------+



In [20]:
myDf.groupBy("protocol","attackB").count().show()

+--------+-------+------+
|protocol|attackB| count|
+--------+-------+------+
|     tcp| normal| 76813|
|     udp| normal| 19177|
|    icmp| normal|  1288|
|     udp| attack|  1177|
|     tcp| attack|113252|
|    icmp| attack|282314|
+--------+-------+------+



In [22]:
myDf.groupBy('attackB').pivot('protocol').count().show()

+-------+------+------+-----+
|attackB|  icmp|   tcp|  udp|
+-------+------+------+-----+
| normal|  1288| 76813|19177|
| attack|282314|113252| 1177|
+-------+------+------+-----+



In [23]:
myDf.groupBy('attack5').avg('duration').show()

+-------+--------------------+
|attack5|       avg(duration)|
+-------+--------------------+
|probing|   485.0299488677867|
|    u2r|    80.9423076923077|
| normal|  216.65732231336992|
|    r2l|   559.7522202486679|
|    dos|7.254929008986916E-4|
+-------+--------------------+



In [24]:
from pyspark.sql import functions as F
myDf.groupBy('attackB').pivot('protocol').agg(F.max('dst_bytes')).show()

+-------+----+-------+---+
|attackB|icmp|    tcp|udp|
+-------+----+-------+---+
| normal|   0|5134218|516|
| attack|   0|5155468| 74|
+-------+----+-------+---+



In [27]:
#duration>1000이고, dst_bytes==0인 경우
myDf.select("protocol", "duration", "dst_bytes").filter(_df.duration>1000).filter(_df.dst_bytes==0)\
    .groupBy("protocol").count().show()

+--------+-----+
|protocol|count|
+--------+-----+
|     tcp|  139|
+--------+-----+



#### SQL table로 조건검색

In [28]:
_df.registerTempTable("_tab")
tcp_interactions = spark.sql(
"""
    SELECT duration, dst_bytes FROM _tab
    WHERE protocol = 'tcp' AND duration > 1000 AND dst_bytes = 0
""")
tcp_interactions.show(5)

+--------+---------+
|duration|dst_bytes|
+--------+---------+
|    5057|        0|
|    5059|        0|
|    5051|        0|
|    5056|        0|
|    5051|        0|
+--------+---------+
only showing top 5 rows

