<a href="https://colab.research.google.com/github/itsThien/Projects/blob/main/AI_Ready_Enterprise_Data_Pipeline.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import kagglehub
import pandas as pd
import os

def ingest_data():
    """
    Fetch the Kaggle COVID-19 dataset and read its main CSV into memory.

    Downloads 'imdevskp/corona-virus-report' through kagglehub, loads
    'covid_19_clean_complete.csv' from the download folder and prints the
    shape, column names and first five rows.

    Returns:
        pd.DataFrame: Contents of the CSV file.

    Raises:
        FileNotFoundError: If the CSV is not present in the download folder.
    """
    print("Downloading dataset from Kaggle...")
    download_dir = kagglehub.dataset_download("imdevskp/corona-virus-report")

    # The dataset ships several files; only this one is loaded here.
    csv_path = os.path.join(download_dir, "covid_19_clean_complete.csv")
    if os.path.exists(csv_path):
        print("Loading dataset into Pandas...")
        frame = pd.read_csv(csv_path)
    else:
        raise FileNotFoundError(f"Expected file 'covid_19_clean_complete.csv' not found in {download_dir}")

    # Short schema report for the notebook reader
    print("\nDataset successfully loaded!")
    print(f"Shape: {frame.shape}")
    print("\nColumns:", list(frame.columns))
    print("\nPreview:")
    print(frame.head())

    return frame


# Example usage
# Runs only when this code executes as the main module; the loaded DataFrame
# is bound to the module-level name `df`.
if __name__ == "__main__":
    df = ingest_data()


In [None]:
import pandas as pd
import numpy as np

def clean_data(df: pd.DataFrame) -> pd.DataFrame:
    """
    Return a de-duplicated, gap-filled copy of the COVID-19 table.

    Each step is skipped when its column is absent:
    - duplicate rows are dropped
    - 'Date' is parsed to datetime (unparseable values become NaT)
    - 'Province/State' and 'Country/Region' gaps become 'Unknown'
    - 'Confirmed', 'Deaths' and 'Recovered' gaps become 0

    The input DataFrame is not modified.
    """
    # Replacement value for missing entries, keyed by column name
    fill_values = {
        'Province/State': 'Unknown',
        'Country/Region': 'Unknown',
        'Confirmed': 0,
        'Deaths': 0,
        'Recovered': 0,
    }

    cleaned = df.drop_duplicates().copy()

    if 'Date' in cleaned.columns:
        cleaned['Date'] = pd.to_datetime(cleaned['Date'], errors='coerce')

    for column, filler in fill_values.items():
        if column in cleaned.columns:
            cleaned[column] = cleaned[column].fillna(filler)

    # Report the five columns that still have the most missing values
    missing_summary = cleaned.isnull().sum().sort_values(ascending=False).head()
    print("Data cleaned successfully.")
    print(f"Remaining missing values:\n{missing_summary}")
    return cleaned


def feature_engineering(df: pd.DataFrame) -> pd.DataFrame:
    """
    Return a copy of the table with three derived columns appended:

    1. ActiveCases   = Confirmed - Deaths - Recovered
    2. MortalityRate = Deaths / Confirmed, in percent (0 where Confirmed <= 0)
    3. RecoveryRate  = Recovered / Confirmed, in percent (0 where Confirmed <= 0)

    A five-row preview of the key columns is printed.
    """
    confirmed, deaths, recovered = df['Confirmed'], df['Deaths'], df['Recovered']

    # Rows without confirmed cases get a rate of 0 instead of a division result
    has_cases = confirmed > 0

    enriched = df.assign(
        ActiveCases=confirmed - deaths - recovered,
        MortalityRate=np.where(has_cases, (deaths / confirmed) * 100, 0),
        RecoveryRate=np.where(has_cases, (recovered / confirmed) * 100, 0),
    )

    preview_columns = ['Date', 'Country/Region', 'Confirmed', 'ActiveCases', 'MortalityRate', 'RecoveryRate']
    print("Feature engineering completed.")
    print(enriched[preview_columns].head())

    return enriched


In [None]:
# Run ingestion, cleaning and feature engineering in sequence, keeping each
# intermediate DataFrame in a module-level name (df_raw, df_clean, df_final),
# then print the first three transformed rows.
if __name__ == "__main__":
    df_raw = ingest_data()
    df_clean = clean_data(df_raw)
    df_final = feature_engineering(df_clean)

    print("\nTransformed dataset preview:")
    print(df_final.head(3))


In [None]:
import os
import pandas as pd
import numpy as np
import kagglehub
import logging
from datetime import datetime


# -----------------------------------------------------------------------------
# LOGGING SETUP
# -----------------------------------------------------------------------------
# One timestamped log file per execution of this cell, under ./logs
os.makedirs("logs", exist_ok=True)
log_file = f"logs/pipeline_{datetime.now().strftime('%Y%m%d_%H%M%S')}.log"

# force=True matters here: basicConfig() silently does nothing when the root
# logger already has handlers (e.g. installed by the hosting environment or by
# a previous run of this cell), in which case the log file would never be
# created. Forcing removes and closes the existing root handlers first, which
# also prevents the console handler below from piling up on re-runs.
logging.basicConfig(
    filename=log_file,
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
    force=True,
)

# Mirror log records to the console using the same format as the file
console = logging.StreamHandler()
console.setLevel(logging.INFO)
formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s")
console.setFormatter(formatter)
logging.getLogger().addHandler(console)


# -----------------------------------------------------------------------------
# DATA INGESTION
# -----------------------------------------------------------------------------
def ingest_data():
    """Download the Kaggle COVID-19 dataset, keep a raw CSV copy, return it.

    The main CSV is read from the kagglehub download folder and re-written to
    data/raw/covid_raw.csv before the DataFrame is returned.

    Raises:
        FileNotFoundError: If 'covid_19_clean_complete.csv' is not in the
            download folder (an error is logged first).
    """
    logging.info("Starting data ingestion...")

    dataset_dir = kagglehub.dataset_download("imdevskp/corona-virus-report")

    # The raw folder is created before the existence check, as a side effect
    raw_dir = "data/raw"
    os.makedirs(raw_dir, exist_ok=True)

    source_csv = os.path.join(dataset_dir, "covid_19_clean_complete.csv")
    if os.path.exists(source_csv):
        frame = pd.read_csv(source_csv)
        frame.to_csv(os.path.join(raw_dir, "covid_raw.csv"), index=False)
        logging.info(f"Raw data saved to {raw_dir}/covid_raw.csv")
        return frame

    logging.error("File not found in Kaggle dataset folder.")
    raise FileNotFoundError("covid_19_clean_complete.csv missing.")


# -----------------------------------------------------------------------------
# DATA CLEANING
# -----------------------------------------------------------------------------
def clean_data(df: pd.DataFrame) -> pd.DataFrame:
    """Return a de-duplicated copy with dates parsed and gaps filled.

    'Date' is converted to datetime (invalid values become NaT), missing
    region/country names become 'Unknown' and missing case counts become 0.
    Columns that are absent are skipped. The resulting shape and per-column
    null counts are logged.
    """
    logging.info("Cleaning data...")

    result = df.drop_duplicates().copy()

    if 'Date' in result.columns:
        result['Date'] = pd.to_datetime(result['Date'], errors='coerce')

    # Restrict each fill to the columns that actually exist in this frame
    label_columns = [c for c in ('Province/State', 'Country/Region') if c in result.columns]
    count_columns = [c for c in ('Confirmed', 'Deaths', 'Recovered') if c in result.columns]

    for name in label_columns:
        result[name] = result[name].fillna('Unknown')
    for name in count_columns:
        result[name] = result[name].fillna(0)

    logging.info(f"Cleaned data shape: {result.shape}")
    null_counts = result.isnull().sum().to_dict()
    logging.info(f"Missing values summary:\n{null_counts}")

    return result


# -----------------------------------------------------------------------------
# FEATURE ENGINEERING
# -----------------------------------------------------------------------------
def feature_engineering(df: pd.DataFrame) -> pd.DataFrame:
    """Return a copy of *df* with ActiveCases, MortalityRate and RecoveryRate.

    ActiveCases is Confirmed - Deaths - Recovered. The two rates are
    percentages of Confirmed and are 0 for rows where Confirmed is not
    positive.
    """
    logging.info("Performing feature engineering...")

    enriched = df.copy()
    confirmed = enriched['Confirmed']
    has_cases = confirmed > 0

    def percent_of_confirmed(column):
        # Rows without confirmed cases take the constant 0 branch
        return np.where(has_cases, (enriched[column] / confirmed) * 100, 0)

    enriched['ActiveCases'] = confirmed - enriched['Deaths'] - enriched['Recovered']
    enriched['MortalityRate'] = percent_of_confirmed('Deaths')
    enriched['RecoveryRate'] = percent_of_confirmed('Recovered')

    logging.info("Feature engineering complete. Added: ActiveCases, MortalityRate, RecoveryRate.")
    return enriched


# -----------------------------------------------------------------------------
# STORAGE
# -----------------------------------------------------------------------------
def save_data(df: pd.DataFrame, stage: str):
    """Write *df* as both CSV and Parquet under data/<stage>/.

    The files are named covid_<stage>.csv and covid_<stage>.parquet; the
    folder is created when missing. Neither file includes the index.
    """
    target_dir = f"data/{stage}"
    os.makedirs(target_dir, exist_ok=True)

    # Both files share the same stem and differ only in extension
    stem = os.path.join(target_dir, f"covid_{stage}")
    csv_path = f"{stem}.csv"
    parquet_path = f"{stem}.parquet"

    df.to_csv(csv_path, index=False)
    df.to_parquet(parquet_path, index=False)

    logging.info(f"Data saved to {csv_path} and {parquet_path}")


# -----------------------------------------------------------------------------
# MAIN PIPELINE
# -----------------------------------------------------------------------------
def run_pipeline():
    """Run ingest -> clean -> feature engineering, saving after every stage.

    Outputs go to the "raw", "processed" and "output" stage folders via
    save_data. Any exception is logged with its traceback and not re-raised.
    """
    logging.info("Pipeline started.")
    try:
        # Stage 1: ingest
        raw_frame = ingest_data()
        save_data(raw_frame, "raw")

        # Stage 2: clean
        cleaned_frame = clean_data(raw_frame)
        save_data(cleaned_frame, "processed")

        # Stage 3: transform
        final_frame = feature_engineering(cleaned_frame)
        save_data(final_frame, "output")
    except Exception as e:
        # logging.exception logs at ERROR level and attaches the traceback
        logging.exception(f"Pipeline failed: {e}")
    else:
        logging.info("Pipeline completed successfully.")


# Execute the full pipeline only when this code runs as the main module.
if __name__ == "__main__":
    run_pipeline()


In [None]:
pip install prefect


In [None]:
import os
import pandas as pd
import numpy as np
import kagglehub
import logging
from prefect import task, flow
from datetime import datetime


# -----------------------------------------------------------------------------
# LOGGING SETUP
# -----------------------------------------------------------------------------
# One timestamped log file per execution of this cell, under ./logs
os.makedirs("logs", exist_ok=True)
log_file = f"logs/prefect_pipeline_{datetime.now().strftime('%Y%m%d_%H%M%S')}.log"

# force=True matters here: basicConfig() silently does nothing when the root
# logger already has handlers (for example from an earlier logging setup in
# the same session), so without it this log file would never be created.
# Forcing removes and closes the existing root handlers first, which also
# keeps the console handler below from being added twice on re-runs.
logging.basicConfig(
    filename=log_file,
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
    force=True,
)
# Console handler without an explicit formatter: records are echoed as the
# bare message text.
console = logging.StreamHandler()
console.setLevel(logging.INFO)
logging.getLogger().addHandler(console)


# -----------------------------------------------------------------------------
# PREFECT TASKS
# -----------------------------------------------------------------------------
@task(name="Ingest Data")
def ingest_data():
    """Download the Kaggle COVID-19 dataset and load its main CSV.

    Returns:
        pd.DataFrame: Contents of 'covid_19_clean_complete.csv'.

    Raises:
        FileNotFoundError: If the CSV is not present in the download folder.
    """
    logging.info("Starting data ingestion...")
    path = kagglehub.dataset_download("imdevskp/corona-virus-report")
    data_file = os.path.join(path, "covid_19_clean_complete.csv")

    # Fail with an explicit, logged message (as the other ingest_data variants
    # in this file do) instead of relying on read_csv's generic error.
    if not os.path.exists(data_file):
        logging.error("File not found in Kaggle dataset folder.")
        raise FileNotFoundError(f"Expected file 'covid_19_clean_complete.csv' not found in {path}")

    df = pd.read_csv(data_file)
    logging.info(f"Data ingested: {df.shape}")
    return df


@task(name="Clean Data")
def clean_data(df: pd.DataFrame) -> pd.DataFrame:
    """Return a de-duplicated copy with dates parsed and gaps filled.

    Missing region/country names become 'Unknown', missing case counts
    become 0, and 'Date' is parsed to datetime (invalid values become NaT).
    Absent columns are skipped.
    """
    logging.info("Cleaning data...")
    cleaned = df.drop_duplicates().copy()

    if 'Date' in cleaned.columns:
        cleaned['Date'] = pd.to_datetime(cleaned['Date'], errors='coerce')

    # (columns, replacement for missing values) processed in this order
    fill_plan = (
        (('Province/State', 'Country/Region'), 'Unknown'),
        (('Confirmed', 'Deaths', 'Recovered'), 0),
    )
    for columns, replacement in fill_plan:
        for column in columns:
            if column in cleaned.columns:
                cleaned[column] = cleaned[column].fillna(replacement)

    logging.info(f"Cleaned data shape: {cleaned.shape}")
    return cleaned


@task(name="Feature Engineering")
def feature_engineering(df: pd.DataFrame) -> pd.DataFrame:
    """Return a copy of *df* with ActiveCases, MortalityRate and RecoveryRate.

    Both rates are percentages of Confirmed and are 0 where Confirmed is not
    positive.
    """
    logging.info("Performing feature engineering...")
    featured = df.copy()

    confirmed = featured['Confirmed']
    deaths = featured['Deaths']
    recovered = featured['Recovered']
    positive = confirmed > 0

    featured['ActiveCases'] = confirmed - deaths - recovered
    mortality_pct = (deaths / confirmed) * 100
    featured['MortalityRate'] = np.where(positive, mortality_pct, 0)
    recovery_pct = (recovered / confirmed) * 100
    featured['RecoveryRate'] = np.where(positive, recovery_pct, 0)

    logging.info("Feature engineering complete.")
    return featured


@task(name="Save Output")
def save_output(df: pd.DataFrame):
    """Write *df* to a timestamped Parquet file in data/output.

    Returns:
        str: Path of the file that was written.
    """
    output_dir = "data/output"
    os.makedirs(output_dir, exist_ok=True)

    # Second-resolution timestamp keeps successive runs in separate files
    stamp = datetime.now().strftime('%Y%m%d_%H%M%S')
    out_path = f"{output_dir}/covid_output_{stamp}.parquet"

    df.to_parquet(out_path, index=False)
    logging.info(f"Saved processed data to {out_path}")
    return out_path


# -----------------------------------------------------------------------------
# PREFECT FLOW (DAG)
# -----------------------------------------------------------------------------
@flow(name="COVID Data Pipeline")
def covid_pipeline():
    """Run ingestion, cleaning, feature engineering and saving in order."""
    raw_frame = ingest_data()
    cleaned_frame = clean_data(raw_frame)
    featured_frame = feature_engineering(cleaned_frame)
    save_output(featured_frame)
    logging.info("Prefect pipeline run completed successfully.")


# -----------------------------------------------------------------------------
# ENTRY POINT
# -----------------------------------------------------------------------------
# Trigger one run of the Prefect flow when executed as the main module.
if __name__ == "__main__":
    covid_pipeline()


In [None]:
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LinearRegression
from sklearn.metrics import r2_score, mean_absolute_error
import os
import glob

# ---------------------------------------------------------------------
# Load transformed dataset (output from your Prefect pipeline)
# ---------------------------------------------------------------------
# Folder the pipeline cells write their Parquet output to
output_dir = "data/output"
parquet_files = glob.glob(os.path.join(output_dir, "*.parquet"))

if not parquet_files:
    raise FileNotFoundError(f"No parquet files found in {output_dir}")

# Find the most recent file.
# Modification time is used rather than getctime: on Unix getctime reports the
# last metadata change (chmod, rename, ...), not when the data was written.
latest_file = max(parquet_files, key=os.path.getmtime)

print(f"Loading data from: {latest_file}")
df = pd.read_parquet(latest_file)

# Select numeric features
# NOTE(review): the pipeline defines ActiveCases = Confirmed - Deaths -
# Recovered, so the target is an exact linear combination of these three
# features (target leakage). The near-perfect score below demonstrates the
# plumbing only, not predictive power.
features = ['ActiveCases', 'Deaths', 'Recovered']
target = 'Confirmed'

X = df[features]
y = df[target]

# ---------------------------------------------------------------------
# Split data into training/testing
# ---------------------------------------------------------------------
# Fixed random_state keeps the 80/20 split reproducible between runs
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# ---------------------------------------------------------------------
# Train a simple linear regression model
# ---------------------------------------------------------------------
model = LinearRegression()
model.fit(X_train, y_train)

# ---------------------------------------------------------------------
# Evaluate performance
# ---------------------------------------------------------------------
y_pred = model.predict(X_test)

print("Model trained successfully!")
print(f"R² Score: {r2_score(y_test, y_pred):.4f}")
print(f"MAE: {mean_absolute_error(y_test, y_pred):.2f}")

# Example prediction
# Slice (not scalar index) so the sample stays a one-row DataFrame
sample = X_test.iloc[0:1]
predicted_confirmed = model.predict(sample)
print(f"\nPredicted Confirmed Cases for sample:\n{sample}\n→ {predicted_confirmed[0]:.0f}")

In [None]:
# YAML text for the pipeline settings; written verbatim to config.yaml below.
yaml_config = """
# Configuration for COVID-19 Data Pipeline

dataset:
  kaggle_id: "imdevskp/corona-virus-report"
  main_file: "covid_19_clean_complete.csv"

paths:
  raw_dir: "data/raw"
  processed_dir: "data/processed"
  output_dir: "data/output"
  logs_dir: "logs"

storage:
  output_format: "parquet"

pipeline:
  test_size: 0.2
  random_state: 42
"""

# Create (or overwrite) config.yaml in the current working directory.
with open("config.yaml", "w") as f:
    f.write(yaml_config)

print("config.yaml created successfully.")

In [None]:
"""
COVID-19 Data Pipeline
----------------------

A modular data pipeline that ingests, cleans, transforms, and saves COVID-19 data
from Kaggle using configurable parameters defined in `config.yaml`.

This pipeline follows enterprise-grade best practices:
- Configuration management (YAML)
- Logging
- Reproducible directory structure
- PEP8-compliant, modular code
"""

import os
import yaml
import logging
import pandas as pd
import numpy as np
import kagglehub
from datetime import datetime


# -----------------------------------------------------------------------------
# Load Configuration
# -----------------------------------------------------------------------------
def load_config(path="config.yaml"):
    """Parse the YAML file at *path* and return the resulting object."""
    with open(path, "r") as stream:
        # safe_load builds plain Python objects only
        return yaml.safe_load(stream)


# -----------------------------------------------------------------------------
# Logging Setup
# -----------------------------------------------------------------------------
def setup_logging(log_dir: str):
    """Configure root logging to a timestamped file plus the console.

    Args:
        log_dir: Folder for the log file; created when missing.

    Returns:
        str: Path of the log file used for this configuration.

    Any handlers already attached to the root logger are removed, so calling
    this repeatedly (e.g. re-running a notebook cell) neither leaves the file
    handler uninstalled nor duplicates console output.
    """
    os.makedirs(log_dir, exist_ok=True)
    log_file = os.path.join(log_dir, f"pipeline_{datetime.now().strftime('%Y%m%d_%H%M%S')}.log")

    # Without force=True, basicConfig() is a silent no-op whenever the root
    # logger already has handlers, and the log file would never be created.
    logging.basicConfig(
        filename=log_file,
        level=logging.INFO,
        format="%(asctime)s - %(levelname)s - %(message)s",
        force=True,
    )
    # Mirror records to the console in the same format as the file
    console = logging.StreamHandler()
    console.setLevel(logging.INFO)
    formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s")
    console.setFormatter(formatter)
    logging.getLogger().addHandler(console)
    return log_file


# -----------------------------------------------------------------------------
# Data Ingestion
# -----------------------------------------------------------------------------
def ingest_data(dataset_id: str, filename: str, raw_dir: str) -> pd.DataFrame:
    """Download a Kaggle dataset, load one CSV from it and keep a raw copy.

    Args:
        dataset_id: Kaggle dataset identifier passed to kagglehub.
        filename: Name of the CSV file inside the downloaded dataset folder.
        raw_dir: Folder that receives the copy, saved as 'covid_raw.csv'.

    Returns:
        pd.DataFrame: Contents of the CSV file.

    Raises:
        FileNotFoundError: If *filename* is not in the download folder.
    """
    logging.info("Starting data ingestion...")
    path = kagglehub.dataset_download(dataset_id)
    file_path = os.path.join(path, filename)

    # Fail with an explicit, logged message when the configured file name does
    # not match the dataset contents, rather than read_csv's generic error.
    if not os.path.exists(file_path):
        logging.error(f"File '{filename}' not found in Kaggle dataset folder {path}.")
        raise FileNotFoundError(f"Expected file '{filename}' not found in {path}")

    df = pd.read_csv(file_path)
    os.makedirs(raw_dir, exist_ok=True)
    raw_path = os.path.join(raw_dir, "covid_raw.csv")
    df.to_csv(raw_path, index=False)

    logging.info(f"Raw data saved to {raw_path}")
    return df


# -----------------------------------------------------------------------------
# Data Cleaning
# -----------------------------------------------------------------------------
def clean_data(df: pd.DataFrame) -> pd.DataFrame:
    """Return a de-duplicated copy with dates parsed and gaps filled.

    Missing region/country names become "Unknown", missing case counts
    become 0, and "Date" is parsed to datetime (invalid values become NaT).
    Columns that are absent are left out of each step.
    """
    logging.info("Cleaning data...")

    frame = df.drop_duplicates().copy()

    def fill_missing(columns, replacement):
        # Only touch the columns that exist in this frame
        for column in columns:
            if column in frame.columns:
                frame[column] = frame[column].fillna(replacement)

    if "Date" in frame.columns:
        frame["Date"] = pd.to_datetime(frame["Date"], errors="coerce")

    fill_missing(("Province/State", "Country/Region"), "Unknown")
    fill_missing(("Confirmed", "Deaths", "Recovered"), 0)

    logging.info(f"Cleaned data shape: {frame.shape}")
    return frame


# -----------------------------------------------------------------------------
# Feature Engineering
# -----------------------------------------------------------------------------
def feature_engineering(df: pd.DataFrame) -> pd.DataFrame:
    """Return a copy of *df* with ActiveCases, MortalityRate and RecoveryRate.

    ActiveCases is Confirmed - Deaths - Recovered; both rates are percentages
    of Confirmed and are 0 for rows where Confirmed is not positive.
    """
    logging.info("Performing feature engineering...")
    result = df.copy()

    confirmed = result["Confirmed"]
    positive = confirmed > 0

    result["ActiveCases"] = confirmed - result["Deaths"] - result["Recovered"]

    # (new rate column, numerator column), added in this order
    rate_sources = (("MortalityRate", "Deaths"), ("RecoveryRate", "Recovered"))
    for rate_column, source_column in rate_sources:
        share = (result[source_column] / confirmed) * 100
        result[rate_column] = np.where(positive, share, 0)

    logging.info("Feature engineering complete.")
    return result


# -----------------------------------------------------------------------------
# Save Data
# -----------------------------------------------------------------------------
def save_data(df: pd.DataFrame, output_dir: str, output_format: str = "parquet"):
    """Save *df* to a timestamped file in *output_dir*.

    Args:
        df: Data to write (the index is not written).
        output_dir: Destination folder; created when missing.
        output_format: "parquet" or "csv". Matching ignores case, surrounding
            whitespace and a leading dot, so "Parquet" and ".csv" work. Any
            other value keeps the historical behaviour (CSV content written
            under the requested extension) but now logs a warning.

    Returns:
        str: Path of the file that was written.
    """
    # Normalise first: an exact == "parquet" test would silently write CSV
    # content into e.g. a ".PARQUET" file.
    normalized = str(output_format).strip().lower().lstrip(".")
    if normalized in ("parquet", "csv"):
        extension = normalized
    else:
        extension = output_format
        logging.warning(
            f"Unsupported output_format '{output_format}'; writing CSV content instead."
        )

    os.makedirs(output_dir, exist_ok=True)
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    output_path = os.path.join(output_dir, f"covid_output_{timestamp}.{extension}")

    if normalized == "parquet":
        df.to_parquet(output_path, index=False)
    else:
        df.to_csv(output_path, index=False)

    logging.info(f"Data saved to {output_path}")
    return output_path


# -----------------------------------------------------------------------------
# Main Pipeline
# -----------------------------------------------------------------------------
def run_pipeline(config_path="config.yaml"):
    """Run the config-driven ETL: ingest, clean, engineer features, save.

    Settings are read from the YAML file at *config_path*; logging is set up
    in the configured logs folder before any stage runs.
    """
    settings = load_config(config_path)
    setup_logging(settings["paths"]["logs_dir"])

    logging.info("Starting COVID-19 data pipeline.")

    # Extract
    dataset_cfg = settings["dataset"]
    raw_frame = ingest_data(
        dataset_cfg["kaggle_id"],
        dataset_cfg["main_file"],
        settings["paths"]["raw_dir"],
    )

    # Transform
    final_frame = feature_engineering(clean_data(raw_frame))

    # Load
    save_data(
        final_frame,
        settings["paths"]["output_dir"],
        settings["storage"]["output_format"],
    )

    logging.info("Pipeline completed successfully.")


# Run the config-driven pipeline with the default config.yaml when executed
# as the main module.
if __name__ == "__main__":
    run_pipeline()


In [None]:
!pip install prefect --quiet
!pip install prefect graphviz --quiet

In [None]:
from prefect import flow, task

@task
def ingest_data():
    """Toy ingestion step: return a fixed payload of the integers 1 to 5."""
    print("Ingesting data...")
    payload = {"raw_data": [1, 2, 3, 4, 5]}
    return payload

@task
def clean_data(data):
    """Toy cleaning step: keep only the raw values greater than 2."""
    print("Cleaning data...")
    kept = []
    for value in data["raw_data"]:
        if value > 2:
            kept.append(value)
    return {"cleaned_data": kept}

@task
def feature_engineering(data):
    """Toy feature step: square every cleaned value."""
    print("Feature engineering...")
    squared = [pow(value, 2) for value in data["cleaned_data"]]
    return {"features": squared}

@task
def save_output(data):
    """Toy save step: print the final features and report success."""
    print("Saving output...")
    final_features = data["features"]
    print("Final features:", final_features)
    return True

@flow
def etl_flow():
    """Chain the four toy tasks: ingest -> clean -> features -> save."""
    raw_payload = ingest_data()
    cleaned_payload = clean_data(raw_payload)
    feature_payload = feature_engineering(cleaned_payload)
    save_output(feature_payload)


In [None]:
from graphviz import Digraph
from IPython.display import Image, display

def visualize_etl_dag():
    """Draw the four-step ETL chain with Graphviz and show it inline.

    The graph is rendered to etl_dag.png, which is then displayed.
    """
    dot = Digraph(comment="ETL Workflow (Prefect)", format='png')
    dot.attr(rankdir='LR', size='6,3')

    # Node id -> label, in pipeline order
    steps = (
        ("A", "ingest_data"),
        ("B", "clean_data"),
        ("C", "feature_engineering"),
        ("D", "save_output"),
    )
    for node_id, label in steps:
        dot.node(node_id, label)

    # Connect each step to the one that follows it
    for (tail, _), (head, _) in zip(steps, steps[1:]):
        dot.edge(tail, head)

    # cleanup=True removes the intermediate DOT source after rendering
    dot.render("etl_dag", cleanup=True)
    display(Image("etl_dag.png"))

visualize_etl_dag()


## Enterprise-Grade COVID-19 Data Pipeline Summary

This data pipeline demonstrates several key characteristics of an enterprise-grade solution, focusing on robustness, maintainability, and scalability.

**What makes this pipeline "enterprise-grade":**

*   **Modularity:** The pipeline is broken down into distinct functions for ingestion, cleaning, feature engineering, and saving data. This modularity makes the code easier to understand, test, and maintain.
*   **Configuration Management:** Using a `config.yaml` file externalizes parameters like dataset IDs, file paths, and processing options. This allows for easy modification of pipeline behavior without changing the code, promoting flexibility and reusability.
*   **Logging:** The pipeline incorporates comprehensive logging at various stages. This provides visibility into the pipeline's execution, making it easier to debug issues, monitor performance, and track data flow.
*   **Reproducible Directory Structure:** The pipeline saves data to organized directories (`data/raw`, `data/processed`, `data/output`) based on the processing stage. This standardized structure improves data governance and makes it clear where to find data at different points in the pipeline.
*   **Error Handling:** While not explicitly shown in the Prefect flow definition, the underlying functions include basic error handling (e.g., `FileNotFoundError` in `ingest_data`). In a full enterprise system, more comprehensive error handling and alerting would be implemented.
*   **Use of Libraries:** The pipeline leverages established libraries like pandas, numpy, kagglehub, and Prefect, which are industry standards for data manipulation, access, and workflow orchestration.

**Which best practices were applied:**

*   **Separation of Concerns:** Each function focuses on a single task (ingest, clean, transform, save), adhering to the principle of separation of concerns.
*   **Parameterization:** Using the `config.yaml` file allows for parameterization of the pipeline, making it adaptable to different datasets or configurations without code changes.
*   **Code Readability and Documentation:** The code includes docstrings explaining the purpose of each function, improving readability and maintainability.
*   **Use of Standard File Formats:** Saving data in Parquet format (a columnar storage format) is a best practice for large datasets, offering improved performance for analytical queries.

**How orchestration (using Prefect) improves maintainability:**

Orchestration tools like Prefect provide a framework for defining, scheduling, and monitoring data pipelines as directed acyclic graphs (DAGs). This offers several benefits for maintainability:

*   **Visibility and Monitoring:** Prefect provides a UI that visualizes the pipeline's execution, allowing developers to easily see the status of each task, identify bottlenecks, and diagnose failures.
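*   **Error Handling and Retries:** Prefect can automatically retry failed tasks when they are configured with a retry policy (for example `@task(retries=3, retry_delay_seconds=10)`), improving the pipeline's resilience. The tasks in this notebook do not set `retries`, so a failed task is not retried here.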
*   **Scheduling:** Flows can be scheduled to run automatically at predefined intervals, eliminating the need for manual execution and ensuring data is processed regularly.
*   **Dependency Management:** Prefect explicitly defines the dependencies between tasks, ensuring that tasks are executed in the correct order. This makes the pipeline's logic clear and prevents issues caused by incorrect task sequencing.
*   **Modularity and Reusability:** Tasks defined in Prefect can be reused across different flows, reducing code duplication and improving maintainability.

By applying these practices and leveraging orchestration, the pipeline becomes more robust, easier to manage, and scalable for handling larger datasets and more complex workflows in an enterprise environment.

In [None]:
# Execute the toy Prefect flow `etl_flow` defined in an earlier cell.
etl_flow()