# Task 2. Performance Comparison of `HyperLogLog` with Exact Unique Element Counting

Create a script to compare exact unique element counting with counting using `HyperLogLog`.

In [1]:
"""Module for comparing exact unique counting vs. HyperLogLog on IP logs.

This notebook provides:
  - Loading IPs from a log file.
  - Exact unique counting with a set using parallel processing.
  - Approximate unique counting with HyperLogLog.
  - A performance comparison including execution time, memory usage, and error.

Follows Google Python Style Guide:
  https://google.github.io/styleguide/pyguide.html
"""

# Standard library imports
from __future__ import annotations

import math
import multiprocessing as mp
import re
import sys
import time
import tempfile
import os
from concurrent.futures import ProcessPoolExecutor
from pathlib import Path
from typing import Iterator, List, Optional, Set
import random
import ipaddress
from datetime import datetime, timedelta
from pathlib import Path

# Third-party imports
import mmh3
import pandas as pd

IP_PATTERN = re.compile(r"(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})")

## Implementation

In [2]:
class HyperLogLog:
    """A HyperLogLog implementation for cardinality estimation.

    This class provides a memory-efficient way to estimate the number of unique
    elements in a large dataset.
    """

    def __init__(self, p: int = 14):
        """Initializes the HyperLogLog structure.

        Args:
            p: The precision parameter, which defines the number of registers as m = 2^p.
               A higher 'p' increases accuracy at the cost of memory.
               Valid range is 4 <= p <= 16.
        """
        if not 4 <= p <= 16:
            raise ValueError("Precision 'p' must be between 4 and 16.")
        self.p: int = p
        self.m: int = 1 << p  # Number of registers, e.g., 2^14 = 16384
        self.registers: List[int] = [0] * self.m
        self.alpha: float = self._get_alpha()

    def _get_alpha(self) -> float:
        """Calculates the alpha constant for bias correction."""
        if self.m == 16: # p=4
            return 0.673
        if self.m == 32: # p=5
            return 0.697
        if self.m == 64: # p=6
            return 0.709
        # General formula for other values of m
        return 0.7213 / (1 + 1.079 / self.m)

    def add(self, item: str) -> None:
        """Adds an item to the HyperLogLog counter.

        Args:
            item: The item to add to the set.
        """
        hashed_item: int = mmh3.hash(str(item), signed=False)
        register_index: int = hashed_item & (self.m - 1)
        bits_for_rho: int = hashed_item >> self.p

        if bits_for_rho == 0:
            rho: int = 32 - self.p + 1
        else:
            rho: int = (32 - self.p) - bits_for_rho.bit_length() + 1

        self.registers[register_index] = max(self.registers[register_index], rho)

    def count(self) -> float:
        """Estimates the cardinality of the set.
        
        Returns:
            Estimated number of unique elements.
        """
        Z: float = 1.0 / sum(2.0 ** -register for register in self.registers)
        raw_estimate: float = self.alpha * (self.m**2) * Z

        if raw_estimate <= 2.5 * self.m:
            zero_registers: int = self.registers.count(0)
            if zero_registers > 0:
                return self.m * math.log(self.m / zero_registers)

        return raw_estimate
    def merge(self, other: 'HyperLogLog') -> None:
        """Merges another HyperLogLog instance into this one.

        Args:
            other: Another HyperLogLog instance to merge.
        
        Raises:
            ValueError: If the 'p' parameters of the two instances do not match.
        """
        if self.p != other.p:
            raise ValueError("To merge HyperLogLog instances, the 'p' parameters must be the same.")

        for i in range(self.m):
            self.registers[i] = max(self.registers[i], other.registers[i])

In [3]:
def process_chunk_hll(chunk_data: tuple[Path, int, int]) -> HyperLogLog:
    """Processes a chunk of the file and returns a HyperLogLog instance for that chunk.
    
    Args:
        chunk_data: A tuple containing the file path, start byte, and end byte.
    
    Returns:
        A HyperLogLog instance with the processed data from the chunk.
    """
    file_path, start, end = chunk_data
    hll: HyperLogLog = HyperLogLog(p=14)
    try:
        with open(file_path, "r", encoding="utf-8") as f:
            f.seek(start)
            if start != 0:
                f.readline()

            while f.tell() < end:
                line: str = f.readline()
                if not line:
                    break
                match: Optional[re.Match] = IP_PATTERN.search(line)
                if match:
                    hll.add(match.group(1))
    except Exception as e:
        print(f"Error processing file chunk for HLL: {e}")
    return hll


