In [1]:
from pyspark.sql import SparkSession
spark = SparkSession.builder\
        .appName("restaurant_reviews-average")\
        .master("local[*]")\
        .getOrCreate()

In [2]:
sc = spark.sparkContext

In [3]:
sc.defaultParallelism

2

In [4]:
data = [
    (0, "짜장면", "중식", 125),
    (1, "짬뽕", "중식", 235),
    (2, "김밥", "분식", 32),
    (3, "떡볶이", "분식", 534),
    (4, "라멘", "일식", 223),
    (5, "돈가스", "일식", 52),
    (6, "우동", "일식", 12),
    (7, "쌀국수", "아시안", 312),
    (8, "햄버거", "패스트푸드", 12),
    (9, "치킨", "패스트푸드", 23),
]

In [5]:
import os
print(os.getcwd())  # 현재 작업 디렉토리 출력

/home/jovyan/work/practice_virtual-environment


In [18]:
lines = sc.textFile("file:///home/jovyan/work/practice_virtual-environment/learning_spark_data/restaurant_reviews.csv")

lines.take(5)

['id,item,cateogry,reviews,',
 '0,짜장면,중식,125,',
 '1,짬뽕,중식,235,',
 '2,김밥,분식,32,',
 '3,떡볶이,분식,534,']

In [19]:
rdd1 = sc.parallelize(data)

In [8]:
lines.take(5)

['id,item,cateogry,reviews,',
 '0,짜장면,중식,125,',
 '1,짬뽕,중식,235,',
 '2,김밥,분식,32,',
 '3,떡볶이,분식,534,']

In [20]:
# 총건수 확인

# 첫줄, 데이터 분할
header = lines.first()
filterd_lines = lines.filter(lambda row : row!=header)
# map으로 split

In [10]:
def parse(row):
    fields = row.split(",")

    category = fields[2]

    reviews = fields[3]
    reviews = int(reviews)

    return category, reviews

In [11]:
parse('0,짜장면,중식,125,')

('중식', 125)

In [24]:
category_reviews = filterd_lines.map(parse)
category_reviews

PythonRDD[21] at RDD at PythonRDD.scala:53

In [25]:
category_reviews.collect()

[('중식', 125),
 ('중식', 235),
 ('분식', 32),
 ('분식', 534),
 ('일식', 223),
 ('일식', 52),
 ('일식', 12),
 ('아시안', 312),
 ('패스트푸드', 12),
 ('패스트푸드', 23)]

In [14]:
category_reviews_count = category_reviews.mapValues(lambda x : (x,1))
category_reviews_count.collect()

[('중식', (125, 1)),
 ('중식', (235, 1)),
 ('분식', (32, 1)),
 ('분식', (534, 1)),
 ('일식', (223, 1)),
 ('일식', (52, 1)),
 ('일식', (12, 1)),
 ('아시안', (312, 1)),
 ('패스트푸드', (12, 1)),
 ('패스트푸드', (23, 1))]

In [15]:
reduced = category_reviews_count.reduceByKey(lambda x, y : (x[0] + y[0], x[1] + y[1]))
reduced.collect()

[('중식', (360, 2)),
 ('분식', (566, 2)),
 ('일식', (287, 3)),
 ('아시안', (312, 1)),
 ('패스트푸드', (35, 2))]

In [16]:
average = reduced.mapValues(lambda x : x[0] / x[1])
average.collect()

[('중식', 180.0),
 ('분식', 283.0),
 ('일식', 95.66666666666667),
 ('아시안', 312.0),
 ('패스트푸드', 17.5)]

map(key, value) -> key, value 전체에 적용<br>
mapValues(key, value) -> value에만 적용

In [28]:
# 기존 저장 레벨 해제
category_reviews.unpersist()

PythonRDD[21] at RDD at PythonRDD.scala:53

In [None]:
sc.stop()
spark.stop()

