# Principal-Level Coding Questions
Unified notebook with algorithmic, concurrency, and distributed-systems challenges. Each section pairs a short explanation with reference Python solutions.

## Search in Rotated Sorted Array with Duplicates
Binary search variant that skips duplicates; it runs in O(log n) on typical inputs but degrades to O(n) in the worst case, when many elements equal both endpoints.

In [None]:
from typing import List

def search_rotated(nums: List[int], target: int) -> int:
    """Return an index of ``target`` in a rotated sorted list, or -1.

    The list may contain duplicates. When the values at both ends and at the
    midpoint are equal, the sorted half cannot be identified, so both ends
    are shrunk by one; this makes the worst case linear.

    Args:
        nums: Sorted values rotated at an unknown pivot.
        target: Value to look for.

    Returns:
        An index ``i`` with ``nums[i] == target``, or -1 when absent.
    """
    left, right = 0, len(nums) - 1
    while left <= right:
        middle = (left + right) // 2
        pivot = nums[middle]
        if pivot == target:
            return middle

        first, last = nums[left], nums[right]
        if first == pivot == last:
            # The three probes are indistinguishable: drop both ends.
            left, right = left + 1, right - 1
        elif first <= pivot:
            # Left half [left, middle] is sorted.
            if first <= target < pivot:
                right = middle - 1
            else:
                left = middle + 1
        elif pivot < target <= last:
            # Right half [middle, right] is sorted and holds the target.
            left = middle + 1
        else:
            right = middle - 1
    return -1

## Memory-Bounded Merge of K Sorted Streams
Maintains a heap of iterators so only O(k) memory is used while yielding merged output.

In [None]:
import heapq
from typing import Iterable, Iterator, Tuple

def merge_streams(streams: Iterable[Iterable[int]]) -> Iterator[int]:
    """Lazily merge already-sorted streams into one sorted stream.

    Only the current head of each stream is held in memory. Heap entries are
    ``(value, stream_index, iterator)``; the unique stream index breaks ties
    so iterators themselves are never compared.

    Args:
        streams: Iterables that each yield values in ascending order.

    Yields:
        Every value from every stream, in ascending order.
    """
    exhausted = object()  # sentinel distinguishing "no more items" from any value
    frontier: list[Tuple[int, int, Iterator[int]]] = []

    for order, source in enumerate(streams):
        cursor = iter(source)
        head = next(cursor, exhausted)
        if head is not exhausted:
            heapq.heappush(frontier, (head, order, cursor))

    while frontier:
        smallest, order, cursor = heapq.heappop(frontier)
        yield smallest
        following = next(cursor, exhausted)
        if following is not exhausted:
            heapq.heappush(frontier, (following, order, cursor))

## Smallest Window Containing All Characters (Unicode Safe)
Sliding window counts with dictionary keys as Unicode codepoints.

In [None]:
from collections import Counter

def min_window_unicode(s: str, t: str) -> str:
    """Return the shortest substring of ``s`` containing every character of ``t``.

    Characters are compared as Python ``str`` elements (Unicode code points),
    and multiplicity in ``t`` is respected.

    Args:
        s: Text to search.
        t: Characters (with repetition) the window must contain.

    Returns:
        The leftmost shortest qualifying window, or ``""`` when no window
        exists or ``t`` is empty.
    """
    if not t:
        # Nothing is required; without this guard the loop below would index
        # past the end of ``s``.
        return ""

    need = Counter(t)      # > 0: still required, < 0: surplus inside the window
    missing = len(t)       # number of required characters not yet covered
    left = start = end = 0
    for right, ch in enumerate(s, 1):  # ``right`` is an exclusive bound
        if need[ch] > 0:
            missing -= 1
        need[ch] -= 1
        if missing == 0:
            # Drop surplus characters from the left edge.
            while left < right and need[s[left]] < 0:
                need[s[left]] += 1
                left += 1
            # ``end == 0`` means no window has been recorded yet.
            if end == 0 or right - left < end - start:
                start, end = left, right
            # Give up the leftmost required character to look for a shorter window.
            need[s[left]] += 1
            missing += 1
            left += 1
    return s[start:end]

## Isomorphic Strings under Bijective Mapping
Maps characters both directions to verify one-to-one correspondence across full Unicode.

In [None]:
def are_isomorphic(a: str, b: str) -> bool:
    """Return True when a one-to-one character mapping turns ``a`` into ``b``.

    Two dictionaries record the mapping in each direction so that neither two
    source characters share a target nor one source maps to two targets.
    """
    if len(a) != len(b):
        return False

    forward, backward = {}, {}
    for src, dst in zip(a, b):
        # setdefault records a first sighting and returns the earlier pairing otherwise.
        if forward.setdefault(src, dst) != dst:
            return False
        if backward.setdefault(dst, src) != src:
            return False
    return True

