In [14]:
import logging
import math
import os
import sys
import time
from urllib.parse import unquote, urlparse

import pandas as pd
import psycopg2
from dotenv import load_dotenv
from psycopg2 import sql
from psycopg2.extras import RealDictCursor

logging.basicConfig(level=logging.INFO, format='%(message)s')
logger = logging.getLogger(__name__)

load_dotenv()

db_url = os.environ.get("DATABASE_URL")
if not db_url:
    raise RuntimeError("DATABASE_URL is not set; define it in the environment or in a .env file")

# Parse with urllib rather than splitting on '@', ':' and '/': manual splitting
# mis-parses passwords that contain ':' or '@' and never decodes percent-escapes.
# Query-string options (e.g. "?sslmode=...") are ignored, as before.
parsed_db_url = urlparse(db_url)

# urlparse does not percent-decode userinfo, so decode it explicitly. Missing parts
# become None, and psycopg2.connect drops None keyword arguments.
db_user = unquote(parsed_db_url.username) if parsed_db_url.username else None
db_password = unquote(parsed_db_url.password) if parsed_db_url.password else None
db_host = parsed_db_url.hostname
db_port = str(parsed_db_url.port) if parsed_db_url.port else "5432"
db_name = unquote(parsed_db_url.path.lstrip("/"))

conn = psycopg2.connect(
    host=db_host,
    port=db_port,
    database=db_name,
    user=db_user,
    password=db_password
)

# Shared dict-row cursor used by the exploration cells below.
cursor = conn.cursor(cursor_factory=RealDictCursor)
print(f"✅ Connected to: {db_name} database")

✅ Connected to: ateneaservice database


In [15]:
# List every table in the 'public' schema, alphabetically, as a 1-based numbered list.
list_tables_sql = """
    SELECT table_name 
    FROM information_schema.tables 
    WHERE table_schema = 'public'
    ORDER BY table_name
"""
cursor.execute(list_tables_sql)
tables = cursor.fetchall()
table_names = [row['table_name'] for row in tables]

listing_lines = [
    f"  {position:2d}. {name}"
    for position, name in enumerate(table_names, start=1)
]
print(f"Found {len(table_names)} tables in {db_name}:")
for line in listing_lines:
    print(line)

def get_table_info(table_name, schema="public", cur=None):
    """Print a table's row count and a boxed listing of its columns.

    Args:
        table_name: Name of the table to describe.
        schema: Schema that contains the table. Defaults to 'public', the schema
            the table listing above is restricted to.
        cur: Cursor to run the queries on. It must return dict-like rows (e.g. a
            RealDictCursor). Defaults to the notebook-level ``cursor``.
    """
    cur = cursor if cur is None else cur

    # Qualify with the schema so the count matches the table described below,
    # independent of search_path.
    cur.execute(sql.SQL("SELECT COUNT(*) as count FROM {}.{}").format(
        sql.Identifier(schema),
        sql.Identifier(table_name),
    ))
    count = cur.fetchone()['count']

    # information_schema.columns spans every schema. Without the schema filter, a
    # same-named table elsewhere would interleave its columns into this listing.
    cur.execute("""
        SELECT column_name, data_type, is_nullable
        FROM information_schema.columns
        WHERE table_schema = %s
        AND table_name = %s
        ORDER BY ordinal_position
    """, (schema, table_name))
    columns = cur.fetchall()

    print(f"\n📊 Table: {table_name} ({count:,} rows)")
    print("╔═══════════════════╦══════════════╦════════════╗")
    print("║ Column            ║ Data Type    ║ Nullable   ║")
    print("╠═══════════════════╬══════════════╬════════════╣")

    for col in columns:
        # Truncate to the fixed column widths so the box stays aligned.
        name = col['column_name']
        dtype = col['data_type']
        nullable = "YES" if col['is_nullable'] == "YES" else "NO"
        print(f"║ {name[:17].ljust(17)} ║ {dtype[:12].ljust(12)} ║ {nullable.ljust(10)} ║")

    print("╚═══════════════════╩══════════════╩════════════╝")

# Pick the table to inspect by name. A positional index silently points at a
# different table, or at none, whenever a table is added or dropped.
TABLE_TO_INSPECT = "users"
if TABLE_TO_INSPECT in table_names:
    get_table_info(TABLE_TO_INSPECT)
else:
    print(f"Table '{TABLE_TO_INSPECT}' not found in schema 'public'")

Found 15 tables in ateneaservice:
   1. athenasampledata
   2. backup_activities
   3. countries
   4. last_activities
   5. provider_methods
   6. providers
   7. referrals
   8. rewards
   9. track_notifications
  10. trades
  11. transfers
  12. user_history
  13. users
  14. users_clone
  15. utms

