In [1]:
#!/usr/bin/env python3
"""
Process transactions data from UWI Homes API endpoint.

This script:
1. Fetches transaction data from the API
2. Creates raw and processed DataFrames
3. Uploads raw data to BigQuery raw_data dataset
4. Creates daily aggregations
5. Uploads aggregated data to BigQuery marts dataset
"""

from dotenv import load_dotenv
import pandas as pd
import os
import requests
from datetime import datetime
from mezo.clients import BigQueryClient
from mezo.visual_utils import ProgressIndicators, ExceptionHandler, with_progress
from mezo.datetime_utils import format_datetimes

In [15]:

@with_progress("Fetching transactions from API")
def fetch_transactions_data(api_url: str) -> dict:
    """Fetch data from the UWI Homes API endpoint."""
    try:
        response = requests.get(api_url, timeout=30)
        response.raise_for_status()
        data = response.json()
        
        if not data:
            raise ValueError("API returned empty response")
            
        return data
        
    except requests.exceptions.RequestException as e:
        raise Exception(f"Failed to fetch data from API: {e}")
    except Exception as e:
        raise Exception(f"Error processing API response: {e}")


@with_progress("Processing raw transactions data")
def process_raw_data(api_data: dict) -> tuple[pd.DataFrame, pd.DataFrame]:
    """
    Process the raw API data into DataFrames.
    
    Returns:
        tuple: (raw_df, metadata_df) - Raw transactions and metadata
    """
    if not api_data or 'transactions' not in api_data:
        raise ValueError("Invalid API data structure - missing transactions")
    
    # Extract metadata
    metadata = {
        'dapp_name': api_data.get('dapp_name'),
        'contract_address': api_data.get('contract_address'),
        'creation_block': api_data.get('creation_block'),
        'period_start': api_data.get('period', {}).get('start'),
        'period_end': api_data.get('period', {}).get('end'),
        'total_transactions': api_data.get('summary', {}).get('total_transactions'),
        'total_volume': api_data.get('summary', {}).get('total_volume'),
        'unique_addresses': api_data.get('summary', {}).get('unique_addresses'),
        'total_fees': api_data.get('summary', {}).get('total_fees'),
        'fetch_timestamp': datetime.now().isoformat()
    }
    
    metadata_df = pd.DataFrame([metadata])
    
    # Extract transactions
    transactions = api_data.get('transactions', [])
    if not transactions:
        raise ValueError("No transactions found in API data")
    
    raw_df = pd.json_normalize(transactions)
    
    # Ensure we have required columns
    required_cols = ['transaction_hash', 'timestamp', 'amount', 'transaction_type']
    missing_cols = [col for col in required_cols if col not in raw_df.columns]
    if missing_cols:
        raise ValueError(f"Missing required columns: {missing_cols}")
    
    # Add metadata to transactions
    raw_df['dapp_name'] = metadata['dapp_name']
    raw_df['contract_address'] = metadata['contract_address']
    raw_df['fetch_timestamp'] = metadata['fetch_timestamp']
    
    return raw_df, metadata_df


@with_progress("Cleaning transactions data")
def clean_transactions_data(raw_df: pd.DataFrame) -> pd.DataFrame:
    """Clean and format the transactions data."""
    if not ExceptionHandler.validate_dataframe(raw_df, "Raw transactions", ['timestamp', 'amount']):
        raise ValueError("Invalid raw transactions data")
    
    df = raw_df.copy()
    
    # # Extract date for aggregations
    df['date'] = pd.to_datetime(df['timestamp']).dt.date
    
    # Convert amount to numeric
    df['amount'] = pd.to_numeric(df['amount'], errors='coerce')
    df['fee'] = pd.to_numeric(df.get('fee', 0), errors='coerce').fillna(0)
    
    # Clean transaction types
    df['transaction_type'] = df['transaction_type'].str.lower().str.strip()
    
    # Add derived fields
    df['day_of_week'] = pd.to_datetime(df['date']).dt.day_name()
    
    # Sort by date
    df = df.sort_values('date', ascending=False).reset_index(drop=True)
    
    return df


