# Ledger Data Types 

This file defines a Python implementation that simulates the ledger data types and their interfaces as defined in **Build > Reference documentation > The Compact language > Ledger data types**. 

Ledger data types may refer to Compact types, or types defined in `compact.std` (compact standard library). 

* Compact types: https://github.com/input-output-hk/midnight-docs/pull/127/files
* Standard library: https://github.com/input-output-hk/compactc/blob/master/doc/api/std.compact/exports.md


## Notes

We don't simulate: 
* Overflow of (unsigned) integer types
* Some of the ADTs (map/list/...) make an iterable object available on the TS side
* Container ADTs that store values of type `CoinInfo`, which are automatically qualified by interacting with the ledger when using write operations

## Misc 

In [739]:
from typing import TypeVar, Generic, Type
from typing import Dict, List, Optional, Set, Hashable, Tuple, Callable

### Compact Types

We use the following type aliases since Compact features finite precision integers. For now, we'll map them to Python's `int` type, but we could change these in the future to simulate overflows etc... 

In [740]:
uint64 = int 
uint16 = int

# This is actually a value of type "Field" in the compact interface, but now we'll use strings to represent hashes
MerkleTreeDigest = str

These are defined in `std.compact`. **TODO**: inhabit with a type representing their actual definition

Wrapper class for values that hold a value of type `ZswapCoinPublicKey` or `ContractAddress`. This is a special case of Compact's `Either` type, which I'm not sure how to define generically in Python

**TODO**: Perhaps for this (and potentially more, such as default values or subtyping/casting) reasons it's better to not allow arbitrary python types when constructing generic ledger ADTs, but rather embed Compact types using some kind of class hierarchy. 

In [741]:
ZswapCoinPublicKey = ()
ContractAddress = ()

class CoinInfo(): 
    _info : str 
    def __init__(self , info : str) -> None:
        self._info = info

class QualifiedCoinInfo(): 
    _qualified_info : str 
    def __init__(self , info : str) -> None:
        self._qualified_info = "{QUALIFIED} " + info

In [742]:
"""
Identifies a recipient of funds
 
Funds can be received either 
    
    (1) by a wallet, identified by a public key, or 
    (2) a contract, identified by its address
    
"""
class Recipient: 
    
    _which_value:bool 
    _pubkey_value:ZswapCoinPublicKey 
    _addr_value:ContractAddress

    def __init__(self, which : bool, value) -> None: 
        self._which_value = which
        if which: 
            self._pubkey_value = value 
        else:
            self._addr_value = value 

    def read_pubkey(self) -> ZswapCoinPublicKey: 
        if self._which_value: 
            return self._pubkey_value
        else: 
            raise Exception("Tried to read left choice of a sum but the value was right")

    def read_addr(self) -> ContractAddress: 
        if not self.which_value: 
            return self._addr_value
        else: 
            raise Exception("Tried to read right choice of a sum but the value was left")

    def which(self) -> bool: 
        return self._which_value

In [743]:
"""
According to the docs: 

Writes a CoinInfo to this Cell, which is transformed into a QualifiedCoinInfo 
at runtime by looking up the relevant Merkle tree index. This index must have 
been allocated within the current transaction or this write fails

"""
def qualify(ci:CoinInfo, recepient:Recipient) -> QualifiedCoinInfo: 
    # TODO: implement 
    return QualifiedCoinInfo(ci._info)

In [744]:
T = TypeVar ('T')

class Hashable(Generic[T]):

    _digest : Callable[T, MerkleTreeDigest]
    _hash   : Callable[MerkleTreeDigest, MerkleTreeDigest]

    def __init__(self, digest : Callable[T, MerkleTreeDigest], h : Callable[MerkleTreeDigest, MerkleTreeDigest]) -> None:
        self._digest = digest 
        self._hash   = h 

    def hash(self , s : MerkleTreeDigest) -> MerkleTreeDigest:
        return self._hash(s)

    def digest(self , x : T) -> MerkleTreeDigest: 
        return self._digest(x)

