@@ -18,6 +18,11 @@ type WatchLogsResult = {
1818 logs$ : Observable < StorageAdapterBlock > ;
1919} ;
2020
21+ type WatchLogsEvent = {
22+ blockNumber : string ;
23+ logs : RpcLog [ ] ;
24+ } ;
25+
2126export function watchLogs ( { url, address, fromBlock } : WatchLogsInput ) : WatchLogsResult {
2227 const topics = [
2328 storeEventsAbi . flatMap ( ( event ) => encodeEventTopics ( { abi : [ event ] , eventName : event . name } ) ) ,
@@ -94,24 +99,24 @@ export function watchLogs({ url, address, fromBlock }: WatchLogsInput): WatchLog
9499 client . socket . addEventListener ( "message" , ( message ) => {
95100 const response = JSON . parse ( message . data ) ;
96101 if ( "error" in response ) {
97- debug ( "was error, returning error to subscriber" ) ;
98- // Return JSON-RPC errors to the subscriber
102+ debug ( "JSON-RPC error, returning error to subscriber" ) ;
99103 subscriber . error ( response . error ) ;
100104 return ;
101105 }
102106
103107 // Parse the logs from wiresaw_watchLogs
104108 if ( "params" in response && response . params . subscription === subscriptionId ) {
105109 debug ( "parsing logs" ) ;
106- const logs : RpcLog [ ] = response . params . result ;
107- const formattedLogs = logs . map ( ( log ) => formatLog ( log ) ) ;
110+ const result : WatchLogsEvent = response . params . result ;
111+ const formattedLogs = result . logs . map ( ( log ) => formatLog ( log ) ) ;
108112 const parsedLogs = parseEventLogs ( { abi : storeEventsAbi , logs : formattedLogs } ) ;
109- debug ( "got logs" , parsedLogs ) ;
113+ const blockNumber = BigInt ( result . blockNumber ) ;
114+ debug ( "got logs" , parsedLogs , "for pending block" , blockNumber ) ;
110115 if ( caughtUp ) {
111116 debug ( "handing off logs to subscriber" ) ;
112- const blockNumber = parsedLogs [ 0 ] . blockNumber ;
113117 subscriber . next ( { blockNumber, logs : parsedLogs } ) ;
114- resumeBlock = blockNumber + 1n ;
118+ // Since this the event's block number corresponds to a pending block, we have to refetch this block in case of a restart
119+ resumeBlock = blockNumber ;
115120 } else {
116121 debug ( "buffering logs" ) ;
117122 logBuffer . push ( ...parsedLogs ) ;
@@ -125,11 +130,12 @@ export function watchLogs({ url, address, fromBlock }: WatchLogsInput): WatchLog
125130 debug ( "fetching initial logs" ) ;
126131 const initialLogs = await fetchInitialLogs ( { client, address, fromBlock : resumeBlock , topics } ) ;
127132 debug ( "got initial logs" , initialLogs ) ;
128- const logs = [ ...initialLogs , ...logBuffer ] . sort ( logSort ) ;
133+ const logs = [ ...initialLogs . logs , ...logBuffer ] . sort ( logSort ) ;
129134 debug ( "combining with log buffer" , logs ) ;
130- const blockNumber = logs . at ( - 1 ) ?. blockNumber ?? resumeBlock ;
135+ const blockNumber = logs . at ( - 1 ) ?. blockNumber ?? initialLogs . blockNumber ;
131136 subscriber . next ( { blockNumber, logs } ) ;
132- resumeBlock = blockNumber + 1n ;
137+ // Since this the block number can correspond to a pending block, we have to refetch this block in case of a restart
138+ resumeBlock = blockNumber ;
133139 caughtUp = true ;
134140 } catch ( error ) {
135141 debug ( "could not get initial logs" , error ) ;
@@ -159,7 +165,7 @@ async function fetchInitialLogs({
159165 address,
160166 topics,
161167 fromBlock,
162- } : FetchInitialLogsInput ) : Promise < StoreEventsLog [ ] > {
168+ } : FetchInitialLogsInput ) : Promise < { blockNumber : bigint ; logs : StoreEventsLog [ ] } > {
163169 // Fetch latest block number
164170 const latestBlockNumber : Hex = (
165171 await client . requestAsync ( {
@@ -169,25 +175,17 @@ async function fetchInitialLogs({
169175 } )
170176 ) . result ;
171177
172- const [ catchUpLogs , pendingLogs ] = await Promise . all ( [
173- // Request all logs from `fromBlock` to the latest block number
174- client . requestAsync ( {
178+ // Request all logs from `fromBlock` to the latest block number
179+ const rawInitialLogs : RpcLog [ ] = await client
180+ . requestAsync ( {
175181 body : {
176182 method : "eth_getLogs" ,
177183 params : [ { address, topics, fromBlock : toHex ( fromBlock ) , toBlock : latestBlockNumber } ] ,
178184 } ,
179- } ) ,
180- // Request all logs from the current pending block
181- client . requestAsync ( {
182- body : {
183- method : "wiresaw_getLogs" ,
184- params : [ { address, topics, fromBlock : latestBlockNumber } ] ,
185- } ,
186- } ) ,
187- ] ) ;
185+ } )
186+ . then ( ( res ) => res . result ) ;
188187
189188 // Return all logs from `fromBlock` until the current pending block state as initial result
190- const rawLogs : RpcLog [ ] = [ ...catchUpLogs . result , ...pendingLogs . result ] ;
191- const formattedLogs = rawLogs . map ( ( log ) => formatLog ( log ) ) ;
192- return parseEventLogs ( { abi : storeEventsAbi , logs : formattedLogs } ) ;
189+ const formattedLogs = rawInitialLogs . map ( ( log ) => formatLog ( log ) ) ;
190+ return { blockNumber : BigInt ( latestBlockNumber ) , logs : parseEventLogs ( { abi : storeEventsAbi , logs : formattedLogs } ) } ;
193191}
0 commit comments