In [None]:
# ==============================================================================
#                  Comprehensive Guide: Data Science in ETL
# ==============================================================================
# This notebook provides detailed examples of Data Science practices in
# each phase of the ETL (Extract, Transform, Load) process.
#
# Each section includes:
# - Practice Description.
# - Practical example with a business scenario.
# - Executable Python code (with simulated example data where necessary).
# - Recommended tools.
# - Key objective of the practice.
#
# Make sure you have the necessary libraries installed!
# You can install them with:
# pip install pandas numpy requests sqlalchemy
# (Note: `psycopg2-binary`, `snowflake-sqlalchemy`, and `boto3` are for real connections
# and would require installation. The script uses mocks to run without them).
# ==============================================================================

# Import libraries
import pandas as pd
import numpy as np
import requests
import json
from datetime import datetime, date, timedelta

# To simulate psycopg2 and sqlalchemy without a real DB
from sqlalchemy import create_engine, text
from unittest.mock import MagicMock
import os # For deleting temporary files

# --- Configuration for Simulations (Mocks) ---
# These mocks allow the code to run without real databases or
# configured cloud services. If you wish to use real connections, ensure
# you have the corresponding libraries installed and comment out/remove
# the relevant mocking sections.

class MockCursor:
    """Minimal stand-in for a DB-API 2.0 cursor, used when no real database exists.

    Implements only what Practice 1 needs: `execute`, `description`,
    `fetchall` and `close`.
    """

    # Column names of the simulated result set, in the same order as the
    # tuples returned by `fetchall`.
    _COLUMNS = (
        'order_id', 'customer_id', 'order_date', 'total_amount',
        'customer_name', 'customer_email', 'product_name', 'category', 'price',
    )

    # As in DB-API 2.0, `description` is None until a row-returning statement runs.
    description = None

    def execute(self, query, params=None):
        """Pretend to run `query`; `params` is accepted but ignored.

        After a SELECT, `description` holds one 7-item tuple per column
        (only the name is filled in); for any other statement it is None.
        Always returns None.
        """
        if "SELECT" in query.upper():
            self.description = self._mock_data()
        else:
            self.description = None
        return None

    def fetchall(self):
        """Return the simulated rows for the SELECT in Practice 1."""
        return [
            (1, 101, datetime(2023, 1, 10), 50.00, 'John Doe', 'john.doe@example.com', 'Laptop', 'Electronics', 1000.00),
            (2, 102, datetime(2023, 1, 15), 120.50, 'Jane Smith', 'jane.smith@example.com', 'Mouse', 'Electronics', 25.00),
            (3, 101, datetime(2023, 2, 1), 75.00, 'John Doe', 'john.doe@example.com', 'Keyboard', 'Peripherals', 70.00),
            (4, 103, datetime(2023, 2, 5), 30.00, 'Alice Wonderland', 'alice@example.com', 'Webcam', 'Accessories', 50.00)
        ]

    def close(self):
        """No-op: the mock holds no resources."""
        pass

    def _mock_data(self):
        """Build the DB-API style `description` for the simulated result set."""
        return [(column, None, None, None, None, None, None) for column in self._COLUMNS]

class MockConnection:
    """Simulated database connection whose operations are all harmless no-ops."""

    def cursor(self):
        """Hand back a fresh `MockCursor` for running simulated queries."""
        return MockCursor()

    def commit(self):
        """Pretend to commit; there is no transaction to persist."""
        return None

    def close(self):
        """Pretend to close; there is no resource to release."""
        return None

def mock_connect(*args, **kwargs):
    """Replacement for `psycopg2.connect`: ignores every argument and returns a `MockConnection`."""
    simulated_connection = MockConnection()
    return simulated_connection

# Fall back to a mocked `psycopg2` module when the real driver cannot be imported.
# To force the mock even with psycopg2 installed, add this line after the import:
# psycopg2.connect = mock_connect
try:
    import psycopg2
except ImportError:
    print("`psycopg2` not installed. Using mock for database connections.")
    import sys
    # Register stand-ins so that a later `import psycopg2` resolves to the mock,
    # whose `connect` is our `mock_connect`.
    sys.modules['psycopg2.extras'] = MagicMock()
    sys.modules['psycopg2'] = MagicMock(connect=mock_connect)