📊 Table: users (433,695 rows)
╔═══════════════════╦══════════════╦════════════╗
║ Column            ║ Data Type    ║ Nullable   ║
╠═══════════════════╬══════════════╬════════════╣
║ id                ║ uuid         ║ NO         ║
║ user_id           ║ text         ║ YES        ║
║ email             ║ text         ║ NO         ║
║ account_number    ║ text         ║ YES        ║
║ status            ║ USER-DEFINED ║ NO         ║
║ email_verified    ║ boolean      ║ YES        ║
║ name              ║ text         ║ YES        ║
║ last_name         ║ text         ║ YES        ║
║ phone             ║ text         ║ YES        ║
║ country           ║ text         ║ YES        ║
║ birth_date    

In [16]:
# Overview of public.users: total row count, column schema, and a 5-row sample.
try:
    start_time = time.time()
    
    cursor.execute("""
        SELECT EXISTS (
            SELECT FROM information_schema.tables 
            WHERE table_schema = 'public' 
            AND table_name = 'users'
        ) as exists
    """)
    table_exists = cursor.fetchone()['exists']
    
    if not table_exists:
        print("❌ 'users' table not found in database")
    else:
        # Qualify with 'public' to match the existence check above, independent of search_path.
        cursor.execute("SELECT COUNT(*) as count FROM public.users")
        total_users = cursor.fetchone()['count']
        
        # Filter by schema too: information_schema.columns covers every schema, and
        # another schema's 'users' table would otherwise be mixed into the result.
        cursor.execute("""
            SELECT column_name, data_type, character_maximum_length
            FROM information_schema.columns
            WHERE table_schema = 'public'
            AND table_name = 'users'
            ORDER BY ordinal_position
        """)
        columns = cursor.fetchall()
        schema_df = pd.DataFrame(columns)
        
        # NOTE(review): this sample includes e-mail addresses; clear or redact the
        # rendered output before sharing the notebook.
        cursor.execute("""
            SELECT id, user_id, email, status, country, is_investor, is_trader
            FROM public.users
            LIMIT 5
        """)
        users_data = cursor.fetchall()
        users_df = pd.DataFrame(users_data)
        
        query_time = time.time() - start_time
        
        print(f"Query execution time: {query_time:.2f} seconds")
        print(f"Total number of users: {total_users:,}")
        
        print("\nUsers table schema:")
        display(schema_df)
        
        print("\nSample user data (5 rows):")
        display(users_df)
        
except Exception as e:
    print(f"❌ Error reading users table: {str(e)}")
    # A failed statement leaves the transaction aborted. Roll back so later cells
    # that share `conn` can still run queries.
    conn.rollback()

Query execution time: 0.46 seconds
Total number of users: 433,695

Users table schema:


Unnamed: 0,column_name,data_type,character_maximum_length
0,id,uuid,
1,user_id,text,
2,email,text,
3,account_number,text,
4,status,USER-DEFINED,
5,email_verified,boolean,
6,name,text,
7,last_name,text,
8,phone,text,
9,country,text,



Sample user data (5 rows):


Unnamed: 0,id,user_id,email,status,country,is_investor,is_trader
0,22f2ddb6-7174-48a6-8fda-f90247aa59ee,22f2ddb6-7174-48a6-8fda-f90247aa59ee,giustinoadessolupo@gmail.com,ONBOARD_COMPLETED,VE,True,
1,abc32536-9f52-4f3f-ad50-d2eebad88a7e,abc32536-9f52-4f3f-ad50-d2eebad88a7e,richardroy@hotmail.es,ONBOARD_COMPLETED,PE,True,True
2,c0dcff88-0bbc-4973-9379-c4a3e63ae4a8,c0dcff88-0bbc-4973-9379-c4a3e63ae4a8,zapatafelipe573@gmail.com,ONBOARD_COMPLETED,CO,True,
3,d23912f7-b21c-4d29-b8ae-b6c90160d5a5,d23912f7-b21c-4d29-b8ae-b6c90160d5a5,aplicacionesvarias17@gmail.com,ONBOARD_COMPLETED,CR,True,True
4,12a4a886-f367-4c52-8030-67a7cf65aeea,12a4a886-f367-4c52-8030-67a7cf65aeea,isidoramrbl@gmail.com,ONBOARD_COMPLETED,CL,True,


In [17]:
import matplotlib.pyplot as plt
import time
import pandas as pd
import warnings
from psycopg2.extras import RealDictCursor