## Longest Increasing Subsequence (Return All Paths)
Uses patience sorting piles with backpointers to enumerate every LIS sequence.

In [None]:
from bisect import bisect_left

def lis_all_paths(nums):
    """Return every longest strictly increasing subsequence of ``nums``.

    Patience sorting places element ``i`` on pile ``p`` exactly when the
    longest strictly increasing subsequence ending at ``i`` has length
    ``p + 1``. A valid predecessor of an element on pile ``p`` is therefore
    any element on pile ``p - 1`` that appears earlier and is strictly
    smaller; both conditions are checked during enumeration.

    Args:
        nums: Sequence of mutually comparable values.

    Returns:
        A list of subsequences (as lists of values). Subsequences that use
        different positions but equal values are listed separately. Returns
        ``[]`` for empty input.

    Note:
        Enumeration is recursive, one frame per element of the subsequence,
        and the number of results can be exponential in ``len(nums)``.
    """
    if not nums:
        return []

    piles = []       # piles[p] -> indices placed on pile p, in ascending order
    pile_tops = []   # smallest tail value currently ending each pile
    pile_of = []     # pile_of[i] -> pile that index i was placed on
    for i, num in enumerate(nums):
        pos = bisect_left(pile_tops, num)
        if pos == len(pile_tops):
            pile_tops.append(num)
            piles.append([])
        else:
            pile_tops[pos] = num
        piles[pos].append(i)
        pile_of.append(pos)

    results = []
    acc = []  # values collected from the tail of the subsequence backwards

    def backtrack(idx):
        acc.append(nums[idx])
        level = pile_of[idx]
        if level == 0:
            results.append(acc[::-1])
        else:
            for p in piles[level - 1]:
                if p >= idx:
                    break  # indices are ascending; the rest come after idx
                if nums[p] < nums[idx]:
                    backtrack(p)
        acc.pop()

    for idx in piles[-1]:
        backtrack(idx)
    return results

## Connected Components in Large Graphs
Chunked union-find that can be parallelized; edges are processed streaming-style.

In [None]:
def connected_components_stream(edges, n):
    """Count connected components of an undirected graph on nodes ``0..n-1``.

    Edges are consumed one at a time from any iterable, so the edge list
    never has to be held in memory. Uses union-find with path halving and
    union by rank.

    Args:
        edges: Iterable of ``(u, v)`` node-index pairs.
        n: Number of nodes.

    Returns:
        The number of connected components.
    """
    leader = list(range(n))
    height = [0] * n

    def root_of(node):
        # Path halving: point each visited node at its grandparent.
        while leader[node] != node:
            leader[node] = leader[leader[node]]
            node = leader[node]
        return node

    for u, v in edges:
        root_u, root_v = root_of(u), root_of(v)
        if root_u == root_v:
            continue
        if height[root_u] < height[root_v]:
            root_u, root_v = root_v, root_u
        # root_u is now at least as tall as root_v; hang root_v beneath it.
        leader[root_v] = root_u
        if height[root_u] == height[root_v]:
            height[root_u] += 1

    # A node is a component representative exactly when it is its own leader.
    return sum(1 for node in range(n) if leader[node] == node)

## Enumerate All Cycles in Directed Graphs
Backtracking DFS that collects simple cycles without stopping at first detection.

In [None]:
from collections import defaultdict

def all_cycles_directed(graph):
    """Enumerate every simple cycle of a directed graph exactly once.

    Each cycle is reported as the list of its nodes, starting from the node
    that appears earliest among the keys of ``graph``. A search rooted at
    ``start`` only walks through nodes that come later than ``start`` in key
    order, so rotations of the same cycle are not reported again.

    Args:
        graph: Mapping from node to an iterable of successor nodes. Nodes
            that appear only as successors have no outgoing edges and so
            cannot lie on a cycle.

    Returns:
        A list of cycles; a self-loop ``v -> v`` is reported as ``[v]``.

    Note:
        The search is recursive (one frame per node on the current path) and
        the number of simple cycles can be exponential in the graph size.
    """
    # dict.fromkeys de-duplicates parallel edges while keeping a stable order.
    adjacency = {node: list(dict.fromkeys(nbrs)) for node, nbrs in graph.items()}
    order = {node: position for position, node in enumerate(adjacency)}
    result = []
    path, on_path = [], set()

    def dfs(v, start):
        path.append(v)
        on_path.add(v)
        for w in adjacency[v]:
            if w == start:
                result.append(path.copy())
            elif w not in on_path and order.get(w, -1) > order[start]:
                # Only nodes ranked after ``start`` may appear on its cycles.
                dfs(w, start)
        path.pop()
        on_path.discard(v)

    for node in adjacency:
        dfs(node, node)
    return result

