# ***Question 1: Debug AI-Generated Code***

In [8]:
def filter_products_by_price(products, min_price, max_price):
    """
    Filter products by price range.

    Products whose price is missing, ``None``, or of a type that cannot be
    compared with the bounds (for example a string price checked against
    numeric bounds) are skipped instead of aborting the whole filter.

    Args:
        products: List of dicts with 'name' and 'price' keys
        min_price: Minimum price (inclusive)
        max_price: Maximum price (inclusive)

    Returns:
        List of products within price range, in their original order
    """
    filtered = []
    for product in products:
        price = product.get('price')
        if price is None:
            # Missing key or explicit None: nothing to compare.
            continue
        try:
            in_range = min_price <= price <= max_price
        except TypeError:
            # Price is not comparable with the bounds (wrong type): no match.
            continue
        if in_range:
            filtered.append(product)
    return filtered

In [12]:
# Test Cell
def test_question_1():
    """Check inclusive bounds, empty input and the no-match case of filter_products_by_price."""
    catalog = [
        {'name': name, 'price': price}
        for name, price in (
            ('Laptop', 1000),
            ('Mouse', 25),
            ('Keyboard', 75),
            ('Monitor', 300),
        )
    ]

    # 25 and 300 sit exactly on the bounds and must both be kept.
    expected_names = ['Mouse', 'Keyboard', 'Monitor']
    actual_names = [p['name'] for p in filter_products_by_price(catalog, 25, 300)]
    assert set(actual_names) == set(expected_names), f"Expected {expected_names}, got {actual_names}"

    # An empty product list yields an empty result.
    assert filter_products_by_price([], 0, 100) == []

    # A range above every price yields an empty result.
    assert filter_products_by_price(catalog, 2000, 3000) == []

    print("✓ Question 1 tests passed!")

test_question_1()

✓ Question 1 tests passed!


Errors:

Exclusive bounds: > and < exclude values equal to min_price or max_price. The instructions required the bounds to be inclusive.

No key check: product['price'] raises an error if a product does not have a price key.

No type handling: if price is not a number, the code crashes.

# ***Question 2: Fix API Integration (Error handling)***

In [10]:
import requests
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def get_user_data(user_id):
    """
    Fetch user data from API with proper error handling.

    The request and the JSON decoding are handled in two separate stages.
    In recent versions of ``requests`` the JSON decode error is itself a
    ``RequestException`` subclass, so a single ``try`` with
    ``except RequestException`` listed before ``except ValueError`` would
    report a malformed body as an HTTP error and never reach the JSON branch.

    Args:
        user_id: User ID to fetch

    Returns:
        dict: User data if successful, None if any error occurs
              (timeout, connection/HTTP error, or an undecodable body)
    """
    url = f"https://jsonplaceholder.typicode.com/users/{user_id}"

    # Stage 1: transport and HTTP status.
    try:
        response = requests.get(url, timeout=5)
        response.raise_for_status()  # Raise HTTPError for bad status
    except requests.exceptions.Timeout:
        # Timeout is a RequestException subclass, so it must be caught first.
        logger.error(f"Timeout occurred while fetching user {user_id}")
        return None
    except requests.exceptions.RequestException as e:
        logger.error(f"HTTP error for user {user_id}: {e}")
        return None

    # Stage 2: body decoding. Every JSON decode error is a ValueError.
    try:
        return response.json()
    except ValueError:
        logger.error(f"JSON parsing error for user {user_id}")
        return None

In [11]:
# Test Cell
import unittest.mock as mock

def test_question_2():
    """Check get_user_data against the live API and against simulated failures."""
    # Live call: user 1 exists and carries a 'name' field.
    user_data = get_user_data(1)
    assert user_data is not None
    assert 'name' in user_data

    # Live call: an unknown id must come back as None.
    user_data = get_user_data(999999)
    assert user_data is None

    # Simulated failures: a generic network error, then a timeout.
    simulated_failures = (
        requests.exceptions.RequestException("Network error"),
        requests.exceptions.Timeout("Timeout"),
    )
    for failure in simulated_failures:
        with mock.patch('requests.get') as mock_get:
            mock_get.side_effect = failure
            result = get_user_data(1)
            assert result is None

    print("✓ Question 2 tests passed!")

test_question_2()

ERROR:__main__:HTTP error for user 999999: 404 Client Error: Not Found for url: https://jsonplaceholder.typicode.com/users/999999
ERROR:__main__:HTTP error for user 1: Network error
ERROR:__main__:Timeout occurred while fetching user 1


✓ Question 2 tests passed!


# ***Question 3: TaskManager Class***

In [13]:
class TaskManager:
    """
    A simple in-memory task manager for tracking todo items.

    Each task is a dict with the keys 'id', 'description', 'priority' and
    'completed'. Ids are assigned sequentially, starting at 1.
    """
    def __init__(self):
        self.tasks = []    # all task dicts, in insertion order
        self.next_id = 1   # id that the next added task will receive

    def add_task(self, description, priority=2):
        """
        Add a new, not-yet-completed task.

        Args:
            description: Free-text description of the task
            priority: Priority value stored on the task (default 2)

        Returns:
            int: The id assigned to the new task
        """
        task = {
            'id': self.next_id,
            'description': description,
            'priority': priority,
            'completed': False
        }
        self.tasks.append(task)
        self.next_id += 1
        return task['id']

    def complete_task(self, task_id):
        """
        Mark the task with the given id as completed.

        Returns:
            bool: True if a task with that id exists, False otherwise
        """
        for task in self.tasks:
            if task['id'] == task_id:
                task['completed'] = True
                return True
        return False

    def get_tasks(self, completed=None, priority=None):
        """
        Return tasks, optionally filtered by completion state and/or priority.

        A filter set to None is not applied. The result is always a new list,
        so adding to or removing from it cannot change the manager's own task
        list. The task dicts themselves are shared, not copied.

        Args:
            completed: If not None, keep only tasks whose 'completed' equals it
            priority: If not None, keep only tasks whose 'priority' equals it

        Returns:
            list: Matching task dicts, in insertion order
        """
        return [
            task for task in self.tasks
            if (completed is None or task['completed'] == completed)
            and (priority is None or task['priority'] == priority)
        ]

    def get_task_count(self, completed=None):
        """Return the number of tasks, optionally restricted by completion state."""
        return len(self.get_tasks(completed=completed))

In [14]:
# Test Cell
def test_question_3():
    """Exercise TaskManager: adding, filtering, completing and counting tasks."""
    tm = TaskManager()

    # Three tasks, one for each priority 1..3.
    for number in (1, 2, 3):
        tm.add_task(f"Task {number}", number)

    # No filter: everything comes back.
    all_tasks = tm.get_tasks()
    assert len(all_tasks) == 3

    # Exactly one task was added with priority 1.
    assert len(tm.get_tasks(priority=1)) == 1

    # Complete the first task through its 'id' field.
    first_id = all_tasks[0]['id']
    assert tm.complete_task(first_id) == True

    # One task is completed now, two are still pending.
    for completed_flag, expected_count in ((True, 1), (False, 2)):
        assert len(tm.get_tasks(completed=completed_flag)) == expected_count

    # The counts must agree with the filtered lists.
    assert tm.get_task_count() == 3
    assert tm.get_task_count(completed=True) == 1
    assert tm.get_task_count(completed=False) == 2

    print("✓ Question 3 tests passed!")

