# 12장. RDD

## 12.1 저수준 API : RDD
- 저수준 API의 진입지점 : SparkContext (sc)
- 구조적API를 사용하는것이 더욱 효율적 >> 효율적, 안정적, 표현력이 높음

- 부득이한 경우 RDD를 활용
    - 클러스터의 물리적 데이터의 배치를 아주 세밀하게 제어
    - RDD를 사용해 개발된 기존 코드를 유지
    - 공유변수를 다뤄야 할 때

- 두가지 타입의 RDD
    - 제네릭RDD
    - 키-값RDD : 특수연산뿐만 아니라 키를 이용한 사용자지정 파티셔닝 개념을 가짐

## 12.2 RDD 생성
- rdd 메서드로 DataFrame에서 RDD로 변환
- toDF 메서드로 RDD에서 DataFrame으로 변환

In [1]:
spark.range(10).rdd

MapPartitionsRDD[4] at javaToPython at NativeMethodAccessorImpl.java:0

In [2]:
spark.range(10).toDF("id").rdd.map(lambda row : row[0])

PythonRDD[10] at RDD at PythonRDD.scala:52

#### 로컬 컬랙션으로 RDD 생성
- parallelize 메서드로 단일노드의 컬랙션을 병렬 컬랙션으로 변환

In [3]:
myCollection = "Spark The Definitive Guide : Big Data Processing Made Simple".split()
# myCollection을 2개로 파티션
words = sc.parallelize(myCollection, 2)
words

ParallelCollectionRDD[11] at parallelize at PythonRDD.scala:194

#### 데이터소스로 RDD 생성

In [4]:
# sc.textFile("path")
# sc.wholeTextFiles("path")

## 12.3 트랜스포메이션
- distinct : 중복데이터 제거

In [5]:
print(words.count())
print(words.distinct().count())

10
10


- filter : SQL의 where과 유사, 주어진 조건을 만족하는 RDD만 리턴

In [6]:
def startsWithS(indi):
    return indi.startswith("S")
words.filter(lambda word : startsWithS(word)).collect()

['Spark', 'Simple']

- map : 주어진 입력을 원하는 값으로 반환하는 함수를 레코드별로 적용

In [7]:
words2 = words.map(lambda word : (word, word[0], startsWithS(word)))
words2.collect()

[('Spark', 'S', True),
 ('The', 'T', False),
 ('Definitive', 'D', False),
 ('Guide', 'G', False),
 (':', ':', False),
 ('Big', 'B', False),
 ('Data', 'D', False),
 ('Processing', 'P', False),
 ('Made', 'M', False),
 ('Simple', 'S', True)]

- flatMap : map의 확장버전, flatMap으로 다수의 로우로 변환 가능

In [8]:
words.flatMap(lambda word : list(word)).take(10)

['S', 'p', 'a', 'r', 'k', 'T', 'h', 'e', 'D', 'e']

- sortBy : 함수를 지정해 리턴값을 기준으로 정렬

In [9]:
print(words.sortBy(lambda word : len(word)).take(6))
print(words.sortBy(lambda word : len(word)*(-1)).take(6))   # 내림차순은 -1의 곱으로

[':', 'The', 'Big', 'Data', 'Made', 'Spark']
['Definitive', 'Processing', 'Simple', 'Spark', 'Guide', 'Data']


- randomSplit : RDD를 임의로 분할해 RDD생성시 사용

In [10]:
fiftyFiftySplit = words.randomSplit([0.5, 0.5])
fiftyFiftySplit    # RDD 배열로 리턴

[PythonRDD[35] at RDD at PythonRDD.scala:52,
 PythonRDD[36] at RDD at PythonRDD.scala:52]

## 12.4 액션

#### reduce : 모든 값을 하나로 만들 수 있음

In [11]:
sc.parallelize(range(1, 21)).reduce(lambda x, y : x + y)

210

reduce로 최댓값 찾기

In [12]:
words.reduce(lambda x, y: x if len(x) >= len(y) else y)

'Definitive'

In [13]:
def wordLength(x, y):
    if len(x) >= len(y):
        return x
    else:
        return y
words.reduce(wordLength)

'Definitive'

#### count : RDD 전체 레코드 수를 리턴
#### countApprox : 전체 레코드 수의 근사치를 리턴 : Scala
#### countApproxDistinct : ?
#### countByValue : 전체 레코드 수나 고유아이템 수가 작은 경우에만 사용; 연산결과가 메모리에 적재

In [14]:
words.count()

10

In [15]:
words.countApprox(1)

10

In [16]:
words.countApproxDistinct()

10

In [17]:
words.countByValue()