@with_progress("Creating daily aggregations")
def create_daily_aggregations(clean_df: pd.DataFrame) -> pd.DataFrame:
    """Create daily aggregated data."""
    if not ExceptionHandler.validate_dataframe(clean_df, "Clean transactions", ['date', 'amount']):
        raise ValueError("Invalid clean transactions data")
    
    # Group by date and transaction type
    daily_agg = clean_df.groupby(['date', 'transaction_type']).agg({
        'transaction_hash': 'count',  # Count of transactions
        'amount': ['sum', 'mean', 'median'],  # Amount statistics
        'fee': ['sum', 'mean'],  # Fee statistics
        'from': 'nunique',  # Unique senders
        'to': 'nunique'   # Unique receivers
    }).reset_index()
    
    # Flatten column names
    daily_agg.columns = [
        'date', 'transaction_type', 'transaction_count', 
        'total_amount', 'avg_amount', 'median_amount',
        'total_fees', 'avg_fees', 'unique_senders', 'unique_receivers'
    ]
    
    # Pivot to get transaction types as columns
    pivot_df = daily_agg.pivot(
        index='date', 
        columns='transaction_type', 
        values=['transaction_count', 'total_amount', 'total_fees']
    ).reset_index()
    
    # Flatten column names after pivot
    pivot_df.columns = [
        f"{col[0]}_{col[1]}" if col[1] else col[0] 
        for col in pivot_df.columns
    ]
    
    # Fill NaN values with 0
    pivot_df = pivot_df.fillna(0)
    
    # Add total columns
    amount_cols = [col for col in pivot_df.columns if col.startswith('total_amount_')]
    if amount_cols:
        pivot_df['total_volume_all'] = pivot_df[amount_cols].sum(axis=1)
    
    count_cols = [col for col in pivot_df.columns if col.startswith('transaction_count_')]
    if count_cols:
        pivot_df['total_transactions_all'] = pivot_df[count_cols].sum(axis=1)
    
    fee_cols = [col for col in pivot_df.columns if col.startswith('total_fees_')]
    if fee_cols:
        pivot_df['total_fees_all'] = pivot_df[fee_cols].sum(axis=1)
    
    # Convert date to string for BigQuery compatibility
    pivot_df['date'] = pivot_df['date'].astype(str)
    
    # Sort by date
    pivot_df = pivot_df.sort_values('date', ascending=False).reset_index(drop=True)
    
    return pivot_df

In [3]:
################################################
# Setup environment and clients
################################################
ProgressIndicators.print_step("Loading environment variables", "start")
load_dotenv(dotenv_path='../.env', override=True)
ProgressIndicators.print_step("Environment loaded successfully", "success")

# Initialize BigQuery client (disabled for testing)
# ProgressIndicators.print_step("Initializing BigQuery", "start")
# bq = BigQueryClient(key='GOOGLE_CLOUD_KEY', project_id='mezo-portal-data')
# ProgressIndicators.print_step("BigQuery initialized", "success")
ProgressIndicators.print_step("Skipping BigQuery initialization for testing", "warning")

################################################
# Fetch data from API
################################################
api_url = "https://be-mezo-prod.uwihomes.com/api/v1/transactions"

ProgressIndicators.print_step("Fetching data from UWI Homes API", "start")
api_data = fetch_transactions_data(api_url)
ProgressIndicators.print_step("API data fetched successfully", "success")


🔄 Loading environment variables...
✅ Environment loaded successfully
⚠️ Skipping BigQuery initialization for testing

🔄 Fetching data from UWI Homes API...

🔄 Fetching transactions from API...
✅ Fetching transactions from API
✅ API data fetched successfully


In [4]:
################################################
# Process raw data
################################################
raw_transactions_df, metadata_df = process_raw_data(api_data)

ProgressIndicators.print_step(f"Processed {len(raw_transactions_df)} raw transactions", "success")

################################################
# Upload raw data to BigQuery (DISABLED FOR TESTING)
################################################
ProgressIndicators.print_step("Skipping BigQuery upload for testing", "warning")

# # Upload raw transactions
# if len(raw_transactions_df) > 0:
#     # Ensure transaction_hash is string type for BigQuery compatibility
#     raw_transactions_df['transaction_hash'] = raw_transactions_df['transaction_hash'].astype(str)
#     bq.update_table(raw_transactions_df, 'raw_data', 'uwi_transactions_raw', 'transaction_hash')
#     ProgressIndicators.print_step("Uploaded raw transactions to BigQuery", "success")

