### Imports

In [1]:
import pandas as pd
pd.set_option('display.max_columns', None)
import pytz
import duckdb
import time

import requests as rq
import json
from datetime import datetime
import plotly.graph_objects as go
from plotly.subplots import make_subplots

import os
from dotenv import load_dotenv
from pathlib import Path

import asyncio
import json
import websockets
from urllib.parse import urlparse, parse_qs
import numpy as np

### Keys

In [2]:
# Read a local .env file (if one exists) into os.environ, then require every
# CoinGecko key this notebook uses. Each check stops the notebook with a
# RuntimeError naming the missing key tier.
load_dotenv()

if not (CG_DEMO_API_KEY := os.getenv("CG_DEMO_API_KEY")):
    raise RuntimeError("Missing Demo API key in the environment")

if not (CG_PRO_API_KEY := os.getenv("CG_PRO_API_KEY")):
    raise RuntimeError("Missing Pro API key in the environment")

if not (CG_ANALYST_API_KEY := os.getenv("CG_ANALYST_API_KEY")):
    raise RuntimeError("Missing Analyst API key in the environment")

### API status

In [3]:
# CoinGecko REST roots. The public root is paired with the Demo-key headers and
# the Pro root with the Pro-key headers; endpoints are appended to these verbatim.
PUB_URL = "https://api.coingecko.com/api/v3"
PRO_URL = "https://pro-api.coingecko.com/api/v3"

In [4]:
def get_response(endpoint, headers, params, URL, timeout=30):
    """Issue a GET request against a CoinGecko REST root and return the decoded JSON.

    Args:
        endpoint: Path appended verbatim to ``URL`` (e.g. ``"/ping"``).
        headers: Request headers, e.g. ``use_demo`` or ``use_pro``.
        params: Query parameters forwarded to ``requests.get`` (dict, or ``""`` for none).
        URL: API root such as ``PUB_URL`` or ``PRO_URL``.
        timeout: Seconds to wait for the server. Without a timeout ``requests``
            may block indefinitely on a stalled connection, freezing the kernel.

    Returns:
        The decoded JSON body. It is returned for non-200 responses too, so
        the API's error payload can be inspected. Returns ``None`` if the
        request failed at the network level or the body was not valid JSON.
    """
    url = f"{URL}{endpoint}"

    try:
        response = rq.get(url, headers=headers, params=params, timeout=timeout)
    except rq.RequestException as exc:
        # Same contract as the invalid-JSON case below: report and return None.
        print(f"Request to {url} failed: {exc}")
        return None

    try:
        data = response.json()
    except ValueError:
        print("Invalid JSON response")
        return None

    if response.status_code != 200:
        print(f"Failed to fetch data, status code {response.status_code}")

    return data

In [5]:
# Request headers per key tier: JSON responses plus the tier-specific key header.
use_demo = {"accept": "application/json", "x-cg-demo-api-key": CG_DEMO_API_KEY}
use_pro = {"accept": "application/json", "x-cg-pro-api-key": CG_PRO_API_KEY}

In [6]:
# Health check: ping the public root with the Demo-key headers. On a non-200
# status get_response prints a warning and still returns the API's error payload.
get_response(endpoint="/ping", headers=use_demo, params="", URL=PUB_URL)

Failed to fetch data, status code 400


{'timestamp': '2026-01-25T18:40:55.371+00:00',
 'error_code': 10010,
 'status': {'error_message': 'If you are using Pro API key, please change your root URL from api.coingecko.com to pro-api.coingecko.com  Please refer here for more details: https://docs.coingecko.com/reference/authentication'}}

### Get new pools

In [7]:
def safe_get(d, path, default=None):
    """Walk nested dicts along ``path`` and return the value found there.

    Returns ``default`` as soon as a step reaches something that is not a dict,
    or a dict that lacks the next key. An empty ``path`` returns ``d`` itself.
    """
    node = d
    for key in path:
        if not isinstance(node, dict) or key not in node:
            return default
        node = node[key]
    return node

