
# Geth Transactions into MaprDB JSON

This notebook contains the flow to pull the Ethereum blockchain through a local geth client (running in a docker container), perform light transformations to obtain valid JSON transaction records, and push the data into an existing MapR-DB cluster (using the data-access-gateway RESTful interface). While the data loads, a second notebook will use the REST API to send a query to MapR-DB and retrieve selected attributes of "interesting" transactions (for example, those whose creators significantly overpaid to prioritize them) for further analysis.

### Before you begin
For best results, this jupyter server should be running in a docker container (as testuser, with a preconfigured python environment) on an "edge node"* of a secured MapR6.0.1-MEP5.0.0 cluster. In addition:
- geth client must be connected to its peers & accessible over private IP (replace 172.16.9.41 with your own IP)
- testuser should exist on all nodes, and have a home directory on mfs where it can create the maprdb table
- one or more maprdb rest gateways should be accessible over private IP (replace 172.16.9.42 and 172.16.9.238)
- mapr cluster must be alive and stay alive - might want to keep an eye on it during the load

*An "edge node" here means a linux host (I'm using centos7.4) capable of running docker containers, with no special MapR packages or configurations required. This notebook can optionally be persisted securely to MapR-FS by starting this docker container with a volume mount on top of a mapr-loopbacknfs client (on the underlying host) using testuser's mapr ticket, but this is not required for the demo.


## Authenticate to MapR-DB Rest Gateway
Data Access Gateway supports Basic Auth (username & password) along with jwt tokens. Here's a curl example that takes in a username:password parameter, and attempts to create a /tmp/smoketest table in maprdb json. 
```
curl -k -X PUT 'https://172.16.9.42:8243/api/v2/table/%2Ftmp%2Fsmoketest' -u testuser:testuser
```
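For reference, the `-u testuser:testuser` flag makes curl send an HTTP Basic `Authorization` header. Here is a minimal sketch of how that header value is put together (the helper name `basic_auth_header` is just for illustration, it is not part of any MapR API):

```python
import base64

def basic_auth_header(username, password):
    """Build the HTTP Basic Authorization header value for username:password."""
    credentials = f"{username}:{password}".encode("utf-8")
    return "Basic " + base64.b64encode(credentials).decode("ascii")

# Same credentials as the curl example above
auth_header = basic_auth_header("testuser", "testuser")
print(auth_header)
```

The credentials are only base64-encoded, not encrypted, so they should only ever travel over https.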
To avoid authenticating testuser against the CLDB with every request, we can pass in the password once to obtain a bearer token, and then pass that token in the header of every subsequent request. The token works across multiple gateways of a mapr cluster, as it is generated based on each cluster's maprserverticket (and not the default example key :)

In [1]:
import requests
from requests.auth import HTTPBasicAuth
import json

mapr_rest_auth = 'https://172.16.9.42:8243/auth/v2/token'
headers = {'content-type': 'application/json'}
bearerToken = None

try:
    bearerToken = requests.post(
            mapr_rest_auth, 
            headers=headers, verify=False,
            auth=HTTPBasicAuth('testuser', 'testuser')
        ).json()
except requests.exceptions.ConnectionError as e:
    # Report the failure instead of hiding it; bearerToken stays None in this case
    print(e)



> **Tip:** bearer tokens expire every 30 minutes by default; this property can be configured in **/opt/mapr/data-access-gateway/conf/properties.cfg** on the host of the rest gateway that generates the token shown below. To decode a jwt token (for debugging purposes), you can paste it into https://jwt.io/ 

In [2]:
# Optional: print the bearer token to see what it looks like
bearerToken

{'token': 'eyJhbGciOiJIUzUxMiJ9.eyJzdWIiOiJ0ZXN0dXNlciIsImF1ZCI6IndlYiIsImV4cCI6MTUyMzQ2NTY4MywiaWF0IjoxNTIzNDQ3NjgzfQ.HO9JKx7sG5Ib6JIRq-fRIscBofVytrDQ8Ve9Q28Fgz_EN1UmjCv6caxSJTiLIgrSgD885klaLGgeYlTbCyjP0w'}
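As an alternative to pasting the token into a website, the claims can also be read locally. The sketch below decodes the payload segment without verifying the signature, so it is only good for debugging. It assumes the token carries `iat` and `exp` claims; `decode_jwt_payload`, `token_lifetime_seconds` and the sample claims are illustrative names and values, not gateway output.

```python
import base64
import json

def decode_jwt_payload(token):
    """Return the claims of a JWT as a dict, WITHOUT verifying the signature."""
    payload_b64 = token.split(".")[1]
    # JWT segments are base64url-encoded with '=' padding stripped; restore it
    payload_b64 += "=" * (-len(payload_b64) % 4)
    return json.loads(base64.urlsafe_b64decode(payload_b64))

def token_lifetime_seconds(claims):
    """Seconds between issue time (iat) and expiry (exp)."""
    return claims["exp"] - claims["iat"]

# Build a throwaway token from made-up claims to exercise the helper
sample_claims = {"sub": "testuser", "iat": 1000, "exp": 2800}
segment = base64.urlsafe_b64encode(json.dumps(sample_claims).encode()).rstrip(b"=").decode()
sample_token = "header." + segment + ".signature"

claims = decode_jwt_payload(sample_token)
print(claims["sub"], token_lifetime_seconds(claims))
```

In this notebook the call would be `decode_jwt_payload(bearerToken['token'])`; comparing `exp` against `time.time()` is a quick way to tell whether a 401 is caused by an expired token.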

### Construct a header around your jwt token
The token returned by the gateway does not include the "Bearer" keyword that the Authorization header requires, so we build a custom header that carries testuser's bearer token and reuse it throughout the app.

In [3]:
headers = { 
'content-type': 'application/json', 
'Authorization': 'Bearer '+bearerToken['token'] 
} 
#headers

>**Tip:** Suppress warnings about the self-signed certificate of the maprdb data access gateway, so we don't OOM the notebook browser on inserts. 

In [4]:
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

## Create all_transactions_table in MapR-DB

In [5]:
transaction_put_url = 'https://172.16.9.42:8243/api/v2/table/%2Fuser%2Ftestuser%2Feth%2Fall_transactions_table'
response = None

try:
    response = requests.put(
            transaction_put_url, 
            headers=headers, verify=False
        )
    print(response)
except requests.exceptions.ConnectionError as e:
    # Report the failure instead of hiding it
    print(e)

# Note: a 409 response means the table already exists (which is good if you're running this for the second time)
# 201 means table created successfully, and 401 is most likely caused by an expired token

<Response [409]>
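The table URL above is just the mfs path of the table with every `/` percent-encoded as `%2F`. A small sketch of building it instead of typing it by hand (`table_url` is an illustrative helper; the gateway address is the one used in this notebook):

```python
from urllib.parse import quote

def table_url(gateway, table_path):
    """Return the Data Access Gateway URL for a MapR-DB JSON table path."""
    # safe='' forces '/' to be encoded as %2F, which the table endpoint expects
    return f"https://{gateway}/api/v2/table/{quote(table_path, safe='')}"

url = table_url("172.16.9.42:8243", "/user/testuser/eth/all_transactions_table")
print(url)
```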


## Prepare geth as a data source to populate MaprDB table 
- geth container should be accessible from private IP of docker host (replace 172.16.9.41)
- web3 is a python library for interacting with Ethereum http://web3py.readthedocs.io/en/stable/ that should be installed in the python environment provided to this kernel

In [6]:
# The following code connects to my geth container (replace with your own private IP).  
from web3 import Web3, HTTPProvider, IPCProvider

gethRPCUrl='http://172.16.9.41:8545'
web3 = Web3(HTTPProvider(gethRPCUrl))

In [7]:
# Optional - print out one block to see what the data looks like
dict(web3.eth.getBlock(5417612))

{'difficulty': 3144332239149986,
 'extraData': '0x6e616e6f706f6f6c2e6f7267',
 'gasLimit': 8000029,
 'gasUsed': 5490557,
 'hash': '0xe89ad03ba9bf8783ab57bbdcbeac35f120c5f76188b9ca7205ad1762142f8313',
 'logsBloom': '0x0082100004c000040011000001018000108048408010000022c800021008011820040000100000100000a000200820420000801000111090010001000441420002082140082884350810410801020440000000000104021001b000000000001001208008020240200050004000000800050000000804c0040000851010002020200400900000040480010240001000800020a0a40080601800000400202205004800e80000520408802000005010808000000d0311028460100088c0008a00020001000200c000c0001000060080000004080005000800052088000000002011100401010800000100801084a400804450820814101000110420021000240002',
 'miner': '0x52bc44d5378309ee2abf1539bf71de1b7d7be3b5',
 'mixHash': '0x6e3e411c3e227ac52e2c3f95b1571a60f35d59d29f5d5748fb2ceac70aefd801',
 'nonce': '0x1407ddd02d488255',
 'number': 5417612,
 'parentHash': '0x6c98f547435e057e31a488e186ae9709528febbfce6d702cb34e876dd4027

In [41]:
# Define a function to retrieve all transactions for a given block
def getAllTransactions(block):
    allTransactions = []
    
    for transaction in dict(web3.eth.getBlock(block,full_transactions=True))['transactions']:
        allTransactions.append((dict(transaction)))
        
    return allTransactions

In [None]:
# Optional: print transactions for a given block to see what the data looks like (and to make sure the function works)
getAllTransactions(5412388)
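Depending on the installed web3.py version, some fields (hashes, input data) may come back as bytes-like objects rather than hex strings, and `json.dumps` rejects those. Here is a sketch of a fallback encoder, assuming bytes values should be stored as `0x`-prefixed hex strings; `to_json_safe` and the sample transaction are illustrative.

```python
import json

def to_json_safe(value):
    """Fallback for json.dumps: render bytes-like values as 0x-prefixed hex."""
    if isinstance(value, (bytes, bytearray)):
        return "0x" + bytes(value).hex()
    raise TypeError(f"Cannot serialize {type(value).__name__}")

# Made-up transaction fragment with a bytes-valued hash
sample_tx = {"hash": b"\x01\xff", "gas": 21000}
encoded = json.dumps(sample_tx, default=to_json_safe)
print(encoded)
```

If the transactions already contain plain strings and integers (as in the block output above), the fallback is simply never called.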

### Define a helper function to insert transactions (for specified block range) into (specified) MaprDB table

In [61]:
def getTransactionsAndInsertToDB(blockstart,blockend,txstable):
    # The target table URL is the same for every block, so build it once
    rest_put_txs_url = 'https://172.16.9.238:8243/api/v2/table/%2Fuser%2Ftestuser%2Feth%2F'+txstable

    # Note: range() excludes blockend itself
    for block in range(blockstart,blockend):
        txsLastBlock=getAllTransactions(block)

        try:
            for transaction in txsLastBlock:
                # Use the transaction hash as the MapR-DB document _id
                transaction['_id']=transaction['hash']
                response = requests.post(
                    rest_put_txs_url, 
                    headers=headers, verify=False,
                    data=json.dumps(transaction)
                )
        except Exception as e:
            print(e)
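The helper above wraps the whole inner loop in a single `try`, so an exception on one transaction skips the rest of that block's transactions, and HTTP error codes (such as a 401 from an expired token) go unnoticed. Here is a sketch of per-transaction handling; `insert_transactions` and its `post` parameter are illustrative, with `post` expected to behave like `requests.post`.

```python
import json

def insert_transactions(transactions, post, url, headers):
    """POST each transaction on its own; return (inserted_count, failed_ids)."""
    inserted, failed = 0, []
    for tx in transactions:
        doc = dict(tx)
        doc["_id"] = doc["hash"]  # transaction hash doubles as the document _id
        try:
            response = post(url, headers=headers, verify=False, data=json.dumps(doc))
        except Exception as e:
            # Network-level failure: record it and carry on with the next transaction
            print(e)
            failed.append(doc["_id"])
            continue
        if response.status_code >= 400:
            failed.append(doc["_id"])
        else:
            inserted += 1
    return inserted, failed
```

In this notebook it could be called as `insert_transactions(getAllTransactions(block), requests.post, rest_put_txs_url, headers)`, and the returned list of failed ids retried once the token has been refreshed.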

## Insert transactions (for latest N blocks) into all_transactions_table in MaprDB

In [None]:
# retrieve the latest block number, so we can get a recent range of blocks
currentblock = web3.eth.getBlock('latest').number

getTransactionsAndInsertToDB(blockstart=currentblock-100000,
                           blockend=currentblock,
                           txstable="all_transactions_table")
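Keep in mind that `range(blockstart, blockend)` stops one short of `blockend`, and that a 100,000-block load may well outlive the bearer token. One option, sketched here under the assumption that the load can be split into independent sub-ranges, is to walk the range in chunks and request a fresh token between chunks; `block_chunks` is an illustrative helper and the block numbers are arbitrary.

```python
def block_chunks(blockstart, blockend, chunk_size):
    """Yield (start, end) pairs that cover [blockstart, blockend) in order."""
    for start in range(blockstart, blockend, chunk_size):
        yield start, min(start + chunk_size, blockend)

for start, end in block_chunks(5400000, 5400010, 4):
    print(start, end)
```

Each `(start, end)` pair can be passed to `getTransactionsAndInsertToDB` as `blockstart` and `blockend`.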

## Retrieve all the data from a MapR-DB table
A quick way to smoke test whether the data got inserted is to paste the URL below directly into the browser. You can limit the results returned by the query by passing in a **limit** parameter at the end of the REST call (to avoid OOM-ing your browser)
https://172.16.9.42:8243/api/v2/table/%2Fuser%2Ftestuser%2Feth%2Fall_transactions_table?limit=10


Alternatively, you can limit the results brought back to the gateway by setting **rest.result.limit** in **/opt/mapr/data-access-gateway/conf/properties.cfg** on each data-access-gateway and restarting it with the maprcli command

```maprcli node services -nodes `hostname` -name data-access-gateway -action restart```

In [None]:
# For the demo, we can define a function that retrieves the results back to notebook 
def retrieveDataFromMaprdb(tablename):
    rest_get_trades_url = 'https://172.16.9.42:8243/api/v2/table/%2Fuser%2Ftestuser%2Feth%2F'+tablename+'?limit=5'

    try:
        table = requests.get(
            rest_get_trades_url, 
            headers=headers, verify=False
        )
        return table
    except requests.exceptions.ConnectionError as e:
        pass

In [None]:
retrieved_table = retrieveDataFromMaprdb('all_transactions_table')
print(retrieved_table.json())
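`retrieveDataFromMaprdb` returns `None` when the connection fails, so calling `.json()` on the result directly can raise. A small sketch of a defensive accessor, assuming the gateway wraps results in a `DocumentStream` list (the key used later in this notebook); `documents_from` is an illustrative name.

```python
def documents_from(response):
    """Return the list of documents in a gateway response, or [] if unavailable."""
    if response is None or response.status_code != 200:
        return []
    return response.json().get("DocumentStream", [])
```

With this in place, `documents_from(retrieved_table)` gives an empty list instead of an exception when the gateway is unreachable or the token has expired.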

## Retrieve filtered data from MapR-DB table with conditions and projections

What would be really interesting is to see who is burning the most eth on gas, but since we cannot filter data from maprdb directly based on (gas * gasPrice), the next best thing would be to figure out who is seriously overpaying gas (> 100x usual gas price) and see if any of those transactions are big enough to be worth tracking down on etherscan.
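For reference, the quantity in question is `gas * gasPrice`, expressed in wei (1 ether = 10^18 wei). The `gas` field of a transaction is the gas limit set by the sender, so the product is an upper bound on the fee rather than the amount actually spent. A sketch of the arithmetic, with illustrative values and an illustrative helper name:

```python
from decimal import Decimal

WEI_PER_ETHER = Decimal(10) ** 18

def max_fee_in_ether(gas, gas_price_wei):
    """Upper bound on the fee: gas limit times gas price, converted from wei to ether."""
    return Decimal(gas) * Decimal(gas_price_wei) / WEI_PER_ETHER

# 21000 gas at a gasPrice of 400000000000 wei
fee = max_fee_in_ether(21000, 400000000000)
print(fee)
```

Using `Decimal` keeps the conversion exact, which matters once the values are sorted or compared.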

**Example querying all_transactions_table where gasPrice is unusually (>100x) high from web browser**
https://172.16.9.42:8243/api/v2/table/%2Fuser%2Ftestuser%2Feth%2Fall_transactions_table?condition={"$gt":{"gasPrice":400000000000}}

**Same query with projection (selected fields to return), limit and an orderBy (which seems to require limit)**
https://172.16.9.42:8243/api/v2/table/%2Fuser%2Ftestuser%2Feth%2Fall_transactions_table?condition={"$gt":{"gasPrice":400000000000}}&fields=gas,gasPrice,to,from&limit=100&orderBy=gas
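The threshold of 400000000000 wei used above works out to 100 x 4 gwei (1 gwei = 10^9 wei), so the "usual" gas price of 4 gwei is inferred from the numbers rather than stated anywhere. A sketch of deriving the condition string from those two inputs; `overpaid_condition` is an illustrative helper.

```python
import json

WEI_PER_GWEI = 10 ** 9

def overpaid_condition(usual_gas_price_gwei, multiple):
    """Build a $gt condition on gasPrice for transactions paying more than multiple x usual."""
    threshold_wei = usual_gas_price_gwei * multiple * WEI_PER_GWEI
    # Compact separators keep the string free of spaces, matching the URLs above
    return json.dumps({"$gt": {"gasPrice": threshold_wei}}, separators=(",", ":"))

condition = overpaid_condition(4, 100)
print(condition)
```

Changing the baseline or the multiple then only means changing the arguments, not editing the URL by hand.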

In [None]:
# for a more sustainable way to query with conditions, we can create a function
# appending localparams this way allows us to get around encoding issues for special characters

def retrieveFilteredDataFromMaprdb(tablename, condition, projection):
    rest_get_trades_url = 'https://172.16.9.42:8243/api/v2/table/%2Fuser%2Ftestuser%2Feth%2F'+tablename
    localparams='condition='+condition
    localparams+='&fields='+projection

    
    try:
        table = requests.get(
            rest_get_trades_url, 
            headers=headers, verify=False,
            params=localparams
        )
        return table
    except requests.exceptions.ConnectionError as e:
        pass


In [None]:
filtered_table = retrieveFilteredDataFromMaprdb("all_transactions_table",
                                                '{"$gt":{"gasPrice":400000000000}}',
                                                "_id,hash,gasPrice,gas,to,from")
filtered_table.json()

### Enrich locally and print out pretty

In [None]:
# Create a new empty list to hold the enriched transactions
PriceSanitizedMeow= []
filtered_table=filtered_table.json()
for originalTransaction in filtered_table['DocumentStream']:
    
    # Add a new column 'ActualEtherUsed'
    originalTransaction['ActualEtherUsed'] = originalTransaction['gas'] * web3.fromWei(originalTransaction['gasPrice'],unit='ether')
    
    # Append the enriched transaction to PriceSanitizedMeow
    PriceSanitizedMeow.append(originalTransaction)

In [None]:
# print the enriched json
PriceSanitizedMeow

In [None]:
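# Pretty it up and sort it locally

import pandas as pd
# -1 disables column truncation on older pandas; pandas >= 1.0 expects None instead
pd.set_option('display.max_colwidth', -1)
prettydf = pd.DataFrame(PriceSanitizedMeow)
prettydf['hash'] = 'https://etherscan.io/tx/'+prettydf['hash']
# Sort by the enriched column by name, since column order varies across pandas versions
prettydf.sort_values(by='ActualEtherUsed', ascending=False)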

In [None]:
!pip freeze