# Setup


In [2]:
import numba
import numpy as np

Create a small array of random number such that it can be used to test the algorithms to make sure they work.


In [3]:
np.random.seed(42)
test_array = np.random.randint(1, 100_000, 100, dtype=np.int32)
test_array[0:10]

array([15796,   861, 76821, 54887,  6266, 82387, 37195, 87499, 44132,
       60264], dtype=int32)

# Casual Use


## Implementation #1

Implementation edited slightly to take in an optional input array parameter.


In [4]:
import random


def digit_sum(n):
    """Calculate the sum of digits of a number"""
    return sum(int(digit) for digit in str(n))


def find_difference(numbers=None):
    if not numbers:
        # Generate list of 1 million random integers
        numbers = [random.randint(1, 100000) for _ in range(1000000)]

    # Initialize variables for min and max numbers with digit sum 30
    min_num = float("inf")  # Initialize to positive infinity
    max_num = float("-inf")  # Initialize to negative infinity

    # Find numbers whose digits sum to 30
    for num in numbers:
        if digit_sum(num) == 30:
            min_num = min(min_num, num)
            max_num = max(max_num, num)

    # Check if we found any numbers with digit sum 30
    if min_num == float("inf") or max_num == float("-inf"):
        return "No numbers found with digit sum of 30"

    return max_num - min_num


def casual_implementation_1(numbers=None):
    return find_difference(numbers)

In [5]:
%%time

casual_implementation_1(test_array.tolist())

CPU times: user 77 μs, sys: 0 ns, total: 77 μs
Wall time: 78.7 μs


62910

Benchmark the code: due to slowness, only do 10 trials for this implementation.


In [6]:
%%timeit -n 1 -r 10

casual_implementation_1()

657 ms ± 30.7 ms per loop (mean ± std. dev. of 10 runs, 1 loop each)


## Implementation #2

Implementation edited slightly to take in an optional input array parameter.

For a fair benchmark, the `DigitSumFinder` is pre-instantiated.


In [7]:
import random
from array import array
from typing import Tuple, Optional
import time


class DigitSumFinder:
    def __init__(
        self,
        target_sum: int = 30,
        range_start: int = 1,
        range_end: int = 100_000,
        count: int = 1_000_000,
    ):
        self.target_sum = target_sum
        self.range_start = range_start
        self.range_end = range_end
        self.count = count

        # Pre-calculate digit sums for all possible numbers
        self.digit_sums = self._precompute_digit_sums()

    def _precompute_digit_sums(self) -> array:
        """Precompute digit sums for all possible numbers in range."""
        digit_sums = array("B", [0] * (self.range_end + 1))
        for num in range(self.range_start, self.range_end + 1):
            total = 0
            n = num
            while n:
                total += n % 10
                n //= 10
            digit_sums[num] = total
        return digit_sums

    def find_difference(
        self, input_numbers=None
    ) -> Tuple[int, Optional[int], Optional[int]]:
        """
        Find the difference between max and min numbers with target digit sum.
        Returns: (difference, min_number, max_number)
        """
        min_num = float("inf")
        max_num = float("-inf")
        count_found = 0

        if input_numbers:
            for num in input_numbers:
                if self.digit_sums[num] == self.target_sum:
                    count_found += 1
                    if num < min_num:
                        min_num = num
                    if num > max_num:
                        max_num = num

        else:
            # Generate and process random numbers
            for _ in range(self.count):
                num = random.randint(self.range_start, self.range_end)
                if self.digit_sums[num] == self.target_sum:
                    count_found += 1
                    if num < min_num:
                        min_num = num
                    if num > max_num:
                        max_num = num

        if count_found == 0:
            return 0, None, None

        return max_num - min_num, min_num, max_num


def casual_implementation_2(dsf, numbers=None):
    return dsf.find_difference(numbers)


dsf = DigitSumFinder()

In [8]:
%%time

casual_implementation_2(dsf, test_array.tolist())

CPU times: user 23 μs, sys: 0 ns, total: 23 μs
Wall time: 25 μs


(62910, 23898, 86808)