def collect_response(list_response):
    """Flatten one ``new_pools`` API page into a list of per-pool dicts.

    Args:
        list_response: Decoded JSON page; pools are read from its ``"data"`` list.

    Returns:
        A list with one dict per pool. Keys: ``pair``, ``pool_created_at``,
        ``dex``, ``network``, ``token_add``, ``pool_add``, ``fdv_usd``,
        ``market_cap_usd``, ``daily_volume`` and ``daily_price_change``.
        Missing fields are filled with the string ``"NA"``.
    """
    response_all = []

    for pool in list_response.get("data", []):
        all_attributes = pool.get("attributes", {})
        rel = pool.get("relationships", {})

        base_token_id = safe_get(rel, ["base_token", "data", "id"], "NA")

        # Token ids are "<network>_<address>". A network id may itself contain
        # underscores (e.g. "polygon_pos" — verify against /onchain/networks),
        # so take everything after the LAST underscore as the address.
        if isinstance(base_token_id, str) and "_" in base_token_id:
            token_add = base_token_id.rsplit("_", 1)[1]
        else:
            token_add = "NA"

        response_all.append(dict(
            pair = safe_get(all_attributes, ["name"], "NA"),
            pool_created_at = safe_get(all_attributes, ["pool_created_at"], "NA"),
            dex = safe_get(rel, ["dex", "data", "id"], "NA"),
            network = safe_get(rel, ["network", "data", "id"], "NA"),
            token_add = token_add,
            pool_add = safe_get(all_attributes, ["address"], "NA"),
            fdv_usd = safe_get(all_attributes, ["fdv_usd"], "NA"),
            market_cap_usd = safe_get(all_attributes, ["market_cap_usd"], "NA"),
            daily_volume = safe_get(all_attributes, ["volume_usd", "h24"], "NA"),
            daily_price_change = safe_get(all_attributes, ["price_change_percentage", "h24"], "NA"),
        ))

    return response_all

In [8]:
def get_new_pools(network, sort_by_col, max_pages=None, dex="pump-fun"):
    """Fetch newly created pools on ``network`` from the Pro on-chain API.

    Pages are followed via the response's ``links.next`` until it is absent
    or ``max_pages`` pages have been read.

    Args:
        network: On-chain network id used in the endpoint path (e.g. ``"solana"``).
        sort_by_col: Column to sort the result by, descending.
        max_pages: Optional cap on the number of pages fetched (``None`` = no cap).
        dex: Keep only pools whose ``dex`` id equals this value; ``None`` keeps
            every dex. Defaults to ``"pump-fun"``, the previously hard-coded filter.

    Returns:
        DataFrame with the columns produced by ``collect_response``.
        ``pool_created_at`` is converted to Europe/Berlin time, and unparseable
        values become NaT. If nothing was fetched, the frame is empty but keeps
        those columns.
    """
    endpoint = f"/onchain/networks/{network}/new_pools"
    params = {}
    newpools_all = []
    page_count = 0

    # Path component of the API root (e.g. "/api/v3"). get_response prepends
    # PRO_URL to the endpoint, so a `links.next` path carrying this prefix must
    # be stripped; otherwise the request goes to ".../api/v3/api/v3/...".
    base_path = urlparse(PRO_URL).path.rstrip("/")

    # Follow pagination via the response links.next and collect across pages,
    # with an optional max_pages cap.
    while True:
        pools_list_response = get_response(endpoint, use_pro, params, PRO_URL)
        if not pools_list_response:
            break

        newpools_all.extend(collect_response(pools_list_response))
        page_count += 1

        if max_pages is not None and page_count >= max_pages:
            break

        links = pools_list_response.get("links", {})
        next_link = links.get("next") if isinstance(links, dict) else None
        if not next_link:
            break

        parsed = urlparse(next_link)
        endpoint = parsed.path
        if base_path and endpoint.startswith(base_path + "/"):
            endpoint = endpoint[len(base_path):]
        params = {k: v[0] for k, v in parse_qs(parsed.query).items()}

    if newpools_all:
        df_new_pools = pd.DataFrame(newpools_all)
    else:
        # Nothing fetched (e.g. the first request failed). Keep the expected
        # columns so the conversion below and callers indexing "pool_add" work.
        df_new_pools = pd.DataFrame(columns=[
            "pair", "pool_created_at", "dex", "network", "token_add", "pool_add",
            "fdv_usd", "market_cap_usd", "daily_volume", "daily_price_change",
        ])

    # Change to local timezone. The "NA" placeholder (and anything else
    # unparseable) becomes NaT instead of raising.
    df_new_pools["pool_created_at"] = pd.to_datetime(
        df_new_pools["pool_created_at"], utc=True, errors="coerce"
    ).dt.tz_convert("Europe/Berlin")

    if dex is not None:
        df_new_pools = df_new_pools[df_new_pools["dex"] == dex]

    return df_new_pools.sort_values(by=sort_by_col, ascending=False)


