Generic blockchain event indexer for EVM-compatible smart contracts, with real-time processing, gap detection, and a priority lane.
- Real-time Event Indexing: Subscribe to blockchain events and index them as soon as they are emitted
- Priority Lane: Fast-track processing for important transactions (sub-second latency)
- Gap Detection & Backfill: Automatically detect and fill in missing events
- Multi-Publisher Support: RabbitMQ, Webhook, or no publishing
- Health Monitoring: Built-in metrics and health checks
- REST API: Query events and submit priority transactions
- Type-safe: Full TypeScript support with Zod validation
npm install @lynx-core/indexer
# or
pnpm add @lynx-core/indexer
# config.yaml
serviceName: my-indexer
contractName: MyContract
contractAddress: "0x1234567890123456789012345678901234567890"
chainId: 1
network: mainnet
startBlock: 18000000
# RPC Configuration
rpcUrl: wss://mainnet.infura.io/ws/v3/YOUR_KEY
# Database Configuration
database:
  uri: mongodb://localhost:27017
  dbName: my_indexer
  collections:
    events: events
    syncStatus: sync_status
    priorityQueue: priority_transactions
# Publisher Configuration
publisher:
  type: rabbitmq # or 'webhook' or 'none'
  url: amqp://localhost
  exchange: blockchain-events
  retries: 3
# Feature Flags
priorityLane: true
crawler: true
crawlerInterval: 60000
# Performance
polling:
  interval: 500
  batchSize: 5000
# Events to index
events:
  - Transfer
  - Approval
  - Mint
import { GenericIndexer, loadConfigFromFile } from '@lynx-core/indexer';
import MyContractABI from './abi/MyContract.json';
// Load config
const config = loadConfigFromFile('./config.yaml');
// Add ABI
config.abi = MyContractABI;
// Create and start indexer
const indexer = new GenericIndexer(config);
await indexer.start();
Blockchain Event
↓
Priority Lane (optional) ← Fast track specific transactions
↓
Event Handler → Normalize & Parse
↓
Database (MongoDB)
↓
Message Queue (RabbitMQ/Webhook)
Main orchestrator that manages all components
Processes and normalizes blockchain events
Fast-track processing for important transactions
- Polls every 500 ms
- Times out after 30 s
- Handles reverted transactions
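The polling behaviour listed above could look roughly like the sketch below. It is an illustration only, not the package's implementation: `pollReceipt`, `ReceiptFetcher`, `PollResult`, and the injectable `sleep` are hypothetical names, and only the 500 ms interval and 30 s timeout come from this README.

```typescript
// Hypothetical types for illustration; the package's internal types may differ.
type Receipt = { status: number; blockNumber: number };
type ReceiptFetcher = (txHash: string) => Promise<Receipt | null>;
type PollResult =
  | { status: 'PROCESSED'; blockNumber: number }
  | { status: 'REVERTED'; blockNumber: number }
  | { status: 'TIMEOUT' };

// Polls for a transaction receipt until it is mined or the timeout elapses.
async function pollReceipt(
  txHash: string,
  getReceipt: ReceiptFetcher,
  intervalMs = 500,
  timeoutMs = 30_000,
  sleep: (ms: number) => Promise<void> = (ms) =>
    new Promise<void>((resolve) => setTimeout(resolve, ms)),
): Promise<PollResult> {
  let waited = 0;
  while (waited <= timeoutMs) {
    const receipt = await getReceipt(txHash);
    if (receipt !== null) {
      // EVM receipts report status 1 on success and 0 on revert.
      return receipt.status === 1
        ? { status: 'PROCESSED', blockNumber: receipt.blockNumber }
        : { status: 'REVERTED', blockNumber: receipt.blockNumber };
    }
    await sleep(intervalMs);
    waited += intervalMs;
  }
  return { status: 'TIMEOUT' };
}
```

Passing the receipt fetcher and `sleep` as parameters keeps the loop testable without a live RPC connection. In a real setup the fetcher would presumably wrap the RPC provider's receipt lookup.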
Detects and backfills missing events
- Runs on an interval (default: 60 s)
- Gap detection across the indexed block range
- Automatic recovery
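A gap check of the kind described above can be sketched as a pure function over block numbers. This is an assumption about the general approach, not the package's code: `findGaps` and `BlockRange` are hypothetical names. Note that in this sketch a block that simply emitted no matching events also shows up as a gap, so backfilling such a range just finds nothing.

```typescript
type BlockRange = { from: number; to: number };

// Returns the contiguous ranges within [from, to] that are absent from `seenBlocks`.
// Runs in O(n) where n is the number of blocks in the range.
function findGaps(seenBlocks: Iterable<number>, from: number, to: number): BlockRange[] {
  const seen = new Set(seenBlocks);
  const gaps: BlockRange[] = [];
  let gapStart: number | null = null;
  for (let block = from; block <= to; block++) {
    if (!seen.has(block)) {
      // Open a new gap if we are not already inside one.
      if (gapStart === null) gapStart = block;
    } else if (gapStart !== null) {
      // A seen block closes the current gap.
      gaps.push({ from: gapStart, to: block - 1 });
      gapStart = null;
    }
  }
  if (gapStart !== null) gaps.push({ from: gapStart, to });
  return gaps;
}

console.log(findGaps([100, 101, 104, 105, 108], 100, 110));
// → [ { from: 102, to: 103 }, { from: 106, to: 107 }, { from: 109, to: 110 } ]
```

Each returned range could then be handed to a backfill step that queries the contract's events for that block span.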
MongoDB operations with optimized indexes
Publishes events to RabbitMQ or a webhook
REST API endpoints
Tracks metrics and health status
GET /health
Response:
{
"service": "my-indexer",
"contract": {
"name": "MyContract",
"address": "0x1234...",
"chainId": 1,
"network": "mainnet"
},
"status": "healthy",
"sync": {
"lastBlock": 18500000,
"currentBlock": 18500010,
"lag": 10
},
"uptime": 3600,
"timestamp": "2025-11-02T10:00:00.000Z"
}
GET /events?fromBlock=18000000&toBlock=18001000&eventName=Transfer&limit=100&offset=0
Response:
{
"events": [...],
"count": 100
}
POST /transactions/priority
Content-Type: application/json
{
"txHash": "0xabc...",
"context": {
"orderHashes": ["0x123..."],
"operator": "0x456...",
"timestamp": 1698768000
}
}
Response:
{
"success": true,
"message": "Transaction queued for priority indexing",
"txHash": "0xabc..."
}
GET /metrics
Response:
{
"eventsProcessed": 50000,
"eventsPerSecond": 10.5,
"lastProcessedBlock": 18500000,
"syncLag": 10,
"uptime": 3600
}
This package can index any EVM-compatible smart contract, including:
- DeFi Protocols: DEX, Lending/Borrowing, Staking, Yield Farming
- NFT Marketplaces: ERC721, ERC1155, Marketplace contracts
- Token Contracts: ERC20, ERC777, Wrapped tokens
- DAOs & Governance: Voting, Treasury, Timelock contracts
- Gaming: In-game assets, P2E mechanics
- Real World Assets (RWA): Tokenized assets
- Cross-chain Bridges: Lock/Unlock, Mint/Burn events
- Prediction Markets: Order books, AMMs
- Social & Identity: ENS, Lens Protocol, Farcaster
- Any Custom Contract: All you need is the ABI and the event names
import { GenericIndexer } from '@lynx-core/indexer';
import DEXRouterABI from './abi/DEXRouter.json';
const indexer = new GenericIndexer({
serviceName: 'dex-indexer',
contractName: 'DEXRouter',
contractAddress: '0x...',
abi: DEXRouterABI,
events: ['Swap', 'Mint', 'Burn', 'Sync'],
// ... other config
});
await indexer.start();
const indexer = new GenericIndexer({
serviceName: 'nft-indexer',
contractName: 'NFTCollection',
events: ['Transfer', 'Approval', 'ApprovalForAll'],
// ... config
});
const indexer = new GenericIndexer({
serviceName: 'lending-indexer',
contractName: 'LendingPool',
events: ['Deposit', 'Withdraw', 'Borrow', 'Repay', 'Liquidation'],
priorityLane: true, // Enable fast-track for important events
// ... config
});
const indexer = new GenericIndexer({
serviceName: 'dao-indexer',
contractName: 'GovernorBravo',
events: ['ProposalCreated', 'VoteCast', 'ProposalExecuted', 'ProposalCanceled'],
// ... config
});
const indexer = new GenericIndexer({
serviceName: 'bridge-indexer',
contractName: 'TokenBridge',
events: ['Deposit', 'Withdrawal', 'RelayMessage'],
crawler: true, // Important for cross-chain reconciliation
// ... config
});
const indexer = new GenericIndexer({
serviceName: 'game-indexer',
contractName: 'GameContract',
events: ['ItemMinted', 'PlayerLevelUp', 'BattleCompleted', 'RewardClaimed'],
// ... config
});
// Submit a priority transaction so that it is indexed immediately
// Use case: Large transfers, liquidations, governance votes
await fetch('http://localhost:3000/transactions/priority', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
txHash: '0x...',
context: {
type: 'large_transfer',
amount: '1000000000000000000',
priority: 'high'
}
})
});
1. Blockchain Event Emitted
↓
2a. Real-time Subscription (WebSocket)
↓
OR
↓
2b. Priority Lane (POST /transactions/priority)
↓ (poll every 500ms)
↓
3. Event Handler
- Parse event arguments
- Normalize data (BigInt → string)
- Get block timestamp
- Check duplicates
↓
4. Database (MongoDB)
- Save event with unique index (txHash + logIndex)
- Update sync status
↓
5. Message Queue (optional)
- Publish to RabbitMQ/Webhook
- Routing key: events.{contractName}.{eventName}
↓
6. Background Crawler (periodic)
- Detect gaps in block ranges
- Backfill missing events
- Self-healing mechanism
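Steps 3 and 5 of the flow above (BigInt normalization and routing-key construction) can be illustrated with a short sketch. The helper names `normalizeArgs` and `routingKey` are hypothetical and not part of the package's documented API. Only the BigInt → string rule and the `events.{contractName}.{eventName}` pattern come from this README.

```typescript
// Recursively converts bigint values to decimal strings so the result
// can be serialized as JSON and stored without precision loss.
function normalizeArgs(value: unknown): unknown {
  if (typeof value === 'bigint') return value.toString();
  if (Array.isArray(value)) return value.map(normalizeArgs);
  if (value !== null && typeof value === 'object') {
    const out: Record<string, unknown> = {};
    for (const [key, inner] of Object.entries(value as Record<string, unknown>)) {
      out[key] = normalizeArgs(inner);
    }
    return out;
  }
  return value;
}

// Builds the routing key in the documented events.{contractName}.{eventName} form.
function routingKey(contractName: string, eventName: string): string {
  return `events.${contractName}.${eventName}`;
}

const args = { from: '0x456...', value: BigInt('1000000000000000000'), ids: [BigInt(1), 2] };
console.log(JSON.stringify(normalizeArgs(args)));
console.log(routingKey('MyContract', 'Transfer'));
```

Converting to strings before persisting matters because `JSON.stringify` throws on raw `bigint` values.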
1. Duplicate Prevention
- Unique index: (txHash, logIndex)
- Check before insert to avoid duplicates
- Idempotent operations
2. Block Synchronization
- Track the last processed block in the sync_status collection
- Single source of truth for sync progress
- Enables resumable indexing
3. Gap Detection Algorithm
- Query distinct block numbers from stored events
- Compare against the expected range
- Fill gaps with queryFilter
- O(n) complexity, where n is the number of blocks in the range
4. Priority Lane Design
- In-memory queue backed by a Map<txHash, interval>
- Poll the transaction receipt every 500 ms
- Automatic timeout after 30 s
- Handle reverted transactions, including the revert reason
5. Error Handling & Resilience
- Retry logic with exponential backoff
- Graceful degradation when the publisher fails
- Connection monitoring & auto-reconnect
- Detailed error logging
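As an illustration of the retry-with-exponential-backoff item above, here is a minimal sketch. The names `backoffDelay` and `withRetry`, the 200 ms base delay, and the 10 s cap are assumptions made for this example. The default of 3 retries mirrors the `retries: 3` value in the sample config, but the package's actual retry policy may differ.

```typescript
// Delay before retry number `attempt` (0-based): baseMs * 2^attempt, capped at maxMs.
function backoffDelay(attempt: number, baseMs = 200, maxMs = 10_000): number {
  return Math.min(maxMs, baseMs * Math.pow(2, attempt));
}

// Runs `operation`, retrying up to `retries` times with growing delays.
// Rethrows the last error if every attempt fails.
async function withRetry<T>(
  operation: () => Promise<T>,
  retries = 3,
  sleep: (ms: number) => Promise<void> = (ms) =>
    new Promise<void>((resolve) => setTimeout(resolve, ms)),
): Promise<T> {
  let lastError: unknown;
  for (let attempt = 0; attempt <= retries; attempt++) {
    try {
      return await operation();
    } catch (err) {
      lastError = err;
      // No point sleeping after the final failed attempt.
      if (attempt < retries) await sleep(backoffDelay(attempt));
    }
  }
  throw lastError;
}
```

A publisher call could be wrapped as `withRetry(() => publish(event))`, where `publish` stands for whatever function sends the message. Catching the final error at the call site is one way to get the graceful degradation mentioned above.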
- serviceName: Name of your service
- contractName: Contract name
- contractAddress: Contract address (checksummed)
- chainId: Chain ID (1 = mainnet, 5 = goerli, etc.)
- network: Network name
- abi: Contract ABI
- events: Array of event names to index
- startBlock: Block number at which to start indexing
- rpcUrl: WebSocket RPC URL
- database: MongoDB configuration
- publisher: Publisher configuration
- fallbackRpcUrls: Fallback RPC URLs
- priorityLane: Enable priority lane (default: true)
- crawler: Enable crawler (default: true)
- crawlerInterval: Crawler interval in ms (default: 60000)
- polling: Polling configuration
- metrics: Metrics configuration
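To make the documented defaults concrete, the sketch below shows one way they could be applied to a partial configuration. `OptionalSettings` and `withDefaults` are hypothetical names, and the empty-array default for `fallbackRpcUrls` is an assumption. The other three defaults are the ones listed above.

```typescript
interface OptionalSettings {
  fallbackRpcUrls?: string[];
  priorityLane?: boolean;
  crawler?: boolean;
  crawlerInterval?: number; // milliseconds
}

// Fills in any omitted option with its default value.
function withDefaults(settings: OptionalSettings): Required<OptionalSettings> {
  return {
    fallbackRpcUrls: settings.fallbackRpcUrls ?? [], // assumed default, not documented
    priorityLane: settings.priorityLane ?? true,
    crawler: settings.crawler ?? true,
    crawlerInterval: settings.crawlerInterval ?? 60000,
  };
}

console.log(withDefaults({ crawler: false }));
```

Using `??` rather than `||` keeps an explicit `false` or `0` from being replaced by the default.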
{
indexer: string;
contractName: string;
contractAddress: string;
eventName: string;
eventSignature: string;
args: Record<string, any>;
blockNumber: number;
blockHash: string;
txHash: string;
logIndex: number;
timestamp: number;
source: 'priority_lane' | 'subscriber' | 'crawler';
indexedAt: Date;
processed: boolean;
}
{
_id: 'status';
contractAddress: string;
lastBlock: number;
lastTimestamp: Date;
isHealthy: boolean;
}
{
txHash: string;
context: any;
status: 'PENDING' | 'PROCESSED' | 'TIMEOUT' | 'ERROR' | 'REVERTED';
attempts: number;
receivedAt: Date;
processedAt?: Date;
blockNumber?: number;
eventsCount?: number;
error?: string;
}
The package automatically handles:
- WebSocket disconnections
- MongoDB connection issues
- RabbitMQ failures, with retry logic
- Missing blocks (gap detection)
- Reverted transactions
Uses the Pino logger with the pino-pretty transport:
# Set log level
LOG_LEVEL=debug node index.js
Levels: trace, debug, info, warn, error, fatal
# Run tests
pnpm test
# Run with coverage
pnpm test -- --coverage
Contributions welcome! Please read contributing guidelines.
MIT
1. RPC Configuration
- Use WebSocket rather than HTTP for real-time events
- Set polling.batchSize to fit your RPC provider's limits (default: 5000)
- Configure fallbackRpcUrls for high availability
2. Database Optimization
- Indexes are created automatically for optimal query performance
- Consider sharding for high-volume indexers (>1M events/day)
- Back up regularly and monitor disk space
3. Priority Lane Usage
- Use it only for truly critical events (liquidations, large transfers)
- Do not overuse it, to avoid overloading the system
- Typical use: <1% of total transactions
4. Crawler Configuration
- Adjust crawlerInterval based on chain activity
- Fast chains (Polygon, BSC): 30s
- Slow chains (Ethereum): 60-120s
- Disable if using a reliable RPC with guaranteed delivery
5. Message Queue
- Use type: 'none' if you only need the database
- RabbitMQ for distributed systems
- Webhook for simple integrations
6. Monitoring
// Enable metrics
metrics: { enabled: true, port: 9090 }
7. Health Checks
# Kubernetes liveness probe
curl http://localhost:3000/health
8. Resource Requirements
- CPU: 1-2 cores
- RAM: 512MB-2GB (depends on event volume)
- Disk: SSD recommended for MongoDB
- Network: Stable connection to RPC endpoint
9. Error Handling
// Graceful shutdown
process.on('SIGTERM', async () => {
  await indexer.stop();
});
- RPC Endpoint: Use authenticated endpoints and do not expose API keys publicly
- MongoDB: Enable authentication and do not use default credentials
- API Server: Add authentication middleware if the API is publicly exposed
- Environment Variables: Use a .env file for sensitive configuration
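One way to keep secrets out of `config.yaml` is to overlay environment variables onto the loaded configuration before constructing the indexer. The sketch below is an assumption about how this could be wired up. `applyEnvSecrets`, `ConnectionConfig`, and the variable names `RPC_URL` and `MONGODB_URI` are hypothetical and not defined by the package.

```typescript
interface ConnectionConfig {
  rpcUrl: string;
  database: { uri: string; dbName: string };
}

type Env = Record<string, string | undefined>;

// Returns a copy of `config` in which connection strings are replaced by
// environment values when present. RPC_URL and MONGODB_URI are example names.
function applyEnvSecrets(config: ConnectionConfig, env: Env): ConnectionConfig {
  return {
    ...config,
    rpcUrl: env['RPC_URL'] ?? config.rpcUrl,
    database: {
      ...config.database,
      uri: env['MONGODB_URI'] ?? config.database.uri,
    },
  };
}

const fromFile: ConnectionConfig = {
  rpcUrl: 'wss://mainnet.infura.io/ws/v3/YOUR_KEY',
  database: { uri: 'mongodb://localhost:27017', dbName: 'my_indexer' },
};
const resolved = applyEnvSecrets(fromFile, { MONGODB_URI: 'mongodb://db.internal:27017' });
console.log(resolved.database.uri);
```

In a Node.js process the second argument would typically be `process.env`, populated from a `.env` file by whichever loader the deployment uses.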
// Extend EventHandler for custom logic
class CustomEventHandler extends EventHandler {
protected async processEvent(...) {
// Custom logic
await super.processEvent(...);
}
}
// Run multiple indexers in the same process
const exchangeIndexer = new GenericIndexer(exchangeConfig);
const nftIndexer = new GenericIndexer(nftConfig);
await Promise.all([
exchangeIndexer.start(),
nftIndexer.start()
]);
For support, email support@your-domain.com or join our Discord.