In [1]:
spark

In [11]:
import pyspark.sql.functions as F
import pyspark.sql.types as T

In [24]:
logs_df = spark.read.parquet("gs://gigabrain/ethereum_mainnet/logs/").filter("datestr >= '2023-02-05' and datestr != 'senpana'")

                                                                                

In [25]:
transactions_df = spark.read.parquet("gs://gigabrain/ethereum_mainnet/transactions/").filter("datestr >= '2023-02-05' and datestr != 'senpana'")

                                                                                

In [26]:
traces_df = spark.read.parquet("gs://gigabrain/ethereum_mainnet/traces/").filter("datestr >= '2023-02-05' and datestr != 'senpana'")

                                                                                

In [41]:
token_transfers = spark.read.parquet("gs://gigabrain/ethereum_mainnet/token_transfers/").filter("datestr >= '2023-02-05' and datestr != 'senpana'")

                                                                                

In [28]:
contract_nulls_comparison = logs_df.groupby("datestr").agg(
    F.count(F.col("transaction_hash")),
    F.count(
        F.when(F.col("contract_name").isin("", "N/A"), 1)
    ).alias("contract_nulls"),
    F.count(
        F.when(F.col("event_name").isin("", "N/A"), 1)
    ).alias("event_nulls")).show()




+----------+-----------------------+--------------+-----------+
|   datestr|count(transaction_hash)|contract_nulls|event_nulls|
+----------+-----------------------+--------------+-----------+
|2023-02-06|                2618112|        322409|      55782|
|2023-02-05|                2582223|        658384|      53437|
+----------+-----------------------+--------------+-----------+



                                                                                

In [18]:
322409/2618112 * 100

12.314561027182947

In [19]:
658384/2582223 * 100

25.496790943307374

In [30]:
55782/2618112 * 100

2.1306193165151073

In [31]:
53437/2582223 * 100

2.0694184816725745

# Uniswap V2 Deep Dive

In [32]:
logs_df.filter("contract_name like '%Uniswap%'").show(10)