In [9]:
#get_new_pools("solana", "pool_created_at", max_pages = 5).head(50)

### Filter promising new pools (recently created, FDV above a threshold)

In [10]:
def collect_pool_response(list_response):
    """Flatten a single-pool API response into one flat dict.

    Args:
        list_response: Decoded JSON for ``/onchain/networks/{network}/pools/{address}``;
            the pool is read from its ``"data"`` object.

    Returns:
        A dict with these keys:

        - ``pair``, ``dex``, ``token_add``, ``pool_add``, ``pool_created_at``
        - ``fdv_usd``, ``market_cap_usd``
        - 24h volume and price change
        - 24h buy/sell transaction and trader counts
        - launchpad fields: ``grad_pert`` (0 when there are no launchpad
          details), ``completed``, ``completed_at`` and ``dest_pool``.

    Raises:
        KeyError: if a required attribute (e.g. ``transactions``) is missing.
    """
    response = list_response.get("data", {})
    all_attributes = response.get("attributes", {})
    daily_tx = all_attributes["transactions"]["h24"]
    rel = response["relationships"]

    # If the key is present with a null value, `.get(key, {})` returns None and
    # the `.get` calls below would fail, so fall back to an empty dict with `or`.
    launchpad_details = all_attributes.get("launchpad_details") or {}

    # Token ids are "<network>_<address>". A network id may itself contain
    # underscores, so split on the LAST underscore only.
    token_add = rel["base_token"]["data"]["id"].rsplit("_", 1)[1]

    response_dict = dict(
        pair = all_attributes["name"],
        dex = rel["dex"]["data"]["id"],
        token_add = token_add,
        pool_add = all_attributes["address"],
        pool_created_at = all_attributes["pool_created_at"],
        fdv_usd = all_attributes["fdv_usd"],
        market_cap_usd = all_attributes["market_cap_usd"],
        daily_volume = all_attributes["volume_usd"]["h24"],
        daily_price_change = all_attributes["price_change_percentage"]["h24"],
        daily_buys = daily_tx["buys"],
        daily_sells = daily_tx["sells"],
        daily_buyers = daily_tx["buyers"],
        daily_sellers = daily_tx["sellers"],
        grad_pert = (
            launchpad_details.get("graduation_percentage")
            if launchpad_details else 0
        ),
        completed = launchpad_details.get("completed", False),
        completed_at = launchpad_details.get("completed_at", None),
        dest_pool = launchpad_details.get("migrated_destination_pool_address", None)
    )

    return response_dict

In [11]:
def get_pool_data(network, pool_address):
    """Fetch one pool from the Pro on-chain API and flatten it.

    Args:
        network: On-chain network id (e.g. ``"solana"``).
        pool_address: Pool address on that network.

    Returns:
        The flat dict produced by ``collect_pool_response``.

    Raises:
        RuntimeError: if no usable ``"data"`` object came back. Causes include
            a request or JSON failure, or an API error payload.
    """
    target_url = f"/onchain/networks/{network}/pools/{pool_address}"

    pool_response = get_response(target_url,
                                 use_pro,
                                 "",
                                 PRO_URL)

    # Fail with the API's own payload instead of an opaque KeyError or
    # AttributeError deep inside collect_pool_response.
    if not isinstance(pool_response, dict) or not isinstance(pool_response.get("data"), dict):
        raise RuntimeError(f"No pool data for {network}/{pool_address}: {pool_response!r}")

    return collect_pool_response(pool_response)

