# Assorted helpers

In [None]:
#| default_exp helpers

In [None]:
#| export

from json import dumps
from web3 import Web3, constants
from eth_abi import encode
from typing import List, Tuple, Optional, Callable
from decimal import Decimal, getcontext
from datetime import datetime, timedelta
from dataclasses import dataclass
import networkx as nx
import math, time, asyncio, decimal, secrets, socket
from contextlib import contextmanager, asynccontextmanager
from fastcore.test import test_eq

In [None]:
#| export

def normalize_address(address: str) -> str: return Web3.to_checksum_address(address.lower())

ADDRESS_ZERO = constants.ADDRESS_ZERO
MAX_UINT256 = Web3.to_int(hexstr='0x' + 'f' * 64)

def chunk(list_to_chunk: List, n: int):
    for i in range(0, len(list_to_chunk), n):
        yield list_to_chunk[i : i + n]


def amount_to_k_string(amount: float) -> str:
    """Turns 2000 to "2K" """
    return f"{round(amount/1000, 2)}K"


def format_currency(value: float, symbol: str = "$", prefix: bool = True) -> str:
    v = "{:0,.2f}".format(value)
    return f"{symbol}{v}" if prefix else f"{v} {symbol}"


def format_percentage(value: float) -> str:
    return "{:0,.2f} %".format(value)


def amount_to_m_string(amount: float) -> str:
    """Turns 2000000 to "2M" """
    return f"{round(amount/1000000, 2)}M"

In [None]:
#| export

def float_to_uint256(amount: float, decimals: int = 18) -> int:
    """Convert float to uint256 with decimal scaling"""
    # Convert float to Decimal for precision
    amount_decimal = Decimal(str(amount))
    # Scale by decimals
    scaled_amount = amount_decimal * Decimal(10 ** decimals)
    # Convert to integer
    return int(scaled_amount)

In [None]:
float_to_uint256(1.0, 18)

1000000000000000000

In [None]:
#| export

def get_future_timestamp(deadline_minutes: float) -> int:
    """Convert minutes from now to future unix timestamp"""
    future_time = datetime.now() + timedelta(minutes=deadline_minutes)
    return int(future_time.timestamp())

In [None]:
get_future_timestamp(5)

1750942347

In [None]:
#| export

def apply_slippage(amount: int, slippage: float) -> int:
    if slippage < 0 or slippage > 1: raise ValueError("Slippage must be between 0 and 1")
    return int(math.ceil(amount * (1 - slippage)))

In [None]:
#| export

def parse_ether(ether: str) -> int:
    # Set precision high enough to handle 18 decimal places
    getcontext().prec = 50
    
    try:
        # Convert to Decimal for precise arithmetic
        ether_decimal = Decimal(str(ether))
        
        # Convert to wei (multiply by 10^18)
        wei_decimal = ether_decimal * Decimal('1000000000000000000')
        
        # Convert to integer
        return int(wei_decimal)
    
    except (ValueError, TypeError, decimal.InvalidOperation) as e:
        raise ValueError(f"Invalid ether value: {ether}") from e

In [None]:
#| export

def get_unique_str(length: int) -> str:
    """
    Generate a cryptographically secure random string of specified length.
    
    This is the Python equivalent of the TypeScript function using crypto.getRandomValues().
    Uses secrets module for cryptographic security.
    
    Args:
        length: Desired length of the random string
        
    Returns:
        str: Random string of specified length containing digits
        
    Examples:
        >>> len(get_unique_str(10))
        10
        >>> get_unique_str(5).isdigit()
        True
    """
    # Generate random bytes and convert to string of digits
    random_bytes = secrets.token_bytes(length)
    return ''.join(str(byte % 10) for byte in random_bytes)[:length]

def get_salt() -> str: return f"0x{get_unique_str(64)}" 

In [None]:
get_unique_str(64)

'7635613601916267396248769565479189614153287360361504658375907466'

In [None]:
#| export