test_question_3()

✓ Question 3 tests passed!


# ***Question 4: Optimize AI Code***

In [23]:
# Slow version (for comparison)
def find_common_elements_slow(lists):
    """
    Return the elements present in every one of the given lists.

    Deliberately naive reference version: each candidate from the first list
    is compared with ``==`` against every element of every other list. The
    result keeps the order of the first list and holds each element once.
    """
    if not lists:
        return []

    first, rest = lists[0], lists[1:]
    shared = []
    for candidate in first:
        in_every_list = all(
            any(candidate == other_item for other_item in other_list)
            for other_list in rest
        )
        if in_every_list and candidate not in shared:
            shared.append(candidate)
    return shared

# Optimized version
def find_common_elements_fast(lists):
    """
    Find elements that appear in ALL provided lists.

    Optimized version using sets for the membership tests. The result keeps
    the order in which elements first appear in ``lists[0]`` and holds each
    element once, so it is deterministic and matches
    ``find_common_elements_slow`` for hashable elements.

    Args:
        lists: List of lists of hashable elements

    Returns:
        list: Common elements, ordered by first appearance in lists[0]
    """
    if not lists:
        return []

    common_set = set(lists[0])
    for lst in lists[1:]:
        common_set.intersection_update(lst)
        if not common_set:
            # Nothing can become common again; stop early.
            return []

    # dict.fromkeys de-duplicates while preserving first-seen order.
    return [item for item in dict.fromkeys(lists[0]) if item in common_set]

# Test both versions
# Three overlapping lists; 4 and 5 are the only values present in all three.
test_lists = [
    [1, 2, 3, 4, 5],
    [3, 4, 5, 6, 7],
    [4, 5, 7, 8, 9]
]

# Print both results next to each other so they can be compared by eye.
print("Slow version:", find_common_elements_slow(test_lists))
print("Fast version:", find_common_elements_fast(test_lists))

Slow version: [4, 5]
Fast version: [4, 5]


In [22]:
# Test Cell

import time

def test_question_4():
    """Compare the slow and fast implementations, then smoke-test the fast one for speed."""
    # Both versions must agree on a small, hand-checkable input.
    test_lists = [
        [1, 2, 3, 4, 5],
        [3, 4, 5, 6, 7],
        [4, 5, 7, 8, 9]
    ]
    slow_result = find_common_elements_slow(test_lists)
    fast_result = find_common_elements_fast(test_lists)

    assert set(slow_result) == set(fast_result), "Results don't match"
    assert set(fast_result) == {4, 5}, f"Expected {{4, 5}}, got {set(fast_result)}"

    # Edge cases: no lists, one empty list, a single list.
    assert find_common_elements_fast([]) == []
    assert find_common_elements_fast([[1, 2], []]) == []
    assert find_common_elements_fast([[1, 2, 3]]) == [1, 2, 3]

    # Rough performance check: ten identical lists of 1000 integers.
    large_lists = [list(range(1000)) for _ in range(10)]
    start_time = time.time()
    find_common_elements_fast(large_lists)
    fast_time = time.time() - start_time

    # A generous bound; it only catches a clearly non-optimized implementation.
    assert fast_time < 1.0, "Optimized version is still too slow"

    print("✓ Question 4 tests passed!")

test_question_4()

✓ Question 4 tests passed!


# ***Question 5: Fix Function with Edge Cases***

In [24]:
from collections import Counter
import math

def calculate_stats(numbers):
    """
    Calculate basic statistics for a list of numbers.

    Invalid entries are dropped before any statistic is computed: anything
    that is not an int or float, booleans (which are ints in Python but not
    measurements), and NaN (which would poison every statistic). If nothing
    valid remains, or the input is not iterable at all (e.g. None), an error
    result is returned instead of raising.

    Args:
        numbers: List of numbers (may contain invalid values)

    Returns:
        dict: Statistics including mean, median, mode, std_dev, count.
              std_dev is the population standard deviation (divides by count).
              mode is the most frequent value; on ties the value seen first
              wins. If there is no valid numeric value, every statistic is
              None, count is 0 and an 'error' key explains why.
    """
    def is_valid_number(value):
        # bool is a subclass of int, so it has to be excluded explicitly.
        if isinstance(value, bool) or not isinstance(value, (int, float)):
            return False
        return not (isinstance(value, float) and math.isnan(value))

    no_data_result = {
        'mean': None,
        'median': None,
        'mode': None,
        'std_dev': None,
        'count': 0,
        'error': 'No valid numeric values provided'
    }

    try:
        candidates = list(numbers)
    except TypeError:
        # None or another non-iterable input.
        return no_data_result

    # Filter only numeric values
    numeric_numbers = [x for x in candidates if is_valid_number(x)]
    if not numeric_numbers:
        return no_data_result

    count = len(numeric_numbers)

    # Mean (count >= 1 here, so the division is safe)
    mean = sum(numeric_numbers) / count

    # Median: middle value, or the average of the two middle values
    sorted_nums = sorted(numeric_numbers)
    middle = count // 2
    if count % 2 == 0:
        median = (sorted_nums[middle - 1] + sorted_nums[middle]) / 2
    else:
        median = sorted_nums[middle]

    # Mode: most_common keeps first-seen order among equally frequent values
    mode = Counter(numeric_numbers).most_common(1)[0][0]

    # Population standard deviation; a single value has no spread
    if count > 1:
        variance = sum((x - mean) ** 2 for x in numeric_numbers) / count
        std_dev = math.sqrt(variance)
    else:
        std_dev = 0.0

    return {
        'mean': mean,
        'median': median,
        'mode': mode,
        'std_dev': std_dev,
        'count': count
    }

In [25]:
def test_question_5():
    """Print calculate_stats results for sample inputs, then assert the key values."""
    test_cases = [
        [1, 2, 3, 4, 5],           # Normal case
        [],                        # Empty list
        [1],                       # Single item
        [1, 1, 1],                 # All same
        [1, 'invalid', 3],         # Mixed types
        [1, 2, None, 4]            # None values
    ]

    # Visual pass: show what each sample input produces.
    for i, case in enumerate(test_cases):
        print(f"Test case {i+1}: {case}")
        result = calculate_stats(case)
        print(f"  Result: {result}\n")

    # Normal case
    normal = calculate_stats([1, 2, 3, 4, 5])
    assert normal['mean'] == 3.0
    assert normal['median'] == 3.0
    assert normal['count'] == 5

    # A single value is its own mean, median and mode and has no spread.
    single = calculate_stats([42])
    for key in ('mean', 'median', 'mode'):
        assert single[key] == 42
    assert single['std_dev'] == 0.0

    # Empty input is reported through the 'error' key.
    assert 'error' in calculate_stats([])

    # Non-numeric entries are ignored.
    assert calculate_stats([1, 'invalid', 3])['count'] == 2

    # Identical values: the mean is that value and the spread is zero.
    constant = calculate_stats([5, 5, 5, 5])
    assert constant['mean'] == 5
    assert constant['std_dev'] == 0.0

    print("✓ Question 5 tests passed!")