defaultdict(int,
            {':': 1,
             'Big': 1,
             'Data': 1,
             'Definitive': 1,
             'Guide': 1,
             'Made': 1,
             'Processing': 1,
             'Simple': 1,
             'Spark': 1,
             'The': 1})

#### first : 첫번째 값 반환

In [18]:
words.first()

'Spark'

#### max, min

In [19]:
sc.parallelize(range(1, 11)).min(), sc.parallelize(range(1, 11)).max()

(1, 10)

#### take : rdd에서 가져올 값의 개수만큼 출력
#### takeOrdered : 최하위값 반환
#### top : 최상위값 반환
#### takeSample : withReplacement, 임의표본 수, 난수 시드값을 인자로 받아 표본을 추출

In [20]:
words.take(5)

['Spark', 'The', 'Definitive', 'Guide', ':']

In [21]:
words.takeOrdered(6)

[':', 'Big', 'Data', 'Definitive', 'Guide', 'Made']

In [22]:
words.top(6)

['The', 'Spark', 'Simple', 'Processing', 'Made', 'Guide']

In [23]:
words.takeSample(True, 5, 1234)

[':', 'Processing', 'Big', 'Big', 'Processing']

## 12.5 파일 저장하기 
RDD를 통해 각 파티션의 내용을 저장하려면 전체 파티션을 순회하면서 외부 데이터베이스에 저장해야함
    <br>: 고수준API의 내부처리 과정을 저수준API로 구현하는 접근법

#### saveAsTextFile

In [24]:
words.saveAsTextFile("file:///home/ubuntu/ybigta/Dataset_spark/tmp/bookTitle")

Py4JJavaError: An error occurred while calling o362.saveAsTextFile.
: org.apache.hadoop.mapred.FileAlreadyExistsException: Output directory file:/home/ubuntu/ybigta/Dataset_spark/tmp/bookTitle already exists
	at org.apache.hadoop.mapred.FileOutputFormat.checkOutputSpecs(FileOutputFormat.java:131)
	at org.apache.spark.internal.io.HadoopMapRedWriteConfigUtil.assertConf(SparkHadoopWriter.scala:289)
	at org.apache.spark.internal.io.SparkHadoopWriter$.write(SparkHadoopWriter.scala:71)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply$mcV$sp(PairRDDFunctions.scala:1096)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply(PairRDDFunctions.scala:1094)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply(PairRDDFunctions.scala:1094)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
	at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions.scala:1094)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply$mcV$sp(PairRDDFunctions.scala:1067)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply(PairRDDFunctions.scala:1032)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply(PairRDDFunctions.scala:1032)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
	at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:1032)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$1.apply$mcV$sp(PairRDDFunctions.scala:958)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$1.apply(PairRDDFunctions.scala:958)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$1.apply(PairRDDFunctions.scala:958)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
	at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:957)
	at org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1.apply$mcV$sp(RDD.scala:1499)
	at org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1.apply(RDD.scala:1478)
	at org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1.apply(RDD.scala:1478)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
	at org.apache.spark.rdd.RDD.saveAsTextFile(RDD.scala:1478)
	at org.apache.spark.api.java.JavaRDDLike$class.saveAsTextFile(JavaRDDLike.scala:550)
	at org.apache.spark.api.java.AbstractJavaRDDLike.saveAsTextFile(JavaRDDLike.scala:45)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)


#### 시퀀스파일
바이너리 키-값 쌍으로 구성된 플랫 파일. 맵리듀스의 입출력포맷으로 널리 사용됨
- 키-값 쌍인 튜플로만 저장가능 / string만 저장 불가...

In [30]:
words.map(lambda row : (None, row))\
.saveAsSequenceFile("file:///home/ubuntu/ybigta/Dataset_spark/tmp/my/sequenceFilePath")

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.saveAsSequenceFile.
: org.apache.hadoop.mapred.FileAlreadyExistsException: Output directory file:/home/ubuntu/ybigta/Dataset_spark/tmp/my/sequenceFilePath already exists
	at org.apache.hadoop.mapred.FileOutputFormat.checkOutputSpecs(FileOutputFormat.java:131)
	at org.apache.spark.internal.io.HadoopMapRedWriteConfigUtil.assertConf(SparkHadoopWriter.scala:289)
	at org.apache.spark.internal.io.SparkHadoopWriter$.write(SparkHadoopWriter.scala:71)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply$mcV$sp(PairRDDFunctions.scala:1096)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply(PairRDDFunctions.scala:1094)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply(PairRDDFunctions.scala:1094)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
	at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions.scala:1094)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply$mcV$sp(PairRDDFunctions.scala:1067)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply(PairRDDFunctions.scala:1032)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply(PairRDDFunctions.scala:1032)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
	at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:1032)
	at org.apache.spark.api.python.PythonRDD$.saveAsHadoopFile(PythonRDD.scala:511)
	at org.apache.spark.api.python.PythonRDD$.saveAsSequenceFile(PythonRDD.scala:478)
	at org.apache.spark.api.python.PythonRDD.saveAsSequenceFile(PythonRDD.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)