warnings.filterwarnings('ignore', category=UserWarning, module='pandas.io.sql')

# Investor-flag distribution for ONBOARD_COMPLETED users. COUNT(*) FILTER (...)
# yields 0 when nothing matches. SUM(CASE ...) yields NULL on an empty set, which
# would break the `:,` formatting and the `> 0` comparison below.
INVESTOR_STATS_QUERY = """
    SELECT
        COUNT(*) AS total_users,
        COUNT(*) FILTER (WHERE is_investor = true) AS investors_true,
        COUNT(*) FILTER (WHERE is_investor = false) AS investors_false,
        COUNT(*) FILTER (WHERE is_investor IS NULL) AS investors_null
    FROM users
    WHERE status = 'ONBOARD_COMPLETED'
"""


def _print_investor_stats(title, stats):
    """Print `title`, then the four counts produced by INVESTOR_STATS_QUERY."""
    print(title)
    print(f"• Total users: {stats['total_users']:,}")
    print(f"• True investors: {stats['investors_true']:,}")
    print(f"• False investors: {stats['investors_false']:,}")
    print(f"• Null investors: {stats['investors_null']:,}")


# Use a dedicated cursor that is always closed in `finally`. Rebinding and closing
# the shared `cursor` would leave the earlier exploration cells unable to re-run.
investor_cursor = conn.cursor(cursor_factory=RealDictCursor)
try:
    start_time = time.time()

    investor_cursor.execute(INVESTOR_STATS_QUERY)
    initial_stats = investor_cursor.fetchone()
    _print_investor_stats("Initial distribution for ONBOARD_COMPLETED users:", initial_stats)

    # Step 1: normalise NULL flags to FALSE so step 2 only has to handle FALSE.
    # NOTE(review): this step commits before the confirmation prompt below.
    if initial_stats['investors_null'] > 0:
        investor_cursor.execute("""
            UPDATE users 
            SET 
                is_investor = false,
                updated_at = CURRENT_TIMESTAMP
            WHERE status = 'ONBOARD_COMPLETED' 
            AND is_investor IS NULL
        """)
        null_updated = investor_cursor.rowcount
        conn.commit()
        print(f"\nStep 1: Updated {null_updated:,} NULL values to FALSE")
    else:
        print("\nStep 1: No NULL values to update")

    investor_cursor.execute("""
        SELECT COUNT(*) as count
        FROM users
        WHERE status = 'ONBOARD_COMPLETED'
        AND is_investor = false
    """)
    update_count = investor_cursor.fetchone()['count']
    print(f"\nStep 2: Found {update_count:,} users to update from FALSE to TRUE")

    if update_count > 0:
        proceed = input("Proceed with update? (yes/no): ")

        if proceed.strip().lower() == "yes":
            update_start = time.time()
            if update_count < 10000:
                print("Using direct update (faster for smaller datasets)...")
                investor_cursor.execute("""
                    UPDATE users
                    SET 
                        is_investor = true,
                        investor_date = CURRENT_TIMESTAMP,
                        updated_at = CURRENT_TIMESTAMP
                    WHERE status = 'ONBOARD_COMPLETED'
                    AND is_investor = false
                """)
                conn.commit()

                print(f"Update completed in {time.time() - update_start:.2f} seconds")
            else:
                investor_cursor.execute("""
                    SELECT id 
                    FROM users
                    WHERE status = 'ONBOARD_COMPLETED'
                    AND is_investor = false
                """)
                all_ids = [str(row['id']) for row in investor_cursor.fetchall()]
                batch_size = 10000
                total_updated = 0

                for i in range(0, len(all_ids), batch_size):
                    batch_ids = all_ids[i:i + batch_size]
                    # Bind the ids as an array parameter instead of splicing quoted
                    # values into the SQL text. The cast matches the uuid `id` column.
                    # Re-checking is_investor skips rows changed since the SELECT above.
                    investor_cursor.execute("""
                        UPDATE users
                        SET
                            is_investor = true,
                            investor_date = CURRENT_TIMESTAMP,
                            updated_at = CURRENT_TIMESTAMP
                        WHERE id = ANY(%s::uuid[])
                        AND is_investor = false
                    """, (batch_ids,))
                    total_updated += investor_cursor.rowcount
                    conn.commit()

                print(f"Batched update of {total_updated:,} users completed in "
                      f"{time.time() - update_start:.2f} seconds")

            investor_cursor.execute(INVESTOR_STATS_QUERY)
            final_stats = investor_cursor.fetchone()
            _print_investor_stats("\nFinal distribution for ONBOARD_COMPLETED users:", final_stats)

            labels = ['Investors', 'Non-Investors']
            sizes = [final_stats['investors_true'], final_stats['investors_false'] + final_stats['investors_null']]

            # A pie of all-zero slices is meaningless, so only plot when there is data.
            if sum(sizes) > 0:
                fig, ax = plt.subplots(figsize=(8, 6))
                ax.pie(sizes, labels=labels, autopct='%1.1f%%', startangle=90)
                ax.axis('equal')
                ax.set_title('Investor Distribution After Update')
                fig.tight_layout()
                plt.show()
            else:
                print("No ONBOARD_COMPLETED users to plot")
        else:
            print("Update cancelled by user")
    else:
        print("No users need updating")

    total_time = time.time() - start_time
    print(f"\nTotal execution time: {total_time:.2f} seconds")

