# Module 2, Lesson 4: Data Integration and Real-World Applications

## Setup

In [None]:
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import json
import warnings
# Silence warnings so the lesson output stays uncluttered.
warnings.filterwarnings('ignore')

# Display settings: show every column, let pandas size the console width
# (None), and truncate long cell text at 50 characters.
display_options = {
    'display.max_columns': None,
    'display.width': None,
    'display.max_colwidth': 50,
}
for option_name, option_value in display_options.items():
    pd.set_option(option_name, option_value)

print("Libraries loaded successfully!")
print("Pandas version: {}".format(pd.__version__))

---

## Part 1: Common Integration Challenges
**Real Problems You'll Face When Combining Data**

### Example 1: Mismatched Column Names
Different systems often use different names for the same data. This is one of the most common integration problems.

In [None]:
# System A: Sales Database - abbreviated snake_case column names, one row per sale
sales_system = pd.DataFrame(
    [
        (1001, 'Alice Johnson', 250.00, '2024-01-15'),
        (1002, 'Bob Smith', 175.50, '2024-01-16'),
        (1003, 'Carol White', 320.75, '2024-01-17'),
        (1004, 'David Lee', 95.00, '2024-01-18'),
        (1005, 'Eva Brown', 445.25, '2024-01-19'),
    ],
    columns=['cust_id', 'cust_name', 'purchase_amt', 'trans_date'],
)

# System B: CRM System - CamelCase names for the same customer concepts
crm_system = pd.DataFrame(
    [
        (1001, 'Alice Johnson', 'alice@email.com', '2023-01-01'),
        (1002, 'Bob Smith', 'bob@email.com', '2023-02-15'),
        (1003, 'Carol White', 'carol@email.com', '2023-03-20'),
        (1004, 'David Lee', 'david@email.com', '2023-04-10'),
        (1005, 'Eva Brown', 'eva@email.com', '2023-05-05'),
        (1006, 'Frank Wilson', 'frank@email.com', '2023-06-01'),
    ],
    columns=['CustomerID', 'FullName', 'Email', 'CustomerSince'],
)

print("PROBLEM: Different column names for same data")
print("=" * 50)
print("\nSales System columns:", sales_system.columns.tolist())
print("CRM System columns:", crm_system.columns.tolist())
for label, frame in (("Sales System", sales_system), ("CRM System", crm_system)):
    print(f"\n{label} sample:")
    print(frame.head(3))

### Example 2: Solving Column Name Mismatches
Create a mapping dictionary to standardize column names before merging.

In [None]:
# Map each system's column names onto one shared vocabulary.
sales_rename = dict(
    cust_id='customer_id',
    cust_name='customer_name',
    purchase_amt='amount',
    trans_date='date',
)

crm_rename = dict(
    CustomerID='customer_id',
    FullName='customer_name',
    Email='email',
    CustomerSince='join_date',
)

# Standardize column names (rename returns new frames; the originals are untouched)
sales_clean, crm_clean = (
    frame.rename(columns=mapping)
    for frame, mapping in ((sales_system, sales_rename), (crm_system, crm_rename))
)

print("SOLUTION: Standardized column names")
print("=" * 50)
print("\nStandardized Sales columns:", sales_clean.columns.tolist())
print("Standardized CRM columns:", crm_clean.columns.tolist())

# With a shared key name the merge is straightforward; a left join keeps every sale.
integrated_data = sales_clean.merge(
    crm_clean[['customer_id', 'email', 'join_date']],
    on='customer_id',
    how='left',
)

print("\nIntegrated Data:")
print(integrated_data)

### Example 3: Different Data Formats
The same information can be stored in completely different formats across systems.

In [None]:
# System A: dates as ISO strings, amounts as formatted currency text
system_a = pd.DataFrame(dict(
    order_id=list(range(1, 6)),
    date=['2024-01-15', '2024-01-16', '2024-01-17', '2024-01-18', '2024-01-19'],
    amount=['$1,234.56', '$987.65', '$2,345.67', '$456.78', '$3,456.78'],
))

# System B: dates as US-style MM/DD/YYYY strings, amounts already numeric
system_b = pd.DataFrame(dict(
    order_id=list(range(6, 11)),
    date=['01/20/2024', '01/21/2024', '01/22/2024', '01/23/2024', '01/24/2024'],
    amount=[1567.89, 2345.67, 876.54, 3456.78, 1234.56],
))

print("PROBLEM: Different data formats")
print("=" * 50)
for label, frame in (("System A", system_a), ("System B", system_b)):
    print(f"\n{label}:")
    print(frame)
    print(f"Date type: {frame['date'].dtype}")
    print(f"Amount type: {frame['amount'].dtype}")

### Example 4: Standardizing Data Formats
Convert all data to consistent formats before combining.

In [None]:
# Function to clean currency strings
def clean_currency(value):
    """Convert a formatted currency string such as '$1,234.56' to a float.

    Any non-string input (e.g. an amount that is already numeric) is
    returned unchanged.
    """
    if not isinstance(value, str):
        return value
    digits_only = value.replace('$', '').replace(',', '')
    return float(digits_only)

# Standardize System A (ISO 8601, YYYY-MM-DD). An explicit format makes parsing
# deterministic and raises on a malformed row instead of guessing.
system_a_clean = system_a.copy()
system_a_clean['date'] = pd.to_datetime(system_a_clean['date'], format='%Y-%m-%d')
system_a_clean['amount'] = system_a_clean['amount'].apply(clean_currency)

# Standardize System B (US style, MM/DD/YYYY). Without an explicit format an
# ambiguous value such as '03/04/2024' could be read as March 4 or April 3,
# so state the source system's convention instead of letting pandas infer it.
system_b_clean = system_b.copy()
system_b_clean['date'] = pd.to_datetime(system_b_clean['date'], format='%m/%d/%Y')

print("SOLUTION: Standardized formats")
print("=" * 50)

# Combine the cleaned data; reset the index so row labels follow date order.
combined_orders = pd.concat([system_a_clean, system_b_clean], ignore_index=True)
combined_orders = combined_orders.sort_values('date').reset_index(drop=True)

# Sanity checks: both sources now share dtypes and no rows were lost.
assert pd.api.types.is_datetime64_any_dtype(combined_orders['date'])
assert pd.api.types.is_float_dtype(combined_orders['amount'])
assert len(combined_orders) == len(system_a) + len(system_b)

print("\nCombined and sorted data:")
print(combined_orders)
print(f"\nAll dates are now: {combined_orders['date'].dtype}")
print(f"All amounts are now: {combined_orders['amount'].dtype}")
print(f"\nTotal revenue: ${combined_orders['amount'].sum():,.2f}")

---

## Part 2: Key Matching Problems
**When IDs Don't Match Perfectly**

### Example 5: Inconsistent ID Formats
Different systems might store IDs with different prefixes, padding, or formats.

In [None]:
# Database 1: product IDs carry a 'PROD-' prefix and 3-digit zero padding
inventory = pd.DataFrame({
    'product_id': [f'PROD-{n:03d}' for n in range(1, 6)],
    'product_name': ['Laptop', 'Mouse', 'Keyboard', 'Monitor', 'Headphones'],
    'stock_qty': [15, 50, 30, 8, 25],
})

# Database 2: bare numeric codes, inconsistently padded ('1' vs '01')
sales = pd.DataFrame({
    'prod_code': ['1', '2', '3', '4', '5', '01', '02'],
    'units_sold': [3, 10, 5, 2, 8, 2, 5],
    'sale_date': pd.date_range(start='2024-01-15', periods=7),
})

print("PROBLEM: IDs don't match between systems")
print("=" * 50)
for label, id_values in (("Inventory System IDs:", inventory['product_id']),
                         ("Sales System IDs:", sales['prod_code'])):
    print("\n" + label, id_values.tolist())
print("\nDirect merge would fail!")

### Example 6: Creating a Mapping Table
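Sometimes you need to map between different ID systems. Here we derive a shared numeric key from each format. When no rule can convert one ID into the other, an explicit lookup (mapping) table from old ID to new ID plays the same role.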

In [None]:
# Create ID mapping functions
def standardize_inventory_id(id_str):
    """Return the numeric part of an inventory product ID as an int.

    'PROD-001' -> 1. Only the segment after the LAST hyphen is parsed, so a
    multi-part prefix such as 'PROD-EU-007' also works, and an ID with no
    prefix at all ('001') is parsed directly. Surrounding whitespace is ignored.

    Raises:
        ValueError: if the trailing segment is not an integer.
    """
    # rsplit(maxsplit=1) keeps the numeric suffix even when the prefix itself
    # contains hyphens; split('-')[1] would grab the wrong segment there and
    # raise IndexError for an unprefixed ID.
    return int(str(id_str).strip().rsplit('-', 1)[-1])

def standardize_sales_id(id_str):
    """Parse a sales product code into an int, so '01' and '1' both become 1."""
    numeric_id = int(id_str)
    return numeric_id

# Add standardized IDs (derived columns; re-running this cell just recomputes them)
inventory['std_id'] = inventory['product_id'].apply(standardize_inventory_id)
sales['std_id'] = sales['prod_code'].apply(standardize_sales_id)

print("SOLUTION: Standardized IDs for matching")
print("=" * 50)
print("\nInventory with standard IDs:")
print(inventory[['product_id', 'std_id', 'product_name']])

print("\nSales with standard IDs:")
print(sales[['prod_code', 'std_id', 'units_sold']])

# Now merge on standardized IDs. validate='many_to_one' raises a MergeError if
# inventory ever contains duplicate std_ids, which would otherwise silently
# duplicate sales rows in the result.
integrated = pd.merge(sales, inventory[['std_id', 'product_name', 'stock_qty']],
                      on='std_id', how='left', validate='many_to_one')

# A left join keeps sales whose ID matched no product (product_name is NaN);
# report them instead of letting them disappear into later aggregates.
unmatched_sales = integrated['product_name'].isna().sum()
print(f"\nSales rows with no matching product: {unmatched_sales}")

print("\nIntegrated data:")
print(integrated[['sale_date', 'product_name', 'units_sold', 'stock_qty']].head())

---

## Part 3: Handling Missing Data During Integration
**What to Do When Data Doesn't Exist in All Sources**

### Example 7: Different Types of Joins
Understanding when to use inner, left, right, and outer joins is crucial for integration.

In [None]:
# Customer database
customers = pd.DataFrame({
    'customer_id': [1, 2, 3, 4, 5],
    'name': ['Alice', 'Bob', 'Charlie', 'Diana', 'Eve'],
    'city': ['New York', 'Los Angeles', 'Chicago', 'Houston', 'Phoenix'],
})

# Orders database: customers 3-5 have no orders, and customer 6 is unknown
orders = pd.DataFrame({
    'order_id': [101, 102, 103, 104],
    'customer_id': [1, 2, 1, 6],  # Customer 6 doesn't exist!
    'amount': [100, 200, 150, 300],
})

print("Original Data:")
print("=" * 50)
print("\nCustomers:")
print(customers)
print("\nOrders:")
print(orders)

# Demonstrate different join types
print("\n" + "=" * 50)
print("JOIN TYPE COMPARISON")
print("=" * 50)

# inner: only matching keys | left: every customer | right: every order | outer: everything
join_descriptions = [
    ('inner', 'INNER JOIN (only customers with orders)'),
    ('left', 'LEFT JOIN (all customers)'),
    ('right', 'RIGHT JOIN (all orders)'),
    ('outer', 'OUTER JOIN (all records)'),
]
join_results = {}
for how, label in join_descriptions:
    joined = pd.merge(customers, orders, on='customer_id', how=how)
    join_results[how] = joined
    print(f"\n{label}: {len(joined)} rows")
    print(joined)

# Keep the individual result names available to later cells
inner, left, right, outer = (join_results[key] for key in ('inner', 'left', 'right', 'outer'))

### Example 8: Handling Missing Values After Joins
After joining, you often need to deal with NaN values that appear.

In [None]:
# Re-create the left join: every customer, with order columns NaN where none exist
merged_data = customers.merge(orders, on='customer_id', how='left')

print("Data after LEFT JOIN:")
print(merged_data)
print("\nMissing values:")
print(merged_data.isna().sum())

section_rule = "\n" + "=" * 50

# Strategy 1: replace missing order fields with 0 (order_id back to int)
merged_clean1 = merged_data.assign(
    order_id=merged_data['order_id'].fillna(0).astype(int),
    amount=merged_data['amount'].fillna(0),
)

print(section_rule)
print("Strategy 1: Fill with defaults (0)")
print(merged_clean1)

# Strategy 2: keep the NaNs but add a boolean flag for "has an order"
merged_clean2 = merged_data.assign(has_order=merged_data['order_id'].notna())

print(section_rule)
print("Strategy 2: Add indicator column")
print(merged_clean2)

# Strategy 3: one row per customer; 'count' skips NaN, so order-less customers get 0
customer_summary = (
    merged_data
    .groupby(['customer_id', 'name', 'city'])
    .agg(order_count=('order_id', 'count'), total_spent=('amount', 'sum'))
    .reset_index()
)

print(section_rule)
print("Strategy 3: Aggregate to customer level")
print(customer_summary)

---

## Part 4: Time-Based Integration
**Aligning Data from Different Time Periods**

### Example 9: Different Granularities
One system might record data daily, another weekly, and a third only as monthly summaries.

In [None]:
# Seeded generator: the simulated revenue, and everything derived from it, is
# identical on every Restart & Run All.
rng = np.random.default_rng(42)

# Daily sales data (one row per day of January 2024)
january_days = pd.date_range('2024-01-01', '2024-01-31')
daily_sales = pd.DataFrame({
    'date': january_days,
    'daily_revenue': rng.uniform(1000, 3000, len(january_days)).round(2)
})

# Weekly marketing spend. 2024-01-01 is a Monday, so 'W-MON' yields the five
# Mondays of January 2024 (one spend value per week).
weekly_marketing = pd.DataFrame({
    'week_start': pd.date_range('2024-01-01', '2024-01-29', freq='W-MON'),
    'marketing_spend': [5000, 4500, 5500, 4800, 5200]
})

# Monthly targets, keyed by 'YYYY-MM' strings
monthly_targets = pd.DataFrame({
    'month': ['2024-01'],
    'target_revenue': [60000],
    'target_customers': [500]
})

print("PROBLEM: Different time granularities")
print("=" * 50)
print(f"\nDaily data: {len(daily_sales)} records")
print(daily_sales.head())
print(f"\nWeekly data: {len(weekly_marketing)} records")
print(weekly_marketing)
print(f"\nMonthly data: {len(monthly_targets)} records")
print(monthly_targets)

### Example 10: Aligning Different Time Granularities
Convert everything to a common granularity or use appropriate aggregation.

In [None]:
# Add week and month to daily data.
# 'W-SUN' periods run Monday-Sunday, so .start_time is the Monday that opens each
# week - the same anchor as the 'W-MON' week_start dates in weekly_marketing.
daily_sales['week'] = daily_sales['date'].dt.to_period('W-SUN').dt.start_time
daily_sales['month'] = daily_sales['date'].dt.to_period('M')

# Aggregate daily to weekly, keeping how many days back each total. January 2024
# ends on a Wednesday, so the final week covers only 3 days; its revenue (and
# ROI) is not comparable to a full 7-day week.
weekly_sales = (
    daily_sales
    .groupby('week')
    .agg(weekly_revenue=('daily_revenue', 'sum'), days_observed=('date', 'count'))
    .reset_index()
    .rename(columns={'week': 'week_start'})
)

# Merge weekly data (outer keeps weeks present in only one source)
weekly_combined = pd.merge(weekly_sales, weekly_marketing, on='week_start', how='outer')

print("SOLUTION: Aligned weekly data")
print("=" * 50)
print(weekly_combined)

# Calculate weekly ROI
weekly_combined['roi'] = (weekly_combined['weekly_revenue'] / weekly_combined['marketing_spend']).round(2)
print("\nWeekly ROI Analysis:")
print(weekly_combined[['week_start', 'days_observed', 'roi']])

partial_weeks = weekly_combined.loc[weekly_combined['days_observed'] < 7, 'week_start']
if not partial_weeks.empty:
    print(f"Note: week(s) starting {partial_weeks.dt.strftime('%Y-%m-%d').tolist()} "
          "have fewer than 7 days of revenue; their ROI is not comparable to full weeks.")

# Monthly summary
monthly_actual = (
    daily_sales.groupby('month')['daily_revenue'].sum()
    .reset_index()
    .rename(columns={'daily_revenue': 'actual_revenue'})
)
# Period('2024-01', 'M') -> '2024-01', matching the target table's keys
monthly_actual['month'] = monthly_actual['month'].astype(str)

# Left join keeps every target row: a month-label mismatch shows up as NaN
# actual_revenue instead of silently producing an empty table.
monthly_comparison = pd.merge(monthly_targets, monthly_actual, on='month', how='left')
monthly_comparison['pct_of_target'] = (monthly_comparison['actual_revenue'] /
                                       monthly_comparison['target_revenue'] * 100).round(1)

print("\n" + "=" * 50)
print("Monthly Performance vs Target:")
print(monthly_comparison)

---

## Part 5: Real-World Integration Scenarios
**Complete Examples from Different Industries**

### Example 11: E-Commerce Data Integration
Combining website analytics, order system, and inventory to get a complete picture.

In [None]:
# Website Analytics
web_analytics = pd.DataFrame({
    'date': pd.date_range('2024-01-15', periods=7),
    'sessions': [1200, 1350, 1100, 1500, 1650, 1400, 1250],
    'unique_visitors': [800, 900, 750, 1000, 1100, 950, 850],
    'page_views': [3600, 4050, 3300, 4500, 4950, 4200, 3750],
    'bounce_rate': [0.35, 0.32, 0.38, 0.30, 0.28, 0.33, 0.36]
})

# Order System. Named `order_system` (not `orders`) so this cell does not
# overwrite the customer-level `orders` frame that Examples 7-8 join on
# 'customer_id'; re-running those cells afterwards would otherwise fail.
order_system = pd.DataFrame({
    'date': pd.date_range('2024-01-15', periods=7),
    'orders_placed': [45, 52, 38, 61, 67, 55, 48],
    'revenue': [4500, 5200, 3800, 6100, 6700, 5500, 4800],
    'avg_order_value': [100, 100, 100, 100, 100, 100, 100],
    'cancelled_orders': [2, 3, 1, 4, 3, 2, 2]
})

# Inventory System. Named `inventory_system` for the same reason: Examples 5-6
# use `inventory` for the product catalogue.
inventory_system = pd.DataFrame({
    'date': pd.date_range('2024-01-15', periods=7),
    'products_in_stock': [150, 145, 142, 138, 131, 126, 122],
    'out_of_stock_items': [5, 6, 7, 8, 10, 11, 12],
    'restock_orders': [0, 0, 1, 0, 0, 1, 0]
})

print("E-COMMERCE DATA INTEGRATION")
print("=" * 60)

# Integrate all three sources (inner joins on date)
ecommerce_dashboard = web_analytics.merge(order_system, on='date').merge(inventory_system, on='date')

# Calculate key metrics. Zero denominators become NaN, so a day with no visitors
# or no orders yields a missing rate instead of inf (which would poison the means).
visitors_denom = ecommerce_dashboard['unique_visitors'].replace(0, np.nan)
orders_denom = ecommerce_dashboard['orders_placed'].replace(0, np.nan)

ecommerce_dashboard['conversion_rate'] = (ecommerce_dashboard['orders_placed'] /
                                          visitors_denom * 100).round(2)
ecommerce_dashboard['fulfillment_rate'] = ((ecommerce_dashboard['orders_placed'] -
                                           ecommerce_dashboard['cancelled_orders']) /
                                          orders_denom * 100).round(2)
ecommerce_dashboard['stock_coverage'] = (ecommerce_dashboard['products_in_stock'] /
                                         orders_denom).round(1)

# Display integrated dashboard
print("\nIntegrated E-Commerce Dashboard:")
display_cols = ['date', 'unique_visitors', 'orders_placed', 'revenue',
                'conversion_rate', 'fulfillment_rate', 'stock_coverage']
print(ecommerce_dashboard[display_cols])

# Summary statistics
print("\n" + "=" * 60)
print("WEEKLY SUMMARY:")
print(f"Total Visitors: {ecommerce_dashboard['unique_visitors'].sum():,}")
print(f"Total Orders: {ecommerce_dashboard['orders_placed'].sum()}")
print(f"Total Revenue: ${ecommerce_dashboard['revenue'].sum():,.2f}")
print(f"Avg Conversion Rate: {ecommerce_dashboard['conversion_rate'].mean():.2f}%")
print(f"Avg Fulfillment Rate: {ecommerce_dashboard['fulfillment_rate'].mean():.2f}%")

### Example 12: Healthcare Data Integration
Combining patient records, lab results, and appointment data while maintaining privacy.

In [None]:
# Patient Records (de-identified)
patients = pd.DataFrame({
    'patient_id': ['P001', 'P002', 'P003', 'P004', 'P005'],
    'age_group': ['30-40', '50-60', '40-50', '60-70', '20-30'],
    'gender': ['F', 'M', 'F', 'M', 'F'],
    'risk_category': ['Low', 'High', 'Medium', 'High', 'Low'],
    'enrollment_date': pd.to_datetime(['2023-01-15', '2023-02-01', '2023-01-20', '2023-03-01', '2023-02-15'])
})

# Lab Results
lab_results = pd.DataFrame({
    'patient_id': ['P001', 'P002', 'P003', 'P004', 'P005', 'P001', 'P002'],
    'test_date': pd.to_datetime(['2024-01-10', '2024-01-11', '2024-01-12', '2024-01-13', '2024-01-14',
                                 '2024-01-15', '2024-01-16']),
    'test_type': ['Blood', 'Blood', 'Urine', 'Blood', 'Blood', 'Urine', 'Blood'],
    'result_flag': ['Normal', 'Abnormal', 'Normal', 'Abnormal', 'Normal', 'Normal', 'Critical']
})

# Appointments
appointments = pd.DataFrame({
    'patient_id': ['P001', 'P002', 'P003', 'P004', 'P005', 'P001'],
    'appointment_date': pd.to_datetime(['2024-01-20', '2024-01-21', '2024-01-22', '2024-01-23',
                                        '2024-01-24', '2024-01-25']),
    'appointment_type': ['Follow-up', 'Emergency', 'Routine', 'Follow-up', 'Routine', 'Routine'],
    'attended': ['Yes', 'Yes', 'Yes', 'No', 'Yes', 'Yes']
})

print("HEALTHCARE DATA INTEGRATION")
print("=" * 60)
print("Note: All patient data is de-identified")
print("\nData Sources:")
print(f"- Patient Records: {len(patients)} patients")
print(f"- Lab Results: {len(lab_results)} tests")
print(f"- Appointments: {len(appointments)} appointments")

# Create patient summary
patient_summary = patients.copy()

# Lab result summary per patient: number of tests, and number of results whose
# flag is anything other than 'Normal' (so 'Abnormal' and 'Critical' both count).
lab_summary = (
    lab_results.groupby('patient_id')
    .agg(total_tests=('test_date', 'count'),
         abnormal_results=('result_flag', lambda flags: (flags != 'Normal').sum()))
    .reset_index()
)

# Appointment summary per patient: number booked, and number with attended == 'No'
appt_summary = (
    appointments.groupby('patient_id')
    .agg(total_appointments=('appointment_date', 'count'),
         missed_appointments=('attended', lambda attended: (attended == 'No').sum()))
    .reset_index()
)

# Integrate all data (left joins keep every enrolled patient)
integrated_patient_data = (
    patient_summary
    .merge(lab_summary, on='patient_id', how='left')
    .merge(appt_summary, on='patient_id', how='left')
)

# A patient absent from lab_results / appointments genuinely has 0 tests /
# appointments, so fill ONLY those count columns. A blanket fillna(0) would also
# overwrite any missing demographic or date field with 0. Cast back to int because
# a left-join NaN turns count columns into floats.
count_columns = ['total_tests', 'abnormal_results', 'total_appointments', 'missed_appointments']
integrated_patient_data[count_columns] = integrated_patient_data[count_columns].fillna(0).astype(int)

# Calculate risk scores
def calculate_risk_score(row):
    """Score one patient row by combining baseline risk with recent history.

    Baseline points come from ``risk_category``: 'High' = 3, 'Medium' = 2,
    any other value = 1. Each counted abnormal lab result adds 2 points and
    each missed appointment adds 1.5 points.
    """
    category_points = {'High': 3, 'Medium': 2}
    baseline = category_points.get(row['risk_category'], 1)
    return baseline + 2 * row['abnormal_results'] + 1.5 * row['missed_appointments']

# Score every patient row-wise with the rule defined above
integrated_patient_data['risk_score'] = integrated_patient_data.apply(calculate_risk_score, axis=1)

print("\nIntegrated Patient Dashboard:")
print(integrated_patient_data)

# Priority patients: anyone scoring strictly above 5
priority_mask = integrated_patient_data['risk_score'].gt(5)
high_priority = integrated_patient_data.loc[priority_mask]
priority_columns = ['patient_id', 'risk_category', 'abnormal_results', 'missed_appointments', 'risk_score']

print("\n" + "=" * 60)
print("HIGH PRIORITY PATIENTS (Risk Score > 5):")
print(high_priority[priority_columns])

### Example 13: Financial Data Integration
Combining transaction data, account information, and market data for analysis.

In [None]:
# Account Information
accounts = pd.DataFrame({
    'account_id': ['ACC001', 'ACC002', 'ACC003', 'ACC004', 'ACC005'],
    'account_type': ['Checking', 'Savings', 'Investment', 'Checking', 'Savings'],
    'opening_balance': [5000, 10000, 25000, 3000, 15000],
    'account_status': ['Active', 'Active', 'Active', 'Dormant', 'Active']
})

# Daily Transactions
transactions = pd.DataFrame({
    'transaction_id': range(1001, 1016),
    'account_id': ['ACC001', 'ACC002', 'ACC001', 'ACC003', 'ACC005',
                  'ACC001', 'ACC002', 'ACC003', 'ACC001', 'ACC005',
                  'ACC002', 'ACC003', 'ACC001', 'ACC003', 'ACC005'],
    'transaction_date': pd.date_range('2024-01-15', periods=15),
    'type': ['Deposit', 'Withdrawal', 'Withdrawal', 'Deposit', 'Deposit',
            'Withdrawal', 'Deposit', 'Withdrawal', 'Deposit', 'Withdrawal',
            'Withdrawal', 'Deposit', 'Withdrawal', 'Deposit', 'Deposit'],
    'amount': [500, 200, 100, 5000, 1000,
              300, 150, 2000, 400, 500,
              100, 3000, 250, 1500, 2000]
})

# Market Rates (daily). NOTE: loaded for context only - not joined below.
market_rates = pd.DataFrame({
    'date': pd.date_range('2024-01-15', periods=15),
    'savings_rate': [0.04, 0.04, 0.041, 0.041, 0.042, 0.042, 0.042, 0.043, 0.043, 0.043,
                    0.044, 0.044, 0.044, 0.045, 0.045],
    'investment_return': [0.002, -0.001, 0.003, 0.004, -0.002, 0.001, 0.003, -0.001, 0.002, 0.001,
                         0.003, 0.002, -0.001, 0.004, 0.002]
})

# Accounts with more than this many transactions are flagged as high activity.
HIGH_ACTIVITY_THRESHOLD = 5

print("FINANCIAL DATA INTEGRATION")
print("=" * 60)

# Total deposits and withdrawals per account per day
daily_activity = transactions.groupby(['account_id', 'transaction_date', 'type'])['amount'].sum().reset_index()
daily_pivot = (
    daily_activity
    .pivot_table(index=['account_id', 'transaction_date'], columns='type',
                 values='amount', aggfunc='sum', fill_value=0)
    # Guarantee both columns exist even if the data contains only one transaction type
    .reindex(columns=['Deposit', 'Withdrawal'], fill_value=0)
    .reset_index()
)

# Calculate net daily change
daily_pivot['net_change'] = daily_pivot['Deposit'] - daily_pivot['Withdrawal']

# Merge with account information (left join keeps accounts with no activity)
account_activity = pd.merge(accounts, daily_pivot, on='account_id', how='left')

# An account with no transactions (ACC004 here) comes back with NaN activity.
# Treat that as zero movement so its running balance equals its opening balance
# instead of NaN.
activity_columns = ['Deposit', 'Withdrawal', 'net_change']
account_activity[activity_columns] = account_activity[activity_columns].fillna(0)

# Calculate running balance: opening balance plus cumulative net change per account
account_activity = account_activity.sort_values(['account_id', 'transaction_date'])
account_activity['running_balance'] = (
    account_activity.groupby('account_id')['net_change'].cumsum()
    + account_activity['opening_balance']
)

print("\nAccount Activity with Running Balances:")
print(account_activity[['account_id', 'transaction_date', 'Deposit', 'Withdrawal',
                        'net_change', 'running_balance']].head(10))

# Account Summary. total_volume is GROSS volume (deposits + withdrawals).
account_summary = (
    transactions.groupby('account_id')
    .agg(total_transactions=('transaction_id', 'count'),
         total_volume=('amount', 'sum'),
         avg_transaction=('amount', 'mean'))
    .reset_index()
)

final_summary = pd.merge(accounts, account_summary, on='account_id', how='left')
# Accounts without transactions: zero activity; keep the count as an integer.
final_summary = final_summary.fillna({'total_transactions': 0, 'total_volume': 0, 'avg_transaction': 0})
final_summary['total_transactions'] = final_summary['total_transactions'].astype(int)

print("\n" + "=" * 60)
print("ACCOUNT SUMMARY:")
print(final_summary)

# Risk Assessment
print("\n" + "=" * 60)
print("RISK ASSESSMENT:")
dormant_accounts = final_summary[final_summary['account_status'] == 'Dormant']
if not dormant_accounts.empty:
    print(f"Dormant accounts found: {dormant_accounts['account_id'].tolist()}")

high_activity = final_summary[final_summary['total_transactions'] > HIGH_ACTIVITY_THRESHOLD]
print(f"High activity accounts (>{HIGH_ACTIVITY_THRESHOLD} transactions): {high_activity['account_id'].tolist()}")

---

## Part 6: Data Lineage and Documentation
**Tracking Where Data Comes From**

### Example 14: Creating a Data Lineage Tracker
Document transformations and source systems for audit and debugging purposes.

In [None]:
def track_data_lineage(df, source_system, transformations, version='1.0'):
    """
    Return a copy of ``df`` with metadata columns that record its lineage.

    Parameters
    ----------
    df : pandas.DataFrame
        Data to annotate. It is copied; the caller's frame is not modified.
    source_system : str
        Name of the system the data was loaded from.
    transformations : iterable of str, or a single str
        Steps applied to the data, stored as one comma-separated string.
        A single string is treated as ONE step (not split into characters).
    version : str, default '1.0'
        Value written to the ``_version`` column.

    Returns
    -------
    pandas.DataFrame
        Copy of ``df`` plus ``_source_system``, ``_load_timestamp``,
        ``_transformations`` and ``_version`` columns (same value on every row).
    """
    if isinstance(transformations, str):
        # ', '.join('abc') would produce 'a, b, c'
        transformations = [transformations]

    df_tracked = df.copy()

    # Add metadata columns (scalars broadcast to every row)
    df_tracked['_source_system'] = source_system
    df_tracked['_load_timestamp'] = pd.Timestamp.now()
    df_tracked['_transformations'] = ', '.join(str(step) for step in transformations)
    df_tracked['_version'] = version

    return df_tracked

# Example: Track lineage for integrated data
raw_sales = pd.DataFrame({
    'product': ['A', 'B', 'C'],
    'sales': ['100', '200', '150']  # Note: stored as strings
})

# Each step pairs a human-readable description with the transformation itself,
# so the lineage log can never drift out of sync with what was actually applied.
lineage_steps = [
    ('Convert sales to numeric', lambda frame: frame.assign(sales=pd.to_numeric(frame['sales']))),
    ('Add 8% tax calculation', lambda frame: frame.assign(sales_with_tax=frame['sales'] * 1.08)),
    ('Add category field', lambda frame: frame.assign(category='Electronics')),
]

processed_sales = raw_sales.copy()
transformations_applied = []
for description, step in lineage_steps:
    processed_sales = step(processed_sales)
    transformations_applied.append(description)

# Add lineage tracking
final_data = track_data_lineage(processed_sales, 'SALES_SYSTEM_V2', transformations_applied)

print("DATA WITH LINEAGE TRACKING:")
print("=" * 60)
print(final_data)

print("\nLineage Metadata:")
print("-" * 40)
metadata_columns = [name for name in final_data.columns if name.startswith('_')]
for col in metadata_columns:
    print(f"{col}: {final_data[col].iloc[0]}")

### Example 15: Integration Pipeline Summary
Create a summary report of all data sources and integration steps.

In [None]:
def create_integration_report(integration_steps):
    """
    Build a tabular report of an integration pipeline.

    ``integration_steps`` is passed straight to the DataFrame constructor;
    with a list of dicts (one per step), each dict becomes a row and its keys
    become the columns.
    """
    report_frame = pd.DataFrame(data=integration_steps)
    return report_frame

# Document a complete integration pipeline (illustrative figures for the lesson)
integration_pipeline = [
    {
        'step': 1,
        'name': 'Extract Sales Data',
        'source': 'SQL Database',
        'records_in': 10000,
        'records_out': 10000,
        'duration_sec': 2.3,
        'status': 'Success'
    },
    {
        'step': 2,
        'name': 'Extract Customer Data',
        'source': 'CRM API',
        'records_in': 5000,
        'records_out': 5000,
        'duration_sec': 5.1,
        'status': 'Success'
    },
    {
        'step': 3,
        'name': 'Clean Sales Data',
        'source': 'Memory',
        'records_in': 10000,
        'records_out': 9850,
        'duration_sec': 1.2,
        'status': 'Success (150 records removed)'
    },
    {
        'step': 4,
        'name': 'Standardize IDs',
        'source': 'Memory',
        'records_in': 14850,
        'records_out': 14850,
        'duration_sec': 0.8,
        'status': 'Success'
    },
    {
        'step': 5,
        'name': 'Merge Datasets',
        'source': 'Memory',
        'records_in': 14850,
        'records_out': 9500,
        'duration_sec': 1.5,
        'status': 'Success (inner join)'
    },
    {
        'step': 6,
        'name': 'Calculate Metrics',
        'source': 'Memory',
        'records_in': 9500,
        'records_out': 9500,
        'duration_sec': 0.5,
        'status': 'Success'
    },
    {
        'step': 7,
        'name': 'Export Results',
        'source': 'Memory',
        'records_in': 9500,
        'records_out': 9500,
        'duration_sec': 3.2,
        'status': 'Success (saved to warehouse)'
    }
]

pipeline_report = create_integration_report(integration_pipeline)

print("INTEGRATION PIPELINE REPORT")
print("=" * 70)
print(f"Pipeline Execution: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
print("-" * 70)
print(pipeline_report.to_string(index=False))

# "Records Processed" compares the first step's input with the final step's
# output; the customer extract (step 2) joins mid-pipeline and is not counted.
first_step_in = pipeline_report['records_in'].iloc[0]
last_step_out = pipeline_report['records_out'].iloc[-1]

# A step counts as successful only if its status STARTS with 'Success';
# a substring test would also accept e.g. 'Partial Success'.
all_steps_ok = pipeline_report['status'].str.startswith('Success').all()

# Summary Statistics
print("\n" + "=" * 70)
print("PIPELINE SUMMARY:")
print("-" * 40)
print(f"Total Steps: {len(pipeline_report)}")
print(f"Total Duration: {pipeline_report['duration_sec'].sum():.1f} seconds")
print(f"Records Processed: {first_step_in:,} → {last_step_out:,}")
print(f"Data Reduction: {(1 - last_step_out / first_step_in) * 100:.1f}%")
print(f"All Steps: {'✓ Success' if all_steps_ok else '✗ Errors Detected'}")

# Data quality metrics.
# NOTE: these values are hard-coded examples for display; they are NOT computed
# from pipeline_report. In a real pipeline derive them from the data.
print("\n" + "=" * 70)
print("DATA QUALITY METRICS:")
print("-" * 40)
quality_metrics = {
    'Completeness': '98.5%',
    'Accuracy': '99.2%',
    'Consistency': '97.8%',
    'Timeliness': 'Real-time (< 15 sec delay)',
    'Validity': '99.5%',
    'Uniqueness': '100% (no duplicates)'
}

for metric, value in quality_metrics.items():
    print(f"{metric:15} {value}")