In [745]:

class MerkleTreePath(Generic[T]): 

    leaf : T 
    entries : List[MerkleTreeDigest]
    
    def __init__(self, l : T , xs : List[MerkleTreeDigest]): 
        self.leaf    = l 
        self.entries = xs

    def __str__(self) -> str: 
        return str(self.entries)

    def get_root(self, digest, h : Hashable[T]) -> MerkleTreeDigest:
        if (len(entries) > 0):
            sh = h.digest(self.leaf)
            for i in reversed(range(0 , len(entries) - 1)):
              sh = h.hash(sh + entries[i])
        

### Other

The following represents any kind of run-time failure that might occur while executing a piece of compact code

In [746]:
class RuntimeError(Exception):
    pass

## Counter

In [747]:
class MK_Counter: 
    _counter:uint64

    def __init__(self) -> None: 
        self._counter = 0

    # Decrement the counter by the given amount, or throw a run-time error if that would bring the counter below 0
    def decrement(self , amount : uint16) -> None: # amount has 16 bit precision 
        if self._counter >= amount: 
            self._counter -= amount
        else: 
            raise RuntimeError(f'Cannot decrement counter value {self._counter} with amount {amount}')

    # Increment the counter by the given amount
    def increment(self , amount : uint16) -> None: # amount has 16 bit precision
        self._counter += amount     

    # Check if the counter is smaller than a given threshold
    def less_than(self, threshold : uint64) -> bool:
        return self._counter < threshold

    def read(self) -> uint64: 
        return self._counter 

    def reset_to_default(self) -> None: 
        self._counter = 0

## Cell 



Stores a value of any type

In [748]:
T_Cell = TypeVar('T_Cell')

class MK_Cell(Generic[T_Cell]): 

    # The current resp. default value of the type stored in the cell
    _value         : T_Cell 
    _default_value : T_Cell

    # Initializes the cell. Creation of a new cell requires us to pass in a default value for the 
    # type that's stored in the cell. Since all types in Compact have a default value, and `T_Cell` 
    # should be a Compact type, this should always exist
    def __init__(self, default : T_Cell) -> None: 
        self._value = default
        self._default_value = default

    def read(self) -> T_Cell: 
        return self._value

    def reset_to_default(self) -> None:
        self._value = self._default_value

    def write(self, new_value : T_Cell) -> None: 
        self._value = new_value 

    def write_coin(self, ci : CoinInfo, recipient : Recipient) -> None: 
        if not isinstance(self._value, QualifiedCoinInfo):
            raise RuntimeError(f'Tried to invoke the write_coin operation on a cell with the wrong type. Value: {self._value}')

        self._value = qualify(ci, recipient)

## Set

Based on Python's `Set`

In [749]:
T_Set = TypeVar('T_Set')

class MK_Set(Set[T_Set]): 

    _type_arg : Type[T_Set]

    """
        We require the type argument to be passed explicitly when creating a new MK_Set, 
        because otherwise it's not present at runtime for checking wheter a call to the 
        insert_coin method is valid. 

        If we don't do this there's no way to perform the check when inserting a coininfo 
        into the empty set; the generic argument is not kept at runtime, and there are no 
        values yet to infer it from. 
    """
    def __init__(self, generic_arg: Type[T_Set]): 
        self._type_arg = generic_arg
        super(MK_Set, self).__init__()
    
    def is_empty(self) -> bool: 
        return len(self) == 0
    
    def member(self, elem : T_Set) -> bool: 
        return elem in self

    def reset_to_default(self) -> None: 
        self.clear()

    def size(self) -> uint64: 
        return len(self)

    def insert(self, elem : T_Set) -> None: 
        self.add(elem)
        
    def insert_coin(self, ci : CoinInfo, recepient : Recipient) -> None: 
        if not self._type_arg == QualifiedCoinInfo:
            raise RuntimeError(f'Tried to invoke the insert_coin operation on a set that stores values of a different type. Values: {self}')
        self.add(qualify(ci , recepient))

