In [None]:
import requests

# JSON-RPC endpoint that get_token_accounts_by_owner() posts its requests to.
rpc_url = "https://api.mainnet-beta.solana.com"

# Example wallet (owner) addresses; each one is looked up in the loop below.
wallet_addresses = [
    'C4su4UzsPgbDApTEWFThKwbTyHE9ZoRnV7TXwrtL55SB',
    'AnsWpQ5KzJpns4fTXQWzWg8YakJMrDS2T6M3TpUX4t5U',
    '93AX7iJUTbzmzB4vHXCRHjfLyrskAfNd1ReRQkfRUnXL',
    # Add more wallet addresses...
]

def get_token_accounts_by_owner(wallet_address):
    """Fetch the SPL token accounts owned by a wallet.

    Sends a ``getTokenAccountsByOwner`` JSON-RPC request to ``rpc_url`` and
    returns the decoded response body unchanged.

    Args:
        wallet_address: Base-58 address of the owning wallet.

    Returns:
        dict: The full JSON-RPC response (callers read ``result.value``).

    Raises:
        requests.exceptions.RequestException: On connection failure or timeout.
    """
    headers = {'Content-Type': 'application/json'}
    payload = {
        "jsonrpc": "2.0",
        "id": 1,
        "method": "getTokenAccountsByOwner",
        "params": [
            wallet_address,
            # 2nd positional parameter: the account filter (by program here).
            {"programId": "TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA"},
            # 3rd positional parameter: the request configuration.  The
            # encoding must live here (not inside the filter) for the node to
            # return the ``data.parsed`` structure the caller indexes into.
            {"encoding": "jsonParsed"},
        ]
    }

    # A timeout keeps a stalled endpoint from hanging the notebook forever.
    response = requests.post(rpc_url, json=payload, headers=headers, timeout=10)
    return response.json()

# Build a mapping of wallet address -> list of mint addresses it holds.
wallet_to_mints = {}

for wallet in wallet_addresses:
    accounts = get_token_accounts_by_owner(wallet)
    # Pull the mint out of every returned token account.  A response without
    # "result"/"value" yields an empty list; the nested indexing assumes each
    # account carries jsonParsed data (data -> parsed -> info -> mint).
    mints = [account['account']['data']['parsed']['info']['mint'] for account in accounts.get('result', {}).get('value', [])]
    
    # Record the mints under the wallet that owns them.
    wallet_to_mints[wallet] = mints

# Show the complete wallet -> mints mapping.
print(wallet_to_mints)

In [None]:
import asyncio
import nest_asyncio
from solana.rpc.async_api import AsyncClient
from solana.rpc.api import Pubkey

nest_asyncio.apply()

async def get_wallet_transactions(wallet_address, limit=10):
    """Fetch the most recent transactions that reference a wallet.

    Looks up to ``limit`` signatures for the address and then requests each
    transaction one after another with ``jsonParsed`` encoding.

    Args:
        wallet_address: Base-58 wallet address.
        limit: Maximum number of signatures to request (default 10).

    Returns:
        list: One ``client.get_transaction`` response per signature, in the
        order the signatures were returned; ``[]`` when the signature
        response has no list-valued ``value`` attribute.
    """
    owner = Pubkey.from_string(wallet_address)
    async with AsyncClient("https://api.mainnet-beta.solana.com") as client:
        sig_response = await client.get_signatures_for_address(owner, limit=limit)

        # Anything other than a list under ``.value`` means there is nothing to fetch.
        entries = getattr(sig_response, 'value', None)
        if not isinstance(entries, list):
            return []

        # Collect every signature first, then fetch sequentially.
        signatures = [entry.signature for entry in entries]
        return [
            await client.get_transaction(sig, "jsonParsed", max_supported_transaction_version=0)
            for sig in signatures
        ]

async def main():
    """Print signature, timestamp and instruction program IDs for one wallet.

    Fetches the transactions through ``get_wallet_transactions`` (default
    limit) and prints a short summary of each.

    NOTE(review): each ``tx`` is whatever ``client.get_transaction`` returned.
    The attribute access below assumes ``transaction`` and ``block_time`` sit
    directly on that object; if the client wraps the payload (e.g. under
    ``.value``) these lines raise AttributeError — TODO confirm against the
    installed solana-py version.
    """
    wallet_address = "BHcTB516BrHzqDyFjMnDfWhSdLLxKKbihf4RauCJtD1h"
    transactions = await get_wallet_transactions(wallet_address)
    for tx in transactions:
        # All RPC calls have already completed at this point, so this pause
        # only slows down printing; it does not throttle requests.
        await asyncio.sleep(0.5)
        print(f"Signature: {tx.transaction.signatures[0]}")
        print(f"Timestamp: {tx.block_time}")
        print("Instructions:")
        for instruction in tx.transaction.message.instructions:
            print(f"  Program: {instruction.program_id}")
        print("-" * 20)

# nest_asyncio.apply() (called earlier in this cell) is what allows
# asyncio.run() to be used while the notebook's own event loop is running.
asyncio.run(main())


In [None]:
import csv
import requests
import os
from pathlib import Path
from dotenv import load_dotenv
from pprint import pprint

# Load environment variables
load_dotenv()

