diff --git a/ledger/heavy/handler/handler.go b/ledger/heavy/handler/handler.go index f931d3dcf2..b6c319f2e8 100644 --- a/ledger/heavy/handler/handler.go +++ b/ledger/heavy/handler/handler.go @@ -42,7 +42,6 @@ import ( type Handler struct { cfg configuration.Ledger - Bus insolar.MessageBus JetCoordinator jet.Coordinator PCS insolar.PlatformCryptographyScheme RecordAccessor object.RecordAccessor diff --git a/ledger/heavy/integration/pulsefinalization_test.go b/ledger/heavy/integration/pulsefinalization_test.go new file mode 100644 index 0000000000..18dc71f1e4 --- /dev/null +++ b/ledger/heavy/integration/pulsefinalization_test.go @@ -0,0 +1,83 @@ +// +// Copyright 2019 Insolar Technologies GmbH +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// +build slowtest + +package integration_test + +import ( + "os" + "testing" + "time" + + "github.com/insolar/insolar/insolar" + "github.com/insolar/insolar/insolar/payload" + "github.com/insolar/insolar/instrumentation/inslogger" + "github.com/insolar/insolar/ledger/drop" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func panicIfErr(err error) { + if err != nil { + panic(err) + } +} + +func Test_FinalizePulse(t *testing.T) { + t.Parallel() + + ctx := inslogger.TestContext(t) + cfg := DefaultHeavyConfig() + defer os.RemoveAll(cfg.Ledger.Storage.DataDirectory) + + s, err := NewServer(ctx, cfg, insolar.GenesisHeavyConfig{}, nil) + assert.NoError(t, err) + defer s.Stop() + + s.SetPulse(ctx) + s.SetPulse(ctx) + + targetPulse := s.Pulse() - PulseStep + + _, done := s.Send(ctx, &payload.GotHotConfirmation{ + JetID: insolar.ZeroJetID, + Pulse: targetPulse, + Split: false, + }) + done() + + require.Equal(t, insolar.GenesisPulse.PulseNumber, s.JetKeeper.TopSyncPulse()) + + d := drop.Drop{ + Pulse: targetPulse, + JetID: insolar.ZeroJetID, + Split: false, + } + + _, done = s.Send(ctx, &payload.Replication{ + JetID: insolar.ZeroJetID, + Pulse: targetPulse, + Drop: drop.MustEncode(&d), + }) + done() + + numIterations := 20 + for s.JetKeeper.TopSyncPulse() == insolar.GenesisPulse.PulseNumber && numIterations > 0 { + time.Sleep(500 * time.Millisecond) + numIterations-- + } + require.Equal(t, targetPulse, s.JetKeeper.TopSyncPulse()) +} diff --git a/ledger/heavy/integration/server_test.go b/ledger/heavy/integration/server_test.go new file mode 100644 index 0000000000..18718b8d7f --- /dev/null +++ b/ledger/heavy/integration/server_test.go @@ -0,0 +1,486 @@ +// +// Copyright 2019 Insolar Technologies GmbH +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// +build slowtest + +package integration_test + +import ( + "context" + "crypto" + "io/ioutil" + "math" + "sync" + "time" + + "github.com/dgraph-io/badger" + "github.com/insolar/insolar/network" + + "github.com/ThreeDotsLabs/watermill/message" + "github.com/ThreeDotsLabs/watermill/message/router/middleware" + "github.com/ThreeDotsLabs/watermill/pubsub/gochannel" + "github.com/insolar/insolar/component" + "github.com/insolar/insolar/configuration" + "github.com/insolar/insolar/cryptography" + "github.com/insolar/insolar/insolar" + "github.com/insolar/insolar/insolar/bus" + "github.com/insolar/insolar/insolar/gen" + "github.com/insolar/insolar/insolar/jet" + "github.com/insolar/insolar/insolar/jetcoordinator" + "github.com/insolar/insolar/insolar/node" + "github.com/insolar/insolar/insolar/payload" + "github.com/insolar/insolar/insolar/pulse" + "github.com/insolar/insolar/insolar/store" + "github.com/insolar/insolar/instrumentation/inslogger" + "github.com/insolar/insolar/keystore" + "github.com/insolar/insolar/ledger/artifact" + "github.com/insolar/insolar/ledger/drop" + "github.com/insolar/insolar/ledger/genesis" + "github.com/insolar/insolar/ledger/heavy/executor" + "github.com/insolar/insolar/ledger/heavy/handler" + "github.com/insolar/insolar/ledger/heavy/pulsemanager" + "github.com/insolar/insolar/ledger/object" + "github.com/insolar/insolar/log" + networknode "github.com/insolar/insolar/network/node" + "github.com/insolar/insolar/platformpolicy" + "github.com/pkg/errors" +) + +var ( + light = nodeMock{ + ref: gen.Reference(), + shortID: 1, + role: insolar.StaticRoleLightMaterial, + } + heavy = nodeMock{ + ref: gen.Reference(), + shortID: 2, + role: insolar.StaticRoleHeavyMaterial, + } + virtual = nodeMock{ + ref: gen.Reference(), + shortID: 3, + role: insolar.StaticRoleVirtual, + } +) + +func NodeHeavy() insolar.Reference { + return heavy.ref +} + +const PulseStep insolar.PulseNumber = 10 + +type Server struct { + pm insolar.PulseManager + pulse insolar.Pulse + lock sync.RWMutex + clientSender bus.Sender + JetKeeper executor.JetKeeper + replicator executor.HeavyReplicator + dbRollback *executor.DBRollback +} + +// After using it you have to remove directory configuration.Storage.DataDirectory by yourself +func DefaultHeavyConfig() configuration.Configuration { + tmpDir, err := ioutil.TempDir("", "heavy-integr-test-") + if err != nil { + panic(err) + } + cfg := configuration.Configuration{} + cfg.KeysPath = "testdata/bootstrap_keys.json" + cfg.Ledger.LightChainLimit = math.MaxInt32 + cfg.Bus.ReplyTimeout = time.Minute + cfg.Ledger.Storage = configuration.Storage{ + DataDirectory: tmpDir, + } + return cfg +} + +func defaultReceiveCallback(meta payload.Meta, pl payload.Payload) []payload.Payload { + return nil +} + +func NewServer( + ctx context.Context, + cfg configuration.Configuration, + genesisCfg insolar.GenesisHeavyConfig, + receiveCallback func(meta payload.Meta, pl payload.Payload) []payload.Payload, +) (*Server, error) { + // Cryptography. + var ( + KeyProcessor insolar.KeyProcessor + CryptoScheme insolar.PlatformCryptographyScheme + CryptoService insolar.CryptographyService + ) + { + var err error + // Private key storage. + ks, err := keystore.NewKeyStore(cfg.KeysPath) + if err != nil { + return nil, errors.Wrap(err, "failed to load KeyStore") + } + // Public key manipulations. + KeyProcessor = platformpolicy.NewKeyProcessor() + // Platform cryptography. + CryptoScheme = platformpolicy.NewPlatformCryptographyScheme() + // Sign, verify, etc. + CryptoService = cryptography.NewCryptographyService() + + c := component.Manager{} + c.Inject(CryptoService, CryptoScheme, KeyProcessor, ks) + } + + // Network. + var ( + NodeNetwork network.NodeNetwork + ) + { + NodeNetwork = newNodeNetMock(&light) + } + + // Role calculations. + var ( + Coordinator jet.Coordinator + Pulses *pulse.DB + Jets *jet.DBStore + Nodes *node.Storage + DB *store.BadgerDB + DBRollback *executor.DBRollback + ) + { + var err error + DB, err = store.NewBadgerDB(badger.DefaultOptions(cfg.Ledger.Storage.DataDirectory)) + if err != nil { + panic(errors.Wrap(err, "failed to initialize DB")) + } + Nodes = node.NewStorage() + Pulses = pulse.NewDB(DB) + Jets = jet.NewDBStore(DB) + + c := jetcoordinator.NewJetCoordinator(cfg.Ledger.LightChainLimit) + c.PulseCalculator = Pulses + c.PulseAccessor = Pulses + c.JetAccessor = Jets + c.OriginProvider = NodeNetwork + c.PlatformCryptographyScheme = CryptoScheme + c.Nodes = Nodes + + Coordinator = c + } + + logger := log.NewWatermillLogAdapter(inslogger.FromContext(ctx)) + // Communication. + var ( + ServerBus, ClientBus *bus.Bus + ServerPubSub, ClientPubSub *gochannel.GoChannel + ) + { + ServerPubSub = gochannel.NewGoChannel(gochannel.Config{}, logger) + ClientPubSub = gochannel.NewGoChannel(gochannel.Config{}, logger) + ServerBus = bus.NewBus(cfg.Bus, ServerPubSub, Pulses, Coordinator, CryptoScheme) + + c := jetcoordinator.NewJetCoordinator(cfg.Ledger.LightChainLimit) + c.PulseCalculator = Pulses + c.PulseAccessor = Pulses + c.JetAccessor = Jets + c.OriginProvider = newNodeNetMock(&virtual) + c.PlatformCryptographyScheme = CryptoScheme + c.Nodes = Nodes + ClientBus = bus.NewBus(cfg.Bus, ClientPubSub, Pulses, c, CryptoScheme) + } + + var replicator executor.HeavyReplicator + + // Heavy components. + var ( + PulseManager insolar.PulseManager + Handler *handler.Handler + Genesis *genesis.Genesis + Records *object.RecordDB + JetKeeper executor.JetKeeper + ) + { + Records = object.NewRecordDB(DB) + indexes := object.NewIndexDB(DB, Records) + drops := drop.NewDB(DB) + JetKeeper = executor.NewJetKeeper(Jets, DB, Pulses) + DBRollback = executor.NewDBRollback(JetKeeper, Pulses, drops, Records, indexes, Jets, Pulses) + + sp := pulse.NewStartPulse() + + backupMaker, err := executor.NewBackupMaker(ctx, DB, cfg.Ledger.Backup, JetKeeper.TopSyncPulse()) + if err != nil { + return nil, errors.Wrap(err, "failed create backuper") + } + + replicator = executor.NewHeavyReplicatorDefault(Records, indexes, CryptoScheme, Pulses, drops, JetKeeper, backupMaker, Jets) + + pm := pulsemanager.NewPulseManager() + pm.NodeNet = NodeNetwork + pm.NodeSetter = Nodes + pm.Nodes = Nodes + pm.PulseAppender = Pulses + pm.PulseAccessor = Pulses + pm.JetModifier = Jets + pm.StartPulse = sp + pm.FinalizationKeeper = executor.NewFinalizationKeeperDefault(JetKeeper, Pulses, cfg.Ledger.LightChainLimit) + + h := handler.New(cfg.Ledger) + h.RecordAccessor = Records + h.RecordModifier = Records + h.JetCoordinator = Coordinator + h.IndexAccessor = indexes + h.IndexModifier = indexes + h.DropModifier = drops + h.PCS = CryptoScheme + h.PulseAccessor = Pulses + h.PulseCalculator = Pulses + h.StartPulse = sp + h.JetModifier = Jets + h.JetAccessor = Jets + h.JetTree = Jets + h.DropDB = drops + h.JetKeeper = JetKeeper + h.BackupMaker = backupMaker + h.Sender = ClientBus + h.Replicator = replicator + + PulseManager = pm + Handler = h + + artifactManager := &artifact.Scope{ + PulseNumber: insolar.FirstPulseNumber, + PCS: CryptoScheme, + RecordAccessor: Records, + RecordModifier: Records, + IndexModifier: indexes, + IndexAccessor: indexes, + } + Genesis = &genesis.Genesis{ + ArtifactManager: artifactManager, + BaseRecord: &genesis.BaseRecord{ + DB: DB, + DropModifier: drops, + PulseAppender: Pulses, + PulseAccessor: Pulses, + RecordModifier: Records, + IndexModifier: indexes, + }, + + DiscoveryNodes: genesisCfg.DiscoveryNodes, + ContractsConfig: genesisCfg.ContractsConfig, + } + } + + // Start routers with handlers. + { + outHandler := func(msg *message.Message) error { + meta := payload.Meta{} + err := meta.Unmarshal(msg.Payload) + if err != nil { + panic(errors.Wrap(err, "failed to unmarshal meta")) + } + + pl, err := payload.Unmarshal(meta.Payload) + if err != nil { + panic(nil) + } + go func() { + var replies []payload.Payload + if receiveCallback != nil { + replies = receiveCallback(meta, pl) + } else { + replies = defaultReceiveCallback(meta, pl) + } + + for _, rep := range replies { + msg, err := payload.NewMessage(rep) + if err != nil { + panic(err) + } + ClientBus.Reply(context.Background(), meta, msg) + } + }() + + clientHandler := func(msg *message.Message) (messages []*message.Message, e error) { + return nil, nil + } + // Republish as incoming to client. + _, err = ClientBus.IncomingMessageRouter(clientHandler)(msg) + + if err != nil { + panic(err) + } + return nil + } + + inRouter, err := message.NewRouter(message.RouterConfig{}, logger) + if err != nil { + panic(err) + } + outRouter, err := message.NewRouter(message.RouterConfig{}, logger) + if err != nil { + panic(err) + } + + outRouter.AddNoPublisherHandler( + "Outgoing", + bus.TopicOutgoing, + ServerPubSub, + outHandler, + ) + + inRouter.AddMiddleware( + middleware.InstantAck, + ServerBus.IncomingMessageRouter, + ) + inRouter.AddNoPublisherHandler( + "OutgoingFromClient", + bus.TopicOutgoing, + ClientPubSub, + Handler.Process, + ) + + startRouter(ctx, inRouter) + startRouter(ctx, outRouter) + } + + inslogger.FromContext(ctx).WithFields(map[string]interface{}{ + "light": light.ID().String(), + "virtual": virtual.ID().String(), + "heavy": heavy.ID().String(), + }).Info("started test server") + + if err := Genesis.Start(ctx); err != nil { + log.Fatalf("genesis failed on heavy with error: %v", err) + } + + err := DBRollback.Start(ctx) + if err != nil { + log.Fatalf("rollback.Start return error: %v", err) + } + + s := &Server{ + pm: PulseManager, + pulse: *insolar.GenesisPulse, + clientSender: ClientBus, + JetKeeper: JetKeeper, + replicator: replicator, + dbRollback: DBRollback, + } + return s, nil +} + +func startRouter(ctx context.Context, router *message.Router) { + go func() { + if err := router.Run(ctx); err != nil { + inslogger.FromContext(ctx).Error("Error while running router", err) + } + }() + <-router.Running() +} + +func (s *Server) SetPulse(ctx context.Context) { + s.lock.Lock() + defer s.lock.Unlock() + + s.pulse = insolar.Pulse{ + PulseNumber: s.pulse.PulseNumber + PulseStep, + } + err := s.pm.Set(ctx, s.pulse) + if err != nil { + panic(err) + } +} + +func (s *Server) Pulse() insolar.PulseNumber { + s.lock.Lock() + defer s.lock.Unlock() + + return s.pulse.PulseNumber +} + +func (s *Server) Send(ctx context.Context, pl payload.Payload) (<-chan *message.Message, func()) { + msg, err := payload.NewMessage(pl) + if err != nil { + panic(err) + } + return s.clientSender.SendTarget(ctx, msg, gen.Reference()) +} + +func (s *Server) Stop() { +} + +type nodeMock struct { + ref insolar.Reference + shortID insolar.ShortNodeID + role insolar.StaticRole +} + +func (n *nodeMock) ID() insolar.Reference { + return n.ref +} + +func (n *nodeMock) ShortID() insolar.ShortNodeID { + return n.shortID +} + +func (n *nodeMock) Role() insolar.StaticRole { + return n.role +} + +func (n *nodeMock) PublicKey() crypto.PublicKey { + panic("implement me") +} + +func (n *nodeMock) Address() string { + return "" +} + +func (n *nodeMock) GetGlobuleID() insolar.GlobuleID { + panic("implement me") +} + +func (n *nodeMock) Version() string { + panic("implement me") +} + +func (n *nodeMock) LeavingETA() insolar.PulseNumber { + panic("implement me") +} + +func (n *nodeMock) GetState() insolar.NodeState { + return insolar.NodeReady +} + +func (n *nodeMock) GetPower() insolar.Power { + return 1 +} + +type nodeNetMock struct { + me insolar.NetworkNode +} + +func (n *nodeNetMock) GetAccessor(insolar.PulseNumber) network.Accessor { + return networknode.NewAccessor(networknode.NewSnapshot(insolar.GenesisPulse.PulseNumber, []insolar.NetworkNode{&virtual, &heavy, &light})) +} + +func newNodeNetMock(me insolar.NetworkNode) *nodeNetMock { + return &nodeNetMock{me: me} +} + +func (n *nodeNetMock) GetOrigin() insolar.NetworkNode { + return n.me +} diff --git a/ledger/heavy/integration/startstop_test.go b/ledger/heavy/integration/startstop_test.go new file mode 100644 index 0000000000..622ca2a769 --- /dev/null +++ b/ledger/heavy/integration/startstop_test.go @@ -0,0 +1,36 @@ +// +// Copyright 2019 Insolar Technologies GmbH +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// +build slowtest + +package integration_test + +import ( + "context" + "os" + "testing" + + "github.com/insolar/insolar/insolar" + "github.com/stretchr/testify/assert" +) + +func TestStartStop(t *testing.T) { + cfg := DefaultHeavyConfig() + defer os.RemoveAll(cfg.Ledger.Storage.DataDirectory) + + s, err := NewServer(context.Background(), cfg, insolar.GenesisHeavyConfig{}, nil) + assert.NoError(t, err) + s.Stop() +} diff --git a/ledger/heavy/integration/testdata/bootstrap_keys.json b/ledger/heavy/integration/testdata/bootstrap_keys.json new file mode 100644 index 0000000000..559fdc69f0 --- /dev/null +++ b/ledger/heavy/integration/testdata/bootstrap_keys.json @@ -0,0 +1,4 @@ +{ + "private_key": "-----BEGIN PRIVATE KEY-----\nMHcCAQEEIDR3m4Lvbul4pYm0h4c6O7+3LAncILXNvTZxm6/yVHVFoAoGCCqGSM49\nAwEHoUQDQgAE/kltJAtODHfdxAtn/H/TPbdPsx+G6DhdDIFrridWSphE8ejBZXGP\nwR6Rou57n4FdHVavUgm4jBrBu9fdOMXLjA==\n-----END PRIVATE KEY-----\n", + "public_key": "-----BEGIN PUBLIC KEY-----\nMFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAE/kltJAtODHfdxAtn/H/TPbdPsx+G\n6DhdDIFrridWSphE8ejBZXGPwR6Rou57n4FdHVavUgm4jBrBu9fdOMXLjA==\n-----END PUBLIC KEY-----\n" +} \ No newline at end of file diff --git a/ledger/heavy/integration/testdata/certificate.json b/ledger/heavy/integration/testdata/certificate.json new file mode 100644 index 0000000000..6a356d5791 --- /dev/null +++ b/ledger/heavy/integration/testdata/certificate.json @@ -0,0 +1,7 @@ +{ + "majority_rule": 0, + "public_key": "-----BEGIN PUBLIC KEY-----\nMFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAE/kltJAtODHfdxAtn/H/TPbdPsx+G\n6DhdDIFrridWSphE8ejBZXGPwR6Rou57n4FdHVavUgm4jBrBu9fdOMXLjA==\n-----END PUBLIC KEY-----\n", + "reference": "4K3NiGuqYGqKPnYp6XeGd2kdN4P9veL6rYcWkLKWXZCu.4FFB8zfQoGznSmzDxwv4njX1aR9ioL8GHSH17QXH2AFa", + "roles": [], + "bootstrap_nodes": [] +} diff --git a/ledger/heavy/integration/testdata/root_member_keys.json b/ledger/heavy/integration/testdata/root_member_keys.json new file mode 100644 index 0000000000..ae46baf36d --- /dev/null +++ b/ledger/heavy/integration/testdata/root_member_keys.json @@ -0,0 +1,4 @@ +{ + "private_key": "-----BEGIN PRIVATE KEY-----\nMHcCAQEEIPHZvtODiQoj5X6mB0htUe1p1X2/Yi2PeBs6K/ZPgCKFoAoGCCqGSM49\nAwEHoUQDQgAEsp2gK/5k+K7IZyhxLCabU++3KM2TRSuBO8Kns4WMrw4ze6huJSPs\nOCdCGSPLMROPC6NNdDRW95l8QKlbYd8E3Q==\n-----END PRIVATE KEY-----\n", + "public_key": "-----BEGIN PUBLIC KEY-----\nMFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAEsp2gK/5k+K7IZyhxLCabU++3KM2T\nRSuBO8Kns4WMrw4ze6huJSPsOCdCGSPLMROPC6NNdDRW95l8QKlbYd8E3Q==\n-----END PUBLIC KEY-----\n" +} \ No newline at end of file diff --git a/ledger/heavy/pulsemanager/pulsemanager.go b/ledger/heavy/pulsemanager/pulsemanager.go index f55fcc124e..239c818e8a 100644 --- a/ledger/heavy/pulsemanager/pulsemanager.go +++ b/ledger/heavy/pulsemanager/pulsemanager.go @@ -93,6 +93,7 @@ func (m *PulseManager) Set(ctx context.Context, newPulse insolar.Pulse) error { if err != nil { panic(errors.Wrap(err, "call of SetActiveNodes failed")) } + } logger.Info("save pulse to storage") diff --git a/server/internal/heavy/components.go b/server/internal/heavy/components.go index 95a1c00b2e..553f6c935a 100644 --- a/server/internal/heavy/components.go +++ b/server/internal/heavy/components.go @@ -295,7 +295,6 @@ func newComponents(ctx context.Context, cfg configuration.Configuration, genesis h.JetCoordinator = Coordinator h.IndexAccessor = indexes h.IndexModifier = indexes - h.Bus = Bus h.DropModifier = drops h.PCS = CryptoScheme h.PulseAccessor = Pulses