# Python Technical Interview - AI Agent Developer Position

## Instructions
This notebook contains 10 questions designed to test your Python skills and ability to work with AI-generated code. Each question has:
- **Problem Description** - What you need to accomplish
- **Code Cell** - Where you write your solution
- **Test Cell** - Automated tests to verify your solution

**Guidelines:**
- Read each question carefully
- You can use whatever libraries or packages
- Some questions provide starter code, others start from scratch
- Focus on writing clean, readable, and robust code
- code should be able to run after clearing all outputs
- All test cells should pass when you're done

## Question 1: Debug AI-Generated Code (Lists & Logic)

**Scenario:** An AI generated this code to filter products by price range, but it has several bugs. Fix the code so it works correctly.

**Requirements:**
- Filter products where price is between min_price and max_price (inclusive)
- Handle edge cases gracefully
- Maintain the original function signature

In [1]:
def filter_products_by_price(products, min_price, max_price):
    """
    Filter products by price range.
    
    Args:
        products: List of dicts with 'name' and 'price' keys
        min_price: Minimum price (inclusive)
        max_price: Maximum price (inclusive)
    
    Returns:
        List of products within price range
    """
    # AI-generated buggy code below - FIX IT
    filtered = []
    
    # Validate that min_price and max_price are numbers
    if not isinstance(min_price, (int, float)) or not isinstance(max_price, (int, float)):
        raise ValueError("min_price and max_price must be numbers")
    
    for product in products:
        # Check if product has a price and it's a number
        if 'price' not in product or not isinstance(product['price'], (int, float)):
            continue  # Skip products with missing or non-numeric prices
        
        # Check if product price is between min_price and max_price (inclusive)
        if product['price'] >= min_price and product['price'] <= max_price:
            filtered.append(product)
    return filtered

# Test your solution here
products = [
    {'name': 'Laptop', 'price': 1000},
    {'name': 'Mouse', 'price': 25},
    {'name': 'Keyboard', 'price': 75},
    {'name': 'Monitor', 'price': 300},
    {'name': 'Adapter', 'price': 'fifteen'},  # Invalid price
    {'name': 'Cable'},  # Missing price
]

result = filter_products_by_price(products, 25, 300)
print("Filtered products:", result)

Filtered products: [{'name': 'Mouse', 'price': 25}, {'name': 'Keyboard', 'price': 75}, {'name': 'Monitor', 'price': 300}]


In [2]:
# Test Cell
def test_question_1():
    products = [
        {'name': 'Laptop', 'price': 1000},
        {'name': 'Mouse', 'price': 25},
        {'name': 'Keyboard', 'price': 75},
        {'name': 'Monitor', 'price': 300}
    ]
    
    # Test inclusive bounds
    result = filter_products_by_price(products, 25, 300)
    expected_names = ['Mouse', 'Keyboard', 'Monitor']
    actual_names = [p['name'] for p in result]
    assert set(actual_names) == set(expected_names), f"Expected {expected_names}, got {actual_names}"
    
    # Test edge case - empty list
    assert filter_products_by_price([], 0, 100) == []
    
    # Test no matches
    assert filter_products_by_price(products, 2000, 3000) == []
    
    print("✓ Question 1 tests passed!")

test_question_1()

✓ Question 1 tests passed!


## Question 2: Fix API Integration (Error Handling)

**Scenario:** This AI-generated code fetches user data from an API but lacks proper error handling. Add robust error handling and improve the code.

**Requirements:**
- Handle network timeouts
- Handle HTTP errors (4xx, 5xx)
- Handle JSON parsing errors
- Return None on any error, don't let exceptions bubble up
- Add appropriate logging

In [3]:
import requests
import json
import logging

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def get_user_data(user_id):
    """
    Fetch user data from API with proper error handling.
    
    Args:
        user_id: User ID to fetch
        
    Returns:
        dict: User data if successful, None if any error occurs
    """
    # AI-generated code with poor error handling - IMPROVE IT
    url = f"https://jsonplaceholder.typicode.com/users/{user_id}"
    
    try:
        logger.info(f"Fetching user data for user_id: {user_id}")
        
        # Make request with timeout
        response = requests.get(url, timeout=10)
        
        # Check for HTTP errors (4xx, 5xx)
        response.raise_for_status()
        
        # Parse JSON with explicit error handling
        data = response.json()
        
        logger.info(f"Successfully fetched user data for user_id: {user_id}")
        return data
        
    except requests.exceptions.Timeout:
        logger.error(f"Request timeout for user_id: {user_id}")
        return None
        
    except requests.exceptions.ConnectionError:
        logger.error(f"Connection error for user_id: {user_id}")
        return None
        
    except requests.exceptions.HTTPError as e:
        status_code = e.response.status_code if e.response else 'Unknown'
        logger.error(f"HTTP error {status_code} for user_id: {user_id}: {str(e)}")
        return None
        
    except requests.exceptions.RequestException as e:
        logger.error(f"Request exception for user_id: {user_id}: {str(e)}")
        return None
        
    except json.JSONDecodeError as e:
        logger.error(f"JSON parsing error for user_id: {user_id}: {str(e)}")
        return None
        
    except Exception as e:
        logger.error(f"Unexpected error for user_id: {user_id}: {str(e)}")
        return None

# Test your solution here
user_data = get_user_data(1)
print("User data:", user_data)

# Test error cases
print("Testing invalid user:", get_user_data(999))
print("Testing invalid URL scenario with different function...")

INFO:__main__:Fetching user data for user_id: 1
INFO:__main__:Successfully fetched user data for user_id: 1
INFO:__main__:Fetching user data for user_id: 999


User data: {'id': 1, 'name': 'Leanne Graham', 'username': 'Bret', 'email': 'Sincere@april.biz', 'address': {'street': 'Kulas Light', 'suite': 'Apt. 556', 'city': 'Gwenborough', 'zipcode': '92998-3874', 'geo': {'lat': '-37.3159', 'lng': '81.1496'}}, 'phone': '1-770-736-8031 x56442', 'website': 'hildegard.org', 'company': {'name': 'Romaguera-Crona', 'catchPhrase': 'Multi-layered client-server neural-net', 'bs': 'harness real-time e-markets'}}


ERROR:__main__:HTTP error Unknown for user_id: 999: 404 Client Error: Not Found for url: https://jsonplaceholder.typicode.com/users/999


Testing invalid user: None
Testing invalid URL scenario with different function...


In [4]:
# Test Cell
import unittest.mock as mock

def test_question_2():
    # Test successful request
    user_data = get_user_data(1)
    assert user_data is not None
    assert 'name' in user_data
    
    # Test invalid user ID
    user_data = get_user_data(999999)
    assert user_data is None
    
    # Test with mock to simulate network error
    with mock.patch('requests.get') as mock_get:
        mock_get.side_effect = requests.exceptions.RequestException("Network error")
        result = get_user_data(1)
        assert result is None
    
    # Test with mock to simulate timeout
    with mock.patch('requests.get') as mock_get:
        mock_get.side_effect = requests.exceptions.Timeout("Timeout")
        result = get_user_data(1)
        assert result is None
    
    print("✓ Question 2 tests passed!")

test_question_2()

INFO:__main__:Fetching user data for user_id: 1
INFO:__main__:Successfully fetched user data for user_id: 1
INFO:__main__:Fetching user data for user_id: 999999
ERROR:__main__:HTTP error Unknown for user_id: 999999: 404 Client Error: Not Found for url: https://jsonplaceholder.typicode.com/users/999999
INFO:__main__:Fetching user data for user_id: 1
ERROR:__main__:Request exception for user_id: 1: Network error
INFO:__main__:Fetching user data for user_id: 1
ERROR:__main__:Request timeout for user_id: 1


✓ Question 2 tests passed!


## Question 3: Code from Scratch (Data Structures)

**Scenario:** Create a `TaskManager` class to manage a simple todo list.

