# Developing Robust ETL Pipelines for Data Science Projects

## Data Ingestion
Extract data from a CSV file

In [None]:
import pandas as pd

# Function to extract data from a CSV file
def extract_data(file_path):
    """Read the CSV at ``file_path`` into a pandas DataFrame.

    A confirmation line is printed on success. Any exception raised along
    the way is caught and printed, and ``None`` is returned so callers can
    check for failure without a ``try`` block of their own.
    """
    try:
        frame = pd.read_csv(file_path)
        print(f"Data extracted from {file_path}")
    except Exception as err:
        print(f"Error in extraction: {err}")
        frame = None
    return frame

# Read the employee CSV; extract_data returns None when the read fails
employee_data = extract_data('/content/employees_data.csv')

# Preview the first five rows, but only when extraction succeeded
if employee_data is not None:
    print(employee_data.head(5))

## Data Transformation
- Handling Missing Data: Remove or fill in missing values.
- Creating Derived Features: Make new columns, like salary bands or age groups.
- Encoding Categories: Change data like department names into a format computers can use.

In [None]:
# Function to transform employee data 
def transform_data(data):
    """Clean the employee data and add derived columns.

    The work is done on a copy, so the DataFrame passed in is left untouched.

    Steps:
        1. Coerce ``Salary`` and ``Age`` to numeric; values that cannot be
           parsed become NaN.
        2. Drop rows missing ``Salary``, ``Age`` or ``Department``.
        3. Add ``Salary_band`` and ``Age_group`` by binning with ``pd.cut``.
        4. Convert ``Department`` to the pandas ``category`` dtype.

    Args:
        data: DataFrame containing ``Salary``, ``Age`` and ``Department``
            columns.

    Returns:
        The transformed DataFrame, or ``None`` if any step raised (the error
        is printed rather than propagated).
    """
    try:
        # Copy first: the column assignments below would otherwise overwrite
        # the caller's DataFrame in place (and, on older pandas, trigger
        # SettingWithCopyWarning after the dropna step).
        data = data.copy()

        # Ensure salary and age are numeric and handle any errors
        data['Salary'] = pd.to_numeric(data['Salary'], errors='coerce')
        data['Age'] = pd.to_numeric(data['Age'], errors='coerce')

        # Remove rows with missing values (including values coerced to NaN)
        data = data.dropna(subset=['Salary', 'Age', 'Department'])

        # Create salary bands.
        # NOTE: pd.cut uses right-closed intervals by default, so values
        # outside (0, 1500000] get a NaN band.
        data['Salary_band'] = pd.cut(
            data['Salary'],
            bins=[0, 60000, 90000, 120000, 1500000],
            labels=['Low', 'Medium', 'High', 'Very High'],
        )

        # Create age groups.
        # NOTE: ages outside (0, 60] get a NaN group.
        data['Age_group'] = pd.cut(
            data['Age'],
            bins=[0, 30, 40, 50, 60],
            labels=['Young', 'Middle-aged', 'Senior', 'Older'],
        )

        # Convert department to categorical
        data['Department'] = data['Department'].astype('category')

        print("Data transformation complete")
        return data
    except Exception as e:
        print(f"Error in transformation: {e}")
        return None

# Re-extract the employee data so this cell can run on its own.
# The extraction function defined in this notebook is extract_data.
employee_data = extract_data('/content/employees_data.csv')

# Transform the employee data; stays None when extraction or transformation fails
transformed_employee_data = None
if employee_data is not None:
    transformed_employee_data = transform_data(employee_data)

# Print the first few rows of the transformed data
if transformed_employee_data is not None:
    print(transformed_employee_data.head())

## Data Storage
Load data into a database. This makes it easy to search and analyze.
SQLite is used here: a lightweight database that stores its data in a single file. Create a table called employees in the SQLite database, then insert the transformed data into this table.

In [None]:
import sqlite3

# Function to load transformed data into SQLite database
def load_data_to_db(data, db_name='employee_data.db'):
    """Write ``data`` to the ``employees`` table of a SQLite database.

    After writing, the table is read back and its first rows are printed as
    a quick verification. Errors are printed rather than raised, and the
    connection is closed whether or not the load succeeded.

    Args:
        data: DataFrame to store.
        db_name: Path of the SQLite database file; created if it does not
            exist.

    Returns:
        None.
    """
    conn = None
    try:
        # Connect to SQLite database (or create it if it doesn't exist)
        conn = sqlite3.connect(db_name)
        cursor = conn.cursor()

        # Create table if it doesn't exist
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS employees (
                employee_id INTEGER PRIMARY KEY,
                first_name TEXT,
                last_name TEXT,
                salary REAL,
                age INTEGER,
                department TEXT,
                salary_band TEXT,
                age_group TEXT
            )
        ''')

        # Insert data into the employees table.
        # NOTE: if_exists='replace' drops the existing table and recreates it
        # from the DataFrame's columns, so the schema declared above does not
        # survive this call.
        data.to_sql('employees', conn, if_exists='replace', index=False)

        conn.commit()
        print(f"Data loaded into {db_name} successfully")

        # Query the data to verify it was loaded
        query = "SELECT * FROM employees"
        result = pd.read_sql(query, conn)
        print("\nData loaded into the database:")
        print(result.head())  # Print the first few rows of the data from the database
    except Exception as e:
        print(f"Error in loading data: {e}")
    finally:
        # Close in finally so a failure in any step above does not leak the
        # connection.
        if conn is not None:
            conn.close()

# transform_data returns None on failure, so only load when there is a
# DataFrame to store (same guard the other cells use).
if transformed_employee_data is not None:
    load_data_to_db(transformed_employee_data)

## Running the Complete ETL Pipeline
The pipeline extracts the employee data, cleans and transforms it, and finally saves it to the database.

In [None]:
def run_etl_pipeline(file_path, db_name='employee_data.db'):
    """Run the extract -> transform -> load sequence for one CSV file.

    ``extract_data`` and ``transform_data`` print their own errors and
    return ``None`` on failure, so the pipeline simply stops at the first
    stage that fails.

    Args:
        file_path: Path of the CSV file to read.
        db_name: Path of the SQLite database file to write to.

    Returns:
        None.
    """
    # Extract
    data = extract_data(file_path)
    if data is None:
        return

    # Transform
    transformed_data = transform_data(data)
    if transformed_data is None:
        return

    # Load
    load_data_to_db(transformed_data, db_name)

# Execute the full extract -> transform -> load sequence on the employee CSV
run_etl_pipeline(
    file_path='/content/employees_data.csv',
    db_name='employee_data.db',
)