# RPC Endpoint
RPC_ENDPOINT = os.getenv("RPC_ENDPOINT")

# Fetch token holders using getTokenLargestAccounts
def get_token_holders(token_address):
    """Return the largest holder accounts of a token mint.

    Calls ``getTokenLargestAccounts`` on ``RPC_ENDPOINT`` and pretty-prints
    the raw ``value`` list before converting it.

    Args:
        token_address: Base-58 mint address of the token.

    Returns:
        list[tuple[str, float]]: ``(address, amount)`` pairs taken from each
        returned entry, with ``amount`` converted to ``float``.  An empty list
        is returned on any network error, non-200 status, or when the
        response carries no result.
    """
    payload = {
        "jsonrpc": "2.0",
        "id": "getTokenLargestAccounts",
        "method": "getTokenLargestAccounts",
        "params": [token_address],
    }
    try:
        # Timeout so one unresponsive request cannot stall the whole token loop.
        response = requests.post(RPC_ENDPOINT, json=payload, timeout=10)
        if response.status_code != 200:
            return []
        body = response.json()
    except requests.exceptions.RequestException as exc:
        # Covers connection errors, timeouts and an unset/invalid RPC_ENDPOINT.
        print(f"Error fetching holders for {token_address}: {exc}")
        return []

    # JSON-RPC errors come back without "result" (or with null); treat as empty.
    data = (body.get("result") or {}).get("value") or []
    pprint(data)
    return [(holder["address"], float(holder["amount"])) for holder in data]
    
# Mint addresses to inspect.  NOTE: a later cell rebinds the name `tokens`
# to a different list, so re-run this cell before re-running the loop below.
tokens = ["BbzVAbu8rrmEGQRSFhS2tF42Agp1FsBtcEMoYLH3r84J",
"BBBmxremG7CnQCg1BpArvEY5QPH1J7fydn68fNKMcZne",
"7zaAmppPP7tstHMBqvrsy5J11ZGS2BkGbHv2xey5iNBo",
"CtQ7hnCGqLmFeW87tom2fHGTsCraTjzogjzALQLykXqL",
"DLktwpXUh9ydTGxcZRvkESJFggLiezu2VFP72dapump",
"6cfZ9tJhfsR7bZfVWAPtu2UrPqfWomud67Q2RC5Ypump",
"FnJoStz95yP5B9kSg2BiCE8FdsiL4S2AnZLrtMJYBdHg",
"9NcdSJeMvgMYpFgKJGrJyBqMTkWDApzZv7kwFiFpnWMy",
"4Sk8TVAoWaCPET62MRGsuKi8e2VwyAgtbiHS1MxP7vcn",
"FBmV4Zp3YiXSaybUKV6Dnh5W489cmQCRbnBiUnEwmoon"
]
# Print the (address, amount) pairs returned for every token in turn.
for token in tokens:
    holders = get_token_holders(token)
    print(f"Token: {token}")
    print("Holders:")
    for holder, amount in holders:
        print(f"  {holder}: {amount}")
    print("\n")


In [None]:
import requests

RPC_ENDPOINT = "https://api.mainnet-beta.solana.com"

def get_token_accounts_by_owner(wallet_address):
    """Return the mint addresses of the SPL token accounts a wallet owns.

    Posts ``getTokenAccountsByOwner`` (``jsonParsed`` encoding) to
    ``RPC_ENDPOINT``.  This definition replaces the same-named function from
    an earlier cell, which returned the raw JSON response instead.

    Args:
        wallet_address: Base-58 address of the owning wallet.

    Returns:
        list[str]: One mint address per returned token account, or ``[]``
        when the HTTP request fails (the error is printed).
    """
    program_filter = {"programId": "TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA"}
    request_body = {
        "jsonrpc": "2.0",
        "id": 1,
        "method": "getTokenAccountsByOwner",
        "params": [wallet_address, program_filter, {"encoding": "jsonParsed"}],
    }

    try:
        reply = requests.post(RPC_ENDPOINT, json=request_body, timeout=10)
        reply.raise_for_status()
        token_accounts = reply.json().get("result", {}).get("value", [])
    except requests.exceptions.RequestException as e:
        print(f"Error fetching token accounts: {e}")
        return []

    # Each entry nests the mint under account -> data -> parsed -> info.
    return [
        entry["account"]["data"]["parsed"]["info"]["mint"]
        for entry in token_accounts
    ]

# Example usage: list the mints held by one wallet.
# NOTE: this rebinds `tokens` (a hand-written list of mints in an earlier
# cell) to the list of mint addresses returned for this wallet.
wallet_address = "EA2giVUJRAmxL4K9NHydnDsXTNqtFxXMfW5mzSWVANrh"
tokens = get_token_accounts_by_owner(wallet_address)
print(tokens)


In [4]:
import os
import requests
import time
from typing import Union, List, Optional
from dotenv import load_dotenv

# Load environment variables
load_dotenv()

