<a href="https://colab.research.google.com/github/sanjeevmanvithvellala/DATA-PIPELINE-DEVELOPMENT/blob/main/etl_pipeline.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [8]:
import pandas as pd
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.impute import SimpleImputer
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.model_selection import train_test_split
import os

In [9]:
# ========== 1. Extract ==========
def extract_data(file_path):
    """Read a CSV file into a DataFrame.

    Parameters
    ----------
    file_path : str or path-like
        Location of the CSV file; forwarded unchanged to ``pd.read_csv``.

    Returns
    -------
    pandas.DataFrame
        The parsed contents of the file.
    """
    # Announce the source first so the log shows what was attempted even if
    # the read itself raises.
    print("[Extract] Loading data from:", file_path)
    raw_frame = pd.read_csv(file_path)
    return raw_frame

In [10]:
# ========== 2. Transform ==========
def transform_data(df):
    """Split ``df`` into features and target, then preprocess the features.

    The last column of ``df`` is taken as the target; every other column is a
    feature. Numeric features are mean-imputed and standardised; object and
    categorical features are imputed with their most frequent value and
    one-hot encoded.

    Parameters
    ----------
    df : pandas.DataFrame
        Input table whose last column is the target.

    Returns
    -------
    tuple
        ``(X_processed, y)`` where ``X_processed`` is the output of
        ``ColumnTransformer.fit_transform`` and ``y`` is the untouched last
        column of ``df``.
    """
    print("[Transform] Starting data transformation...")

    # Separate features and target (assuming last column is the target)
    X = df.iloc[:, :-1]
    y = df.iloc[:, -1]

    # Identify column types.
    # Columns not listed in any transformer are dropped by ColumnTransformer
    # (its default remainder), so the selection must cover every numeric
    # width, not only int64/float64. "number" also matches timedelta columns,
    # which cannot be mean-imputed, hence the explicit exclusion.
    numerical_cols = X.select_dtypes(
        include=["number"], exclude=["timedelta"]
    ).columns
    # pandas "category" columns are categorical features as well.
    categorical_cols = X.select_dtypes(include=["object", "category"]).columns

    # Pipelines for transformation
    numeric_pipeline = Pipeline(steps=[
        ('imputer', SimpleImputer(strategy='mean')),
        ('scaler', StandardScaler())
    ])

    categorical_pipeline = Pipeline(steps=[
        ('imputer', SimpleImputer(strategy='most_frequent')),
        # handle_unknown='ignore' keeps transform() from failing on categories
        # that were not present when the encoder was fitted.
        ('encoder', OneHotEncoder(handle_unknown='ignore'))
    ])

    # Combine both pipelines
    preprocessor = ColumnTransformer(
        transformers=[
            ('num', numeric_pipeline, numerical_cols),
            ('cat', categorical_pipeline, categorical_cols)
        ])

    X_processed = preprocessor.fit_transform(X)
    print("[Transform] Transformation complete.")

    return X_processed, y

In [11]:
# ========== 3. Load ==========
def load_data(X, y, output_path='processed_data.csv'):
    """Write the features and the target side by side to a CSV file.

    Parameters
    ----------
    X : pandas.DataFrame, array-like, or object exposing ``.toarray()``
        Feature matrix. Anything that is not already a DataFrame is wrapped in
        one (after ``.toarray()`` when that method exists).
    y : pandas.Series, pandas.DataFrame, or 1-D array-like
        Target values, one per row of ``X``. Non-pandas input is wrapped in a
        Series named ``"target"``.
    output_path : str or path-like, default 'processed_data.csv'
        Destination CSV file.

    Raises
    ------
    ValueError
        If ``X`` and ``y`` do not have the same number of rows.
    """
    print("[Load] Saving transformed data...")

    # If X is a NumPy array (or exposes .toarray()), convert to DataFrame
    if not isinstance(X, pd.DataFrame):
        X = pd.DataFrame(X.toarray() if hasattr(X, "toarray") else X)

    # Plain arrays and lists have no reset_index(); wrap them so the
    # positional alignment below works for them too.
    if not isinstance(y, (pd.Series, pd.DataFrame)):
        y = pd.Series(y, name="target")

    if len(X) != len(y):
        raise ValueError(
            f"X has {len(X)} rows but y has {len(y)}; cannot combine them."
        )

    # concat(axis=1) aligns on index labels, so both sides are renumbered
    # 0..n-1; otherwise a DataFrame X with a non-default index would be
    # padded with NaN rows instead of being joined row by row.
    df_out = pd.concat(
        [X.reset_index(drop=True), y.reset_index(drop=True)], axis=1
    )
    df_out.to_csv(output_path, index=False)
    print("[Load] Data saved to:", output_path)