def get_hll_count_parallel(file_path: Path, num_workers: int) -> float:
    """Estimates cardinality using HLL in parallel.
    
    Args:
        file_path: Path to the log file.
        num_workers: Number of parallel workers to use.
    
    Returns:
        Estimated number of unique elements using HyperLogLog.
    """
    file_size: int = file_path.stat().st_size
    chunk_size: int = file_size // num_workers

    chunks: list[tuple[Path, int, int]] = []
    for i in range(num_workers):
        start: int = i * chunk_size
        end: int = (i + 1) * chunk_size if i < num_workers - 1 else file_size
        chunks.append((file_path, start, end))

    final_hll: HyperLogLog = HyperLogLog(p=14)
    with ProcessPoolExecutor(max_workers=num_workers) as executor:
        results = executor.map(process_chunk_hll, chunks)

        for hll_chunk in results:
            final_hll.merge(hll_chunk)

    return final_hll.count()

In [4]:
def get_ips_from_log_stream(file_path: Path) -> Iterator[str]:
    """Extracts IP addresses from a log file by searching each line.
    
    Args:
        file_path: Path to the log file.
    
    Yields:
        IP addresses found in the log file.
    """
    try:
        with open(file_path, "r", encoding="utf-8") as f:
            for line in f:
                match: Optional[re.Match] = IP_PATTERN.search(line)
                if match:
                    yield match.group(1)
    except FileNotFoundError:
        return

def process_chunk(chunk_data: tuple[Path, int, int]) -> set[str]:
    """Processes a chunk of a log file to extract unique IP addresses.
    
    Args:
        chunk_data: A tuple containing (file_path, start_byte, end_byte).
    
    Returns:
        A set of unique IP addresses found in the chunk.
    """
    file_path, start, end = chunk_data
    unique_ips: Set[str] = set()
    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            f.seek(start)
            if start != 0:
                f.readline()

            while f.tell() < end:
                line: str = f.readline()
                if not line:
                    break
                match: Optional[re.Match] = IP_PATTERN.search(line)
                if match:
                    unique_ips.add(match.group(1))
    except Exception as e:
        print(f"Error processing chunk: {e}")
    return unique_ips

def get_unique_ips_parallel(file_path: Path, num_workers: int) -> set[str]:
    """Extracts unique IPs from a log file using parallel processing.
    
    Args:
        file_path: Path to the log file.
        num_workers: Number of parallel workers to use.
    
    Returns:
        A set of unique IP addresses found in the log file.
    """
    file_size: int = file_path.stat().st_size
    chunk_size: int = file_size // num_workers
    
    chunks = []
    for i in range(num_workers):
        start: int = i * chunk_size
        end: int = (i + 1) * chunk_size if i < num_workers - 1 else file_size
        chunks.append((file_path, start, end))

    with ProcessPoolExecutor(max_workers=num_workers) as executor:
        results: Iterator[set[str]] = executor.map(process_chunk, chunks)

        total_unique_ips: Set[str] = set()
        for ip_set in results:
            total_unique_ips.update(ip_set)

    return total_unique_ips

### Additional generating large log file

In [5]:
def generate_random_ip() -> str:
    """Generates a random IPv4 address."""
    return str(ipaddress.IPv4Address(random.randint(0, 2**32 - 1)))


def generate_log_entry(ip_address: str) -> str:
    """Creates a single log line."""
    timestamp: str = (datetime.now() - timedelta(seconds=random.randint(0, 86400))).strftime(
        "%d/%b/%Y:%H:%M:%S %z"
    )
    methods: List[str] = ["GET", "POST", "PUT", "DELETE"]
    paths: List[str] = [
        "/index.html",
        "/api/data",
        "/assets/style.css",
        "/images/logo.png",
        "/admin/login",
    ]
    statuses: List[int] = [200, 201, 404, 500, 302]
    user_agents: List[str] = [
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
        "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.1.1 Safari/605.1.15",
        "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:89.0) Gecko/20100101 Firefox/89.0",
    ]

    return f'{ip_address} - - [{timestamp}] "{random.choice(methods)} {random.choice(paths)} HTTP/1.1" {random.choice(statuses)} {random.randint(100, 5000)} "-" "{random.choice(user_agents)}"'


