# PySpark 2 

In [1]:
import findspark # pyspark를 import할 수 있도록 해주는 모듈

In [2]:
findspark.init() # pyspark객체를 초기화

In [3]:
from pyspark.sql import SparkSession

In [4]:
from pyspark import SparkContext

In [5]:
spark = SparkSession.builder.getOrCreate()

In [6]:
sc = SparkContext.getOrCreate()

두개가 같다.

RDD 와 DataFrame의 차이는 스키마가 있고 없고, 데이터프레임은 raw 데이터를 스키마가 있게 만들어줌. 

read.csv, read.format( 포맷 명시 )

In [7]:
df = spark.read.csv("sample/ages.csv")
# csv파일을 DataFrame 타입 객체로 반환해준다.

In [8]:
type(df)

pyspark.sql.dataframe.DataFrame

In [9]:
df.dtypes # 모든 컬럼과 타입을 리스트로 출력해준다.

[('_c0', 'string'), ('_c1', 'string')]

In [10]:
df = spark.read.format('json').load("sample/people.json")
# 'json'포맷의 파일을 불러온다.

In [11]:
type(df)

pyspark.sql.dataframe.DataFrame

In [12]:
df.dtypes # 컬럼명이랑 데이터타입을 추론함.

#굉장히 명확하게 구조화된 데이터를 불러온다. jdbc 커넥터로 붙일수있다.

[('age', 'bigint'), ('name', 'string')]

RDD 였으면 그냥 들어간데로나온다.

In [13]:
rdd = sc.textFile("sample/people.json") # rdd를 가져온다

In [14]:
rdd

sample/people.json MapPartitionsRDD[14] at textFile at NativeMethodAccessorImpl.java:0

In [15]:
type(rdd) # RDD 타입 객체

pyspark.rdd.RDD

In [16]:
rdd.collect() 
#똑같은파일인데, rdd 로 가져오면 테이블형식이아님.  데이터프레임으로 되면 : 스키마구조를 갖고 , 자동으로 데이터타입을 지정함

['{"name":"Michael"}',
 '{"name":"Andy", "age":30}',
 '{"name":"Justin", "age":19}']

In [17]:
df = spark.read.json(rdd) #rdd 로 부터 dataframe 을 만든다.  

In [18]:
df

DataFrame[age: bigint, name: string]

### rdd 로 부터 데이터프레임을 만들고 sql 문쓰기

In [19]:
findspark.init()

In [20]:
sc

In [21]:
spark

In [22]:
rdd = sc.parallelize((
"""
    { "id":123,
    "name":"Katy",
    "age":19,
    "eyeColor":"brown"
    }
""",
"""
    {
    "id":124,
    "name":"Joe",
    "age":44,
    "eyeColor":"black"

    }
""",
"""
    {
    "id":125,
    "name":"Romanson",
    "age":25,
    "eyeColor":"blue"

    }

"""
)) # RDD 객체 생성

In [23]:
rdd.collect() 
#문자열 그대로 들어감 .처리하려면 regex로 다짤라줘야함 하지만 json 이라는 파일형식을 가지니까. 

['\n    { "id":123,\n    "name":"Katy",\n    "age":19,\n    "eyeColor":"brown"\n    }\n',
 '\n    {\n    "id":124,\n    "name":"Joe",\n    "age":44,\n    "eyeColor":"black"\n\n    }\n',
 '\n    {\n    "id":125,\n    "name":"Romanson",\n    "age":25,\n    "eyeColor":"blue"\n\n    }\n\n']

In [24]:
df = spark.read.json(rdd) 
# rdd 에 있는 json 을 그대로가져온다.
# 컬럼 , 데이터타입 추론해서 rdd 로 부터 dataframe 을 만든다.  

In [25]:
df # 추론된 결과가 나옴

DataFrame[age: bigint, eyeColor: string, id: bigint, name: string]

pandas 에서 쓰던 DataFrame 과는 달리, 여기의 DataFrame 은 row 단위로 관리한다.   pandas 의 DataFrame은 컬럼으로 관리함.  

In [26]:
df.collect() # pyspark의 DataFrame은 Row 단위로 관리되는 것을 확인 할 수 있다.

[Row(age=19, eyeColor='brown', id=123, name='Katy'),
 Row(age=44, eyeColor='black', id=124, name='Joe'),
 Row(age=25, eyeColor='blue', id=125, name='Romanson')]

In [27]:
#실제 테이블 형태로 가져옴.
df.createOrReplaceTempView("test")
#view테이블 만들껀데 view table 의 이름을 test 로 지정 dataframe - > temp view 만듬

