# PySpark로 Spark 다뤄보기


이전 게시글

- [빅데이터 연대기](https://butter-shower.tistory.com/163)
- [빅데이터 양대산맥, Hadoop Ecosystem과 Spark Ecosystem](https://butter-shower.tistory.com/164)
- [Spark의 데이터 처리 : RDD(Resilient Distributed Dataset)](https://butter-shower.tistory.com/165?category=737935)


이번에는 로컬에서 한번 Spark를 다뤄봅시다. 원래 Spark는 AWS, GCP, Azure 등의 클라우드 환경에서 돌리는 것이 일반적이지만, 스파크에서 제공하는 파이선 API인 PySpark를 활용해서 간단한 실습을 로컬에서 진행할 수 있습니다. 

그럼 PySpark를 설치해줍시다. 

## 1. PySpark 설치하기

PySpark를 이용하기 위해서는 아래와 같은 패키지가 설치되어 있어야 합니다.

- java(>=8.0)
- Spark(>=2.2.0)
- Python (>=3.4.0)

#### 자바 버전 확인

자바 버전을 확인해줍시다.

In [1]:
!java -version

openjdk version "11.0.8" 2020-07-14
OpenJDK Runtime Environment (build 11.0.8+10-post-Ubuntu-0ubuntu118.04.1)
OpenJDK 64-Bit Server VM (build 11.0.8+10-post-Ubuntu-0ubuntu118.04.1, mixed mode, sharing)


잘 설치되어 있네요.

#### Spark 설치

터미널을 열어 아래 명령어를 통해 Spark를 다운로드 해줍시다.

```bash
$ wget http://mirror.apache-kr.org/spark/spark-3.0.1/spark-3.0.1-bin-hadoop2.7.tgz
$ tar xf spark-3.0.1-bin-hadoop2.7.tgz
```

#### Spark 실행

스파크가 설치된 폴더로 이동하고 bin폴더로도 이동합니다.

그리고 spark 실행 명령어 `./spark-shell`를 입력해줍시다.

```bash
$ cd spark-3.0.1-bin-hadoop2.7
$ cd bin
$ ./spark-shell
```

정상 설치가 되었다면 pyspark를 설치해줍시다.

```bash
$ pip install pyspark
```

설치 후 버전을 확인해보세요. 

In [2]:
import pyspark
pyspark.__version__

'3.0.1'

## SparkContext를 통한 스파크 초기화

### 스파크의 엔트리포인트 SparkContext 객체 선언

분산 환경에서 운영되는 스파크는 driver 프로그램을 구동시킬 때 SparkContext라는 특수 객체를 만들게 됩니다. 스파크는 이 SparkContext 객체를 통해 스파크의 모든 기능에 접근합니다. 이 객체는 스파크 프로그램당 한번만 실행할 수 있고 사용 후에는 종료해야 합니다. 따라서 SparkContext는 다른 말로 스파크의 "엔트리포인트(entry point)"라고도 하고, SparkContext를 생성하는 것을 "스파크를 초기화한다(Initializing Spark)"라고 합니다.

![img](https://aiffelstaticprd.blob.core.windows.net/media/images/F-39-17.max-800x600.png)

### PySpark에서는..

PySpark에서 선언하는 SparkContext 객체는 내부의 JVM(Java Virtual Machine)위에 동작하는 Py4J의 SparkContext와 연결됩니다. 이 Py4J의 SparkContext는 Worker 노드들과도 연결되어 있고 이 Worker 녿들 역시 실제 동작은 JVM 위애서 동작합니다. Worker 노드들의 조작 역시 Python을 통해 할 수 있습니다만 실제 동작은 다 JVM위에서 행해집니다. 한마디로, PySpark는 Python으로 코딩을 하지만 실제 동작은 JVM에 의해서 행해지고 있습니다.

**공식 문서 확인**

아래는 PySpark에서 설명하는 SparkContext 객체 사양입니다.

![img](https://aiffelstaticprd.blob.core.windows.net/media/images/F-39-21.max-800x600.png)

위 문서를 통해 확인할 수 있는 SparkContext의 주요 내용은 아래와 같습니다.

- 문법 : `pyspark.SparkContext()`
- 스파크 기능의 기본 엔트리 포인트입니다.
- 스파크 클러스터와 연결을 나타내며 RDD를 만들고 브로드캐스트하는데 사용될 수 있습니다.
- JVM당 하나만 활성화해야하며, 새로운 것을 만들기 전에는 활성을 중지해야 합니다.

그럼 시작해봅시다!


## Hands-on!!!

In [3]:
# 모듈 import
from pyspark import SparkConf, SparkContext

sc = SparkContext()
sc

보통 변수명은 `sc`로 선언합니다. 

SparkContext 객체 타입을 확인해봅시다.

In [4]:
type(sc)

pyspark.context.SparkContext

잘 만들어졌습니다.

하지만 실수로 SparkContext를 한개 더 만들면 에러가 납니다.

In [5]:
#에러가 발생합니다!
new_sc = SparkContext()

ValueError: Cannot run multiple SparkContexts at once; existing SparkContext(app=pyspark-shell, master=local[*]) created by __init__ at <ipython-input-3-4d9b047b29c1>:4 

SparkContext를 종료합니다.

In [6]:
sc.stop()

#### code-2

SparkContext의 Configuration을 세팅할 수 있습니다.

In [7]:
sc = SparkContext(master = 'local', appName='PySpark Basic')
sc

Spark Web-UI 버전을 이용하여 로컬 모드에서 사용할 경우 master url은 http://localhost:8080 으로 설정합니다. 우리는 쥬피터 노트북에서 실행할 것이므로 위와 같이 master=local로 설정하면 됩니다. 상세 내용은 아래 공식홈페이지의 설명을 참고 하세요.

<https://spark.apache.org/docs/latest/spark-standalone.html>

생성한 SparkContext의 Configuration을 확인하기 위해서 `.getConf().getAll()`을 이용합니다.

In [8]:
sc.getConf().getAll()

[('spark.master', 'local'),
 ('spark.app.name', 'PySpark Basic'),
 ('spark.rdd.compress', 'True'),
 ('spark.driver.host', '192.168.0.8'),
 ('spark.serializer.objectStreamReset', '100'),
 ('spark.submit.pyFiles', ''),
 ('spark.executor.id', 'driver'),
 ('spark.submit.deployMode', 'client'),
 ('spark.app.id', 'local-1600673692546'),
 ('spark.driver.port', '37379'),
 ('spark.ui.showConsoleProgress', 'true')]

하나씩도 확인할 수 있습니다.

In [9]:
sc.master

'local'

In [10]:
sc.appName

'PySpark Basic'

In [11]:
sc.stop()

#### code - 3

`SparkConf()`를 이용해 SparkContext의 Configuration을 설정하는 방법을 이용해서 SparkContext를 만들 수 있습니다.

`.setMaster()`, `.setAppName()`을 이용해 어플리케이션의 이름과 Master의 URL을 설정해줄 수 있습니다.

In [12]:
conf = SparkConf().setAppName('PySpark Basic').setMaster('local')
sc = SparkContext(conf=conf)
sc

세가지 모두 많이 사용하는 코딩 스타일입니다! 잘 알아두시길 바랍니다. :)

## RDD Creation

### (1) 내부에서 만들어진 데이터 집합을 병렬화 하는 방법 : `parallelize()` 함수 사용

방금 전 만든 SparkContext()의 parallelize() 함수를 사용하여 내부의 데이터 집합을 RDD로 만들 수 있습니다.

In [13]:
rdd = sc.parallelize([1, 2, 3])
rdd

ParallelCollectionRDD[0] at readRDDFromFile at PythonRDD.scala:262

그냥 설명만 출력됩니다.

데이터 타입을 한번 확인해봅시다.

In [14]:
type(rdd)

pyspark.rdd.RDD

정상적으로 생성은 된 것 같습니다.

![Img](https://aiffelstaticprd.blob.core.windows.net/media/images/F-39-15.max-800x600.png)

위 그림을 다시 한번 봅시다.

RDD는 생성과 transformation 연산을 바로 수행하지 않습니다. 이 단계에서는 연산을 하고있지 않고 계보(lineage)만 만들어놓고 actions 동작을 할 때 rdd가 비로소 만들어지며, 이를 *느긋한 계산법* 이라고 했습니다.

즉, 지금은 RDD를 생성했지만 RDD가 만들어지지는 않았습니다.

actions를 해봅시다.

함수는 `take()`라는 함수를 사용하겠습니다. RDD의 원소를 반환합니다. 인자로는 반환하고자하는 RDD의 원소의 갯수를 입력합니다.



In [15]:
rdd.take(3)

[1, 2, 3]

### (2) 외부의 파일을 로드하는 방법 : `.textFile()` 함수 사용

`.textFile()` 함수를 이용해 외부 파일을 로드하며 rdd를 만들 수 있습니다.

우선 간단히 파일 하나 만들어보겠습니다. 우리는 `./data`  파일에 만들어보도록 하겠습니다.

In [16]:
import os

file_path = os.path.dirname(os.path.abspath('__file__')) + r'/data/test.txt'

with open(file_path, 'w') as f:
    for i in range(10):
        f.write(str(i) + '\n')
print("슝=3")

슝=3


파일이 잘 만들어졌나요? 그럼 방금 만든 파일을 불러와 RDD를 생성합니다.

In [17]:
rdd2 = sc.textFile(file_path)
print(rdd2)
print(type(rdd2))

/home/aiffel0039/Documents/GitHub/aiffel/Week8/data/test.txt MapPartitionsRDD[3] at textFile at NativeMethodAccessorImpl.java:0
<class 'pyspark.rdd.RDD'>


actions를 수행합니다.

In [19]:
rdd2.take(3)

['0', '1', '2']

한가지 특이사항이 있습니다.

우리는 숫자를 입력했으므로 위의 결과가 [0, 1, 2]가 될 줄 알았는데 문자열의 list가 얻어졌습니다.

이것은 spark가 `.textFile()`을 통해 얻은 데이터 타입을 무조건 string으로 처리하기 때문에 그렇습니다. 그렇다면 이 데이터를 숫자로 변환하려면 어떻게 해야할까요? 

바로 다음 스텝에 나오는 Transformation 같은 RDD Operation이 필요한 것입니다.


---


## RDD Operation - (1) Transformations

이어서 RDD 동작에 대해 연습해보도록 하겠습니다.

RDD의 연산은 NumPy 연습하듯이 자주 사용되는 함수들을 간단한 예제코드를 통해 학습하는 것이 좋습니다. 우선 기본이 되는 몇가지 동작을 설명하여 RDD 연산 과정을 확인해보고 동영상 자료를 통해 여러가지 예제를 익혀봅시다.

### Transformations

- map()
- filter()
- flatmap()

#### map

![img](https://aiffelstaticprd.blob.core.windows.net/media/images/F-39-23.max-800x600.png)

x의 모든 원소에 대해 map 함수를 적용한 결과는 y값입니다. 따라서 x와 y의 원소 갯수는 같습니다.

In [20]:
x = sc.parallelize(['b', 'a', 'c', 'd'])
y = x.map(lambda z : (z, 1))
print(x.collect()) # collect()는 actions입니다.
print(y.collect())

['b', 'a', 'c', 'd']
[('b', 1), ('a', 1), ('c', 1), ('d', 1)]


`(문자, 1)` 형식으로 map 했습니다!! 아래 예제도 한번 확인해봅시다.

In [22]:
nums = sc.parallelize([1, 2, 3])
squares = nums.map(lambda x : x*x)
print(squares.collect())

[1, 4, 9]


#### filter

![img](https://aiffelstaticprd.blob.core.windows.net/media/images/F-39-24.max-800x600.png)

filter 연산은 어떤 조건을 만족하는 값만을 반환합니다. 따라서 조건문이 들어가야하며, x와 y의 원소의 갯수는 같지 않을 수 있습니다.


In [23]:
x = sc.parallelize([1, 2, 3, 4, 5])
y = x.filter(lambda x : x%2 == 0)
print(x.collect())
print(y.collect())

[1, 2, 3, 4, 5]
[2, 4]


2의 배수만 출력하는 필터였습니다. 아래 예제도 확인해보세요.

In [25]:
text = sc.parallelize(['a', 'b', 'c', 'd'])
capital = text.map(lambda x : x.upper())
A = capital.filter(lambda x : 'A' in x)
print(text.collect())
print(A.collect())

['a', 'b', 'c', 'd']
['A']


#### flatmap

아래는 flatmap의 연산 과정을 나타낸 그림입니다.

![img](https://aiffelstaticprd.blob.core.windows.net/media/images/F-39-25.max-800x600.png)

![img](https://aiffelstaticprd.blob.core.windows.net/media/images/F-39-26.max-800x600.png)

![img](https://aiffelstaticprd.blob.core.windows.net/media/images/F-39-27.max-800x600.png)

![img](https://aiffelstaticprd.blob.core.windows.net/media/images/F-39-28.max-800x600.png)

FlatMap은 RDD의 원소에 map 연산을 수행하고 원소의 갯수를 증가시키기도 합니다. 원소의 갯수는 꼭 동일하게 증가시키지 않아도 됩니다.

![Img](https://aiffelstaticprd.blob.core.windows.net/media/images/F-39-29.max-800x600.png)

In [26]:
x = sc.parallelize([1, 2, 3])
y = x.flatMap(lambda x : (x, x*10, 30))
print(x.collect())
print(y.collect())

[1, 2, 3]
[1, 10, 30, 2, 20, 30, 3, 30, 30]


아래 예제는 RDD Transformation 몇가지를 1개 라인에 중첩 적용한 경우입니다. 언뜻 복집해 보이겠지만 적용된 Transformation 함수의 효과를 하나씩 따로 적용해서 collect()로 결과를 확인해보면 어렵지 않게 파악 가능할 것입니다.

In [28]:
wordsDataset = sc.parallelize(['Spark is funny', 'It is beautiful', 'And also It is implemented by python'])
result = wordsDataset.flatMap(lambda x : x.split()).filter(lambda x : x != ' ').map(lambda x : x.lower())
result.collect()

['spark',
 'is',
 'funny',
 'it',
 'is',
 'beautiful',
 'and',
 'also',
 'it',
 'is',
 'implemented',
 'by',
 'python']

#### CSV 파일 읽어들이기

위에서 살펴본 RDD transformation 함수들을 토대로 좀더 실전적인 예제를 다루어 봅시다. 우리가 다루는 많은 데이터들은 주로 csv 파일로 되어 있습니다. 유명한 Titanic 데이터셋 파일을 스파크로 읽어들이는 것을 연습해 봅시다. 우선 작업환경에 아래와 같이 csv 파일을 다운받아 둡시다.

```bash
$ wget https://storage.googleapis.com/tf-datasets/titanic/train.csv
$ mv train.csv ./data
```
파일을 불러들여 RDD를 생성하는 방법은 이미 알고 있습니다. sc.textFile()를 아래와 같이 활용해 봅시다.




In [30]:
import os 

csv_path = os.path.dirname(os.path.abspath('__file__')) + r'/data/train.csv'
csv_data_0 = sc.textFile(csv_path)
csv_data_0.take(5)

['survived,sex,age,n_siblings_spouses,parch,fare,class,deck,embark_town,alone',
 '0,male,22.0,1,0,7.25,Third,unknown,Southampton,n',
 '1,female,38.0,1,0,71.2833,First,C,Cherbourg,n',
 '1,female,26.0,0,0,7.925,Third,unknown,Southampton,y',
 '1,female,35.0,1,0,53.1,First,C,Southampton,n']

파일을 그대로 읽어서 상위 5라인만 출력해 보았습니다. 이것을 데이터셋으로 만들려면, 1번째 라인의 컬럼 부분을 분리해 내고, 매 데이터 라인마다 [(column1, 데이터1), (column2, 데이터2)…] 의 list 형태로 바꿔 주고 싶습니다. 좀더 데이터를 가공해 봅시다.


In [31]:
# 비어있는 라인은 제외하고, delimeter인 ,로 line을 분리해 줍니다. 
csv_data_1 = csv_data_0.filter(lambda line: len(line)>1).map(lambda line: line.split(","))   
csv_data_1.take(5)

[['survived',
  'sex',
  'age',
  'n_siblings_spouses',
  'parch',
  'fare',
  'class',
  'deck',
  'embark_town',
  'alone'],
 ['0',
  'male',
  '22.0',
  '1',
  '0',
  '7.25',
  'Third',
  'unknown',
  'Southampton',
  'n'],
 ['1', 'female', '38.0', '1', '0', '71.2833', 'First', 'C', 'Cherbourg', 'n'],
 ['1',
  'female',
  '26.0',
  '0',
  '0',
  '7.925',
  'Third',
  'unknown',
  'Southampton',
  'y'],
 ['1', 'female', '35.0', '1', '0', '53.1', 'First', 'C', 'Southampton', 'n']]

컬럼 부분만 아래와 같이 분리해 낼 수 있습니다.



In [32]:
columns = csv_data_1.take(1)
columns

[['survived',
  'sex',
  'age',
  'n_siblings_spouses',
  'parch',
  'fare',
  'class',
  'deck',
  'embark_town',
  'alone']]

컬럼을 제외한 나머지 데이터만 분리해 낼 방법이 필요합니다. 데이터의 첫번째 컬럼은 0 또는 1의 숫자로만 이루어져 있습니다. 이 조건을 filter로 활용하면 컬럼 부분을 제외할 수 있을 것 같습니다.


In [33]:
csv_data_2 = csv_data_1.filter(lambda line : line[0].isdecimal()) # 첫번째 칼럼이 숫자인지
csv_data_2.take(5)

[['0',
  'male',
  '22.0',
  '1',
  '0',
  '7.25',
  'Third',
  'unknown',
  'Southampton',
  'n'],
 ['1', 'female', '38.0', '1', '0', '71.2833', 'First', 'C', 'Cherbourg', 'n'],
 ['1',
  'female',
  '26.0',
  '0',
  '0',
  '7.925',
  'Third',
  'unknown',
  'Southampton',
  'y'],
 ['1', 'female', '35.0', '1', '0', '53.1', 'First', 'C', 'Southampton', 'n'],
 ['0',
  'male',
  '28.0',
  '0',
  '0',
  '8.4583',
  'Third',
  'unknown',
  'Queenstown',
  'y']]

거의 다 왔습니다. 이제 칼럼 기준으로 `csv_data_2`를 정리해봅시다.

In [35]:
csv_data_3 = csv_data_2.map(lambda line : [(columns[0][i], linedata) for i, linedata in enumerate(line)])
csv_data_3.take(5)

[[('survived', '0'),
  ('sex', 'male'),
  ('age', '22.0'),
  ('n_siblings_spouses', '1'),
  ('parch', '0'),
  ('fare', '7.25'),
  ('class', 'Third'),
  ('deck', 'unknown'),
  ('embark_town', 'Southampton'),
  ('alone', 'n')],
 [('survived', '1'),
  ('sex', 'female'),
  ('age', '38.0'),
  ('n_siblings_spouses', '1'),
  ('parch', '0'),
  ('fare', '71.2833'),
  ('class', 'First'),
  ('deck', 'C'),
  ('embark_town', 'Cherbourg'),
  ('alone', 'n')],
 [('survived', '1'),
  ('sex', 'female'),
  ('age', '26.0'),
  ('n_siblings_spouses', '0'),
  ('parch', '0'),
  ('fare', '7.925'),
  ('class', 'Third'),
  ('deck', 'unknown'),
  ('embark_town', 'Southampton'),
  ('alone', 'y')],
 [('survived', '1'),
  ('sex', 'female'),
  ('age', '35.0'),
  ('n_siblings_spouses', '1'),
  ('parch', '0'),
  ('fare', '53.1'),
  ('class', 'First'),
  ('deck', 'C'),
  ('embark_town', 'Southampton'),
  ('alone', 'n')],
 [('survived', '0'),
  ('sex', 'male'),
  ('age', '28.0'),
  ('n_siblings_spouses', '0'),
  ('parch'

이제 원하는 형태로 csv 파일이 가공되었습니다. 이 형태라면 다음 스텝에 나올 다양한 Action 함수를 적용하여 다양하게 분석해볼 수 있습니다.

하지만 csv 파일을 꼭 이렇게 가공해야 할까요? 꼭 그렇지는 않습니다. 마지막으로 csv 파일을 DataFrame으로 읽어들이는 방법을 소개해드리겠습니다.

In [36]:
from pyspark import SparkConf, SparkContext, SQLContext, SparkFiles

url = 'https://storage.googleapis.com/tf-datasets/titanic/train.csv'
sc.addFile(url)
sqlContext = SQLContext(sc)

df = sqlContext.read.csv(SparkFiles.get('train.csv'), header = True, inferSchema = True)
df.show(5, truncate = False)

+--------+------+----+------------------+-----+-------+-----+-------+-----------+-----+
|survived|sex   |age |n_siblings_spouses|parch|fare   |class|deck   |embark_town|alone|
+--------+------+----+------------------+-----+-------+-----+-------+-----------+-----+
|0       |male  |22.0|1                 |0    |7.25   |Third|unknown|Southampton|n    |
|1       |female|38.0|1                 |0    |71.2833|First|C      |Cherbourg  |n    |
|1       |female|26.0|0                 |0    |7.925  |Third|unknown|Southampton|y    |
|1       |female|35.0|1                 |0    |53.1   |First|C      |Southampton|n    |
|0       |male  |28.0|0                 |0    |8.4583 |Third|unknown|Queenstown |y    |
+--------+------+----+------------------+-----+-------+-----+-------+-----------+-----+
only showing top 5 rows



지금까지 우리가 사용했던 SparkContext를 한번 더 가공한 SQLContext에서 제공하는 `read.csv()` 함수를 이용하면 스파크의 DataFrame을 얻을 수 있습니다. 이것은 우리에게 아주 익숙한 Pandas의 Dataframe과 똑같지는 않지만 매우 유사합니다. 실제로 SQLContext에는 RDD를 이용해 데이터를 분석하는 것 보다 훨씬 편리하고 강력한 기능들을 많이 제공하고 있습니다.

관련해서 자세한 내용은 아래 링크를 참고하세요!

<https://spark.apache.org/docs/1.6.1/sql-programming-guide.html>

In [37]:
# 위에서 얻은 데이터에서 40세 이상인 사람들의 데이터만 필터링해 봅시다. 
df2 = df[df['age'] > 40]
df2.show(5, truncate=False)

+--------+------+----+------------------+-----+-------+------+-------+-----------+-----+
|survived|sex   |age |n_siblings_spouses|parch|fare   |class |deck   |embark_town|alone|
+--------+------+----+------------------+-----+-------+------+-------+-----------+-----+
|0       |male  |66.0|0                 |0    |10.5   |Second|unknown|Southampton|y    |
|0       |male  |42.0|1                 |0    |52.0   |First |unknown|Southampton|n    |
|1       |female|49.0|1                 |0    |76.7292|First |D      |Cherbourg  |n    |
|0       |male  |65.0|0                 |1    |61.9792|First |B      |Cherbourg  |n    |
|0       |male  |45.0|1                 |0    |83.475 |First |C      |Southampton|n    |
+--------+------+----+------------------+-----+-------+------+-------+-----------+-----+
only showing top 5 rows



#### RDD - Transformations 더 많은 함수들

더 많은 Transformations 함수들이 있습니다. 아래 링크에서 확인하세요!

<https://spark.apache.org/docs/latest/rdd-programming-guide.html#transformations>


## RDD Operation - (2) Actions

**Actions**

- collect()
- take()
- count()
- reduce()
- saveAsTextFile()

`collect()`, `take()`는 이미 몇번 사용해봤던 익숙한 Action들입니다.

#### collect

RDD 내의 모든 값을 리턴합니다. 만약 정말 빅데이터를 다루고 있다면 함부로 호출하지 않는 것이 좋습니다.

In [38]:
nums = sc.parallelize(list(range(10)))
nums.collect()

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

#### take

RDD에서 앞쪽 n개의 데이터 list를 리턴합니다. collect()보다 안전하게 데이터를 확인해볼 수 있습니다.

In [39]:
nums.take(5)

[0, 1, 2, 3, 4]

#### count

RDD에 포함된 데이터 갯수를 리턴합니다.

In [40]:
nums.count()

10

#### reduce

드디어 reduce 함수가 나왔습니다. MapReduce의 그 reduce에 해당합니다.

Map은 Transformations 함수로, Reduce는 Actions 함수로 구현했습니다. Reduce 할 데이터가 RDD에 메모리 상에 존재하므로 이전의 다른 구현체보다 훨씬 빠르게 MapReduce를 실행할 수 있겠습니다.

아래는 RDD의 모든 데이터를 차례차례 더하는 sum()을 구현한 예시입니다.

In [41]:
nums.reduce(lambda x, y : x+y)

45

#### saveAsTextFile

RDD 데이터를 파일로 저장합니다. 아래 코드를 실행하면 `file.txt`라는 파일에 RDD 내용이 저장될 것입니다. 과연 그럴까요?!

In [42]:
file_path = os.path.dirname(os.path.abspath('__file__')) + r'/data/file.txt'
nums.saveAsTextFile(file_path)

In [46]:
!ls -l ./data

total 5256
-rw-rw-r-- 1 aiffel0039 aiffel0039 1152695  9월 17 14:22 cat.jpg
-rw-rw-r-- 1 aiffel0039 aiffel0039 1944447  9월 17 14:22 dog.jpg
drwxrwxr-x 2 aiffel0039 aiffel0039    4096  9월 21 18:05 file.txt
-rw-rw-r-- 1 aiffel0039 aiffel0039  683114  9월 10 14:57 img.JPG
-rw-rw-r-- 1 aiffel0039 aiffel0039  730433  9월 17 14:22 my_pic1.JPG
-rw-rw-r-- 1 aiffel0039 aiffel0039  822069  9월 17 14:22 my_pic2.JPG
-rw-rw-r-- 1 aiffel0039 aiffel0039      20  9월 21 17:15 test.txt
-rw-r--r-- 1 aiffel0039 aiffel0039   30874  2월 21  2019 train.csv


`file.txt` 내용을 잘 보시면 놀랍게도 디렉토리 타입으로 생성되어 있습니다. 이 디렉토리 안에 들어가보면 `part-00000`라는 이름의 텍스트 파일이 생성되어 있어서, 실제 우리가 기록하고 싶었던 내용은 이 파일 안에 있습니다. 

왜 이런 일이 생겼을까요?

우리가 다루고있는 스파크가 바로 분산형 빅데이터 시스템이기 때문입니다. 스파크가 다룰 파일 사이즈는 하드디스크 하나에 다 담지 못할만큼 큰 경우일 수도 있습니다. 비록 작은 RDD지만 우리는 이미 `sc.parallelize()`를 통해 분산형 데이터로 생성했음을 잊지 마세요!

In [47]:
from IPython.core.display import display, HTML
display(HTML("<style> .container{width:90% !important;}</style>"))