except Exception as e:
    print(f"❌ Error: {str(e)}")
    conn.rollback()
finally:
    investor_cursor.close()

Initial distribution for ONBOARD_COMPLETED users:
• Total users: 263,782
• True investors: 263,782
• False investors: 0
• Null investors: 0

Step 1: No NULL values to update

Step 2: Found 0 users to update from FALSE to TRUE
No users need updating

Total execution time: 0.30 seconds


In [None]:
import json
import os
import time
from typing import Dict, Any, List
import psycopg2
from psycopg2.extras import Json, execute_values

def is_meaningful_value(value: Any) -> bool:
    """Decide whether a balance-report field carries a value worth keeping.

    Rules, in order:
      * None -> False.
      * int/float (bool included, being an int subclass) -> True only when > 0.
      * str -> parsed as a float and True only when > 0; a string that is not
        a number counts as meaningful whenever it is non-empty.
      * dict -> True when any entry is meaningful. An entry that is itself a
        dict with a 'shares' key is judged by its 'shares' value alone.
      * anything else (lists, ...) -> plain truthiness.
    """
    if value is None:
        return False

    if isinstance(value, (int, float)):
        return value > 0

    if isinstance(value, str):
        try:
            number = float(value)
        except ValueError:
            return bool(value)
        return number > 0

    if isinstance(value, dict):
        def _entry_is_meaningful(entry: Any) -> bool:
            # Holding-style entries ({'shares': ...}) count only through 'shares'.
            if isinstance(entry, dict) and 'shares' in entry:
                return is_meaningful_value(entry.get('shares'))
            return is_meaningful_value(entry)

        return any(_entry_is_meaningful(entry) for entry in value.values())

    return bool(value)

def has_meaningful_data(user: Dict[str, Any]) -> bool:
    """Return True when a balance-report record holds anything worth syncing.

    Checked in order: the balance fields 'equity', 'equity_stocks',
    'cryptoEquity', 'equity_crypto' and 'cash_on_hold' (via is_meaningful_value);
    then 'actual', which counts whenever it is a non-zero number or numeric
    string (negatives included); then 'portfolio', 'AUM' and 'tickers'; and
    finally the blocked tickers, read from 'blocked_tickers' or, when that is
    falsy, 'tickers_block'.
    """
    balance_fields = ("equity", "equity_stocks", "cryptoEquity", "equity_crypto", "cash_on_hold")
    if any(is_meaningful_value(user.get(field)) for field in balance_fields):
        return True

    actual = user.get("actual")
    if isinstance(actual, (int, float)) and actual != 0:
        return True
    if isinstance(actual, str):
        try:
            if float(actual) != 0:
                return True
        except ValueError:
            pass

    holding_fields = ("portfolio", "AUM", "tickers")
    if any(is_meaningful_value(user.get(field)) for field in holding_fields):
        return True

    return is_meaningful_value(user.get("blocked_tickers") or user.get("tickers_block"))

