In [4]:
from pyspark import SparkContext, SparkConf
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import StructField, StructType, StringType, LongType
from pyspark.sql.functions import col, column, expr, instr, pow, corr
from pyspark.sql import Row
from pyspark.sql.functions import max, desc, col, window, date_format
from pyspark.sql.functions import lit

from pyspark.sql.functions import monotonically_increasing_id
from pyspark.ml.evaluation import ClusteringEvaluator
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import OneHotEncoder
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.clustering import KMeans
import time
from pyspark.sql.functions import pow
from pyspark.sql.functions import struct, split, size, array_contains, create_map
from pyspark.sql.functions import initcap, lower, upper, regexp_replace, count
from pyspark.sql.functions import current_date, current_timestamp, to_date, get_json_object, json_tuple
from pyspark.sql.functions import var_pop, stddev_pop, var_samp, stddev_samp, skewness, kurtosis

In [2]:
SparkContext()

In [6]:
pyspark

<module 'pyspark' from '/home/com/anaconda3/envs/nlp_prac/lib/python3.6/site-packages/pyspark/__init__.py'>

In [5]:
spark = SparkSession.builder \
    .master("local") \
    .appName("Word Count") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

In [None]:
"""
csv 옵션:
sep: , ` 각 필드와 값을 구분하는데 사용되는 단일 문자
header: true,false 첫 번째 줄이 컬럼명인지 나타내는 불리언값
inferSchema: true,false 스파크가 파일을 읽을 때 컬럼의 데이터 타입을 추론할지 정의

"""

In [None]:
"""
테스트는 로컬모드에서 실행하면 되기에 간단하게 할 수 있다.
테스트 방식을 쉽게 하려면 스파크 코드에서 의존성 주입 방식으로 SparkSession을 관리하도록 만들어야 한다.
SparkSession을 한번만 초기화하고 런타임 환경에서 사용하면 SparkSession을 쉽게 관리할 수 있다.

ex)
if __name__ == "__main__":
    from pyspark import SparkSession
    spark = SparkSession.builder.master("local").appName("name").config("spark.some.config.option","some-value").getOrCreate()
"""

In [6]:
conf = SparkConf().setMaster("local[2]").setAppName("DefinitiveGuide").set("some.conf","to.some.value")

In [7]:
conf

<pyspark.conf.SparkConf at 0x7fd40e1fb198>

In [8]:
csvFile = spark.read.format("csv").option("header","true").option("mode","FAILFAST").option("inferSchema","true").load('./data/flight-data/csv/2010-summary.csv')


In [9]:
csvFile.write.format("csv").mode("overwrite").option("sep","\t").save("./tmp/my-tsv-file.tsv")

In [10]:
csvFile.repartition(5).write.format("csv").save("./tmp/multiple.csv")

In [18]:
spark.read.json('./data/flight-data/json/2015-summary.json').createOrReplaceTempView("some_sql_view")

spark.sql("""
SELECT DEST_COUNTRY_NAME , sum(count)
FROM some_sql_view GROUP BY DEST_COUNTRY_NAME
""").where("DEST_COUNTRY_NAME like 'S%'").where("`sum(count)` > 10").count()

12

In [19]:
#로컬모드는 단일이기에 상관없지만, 클러스트, 클라이언트 모드는 스파크 어플리케이션을 제출하는 것으로 시작한다.
#./bin/spark-submit

#어플리케이션이 시작되었다면 sparksession으로 스파크 클러스트를 초기화해야 한다.
#드라이버와 익스큐터를 설정


spark = SparkSession.builder.master("local").appName("Word Count").config("spark.some.config.option","some-value").getOrCreate()

In [27]:
df1 = spark.range(2,1000000,2)
df2 = spark.range(2,1000000,4)

In [28]:
step1 = df1.repartition(5)
step12 = df2.repartition(6)

In [29]:
step2 = step1.selectExpr("id * 5 as id") 
step3 = step2.join(step12, ["id"])
step4 = step3.selectExpr("sum(id)")

step4.collect()

[Row(sum(id)=25000000000)]

In [22]:
step4.explain()

