# Advanced Practice Problems

---

## Topics Covered
- Decorators and Metaclasses
- Concurrency (Threading, Async)
- Design Patterns
- Memory Management
- Advanced Data Structures
- Algorithm Optimization

Each problem includes hints and a solution.

---

## Problem 1: Singleton Metaclass

Implement a Singleton pattern using a metaclass.

In [2]:
class SingletonMeta(type):
    """Metaclass that creates Singleton classes"""
    pass

class Database(metaclass=SingletonMeta):
    def __init__(self):
        self.connection = "Connected"

# Test
db1 = Database()
db2 = Database()
print(f"Same instance: {db1 is db2}")  # True

Same instance: False


In [3]:
# Hint: Store instances in _instances dict
# Override __call__ to control instance creation

In [4]:
# Solution 1:
class SingletonMeta(type):
    _instances = {}
    
    def __call__(cls, *args, **kwargs):
        if cls not in cls._instances:
            cls._instances[cls] = super().__call__(*args, **kwargs)
        return cls._instances[cls]

class Database(metaclass=SingletonMeta):
    def __init__(self):
        self.connection = "Connected"

db1 = Database()
db2 = Database()
print(f"Same instance: {db1 is db2}")

Same instance: True


---

## Problem 2: Thread-Safe Counter

Implement a thread-safe counter using locks.

In [5]:
import threading

class ThreadSafeCounter:
    """Thread-safe counter"""
    def __init__(self):
        pass
    
    def increment(self):
        pass
    
    def get_value(self):
        pass