**Requirements:**
- Add tasks with priority (1=high, 2=medium, 3=low)
- Mark tasks as complete
- Get tasks filtered by completion status and/or priority
- Get task count by status

In [5]:
class TaskManager:
    """
    A simple task manager for tracking todo items.
    """
    
    def __init__(self):
        """Initialize empty task manager."""
        self.tasks = []
        self.next_id = 1
    
    def add_task(self, description, priority=2):
        """
        Add a new task.
        
        Args:
            description (str): Task description
            priority (int): Priority level (1=high, 2=medium, 3=low)
        """
        # Validate priority
        if priority not in (1, 2, 3):
            raise ValueError("Priority must be 1 (high), 2 (medium), or 3 (low)")
        
        task = {
            'id': self.next_id,
            'description': description,
            'priority': priority,
            'completed': False
        }
        self.tasks.append(task)
        self.next_id += 1
    
    def complete_task(self, task_id):
        """
        Mark a task as complete.
        
        Args:
            task_id: Unique identifier for the task
            
        Returns:
            bool: True if task was found and completed, False otherwise
        """
        for task in self.tasks:
            if task['id'] == task_id:
                task['completed'] = True
                return True
        return False
    
    def get_tasks(self, completed=None, priority=None):
        """
        Get tasks filtered by status and/or priority.
        
        Args:
            completed (bool, optional): Filter by completion status
            priority (int, optional): Filter by priority level
            
        Returns:
            list: List of matching tasks
        """
        filtered_tasks = self.tasks
        
        # Filter by completion status if specified
        if completed is not None:
            filtered_tasks = [task for task in filtered_tasks if task['completed'] == completed]
        
        # Filter by priority if specified
        if priority is not None:
            if priority not in (1, 2, 3):
                raise ValueError("Priority must be 1 (high), 2 (medium), or 3 (low)")
            filtered_tasks = [task for task in filtered_tasks if task['priority'] == priority]
        
        return filtered_tasks
    
    def get_task_count(self, completed=None):
        """
        Get count of tasks by completion status.
        
        Args:
            completed (bool, optional): Count completed (True) or pending (False) tasks
            
        Returns:
            int: Number of matching tasks
        """
        if completed is None:
            return len(self.tasks)
        else:
            return len([task for task in self.tasks if task['completed'] == completed])


# Test the implementation
tm = TaskManager()
tm.add_task("Fix bug in login", 1)  # High priority
tm.add_task("Update documentation", 3)  # Low priority
tm.add_task("Code review", 2)  # Medium priority

print("All tasks:", len(tm.get_tasks()))
print("High priority tasks:", len(tm.get_tasks(priority=1)))
print("Pending tasks:", len(tm.get_tasks(completed=False)))

All tasks: 3
High priority tasks: 1
Pending tasks: 3


In [6]:
# Test Cell
def test_question_3():
    tm = TaskManager()
    
    # Test adding tasks
    tm.add_task("Task 1", 1)
    tm.add_task("Task 2", 2)
    tm.add_task("Task 3", 3)
    
    # Test get all tasks
    all_tasks = tm.get_tasks()
    assert len(all_tasks) == 3
    
    # Test priority filtering
    high_priority = tm.get_tasks(priority=1)
    assert len(high_priority) == 1
    
    # Test task completion
    task_id = all_tasks[0]['id']  # Assuming tasks have 'id' field
    success = tm.complete_task(task_id)
    assert success == True
    
    # Test completion filtering
    completed_tasks = tm.get_tasks(completed=True)
    assert len(completed_tasks) == 1
    
    pending_tasks = tm.get_tasks(completed=False)
    assert len(pending_tasks) == 2
    
    # Test task counts
    assert tm.get_task_count() == 3
    assert tm.get_task_count(completed=True) == 1
    assert tm.get_task_count(completed=False) == 2
    
    print("✓ Question 3 tests passed!")

test_question_3()

✓ Question 3 tests passed!


## Question 4: Optimize AI Code (Performance)

**Scenario:** This AI code finds common elements between multiple lists, but it's very inefficient. Optimize it for better performance.

**Requirements:**
- Same functionality as original
- Significantly better time complexity
- Handle edge cases (empty lists, no common elements)

In [7]:
def find_common_elements_slow(lists):
    """
    Find elements that appear in ALL provided lists.
    AI-generated inefficient version - OPTIMIZE THIS!
    
    Args:
        lists: List of lists to find common elements in
        
    Returns:
        list: Elements that appear in all lists
    """
    if not lists:
        return []
    
    common = []
    for item in lists[0]:
        is_common = True
        for other_list in lists[1:]:
            found = False
            for other_item in other_list:
                if item == other_item:
                    found = True
                    break
            if not found:
                is_common = False
                break
        if is_common and item not in common:
            common.append(item)
    
    return common

# Optimized version - implement this
def find_common_elements_fast(lists):
    """
    Find elements that appear in ALL provided lists.
    Optimized version with better time complexity.
    
    Args:
        lists: List of lists to find common elements in
        
    Returns:
        list: Elements that appear in all lists
    """
    # Handle edge cases
    if not lists:
        return []
    
    # Convert all lists to sets for O(1) lookups
    sets = [set(lst) for lst in lists]
    
    # Start with the smallest set to minimize intersection operations
    smallest_set = min(sets, key=len)
    
    # Find intersection of all sets
    common_elements = smallest_set.copy()
    for s in sets:
        common_elements &= s  # Set intersection operation
    
    # Convert back to list and return
    return list(common_elements)
# Test both versions
test_lists = [
    [1, 2, 3, 4, 5],
    [3, 4, 5, 6, 7],
    [4, 5, 7, 8, 9]
]

print("Slow version:", find_common_elements_slow(test_lists))
print("Fast version:", find_common_elements_fast(test_lists))

Slow version: [4, 5]
Fast version: [4, 5]


In [8]:
# Test Cell

import time

def test_question_4():
    # Basic functionality test
    test_lists = [
        [1, 2, 3, 4, 5],
        [3, 4, 5, 6, 7],
        [4, 5, 7, 8, 9]
    ]
    
    slow_result = find_common_elements_slow(test_lists)
    fast_result = find_common_elements_fast(test_lists)
    
    assert set(slow_result) == set(fast_result), "Results don't match"
    assert set(fast_result) == {4, 5}, f"Expected {{4, 5}}, got {set(fast_result)}"
    
    # Edge cases
    assert find_common_elements_fast([]) == []
    assert find_common_elements_fast([[1, 2], []]) == []
    assert find_common_elements_fast([[1, 2, 3]]) == [1, 2, 3]
    
    # Performance test (rough)
    large_lists = [[i for i in range(1000)] for _ in range(10)]
    
    start_time = time.time()
    find_common_elements_fast(large_lists)
    fast_time = time.time() - start_time
    
    # Fast version should complete in reasonable time
    assert fast_time < 1.0, "Optimized version is still too slow"
    
    print("✓ Question 4 tests passed!")

test_question_4()

✓ Question 4 tests passed!


## Question 5: Fix Function with Edge Cases

**Scenario:** This AI function calculates statistics for a list of numbers, but fails on various edge cases. Make it robust.

**Requirements:**
- Handle empty lists
- Handle non-numeric values gracefully
- Handle division by zero
- Return meaningful error messages or default values

