- 아키텍쳐 이상의 DataFrame을 실제로 다루는 기능을 소개
- DF의 기본 기능을 중점적으로 다룸
    - 집계, 윈도우, 조이;ㄴ 등의 내용은 7, 8 장에서 다룸
- DF는 row타입의 레코드와 각 레코드에 수행할 연산 표현식을 나타내는 여러 컬럼으로 구성됨
    - 스키마는 각 컬럼명과 데이터타입을 정의
    - DF의 파티셔닝은 DF나 DS가 클러스터에서 물리적으로 배치되는 형태를 정의
    - 파티셔닝의 분할 기준은 특정 컬럼이나 nondeterministically값을 기반으로 설정
        - 비결정론적 : 매번 변한다는 의미

In [1]:
base_dir = "Spark-The-Definitive-Guide-master/data"

In [2]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('abc').getOrCreate()

df = spark.read.format("json").load("{}/{}".format(base_dir, "flight-data/json/2015-summary.json"))

In [3]:
df.printSchema()

root
 |-- DEST_COUNTRY_NAME: string (nullable = true)
 |-- ORIGIN_COUNTRY_NAME: string (nullable = true)
 |-- count: long (nullable = true)



# 5.1. Schema
- 데이터 프레임의 컬럼명과 데이터 타입을 정의함
- 데이터 소스에서 스키마를 얻거나(위의 사례)
    - 혹은 직접 정의할 수 있음
- *ETL같이 정밀도가 중요한 작업에서는 직접 스키마를 정의해야함*

In [4]:
spark.read.format("json").load("{}/{}".format(base_dir, "flight-data/json/2015-summary.json")).schema

StructType(List(StructField(DEST_COUNTRY_NAME,StringType,true),StructField(ORIGIN_COUNTRY_NAME,StringType,true),StructField(count,LongType,true)))

- 지금 현재의 스키마는 여러 개의 StructField 타입 필드로 구성된 StructType객체
    - StructField는 이름, 데이터타입, 컬럼값이 없거나 Null일 수 있는지 지정하는 불리언 값을 가짐
    - 따라서 필요한 경우 컬럼과 관련된 메타 데이터를 직접 지정할 수도 있음
        - *메타데이터는 해당 컬럼과 관련된 정보이며 스파크의 ML라이브러리에서 사용가능함*
- 스키마는 복합 데이터 타입인 StructType을 가질 수 있음
    - 복합 데이터 타입은 6장에서 자세히 설명할 것
- 스파크는 런타임에 데이터 타입이 스키마의 데이터 타입과 일치하지 않으면 오류를 발생시킴

- DF에 스키마를 만들고 적용

In [5]:
from pyspark.sql.types import StructField, StructType, StringType, LongType

In [6]:
myManualSchema = StructType([
    StructField("DEST_COUNTRY_NAME", StringType(), True), # True : 컬럼이 null일 수 있다
    StructField("ORGIN_COUNTRY_NAME", StringType(), True),
    StructField("count", LongType(), False, metadata = {"hello" : "world"}) #LongType : 정수(BigInt)
])
df = spark.read.format("json").schema(myManualSchema)\
    .load("{}/{}".format(base_dir, "flight-data/json/2015-summary.json"))

- 스파크는 자체 데이터 타입 정보를 사용하므로 스파크를 작성하고 있는 프로그램 언어의 데이터 타입으로 스파크의 데이터 타입을 설정할 수는 없음

In [7]:
df

DataFrame[DEST_COUNTRY_NAME: string, ORGIN_COUNTRY_NAME: string, count: bigint]

# 5.2. Column and Expression
- 스파크의 컬럼은 DF의 컬럼과 유사
- 표현식으로 DF의 컬럼을 선택/조작/제거할 수 있음
- 스파크의 컬럼은 표현식을 사용해 레코드 단위로 계산한 값을 단순하게 나타내는 논리적인 구조임
    - 그러므로 컬럼의 실젯값을 얻으려면 로우가 필요하며, 로우가 존재하려면 데이터프레임이 필요함
    - DF없이는 외부에서 컬럼에 ㅈ법근할 수 없음
        - 그러므로 스파크에서 컬럼을 수정하려면 DF가 우선 구성되어야하며, 이 이후 스파크의 트랜스포메이션을 이용해야함