## Time-Dependent Grid Shortest Path
Expands BFS state with timestamp to respect obstacle schedules (time dimension in visited key).

In [None]:
from collections import deque

def shortest_time_path(grid, is_blocked_at, *, max_time=None, allow_wait=False):
    """Return the earliest time the bottom-right cell can be reached.

    Breadth-first search over ``(row, col, time)`` states starting from
    ``(0, 0)`` at time 0. Each step takes one time unit, and a cell may only
    be entered at time ``t`` when ``is_blocked_at(row, col, t)`` is false.
    The start cell itself is not checked at time 0, and only the dimensions
    of ``grid`` are used.

    Args:
        grid: Rectangular 2-D sequence; supplies the row and column counts.
        is_blocked_at: Callable ``(row, col, time) -> bool``.
        max_time: Optional inclusive upper bound on the arrival time. With
            the default ``None`` the search is unbounded in time, so it may
            never terminate when the target is unreachable but other cells
            stay reachable forever.
        allow_wait: When true, staying on the current cell for one time unit
            is a legal move, provided the cell is not blocked at the new time.

    Returns:
        The minimal arrival time, or -1 when the grid is empty or no path
        exists within the explored states.
    """
    if not grid or not grid[0]:
        return -1

    rows, cols = len(grid), len(grid[0])
    target = (rows - 1, cols - 1)
    moves = [(1, 0), (-1, 0), (0, 1), (0, -1)]
    if allow_wait:
        moves.append((0, 0))

    q = deque([(0, 0, 0)])
    seen = {(0, 0, 0)}
    while q:
        r, c, t = q.popleft()
        if (r, c) == target:
            return t
        nt = t + 1
        if max_time is not None and nt > max_time:
            continue  # time horizon reached; do not expand further
        for dr, dc in moves:
            nr, nc = r + dr, c + dc
            if 0 <= nr < rows and 0 <= nc < cols and not is_blocked_at(nr, nc, nt):
                state = (nr, nc, nt)
                if state not in seen:
                    seen.add(state)
                    q.append(state)
    return -1

## Word Break with Minimal Segmentation (Trie + DP)
Builds a trie for O(n·Σ) traversal and reconstructs the fewest-word split.

In [None]:
class TrieNode:
    """One node of a character trie."""

    __slots__ = ("children", "end")

    def __init__(self) -> None:
        # end: True when the path from the root to this node spells a stored word.
        self.end = False
        # children: maps the next character to the child node reached through it.
        self.children = {}

def word_break_min(words, s):
    """Split ``s`` into the fewest dictionary words.

    A trie of ``words`` is walked from every reachable position of ``s``.
    For each prefix length the minimal word count and the start of its last
    word are recorded, and the split is rebuilt from those back-pointers.

    Args:
        words: Iterable of dictionary words.
        s: Text to segment.

    Returns:
        A list of words whose concatenation is ``s`` using as few words as
        possible (the first such split found wins ties), ``[]`` for empty
        ``s``, or ``None`` when ``s`` cannot be segmented.
    """
    trie = TrieNode()
    for word in words:
        cursor = trie
        for letter in word:
            cursor = cursor.children.setdefault(letter, TrieNode())
        cursor.end = True

    size = len(s)
    unreachable = float("inf")
    fewest = [unreachable] * (size + 1)  # fewest[p]: min words covering s[:p]
    origin = [-1] * (size + 1)           # origin[p]: start index of the last word
    fewest[0] = 0

    for begin in range(size):
        if fewest[begin] == unreachable:
            continue
        cursor = trie
        for stop in range(begin, size):
            cursor = cursor.children.get(s[stop])
            if cursor is None:
                break
            if cursor.end and fewest[begin] + 1 < fewest[stop + 1]:
                fewest[stop + 1] = fewest[begin] + 1
                origin[stop + 1] = begin

    if fewest[size] == unreachable:
        return None

    pieces = []
    position = size
    while position > 0:
        pieces.append(s[origin[position]:position])
        position = origin[position]
    pieces.reverse()
    return pieces

## Streaming Knapsack (Incremental Recompute)
Maintains DP table that can be updated as items stream in without restarting from scratch.

