# Spark SQL 기반 탐색적 데이터 분석 (EDA)

## 학습 목표
1. 데이터 준비 및 테이블 생성
2. 데이터 탐색 및 변환
3. Temporary View 생성 및 활용
4. 데이터 분석 및 시각화
5. 데이터 결합 및 집계

In [None]:
import os
import sys
from pyspark.sql import SparkSession
from pyspark.sql.functions import rand

# Colab 환경인지 확인
IN_COLAB = "google.colab" in sys.modules
# Colab이면 /content, 아니면 현재 작업 디렉토리를 BASE로 설정
BASE = "/content" if IN_COLAB else os.getcwd()

# SparkSession 생성 (PySpark 애플리케이션의 진입점)
spark = SparkSession.builder.appName("Spark_SQL_EDA").getOrCreate()

## 1. 데이터 준비 및 테이블 생성

### MovieRatings 테이블
- 사용자가 제출한 2천만 개가 조금 넘는 영화 등급 레코드
- 타임스탬프는 UTC 시간으로 기록된 integer 값

In [None]:
# Colab에서 데이터 다운로드 및 압축 해제
if IN_COLAB:
    import gdown
    import zipfile
    
    file_id = "1qq6l62PWQa9FMdSmWyqyqPxqUBN_7Teo"
    zip_filename = "data.zip"
    extract_dir = os.path.join(BASE, "unzipped_data")
    
    gdown.download(f"https://drive.google.com/uc?id={file_id}", zip_filename, quiet=False)
    
    with zipfile.ZipFile(zip_filename, 'r') as zip_ref:
        zip_ref.extractall(extract_dir)
    
    print(f"압축이 '{extract_dir}' 폴더에 풀렸습니다.")

In [None]:
# movieRatings 테이블 생성
RATINGS_PATH = os.path.join(BASE, "unzipped_data", "ratings.csv")

spark.sql("DROP TABLE IF EXISTS movieRatings")

spark.sql(f"""
CREATE TABLE movieRatings (
  userId INT,
  movieId INT,
  rating FLOAT,
  timeRecorded INT
)
USING csv
OPTIONS (
  path '{RATINGS_PATH}',
  header 'true'
)
""")

In [None]:
# 테이블 확인
spark.sql("SELECT * FROM movieRatings").limit(5).show()

In [None]:
# 전체 레코드 수
spark.sql("SELECT COUNT(*) FROM movieRatings").show()

## 2. 데이터 탐색 및 변환

### CAST를 사용한 타임스탬프 변환

In [None]:
# timeRecorded를 timestamp 형식으로 변환
spark.sql("""
SELECT
    rating,
    CAST(timeRecorded AS timestamp) AS timestamp
FROM movieRatings
""").limit(5).show()

## 3. Temporary View 생성 및 활용

### 월별 평균 평점 View

In [None]:
# ratingsByMonth View 생성
spark.sql("""
CREATE OR REPLACE TEMPORARY VIEW ratingsByMonth AS
SELECT
  ROUND(AVG(rating), 3) AS avgRating,
  MONTH(CAST(timeRecorded AS timestamp)) AS month
FROM movieRatings
GROUP BY month
""")

In [None]:
# View 조회
spark.sql("""
SELECT *
FROM ratingsByMonth
ORDER BY avgRating
""").limit(5).show()

## 4. 데이터 분석 및 시각화

### 평점 분포 히스토그램

In [None]:
# 전체 레코드의 1% 샘플링
total_count = spark.sql("SELECT COUNT(*) AS count FROM movieRatings").collect()[0]['count']
percent_count = int(total_count * 0.01)

result = spark.sql("SELECT rating FROM movieRatings").orderBy(rand()).limit(percent_count)

In [None]:
import matplotlib.pyplot as plt

pandas_df = result.toPandas()

plt.figure(figsize=(10, 6))
plt.hist(pandas_df['rating'], bins=10, color='blue', alpha=0.7, edgecolor='black')
plt.title('Distribution of Movie Ratings (1% Sample)', fontsize=14)
plt.xlabel('Rating', fontsize=12)
plt.ylabel('Frequency', fontsize=12)
plt.grid(axis='y', alpha=0.3)
plt.show()

## 5. 데이터 결합 및 집계

### OutdoorProducts 테이블 생성

In [None]:
# 새 SparkSession (필요 시)
# spark.stop()
# spark = SparkSession.builder.appName("New EDA Session").getOrCreate()

In [None]:
# outdoorProducts 테이블 생성
RETAIL_PATH = os.path.join(BASE, "OnlineRetail.csv")

spark.sql("DROP TABLE IF EXISTS outdoorProducts")

spark.sql(f"""
CREATE TABLE outdoorProducts (
    InvoiceNo INT,
    StockCode STRING,
    Description STRING,
    Quantity INT,
    invoiceDate TIMESTAMP,
    UnitPrice DOUBLE,
    CustomerID INT,
    Country STRING
)
USING csv
OPTIONS (
    path '{RETAIL_PATH}',
    header 'true'
)
""")

In [None]:
spark.sql("SELECT * FROM outdoorProducts").limit(5).show()

