In [1]:
%%init_spark
# Configure Spark to use a local master
launcher.master = "local"

## 데이터 소스

In [14]:
val df = spark.read.format("json").load("2015-summary.json")

df: org.apache.spark.sql.DataFrame = [DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string ... 1 more field]


In [16]:
df.printSchema()

root
 |-- DEST_COUNTRY_NAME: string (nullable = true)
 |-- ORIGIN_COUNTRY_NAME: string (nullable = true)
 |-- count: long (nullable = true)



In [18]:
df.show()

+--------------------+-------------------+-----+
|   DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+--------------------+-------------------+-----+
|       United States|            Romania|   15|
|       United States|            Croatia|    1|
|       United States|            Ireland|  344|
|               Egypt|      United States|   15|
|       United States|              India|   62|
|       United States|          Singapore|    1|
|       United States|            Grenada|   62|
|          Costa Rica|      United States|  588|
|             Senegal|      United States|   40|
|             Moldova|      United States|    1|
|       United States|       Sint Maarten|  325|
|       United States|   Marshall Islands|   39|
|              Guyana|      United States|   64|
|               Malta|      United States|    1|
|            Anguilla|      United States|   41|
|             Bolivia|      United States|   30|
|       United States|           Paraguay|    6|
|             Algeri

## 5.1 스키마
### 스키마는 Dataframe의 컬럼명과 데이터 타입을 정의

In [6]:
import org.apache.spark.sql.types.{StructField, StructType, StringType, LongType}
import org.apache.spark.sql.types.Metadata

val myManualSchema = StructType(Array(
  StructField("DEST_COUNTRY_NAME", StringType, true),
  StructField("ORIGIN_COUNTRY_NAME", StringType, true),
  StructField("count", LongType, false,
    Metadata.fromJson("{\"hello\":\"world\"}"))
))

val df = spark.read.format("json").schema(myManualSchema)
  .load("2015-summary.json")

import org.apache.spark.sql.types.{StructField, StructType, StringType, LongType}
import org.apache.spark.sql.types.Metadata
myManualSchema: org.apache.spark.sql.types.StructType = StructType(StructField(DEST_COUNTRY_NAME,StringType,true), StructField(ORIGIN_COUNTRY_NAME,StringType,true), StructField(count,LongType,false))
df: org.apache.spark.sql.DataFrame = [DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string ... 1 more field]


## 5.2 컬럼과 표현식

In [7]:
import org.apache.spark.sql.functions.{col, column}
col("someColumnName")
column("someColumnName")

import org.apache.spark.sql.functions.{col, column}
res1: org.apache.spark.sql.Column = someColumnName


### 명시적 컬럼 참조

In [8]:
df.col("count")

res2: org.apache.spark.sql.Column = count


### 표현식으로 컬럼 표현

#### 컬럼은 표현식의 일부 기능을 제공한다
#### col() 함수를 호출해 컬럼에 트랜스포메이션 수행 시, 컬럼 참조를 이용해야 한다
#### 컬럼과 컬럼의 트랜스포메이션은 파싱된 표현식과 동일한 논리적 실행계획으로 컴파일된다

In [9]:
(((col("someCol") + 5) * 200) - 6) < col("otherCol")

res3: org.apache.spark.sql.Column = ((((someCol + 5) * 200) - 6) < otherCol)


## DataFrame 컬럼에 접근하기

In [11]:
df.columns

res5: Array[String] = Array(DEST_COUNTRY_NAME, ORIGIN_COUNTRY_NAME, count)


## 레코드와 Row

In [12]:
df.first()

res6: org.apache.spark.sql.Row = [United States,Romania,15]


## 로우 생성하기

In [16]:
import org.apache.spark.sql.Row
val myRow = Row("Hello", null, 1, false)

myRow(0) // type Any
myRow(0).asInstanceOf[String] // String
myRow.getString(0) // String
myRow.getInt(2) // Int

import org.apache.spark.sql.Row
myRow: org.apache.spark.sql.Row = [Hello,null,1,false]
res10: Int = 1


In [18]:
df.createOrReplaceTempView("dfTabe")

## select와 selectExpr

In [19]:
df.select("DEST_COUNTRY_NAME").show(2)

+-----------------+
|DEST_COUNTRY_NAME|
+-----------------+
|    United States|
|    United States|
+-----------------+
only showing top 2 rows



In [20]:
df.select("DEST_COUNTRY_NAME", "ORIGIN_COUNTRY_NAME").show(2)

