In [1]:
############## PLEASE RUN THIS CELL FIRST! ###################

# import everything and define a test runner function
from importlib import reload
from unittest import TestCase

import network

from block import GENESIS_BLOCK

from helper import run
from helper import calculate_new_bits
from helper import (
    encode_varint,
    hash256,
    int_to_little_endian,
    little_endian_to_int,
)

from network import (
    GetHeadersMessage,
    NetworkEnvelope,
    SimpleNode,
    VersionMessage,
    VerAckMessage,
    NETWORK_MAGIC,
    TESTNET_NETWORK_MAGIC,
)

# Registry of the notebook's `serialize` implementations, filled by
# `methods.append(...)` in the exercise cells. ChapterTest reads entries back
# by position (0, 1, 2), so the order the cells are run in matters.
methods = []

### Exercise 1

Determine what this network message is:

`f9beb4d976657261636b000000000000000000005df6e0e2`

In [2]:
# Exercise 1

# Hex dump of the raw network message from the exercise prompt.
# Split with the field widths that `parse` below reads:
#   f9beb4d9                  4-byte magic
#   76657261636b000000000000  12-byte null-padded command (ASCII "verack")
#   00000000                  4-byte payload length (0)
#   5df6e0e2                  4-byte checksum
# No payload bytes follow.
message_hex = 'f9beb4d976657261636b000000000000000000005df6e0e2'
def parse(cls, s, testnet=False):
    """Read one network envelope from the stream `s` and build a `cls` from it.

    Wire layout read here: 4-byte magic, 12-byte null-padded command,
    4-byte little-endian payload length, 4-byte checksum, then the payload.

    Args:
        cls: callable invoked as cls(command, payload, testnet=testnet).
        s: object with a read(n) method returning bytes.
        testnet: compare against TESTNET_NETWORK_MAGIC instead of
            NETWORK_MAGIC.

    Returns:
        Whatever cls(command, payload, testnet=testnet) returns.

    Raises:
        IOError: the stream returned no bytes for the magic, or the first
            four bytes of hash256(payload) differ from the checksum field.
        SyntaxError: the magic does not match the selected network.
    """
    found_magic = s.read(4)
    if found_magic == b'':
        # Nothing left to read: the peer closed the connection.
        raise IOError('Connection reset!')
    wanted_magic = TESTNET_NETWORK_MAGIC if testnet else NETWORK_MAGIC
    if found_magic != wanted_magic:
        raise SyntaxError('magic is not right {} vs {}'.format(
            found_magic.hex(), wanted_magic.hex()))
    # The command field is fixed-width; drop the null padding.
    command = s.read(12).strip(b'\x00')
    size = little_endian_to_int(s.read(4))
    expected_checksum = s.read(4)
    payload = s.read(size)
    if hash256(payload)[:4] != expected_checksum:
        raise IOError('checksum does not match')
    return cls(command, payload, testnet=testnet)

### Exercise 2

Write the `parse` method for `NetworkEnvelope`.

#### Make [this test](/edit/code-ch10/network.py) pass: `network.py:NetworkEnvelopeTest:test_parse`

In [3]:
# Exercise 2

from io import BytesIO
from network import NetworkEnvelope

# Feed the Exercise 1 message through NetworkEnvelope.parse and show its
# command. This uses the implementation in network.py, so it raises
# NotImplementedError until that method has been written there.
message_hex = 'f9beb4d976657261636b000000000000000000005df6e0e2'
message_bytes = bytes.fromhex(message_hex)
stream = BytesIO(message_bytes)
envelope = NetworkEnvelope.parse(stream)
print(envelope.command)

NotImplementedError: 

### Exercise 3

Write the `serialize` method for `NetworkEnvelope`.

#### Make [this test](/edit/code-ch10/network.py) pass: `network.py:NetworkEnvelopeTest:test_serialize`

In [None]:
# Exercise 3

def serialize(self):
    """Return the wire bytes for an envelope-like object.

    Reads self.magic, self.command and self.payload (all bytes) and emits:
    magic, command null-padded on the right to 12 bytes, 4-byte
    little-endian payload length, first 4 bytes of hash256(payload), payload.
    """
    # A command already 12 bytes or longer gets no padding (negative
    # repeat counts yield b'').
    padding = b'\x00' * (12 - len(self.command))
    fields = [
        self.magic,
        self.command + padding,
        int_to_little_endian(len(self.payload), 4),
        hash256(self.payload)[:4],
        self.payload,
    ]
    return b''.join(fields)


# Register this serialize for ChapterTest, which installs methods[0] as
# NetworkEnvelope.serialize (index 0 assumes this is the first append).
# NOTE(review): re-running this cell appends a duplicate and shifts the
# positions of later entries.
methods.append(serialize)


### Exercise 4

Write the `serialize` method for `VersionMessage`.

#### Make [this test](/edit/code-ch10/network.py) pass: `network.py:VersionMessageTest:test_serialize`

