In [22]:
import requests
import json
import time
import re
import pprint as pp
import web3
import sys
from functools import lru_cache
from web3 import Web3
from web3.auto import w3
from web3.contract import Contract
from web3._utils.events import get_event_data
from web3._utils.abi import exclude_indexed_event_inputs, get_abi_input_names, get_indexed_event_inputs, normalize_event_input_types
from web3.exceptions import MismatchedABI, LogTopicError
from web3.types import ABIEvent
from eth_utils import event_abi_to_log_topic, to_hex
from hexbytes import HexBytes
from pyspark.sql.types import *
from pyspark.sql.functions import udf
from pyspark.sql.functions import *

from graphframes import GraphFrame
from graphframes.examples import Graphs
from IPython.core.display import display, HTML
display(HTML("<style>.container { width:100% !important; }</style>"))



In [13]:
spark.conf.set("viewsEnabled","true")
spark.conf.set("materializationDataset","<dataset>")

eth_tx = spark.read.format("bigquery")\
    .option('table', 'bigquery-public-data:crypto_ethereum.transactions') \
    .load()

eth_tx.createOrReplaceTempView('eth_tx')
sql_hash = """
select hash, from_address, to_address, block_timestamp as ts, input from eth_tx
where to_address = lower('0x7be8076f4ea4a4ad08075c2508e481d6c946d12b')
    and DATE(block_timestamp) = "2022-01-17"
ORDER BY ts DESC
"""
df_tx_0 = spark.sql(sql_hash)


In [16]:
df_tx = df_tx_0.withColumn("date",to_date("ts")).drop('ts')
df_tx.groupBy('date').count().show()

+----------+------+
|      date| count|
+----------+------+
|2022-01-17|110724|
+----------+------+



In [5]:

def decode_tuple(t, target_field):
    output = dict()
    for i in range(len(t)):
        if isinstance(t[i], (bytes, bytearray)):
            output[target_field[i]['name']] = to_hex(t[i])
        elif isinstance(t[i], (tuple)):
            output[target_field[i]['name']] = decode_tuple(t[i], target_field[i]['components'])
        else:
            output[target_field[i]['name']] = t[i]
    return output


def decode_list_tuple(l, target_field):
    output = l
    for i in range(len(l)):
        output[i] = decode_tuple(l[i], target_field)
    return output


def decode_list(l):
    output = l
    for i in range(len(l)):
        if isinstance(l[i], (bytes, bytearray)):
            output[i] = to_hex(l[i])
        else:
            output[i] = l[i]
    return output


def convert_to_hex(arg, target_schema):
    output = dict()
    for k in arg:
        if isinstance(arg[k], (bytes, bytearray)):
            output[k] = to_hex(arg[k])
        elif isinstance(arg[k], (list)) and len(arg[k]) > 0:
            target = [a for a in target_schema if 'name' in a and a['name'] == k][0]
            if target['type'] == 'tuple[]':
                target_field = target['components']
                output[k] = decode_list_tuple(arg[k], target_field)
            else:
                output[k] = decode_list(arg[k])
        elif isinstance(arg[k], (tuple)):
            target_field = [a['components'] for a in target_schema if 'name' in a and a['name'] == k][0]
            output[k] = decode_tuple(arg[k], target_field)
        else:
            output[k] = arg[k]
    return output

# @lru_cache(maxsize=None)
def _get_contract(address, abi):
    """
    This helps speed up execution of decoding across a large dataset by caching the contract object
    It assumes that we are decoding a small set, on the order of thousands, of target smart contracts
    """
    if isinstance(abi, (str)):
        abi = json.loads(abi)

    contract = w3.eth.contract(address=Web3.toChecksumAddress(address), abi=abi)
    return (contract, abi)

@udf
def decode_tx(address, input_data, abi):
    if abi is not None:
        try:
            (contract, abi) = _get_contract(address, abi)
            func_obj, func_params = contract.decode_function_input(input_data)
            target_schema = [a['inputs'] for a in abi if 'name' in a and a['name'] == func_obj.fn_name][0]
            decoded_func_params = convert_to_hex(func_params, target_schema)
            return func_obj.fn_name # json.dumps(decoded_func_params), json.dumps(target_schema))
        except:
            e = sys.exc_info()[0]
            return ('decode error', repr(e), None)
    else:
        return ('no matching abi', None, None)


In [17]:
TX_HASH = '0x56f2ce34e4b20578742ed8ddc9fcbacaec62d477a530dff7ace8da2fe64b1208'
OPENSEA_CONTRACT_ADDR = "0x7be8076f4ea4a4ad08075c2508e481d6c946d12b"
BORED_APE_ADDR = "0xBC4CA0EdA7647A8aB7C2061c2E118A18a936f13D".lower()

def fetch_abi(contract_addr):
    ABI_ENDPOINT = 'https://api.etherscan.io/api?module=contract&action=getabi&address='
    response = requests.get('%s%s'%(ABI_ENDPOINT, contract_addr))
    response_json = response.json()
    abi_json = json.loads(response_json['result'])
    return json.dumps(abi_json)

opensea_abi = fetch_abi(OPENSEA_CONTRACT_ADDR)
time.sleep(5)
nft_abi = fetch_abi(BORED_APE_ADDR)
pp.pprint(nft_abi)


