# The simplest blockchain

Says it all.

We start by finding a difficulty that gives a block time of roughly 1 s. We don't care about hashing super efficiently, since we aren't securing a trillion dollars. On an M1 MacBook Pro, a single worker must loop about $2^{22}$ times to get a reasonable block time.

In [1]:
from hashlib import sha256
from array import array
from random import randint
import sys


def h(bts):
    """Return the raw SHA-256 digest (32 bytes) of the bytes-like object ``bts``."""
    return sha256(bts).digest()

def good(difficulty):
    """Build a predicate that checks a digest for ``difficulty`` leading zero bits.

    The difficulty is split once, up front, into whole zero bytes and a
    remainder of zero bits, so the returned closure does as little work as
    possible when it is called in the mining loop.

    Args:
        difficulty: number of leading bits of the digest that must be zero.

    Returns:
        A function ``inner(_h) -> bool`` taking an indexable sequence of byte
        values (e.g. ``bytes``) and returning True when it meets the difficulty.
    """
    whole, rest = divmod(difficulty, 8)

    def inner(_h):
        # every complete leading byte must be zero
        if any(_h[:whole]):
            return False

        # The leftover bits live in the byte right after the zero bytes.
        # Index it with ``whole`` directly: deriving it from a loop variable
        # picks byte 1 instead of byte 0 when there are no whole bytes.
        if rest and _h[whole] >> (8 - rest):
            return False
        return True

    return inner

# don't care about being efficient, we are alone :)
def find(bts, difficulty):
    """Brute-force a nonce so that ``h(nonce + bts)`` satisfies ``good(difficulty)``.

    The candidate nonce occupies item 0 of an ``array('Q')`` buffer, followed
    by ``bts``; the search starts at a random value and counts upwards.
    ``array.frombytes`` requires the total length to be a multiple of the
    item size.

    Args:
        bts: payload the nonce is prepended to.
        difficulty: number of leading zero bits required of the digest.

    Returns:
        The winning nonce as an int (the final value of buffer item 0).
    """
    start = randint(0, 2 << 8 * 4)
    buf = array('Q')
    buf.frombytes(start.to_bytes(8, sys.byteorder) + bts)  # item 0 is the nonce slot

    accept = good(difficulty)
    while not accept(h(buf)):
        buf[0] += 1

    return buf[0]

# Required number of leading zero bits; also read as a global by the
# Blockchain code further down.
difficulty = 21
# Stand-in payload to mine against: the SHA-256 digest of a random integer.
bts = h(randint(0, 2 ** 20).to_bytes(8, sys.byteorder))

nonce = find(bts, difficulty)
# Recompute the winning digest by hand to display its leading zero bytes.
h(nonce.to_bytes(8, sys.byteorder) + bts)

b'\x00\x00\x01\xb0\x82\xb8\\\xe2I\xc7\xb3m\xa8Xp\x86}9\xa6\xc9I\xa7\xf3\x82J\xed\xda\xb6\xbcr.\xef'

We now work bottom-up to create a blockchain.

In [4]:
from __future__ import annotations

from dataclasses import dataclass
from typing import List
import struct


# Eight zero bytes: used as the placeholder nonce/`last` of a block and as
# the `utxo` of genesis transactions.
zero = b'\0' * 8

# Readability aliases only; all three are plain `bytes`.
Addr  = bytes  # address on the network
THash = bytes  # transaction hash
BHash = bytes  # block hash

@dataclass
class Tx:
    """A transaction that spends one output (`utxo`) to one address (`to`).

    Serialized with the native ``struct`` format ``'8s8sQ'`` (24 bytes).
    Each ``8s`` field is truncated or NUL-padded to exactly 8 bytes by
    ``struct.pack``.
    """

    utxo: THash  # hash of the transaction being spent
    to:   Addr   # recipient address
    ix:   int    # block identifier

    def __bytes__(self):
        """Pack the three fields into their fixed-size binary form."""
        fields = (self.utxo, self.to, self.ix)
        return struct.pack('8s8sQ', *fields)

    @classmethod
    def frombytes(cls, bts):
        """Rebuild a ``Tx`` from a buffer produced by ``bytes(tx)``."""
        utxo, to, ix = struct.unpack('8s8sQ', bts)
        return cls(utxo, to, ix)