## Map

Based on Python's `Dict`

In [750]:
T_Key = TypeVar('T_Key')
T_Val = TypeVar('T_Key') 

class MK_Map(dict[T_Key,T_Val]): 

    _default_value : T_Val 

    def __init__(self , default : T_Val) -> None: 
        self._default_value = default
        super(MK_Map, self).__init__()
    
    def insert(self, k : T_Key, v : T_Val) -> None: 
        self[k] = v

    def insert_coin(self, k : T_Key, ci : CoinInfo, recipient : Recipient) -> None: 
        if not isinstance(self._default_value, QualifiedCoinInfo):
            raise RuntimeError(f'Tried to invoke the insert_coin operation on a map that stores values of a different type. Values:{self}')
        self.insert(k , qualify(ci , recipient))
    
    def insert_default(self, k : T_Key) -> None: 
        self[k] = self._default_value

    # The empty dictionary evaluates to `False`, apparently ... 
    def is_empty(self) -> bool: 
        return not bool(self)

    def lookup(self, k : T_Key) -> T_Val: 
        return self[k]

    def member(self, k : T_Key) -> bool: 
        return k in self

    def remove(self, k : T_Key) -> bool: 
        del self[k]

    def reset_to_default(self) -> None:
        self.clear()

    def size(self) -> uint64: 
        return len(self)

    


## List

Based on Python's `List`

In [751]:
T_List = TypeVar('T_List')

class MK_List(list[T_List]): 

    _type_arg : Type[T_List] 

    def __init__(self, generic_arg : Type[T_List]) -> None: 
        self._type_arg = generic_arg 
        super(MK_List, self).__init__()
    
    def head(self) -> Optional[T_List]: 
        if not self: 
            return None
        else: 
            return self[0]

    def is_empty(self) -> bool: 
        return not self

    def length(self) -> uint64: 
        return len(self)

    def pop_front(self) -> None: 
        self.pop(0)

    def push_front(self, value : T_List) -> None: 
        self.insert(0 , value)

    def push_front_coin(self, ci : CoinInfo , recipient : Recipient) -> None: 
        if not self._type_arg == QualifiedCoinInfo:
            raise RuntimeError(f'Tried to invoke the insert_coin operation on a list that stores values of a different type. Values:{self}')
        self.insert(0 , qualify(ci , recipient))
    
    def reset_to_default(self) -> None: 
        self.clear()


## MerkleTree

https://github.com/input-output-hk/midnight-ledger-prototype/blob/master/docs/api/ledger/classes/StateBoundedMerkleTree.md


In [752]:
from typing import Callable

T_MT = TypeVar('T_MT') 