In [12]:
# ========== Main Execution ==========
def run_etl_pipeline(input_file):
    """Run the extract, transform and load stages in order for one CSV file.

    Parameters
    ----------
    input_file : str or path-like
        CSV file handed to ``extract_data``.

    The processed result is written by ``load_data`` to its default output
    path; nothing is returned.
    """
    raw_frame = extract_data(input_file)
    features, target = transform_data(raw_frame)
    load_data(features, target)

In [19]:
# ========== Example Usage ==========
if __name__ == "__main__":
    # Point this at the CSV file you want to process.
    input_csv = "sample_data.csv"

    # Report a missing file up front instead of letting the read raise.
    if not os.path.exists(input_csv):
        print("❌ File not found. Please check the path.")
    else:
        run_etl_pipeline(input_csv)

❌ File not found. Please check the path.


# **Testing with the provided `sample_data.csv`**

In [14]:
# ========== 1. Extract ==========
def extract_data(file_path):
    """Load the CSV at ``file_path`` and return it as a DataFrame.

    Parameters
    ----------
    file_path : str or path-like
        Passed straight through to ``pd.read_csv``.

    Returns
    -------
    pandas.DataFrame
        The table read from the file.
    """
    # Log before reading so a failed read still shows which path was tried.
    print("[Extract] Loading data from:", file_path)
    loaded = pd.read_csv(file_path)
    return loaded


In [15]:
# ========== 2. Transform ==========
def transform_data(df, target_column=None):
    """Split ``df`` into features and target, then preprocess the features.

    Numeric features are mean-imputed and standardised; object and
    categorical features are imputed with their most frequent value and
    one-hot encoded. The generated feature names are printed for inspection.

    Parameters
    ----------
    df : pandas.DataFrame
        Input table containing the features and the target.
    target_column : hashable, optional
        Label of the target column. When omitted (``None`` or an empty
        string) the last column of ``df`` is used as the target.

    Returns
    -------
    tuple
        ``(X_processed, y)`` where ``X_processed`` is the output of
        ``ColumnTransformer.fit_transform`` and ``y`` is the untouched target
        column.

    Raises
    ------
    KeyError
        If ``target_column`` is given but is not a column of ``df``.
    """
    print("[Transform] Starting data transformation...")

    # Compare against None rather than relying on truthiness: a valid label
    # such as the integer column 0 is falsy and would otherwise be ignored.
    # The empty string is still treated as "not given", as before.
    if target_column is not None and target_column != "":
        y = df[target_column]
        X = df.drop(columns=[target_column])
    else:
        X = df.iloc[:, :-1]
        y = df.iloc[:, -1]

    # Identify column types.
    # Columns not listed in any transformer are dropped by ColumnTransformer
    # (its default remainder), so the selection must cover every numeric
    # width, not only int64/float64. "number" also matches timedelta columns,
    # which cannot be mean-imputed, hence the explicit exclusion.
    numerical_cols = X.select_dtypes(
        include=["number"], exclude=["timedelta"]
    ).columns
    # pandas "category" columns are categorical features as well.
    categorical_cols = X.select_dtypes(include=["object", "category"]).columns

    # Pipelines for transformation
    numeric_pipeline = Pipeline(steps=[
        ('imputer', SimpleImputer(strategy='mean')),
        ('scaler', StandardScaler())
    ])

    categorical_pipeline = Pipeline(steps=[
        ('imputer', SimpleImputer(strategy='most_frequent')),
        # handle_unknown='ignore' keeps transform() from failing on categories
        # that were not present when the encoder was fitted.
        ('encoder', OneHotEncoder(handle_unknown='ignore'))
    ])

    # Combine both pipelines
    preprocessor = ColumnTransformer(
        transformers=[
            ('num', numeric_pipeline, numerical_cols),
            ('cat', categorical_pipeline, categorical_cols)
        ])

    X_processed = preprocessor.fit_transform(X)
    print("[Transform] Transformation complete.")

    # Output feature names (optional debug)
    feature_names = preprocessor.get_feature_names_out()
    print("[Transform] Transformed feature names:")
    print(feature_names)

    return X_processed, y