# Read the balance report, collect ticker / blocked-ticker payloads for users with
# meaningful balances, and write them to `users` through a temporary staging table.
try:
    json_path = os.environ.get("BALANCE_REPORT_PATH")
    start_time = time.time()

    # os.path.exists(None) raises TypeError, so report a missing variable explicitly.
    if not json_path:
        print("Error: BALANCE_REPORT_PATH environment variable is not set")
    elif not os.path.exists(json_path):
        print(f"Error: File {json_path} does not exist")
    else:
        file_size_mb = os.path.getsize(json_path) / (1024*1024)
        print(f"File exists, size: {file_size_mb:.2f} MB")

        user_count = 0
        meaningful_data_count = 0
        users_with_tickers = 0

        # Keyed by user_id: the staging table's user_id is a PRIMARY KEY, so a user
        # listed twice in the report would abort the whole insert. The last entry wins.
        ticker_rows_by_user = {}

        # JSON is UTF-8 by specification; don't rely on the platform default encoding.
        with open(json_path, 'r', encoding='utf-8') as file:
            balance_data = json.load(file)

        if isinstance(balance_data, dict) and 'balancesArray' in balance_data:
            balances_array = balance_data['balancesArray']
            print(f"Found 'balancesArray' with {len(balances_array):,} items")

            for user_balance in balances_array:
                user_count += 1

                user_id = user_balance.get('user_id')

                if user_id and has_meaningful_data(user_balance):
                    meaningful_data_count += 1

                    # has_meaningful_data() accepts blocked tickers under either
                    # 'blocked_tickers' or 'tickers_block'. Read both here too, so
                    # the alternate key is not written out as an empty object.
                    has_blocked_key = 'blocked_tickers' in user_balance or 'tickers_block' in user_balance
                    if 'tickers' in user_balance or has_blocked_key:
                        users_with_tickers += 1
                        blocked_tickers = user_balance.get(
                            'blocked_tickers', user_balance.get('tickers_block', {})
                        )
                        ticker_rows_by_user[user_id] = {
                            'user_id': user_id,
                            'tickers': json.dumps(user_balance.get('tickers', {})),
                            'tickers_block': json.dumps(blocked_tickers)
                        }

                if user_count % 50000 == 0:
                    print(f"Processed {user_count:,} users, found {meaningful_data_count:,} with meaningful data")
        else:
            print("Could not find 'balancesArray' in the JSON structure")
            if isinstance(balance_data, dict):
                print(f"JSON structure contains keys: {list(balance_data.keys())}")
            else:
                print(f"Top-level JSON value is a {type(balance_data).__name__}, not an object")

        users_with_meaningful_data = list(ticker_rows_by_user.values())

        processing_time = time.time() - start_time
        print(f"JSON processing completed in {processing_time:.2f} seconds")
        print(f"Total users processed: {user_count:,}")
        print(f"Users with meaningful data: {meaningful_data_count:,}")
        print(f"Users with ticker information: {users_with_tickers:,}")
        duplicate_count = users_with_tickers - len(users_with_meaningful_data)
        if duplicate_count:
            print(f"Duplicate user_id entries collapsed: {duplicate_count:,}")

        if users_with_meaningful_data:
            total_rows = len(users_with_meaningful_data)
            print(f"\nPreparing to update database with {total_rows:,} users")

            print("\nSample of ticker data to be updated (first 5 records):")
            for i, user in enumerate(users_with_meaningful_data[:5]):
                tickers_sample = user['tickers'][:100] + "..." if len(user['tickers']) > 100 else user['tickers']
                tickers_block_sample = user['tickers_block'][:100] + "..." if len(user['tickers_block']) > 100 else user['tickers_block']
                print(f"{i+1}. user_id: {user['user_id']}")
                print(f"   tickers: {tickers_sample}")
                print(f"   tickers_block: {tickers_block_sample}")
                print("-" * 80)

            proceed = input("Proceed with database update? (yes/no): ")

            if proceed.strip().lower() == "yes":
                temp_table_name = f"tmp_ticker_updates_{int(time.time())}"
                print(f"Creating temporary table '{temp_table_name}'...")

                # Staging and update run in a single transaction. ON COMMIT DROP
                # removes the staging table at the final commit. If any statement
                # fails, the rollback in the handler below discards both the table
                # and the partial work. The `with` block always closes the cursor.
                with conn.cursor() as update_cursor:
                    update_cursor.execute(f"""
                        CREATE TEMPORARY TABLE {temp_table_name} (
                            user_id TEXT PRIMARY KEY,
                            tickers JSONB,
                            tickers_block JSONB
                        ) ON COMMIT DROP
                    """)

                    batch_size = 5000
                    total_inserted = 0
                    insert_query = f"""
                        INSERT INTO {temp_table_name} (user_id, tickers, tickers_block)
                        VALUES %s
                    """

                    for i in range(0, total_rows, batch_size):
                        batch = users_with_meaningful_data[i:i+batch_size]
                        insert_data = [
                            (user['user_id'], user['tickers'], user['tickers_block'])
                            for user in batch
                        ]
                        execute_values(update_cursor, insert_query, insert_data, template=None, page_size=1000)

                        total_inserted += len(batch)
                        if total_inserted % 20000 == 0 or total_inserted == total_rows:
                            print(f"Inserted {total_inserted:,}/{total_rows:,} records ({(total_inserted/total_rows)*100:.1f}%)")

                    print("Executing ticker update...")
                    update_start = time.time()

                    update_cursor.execute(f"""
                        UPDATE users u
                        SET 
                            tickers = t.tickers,
                            tickers_block = t.tickers_block,
                            updated_at = CURRENT_TIMESTAMP
                        FROM {temp_table_name} t
                        WHERE u.user_id = t.user_id
                    """)
                    updated_rows = update_cursor.rowcount
                    conn.commit()

                update_time = time.time() - update_start
                print(f"Ticker update completed for {updated_rows:,} users in {update_time:.2f} seconds")
                print(f"Temporary table '{temp_table_name}' dropped on commit")
            else:
                print("Update cancelled by user")
        else:
            print("No users with meaningful ticker data found for update")

