<a href="https://colab.research.google.com/gist/sambacha/82bba64c3dc6c72506512ab72c279135/report_generator.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import requests
from bs4 import BeautifulSoup
from web3 import Web3

from analyzer.utils import parse_traces
from analyzer.decoder import trace_transaction
from analyzer.detection_rules import find_fn, analyze_with_no_abi, analyze_mutate_function_input

In [None]:
# AMM price-related functions
'''
0xcd7724c3 getEthToTokenInputPrice(uint256)
0x59e94862 getEthToTokenOutputPrice(uint256)
0x95b68fe7 getTokenToEthInputPrice(uint256)
0x2640f62c getTokenToEthOutputPrice(uint256)
0x0902f1ac getReserves()
0xed8e84f3 calc_token_amount(uint256[2],bool)
0x3883e119 calc_token_amount(uint256[3],bool)
0xcf701ff7 calc_token_amount(uint256[4],bool)
0xcc2b27d7 calc_withdraw_one_coin(uint256,int128)
0x861cdef0 calc_token_amount(address,uint256[4],bool)
0x41b028f3 calc_withdraw_one_coin(address,uint256,int128)
0xd38d2bea getExpectedRate(address,address,uint256,bool)
'''

price_func_names = [
    "getEthToTokenInputPrice",
    "getEthToTokenOutputPrice",
    "getTokenToEthInputPrice",
    "getTokenToEthOutputPrice",
    "getReserves",
    "calc_token_amount",
    "calc_withdraw_one_coin",
    "getExpectedRate"
]

In [None]:
# provide underlying tokens of AMMs to determine swapped tokens
underlying_tokens = {
    '0xbbc81d23ea2c3ec7e56d39296f0cbb648873a5d3': [ # Curve Y
        '0x6b175474e89094c44da98b954eedeac495271d0f',
        '0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48',
        '0xdac17f958d2ee523a2206206994597c13d831ec7',
        '0x0000000000085d4780b73119b644ae5ecd22b376',
    ],
    '0xfcba3e75865d2d561be8d220616520c171f12851': [ # Curve sUSD
        '0x6B175474E89094C44Da98b954EedeAC495271d0F',
        '0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48',
        '0xdAC17F958D2ee523a2206206994597C13D831ec7',
        '0x57Ab1ec28D129707052df4dF418D58a2D46d5f51',
    ],
    '0x0d4a11d5EEaaC28EC3F61d100daF4d40471f1852': [ # UniswapV2 ETH-USDT
        '0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2',
        '0xdac17f958d2ee523a2206206994597c13d831ec7'
    ],
    '0xa478c2975ab1ea89e8196811f51a7b7ade33eb11': [ # UniswapV2 DAI-ETH
        '0x6b175474e89094c44da98b954eedeac495271d0f',
        '0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2'
    ]
}

def get_underlying_tokens(cont_addr, idx):
    global underlying_tokens
    if cont_addr not in underlying_tokens:
        raise Exception(f"{cont_addr} not supported, please provide information of its underlying tokens")
    return underlying_tokens[cont_addr][idx]

In [None]:
# cache variables
addr2name = {}
sig2name = {}
swap_tokens = {}

# helper functions to get contract/sig name from external websites
def http_get(url):
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/77.0.3865.90 Safari/537.36'
    }
    r = requests.get(url, headers=headers)
    r.close()
    return r

def get_cont_name_etherscan(addr):
    url = f'https://etherscan.io/address/{addr}'
    r = http_get(url)
    soup = BeautifulSoup(r.text, 'html.parser')
    div = soup.find(id='ContentPlaceHolder1_tr_tokeninfo')
    if div:
        return div.a.text
    div = soup.find(id='ContentPlaceHolder1_contractCodeDiv')
    if 'Contract Source Code Verified' in str(div):
        return div.find('span', class_='h6').text
    return None