In [9]:
def calculate_stats(numbers):
    """
    Calculate basic statistics for a list of numbers.
    Fixed version with proper mode calculation.
    """
    # Handle empty list
    if not numbers:
        return {
            'mean': None,
            'median': None,
            'mode': None,
            'std_dev': None,
            'count': 0,
            'error': 'Empty list provided'
        }
    
    # Filter out non-numeric values
    numeric_numbers = []
    non_numeric_count = 0
    
    for num in numbers:
        if isinstance(num, (int, float)) and num is not None:
            numeric_numbers.append(num)
        else:
            non_numeric_count += 1
    
    # Handle case where no valid numbers remain
    if not numeric_numbers:
        return {
            'mean': None,
            'median': None,
            'mode': None,
            'std_dev': None,
            'count': 0,
            'error': 'No valid numeric values found'
        }
    
    # Handle case with only one valid number
    if len(numeric_numbers) == 1:
        single_value = numeric_numbers[0]
        return {
            'mean': single_value,
            'median': single_value,
            'mode': single_value,
            'std_dev': 0.0,
            'count': 1,
            'non_numeric_count': non_numeric_count
        }
    
    # Sort for median calculation
    sorted_nums = sorted(numeric_numbers)
    n = len(numeric_numbers)
    
    # Mean
    mean = sum(numeric_numbers) / n
    
    # Median
    if n % 2 == 0:
        median = (sorted_nums[n//2 - 1] + sorted_nums[n//2]) / 2
    else:
        median = sorted_nums[n//2]
    
    # Mode - FIXED VERSION
    from collections import Counter
    counts = Counter(numeric_numbers)
    
    # Get the most common element (this returns a list of (element, count) tuples)
    most_common = counts.most_common(1)
    
    # The mode is the element with the highest frequency
    mode = most_common[0][0]  # First element of first tuple
    
    # Standard deviation
    variance = sum((x - mean) ** 2 for x in numeric_numbers) / n
    std_dev = variance ** 0.5
    
    # Build result
    result = {
        'mean': round(mean, 4),
        'median': round(median, 4),
        'mode': mode,  # Now correctly shows most frequent value
        'std_dev': round(std_dev, 4),
        'count': n
    }
    
    # Add non-numeric count if any were filtered out
    if non_numeric_count > 0:
        result['non_numeric_count'] = non_numeric_count
        result['valid_count'] = n
        result['total_count'] = len(numbers)
    
    return result



In [12]:
# Test Cell
def test_question_5():
    # Normal case
    result = calculate_stats([1, 2, 3, 4, 5])
    assert result['mean'] == 3.0
    assert result['median'] == 3.0
    assert result['count'] == 5
    
    # Single item
    result = calculate_stats([42])
    assert result['mean'] == 42
    assert result['median'] == 42
    assert result['mode'] == 42
    assert result['std_dev'] == 0
    
    # Empty list - should handle gracefully
    result = calculate_stats([])
    assert 'error' in result or all(v is None or v == 0 for v in result.values())
    
    # Mixed types - should handle gracefully
    result = calculate_stats([1, 'invalid', 3])
    assert 'error' in result or result['count'] == 2  # Only valid numbers counted
    
    # All same values
    result = calculate_stats([5, 5, 5, 5])
    assert result['mean'] == 5
    assert result['std_dev'] == 0
    
    print("✓ Question 5 tests passed!")

test_question_5()

✓ Question 5 tests passed!


## Question 6: Complete Partial Implementation (Pandas/Data)

### Goal
Implement `analyze_sales_data(df, group_by_column)`.

### Input
A pandas DataFrame `df` with columns:
- `product`
- `category`
- `sales`
- `profit`

### Output (must match exactly)
- Return a DataFrame **indexed by `group_by_column`** (do not reset the index).
- Include exactly these columns (names must match):
  - `sales_sum` — sum of `sales`
  - `sales_mean` — mean of `sales`
  - `profit_sum` — sum of `profit`
  - `profit_mean` — mean of `profit`
  - `profit_margin` — `profit_sum / sales_sum` (use `NaN` if `sales_sum == 0`)
- Handle missing values: treat missing `sales` or `profit` as `0` before aggregation.
- Sorting is **not required**.

### Edge Behavior
- If `df` is empty or `group_by_column` is missing, return an empty DataFrame with the required column names.

In [13]:
import pandas as pd
import numpy as np

def analyze_sales_data(df, group_by_column):
    """
    Analyze sales data by grouping and calculating statistics.
    
    Args:
        df: DataFrame with columns ['product', 'category', 'sales', 'profit']
        group_by_column: Column name to group by
        
    Returns:
        DataFrame with aggregated statistics
    """
    # Handle edge cases: empty DataFrame or missing group_by_column
    if df.empty or group_by_column not in df.columns:
        # Return empty DataFrame with required columns
        return pd.DataFrame(columns=['sales_sum', 'sales_mean', 'profit_sum', 'profit_mean', 'profit_margin'])
    
    # Create a copy to avoid modifying original DataFrame
    df_clean = df.copy()
    
    # Handle missing values: treat missing sales or profit as 0
    df_clean['sales'] = df_clean['sales'].fillna(0)
    df_clean['profit'] = df_clean['profit'].fillna(0)
    
    # Group by the specified column
    grouped = df_clean.groupby(group_by_column)
    
    # Calculate required aggregations
    result = grouped.agg({
        'sales': ['sum', 'mean'],
        'profit': ['sum', 'mean']
    })
    
    # Flatten the multi-level column names
    result.columns = ['sales_sum', 'sales_mean', 'profit_sum', 'profit_mean']
    
    # Calculate profit margin (profit_sum / sales_sum), handle division by zero
    result['profit_margin'] = np.where(
        result['sales_sum'] == 0, 
        np.nan, 
        result['profit_sum'] / result['sales_sum']
    )
    
    # Select and return only the required columns in the specified order
    return result[['sales_sum', 'sales_mean', 'profit_sum', 'profit_mean', 'profit_margin']]

# Create sample data for testing
sample_data = pd.DataFrame({
    'product': ['A', 'B', 'C', 'A', 'B', 'C', 'A'],
    'category': ['Electronics', 'Electronics', 'Clothing', 'Electronics', 'Electronics', 'Clothing', 'Electronics'],
    'sales': [100, 200, 150, 120, np.nan, 180, 110],
    'profit': [20, 50, 30, 25, 40, 35, 22]
})

print("Sample data:")
print(sample_data)
print("\nAnalysis by product:")
result = analyze_sales_data(sample_data, 'product')
print(result)

Sample data:
  product     category  sales  profit
0       A  Electronics  100.0      20
1       B  Electronics  200.0      50
2       C     Clothing  150.0      30
3       A  Electronics  120.0      25
4       B  Electronics    NaN      40
5       C     Clothing  180.0      35
6       A  Electronics  110.0      22

Analysis by product:
         sales_sum  sales_mean  profit_sum  profit_mean  profit_margin
product                                                               
A            330.0       110.0          67    22.333333        0.20303
B            200.0       100.0          90    45.000000        0.45000
C            330.0       165.0          65    32.500000        0.19697


In [14]:
# Test Cell
def test_question_6():
    # Create test data
    test_data = pd.DataFrame({
        'product': ['A', 'B', 'A', 'B', 'A'],
        'category': ['Cat1', 'Cat2', 'Cat1', 'Cat2', 'Cat1'],
        'sales': [100, 200, 150, 300, 50],
        'profit': [20, 40, 30, 60, 10]
    })
    
    # Test grouping by product
    result = analyze_sales_data(test_data, 'product')
    
    # Check structure
    assert isinstance(result, pd.DataFrame), "Should return DataFrame"
    assert len(result) == 2, "Should have 2 groups (A and B)"
    
    # Check required columns exist
    required_cols = ['sales_sum', 'sales_mean', 'profit_sum', 'profit_mean', 'profit_margin']
    for col in required_cols:
        assert col in result.columns, f"Missing column: {col}"
    
    # Check calculations for product A
    product_a = result.loc['A'] if 'A' in result.index else result[result.index == 'A'].iloc[0]
    assert product_a['sales_sum'] == 300, "Product A sales sum should be 300"
    assert product_a['profit_sum'] == 60, "Product A profit sum should be 60"
    
    print("✓ Question 6 tests passed!")

test_question_6()

✓ Question 6 tests passed!


## Question 7: Refactor Messy AI Code (Clean Code)

**Scenario:** This AI code works but is poorly structured and hard to maintain. Refactor it following clean code principles.

**Requirements:**
- Improve readability and maintainability
- Add proper documentation
- Follow naming conventions
- Break down large functions
- Add type hints if possible

In [15]:
from typing import List, Dict, Any, Optional
def process_data(data):
    """Messy AI-generated code that works but needs refactoring - CLEAN IT UP!"""
    result = {}
    for item in data:
        if 'type' in item and item['type'] == 'user':
            if 'active' in item and item['active']:
                if 'age' in item:
                    if item['age'] >= 18:
                        if 'email' in item and '@' in item['email']:
                            category = 'adult'
                            if item['age'] >= 65:
                                category = 'senior'
                            elif item['age'] >= 25:
                                category = 'adult'
                            else:
                                category = 'young_adult'
                            
                            if category not in result:
                                result[category] = {'count': 0, 'emails': [], 'total_age': 0}
                            
                            result[category]['count'] += 1
                            result[category]['emails'].append(item['email'])
                            result[category]['total_age'] += item['age']
    
    # Calculate averages
    for cat in result:
        result[cat]['avg_age'] = result[cat]['total_age'] / result[cat]['count']
        del result[cat]['total_age']
    
    return result

# Test data
test_data = [
    {'type': 'user', 'active': True, 'age': 25, 'email': 'user1@test.com'},
    {'type': 'user', 'active': True, 'age': 70, 'email': 'user2@test.com'},
    {'type': 'user', 'active': False, 'age': 30, 'email': 'user3@test.com'},
    {'type': 'admin', 'active': True, 'age': 35, 'email': 'admin@test.com'},
    {'type': 'user', 'active': True, 'age': 20, 'email': 'invalid-email'},
    {'type': 'user', 'active': True, 'age': 40, 'email': 'user4@test.com'},
]

# Your refactored version should produce the same results
original_result = process_data(test_data)
print("Original result:", original_result)

# TODO: Create your clean, refactored version here
def process_user_data_clean(data: List[Dict[str, Any]]) -> Dict[str, Dict[str, Any]]:
    """
    Process user data to categorize active adult users by age groups.
    
    Refactored with clean code principles and constant age thresholds.
    
    Args:
        data: List of dictionaries containing user data with keys:
              'type', 'active', 'age', 'email'
              
    Returns:
        Dictionary with age categories as keys, each containing:
        - count: Number of users in category
        - emails: List of user emails
        - avg_age: Average age in category
    """
    # Age threshold constants - easy to modify in one place
    YOUNG_ADULT_MIN_AGE = 18
    ADULT_MIN_AGE = 25
    SENIOR_MIN_AGE = 65
    
    # Calculate upper bounds (one less than next threshold)
    YOUNG_ADULT_MAX_AGE = ADULT_MIN_AGE - 1      # 24
    ADULT_MAX_AGE = SENIOR_MIN_AGE - 1           # 64
    # Senior has no upper limit
    
    # Configuration using constants
    AGE_CATEGORIES = {
        'young_adult': {'min_age': YOUNG_ADULT_MIN_AGE, 'max_age': YOUNG_ADULT_MAX_AGE},
        'adult': {'min_age': ADULT_MIN_AGE, 'max_age': ADULT_MAX_AGE},
        'senior': {'min_age': SENIOR_MIN_AGE, 'max_age': None}  # No upper limit
    }
    
    def is_valid_user(user: Dict[str, Any]) -> bool:
        """
        Check if user meets all validation criteria.
        
        Args:
            user: User data dictionary
            
        Returns:
            bool: True if user is valid for processing
        """
        # Check required fields exist
        if not all(field in user for field in ['type', 'active', 'age', 'email']):
            return False
        
        # Check user type and active status
        if user['type'] != 'user' or not user['active']:
            return False
        
        # Check minimum age using constant
        if user['age'] < YOUNG_ADULT_MIN_AGE:
            return False
        
        # Check email format
        return '@' in user['email']
    
    def get_age_category(age: int) -> Optional[str]:
        """
        Determine which age category the user belongs to using constants.
        
        Args:
            age: User's age
            
        Returns:
            str: Age category name, or None if no category matches
        """
        # Use constant-based logic for clarity
        if age >= SENIOR_MIN_AGE:
            return 'senior'
        elif age >= ADULT_MIN_AGE:
            return 'adult'
        elif age >= YOUNG_ADULT_MIN_AGE:
            return 'young_adult'
        else:
            return None
    
    def initialize_category(category: str, results: Dict) -> None:
        """
        Ensure a category exists in results with proper structure.
        
        Args:
            category: Category name to initialize
            results: Results dictionary to modify
        """
        if category not in results:
            results[category] = {
                'count': 0,
                'emails': [],
                'total_age': 0
            }
    
    def process_valid_user(user: Dict[str, Any], results: Dict) -> None:
        """
        Process a single valid user and update results.
        
        Args:
            user: Valid user data
            results: Results dictionary to update
        """
        age = user['age']
        email = user['email']
        category = get_age_category(age)
        
        if category:
            initialize_category(category, results)
            
            results[category]['count'] += 1
            results[category]['emails'].append(email)
            results[category]['total_age'] += age
    
    def calculate_final_statistics(results: Dict) -> Dict:
        """
        Calculate averages and clean up temporary fields.
        
        Args:
            results: Results dictionary with raw data
            
        Returns:
            Dict: Results with averages calculated
        """
        final_results = {}
        
        for category, stats in results.items():
            final_results[category] = {
                'count': stats['count'],
                'emails': stats['emails'],
                'avg_age': round(stats['total_age'] / stats['count'], 2) if stats['count'] > 0 else 0
            }
        
        return final_results
    
    # Main processing logic
    raw_results = {}
    
    for user in data:
        if is_valid_user(user):
            process_valid_user(user, raw_results)
    
    return calculate_final_statistics(raw_results)

# Test both versions
clean_result = process_user_data_clean(test_data)
print("Clean result:", clean_result)

Original result: {'adult': {'count': 2, 'emails': ['user1@test.com', 'user4@test.com'], 'avg_age': 32.5}, 'senior': {'count': 1, 'emails': ['user2@test.com'], 'avg_age': 70.0}}
Clean result: {'adult': {'count': 2, 'emails': ['user1@test.com', 'user4@test.com'], 'avg_age': 32.5}, 'senior': {'count': 1, 'emails': ['user2@test.com'], 'avg_age': 70.0}}


In [16]:
# Test Cell
def test_question_7():
    test_data = [
        {'type': 'user', 'active': True, 'age': 25, 'email': 'user1@test.com'},
        {'type': 'user', 'active': True, 'age': 70, 'email': 'user2@test.com'},
        {'type': 'user', 'active': False, 'age': 30, 'email': 'user3@test.com'},
        {'type': 'user', 'active': True, 'age': 20, 'email': 'user4@test.com'},
    ]
    
    original_result = process_data(test_data)
    clean_result = process_user_data_clean(test_data)
    
    # Results should be functionally equivalent
    assert set(original_result.keys()) == set(clean_result.keys()), "Categories don't match"
    
    for category in original_result:
        assert original_result[category]['count'] == clean_result[category]['count'], f"Count mismatch for {category}"
        assert abs(original_result[category]['avg_age'] - clean_result[category]['avg_age']) < 0.01, f"Average age mismatch for {category}"
    
    print("✓ Question 7 tests passed!")

test_question_7()

✓ Question 7 tests passed!


## Question 8: Debug Complex Logic (Algorithms)

**Scenario:** This AI implementation of binary search has subtle bugs. Find and fix all the issues.

**Requirements:**
- Fix the binary search algorithm
- Handle edge cases properly
- Maintain O(log n) time complexity
- Return correct index or -1 if not found

In [17]:
def binary_search_buggy(arr, target):
    """
    Fixed binary search implementation.
    Kept the naming/structure of the original buggy code but fixed the issues.
    Args:
        arr: Sorted list of integers
        target: Value to search for
        
    Returns:
        int: Index of target if found, -1 otherwise
    """
    # Handle empty array edge case
    if not arr:
        return -1
    
    left = 0
    right = len(arr) - 1  # FIX 1: right should be last index, not length
    
    while left <= right:  # FIX 2: Use <= to handle single element case
        mid = (left + right) // 2
        
        if arr[mid] == target:
            return mid
        elif arr[mid] < target:
            left = mid + 1  # FIX 3: Exclude mid from next search
        else:
            right = mid - 1  # FIX 4: Exclude mid from next search
    
    return -1

# Test cases
test_arrays = [
    ([1, 3, 5, 7, 9, 11], 7),    # Should find at index 3
    ([1, 3, 5, 7, 9, 11], 1),    # Should find at index 0
    ([1, 3, 5, 7, 9, 11], 11),   # Should find at index 5
    ([1, 3, 5, 7, 9, 11], 6),    # Should return -1
    ([5], 5),                     # Single element found
    ([5], 3),                     # Single element not found
    ([], 5),                      # Empty array
]

for arr, target in test_arrays:
    result = binary_search_buggy(arr, target)
    print(f"Searching for {target} in {arr}: {result}")

Searching for 7 in [1, 3, 5, 7, 9, 11]: 3
Searching for 1 in [1, 3, 5, 7, 9, 11]: 0
Searching for 11 in [1, 3, 5, 7, 9, 11]: 5
Searching for 6 in [1, 3, 5, 7, 9, 11]: -1
Searching for 5 in [5]: 0
Searching for 3 in [5]: -1
Searching for 5 in []: -1


In [18]:
# Test Cell
def test_question_8():
    # Test cases with expected results
    test_cases = [
        ([1, 3, 5, 7, 9, 11], 7, 3),      # Found at index 3
        ([1, 3, 5, 7, 9, 11], 1, 0),      # Found at index 0
        ([1, 3, 5, 7, 9, 11], 11, 5),     # Found at index 5
        ([1, 3, 5, 7, 9, 11], 6, -1),     # Not found
        ([1, 3, 5, 7, 9, 11], 0, -1),     # Less than min
        ([1, 3, 5, 7, 9, 11], 12, -1),    # Greater than max
        ([5], 5, 0),                       # Single element found
        ([5], 3, -1),                      # Single element not found
        ([], 5, -1),                       # Empty array
    ]
    
    for arr, target, expected in test_cases:
        result = binary_search_buggy(arr, target)
        assert result == expected, f"Failed for {target} in {arr}: expected {expected}, got {result}"
    
    # Test that it actually uses binary search (check performance)
    large_array = list(range(0, 10000, 2))  # [0, 2, 4, 6, ..., 9998]
    result = binary_search_buggy(large_array, 5000)
    assert result == 2500, "Should find 5000 at index 2500"
    
    print("✓ Question 8 tests passed!")

test_question_8()

✓ Question 8 tests passed!


## Question 9: Add Missing Functionality

**Scenario:** This AI code provides a basic cache implementation but is missing several key features. Add the missing functionality to make it production-ready.

**Requirements:**
- Add TTL (time-to-live) support for automatic expiration
- Add size limit with LRU (Least Recently Used) eviction
- Add cache statistics tracking (hits, misses, evictions)
- Add methods for cache management (clear, size, cleanup)
- Handle thread safety considerations

In [19]:
import time
import threading
from typing import Any, Optional, Dict
from collections import OrderedDict

class SimpleCache:
    """
    Enhanced cache with TTL, LRU eviction, statistics, and thread safety.
    
    Features:
    - TTL (time-to-live) support with automatic expiration
    - Size limit with LRU (Least Recently Used) eviction
    - Cache statistics tracking (hits, misses, evictions)
    - Cache management methods (clear, size, cleanup)
    - Thread-safe operations
    """
    
    def __init__(self, max_size: int = 100, default_ttl: Optional[int] = None):
        """
        Initialize cache with size limit and default TTL.
        
        Args:
            max_size: Maximum number of items to store
            default_ttl: Default time-to-live in seconds (None = no expiration)
        """
        self.max_size = max_size
        self.default_ttl = default_ttl
        
        # Main storage: {key: {'value': value, 'expires_at': timestamp}}
        self._data = {}
        
        # LRU tracking using OrderedDict (automatically maintains access order)
        self._lru = OrderedDict()
        
        # Statistics
        self._stats = {
            'hits': 0,
            'misses': 0,
            'evictions': 0,
            'expired_removals': 0
        }
        
        # Thread safety
        self._lock = threading.RLock()
    
    def get(self, key: str) -> Optional[Any]:
        """
        Get value from cache.
        
        Args:
            key: Cache key
            
        Returns:
            Cached value or None if not found/expired
        """
        with self._lock:
            # Check if key exists and not expired
            if self._is_expired(key):
                self._remove_key(key)
                self._stats['misses'] += 1
                return None
            
            if key in self._data:
                # Update LRU order (move to end)
                self._lru.move_to_end(key)
                self._stats['hits'] += 1
                return self._data[key]['value']
            else:
                self._stats['misses'] += 1
                return None
    
    def set(self, key: str, value: Any, ttl: Optional[int] = None) -> None:
        """
        Set value in cache.
        
        Args:
            key: Cache key
            value: Value to cache
            ttl: Time-to-live in seconds (overrides default)
        """
        with self._lock:
            # Check if cache is full and evict if needed
            if len(self._data) >= self.max_size and key not in self._data:
                self._evict_lru(1)
            
            # Calculate expiration time
            expires_at = None
            if ttl is not None:
                expires_at = time.time() + ttl
            elif self.default_ttl is not None:
                expires_at = time.time() + self.default_ttl
            
            # Store value with metadata
            self._data[key] = {
                'value': value,
                'expires_at': expires_at
            }
            
            # Update LRU order (add or move to end)
            self._lru[key] = True
            self._lru.move_to_end(key)
    
    def delete(self, key: str) -> bool:
        """Delete key from cache."""
        with self._lock:
            return self._remove_key(key)
    
    def clear(self) -> None:
        """Clear all items from cache."""
        with self._lock:
            self._data.clear()
            self._lru.clear()
    
    def size(self) -> int:
        """Return current number of items in cache."""
        with self._lock:
            return len(self._data)
    
    def get_stats(self) -> Dict[str, int]:
        """
        Get cache statistics.
        
        Returns:
            Dict with keys: hits, misses, evictions, expired_removals, current_size
        """
        with self._lock:
            stats = self._stats.copy()
            stats['current_size'] = len(self._data)
            return stats
    
    def cleanup_expired(self) -> int:
        """
        Remove expired items from cache.
        
        Returns:
            Number of items removed
        """
        with self._lock:
            expired_keys = []
            for key in list(self._data.keys()):
                if self._is_expired(key):
                    expired_keys.append(key)
            
            for key in expired_keys:
                self._remove_key(key)
                self._stats['expired_removals'] += 1
            
            return len(expired_keys)
    
    def _evict_lru(self, count: int = 1) -> int:
        """
        Evict least recently used items.
        
        Args:
            count: Number of items to evict
            
        Returns:
            Number of items actually evicted
        """
        evicted_count = 0
        while self._lru and evicted_count < count:
            # Get the least recently used key (first in OrderedDict)
            lru_key, _ = self._lru.popitem(last=False)
            if lru_key in self._data:
                del self._data[lru_key]
                evicted_count += 1
                self._stats['evictions'] += 1
        
        return evicted_count
    
    def _is_expired(self, key: str) -> bool:
        """Check if a cache entry has expired."""
        if key not in self._data:
            return False
        
        entry = self._data[key]
        if entry['expires_at'] is None:
            return False  # No expiration
        
        return time.time() > entry['expires_at']
    
    def _remove_key(self, key: str) -> bool:
        """Remove key from all data structures."""
        if key in self._data:
            del self._data[key]
            if key in self._lru:
                del self._lru[key]
            return True
        return False
    
    def __contains__(self, key: str) -> bool:
        """Check if key exists and is not expired."""
        with self._lock:
            if self._is_expired(key):
                self._remove_key(key)
                return False
            return key in self._data
    
    def keys(self):
        """Get all non-expired keys."""
        with self._lock:
            self.cleanup_expired()  # Clean up before returning keys
            return list(self._data.keys())

# Test your enhanced implementation
if __name__ == "__main__":
    # Test TTL functionality
    cache = SimpleCache(max_size=3, default_ttl=1)  # 1 second TTL
    
    print("=== Testing TTL ===")
    cache.set("temp_key", "temp_value")
    print(f"Immediately after set: {cache.get('temp_key')}")
    time.sleep(1.1)
    print(f"After TTL expired: {cache.get('temp_key')}")
    
    print("\n=== Testing Size Limits & LRU ===")
    cache.clear()
    cache.set("a", 1, ttl=None)  # No expiration
    cache.set("b", 2, ttl=None)
    cache.set("c", 3, ttl=None)
    print(f"Cache size after adding 3 items: {cache.size()}")
    
    # Access 'a' to make it recently used
    cache.get("a")
    
    # Add 'd' which should evict 'b' (least recently used)
    cache.set("d", 4, ttl=None)
    print(f"After adding 'd': a={cache.get('a')}, b={cache.get('b')}, c={cache.get('c')}, d={cache.get('d')}")
    
    print("\n=== Testing Statistics ===")
    stats = cache.get_stats()
    print(f"Cache statistics: {stats}")
    
    print("\n=== Testing Cleanup ===")
    cache.set("expire_me", "value", ttl=1)
    time.sleep(1.1)
    removed_count = cache.cleanup_expired()
    print(f"Expired items removed: {removed_count}")

=== Testing TTL ===
Immediately after set: temp_value
After TTL expired: None

=== Testing Size Limits & LRU ===
Cache size after adding 3 items: 3
After adding 'd': a=1, b=None, c=3, d=4

=== Testing Statistics ===
Cache statistics: {'hits': 5, 'misses': 2, 'evictions': 1, 'expired_removals': 0, 'current_size': 3}

=== Testing Cleanup ===
Expired items removed: 3


In [20]:
# Test Cell 
import time

def test_question_9():
    print("Testing enhanced cache implementation...")
    
    # Test 1: Basic functionality
    cache = SimpleCache(max_size=3, default_ttl=60)
    
    cache.set("key1", "value1")
    cache.set("key2", "value2")
    
    assert cache.get("key1") == "value1", "Basic get/set failed"
    assert cache.get("key2") == "value2", "Basic get/set failed"
    assert cache.size() == 2, f"Expected size 2, got {cache.size()}"
    
    # Test 2: TTL expiration
    cache.clear()
    cache.set("ttl_key", "ttl_value", ttl=1)  # 1 second TTL
    assert cache.get("ttl_key") == "ttl_value", "TTL key should be accessible immediately"
    
    time.sleep(1.1)  # Wait for expiration
    assert cache.get("ttl_key") is None, "TTL key should be expired and return None"
    
    # Test 3: Size limits and LRU eviction
    cache.clear()
    cache.set("a", 1)
    cache.set("b", 2) 
    cache.set("c", 3)  # Cache is now full (max_size=3)
    
    # Access 'a' to make it recently used
    cache.get("a")
    
    # Add 'd' which should evict 'b' (least recently used)
    cache.set("d", 4)
    
    assert cache.get("a") == 1, "Recently used 'a' should not be evicted"
    assert cache.get("b") is None, "Least recently used 'b' should be evicted"
    assert cache.get("c") == 3, "'c' should still be in cache"
    assert cache.get("d") == 4, "Newly added 'd' should be in cache"
    assert cache.size() == 3, "Cache size should remain at max_size"
    
    # Test 4: Statistics tracking
    cache.clear()
    cache.set("stat_key", "stat_value")
    cache.get("stat_key")  # Hit
    cache.get("nonexistent")  # Miss
    
    stats = cache.get_stats()
    required_stats = ["hits", "misses", "evictions", "current_size"]
    for stat in required_stats:
        assert stat in stats, f"Missing statistic: {stat}"
    
    assert stats["hits"] > 0, "Should have recorded hits"
    assert stats["misses"] > 0, "Should have recorded misses"
    assert stats["current_size"] == 1, "Should track current size"
    
    # Test 5: Manual cleanup
    cache.clear()
    cache.set("expire1", "value1", ttl=1)
    cache.set("expire2", "value2", ttl=1)
    cache.set("keep", "value3", ttl=None)  # No expiration
    
    time.sleep(1.1)  # Wait for expiration
    removed_count = cache.cleanup_expired()
    
    assert removed_count == 2, f"Should have removed 2 expired items, removed {removed_count}"
    assert cache.get("keep") == "value3", "Non-expiring item should remain"
    assert cache.size() == 1, "Only one item should remain after cleanup"
    
    # Test 6: Edge cases
    cache.clear()
    assert cache.size() == 0, "Cache should be empty after clear"
    assert cache.get("nonexistent") is None, "Getting non-existent key should return None"
    assert cache.delete("nonexistent") == False, "Deleting non-existent key should return False"
    
    # Test delete functionality
    cache.set("delete_me", "value")
    assert cache.delete("delete_me") == True, "Deleting existing key should return True"
    assert cache.get("delete_me") is None, "Deleted key should not be accessible"
    
    print("✓ All Question 9 tests passed!")

test_question_9()


Testing enhanced cache implementation...
✓ All Question 9 tests passed!


## Question 10: Integration Challenge (Multiple Components)

**Scenario:** You have three separate AI-generated modules that need to work together in a data processing pipeline, but they have interface mismatches and compatibility issues. Your job is to create the integration layer that makes them work together seamlessly.

**Requirements:**
- Create adapter/wrapper functions to handle data format conversions
- Build a unified pipeline that chains all three components
- Add comprehensive error handling for the integration
- Handle edge cases and invalid data gracefully
- Create helper functions for data transformation


In [21]:
import json
from typing import List, Dict, Any, Tuple, Optional, Union

# Component 1: Data Processor (returns dict with specific structure)
class DataProcessor:
    """AI Component 1 - processes raw data and returns structured dict"""
    
    def process_data(self, raw_data: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Process raw data and return structured dict."""
        if not isinstance(raw_data, list):
            raise ValueError("Expected list input")
        
        result = {
            'total_items': len(raw_data),
            'processed_items': [],
            'metadata': {'processing_time': 0.1, 'timestamp': '2024-01-01T12:00:00Z'}
        }
        
        for item in raw_data:
            if isinstance(item, dict) and 'value' in item:
                result['processed_items'].append({
                    'id': item.get('id', 'unknown'),
                    'processed_value': item['value'] * 2,
                    'original_value': item['value'],
                    'status': 'processed'
                })
            else:
                result['processed_items'].append({
                    'id': 'error',
                    'processed_value': 0,
                    'original_value': None,
                    'status': 'failed'
                })
        
        return result

# Component 2: Analytics Engine (expects JSON string, returns tuple)
class AnalyticsEngine:
    """AI Component 2 - performs analytics on data, expects JSON string input"""
    
    def analyze(self, json_data_string: str) -> Tuple[Optional[str], Union[Dict[str, float], str]]:
        """Analyze data from JSON string, return (summary, metrics) tuple."""
        try:
            data = json.loads(json_data_string)
        except json.JSONDecodeError:
            return None, "Invalid JSON format"
        
        if not isinstance(data, dict) or 'processed_items' not in data:
            return None, "Missing processed_items in data structure"
        
        items = data['processed_items']
        if not isinstance(items, list):
            return None, "processed_items must be a list"
        
        # Extract numeric values for analysis
        values = []
        failed_count = 0
        
        for item in items:
            if isinstance(item, dict) and item.get('status') == 'processed':
                if 'processed_value' in item and isinstance(item['processed_value'], (int, float)):
                    values.append(item['processed_value'])
            else:
                failed_count += 1
        
        if not values:
            return None, "No valid numeric data found for analysis"
        
        summary = f"Analyzed {len(items)} items ({len(values)} successful, {failed_count} failed)"
        metrics = {
            'avg_value': sum(values) / len(values),
            'max_value': max(values),
            'min_value': min(values),
            'total_value': sum(values),
            'success_rate': len(values) / len(items) if items else 0.0
        }
        
        return summary, metrics

# Component 3: Report Generator (expects list of tuples, returns formatted string)
class ReportGenerator:
    """AI Component 3 - generates reports from analytics results"""
    
    def generate_report(self, analytics_results_list: List[Tuple[Optional[str], Union[Dict, str]]]) -> str:
        """Generate report from list of (summary, metrics) tuples."""
        if not isinstance(analytics_results_list, list):
            return "Error: Expected list input for report generation"
        
        if not analytics_results_list:
            return "Error: No data provided for report generation"
        
        report_lines = [
            "=" * 50,
            "           ANALYSIS REPORT",
            "=" * 50
        ]
        
        for i, result in enumerate(analytics_results_list):
            if not isinstance(result, tuple) or len(result) != 2:
                report_lines.append(f"\nSection {i+1}: Invalid data format - expected (summary, metrics) tuple")
                continue
            
            summary, metrics = result
            
            if summary is None:
                report_lines.append(f"\nSection {i+1}: Analysis failed")
                report_lines.append(f"  Error: {metrics}")
                continue
            
            report_lines.append(f"\nSection {i+1}: {summary}")
            
            if isinstance(metrics, dict):
                report_lines.append("  Metrics:")
                for key, value in metrics.items():
                    if isinstance(value, float):
                        report_lines.append(f"    {key}: {value:.2f}")
                    else:
                        report_lines.append(f"    {key}: {value}")
            else:
                report_lines.append(f"  Metrics: {metrics}")
        
        report_lines.append("\n" + "=" * 50)
        return "\n".join(report_lines)

# Integration functions

def dict_to_json_adapter(data_dict: Dict[str, Any]) -> str:
    """
    Convert dictionary to JSON string for AnalyticsEngine.
    
    Args:
        data_dict: Dictionary from DataProcessor
        
    Returns:
        JSON string suitable for AnalyticsEngine
        
    Raises:
        ValueError: If data cannot be converted to JSON
    """
    try:
        # Ensure the data is JSON serializable
        return json.dumps(data_dict, default=str)  # Use str for non-serializable types
    except (TypeError, ValueError) as e:
        raise ValueError(f"Failed to convert data to JSON: {e}")

def validate_and_clean_raw_data(raw_data: Any) -> List[Dict[str, Any]]:
    """
    Validate and clean raw input data.
    
    Args:
        raw_data: Input data of any type
        
    Returns:
        Cleaned list of dictionaries
        
    Raises:
        ValueError: If data cannot be converted to the expected format
    """
    # Handle None or empty data
    if raw_data is None:
        return []
    
    # Convert single item to list if needed
    if not isinstance(raw_data, list):
        raw_data = [raw_data]
    
    cleaned_data = []
    
    for item in raw_data:
        # Skip None items
        if item is None:
            continue
            
        # Convert to dictionary if it's not already
        if isinstance(item, dict):
            cleaned_item = item.copy()  # Create a copy to avoid modifying original
        else:
            # Try to convert non-dict items
            try:
                if hasattr(item, '__dict__'):
                    cleaned_item = item.__dict__
                elif hasattr(item, '_asdict'):  # Handle namedtuples
                    cleaned_item = item._asdict()
                else:
                    cleaned_item = {'value': item, 'id': str(item)}
            except (AttributeError, TypeError):
                cleaned_item = {'value': 0, 'id': 'conversion_error', 'status': 'converted'}
        
        # Ensure minimum required structure
        if 'id' not in cleaned_item:
            cleaned_item['id'] = 'unknown'
        
        # Validate value field
        if 'value' in cleaned_item:
            try:
                # Convert to number if possible
                cleaned_item['value'] = float(cleaned_item['value'])
            except (TypeError, ValueError):
                # Keep as-is if conversion fails
                pass
        
        cleaned_data.append(cleaned_item)
    
    return cleaned_data

def integrated_pipeline(raw_data_list: List[Any]) -> str:
    """
    Integrate all three components to process data end-to-end.
    
    This function:
    1. Validate and clean each raw dataset
    2. Process each dataset through DataProcessor
    3. Convert results to format expected by AnalyticsEngine
    4. Run analytics on each processed dataset
    5. Collect all analytics results
    6. Generate final report using ReportGenerator
    7. Handle all errors gracefully
    
    Args:
        raw_data_list: List of raw data sets to process
        
    Returns:
        str: Final report combining all analyses
    """
    # Initialize components
    processor = DataProcessor()
    analytics = AnalyticsEngine()
    reporter = ReportGenerator()
    
    analytics_results = []
    
    # Process each dataset
    for i, raw_data in enumerate(raw_data_list):
        try:
            print(f"Processing dataset {i+1}...")
            
            # Step 1: Validate and clean raw data
            cleaned_data = validate_and_clean_raw_data(raw_data)
            
            if not cleaned_data:
                analytics_results.append((None, "No valid data after cleaning"))
                continue
            
            # Step 2: Process through DataProcessor
            processed_data = processor.process_data(cleaned_data)
            
            # Step 3: Convert to JSON for AnalyticsEngine
            json_data = dict_to_json_adapter(processed_data)
            
            # Step 4: Run analytics
            analysis_result = analytics.analyze(json_data)
            analytics_results.append(analysis_result)
            
            print(f"  Dataset {i+1} processed successfully")
            
        except Exception as e:
            # Handle any errors gracefully and continue with next dataset
            error_msg = f"Error processing dataset {i+1}: {str(e)}"
            analytics_results.append((None, error_msg))
            print(f"  {error_msg}")
    
    # Step 5: Generate final report
    return reporter.generate_report(analytics_results)

def create_sample_data() -> List[List[Dict[str, Any]]]:
    """Create sample test data for the pipeline."""
    return [
        # Dataset 1: Normal data
        [
            {'id': 'A1', 'value': 10},
            {'id': 'A2', 'value': 20},
            {'id': 'A3', 'value': 15}
        ],
        # Dataset 2: Smaller dataset
        [
            {'id': 'B1', 'value': 5},
            {'id': 'B2', 'value': 25}
        ],
        # Dataset 3: Mixed data with issues
        [
            {'id': 'C1', 'value': 30},
            {'id': 'C2'},  # Missing value
            {'value': 40},  # Missing id
            {'id': 'C4', 'value': 'invalid'},  # Invalid value type
        ],
        # Dataset 4: Edge cases
        [],
        # Dataset 5: Single item (not in list)
        {'id': 'E1', 'value': 100}
    ]

# Additional helper functions for data transformation

def flatten_nested_data(nested_data: List[List[Any]]) -> List[Any]:
    """
    Flatten nested data structures into a single list.
    
    Args:
        nested_data: List of lists or mixed data structures
        
    Returns:
        Flattened list of items
    """
    flattened = []
    for item in nested_data:
        if isinstance(item, list):
            flattened.extend(item)
        else:
            flattened.append(item)
    return flattened

def extract_metrics_summary(analytics_results: List[Tuple[Optional[str], Union[Dict, str]]]) -> Dict[str, Any]:
    """
    Extract and summarize metrics from all analytics results.
    
    Args:
        analytics_results: List of analytics results
        
    Returns:
        Dictionary with summary statistics
    """
    successful_metrics = []
    failed_analyses = 0
    
    for summary, metrics in analytics_results:
        if summary is not None and isinstance(metrics, dict):
            successful_metrics.append(metrics)
        else:
            failed_analyses += 1
    
    if not successful_metrics:
        return {
            'total_datasets': len(analytics_results),
            'successful_analyses': 0,
            'failed_analyses': failed_analyses,
            'message': 'No successful analyses to summarize'
        }
    
    # Calculate overall metrics
    all_values = []
    for metrics in successful_metrics:
        if 'total_value' in metrics:
            all_values.append(metrics['total_value'])
    
    return {
        'total_datasets': len(analytics_results),
        'successful_analyses': len(successful_metrics),
        'failed_analyses': failed_analyses,
        'overall_avg_value': sum(all_values) / len(all_values) if all_values else 0,
        'overall_max_value': max(all_values) if all_values else 0,
        'overall_min_value': min(all_values) if all_values else 0,
        'success_rate': len(successful_metrics) / len(analytics_results) if analytics_results else 0
    }
# Test the integration
if __name__ == "__main__":
    print("Testing component integration...")
    
    # Test individual components first
    print("\n=== Testing Individual Components ===")
    
    processor = DataProcessor()
    analytics = AnalyticsEngine()
    reporter = ReportGenerator()
    
    # Test DataProcessor
    test_data = [{'id': 'test', 'value': 10}]
    processed = processor.process_data(test_data)
    print(f"DataProcessor output: {processed}")
    
    # Test AnalyticsEngine
    json_data = json.dumps(processed)
    analysis_result = analytics.analyze(json_data)
    print(f"AnalyticsEngine output: {analysis_result}")
    
    # Test ReportGenerator
    report = reporter.generate_report([analysis_result])
    print(f"ReportGenerator output:\n{report}")
    
    print("\n=== Testing Integrated Pipeline ===")
    
    # Test full pipeline
    sample_datasets = create_sample_data()
    
    try:
        final_report = integrated_pipeline(sample_datasets)
        print("Integration successful!")
        print(final_report)
    except Exception as e:
        print(f"Integration failed: {e}")
        import traceback
        traceback.print_exc()

Testing component integration...

=== Testing Individual Components ===
DataProcessor output: {'total_items': 1, 'processed_items': [{'id': 'test', 'processed_value': 20, 'original_value': 10, 'status': 'processed'}], 'metadata': {'processing_time': 0.1, 'timestamp': '2024-01-01T12:00:00Z'}}
AnalyticsEngine output: ('Analyzed 1 items (1 successful, 0 failed)', {'avg_value': 20.0, 'max_value': 20, 'min_value': 20, 'total_value': 20, 'success_rate': 1.0})
ReportGenerator output:
           ANALYSIS REPORT

Section 1: Analyzed 1 items (1 successful, 0 failed)
  Metrics:
    avg_value: 20.00
    max_value: 20
    min_value: 20
    total_value: 20
    success_rate: 1.00


=== Testing Integrated Pipeline ===
Processing dataset 1...
  Dataset 1 processed successfully
Processing dataset 2...
  Dataset 2 processed successfully
Processing dataset 3...
  Dataset 3 processed successfully
Processing dataset 4...
Processing dataset 5...
  Dataset 5 processed successfully
Integration successful!
    

In [22]:
# Test Cell
def test_question_10():
    print("Testing integrated pipeline...")
    
    # Test 1: Individual component functionality
    processor = DataProcessor()
    analytics = AnalyticsEngine()
    reporter = ReportGenerator()
    
    # Test DataProcessor
    test_data = [{'id': 'test1', 'value': 10}, {'id': 'test2', 'value': 20}]
    processed = processor.process_data(test_data)
    
    assert isinstance(processed, dict), "DataProcessor should return dict"
    assert 'total_items' in processed, "Missing total_items in processed data"
    assert 'processed_items' in processed, "Missing processed_items in processed data"
    assert processed['total_items'] == 2, "Should count items correctly"
    
    # Test AnalyticsEngine
    json_data = json.dumps(processed)
    summary, metrics = analytics.analyze(json_data)
    
    assert summary is not None, "Analytics should return valid summary"
    assert isinstance(metrics, dict), "Analytics should return metrics dict"
    assert 'avg_value' in metrics, "Missing avg_value in metrics"
    
    # Test ReportGenerator
    report = reporter.generate_report([(summary, metrics)])
    
    assert isinstance(report, str), "Report should be string"
    assert "ANALYSIS REPORT" in report, "Report should contain header"
    assert "Section 1" in report, "Report should contain section"
    
    # Test 2: Data validation and cleaning
    cleaned_data = validate_and_clean_raw_data([
        {'id': 'valid', 'value': 10},
        {'value': 20},  # Missing id
        {'id': 'invalid'},  # Missing value
        'invalid_format'  # Wrong format
    ])
    
    assert isinstance(cleaned_data, list), "Should return list"
    # Should handle invalid data gracefully
    
    # Test 3: Integration adapters
    test_dict = {'processed_items': [{'processed_value': 10}]}
    json_str = dict_to_json_adapter(test_dict)
    
    assert isinstance(json_str, str), "Should return JSON string"
    # Should be valid JSON
    parsed = json.loads(json_str)
    assert parsed == test_dict, "Should preserve data structure"
    
    # Test 4: Full pipeline integration
    sample_datasets = [
        [{'id': 'A1', 'value': 10}, {'id': 'A2', 'value': 20}],
        [{'id': 'B1', 'value': 5}],
        []  # Empty dataset
    ]
    
    final_report = integrated_pipeline(sample_datasets)
    
    assert isinstance(final_report, str), "Pipeline should return string report"
    assert "ANALYSIS REPORT" in final_report, "Should contain report header"
    
    # Should handle multiple sections
    assert "Section 1" in final_report, "Should have first section"
    assert "Section 2" in final_report, "Should have second section"
    
    # Test 5: Error handling
    # Test with invalid input
    error_report = integrated_pipeline([])
    assert isinstance(error_report, str), "Should handle empty input gracefully"
    
    # Test with malformed data
    malformed_report = integrated_pipeline([["not", "a", "dict", "list"]])
    assert isinstance(malformed_report, str), "Should handle malformed data"
    
    # Test 6: Edge cases
    edge_cases = [
        [{'id': 'only_id'}],  # Missing value
        [{'value': 42}],      # Missing id
        [{}],                 # Empty dict
    ]
    
    edge_report = integrated_pipeline(edge_cases)
    assert isinstance(edge_report, str), "Should handle edge cases"
    assert "ANALYSIS REPORT" in edge_report, "Should still generate report structure"
    
    print("✓ All Question 10 tests passed!")

# Run the test
test_question_10()

Testing integrated pipeline...
Processing dataset 1...
  Dataset 1 processed successfully
Processing dataset 2...
  Dataset 2 processed successfully
Processing dataset 3...
Processing dataset 1...
  Dataset 1 processed successfully
Processing dataset 1...
  Dataset 1 processed successfully
Processing dataset 2...
  Dataset 2 processed successfully
Processing dataset 3...
  Dataset 3 processed successfully
✓ All Question 10 tests passed!


## Final Submission Instructions

### Before You Submit:

**Code Quality Checklist:**
- All test cells pass without errors
- Code follows Python best practices and conventions  
- Functions include appropriate documentation
- Error handling is implemented where required
- Edge cases are handled appropriately
- Code is clean, readable, and maintainable

**Save Your Work:**
- **Save all code outputs** - Run all cells and keep the output visible
- Save the notebook file (Ctrl+S / Cmd+S)
- Verify all your implementations are in the correct code cells
- Double-check that test cells show "tests passed!" messages

### Submission Format:
Submit your completed `firstname_lastname.ipynb` file with **all outputs preserved**. We want to see:
- Your code implementations
- Test results (passed/failed)
- Any debugging output or print statements
- Cell execution numbers


