In [1]:
import numpy as np
import pandas as pd
import dowhy
from dowhy import CausalModel
import matplotlib.pyplot as plt
import seaborn as sns

In [2]:
#| label: threshold-data-generation
#| echo: true
#| fig-cap: "나이에 따라 운동 기준선(Red Line)이 높아지지만, 개인의 랜덤한 성향(점)에 따라 기준선을 넘기도 하고 못 넘기도 합니다."

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

np.random.seed(42)
N = 1000

# 1. Confounder: Age
age = np.random.randint(20, 70, size=N)

# 2. Treatment Assignment (Threshold 방식)
# 아이디어: "운동을 하기 위한 진입장벽(Threshold)"은 나이가 들수록 높아진다.
# Threshold = 나이에 비례하는 어떤 값
threshold = 0.5 * (age - 20) # 예: 20세=0, 70세=25

# 개인의 운동 의지 (Latent Exercise Tendency)
# 평균은 12.5 정도지만, 사람마다 편차(표준편차 10)가 큼
# 이 '개인차(Noise)'가 있어야 overlap이 확보됨!
personal_will = np.random.normal(12.5, 10, size=N)

# 처치 할당: 개인의 의지가 나이로 인한 장벽(Threshold)을 넘었는가?
exercise = (personal_will > threshold).astype(int)

# 3. Outcome: Cholesterol
# 나이가 많을수록 높음, 운동하면 낮아짐(-10)
cholesterol = 2.0 * age - 10 * exercise + np.random.normal(150, 10, size=N)

df_threshold = pd.DataFrame({
    'Age': age, 
    'Threshold': threshold, 
    'Personal_Will': personal_will, 
    'Exercise': exercise, 
    'Cholesterol': cholesterol
})

# ---------------------------------------------------------
# 시각화: Threshold 모델의 작동 원리 확인
# ---------------------------------------------------------
plt.figure(figsize=(10, 6))

# 운동 한 사람(1)과 안 한 사람(0) 시각화
sns.scatterplot(
    x='Age', 
    y='Personal_Will', 
    hue='Exercise', 
    data=df_threshold, 
    palette={0: 'blue', 1: 'red'},
    alpha=0.6
)

# 기준선(Threshold) 그리기
# 이 선보다 위에 있는 점은 빨강(운동 O), 아래는 파랑(운동 X)
plt.plot(age, threshold, color='black', linewidth=2, linestyle='--', label='Exercise Threshold')

plt.title("Treatment Assignment by Age-Dependent Threshold")
plt.xlabel("Age (Confounder)")
plt.ylabel("Latent Exercise Tendency (Personal Will)")
plt.legend(title='Exercise')
plt.grid(True, alpha=0.3)
plt.show()

# 데이터 확인
print(df_threshold.head())

NameError: name 'personal_will' is not defined

In [None]:
# ---------------------------------------------------------
# 1. 데이터 생성 (Binary Treatment)
# ---------------------------------------------------------
np.random.seed(42)
N = 1000

# Confounder: Age
age = np.random.randint(20, 70, size=N)

# Treatment: Exercise (Binary: 0 or 1)
# 나이가 많을수록 운동할 확률(Propensity)이 높아짐 (교란 발생)
exercise_threshold = 0.2 * (age - 20) + 0.5

prob_exercise = 1 / (1 + np.exp(-(age - 45) / 20))
exercise = np.random.binomial(1, prob_exercise)

# Outcome: Cholesterol
# 나이가 많을수록 높음 (+2.0 * Age)
# 운동을 하면 낮아짐 (True Causal Effect = -10)
cholesterol = 2.0 * age - 10 * exercise + np.random.normal(150, 10, size=N)

df = pd.DataFrame({'Age': age, 'Exercise': exercise, 'prob_exercise': prob_exercise, 'Cholesterol': cholesterol})

In [None]:
#| label: age-colored-plot
#| fig-cap: "나이(Age)에 따라 색을 다르게 표시하면, 같은 나이대(비슷한 색)에서는 운동을 할수록 콜레스테롤이 낮아지는 패턴이 보입니다."
#| echo: true

plt.figure(figsize=(8, 6))

