# Python Data Structures Learning - 10 Code Questions & Solutions

Master Python data structures with practical coding exercises covering lists, tuples, dictionaries, sets, and advanced techniques using collections and heapq modules.

## Section 1: Setup and Imports

Import standard libraries and define a helper function for testing.

In [11]:
from collections import Counter, deque, OrderedDict, defaultdict
from dataclasses import dataclass
import heapq

def assert_test(condition, message="Test failed"):
    """Helper function for quick assertions"""
    assert condition, message
    print(f"✓ {message}")

print("Libraries imported successfully!")

# Common helpers used by multiple questions (linked lists etc.)
class Node:
    def __init__(self, val, nxt=None):
        self.val = val
        self.next = nxt

def list_from_vals(vals):
    head = None
    for v in reversed(vals):
        head = Node(v, head)
    return head

def vals_from_list(head):
    out = []
    while head:
        out.append(head.val)
        head = head.next
    return out

Libraries imported successfully!


## Q1: Remove Duplicates from a List While Preserving Order

**Problem:** Implement a function `unique_preserve_order(lst)` that returns a list with duplicate elements removed while maintaining the original order of first occurrence.

**Hint:** Use a set to track seen items.

**Example:**
```python
unique_preserve_order([1, 2, 2, 3, 1, 4]) → [1, 2, 3, 4]
unique_preserve_order(['a', 'b', 'a', 'c']) → ['a', 'b', 'c']
```

In [3]:
# Q1 Solution: Remove Duplicates While Preserving Order

def unique_preserve_order(lst):
    """Remove duplicates from list while preserving order of first occurrence"""
    seen = set()
    result = []
    for item in lst:
        if item not in seen:
            seen.add(item)
            result.append(item)
    print(f"Input: {lst} -> Output: {result}")
    return result
    

# Test cases
assert_test(
    unique_preserve_order([1, 2, 2, 3, 1, 4]) == [1, 2, 3, 4],
    "Q1: Remove duplicates from [1, 2, 2, 3, 1, 4]"
)

assert_test(
    unique_preserve_order(['a', 'b', 'a', 'c']) == ['a', 'b', 'c'],
    "Q1: Remove duplicates from strings"
)

assert_test(
    unique_preserve_order([]) == [],
    "Q1: Handle empty list"
)

assert_test(
    unique_preserve_order([1, 1, 1]) == [1],
    "Q1: All duplicates"
)

Input: [1, 2, 2, 3, 1, 4] -> Output: [1, 2, 3, 4]
✓ Q1: Remove duplicates from [1, 2, 2, 3, 1, 4]
Input: ['a', 'b', 'a', 'c'] -> Output: ['a', 'b', 'c']
✓ Q1: Remove duplicates from strings
Input: [] -> Output: []
✓ Q1: Handle empty list
Input: [1, 1, 1] -> Output: [1]
✓ Q1: All duplicates
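
A shorter variant is possible when the elements are hashable, relying on the fact that `dict` preserves insertion order in Python 3.7+. This is a minimal sketch; the name `unique_preserve_order_dict` is hypothetical and not part of the original exercise.

```python
def unique_preserve_order_dict(lst):
    """Order-preserving dedup: dict keys are unique and keep insertion order."""
    return list(dict.fromkeys(lst))

print(unique_preserve_order_dict([1, 2, 2, 3, 1, 4]))  # [1, 2, 3, 4]
```

Like the set-based solution, this only works for hashable elements.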


## Q2: Swap Two Variables Using Tuple Unpacking

**Problem:** Implement a function `swap(a, b)` that swaps the values of two variables using tuple unpacking without using a temporary variable.

**Hint:** Tuples allow elegant swapping via unpacking.

**Example:**
```python
swap(5, 10) → (10, 5)
swap('hello', 'world') → ('world', 'hello')
```

In [None]:
# Q2 Solution: Swap Two Variables Using Tuple Unpacking

def swap(a, b):
    """Swap two values using tuple unpacking"""
    return (b, a)

# Test cases
a, b = swap(5, 10)
assert_test(a == 10 and b == 5, "Q2: Swap integers")

s1, s2 = swap('hello', 'world')
assert_test(s1 == 'world' and s2 == 'hello', "Q2: Swap strings")

lst1, lst2 = swap([1, 2], [3, 4])
assert_test(lst1 == [3, 4] and lst2 == [1, 2], "Q2: Swap lists")

x, y = swap(True, False)
assert_test(x is False and y is True, "Q2: Swap booleans")

## Q3: Invert a Dictionary - Map Values to Lists of Keys

**Problem:** Implement a function `invert_dict(d)` that takes a dictionary and returns a new dictionary where each original value maps to a list of keys that had that value. Preserve insertion order of keys for each value.

**Hint:** Use `defaultdict(list)` from the collections module.

**Example:**
```python
invert_dict({'a': 1, 'b': 2, 'c': 1}) → {1: ['a', 'c'], 2: ['b']}
invert_dict({'x': 'color', 'y': 'color'}) → {'color': ['x', 'y']}
```

In [None]:
# Q3 Solution: Invert Dictionary - Map Values to Lists of Keys

def invert_dict(d):
    """Invert a dictionary to map values to lists of keys"""
    result = defaultdict(list)
    for key, value in d.items():
        result[value].append(key)
    return dict(result)

# Test cases
result1 = invert_dict({'a': 1, 'b': 2, 'c': 1})
assert_test(
    result1 == {1: ['a', 'c'], 2: ['b']},
    "Q3: Invert dictionary with duplicate values"
)

result2 = invert_dict({'x': 'color', 'y': 'color', 'z': 'size'})
assert_test(
    result2 == {'color': ['x', 'y'], 'size': ['z']},
    "Q3: Invert dictionary with strings"
)

result3 = invert_dict({})
assert_test(result3 == {}, "Q3: Handle empty dictionary")

result4 = invert_dict({'a': 1, 'b': 1, 'c': 1})
assert_test(
    result4 == {1: ['a', 'b', 'c']},
    "Q3: All keys map to same value"
)

## Q4: Intersection of Multiple Sets

**Problem:** Implement a function `intersection_of_sets(sets)` that returns the intersection of an iterable of sets (elements common to all sets). Handle empty input by returning an empty set.

**Hint:** Use the `intersection()` method and handle the edge case of no sets.

**Example:**
```python
intersection_of_sets([{1, 2, 3}, {2, 3, 4}, {2, 3, 5}]) → {2, 3}
intersection_of_sets([]) → set()
```

In [None]:
# Q4 Solution: Intersection of Multiple Sets

def intersection_of_sets(sets):
    """Return the intersection of an iterable of sets"""
    sets_list = list(sets)
    if not sets_list:
        return set()
    return sets_list[0].intersection(*sets_list[1:])

# Alternative solution using reduce (more elegant)
# def intersection_of_sets(sets):
#     from functools import reduce
#     sets_list = list(sets)
#     if not sets_list:
#         return set()
#     return reduce(lambda a, b: a.intersection(b), sets_list)

# Test cases
result1 = intersection_of_sets([{1, 2, 3}, {2, 3, 4}, {2, 3, 5}])
assert_test(result1 == {2, 3}, "Q4: Intersection of three sets")

result2 = intersection_of_sets([{1, 2, 3}, {1, 2, 3}, {1, 2, 3}])
assert_test(result2 == {1, 2, 3}, "Q4: Identical sets")

result3 = intersection_of_sets([])
assert_test(result3 == set(), "Q4: Empty input")

result4 = intersection_of_sets([{1, 2}, {3, 4}])
assert_test(result4 == set(), "Q4: No common elements")

## Q5: Flatten a Nested List (2 Levels) with List Comprehension

**Problem:** Implement a function `flatten_2levels(nested)` that flattens a list of lists into a single list using a list comprehension.

**Hint:** Use nested loops in list comprehension: `[item for sublist in nested for item in sublist]`

**Example:**
```python
flatten_2levels([[1, 2], [3, 4], [5, 6]]) → [1, 2, 3, 4, 5, 6]
flatten_2levels([['a'], ['b', 'c'], []]) → ['a', 'b', 'c']
```

In [None]:
# Q5 Solution: Flatten a Nested List with List Comprehension

def flatten_2levels(nested):
    """Flatten a 2-level nested list using list comprehension"""
    return [item for sublist in nested for item in sublist]

# Test cases
result1 = flatten_2levels([[1, 2], [3, 4], [5, 6]])
assert_test(
    result1 == [1, 2, 3, 4, 5, 6],
    "Q5: Flatten list of lists with numbers"
)

result2 = flatten_2levels([['a'], ['b', 'c'], []])
assert_test(
    result2 == ['a', 'b', 'c'],
    "Q5: Flatten with empty sublists"
)

result3 = flatten_2levels([])
assert_test(result3 == [], "Q5: Empty nested list")

result4 = flatten_2levels([[], [], [1, 2, 3]])
assert_test(result4 == [1, 2, 3], "Q5: Multiple empty sublists")

## Q6: Count Element Frequencies with collections.Counter

**Problem:** Implement a function `top_n_frequencies(seq, n)` that returns a list of the top-n most common elements with their counts as tuples using `Counter.most_common()`.

**Hint:** Use `Counter(seq).most_common(n)` to get top-n elements.

**Example:**
```python
top_n_frequencies([1, 1, 1, 2, 2, 3], 2) → [(1, 3), (2, 2)]
top_n_frequencies('aabbcc', 2) → [('a', 2), ('b', 2)]
```

In [None]:
# Q6 Solution: Count Element Frequencies with Counter

def top_n_frequencies(seq, n):
    """Return top-n most common elements with their frequencies"""
    return Counter(seq).most_common(n)

# Test cases
result1 = top_n_frequencies([1, 1, 1, 2, 2, 3], 2)
assert_test(
    result1 == [(1, 3), (2, 2)],
    "Q6: Top 2 frequencies from list"
)

result2 = top_n_frequencies('aabbccdddd', 2)
assert_test(
    result2[0] == ('d', 4) and result2[1][0] in ['a', 'b', 'c'],
    "Q6: Top 2 frequencies from string"
)

result3 = top_n_frequencies([1, 2, 3, 4, 5], 3)
assert_test(
    len(result3) == 3,
    "Q6: Top 3 from list with equal frequencies"
)

result4 = top_n_frequencies([1], 5)
assert_test(
    result4 == [(1, 1)],
    "Q6: n larger than unique elements"
)

## Q7: Simple LRU Cache Using collections.OrderedDict

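**Problem:** Implement a simple `LRUCache` class with a fixed capacity and methods `get(key)` and `put(key, value)`. `get` returns -1 for a missing key. When a `put` pushes the cache over capacity, evict the least recently used item. Use `OrderedDict` to track recency: both `get` and `put` move the key to the end, so the least recently used key is always at the front.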

**Hint:** Use `OrderedDict.move_to_end()` to mark recent access and `popitem(last=False)` to remove oldest.

**Example:**
```python
cache = LRUCache(2)
cache.put('a', 1)
cache.put('b', 2)
cache.get('a')  # returns 1, moves 'a' to end
cache.put('c', 3)  # evicts 'b' (oldest)
cache.get('b')  # returns -1 (not found)
```

In [None]:
# Q7 Solution: Simple LRU Cache Using OrderedDict

class LRUCache:
    """LRU Cache implementation using OrderedDict"""
    
    def __init__(self, capacity):
        self.cache = OrderedDict()
        self.capacity = capacity
    
    def get(self, key):
        """Get value and mark as recently used, return -1 if not found"""
        if key not in self.cache:
            return -1
        # Move to end to mark as recently used
        self.cache.move_to_end(key)
        return self.cache[key]
    
    def put(self, key, value):
        """Put key-value pair, evicting oldest if at capacity"""
        if key in self.cache:
            # Update value and move to end
            self.cache[key] = value
            self.cache.move_to_end(key)
        else:
            # Add new item
            self.cache[key] = value
            if len(self.cache) > self.capacity:
                # Remove oldest (first) item
                self.cache.popitem(last=False)

# Test cases
cache = LRUCache(2)
cache.put('a', 1)
cache.put('b', 2)
assert_test(cache.get('a') == 1, "Q7: Get existing key 'a'")

cache.put('c', 3)  # Should evict 'b' (oldest)
assert_test(cache.get('b') == -1, "Q7: 'b' should be evicted")
assert_test(cache.get('a') == 1, "Q7: 'a' should still exist")

cache.put('d', 4)  # Should evict 'c'
assert_test(cache.get('c') == -1, "Q7: 'c' should be evicted")
assert_test(cache.get('d') == 4, "Q7: 'd' should exist")

## Q8: Convert List of Tuples to Dataclass Instances and Filter

**Problem:** Define a dataclass `Item(name: str, price: float, qty: int)`. Implement `tuples_to_items(data)` that converts a list of tuples to Item instances, and `filter_by_price(items, min_price)` that returns items with price >= min_price.

**Hint:** Use `@dataclass` decorator from the dataclasses module.

**Example:**
```python
data = [('apple', 1.5, 10), ('banana', 0.5, 20)]
items = tuples_to_items(data)
filtered = filter_by_price(items, 1.0)  # → [Item('apple', 1.5, 10)]
```

In [None]:
# Q8 Solution: Convert Tuples to Dataclass Instances and Filter

@dataclass
class Item:
    """Represents an item with name, price, and quantity"""
    name: str
    price: float
    qty: int

def tuples_to_items(data):
    """Convert list of tuples to Item dataclass instances"""
    return [Item(name, price, qty) for name, price, qty in data]

def filter_by_price(items, min_price):
    """Filter items by minimum price"""
    return [item for item in items if item.price >= min_price]

# Test cases
data = [('apple', 1.5, 10), ('banana', 0.5, 20), ('orange', 2.0, 5)]
items = tuples_to_items(data)

assert_test(
    len(items) == 3 and items[0].name == 'apple',
    "Q8: Convert tuples to Item instances"
)

filtered = filter_by_price(items, 1.0)
assert_test(
    len(filtered) == 2 and filtered[0].name == 'apple',
    "Q8: Filter items by minimum price"
)

filtered2 = filter_by_price(items, 3.0)
assert_test(
    len(filtered2) == 0,
    "Q8: No items meet minimum price"
)

## Q9: Find k Largest Elements with heapq

**Problem:** Implement a function `k_largest(iterable, k)` that returns a list of the k largest elements using `heapq.nlargest()`.

**Hint:** Use `heapq.nlargest(k, iterable)` to efficiently get the k largest elements.

**Example:**
```python
k_largest([1, 5, 3, 9, 2, 8], 3) → [9, 8, 5]
k_largest([-1, -5, -3, -2], 2) → [-1, -2]
k_largest([1], 5) → [1]
```

In [None]:
# Q9 Solution: Find k Largest Elements with heapq

def k_largest(iterable, k):
    """Return a list of k largest elements"""
    return heapq.nlargest(k, iterable)

# Test cases
result1 = k_largest([1, 5, 3, 9, 2, 8], 3)
assert_test(
    result1 == [9, 8, 5],
    "Q9: Find 3 largest from mixed list"
)

result2 = k_largest([-1, -5, -3, -2], 2)
assert_test(
    result2 == [-1, -2],
    "Q9: Find 2 largest from negative numbers"
)

result3 = k_largest([1], 5)
assert_test(
    result3 == [1],
    "Q9: k larger than list size"
)

result4 = k_largest([5, 5, 5, 1, 1], 2)
assert_test(
    result4 == [5, 5],
    "Q9: Handle duplicate largest values"
)
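
To show the idea behind selecting the k largest elements with a heap, here is a hand-rolled sketch that keeps a min-heap of at most k elements. It is an illustration only, not the implementation of `heapq.nlargest`; the name `k_largest_manual` is hypothetical.

```python
import heapq

def k_largest_manual(iterable, k):
    """Keep a min-heap of the k largest values seen so far."""
    if k <= 0:
        return []
    heap = []
    for x in iterable:
        if len(heap) < k:
            heapq.heappush(heap, x)
        elif x > heap[0]:
            # x beats the smallest of the current top-k, so swap it in
            heapq.heapreplace(heap, x)
    return sorted(heap, reverse=True)

print(k_largest_manual([1, 5, 3, 9, 2, 8], 3))  # [9, 8, 5]
```

The heap never holds more than k items, so this uses O(k) extra space and O(n log k) time.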

## Q10: Sliding-Window Maximum Using collections.deque

**Problem:** Implement a function `sliding_max(nums, k)` that returns a list of the maximum value in each sliding window of size k using a monotonic deque. Maintain indices in deque to track valid window elements.

**Hint:** Use `deque` to store indices in decreasing order of their values. Remove indices outside the window and non-maximal values.

**Example:**
```python
sliding_max([1, 3, 1, 2, 0, 5], 3) → [3, 3, 2, 5]
sliding_max([1], 1) → [1]
sliding_max([1, 2, 3], 5) → []
```

In [None]:
# Q10 Solution: Sliding-Window Maximum Using deque (indices)
def sliding_max(nums, k):
    """Return max of each sliding window of size k using a monotonic deque of indices"""
    if k <= 0 or not nums or k > len(nums):
        return []
    dq = deque()  # stores indices, values decreasing
    res = []
    for i, n in enumerate(nums):
        # remove indices outside the window
        while dq and dq[0] <= i - k:
            dq.popleft()
        # remove smaller values from right
        while dq and nums[dq[-1]] < n:
            dq.pop()
        dq.append(i)
        if i >= k - 1:
            res.append(nums[dq[0]])
    return res

# Tests
assert_test(sliding_max([1, 3, 1, 2, 0, 5], 3) == [3, 3, 2, 5], "Q10: Example 1")
assert_test(sliding_max([1], 1) == [1], "Q10: Single element")
assert_test(sliding_max([1, 2, 3], 5) == [], "Q10: k > len(nums) returns []")
assert_test(sliding_max([], 3) == [], "Q10: empty list")
assert_test(sliding_max([9, 8, 7, 6], 2) == [9, 8, 7], "Q10: strictly decreasing")
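
A brute-force version is handy as a reference when testing the deque solution. The sketch below recomputes the maximum of every window in O(n·k) time; the name `sliding_max_bruteforce` is hypothetical.

```python
def sliding_max_bruteforce(nums, k):
    """Reference implementation: take max() over each window of size k."""
    if k <= 0 or k > len(nums):
        return []
    return [max(nums[i:i + k]) for i in range(len(nums) - k + 1)]

print(sliding_max_bruteforce([1, 3, 1, 2, 0, 5], 3))  # [3, 3, 2, 5]
```

Comparing both functions on random inputs is a quick way to catch off-by-one errors in the window bounds.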

## Additional Problems (Q11–Q30): Data Structures with Solutions

Below are 20 additional practical data-structure problems with concise implementations and tests. Each problem has a short description and a working Python solution using the helpers defined earlier.


### Q11: Reverse a Singly Linked List (Iterative)

Problem: Reverse a singly linked list in-place and return new head.


In [None]:
# Q11 Solution
# Node, list_from_vals, and vals_from_list are defined in the Section 1 setup cell.

def reverse_list(head):
    """Reverse the list in place by re-pointing each node's next; return the new head."""
    prev = None
    cur = head
    while cur:
        nxt = cur.next   # remember the rest of the list
        cur.next = prev  # point the current node backwards
        prev = cur
        cur = nxt
    return prev

h = list_from_vals([1, 2, 3, 4])
rh = reverse_list(h)
assert_test(vals_from_list(rh) == [4, 3, 2, 1], "Q11: Reverse linked list")

### Q12: Stack with O(1) min() Operation

Problem: Implement a stack supporting push, pop, top, and min in constant time.


In [7]:
# Q12 Solution
class MinStack:
    def __init__(self):
        self.stack = []  # pairs (val, current_min)
    def push(self, x):
        curmin = x if not self.stack else min(x, self.stack[-1][1])
        self.stack.append((x, curmin))
    def pop(self):
        return self.stack.pop()[0] if self.stack else None
    def top(self):
        return self.stack[-1][0] if self.stack else None
    def get_min(self):
        return self.stack[-1][1] if self.stack else None

s = MinStack()
s.push(3); s.push(5); s.push(2); s.push(2)
assert_test(s.get_min() == 2, "Q12: min after pushes")
s.pop(); s.pop()
assert_test(s.get_min() == 3, "Q12: min after pops")

✓ Q12: min after pushes
✓ Q12: min after pops


### Q13: Implement Queue using Two Stacks

Problem: Implement enqueue and dequeue using two stacks (amortized O(1)).


In [None]:
# Q13 Solution
class MyQueue:
    def __init__(self):
        self.s1 = []
        self.s2 = []
    def push(self, x):
        self.s1.append(x)
    def pop(self):
        if not self.s2:
            while self.s1:
                self.s2.append(self.s1.pop())
        return self.s2.pop() if self.s2 else None
    def peek(self):
        v = self.pop()
        if v is not None:
            self.s2.append(v)
        return v
    def empty(self):
        return not (self.s1 or self.s2)

q = MyQueue()
q.push(1); q.push(2)
assert_test(q.pop() == 1, "Q13: dequeue returns 1")
assert_test(not q.empty(), "Q13: not empty after one pop")
assert_test(q.pop() == 2, "Q13: dequeue returns 2")
assert_test(q.empty(), "Q13: empty after all pops")

### Q14: Evaluate Reverse Polish Notation (RPN)

Problem: Evaluate arithmetic expression in RPN using a stack.


In [None]:
# Q14 Solution
import operator
ops = {'+': operator.add, '-': operator.sub, '*': operator.mul, '/': lambda a,b: int(operator.truediv(a,b))}
def eval_rpn(tokens):
    st = []
    for t in tokens:
        if t in ops:
            b = st.pop(); a = st.pop(); st.append(ops[t](a,b))
        else:
            st.append(int(t))
    return st[-1]

assert_test(eval_rpn(['2', '1', '+', '3', '*']) == 9, "Q14: (2+1)*3")
assert_test(eval_rpn(['4', '13', '5', '/', '+']) == 6, "Q14: 4 + 13/5")

### Q15: Balanced Parentheses

Problem: Check if a string of brackets is balanced using a stack.


In [None]:
# Q15 Solution
pairs = {')':'(', ']':'[', '}':'{'}
def is_balanced(s):
    st = []
    for ch in s:
        if ch in '([{': st.append(ch)
        elif ch in pairs:
            if not st or st.pop() != pairs[ch]: return False
    return not st

assert_test(is_balanced('()[]{}'), "Q15: mixed balanced")
assert_test(not is_balanced('(]'), "Q15: not balanced")

### Q16: Binary Search (Iterative)

Problem: Implement classic binary search on sorted list, returning index or -1.


In [None]:
# Q16 Solution
def binary_search(a, target):
    lo, hi = 0, len(a)-1
    while lo <= hi:
        mid = (lo+hi)//2
        if a[mid] == target: return mid
        if a[mid] < target: lo = mid+1
        else: hi = mid-1
    return -1

assert_test(binary_search([1,2,3,4,5],3) == 2, "Q16: found 3 at index 2")
assert_test(binary_search([], 1) == -1, "Q16: empty list")
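
The standard library's `bisect` module offers the same lookup without a hand-written loop. A minimal sketch, with the hypothetical name `binary_search_bisect`:

```python
from bisect import bisect_left

def binary_search_bisect(a, target):
    """Return the index of target in sorted list a, or -1 if it is absent."""
    i = bisect_left(a, target)  # leftmost position where target could be inserted
    return i if i < len(a) and a[i] == target else -1

print(binary_search_bisect([1, 2, 3, 4, 5], 3))  # 2
```

When the list contains duplicates, `bisect_left` returns the leftmost match, whereas the hand-written loop may return any matching index.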

### Q17: Merge Two Sorted Linked Lists

Problem: Merge two sorted singly linked lists and return head of merged list.


In [None]:
# Q17 Solution
def merge_two_lists(l1, l2):
    dummy = Node(0); tail = dummy
    while l1 and l2:
        if l1.val < l2.val:
            tail.next = l1; l1 = l1.next
        else:
            tail.next = l2; l2 = l2.next
        tail = tail.next
    tail.next = l1 or l2
    return dummy.next

a = list_from_vals([1,3,5])
b = list_from_vals([2,4])
m = merge_two_lists(a,b)
assert_test(vals_from_list(m) == [1,2,3,4,5], "Q17: merge sorted lists")

### Q18: Detect Cycle in Linked List (Floyd's Tortoise and Hare)

Problem: Return True if a singly linked list has a cycle.


In [12]:
# Q18 Solution
def has_cycle(head):
    slow = fast = head
    while fast and fast.next:
        slow = slow.next
        fast = fast.next.next
        if slow is fast: return True
    return False

# build a cyclic list for test
c = list_from_vals([1,2,3])
tail = c; 
while tail.next: tail = tail.next
tail.next = c.next  # create cycle
assert_test(has_cycle(c) is True, "Q18: detect cycle")
# acyclic list
d = list_from_vals([1,2,3,4])
assert_test(has_cycle(d) is False, "Q18: no cycle")

✓ Q18: detect cycle
✓ Q18: no cycle


### Q19: Iterative Inorder Traversal of Binary Tree

Problem: Return inorder traversal using an explicit stack (no recursion).


In [None]:
# Q19 Solution
class BTNode:
    def __init__(self, v, l=None, r=None):
        self.val = v; self.left = l; self.right = r

def inorder_iter(root):
    res = []; stack = []; cur = root
    while cur or stack:
        while cur:
            stack.append(cur); cur = cur.left
        cur = stack.pop(); res.append(cur.val); cur = cur.right
    return res

root = BTNode(1, BTNode(2), BTNode(3))
assert_test(inorder_iter(root) == [2,1,3], "Q19: inorder traversal")

### Q20: Topological Sort (Kahn's Algorithm)

Problem: Given directed acyclic graph adjacency list, return one topological ordering.


In [None]:
# Q20 Solution
from collections import deque
def topo_sort(adj):
    indeg = {u:0 for u in adj}
    for u in adj:
        for v in adj[u]: indeg[v] = indeg.get(v,0)+1
    q = deque([u for u in indeg if indeg[u]==0])
    res = []
    while q:
        u = q.popleft(); res.append(u)
        for v in adj.get(u,[]):
            indeg[v] -= 1
            if indeg[v]==0: q.append(v)
    return res if len(res)==len(indeg) else []

adj = {'a':['b','c'],'b':['d'],'c':['d'],'d':[]}
order = topo_sort(adj)
assert_test(set(order) == set(['a','b','c','d']), "Q20: topo sort covers nodes")
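
The test above only checks that every node appears in the result, not that the order respects the edges. A stricter check could look like the sketch below; `is_valid_topo_order` is a hypothetical helper, not part of the original solution.

```python
def is_valid_topo_order(adj, order):
    """True if order lists every node exactly once and each edge u -> v has u before v."""
    nodes = set(adj) | {v for targets in adj.values() for v in targets}
    pos = {u: i for i, u in enumerate(order)}
    if len(order) != len(nodes) or set(pos) != nodes:
        return False
    return all(pos[u] < pos[v] for u in adj for v in adj[u])

adj = {'a': ['b', 'c'], 'b': ['d'], 'c': ['d'], 'd': []}
print(is_valid_topo_order(adj, ['a', 'c', 'b', 'd']))  # True
print(is_valid_topo_order(adj, ['d', 'a', 'b', 'c']))  # False
```

Because a DAG can have several valid orderings, checking the property is more robust than comparing against one expected list.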

### Q21: Trie (Prefix Tree) - insert and search

Problem: Implement a basic Trie supporting insert and search (full word).


In [None]:
# Q21 Solution
class TrieNode:
    def __init__(self):
        self.children = {}
        self.is_word = False

class Trie:
    def __init__(self): self.root = TrieNode()
    def insert(self, word):
        cur = self.root
        for ch in word: cur = cur.children.setdefault(ch, TrieNode())
        cur.is_word = True
    def search(self, word):
        cur = self.root
        for ch in word:
            if ch not in cur.children: return False
            cur = cur.children[ch]
        return cur.is_word

t = Trie(); t.insert('cat'); t.insert('car')
assert_test(t.search('cat') and not t.search('ca') and t.search('car'), "Q21: trie insert/search")

### Q22: Longest Consecutive Sequence in Array

Problem: Find length of longest consecutive elements sequence using a set (O(n)).


In [None]:
# Q22 Solution
def longest_consecutive(nums):
    s = set(nums); best = 0
    for x in s:
        if x-1 not in s:
            cur = x; cnt = 1
            while cur+1 in s: cur += 1; cnt += 1
            best = max(best, cnt)
    return best

assert_test(longest_consecutive([100,4,200,1,3,2]) == 4, "Q22: 1-4 sequence")

### Q23: Disjoint Set (Union-Find) with Path Compression

Problem: Implement union and find with path compression and union by rank.


In [None]:
# Q23 Solution
class DSU:
    def __init__(self, n):
        self.parent = list(range(n)); self.rank = [0]*n
    def find(self, x):
        if self.parent[x]!=x: self.parent[x] = self.find(self.parent[x])
        return self.parent[x]
    def union(self, a,b):
        ra, rb = self.find(a), self.find(b)
        if ra==rb: return False
        if self.rank[ra] < self.rank[rb]: self.parent[ra]=rb
        elif self.rank[rb] < self.rank[ra]: self.parent[rb]=ra
        else: self.parent[rb]=ra; self.rank[ra]+=1
        return True

d = DSU(5)
d.union(0,1); d.union(1,2)
assert_test(d.find(2) == d.find(0), "Q23: union/find")

### Q24: Median from Data Stream (Two Heaps)

Problem: Maintain median while streaming numbers using two heaps.


In [None]:
# Q24 Solution
import heapq
class MedianFinder:
    def __init__(self):
        self.lo = []  # maxheap via negatives
        self.hi = []  # minheap
    def add(self, x):
        if not self.lo or x <= -self.lo[0]: heapq.heappush(self.lo, -x)
        else: heapq.heappush(self.hi, x)
        if len(self.lo) > len(self.hi)+1: heapq.heappush(self.hi, -heapq.heappop(self.lo))
        if len(self.hi) > len(self.lo): heapq.heappush(self.lo, -heapq.heappop(self.hi))
    def median(self):
        if not self.lo: return None
        if len(self.lo) > len(self.hi): return -self.lo[0]
        return (-self.lo[0] + self.hi[0]) / 2

m = MedianFinder()
for x in [1, 2, 3]:
    m.add(x)
assert_test(m.median() == 2, "Q24: median of 1,2,3")

### Q25: Circular Buffer (Fixed Capacity)

Problem: Implement a simple ring buffer supporting append and get all elements in order.


In [None]:
# Q25 Solution
class CircularBuffer:
    def __init__(self, k):
        self.buf = [None]*k; self.k=k; self.start=0; self.size=0
    def append(self, x):
        if self.size < self.k:
            self.buf[(self.start + self.size) % self.k] = x; self.size += 1
        else:
            self.buf[self.start] = x; self.start = (self.start+1)%self.k
    def to_list(self):
        return [self.buf[(self.start+i)%self.k] for i in range(self.size)]

cb = CircularBuffer(3)
cb.append(1); cb.append(2); cb.append(3)
assert_test(cb.to_list() == [1,2,3], "Q25: buffer full")
cb.append(4)
assert_test(cb.to_list() == [2,3,4], "Q25: overwritten oldest")

### Q26: Isomorphic Strings

Problem: Determine if two strings are isomorphic (character mapping is one-to-one).


In [None]:
# Q26 Solution
def is_isomorphic(s,t):
    if len(s)!=len(t): return False
    m1 = {}; m2 = {}
    for a,b in zip(s,t):
        if m1.get(a, b) != b or m2.get(b, a) != a: return False
        m1[a]=b; m2[b]=a
    return True

assert_test(is_isomorphic('egg','add'), "Q26: egg/add")
assert_test(not is_isomorphic('foo','bar'), "Q26: foo/bar not iso")

### Q27: Group Anagrams

Problem: Group list of strings into anagram groups using sorted key or counts.


In [None]:
# Q27 Solution
def group_anagrams(strs):
    d = defaultdict(list)
    for s in strs: d[''.join(sorted(s))].append(s)
    return list(d.values())

res = group_anagrams(['eat','tea','tan','ate','nat','bat'])
assert_test(any(sorted(group)==sorted(['eat','tea','ate']) for group in res), "Q27: anagram groups contain eat/tea/ate")
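
The problem statement also mentions grouping by character counts. A sketch of that variant follows, assuming every string contains only lowercase ASCII letters `a` to `z`; the name `group_anagrams_by_counts` is hypothetical.

```python
from collections import defaultdict

def group_anagrams_by_counts(strs):
    """Group anagrams using a 26-slot letter-count tuple as the dictionary key."""
    groups = defaultdict(list)
    for s in strs:
        counts = [0] * 26
        for ch in s:
            counts[ord(ch) - ord('a')] += 1  # assumes 'a' <= ch <= 'z'
        groups[tuple(counts)].append(s)
    return list(groups.values())

print(group_anagrams_by_counts(['eat', 'tea', 'tan', 'ate', 'nat', 'bat']))
# [['eat', 'tea', 'ate'], ['tan', 'nat'], ['bat']]
```

Counting takes O(L) per string of length L, compared with O(L log L) for the sorted key.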

### Q28: Rotate Matrix 90 Degrees In-Place

Problem: Rotate n x n matrix clockwise by 90 degrees in-place.


In [None]:
# Q28 Solution
def rotate_matrix(m):
    n = len(m)
    for i in range(n):
        for j in range(i,n):
            m[i][j], m[j][i] = m[j][i], m[i][j]
    for i in range(n):
        m[i].reverse()

mat = [[1,2,3],[4,5,6],[7,8,9]]
rotate_matrix(mat)
assert_test(mat == [[7,4,1],[8,5,2],[9,6,3]], "Q28: rotate 3x3")

### Q29: Number of Islands (DFS)

Problem: Count islands of '1's in a grid using DFS/BFS.


In [None]:
# Q29 Solution
def num_islands(grid):
    if not grid: return 0
    R, C = len(grid), len(grid[0])
    def dfs(i,j):
        if i<0 or j<0 or i>=R or j>=C or grid[i][j] != '1': return
        grid[i][j] = '0'
        for di,dj in [(1,0),(-1,0),(0,1),(0,-1)]: dfs(i+di,j+dj)
    cnt = 0
    for i in range(R):
        for j in range(C):
            if grid[i][j]=='1': cnt += 1; dfs(i,j)
    return cnt

g = [
    ['1', '1', '0', '0', '0'],
    ['1', '1', '0', '0', '0'],
    ['0', '0', '1', '0', '0'],
    ['0', '0', '0', '1', '1'],
]
assert_test(num_islands([row[:] for row in g]) == 3, "Q29: count islands")
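
The problem statement allows BFS as well. The recursive DFS can exceed Python's default recursion limit on large grids, so an iterative BFS is a common alternative. The sketch below uses the hypothetical name `num_islands_bfs`, assumes cells are the strings `'1'` and `'0'`, and tracks visited cells in a set instead of mutating the grid.

```python
from collections import deque

def num_islands_bfs(grid):
    """Count 4-connected groups of '1' cells with an iterative BFS."""
    if not grid or not grid[0]:
        return 0
    rows, cols = len(grid), len(grid[0])
    seen = set()
    count = 0
    for r in range(rows):
        for c in range(cols):
            if grid[r][c] != '1' or (r, c) in seen:
                continue
            count += 1  # found an unvisited land cell: a new island
            seen.add((r, c))
            queue = deque([(r, c)])
            while queue:
                i, j = queue.popleft()
                for di, dj in ((1, 0), (-1, 0), (0, 1), (0, -1)):
                    ni, nj = i + di, j + dj
                    in_bounds = 0 <= ni < rows and 0 <= nj < cols
                    if in_bounds and grid[ni][nj] == '1' and (ni, nj) not in seen:
                        seen.add((ni, nj))
                        queue.append((ni, nj))
    return count

sample = [
    ['1', '1', '0'],
    ['0', '0', '0'],
    ['0', '1', '1'],
]
print(num_islands_bfs(sample))  # 2
```

Since the input is left unchanged, the caller does not need to pass a copy of the grid.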

### Q30: Sparse Matrix Transpose (Dictionary of Keys)

Problem: Represent sparse matrix as dict {(i,j):val} and compute transpose efficiently.


In [None]:
# Q30 Solution
def sparse_transpose(d):
    return {(j,i):v for (i,j),v in d.items()}

s = {(0,1):5, (2,0):3}
t = sparse_transpose(s)
assert_test(t == {(1,0):5, (0,2):3}, "Q30: sparse transpose")

# Snowflake — 15 Medium Programming Questions & Detailed Answers

This document contains 15 medium-difficulty Snowflake programming questions with clear, practical solutions including SQL examples, Snowflake-specific features, and short explanations.

---

Q1 — Load semi-structured JSON from stage and extract nested fields

Problem
- You have newline-delimited JSON files in an internal stage. Each record has nested objects and arrays. Load into a table with extracted fields and preserve the variant.

Solution
- Create table and file format, then COPY INTO using the VARIANT column and SELECT to extract nested fields:

```sql
CREATE OR REPLACE FILE FORMAT my_json_fmt TYPE = 'JSON' STRIP_OUTER_ARRAY = FALSE;

CREATE OR REPLACE TABLE orders_raw (raw VARIANT);

COPY INTO orders_raw
  FROM @my_stage/path/
  FILE_FORMAT = (FORMAT_NAME = 'my_json_fmt')
  ON_ERROR = 'CONTINUE';

-- Extract nested fields
CREATE OR REPLACE TABLE orders AS
SELECT
  raw:id::STRING AS order_id,
  raw:customer.id::STRING AS customer_id,
  raw:customer.name::STRING AS customer_name,
  raw:items AS items_variant,
  raw:total::NUMBER AS total
FROM orders_raw;

-- Flatten items array
SELECT
  o.order_id,
  f.value:sku::STRING AS sku,
  f.value:qty::NUMBER AS qty
FROM orders o,
LATERAL FLATTEN(input => o.items_variant) f;
```

Explanation
- Use a VARIANT column to store raw JSON, then type-cast paths using the colon syntax. `LATERAL FLATTEN` expands arrays. Keep raw variant for auditing.
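
To make the row expansion concrete, here is a rough plain-Python equivalent of the `LATERAL FLATTEN` query for a single record. It only illustrates the output shape, not how Snowflake executes the query. The sample record and the name `flatten_items` are made up; the field names `id`, `items`, `sku`, and `qty` come from the SQL above.

```python
import json

def flatten_items(record_json):
    """Emit one output row per element of the record's items array."""
    record = json.loads(record_json)
    return [
        {"order_id": str(record["id"]), "sku": str(item["sku"]), "qty": item["qty"]}
        for item in record.get("items") or []
    ]

# Hypothetical sample record, shaped like the JSON the query expects
sample = '{"id": "o-1", "items": [{"sku": "A", "qty": 2}, {"sku": "B", "qty": 1}]}'
for row in flatten_items(sample):
    print(row)
```

A record with a missing or empty `items` array yields no rows, which mirrors the default (non-outer) behavior of `FLATTEN`.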

---

Q2 — Implement slowly changing dimension (SCD Type 2) using MERGE

Problem
- Maintain `dim_customer` as SCD Type 2 with effective start/end timestamps when loading a changed records file.

Solution

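```sql
-- staging_customer has incoming snapshot with natural_key and attributes

-- Step 1: close current rows whose attributes changed; insert brand-new keys
MERGE INTO dim_customer T
USING (
  SELECT *, CURRENT_TIMESTAMP() AS load_ts FROM staging_customer
) S
ON T.natural_key = S.natural_key AND T.end_ts IS NULL
WHEN MATCHED AND (
     (T.attr1 IS DISTINCT FROM S.attr1) OR
     (T.attr2 IS DISTINCT FROM S.attr2)
  ) THEN
  UPDATE SET end_ts = S.load_ts
WHEN NOT MATCHED THEN
  INSERT (natural_key, attr1, attr2, start_ts, end_ts)
  VALUES (S.natural_key, S.attr1, S.attr2, S.load_ts, NULL);

-- Step 2: insert the new current version for keys closed in step 1
-- (after step 1 they no longer have a row with end_ts IS NULL)
INSERT INTO dim_customer (natural_key, attr1, attr2, start_ts, end_ts)
SELECT S.natural_key, S.attr1, S.attr2, CURRENT_TIMESTAMP(), NULL
FROM staging_customer S
WHERE NOT EXISTS (
  SELECT 1 FROM dim_customer T
  WHERE T.natural_key = S.natural_key AND T.end_ts IS NULL
);
```

Explanation
- The `MERGE` closes current rows whose attributes changed by setting `end_ts`, and inserts rows for keys not seen before. A changed key still matches its current row, so `WHEN NOT MATCHED` never fires for it; the follow-up `INSERT` adds its new current row. Run both statements in one transaction so readers never see a key without a current row. `IS DISTINCT FROM` handles NULL-aware comparisons.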
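
The close-then-insert logic of SCD Type 2 can be sketched in plain Python on a list of dicts. This is an illustration of the intended end state only, not Snowflake code; `apply_scd2` and the row layout are hypothetical, with column names taken from the SQL above.

```python
def apply_scd2(dim_rows, staging_rows, load_ts):
    """Apply one SCD Type 2 load to dim_rows (a list of dicts), in place."""
    # Index the current (open) row of every key
    current = {r["natural_key"]: r for r in dim_rows if r["end_ts"] is None}
    for s in staging_rows:
        cur = current.get(s["natural_key"])
        if cur is not None:
            if (cur["attr1"], cur["attr2"]) == (s["attr1"], s["attr2"]):
                continue  # unchanged: keep the current row as is
            cur["end_ts"] = load_ts  # changed: close the current row
        # New key or changed key: open a new current row
        new_row = {
            "natural_key": s["natural_key"],
            "attr1": s["attr1"],
            "attr2": s["attr2"],
            "start_ts": load_ts,
            "end_ts": None,
        }
        dim_rows.append(new_row)
        current[s["natural_key"]] = new_row
    return dim_rows

dim = [{"natural_key": 1, "attr1": "a", "attr2": "x", "start_ts": 0, "end_ts": None}]
staging = [
    {"natural_key": 1, "attr1": "b", "attr2": "x"},   # changed attributes
    {"natural_key": 2, "attr1": "c", "attr2": None},  # brand-new key
]
for row in apply_scd2(dim, staging, load_ts=10):
    print(row)
```

Python's `None == None` is true, so the tuple comparison treats two missing attributes as equal, which matches the NULL-aware behavior of `IS DISTINCT FROM`.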

---

Q3 — Stream + Task pattern: incremental ingest from staged files

Problem
- Use Snowflake Streams and Tasks to apply new staged CSV files (COPY INTO staging table) into a target table incrementally.

Solution

- Create a staging table, load raw files into it with `COPY INTO`, and create a stream on the staging table. One task loads the files and a dependent task consumes the stream:

```sql
CREATE OR REPLACE TABLE raw_events (payload VARIANT, src_file STRING, load_ts TIMESTAMP);
CREATE OR REPLACE STREAM raw_events_stream ON TABLE raw_events;

-- task that runs COPY into raw_events periodically (or triggered externally)
CREATE OR REPLACE TASK task_load_raw
  WAREHOUSE = my_wh
  SCHEDULE = 'USING CRON 0 * * * * UTC'
AS
  COPY INTO raw_events FROM @my_stage/ FILE_FORMAT = (TYPE = 'CSV');

-- Task that processes the stream into curated table
CREATE OR REPLACE TASK task_process_stream
  WAREHOUSE = my_wh
  AFTER task_load_raw
AS
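  INSERT INTO events SELECT payload:col1::STRING, payload:col2::NUMBER FROM raw_events_stream WHERE METADATA$ACTION = 'INSERT';

-- enable tasks: resume the child before the root task
ALTER TASK task_process_stream RESUME;
ALTER TASK task_load_raw RESUME;
```

Explanation
- Streams capture DML changes on the staging table; Tasks run scheduled pipelines. Each stream row carries a `METADATA$ACTION` column (`INSERT` or `DELETE`), which lets the task process only newly inserted rows. Resume child tasks before the root task so the whole graph is active when the root's schedule first fires.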

---

Q4 — Efficiently retrieve top-N per group using window functions

Problem
- Return top 3 orders by amount per customer from `orders` table.

Solution

```sql
SELECT order_id, customer_id, total
FROM (
  SELECT order_id, customer_id, total,
         ROW_NUMBER() OVER (PARTITION BY customer_id ORDER BY total DESC) AS rn
  FROM orders
)
WHERE rn <= 3;
```

Explanation
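- Use `ROW_NUMBER()` partitioned by customer and ordered by `total` descending, then keep rows with `rn <= 3`. `ROW_NUMBER()` breaks ties arbitrarily, so add a tiebreaker such as `order_id` to the `ORDER BY` if results must be deterministic. Snowflake's `QUALIFY` clause can apply the same filter without the subquery.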
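
For readers more familiar with Python, the same top-N-per-group semantics can be sketched with `defaultdict` and `heapq.nlargest`. This is only an analogy for the window-function pattern; `top_n_per_group` and the sample rows are hypothetical.

```python
from collections import defaultdict
import heapq

def top_n_per_group(rows, n=3):
    """rows are (order_id, customer_id, total) tuples; keep the n largest totals per customer."""
    by_customer = defaultdict(list)
    for row in rows:
        by_customer[row[1]].append(row)  # PARTITION BY customer_id
    result = []
    for orders in by_customer.values():
        # ORDER BY total DESC, then keep the first n rows
        result.extend(heapq.nlargest(n, orders, key=lambda row: row[2]))
    return result

rows = [
    ("o1", "c1", 10), ("o2", "c1", 30), ("o3", "c1", 20),
    ("o4", "c1", 5), ("o5", "c2", 7),
]
for row in top_n_per_group(rows):
    print(row)
```

The grouping step corresponds to `PARTITION BY`, and `nlargest` plays the role of ordering plus the `rn <= 3` filter.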

---

Q5 — Querying semi-structured data with conditional logic

Problem
- A VARIANT column `e` in `raw_events` holds JSON events with variable fields. Compute `event_type` with fallback logic and extract a nested value when present.

Solution

```sql
SELECT
  e:id::STRING AS id,
  COALESCE(e:type::STRING, e:meta.type::STRING, 'unknown') AS event_type,
  -- A missing path yields NULL rather than an error, so no CASE guard is needed
  e:event.payload.data::STRING AS payload_data
FROM raw_events;
```

Explanation
- Use `COALESCE` over variant path extractions to express the fallback order. Accessing an absent path returns NULL instead of raising an error, so nested values can be extracted directly without a `CASE` guard.
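
The fallback logic can be mirrored in Python on plain dictionaries. This is a minimal sketch: `get_path` and `coalesce` are hypothetical helpers that imitate variant path access and SQL `COALESCE`, and the sample events are invented.

```python
def get_path(doc, path):
    """Walk a dotted path through nested dicts; return None if any step is missing."""
    current = doc
    for key in path.split("."):
        if not isinstance(current, dict) or key not in current:
            return None
        current = current[key]
    return current


def coalesce(*values):
    """Return the first value that is not None, like SQL COALESCE."""
    return next((v for v in values if v is not None), None)


events = [
    {"id": "1", "type": "click"},
    {"id": "2", "meta": {"type": "view"}, "event": {"payload": {"data": "x"}}},
    {"id": "3"},
]
rows = [
    (
        e["id"],
        coalesce(get_path(e, "type"), get_path(e, "meta.type"), "unknown"),
        get_path(e, "event.payload.data"),
    )
    for e in events
]
```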

---

Q6 — Time travel and cloning for safe data repair

Problem
- Accidentally deleted rows; restore state from time-travel to a new table for comparison and recovery.

Solution

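```sql
-- Zero-copy clone of the table as it was 1 hour ago (must be within the retention period)
CREATE TABLE orders_restore CLONE orders AT (OFFSET => -3600);

-- Re-insert rows that exist in the clone but are missing from the live table
MERGE INTO orders T
USING orders_restore S
ON T.order_id = S.order_id
WHEN NOT MATCHED THEN
  INSERT (order_id, customer_id, total)   -- list every column of orders here
  VALUES (S.order_id, S.customer_id, S.total);
```

Explanation
- Time Travel lets you query or clone a table as of a past point within its retention period. `CREATE TABLE ... CLONE ... AT` is zero-copy and fast, whereas `CREATE TABLE ... AS SELECT ... AT` physically copies the rows. Use `MERGE` with only a `WHEN NOT MATCHED` branch to re-insert the missing rows without touching rows that still exist.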
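
The recovery step is essentially an anti-join on the key: take snapshot rows whose key is absent from the live table. A small Python sketch of that logic, with a hypothetical helper name and made-up rows:

```python
def recover_missing(current_rows, snapshot_rows, key="order_id"):
    """Return (restored table, re-inserted rows); existing rows are left untouched."""
    present = {row[key] for row in current_rows}
    missing = [row for row in snapshot_rows if row[key] not in present]
    return current_rows + missing, missing


snapshot = [
    {"order_id": 1, "total": 50},
    {"order_id": 2, "total": 90},
    {"order_id": 3, "total": 20},
]
# Order 2 was deleted by accident; order 1 was legitimately updated afterwards.
current = [
    {"order_id": 1, "total": 55},
    {"order_id": 3, "total": 20},
]
restored, reinserted = recover_missing(current, snapshot)
```

Note that order 1 keeps its newer value: only keys missing from the live data are brought back.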

---

Q7 — Implement a stored procedure in Snowflake using Snowpark Python to deduplicate and write results

Problem
- Use Snowpark Python procedure to deduplicate a table by natural key and write the deduped result into a target table.

Solution

```sql
CREATE OR REPLACE PROCEDURE dedupe_table(src_table STRING, dst_table STRING)
  RETURNS STRING
  LANGUAGE PYTHON
  RUNTIME_VERSION = '3.11'
  PACKAGES = ('snowflake-snowpark-python')
  HANDLER = 'run'
AS
$$
from snowflake.snowpark import Window
from snowflake.snowpark import functions as F

def run(session, src_table, dst_table):
    df = session.table(src_table)
    # Number rows per natural_key, newest event_ts first
    w = Window.partition_by('natural_key').order_by(F.col('event_ts').desc())
    # Keep only the latest row per key, then drop the helper column
    deduped = df.with_column('rn', F.row_number().over(w)).filter(F.col('rn') == 1).drop('rn')
    deduped.write.save_as_table(dst_table, mode='overwrite')
    return 'OK'
$$;

CALL dedupe_table('raw_src', 'curated_dst');
```

Explanation
- Snowpark Python lets you express complex logic with DataFrame APIs server-side. Use window functions and save result to a table.
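
To check the intended result without a Snowflake session, the same "keep the latest row per key" rule can be sketched in plain Python. The function name and sample rows are hypothetical; on equal timestamps this sketch keeps the first row seen, whereas `ROW_NUMBER()` picks one arbitrarily.

```python
def dedupe_latest(rows, key="natural_key", ts="event_ts"):
    """Keep one row per key: the one with the greatest timestamp."""
    latest = {}
    for row in rows:
        k = row[key]
        if k not in latest or row[ts] > latest[k][ts]:
            latest[k] = row
    return list(latest.values())


rows = [
    {"natural_key": "a", "event_ts": "2024-01-01T10:00:00", "attr": "old"},
    {"natural_key": "b", "event_ts": "2024-01-01T11:00:00", "attr": "only"},
    {"natural_key": "a", "event_ts": "2024-01-02T09:00:00", "attr": "new"},
]
deduped = dedupe_latest(rows)
print([(r["natural_key"], r["attr"]) for r in deduped])  # [('a', 'new'), ('b', 'only')]
```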

---

Q8 — Optimize large table joins: clustering keys and pruning

Problem
- Joining a very large fact table to a dimension table, filtered by a date range, is slow. Suggest improvements.

Solution

Guidance:
- Define a clustering key on the fact table (e.g., `CLUSTER BY (event_date)`) so rows with nearby dates land in the same micro-partitions.
- Filter on the clustering column, and verify pruning by comparing partitions scanned with partitions total in the Query Profile, or inspect clustering quality with `SYSTEM$CLUSTERING_INFORMATION`.
- Consider a materialized view for a frequently run filtered aggregation.

Example:

```sql
ALTER TABLE fact_events CLUSTER BY (event_date);

CREATE MATERIALIZED VIEW mv_daily AS
SELECT event_date, COUNT(*) AS cnt FROM fact_events GROUP BY event_date;
```

Explanation
- Clustering improves physical pruning; materialized views precompute aggregates.
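
Pruning can be pictured as a check of each partition's min/max metadata against the filter range. The sketch below is a toy model with invented partition ranges (days as integers), not a description of Snowflake internals.

```python
def partitions_scanned(partitions, lo, hi):
    """Count partitions whose [min, max] range overlaps the filter range [lo, hi]."""
    return sum(1 for p_min, p_max in partitions if p_max >= lo and p_min <= hi)


# Four partitions covering days 1..40.
# Clustered: each partition holds a contiguous slice of dates.
clustered = [(1, 10), (11, 20), (21, 30), (31, 40)]
# Unclustered: dates are scattered, so every partition spans almost the whole range.
unclustered = [(1, 38), (2, 40), (1, 39), (3, 40)]

print(partitions_scanned(clustered, 12, 18))    # 1
print(partitions_scanned(unclustered, 12, 18))  # 4
```

With well-clustered data the date filter touches one partition; without it, every partition must be read.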

---

Q9 — Use `FLATTEN` + windowing to compute metrics across nested arrays

Problem
- Each `session` record contains an array of `actions`. Compute total actions and top action per session.

Solution

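```sql
WITH action_counts AS (
  -- One row per (session, action) with its frequency; assumes actions is an array of strings
  SELECT s.session_id, f.value::STRING AS action, COUNT(*) AS action_count
  FROM sessions s,
       LATERAL FLATTEN(input => s.actions) f
  GROUP BY s.session_id, f.value::STRING
)
SELECT session_id,
       SUM(action_count) OVER (PARTITION BY session_id) AS total_actions,
       action AS top_action
FROM action_counts
QUALIFY ROW_NUMBER() OVER (PARTITION BY session_id ORDER BY action_count DESC, action) = 1;
```

Explanation
- Flatten the array, count each action per session, then apply window functions: `SUM() OVER` gives the session total, and `ROW_NUMBER()` with `QUALIFY` keeps the most frequent action. Sessions with an empty `actions` array produce no rows unless `FLATTEN` is called with `OUTER => TRUE`. For top-K, change the `QUALIFY` condition to `<= K`.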
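
The flatten-then-aggregate idea maps directly onto `collections.Counter`, which this notebook already imports. A minimal sketch with made-up sessions; breaking ties alphabetically is an assumption added here to keep the result deterministic.

```python
from collections import Counter


def session_metrics(sessions):
    """Return {session_id: (total_actions, top_action)} for sessions with at least one action."""
    metrics = {}
    for session in sessions:
        counts = Counter(session["actions"])
        if not counts:
            continue  # FLATTEN without OUTER => TRUE emits no rows for an empty array
        # Highest count wins; ties are broken alphabetically
        top_action = min(counts, key=lambda a: (-counts[a], a))
        metrics[session["session_id"]] = (sum(counts.values()), top_action)
    return metrics


sessions = [
    {"session_id": "s1", "actions": ["click", "view", "click"]},
    {"session_id": "s2", "actions": ["view"]},
    {"session_id": "s3", "actions": []},
]
metrics = session_metrics(sessions)
print(metrics)  # {'s1': (3, 'click'), 's2': (1, 'view')}
```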

---

Q10 — COPY performance tuning for bulk CSV loads

Problem
- Loading multi-GB CSV files to Snowflake is slow. What settings and patterns improve speed?

Solution

Recommendations:
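- Compress files (gzip) before staging.
- Split large files into many smaller ones (Snowflake's guidance is roughly 100-250 MB compressed each) so the warehouse can load them in parallel. A larger warehouse only helps when there are enough files to keep it busy.
- Use `PURGE = TRUE` to remove files from the stage after a successful load. Load metadata already prevents reloading unchanged files, but purging keeps the stage small.
- Use consistent `FILE_FORMAT` settings (e.g., `SKIP_HEADER`, `FIELD_OPTIONALLY_ENCLOSED_BY`).

Example COPY:

```sql
COPY INTO my_table
FROM @my_stage/bulk/
FILE_FORMAT = (TYPE = 'CSV' COMPRESSION = 'GZIP' FIELD_DELIMITER = ',' SKIP_HEADER = 1)
ON_ERROR = 'ABORT_STATEMENT';
```

Explanation
- Compression reduces network I/O. `COPY` parallelizes across files, so many moderately sized files on an adequately sized warehouse load faster than a few huge ones. `MAX_CONCURRENCY_LEVEL` is a warehouse parameter, not a `COPY` option, so it does not belong in the statement.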
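
Splitting and compressing before staging can be scripted. The sketch below works on in-memory lines to stay self-contained; the function name and the tiny chunk size are illustrative only, and a real script would size chunks by bytes and write them to files.

```python
import gzip


def split_csv_gzip(lines, rows_per_chunk):
    """Split CSV lines (header first) into gzip-compressed chunks, repeating the header in each."""
    header, rows = lines[0], lines[1:]
    chunks = []
    for start in range(0, len(rows), rows_per_chunk):
        text = "\n".join([header] + rows[start:start + rows_per_chunk]) + "\n"
        chunks.append(gzip.compress(text.encode("utf-8")))
    return chunks


lines = ["id,amount", "1,10", "2,20", "3,30", "4,40", "5,50"]
chunks = split_csv_gzip(lines, rows_per_chunk=2)
print(len(chunks))                                # 3
print(gzip.decompress(chunks[-1]).decode("utf-8"))  # header line, then "5,50"
```

The header is repeated in every chunk because `SKIP_HEADER = 1` is applied to each file separately.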

---

Q11 — Implement multi-table atomic update with transactions

Problem
- Update multiple related tables atomically (if any update fails, rollback all changes).

Solution

```sql
BEGIN TRANSACTION;
  UPDATE acct_balances SET balance = balance - 100 WHERE acct_id = 1;
  UPDATE acct_balances SET balance = balance + 100 WHERE acct_id = 2;
  INSERT INTO transfers (from_acct, to_acct, amount, ts) VALUES (1,2,100,CURRENT_TIMESTAMP());
COMMIT;

-- If any statement fails, issue ROLLBACK instead of COMMIT to undo all changes.
```

Explanation
- Snowflake supports multi-statement transactions; use `BEGIN`/`COMMIT` to ensure atomicity.
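
The all-or-nothing behavior can be tried locally with Python's built-in `sqlite3` module, used here as a stand-in for Snowflake. The `CHECK` constraint and the starting balances are assumptions added to force a failure partway through the transaction.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE acct_balances (acct_id INTEGER PRIMARY KEY, balance INTEGER CHECK (balance >= 0))"
)
conn.execute("CREATE TABLE transfers (from_acct INTEGER, to_acct INTEGER, amount INTEGER)")
conn.executemany("INSERT INTO acct_balances VALUES (?, ?)", [(1, 50), (2, 0)])
conn.commit()


def transfer(conn, src, dst, amount):
    """Apply all three statements or none; return True on commit, False on rollback."""
    try:
        with conn:  # commits on success, rolls back if an exception escapes the block
            conn.execute("UPDATE acct_balances SET balance = balance + ? WHERE acct_id = ?", (amount, dst))
            conn.execute("INSERT INTO transfers VALUES (?, ?, ?)", (src, dst, amount))
            # This debit violates the CHECK constraint when funds are insufficient
            conn.execute("UPDATE acct_balances SET balance = balance - ? WHERE acct_id = ?", (amount, src))
        return True
    except sqlite3.IntegrityError:
        return False


def balances(conn):
    return conn.execute("SELECT acct_id, balance FROM acct_balances ORDER BY acct_id").fetchall()


failed = transfer(conn, 1, 2, 100)   # debit fails, so the earlier credit and insert are undone
after_failure = balances(conn)
succeeded = transfer(conn, 1, 2, 30)
after_success = balances(conn)
```

The failed transfer leaves both balances and the `transfers` table exactly as they were, even though two of its three statements had already run.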

---

Q12 — Use `QUALIFY` to simplify filtering on analytic results

Problem
- Filter rows by the output of a window function (e.g., top 1 per group) without subquery nesting.

Solution

```sql
SELECT order_id, customer_id, total,
       ROW_NUMBER() OVER (PARTITION BY customer_id ORDER BY total DESC) AS rn
FROM orders
QUALIFY rn = 1;
```

Explanation
- `QUALIFY` lets you filter on analytic/window expressions directly, reducing subquery complexity.
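
Conceptually, `QUALIFY` is a filter applied after window functions have been computed. The Python sketch below makes the two steps explicit; the helper name and sample rows are hypothetical, and the order key is assumed to be numeric.

```python
from itertools import groupby
from operator import itemgetter


def with_row_number(rows, partition_key, order_key):
    """Attach rn = 1, 2, ... within each partition, ordered by order_key descending."""
    ordered = sorted(rows, key=lambda r: (r[partition_key], -r[order_key]))
    numbered = []
    for _, group in groupby(ordered, key=itemgetter(partition_key)):
        for rn, row in enumerate(group, start=1):
            numbered.append({**row, "rn": rn})
    return numbered


orders = [
    {"order_id": 1, "customer_id": "c1", "total": 50},
    {"order_id": 2, "customer_id": "c1", "total": 90},
    {"order_id": 3, "customer_id": "c2", "total": 10},
]
numbered = with_row_number(orders, "customer_id", "total")   # the window step
top_orders = [r for r in numbered if r["rn"] == 1]           # the QUALIFY step
print([r["order_id"] for r in top_orders])  # [2, 3]
```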

---

Q13 — Use `MERGE` to implement upsert with delete semantics (synchronization)

Problem
- Synchronize target table to match source: insert new, update changed, and delete target rows not present in source.

Solution

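```sql
BEGIN;

-- Upsert: update matching rows, insert new ones
MERGE INTO target T
USING source S
ON T.key = S.key
WHEN MATCHED THEN UPDATE SET T.col1 = S.col1, T.col2 = S.col2
WHEN NOT MATCHED THEN INSERT (key, col1, col2) VALUES (S.key, S.col1, S.col2);

-- Delete externally managed target rows that no longer exist in source
DELETE FROM target
WHERE update_source = 'external'
  AND NOT EXISTS (SELECT 1 FROM source S WHERE S.key = target.key);

COMMIT;
```

Explanation
- Snowflake's `MERGE` covers the insert and update branches but has no `WHEN NOT MATCHED BY SOURCE` clause, so the delete runs as a separate statement inside the same transaction. Scope the delete (here to `update_source = 'external'`) to avoid accidental data loss.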
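
Before running a destructive sync, it can help to preview what would change. The sketch below classifies keys into inserts, updates, and deletes using plain dictionaries; the function name and sample data are hypothetical.

```python
def plan_sync(target, source, deletable=lambda row: True):
    """Classify keys into (inserts, updates, deletes) needed to make target match source."""
    inserts = [k for k in source if k not in target]
    updates = [
        k for k in source
        if k in target and any(target[k][col] != val for col, val in source[k].items())
    ]
    deletes = [k for k in target if k not in source and deletable(target[k])]
    return inserts, updates, deletes


target = {
    1: {"col1": "a", "col2": 1, "update_source": "external"},
    2: {"col1": "b", "col2": 2, "update_source": "external"},
    3: {"col1": "c", "col2": 3, "update_source": "manual"},
    4: {"col1": "d", "col2": 4, "update_source": "external"},
}
source = {
    1: {"col1": "a", "col2": 1},
    2: {"col1": "b", "col2": 20},
    5: {"col1": "e", "col2": 5},
}
inserts, updates, deletes = plan_sync(
    target, source, deletable=lambda row: row["update_source"] == "external"
)
print(inserts, updates, deletes)  # [5] [2] [4]
```

Key 3 is absent from the source but survives because it is not externally managed, which is the guard the `update_source` condition provides.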

---

Q14 — Implement GDPR-style soft deletion using masking + time ranges

Problem
- Support soft-deletion for PII data and automatically mask values for deleted users while retaining analytics.

Solution

Approach:
- Add `deleted_at` timestamp column. Use conditional masking in views: if `deleted_at` IS NOT NULL then mask PII columns using `HASH()` or `NULL`.

```sql
CREATE OR REPLACE VIEW customer_view AS
SELECT id,
       CASE WHEN deleted_at IS NULL THEN email ELSE NULL END AS email,
       CASE WHEN deleted_at IS NULL THEN ssn ELSE 'REDACTED' END AS ssn,
       deleted_at
FROM customer;
```

Explanation
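- Views provide controlled access: they mask PII for soft-deleted users while keeping the rows available for row-level metrics. Grant consumers access to the view rather than the base table, otherwise the masking can be bypassed.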
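
The view's masking rule is simple enough to unit-test as a pure function. A minimal Python sketch, with a hypothetical function name and invented customer rows:

```python
def mask_customer(row):
    """Return the view's projection of a customer row: PII is hidden once deleted_at is set."""
    deleted = row["deleted_at"] is not None
    return {
        "id": row["id"],
        "email": None if deleted else row["email"],
        "ssn": "REDACTED" if deleted else row["ssn"],
        "deleted_at": row["deleted_at"],
    }


active = {"id": 1, "email": "a@example.com", "ssn": "123-45-6789", "deleted_at": None}
deleted = {"id": 2, "email": "b@example.com", "ssn": "987-65-4321", "deleted_at": "2024-03-01"}

masked_active = mask_customer(active)
masked_deleted = mask_customer(deleted)
```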

---

Q15 — Use materialized view to accelerate frequent aggregation with freshness constraints

Problem
- A dashboard queries daily aggregates frequently; reduce latency while keeping results near-real-time.

Solution

```sql
CREATE MATERIALIZED VIEW mv_daily_sales
CLUSTER BY (sale_date)
AS
SELECT sale_date, SUM(amount) AS total_sales, COUNT(*) AS tx_count
FROM sales
GROUP BY sale_date;

-- Snowflake maintains the view automatically in the background; monitor refresh cost via
-- SNOWFLAKE.ACCOUNT_USAGE.MATERIALIZED_VIEW_REFRESH_HISTORY
```

Explanation
- Materialized views precompute aggregates; they trade storage for query latency. Monitor maintenance cost and invalidation frequency.
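
The storage-for-latency trade can be sketched as an aggregate that is updated when rows arrive rather than recomputed on every query. This toy class is only an illustration of the idea, with hypothetical names and data; it does not describe how Snowflake maintains materialized views internally.

```python
class DailySalesView:
    """Toy precomputed aggregate: totals are updated on insert, so queries are simple lookups."""

    def __init__(self):
        self._totals = {}  # sale_date -> (total_sales, tx_count)

    def apply_insert(self, sale_date, amount):
        """Fold one new sales row into the stored aggregate (the maintenance cost)."""
        total, count = self._totals.get(sale_date, (0, 0))
        self._totals[sale_date] = (total + amount, count + 1)

    def query(self, sale_date):
        """Return (total_sales, tx_count) without scanning the base rows."""
        return self._totals.get(sale_date, (0, 0))


view = DailySalesView()
for sale_date, amount in [("2024-01-01", 100), ("2024-01-01", 50), ("2024-01-02", 70)]:
    view.apply_insert(sale_date, amount)

print(view.query("2024-01-01"))  # (150, 2)
```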

---