df에서 collect 같은 기능이 show 다 .

In [28]:
df.show() # 스키마를 가지기때문에 스키마 형태로 데이터를 보여준다. 

+---+--------+---+--------+
|age|eyeColor| id|    name|
+---+--------+---+--------+
| 19|   brown|123|    Katy|
| 44|   black|124|     Joe|
| 25|    blue|125|Romanson|
+---+--------+---+--------+



In [29]:
df.printSchema() # Tree 구조로 Schema를 출력해준다.

root
 |-- age: long (nullable = true)
 |-- eyeColor: string (nullable = true)
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)



In [30]:
spark.sql("select * from test") #sql query 날린다.

DataFrame[age: bigint, eyeColor: string, id: bigint, name: string]

In [31]:
spark.sql("select * from test").collect()

[Row(age=19, eyeColor='brown', id=123, name='Katy'),
 Row(age=44, eyeColor='black', id=124, name='Joe'),
 Row(age=25, eyeColor='blue', id=125, name='Romanson')]

RDD 에서 있던 collect는 Schema없이 출력한 반면에,  
여기서는 collect가  row 형태로, 스키마가 있는 형태로 리턴.

새로 RDD 만든다

In [32]:
rdd = sc.parallelize(
    [
        (123,"katie",19,"brown"),
        (124,"joe",45,"black"),
        (125,"romanson",25,"blue")
    ]
)



In [33]:
rdd.collect()

[(123, 'katie', 19, 'brown'),
 (124, 'joe', 45, 'black'),
 (125, 'romanson', 25, 'blue')]

원래 그냥 읽으면 컬럼명 같은것을 다 정해줌 , 하지만 여기서는 우리가 정의할 것.

In [34]:
from pyspark.sql.types import *  # 데이터 타입 다 불러옴

In [35]:
#name, dataType,nullable,metadata 우리가 정해줄거임.

scheme = StructType(
    [
        StructField("id",LongType(),nullable=True),
        StructField("name",StringType(),nullable=True),
         StructField("age",LongType(),nullable=True),
         StructField("eyeColor",StringType(),nullable=True)
    
    ]
)

### Schema 와 RDD 를 합쳐서 DataFrame 만들기

In [36]:
df.printSchema()

root
 |-- age: long (nullable = true)
 |-- eyeColor: string (nullable = true)
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)



In [37]:
df.createOrReplaceTempView("test2") # 해당하는 DataFrame을 임시 View로 만들어줌.
# "test2"라는 이름의 테이블 생성

In [38]:
# 이제부터는 sql 작업가능해짐
spark.sql("select * from test2").collect() # 해당 sql 쿼리에 대한 답을 출력

[Row(age=19, eyeColor='brown', id=123, name='Katy'),
 Row(age=44, eyeColor='black', id=124, name='Joe'),
 Row(age=25, eyeColor='blue', id=125, name='Romanson')]

In [39]:
spark.sql("select count(*)from test").show() 
# sql 쿼리에 대한 결과값을 스키마 형태로 출력해준다. 

+--------+
|count(1)|
+--------+
|       3|
+--------+



In [40]:
spark.sql("select id,age from test").show()

+---+---+
| id|age|
+---+---+
|123| 19|
|124| 44|
|125| 25|
+---+---+



In [41]:
df.select("id",'age').filter("age=25").show()
# age=25로 필터링된 id, age를 출력 
# 여기서는 column을 string으로 넣어줌.

+---+---+
| id|age|
+---+---+
|125| 25|
+---+---+



In [42]:
df.select(df.id, df.age).filter(df.age == 22).show()
# age = 22로 필터링해준 id, age를 출력해줌. 
# 여기서는 column명을 df의 attribute로 넣음.

+---+---+
| id|age|
+---+---+
+---+---+



###  항공 데이터 다루기

In [43]:
flightPath = "sample/departuredelays.csv"
airportPath = "sample/airport-codes-na.txt"

In [44]:
flight = spark.read.csv(flightPath, header = True)
# 해당 csv파일을 읽어와서 DataFrame객체로 반환해준다.
# header = True 플래그를 통해서 첫줄을 Column명으로 인식해준다.

In [45]:
flight.take(1) # flight DataFrame의 처음Row부터 1개의 Row객체를 갖고 온다.

[Row(date='01011245', delay='6', distance='602', origin='ABE', destination='ATL')]