class RunRPCMethod:
    """Small JSON-RPC client for the Helius Solana endpoint with retries.

    The API key is read from the ``HELIUS_API_KEY`` environment variable when
    the client is constructed.  Every call is a single HTTP POST that is
    retried with exponential backoff on transport or HTTP-status errors.
    """

    BASE_URL = "https://mainnet.helius-rpc.com"

    def __init__(self, max_retries: int = 3, initial_delay: int = 1):
        """Create a client.

        Args:
            max_retries: Total number of attempts per request (at least one
                attempt is always made).
            initial_delay: Seconds to wait before the first retry; doubled
                after every failed attempt.

        Raises:
            ValueError: If ``HELIUS_API_KEY`` is not set.
        """
        self.api_key = os.getenv("HELIUS_API_KEY")
        if not self.api_key:
            raise ValueError("HELIUS_API_KEY not found in .env file")

        self.max_retries = max_retries
        self.initial_delay = initial_delay

    @property
    def rpc_url(self) -> str:
        """Full endpoint URL including the API key query parameter."""
        return f"{self.BASE_URL}/?api-key={self.api_key}"

    def _make_api_request(self, payload: dict):
        """POST ``payload`` and return the decoded JSON body.

        Retries with exponential backoff and re-raises the last
        ``requests`` exception once all attempts are used up.
        """
        delay = self.initial_delay
        # Guarantee one attempt so a non-positive max_retries cannot make the
        # method fall through and silently return None.
        attempts = max(1, self.max_retries)
        for attempt in range(attempts):
            try:
                response = requests.post(self.rpc_url, json=payload, headers={"Content-Type": "application/json"}, timeout=10)
                response.raise_for_status()
                return response.json()
            except requests.exceptions.RequestException:
                if attempt == attempts - 1:
                    print(f"Max retries reached. Failed request: {payload}")
                    raise  # bare raise keeps the original traceback
                print(f"Attempt {attempt + 1} failed. Retrying in {delay} seconds...")
                time.sleep(delay)
                delay *= 2  # Exponential backoff

    def call(self, method: str, request_id: Union[int, str], params: Optional[List] = None):
        """Call any JSON-RPC method.

        Args:
            method: RPC method name.
            request_id: Value echoed back by the server in ``id``.
            params: Positional parameter list (defaults to an empty list).

        Returns:
            The decoded JSON-RPC response.
        """
        payload = {
            "jsonrpc": "2.0",
            "id": request_id,
            "method": method,
            "params": params or []
        }
        return self._make_api_request(payload)

    # Predefined methods for common RPC calls
    def get_account_info(self, request_id: Union[int, str], account: str, encoding: str = "jsonParsed"):
        """Fetch account info for ``account``.

        The RPC expects the address as the first positional parameter and the
        configuration object (carrying ``encoding``) as the second.
        """
        return self.call("getAccountInfo", request_id, [account, {"encoding": encoding}])

    def get_block_height(self, request_id: Union[int, str]):
        """Fetch the current block height."""
        return self.call("getBlockHeight", request_id)

    def get_latest_blockhash(self, request_id: Union[int, str]):
        """Fetch the latest blockhash via ``getLatestBlockhash``."""
        return self.call("getLatestBlockhash", request_id)

    def get_recent_blockhash(self, request_id: Union[int, str]):
        """Fetch the most recent blockhash.

        Kept for backward compatibility.  ``getRecentBlockhash`` is deprecated
        in the Solana RPC API in favour of ``getLatestBlockhash``, so this now
        delegates to :meth:`get_latest_blockhash`.
        """
        return self.get_latest_blockhash(request_id)

    def get_transaction(
        self,
        request_id: Union[int, str],
        signature: str,
        encoding: str = "json",
        max_supported_transaction_version: Optional[int] = 0,
    ):
        """Fetch transaction details by signature.

        Args:
            request_id: Value echoed back by the server in ``id``.
            signature: Base-58 transaction signature.
            encoding: Response encoding.
            max_supported_transaction_version: Highest transaction version to
                accept.  Without it the RPC rejects versioned transactions;
                pass ``None`` to omit the option.
        """
        config = {"encoding": encoding}
        if max_supported_transaction_version is not None:
            config["maxSupportedTransactionVersion"] = max_supported_transaction_version
        return self.call("getTransaction", request_id, [signature, config])


In [None]:
# from rpc_client import RunRPCMethod

# Initialize the RPC client (reads HELIUS_API_KEY from the environment).
rpc = RunRPCMethod()

# Make calls using predefined methods.  Any failure (including exhausted
# retries) is reported once by the handler at the bottom.
try:
    account_info = rpc.get_account_info(request_id="account_info", account="2aoZTcWvDZKeyufWkYbJ9mYaoSD1P9HPKzEcZ79GZHbx")
    # Previously fetched but never displayed, unlike the other three results.
    print("Account Info:", account_info)

    block_height = rpc.get_block_height(request_id="block_1")
    print("Block Height:", block_height)

    recent_blockhash = rpc.get_recent_blockhash(request_id="blockhash_abc")
    print("Recent Blockhash:", recent_blockhash)

    transaction_info = rpc.get_transaction(request_id="txn_456", signature="QBcTdN1VuSVffWzPXpnzN6t8UZXZe3Gm16pAaTuhSQkXfeDvWzVhhzZWevuhA7pNb4Cgmm6kLjBUAwiX7YAQcAC")
    print("Transaction Info:", transaction_info)

except Exception as e:
    print("Error:", e)


In [None]:
import websockets
import asyncio
import nest_asyncio
import json

nest_asyncio.apply()

import os
from dotenv import load_dotenv

