# Spark Partition이란?
- Spark Partition은 데이터를 물리적, 논리적으로 분리하여 병렬 처리가 가능하도록 만드는 단위입니다.
- 클러스터의 각 워커 노드에서 각 Partition이 개별적으로 처리되며, 효율적인 분산 처리를 가능하게 합니다.

Spark에서 Partition이 자동으로 설정되기 때문에 소규모 데이터의 경우는 사용자가 신경 쓸 필요 없지만, 대규모 데이터 처리의 경우는 사용자가 Partition에 신경을 써야 합니다. Partition은 Spark 작업의 성능 최적화와 리소스 효율성에 큰 영향을 미치기 때문입니다.


In [66]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("EDA Session") \
    .config("spark.sql.repl.eagerEval.enabled", True) \
    .getOrCreate()

spark

In [67]:
from google.colab import drive

# Google Drive 마운트
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


- 이 테이블에는 사용자가 제출한 2천만 개가 조금 넘는 영화 등급 레코드가 포함되어 있습니다. 타임스탬프는 UTC 시간으로 기록된 integer 값입니다.

In [68]:
# DROP TABLE 실행
# movieRatings 테이블이 이미 존재하면 삭제
spark.sql("""
DROP TABLE IF EXISTS movieRatings
""")

# CREATE TABLE 실행
# ratings.csv 파일을 기반으로 movieRatings 테이블 생성
spark.sql("""
CREATE TABLE movieRatings (
  userId INT,
  movieId INT,
  rating FLOAT,
  timeRecorded INT
)
USING csv
OPTIONS (
  path '/content/drive/Othercomputers/내 컴퓨터/BigDataDatasets/ratings.csv',
  header 'true'
)
""")

In [69]:
# Spark SQL의 기본 데이터베이스 및 테이블의 물리적 저장 경로 확인
spark.conf.get("spark.sql.warehouse.dir")

'file:/content/spark-warehouse'

In [70]:
# movieRatings 테이블을 DataFrame으로 로드
movie_ratings_df = spark.table("movieRatings")

# Partition 개수 확인
print(f"Default Partition Count: {movie_ratings_df.rdd.getNumPartitions()}")

Default Partition Count: 6


## 2. Partition 조정
Partition 개수를 조정하여 데이터 처리를 최적화할 수 있습니다.

- Repartition: Partition 개수를 늘리거나 줄임 (Shuffle 발생).
- Coalesce: Partition 개수를 줄일 때 사용 (Shuffle 없이 처리).

In [71]:
# Partition 개수를 10으로 변경 (Repartition)
repartitioned_df = movie_ratings_df.repartition(10)
print(f"Repartitioned Partition Count: {repartitioned_df.rdd.getNumPartitions()}")

# Partition 개수를 2로 줄임 (Coalesce)
coalesced_df = movie_ratings_df.coalesce(2)
print(f"Coalesced Partition Count: {coalesced_df.rdd.getNumPartitions()}")

Repartitioned Partition Count: 10
Coalesced Partition Count: 2


##3. Partition을 활용한 데이터 처리
Partition을 활용하면 데이터를 병렬로 처리할 수 있습니다.

예를 들어 ratings.csv의 평균 평점을 각 Partition에서 병렬로 계산할 수 있습니다.  

아래의 compute_partition_avg 함수가 각 Partition에 독립적으로 실행됩니다.

In [72]:
# 각 Partition에서 평균 평점 계산
def compute_partition_avg(iterator):
    total = 0
    count = 0
    for row in iterator:
        total += row['rating']
        count += 1
    yield total / count if count > 0 else 0

# RDD를 사용하여 Partition별 평균 계산
# movie_ratings_df.rdd는 movie_ratings_df DataFrame을 RDD로 변환
# mapPartitions는 각 Partition에 대해 독립적으로 작업을 수행
partition_avg = movie_ratings_df.rdd.mapPartitions(compute_partition_avg).collect()
print(f"**Partition별 평균 평점**: {partition_avg}")

**Partition별 평균 평점**: [3.5263380240022877, 3.5265608123587704, 3.531131160679831, 3.5275173966835704, 3.526652612278908, 3.536409580786592]


