# 9.0 Heaps

## 9.1 Compute the running median

### Problem Statement
Given an input stream of numbers, return the median value of the stream each time a new number is added, such that reading the current median is a constant time operation (with heaps, each insertion itself costs $O(\log n)$).

#### Clarification
The median of an even-length list of numbers is the average of the two middle values.

In [1]:
import collections
import heapq
import unittest


class RunningMedian(object):
    """Track the median of a stream of numbers as values arrive.

    The stream is split across two heaps: `maxheap` keeps the smaller
    half and `minheap` keeps the larger half, so the middle value(s)
    always sit at the heap roots.
    """

    def __init__(self):
        # Reading a heap root is O(1); each insertion costs O(log N)
        # heap work for a stream of N numbers.
        self.minheap = []  # Largest N/2 values, smallest at the root.
        self.maxheap = []  # Smallest N/2 values, stored negated so
        #                    heapq's min-heap behaves as a max-heap.

    def median(self, x):
        """Add x to the stream and return the median of all values so far.

        Returns the average of the two middle values (as a float) when the
        count is even, otherwise the single middle value unchanged.
        """
        lower, upper = self.maxheap, self.minheap

        # Values no larger than the biggest "small" value stay in the lower
        # half; everything else (including the very first value) goes to
        # the upper half.
        if lower and x <= -lower[0]:
            heapq.heappush(lower, -x)
        else:
            heapq.heappush(upper, x)

        # An even count requires equal halves; an odd count tolerates a
        # difference of exactly one element.
        total = len(lower) + len(upper)
        diff = len(lower) - len(upper)
        iseven = total % 2 == 0
        tolerance = 0 if iseven else 1
        if abs(diff) > tolerance:
            if diff > 0:
                # Lower half is too big: hand its maximum to the upper half.
                heapq.heappush(upper, -heapq.heappop(lower))
            else:
                # Upper half is too big: hand its minimum to the lower half.
                heapq.heappush(lower, -heapq.heappop(upper))

        if iseven:
            return (upper[0] + -lower[0]) / 2.
        # Odd count: the root of the larger half is the median. `diff` is
        # the size difference measured before any rebalancing above.
        if diff > 0:
            return -lower[0]
        return upper[0]


class RunningMedianTest(unittest.TestCase):
    """Checks RunningMedian against hand-computed running medians."""

    def test_running_median(self):
        """Feed each stream one value at a time and compare every median."""
        case = collections.namedtuple('case', ['input', 'expected'])
        scenarios = (
            case([1, 3, 7, 2, 5, 0], [1, 2, 3, 2.5, 3, 2.5]),
            case([7, 2, 5, 3, 1, 0], [7, 4.5, 5, 4, 3, 2.5]),
            case([2, 7, 3, 1, 5, 0], [2, 4.5, 3, 2.5, 3, 2.5]),
            case([2, 1, 5, 7, 2, 0, 5], [2, 1.5, 2, 3.5, 2, 2, 2]),
        )
        for scenario in scenarios:
            # A fresh tracker per scenario so streams do not mix.
            tracker = RunningMedian()
            for value, want in zip(scenario.input, scenario.expected):
                got = tracker.median(value)
                self.assertEqual(got, want)


# Run the tests for this cell in-process: argv=[''] keeps unittest from
# parsing the process's own command-line arguments, and exit=False stops it
# from calling sys.exit() once the run finishes.
unittest.main(RunningMedianTest(), argv=[''], verbosity=2, exit=False)

test_running_median (__main__.RunningMedianTest) ... ok

----------------------------------------------------------------------
Ran 1 test in 0.007s

OK


<unittest.main.TestProgram at 0x7fc138498710>

## 9.2 Find most similar websites

### Problem Statement
Given an input list of $n$ tuples of urls visited by a user: $[(url_1, user_1), \cdots, (url_n, user_n)]$, return the top $k$ pairs of urls with the most similar users.

The output from this function should be a list of tuples of the form $(s_{ij}, url_i, url_j)$
where
* $url_i$ and $url_j$ with $i \neq j$ are a pair of urls
* $s_{ij}$ is the similarity score of the users visiting this pair of urls