+-----------------+-------------------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|
+-----------------+-------------------+
|    United States|            Romania|
|    United States|            Croatia|
+-----------------+-------------------+
only showing top 2 rows



In [21]:
import org.apache.spark.sql.functions.{expr, col, column}
df.select(
    df.col("DEST_COUNTRY_NAME"),
    col("DEST_COUNTRY_NAME"),
    column("DEST_COUNTRY_NAME"),
    'DEST_COUNTRY_NAME,
    $"DEST_COUNTRY_NAME",
    expr("DEST_COUNTRY_NAME"))
  .show(2)

+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+
|DEST_COUNTRY_NAME|DEST_COUNTRY_NAME|DEST_COUNTRY_NAME|DEST_COUNTRY_NAME|DEST_COUNTRY_NAME|DEST_COUNTRY_NAME|
+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+
|    United States|    United States|    United States|    United States|    United States|    United States|
|    United States|    United States|    United States|    United States|    United States|    United States|
+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+
only showing top 2 rows



import org.apache.spark.sql.functions.{expr, col, column}


In [22]:
df.select(expr("DEST_COUNTRY_NAME AS destination")).show(2)

+-------------+
|  destination|
+-------------+
|United States|
|United States|
+-------------+
only showing top 2 rows



#### alias로 컬럼명을 변경 가능하다

In [23]:
df.select(expr("DEST_COUNTRY_NAME as destination").alias("DEST_COUNTRY_NAME"))
  .show(2)

+-----------------+
|DEST_COUNTRY_NAME|
+-----------------+
|    United States|
|    United States|
+-----------------+
only showing top 2 rows



#### select와 expr를 같이 쓰는 패턴을 자주 사용하는데, selectExpr로 두 기능을 합칠 수 있다

In [24]:
df.selectExpr("DEST_COUNTRY_NAME as newColumnName", "DEST_COUNTRY_NAME").show(2)

+-------------+-----------------+
|newColumnName|DEST_COUNTRY_NAME|
+-------------+-----------------+
|United States|    United States|
|United States|    United States|
+-------------+-----------------+
only showing top 2 rows



In [33]:
/**
목적지가 출발지와 같은지를 파악하는 withinCountry 컬럼을 추가
*/

df.selectExpr(
    "*", // include all original columns
    "(DEST_COUNTRY_NAME = ORIGIN_COUNTRY_NAME) as withinCountry")
  .show(2)

+-----------------+-------------------+-----+-------------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|withinCountry|
+-----------------+-------------------+-----+-------------+
|    United States|            Romania|   15|        false|
|    United States|            Croatia|    1|        false|
+-----------------+-------------------+-----+-------------+
only showing top 2 rows



## 컬럼 추가하기

In [30]:
df.withColumn("numberOne", lit(1)).show(2)

+-----------------+-------------------+-----+---------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|numberOne|
+-----------------+-------------------+-----+---------+
|    United States|            Romania|   15|        1|
|    United States|            Croatia|    1|        1|
+-----------------+-------------------+-----+---------+
only showing top 2 rows



In [32]:
/**
출발지와 도착지가 같은지 여부를 불리언 타입으로 표현
*/
df.withColumn("withinCountry", expr("ORIGIN_COUNTRY_NAME == DEST_COUNTRY_NAME"))
  .show(2)

+-----------------+-------------------+-----+-------------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|withinCountry|
+-----------------+-------------------+-----+-------------+
|    United States|            Romania|   15|        false|
|    United States|            Croatia|    1|        false|
+-----------------+-------------------+-----+-------------+
only showing top 2 rows



## 컬럼명 변경하기

In [34]:
/**
wthColumnRenamed 명령어로 첫번째 인수의 컬럼명을 두번째 인수의 문자열로 변경
*/

df.withColumnRenamed("DEST_COUNTRY_NAME", "dest").columns

res27: Array[String] = Array(dest, ORIGIN_COUNTRY_NAME, count)


## 컬럼 제거하기

In [35]:
df.drop("ORIGIN_COUNTRY_NAME").columns

res28: Array[String] = Array(DEST_COUNTRY_NAME, count)


In [40]:
df.withColumn("count2", col("count").cast("string")).schema

res33: org.apache.spark.sql.types.StructType = StructType(StructField(DEST_COUNTRY_NAME,StringType,true), StructField(ORIGIN_COUNTRY_NAME,StringType,true), StructField(count,LongType,true), StructField(count2,StringType,true))