def get_sig_name_4byte(sig):
    url = f'https://www.4byte.directory/api/v1/signatures/?hex_signature={sig}'
    r = http_get(url)
    l = r.json()['results']
    return l[0]['text_signature'].split('(')[0] if l else sig

def get_cont_name(addr):
    global addr2name
    addr = addr.lower()
    if addr not in addr2name:
        name = get_cont_name_etherscan(addr)
        addr2name[addr] = name if name else addr
    return addr2name[addr]

def get_sig_name(sig):
    global sig2name
    if not sig:
        return ''
    if sig not in sig2name:
        name = get_sig_name_4byte(sig)
        sig2name[sig] = name if name else sig
    return sig2name[sig]

In [None]:
# helper functions to parse output from ProMutator
def log(tx):
    print(tx)

def get_sig_from_abi(abi):
    if abi['name'] == '':
        return '0xcd7724c3'
    if abi['name'] == 'getEthToTokenOutputPrice':
        return '0x59e94862'
    if abi['name'] == 'getTokenToEthInputPrice':
        return '0x95b68fe7'
    if abi['name'] == 'getTokenToEthOutputPrice':
        return '0x2640f62c'
    if abi['name'] == 'getReserves':
        return '0x0902f1ac'
    if abi['name'] == 'calc_token_amount':
        if len(abi['inputs']) == 3:
            return '0x861cdef0'
        if abi['inputs'][0]['type'] == 'uint256[2]':
            return '0xed8e84f3'
        if abi['inputs'][0]['type'] == 'uint256[3]':
            return '0x3883e119'
        if abi['inputs'][0]['type'] == 'uint256[4]':
            return '0x861cdef0'
        raise Exception('abi malformed')
    if abi['name'] == 'calc_withdraw_one_coin':
        if len(abi['inputs']) == 2:
            return '0xcc2b27d7'
        if len(abi['inputs']) == 3:
            return '0x41b028f3'
        raise Exception()
    if abi['name'] == 'getExpectedRate':
        return '0xd38d2bea'
    raise Exception()

def find_fn_with_args(parsed, fn_names):
    calls = []
    if 'input' in parsed.keys():
        try:
            if parsed['input'][0].fn_name in fn_names:
                calls.append({
                    'input': parsed['input'],
                    'from': parsed['from'],
                    'to': parsed['to'],
                })
        except:
            pass
    if 'calls' in parsed.keys():
        for call in parsed['calls']:
            tmp = find_fn_with_args(call, fn_names)
            for c in tmp:
                calls.append(c)
    return calls

def call_to_json(call):
    cont_addr = call['input'][0].address
    return {
        'cont_addr': cont_addr,
        'cont_name': get_cont_name(cont_addr),
        'price_func_name': call['input'][0].fn_name,
        'price_func_sig': get_sig_from_abi(call['input'][0].abi),
        'args': call['input'][1]
    }

def get_normal_traces(tx_hash):
    traces = trace_transaction(tx_hash)['result']
    parsed = parse_traces(traces)
    calls = find_fn_with_args(parsed, price_func_names)
    inputs = parsed['input']
    if isinstance(inputs, tuple):
        name, sig, args = inputs[0].abi['name'], '', inputs[1]
    else:
        name, sig, args = '', inputs[:10], ''
    return {
        'tx_hash': tx_hash,
        'cont_addr': parsed['to'],
        'cont_name': parsed['to_name'],
        'func_name': name if name else get_sig_name(sig),
        'func_sig': sig,
        'price_func_calls': [call_to_json(c) for c in calls]
    }