== Physical Plan ==
*(4) HashAggregate(keys=[], functions=[sum(id#152L)])
+- Exchange SinglePartition, true, [id=#217]
   +- *(3) HashAggregate(keys=[], functions=[partial_sum(id#152L)])
      +- *(3) Project [id#152L]
         +- *(3) BroadcastHashJoin [id#152L], [id#146L], Inner, BuildRight
            :- *(3) Project [(id#144L * 5) AS id#152L]
            :  +- Exchange RoundRobinPartitioning(5), false, [id=#206]
            :     +- *(1) Range (2, 1000000, step=2, splits=6)
            +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false])), [id=#211]
               +- Exchange RoundRobinPartitioning(6), false, [id=#210]
                  +- *(2) Range (2, 1000000, step=4, splits=6)




In [5]:
flightData2015 = spark.read.option("inferSchema","true").\
option("header","true").\
csv("/home/com/다운로드/Spark-The-Definitive-Guide-master/data/flight-data/csv/2010-summary.csv")

In [12]:
flightData2015.take(3)

[Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Romania', count=1),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Ireland', count=264),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='India', count=69)]

In [6]:
flightData2015.explain()

== Physical Plan ==
FileScan csv [DEST_COUNTRY_NAME#16,ORIGIN_COUNTRY_NAME#17,count#18] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex[file:/home/com/다운로드/Spark-The-Definitive-Guide-master/data/flight-data/csv/2010..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string,ORIGIN_COUNTRY_NAME:string,count:int>




In [14]:
flightData2015.sort("count").explain()

== Physical Plan ==
*(1) Sort [count#18 ASC NULLS FIRST], true, 0
+- Exchange rangepartitioning(count#18 ASC NULLS FIRST, 200), true, [id=#35]
   +- FileScan csv [DEST_COUNTRY_NAME#16,ORIGIN_COUNTRY_NAME#17,count#18] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex[file:/home/com/다운로드/Spark-The-Definitive-Guide-master/data/flight-data/csv/2010..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string,ORIGIN_COUNTRY_NAME:string,count:int>




In [7]:
# spark partition 수 조절

spark.conf.set("spark.sql.shuffle.partitions","5")

flightData2015.sort("count").take(2)

[Row(DEST_COUNTRY_NAME='Equatorial Guinea', ORIGIN_COUNTRY_NAME='United States', count=1),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Romania', count=1)]

In [18]:
spark.conf.set("spark.sql.shuffle.partitions","1")

flightData2015.sort("count").take(2)

[Row(DEST_COUNTRY_NAME='Equatorial Guinea', ORIGIN_COUNTRY_NAME='United States', count=1),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Romania', count=1)]

In [21]:
spark.conf.set("spark.sql.shuffle.partitions","10")

flightData2015.sort("count").take(10)

[Row(DEST_COUNTRY_NAME='Malaysia', ORIGIN_COUNTRY_NAME='United States', count=1),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Algeria', count=1),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Romania', count=1),
 Row(DEST_COUNTRY_NAME='Azerbaijan', ORIGIN_COUNTRY_NAME='United States', count=1),
 Row(DEST_COUNTRY_NAME='Malta', ORIGIN_COUNTRY_NAME='United States', count=1),
 Row(DEST_COUNTRY_NAME='Liberia', ORIGIN_COUNTRY_NAME='United States', count=1),
 Row(DEST_COUNTRY_NAME='Saint Vincent and the Grenadines', ORIGIN_COUNTRY_NAME='United States', count=1),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Vietnam', count=1),
 Row(DEST_COUNTRY_NAME='Slovakia', ORIGIN_COUNTRY_NAME='United States', count=1),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Estonia', count=1)]

In [6]:
flightData2015.createOrReplaceTempView("flight_data_2015")

In [7]:
sqlWay = spark.sql("""SELECT DEST_COUNTRY_NAME, count(1)
FROM flight_data_2015
GROUP BY DEST_COUNTRY_NAME""")

In [8]:
dataFramWay = flightData2015.\
groupby("DEST_COUNTRY_NAME")\
.count()

In [9]:
sqlWay.explain()
dataFramWay.explain()

== Physical Plan ==
*(2) HashAggregate(keys=[DEST_COUNTRY_NAME#16], functions=[count(1)])
+- Exchange hashpartitioning(DEST_COUNTRY_NAME#16, 200), true, [id=#37]
   +- *(1) HashAggregate(keys=[DEST_COUNTRY_NAME#16], functions=[partial_count(1)])
      +- FileScan csv [DEST_COUNTRY_NAME#16] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex[file:/home/com/다운로드/Spark-The-Definitive-Guide-master/data/flight-data/csv/2010..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string>


== Physical Plan ==
*(2) HashAggregate(keys=[DEST_COUNTRY_NAME#16], functions=[count(1)])
+- Exchange hashpartitioning(DEST_COUNTRY_NAME#16, 200), true, [id=#56]
   +- *(1) HashAggregate(keys=[DEST_COUNTRY_NAME#16], functions=[partial_count(1)])
      +- FileScan csv [DEST_COUNTRY_NAME#16] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex[file:/home/com/다운로드/Spark-The-Definitive-Guide-master/data/flight-data/csv/2010..., PartitionFilte

In [29]:
flightData2015.select(max("count")).take(1)

[Row(max(count)=348113)]

In [30]:
maxSql = spark.sql("""
SELECT DEST_COUNTRY_NAME , sum(count) as destination_total
FROM flight_data_2015
GROUP BY DEST_COUNTRY_NAME
ORDER BY sum(count) DESC
LIMIT 5
""")

In [31]:
maxSql.show()

+-----------------+-----------------+
|DEST_COUNTRY_NAME|destination_total|
+-----------------+-----------------+
|    United States|           384932|
|           Canada|             8271|
|           Mexico|             6200|
|   United Kingdom|             1629|
|          Germany|             1392|
+-----------------+-----------------+



In [38]:
flightData2015.groupBy("DEST_COUNTRY_NAME")\
.sum("count")\
.withColumnRenamed("sum(count)", "destination_total")\
.sort(desc("destination_total"))\
.limit(5)\
.show()

+-----------------+-----------------+
|DEST_COUNTRY_NAME|destination_total|
+-----------------+-----------------+
|    United States|           384932|
|           Canada|             8271|
|           Mexico|             6200|
|   United Kingdom|             1629|
|          Germany|             1392|
+-----------------+-----------------+



In [40]:
flightData2015.groupBy("DEST_COUNTRY_NAME")\
.sum("count")\
.withColumnRenamed("sum(count)", "destination_total")\
.sort(desc("destination_total"))\
.limit(5)\
.explain()

== Physical Plan ==
TakeOrderedAndProject(limit=5, orderBy=[destination_total#204L DESC NULLS LAST], output=[DEST_COUNTRY_NAME#16,destination_total#204L])
+- *(2) HashAggregate(keys=[DEST_COUNTRY_NAME#16], functions=[sum(cast(count#18 as bigint))])
   +- Exchange hashpartitioning(DEST_COUNTRY_NAME#16, 10), true, [id=#374]
      +- *(1) HashAggregate(keys=[DEST_COUNTRY_NAME#16], functions=[partial_sum(cast(count#18 as bigint))])
         +- FileScan csv [DEST_COUNTRY_NAME#16,count#18] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex[file:/home/com/다운로드/Spark-The-Definitive-Guide-master/data/flight-data/csv/2010..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string,count:int>




In [35]:
flightData2015.explain()

== Physical Plan ==
FileScan csv [DEST_COUNTRY_NAME#16,ORIGIN_COUNTRY_NAME#17,count#18] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex[file:/home/com/다운로드/Spark-The-Definitive-Guide-master/data/flight-data/csv/2010..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string,ORIGIN_COUNTRY_NAME:string,count:int>




In [11]:
staticDataFrame = spark.read.format("csv")\
.option("header","true")\
.option("inferSchema","true")\
.load("/home/com/다운로드/Spark-The-Definitive-Guide-master/data/retail-data/by-day/*.csv")

In [12]:
staticDataFrame.createOrReplaceTempView("retail_data")
staticSchema = staticDataFrame.schema

In [13]:
staticDataFrame\
.selectExpr("CustomerId","(UnitPrice * Quantity) as total_cost", "InvoiceDate")\
.groupBy(col("CustomerId"), window(col("InvoiceDate"),"1 day"))\
.sum("total_cost")\
.show(5)

+----------+--------------------+-----------------+
|CustomerId|              window|  sum(total_cost)|
+----------+--------------------+-----------------+
|   16057.0|[2011-12-05 09:00...|            -37.6|
|   14126.0|[2011-11-29 09:00...|643.6300000000001|
|   13500.0|[2011-11-16 09:00...|497.9700000000001|
|   17160.0|[2011-11-08 09:00...|516.8499999999999|
|   15608.0|[2011-11-11 09:00...|            122.4|
+----------+--------------------+-----------------+
only showing top 5 rows



In [14]:
staticDataFrame.take(5)

[Row(InvoiceNo='580538', StockCode='23084', Description='RABBIT NIGHT LIGHT', Quantity=48, InvoiceDate='2011-12-05 08:38:00', UnitPrice=1.79, CustomerID=14075.0, Country='United Kingdom'),
 Row(InvoiceNo='580538', StockCode='23077', Description='DOUGHNUT LIP GLOSS ', Quantity=20, InvoiceDate='2011-12-05 08:38:00', UnitPrice=1.25, CustomerID=14075.0, Country='United Kingdom'),
 Row(InvoiceNo='580538', StockCode='22906', Description='12 MESSAGE CARDS WITH ENVELOPES', Quantity=24, InvoiceDate='2011-12-05 08:38:00', UnitPrice=1.65, CustomerID=14075.0, Country='United Kingdom'),
 Row(InvoiceNo='580538', StockCode='21914', Description='BLUE HARMONICA IN BOX ', Quantity=24, InvoiceDate='2011-12-05 08:38:00', UnitPrice=1.25, CustomerID=14075.0, Country='United Kingdom'),
 Row(InvoiceNo='580538', StockCode='22467', Description='GUMBALL COAT RACK', Quantity=6, InvoiceDate='2011-12-05 08:38:00', UnitPrice=2.55, CustomerID=14075.0, Country='United Kingdom')]

In [50]:
spark.conf.set("spark.sql.shuffle.partitions","5")
#로컬 모드에선 5개면 충분하다

In [51]:
streamingDataFrame = spark.readStream\
.schema(staticSchema)\
.option("maxFilesPerTrigger",1)\
.format("csv")\
.option("header","true")\
.load("/home/com/다운로드/Spark-The-Definitive-Guide-master/data/retail-data/by-day/*.csv")

In [53]:
streamingDataFrame.isStreaming

True

In [55]:
purchaseByCustomerPerHour = streamingDataFrame\
.selectExpr(
"customerId",
"(UnitPrice * Quantity) as total_cost",
"InvoiceDate")\
.groupBy(
col("CustomerId"),window(col("InvoiceDate"),"1 day"))\
.sum("total_cost")

In [56]:
purchaseByCustomerPerHour.writeStream\
.format("memory")\
.queryName("customer_purchases")\
.outputMode("complete")\
.start()

<pyspark.sql.streaming.StreamingQuery at 0x7fa3964416d8>

In [60]:
staticDataFrame.printSchema()

root
 |-- InvoiceNo: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- InvoiceDate: string (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- CustomerID: double (nullable = true)
 |-- Country: string (nullable = true)



In [7]:
# 구조적 api : Dataset, Dataframe, SQL 테이블

# 구조적 api는 데이터 흐름을 정의하는 기본 추상화 개념

# *타입형/ 비타입형 API의 개념과 차이점
# 핵심 용어
# 스파크가 구조적 API의 데이터 흐름을 해석하고 클러스터에서 실행하는 방식

from pyspark.sql.types import *

b = ByteType()

In [6]:
b

ByteType

In [10]:
df = spark.read.format('json').load("/home/com/다운로드/Spark-The-Definitive-Guide-master/data/flight-data/json/2015-summary.json")

In [12]:
df.printSchema()

root
 |-- DEST_COUNTRY_NAME: string (nullable = true)
 |-- ORIGIN_COUNTRY_NAME: string (nullable = true)
 |-- count: long (nullable = true)



In [8]:
from pyspark.sql.types import StructField, StructType, StringType, LongType
from pyspark.sql.functions import col, column
from pyspark.sql import Row


myManulSchema = StructType([
    StructField("DEST_COUNTRY_NAME",StringType(),True),
    StructField("ORIGIN_COUNTRY_NAME",StringType(),True),
    StructField("count",LongType(),False,metadata={"hello":"world"})
])
df = spark.read.format("json").schema(myManulSchema)\
.load("/home/com/다운로드/Spark-The-Definitive-Guide-master/data/flight-data/json/2015-summary.json")

In [9]:
df.printSchema()

root
 |-- DEST_COUNTRY_NAME: string (nullable = true)
 |-- ORIGIN_COUNTRY_NAME: string (nullable = true)
 |-- count: long (nullable = true)



In [10]:
from pyspark.sql.functions import col, column
from pyspark.sql import Row

col("someColumnName")
column("someColumnName")

Column<b'someColumnName'>

In [13]:
df = spark.read.format('json').load("/home/com/다운로드/Spark-The-Definitive-Guide-master/data/flight-data/json/2015-summary.json")

In [14]:
df.show()

+--------------------+-------------------+-----+
|   DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+--------------------+-------------------+-----+
|       United States|            Romania|   15|
|       United States|            Croatia|    1|
|       United States|            Ireland|  344|
|               Egypt|      United States|   15|
|       United States|              India|   62|
|       United States|          Singapore|    1|
|       United States|            Grenada|   62|
|          Costa Rica|      United States|  588|
|             Senegal|      United States|   40|
|             Moldova|      United States|    1|
|       United States|       Sint Maarten|  325|
|       United States|   Marshall Islands|   39|
|              Guyana|      United States|   64|
|               Malta|      United States|    1|
|            Anguilla|      United States|   41|
|             Bolivia|      United States|   30|
|       United States|           Paraguay|    6|
|             Algeri

In [15]:
df.first()

Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Romania', count=15)

In [16]:
myRow = Row("Hello",None,1,False)
myRow[0]

'Hello'

In [17]:
df.createOrReplaceTempView("dfTable")

In [18]:
#데이터프레임 만들기

myManulSchema = StructType([
    StructField("SOME",StringType(),True),
    StructField("col",StringType(),True),
    StructField("names",LongType(),False)
])

myRow = Row("hello",None,1)

myDf = spark.createDataFrame([myRow],myManulSchema)

myDf.show()

+-----+----+-----+
| SOME| col|names|
+-----+----+-----+
|hello|null|    1|
+-----+----+-----+



In [40]:
df.select("DEST_COUNTRY_NAME").show(2)

+-----------------+
|DEST_COUNTRY_NAME|
+-----------------+
|    United States|
|    United States|
+-----------------+
only showing top 2 rows



In [41]:
df.select("DEST_COUNTRY_NAME","ORIGIN_COUNTRY_NAME").show(2)

+-----------------+-------------------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|
+-----------------+-------------------+
|    United States|            Romania|
|    United States|            Croatia|
+-----------------+-------------------+
only showing top 2 rows



In [19]:
spark.sql("""
SELECT DEST_COUNTRY_NAME, ORIGIN_COUNTRY_NAME FROM dfTable LIMIT 2
""").show()

+-----------------+-------------------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|
+-----------------+-------------------+
|    United States|            Romania|
|    United States|            Croatia|
+-----------------+-------------------+



In [46]:
df.select(col("DEST_COUNTRY_NAME"),"DEST_COUNTRY_NAME")

DataFrame[DEST_COUNTRY_NAME: string, DEST_COUNTRY_NAME: string]

In [49]:
df.select(expr("DEST_COUNTRY_NAME"))

DataFrame[DEST_COUNTRY_NAME: string]

In [50]:
df.select("DEST_COUNTRY_NAME")

DataFrame[DEST_COUNTRY_NAME: string]

In [51]:
# select(expr 구조를 많이 사용한다.)
#이를 selectExpr 로 spark에서 제공한다.

In [24]:

#아래 예제는 출발지와 도착지가 같은지 나타내는 새로운 칼럼을 추가하는 예제
df.selectExpr("*", #모든 원본 칼럼 포함
              "(DEST_COUNTRY_NAME = ORIGIN_COUNTRY_NAME) as withinCountry").show(2)

+-----------------+-------------------+-----+-------------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|withinCountry|
+-----------------+-------------------+-----+-------------+
|    United States|            Romania|   15|        false|
|    United States|            Croatia|    1|        false|
+-----------------+-------------------+-----+-------------+
only showing top 2 rows



In [25]:
df.selectExpr("avg(count)","count(distinct(DEST_COUNTRY_NAME))").show(2)

+-----------+---------------------------------+
| avg(count)|count(DISTINCT DEST_COUNTRY_NAME)|
+-----------+---------------------------------+
|1770.765625|                              132|
+-----------+---------------------------------+



In [29]:
from pyspark.sql.functions import lit

#lit은 명시적인 값을 쓸때 사용한다. 상숫값일 수도 있고 다양하다.
#어떤 상수나 프로그래밍으로 생성된 변숫값이 특정 걸럼의 값보다 큰지 확인할 때 사용한다.

df.select(expr("*"), lit(1).alias("One")).show(2)

+-----------------+-------------------+-----+---+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|One|
+-----------------+-------------------+-----+---+
|    United States|            Romania|   15|  1|
|    United States|            Croatia|    1|  1|
+-----------------+-------------------+-----+---+
only showing top 2 rows



In [30]:
#칼럼 추가 예제  일반적으로 withColumn 을 이용한다. 값 1을 추가하는 예제이다.
df.withColumn("numberOne",lit(1)).show(2)

+-----------------+-------------------+-----+---------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|numberOne|
+-----------------+-------------------+-----+---------+
|    United States|            Romania|   15|        1|
|    United States|            Croatia|    1|        1|
+-----------------+-------------------+-----+---------+
only showing top 2 rows



In [32]:
df.show()

+--------------------+-------------------+-----+
|   DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+--------------------+-------------------+-----+
|       United States|            Romania|   15|
|       United States|            Croatia|    1|
|       United States|            Ireland|  344|
|               Egypt|      United States|   15|
|       United States|              India|   62|
|       United States|          Singapore|    1|
|       United States|            Grenada|   62|
|          Costa Rica|      United States|  588|
|             Senegal|      United States|   40|
|             Moldova|      United States|    1|
|       United States|       Sint Maarten|  325|
|       United States|   Marshall Islands|   39|
|              Guyana|      United States|   64|
|               Malta|      United States|    1|
|            Anguilla|      United States|   41|
|             Bolivia|      United States|   30|
|       United States|           Paraguay|    6|
|             Algeri

In [33]:
df.withColumn("withinCountry", expr("ORIGIN_COUNTRY_NAME== DEST_COUNTRY_NAME")).show(2)

+-----------------+-------------------+-----+-------------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|withinCountry|
+-----------------+-------------------+-----+-------------+
|    United States|            Romania|   15|        false|
|    United States|            Croatia|    1|        false|
+-----------------+-------------------+-----+-------------+
only showing top 2 rows



In [35]:
df.column = df.withColumnRenamed("DEST_COUNTRY_NAME","dest").columns

In [43]:
df.withColumn("ggg",lit(1)).show(1)

+-----------------+-------------------+-----+---+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|ggg|
+-----------------+-------------------+-----+---+
|    United States|            Romania|   15|  1|
+-----------------+-------------------+-----+---+
only showing top 1 row



In [39]:
df.show()

+--------------------+-------------------+-----+
|   DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+--------------------+-------------------+-----+
|       United States|            Romania|   15|
|       United States|            Croatia|    1|
|       United States|            Ireland|  344|
|               Egypt|      United States|   15|
|       United States|              India|   62|
|       United States|          Singapore|    1|
|       United States|            Grenada|   62|
|          Costa Rica|      United States|  588|
|             Senegal|      United States|   40|
|             Moldova|      United States|    1|
|       United States|       Sint Maarten|  325|
|       United States|   Marshall Islands|   39|
|              Guyana|      United States|   64|
|               Malta|      United States|    1|
|            Anguilla|      United States|   41|
|             Bolivia|      United States|   30|
|       United States|           Paraguay|    6|
|             Algeri

In [47]:
#공백이나 하이픈은 예약어라서 사용불가능하지만 컬럼명에만 사용하면 사용가능
dfWithLongColName = df.withColumn(
"this Long Column-Name",
expr("ORIGIN_COUNTRY_NAME"))

#아래식은 컬럼명뿐만 아니라 표현식에도 사용했기에 백틱을 사용해야 한다.
dfWithLongColName.selectExpr(
"`this long column-name`",
"`this long column-name` as `new col`")\
.show(2)

+---------------------+-------+
|this long column-name|new col|
+---------------------+-------+
|              Romania|Romania|
|              Croatia|Croatia|
+---------------------+-------+
only showing top 2 rows



In [48]:
df.drop("ORIGIN_COUNTRY_NAME").columns

['DEST_COUNTRY_NAME', 'count']

In [52]:
df.withColumn("count2",col("count").cast("string")).printSchema()

root
 |-- DEST_COUNTRY_NAME: string (nullable = true)
 |-- ORIGIN_COUNTRY_NAME: string (nullable = true)
 |-- count: long (nullable = true)
 |-- count2: string (nullable = true)



In [53]:
df.where("count< 2 ").show(2)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Croatia|    1|
|    United States|          Singapore|    1|
+-----------------+-------------------+-----+
only showing top 2 rows



In [54]:
df.where(col("count")< 2 ).where(col("ORIGIN_COUNTRY_NAME") != "Croatia")\
.show(2)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|          Singapore|    1|
|          Moldova|      United States|    1|
+-----------------+-------------------+-----+
only showing top 2 rows



In [55]:
#중복되지 않는 데이터 없는 식 distinct을 사용해 고윳값을 찾을 수 있다
df.select("ORIGIN_COUNTRY_NAME","DEST_COUNTRY_NAME").distinct().count()

256

In [56]:
seed = 5
withReplacement = False
fraction = 0.5
df.sample(withReplacement, fraction, seed).count()

138

In [57]:
dataFrames = df.randomSplit([0.25,0.75],seed)

dataFrames[0].count() >dataFrames[1].count()

False

In [60]:
#데이터 프레임은 기본적으로 불변성을 가지기에 데이터를 합치려면 새로운 데이터에 원본데이터를 통합해야 한다.

schema= df.schema
newRows = [
    Row("New Country","Other Country", 5L),
    Row("New Country", "Other Country 3",1L )
]

paralledlizedRows = spark.sparkContext.parallelize(newRows)

newDF = spark.createDataFrame(parallelizedRows, schema)

df.union(newDF)\
.where("count = 1")\
.where(col("ORIGIN_COUNTRY_NAME") != "United States")\
.show()

SyntaxError: invalid syntax (<ipython-input-60-35c0ab86ab61>, line 5)

In [61]:
df.sort("count").show(5)

df.orderBy("count","DEST_COUNTRY_NAME").show(5)

df.orderBy(col("count"),col("DEST_COUNTRY_NAME")).show(5)



+--------------------+-------------------+-----+
|   DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+--------------------+-------------------+-----+
|               Malta|      United States|    1|
|Saint Vincent and...|      United States|    1|
|       United States|            Croatia|    1|
|       United States|          Gibraltar|    1|
|       United States|          Singapore|    1|
+--------------------+-------------------+-----+
only showing top 5 rows

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|     Burkina Faso|      United States|    1|
|    Cote d'Ivoire|      United States|    1|
|           Cyprus|      United States|    1|
|         Djibouti|      United States|    1|
|        Indonesia|      United States|    1|
+-----------------+-------------------+-----+
only showing top 5 rows

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+--

In [62]:
df.orderBy(col("count").desc(), col("DEST_COUNTRY_NAME").asc()).show(2)

+-----------------+-------------------+------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME| count|
+-----------------+-------------------+------+
|    United States|      United States|370002|
|    United States|             Canada|  8483|
+-----------------+-------------------+------+
only showing top 2 rows



In [63]:
#트랜스 포메이션을 처리하기전 성능 최적화를 위해 파티션별로 정렬을 수행하기도 한다.

spark.read.format("json").load("/home/com/다운로드/Spark-The-Definitive-Guide-master/data/flight-data/json/2015-summary.json")\
.sortWithinPartitions("count")

DataFrame[DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string, count: bigint]

In [64]:
#또 다른 최적화 기법은 자주 필터링하는 컬럼을 기준으로 데이터를 분할하는 방식이다.

df.rdd.getNumPartitions()

1

In [65]:
df.repartition(5)

DataFrame[DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string, count: bigint]

In [66]:
#특정 칼럼을 기준으로 자주 필터링한다면 그 컬럼을 기준으로 파티션을 재분해하는것이 좋다.

df.repartition(col("DEST_COUNTRY_NAME"))

DataFrame[DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string, count: bigint]

In [3]:
#드라이버로 로우 데이터 수집하기

collectDF = df.limit(10)
collectDF.take(5)
collectDF.show()
collectDF.show(5,False)
collectDF.collect()

NameError: name 'df' is not defined

In [20]:
df = spark.read.format("json").option("header","true").option("inferSchema","true")\
.load("/home/com/다운로드/Spark-The-Definitive-Guide-master/data/retail-data/by-day/2010-12-01.csv")
df.printSchema()
df.createOrReplaceTempView("dfTable")

root
 |-- _corrupt_record: string (nullable = true)



In [7]:
#lit 함수는 다른언어의 데이터타입을 스파크 데이터 타입으로 변환한다.

df.select(lit(5),lit("five"),lit(5.0)).printSchema()

root
 |-- 5: integer (nullable = false)
 |-- five: string (nullable = false)
 |-- 5.0: double (nullable = false)



In [18]:
df

DataFrame[_corrupt_record: string]

In [21]:
df.where(col("InvoiceNo") != 536365).select("InvoiceNo","Description").show(5,false)


df.where("InvoiceNo <> 536365").show(5,false)
df.where("InvoiceNo == 536365").show(5,false)

AnalysisException: cannot resolve '`InvoiceNo`' given input columns: [_corrupt_record];;
'Filter NOT ('InvoiceNo = 536365)
+- Relation[_corrupt_record#54] json


In [25]:
df

DataFrame[_corrupt_record: string]

In [24]:
DOTCodeFilter = col("StockCode") == "DOT"
priceFilter = col("UnitPrice") > 600
descripFilter = instr(col("Description"), "POSTAGE") >= 1
df.withColumn("isExpensive",DOTCodeFilter & (priceFilter | descripFilter))\
.where("isExpensive")\
.select("unitPrice","isExpensive").show(5)

AnalysisException: cannot resolve '`StockCode`' given input columns: [_corrupt_record];;
'Project [_corrupt_record#54, (('StockCode = DOT) AND (('UnitPrice > 600) OR (instr('Description, POSTAGE) >= 1))) AS isExpensive#57]
+- Relation[_corrupt_record#54] json


In [28]:
#수치형 데이터 타입 다루기

fabricatedQuantity = pow(col("Quantity")*col("UnitPrice"),2) + 5
df.select(expr("CustomerId"),fabricatedQuantity.alias("realQuantity")).show(2)

In [8]:
staticDataFrame = spark.read.format("csv")\
.option("header","true")\
.option("inferSchema","true")\
.load("./data/retail-data/by-day/*.csv")

staticDataFrame.createOrReplaceTempView("retail_data")

staticSchema = staticDataFrame.schema

In [9]:
staticSchema

StructType(List(StructField(InvoiceNo,StringType,true),StructField(StockCode,StringType,true),StructField(Description,StringType,true),StructField(Quantity,IntegerType,true),StructField(InvoiceDate,StringType,true),StructField(UnitPrice,DoubleType,true),StructField(CustomerID,DoubleType,true),StructField(Country,StringType,true)))

In [10]:
staticDataFrame.selectExpr("CustomerId","(UnitPrice * Quantity) as total_cost","InvoiceDate")\
.groupBy(col("CustomerId"),window(col("InvoiceDate"),"1 day"))\
.sum("total_cost")\
.show(5)
#특정의 아이디로 그룹화 시키고 그 고객이 하루동안 소비한 총 금액

+----------+--------------------+-----------------+
|CustomerId|              window|  sum(total_cost)|
+----------+--------------------+-----------------+
|   16057.0|[2011-12-05 09:00...|            -37.6|
|   14126.0|[2011-11-29 09:00...|643.6300000000001|
|   13500.0|[2011-11-16 09:00...|497.9700000000001|
|   17160.0|[2011-11-08 09:00...|516.8499999999999|
|   15608.0|[2011-11-11 09:00...|            122.4|
+----------+--------------------+-----------------+
only showing top 5 rows



In [11]:
spark.conf.set("spark.sql.shuffle.parititions","5")

In [12]:
#streaming data읽는 법
#read에서 readStream으로 바뀐게 가장 큰차이
streamingDataFrame = spark.readStream\
.schema(staticSchema)\
.option("maxFilesPerTrigger",1)\
.format("csv")\
.option("header","true")\
.load("./data/retail-data/by-day/*.csv")

#한번에 읽은 파일 수 설정

In [13]:
streamingDataFrame.isStreaming

True

In [14]:
purchaseByCustomerPerHour = streamingDataFrame\
.selectExpr("CustomerId","(UnitPrice * Quantity) as total_cost","InvoiceDate")\
.groupby(col("CustomerId"),window(col("InvoiceDate"),"1 day"))\
.sum("total_cost")

In [15]:
#구조적 스트리밍 데이터 시작 
purchaseByCustomerPerHour.writeStream.format("memory")\
.queryName("customer_purchases")\
.outputMode("complete")\
.start()

<pyspark.sql.streaming.StreamingQuery at 0x7f59263639b0>

In [16]:
spark.sql("""
SELECT *
FROM customer_purchases
ORDER BY `sum(total_cost)` DESC
""").show(5)

+----------+------+---------------+
|CustomerId|window|sum(total_cost)|
+----------+------+---------------+
+----------+------+---------------+



In [21]:
staticDataFrame.printSchema()

root
 |-- InvoiceNo: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- InvoiceDate: string (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- CustomerID: double (nullable = true)
 |-- Country: string (nullable = true)



In [22]:
preppedDataFrame = staticDataFrame\
.na.fill(0)\
.withColumn("day_of_week",date_format(col("InvoiceDate"),"EEEE"))\
.coalesce(5)

In [33]:
preppedDataFrame.show(5)

+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+-----------+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|day_of_week|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+-----------+
|   580538|    23084|  RABBIT NIGHT LIGHT|      48|2011-12-05 08:38:00|     1.79|   14075.0|United Kingdom|     Monday|
|   580538|    23077| DOUGHNUT LIP GLOSS |      20|2011-12-05 08:38:00|     1.25|   14075.0|United Kingdom|     Monday|
|   580538|    22906|12 MESSAGE CARDS ...|      24|2011-12-05 08:38:00|     1.65|   14075.0|United Kingdom|     Monday|
|   580538|    21914|BLUE HARMONICA IN...|      24|2011-12-05 08:38:00|     1.25|   14075.0|United Kingdom|     Monday|
|   580538|    22467|   GUMBALL COAT RACK|       6|2011-12-05 08:38:00|     2.55|   14075.0|United Kingdom|     Monday|
+---------+---------+-------------------

In [23]:
trainDataFrame = preppedDataFrame\
.where("InvoiceDate < '2011-07-01'")
testDateFrame = preppedDataFrame\
.where("InvoiceDate >= '2011-07-01'")

In [25]:
trainDataFrame.count()

245903

In [26]:
testDateFrame.count()

296006

In [28]:
#범주형 데이터를 실수형 데이터로 바꾸는 함수
indexer = StringIndexer()\
.setInputCol("day_of_week")\
.setOutputCol("day_of_week_index")

In [30]:
#범주형 데이터를 실수형으로 바꾸었을 때
#그 실수형은 엄밀히 말하면 연속적인 실수값이 아니다.
#따라서 그 실수형을 값에 따라 원-핫 인코딩을 사용해 독립차원으로 쪼갠다
encoder = OneHotEncoder().setInputCol("day_of_week_index").setOutputCol("day_of_week_encoded")

In [32]:
vectorAssembler = VectorAssembler().setInputCols(["UnitPrice","Quantity","day_of_week_encoded"]).setOutputCol("features")

In [34]:
#머신러닝 파이프라인 설정
transformationPipeline = Pipeline().setStages([indexer,encoder,vectorAssembler])

In [35]:
fittedPipeline = transformationPipeline.fit(trainDataFrame)

In [36]:
transformedTraining = fittedPipeline.transform(trainDataFrame)

In [40]:
kmeans = KMeans().setK(20).setSeed(11)

In [41]:
kmModel = kmeans.fit(transformedTraining)

In [48]:
df = spark.read.format("json").load("./data/flight-data/json/2015-summary.json")

In [50]:
df.first()

Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Romania', count=15)

In [53]:
ww=Row("gg",11,"asdfads")

In [56]:
ww[1]

11

In [6]:
df = spark.read.format('csv').option("header","true").option("inferSchema","true").load("./data/retail-data/by-day/2010-12-01.csv")

In [7]:
df.show(5)

+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|   536365|   85123A|WHITE HANGING HEA...|       6|2010-12-01 08:26:00|     2.55|   17850.0|United Kingdom|
|   536365|    71053| WHITE METAL LANTERN|       6|2010-12-01 08:26:00|     3.39|   17850.0|United Kingdom|
|   536365|   84406B|CREAM CUPID HEART...|       8|2010-12-01 08:26:00|     2.75|   17850.0|United Kingdom|
|   536365|   84029G|KNITTED UNION FLA...|       6|2010-12-01 08:26:00|     3.39|   17850.0|United Kingdom|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6|2010-12-01 08:26:00|     3.39|   17850.0|United Kingdom|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
only showing top 5 rows



In [8]:
df.printSchema()

root
 |-- InvoiceNo: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- InvoiceDate: string (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- CustomerID: double (nullable = true)
 |-- Country: string (nullable = true)



In [9]:
#lit 함수를 사용하여야 스파크 데이터 타입으로 변환
df.select(lit(5),lit("five"),lit(5.0))

DataFrame[5: int, five: string, 5.0: double]

In [14]:
df.where(col("InvoiceNo")!= 536365).select("InvoiceNo","Description").show(5,False)

+---------+-----------------------------+
|InvoiceNo|Description                  |
+---------+-----------------------------+
|536366   |HAND WARMER UNION JACK       |
|536366   |HAND WARMER RED POLKA DOT    |
|536367   |ASSORTED COLOUR BIRD ORNAMENT|
|536367   |POPPY'S PLAYHOUSE BEDROOM    |
|536367   |POPPY'S PLAYHOUSE KITCHEN    |
+---------+-----------------------------+
only showing top 5 rows



In [15]:
priceFilter = col("UnitPrice") > 600
descriptFilter = instr(df.Description, "POSTAGE") >= 1
df.where(df.StockCode.isin("DOT")).where(priceFilter | descriptFilter).show()

+---------+---------+--------------+--------+-------------------+---------+----------+--------------+
|InvoiceNo|StockCode|   Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------+--------+-------------------+---------+----------+--------------+
|   536544|      DOT|DOTCOM POSTAGE|       1|2010-12-01 14:32:00|   569.77|      null|United Kingdom|
|   536592|      DOT|DOTCOM POSTAGE|       1|2010-12-01 17:06:00|   607.49|      null|United Kingdom|
+---------+---------+--------------+--------+-------------------+---------+----------+--------------+



In [18]:
DOTCodeFilter = col("StockCode") == "DOT"
priceFilter = col("UnitPrice") > 600
descripFilter = instr(col("Description"), "POSTAGE") >= 1
df.withColumn("isExpensive",DOTCodeFilter & (priceFilter | descripFilter))\
.where("isExpensive").select("unitPrice","isExpensive").show(5)

+---------+-----------+
|unitPrice|isExpensive|
+---------+-----------+
|   569.77|       true|
|   607.49|       true|
+---------+-----------+



In [10]:
from pyspark.sql.functions import pow
from pyspark.sql.functions import initcap

In [22]:
fabricatedQuantity = pow(col("Quantity") * col("UnitPrice"), 2 ) +5

In [27]:
df.select(col("CustomerId"), fabricatedQuantity.alias("realQuantity")).show(2)

+----------+------------------+
|CustomerId|      realQuantity|
+----------+------------------+
|   17850.0|239.08999999999997|
|   17850.0|          418.7156|
+----------+------------------+
only showing top 2 rows



In [32]:
#select 데이터를 가공하거나 골라낸다.
#where 조건절로 찾아낸다.

#selectExpr 은 표현식을 그대로 적는다.
df.selectExpr("CustomerId", "(POWER((Quantity * UnitPrice),2.0) + 5) as realQuantity").show(2)

+----------+------------------+
|CustomerId|      realQuantity|
+----------+------------------+
|   17850.0|239.08999999999997|
|   17850.0|          418.7156|
+----------+------------------+
only showing top 2 rows



In [36]:
df.select(corr("Quantity","UnitPrice")).show() #상관관계식

+-------------------------+
|corr(Quantity, UnitPrice)|
+-------------------------+
|     -0.04112314436835551|
+-------------------------+



In [38]:
df.select("InVoiceNo","StockCode","Quantity").describe().show()

+-------+-----------------+------------------+------------------+
|summary|        InVoiceNo|         StockCode|          Quantity|
+-------+-----------------+------------------+------------------+
|  count|             3108|              3108|              3108|
|   mean| 536516.684944841|27834.304044117645| 8.627413127413128|
| stddev|72.89447869788873|17407.897548583845|26.371821677029203|
|    min|           536365|             10002|               -24|
|    max|          C536548|              POST|               600|
+-------+-----------------+------------------+------------------+



In [39]:
"""
키바나 좀 돌려보기
elastic search 다른거로 시각화할 수 있는지

내일 1시반에 이성-it (서울특별시 금천구 가산디지털1로 19 대륭테크노마트타운 18차 1810호)
"""

'\n키바나 좀 돌려보기\nelastic search 다른거로 시각화할 수 있는지\n\n내일 1시반에 이성-it (서울특별시 금천구 가산디지털1로 19 대륭테크노마트타운 18차 1810호)\n'

In [41]:
olName = "UnitPrice"
quantileProbs = [0.5]
relError = 0.05
df.stat.approxQuantile("UnitPrice",quantileProbs,relError)
#stat 은 통계학적 함수를 제공해준다.

[2.51]

In [11]:
df.select(initcap(col("Description"))).show()

+--------------------+
|initcap(Description)|
+--------------------+
|White Hanging Hea...|
| White Metal Lantern|
|Cream Cupid Heart...|
|Knitted Union Fla...|
|Red Woolly Hottie...|
|Set 7 Babushka Ne...|
|Glass Star Froste...|
|Hand Warmer Union...|
|Hand Warmer Red P...|
|Assorted Colour B...|
|Poppy's Playhouse...|
|Poppy's Playhouse...|
|Feltcraft Princes...|
|Ivory Knitted Mug...|
|Box Of 6 Assorted...|
|Box Of Vintage Ji...|
|Box Of Vintage Al...|
|Home Building Blo...|
|Love Building Blo...|
|Recipe Box With M...|
+--------------------+
only showing top 20 rows



In [15]:
df.select(col("Description"),lower("Description"),upper("Description")).show(2)

+--------------------+--------------------+--------------------+
|         Description|  lower(Description)|  upper(Description)|
+--------------------+--------------------+--------------------+
|WHITE HANGING HEA...|white hanging hea...|WHITE HANGING HEA...|
| WHITE METAL LANTERN| white metal lantern| WHITE METAL LANTERN|
+--------------------+--------------------+--------------------+
only showing top 2 rows



In [17]:
regex_string = "BLACK|WHITE|RED|GREEN|BLUE"
df.select(regexp_replace(col("Description"),regex_string,"COLOR").alias("color_clean"),col("Description")).show(2)

+--------------------+--------------------+
|         color_clean|         Description|
+--------------------+--------------------+
|COLOR HANGING HEA...|WHITE HANGING HEA...|
| COLOR METAL LANTERN| WHITE METAL LANTERN|
+--------------------+--------------------+
only showing top 2 rows



In [19]:
dateDF = spark.range(10).withColumn("today",current_date()).withColumn("now",current_timestamp())
dateDF.createOrReplaceTempView("dateTable")
dateDF.printSchema()

root
 |-- id: long (nullable = false)
 |-- today: date (nullable = false)
 |-- now: timestamp (nullable = false)



In [23]:
spark.range(5).withColumn("date",lit("2017-01-01")).select(to_date(col("date"))).show(1)

+---------------+
|to_date(`date`)|
+---------------+
|     2017-01-01|
+---------------+
only showing top 1 row



In [25]:
#스파크는 빈 문자열이나 대체값 대신 null을 사용해야 가장 효과적이다.
#null을 가진 row 제거
df.na.drop()

DataFrame[InvoiceNo: string, StockCode: string, Description: string, Quantity: int, InvoiceDate: string, UnitPrice: double, CustomerID: double, Country: string]

In [26]:
df.na.drop("all")

DataFrame[InvoiceNo: string, StockCode: string, Description: string, Quantity: int, InvoiceDate: string, UnitPrice: double, CustomerID: double, Country: string]

In [27]:
df.na.fill("All Null values become this string")

DataFrame[InvoiceNo: string, StockCode: string, Description: string, Quantity: int, InvoiceDate: string, UnitPrice: double, CustomerID: double, Country: string]

In [29]:
fill_cols_vals = {"StockCode":5, "Description": "No Value"}
#null을 효과적으로 바꾸는 식
df.na.fill(fill_cols_vals)

DataFrame[InvoiceNo: string, StockCode: string, Description: string, Quantity: int, InvoiceDate: string, UnitPrice: double, CustomerID: double, Country: string]

In [31]:
df.na.replace([""],["UNKNOWN"],"Description")

DataFrame[InvoiceNo: string, StockCode: string, Description: string, Quantity: int, InvoiceDate: string, UnitPrice: double, CustomerID: double, Country: string]

In [47]:
from pyspark.sql.functions import struct, split, size, array_contains, create_map

In [34]:
complexDF = df.select(struct("Description","InvoiceNo").alias("complex"))
complexDF.createOrReplaceTempView("complexDF")

In [36]:
complexDF.select("complex.Description","complex.InvoiceNo").show(1)

+--------------------+---------+
|         Description|InvoiceNo|
+--------------------+---------+
|WHITE HANGING HEA...|   536365|
+--------------------+---------+
only showing top 1 row



In [38]:
df.select(split(col("Description")," ")).show(2)

+-------------------------+
|split(Description,  , -1)|
+-------------------------+
|     [WHITE, HANGING, ...|
|     [WHITE, METAL, LA...|
+-------------------------+
only showing top 2 rows



In [40]:
df.select(size(split(col("Description")," "))).show(2)

+-------------------------------+
|size(split(Description,  , -1))|
+-------------------------------+
|                              5|
|                              3|
+-------------------------------+
only showing top 2 rows



In [45]:
#배열안에 특정값이 있는지 확인
df.select(array_contains(split(col("Description")," "),"WHITE")).show(2)

+------------------------------------------------+
|array_contains(split(Description,  , -1), WHITE)|
+------------------------------------------------+
|                                            true|
|                                            true|
+------------------------------------------------+
only showing top 2 rows



In [49]:
jsonDF = spark.range(1).selectExpr("""
'{"myJSONKey":{"myJSONValue":[1,2,3]}}' as jsonString
""")

In [54]:
jsonDF.select(get_json_object(col("jsonString"), "$.myJSONKey.myJSONValue[1]").alias("column"),
              json_tuple(col("jsonString"),"myJSONKey")).show(2)

+------+--------------------+
|column|                  c0|
+------+--------------------+
|     2|{"myJSONValue":[1...|
+------+--------------------+



In [13]:
from pyspark.sql.functions import to_json, from_json, count

In [56]:
df.selectExpr("(InvoiceNo, Description) as myStruct").select(to_json(col("myStruct")))

DataFrame[to_json(myStruct): string]

In [5]:
df = spark.read.format("csv").option("header","true").option("inferSchema","true").load("./data/retail-data/all/*.csv").coalesce(5)

In [6]:
df.cache()
df.createOrReplaceTempView("dfTable")

In [7]:
df.count() == 541909

True

In [8]:
df.select(pyspark.sql.functions.count("StockCode")).show()

+----------------+
|count(StockCode)|
+----------------+
|          541909|
+----------------+



In [9]:
df.select(count("StockCode")).show()

+----------------+
|count(StockCode)|
+----------------+
|          541909|
+----------------+



In [17]:
#고유 레코드의 수
df.select(pyspark.sql.functions.countDistinct("StockCode")).show()

+-------------------------+
|count(DISTINCT StockCode)|
+-------------------------+
|                     4070|
+-------------------------+



In [19]:
df.select(pyspark.sql.functions.approx_count_distinct("stockCode",0.1)).show()
#위에는 정확한 고유값 이는 근사값을 고유값으로 계산

+--------------------------------+
|approx_count_distinct(stockCode)|
+--------------------------------+
|                            3364|
+--------------------------------+



In [20]:
df.select(pyspark.sql.functions.first("StockCode"),pyspark.sql.functions.last("StockCode")).show()

+----------------+---------------+
|first(StockCode)|last(StockCode)|
+----------------+---------------+
|          85123A|          22138|
+----------------+---------------+



In [22]:
df.select(pyspark.sql.functions.min("Quantity"),max("Quantity")).show(5)

+-------------+-------------+
|min(Quantity)|max(Quantity)|
+-------------+-------------+
|       -80995|        80995|
+-------------+-------------+



In [24]:
df.select(pyspark.sql.functions.sum("Quantity")).show()

+-------------+
|sum(Quantity)|
+-------------+
|      5176450|
+-------------+



In [32]:
df.select(pyspark.sql.functions.count("Quantity").alias("total_transactions"), pyspark.sql.functions.sum("Quantity").alias("total_purchases")).selectExpr("total_purchases/total_transactions").show()

+--------------------------------------+
|(total_purchases / total_transactions)|
+--------------------------------------+
|                      9.55224954743324|
+--------------------------------------+



In [10]:
from pyspark.sql.functions import var_pop, stddev_pop, var_samp, stddev_samp, skewness, kurtosis

In [36]:
#분산과 표준편차
df.select(var_pop("Quantity"),var_samp("Quantity"),stddev_pop("Quantity"),stddev_samp("Quantity")).show()

+-----------------+------------------+--------------------+---------------------+
|var_pop(Quantity)|var_samp(Quantity)|stddev_pop(Quantity)|stddev_samp(Quantity)|
+-----------------+------------------+--------------------+---------------------+
|47559.30364660916| 47559.39140929885|  218.08095663447818|   218.08115785023438|
+-----------------+------------------+--------------------+---------------------+



In [38]:
#비대칭도와 첨도 계산 함수
df.select(skewness("Quantity"), kurtosis("Quantity")).show()

+--------------------+------------------+
|  skewness(Quantity)|kurtosis(Quantity)|
+--------------------+------------------+
|-0.26407557610525234|119768.05495534946|
+--------------------+------------------+



In [11]:
from pyspark.sql.functions import collect_set, collect_list

In [40]:
df.agg(collect_set("Country"),collect_list("Country")).show()

+--------------------+---------------------+
|collect_set(Country)|collect_list(Country)|
+--------------------+---------------------+
|[Portugal, Italy,...| [United Kingdom, ...|
+--------------------+---------------------+



In [41]:
df.groupBy("InvoiceNo","CustomerId").count().show()

+---------+----------+-----+
|InvoiceNo|CustomerId|count|
+---------+----------+-----+
|   536846|     14573|   76|
|   537026|     12395|   12|
|   537883|     14437|    5|
|   538068|     17978|   12|
|   538279|     14952|    7|
|   538800|     16458|   10|
|   538942|     17346|   12|
|  C539947|     13854|    1|
|   540096|     13253|   16|
|   540530|     14755|   27|
|   541225|     14099|   19|
|   541978|     13551|    4|
|   542093|     17677|   16|
|   543188|     12567|   63|
|   543590|     17377|   19|
|  C543757|     13115|    1|
|  C544318|     12989|    1|
|   544578|     12365|    1|
|   536596|      null|    6|
|   537252|      null|    1|
+---------+----------+-----+
only showing top 20 rows



In [12]:
from pyspark.sql.window import Window
from pyspark.sql.functions import desc

In [14]:
windowSpec = Window\
.partitionBy("CustomerId","date")\
.orderBy(desc("Quantity"))\
.rowsBetween(Window.unboundedPreceding, Window.currentRow)

In [15]:
maxPurchaseQuantity = max(col("Quantity")).over(windowSpec)

In [16]:
from pyspark.sql.functions import dense_rank,rank

In [17]:
purchaseDenseRank = dense_rank().over(windowSpec)
purchaseRank = rank().over(windowSpec)

In [18]:
dfWithDate.where("CustomerId IS NOT NULL").orderBy("CustomerId")\
.select(col("CustomerId"),col("date"),col("Quantity"),purchaseRank.alias("quantity"),purchaseDenseRank.alias("quantityDenseRank")
        ,maxPurchaseQuantity.alias("maxPurchaseQuantity")).show()

NameError: name 'dfWithDate' is not defined

In [5]:
### 스파크 조인 , 여러 개의 데이터셋을 연결하고 서로 다른 데이터를 조합하기 위한 연산식

person = spark.createDataFrame([
    (0,"Bill Chambers",0,[100]),
    (1,"Matei Zaharia",1,[500,250,100]),
    (2,"Michael Armbrust",1,[250,100])
]).toDF("id","name","graduate_program","spark_status")

graduateProgram = spark.createDataFrame([
    (0,"Masters","School of Information","UC Berkeley"),
    (2, "Masters","EECS","UC Berkeley"),
    (1,"Ph.D.","EECS","UC Berkeley")
]).toDF("id","degree","department","school")

sparkStatus = spark.createDataFrame([
    (500,"Vice President"),
    (250,"PMC Member"),
    (100, "Contributor")
]).toDF("id","status")

In [6]:
person.createOrReplaceTempView("person")

graduateProgram.createOrReplaceTempView("graduateProgram")

sparkStatus.createOrReplaceTempView("sparkStatus")

In [23]:
# 내부조인 , 참인 로우만 결합한다.

joinExpression = person["graduate_program"] == graduateProgram["id"]

In [24]:
wrongJoinExpression = person["name"] == graduateProgram["school"]

In [25]:
person.join(graduateProgram,joinExpression).show()

+---+----------------+----------------+---------------+---+-------+--------------------+-----------+
| id|            name|graduate_program|   spark_status| id| degree|          department|     school|
+---+----------------+----------------+---------------+---+-------+--------------------+-----------+
|  0|   Bill Chambers|               0|          [100]|  0|Masters|School of Informa...|UC Berkeley|
|  1|   Matei Zaharia|               1|[500, 250, 100]|  1|  Ph.D.|                EECS|UC Berkeley|
|  2|Michael Armbrust|               1|     [250, 100]|  1|  Ph.D.|                EECS|UC Berkeley|
+---+----------------+----------------+---------------+---+-------+--------------------+-----------+



In [26]:
wrongJoinExpression

Column<b'(name = school)'>

In [29]:
joinExpression

Column<b'(graduate_program = id)'>

In [31]:
joinType = "inner"

person.join(graduateProgram,joinExpression,joinType).show()

+---+----------------+----------------+---------------+---+-------+--------------------+-----------+
| id|            name|graduate_program|   spark_status| id| degree|          department|     school|
+---+----------------+----------------+---------------+---+-------+--------------------+-----------+
|  0|   Bill Chambers|               0|          [100]|  0|Masters|School of Informa...|UC Berkeley|
|  1|   Matei Zaharia|               1|[500, 250, 100]|  1|  Ph.D.|                EECS|UC Berkeley|
|  2|Michael Armbrust|               1|     [250, 100]|  1|  Ph.D.|                EECS|UC Berkeley|
+---+----------------+----------------+---------------+---+-------+--------------------+-----------+



In [32]:
#외부조인은 내부조인과 같지만, 없는 것에 null로 채우고 반환한다.
joinType = "outer"

person.join(graduateProgram,joinExpression,joinType).show()

+----+----------------+----------------+---------------+---+-------+--------------------+-----------+
|  id|            name|graduate_program|   spark_status| id| degree|          department|     school|
+----+----------------+----------------+---------------+---+-------+--------------------+-----------+
|   0|   Bill Chambers|               0|          [100]|  0|Masters|School of Informa...|UC Berkeley|
|   1|   Matei Zaharia|               1|[500, 250, 100]|  1|  Ph.D.|                EECS|UC Berkeley|
|   2|Michael Armbrust|               1|     [250, 100]|  1|  Ph.D.|                EECS|UC Berkeley|
|null|            null|            null|           null|  2|Masters|                EECS|UC Berkeley|
+----+----------------+----------------+---------------+---+-------+--------------------+-----------+



In [33]:
joinType = "left_outer"

graduateProgram.join(person, joinExpression, joinType).show()

+---+-------+--------------------+-----------+----+----------------+----------------+---------------+
| id| degree|          department|     school|  id|            name|graduate_program|   spark_status|
+---+-------+--------------------+-----------+----+----------------+----------------+---------------+
|  0|Masters|School of Informa...|UC Berkeley|   0|   Bill Chambers|               0|          [100]|
|  1|  Ph.D.|                EECS|UC Berkeley|   1|   Matei Zaharia|               1|[500, 250, 100]|
|  1|  Ph.D.|                EECS|UC Berkeley|   2|Michael Armbrust|               1|     [250, 100]|
|  2|Masters|                EECS|UC Berkeley|null|            null|            null|           null|
+---+-------+--------------------+-----------+----+----------------+----------------+---------------+



In [35]:
joinType = "right_outer"

person.join(graduateProgram,joinExpression,joinType).show()

+----+----------------+----------------+---------------+---+-------+--------------------+-----------+
|  id|            name|graduate_program|   spark_status| id| degree|          department|     school|
+----+----------------+----------------+---------------+---+-------+--------------------+-----------+
|   0|   Bill Chambers|               0|          [100]|  0|Masters|School of Informa...|UC Berkeley|
|   1|   Matei Zaharia|               1|[500, 250, 100]|  1|  Ph.D.|                EECS|UC Berkeley|
|   2|Michael Armbrust|               1|     [250, 100]|  1|  Ph.D.|                EECS|UC Berkeley|
|null|            null|            null|           null|  2|Masters|                EECS|UC Berkeley|
+----+----------------+----------------+---------------+---+-------+--------------------+-----------+



In [36]:
joinType = "left_semi"

graduateProgram.join(person,joinExpression,joinType).show()

+---+-------+--------------------+-----------+
| id| degree|          department|     school|
+---+-------+--------------------+-----------+
|  0|Masters|School of Informa...|UC Berkeley|
|  1|  Ph.D.|                EECS|UC Berkeley|
+---+-------+--------------------+-----------+



In [37]:
joinType = "left_anti"

graduateProgram.join(person,joinExpression,joinType).show()

+---+-------+----------+-----------+
| id| degree|department|     school|
+---+-------+----------+-----------+
|  2|Masters|      EECS|UC Berkeley|
+---+-------+----------+-----------+



In [7]:
person.withColumnRenamed("id","personId").join(sparkStatus, expr("array_contains(spark_status,id)")).show()

+--------+----------------+----------------+---------------+---+--------------+
|personId|            name|graduate_program|   spark_status| id|        status|
+--------+----------------+----------------+---------------+---+--------------+
|       0|   Bill Chambers|               0|          [100]|100|   Contributor|
|       1|   Matei Zaharia|               1|[500, 250, 100]|500|Vice President|
|       1|   Matei Zaharia|               1|[500, 250, 100]|250|    PMC Member|
|       1|   Matei Zaharia|               1|[500, 250, 100]|100|   Contributor|
|       2|Michael Armbrust|               1|     [250, 100]|250|    PMC Member|
|       2|Michael Armbrust|               1|     [250, 100]|100|   Contributor|
+--------+----------------+----------------+---------------+---+--------------+



In [8]:
person.join(graduateProgram,joinExpr).show()

NameError: name 'joinExpr' is not defined