In [46]:
airport = spark.read.csv(airportPath, header=True, inferSchema=True, sep="\t")
# header = True를 통해서 첫째 줄을 Column명으로 인식
# inferSchema = True를 통해서 input Schema를 자동으로 추론해서 인식해준다.
# sep = "\t" 을 통해서 Tab 공백으로 데이터 간을 나눠준다.

In [47]:
airport.take(1) # 해당 데이터의 Row를 한개 가지고 온다.

[Row(City='Abbotsford', State='BC', Country='Canada', IATA='YXX')]

In [48]:
airport.printSchema() # 해당 스키마를 트리 구조로서 출력해준다.

root
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- IATA: string (nullable = true)



In [49]:
flight.createOrReplaceTempView("flight")
airport.createOrReplaceTempView("airport") # sql문 쓰기 위해 임시로 뷰를 만들어줌.
# "flight", "airport"라는 이름의 테이블 생성됨.

In [50]:
spark.sql(
"""
     select a.city, f.origin,sum(f.delay) as delay from flight f join airport a on a.IATA = f.origin
        where a.State = "WA"
        group by a.City,f.origin
        order by delay desc
""").show()

#sql문 query를 날릴 수 있다.

+-------+------+--------+
|   city|origin|   delay|
+-------+------+--------+
|Seattle|   SEA|159086.0|
|Spokane|   GEG| 12404.0|
|  Pasco|   PSC|   949.0|
+-------+------+--------+



In [51]:
from pyspark.sql import functions as F # SQL문의 built-in 함수.

In [52]:
#함수형 프로그래밍 형태로 sql문 query날릴 수 있다.

airport.join(flight, airport.IATA == flight.origin)\
.where(airport.State == "WA")\
.select(airport.City, flight.origin, flight.delay)\
.groupby(airport.City, flight.origin)\
.agg(F.sum(flight.delay))\
.orderBy("sum(delay)", ascending=False)\
.show()
 # 오름차순 False -> 내림차순 = 큰값부터 나옴

+-------+------+----------+
|   City|origin|sum(delay)|
+-------+------+----------+
|Seattle|   SEA|  159086.0|
|Spokane|   GEG|   12404.0|
|  Pasco|   PSC|     949.0|
+-------+------+----------+



## 스트리밍(Streaming)

__스트리밍에 대한 이해를 위한 사전 개념__

__Unbounded data__ 는 데이터의 수가 정해져있지 않고 계속해서 추가되는, 즉 끊임 없이 흘러 들어오는 데이터라고 볼 수 있다. 예를 들어서 모바일 디바이스에서 계속 올라오는 로그, 페이스북이나 트위터의 타임 피드, 증권 거래 주문 같이 계속 해서 들어와서 쌓이는 데이터를 Unbounded data 라고 한다.   
   
   
__Bounded data__는 데이터가 딱 저장되고 더 이상 증거나 변경이 없는 형태로 계속 유지되는 데이터를 뜻한다. 1월의 정산 데이터. 

__Event time__ : 데이터의 발생 시간  
__Processing time__ : 데이터가 시스템에서 처리되는 시간  
    
이상적으로는 EventTime과 ProcessingTime이 동일하면 좋겠지만,
실제로는 네트워크 상황, 서버CPU, I/O 처리 시간에 따라 ProcessingTime이 더 늦다.

ProcessingTime - EventTime의 지연되는 시간을 Skew라고 한다.

__Bounded data 처리__ : 그냥 데이터 읽어서 한번에 처리 후 저장

__UnBounded data 처리__
- Batch방식
 - Fixed Windows : 스트리밍으로 들어오는 데이터를 일정 시간 단위로 모은 뒤에 배치로 처리하는 방식이다.
 - Sliding Windows : 윈도우(일정 간격 시간 범위)가 움직이면서 데이터를 모으는 개념으로, 윈도우 내에 일정 부분 시간은 서로 겹치게 된다. 
- Streaming방식 : 기본적으로 Skew(지연시간)이 환경에 따라 들쭉날쭉해서 비교적 복잡함.
 - Time agnostic : 시간 속성을 지니지 않는 데이터로, 들어오는 대로 처리한다.
 - Filtering : 특정 데이터만 필터링해서 저장하는 구조.
 - Inner joins : 2개의 unbounded data에서 서로 비교해서 매칭 뒤에 값을 구하는 방식.
 - Approximation algorithms : 근사치 추정 방식으로 실시간 분석에서는 전체 데이터를 모두 분석할 시간이 없고, 시급한 분석이 필요하기에 일부만 분석하거나 대략적인 데이터의 근사값을 구하는 방법

