In [None]:
import os
import multiprocessing
import logging
import pandas as pd
from sqlalchemy import create_engine

# Configure the root logger at import time: records at INFO level and above
# are written to etl.log, each prefixed with a timestamp and the level name.
logging.basicConfig(
    filename='etl.log',
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s: %(message)s',
)

# Function to extract data from multiple sources (e.g., CSV files)
def extract(source):
    """Read one CSV source into a DataFrame.

    Failures are logged (including the traceback) instead of raised, so one
    bad source does not abort the parallel extraction; the ``__main__`` driver
    filters out the ``None`` results.

    Args:
        source: Anything ``pandas.read_csv`` accepts (path, URL, file-like).

    Returns:
        The parsed DataFrame, or ``None`` if reading failed.
    """
    try:
        data = pd.read_csv(source)
    except Exception as e:
        # Broad on purpose: extraction is best-effort per source.
        # logging.exception keeps the traceback that str(e) alone discarded.
        logging.exception("Error extracting data from %s: %s", source, e)
        return None
    logging.info("Extracted data from %s", source)
    return data

# Function to transform data (e.g., clean, enrich, aggregate)
def transform(data, column='amount'):
    """Upper-case the text values of one column (sample transformation).

    The input DataFrame is copied, so the caller's object is left untouched.
    Only ``str`` cells are upper-cased; numbers, missing values and other
    non-text cells are kept unchanged.

    Args:
        data: Input DataFrame.
        column: Name of the column to transform; defaults to ``'amount'``.

    Returns:
        A new, transformed DataFrame, or ``None`` if the transformation failed
        (for example when ``column`` does not exist); the error is logged.
    """
    try:
        result = data.copy()
        # Element-wise instead of Series.str.upper(): the .str accessor raises
        # on numeric columns and turns non-string cells of a mixed object
        # column into NaN, silently losing data.
        result[column] = result[column].map(
            lambda value: value.upper() if isinstance(value, str) else value
        )
    except Exception as e:
        logging.exception("Error transforming data: %s", e)
        return None
    logging.info("Transformed data")
    return result

# Function to load data into a database
def load(data, db_uri, table_name='my_table', if_exists='replace'):
    """Write *data* to *table_name* in the database at *db_uri*.

    Args:
        data: DataFrame to write.
        db_uri: SQLAlchemy database URL.
        table_name: Destination table; defaults to ``'my_table'``.
        if_exists: Forwarded to ``DataFrame.to_sql``; the default
            ``'replace'`` drops the table before inserting, so every run
            overwrites the previous contents.

    Returns:
        ``True`` if the data was written, ``False`` if an error occurred
        (the error is logged with its traceback, not raised).
    """
    engine = None
    try:
        engine = create_engine(db_uri)
        data.to_sql(table_name, engine, if_exists=if_exists, index=False)
    except Exception as e:
        logging.exception("Error loading data into database: %s", e)
        return False
    finally:
        # Release the engine's pooled connections instead of leaving them
        # open until garbage collection.
        if engine is not None:
            engine.dispose()
    logging.info("Loaded data into database")
    return True

# CSV inputs to extract: data_source1.csv through data_source3.csv.
data_sources = [f'data_source{index}.csv' for index in range(1, 4)]

# Worker processes requested for the parallel extraction pool; tune as needed.
num_processes = 3

if __name__ == '__main__':
    # Never start more workers than there are sources, and always at least
    # one: multiprocessing.Pool(processes=0) raises ValueError.
    pool_size = max(1, min(num_processes, len(data_sources)))

    # Extract all sources in parallel. The context manager shuts the worker
    # processes down even if pool.map raises.
    with multiprocessing.Pool(processes=pool_size) as pool:
        extracted_data = pool.map(extract, data_sources)

    # extract() returns None for sources that failed; drop them.
    extracted_data = [data for data in extracted_data if data is not None]

    if extracted_data:
        # Stack all sources into one frame with a fresh 0..n-1 index.
        combined_data = pd.concat(extracted_data, ignore_index=True)

        transformed_data = transform(combined_data)

        if transformed_data is not None:
            # Read the connection URL from the environment so credentials are
            # not hard-coded in source; the placeholder URL is the fallback.
            db_uri = os.environ.get(
                'ETL_DB_URI',
                'postgresql://username:password@localhost:5432/mydatabase',
            )
            load(transformed_data, db_uri)
    else:
        logging.error("No valid data to process.")


**This code demonstrates several advanced ETL concepts:**

Parallel Processing: It uses a `multiprocessing` pool to read the source files in parallel, which can shorten extraction time when there are many or large files.

Error Handling: Each ETL stage catches and logs its own exceptions, so a source that fails to extract is skipped instead of aborting the whole run.

Database Interaction: It loads the transformed data into a PostgreSQL database through SQLAlchemy and pandas' `to_sql`, replacing the `my_table` table on each run.

Logging: It sets up logging to record extraction, transformation, and loading activities, which helps with monitoring and debugging.

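Data Transformation: As a placeholder example, it applies a simple transformation that upper-cases the text in the 'amount' column. Upper-casing only makes sense for text values, so replace it with the cleaning, enrichment, or aggregation logic your data actually needs.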