else:
    # The driver is available: a real connection will be attempted.
    print("`psycopg2` installed. Real connection will be attempted; if it fails, simulated data will be used.")

# Mock for SQLAlchemy `create_engine` and `Connection`
class MockSAConnection:
    """Stand-in for a SQLAlchemy connection: accepts statements and discards them.

    Usable as a context manager, as the load examples do with
    `with engine.connect() as connection:`.
    """

    def execute(self, statement, parameters=None):
        """Pretend to execute `statement`; `parameters` is accepted but ignored.

        `statement` may be a plain SQL string or any object exposing the SQL
        as a `.text` attribute. A SELECT returns a mock result whose
        `fetchall()` yields one simulated row; anything else returns a bare
        `MagicMock`.
        """
        # Duck-type on `.text` rather than `isinstance(statement, text)`:
        # `sqlalchemy.text` is a factory function, so using it as the second
        # argument of isinstance() raises TypeError.
        sql = getattr(statement, "text", statement)
        if isinstance(sql, str) and "SELECT" in sql.upper():
            return MagicMock(fetchall=lambda: [("2023-11-20",)])  # Simulated SELECT result
        return MagicMock()

    def commit(self):
        """No-op: nothing is ever written."""
        pass

    def close(self):
        """No-op: the mock holds no resources."""
        pass

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Returns None, so an exception raised inside the `with` body propagates.
        self.close()

class MockSAEngine:
    """Simulated SQLAlchemy engine: hands out mock connections and swallows writes."""

    def connect(self):
        """Open a simulated connection (a new `MockSAConnection`)."""
        return MockSAConnection()

    def dispose(self):
        """Pretend to release the connection pool; nothing to do."""
        return None

    def to_sql(self, df, name, con, if_exists, index):
        """Accept a DataFrame write and discard it.

        This is the hook the patched `DataFrame.to_sql` calls instead of the
        real pandas implementation, which would need a genuine engine.
        """
        return None

# Replace the actual SQLAlchemy create_engine function with our mock.
# The real factory stays reachable as `_original_create_engine`.
_original_create_engine = create_engine


def create_engine(*args, **kwargs):
    """Return a `MockSAEngine`; the connection URL and options are ignored."""
    return MockSAEngine()


# Replace `pd.DataFrame.to_sql` with a wrapper that short-circuits mock targets.
# If this cell is re-run, `pd.DataFrame.to_sql` is already our wrapper; unwrap it
# so `_original_to_sql_method` always points at the genuine pandas method.
_original_to_sql_method = getattr(pd.DataFrame.to_sql, '_wrapped_original', pd.DataFrame.to_sql)


def mocked_to_sql(df_self, name, con, if_exists='fail', index=True, chunksize=None, dtype=None, method=None, **kwargs):
    """Write `df_self` to table `name`, simulating the write for mock targets.

    A `MockSAEngine` receives the call on its own `to_sql`; a
    `MockSAConnection` simply discards the data. Any other `con` is passed
    to the real pandas implementation, including extra keyword arguments
    such as `schema` or `index_label`.
    """
    if isinstance(con, MockSAEngine):
        return con.to_sql(df_self, name, con, if_exists, index)
    if isinstance(con, MockSAConnection):
        # A mock connection has no `to_sql` of its own: simulate a successful write.
        return None
    # Pass options by keyword: positionally, `if_exists` would land in `schema`.
    return _original_to_sql_method(
        df_self, name, con,
        if_exists=if_exists, index=index, chunksize=chunksize, dtype=dtype, method=method,
        **kwargs,
    )


mocked_to_sql._wrapped_original = _original_to_sql_method
pd.DataFrame.to_sql = mocked_to_sql


# Mock for boto3 (AWS S3), used when the library is not installed.
# To force the mock even with boto3 installed, replace `boto3.client` after the
# import with a MagicMock whose return value exposes a fake `upload_file`.
try:
    import boto3
except ImportError:
    print("`boto3` not installed. Using mock for S3 operations.")
    import sys

    def _mock_upload_file(Filename, Bucket, Key, *args, **kwargs):
        """Print what would be uploaded; parameter names follow boto3's `upload_file`."""
        print(f"Mock S3: Uploading file {Filename} to s3://{Bucket}/{Key}")

    # Bind the name `boto3` here as well as registering it in `sys.modules`:
    # later cells call `boto3.client(...)` directly, without importing it again.
    boto3 = MagicMock()
    boto3.client = MagicMock(return_value=MagicMock(upload_file=_mock_upload_file))
    sys.modules['boto3'] = boto3