except Exception as e:
    print(f"Error processing JSON file: {str(e)}")
    if 'conn' in globals() and conn is not None:
        conn.rollback()
    import traceback
    traceback.print_exc()

File exists, size: 180.99 MB
Found 'balancesArray' with 512,520 items
Processed 50,000 users, found 8,484 with meaningful data
Processed 100,000 users, found 24,181 with meaningful data
Processed 150,000 users, found 40,054 with meaningful data
Processed 200,000 users, found 51,771 with meaningful data
Processed 250,000 users, found 57,995 with meaningful data
Processed 300,000 users, found 66,506 with meaningful data
Processed 350,000 users, found 82,484 with meaningful data
Processed 400,000 users, found 100,030 with meaningful data
Processed 450,000 users, found 117,526 with meaningful data
Processed 500,000 users, found 137,030 with meaningful data
JSON processing completed in 3.41 seconds
Total users processed: 512,520
Users with meaningful data: 144,982
Users with ticker information: 144,935

Preparing to update database with 144,935 users

Sample of ticker data to be updated (first 5 records):
1. user_id: 093ca98f-1f5b-4325-ac09-475a24176f60
   tickers: {"FVAL": "3.11084"}
   ti

In [None]:
import json
import os
import time
from typing import Dict, Any, List
import psycopg2
from psycopg2.extras import Json, execute_values

def is_meaningful_value(value: Any) -> bool:
    """Decide whether a balance-report field carries a value worth keeping.

    NOTE(review): this redefines the helper of the same name from the previous
    cell. Keep the two in sync, or move the helper to a shared module.

    Rules, in order:
      * None -> False.
      * int/float (bool included, being an int subclass) -> True only when > 0.
      * str -> parsed as a float and True only when > 0; a string that is not
        a number counts as meaningful whenever it is non-empty.
      * dict -> True when any entry is meaningful. An entry that is itself a
        dict with a 'shares' key is judged by its 'shares' value alone.
      * anything else (lists, ...) -> plain truthiness.
    """
    if value is None:
        return False

    if isinstance(value, (int, float)):
        return value > 0

    if isinstance(value, str):
        try:
            number = float(value)
        except ValueError:
            return bool(value)
        return number > 0

    if isinstance(value, dict):
        def _entry_is_meaningful(entry: Any) -> bool:
            # Holding-style entries ({'shares': ...}) count only through 'shares'.
            if isinstance(entry, dict) and 'shares' in entry:
                return is_meaningful_value(entry.get('shares'))
            return is_meaningful_value(entry)

        return any(_entry_is_meaningful(entry) for entry in value.values())

    return bool(value)

def has_meaningful_data(user: Dict[str, Any]) -> bool:
    """Return True when a balance-report record holds anything worth syncing.

    NOTE(review): this redefines the helper of the same name from the previous
    cell. Keep the two in sync, or move the helper to a shared module.

    Checked in order: the balance fields 'equity', 'equity_stocks',
    'cryptoEquity', 'equity_crypto' and 'cash_on_hold' (via is_meaningful_value);
    then 'actual', which counts whenever it is a non-zero number or numeric
    string (negatives included); then 'portfolio', 'AUM' and 'tickers'; and
    finally the blocked tickers, read from 'blocked_tickers' or, when that is
    falsy, 'tickers_block'.
    """
    balance_fields = ("equity", "equity_stocks", "cryptoEquity", "equity_crypto", "cash_on_hold")
    if any(is_meaningful_value(user.get(field)) for field in balance_fields):
        return True

    actual = user.get("actual")
    if isinstance(actual, (int, float)) and actual != 0:
        return True
    if isinstance(actual, str):
        try:
            if float(actual) != 0:
                return True
        except ValueError:
            pass

    holding_fields = ("portfolio", "AUM", "tickers")
    if any(is_meaningful_value(user.get(field)) for field in holding_fields):
        return True

    return is_meaningful_value(user.get("blocked_tickers") or user.get("tickers_block"))

