# Lab 03 (Advanced) — Python for AI

중급 내용을 충분히 이해한 상태에서, 실제 ML 프로젝트에서 자주 마주치는 **고급 Python 패턴**과 **사이킷런 파이프라인** 설계까지 다룹니다.

**학습 목표:**
1. Python 고급 패턴 — Generator, Decorator, Context Manager, `functools`
2. NumPy 고급 — `einsum`, SVD, PCA from scratch, 메모리 효율
3. Pandas 고급 — 메서드 체이닝, MultiIndex, Rolling 윈도우, 메모리 최적화
4. ML 전처리 파이프라인 — `sklearn` Pipeline + 커스텀 Transformer

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import time, functools, warnings
from typing import Callable, Iterator, Any

warnings.filterwarnings("ignore")
plt.rcParams["font.family"] = "AppleGothic"
plt.rcParams["axes.unicode_minus"] = False
sns.set_theme(style="whitegrid")
rng = np.random.default_rng(42)

---
## Part 1. Python 고급 패턴

### 1-1. Generator — 메모리 효율적인 대용량 데이터 처리

리스트는 모든 값을 메모리에 올리지만, Generator는 **하나씩 필요할 때 생성**합니다.

In [None]:
import sys

# 메모리 비교
list_obj = [x**2 for x in range(100_000)]
gen_obj  = (x**2 for x in range(100_000))

print(f"List   크기: {sys.getsizeof(list_obj):>10,} bytes")
print(f"Generator 크기: {sys.getsizeof(gen_obj):>10,} bytes")

In [None]:
def batch_generator(data: np.ndarray, batch_size: int) -> Iterator[np.ndarray]:
    """대용량 배열을 배치 단위로 순회하는 Generator."""
    n = len(data)
    for start in range(0, n, batch_size):
        yield data[start : start + batch_size]


dataset = rng.random((1000, 4))   # 1000개 샘플

batch_means = []
for i, batch in enumerate(batch_generator(dataset, batch_size=100)):
    batch_means.append(batch.mean())

print(f"총 배치 수  : {len(batch_means)}")
print(f"배치별 평균 : {[round(m, 4) for m in batch_means]}")

In [None]:
# yield from — Generator 위임
def read_chunks(arrays: list, chunk_size: int) -> Iterator[np.ndarray]:
    for arr in arrays:
        yield from batch_generator(arr, chunk_size)

parts = [rng.random((200, 4)) for _ in range(3)]   # 3개 파일 시뮬레이션
total_rows = sum(chunk.shape[0] for chunk in read_chunks(parts, 50))
print(f"총 처리 행 수: {total_rows}")   # 600

### 1-2. Decorator — 함수에 기능 덧붙이기