else:
    print("`boto3` installed. Real connection will be attempted; if it fails, it will be reported.")

print("\n--- Mocks configured for execution without external dependencies if not installed! ---")
print("If you wish to use real connections, ensure you have the libraries installed and adjust mocking and credential sections.\n")


# Comprehensive Guide: Data Science in ETL

The ETL (Extract, Transform, Load) stage is fundamental in any Data Science project. It ensures that data is clean, consistent, and ready for analysis and modeling. This notebook will guide you through detailed examples of Data Science practices in each ETL phase.

## 1. Extract Phase

**Objective:** Collect data from various sources and formats, ensuring its integrity.

### Practice 1: Extracting Data from a Relational Database

**Description:** Connect to an SQL database to extract specific tables or results from complex queries.

**Practical Example:**
Imagine you are a Data Scientist at an e-commerce company and need to analyze customer purchasing behavior. Transaction data is stored in a PostgreSQL database.

*   **Data Sources:** PostgreSQL database (`orders`, `order_items`, `customers`, and `products` tables).
*   **Data to Extract:** `order_id`, `customer_id`, `order_date`, `total_amount` from `orders`, along with customer and product details.

In [None]:
print("--- PHASE 1: EXTRACT ---\n")

### Practice 1: Extracting Data from a Relational Database ###

print("### Practice 1: Extracting Data from a Relational Database ###")

# Connection configuration (using mock if no real DB exists)
# If you have a real DB (e.g., PostgreSQL), replace with your credentials:
db_config = {
    'host': 'localhost',
    'database': 'testdb',
    'user': 'user',
    'password': 'password'
}

# SQL Extraction Query
sql_query = """
SELECT
    o.order_id,
    o.customer_id,
    o.order_date,
    o.total_amount,
    c.name AS customer_name,
    c.email AS customer_email,
    p.product_name,
    p.category,
    p.price
FROM orders o
JOIN customers c ON o.customer_id = c.customer_id
JOIN order_items oi ON o.order_id = oi.order_id
JOIN products p ON oi.product_id = p.product_id
WHERE o.order_date >= '2023-01-01';
"""

# Column names produced by `sql_query`, in SELECT order. Used when the cursor
# exposes no `description` (e.g. a minimal mock cursor).
order_columns = [
    'order_id', 'customer_id', 'order_date', 'total_amount',
    'customer_name', 'customer_email', 'product_name', 'category', 'price',
]

df_raw_orders = pd.DataFrame() # Initialize

conn = None  # Lets the `finally` clause know whether a connection was opened
try:
    # Attempt real connection with psycopg2 if available, otherwise use mock
    import psycopg2 # Import here so a missing driver is handled by the except below
    conn = psycopg2.connect(**db_config)
    cur = conn.cursor()
    try:
        cur.execute(sql_query)
        # Prefer column names reported by the cursor; fall back to the known SELECT list
        description = getattr(cur, 'description', None)
        column_names = [desc[0] for desc in description] if description else order_columns
        raw_data = cur.fetchall()
    finally:
        # Close the cursor even when the query fails
        cur.close()
    df_raw_orders = pd.DataFrame(raw_data, columns=column_names)
    print("Database extraction successful (simulated or real).")
except Exception as e:
    # Deliberately broad: any failure (missing driver, unreachable DB, bad query)
    # falls back to simulated data so the rest of the notebook can run.
    print(f"Error connecting to DB or executing query: {e}. Using simulated data.")
    df_raw_orders = pd.DataFrame({
        'order_id': [1, 2, 3, 4],
        'customer_id': [101, 102, 101, 103],
        'order_date': [datetime(2023, 1, 10), datetime(2023, 1, 15), datetime(2023, 2, 1), datetime(2023, 2, 5)],
        'total_amount': [50.00, 120.50, 75.00, 30.00],
        'customer_name': ['John Doe', 'Jane Smith', 'John Doe', 'Alice Wonderland'],
        'customer_email': ['john.doe@example.com', 'jane.smith@example.com', 'john.doe@example.com', 'alice@example.com'],
        'product_name': ['Laptop', 'Mouse', 'Keyboard', 'Webcam'],
        'category': ['Electronics', 'Electronics', 'Peripherals', 'Accessories'],
        'price': [1000.00, 25.00, 70.00, 50.00]
    })