In [None]:
spark.sql("SELECT COUNT(*) FROM outdoorProducts").show()

### 음수 수량 데이터 탐색

In [None]:
# quantity가 0보다 작은 행의 수
result = spark.sql("""
SELECT COUNT(quantity) AS negative_quantity_count
FROM outdoorProducts
WHERE quantity < 0
""")

print(f"음수 수량: {result.collect()[0]['negative_quantity_count']}")

### Sales View 생성 (quantity > 0, totalAmount 계산)

In [None]:
spark.sql("""
CREATE OR REPLACE TEMPORARY VIEW sales AS
SELECT stockCode, quantity, unitPrice,
  country AS countryName,
  ROUND(quantity * unitPrice, 2) AS totalAmount
FROM outdoorProducts
WHERE quantity > 0
""")

In [None]:
# totalAmount 내림차순
spark.sql("SELECT * FROM sales ORDER BY totalAmount DESC").limit(5).show()

In [None]:
# 고유 국가 목록
spark.sql("SELECT DISTINCT countryName FROM sales").show(truncate=False)

### 국가별 총 판매량 View

In [None]:
spark.sql("""
CREATE OR REPLACE TEMPORARY VIEW salesQuants AS
SELECT
  country AS countryName,
  SUM(quantity) AS totalQuantity
FROM outdoorProducts
GROUP BY country
ORDER BY totalQuantity DESC
""")

In [None]:
spark.sql("SELECT * FROM salesQuants").limit(5).show()

### CountryCodes 테이블과 JOIN

In [None]:
# countryCodes 테이블 생성
COUNTRIES_PATH = os.path.join(BASE, "countries_iso3166b.csv")

spark.sql("DROP TABLE IF EXISTS countryCodes")

spark.sql(f"""
CREATE TABLE countryCodes
USING csv
OPTIONS (
  path '{COUNTRIES_PATH}',
  header 'true'
)
""")

In [None]:
spark.sql("DESC countryCodes").show()

In [None]:
spark.sql("SELECT * FROM countryCodes").limit(5).show()

### JOIN 및 원형 차트 시각화

In [None]:
result = spark.sql("""
SELECT
    sq.countryName AS countryName,
    sq.totalQuantity AS totalQuantity,
    cc.iso3 AS alpha3Code
FROM salesQuants sq
JOIN countryCodes cc
  ON sq.countryName = cc.country_common
ORDER BY totalQuantity DESC
""")

result.limit(10).show()

In [None]:
# Pandas로 변환
pandas_df = result.toPandas()

labels = pandas_df['alpha3Code']
sizes = pandas_df['totalQuantity']

In [None]:
# 원형 차트
plt.figure(figsize=(8, 8))
plt.pie(sizes, labels=labels, autopct='%1.1f%%', startangle=140)
plt.title('Total Sales by Country (3-letter ISO ID)', fontsize=14)
plt.axis('equal')
plt.show()

### 1% 이하 국가는 "Others"로 합치기

In [None]:
total = sizes.sum()
threshold = total * 0.01

new_labels = []
new_sizes = []
others_size = 0

for label, size in zip(labels, sizes):
    if size < threshold:
        others_size += size
    else:
        new_labels.append(label)
        new_sizes.append(size)

if others_size > 0:
    new_labels.append("Others")
    new_sizes.append(others_size)

print(f"Labels: {new_labels}")
print(f"Sizes: {new_sizes}")

In [None]:
# 분리된 원형 차트
plt.figure(figsize=(10, 10))
explode = [0.05] * len(new_labels)

wedges, texts, autotexts = plt.pie(
    new_sizes,
    labels=new_labels,
    autopct='%1.1f%%',
    explode=explode,
    labeldistance=1.2,
    startangle=140
)

for text in texts:
    text.set_fontsize(11)
for autotext in autotexts:
    autotext.set_fontsize(10)

plt.title('Total Sales by Country (Including "Others")', fontsize=16)
plt.axis('equal')
plt.tight_layout()
plt.show()

## 추가 데이터셋 예시 (CarPrice, McDonald)

### CarPrice 데이터셋
- 자동차 가격에 영향을 미치는 주요 특징 분석
- Spark DataFrame으로 로드 후 상관관계 분석, 시각화

```python
# CarPrice CSV 로드
df_car = spark.read.csv(os.path.join(BASE, "data", "clean_df.csv"), header=True, inferSchema=True)
df_car.createOrReplaceTempView("carPrice")

# 상관관계 분석 (Spark DataFrame API 또는 Pandas 변환)
pandas_car = df_car.toPandas()
corr = pandas_car.corr()
# seaborn heatmap, regplot 등으로 시각화
```

### McDonald 데이터셋
- 맥도날드 메뉴 영양 분석

```python
df_mcd = spark.read.csv(os.path.join(BASE, "data", "mcdonals_nutrition.csv"), header=True, inferSchema=True)
df_mcd.createOrReplaceTempView("mcdonalds")

# Sodium 분석
spark.sql("SELECT Category, MAX(Sodium) as max_sodium FROM mcdonalds GROUP BY Category").show()

# 시각화: swarm plot, regplot 등 (Pandas 변환 후)
```

In [None]:
spark.stop()