In [1]:
import hashlib
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.serialization import PublicFormat, Encoding
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import ec
from cryptography.hazmat.primitives.serialization import load_der_public_key
from cryptography.hazmat.primitives.asymmetric import utils
from cryptography.hazmat.primitives.asymmetric.utils import Prehashed



class Transaction:
    """
    Constructor for the transaction class.
    """
    def __init__(self, sender_hash: bytes, recipient_hash: bytes, sender_public_key: bytes, amount: int, 
                 fee: int, nonce: int, signature: bytes, txid: bytes):
        self.sender_hash = sender_hash
        self.recipient_hash = recipient_hash
        self.sender_public_key = sender_public_key
        self.amount = amount
        self.fee = fee
        self.nonce = nonce
        self.signature = signature
        self.txid = txid

        
    # Verification method part of the Transaction class    
    def verify(self, sender_balance, sender_previous_nonce):
        """
        A method to check the validity of the transaction.
        """
        
        assert len(self.sender_hash) == 20, \
            f"Sender hash is {len(self.sender_hash)} bytes, whereas is should be 20 bytes long"
        
        assert len(self.recipient_hash) == 20, \
            f"Recipient hash is {len(self.recipient_hash)} bytes, whereas is should be 20 bytes long"
        
        assert sha1_hash(self.sender_public_key) == self.sender_hash, \
            f"The sender hash must be a SHA-1 hash of the sender public key"
        
        assert (self.amount % 2 == 0) or ((self.amount + 1) % 2) == 0, \
            f"Only integers allowed for transaction, {self.amount} has been entered"
        
        assert self.amount > 0, \
            f"Transaction cannot be less than 1, {self.amount} has been entered"
        
        assert self.amount <= sender_balance, \
            f"Sender's balance [{sender_balance}] not enough to cover the transaction ({self.amount})"
        
        assert (self.fee % 2 == 0) or ((self.fee + 1) % 2) == 0, \
            f"Only integers allowed for fee, {self.fee} has been entered"
        
        assert self.fee >= 0, \
            f"Transaction fee cannot be less than 0, {self.fee} has been entered"        
        
        assert self.fee <= self.amount, \
            f"Fee ({self.fee}) cannot exceed the transaction quantity of ({self.amount})"
        
        assert self.nonce == sender_previous_nonce + 1, \
            f"Sender's nonce of {sender_previous_nonce + 1} does not match expected ({self.nonce})"
        
        assert txid_sha256(self.sender_hash, self.recipient_hash, self.sender_public_key, self.amount, 
                           self.fee, self.nonce, self.signature) == self.txid, \
            f"TXID hash is incorrect"
        
        assert signature_verification(self.recipient_hash, self.amount, self.fee, self.nonce, self.signature, 
                                      self.sender_public_key) is None, \
            f"Signature could not be verified"

        
        
        
# Calculation of Transaction ID        
def txid_sha256(sender_hash, recipient_hash, sender_public_key, amount, fee, nonce, signature):
    """
    SHA256 hash used to create the transaction id (txid).
    """
    
    # Instantiating SHA256 hash
    digest = hashes.Hash(hashes.SHA256())
    
    # Passing the inputs into the hash consecutively.
    digest.update(sender_hash)
    digest.update(recipient_hash)
    digest.update(sender_public_key)

    # Converting the integers to bytes using little endian representation before passing to SHA256 
    digest.update((amount).to_bytes(8, byteorder='little'))
    digest.update((fee).to_bytes(8, byteorder='little'))
    digest.update((nonce).to_bytes(8, byteorder='little'))
    digest.update(signature)

    # Finalise and return the hashed txid
    return digest.finalize()
        
    
    
    