finally:
    # Release the connection on both the success and the failure path
    if conn is not None:
        conn.close()

print("\nFirst rows of data extracted from the database:")
print(df_raw_orders.head())

# Tools to Use: Python (psycopg2, sqlalchemy), Apache Airflow, DBeaver.
# Objective: Obtain a flat and combined dataset with order, customer, and product information.


### Practice 2: Extracting Data from a Web API

**Description:** Connect to an API (Application Programming Interface) to obtain data in JSON or XML format.

**Practical Example:**
You are a Data Scientist at a marketing company and need to get Google Trends search trend data for a specific set of keywords.

*   **Data Sources:** Google Trends API (or a similar API).
*   **Data to Extract:** Search volumes by keyword, region, and time period.

In [None]:
from urllib.parse import urlencode  # stdlib: builds a correctly escaped query string

print("\n### Practice 2: Extracting Data from a Web API ###")

keywords = ['data science', 'machine learning', 'artificial intelligence']
start_date_api = '2023-01-01'
end_date_api = '2023-01-07'
api_key = "YOUR_API_KEY" # Replace if the real API requires authentication

all_trends_data = []

print(f"\nExtracting API data for keywords: {', '.join(keywords)}")

# API response simulation: one week of daily scores per keyword
mock_api_responses = {
    'data science': {'trend_score': [80, 85, 82, 88, 90, 87, 91], 'dates': [f'2023-01-0{i+1}' for i in range(7)]},
    'machine learning': {'trend_score': [70, 72, 75, 71, 78, 76, 79], 'dates': [f'2023-01-0{i+1}' for i in range(7)]},
    'artificial intelligence': {'trend_score': [95, 93, 98, 96, 99, 94, 97], 'dates': [f'2023-01-0{i+1}' for i in range(7)]}
}

for keyword in keywords:
    # Keywords contain spaces, so the query string must be URL-encoded
    query_string = urlencode({
        'keyword': keyword,
        'start_date': start_date_api,
        'end_date': end_date_api,
        'api_key': api_key,
    })
    url = f"https://api.example.com/trends?{query_string}"
    try:
        # In a real case, you would call the API here:
        # response = requests.get(url)
        # if response.status_code == 200:
        #     trend_data = response.json()
        trend_data = mock_api_responses.get(keyword)
        if trend_data is None:
            print(f"  - Simulated error: No data found for '{keyword}'.")
            continue
        all_trends_data.append({'keyword': keyword, 'data': trend_data})
    except requests.exceptions.RequestException as e:
        print(f"  - Connection error for {keyword}: {e}")
    except json.JSONDecodeError:
        print(f"  - JSON decoding error for {keyword}.")

# Flatten to one row per (keyword, date) for better visualization
df_trends_list = [
    {'keyword': entry['keyword'], 'date': pd.to_datetime(day), 'trend_score': score}
    for entry in all_trends_data
    for day, score in zip(entry['data']['dates'], entry['data']['trend_score'])
]
# Explicit columns keep the frame well-formed even when nothing was extracted
df_trends = pd.DataFrame(df_trends_list, columns=['keyword', 'date', 'trend_score'])

print("\nFirst rows of trend data (simulated API):")
print(df_trends.head())

# Tools to Use: Python (requests, json), pytrends, Apache Airflow.
# Objective: Obtain search trend data to identify patterns of interest.


## 2. Transform Phase

**Objective:** Clean, enrich, and structure data to make it suitable for analysis.

### Practice 3: Data Cleaning and Standardization

**Description:** Identify and correct errors, inconsistencies, and missing values, and standardize formats.

**Practical Example:**
After extracting customer data from the database (Practice 1), you notice that customer names have stray whitespace and inconsistent capitalization, email addresses might be in mixed case, and registration dates have varied formats.

In [None]:
print("\n--- PHASE 2: TRANSFORM ---\n")

### Practice 3: Data Cleaning and Standardization ###

print("### Practice 3: Data Cleaning and Standardization ###")