def generate_large_log_file(file_path: Path, num_unique_ips: int = 100_000, num_log_entries: int = 1_000_000) -> None:
    """Generates a large log file with random data.

    Args:
        file_path: The path to the file for saving logs.
        num_unique_ips: The number of unique IP addresses.
        num_log_entries: The total number of log entries.
    """
    print(f"Generating {num_unique_ips} unique IP addresses...")
    unique_ips: List[str] = [generate_random_ip() for _ in range(num_unique_ips)]

    print(f"Generating {num_log_entries} log entries into file '{file_path}'...")
    with open(file_path, "w") as f:
        for i in range(num_log_entries):
            if random.random() < 0.8:
                ip: str = random.choice(unique_ips)
            else:
                ip: str = generate_random_ip()

            log_entry: str = generate_log_entry(ip)
            f.write(log_entry + "\n")
            if (i + 1) % (num_log_entries // 10) == 0:
                print(f"  ...generated {i + 1} entries")

    print("Log file generation complete.")


def test_large_dataset(num_unique_ips: int = 150_000, num_log_entries: int = 2_000_000):
    """Runs testing on a large dataset using a temporary file.

    Args:
        num_unique_ips: Number of unique IP addresses to generate.
        num_log_entries: Total number of log entries to generate.
    """

    temp_dir: str = tempfile.gettempdir()
    log_file_path: Path = Path(temp_dir) / "large_access_temp.log"

    try:
        generate_large_log_file(log_file_path, num_unique_ips, num_log_entries)

        print("\nStarting parallel processing on the large log file...")

        num_workers: int = mp.cpu_count()

        # Paralel precise counting
        start_time_exact: float = time.time()
        unique_ips_set: set[str] = get_unique_ips_parallel(log_file_path, num_workers)
        exact_count: int = len(unique_ips_set)
        exact_time: float = time.time() - start_time_exact

        # Paralel HLL counting
        start_time_hll: float = time.time()
        hll_count: float = get_hll_count_parallel(log_file_path, num_workers)
        hll_time: float = time.time() - start_time_hll

        if exact_count > 0:
            error: float = abs(exact_count - hll_count) / exact_count * 100
            exact_mem: int = sys.getsizeof(unique_ips_set)
            # memory used by HLL registers remains fixed
            hll_mem: int = sys.getsizeof(HyperLogLog(p=14).registers)

            results: dict[str, list[str | float | int]] = {
                "Method": ["Exact Count (Parallel)", "HyperLogLog (Parallel)"],
                "Unique Elements": [f"{exact_count}", f"{int(hll_count)}"],
                "Execution Time (s)": [f"{exact_time:.4f}", f"{hll_time:.4f}"],
                "Memory Usage (bytes)": [f"{exact_mem}", f"{hll_mem}"],
                "Error": ["0.00%", f"{error:.2f}%"],
            }

            df: pd.DataFrame = pd.DataFrame(results)
            print("\nComparison Results:")
            display(df)
        else:
            print(
                "\nCould not find any IP addresses. Please verify the log file format and the IP_PATTERN regex."
            )

    finally:
        if os.path.exists(log_file_path):
            os.remove(log_file_path)
            print(f"\nTemporary file '{log_file_path}' has been deleted.")

## Results

In [6]:
def main():
    LOG_FILE_PATH = Path("../data/lms-stage-access.log")

    num_workers: int = mp.cpu_count()
    start_time_exact: float = time.time()
    unique_ips_set: set[str] = get_unique_ips_parallel(LOG_FILE_PATH, num_workers)
    exact_count: int = len(unique_ips_set)
    exact_time: float = time.time() - start_time_exact

    hll = HyperLogLog(p=14)
    start_time_hll: float = time.time()
    ip_stream: Iterator[str] = get_ips_from_log_stream(LOG_FILE_PATH)
    for ip in ip_stream:
        hll.add(ip)
    hll_count: float = hll.count()
    hll_time: float = time.time() - start_time_hll

    if exact_count > 0:
        error: float = abs(exact_count - hll_count) / exact_count * 100
        exact_mem: int = sys.getsizeof(unique_ips_set)
        hll_mem: int = sys.getsizeof(hll.registers)

        results: dict[str, list[str | float | int]] = {
            "Method": ["Exact Count (Set)", "HyperLogLog"],
            "Unique Elements": [f"{exact_count}", f"{int(hll_count)}"],
            "Execution Time (s)": [f"{exact_time:.4f}", f"{hll_time:.4f}"],
            "Memory Usage (bytes)": [f"{exact_mem}", f"{hll_mem}"],
            "Error": ["0.00%", f"{error:.2f}%"],
        }

        df: pd.DataFrame = pd.DataFrame(results)
        print("\nComparison Results:")
        display(df)
    else:
        print("\nCould not find any IP addresses. Please verify the log file format and the IP_PATTERN regex.")

if __name__ == "__main__":
    # Main task
    main()
    # Additional test on the large dataset
    test_large_dataset(num_unique_ips=10_500_000, num_log_entries=200_000_000)


Comparison Results:


Unnamed: 0,Method,Unique Elements,Execution Time (s),Memory Usage (bytes),Error
0,Exact Count (Set),28,0.0786,1240,0.00%
1,HyperLogLog,28,0.0665,131128,0.09%


Generating 10500000 unique IP addresses...
Generating 200000000 log entries into file '/tmp/large_access_temp.log'...
  ...generated 20000000 entries
  ...generated 40000000 entries
  ...generated 60000000 entries
  ...generated 80000000 entries
  ...generated 100000000 entries
  ...generated 120000000 entries
  ...generated 140000000 entries
  ...generated 160000000 entries
  ...generated 180000000 entries
  ...generated 200000000 entries
Log file generation complete.

Starting parallel processing on the large log file...

Comparison Results:


Unnamed: 0,Method,Unique Elements,Execution Time (s),Memory Usage (bytes),Error
0,Exact Count (Parallel),50211427,86.6692,2147483864,0.00%
1,HyperLogLog (Parallel),50616234,76.3209,131128,0.81%



Temporary file '/tmp/large_access_temp.log' has been deleted.