In [None]:
def streaming_knapsack(stream, capacity):
    """Yield the best 0/1-knapsack value after each streamed item.

    A single one-dimensional table is updated in place for every new item,
    so earlier items are never reprocessed.

    Args:
        stream: Iterable of ``(value, weight)`` pairs.
        capacity: Integer weight budget.

    Yields:
        After each item, the best total value achievable with the items seen
        so far within ``capacity``.
    """
    best = [0] * (capacity + 1)  # best[b]: best value using total weight <= b
    for value, weight in stream:
        # Walk budgets downwards so each item is counted at most once.
        for budget in reversed(range(weight, capacity + 1)):
            with_item = best[budget - weight] + value
            if with_item > best[budget]:
                best[budget] = with_item
        yield max(best)


## Custom-Cost Optimal Parenthesization
Dynamic programming over interval splits parameterized by user-supplied cost matrix.

In [None]:
def optimal_parenthesization(costs):
    """Find the cheapest way to fully parenthesize a chain of ``n`` operands.

    Interval dynamic programming: joining the sub-chains ``i..k`` and
    ``k+1..j`` costs ``costs[i][k] + costs[k+1][j]`` on top of the optimal
    costs of the two sub-chains.

    Args:
        costs: ``n x n`` matrix where ``costs[a][b]`` is the user-supplied
            cost attached to the interval ``a..b``.

    Returns:
        A tuple ``(best_cost, split)``. ``split[i][j]`` is the chosen split
        point ``k`` for interval ``i..j`` (the smallest ``k`` on ties) or -1
        for single-operand intervals. An empty matrix yields ``(0, [])``.
    """
    n = len(costs)
    if n == 0:
        # Nothing to combine; avoids indexing into an empty table below.
        return 0, []

    dp = [[0] * n for _ in range(n)]
    split = [[-1] * n for _ in range(n)]
    for length in range(2, n + 1):
        for i in range(n - length + 1):
            j = i + length - 1
            best = float('inf')
            for k in range(i, j):
                cand = dp[i][k] + dp[k + 1][j] + costs[i][k] + costs[k + 1][j]
                if cand < best:
                    best = cand
                    split[i][j] = k
            dp[i][j] = best
    return dp[0][n - 1], split

## Serialize Trees with Possible Cycles
Assigns IDs and records adjacency so corrupted pointers or cycles remain representable.

In [None]:
import json

def serialize_tree(root):
    """Serialize a binary-tree-like structure to JSON, tolerating cycles.

    Every distinct node object receives an integer id in first-visit
    (pre-order) order. Records are emitted once a node's children have been
    processed, and a child pointer that leads back to an already-visited
    node is stored as that node's id instead of being followed again.

    Nodes are tracked by object identity, so they need not be hashable and
    two distinct nodes that compare equal are kept apart.

    Args:
        root: Root node exposing ``val`` and optionally ``left`` / ``right``
            attributes, or ``None`` for an empty tree.

    Returns:
        A JSON string ``{"nodes": [{"id", "val", "left", "right"}, ...]}``;
        ``left`` and ``right`` are child ids or ``null``.

    Note:
        The walk is recursive, so very deep structures can hit Python's
        recursion limit.
    """
    node_ids = {}  # id(node) -> assigned integer id
    records = []

    def walk(node):
        key = id(node)
        if key in node_ids:
            return node_ids[key]
        idx = len(node_ids)
        node_ids[key] = idx
        left_child = getattr(node, 'left', None)
        right_child = getattr(node, 'right', None)
        # Test against None explicitly so falsy node objects are not dropped.
        left = walk(left_child) if left_child is not None else None
        right = walk(right_child) if right_child is not None else None
        records.append({"id": idx, "val": node.val, "left": left, "right": right})
        return idx

    if root is not None:
        walk(root)
    return json.dumps({"nodes": records})

## LCA in Huge Non-Binary Trees
Parent pointers let us climb ancestors with a visited set using only O(h) memory.

In [None]:
def lca_with_parents(parent_map, a, b):
    """Return the lowest common ancestor of ``a`` and ``b``, or ``None``.

    Args:
        parent_map: Mapping from node to its parent; roots are absent or
            map to ``None``.
        a: First node.
        b: Second node.

    Returns:
        The first node on the path from ``b`` towards the root that is also
        on the path from ``a`` towards the root (a node counts as its own
        ancestor), or ``None`` when the two paths never meet.
    """
    ancestors_of_a = set()
    node = a
    while node is not None:
        ancestors_of_a.add(node)
        node = parent_map.get(node)

    # Climb from b until a shared ancestor appears or the chain runs out.
    node = b
    while node is not None and node not in ancestors_of_a:
        node = parent_map.get(node)
    return node