test_question_5()

Test case 1: [1, 2, 3, 4, 5]
  Result: {'mean': 3.0, 'median': 3, 'mode': 1, 'std_dev': 1.4142135623730951, 'count': 5}

Test case 2: []
  Result: {'mean': None, 'median': None, 'mode': None, 'std_dev': None, 'count': 0, 'error': 'No valid numeric values provided'}

Test case 3: [1]
  Result: {'mean': 1.0, 'median': 1, 'mode': 1, 'std_dev': 0.0, 'count': 1}

Test case 4: [1, 1, 1]
  Result: {'mean': 1.0, 'median': 1, 'mode': 1, 'std_dev': 0.0, 'count': 3}

Test case 5: [1, 'invalid', 3]
  Result: {'mean': 2.0, 'median': 2.0, 'mode': 1, 'std_dev': 1.0, 'count': 2}

Test case 6: [1, 2, None, 4]
  Result: {'mean': 2.3333333333333335, 'median': 2, 'mode': 1, 'std_dev': 1.247219128924647, 'count': 3}

✓ Question 5 tests passed!


# ***Question 6: Complete Partial Implementation (Pandas/Data)***

In [27]:
import pandas as pd
import numpy as np

def analyze_sales_data(df, group_by_column):
    """
    Analyze sales data by grouping and calculating statistics.

    Missing 'sales' / 'profit' values are treated as 0 before aggregating, so
    such rows still count towards the group means. The input frame is not
    modified.

    Args:
        df: DataFrame with columns ['product', 'category', 'sales', 'profit']
        group_by_column: Column name to group by

    Returns:
        DataFrame with aggregated statistics:
        ['sales_sum', 'sales_mean', 'profit_sum', 'profit_mean', 'profit_margin']
        Indexed by group_by_column.
        profit_margin is profit_sum / sales_sum, or NaN where sales_sum is 0.
        An empty DataFrame with these columns is returned when df is empty or
        lacks group_by_column, 'sales' or 'profit'.
    """
    required_cols = ['sales_sum', 'sales_mean', 'profit_sum', 'profit_mean', 'profit_margin']

    # Handle edge cases: empty df, or any input column missing
    input_cols = {group_by_column, 'sales', 'profit'}
    if df.empty or not input_cols.issubset(df.columns):
        return pd.DataFrame(columns=required_cols)

    # Replace missing sales/profit with 0 on a derived frame (df stays untouched)
    cleaned = df.assign(
        sales=df['sales'].fillna(0),
        profit=df['profit'].fillna(0),
    )

    # Group by the specified column
    grouped = cleaned.groupby(group_by_column).agg(
        sales_sum=('sales', 'sum'),
        sales_mean=('sales', 'mean'),
        profit_sum=('profit', 'sum'),
        profit_mean=('profit', 'mean')
    )

    # Vectorised margin: groups with zero sales become NaN before dividing,
    # so the result is NaN instead of +/-inf.
    nonzero_sales = grouped['sales_sum'].where(grouped['sales_sum'] != 0)
    grouped['profit_margin'] = grouped['profit_sum'] / nonzero_sales

    return grouped

In [29]:
# Demo input: product B has one missing (NaN) sales value.
sample_data = pd.DataFrame({
    'product': ['A', 'B', 'C', 'A', 'B', 'C', 'A'],
    'category': ['Electronics', 'Electronics', 'Clothing', 'Electronics', 'Electronics', 'Clothing', 'Electronics'],
    'sales': [100, 200, 150, 120, np.nan, 180, 110],
    'profit': [20, 50, 30, 25, 40, 35, 22]
})

# analyze_sales_data fills the missing sales value with 0, so B's sales_mean
# is averaged over both of its rows.
result = analyze_sales_data(sample_data, 'product')
print(result)

         sales_sum  sales_mean  profit_sum  profit_mean  profit_margin
product                                                               
A            330.0       110.0          67    22.333333        0.20303
B            200.0       100.0          90    45.000000        0.45000
C            330.0       165.0          65    32.500000        0.19697


In [30]:
# Test Cell
def test_question_6():
    """Group a small sales frame by product and check the structure and product A's sums."""
    rows = [
        ('A', 'Cat1', 100, 20),
        ('B', 'Cat2', 200, 40),
        ('A', 'Cat1', 150, 30),
        ('B', 'Cat2', 300, 60),
        ('A', 'Cat1', 50, 10),
    ]
    test_data = pd.DataFrame(rows, columns=['product', 'category', 'sales', 'profit'])

    result = analyze_sales_data(test_data, 'product')

    # Structure: a DataFrame with one row per product.
    assert isinstance(result, pd.DataFrame), "Should return DataFrame"
    assert len(result) == 2, "Should have 2 groups (A and B)"

    # Every aggregated column must be present.
    for col in ('sales_sum', 'sales_mean', 'profit_sum', 'profit_mean', 'profit_margin'):
        assert col in result.columns, f"Missing column: {col}"

    # Product A: sales 100 + 150 + 50, profit 20 + 30 + 10.
    if 'A' in result.index:
        product_a = result.loc['A']
    else:
        product_a = result[result.index == 'A'].iloc[0]
    assert product_a['sales_sum'] == 300, "Product A sales sum should be 300"
    assert product_a['profit_sum'] == 60, "Product A profit sum should be 60"

    print("✓ Question 6 tests passed!")
test_question_6()

✓ Question 6 tests passed!


# ***Question 7: Refactor Messy AI Code (Clean Code)***