__Windowing__ : 스트리밍 데이터를 처리할 때 일정 시간 간격으로 처리하는 것을 의미  
  
Fixed Window, Sliding Window방식이 있다.


__Session__ : Session Window에는 사용자가 일정 기간동안 반응이 없는 경우에 세션 시작에서 반응이 없어지는 시간 까지를 한 세션으로 묶어서 처리

__시간대별 Window 처리 방식__

Processing Time based Windowing  
데이터가 도착한 순서대로 처리해서 저장한다.  
  
Event Time based Windowing  
데이터가 순차적으로 들어온다는 것을 보장할 수 없다.
그래서 2가지의 주요 고려 사항이 필요하다.
 - Buffering
   : 늦게 도착한 데이터를 처리해야 하기 때문에. 윈도우를 일정시간동안 유지해야 한다. 이를 위해서 메모리나 별도의 디스크 공간을 사용한다. 

 - Completeness
   : Buffering을 적용했으면 다른 문제가 얼마 동안 버퍼를 유지해야 하는가? 즉, 해당 시간에 발생한 모든 데이터는 언제 모두 도착이 완료(Completeness) 되는가? 를 결정하는 것이다. 정확한 완료 시점을 갖는 것은 사실 현실적으로 힘들다. 버퍼를 아주 크게 잡으면 거의 모든 데이타를 잡아낼 수 있겠지만, 버퍼를 아주 크게 잡는 것이 어렵기 때문에, 데이터가 언제 도착할 것이라는 것을 어림 잡아 짐작할 수 있는 방법들이 많다.

출처: http://bcho.tistory.com/tag/Sliding%20window [조대협의 블로그]


In [53]:
from pyspark.streaming import StreamingContext
# Spark Streaming 기능에 대한 메인 entry-point(진입점)
# Streamingontext는 Spark Cluster에 대한 연결을 나타낸다.
# DStream을 만드는데 쓰일 수 있다.
# Discretized Stream(DStream)은 Spark Streaming에 대한 것이며, 
# (같은 타입의)RDD들의 연속적인 sequence이다.

In [54]:
streaming = StreamingContext(sc, 5)
#인자로 sparkContext, batchDuration(배치가 되는 시간 간격)을 받는다.

In [55]:
streaming

<pyspark.streaming.context.StreamingContext at 0x1ce1b3be978>

In [56]:
lines = streaming.socketTextStream("localhost", 9999)
#스트리밍으로 받아올 포트를 연결.
#rdd이다 보니까 flatMap같은것을 쓸 수 있다.

# TCP socket을 이용해서 데이터를 받아온다.

In [57]:
words = lines.flatMap(lambda line:line.split())
# DStream객체의 모든 element들에 대해서 해당 함수 인자로 넣어 실행하고,
# 1열로 쭉 나열한 상태로 반환해준다.

In [58]:
type(words)

pyspark.streaming.dstream.TransformedDStream

In [59]:
pairs = words.map(lambda w:(w,1))
# DStream객체의 모든 element들에 대해서 해당 함수 인자로 넣어 실행해서 반환

In [60]:
type(pairs)

pyspark.streaming.dstream.TransformedDStream

In [61]:
counts = pairs.reduceByKey(lambda x,y:x+y)
# 키값에 대해서 reduce를 각각의 RDD에 적용하고 DStream 객체로 반환

In [62]:
counts.pprint()

현재 스트리밍 받는다고 가정하에 디자인하고 있는 것.

In [None]:
# netcat은 서버에다가 request를 보내고, response받는 용도이다. 인터넷 상에 cat을 쓸 수 있다.

# 먼저 streaming.start()가 된 이후에
# cmd에서 netcat을 실행해서 nc -l -p 9999 를 한뒤 (nc -lvp 9999로 해도 됨.)
# 9999port로 apple과 hi를 input을 주었다.

streaming.start() # 스트리밍 시작
streaming.awaitTermination() # 스트리밍 꺼준다.

-------------------------------------------
Time: 2018-10-30 18:10:00
-------------------------------------------

-------------------------------------------
Time: 2018-10-30 18:10:05
-------------------------------------------

-------------------------------------------
Time: 2018-10-30 18:10:10
-------------------------------------------

-------------------------------------------
Time: 2018-10-30 18:10:15
-------------------------------------------
('apple', 1)

-------------------------------------------
Time: 2018-10-30 18:10:20
-------------------------------------------

-------------------------------------------
Time: 2018-10-30 18:10:25
-------------------------------------------
('hi', 1)

-------------------------------------------
Time: 2018-10-30 18:10:30
-------------------------------------------

