In [1]:
import findspark

In [2]:
findspark.init()

In [1]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

In [4]:
from pyspark import SparkContext
sc = SparkContext.getOrCreate()

In [5]:
spark

In [6]:
sc

In [7]:
df = spark.read.csv("sample/sample/ages.csv")

In [8]:
df.dtypes   #자동으로 스키마를 적용해 dataframe을 가져옴

[('_c0', 'string'), ('_c1', 'string')]

In [9]:
df = spark.read.format("json").load("sample/sample/people.json")

In [10]:
df.dtypes

[('age', 'bigint'), ('name', 'string')]

In [11]:
rdd = sc.textFile("sample/sample/people.json")

In [12]:
rdd.collect()    #데이터를 스키마 없이 가져옴

['{"name":"Michael"}',
 '{"name":"Andy", "age":30}',
 '{"name":"Justin", "age":19}']

In [13]:
df = spark.read.json(rdd)    #rdd로부터 dataframe 생성

In [14]:
rdd = sc.parallelize(("""
    {
        "id": "123",
        "name": "Katie",
        "age": 19,
        "eyeColor": "brown"
    }
""",
"""
    {
        "id": "234",
        "name": "Michael",
        "age": 22,
        "eyeColor": "green"
    }
""",
"""
    {
        "id": "345",
        "name": "Simone",
        "age": 23,
        "eyeColor": "blue"
    }
"""              
))

In [15]:
rdd.collect()

['\n    {\n        "id": "123",\n        "name": "Katie",\n        "age": 19,\n        "eyeColor": "brown"\n    }\n',
 '\n    {\n        "id": "234",\n        "name": "Michael",\n        "age": 22,\n        "eyeColor": "green"\n    }\n',
 '\n    {\n        "id": "345",\n        "name": "Simone",\n        "age": 23,\n        "eyeColor": "blue"\n    }\n']

In [16]:
df = spark.read.json(rdd)

In [17]:
df

DataFrame[age: bigint, eyeColor: string, id: string, name: string]

In [18]:
df.createOrReplaceTempView("test")   #스키마 구조를 갖고 있음

In [19]:
df.show()

+---+--------+---+-------+
|age|eyeColor| id|   name|
+---+--------+---+-------+
| 19|   brown|123|  Katie|
| 22|   green|234|Michael|
| 23|    blue|345| Simone|
+---+--------+---+-------+



In [31]:
spark.sql("select * from test").show()

+---+--------+---+-------+
|age|eyeColor| id|   name|
+---+--------+---+-------+
| 19|   brown|123|  Katie|
| 22|   green|234|Michael|
| 23|    blue|345| Simone|
+---+--------+---+-------+



In [34]:
df.count()    #row의 갯수 반환

3

In [35]:
spark.sql("select count(*) from test").show()

+--------+
|count(1)|
+--------+
|       3|
+--------+



In [39]:
df.select('id','age').filter("age=22").show()

+---+---+
| id|age|
+---+---+
|234| 22|
+---+---+



In [38]:
spark.sql("select id, age from test where age=22").show()

+---+---+
| id|age|
+---+---+
|234| 22|
+---+---+



In [40]:
df.select(df.id, df.age).filter(df.age == 22).show()    #파이썬 문법

+---+---+
| id|age|
+---+---+
|234| 22|
+---+---+



In [21]:
df.printSchema()

root
 |-- age: long (nullable = true)
 |-- eyeColor: string (nullable = true)
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)



In [22]:
rdd = sc.parallelize(
    [
        (123, "Katie", 19, "brown"),
        (234, "Michael", 22, "green"),
        (345, "Simone", 23, "blue")
    ]
)

In [23]:
from pyspark.sql.types import *

In [24]:
scheme = StructType(
    [
        StructField("id", LongType(), True),
        StructField("name", StringType(), nullable=True),
        StructField("age", LongType(), nullable=True),
        StructField("eyeColor", StringType(), nullable=True)       
    ]
)

In [25]:
df = spark.createDataFrame(rdd, scheme)

In [26]:
df.printSchema()

root
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- age: long (nullable = true)
 |-- eyeColor: string (nullable = true)



In [27]:
df.createOrReplaceTempView("test2")

In [28]:
spark.sql("select * from test2").collect()

[Row(id=123, name='Katie', age=19, eyeColor='brown'),
 Row(id=234, name='Michael', age=22, eyeColor='green'),
 Row(id=345, name='Simone', age=23, eyeColor='blue')]

In [29]:
rdd.collect()

[(123, 'Katie', 19, 'brown'),
 (234, 'Michael', 22, 'green'),
 (345, 'Simone', 23, 'blue')]