# Narrow Transformations
1:1 변환 -> 하나의 열을 다룰 때 다른 데이터가 필요 없는 경우
filter(), map(), flatMap(), sample(), union()

In [None]:
# flatMap()


In [30]:
rdd = sc.parallelize([1, 2, 3])
rdd_map = rdd.map(lambda x: [x, x + 1])  # => [[1, 2], [2, 3], [3, 4]]
rdd_map.collect()

[[1, 2], [2, 3], [3, 4]]

In [31]:
rdd_flatmap = rdd.flatMap(lambda x: [x, x + 1])  # => [1, 2, 2, 3, 3, 4]
rdd_flatmap.collect()

[1, 2, 2, 3, 3, 4]

In [32]:
movies = [
    "그린 북",
    "매트릭스",
    "토이 스토리",
    "캐스트 어웨이",
    "포드 V 페라리",
    "보헤미안 랩소디",
    "빽 투 더 퓨처",
    "반지의 제왕",
    "죽은 시인의 사회"
]

In [33]:
moviesRDD = sc.parallelize(movies)
moviesRDD

ParallelCollectionRDD[25] at readRDDFromFile at PythonRDD.scala:289

In [34]:
#job 추가 x
flatMovies = moviesRDD.flatMap(lambda x : x.split(" "))
flatMovies.collect()

['그린',
 '북',
 '매트릭스',
 '토이',
 '스토리',
 '캐스트',
 '어웨이',
 '포드',
 'V',
 '페라리',
 '보헤미안',
 '랩소디',
 '빽',
 '투',
 '더',
 '퓨처',
 '반지의',
 '제왕',
 '죽은',
 '시인의',
 '사회']

In [None]:
# 집합 Transformation

In [36]:
num1 = sc.parallelize([1, 2, 3, 4, 5])
num2 = sc.parallelize([4, 5, 6, 7, 8, 9, 10])

In [37]:
num1.intersection(num2).collect()

[4, 5]

In [None]:
# 합집합 구하기 - union

In [38]:
num_union = num1.union(num2)
num_union

UnionRDD[41] at union at NativeMethodAccessorImpl.java:0

In [39]:
num_union.collect()

[1, 2, 3, 4, 5, 4, 5, 6, 7, 8, 9, 10]

In [None]:
# 차집합 구하기 - subtract

In [40]:
num1.subtract(num2).collect()

[1, 2, 3]

In [None]:
# 데이터 랜덤 추출 - sample(withReplacement, fraction, seed=None)

In [44]:
# withReplacement : True -> 중복 추출
num_union.sample(True, 0.3).collect()

[1, 4, 10]

In [45]:
# withReplacement : False -> 중복 X
num_union.sample(False, 0.7).collect()

[1, 5, 5, 6, 7]

In [46]:
# 랜덤을 고정해서 항상 같은 결과가 나올 수 있도록
num_union.sample(True, 0.5, seed=42).collect()

[4, 5, 5, 5, 7]

In [None]:
# Wide Transformations

In [47]:
foods = sc.parallelize([
    "짜장면", "마라탕", "짬뽕", "떡볶이", "쌀국수", "짬뽕", "짜장면", "짜장면", "짜장면", "라면", "우동", "라면"
])
foods

ParallelCollectionRDD[53] at readRDDFromFile at PythonRDD.scala:289

In [48]:
# 그룹핑의 기준을 문자열의 첫 번째 글자로 설정
foodsGroup = foods.groupBy(lambda x : x[0])
foodsGroup

PythonRDD[58] at RDD at PythonRDD.scala:53

In [51]:
res = foodsGroup.collect()
res

