In [2]:
%load_ext autoreload
%autoreload 2

# Instructions

Scraping
- Install the requirements.txt
- Run all

loading to BQ
- need to provide the variables in a .env file (see .env.example)

# Imports

In [1]:
import os
from copy import deepcopy
from datetime import datetime
from pathlib import Path

import polars as pl
import requests
from dotenv import load_dotenv
from google.cloud import bigquery
from tqdm.auto import tqdm



load_dotenv()

True

# Configs

In [2]:
DATA_FOLDER = Path('data')
if DATA_FOLDER.exists() == False:
    os.mkdir(DATA_FOLDER)


############## REGION - Get Payment
REGION_PARAMS = {
    "sdk":"1.28.3"
}
REGIONS_API = 'https://on-ramp-content.api.cx.metamask.io/regions'
REGIONS = [
    {
        "region":"de"
        , "country":"DE"
        , "fiat_currency": "/currencies/fiat/eur"
        , "symbol": "EUR"
    },
    {
        "region":"us-fl"
        , "country":"US"
        , "fiat_currency": "/currencies/fiat/usd"
        , "symbol": "USD"

    },
    {
        "region":"gb"
        , "country":"UK"
        , "fiat_currency": "/currencies/fiat/gbp"
        , "symbol": "GBP"
    }
]


############## QUOTE
CRYPTOCURRENCIES = [
    {
        "name": "Ethereum Mainnet"
        , "id": "/currencies/crypto/1/0x0000000000000000000000000000000000000000"
    },
    {
        "name": "USDT (Ethereum)"
        , "id":  "/currencies/crypto/1/0xdac17f958d2ee523a2206206994597c13d831ec7"
    },
    {
        "name": "USDT (BNB Chain)"
        , "id":  "/currencies/crypto/56/0x55d398326f99059ff775485246999027b3197955"
    },
    
]

# Scrape Payment data for regions

We can get all the payments providers for a given region instead of manually looking them up. 
In an ongoing process this would only be needed to run when a new region is added or on-demand

In [6]:
def extract_payments_data(payment_dict:dict):
    payment_dict.pop('logo', None)
    payment_dict.pop('icons', None)
    return payment_dict

def add_region_payment_data(region_dict:str, region_api:str = REGIONS_API, query_params:dict = REGION_PARAMS):
    # using a deepcopy to avoid mutating the original input
    dict_copy = deepcopy(region_dict)
    response = requests.get(f"{region_api}/{dict_copy['region']}", params=query_params)
    try:
        data = response.json()
        payment_info = [extract_payments_data(x) for x in data["payments"]]
        dict_copy["payments"] = payment_info
        print("Add Payment data")
        return dict_copy
    except Exception as e:
        print("Error adding payment data")
        print(e)

In [7]:
regions_with_payment = []
for region in REGIONS:
    print(f"extracting payment info for", region)
    region_it = add_region_payment_data(region)
    regions_with_payment.append(region_it)    

extracting payment info for {'region': 'de', 'country': 'DE', 'fiat_currency': '/currencies/fiat/eur', 'symbol': 'EUR'}
Add Payment data
extracting payment info for {'region': 'us-fl', 'country': 'US', 'fiat_currency': '/currencies/fiat/usd', 'symbol': 'USD'}
Add Payment data
extracting payment info for {'region': 'gb', 'country': 'UK', 'fiat_currency': '/currencies/fiat/gbp', 'symbol': 'GBP'}
Add Payment data


# Generate Quote Scrapings

Now, we generate a list of all the query parameters needed (except the amount) to make a call to the API. In an ongoing process this could be persisted to a table and run on demand (same considerations as before)

In [8]:
quote_base_params = []
for region in regions_with_payment:
    for payment in region["payments"]:
        for crypto in CRYPTOCURRENCIES:
            quote_base_params.append({
                "country": region["country"]
                , "region": region["region"] 
                , "crypto_name": crypto["name"]
                , "payment_name": payment["name"]
                , "scrape_params": {
                    "regionId":f"/regions/{region['region']}"
                    , "paymentMethodId": payment["id"]
                    , "cryptoCurrencyId": crypto["id"]
                    , "fiatCurrencyId": region["fiat_currency"]
                }
                
            })

