Skip to content

Commit 0703a4b

Browse files
authored
Feature/slowdownload (#683)
* tweak debugSubstreamsRunDelay to max 500ms
* when download or execution is too slow for live, gracefully downgrade to a merged-block-based stream
* fix test now that the zstd upgrade makes compression more efficient
* add 'client_read_average_time_last_5_minutes' to the logs to see whether slow client reads are the cause of the wait time
* tweak changelog
1 parent 73f4424 commit 0703a4b

File tree

10 files changed

+76
-17
lines changed

10 files changed

+76
-17
lines changed

docs/release-notes/change-log.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,10 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
1313

1414
### Server
1515

16-
* Add a maximum execution time for a full tier2 segment. By default, this is 60 minutes. It will fail with `rpc error: code = DeadlineExceeded desc = request active for too long`.
16+
* **BREAKING** Add a maximum execution time for a full tier2 segment. By default, this is 60 minutes. It will fail with `rpc error: code = DeadlineExceeded desc = request active for too long`.
1717
It can be configured from the `SegmentExecutionTimeout` configuration option on Tier2Config or disabled by setting it to 0.
1818
* Improve log message for 'request active for a long time', adding stats.
19+
* Fix `subscription channel at max capacity` error: when the LIVE channel is full (e.g. slow module execution or a slow client reader), the request will continue from merged files instead of failing, and will gracefully recover if performance is restored.
1920
* Fixed a small context memory leak when using wasmtime (especially with grpc-based metering plugin)
2021

2122
### CLI

go.mod

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ require (
77
github.com/jhump/protoreflect v1.14.0
88
github.com/spf13/cobra v1.7.0
99
github.com/spf13/pflag v1.0.5
10-
github.com/streamingfast/bstream v0.0.2-0.20250221181559-fb0809660f91
10+
github.com/streamingfast/bstream v0.0.2-0.20250903174843-9c884c3356fd
1111
github.com/streamingfast/cli v0.0.4-0.20241119021815-815afa473375
1212
github.com/streamingfast/dauth v0.0.0-20250821195214-8e2a3c300f97
1313
github.com/streamingfast/dbin v0.9.1-0.20231117225723-59790c798e2c
@@ -106,6 +106,7 @@ require (
106106
github.com/hashicorp/errwrap v1.1.0 // indirect
107107
github.com/manifoldco/promptui v0.9.0 // indirect
108108
github.com/mitchellh/hashstructure/v2 v2.0.2 // indirect
109+
github.com/mostynb/go-grpc-compression v1.2.3 // indirect
109110
github.com/mschoch/smat v0.2.0 // indirect
110111
github.com/pelletier/go-toml/v2 v2.0.6 // indirect
111112
github.com/pinax-network/graph-networks-libs/packages/golang v0.7.0 // indirect
@@ -163,7 +164,7 @@ require (
163164
github.com/inconshreveable/mousetrap v1.1.0 // indirect
164165
github.com/itchyny/timefmt-go v0.1.5 // indirect
165166
github.com/jmespath/go-jmespath v0.4.0 // indirect
166-
github.com/klauspost/compress v1.16.7
167+
github.com/klauspost/compress v1.17.8
167168
github.com/logrusorgru/aurora v2.0.3+incompatible // indirect
168169
github.com/lucasb-eyer/go-colorful v1.2.0 // indirect
169170
github.com/magiconair/properties v1.8.7 // indirect

go.sum

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -360,8 +360,8 @@ github.com/jstemmer/go-junit-report v0.9.1/go.mod h1:Brl9GWCQeLvo8nXZwPNNblvFj/X
360360
github.com/juju/ansiterm v0.0.0-20180109212912-720a0952cc2a/go.mod h1:UJSiEoRfvx3hP73CvoARgeLjaIOjybY9vj8PUPPFGeU=
361361
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
362362
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
363-
github.com/klauspost/compress v1.16.7 h1:2mk3MPGNzKyxErAw8YaohYh69+pa4sIQSC0fPGCFR9I=
364-
github.com/klauspost/compress v1.16.7/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
363+
github.com/klauspost/compress v1.17.8 h1:YcnTYrq7MikUT7k0Yb5eceMmALQPYBW/Xltxn0NAMnU=
364+
github.com/klauspost/compress v1.17.8/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw=
365365
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
366366
github.com/kr/fs v0.1.0/go.mod h1:FFnZGqtBN9Gxj7eW1uZ42v5BccTP0vu6NEaFoC2HwRg=
367367
github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc=
@@ -411,6 +411,8 @@ github.com/mitchellh/hashstructure/v2 v2.0.2 h1:vGKWl0YJqUNxE8d+h8f6NJLcCJrgbhC4
411411
github.com/mitchellh/hashstructure/v2 v2.0.2/go.mod h1:MG3aRVU/N29oo/V/IhBX8GR/zz4kQkprJgF2EVszyDE=
412412
github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY=
413413
github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo=
414+
github.com/mostynb/go-grpc-compression v1.2.3 h1:42/BKWMy0KEJGSdWvzqIyOZ95YcR9mLPqKctH7Uo//I=
415+
github.com/mostynb/go-grpc-compression v1.2.3/go.mod h1:AghIxF3P57umzqM9yz795+y1Vjs47Km/Y2FE6ouQ7Lg=
414416
github.com/mr-tron/base58 v1.2.0 h1:T/HDJBh4ZCPbU39/+c3rRvE0uKBQlU27+QI8LJ4t64o=
415417
github.com/mr-tron/base58 v1.2.0/go.mod h1:BinMc/sQntlIE1frQmRFPUoPA1Zkr8VRgBdjWI2mNwc=
416418
github.com/mschoch/smat v0.2.0 h1:8imxQsjDm8yFEAVBe7azKmKSgzSkZXDuKkSq9374khM=
@@ -460,8 +462,8 @@ github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJ
460462
github.com/rivo/uniseg v0.4.7 h1:WUdvkW8uEhrYfLC4ZzdpI2ztxP1I582+49Oc5Mq64VQ=
461463
github.com/rivo/uniseg v0.4.7/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUcx88=
462464
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
463-
github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR38lUII=
464-
github.com/rogpeppe/go-internal v1.13.1/go.mod h1:uMEvuHeurkdAXX61udpOXGD/AzZDWNMNyH2VO9fmH0o=
465+
github.com/rogpeppe/go-internal v1.14.1 h1:UQB4HGPB6osV0SQTLymcB4TgvyWu6ZyliaW0tI/otEQ=
466+
github.com/rogpeppe/go-internal v1.14.1/go.mod h1:MaRKkUm5W0goXpeCfT7UZI6fk/L7L7so1lCWt35ZSgc=
465467
github.com/rs/cors v1.10.0 h1:62NOS1h+r8p1mW6FM0FSB0exioXLhd/sh15KpjWBZ+8=
466468
github.com/rs/cors v1.10.0/go.mod h1:XyqrcTp5zjWr1wsJ8PIRZssZ8b/WMcMf71DJnit4EMU=
467469
github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
@@ -490,8 +492,8 @@ github.com/spf13/viper v1.15.0 h1:js3yy885G8xwJa6iOISGFwd+qlUo5AvyXb7CiihdtiU=
490492
github.com/spf13/viper v1.15.0/go.mod h1:fFcTBJxvhhzSJiZy8n+PeW6t8l+KeT/uTARa0jHOQLA=
491493
github.com/spiffe/go-spiffe/v2 v2.5.0 h1:N2I01KCUkv1FAjZXJMwh95KK1ZIQLYbPfhaxw8WS0hE=
492494
github.com/spiffe/go-spiffe/v2 v2.5.0/go.mod h1:P+NxobPc6wXhVtINNtFjNWGBTreew1GBUCwT2wPmb7g=
493-
github.com/streamingfast/bstream v0.0.2-0.20250221181559-fb0809660f91 h1:n4Ws5vS8dF4J6WYP7mE0C9DsE1Inju7BKd6XWI4T18c=
494-
github.com/streamingfast/bstream v0.0.2-0.20250221181559-fb0809660f91/go.mod h1:n5wy+Vmwp4xbjXO7B81MAkAgjnf1vJ/lI2y6hWWyFbg=
495+
github.com/streamingfast/bstream v0.0.2-0.20250903174843-9c884c3356fd h1:CXp4kPAfdGyMJlitOipGLp0piCV+IRw9sIwzE4p0aU8=
496+
github.com/streamingfast/bstream v0.0.2-0.20250903174843-9c884c3356fd/go.mod h1:YXtnOZbqcqU4fzGqdi8B7rMcp2iQePQGf4oTIPBnzgc=
495497
github.com/streamingfast/cli v0.0.4-0.20241119021815-815afa473375 h1:nwuFSEJtQfqTuN62WvysfAtDT4qqwQ6ghFX0i2VY1fY=
496498
github.com/streamingfast/cli v0.0.4-0.20241119021815-815afa473375/go.mod h1:qOksW3DPhHVYBo8dcYxS7K3Q09wlcOChSdopeOjLWng=
497499
github.com/streamingfast/dauth v0.0.0-20250821195214-8e2a3c300f97 h1:Lcack+Yv9KWBeKfRrjQ+KUegwHbh6/aFLJuDyYzcXZ4=

metering/metering_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ func TestWithBytesReadMeteringOptionsZstd(t *testing.T) {
7070
}
7171

7272
uncompressedSize := 1024
73-
compressedSize := 24
73+
compressedSize := 16
7474

7575
err = store.WriteObject(nil, "test", bytes.NewReader(bytes.Repeat([]byte("1"), uncompressedSize)))
7676
if err != nil {

metrics/stats.go

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,8 +57,9 @@ type Stats struct {
5757
// counter is used to get the next jobIdx
5858
counter uint64
5959

60-
error error
61-
logger *zap.Logger
60+
clientReadTime *dmetrics.AvgDurationCounter
61+
error error
62+
logger *zap.Logger
6263
}
6364

6465
type runningJobs map[uint64]*extendedJob
@@ -157,6 +158,7 @@ func NewReqStats(config *Config, logger *zap.Logger) *Stats {
157158
return &Stats{
158159
config: config,
159160
blockRate: dmetrics.MustNewAvgRateCounter(1*time.Second, 30*time.Second, "blocks"),
161+
clientReadTime: dmetrics.NewAvgDurationCounter(5*time.Minute, time.Second, "client_read_time"),
160162
startTime: time.Now(),
161163
logger: logger,
162164
modulesStats: make(map[string]*extendedStats),
@@ -462,6 +464,12 @@ func (s *Stats) RecordModuleWasmStoreDeletePrefix(moduleName string, sizeBytes u
462464
mod.storeOperationTime += elapsed
463465
}
464466

467+
func (s *Stats) RecordReadTime(since time.Time) {
468+
s.Lock()
469+
defer s.Unlock()
470+
s.clientReadTime.AddElapsedTime(since)
471+
}
472+
465473
func (s *Stats) RecordBlock(ref bstream.BlockRef) {
466474
s.Lock()
467475
defer s.Unlock()
@@ -726,6 +734,7 @@ func (s *Stats) getZapFields(meter dmetering.Meter) []zap.Field {
726734
zap.Uint64("remote_blocks_processed", s.remoteProcessedBlockCount), // "estimated" from remote ranges
727735
zap.Uint64("total_blocks_processed", s.processedBlocks), // includes remote and local blocks processed in this request, multiplied by execution stages, excludes blocks that were skipped from indexes
728736
zap.Uint64("uncompressed_egress_bytes", s.uncompressedEgressBytes),
737+
zap.Duration("client_read_average_time_last_5_minutes", s.clientReadTime.Average()),
729738
zap.String("error", errorText),
730739
}
731740

pipeline/pipeline.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,7 @@ type Pipeline struct {
9090
// (for chains with potential block skips)
9191
lastFinalClock *pbsubstreams.Clock
9292
lastProcessedBlockRef bstream.BlockRef
93+
lastCursor *bstream.Cursor
9394
sentBlocks uint64
9495

9596
blockStepMap map[bstream.StepType]uint64
@@ -293,6 +294,10 @@ func (p *Pipeline) GetStoreMap() store.Map {
293294
return p.stores.StoreMap
294295
}
295296

297+
func (p *Pipeline) LastCursor() *bstream.Cursor {
298+
return p.lastCursor
299+
}
300+
296301
func (p *Pipeline) setupProcessingModule(reqDetails *reqctx.RequestDetails) {
297302
for _, module := range reqDetails.Modules.Modules {
298303
if reqDetails.IsOutputModule(module.Name) {

pipeline/process_block.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -197,6 +197,7 @@ func (p *Pipeline) handleStepUndo(clock *pbsubstreams.Clock, cursor *bstream.Cur
197197
targetClock := blockRefToPB(reorgJunctionBlock)
198198

199199
p.lastProcessedBlockRef = reorgJunctionBlock
200+
p.lastCursor = targetCursor
200201
return p.respFunc(
201202
&pbsubstreamsrpc.Response{
202203
Message: &pbsubstreamsrpc.Response_BlockUndoSignal{
@@ -308,6 +309,7 @@ func (p *Pipeline) handleStepNew(ctx context.Context, clock *pbsubstreams.Clock,
308309
p.stores.resetStores()
309310
logger.Debug("block processed", zap.Uint64("block_num", clock.Number))
310311
p.lastProcessedBlockRef = bstream.NewBlockRef(clock.Id, clock.Number)
312+
p.lastCursor = cursor
311313
return nil
312314
}
313315

service/live_back_filler.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package service
22

33
import (
44
"context"
5+
"errors"
56
"fmt"
67
"io"
78

@@ -71,6 +72,9 @@ func RequestBackProcessing(ctx context.Context, logger *zap.Logger, startBlock u
7172
err := requestBackProcessing(ctx, logger, liveBackFillerRequest, clientFactory)
7273
if err != nil {
7374
logger.Debug("retryable error while live backprocessing", zap.Error(err))
75+
if errors.Is(err, context.Canceled) {
76+
return derr.NewFatalError(err)
77+
}
7478
return err
7579
}
7680

service/tier1.go

Lines changed: 37 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -770,7 +770,7 @@ func (s *Tier1Service) blocks(ctx context.Context, request *pbsubstreamsrpc.Requ
770770
zap.String("cursor", cursor),
771771
)
772772

773-
var streamHandler bstream.Handler
773+
var wrappedPipe bstream.Handler
774774
if requestDetails.ProductionMode {
775775
liveBackFiller := NewLiveBackFiller(ctx, pipe, logger, execGraph.OutputModuleStageIndex(), segmentSize, requestDetails.LinearHandoffBlockNum, s.runtimeConfig.ClientFactory, RequestBackProcessing)
776776

@@ -781,14 +781,14 @@ func (s *Tier1Service) blocks(ctx context.Context, request *pbsubstreamsrpc.Requ
781781
}
782782

783783
go liveBackFiller.Start(ctx)
784-
streamHandler = liveBackFiller
784+
wrappedPipe = liveBackFiller
785785
} else {
786-
streamHandler = pipe
786+
wrappedPipe = pipe
787787
}
788788

789789
blockStream, err := s.streamFactoryFunc(
790790
ctx,
791-
streamHandler,
791+
wrappedPipe,
792792
int64(requestDetails.LinearHandoffBlockNum),
793793
request.StopBlockNum,
794794
cursor,
@@ -803,7 +803,37 @@ func (s *Tier1Service) blocks(ctx context.Context, request *pbsubstreamsrpc.Requ
803803
}
804804

805805
ctx, span := reqctx.WithSpan(ctx, "substreams/tier1/pipeline/blocks_stream")
806-
streamErr = blockStream.Run(ctx)
806+
for {
807+
streamErr = blockStream.Run(ctx)
808+
if errors.Is(streamErr, hub.ErrSubscriptionChannelFull) {
809+
cur := pipe.LastCursor()
810+
if cur == nil {
811+
logger.Warn("subscription channel at max capacity, but no cursor was found to reconnect")
812+
break
813+
}
814+
815+
logger.Warn("subscription channel at max capacity, creating new stream", zap.String("last_block", cur.Block.String()))
816+
blockStream, err = s.streamFactoryFunc(
817+
ctx,
818+
wrappedPipe,
819+
int64(requestDetails.LinearHandoffBlockNum),
820+
request.StopBlockNum,
821+
cur.ToOpaque(),
822+
request.FinalBlocksOnly,
823+
false,
824+
logger.Named("stream"),
825+
bsstream.WithLiveSourceHandlerMiddleware(metering.LiveSourceMiddlewareHandlerFactory(ctx)),
826+
bsstream.WithFileSourceHandlerMiddleware(metering.FileSourceMiddlewareHandlerFactory(ctx)),
827+
)
828+
if err != nil {
829+
streamErr = fmt.Errorf("error getting stream: %w", err)
830+
break
831+
}
832+
continue
833+
}
834+
break
835+
}
836+
807837
span.EndWithErr(&streamErr)
808838

809839
return pipe.OnStreamTerminated(ctx, streamErr)
@@ -868,10 +898,12 @@ func tier1ResponseHandler(ctx context.Context, mut *sync.Mutex, logger *zap.Logg
868898
}
869899
}
870900

901+
begin := time.Now()
871902
if err := streamSrv.Send(resp); err != nil {
872903
logger.Info("unable to send block probably due to client disconnecting", zap.String("user_id", userID), zap.String("api_key_id", apiKeyID), zap.Error(err))
873904
return connect.NewError(connect.CodeUnavailable, err)
874905
}
906+
stats.RecordReadTime(begin)
875907

876908
if isData {
877909
stats.RecordDataSent()

tui/tui.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,9 @@ func (ui *TUI) HandleBlockScopedData(ctx context.Context, data *pbsubstreamsrpc.
155155
if debugSubstreamsRun {
156156
time.Sleep(debugSubstreamsRunDelay)
157157
debugSubstreamsRunDelay += time.Millisecond * 100
158+
if debugSubstreamsRunDelay > 500*time.Millisecond {
159+
debugSubstreamsRunDelay = 500 * time.Millisecond
160+
}
158161
}
159162
_ = isLive
160163
if ui.testRunner != nil {

0 commit comments

Comments
 (0)