In [1]:
# import sys
#!{sys.executable} -m pip install --upgrade pip
# !{sys.executable} -m pip install web3
# !{sys.executable} -m pip install python-dotenv
# !{sys.executable} -m pip install requests
# !{sys.executable} -m pip install pandas
# !{sys.executable} -m pip install tabulate

## Imports

In [2]:
from web3 import Web3
from web3.contract.contract import Contract, ContractFunctions, ContractEvents

from eth_typing import ChecksumAddress
from eth_typing import ChainId

from functools import lru_cache
from dataclasses import dataclass

# For enums
from enum import StrEnum

# To read environment property file
import os
from dotenv import load_dotenv
from pathlib import Path

import requests
import json

# Date calculations
from datetime import datetime, timedelta
import time

# Importing Pandas to create DataFrame
import pandas as pd

# For display table
from tabulate import tabulate

## Constants

In [3]:
# Token pair: ERC-20 contract addresses of the two tokens whose pool is analysed.
# They are passed to the factory's getPool() in this order.
TOKEN0_ADDRESS = '0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48'
TOKEN1_ADDRESS = '0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2'

# Fee tier (0.05%); the report header prints FEE_TIER/10000 as the percentage
FEE_TIER = 500

# Directory holding the contract ABI JSON files (relative to the notebook's working directory)
ABI_PATH = 'assets/abi'

# Pool Events
class PoolEvent(StrEnum):
    """Names of the pool contract events this notebook looks for.

    Each value is the event name passed to `event_signature` to find its topic-0 hash.
    """
    BURN = 'Burn'
    COLLECT = 'Collect'
    MINT = 'Mint'
    SWAP = 'Swap'

# NFT Events
class NFTEvent(StrEnum):
    """Names of the NFT position manager events this notebook looks for.

    Each value is the event name passed to `event_signature` to find its topic-0 hash.
    """
    COLLECT = 'Collect'
    INCREASE_LIQUIDITY = 'IncreaseLiquidity'
    DECREASE_LIQUIDITY = 'DecreaseLiquidity'

# How many days back to search - approximately 1 year
DAYS = 360

# Page size (results per request) for the Etherscan search
PAGE_SIZE = 100

# Maximum number of pages to request; at most PAGE_SIZE * MAX_PAGES items are returned per search
MAX_PAGES = 5

# Etherscan endpoint
ETHERSCAN_ENDPOINT = 'https://api.etherscan.io/v2/api'

# Coingecko price API endpoint; {address} is filled in with the token contract address
COINGECKO_PRICE_ENDPOINT = 'https://api.coingecko.com/api/v3/simple/token_price/ethereum?contract_addresses={address}&vs_currencies=usd'

## Load environment variables

In [4]:
# Read the settings file into the process environment
dotenv_path = Path('.env/uniswap')
load_dotenv(dotenv_path=dotenv_path)

# Each setting is None when its variable is not defined
PROVIDER_URL = os.environ.get('PROVIDER_URL')
ETHERSCAN_API_KEY = os.environ.get('ETHERSCAN_API_KEY')
WALLET_ADDRESS = os.environ.get('WALLET_ADDRESS')
COINGECKO_API_KEY = os.environ.get('COINGECKO_API_KEY')

In [5]:
# Initialize the shared web3 instance over HTTP using PROVIDER_URL from the environment file;
# every contract wrapper and helper below uses this client.
web3 = Web3(Web3.HTTPProvider(PROVIDER_URL))