# Scraping a quote

the following sections define the logic to finally make a call, parse the response and persist to a parquet file

In [32]:
def get_metamask_rate(crypto_id:str, fiat:str) -> float:
    AMOUNT_API = "https://on-ramp-content.api.cx.metamask.io{crypto_id}/amount" 
    amount_params = {
        "amount":1
        , "sdk":"1.28.3"
        , "fiat": fiat
    }
    r = requests.get(AMOUNT_API.format(crypto_id=crypto_id), params=amount_params, timeout=30)
    if r.status_code == 200:
        return r.json().get('value')

def get_usd_exchange_rate(fiat:str) -> float:
    FIAT_EXCHANGE_RATE_API = "https://price.api.cx.metamask.io/v1/exchange-rates" 
    amount_params = {
        "baseCurrency":"usd"
    }
    r = requests.get(FIAT_EXCHANGE_RATE_API, params=amount_params)
    if r.status_code == 200:
        return r.json()[fiat].get("value")
    
def parse_quote_data(provider_json) -> dict:
    # print(provider_json)
    quote_data = {}
    # quote_data["provider"] = provider_json.get("provider")
    
    fields_to_scrape = [ {"parents": [], "key":"provider", "default":None}
        , {"parents": ["quote"], "key":"amountIn", "default":None}
        , {"parents": ["quote"], "key":"amountOut", "default":None}
        , {"parents": ["quote"], "key":"exchangeRate", "default":None}
        , {"parents": ["quote"], "key":"networkFee", "default":None}
        , {"parents": ["quote"], "key":"providerFee", "default":None}
        , {"parents": ["quote"], "key":"extraFee", "default": None}
        , {"parents": ["quote"], "key":"bestRate", "default":False}
        , {"parents": ["quote", "crypto"] , "key":"decimals", "default":None}
        , {"parents": ["quote", "crypto"] , "key":"name", "default":None}
        , {"parents": ["quote", "crypto"] , "key":"symbol", "default":None}
        , {"parents": ["quote", "crypto", "network"], "key":"chainId", "default":None}
        , {"parents": ["quote", "crypto", "network"], "key":"chainName", "default":None}
        , {"parents": ["quote", "crypto", "network"], "key":"shortName", "default":None}
        , {"parents": ["quote", "fiat"] , "key":"name", "default":None}
        , {"parents": ["quote", "fiat"] , "key":"symbol", "default":None}
    ]
        
    # populate the field
    for field in fields_to_scrape:
        it_json = deepcopy(provider_json)
        if len(field["parents"]) > 0 :
            for parent in field["parents"]:
                it_json = it_json.get(parent, {})
        quote_data[field["key"]] = it_json.get(field["key"],field["default"])
        
    # quote_data["amountOutFiat"] = quote_data.get("amountOut",*quote_data["exchangeRate"]
    # quote_data["totalFee"] = quote_data["amountIn"] - quote_data["amountOutFiat"]
    # quote_data["spread"] = quote_data["totalFee"] - quote_data["networkFee"] - quote_data["providerFee"] - quote_data["extraFee"]
    return quote_data
            
                    
    
    # # this is to keep the dict filling with the same pattern
    # # for both error and success dict
    # json_ref = provider_json
    # if isinstance(provider_json.get("quote"), dict):
    #     json_ref = provider_json.get("quote")
    # for field in quote_fields_to_scrape:
    #     quote_data[field["key"]] = json_ref.get(field["key"], field["default"])
    # return quote_data