# Simulates a bounded merkle tree 
class MK_MerkleTree(Generic[T_MT]): 

    depth     : int # Maximum depth of the tree
    _size     : int # Maximum size of the tree with bf 2  
    
    # Stores the nodes as follows (value, hash, sibling hash) 
    _store    : List[Tuple[T_MT,MerkleTreeDigest,MerkleTreeDigest]]
    
    _hash          : Hashable[T_MT]
    _default_value : T_MT
    
    # When creating a new Merkle tree, we have to provide its depth bound as well as 
    # a hash functions for the type of values stored inside. 
    def __init__(self, d : int, h : Hashable[T_MT]) -> None: 
        self.depth     = d 
        self._store    = list()
        self._hash     = h

        # Size of the internal storage, assuming a branching factor of 2  
        self._size     = 2 ** d  

    def __str__(self) -> str:
        return str(self._store) 
    
    def _root_node(self) -> Tuple[T_MT,MerkleTreeDigest,MerkleTreeDigest]:
        return _store[0]
    
    # Recalculates hashes, starting from a given index
    def _recalculate(self, index : int) -> None: 
        if len(self._store) > 0: 
            for i in reversed(range(len(self._store) - 1)): 
                self._store[i] = (self._store[i][0] , self._store[i][1], self._hash.hash(self._store[i+1][2] + self._store[i+1][1]))
    
    # Checks if given root is the root of this merkle tree
    def check_root(self , rt : MerkleTreeDigest) -> bool: 
        return self.root() == rt 
    
    # Inserts a value at the first available index, if the tree is not full 
    def insert(self, value : T_MT) -> None: 
        # Check if tree is full
        if self.is_full(): 
            raise RuntimeError("Tried to insert into a full Merkle tree") 
        self._store.append((value , self._hash.digest(value), ""))
        self._recalculate(len(self._store) - 1) # recalculate, starting from the last element in the store

    # Inserts a given value at an *existing* index, or the first free index
    def insert_index(self, value : T_MT, index : uint64) -> None: 
        if (index > len(self._store)):
            raise RuntimeError(f'Index too large: {index}')

        if index == len(self._store): 
            self.insert(value)
        elif index == (len(self._store)-1): 
            self._store[index] = (value , self._hash.digest(value), "")
        else:
            self._store[index] = (value , self._hash.digest(value) , self._hash.hash(self._store[index+1][2]+self._store[index+1][1]))
        self._recalculate(index)

    def insert_index_default(self, index : uint64) -> None:
        self.insert_index(self, default_value , index)

    def is_full(self) -> bool: 
        return len(self._store) >= self._size

    def reset_to_default(self) -> None: 
        self._store.clear()

    # The methods below are callable only from TypeScript
    def root(self) -> MerkleTreeDigest: 
        if len(self._store) == 0: 
            raise RuntimeError("Tried to calculate the root of an empty Merkle tree")
        return self._hash.hash(self._store[0][2] + self._store[0][1])

    def findPathForLeaf(self, value : T_MT) -> MerkleTreePath[T_MT]: 
        entries = list()  
        for x in self._store: 
            entries.append(x[1])
            if x[0] == value: # Found it!
                return MerkleTreePath(value , entries)
        raise RuntimeError(f'Could not find path for value {value}')

    def firstFree(self) -> uint64: 
        return len(self._store)

    # TODO: the function above should perhaps be implemented in terms of this one, 
    # instead of the other way around
    def pathForLeaf(self, index : uint64, value : T_MT) -> MerkleTreePath[T_MT]: 
        if not self._store[index][0] == value:
            raise RuntimeError(f'Tried to get path for value {value}, but it does not match the stored value at index {index}')
        return self.findPathForLeaf(value)

## HistoricMerkleTree

A Merkle tree that keeps all historic roots. 

The root changes if: 
* A value is inserted at the first free index
* A value is inserted at a specified index
* A default value is inserted at a specified index
* The tree is reset to the empty tree

In [753]:
T_HMT = TypeVar('T_HMT') 

class MK_HistoricMerkleTree(MK_MerkleTree[T_HMT]): 

    _history : List[MerkleTreeDigest]
    
    def __init__(self, d : int, h : Hashable[T_HMT]):  
        self._history = list()
        super(MK_HistoricMerkleTree, self).__init__(d , h)

    def reset_history(self) -> None: 
        self._history.clear()      

    # Checks if a given root is (1) the current root of the tree or (2) one of its historic roots 
    def h_check_root(self , root : MerkleTreeDigest) -> bool: 
        return self.check_root(root) or root in self._history

    """ 
        We overload all methods that may change the root of the tree to first append the current root to 
        the list of historic roots. When using a historic Merkle tree we should be careful to use these 
        methods instead of the corresponding ones from the "normal" tree. 
    """
    def h_insert(self , value : T_HMT) -> None: 
        if len(self._store) > 0: 
            self._history.append(self.root())
        self.insert(value)

    def h_insert_index(self, value : T_HMT, index : uint64) -> None:
        if len(self._store) > 0:
            self._history.append(self.root()) 
        self.insert_index(value , index)

    def h_insert_index_default(self, index : uint64) -> None:    
        if len(self._store) > 0:
            self._history.append(self.root()) 
        self.insert_index_default(index)

    def h_reset_to_default(self) -> None: 
        if len(self._store) > 0:
            self._history.append(self.root()) 
        self.reset_to_default()

    