In [12]:
def collect_pool_data(network, num_rows, max_pages):
    """Fetch full details for the newest pools on ``network`` and type the columns.

    Args:
        network: On-chain network id passed to ``get_new_pools`` and ``get_pool_data``.
        num_rows: How many of the newest pools (by ``pool_created_at``) to look up.
        max_pages: Page cap forwarded to ``get_new_pools``.

    Returns:
        One row per successfully fetched pool, with these dtypes:

        - nullable string, Int64 and boolean for the id, count and flag columns
        - numeric valuation columns (invalid values become NaN)
        - UTC timestamps

        The frame keeps all columns even when no pool was fetched.
    """
    df_new_pools = get_new_pools(network, "pool_created_at", max_pages).head(num_rows)

    all_pool_data = []
    for pool_add in df_new_pools["pool_add"]:
        try:
            all_pool_data.append(get_pool_data(network, pool_add))
        except RuntimeError as exc:
            # One failed lookup (e.g. a transient API error) should not discard
            # the rest of the batch; report it and continue.
            print(f"Skipping pool {pool_add}: {exc}")

    if all_pool_data:
        df = pd.DataFrame(all_pool_data)
    else:
        # Keep the schema so the dtype conversions below and downstream filters
        # work on an empty result instead of raising KeyError.
        df = pd.DataFrame(columns=[
            "pair", "dex", "token_add", "pool_add", "pool_created_at",
            "fdv_usd", "market_cap_usd", "daily_volume", "daily_price_change",
            "daily_buys", "daily_sells", "daily_buyers", "daily_sellers",
            "grad_pert", "completed", "completed_at", "dest_pool",
        ])

    df = df.astype({
        "pair": "string",
        "dex": "string",
        "pool_add": "string",
        "token_add": "string",
        "daily_buys": "Int64",
        "daily_sells": "Int64",
        "daily_buyers": "Int64",
        "daily_sellers": "Int64",
        "completed": "boolean",
        "dest_pool": "string",
    })

    # Numeric columns (coerce invalids to NaN)
    for col in ["fdv_usd", "market_cap_usd", "daily_volume", "daily_price_change", "grad_pert"]:
        df[col] = pd.to_numeric(df[col], errors="coerce")

    # Timestamps
    df["pool_created_at"] = pd.to_datetime(df["pool_created_at"], utc=True, errors="coerce")
    df["completed_at"] = pd.to_datetime(df["completed_at"], utc=True, errors="coerce")

    return df

In [13]:
def analyze_pools(network, num_rows, max_pages=5, max_age_minutes=10, min_fdv_usd=2500):
    """Return recently created pools whose FDV clears a minimum, busiest first.

    Args:
        network: On-chain network id (e.g. ``"solana"``).
        num_rows: Number of newest pools to look up in detail.
        max_pages: Page cap for the new-pools listing.
        max_age_minutes: Keep only pools created within this many minutes of now.
        min_fdv_usd: Keep only pools whose fully diluted valuation (FDV) is
            strictly greater than this many USD.

    Returns:
        The filtered DataFrame sorted by ``daily_volume`` descending, with
        ``pool_created_at`` converted to Europe/Berlin time.
    """
    df_pool_data = collect_pool_data(network, num_rows, max_pages)

    # Keep only young pools with a non-trivial FDV. Rows with a missing or
    # invalid FDV (NaN) or creation time (NaT) compare False and are dropped.
    cutoff = pd.Timestamp.now(tz="UTC") - pd.Timedelta(minutes=max_age_minutes)
    is_recent = df_pool_data["pool_created_at"] >= cutoff
    has_min_fdv = df_pool_data["fdv_usd"] > min_fdv_usd

    df_filtered = df_pool_data[is_recent & has_min_fdv].copy()

    # Convert to local timezone
    df_filtered["pool_created_at"] = df_filtered["pool_created_at"].dt.tz_convert("Europe/Berlin")

    return df_filtered.sort_values(by="daily_volume", ascending=False)

In [116]:
#analyze_pools("solana", num_rows=200).head(5)

### Monitor real-time price