# Read the Helius API key from the environment (.env) instead of committing
# it to the notebook.  Any key that was previously hard-coded here should be
# treated as leaked and rotated.
load_dotenv()
_helius_api_key = os.getenv("HELIUS_API_KEY")
if not _helius_api_key:
    raise ValueError("HELIUS_API_KEY not found in .env file")

HELIUS_WS_URL = f"wss://mainnet.helius-rpc.com/?api-key={_helius_api_key}"

async def listen_helius():
    """Subscribe to program logs over the Helius websocket and print them.

    Opens one connection to ``HELIUS_WS_URL``, sends two ``logsSubscribe``
    requests (one per program address below) and prints every message
    received until the connection is closed.
    """
    async with websockets.connect(HELIUS_WS_URL) as ws:
        mint_subscription = {
            "jsonrpc": "2.0",
            "id": 1,
            "method": "logsSubscribe",
            "params": [
                {
                    "mentions": ["TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA"]
                }
            ]
        }

        raydium_subscription = {
            "jsonrpc": "2.0",
            "id": 2,
            "method": "logsSubscribe",
            "params": [
                {
                    "mentions": ["675kPX9MHTjS2zt1qfr1NYHuzeLXfQM9H24wFSUt1Mp8"]
                }
            ]
        }

        await ws.send(json.dumps(mint_subscription))
        await ws.send(json.dumps(raydium_subscription))
        # NOTE(review): the message mentions anti-rug/frontrun checks, but this
        # function only prints raw messages; no such checks are performed here.
        print("Listening for token mints & Raydium pool creations with enhanced anti-Rug and frontrun checks...")

        while True:
            try:
                message = await ws.recv()
            except websockets.ConnectionClosed as e:
                # Once the socket is closed every further recv() raises again,
                # so leave the loop instead of spinning on the same error.
                print(f"WebSocket connection closed: {e}")
                break
            except Exception as e:
                # Best effort: report any other receive error and keep listening.
                print(e)
                continue
            print(message)

if __name__ == "__main__":
    asyncio.run(listen_helius())


In [12]:
import requests

DEX_SCREENER_API = "https://api.dexscreener.com/latest/dex/tokens/"

def analyze_token_metrics(token_address):
    """Fetch DexScreener data for a token and summarise its first pair.

    Only the first entry of the returned ``pairs`` list is inspected, and the
    token is skipped unless that pair's ``dexId`` is Raydium.

    Args:
        token_address: Token address appended to ``DEX_SCREENER_API``.

    Returns:
        dict | None: A summary with ``symbol``, ``token_address``,
        ``liquidity``, ``volume_24h``, ``holders``, ``dex`` and
        ``social_links`` (each falling back to a default when absent from the
        response), or ``None`` when the request fails, no pairs are returned,
        or the first pair is not on Raydium.
    """
    url = f"{DEX_SCREENER_API}{token_address}"
    try:
        # Timeout so a stalled API call cannot hang the notebook indefinitely.
        response = requests.get(url, timeout=10)

        if response.status_code != 200:
            print(
                f"⚠️ Failed to fetch token data for {token_address} - Status code: {response.status_code}"
            )
            return None

        data = response.json().get("pairs", [])
        if not data:
            print(f"⚠️ No valid data returned for {token_address}")
            return None

        # Only the first pair is analysed; adjust here to consider more pairs.
        pair_data = data[0]

        # `or` also covers an explicit null dexId, which would break .lower().
        dex = pair_data.get("dexId") or "Unknown"
        if dex.lower() != "raydium":
            print(
                f"⚠️ Token {token_address} is not a Raydium token, skipping analysis."
            )
            return None

        return {
            "symbol": pair_data.get("baseToken", {}).get("symbol", "N/A"),
            "token_address": token_address,
            "liquidity": pair_data.get("liquidity", 0),
            "volume_24h": pair_data.get("volume", {}).get("h24", 0),
            # NOTE(review): falls back to 0 whenever the pair has no
            # "holders" field — confirm the API actually provides it.
            "holders": pair_data.get("holders", 0),
            "dex": dex,
            "social_links": pair_data.get("info", {}).get("socials", []),
        }

    except requests.RequestException as e:
        print(f"❌ Error fetching token data for {token_address}: {e}")
        return None

In [None]:
# Analyse a single token; prints the summary dict, or None when the token was
# skipped or the request failed.
results = analyze_token_metrics("DZaBTefhyzqdaNX6FyGAyUwpAAEeZeXgqw5FsHTkDciA")
print(results)


In [14]:
import requests

chainId = "solana"
tokenAddress = "6dPgBkXgWHmrH41zRVYtGrqAT4wAAertU3RjbiZJB4X8"

# The URL must be an f-string: without the prefix the literal text
# "{chainId}/{tokenAddress}" was sent instead of the values above.
response = requests.get(
    f"https://api.dexscreener.com/token-pairs/v1/{chainId}/{tokenAddress}",
    timeout=10,  # avoid hanging forever on a stalled connection
)

data = response.json()

In [18]:
import time
import requests

import os
from dotenv import load_dotenv

TOKEN_PROGRAM_ID = "TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA"

