diff --git a/offchain-modules/packages/app-monitor/src/balanceProvider.ts b/offchain-modules/packages/app-monitor/src/balanceProvider.ts index 02c54e3f..d5a7a0d6 100644 --- a/offchain-modules/packages/app-monitor/src/balanceProvider.ts +++ b/offchain-modules/packages/app-monitor/src/balanceProvider.ts @@ -1,11 +1,11 @@ -import CKB from '@nervosnetwork/ckb-sdk-core'; -import { CkbIndexer, Order, ScriptType, SearchKey } from '@force-bridge/x/dist/ckb/tx-helper/indexer'; import { utils } from '@ckb-lumos/base'; +import { EthAsset } from '@force-bridge/x/dist/ckb/model/asset'; +import { CkbIndexer, Order, ScriptType, SearchKey } from '@force-bridge/x/dist/ckb/tx-helper/indexer'; +import { getOwnerTypeHash } from '@force-bridge/x/dist/ckb/tx-helper/multisig/multisig_helper'; +import { logger } from '@force-bridge/x/dist/utils/logger'; +import CKB from '@nervosnetwork/ckb-sdk-core'; import Web3 from 'web3'; import { AbiItem } from 'web3-utils'; -import { logger } from '@force-bridge/x/dist/utils/logger'; -import { getOwnerTypeHash } from '@force-bridge/x/dist/ckb/tx-helper/multisig/multisig_helper'; -import { EthAsset } from '@force-bridge/x/dist/ckb/model/asset'; const minERC20ABI = [ // balanceOf diff --git a/offchain-modules/packages/app-monitor/src/duration.ts b/offchain-modules/packages/app-monitor/src/duration.ts index 13dbe3c0..f053803e 100644 --- a/offchain-modules/packages/app-monitor/src/duration.ts +++ b/offchain-modules/packages/app-monitor/src/duration.ts @@ -25,6 +25,8 @@ export interface EthConfig { export interface CkbConfig { lastHandledBlock: number; + sudtBillReconcBlock: number; + sudtBillLastReconcBlock: number; matchCount: { burn: number; mint: number; @@ -55,6 +57,8 @@ export function NewDurationCfg(): Duration { }, ckb: { lastHandledBlock: ForceBridgeCore.config.ckb.startBlockHeight, + sudtBillLastReconcBlock: 0, + sudtBillReconcBlock: 0, matchCount: { mint: 0, burn: 0, diff --git a/offchain-modules/packages/app-monitor/src/monitor.ts b/offchain-modules/packages/app-monitor/src/monitor.ts index 315f77dc..70f692e0 100644 --- a/offchain-modules/packages/app-monitor/src/monitor.ts +++ b/offchain-modules/packages/app-monitor/src/monitor.ts @@ -1,12 +1,14 @@ import { parseAddress } from '@ckb-lumos/helpers'; -import { CkbBurnRecord, CkbMintRecord, EthLockRecord, EthUnlockRecord } from '@force-bridge/reconc/dist'; +import { CkbBurnRecord, CkbMintRecord, EthLockRecord, EthUnlockRecord, SudtRecord } from '@force-bridge/reconc/dist'; import { nonNullable } from '@force-bridge/x'; import { IndexerCollector } from '@force-bridge/x/dist/ckb/tx-helper/collector'; import { getOwnerTypeHash } from '@force-bridge/x/dist/ckb/tx-helper/multisig/multisig_helper'; import { verifierEndpoint, feeAccounts } from '@force-bridge/x/dist/config'; import { bootstrap, ForceBridgeCore } from '@force-bridge/x/dist/core'; +import { KVDb } from '@force-bridge/x/dist/db/kv'; +import { SudtDb } from '@force-bridge/x/dist/db/sudt'; import { CKBRecordObservable } from '@force-bridge/x/dist/reconc'; -import { asyncSleep, foreverPromise } from '@force-bridge/x/dist/utils'; +import { asyncSleep, foreverPromise, getDBConnection } from '@force-bridge/x/dist/utils'; import { logger } from '@force-bridge/x/dist/utils/logger'; import { createCKBRecordObservable, @@ -14,10 +16,10 @@ import { EthRecordObservable, } from '@force-bridge/xchain-eth/dist/reconc'; import axios from 'axios'; +import dayjs from 'dayjs'; import { ethers } from 'ethers'; import { assetListPriceChange } from './assetPrice'; import { BalanceProvider } from './balanceProvider'; -import dayjs from 'dayjs'; import { WebHook } from './discord'; import { Duration, EventItem, monitorEvent, NewDurationCfg, readMonitorConfig, writeMonitorConfig } from './duration'; @@ -105,6 +107,8 @@ export class Monitor { private durationConfig: Duration; private gasPriceRecorder: GasPriceRecorder; private balanceProvider: BalanceProvider; + private sudtDb: SudtDb; + private kvDb: KVDb; webHookInfoUrl: string; webHookErrorUrl: string; @@ -143,6 +147,22 @@ export class Monitor { this.durationConfig.ckb.pending.mints.set(mint.fromTxId!, newEvent(mint)); } + async onSudtRecord(record: SudtRecord): Promise { + logger.info(`Receive sudt:${JSON.stringify(record)}`); + try { + await this.sudtDb.createSudtTransferRecord( + record.txId, + record.direction, + record.lock, + record.token, + record.amount, + record.index, + ); + } catch (e) { + logger.error(e.stack); + } + } + async onCkbBurnRecord(burn: CkbBurnRecord): Promise { logger.info(`Receive ckbBurn:${JSON.stringify(burn)}`); this.durationConfig.ckb.pending.burns.set(burn.txId, newEvent(burn)); @@ -212,6 +232,7 @@ export class Monitor { this.observeCkbEvent().catch((err) => { logger.error(`Monitor observeCkbEvent error:${err.stack}`); }); + void this.reconcSudtBill(); // this.observeAssetPrice().catch((err) => { // logger.error(`Monitor observeAssetPrice error:${err.stack}`); // }); @@ -230,7 +251,38 @@ export class Monitor { } } + async reconcSudtBill(): Promise { + let fromBlock = this.durationConfig.ckb.sudtBillLastReconcBlock; + let toBlock = fromBlock + 100; + while (fromBlock < this.durationConfig.ckb.sudtBillReconcBlock) { + toBlock > this.durationConfig.ckb.sudtBillReconcBlock ? this.durationConfig.ckb.sudtBillReconcBlock : toBlock; + try { + logger.info(`Monitor reconcSudtBill fromBlock: ${fromBlock} toBlock: ${toBlock}`); + await this.ckbRecordObservable + .observeSudtRecord({ fromBlock: `0x${fromBlock.toString(16)}`, toBlock: `0x${toBlock.toString(16)}` }) + .subscribe((records) => { + records.forEach((record) => { + void this.onSudtRecord(record); + }); + }); + } catch (e) { + logger.error(e.stack); + } + try { + await this.kvDb.set('sudt_bill_last_handle_block', toBlock.toString()); + } catch (e) { + logger.error(e.stack); + } + fromBlock = toBlock + 1; + toBlock = fromBlock + 100; + await asyncSleep(10); + } + } + async init(): Promise { + const conn = await getDBConnection(); + this.sudtDb = new SudtDb(conn); + this.kvDb = new KVDb(conn); let durationConfig = readMonitorConfig(); if (!durationConfig) { durationConfig = NewDurationCfg(); @@ -246,6 +298,19 @@ export class Monitor { if (ForceBridgeCore.config.monitor!.expiredCheckInterval > 0) { expiredCheckInterval = ForceBridgeCore.config.monitor!.expiredCheckInterval; } + + this.durationConfig.ckb.sudtBillReconcBlock = parseInt( + (await this.kvDb.get('sudt_bill_reconc_handle_block')) ?? '0', + ); + if (this.durationConfig.ckb.sudtBillReconcBlock == 0) { + await this.kvDb.set('sudt_bill_reconc_handle_block', this.durationConfig.ckb.lastHandledBlock.toString()); + this.durationConfig.ckb.sudtBillReconcBlock = this.durationConfig.ckb.lastHandledBlock; + } + + this.durationConfig.ckb.sudtBillLastReconcBlock = parseInt( + (await this.kvDb.get('sudt_bill_last_handle_block')) ?? '0', + ); + console.log(this.durationConfig); } checkExpiredEvent(): void { @@ -514,6 +579,12 @@ export class Monitor { }) .subscribe((record) => this.onCkbBurnRecord(record)); + await this.ckbRecordObservable.observeSudtRecord({ fromBlock, toBlock }).subscribe((records) => { + records.forEach((record) => { + void this.onSudtRecord(record); + }); + }); + this.durationConfig.ckb.lastHandledBlock = toBlockNum; }, { diff --git a/offchain-modules/packages/reconc/src/index.ts b/offchain-modules/packages/reconc/src/index.ts index de205d14..9ef44e0e 100644 --- a/offchain-modules/packages/reconc/src/index.ts +++ b/offchain-modules/packages/reconc/src/index.ts @@ -58,6 +58,13 @@ export type CkbMintRecord = ToRecord & { blockHash?: string; }; +export type SudtRecord = FromRecord & { + token: string; + lock: string; + direction: 'in' | 'out'; + index: number; +}; + export class Reconciliation { constructor(public from: FromRecord[], public to: ToRecord[]) {} diff --git a/offchain-modules/packages/scripts/src/testnetDocker.ts b/offchain-modules/packages/scripts/src/testnetDocker.ts index 0ad04cdc..540ef7f1 100644 --- a/offchain-modules/packages/scripts/src/testnetDocker.ts +++ b/offchain-modules/packages/scripts/src/testnetDocker.ts @@ -1,6 +1,7 @@ import fs from 'fs'; import path from 'path'; import { KeyStore } from '@force-bridge/keystore/dist'; +import { nonNullable } from '@force-bridge/x'; import { OwnerCellConfig } from '@force-bridge/x/dist/ckb/tx-helper/deploy'; import { Config, WhiteListEthAsset, CkbDeps } from '@force-bridge/x/dist/config'; import { getFromEnv, privateKeyToCkbPubkeyHash, writeJsonToFile } from '@force-bridge/x/dist/utils'; @@ -8,9 +9,8 @@ import { logger, initLog } from '@force-bridge/x/dist/utils/logger'; import * as dotenv from 'dotenv'; import * as lodash from 'lodash'; import * as Mustache from 'mustache'; -import { execShellCmd, PATH_PROJECT_ROOT, pathFromProjectRoot } from './utils'; +import { execShellCmd, pathFromProjectRoot } from './utils'; import { deployDev } from './utils/deploy'; -import { nonNullable } from '@force-bridge/x'; dotenv.config({ path: process.env.DOTENV_PATH || '.env' }); @@ -99,12 +99,12 @@ async function generateConfig( writeJsonToFile({ forceBridge: watcherConfig }, path.join(configPath, 'watcher/force_bridge.json')); //monitor const monitorConfig: Config = lodash.cloneDeep(baseConfig); - monitorConfig.common.orm = undefined; monitorConfig.common.port = undefined; monitorConfig.common.openMetric = false; monitorConfig.common.role = 'watcher'; monitorConfig.common.log.identity = 'monitor'; monitorConfig.common.log.logFile = path.join(configPath, 'monitor/force_bridge.log'); + monitorConfig.common.orm!.host = 'monitor_db'; monitorConfig.monitor = { discordWebHook: monitorDiscordWebHook, expiredTime: 1800000, //30 minutes @@ -236,6 +236,13 @@ services: depends_on: - {{name}}_db {{/verifiers}} + monitor_db: + image: mysql:5.7 + environment: + MYSQL_ROOT_PASSWORD: root + MYSQL_DATABASE: forcebridge + ports: + - 3064:3306 monitor: image: node:14.18.1-bullseye restart: on-failure @@ -250,6 +257,8 @@ services: cd /app/offchain-modules; npx ts-node ./packages/app-cli/src/index.ts monitor -cfg /data/force_bridge.json ' + depends_on: + - monitor_db volumes: force-bridge-node-modules: external: true diff --git a/offchain-modules/packages/x/src/db/entity/sudt.ts b/offchain-modules/packages/x/src/db/entity/sudt.ts new file mode 100644 index 00000000..a49f6dc2 --- /dev/null +++ b/offchain-modules/packages/x/src/db/entity/sudt.ts @@ -0,0 +1,32 @@ +import { Column, CreateDateColumn, Entity, Index, PrimaryGeneratedColumn, UpdateDateColumn } from 'typeorm'; + +@Entity() +@Index(['txHash', 'index', 'direction'], { unique: true }) +export class Sudt { + @PrimaryGeneratedColumn('increment') + id: number; + + @Column() + txHash: string; + + @Column() + direction: number; + + @Column({ length: 10240 }) + address: string; + + @Column() + sudtArgs: string; + + @Column() + index: number; + + @Column() + amount: string; + + @CreateDateColumn() + createdAt: string; + + @UpdateDateColumn() + updatedAt: string; +} diff --git a/offchain-modules/packages/x/src/db/index.ts b/offchain-modules/packages/x/src/db/index.ts index 79e6b399..4db12eb9 100644 --- a/offchain-modules/packages/x/src/db/index.ts +++ b/offchain-modules/packages/x/src/db/index.ts @@ -5,3 +5,4 @@ export * from './tron'; export * from './btc'; export * from './eos'; export * from './kv'; +export * from './sudt'; diff --git a/offchain-modules/packages/x/src/db/sudt.ts b/offchain-modules/packages/x/src/db/sudt.ts new file mode 100644 index 00000000..b8bbfab8 --- /dev/null +++ b/offchain-modules/packages/x/src/db/sudt.ts @@ -0,0 +1,29 @@ +import { Connection, Repository } from 'typeorm'; +import { Sudt } from './entity/sudt'; + +export class SudtDb { + private sudtRepository: Repository; + + constructor(private connection: Connection) { + this.sudtRepository = connection.getRepository(Sudt); + } + + async createSudtTransferRecord( + hash: string, + direction: 'in' | 'out', + address: string, + sudtArgs: string, + amount: string, + index: number, + ): Promise { + const record = this.sudtRepository.create({ + txHash: hash, + direction: direction == 'in' ? 1 : -1, + address, + sudtArgs, + amount, + index, + }); + await this.sudtRepository.save(record); + } +} diff --git a/offchain-modules/packages/x/src/reconc/CKBRecordObservable.ts b/offchain-modules/packages/x/src/reconc/CKBRecordObservable.ts index bd8091b1..f106cb61 100644 --- a/offchain-modules/packages/x/src/reconc/CKBRecordObservable.ts +++ b/offchain-modules/packages/x/src/reconc/CKBRecordObservable.ts @@ -1,23 +1,23 @@ -import { core } from '@ckb-lumos/base'; +import { core, utils } from '@ckb-lumos/base'; import { CKBIndexerClient, SearchKey, SearchKeyFilter } from '@force-bridge/ckb-indexer-client'; import type * as Indexer from '@force-bridge/ckb-indexer-client'; -import { CkbBurnRecord, CkbMintRecord } from '@force-bridge/reconc'; +import { CkbBurnRecord, CkbMintRecord, SudtRecord } from '@force-bridge/reconc'; import { Amount } from '@lay2/pw-core'; import { default as RPC } from '@nervosnetwork/ckb-sdk-rpc'; import { Observable, from } from 'rxjs'; -import { map, expand, takeWhile, filter as rxFilter, mergeMap, distinct } from 'rxjs/operators'; +import { map, expand, takeWhile, filter as rxFilter, mergeMap, distinct, retry } from 'rxjs/operators'; import { Asset } from '../ckb/model/asset'; import { ScriptLike } from '../ckb/model/script'; import { RecipientCellData } from '../ckb/tx-helper/generated/eth_recipient_cell'; import { ForceBridgeLockscriptArgs } from '../ckb/tx-helper/generated/force_bridge_lockscript'; import { MintWitness } from '../ckb/tx-helper/generated/mint_witness'; +import { ForceBridgeCore } from '../core'; import { fromHexString, toHexString, uint8ArrayToString } from '../utils'; export interface CKBRecordObservableProvider { ownerCellTypeHash: string; recipientType: ScriptLike; bridgeLock: ScriptLike; - indexer: CKBIndexerClient; rpc: RPC; /** @@ -26,19 +26,20 @@ export interface CKBRecordObservableProvider { scriptToAddress: (script: ScriptLike) => string; } -export interface CKBMintFilter { +export interface Filter { + fromBlock?: string; // hex string + toBlock?: string; // hex string +} + +export type CKBMintFilter = Filter & { // lock?: ScriptLike; - fromBlock?: string; - toBlock?: string; asset?: Asset; -} +}; -export interface CKBBurnFilter { - fromBlock?: string; // hex string - toBlock?: string; // hex string +export type CKBBurnFilter = Filter & { sender?: ScriptLike; filterRecipientData: (data: RecipientCellData) => boolean; -} +}; function isTypeIDCorrect(args: string, expectOwnerTypeHash: string): boolean { const bridgeLockArgs = new ForceBridgeLockscriptArgs(fromHexString(args).buffer); @@ -49,6 +50,87 @@ function isTypeIDCorrect(args: string, expectOwnerTypeHash: string): boolean { export class CKBRecordObservable { constructor(private provider: CKBRecordObservableProvider) {} + observeSudtRecord(filter: Filter): Observable { + const { rpc, indexer: indexer } = this.provider; + + const searchKey: SearchKey = { + filter: { block_range: [filter.fromBlock ?? '0x0', filter.toBlock ?? '0xffffffffffffffff'] }, + script: { + code_hash: ForceBridgeCore.config.ckb.deps.sudtType.script.codeHash, + hash_type: ForceBridgeCore.config.ckb.deps.sudtType.script.hashType, + args: '0x', + }, + script_type: 'type', + }; + + const observable = from( + indexer.get_transactions({ + searchKey, + }), + ).pipe( + retry(2), + takeWhile((res) => res.objects.length > 0), + mergeMap((res) => res.objects), + mergeMap(async (getTxResult) => { + try { + const records: SudtRecord[] = []; + const tx = await rpc.getTransaction(getTxResult.tx_hash); + const inputs = tx.transaction.inputs; + for (let i = 0; i < inputs.length; i++) { + const input = inputs[i]; + if (input.previousOutput) { + const tx = await rpc.getTransaction(input.previousOutput.txHash); + const output = tx.transaction.outputs[parseInt(input.previousOutput.index)]; + if ( + output.type && + output.type.codeHash == searchKey.script.code_hash && + output.type.hashType == searchKey.script.hash_type + ) { + records.push({ + index: i, + txId: getTxResult.tx_hash, + amount: tx.transaction.outputsData[parseInt(input.previousOutput.index)], + lock: this.provider.scriptToAddress(ScriptLike.from(output.lock)), + direction: 'out', + token: utils.computeScriptHash({ + hash_type: output.type.hashType, + code_hash: output.type.codeHash, + args: output.type.args, + }), + }); + } + } + } + + tx.transaction.outputs.forEach((v, k) => { + if ( + v.type && + v.type.codeHash == searchKey.script.code_hash && + v.type.hashType == searchKey.script.hash_type + ) { + records.push({ + index: k, + txId: tx.transaction.hash, + amount: tx.transaction.outputsData[k], + lock: this.provider.scriptToAddress(ScriptLike.from(v.lock)), + direction: 'in', + token: utils.computeScriptHash({ + hash_type: v.type.hashType, + code_hash: v.type.codeHash, + args: v.type.args, + }), + }); + } + }); + return records; + } catch (e) { + return []; + } + }), + ); + return observable; + } + observeMintRecord(filter: CKBMintFilter): Observable { const { rpc, indexer: indexer, ownerCellTypeHash, bridgeLock } = this.provider; const blockRange: SearchKeyFilter['block_range'] = [ diff --git a/offchain-modules/packages/x/src/utils/index.ts b/offchain-modules/packages/x/src/utils/index.ts index 8e360e76..ee303a3d 100644 --- a/offchain-modules/packages/x/src/utils/index.ts +++ b/offchain-modules/packages/x/src/utils/index.ts @@ -22,6 +22,7 @@ import { TronLock } from '../db/entity/TronLock'; import { TronUnlock } from '../db/entity/TronUnlock'; import { WithdrawedBridgeFee } from '../db/entity/WithdrawedBridgeFee'; import { KV } from '../db/entity/kv'; +import { Sudt } from '../db/entity/sudt'; import { nonNullable } from '../errors'; export { asyncSleep, retryPromise, foreverPromise } from './promise'; @@ -160,6 +161,7 @@ export async function getDBConnection(): Promise { TronLock, TronUnlock, WithdrawedBridgeFee, + Sudt, ], namingStrategy: new SnakeNamingStrategy(), });