# Chapter 8 - Join 
(p.227 ~ 244)
## Join Type
1. Inner join : Left And Right 
2. Outer join : Left Or Right
3. Left Outer Join : Left All
4. Right Outer Join : Right All
5. Left Semi Join : Left in Right [Remarks](https://stackoverflow.com/questions/21738784/difference-between-inner-join-and-left-semi-join/21738897)
6. Left Anti Join : Left not in Right
7. Natural Join : Matching same columns of Left and Right [Remarks](https://www.vertica.com/docs/9.2.x/HTML/Content/Authoring/AnalyzingData/Queries/Joins/NaturalJoins.htm)
8. Cross Join/Cartesian Join : Left All X Right All

In [1]:
# Data 만들기
person = spark.createDataFrame([
    (0, "Bill Chambers", 0, [100]),
    (1, "Matei Zaharia", 1, [500, 250, 100]),
    (2, "Michael Armbrust", 1, [250, 100])
]).toDF("id", "name", "graduate_program", "spark_status")

graduateProgram = spark.createDataFrame([
    (0, "Masters", "School of Information", "UC Berkeley"),
    (2, "Masters", "EECS", "UC Berkeley"),
    (1, "Ph.D", "EECS", "UC Berkeley")
]).toDF("id", "degree", "department", "school")

sparkStatus = spark.createDataFrame([
    (500, "Vice President"),
    (250, "PMC Member"),
    (100, "Contributor")
]).toDF("id", "status")

person.createOrReplaceTempView("person")
graduateProgram.createOrReplaceTempView("graduateProgram")
sparkStatus.createOrReplaceTempView("sparkStatus")

## Inner Join
- 테이블에 존재하는 키가 일치하는 지 평가 -> 일치 (True) 로 평가된 로우만 결합

In [8]:
# inner join

joinExpression = person["graduate_program"] == graduateProgram['id'] # 일치할 때
person.join(graduateProgram, joinExpression).show()

+---+----------------+----------------+---------------+---+-------+--------------------+-----------+
| id|            name|graduate_program|   spark_status| id| degree|          department|     school|
+---+----------------+----------------+---------------+---+-------+--------------------+-----------+
|  0|   Bill Chambers|               0|          [100]|  0|Masters|School of Informa...|UC Berkeley|
|  1|   Matei Zaharia|               1|[500, 250, 100]|  1|   Ph.D|                EECS|UC Berkeley|
|  2|Michael Armbrust|               1|     [250, 100]|  1|   Ph.D|                EECS|UC Berkeley|
+---+----------------+----------------+---------------+---+-------+--------------------+-----------+



In [9]:
wrongJoinExpression = person['name'] == graduateProgram['school'] # 불일치할 때
person.join(graduateProgram, wrongJoinExpression).show()

+---+----+----------------+------------+---+------+----------+------+
| id|name|graduate_program|spark_status| id|degree|department|school|
+---+----+----------------+------------+---+------+----------+------+
+---+----+----------------+------------+---+------+----------+------+



In [10]:
# join type 지정
joinType = 'inner'
person.join(graduateProgram, joinExpression, joinType).show()

+---+----------------+----------------+---------------+---+-------+--------------------+-----------+
| id|            name|graduate_program|   spark_status| id| degree|          department|     school|
+---+----------------+----------------+---------------+---+-------+--------------------+-----------+
|  0|   Bill Chambers|               0|          [100]|  0|Masters|School of Informa...|UC Berkeley|
|  2|Michael Armbrust|               1|     [250, 100]|  1|   Ph.D|                EECS|UC Berkeley|
|  1|   Matei Zaharia|               1|[500, 250, 100]|  1|   Ph.D|                EECS|UC Berkeley|
+---+----------------+----------------+---------------+---+-------+--------------------+-----------+



## Outer Join
- 테이블에 존재하는 키를 평가 -> 일치 (True), 불일치 (False) 모두 포함 -> 이 때, 일치하는 로우 없으면, null 삽입

In [11]:
joinType = 'outer'
person.join(graduateProgram, joinExpression, joinType).show()

+----+----------------+----------------+---------------+---+-------+--------------------+-----------+
|  id|            name|graduate_program|   spark_status| id| degree|          department|     school|
+----+----------------+----------------+---------------+---+-------+--------------------+-----------+
|   0|   Bill Chambers|               0|          [100]|  0|Masters|School of Informa...|UC Berkeley|
|   1|   Matei Zaharia|               1|[500, 250, 100]|  1|   Ph.D|                EECS|UC Berkeley|
|   2|Michael Armbrust|               1|     [250, 100]|  1|   Ph.D|                EECS|UC Berkeley|
|null|            null|            null|           null|  2|Masters|                EECS|UC Berkeley|
+----+----------------+----------------+---------------+---+-------+--------------------+-----------+



## Left Outer Join / Right Outer Join
- 테이블에 존재하는 키를 평가 -> 기준이 되는 테이블의 모든 로우 + 매칭된 반대쪽 로우 -> 이 때, 일치하는 로우 없으면 null 삽입

In [12]:
# left outer
joinType= 'left_outer'
graduateProgram.join(person, joinExpression, joinType).show()

+---+-------+--------------------+-----------+----+----------------+----------------+---------------+
| id| degree|          department|     school|  id|            name|graduate_program|   spark_status|
+---+-------+--------------------+-----------+----+----------------+----------------+---------------+
|  0|Masters|School of Informa...|UC Berkeley|   0|   Bill Chambers|               0|          [100]|
|  1|   Ph.D|                EECS|UC Berkeley|   1|   Matei Zaharia|               1|[500, 250, 100]|
|  1|   Ph.D|                EECS|UC Berkeley|   2|Michael Armbrust|               1|     [250, 100]|
|  2|Masters|                EECS|UC Berkeley|null|            null|            null|           null|
+---+-------+--------------------+-----------+----+----------------+----------------+---------------+



In [13]:
# right outer
joinType = 'right_outer'
person.join(graduateProgram, joinExpression, joinType).show()

+----+----------------+----------------+---------------+---+-------+--------------------+-----------+
|  id|            name|graduate_program|   spark_status| id| degree|          department|     school|
+----+----------------+----------------+---------------+---+-------+--------------------+-----------+
|   0|   Bill Chambers|               0|          [100]|  0|Masters|School of Informa...|UC Berkeley|
|   1|   Matei Zaharia|               1|[500, 250, 100]|  1|   Ph.D|                EECS|UC Berkeley|
|   2|Michael Armbrust|               1|     [250, 100]|  1|   Ph.D|                EECS|UC Berkeley|
|null|            null|            null|           null|  2|Masters|                EECS|UC Berkeley|
+----+----------------+----------------+---------------+---+-------+--------------------+-----------+



## Left Semi Join
- 매칭된 쪽에 값이 있는지 확인하는 용도 = DataFrame의 필터 역할


In [14]:
# left semi
joinType = 'left_semi'
graduateProgram.join(person, joinExpression, joinType).show()

+---+-------+--------------------+-----------+
| id| degree|          department|     school|
+---+-------+--------------------+-----------+
|  0|Masters|School of Informa...|UC Berkeley|
|  1|   Ph.D|                EECS|UC Berkeley|
+---+-------+--------------------+-----------+



In [18]:
joinType = 'right_semi' # 없음 ㅠ
# graduateProgram.join(person, joinExpression, joinType).show()

## Left Anti Join 
- 매칭된 쪽에 값이 없는 것을 Filter = Not In

In [19]:
joinType = 'left_anti'
graduateProgram.join(person, joinExpression, joinType).show()

+---+-------+----------+-----------+
| id| degree|department|     school|
+---+-------+----------+-----------+
|  2|Masters|      EECS|UC Berkeley|
+---+-------+----------+-----------+



## Natural Join
- 일치하는 컬럼을 이름으로 매칭함
- 각 컬럼이 뜻하는 바가 다를 수 있기 때문에 위험함

## Cross Join 
- Left와 Right 모든 로우의 조합
- 연산이 매우 크므로 spark 내부에서 사용이 차단되어 있음 -> spark.sql.crossJoin.enable = True 설정을 해야함

In [21]:
joinType = 'cross'
graduateProgram.join(person, joinExpression, joinType).show(100, False)

+---+-------+---------------------+-----------+---+----------------+----------------+---------------+
|id |degree |department           |school     |id |name            |graduate_program|spark_status   |
+---+-------+---------------------+-----------+---+----------------+----------------+---------------+
|0  |Masters|School of Information|UC Berkeley|0  |Bill Chambers   |0               |[100]          |
|1  |Ph.D   |EECS                 |UC Berkeley|1  |Matei Zaharia   |1               |[500, 250, 100]|
|1  |Ph.D   |EECS                 |UC Berkeley|2  |Michael Armbrust|1               |[250, 100]     |
+---+-------+---------------------+-----------+---+----------------+----------------+---------------+



# Join 사용 시 문제점

## 복합 데이터 타입의 조인
- 불리언을 반환하는 모든 표현식은 조인 표현식이 될 수 있음

In [23]:
from pyspark.sql.functions import expr
person.withColumnRenamed("id", "personId")\
.join(sparkStatus, expr("array_contains(spark_status, id)")).show()

# spark_status가 id에 있는가

+--------+----------------+----------------+---------------+---+--------------+
|personId|            name|graduate_program|   spark_status| id|        status|
+--------+----------------+----------------+---------------+---+--------------+
|       0|   Bill Chambers|               0|          [100]|100|   Contributor|
|       1|   Matei Zaharia|               1|[500, 250, 100]|500|Vice President|
|       1|   Matei Zaharia|               1|[500, 250, 100]|250|    PMC Member|
|       1|   Matei Zaharia|               1|[500, 250, 100]|100|   Contributor|
|       2|Michael Armbrust|               1|     [250, 100]|250|    PMC Member|
|       2|Michael Armbrust|               1|     [250, 100]|100|   Contributor|
+--------+----------------+----------------+---------------+---+--------------+



## 중복 컬럼명 처리
- 조인에 사용할 DataFrame의 특정 키가 동일한 이름을 가지고 조인 표현식에 명시되는 경우
- 조인 대상이 아닌 두 개의 컬럼이 동일한 이름을 가진 경우



In [24]:
graduateProgram.show()

+---+-------+--------------------+-----------+
| id| degree|          department|     school|
+---+-------+--------------------+-----------+
|  0|Masters|School of Informa...|UC Berkeley|
|  2|Masters|                EECS|UC Berkeley|
|  1|   Ph.D|                EECS|UC Berkeley|
+---+-------+--------------------+-----------+



In [25]:
gradProgramDupe = graduateProgram.withColumnRenamed("id", "graduate_program")
gradProgramDupe.show()

+----------------+-------+--------------------+-----------+
|graduate_program| degree|          department|     school|
+----------------+-------+--------------------+-----------+
|               0|Masters|School of Informa...|UC Berkeley|
|               2|Masters|                EECS|UC Berkeley|
|               1|   Ph.D|                EECS|UC Berkeley|
+----------------+-------+--------------------+-----------+



In [26]:
joinExpression = gradProgramDupe['graduate_program'] == person['graduate_program']
person.join(gradProgramDupe, joinExpression).show()

# join key 이름이 같게 매칭

+---+----------------+----------------+---------------+----------------+-------+--------------------+-----------+
| id|            name|graduate_program|   spark_status|graduate_program| degree|          department|     school|
+---+----------------+----------------+---------------+----------------+-------+--------------------+-----------+
|  0|   Bill Chambers|               0|          [100]|               0|Masters|School of Informa...|UC Berkeley|
|  1|   Matei Zaharia|               1|[500, 250, 100]|               1|   Ph.D|                EECS|UC Berkeley|
|  2|Michael Armbrust|               1|     [250, 100]|               1|   Ph.D|                EECS|UC Berkeley|
+---+----------------+----------------+---------------+----------------+-------+--------------------+-----------+



In [27]:
person.join(gradProgramDupe, joinExpression).select("graduate_program").show()

AnalysisException: u"Reference 'graduate_program' is ambiguous, could be: graduate_program, graduate_program.;"

## Solution 1
- 불리언 형태로 조인 표현식을 바꿔준다.

In [29]:
person.join(gradProgramDupe, 'graduate_program').show()
person.join(gradProgramDupe, 'graduate_program').select("graduate_program").show()

+----------------+---+----------------+---------------+-------+--------------------+-----------+
|graduate_program| id|            name|   spark_status| degree|          department|     school|
+----------------+---+----------------+---------------+-------+--------------------+-----------+
|               0|  0|   Bill Chambers|          [100]|Masters|School of Informa...|UC Berkeley|
|               1|  1|   Matei Zaharia|[500, 250, 100]|   Ph.D|                EECS|UC Berkeley|
|               1|  2|Michael Armbrust|     [250, 100]|   Ph.D|                EECS|UC Berkeley|
+----------------+---+----------------+---------------+-------+--------------------+-----------+

+----------------+
|graduate_program|
+----------------+
|               0|
|               1|
|               1|
+----------------+



## Solution 2
- 조인 후 컬럼 제거

In [35]:
from pyspark.sql.functions import col
person.join(gradProgramDupe, joinExpression).drop(person.col("graduate_program")).select("graduate_program").show()

AttributeError: 'DataFrame' object has no attribute 'col'

## Solution 3
- 컬럼명 변경

In [36]:
gradProgram3 = graduateProgram.withColumnRenamed("id", "grad_id")
joinExpr = person['graduate_program'] == gradProgram3['grad_id']
person.join(gradProgram3, joinExpr).show()

+---+----------------+----------------+---------------+-------+-------+--------------------+-----------+
| id|            name|graduate_program|   spark_status|grad_id| degree|          department|     school|
+---+----------------+----------------+---------------+-------+-------+--------------------+-----------+
|  0|   Bill Chambers|               0|          [100]|      0|Masters|School of Informa...|UC Berkeley|
|  1|   Matei Zaharia|               1|[500, 250, 100]|      1|   Ph.D|                EECS|UC Berkeley|
|  2|Michael Armbrust|               1|     [250, 100]|      1|   Ph.D|                EECS|UC Berkeley|
+---+----------------+----------------+---------------+-------+-------+--------------------+-----------+



## 스파크의 조인 수행 방식
### 네트워크 통신 전략 [URL](https://knight76.tistory.com/entry/spark-%EC%8A%A4%ED%8C%8C%ED%81%AC-%EC%A1%B0%EC%9D%B8-%EC%A0%84%EB%9E%B5-%EC%85%94%ED%94%8C-%EC%A1%B0%EC%9D%B8-%EB%B8%8C%EB%A1%9C%EC%BA%90%EC%8A%A4%ED%8A%B8-%EC%A1%B0%EC%9D%B8-shuffle-join-broadcast-join) [URL2](https://towardsdatascience.com/strategies-of-spark-join-c0e7b4572bcf)
1. Shuffle join : 전체 노드 간 통신을 유발함 -> 같은 키를 가지고 있는 값을 동일 executor에 모아준다. -> 이 과정에서 데이터의 이동이 많아지고 (shuffle) 이는 I/O 가 많이 일어나므로 시간이 오래 걸림 
- ![Shuffle Join](img/shuffle.png)
2. Broadcast join : 작은 노드에 대해서 동일한 테이블을 worker에 각각 배치함, worker에 올릴 수 있는 메모리가 한정되어있음. worker에 배치할 때 마찬가지로 큰 데이터 이동이 일어나지만, 추가적인 통신은 발생하지 않음 (shuffle).  
단점으로는 CPU가 고성능이어야함, 고비용의 수집 연산 발생 -> 드라이버 노드가 out of memory error로 종료될 수 있음
- ![Broadcast Join](img/capture.png)



In [3]:
joinExpr = person['graduate_program'] == graduateProgram['id']
person.join(graduateProgram, joinExpr).explain()

== Physical Plan ==
*(5) SortMergeJoin [graduate_program#10L], [id#24L], Inner
:- *(2) Sort [graduate_program#10L ASC NULLS FIRST], false, 0
:  +- Exchange hashpartitioning(graduate_program#10L, 200)
:     +- *(1) Project [_1#0L AS id#8L, _2#1 AS name#9, _3#2L AS graduate_program#10L, _4#3 AS spark_status#11]
:        +- *(1) Filter isnotnull(_3#2L)
:           +- Scan ExistingRDD[_1#0L,_2#1,_3#2L,_4#3]
+- *(4) Sort [id#24L ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(id#24L, 200)
      +- *(3) Project [_1#16L AS id#24L, _2#17 AS degree#25, _3#18 AS department#26, _4#19 AS school#27]
         +- *(3) Filter isnotnull(_1#16L)
            +- Scan ExistingRDD[_1#16L,_2#17,_3#18,_4#19]


In [6]:
from pyspark.sql.functions import broadcast
person.join(broadcast(graduateProgram), joinExpr).explain()

== Physical Plan ==
*(2) BroadcastHashJoin [graduate_program#10L], [id#24L], Inner, BuildRight
:- *(2) Project [_1#0L AS id#8L, _2#1 AS name#9, _3#2L AS graduate_program#10L, _4#3 AS spark_status#11]
:  +- *(2) Filter isnotnull(_3#2L)
:     +- Scan ExistingRDD[_1#0L,_2#1,_3#2L,_4#3]
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true]))
   +- *(1) Project [_1#16L AS id#24L, _2#17 AS degree#25, _3#18 AS department#26, _4#19 AS school#27]
      +- *(1) Filter isnotnull(_1#16L)
         +- Scan ExistingRDD[_1#16L,_2#17,_3#18,_4#19]


In [None]:
# SQL 에서 broadcast join
Select /*+ MAPJOIN(graduateProgram) */ * FROM person 
JOIN graduateProgram 
ON person.graduate_program = graduateProgram.id

# Chapter 9 데이터소스
(p.245 ~ 277)

## 핵심 데이터소스
- CSV
- JSON
- Parquet 파케이~
- ORC
- JDBC/ODBC 연결
- 일반 텍스트 파일

외부 데이터소스
- 카산드라
- HBase
- 몽고DB
- AWS Redshift
- XML
- 기타

## 읽기 API 구조
- DataFrameReader.format(...).option("key", "value").schema(...).load()
- format 메서드 : 기본값 parquet
- option을 통해서 데이터 읽는 방법 설정 가능
- 포맷, 스키마, 읽기모드, 옵션을 설정해서 DataFrameReader로 읽음

In [None]:
spark.read.format("csv")
.option("mode", "FAILFAST")
.option("inforSchema", "true")
.option("path", "paht/to/file(s)")
.schema(someSchema)
.load()

## 읽기 모드
- permissive : 오류 레코드의 필드는 null로 설정, 오류 레코드 -> _corrupt_recored 라는 문자열 기록
- dropMalformed : 형식에 맞지 않는 레코드는 drop
- failFast :  형식에 맞지 않는 레코드 발생 시 종료
- 읽기 모드 기본값은 permissive

## 쓰기 API 구조
- DataFrameWriter.format(...).option(...)partitionBy(...).buscketBy(...)sortBy(...).save()
- format 메서드 : 기본값 parquet


In [None]:
dataframe.write.format("csv")
.option("mode", "OVERWRITE")
.option("dateFormat", 'yyyy-MM-dd')
.option("path", "path/to/file(s)")
.save()

## 저장 모드
- append : 해당 경로에 존재하는 파일 목록에 결과 파일을 추가
- overwrite : 이미 존재하는 데이터를 덮어씀
- errorIfExists : 해당 경로에 데이터나 파일이 존재하는 경우, 오류 발생
- ignore : 해당 경로에 데이터나 파일이 존재하는 경우 아무런 처리 X
- 쓰기 모드의 기본 값은 errorIfExists

# CSV
- CSV reader 옵션 p.250 쪽 참조
- 스파크는 지연 연산의 특성이 있으므로 Dataframe 정의 시점이 아닌 잡 실행 시점에 오류 발생
## CSV 파일 읽고 쓰기

In [15]:
# csv file 읽기 - OASIS HDFS
csvFile = spark.read.format("csv")\
.option("header", "true")\
.option("mode", "FAILFAST")\
.option("inferSchema", "true")\
.load("/user/fp10226/2015-summary.csv")
#pyspark_study/data/flight-data/csv/2010-summary.csv -> jutopia 개인 경로 X

In [18]:
# csv file 쓰기
csvFile.write.format('csv').mode('overwrite').option("sep", "\t")\
.save("./tsv_test.tsv")

# OASIS HDFS 에 쓰기 되어짐 -> folder 형태로 저장됨

## JSON
- javascript object notation
- spark에서는 줄(line)로 구분된 방식을 기본적으로 사용
- multiline 옵션 : 여러 줄로 구성된 방식
- JSON은 객체이기 때문에 CSV보다 옵션 수가 적음
- 옵션 : p.255 참고

## JSON 파일 읽고 쓰기

In [20]:
# 읽기
spark.read.format("json").option("mode", "FAILFAST")\
.option("inferSchema", "true")\
.load("/user/fp10226/flight-data/json/2010-summary.json").show(5)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|    1|
|    United States|            Ireland|  264|
|    United States|              India|   69|
|            Egypt|      United States|   24|
|Equatorial Guinea|      United States|    1|
+-----------------+-------------------+-----+
only showing top 5 rows



In [21]:
# 쓰기
csvFile.write.format("json").mode('overwrite').save('my-json-file.json')

## 파케이 파일
- 장점 : 
1. 분석 워크로드에 최적화
2. 저장소 공간을 절약
3. 전체 파일을 읽는 대신 개별 컬럼을 읽을 수 있음
4. 컬럼 기반의 압축 기능 제공
5. 아파치 스파크와 호환성이 높음 -> 스파크의 기본 파일 포맷
6. 읽기 연산이 csv, json보다 빠름 
7. 복합 데이터 타입을 지원
- spark.read.format('parquet')

## 파케이 파일 읽고 쓰기
- 파케이는 데이터를 저장할 때 자체 스키마를 사용해 데이터를 저장하기 때문에 옵션이 거의 없음 (p.259 참조)
- 파일을 읽는 시점에 스키마를 알 수 있음 -> schema on read

In [22]:
# 파케이 읽기
spark.read.format('parquet')\
.load('flight-data/parquet/2010-summary.parquet').show(5)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|    1|
|    United States|            Ireland|  264|
|    United States|              India|   69|
|            Egypt|      United States|   24|
|Equatorial Guinea|      United States|    1|
+-----------------+-------------------+-----+
only showing top 5 rows



In [23]:
# 파케이 쓰기
csvFile.write.format('parquet').mode('overwrite')\
.save('my-parquet-file.parquet')

## ORC
- 하둡 워크로드를 위해 설계된 파일, 데이터 타입을 인식할 수 있음
- 대규모 스트리밍 읽기에 최적화되어 있음
- 필요한 로우를 신속하게 찾을 수 있음
- 파케이는 스파크에 최적화된 반면 ORC는 하이브에 최적화되어 있음

## ORC 파일 읽고 쓰기

In [24]:
# ORC 읽기
spark.read.format("orc").load("flight-data/orc/2010-summary.orc").show(5)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|    1|
|    United States|            Ireland|  264|
|    United States|              India|   69|
|            Egypt|      United States|   24|
|Equatorial Guinea|      United States|    1|
+-----------------+-------------------+-----+
only showing top 5 rows



In [25]:
# ORC 쓰기
csvFile.write.format("orc").mode("overwrite").save('my-orc-file.orc')


## SQL 데이터 베이스
- MySQL, PostgreSQL, Oracle
- 데이터베이스 인증 정보, 접속 관련 옵션이 필요, 네트워크 상태 확인
- spark.sql로 대체

In [None]:
# 원래 코드
driver = "org.sqlite.JDBC"
path = "/data/flight-data/jdbc/my-sqlite.db"
url = "jdbc:sqlite:" + path
tablename = "flight_info"

dbDataFrame = spark.read.format("jdbc").option("url", url)\
  .option("dbtable", tablename).option("driver",  driver).load()

pgDF = spark.read.format("jdbc")\
  .option("driver", "org.postgresql.Driver")\
  .option("url", "jdbc:postgresql://database_server")\
  .option("dbtable", "schema.tablename")\
  .option("user", "username").option("password", "my-secret-password").load()

In [30]:
df = spark.sql("show tables in linecreditscore_dev")
df.select("database").distinct().explain

# 원래 코드
# pushdownQuery = """(SELECT DISTINCT(DEST_COUNTRY_NAME) FROM flight_info)
#   AS flight_info"""
# dbDataFrame = spark.read.format("jdbc")\
#   .option("url", url).option("dbtable", pushdownQuery).option("driver",  driver)\
#   .load()


<bound method DataFrame.explain of DataFrame[database: string]>

In [31]:
df.filter("database in ('linecreditscore_dev')").explain()

# 원래 코드 
# dbDataFrame.filter("DEST_COUNTRY_NAME in ('Anguilla', 'Sweden')").explain()


== Physical Plan ==
LocalTableScan [database#176, tableName#177, isTemporary#178]


## 데이터베이스 병렬로 읽기
- 파티셔닝, numPartitions 옵션을 사용해 읽기 및 쓰기용 동시 작업 수를 제한할 수 있음
- SQL 데이터베이스에는 파티션이 나눠져있지 않기 때문에 spark 쪽으로 불러올 때 이걸 잘게 쪼개주기 위해 선언

In [None]:
#원 코드
dbDataFrame = spark.read.format("jdbc")\
  .option("url", url).option("dbtable", tablename).option("driver",  driver)\
  .option("numPartitions", 10).load()

In [None]:
props = {"driver":"org.sqlite.JDBC"}
predicates = [
  "DEST_COUNTRY_NAME = 'Sweden' OR ORIGIN_COUNTRY_NAME = 'Sweden'",
  "DEST_COUNTRY_NAME = 'Anguilla' OR ORIGIN_COUNTRY_NAME = 'Anguilla'"]
spark.read.jdbc(url, tablename, predicates=predicates, properties=props).show()
spark.read.jdbc(url,tablename,predicates=predicates,properties=props)\
  .rdd.getNumPartitions

props = {"driver":"org.sqlite.JDBC"}
predicates = [
  "DEST_COUNTRY_NAME != 'Sweden' OR ORIGIN_COUNTRY_NAME != 'Sweden'",
  "DEST_COUNTRY_NAME != 'Anguilla' OR ORIGIN_COUNTRY_NAME != 'Anguilla'"]
spark.read.jdbc(url, tablename, predicates=predicates, properties=props).count()

## 슬라이딩 윈도우 기반의 파티셔닝
- 조건절을 기반으로 파티셔닝 하는 방법 (예제 : count 컬럼을 기준으로 분할)
- 처음과 마지막 파티션 사이의 최솟값, 최댓값을 사용 -> 전체 파티션 수 설정

In [None]:
colName = "count"
lowerBound = 0L
upperBound = 348113L # this is the max count in our database
numPartitions = 10

spark.read.jdbc(url, tablename, column=colName, properties=props,
                lowerBound=lowerBound, upperBound=upperBound,
                numPartitions=numPartitions).count() # 255

## SQL 데이터베이스 쓰기

In [None]:
newPath = "jdbc:sqlite://tmp/my-sqlite.db"
csvFile.write.jdbc(newPath, tablename, mode="overwrite", properties=props)

spark.read.jdbc(newPath, tablename, properties=props).count() # 255

csvFile.write.jdbc(newPath, tablename, mode="append", properties=props)

spark.read.jdbc(newPath, tablename, properties=props).count() # 765

## 텍스트 파일 
- 일반 텍스트 파일도 읽을 수 있음
- 각 줄이 DataFrame의 레코드

## 텍스트 파일 읽고 쓰기
- textFile

In [32]:
spark.read.textFile("flight-data/csv/2010-summary.csv")\
.selectExpr("split(value, ',') as rows ").show()

AttributeError: 'DataFrameReader' object has no attribute 'textFile'

In [34]:
csvFile.select("DEST_COUNTRY_NAME").write.text("simple-text-file.txt")

In [None]:
csvFile.limit(10).select("DEST_COUNTRY_NAME", "count")\
  .write.partitionBy("count").text("/tmp/five-csv-files2py.csv")

# 파티션에 count를 추가하여 더 많은 컬럼을 저장할 수 있음 -> 폴더별로 컬럼이 저장됨

## 고급 I/O
- 버켓팅, 파티셔닝 조절 -> 데이터의 저장 구조 제어
- HDFS : 분할 파일로 여러 블록을 나누어 분산 저장
- 추천 : 파케이 파일 포맷, GZIP 압축 방식

### 병렬로 데이터 읽기
- 폴더의 개별 파일 -> DataFrame의 파티션
- 여러 파일을 동시에 익스큐터가 읽을 수 있음

### 병렬로 데이터 쓰기
- 데이터 파티션 당 하나의 파일이 작성
- repartition(5) -> 폴더 안에 5개의 파일을 생성
- 폴더명이 조건절로 저장됨 : DEST_COUNTRY_NAME = Senegal

### 버켓팅
- 각 파일에 저장된 데이터를 제어할 수 있는 조직화 기법
- 동일한 버킷 ID를 가진 데이터가 하나의 물리적 파티션에 모두 모여있기 때문에, 데이터를 읽을 때 셔플을 피할 수 있음
- 조인 및 집계 시 발생하는 고비용의 셔플을 피할 수 있음
- 버켓 단위로 데이터를 모아 일정 수의 파일로 저장 

## 파일 크기 관리
- 작은 파일을 많이 생성하면 메타데이터에 관리 부하가 생김 -> HDFS 시스템은 작은 크기의 파일을 다루는 시스템이 아님 -> 작은 크기 파일 문제
- 그런데, 크기가 커도 비효율적
- 스파크 2.2에서는 자동으로 파일 크기를 제어할 수 있음 -> maxRecordsPerFile 파일 당 레코드 수 지정
- df.write.option("maxRecordsPerFile", 5000)