def get_normal_transfers_and_mutations(tx_hash, mutate_input=False):
    analyze_fn = analyze_mutate_function_input if mutate_input else analyze_with_no_abi
    detailed_outputs = analyze_fn(tx_hash, log, True)
    events = eval(detailed_outputs['normalTransfers'])
    normal_transfers = [{
        'cont_addr': event['address'],
        'cont_name': get_cont_name(event['address']),
        'value': event['args']['value']
    } for event in events]
    
    mutations = []
    for price_func_idx, output in enumerate(detailed_outputs['detailOutput']):
        mutation = {'mutated_transfers': []}
        if mutate_input:
            mutation['mutate_field'] = output['mutateField']
            mutation['mutate_rate'] = output['mutateRate']
            
        for data in output['suspicious']:
            if 'mutatedTransfers' in data['suspicious']:
                mutated_transfers = eval(data['suspicious']['mutatedTransfers'])
                detected = [{
                    'type': 'Transfer Amount Changed',
                    'transfer_event_idx': int(mutated['logIndex']),
                    'value': mutated['args']['value']
                } for mutated in mutated_transfers]
            elif data['suspicious']['type'] == 'reverted':
                detected = [{
                    'type': 'State Changed',
                    'msg': data['suspicious']['msg']
                }]
            elif data['suspicious']['type'] == 'normal':
                detected = [{
                    'type': 'Normal'
                }]
            mutation['mutated_transfers'].append({
                'price_func_idx': price_func_idx,
                'multiply_factor': float(data['rate']),
                'detected': detected
            })
        mutations.append(mutation)
    return normal_transfers, mutations

In [None]:
# helper functions to determine swapped tokens of the given transaction
def is_swap_in(price_func_sig, multiply_factor):
    if price_func_sig in [
        '0xcd7724c3', # getEthToTokenInputPrice
        '0x2640f62c', # getTokenToEthOutputPrice
        '0x0902f1ac', # getReserves
        '0xcc2b27d7', # calc_withdraw_one_coin(uint256,int128)
        '0x41b028f3'  # calc_withdraw_one_coin(address,uint256,int128)
    ]:
        return multiply_factor > 1
    return multiply_factor < 1

def get_swap_token_addr(call):
    cont_addr = call['cont_addr'].lower()
    price_func_sig = call['price_func_sig']
    args = call['args']
    if price_func_sig in [
        '0xcd7724c3', # getEthToTokenInputPrice
        '0x59e94862', # getEthToTokenOutputPrice
        '0x95b68fe7', # getTokenToEthInputPrice
        '0x2640f62c', # getTokenToEthOutputPrice
        '0x0902f1ac', # getReserves
        '0xd38d2bea'  # getExpectedRate
    ]:
        return get_underlying_tokens(cont_addr, 0)
    if price_func_sig in [
        '0xcc2b27d7', # calc_withdraw_one_coin
        '0x41b028f3'
    ]:
        return get_underlying_tokens(cont_addr, args['i'])
    if price_func_sig in [
        '0xed8e84f3', # calc_token_amount(uint256[2],bool)
        '0x3883e119', # calc_token_amount(uint256[3],bool)
        '0xcf701ff7', # calc_token_amount(uint256[4],bool)
        '0x861cdef0'  # calc_token_amount(address,uint256[4],bool)
    ]: #
        return get_underlying_tokens(cont_addr, args['_amounts'])