+---------+--------------------+-----------------+--------------------+--------------------+--------------------+------------+--------------------+-------------------+----------+-------------+--------------------+--------------------+----------+
|log_index|    transaction_hash|transaction_index|             address|                data|              topics|block_number|          block_hash|    block_timestamp|event_name|contract_name|        decoded_data|              schema|   datestr|
+---------+--------------------+-----------------+--------------------+--------------------+--------------------+------------+--------------------+-------------------+----------+-------------+--------------------+--------------------+----------+
|      104|0x80a8d7d3b90634e...|               49|0x6db05459e5cf096...|0x000000000000000...|[0xd78ad95fa46c99...|    16573172|0x05e317440b6a9ca...|2023-02-06 23:56:35|      Swap|UniswapV2Pair|{"sender": "0xEf1...|[{"indexed": true...|2023-02-06|
|      298|0x320

In [34]:
logs_df.filter("contract_name like '%UniswapV2Pair%'").groupby("event_name").count().show()



+----------+------+
|event_name| count|
+----------+------+
|      Swap|228778|
|  Approval|  1632|
|  Transfer|  7846|
|      Sync|234139|
|      Burn|  1145|
|      Mint|  3418|
+----------+------+



                                                                                

In [35]:
sample_swap_events = logs_df.filter("contract_name like '%UniswapV2Pair%'").filter("event_name = 'Swap'").take(10)

In [36]:
sample_swap_events = [event.asDict() for event in sample_swap_events]

In [38]:
sample_swap_events[0]

{'log_index': 104,
 'transaction_hash': '0x80a8d7d3b90634ec744c32a5189827d9d2ebe4ec8cabb9a9d6348ac536329152',
 'transaction_index': 49,
 'address': '0x6db05459e5cf096b5a234c15e6431e2b7bb7613c',
 'data': '0x0000000000000000000000000000000000000000000000c25c502136b13ea24f00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000017eb4589b8c14a9',
 'topics': ['0xd78ad95fa46c994b6551d0da85fc275fe613ce37657fb8d5e3d130840159d822',
  '0x000000000000000000000000ef1c6e67703c7bd7107eed8303fbe6ec2554bf6b',
  '0x000000000000000000000000ef1c6e67703c7bd7107eed8303fbe6ec2554bf6b'],
 'block_number': 16573172,
 'block_hash': '0x05e317440b6a9ca3f585e99d2fe47fbdc80e6f3cff128a7879e92fb5d1f5937a',
 'block_timestamp': datetime.datetime(2023, 2, 6, 23, 56, 35),
 'event_name': 'Swap',
 'contract_name': 'UniswapV2Pair',
 'decoded_data': '{"sender": "0xEf1c6E67703c7BD7107eed8303Fbe6EC2554BF6B", 

In [56]:
### Lets collect all the events in this transaction
all_events = logs_df.filter("transaction_hash = '0x80a8d7d3b90634ec744c32a5189827d9d2ebe4ec8cabb9a9d6348ac536329152'").collect()

                                                                                

In [57]:
all_events = list(sorted(map(lambda x: x.asDict(), all_events), key=lambda y: y["log_index"]))

In [None]:
## https://etherscan.io/tx/0x80a8d7d3b90634ec744c32a5189827d9d2ebe4ec8cabb9a9d6348ac536329152
## This transaction Swaps WOOF INU for WETH (Wrapped Ether)

## Decoded data for each event is a string serialized json blob 

In [58]:
all_events

[{'log_index': 98,
  'transaction_hash': '0x80a8d7d3b90634ec744c32a5189827d9d2ebe4ec8cabb9a9d6348ac536329152',
  'transaction_index': 49,
  'address': '0x000000000022d473030f116ddee9f6b43ac78ba3',
  'data': '0x000000000000000000000000ffffffffffffffffffffffffffffffffffffffff0000000000000000000000000000000000000000000000000000000064091fe40000000000000000000000000000000000000000000000000000000000000000',
  'topics': ['0xc6a377bfc4eb120024a8ac08eef205be16b817020812c73223e81d1bdb9708ec',
   '0x0000000000000000000000001933d2ed081909fe835d1ae36305e0b84490cd0d',
   '0x0000000000000000000000000e3d15be419e29c3791676c8553264958b95a6e7',
   '0x000000000000000000000000ef1c6e67703c7bd7107eed8303fbe6ec2554bf6b'],
  'block_number': 16573172,
  'block_hash': '0x05e317440b6a9ca3f585e99d2fe47fbdc80e6f3cff128a7879e92fb5d1f5937a',
  'block_timestamp': datetime.datetime(2023, 2, 6, 23, 56, 35),
  'event_name': 'Permit',
  'contract_name': 'Permit2',
  'decoded_data': '{"owner": "0x1933D2ed081909fE835D1Ae363

In [67]:
import json
decoded_data = json.loads(all_events[6]["decoded_data"])

In [66]:
all_events[6]

{'log_index': 104,
 'transaction_hash': '0x80a8d7d3b90634ec744c32a5189827d9d2ebe4ec8cabb9a9d6348ac536329152',
 'transaction_index': 49,
 'address': '0x6db05459e5cf096b5a234c15e6431e2b7bb7613c',
 'data': '0x0000000000000000000000000000000000000000000000c25c502136b13ea24f00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000017eb4589b8c14a9',
 'topics': ['0xd78ad95fa46c994b6551d0da85fc275fe613ce37657fb8d5e3d130840159d822',
  '0x000000000000000000000000ef1c6e67703c7bd7107eed8303fbe6ec2554bf6b',
  '0x000000000000000000000000ef1c6e67703c7bd7107eed8303fbe6ec2554bf6b'],
 'block_number': 16573172,
 'block_hash': '0x05e317440b6a9ca3f585e99d2fe47fbdc80e6f3cff128a7879e92fb5d1f5937a',
 'block_timestamp': datetime.datetime(2023, 2, 6, 23, 56, 35),
 'event_name': 'Swap',
 'contract_name': 'UniswapV2Pair',
 'decoded_data': '{"sender": "0xEf1c6E67703c7BD7107eed8303Fbe6EC2554BF6B", 

In [82]:
decoded_data

{'sender': '0xEf1c6E67703c7BD7107eed8303Fbe6EC2554BF6B',
 'to': '0xEf1c6E67703c7BD7107eed8303Fbe6EC2554BF6B',
 'amount0In': 3585320203468064858703,
 'amount1In': 0,
 'amount0Out': 0,
 'amount1Out': 107721733763241129}

# Large Scale Event Specific Transformations on Decoded Data

In [102]:
event_schema = T.StructType([
    T.StructField("sender",T.StringType(),True),
    T.StructField("to",T.StringType(),True),
    T.StructField("amount0In",T.DecimalType(38, 7),True), # Note: This is max precision. Some tokens exceed this and probably need to be string coerced.
    T.StructField("amount1In",T.DecimalType(38, 7),True),
    T.StructField("amount0Out",T.DecimalType(38, 7),True),
    T.StructField("amount1Out",T.DecimalType(38, 7),True),
])

In [103]:
## This is what a Swap schema looks like.

In [104]:
from decimal import Decimal

def swap_decoder(swap_blob):
    ret = json.loads(swap_blob)
    ret["amount0In"] = Decimal(ret["amount0In"])
    ret["amount1In"] = Decimal(ret["amount1In"])
    ret["amount0Out"] = Decimal(ret["amount0Out"])
    ret["amount1Out"] = Decimal(ret["amount1Out"])
    return ret

swap_decoder_udf = F.udf(swap_decoder, event_schema)

In [105]:
logs_df.filter("contract_name like '%UniswapV2Pair%'").filter("event_name = 'Swap'").withColumn("swap_info", swap_decoder_udf(F.col("decoded_data"))).select("*", "swap_info.*").show(100)

+---------+--------------------+-----------------+--------------------+--------------------+--------------------+------------+--------------------+-------------------+----------+-------------+--------------------+--------------------+----------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|log_index|    transaction_hash|transaction_index|             address|                data|              topics|block_number|          block_hash|    block_timestamp|event_name|contract_name|        decoded_data|              schema|   datestr|           swap_info|              sender|                  to|           amount0In|           amount1In|          amount0Out|          amount1Out|
+---------+--------------------+-----------------+--------------------+--------------------+--------------------+------------+--------------------+-------------------+----------+-------------+--------------------+-

                                                                                