In [3]:
import warnings
warnings.filterwarnings('ignore')

from pyspark import SparkConf, SparkContext

conf = SparkConf().setMaster('local').setAppName('redd-transformations-actions')
sc = SparkContext(conf=conf)

In [5]:
# 설정 전체를 확인할 수 있음
sc.getConf().getAll()

# SparkContext 정지
# sc.stop()

[('spark.master', 'local'),
 ('spark.app.startTime', '1699261341870'),
 ('spark.executor.id', 'driver'),
 ('spark.driver.host', 'localhost'),
 ('spark.app.id', 'local-1699261341965'),
 ('spark.app.submitTime', '1699261304138'),
 ('spark.driver.extraJavaOptions',
  '-Djava.net.preferIPv6Addresses=false -XX:+IgnoreUnrecognizedVMOptions --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.lang.invoke=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/sun.nio.cs=ALL-UNNAMED --add-opens=java.base/sun.security.action=ALL-UNNAMED --add-opens=java.base/sun.util.calendar=ALL-UNNAMED --add-opens=java.security.jgss/s

In [10]:
# sc.parallelize는 Python의 List형태를 입력 받아 Spark RDD 생성

foods = sc.parallelize(["짜장면", "마라탕", "짬뽕", "떡볶이", "쌀국수", "짬뽕", "짜장면", "짜장면", "짜장면",  "라면", "우동", "라면"])

### collect()
- Action 함수
- 실제 Production 환경에서는 쓰이지 않음 (데이터를 너무 많이 가져오기 때문에 리소스 낭비)
- 개발 단계에서 활용
- RDD의 모든 Value 값을 확인할 수 있음

In [11]:
foods.collect() # type(foods.collect()) = list

['짜장면', '마라탕', '짬뽕', '떡볶이', '쌀국수', '짬뽕', '짜장면', '짜장면', '짜장면', '라면', '우동', '라면']

### countByvalue()
- Action 함수
- 값의 수를 확인할 수 있음
- return : collections.defaultdict

In [14]:
foods.countByValue() 
# return {K : v}

defaultdict(int,
            {'짜장면': 4,
             '마라탕': 1,
             '짬뽕': 2,
             '떡볶이': 1,
             '쌀국수': 1,
             '라면': 2,
             '우동': 1})

### take()
- Action 함수
- 개수를 지정해 데이터를 가져올 수 있음

In [16]:
foods.take(3)

['짜장면', '마라탕', '짬뽕']

In [17]:
foods.take(1)

['짜장면']

### first()
- Action 함수
- RDD의 첫 번째 row 반환

In [18]:
foods.first()

'짜장면'

### count()
- Action 함수
- RDD의 element가 총 몇개인지 (중복값 허용)
- 만약 중복값 제외시키고 싶으면 distinct 사용(transforma 함수)

In [19]:
foods.count()

12

### distinct()
- Transform 함수
- 중복값 제외 (파이썬이랑 같네)

In [21]:
foods.distinct().collect()

['짜장면', '마라탕', '짬뽕', '떡볶이', '쌀국수', '라면', '우동']

In [22]:
foods.distinct().count()

7

### foreach()
- Action 함수
- 요소 하나씩 가져와 함수 적용
- RDD 요소마다 어떤 연산을 하고 Log를 남길 때 잘 사용함

In [28]:
foods.foreach(lambda x: print(x+'123'))

짜장면123
마라탕123
짬뽕123
떡볶이123
쌀국수123
짬뽕123
짜장면123
짜장면123
짜장면123
라면123
우동123
라면123


### map()
- Transform 함수 (narrow)

In [30]:
print(sc.parallelize([1, 2, 3 , 4, 5]).map(lambda x: x ** 2)) # RDD 기억
print()
print(sc.parallelize([1, 2, 3 , 4, 5]).map(lambda x: x ** 2).collect()) # Action Func 만나면 이전 transfor Func 진행

PythonRDD[36] at RDD at PythonRDD.scala:53

[1, 4, 9, 16, 25]


### flatmap()
- Transform 함수 (narrow)
- list의 요소들을 펼쳐볼 때 사용

In [34]:
movies = [
    "그린 북",
    "매트릭스",
    "토이 스토리",
    "캐스트 어웨이",
    "포드 V 페라리",
    "보헤미안 랩소디",
    "빽 투 더 퓨처",
    "반지의 제왕",
    "죽은 시인의 사회"
]

rdd_movies = sc.parallelize(movies)


ParallelCollectionRDD[42] at readRDDFromFile at PythonRDD.scala:289

In [39]:
rdd_movies.flatMap(lambda x: x).take(10) # Element를 어떻게 나눌지 Function을 입력

['그', '린', ' ', '북', '매', '트', '릭', '스', '토', '이']

In [40]:
rdd_movies.flatMap(lambda x: x.split(' ')).take(10)

['그린', '북', '매트릭스', '토이', '스토리', '캐스트', '어웨이', '포드', 'V', '페라리']

### filter()
- Tranform 함수 (narrow)
- 지정한 요소만 filtering

In [45]:
rdd_movies.flatMap(lambda x: x.split(' '))\
    .filter(lambda x: x == '그린').collect()

['그린']

In [46]:
rdd_movies.flatMap(lambda x: x.split(' '))\
    .filter(lambda x: x != 'V').collect()

['그린',
 '북',
 '매트릭스',
 '토이',
 '스토리',
 '캐스트',
 '어웨이',
 '포드',
 '페라리',
 '보헤미안',
 '랩소디',
 '빽',
 '투',
 '더',
 '퓨처',
 '반지의',
 '제왕',
 '죽은',
 '시인의',
 '사회']

### uinon() & intersection() & subtract()
- Transform 함수 (narrow)

In [50]:
num1 = sc.parallelize([1, 2, 3, 4])
num2 = sc.parallelize([4, 5, 6, 7, 8, 9, 10])

In [51]:
num1.intersection(num2).collect()

[4]

In [52]:
num1.union(num2).collect()

[1, 2, 3, 4, 4, 5, 6, 7, 8, 9, 10]

In [53]:
num1.subtract(num2).collect()

[2, 1, 3]

### sample()
- Transform 함수 (narrow)

In [54]:
numUnion = num1.union(num2)

numUnion.collect()

[1, 2, 3, 4, 4, 5, 6, 7, 8, 9, 10]

In [56]:
numUnion.sample(withReplacement=False, # 복원추출 여부
                fraction = 0.5,        # 비율(확률적으로 뽑힘)
                seed = 42              # Random Seed
                ).collect()

[1, 2, 4, 9, 10]

groupBy()
- Transform (wide)
- 그룹 기준으로 연산

In [57]:
foodsGroup = foods.groupBy(lambda x: x[0])
foodsGroup.collect()

[('짜', <pyspark.resultiterable.ResultIterable at 0x10b78feb0>),
 ('마', <pyspark.resultiterable.ResultIterable at 0x10b78ea40>),
 ('짬', <pyspark.resultiterable.ResultIterable at 0x10bbfaa10>),
 ('떡', <pyspark.resultiterable.ResultIterable at 0x10bbfaf80>),
 ('쌀', <pyspark.resultiterable.ResultIterable at 0x10ba369e0>),
 ('라', <pyspark.resultiterable.ResultIterable at 0x10ba36800>),
 ('우', <pyspark.resultiterable.ResultIterable at 0x10ba35d80>)]

In [58]:
for k, v in foodsGroup.collect():
    print(k, list(v))

짜 ['짜장면', '짜장면', '짜장면', '짜장면']
마 ['마라탕']
짬 ['짬뽕', '짬뽕']
떡 ['떡볶이']
쌀 ['쌀국수']
라 ['라면', '라면']
우 ['우동']


In [61]:
nums = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
for k, v in nums.groupBy(lambda x: x % 2).collect():
    print(k, list(v))

1 [1, 3, 5, 7, 9]
0 [2, 4, 6, 8, 10]