### 5.2.1. Column
- col이나 column함수를 이용하는 것이 가장 간단
- 컬럼명을 인자로 받음

In [8]:
from pyspark.sql.functions import col, column

col("col1")
column("col2")

Column<b'col2'>

- spark df의 컬럼과 pandas df의 컬럼의 가장 중요한 차이는
    - spark의 경우 컬럼이 df에 있을지 없을지 알 수 없음
    - 컬럼명을 카탈로그에 저장된 정보와 비교하기 전까지 미확인 상태
        - 즉, action에 들어갈 때야 컬럼과 테이블을 분석하므로 그 전까지는 알 수 없음

##### 명시적 컬럼 강조
- DF의 컬럼은 col메서드로 참조함
- col메서드는 조인시 유용함
- col메서드를 사용하여 명시적으로 컬럼을 정의하면 **분석기(action) 단계에서 컬럼 확인 절차를 생략**

In [9]:
# df.column("count")
### 지금은 row가 없어서 에러가 나는 것으로 보임

### 5.2.2. Expression
- 컬럼은 표현식임
- **표현식은 DF 레코드의 여러 값에 대한 T의 집합을 의미함**
    - 여러 컬럼명을 입력으로 받아 식별하고 단일 값을 만들기 위해 다양한 표현식을 각 레코드에 적용하는 함수(apply나 map느낌)
- 단일값은 int, float과 같은 일반 데이터타입일 수도 Map이나 Array같은 복합 데이터 타입일 수 있음(nested list느낌)
- **expr**gkatnfh rkseksgkrp tkdydgkf tn dlTdma
    - 이 함수를 통해 DF의 컬럼을 참조할 수 있음
        - 즉, expr("col1") 은 col("col1")구문과 동일하게 동작함

##### 표현식으로 컬럼 표현
- 컬럼은 표현식의 일부 기능을 제공함
- col()을 통해 컬럼에 T를 수행하려면 반드시 컬럼 참조를 이용해야함
- expr함수의 인자로 표현식을 사용하면, 분석기가 표현식을 분석해 T와 컬럼 참조를 알아낼 수 있으며, 다음 T에 컬럼 참조를 전달할 수 있음
- 예시
    - expr("col - 5") , col("col") - 5, expr("col") - 5는 모두 같은 T과정을 거침
        - 스파크는 연산 순서를 지정하는 논리적 트리로 컴파일하기 때문에 모두 같은 것
    - 위 내용이 뭔지 모르겠다면, 다음 두 가지만 확실하게 기억할 것
        - **1) 컬럼은 단지 표현식일 뿐**
        - **2) 컬럼과 컬럼의 T는 파싱된 표현식과 동일한 논리적 실행 계획으로 컴파일됨**

In [10]:
(((col("col1")+5)*200)* - 6) < col("col2")

Column<b'((((col1 + 5) * 200) * -6) < col2)'>

- 위 과정을 논리적 트리 개념으로 나타내면
    - 1단계 : col1 + 5 
    - 2단계 : 1단계 + 200
    - 3단계 : 2단계 - 6
    - 4단계 : col2와의 비교
- 즉, DAG(지향성 비순환그래프)의 형태로 이루어짐
- 그러므로 이 그래프는 다음 코드로 동일하게 표현할 수 있음

In [11]:
from pyspark.sql.functions import expr

expr("(((col1 + 5) * 200) - 6 < col2)")

Column<b'((((col1 + 5) * 200) - 6) < col2)'>

##### DF 컬럼에 접근하기
- printSchema메서드로 DF의 전체 컬럼 정보를 확인할 수 있음
    - 그러나 프로그래밍 방식으로 접근할 때는 DF의 columns 속성을 사용함