[('짜', <pyspark.resultiterable.ResultIterable at 0x7f14095ae890>),
 ('짬', <pyspark.resultiterable.ResultIterable at 0x7f1408e23dd0>),
 ('쌀', <pyspark.resultiterable.ResultIterable at 0x7f1408b2d090>),
 ('라', <pyspark.resultiterable.ResultIterable at 0x7f1408e3b250>),
 ('우', <pyspark.resultiterable.ResultIterable at 0x7f14182fab50>),
 ('마', <pyspark.resultiterable.ResultIterable at 0x7f1408c826d0>),
 ('떡', <pyspark.resultiterable.ResultIterable at 0x7f1418310550>)]

In [52]:
for (k, v) in res:
    print(k, list(v))

짜 ['짜장면', '짜장면', '짜장면', '짜장면']
짬 ['짬뽕', '짬뽕']
쌀 ['쌀국수']
라 ['라면', '라면']
우 ['우동']
마 ['마라탕']
떡 ['떡볶이']


In [None]:
# 내로우 narrow  트랜스포메이션

In [58]:
### 2. Narrow 트랜스포메이션 예제
numbers = sc.parallelize([1, 2, 3, 4, 5])

# 1:1 변환 (Narrow Transformation)
sample_rdd = numbers.sample(False, 0.5)
sample_rdd

PythonRDD[62] at RDD at PythonRDD.scala:53

In [54]:
# wide 연산
kv_rdd = sc.parallelize([("apple",1), ("banana", 2), ("cherry", 3)])

In [59]:
mapp_values = kv_rdd.mapValues(lambda x:x*10)
mapp_values.collect()

[('apple', 10), ('banana', 20), ('cherry', 30)]

In [60]:
grouped_rdd = kv_rdd.groupByKey()
grouped_rdd.collect()

[('apple', <pyspark.resultiterable.ResultIterable at 0x7f14089c38d0>),
 ('banana', <pyspark.resultiterable.ResultIterable at 0x7f1408c5d1d0>),
 ('cherry', <pyspark.resultiterable.ResultIterable at 0x7f1408c5f850>)]

In [61]:
reduced_rdd = kv_rdd.reduceByKey(lambda x,y : x+y)
reduced_rdd.collect()

[('apple', 1), ('banana', 2), ('cherry', 3)]

In [62]:
#  조인 연산

In [64]:
### 3. Wide 트랜스포메이션 예제

# 셔플링이 필요한 변환 (Wide Transformation)
distinct_rdd = sc.parallelize([1, 1, 2, 2, 3, 3]).distinct()
grouped_rdd = kv_rdd.groupByKey()
reduced_rdd = kv_rdd.reduceByKey(lambda x, y: x + y)

In [65]:
# 두 RDD 조인하기
rdd1 = sc.parallelize([("apple", 2), ("banana", 1)])
rdd2 = sc.parallelize([("apple", "fruit"), ("banana", "fruit"), ("carrot", "vegetable")])

# 두 RDD 조인
joined_rdd = rdd1.join(rdd2)
print(joined_rdd.collect())
# 출력 결과: [('apple', (2, 'fruit')), ('banana', (1, 'fruit'))]

print("Distinct RDD:", distinct_rdd.collect())  # [1, 2, 3]
print("Joined RDD:", joined_rdd.collect())  # [("apple", (2, "fruit")), ("banana", (1, "fruit"))]

[('apple', (2, 'fruit')), ('banana', (1, 'fruit'))]
Distinct RDD: [2, 1, 3]
Joined RDD: [('apple', (2, 'fruit')), ('banana', (1, 'fruit'))]


In [67]:
left_joined = rdd1.leftOuterJoin(rdd2)
print(left_joined.collect())

[('apple', (2, 'fruit')), ('banana', (1, 'fruit'))]


In [68]:
right_joined = rdd1.rightOuterJoin(rdd2)
print(right_joined.collect())

[('apple', (2, 'fruit')), ('banana', (1, 'fruit')), ('carrot', (None, 'vegetable'))]


In [69]:
subtract_result = rdd1.subtractByKey(sc.parallelize([("apple", "any")]))
print(subtract_result.collect())

[('banana', 1)]


In [70]:
sc.stop()
spark.stop()