def get_quote(query_params: dict , amount: float) -> dict:
    
    QUOTE_API = 'https://on-ramp.api.cx.metamask.io/providers/all/quote'
    copy_params = deepcopy(query_params)
    
    # pop scrape_params to move up them in the schema
    scrape_params = copy_params.pop("scrape_params")
    scrape_params["amount"] = amount
    scrape_params["request_time"] = datetime.now()
    
    # add them at the same level to keep tabular format
    copy_params.update(scrape_params)
    
    
    # scrape quote
    response = requests.get(QUOTE_API, params=scrape_params)
    

    
    print(response.status_code)
    # return response
    """ 
    I don't want to assume that a provider that is not in the "success" means it does not
    support the currency, it could be just an error at this specific quoting
    Logging the error for completeness.
    It will also make coverage counting easier later on...
    """
    try:
        raw = response.json()
        all_quotes = []
        i=1
        for key in ["success", "error"]:
            if len(raw[key]) > 0:
                for provider_quote in raw[key]:
                    single_quote = parse_quote_data(provider_quote)
                    single_quote["position"] = i if key == 'success' else None
                    single_quote["has_quote"] = True if key =='success' else False     
                    single_quote.update(copy_params)
                    all_quotes.append(single_quote)
                    if key == 'success':
                        i += 1
        return {"status": "success", "payload": all_quotes}
    except Exception as e: 
        print("Error scraping", copy_params)
        print(e)
        print()
        return {"status": "failure", "payload": response}

def _format_string(x):
    return x.replace("(","").replace(")", "").replace(" ", "-").lower()

def generate_filename(query_params:dict, amount, format="parquet", add_date:bool=True):
    keys_to_format = ["region", "crypto_name", "payment_name"]
    keys_formatted = [_format_string(query_params[x]) for x in keys_to_format]
    keys_formatted.append(str(amount))
    keys_formatted.append(query_params["scrape_params"]["fiatCurrencyId"].split("/")[-1])
    if add_date:
        keys_formatted.insert(0, datetime.now().strftime("%Y%m%d_%H%M"))
    return f'{"__".join(keys_formatted)}.{format}'

def scrape_and_persist_quote(quote_params:dict, amount:float, data_folder:Path):
    
    quote_schema = {
        "provider":pl.String
        , "amountIn": pl.Float64
        , "amountOut": pl.Float64
        , "exchangeRate": pl.Float64
        , "networkFee": pl.Float64
        , "providerFee": pl.Float64
        , "extraFee": pl.Float64
        , "bestRate": pl.Boolean
        , "decimals": pl.Int32
        , "name": pl.String
        , "symbol": pl.String
        , "chainId": pl.String
        , "chainName": pl.String
        , "shortName": pl.String
        , "position": pl.Int32
        , "has_quote": pl.Boolean
        , "regionId": pl.String 
        , "country": pl.String 
        , "region": pl.String
        , "crypto_name": pl.String
        , "payment_name": pl.String
        , "paymentMethodId":pl.String 
        , "cryptoCurrencyId":pl.String
        , "fiatCurrencyId": pl.String 
        , "amount":pl.Float64
        , "request_time": pl.Datetime
    } 
    
    
    quote_data = get_quote(
        quote_params
        , amount=amount
    )
    
    
    if quote_data["status"] == 'success':
        
        # display_rate is the one used to show the conversion in the frontend
        # this is used to calculate the spreadFee
        display_rate = get_metamask_rate(
            crypto_id=quote_params["scrape_params"]["cryptoCurrencyId"]
            , fiat=quote_params["scrape_params"]["fiatCurrencyId"]
        )
        quote_data["display_rate"] = display_rate
            
        # To have all price points in a common fiat currency if needed
        fiat_usd_rate = get_usd_exchange_rate(quote_params["scrape_params"]["fiatCurrencyId"].split("/")[-1])
        quote_data["fiat_usd_rate"] = fiat_usd_rate
    
        filename = generate_filename(quote_params, amount)
        (
            pl.DataFrame(quote_data["payload"], schema = quote_schema)
            .with_columns(
                pl.lit(display_rate).cast(pl.Float32).alias("display_rate")
                , pl.lit(fiat_usd_rate).cast(pl.Float32).alias("fiat_usd_rate")
            )
            .with_columns(
                ( pl.col("amountOut") * pl.col("display_rate") ).alias("amountOutFiat")
                , (pl.col("amountIn") -  pl.col("amountOut") * pl.col("display_rate") ).alias("totalFee")
                , pl.col("amount").truediv(pl.col("fiat_usd_rate")).alias("amountUSD")
            )
            .with_columns(
                ( pl.col("totalFee") - pl.col("networkFee") - pl.col("providerFee") - pl.col("extraFee")).alias("spreadFee")
            )
            .write_parquet(data_folder / filename, compression="snappy")
        )

    return quote_data