In [12]:
spark.read.format("json").load("{}/{}".format(base_dir, "flight-data/json/2015-summary.json")).columns

['DEST_COUNTRY_NAME', 'ORIGIN_COUNTRY_NAME', 'count']

# 5.3. 레코드와 로우
- 스파크에서 DF의 각 로우는 하나의 레코드임
- 각 레코드를 Row객체로 표현함
- 그래서 스파크는 값을 생성하기 위해 컬럼 표현식으로 Row객체를 다룸
- 해당 객체는 내부에 바이트 배열을 가짐
    - 이 바이트 배열 인터페이스는 오직 컬럼 표현식으로만 다룰 수 있어 사용자에게 노출되지 않음

In [13]:
df.first() #first메서드로 로우 확인

Row(DEST_COUNTRY_NAME='United States', ORGIN_COUNTRY_NAME=None, count=15)

### 5.3.1. 로우 생성
- 각 컬럼에 해당하는 값을 활용하여 직접 생성할 수 있음
- Row객체는 스키마 정보를 갖고있지 않음 -> DF만 유일하게 스키마를 가짐
- 그래서 Row 객체를 직접 생성하려면, DF의 스키마와 같은 순서로 값을 명시해야함

In [14]:
from pyspark.sql import Row
myRow = Row("Hello", None, 1, False)

- 접근은 매우 쉬움
- 원하는 위치를 지정하기만 하면 됨

In [15]:
myRow[0]

'Hello'

In [17]:
myRow[2]

1

# 5.4. DataFrame Transformation
- DF를 다루는 주요 작업은 다음과 같이 몇 가지로 나눠볼 수 있음
    - 1) 로우나 컬럼 추가
    - 2) 로우나 컬럼 제거
    - 3) 로우를 컬럼으로 변환하거나 반대로 변환
    - 4) 로우 순서 변경
- 위 작업들은 모두 트랜스포메이션으로 변환할 수 있음
    - 가장 일반적인 트랜스포메이션은 모든 로우의 특정 컬럼을 변환하고 결과를 반환하는 것

### 5.4.1. DF의 생성
- 원시 데이터소스에서 dataframe을 생성할 수도 있음

In [20]:
df = spark.read.format("json").load("{}/flight-data/json/2015-summary.json".format(base_dir))
df.createOrReplaceTempView("dfTable") #SQL쿼리를 실행하고 트랜스포메이션을 확인하기 위해 임시 뷰로 등록

In [23]:
from pyspark.sql import Row
from pyspark.sql.types import StructField, StructType, StringType, LongType

myManualSchema = StructType([
    StructField("col1", StringType(), True),
    StructField("col2", StringType(), True),
    StructField("col3", LongType(), False)
]) #우선 스키마를 만들어줌

myRow = Row("Hello", None, 1)
myDf = spark.createDataFrame([myRow], myManualSchema) #pandas에서 리스트를 입력으로 넣고, columnsm를 지정해주는 것과 가틈
myDf.show()

+-----+----+----+
| col1|col2|col3|
+-----+----+----+
|Hello|null|   1|
+-----+----+----+



- 마치 판다스처럼 리스트를 입력값으로 넣고, columns를 지정해주듯, 만들어놓은 스키마를 지정해주면 됨
    - 컬럼스를 지정할 땐 자료형을 지정하진 않지만, 스파크에서는 sql스키마를 지정해주듯, 스키마를 지정해주면 됨
- DF를 가장 유용하게 사용할 수 있는 메서드는 다음과 같음
    - 컬럼이나 표현식을 사용하는 select
    - 문자열 표현식을 사용하는 selectExpr
    - 메서드로 사용할 수는 없지만, pyspark.sql.functions패키지에 포함되어있는 다양한 함수
- 이 세 가지 유형의 메서드로 DF를 다룰 때 필요한 대부분의 T 작업을 해결할 수 있음