# Read the Helius API key from the environment (.env) instead of committing
# it to the notebook.  Any key that was previously hard-coded here should be
# treated as leaked and rotated.
load_dotenv()
_helius_api_key = os.getenv("HELIUS_API_KEY")
if not _helius_api_key:
    raise ValueError("HELIUS_API_KEY not found in .env file")

RPC_ENDPOINT = f"https://mainnet.helius-rpc.com/?api-key={_helius_api_key}"


def get_new_token_mints():
    """Collect mints seen in recent successful Token Program transactions.

    Requests the latest 20 signatures for ``TOKEN_PROGRAM_ID``, keeps those
    from the last hour, loads each transaction and records every mint listed
    in its ``postTokenBalances`` unless the mint is on the deny-list below.

    NOTE(review): a mint appearing in ``postTokenBalances`` only shows the
    token was involved in the transaction, not that it was newly created —
    confirm this heuristic is what "new token" should mean.

    Returns:
        list[dict]: ``{"mint_address": str, "created_at": number}`` entries,
        one per distinct mint in discovery order.  Empty when the signature
        request fails.
    """
    # Signatures already handled during this call.
    seen_transactions = set()

    url = RPC_ENDPOINT
    one_hour_ago = int(time.time()) - 3600  # cut-off as a Unix timestamp

    # Addresses that must never be reported.  They are compared against each
    # balance's ``mint`` field, so this acts as a mint deny-list.
    excluded_program_ids = {
        "So11111111111111111111111111111111111111112",  # presumably wrapped SOL — verify
        "JUPyiwrYJFskUPiHa7hkeR8VUtAeFoSYbKedZNsDvCN",  # presumably the JUP token — verify
        "EPjFWdd5AufqSSqeM2qN1xzybapC8G4wEGGkZwyTDt1v",  # presumably USDC — verify
        "11111111111111111111111111111111",  # presumably the System Program — verify
        # Add other addresses you want to exclude
    }

    try:
        response = requests.post(
            url,
            json={
                "jsonrpc": "2.0",
                "id": 1,
                "method": "getSignaturesForAddress",
                "params": [
                    TOKEN_PROGRAM_ID,  # Track transactions for Token Program
                    {"limit": 20},  # Fetch latest 20 transactions
                ],
            },
            timeout=10,
        )
    except requests.exceptions.RequestException as e:
        print(f"❌ Error fetching transactions: {e}")
        return []

    if response.status_code != 200:
        print(f"❌ Error fetching transactions: {response.status_code}")
        return []

    # "result" is absent or null when the RPC answers with an error object.
    transactions = response.json().get("result") or []
    new_tokens = []
    known_mints = set()  # O(1) uniqueness check for new_tokens

    for tx in transactions:
        sig = tx["signature"]
        block_time = tx.get("blockTime")  # may be null

        # Stop at the first transaction OLDER than the cut-off; this relies on
        # the endpoint returning signatures newest-first.  The comparison was
        # previously inverted and stopped on the first *recent* one.
        if block_time is not None and block_time < one_hour_ago:
            break

        if sig in seen_transactions:  # Avoid duplicate processing
            continue
        seen_transactions.add(sig)

        # Fetch transaction details
        try:
            tx_response = requests.post(
                url,
                json={
                    "jsonrpc": "2.0",
                    "id": 1,
                    "method": "getTransaction",
                    "params": [
                        sig,
                        {"maxSupportedTransactionVersion": 0},
                    ],
                },
                timeout=10,
            )
        except requests.exceptions.RequestException as e:
            print(f"⚠️ Error fetching transaction {sig}: {e}")
            continue

        if tx_response.status_code != 200:
            print(f"⚠️ Error fetching transaction {sig}: {tx_response.status_code}")
            continue

        # Both "result" and "meta" can be null; normalise them to empty dicts.
        tx_data = tx_response.json().get("result") or {}
        meta = tx_data.get("meta") or {}

        # Only successful transactions are considered.
        if meta.get("err") is not None:
            continue

        for balance_change in meta.get("postTokenBalances") or []:
            mint_address = balance_change["mint"]

            if mint_address in excluded_program_ids or mint_address in known_mints:
                continue

            known_mints.add(mint_address)
            new_tokens.append(
                {
                    "mint_address": mint_address,
                    "created_at": block_time if block_time is not None else time.time(),
                }
            )

    return new_tokens

In [None]:
# Bare call: the returned list of mint dicts is shown as the cell output.
get_new_token_mints()

In [None]:
import asyncio
import websockets
import json
import requests
import redis
import time
import nest_asyncio

nest_asyncio.apply()

import os
from dotenv import load_dotenv

# Redis Setup
redis_client = redis.Redis(host="localhost", port=6379, db=0, decode_responses=True)

# API Configurations
# Read the Helius API key from the environment (.env) instead of committing
# it to the notebook.  Any key that was previously hard-coded here should be
# treated as leaked and rotated.
load_dotenv()
_helius_api_key = os.getenv("HELIUS_API_KEY")
if not _helius_api_key:
    raise ValueError("HELIUS_API_KEY not found in .env file")

HELIUS_RPC_URL = f"wss://mainnet.helius-rpc.com/?api-key={_helius_api_key}"
# NOTE(review): elsewhere in this notebook the same ".../dex/tokens/" base is
# followed by a token address; here the last path segment is "solana" —
# confirm this endpoint returns the pair listing the code below expects.
DEX_SCREENER_URL = "https://api.dexscreener.com/latest/dex/tokens/solana"

