In [1]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
.appName("chapter9") \
.getOrCreate()

24/10/07 15:23:25 WARN Utils: Your hostname, gimsehyeon-ui-MacBookPro.local resolves to a loopback address: 127.0.0.1; using 172.30.1.34 instead (on interface en0)
24/10/07 15:23:25 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/10/07 15:23:25 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


## 데이터소스 API의 구조

### 읽기 API 구조


```
DataFrameReader.format(...) \
.option("KEY", "VALUE") \
.schema(...) \
.load()
```

```
spark.read.format("csv") \
.option("header", "true") \
.option("mode", "FAILFAST") \
.option("inferschema", "true") \
.option("path", "path/to/file(s)") \
.schema(SOME_SCHEMA) \
.load()
```

### 쓰기 API 구조

```
DataFrameWriter.format(...) \
.partitionBy(...) \
.bucketBy(...)\ 
.sortBy(...) \
.save()
```

```
spark.write.format('csv') \
.option("mode", "OVERWRITE") \
.option("dataFormat", "yyyy-WW-dd") \
.option("path", "path/to/file(s)") \
.save()
```

## CSV 파일

### CSV 파일 읽기

In [28]:
csvFile = spark.read.format('csv') \
.option('header', 'true') \
.option('mode', 'FAILFAST') \
.option('inferschema', 'true') \
.load('./data/flight-data/csv/2010-summary.csv')

### CSV 파일 쓰기

In [29]:
csvFile.write.format('csv').mode('overwrite').option('sep', '\t').save('/tmp/my-tsv-file.csv')

## JSON 파일

### JSON 파일 읽기

In [30]:
spark.read.format("json") \
.option('mode', "FAILFAST") \
.option('inferschema', 'true')\
.load('./data/flight-data/json/2010-summary.json')\
.show(5)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|    1|
|    United States|            Ireland|  264|
|    United States|              India|   69|
|            Egypt|      United States|   24|
|Equatorial Guinea|      United States|    1|
+-----------------+-------------------+-----+
only showing top 5 rows



### JSON 파일 쓰기

In [31]:
csvFile.write.format('json') \
.mode('overwrite')\
.save('/tmp/my-json-file.json')

## 파케이 파일

### 파케이 파일 읽기

In [32]:
spark.read.format('parquet') \
.load('./data/flight-data/parquet/2010-summary.parquet') \
.show(5)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|    1|
|    United States|            Ireland|  264|
|    United States|              India|   69|
|            Egypt|      United States|   24|
|Equatorial Guinea|      United States|    1|
+-----------------+-------------------+-----+
only showing top 5 rows



### 파케이 파일 쓰기

In [33]:
csvFile.write.format('parquet').mode('overwrite')\
.save('/tmp/my-parquet-file.parquet')

## ORC 파일

### ORC 파일 읽기

In [34]:
spark.read.format('orc').load('./data/flight-data/orc/2010-summary.orc').show(5)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|    1|
|    United States|            Ireland|  264|
|    United States|              India|   69|
|            Egypt|      United States|   24|
|Equatorial Guinea|      United States|    1|
+-----------------+-------------------+-----+
only showing top 5 rows



### ORC 파일 쓰기

In [35]:
csvFile.write.format('orc').mode('overwrite').save('/tmp/my-orc-file.orc')

## 고급 I/O 개념

### 병렬로 데이터 읽기

여러 익스큐터가 같은 파일을 동시에 읽을 수는 없지만 여러 파일을 동시에 읽을 수는 있다. 다수의 파일이 존재하는 폴더를 읽을 때 폴더의 개별 파일은 DataFrame의 파티션이 된다.

따라서 사용 가능한 익스큐터를 이용해 병렬(익스큐터 수를 넘어가는 파일은 처리 중인 파일이 완료될 때까지 대기)로 파일을 읽는다.

### 병렬로 데이터 쓰기

파일이나 데이터 수는 데이터를 쓰는 시점에 DataFrame이 가진 파티션 수에 따라 달라질 수 있다. 기본적으로 데이터 파티션당 하나의 파일이 작성된다.

따라서 옵션에 지정된 파일명은 실제로는 다수의 파일을 가진 디렉터리이다. 그리고 디렉터리 안에 파티션당 하나의 파일로 데이터를 저장한다.

다음은 폴더 안에 5개의 파일을 생성한다.

In [38]:
csvFile.repartition(5).write.format('csv').mode('overwrite').save('/tmp/multiple.csv')

In [39]:
ls /tmp/multiple.csv

_SUCCESS
part-00000-a19e3c0a-0314-4117-b2f0-ea0f0bc2f26d-c000.csv
part-00001-a19e3c0a-0314-4117-b2f0-ea0f0bc2f26d-c000.csv
part-00002-a19e3c0a-0314-4117-b2f0-ea0f0bc2f26d-c000.csv
part-00003-a19e3c0a-0314-4117-b2f0-ea0f0bc2f26d-c000.csv
part-00004-a19e3c0a-0314-4117-b2f0-ea0f0bc2f26d-c000.csv


### 파티셔닝

파티셔닝은 어떤 데이터를 어디에 저장할 것인지 제어할 수 있는 기능이다. 파티셔닝된 디렉터리 또는 테이블에 파일을 쓸 때 디렉터리 별로 컬럼 데이터를 인코딩해 저장한다. 

그러므로 데이터를 읽을 때 전체 데이터셋을 스캔하지 않고 필요한 컬럼의 데이터만 읽을 수 있다.

In [42]:
csvFile.limit(10).write.mode('overwrite').partitionBy("DEST_COUNTRY_NAME") \
.save("/tmp/partitioned-files.parquet")

In [43]:
ls /tmp/partitioned-files.parquet

[1m[36mDEST_COUNTRY_NAME=Costa Rica[m[m/        [1m[36mDEST_COUNTRY_NAME=Senegal[m[m/
[1m[36mDEST_COUNTRY_NAME=Egypt[m[m/             [1m[36mDEST_COUNTRY_NAME=United States[m[m/
[1m[36mDEST_COUNTRY_NAME=Equatorial Guinea[m[m/ _SUCCESS


### 버켓팅

버켓팅은 각 파일에 저장된 데이터를 제어할 수 있는 또 다른 파일 조직화 기법이다. 

이 기법을 사용하면 동일한 버킷 ID를 가진 데이터가 하나의 물리적 파티션에 모두 모여 있기 때문에 데이터를 읽을 때 셔플을 피할 수 있다. 데이터가 이후의 사용 방식에 맞춰 사전에 파티셔닝 되므로 조인이나 집계 시 발생하는 고비용의 셔플을 피할 수 있다.

In [52]:
number_buckets = 10
column_to_bucket = "count"

csvFile.write.format('parquet').mode('overwrite').bucketBy(number_buckets, column_to_bucket).saveAsTable('bucketedFiles')