In [None]:
def generate_report(tx_hash, detection_rule='RT'):
    # get applied detection rule
    if detection_rule == 'RT':
        mutate_input = False
    elif detection_rule == 'RR':
        mutate_input = True
    else:
        raise Exception('Unknown detection rule')
        
    # all possible state types
    state_typs = ['State Changed', 'Transfer Amount Changed']
    
    # retrieve all mutation info from api
    normal_tx = get_normal_traces(tx_hash)
    transfer_events, mutations = get_normal_transfers_and_mutations(tx_hash, mutate_input)
    price_func_calls = normal_tx['price_func_calls']
    
    # get normal tx info
    s = ''
    s += f"Analyzing transaction: {normal_tx['tx_hash']}\n"
    s += f"Entry function: {normal_tx['cont_name']}.{normal_tx['func_name']} ({normal_tx['func_sig']})\n"
    
    # get price-related function info
    cnt = len(price_func_calls)
    end = 's' if cnt > 1 else ''
    s += '\n'
    s += f"Found {cnt} price-related function{end}:\n"
    for call in price_func_calls:
        price_func = get_sig_name(call['price_func_sig'])
        s += f"  - {call['cont_name']}.{price_func}\n"
    
    # get transfer event info
    cnt = len(transfer_events)
    end = 's' if cnt > 1 else ''
    s += '\n'
    s += f"Collected {cnt} Transfer event{end}:\n"
    for event in transfer_events:
        s += f"  - {event['cont_name']}.Transfer\n"
    
    # get successful mutations
    s += '\n'
    s += f"List of successful mutations:\n"
    
    # get detected type (depends on whether the function arguments are mutated)
    detected_typ = 'Rule of Revert' if mutate_input else 'Rule of Transfer'
    
    cached_rate, cached_change = 1, ''
    for mutation in mutations:
        # get mutated int-type argument info
        if mutate_input:
            s += f"Mutated int-type argument: {mutation['mutate_field']}\n"
            s += f"Multiply factor: {mutation['mutate_rate']}\n"
        
        # collect info of all mutated transfers
        for mt in sorted(mutation['mutated_transfers'],
                         key=lambda x: (x['multiply_factor'], x['detected'][0]['type'])):
            
            # get swapped token
            call = price_func_calls[mt['price_func_idx']]
            swap_token_addr = get_swap_token_addr(call)
            swap_token_name = get_cont_name(swap_token_addr)

            # info of mutated price-related function
            t = ''
            t += f"Price-related function: {call['cont_name']}.{call['price_func_name']} ({call['price_func_sig']})\n"
            t += f"Multiply factor: {mt['multiply_factor']}\n"

            got_transfer, got_change, got_all = False, False, False
            for idx, mutated in enumerate(mt['detected']):
                if mutated['type'] == state_typs[1]:
                    # detected trasfer changes
                    event = transfer_events[idx]
                    if event['value'] != mutated['value']:
                        t += f"Detected: {detected_typ}\n"
                        t += cached_change
                        t += f"  - {event['cont_name']} transfer amount changed\n"
                        t += f"    - Before: {event['value']}\n"
                        t += f"    - After: {mutated['value']}\n"
                        if cached_change:
                            got_all = True
                        got_transfer = True
                        cached_change = ''
                elif mutated['type'] == state_typs[0]:
                    if mutate_input:
                        if cached_rate != mutation['mutate_rate']:
                            cached_change = ''
                            cached_change += f"  - Normal transaction reverts when mutating AMM state\n"
                            cached_change += f"    - Message: {mutated['msg']}\n"
                            cached_rate = mutation['mutate_rate']
                        got_change = True
            
            # steps to cause such changes
            t += f"Attack Steps:\n"
            if is_swap_in(call['price_func_sig'], mt['multiply_factor']):
                t += f"  - Swap in {swap_token_name} to {call['cont_name']}\n"
            else:
                t += f"  - Swap out {swap_token_name} from {call['cont_name']}\n"
            t += f"  - Call {normal_tx['cont_name']}.{normal_tx['func_name']}\n"
            t += '\n'
            
            # check if should show detailed steps based on detection rule
            show_steps = got_all if mutate_input else got_transfer
            if show_steps:
                s += t
    return s

In [None]:
# Transactions for detecting Warp Finance's vulnerability
# Detection rules, RT and RR, are applied respectively
tx_hash_warp_rt = '0xe16b8eb01f13aa897fb5329401a7ca475fea87addd8e523ceaf2df3d785b5859'
tx_hash_warp_rr = '0xc869de9b05b82e370071b4203ef3366dc8a7cdbcb699795af622d5c9d6089072'

report_rt = generate_report(tx_hash_warp_rt, detection_rule='RT')
report_rr = generate_report(tx_hash_warp_rr, detection_rule='RR')