# Verification of the Signature
def signature_verification(recipient_hash, amount, fee, nonce, signature, sender_public_key):
    """
    The method verifies the signature given a number of inputs.
    
    """
    # Instantiating SHA256 hash  
    digest = hashes.Hash(hashes.SHA256())

    # Passing the inputs into the hash consecutively.    
    digest.update(recipient_hash)
    
    # Converting the integers to bytes using little endian representation before passing to SHA256 
    digest.update((amount).to_bytes(8, byteorder='little'))
    digest.update((fee).to_bytes(8, byteorder='little'))
    digest.update((nonce).to_bytes(8, byteorder='little'))
    
    # Finalise the hashing method
    data = digest.finalize()

    # Load the senders public key
    public_key = load_der_public_key(sender_public_key)

    # Verify the signature, returning a None type variable if passed and error if failed
    return public_key.verify(signature, data, ec.ECDSA(utils.Prehashed(hashes.SHA256())))





def create_signed_transaction(sender_private_key, recipient_hash, amount, fee, nonce):
    """
    Create a new signed transaction which can then be verified.
    """
    
    # Generate a public key given the sender private key, DER encoded
    sender_public_key = sender_private_key.public_key().public_bytes(encoding=Encoding.DER, format=PublicFormat.SubjectPublicKeyInfo)
    
    # Create the sender hash, derived from the SHA-1 of the sender public key
    sender_hash = sha1_hash(sender_public_key)

    # Check recipients hash is in appropriate format
    recipient_hash = sha1_hash_check(recipient_hash)

    # Create the signature using SHA256
    sig_digest = hashes.Hash(hashes.SHA256())
    
    # Hash the appropriate data consecutively
    sig_digest.update(recipient_hash)
    sig_digest.update((amount).to_bytes(8, byteorder='little'))
    sig_digest.update((fee).to_bytes(8, byteorder='little'))
    sig_digest.update((nonce).to_bytes(8, byteorder='little'))
    
    # Data has been hashed, finalise the hash
    hased_data = sig_digest.finalize()
    
    # Sign the hashed data using the sender private key
    signature = sender_private_key.sign(hased_data, ec.ECDSA(utils.Prehashed(hashes.SHA256())))

    # Create the txid using SHA256
    txid_digest = hashes.Hash(hashes.SHA256())
    
    # Hash the appropriate data consecutively
    txid_digest.update(sender_hash)
    txid_digest.update(recipient_hash)
    txid_digest.update(sender_public_key)
    txid_digest.update((amount).to_bytes(8, byteorder='little'))
    txid_digest.update((fee).to_bytes(8, byteorder='little'))
    txid_digest.update((nonce).to_bytes(8, byteorder='little'))
    txid_digest.update(signature)
    
    # Data has been hashed, finalise the txid hash  
    txid = txid_digest.finalize()

    # Return the new, signed transation with appropriate variables
    return Transaction(sender_hash, recipient_hash, sender_public_key, amount, fee, nonce, signature, txid)




# Method to SHA-1 hash a key comprised of bytes
def sha1_hash(key: bytes):
    
    # Initialise the SHA-1 hash
    digest = hashes.Hash(hashes.SHA1())
    
    # Hash the key
    digest.update(key)
    
    # Finalise and return the hashed key
    return digest.finalize()




# Helper method to check whether key is hashed and to hash if required
def sha1_hash_check(key):
    
    # If key is not of type bytes
    if type(key) is not bytes:
        
        # Attempt to transform into bytes if key is a string
        try:
            key = bytes(key, 'utf-8')
            
        # Transform into bytes if key is integer     
        except: 
            key = key.to_bytes(8, byteorder='little')
        
        # Initialise SHA-1 hash
        digest = hashes.Hash(hashes.SHA1())
        
        # Hash the key
        digest.update(key)
        
        # Finalise and return the key
        return digest.finalize()
    
    # If key is of type bytes
    else:
        return key    

In [2]:
from cryptography.hazmat.primitives import hashes

class UserState:
    """
    The user state class, holds information regarding balance and nonce for each user. 
    """
    def __init__(self, balance, nonce):
        self.balance = balance
        self.nonce = nonce