In [26]:
dir(words)

['__add__',
 '__class__',
 '__delattr__',
 '__dict__',
 '__dir__',
 '__doc__',
 '__eq__',
 '__format__',
 '__ge__',
 '__getattribute__',
 '__getnewargs__',
 '__gt__',
 '__hash__',
 '__init__',
 '__init_subclass__',
 '__le__',
 '__lt__',
 '__module__',
 '__ne__',
 '__new__',
 '__reduce__',
 '__reduce_ex__',
 '__repr__',
 '__setattr__',
 '__sizeof__',
 '__str__',
 '__subclasshook__',
 '__weakref__',
 '_computeFractionForSampleSize',
 '_defaultReducePartitions',
 '_id',
 '_jrdd',
 '_jrdd_deserializer',
 '_memory_limit',
 '_pickled',
 '_reserialize',
 '_to_java_object_rdd',
 'aggregate',
 'aggregateByKey',
 'cache',
 'cartesian',
 'checkpoint',
 'coalesce',
 'cogroup',
 'collect',
 'collectAsMap',
 'combineByKey',
 'context',
 'count',
 'countApprox',
 'countApproxDistinct',
 'countByKey',
 'countByValue',
 'ctx',
 'distinct',
 'filter',
 'first',
 'flatMap',
 'flatMapValues',
 'fold',
 'foldByKey',
 'foreach',
 'foreachPartition',
 'fullOuterJoin',
 'getCheckpointFile',
 'getNumPartitions

## 12.6 캐싱
- RDD도 캐시하거나 저장 가능
- cache 메서드로 캐싱
- setName 메서드로 캐시된 RDD에 이름 부여 가능

In [27]:
caching_word = words.cache()

- getStorageLevel 메서드로 저장소 수준을 파악할 수 있다. >> 20장에서 다룸!

In [28]:
words.getStorageLevel()

StorageLevel(False, True, False, False, 1)

## 12.7 체크포인팅
- DataFrame API에서는 없는 기능!! RDD를 디스크에 저장하는 방식
- 나중에 참조시 RDD를 다시 계산하지 않고 디스크에 저장된 중간 결과를 참조
- 디스크에 저장한다는 점을 빼면 캐싱과 비슷

In [29]:
# sc.setCheckpointDir("path")
# words.checkpoint()

## 12.8 RDD를 시스템 명령으로 전송
- pipe 메서드로 RDD를 외부 프로세스에 전달 가능
- 외부 프로세스는 파티션 마다 한번씩 처리해 RDD를 생성

In [34]:
# 각 파티션을 wc에 표준입력으로 전달 >> 각 파티션은 5개의 row를 가짐
words.pipe("wc -l").collect()

['5', '5']

#### mapPartitions : 개별 파티션에 대해 map연산을 실행할 수 있다.
- 파티션 그룹의 전체 값을 단일 파티션으로 모은 다음 임의의 함수를 적용하고 제어 가능

In [37]:
# 왜 인자를 리스트 형태로??!
words.mapPartitions(lambda partition : [1]).sum()

2

#### mapPartitionsWithIndex() : 
인덱스와 파티션의 모든 아이템을 순회하는 이터레이터를 가진 함수를 인수로 
파티션 인덱스 : RDD 파티션 번호

In [39]:
def indexedFunc(partitionIndex, withinPartIterator):
    return [f"partition: {partitionIndex} >>> {x}" for x in withinPartIterator]

words.mapPartitionsWithIndex(indexedFunc).collect()

['partition: 0 >>> Spark',
 'partition: 0 >>> The',
 'partition: 0 >>> Definitive',
 'partition: 0 >>> Guide',
 'partition: 0 >>> :',
 'partition: 1 >>> Big',
 'partition: 1 >>> Data',
 'partition: 1 >>> Processing',
 'partition: 1 >>> Made',
 'partition: 1 >>> Simple']

#### foreachPartition : mapPartition과 달리 결과를 반환하지 않고 순회만 함


#### glom : 데이터셋의 모든 파티션을 배열로 반환하는 함수
데이터를 드라이버로 모으거나 데이터가 존재하는 파티션의 배열이 필요할때! 사용
<br>파티션이 크거나 수가 많으면 드라이버가 비정상 종료가 될 수 있음

In [45]:
sc.parallelize(range(1, 21)).glom().collect()

[[1, 2, 3, 4, 5, 6, 7, 8, 9, 10], [11, 12, 13, 14, 15, 16, 17, 18, 19, 20]]