In [1]:
from random import random
from collections import defaultdict
from abc import ABC, abstractmethod
from typing import Dict, Tuple, Optional

def print_bits(input_bit, output_bit):
    """Print one trace row: the consumed input bit followed by the emitted output bit.

    An ``output_bit`` of ``None`` (nothing emitted on this step) is rendered as ``'.'``.
    """
    shown_output = '.' if output_bit is None else output_bit
    print(input_bit, shown_output)


class BitSource(ABC):
    """Abstract interface for an object that hands out bits one call at a time."""

    @abstractmethod
    def next(self) -> int:
        """Return the next bit; concrete subclasses must override this."""


class RandomnessExtractor(ABC):
    """Abstract interface for an extractor that is advanced one step per call."""

    @abstractmethod
    def next(self) -> Optional[int]:
        """Advance one step and return the emitted bit, or ``None`` if nothing is emitted."""


class BiasedBitSource(BitSource):
    """Bit source with a fixed bias.

    ``bias`` is the threshold below which a fresh ``random()`` draw produces a 0; any other
    draw produces a 1. When ``bias`` is left as ``None`` it is picked once, at construction
    time, with ``random()``.
    """

    def __init__(self, bias=None):
        self.bias = random() if bias is None else bias

    def next(self) -> int:
        """Draw one bit: 0 when ``random()`` falls below ``self.bias``, otherwise 1."""
        return 0 if random() < self.bias else 1


class VaryingBiasedBitSource(BitSource):
    """Bit source whose bias is recomputed after every emitted bit.

    ``bias`` is the threshold below which a draw produces a 0. The first threshold is a raw
    ``random()`` value (the cutoffs are not applied to it); every later one comes from
    ``next_bias`` and is clamped by ``min_cutoff`` / ``max_cutoff``.
    """

    def __init__(self, min_cutoff=0, max_cutoff=1):
        self.bias = random()
        self.min_cutoff = min_cutoff
        self.max_cutoff = max_cutoff

    def next_bias(self, draw):
        """Derive a new bias from ``draw``.

        The draw is squared, then replaced by ``1 - value`` when a fresh ``random()`` call is
        below 0.5, and finally clamped: lower cutoff first, upper cutoff second.
        """
        squared = draw ** 2
        candidate = 1 - squared if random() < 0.5 else squared
        floored = max(self.min_cutoff, candidate)
        return min(self.max_cutoff, floored)

    def next(self) -> int:
        """Emit one bit and update the bias from the same draw that decided the bit."""
        draw = random()
        result = 0 if draw < self.bias else 1
        # The draw used for this bit is reused as the input for the next bias.
        self.bias = self.next_bias(draw)
        return result


class PassthroughExtractor(RandomnessExtractor):
    """Identity "extractor": every bit read from the source is returned unchanged."""

    def __init__(self, bit_source: BitSource):
        self.bits = []  # initialised here but not used by next()
        self.bit_source = bit_source

    def next(self) -> Optional[int]:
        """Read one source bit, print the trace row (input and output are equal), and return it."""
        source_bit = self.bit_source.next()
        print_bits(source_bit, source_bit)
        return source_bit


class VonNeumannExtractor(RandomnessExtractor):
    """Groups source bits into consecutive pairs and emits at most one bit per pair.

    A pair ``[0, 1]`` emits 1, a pair ``[1, 0]`` emits 0, and equal pairs emit nothing.
    """

    def __init__(self, bit_source: BitSource):
        self.bit_source = bit_source
        self.bits = []  # bits of the pair currently being collected

    def next(self) -> Optional[int]:
        """Consume one source bit and return the extracted bit, if any.

        Returns:
            The emitted bit when this call completes an unequal pair, otherwise ``None``
            (first bit of a pair, or a pair of equal bits).

        Raises:
            ValueError: if the source returns something other than 0 or 1.
        """
        incoming = self.bit_source.next()
        if incoming not in [0, 1]:
            raise ValueError("Bit must be 0 or 1.")
        self.bits.append(incoming)

        emitted = None
        if len(self.bits) == 2:
            # Compare against the two unequal patterns; anything else leaves `emitted` as None.
            for pattern, output in (([0, 1], 1), ([1, 0], 0)):
                if self.bits == pattern:
                    emitted = output
                    break
            self.bits = []  # start collecting the next pair
        print_bits(incoming, emitted)
        return emitted


