# Pipeline Workflow

CausalPy provides a composable pipeline API that chains causal inference steps into a single, reproducible workflow. Instead of manually calling experiment construction, sensitivity analysis, and report generation separately, you can define them as steps in a pipeline.

In [1]:
import pandas as pd
import causalpy as cp

## Manual approach (before pipeline)

Traditionally, a CausalPy analysis involves several sequential steps:

In [2]:
df = (
    cp.load_data("its")
    .assign(date=lambda x: pd.to_datetime(x["date"]))
    .set_index("date")
)
treatment_time = pd.to_datetime("2017-01-01")

seed = 42
model = cp.pymc_models.LinearRegression(sample_kwargs={"random_seed": seed})

# Step 1: Fit the experiment
result = cp.InterruptedTimeSeries(
    df,
    treatment_time,
    formula="y ~ 1 + t",
    model=model,
)

# Step 2: Get effect summary
summary = result.effect_summary()
print(summary.text)

Initializing NUTS using jitter+adapt_diag...


Multiprocess sampling (4 chains in 4 jobs)


NUTS: [beta, y_hat_sigma]


Output()

Sampling 4 chains for 1_000 tune and 1_000 draw iterations (4_000 + 4_000 draws total) took 1 seconds.


Sampling: [beta, y_hat, y_hat_sigma]


Sampling: [y_hat]


Sampling: [y_hat]


Sampling: [y_hat]


Sampling: [y_hat]


Post-period (2017-01-31 00:00:00 to 2019-12-31 00:00:00), the average effect was 0.99 (95% HDI [-3.30, 5.42]), with a posterior probability of an increase of 0.667. The cumulative effect was 35.47 (95% HDI [-118.92, 195.03]); probability of an increase 0.667. Relative to the counterfactual, this equals 1.92% on average (95% HDI [-5.68%, 10.24%]).


## Pipeline approach

The pipeline wraps these steps into a single, declarative workflow. Each step is configured upfront, and the pipeline validates everything before running.

In [3]:
df = (
    cp.load_data("its")
    .assign(date=lambda x: pd.to_datetime(x["date"]))
    .set_index("date")
)

result = cp.Pipeline(
    data=df,
    steps=[
        cp.EstimateEffect(
            method=cp.InterruptedTimeSeries,
            treatment_time=pd.to_datetime("2017-01-01"),
            formula="y ~ 1 + t",
            model=cp.pymc_models.LinearRegression(sample_kwargs={"random_seed": 42}),
        ),
        cp.GenerateReport(include_plots=False),
    ],
).run()

print("Experiment type:", type(result.experiment).__name__)
print("Effect summary available:", result.effect_summary is not None)
print("Report generated:", result.report is not None)

Initializing NUTS using jitter+adapt_diag...


Multiprocess sampling (4 chains in 4 jobs)


NUTS: [beta, y_hat_sigma]


Output()

Sampling 4 chains for 1_000 tune and 1_000 draw iterations (4_000 + 4_000 draws total) took 1 seconds.


Sampling: [beta, y_hat, y_hat_sigma]


Sampling: [y_hat]


Sampling: [y_hat]


Sampling: [y_hat]


Sampling: [y_hat]


Experiment type: InterruptedTimeSeries
Effect summary available: True
Report generated: True


## Adding sensitivity analysis

The `SensitivityAnalysis` step runs a suite of diagnostic checks against the fitted experiment. Checks are pluggable, and you can choose which ones to run.

In [4]:
result = cp.Pipeline(
    data=df,
    steps=[
        cp.EstimateEffect(
            method=cp.InterruptedTimeSeries,
            treatment_time=pd.to_datetime("2017-01-01"),
            formula="y ~ 1 + t",
            model=cp.pymc_models.LinearRegression(sample_kwargs={"random_seed": 42}),
        ),
        cp.SensitivityAnalysis(
            checks=[
                cp.checks.PlaceboInTime(n_folds=2),
            ]
        ),
        cp.GenerateReport(include_plots=True),
    ],
).run()