## 로우 필터링하기

In [41]:
df.filter(col("count") < 2).show(2)
df.where("count < 2").show(2)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Croatia|    1|
|    United States|          Singapore|    1|
+-----------------+-------------------+-----+
only showing top 2 rows

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Croatia|    1|
|    United States|          Singapore|    1|
+-----------------+-------------------+-----+
only showing top 2 rows



In [42]:
/**
여러 필터를 동시에 적용할 수도 있다
그러나 필터의 순서와 상관 없이 동시에 모든 필터링 작업을 수행하므로 주의
*/

df.where(col("count") < 2).where(col("ORIGIN_COUNTRY_NAME") =!= "Croatia")
  .show(2)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|          Singapore|    1|
|          Moldova|      United States|    1|
+-----------------+-------------------+-----+
only showing top 2 rows



## 고유한 로우 얻기

In [44]:
df.select("ORIGIN_COUNTRY_NAME", "DEST_COUNTRY_NAME").distinct().count()

res37: Long = 256


In [45]:
df.select("ORIGIN_COUNTRY_NAME").distinct().count()

res38: Long = 125


## 무작위 샘플 만들기

In [46]:
val seed = 5
val withReplacement = false /**복원 비복원 추출 여부*/
val fraction = 0.5
df.sample(withReplacement, fraction, seed).count()

seed: Int = 5
withReplacement: Boolean = false
fraction: Double = 0.5
res39: Long = 138


## 임의 분할하기

In [54]:
/**
임의 분할은 원본 데이터프레임을 임의의 크기로 분할할 시 유용하게 사용된다
머신러닝 알고리즘에서 사용할 학습셋, 검증셋,테스트셋을 만들 때 유용하다
*/
val dataFrames = df.randomSplit(Array(0.25, 0.75), seed)
print(dataFrames(0).count() > dataFrames(1).count())

false

dataFrames: Array[org.apache.spark.sql.Dataset[org.apache.spark.sql.Row]] = Array([DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string ... 1 more field], [DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string ... 1 more field])


## 로우 합치기와 추가하기

In [55]:
/**
DataFrame은 불변성을 가지기 때문에, 원본 데이터 프레임에 로우를 추가하는 것은 불가능하다

새로운 데이터프레임과 통합하는 것은 가능하며, 동일한 스키마와 컬럼 수를 가져야 한다
*/

import org.apache.spark.sql.Row
val schema = df.schema
val newRows = Seq(
  Row("New Country", "Other Country", 5L),
  Row("New Country 2", "Other Country 3", 1L)
)
val parallelizedRows = spark.sparkContext.parallelize(newRows)
val newDF = spark.createDataFrame(parallelizedRows, schema)
df.union(newDF)
  .where("count = 1")
  .where($"ORIGIN_COUNTRY_NAME" =!= "United States")
  .show()

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Croatia|    1|
|    United States|          Singapore|    1|
|    United States|          Gibraltar|    1|
|    United States|             Cyprus|    1|
|    United States|            Estonia|    1|
|    United States|          Lithuania|    1|
|    United States|           Bulgaria|    1|
|    United States|            Georgia|    1|
|    United States|            Bahrain|    1|
|    United States|   Papua New Guinea|    1|
|    United States|         Montenegro|    1|
|    United States|            Namibia|    1|
|    New Country 2|    Other Country 3|    1|
+-----------------+-------------------+-----+



import org.apache.spark.sql.Row
schema: org.apache.spark.sql.types.StructType = StructType(StructField(DEST_COUNTRY_NAME,StringType,true), StructField(ORIGIN_COUNTRY_NAME,StringType,true), StructField(count,LongType,true))
newRows: Seq[org.apache.spark.sql.Row] = List([New Country,Other Country,5], [New Country 2,Other Country 3,1])
parallelizedRows: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = ParallelCollectionRDD[221] at parallelize at <console>:46
newDF: org.apache.spark.sql.DataFrame = [DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string ... 1 more field]


## 로우 정렬하기

In [59]:
df.sort("count").show(5)
df.orderBy("count", "DEST_COUNTRY_NAME").show(5)
df.orderBy(col("count"), col("DEST_COUNTRY_NAME")).show(5)