In [6]:
class BaseContract(object):
    """Lazy wrapper around a web3 contract whose ABI is stored in a JSON file.

    The ABI file is read and the web3 contract object is created on first use,
    not at construction time. The chain id is queried from the node once, in
    the constructor.
    """
    def __init__(self, w3:Web3, address:ChecksumAddress, abi_path:str) -> None:
        """
        Parameters:
        w3 : Web3
            client used for all calls made through this wrapper
        address : ChecksumAddress
            address of the deployed contract
        abi_path : str
            path to the JSON file holding the contract ABI
        """
        self.w3client:Web3 = w3
        self.address:ChecksumAddress = address
        self.abi_path:str = abi_path
        # Queried from the node once, at construction time
        self.__chain_id:ChainId = ChainId(w3.eth.chain_id)
        # Both are filled in lazily on first access
        self.__contract:Contract = None
        self.__abi = None

    @property
    def chain_id(self) -> ChainId:
        """Returns the chain id read from the node when the wrapper was created"""
        return self.__chain_id

    def __load_abi(self) -> None:
        """Reads the ABI JSON file into memory"""
        with open(self.abi_path, encoding='utf-8') as f:
            self.__abi = json.load(f)

    def __init_contract(self) -> Contract:
        """Builds the web3 contract object, loading the ABI first if needed"""
        if self.__abi is None:
            self.__load_abi()
        return self.w3client.eth.contract(address=self.address, abi=self.__abi)

    @property
    def contract(self) -> Contract:
        """Returns contract instance"""
        if self.__contract is None:
            self.__contract = self.__init_contract()
        return self.__contract

    @property
    def functions(self) -> ContractFunctions:
        """Quick access to `self.contract.functions`"""
        return self.contract.functions

    @property
    def events(self) -> ContractEvents:
        """Quick access to `self.contract.events`"""
        return self.contract.events

    def event_signature(self, name:str) -> str:
        """Calculates the event signature (topic-0 hash) for the given event name

        Parameters:
        name : str
            event name as it appears in the ABI

        Returns:
        str
            Hex string of keccak('Name(type1,type2,...)'), or None when the ABI
            has no event with that name
        """
        if self.__abi is None:
            self.__load_abi()
        # Scan every ABI entry; entries such as constructors carry no 'name', hence .get()
        for item in self.__abi:
            if item.get('type') == 'event' and item.get('name') == name:
                # NOTE(review): input types are joined as written in the ABI; tuple-typed
                # inputs would need expanding to get the canonical signature
                inputs = ','.join(param['type'] for param in item['inputs'])
                event_signature_text = f'{name}({inputs})'
                # Hash with this wrapper's own client rather than a notebook global
                return self.w3client.to_hex(self.w3client.keccak(text=event_signature_text))
        return None
# --------------------------------------------------------------------------------------------------        
class FactoryV3Contract(BaseContract):
    """Contract wrapper bound to the fixed factory address and the `factory_v3.json` ABI."""
    def __init__(self, w3:Web3) -> None:
        factory_address = '0x1F98431c8aD98523631AE4a59f267346ea31F984'
        factory_abi = f'{ABI_PATH}/factory_v3.json'
        super().__init__(w3=w3, address=factory_address, abi_path=factory_abi)
# --------------------------------------------------------------------------------------------------        
class PoolV3Contract(BaseContract):
    """Contract wrapper for a pool at a caller-supplied address, using the `pool_v3.json` ABI."""
    def __init__(self, w3:Web3, address:ChecksumAddress) -> None:
        pool_abi = f'{ABI_PATH}/pool_v3.json'
        super().__init__(w3=w3, address=address, abi_path=pool_abi)
# --------------------------------------------------------------------------------------------------        
class NFTPositionManagerContract(BaseContract):
    """Contract wrapper bound to the fixed NFT position manager address and the `nft_position_manager.json` ABI."""
    def __init__(self, w3:Web3):
        manager_address = '0xC36442b4a4522E871399CD717aBDD847Ab11FE88'
        manager_abi = f'{ABI_PATH}/nft_position_manager.json'
        super().__init__(w3=w3, address=manager_address, abi_path=manager_abi)
# --------------------------------------------------------------------------------------------------        
@dataclass
class Token:
    """Metadata record for one token, as assembled by `ERC20Contract._get_data`."""
    # Chain id reported by the contract wrapper
    chainId: ChainId
    # Result of the token contract's decimals() call
    decimals: int
    # Result of the token contract's symbol() call
    symbol: str
    # Result of the token contract's name() call
    name: str
    # Address of the token contract
    address: ChecksumAddress