In [9]:
%%timeit -n 1 -r 10

casual_implementation_2(dsf)

244 ms ± 6.9 ms per loop (mean ± std. dev. of 10 runs, 1 loop each)


## Implementation #3

Test skipped since this implementation is convoluted.

Because the script uses `ProcessPoolExecutor`, the code [cannot be included in a notebook](https://stackoverflow.com/questions/15900366/all-example-concurrent-futures-code-is-failing-with-brokenprocesspool) and must be benchmarked remotely. Therefore, the script logic is included in `scripts/casual_implementation_3.py` with edits.

The average performance is **130 ms**.


## Implementation #4

Test skipped since this implementation is convoluted.

Because the script uses `ProcessPoolExecutor`, the code [cannot be included in a notebook](https://stackoverflow.com/questions/15900366/all-example-concurrent-futures-code-is-failing-with-brokenprocesspool) and must be benchmarked remotely. Therefore, the script logic is included in `scripts/casual_implementation_4.py` with edits.

The average performance is **160 ms**.


## Implementation #5

Removes some needless/unused Promethous logging code.


In [20]:
from __future__ import annotations

import asyncio
import logging
import multiprocessing as mp
import os
import signal
import sys
from concurrent.futures import ProcessPoolExecutor
from dataclasses import dataclass, field
from functools import partial, wraps
from pathlib import Path
from typing import (
    Any,
    AsyncIterator,
    Callable,
    Final,
    Optional,
    Protocol,
    TypeAlias,
    TypeVar,
    cast,
)

import numpy as np
import numpy.typing as npt
import orjson
import psutil
from numba import jit, prange
from rich.console import Console
from rich.progress import Progress, TaskID
from rich.table import Table

# Type definitions
T = TypeVar("T")
Number: TypeAlias = int | float
ArrayInt: TypeAlias = npt.NDArray[np.int_]
ArrayBool: TypeAlias = npt.NDArray[np.bool_]

# Constants
METRICS_PORT: Final[int] = 8000
DEFAULT_CHUNK_SIZE: Final[int] = 100_000
MAX_MEMORY_USAGE: Final[float] = 0.75  # 75% of available memory


@dataclass(frozen=True, slots=True)
class Config:
    """Application configuration."""

    min_value: int
    max_value: int
    sample_size: int
    target_sum: int
    chunk_size: int = field(default=DEFAULT_CHUNK_SIZE)
    num_processes: int = field(default=mp.cpu_count())
    log_level: str = field(default="INFO")
    metrics_enabled: bool = field(default=True)

    def __post_init__(self) -> None:
        self._validate()

    def _validate(self) -> None:
        """Validate configuration parameters."""
        if self.min_value >= self.max_value:
            raise ValueError("min_value must be less than max_value")
        if self.sample_size <= 0:
            raise ValueError("sample_size must be positive")
        if self.chunk_size <= 0:
            raise ValueError("chunk_size must be positive")
        if self.num_processes <= 0:
            raise ValueError("num_processes must be positive")


@dataclass(frozen=True, slots=True)
class Result:
    """Analysis result."""

    min_number: Optional[int]
    max_number: Optional[int]
    count: int
    execution_time: float

    @property
    def difference(self) -> Optional[int]:
        if self.min_number is None or self.max_number is None:
            return None
        return self.max_number - self.min_number

    def to_json(self) -> str:
        return orjson.dumps(self.__dict__).decode("utf-8")


class MemoryManager:
    """Memory management utilities."""

    @staticmethod
    def get_memory_usage() -> float:
        """Get current memory usage."""
        return psutil.Process().memory_info().rss

    @staticmethod
    def check_memory_availability(required_bytes: int) -> bool:
        """Check if enough memory is available."""
        available = psutil.virtual_memory().available
        return required_bytes <= available * MAX_MEMORY_USAGE

    @staticmethod
    def optimal_chunk_size(total_size: int, dtype_size: int) -> int:
        """Calculate optimal chunk size based on available memory."""
        available = psutil.virtual_memory().available * MAX_MEMORY_USAGE
        return min(total_size, int(available / dtype_size))


class SignalHandler:
    """Handle system signals gracefully."""

    def __init__(self) -> None:
        self.shutdown_flag = False
        signal.signal(signal.SIGINT, self._handle_shutdown)
        signal.signal(signal.SIGTERM, self._handle_shutdown)

    def _handle_shutdown(self, signum: int, frame: Any) -> None:
        self.shutdown_flag = True
        logging.warning("Shutdown signal received. Cleaning up...")


@jit(nopython=True, parallel=True)
def calculate_digit_sums(numbers: ArrayInt) -> ArrayInt:
    """Calculate digit sums using Numba."""
    result = np.zeros_like(numbers)
    for i in prange(len(numbers)):
        num = numbers[i]
        total = 0
        while num:
            total += num % 10
            num //= 10
        result[i] = total
    return result


class NumberAnalyzer:
    """Main number analysis class."""

    def __init__(self, config: Config):
        self.config = config
        # self.logger = self._setup_logger()
        self.console = Console()
        # self.signal_handler = SignalHandler()
        # self._setup_metrics() if config.metrics_enabled else None

    async def _process_chunk(
        self,
        chunk_size: int,
    ) -> tuple[Optional[int], Optional[int], int]:
        """Process a chunk of numbers asynchronously."""
        numbers = np.random.randint(
            self.config.min_value, self.config.max_value + 1, chunk_size, dtype=np.int32
        )

        digit_sums = calculate_digit_sums(numbers)
        mask = digit_sums == self.config.target_sum
        matching_numbers = numbers[mask]

        if len(matching_numbers) == 0:
            return None, None, 0

        # progress.update(task_id, advance=chunk_size)

        return matching_numbers.min(), matching_numbers.max(), len(matching_numbers)

    async def _process_chunks(self) -> Result:
        """Process all chunks of numbers."""
        start_time = asyncio.get_event_loop().time()
        chunk_size = MemoryManager.optimal_chunk_size(
            self.config.sample_size, np.dtype(np.int32).itemsize
        )

        min_number = float("inf")
        max_number = float("-inf")
        total_count = 0

        chunks = [
            self._process_chunk(chunk_size)
            for _ in range(0, self.config.sample_size, chunk_size)
        ]

        results = await asyncio.gather(*chunks)

        for chunk_min, chunk_max, count in results:
            if chunk_min is not None:
                min_number = min(min_number, chunk_min)
                max_number = max(max_number, chunk_max)
                total_count += count

            # if self.signal_handler.shutdown_flag:
            #     break

        execution_time = asyncio.get_event_loop().time() - start_time

        if total_count == 0:
            return Result(None, None, 0, execution_time)

        return Result(min_number, max_number, total_count, execution_time)

    async def analyze(self) -> Result:
        """Perform the number analysis."""
        try:
            # self.logger.info("starting_analysis", config=self.config.__dict__)
            result = await self._process_chunks()
            # self.logger.info("analysis_complete", result=result.__dict__)
            return result
        except Exception as e:
            # self.logger.error("analysis_failed", error=str(e))
            raise

    def display_results(self, result: Result) -> None:
        """Display results in a rich table format."""
        table = Table(title="Analysis Results")
        table.add_column("Metric", style="cyan")
        table.add_column("Value", style="magenta")

        if result.min_number is not None:
            table.add_row("Minimum Number", f"{result.min_number:,}")
            table.add_row("Maximum Number", f"{result.max_number:,}")
            table.add_row("Difference", f"{result.difference:,}")
        else:
            table.add_row("Result", "No numbers found")

        table.add_row("Numbers Found", f"{result.count:,}")
        table.add_row("Execution Time", f"{result.execution_time*1000:.4f}ms")

        self.console.print(table)


async def main() -> None:
    """Main execution function."""
    try:
        config = Config(
            min_value=1, max_value=100_000, sample_size=1_000_000, target_sum=30
        )

        analyzer = NumberAnalyzer(config)
        result = await analyzer.analyze()
        analyzer.display_results(result)

        # Save results
        output_path = Path("results.json")
        output_path.write_text(result.to_json())

    except Exception as e:
        logging.error(f"Fatal error: {e}", exc_info=True)
        sys.exit(1)


config = Config(min_value=1, max_value=100_000, sample_size=1_000_000, target_sum=30)
na = NumberAnalyzer(config)

In [29]:
result = await na.analyze()
na.display_results(result)

Calculate the execution times in a loop, then average.


In [47]:
results = []
for _ in range(1000):
    int_result = await na.analyze()
    results.append(int_result.execution_time)

print(np.array(results).mean() * 1000)
na.display_results(int_result)

6.59027477609925


# Prompt Engineered


## Implementation #1


In [48]:
from numba import jit


@jit(nopython=True)
def pe_1_digit_sum(n):
    total = 0
    while n:
        total += n % 10
        n //= 10
    return total


@jit(nopython=True)
def pe_1_find_difference(numbers):
    min_num = float("inf")
    max_num = float("-inf")

    for num in numbers:
        sum_digits = pe_1_digit_sum(num)
        if sum_digits == 30:
            min_num = min(min_num, num)
            max_num = max(max_num, num)

    return max_num - min_num if max_num != float("-inf") else 0

In [49]:
pe_1_find_difference(test_array)

62910.0

In [50]:
%%timeit -n 1 -r 100

input_numbers = np.random.randint(1, 100_000, 1_000_000, dtype=np.int32)
pe_1_find_difference(input_numbers)

11.2 ms ± 456 μs per loop (mean ± std. dev. of 100 runs, 1 loop each)


## Implementation #2

The use of bit-shifting here leads to incorrect results, but benchmark performed for posterity.

Because the script uses `multiprocessing`, the code [cannot be included in a notebook](https://stackoverflow.com/questions/15900366/all-example-concurrent-futures-code-is-failing-with-brokenprocesspool) and must be benchmarked remotely. Therefore, the script logic is included in `scripts/prompt_engineered_2.py` with edits.

Average runtime is **72ms**.


## Implementation #3

The use of bit-shifting here leads to incorrect results, but benchmark performed for posterity.

`prange` does not support both `parallel` and a `step_size > 1`, so `parallel = False`.


In [84]:
import numpy as np
from numba import jit, uint32, uint8, prange
import os

# Pre-computed lookup table using bit manipulation
LOOKUP = np.zeros(100001, dtype=np.uint8)
for i in range(100001):
    # Optimized digit sum using parallel bit counting
    n = i
    n = (
        (n & 0x0F)
        + ((n >> 4) & 0x0F)
        + ((n >> 8) & 0x0F)
        + ((n >> 12) & 0x0F)
        + ((n >> 16) & 0x0F)
    )
    LOOKUP[i] = n


@jit(nopython=True, parallel=False, cache=True, fastmath=True)
def pe_3_find_min_max(numbers):
    # Process 32 numbers at once using SIMD
    min_val = np.iinfo(np.uint32).max
    max_val = 0

    # Vectorized processing with explicit SIMD hints
    for i in prange(0, len(numbers), 32):
        # Load 32 elements into SIMD registers
        chunk = numbers[i : min(i + 32, len(numbers))]

        # Vectorized lookup and comparison
        sums = LOOKUP[chunk]
        mask = sums == 30

        if np.any(mask):
            valid_nums = chunk[mask]
            min_val = min(min_val, np.min(valid_nums))
            max_val = max(max_val, np.max(valid_nums))

    return min_val, max_val


def pe_3():
    # Optimize CPU affinity and thread count
    # physical_cores = len(os.sched_getaffinity(0))
    os.environ["NUMBA_NUM_THREADS"] = str(os.cpu_count())

    # Generate aligned array for SIMD
    numbers = np.random.randint(1, 100001, size=1_000_000, dtype=np.uint32)
    numbers = np.ascontiguousarray(numbers)

    # Single pass computation
    min_val, max_val = pe_3_find_min_max(numbers)

    return max_val - min_val if max_val > 0 else 0

In [85]:
find_min_max(test_array)

(11535, 99300)

In [87]:
%%timeit -n 1 -r 1000

pe_3()

10.1 ms ± 648 μs per loop (mean ± std. dev. of 1000 runs, 1 loop each)


## Implementation #4

The use of bit-shifting here leads to incorrect results, but benchmark performed for posterity.


In [90]:
import time

import numpy as np
from numba import jit


# Generate hash table at module load time using bit manipulation
@jit(nopython=True, cache=True)
def init_hash_table():
    HASH_TABLE = np.zeros(100001, dtype=np.uint8)
    min_val = np.iinfo(np.uint32).max
    max_val = 0

    # Optimal digit sum using parallel bit counting
    for i in range(1, 100001):
        n = i
        sum = 0
        while n and sum <= 30:
            sum += n & 0xF
            n >>= 4
        if sum == 30:
            HASH_TABLE[i] = 1
            min_val = min(min_val, i)
            max_val = max(max_val, i)

    return min_val, max_val, HASH_TABLE


@jit(nopython=True, parallel=False, cache=True, fastmath=True)
def pe_4_find_min_max(numbers):
    MIN_VALID, MAX_VALID, HASH_TABLE = init_hash_table()
    min_val = MAX_VALID  # Start with known bounds
    max_val = MIN_VALID
    found = False

    # Single vectorized operation
    mask = HASH_TABLE[numbers] == 1
    if np.any(mask):
        valid_nums = numbers[mask]
        min_val = np.min(valid_nums)
        max_val = np.max(valid_nums)
        found = True

    return min_val, max_val, found


def pe_4():
    # Generate numbers with optimal memory layout
    numbers = np.random.randint(1, 100001, size=1_000_000, dtype=np.uint32)

    # Single vectorized lookup
    min_val, max_val, found = pe_4_find_min_max(numbers)

    return max_val - min_val if found else 0

In [91]:
pe_4()

99735

In [92]:
%%timeit -n 1 -r 1000

pe_4()

6.59 ms ± 347 μs per loop (mean ± std. dev. of 1000 runs, 1 loop each)


## Implementation #5


In [93]:
import numpy as np
from numba import jit, uint32
import os

# Pre-computed perfect minimal hash table for numbers with decimal digit sum 30


# Generate hash table at module load time with correct decimal digit sum
@jit(nopython=True, cache=True)
def pe_5_init_hash_table():
    HASH_TABLE = np.zeros(100001, dtype=np.uint8)
    min_val = np.iinfo(np.uint32).max
    max_val = 0

    # Correct decimal digit sum calculation
    for i in range(1, 100001):
        n = i
        sum = 0
        while n:
            sum += n % 10  # Correct decimal digit extraction
            n //= 10
            if sum > 30:  # Early termination optimization
                break
        if sum == 30:
            HASH_TABLE[i] = 1
            min_val = min(min_val, i)
            max_val = max(max_val, i)

    return min_val, max_val, HASH_TABLE


# Initialize at module load time


@jit(nopython=True, parallel=False, cache=True, fastmath=True)
def pe_5_find_min_max(numbers):
    MIN_VALID, MAX_VALID, HASH_TABLE = pe_5_init_hash_table()
    min_val = MAX_VALID
    max_val = MIN_VALID
    found = False

    # Single vectorized operation
    mask = HASH_TABLE[numbers] == 1
    if np.any(mask):
        valid_nums = numbers[mask]
        min_val = np.min(valid_nums)
        max_val = np.max(valid_nums)
        found = True

    return min_val, max_val, found


def pe_5():
    numbers = np.random.randint(1, 100001, size=1_000_000, dtype=np.uint32)

    min_val, max_val, found = pe_5_find_min_max(numbers)

    return max_val - min_val if found else 0

In [95]:
find_results = pe_5_find_min_max(test_array)
print(find_results)
print(find_results[1] - find_results[0])

(23898, 86808, True)
62910


Results appear to be more unstable so need lots more trials.


In [101]:
%%timeit -n 1 -r 2000

pe_5()

6.89 ms ± 505 μs per loop (mean ± std. dev. of 2000 runs, 1 loop each)
