# Task 4: Demonstrating Quantum Advantage

In [93]:
"""
This task is intentionally open-ended, so no boilerplate code is provided.

You may use this notebook to develop your solution, or create a separate file if you prefer.
We recommend starting by copying over your previous implementations of the QRNG, TRNG, and PRNG.
Then, explore ways to modularize and combine these components to design a use case that
demonstrates the unique advantages of quantum randomness.

Your write-up can be included directly in this notebook or submitted separately.
You're welcome to prepare it as a Google Doc or LaTeX document and upload a PDF to the GitHub repository—
just be sure to clearly indicate where it can be found if it's not included here.
"""

"\nThis task is intentionally open-ended, so no boilerplate code is provided.\n\nYou may use this notebook to develop your solution, or create a separate file if you prefer.\nWe recommend starting by copying over your previous implementations of the QRNG, TRNG, and PRNG.\nThen, explore ways to modularize and combine these components to design a use case that\ndemonstrates the unique advantages of quantum randomness.\n\nYour write-up can be included directly in this notebook or submitted separately.\nYou're welcome to prepare it as a Google Doc or LaTeX document and upload a PDF to the GitHub repository—\njust be sure to clearly indicate where it can be found if it's not included here.\n"

### Necessary Imports

In [None]:
import numpy as np
import requests
import random
import matplotlib.pyplot as plt
from scipy.stats import chisquare
from qiskit import QuantumCircuit
from qiskit_aer import AerSimulator
from qiskit.visualization import circuit_drawer, plot_circuit_layout
from scipy.special import erf
from collections import Counter
import hashlib
import json
from typing import Tuple, List, Dict, Any
import os
import pyaudio

In [95]:
# Configuration
NUM_TRIALS = 1000
QRNG_API_URL = "https://qrng.idqloud.com/api/1.0"  # Replace with actual API
API_KEY = "aTo4BKRvnc49uRWDk034zaua87vGRXKk9TMLdfkI"import requests
import numpy as np
import random
import os
import matplotlib.pyplot as plt
from scipy.stats import norm
import math
from collections import Counter

### Class for getting qrng values### Template for using api

In [4]:
import requests
from typing import Union, List, Optional

QRNG_API_URL = "https://qrng.idqloud.com/api/1.0"
class QRNGaaS:
    def __init__(self, api_key: str):
        self.base_url = QRNG_API_URL
        self.headers = {
            "X-API-KEY": api_key,
            "Content-Type": "application/json"
        }
        self.max_batch_size = 32  # Actual API limit (32 for int, 64 for short)
        self.max_double_batch = 16
    
    def get_random_ints_batched(self, quantity: int) -> List[int]:
        """Get random 32-bit integers with batching to handle API limits"""
        batches = quantity // self.max_batch_size
        remainder = quantity % self.max_batch_size
        
        results = []
        for _ in range(batches):
            results.extend(self._get_batch(self.max_batch_size))
        
        if remainder > 0:
            results.extend(self._get_batch(remainder))
            
        return results
    
    def _get_batch(self, batch_size: int) -> List[int]:
        """Get a single batch of random integers"""
        params = {"quantity": batch_size}
        try:
            response = requests.get(
                f"{self.base_url}/int",
                headers=self.headers,
                params=params,
                timeout=10  # Add timeout to prevent hanging
            )
            response.raise_for_status()
            return response.json()["data"]
        except requests.exceptions.RequestException as e:
            print(f"API request failed: {e}")
            return []  # Return empty list on failure
        
    def _get_double_batch(self, batch_size: int, min_val: float, max_val: float) -> List[float]:
        params = {"min": min_val, "max": max_val, "quantity": batch_size}
        try:
            resp = requests.get(f"{self.base_url}/double", headers=self.headers, params=params, timeout=10)
            resp.raise_for_status()
            return resp.json()["data"]
        except requests.RequestException as e:
            print(f"Double batch request failed: {e}")
            return []

    def get_random_doubles_batched(
        self,
        quantity: int,
        min_val: float = 0.0,
        max_val: float = 1.0
    ) -> List[float]:
        """
        Retrieve 'quantity' true-random doubles in [min_val, max_val), batching to respect API limits.
        """
        full_batches = quantity // self.max_double_batch
        remainder = quantity % self.max_double_batch
        result: List[float] = []
        for _ in range(full_batches):
            result.extend(self._get_double_batch(self.max_double_batch, min_val, max_val))
        if remainder:
            result.extend(self._get_double_batch(remainder, min_val, max_val))
        return result

        