## Versioned Trie with Wildcards and Rollback
Stores snapshots via persistent nodes to support deletion, wildcard search, and version history.

In [None]:
class VersionedTrieNode:
    """Trie node used by the persistent (versioned) trie helpers."""

    __slots__ = ("children", "end")

    def __init__(self, children=None, end=False):
        # end: True when a stored word terminates at this node.
        self.end = end
        # children: character -> child node. A missing or empty mapping is
        # replaced by a fresh dict, so an empty dict argument is never shared.
        self.children = children if children else {}

def trie_insert(root, word):
    """Return a new trie version that also contains ``word``.

    Only the nodes along the path of ``word`` are copied; every other
    subtree is shared with ``root``, which is left unmodified.

    Args:
        root: Root node of the existing version.
        word: Word to insert.

    Returns:
        The root node of the new version.
    """
    new_root = VersionedTrieNode(dict(root.children), root.end)
    cursor = new_root
    for letter in word:
        existing = cursor.children.get(letter)
        if existing is None:
            clone = VersionedTrieNode({}, False)
        else:
            # Copy the node on the path so the old version stays untouched.
            clone = VersionedTrieNode(dict(existing.children), existing.end)
        cursor.children[letter] = clone
        cursor = clone
    cursor.end = True
    return new_root

def trie_search(root, pattern):
    """Report whether some stored word matches ``pattern``.

    Args:
        root: Root node of the trie version to search.
        pattern: Text to match; each ``'*'`` stands for exactly one
            arbitrary character.

    Returns:
        The ``end`` flag of the node reached by an exact match, ``True`` or
        ``False`` when the match went through a wildcard, and ``False`` when
        nothing matches.
    """
    length = len(pattern)

    def matches(node, position):
        if position == length:
            return node.end
        symbol = pattern[position]
        if symbol == '*':
            # Try every child for the wildcard position.
            for child in node.children.values():
                if matches(child, position + 1):
                    return True
            return False
        child = node.children.get(symbol)
        if child is None:
            return False
        return matches(child, position + 1)

    return matches(root, 0)


## O(1) LRU with TTL and Priority Updates
Doubly linked list for recency plus a per-entry expiry timestamp that is checked lazily on access; lookups, inserts, and evictions are all O(1).

In [None]:
import time
class LRUNode:
    """Doubly-linked-list entry holding one cached value and its expiry time."""

    __slots__ = ("key", "value", "prev", "next", "expires_at")

    def __init__(self, key, value, ttl):
        self.key, self.value = key, value
        self.prev = self.next = None
        # Absolute wall-clock deadline, in seconds since the epoch.
        self.expires_at = time.time() + ttl


class LRUCache:
    """Least-recently-used cache whose entries also expire after a TTL.

    ``map`` gives O(1) key lookup; a doubly linked list between the
    ``head`` and ``tail`` sentinels keeps entries ordered from most to least
    recently used. Expiry is lazy: an entry is only checked, and dropped if
    stale, when it is looked up.
    """

    def __init__(self, capacity):
        self.cap = capacity
        self.map = {}
        self.head = LRUNode(None, None, 0)
        self.tail = LRUNode(None, None, 0)
        self.head.next = self.tail
        self.tail.prev = self.head

    def _add_front(self, node):
        """Link ``node`` directly after the head sentinel (most recent)."""
        node.next = self.head.next
        node.prev = self.head
        self.head.next.prev = node
        self.head.next = node

    def _remove(self, node):
        """Unlink ``node`` from the recency list."""
        node.prev.next = node.next
        node.next.prev = node.prev

    def _evict(self, node):
        """Drop ``node`` from both the recency list and the key map."""
        self._remove(node)
        del self.map[node.key]

    def get(self, key):
        """Return the value stored for ``key``, or ``None`` if absent or expired.

        A hit marks the entry as most recently used. An expired entry is
        removed so that it stops counting towards the capacity.
        """
        node = self.map.get(key)
        if node is None:
            return None
        if node.expires_at < time.time():
            self._evict(node)
            return None
        self._remove(node)
        self._add_front(node)
        return node.value

    def put(self, key, value, ttl=60):
        """Store ``value`` under ``key`` for ``ttl`` seconds.

        Replaces any existing entry for ``key`` and evicts the least recently
        used entry when the cache grows beyond its capacity.
        """
        previous = self.map.get(key)
        if previous is not None:
            self._remove(previous)
        node = LRUNode(key, value, ttl)
        self.map[key] = node
        self._add_front(node)
        if len(self.map) > self.cap:
            self._evict(self.tail.prev)


