Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Fix light client sync miss some tx. #2992

Merged
merged 2 commits into from
Jan 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion packages/neuron-ui/src/components/MultisigAddress/hooks.ts
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,7 @@ export const useSubscription = ({
const tmp: Record<string, number> = {}
res.result.forEach(v => {
if (hashToPayload[v.hash]) {
tmp[hashToPayload[v.hash]] = v.blockStartNumber
tmp[hashToPayload[v.hash]] = v.localSavedBlockNumber
}
})
setMultisigSyncProgress(tmp)
Expand Down
2 changes: 1 addition & 1 deletion packages/neuron-ui/src/services/remote/multisig.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,6 @@ export const generateMultisigSendAllTx = remoteApi<{
multisigConfig: MultisigConfig
}>('generate-multisig-send-all-tx')
export const loadMultisigTxJson = remoteApi<string, OfflineSignJSON>('load-multisig-tx-json')
export const getMultisigSyncProgress = remoteApi<string[], { hash: string; blockStartNumber: number }[]>(
export const getMultisigSyncProgress = remoteApi<string[], { hash: string; localSavedBlockNumber: number }[]>(
'get-sync-progress-by-addresses'
)
2 changes: 1 addition & 1 deletion packages/neuron-wallet/src/block-sync-renderer/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import DataUpdateSubject from '../models/subjects/data-update'
import AddressCreatedSubject from '../models/subjects/address-created-subject'
import WalletDeletedSubject from '../models/subjects/wallet-deleted-subject'
import TxDbChangedSubject from '../models/subjects/tx-db-changed-subject'
import { LumosCellQuery, LumosCell } from './sync/connector'
import { LumosCellQuery, LumosCell } from './sync/synchronizer'
import { WorkerMessage, StartParams, QueryIndexerParams } from './task'
import logger from '../utils/logger'
import CommonUtils from '../utils/common'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@ import logger from '../../utils/logger'
import CommonUtils from '../../utils/common'
import RpcService from '../../services/rpc-service'
import { Address } from '../../models/address'
import { Connector } from './connector'
import { Synchronizer } from './synchronizer'
import { NetworkType } from '../../models/network'
import IndexerCacheService from './indexer-cache-service'

export default class IndexerConnector extends Connector {
export default class FullSynchronizer extends Synchronizer {
private rpcService: RpcService

constructor(addresses: Address[], nodeUrl: string, indexerUrl: string, nodeType: NetworkType) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import { Address } from '../../models/address'
import AddressMeta from '../../database/address/meta'
import { scheduler } from 'timers/promises'
import SyncProgressService from '../../services/sync-progress'
import { Connector, AppendScript } from './connector'
import { Synchronizer, AppendScript } from './synchronizer'
import { computeScriptHash as scriptToHash } from '@ckb-lumos/base/lib/utils'
import { FetchTransactionReturnType, LightRPC, LightScriptFilter } from '../../utils/ckb-rpc'
import Multisig from '../../services/multisig'
Expand All @@ -24,7 +24,7 @@ import NetworksService from '../../services/networks'

const unpackGroup = molecule.vector(blockchain.OutPoint)

export default class LightConnector extends Connector {
export default class LightSynchronizer extends Synchronizer {
private lightRpc: LightRPC
private addressMetas: AddressMeta[]

Expand Down Expand Up @@ -87,7 +87,7 @@ export default class LightConnector extends Connector {
private async synchronize() {
const syncScripts = await this.upsertTxHashes()
await this.updateSyncedBlockOfScripts(syncScripts)
const minSyncBlockNumber = await SyncProgressService.getCurrentWalletMinBlockNumber()
const minSyncBlockNumber = await SyncProgressService.getCurrentWalletMinSyncedBlockNumber()
const hasNextBlock = await this.notifyAndSyncNext(minSyncBlockNumber)
if (!hasNextBlock) {
await this.updateBlockStartNumber(minSyncBlockNumber)
Expand Down Expand Up @@ -138,7 +138,7 @@ export default class LightConnector extends Connector {
const txs = await this.getTransactions({
script: syncScript.script,
scriptType: syncScript.scriptType,
blockRange: [BI.from(syncStatus.blockEndNumber).toHexString(), syncScript.blockNumber],
blockRange: [BI.from(syncStatus.syncedBlockNumber).toHexString(), syncScript.blockNumber],
})
insertTxCaches.push(
...txs.map(v => ({
Expand All @@ -164,7 +164,7 @@ export default class LightConnector extends Connector {
syncScripts.forEach(v => {
const currentSyncProgress = syncStatusMap.get(scriptToHash(v.script))
if (currentSyncProgress) {
currentSyncProgress.blockEndNumber = parseInt(v.blockNumber)
currentSyncProgress.syncedBlockNumber = parseInt(v.blockNumber)
updatedSyncProgress.push(currentSyncProgress)
}
})
Expand Down Expand Up @@ -196,35 +196,42 @@ export default class LightConnector extends Connector {
}))
})
.flat()
const walletMinBlockNumber = await SyncProgressService.getWalletMinBlockNumber()
const walletMinBlockNumber = await SyncProgressService.getWalletMinLocalSavedBlockNumber()
const wallets = await WalletService.getInstance().getAll()
const walletStartBlockMap = wallets.reduce<Record<string, string | undefined>>(
(pre, cur) => ({ ...pre, [cur.id]: cur.startBlockNumber }),
{}
)
const otherTypeSyncBlockNumber = await SyncProgressService.getOtherTypeSyncBlockNumber()
const setScriptsParams = [
...allScripts.map(v => {
const blockNumber = Math.max(
parseInt(walletStartBlockMap[v.walletId] ?? '0x0'),
walletMinBlockNumber?.[v.walletId] ?? 0,
parseInt(existSyncscripts[scriptToHash(v.script)]?.blockNumber ?? '0x0')
)
return {
const addScripts = [
...allScripts
.filter(v => !existSyncscripts[scriptToHash(v.script)])
.map(v => {
const blockNumber = Math.max(
homura marked this conversation as resolved.
Show resolved Hide resolved
parseInt(walletStartBlockMap[v.walletId] ?? '0x0'),
walletMinBlockNumber?.[v.walletId] ?? 0
)
return {
...v,
blockNumber: `0x${blockNumber.toString(16)}`,
}
}),
...appendScripts
.filter(v => !existSyncscripts[scriptToHash(v.script)])
.map(v => ({
...v,
blockNumber: `0x${blockNumber.toString(16)}`,
}
}),
...appendScripts.map(v => ({
...v,
blockNumber:
existSyncscripts[scriptToHash(v.script)]?.blockNumber ??
`0x${(otherTypeSyncBlockNumber[scriptToHash(v.script)] ?? 0).toString(16)}`,
})),
blockNumber: `0x${(otherTypeSyncBlockNumber[scriptToHash(v.script)] ?? 0).toString(16)}`,
})),
]
await this.lightRpc.setScripts(setScriptsParams)
await this.lightRpc.setScripts(addScripts, 'partial')
const allScriptHashes = new Set([
...allScripts.map(v => scriptToHash(v.script)),
...appendScripts.map(v => scriptToHash(v.script)),
])
const deleteScript = syncScripts.filter(v => !allScriptHashes.has(scriptToHash(v.script)))
await this.lightRpc.setScripts(deleteScript, 'delete')
const walletIds = [...new Set(this.addressMetas.map(v => v.walletId))]
await SyncProgressService.resetSyncProgress(setScriptsParams)
await SyncProgressService.resetSyncProgress(addScripts)
await SyncProgressService.updateSyncProgressFlag(walletIds)
await SyncProgressService.removeByHashesAndAddressType(
SyncAddressType.Multisig,
Expand Down Expand Up @@ -281,9 +288,25 @@ export default class LightConnector extends Connector {
this.initSyncProgress(scripts)
}

private async checkTxExist(txHashes: string[]) {
const transactions = await this.lightRpc
.createBatchRequest<'getTransaction', string[], TransactionWithStatus[]>(txHashes.map(v => ['getTransaction', v]))
.exec()
return transactions.every(v => !!v.transaction)
}

async processTxsInNextBlockNumber() {
const [nextBlockNumber, txHashesInNextBlock] = await this.getTxHashesWithNextUnprocessedBlockNumber()
if (nextBlockNumber !== undefined && txHashesInNextBlock.length) {
const minSyncBlockNumber = await SyncProgressService.getCurrentWalletMinSyncedBlockNumber()
if (
nextBlockNumber !== undefined &&
txHashesInNextBlock.length &&
// For light client, if tx hash has been called with fetch_transaction, the tx can not return by get_transactions
// So before derived address synced to bigger than next synced block number, do not sync the next block number
minSyncBlockNumber >= parseInt(nextBlockNumber) &&
// check whether the tx is sync from light client, after split the light client and full DB file, this check will remove
(await this.checkTxExist(txHashesInNextBlock))
) {
this.processingBlockNumber = nextBlockNumber
await this.fetchPreviousOutputs(txHashesInNextBlock)
this.transactionsSubject.next({ txHashes: txHashesInNextBlock, params: this.processingBlockNumber })
Expand All @@ -296,7 +319,7 @@ export default class LightConnector extends Connector {
} else {
return
}
const minCachedBlockNumber = await SyncProgressService.getCurrentWalletMinBlockNumber()
const minCachedBlockNumber = await SyncProgressService.getCurrentWalletMinSyncedBlockNumber()
await this.updateBlockStartNumber(Math.min(parseInt(blockNumber), minCachedBlockNumber))
this.processNextBlockNumber()
}
Expand Down
14 changes: 7 additions & 7 deletions packages/neuron-wallet/src/block-sync-renderer/sync/queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,13 @@ import AddressParser from '../../models/address-parser'
import Multisig from '../../models/multisig'
import BlockHeader from '../../models/chain/block-header'
import TxAddressFinder from './tx-address-finder'
import IndexerConnector from './indexer-connector'
import FullSynchronizer from './full-synchronizer'
import IndexerCacheService from './indexer-cache-service'
import logger from '../../utils/logger'
import CommonUtils from '../../utils/common'
import { ShouldInChildProcess } from '../../exceptions'
import { AppendScript, BlockTips, Connector } from './connector'
import LightConnector from './light-connector'
import { AppendScript, BlockTips, Synchronizer } from './synchronizer'
import LightSynchronizer from './light-synchronizer'
import { generateRPC } from '../../utils/ckb-rpc'
import { BUNDLED_LIGHT_CKB_URL } from '../../utils/const'
import { NetworkType } from '../../models/network'
Expand All @@ -30,7 +30,7 @@ export default class Queue {
#indexerUrl: string
#addresses: AddressInterface[]
#rpcService: RpcService
#indexerConnector: Connector | undefined
#indexerConnector: Synchronizer | undefined
#checkAndSaveQueue: QueueObject<{ txHashes: CKBComponents.Hash[]; params: unknown }> | undefined
#lockArgsSet: Set<string> = new Set()

Expand Down Expand Up @@ -67,9 +67,9 @@ export default class Queue {
logger.info('Queue:\tstart')
try {
if (this.#url === BUNDLED_LIGHT_CKB_URL) {
this.#indexerConnector = new LightConnector(this.#addresses, this.#url)
this.#indexerConnector = new LightSynchronizer(this.#addresses, this.#url)
} else {
this.#indexerConnector = new IndexerConnector(this.#addresses, this.#url, this.#indexerUrl, this.#nodeType)
this.#indexerConnector = new FullSynchronizer(this.#addresses, this.#url, this.#indexerUrl, this.#nodeType)
}
await this.#indexerConnector!.connect()
} catch (error) {
Expand Down Expand Up @@ -110,7 +110,7 @@ export default class Queue {
})
}

getIndexerConnector = (): Connector => this.#indexerConnector!
getIndexerConnector = (): Synchronizer => this.#indexerConnector!

stop = () => this.#indexerConnector!.stop()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ export interface AppendScript {
scriptType: CKBRPC.ScriptType
}

export abstract class Connector {
export abstract class Synchronizer {
public readonly blockTipsSubject: Subject<BlockTips> = new Subject<BlockTips>()
public readonly transactionsSubject = new Subject<{ txHashes: CKBComponents.Hash[]; params: string }>()
protected indexer: CkbIndexer
Expand Down
2 changes: 1 addition & 1 deletion packages/neuron-wallet/src/block-sync-renderer/task.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import type { LumosCellQuery } from './sync/connector'
import type { LumosCellQuery } from './sync/synchronizer'
import initConnection from '../database/chain/ormconfig'
import { register as registerTxStatusListener } from './tx-status-listener'
import SyncQueue from './sync/queue'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,13 @@ export default class SyncProgress {
walletId!: string

@Column()
blockStartNumber: number = 0
lightStartBlockNumber: number = 0

@Column()
blockEndNumber: number = 0
localSavedBlockNumber: number = 0

@Column()
syncedBlockNumber: number = 0

@Column({ type: 'varchar' })
cursor?: HexString
Expand All @@ -58,8 +61,9 @@ export default class SyncProgress {
res.scriptType = obj.scriptType
res.delete = false
res.addressType = obj.addressType ?? SyncAddressType.Default
res.blockStartNumber = parseInt(obj.blockNumber)
res.blockEndNumber = parseInt(obj.blockNumber)
res.lightStartBlockNumber = parseInt(obj.blockNumber)
res.localSavedBlockNumber = parseInt(obj.blockNumber)
res.syncedBlockNumber = parseInt(obj.blockNumber)
return res
}
}
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
import {MigrationInterface, QueryRunner} from "typeorm";
Keith-CY marked this conversation as resolved.
Show resolved Hide resolved
import SyncProgress from "../entities/sync-progress";

const chunk = 100
export class ResetSyncProgressPrimaryKey1690361215400 implements MigrationInterface {
name = 'ResetSyncProgressPrimaryKey1690361215400'

public async up(queryRunner: QueryRunner): Promise<void> {
const syncProgresses = await queryRunner.manager.find(SyncProgress)
const syncProgresses = await queryRunner.manager.query('select * from sync_progress')
await queryRunner.query(`DROP TABLE "sync_progress"`)
await queryRunner.query(`CREATE TABLE "sync_progress" ("hash" varchar NOT NULL, "args" varchar NOT NULL, "codeHash" varchar NOT NULL, "hashType" varchar NOT NULL, "scriptType" varchar NOT NULL, "walletId" varchar NOT NULL, "blockStartNumber" integer NOT NULL, "blockEndNumber" integer, "cursor" varchar, "delete" boolean, "addressType" integer, PRIMARY KEY ("hash", "walletId"))`)
for (let index = 0; index < syncProgresses.length; index += 500) {
await queryRunner.manager.save(syncProgresses.slice(index, index + 500))
for (let index = 0; index < syncProgresses.length; index += chunk) {
homura marked this conversation as resolved.
Show resolved Hide resolved
await queryRunner.manager.query(`INSERT INTO sync_progress VALUES ${syncProgresses.slice(index, index + chunk).reduce((pre: string, cur: any) => `${pre ? `${pre},` : ''}("${cur.hash}","${cur.args}","${cur.codeHash}","${cur.hashType}","${cur.scriptType}","${cur.walletId}",${cur.blockStartNumber},${cur.blockEndNumber},${cur.cursor ? `"${cur.cursor}"` : 'NULL'},${cur.delete},${cur.addressType})`, '')};`)
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
import {MigrationInterface, QueryRunner, TableColumn} from "typeorm";

export class RenameSyncProgress1702781527414 implements MigrationInterface {
name = 'RenameSyncProgress1702781527414'

public async up(queryRunner: QueryRunner): Promise<void> {
await queryRunner.renameColumn('sync_progress', 'blockStartNumber', 'localSavedBlockNumber')
await queryRunner.renameColumn('sync_progress', 'blockEndNumber', 'syncedBlockNumber')
await queryRunner.addColumn('sync_progress', new TableColumn({
name: 'lightStartBlockNumber',
type: 'integer',
default: 0
}))
}

public async down(queryRunner: QueryRunner): Promise<void> {
await queryRunner.renameColumn('sync_progress', 'localSavedBlockNumber', 'blockStartNumber')
await queryRunner.renameColumn('sync_progress', 'syncedBlockNumber', 'blockEndNumber')
}

}
2 changes: 2 additions & 0 deletions packages/neuron-wallet/src/database/chain/ormconfig.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ import { ResetSyncProgressPrimaryKey1690361215400 } from './migrations/169036121
import { TxLockAddArgs1694746034975 } from './migrations/1694746034975-TxLockAddArgs'
import { IndexerTxHashCacheRemoveField1701234043431 } from './migrations/1701234043431-IndexerTxHashCacheRemoveField'
import { CreateCellLocalInfo1701234043432 } from './migrations/1701234043432-CreateCellLocalInfo'
import { RenameSyncProgress1702781527414 } from './migrations/1702781527414-RenameSyncProgress'

export const CONNECTION_NOT_FOUND_NAME = 'ConnectionNotFoundError'

Expand Down Expand Up @@ -132,6 +133,7 @@ const connectOptions = async (genesisBlockHash: string): Promise<SqliteConnectio
TxLockAddArgs1694746034975,
IndexerTxHashCacheRemoveField1701234043431,
CreateCellLocalInfo1701234043432,
RenameSyncProgress1702781527414,
],
logger: 'simple-console',
logging,
Expand Down
2 changes: 1 addition & 1 deletion packages/neuron-wallet/src/models/chain/live-cell.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import Script, { ScriptHashType } from './script'
import OutPoint from './out-point'
import { LumosCell } from '../../block-sync-renderer/sync/connector'
import { LumosCell } from '../../block-sync-renderer/sync/synchronizer'

const LUMOS_HASH_TYPE_MAP: Record<string, ScriptHashType> = {
type: ScriptHashType.Type,
Expand Down
2 changes: 1 addition & 1 deletion packages/neuron-wallet/src/services/live-cell-service.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import Script from '../models/chain/script'
import LiveCell from '../models/chain/live-cell'
import { queryIndexer } from '../block-sync-renderer/index'
import { LumosCell, LumosCellQuery } from '../block-sync-renderer/sync/connector'
import { LumosCell, LumosCellQuery } from '../block-sync-renderer/sync/synchronizer'

export default class LiveCellService {
private static instance: LiveCellService
Expand Down
2 changes: 1 addition & 1 deletion packages/neuron-wallet/src/services/multisig.ts
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ export default class MultisigService {
.where({ hash: In(multisigScriptHashList) })
.getMany()
const syncBlockNumbersMap: Record<string, number> = syncBlockNumbers.reduce(
(pre, cur) => ({ ...pre, [cur.hash]: cur.blockStartNumber }),
(pre, cur) => ({ ...pre, [cur.hash]: cur.localSavedBlockNumber }),
{}
)
await getConnection()
Expand Down
Loading