In [99]:
# r = scrape_and_persist_quote(quote_base_params[20]
#                     , amount= 36                
#                     , data_folder=DATA_FOLDER
#                     )

200


# Bulk scraping

At this point we have a full scraping process for a single price point, we now proceed to scrape all the points we want to

In [56]:
def logarithmic_binning(min_value=30, max_value=30000, n_bins=30):
    bins = [min_value * (max_value / min_value)**(i/n_bins) for i in range(n_bins + 1)]
    bins = [round(b) for b in bins]

    midpoints = []
    for i in range(len(bins) - 1):
        from_value = bins[i]
        to_value = bins[i + 1]
        midpoint = round((from_value + to_value) / 2)
        midpoints.append({"price_from": from_value, "price_midpoint": midpoint, "price_to": to_value})

    return midpoints

# Example usage:
bin_midpoints = logarithmic_binning()

Utility functions to help during the scraping process

In [55]:
def find_index_by_filename(target_filename, all_params):
    target_without_date = '__'.join(target_filename.split('__')[1:])
    print(target_without_date)
    for index, (quote_param, bin) in enumerate(all_params):
        # print(generate_filename(quote_param, bin["price_midpoint"], add_date=False))
        if generate_filename(quote_param, bin["price_midpoint"], add_date=False) == target_without_date:
            return index
    return None

def find_index_by_item(target_item, all_params):
    index = next((i for i, item in enumerate(all_params) if item == target_item), None)
    print(f"Index of the specified object in combined_params: {index}")
    return index



In a production process we could try to parallalelize this process

In [33]:
combined_params = [(quote_param, bin) for bin in bin_midpoints for quote_param in quote_base_params]
for i in tqdm(combined_params, desc="Scraping quotes", position=0, leave=True):
        r = scrape_and_persist_quote(
            quote_params=i[0],
            amount=i[1]["price_midpoint"],
            data_folder=DATA_FOLDER
        )
        # to not overload the API
        # time.sleep(0.2)

Scraping quotes:   0%|          | 0/778 [00:00<?, ?it/s]

200
200
200
200
200
200
200
200
200
200
500
Error scraping {'country': 'UK', 'region': 'gb', 'crypto_name': 'Ethereum Mainnet', 'payment_name': 'PSE', 'regionId': '/regions/gb', 'paymentMethodId': '/payments/pse', 'cryptoCurrencyId': '/currencies/crypto/1/0x0000000000000000000000000000000000000000', 'fiatCurrencyId': '/currencies/fiat/gbp', 'amount': 852, 'request_time': datetime.datetime(2024, 8, 3, 22, 1, 46, 38126)}
'success'

500
Error scraping {'country': 'UK', 'region': 'gb', 'crypto_name': 'USDT (Ethereum)', 'payment_name': 'PSE', 'regionId': '/regions/gb', 'paymentMethodId': '/payments/pse', 'cryptoCurrencyId': '/currencies/crypto/1/0xdac17f958d2ee523a2206206994597c13d831ec7', 'fiatCurrencyId': '/currencies/fiat/gbp', 'amount': 852, 'request_time': datetime.datetime(2024, 8, 3, 22, 1, 47, 545254)}
'success'