### 5.4.2. select&selectExpr
- SQL을 실행하듯 DF에서 SQL을 사용할 수 있도록 해주는 메서드
- 문자열 컬럼명을 인수로 받는 select를 이용하는 방법이 가장 쉬움

In [24]:
df.select("DEST_COUNTRY_NAME").show(2)

+-----------------+
|DEST_COUNTRY_NAME|
+-----------------+
|    United States|
|    United States|
+-----------------+
only showing top 2 rows



In [26]:
df.select("DEST_COUNTRY_NAME", "ORIGIN_COUNTRY_NAME").show(2)

+-----------------+-------------------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|
+-----------------+-------------------+
|    United States|            Romania|
|    United States|            Croatia|
+-----------------+-------------------+
only showing top 2 rows



In [27]:
from pyspark.sql.functions import expr, col, column

In [28]:
df.select(expr("DEST_COUNTRY_NAME")).show(2)

+-----------------+
|DEST_COUNTRY_NAME|
+-----------------+
|    United States|
|    United States|
+-----------------+
only showing top 2 rows



In [29]:
df.select(col("DEST_COUNTRY_NAME")).show(2)

+-----------------+
|DEST_COUNTRY_NAME|
+-----------------+
|    United States|
|    United States|
+-----------------+
only showing top 2 rows



In [30]:
df.select(column("DEST_COUNTRY_NAME")).show(2)

+-----------------+
|DEST_COUNTRY_NAME|
+-----------------+
|    United States|
|    United States|
+-----------------+
only showing top 2 rows



In [32]:
df.select(expr("DEST_COUNTRY_NAME"),
         col("DEST_COUNTRY_NAME"),
         column("DEST_COUNTRY_NAME")).show(2)

+-----------------+-----------------+-----------------+
|DEST_COUNTRY_NAME|DEST_COUNTRY_NAME|DEST_COUNTRY_NAME|
+-----------------+-----------------+-----------------+
|    United States|    United States|    United States|
|    United States|    United States|    United States|
+-----------------+-----------------+-----------------+
only showing top 2 rows



- expr은 가장 유연한 참조 방법으로, 단순 컬럼 참조나 문자열을 이용해 컬럼을 참조할 수 있음
    - 컬럼에 alias를 선언하는 등의 방식을 사용할 수 있다는 뜻으로 보임
    - 위에서는 컬럼 하나하나를 셀렉트 해오는 형식으로만 쓸 수 있었음 (마치, select col1, col2 From tbl)
    - 하지만 expr을 쓸 경우 단순히 column을 셀렉해오는 것 뿐만 아니라 추가적인 sql 표현을 할 수 있다는 것으로 받아들이면 될 듯

In [35]:
df.select(expr("DEST_COUNTRY_NAME AS destination")).show(2)

+-------------+
|  destination|
+-------------+
|United States|
|United States|
+-------------+
only showing top 2 rows



In [36]:
df.select(expr("DEST_COUNTRY_NAME AS destination").alias("YOUN")).show(2) #물론 alias메서드로 별칭 지정도 가ㅏ능함

+-------------+
|         YOUN|
+-------------+
|United States|
|United States|
+-------------+
only showing top 2 rows



- selcectExpr 메서드는 스파크의 진짜 능력을 보여주기 좋은 도구임
    - 새로운 DF를 생성하는 복잡한 표현식을 간단하게 만들 수 있음
    - 모든 유효한 non-aggregating SQL구문을 지정할 수 있음(컬럼만 식별할 수 있다면)
        - aggregating함수도 특정 컬럼에 대해 단순하게 수행하는 것이라면 가능함
            - 아마, groupby같은게 안되는것 아닐가 싶음
- 아래 코드는 출발지와 도착지가 같은지 나타내는 새로운 컬럼 withinCountry컬럼을 추가하는 예제

In [37]:
df.selectExpr(
    "*",
    "(DEST_COUNTRY_NAME = ORIGIN_COUNTRY_NAME) as withinCountry")\
    .show(2)

