# <font color='black'>НИС: регрессионный анализ </font>
## <font color='black'> Синтетический контроль </font>

Источник данных: Alberto Abadie, Alexis Diamond & Jens Hainmueller (2010) Synthetic Control Methods for Comparative Case Studies: Estimating the Effect of California’s Tobacco Control Program, Journal of the American Statistical Association, 105:490, 493-505

Ссылка на статью: https://disk.yandex.ru/i/0ALu3UjyDrNTQw

In [None]:
import pandas as pd
import numpy as np
from matplotlib import style
from matplotlib import pyplot as plt
%matplotlib inline
import plotly.express as px
import plotly.graph_objects as pgo
import seaborn as sns
import statsmodels.formula.api as smf
!pip install linearmodels
from sklearn.linear_model import LinearRegression
from sklearn.metrics import mean_squared_error
from scipy.optimize import minimize
from scipy.stats import norm

In [None]:
df = pd.read_stata('https://github.com/microsoft/SparseSC/raw/master/replication/smoking.dta')
df = df.drop(columns=["lnincome", "beer", "age15to24"])
df.head()

Предварительно оценим модель DiD, уже знакомую Вам. Сгенерим дамми-переменную, которая отвечает за группу воздействия, и дамми-переменную для периода (1 - после введения антитабачной программы в Калифорнии в 1988, 0 - соответственно, до).    

In [None]:
df['treated'] = df['state'].apply(lambda x: 1 if x == "California" else 0)
df['after'] = df['year'].apply(lambda x: 1 if x > 1988 else 0)

Проинтерпретируйте полученные оценки в модели DiD:

In [None]:
did_model = smf.ols("cigsale ~ after*treated", data=df).fit(cov_type = "HC3")
print(did_model.summary())

Для последующего построения графика преобразуем вид данных таким образом, чтобы каждая строка соответствовала штату, столбец - году.

In [None]:
df_t = df.pivot(index= 'state', columns = 'year', values = "cigsale")
df_t.head()

Построим график динамики показателя продаж сигарет в Калифорнии (группа воздействия) и остальных штатах, составляющих контрольную группу. Для контрольной группы усредним значения показателя продаж сигарет по временным периодам. Какие предварительные выводы Вы можете сделать по этому графику?  

In [None]:
plot_df = df_t.loc[df_t.index == "California"].T.reset_index(drop=False)
plot_df["Untreated"] = df_t.loc[df_t.index != "California"].mean(axis=0).values

fig = px.line(
        data_frame = plot_df,
        x = "year",
        y = ["California","Untreated"], template = 'ggplot2',
        color_discrete_map={
        "California": "red",
        "Untreated": "blue"})

fig.add_trace(
    pgo.Scatter(
        x=[1988,1988],
        y=[plot_df.California.min(),plot_df.Untreated.max()],
        line={
           'dash': 'dash', 'color': 'grey'
        }, name='Tobacco Tax and Health Protection Act'
    ))

fig.update_layout(
        title  = {
            'text':"Dynamics of per-capita cigarette sales",
            'y':0.95,
            'x':0.5,
        },
        legend =  dict(y=1, x= 0.8),
        legend_title = "",
        xaxis_title="Year",
        yaxis_title="Cigarette Sales",
        font = dict(size=10)
)
fig.show()

Реализуем метод синтетического контроля и пересчитаем средний эффект воздействия (ATT):

In [None]:
# Шаг 1 предполагает подготовку данных к реализации метода синтетического контроля

before = df_t.loc[:, df_t.columns <= 1988].values  # данные о продажах сигарет до введения антитабачной программы 1988 г. Мы их используем для получения значений
                                                     # синтетического контроля
after = df_t.loc[:, df_t.columns > 1988].values    # соответственно, данные о продажах сигарет после введения антитабачной программы 1988 г., с которыми мы будем сравнивать
                                                     # полученные значения синтетического контроля для оценки среднего эффекта воздействия (ATT)
treated_idx = df_t.index.get_loc('California')       # получаем индекс строки Калифорнии в массиве данных
control_mask = np.ones(len(df_t), dtype=bool)
control_mask[treated_idx] = False                    # False - для Калифорнии (группы воздействия), остальные штаты в контрольной группе - True
X_control = before[control_mask].T                 # задаем в матрице значения признака продаж сигарет для контрольных штатов

# Шаг 2: находим оптимальные веса для получения синтетического контроля
def loss(w, X, y):
    return np.sqrt(np.mean((X @ w - y)**2))          # зададим функцию потерь через RMSE (отклонения группы воздействия от взвешенной комбинации признаков контрольных штатов)

