In [1]:
from __future__ import annotations

from cryptography.exceptions import InvalidSignature
from cryptography.hazmat.primitives.asymmetric import padding, rsa
from cryptography.hazmat.primitives import hashes, serialization
from datetime import datetime

import json
import numpy

In [2]:
def int_to_bytes(n: int) -> bytes:
    return n.to_bytes((n.bit_length() + 7) // 8, byteorder='big')

In [3]:
class User:

    def __init__(self, public_key: rsa.RSAPublicKey) -> None:
        self.public_key = public_key
        self.modulus = public_key.public_numbers().n
        self.address = numpy.base_repr(self._hash_modulus(), 36)

    def __str__(self) -> str:
        return json.dumps(self, cls=BlockchainEncoder)

    def __repr__(self) -> str:
        return str(self)

    def __eq__(self, other: User) -> bool:
        return self.modulus == other.modulus

    def __hash__(self) -> int:
        return hash(self.modulus)

    def _hash_modulus(self) -> int:
        digest = hashes.Hash(hashes.BLAKE2s(32))
        digest.update(int_to_bytes(self.modulus))
        return int.from_bytes(digest.finalize(), byteorder='big')

In [4]:
class Log:

    def __init__(self, key_path: str, password: str,
                 sender: User, receiver: User, amount: float,
                 item: str | None = None) -> None:
        self.sender = sender
        self.receiver = receiver
        self.amount = amount
        self.item = item
        self.signature = self.sign(key_path, password)
        self.timestamp = str(datetime.now())

    def __str__(self) -> str:
        return json.dumps(self, cls=BlockchainEncoder)

    def __bytes__(self) -> bytes:
        return str(self).encode()

    def __repr__(self) -> str:
        return str(self)

    def sign(self, key_path: str, password: str) -> int:
        with open(key_path, 'rb') as file:
            private_key = serialization.load_pem_private_key(
                file.read(),
                password=password.encode()
            )
            signature = private_key.sign(
                str(self.amount).encode(),
                padding.PSS(
                    mgf=padding.MGF1(hashes.SHA256()),
                    salt_length=padding.PSS.MAX_LENGTH
                ),
                hashes.SHA256()
            )
            return int.from_bytes(signature, byteorder='big')
            #return numpy.base_repr(signature, 36)

    def is_valid(self) -> bool:
        try:
            self.sender.public_key.verify(
                int_to_bytes(self.signature),
                str(self.amount).encode(),
                padding.PSS(
                    mgf=padding.MGF1(hashes.SHA256()),
                    salt_length=padding.PSS.MAX_LENGTH
                ),
                hashes.SHA256()
            )
            return True

        except InvalidSignature:
            return False

In [5]:
class Block:

    def __init__(self, logs: list[Log], prev: Block | None = None) -> None:
        self.logs = logs
        if prev:
            self.prev_hash, self.nonce = prev.find_hash()

        else:
            self.prev_hash, self.nonce = None, None

        self.timestamp = str(datetime.now())

    def __str__(self) -> str:
        return json.dumps(self, cls=BlockchainEncoder)

    def __bytes__(self) -> bytes:
        return str(self).encode()

    def __repr__(self) -> str:
        return str(self)

    def find_hash(self) -> tuple[int, int]:
        nonce = 0
        digest = self._sha256(nonce)
        while digest % 1_000_000 != 0:
            nonce += 1
            digest = self._sha256(nonce)

        return digest, nonce

    def _sha256(self, nonce: int) -> int:
        digest = hashes.Hash(hashes.SHA256())
        digest.update(bytes(self) + int_to_bytes(nonce))
        return int.from_bytes(digest.finalize(), byteorder='big')

    def is_genesis(self) -> bool:
        return not any(self.nonce, self.prev_hash, self.logs)

In [6]:
class Blockchain:

    def __init__(self) -> None:
        self.blocks = [Block([])]
        self.pending_logs = []
        self.users = {}

    def __len__(self) -> int:
        return len(self.blocks)

    def __str__(self) -> str:
        return json.dumps(self, cls=BlockchainEncoder, indent=4)

    def __repr__(self) -> str:
        return str(self)

    def new_block(self) -> None:
        block = Block(self.pending_logs, self.last_block)
        self.blocks.append(block)
        self.pending_logs = []

    def new_user(
        self,
        password: str,
        private_path: str = 'private.pem',
        public_path: str = 'public.pem',
    ) -> None:
        private_key, public_key = self._new_keys()
        user = User(public_key)
        while user.address in self.users:
            private_key, public_key = self._new_keys()
            user = User(public_key)

        self.users[user.address] = user
        self._cache_private_key(private_key, password, private_path)
        self._cache_public_key(public_key, public_path)
        print(f'New wallet created with password "{password}".')
        print(f'Keys stored in "{private_path}" and "{public_path}".')
        print('Do not share your password or private key with anyone!')
        self.new_log(private_path, password, user.address, user.address, 0.0)

    def _new_keys(self) -> tuple[rsa.RSAPrivateKey, rsa.RSAPublicKey]:
        private_key = rsa.generate_private_key(
            public_exponent=65537,
            key_size=2048,
        )
        public_key = private_key.public_key()
        return private_key, public_key

    def _cache_private_key(
        self,
        private_key: rsa.RSAPrivateKey,
        password: str,
        path: str,
    ) -> None:
        algorithm = serialization.BestAvailableEncryption(password.encode())
        private_pem = private_key.private_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PrivateFormat.PKCS8,
            encryption_algorithm=algorithm
        )
        with open(path, 'wb') as file:
            file.write(private_pem)

    def _cache_public_key(
        self,
        public_key: rsa.RSAPublicKey,
        path: str,
    ) -> None:
        public_pem = public_key.public_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PublicFormat.SubjectPublicKeyInfo
        )
        with open(path, 'wb') as file:
            file.write(public_pem)

    def new_log(
        self,
        key_path: str,
        password: str,
        sender_addr: str,
        receiver_addr: str,
        amount: float,
        item: str | None = None,
    ) -> None:
        sender = self.users[sender_addr]
        receiver = self.users[receiver_addr]
        log = Log(key_path, password, sender, receiver, amount, item)
        self.pending_logs.append(log)

    def is_valid(self) -> bool:
        if len(self) == 1:
            return self.last_block.is_genesis()

        prev_hash, _ = self.penultimate_block.find_hash()
        return prev_hash == self.last_block.prev_hash

    @property
    def penultimate_block(self) -> Block | None:
        if len(self) > 1:
            return self.blocks[-2]

        return None

    @property
    def last_block(self) -> Block:
        return self.blocks[-1]