def to_bytes32(val: str) -> bytes: 
    # Remove 0x prefix and pad to 64 hex characters (32 bytes)
    hex_val = val.replace('0x', '').zfill(64)
    return bytes.fromhex(hex_val)

def to_bytes32_str(val: str) -> str: return f"0x{to_bytes32(val).hex()}"

In [None]:
from fastcore.test import test_eq

In [None]:
test_eq(str(type(to_bytes32("0x1217bfe6c773eec6cc4a38b5dc45b92292b6e189"))), "<class 'bytes'>")

test_eq(to_bytes32("0x1217bfe6c773eec6cc4a38b5dc45b92292b6e189").hex(), "0000000000000000000000001217bfe6c773eec6cc4a38b5dc45b92292b6e189".lower())
test_eq(to_bytes32("0x1174A4719FaF964AfE2179A404b4830EC0DCB8D5").hex(), "0000000000000000000000001174A4719FaF964AfE2179A404b4830EC0DCB8D5".lower())
test_eq(to_bytes32("0x0000000000000000000000000000000000000000").hex(), "0000000000000000000000000000000000000000000000000000000000000000".lower())
test_eq(to_bytes32(ADDRESS_ZERO).hex(), "0000000000000000000000000000000000000000000000000000000000000000")

test_eq(to_bytes32_str("0x1217bfe6c773eec6cc4a38b5dc45b92292b6e189"), "0x0000000000000000000000001217bfe6c773eec6cc4a38b5dc45b92292b6e189")

In [None]:
test_eq(parse_ether("1"), 1000000000000000000)
test_eq(parse_ether("0.5"), 500000000000000000)
test_eq(parse_ether("1.5"), 1500000000000000000)
test_eq(parse_ether("0.001"), 1000000000000000)

In [None]:
# 0.05% slippage
test_eq(apply_slippage(232165, 0.05 / 100), 232049)

# 1% slippage
test_eq(apply_slippage(232165, 1 / 100), 229844)

# 5% slippage
test_eq(apply_slippage(232165, 5 / 100), 220557)

# 0.01% slippage
test_eq(apply_slippage(19936, 0.01 / 100), 19935)

# 0.1% slippage
test_eq(apply_slippage(19936, 0.1 / 100), 19917)

## Graph department

This is where all the nerds hang out

In [None]:
#| export
# Claude 3.7 sonnet made this

@dataclass
class Pair: token0: str; token1: str; pool: str

def find_all_paths(pairs: List[Pair], start_token: str, end_token: str, cutoff=3) -> List[List[Tuple]]:
    # MultiGraph required to support parallel edges
    # same tokens can be present in different pools, hence parallel edges
    # specific pool identifier is stored inside edge attribute
    G, complete_paths = nx.MultiGraph(), []
    for pair in pairs: G.add_edge(pair.token0, pair.token1, pool=pair.pool)
    node_paths =  [p for p in nx.all_simple_paths(G, source=start_token, target=end_token, cutoff=cutoff)]
    for path in node_paths:
        edge_path = []
        # For each consecutive pair of nodes in the path
        for i in range(len(path) - 1):
            current = path[i]
            next_node = path[i + 1]
            
            # Get all edges between these nodes
            edges = G.get_edge_data(current, next_node)
            
            # There might be multiple edges (pools) between these nodes
            # Add all possible edges to create different complete paths
            current_paths = [] if not edge_path else edge_path.copy()
            new_edge_paths = []
            
            # If this is the first segment, initialize with empty path
            if not current_paths:
                current_paths = [[]]
                
            # For each possible edge between current and next_node
            for edge_key, edge_attrs in edges.items():
                pool = edge_attrs['pool']
                for current_path in current_paths:
                    # Create a new path that includes this edge
                    new_path = current_path + [(current, next_node, pool)]
                    new_edge_paths.append(new_path)
            
            edge_path = new_edge_paths
        
        # Add all possible edge paths to the complete paths
        complete_paths.extend(edge_path)
    
    # seen is a list of strings that look like this ["pool1-pool2-pool3", ...]
    uniques, seen = [], []

    for path in complete_paths:
        p = '-'.join(map(lambda x: x[2], path))
        if p not in seen:
            uniques.append(path)
            seen.append(p)

    # remove duplicates
    return uniques