In [117]:
# Price-stream WebSocket endpoint. The Analyst API key travels in the query
# string, so avoid printing or logging WS_URL.
WS_URL = f"wss://stream.coingecko.com/v1?x_cg_pro_api_key={CG_ANALYST_API_KEY}"
# Network prefix for streamed tokens; subscriptions use "<NETWORK_ID>:<token address>".
NETWORK_ID = "solana"

#### Stream and write to DuckDB database

In [118]:
# Close the writer connection left over from an earlier run of the setup cell
# below, which opens a new one each time. On a fresh kernel `dbw` does not
# exist yet, so this cell must not fail with NameError.
try:
    dbw.close()
except NameError:
    pass

In [119]:
# DuckDB file in the current working directory that the stream writers append to.
DB_PATH = str(Path.cwd().joinpath("price_ws_stream.duckdb"))

# A single connection handles every write; stream coroutines take `db_lock`
# around each INSERT.
dbw = duckdb.connect(DB_PATH)
db_lock = asyncio.Lock()

# One row per received price message; column order matches the INSERT used by
# the stream writers.
dbw.execute("""
CREATE TABLE IF NOT EXISTS price_stream (
    channel_type TEXT,
    network_id TEXT,
    token_address TEXT,
    usd_price DOUBLE,
    usd_price_24h_change_percentage DOUBLE,
    usd_market_cap DOUBLE,
    usd_24h_vol DOUBLE,
    last_updated_at TIMESTAMPTZ
);
""")

# Short keys of a streamed price message -> price_stream column names.
# Insertion order follows the table's column order.
rename_map = dict(zip(
    ("c", "n", "ta", "p", "pp", "m", "v", "t"),
    (
        "channel_type",
        "network_id",
        "token_address",
        "usd_price",
        "usd_price_24h_change_percentage",
        "usd_market_cap",
        "usd_24h_vol",
        "last_updated_at",
    ),
))

async def stream_token_price_and_write_to_db(TOKEN_ADDRESS):
    """Stream live prices for one token and append every tick to ``price_stream``.

    Steps:

    1. Open a WebSocket to ``WS_URL``.
    2. Subscribe to the ``OnchainSimpleTokenPrice`` channel.
    3. Register ``NETWORK_ID:TOKEN_ADDRESS`` via ``set_tokens``.
    4. Loop forever, inserting each price message as one row.

    The coroutine ends only when cancelled or when the connection raises
    (e.g. it is closed).

    Args:
        TOKEN_ADDRESS: Token address on ``NETWORK_ID`` to stream.
    """
    identifier = json.dumps({"channel": "OnchainSimpleTokenPrice"})

    async with websockets.connect(WS_URL) as ws:
        # 1) Subscribe to the price channel
        subscribe_msg = {
            "command": "subscribe",
            "identifier": identifier,
        }
        await ws.send(json.dumps(subscribe_msg))

        # 2) Register the token on this subscription
        data_payload = {
            "network_id:token_addresses": [f"{NETWORK_ID}:{TOKEN_ADDRESS}"],
            "action": "set_tokens"
        }
        message_msg = {
            "command": "message",
            "identifier": identifier,
            "data": json.dumps(data_payload)
        }
        await ws.send(json.dumps(message_msg))

        # 3) Stream and then write data
        while True:
            payload = json.loads(await ws.recv())

            # Unwrap a {"message": ...} envelope if present
            if isinstance(payload, dict) and "message" in payload:
                data = payload["message"]
            else:
                data = payload

            # Only frames carrying a "c" key are treated as price ticks; skip the
            # rest (presumably control frames such as confirmations or pings).
            if not (isinstance(data, dict) and "c" in data):
                continue

            # Read fields from the unwrapped `data`, not `payload`. For enveloped
            # frames the fields live under "message", and `payload.get` would
            # yield an all-None row.
            row = {column: data.get(key) for key, column in rename_map.items()}

            # Convert UNIX seconds to CET/CEST. Skip the conversion when the
            # frame has no "t": pd.to_datetime(None) returns None, and
            # None.tz_convert would crash the task.
            if row["last_updated_at"] is not None:
                row["last_updated_at"] = (
                    pd.to_datetime(row["last_updated_at"], unit="s", utc=True)
                    .tz_convert("Europe/Berlin")
                )

            async with db_lock:
                dbw.execute(
                    "INSERT INTO price_stream VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
                    [
                        row["channel_type"],
                        row["network_id"],
                        row["token_address"],
                        row["usd_price"],
                        row["usd_price_24h_change_percentage"],
                        row["usd_market_cap"],
                        row["usd_24h_vol"],
                        row["last_updated_at"],
                    ],
                )