In [44]:
from typing import List, Dict, Any
def process_data(data: List[Dict[str, Any]]) -> Dict[str, Dict[str, Any]]:
    """
    Processes a list of user dictionaries and categorizes them based on age.

    Categories:
        - 'young_adult': 18 <= age < 25
        - 'adult': 25 <= age < 65
        - 'senior': age >= 65

    Only considers items of type 'user' that are active, have a numeric age
    of at least 18 and an email string containing '@'. Everything else,
    including a NaN age, is skipped.

    Returns a dictionary with categories as keys and stats as values:
        - count: Number of users in category
        - emails: List of user emails
        - avg_age: Average age of users in category
    Categories without any matching user are absent from the result.
    """
    def categorize_age(age: float) -> str:
        # Callers guarantee age >= 18, so no 'underage' bucket is needed.
        if age >= 65:
            return 'senior'
        if age >= 25:
            return 'adult'
        return 'young_adult'

    # Running totals per category: [count, emails, total_age]
    totals: Dict[str, List[Any]] = {}

    for item in data:
        if item.get('type') != 'user' or not item.get('active', False):
            continue

        age = item.get('age')
        email = item.get('email')
        # bool is an int subclass; `not age >= 18` (rather than `age < 18`)
        # also rejects NaN, for which every comparison is False.
        if isinstance(age, bool) or not isinstance(age, (int, float)) or not age >= 18:
            continue
        if not isinstance(email, str) or '@' not in email:
            continue

        bucket = totals.setdefault(categorize_age(age), [0, [], 0])
        bucket[0] += 1
        bucket[1].append(email)
        bucket[2] += age

    # Turn the running totals into the public stats (count >= 1 per bucket)
    return {
        category: {'count': count, 'emails': emails, 'avg_age': total_age / count}
        for category, (count, emails, total_age) in totals.items()
    }


# The demo and test cells for this question call `process_user_data_clean`.
# Expose the refactored function under that name too, so the notebook runs on
# a fresh kernel instead of depending on a definition left over from an
# earlier session.
process_user_data_clean = process_data

# ==========================
# Test Question 7
# ==========================
# Sample input covering: two active adult users, one senior, one inactive
# user, one non-user item, and one user whose email lacks '@'.
test_data = [
    {'type': 'user', 'active': True, 'age': 25, 'email': 'user1@test.com'},
    {'type': 'user', 'active': True, 'age': 70, 'email': 'user2@test.com'},
    {'type': 'user', 'active': False, 'age': 30, 'email': 'user3@test.com'},
    {'type': 'admin', 'active': True, 'age': 35, 'email': 'admin@test.com'},
    {'type': 'user', 'active': True, 'age': 20, 'email': 'invalid-email'},
    {'type': 'user', 'active': True, 'age': 40, 'email': 'user4@test.com'},
]

# NOTE(review): `process_user_data_clean` must already be defined when this
# cell runs; on a fresh kernel, confirm an earlier cell provides it.
clean_result = process_user_data_clean(test_data)
print("Clean result:", clean_result)

Clean result: {'adult': {'count': 2, 'emails': ['user1@test.com', 'user4@test.com'], 'avg_age': 32.5}, 'senior': {'count': 1, 'emails': ['user2@test.com'], 'avg_age': 70.0}}


In [45]:
# Test Cell
def test_question_7():
    """Check that process_data and process_user_data_clean give equivalent results."""
    def user(active, age, email):
        # Build one 'user' record for the comparison input.
        return {'type': 'user', 'active': active, 'age': age, 'email': email}

    test_data = [
        user(True, 25, 'user1@test.com'),
        user(True, 70, 'user2@test.com'),
        user(False, 30, 'user3@test.com'),
        user(True, 20, 'user4@test.com'),
    ]

    original_result = process_data(test_data)
    clean_result = process_user_data_clean(test_data)

    # Both implementations must produce the same set of categories ...
    assert set(original_result.keys()) == set(clean_result.keys()), "Categories don't match"

    # ... and agree on count and (within tolerance) average age per category.
    for category in original_result:
        before, after = original_result[category], clean_result[category]
        assert before['count'] == after['count'], f"Count mismatch for {category}"
        assert abs(before['avg_age'] - after['avg_age']) < 0.01, f"Average age mismatch for {category}"

    print("✓ Question 7 tests passed!")

test_question_7()

✓ Question 7 tests passed!


# ***Question 8: Debug Complex Logic (Algorithms)***

In [41]:
def binary_search_buggy(arr, target):
    """
    Binary search over a sorted list (corrected version; the name is kept
    from the original exercise).

    Args:
        arr: Sorted list of integers
        target: Value to search for

    Returns:
        int: Index of target if found, -1 otherwise
    """
    # The search window is the closed interval [lo, hi]; it is empty when lo > hi.
    lo, hi = 0, len(arr) - 1

    while lo <= hi:
        mid = (lo + hi) // 2
        probe = arr[mid]

        if probe == target:
            return mid
        if probe < target:
            lo = mid + 1   # target can only be to the right of mid
        else:
            hi = mid - 1   # target can only be to the left of mid

    return -1


In [42]:
# (sorted list, target) pairs; the trailing comment on each row is the
# expected result (an index, or -1 when the target is absent).
test_arrays = [
    ([1, 3, 5, 7, 9, 11], 7),    # index 3
    ([1, 3, 5, 7, 9, 11], 1),    # index 0
    ([1, 3, 5, 7, 9, 11], 11),   # index 5
    ([1, 3, 5, 7, 9, 11], 6),    # -1
    ([5], 5),                     # 0
    ([5], 3),                     # -1
    ([], 5),                      # -1
]

# Print each search result for visual comparison with the comments above.
for arr, target in test_arrays:
    print(f"Searching for {target} in {arr}: {binary_search_buggy(arr, target)}")


Searching for 7 in [1, 3, 5, 7, 9, 11]: 3
Searching for 1 in [1, 3, 5, 7, 9, 11]: 0
Searching for 11 in [1, 3, 5, 7, 9, 11]: 5
Searching for 6 in [1, 3, 5, 7, 9, 11]: -1
Searching for 5 in [5]: 0
Searching for 3 in [5]: -1
Searching for 5 in []: -1


In [43]:
# Test Cell
def test_question_8():
    """Table-driven check of binary_search_buggy, plus one lookup in a large list."""
    odds = [1, 3, 5, 7, 9, 11]
    # (sorted list, target, expected index or -1)
    test_cases = [
        (odds, 7, 3),       # Found in the middle
        (odds, 1, 0),       # Found at the first index
        (odds, 11, 5),      # Found at the last index
        (odds, 6, -1),      # Between two elements
        (odds, 0, -1),      # Less than min
        (odds, 12, -1),     # Greater than max
        ([5], 5, 0),        # Single element found
        ([5], 3, -1),       # Single element not found
        ([], 5, -1),        # Empty array
    ]

    for arr, target, expected in test_cases:
        result = binary_search_buggy(arr, target)
        assert result == expected, f"Failed for {target} in {arr}: expected {expected}, got {result}"

    # Large input: the even numbers 0..9998, so 5000 sits at index 2500.
    large_array = list(range(0, 10000, 2))
    assert binary_search_buggy(large_array, 5000) == 2500, "Should find 5000 at index 2500"

    print("✓ Question 8 tests passed!")

test_question_8()

✓ Question 8 tests passed!


# ***Question 9: Add Missing Functionality***

In [46]:
import time
from typing import Any, Optional, Dict
from collections import OrderedDict
import threading