In [None]:
 # Example pairs - including a cycle
pairs = [
    Pair("A", "B", "pool1"),
    Pair("A", "B", "pool12"),
    Pair("A", "B", "pool13"),
    Pair("B", "C", "pool2"),
    Pair("A", "D", "pool3"),
    Pair("D", "C", "pool4"),
    Pair("B", "D", "pool5"),
    Pair("C", "E", "pool6"),
    Pair("E", "A", "pool7"),
]

# Find paths from A to C
paths = find_all_paths(pairs, "A", "C")

print(f"All paths from A to C:")
for i, path in enumerate(paths):
    for path in paths:
        print(f"Path {i+1}: {' -> '.join(list(map(lambda x: x[2], path)))}")
    
# Let's also test with a different example involving a cycle
cycle_pairs = [
    Pair("X", "Y", "pool8"),
    Pair("Y", "Z", "pool9"),
    Pair("Z", "X", "pool10"),
]

# Find paths from X to Z
cycle_paths = find_all_paths(cycle_pairs, "X", "Z")

print(f"\nAll paths from X to Z in a cyclical graph:")
for i, path in enumerate(cycle_paths):
    print(f"Path {i+1}: {' -> '.join(list(map(lambda x: x[2], path)))}")

All paths from A to C:
Path 1: pool1 -> pool2
Path 1: pool12 -> pool2
Path 1: pool13 -> pool2
Path 1: pool1 -> pool5 -> pool4
Path 1: pool12 -> pool5 -> pool4
Path 1: pool13 -> pool5 -> pool4
Path 1: pool3 -> pool4
Path 1: pool3 -> pool5 -> pool2
Path 1: pool7 -> pool6
Path 2: pool1 -> pool2
Path 2: pool12 -> pool2
Path 2: pool13 -> pool2
Path 2: pool1 -> pool5 -> pool4
Path 2: pool12 -> pool5 -> pool4
Path 2: pool13 -> pool5 -> pool4
Path 2: pool3 -> pool4
Path 2: pool3 -> pool5 -> pool2
Path 2: pool7 -> pool6
Path 3: pool1 -> pool2
Path 3: pool12 -> pool2
Path 3: pool13 -> pool2
Path 3: pool1 -> pool5 -> pool4
Path 3: pool12 -> pool5 -> pool4
Path 3: pool13 -> pool5 -> pool4
Path 3: pool3 -> pool4
Path 3: pool3 -> pool5 -> pool2
Path 3: pool7 -> pool6
Path 4: pool1 -> pool2
Path 4: pool12 -> pool2
Path 4: pool13 -> pool2
Path 4: pool1 -> pool5 -> pool4
Path 4: pool12 -> pool5 -> pool4
Path 4: pool13 -> pool5 -> pool4
Path 4: pool3 -> pool4
Path 4: pool3 -> pool5 -> pool2
Path 4: pool

## Superswaps helpers

In [None]:
# | export


# TODO: get rid of ICACallData, use tuples instead
@dataclass(frozen=True)
class ICACallData: to: str; value: int; data: str


def hash_ICA_calls(calls: List[ICACallData], salt: str) -> bytes:
  call_tuples = [(bytes.fromhex(call.to.replace('0x', '')), call.value, bytes.fromhex(call.data.replace('0x', '')))  for call in calls]
  encoded = encode(["(bytes32,uint256,bytes)[]"], [call_tuples])
  return Web3.keccak(hexstr=f"{salt}{encoded.hex()}")

def serialize_ica_calls(calls: List[ICACallData]) -> List[dict]:
    """
    Convert a list of ICACallData to JSON string.
    """
    return list(map(lambda call: {"to": call.to, "value": str(call.value), "data": call.data}, calls))