# # Upload metadata
# if len(metadata_df) > 0:
#     bq.update_table(metadata_df, 'raw_data', 'uwi_metadata_raw', 'fetch_timestamp')
#     ProgressIndicators.print_step("Uploaded metadata to BigQuery", "success")


🔄 Processing raw transactions data...
✅ Processing raw transactions data
✅ Processed 7 raw transactions
⚠️ Skipping BigQuery upload for testing


In [6]:
raw_transactions_df

Unnamed: 0,transaction_hash,timestamp,from,to,amount,fee,transaction_type,block_number,dapp_name,contract_address,fetch_timestamp
0,0xe2d62d029866eef837bd9afe8b4ec65bab995da93821...,2025-09-04T08:48:27.000Z,0x9677b632c2520a3e23bcebf2b2288db59facddde,0x45657b3df048ce382737f8df92bb4184b00fadc4,1000.0,3.08770605e-07,deposit,2928073,MUSDVault,0x45657b3DF048ce382737F8DF92bB4184b00fADc4,2025-09-04T16:22:48.386346
1,0xa9c2c8019d5f4c9c8881ce2a2408e04cc68cbe2b2667...,2025-09-04T04:42:40.000Z,0xc200bd2ca668ca83832927252d18454b6c94135e,0x45657b3df048ce382737f8df92bb4184b00fadc4,0.1,2.8066545e-07,deposit,2923972,MUSDVault,0x45657b3DF048ce382737F8DF92bB4184b00fADc4,2025-09-04T16:22:48.386346
2,0x8f2611ca8f8857e8000b46e3c467a958fa2b1adc787b...,2025-08-29T22:12:49.000Z,0x83db892a6688b966122be219fa873bf24d7db79f,0x45657b3df048ce382737f8df92bb4184b00fadc4,139.54830770371373,3.2845878e-07,deposit,2797690,MUSDVault,0x45657b3DF048ce382737F8DF92bB4184b00fADc4,2025-09-04T16:22:48.386346
3,0xad794ff5c8335bbde0acc11d0dabbd588492ed546f94...,2025-08-29T01:10:53.000Z,0x6446c5ee010f97acb489885ec3d684ef4e6bc5fa,0x45657b3df048ce382737f8df92bb4184b00fadc4,500.0,3.557268e-07,deposit,2776769,MUSDVault,0x45657b3DF048ce382737F8DF92bB4184b00fADc4,2025-09-04T16:22:48.386346
4,0xf45fa90452c7c1b4fe3c7fbbc6a2beb9163ca6ec40dc...,2025-08-28T09:49:50.000Z,0x45657b3df048ce382737f8df92bb4184b00fadc4,0x148ff319130afdd355a90cea0b707dbafe8b7870,3.0,1.009373625e-07,withdraw,2761477,MUSDVault,0x45657b3DF048ce382737F8DF92bB4184b00fADc4,2025-09-04T16:22:48.386346
5,0xde3b785e5be805b58611686aa54ee5ddb1c24fa37821...,2025-08-28T09:45:52.000Z,0x45657b3df048ce382737f8df92bb4184b00fadc4,0x148ff319130afdd355a90cea0b707dbafe8b7870,7.0,1.0688535e-07,withdraw,2761411,MUSDVault,0x45657b3DF048ce382737F8DF92bB4184b00fADc4,2025-09-04T16:22:48.386346
6,0xad0a77b922072683bec87bf1681168513a2282339c41...,2025-08-28T09:43:42.000Z,0x148ff319130afdd355a90cea0b707dbafe8b7870,0x45657b3df048ce382737f8df92bb4184b00fadc4,10.0,3.5161425e-07,deposit,2761375,MUSDVault,0x45657b3DF048ce382737F8DF92bB4184b00fADc4,2025-09-04T16:22:48.386346


In [7]:
metadata_df

Unnamed: 0,dapp_name,contract_address,creation_block,period_start,period_end,total_transactions,total_volume,unique_addresses,total_fees,fetch_timestamp
0,MUSDVault,0x45657b3DF048ce382737F8DF92bB4184b00fADc4,2755249,,,7,1659.6483077037135,,1.8330585975e-06,2025-09-04T16:22:48.386346