class SimpleCache:
    """
    Enhanced cache with TTL, LRU eviction, size limit, statistics, and management methods.

    Entries live in an OrderedDict ordered from least to most recently used.
    Every public method takes an internal lock; the underscore-prefixed
    helpers expect the caller to hold it already.
    """
    def __init__(self, max_size: int = 100, default_ttl: Optional[int] = None):
        """
        Initialize cache with size limit and default TTL.

        Args:
            max_size: Maximum number of entries kept at once
            default_ttl: TTL in seconds for entries stored without an explicit
                ttl; None means such entries never expire
        """
        self.max_size = max_size
        self.default_ttl = default_ttl
        self._data = OrderedDict()  # key -> (value, expire_time)
        self._lock = threading.Lock()

        # Statistics
        self.hits = 0
        self.misses = 0
        self.evictions = 0

    def _current_time(self) -> float:
        """Return the current time in seconds (single time source for the cache)."""
        return time.time()

    def _is_expired(self, key: str) -> bool:
        """Return True if key is stored with an expiry time that has passed."""
        _, expire_time = self._data.get(key, (None, None))
        if expire_time is None:
            return False
        return self._current_time() >= expire_time

    def _evict_lru(self, count: int = 1) -> int:
        """Evict up to `count` least recently used entries; return how many were evicted."""
        evicted = 0
        while evicted < count and self._data:
            self._data.popitem(last=False)  # Remove least recently used
            evicted += 1
            self.evictions += 1
        return evicted

    def _purge_expired(self) -> int:
        """Drop every expired entry and return how many were dropped. Caller holds the lock."""
        now = self._current_time()
        expired_keys = [
            key for key, (_, expire_time) in self._data.items()
            if expire_time is not None and now >= expire_time
        ]
        for key in expired_keys:
            del self._data[key]
        return len(expired_keys)

    def cleanup_expired(self) -> int:
        """Remove all expired entries; return the number removed (not counted as evictions)."""
        with self._lock:
            return self._purge_expired()

    def get(self, key: str) -> Optional[Any]:
        """
        Return the value stored for key, or None if it is absent or expired.

        A successful lookup counts as a hit and marks the key as most recently
        used. An absent or expired key counts as a miss, and an expired entry
        is removed.
        """
        with self._lock:
            if key not in self._data:
                self.misses += 1
                return None
            if self._is_expired(key):
                self._data.pop(key, None)
                self.misses += 1
                return None
            # Move to end to mark as recently used
            self._data.move_to_end(key)
            self.hits += 1
            return self._data[key][0]

    def set(self, key: str, value: Any, ttl: Optional[int] = None) -> None:
        """
        Store value under key as the most recently used entry.

        Args:
            key: Cache key
            value: Value to store
            ttl: Time to live in seconds. None means "use default_ttl", not
                "never expire"; an entry only lives forever when the
                effective TTL is None.

        When the cache is full and key is new, expired entries are dropped
        first. A live least-recently-used entry is evicted only if that did
        not free a slot.
        """
        expire_time = None
        if ttl is None:
            ttl = self.default_ttl
        if ttl is not None:
            expire_time = self._current_time() + ttl
        with self._lock:
            if key in self._data:
                # Remove first so the assignment below re-inserts at the end
                self._data.pop(key)
            elif len(self._data) >= self.max_size:
                # Dead entries should not cost a live entry its slot.
                self._purge_expired()
                if len(self._data) >= self.max_size:
                    self._evict_lru(1)
            # The key is not present at this point, so assigning appends it
            # at the most-recently-used end.
            self._data[key] = (value, expire_time)

    def delete(self, key: str) -> bool:
        """Remove key; return True if it was present, False otherwise."""
        with self._lock:
            if key in self._data:
                self._data.pop(key)
                return True
            return False

    def clear(self) -> None:
        """Remove all entries and reset the hit/miss/eviction counters."""
        with self._lock:
            self._data.clear()
            self.hits = 0
            self.misses = 0
            self.evictions = 0

    def size(self) -> int:
        """Return the number of stored entries (expired ones not yet removed are included)."""
        with self._lock:
            return len(self._data)

    def get_stats(self) -> Dict[str, int]:
        """Return a snapshot of hits, misses, evictions and the current entry count."""
        with self._lock:
            return {
                "hits": self.hits,
                "misses": self.misses,
                "evictions": self.evictions,
                "current_size": len(self._data)
            }

In [47]:
# Test your enhanced implementation
if __name__ == "__main__":
    # Test TTL functionality
    cache = SimpleCache(max_size=3, default_ttl=1)  # 1 second TTL
    
    print("=== Testing TTL ===")
    cache.set("temp_key", "temp_value")
    print(f"Immediately after set: {cache.get('temp_key')}")
    time.sleep(1.1)
    print(f"After TTL expired: {cache.get('temp_key')}")
    
    print("\n=== Testing Size Limits & LRU ===")
    cache.clear()
    # ttl=None falls back to default_ttl (1 second here); it does NOT mean
    # "no expiration". These entries are simply read back before they expire.
    cache.set("a", 1, ttl=None)
    cache.set("b", 2, ttl=None)
    cache.set("c", 3, ttl=None)
    print(f"Cache size after adding 3 items: {cache.size()}")
    
    # Access 'a' to make it recently used
    cache.get("a")
    # Add 'd' which should evict 'b' (least recently used)
    cache.set("d", 4, ttl=None)
    print(f"After adding 'd': a={cache.get('a')}, b={cache.get('b')}, c={cache.get('c')}, d={cache.get('d')}")
    
    print("\n=== Testing Statistics ===")
    stats = cache.get_stats()
    print(f"Cache statistics: {stats}")
    
    print("\n=== Testing Cleanup ===")
    # The entries stored above also carry the 1-second default TTL, so after
    # the sleep the cleanup removes them together with "expire_me".
    cache.set("expire_me", "value", ttl=1)
    time.sleep(1.1)
    removed_count = cache.cleanup_expired()
    print(f"Expired items removed: {removed_count}")


=== Testing TTL ===
Immediately after set: temp_value
After TTL expired: None

=== Testing Size Limits & LRU ===
Cache size after adding 3 items: 3
After adding 'd': a=1, b=None, c=3, d=4

=== Testing Statistics ===
Cache statistics: {'hits': 4, 'misses': 1, 'evictions': 1, 'current_size': 3}

=== Testing Cleanup ===
Expired items removed: 3


In [48]:
# Test Cell 
import time

