# Refactoring Example - Step 3: Adding behaviour constraints / basic error handling

```mermaid
flowchart TD
    A[Start Pipeline] --> B[Load Data]
    B -->|Success| C[Validate Schema]
    B -->|File Not Found / Corrupted| E1[DataLoadError Logged & Continue]

    C -->|Success| D[Clean Data]
    C -->|Missing Columns| E2[SchemaValidationError Logged & Skip File]

    D -->|Success| F[Transform Data]
    D -->|Invalid Data| E3[DataCleaningError Logged & Drop Invalid Rows]

    F -->|Success| G[Write to Database]
    F -->|Transformation Error| E4[TransformationError Logged & Retry Step]

    G -->|Success| H[End Pipeline]
    G -->|Connection Error| E5[DatabaseWriteError Logged & Retry/Abort]

    E1 --> B
    E2 --> B
    E3 --> D
    E4 --> F
    E5 --> G
```

In [None]:
import sqlite3
from contextlib import closing

import pandas as pd

class DataCleaningPipeline:
    """Load a customer CSV, clean and enrich it, and write it to SQLite.

    Each step catches its own errors and prints a message instead of
    raising, so one failing step does not crash the caller. ``run`` stops
    right after schema validation when required columns are missing (which
    includes the case where loading failed), so bad input never replaces
    the existing database table.

    Args:
        input_path: Path of the CSV file to read.
        db_path: Path of the SQLite database file to write.
    """

    # Columns the later steps rely on; checked by validate_columns().
    REQUIRED_COLUMNS = ("id", "name", "email", "signup_date")

    def __init__(self, input_path="data/customers.csv", db_path="data/cleaned_customers.db"):
        self.input_path = input_path
        self.db_path = db_path
        # Starts empty; populated by load_data().
        self.df = pd.DataFrame()

    def load_data(self):
        """Read ``input_path`` into ``self.df``.

        On any failure (missing file, parse error, ...) the error is printed
        and ``self.df`` is reset to an empty DataFrame.
        """
        try:
            self.df = pd.read_csv(self.input_path)
        except Exception as e:
            print(f"Something went wrong while loading the CSV: {e}")
            self.df = pd.DataFrame()
        else:
            print("Data loaded successfully.")

    def validate_columns(self):
        """Check that ``self.df`` has every column in ``REQUIRED_COLUMNS``.

        Returns:
            True when all required columns are present; otherwise prints the
            missing column names and returns False.
        """
        missing = [col for col in self.REQUIRED_COLUMNS if col not in self.df.columns]
        if missing:
            print(f"Missing some columns: {missing}")
            return False
        return True

    def clean_data(self):
        """Lowercase emails, parse signup dates, drop rows lacking id/email.

        A non-string ``email`` column is treated as an error. Errors are
        printed rather than raised; changes made before the failing
        statement are kept in ``self.df``.
        """
        try:
            # skipna=True ignores missing values, so a column of strings with
            # some NaN/None entries is still reported as "string".
            email_kind = pd.api.types.infer_dtype(self.df["email"], skipna=True)
            if email_kind not in ("string", "empty"):
                raise TypeError(f"column 'email' must hold strings, got {email_kind!r}")
            self.df["email"] = self.df["email"].str.lower()
            self.df["signup_date"] = pd.to_datetime(self.df["signup_date"])
            self.df = self.df.dropna(subset=["id", "email"])
        except Exception as e:
            print(f"Something went wrong during cleaning: {e}")
        else:
            print("Cleaning done.")

    def transform_data(self):
        """Add ``days_since_signup``: the days component of (now - signup_date).

        Needs ``signup_date`` to be datetime-typed (done by clean_data). On
        failure the error is printed and the column is not added.
        """
        try:
            self.df["days_since_signup"] = (pd.Timestamp.now() - self.df["signup_date"]).dt.days
        except Exception as e:
            print(f"Failed to calculate days since signup: {e}")

    def save_to_database(self):
        """Write ``self.df`` to the ``customers`` table, replacing it.

        An empty DataFrame is not written, because replacing the table with
        it would discard previously saved data. Errors are printed, not
        raised.
        """
        if self.df.empty:
            print("No data to write; leaving the database untouched.")
            return
        try:
            # sqlite3's connection context manager only commits/rolls back;
            # closing() guarantees the connection is actually closed.
            with closing(sqlite3.connect(self.db_path)) as conn, conn:
                self.df.to_sql("customers", conn, if_exists="replace", index=False)
        except Exception as e:
            print(f"Something went wrong while writing to the database: {e}")
        else:
            print("Data written to database.")

    def run(self):
        """Execute load -> validate -> clean -> transform -> save.

        Returns early (without touching the database) when the loaded data
        is missing required columns.
        """
        print("Starting data cleaning pipeline...")
        self.load_data()
        if not self.validate_columns():
            print("Pipeline aborted: input data failed schema validation.")
            return
        self.clean_data()
        self.transform_data()
        self.save_to_database()
        print("Pipeline finished.")