In [None]:
r = hash_ICA_calls(calls=[
  ICACallData(
    to="0x0000000000000000000000001217bfe6c773eec6cc4a38b5dc45b92292b6e189",
    value=0,
    data="0x095ea7b30000000000000000000000006df1c91424f79e40e33b1a48f0687b666be710750000000000000000000000000000000000000000000000000000000000413bdc"
  ),
  ICACallData(
    to="0x0000000000000000000000006Df1c91424F79E40E33B1A48F0687B666bE71075",
    value=0,
    data="0x24856bc3000000000000000000000000000000000000000000000000000000000000004000000000000000000000000000000000000000000000000000000000000000800000000000000000000000000000000000000000000000000000000000000002a1a1000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000200000000000000000000000000000000000000000000000000000000000000400000000000000000000000000000000000000000000000000000000000000260000000000000000000000000000000000000000000000000000000000000020000000000000000000000000000000000000000000000000000000000000000400000000000000000000000000000000000000000000000000000000000000080000000000000000000000000000000000000000000000000000000000000000100000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000001000000000000000000000000000000000000000000000000000000000000002000000000000000000000000000000000000000000000000000000000000001200000000000000000000000001e7a6b63f98484514610a9f0d5b399d4f7a9b1da00000000000000000000000000000000000000000000000000000000000ffc0300000000000000000000000000000000000000000000000000000000000fd48500000000000000000000000000000000000000000000000000000000000000c000000000000000000000000000000000000000000000000000000000000000010000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000002b1217bfe6c773eec6cc4a38b5dc45b92292b6e189000001078d782b760474a361dda0af3839290b0ef57ad6000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000014000000000000000000000000000000000000000000000000000000000000000400000000000000000000000000000000000000000000000000000000000000080000000000000000000000000000000000000000000000000000000000000000107000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000001000000000000000000000000000000000000000000000000000000000000002000000000000000000000000000000000000000000000000000000000000000600000000000000000000000001217bfe6c773eec6cc4a38b5dc45b92292b6e1890000000000000000000000001e7a6b63f98484514610a9f0d5b399d4f7a9b1da8000000000000000000000000000000000000000000000000000000000000000"
  ),
], salt="0x2098192114821323714521814891133166129651266242167207511481761894")

test_eq(r.hex(), "81dccec12150cf3c23f3163872679f68986cbb5f877ae3b59d59813feb9b4afe".lower())

In [None]:
serialize_ica_calls([
  ICACallData(
    to="0x0000000000000000000000001217bfe6c773eec6cc4a38b5dc45b92292b6e189",
    value=0,
    data="0x095ea7b30000000000000000000000006df1c91424f79e40e33b1a48f0687b666be710750000000000000000000000000000000000000000000000000000000000413bdc"
  ),
  ICACallData(
    to="0x0000000000000000000000006Df1c91424F79E40E33B1A48F0687B666bE71075",
    value=0,
    data="0x24856bc3000000000000000000000000000000000000000000000000000000000000004000000000000000000000000000000000000000000000000000000000000000800000000000000000000000000000000000000000000000000000000000000002a1a1000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000200000000000000000000000000000000000000000000000000000000000000400000000000000000000000000000000000000000000000000000000000000260000000000000000000000000000000000000000000000000000000000000020000000000000000000000000000000000000000000000000000000000000000400000000000000000000000000000000000000000000000000000000000000080000000000000000000000000000000000000000000000000000000000000000100000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000001000000000000000000000000000000000000000000000000000000000000002000000000000000000000000000000000000000000000000000000000000001200000000000000000000000001e7a6b63f98484514610a9f0d5b399d4f7a9b1da00000000000000000000000000000000000000000000000000000000000ffc0300000000000000000000000000000000000000000000000000000000000fd48500000000000000000000000000000000000000000000000000000000000000c000000000000000000000000000000000000000000000000000000000000000010000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000002b1217bfe6c773eec6cc4a38b5dc45b92292b6e189000001078d782b760474a361dda0af3839290b0ef57ad6000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000014000000000000000000000000000000000000000000000000000000000000000400000000000000000000000000000000000000000000000000000000000000080000000000000000000000000000000000000000000000000000000000000000107000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000001000000000000000000000000000000000000000000000000000000000000002000000000000000000000000000000000000000000000000000000000000000600000000000000000000000001217bfe6c773eec6cc4a38b5dc45b92292b6e1890000000000000000000000001e7a6b63f98484514610a9f0d5b399d4f7a9b1da8000000000000000000000000000000000000000000000000000000000000000"
  ),
])