In [30]:
df.collect()

[Row(id=123, name='Katie', age=19, eyeColor='brown'),
 Row(id=234, name='Michael', age=22, eyeColor='green'),
 Row(id=345, name='Simone', age=23, eyeColor='blue')]

In [41]:
flightPath = "sample/departuredelays.csv"
airportPath = "sample/airport-codes-na.txt"

In [42]:
flight = spark.read.csv(flightPath, header=True)

In [43]:
flight.take(1)

[Row(date='01011245', delay='6', distance='602', origin='ABE', destination='ATL')]

In [49]:
flight.printSchema()

root
 |-- date: string (nullable = true)
 |-- delay: string (nullable = true)
 |-- distance: string (nullable = true)
 |-- origin: string (nullable = true)
 |-- destination: string (nullable = true)



In [45]:
airport = spark.read.csv(airportPath, header = True, inferSchema=True, sep="\t")

In [46]:
airport.take(1)

[Row(City='Abbotsford', State='BC', Country='Canada', IATA='YXX')]

In [47]:
airport.printSchema()

root
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- IATA: string (nullable = true)



In [50]:
flight.createOrReplaceTempView("flight")
airport.createOrReplaceTempView("airport")

In [52]:
spark.sql("""
    select a.City, f.origin, sum(f.delay) as delay 
        from flight f 
            join  airport a 
                on a.IATA = f.origin
    where a.State = "WA"
    group by a.City, f.origin
    order by delay desc
""").show()

+-------+------+--------+
|   City|origin|   delay|
+-------+------+--------+
|Seattle|   SEA|159086.0|
|Spokane|   GEG| 12404.0|
|  Pasco|   PSC|   949.0|
+-------+------+--------+



In [58]:
airport.join(flight, airport.IATA==flight.origin).where(airport.State =="WA").select(airport.City, flight.origin, flight.delay)\
        .groupBy(airport.City, flight.origin).agg(F.sum(flight.delay))\
        .orderBy("sum(delay)", ascending=False).show()

+-------+------+----------+
|   City|origin|sum(delay)|
+-------+------+----------+
|Seattle|   SEA|  159086.0|
|Spokane|   GEG|   12404.0|
|  Pasco|   PSC|     949.0|
+-------+------+----------+



In [53]:
from pyspark.sql import functions as F

streaming을 통해 실시간으로 데이터를 받아와 처리할 수 있음

In [59]:
from pyspark.streaming import StreamingContext

In [60]:
streaming = StreamingContext(sc, 5) #5초마다 streaming 

In [61]:
streaming

<pyspark.streaming.context.StreamingContext at 0x14487891860>

In [62]:
#어디에서 streaming을 받아올 것인지 socket 설정
lines = streaming.socketTextStream("localhost", 9999)

In [65]:
type(lines)   #rdd 형태

pyspark.streaming.dstream.DStream

In [66]:
words = lines.flatMap(lambda line:line.split())

In [67]:
pairs = words.map(lambda w:(w,1))

In [69]:
counts = pairs.reduceByKey(lambda x,y: x+y)

In [70]:
counts.pprint()

In [71]:
streaming.start()
streaming.awaitTerminationOrTimeout()

TypeError: awaitTerminationOrTimeout() missing 1 required positional argument: 'timeout'

-------------------------------------------
Time: 2018-08-06 14:08:25
-------------------------------------------

-------------------------------------------
Time: 2018-08-06 14:08:30
-------------------------------------------

-------------------------------------------
Time: 2018-08-06 14:08:35
-------------------------------------------

-------------------------------------------
Time: 2018-08-06 14:08:40
-------------------------------------------

-------------------------------------------
Time: 2018-08-06 14:08:45
-------------------------------------------
('abc', 1)

-------------------------------------------
Time: 2018-08-06 14:08:50
-------------------------------------------
('ababab', 1)

-------------------------------------------
Time: 2018-08-06 14:08:55
-------------------------------------------

-------------------------------------------
Time: 2018-08-06 14:09:00
-------------------------------------------
('exrer', 1)

------------------------------------------

In [None]:
streaming.start()
streaming.awaitTermination()

In [4]:
lines = spark.readStream.format("socket")\
    .option("host", "localhost")\
    .option("port", 9999)\
    .load()

In [5]:
from pyspark.sql.functions import explode, split

In [6]:
type(lines)

pyspark.sql.dataframe.DataFrame

In [7]:
words = lines.select(
    explode(split(lines.value, " ")).alias("word")   #각 line의 내용을 space로 split후 word로 명명
)

In [8]:
wordCount = words.groupBy("word").count()