#### Start background writers

In [126]:
# Start one background streaming task per selected token. Each task opens its own
# WebSocket connection and subscription, so nothing has to be connected first.
#TOKEN_ADDRESS = "E1uH2rcjNnYWwbNyyHZBUj9uAXwrPaSwY4FG8zWzpump"

df_pools = analyze_pools(NETWORK_ID, num_rows=200).head(10)
token_list = df_pools["token_add"].dropna().unique().tolist()


def _report_stream_exit(task):
    """Print the exception a stream task stopped with, if any.

    `tasks` keeps references to these tasks, so an unretrieved exception would
    never even be logged at garbage collection. A dead stream would go unnoticed.
    """
    if not task.cancelled() and task.exception() is not None:
        print(f"{task.get_name()} stopped: {task.exception()!r}")


def _start_stream(token):
    """Create a named background stream task for `token` that reports how it ended."""
    task = asyncio.create_task(
        stream_token_price_and_write_to_db(token), name=f"stream-{token}"
    )
    task.add_done_callback(_report_stream_exit)
    return task


tasks = [_start_stream(t) for t in token_list]


In [138]:
# Unfinished tasks on the kernel's event loop. This includes the websockets
# library's internal tasks, not only the stream writers.
[task for task in asyncio.all_tasks() if not task.done()]