@dataclass
class Block:
    """A block: an ordered list of transactions plus its header fields.

    ``bytes(block)`` lays the block out as ``nonce | last | ix | tx0 | tx1 ...``.
    The nonce comes first so that ``bytes(block)[8:]`` is exactly the payload
    the miner searches a nonce for.
    """

    txs:   List[Tx]  # notion of order
    last:  BHash     # hash of the previous block
    ix:    int       # index of block for consensus
    nonce: bytes     # len 8

    def __bytes__(self):
        """Serialize the header followed by every transaction, in order."""
        return (
            self.nonce + self.last + self.ix.to_bytes(8, sys.byteorder)
            + b''.join(bytes(tx) for tx in self.txs)
        )

    @classmethod
    def frombytes(cls, bts):
        """Parse a block from its serialized form.

        NOTE(review): this reads ``last`` as exactly 8 bytes, which matches
        the ``zero`` placeholder; blocks whose ``last`` is a full 32-byte
        SHA-256 digest do not round-trip through this parser.
        """
        nonce, last = bts[:8], bts[8:16]
        ix = int.from_bytes(bts[16:24], sys.byteorder)
        # view the transaction area in place instead of copying the tail
        m = memoryview(bts)[24:]
        txs = [Tx.frombytes(m[o:o + 24]) for o in range(0, len(m), 24)]
        return cls(txs, last, ix, nonce)

    # for min-heap sorting
    # The comparison is inverted: a block with a larger ``ix`` orders first,
    # presumably so a min-heap pops the highest block.
    def __le__(self, o: Block):
        return self.ix >= o.ix

    # heapq, sorted() and min() compare with ``<`` only, so ``__le__`` alone
    # is not enough to order blocks; keep the same inverted sense here.
    def __lt__(self, o: Block):
        return self.ix > o.ix


# Round-trip check: serialize a block holding two transactions and parse it
# back; the cell displays the reconstructed Block.
t1 = Tx(b'12345678', b'12345678', 0)
t2 = Tx(b'12345678', b'12345678', 0)
b  = Block([t1, t2], zero, 7, zero)

Block.frombytes(bytes(b))

Block(txs=[Tx(utxo=b'12345678', to=b'12345678', ix=0), Tx(utxo=b'12345678', to=b'12345678', ix=0)], last=b'\x00\x00\x00\x00\x00\x00\x00\x00', ix=7, nonce=b'\x00\x00\x00\x00\x00\x00\x00\x00')

In [5]:
from dataclasses import field
from typing import Dict, Set


@dataclass
class Blockchain:
    """A single chain of blocks together with its spend bookkeeping.

    Attributes are documented inline. Mining and verification both read the
    module-level ``difficulty``.
    """

    blocks: List[Block]   # the chain, genesis first
    txs:    Set[THash]    # hashes of every transaction included in a block
    spent:  Set[THash]    # transaction hashes that have already been spent
    # pending transfers waiting to be mined: spent tx hash -> recipient
    pool:   Dict[THash, Addr] = field(default_factory=dict)

    @property
    def head(self) -> Block:
        """The most recently added block."""
        return self.blocks[-1]

    @classmethod
    def genesis(cls, qty: int) -> Blockchain:
        """Create a chain whose mined genesis block holds ``qty`` transactions.

        Genesis transaction ``i`` has ``utxo=zero`` and pays to address ``i``
        encoded as 8 bytes.
        """
        txs = [
            Tx(utxo=zero, to=i.to_bytes(8, sys.byteorder), ix=0)
            for i in range(qty)
        ]

        genesis = Block(txs, last=zero, ix=0, nonce=zero)
        # mine over everything after the 8-byte nonce slot
        genesis.nonce = find(bytes(genesis)[8:], difficulty).to_bytes(8, sys.byteorder)
        return cls([genesis], {h(bytes(tx)) for tx in txs}, set())

    def verify(self, b: Block) -> bool:
        """Return True when ``b`` is a valid successor of the current head."""
        return (
            b.ix == self.head.ix + 1                             # correct block ordering
            and good(difficulty)(h(bytes(b)))                    # respect difficulty
            and all(tx.utxo in self.pool for tx in b.txs)        # transactions in mempool
            and len(set(tx.utxo for tx in b.txs)) == len(b.txs)  # no double spend
            and all(tx.utxo not in self.spent for tx in b.txs)   # certainly no double spend
            and all(tx.utxo in self.txs for tx in b.txs)         # spend money which exists
            and all(tx.ix == b.ix for tx in b.txs)               # valid transaction data
            and b.last == h(bytes(self.head))                    # valid block hash
        )

    def tx(self, utxo: THash, to: Addr):
        """Queue a transfer of ``utxo`` to ``to``; a later call for the same
        ``utxo`` replaces the earlier one."""
        self.pool[utxo] = to

    def mine(self) -> Block:
        """Build a block from every pooled transfer and find its nonce.

        The chain itself is not modified; pass the result to ``add``.
        """
        next_ix = self.head.ix + 1

        # transaction data
        txs = [
            Tx(utxo=utxo, to=to, ix=next_ix)
            for utxo, to in self.pool.items()
        ]

        # mine hash
        b = Block(txs, last=h(bytes(self.head)), ix=next_ix, nonce=zero)
        bts = bytes(b)[8:]
        b.nonce = find(bts, difficulty).to_bytes(8, sys.byteorder)

        return b

    def add(self, b: Block):
        """Append ``b`` to the chain and update the spend bookkeeping.

        Raises:
            AssertionError: if ``b`` does not pass ``verify``.
        """
        # Raise explicitly rather than use an `assert` statement, so the
        # check still runs under `python -O`; the exception type callers
        # see is unchanged.
        if not self.verify(b):
            raise AssertionError('block failed verification')
        self.blocks.append(b)

        # add txs to spent, remove from pool, add to unspent
        for tx in b.txs:
            _h = h(bytes(tx))
            self.spent.add(tx.utxo)
            del self.pool[tx.utxo]
            self.txs.add(_h)


