# Assignment 1 - Algorithms and Data Structures Analysis

## Overview

This notebook benchmarks Union–Find (four variants) and 3Sum (three variants) using `%timeit` with closure-based harnesses, plots time vs. input size on log–log scales, and applies power-law curve fitting to 3Sum (with an optional large-N fit for Two Pointers). 
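
Outside IPython, the closure-based harness idea can be sketched with the standard-library `timeit` module. This is a minimal illustration rather than the notebook's own helper: `make_harness` and the toy workload are hypothetical names chosen for the example.

```python
import timeit


def make_harness(func, data):
    """Return a zero-argument closure so that only func(data) is timed."""

    def run():
        return func(data)

    return run


# Toy workload (assumption for illustration): sum a list of integers.
harness = make_harness(sum, list(range(10_000)))

# Five repeats of 100 calls each; the minimum is the least noisy estimate.
timings = timeit.repeat(harness, repeat=5, number=100)
best_per_call = min(timings) / 100
print(f"best: {best_per_call:.2e} s per call")
```

Because the data is captured when the closure is built, data generation stays outside the timed region, which is the same reason the notebook's helpers return closures.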

## Summary

This notebook tests and visualizes the performance of:

**UnionFind Algorithms** (4 implementations):
- **Quick Find**: O(1) find/connected, O(N) union - simple, but N unions cost O(N²)
- **Quick Union**: O(N) worst-case find and union (both walk to the root; the link itself is O(1)) - trees can grow tall
- **Weighted Quick Union**: O(log N) find and union - links the smaller tree under the larger to bound tree height
- **Weighted Quick Union with Path Compression**: O(α(N)) amortized per operation - effectively constant in practice
- Tested with input sizes from 1K to 100K elements, with 0.9·N operations per size
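
The last variant in the list above can be sketched as follows. This is a minimal illustration assuming union by size and two-pass path compression; the class name `WeightedQuickUnionPC` is hypothetical, and the `unionfind` module imported by this notebook may be implemented differently.

```python
class WeightedQuickUnionPC:
    """Illustrative union-find with union by size and path compression."""

    def __init__(self, n: int) -> None:
        self.parent = list(range(n))  # every element starts as its own root
        self.size = [1] * n  # tree sizes, meaningful at roots only

    def find(self, p: int) -> int:
        # First pass: walk up to the root.
        root = p
        while root != self.parent[root]:
            root = self.parent[root]
        # Second pass: point every node on the path directly at the root.
        while p != root:
            next_p = self.parent[p]
            self.parent[p] = root
            p = next_p
        return root

    def union(self, p: int, q: int) -> None:
        root_p, root_q = self.find(p), self.find(q)
        if root_p == root_q:
            return  # already in the same component
        # Link the smaller tree under the larger one.
        if self.size[root_p] < self.size[root_q]:
            root_p, root_q = root_q, root_p
        self.parent[root_q] = root_p
        self.size[root_p] += self.size[root_q]

    def connected(self, p: int, q: int) -> bool:
        return self.find(p) == self.find(q)


uf = WeightedQuickUnionPC(6)
uf.union(0, 1)
uf.union(2, 3)
uf.union(1, 3)
print(uf.connected(0, 2))  # True
print(uf.connected(0, 5))  # False
```

Dropping the second loop in `find` gives plain Weighted Quick Union, and additionally dropping the size comparison in `union` gives Quick Union.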

**3Sum Algorithms** (3 implementations):
- **Brute Force**: O(N³) - checks all possible triplets
- **Optimized Two Pointers**: O(N²) - sorts array and uses two-pointer technique
- **Hash Set**: O(N²) expected - uses a hash table for expected constant-time lookups
- Tested with array sizes 80-400 (Brute Force) and 500-8K (the two O(N²) variants)
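
The two-pointer technique can be sketched as below. This is a minimal illustration assuming the classic formulation (unique value triplets that sum to zero); the function name `three_sum_two_pointers` is hypothetical, and the `three_sum_optimized` function imported by this notebook may differ in signature and details.

```python
def three_sum_two_pointers(nums: list[int]) -> list[tuple[int, int, int]]:
    """Return unique value triplets that sum to zero (illustrative sketch)."""
    values = sorted(nums)  # O(N log N), dominated by the O(N^2) scan below
    n = len(values)
    triplets = []
    for i in range(n - 2):
        if i > 0 and values[i] == values[i - 1]:
            continue  # skip duplicate anchor values
        lo, hi = i + 1, n - 1
        while lo < hi:
            total = values[i] + values[lo] + values[hi]
            if total < 0:
                lo += 1  # sum too small: move the left pointer right
            elif total > 0:
                hi -= 1  # sum too large: move the right pointer left
            else:
                triplets.append((values[i], values[lo], values[hi]))
                lo += 1
                hi -= 1
                while lo < hi and values[lo] == values[lo - 1]:
                    lo += 1  # skip duplicate middle values
    return triplets


print(three_sum_two_pointers([-1, 0, 1, 2, -1, -4]))
# [(-1, -1, 2), (-1, 0, 1)]
```

For each anchor the two pointers move toward each other at most N times in total, which gives the O(N²) bound.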

**Analysis Methodology**:
- Performance plots comparing execution times across input sizes
- Log-log plots to check complexity: for time ≈ a·N^b, the slope of the line is the exponent b
- Timing via `%timeit`, recording the best and the average run
- For Union-Find, each timing covers 0.9·N operations, so the slope reflects total work (O(1) per operation shows up as a slope near 1)
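
The slope check can be sketched with a least-squares line through the log-transformed points. This is a minimal standard-library illustration on synthetic data; `loglog_slope` is a hypothetical helper, not part of the notebook, which instead uses an endpoint slope and `scipy.optimize.curve_fit`.

```python
import math


def loglog_slope(sizes, times):
    """Least-squares slope of log10(time) against log10(size)."""
    xs = [math.log10(n) for n in sizes]
    ys = [math.log10(t) for t in times]
    x_mean = sum(xs) / len(xs)
    y_mean = sum(ys) / len(ys)
    covariance = sum((x - x_mean) * (y - y_mean) for x, y in zip(xs, ys))
    variance = sum((x - x_mean) ** 2 for x in xs)
    return covariance / variance


# Synthetic timings (assumption for illustration): exactly quadratic growth.
sizes = [500, 1000, 2000, 4000, 8000]
times = [3e-8 * n**2 for n in sizes]
print(f"estimated exponent: {loglog_slope(sizes, times):.2f}")  # 2.00
```

Using all points rather than only the two endpoints makes the estimate less sensitive to a single noisy measurement.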

In [None]:
import sys

# Add src directory to path to import custom algorithm implementations
sys.path.append("../src")

import random

import matplotlib
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
from scipy.optimize import curve_fit

# Import custom algorithm implementations
from threesum import (
    generate_test_data,
    three_sum_brute_force,
    three_sum_optimized,
    three_sum_optimized_with_hash,
)
from unionfind import (
    QuickFind,
    QuickUnion,
    WeightedQuickUnion,
    WeightedQuickUnionPathCompression,
)

# Configure plotting style for pretty-looking graphs
plt.style.use("seaborn-v0_8")
sns.set_palette("husl")

# Seed both RNGs so the generated test data is reproducible across runs
random.seed(42)
np.random.seed(42)

print("Libraries imported successfully!")
print(f"Python version: {sys.version}")
print(f"NumPy version: {np.__version__}")
print(f"Pandas version: {pd.__version__}")
print(f"Matplotlib version: {matplotlib.__version__}")

In [None]:
# Helper functions for performance testing and data generation


def setup_unionfind_test(uf_class, n: int, operations: list[tuple[int, int]]):
    """
    Set up a UnionFind workload for %timeit testing.

    Returns a closure that builds a fresh UnionFind instance and applies
    all union operations. The instance is rebuilt on every call because
    %timeit invokes the closure many times; reusing one instance would
    leave every call after the first timing unions on already-merged
    components. The O(N) constructor cost is therefore part of each timing.
    """

    def run_operations():
        uf = uf_class(n)  # fresh structure per timed call
        for p, q in operations:
            uf.union(p, q)

    return run_operations


def setup_unionfind_connected_test(
    uf_class, n: int, union_ops: list[tuple[int, int]], connected_ops: list[int]
):
    """
    Set up a UnionFind instance with pre-existing unions for query timing.

    Builds the structure once, applies the union operations, then returns a
    closure that calls find() on each queried element. find() is the core of
    a connected(p, q) query, which compares two find() results.
    """
    uf = uf_class(n)

    # Pre-populate with union operations to create a realistic structure
    for p, q in union_ops:
        uf.union(p, q)

    def run_connected_operations():
        for element in connected_ops:
            uf.find(element)

    return run_connected_operations


def generate_unionfind_operations(n: int, num_operations: int) -> list[tuple[int, int]]:
    """
    Generate random union operations for testing.

    Creates a list of random (p, q) pairs where both p and q are
    valid indices in the range [0, n-1]. The number of operations
    is typically proportional to n to ensure meaningful performance
    comparisons across different input sizes.
    """
    operations = []
    for _ in range(num_operations):
        p = random.randint(0, n - 1)
        q = random.randint(0, n - 1)
        operations.append((p, q))
    return operations


def generate_connected_operations(n: int, num_operations: int) -> list[int]:
    """
    Generate random connected operations for testing.

    Creates a list of random element indices for connected operations.
    These represent realistic queries about which component an element belongs to.
    """
    operations = []
    for _ in range(num_operations):
        element = random.randint(0, n - 1)
        operations.append(element)
    return operations


def setup_threesum_test(func, nums: list[int]):
    """
    Setup 3Sum function for %timeit testing.

    Creates a closure that contains the test data and function
    to be tested, allowing %timeit to measure just the algorithm
    execution time without data generation overhead.

    Uses value-based deduplication to avoid inflated result counts.
    """

    def run_threesum():
        return func(nums, return_values=True)

    return run_threesum


print("Performance measurement functions defined!")

In [None]:
# =============================================================================
# UnionFind Performance Analysis - Union and Connected Operations
# =============================================================================
# This section tests the performance of four different UnionFind implementations
# across varying input sizes to verify their theoretical time complexities.
# It measures UNION and CONNECTED operations separately.

print("=== UnionFind Performance Analysis ===")

# Test parameters: Input sizes from 1K to 100K elements
# These sizes are chosen to show clear performance differences between algorithms
# and allow for meaningful slope analysis in log-log plots
n_values = [1000, 5000, 10000, 50000, 100000]

# UnionFind algorithms to test - ordered from least to most efficient
uf_algorithms = [
    ("Quick Find", QuickFind),  # O(1) find, O(N) union
    ("Quick Union", QuickUnion),  # O(N) worst-case find and union (tree height)
    ("Weighted Quick Union", WeightedQuickUnion),  # O(log N) connected, O(log N) union
    (
        "Weighted Quick Union with Path Compression",
        WeightedQuickUnionPathCompression,
    ),  # O(α(N)) amortized
]

# Store performance results for analysis and visualization
uf_union_results = []
uf_connected_results = []

# Test each algorithm with each input size
for n in n_values:
    # Scale operations proportionally to N
    # 0.9 ratio ensures sufficient operations while avoiding complete connectivity
    # This creates a realistic workload where most elements get connected
    num_operations = int(0.9 * n)
    print(f"\nTesting with N = {n}, Operations = {num_operations}")

    # Generate consistent operations for both union and connected tests
    union_operations = generate_unionfind_operations(n, num_operations)
    connected_operations = generate_connected_operations(n, num_operations)

    # Test each UnionFind implementation with the same set of operations
    for name, uf_class in uf_algorithms:
        # UNION operation testing
        union_test_func = setup_unionfind_test(uf_class, n, union_operations)
        union_result = %timeit -q -o union_test_func() # pyright: ignore

        # Store union results
        uf_union_results.append(
            {
                "Algorithm": name,
                "Operation": "Union",
                "N": n,
                "Operations": num_operations,
                "Time (s)": union_result.best,
                "Average (s)": union_result.average,
            }
        )

        # CONNECTED operation testing (with pre-established structure)
        connected_test_func = setup_unionfind_connected_test(
            uf_class, n, union_operations, connected_operations
        )
        connected_result = %timeit -q -o connected_test_func() # pyright: ignore

        # Store connected results
        uf_connected_results.append(
            {
                "Algorithm": name,
                "Operation": "Connected",
                "N": n,
                "Operations": num_operations,
                "Time (s)": connected_result.best,
                "Average (s)": connected_result.average,
            }
        )

        print(f"  {name}:")
        print(
            f"    Union:     {union_result.best:.6f}s (best), "
            f"{union_result.average:.6f}s (avg)"
        )
        print(
            f"    Connected: {connected_result.best:.6f}s (best), "
            f"{connected_result.average:.6f}s (avg)"
        )

# Combine results for easier analysis
uf_results = uf_union_results + uf_connected_results

print(f"\nUnionFind analysis completed! {len(uf_results)} measurements taken.")
print(f"Union measurements: {len(uf_union_results)}")
print(f"Connected measurements: {len(uf_connected_results)}")

In [None]:
# =============================================================================
# 3Sum Performance Analysis
# =============================================================================
# This section tests three different approaches to the 3Sum problem:
# 1. Brute Force: O(N³) - checks all possible triplets
# 2. Optimized Two Pointers: O(N²) - sorts array and uses two-pointer technique
# 3. Hash Set: O(N²) - uses hash table for constant-time lookups
#
# NOTE: All algorithms use value-based deduplication (return_values=True)
# to avoid inflated result counts from duplicate values in random test data.
# The measurements include the complete "find" operation including any setup
# like sorting, as this is part of the overall algorithmic solution.

print("=== 3Sum Performance Analysis ===")

# Test parameters for 3Sum algorithms
# Different array sizes are used for different algorithms based on their complexity:
# - Brute force: smaller sizes (80-400) due to O(N³) complexity
# - Optimized algorithms: larger sizes (500-8000) due to O(N²) complexity
array_sizes_brute = [80, 120, 200, 300, 400]  # Smaller sizes for O(N³) algorithm
array_sizes_optimized = [
    500,
    1000,
    2000,
    5000,
    8000,
]  # Larger sizes for O(N²) algorithms

# 3Sum algorithms to test - ordered by theoretical efficiency
threesum_algorithms = [
    ("Brute Force", three_sum_brute_force),  # O(N³) - checks all triplets
    ("Optimized Two Pointers", three_sum_optimized),  # O(N²) - two-pointer technique
    ("Hash Set", three_sum_optimized_with_hash),  # O(N²) - hash table approach
]

# Store performance results for analysis and visualization
threesum_results = []

# Test brute force algorithm with smaller array sizes
# Due to O(N³) complexity, we use smaller sizes to keep execution time reasonable
print("\n--- Testing Brute Force Find Operations with smaller sizes ---")
for size in array_sizes_brute:
    print(f"\nTesting Brute Force find triplets with array size = {size}")

    # Generate random test data for this array size
    test_data = generate_test_data(size)

    name, func = threesum_algorithms[0]  # Brute Force algorithm
    test_func = setup_threesum_test(func, test_data)

    # Run the algorithm once outside the timer to record the solution count
    sample_result = func(test_data, return_values=True)
    result = %timeit -q -o test_func()  # pyright: ignore

    # Store results including number of solutions found
    threesum_results.append(
        {
            "Algorithm": name,
            "Operation": "Find",
            "Array Size": size,
            "Time (s)": result.best,
            "Average (s)": result.average,
            "Solutions Found": len(sample_result),
        }
    )
    print(
        f"  {name}: {result.best:.6f}s (best), "
        f"{result.average:.6f}s (avg), {len(sample_result)} unique value triplets"
    )

# Test optimized algorithms with larger array sizes
print("\n--- Testing Optimized Find Operations with larger sizes ---")
for size in array_sizes_optimized:
    print(f"\nTesting find triplets with array size = {size}")

    # Generate random test data for this array size
    test_data = generate_test_data(size)

    # Test both optimized algorithms (skip brute force)
    for name, func in threesum_algorithms[1:]:  # Skip brute force
        test_func = setup_threesum_test(func, test_data)

        # Run the algorithm once outside the timer to record the solution count
        sample_result = func(test_data, return_values=True)
        result = %timeit -q -o test_func()  # pyright: ignore

        # Store results including number of solutions found
        threesum_results.append(
            {
                "Algorithm": name,
                "Operation": "Find",
                "Array Size": size,
                "Time (s)": result.best,
                "Average (s)": result.average,
                "Solutions Found": len(sample_result),
            }
        )
        print(
            f"  {name}: {result.best:.6f}s (best), "
            f"{result.average:.6f}s (avg), {len(sample_result)} unique value triplets"
        )

print(
    f"\n3Sum find triplets analysis completed! "
    f"{len(threesum_results)} measurements taken."
)

In [None]:
# =============================================================================
# Power-Law Curve Fitting - Simple Constants Extraction
# =============================================================================

# Convert results to DataFrames for easier analysis
threesum_df = pd.DataFrame(threesum_results)

def power_law(x, a, b):
    return a * (x**b)

def power_law_with_offset(x, a, b, c):
    """Power law with constant offset: c + a * x^b"""
    return c + a * (x**b)

print("=== Power-Law Constants: time ≈ a·N^b ===\n")

# Store fitting results for overlay plotting
fitted_params = {}

MIN_FITTING_POINTS = 2  # Minimum data points required for curve fitting
MIN_LARGE_POINTS = 3  # Minimum points for large-size fitting strategy

for algorithm in threesum_df["Algorithm"].unique():
    data = threesum_df[threesum_df["Algorithm"] == algorithm].sort_values("Array Size")
    print(f"\nFitting {algorithm}:")
    print(f"  Data points: {len(data)}")
    print(f"  Size range: {data['Array Size'].min()}-{data['Array Size'].max()}")
    print(f"  Time range: {data['Time (s)'].min():.6f}-{data['Time (s)'].max():.6f}")

    # Show actual data points to understand the trend
    for _, row in data.iterrows():
        print(f"    Size {row['Array Size']:4d}: {row['Time (s)']:.6f}s")

    if len(data) >= MIN_FITTING_POINTS:
        # Try simple power law first
        try:
            (a, b), _ = curve_fit(power_law, data["Array Size"], data["Time (s)"])
            simple_fit = (a, b)
            print(f"  Simple fit: a={a:.2e}, b={b:.2f}")
        except Exception as e:
            print(f"  Simple fitting failed: {e}")
            simple_fit = None

        # For Two Pointers, try fitting only larger sizes where scaling is clear
        if "Two Pointers" in algorithm and len(data) >= MIN_LARGE_POINTS:
            try:
                # Use only the largest 3 data points where true scaling emerges
                large_data = data.tail(MIN_LARGE_POINTS)
                print(
                    f"  Trying fit on large sizes only: "
                    f"{large_data['Array Size'].tolist()}"
                )
                (a_large, b_large), _ = curve_fit(
                    power_law, large_data["Array Size"], large_data["Time (s)"]
                )
                print(f"  Large-size fit: a={a_large:.2e}, b={b_large:.2f}")

                # Heuristic: prefer the large-size fit when its exponent is higher,
                # since small inputs tend to be dominated by fixed overhead
                if simple_fit is None or b_large > b:
                    fitted_params[algorithm] = (a_large, b_large, "large_only")
                    print("  → Using large-size fit (better scaling)")
                elif simple_fit:
                    fitted_params[algorithm] = (a, b, "simple")
                    print("  → Using simple fit")
            except Exception as e:
                print(f"  Large-size fitting failed: {e}")
                if simple_fit:
                    fitted_params[algorithm] = (a, b, "simple")
        elif simple_fit:
            fitted_params[algorithm] = (a, b, "simple")

print(f"\nSuccessfully fitted {len(fitted_params)} algorithms")

In [None]:
# =============================================================================
# Performance Visualization with Union and Connected Operations
# =============================================================================

print("=== Creating Performance Visualizations ===")

# Convert results to Pandas DataFrames for easier plotting and analysis
uf_df = pd.DataFrame(uf_results)
threesum_df = pd.DataFrame(threesum_results)

# Slope verification for complexity analysis
print("\n=== Slope Analysis for Complexity Verification ===")

# Analysis constants
MIN_DATA_POINTS = 2  # Minimum data points required for slope analysis
LARGE_SIZE_TAIL = 3  # Number of largest points to use for large-size fits

# Plot styling constants
FIGURE_SIZE = (14, 16)  # Figure size for better readability
LINE_WIDTH = 2  # Data line width
FIT_LINE_WIDTH = 2  # Fitted curve line width
FIT_ALPHA = 0.7  # Fitted curve transparency
MARKER_SIZE = 6  # Consistent marker size
GRID_ALPHA = 0.3  # Grid transparency
LEGEND_FONTSIZE = 9  # Legend font size
TITLE_FONTSIZE = 12  # Title font size
LABEL_FONTSIZE = 10  # Axis label font size
FIT_POINTS = 100  # Number of points for smooth fitted curves

# Common plotting parameters to avoid repetition
PLOT_PARAMS = {
    "markersize": MARKER_SIZE,
    "linewidth": LINE_WIDTH,
    "markeredgewidth": 0.5,
    "markeredgecolor": "white",
}

# UnionFind slope analysis for both operations
print("\nUnionFind Complexity Verification:")
for operation in ["Union", "Connected"]:
    print(f"\n{operation} Operations:")
    operation_data = uf_df[uf_df["Operation"] == operation]
    for algorithm in operation_data["Algorithm"].unique():
        data = operation_data[operation_data["Algorithm"] == algorithm].sort_values("N")
        if len(data) >= MIN_DATA_POINTS:
            log_n = np.log10(data["N"].values)
            log_time = np.log10(data["Time (s)"].values)
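            # Endpoint slope on log-log axes. Each timing covers 0.9*N operations,
            # so O(1) per operation shows up as a slope near 1.
            slope = (log_time[-1] - log_time[0]) / (log_n[-1] - log_n[0])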
            print(f"  {algorithm}: slope = {slope:.2f}")

# 3Sum slope analysis
print("\n3Sum Find Operations Complexity Verification:")
for algorithm in threesum_df["Algorithm"].unique():
    data = threesum_df[threesum_df["Algorithm"] == algorithm].sort_values("Array Size")
    if len(data) >= MIN_DATA_POINTS:
        log_size = np.log10(data["Array Size"].values)
        log_time = np.log10(data["Time (s)"].values)
        slope = (log_time[-1] - log_time[0]) / (log_size[-1] - log_size[0])
        print(f"  {algorithm}: slope = {slope:.2f}")

# Create figure
plt.figure(figsize=FIGURE_SIZE)

# Define consistent color palette and styles for each algorithm
colors = sns.color_palette("husl", n_colors=4)
uf_styles = {
    "Quick Find": {"color": colors[0], "marker": "o", "linestyle": "-"},
    "Quick Union": {"color": colors[1], "marker": "s", "linestyle": "--"},
    "Weighted Quick Union": {"color": colors[2], "marker": "^", "linestyle": "-."},
    "Weighted Quick Union with Path Compression": {
        "color": colors[3],
        "marker": "D",
        "linestyle": ":",
    },
}

threesum_colors = sns.color_palette("husl", n_colors=3)
threesum_styles = {
    "Brute Force": {"color": threesum_colors[0], "marker": "o", "linestyle": "-"},
    "Optimized Two Pointers": {
        "color": threesum_colors[1],
        "marker": "s",
        "linestyle": "--",
    },
    "Hash Set": {"color": threesum_colors[2], "marker": "^", "linestyle": "-."},
}


def plot_algorithm_data(data, x_col, y_col, styles, plot_func=plt.plot, **kwargs):
    """
    Helper function to plot algorithm data with consistent styling.

    Args:
        data: DataFrame with algorithm performance data
        x_col: Column name for x-axis data
        y_col: Column name for y-axis data
        styles: Dictionary mapping algorithm names to style dictionaries
        plot_func: Plotting function to use (plt.plot, plt.loglog, etc.)
        **kwargs: Additional parameters to pass to the plotting function
    """
    for algorithm in data["Algorithm"].unique():
        algo_data = data[data["Algorithm"] == algorithm]
        style = styles[algorithm]
        plot_func(
            algo_data[x_col],
            algo_data[y_col],
            label=algorithm,
            color=style["color"],
            marker=style["marker"],
            linestyle=style["linestyle"],
            **PLOT_PARAMS,
            **kwargs
        )


def setup_subplot(title, xlabel, ylabel):
    """Helper function to set up subplot with consistent formatting."""
    plt.title(title, fontsize=TITLE_FONTSIZE, fontweight="bold")
    plt.xlabel(xlabel, fontsize=LABEL_FONTSIZE)
    plt.ylabel(ylabel, fontsize=LABEL_FONTSIZE)
    plt.legend(fontsize=LEGEND_FONTSIZE)
    plt.grid(True, alpha=GRID_ALPHA)


def setup_loglog_subplot(title, xlabel, ylabel):
    """Helper function to set up log-log subplot with consistent formatting."""
    setup_subplot(title, xlabel, ylabel)
    plt.grid(True, alpha=GRID_ALPHA, which="both")


# Subplot 1: UnionFind Union Operations (Linear Scale)
plt.subplot(3, 2, 1)
union_data = uf_df[uf_df["Operation"] == "Union"]
plot_algorithm_data(union_data, "N", "Time (s)", uf_styles)
setup_subplot(
    "UnionFind Union Operations",
    "Number of Elements (N)",
    "Time (seconds)"
)

# Subplot 2: UnionFind Union Operations (Log-Log Scale)
plt.subplot(3, 2, 2)
plot_algorithm_data(union_data, "N", "Time (s)", uf_styles, plt.loglog)
setup_loglog_subplot(
    "UnionFind Union Operations (Log-Log)",
    "Number of Elements (N) - Log Scale",
    "Time (seconds) - Log Scale"
)

# Subplot 3: UnionFind Connected Operations (Linear Scale)
plt.subplot(3, 2, 3)
connected_data = uf_df[uf_df["Operation"] == "Connected"]
plot_algorithm_data(connected_data, "N", "Time (s)", uf_styles)
setup_subplot(
    "UnionFind Connected Operations",
    "Number of Elements (N)",
    "Time (seconds)"
)

# Subplot 4: UnionFind Connected Operations (Log-Log Scale)
plt.subplot(3, 2, 4)
plot_algorithm_data(connected_data, "N", "Time (s)", uf_styles, plt.loglog)
setup_loglog_subplot(
    "UnionFind Connected Operations (Log-Log)",
    "Number of Elements (N) - Log Scale",
    "Time (seconds) - Log Scale"
)

# Subplot 5: 3Sum Operations (Linear Scale)
plt.subplot(3, 2, 5)
plot_algorithm_data(threesum_df, "Array Size", "Time (s)", threesum_styles)

# Add curve-fit overlays for 3Sum algorithms
for algorithm, (a, b, fit_type) in fitted_params.items():
    data = threesum_df[threesum_df["Algorithm"] == algorithm].sort_values("Array Size")
    style = threesum_styles[algorithm]

    if fit_type == "large_only":
        # Use only the largest 3 points for the fit range
        x_range = data.tail(LARGE_SIZE_TAIL)["Array Size"]
        x_fit = np.linspace(x_range.min(), x_range.max(), FIT_POINTS)
    else:
        # Use full range
        x_fit = np.linspace(
            data["Array Size"].min(), data["Array Size"].max(), FIT_POINTS
        )

    y_fit = power_law(x_fit, a, b)
    plt.plot(
        x_fit,
        y_fit,
        linestyle=":",
        color=style["color"],
        alpha=FIT_ALPHA,
        linewidth=FIT_LINE_WIDTH,
        label=f"{algorithm} fit (b={b:.1f})",
    )

setup_subplot("3Sum Operations", "Array Size", "Time (seconds)")

# Subplot 6: 3Sum Operations (Log-Log Scale)
plt.subplot(3, 2, 6)
plot_algorithm_data(threesum_df, "Array Size", "Time (s)", threesum_styles, plt.loglog)

# Add curve-fit overlays for 3Sum algorithms (Log-Log scale)
for algorithm, (a, b, fit_type) in fitted_params.items():
    data = threesum_df[threesum_df["Algorithm"] == algorithm].sort_values("Array Size")
    style = threesum_styles[algorithm]

    if fit_type == "large_only":
        # Use only the largest 3 points for the fit range
        x_range = data.tail(LARGE_SIZE_TAIL)["Array Size"]
        x_fit = np.linspace(x_range.min(), x_range.max(), FIT_POINTS)
    else:
        # Use full range
        x_fit = np.linspace(
            data["Array Size"].min(), data["Array Size"].max(), FIT_POINTS
        )

    y_fit = power_law(x_fit, a, b)
    plt.loglog(
        x_fit,
        y_fit,
        linestyle=":",
        color=style["color"],
        alpha=FIT_ALPHA,
        linewidth=FIT_LINE_WIDTH,
        label=f"{algorithm} fit (b={b:.1f})",
    )

setup_loglog_subplot(
    "3Sum Operations (Log-Log)",
    "Array Size - Log Scale",
    "Time (seconds) - Log Scale"
)

plt.tight_layout(pad=2.0)
plt.show()

print("Performance visualizations with curve-fit overlays created!")