[<Task pending name='Task-67' coro=<WebSocketCommonProtocol.close_connection() running at /home/vikas/.local/lib/python3.10/site-packages/websockets/legacy/protocol.py:1289> wait_for=<Task pending name='Task-65' coro=<WebSocketCommonProtocol.transfer_data() running at /home/vikas/.local/lib/python3.10/site-packages/websockets/legacy/protocol.py:955> wait_for=<Future pending cb=[Task.task_wakeup()]> cb=[Task.task_wakeup(), _wait.<locals>._on_completion() at /usr/lib/python3.10/asyncio/tasks.py:475]>>,
 <Task pending name='Task-77' coro=<WebSocketCommonProtocol.transfer_data() running at /home/vikas/.local/lib/python3.10/site-packages/websockets/legacy/protocol.py:955> wait_for=<Future pending cb=[Task.task_wakeup()]> cb=[Task.task_wakeup(), _wait.<locals>._on_completion() at /usr/lib/python3.10/asyncio/tasks.py:475]>,
 <Task pending name='Task-65' coro=<WebSocketCommonProtocol.transfer_data() running at /home/vikas/.local/lib/python3.10/site-packages/websockets/legacy/protocol.py:955> w

In [114]:
#task.cancel()
#task.done()
#task.cancelled()

#### Read from database

In [None]:
# dbr = duckdb.connect(DB_PATH)

# df = dbr.execute(f"""
#     SELECT * FROM price_stream
#     WHERE token_address = '{TOKEN_ADDRESS}'
#     ORDER BY last_updated_at DESC
#     LIMIT 10
# """).df()

# df

In [136]:
# Latest stored tick per streamed token. Reuse the reader connection if an
# earlier cell opened one; otherwise open it here so this cell also works on a
# fresh kernel.
try:
    dbr
except NameError:
    dbr = duckdb.connect(DB_PATH)

if token_list:
    # Bind token addresses as parameters instead of splicing them into the SQL text.
    placeholders = ", ".join(["?"] * len(token_list))
    df = dbr.execute(f"""
        SELECT *
        FROM price_stream
        WHERE token_address IN ({placeholders})
        QUALIFY row_number() OVER (
            PARTITION BY token_address
            ORDER BY last_updated_at DESC
        ) = 1
    """, token_list).df()
else:
    # `IN ()` is not valid SQL; with no tokens, return an empty frame with the table's columns.
    df = dbr.execute("SELECT * FROM price_stream LIMIT 0").df()

df

Unnamed: 0,channel_type,network_id,token_address,usd_price,usd_price_24h_change_percentage,usd_market_cap,usd_24h_vol,last_updated_at
0,G1,solana,3CuaKMv91Eosrtm9eanKVTaSJSkW4qyDTsyxtUpwpump,3e-06,-23.419508,3342.271879,0.0,2026-01-25 20:55:19+01:00
1,G1,solana,pX3P1Jx3v9hjQgZKZgq2Hr5HXc2npRvwqq3R4fjpump,3e-06,-14.461514,3301.969635,0.0,2026-01-25 20:55:19+01:00
2,G1,solana,8jARRbh7oqfbeYQRwwdhRtqbzUgB9SC7hFKB8purpump,1e-05,164.900011,9891.655048,6978.555816,2026-01-25 20:58:10+01:00
3,G1,solana,ATmPiwoeWzFWN3dbzXAxFWU51sWGV29WJRCACawgpump,3e-06,-24.076345,3440.689618,17467.124298,2026-01-25 20:56:24+01:00
4,G1,solana,672QE1jQxvSZ9wHy7t1QLGRqjjaP6Uq6Gq1ecGU4pump,3e-06,-7.994076,3323.369381,0.0,2026-01-25 20:55:19+01:00
5,G1,solana,D8irVhWymqYsxdfJAgZVjckaRRHZvammwid76Ce4pump,3e-06,4.433166,3294.693012,0.0,2026-01-25 20:55:19+01:00
6,G1,solana,5WgXtD5qD9zgYHzkZpNmwmAQ7aXwQv6hwrkurpsqpump,3e-06,-10.296614,3304.344002,2297.378696,2026-01-25 20:56:01+01:00
7,G1,solana,6BsV1T2yrq5suAScS5R6Y3yJCZNipYdATj6yGpmapump,3e-06,-20.367739,3436.03168,0.0,2026-01-25 20:55:19+01:00
8,G1,solana,GXrxojrUGcnRYZUNc8n4PUskKVtsCZsi35M3CmQ7pump,3e-06,-8.758283,3350.618809,0.0,2026-01-25 20:55:19+01:00
9,G1,solana,J33r7petRauxbhBp1YcGMZ5GqPKS2Q1tdDbaoSFdpump,3e-06,0.676046,3301.871705,233.220086,2026-01-25 20:59:31+01:00


#### Trade simulation

In [None]:
def trade_status(usd_price, entry, take_profit_mult=1.2, stop_loss_mult=0.8):
    """Classify a price relative to the entry price.

    Args:
        usd_price: Current price.
        entry: Entry price the thresholds are relative to.
        take_profit_mult: Return "take_profit" when ``usd_price`` is strictly
            above ``entry * take_profit_mult`` (default: +20%).
        stop_loss_mult: Return "stop_loss" when ``usd_price`` is strictly below
            ``entry * stop_loss_mult`` (default: -20%).

    Returns:
        "take_profit", "stop_loss" or "monitoring". Take-profit is checked first.
    """
    if usd_price > entry * take_profit_mult:
        return "take_profit"
    if usd_price < entry * stop_loss_mult:
        return "stop_loss"

    return "monitoring"

# Per-token state: newest tick already processed, entry price (first tick seen)
# and the last status printed.
state = {t: {"last_ts": None, "entry_price": None, "last_status": None} for t in token_list}

# Reader connection to the database the stream writers append to.
dbr = duckdb.connect(DB_PATH)

# Use ANSI escape codes in the print strings for colors
COLORS = {
    "monitoring": "\033[34m",  # blue
    "stop_loss": "\033[31m",   # red
    "take_profit": "\033[32m", # green
}
RESET = "\033[0m"


def poll_token(token):
    """Process ticks for `token` newer than the last one seen and print status changes.

    The entry price is the first tick ever read for the token. A line is printed
    only when `trade_status` changes. `state[token]` is updated in place.
    """
    token_state = state[token]
    last_ts = token_state["last_ts"]
    entry_price = token_state["entry_price"]
    last_status = token_state["last_status"]

    # Values are bound as parameters rather than formatted into the SQL. The
    # timestamp is passed in its string form and cast, matching the earlier
    # literal comparison.
    if last_ts is None:
        df_new = dbr.execute(
            """
            SELECT * FROM price_stream
            WHERE token_address = ?
            ORDER BY last_updated_at ASC
            """,
            [token],
        ).df()
    else:
        df_new = dbr.execute(
            """
            SELECT * FROM price_stream
            WHERE token_address = ?
              AND last_updated_at > CAST(? AS TIMESTAMPTZ)
            ORDER BY last_updated_at ASC
            """,
            [token, str(last_ts)],
        ).df()

    if df_new.empty:
        return

    if entry_price is None:
        entry_price = df_new.iloc[0]["usd_price"]

    for _, row in df_new.iterrows():
        status = trade_status(row["usd_price"], entry_price)

        if status != last_status:
            price_fmt = f"{row['usd_price']:.8f}"
            entry_fmt = f"{entry_price:.8f}"
            line = f"[{row['last_updated_at']}] {token} {status} @ {price_fmt} (entry = {entry_fmt} USD)"
            print(f"{COLORS.get(status, '')}{line}{RESET}")
            last_status = status

        last_ts = row["last_updated_at"]

    token_state.update({"last_ts": last_ts, "entry_price": entry_price, "last_status": last_status})


async def monitor_trades(poll_seconds=2):
    """Poll every token in `token_list` every `poll_seconds` seconds, forever.

    Uses `await asyncio.sleep` instead of `time.sleep`. The stream writer tasks
    were created with `asyncio.create_task` on this kernel's event loop. A
    blocking sleep loop in a cell never yields to that loop, so the writers
    would stop receiving and storing ticks while monitoring.
    """
    while True:
        await asyncio.sleep(poll_seconds)
        for token in token_list:
            poll_token(token)


def _report_monitor_exit(task):
    """Print the exception the monitor task stopped with, if any."""
    if not task.cancelled() and task.exception() is not None:
        print(f"trade monitor stopped: {task.exception()!r}")


# Runs in the background next to the stream writers; stop it with `monitor_task.cancel()`.
monitor_task = asyncio.create_task(monitor_trades(), name="trade-monitor")
monitor_task.add_done_callback(_report_monitor_exit)

[34m[2026-01-25 20:55:19+01:00] ATmPiwoeWzFWN3dbzXAxFWU51sWGV29WJRCACawgpump monitoring @ 0.00000526 (entry = 0.00000526 USD)[0m
[31m[2026-01-25 20:55:27+01:00] ATmPiwoeWzFWN3dbzXAxFWU51sWGV29WJRCACawgpump stop_loss @ 0.00000408 (entry = 0.00000526 USD)[0m
[34m[2026-01-25 20:55:19+01:00] 8jARRbh7oqfbeYQRwwdhRtqbzUgB9SC7hFKB8purpump monitoring @ 0.00000807 (entry = 0.00000807 USD)[0m
[31m[2026-01-25 20:56:01+01:00] 8jARRbh7oqfbeYQRwwdhRtqbzUgB9SC7hFKB8purpump stop_loss @ 0.00000626 (entry = 0.00000807 USD)[0m
[34m[2026-01-25 20:56:03+01:00] 8jARRbh7oqfbeYQRwwdhRtqbzUgB9SC7hFKB8purpump monitoring @ 0.00000675 (entry = 0.00000807 USD)[0m
[31m[2026-01-25 20:56:04+01:00] 8jARRbh7oqfbeYQRwwdhRtqbzUgB9SC7hFKB8purpump stop_loss @ 0.00000529 (entry = 0.00000807 USD)[0m
[34m[2026-01-25 20:56:05+01:00] 8jARRbh7oqfbeYQRwwdhRtqbzUgB9SC7hFKB8purpump monitoring @ 0.00000827 (entry = 0.00000807 USD)[0m
[31m[2026-01-25 20:56:07+01:00] 8jARRbh7oqfbeYQRwwdhRtqbzUgB9SC7hFKB8purpump stop_los