[{'to': '0x0000000000000000000000001217bfe6c773eec6cc4a38b5dc45b92292b6e189',
  'value': '0',
  'data': '0x095ea7b30000000000000000000000006df1c91424f79e40e33b1a48f0687b666be710750000000000000000000000000000000000000000000000000000000000413bdc'},
 {'to': '0x0000000000000000000000006Df1c91424F79E40E33B1A48F0687B666bE71075',
  'value': '0',
  'data': '0x24856bc3000000000000000000000000000000000000000000000000000000000000004000000000000000000000000000000000000000000000000000000000000000800000000000000000000000000000000000000000000000000000000000000002a1a10000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000002000000000000000000000000000000000000000000000000000000000000004000000000000000000000000000000000000000000000000000000000000002600000000000000000000000000000000000000000000000000000000000000200000000000000000000000000000000000000000000000000000000000000004000000000000000000000000000000000000000000000000000000000000000

## Home made timer helper for benchmarking

In [None]:
#| export
# Claude 4 sonnet made this

class Timer:
    """Simple timer utility for measuring execution time"""
    
    def __init__(self, name: str = "Operation", precision: int = 4, callback: Optional[Callable] = None):
        self.name = name
        self.precision = precision
        self.callback = callback
        self.start_time: Optional[float] = None
        self.end_time: Optional[float] = None
        self.elapsed: Optional[float] = None
    
    def __enter__(self):
        self.start_time = time.perf_counter()
        return self
    
    def __exit__(self, exc_type, exc_val, exc_tb):
        self.end_time = time.perf_counter()
        self.elapsed = self.end_time - self.start_time
        result = f"{self.name} took {self.elapsed:.{self.precision}f} seconds"
        
        if self.callback:
            self.callback(self.elapsed, result)
        else:
            print(result)
    
    async def __aenter__(self):
        self.start_time = time.perf_counter()
        return self
    
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        self.end_time = time.perf_counter()
        self.elapsed = self.end_time - self.start_time
        result = f"{self.name} took {self.elapsed:.{self.precision}f} seconds"
        
        if self.callback:
            if asyncio.iscoroutinefunction(self.callback):
                await self.callback(self.elapsed, result)
            else:
                self.callback(self.elapsed, result)
        else:
            print(result)

@contextmanager
def time_it(name: str = "Operation", precision: int = 4, callback: Optional[Callable] = None):
    """Context manager for timing synchronous code execution"""
    timer = Timer(name, precision, callback)
    with timer:
        yield timer

@asynccontextmanager
async def atime_it(name: str = "Operation", precision: int = 4, callback: Optional[Callable] = None):
    """Async context manager for timing asynchronous code execution"""
    timer = Timer(name, precision, callback)
    async with timer:
        yield timer

Here's how to use it

In [None]:
# Synchronous code
with time_it("Calling sugar onchain"):
    # Your database query here
    time.sleep(1)

# Asynchronous code
async with atime_it("Another API call"):
    await asyncio.sleep(1)

Calling sugar onchain took 1.0041 seconds
Another API call took 1.0025 seconds


In [None]:
#| export

def require_supersim():
    try:
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.settimeout(2)
        result = sock.connect_ex(('127.0.0.1', 4444))
        # are you running supersim?
        test_eq(result, 0)
    except socket.error as err:
        test_eq(err, None)
    finally:
        sock.close()

In [None]:
#| hide
import nbdev; nbdev.nbdev_export()