## Thread-Safe Lock-Free Queue (ABA-Safe Sketch)
Uses CAS-like semantics and version tags to avoid ABA; Python mock-up with locks standing in for atomic ops for illustration.

In [None]:
import threading

class LockFreeNode:
    """Singly linked queue node carrying a version tag."""

    __slots__ = ("value", "next", "tag")

    def __init__(self, value=None, nxt=None, tag=0):
        self.value = value  # payload stored in this node
        self.next = nxt     # following node, or None at the end of the chain
        self.tag = tag      # version counter bumped by the queue on updates

class LockFreeQueue:
    """FIFO queue built on a linked list with a leading sentinel node.

    A ``threading.Lock`` guards every operation; it stands in for the atomic
    compare-and-swap steps a real lock-free queue would use.
    """

    def __init__(self):
        sentinel = LockFreeNode()
        self.head = sentinel  # sentinel; the first real item is head.next
        self.tail = sentinel  # last node in the chain
        self.lock = threading.Lock()

    def enqueue(self, value):
        """Append ``value`` to the back of the queue."""
        with self.lock:
            fresh = LockFreeNode(value)
            previous_tail = self.tail
            previous_tail.next = fresh
            previous_tail.tag += 1  # bump the version of the node just modified
            self.tail = fresh

    def dequeue(self):
        """Remove and return the front value, or ``None`` when the queue is empty."""
        with self.lock:
            successor = self.head.next
            if not successor:
                return None
            # The dequeued node becomes the new sentinel.
            self.head = successor
            return successor.value


## Mergeable Bloom Filter
Combines bit arrays with multiple hash functions; merge uses bitwise OR.

In [None]:
import mmh3
class Bloom:
    """Bloom filter backed by a Python integer used as a bit array.

    Items are hashed ``k`` times with MurmurHash3 using the seeds
    ``seed, seed + 1, ..., seed + k - 1``; each hash selects one of ``m`` bits.
    """

    def __init__(self, m, k, seed=0):
        self.bits = 0
        self.m, self.k, self.seed = m, k, seed

    def _hashes(self, item):
        """Yield the ``k`` bit positions for ``item`` (hashed via ``str(item)``)."""
        for i in range(self.k):
            yield mmh3.hash(str(item), self.seed + i) % self.m

    def add(self, item):
        """Set the bits selected by ``item``."""
        for h in self._hashes(item):
            self.bits |= 1 << h

    def contains(self, item):
        """Return True if ``item`` may be present, False if it definitely is not."""
        return all((self.bits >> h) & 1 for h in self._hashes(item))

    def merge(self, other):
        """Return a new filter representing the union of both filters.

        Raises:
            ValueError: If the filters differ in ``m``, ``k`` or ``seed``.
                Filters with different parameters set different bits for the
                same item, so OR-ing them would produce false negatives.
        """
        # An explicit check rather than ``assert``, which is stripped under -O.
        if (self.m, self.k, self.seed) != (other.m, other.k, other.seed):
            raise ValueError("cannot merge Bloom filters with different m, k or seed")
        merged = Bloom(self.m, self.k, self.seed)
        merged.bits = self.bits | other.bits
        return merged


## Singleton without Double-Checked Locking Pitfalls
Leverages a class-level attribute plus threading.Lock; the instance check is repeated inside the lock so that lazy instantiation stays safe when several threads race.

In [None]:
import threading

class Singleton:
    """Class whose single shared instance is created lazily on first request."""

    _instance = None           # the shared instance, once created
    _lock = threading.Lock()   # serializes the first construction

    @classmethod
    def instance(cls):
        """Return the shared instance, creating it on first use."""
        existing = cls._instance
        if existing is not None:
            return existing
        with cls._lock:
            # Re-check under the lock: another thread may have created the
            # instance between the check above and acquiring the lock.
            if cls._instance is None:
                cls._instance = cls()
            return cls._instance


## Fair Reader-Writer Lock
Condition-variable implementation intended to prevent starvation by honoring arrival order.