# Examples 

## Counter

In [754]:
c = MK_Counter() 
# c.decrement(1) -> raises a "RuntimeError" 
c.increment(10) 
c.decrement(5)
c.read()

5

In [755]:
c.less_than(10)

True

In [756]:
c.reset_to_default()
c.increment(5)
c.read()

5

## Cell


In [757]:
cell1 : MK_Cell[int] 
cell1 = MK_Cell(0) 
cell1.read()

0

In [758]:
cell1.write(10)
cell1.read()

10

In [759]:
cell1.reset_to_default()
cell1.read()

0

In [760]:
# Define a new recipient "alice", identified by their public key
alice = Recipient(True , ()) 
# cell1.write_coin(() , recp) # -> raises a RuntimeError because the cell doesn't contain a CoinInfo 

coin1 : CoinInfo 
coin1 = CoinInfo("coin 1")

cell2 : MK_Cell[QualifiedCoinInfo]
cell2 = MK_Cell(QualifiedCoinInfo("default"))

cell2.write_coin(coin1 , alice)
cell2.read()._qualified_info

'{QUALIFIED} coin 1'

## Set

In [761]:
set1 = MK_Set(int)
set1.insert(1)
set1.insert(2)
set1.insert(3)
set1

{1, 2, 3}

In [762]:
print(set1.size())
set1.remove(2)
print(set1.size())
set1

3
2


{1, 3}

In [763]:
set1.member(3)

True

In [764]:
set1.remove(3)
set1.member(3)

False

In [765]:
set1.is_empty()

False

In [766]:
set1.reset_to_default()
set1.is_empty()

True

In [767]:
# set1.insert_coin(coin1 , alice) -> Throws a runtime error because the set does not store coininfo

set2 = MK_Set(QualifiedCoinInfo)
set2.insert_coin(coin1 , alice)

## Map


In [768]:
map1 : MK_Map[int, str]
map1 = MK_Map("")

map1.insert(0 , "zero")
map1.insert(1 , "one")
map1.insert(2 , "two")
map1.insert_default(3)
map1

{0: 'zero', 1: 'one', 2: 'two', 3: ''}

In [769]:
map1.lookup(1)

'one'

In [770]:
print(map1.is_empty())
map1.member(1)

False


True

In [771]:
map1.remove(3)
map1

{0: 'zero', 1: 'one', 2: 'two'}

In [772]:
print(map1.size())
map1.reset_to_default()
print(map1.size())
map1.is_empty()

3
0


True

In [773]:
map2 : MK_Map[QualifiedCoinInfo]
map2 = MK_Map(QualifiedCoinInfo(""))

map2.insert_coin(0 , coin1 , alice)
# map1.insert_coin(0 , coin1 , alice) # --> error because map1 does not store coins 
map2

{0: <__main__.QualifiedCoinInfo at 0x108cabb60>}

## List 

In [774]:
list1 : MK_List[int] 
list1 = MK_List(int)
print(list1.is_empty())
print(list1.head())

True
None


In [775]:
list1.push_front(1)
list1.push_front(2)
list1.push_front(3)
list1

[3, 2, 1]

In [776]:
print(list1.head())
list1.pop_front()
list1 

3


[2, 1]

In [777]:
print(list1.length())
list1.reset_to_default()
list1

2


[]

In [778]:
# list1.push_front_coin(coin1 , alice) # -> error, the list does not store coin values

In [779]:
list2 : MK_List[QualifiedCoinInfo] 
list2 = MK_List(QualifiedCoinInfo)
list2.push_front_coin(coin1 , alice)
list2