+-----------------+-------------------+-----+-------------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|withinCountry|
+-----------------+-------------------+-----+-------------+
|    United States|            Romania|   15|        false|
|    United States|            Croatia|    1|        false|
+-----------------+-------------------+-----+-------------+
only showing top 2 rows



In [38]:
df.selectExpr("avg(count)", "count(distinct(DEST_COUNTRY_NAME))").show(2)

+-----------+---------------------------------+
| avg(count)|count(DISTINCT DEST_COUNTRY_NAME)|
+-----------+---------------------------------+
|1770.765625|                              132|
+-----------+---------------------------------+



### 5.4.3. Transformation to spark data type
- 지금까지는 기존의 컬럼이나, 컬럼으로부터 도출된 새로운 컬럼을 전달했음
- 하지만, 명시적인 값을 스파크에 전달해야할 수도 있음
    - 이 때 사용하는 것이 **literal**
    - sql처럼 그냥 SELECT 1 AS "number"로 하는 것이 아니라, lit(1) AS "number"이런 식으로 해야함
- 리터럴은 프로그래밍 언어의 리터럴 값을 스파크가 이해할 수 있는 값으로 변환함
    - 리터럴은 표현식이며, 사용 방식은 기존 예제와 같은 방식으로 사용함
        - select로 쓸 때만 lit을 사용해야하며, Expr과 같이 상세 sql을 적어주는 식으로 할 때는 1 AS num이 사용가능함

In [39]:
from pyspark.sql.functions import lit

df.select(expr("*"), lit(1).alias("num")).show(2)

+-----------------+-------------------+-----+---+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|num|
+-----------------+-------------------+-----+---+
|    United States|            Romania|   15|  1|
|    United States|            Croatia|    1|  1|
+-----------------+-------------------+-----+---+
only showing top 2 rows



In [45]:
df.selectExpr("*", "1 AS num").show(2)

+-----------------+-------------------+-----+---+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|num|
+-----------------+-------------------+-----+---+
|    United States|            Romania|   15|  1|
|    United States|            Croatia|    1|  1|
+-----------------+-------------------+-----+---+
only showing top 2 rows



- 보통 어떤 상수나 프로그래밍으로 생성도니 변수값을 특정 컬럼과 크기 비교를 하는 등의 케이스에서 lit을 주로 사용함
    - 마치 내가 groupby용으로 1 AS count를 쓰듯

### 5.4.4. 컬럼 추가하기
- 신규 컬럼을 추가하는 공식적인 방식은 withColumn 메서드를 이용하는 것
    - 두 개의 인자 선언
        - 1) 컬럼 명
        - 2) 값을 생성할 표현식

In [46]:
df.withColumn("numberOne", lit(1)).show(2)

+-----------------+-------------------+-----+---------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|numberOne|
+-----------------+-------------------+-----+---------+
|    United States|            Romania|   15|        1|
|    United States|            Croatia|    1|        1|
+-----------------+-------------------+-----+---------+
only showing top 2 rows



In [47]:
df.withColumn("withinCountry", expr("ORIGIN_COUNTRY_NAME == DEST_COUNTRY_NAME"))\
    .show(2)

+-----------------+-------------------+-----+-------------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|withinCountry|
+-----------------+-------------------+-----+-------------+
|    United States|            Romania|   15|        false|
|    United States|            Croatia|    1|        false|
+-----------------+-------------------+-----+-------------+
only showing top 2 rows



- 단순히 컬럼명을 바꾸는 것도 가능함

In [48]:
df.withColumn("Destination", expr("DEST_COUNTRY_NAME")).columns

['DEST_COUNTRY_NAME', 'ORIGIN_COUNTRY_NAME', 'count', 'Destination']