# --------------------------------------------------------------------------------------------------        
class ERC20Contract(BaseContract):
    """Contract wrapper for a token contract, with a cached `Token` metadata record."""
    def __init__(self, w3:Web3, address:ChecksumAddress, abi_path:str=f'{ABI_PATH}/erc20.json') -> None:
        """
        Parameters:
        w3 : Web3
            client used for all calls
        address : ChecksumAddress
            address of the token contract
        abi_path : str
            path to the ABI JSON file; defaults to `erc20.json` under ABI_PATH
        """
        super().__init__(w3=w3, address=address, abi_path=abi_path)
        # Filled in on first access of `data`
        self._data = None

    def _get_data(self) -> Token:
        """Reads decimals, symbol and name from the contract and packs them into a Token"""
        # Inherited properties are reached through `self`, so normal attribute lookup applies
        functions = self.functions
        return Token(
            chainId=self.chain_id,
            decimals=functions.decimals().call(),
            symbol=functions.symbol().call(),
            name=functions.name().call(),
            address=self.contract.address,
        )

    @property
    def data(self) -> Token:
        """Get immutable data; fetched from the contract once and then reused"""
        if self._data is None:
            self._data = self._get_data()
        return self._data

In [7]:
# Token pair contracts
token0 = ERC20Contract(w3=web3, address=TOKEN0_ADDRESS)
token1 = ERC20Contract(w3=web3, address=TOKEN1_ADDRESS)

# Factory contract: look up the pool for this token pair and fee tier
factory_contract = FactoryV3Contract(w3=web3)
pool_address = factory_contract.functions.getPool(token0.address, token1.address, FEE_TIER).call()
# The factory answers with the zero address when no such pool exists; stop here
# rather than let every later cell run against an empty address
assert int(pool_address, 16) != 0, 'No pool exists for the configured token pair and fee tier'

# Create Pool and NFT manager contracts
pool_contract = PoolV3Contract(w3=web3, address=pool_address)
nftmgr_contract = NFTPositionManagerContract(w3=web3)

In [8]:
# Pool events: topic-0 hash -> event name (Swap is intentionally not part of this map)
pool_signatures = dict(
    (pool_contract.event_signature(event.value), event.value)
    for event in (PoolEvent.BURN, PoolEvent.MINT, PoolEvent.COLLECT))

# NFT manager events: topic-0 hash -> event name
nft_signatures = dict(
    (nftmgr_contract.event_signature(event.value), event.value)
    for event in (NFTEvent.COLLECT, NFTEvent.INCREASE_LIQUIDITY, NFTEvent.DECREASE_LIQUIDITY))

In [9]:
@lru_cache
def calculate_start_block(chainid:ChainId) -> int:
    """ Return the start block using DAYS constant
    Parameters:
    chainid: ChainId
        chain id

    Returns:
    int
        The number of the closest block before (now - DAYS days)

    Raises:
    requests.HTTPError
        if Etherscan answers with an HTTP error status
    ValueError
        if the response does not carry a block number (e.g. an API error message)
    """
    # This is the starting date (DAYS before the current date)
    d = datetime.today() - timedelta(DAYS)
    ts_days_ago = int(d.timestamp())
    # Plain scalar values; wrapping them in {...} would build one-element sets
    params = {'chainid':chainid.value, 'module':'block', 'action':'getblocknobytime', 'timestamp':ts_days_ago,
              'closest':'before', 'apikey':ETHERSCAN_API_KEY}
    # A timeout keeps the notebook from hanging forever on a stalled connection
    response = requests.get(url=ETHERSCAN_ENDPOINT, params=params, timeout=30)
    response.raise_for_status()
    result = response.json()['result']
    # The block number arrives as a string; convert it so the return type matches the annotation
    try:
        return int(result)
    except (TypeError, ValueError) as exc:
        raise ValueError(f'Etherscan did not return a block number: {result!r}') from exc