#### Clarification
The intersection over union aka [Jaccard similarity](https://en.wikipedia.org/wiki/Jaccard_index) normalizes the count of users shared by a pair of urls by the size of the union of users visiting those urls.  This makes the metric suitable for cases like this where the number of samples varies between pairs of urls.

In [2]:
import collections
import heapq
import unittest


def topk_similar_users(rawdata, k):
    """Return the top k pairs of urls with the most similar users.

    Similarity between two urls is the Jaccard index of their user sets:
    the number of users who visited both urls divided by the number of
    users who visited either.

    Args:
        rawdata: iterable of (url, user) pairs. Repeated pairs count once.
        k: maximum number of url pairs to return.

    Returns:
        A list of at most k tuples (score, url_i, url_j), sorted in
        descending order. The list is empty when k <= 0 or when fewer
        than two distinct urls are present.
    """
    # Nothing can be selected; without this guard the running top-k below
    # would peek at the root of an empty heap and raise IndexError.
    if k <= 0:
        return []

    # Build a map from url to set of users.
    urls_and_users = collections.defaultdict(set)
    for url, user in rawdata:
        urls_and_users[url].add(user)  # Handles dedupe.

    # Compute the similarity for each of the n(n-1)/2 unique pairs of urls.
    # Use a min heap to maintain a running top-k: its root is always the
    # weakest score currently kept.
    scores, urls = [], list(urls_and_users)
    for i, urli in enumerate(urls):
        usersi = urls_and_users[urli]
        for urlj in urls[i + 1:]:
            usersj = urls_and_users[urlj]
            cap = len(usersi & usersj)
            # Every url was recorded with at least one user, so the union
            # is never empty and the division is always defined.
            cup = len(usersi | usersj)
            score = cap / cup
            entry = (score, urli, urlj)
            if len(scores) < k:  # Add the first k scores.
                heapq.heappush(scores, entry)
            elif score > scores[0][0]:  # Replace kth score with current.
                heapq.heapreplace(scores, entry)

    return sorted(scores, reverse=True)


class TopKSimilarUsers(unittest.TestCase):
    """Checks topk_similar_users on a small hand-scored visit log."""

    def setUp(self):
        """Build the (url, user) visit log and its expected top-3 pairs."""
        visits = [
            ("w1", ("u1", "u2", "u3")),
            ("w2", ("u2", "u3", "u4")),
            ("w3", ("u4", "u5", "u6", "u7")),
            ("w4", ("u5", "u6", "u7", "u8")),
            ("w5", ("u6", "u7", "u8", "u9", "u10", "u11", "u12")),
        ]
        # Flatten to one (url, user) tuple per visit, url by url.
        self.rawdata1 = [(url, user) for url, users in visits
                         for user in users]
        self.k1 = 3
        # Expected (score, url, url) tuples, best score first.
        self.topk1 = [
            (0.6, 'w3', 'w4'),
            (0.5, 'w1', 'w2'),
            (0.375, 'w4', 'w5'),
        ]

    def test_topk_similar_users(self):
        """The returned ranking must match the expected list exactly."""
        case = collections.namedtuple('case', ['rawdata', 'k', 'expected'])
        scenarios = (
            case(self.rawdata1, self.k1, self.topk1),
        )
        for scenario in scenarios:
            got = topk_similar_users(scenario.rawdata, scenario.k)
            self.assertEqual(got, scenario.expected)


# Run the tests for this cell in-process: argv=[''] keeps unittest from
# parsing the process's own command-line arguments, and exit=False stops it
# from calling sys.exit() once the run finishes.
unittest.main(TopKSimilarUsers(), argv=[''], verbosity=2, exit=False)

test_topk_similar_users (__main__.TopKSimilarUsers) ... ok

----------------------------------------------------------------------
Ran 1 test in 0.002s

OK


<unittest.main.TestProgram at 0x7fc138406da0>

## 9.3 Generate regular numbers

### Problem Statement
Given an integer $n$, write a program that generates the first $n$ regular numbers.

[regular number](https://en.wikipedia.org/wiki/Regular_number) (def.)
> Regular numbers are numbers that evenly divide powers of 60 (or, equivalently powers of 30). As an example, 60^2 = 3600 = 48 × 75, so both 48 and 75 are divisors of a power of 60. Thus, they are regular numbers. Equivalently, they are the numbers whose only prime divisors are 2, 3, and 5.

## 9.4 Build a Huffman tree

### Problem Statement
Given a dictionary of character frequencies, build a Huffman tree, and use it to determine a mapping between characters and their encoded binary string.

Minimize the average code length of characters represented by the tree.
$$
\sum_{c \in C} p_c b_c
$$
where
* $C$ is the alphabet
* $p_c$ is the frequency of character $c$
* $b_c$ is the number of bits used to represent character $c$

#### Background

A [Huffman tree](https://en.wikipedia.org/wiki/Huffman_coding) is a binary tree in which the path to each leaf node in the tree is described by a series of binary digits depending on whether a node along the path is a left child (0) or right child (1).  Unlike fixed length binary representations, the tree is constructed to minimize the path length of the most frequently occurring characters.

In [3]:
import collections
import heapq
import unittest


class BinaryTreeNode(object):
    """A binary tree node: a payload plus optional left/right children."""

    def __init__(self, data=None, left=None, right=None):
        # Children default to None, which marks a missing subtree.
        self.data, self.left, self.right = data, left, right


def make_huffman_tree(frequencies):
    """Build a Huffman tree for a dictionary of character frequencies.

    `frequencies` maps each character to its frequency and must not be
    empty. The root BinaryTreeNode is returned: leaves carry a character
    in `data`, merged (internal) nodes carry None.
    """
    assert len(frequencies) > 0, 'invalid: empty character frequencies'

    # Heap entries are (frequency, sequence number, subtree). The sequence
    # number counts how many entries with that exact frequency have been
    # pushed so far, so equal frequencies are ordered by insertion and the
    # subtree objects themselves never have to be compared.
    seen = collections.defaultdict(int)
    forest = []
    for symbol, weight in frequencies.items():
        seen[weight] += 1
        leaf = BinaryTreeNode(symbol)
        heapq.heappush(forest, (weight, seen[weight], leaf))

    # Keep merging the two lightest subtrees until one tree remains, so
    # that higher frequency characters end up closer to the root.
    while len(forest) >= 2:
        light_weight, _, light_tree = heapq.heappop(forest)
        heavy_weight, _, heavy_tree = heapq.heappop(forest)
        merged_weight = light_weight + heavy_weight
        seen[merged_weight] += 1
        # The strictly lighter subtree becomes the right child; on a tie
        # the subtree popped first stays on the left.
        if light_weight < heavy_weight:
            parent = BinaryTreeNode(None, heavy_tree, light_tree)
        else:
            parent = BinaryTreeNode(None, light_tree, heavy_tree)
        heapq.heappush(forest, (merged_weight, seen[merged_weight], parent))

    _, _, root = heapq.heappop(forest)
    return root


def get_encodings(huffman, encoding=''):
    """Return all binary encodings from the Huffman tree.

    Args:
        huffman: root node of the (sub)tree; nodes expose `data`, `left`
            and `right`, with None marking a missing child.
        encoding: bit string accumulated on the path from the root to
            `huffman`. Callers normally leave this at its default.

    Returns:
        A dict mapping the `data` of every leaf to its bit string, where
        '0' means "went left" and '1' means "went right".
    """
    # Recursively obtain the encoding from each leaf in tree.
    if huffman.left is None and huffman.right is None:
        # A tree that is a single leaf has no edges, so the path is empty.
        # Give that lone symbol the one-bit code '0' rather than '', which
        # could not represent the symbol in an encoded bit stream.
        return {huffman.data: encoding or '0'}
    encodings = {}
    if huffman.left is not None:
        encodings.update(get_encodings(huffman.left, encoding + '0'))
    if huffman.right is not None:
        encodings.update(get_encodings(huffman.right, encoding + '1'))
    return encodings


class MakeHuffmanTreeTest(unittest.TestCase):
    """Checks the codes derived from trees built by make_huffman_tree."""

    def setUp(self):
        """Define two frequency tables and the codes expected for each."""
        # Fractional frequencies with no ties.
        self.frequencies1 = {'A': 0.6, 'B': 0.25, 'C': .1, 'D': 0.05}
        self.encodings1 = {'A': '0', 'B': '10', 'C': '110', 'D': '111'}
        # Integer counts containing repeated values.
        self.frequencies2 = {'A': 3, 'B': 2, 'C': 6, 'D': 8, 'E': 2, 'F': 6}
        self.encodings2 = {
            'A': '011',
            'B': '0100',
            'C': '10',
            'D': '00',
            'E': '0101',
            'F': '11',
        }

    def test_make_huffman_tree(self):
        """Build each tree and compare its full encoding table."""
        case = collections.namedtuple('case', ['input', 'expected'])
        scenarios = (
            case(self.frequencies1, self.encodings1),
            case(self.frequencies2, self.encodings2),
        )
        for scenario in scenarios:
            tree = make_huffman_tree(scenario.input)
            self.assertEqual(get_encodings(tree), scenario.expected)


# Run the tests for this cell in-process: argv=[''] keeps unittest from
# parsing the process's own command-line arguments, and exit=False stops it
# from calling sys.exit() once the run finishes.
unittest.main(MakeHuffmanTreeTest(), argv=[''], verbosity=2, exit=False)

test_make_huffman_tree (__main__.MakeHuffmanTreeTest) ... ok

----------------------------------------------------------------------
Ran 1 test in 0.010s

OK


<unittest.main.TestProgram at 0x7fc13842a320>