-------------------------------------------
Time: 2018-10-30 18:10:35
-------------------------------------------

-------------------------------------------
Time: 2018-10

In [63]:
lines = spark.readStream.format("socket")\
.option("host", "localhost")\
.option("port", 9999)\
.load()
# localhost:9999 에서 socket을 통해서 데이터를 받아와서 로딩한다.
# DataFrame으로 반환받아서 구조적으로 이용할 수도 있다.

In [64]:
from pyspark.sql.functions import explode, split
# explode : 주어진 array나 map 으로 새로운 row 리턴해줌.
# split : reg exp에 따라서 나눠줌.

In [65]:
type(lines)

pyspark.sql.dataframe.DataFrame

In [66]:
words = lines.select(explode(split(lines.value, " ")).alias("word"))
# lines.value에 대해서 공백으로 나눠주고, row를 리턴하며 이름은 "word"이다.

In [67]:
wordCount = words.groupby("word").count() # Stream으로 받은 단어의 개수

In [None]:
stream = wordCount.writeStream\
.outputMode("complete")\ 
.format("console")\
.start()
# console포맷으로 complete(싱크에 맞게 모든 row들 출력)모드로 스트림을 실행

stream.awaitTermination() # 스트림 중지

---

In [82]:
from pyspark.sql import SparkSession

In [116]:
spark = SparkSession.builder.getOrCreate() 
# 있던 SparkSession을 불러오거나 새로이 생성

In [117]:
df = spark.createDataFrame(
    [
        (1, 144.5, 5.9, 33, "M"),
        (2, 167.2, 5.4, 45, "M"),
        (3, 124.1, 5.2, 23, "F"),
        (1, 144.5, 5.9, 33, "M"),
        (5, 133.2, 5.7, 54, "F"),
        (6, 124.1, 5.2, 23, "F"),
        (7, 129.2, 5.3, 42, "M"),    
    ],
    ["id", "weight", "height", "age", "gender"]
) # 해당 데이터를 DataFrame 객체로 만들어줌.

In [118]:
type(df)

pyspark.sql.dataframe.DataFrame

In [119]:
df.count(), df.distinct().count()
# (row의 수 카운트, 중복 제거한 row의 수 카운트)

(7, 6)

In [120]:
df = df.dropDuplicates() # 중복된 row를 drop해준다.
df.count() # 중복된 row 1개가 제거되고 6개가 된다.

6

In [121]:
df.orderBy("id").show() # orderBy()를 통해 특정 column기준으로 sorting해줌.

+---+------+------+---+------+
| id|weight|height|age|gender|
+---+------+------+---+------+
|  1| 144.5|   5.9| 33|     M|
|  2| 167.2|   5.4| 45|     M|
|  3| 124.1|   5.2| 23|     F|
|  5| 133.2|   5.7| 54|     F|
|  6| 124.1|   5.2| 23|     F|
|  7| 129.2|   5.3| 42|     M|
+---+------+------+---+------+



In [122]:
df.show()

+---+------+------+---+------+
| id|weight|height|age|gender|
+---+------+------+---+------+
|  5| 133.2|   5.7| 54|     F|
|  1| 144.5|   5.9| 33|     M|
|  6| 124.1|   5.2| 23|     F|
|  2| 167.2|   5.4| 45|     M|
|  7| 129.2|   5.3| 42|     M|
|  3| 124.1|   5.2| 23|     F|
+---+------+------+---+------+



In [123]:
df.dropDuplicates([c for c in df.columns if c != "id"]).show()
# "id" Column을 제외한 컬럼들에서 중복되는 row를 drop해준다.
# id= 6, 3이 id를 제외하고는 동일한 값을 지니고 있어 drop됨을 알수 있다.


+---+------+------+---+------+
| id|weight|height|age|gender|
+---+------+------+---+------+
|  5| 133.2|   5.7| 54|     F|
|  1| 144.5|   5.9| 33|     M|
|  2| 167.2|   5.4| 45|     M|
|  6| 124.1|   5.2| 23|     F|
|  7| 129.2|   5.3| 42|     M|
+---+------+------+---+------+



In [124]:
from pyspark.sql import functions as f

In [125]:
df.agg(
    f.count("id").alias("Count"),
    f.countDistinct("id").alias("Dcount")). show()
# df.agg()는 데이터 전체에 대하여 aggregate 적용할 수 있도록 해준다.

+-----+------+
|Count|Dcount|
+-----+------+
|    6|     6|
+-----+------+



