Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions agents/crypto_trader_bot/.env.example
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
OPENAI_API_KEY=

BOTFATHER_API_TOKEN=
BITQUERY_OAUTH_TOKEN=
LOG_LEVEL=INFO # Options: DEBUG, INFO, WARNING, ERROR
6 changes: 6 additions & 0 deletions agents/crypto_trader_bot/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
.venv
.env
swarmzero-data
**/*/__pycache__/
__pycache__/
.DS_Store
64 changes: 64 additions & 0 deletions agents/crypto_trader_bot/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
# Crypto Trader Telegram Bot

An AI-powered cryptocurrency trading bot that leverages BitQuery for market data and provides trading insights through a Telegram interface.

Inspired by [How to build your own Add Liquidity Signal Telegram Bot for Solana DEX Pools | Bitquery API Tutorial
](https://youtu.be/s5GTjKhUmEo?si=DI61viBplqKIYwXG).

## Features

- Real-time cryptocurrency market data monitoring via BitQuery
- Telegram bot interface for user interaction
- Configurable logging levels
- AI-powered trading analysis and recommendations

## Prerequisites

- Python 3.11 or higher
- Poetry for dependency management
- Telegram Bot Token (from BotFather)
- BitQuery OAuth Token
- OpenAI API Key (for AI features)

## Installation

1. Clone the repository:
```bash
git clone https://github.com/swarmzero/examples.git
cd examples/agents/crypto-trader-bot
```

2. Install dependencies using Poetry:
```bash
poetry install --no-root
```

3. Set up environment variables:
- Copy `.env.example` to `.env`
- Fill in the required tokens and API keys:
- `OPENAI_API_KEY`: Your OpenAI API key
- `BOTFATHER_API_TOKEN`: Your Telegram bot token
- `BITQUERY_OAUTH_TOKEN`: Your BitQuery OAuth token
- `LOG_LEVEL`: Desired logging level (DEBUG, INFO, WARNING, ERROR)

## Usage

1. Activate the Poetry environment:
```bash
poetry shell
```

2. Run the bot:
```bash
poetry run python main.py
```

The bot will start and listen for commands through Telegram.

## Project Structure

- `main.py`: Entry point and bot initialization
- `bitquery_service.py`: BitQuery API integration
- `dex_agent.py`: Trading logic and analysis
- `dex_rabbit_bot.py`: Telegram bot implementation
- `config.py`: Configuration management
159 changes: 159 additions & 0 deletions agents/crypto_trader_bot/bitquery_service.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
import logging
from datetime import datetime, timedelta, timezone
from typing import Dict

import aiohttp

from config import get_chain_id

logger = logging.getLogger(__name__)


class BitQueryService:
def __init__(self, oauth_token: str):
if not oauth_token:
raise ValueError("BitQuery OAuth token is required")

logger.debug("Initializing BitQueryService")
self.oauth_token = oauth_token
self.url = "https://streaming.bitquery.io/graphql"
self.headers = {
"Content-Type": "application/json",
"Authorization": f"Bearer {oauth_token}",
}

def _get_base_trade_fields(self, address_field: str = "SmartContract") -> str:
"""Get common trade fields structure for all chains"""
return f"""
Block {{
Number
Time
}}
Transaction {{
Hash
}}
Trade {{
Buy {{
Amount
Currency {{
Name
Symbol
{address_field}
}}
Price
}}
Sell {{
Amount
Currency {{
Name
Symbol
{address_field}
}}
Price
}}
Dex {{
ProtocolName
}}
}}
"""

def _get_chain_query(self, chain: str) -> tuple[str, str]:
"""Get chain-specific query structure and namespace"""
if chain == "solana":
return "Solana", "MintAddress"
elif chain == "tron":
return "Tron", "Address"
elif chain == "ton":
return "TON", "Address"
else: # EVM chains
return "EVM", "SmartContract"

async def get_chain_activity(self, chain: str, time_window: int = 60) -> Dict:
"""
Fetch trading activity for specified chain
time_window: minutes to look back
"""
try:
logger.debug(f"Fetching chain activity for {chain}, time window: {time_window}min")
now = datetime.now(timezone.utc)
time_ago = now - timedelta(minutes=time_window)

# Normalize chain name
chain = get_chain_id(chain)
namespace, address_field = self._get_chain_query(chain)
trade_fields = self._get_base_trade_fields(address_field)

# Build query based on chain type
if namespace == "EVM":
# Query for EVM chains
query = f"""
query ($network: evm_network!, $since: DateTime) {{
{namespace}(network: $network) {{
DEXTrades(
orderBy: {{descending: Block_Time}}
where: {{Block: {{Time: {{since: $since}}}}}}
) {{
{trade_fields}
}}
}}
}}
"""
variables = {"network": chain.lower(), "since": time_ago.isoformat()}
else:
# Query for non-EVM chains (Solana, Tron, TON)
query = f"""
query ($since: DateTime) {{
{namespace} {{
DEXTrades(
orderBy: {{descending: Block_Time}}
where: {{Block: {{Time: {{since: $since}}}}}}
) {{
{trade_fields}
}}
}}
}}
"""
variables = {"since": time_ago.isoformat()}

# Log the query and variables
logger.debug(f"BitQuery request for {chain}:")
logger.debug(f"Query: {query}")
logger.debug(f"Variables: {variables}")

async with aiohttp.ClientSession() as session:
async with session.post(
self.url,
headers=self.headers,
json={"query": query, "variables": variables},
) as response:
if response.status != 200:
error_text = await response.text()
logger.error(f"BitQuery API error: Status {response.status}, Response: {error_text}")
raise aiohttp.ClientError(f"BitQuery API returned status {response.status}")

data = await response.json()

if "errors" in data:
logger.error(f"GraphQL errors: {data['errors']}")
raise ValueError(f"GraphQL query failed: {data['errors']}")

# Log the response data
trades = data.get("data", {}).get(namespace, {}).get("DEXTrades", [])
logger.info(f"Received {len(trades)} trades from BitQuery for {chain}")

# Log sample of trades for debugging
if trades:
logger.debug("Sample trade data (first trade):")
logger.debug(f"Block: {trades[0].get('Block', {})}")
logger.debug(f"Transaction: {trades[0].get('Transaction', {})}")
logger.debug(f"Trade details: {trades[0].get('Trade', {})}")

logger.debug(f"Successfully fetched data for {chain}")
return data

except aiohttp.ClientError as e:
logger.error(f"Network error while fetching chain activity: {str(e)}")
raise
except Exception as e:
logger.error(f"Error fetching chain activity for {chain}: {str(e)}")
raise
69 changes: 69 additions & 0 deletions agents/crypto_trader_bot/config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
import logging
from typing import Dict, NamedTuple

logger = logging.getLogger(__name__)


class ChainConfig(NamedTuple):
name: str
url_path: str
native_token: str
explorer: str


SUPPORTED_CHAINS: Dict[str, ChainConfig] = {
"solana": ChainConfig("Solana", "solana", "SOL", "https://solscan.io"),
"eth": ChainConfig("Ethereum", "eth", "ETH", "https://etherscan.io"),
"tron": ChainConfig("Tron", "tron", "TRX", "https://tronscan.org"),
"ton": ChainConfig("TON", "ton", "TON", "https://tonscan.org"),
"base": ChainConfig("Base", "base", "ETH", "https://basescan.org"),
"matic": ChainConfig("Polygon", "matic", "MATIC", "https://polygonscan.com"),
"bsc": ChainConfig("BSC", "bsc", "BNB", "https://bscscan.com"),
"opbnb": ChainConfig("opBNB", "opbnb", "BNB", "https://opbnbscan.com"),
"optimism": ChainConfig("Optimism", "optimism", "ETH", "https://optimistic.etherscan.io"),
"arbitrum": ChainConfig("Arbitrum", "arbitrum", "ETH", "https://arbiscan.io"),
}

# Chain name variations mapping
CHAIN_ALIASES = {
# Ethereum variations
"ethereum": "eth",
"ether": "eth",
# BSC variations
"binance": "bsc",
"binance smart chain": "bsc",
# Polygon variations
"polygon": "matic",
# Common variations for other chains
"sol": "solana",
"opt": "optimism",
"arb": "arbitrum",
}

# Log supported chains on module load
logger.info(f"Loaded {len(SUPPORTED_CHAINS)} supported chains:")
for chain_id, config in SUPPORTED_CHAINS.items():
logger.debug(f" {chain_id}: {config.name} ({config.native_token})")


def normalize_chain_name(chain: str) -> str:
"""Convert various chain name formats to our standard chain ID"""
chain = chain.lower().strip()
return CHAIN_ALIASES.get(chain, chain)


def validate_chain(chain: str) -> bool:
"""Validate if a chain is supported"""
normalized_chain = normalize_chain_name(chain)
is_valid = normalized_chain in SUPPORTED_CHAINS
if not is_valid:
logger.warning(f"Attempted to use unsupported chain: {chain} (normalized: {normalized_chain})")
return is_valid


def get_chain_id(chain: str) -> str:
"""Get the standardized chain ID from any valid chain name variation"""
normalized_chain = normalize_chain_name(chain)
if normalized_chain in SUPPORTED_CHAINS:
return normalized_chain
raise ValueError(f"Unsupported chain: {chain}")
Loading