In [16]:
################################################
# Clean and process data
################################################
clean_transactions_df = clean_transactions_data(raw_transactions_df)
ProgressIndicators.print_step(f"Cleaned {len(clean_transactions_df)} transactions", "success")

clean_transactions_df


🔄 Cleaning transactions data...
✅ Raw transactions validation passed (7 rows)
✅ Cleaning transactions data
✅ Cleaned 7 transactions


Unnamed: 0,transaction_hash,timestamp,from,to,amount,fee,transaction_type,block_number,dapp_name,contract_address,fetch_timestamp,date,day_of_week
0,0xe2d62d029866eef837bd9afe8b4ec65bab995da93821...,2025-09-04T08:48:27.000Z,0x9677b632c2520a3e23bcebf2b2288db59facddde,0x45657b3df048ce382737f8df92bb4184b00fadc4,1000.0,3.087706e-07,deposit,2928073,MUSDVault,0x45657b3DF048ce382737F8DF92bB4184b00fADc4,2025-09-04T16:22:48.386346,2025-09-04,Thursday
1,0xa9c2c8019d5f4c9c8881ce2a2408e04cc68cbe2b2667...,2025-09-04T04:42:40.000Z,0xc200bd2ca668ca83832927252d18454b6c94135e,0x45657b3df048ce382737f8df92bb4184b00fadc4,0.1,2.806654e-07,deposit,2923972,MUSDVault,0x45657b3DF048ce382737F8DF92bB4184b00fADc4,2025-09-04T16:22:48.386346,2025-09-04,Thursday
2,0x8f2611ca8f8857e8000b46e3c467a958fa2b1adc787b...,2025-08-29T22:12:49.000Z,0x83db892a6688b966122be219fa873bf24d7db79f,0x45657b3df048ce382737f8df92bb4184b00fadc4,139.548308,3.284588e-07,deposit,2797690,MUSDVault,0x45657b3DF048ce382737F8DF92bB4184b00fADc4,2025-09-04T16:22:48.386346,2025-08-29,Friday
3,0xad794ff5c8335bbde0acc11d0dabbd588492ed546f94...,2025-08-29T01:10:53.000Z,0x6446c5ee010f97acb489885ec3d684ef4e6bc5fa,0x45657b3df048ce382737f8df92bb4184b00fadc4,500.0,3.557268e-07,deposit,2776769,MUSDVault,0x45657b3DF048ce382737F8DF92bB4184b00fADc4,2025-09-04T16:22:48.386346,2025-08-29,Friday
4,0xf45fa90452c7c1b4fe3c7fbbc6a2beb9163ca6ec40dc...,2025-08-28T09:49:50.000Z,0x45657b3df048ce382737f8df92bb4184b00fadc4,0x148ff319130afdd355a90cea0b707dbafe8b7870,3.0,1.009374e-07,withdraw,2761477,MUSDVault,0x45657b3DF048ce382737F8DF92bB4184b00fADc4,2025-09-04T16:22:48.386346,2025-08-28,Thursday
5,0xde3b785e5be805b58611686aa54ee5ddb1c24fa37821...,2025-08-28T09:45:52.000Z,0x45657b3df048ce382737f8df92bb4184b00fadc4,0x148ff319130afdd355a90cea0b707dbafe8b7870,7.0,1.068854e-07,withdraw,2761411,MUSDVault,0x45657b3DF048ce382737F8DF92bB4184b00fADc4,2025-09-04T16:22:48.386346,2025-08-28,Thursday
6,0xad0a77b922072683bec87bf1681168513a2282339c41...,2025-08-28T09:43:42.000Z,0x148ff319130afdd355a90cea0b707dbafe8b7870,0x45657b3df048ce382737f8df92bb4184b00fadc4,10.0,3.516143e-07,deposit,2761375,MUSDVault,0x45657b3DF048ce382737F8DF92bB4184b00fADc4,2025-09-04T16:22:48.386346,2025-08-28,Thursday


In [13]:
cols = ['timestamp', 'from', 'to', 'amount', 'fee', 'transaction_type',
        'transaction_hash', 'block_number', 'fetch_timestamp']

Index(['transaction_hash', 'timestamp', 'from', 'to', 'amount', 'fee',
       'transaction_type', 'block_number', 'dapp_name', 'contract_address',
       'fetch_timestamp', 'day_of_week'],
      dtype='object')