500
Error scraping {'country': 'UK', 'region': 'gb', 'crypto_name': 'USDT (BNB Chain)', 'payment_name': 'PSE', 'regionId': '/regions/gb', 'paymentMethodId': '/payments/pse'

# Load to BQ

The advantage of using Parquet is that the schema is kept by the format and BQ only need to read it.

In [51]:
def load_parquet_to_bigquery(client: bigquery.Client, file_path:Path, project_id:str, dataset_id:str, table_id:str):
    dataset_ref = client.dataset(dataset_id, project=project_id)
    table_ref = dataset_ref.table(table_id)
    
    job_config = bigquery.LoadJobConfig()
    job_config.source_format = bigquery.SourceFormat.PARQUET
    job_config.autodetect = True
    job_config.write_disposition = bigquery.WriteDisposition.WRITE_APPEND

    with open(file_path, "rb") as source_file:
        job = client.load_table_from_file(
            source_file, table_ref, job_config=job_config
        )

    job.result()  # Wait for the job to complete
    print(f"Loaded {file_path} to {project_id}.{dataset_id}.{table_id}")

In [54]:
client = bigquery.Client(project=os.getenv('BILLING_PROJECT_ID'))
files = list(DATA_FOLDER.glob("*.parquet"))

for file in tqdm(files[2:], desc="uploading data", position=0, leave=True):
    load_parquet_to_bigquery(
        client=client
        , file_path=file 
        , project_id="slafaurie-sandbox"
        , dataset_id="metamask"
        , table_id="metamask_data"
    )

uploading data:   0%|          | 0/1138 [00:00<?, ?it/s]

Loaded data\20240803_1954__de__ethereum-mainnet__debit-or-credit__34__eur.parquet to slafaurie-sandbox.metamask.metamask_data
Loaded data\20240803_1954__de__ethereum-mainnet__khipu__34__eur.parquet to slafaurie-sandbox.metamask.metamask_data
Loaded data\20240803_1954__de__ethereum-mainnet__sepa-bank-transfer__34__eur.parquet to slafaurie-sandbox.metamask.metamask_data
Loaded data\20240803_1954__de__usdt-bnb-chain__apple-pay__34__eur.parquet to slafaurie-sandbox.metamask.metamask_data
Loaded data\20240803_1954__de__usdt-bnb-chain__debit-or-credit__34__eur.parquet to slafaurie-sandbox.metamask.metamask_data
Loaded data\20240803_1954__de__usdt-bnb-chain__khipu__34__eur.parquet to slafaurie-sandbox.metamask.metamask_data
Loaded data\20240803_1954__de__usdt-bnb-chain__sepa-bank-transfer__34__eur.parquet to slafaurie-sandbox.metamask.metamask_data
Loaded data\20240803_1954__de__usdt-ethereum__apple-pay__34__eur.parquet to slafaurie-sandbox.metamask.metamask_data
Loaded data\20240803_1954__de

# Export data for EDA

For the EDA, I use a BI tool (Tableau).

In [3]:
df = pl.read_parquet(DATA_FOLDER.glob("*.parquet"))

In [None]:
## saving to excel because it guarantees no problem with formatting in Tableau
(
    df
    .with_columns(
        pl.col("provider").str.split("/").list.last().alias("provider_clean")
    )
    .write_excel("metamask-quote-data.xlsx")
)

In [None]:
## unpivot data to generate a table suitable to have a 100% stacked bar chart to visualize TotalFee make up
(
    df
    .filter(pl.col("has_quote"))
    .with_columns(
        pl.col("provider").str.split("/").list.last().alias("provider_clean")
    )
    .unpivot(
        index=["provider_clean", "amount", "country", "crypto_name", "payment_name"]
        , on=["networkFee", "providerFee", "extraFee", "spreadFee"]
    )
    .write_excel("metamask-quote-unpivot.xlsx")
)

In [12]:
## Select one row and unpivot all columns
pl.Config.set_tbl_rows(100)


# Create the DataFrame
df_unpivoted = (
    df
    .with_columns(
        pl.col("provider").str.split("/").list.last().alias("provider_clean")
    )
    .head(1)  # Select only one row
    .unpivot(
        on=[],  # No id variables, unpivot all columns
        variable_name="column",
        value_name="value"
    )
)

df_unpivoted

column,value
str,str
"""provider""","""/providers/mercuryo"""
"""amountIn""","""34.0"""
"""amountOut""","""0.012603950839954914"""
"""exchangeRate""","""2653.929741931591"""
"""networkFee""","""0.044"""
"""providerFee""","""0.506"""
"""extraFee""","""0.0"""
"""bestRate""","""true"""
"""decimals""","""18"""
"""name""","""Euro"""