def test_question_9():
    """
    End-to-end check of SimpleCache: basic get/set, TTL expiry, LRU eviction,
    statistics, manual cleanup, and delete / edge cases.

    Uses real sleeps (about 2.2 s in total) to let short TTLs expire.
    """
    print("Testing enhanced cache implementation...")
    
    # Test 1: Basic functionality
    # default_ttl=60 is long enough that nothing expires by accident below.
    cache = SimpleCache(max_size=3, default_ttl=60)
    
    cache.set("key1", "value1")
    cache.set("key2", "value2")
    
    assert cache.get("key1") == "value1", "Basic get/set failed"
    assert cache.get("key2") == "value2", "Basic get/set failed"
    assert cache.size() == 2, f"Expected size 2, got {cache.size()}"
    
    # Test 2: TTL expiration
    cache.clear()
    cache.set("ttl_key", "ttl_value", ttl=1)  # 1 second TTL
    assert cache.get("ttl_key") == "ttl_value", "TTL key should be accessible immediately"
    
    time.sleep(1.1)  # Wait for expiration
    assert cache.get("ttl_key") is None, "TTL key should be expired and return None"
    
    # Test 3: Size limits and LRU eviction
    cache.clear()
    cache.set("a", 1)
    cache.set("b", 2) 
    cache.set("c", 3)  # Cache is now full (max_size=3)
    
    # Access 'a' to make it recently used
    cache.get("a")
    
    # Add 'd' which should evict 'b' (least recently used)
    cache.set("d", 4)
    
    assert cache.get("a") == 1, "Recently used 'a' should not be evicted"
    assert cache.get("b") is None, "Least recently used 'b' should be evicted"
    assert cache.get("c") == 3, "'c' should still be in cache"
    assert cache.get("d") == 4, "Newly added 'd' should be in cache"
    assert cache.size() == 3, "Cache size should remain at max_size"
    
    # Test 4: Statistics tracking
    # clear() also resets the counters, so the stats below start from zero.
    cache.clear()
    cache.set("stat_key", "stat_value")
    cache.get("stat_key")  # Hit
    cache.get("nonexistent")  # Miss
    
    stats = cache.get_stats()
    required_stats = ["hits", "misses", "evictions", "current_size"]
    for stat in required_stats:
        assert stat in stats, f"Missing statistic: {stat}"
    
    assert stats["hits"] > 0, "Should have recorded hits"
    assert stats["misses"] > 0, "Should have recorded misses"
    assert stats["current_size"] == 1, "Should track current size"

    # Test 5: Manual cleanup
    cache.clear()
    cache.set("expire1", "value1", ttl=1)
    cache.set("expire2", "value2", ttl=1)
    # ttl=None falls back to default_ttl (60 s here), so this entry is not
    # immortal; it merely outlives the 1.1 s sleep below.
    cache.set("keep", "value3", ttl=None)
    
    time.sleep(1.1)  # Wait for expiration
    removed_count = cache.cleanup_expired()
    
    assert removed_count == 2, f"Should have removed 2 expired items, removed {removed_count}"
    assert cache.get("keep") == "value3", "Non-expiring item should remain"
    assert cache.size() == 1, "Only one item should remain after cleanup"
    
    # Test 6: Edge cases
    cache.clear()
    assert cache.size() == 0, "Cache should be empty after clear"
    assert cache.get("nonexistent") is None, "Getting non-existent key should return None"
    assert cache.delete("nonexistent") == False, "Deleting non-existent key should return False"
    
    # Test delete functionality
    cache.set("delete_me", "value")
    assert cache.delete("delete_me") == True, "Deleting existing key should return True"
    assert cache.get("delete_me") is None, "Deleted key should not be accessible"
    
    print("✓ All Question 9 tests passed!")

test_question_9()



Testing enhanced cache implementation...
✓ All Question 9 tests passed!


# ***Question 10: Integration Challenge (Multiple Components)***

In [51]:
import json
from typing import List, Dict, Any, Tuple, Optional, Union

# --- Component 1: DataProcessor ---
class DataProcessor:
    """AI Component 1 - processes raw data and returns structured dict"""
    def process_data(self, raw_data: List[Dict[str, Any]]) -> Dict[str, Any]:
        """
        Double every numeric 'value' and report each item's outcome.

        An item counts as processed only if it is a dict whose 'value' is an
        int or float. Anything else (not a dict, no 'value', or a non-numeric
        value such as None or a string) yields a 'failed' entry, so one bad
        item neither aborts the batch nor gets string/list-repeated by `* 2`.

        Args:
            raw_data: List of item dicts, ideally with 'id' and 'value' keys

        Returns:
            dict with 'total_items' (length of raw_data), 'processed_items'
            (one entry per input item, in order) and 'metadata'

        Raises:
            ValueError: If raw_data is not a list
        """
        if not isinstance(raw_data, list):
            raise ValueError("Expected list input")
        result = {
            'total_items': len(raw_data),
            'processed_items': [],
            # NOTE(review): fixed placeholder values, not measured ones.
            'metadata': {'processing_time': 0.1, 'timestamp': '2024-01-01T12:00:00Z'}
        }
        for item in raw_data:
            value = item.get('value') if isinstance(item, dict) else None
            if isinstance(value, (int, float)):
                result['processed_items'].append({
                    'id': item.get('id', 'unknown'),
                    'processed_value': value * 2,
                    'original_value': value,
                    'status': 'processed'
                })
            else:
                result['processed_items'].append({
                    'id': 'error',
                    'processed_value': 0,
                    'original_value': None,
                    'status': 'failed'
                })
        return result

# --- Component 2: AnalyticsEngine ---
class AnalyticsEngine:
    """AI Component 2 - performs analytics on data, expects JSON string input"""

    def analyze(self, json_data_string: str) -> Tuple[Optional[str], Union[Dict[str, float], str]]:
        """Compute summary statistics over the ``processed_items`` of a JSON payload.

        Args:
            json_data_string: JSON text encoding an object with a
                ``'processed_items'`` list.

        Returns:
            ``(summary, metrics)`` on success, where ``metrics`` holds
            ``avg_value``, ``max_value``, ``min_value``, ``total_value`` and
            ``success_rate``. On any problem, ``(None, error_message)``.
        """
        try:
            data = json.loads(json_data_string)
        except (TypeError, ValueError):
            # TypeError: input is not str/bytes (e.g. None).
            # ValueError: covers json.JSONDecodeError for malformed text.
            return None, "Invalid JSON format"
        if not isinstance(data, dict) or 'processed_items' not in data:
            return None, "Missing processed_items in data structure"
        items = data['processed_items']
        if not isinstance(items, list):
            return None, "processed_items must be a list"

        values = [item['processed_value'] for item in items if self._has_usable_value(item)]
        if not values:
            return None, "No valid numeric data found for analysis"

        # Every item that did not contribute a value is a failure, including
        # items marked 'processed' whose value turned out to be unusable, so
        # successful + failed always equals the number of items.
        failed_count = len(items) - len(values)
        total = sum(values)
        summary = f"Analyzed {len(items)} items ({len(values)} successful, {failed_count} failed)"
        metrics = {
            'avg_value': total / len(values),
            'max_value': max(values),
            'min_value': min(values),
            'total_value': total,
            # items is non-empty here because values is non-empty.
            'success_rate': len(values) / len(items)
        }
        return summary, metrics

    @staticmethod
    def _has_usable_value(item: Any) -> bool:
        """Return True when item is a 'processed' record with an int/float (non-bool) value."""
        if not isinstance(item, dict) or item.get('status') != 'processed':
            return False
        value = item.get('processed_value')
        return isinstance(value, (int, float)) and not isinstance(value, bool)