In [None]:
# Exercise 4
def serialize(self):
    """Return the payload bytes for a version-message-like object.

    Field order, as emitted: version (4 bytes LE), services (8 LE),
    timestamp (8 LE), receiver services (8 LE), receiver address (16 bytes),
    receiver port (2 bytes big-endian), sender services (8 LE), sender
    address (16 bytes), sender port (2 bytes big-endian), nonce, varint
    length of the user agent, user agent, latest block (4 LE), relay flag
    (one byte).
    """
    # Each 4-byte IP is written after ten zero bytes and ff ff, giving a
    # 16-byte address field.
    ip_prefix = b'\x00' * 10 + b'\xff\xff'
    fields = [
        int_to_little_endian(self.version, 4),
        int_to_little_endian(self.services, 8),
        int_to_little_endian(self.timestamp, 8),
        int_to_little_endian(self.receiver_services, 8),
        ip_prefix + self.receiver_ip,
        self.receiver_port.to_bytes(2, 'big'),
        int_to_little_endian(self.sender_services, 8),
        ip_prefix + self.sender_ip,
        self.sender_port.to_bytes(2, 'big'),
        self.nonce,
        encode_varint(len(self.user_agent)),
        self.user_agent,
        int_to_little_endian(self.latest_block, 4),
        b'\x01' if self.relay else b'\x00',
    ]
    return b''.join(fields)



# Register this serialize for ChapterTest, which installs methods[1] as
# VersionMessage.serialize (index 1 assumes exactly one earlier append).
# NOTE(review): re-running this cell appends a duplicate and shifts the
# positions of later entries.
methods.append(serialize)

### Exercise 5

Write the `handshake` method for `SimpleNode`.

#### Make [this test](/edit/code-ch10/network.py) pass: `network.py:SimpleNodeTest:test_handshake`

In [None]:
# Exercise 5


def handshake(self):
    """Send a default VersionMessage, then wait for a VerAckMessage.

    Uses self.send(...) and self.wait_for(...); returns None.
    """
    self.send(VersionMessage())
    self.wait_for(VerAckMessage)

### Exercise 6

Write the `serialize` method for `GetHeadersMessage`.

#### Make [this test](/edit/code-ch10/network.py) pass: `network.py:GetHeadersMessageTest:test_serialize`

In [None]:
# Exercise 6
def serialize(self):
    """Return the payload bytes for a getheaders-like object.

    Emits: version (4 bytes little-endian), varint of self.num_hashes,
    self.start_block byte-reversed, self.end_block byte-reversed.
    """
    fields = [
        int_to_little_endian(self.version, 4),
        encode_varint(self.num_hashes),
        self.start_block[::-1],
        self.end_block[::-1],
    ]
    return b''.join(fields)


# Register this serialize for ChapterTest, which installs methods[2] as
# GetHeadersMessage.serialize (index 2 assumes exactly two earlier appends).
# NOTE(review): re-running this cell appends a duplicate entry.
methods.append(serialize)



In [None]:
from io import BytesIO
from network import SimpleNode, GetHeadersMessage, HeadersMessage
from block import Block, GENESIS_BLOCK, LOWEST_BITS
from helper import calculate_new_bits
# Download block headers from a node, starting after the genesis block, and
# validate each one: proof-of-work, linkage to the previous header, and the
# expected `bits` value. Requires network access to the host named below.
previous = Block.parse(BytesIO(GENESIS_BLOCK))
# Timestamp of the first block in the current 2016-block window.
first_epoch_timestamp = previous.timestamp
expected_bits = LOWEST_BITS
# Position of the next header to check; used in the error messages and in the
# 2016-block boundary test below.
count = 1
node = SimpleNode('mainnet.programmingbitcoin.com', testnet=False)
node.handshake()
# 19 request/response rounds, each asking for the headers that follow
# `previous`. NOTE(review): the number of headers per response is decided by
# the peer and is not visible here.
for _ in range(19):
    getheaders = GetHeadersMessage(start_block=previous.hash())
    node.send(getheaders)
    headers = node.wait_for(HeadersMessage)
    for header in headers.blocks:
        if not header.check_pow():
            raise RuntimeError('bad PoW at block {}'.format(count))
        # Each header must point at the hash of the one validated before it.
        if header.prev_block != previous.hash():
            raise RuntimeError('discontinuous block at {}'.format(count))
        # Every 2016th header: recompute the expected bits from the time
        # elapsed between the window's first block and the previous header,
        # then start a new window at the current header.
        if count % 2016 == 0:
            time_diff = previous.timestamp - first_epoch_timestamp
            expected_bits = calculate_new_bits(previous.bits, time_diff)
            print(expected_bits.hex())
            first_epoch_timestamp = header.timestamp
        if header.bits != expected_bits:
            raise RuntimeError('bad bits at block {}'.format(count))
        previous = header
        count += 1
        

In [None]:

class ChapterTest(TestCase):

    def test_apply(self):
        NetworkEnvelope.parse = parse
        NetworkEnvelope.serialize = methods[0]
        VersionMessage.serialize = methods[1]
        SimpleNode.handshake = handshake
        GetHeadersMessage.serialize = methods[2]