# Test
counter = ThreadSafeCounter()
threads = [threading.Thread(target=counter.increment) for _ in range(1000)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(f"Final count: {counter.get_value()}")  # 1000

Final count: None


In [6]:
# Hint: Use threading.Lock()
# Acquire lock before modifying, release after

In [7]:
# Solution 2:
import threading

class ThreadSafeCounter:
    def __init__(self):
        self._value = 0
        self._lock = threading.Lock()
    
    def increment(self):
        with self._lock:
            self._value += 1
    
    def get_value(self):
        with self._lock:
            return self._value

counter = ThreadSafeCounter()
threads = [threading.Thread(target=counter.increment) for _ in range(1000)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(f"Final count: {counter.get_value()}")

Final count: 1000


---

## Problem 3: Async Web Scraper Pattern

Simulate concurrent URL fetching using asyncio.

In [8]:
import asyncio

async def fetch_url(url, delay):
    """Simulate fetching URL with delay"""
    pass

async def fetch_all(urls):
    """Fetch all URLs concurrently"""
    pass

# Test
urls = [("url1", 1), ("url2", 2), ("url3", 1)]
# Should complete in ~2 seconds, not 4

In [9]:
# Hint: Use asyncio.sleep() to simulate delay
# Use asyncio.gather() to run concurrently

In [10]:
# Solution 3:
import asyncio
import time

async def fetch_url(url, delay):
    print(f"Fetching {url}...")
    await asyncio.sleep(delay)
    return f"Content from {url}"

async def fetch_all(urls):
    tasks = [fetch_url(url, delay) for url, delay in urls]
    return await asyncio.gather(*tasks)

async def main():
    urls = [("url1", 1), ("url2", 2), ("url3", 1)]
    start = time.time()
    results = await fetch_all(urls)
    print(f"Results: {results}")
    print(f"Time: {time.time() - start:.2f}s")

await main()

Fetching url1...
Fetching url2...
Fetching url3...
Results: ['Content from url1', 'Content from url2', 'Content from url3']
Time: 2.02s


---

## Problem 4: Observer Pattern

Implement the Observer design pattern.

In [11]:
class Subject:
    """Observable subject"""
    def __init__(self):
        pass
    def attach(self, observer):
        pass
    def detach(self, observer):
        pass
    def notify(self, message):
        pass

class Observer:
    """Observer interface"""
    def update(self, message):
        pass

# Test
subject = Subject()
# Add observers and test notification

In [12]:
# Hint: Subject maintains list of observers
# notify() calls update() on each observer

In [13]:
# Solution 4:
from abc import ABC, abstractmethod

class Observer(ABC):
    @abstractmethod
    def update(self, message):
        pass

class Subject:
    def __init__(self):
        self._observers = []
    
    def attach(self, observer):
        self._observers.append(observer)
    
    def detach(self, observer):
        self._observers.remove(observer)
    
    def notify(self, message):
        for observer in self._observers:
            observer.update(message)

class EmailObserver(Observer):
    def __init__(self, email):
        self.email = email
    def update(self, message):
        print(f"Email to {self.email}: {message}")

class LogObserver(Observer):
    def update(self, message):
        print(f"LOG: {message}")

subject = Subject()
subject.attach(EmailObserver("user@example.com"))
subject.attach(LogObserver())
subject.notify("System started")

Email to user@example.com: System started
LOG: System started


---

## Problem 5: Descriptor for Validation

Create a descriptor that validates attribute values.

In [14]:
class ValidatedAttribute:
    """Descriptor with validation"""
    def __init__(self, validator, name=None):
        pass
    def __set_name__(self, owner, name):
        pass
    def __get__(self, obj, objtype=None):
        pass
    def __set__(self, obj, value):
        pass

class Person:
    age = ValidatedAttribute(lambda x: isinstance(x, int) and 0 <= x <= 150)
    name = ValidatedAttribute(lambda x: isinstance(x, str) and len(x) > 0)

# Test
p = Person()
p.name = "Alice"
p.age = 30

In [15]:
# Hint: Store values in obj.__dict__ with mangled name
# Call validator in __set__, raise ValueError if invalid

In [16]:
# Solution 5:
class ValidatedAttribute:
    def __init__(self, validator):
        self.validator = validator
        self.name = None
    
    def __set_name__(self, owner, name):
        self.name = f"_validated_{name}"
    
    def __get__(self, obj, objtype=None):
        if obj is None:
            return self
        return getattr(obj, self.name, None)
    
    def __set__(self, obj, value):
        if not self.validator(value):
            raise ValueError(f"Invalid value: {value}")
        setattr(obj, self.name, value)

class Person:
    age = ValidatedAttribute(lambda x: isinstance(x, int) and 0 <= x <= 150)
    name = ValidatedAttribute(lambda x: isinstance(x, str) and len(x) > 0)

p = Person()
p.name = "Alice"
p.age = 30
print(f"{p.name}, {p.age}")

try:
    p.age = -5
except ValueError as e:
    print(f"Error: {e}")

Alice, 30
Error: Invalid value: -5


---

## Problem 6: Producer-Consumer with Queue

Implement producer-consumer pattern using threading and Queue.

In [17]:
import threading
import queue
import time

def producer(q, items):
    """Produce items into queue"""
    pass

def consumer(q, name):
    """Consume items from queue"""
    pass

# Test with multiple producers and consumers

In [18]:
# Hint: Use queue.Queue() for thread-safe queue
# Use None as sentinel to signal end

In [19]:
# Solution 6:
import threading
import queue
import time

def producer(q, items, name):
    for item in items:
        print(f"Producer {name}: adding {item}")
        q.put(item)
        time.sleep(0.1)
    q.put(None)  # Sentinel

def consumer(q, name):
    while True:
        item = q.get()
        if item is None:
            q.put(None)  # Pass sentinel to other consumers
            break
        print(f"Consumer {name}: processing {item}")
        time.sleep(0.15)

q = queue.Queue()

prod = threading.Thread(target=producer, args=(q, range(5), "P1"))
cons1 = threading.Thread(target=consumer, args=(q, "C1"))
cons2 = threading.Thread(target=consumer, args=(q, "C2"))

prod.start()
cons1.start()
cons2.start()

prod.join()
cons1.join()
cons2.join()
print("Done")

Producer P1: adding 0
Consumer C1: processing 0
Producer P1: adding 1
Consumer C2: processing 1
Producer P1: adding 2
Consumer C1: processing 2
Producer P1: adding 3
Consumer C2: processing 3
Producer P1: adding 4
Consumer C1: processing 4
Done


---

## Problem 7: Trie (Prefix Tree)

Implement a Trie for efficient string prefix operations.

In [20]:
class Trie:
    def __init__(self):
        pass
    
    def insert(self, word):
        pass
    
    def search(self, word):
        """Return True if word exists"""
        pass
    
    def starts_with(self, prefix):
        """Return True if any word starts with prefix"""
        pass

# Test
trie = Trie()
trie.insert("apple")
print(trie.search("apple"))     # True
print(trie.search("app"))       # False
print(trie.starts_with("app"))  # True

None
None
None


In [21]:
# Hint: Each node has dict of children and end-of-word flag
# Traverse/create nodes for each character

In [22]:
# Solution 7:
class TrieNode:
    def __init__(self):
        self.children = {}
        self.is_end = False

class Trie:
    def __init__(self):
        self.root = TrieNode()
    
    def insert(self, word):
        node = self.root
        for char in word:
            if char not in node.children:
                node.children[char] = TrieNode()
            node = node.children[char]
        node.is_end = True
    
    def _find_node(self, prefix):
        node = self.root
        for char in prefix:
            if char not in node.children:
                return None
            node = node.children[char]
        return node
    
    def search(self, word):
        node = self._find_node(word)
        return node is not None and node.is_end
    
    def starts_with(self, prefix):
        return self._find_node(prefix) is not None

trie = Trie()
trie.insert("apple")
print(trie.search("apple"))
print(trie.search("app"))
print(trie.starts_with("app"))

True
False
True


---

## Problem 8: Rate Limiter

Implement a rate limiter using token bucket algorithm.

In [23]:
import time

class RateLimiter:
    """Token bucket rate limiter"""
    def __init__(self, rate, capacity):
        pass
    
    def allow(self):
        """Return True if request is allowed"""
        pass

# Test
limiter = RateLimiter(rate=2, capacity=5)  # 2 tokens/sec, max 5
for i in range(10):
    print(f"Request {i}: {'Allowed' if limiter.allow() else 'Denied'}")
    time.sleep(0.3)

Request 0: Denied
Request 1: Denied
Request 2: Denied
Request 3: Denied
Request 4: Denied
Request 5: Denied
Request 6: Denied
Request 7: Denied
Request 8: Denied
Request 9: Denied


In [24]:
# Hint: Track tokens and last refill time
# Add tokens based on time elapsed before checking

In [25]:
# Solution 8:
import time

class RateLimiter:
    def __init__(self, rate, capacity):
        self.rate = rate  # tokens per second
        self.capacity = capacity
        self.tokens = capacity
        self.last_time = time.time()
    
    def allow(self):
        now = time.time()
        elapsed = now - self.last_time
        self.tokens = min(self.capacity, self.tokens + elapsed * self.rate)
        self.last_time = now
        
        if self.tokens >= 1:
            self.tokens -= 1
            return True
        return False

limiter = RateLimiter(rate=2, capacity=5)
for i in range(10):
    print(f"Request {i}: {'Allowed' if limiter.allow() else 'Denied'}")
    time.sleep(0.3)

Request 0: Allowed
Request 1: Allowed
Request 2: Allowed
Request 3: Allowed
Request 4: Allowed
Request 5: Allowed
Request 6: Allowed
Request 7: Allowed
Request 8: Allowed
Request 9: Allowed


---

## Problem 9: Lazy Property with Caching

Create a decorator for lazy-evaluated cached properties.

In [26]:
class lazy_property:
    """Decorator for lazy-evaluated cached property"""
    def __init__(self, func):
        pass
    def __get__(self, obj, objtype=None):
        pass

class DataProcessor:
    def __init__(self, data):
        self.data = data
    
    @lazy_property
    def processed(self):
        print("Processing...")
        return [x * 2 for x in self.data]

# Test
dp = DataProcessor([1, 2, 3])
print(dp.processed)  # Computes
print(dp.processed)  # Cached

None
None


In [27]:
# Hint: Store result in obj.__dict__ on first access
# Subsequent accesses find it directly in __dict__

In [28]:
# Solution 9:
class lazy_property:
    def __init__(self, func):
        self.func = func
        self.name = func.__name__
    
    def __get__(self, obj, objtype=None):
        if obj is None:
            return self
        value = self.func(obj)
        setattr(obj, self.name, value)  # Cache in instance
        return value

class DataProcessor:
    def __init__(self, data):
        self.data = data
    
    @lazy_property
    def processed(self):
        print("Processing...")
        return [x * 2 for x in self.data]

dp = DataProcessor([1, 2, 3])
print(dp.processed)
print(dp.processed)

Processing...
[2, 4, 6]
[2, 4, 6]


---

## Problem 10: Event Emitter

Implement an event emitter with on, off, and emit methods.

In [29]:
class EventEmitter:
    def __init__(self):
        pass
    
    def on(self, event, callback):
        """Register callback for event"""
        pass
    
    def off(self, event, callback):
        """Remove callback for event"""
        pass
    
    def emit(self, event, *args, **kwargs):
        """Trigger all callbacks for event"""
        pass

# Test
emitter = EventEmitter()
emitter.on("data", lambda x: print(f"Received: {x}"))
emitter.emit("data", "Hello")

In [30]:
# Hint: Use defaultdict(list) to store callbacks
# emit calls all registered callbacks with args

In [31]:
# Solution 10:
from collections import defaultdict

class EventEmitter:
    def __init__(self):
        self._events = defaultdict(list)
    
    def on(self, event, callback):
        self._events[event].append(callback)
        return self
    
    def off(self, event, callback):
        if event in self._events:
            self._events[event].remove(callback)
        return self
    
    def emit(self, event, *args, **kwargs):
        for callback in self._events[event]:
            callback(*args, **kwargs)
        return self

emitter = EventEmitter()
emitter.on("data", lambda x: print(f"Handler 1: {x}"))
emitter.on("data", lambda x: print(f"Handler 2: {x}"))
emitter.emit("data", "Hello")

Handler 1: Hello
Handler 2: Hello


<__main__.EventEmitter at 0x1dd0853dfd0>

---

## Problem 11: Retry Decorator with Backoff

Create a decorator that retries failed functions with exponential backoff.

In [None]:
def retry(max_attempts=3, backoff=2):
    """Retry decorator with exponential backoff"""
    pass

@retry(max_attempts=3, backoff=2)
def unstable_function():
    import random
    if random.random() < 0.7:
        raise Exception("Random failure")
    return "Success!"

# Test
print(unstable_function())

In [None]:
# Hint: Catch exceptions and retry with increasing sleep
# Sleep time = backoff ** attempt

In [None]:
# Solution 11:
import time
from functools import wraps

def retry(max_attempts=3, backoff=2):
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(max_attempts):
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    if attempt == max_attempts - 1:
                        raise
                    sleep_time = backoff ** attempt
                    print(f"Attempt {attempt + 1} failed: {e}. Retrying in {sleep_time}s...")
                    time.sleep(sleep_time)
        return wrapper
    return decorator

@retry(max_attempts=5, backoff=1)
def unstable_function():
    import random
    if random.random() < 0.7:
        raise Exception("Random failure")
    return "Success!"

try:
    print(unstable_function())
except Exception as e:
    print(f"Final failure: {e}")

---

## Problem 12: Memory-Efficient Large File Reader

Create a generator that reads large files in chunks.

In [None]:
def read_in_chunks(file_path, chunk_size=1024):
    """Generator that yields file chunks"""
    pass

def process_large_file(file_path):
    """Process file without loading entirely into memory"""
    pass

In [None]:
# Hint: Use file.read(chunk_size) in a loop
# yield chunks until empty

In [None]:
# Solution 12:
def read_in_chunks(file_path, chunk_size=1024):
    with open(file_path, 'r') as f:
        while True:
            chunk = f.read(chunk_size)
            if not chunk:
                break
            yield chunk

def read_lines_lazy(file_path):
    """Alternative: yield lines one at a time"""
    with open(file_path, 'r') as f:
        for line in f:
            yield line.strip()

# Demo with a test file
test_file = "test_large.txt"
with open(test_file, 'w') as f:
    f.write("Line 1\nLine 2\nLine 3\n" * 100)

# Process without loading all into memory
line_count = sum(1 for line in read_lines_lazy(test_file))
print(f"Lines: {line_count}")

import os
os.remove(test_file)

---

## Problem 13: Graph BFS and DFS

Implement BFS and DFS for graph traversal.

In [None]:
class Graph:
    def __init__(self):
        self.graph = {}
    
    def add_edge(self, u, v):
        pass
    
    def bfs(self, start):
        """Breadth-first search"""
        pass
    
    def dfs(self, start):
        """Depth-first search"""
        pass

# Test
g = Graph()
for u, v in [(0,1), (0,2), (1,2), (2,3)]:
    g.add_edge(u, v)
print(f"BFS: {g.bfs(0)}")
print(f"DFS: {g.dfs(0)}")

In [None]:
# Hint: BFS uses queue (deque), DFS uses stack or recursion
# Track visited nodes to avoid cycles

In [None]:
# Solution 13:
from collections import deque, defaultdict

class Graph:
    def __init__(self):
        self.graph = defaultdict(list)
    
    def add_edge(self, u, v):
        self.graph[u].append(v)
        self.graph[v].append(u)  # Undirected
    
    def bfs(self, start):
        visited = set([start])
        queue = deque([start])
        result = []
        
        while queue:
            node = queue.popleft()
            result.append(node)
            for neighbor in self.graph[node]:
                if neighbor not in visited:
                    visited.add(neighbor)
                    queue.append(neighbor)
        return result
    
    def dfs(self, start):
        visited = set()
        result = []
        
        def _dfs(node):
            visited.add(node)
            result.append(node)
            for neighbor in self.graph[node]:
                if neighbor not in visited:
                    _dfs(neighbor)
        
        _dfs(start)
        return result

g = Graph()
for u, v in [(0,1), (0,2), (1,2), (2,3)]:
    g.add_edge(u, v)
print(f"BFS: {g.bfs(0)}")
print(f"DFS: {g.dfs(0)}")

---

## Problem 14: Async Context Manager

Create an async context manager for resource management.

In [None]:
import asyncio

class AsyncResource:
    """Async context manager for resource"""
    async def __aenter__(self):
        pass
    
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        pass

# Test
async def main():
    async with AsyncResource() as resource:
        print("Using resource")

In [None]:
# Hint: Use async/await in __aenter__ and __aexit__
# Similar to regular context manager but async

In [None]:
# Solution 14:
import asyncio

class AsyncDatabase:
    def __init__(self, name):
        self.name = name
        self.connected = False
    
    async def __aenter__(self):
        print(f"Connecting to {self.name}...")
        await asyncio.sleep(0.5)  # Simulate connection
        self.connected = True
        print("Connected!")
        return self
    
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        print("Closing connection...")
        await asyncio.sleep(0.2)  # Simulate cleanup
        self.connected = False
        print("Connection closed.")
    
    async def query(self, sql):
        await asyncio.sleep(0.1)
        return f"Result of: {sql}"

async def main():
    async with AsyncDatabase("mydb") as db:
        result = await db.query("SELECT * FROM users")
        print(result)

await main()

---

## Problem 15: Dynamic Class Creation

Create classes dynamically using type().

In [None]:
def create_model(name, fields):
    """Create a class dynamically with given fields"""
    pass

# Test
User = create_model("User", ["name", "email", "age"])
user = User(name="Alice", email="alice@example.com", age=30)
print(user)

In [None]:
# Hint: Use type(name, bases, dict) to create class
# Add __init__, __repr__ to the class dict

In [None]:
# Solution 15:
def create_model(name, fields):
    def __init__(self, **kwargs):
        for field in fields:
            setattr(self, field, kwargs.get(field))
    
    def __repr__(self):
        attrs = ", ".join(f"{f}={getattr(self, f)!r}" for f in fields)
        return f"{name}({attrs})"
    
    def __eq__(self, other):
        if not isinstance(other, self.__class__):
            return False
        return all(getattr(self, f) == getattr(other, f) for f in fields)
    
    return type(name, (), {
        "__init__": __init__,
        "__repr__": __repr__,
        "__eq__": __eq__,
        "_fields": fields
    })

User = create_model("User", ["name", "email", "age"])
user1 = User(name="Alice", email="alice@example.com", age=30)
user2 = User(name="Alice", email="alice@example.com", age=30)

print(user1)
print(f"Equal: {user1 == user2}")

---

## Summary

| Problem | Concepts |
|---------|----------|
| Singleton | Metaclass, design pattern |
| Thread-Safe Counter | Threading, locks |
| Async Scraper | asyncio, gather |
| Observer Pattern | OOP, design pattern |
| Validated Descriptor | Descriptors, validation |
| Producer-Consumer | Threading, Queue |
| Trie | Data structure |
| Rate Limiter | Token bucket algorithm |
| Lazy Property | Descriptors, caching |
| Event Emitter | Callbacks, pub/sub |
| Retry Decorator | Decorators, error handling |
| Chunk Reader | Generators, memory |
| Graph Traversal | BFS, DFS algorithms |
| Async Context Manager | async/await, resources |
| Dynamic Classes | type(), metaprogramming |