# Solana Token Program ID (Used for tracking token mints)
SPL_TOKEN_PROGRAM_ID = "TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA"


async def monitor_new_tokens():
    """Listen for Token Program logs and follow up on detected mints.

    Subscribes to ``logsSubscribe`` for ``SPL_TOKEN_PROGRAM_ID`` and, for
    every notification whose log lines contain ``"MintTo"``, extracts mint
    info, records unseen mints in the Redis set ``processed_tokens`` and
    starts a DEX Screener check for them.

    The DEX Screener check polls until the token is listed, so it is started
    as a background task; awaiting it inline would stop this coroutine from
    reading any further websocket messages in the meantime.
    """
    # Strong references so pending background tasks are not garbage-collected.
    pending_checks = set()

    async with websockets.connect(HELIUS_RPC_URL) as ws:
        request = {
            "jsonrpc": "2.0",
            "id": 1,
            "method": "logsSubscribe",
            "params": [
                {"mentions": [SPL_TOKEN_PROGRAM_ID]},
                {"commitment": "finalized"},
            ],
        }
        await ws.send(json.dumps(request))
        print("🔍 Listening for new token mints...")

        while True:
            response = await ws.recv()
            data = json.loads(response)

            # Skip anything that is not a notification carrying a result
            # (e.g. the reply to the subscribe request itself).
            if "params" not in data or "result" not in data["params"]:
                continue

            logs = data["params"]["result"]["value"]
            if not any("MintTo" in log for log in logs.get("logs") or []):
                continue

            print("🚀 New token mint detected!")

            mint_info = extract_mint_info(logs)
            if not mint_info or redis_client.sismember(
                "processed_tokens", mint_info["mint_address"]
            ):
                continue

            redis_client.sadd("processed_tokens", mint_info["mint_address"])
            print(f"✅ Token Added: {mint_info}")

            check = asyncio.create_task(check_dex_screener(mint_info))
            pending_checks.add(check)
            check.add_done_callback(pending_checks.discard)


def extract_mint_info(logs):
    """Build a mint descriptor from a log-notification mapping.

    Reads the required ``"account"`` key as the mint address and the optional
    ``"owner"`` key as the mint authority (``"Unknown"`` when absent).

    NOTE(review): the caller passes the ``value`` object of a log
    notification; whether that object actually carries an ``"account"`` key
    should be confirmed — without it this always returns ``None``.

    Args:
        logs: Mapping to read from.

    Returns:
        dict | None: ``{"mint_address": ..., "mint_authority": ...}``, or
        ``None`` (after printing the error) when the lookup fails.
    """
    try:
        info = {"mint_address": logs["account"]}
        info["mint_authority"] = logs.get("owner", "Unknown")
    except Exception as e:
        print(f"❌ Error extracting mint info: {e}")
        return None
    return info


def fetch_dex_screener_tokens():
    """Fetch the ``pairs`` list from ``DEX_SCREENER_URL``.

    Returns:
        list: The ``pairs`` entries of the response.  Always a list: an empty
        one is returned on a non-200 status, on any exception, and when the
        response has no pairs (missing key or explicit null).
    """
    try:
        # Timeout so a stalled request cannot block the caller indefinitely.
        response = requests.get(DEX_SCREENER_URL, timeout=10)
        if response.status_code == 200:
            data = response.json()
            # `or []` covers "pairs": null, which .get's default does not.
            return data.get("pairs") or []
        return []
    except Exception as e:
        print(f"❌ Error fetching DEX Screener data: {e}")
        return []


async def check_dex_screener(mint_info, poll_interval=30, min_liquidity_usd=10000):
    """Poll DEX Screener until the token is listed with enough liquidity.

    Repeatedly fetches the pair list and looks for a pair whose base token
    address equals ``mint_info["mint_address"]``.  When such a pair has at
    least ``min_liquidity_usd`` of USD liquidity the mint is added to the
    Redis set ``listed_tokens``, the listing is printed and the coroutine
    returns.  Otherwise it sleeps and tries again, without an upper bound.

    Args:
        mint_info: Mapping with a ``"mint_address"`` key.
        poll_interval: Seconds to wait between polls (default 30).
        min_liquidity_usd: Minimum USD liquidity required (default 10000).
    """
    loop = asyncio.get_running_loop()
    while True:
        # The fetch uses blocking HTTP; run it in the default executor so the
        # event loop (and any websocket listener on it) keeps running.
        tokens = await loop.run_in_executor(None, fetch_dex_screener_tokens)

        for pair in tokens or []:
            base_token = pair["baseToken"]
            if base_token["address"] != mint_info["mint_address"]:
                continue

            # Missing or null liquidity data counts as zero.
            liquidity = (pair.get("liquidity") or {}).get("usd") or 0

            if liquidity >= min_liquidity_usd:
                redis_client.sadd("listed_tokens", mint_info["mint_address"])
                print(
                    f"💰 Token Listed on DEX: {base_token['name']} ({base_token['symbol']})"
                )
                print(f"🔗 DEX Screener URL: {pair['url']}")
                return

        print(f"🔄 Checking DEX Screener again in {poll_interval}s...")
        await asyncio.sleep(poll_interval)