print(f"Sensitivity checks run: {len(result.sensitivity_results)}")
for check_result in result.sensitivity_results:
    print(f"  - {check_result.check_name}: {check_result.text[:80]}...")

Initializing NUTS using jitter+adapt_diag...


Multiprocess sampling (4 chains in 4 jobs)


NUTS: [beta, y_hat_sigma]


Output()

Sampling 4 chains for 1_000 tune and 1_000 draw iterations (4_000 + 4_000 draws total) took 1 seconds.


Sampling: [beta, y_hat, y_hat_sigma]


Sampling: [y_hat]


Sampling: [y_hat]


Sampling: [y_hat]


Sampling: [y_hat]


Fold 1 failed to fit (pseudo_treatment_time=2011-01-05 00:00:00), skipping.


Fold 2 failed to fit (pseudo_treatment_time=2014-01-03 00:00:00), skipping.


Sensitivity checks run: 1
  - PlaceboInTime: Placebo-in-time analysis: 0 of 2 folds completed (2 skipped).
INCONCLUSIVE — no ...


## Available checks

CausalPy provides a range of sensitivity checks, each applicable to specific experiment types:

| Check | Applicable methods | Description |
|-------|-------------------|-------------|
| `PlaceboInTime` | ITS, SC | Shifts treatment time backward to test for spurious effects |
| `PriorSensitivity` | All Bayesian | Re-fits with different priors |
| `ConvexHullCheck` | SC | Validates treated values are within control range |
| `PersistenceCheck` | ITS (3-period) | Checks if effects persist after intervention ends |
| `PreTreatmentPlaceboCheck` | Staggered DiD | Validates parallel trends via pre-treatment effects |
| `BandwidthSensitivity` | RD, RKink | Re-fits with multiple bandwidths |
| `LeaveOneOut` | SC | Drops each control unit and refits |
| `PlaceboInSpace` | SC | Treats each control as placebo treated |
| `McCraryDensityTest` | RD | Tests for running variable manipulation |

## Pipeline result

The `PipelineResult` object contains all accumulated outputs:

In [5]:
print("result.experiment      ->", type(result.experiment).__name__)
print("result.effect_summary  ->", type(result.effect_summary).__name__)
print("result.sensitivity_results ->", len(result.sensitivity_results), "checks")
print("result.report          ->", "HTML" if result.report else "None")

result.experiment      -> InterruptedTimeSeries
result.effect_summary  -> EffectSummary
result.sensitivity_results -> 1 checks
result.report          -> HTML


The effect summary provides both a table and prose:

In [6]:
if result.effect_summary is not None:
    print(result.effect_summary.text)
    display(result.effect_summary.table)

Post-period (2017-01-31 00:00:00 to 2019-12-31 00:00:00), the average effect was 0.99 (95% HDI [-3.30, 5.42]), with a posterior probability of an increase of 0.667. The cumulative effect was 35.47 (95% HDI [-118.92, 195.03]); probability of an increase 0.667. Relative to the counterfactual, this equals 1.92% on average (95% HDI [-5.68%, 10.24%]).


Unnamed: 0,mean,median,hdi_lower,hdi_upper,p_gt_0,relative_mean,relative_hdi_lower,relative_hdi_upper
average,0.985237,0.998458,-3.303218,5.417612,0.66725,1.919774,-5.684154,10.243178
cumulative,35.468549,35.944489,-118.915839,195.034044,0.66725,1.919774,-5.684154,10.243178


## Viewing the HTML report

The `GenerateReport` step renders the full analysis — effect summary, diagnostic plots, and sensitivity-check results — into a single HTML report stored in `result.report`. See the {doc}`report generation notebook <report_demo>` for the standalone (non-pipeline) API.

In [7]:
import html as html_module
import warnings

from IPython.display import HTML

with warnings.catch_warnings():
    warnings.filterwarnings(
        "ignore", "Consider using IPython.display.IFrame", UserWarning
    )
    report_widget = HTML(
        '<iframe srcdoc="'
        + html_module.escape(result.report)
        + '" width="100%" height="1000"'
        ' style="border: 1px solid #ddd; border-radius: 6px;"></iframe>'
    )
report_widget