### 5.4.5. 컬럼명 변경하기
- 위에서처럼 withColumn("new_col_name, col)로도 바꿀 수 있지만, withColumnRenamed 메서드로도 컬럼명을 변경할 수 있음
    - 첫 번째 인수로 전달된 old_col을 두 번째 인수의 문자열로 변경함

In [49]:
df.withColumnRenamed("DEST_COUNTRY_NAME","dest").columns

['dest', 'ORIGIN_COUNTRY_NAME', 'count']

### 5.4.6. 예약문자와 키워드
- 공백이나 하이픈은 컬렴명에 이용할 수 없음
    - 예약문자를 컬럼명에서 이용하려면 백틱을 이용하여 이스케이핑 해야함
- 쓸 수 있지만, 굳이 쓸 필요가...

### 5.4.7. 대소문자 구분
- 스파크는 기본적으로 대소문자를 가리지 않음
- 바꿀 수는 있지만...굳이 바꿀 필요가...

### 5.4.8. 컬럼 제거하기
- select 메서드로 해당 컬럼만 제외하고 가져오는 방법이 있지만, drop메서드도 사용 가능함

In [53]:
df.drop("ORIGIN_COUNTRY_NAME").columns

['DEST_COUNTRY_NAME', 'count']

In [54]:
df.drop("ORIGIN_COUNTRY_NAME", "DEST_COUNTRY_NAME").columns

['count']

### 5.4.9. 데이터타입 변경하기
- 다수의 StringType컬럼을 정수형으로 변환해야하는 그런 케이스
- cast메서드로 데이터 타입을 변환할 수 있음

In [57]:
df.withColumn("count2", col("count").cast("string"))

DataFrame[DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string, count: bigint, count2: string]

In [60]:
df.select(col("count").cast("string"))

DataFrame[count: string]

### 5.4.10. 로우 필터링하기
- 참과 거짓을 판별하는 표현식을 만들어야 로우 필터링을 할 수 있음
- 가장 일반적인 필터링 방법은 문자열 표현식이나 컬럼을 다루는 기능을 이용해서 표현식을 만드는 것
    - DF의 where메서드나 filter메서드로 필터링할 수 있음
    - 두 메서드 모두 같은 연산을 수행하며, 같은 파라메터 타입을 사용함
    - 주로 SQL에 익숙한 사람들이 많기 때문에 굳이 filter를 쓰기보다는 where문을 활용함

In [62]:
df.where("count < 2").show(2)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Croatia|    1|
|    United States|          Singapore|    1|
+-----------------+-------------------+-----+
only showing top 2 rows



- 동시에 여러 필터를 적용해야할 때가 있음
    - 그런데, 스파크는 자동으로 필터의 순서와 상관없이 모든 필터링 작업을 수행함
    - 따라서 그렇게 효율적이지 않으며 추천하지 않음

### 5.4.11. 고유 로우 얻기
- SQL의 Distinct문과 같음
- 역시 spark에서도 distinct메서드를 이용하여 고윳값을 찾을 수 있음

In [64]:
df.select("ORIGIN_COUNTRY_NAME", "DEST_COUNTRY_NAME").distinct().count()

256

### 5.4.12. 무작위 샘플 만들기
- sample메서드를 이용
- 표본 데이터 추출 비율을 지정하거나, 복원추출, 비복원 추출을 지정할 수 있음

In [65]:
seed = 5
withReplacement = False
fraction = 0.5
df.sample(withReplacement, fraction, seed).count()

138

### 5.4.13. 임의 분할하기
- 임의 분할은 DF를 말그대로 임의로 분할하겠다는 의미
    - 주로 tr_set, val_set, test_set을 만들 때 주로 사용함

In [66]:
seed = 42
dataFrames = df.randomSplit([0.25, 0.75], seed)
dataFrames[0].count() > dataFrames[1].count()

False

### 5.4.14. 로우 합치기와 추가하기
- DF는 불변성을 가지므로, 레코드를 추가하는 작업은 **불가**
- 그러므로 sql의 union 개념을 가져와야함
- sql에서의 union과 같이 통합하려는 두 데이터프레임이 반드시 동일한 스키마와 컬럼 수를 가져야함

In [70]:
from pyspark.sql import Row

schema = df.schema #기존 df의 스키마를 상속
newRows = [
    Row("country1", "new country1", 5),
    Row("country2", "new country2", 1)
]
parallelizedRows = spark.sparkContext.parallelize(newRows)
newDF = spark.createDataFrame(parallelizedRows, schema) #스키마를 만들고, 그 안에 위에서 만든 로우를 넣어줌

In [74]:
df.union(newDF)\
    .where("count = 1")\
    .where(col("ORIGIN_COUNTRY_NAME") != "United States")\
    .show()

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Croatia|    1|
|    United States|          Singapore|    1|
|    United States|          Gibraltar|    1|
|    United States|             Cyprus|    1|
|    United States|            Estonia|    1|
|    United States|          Lithuania|    1|
|    United States|           Bulgaria|    1|
|    United States|            Georgia|    1|
|    United States|            Bahrain|    1|
|    United States|   Papua New Guinea|    1|
|    United States|         Montenegro|    1|
|    United States|            Namibia|    1|
|         country2|       new country2|    1|
+-----------------+-------------------+-----+



- 로우가 추가된 DF를 참조하려면 새롭게 만들어진 DF 객체를 이용해야함
    - 즉, inplace가 안된다는 뜻
- DF를 뷰로 만들거나 테이블로 등록하면 DF변경 작업과 관계없이 동적으로 참조할 수 있음

### 5.4.15. 로우 정렬하기
- sort나 ordrBy메서드로 정렬 가능 : 둘 다 완전히 같은 방식으로 동작함
    - 둘 다 모두 컬럼표현식과 문자열을 사용할 수 있으며, 다수의 컬럼을 지정할 수 있음
    - 기본은 역시 오름차
- 난 sql에서와 동일하게 orderBy로 사용하자

In [76]:
df.orderBy("count").show(5)

+--------------------+-------------------+-----+
|   DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+--------------------+-------------------+-----+
|               Malta|      United States|    1|
|Saint Vincent and...|      United States|    1|
|       United States|            Croatia|    1|
|       United States|          Gibraltar|    1|
|       United States|          Singapore|    1|
+--------------------+-------------------+-----+
only showing top 5 rows



In [79]:
df.orderBy(col("count").desc(), col("DEST_COUNTRY_NAME").asc()).show(5)

+-----------------+-------------------+------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME| count|
+-----------------+-------------------+------+
|    United States|      United States|370002|
|    United States|             Canada|  8483|
|           Canada|      United States|  8399|
|    United States|             Mexico|  7187|
|           Mexico|      United States|  7140|
+-----------------+-------------------+------+
only showing top 5 rows



In [81]:
df.orderBy(expr("count desc").desc()).show(5)
#이제야 expr이 이해가 되네...
#select는 빠져있지만, col은 계속해서 컬럼 하나하나를 지정해오는 형식
#expr은 셀렉트의 케이스가 아니면 df메서드 내에서 활용되며, SQL문을 작성해서 가져오는 형식임

+-----------------+-------------------+------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME| count|
+-----------------+-------------------+------+
|    United States|      United States|370002|
|    United States|             Canada|  8483|
|           Canada|      United States|  8399|
|    United States|             Mexico|  7187|
|           Mexico|      United States|  7140|
+-----------------+-------------------+------+
only showing top 5 rows



- 트랜스포메이션을 처리하기전에 성능을 최적화하기 위해 파시션별로 정렬을 수행하기도 함
- 이건 sortWithinPartitions 메서드로 가능함

In [84]:
spark.read.format("json").load("{}/flight-data/json/*-summary.json".format(base_dir))\
    .sortWithinPartitions("count")

DataFrame[DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string, count: bigint]

### 5.4.16. 로우 수 제한하기
- SQL과 마찬가지로 역시 limit메서드를 이용함

In [85]:
df.limit(5).show()

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|   15|
|    United States|            Croatia|    1|
|    United States|            Ireland|  344|
|            Egypt|      United States|   15|
|    United States|              India|   62|
+-----------------+-------------------+-----+



### 5.4.17.repartition and coalesce
- 최적화 기법 중 하나로, 자주 필터링하는 컬럼을 기준으로 데이터를 분할
    - 이를 통해 파티셔닝 스키마와 파티션 수를 포함하여 클러스터 전반의 물리적 데이터 구성을 제어할 수 있음
- repartition 메서드를 호출할 경우 무조건 전체 데이터를 셔플하게 됨
    - 따라서 향후에 사용할 파티션 수가 현재 파티션 수보다 많거나 컬럼을 기준으로 파티션을 만드는 경우에만 사용해야함

In [86]:
df.rdd.getNumPartitions()

1

In [87]:
df.repartition(5)

DataFrame[DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string, count: bigint]

- 특정 컬럼을 기준으로 자주 필터링 한다면, 자주 필터ㅓ링 되는 컬럼을 기준으로 파티션을 아래와 같이 재분배

In [88]:
df.repartition(col("DEST_COUNTRY_NAME"))

DataFrame[DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string, count: bigint]

- 선택적으로 파티션 수를 지정할 수도 있음

In [89]:
df.repartition(5, col("DEST_COUNTRY_NAME"))

DataFrame[DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string, count: bigint]

- coalesce메서드는 전체 데이터를 셔플하지 않고 파티션을 병합하려는 경우에 사용함
    - 파티션 수를 줄이려면 따라서 coalesce를 사용해야함
- 아래 코드는 목적지를 기준으로 셔플을 수행하여 5개 파티션으로 나누고, 전체 데이터를 셔플없이 병합하는 에제

In [91]:
df.repartition(5, col("DEST_COUNTRY_NAME")).coalesce(2)

DataFrame[DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string, count: bigint]

### 5.4.18. 드라이버로 로우 데이터 수집하기
- 스파크는 드라이버에서 클러스터 상태 정보를 지속적으로 유지하기 때문에 로컬 환경에서 데이터를 다루려면 드라이버로 데이터를 수집해야함
- 해당 역할을 수행하는 메서드는 아래와 같음
    - collect : 전체 DF의 모든 데이터를 수집
    - take : 상위 N개의 로우 반환
    - show : 여러 로우를 보기 좋게 출력

In [92]:
collectDF = df.limit(10)
collectDF.take(5)

[Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Romania', count=15),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Croatia', count=1),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Ireland', count=344),
 Row(DEST_COUNTRY_NAME='Egypt', ORIGIN_COUNTRY_NAME='United States', count=15),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='India', count=62)]