('[{"inputs": [{"internalType": "string", "name": "name", "type": "string"}, '
 '{"internalType": "string", "name": "symbol", "type": "string"}, '
 '{"internalType": "uint256", "name": "maxNftSupply", "type": "uint256"}, '
 '{"internalType": "uint256", "name": "saleStart", "type": "uint256"}], '
 '"stateMutability": "nonpayable", "type": "constructor"}, {"anonymous": '
 'false, "inputs": [{"indexed": true, "internalType": "address", "name": '
 '"owner", "type": "address"}, {"indexed": true, "internalType": "address", '
 '"name": "approved", "type": "address"}, {"indexed": true, "internalType": '
 '"uint256", "name": "tokenId", "type": "uint256"}], "name": "Approval", '
 '"type": "event"}, {"anonymous": false, "inputs": [{"indexed": true, '
 '"internalType": "address", "name": "owner", "type": "address"}, {"indexed": '
 'true, "internalType": "address", "name": "operator", "type": "address"}, '
 '{"indexed": false, "internalType": "bool", "name": "approved", "type": '
 '"bool"}], "name"

## Decoding OpenSea Tx
. addrs[4] seems to be the address of the collection

TODO:
separate bidding and directly purchase


In [7]:
df_tx.show()

+--------------------+--------------------+--------------------+--------------------+----------+
|                hash|        from_address|          to_address|               input|      date|
+--------------------+--------------------+--------------------+--------------------+----------+
|0x1e75593428ac5ea...|0x668e961736454a2...|0x7be8076f4ea4a4a...|0xab834bab0000000...|2022-01-18|
|0x02168a3e8328ee7...|0x038c3851a90150a...|0x7be8076f4ea4a4a...|0xab834bab0000000...|2022-01-18|
|0x9bcaeb7c7cd3f2a...|0xaee0ff29af0a9f6...|0x7be8076f4ea4a4a...|0xab834bab0000000...|2022-01-18|
|0x05be8cfa500a496...|0x97618f26046f441...|0x7be8076f4ea4a4a...|0xab834bab0000000...|2022-01-18|
|0xc3ae30e829e1572...|0x4d5f30669bf7130...|0x7be8076f4ea4a4a...|0xab834bab0000000...|2022-01-18|
|0x1bc03b9cfbb3a4c...|0x035e6150f5fcd4b...|0x7be8076f4ea4a4a...|0xab834bab0000000...|2022-01-18|
|0xb449ccd4103d9ce...|0xd3a024d9602eb67...|0x7be8076f4ea4a4a...|0xab834bab0000000...|2022-01-18|
|0xcbb2ddbc67d2d81...|0xe7802d

In [20]:
def decode(partitionData):
    (contract, abi) = _get_contract(OPENSEA_CONTRACT_ADDR, opensea_abi)
    for row in partitionData:
        if row['input'] is None:
            continue
        try:
            func_obj, func_params = contract.decode_function_input(row['input'])
            target_schema = [a['inputs'] for a in abi if 'name' in a and a['name'] == func_obj.fn_name][0]
            decoded_func_params = convert_to_hex(func_params, target_schema)
            if func_obj.fn_name == 'atomicMatch_':
#                 from_ = decoded_func_params['from']
#                 to_ = decoded_func_params['to']
#                 id_ = decoded_func_params['tokenId']
                collectible = decoded_func_params['addrs'][4].lower()
                price = int(decoded_func_params['uints'][4]) / 10**18
                if collectible == BORED_APE_ADDR:
                    yield (row['hash'], func_obj.fn_name, collectible, price) # json.dumps(decoded_func_params), json.dumps(target_schema))
        except:
            e = sys.exc_info()[0]
            return ('decode error', repr(e), None)


df2 = df_tx.rdd.mapPartitions(decode).toDF(['tx_hash', 'fn_name', 'collectible', 'price'])


In [21]:
df2.show(truncate=False)

+------------------------------------------------------------------+------------+------------------------------------------+------+
|tx_hash                                                           |fn_name     |collectible                               |price |
+------------------------------------------------------------------+------------+------------------------------------------+------+
|0x1f22c9a58b96741eca57f049d4ca0317e363ca70ea87caad8232cc37d4af883c|atomicMatch_|0xbc4ca0eda7647a8ab7c2061c2e118a18a936f13d|86.69 |
|0x4216fac1a8f86e2ff54121a1434b9a889c7d22d8bfab7d9d7dd457eafe656746|atomicMatch_|0xbc4ca0eda7647a8ab7c2061c2e118a18a936f13d|90.0  |
|0xf0ef114988edd60b12469123d05f5dfff2e7b9544179a1e73ea6b25bcb47d63f|atomicMatch_|0xbc4ca0eda7647a8ab7c2061c2e118a18a936f13d|54.89 |
|0xf88cada41c2377491b97a211d64651fd342f4160d9ae02703b0924f4353a2efb|atomicMatch_|0xbc4ca0eda7647a8ab7c2061c2e118a18a936f13d|99.24 |
|0x209b12f61a93efab9a7a7641b6045a3dab10761d9f7ca6cbcaca75557d91ddac|atomicMa