async def main():
    """Run the token monitor until it exits or raises.

    Note: this rebinds the notebook-global name ``main`` that an earlier cell
    also defines.
    """
    # gather() is given a single coroutine here; more can be added alongside.
    await asyncio.gather(monitor_new_tokens())


if __name__ == "__main__":
    asyncio.run(main())

In [None]:
import os
import requests
import csv
from datetime import datetime
import time

# Define the file path where the CSV will be saved
file_path = 'src/agents/api_data/new_token_addresses_1.csv'

# Ensure the directory exists.  exist_ok=True replaces the separate
# exists()-then-create check, which could fail if the directory appeared in
# between; the guard skips the call when file_path has no directory part.
directory = os.path.dirname(file_path)
if directory:
    os.makedirs(directory, exist_ok=True)

# Dexscreener API endpoint for latest token profiles
API_URL = "https://api.dexscreener.com/token-profiles/latest/v1"

# Fetch the latest token profiles
def get_latest_tokens():
    """Fetch the latest token profiles from ``API_URL``.

    Returns:
        The decoded JSON body on HTTP 200, otherwise ``None``.  Network
        errors and timeouts are printed and also yield ``None`` so that a
        transient failure does not terminate a long-running polling loop.
    """
    try:
        response = requests.get(API_URL, timeout=10)
    except requests.exceptions.RequestException as exc:
        print(f"Error: {exc}")
        return None

    if response.status_code == 200:
        return response.json()  # Returns the latest token data

    print(f"Error: {response.status_code}")
    return None

# Filter tokens to include only Solana tokens (chainId == "solana")
def filter_solana_tokens(tokens):
    """Return the entries of ``tokens`` whose ``chainId`` equals ``"solana"``.

    Order is preserved.  Every entry must have a ``chainId`` key; a missing
    key raises ``KeyError``.
    """
    return list(filter(lambda profile: profile['chainId'] == 'solana', tokens))

# Function to check if the token is already in the CSV file
def is_token_duplicate(token_address):
    """Report whether ``token_address`` is already recorded in the CSV.

    Compares the address against the first column of every row in the file
    at ``file_path``.

    Args:
        token_address: Address to look for.

    Returns:
        bool: ``True`` if some row starts with the address; ``False`` if not
        or if the file does not exist yet.
    """
    if not os.path.exists(file_path):
        return False

    with open(file_path, mode='r', newline='') as file:
        # `row and ...` skips blank lines (empty rows), which would otherwise
        # raise IndexError; any() stops reading at the first match.
        return any(row and row[0] == token_address for row in csv.reader(file))

# Generate CSV with token information
def generate_csv(tokens):
    """Append previously unseen tokens to the CSV at ``file_path``.

    A header row is written first when the file is empty.  Each new token
    gets one row: address, local time found, epoch time, and Solscan,
    DexScreener and Birdeye links.  A token is skipped when its address is
    already in the first column of the file or appeared earlier in the same
    batch.

    Args:
        tokens: Iterable of mappings with ``"tokenAddress"`` and ``"url"``.
    """
    # Load the known addresses once instead of re-reading the file for every
    # token.  Tracking them in memory also catches duplicates inside a single
    # batch, whose rows may still be buffered and not yet visible on disk.
    known_addresses = set()
    if os.path.exists(file_path):
        with open(file_path, mode="r", newline="") as existing:
            known_addresses = {row[0] for row in csv.reader(existing) if row}

    # Open CSV file for writing (append mode)
    with open(file_path, mode="a", newline="") as file:
        writer = csv.writer(file)

        # Write header only if the file is empty
        if os.path.getsize(file_path) == 0:
            writer.writerow(["Token Address", "Time Found", "Epoch Time", "Solscan Link", "DexScreener Link", "Birdeye Link"])

        for token in tokens:
            token_address = token["tokenAddress"]

            if token_address in known_addresses:
                continue
            known_addresses.add(token_address)

            time_found = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            epoch_time = int(time.time())

            # Form the URLs.  The address identifies a token, so the Solscan
            # link uses the /token/ path (it previously pointed at /tx/).
            solscan_link = f"https://solscan.io/token/{token_address}"
            dexscreener_link = token["url"]
            birdeye_link = f"https://birdeye.so/token/{token_address}?chain=solana"

            writer.writerow([token_address, time_found, epoch_time, solscan_link, dexscreener_link, birdeye_link])

    print(f"CSV file '{file_path}' updated successfully!")

# Continuous monitoring loop
def monitor_tokens(interval_seconds=10):
    """Poll for new token profiles forever and record Solana ones in the CSV.

    Each cycle fetches the latest profiles, keeps the Solana entries and
    appends unseen ones to the CSV, then sleeps.  The loop never returns, so
    nothing after the call to this function runs until it is interrupted.

    Args:
        interval_seconds: Pause between polls in seconds (default 10).
    """
    while True:
        print("Checking for new tokens...")
        latest_tokens = get_latest_tokens()
        if latest_tokens:
            # Filter for Solana tokens
            solana_tokens = filter_solana_tokens(latest_tokens)

            # Generate CSV with the token information
            generate_csv(solana_tokens)

        # Wait before checking again.
        time.sleep(interval_seconds)