In [17]:
################################################
# Create daily aggregations
################################################
daily_aggregations_df = create_daily_aggregations(clean_transactions_df)
ProgressIndicators.print_step(f"Created daily aggregations for {len(daily_aggregations_df)} days", "success")

################################################
# Upload processed data to BigQuery marts (DISABLED FOR TESTING)
################################################
ProgressIndicators.print_step("Skipping BigQuery marts upload for testing", "warning")

# if len(daily_aggregations_df) > 0:
#     bq.update_table(daily_aggregations_df, 'marts', 'daily_uwi_transactions', 'date')
#     ProgressIndicators.print_step("Uploaded daily aggregations to marts", "success")

daily_aggregations_df


🔄 Creating daily aggregations...
✅ Clean transactions validation passed (7 rows)
✅ Creating daily aggregations
✅ Created daily aggregations for 3 days
⚠️ Skipping BigQuery marts upload for testing


Unnamed: 0,date,transaction_count_deposit,transaction_count_withdraw,total_amount_deposit,total_amount_withdraw,total_fees_deposit,total_fees_withdraw,total_volume_all,total_transactions_all,total_fees_all
0,2025-09-04,2.0,0.0,1000.1,0.0,5.894361e-07,0.0,1000.1,2.0,5.894361e-07
1,2025-08-29,2.0,0.0,639.548308,0.0,6.841856e-07,0.0,639.548308,2.0,6.841856e-07
2,2025-08-28,1.0,2.0,10.0,10.0,3.516143e-07,2.078227e-07,20.0,3.0,5.59437e-07


In [18]:
################################################
# Calculate and display summary statistics
################################################
ProgressIndicators.print_step("Calculating summary statistics", "start")

total_transactions = len(clean_transactions_df)
total_volume = clean_transactions_df['amount'].sum()
total_fees = clean_transactions_df['fee'].sum()
unique_addresses = pd.concat([
    clean_transactions_df['from'], 
    clean_transactions_df['to']
]).nunique()
avg_transaction_size = clean_transactions_df['amount'].mean()

date_range = f"{clean_transactions_df['timestamp'].min()} to {clean_transactions_df['timestamp'].max()}"

ProgressIndicators.print_step("Summary statistics calculated", "success")

# Display sample data for verification
print(f"\n📄 Sample Raw Data (first 3 rows):")
print(raw_transactions_df[['transaction_hash', 'timestamp', 'amount', 'transaction_type']].head(3))

print(f"\n📊 Daily Aggregations Summary:")
print(daily_aggregations_df[['date'] + [col for col in daily_aggregations_df.columns if 'total' in col]].head())

ProgressIndicators.print_summary_box(
    "📊 UWI HOMES TRANSACTIONS SUMMARY",
    {
        "Total Transactions": total_transactions,
        "Total Volume": f"{total_volume:,.2f}",
        "Total Fees": f"{total_fees:,.2f}",
        "Unique Addresses": unique_addresses,
        "Average Transaction": f"{avg_transaction_size:,.2f}",
        "Date Range": date_range
    }
)

ProgressIndicators.print_header("🚀 UWI HOMES PROCESSING COMPLETED SUCCESSFULLY 🚀")


🔄 Calculating summary statistics...
✅ Summary statistics calculated

📄 Sample Raw Data (first 3 rows):
                                    transaction_hash  \
0  0xe2d62d029866eef837bd9afe8b4ec65bab995da93821...   
1  0xa9c2c8019d5f4c9c8881ce2a2408e04cc68cbe2b2667...   
2  0x8f2611ca8f8857e8000b46e3c467a958fa2b1adc787b...   

                  timestamp                  amount transaction_type  
0  2025-09-04T08:48:27.000Z                  1000.0          deposit  
1  2025-09-04T04:42:40.000Z                     0.1          deposit  
2  2025-08-29T22:12:49.000Z  139.548307703713748745          deposit  

📊 Daily Aggregations Summary:
         date  total_amount_deposit  total_amount_withdraw  \
0  2025-09-04           1000.100000                    0.0   
1  2025-08-29            639.548308                    0.0   
2  2025-08-28             10.000000                   10.0   

   total_fees_deposit  total_fees_withdraw  total_volume_all  \
0        5.894361e-07         0.000000e+00 