# Simulated input data (with inconsistencies)
data_customers_raw = {
    'customer_id': [1, 2, 3, 4, 5, 6],
    'customer_name': ["John Doe", "  jane smith  ", "Alice B.", "BOB BROWN", "Charlie D.", "Eve"],
    'customer_email': ["john.doe@example.com", "JANE.SMITH@EXAMPLE.COM", "aliceb@example.com", "bob.brown@example.com", None, "eve@example.com "],
    'registration_date': ["2023-01-15 10:30:00", "Jan 16, 2023", "17/01/2023", "2023-01-18", None, "2023-01-19"]
}
df_customers_raw = pd.DataFrame(data_customers_raw)

print("Raw Customer DataFrame (with inconsistencies):")
print(df_customers_raw)

# Perform transformations on a copy so the raw data stays untouched
df_cleaned_customers = df_customers_raw.copy()

# 1. Remove extra whitespace from `customer_name` and `customer_email`
df_cleaned_customers['customer_name'] = df_cleaned_customers['customer_name'].str.strip()
df_cleaned_customers['customer_email'] = df_cleaned_customers['customer_email'].str.strip()

# 2. Convert email addresses to lowercase
df_cleaned_customers['customer_email'] = df_cleaned_customers['customer_email'].str.lower()

# 3. Standardize date format and handle nulls
# Each value is parsed on its own because the column mixes several formats:
# parsing the whole column in one call infers a single format from the first
# value and, with errors='coerce', turns every non-matching row into NaT.
# NOTE(review): slash dates such as "03/04/2023" are ambiguous (day-first vs
# month-first); confirm the source convention before relying on inference.
parsed_registration = pd.to_datetime(
    df_cleaned_customers['registration_date'].apply(
        lambda raw_value: pd.to_datetime(raw_value, errors='coerce')
    )
)
# Assign the result back instead of calling `fillna(..., inplace=True)` on a
# column selection, which is not guaranteed to update the DataFrame.
parsed_registration = parsed_registration.fillna(datetime.now()) # Impute with current date
df_cleaned_customers['registration_date'] = parsed_registration.dt.strftime('%Y-%m-%d')

# 4. Impute missing values in `customer_email`
df_cleaned_customers['customer_email'] = df_cleaned_customers['customer_email'].fillna('unknown@example.com')

# 5. (Optional) Standardize name capitalization (e.g., Title Case)
df_cleaned_customers['customer_name'] = df_cleaned_customers['customer_name'].str.title()

print("\nCleaned and standardized Customer DataFrame:")
print(df_cleaned_customers)

# Tools to Use: Python (pandas), Talend, OpenRefine.
# Objective: Ensure customer data is consistent and high-quality.


### Practice 4: Data Enrichment and Feature Creation

**Description:** Combine data from different sources and generate new features that can be useful for modeling.

**Practical Example:**
You have transaction data (Practice 1) and want to calculate customer metrics like "customer lifetime value" (CLV) or "purchase frequency."

In [None]:
print("\n### Practice 4: Data Enrichment and Feature Creation ###")

# Example order data (using df_raw_orders from Practice 1 for more realism)
df_orders_for_features = df_raw_orders[['customer_id', 'order_date', 'total_amount']].copy()

# Add some additional data to show more features
new_orders_data = pd.DataFrame({
    'customer_id': [101, 102, 103],
    'order_date': [datetime(2023, 3, 1), datetime(2023, 3, 10), datetime(2023, 3, 15)],
    'total_amount': [90.00, 150.00, 60.00]
})
df_orders_for_features = pd.concat([df_orders_for_features, new_orders_data], ignore_index=True)
df_orders_for_features['order_date'] = pd.to_datetime(df_orders_for_features['order_date']) # Ensure datetime type

print("Order DataFrame for feature engineering:")
print(df_orders_for_features)

# Current date for calculating tenure and recency. Fix a date for reproducibility.
current_analysis_date = pd.to_datetime('2023-04-01')

# 1. Total number of orders and total amount spent per customer
# The (column, function) tuple form of named aggregation is only valid on a
# DataFrame groupby, so aggregate before selecting a single column.
df_customer_features = df_orders_for_features.groupby('customer_id').agg(
    num_orders=('total_amount', 'count'),
    total_spent=('total_amount', 'sum'),
).reset_index()

