## Linking with Spark
pyspark는 driver 와 worker들의 python version 이 동일해야한다.

In [1]:
from pyspark import SparkContext, SparkConf

## Initializing Spark 
### SparkContext
어떻게 cluster 에 접근할지 Spark 에게 알려줌
### SparkConf
내 application 정보 저장  
### SparkSession 와 SparkConf 차이?
#### 공통점
Spark 앱 진입점  
<br>

#### 차이점
SparkSession 은 2.0 부터 도입된 새로운 진입점  
SparkContext 는 RDD API 중심 지원, SparkSession 은 RDD + DataFrame + SQL 등 모든 API 지원  
SparkSession 내부에 SparkContext 포함(spark.sparkContext)

In [2]:
if SparkContext._active_spark_context is not None:
    sc = SparkContext.getOrCreate()
else:
    conf = SparkConf().setAppName("RDD Programming Guide").setMaster("local")
    sc = SparkContext(conf=conf)

25/05/07 23:07:40 WARN Utils: Your hostname, gimjunhaui-MacBookPro.local resolves to a loopback address: 127.0.0.1; using 192.168.45.106 instead (on interface en0)
25/05/07 23:07:40 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/05/07 23:07:41 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


### Using shell

shell 에서 사용법은 다음에   
python driver 설정 내용이 있는데 필요할 때 알아보자

## Resilient Distributed Datasets (RDDs)
RDDs 는 고장에 강하고 병렬로 처리할 수 있는 데이터 요소들의 집합  
RDDs 생성은 드라이버 프로그램에 있는 데이터 요소들 혹은 외부 저장장치에 있는 데이터셋 (HDFS, HBase, Hadoop 기반)

### Parallelized Collection
Parallelized Collection은 드라이버 프로그램에서 기존의 리스트나 반복 가능한 객체에 대해 SparkContext의 parallelize 메소드를 호출함으로 생성  
데이터 요소들은 복사되어 병렬로 처리할 수 있는 RDDs 가 됨

In [36]:
data = [1, 2, 3, 4, 5]
distData = sc.parallelize(data)
distData

ConnectionRefusedError: [Errno 61] Connection refused

일단 생성을 하면 분산된 데이터셋(distData) 는 병렬로 처리 가능  
예를 들어 아래와 같이 코드를 작성하면 모든 요소의 합을 구할 수 있다.

In [18]:
distData.reduce(lambda a, b: a + b)

15

Parallelized Collection 에서 가장 중요한 요소(변수) 하나는 데이터셋을 몇개의 파티션으로 나눌지이다.  
Spark 는 각 클러스터의 파티션에 대해 한가지 task 만 실행  
보통 클러스터의 CPU 당 2~4 파티션으로 나눈다. 
Spark 는 자동으로 클러스터 기반으로 파티션 갯수를 나눈다.  
그러나, 수동으로 몇개의 파티션을 나눌지 설정이 가능

In [19]:
sc.parallelize(data, 10) # 10개의 파티션으로 나눈다

ParallelCollectionRDD[30] at readRDDFromFile at PythonRDD.scala:289

In [2]:
# 실험 시간 갯수
# 1~1000억 하니 컴퓨터 메모리가 못버틴다...
# 메모리 에러 발생 -> SparkConf 기본 메모리가 1G 로 되어 있어서 아래와같이 설정
data1 = list(range(100000000))
conf1 = SparkConf().setAppName("MyApp").set("spark.executor.memory", "2g").set("spark.driver.memory", "2g")
sc1 = SparkContext(conf=conf1)

25/04/18 00:22:21 WARN Utils: Your hostname, gimjunhaui-MacBookPro.local resolves to a loopback address: 127.0.0.1; using 192.168.45.202 instead (on interface en0)
25/04/18 00:22:21 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/04/18 00:22:22 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
distData1 = sc1.parallelize(data1, 2)
distData1.reduce(lambda a, b: a + b)