[93m start anaylzing tx: 0xe16b8eb01f13aa897fb5329401a7ca475fea87addd8e523ceaf2df3d785b5859
entry point: 0x84be8517aa1ac3027b89a83e64b8c039c71b9176
collects 1 transfers events
mutating 0x0902f1ac : 0xa478c2975ab1ea89e8196811f51a7b7ade33eb11 rate: 1.01
mutating 0x0902f1ac : 0xa478c2975ab1ea89e8196811f51a7b7ade33eb11 rate: 0.99
mutating 0x0902f1ac : 0xa478c2975ab1ea89e8196811f51a7b7ade33eb11 rate: 2
mutating 0x0902f1ac : 0xa478c2975ab1ea89e8196811f51a7b7ade33eb11 rate: 0.5
mutating 0x0902f1ac : 0xa478c2975ab1ea89e8196811f51a7b7ade33eb11 rate: 5
mutating 0x0902f1ac : 0xa478c2975ab1ea89e8196811f51a7b7ade33eb11 rate: 0.2
mutating 0x0902f1ac : 0xae461ca67b15dc8dc81ce7615e0320da1a9ab8d5 rate: 1.01
mutating 0x0902f1ac : 0xae461ca67b15dc8dc81ce7615e0320da1a9ab8d5 rate: 0.99
mutating 0x0902f1ac : 0xae461ca67b15dc8dc81ce7615e0320da1a9ab8d5 rate: 2
mutating 0x0902f1ac : 0xae461ca67b15dc8dc81ce7615e0320da1a9ab8d5 rate: 0.5
mutating 0x0902f1ac : 0xae461ca67b15dc8dc81ce7615e0320da1a9ab8d5 rate: 5
mu

[93m start anaylzing tx: 0xc869de9b05b82e370071b4203ef3366dc8a7cdbcb699795af622d5c9d6089072
entry point: 0xba539b9a5c2d412cb10e5770435f362094f9541c
collects 1 transfers events
decoding traces
mutated suspicious field _amount rate 2
mutating ['getReserves'] : 0xa478c2975ab1ea89e8196811f51a7b7ade33eb11 rate: 1.1
mutating ['getReserves'] : 0xa478c2975ab1ea89e8196811f51a7b7ade33eb11 rate: 0.9
mutating ['getReserves'] : 0xa478c2975ab1ea89e8196811f51a7b7ade33eb11 rate: 1.5
mutating ['getReserves'] : 0xa478c2975ab1ea89e8196811f51a7b7ade33eb11 rate: 0.75
mutating ['getReserves'] : 0xa478c2975ab1ea89e8196811f51a7b7ade33eb11 rate: 2
mutating ['getReserves'] : 0xa478c2975ab1ea89e8196811f51a7b7ade33eb11 rate: 0.5
mutating ['getReserves'] : 0xa478c2975ab1ea89e8196811f51a7b7ade33eb11 rate: 5
mutating ['getReserves'] : 0xa478c2975ab1ea89e8196811f51a7b7ade33eb11 rate: 0.2
mutating ['getReserves'] : 0xa478c2975ab1ea89e8196811f51a7b7ade33eb11 rate: 10
mutating ['getReserves'] : 0xa478c2975ab1ea89e81968

mutating ['getReserves'] : 0xa478c2975ab1ea89e8196811f51a7b7ade33eb11 rate: 0.9
mutating ['getReserves'] : 0xa478c2975ab1ea89e8196811f51a7b7ade33eb11 rate: 1.5
mutating ['getReserves'] : 0xa478c2975ab1ea89e8196811f51a7b7ade33eb11 rate: 0.75
mutating ['getReserves'] : 0xa478c2975ab1ea89e8196811f51a7b7ade33eb11 rate: 2
mutating ['getReserves'] : 0xa478c2975ab1ea89e8196811f51a7b7ade33eb11 rate: 0.5
mutating ['getReserves'] : 0xa478c2975ab1ea89e8196811f51a7b7ade33eb11 rate: 5
mutating ['getReserves'] : 0xa478c2975ab1ea89e8196811f51a7b7ade33eb11 rate: 0.2
mutating ['getReserves'] : 0xa478c2975ab1ea89e8196811f51a7b7ade33eb11 rate: 10
mutating ['getReserves'] : 0xa478c2975ab1ea89e8196811f51a7b7ade33eb11 rate: 0.1
mutating ['getReserves'] : 0xa478c2975ab1ea89e8196811f51a7b7ade33eb11 rate: 100
mutating ['getReserves'] : 0xa478c2975ab1ea89e8196811f51a7b7ade33eb11 rate: 0.01
mutating ['getReserves'] : 0xae461ca67b15dc8dc81ce7615e0320da1a9ab8d5 rate: 1.1
mutating ['getReserves'] : 0xae461ca67b15dc

mutating ['getReserves'] : 0xa478c2975ab1ea89e8196811f51a7b7ade33eb11 rate: 0.9
mutating ['getReserves'] : 0xa478c2975ab1ea89e8196811f51a7b7ade33eb11 rate: 1.5
mutating ['getReserves'] : 0xa478c2975ab1ea89e8196811f51a7b7ade33eb11 rate: 0.75
mutating ['getReserves'] : 0xa478c2975ab1ea89e8196811f51a7b7ade33eb11 rate: 2
mutating ['getReserves'] : 0xa478c2975ab1ea89e8196811f51a7b7ade33eb11 rate: 0.5
mutating ['getReserves'] : 0xa478c2975ab1ea89e8196811f51a7b7ade33eb11 rate: 5
mutating ['getReserves'] : 0xa478c2975ab1ea89e8196811f51a7b7ade33eb11 rate: 0.2
mutating ['getReserves'] : 0xa478c2975ab1ea89e8196811f51a7b7ade33eb11 rate: 10
mutating ['getReserves'] : 0xa478c2975ab1ea89e8196811f51a7b7ade33eb11 rate: 0.1
mutating ['getReserves'] : 0xa478c2975ab1ea89e8196811f51a7b7ade33eb11 rate: 100
mutating ['getReserves'] : 0xa478c2975ab1ea89e8196811f51a7b7ade33eb11 rate: 0.01
mutating ['getReserves'] : 0xae461ca67b15dc8dc81ce7615e0320da1a9ab8d5 rate: 1.1
mutating ['getReserves'] : 0xae461ca67b15dc

In [None]:
print(report_rt)
print(report_rr)

Analyzing transaction: 0xe16b8eb01f13aa897fb5329401a7ca475fea87addd8e523ceaf2df3d785b5859
Entry function: WarpVaultLP.withdrawCollateral ()

Found 7 price-related functions:
  - Uniswap DAI/ETH LP (UNI-V2).getReserves
  - Uniswap DAI/ETH LP (UNI-V2).getReserves
  - Uniswap DAI/ETH LP (UNI-V2).getReserves
  - Uniswap DAI/ETH LP (UNI-V2).getReserves
  - Uniswap DAI/ETH LP (UNI-V2).getReserves
  - Uniswap DAI/ETH LP (UNI-V2).getReserves
  - Uniswap DAI/ETH LP (UNI-V2).getReserves

Collected 1 Transfer event:
  - Uniswap ETH/USDT LP (UNI-V2).Transfer

List of successful mutations:
Price-related function: Uniswap DAI/ETH LP (UNI-V2).getReserves (0x0902f1ac)
Multiply factor: 0.2
Detected: Rule of Transfer
  - Uniswap ETH/USDT LP (UNI-V2) transfer amount changed
    - Before: 62191488622738
    - After: 62191488625868
Attack Steps:
  - Swap out Dai Stablecoin (DAI) from Uniswap DAI/ETH LP (UNI-V2)
  - Call WarpVaultLP.withdrawCollateral

Price-related function: Uniswap DAI/ETH LP (UNI-V2).get