## Utility function to check whether an address holds contract code
### Reference: __[How to detect an address is a contract?](https://ethereum.stackexchange.com/questions/28521/how-to-detect-if-an-address-is-a-contract)__

In [10]:
def is_contract_address(address) -> bool:
    """ Tell whether code is deployed at the given address
    Parameters:
    address : str
        address to check; may be empty or None

    Returns:
    bool
        True if the node reports non-empty code at the address, False otherwise
        (including when no address is given)
    """
    # Nothing to look up for an empty or missing address, and checksumming it would raise
    if not address:
        return False
    code = web3.eth.get_code(web3.to_checksum_address(address))
    return web3.to_hex(code) != '0x'

In [11]:
@lru_cache
def get_transactions(chainid:ChainId, action:str, start_block:int) -> list[dict]:
    """ Return a list of transactions for the WALLET_ADDRESS
    Parameters:
    chainid: ChainId
        chain id
    action : str
        action parameter to the API, txlist, tokennfttx etc.
    start_block: int
        start block to start the search

    Returns:
    list
        A list of transaction records (dicts) for the wallet, at most
        PAGE_SIZE * MAX_PAGES items. The list is cached by lru_cache, so
        callers should not modify it in place.

    Raises:
    requests.HTTPError
        if Etherscan answers with an HTTP error status
    """
    # Plain scalar values; wrapping them in {...} would build one-element sets.
    # The account module names the upper bound 'endblock'.
    params = {'chainid':chainid.value, 'module':'account', 'action':action, 'address':WALLET_ADDRESS,
              'offset':PAGE_SIZE, 'startblock':start_block, 'endblock':99999999, 'sort':'asc',
              'apikey':ETHERSCAN_API_KEY}
    txlist = []
    for page in range(1, MAX_PAGES + 1):
        params['page'] = page
        # A timeout keeps the notebook from hanging forever on a stalled connection
        response = requests.get(url=ETHERSCAN_ENDPOINT, params=params, timeout=30)
        response.raise_for_status()
        txlist_response = response.json()
        # Status '0' ends the paging loop (the API reports it when there is nothing to return)
        if txlist_response['status'] == '0':
            break
        page_items = txlist_response['result']
        txlist.extend(page_items)
        # A short page is the last one; skip the extra request
        if len(page_items) < PAGE_SIZE:
            break
    return txlist

In [12]:
# Get the start block (the block closest to DAYS days ago)
start_block = calculate_start_block(chainid=ChainId.ETH)

# Normal transactions sent from/to the wallet
trans = get_transactions(chainid=ChainId.ETH, action='txlist', start_block=start_block)
# Keep only the hashes of successful transactions whose recipient is a contract.
# Note: is_contract_address asks the node for code once per transaction.
normal_list = [x['hash'] for x in trans if is_contract_address(x['to']) and x['isError']=='0']

# NFT transfer transactions; `trans` is rebound here to the second result set
trans = get_transactions(chainid=ChainId.ETH, action='tokennfttx', start_block=start_block)
nft_list = [x['hash'] for x in trans]

# Merge the two lists and remove duplicates (going through a set does not preserve order)
txlist = list(set(normal_list + nft_list))

In [13]:
# len(txlist)

In [14]:
def is_tx_in_scope(tran_hash:str):
    """ Decide whether a transaction touched the pool with an event we track
    Parameters:
    tran_hash : str
        transaction hash

    Returns:
    str
        The same transaction hash when at least one receipt log was emitted by
        the pool contract with a topic 0 found in pool_signatures; otherwise None
    """
    receipt = web3.eth.get_transaction_receipt(tran_hash)
    # The address test comes first so topics are only inspected for the pool's own logs
    in_scope = any(
        log.address == pool_contract.address and web3.to_hex(log['topics'][0]) in pool_signatures
        for log in receipt.logs
    )
    return tran_hash if in_scope else None

In [15]:
# Keep the hashes whose receipt has a tracked event from the pool contract;
# out-of-scope transactions come back as None and are dropped here
txlogs = [tx_hash for tx_hash in set(map(is_tx_in_scope, txlist)) if tx_hash is not None]

