In [7]:
# Import required libraries
import os
import json
import time
import requests
from bs4 import BeautifulSoup
from solana.rpc.async_api import AsyncClient
from solana.transaction import Transaction
from solana.keypair import Keypair
from spl.token.client import Token
from base58 import b58decode

In [None]:
# Load environment variables

PRIVATE_KEY = ""
RPC_URL = "https://api.mainnet-beta.solana.com"

In [9]:
# Initialize Solana connection and wallet
async_client = AsyncClient(RPC_URL)
private_key_bytes = b58decode(PRIVATE_KEY)
wallet = Keypair.from_secret_key(bytes(private_key_bytes))

In [22]:
# Parse Twitter for tickers (placeholder function)
import asyncio
import websockets
import json

async def subscribe_to_pump_fun():
    """
    Subscribe to Pump.fun token creation events and return token mint addresses.
    """
    uri = "wss://pumpportal.fun/api/data"
    token_mint_addresses = []  # List to store token mint addresses

    async with websockets.connect(uri) as websocket:
        # Subscribing to token creation events
        payload = {"method": "subscribeNewToken"}
        await websocket.send(json.dumps(payload))

        # Listen for messages and process token creation events
        async for message in websocket:
            data = json.loads(message)

            # Check if the message contains new token creation event
            if data.get("event") == "newTokenCreated":
                token_mint = data.get("tokenMintAddress")
                if token_mint:
                    print(f"New token created: {token_mint}")
                    token_mint_addresses.append(token_mint)

            # Stop listening after a certain number of tokens (or some condition)
            if len(token_mint_addresses) >= 10:  # Example: Stop after 10 tokens
                break

    return token_mint_addresses

def parse_twitter():
    return asyncio.run(subscribe_to_pump_fun())


In [11]:
# Search for token on Pump.fun
def search_pump_fun(contract_address):
    """
    Search Pump.fun for a given contract address.
    """
    url = f"https://pump.fun/board/{contract_address}"
    headers = {"User-Agent": "Mozilla/5.0"}
    response = requests.get(url, headers=headers)
    if response.status_code == 200:
        return response.text
    else:
        print(f"Pump.fun search failed for {contract_address}. Status code: {response.status_code}")
        return None

In [12]:
# Fetch data from Dexscreener
def get_token_price(token_address):
    response = requests.get(f"https://api.dexscreener.com/latest/dex/tokens/{token_address}")
    if response.status_code == 200:
        data = response.json()
        return float(data['pairs'][0]['priceUsd'])
    raise ValueError(f"Error fetching price for token {token_address}")

In [13]:
# Get Solsniffer contract score
def get_sol_sniffer_score(contract_address):
    """
    Fetch the contract score from Solsniffer.
    """
    url = f"https://solsniffer.com/{contract_address}"
    headers = {"User-Agent": "Mozilla/5.0"}
    response = requests.get(url, headers=headers)
    if response.status_code == 200:
        soup = BeautifulSoup(response.text, "html.parser")
        score_element = soup.find("div", class_="contract-score")
        if score_element:
            try:
                return float(score_element.text.strip())
            except ValueError:
                print(f"Error parsing score for {contract_address}")
                return None
        else:
            print(f"Score not found for contract {contract_address}")
            return None
    else:
        print(f"Failed to fetch data for contract {contract_address}. Status code: {response.status_code}")
        return None

In [14]:
# Buy token
async def buy_token(token_mint_address, amount_in_sol=0.01, slippage=15):
    print(f"Buying {amount_in_sol} SOL worth of token {token_mint_address} with {slippage}% slippage...")

In [15]:
# Sell token
async def sell_token(token_mint_address, leave_moonbag=0.15, slippage=15):
    print(f"Selling token {token_mint_address} while leaving {leave_moonbag * 100}% moonbag and using {slippage}% slippage...")

In [23]:
# Monitor token price for take-profit levels
async def monitor_price_and_sell(token_address, buy_price, profit_multiplier=10, leave_moonbag=0.15):
    while True:
        try:
            current_price = get_token_price(token_address)
            if current_price >= buy_price * profit_multiplier:
                print(f"Price target reached: {current_price} USD (10x of {buy_price} USD). Selling...")
                await sell_token(token_address, leave_moonbag=leave_moonbag)
                break
            else:
                print(f"Current price: {current_price} USD. Waiting for 10x.")
        except Exception as e:
            print(f"Error monitoring price: {e}")
        time.sleep(60)  # Check every minute

In [26]:
# Main workflow
async def main():
    # Step 1: Parse Twitter for tokens
    tokens = parse_twitter()
    for token in tokens:
        print(f"Found token: {token}")

        # Step 2: Search Pump.fun and Dexscreener
        pump_data = search_pump_fun(token)
        print(f"Pump.fun data: {pump_data}")

        # Initialize current_price
        current_price = None

        try:
            current_price = get_token_price(token)
            print(f"Dexscreener price for {token}: {current_price}")
        except Exception as e:
            print(f"Error fetching Dexscreener data for {token}: {e}")

        if current_price is None:
            print(f"Skipping token {token} due to missing price data.")
            continue

        # Step 3: Check contract score via Solsniffer
        score = get_sol_sniffer_score(token)
        if score is not None and score < 85:
            print(f"Low Solsniffer score ({score}) for token {token}. Skipping...")
            continue

        # Step 4: Buy the token
        await buy_token(token, amount_in_sol=1, slippage=15)

        # Step 5: Monitor price and sell at 10x
        buy_price = current_price
        await monitor_price_and_sell(token, buy_price, profit_multiplier=10, leave_moonbag=0.15)


In [27]:
# Run the script
import asyncio
try:
    loop = asyncio.get_running_loop()
except RuntimeError:
    loop = None

if loop and loop.is_running():
    # If an event loop is running, schedule the coroutine
    print("Detected running event loop. Using `asyncio.create_task()`.")
    task = asyncio.create_task(main())
else:
    # If no event loop is running, start one
    asyncio.run(main())

Task exception was never retrieved
future: <Task finished name='Task-6' coro=<main() done, defined at /var/folders/sy/1nr3d4256xg9rwg4pwh9cpl80000gn/T/ipykernel_47110/969673130.py:2> exception=RuntimeError('asyncio.run() cannot be called from a running event loop')>
Traceback (most recent call last):
  File "/var/folders/sy/1nr3d4256xg9rwg4pwh9cpl80000gn/T/ipykernel_47110/969673130.py", line 4, in main
    tokens = parse_twitter()
             ^^^^^^^^^^^^^^^
  File "/var/folders/sy/1nr3d4256xg9rwg4pwh9cpl80000gn/T/ipykernel_47110/3285979297.py", line 36, in parse_twitter
    return asyncio.run(subscribe_to_pump_fun())
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/anaconda3/lib/python3.12/asyncio/runners.py", line 190, in run
    raise RuntimeError(
RuntimeError: asyncio.run() cannot be called from a running event loop


Detected running event loop. Using `asyncio.create_task()`.


  task = asyncio.create_task(main())