### Class for getting mic trng values

In [97]:
class MicrophoneRNG:
    def __init__(self, sample_rate=44100, chunk_size=1024, format=pyaudio.paInt16):
        self.sample_rate = sample_rate
        self.chunk_size = chunk_size
        self.format = format
        self.audio = pyaudio.PyAudio()
        self.stream = None
        self.entropy_pool = bytearray()

    def _start_stream(self):
        """Start audio stream if not already running"""
        if self.stream is None or not self.stream.is_active():
            self.stream = self.audio.open(
                format=self.format,
                channels=1,
                rate=self.sample_rate,
                input=True,
                frames_per_buffer=self.chunk_size
            )

    def _stop_stream(self):
        """Stop audio stream if running"""
        if self.stream is not None and self.stream.is_active():
            self.stream.stop_stream()
            self.stream.close()
            self.stream = None

    def collect_entropy(self, duration_seconds=0.5, visualize=False):
        """Collect entropy from microphone for specified duration"""
        self._start_stream() # TODO: start the stream

        frames = []
        num_chunks = int(self.sample_rate * duration_seconds // self.chunk_size) # TODO: compute the number of chunks using the sample rate, duration in seconds, and chunk size.
        # hint: there is a division involved. make sure to cast to int, or use integer division.


        print(f"Collecting ambient noise for {duration_seconds} seconds...")

        # Read audio data
        for _ in range(max(1, num_chunks)):
            data = self.stream.read(self.chunk_size, exception_on_overflow=False)
            frames.append(data)

        # Convert to numpy array for processing
        if self.format == pyaudio.paInt16:
            format_size = 2  # bytes per sample
            format_char = 'h'  # short integer
        else:
            raise ValueError("Unsupported audio format")

        # Unpack audio data
        audio_data = []
        for frame in frames:
            count = len(frame) // format_size
            fmt = f"{count}{format_char}"
            audio_data.extend(struct.unpack(fmt, frame))

        # Extract entropy from least significant bits
        raw_entropy = bytearray()
        for sample in audio_data:
            # Take the least significant byte (where noise is most prominent)

            lsb = sample & 0xFF # TODO: Get the lowest 8 bits of sample
            # hint: bitwise ops, just like in Part A


            raw_entropy.append(lsb)

        # Visualize if requested
        if visualize:
            plt.figure(figsize=(12, 6))

            # Plot raw audio waveform
            plt.subplot(2, 1, 1)
            plt.plot(audio_data[:1000])
            plt.title("Raw Audio Waveform (first 1000 samples)")
            plt.xlabel("Sample")
            plt.ylabel("Amplitude")

            # Plot histogram of the extracted entropy bytes
            plt.subplot(2, 1, 2)
            plt.hist(raw_entropy, bins=32, color='green', alpha=0.7)
            plt.title("Distribution of Extracted Entropy Bytes")
            plt.xlabel("Byte Value")
            plt.ylabel("Frequency")

            plt.tight_layout()
            plt.show()

        # Add to entropy pool
        self.entropy_pool.extend(raw_entropy)

        return raw_entropy

    def get_random_bytes(self, num_bytes=32):
        """Generate random bytes from the entropy pool"""
        # Make sure we have enough entropy
        while len(self.entropy_pool) < num_bytes * 2:  # Get more than we need
            self.collect_entropy(0.1)

        # Mix entropy with SHA-256
        h = hashlib.sha256()
        h.update(self.entropy_pool)

        # Get a seed from the hash
        seed = h.digest()

        # Generate additional random bytes using the seed
        result = bytearray()
        counter = 0

        while len(result) < num_bytes:
            # Create a unique input for each iteration
            h = hashlib.sha256()
            h.update(seed)
            h.update(counter.to_bytes(4, byteorder='big'))
            result.extend(h.digest())
            counter += 1

        # Remove used entropy from the pool
        self.entropy_pool = self.entropy_pool[num_bytes:]

        return bytes(result[:num_bytes])

    def get_random_int(self, min_val=0, max_val=100):
        """Generate a random integer between min_val and max_val (inclusive)"""
        range_size = max_val - min_val + 1
        if range_size <= 0:
            raise ValueError("Invalid range")

        # Calculate how many bits we need
        bits_needed = range_size.bit_length()
        bytes_needed = (bits_needed + 7) // 8

        # Get random bytes
        random_bytes = self.get_random_bytes(bytes_needed)

        # Convert to integer and map to our range
        value = int.from_bytes(random_bytes, byteorder='big')
        return min_val + (value % range_size)

    def visualize_randomness(self, num_samples=1000):
        """Generate and visualize random numbers"""
        samples = [self.get_random_int(0, 255) for _ in range(num_samples)]

        plt.figure(figsize=(12, 10))

        # Plot 1: Distribution histogram
        plt.subplot(2, 2, 1)
        plt.hist(samples, bins=32, color='blue', alpha=0.7)
        plt.title('Distribution of Random Values')
        plt.xlabel('Value')
        plt.ylabel('Frequency')

        # Plot 2: Sequential values
        plt.subplot(2, 2, 2)
        plt.plot(samples[:100], '.-', alpha=0.7)
        plt.title('First 100 Generated Values')
        plt.xlabel('Sample Index')
        plt.ylabel('Value')

        # Plot 3: Scatter plot of consecutive values
        plt.subplot(2, 2, 3)
        plt.scatter(samples[:-1], samples[1:], alpha=0.5, s=5)
        plt.title('Scatter Plot of Consecutive Values')
        plt.xlabel('Value n')
        plt.ylabel('Value n+1')

        # Plot 4: Autocorrelation
        plt.subplot(2, 2, 4)
        autocorr = np.correlate(samples, samples, mode='full')
        autocorr = autocorr[len(autocorr)//2:]
        autocorr = autocorr / autocorr[0]
        plt.plot(autocorr[:50])
        plt.title('Autocorrelation')
        plt.xlabel('Lag')
        plt.ylabel('Correlation')

        plt.tight_layout()
        plt.show()

        return samples

    def __del__(self):
        """Clean up when object is destroyed"""
        self._stop_stream()
        self.audio.terminate()

In [98]:
class RNGProviders:
    @staticmethod
    def qrng(size: int = 1) -> List[int]:
        """Quantum RNG from IDQuantique's service (simulated)"""
        qrng = QRNGaaS(API_KEY)
        ints = qrng.get_random_ints_batched(quantity=size)
        return ints
    
    @staticmethod
    def trng(size: int = 1) -> List[int]:
        """System TRNG (uses system entropy)"""
        mic_trng = MicrophoneRNG()
        ints = []
        for _ in range(size):
            ints.append(mic_trng.get_random_int(-2147483648, 2147483647))
        return ints
    
    @staticmethod
    def prng(size: int = 1) -> List[int]:
        """Mersenne Twister PRNG"""
        return [random.getrandbits(1) for _ in range(size)]

In [99]:
class BellTest:
    def __init__(self, rng_provider: callable):
        self.rng = rng_provider
        self.results: List[Tuple[int, int, int, int]] = []
        self.simulator = AerSimulator()
        self.rng_bits: List[int] = []  
        
    def run_experiment(self, num_trials: int) -> None:
        for _ in range(num_trials):
            # Alice and Bob choose measurement bases independently
            try:
                a_raw = self.rng(1)[0]
                b_raw = self.rng(1)[0]

                a_basis = a_raw % 2
                b_basis = b_raw % 2

                self.rng_bits.append(a_basis)
                self.rng_bits.append(b_basis)
            except Exception as e:
                print(f"Error getting random numbers: {e}")
                a_basis, b_basis = 0, 0
            
            # Create and measure entangled pair
            qc = self._create_bell_circuit(a_basis, b_basis)
            
            # Simulate and get results
            result = self.simulator.run(qc, shots=1).result()
            counts = result.get_counts(qc)
            outcome = list(counts.keys())[0]
            
            a_result = int(outcome[0])
            b_result = int(outcome[1])
            
            self.results.append((a_basis, b_basis, a_result, b_result))

    def get_rng_bits(self) -> str:
        """Return raw RNG bits used for basis selection as a binary string"""
        return ''.join(str(bit) for bit in self.rng_bits)
    
    def _create_bell_circuit(self, a_basis: int, b_basis: int) -> QuantumCircuit:
        """Create a Bell test circuit with given measurement bases"""
        qc = QuantumCircuit(2, 2)
        
        # Create Bell state (|00> + |11>)/sqrt(2)
        qc.h(0)
        qc.cx(0, 1)
        
        # Measurement bases (0: Z basis, 1: X basis)
        if a_basis:
            qc.h(0)
        if b_basis:
            qc.h(1)
            
        qc.measure([0, 1], [0, 1])
        return qc
    
    def calculate_chsh(self) -> float:
        """Calculate the CHSH inequality value S"""
        correlations = {
            (0, 0): 0, (0, 1): 0,
            (1, 0): 0, (1, 1): 0
        }
        counts = {
            (0, 0): 0, (0, 1): 0,
            (1, 0): 0, (1, 1): 0
        }
        
        for a_basis, b_basis, a_result, b_result in self.results:
            # Convert to ±1
            a_val = 1 if a_result == 0 else -1
            b_val = 1 if b_result == 0 else -1
            
            correlations[(a_basis, b_basis)] += a_val * b_val
            counts[(a_basis, b_basis)] += 1
        
        # Calculate expectation values
        E = {}
        for basis in correlations:
            if counts[basis] > 0:
                E[basis] = correlations[basis] / counts[basis]
            else:
                E[basis] = 0
                
        # CHSH value
        S = E[(0, 0)] - E[(0, 1)] + E[(1, 0)] + E[(1, 1)]
        return S
    
    def run_statistical_tests(self) -> Dict[str, Any]:
        """Run battery of statistical tests on the RNG outputs"""
        # Extract all measurement choices
        choices = [a_basis for a_basis, _, _, _ in self.results] + \
                  [b_basis for _, b_basis, _, _ in self.results]
        
        # Test uniformity
        freq = Counter(choices)
        total = len(choices)
        chi2, p_value = chisquare(list(freq.values()))
        
        # Calculate Shannon entropy
        entropy = -sum((count/total) * np.log2(count/total) 
                  for count in freq.values() if count > 0)
        
        # Autocorrelation test
        shifted = choices[1:] + [choices[0]]
        matches = sum(1 for a, b in zip(choices, shifted) if a == b)
        autocorr = matches / len(choices)
        
        # Run additional tests (simplified versions)
        runs_test = self._perform_runs_test(choices)
        spectral_test = self._perform_spectral_test(choices)
        
        return {
            'chi_squared': (chi2, p_value),
            'shannon_entropy': entropy,
            'autocorrelation': autocorr,
            'runs_test': runs_test,
            'spectral_test': spectral_test,
            'ideal_entropy': 1.0,
            'ideal_autocorr': 0.5
        }
    
    def _perform_runs_test(self, sequence: List[int]) -> Dict[str, float]:
        """Simplified runs test for randomness"""
        runs = 1
        for i in range(1, len(sequence)):
            if sequence[i] != sequence[i-1]:
                runs += 1
                
        n = len(sequence)
        expected_runs = (2 * n - 1) / 3
        std_dev = np.sqrt((16 * n - 29) / 90)
        z_score = (runs - expected_runs) / std_dev
        
        return {
            'runs': runs,
            'expected': expected_runs,
            'z_score': z_score,
            'p_value': 2 * (1 - self._normal_cdf(abs(z_score)))
        }
    
    def _perform_spectral_test(self, sequence: List[int]) -> Dict[str, float]:
        """Simplified spectral test for randomness"""
        # Convert to ±1
        s = np.array([1 if x else -1 for x in sequence])
        n = len(s)
        
        # Discrete Fourier Transform
        fft = np.fft.fft(s)
        magnitudes = np.abs(fft)[1:n//2]  # Ignore DC component and symmetric part
        
        # Compute normalized peak height
        peak_height = np.max(magnitudes) / np.sqrt(n * np.log(n))
        
        return {
            'peak_height': peak_height,
            'threshold': np.sqrt(2.9957326),  # 99% confidence level
            'passed': peak_height < np.sqrt(2.9957326)
        }
    
    def show_circuit(self, a_basis: int, b_basis: int) -> None:
        """Display the Bell circuit with specified measurement bases"""
        qc = self._create_bell_circuit(a_basis, b_basis)
        qc.draw(output="mpl")  # This uses matplotlib to draw
        plt.show()
    
    @staticmethod
    def _normal_cdf(x: float) -> float:
        """Approximation of standard normal CDF"""
        return (1.0 + erf(x / np.sqrt(2.0))) / 2.0

In [100]:
def compare_rng_methods() -> Dict[str, Dict[str, Any]]:
    methods = {
        'QRNG': RNGProviders.qrng,
        'TRNG': RNGProviders.trng,
        'PRNG': RNGProviders.prng
    }

    
    results = {}
    
    for name, provider in methods.items():
        print(f"\nRunning {name}...")
        test = BellTest(provider)
        test.run_experiment(NUM_TRIALS)

        # Get the bit sequence from the BellTest instance
        input_bits = test.get_rng_bits()  # Make sure this returns a string like '010110...'

        chsh_value = test.calculate_chsh()
        stats = test.run_statistical_tests()
        
        results[name] = {
            'CHSH': chsh_value,
            'stats': stats,
            'violation': chsh_value > 2  # Classical bound
        }
        
        print(f"{name} Results:")
        print(f"  CHSH Value: {chsh_value:.4f}")
        print(f"  Violation: {'Yes' if chsh_value > 2 else 'No'}")
        print(f"  Shannon Entropy: {stats['shannon_entropy']:.4f} (Ideal: 1.0)")
        print(f"  Autocorrelation: {stats['autocorrelation']:.4f} (Ideal: 0.5)")
        print(f"  Chi-squared p-value: {stats['chi_squared'][1]:.4f}")
        print(f"  Runs Test z-score: {stats['runs_test']['z_score']:.4f}")
        print(f"  Spectral Test passed: {stats['spectral_test']['passed']}")
    
    return results

In [101]:
def visualize_results(results: Dict[str, Dict[str, Any]]) -> None:
    names = list(results.keys())
    chsh_values = [results[name]['CHSH'] for name in names]
    violations = [results[name]['violation'] for name in names]
    
    plt.figure(figsize=(12, 6))
    
    # CHSH values plot
    plt.subplot(1, 2, 1)
    bars = plt.bar(names, chsh_values, color=['#1f77b4', '#2ca02c', '#d62728'])
    plt.axhline(y=2, color='black', linestyle='--', label='Classical Bound')
    plt.axhline(y=2*np.sqrt(2), color='purple', linestyle=':', label='Quantum Bound')
    plt.ylabel('CHSH Value')
    plt.title('CHSH Inequality Test Results')
    plt.legend()
    
    # Statistical tests plot
    plt.subplot(1, 2, 2)
    entropy = [results[name]['stats']['shannon_entropy'] for name in names]
    autocorr = [results[name]['stats']['autocorrelation'] for name in names]
    
    x = np.arange(len(names))
    width = 0.35
    
    plt.bar(x - width/2, entropy, width, label='Entropy (ideal=1.0)')
    plt.bar(x + width/2, autocorr, width, label='Autocorr (ideal=0.5)')
    plt.xticks(x, names)
    plt.title('Statistical Test Results')
    plt.legend()
    
    plt.tight_layout()
    plt.show()

In [5]:
if __name__ == "__main__":
    # Run the comparison

    results = compare_rng_methods()
    visualize_results(results)

    bell_test = BellTest(RNGProviders.qrng)
    bell_test.show_circuit(a_basis=0, b_basis=1)
    
    # Print detailed results
    print("\nDetailed Results:")
    for name, data in results.items():
        print(f"\n{name}:")
        print(f"  CHSH Value: {data['CHSH']:.4f}")
        print(f"  Violation of Classical Bound: {data['violation']}")
        print("  Statistical Tests:")
        for test, value in data['stats'].items():
            if test not in ['ideal_entropy', 'ideal_autocorr']:
                print(f"    {test}: {value}")    API_KEY = "aTo4BKRvnc49uRWDk034zaua87vGRXKk9TMLdfkI"
    qrng = QRNGaaS(API_KEY)

    # Example: get 100 doubles in batches
    doubles = qrng.get_random_doubles_batched(quantity=100, min_val=-1.0, max_val=1.0)
    print("Random doubles (sample):", doubles[::])

    #Example: get 100 ints in batches
    ints = qrng.get_random_ints_batched(quantity=100)
    print("Random ints (sample):", ints[::])


Running QRNG...
QRNG Results:
  CHSH Value: 2.0958
  Violation: Yes
  Shannon Entropy: 1.0000 (Ideal: 1.0)
  Autocorrelation: 0.5010 (Ideal: 0.5)
  Chi-squared p-value: 0.8933
  Runs Test z-score: -17.7741
  Spectral Test passed: True

Running TRNG...
Collecting ambient noise for 0.1 seconds...
Error getting random numbers: name 'struct' is not defined
Collecting ambient noise for 0.1 seconds...
Error getting random numbers: name 'struct' is not defined
Collecting ambient noise for 0.1 seconds...
Error getting random numbers: name 'struct' is not defined
Collecting ambient noise for 0.1 seconds...
Error getting random numbers: name 'struct' is not defined
Collecting ambient noise for 0.1 seconds...
Error getting random numbers: name 'struct' is not defined
Collecting ambient noise for 0.1 seconds...
Error getting random numbers: name 'struct' is not defined
Collecting ambient noise for 0.1 seconds...
Error getting random numbers: name 'struct' is not defined
Collecting ambient noise f

KeyboardInterrupt: 