# --- Component 3: ReportGenerator ---
class ReportGenerator:
    """AI Component 3 - generates reports from analytics results"""

    def generate_report(self, analytics_results_list: List[Tuple[Optional[str], Union[Dict, str]]]) -> str:
        """Render a list of ``(summary, metrics)`` results as a plain-text report.

        Each list entry becomes one numbered section. An entry that is not a
        2-tuple is reported as an invalid-format section; an entry whose
        summary is ``None`` is reported as a failed analysis with ``metrics``
        shown as the error text.

        Returns:
            The report text, or an ``"Error: ..."`` string when the argument
            is not a list or is an empty list.
        """
        if not isinstance(analytics_results_list, list):
            return "Error: Expected list input for report generation"
        if len(analytics_results_list) == 0:
            return "Error: No data provided for report generation"

        banner = "=" * 50
        lines = [banner, "           ANALYSIS REPORT", banner]

        for number, entry in enumerate(analytics_results_list, start=1):
            well_formed = isinstance(entry, tuple) and len(entry) == 2
            if not well_formed:
                lines.append(f"\nSection {number}: Invalid data format - expected (summary, metrics) tuple")
                continue

            summary, metrics = entry
            if summary is None:
                lines.append(f"\nSection {number}: Analysis failed")
                lines.append(f"  Error: {metrics}")
                continue

            lines.append(f"\nSection {number}: {summary}")
            if not isinstance(metrics, dict):
                lines.append(f"  Metrics: {metrics}")
                continue

            lines.append("  Metrics:")
            for key, value in metrics.items():
                # Floats are shown with two decimals; everything else as-is.
                rendered = f"    {key}: {value:.2f}" if isinstance(value, float) else f"    {key}: {value}"
                lines.append(rendered)

        lines.append("\n" + banner)
        return "\n".join(lines)

# --- Adapter / Helper Functions ---
def dict_to_json_adapter(data_dict: Dict[str, Any]) -> str:
    """Serialize ``data_dict`` to a JSON string.

    If ``json.dumps`` rejects the input with TypeError or ValueError, the
    JSON for an empty ``processed_items`` payload is returned instead of
    raising.
    """
    try:
        encoded = json.dumps(data_dict)
    except (TypeError, ValueError):
        # Best-effort fallback: an empty but structurally valid payload.
        encoded = json.dumps({'processed_items': []})
    return encoded

def validate_and_clean_raw_data(raw_data: Any) -> List[Dict[str, Any]]:
    """Keep only records that carry a usable numeric ``'value'``.

    Args:
        raw_data: Expected to be a list of dicts; anything else yields ``[]``.

    Returns:
        A new list of ``{'id': ..., 'value': ...}`` dicts, in input order.
        A missing ``'id'`` becomes ``'unknown'``. Records are dropped when
        they are not dicts, have no ``'value'``, or the value is not a finite
        int/float. Bools are dropped too, even though ``bool`` subclasses
        ``int``.
    """
    import math  # local import: only this helper needs it

    if not isinstance(raw_data, list):
        return []

    cleaned = []
    for item in raw_data:
        if not isinstance(item, dict):
            continue
        val = item.get('value')
        if isinstance(val, bool) or not isinstance(val, (int, float)):
            continue
        # NaN/inf would propagate into every downstream metric and are not
        # valid JSON numbers. Only floats are checked: math.isfinite can
        # overflow on very large ints.
        if isinstance(val, float) and not math.isfinite(val):
            continue
        cleaned.append({'id': item.get('id', 'unknown'), 'value': val})
    return cleaned

# --- Integrated Pipeline ---
def integrated_pipeline(raw_data_list: List[Any]) -> str:
    """Run every dataset through clean -> process -> serialize -> analyze and report.

    Args:
        raw_data_list: Iterable of raw datasets; each dataset yields one
            report section.

    Returns:
        The text produced by ``ReportGenerator.generate_report``. A falsy
        ``raw_data_list`` is reported via an empty result list. An exception
        raised while handling a dataset becomes a failed section rather than
        aborting the run.
    """
    reporter = ReportGenerator()
    if not raw_data_list:
        return reporter.generate_report([])

    processor = DataProcessor()
    analytics = AnalyticsEngine()
    sections = []
    for dataset in raw_data_list:
        try:
            usable_records = validate_and_clean_raw_data(dataset)
            payload = dict_to_json_adapter(processor.process_data(usable_records))
            outcome = analytics.analyze(payload)
        except Exception as e:
            # Keep going: one bad dataset must not lose the other sections.
            outcome = (None, f"Unexpected error: {e}")
        sections.append(outcome)
    return reporter.generate_report(sections)

# --- Sample Data Creation ---
def create_sample_data() -> List[List[Dict[str, Any]]]:
    """Return three demo datasets: two fully valid, one with irregular records."""
    all_valid_three = [
        {'id': 'A1', 'value': 10},
        {'id': 'A2', 'value': 20},
        {'id': 'A3', 'value': 15},
    ]
    all_valid_two = [
        {'id': 'B1', 'value': 5},
        {'id': 'B2', 'value': 25},
    ]
    # Mixed batch: one record lacks 'value', one lacks 'id', one has a string value.
    irregular = [
        {'id': 'C1', 'value': 30},
        {'id': 'C2'},
        {'value': 40},
        {'id': 'C4', 'value': 'invalid'},
    ]
    return [all_valid_three, all_valid_two, irregular]

# --- Test Run ---
if __name__ == "__main__":
    # Demo run: build the sample datasets, push them through the full
    # pipeline, and print the resulting text report.
    sample_datasets = create_sample_data()
    report = integrated_pipeline(sample_datasets)
    print(report)


           ANALYSIS REPORT

Section 1: Analyzed 3 items (3 successful, 0 failed)
  Metrics:
    avg_value: 30.00
    max_value: 40
    min_value: 20
    total_value: 90
    success_rate: 1.00

Section 2: Analyzed 2 items (2 successful, 0 failed)
  Metrics:
    avg_value: 30.00
    max_value: 50
    min_value: 10
    total_value: 60
    success_rate: 1.00

Section 3: Analyzed 2 items (2 successful, 0 failed)
  Metrics:
    avg_value: 70.00
    max_value: 80
    min_value: 60
    total_value: 140
    success_rate: 1.00