In [126]:
df.withColumn("id", f.monotonically_increasing_id()).show()
# withColumn은 Column을 추가해주거나 같은 이름의 column을 대체해준다.
# f.monotonically_increasing_id()는 증가하는 64bit 정수로 이루어진 column 생성해줌
# 그래서 자동으로 id가 겹치지 않도록 해줄 수 있다.

+-------------+------+------+---+------+
|           id|weight|height|age|gender|
+-------------+------+------+---+------+
| 171798691840| 133.2|   5.7| 54|     F|
| 481036337152| 144.5|   5.9| 33|     M|
| 575525617664| 124.1|   5.2| 23|     F|
| 721554505728| 167.2|   5.4| 45|     M|
|1099511627776| 129.2|   5.3| 42|     M|
|1623497637888| 124.1|   5.2| 23|     F|
+-------------+------+------+---+------+



In [127]:
df.withColumn("old", df.id).show()

+---+------+------+---+------+---+
| id|weight|height|age|gender|old|
+---+------+------+---+------+---+
|  5| 133.2|   5.7| 54|     F|  5|
|  1| 144.5|   5.9| 33|     M|  1|
|  6| 124.1|   5.2| 23|     F|  6|
|  2| 167.2|   5.4| 45|     M|  2|
|  7| 129.2|   5.3| 42|     M|  7|
|  3| 124.1|   5.2| 23|     F|  3|
+---+------+------+---+------+---+



In [128]:
df = spark.createDataFrame(
[
    (1, 143.5, 5.6, 28, "M", 100000),
    (2, 167.2, 5.4, 45, "M", None),
    (3, None, 5.2, None, None, None),
    (4, 144.5, 5.9, 33, "M", None),
    (5, 133.2, 5.7, 54, "F", None),
    (6, 124.1, 5.2, None, "F", None),
    (7, 129.2, 5.3, 42, "M", 76000),
],
['id', 'weight', 'height', 'age', 'gender', 'income']
) # DataFrame 객체 생성

In [129]:
type(df)

pyspark.sql.dataframe.DataFrame

In [130]:
type(df.rdd) #dataframe 내에 있는데도 타입이 RDD이다.

pyspark.rdd.RDD

In [131]:
df.rdd.map(
    lambda row:(row["id"], sum([c == None for c in row]))
).collect() # column별로 row별로 빈 데이터를 찾아준다.
#[(해당 row의 id, row 내에 None의 개수)]

[(1, 0), (2, 1), (3, 4), (4, 1), (5, 1), (6, 2), (7, 0)]

In [132]:
df.where("id=3").show() # id가 3인 row를 보여준다.

+---+------+------+----+------+------+
| id|weight|height| age|gender|income|
+---+------+------+----+------+------+
|  3|  null|   5.2|null|  null|  null|
+---+------+------+----+------+------+



In [133]:
from pyspark.sql import functions as f

In [134]:
df.agg(f.count("weight")).show() # weight를 가지는 row의 개수

+-------------+
|count(weight)|
+-------------+
|            6|
+-------------+



In [135]:
df.agg(*[1-f.count(c)/f.count("*").alias(c+"Rate") for c in df.columns]).show()
# 1 - ( 해당 컬럼 지니는 row 수 /  전체 row 수)
# 즉, 모든 데이터에 대해서 각 column에 None을 지니는 비율을 보여준다.

+----------------------------------------+------------------------------------------------+------------------------------------------------+------------------------------------------+------------------------------------------------+------------------------------------------------+
|(1 - (count(id) / count(1) AS `idRate`))|(1 - (count(weight) / count(1) AS `weightRate`))|(1 - (count(height) / count(1) AS `heightRate`))|(1 - (count(age) / count(1) AS `ageRate`))|(1 - (count(gender) / count(1) AS `genderRate`))|(1 - (count(income) / count(1) AS `incomeRate`))|
+----------------------------------------+------------------------------------------------+------------------------------------------------+------------------------------------------+------------------------------------------------+------------------------------------------------+
|                                     0.0|                              0.1428571428571429|                                             0.0|              

In [136]:
df.select([c for c in df.columns if c != "income"]).show()
# "income" Column을 제외한 df를 보여준다.

+---+------+------+----+------+
| id|weight|height| age|gender|
+---+------+------+----+------+
|  1| 143.5|   5.6|  28|     M|
|  2| 167.2|   5.4|  45|     M|
|  3|  null|   5.2|null|  null|
|  4| 144.5|   5.9|  33|     M|
|  5| 133.2|   5.7|  54|     F|
|  6| 124.1|   5.2|null|     F|
|  7| 129.2|   5.3|  42|     M|
+---+------+------+----+------+