# 2. Average spent per order
df_customer_features['avg_spent_per_order'] = df_customer_features['total_spent'] / df_customer_features['num_orders']

# 3. Customer Tenure (days since first order)
first_order_date = df_orders_for_features.groupby('customer_id')['order_date'].min().reset_index()
first_order_date.rename(columns={'order_date': 'first_order_date'}, inplace=True)
df_customer_features = pd.merge(df_customer_features, first_order_date, on='customer_id')
df_customer_features['customer_tenure_days'] = (current_analysis_date - df_customer_features['first_order_date']).dt.days

# 4. Calculate Recency (days since last order)
last_order_date = df_orders_for_features.groupby('customer_id')['order_date'].max().reset_index()
last_order_date.rename(columns={'order_date': 'last_order_date'}, inplace=True)
df_customer_features = pd.merge(df_customer_features, last_order_date, on='customer_id')
df_customer_features['recency_days'] = (current_analysis_date - df_customer_features['last_order_date']).dt.days


# 5. Basic RFM Segmentation (Recency, Frequency, Monetary)
def _quantile_score(values, higher_is_better=True, max_bins=4):
    """Score `values` from 1 (worst) upward, using up to `max_bins` quantile bins.

    With fewer than `max_bins` distinct values, quantile edges collapse, so a
    dense rank is used instead (equal values share a score). Set
    `higher_is_better=False` for metrics such as recency, where a smaller
    value deserves the higher score.
    """
    if values.nunique() < max_bins:
        scores = values.rank(method='dense').astype(int)
    else:
        # labels=False yields integer bin codes, so dropping duplicate edges
        # cannot cause a label-count mismatch.
        scores = pd.qcut(values, q=max_bins, labels=False, duplicates='drop') + 1
    if not higher_is_better:
        # Mirror the scale: the best (smallest) value gets the top score
        scores = scores.max() + 1 - scores
    return scores


for col in ['recency_days', 'num_orders', 'total_spent']:
    if df_customer_features[col].nunique() < 4:
        print(f"Warning: Not enough unique values in '{col}' for a 4-quartile qcut. Assigning values in a simplified way.")
    # Lower recency is better, so its scale is inverted in every case
    df_customer_features[f'{col}_score'] = _quantile_score(
        df_customer_features[col],
        higher_is_better=(col != 'recency_days'),
    )

df_customer_features['rfm_score'] = df_customer_features['recency_days_score'].astype(str) + \
                                   df_customer_features['num_orders_score'].astype(str) + \
                                   df_customer_features['total_spent_score'].astype(str)


print("\nDataFrame with enriched customer features:")
print(df_customer_features)

# Tools to Use: Python (pandas, numpy, scikit-learn), Snowflake, Apache Spark.
# Objective: Create enriched features for segmentation models, CLV, churn, etc.


### Practice 5: Data Aggregation and Summarization

**Description:** Reduce data granularity by summarizing it to a higher level to facilitate analysis or populate dashboards.

**Practical Example:**
You need to build a dashboard that shows daily and monthly sales by product category. Transaction data is at the order item level.

In [None]:
print("\n### Practice 5: Data Aggregation and Summarization ###")

# Simulated order-item rows: one entry per product line of an order
data_items = {
    'order_date': [
        datetime(2023, 1, 1), datetime(2023, 1, 1),
        datetime(2023, 1, 2), datetime(2023, 1, 2),
        datetime(2023, 2, 1), datetime(2023, 2, 1),
    ],
    'product_id': [10, 20, 10, 30, 20, 10],
    'quantity': [2, 1, 1, 3, 2, 1],
    'price': [10.00, 25.00, 10.00, 5.00, 25.00, 10.00],
}
df_order_items = pd.DataFrame(data_items)

# Simulated product catalogue supplying the category of each product
data_products_agg = {
    'product_id': [10, 20, 30],
    'product_name': ['Laptop', 'Mouse', 'Keyboard'],
    'category': ['Electronics', 'Electronics', 'Peripherals'],
}
df_products_agg = pd.DataFrame(data_products_agg)

print("Order Items DataFrame:")
print(df_order_items)
print("\nProducts DataFrame:")
print(df_products_agg)

# Step 1: attach product attributes (notably the category) to every item row
df_merged_sales_agg = df_order_items.merge(df_products_agg, on='product_id')

