# 📊 Data Ingestion Pipeline: L7 to OMOP CDM via Airflow


## 🔍 What is Apache Airflow?

Apache Airflow is an open-source platform to programmatically author, schedule, and monitor workflows. Think of it as a workflow orchestrator for your data pipelines.

### Benefits:
- **Modular**: Each task is defined in Python.
- **Scalable**: Handles complex workflows with retries and dependencies.
- **Visual**: Provides a UI to track job execution and dependencies.
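
To make the retry behavior concrete, here is a simplified plain-Python model of what a task-level retry count means. It is not Airflow's implementation: the helper `run_with_retries` and the flaky task are hypothetical, and the model ignores the `retry_delay` wait Airflow applies between attempts.

```python
from typing import Callable, TypeVar

T = TypeVar("T")


def run_with_retries(task: Callable[[], T], retries: int = 1) -> T:
    """Simplified model of Airflow's `retries` setting (not Airflow's code).

    The task gets one initial attempt plus up to `retries` extra attempts;
    if every attempt raises, the last exception propagates (the task "fails").
    """
    attempts = retries + 1
    for attempt in range(1, attempts + 1):
        try:
            return task()
        except Exception:
            if attempt == attempts:
                raise  # out of retries: surface the failure
    raise ValueError("retries must be >= 0")


# Hypothetical flaky task: fails on the first call, succeeds on the second.
calls = {"n": 0}


def flaky_extract() -> str:
    calls["n"] += 1
    if calls["n"] == 1:
        raise ConnectionError("transient network error")
    return "extracted"


result = run_with_retries(flaky_extract, retries=1)
print(result, calls["n"])  # extracted 2
```

With `retries=1`, a transient failure on the first attempt is absorbed; a second consecutive failure would mark the task as failed.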
    


## 🏗️ Use Case: Ingesting Clinical Data from L7 (Postgres)

The objective is to extract patient data from the L7 Postgres database, apply a lightweight transformation so it conforms to the OMOP CDM, and load it into an OMOP-compliant database.
    


## 🧠 How the DAG Works

- **Start** ➡️ **Extract from L7** ➡️ **Transform Data** ➡️ **Load to OMOP DB** ➡️ **End**

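Each task in this DAG is a `PythonOperator` that calls one Python function. The DAG's dependencies ensure a task starts only after its upstream task has succeeded. (Start and End mark the boundaries of a run; the code below defines only the three middle tasks.)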
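
The ordering guarantee is a topological sort of the dependency graph. As a conceptual analogy only (Airflow's scheduler also tracks task state, retries, and parallelism), Python's standard-library `graphlib` can compute the same order; the `start` and `end` node names are hypothetical stand-ins for the diagram's boundaries.

```python
from graphlib import TopologicalSorter

# Each key maps a task to the set of tasks it depends on (its upstream tasks),
# mirroring Start -> Extract -> Transform -> Load -> End.
dependencies = {
    "extract_l7": {"start"},
    "transform_l7": {"extract_l7"},
    "load_omop": {"transform_l7"},
    "end": {"load_omop"},
}

# static_order() yields each task only after all of its upstream tasks.
run_order = list(TopologicalSorter(dependencies).static_order())
print(run_order)
# ['start', 'extract_l7', 'transform_l7', 'load_omop', 'end']
```

Because this pipeline is a straight chain, there is exactly one valid order; branching DAGs can have several, which is what lets Airflow run independent tasks in parallel.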
    


## 🧾 Airflow DAG Code Example
    


## 🛠️ Requirements

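- Airflow (`pip install apache-airflow`; the Airflow docs recommend installing with the matching constraints file)
- Python libraries: `pandas`, `psycopg2` (PostgreSQL driver), and `sqlalchemy`
- Airflow running with the DAG folder configured (default: `~/airflow/dags/`)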

## 📚 Resources

- [Airflow Docs](https://airflow.apache.org/docs/apache-airflow/stable/)
- [OMOP CDM Info](https://ohdsi.github.io/CommonDataModel/)
- [Astronomer Academy](https://www.astronomer.io/learn/)
    

## Code + Explanation:

The first code cell:

- Imports the Airflow modules and data libraries.
    - ✅ You’ll need `pandas`, `psycopg2`, and `sqlalchemy` installed (e.g., listed in `requirements.txt`).
    - ✅ `psycopg2` connects to the L7 database and `sqlalchemy` connects to the OMOP database.

- Sets default task arguments: an owner, a start date, and 1 retry if a task fails.

- Defines the DAG object, which runs daily. `catchup=False` means Airflow won’t backfill runs for past dates.

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator
import pandas as pd
import psycopg2
import sqlalchemy

default_args = {
    'owner': 'harbinger',
    # Fixed placeholder date: days_ago() is deprecated, and the Airflow docs
    # discourage dynamic start dates.
    'start_date': datetime(2024, 1, 1),
    'retries': 1,
}

dag = DAG(
    'l7_to_omop_ingestion',
    default_args=default_args,
    schedule='@daily',  # `schedule` replaces the deprecated `schedule_interval` (Airflow 2.4+)
    catchup=False,
    description='Ingest L7 clinical data from Postgres to OMOP CDM staging area',
)
```
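
The effect of `catchup=False` can be sketched with a simplified model. It assumes each `@daily` run with logical date D covers the interval [D, D + 1 day) and can start only once that interval has ended; the function `daily_logical_dates` and both dates are hypothetical, and the real scheduler applies further rules on top of this.

```python
from datetime import datetime, timedelta


def daily_logical_dates(start: datetime, now: datetime, catchup: bool) -> list[datetime]:
    """Simplified model of which @daily runs get scheduled (not Airflow's code)."""
    one_day = timedelta(days=1)
    completed = []
    d = start
    # Collect every interval [d, d + 1 day) that has fully ended by `now`.
    while d + one_day <= now:
        completed.append(d)
        d += one_day
    # catchup=True backfills every missed interval; catchup=False keeps only the latest.
    return completed if catchup else completed[-1:]


start = datetime(2024, 1, 1)       # hypothetical start_date
now = datetime(2024, 1, 4, 9, 30)  # hypothetical "current" time
print(daily_logical_dates(start, now, catchup=True))
print(daily_logical_dates(start, now, catchup=False))
```

Here three daily intervals have ended, so `catchup=True` would schedule runs for Jan 1, 2, and 3, while `catchup=False` schedules only the Jan 3 run.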
    

## 💡 Required from Harbinger for this section:

| **Need**                                                            | **Why**                                               |
| ------------------------------------------------------------------- | ----------------------------------------------------- |
| ✅ L7 PostgreSQL host, port, database name, user, and password       | To extract source clinical data                       |
| ✅ Table names and sample schema                                     | So you can define `SELECT` queries properly           |
| ✅ OMOP Postgres DB connection (or Redshift/Snowflake, if different) | To load transformed data                              |
| ✅ Mapping from L7 schema → OMOP CDM schema                          | For transforming fields properly                      |
| ✅ Data dictionary if available                                      | To help with transformation logic and standardization |
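
Once the connection details arrive, one option is to keep them out of the DAG file entirely. The sketch below builds a Postgres URL of the form accepted by `sqlalchemy.create_engine` from environment variables; the variable names (`L7_DB_HOST`, `L7_DB_PASSWORD`, etc.) and the helper `postgres_url_from_env` are hypothetical. Airflow's own alternative is a Connection, which can also be supplied through an `AIRFLOW_CONN_<CONN_ID>` environment variable.

```python
import os
from urllib.parse import quote_plus


def postgres_url_from_env(prefix: str) -> str:
    """Build a Postgres URL from hypothetical <PREFIX>_DB_* environment variables."""
    env = os.environ
    host = env[f"{prefix}_DB_HOST"]
    name = env[f"{prefix}_DB_NAME"]
    # URL-encode user and password so characters like '@' or '/' don't break the URL.
    user = quote_plus(env[f"{prefix}_DB_USER"])
    password = quote_plus(env[f"{prefix}_DB_PASSWORD"])
    port = env.get(f"{prefix}_DB_PORT", "5432")  # default Postgres port
    return f"postgresql://{user}:{password}@{host}:{port}/{name}"


# Usage with placeholder values (normally set by the deployment, not in code).
os.environ.update({
    "L7_DB_HOST": "your-l7-host",
    "L7_DB_NAME": "l7_database",
    "L7_DB_USER": "your_user",
    "L7_DB_PASSWORD": "p@ss/word",
    "L7_DB_PORT": "5432",
})
url = postgres_url_from_env("L7")
print(url)  # postgresql://your_user:p%40ss%2Fword@your-l7-host:5432/l7_database
```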


### Step 1: L7 Data Extraction from Postgres

- Connects to the L7 Postgres database.

- Pulls raw patient data.

- Saves it to disk for transformation.

✅ Needed:

- Actual patient table names

- Column structure

- Filtering logic (e.g., only recent patients?)

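```python
def extract_data_from_l7():
    # Placeholder credentials: replace with real values, ideally read from an
    # Airflow Connection or environment variables rather than hard-coded.
    conn = psycopg2.connect(
        host='your-l7-host',
        dbname='l7_database',
        user='your_user',
        password='your_password',
        port=5432,
    )
    try:
        # `patient_table` is a placeholder until the real L7 table names are known
        df = pd.read_sql("SELECT * FROM patient_table", conn)
    finally:
        conn.close()  # close the connection even if the query fails
    # The local /tmp hand-off only works if all tasks run on the same machine
    df.to_csv('/tmp/l7_patient_data.csv', index=False)
```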
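
For the open "filtering logic" question, a common pattern is incremental extraction: pull only rows changed since the last successful run (a "watermark"). The sketch below uses an in-memory SQLite database as a stand-in for L7 Postgres; the table columns (`patient_id`, `birth_date`, `sex`, `updated_at`) and the function `extract_since` are hypothetical. With `psycopg2`, the placeholder is `%s` instead of `?`, and `pd.read_sql` accepts the value through its `params` argument.

```python
import csv
import sqlite3
import tempfile
from pathlib import Path


def extract_since(conn: sqlite3.Connection, watermark: str, out_path: Path) -> int:
    """Copy rows updated after `watermark` to a CSV file; return the row count."""
    # Parameterized query: the driver substitutes the value safely (no string formatting).
    cur = conn.execute(
        "SELECT patient_id, birth_date, sex, updated_at "
        "FROM patient_table WHERE updated_at > ? ORDER BY updated_at",
        (watermark,),
    )
    header = [col[0] for col in cur.description]
    rows = cur.fetchall()
    with out_path.open("w", newline="") as f:
        writer = csv.writer(f)
        writer.writerow(header)
        writer.writerows(rows)
    return len(rows)


# In-memory SQLite stand-in for the L7 database, with hypothetical rows.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE patient_table (patient_id INTEGER, birth_date TEXT, sex TEXT, updated_at TEXT)"
)
conn.executemany(
    "INSERT INTO patient_table VALUES (?, ?, ?, ?)",
    [
        (1, "1980-05-01", "M", "2024-01-01"),
        (2, "1975-11-23", "F", "2024-01-03"),
        (3, "1992-07-14", "F", "2024-01-04"),
    ],
)
out = Path(tempfile.gettempdir()) / "l7_patient_data_incremental.csv"
n = extract_since(conn, "2024-01-02", out)
print(n)  # 2
```

ISO-formatted date strings sort lexicographically, which is why the text comparison works here; in Postgres, a proper `timestamp` column would be the natural choice.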
    

### Step 2: Transform to OMOP format

- Simple example transformation: lowercase the column names.

- This placeholder will be replaced with logic that maps L7 fields to the OMOP CDM specification.

✅ Needed:

- Mapping sheet: L7 columns to OMOP columns

- Any standard terminologies required (e.g., LOINC, SNOMED CT)

```python
def transform_data():
    df = pd.read_csv('/tmp/l7_patient_data.csv')
    # Sample transformation only; replace with the L7 -> OMOP CDM field mapping
    df.columns = [col.lower() for col in df.columns]
    df.to_csv('/tmp/transformed_patient_data.csv', index=False)
```
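
A real transform maps each L7 field to its OMOP `person` column and translates local codes into standard concept IDs. The sketch below assumes hypothetical L7 columns `patient_id`, `birth_date` (ISO `YYYY-MM-DD`), and `sex` (`M`/`F`), and assumes L7 patient IDs are integers that can be reused as `person_id`. It uses the OMOP standard gender concepts 8507 (MALE) and 8532 (FEMALE), with 0 meaning "no matching concept"; race and ethnicity are left as 0 until the actual mapping sheet is available.

```python
from datetime import date

# Hypothetical L7 sex codes -> OMOP standard gender concept IDs.
GENDER_CONCEPTS = {"M": 8507, "F": 8532}


def to_omop_person(l7_row: dict) -> dict:
    """Map one hypothetical L7 patient row to an OMOP `person` row."""
    birth = date.fromisoformat(l7_row["birth_date"])
    sex = l7_row["sex"].strip().upper()
    return {
        "person_id": int(l7_row["patient_id"]),
        "gender_concept_id": GENDER_CONCEPTS.get(sex, 0),  # 0 = no matching concept
        "year_of_birth": birth.year,
        "month_of_birth": birth.month,
        "day_of_birth": birth.day,
        # Placeholders until the real L7 race/ethnicity fields are mapped.
        "race_concept_id": 0,
        "ethnicity_concept_id": 0,
        "gender_source_value": l7_row["sex"],  # keep the original code for traceability
    }


person = to_omop_person({"patient_id": "2", "birth_date": "1975-11-23", "sex": "f"})
print(person)
```

Inside `transform_data`, this could be applied per row with something like `pd.DataFrame([to_omop_person(r) for r in df.to_dict("records")])`, keeping the mapping logic testable outside Airflow.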
    

### Step 3: Load to OMOP

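- Appends the transformed rows to the `person` table in the OMOP schema. This only succeeds once the transformed columns match the `person` table, which in CDM v5.4 requires `person_id`, `gender_concept_id`, `year_of_birth`, `race_concept_id`, and `ethnicity_concept_id`.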

✅ Needed:

- The target schema and table (e.g., staging or production)

- Whether deduplication or upserts are required

- Which layer this sits in (Bronze, Silver, Gold?)

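```python
def load_to_omop():
    # Placeholder URL: replace with the real OMOP host, database, and credentials
    engine = sqlalchemy.create_engine('postgresql://omop_user:password@omop-host:5432/omop_db')
    df = pd.read_csv('/tmp/transformed_patient_data.csv')
    # Plain INSERTs; pass schema='...' if `person` is not on the default search_path
    df.to_sql('person', engine, if_exists='append', index=False)
    engine.dispose()  # release pooled connections
```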
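
If upserts turn out to be required, note that `to_sql(if_exists='append')` issues plain INSERTs, so re-running a day would duplicate rows or fail on the primary key. Postgres supports `INSERT ... ON CONFLICT (...) DO UPDATE SET col = EXCLUDED.col`; the sketch below demonstrates the same idea against an in-memory SQLite stand-in (SQLite 3.24+ accepts the same clause). The trimmed-down `person` table and the helper `upsert_persons` are hypothetical; with `psycopg2`, the placeholders would be `%s`.

```python
import sqlite3

# In-memory SQLite stand-in for the OMOP database (a trimmed-down `person` table).
# person_id must be a PRIMARY KEY (or UNIQUE) for ON CONFLICT to apply.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE person (person_id INTEGER PRIMARY KEY, "
    "gender_concept_id INTEGER NOT NULL, year_of_birth INTEGER NOT NULL)"
)

UPSERT_SQL = (
    "INSERT INTO person (person_id, gender_concept_id, year_of_birth) "
    "VALUES (?, ?, ?) "
    "ON CONFLICT (person_id) DO UPDATE SET "
    "gender_concept_id = excluded.gender_concept_id, "
    "year_of_birth = excluded.year_of_birth"
)


def upsert_persons(rows):
    # One transaction per batch: commits on success, rolls back on error.
    with conn:
        conn.executemany(UPSERT_SQL, rows)


upsert_persons([(1, 8507, 1980), (2, 8532, 1975)])
# Re-loading person 2 with a corrected birth year updates the row instead of duplicating it.
upsert_persons([(2, 8532, 1976)])
rows = conn.execute("SELECT * FROM person ORDER BY person_id").fetchall()
print(rows)  # [(1, 8507, 1980), (2, 8532, 1976)]
```

Because the load is idempotent, Airflow retries and manual re-runs of the same day become safe.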
    

```python
extract_task = PythonOperator(
    task_id='extract_l7',
    python_callable=extract_data_from_l7,
    dag=dag
)

transform_task = PythonOperator(
    task_id='transform_l7',
    python_callable=transform_data,
    dag=dag
)

load_task = PythonOperator(
    task_id='load_omop',
    python_callable=load_to_omop,
    dag=dag
)

extract_task >> transform_task >> load_task
```
    

## Summary of What is Needed from Harbinger 

| **Input Needed**                         | **Why**                                      |
| ---------------------------------------- | -------------------------------------------- |
| ✅ L7 connection details                  | To extract raw data                          |
| ✅ Table/schema information               | What and how much data to extract            |
| ✅ Transformation logic                   | L7 → OMOP mapping                            |
| ✅ OMOP target DB + credentials           | Where to load the data                       |
| ✅ Data refresh frequency                 | How often DAG should run (daily/weekly)      |
| ✅ Data dictionary (if available)         | Ensures field alignment and semantic clarity |
| ✅ Any PII/PHI concerns or masking needed | For compliance with HIPAA                    |
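
If masking is required, one common technique is keyed pseudonymization: replacing direct identifiers with an HMAC-SHA256 digest. The same input and key always produce the same token, so records still join across tables, but the original value cannot practically be recovered without the key. The identifier format `MRN-000123` and the key below are hypothetical, and whether this satisfies the specific HIPAA requirements remains one of the open questions in the table above.

```python
import hashlib
import hmac


def pseudonymize(value: str, secret_key: bytes) -> str:
    """Replace an identifier with a keyed SHA-256 digest (64 hex characters)."""
    return hmac.new(secret_key, value.encode("utf-8"), hashlib.sha256).hexdigest()


# Hypothetical key; in practice load it from a secrets store, never hard-code it.
key = b"replace-with-a-secret-from-your-vault"
token_a = pseudonymize("MRN-000123", key)
token_b = pseudonymize("MRN-000123", key)
print(token_a == token_b, len(token_a))  # True 64
```

A keyed hash is preferable to a plain SHA-256 of the identifier, because low-entropy values such as record numbers can be brute-forced when no secret key is involved.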