class Block:
    """
    The block constructor class.
    """
    def __init__(self, previous, height, miner, transactions, timestamp, difficulty, block_id=None, nonce=None):
        self.previous = previous
        self.height = height
        self.miner = miner
        self.transactions = transactions
        self.timestamp = timestamp
        self.difficulty = difficulty
        self.block_id = block_id
        self.nonce = nonce
        
        
        
    def verify_and_get_changes(self, difficulty, previous_user_states):
        """
        Verify input data is correct and amend the user state as per each transaction.
        """

        assert self.difficulty == difficulty, \
            f"Incorrect difficulty"
        
        assert blockid_calc(self.previous, self.miner, self.transactions, self.timestamp, 
                            self.difficulty, self.nonce) == self.block_id, \
            f"Block id does not match expected."
        
        assert len(self.transactions) <= 25, \
            f"{len(self.transactions)} have been entered, maximum allowed is 25."
        
        assert len(self.miner) == 20, \
            f"Miner hash length incorrect, {len(self.miner)} bytes have been entered, 20 bytes expected."
        
        assert int(self.block_id.hex(), 16) <= (2 ** 256 // self.difficulty), \
            f"Invalid proof of work."

        
        
        """
        In this section the user state will be amended as per the transaction/s.
        This is accomplished by verifying the first transaction in the list, 
        then creating a user state for the recipient if currently not in the user state.
        The sender and recipients balance, nonce, fees and mining reward are updated accordingly. 
        If the user state is empty the miner will be created and the reward added to their balance.
        """
        
        # Taking a copy of the previous user state
        new_state = previous_user_states.copy()

        # Iterating through each transaction in the list
        for index,transaction in enumerate(self.transactions):
            
            # Verifying the transaction
            transaction.verify(new_state[transaction.sender_hash].balance, \
                               new_state[transaction.sender_hash].nonce)

            # Add recipient to the user state if missing
            if transaction.recipient_hash not in new_state.keys():
                new_state[transaction.recipient_hash] = UserState(0, -1)

            # Update balance and nonce of the sender
            if transaction.sender_hash in new_state.keys():
                new_state[transaction.sender_hash].balance -= transaction.amount
                new_state[transaction.sender_hash].nonce += 1

            # Update the balance of the recipient
            if transaction.recipient_hash in new_state.keys():
                new_state[transaction.recipient_hash].balance += (transaction.amount - transaction.fee)

            # Update miners balance with the fee
            if self.miner in new_state.keys():
                new_state[self.miner].balance += transaction.fee

        # Update the miners balance with the reward
        if self.miner in new_state.keys():
            new_state[self.miner].balance += 10_000
        else:
            new_state[self.miner] = UserState(10_000, -1)

        return new_state
    
    
    def get_changes_for_undo(self, user_states_after):
        """
        Reverse the changes actioned by the verify_and_get_changes() method.
        The balance, miner reward, fees and nonces are revered to the states 
        before the block changes were applied.
        
        ...
        Attributes
        ----------
        user_states_after : User state class
        
        Returns the original user state class before the block changes were applied.
        """
        
        # Take a copy of the user state input.
        old_state = user_states_after.copy()
        
        # Iterate over the transactions.
        for index,transaction in enumerate(self.transactions):
            
            # Add the amount sent back to the sender and reverse the nonce by one.
            old_state[transaction.sender_hash].balance += transaction.amount
            old_state[transaction.sender_hash].nonce -= 1
            
            # Deduct the transaction amount and fee from the recipient.
            old_state[transaction.recipient_hash].balance -= (transaction.amount - transaction.fee)
            
            # Deduct the miners balance by the fee.
            old_state[self.miner].balance -= transaction.fee
        
        # Deduct the miners balance by the mining reward.
        old_state[self.miner].balance -= 10_000

        return old_state
 
        
    
def blockid_calc(previous, miner, transaction_ids, timestamp, difficulty, nonce):
    """
    Calculate the block_id for verification.
    """
    # Initialise the SHA-256 and hash the previous and the miner inputs
    digest = hashes.Hash(hashes.SHA256())
    digest.update(previous)
    digest.update(miner)
    
    # Iterate through the transaction list & hash each transaction id consecutively
    for t_id in transaction_ids:
        digest.update(t_id.txid)
    
    # Hash the timestamp, difficulty and nonce inputs
    digest.update((timestamp).to_bytes(8, byteorder='little'))
    digest.update((difficulty).to_bytes(16, byteorder='little'))    
    digest.update((nonce).to_bytes(8, byteorder='little'))   
    
    # Finalise and return the complete hash
    return digest.finalize()



def mine_block(previous, height, miner, transactions, timestamp, difficulty, cutoff_time):
    """
    Provide the proof of work, determined by the nonce of the block_id.
    The previous, height, miner, transactions, timestamp & difficulty of the block are hashed
    with the nonce updated consecutively until one that matches the difficulty is found.
    """
    
    # Set the target as determined by the difficulty
    target = 2 ** 256 // difficulty

    # Set the maximum nonce through which to iterate
    max_nonce = 2**64 - 1

    # Initialise the SHA-256 hash and hash the previous and the miner inputs
    digest = hashes.Hash(hashes.SHA256())
    digest.update(previous)
    digest.update(miner)

    # Iterate through the transaction list & hash each transaction id consecutively    
    for t_id in transactions:
        digest.update(t_id.txid)

    # Hash the timestamp & difficulty inputs        
    digest.update((timestamp).to_bytes(8, byteorder='little'))
    digest.update((difficulty).to_bytes(16, byteorder='little'))
    
    start = time()
    
    while time() < cutoff_time:

        # Iterate through each nonce up to the maximum
        for nonce in range(max_nonce):

            # Take a copy of the previous hash
            hash_copy = digest.copy()

            # Update the hash using the nonce
            hash_copy.update((nonce).to_bytes(8, byteorder='little'))

            # Finalize the hash
            block_id = hash_copy.finalize()

            # Transform the hash into a big endian integer
            block_int = int(block_id.hex(), 16)

            # Check if integer below or equal the target
            if block_int <= target:

                # Exit the loop if appropriate integer found
                break
        
        # Exist the time loop
        break

    return Block(previous, height, miner, transactions, timestamp, difficulty, block_id, nonce)

In [3]:
# Library required to perform a deep copy.
import copy

class BlockchainState:
    """
    Keeps track of the blockchain state maintaining the longest chain, user states 
    and difficulty of the chain.
    
    ...
    Methods
    -------
    calculate_difficulty(self):
        calculates difficulty of the blockchain given the chain length and block timestamp.
        
    verify_and_apply_block(self, block):
        Verifies the block before applying the user state changes and adding it to the end of the longest chain.
        
    undo_last_block(self):
        Reverses the changes performed by the verify_and_apply_block method.
    """
    
    
    def __init__(self, longest_chain: list, user_states: dict, total_difficulty: int):
        """
        Keeps track of the blockchain state. 

        ...
        Attributes
        ----------
        longest_chain : list
            Contains the blocks in the longest chain.
        user_states : dictionary
            Contains the details of the user states.
        total_difficulty : int
            The total difficulty of the blockchain.
        """
            
        self.longest_chain = longest_chain
        self.user_states = user_states
        self.total_difficulty = total_difficulty

        
    def calculate_difficulty(self):
        """
        Calculate the mining rate in order to adjust the difficulty.
        Mining rate must be maintained at circa one block every 120 seconds. 
        
        ...
        Attributes
        ----------
        self : self
            
        Returns the average time in between blocks.
        """
        
        # If length of chain is less than or equal to 10, default to difficulty of 1000.
        if len(self.longest_chain) <= 10:
            return 1000
        
        # If longer than 10, calculate the average time per block.
        else:
            
            # Represents 10 blocks.
            k=10
            
            # Sum the difficulty of thew last 10 blocks in the chain.
            total_difficulty_for_period = sum([(i.difficulty) for i in self.longest_chain[-k:]])
            
            # Subtracting timestamp of previous block from block which is 11 blocks away from the end of the chain
            total_time_for_period = self.longest_chain[-1].timestamp - self.longest_chain[-1-k].timestamp
            
            # If timestamp for period is 0 default to 1 (in order to pass test_difficulty_with_zero_time)
            if total_time_for_period == 0:
                total_time_for_period = 1
            
            # Average time to mine a block.
            return (total_difficulty_for_period // total_time_for_period) * 120
        
        
    def verify_and_apply_block(self, block):
        """
        Verifies the block before adding it to the end of the longest chain 
        and updates the user state for all parties involved.
        
        ...
        Attributes
        ----------
        block : Block class
            A constructed block.
            
        Returns the updated Blockchain state class.
        """
        
        # Verify that the height of the block matches the existing longest chain.
        assert block.height == len(self.longest_chain), \
            f"The height of the block [{block.height}] does not equal the lenght of the longest chain [{len(self.longest_chain)}]"
        
        # Verify previous hash of the block.
        # If no chain yet exists, the previous hash of the block must be 0*32.
        if len(self.longest_chain) == 0:
            assert block.previous == bytes([0] * 32), \
                f"previous block id"
            
        # If length of chain is greater than 1.
        else:
            
            # Verify that block previous hash matches the hash of the last block in the longest chain.
            assert block.previous == self.longest_chain[-1].block_id, \
                f"previous block id"

        # If block exists in chain verify that the previous block matches the timestamp of the last block in the chain.
        if len(self.longest_chain) != 0:
            assert block.timestamp >= self.longest_chain[-1].timestamp, \
                f"Block time stamp [{block.timestamp}] must be at least that of the most recent block in the chain [{self.longest_chain[-1].timestamp}]"
        
        # Update the user state with the block information.
        new_state = block.verify_and_get_changes(self.calculate_difficulty(), self.user_states)
        
        # Verify if this information is valid.
        assert new_state, \
            f"Block did not pass verification"
        

        # Once all checks pass, amend the Blockchain state with the new information.
        # Add the block to the end of the longest chain.
        self.longest_chain.append(block)
        
        # Update the total difficulty of the blockchain state.
        self.total_difficulty += block.difficulty
        
        # Update the blockchain user states.
        self.user_states.update(new_state)
        
        # Return the new user state.
        return BlockchainState(self.longest_chain, self.user_states, self.total_difficulty)
    
    
    def undo_last_block(self):
        """
        Reverses the changes performed to the Blockchain state class by the verify_and_apply_block function.
        Reverts the blockchain state by reducing the total difficulty, reverting the user states 
        and deleting the block from the longest chain.
        
        ...
        Attributes
        ----------
        self : self
            
        Returns : None
        """
        
        # Reduce the total blockchain state difficulty by the block difficulty.
        self.total_difficulty -= self.longest_chain[-1].difficulty
        
        # Undo the user state changes performed by the block.
        self.longest_chain[-1].get_changes_for_undo(self.user_states)
        
        # Delete the last chain in the block.
        del self.longest_chain[-1]
        
        
def verify_reorg(old_state, new_branch):
    """
    Verifies whether the new longest chain is valid, accepting it as the longest chain if true and 
    reverting to the original longest chain if false.
    
    ...
    Attributes
    ----------
    old_state : Blockchain state class
        The existing state of the blockchain, corresponding to the previous longest chain
    new_branch : list of Blocks
        Represents the new longest chain to be verified
    
    Returns the new reorganised blockchain state comprised of the new longest chain if verification passes, 
    else returns the old blockchain state.
    """

    # Create a deep copy of the old state which will be returned if verification fails.
    new_state = BlockchainState(copy.deepcopy(old_state.longest_chain), copy.deepcopy(old_state.user_states.copy()), \
                                copy.deepcopy(old_state.total_difficulty))
    
    # Calculate the height where the new longest chain is to be appeneded.
    goal_height = len(old_state.longest_chain) - new_branch[0].height
    
    # Undo all blocks in the old state up to the old height.
    for _ in range(0, goal_height):
        new_state.undo_last_block()
    
    # Verify and apply all blocks in the new branch block list.
    for block in new_branch:
        new_state.verify_and_apply_block(block)
        
    # Verify that the new total block difficulty is greater than the old state.
    if new_state.total_difficulty > old_state.total_difficulty:
        return new_state
    
    # If verification fails return the old state and raise exception.
    else:
        old_state
        raise Exception("total difficulty")