In [None]:
class FairRWLock:
    """Reader-writer lock that admits threads in arrival (FIFO) order.

    Every acquirer takes a ticket and may only enter when its ticket is the
    one being served. A reader hands the turn on as soon as it is admitted,
    so consecutive readers share the lock; a writer additionally waits until
    no reader or writer is active. Because readers that arrive after a
    waiting writer queue behind it, a steady stream of readers cannot starve
    writers.

    The lock is not reentrant: a thread that already holds a read lock and
    asks for another one while a writer is queued will deadlock.
    """

    def __init__(self):
        self.readers = 0         # number of active readers
        self.writer = False      # True while a writer holds the lock
        self.cv = threading.Condition()
        self._next_ticket = 0    # next ticket to hand out
        self._now_serving = 0    # ticket currently allowed to enter

    def _take_ticket(self):
        """Hand out the next ticket; the caller must hold ``self.cv``."""
        ticket = self._next_ticket
        self._next_ticket += 1
        return ticket

    def acquire_read(self):
        """Block until it is this thread's turn and no writer is active."""
        with self.cv:
            ticket = self._take_ticket()
            while ticket != self._now_serving or self.writer:
                self.cv.wait()
            self.readers += 1
            self._now_serving += 1
            # The next ticket holder may be another reader that can enter now.
            self.cv.notify_all()

    def release_read(self):
        """Release a read lock; wakes waiters when the last reader leaves."""
        with self.cv:
            self.readers -= 1
            if self.readers == 0:
                self.cv.notify_all()

    def acquire_write(self):
        """Block until it is this thread's turn and the lock is completely free."""
        with self.cv:
            ticket = self._take_ticket()
            while ticket != self._now_serving or self.writer or self.readers > 0:
                self.cv.wait()
            self.writer = True
            self._now_serving += 1

    def release_write(self):
        """Release the write lock and wake all waiters."""
        with self.cv:
            self.writer = False
            self.cv.notify_all()


## Prioritized Worker Pool with Metrics
Supports dynamic resizing and task priority via heap; tracks completions and failures.

In [None]:
import time, heapq
from threading import Thread, Event, Lock

class WorkerPool:
    """Thread pool that runs submitted callables in priority order.

    Tasks live in a heap ordered by ``(priority, submission order)``; lower
    priority values run first and equal priorities run first-in, first-out.
    ``metrics`` counts tasks that finished (``"done"``) and tasks that raised
    an ``Exception`` (``"failed"``).
    """

    def __init__(self, size=4):
        self.size = size              # target number of worker threads
        self.tasks = []               # heap of (priority, seq, fn, args, kwargs)
        self.cv = threading.Condition()
        self.stop = Event()
        self.threads = []             # every thread ever started by the pool
        self.metrics = {"done": 0, "failed": 0}
        self._metrics_lock = Lock()   # ``+=`` on a dict entry is not atomic
        self._seq = 0                 # tie-breaker so callables are never compared
        self._active = 0              # workers that have not retired yet
        self._spawn()

    def _spawn(self, count=None):
        """Start ``count`` workers; by default, enough to reach ``self.size``."""
        with self.cv:
            if count is None:
                count = self.size - self._active
            for _ in range(count):
                t = Thread(target=self._run, daemon=True)
                self._active += 1
                self.threads.append(t)
                t.start()

    def submit(self, priority, fn, *args, **kwargs):
        """Queue ``fn(*args, **kwargs)``; lower ``priority`` values run first."""
        with self.cv:
            heapq.heappush(self.tasks, (priority, self._seq, fn, args, kwargs))
            self._seq += 1
            self.cv.notify()

    def resize(self, new_size):
        """Grow or shrink the pool to ``new_size`` workers.

        Growing starts only the missing workers. Shrinking lets surplus
        workers retire once they are idle or have finished their current task.
        """
        with self.cv:
            self.size = new_size
            self._spawn()          # no-op unless workers are missing
            self.cv.notify_all()   # wake idle workers so surplus ones can retire

    def _run(self):
        """Worker loop: wait for a task, run it, record the outcome."""
        while True:
            with self.cv:
                while (not self.tasks and not self.stop.is_set()
                       and self._active <= self.size):
                    self.cv.wait()
                if self.stop.is_set() or self._active > self.size:
                    self._active -= 1
                    if self.tasks:
                        # Hand the wake-up on so a queued task is not left waiting.
                        self.cv.notify()
                    return
                _, _, fn, args, kwargs = heapq.heappop(self.tasks)
            try:
                fn(*args, **kwargs)
            except Exception:
                with self._metrics_lock:
                    self.metrics["failed"] += 1
            else:
                with self._metrics_lock:
                    self.metrics["done"] += 1

    def shutdown(self):
        """Stop all workers and wait for them; tasks still queued are not run."""
        self.stop.set()
        with self.cv:
            self.cv.notify_all()
            workers = list(self.threads)
        for t in workers:
            t.join()

## Dining Philosophers without Starvation
Use resource hierarchy (ordered forks) to avoid deadlocks and round-robin start delays to reduce starvation.