25/04/18 00:19:01 WARN TaskSetManager: Stage 0 contains a task of very large size (245355 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

4999999950000000

In [3]:
distData2 = sc1.parallelize(data1, 10)
distData2.reduce(lambda a, b: a + b)

25/04/18 00:22:37 WARN TaskSetManager: Stage 0 contains a task of very large size (48972 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

4999999950000000

#### 2개 파티션 vs 10개 파티션 시간 비교
1~1억 이 들어있는 List 의 sum 을 하나씩 더해서 구하는 경우  
2개 파티션 24.832초  
10개 파티션 17.91초  
나눠서 하면 빠르다!

### External Datasets
PySpark은 Hadoop 에서 호환되는 모든 저장 source로 분산된 데이터셋 생성이 가능 (local file system, HDFS, Cassandra, HBase, AWS S3 등)  
Spark 는 텍스트파일 sequence 파일 혹은 다른 Hadoop 입력형식을 지원한다.  

<strong>textFile</strong> method 을 사용해서 텍스트파일 RDDs 생성이 가능  
이 method 은 파일의 URI(로컬 경로, hdfs://, s3a:// 등의 URI) 를 가지고 줄 단위로 읽는다.  
생성된 RDDs 텍스트파일은 dataset 기능을 사용가능하다.

In [20]:
distFile = sc.textFile("../data/data.txt")
# distFile(data.txt) 의 총 길이
distFile.map(lambda s: len(s)).reduce(lambda a, b: a + b)

999

Spark에서 파일 읽는 경우 참고 사항 (textFile)
* 만약 로컬 파일을 쓰는 경우 노드 내에도 같은 경로에 접근 가능한 파일이 존재해야한다. 모든 노드에 복사 혹은 노드에 공유된 network-mounted 파일 시스템 사용
* 파일을 읽을때 파일경로(sample.txt), 와일드카드(\*.txt 같은), 압축파일(*.gz) 이 사용 가능하다
* textFile에는 두번째 인자가 있는데 이는 파일이 몇개의 파티션으로 구성될지 설정. 기본값으로 파일블럭(블럭은 HDFS 에서 128MB 기본값)당 1개의 파티션 생성, block 갯수 < 파티션 갯수

Spark 파이썬 API 지원하는 데이터 형식
* <strong>SparkContext.wholeTextFiles</strong> 은 경로 안에 있는 작은 여러 텍스트 파일을 읽고, 각각 (파일명, 내용) 쌍으로 리턴함. textFile은 다르게 각파일에서 각줄당 1개의 결과만 리턴
* <strong>RDD.saveAsPickleFile</strong> and <strong>SparkContext.pickleFile</strong>은 RDD를 간단한 형식인 pickle 의 Python object로 저장을 지원, 배치가 pickle serialization 에 사용, 배치 기본 크기는 10.
* <strong>SequenceFile</strong> 와 <strong>Hadoop Input/Output</strong>형식

### Spark SQL 의 read/write 지원은 실험 단계. 고수를 위한거므로 일단은 패스...

### RDD Operations
2가지의 operation 지원 
transformations, actions

 

#### transformations    
기존의 dataset으로 새로운 dataset 생성  

#### actions 
dataset의 실행 결과인 하나의 값을 driver program 으로 전달.

In [None]:
# 이해를 돕기위한 예
distFile = sc.textFile("../data/data.txt")
distFile.map(lambda s: len(s)).reduce(lambda a, b: a + b)

<code>distFile.map(lambda s: len(s))</code> 은 transformations 에 해당  
각 줄마다 길이를 갖는 새로운 dataset 으로 변환  

<code>distFile.map(lambda s: len(s)).reduce(lambda a, b: a + b)</code> 은 actions 에 해당  
위의 transformation 한 dataset 에 대해서 각줄의 길이의 합(전체 길이) 를 하나의 값으로 반환  
(참고: reduceByKey 는 distributed dataset 반환)

모든 transformations 은 lazy(게으름), 실제로 실행을 바로 하지 않음.  
transformation 해야하는거만 기억해둠.  
action 이 일어나지 않으면 실제 행하지 않음.   
이를 통해 Spark가 더 효율적으로 실행 가능  
transformations 연산을 즉시 실행하지 않고, "실행계획 DAG(Directed Acyclic Graph)" 만 구성   

여기서 transformations 을 lazy 하는게 왜 Spark를 더 효율적으로 실행이 될까?  
1. 최적회 기회를 제공: action 전단계의 모든 transformations 를 한번에 분석해서 중간 데이터 생성 최소화하거나 필요 없는 작업 제거
2. 중간 결과 저상 최소화: 즉시 실행되지 않기 때문에, 불필요한 중간 데이터 저장(IO 비용) 없이 메모리에서 연산 체인을 효율적으로 이어서 처리할 수 있음  
3. 필요한 데이터만 처리: Action 에 필요한 데이터만 처리하도록 계획 세움, <code>take(10)</code> 하면 딱 10개만 나오도록 계산  
4. 장기 실행 계획(DAG) 으로 장애 복구 용이: lazy transformation 은 저체 실행 결로를 DAG로 기록하므로, 장애가 나도 어느 지점에서 어떻게 다시 계산할지 알 수 있음 

기본적으로 transform RDD 는 매번 action 이 일어나면 실행하지만, 자주 사용하는 RDD 를 메모리에 <strong>persist</strong>(cache, 캐싱) 가능.  
자주 쓰이므로 메모리에 올려두고 더 빠르게 접근 가능(매번 transform 하지 않아도 된다)  
추가로 RDD를 디스크에 persisting 하거나 여러 노드에 복제할 수 있다.  

#### Basics

In [None]:
lines = sc.textFile("../data/data.txt") # 파일 위치를 그저 가르킬 뿐, 메모리에 올리거나 해당 RDD 에 대한 action 이 없으면 행하지 않음
lineLengths = lines.map(lambda s: len(s)) # transformation, 당장 실행하지 않음, 게으름
totalLength = lineLengths.reduce(lambda a, b: a + b) # action, 여러 장치에서 돌리기 위해 task 단위로 할일을 나눔, 각 장치에서 map 과 reduce 실행 지금 실행하는 위치에서만 결과값을 반환 받음
print(totalLength)

# 나중에 lineLengths 를 쓴다면
lineLengths.persist()

#### Passing Functions to Spark
클러스터 내에서 실행하는 Spark API는 driver program 이 보내주는 function 에 의지한다.  
3가지 권장된 방법:  
1. Lambda 표현식(Lambda는 multi-statement function(여러줄로 된 함수)과 return 값이 없는 것을 지원하지 않음
2. spark 연산을 호출하는 함수 내부에 로컬 함수를 정의하는 방식(코드가 좀 길고, 외부에서 안 쓰는 함수)
3. 함수 정의를 스크립트나 모듈의 최상단에 위치시키는 것 (재사용 시, 여러 spark 작업에서 쓸 때)

In [3]:
# 1. lambda 표현식
rdd = sc.textFile("../data/data.txt")
rdd.map(lambda s: len(s)).reduce(lambda a, b: a + b)

                                                                                

999

In [4]:
# 2. 로컬 함수 사용
def process_data():
    def is_even(x):
        return x % 2 == 0
    
    rdd = sc.parallelize(range(10))
    even_rdd = rdd.filter(is_even)
    print(even_rdd.collect())
process_data()

[0, 2, 4, 6, 8]


In [5]:
# 3. 최상단 함수
def is_even(x):
    return x % 2 == 0

rdd = sc.parallelize(range(10))
even_rdd = rdd.filter(is_even)
print(even_rdd.collect())

[0, 2, 4, 6, 8]


In [23]:
# 최상단
def myFunc(s):
    words = s.split(" ")
    return len(words)

sc.textFile("../data/data.txt").map(myFunc).collect()

[4,
 5,
 1,
 4,
 1,
 2,
 1,
 1,
 6,
 3,
 4,
 13,
 8,
 6,
 3,
 10,
 7,
 3,
 7,
 2,
 25,
 1,
 1,
 1,
 1,
 3,
 1,
 5,
 4,
 36,
 2,
 8,
 5,
 1,
 1,
 1,
 1]

In [22]:
# 클래스를 이용해서 다른 함수 사용 가능
class MyClass(object):
    def func(self, s):
        return len(s.split(' '))
    def doStuff(self, rdd):
        return rdd.map(self.func)
mc = MyClass()
mc.doStuff(sc.textFile("../data/data.txt")).collect()

[4,
 5,
 1,
 4,
 1,
 2,
 1,
 1,
 6,
 3,
 4,
 13,
 8,
 6,
 3,
 10,
 7,
 3,
 7,
 2,
 25,
 1,
 1,
 1,
 1,
 3,
 1,
 5,
 4,
 36,
 2,
 8,
 5,
 1,
 1,
 1,
 1]

#### Understanding closures
Spark 의 어려운점 한가지는 클러스터에서 코드 실행 시 변수(variable) 과 메소드(method) 범위와 생애주기(life cycle) 이다.  
범위 밖에서 변수 수정이 일어나는 RDD 작업은 자주 혼란스럽다.   

In [3]:
# 실행 ㄴㄴ 
counter = 0 
rdd = sc.parallelize(range(10))

def increament_counter(x):
    global counter 
    counter += x

rdd.foreach(increament_counter)

print("Counter value:", counter)

[Stage 0:>                                                          (0 + 1) / 1]

Counter value: 0


                                                                                

##### Local vs. cluster modes 
작업을 수행하려면, Spark 는 RDD 작업을 tasks 로 쪼갬, 각 task 는 executor 에 의해 실행  
execution 전에 Spark 는 task 의 **closure**을 계산함  
**Closure**은 해당 RDD를 계산하기 위해 executor에게 variable 과 method 가 보여야함(visible)(위 경우 foreach())  
해당 **closure**를 직렬화하여 각 executor 로 전송  

Closure 내의 변수는 각 executor 에 복사된 변수를 보냄  
즉, 위에서 foreach 안의 함수에서 참조한 counter 는 더이상 driver node 의 counter 와 다름  
driver 메모리에 counter 는 있지만 executor 에게 보여지지 않음  
executor 는 오르지 직열화되고 복사된 counter 만 보임  
즉, driver 가 가진 counter 는 여전히 0  

로컬 모드, 어느 상황에서, foreach 함수는 driver 메모리에 있는 counter 를 참조할 가능성이 있다.  

생각한대로 보장하기 위해서는 **Accumulator** 사용해야함  
Spark의 accumulator는 클러스터 내에 여러 노드로 분산처리할 때 안전하게 변수를 업데이트 함  
Accumulator 파트에서 자세한 내용 다룸  

일반적으로 closure - 반복문이나 로컬에서만 정의된 method 같은 구조들 - 은 전역 상태(global state)를 변경하는데 사용 X  
Spark는 closure 밖에서 참조된 object의 변형을 정의 혹은 보장하지 않음  
로컬에서는 혹시 운수가 좋아서 될지 몰라도 클러스터 환경에서는 안됨  
global 변수로 합을 구하지 말고 accumulator 사용 권장   

##### Printing elements of an RDD 
또 다른 흔한 방식은 RDD 의 구성요소를 rdd.foreach(println) 혹은 rdd.map(println) 을 사용해서 출력한다.  
이는 단독 기기에서 생각대로 잘 작동한다.   
그러나 클러스터 모드에서는 이를 출력하지 않는다.  
클러스터 환경에서 출력하려면 collect() 를 써서 RDD 를 driver 노드로 갖고 온디.  
rdd.collect().foreach(println)  
collect() 가 클러스터에 분산된 모든 정보를 driver 노드로 갖고와서 메모리가 부족할 수 있다 -> RDD 의 몇개만 갖고와서 출력 추천 take() 사용  
rdd.take(100).foreach(println)  

#### Working with Key-Value Pairs 
대부분 Spark 작업은 모든 형태의 object 를 갖는 RDD 로 하는데, 몇가지 특별한 작업은 RDD의 Key-Value 쌍에서만 가능하다  
가장 흔한 작업은 분산된 "shuffle" 작업, 구성요소의 Key로 그룹화 혹은 병합  

파이썬에서는 이런 작업은 Python tuple로 구성된 RDD에서 작동한다. (1, 2) 와 같은  

예를 들어 아래와 같이 텍스트 파일에서 같은 줄별로 몇개가 나오는지 계산할 수 있다 (reduceByKey(lambda a, b: a+b))  
아니면 sortByKey() 로 오름차순으로 정렬 가능 

In [13]:
lines = sc.textFile("../data/data.txt")
pairs = lines.map(lambda s: (s, 1))
counts = pairs.reduceByKey(lambda a, b: a + b).sortBy(lambda x: x[1], ascending=False)
# counts = pairs.sortByKey()
counts.collect()

[('', 5),
 ('----', 3),
 ('# Spark 3.5.5 tutorial', 1),
 ('Apache Spark 공식 문서 스터디', 1),
 ('## RDD Programming Guide', 1),
 ('2025.04.16', 1),
 ('실습 진행', 1),
 ('2025.04.15', 1),
 ('코드 실습은 jupyternotebook 이 편리해보여서 변경', 1),
 ('### Spark abstract', 1),
 ('#### Resilient Distributed Dataset(RDD)', 1),
 ('하둡 내에 있는 파일 혹은 디스크에 저장된 Scala collection 으로 시작해서 이를 변환한다.', 1),
 ('RDD 는 메모리 위에 두어서 병렬처리에서 효율적으로 재사용', 1),
 ('RDD 는 node의 실패에서 자동으로 회복', 1),
 ('#### Shared variables', 1),
 ('spark는 function 서로 다른 nodes 에서 작업 묶음으로 병렬 작동', 1),
 ('function 의 변수를 복사해서 각 노드에 전달', 1),
 ('- Broadcast variable', 1),
 ('한 값을 모든 node 의 메모리에 cache', 1),
 ('- Accumulatiors', 1),
 ('누적용 변수, sum or count 연산에 쓰임, 각 작업에서 값을 더해 나가면서 그 결과를 드라이버 프로그램에서 수집, += 처럼 누적 가능, 읽는건 드라이버 프로그램에서만 가능',
  1),
 ('## Quick Start', 1),
 ('2025.04.14', 1),
 ('윈도우 환경에서 맥 환경으로 변경', 1),
 ('첫 시작 simpleApp.py 돌리니', 1),
 ("Can't assign requested address: Service 'sparkDriver' failed after 16 retries (on a random free port)! Consider explicitly set

### Transformations 
아래 예제들은 Spark 에서 지원하고 자주 쓰는 transformation 들이다.  
RDD API 문서 (https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.html#pyspark.RDD)  
pair RDD 함수 문서는 파이썬 링크가 없다 ㅜㅜ  

In [3]:
lines = sc.textFile("../data/data.txt")
lines.collect()

['# Spark 3.5.5 tutorial',
 'Apache Spark 공식 문서 스터디',
 '',
 '## RDD Programming Guide',
 '2025.04.16',
 '실습 진행',
 '----',
 '2025.04.15',
 '코드 실습은 jupyternotebook 이 편리해보여서 변경',
 '### Spark abstract',
 '#### Resilient Distributed Dataset(RDD)',
 '하둡 내에 있는 파일 혹은 디스크에 저장된 Scala collection 으로 시작해서 이를 변환한다.',
 'RDD 는 메모리 위에 두어서 병렬처리에서 효율적으로 재사용',
 'RDD 는 node의 실패에서 자동으로 회복',
 '#### Shared variables',
 'spark는 function 서로 다른 nodes 에서 작업 묶음으로 병렬 작동',
 'function 의 변수를 복사해서 각 노드에 전달',
 '- Broadcast variable',
 '한 값을 모든 node 의 메모리에 cache',
 '- Accumulatiors',
 '누적용 변수, sum or count 연산에 쓰임, 각 작업에서 값을 더해 나가면서 그 결과를 드라이버 프로그램에서 수집, += 처럼 누적 가능, 읽는건 드라이버 프로그램에서만 가능',
 '----',
 '',
 '',
 '',
 '## Quick Start',
 '2025.04.14',
 '윈도우 환경에서 맥 환경으로 변경',
 '첫 시작 simpleApp.py 돌리니',
 "Can't assign requested address: Service 'sparkDriver' failed after 16 retries (on a random free port)! Consider explicitly setting the appropriate binding address for the service 'sparkDriver' (for example spark.driver.bindAddress

In [18]:
linesMap = lines.map(lambda l: l.split(' '))
linesMap.collect()

[['#', 'Spark', '3.5.5', 'tutorial'],
 ['Apache', 'Spark', '공식', '문서', '스터디'],
 [''],
 ['##', 'RDD', 'Programming', 'Guide'],
 ['2025.04.16'],
 ['실습', '진행'],
 ['----'],
 ['2025.04.15'],
 ['코드', '실습은', 'jupyternotebook', '이', '편리해보여서', '변경'],
 ['###', 'Spark', 'abstract'],
 ['####', 'Resilient', 'Distributed', 'Dataset(RDD)'],
 ['하둡',
  '내에',
  '있는',
  '파일',
  '혹은',
  '디스크에',
  '저장된',
  'Scala',
  'collection',
  '으로',
  '시작해서',
  '이를',
  '변환한다.'],
 ['RDD', '는', '메모리', '위에', '두어서', '병렬처리에서', '효율적으로', '재사용'],
 ['RDD', '는', 'node의', '실패에서', '자동으로', '회복'],
 ['####', 'Shared', 'variables'],
 ['spark는', 'function', '서로', '다른', 'nodes', '에서', '작업', '묶음으로', '병렬', '작동'],
 ['function', '의', '변수를', '복사해서', '각', '노드에', '전달'],
 ['-', 'Broadcast', 'variable'],
 ['한', '값을', '모든', 'node', '의', '메모리에', 'cache'],
 ['-', 'Accumulatiors'],
 ['누적용',
  '변수,',
  'sum',
  'or',
  'count',
  '연산에',
  '쓰임,',
  '각',
  '작업에서',
  '값을',
  '더해',
  '나가면서',
  '그',
  '결과를',
  '드라이버',
  '프로그램에서',
  '수집,',
  '+=',
  '처럼'

In [19]:
linesFilter = lines.filter(lambda l: l == '----')
# 함수 결과값이 Boolean 일때 사용 가능
linesFilter.collect()

['----', '----', '----']

In [21]:
linesFlatMap = lines.flatMap(lambda l: range(1, len(l))) 
# map의 함수 결과가 list 형태인 경우 그걸 다차원 list 형태 [[]] 가 아닌 하나의 list로 만듬 []
# 매게변수 함수의 값이 list 혹은 generator 반환해야함
linesFlatMap.collect()

[1,
 2,
 3,
 4,
 5,
 6,
 7,
 8,
 9,
 10,
 11,
 12,
 13,
 14,
 15,
 16,
 17,
 18,
 19,
 20,
 21,
 1,
 2,
 3,
 4,
 5,
 6,
 7,
 8,
 9,
 10,
 11,
 12,
 13,
 14,
 15,
 16,
 17,
 18,
 19,
 20,
 21,
 1,
 2,
 3,
 4,
 5,
 6,
 7,
 8,
 9,
 10,
 11,
 12,
 13,
 14,
 15,
 16,
 17,
 18,
 19,
 20,
 21,
 22,
 23,
 1,
 2,
 3,
 4,
 5,
 6,
 7,
 8,
 9,
 1,
 2,
 3,
 4,
 1,
 2,
 3,
 1,
 2,
 3,
 4,
 5,
 6,
 7,
 8,
 9,
 1,
 2,
 3,
 4,
 5,
 6,
 7,
 8,
 9,
 10,
 11,
 12,
 13,
 14,
 15,
 16,
 17,
 18,
 19,
 20,
 21,
 22,
 23,
 24,
 25,
 26,
 27,
 28,
 29,
 30,
 31,
 32,
 33,
 1,
 2,
 3,
 4,
 5,
 6,
 7,
 8,
 9,
 10,
 11,
 12,
 13,
 14,
 15,
 16,
 17,
 1,
 2,
 3,
 4,
 5,
 6,
 7,
 8,
 9,
 10,
 11,
 12,
 13,
 14,
 15,
 16,
 17,
 18,
 19,
 20,
 21,
 22,
 23,
 24,
 25,
 26,
 27,
 28,
 29,
 30,
 31,
 32,
 33,
 34,
 35,
 36,
 37,
 38,
 1,
 2,
 3,
 4,
 5,
 6,
 7,
 8,
 9,
 10,
 11,
 12,
 13,
 14,
 15,
 16,
 17,
 18,
 19,
 20,
 21,
 22,
 23,
 24,
 25,
 26,
 27,
 28,
 29,
 30,
 31,
 32,
 33,
 34,
 35,
 36,
 37,
 38,
 39,
 40

In [39]:
def f(iterator): yield sum(iterator) # 
rdd = sc.parallelize([1, 2, 3, 4], 2)
linesMapPartition = rdd.mapPartitions(f) 
# [1, 2, 3, 4] 를 2개의 파티션으로 나누어서 함수(f) 실행
# [1, 2] 와 [3, 4] 생성
# 합인 3 과 7 생성
# [3, 7] 반환 

# mapPartitions 는 무조건 generator만 반환 기대
# spark 는 내부적으로 for item in f(partition_iterator): 로 하나씩 꺼내서 새 RDD 구성
linesMapPartition.collect()

[3, 7]

In [43]:
# generator 실습
def f(iterator):
    for x in iterator:
         yield sum(x)
i = [[2,3,4], [2,3,5]]
fi = f(i)
print(list(f(i)))
print(next(fi))
print(next(fi))
print(next(fi))


[9, 10]
9
10


StopIteration: 

In [31]:
rdd = sc.parallelize([1, 2, 3, 4], 2)
def f(splitIndex, iterator): yield (splitIndex, sum(iterator))
rdd.mapPartitionsWithIndex(f).collect()

[(0, 3), (1, 7)]

In [47]:
rdd = sc.parallelize(range(100), 4)
rdd.sample(withReplacement=False, fraction=0.1, seed=81).collect() #fraction: 0~1 사이 값, 전체의 몇 %를 샘플로 뽑을지 결정

[4, 26, 39, 41, 42, 52, 63, 76, 80, 86, 97]

In [48]:
rdd= sc.parallelize([1, 2, 3, 4])
rdd.union(rdd).collect()

[1, 2, 3, 4, 1, 2, 3, 4]

In [60]:
rdd1 = sc.parallelize([1, 2, 3, 4])
rdd2 = sc.parallelize([1, 2, 3, 4])
rdd1.intersection(rdd2).collect()
# 왜 순서가 다르게 나오지? [4, 3] 으로 결과가 나옴 흠
# 이상하다 [1, 3] 은 잘 나오는데 4 가 들어간 경우 4가 앞에 나옴
# 2도 앞에 나옴
# 짝수 먼저 작은수터 그다음 홀수 작은수 부터 나옴 -> 우연!
# 값은 보장하지만 순서는 보장하지 않음 -> 정렬은 큰 비용이 듬

[2, 4, 1, 3]

In [65]:
rdd = sc.parallelize([1, 1, 2, 2, 3])
rdd.distinct().collect()

[1, 2, 3]

In [6]:
rdd = sc.parallelize([1, 2, 3, 4])
rdd.map(lambda r: (r%2, r)).groupByKey().mapValues(list).collect()

[(1, [1, 3]), (0, [2, 4])]

In [8]:
from operator import add
rdd = sc.parallelize([1, 2, 3, 4])
rdd.map(lambda r: (r%2, r)).reduceByKey(add).collect()

[(1, 4), (0, 6)]

In [15]:
rdd = sc.parallelize([("a", 2), ("b", 2), ("a", 2)])
seqFunc = (lambda x, y: (x[0] + y, x[1] + 1)) 
# 파티션 안에서 zerValue=x 에 더하기 여기에 경우 x[0] + y 는 key-value 에서 value 의 누적합, x[1] + 1 은 같은 키의 횟수
combFunc = (lambda x, y: (x[0] + y[0], x[1] + y[1]))
# 여러 파티션을 통합하는 과정
sorted(rdd.aggregateByKey(zeroValue=(0, 0), seqFunc=seqFunc, combFunc=combFunc).collect())

[('a', (4, 2)), ('b', (2, 1))]

In [27]:
rdd = sc.parallelize([("big", 3), ("Bad", 2), ("a", 4), ("a", 1)])
print(rdd.collect())
print(rdd.sortByKey(ascending=True, numPartitions=3, keyfunc=lambda k: k.lower()).collect())
# key 기준으로만 정렬하고 별도로 value 에 대한 정렬은 없다
# value 기준으로는 그저 RDD 의 순서를 따름 

[('big', 3), ('Bad', 2), ('a', 4), ('a', 1)]
[('Bad', 2), ('a', 4), ('a', 1), ('big', 3)]


In [10]:
rdd1 = sc.parallelize([("a", 1), ("b", 2), ("c", 3)])
rdd2 = sc.parallelize([("a", 2), ("b", 3)])
sorted(rdd1.leftOuterJoin(rdd2).collect())

[('a', (1, 2)), ('b', (2, 3)), ('c', (3, None))]

In [12]:
rdd1 = sc.parallelize([("a", 1), ("b", 2), ("c", 3)])
rdd2 = sc.parallelize([("a", 2), ("b", 3)])
# rdd1.cogroup(rdd2).collect()
[(x, tuple(map(list, y))) for x, y in sorted(list(rdd1.cogroup(rdd2).collect()))]

[('a', ([1], [2])), ('b', ([2], [3])), ('c', ([3], []))]

In [13]:
rdd1 = sc.parallelize([("a", 1), ("b", 2), ("c", 3)])
rdd2 = sc.parallelize([("a", 2), ("b", 3)])
rdd1.cartesian(rdd2).collect() #모든 가능한 조합

[(('a', 1), ('a', 2)),
 (('a', 1), ('b', 3)),
 (('b', 2), ('a', 2)),
 (('b', 2), ('b', 3)),
 (('c', 3), ('a', 2)),
 (('c', 3), ('b', 3))]

In [None]:
rdd1 = sc.parallelize([("a", 1), ("b", 2), ("c", 3)])
rdd2 = sc.parallelize([("a", 2), ("b", 3)])
rdd1.cartesian(rdd2).collect() #모든 가능한 조합