# End-to-end demo: create a chain with one genesis transaction, queue a
# transfer that spends it to the `zero` address, then mine and append a block.
bc = Blockchain.genesis(1)
bc.tx(utxo=h(bytes(bc.head.txs[0])), to=zero)
bc.add(bc.mine())

bc

Blockchain(blocks=[Block(txs=[Tx(utxo=b'\x00\x00\x00\x00\x00\x00\x00\x00', to=b'\x00\x00\x00\x00\x00\x00\x00\x00', ix=0)], last=b'\x00\x00\x00\x00\x00\x00\x00\x00', ix=0, nonce=b'RJf\xa1\x01\x00\x00\x00'), Block(txs=[Tx(utxo=b'\x9d\x90\x8e\xcf\xb6\xb2V\xde\xf8\xb4\x9a|PNl\x88\x9cK\x0eA\xfel\xe3\xe0\x18c\xdd{a\xa2\n\xa0', to=b'\x00\x00\x00\x00\x00\x00\x00\x00', ix=1)], last=b"\x00\x00\x01Qs\x11\xcb\x7f\xd6\x93w\x9b\x95qZ\x0fR1\xef\x0e\xa9\xa6g\xa0\x9e\n\xa0'\xbca\xd1J", ix=1, nonce=b'tF.\x99\x00\x00\x00\x00')], txs={b'\x9d\x90\x8e\xcf\xb6\xb2V\xde\xf8\xb4\x9a|PNl\x88\x9cK\x0eA\xfel\xe3\xe0\x18c\xdd{a\xa2\n\xa0', b'a\x01\x15\xca\x13\x10\x1cZ4!\xccaS-+\xa3\x84\xad\xea\xa4\xa0\xccVJ\xa3a\xbeV\xe5\xfa\x15c'}, spent={b'\x9d\x90\x8e\xcf\xb6\xb2V\xde\xf8\xb4\x9a|PNl\x88\x9cK\x0eA\xfel\xe3\xe0\x18c\xdd{a\xa2\n\xa0'}, pool={})

In [6]:
# Display the digest of the newest block to inspect its leading zero bytes.
h(bytes(bc.head))

b'\x00\x00\x00\x88\x9c\xba\xf3\xa9\x10\x12\x8a\x05k\x90\x13\x7fh\xd4\xf9\x17\xa2\x91\xec\x15\xa9`\xcfZ\t\xc8\xc1k'

Now that we have a functioning blockchain, we can work on a toy network layer. It has two responsibilities: propagating transactions and blocks, and forming consensus (longest chain). All clients are full clients. All clients broadcast to all (known) clients.

In [12]:
import asyncio as aio


# Module-level state shared with the Protocol class below.
bc = Blockchain.genesis(1)
# Known chains, indexed by the hash of each chain's head block.
heads = {h(bytes(bc.head)): bc}