In [None]:
stream = wordCount.writeStream.outputMode("complete").format("console").start()

stream.awaitTermination()

In [21]:
df = spark.createDataFrame(
    [
        (1, 144.5, 5.9, 33, "M"),
        (2, 167.2, 5.4, 45, "M"),
        (3, 124.1, 5.2, 23, "F"),
        (4, 144.5, 5.9, 33, "M"),
        (5, 133.2, 5.7, 54, "F"),
        (3, 124.1, 5.2, 23, "F"),
        (5, 129.2, 5.3, 42, "M")        
    ],
    ["id","weight", "height","age","gender"]
)

In [22]:
df.count(), df.distinct().count()

(7, 6)

In [23]:
df = df.dropDuplicates()
df.orderBy("id").show()

+---+------+------+---+------+
| id|weight|height|age|gender|
+---+------+------+---+------+
|  1| 144.5|   5.9| 33|     M|
|  2| 167.2|   5.4| 45|     M|
|  3| 124.1|   5.2| 23|     F|
|  4| 144.5|   5.9| 33|     M|
|  5| 133.2|   5.7| 54|     F|
|  5| 129.2|   5.3| 42|     M|
+---+------+------+---+------+



In [5]:
df.count()

6

In [24]:
df = df.dropDuplicates([col for col in df.columns if col != "id"])
df.count()

5

In [11]:
df.dropDuplicates([col for col in df.columns if col == "id"]).distinct().show()

+---+------+------+---+------+
| id|weight|height|age|gender|
+---+------+------+---+------+
|  5| 133.2|   5.7| 54|     F|
|  1| 144.5|   5.9| 33|     M|
|  3| 124.1|   5.2| 23|     F|
|  2| 167.2|   5.4| 45|     M|
+---+------+------+---+------+



In [12]:
df.show()

+---+------+------+---+------+
| id|weight|height|age|gender|
+---+------+------+---+------+
|  5| 133.2|   5.7| 54|     F|
|  1| 144.5|   5.9| 33|     M|
|  2| 167.2|   5.4| 45|     M|
|  3| 124.1|   5.2| 23|     F|
|  5| 129.2|   5.3| 42|     M|
+---+------+------+---+------+



In [13]:
from pyspark.sql import functions as F

In [25]:
df.agg(
    F.count("id").alias("Count"), 
    F.countDistinct('id').alias("DCount")
).show()

+-----+------+
|Count|DCount|
+-----+------+
|    5|     4|
+-----+------+



In [17]:
df.withColumn("nid", F.monotonically_increasing_id()).show()   #nid 라는 새로운 column을 만든 후 새로운 id 부여(col 명이 이미 존재한다면 update)

+---+------+------+---+------+-------------+
| id|weight|height|age|gender|          nid|
+---+------+------+---+------+-------------+
|  5| 133.2|   5.7| 54|     F|  25769803776|
|  1| 144.5|   5.9| 33|     M| 171798691840|
|  2| 167.2|   5.4| 45|     M| 592705486848|
|  3| 124.1|   5.2| 23|     F|1236950581248|
|  5| 129.2|   5.3| 42|     M|1365799600128|
+---+------+------+---+------+-------------+



In [26]:
df = df.withColumn("old", df.id)
df.show() 

+---+------+------+---+------+---+
| id|weight|height|age|gender|old|
+---+------+------+---+------+---+
|  5| 133.2|   5.7| 54|     F|  5|
|  1| 144.5|   5.9| 33|     M|  1|
|  2| 167.2|   5.4| 45|     M|  2|
|  3| 124.1|   5.2| 23|     F|  3|
|  5| 129.2|   5.3| 42|     M|  5|
+---+------+------+---+------+---+



In [28]:
df = df.withColumn("id", F.monotonically_increasing_id())
df.show() 

+-------------+------+------+---+------+---+
|           id|weight|height|age|gender|old|
+-------------+------+------+---+------+---+
|  25769803776| 133.2|   5.7| 54|     F|  5|
| 171798691840| 144.5|   5.9| 33|     M|  1|
| 592705486848| 167.2|   5.4| 45|     M|  2|
|1236950581248| 124.1|   5.2| 23|     F|  3|
|1365799600128| 129.2|   5.3| 42|     M|  5|
+-------------+------+------+---+------+---+



In [29]:
df.orderBy("old").show()

+-------------+------+------+---+------+---+
|           id|weight|height|age|gender|old|
+-------------+------+------+---+------+---+
| 171798691840| 144.5|   5.9| 33|     M|  1|
| 592705486848| 167.2|   5.4| 45|     M|  2|
|1236950581248| 124.1|   5.2| 23|     F|  3|
|  25769803776| 133.2|   5.7| 54|     F|  5|
|1365799600128| 129.2|   5.3| 42|     M|  5|
+-------------+------+------+---+------+---+