## 4. Partition 조정을 통한 성능 비교

- Partition이 너무 적음: 작업이 병렬화되지 않아 처리 속도가 느려짐.
- Partition이 너무 많음: 네트워크 및 메모리 오버헤드 증가.

In [75]:
import time

# Default Partition 처리 시간 측정
start_time = time.time()
default_partition_result = movie_ratings_df.groupBy("movieId").avg("rating").count()
default_time = time.time() - start_time
print(f"Default Partition Count: {movie_ratings_df.rdd.getNumPartitions()}")
print(f"Default Partition Execution Time: {default_time:.2f} seconds")
print(f"Default Partition Result: {default_partition_result}")
print()

# Repartition 후 처리 시간 측정
start_time = time.time()
optimized_df = movie_ratings_df.repartition(10)
optimized_result = optimized_df.groupBy("movieId").avg("rating").count()
optimized_time = time.time() - start_time
print(f"Repartitioned Partition Count: {repartitioned_df.rdd.getNumPartitions()}")
print(f"Repartitioned Execution Time: {optimized_time:.2f} seconds")
print(f"Optimized Partition Result: {optimized_result}")

Default Partition Count: 6
Default Partition Execution Time: 37.84 seconds
Default Partition Result: 45115

Repartitioned Partition Count: 10
Repartitioned Execution Time: 84.05 seconds
Optimized Partition Result: 45115


## 테이블 생성
`outdoorProducts`라는 새 테이블을 만듭니다.

In [76]:
# DROP TABLE 실행: 기존에 outdoorProducts 테이블이 존재하면 삭제
spark.sql("""
DROP TABLE IF EXISTS outdoorProducts
""")

# CREATE TABLE 실행: 수정된 스키마를 사용하여 outdoorProducts 테이블 생성
spark.sql("""
CREATE TABLE outdoorProducts (
    InvoiceNo INT,                 -- 청구 번호
    StockCode STRING,       -- 제품 코드
    Description STRING,      -- 제품 설명
    Quantity INT,                    -- 수량
    invoiceDate TIMESTAMP,  -- 청구 날짜와 시간 (TIMESTAMP 타입으로 변경)
    UnitPrice DOUBLE,          -- 단가
    CustomerID INT,               -- 고객 ID
    Country STRING              -- 국가 이름 (컬럼 이름 수정)
)
USING csv
OPTIONS (
    path '/content/drive/Othercomputers/내 컴퓨터/BigDataDatasets/OnlineRetail.csv',
    header 'true'           -- 데이터에 헤더가 포함되어 있음을 지정
)
""")

In [77]:
spark.sql("SELECT * FROM outdoorProducts;").limit(5)

InvoiceNo,StockCode,Description,Quantity,invoiceDate,UnitPrice,CustomerID,Country
536365,85123A,WHITE HANGING HEA...,6,,2.55,17850,United Kingdom
536365,71053,WHITE METAL LANTERN,6,,3.39,17850,United Kingdom
536365,84406B,CREAM CUPID HEART...,8,,2.75,17850,United Kingdom
536365,84029G,KNITTED UNION FLA...,6,,3.39,17850,United Kingdom
536365,84029E,RED WOOLLY HOTTIE...,6,,3.39,17850,United Kingdom


## 5. 실전 활용: outdoorProducts 테이블로 Partition
outdoorProducts 테이블을 국가(countryName)를 기준으로 Partition하여 각 국가별 데이터를 병렬로 처리합니다.

In [82]:
# outdoorProducts 테이블을 DataFrame으로 로드
outdoor_products_df = spark.table("outdoorProducts")

# countryName을 기준으로 Partition 생성
partitioned_df = outdoor_products_df.repartition("Country")
print(f"Partition Count After Repartition: {partitioned_df.rdd.getNumPartitions()}")

Partition Count After Repartition: 2


In [83]:
# 국가별 총 판매량 계산
result = partitioned_df.groupBy("Country").sum("quantity")
result

Country,sum(quantity)
Sweden,35637
Singapore,5234
Germany,117448
France,110480
Greece,1556
European Community,497
Belgium,23152
Finland,10666
Malta,944
Unspecified,3300