+--------------------+-------------------+-----+
|   DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+--------------------+-------------------+-----+
|               Malta|      United States|    1|
|Saint Vincent and...|      United States|    1|
|       United States|            Croatia|    1|
|       United States|          Gibraltar|    1|
|       United States|          Singapore|    1|
+--------------------+-------------------+-----+
only showing top 5 rows

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|     Burkina Faso|      United States|    1|
|    Cote d'Ivoire|      United States|    1|
|           Cyprus|      United States|    1|
|         Djibouti|      United States|    1|
|        Indonesia|      United States|    1|
+-----------------+-------------------+-----+
only showing top 5 rows

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+--

In [60]:
import org.apache.spark.sql.functions.{desc, asc}
df.orderBy(expr("count desc")).show(2)
df.orderBy(desc("count"), asc("DEST_COUNTRY_NAME")).show(2)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|          Moldova|      United States|    1|
|    United States|            Croatia|    1|
+-----------------+-------------------+-----+
only showing top 2 rows

+-----------------+-------------------+------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME| count|
+-----------------+-------------------+------+
|    United States|      United States|370002|
|    United States|             Canada|  8483|
+-----------------+-------------------+------+
only showing top 2 rows



import org.apache.spark.sql.functions.{desc, asc}


## repartition과 coalesce

In [62]:
df.rdd.getNumPartitions // 1

res55: Int = 1


In [71]:
/**
repartition 메서드 호출시, 무조건 전체 데이터를 셔플한다
향후 사용할 파티션 수가 현재 파티션 수보다 많거나 컬럼을 기준으로 파티션을 만드는 경우에만 사용
*/
df.repartition(3).rdd.getNumPartitions

res64: Int = 3


In [72]:
/**
특정 칼럼으로 자주 필터링을 한다면 자주 필터링되는 컬럼을 기준으로 파티션을 재분배 하는 것이 좋다
*/
df.repartition(col("DEST_COUNTRY_NAME")).rdd.getNumPartitions

res65: Int = 200


In [73]:
/**
선택적으로 파티션 수 지정 가능
*/
df.repartition(5, col("DEST_COUNTRY_NAME")).rdd.getNumPartitions

res66: Int = 5


In [74]:
/**
coalesce는 전체 데이터를 셔플하지 않고 파티션을 병합하는 경우에 사용한다
파티션 수를 줄이려면 셔플이 일어나는 repartition 대신 coalesce를 사용
*/
df.repartition(5, col("DEST_COUNTRY_NAME")).coalesce(2).rdd.getNumPartitions

res67: Int = 2


## 드라이버로 로우 데이터 수집하기

In [76]:
/**
스파크는 드라이버에서 클러스터 상태 정보를 유지한다
로컬 환경에서 데이터를 다룬다면 드라이버르 데이터를 수집해야 한다
*/

val collectDF = df.limit(10)
collectDF.take(5) // take works with an Integer count

collectDF: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string ... 1 more field]
res69: Array[org.apache.spark.sql.Row] = Array([United States,Romania,15], [United States,Croatia,1], [United States,Ireland,344], [Egypt,United States,15], [United States,India,62])


In [77]:
collectDF.show()

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|   15|
|    United States|            Croatia|    1|
|    United States|            Ireland|  344|
|            Egypt|      United States|   15|
|    United States|              India|   62|
|    United States|          Singapore|    1|
|    United States|            Grenada|   62|
|       Costa Rica|      United States|  588|
|          Senegal|      United States|   40|
|          Moldova|      United States|    1|
+-----------------+-------------------+-----+



In [78]:
collectDF.show(5, false)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|United States    |Romania            |15   |
|United States    |Croatia            |1    |
|United States    |Ireland            |344  |
|Egypt            |United States      |15   |
|United States    |India              |62   |
+-----------------+-------------------+-----+
only showing top 5 rows



In [79]:
collectDF.collect()

res72: Array[org.apache.spark.sql.Row] = Array([United States,Romania,15], [United States,Croatia,1], [United States,Ireland,344], [Egypt,United States,15], [United States,India,62], [United States,Singapore,1], [United States,Grenada,62], [Costa Rica,United States,588], [Senegal,United States,40], [Moldova,United States,1])


In [80]:
/**
전체 데이터셋에 대한 반복 처리를 위해서는 toLocalIterator 메서드로 반복자의 방식으로 모든 파티션의 데이터를 드라이버로 전달\

데이터셋의 파티션을 차례대로 반복 처리할 수 있다.
*/
collectDF.toLocalIterator()

res73: java.util.Iterator[org.apache.spark.sql.Row] = IteratorWrapper(<iterator>)