In [None]:
def dining_philosophers(forks, iterations=1, eat=None):
    """Run the dining-philosophers simulation without deadlock.

    Philosopher ``i`` uses forks ``i`` and ``(i + 1) % n`` and always picks
    up the lower-numbered fork first (resource hierarchy), which rules out
    circular waiting. One thread per philosopher is started and joined
    before the function returns.

    Args:
        forks: Sequence of lock-like context managers, one per philosopher.
        iterations: Number of meals each philosopher takes.
        eat: Optional callable invoked as ``eat(i)`` while philosopher ``i``
            holds its forks.
    """
    n = len(forks)

    def philosopher(i):
        neighbour = (i + 1) % n
        first, second = min(i, neighbour), max(i, neighbour)

        def dine():
            if eat is not None:
                eat(i)

        for _ in range(iterations):
            with forks[first]:
                if first == second:
                    # A lone philosopher has a single fork; taking the same
                    # non-reentrant lock twice would deadlock.
                    dine()
                else:
                    with forks[second]:
                        dine()

    threads = [threading.Thread(target=philosopher, args=(i,)) for i in range(n)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()


## Consistent Hashing with Virtual Nodes
Distributes keys across replicas evenly; add/remove nodes by adjusting ring entries.

In [None]:
import bisect, hashlib
class ConsistentHash:
    """Consistent-hash ring with virtual nodes.

    Each physical node is placed on the ring ``replicas`` times. A key is
    served by the first virtual node at or after the key's hash, wrapping
    around at the end of the ring.
    """

    def __init__(self, replicas=100):
        self.replicas = replicas
        self.ring = []    # sorted hashes of all virtual nodes
        self.nodes = {}   # virtual-node hash -> owning physical node

    def _hash(self, key):
        """Map a string to an integer position using MD5 (not for security)."""
        return int(hashlib.md5(key.encode()).hexdigest(), 16)

    def _vnode_hashes(self, node):
        """Yield the ring positions of every virtual node of ``node``."""
        for i in range(self.replicas):
            yield self._hash(f"{node}:{i}")

    def add_node(self, node):
        """Place ``node`` on the ring; adding a node already present is a no-op."""
        for h in self._vnode_hashes(node):
            if h in self.nodes:
                # Already on the ring; inserting again would leave a duplicate
                # ring entry behind after a later removal.
                continue
            self.nodes[h] = node
            bisect.insort(self.ring, h)

    def remove_node(self, node):
        """Remove every virtual node owned by ``node``; unknown nodes are ignored."""
        for h in self._vnode_hashes(node):
            if h not in self.nodes or self.nodes[h] != node:
                continue
            del self.nodes[h]
            idx = bisect.bisect_left(self.ring, h)
            if idx < len(self.ring) and self.ring[idx] == h:
                del self.ring[idx]

    def get(self, key):
        """Return the node responsible for ``key``, or ``None`` on an empty ring."""
        if not self.ring:
            return None
        h = self._hash(key)
        idx = bisect.bisect(self.ring, h) % len(self.ring)
        return self.nodes[self.ring[idx]]


## Vector Clocks for Conflict Resolution
Compares version maps to classify causality relationships.

In [None]:
def compare_vc(a, b):
    """Classify the causal relationship between two vector clocks.

    Args:
        a: Mapping from node id to counter; missing ids count as 0.
        b: Mapping from node id to counter; missing ids count as 0.

    Returns:
        ``"before"`` if ``a`` happened before ``b``, ``"after"`` if ``a``
        happened after ``b``, ``"concurrent"`` if neither dominates the
        other, and ``"equal"`` if all counters match.
    """
    less = greater = False
    for k in set(a) | set(b):
        av, bv = a.get(k, 0), b.get(k, 0)
        if av < bv:
            less = True
        elif av > bv:
            greater = True
        if less and greater:
            # Each clock is ahead somewhere; no later entry can change that.
            return "concurrent"
    if less:
        return "before"
    if greater:
        return "after"
    return "equal"

## Mini MapReduce for Word Count
Coordinator assigns shards, workers emit local counts, reducer aggregates.

In [None]:
from collections import Counter

def map_reduce_word_count(chunks):
    """Count whitespace-separated words across all text chunks.

    Map phase: each chunk is turned into its own word counter.
    Reduce phase: the per-chunk counters are folded into one total.

    Args:
        chunks: Iterable of strings.

    Returns:
        A ``Counter`` mapping each word to its number of occurrences.
    """
    per_chunk = [Counter(text.split()) for text in chunks]  # map
    totals = Counter()
    for counts in per_chunk:                                 # reduce
        for word, occurrences in counts.items():
            totals[word] += occurrences
    return totals