## Token pair information

In [16]:
@lru_cache
def get_token_price(address:str) -> float:
    """ Return the current token price
    Parameters:
    address : str
        token address

    Returns:
    float
        The current USD price of the token (cached per address for the kernel session)

    Raises:
    requests.HTTPError
        if the price API answers with an HTTP error status
    KeyError
        if the response has no USD price for the token
    """
    url = COINGECKO_PRICE_ENDPOINT.format(address=address)

    headers = {
        'accept': 'application/json',
        'x-cg-demo-api-key': COINGECKO_API_KEY
    }
    # A timeout keeps the notebook from hanging forever on a stalled connection
    response = requests.get(url, headers=headers, timeout=30)
    response.raise_for_status()
    # The response is keyed by the lower-cased contract address
    entry = response.json().get(address.lower())
    if not entry or 'usd' not in entry:
        raise KeyError(f'No USD price returned for token {address}')
    return float(entry['usd'])

In [17]:
@lru_cache
def _pool_token_metadata(pool_address:str) -> tuple:
    """Return ((address, decimals, symbol) for token0, the same for token1) of a pool; cached per pool address"""
    pool = PoolV3Contract(w3=web3, address=pool_address)
    metadata = []
    for token_getter in (pool.functions.token0, pool.functions.token1):
        token_address = token_getter().call()
        erc20 = ERC20Contract(w3=web3, address=token_address)
        metadata.append((token_address,
                         erc20.functions.decimals().call(),
                         erc20.functions.symbol().call()))
    return tuple(metadata)


def get_token_pair_info(processed_log:dict) -> dict:
    """ Return the token pair information
    Parameters:
    processed_log : dict
        dictionary of a processed log; 'address' is read as the pool address and
        'args' must hold the raw integer 'amount0' and 'amount1'

    Returns:
    dict
        A dictionary with keys 'token0' and 'token1'; each value is a dictionary
        with keys 'amount' (raw amount divided by 10**decimals), 'symbol',
        'price' (from get_token_price) and 'decimal'
    """
    # Token addresses, decimals and symbols are fetched once per pool and then reused
    (address0, decimal0, symbol0), (address1, decimal1, symbol1) = _pool_token_metadata(processed_log['address'])
    args = processed_log['args']

    return {
        'token0': {
            'amount': args['amount0']/pow(10, decimal0),
            'symbol': symbol0,
            'price': get_token_price(address=address0),
            'decimal': decimal0,
        },
        'token1': {
            'amount': args['amount1']/pow(10, decimal1),
            'symbol': symbol1,
            'price': get_token_price(address=address1),
            'decimal': decimal1,
        },
    }

