-
Notifications
You must be signed in to change notification settings - Fork 30
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Adding Event and Block subscription capabilities and tests
- Loading branch information
1 parent
3239a0e
commit cae79c3
Showing
36 changed files
with
861 additions
and
948 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file was deleted.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,9 +1 @@ | ||
export { EventWatcher, EventWatcherOptions } from './src/event-watcher' | ||
export { DefaultEventLog, EventFilter } from './src/models' | ||
export { | ||
EventDB, | ||
EthProvider, | ||
FullEventFilter, | ||
EventFilterOptions, | ||
EventLog, | ||
} from './src/interfaces' | ||
export * from './src' |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file was deleted.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,132 @@ | ||
/* External Imports */ | ||
import { DB, getLogger, logError } from '@pigi/core' | ||
import { Block, Provider } from 'ethers/providers' | ||
|
||
/* Internal Imports */ | ||
import { EthereumListener } from './interfaces/listener' | ||
|
||
const log = getLogger('ethereum-block-processor') | ||
const blockKey: Buffer = Buffer.from('latestBlock') | ||
|
||
/** | ||
* Ethereum Block Processor | ||
* Single place through which all block subscriptions are handled. | ||
*/ | ||
export class EthereumBlockProcessor { | ||
private readonly subscriptions: Set<EthereumListener<Block>> | ||
private currentBlockNumber: number | ||
|
||
constructor( | ||
private readonly db: DB, | ||
private readonly earliestBlock: number = 0 | ||
) { | ||
this.subscriptions = new Set<EthereumListener<Block>>() | ||
this.currentBlockNumber = 0 | ||
} | ||
|
||
/** | ||
* Subscribes to new blocks. | ||
* This will also fetch and send the provided event handler all historical blocks not in | ||
* the database unless backfill is set to false. | ||
* | ||
* @param provider The provider with the connection to the blockchain | ||
* @param handler The event handler subscribing | ||
* @param backfill Whether or not to fetch previous events | ||
*/ | ||
public async subscribe( | ||
provider: Provider, | ||
handler: EthereumListener<Block>, | ||
backfill: boolean = true | ||
): Promise<void> { | ||
this.subscriptions.add(handler) | ||
|
||
provider.on('block', async (blockNumber) => { | ||
log.debug(`Block [${blockNumber}] was mined!`) | ||
await this.fetchAndDisseminateBlock(provider, blockNumber) | ||
this.currentBlockNumber = blockNumber | ||
try { | ||
await this.db.put( | ||
blockKey, | ||
Buffer.from(this.currentBlockNumber.toString()) | ||
) | ||
} catch (e) { | ||
logError( | ||
log, | ||
`Error storing most recent block received [${blockNumber}]!`, | ||
e | ||
) | ||
} | ||
}) | ||
|
||
if (backfill) { | ||
await this.backfillBlocks(provider) | ||
} | ||
} | ||
|
||
/** | ||
* Fetches and broadcasts the Block for the provided block number. | ||
* | ||
* @param provider The provider with the connection to the blockchain | ||
* @param blockNumber The block number | ||
*/ | ||
private async fetchAndDisseminateBlock( | ||
provider: Provider, | ||
blockNumber: number | ||
): Promise<void> { | ||
log.debug(`Fetching block [${blockNumber}].`) | ||
const block: Block = await provider.getBlock(blockNumber, true) | ||
log.debug(`Received block: [${JSON.stringify(block)}].`) | ||
|
||
this.subscriptions.forEach((h) => { | ||
try { | ||
// purposefully ignore promise | ||
h.handle(block) | ||
} catch (e) { | ||
// should be logged in handler | ||
} | ||
}) | ||
} | ||
|
||
/** | ||
* Fetches historical blocks. | ||
* | ||
* @param provider The provider with the connection to the blockchain. | ||
*/ | ||
private async backfillBlocks(provider: Provider): Promise<void> { | ||
log.debug(`Backfilling blocks`) | ||
const blockNumber = await this.getBlockNumber(provider) | ||
|
||
const lastSyncedBlockBuffer: Buffer = await this.db.get(blockKey) | ||
const lastSyncedNumber: number = !!lastSyncedBlockBuffer | ||
? parseInt(lastSyncedBlockBuffer.toString(), 10) | ||
: this.earliestBlock - 1 | ||
|
||
if (blockNumber === lastSyncedNumber) { | ||
log.debug(`Up to date, not backfilling.`) | ||
return | ||
} | ||
|
||
for (let i = lastSyncedNumber + 1; i <= blockNumber; i++) { | ||
await this.fetchAndDisseminateBlock(provider, i) | ||
} | ||
|
||
log.debug( | ||
`backfilled from block [${lastSyncedNumber + 1}] to [${blockNumber}]!` | ||
) | ||
} | ||
|
||
/** | ||
* Fetches the current block number from the given provider. | ||
* | ||
* @param provider The provider connected to a node | ||
* @returns The current block number | ||
*/ | ||
private async getBlockNumber(provider: Provider): Promise<number> { | ||
if (this.currentBlockNumber === 0) { | ||
this.currentBlockNumber = await provider.getBlockNumber() | ||
} | ||
|
||
log.debug(`Current block number: ${this.currentBlockNumber}`) | ||
return this.currentBlockNumber | ||
} | ||
} |
Oops, something went wrong.