From 46d8bc473e9c07cb827129c4776bade967b92b28 Mon Sep 17 00:00:00 2001 From: CMajeri Date: Wed, 28 Jul 2021 15:08:25 +0200 Subject: [PATCH 01/17] Enables fast tracing for CALL family --- core/vm/logger.go | 70 ++++++++++++++++++++++++++++++++++++++++-- internal/ethapi/api.go | 6 +++- 2 files changed, 73 insertions(+), 3 deletions(-) diff --git a/core/vm/logger.go b/core/vm/logger.go index 900a5e58543aa..f9579d93b4290 100644 --- a/core/vm/logger.go +++ b/core/vm/logger.go @@ -110,12 +110,22 @@ type Tracer interface { CaptureEnd(output []byte, gasUsed uint64, t time.Duration, err error) } +type wrappedLog struct { + parent *wrappedLog + error error + log StructLog + children []*wrappedLog +} + // StructLogger is an EVM state logger and implements Tracer. // // StructLogger can capture state based on the given Log configuration and also keeps // a track record of modified storage which is used in reporting snapshots of the // contract their storage. type StructLogger struct { + current *wrappedLog + depth int + cfg LogConfig storage map[common.Address]Storage @@ -145,12 +155,45 @@ func (l *StructLogger) Reset() { // CaptureStart implements the Tracer interface to initialize the tracing operation. func (l *StructLogger) CaptureStart(env *EVM, from common.Address, to common.Address, create bool, input []byte, gas uint64, value *big.Int) { + l.depth = 0 + l.current = &wrappedLog{} +} + +func errToStr(err error) string { + if err == nil { + return "" + } + return err.Error() } // CaptureState logs a new structured log message and pushes it out to the environment // // CaptureState also tracks SLOAD/SSTORE ops to track storage change. func (l *StructLogger) CaptureState(env *EVM, pc uint64, op OpCode, gas, cost uint64, scope *ScopeContext, rData []byte, depth int, err error) { + for ; l.depth > depth-1; l.depth-- { + l.current = l.current.parent + } + if err != nil { + if l.current.error != nil { + l.current.error=fmt.Errorf("%s:%s", l.current.error.Error(), err.Error()) + } + l.current.error = err + } + switch op { + case CALL,DELEGATECALL,STATICCALL,CALLCODE: + l.depth = l.depth+1 + wl := &wrappedLog{ + parent: l.current, + error: l.current.error, + } + l.current.children = append(l.current.children, wl) + l.current = wl + case REVERT: + l.current.error = ErrExecutionReverted + return + default: + return + } memory := scope.Memory stack := scope.Stack contract := scope.Contract @@ -205,7 +248,7 @@ func (l *StructLogger) CaptureState(env *EVM, pc uint64, op OpCode, gas, cost ui } // create a new snapshot of the EVM. log := StructLog{pc, op, gas, cost, mem, memory.Len(), stck, rdata, storage, depth, env.StateDB.GetRefund(), err} - l.logs = append(l.logs, log) + l.current.log = log } // CaptureFault implements the Tracer interface to trace an execution fault @@ -215,6 +258,16 @@ func (l *StructLogger) CaptureFault(env *EVM, pc uint64, op OpCode, gas, cost ui // CaptureEnd is called after the call finishes to finalize the tracing. func (l *StructLogger) CaptureEnd(output []byte, gasUsed uint64, t time.Duration, err error) { + for ; l.depth > 1; l.depth-- { + l.current = l.current.parent + } + l.current.log = StructLog{ + Op: CALL, + GasCost: gasUsed, + ReturnData: output, + Depth: 0, + Err: err, + } l.output = output l.err = err if l.cfg.Debug { @@ -225,8 +278,21 @@ func (l *StructLogger) CaptureEnd(output []byte, gasUsed uint64, t time.Duration } } +// Depth first append for all children (stack max depth is 1024) +func (l *wrappedLog) getLogs() []StructLog { + var logs []StructLog + l.log.Err = l.error + logs = append(logs, l.log) + for _, child := range l.children { + logs = append(logs, child.getLogs()...) + } + return logs +} + // StructLogs returns the captured log entries. -func (l *StructLogger) StructLogs() []StructLog { return l.logs } +func (l *StructLogger) StructLogs() []StructLog { + return l.current.getLogs() +} // Error returns the VM error captured by the trace. func (l *StructLogger) Error() error { return l.err } diff --git a/internal/ethapi/api.go b/internal/ethapi/api.go index 1af98e1071dad..d1068afc4a4b7 100644 --- a/internal/ethapi/api.go +++ b/internal/ethapi/api.go @@ -1142,13 +1142,17 @@ type StructLogRes struct { func FormatLogs(logs []vm.StructLog) []StructLogRes { formatted := make([]StructLogRes, len(logs)) for index, trace := range logs { + var errString string + if trace.Err != nil { + errString = trace.Err.Error() + } formatted[index] = StructLogRes{ Pc: trace.Pc, Op: trace.Op.String(), Gas: trace.Gas, GasCost: trace.GasCost, Depth: trace.Depth, - Error: trace.ErrorString(), + Error: errString, } if trace.Stack != nil { stack := make([]string, len(trace.Stack)) From 0b450bcb6d150c2508898c301f81cc9820897ee9 Mon Sep 17 00:00:00 2001 From: CMajeri Date: Wed, 28 Jul 2021 15:21:38 +0200 Subject: [PATCH 02/17] Removes useless code --- core/vm/logger.go | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/core/vm/logger.go b/core/vm/logger.go index f9579d93b4290..0ca35a6fa4771 100644 --- a/core/vm/logger.go +++ b/core/vm/logger.go @@ -159,13 +159,6 @@ func (l *StructLogger) CaptureStart(env *EVM, from common.Address, to common.Add l.current = &wrappedLog{} } -func errToStr(err error) string { - if err == nil { - return "" - } - return err.Error() -} - // CaptureState logs a new structured log message and pushes it out to the environment // // CaptureState also tracks SLOAD/SSTORE ops to track storage change. @@ -174,9 +167,6 @@ func (l *StructLogger) CaptureState(env *EVM, pc uint64, op OpCode, gas, cost ui l.current = l.current.parent } if err != nil { - if l.current.error != nil { - l.current.error=fmt.Errorf("%s:%s", l.current.error.Error(), err.Error()) - } l.current.error = err } switch op { From ff52ea4c2c606b492602dd06d79314f885e99d46 Mon Sep 17 00:00:00 2001 From: Bastien Giegel Date: Mon, 7 Feb 2022 11:13:14 +0100 Subject: [PATCH 03/17] =?UTF-8?q?apply=20log=20wrapper=20chervin=E2=80=99s?= =?UTF-8?q?=20patch?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- eth/tracers/logger/logger.go | 60 ++++++++++++++++++++++++++++++++++-- 1 file changed, 58 insertions(+), 2 deletions(-) diff --git a/eth/tracers/logger/logger.go b/eth/tracers/logger/logger.go index 8461935822d85..c54d294877c39 100644 --- a/eth/tracers/logger/logger.go +++ b/eth/tracers/logger/logger.go @@ -99,12 +99,22 @@ func (s *StructLog) ErrorString() string { return "" } +type wrappedLog struct { + parent *wrappedLog + error error + log StructLog + children []*wrappedLog +} + // StructLogger is an EVM state logger and implements EVMLogger. // // StructLogger can capture state based on the given Log configuration and also keeps // a track record of modified storage which is used in reporting snapshots of the // contract their storage. type StructLogger struct { + current *wrappedLog + depth int + cfg Config env *vm.EVM @@ -136,12 +146,36 @@ func (l *StructLogger) Reset() { // CaptureStart implements the EVMLogger interface to initialize the tracing operation. func (l *StructLogger) CaptureStart(env *vm.EVM, from common.Address, to common.Address, create bool, input []byte, gas uint64, value *big.Int) { l.env = env + l.depth = 0 + l.current = &wrappedLog{} } // CaptureState logs a new structured log message and pushes it out to the environment // // CaptureState also tracks SLOAD/SSTORE ops to track storage change. func (l *StructLogger) CaptureState(pc uint64, op vm.OpCode, gas, cost uint64, scope *vm.ScopeContext, rData []byte, depth int, err error) { + for ; l.depth > depth-1; l.depth-- { + l.current = l.current.parent + } + if err != nil { + l.current.error = err + } + switch op { + case vm.CALL, vm.DELEGATECALL, vm.STATICCALL, vm.CALLCODE: + l.depth = l.depth + 1 + wl := &wrappedLog{ + parent: l.current, + error: l.current.error, + } + l.current.children = append(l.current.children, wl) + l.current = wl + case vm.REVERT: + l.current.error = vm.ErrExecutionReverted + return + default: + return + } + memory := scope.Memory stack := scope.Stack contract := scope.Contract @@ -198,7 +232,7 @@ func (l *StructLogger) CaptureState(pc uint64, op vm.OpCode, gas, cost uint64, s } // create a new snapshot of the EVM. log := StructLog{pc, op, gas, cost, mem, memory.Len(), stck, rdata, storage, depth, l.env.StateDB.GetRefund(), err} - l.logs = append(l.logs, log) + l.current.log = log } // CaptureFault implements the EVMLogger interface to trace an execution fault @@ -208,6 +242,17 @@ func (l *StructLogger) CaptureFault(pc uint64, op vm.OpCode, gas, cost uint64, s // CaptureEnd is called after the call finishes to finalize the tracing. func (l *StructLogger) CaptureEnd(output []byte, gasUsed uint64, t time.Duration, err error) { + for ; l.depth > 1; l.depth-- { + l.current = l.current.parent + } + l.current.log = StructLog{ + Op: vm.CALL, + GasCost: gasUsed, + ReturnData: output, + Depth: 0, + Err: err, + } + l.output = output l.err = err if l.cfg.Debug { @@ -223,8 +268,19 @@ func (l *StructLogger) CaptureEnter(typ vm.OpCode, from common.Address, to commo func (l *StructLogger) CaptureExit(output []byte, gasUsed uint64, err error) {} +// Depth first append for all children (stack max depth is 1024) +func (l *wrappedLog) getLogs() []StructLog { + var logs []StructLog + l.log.Err = l.error + logs = append(logs, l.log) + for _, child := range l.children { + logs = append(logs, child.getLogs()...) + } + return logs +} + // StructLogs returns the captured log entries. -func (l *StructLogger) StructLogs() []StructLog { return l.logs } +func (l *StructLogger) StructLogs() []StructLog { return l.current.getLogs() } // Error returns the VM error captured by the trace. func (l *StructLogger) Error() error { return l.err } From 4c6d40391b86574a6dc38034deeb7860cfdfae6e Mon Sep 17 00:00:00 2001 From: CMajeri Date: Fri, 27 May 2022 17:22:29 +0200 Subject: [PATCH 04/17] Detect silent failures (no REVERT or error message) --- eth/tracers/logger/logger.go | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/eth/tracers/logger/logger.go b/eth/tracers/logger/logger.go index c54d294877c39..a4c6c21ee59b9 100644 --- a/eth/tracers/logger/logger.go +++ b/eth/tracers/logger/logger.go @@ -154,7 +154,14 @@ func (l *StructLogger) CaptureStart(env *vm.EVM, from common.Address, to common. // // CaptureState also tracks SLOAD/SSTORE ops to track storage change. func (l *StructLogger) CaptureState(pc uint64, op vm.OpCode, gas, cost uint64, scope *vm.ScopeContext, rData []byte, depth int, err error) { - for ; l.depth > depth-1; l.depth-- { + stack := scope.Stack + for i := l.depth - (depth - 1); l.depth > depth-1; l.depth, i = l.depth-1, i-1 { + if l.current.error == nil { + switch stack.Data()[len(stack.Data())-i].Bytes32()[31] { + case 0x00: + l.current.error = fmt.Errorf("call failed") + } + } l.current = l.current.parent } if err != nil { @@ -177,7 +184,6 @@ func (l *StructLogger) CaptureState(pc uint64, op vm.OpCode, gas, cost uint64, s } memory := scope.Memory - stack := scope.Stack contract := scope.Contract // check if already accumulated the specified number of logs if l.cfg.Limit != 0 && l.cfg.Limit <= len(l.logs) { From a29939938bbc506900665c40903baf8c34524cfd Mon Sep 17 00:00:00 2001 From: CMajeri Date: Tue, 21 Jun 2022 09:36:16 +0200 Subject: [PATCH 05/17] Simplify loop --- eth/tracers/logger/logger.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/eth/tracers/logger/logger.go b/eth/tracers/logger/logger.go index 0f1f89320cb9f..78b279efd7253 100644 --- a/eth/tracers/logger/logger.go +++ b/eth/tracers/logger/logger.go @@ -174,7 +174,8 @@ func (l *StructLogger) CaptureState(pc uint64, op vm.OpCode, gas, cost uint64, s memory := scope.Memory stack := scope.Stack contract := scope.Contract - for i := l.depth - (depth - 1); l.depth > depth-1; l.depth, i = l.depth-1, i-1 { + for ; l.depth > depth-1; l.depth = l.depth - 1 { + i := l.depth - (depth - 1) if l.current.error == nil { switch stack.Data()[len(stack.Data())-i].Bytes32()[31] { case 0x00: From d0dc349fd36bd79f94516c866251783641ed12f1 Mon Sep 17 00:00:00 2001 From: Sina Mahmoodi <1591639+s1na@users.noreply.github.com> Date: Wed, 31 Aug 2022 16:14:53 +0200 Subject: [PATCH 06/17] graphql: return correct logs for tx (#25612) * graphql: fix tx logs * minor * Use optimized search for selecting tx logs --- graphql/graphql.go | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/graphql/graphql.go b/graphql/graphql.go index 97b460c205cee..66c25841db981 100644 --- a/graphql/graphql.go +++ b/graphql/graphql.go @@ -22,6 +22,7 @@ import ( "errors" "fmt" "math/big" + "sort" "strconv" "github.com/ethereum/go-ethereum" @@ -478,13 +479,16 @@ func (t *Transaction) getLogs(ctx context.Context) (*[]*Log, error) { if err != nil { return nil, err } - ret := make([]*Log, 0, len(logs)) - for _, log := range logs { + var ret []*Log + // Select tx logs from all block logs + ix := sort.Search(len(logs), func(i int) bool { return uint64(logs[i].TxIndex) == t.index }) + for ix < len(logs) && uint64(logs[ix].TxIndex) == t.index { ret = append(ret, &Log{ r: t.r, transaction: t, - log: log, + log: logs[ix], }) + ix++ } return &ret, nil } From 3b41be695e6e12829abf6e5edec0606ee37f14d9 Mon Sep 17 00:00:00 2001 From: Sina Mahmoodi <1591639+s1na@users.noreply.github.com> Date: Tue, 13 Sep 2022 21:49:52 +0200 Subject: [PATCH 07/17] graphql: fixes missing tx logs (#25745) * graphql: fix tx logs * graphql: refactor test service setup * graphql: add test for tx logs --- graphql/graphql.go | 2 +- graphql/graphql_test.go | 227 +++++++++++++++++++++------------------- graphql/service.go | 9 +- 3 files changed, 124 insertions(+), 114 deletions(-) diff --git a/graphql/graphql.go b/graphql/graphql.go index 66c25841db981..356ff669fb16d 100644 --- a/graphql/graphql.go +++ b/graphql/graphql.go @@ -481,7 +481,7 @@ func (t *Transaction) getLogs(ctx context.Context) (*[]*Log, error) { } var ret []*Log // Select tx logs from all block logs - ix := sort.Search(len(logs), func(i int) bool { return uint64(logs[i].TxIndex) == t.index }) + ix := sort.Search(len(logs), func(i int) bool { return uint64(logs[i].TxIndex) >= t.index }) for ix < len(logs) && uint64(logs[ix].TxIndex) == t.index { ret = append(ret, &Log{ r: t.r, diff --git a/graphql/graphql_test.go b/graphql/graphql_test.go index d55f4e063486c..491c731521138 100644 --- a/graphql/graphql_test.go +++ b/graphql/graphql_test.go @@ -17,6 +17,8 @@ package graphql import ( + "context" + "encoding/json" "fmt" "io" "math/big" @@ -51,15 +53,21 @@ func TestBuildSchema(t *testing.T) { } defer stack.Close() // Make sure the schema can be parsed and matched up to the object model. - if err := newHandler(stack, nil, nil, []string{}, []string{}); err != nil { + if _, err := newHandler(stack, nil, nil, []string{}, []string{}); err != nil { t.Errorf("Could not construct GraphQL handler: %v", err) } } // Tests that a graphQL request is successfully handled when graphql is enabled on the specified endpoint func TestGraphQLBlockSerialization(t *testing.T) { - stack := createNode(t, true, false) + stack := createNode(t) defer stack.Close() + genesis := &core.Genesis{ + Config: params.AllEthashProtocolChanges, + GasLimit: 11500000, + Difficulty: big.NewInt(1048576), + } + newGQLService(t, stack, genesis, 10, func(i int, gen *core.BlockGen) {}) // start node if err := stack.Start(); err != nil { t.Fatalf("could not start node: %v", err) @@ -161,8 +169,55 @@ func TestGraphQLBlockSerialization(t *testing.T) { } func TestGraphQLBlockSerializationEIP2718(t *testing.T) { - stack := createNode(t, true, true) + // Account for signing txes + var ( + key, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291") + address = crypto.PubkeyToAddress(key.PublicKey) + funds = big.NewInt(1000000000000000) + dad = common.HexToAddress("0x0000000000000000000000000000000000000dad") + ) + stack := createNode(t) defer stack.Close() + genesis := &core.Genesis{ + Config: params.AllEthashProtocolChanges, + GasLimit: 11500000, + Difficulty: big.NewInt(1048576), + Alloc: core.GenesisAlloc{ + address: {Balance: funds}, + // The address 0xdad sloads 0x00 and 0x01 + dad: { + Code: []byte{byte(vm.PC), byte(vm.PC), byte(vm.SLOAD), byte(vm.SLOAD)}, + Nonce: 0, + Balance: big.NewInt(0), + }, + }, + BaseFee: big.NewInt(params.InitialBaseFee), + } + signer := types.LatestSigner(genesis.Config) + newGQLService(t, stack, genesis, 1, func(i int, gen *core.BlockGen) { + gen.SetCoinbase(common.Address{1}) + tx, _ := types.SignNewTx(key, signer, &types.LegacyTx{ + Nonce: uint64(0), + To: &dad, + Value: big.NewInt(100), + Gas: 50000, + GasPrice: big.NewInt(params.InitialBaseFee), + }) + gen.AddTx(tx) + tx, _ = types.SignNewTx(key, signer, &types.AccessListTx{ + ChainID: genesis.Config.ChainID, + Nonce: uint64(1), + To: &dad, + Gas: 30000, + GasPrice: big.NewInt(params.InitialBaseFee), + Value: big.NewInt(50), + AccessList: types.AccessList{{ + Address: dad, + StorageKeys: []common.Hash{{0}}, + }}, + }) + gen.AddTx(tx) + }) // start node if err := stack.Start(); err != nil { t.Fatalf("could not start node: %v", err) @@ -198,7 +253,7 @@ func TestGraphQLBlockSerializationEIP2718(t *testing.T) { // Tests that a graphQL request is not handled successfully when graphql is not enabled on the specified endpoint func TestGraphQLHTTPOnSamePort_GQLRequest_Unsuccessful(t *testing.T) { - stack := createNode(t, false, false) + stack := createNode(t) defer stack.Close() if err := stack.Start(); err != nil { t.Fatalf("could not start node: %v", err) @@ -212,7 +267,59 @@ func TestGraphQLHTTPOnSamePort_GQLRequest_Unsuccessful(t *testing.T) { assert.Equal(t, http.StatusNotFound, resp.StatusCode) } -func createNode(t *testing.T, gqlEnabled bool, txEnabled bool) *node.Node { +func TestGraphQLTransactionLogs(t *testing.T) { + var ( + key, _ = crypto.GenerateKey() + addr = crypto.PubkeyToAddress(key.PublicKey) + dadStr = "0x0000000000000000000000000000000000000dad" + dad = common.HexToAddress(dadStr) + genesis = &core.Genesis{ + Config: params.AllEthashProtocolChanges, + GasLimit: 11500000, + Difficulty: big.NewInt(1048576), + Alloc: core.GenesisAlloc{ + addr: {Balance: big.NewInt(params.Ether)}, + dad: { + // LOG0(0, 0), LOG0(0, 0), RETURN(0, 0) + Code: common.Hex2Bytes("60006000a060006000a060006000f3"), + Nonce: 0, + Balance: big.NewInt(0), + }, + }, + } + signer = types.LatestSigner(genesis.Config) + stack = createNode(t) + ) + defer stack.Close() + + handler := newGQLService(t, stack, genesis, 1, func(i int, gen *core.BlockGen) { + tx, _ := types.SignNewTx(key, signer, &types.LegacyTx{To: &dad, Gas: 100000, GasPrice: big.NewInt(params.InitialBaseFee)}) + gen.AddTx(tx) + tx, _ = types.SignNewTx(key, signer, &types.LegacyTx{To: &dad, Nonce: 1, Gas: 100000, GasPrice: big.NewInt(params.InitialBaseFee)}) + gen.AddTx(tx) + tx, _ = types.SignNewTx(key, signer, &types.LegacyTx{To: &dad, Nonce: 2, Gas: 100000, GasPrice: big.NewInt(params.InitialBaseFee)}) + gen.AddTx(tx) + }) + // start node + if err := stack.Start(); err != nil { + t.Fatalf("could not start node: %v", err) + } + query := `{block { transactions { logs { account { address } } } } }` + res := handler.Schema.Exec(context.Background(), query, "", map[string]interface{}{}) + if res.Errors != nil { + t.Fatalf("graphql query failed: %v", res.Errors) + } + have, err := json.Marshal(res.Data) + if err != nil { + t.Fatalf("failed to encode graphql response: %s", err) + } + want := fmt.Sprintf(`{"block":{"transactions":[{"logs":[{"account":{"address":"%s"}},{"account":{"address":"%s"}}]},{"logs":[{"account":{"address":"%s"}},{"account":{"address":"%s"}}]},{"logs":[{"account":{"address":"%s"}},{"account":{"address":"%s"}}]}]}}`, dadStr, dadStr, dadStr, dadStr, dadStr, dadStr) + if string(have) != want { + t.Errorf("response unmatch. expected %s, got %s", want, have) + } +} + +func createNode(t *testing.T) *node.Node { stack, err := node.New(&node.Config{ HTTPHost: "127.0.0.1", HTTPPort: 0, @@ -222,83 +329,12 @@ func createNode(t *testing.T, gqlEnabled bool, txEnabled bool) *node.Node { if err != nil { t.Fatalf("could not create node: %v", err) } - if !gqlEnabled { - return stack - } - if !txEnabled { - createGQLService(t, stack) - } else { - createGQLServiceWithTransactions(t, stack) - } return stack } -func createGQLService(t *testing.T, stack *node.Node) { - // create backend - ethConf := ðconfig.Config{ - Genesis: &core.Genesis{ - Config: params.AllEthashProtocolChanges, - GasLimit: 11500000, - Difficulty: big.NewInt(1048576), - }, - Ethash: ethash.Config{ - PowMode: ethash.ModeFake, - }, - NetworkId: 1337, - TrieCleanCache: 5, - TrieCleanCacheJournal: "triecache", - TrieCleanCacheRejournal: 60 * time.Minute, - TrieDirtyCache: 5, - TrieTimeout: 60 * time.Minute, - SnapshotCache: 5, - } - ethBackend, err := eth.New(stack, ethConf) - if err != nil { - t.Fatalf("could not create eth backend: %v", err) - } - // Create some blocks and import them - chain, _ := core.GenerateChain(params.AllEthashProtocolChanges, ethBackend.BlockChain().Genesis(), - ethash.NewFaker(), ethBackend.ChainDb(), 10, func(i int, gen *core.BlockGen) {}) - _, err = ethBackend.BlockChain().InsertChain(chain) - if err != nil { - t.Fatalf("could not create import blocks: %v", err) - } - // create gql service - filterSystem := filters.NewFilterSystem(ethBackend.APIBackend, filters.Config{}) - err = New(stack, ethBackend.APIBackend, filterSystem, []string{}, []string{}) - if err != nil { - t.Fatalf("could not create graphql service: %v", err) - } -} - -func createGQLServiceWithTransactions(t *testing.T, stack *node.Node) { - // create backend - key, _ := crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291") - address := crypto.PubkeyToAddress(key.PublicKey) - funds := big.NewInt(1000000000000000) - dad := common.HexToAddress("0x0000000000000000000000000000000000000dad") - +func newGQLService(t *testing.T, stack *node.Node, gspec *core.Genesis, genBlocks int, genfunc func(i int, gen *core.BlockGen)) *handler { ethConf := ðconfig.Config{ - Genesis: &core.Genesis{ - Config: params.AllEthashProtocolChanges, - GasLimit: 11500000, - Difficulty: big.NewInt(1048576), - Alloc: core.GenesisAlloc{ - address: {Balance: funds}, - // The address 0xdad sloads 0x00 and 0x01 - dad: { - Code: []byte{ - byte(vm.PC), - byte(vm.PC), - byte(vm.SLOAD), - byte(vm.SLOAD), - }, - Nonce: 0, - Balance: big.NewInt(0), - }, - }, - BaseFee: big.NewInt(params.InitialBaseFee), - }, + Genesis: gspec, Ethash: ethash.Config{ PowMode: ethash.ModeFake, }, @@ -310,49 +346,22 @@ func createGQLServiceWithTransactions(t *testing.T, stack *node.Node) { TrieTimeout: 60 * time.Minute, SnapshotCache: 5, } - ethBackend, err := eth.New(stack, ethConf) if err != nil { t.Fatalf("could not create eth backend: %v", err) } - signer := types.LatestSigner(ethConf.Genesis.Config) - - legacyTx, _ := types.SignNewTx(key, signer, &types.LegacyTx{ - Nonce: uint64(0), - To: &dad, - Value: big.NewInt(100), - Gas: 50000, - GasPrice: big.NewInt(params.InitialBaseFee), - }) - envelopTx, _ := types.SignNewTx(key, signer, &types.AccessListTx{ - ChainID: ethConf.Genesis.Config.ChainID, - Nonce: uint64(1), - To: &dad, - Gas: 30000, - GasPrice: big.NewInt(params.InitialBaseFee), - Value: big.NewInt(50), - AccessList: types.AccessList{{ - Address: dad, - StorageKeys: []common.Hash{{0}}, - }}, - }) - // Create some blocks and import them chain, _ := core.GenerateChain(params.AllEthashProtocolChanges, ethBackend.BlockChain().Genesis(), - ethash.NewFaker(), ethBackend.ChainDb(), 1, func(i int, b *core.BlockGen) { - b.SetCoinbase(common.Address{1}) - b.AddTx(legacyTx) - b.AddTx(envelopTx) - }) - + ethash.NewFaker(), ethBackend.ChainDb(), genBlocks, genfunc) _, err = ethBackend.BlockChain().InsertChain(chain) if err != nil { t.Fatalf("could not create import blocks: %v", err) } - // create gql service + // Set up handler filterSystem := filters.NewFilterSystem(ethBackend.APIBackend, filters.Config{}) - err = New(stack, ethBackend.APIBackend, filterSystem, []string{}, []string{}) + handler, err := newHandler(stack, ethBackend.APIBackend, filterSystem, []string{}, []string{}) if err != nil { t.Fatalf("could not create graphql service: %v", err) } + return handler } diff --git a/graphql/service.go b/graphql/service.go index 019026bc7ea7d..6f6e58335991c 100644 --- a/graphql/service.go +++ b/graphql/service.go @@ -57,17 +57,18 @@ func (h handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { // New constructs a new GraphQL service instance. func New(stack *node.Node, backend ethapi.Backend, filterSystem *filters.FilterSystem, cors, vhosts []string) error { - return newHandler(stack, backend, filterSystem, cors, vhosts) + _, err := newHandler(stack, backend, filterSystem, cors, vhosts) + return err } // newHandler returns a new `http.Handler` that will answer GraphQL queries. // It additionally exports an interactive query browser on the / endpoint. -func newHandler(stack *node.Node, backend ethapi.Backend, filterSystem *filters.FilterSystem, cors, vhosts []string) error { +func newHandler(stack *node.Node, backend ethapi.Backend, filterSystem *filters.FilterSystem, cors, vhosts []string) (*handler, error) { q := Resolver{backend, filterSystem} s, err := graphql.ParseSchema(schema, &q) if err != nil { - return err + return nil, err } h := handler{Schema: s} handler := node.NewHTTPHandlerStack(h, cors, vhosts, nil) @@ -76,5 +77,5 @@ func newHandler(stack *node.Node, backend ethapi.Backend, filterSystem *filters. stack.RegisterHandler("GraphQL", "/graphql", handler) stack.RegisterHandler("GraphQL", "/graphql/", handler) - return nil + return &h, nil } From 972007a517c49ee9e2a359950d81c74467492ed2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Wed, 14 Sep 2022 11:07:10 +0200 Subject: [PATCH 08/17] Release Geth v1.10.24 --- params/version.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/params/version.go b/params/version.go index 5f24b41f28526..349bdf1c18d16 100644 --- a/params/version.go +++ b/params/version.go @@ -23,7 +23,7 @@ import ( const ( VersionMajor = 1 // Major version component of the current release VersionMinor = 10 // Minor version component of the current release - VersionPatch = 23 // Patch version component of the current release + VersionPatch = 24 // Patch version component of the current release VersionMeta = "stable" // Version metadata to append to the version string ) From 8f61fc8b73565ff15d27fd9731b945d9d1878ec3 Mon Sep 17 00:00:00 2001 From: Marius van der Wijden Date: Thu, 15 Sep 2022 17:50:54 +0200 Subject: [PATCH 09/17] params: set TerminalTotalDifficultyPassed to true (#25769) * params: set TerminalTotalDifficultyPassed to true * Update params/config.go Co-authored-by: Martin Holst Swende --- params/config.go | 39 ++++++++++++++++++++------------------- 1 file changed, 20 insertions(+), 19 deletions(-) diff --git a/params/config.go b/params/config.go index 4d6eec8939bca..80e671f9bf417 100644 --- a/params/config.go +++ b/params/config.go @@ -59,25 +59,26 @@ var ( // MainnetChainConfig is the chain parameters to run a node on the main network. MainnetChainConfig = &ChainConfig{ - ChainID: big.NewInt(1), - HomesteadBlock: big.NewInt(1_150_000), - DAOForkBlock: big.NewInt(1_920_000), - DAOForkSupport: true, - EIP150Block: big.NewInt(2_463_000), - EIP150Hash: common.HexToHash("0x2086799aeebeae135c246c65021c82b4e15a2c451340993aacfd2751886514f0"), - EIP155Block: big.NewInt(2_675_000), - EIP158Block: big.NewInt(2_675_000), - ByzantiumBlock: big.NewInt(4_370_000), - ConstantinopleBlock: big.NewInt(7_280_000), - PetersburgBlock: big.NewInt(7_280_000), - IstanbulBlock: big.NewInt(9_069_000), - MuirGlacierBlock: big.NewInt(9_200_000), - BerlinBlock: big.NewInt(12_244_000), - LondonBlock: big.NewInt(12_965_000), - ArrowGlacierBlock: big.NewInt(13_773_000), - GrayGlacierBlock: big.NewInt(15_050_000), - TerminalTotalDifficulty: MainnetTerminalTotalDifficulty, // 58_750_000_000_000_000_000_000 - Ethash: new(EthashConfig), + ChainID: big.NewInt(1), + HomesteadBlock: big.NewInt(1_150_000), + DAOForkBlock: big.NewInt(1_920_000), + DAOForkSupport: true, + EIP150Block: big.NewInt(2_463_000), + EIP150Hash: common.HexToHash("0x2086799aeebeae135c246c65021c82b4e15a2c451340993aacfd2751886514f0"), + EIP155Block: big.NewInt(2_675_000), + EIP158Block: big.NewInt(2_675_000), + ByzantiumBlock: big.NewInt(4_370_000), + ConstantinopleBlock: big.NewInt(7_280_000), + PetersburgBlock: big.NewInt(7_280_000), + IstanbulBlock: big.NewInt(9_069_000), + MuirGlacierBlock: big.NewInt(9_200_000), + BerlinBlock: big.NewInt(12_244_000), + LondonBlock: big.NewInt(12_965_000), + ArrowGlacierBlock: big.NewInt(13_773_000), + GrayGlacierBlock: big.NewInt(15_050_000), + TerminalTotalDifficulty: MainnetTerminalTotalDifficulty, // 58_750_000_000_000_000_000_000 + TerminalTotalDifficultyPassed: true, + Ethash: new(EthashConfig), } // MainnetTrustedCheckpoint contains the light client trusted checkpoint for the main network. From 69568c554880b3567bace64f8848ff1be27d084d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Thu, 15 Sep 2022 17:55:58 +0200 Subject: [PATCH 10/17] params: release Geth v1.10.25 --- params/version.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/params/version.go b/params/version.go index 349bdf1c18d16..3b5978e849cde 100644 --- a/params/version.go +++ b/params/version.go @@ -23,7 +23,7 @@ import ( const ( VersionMajor = 1 // Major version component of the current release VersionMinor = 10 // Minor version component of the current release - VersionPatch = 24 // Patch version component of the current release + VersionPatch = 25 // Patch version component of the current release VersionMeta = "stable" // Version metadata to append to the version string ) From 85e469f787be53192fcbcee86ce67635020a4ad5 Mon Sep 17 00:00:00 2001 From: Martin Holst Swende Date: Wed, 31 Aug 2022 17:58:18 +0200 Subject: [PATCH 11/17] eth/protocols/snap: fix problems due to idle-but-busy peers (#25651) --- eth/protocols/snap/sync.go | 90 +++++++++++++++++++++++--------------- 1 file changed, 55 insertions(+), 35 deletions(-) diff --git a/eth/protocols/snap/sync.go b/eth/protocols/snap/sync.go index deaa4456e0fdc..1455bacbcb880 100644 --- a/eth/protocols/snap/sync.go +++ b/eth/protocols/snap/sync.go @@ -2248,14 +2248,18 @@ func (s *Syncer) OnAccounts(peer SyncPeer, id uint64, hashes []common.Hash, acco // Whether or not the response is valid, we can mark the peer as idle and // notify the scheduler to assign a new task. If the response is invalid, // we'll drop the peer in a bit. + defer func() { + s.lock.Lock() + defer s.lock.Unlock() + if _, ok := s.peers[peer.ID()]; ok { + s.accountIdlers[peer.ID()] = struct{}{} + } + select { + case s.update <- struct{}{}: + default: + } + }() s.lock.Lock() - if _, ok := s.peers[peer.ID()]; ok { - s.accountIdlers[peer.ID()] = struct{}{} - } - select { - case s.update <- struct{}{}: - default: - } // Ensure the response is for a valid request req, ok := s.accountReqs[id] if !ok { @@ -2360,14 +2364,18 @@ func (s *Syncer) onByteCodes(peer SyncPeer, id uint64, bytecodes [][]byte) error // Whether or not the response is valid, we can mark the peer as idle and // notify the scheduler to assign a new task. If the response is invalid, // we'll drop the peer in a bit. + defer func() { + s.lock.Lock() + defer s.lock.Unlock() + if _, ok := s.peers[peer.ID()]; ok { + s.bytecodeIdlers[peer.ID()] = struct{}{} + } + select { + case s.update <- struct{}{}: + default: + } + }() s.lock.Lock() - if _, ok := s.peers[peer.ID()]; ok { - s.bytecodeIdlers[peer.ID()] = struct{}{} - } - select { - case s.update <- struct{}{}: - default: - } // Ensure the response is for a valid request req, ok := s.bytecodeReqs[id] if !ok { @@ -2469,14 +2477,18 @@ func (s *Syncer) OnStorage(peer SyncPeer, id uint64, hashes [][]common.Hash, slo // Whether or not the response is valid, we can mark the peer as idle and // notify the scheduler to assign a new task. If the response is invalid, // we'll drop the peer in a bit. + defer func() { + s.lock.Lock() + defer s.lock.Unlock() + if _, ok := s.peers[peer.ID()]; ok { + s.storageIdlers[peer.ID()] = struct{}{} + } + select { + case s.update <- struct{}{}: + default: + } + }() s.lock.Lock() - if _, ok := s.peers[peer.ID()]; ok { - s.storageIdlers[peer.ID()] = struct{}{} - } - select { - case s.update <- struct{}{}: - default: - } // Ensure the response is for a valid request req, ok := s.storageReqs[id] if !ok { @@ -2596,14 +2608,18 @@ func (s *Syncer) OnTrieNodes(peer SyncPeer, id uint64, trienodes [][]byte) error // Whether or not the response is valid, we can mark the peer as idle and // notify the scheduler to assign a new task. If the response is invalid, // we'll drop the peer in a bit. + defer func() { + s.lock.Lock() + defer s.lock.Unlock() + if _, ok := s.peers[peer.ID()]; ok { + s.trienodeHealIdlers[peer.ID()] = struct{}{} + } + select { + case s.update <- struct{}{}: + default: + } + }() s.lock.Lock() - if _, ok := s.peers[peer.ID()]; ok { - s.trienodeHealIdlers[peer.ID()] = struct{}{} - } - select { - case s.update <- struct{}{}: - default: - } // Ensure the response is for a valid request req, ok := s.trienodeHealReqs[id] if !ok { @@ -2691,14 +2707,18 @@ func (s *Syncer) onHealByteCodes(peer SyncPeer, id uint64, bytecodes [][]byte) e // Whether or not the response is valid, we can mark the peer as idle and // notify the scheduler to assign a new task. If the response is invalid, // we'll drop the peer in a bit. + defer func() { + s.lock.Lock() + defer s.lock.Unlock() + if _, ok := s.peers[peer.ID()]; ok { + s.bytecodeHealIdlers[peer.ID()] = struct{}{} + } + select { + case s.update <- struct{}{}: + default: + } + }() s.lock.Lock() - if _, ok := s.peers[peer.ID()]; ok { - s.bytecodeHealIdlers[peer.ID()] = struct{}{} - } - select { - case s.update <- struct{}{}: - default: - } // Ensure the response is for a valid request req, ok := s.bytecodeHealReqs[id] if !ok { From 937ea491f95e5676d2ce3e0364ba9f05452bc351 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Fri, 9 Sep 2022 11:42:57 +0300 Subject: [PATCH 12/17] eth/protocols/snap: throttle trie heal requests when peers DoS us (#25666) * eth/protocols/snap: throttle trie heal requests when peers DoS us * eth/protocols/snap: lower heal throttle log to debug Co-authored-by: Martin Holst Swende * eth/protocols/snap: fix comment Co-authored-by: Martin Holst Swende --- eth/protocols/snap/sync.go | 107 ++++++++++++++++++++++++++++++++++--- 1 file changed, 100 insertions(+), 7 deletions(-) diff --git a/eth/protocols/snap/sync.go b/eth/protocols/snap/sync.go index 1455bacbcb880..eb8260bf7c97a 100644 --- a/eth/protocols/snap/sync.go +++ b/eth/protocols/snap/sync.go @@ -21,10 +21,12 @@ import ( "encoding/json" "errors" "fmt" + gomath "math" "math/big" "math/rand" "sort" "sync" + "sync/atomic" "time" "github.com/ethereum/go-ethereum/common" @@ -78,6 +80,29 @@ const ( // and waste round trip times. If it's too high, we're capping responses and // waste bandwidth. maxTrieRequestCount = maxRequestSize / 512 + + // trienodeHealRateMeasurementImpact is the impact a single measurement has on + // the local node's trienode processing capacity. A value closer to 0 reacts + // slower to sudden changes, but it is also more stable against temporary hiccups. + trienodeHealRateMeasurementImpact = 0.005 + + // minTrienodeHealThrottle is the minimum divisor for throttling trie node + // heal requests to avoid overloading the local node and exessively expanding + // the state trie bedth wise. + minTrienodeHealThrottle = 1 + + // maxTrienodeHealThrottle is the maximum divisor for throttling trie node + // heal requests to avoid overloading the local node and exessively expanding + // the state trie bedth wise. + maxTrienodeHealThrottle = maxTrieRequestCount + + // trienodeHealThrottleIncrease is the multiplier for the throttle when the + // rate of arriving data is higher than the rate of processing it. + trienodeHealThrottleIncrease = 1.33 + + // trienodeHealThrottleDecrease is the divisor for the throttle when the + // rate of arriving data is lower than the rate of processing it. + trienodeHealThrottleDecrease = 1.25 ) var ( @@ -431,6 +456,11 @@ type Syncer struct { trienodeHealReqs map[uint64]*trienodeHealRequest // Trie node requests currently running bytecodeHealReqs map[uint64]*bytecodeHealRequest // Bytecode requests currently running + trienodeHealRate float64 // Average heal rate for processing trie node data + trienodeHealPend uint64 // Number of trie nodes currently pending for processing + trienodeHealThrottle float64 // Divisor for throttling the amount of trienode heal data requested + trienodeHealThrottled time.Time // Timestamp the last time the throttle was updated + trienodeHealSynced uint64 // Number of state trie nodes downloaded trienodeHealBytes common.StorageSize // Number of state trie bytes persisted to disk trienodeHealDups uint64 // Number of state trie nodes already processed @@ -476,9 +506,10 @@ func NewSyncer(db ethdb.KeyValueStore) *Syncer { trienodeHealIdlers: make(map[string]struct{}), bytecodeHealIdlers: make(map[string]struct{}), - trienodeHealReqs: make(map[uint64]*trienodeHealRequest), - bytecodeHealReqs: make(map[uint64]*bytecodeHealRequest), - stateWriter: db.NewBatch(), + trienodeHealReqs: make(map[uint64]*trienodeHealRequest), + bytecodeHealReqs: make(map[uint64]*bytecodeHealRequest), + trienodeHealThrottle: maxTrienodeHealThrottle, // Tune downward instead of insta-filling with junk + stateWriter: db.NewBatch(), extProgress: new(SyncProgress), } @@ -1321,6 +1352,10 @@ func (s *Syncer) assignTrienodeHealTasks(success chan *trienodeHealResponse, fai if cap > maxTrieRequestCount { cap = maxTrieRequestCount } + cap = int(float64(cap) / s.trienodeHealThrottle) + if cap <= 0 { + cap = 1 + } var ( hashes = make([]common.Hash, 0, cap) paths = make([]string, 0, cap) @@ -2090,6 +2125,10 @@ func (s *Syncer) processStorageResponse(res *storageResponse) { // processTrienodeHealResponse integrates an already validated trienode response // into the healer tasks. func (s *Syncer) processTrienodeHealResponse(res *trienodeHealResponse) { + var ( + start = time.Now() + fills int + ) for i, hash := range res.hashes { node := res.nodes[i] @@ -2098,6 +2137,8 @@ func (s *Syncer) processTrienodeHealResponse(res *trienodeHealResponse) { res.task.trieTasks[res.paths[i]] = res.hashes[i] continue } + fills++ + // Push the trie node into the state syncer s.trienodeHealSynced++ s.trienodeHealBytes += common.StorageSize(len(node)) @@ -2121,6 +2162,50 @@ func (s *Syncer) processTrienodeHealResponse(res *trienodeHealResponse) { log.Crit("Failed to persist healing data", "err", err) } log.Debug("Persisted set of healing data", "type", "trienodes", "bytes", common.StorageSize(batch.ValueSize())) + + // Calculate the processing rate of one filled trie node + rate := float64(fills) / (float64(time.Since(start)) / float64(time.Second)) + + // Update the currently measured trienode queueing and processing throughput. + // + // The processing rate needs to be updated uniformly independent if we've + // processed 1x100 trie nodes or 100x1 to keep the rate consistent even in + // the face of varying network packets. As such, we cannot just measure the + // time it took to process N trie nodes and update once, we need one update + // per trie node. + // + // Naively, that would be: + // + // for i:=0; i time.Second { + // Periodically adjust the trie node throttler + if float64(pending) > 2*s.trienodeHealRate { + s.trienodeHealThrottle *= trienodeHealThrottleIncrease + } else { + s.trienodeHealThrottle /= trienodeHealThrottleDecrease + } + if s.trienodeHealThrottle > maxTrienodeHealThrottle { + s.trienodeHealThrottle = maxTrienodeHealThrottle + } else if s.trienodeHealThrottle < minTrienodeHealThrottle { + s.trienodeHealThrottle = minTrienodeHealThrottle + } + s.trienodeHealThrottled = time.Now() + + log.Debug("Updated trie node heal throttler", "rate", s.trienodeHealRate, "pending", pending, "throttle", s.trienodeHealThrottle) + } } // processBytecodeHealResponse integrates an already validated bytecode response @@ -2655,10 +2740,12 @@ func (s *Syncer) OnTrieNodes(peer SyncPeer, id uint64, trienodes [][]byte) error // Cross reference the requested trienodes with the response to find gaps // that the serving node is missing - hasher := sha3.NewLegacyKeccak256().(crypto.KeccakState) - hash := make([]byte, 32) - - nodes := make([][]byte, len(req.hashes)) + var ( + hasher = sha3.NewLegacyKeccak256().(crypto.KeccakState) + hash = make([]byte, 32) + nodes = make([][]byte, len(req.hashes)) + fills uint64 + ) for i, j := 0, 0; i < len(trienodes); i++ { // Find the next hash that we've been served, leaving misses with nils hasher.Reset() @@ -2670,16 +2757,22 @@ func (s *Syncer) OnTrieNodes(peer SyncPeer, id uint64, trienodes [][]byte) error } if j < len(req.hashes) { nodes[j] = trienodes[i] + fills++ j++ continue } // We've either ran out of hashes, or got unrequested data logger.Warn("Unexpected healing trienodes", "count", len(trienodes)-i) + // Signal this request as failed, and ready for rescheduling s.scheduleRevertTrienodeHealRequest(req) return errors.New("unexpected healing trienode") } // Response validated, send it to the scheduler for filling + atomic.AddUint64(&s.trienodeHealPend, fills) + defer func() { + atomic.AddUint64(&s.trienodeHealPend, ^(fills - 1)) + }() response := &trienodeHealResponse{ paths: req.paths, task: req.task, From a32e69a28c2abef6a6b96e8d2466f1f27fd693ec Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Tue, 6 Sep 2022 12:57:03 +0300 Subject: [PATCH 13/17] trie: check childrens' existence concurrently for snap heal (#25694) --- trie/sync.go | 57 +++++++++++++++++++++++++++++++++++----------------- 1 file changed, 39 insertions(+), 18 deletions(-) diff --git a/trie/sync.go b/trie/sync.go index 303fcbfa22e2d..862ce7e16e6c4 100644 --- a/trie/sync.go +++ b/trie/sync.go @@ -19,6 +19,7 @@ package trie import ( "errors" "fmt" + "sync" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/prque" @@ -381,11 +382,11 @@ func (s *Sync) scheduleCodeRequest(req *codeRequest) { // retrieval scheduling. func (s *Sync) children(req *nodeRequest, object node) ([]*nodeRequest, error) { // Gather all the children of the node, irrelevant whether known or not - type child struct { + type childNode struct { path []byte node node } - var children []child + var children []childNode switch node := (object).(type) { case *shortNode: @@ -393,14 +394,14 @@ func (s *Sync) children(req *nodeRequest, object node) ([]*nodeRequest, error) { if hasTerm(key) { key = key[:len(key)-1] } - children = []child{{ + children = []childNode{{ node: node.Val, path: append(append([]byte(nil), req.path...), key...), }} case *fullNode: for i := 0; i < 17; i++ { if node.Children[i] != nil { - children = append(children, child{ + children = append(children, childNode{ node: node.Children[i], path: append(append([]byte(nil), req.path...), byte(i)), }) @@ -410,7 +411,10 @@ func (s *Sync) children(req *nodeRequest, object node) ([]*nodeRequest, error) { panic(fmt.Sprintf("unknown node: %+v", node)) } // Iterate over the children, and request all unknown ones - requests := make([]*nodeRequest, 0, len(children)) + var ( + missing = make(chan *nodeRequest, len(children)) + pending sync.WaitGroup + ) for _, child := range children { // Notify any external watcher of a new key/value node if req.callback != nil { @@ -433,19 +437,36 @@ func (s *Sync) children(req *nodeRequest, object node) ([]*nodeRequest, error) { if s.membatch.hasNode(child.path) { continue } - // If database says duplicate, then at least the trie node is present - // and we hold the assumption that it's NOT legacy contract code. - chash := common.BytesToHash(node) - if rawdb.HasTrieNode(s.database, chash) { - continue - } - // Locally unknown node, schedule for retrieval - requests = append(requests, &nodeRequest{ - path: child.path, - hash: chash, - parent: req, - callback: req.callback, - }) + // Check the presence of children concurrently + pending.Add(1) + go func(child childNode) { + defer pending.Done() + + // If database says duplicate, then at least the trie node is present + // and we hold the assumption that it's NOT legacy contract code. + chash := common.BytesToHash(node) + if rawdb.HasTrieNode(s.database, chash) { + return + } + // Locally unknown node, schedule for retrieval + missing <- &nodeRequest{ + path: child.path, + hash: chash, + parent: req, + callback: req.callback, + } + }(child) + } + } + pending.Wait() + + requests := make([]*nodeRequest, 0, len(children)) + for done := false; !done; { + select { + case miss := <-missing: + requests = append(requests, miss) + default: + done = true } } return requests, nil From 99bbb337011ef614fc5d25c130aa1cef92915366 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Tue, 20 Sep 2022 14:14:24 +0300 Subject: [PATCH 14/17] eth: fix a rare datarace on CHT challenge reply / shutdown (#25831) --- eth/handler.go | 18 ++++++++++++++---- 1 file changed, 14 insertions(+), 4 deletions(-) diff --git a/eth/handler.go b/eth/handler.go index 4224a9f33a849..143147b0c8153 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -391,11 +391,16 @@ func (h *handler) runEthPeer(peer *eth.Peer, handler eth.Handler) error { if h.checkpointHash != (common.Hash{}) { // Request the peer's checkpoint header for chain height/weight validation resCh := make(chan *eth.Response) - if _, err := peer.RequestHeadersByNumber(h.checkpointNumber, 1, 0, false, resCh); err != nil { + + req, err := peer.RequestHeadersByNumber(h.checkpointNumber, 1, 0, false, resCh) + if err != nil { return err } // Start a timer to disconnect if the peer doesn't reply in time go func() { + // Ensure the request gets cancelled in case of error/drop + defer req.Close() + timeout := time.NewTimer(syncChallengeTimeout) defer timeout.Stop() @@ -437,10 +442,15 @@ func (h *handler) runEthPeer(peer *eth.Peer, handler eth.Handler) error { // If we have any explicit peer required block hashes, request them for number, hash := range h.requiredBlocks { resCh := make(chan *eth.Response) - if _, err := peer.RequestHeadersByNumber(number, 1, 0, false, resCh); err != nil { + + req, err := peer.RequestHeadersByNumber(number, 1, 0, false, resCh) + if err != nil { return err } - go func(number uint64, hash common.Hash) { + go func(number uint64, hash common.Hash, req *eth.Request) { + // Ensure the request gets cancelled in case of error/drop + defer req.Close() + timeout := time.NewTimer(syncChallengeTimeout) defer timeout.Stop() @@ -469,7 +479,7 @@ func (h *handler) runEthPeer(peer *eth.Peer, handler eth.Handler) error { peer.Log().Warn("Required block challenge timed out, dropping", "addr", peer.RemoteAddr(), "type", peer.Name()) h.removePeer(peer.ID()) } - }(number, hash) + }(number, hash, req) } // Handle incoming messages until the connection is torn down return handler(peer) From 27600a5b8452c39ce9ef5c502c98a0c1ee35859b Mon Sep 17 00:00:00 2001 From: Martin Holst Swende Date: Thu, 27 Oct 2022 15:25:01 +0200 Subject: [PATCH 15/17] eth/filters: change filter block to be by-ref (#26054) This PR changes the block field in the filter to be a pointer, to disambiguate between empty hash and no hash --- eth/filters/filter.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/eth/filters/filter.go b/eth/filters/filter.go index 0a70c9ece1dbe..8d80647fc241f 100644 --- a/eth/filters/filter.go +++ b/eth/filters/filter.go @@ -34,8 +34,8 @@ type Filter struct { addresses []common.Address topics [][]common.Hash - block common.Hash // Block hash if filtering a single block - begin, end int64 // Range interval if filtering multiple blocks + block *common.Hash // Block hash if filtering a single block + begin, end int64 // Range interval if filtering multiple blocks matcher *bloombits.Matcher } @@ -78,7 +78,7 @@ func (sys *FilterSystem) NewRangeFilter(begin, end int64, addresses []common.Add func (sys *FilterSystem) NewBlockFilter(block common.Hash, addresses []common.Address, topics [][]common.Hash) *Filter { // Create a generic filter and convert it into a block filter filter := newFilter(sys, addresses, topics) - filter.block = block + filter.block = &block return filter } @@ -96,8 +96,8 @@ func newFilter(sys *FilterSystem, addresses []common.Address, topics [][]common. // first block that contains matches, updating the start of the filter accordingly. func (f *Filter) Logs(ctx context.Context) ([]*types.Log, error) { // If we're doing singleton block filtering, execute and return - if f.block != (common.Hash{}) { - header, err := f.sys.backend.HeaderByHash(ctx, f.block) + if f.block != nil { + header, err := f.sys.backend.HeaderByHash(ctx, *f.block) if err != nil { return nil, err } From 211dbb719711420358a3cdf718ce476c212adcf5 Mon Sep 17 00:00:00 2001 From: Jordan Krage Date: Wed, 2 Nov 2022 09:29:33 -0500 Subject: [PATCH 16/17] rpc: handle wrong HTTP batch response length (#26064) --- rpc/client.go | 1 + rpc/client_test.go | 48 ++++++++++++++++++++++++++++++++++++++++++++++ rpc/http.go | 3 +++ 3 files changed, 52 insertions(+) diff --git a/rpc/client.go b/rpc/client.go index d3ce0297754c9..fcd31319a3c59 100644 --- a/rpc/client.go +++ b/rpc/client.go @@ -31,6 +31,7 @@ import ( ) var ( + ErrBadResult = errors.New("bad result in JSON-RPC response") ErrClientQuit = errors.New("client is closed") ErrNoResult = errors.New("no result in JSON-RPC response") ErrSubscriptionQueueOverflow = errors.New("subscription queue overflow") diff --git a/rpc/client_test.go b/rpc/client_test.go index 04c847d0d626c..5ae549d865336 100644 --- a/rpc/client_test.go +++ b/rpc/client_test.go @@ -19,6 +19,7 @@ package rpc import ( "context" "encoding/json" + "errors" "fmt" "math/rand" "net" @@ -144,6 +145,53 @@ func TestClientBatchRequest(t *testing.T) { } } +func TestClientBatchRequest_len(t *testing.T) { + b, err := json.Marshal([]jsonrpcMessage{ + {Version: "2.0", ID: json.RawMessage("1"), Method: "foo", Result: json.RawMessage(`"0x1"`)}, + {Version: "2.0", ID: json.RawMessage("2"), Method: "bar", Result: json.RawMessage(`"0x2"`)}, + }) + if err != nil { + t.Fatal("failed to encode jsonrpc message:", err) + } + s := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) { + _, err := rw.Write(b) + if err != nil { + t.Error("failed to write response:", err) + } + })) + t.Cleanup(s.Close) + + client, err := Dial(s.URL) + if err != nil { + t.Fatal("failed to dial test server:", err) + } + defer client.Close() + + t.Run("too-few", func(t *testing.T) { + batch := []BatchElem{ + {Method: "foo"}, + {Method: "bar"}, + {Method: "baz"}, + } + ctx, cancelFn := context.WithTimeout(context.Background(), time.Second) + defer cancelFn() + if err := client.BatchCallContext(ctx, batch); !errors.Is(err, ErrBadResult) { + t.Errorf("expected %q but got: %v", ErrBadResult, err) + } + }) + + t.Run("too-many", func(t *testing.T) { + batch := []BatchElem{ + {Method: "foo"}, + } + ctx, cancelFn := context.WithTimeout(context.Background(), time.Second) + defer cancelFn() + if err := client.BatchCallContext(ctx, batch); !errors.Is(err, ErrBadResult) { + t.Errorf("expected %q but got: %v", ErrBadResult, err) + } + }) +} + func TestClientNotify(t *testing.T) { server := newTestServer() defer server.Stop() diff --git a/rpc/http.go b/rpc/http.go index 858d808586524..b82ea2707a546 100644 --- a/rpc/http.go +++ b/rpc/http.go @@ -173,6 +173,9 @@ func (c *Client) sendBatchHTTP(ctx context.Context, op *requestOp, msgs []*jsonr if err := json.NewDecoder(respBody).Decode(&respmsgs); err != nil { return err } + if len(respmsgs) != len(msgs) { + return fmt.Errorf("batch has %d requests but response has %d: %w", len(msgs), len(respmsgs), ErrBadResult) + } for i := 0; i < len(respmsgs); i++ { op.resp <- &respmsgs[i] } From e5eb32acee19cc9fca6a03b10283b7484246b15a Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Thu, 3 Nov 2022 11:59:33 +0100 Subject: [PATCH 17/17] params: release geth v1.10.26 stable --- params/version.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/params/version.go b/params/version.go index 3b5978e849cde..d9a3958512c34 100644 --- a/params/version.go +++ b/params/version.go @@ -23,7 +23,7 @@ import ( const ( VersionMajor = 1 // Major version component of the current release VersionMinor = 10 // Minor version component of the current release - VersionPatch = 25 // Patch version component of the current release + VersionPatch = 26 // Patch version component of the current release VersionMeta = "stable" // Version metadata to append to the version string )