class ParityExtractor(RandomnessExtractor):
    """Emits the parity (sum modulo 2) of each consecutive block of ``N`` source bits."""

    def __init__(self, bit_source: BitSource, N: int):
        """Store the source and block size.

        Raises:
            ValueError: if ``N`` is zero or negative.
        """
        self.bit_source = bit_source
        if N <= 0:
            raise ValueError("N must be a positive integer.")
        self.bits = []  # bits of the block currently being collected
        self.N = N

    def next(self) -> Optional[int]:
        """Consume one source bit and return the block parity when the block is complete.

        Returns:
            ``sum(block) % 2`` on the call that supplies the ``N``-th bit of a block,
            otherwise ``None``.

        Raises:
            ValueError: if the source returns something other than 0 or 1.
        """
        incoming = self.bit_source.next()
        if incoming not in [0, 1]:
            raise ValueError("Bit must be 0 or 1.")
        self.bits.append(incoming)

        if len(self.bits) == self.N:
            parity = sum(self.bits) % 2
            self.bits = []  # start collecting the next block
        else:
            parity = None
        print_bits(incoming, parity)
        return parity


class MarkovChainBitSource(BitSource):
    """Bit source driven by a state machine with probabilistic transitions.

    Subclasses supply the transition table through ``get_transitions``. Each call to
    ``next`` picks one transition out of the current state, moves to its target state,
    and returns the bit attached to that transition.
    """

    # Largest gap between the accumulated probability and 1.0 that is treated as
    # floating-point rounding rather than as a malformed transition table.
    _PROBABILITY_TOLERANCE = 1e-9

    def __init__(self, start_state: str):
        self.current_state = start_state

    @abstractmethod
    def get_transitions(self, state: str) -> Dict[int, Tuple[str, float]]:
        """Return the transitions out of ``state`` as ``{bit: (next_state, probability)}``."""

    def next(self) -> int:
        """Sample one transition from the current state and return its bit.

        A single ``random()`` draw is compared against the running sum of transition
        probabilities, in the iteration order of the mapping returned by
        ``get_transitions``.

        Returns:
            The bit keyed to the chosen transition; ``self.current_state`` is updated to
            that transition's target state.

        Raises:
            RuntimeError: if the draw is not covered and the probabilities do not add up
                to 1 within ``_PROBABILITY_TOLERANCE`` (or there is no transition with
                positive probability).
        """
        transitions = self.get_transitions(self.current_state)
        rand = random()
        cumulative_probability = 0.0
        fallback = None  # last transition seen with positive probability
        for bit, (next_state, probability) in transitions.items():
            cumulative_probability += probability
            if probability > 0:
                fallback = (bit, next_state)
            if rand < cumulative_probability:
                self.current_state = next_state
                return bit
        # random() lies in [0, 1). A float sum of probabilities that should total 1 can land
        # a hair below 1.0, leaving a tiny uncovered gap; assign that gap to the last
        # positive-probability transition instead of reporting a bogus error.
        if fallback is not None and abs(cumulative_probability - 1.0) <= self._PROBABILITY_TOLERANCE:
            bit, next_state = fallback
            self.current_state = next_state
            return bit
        raise RuntimeError("Probabilities do not sum to 1.")


class BlumExtractor(RandomnessExtractor):
    """Pairs up bits per originating state of a Markov-chain source.

    Bits are bucketed by the state the source was in *before* the bit was produced. Once a
    bucket holds two bits, ``[0, 1]`` emits 1, ``[1, 0]`` emits 0, equal bits emit nothing,
    and the bucket is emptied.
    """

    # NOTE(review): the output is emitted on the same call that completes a state's pair.
    # Blum's paper is understood to require deferring that output for the result to be
    # unbiased and independent -- confirm against the paper before relying on this.

    def __init__(self, bit_source: MarkovChainBitSource):
        self.bit_source = bit_source
        self.state_bits = defaultdict(list)  # state -> bits produced from that state so far

    def next(self) -> Optional[int]:
        """Consume one source bit and return the extracted bit, if any.

        Returns:
            The emitted bit when this call completes an unequal pair for the originating
            state, otherwise ``None``.
        """
        # Read the state first: the source's next() call moves it to the following state.
        origin_state = self.bit_source.current_state
        incoming = self.bit_source.next()
        pending = self.state_bits[origin_state]
        pending.append(incoming)

        emitted = None
        if len(pending) == 2:
            for pattern, output in (([0, 1], 1), ([1, 0], 0)):
                if pending == pattern:
                    emitted = output
                    break
            self.state_bits[origin_state] = []
        print_bits(incoming, emitted)
        return emitted