In [None]:
def timer(func: Callable) -> Callable:
    """함수 실행 시간을 측정하는 Decorator."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        t0 = time.perf_counter()
        result = func(*args, **kwargs)
        elapsed = time.perf_counter() - t0
        print(f"[timer] {func.__name__} → {elapsed*1000:.2f} ms")
        return result
    return wrapper


def retry(max_attempts: int = 3, exceptions: tuple = (Exception,)):
    """실패 시 최대 max_attempts회 재시도하는 Decorator 팩토리."""
    def decorator(func: Callable) -> Callable:
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(1, max_attempts + 1):
                try:
                    return func(*args, **kwargs)
                except exceptions as e:
                    print(f"  attempt {attempt}/{max_attempts} failed: {e}")
                    if attempt == max_attempts:
                        raise
        return wrapper
    return decorator


@timer
def slow_matmul(n: int) -> float:
    A = rng.random((n, n))
    B = rng.random((n, n))
    return (A @ B).sum()

slow_matmul(500)

In [None]:
# lru_cache — 메모이제이션 (반복 호출 캐싱)
@functools.lru_cache(maxsize=None)
def fibonacci(n: int) -> int:
    if n < 2:
        return n
    return fibonacci(n - 1) + fibonacci(n - 2)

%timeit fibonacci(35)   # 두 번째 호출부터 캐시 적중

### 1-3. Context Manager

In [None]:
from contextlib import contextmanager

@contextmanager
def timer_block(label: str):
    """코드 블록의 실행 시간을 측정하는 Context Manager."""
    t0 = time.perf_counter()
    try:
        yield
    finally:
        elapsed = time.perf_counter() - t0
        print(f"[{label}] {elapsed*1000:.2f} ms")


@contextmanager
def numpy_seed(seed: int):
    """블록 내부에서만 재현 가능한 랜덤 시드를 사용."""
    state = np.random.get_state()
    np.random.seed(seed)
    try:
        yield
    finally:
        np.random.set_state(state)  # 블록 종료 후 원래 상태 복원


with timer_block("행렬 생성"):
    X = rng.random((2000, 2000))

with timer_block("행렬 곱"):
    _ = X @ X.T

with numpy_seed(0):
    a = np.random.rand(3)
with numpy_seed(0):
    b = np.random.rand(3)
print("재현 확인:", np.allclose(a, b))

### 1-4. `functools.partial` — 함수 부분 적용

In [None]:
from functools import partial

def clip_outliers(series: pd.Series, low_q: float, high_q: float) -> pd.Series:
    lo = series.quantile(low_q)
    hi = series.quantile(high_q)
    return series.clip(lo, hi)

# 자주 쓰는 설정을 미리 고정
clip_iqr  = partial(clip_outliers, low_q=0.25, high_q=0.75)
clip_5_95 = partial(clip_outliers, low_q=0.05, high_q=0.95)

s = pd.Series(rng.standard_normal(200))
print(f"원본 범위  : [{s.min():.2f}, {s.max():.2f}]")
print(f"IQR 클립   : [{clip_iqr(s).min():.2f}, {clip_iqr(s).max():.2f}]")
print(f"5-95% 클립 : [{clip_5_95(s).min():.2f}, {clip_5_95(s).max():.2f}]")

> **Exercise 1.** 아래 `memoize` 데코레이터를 완성하세요.  
> `lru_cache`를 사용하지 않고 딕셔너리로 직접 캐시를 구현합니다.

In [None]:
def memoize(func: Callable) -> Callable:
    cache = {}
    @functools.wraps(func)
    def wrapper(*args):
        # Your code here
        pass
    return wrapper

@memoize
def fib(n: int) -> int:
    if n < 2: return n
    return fib(n-1) + fib(n-2)

print(fib(10))   # 55
print(fib(30))   # 832040

---
## Part 2. NumPy 고급

### 2-1. `einsum` — 유연한 텐서 연산

In [None]:
A = rng.random((3, 4))
B = rng.random((4, 5))
v = rng.random(4)

# 행렬 곱 A @ B
assert np.allclose(np.einsum("ij,jk->ik", A, B), A @ B)
print("행렬 곱      :", np.einsum("ij,jk->ik", A, B).shape)   # (3, 5)

# 내적 (dot product)
assert np.allclose(np.einsum("i,i->", v, v), v @ v)

# 전치 (transpose)
assert np.allclose(np.einsum("ij->ji", A), A.T)

# 배치 행렬 곱 (batch matmul) — 3D 텐서
batch_A = rng.random((8, 3, 4))
batch_B = rng.random((8, 4, 5))
result  = np.einsum("bij,bjk->bik", batch_A, batch_B)
print("배치 행렬 곱 :", result.shape)   # (8, 3, 5)

# Attention score (Q @ K^T) — Transformer의 핵심 연산
Q = rng.random((8, 10, 64))  # (batch, seq, d_k)
K = rng.random((8, 10, 64))
scores = np.einsum("bid,bjd->bij", Q, K)   # (batch, seq, seq)
print("Attention score:", scores.shape)

### 2-2. SVD & PCA from Scratch

In [None]:
from sklearn.datasets import load_digits

digits = load_digits()
X_raw = digits.data.astype(float)   # (1797, 64)
y     = digits.target
print("데이터 형태:", X_raw.shape)

In [None]:
# PCA from scratch (SVD 기반)
def pca_from_scratch(X: np.ndarray, n_components: int):
    # 1. 중심화
    mean = X.mean(axis=0)
    X_c  = X - mean

    # 2. SVD 분해: X = U @ S_diag @ Vt
    U, S, Vt = np.linalg.svd(X_c, full_matrices=False)

    # 3. 주성분 선택 (V의 처음 k열)
    components = Vt[:n_components]            # (k, d)

    # 4. 투영
    X_pca = X_c @ components.T               # (n, k)

    # 5. 분산 설명 비율
    explained_var_ratio = (S**2 / (S**2).sum())[:n_components]

    return X_pca, components, explained_var_ratio


X_pca, components, var_ratio = pca_from_scratch(X_raw, n_components=2)
print(f"PC1 설명 분산: {var_ratio[0]*100:.1f}%")
print(f"PC2 설명 분산: {var_ratio[1]*100:.1f}%")

In [None]:
fig, axes = plt.subplots(1, 2, figsize=(13, 5))

# PCA 산점도
scatter = axes[0].scatter(X_pca[:, 0], X_pca[:, 1],
                          c=y, cmap="tab10", alpha=0.6, s=15)
plt.colorbar(scatter, ax=axes[0], label="digit")
axes[0].set_title("PCA (2D) — Digits dataset")
axes[0].set_xlabel(f"PC1 ({var_ratio[0]*100:.1f}%)")
axes[0].set_ylabel(f"PC2 ({var_ratio[1]*100:.1f}%)")

# Scree Plot (성분 수 결정)
_, _, all_var = pca_from_scratch(X_raw, n_components=30)
cumvar = np.cumsum(all_var) * 100
axes[1].plot(range(1, len(cumvar)+1), cumvar, marker="o", markersize=4)
axes[1].axhline(90, color="red", linestyle="--", label="90% 기준선")
axes[1].set_title("Scree Plot (누적 분산)")
axes[1].set_xlabel("주성분 수")
axes[1].set_ylabel("누적 분산 설명율 (%)")
axes[1].legend()

plt.tight_layout()
plt.show()

### 2-3. 메모리 레이아웃과 효율

In [None]:
A = rng.random((1000, 1000))

# C-order(행 우선) vs F-order(열 우선)
A_c = np.ascontiguousarray(A)          # C-order
A_f = np.asfortranarray(A)             # F-order

with timer_block("행 합 (C-order)"):  _ = A_c.sum(axis=1)
with timer_block("행 합 (F-order)"):  _ = A_f.sum(axis=1)

with timer_block("열 합 (C-order)"):  _ = A_c.sum(axis=0)
with timer_block("열 합 (F-order)"):  _ = A_f.sum(axis=0)

In [None]:
# float64 vs float32 — 메모리 절반, 속도 향상
X64 = rng.random((5000, 5000)).astype(np.float64)
X32 = X64.astype(np.float32)

print(f"float64 메모리: {X64.nbytes / 1e6:.0f} MB")
print(f"float32 메모리: {X32.nbytes / 1e6:.0f} MB")

with timer_block("float64 합"):  _ = X64.sum()
with timer_block("float32 합"):  _ = X32.sum()

> **Exercise 2.** SVD를 이용한 **저차원 행렬 근사(Image Compression)**를 구현하세요.  
> 임의의 (100×100) 행렬을 생성하고, 상위 k개 특이값만 사용해 재구성한 뒤  
> k=5, 20, 50일 때 원본 대비 **재구성 오차(Frobenius norm)** 를 비교하세요.

In [None]:
# Your code here


---
## Part 3. Pandas 고급

In [None]:
titanic = sns.load_dataset("titanic")

### 3-1. 메서드 체이닝 + `pipe()`

In [None]:
def fill_age(df: pd.DataFrame) -> pd.DataFrame:
    """pclass · sex 그룹 중앙값으로 나이 결측 대체."""
    df = df.copy()
    df["age"] = df.groupby(["pclass", "sex"])["age"].transform(
        lambda s: s.fillna(s.median())
    )
    return df

def add_features(df: pd.DataFrame) -> pd.DataFrame:
    df = df.copy()
    df["family_size"] = df["sibsp"] + df["parch"] + 1
    df["is_alone"]    = (df["family_size"] == 1).astype(int)
    df["log_fare"]    = np.log1p(df["fare"])
    df["age_group"]   = pd.cut(df["age"],
                               bins=[0, 12, 18, 60, 100],
                               labels=["child", "teen", "adult", "senior"])
    return df


# 하나의 체인으로 전처리 파이프라인 구성
df = (
    titanic
    .drop(columns=["deck", "embark_town", "alive", "who", "adult_male", "alone"])
    .drop_duplicates()
    .pipe(fill_age)
    .assign(embarked=lambda d: d["embarked"].fillna(d["embarked"].mode()[0]))
    .pipe(add_features)
    .reset_index(drop=True)
)

print(df.shape)
df.head(3)

### 3-2. MultiIndex

In [None]:
# 다중 집계 → MultiIndex 열 생성
stats = (
    df.groupby(["pclass", "sex"])
    [["survived", "age", "fare"]]
    .agg(["mean", "std", "count"])
    .round(2)
)
stats

In [None]:
# MultiIndex 접근
print(stats[("survived", "mean")])           # 생존율
print()
print(stats.loc[(1, "female"), :])           # 1등석 여성

# 열 평탄화 (flatten)
stats.columns = ["_".join(col) for col in stats.columns]
print("\n평탄화 후:", stats.columns.tolist())

### 3-3. Rolling 윈도우 — 시계열 스타일 분석

In [None]:
# 승객 번호 기준으로 rolling 통계 (시계열 아이디어를 표 데이터에 적용)
df_sorted = df.sort_values("fare").reset_index(drop=True)

df_sorted["rolling_surv_mean"]  = df_sorted["survived"].rolling(window=30, min_periods=1).mean()
df_sorted["expanding_surv_mean"] = df_sorted["survived"].expanding().mean()

fig, ax = plt.subplots(figsize=(10, 4))
ax.plot(df_sorted["fare"],               df_sorted["rolling_surv_mean"],
        label="Rolling(30)", color="steelblue")
ax.plot(df_sorted["fare"],               df_sorted["expanding_surv_mean"],
        label="Expanding",   color="tomato", linestyle="--")
ax.set_xscale("log")
ax.set_title("운임에 따른 생존율 (Rolling / Expanding)")
ax.set_xlabel("운임 (log scale)")
ax.set_ylabel("생존율")
ax.legend()
plt.tight_layout()
plt.show()

### 3-4. 메모리 최적화

In [None]:
def memory_usage_mb(df: pd.DataFrame) -> float:
    return df.memory_usage(deep=True).sum() / 1e6

print(f"최적화 전: {memory_usage_mb(df):.3f} MB")
print(df.dtypes)

def optimize_dtypes(df: pd.DataFrame) -> pd.DataFrame:
    df = df.copy()
    for col in df.select_dtypes("float64").columns:
        df[col] = df[col].astype("float32")
    for col in df.select_dtypes("int64").columns:
        df[col] = pd.to_numeric(df[col], downcast="integer")
    for col in df.select_dtypes("object").columns:
        if df[col].nunique() / len(df) < 0.5:   # 카디널리티 낮으면 category
            df[col] = df[col].astype("category")
    return df

df_opt = optimize_dtypes(df)
print(f"\n최적화 후: {memory_usage_mb(df_opt):.3f} MB")
print(f"절감율   : {(1 - memory_usage_mb(df_opt)/memory_usage_mb(df))*100:.1f}%")

> **Exercise 3.** `df`에서 `pclass × age_group` 조합별로  
> 생존율(`mean`), 인원수(`count`), 평균 운임(`fare mean`)을 계산하고  
> 히트맵으로 **생존율**을 시각화하세요.

In [None]:
# Your code here


---
## Part 4. ML 전처리 파이프라인

### 4-1. 커스텀 sklearn Transformer

In [None]:
from sklearn.base import BaseEstimator, TransformerMixin

class OutlierClipper(BaseEstimator, TransformerMixin):
    """각 열을 quantile 기반으로 클리핑하는 Transformer."""

    def __init__(self, low_q: float = 0.05, high_q: float = 0.95):
        self.low_q  = low_q
        self.high_q = high_q

    def fit(self, X, y=None):
        X = np.asarray(X)
        self.lowers_ = np.nanquantile(X, self.low_q, axis=0)
        self.uppers_ = np.nanquantile(X, self.high_q, axis=0)
        return self

    def transform(self, X, y=None):
        X = np.asarray(X, dtype=float)
        return np.clip(X, self.lowers_, self.uppers_)


class TargetMeanEncoder(BaseEstimator, TransformerMixin):
    """범주형 변수를 타겟 평균값으로 인코딩."""

    def __init__(self, smoothing: float = 1.0):
        self.smoothing = smoothing

    def fit(self, X, y):
        X = np.asarray(X).ravel()
        y = np.asarray(y)
        self.global_mean_ = y.mean()
        self.encoding_ = {}
        for cat in np.unique(X):
            mask = X == cat
            n    = mask.sum()
            cat_mean = y[mask].mean()
            # Smoothed encoding
            self.encoding_[cat] = (
                (n * cat_mean + self.smoothing * self.global_mean_)
                / (n + self.smoothing)
            )
        return self

    def transform(self, X, y=None):
        X = np.asarray(X).ravel()
        return np.array([self.encoding_.get(v, self.global_mean_) for v in X]).reshape(-1, 1)


# 동작 확인
clipper = OutlierClipper()
sample  = rng.standard_normal((100, 3))
clipped = clipper.fit_transform(sample)
print("원본 범위:",  sample.min().round(2),  sample.max().round(2))
print("클립 범위:", clipped.min().round(2), clipped.max().round(2))

### 4-2. 완전한 ML 파이프라인 구축

In [None]:
from sklearn.pipeline import Pipeline
from sklearn.compose  import ColumnTransformer
from sklearn.preprocessing import StandardScaler, OrdinalEncoder
from sklearn.impute    import SimpleImputer
from sklearn.ensemble  import RandomForestClassifier
from sklearn.model_selection import StratifiedKFold, cross_validate
from sklearn.metrics   import make_scorer, roc_auc_score

# 특성 정의
NUM_COLS = ["age", "fare", "family_size", "log_fare"]
CAT_COLS = ["sex", "embarked"]
ORD_COLS = ["pclass"]   # 순서가 있는 범주형

# 수치형 파이프라인: 결측 대체 → 이상치 클리핑 → 표준화
num_pipe = Pipeline([
    ("imputer", SimpleImputer(strategy="median")),
    ("clipper", OutlierClipper(0.02, 0.98)),
    ("scaler",  StandardScaler()),
])

# 범주형 파이프라인: 결측 대체 → 순서 인코딩
cat_pipe = Pipeline([
    ("imputer", SimpleImputer(strategy="most_frequent")),
    ("encoder", OrdinalEncoder(handle_unknown="use_encoded_value", unknown_value=-1)),
])

# ColumnTransformer로 합치기
preprocessor = ColumnTransformer([
    ("num", num_pipe, NUM_COLS),
    ("cat", cat_pipe, CAT_COLS + ORD_COLS),
])

# 전체 파이프라인 (전처리 + 모델)
pipeline = Pipeline([
    ("preprocessor", preprocessor),
    ("model",        RandomForestClassifier(n_estimators=200, random_state=42)),
])

print(pipeline)

### 4-3. Cross-Validation

In [None]:
feature_cols = NUM_COLS + CAT_COLS + ORD_COLS
X = df[feature_cols]
y = df["survived"]

cv = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)
scoring = {
    "accuracy": "accuracy",
    "roc_auc":  "roc_auc",
    "f1":       "f1",
}

with timer_block("5-Fold CV"):
    cv_results = cross_validate(pipeline, X, y, cv=cv, scoring=scoring, n_jobs=-1)

results_df = pd.DataFrame({
    "Accuracy": cv_results["test_accuracy"],
    "ROC-AUC":  cv_results["test_roc_auc"],
    "F1":       cv_results["test_f1"],
})

print(results_df.round(4))
print("\n평균:")
print(results_df.mean().round(4))

### 4-4. 특성 중요도 시각화

In [None]:
# 전체 데이터로 학습 후 특성 중요도 추출
pipeline.fit(X, y)
rf_model    = pipeline.named_steps["model"]
feature_names = (
    NUM_COLS
    + CAT_COLS
    + ORD_COLS
)

importances = pd.Series(rf_model.feature_importances_, index=feature_names)
importances = importances.sort_values(ascending=True)

fig, ax = plt.subplots(figsize=(8, 4))
bars = ax.barh(importances.index, importances.values,
               color=plt.cm.RdYlGn(importances.values / importances.max()))
ax.set_title("Random Forest 특성 중요도")
ax.set_xlabel("Importance")
plt.tight_layout()
plt.show()

### 4-5. 학습 곡선 (Learning Curve)

In [None]:
from sklearn.model_selection import learning_curve

train_sizes, train_scores, val_scores = learning_curve(
    pipeline, X, y,
    train_sizes=np.linspace(0.1, 1.0, 10),
    cv=5, scoring="roc_auc", n_jobs=-1
)

train_mean = train_scores.mean(axis=1)
train_std  = train_scores.std(axis=1)
val_mean   = val_scores.mean(axis=1)
val_std    = val_scores.std(axis=1)

fig, ax = plt.subplots(figsize=(8, 5))
ax.plot(train_sizes, train_mean, "o-", color="steelblue", label="Train ROC-AUC")
ax.fill_between(train_sizes, train_mean - train_std, train_mean + train_std, alpha=0.2, color="steelblue")
ax.plot(train_sizes, val_mean,   "o-", color="tomato",   label="Val ROC-AUC")
ax.fill_between(train_sizes, val_mean - val_std,   val_mean + val_std,   alpha=0.2, color="tomato")
ax.set_title("Learning Curve (Random Forest)")
ax.set_xlabel("학습 샘플 수")
ax.set_ylabel("ROC-AUC")
ax.legend()
ax.grid(True)
plt.tight_layout()
plt.show()

> **Exercise 4.** `TargetMeanEncoder`를 파이프라인에 적용해보세요.  
> `sex`, `embarked` 열을 OrdinalEncoder 대신 `TargetMeanEncoder`로 교체하고  
> 5-Fold CV ROC-AUC를 비교하세요.

In [None]:
# Your code here


---
## Summary

| 파트 | 핵심 개념 |
|---|---|
| Python 고급 | Generator(메모리 효율), Decorator(timer/retry/memoize), Context Manager, `functools.partial/lru_cache` |
| NumPy 고급 | `einsum`(배치 연산/Attention), SVD/PCA from scratch, 메모리 레이아웃(C/F-order), float32 최적화 |
| Pandas 고급 | 메서드 체이닝+`pipe()`, MultiIndex, Rolling/Expanding 윈도우, `category` dtype 메모리 최적화 |
| ML 파이프라인 | 커스텀 Transformer(`OutlierClipper`, `TargetMeanEncoder`), `ColumnTransformer`, 5-Fold CV, Learning Curve |