In [16]:
# ========== 3. Load ==========
def load_data(X, y, output_path='processed_data.csv'):
    """Save the feature matrix and the target together as one CSV file.

    Parameters
    ----------
    X : pandas.DataFrame, array-like, or object exposing ``.toarray()``
        Feature matrix. Input that is not already a DataFrame is converted to
        one (via ``.toarray()`` first when that method exists).
    y : pandas.Series, pandas.DataFrame, or 1-D array-like
        Target values, one per row of ``X``. Non-pandas input is wrapped in a
        Series named ``"target"``.
    output_path : str or path-like, default 'processed_data.csv'
        Destination CSV file.

    Raises
    ------
    ValueError
        If ``X`` and ``y`` do not have the same number of rows.
    """
    print("[Load] Saving transformed data...")

    if not isinstance(X, pd.DataFrame):
        X = pd.DataFrame(X.toarray() if hasattr(X, "toarray") else X)

    # Plain arrays and lists have no reset_index(); wrap them so the
    # positional alignment below works for them too.
    if not isinstance(y, (pd.Series, pd.DataFrame)):
        y = pd.Series(y, name="target")

    if len(X) != len(y):
        raise ValueError(
            f"X has {len(X)} rows but y has {len(y)}; cannot combine them."
        )

    # concat(axis=1) aligns on index labels. Renumber both sides 0..n-1 so
    # rows are paired by position; without this a DataFrame X carrying a
    # non-default index is padded with NaN rows instead of being joined.
    df_out = pd.concat(
        [X.reset_index(drop=True), y.reset_index(drop=True)], axis=1
    )
    df_out.to_csv(output_path, index=False)
    print("[Load] Data saved to:", output_path)

In [17]:
# ========== Main Execution ==========
def run_etl_pipeline(input_file, target_column="Churn",
                     output_path="processed_data.csv"):
    """Run the extract, transform and load stages for one CSV file.

    Parameters
    ----------
    input_file : str or path-like
        CSV file handed to ``extract_data``.
    target_column : hashable, default "Churn"
        Label of the target column, forwarded to ``transform_data``.
    output_path : str or path-like, default "processed_data.csv"
        Where ``load_data`` writes the processed table. The default matches
        ``load_data``'s own default, so existing calls behave as before.
    """
    df = extract_data(input_file)
    X_processed, y = transform_data(df, target_column)
    load_data(X_processed, y, output_path)

In [18]:
# ========== Example Usage ==========
if __name__ == "__main__":
    # CSV file to process, resolved against the current working directory.
    input_csv = "sample_data.csv"

    # Report a missing file up front instead of letting the read raise.
    if not os.path.exists(input_csv):
        print("❌ File not found. Please check the path.")
    else:
        run_etl_pipeline(input_csv)

[Extract] Loading data from: sample_data.csv
[Transform] Starting data transformation...
[Transform] Transformation complete.
[Transform] Transformed feature names:
['num__Age' 'num__Income' 'cat__Name_Alice' 'cat__Name_Bob'
 'cat__Name_Charlie' 'cat__Name_Diana' 'cat__Name_Eva' 'cat__Name_Frank'
 'cat__Name_Grace' 'cat__Name_Henry' 'cat__Name_Isla' 'cat__Name_Jake'
 'cat__Gender_Female' 'cat__Gender_Male' 'cat__City_Chicago'
 'cat__City_Los Angeles' 'cat__City_New York' 'cat__City_San Francisco']
[Load] Saving transformed data...
[Load] Data saved to: processed_data.csv