In [52]:
if __name__ == "__main__":
    # Manual walkthrough: exercise each component on its own, then the
    # integrated pipeline, printing every intermediate result.
    print("Testing component integration...")
    
    # Test individual components first
    print("\n=== Testing Individual Components ===")
    
    processor = DataProcessor()
    analytics = AnalyticsEngine()
    reporter = ReportGenerator()
    
    # Test DataProcessor
    test_data = [{'id': 'test', 'value': 10}]
    processed = processor.process_data(test_data)
    print(f"DataProcessor output: {processed}")
    
    # Test AnalyticsEngine
    # analyze() takes a JSON string, so the processor's dict is serialized first.
    json_data = json.dumps(processed)
    analysis_result = analytics.analyze(json_data)
    print(f"AnalyticsEngine output: {analysis_result}")
    
    # Test ReportGenerator
    # generate_report() takes a list of (summary, metrics) tuples.
    report = reporter.generate_report([analysis_result])
    print(f"ReportGenerator output:\n{report}")
    
    print("\n=== Testing Integrated Pipeline ===")
    
    # Test full pipeline
    sample_datasets = create_sample_data()
    
    # integrated_pipeline already turns per-dataset exceptions into failed
    # report sections; this handler is a last-resort safety net.
    try:
        final_report = integrated_pipeline(sample_datasets)
        print("Integration successful!")
        print(final_report)
    except Exception as e:
        print(f"Integration failed: {e}")
        # Imported here because it is only needed on the failure path.
        import traceback
        traceback.print_exc()

Testing component integration...

=== Testing Individual Components ===
DataProcessor output: {'total_items': 1, 'processed_items': [{'id': 'test', 'processed_value': 20, 'original_value': 10, 'status': 'processed'}], 'metadata': {'processing_time': 0.1, 'timestamp': '2024-01-01T12:00:00Z'}}
AnalyticsEngine output: ('Analyzed 1 items (1 successful, 0 failed)', {'avg_value': 20.0, 'max_value': 20, 'min_value': 20, 'total_value': 20, 'success_rate': 1.0})
ReportGenerator output:
           ANALYSIS REPORT

Section 1: Analyzed 1 items (1 successful, 0 failed)
  Metrics:
    avg_value: 20.00
    max_value: 20
    min_value: 20
    total_value: 20
    success_rate: 1.00


=== Testing Integrated Pipeline ===
Integration successful!
           ANALYSIS REPORT

Section 1: Analyzed 3 items (3 successful, 0 failed)
  Metrics:
    avg_value: 30.00
    max_value: 40
    min_value: 20
    total_value: 90
    success_rate: 1.00

Section 2: Analyzed 2 items (2 successful, 0 failed)
  Metrics:
    avg

In [53]:
# Test Cell
def test_question_10():
    """Check each pipeline component, the adapters, and the integrated pipeline.

    Raises:
        AssertionError: If any component or the pipeline misbehaves.
    """
    print("Testing integrated pipeline...")

    # Test 1: Individual component functionality
    processor = DataProcessor()
    analytics = AnalyticsEngine()
    reporter = ReportGenerator()

    # Test DataProcessor
    test_data = [{'id': 'test1', 'value': 10}, {'id': 'test2', 'value': 20}]
    processed = processor.process_data(test_data)

    assert isinstance(processed, dict), "DataProcessor should return dict"
    assert 'total_items' in processed, "Missing total_items in processed data"
    assert 'processed_items' in processed, "Missing processed_items in processed data"
    assert processed['total_items'] == 2, "Should count items correctly"
    assert [item['processed_value'] for item in processed['processed_items']] == [20, 40], \
        "Values should be doubled in input order"

    # Test AnalyticsEngine
    json_data = json.dumps(processed)
    summary, metrics = analytics.analyze(json_data)

    assert summary is not None, "Analytics should return valid summary"
    assert isinstance(metrics, dict), "Analytics should return metrics dict"
    assert 'avg_value' in metrics, "Missing avg_value in metrics"
    assert metrics['avg_value'] == 30.0, "Average of 20 and 40 should be 30"

    # Test ReportGenerator
    report = reporter.generate_report([(summary, metrics)])

    assert isinstance(report, str), "Report should be string"
    assert "ANALYSIS REPORT" in report, "Report should contain header"
    assert "Section 1" in report, "Report should contain section"

    # Test 2: Data validation and cleaning
    cleaned_data = validate_and_clean_raw_data([
        {'id': 'valid', 'value': 10},
        {'value': 20},  # Missing id
        {'id': 'invalid'},  # Missing value
        'invalid_format'  # Wrong format
    ])

    assert isinstance(cleaned_data, list), "Should return list"
    # Invalid records are dropped; a missing id is replaced by 'unknown'.
    assert cleaned_data == [
        {'id': 'valid', 'value': 10},
        {'id': 'unknown', 'value': 20},
    ], "Should keep only records with a numeric value"

    # Test 3: Integration adapters
    test_dict = {'processed_items': [{'processed_value': 10}]}
    json_str = dict_to_json_adapter(test_dict)

    assert isinstance(json_str, str), "Should return JSON string"
    # Should be valid JSON
    parsed = json.loads(json_str)
    assert parsed == test_dict, "Should preserve data structure"

    # Test 4: Full pipeline integration
    sample_datasets = [
        [{'id': 'A1', 'value': 10}, {'id': 'A2', 'value': 20}],
        [{'id': 'B1', 'value': 5}],
        []  # Empty dataset
    ]

    final_report = integrated_pipeline(sample_datasets)

    assert isinstance(final_report, str), "Pipeline should return string report"
    assert "ANALYSIS REPORT" in final_report, "Should contain report header"

    # Should handle multiple sections
    assert "Section 1" in final_report, "Should have first section"
    assert "Section 2" in final_report, "Should have second section"
    # The empty dataset has nothing to analyze, so its section reports a failure.
    assert "Section 3: Analysis failed" in final_report, "Empty dataset should be reported as failed"

    # Test 5: Error handling
    # Test with invalid input
    error_report = integrated_pipeline([])
    assert isinstance(error_report, str), "Should handle empty input gracefully"
    assert error_report == "Error: No data provided for report generation", \
        "Empty input should produce the no-data error"

    # Test with malformed data
    malformed_report = integrated_pipeline([["not", "a", "dict", "list"]])
    assert isinstance(malformed_report, str), "Should handle malformed data"
    assert "Section 1: Analysis failed" in malformed_report, \
        "Dataset without usable records should be reported as failed"

    # Test 6: Edge cases
    edge_cases = [
        [{'id': 'only_id'}],  # Missing value
        [{'value': 42}],      # Missing id
        [{}],                 # Empty dict
    ]

    edge_report = integrated_pipeline(edge_cases)
    assert isinstance(edge_report, str), "Should handle edge cases"
    assert "ANALYSIS REPORT" in edge_report, "Should still generate report structure"
    assert "Section 1: Analysis failed" in edge_report, "Record without value should fail"
    assert "Section 2: Analyzed 1 items (1 successful, 0 failed)" in edge_report, \
        "Record without id should still be analyzed"
    assert "Section 3: Analysis failed" in edge_report, "Empty record should fail"

    print("✓ All Question 10 tests passed!")

# Run the test; any failed check inside it raises AssertionError.
test_question_10()

Testing integrated pipeline...
✓ All Question 10 tests passed!
