# Python Performance and Memory Management

## Learning Objectives
By the end of this notebook, you will understand:
- Memory management in Python
- Performance optimization techniques
- Profiling and benchmarking
- Memory-efficient data structures
- Garbage collection and memory leaks
- Optimization for ML/NLP workloads

## Why Performance Matters for ML/NLP
- Large datasets require efficient memory usage
- Training time optimization is crucial
- Real-time inference needs fast execution
- Resource-constrained environments limit available memory and compute

## 1. Memory Management Fundamentals

In [None]:
import sys
import gc
import tracemalloc
import psutil
import os
from memory_profiler import profile

def get_memory_usage():
    """Get current memory usage of the process."""
    process = psutil.Process(os.getpid())
    memory_info = process.memory_info()
    return memory_info.rss / 1024 / 1024  # MB

def get_object_size(obj):
    """Get the shallow size of an object in bytes (referenced objects are not counted)."""
    return sys.getsizeof(obj)

print("=== Memory Management Basics ===")
print(f"Current memory usage: {get_memory_usage():.2f} MB")

# Memory usage of different data types
data_types = {
    'int': 42,
    'float': 3.14159,
    'string': "Hello, World!",
    'list': [1, 2, 3, 4, 5],
    'dict': {'a': 1, 'b': 2, 'c': 3},
    'tuple': (1, 2, 3, 4, 5),
    'set': {1, 2, 3, 4, 5}
}

print("\nMemory usage by data type:")
for name, obj in data_types.items():
    size = get_object_size(obj)
    print(f"{name:8}: {size:4} bytes")

# Memory growth with list size (getsizeof counts the list's pointer array, not the int objects)
print("\nMemory usage growth with list size:")
for size in [10, 100, 1000, 10000]:
    lst = list(range(size))
    memory_size = get_object_size(lst)
    print(f"List of {size:5} elements: {memory_size:8} bytes ({memory_size/size:.2f} bytes per element)")

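`sys.getsizeof` reports only the shallow size of a container, so the numbers above exclude the objects a list or dict points to. The following is a minimal sketch of a recursive estimate. `deep_getsizeof` is a hypothetical helper written for this example (it is not part of the standard library), and it assumes that only built-in containers need to be followed.

```python
import sys

def deep_getsizeof(obj, seen=None):
    """Estimate the size of obj plus the objects it references.

    Hypothetical helper for illustration: it follows only dict, list,
    tuple, set, and frozenset contents, and counts shared objects once.
    """
    if seen is None:
        seen = set()
    obj_id = id(obj)
    if obj_id in seen:
        return 0  # already counted (shared or self-referencing object)
    seen.add(obj_id)

    size = sys.getsizeof(obj)
    if isinstance(obj, dict):
        size += sum(deep_getsizeof(k, seen) + deep_getsizeof(v, seen)
                    for k, v in obj.items())
    elif isinstance(obj, (list, tuple, set, frozenset)):
        size += sum(deep_getsizeof(item, seen) for item in obj)
    return size

nested = [[i, str(i)] for i in range(1000)]
shallow = sys.getsizeof(nested)
deep = deep_getsizeof(nested)
print(f"Shallow size: {shallow} bytes")
print(f"Deep size:    {deep} bytes")
```

The gap between the two figures is the memory held by the inner lists, integers, and strings, which the shallow measurement never sees.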
## 2. Memory-Efficient Data Structures

In [None]:
import array
from collections import deque, defaultdict, Counter
import numpy as np

# Compare memory usage of different sequence types
def compare_sequence_memory():
    """Compare memory usage of different sequence types."""
    n = 10000
    
    # Python list
    py_list = list(range(n))
    
    # Python array (typed)
    py_array = array.array('i', range(n))  # 'i' for integers
    
    # NumPy array
    np_array = np.arange(n, dtype=np.int32)
    
    # Tuple
    py_tuple = tuple(range(n))
    
    sequences = {
        'Python list': py_list,
        'Python array': py_array,
        'NumPy array': np_array,
        'Tuple': py_tuple
    }
    
    print(f"Memory usage for {n} integers:")
    for name, seq in sequences.items():
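        size = get_object_size(seq)  # shallow: list/tuple sizes exclude the int objects they point to
        if hasattr(seq, 'nbytes'):  # NumPy array: report the size of the data buffer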
            size = seq.nbytes
        print(f"{name:12}: {size:8} bytes ({size/n:.2f} bytes per element)")

compare_sequence_memory()

print("\n=== Generator vs List Memory Usage ===")

def memory_efficient_processing():
    """Demonstrate memory-efficient processing with generators."""
    
    # Memory-heavy approach: create full list
    def process_with_list(n):
        data = [i**2 for i in range(n)]  # Create full list
        return sum(x for x in data if x % 2 == 0)
    
    # Memory-efficient approach: use generator
    def process_with_generator(n):
        data = (i**2 for i in range(n))  # Generator expression
        return sum(x for x in data if x % 2 == 0)
    
    # Compare memory usage
    n = 100000
    
    print(f"Processing {n} numbers:")
    
    # Measure list approach
    initial_memory = get_memory_usage()
    result1 = process_with_list(n)
    list_memory = get_memory_usage()
    
    # Force garbage collection
    gc.collect()
    
    # Measure generator approach  
    gc_memory = get_memory_usage()
    result2 = process_with_generator(n)
    generator_memory = get_memory_usage()
    
    print(f"List approach memory increase: {list_memory - initial_memory:.2f} MB")
    print(f"Generator approach memory increase: {generator_memory - gc_memory:.2f} MB")
    print(f"Results match: {result1 == result2}")
    print(f"Result: {result1}")

memory_efficient_processing()

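Process-level readings such as those from `get_memory_usage()` can be noisy, because the allocator does not always return freed memory to the operating system. As a cross-check, the sketch below measures peak Python-level allocations with `tracemalloc` instead. The helper name `peak_bytes` and the two worker functions are assumptions made for this example, and the sketch assumes `tracemalloc` is not already tracing when it runs.

```python
import tracemalloc

def peak_bytes(func, *args):
    """Return (result, peak traced bytes) for a single call to func."""
    tracemalloc.start()
    try:
        result = func(*args)
        _, peak = tracemalloc.get_traced_memory()
    finally:
        tracemalloc.stop()
    return result, peak

def sum_even_squares_list(n):
    squares = [i**2 for i in range(n)]   # materializes all n integers
    return sum(x for x in squares if x % 2 == 0)

def sum_even_squares_generator(n):
    squares = (i**2 for i in range(n))   # yields one value at a time
    return sum(x for x in squares if x % 2 == 0)

n = 100_000
list_result, list_peak = peak_bytes(sum_even_squares_list, n)
gen_result, gen_peak = peak_bytes(sum_even_squares_generator, n)

print(f"List peak:      {list_peak / 1024:.1f} KB")
print(f"Generator peak: {gen_peak / 1024:.1f} KB")
print(f"Results match:  {list_result == gen_result}")
```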
## 3. Performance Profiling and Benchmarking

In [None]:
import time
import timeit
import cProfile
import pstats
from io import StringIO

import functools

# Performance measurement decorator
def benchmark(func):
    """Decorator to benchmark function execution time."""
    @functools.wraps(func)  # preserve the wrapped function's name and docstring
    def wrapper(*args, **kwargs):
        start_time = time.perf_counter()
        result = func(*args, **kwargs)
        end_time = time.perf_counter()
        execution_time = end_time - start_time
        print(f"{func.__name__}: {execution_time:.6f} seconds")
        return result
    return wrapper

# Different approaches to text processing
@benchmark
def string_concatenation_slow(words):
    """Slow string concatenation."""
    result = ""
    for word in words:
        result += word + " "
    return result.strip()

@benchmark 
def string_concatenation_fast(words):
    """Fast string concatenation."""
    return " ".join(words)

@benchmark
def list_comprehension_approach(words):
    """Using list comprehension."""
    return " ".join([word for word in words])

@benchmark
def generator_approach(words):
    """Using generator expression."""
    return " ".join(word for word in words)

# Test with sample data
print("=== String Processing Performance Comparison ===")
sample_words = [f"word{i}" for i in range(1000)]

result1 = string_concatenation_slow(sample_words)
result2 = string_concatenation_fast(sample_words)
result3 = list_comprehension_approach(sample_words)
result4 = generator_approach(sample_words)

print(f"All results equal: {result1 == result2 == result3 == result4}")

# Using timeit for more precise measurements
print("\n=== Precise Timing with timeit ===")

setup_code = "words = [f'word{i}' for i in range(100)]"

# Test different approaches
# (all build the same space-separated string, except the no-separator join)
approaches = {
    'Join (no sep)': "''.join(words)",
    'Plus operator': "result = ''\nfor word in words:\n    result += word + ' '\nresult = result.strip()",
    'Format string': "' '.join('{}' for _ in words).format(*words)",
    'Join method': "' '.join(words)"
}

for name, code in approaches.items():
    try:
        time_taken = timeit.timeit(code, setup=setup_code, number=1000)
        print(f"{name:15}: {time_taken:.6f} seconds (1000 iterations)")
    except Exception as exc:
        print(f"{name:15}: Error in execution ({exc})")

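The cell above imports `cProfile`, `pstats`, and `StringIO` without using them. As a minimal sketch of how they fit together, the example below profiles a small word-counting task and captures the report as a string. The three functions are hypothetical stand-ins for real pipeline code.

```python
import cProfile
import pstats
from io import StringIO

def tokenize(texts):
    return [text.lower().split() for text in texts]

def count_tokens(token_lists):
    counts = {}
    for tokens in token_lists:
        for token in tokens:
            counts[token] = counts.get(token, 0) + 1
    return counts

def word_frequencies(texts):
    return count_tokens(tokenize(texts))

texts = ["the quick brown fox jumps over the lazy dog"] * 5000

# Record every function call made between enable() and disable()
profiler = cProfile.Profile()
profiler.enable()
frequencies = word_frequencies(texts)
profiler.disable()

# Sort by cumulative time and keep the five most expensive entries
stream = StringIO()
stats = pstats.Stats(profiler, stream=stream)
stats.strip_dirs().sort_stats("cumulative").print_stats(5)
report = stream.getvalue()
print(report)
```

Sorting by cumulative time puts the entry point first and shows which callee accounts for most of its cost, which is usually the place to start optimizing.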
## 4. Memory Profiling with tracemalloc

In [None]:
import tracemalloc
import linecache

def memory_intensive_function():
    """Build large structures and return them so they stay alive for the snapshot."""
    # Create large data structures
    large_list = list(range(100000))  # Shows up in the trace while still referenced
    large_dict = {i: f"value_{i}" for i in range(50000)}  # Another memory-heavy line
    
    # Some processing
    processed_data = [x * 2 for x in large_list[:10000]]
    
    return large_list, large_dict, processed_data

def analyze_memory_usage():
    """Analyze memory usage with tracemalloc."""
    print("=== Memory Profiling with tracemalloc ===")
    
    # Start tracing
    tracemalloc.start()
    
    # Get initial snapshot
    snapshot1 = tracemalloc.take_snapshot()
    
    # Run memory-intensive function. Keep the returned data referenced:
    # memory freed before the second snapshot would not appear in the comparison.
    retained = memory_intensive_function()
    result = len(retained[2])
    
    # Get final snapshot
    snapshot2 = tracemalloc.take_snapshot()
    
    # Compare snapshots
    top_stats = snapshot2.compare_to(snapshot1, 'lineno')
    
    print(f"Function result: {result}")
    print("\nTop 5 memory allocations:")
    
    for index, stat in enumerate(top_stats[:5]):
        print(f"{index + 1}. {stat}")
        
        # Show the actual code line
        frame = stat.traceback.format()[-1]
        print(f"   Code: {frame.strip()}")
    
    # Get current memory statistics
    current, peak = tracemalloc.get_traced_memory()
    print(f"\nCurrent memory usage: {current / 1024 / 1024:.2f} MB")
    print(f"Peak memory usage: {peak / 1024 / 1024:.2f} MB")
    
    # Stop tracing
    tracemalloc.stop()

analyze_memory_usage()

# Memory leak detection
def detect_memory_leaks():
    """Simple memory leak detection."""
    print("\n=== Memory Leak Detection ===")
    
    # Simulate a potential memory leak: a long-lived cache that only grows
    global_cache = []  # Stands in for a module-level cache; leaks if never cleared
    
    def leaky_function(size):
        data = list(range(size))
        global_cache.append(data)  # Adding to global cache
        return len(data)
    
    # Monitor memory usage
    initial_memory = get_memory_usage()
    print(f"Initial memory: {initial_memory:.2f} MB")
    
    # Run function multiple times
    for i in range(5):
        leaky_function(10000)
        current_memory = get_memory_usage()
        print(f"After iteration {i+1}: {current_memory:.2f} MB (+{current_memory - initial_memory:.2f} MB)")
    
    print(f"Global cache size: {len(global_cache)} items")
    
    # Clean up
    global_cache.clear()
    gc.collect()
    
    final_memory = get_memory_usage()
    print(f"After cleanup: {final_memory:.2f} MB")

detect_memory_leaks()

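One common way to keep a cache like the one above from growing without limit is to bound its size. The sketch below uses `collections.deque` with `maxlen`, which discards the oldest entry once the limit is reached. The names `recent_results` and `process_chunk` are hypothetical, and the limit of 3 is arbitrary.

```python
from collections import deque

# Hypothetical bounded cache: keeps only the most recent 3 results
recent_results = deque(maxlen=3)

def process_chunk(size):
    data = list(range(size))
    recent_results.append(data)  # the oldest entry is dropped automatically when full
    return len(data)

for _ in range(10):
    process_chunk(10_000)

print(f"Chunks processed: 10, chunks retained: {len(recent_results)}")
```

Memory use now depends on the limit rather than on how many times the function has been called.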
## 5. Optimization Techniques

In [None]:
from functools import lru_cache, reduce
from operator import mul
import itertools

print("=== Optimization Techniques ===")

# 1. Memoization with lru_cache
def fibonacci_slow(n):
    """Slow recursive fibonacci."""
    if n < 2:
        return n
    return fibonacci_slow(n-1) + fibonacci_slow(n-2)

@lru_cache(maxsize=128)
def fibonacci_fast(n):
    """Fast memoized fibonacci."""
    if n < 2:
        return n
    return fibonacci_fast(n-1) + fibonacci_fast(n-2)

# Compare performance
print("Fibonacci comparison (n=30):")

start_time = time.perf_counter()
result_slow = fibonacci_slow(30)
slow_time = time.perf_counter() - start_time

start_time = time.perf_counter()
result_fast = fibonacci_fast(30)
fast_time = time.perf_counter() - start_time

print(f"Slow version: {slow_time:.6f} seconds")
print(f"Fast version: {fast_time:.6f} seconds")
print(f"Speedup: {slow_time / fast_time:.0f}x faster")
print(f"Results match: {result_slow == result_fast}")

# 2. Using built-in functions vs loops
print("\n=== Built-in Functions vs Loops ===")

numbers = list(range(100000))

@benchmark
def manual_sum(numbers):
    """Manual summation with loop."""
    total = 0
    for num in numbers:
        total += num
    return total

@benchmark 
def builtin_sum(numbers):
    """Using built-in sum function."""
    return sum(numbers)

@benchmark
def numpy_sum(numbers):
    """Using NumPy sum."""
    arr = np.array(numbers, dtype=np.int64)  # int64 avoids overflow where the default integer is 32-bit
    return np.sum(arr)

result1 = manual_sum(numbers)
result2 = builtin_sum(numbers)
result3 = int(numpy_sum(numbers))

print(f"All results equal: {result1 == result2 == result3}")

# 3. List comprehensions vs loops
print("\n=== List Comprehensions vs Loops ===")

@benchmark
def process_with_loop(numbers):
    """Process numbers with explicit loop."""
    result = []
    for num in numbers:
        if num % 2 == 0:
            result.append(num ** 2)
    return result

@benchmark
def process_with_comprehension(numbers):
    """Process numbers with list comprehension."""
    return [num ** 2 for num in numbers if num % 2 == 0]

@benchmark
def process_with_filter_map(numbers):
    """Process numbers with filter and map."""
    return list(map(lambda x: x ** 2, filter(lambda x: x % 2 == 0, numbers)))

test_numbers = list(range(10000))
result1 = process_with_loop(test_numbers)
result2 = process_with_comprehension(test_numbers)
result3 = process_with_filter_map(test_numbers)

print(f"All results equal: {result1 == result2 == result3}")
print(f"Result length: {len(result1)}")

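Functions wrapped with `functools.lru_cache` expose `cache_info()` and `cache_clear()`, which help confirm that a cache is being hit and let you release its memory. A minimal sketch follows. The function `fib` is a stand-in defined for this example, not the notebook's `fibonacci_fast`.

```python
from functools import lru_cache

@lru_cache(maxsize=128)
def fib(n):
    """Memoized Fibonacci, used here only to exercise the cache."""
    return n if n < 2 else fib(n - 1) + fib(n - 2)

value = fib(30)
info = fib.cache_info()
print(f"fib(30) = {value}")
print(f"hits={info.hits}, misses={info.misses}, cached entries={info.currsize}")

# Release the cached results when they are no longer needed
fib.cache_clear()
print(f"cached entries after clear: {fib.cache_info().currsize}")
```

Each distinct argument is computed once (a miss) and then served from the cache (a hit). This also means that timing a second call measures a lookup, not the computation.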
## 6. ML/NLP Specific Optimizations

In [None]:
import string
from collections import defaultdict, Counter
import re

print("=== ML/NLP Performance Optimizations ===")

# Sample text data for testing
sample_texts = [
    "Natural language processing is fascinating and challenging.",
    "Machine learning algorithms require careful optimization.", 
    "Text preprocessing can be computationally expensive.",
    "Efficient implementations make a significant difference."
] * 1000  # Multiply to create larger dataset

# Text preprocessing optimization
@benchmark
def preprocess_text_slow(texts):
    """Slow text preprocessing."""
    processed = []
    for text in texts:
        # Multiple passes over the text
        text = text.lower()
        text = text.translate(str.maketrans('', '', string.punctuation))
        text = ' '.join(text.split())  # Remove extra whitespace
        words = text.split()
        # Filter short words
        words = [word for word in words if len(word) > 2]
        processed.append(words)
    return processed

@benchmark
def preprocess_text_fast(texts):
    """Optimized text preprocessing."""
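    # Compile the regex once, outside the loop
    # (unlike string.punctuation, this pattern keeps underscores, since \w matches "_")
    punct_pattern = re.compile(r'[^\w\s]')
    
    processed = []
    for text in texts:
        # Lowercase and strip punctuation in one expression, then tokenize and filter
        text = punct_pattern.sub('', text.lower())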
        words = [word for word in text.split() if len(word) > 2]
        processed.append(words)
    return processed

# Compare preprocessing approaches on the same input so the timings are comparable
result1 = preprocess_text_slow(sample_texts)
result2 = preprocess_text_fast(sample_texts)

print(f"Preprocessing results match: {result1 == result2}")

# Vectorization optimization
print("\n=== Text Vectorization Optimization ===")

@benchmark
def create_bow_slow(texts, vocab):
    """Slow bag-of-words creation."""
    vectors = []
    for text in texts:
        vector = []
        for word in vocab:
            count = text.count(word)
            vector.append(count)
        vectors.append(vector)
    return vectors

@benchmark
def create_bow_fast(texts, vocab):
    """Fast bag-of-words creation using Counter."""
    vectors = []
    
    for text in texts:
        word_counts = Counter(text)
        vector = [word_counts.get(word, 0) for word in vocab]
        vectors.append(vector)
    return vectors

# Create vocabulary from processed text
processed_sample = preprocess_text_fast(sample_texts[:100])
all_words = set()
for words in processed_sample:
    all_words.update(words)
vocab = sorted(all_words)[:50]  # First 50 words in alphabetical order

print(f"Vocabulary size: {len(vocab)}")
print(f"Sample size: {len(processed_sample)}")

# Run both versions on the same sample so the timings are comparable
vectors1 = create_bow_slow(processed_sample, vocab)
vectors2 = create_bow_fast(processed_sample, vocab)

print(f"Vector dimensions: {len(vectors2[0])}")
print(f"Vectorization results match: {vectors1 == vectors2}")

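Dense bag-of-words vectors store a count for every vocabulary word, even though most documents use only a few of them. The sketch below stores only the non-zero counts in a dict keyed by vocabulary index. `sparse_bow` is a hypothetical helper, and the vocabulary and document are made-up illustration data.

```python
from collections import Counter

def sparse_bow(tokens, word_to_idx):
    """Return {vocabulary index: count} for the in-vocabulary tokens."""
    counts = Counter(tokens)
    return {word_to_idx[word]: count
            for word, count in counts.items() if word in word_to_idx}

vocab = ["language", "learning", "machine", "natural", "processing"]
word_to_idx = {word: idx for idx, word in enumerate(vocab)}

doc = ["machine", "learning", "loves", "machine", "translation"]
sparse = sparse_bow(doc, word_to_idx)

# Expand to a dense vector only when one is actually needed
dense = [sparse.get(idx, 0) for idx in range(len(vocab))]

print(f"Sparse: {sparse}")
print(f"Dense:  {dense}")
```

With a vocabulary of thousands of words and short documents, the sparse form holds a handful of entries per document instead of one slot per vocabulary word.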
## 7. NumPy Performance Optimizations

In [None]:
import numpy as np

print("=== NumPy Performance Optimizations ===")

# Vectorization vs loops
@benchmark
def compute_distances_loop(points1, points2):
    """Compute distances using Python loops."""
    distances = []
    for p1 in points1:
        row_distances = []
        for p2 in points2:
            # Euclidean distance
            dist = ((p1[0] - p2[0])**2 + (p1[1] - p2[1])**2)**0.5
            row_distances.append(dist)
        distances.append(row_distances)
    return distances

@benchmark
def compute_distances_numpy(points1, points2):
    """Compute distances using NumPy vectorization."""
    # Convert to NumPy arrays if needed
    p1 = np.array(points1)
    p2 = np.array(points2)
    
    # Vectorized distance computation
    # Broadcasting to compute all pairwise distances
    diff = p1[:, np.newaxis, :] - p2[np.newaxis, :, :]
    distances = np.sqrt(np.sum(diff**2, axis=2))
    
    return distances.tolist()

# Generate test data
np.random.seed(42)
points1 = np.random.rand(100, 2).tolist()
points2 = np.random.rand(100, 2).tolist()

print(f"Computing distances between {len(points1)} and {len(points2)} points")

distances1 = compute_distances_loop(points1, points2)
distances2 = compute_distances_numpy(points1, points2)

# Check that both approaches agree on the same inputs
diff = np.max(np.abs(np.array(distances1) - np.array(distances2)))
print(f"Maximum difference: {diff:.10f}")

# Memory-efficient operations
print("\n=== Memory-Efficient NumPy Operations ===")

@benchmark
def matrix_operations_copy(matrix):
    """Matrix operations that create copies."""
    result = matrix + 1
    result = result * 2
    result = result - 0.5
    return result

@benchmark 
def matrix_operations_inplace(matrix):
    """Memory-efficient in-place operations."""
    result = matrix.copy()  # Only one copy
    result += 1
    result *= 2
    result -= 0.5
    return result

# Test with large matrix
large_matrix = np.random.rand(1000, 1000)

print(f"Matrix shape: {large_matrix.shape}")
print(f"Matrix memory usage: {large_matrix.nbytes / 1024 / 1024:.2f} MB")

result1 = matrix_operations_copy(large_matrix)
result2 = matrix_operations_inplace(large_matrix)

print(f"Results are equal: {np.allclose(result1, result2)}")

# Data type optimization
print("\n=== Data Type Optimization ===")

def compare_dtypes():
    """Compare memory usage of different data types."""
    size = 1000000
    
    arrays = {
        'float64': np.ones(size, dtype=np.float64),
        'float32': np.ones(size, dtype=np.float32),
        'int64': np.ones(size, dtype=np.int64),
        'int32': np.ones(size, dtype=np.int32),
        'int16': np.ones(size, dtype=np.int16),
        'int8': np.ones(size, dtype=np.int8),
    }
    
    print(f"Memory usage for {size} elements:")
    for dtype_name, arr in arrays.items():
        memory_mb = arr.nbytes / 1024 / 1024
        print(f"{dtype_name:8}: {memory_mb:6.2f} MB ({arr.dtype.itemsize} bytes per element)")

compare_dtypes()

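The broadcasting approach to pairwise distances builds an intermediate array of shape `(n, m, d)`, which can dominate memory for large inputs. One way to bound it is to process the first set of points in chunks. The sketch below assumes a hypothetical helper name, `pairwise_distances_chunked`, and an arbitrary default `chunk_size`.

```python
import numpy as np

def pairwise_distances_chunked(points1, points2, chunk_size=256):
    """Euclidean distances, computed chunk_size rows at a time."""
    p1 = np.asarray(points1, dtype=np.float64)
    p2 = np.asarray(points2, dtype=np.float64)
    out = np.empty((p1.shape[0], p2.shape[0]), dtype=np.float64)

    for start in range(0, p1.shape[0], chunk_size):
        stop = start + chunk_size
        # The intermediate has shape (rows in chunk, m, d) instead of (n, m, d)
        diff = p1[start:stop, np.newaxis, :] - p2[np.newaxis, :, :]
        out[start:stop] = np.sqrt(np.sum(diff**2, axis=2))
    return out

rng = np.random.default_rng(42)
a = rng.random((1000, 2))
b = rng.random((500, 2))

chunked = pairwise_distances_chunked(a, b, chunk_size=128)
full = np.sqrt(np.sum((a[:, np.newaxis, :] - b[np.newaxis, :, :])**2, axis=2))
print(f"Shapes: {chunked.shape}, results match: {np.allclose(chunked, full)}")
```

The output matrix is still `n` by `m`, so chunking limits only the temporary array, not the final result.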
## 8. Garbage Collection and Memory Management

In [None]:
import gc
import weakref

print("=== Garbage Collection Analysis ===")

def analyze_gc():
    """Analyze garbage collection behavior."""
    print(f"Garbage collection enabled: {gc.isenabled()}")
    print(f"Current thresholds: {gc.get_threshold()}")
    print(f"Collection counts: {gc.get_count()}")
    
    # Get current stats
    stats_before = gc.get_stats()
    print(f"\nGC stats before: {len(stats_before)} generations")
    
    # Create objects that might form cycles
    objects = []
    for i in range(1000):
        obj = {'id': i, 'data': list(range(100))}
        obj['self_ref'] = obj  # Create circular reference
        objects.append(obj)
    
    print(f"Created {len(objects)} objects with circular references")
    print(f"Collection counts after creation: {gc.get_count()}")
    
    # Force garbage collection
    collected = gc.collect()
    print(f"Objects collected by gc.collect(): {collected}")
    print(f"Collection counts after gc.collect(): {gc.get_count()}")
    
    # Clear references and collect again
    objects.clear()
    collected = gc.collect()
    print(f"Objects collected after clearing references: {collected}")

analyze_gc()

# Weak references to avoid memory leaks
print("\n=== Weak References Demo ===")

class DataProcessor:
    """Example class for weak reference demo."""
    def __init__(self, name):
        self.name = name
        self.data = list(range(1000))  # Some data
    
    def __del__(self):
        print(f"DataProcessor {self.name} is being deleted")

def weak_reference_demo():
    """Demonstrate weak references."""
    # Create object
    processor = DataProcessor("main")
    
    # Create weak reference
    weak_ref = weakref.ref(processor)
    print(f"Weak reference created: {weak_ref}")
    print(f"Object accessible via weak ref: {weak_ref() is not None}")
    
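    # Delete original reference (CPython frees the object as soon as its reference count hits zero)
    del processor
    
    # Force garbage collection (only strictly needed for objects in reference cycles)
    gc.collect()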
    
    print(f"Object accessible after deletion: {weak_ref() is not None}")
    if weak_ref() is None:
        print("Weak reference is now dead")

weak_reference_demo()

# Context managers for resource management
print("\n=== Context Managers for Resource Management ===")

class MemoryManagedProcessor:
    """Context manager for automatic resource cleanup."""
    
    def __init__(self, size):
        self.size = size
        self.data = None
    
    def __enter__(self):
        print(f"Allocating {self.size} elements")
        self.data = list(range(self.size))
        initial_memory = get_memory_usage()
        print(f"Memory after allocation: {initial_memory:.2f} MB")
        return self
    
    def __exit__(self, exc_type, exc_val, exc_tb):
        print("Cleaning up resources")
        self.data = None
        gc.collect()
        final_memory = get_memory_usage()
        print(f"Memory after cleanup: {final_memory:.2f} MB")
    
    def process(self):
        """Simulate some processing."""
        if self.data:
            return sum(x**2 for x in self.data[:1000])  # Process first 1000 elements
        return 0

# Use context manager
print(f"Initial memory: {get_memory_usage():.2f} MB")

with MemoryManagedProcessor(100000) as processor:
    result = processor.process()
    print(f"Processing result: {result}")

print(f"Final memory: {get_memory_usage():.2f} MB")

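Reference counting alone cannot free objects that refer to each other; that is the job of the cyclic garbage collector. The sketch below makes this visible with a weak reference. It relies on CPython behavior, and the `Node` class is a hypothetical example type. Automatic collection is switched off briefly so the result does not depend on when the collector happens to run.

```python
import gc
import weakref

class Node:
    """Hypothetical example type with a reference to a partner node."""
    def __init__(self, name):
        self.name = name
        self.partner = None

def make_cycle():
    a, b = Node("a"), Node("b")
    a.partner, b.partner = b, a  # a -> b -> a reference cycle
    return weakref.ref(a)        # local names a and b go away on return

gc.disable()  # keep the automatic collector out of the way for the demo
try:
    ref = make_cycle()
    alive_before = ref() is not None  # the cycle keeps both nodes alive
    gc.collect()                      # the cycle detector reclaims them
    alive_after = ref() is not None
finally:
    gc.enable()

print(f"Alive before gc.collect(): {alive_before}")
print(f"Alive after gc.collect():  {alive_after}")
```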
## 9. Practical Performance Optimization Tips

In [None]:
print("=== Practical Performance Tips ===")

# 1. Use slots for memory efficiency
class RegularClass:
    """Regular class without slots."""
    def __init__(self, x, y):
        self.x = x
        self.y = y

class SlottedClass:
    """Memory-efficient class with slots."""
    __slots__ = ['x', 'y']
    
    def __init__(self, x, y):
        self.x = x
        self.y = y

def compare_class_memory():
    """Compare memory usage of regular vs slotted classes."""
    n = 10000
    
    # Create instances
    regular_objects = [RegularClass(i, i*2) for i in range(n)]
    slotted_objects = [SlottedClass(i, i*2) for i in range(n)]
    
    # Measure memory usage
    regular_size = sum(sys.getsizeof(obj) + sys.getsizeof(obj.__dict__) for obj in regular_objects)
    slotted_size = sum(sys.getsizeof(obj) for obj in slotted_objects)
    
    print(f"Memory comparison for {n} objects:")
    print(f"Regular class: {regular_size / 1024:.2f} KB")
    print(f"Slotted class: {slotted_size / 1024:.2f} KB")
    print(f"Memory savings: {(regular_size - slotted_size) / regular_size * 100:.1f}%")

compare_class_memory()

# 2. Efficient string operations
print("\n=== String Operation Optimizations ===")

@benchmark
def string_formatting_old(items):
    """Old-style string formatting."""
    return ['Item: %s, Value: %d' % (item['name'], item['value']) for item in items]

@benchmark
def string_formatting_new(items):
    """New-style string formatting."""
    return ['Item: {}, Value: {}'.format(item['name'], item['value']) for item in items]

@benchmark
def string_formatting_fstring(items):
    """f-string formatting (typically the fastest of the three)."""
    return [f"Item: {item['name']}, Value: {item['value']}" for item in items]

# Test string formatting
test_items = [{'name': f'item_{i}', 'value': i} for i in range(10000)]

result1 = string_formatting_old(test_items)
result2 = string_formatting_new(test_items)
result3 = string_formatting_fstring(test_items)

print(f"All string formatting results equal: {result1 == result2 == result3}")

# 3. Set operations vs list operations
print("\n=== Set vs List Operations ===")

large_list1 = list(range(10000))
large_list2 = list(range(5000, 15000))
large_set1 = set(large_list1)
large_set2 = set(large_list2)

@benchmark
def find_intersection_list(list1, list2):
    """Find intersection using lists."""
    return [x for x in list1 if x in list2]

@benchmark
def find_intersection_set(set1, set2):
    """Find intersection using sets."""
    return list(set1 & set2)

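print(f"List sizes: {len(large_list1)}, {len(large_list2)}")

# Use the same (smaller) input for both versions so timings and results are comparable
subset_list = large_list1[4500:5500]
subset_set = set(subset_list)

intersection1 = find_intersection_list(subset_list, large_list2)
intersection2 = find_intersection_set(subset_set, large_set2)

print(f"Intersection sizes: {len(intersection1)}, {len(intersection2)}")
print(f"Intersections match: {sorted(intersection1) == sorted(intersection2)}")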

# 4. Generator expressions for memory efficiency
print("\n=== Generator Memory Efficiency ===")

def memory_usage_comparison():
    """Compare memory usage of list vs generator."""
    n = 1000000
    
    # List comprehension - all in memory
    print(f"Creating list of {n} elements...")
    initial_memory = get_memory_usage()
    
    large_list = [x**2 for x in range(n)]
    list_memory = get_memory_usage()
    
    print(f"List memory increase: {list_memory - initial_memory:.2f} MB")
    
    # Clear the list
    del large_list
    gc.collect()
    
    # Generator expression - lazy evaluation
    print(f"Creating generator for {n} elements...")
    generator_initial = get_memory_usage()
    
    large_generator = (x**2 for x in range(n))
    generator_memory = get_memory_usage()
    
    print(f"Generator memory increase: {generator_memory - generator_initial:.2f} MB")
    
    # Consume first 10 elements from generator
    first_ten = [next(large_generator) for _ in range(10)]
    print(f"First 10 elements: {first_ten}")

memory_usage_comparison()

print("\n" + "="*60)
print("Performance Optimization Summary:")
print("1. Use built-in functions when possible")
print("2. Prefer list comprehensions over loops")
print("3. Use generators for large datasets")
print("4. Choose appropriate data types")
print("5. Use __slots__ for memory-efficient classes")
print("6. Leverage NumPy for numerical computations")
print("7. Profile your code to identify bottlenecks")
print("8. Use caching for expensive computations")
print("9. Manage memory with context managers")
print("10. Monitor garbage collection behavior")
print("="*60)

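Summing `sys.getsizeof` over instances gives only an approximation of the savings from `__slots__`, because how instance attributes are stored varies between Python versions. As a cross-check, the sketch below measures what is actually allocated using `tracemalloc`. The class names and the helper `traced_bytes` are assumptions made for this example.

```python
import tracemalloc

class RegularPoint:
    def __init__(self, x, y):
        self.x = x
        self.y = y

class SlottedPoint:
    __slots__ = ("x", "y")

    def __init__(self, x, y):
        self.x = x
        self.y = y

def traced_bytes(factory, n):
    """Return the bytes still allocated after building n instances."""
    tracemalloc.start()
    try:
        objects = [factory(i, i * 2) for i in range(n)]  # kept alive while measuring
        current, _ = tracemalloc.get_traced_memory()
    finally:
        tracemalloc.stop()
    return current

n = 10_000
regular_bytes = traced_bytes(RegularPoint, n)
slotted_bytes = traced_bytes(SlottedPoint, n)

print(f"Regular class: {regular_bytes / 1024:.1f} KB")
print(f"Slotted class: {slotted_bytes / 1024:.1f} KB")
```

Both measurements include the list and the integer attributes, so the difference between them reflects the per-instance overhead.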
## 10. Real-world ML Performance Case Study

In [None]:
print("=== ML Performance Case Study: Text Classification Pipeline ===")

# Simulate a complete text classification pipeline with performance monitoring

class TextClassificationPipeline:
    """Optimized text classification pipeline."""
    
    def __init__(self):
        self.vocab = None
        self.word_to_idx = None
        self.model_weights = None
        
        # Pre-compile regex patterns for efficiency
        self.punct_pattern = re.compile(r'[^\w\s]')
        self.whitespace_pattern = re.compile(r'\s+')
    
    @benchmark
    def preprocess_batch(self, texts):
        """Efficiently preprocess a batch of texts."""
        processed = []
        
        for text in texts:
            # Normalize: lowercase, replace punctuation with spaces, collapse whitespace
            text = self.punct_pattern.sub(' ', text.lower())
            text = self.whitespace_pattern.sub(' ', text).strip()
            
            # Tokenize and filter
            tokens = [word for word in text.split() if len(word) > 2]
            processed.append(tokens)
        
        return processed
    
    @benchmark
    def build_vocabulary(self, processed_texts, max_vocab=5000):
        """Build vocabulary efficiently using Counter."""
        # Count all words
        word_counts = Counter()
        for tokens in processed_texts:
            word_counts.update(tokens)
        
        # Get most common words
        self.vocab = [word for word, _ in word_counts.most_common(max_vocab)]
        self.word_to_idx = {word: idx for idx, word in enumerate(self.vocab)}
        
        return len(self.vocab)
    
    @benchmark
    def vectorize_batch(self, processed_texts):
        """Efficiently vectorize texts using NumPy."""
        if not self.vocab:
            raise ValueError("Must build vocabulary first")
        
        # Pre-allocate NumPy array for efficiency
        n_texts = len(processed_texts)
        n_features = len(self.vocab)
        vectors = np.zeros((n_texts, n_features), dtype=np.float32)
        
        # Vectorize efficiently
        for i, tokens in enumerate(processed_texts):
            token_counts = Counter(tokens)
            for word, count in token_counts.items():
                if word in self.word_to_idx:
                    idx = self.word_to_idx[word]
                    vectors[i, idx] = count
        
        return vectors
    
    @benchmark
    def train_model(self, X, y, learning_rate=0.01, epochs=10):
        """Simple logistic regression training with NumPy."""
        n_features = X.shape[1]
        
        # Initialize weights
        self.model_weights = np.random.normal(0, 0.01, n_features).astype(np.float32)
        
        # Training loop with vectorized operations
        for epoch in range(epochs):
            # Forward pass
            predictions = np.dot(X, self.model_weights)
            probabilities = 1 / (1 + np.exp(-predictions))
            
            # Backward pass
            errors = y - probabilities
            gradient = np.dot(X.T, errors) / len(y)
            
            # Update weights
            self.model_weights += learning_rate * gradient
            
            if (epoch + 1) % 5 == 0:
                loss = -np.mean(y * np.log(probabilities + 1e-15) + 
                              (1 - y) * np.log(1 - probabilities + 1e-15))
                print(f"  Epoch {epoch + 1}: Loss = {loss:.4f}")
    
    @benchmark
    def predict_batch(self, X):
        """Efficient batch prediction."""
        if self.model_weights is None:
            raise ValueError("Model must be trained first")
        
        predictions = np.dot(X, self.model_weights)
        probabilities = 1 / (1 + np.exp(-predictions))
        return (probabilities > 0.5).astype(int)

# Generate sample data for the case study
def generate_sample_data(n_samples=5000):
    """Generate sample text data for classification."""
    positive_words = ['excellent', 'amazing', 'fantastic', 'great', 'wonderful', 'perfect']
    negative_words = ['terrible', 'awful', 'horrible', 'bad', 'worst', 'disappointing']
    neutral_words = ['okay', 'average', 'normal', 'standard', 'typical', 'regular']
    
    texts = []
    labels = []
    
    for i in range(n_samples):
        if i % 3 == 0:  # Positive
            words = np.random.choice(positive_words, size=np.random.randint(3, 8))
            label = 1
        elif i % 3 == 1:  # Negative  
            words = np.random.choice(negative_words, size=np.random.randint(3, 8))
            label = 0
        else:  # Mixed
            words = np.random.choice(neutral_words + positive_words + negative_words, 
                                   size=np.random.randint(5, 12))
            label = np.random.randint(0, 2)
        
        text = ' '.join(words) + ' product review text here'
        texts.append(text)
        labels.append(label)
    
    return texts, labels

# Run the complete pipeline
print("Generating sample data...")
texts, labels = generate_sample_data(3000)
labels = np.array(labels, dtype=np.float32)

print(f"Dataset: {len(texts)} texts, {np.mean(labels):.2f} positive ratio")

# Initialize pipeline
pipeline = TextClassificationPipeline()

print(f"\nInitial memory usage: {get_memory_usage():.2f} MB")

# Step 1: Preprocess
print("\n1. Preprocessing texts...")
processed_texts = pipeline.preprocess_batch(texts)
print(f"Memory after preprocessing: {get_memory_usage():.2f} MB")

# Step 2: Build vocabulary
print("\n2. Building vocabulary...")
vocab_size = pipeline.build_vocabulary(processed_texts, max_vocab=1000)
print(f"Vocabulary size: {vocab_size}")
print(f"Memory after vocabulary: {get_memory_usage():.2f} MB")

# Step 3: Vectorize
print("\n3. Vectorizing texts...")
X = pipeline.vectorize_batch(processed_texts)
print(f"Feature matrix shape: {X.shape}")
print(f"Feature matrix memory: {X.nbytes / 1024 / 1024:.2f} MB")
print(f"Memory after vectorization: {get_memory_usage():.2f} MB")

# Step 4: Train model
print("\n4. Training model...")
pipeline.train_model(X, labels, learning_rate=0.1, epochs=20)
print(f"Memory after training: {get_memory_usage():.2f} MB")

# Step 5: Evaluate
print("\n5. Making predictions...")
predictions = pipeline.predict_batch(X)
accuracy = np.mean(predictions == labels)
print(f"Training accuracy: {accuracy:.3f}")
print(f"Final memory usage: {get_memory_usage():.2f} MB")

# Performance summary
print("\n" + "="*50)
print("Pipeline Performance Summary:")
print(f"• Processed {len(texts)} texts")
print(f"• Built vocabulary of {vocab_size} words")
print(f"• Created {X.shape[0]}x{X.shape[1]} feature matrix")
print(f"• Achieved {accuracy:.1%} training accuracy")
print("• Memory-efficient operations throughout")
print("="*50)

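The pipeline computes the sigmoid as `1 / (1 + np.exp(-predictions))`, which can overflow inside `np.exp` when a score is a large negative number. The sketch below shows one common numerically stable formulation that picks the safe expression for each sign. `stable_sigmoid` is a hypothetical helper, not part of the pipeline above, and it assumes array input.

```python
import numpy as np

def stable_sigmoid(z):
    """Sigmoid that avoids overflow in exp for large-magnitude inputs."""
    z = np.asarray(z, dtype=np.float64)
    out = np.empty_like(z)

    positive = z >= 0
    # For z >= 0, exp(-z) is at most 1, so it cannot overflow
    out[positive] = 1.0 / (1.0 + np.exp(-z[positive]))
    # For z < 0, exp(z) is below 1, so use the equivalent form exp(z) / (1 + exp(z))
    exp_z = np.exp(z[~positive])
    out[~positive] = exp_z / (1.0 + exp_z)
    return out

scores = np.array([-1000.0, -1.0, 0.0, 1.0, 1000.0])
probs = stable_sigmoid(scores)
print(probs)
```

The two branches are algebraically identical; they differ only in which exponential is evaluated.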
## Practice Exercises

### Exercise 1: Memory Profiling
Profile the memory usage of a word embedding loading function and optimize it.

### Exercise 2: Algorithm Optimization
Optimize a cosine similarity calculation between document vectors.

### Exercise 3: Data Structure Choice
Compare performance of different data structures for storing and querying large vocabularies.

### Exercise 4: Batch Processing
Implement efficient batch processing for a text classification pipeline.

### Exercise 5: Memory Leak Detection
Identify and fix memory leaks in a streaming text processing system.

## Key Takeaways

1. **Memory Management**: Understanding Python's memory model is crucial for ML/NLP
2. **Profiling**: Use tools like `tracemalloc`, `cProfile`, and `memory_profiler`
3. **Data Structures**: Choose the right data structure for your use case
4. **Vectorization**: Use NumPy for numerical operations
5. **Generators**: Use generators for memory-efficient processing of large datasets
6. **Optimization**: Profile first, then optimize bottlenecks
7. **Resource Management**: Use context managers and proper cleanup
8. **Caching**: Use `@lru_cache` for expensive computations

These performance and memory management skills are essential for building scalable ML and NLP applications!