In [18]:
def event_action(event_name:str, processed_log:dict) -> str:
    """ Return the action message for an event
    Parameters:
    event_name : str
        event name; 'Burn', 'Collect' and 'Mint' have their own wording, any
        other name is described as a swap
    processed_log : dict
        dictionary of a processed log

    Returns:
    str
        A one-line description of the event with token amounts and USD values
    """
    token_pair = get_token_pair_info(processed_log=processed_log)

    amount0 = token_pair['token0']['amount']
    amount1 = token_pair['token1']['amount']

    symbol0 = token_pair['token0']['symbol']
    symbol1 = token_pair['token1']['symbol']

    price0 = token_pair['token0']['price']
    price1 = token_pair['token1']['price']

    def usd(amount, price) -> str:
        # USD value of a token amount, formatted with thousands separators and two decimals
        return f'${round((amount * price),2):,.2f}'

    def rate(received, paid) -> float:
        # Units received per unit paid; nan instead of a ZeroDivisionError when nothing was paid
        return received / paid if paid else float('nan')

    if event_name == 'Burn':
        return(
            f'Remove {amount0:,} ({usd(amount0, price0)}) '
            f'{symbol0} and {amount1:,} ({usd(amount1, price1)}) {symbol1} Liquidity from Uniswap v3')
    elif event_name == 'Collect':
        return(
            f'Collect {amount0:,} ({usd(amount0, price0)}) '
            f'{symbol0} and {amount1:,} ({usd(amount1, price1)}) {symbol1} from Uniswap v3')
    elif event_name == 'Mint':
        return(
            f'Add {amount0:,} ({usd(amount0, price0)}) '
            f'{symbol0} and {amount1:,} ({usd(amount1, price1)}) {symbol1} Liquidity to Uniswap v3')
    else:
        # Swap: the negative amount is the token that was received, the other one was paid
        if amount0 < 0:
            received0 = amount0 * -1
            # token0 received per token1 paid, so the unit is "symbol0 per symbol1"
            rate1 = rate(received0, amount1)
            return(
                f'Swapped {amount1:,} ({usd(amount1, price1)}) '
                f'{symbol1} [@ {rate1} {symbol0} per {symbol1}] for {received0} '
                f'({usd(received0, price0)}) {symbol0} on Uniswap v3')
        else:
            received1 = amount1 * -1
            # token1 received per token0 paid, so the unit is "symbol1 per symbol0"
            rate0 = rate(received1, amount0)
            return(
                f'Swapped {amount0:,} ({usd(amount0, price0)}) '
                f'{symbol0} [@ {rate0} {symbol1} per {symbol0}] for {received1} '
                f'({usd(received1, price1)}) {symbol1} on Uniswap v3')

In [19]:
def handle_events(logs:list, swap_signature:str=None) -> list:
    """ Turn a list of receipt logs into action messages
    Parameters:
    logs : list
        receipt logs, in the order they appear in the receipt
    swap_signature : str
        topic-0 hash of the pool Swap event; when None, Swap logs are ignored

    Returns:
    list
        One message per NFT event that follows a pool event (the pool message
        suffixed with the token id), plus one message per Swap log
    """
    # List of responses
    resp_list = []
    # Saves a pool event for NFT event to access it
    stack = []
    for log in logs:
        topics = log['topics']
        # A log without topics cannot match any signature
        if not topics:
            continue
        key = web3.to_hex(topics[0])
        if key in pool_signatures:
            event_name = pool_signatures[key]
            # Process log using pool contract
            processed_log = pool_contract.events[event_name]().process_log(log)
            # Check for a log event without any transaction value for both tokens
            if (processed_log['args']['amount0'] == 0) and (processed_log['args']['amount1'] == 0):
                # Continue with the next log event
                continue
            # Add it to the stack to retrieve when we arrive at the NFT event
            stack.append(event_action(event_name=event_name, processed_log=processed_log))
        elif key in nft_signatures:
            event_name = nft_signatures[key]
            # Process log using nft contract
            processed_log = nftmgr_contract.events[event_name]().process_log(log)
            # Check for a log event without any transaction value for both tokens
            if (processed_log['args']['amount0'] == 0) and (processed_log['args']['amount1'] == 0):
                # Continue with the next log event
                continue
            # No pool message is waiting for this NFT event; skip it instead of failing on pop()
            if not stack:
                continue
            # Token id + the message from the pool contract
            token_id = processed_log['args']['tokenId']
            resp_list.append(f'{stack.pop()} Token ID {token_id}')
        elif key == swap_signature:
            event_name = 'Swap'
            processed_log = pool_contract.events[event_name]().process_log(log)
            # There is no token associated with Swap. Just add this to the response
            resp_list.append(event_action(event_name=event_name, processed_log=processed_log))
        # Any other event is ignored as it is not in our signatures
    return resp_list