In [138]:
df.show()

+---+------+------+----+------+------+
| id|weight|height| age|gender|income|
+---+------+------+----+------+------+
|  1| 143.5|   5.6|  28|     M|100000|
|  2| 167.2|   5.4|  45|     M|  null|
|  3|  null|   5.2|null|  null|  null|
|  4| 144.5|   5.9|  33|     M|  null|
|  5| 133.2|   5.7|  54|     F|  null|
|  6| 124.1|   5.2|null|     F|  null|
|  7| 129.2|   5.3|  42|     M| 76000|
+---+------+------+----+------+------+



In [142]:
df.dropna(thresh=5).show() # non-null값이 5개 이하인 것을 날려줌
# 즉, 6개의 column 중에 null 값이 2개 이상이면 날려준다.

+---+------+------+---+------+------+
| id|weight|height|age|gender|income|
+---+------+------+---+------+------+
|  1| 143.5|   5.6| 28|     M|100000|
|  2| 167.2|   5.4| 45|     M|  null|
|  4| 144.5|   5.9| 33|     M|  null|
|  5| 133.2|   5.7| 54|     F|  null|
|  7| 129.2|   5.3| 42|     M| 76000|
+---+------+------+---+------+------+



In [166]:
means = df.agg(
*[f.mean(c).alias(c) for c in df.columns if c != "gender"]
) 
#gender를 제외한 컬럽에서 mean(평균)을 구해준다.

In [167]:
means.show()

+---+------------------+-----------------+----+-------+
| id|            weight|           height| age| income|
+---+------------------+-----------------+----+-------+
|4.0|140.28333333333333|5.471428571428572|40.4|88000.0|
+---+------------------+-----------------+----+-------+



rdd이던 dataframe이든 immutable하기 때문에 바로 수정하지 못한다.

In [168]:
mpd = means.toPandas() #데이터를 수정하기 위해서 판다스 형식으로 바꿔줌.

In [169]:
type(mpd)

pandas.core.frame.DataFrame

In [170]:
mpd # DataFrame타입이다.

Unnamed: 0,id,weight,height,age,income
0,4.0,140.283333,5.471429,40.4,88000.0


In [171]:
mpd = mpd.to_dict("records")[0] 
# dict타입으로 해당 DataFrame객체를 바꿔준다.

In [172]:
mpd

{'age': 40.399999999999999,
 'height': 5.4714285714285724,
 'id': 4.0,
 'income': 88000.0,
 'weight': 140.28333333333333}

In [173]:
mpd["gender"] = "X" # 새로운 Column을 생성

In [174]:
mpd

{'age': 40.399999999999999,
 'gender': 'X',
 'height': 5.4714285714285724,
 'id': 4.0,
 'income': 88000.0,
 'weight': 140.28333333333333}

In [175]:
df = df.fillna(mpd)

In [176]:
df.show()

+---+------------------+------+---+------+------+
| id|            weight|height|age|gender|income|
+---+------------------+------+---+------+------+
|  1|             143.5|   5.6| 28|     M|100000|
|  2|             167.2|   5.4| 45|     M| 88000|
|  3|140.28333333333333|   5.2| 40|     X| 88000|
|  4|             144.5|   5.9| 33|     M| 88000|
|  5|             133.2|   5.7| 54|     F| 88000|
|  6|             124.1|   5.2| 40|     F| 88000|
|  7|             129.2|   5.3| 42|     M| 76000|
+---+------------------+------+---+------+------+



NULL으로 있던 id=3이 평균값으로 대체되었다.

Null값이 많은 컬럼은 날려버린다.

위의 방식으로 데이터를 정제할 수 있다.

## Outlier들을 날리기

* Outlier는 일반적인 범위를 벗어나 있는 것

In [231]:
df = spark.createDataFrame(
    [
        (1, 143.5, 5.3, 28, "M"),
        (2, 154.2, 5.5, 45, "M"),
        (3, 324.3, 5.1, 99, "F"),
        (4, 144.5, 5.5, 33, "M"),
        (5, 133.2, 5.4, 54, "F"),
        (6, 124.1, 5.1, 21, "F"),
        (7, 129.2, 5.3, 42, "M"),    
    ],
    ['id', 'weight','height','age']
) # DataFrame 객체 생성

In [232]:
quantiles = df.approxQuantile("weight", [0.25, 0.75], 0.05)
# df.approxQuantile([Column들], [확률값], 상대오차)
# DataFrame객체의  숫자로된 Column들의  대략적인 Quantile를 계산

