Skip to content

Commit

Permalink
feat: process fork in indexer mode
Browse files Browse the repository at this point in the history
  • Loading branch information
classicalliu committed Aug 14, 2019
1 parent a038e18 commit a7ad2d5
Show file tree
Hide file tree
Showing 7 changed files with 118 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,12 @@ export default class Transaction extends BaseEntity {
})
updatedAt!: string

// only used for check fork in indexer mode
@Column({
type: 'boolean',
})
confirmed: boolean = false

@OneToMany(_type => InputEntity, input => input.transaction)
inputs!: InputEntity[]

Expand Down
Original file line number Diff line number Diff line change
@@ -1,21 +1,33 @@
import {MigrationInterface, QueryRunner, TableColumn, getConnection} from "typeorm";
import {MigrationInterface, QueryRunner, TableColumn, getConnection, In} from "typeorm";
import TransactionEntity from '../entities/transaction'
import { OutputStatus } from '../../../services/tx/params'
import { TransactionStatus } from '../../../types/cell-types'
import OutputEntity from 'database/chain/entities/output'

export class AddStatusToTx1562038960990 implements MigrationInterface {

public async up(queryRunner: QueryRunner): Promise<any> {
await queryRunner.query(`ALTER TABLE 'transaction' ADD COLUMN 'status' varchar NOT NULL DEFAULT '';`)
// TransactionStatus.Success = 'success'
await queryRunner.query(`ALTER TABLE 'transaction' ADD COLUMN 'status' varchar NOT NULL DEFAULT 'success';`)

const pendingTxHashes: string[] = (await getConnection()
.getRepository(OutputEntity)
.createQueryBuilder('output')
.select(`output.outPointTxHash`, 'txHash')
.where({
status: OutputStatus.Sent
})
.getRawMany())
.filter(output => output.txHash)
await getConnection()
.createQueryBuilder()
.update(TransactionEntity)
.set({ status: TransactionStatus.Pending })
.where({
hash: In(pendingTxHashes)
})
.execute()

const txs = await getConnection()
.getRepository(TransactionEntity)
.find({ relations: ['inputs', 'outputs'] })
const updatedTxs = txs.map(tx => {
tx.status = tx.outputs[0].status === OutputStatus.Sent ? TransactionStatus.Pending : TransactionStatus.Success
return tx
})
await getConnection().manager.save(updatedTxs)
await queryRunner.changeColumn('transaction', 'status', new TableColumn({
name: 'status',
type: 'varchar',
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
import {MigrationInterface, QueryRunner} from "typeorm";

export class AddConfirmed1565693320664 implements MigrationInterface {

public async up(queryRunner: QueryRunner): Promise<any> {
await queryRunner.query(`ALTER TABLE 'transaction' ADD COLUMN 'confirmed' boolean NOT NULL DEFAULT false;`)
}

public async down(queryRunner: QueryRunner): Promise<any> {
await queryRunner.dropColumn('transaction', 'confirmed')
}

}
3 changes: 2 additions & 1 deletion packages/neuron-wallet/src/database/chain/ormconfig.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import SyncInfo from './entities/sync-info'

import { InitMigration1561695143591 } from './migrations/1561695143591-InitMigration'
import { AddStatusToTx1562038960990 } from './migrations/1562038960990-AddStatusToTx'
import { AddConfirmed1565693320664 } from './migrations/1565693320664-AddConfirmed'

export const CONNECTION_NOT_FOUND_NAME = 'ConnectionNotFoundError'

Expand All @@ -31,7 +32,7 @@ const connectOptions = async (genesisBlockHash: string): Promise<SqliteConnectio
type: 'sqlite',
database: dbPath(genesisBlockHash),
entities: [Transaction, Input, Output, SyncInfo],
migrations: [InitMigration1561695143591, AddStatusToTx1562038960990],
migrations: [InitMigration1561695143591, AddStatusToTx1562038960990, AddConfirmed1565693320664],
logging,
}
}
Expand Down
22 changes: 22 additions & 0 deletions packages/neuron-wallet/src/services/indexer/queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import BlockNumber from 'services/sync/block-number'
import AddressesUsedSubject from 'models/subjects/addresses-used-subject'
import LockUtils from 'models/lock-utils'
import TransactionPersistor from 'services/tx/transaction-persistor'
import IndexerTransaction from 'services/tx/indexer-transaction'

import IndexerRPC from './indexer-rpc'

Expand Down Expand Up @@ -82,6 +83,27 @@ export default class Queue {
}
}

public processFork = async () => {
while (!this.stopped) {
try {
const tip = this.tipBlockNumber
const txs = await IndexerTransaction.txHashes()
for (const tx of txs) {
const result = await this.getBlocksService.getTransaction(tx.hash)
if (!result) {
await IndexerTransaction.deleteTxWhenFork(tx.hash)
} else if (tip - BigInt(tx.blockNumber) >= 1000) {
await IndexerTransaction.confirm(tx.hash)
}
}
} catch (err) {
logger.error(`indexer delete forked tx:`, err)
} finally {
await this.yield(10000)
}
}
}

public getCurrentBlockNumber = async (lockHashes: string[]) => {
// get lock hash indexer status
const lockHashIndexStates = await this.indexerRPC.getLockHashIndexStates()
Expand Down
52 changes: 52 additions & 0 deletions packages/neuron-wallet/src/services/tx/indexer-transaction.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
import { getConnection } from 'typeorm'
import TransactionEntity from 'database/chain/entities/transaction'
import Utils from 'services/sync/utils'
import InputEntity from 'database/chain/entities/input'
import OutputEntity from 'database/chain/entities/output'
import { OutputStatus } from './params'

export default class IndexerTransaction {
public static txHashes = async () => {
const txs = await getConnection()
.getRepository(TransactionEntity)
.createQueryBuilder('tx')
.where(`tx.confirmed = false`)
.getMany()

return txs
}

public static confirm = async (hash: string) => {
await getConnection().manager.update(TransactionEntity, hash, { confirmed: true })
}

public static deleteTxWhenFork = async (hash: string) => {
const tx = await getConnection()
.getRepository(TransactionEntity)
.findOne(hash, { relations: ['inputs', 'outputs'] })

if (!tx) {
return
}

// reset previous output to OutputStatus.Live
await getConnection().transaction(async transactionalEntityManager => {
await Utils.mapSeries(tx.inputs, async (input: InputEntity) => {
if (!input.lockHash) {
return
}

await transactionalEntityManager.update(
OutputEntity,
{
outPointTxHash: input.outPointTxHash,
outPointIndex: input.outPointIndex,
},
{ status: OutputStatus.Live }
)
})

await transactionalEntityManager.remove([tx, ...tx.inputs, ...tx.outputs])
})
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,4 +41,5 @@ export const switchNetwork = async (nodeURL: string) => {
})

indexerQueue.start()
indexerQueue.processFork()
}

0 comments on commit a7ad2d5

Please sign in to comment.