handling missing data

In [30]:
df = spark.createDataFrame(
    [
        (1, 143.5, 5.6, 28, "M", 100000),
        (2, 167.2,  5.4,45, "M", None),
        (3, None, 5.2, None, None, None),
        (4, 144.5, 5.9, 33, "M", None),
        (5, 133.2, 5.7, 54, "F", None),
        (6, 124.1, 5.2, None, "F", None),
        (7, 129.2, 5.3, 42, "M", 76000)
    ],
    
    ["id","weight", "height","age","gender", "income"]
)

In [32]:
df.show()

+---+------+------+----+------+------+
| id|weight|height| age|gender|income|
+---+------+------+----+------+------+
|  1| 143.5|   5.6|  28|     M|100000|
|  2| 167.2|   5.4|  45|     M|  null|
|  3|  null|   5.2|null|  null|  null|
|  4| 144.5|   5.9|  33|     M|  null|
|  5| 133.2|   5.7|  54|     F|  null|
|  6| 124.1|   5.2|null|     F|  null|
|  7| 129.2|   5.3|  42|     M| 76000|
+---+------+------+----+------+------+



In [33]:
type(df.rdd)

pyspark.rdd.RDD

In [35]:
df.rdd.map(
    lambda row:(row["id"], sum([c==None for c in row]))    #row 별 missing value count
).collect()

[(1, 0), (2, 1), (3, 4), (4, 1), (5, 1), (6, 2), (7, 0)]

In [36]:
df.where("id=3").show()

+---+------+------+----+------+------+
| id|weight|height| age|gender|income|
+---+------+------+----+------+------+
|  3|  null|   5.2|null|  null|  null|
+---+------+------+----+------+------+



In [44]:
df.agg(*[(1 - F.count(c)/F.count("*")).alias(c+"Rate") for c in df.columns]).show()    #missing data가 있는지 missing 비율을 통해 판별 - rate가 높으면 데이터로써 가치 적음

+------+------------------+----------+------------------+------------------+------------------+
|idRate|        weightRate|heightRate|           ageRate|        genderRate|        incomeRate|
+------+------------------+----------+------------------+------------------+------------------+
|   0.0|0.1428571428571429|       0.0|0.2857142857142857|0.1428571428571429|0.7142857142857143|
+------+------------------+----------+------------------+------------------+------------------+



In [45]:
df = df.select([c for c in df.columns if c!= "income"])

In [46]:
df.show()

+---+------+------+----+------+
| id|weight|height| age|gender|
+---+------+------+----+------+
|  1| 143.5|   5.6|  28|     M|
|  2| 167.2|   5.4|  45|     M|
|  3|  null|   5.2|null|  null|
|  4| 144.5|   5.9|  33|     M|
|  5| 133.2|   5.7|  54|     F|
|  6| 124.1|   5.2|null|     F|
|  7| 129.2|   5.3|  42|     M|
+---+------+------+----+------+



In [47]:
df.dropna(thresh=3).show() #null값이 한 row에 tresh 이상만큼 있는 row를 날림

+---+------+------+----+------+
| id|weight|height| age|gender|
+---+------+------+----+------+
|  1| 143.5|   5.6|  28|     M|
|  2| 167.2|   5.4|  45|     M|
|  4| 144.5|   5.9|  33|     M|
|  5| 133.2|   5.7|  54|     F|
|  6| 124.1|   5.2|null|     F|
|  7| 129.2|   5.3|  42|     M|
+---+------+------+----+------+



In [51]:
#각 col에 null 값이 많지 않은 경우 전체 row의 평균값으로 null값을 채워줌 -> 후에 통계분석할 때 영향을 주지 않도록
means = df.agg(
    *[F.mean(c).alias(c) for c in df.columns if c != "gender"]
)

In [52]:
means.show()

+---+------------------+-----------------+----+
| id|            weight|           height| age|
+---+------------------+-----------------+----+
|4.0|140.28333333333333|5.471428571428571|40.4|
+---+------------------+-----------------+----+



In [53]:
mpd = means.toPandas()    #dataframe 은 immutable이기 때문에 pandas를 애용해서 데이터 수정 후 새로운 dataframe으로 생성

In [58]:
mpd

Unnamed: 0,id,weight,height,age
0,4.0,140.283333,5.471429,40.4


In [59]:
mpd = mpd.to_dict("records")[0]

In [60]:
mpd

{'id': 4.0,
 'weight': 140.28333333333333,
 'height': 5.471428571428571,
 'age': 40.4}