# 0%는 제일 적은 129.2, 100%는 제일 큰 324.3
#"Weight"컬럼 자료군들 중에 [25%, 75%]에 해당하는 자료를 5%의 상대오차를 허용하여 구한다.

In [233]:
quantiles # lower bound, upper bound로 맞춘다.
# 즉 데이터군의 25% 이하, 75 % 이상의 데이터를 outlier로 간주할 것임.

[129.2, 154.2]

In [234]:
IQR = quantiles[1] - quantiles[0] #  이것은 중간값이 될것이다.
# 그리고 이제 bound를 만들어준다.

In [235]:
IQR

25.0

In [236]:
bounds = [quantiles[0] - 1.5*IQR, quantiles[1] + 1.5 * IQR]

In [237]:
bounds

[91.69999999999999, 191.7]

In [238]:
cols = ["weight", "height", "age"]
bounds = {}

for c in cols:
    quantiles = df.approxQuantile(c, [0.25, 0.75], 0.05)
    IQR = quantiles[1] - quantiles[0]
    bounds[c] = [quantiles[0]-1.5*IQR, quantiles[1]+1.5*IQR]

#위에서 구한 것처럼 모든 컬럼에 대해서 lowerbound 25%, upperbound 75%로 놓는다.
# 그리고 bound르 생성해준다.

In [239]:
bounds

{'age': [-11.0, 93.0],
 'height': [4.499999999999999, 6.1000000000000005],
 'weight': [91.69999999999999, 191.7]}

In [240]:
# lowerbound보다 낮은 weight를 지녔는가?
df.select(df["weight"] < bounds["weight"][0]).show() 
# upperbound보다 높은 weight를 지녔는가?
df.select(df["weight"] > bounds["weight"][1]).show()

+----------------------------+
|(weight < 91.69999999999999)|
+----------------------------+
|                       false|
|                       false|
|                       false|
|                       false|
|                       false|
|                       false|
|                       false|
+----------------------------+

+----------------+
|(weight > 191.7)|
+----------------+
|           false|
|           false|
|            true|
|           false|
|           false|
|           false|
|           false|
+----------------+



In [241]:
outlier =  df.select(
    *["id"] + [((df[c] < bounds[c][0]) |
    (df[c] > bounds[c][1])).alias(c+"_O") for c in col]
) #  "id" 컬럼 데이터 + bound를 넘어가는 outlier에 대한 Boolean값

In [242]:
outlier.show()  # true 인 값들이 bound를 넘어가는 outlier들이다.

+---+--------+--------+-----+
| id|weight_O|height_O|age_O|
+---+--------+--------+-----+
|  1|   false|   false|false|
|  2|   false|   false|false|
|  3|    true|   false| true|
|  4|   false|   false|false|
|  5|   false|   false|false|
|  6|   false|   false|false|
|  7|   false|   false|false|
+---+--------+--------+-----+



In [243]:
joinTable = df.join(outlier, on="id")
# "id" Column을 기준으로 outlier와 DataFrame객체 데이터를 Join해준다.

In [244]:
joinTable.show()

+---+------+------+---+---+--------+--------+-----+
| id|weight|height|age| _5|weight_O|height_O|age_O|
+---+------+------+---+---+--------+--------+-----+
|  7| 129.2|   5.3| 42|  M|   false|   false|false|
|  6| 124.1|   5.1| 21|  F|   false|   false|false|
|  5| 133.2|   5.4| 54|  F|   false|   false|false|
|  1| 143.5|   5.3| 28|  M|   false|   false|false|
|  3| 324.3|   5.1| 99|  F|    true|   false| true|
|  2| 154.2|   5.5| 45|  M|   false|   false|false|
|  4| 144.5|   5.5| 33|  M|   false|   false|false|
+---+------+------+---+---+--------+--------+-----+



In [257]:
joinTable.filter("!weight_O").select("id", "weight", "height", "age").show()
# !weight_0를 한 이유는 outlier들을 판별하기 위해..
# !를 통해서 해당 weight_0에서 outlier True인 값을 빼주는 것이다.

+---+------+------+---+
| id|weight|height|age|
+---+------+------+---+
|  7| 129.2|   5.3| 42|
|  6| 124.1|   5.1| 21|
|  5| 133.2|   5.4| 54|
|  1| 143.5|   5.3| 28|
|  2| 154.2|   5.5| 45|
|  4| 144.5|   5.5| 33|
+---+------+------+---+