In [93]:
collectDF.show() #보기 좋게 테이블 형태로

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|   15|
|    United States|            Croatia|    1|
|    United States|            Ireland|  344|
|            Egypt|      United States|   15|
|    United States|              India|   62|
|    United States|          Singapore|    1|
|    United States|            Grenada|   62|
|       Costa Rica|      United States|  588|
|          Senegal|      United States|   40|
|          Moldova|      United States|    1|
+-----------------+-------------------+-----+



In [95]:
collectDF.show(5, False)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|United States    |Romania            |15   |
|United States    |Croatia            |1    |
|United States    |Ireland            |344  |
|Egypt            |United States      |15   |
|United States    |India              |62   |
+-----------------+-------------------+-----+
only showing top 5 rows



In [96]:
collectDF.collect()

[Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Romania', count=15),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Croatia', count=1),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Ireland', count=344),
 Row(DEST_COUNTRY_NAME='Egypt', ORIGIN_COUNTRY_NAME='United States', count=15),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='India', count=62),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Singapore', count=1),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Grenada', count=62),
 Row(DEST_COUNTRY_NAME='Costa Rica', ORIGIN_COUNTRY_NAME='United States', count=588),
 Row(DEST_COUNTRY_NAME='Senegal', ORIGIN_COUNTRY_NAME='United States', count=40),
 Row(DEST_COUNTRY_NAME='Moldova', ORIGIN_COUNTRY_NAME='United States', count=1)]