In [61]:
mpd['gender'] = "X"

In [62]:
mpd

{'id': 4.0,
 'weight': 140.28333333333333,
 'height': 5.471428571428571,
 'age': 40.4,
 'gender': 'X'}

In [63]:
df = df.fillna(mpd)

In [64]:
df.show()

+---+------------------+------+---+------+
| id|            weight|height|age|gender|
+---+------------------+------+---+------+
|  1|             143.5|   5.6| 28|     M|
|  2|             167.2|   5.4| 45|     M|
|  3|140.28333333333333|   5.2| 40|     X|
|  4|             144.5|   5.9| 33|     M|
|  5|             133.2|   5.7| 54|     F|
|  6|             124.1|   5.2| 40|     F|
|  7|             129.2|   5.3| 42|     M|
+---+------------------+------+---+------+



handling outlier

In [65]:
df = spark.createDataFrame(
    [
        (1, 143.5, 5.3, 28),
        (2, 154.2,  5.5, 45),
        (3, 342.3, 5.2, 99),
        (4, 144.5, 5.5, 33),
        (5, 133.2, 5.4, 54),
        (6, 124.1, 5.1, 21),
        (7, 129.2, 5.3, 42)
    ],
    
    ["id","weight", "height","age"]
)

In [66]:
quantiles = df.approxQuantile("weight", [0.25, 0.75], 0.05)  #relativeError = 0 일수록 해당 data에 specific -> 어느 정도 오류를 허용해야함

In [67]:
quantiles

[129.2, 154.2]

In [68]:
IQR = quantiles[1] - quantiles[0]   #중간값

In [69]:
bounds = [quantiles[0]-1.5*IQR , quantiles[1]+1.5*IQR]

In [70]:
bounds

[91.69999999999999, 191.7]

In [73]:
col = ["weight", "height", "age"]
bounds = {}

for c in col:
    quantiles = df.approxQuantile(c, [0.25, 0.75], 0.05)
    IQR = quantiles[1] - quantiles[0]
    bounds[c] = [quantiles[0]-1.5*IQR , quantiles[1]+1.5*IQR]

In [74]:
bounds

{'weight': [91.69999999999999, 191.7],
 'height': [4.75, 5.949999999999999],
 'age': [-11.0, 93.0]}

In [75]:
df.select(df["weight"] < bounds["weight"][0]).show()
df.select(df["weight"] > bounds["weight"][1]).show()

+----------------------------+
|(weight < 91.69999999999999)|
+----------------------------+
|                       false|
|                       false|
|                       false|
|                       false|
|                       false|
|                       false|
|                       false|
+----------------------------+

+----------------+
|(weight > 191.7)|
+----------------+
|           false|
|           false|
|            true|
|           false|
|           false|
|           false|
|           false|
+----------------+



In [80]:
outlier = df.select(
    *["id"] + [( (df[c] < bounds[c][0]) |( df[c] > bounds[c][1])).alias(c+"_o") for c in col]
)
outlier.show()
#true이면 outlier

+---+--------+--------+-----+
| id|weight_o|height_o|age_o|
+---+--------+--------+-----+
|  1|   false|   false|false|
|  2|   false|   false|false|
|  3|    true|   false| true|
|  4|   false|   false|false|
|  5|   false|   false|false|
|  6|   false|   false|false|
|  7|   false|   false|false|
+---+--------+--------+-----+



In [82]:
joinTable = df.join(outlier, on="id")
joinTable.show()

+---+------+------+---+--------+--------+-----+
| id|weight|height|age|weight_o|height_o|age_o|
+---+------+------+---+--------+--------+-----+
|  7| 129.2|   5.3| 42|   false|   false|false|
|  6| 124.1|   5.1| 21|   false|   false|false|
|  5| 133.2|   5.4| 54|   false|   false|false|
|  1| 143.5|   5.3| 28|   false|   false|false|
|  3| 342.3|   5.2| 99|    true|   false| true|
|  2| 154.2|   5.5| 45|   false|   false|false|
|  4| 144.5|   5.5| 33|   false|   false|false|
+---+------+------+---+--------+--------+-----+



In [88]:
joinTable.filter("!weight_o and !height_o and !age_o").select("id", "weight", "height","age").show()

+---+------+------+---+
| id|weight|height|age|
+---+------+------+---+
|  7| 129.2|   5.3| 42|
|  6| 124.1|   5.1| 21|
|  5| 133.2|   5.4| 54|
|  1| 143.5|   5.3| 28|
|  2| 154.2|   5.5| 45|
|  4| 144.5|   5.5| 33|
+---+------+------+---+