# Step 2: revenue of a row is units sold times unit price
df_merged_sales_agg['revenue'] = df_merged_sales_agg['quantity'].mul(df_merged_sales_agg['price'])

print("\nSales DataFrame with categories and revenue:")
print(df_merged_sales_agg)

# Step 3: total revenue per calendar day and category
sale_day = df_merged_sales_agg['order_date'].dt.date.rename('date')
df_daily_revenue = (
    df_merged_sales_agg
    .groupby([sale_day, 'category'])['revenue']
    .sum()
    .reset_index()
)
print("\nDaily Revenue by Category:")
print(df_daily_revenue)

# Step 4: total revenue per calendar month and category
df_merged_sales_agg['month'] = df_merged_sales_agg['order_date'].dt.to_period('M')
monthly_groups = df_merged_sales_agg.groupby(['month', 'category'])
df_monthly_revenue = monthly_groups['revenue'].sum().reset_index()
print("\nMonthly Revenue by Category:")
print(df_monthly_revenue)

# Tools to Use: Python (pandas), Snowflake, Tableau.
# Objective: Provide summarized data for BI dashboards.


## 3. Load Phase

**Objective:** Store the transformed data in a suitable destination for final use, whether it's a data warehouse, data lake, or file system.

### Practice 6: Incremental Loading to a Data Warehouse

**Description:** Insert only new or modified records into the destination, instead of reloading all data each time. This is efficient for large volumes of constantly changing data.

**Practical Example:**
After transforming daily transaction data (Practice 5), you need to load it into a fact table (`fact_sales`) in your Data Warehouse (e.g., Snowflake). You only want to add records for the current day, not duplicate historical data.

In [None]:
print("\n--- PHASE 3: LOAD ---\n")

### Practice 6: Incremental Loading to a Data Warehouse ###

print("### Practice 6: Incremental Loading to a Data Warehouse ###")

# Transformed daily sales data (using df_daily_revenue from Practice 5).
# Practice 5 names its columns `date` and `revenue`; map them to the fact-table
# names. `rename` returns a new frame and ignores keys that are not present.
df_daily_sales_to_load = df_daily_revenue.rename(
    columns={'date': 'sale_date', 'revenue': 'total_revenue'}
)

# Add an additional metric column for more realism
df_daily_sales_to_load['num_transactions'] = (df_daily_sales_to_load['total_revenue'] / 10).astype(int) # Simulated

print("\nDaily data to load into the Data Warehouse:")
print(df_daily_sales_to_load)

# Connection configuration to Snowflake (or any other SQL DB)
# Replace with real credentials if a real connection is desired.
user = "YOUR_SNOWFLAKE_USER"
password = "YOUR_SNOWFLAKE_PASSWORD"
account = "YOUR_SNOWFLAKE_ACCOUNT"
warehouse = "YOUR_SNOWFLAKE_WAREHOUSE"
database = "YOUR_SNOWFLAKE_DATABASE"
schema = "YOUR_SNOWFLAKE_SCHEMA"

# Create the SQLAlchemy connection URL. Will use the mock if not configured.
snowflake_url = f"snowflake://{user}:{password}@{account}/{database}/{schema}?warehouse={warehouse}"
engine = create_engine(snowflake_url) # This will use our mock if no real connection exists

try:
    with engine.connect() as connection:
        # Every distinct date in the batch is replaced, not only the first one;
        # otherwise a re-run would duplicate the rows of the remaining dates.
        load_dates = sorted(
            pd.to_datetime(df_daily_sales_to_load['sale_date']).dt.strftime('%Y-%m-%d').unique()
        )

        if not load_dates:
            print("\nNo rows to load; skipping incremental load.")
        else:
            print(f"\nProcessing incremental load for date(s): {', '.join(load_dates)}")

            # 1. Delete existing data for the batch dates (for idempotency).
            # "Delete-then-insert"; a bound parameter replaces string-built SQL.
            delete_sql = text("DELETE FROM fact_sales WHERE sale_date = :load_date")
            for load_date in load_dates:
                connection.execute(delete_sql, {"load_date": load_date})

            print(f"Deleted existing data (if any) for {', '.join(load_dates)} in 'fact_sales' (simulated).")

            # 2. Insert the new records on the same connection
            df_daily_sales_to_load.to_sql('fact_sales', con=connection, if_exists='append', index=False)

            # Commit once, after both steps, so a failed insert does not leave
            # the dates deleted (in mock this does nothing real).
            connection.commit()

            print(f"Loaded new data for {', '.join(load_dates)} into 'fact_sales' (simulated).")