In [2]:
# Baseline run: a source with bias 0.5 fed through the passthrough extractor,
# so every printed row shows the same bit twice.
bit_source = BiasedBitSource(bias=0.5)
extractor = PassthroughExtractor(bit_source)

# Each next() call prints one "input output" row; `result` keeps only the last value.
for _ in range(100):
    result = extractor.next()

0 0
1 1
1 1
0 0
1 1
0 0
0 0
1 1
1 1
1 1
1 1
1 1
1 1
1 1
1 1
1 1
1 1
0 0
0 0
1 1
0 0
0 0
1 1
1 1
1 1
0 0
1 1
1 1
0 0
0 0
1 1
0 0
1 1
0 0
1 1
1 1
1 1
0 0
0 0
1 1
0 0
1 1
1 1
0 0
0 0
1 1
1 1
0 0
0 0
0 0
1 1
1 1
0 0
0 0
1 1
1 1
0 0
1 1
1 1
1 1
1 1
0 0
0 0
0 0
0 0
0 0
1 1
1 1
0 0
1 1
1 1
1 1
0 0
0 0
1 1
0 0
1 1
0 0
0 0
0 0
0 0
1 1
0 0
1 1
0 0
0 0
0 0
0 0
1 1
1 1
1 1
0 0
0 0
1 1
1 1
0 0
0 0
1 1
0 0
1 1


In [3]:
# Varying-bias source (bias clamped to [0.1, 0.7] after the first bit) through a
# parity extractor over blocks of 4 bits.
bit_source = VaryingBiasedBitSource(min_cutoff=0.1, max_cutoff=0.7)
extractor = ParityExtractor(bit_source=bit_source, N=4)

# 100 input bits -> one parity bit on every 4th row, '.' on the others.
for _ in range(100):
    result = extractor.next()

1 .
1 .
0 .
0 0
1 .
1 .
1 .
0 1
1 .
1 .
1 .
0 1
0 .
0 .
1 .
0 1
0 .
0 .
0 .
0 0
1 .
1 .
0 .
1 1
0 .
0 .
1 .
0 1
0 .
1 .
1 .
1 1
1 .
0 .
1 .
0 0
0 .
0 .
0 .
0 0
1 .
1 .
0 .
1 1
0 .
1 .
0 .
1 0
1 .
0 .
0 .
0 1
0 .
1 .
1 .
1 1
0 .
0 .
1 .
1 0
0 .
1 .
0 .
1 0
1 .
0 .
1 .
0 0
0 .
0 .
0 .
0 0
1 .
1 .
1 .
1 0
0 .
0 .
1 .
1 0
0 .
1 .
0 .
0 1
0 .
1 .
1 .
1 1
1 .
0 .
1 .
1 1
0 .
1 .
0 .
1 0
0 .
1 .
1 .
0 0


In [4]:
# Fixed-bias source with a randomly chosen bias (no argument given) through the
# Von Neumann extractor; the bias differs from run to run because no seed is set.
bit_source = BiasedBitSource()
extractor = VonNeumannExtractor(bit_source)

# An output bit appears only on rows that complete an unequal pair.
for _ in range(100):
    result = extractor.next()

0 .
1 1
1 .
1 .
1 .
1 .
1 .
1 .
1 .
1 .
1 .
1 .
1 .
1 .
1 .
1 .
1 .
1 .
1 .
0 0
1 .
1 .
1 .
1 .
1 .
1 .
1 .
0 0
1 .
1 .
1 .
0 0
1 .
1 .
1 .
0 0
1 .
1 .
0 .
1 1
1 .
1 .
1 .
1 .
1 .
1 .
1 .
1 .
0 .
1 1
0 .
0 .
0 .
1 1
1 .
1 .
1 .
1 .
1 .
1 .
0 .
1 1
1 .
1 .
1 .
0 0
1 .
1 .
1 .
1 .
1 .
1 .
0 .
0 .
1 .
1 .
1 .
0 0
0 .
0 .
1 .
1 .
1 .
1 .
1 .
1 .
0 .
1 1
1 .
1 .
0 .
1 1
1 .
0 0
1 .
1 .
1 .
1 .
0 .
1 1