# 1. 점 찍기 (색상 기준: Age)
# palette='viridis': 나이가 적을수록 보라색, 많을수록 노란색 계열
sns.scatterplot(
    x='prob_exercise', 
    y='Cholesterol', 
    data=df, 
    hue='Age', 
    palette='viridis', 
    alpha=0.6
)

# 2. 전체적인 (잘못된) 추세선 그리기
# scatter=False로 설정하여 점은 중복해서 찍지 않고 선만 그림
sns.regplot(
    x='prob_exercise', 
    y='Cholesterol', 
    data=df, 
    scatter=False, 
    line_kws={'color': 'red', 'label': 'Naive Trend (Positive)'}
)

plt.title("Exercise vs Cholesterol (Colored by Age)")
plt.legend(title='Age')
plt.show()

In [None]:
# ---------------------------------------------------------
# 2. 단순 비교 (Naive Estimate)
# ---------------------------------------------------------
naive_diff = df[df['Exercise']==1]['Cholesterol'].mean() - df[df['Exercise']==0]['Cholesterol'].mean()
print(f"Naive Difference (Exercise 1 - 0): {naive_diff:.4f}")

In [None]:
# ---------------------------------------------------------
# 3. DoWhy 모델링 및 층화(Stratification) 분석
# ---------------------------------------------------------
model = CausalModel(
    data=df,
    treatment='Exercise',
    outcome='Cholesterol',
    common_causes=['Age']
)

identified_estimand = model.identify_effect()

# Stratification 적용
# num_strata=50: 데이터를 성향 점수(나이)에 따라 50개(나이 개수) 구간으로 나눔
estimate_strat = model.estimate_effect(
    identified_estimand,
    method_name="backdoor.propensity_score_stratification",
    method_params={
        'num_strata': 5
    }
)

print("-" * 30)
print(f"Causal Estimate (Stratification): {estimate_strat.value:.4f}")
print(f"True Causal Effect: -10.0")
print("-" * 30)

In [None]:
# 50개 구간으로 나누기
num_strata = 50

# [수정] duplicates='drop' 옵션 추가
# 경계가 겹치면 중복되는 구간을 하나로 합쳐서 에러를 방지합니다.
df['strata_50'] = pd.qcut(
    df['propensity_score'], 
    q=num_strata, 
    labels=False, 
    duplicates='drop'
)

plt.figure(figsize=(15, 6))
sns.boxplot(x='strata_50', y='Age', data=df, color="skyblue")

plt.xticks(rotation=90, fontsize=8) 
plt.title(f"Age Distribution per Strata (Exact Matching Check)")
plt.xlabel("Propensity Score Strata")
plt.ylabel("Age")
plt.show()

# 실제로 생성된 구간 개수 확인
print(f"Requested strata: {num_strata}")
print(f"Actual strata created: {df['strata_50'].nunique()}")

In [None]:
#| label: refutation-test
#| echo: true
#| fig-cap: "인과 모델의 견고성(Robustness) 검증"

# ---------------------------------------------------------
# 5. 결과 검증 (Refutation Test)
# ---------------------------------------------------------
# 우리가 구한 인과 효과가 정말 믿을만한지 테스트합니다.

print("\n=== Refutation 1: Placebo Treatment (가짜 처치) ===")
# 처치(Exercise)를 무작위로 섞어서 모델에 넣습니다.
# 기대 결과: 인과 효과가 0에 가까워야 함 (운동을 랜덤으로 배정하면 효과가 없어야 하므로)

refute_placebo = model.refute_estimate(
    identified_estimand,
    estimate_strat,
    method_name="placebo_treatment_refuter",
    method_params={"placebo_type": "permute_treatment"}
)

print(refute_placebo)

print("\n=== Refutation 2: Add Random Common Cause (랜덤 변수 추가) ===")
# 데이터에 아무 상관 없는 랜덤 변수를 추가해서 다시 분석합니다.
# 기대 결과: 원래 구한 인과 효과(-10.0 근처)가 크게 변하지 않아야 함

refute_random = model.refute_estimate(
    identified_estimand,
    estimate_strat,
    method_name="random_common_cause"
)

print(refute_random)