# Start monitoring (blocks until the kernel is interrupted)
monitor_tokens()


In [2]:
!pwd

/teamspace/studios/this_studio/moondev


In [9]:
import os
import requests
import pandas as pd
import json
from datetime import datetime
from pathlib import Path
from dotenv import load_dotenv
import anthropic
import openai
from typing import Dict, List
import time
from termcolor import colored, cprint
import random
import src.config as config
from dotenv import load_dotenv

load_dotenv()

# Emoji sets used by print_spinner / print_fancy.  Only MOON_PHASES,
# ERROR_EMOJIS and SUCCESS_EMOJIS are referenced in the cells visible here.
SPINNER_EMOJIS = ["🌍", "🌎", "🌏"]  # Earth spinning
MOON_PHASES = ["🌑", "🌒", "🌓", "🌔", "🌕", "🌖", "🌗", "🌘"]  # Moon phases
ROCKET_SEQUENCE = ["🚀", "💫", "✨", "💫", "🌟"]  # Rocket launch
ERROR_EMOJIS = ["💥", "🚨", "⚠️", "❌", "🔥"]  # Error indicators
SUCCESS_EMOJIS = ["✨", "🎯", "🎨", "🎪", "🎭", "🎪"]  # Success indicators

# NOTE(review): COINGECKO_API_KEY and DELAY_BETWEEN_REQUESTS are defined but
# not used by any code visible in this notebook — confirm whether requests
# are meant to be authenticated / throttled.
COINGECKO_API_KEY = os.getenv("COINGECKO_API_KEY")
BASE_URL = "https://api.coingecko.com/api/v3"
# Absolute, machine-specific output location; get_new_coins() writes
# NEW_COINS_FILE inside it.
RESULTS_DIR = Path("/teamspace/studios/this_studio/moondev/src/data/coingecko_results")
NEW_COINS_FILE = RESULTS_DIR / "new_coins.csv"
DELAY_BETWEEN_REQUESTS = 1

def print_spinner(
    message: str, emoji_set: List[str], color: str = "white", bg_color: str = "on_blue"
):
    """Animate ``message`` on one line, cycling once through ``emoji_set``.

    Each frame overwrites the previous one via a carriage return and stays
    visible for 0.2 seconds; a newline is printed once the cycle is done.
    """
    for frame in emoji_set:
        styled = colored(message, color, bg_color)
        print("\r" + f"{frame} {styled}", end="", flush=True)
        time.sleep(0.2)
    # Terminate the animated line.
    print()


def print_fancy(
    message: str,
    color: str = "white",
    bg_color: str = "on_blue",
    emojis: List[str] = None,
):
    """Print a coloured message, optionally framed by a random emoji.

    When ``emojis`` is non-empty one entry is picked at random and placed on
    both sides of the message; otherwise the message is printed as is.
    """
    if not emojis:
        cprint(message, color, bg_color)
        return

    decoration = random.choice(emojis)
    cprint(f"{decoration} {message} {decoration}", color, bg_color)

def get_new_coins() -> pd.DataFrame:
    """Fetch the first page of CoinGecko market data and save it to CSV.

    Requests up to 250 coins from ``/coins/markets`` ordered by descending
    volume (so these are high-volume coins rather than strictly new
    listings), adds a ``timestamp`` and a ``coingecko_url`` column, and
    writes the frame to ``NEW_COINS_FILE``.

    Returns:
        pd.DataFrame: The fetched data, or an empty DataFrame when the
        request returns a non-200 status or any exception occurs (the error
        is printed).
    """
    try:
        print_spinner("Scanning for new coins...", MOON_PHASES, "yellow", "on_blue")
        response = requests.get(
            f"{BASE_URL}/coins/markets",
            params={
                "vs_currency": "usd",
                "order": "volume_desc",  # High volume coins
                "per_page": 250,
                "page": 1,
                "sparkline": False,
            },
            timeout=30,  # do not hang the notebook on a stalled connection
        )

        if response.status_code != 200:
            print_fancy(
                f"Error fetching coins: {response.text}",
                "white",
                "on_red",
                ERROR_EMOJIS,
            )
            return pd.DataFrame()

        data = response.json()
        df = pd.DataFrame(data)
        df["timestamp"] = datetime.now().isoformat()
        df["coingecko_url"] = df["id"].apply(
            lambda x: f"https://www.coingecko.com/en/coins/{x}"
        )

        # Create the output directory on first use; without it to_csv fails
        # and the fetched data would be discarded by the handler below.
        RESULTS_DIR.mkdir(parents=True, exist_ok=True)
        df.to_csv(NEW_COINS_FILE, index=False)

        print_fancy(
            f"🎯 Found {len(df)} active tokens!",
            "cyan",
            "on_grey",
            SUCCESS_EMOJIS,
        )
        return df

    except Exception as e:
        print_fancy(f"Error: {str(e)}", "white", "on_red", ERROR_EMOJIS)
        return pd.DataFrame()

In [10]:
# Fetch the market data; `new_coins` is an empty DataFrame if the fetch failed.
new_coins = get_new_coins()

🌘 [44m[33mScanning for new coins...[0m
[40m[36m🎯 🎯 Found 250 active tokens! 🎯[0m