@dataclass
class Msg:
    """A length-prefixed network message.

    Serialized as an 8-byte total length (header included), an 8-byte type
    tag, then the payload; both integers use the machine's native byte order.
    """

    tp:  int    # message type tag
    bts: bytes  # payload

    def __bytes__(self):
        """Return the 16-byte header followed by the payload."""
        total = len(self.bts) + 16  # the header is two 8-byte integers
        header = total.to_bytes(8, sys.byteorder) + self.tp.to_bytes(8, sys.byteorder)
        return header + self.bts

    @classmethod
    def frombytes(cls, bts: bytes):
        """Parse a message; bytes beyond the declared total length are ignored."""
        total, kind = struct.unpack('QQ', bts[:16])
        payload = bts[16:total]
        return cls(kind, payload)

@dataclass
class Protocol:
    """asyncio datagram protocol that exchanges transactions and blocks.

    Message types (``Msg.tp``): 0 = transaction, 1 = block, 2 = request for
    the longest known chain. Handlers read and update the module-level
    ``bc`` and ``heads``.
    """

    def connection_made(self, transport):
        """Keep the transport so replies and requests can be sent later."""
        self.t = transport

    def datagram_received(self, data, addr):
        """Decode one datagram and dispatch on its message type.

        Local variables deliberately avoid the name ``bc``: assigning to it
        anywhere in this method would make it local everywhere in the method
        and break the transaction branch, which needs the module-level chain.
        """
        msg = Msg.frombytes(data)

        if msg.tp == 0:    # tx
            tx = Tx.frombytes(msg.bts)
            bc.tx(tx.utxo, tx.to)

        elif msg.tp == 1:  # block
            b = Block.frombytes(msg.bts)

            # `heads` is keyed by head hash, so the chain this block extends
            # is a direct lookup
            chain = heads.get(b.last)
            if chain is None:  # head not found
                ...
            elif chain.verify(b):
                chain.add(b)
                # the chain has a new head: move it under the new head hash
                # so that the next block can find it
                heads[h(bytes(b))] = heads.pop(b.last)
            else:
                ...

        elif msg.tp == 2:  # request chain
            # The request payload is ignored; `getchain` sends an empty one.
            longest = max(heads.values(), key=lambda c: len(c.blocks))

            # send all blocks of longest blockchain
            for b in longest.blocks:
                self.sendblock(b, addr)

        else:
            ...

    def connection_lost(self, exc):
        ...

    def sendblock(self, b: Block, addr=None):
        """Send block ``b`` as a type-1 message.

        Args:
            b: the block to send.
            addr: destination address; may be omitted when the transport was
                created with a fixed remote address.
        """
        # sendto needs a bytes-like object, so serialize the message first
        self.t.sendto(bytes(Msg(1, bytes(b))), addr)

    def getchain(self):
        """Ask the connected peer for its longest chain (type-2 message)."""
        self.t.sendto(bytes(Msg(2, b'')))


# Round-trip check: wrap an empty block in a type-1 message, serialize it,
# and parse both layers back; the cell displays the reconstructed Block.
b  = Block([], zero, 7, zero)

m = Msg(1, bytes(b))
Block.frombytes(Msg.frombytes(bytes(m)).bts)

Block(txs=[], last=b'\x00\x00\x00\x00\x00\x00\x00\x00', ix=7, nonce=b'\x00\x00\x00\x00\x00\x00\x00\x00')

In [8]:
async def main(*peers):
    """Run one node until cancelled.

    With no peers the node listens for datagrams on 127.0.0.1:9999.
    Otherwise it opens one datagram endpoint per peer and asks the last
    peer for its chain.

    Args:
        *peers: ``(host, port)`` pairs of nodes to connect to.
    """
    ts = []
    loop = aio.get_running_loop()

    # The try block covers setup as well, so transports that were already
    # opened are closed even if a later endpoint fails to open.
    try:
        if not peers:
            t, protocol = await loop.create_datagram_endpoint(
                Protocol,
                local_addr=('127.0.0.1', 9999))

            ts.append(t)

        else:
            for addr, port in peers:
                # Protocol is a datagram protocol, so peers are reached over
                # a datagram endpoint with a fixed remote address rather
                # than a stream connection.
                t, protocol = await loop.create_datagram_endpoint(
                    Protocol,
                    remote_addr=(addr, port))

                ts.append(t)

            print('get chain')
            # getchain is a plain method, not a coroutine: call it directly
            protocol.getchain()

        while True:
            await aio.sleep(3600)
    finally:
        for t in ts:
            t.close()

In [None]:
# Start a listening node (no peers) and a second node that connects to it.
# NOTE: create_task needs an already-running event loop, as in a notebook.
aio.create_task(main())
aio.create_task(main(('127.0.0.1', 9999)))

In [None]:
# Display the tasks that have not finished yet on the running event loop.
aio.all_tasks(aio.get_running_loop())