[<__main__.QualifiedCoinInfo at 0x108de9eb0>]

## MerkleTree

In [780]:
"""
Defines how to do hashing for the merkle tree examples
"""

from hashlib import sha256

def str_digest(inp : str) -> MerkleTreeDigest: 
    return sha256(inp.encode('utf-8')).hexdigest()

def str_hash(inp : MerkleTreeDigest) -> MerkleTreeDigest: 
    return str_digest(inp)

h : Hashable[str]
h = Hashable(str_digest , str_hash) 

In [781]:
# Creates a new MerkleTree with depth 4, that uses the object 'h' to calculate roots
m1 : MK_MerkleTree[str]
m1 = MK_MerkleTree(4 , h)

# m1.check_root(str_digest("")) # -> error, the tree is empty

# Some values to be inserted into the tree
xs = "Clear white is flying in my eyes, underneath the blue sky".split()
for x in xs: 
    m1.insert(x)

# Reference implementation for root calculation. Mimics a right-fold
# over the internal storage of the tree. 
def calc_root(ys : List[str]) -> str: 
    sib = "" 
    dig = ""
    for i in reversed(range(0 , len(ys))): 
        dig = str_digest(ys[i]) 
        """ debug
        print(f'Digest: {dig}')
        print(f'Sibling: {sib}')
        print(f'*********')
        """
        sib = str_hash(sib+dig)
    return sib

# Make sure that the root of the tree matches the reference implementation
assert m1.check_root(calc_root(xs))

# m1.insert_index("foo" , 15) # -> error, since the index is larger than the first free index

ff = m1.firstFree()
m1.insert_index("." , ff)

# Check that recalcutation of the root proceeds correctly after inserting a new element 
# At the first free index
assert m1.check_root(calc_root(xs + ["."]))

# Now let's update a value somewhere in the middle. 
m1.insert_index("blueblue", 9) 
rt = calc_root("Clear white is flying in my eyes, underneath the blueblue sky .".split())
# Check whether recalculation updated all the sibling hashes correctly
assert m1.check_root(rt)

# Finally, let's update exactly the last index, and see if root recalculation works right in that case
m1.insert_index(".." , m1.firstFree()-1)
assert m1.check_root(calc_root("Clear white is flying in my eyes, underneath the blueblue sky ..".split()))

# Tree now has 12 elements, should be able to hold 16
assert not m1.is_full()

m1.insert("The")    # 13 elements
m1.insert("way")    # 14 elements
m1.insert("they")   # 15 elements
m1.insert("seldom") # 16 elements

assert m1.is_full()

p1 = m1.findPathForLeaf("underneath") 
# m1.findPathForLeaf("above") # -> Error, no such leaf!

p2 = m1.pathForLeaf(7 , "underneath")
# m1.findPathForLeaf(8, "underneat") # -> Error, wrong index!

# Clear the tree
m1.reset_to_default()
assert m1.firstFree() == 0

## HistoricMerkleTree

In [791]:
m2 : MK_HistoricMerkleTree[str] 
m2 = MK_HistoricMerkleTree(4 , h) 

# Insert the same leafs as before 
xs = "Clear white is flying in my eyes, underneath the blue sky".split()
for x in xs: 
    m2.h_insert(x)

"""
But, because this is a *historic* merkle tree, all the roots are kept! 

This means that root checking ought to succeed for any of the past roots. 
"""
roots = list()
for i in range(0 , len(xs)): 
    sub = [ys for ys in xs[ : i+1]]
    roots.append(calc_root(sub))

for rt in roots: 
    assert m2.h_check_root(rt)

m2.h_insert_index("above" , 7)

# After inserting both the new and old root succeed when checking the root of the tree
assert m2.h_check_root(calc_root(xs))
assert m2.h_check_root(calc_root("Clear white is flying in my eyes, above the blue sky".split()))