In [7]:
class BlockchainEncoder(json.JSONEncoder):

    def default(self, obj: Any) -> dict[str, Any]:
        if isinstance(obj, User):
            return {'address': obj.address}

        elif isinstance(obj, Log):
            return {
                'timestamp': obj.timestamp,
                'sender': obj.sender,
                'receiver': obj.receiver,
                'amount': obj.amount,
                'item': obj.item,
                'signature': obj.signature
            }

        elif isinstance(obj, Block):
            return {
                'timestamp': obj.timestamp,
                'nonce': obj.nonce,
                'prevHash': obj.prev_hash,
                'logs': obj.logs
            }

        elif isinstance(obj, Blockchain):
            return {
                'length': len(obj),
                'pendingLogs': obj.pending_logs,
                'blocks': obj.blocks
            }

        return json.JSONEncoder.default(self, obj)

In [8]:
b = Blockchain()

In [10]:
b.new_user(password='banana')

New wallet created with password "banana".
Keys stored in "private.pem" and "public.pem".
Do not share your password or private key with anyone!


In [11]:
b.new_user('orange', 'private2.pem', 'public2.pem')

New wallet created with password "orange".
Keys stored in "private2.pem" and "public2.pem".
Do not share your password or private key with anyone!


In [24]:
b.new_log('private.pem', 'banana', other_addr, my_addr, 0.0, 'QmNYRVXCmyyQX1CRdFBDVJgcRCkzQSwqsQytNhcP58jqzx')