In [20]:
def get_transaction_actions(transactions:set) -> list:
    """ Build the action rows for a collection of transactions
    Parameters:
    transactions : set
        iterable of transaction hashes

    Returns:
    list
        One [tx hash, block timestamp, details] list per transaction that
        produced at least one message; details holds the messages joined by
        os.linesep, in the order burn, collect, mint, swap
    """
    def paired_signatures(pool_event:str, nft_event:str) -> list:
        # Topic-0 hashes of a pool event and of the NFT manager event that accompanies it
        signatures = [key for key, val in pool_signatures.items() if val == pool_event]
        signatures.extend(key for key, val in nft_signatures.items() if val == nft_event)
        return signatures

    # Order fixes the order of the messages within a transaction
    signature_groups = [
        paired_signatures('Burn', 'DecreaseLiquidity'),
        paired_signatures('Collect', 'Collect'),
        paired_signatures('Mint', 'IncreaseLiquidity'),
    ]
    swap_signature = pool_contract.event_signature('Swap')

    # List of transaction actions to return
    action_list = []
    for tx in transactions:
        receipt = web3.eth.get_transaction_receipt(tx)
        # Topic 0 of every log, computed once; logs without topics cannot match and are left out
        keyed_logs = [(web3.to_hex(log['topics'][0]), log) for log in receipt.logs if log['topics']]

        # Collects results from various events
        messages = []
        for signatures in signature_groups:
            logs = [log for key, log in keyed_logs if key in signatures]
            # Has to be at least 2: the pool event and its NFT manager counterpart
            if len(logs) > 1:
                messages.extend(handle_events(logs=logs))

        # Is it a Swap event?
        logs = [log for key, log in keyed_logs if key == swap_signature]
        if logs:
            messages.extend(handle_events(logs=logs, swap_signature=swap_signature))

        # Remove any None values
        messages = [x for x in messages if x is not None]
        if messages:
            # The receipt already carries the block number, so no extra transaction lookup is needed
            timestamp = web3.eth.get_block(block_identifier=receipt.blockNumber).timestamp
            # Join each message by new line char
            action_list.append([tx, timestamp, os.linesep.join(messages)])
    return action_list

In [21]:
actions = get_transaction_actions(transactions=txlogs)
# Create DF from list of actions
df = pd.DataFrame(data=actions, columns=['tx', 'timestamp', 'details'])
# Sort by timestamp (while it is still numeric)
df = df.sort_values(by=['timestamp'])
# Convert unix timestamp to a UTC date/time string
df['timestamp'] = df['timestamp'].apply(lambda x: time.strftime('%Y-%m-%d %H:%M:%S', time.gmtime(x)))
# Truncate tx for formatting
df['tx'] = df['tx'].apply(lambda x: '{first_part} ... {last_part}'.format(first_part=x[:6], last_part=x[-6:]))

# Header; the symbols are read with symbol().call(), the same call form ERC20Contract uses
print('LP events for the wallet {first_part} ... {last_part} - (Uniswap Pool {symbol0}/{symbol1} {fee}%)'.\
    format(first_part=WALLET_ADDRESS[:4], last_part=WALLET_ADDRESS[-4:], symbol0=token0.functions.symbol().call(),
           symbol1=token1.functions.symbol().call(), fee=FEE_TIER/10000))
# Create a tabulate table
table = tabulate(df.values.tolist(), headers=['Tx Hash', 'Time', 'Details'], tablefmt="grid",
                 maxcolwidths=[20, 10, None])
print(table)

LP events for the wallet 0xc9 ... 6c88 - (Uniswap Pool USDC/WETH 0.05%)
+-------------------+------------+---------------------------------------------------------------------------------------------------------------------------------------+
| Tx Hash           | Time       | Details                                                                                                                               |
| 0x1ad2 ... 33b917 | 2025-01-09 | Add 6,561.830118 ($6,561.20) USDC and 2.030304730511885 ($3,683.87) WETH Liquidity to Uniswap v3 Token ID 903752                      |
|                   | 05:27:23   | Swapped 1.965695269488115 ($3,566.64) WETH [@ 3338.813152208017 WETH per USDC] for 6563.089219 ($6,562.46) USDC on Uniswap v3         |
+-------------------+------------+---------------------------------------------------------------------------------------------------------------------------------------+
| 0xd0b3 ... 3d8f52 | 2025-01-10 | Collect 83.241689 ($83.23) USDC and 0.