In [5]:
class ExampleMarkovChainBitSource(MarkovChainBitSource):
    """Example three-state chain over states 'A', 'B' and 'C', starting in 'A'.

    ``transitions`` maps each state to ``{bit: (next_state, probability)}``.
    """

    def __init__(self):
        super().__init__(start_state='A')
        from_a = {0: ('B', 0.5), 1: ('C', 0.5)}
        from_b = {0: ('A', 0.7), 1: ('C', 0.3)}
        from_c = {0: ('A', 0.4), 1: ('B', 0.6)}
        self.transitions = {'A': from_a, 'B': from_b, 'C': from_c}

    def get_transitions(self, state: str) -> Dict[int, Tuple[str, float]]:
        """Look up the outgoing transitions of ``state``; an unknown state raises ``KeyError``."""
        outgoing = self.transitions[state]
        return outgoing


# Example Markov-chain source through a parity extractor over blocks of 4 bits.
bit_source = ExampleMarkovChainBitSource()
extractor = ParityExtractor(bit_source=bit_source, N=4)
# 100 input bits -> one parity bit on every 4th row, '.' on the others.
for _ in range(100):
    result = extractor.next()

0 .
1 .
0 .
1 0
0 .
1 .
0 .
0 1
0 .
1 .
0 .
1 0
1 .
1 .
0 .
0 0
1 .
0 .
0 .
1 0
1 .
0 .
1 .
0 0
1 .
1 .
1 .
0 1
0 .
1 .
0 .
0 1
0 .
0 .
1 .
0 1
1 .
0 .
0 .
0 1
1 .
1 .
1 .
0 1
0 .
0 .
0 .
0 0
1 .
1 .
1 .
1 0
0 .
0 .
1 .
0 1
1 .
1 .
0 .
1 1
1 .
0 .
0 .
0 1
0 .
0 .
1 .
1 0
1 .
0 .
0 .
1 0
0 .
1 .
1 .
0 0
0 .
0 .
1 .
1 0
0 .
0 .
1 .
1 0
0 .
1 .
1 .
0 0
1 .
1 .
0 .
0 0
0 .
0 .
1 .
1 0
0 .
1 .
0 .
1 0


In [6]:
# NOTE: this re-declares the class from the previous cell with the same states and
# probabilities; the later definition replaces the earlier one in the kernel namespace.
class ExampleMarkovChainBitSource(MarkovChainBitSource):
    """Example three-state chain over states 'A', 'B' and 'C', starting in 'A'.

    ``transitions`` maps each state to ``{bit: (next_state, probability)}``.
    """

    def __init__(self):
        super().__init__(start_state='A')
        from_a = {0: ('B', 0.5), 1: ('C', 0.5)}
        from_b = {0: ('A', 0.7), 1: ('C', 0.3)}
        from_c = {0: ('A', 0.4), 1: ('B', 0.6)}
        self.transitions = {'A': from_a, 'B': from_b, 'C': from_c}

    def get_transitions(self, state: str) -> Dict[int, Tuple[str, float]]:
        """Look up the outgoing transitions of ``state``; an unknown state raises ``KeyError``."""
        outgoing = self.transitions[state]
        return outgoing


# Example Markov-chain source through the Blum extractor, which pairs bits per
# originating state rather than per position in the stream.
bit_source = ExampleMarkovChainBitSource()
extractor = BlumExtractor(bit_source)

# An output bit appears only on rows that complete an unequal pair for some state.
for _ in range(100):
    result = extractor.next()

0 .
0 .
1 1
0 .
1 .
1 1
0 .
1 .
0 .
0 .
1 .
1 1
0 0
1 1
0 .
1 .
0 .
1 .
0 .
0 .
0 .
1 1
0 .
1 .
0 .
1 .
0 .
0 .
0 .
1 1
1 .
1 .
1 .
0 0
0 .
0 .
1 1
1 .
0 .
0 .
0 .
0 .
0 .
0 .
0 .
1 1
1 .
0 .
0 .
1 .
1 .
0 0
0 .
0 .
0 .
1 1
1 .
1 .
0 .
0 .
1 .
0 .
0 .
0 .
1 1
0 .
1 .
1 1
1 1
1 .
0 .
1 .
0 0
1 .
1 .
0 .
0 0
0 .
0 .
1 1
1 .
0 .
0 .
0 .
0 .
0 .
0 .
0 .
1 .
0 .
0 0
0 .
0 .
1 1
1 1
1 .
0 .
1 1
0 .
0 .