def try_convert_numeric(value):
    """Coerce a balance-report value to a float suitable for a NUMERIC column.

    Args:
        value: Raw value from the report (number, numeric string, None, ...).

    Returns:
        The value as a finite float, or None when it is missing, cannot be
        parsed as a number, or parses to NaN/infinity. The staging update in
        this cell keeps the stored column value whenever the staged value is NULL.
    """
    if value is None:
        return None
    try:
        number = float(value)
    except (ValueError, TypeError, OverflowError):
        # Passing the raw value through (e.g. "N/A" or a nested dict) would make
        # the NUMERIC insert fail for the entire batch.
        return None
    # NaN/inf are not real balances. Treat them as missing rather than overwrite
    # a stored value with them.
    return number if math.isfinite(number) else None

# Read the balance report, build per-user balance and ticker rows for users with
# meaningful data, and write them to `users` through a temporary staging table.
try:
    json_path = os.environ.get("BALANCE_REPORT_PATH")
    print(f"Reading balance report from: {json_path}")
    start_time = time.time()

    # os.path.exists(None) raises TypeError, so report a missing variable explicitly.
    if not json_path:
        print("Error: BALANCE_REPORT_PATH environment variable is not set")
    elif not os.path.exists(json_path):
        print(f"Error: File {json_path} does not exist")
    else:
        file_size_mb = os.path.getsize(json_path) / (1024*1024)
        print(f"File exists, size: {file_size_mb:.2f} MB")

        print("Processing JSON file...")

        user_count = 0
        meaningful_data_count = 0

        # Keyed by user_id: the staging table's user_id is a PRIMARY KEY, so a user
        # listed twice in the report would abort the whole insert. The last entry wins.
        user_rows_by_id = {}

        # JSON is UTF-8 by specification; don't rely on the platform default encoding.
        with open(json_path, 'r', encoding='utf-8') as file:
            balance_data = json.load(file)

        if isinstance(balance_data, dict) and 'balancesArray' in balance_data:
            balances_array = balance_data['balancesArray']
            print(f"Found 'balancesArray' with {len(balances_array):,} items")

            for user_balance in balances_array:
                user_count += 1

                user_id = user_balance.get('user_id')

                if user_id and has_meaningful_data(user_balance):
                    meaningful_data_count += 1

                    # has_meaningful_data() also accepts the alternate keys
                    # 'equity_crypto', 'equity_stocks', 'AUM' and 'tickers_block'.
                    # Fall back to them when the primary key is absent, so a record
                    # judged meaningful does not get written as all-NULL.
                    blocked_tickers = user_balance.get(
                        'blocked_tickers', user_balance.get('tickers_block', {})
                    )
                    user_rows_by_id[user_id] = {
                        'user_id': user_id,
                        'equity_crypto': try_convert_numeric(
                            user_balance.get('cryptoEquity', user_balance.get('equity_crypto'))),
                        'equity_stocks': try_convert_numeric(
                            user_balance.get('equity', user_balance.get('equity_stocks'))),
                        'cash_on_hold': try_convert_numeric(user_balance.get('cash_on_hold')),
                        'actual': try_convert_numeric(user_balance.get('actual')),
                        'aum': try_convert_numeric(
                            user_balance.get('portfolio', user_balance.get('AUM'))),
                        # NOTE(review): unlike the numeric columns, an absent tickers key
                        # is written as {} and overwrites the stored value. Confirm intended.
                        'tickers': json.dumps(user_balance.get('tickers', {})),
                        'tickers_block': json.dumps(blocked_tickers)
                    }

                if user_count % 50000 == 0:
                    print(f"Processed {user_count:,} users, found {meaningful_data_count:,} with meaningful data")
        else:
            print("Could not find 'balancesArray' in the JSON structure")
            if isinstance(balance_data, dict):
                print(f"JSON structure contains keys: {list(balance_data.keys())}")
            else:
                print(f"Top-level JSON value is a {type(balance_data).__name__}, not an object")

        users_with_meaningful_data = list(user_rows_by_id.values())

        processing_time = time.time() - start_time
        print(f"JSON processing completed in {processing_time:.2f} seconds")
        print(f"Total users processed: {user_count:,}")
        print(f"Users with meaningful data: {meaningful_data_count:,}")
        duplicate_count = meaningful_data_count - len(users_with_meaningful_data)
        if duplicate_count:
            print(f"Duplicate user_id entries collapsed: {duplicate_count:,}")

        if users_with_meaningful_data:
            total_rows = len(users_with_meaningful_data)
            print(f"\nPreparing to update database with {total_rows:,} users")

            print("\nSample of user data to be updated (first 3 records):")
            for i, user in enumerate(users_with_meaningful_data[:3]):
                print(f"{i+1}. user_id: {user['user_id']}")
                for field, value in user.items():
                    if field in ['tickers', 'tickers_block']:
                        sample = value[:100] + "..." if len(value) > 100 else value
                        print(f"   {field}: {sample}")
                    else:
                        print(f"   {field}: {value}")
                print("-" * 80)

            proceed = input("Proceed with database update? (yes/no): ")

            if proceed.strip().lower() == "yes":
                temp_table_name = f"tmp_user_updates_{int(time.time())}"
                print(f"Creating temporary table '{temp_table_name}'...")

                # Staging and update run in a single transaction. ON COMMIT DROP
                # removes the staging table at the final commit. If any statement
                # fails, the rollback in the handler below discards both the table
                # and the partial work. The `with` block always closes the cursor.
                with conn.cursor() as update_cursor:
                    update_cursor.execute(f"""
                        CREATE TEMPORARY TABLE {temp_table_name} (
                            user_id TEXT PRIMARY KEY,
                            equity_crypto NUMERIC,
                            equity_stocks NUMERIC,
                            cash_on_hold NUMERIC,
                            actual NUMERIC,
                            aum NUMERIC,
                            tickers JSONB,
                            tickers_block JSONB
                        ) ON COMMIT DROP
                    """)

                    batch_size = 5000
                    total_inserted = 0
                    insert_query = f"""
                        INSERT INTO {temp_table_name} (
                            user_id, equity_crypto, equity_stocks, cash_on_hold, 
                            actual, aum, tickers, tickers_block
                        )
                        VALUES %s
                    """

                    for i in range(0, total_rows, batch_size):
                        batch = users_with_meaningful_data[i:i+batch_size]
                        insert_data = [
                            (
                                user['user_id'],
                                user['equity_crypto'],
                                user['equity_stocks'],
                                user['cash_on_hold'],
                                user['actual'],
                                user['aum'],
                                user['tickers'],
                                user['tickers_block'],
                            )
                            for user in batch
                        ]
                        execute_values(update_cursor, insert_query, insert_data, template=None, page_size=1000)

                        total_inserted += len(batch)
                        if total_inserted % 20000 == 0 or total_inserted == total_rows:
                            print(f"Inserted {total_inserted:,}/{total_rows:,} records ({(total_inserted/total_rows)*100:.1f}%)")

                    print("Executing user data update...")
                    update_start = time.time()

                    # A NULL staged numeric value keeps the stored column value.
                    update_cursor.execute(f"""
                        UPDATE users u
                        SET 
                            equity_crypto = COALESCE(t.equity_crypto, u.equity_crypto),
                            equity_stocks = COALESCE(t.equity_stocks, u.equity_stocks),
                            cash_on_hold = COALESCE(t.cash_on_hold, u.cash_on_hold),
                            actual = COALESCE(t.actual, u.actual),
                            aum = COALESCE(t.aum, u.aum),
                            tickers = t.tickers,
                            tickers_block = t.tickers_block,
                            updated_at = CURRENT_TIMESTAMP
                        FROM {temp_table_name} t
                        WHERE u.user_id = t.user_id
                    """)
                    updated_rows = update_cursor.rowcount
                    conn.commit()

                update_time = time.time() - update_start
                print(f"User data update completed for {updated_rows:,} users in {update_time:.2f} seconds")
                print(f"Temporary table '{temp_table_name}' dropped on commit")
            else:
                print("Update cancelled by user")
        else:
            print("No users with meaningful data found for update")

except Exception as e:
    print(f"Error processing JSON file: {str(e)}")
    if 'conn' in globals() and conn is not None:
        conn.rollback()
    import traceback
    traceback.print_exc()

In [None]:
# Release database resources at the end of the session. Safe to re-run: objects
# that are missing or already closed are skipped.
try:
    print("Closing database connection...")

    # The shared RealDictCursor from the connection cell; psycopg2 cursors and
    # connections expose a `closed` flag.
    if 'cursor' in globals() and cursor is not None and not cursor.closed:
        cursor.close()

    if 'conn' in globals() and conn is not None and not conn.closed:
        conn.close()
        print("Database connection closed")

except Exception as e:
    print(f"Error during cleanup: {str(e)}")

finally:
    # Undo the pandas warning filter installed earlier. Note that this resets *all* filters.
    import warnings
    warnings.resetwarnings()
    print("Session complete")