except Exception as e:
    print(f"Error during Data Warehouse load: {e}")
    print("Ensure DW credentials and configuration if attempting a real connection.")
finally:
    # `engine` is always bound here: it is created before the try block
    engine.dispose()
    print("Data Warehouse connection closed (simulated or real).")

# Tools to Use: Python (pandas, sqlalchemy), Snowflake, Apache Airflow.
# Objective: Keep the Data Warehouse updated efficiently.


### Practice 7: Loading to a Data Lake

**Description:** Store raw or semi-structured data in its original or near-original format, typically in a distributed file system (like HDFS) or object storage (like S3).

**Practical Example:**
You want to store raw Google Trends API data (Practice 2) in an S3 bucket for future analysis or to be processed by other Big Data tools.

In [None]:
import tempfile  # stdlib: locates a temp directory that exists on every platform

print("\n### Practice 7: Loading to a Data Lake ###")

# Example Google Trends data (using all_trends_data from Practice 2)
print("\nTrend data to load into the Data Lake (JSON Format):")
print(json.dumps(all_trends_data, indent=2))

# Bucket name and folder prefix (e.g., based on load date)
bucket_name = 'your-data-lake-bucket-name' # IMPORTANT! Replace with a real S3 bucket if you use it.
current_date_str = datetime.now().strftime('%Y-%m-%d')
file_name = f"google_trends_data_{current_date_str}.json"
# Use the platform temp directory instead of a hard-coded /tmp
local_file_path = os.path.join(tempfile.gettempdir(), file_name)
s3_key = f"raw_data/google_trends/{current_date_str}/{file_name}" # Path in S3

# Save data to a local temporary file before uploading
try:
    # Import here so this cell does not depend on an earlier cell having bound
    # the name `boto3`; a module registered in `sys.modules` is picked up as well.
    import boto3

    with open(local_file_path, 'w', encoding='utf-8') as f:
        json.dump(all_trends_data, f, indent=4)
    print(f"\nTemporary file created at: {local_file_path}")

    # Initialize S3 client (will use mock if boto3 is not configured/installed)
    s3 = boto3.client('s3',
                      aws_access_key_id="YOUR_AWS_ACCESS_KEY",       # Replace with real credentials
                      aws_secret_access_key="YOUR_AWS_SECRET_KEY",   # Replace with real credentials
                      region_name="YOUR_AWS_REGION")                 # Replace with real region

    # Upload the file to S3
    s3.upload_file(local_file_path, bucket_name, s3_key)
    print(f"File '{file_name}' successfully uploaded to s3://{bucket_name}/{s3_key} (simulated or real).")

except Exception as e:
    # Broad on purpose: report any failure (missing library, file error, bad
    # credentials, network) without stopping the notebook.
    print(f"Error uploading file to S3: {e}")
    print("Ensure AWS credentials are configured (env variables, ~/.aws/credentials) or replace with your keys and that the bucket exists.")
finally:
    # Clean up the temporary file
    if os.path.exists(local_file_path):
        os.remove(local_file_path)
        print(f"Temporary file '{local_file_path}' deleted.")

# Tools to Use: Python (boto3), Amazon S3, Apache Airflow.
# Objective: Store raw data in a flexible and scalable format.


## Key Considerations for Data Science in ETL:


*   **Data Quality:** A robust ETL is the foundation for accurate ML models: garbage in, garbage out.
*   **Observability:** Monitoring ETL pipelines to detect failures, latencies, or data quality issues is crucial.
*   **Versioning:** Versioning ETL code, data schemas, and the data itself helps with reproducibility.
*   **Idempotency:** Ensuring that re-running ETL does not cause unwanted side effects (e.g., data duplication).
*   **Scalability:** Designing ETL to handle increasing data volumes.
*   **Automation:** Using orchestrators to automate scheduled pipeline execution.
*   **Documentation:** Documenting each ETL step, sources, transformations, and destinations.

In [None]:
# Final marker: shows in the output that the run reached the end.
print("\n--- END OF SCRIPT ---")