In [None]:
%load_ext autoreload
%autoreload 2

In [None]:
import logging
import socket
from crawler import *
from lib import *
from base64 import b32encode
import sqlite3
from report import report

In [None]:
# A little helper code ... please ignore!

_addresses = fetch_addresses()

def next_address():
    """Remove the last entry of the module-level `_addresses` list and return it."""
    address = _addresses.pop()
    return address

# Improving our `listener` from last time

Where we left off last time

In [None]:
from lib import *

def listener(address):
    """Connect to `address` and print the command of every message received.

    Runs until interrupted (or until `read_msg` raises). The stream and the
    socket are closed on the way out so the connection is not leaked.

    Args:
        address: value passed straight to `handshake`; elsewhere in this
            notebook it is an `(ip, port)` pair.
    """
    # Establish connection
    sock = handshake(address)
    stream = sock.makefile("rb")

    try:
        # Print every gossip message we receive
        while True:
            print(read_msg(stream)['command'])
    finally:
        # Reached on KeyboardInterrupt (the stop button) and on read errors
        stream.close()
        sock.close()

In [None]:
address = ['208.86.162.216', 8333]

listener(address)

Press the &#9632; button to kill the cell above.

This just connects to a node and reads off commands forever. It's nothing special, but this is one of the most important things a Bitcoin node does!

While we won't implement a full Bitcoin node -- that's a massive project -- I'd like to write a few more programs that accomplish some of the core tasks of a bitcoin full or light node.

I think the obvious first one is initial block download. But that's a lot to chew. I suggest we write a crawler first. Here's how it might work:

* Initialize an `addresses` list
* Remove one address from `addresses` and call `handshake(address)` to connect to a peer
* Send a [`getaddr` message](https://en.bitcoin.it/wiki/Protocol_documentation#getaddr) to our peer requesting a list of their peers. They should respond with an `addr` message.
* Enter a `while` loop which reads one bitcoin message from our peer every iteration, just like in `listener`. But instead of printing them out, let's:
    * Wait until we receive an `addr` message (`msg['command'] == b'addr'`)
    * When we do, call `read_addr_payload` on its payload to deserialize this list of peer addresses
    * Add each of these addresses to our `addresses` list, and start all over

In this way we could theoretically visit every node in the network. Let's try to implement this.
 

# Naive Crawler

First, we need to study the Bitcoin wiki to learn how to send a [`getaddr` message](https://en.bitcoin.it/wiki/Protocol_documentation#getaddr).

Luckily for us, `getaddr` is one of those messages which doesn't require a payload. Therefore, we can produce one like this:

In [None]:
serialize_msg(command=b"getaddr", payload=b"")

And we can send it like so:

In [None]:
# make a socket
sock = handshake(address)

msg = serialize_msg(command=b"getaddr", payload=b"")
sock.sendall(msg)
print('"getaddr" sent!')

Very easy! Let's copy the body of `listener`, rename it to `crawler`, and add this code right before the loop.

In [None]:
def crawler(address):
    """Connect to `address`, send `getaddr`, then print every command received.

    Loops until interrupted or until `read_msg` raises. The stream and socket
    are closed on the way out so the connection is not leaked.

    Args:
        address: value passed straight to `handshake`.
    """
    # Establish connection
    sock = handshake(address)
    stream = sock.makefile("rb")

    try:
        # Request list of their peers
        msg = serialize_msg(command=b"getaddr", payload=b"")
        sock.sendall(msg)

        # TODO: Wait for `addr` response
        while True:
            print(read_msg(stream)['command'])
    finally:
        stream.close()
        sock.close()

In [None]:
crawler(address)

Next we modify `crawler` to specially handle the `addr` message we just requested. For now, let's just print out the `addr` payload.

In [None]:
from lib import handshake

def crawler(address):
    """Connect to one peer, request its peer list, and print the raw `addr` payload.

    Reads messages until an `addr` arrives, answering `ping`s along the way,
    then returns. The stream and socket are always closed before returning.

    NOTE(review): this calls `connect` although the cell imports `handshake`;
    confirm that `connect` completes the version handshake before `getaddr`
    is sent.

    Args:
        address: `(ip, port)` pair; `address[0]` is printed and the whole
            value is passed to `connect`.
    """
    # Establish connection
    print(f'Connecting to {address[0]}')
    sock = connect(address)
    stream = sock.makefile("rb")

    try:
        # Request list of their peers
        msg = serialize_msg(command=b"getaddr", payload=b"")
        sock.sendall(msg)

        # Wait for `addr` response
        while True:
            msg = read_msg(stream)
            command = msg['command']
            if command == b'addr':
                print(f'Received "addr" payload: {msg["payload"]}')
                return
            elif command == b'ping':
                # A pong must carry the nonce the ping sent, so echo the payload
                sock.sendall(serialize_msg(b'pong', msg['payload']))
            else:
                print(f'Ignoring {command} message')
    finally:
        stream.close()
        sock.close()


In [None]:
crawler(next_address())

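One of the strange things you might notice is that peers don't always send the `addr` message right away -- other messages can arrive first, which is why we keep reading in a loop.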

Next, let's interpret the payload of the `addr` message when it finally arrives.

Visit the [protocol docs](https://en.bitcoin.it/wiki/Protocol_documentation#addr) to see what this will require. 

We see that `addr` messages are just a list of `net_addr`s prefixed with a `varint` so we know how many of them there are. Given that, let's write a `read_addr_payload` function that can take a byte stream containing the payload of an `addr` message and return a Python list of dictionaries containing `net_addr` attributes.

In [None]:
# FIXME: should we just return the list? Do we need the dictionary?

def read_addr_payload(stream):
    """Deserialize the payload of an `addr` message.

    Reads a varint count with `read_varint`, then that many entries with
    `read_address`.

    Args:
        stream: readable byte stream positioned at the start of the payload.

    Returns:
        A dict with a single key, `"addresses"`, holding the list of values
        returned by `read_address`.
    """
    how_many = read_varint(stream)
    entries = []
    for _ in range(how_many):
        entries.append(read_address(stream))
    return {"addresses": entries}

In [None]:
read_addr_payload(BytesIO(b'\x01hC\x90\\\r\x04\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xff\xff_\x1c\xda= \x8d'))

Let's call this function from within `crawler`:

In [None]:
def crawler(address):
    """Connect to one peer, request its peer list, and print the decoded addresses.

    Reads messages until an `addr` arrives, answering `ping`s along the way,
    then returns. The stream and socket are always closed before returning.

    Args:
        address: `(ip, port)` pair; `address[0]` is printed and the whole
            value is passed to `connect`.
    """
    # Establish connection
    print(f'Connecting to {address[0]}')
    sock = connect(address)
    stream = sock.makefile("rb")

    try:
        # Request list of their peers
        msg = serialize_msg(command=b"getaddr", payload=b"")
        sock.sendall(msg)

        # Wait for `addr` response
        while True:
            msg = read_msg(stream)
            command = msg['command']
            if command == b'addr':
                addr_msg = read_addr_payload(BytesIO(msg['payload']))
                print(f'Received "addr" containing {addr_msg["addresses"]}')
                return
            elif command == b'ping':
                # A pong must carry the nonce the ping sent, so echo the payload
                sock.sendall(serialize_msg(b'pong', msg['payload']))
            else:
                print(f'Ignoring {command} message')
    finally:
        stream.close()
        sock.close()


In [None]:
crawler(next_address())

Now that we can read our peer's address list, let's extend `crawler` to save them to an `addresses` list.

In [None]:
def crawler(addresses):
    """Repeatedly connect to a peer and append the addresses it advertises.

    Each pass connects, sends `getaddr`, waits for an `addr` message
    (answering `ping`s meanwhile), appends the decoded entries to `addresses`,
    and closes the connection before reconnecting.

    NOTE(review): the address is popped once, before the loop, so every pass
    reconnects to that same peer. The entries appended to `addresses` are the
    dicts returned by `read_addr_payload`, not `(ip, port)` tuples.

    Args:
        addresses: mutable list; its last element is used as the peer to
            visit, and discovered entries are appended to it.
    """
    # Get next address
    address = addresses.pop()

    # Connect to this node
    while True:
        # Establish connection
        print(f'Connecting to {address[0]}')
        sock = connect(address)
        stream = sock.makefile("rb")

        try:
            # Request list of their peers
            msg = serialize_msg(command=b"getaddr", payload=b"")
            sock.sendall(msg)

            # Wait for `addr` response
            while True:
                msg = read_msg(stream)
                command = msg['command']
                if command == b'addr':
                    addr_msg = read_addr_payload(BytesIO(msg['payload']))
                    print(f'Received "addr" containing {addr_msg["addresses"]}')
                    addresses.extend(addr_msg['addresses'])
                    break
                elif command == b'ping':
                    # A pong must carry the nonce the ping sent, so echo the payload
                    sock.sendall(serialize_msg(b'pong', msg['payload']))
                else:
                    print(f'Ignoring {command} message')
        finally:
            # Do not leak one socket per reconnect
            stream.close()
            sock.close()

In [None]:
crawler([next_address()])

If you run this code enough times, you'll get output like this:

```
Connecting to 173.244.167.110
Ignoring b'alert' message
Received "addr" containing [{'time': 1552959039, 'services': b'\r\x04\x00\x00\x00\x00\x00\x00', 'ip': '::ffff:173.244.167.110', 'port': 8333}]
Connecting to 173.244.167.110
Received "addr" containing [{'time': 1552959039, 'services': b'\r\x04\x00\x00\x00\x00\x00\x00', 'ip': '::ffff:173.244.167.110', 'port': 8333}]
Connecting to 173.244.167.110
Received "addr" containing [{'time': 1552959039, 'services': b'\r\x04\x00\x00\x00\x00\x00\x00', 'ip': '::ffff:173.244.167.110', 'port': 8333}]
Connecting to 173.244.167.110
Received "addr" containing [{'time': 1552959039, 'services': b'\r\x04\x00\x00\x00\x00\x00\x00', 'ip': '::ffff:173.244.167.110', 'port': 8333}]
...
...
...
```

It's just connecting to the same address over and over again. Why is this?

Because the `addr` message our peer is sending us contains only 1 address: the address of the peer we're currently connected to! So that will go back into the queue, and we'll just keep connecting to the same peer over-and-over.

We need a check for this:

In [None]:
from ipaddress import ip_address

a = ip_address('::ffff:173.244.167.110')
b = ip_address('173.244.167.110')

In [None]:
a

In [None]:
b

In [None]:
a.compressed

In [None]:
b.compressed

In [None]:
def crawler(addresses):
    """Visit peers from `addresses`, collecting the peers each one advertises.

    For every address popped off the end of the list: connect, send `getaddr`,
    and read messages until an `addr` with more than one entry arrives. Those
    entries are appended to `addresses` as `(ip, port)` tuples, the connection
    is closed, and the next address is visited. Returns once the list is empty.

    Args:
        addresses: mutable list of `(ip, port)` pairs; consumed from the end
            and extended with newly discovered peers.
    """
    while addresses:
        # Get next address (inside the loop, so each pass visits a new peer)
        address = addresses.pop()

        # Establish connection
        print(f'Connecting to {address[0]}')
        sock = connect(address)
        stream = sock.makefile("rb")

        try:
            # Request list of their peers
            msg = serialize_msg(command=b"getaddr", payload=b"")
            sock.sendall(msg)

            # Wait for `addr` response
            while True:
                msg = read_msg(stream)
                command = msg['command']
                if command == b'addr':
                    peers = read_addr_payload(BytesIO(msg['payload']))["addresses"]
                    # An empty list is possible, so do not index it blindly
                    print(len(peers), peers[0] if peers else None, address)
                    # Only save if it contains new addresses: a single entry is
                    # typically just the peer advertising itself
                    if len(peers) > 1:
                        print(f'Received "addr" containing {peers}')
                        addresses.extend([(a['ip'], a['port']) for a in peers])
                        break
                elif command == b'ping':
                    # A pong must carry the nonce the ping sent, so echo the payload
                    sock.sendall(serialize_msg(b'pong', msg['payload']))
                else:
                    print(f'Ignoring {command} message')
        finally:
            stream.close()
            sock.close()

In [None]:
crawler([next_address()])

In [None]:
def crawler(addresses):
    """Visit peers from `addresses`, skipping ones that fail, until none are left.

    For every address popped off the end of the list: connect, send `getaddr`,
    and read messages until an `addr` with more than one entry arrives or
    reading fails. Discovered peers are appended to `addresses` as
    `(ip, port)` tuples. A peer that cannot be reached, or whose connection
    dies while reading, is abandoned and the next address is tried.

    Args:
        addresses: mutable list of `(ip, port)` pairs; consumed from the end
            and extended with newly discovered peers.
    """
    while addresses:
        # Get next address (inside the loop, so a failure moves on to a new peer)
        address = addresses.pop()

        # Establish connection
        print(f'Connecting to {address[0]}')
        try:
            sock = connect(address)
        except Exception:
            # Unreachable peer: try the next address instead of retrying forever
            continue
        stream = sock.makefile("rb")

        try:
            # Request list of their peers
            msg = serialize_msg(command=b"getaddr", payload=b"")
            sock.sendall(msg)

            # Wait for `addr` response
            while True:
                try:
                    msg = read_msg(stream)
                except Exception:
                    # Connection died or timed out: give up on this peer
                    break
                command = msg['command']
                if command == b'addr':
                    peers = read_addr_payload(BytesIO(msg['payload']))["addresses"]
                    # An empty list is possible, so do not index it blindly
                    print(len(peers), peers[0] if peers else None, address)
                    # Only save if it contains new addresses
                    if len(peers) > 1:
                        print(f'Received {len(peers)} addrs')
                        addresses.extend([(a['ip'], a['port']) for a in peers])
                        break
                elif command == b'ping':
                    # A pong must carry the nonce the ping sent, so echo the payload
                    sock.sendall(serialize_msg(b'pong', msg['payload']))
                else:
                    print(f'Ignoring {command} message')
        finally:
            stream.close()
            sock.close()

In [None]:
crawler([next_address()])

This code is now a mess.

A couple classes could significantly improve it.

The biggest problem in my opinion is the 2 `while` loops. That's confusing, complex, and ugly. These two loops both represent different things.

The outer loop represents the procession of new connections to peers. This is the "crawler" itself.

The inner loop represents the reading of messages until an `addr` shows up. This is like a "connection", isn't it?

If we were to break this code up into classes, it might make sense to have a `Crawler` class which handles the outer loop, and a `Connection` class which handles the inner loop.



In [None]:
class Connection:
    """One conversation with a peer: handshake, ask for peers, read until `addr`.

    Attributes are plain instance fields:
        address: value passed to `handshake`.
        sock: the object returned by `handshake`, or None before `open()`.
        peer_addresses: list of `(ip, port)` tuples taken from the peer's
            `addr` message, or None if none was accepted.
        finished: True once an `addr` with more than one entry was handled.
    """

    def __init__(self, address):
        self.address = address
        self.sock = None
        self.peer_addresses = None
        self.finished = False

    def handle_addr(self, msg):
        """Decode an `addr` message and keep its addresses if there are several.

        Results are stored on `self.peer_addresses` (not on any global list)
        so the caller decides what to do with them.
        """
        addr_msg = read_addr_payload(BytesIO(msg['payload']))
        peers = addr_msg["addresses"]
        # Only save if it contains new addresses
        print('received', len(peers), 'addresses')
        if len(peers) > 1:
            print(f'Received {len(peers)} addrs')
            self.peer_addresses = [(a['ip'], a['port']) for a in peers]
            self.finished = True

    def open(self):
        """Handshake, send `getaddr`, and process messages until finished.

        Any exception from `handshake`, `read_msg` or the socket propagates
        to the caller.
        """
        # Handshake
        print("connecting to", self.address)
        self.sock = handshake(self.address)
        stream = self.sock.makefile("rb")

        # Request list of their peers
        msg = serialize_msg(command=b"getaddr", payload=b"")
        self.sock.sendall(msg)

        while not self.finished:
            msg = read_msg(stream)
            command = msg['command']
            if command == b'addr':
                self.handle_addr(msg)
            elif command == b'ping':
                # A pong must carry the nonce the ping sent, so echo the payload
                self.sock.sendall(serialize_msg(b'pong', msg['payload']))
            else:
                print(f'Ignoring {command} message')

    def close(self):
        """Close the socket if one was opened; safe to call at any time."""
        if self.sock is not None:
            self.sock.close()
    
class Crawler:
    """Walks the network by visiting addresses and queueing the peers they report."""

    def __init__(self, addresses):
        # List of addresses still to visit; consumed from the end
        self.addresses = addresses

    def crawl(self):
        """Visit addresses one at a time, forever.

        Errors raised while talking to a peer are printed, logged, and the
        peer is skipped. The peer's socket is closed whether or not the
        conversation succeeded.

        Raises:
            IndexError: from `list.pop` once `self.addresses` is empty.
        """
        while True:
            address = self.addresses.pop()
            connection = Connection(address)

            try:
                connection.open()
            except Exception as e:
                print(e)
                logging.info(str(e))
                continue
            finally:
                # Runs on success and on failure; skipped only when the
                # connection never got as far as creating a socket
                if connection.sock is not None:
                    connection.close()

            if connection.peer_addresses:
                self.addresses.extend(connection.peer_addresses)

In [None]:
addresses = [next_address()]
Crawler(addresses).crawl()

# Timeouts

(Note that `handshake()` already contains a timeout _at the socket level_. This will raise exceptions if no messages are sent over the socket within the timeout period.)

In [None]:
# client.py

import socket, time


addr = "localhost", 11000

def client_2():
    """Connect to `addr` with a 5 second timeout and try to read one byte.

    The socket is deliberately neither closed nor guarded here, so a timeout
    from `recv` propagates to the caller.
    """
    connection = socket.create_connection(addr, timeout=5)
    print('connected. attempting to read message')
    connection.recv(1)

def client_3():
    """Connect to `addr`, try to read one byte, and always close the socket.

    A `socket.timeout` from `recv` is swallowed; the socket is closed whether
    the read timed out, succeeded, or failed some other way.
    """
    sock = socket.create_connection(addr, timeout=5)
    print('connected. attempting to read message')
    try:
        sock.recv(1)
    except socket.timeout:
        pass
    finally:
        sock.close()

client_2()


In [None]:
# server.py
import socket, time

# Address the demo server listens on
addr = "localhost", 11000

sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.bind(addr)
sock.listen()
t = time.time()  # when the server started listening
# Blocks until a client connects; note this rebinds `addr` to the client's address
conn, addr = sock.accept()
# The server never reads from `conn`, so it has no way to notice the client
# giving up. Stop the cell manually.
while True:
    print('Will it time out?')
    time.sleep(1)
    print()

Demonstrates socket timeouts, but the server curiously doesn't know she's listening on a dead line! It's best to close your sockets before throwing them away:

In [None]:
# client.py (revision)

import socket, time

# Address of the demo server
addr = "localhost", 11000

sock = socket.create_connection(addr, timeout=5)
start = time.time()
print('connected. attempting to read message')
try:
    sock.recv(1)
except socket.timeout:
    print(f'Timed out after {time.time() - start} seconds')
finally:
    # Close on every path, not only after a timeout
    sock.close()

Next we need timeouts in the "listen for `addr` message" loop. We don't want to get stuck talking to a peer who just refuses to send an `addr` message.

To implement this we'll need to add a Connection.start_time

In [None]:
class Connection:
    """One conversation with a peer, abandoned after a time limit.

    Attributes are plain instance fields:
        address: value passed to `handshake`.
        sock: the object returned by `handshake`, or None before `open()`.
        start_time: `time.time()` at the start of `open()`, or None.
        peer_addresses: list of `(ip, port)` tuples taken from the peer's
            `addr` message, or None if none was accepted.
        finished: True once an `addr` with more than one entry was handled.
    """

    # Seconds a conversation may last before we stop waiting for `addr`
    TIMEOUT_SECONDS = 60

    def __init__(self, address):
        self.address = address
        self.sock = None
        self.start_time = None
        self.peer_addresses = None
        self.finished = False

    def handle_addr(self, msg):
        """Decode an `addr` message and keep its addresses if there are several.

        Results are stored on `self.peer_addresses` (not on any global list)
        so the caller decides what to do with them.
        """
        addr_msg = read_addr_payload(BytesIO(msg['payload']))
        peers = addr_msg["addresses"]
        # Only save if it contains new addresses
        print('received', len(peers), 'addresses')
        if len(peers) > 1:
            print(f'Received {len(peers)} addrs')
            self.peer_addresses = [(a['ip'], a['port']) for a in peers]
            self.finished = True

    def remain_alive(self):
        """Return True while unfinished and under the time limit.

        Must only be called after `start_time` has been set by `open()`.
        """
        elapsed = time.time() - self.start_time
        return not self.finished and elapsed < self.TIMEOUT_SECONDS

    def open(self):
        """Handshake, send `getaddr`, and process messages until done or timed out.

        The time limit is checked between messages only; a `read_msg` call
        that is already blocked is not interrupted by it. Exceptions from
        `handshake`, `read_msg` or the socket propagate to the caller.
        """
        self.start_time = time.time()

        # Handshake
        print("connecting to", self.address)
        self.sock = handshake(self.address)
        stream = self.sock.makefile("rb")

        # Request list of their peers
        msg = serialize_msg(command=b"getaddr", payload=b"")
        self.sock.sendall(msg)

        # Wait for response
        while self.remain_alive():
            msg = read_msg(stream)
            command = msg['command']
            if command == b'addr':
                self.handle_addr(msg)
            elif command == b'ping':
                # A pong must carry the nonce the ping sent, so echo the payload
                self.sock.sendall(serialize_msg(b'pong', msg['payload']))
            else:
                print(f'Ignoring {command} message')

    def close(self):
        """Close the socket if one was opened; safe to call at any time."""
        if self.sock is not None:
            self.sock.close()
    

In [None]:
from lib import handshake, connect

In [None]:
addresses = [next_address()]
Crawler(addresses).crawl()

# Seeds

By this time you've probably hit this error:

![image](../images/empty-list.png)

It would be much better if we could prime our crawler with more addresses. It would be especially good if they were addresses of "high quality" nodes that are always online and have juicy peer lists to share with us.

This is exactly what DNS seeds are for. Prominent bitcoin core developers run DNS servers from domains they control which resolve not to the address of a machine serving a website -- which is true of most domain names -- but to a list of addresses of high quality bitcoin full nodes.

These domains are actually [hard-coded into Bitcoin Core](https://github.com/bitcoin/bitcoin/blob/v0.17.1/src/chainparams.cpp#L127)!

Most of them run [this crawler / server written by Pieter Wuille](https://github.com/sipa/bitcoin-seeder).

Let's learn to query these DNS seeds:

In [None]:
DNS_SEEDS = [
    'dnsseed.bitcoin.dashjr.org', 
    'dnsseed.bluematt.me',
    'seed.bitcoin.sipa.be', 
    'seed.bitcoinstats.com', 
    'seed.bitcoin.sprovoost.nl',
]

In your terminal type:

```shell
host dnsseed.bitcoin.dashjr.org
```

This will perform a DNS lookup. How can we do this from Python?

In [None]:
# getaddrinfo translates hostname -> ip address ... but it's messy
addr_info = socket.getaddrinfo(DNS_SEEDS[0], 0, 0, 0, 0)
addr_info

In [None]:
# ip addresses are the first item in the last entry
ips = [ai[-1][0] for ai in addr_info]

In [None]:
# Remove duplicates
ips = list(set(ips))
ips

In [None]:
# Translate to an (ip, port) tuple for ready consumption by socket.create_connection
addresses = [(ip, 8333) for ip in ips]
addresses

In [None]:
# Can we connect?
sock = handshake(addresses[0])
print(f'Received some bytes: {sock.recv(100)}')

In [None]:
def fetch_addresses(dns_seeds=None, port=8333):
    """Resolve DNS seeds into a list of `(ip, port)` addresses.

    Seeds that fail to resolve are reported and skipped. Duplicate addresses
    are removed while keeping first-seen order.

    Args:
        dns_seeds: iterable of hostnames to resolve; defaults to the
            module-level `DNS_SEEDS`.
        port: port paired with every resolved IP (default 8333).

    Returns:
        List of unique `(ip, port)` tuples; empty if nothing resolved.
    """
    if dns_seeds is None:
        dns_seeds = DNS_SEEDS

    addresses = []
    for dns_seed in dns_seeds:
        try:
            addr_info = socket.getaddrinfo(dns_seed, 0)
        except Exception as e:
            print(e)
            logging.info(f"error fetching addresses from {dns_seed}")
            continue
        # The IP is the first item of the last entry in each result tuple
        addresses.extend((ai[-1][0], port) for ai in addr_info)

    # getaddrinfo returns one row per socket type, and seeds can overlap
    return list(dict.fromkeys(addresses))

In [None]:
fetch_addresses()

In [None]:
socket.getaddrinfo(DNS_SEEDS[0], 0)

In [None]:
# Let's run the crawler with addresses from the DNS seeds

Crawler(fetch_addresses()).crawl()

# Saving the results

Right now we're just throwing away the version messages we receive from our peers -- which contain interesting information about the node software they are using. We also don't keep any track record of the addresses we've successfully visited. That's essential if we want to make any claims that "The Bitcoin network contains at least N nodes".

I propose we save this information in a SQLite database. If you've never used SQL before, I suggest you do [this tutorial](http://www.sqlitetutorial.net/)

The essence of the idea is that it is a relational database that lives in a single file. Therefore, it's pretty easy to work with and quite portable -- you can even email the database file if you like!

And since SQLite is perhaps the most deployed piece of software in the world (it's on your phone, your laptop, your TV, your refrigerator, etc) there exists a lot of nice tooling for it. One great tool is [SQLiteBrowser](https://sqlitebrowser.org). Please install it. We'll use it to query the data generated by our crawler.

## SQLite Demo

Here's how to do the two most important operations:
* Add entries to the database with `INSERT`
* Query the database with `SELECT`

In [None]:
import sqlite3

con = sqlite3.connect('test.db')

# Demo rows: (id, name, price)
CARS = [
    (1, 'Audi', 52642),
    (2, 'Mercedes', 57127),
    (3, 'Skoda', 9000),
    (4, 'Volvo', 29000),
    (5, 'Bentley', 350000),
    (6, 'Citroen', 21000),
    (7, 'Hummer', 41400),
    (8, 'Volkswagen', 21600),
]

# `with con` commits on success and rolls back on error
with con:

    cur = con.cursor()

    # Start from a clean table so re-running this cell neither fails with
    # "table cars already exists" nor inserts the rows a second time
    cur.execute("DROP TABLE IF EXISTS cars")
    cur.execute("CREATE TABLE cars(id INT, name TEXT, price INT)")
    cur.executemany("INSERT INTO cars VALUES(?, ?, ?)", CARS)

In [None]:
# Query all the cars

with con:
    # Connection.execute creates the cursor and runs the statement in one step
    cur = con.execute("SELECT * FROM cars")
    rows = cur.fetchall()

    for row in rows:
        car_id, name, price = row
        print(f"{car_id} {name} {price}")

In [None]:
# Query the expensive cars

with con:
    # Connection.execute creates the cursor and runs the statement in one step
    cur = con.execute("SELECT * FROM cars WHERE price > 30000")
    rows = cur.fetchall()

    for row in rows:
        car_id, name, price = row
        print(f"{car_id} {name} {price}")

# Making an `observations` Table

Just like we made a pretend "cars" table above, let's make an "observations" table for our crawler.

This table will contain:
* Everything in `version` messages
* `ip` and `port`
* `run` -- an integer that counts up every time we run the crawler

In [None]:
def execute(query, args=None, db_path='test.db'):
    """Run one SQL statement against a SQLite file and return its cursor.

    The connection's context manager commits on success and rolls back on
    error, but it does not close the connection. The returned cursor keeps
    the connection usable, so callers can still call `.fetchall()` on it.

    Args:
        query: SQL text, optionally with `:name` or `?` placeholders.
        args: dict or sequence of values for the placeholders; defaults to
            no parameters.
        db_path: path of the database file (default `'test.db'`).

    Returns:
        The `sqlite3.Cursor` produced by the statement.
    """
    params = {} if args is None else args
    with sqlite3.connect(db_path) as conn:
        return conn.execute(query, params)

In [None]:
create_observations_table = """
CREATE TABLE IF NOT EXISTS observations (
    run INT,
    ip TEXT,
    port INT,
    services INT,
    timestamp INT,
    receiver_services INT,
    receiver_ip TEXT,
    receiver_port INT,
    sender_services INT,
    sender_ip TEXT,
    sender_port INT,
    nonce TEXT,
    user_agent TEXT,
    latest_block INT,
    relay INT
)
"""

In [None]:
execute(create_observations_table)

In [None]:
RUN = 0  # FIXME

# Columns of the `observations` table, in insertion order
OBSERVATION_COLUMNS = (
    "run",
    "ip",
    "port",
    "services",
    "timestamp",
    "receiver_services",
    "receiver_ip",
    "receiver_port",
    "sender_services",
    "sender_ip",
    "sender_port",
    "nonce",
    "user_agent",
    "latest_block",
    "relay",
)


def observe_node(address, args):
    """Insert one row into `observations` for a peer we talked to.

    The caller's `args` dict is left untouched; the row is built from a copy.

    Args:
        address: `(ip, port)` pair of the peer; stored as `ip` and `port`.
        args: dict holding the remaining column values (the parsed `version`
            payload). Must contain `"nonce"`.
    """
    column_list = ", ".join(OBSERVATION_COLUMNS)
    placeholders = ", ".join(f":{column}" for column in OBSERVATION_COLUMNS)
    query = f"INSERT INTO observations ({column_list}) VALUES ({placeholders})"

    row = dict(args)  # copy so the caller's payload is not mutated
    row["nonce"] = str(row["nonce"])  # HACK: stored as text
    row["ip"] = address[0]
    row["port"] = address[1]
    row["run"] = RUN
    execute(query, row)


In [None]:
version_payload = {
    'version': 70015,
    'services': 1,
    'timestamp': 1553189779,
    'receiver_services': 1,
    'receiver_ip': '0.0.0.0',
    'receiver_port': 8333,
    'sender_services': 1,
    'sender_ip': '0.0.0.0',
    'sender_port': 36128,
    'nonce': 15042168689231199477,
    'user_agent': b'/buidl-bootcamp/',
    'latest_block': 500000,
    'relay': 1,
    'ip': '0.0.0.0',
    'port': 8333}
address = "0.0.0.0", 8333

observe_node(address, version_payload)

In [None]:
version_payload

In [None]:
results = execute("select * from observations").fetchall()
results

In [None]:
# Turn the lists to dictionaries (a little more readable)

params = ['run',
 'ip',
 'port',
 'services',
 'timestamp',
 'receiver_services',
 'receiver_ip',
 'receiver_port',
 'sender_services',
 'sender_ip',
 'sender_port',
 'nonce',
 'user_agent',
 'latest_block',
 'relay']

results_dict = [dict(zip(params, result)) for result in results]
results_dict

In [None]:
# just copied this ...
class Crawler:
    """Walks the network by visiting addresses and queueing the peers they report."""

    def __init__(self, addresses):
        # List of addresses still to visit; consumed from the end
        self.addresses = addresses

    def observe_node(self, connection):
        """Queue the peers advertised by `connection` for later visits.

        The `version` payload is not written here: `Connection.handle_version`
        already calls the module-level `observe_node` when the message
        arrives, and writing it again would store every node twice.
        """
        # Save contents of `addr` message
        self.addresses.extend(connection.peer_addresses)

    def crawl(self):
        """Visit addresses until the list is empty.

        Errors raised while talking to a peer are printed, logged, and the
        peer is skipped. The peer's socket is closed whether or not the
        conversation succeeded.
        """
        while self.addresses:
            address = self.addresses.pop()
            connection = Connection(address)

            try:
                connection.open()
            except Exception as e:
                print(e)
                logging.info(str(e))
                continue
            finally:
                # Skipped only when the connection never created a socket
                if connection.sock is not None:
                    connection.close()

            if connection.peer_addresses:
                self.observe_node(connection)

In [None]:
class Connection:
    """One conversation with a peer, driven by per-command handler methods.

    `open()` connects, sends `version`, then feeds every incoming message to
    `handle_msg`, which calls `handle_<command>` if such a method exists.

    Attributes are plain instance fields:
        address: value passed to `connect`.
        sock: the object returned by `connect`, or None before `open()`.
        start_time: `time.time()` at the start of `open()`, or None.
        version_payload: parsed `version` message of the peer, or None.
        peer_addresses: list of `(ip, port)` tuples taken from the peer's
            `addr` message, or None if none was accepted.
        finished: True once an `addr` with more than one entry was handled.
    """

    # Seconds a conversation may last before we stop waiting for `addr`
    TIMEOUT_SECONDS = 60

    def __init__(self, address):
        self.address = address
        self.sock = None
        self.start_time = None
        self.version_payload = None
        self.peer_addresses = None
        self.finished = False

    def send_version(self):
        """Send our `version` message, which starts the handshake."""
        payload = serialize_version_payload()
        msg = serialize_msg(b"version", payload)
        self.sock.sendall(msg)

    def send_verack(self):
        """Send an empty `verack`."""
        msg = serialize_msg(b"verack", b"")
        self.sock.sendall(msg)

    def send_getaddr(self):
        """Send an empty `getaddr`."""
        # sendall, unlike send, keeps writing until the whole message is out
        self.sock.sendall(serialize_msg(b"getaddr", b""))

    def handle_version(self, stream):
        """Parse and record the peer's `version`, then acknowledge it."""
        # Interpret payload stream
        self.version_payload = read_version_payload(stream)

        # Save the address & version payload
        observe_node(self.address, self.version_payload)

        # Complete handshake with a `verack`
        self.send_verack()

    def handle_verack(self, stream):
        """Handshake complete: ask the peer for its peer list."""
        # With connection established, ask for their peer list
        self.send_getaddr()

    def handle_ping(self, stream):
        """Answer a `ping` with a `pong` carrying the same payload (its nonce)."""
        self.sock.sendall(serialize_msg(b"pong", stream.read()))

    def handle_addr(self, stream):
        """Decode an `addr` payload and keep its addresses if there are several."""
        addr_msg = read_addr_payload(stream)
        peers = addr_msg["addresses"]
        # Only save if it contains new addresses
        print('received', len(peers), 'addresses')
        if len(peers) > 1:
            print(f'Received {len(peers)} addrs')
            self.peer_addresses = [(a['ip'], a['port']) for a in peers]
            self.finished = True

    def handle_msg(self, msg):
        """Dispatch `msg` to `handle_<command>`; commands without a handler are ignored."""
        command_str = msg['command'].decode('utf-8')
        handler = getattr(self, f"handle_{command_str}", None)
        if handler is not None:
            handler(BytesIO(msg['payload']))

    def remain_alive(self):
        """Return True while unfinished and under the time limit.

        Must only be called after `start_time` has been set by `open()`.
        """
        elapsed = time.time() - self.start_time
        return not self.finished and elapsed < self.TIMEOUT_SECONDS

    def open(self):
        """Connect, start the handshake, and process messages until done or timed out.

        The time limit is checked between messages only; a `read_msg` call
        that is already blocked is not interrupted by it. Exceptions from
        `connect`, `read_msg`, the handlers or the socket propagate to the
        caller.
        """
        self.start_time = time.time()

        # Open TCP connection
        print("connecting to", self.address)
        self.sock = connect(self.address)
        stream = self.sock.makefile("rb")

        # Start handshake
        self.send_version()

        # Handle messages until `addr` msg arrives or timeout
        while self.remain_alive():
            msg = read_msg(stream)
            self.handle_msg(msg)

    def close(self):
        """Close the socket if one was opened; safe to call at any time."""
        if self.sock is not None:
            self.sock.close()

In [None]:
Crawler(fetch_addresses()).crawl()

In [None]:
execute("select ip from observations").fetchall()

# Duplicates

Let's only visit addresses once per run

In [None]:
# just copied this ...
class Crawler:
    """Walks the network, visiting each address at most once per instance."""

    def __init__(self, addresses):
        self.addresses = addresses  # addresses still to visit
        self.visited = set()        # addresses already handed out
        self.finished = False       # True once `addresses` ran dry

    def get_next_address(self):
        """Find an address we haven't visited yet.

        Returns:
            The next unvisited address, or None when `self.addresses` is
            exhausted (in which case `self.finished` is set to True).
        """
        while self.addresses:
            address = self.addresses.pop(0)  # get new addresses off the front
            if address not in self.visited:
                self.visited.add(address)
                return address
        self.finished = True
        return None

    def observe_node(self, connection):
        """Queue the peers advertised by `connection` for later visits.

        The `version` payload is not written here: `Connection.handle_version`
        already calls the module-level `observe_node` when the message
        arrives, and writing it again would store every node twice.
        """
        # Save contents of `addr` message
        self.addresses.extend(connection.peer_addresses)

    def crawl(self):
        """Visit unvisited addresses until none are left.

        Errors raised while talking to a peer are printed, logged, and the
        peer is skipped. The peer's socket is closed whether or not the
        conversation succeeded.
        """
        while not self.finished:
            address = self.get_next_address()
            if address is None:
                # Nothing left to visit; do not try to connect to None
                break

            connection = Connection(address)
            try:
                connection.open()
            except Exception as e:
                print(e)
                logging.info(str(e))
                continue
            finally:
                # Skipped only when the connection never created a socket
                if connection.sock is not None:
                    connection.close()

            if connection.peer_addresses:
                self.observe_node(connection)

In [None]:
# It only connects to this address once!

addr = ('80.3.242.233', 8333)

Crawler([addr, addr]).crawl()

# Make it fast

Modify Crawler.crawl ...

```
        start = time.time()
        while time.time() - start < 30:
```

Run it with a profiler

```shell
$ python -m cProfile -s time crawler.py
```

You'll get a big ugly report. Two lines should stand out:

```
   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
       36   46.631    1.295   46.631    1.295 {method 'recv_into' of '_socket.socket' objects}
        2    0.487    0.244    0.487    0.244 {method 'connect' of '_socket.socket' objects}
        5    0.309    0.062    0.311    0.062 {built-in method _socket.getaddrinfo}
        6    0.050    0.008    0.055    0.009 db.py:19(execute_statement)
        7    0.017    0.002    0.017    0.002 {method 'execute' of 'sqlite3.Connection' objects}
       14    0.009    0.001    0.009    0.001 {built-in method _imp.create_dynamic}
     1002    0.005    0.000    0.013    0.000 lib.py:153(read_address)
     1096    0.005    0.000    0.012    0.000 queue.py:121(put)
       26    0.005    0.000    0.005    0.000 {built-in method marshal.loads}
       78    0.003    0.000    0.006    0.000 {built-in method builtins.__build_class__}
     1099    0.002    0.000    0.004    0.000 threading.py:335(notify)
     1006    0.002    0.000    0.004    0.000 lib.py:57(bytes_to_ip)
      124    0.002    0.000    0.007    0.000 <frozen importlib._bootstrap_external>:1356(find_spec)
     1006    0.002    0.000    0.002    0.000 {built-in method _socket.inet_ntop}
        7    0.002    0.000    0.002    0.000 {built-in method _sqlite3.connect}
     4040    0.002    0.000    0.002    0.000 {method 'read' of '_io.BytesIO' objects}
...
...
...
```

The speed of our program is dominated by 1 method: `sock.recv`

`socket.connect` is also taking up time. If I ran this longer we would sometimes get a bigger number here.

Lastly, `socket.getaddrinfo` is kind of slow -- but that's only called once per running of the program so shouldn't be a big problem.

The problem is that we're always waiting around for TCP messages. Our CPU is completely idle during this time. Wouldn't it be nice if we could spread the work out and have hundreds or thousands of simultaneous TCP connections waiting for a response?

Let's do exactly that.

We will do this by running copies of `Crawler` in threads. The tricky part here is that they will all share the `addresses` list and might make simultaneous, conflicting changes to the list.

In [None]:
from report import report

def threaded_crawler(num_threads=10, report_interval=2):
    """Run several `Crawler` instances in parallel threads and report progress.

    Every worker thread builds its own `Crawler`, and every `Crawler` is handed
    the same object returned by `fetch_addresses()`. While at least one worker
    is still alive, the calling thread prints a report and then sleeps.

    Args:
        num_threads: number of crawler threads to start (default 10).
        report_interval: seconds to sleep between two reports (default 2).
    """
    # One shared address collection for all workers
    address_queue = fetch_addresses()

    def target():
        return Crawler(address_queue).crawl()

    # Start the workers and keep a handle on each so we can poll them below
    threads = []
    for _ in range(num_threads):
        thread = Thread(target=target)
        thread.start()
        threads.append(thread)

    # Break out of loop if all threads are dead
    while any(t.is_alive() for t in threads):
        report(threads, address_queue)
        time.sleep(report_interval)

    print("All threads have finished")

In [None]:
# Blocks until every crawler thread has exited, printing a report every 2 seconds
threaded_crawler()

Significantly faster. Not by improving our code, but just by running more copies of it.

One nice thing about this new code is that we only have to handle socket errors in 1 place: `Crawler.crawl`. All errors bubble up and are caught here.

Another nice thing is that the code is more organized. Each method pretty much does one thing. We're no longer staring at a monolith!

A couple problems:
- It's slow as hell. We'll address this last.
- We're throwing away the version messages our peers send us. We should keep them -- many interesting data science projects could be done if we download the version message of every node in the network.
- We don't keep track of the nodes we've visited, or of the addresses which have caused multiple errors.
- Can't connect to tor nodes
- If they never send an `addr` message we'll be waiting forever. We should time out if they don't send us their peer list after some amount of time
- We could use a better initial address list. And I never really explained where `next_address` came from -- just that we needed it to mix things up a bit. Bitcoin has "DNS seeds" which are used by the node software to bootstrap peer connections. We can query these same DNS seeds to get a list of high quality addresses.

Let's tackle the last one first. If we can ensure a juicy initial peer list, that will allow us to be more aggressive with the timeouts.



- DNS seeds

- Timeouts

- Sqlite demo

- Install sqlitebrowser

- Save results

- Save errors

- `Crawler.visited`

- Run in threads
    - list -> queue
    - lock for `Crawler.visited`?

- Report

- Tor

- Break out wireshark to watch the magic happen
- Connect to local node? 

How does bitcoin core manage all this? Well, take a look at peers.dat: https://raghavsood.com/blog//2018/05/20/demystifying-peers-dat


Homework ideas

- write your own dns seed using something like this 
    - https://gist.github.com/andreif/6069838
    - https://github.com/pathes/fakedns/blob/master/fakedns.py
    - https://www.youtube.com/watch?v=ViTAg8YGI5Q
- write a python script to parse peers.dat. Use this go script as a guide: https://github.com/RaghavSood/bitpeers/


# Tor

(do this one once we've got a fast multi-threaded crawler up-and-running. this will be a demonstration of why it's nice to have our errors in SQLite ...)

If you run this crawler long enough, eventually you'll begin to encounter addresses that look like `'fd87:d87e:eb43:20e:46fd:450c:e42c:29a5'`

That curious 6 byte `fd87:d87e:eb43` prefix will show up far more than it should.

Curiously, these prefixes never appear among the IPs where connections were successful, only among the unsuccessful connections. 

Try connecting to a few and you'll just get timeouts

So what are they?

They're Tor nodes.

Here's how we can convert these 16 bytes into an "onion" address used by the Tor network:

In [None]:
# First, strip the 6-byte prefix (the `fd87:d87e:eb43` part seen above);
# what remains is the Tor-specific part of the 16-byte address
onion = addr_bytes[6:]
onion

In [None]:
from base64 import b32encode

# Base32 encode the bytes
# NOTE: this cell reassigns `onion` from itself, so run these cells once and in
# order -- re-running this one alone would encode the already-encoded value
onion = b32encode(onion)
onion

In [None]:
# Lowercase it (strictly speaking you don't need to do this);
# `onion` is still a `bytes` object at this point
onion = onion.lower()
onion

In [None]:
# Decode to a string and append ".onion" to the end
onion = onion.decode() + ".onion"
onion

In [None]:
# all together now ...

def ip_bytes_to_onion(ip_bytes):
    """Convert a 16-byte Tor-flavoured address into its ".onion" hostname.

    Args:
        ip_bytes: the raw 16 address bytes, starting with the 6-byte
            `fd87:d87e:eb43` prefix.

    Returns:
        The lower-case base32 encoding of everything after the 6-byte prefix,
        with ".onion" appended, as a `str`.
    """
    # Use the argument, not the notebook-level `addr_bytes` global
    return b32encode(ip_bytes[6:]).lower().decode("ascii") + ".onion"

# Should display the same hostname as the `onion` value built step by step above
ip_bytes_to_onion(addr_bytes)

In [None]:
# But we still can't connect to it from a trusty socket ...
# `create_connection` wants a (host, port) tuple; without a Tor proxy in between,
# the attempt is expected to fail, so catch the error and show it.
try:
    socket.create_connection((onion, 8333))
except OSError as error:
    print(f"Connection failed: {error!r}")

This is because we need to be running Tor locally and need to install a Python package that can use Tor as a proxy to make this connection.

Go to the [Tor website](https://www.torproject.org/download/download-easy.html.en) and install Tor if you haven't already

Run this command in your terminal to check whether it's working:

```shell
$ curl --socks5 localhost:9050 --socks5-hostname localhost:9050 -s https://check.torproject.org/ | cat | grep -m 1 Congratulations | xargs
```

If everything is working, you should get a response declaring `Congratulations. This browser is configured to use Tor.`

Once you've done that, install the pysocks python proxy:

```shell
$ pip install PySocks 
```

Here's a demonstration showing that, at the very least, the recipients of requests over this proxy no longer see your original IP address:

In [None]:
import socks
import urllib.request  # plain `import urllib` does not guarantee the `request` submodule is loaded

print("Old IP", urllib.request.urlopen('http://icanhazip.com').read().decode().strip())

# Make the local Tor SOCKS5 proxy the default for every `socksocket`
socks.setdefaultproxy(
    proxy_type=socks.PROXY_TYPE_SOCKS5, 
    addr="127.0.0.1", 
    port=9050,
)

original_socket = socket.socket  # remember the real class so we can put it back
socket.socket = socks.socksocket  # swap out socket.socket
try:
    print("New IP", urllib.request.urlopen('http://icanhazip.com').read().decode().strip())
finally:
    # Swap socket.socket back in. Re-running `import socket` would NOT do this:
    # the module is already cached, so the patched attribute would survive and
    # every later cell would silently keep going through the proxy.
    socket.socket = original_socket

And finally, let's actually connect to the Tor bitcoin node:

In [None]:
# Open a connection to the onion address on port 8333, going through the
# SOCKS5 proxy on 127.0.0.1:9050 and giving up after 20 seconds
sock = socks.create_connection(
    (onion, 8333),
    timeout=20,
    proxy_type=socks.PROXY_TYPE_SOCKS5,
    proxy_addr="127.0.0.1",
    proxy_port=9050
)
# Binary, read-only file wrapper around the socket; read_msg() reads from it below
stream = sock.makefile('rb')

# Send "version"
payload = serialize_version_payload()
msg = serialize_msg(command=b"version", payload=payload)
sock.sendall(msg)

# Receive "version"
# (the first message read back is assumed to be the peer's "version" -- it is not checked here)
their_version = read_msg(stream)
print(f"Received {their_version}")

# ... it works ...

In [None]:
# The connect() function in lib.py already does this ...
# NOTE(review): presumably it recognises the ".onion" host and goes through the
# Tor proxy on its own -- confirm in lib.py
connect((onion, 8333))

In [None]:
# Same call, but with a literal onion hostname instead of the one derived from `addr_bytes`
connect(('AIHEN7KFBTSCYKNF.onion', 8333))