def constraint(w):
    return np.sum(w) - 1                             # введем ограничение: сумма весов = 1

bounds = [(0, 1)] * X_control.shape[1]               # второе ограничение: вес может принимать значения от 0 до 1
constraints = {'type': 'eq', 'fun': constraint}      # указывает тип ограничения - равенство (equality constraint), то есть, функция ограничения равна 0;
                                                     # ссылаемся через 'fun': constraint на функцию np.sum(w) - 1, заданную ранее

initial_weights = np.ones(X_control.shape[1]) / X_control.shape[1]  # стартовые значения - равные веса, а далее уже минимизируем RMSE
result = minimize(
    fun=loss,
    x0=initial_weights,
    args=(X_control, before[treated_idx]),
    method='SLSQP',           # используем в качестве метода оптимизации Sequential Least Squares Quadratic Programming, так как данный метод учитывает введенные ограничения
    bounds=bounds,
    constraints=constraints,
    options={'maxiter': 1000}
)

weights = result.x

# На шаге 3 получаем значения синтетического контроля с учетом вычисленных весов: делаем это для периода До и После введения воздействия
synthetic_pre = X_control @ weights
synthetic_post = after[control_mask].T @ weights

# Шаг 4: рассчитываем средний эффект воздействия (в данном случае - ATT)
calif_actual_pre = before[treated_idx]
calif_actual_post = after[treated_idx]
treatment_effect = calif_actual_post - synthetic_post

print("Optimization success:", result.success)
print("Pre-period RMSE:", loss(weights, X_control, calif_actual_pre))
print("Average treatment effect on the treated (ATT):", np.mean(treatment_effect))

Построим соответствующий график с учетом значений синтетического контроля:

In [None]:
all_years = df_t.columns.astype(int)
calif_idx = df_t.index.get_loc('California')
control_mask = np.ones(len(df_t), dtype=bool)
control_mask[calif_idx] = False

calif_full = df_t.loc['California'].values
synthetic_full = np.concatenate([
    before[control_mask].T @ weights, # значения синтетического контроля до 1988
    after[control_mask].T @ weights   # значения синтетического контроля после 1988
])

plt.figure(figsize=(12,6))

plt.plot(all_years, calif_full, label='California (Real data)', color='#1f77b4', linewidth=2)
plt.plot(all_years, synthetic_full, label='Synthetic Control', linestyle='--', color='#ff7f0e', linewidth=2)

fill_start_idx = np.where(all_years == 1988)[0][0]
plt.fill_between(all_years[fill_start_idx:],
                 synthetic_full[fill_start_idx:],
                 calif_full[fill_start_idx:],
                 color='red', alpha=0.1, label='Treatment Effect')

plt.axvline(1988, color='red', linestyle=':', alpha=0.7)
plt.title('Cigarette Sales: Real Data vs Synthetic', pad=20)
plt.ylabel('Packs per capita')
plt.xlabel('Year')
plt.legend()
plt.grid(alpha=0.2)
plt.show()

Убедимся, что полученный эффект не является случайным. Для этого реализуем плацебо-тест.

In [None]:
def fit_synthetic_control(X, y):
    def loss(w):
        return np.mean((X @ w - y)**2)

    constraints = ({'type': 'eq', 'fun': lambda w: np.sum(w) - 1})
    bounds = [(0, 1) for _ in range(X.shape[1])]
    initial_weights = np.ones(X.shape[1]) / X.shape[1]

    result = minimize(loss, initial_weights,
                     method='SLSQP', bounds=bounds, constraints=constraints)
    return result.x

control_units = np.where(control_mask)[0]
n_control = len(control_units)

placebo_effects = []

for _ in range(n_control):
    placebo_unit = np.random.choice(np.where(control_mask)[0])   # Случайный выбор "ложной" единицы анализа, подвергшейся воздействию, из контрольной группы

    placebo_weights = fit_synthetic_control(X_control, before[placebo_unit])  # Повторяем анализ для единицы анализа плацебо
    placebo_synth = after[control_mask].T @ placebo_weights
    placebo_effect = after[placebo_unit] - placebo_synth
    placebo_effects.append(np.mean(placebo_effect))

# Расчет p-value
att = np.mean(treatment_effect)
p_value = (np.abs(placebo_effects) >= np.abs(att)).mean()  # Какая доля случайных "эффектов" плацебо по величине превышает соответствующий эффект для Калифорнии?
print(f"ATE: {att:.2f}, p-value: {p_value:.4f}")  # Если бы p-value превышал конвенциональный уровень значимости,
                                                 # то можно было бы предполагать случайный эффект, полученный для Калифорнии