From 90409d969be6129311d841b497517302ce7e2f10 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Tue, 8 Feb 2022 09:14:02 +0200 Subject: [PATCH 1/3] chore(deps): bump simple-get in /applications/tari_collectibles (#3805) Bumps [simple-get](https://github.com/feross/simple-get) from 3.1.0 to 3.1.1. - [Release notes](https://github.com/feross/simple-get/releases) - [Commits](https://github.com/feross/simple-get/compare/v3.1.0...v3.1.1) --- updated-dependencies: - dependency-name: simple-get dependency-type: indirect ... Signed-off-by: dependabot[bot] --- applications/tari_collectibles/package-lock.json | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/applications/tari_collectibles/package-lock.json b/applications/tari_collectibles/package-lock.json index 7b1b30e135..63e8269099 100644 --- a/applications/tari_collectibles/package-lock.json +++ b/applications/tari_collectibles/package-lock.json @@ -3529,9 +3529,9 @@ "dev": true }, "simple-get": { - "version": "3.1.0", - "resolved": "https://registry.npmjs.org/simple-get/-/simple-get-3.1.0.tgz", - "integrity": "sha512-bCR6cP+aTdScaQCnQKbPKtJOKDp/hj9EDLJo3Nw4y1QksqaovlW/bnptB6/c1e+qmNIDHRK+oXFDdEqBT8WzUA==", + "version": "3.1.1", + "resolved": "https://registry.npmjs.org/simple-get/-/simple-get-3.1.1.tgz", + "integrity": "sha512-CQ5LTKGfCpvE1K0n2us+kuMPbk/q0EKl82s4aheV9oXjFEz6W/Y7oQFVJuU6QG77hRT4Ghb5RURteF5vnWjupA==", "dev": true, "requires": { "decompress-response": "^4.2.0", From a171513a26e1cce17fe31793a6712ac5a1ea9867 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Tue, 8 Feb 2022 09:14:28 +0200 Subject: [PATCH 2/3] chore(deps): bump simple-get in /applications/launchpad/gui-vue (#3806) Bumps [simple-get](https://github.com/feross/simple-get) from 3.1.0 to 3.1.1. - [Release notes](https://github.com/feross/simple-get/releases) - [Commits](https://github.com/feross/simple-get/compare/v3.1.0...v3.1.1) --- updated-dependencies: - dependency-name: simple-get dependency-type: indirect ... Signed-off-by: dependabot[bot] --- .../launchpad/gui-vue/package-lock.json | 34 ++++++++++++++----- 1 file changed, 25 insertions(+), 9 deletions(-) diff --git a/applications/launchpad/gui-vue/package-lock.json b/applications/launchpad/gui-vue/package-lock.json index e92c255da3..db72d6e596 100644 --- a/applications/launchpad/gui-vue/package-lock.json +++ b/applications/launchpad/gui-vue/package-lock.json @@ -2981,6 +2981,7 @@ "resolved": "https://registry.npmjs.org/ansi-styles/-/ansi-styles-4.3.0.tgz", "integrity": "sha512-zbB9rCJAT1rbjiVDb2hqKFHNYLxgtk8NURxZ3IZwD3F6NtxbXZQCnnSi1Lkx+IDohdPlFp222wVALIheZJQSEg==", "dev": true, + "optional": true, "dependencies": { "color-convert": "^2.0.1" }, @@ -2996,6 +2997,7 @@ "resolved": "https://registry.npmjs.org/chalk/-/chalk-4.1.2.tgz", "integrity": "sha512-oKnbhFyRIXpUuez8iBMmyEa4nbj4IOQyuhc/wy9kY7/WVPcwIO9VA668Pu8RkO7+0G76SLROeyw9CpQ061i4mA==", "dev": true, + "optional": true, "dependencies": { "ansi-styles": "^4.1.0", "supports-color": "^7.1.0" @@ -3012,6 +3014,7 @@ "resolved": "https://registry.npmjs.org/color-convert/-/color-convert-2.0.1.tgz", "integrity": "sha512-RRECPsj7iu/xb5oKYcsFHSppFNnsj/52OVTRKb4zP5onXwVF3zVmmToNcOfGC+CRDpfK/U584fMg38ZHCaElKQ==", "dev": true, + "optional": true, "dependencies": { "color-name": "~1.1.4" }, @@ -3023,13 +3026,15 @@ "version": "1.1.4", "resolved": "https://registry.npmjs.org/color-name/-/color-name-1.1.4.tgz", "integrity": "sha512-dOy+3AuW3a2wNbZHIuMZpTcgjGuLU/uBL/ubcZF9OXbDo8ff4O8yVp5Bf0efS8uEoYo5q4Fx7dY9OgQGXgAsQA==", - "dev": true + "dev": true, + "optional": true }, "node_modules/@vue/cli-service/node_modules/has-flag": { "version": "4.0.0", "resolved": "https://registry.npmjs.org/has-flag/-/has-flag-4.0.0.tgz", "integrity": "sha512-EykJT/Q1KjTWctppgIAgfSO0tKVuZUjhgMr17kqTumMl6Afv3EISleU7qZUzoXDFTAHTDC4NOoG/ZxU3EvlMPQ==", "dev": true, + "optional": true, "engines": { "node": ">=8" } @@ -3039,6 +3044,7 @@ "resolved": "https://registry.npmjs.org/loader-utils/-/loader-utils-2.0.2.tgz", "integrity": "sha512-TM57VeHptv569d/GKh6TAYdzKblwDNiumOdkFnejjD0XwTH87K90w3O7AiJRqdQoXygvi1VQTJTLGhJl7WqA7A==", "dev": true, + "optional": true, "dependencies": { "big.js": "^5.2.2", "emojis-list": "^3.0.0", @@ -3053,6 +3059,7 @@ "resolved": "https://registry.npmjs.org/supports-color/-/supports-color-7.2.0.tgz", "integrity": "sha512-qpCAvRl9stuOHveKsn7HncJRvv501qIacKzQlO/+Lwxc9+0q2wLyv4Dfvt80/DPn2pqOBsJdDiogXGR9+OvwRw==", "dev": true, + "optional": true, "dependencies": { "has-flag": "^4.0.0" }, @@ -3066,6 +3073,7 @@ "resolved": "https://registry.npmjs.org/vue-loader/-/vue-loader-16.8.3.tgz", "integrity": "sha512-7vKN45IxsKxe5GcVCbc2qFU5aWzyiLrYJyUuMz4BQLKctCj/fmCa0w6fGiiQ2cLFetNcek1ppGJQDCup0c1hpA==", "dev": true, + "optional": true, "dependencies": { "chalk": "^4.1.0", "hash-sum": "^2.0.0", @@ -16229,9 +16237,9 @@ ] }, "node_modules/simple-get": { - "version": "3.1.0", - "resolved": "https://registry.npmjs.org/simple-get/-/simple-get-3.1.0.tgz", - "integrity": "sha512-bCR6cP+aTdScaQCnQKbPKtJOKDp/hj9EDLJo3Nw4y1QksqaovlW/bnptB6/c1e+qmNIDHRK+oXFDdEqBT8WzUA==", + "version": "3.1.1", + "resolved": "https://registry.npmjs.org/simple-get/-/simple-get-3.1.1.tgz", + "integrity": "sha512-CQ5LTKGfCpvE1K0n2us+kuMPbk/q0EKl82s4aheV9oXjFEz6W/Y7oQFVJuU6QG77hRT4Ghb5RURteF5vnWjupA==", "dev": true, "dependencies": { "decompress-response": "^4.2.0", @@ -22138,6 +22146,7 @@ "resolved": "https://registry.npmjs.org/ansi-styles/-/ansi-styles-4.3.0.tgz", "integrity": "sha512-zbB9rCJAT1rbjiVDb2hqKFHNYLxgtk8NURxZ3IZwD3F6NtxbXZQCnnSi1Lkx+IDohdPlFp222wVALIheZJQSEg==", "dev": true, + "optional": true, "requires": { "color-convert": "^2.0.1" } @@ -22147,6 +22156,7 @@ "resolved": "https://registry.npmjs.org/chalk/-/chalk-4.1.2.tgz", "integrity": "sha512-oKnbhFyRIXpUuez8iBMmyEa4nbj4IOQyuhc/wy9kY7/WVPcwIO9VA668Pu8RkO7+0G76SLROeyw9CpQ061i4mA==", "dev": true, + "optional": true, "requires": { "ansi-styles": "^4.1.0", "supports-color": "^7.1.0" @@ -22157,6 +22167,7 @@ "resolved": "https://registry.npmjs.org/color-convert/-/color-convert-2.0.1.tgz", "integrity": "sha512-RRECPsj7iu/xb5oKYcsFHSppFNnsj/52OVTRKb4zP5onXwVF3zVmmToNcOfGC+CRDpfK/U584fMg38ZHCaElKQ==", "dev": true, + "optional": true, "requires": { "color-name": "~1.1.4" } @@ -22165,19 +22176,22 @@ "version": "1.1.4", "resolved": "https://registry.npmjs.org/color-name/-/color-name-1.1.4.tgz", "integrity": "sha512-dOy+3AuW3a2wNbZHIuMZpTcgjGuLU/uBL/ubcZF9OXbDo8ff4O8yVp5Bf0efS8uEoYo5q4Fx7dY9OgQGXgAsQA==", - "dev": true + "dev": true, + "optional": true }, "has-flag": { "version": "4.0.0", "resolved": "https://registry.npmjs.org/has-flag/-/has-flag-4.0.0.tgz", "integrity": "sha512-EykJT/Q1KjTWctppgIAgfSO0tKVuZUjhgMr17kqTumMl6Afv3EISleU7qZUzoXDFTAHTDC4NOoG/ZxU3EvlMPQ==", - "dev": true + "dev": true, + "optional": true }, "loader-utils": { "version": "2.0.2", "resolved": "https://registry.npmjs.org/loader-utils/-/loader-utils-2.0.2.tgz", "integrity": "sha512-TM57VeHptv569d/GKh6TAYdzKblwDNiumOdkFnejjD0XwTH87K90w3O7AiJRqdQoXygvi1VQTJTLGhJl7WqA7A==", "dev": true, + "optional": true, "requires": { "big.js": "^5.2.2", "emojis-list": "^3.0.0", @@ -22189,6 +22203,7 @@ "resolved": "https://registry.npmjs.org/supports-color/-/supports-color-7.2.0.tgz", "integrity": "sha512-qpCAvRl9stuOHveKsn7HncJRvv501qIacKzQlO/+Lwxc9+0q2wLyv4Dfvt80/DPn2pqOBsJdDiogXGR9+OvwRw==", "dev": true, + "optional": true, "requires": { "has-flag": "^4.0.0" } @@ -22198,6 +22213,7 @@ "resolved": "https://registry.npmjs.org/vue-loader/-/vue-loader-16.8.3.tgz", "integrity": "sha512-7vKN45IxsKxe5GcVCbc2qFU5aWzyiLrYJyUuMz4BQLKctCj/fmCa0w6fGiiQ2cLFetNcek1ppGJQDCup0c1hpA==", "dev": true, + "optional": true, "requires": { "chalk": "^4.1.0", "hash-sum": "^2.0.0", @@ -32751,9 +32767,9 @@ "dev": true }, "simple-get": { - "version": "3.1.0", - "resolved": "https://registry.npmjs.org/simple-get/-/simple-get-3.1.0.tgz", - "integrity": "sha512-bCR6cP+aTdScaQCnQKbPKtJOKDp/hj9EDLJo3Nw4y1QksqaovlW/bnptB6/c1e+qmNIDHRK+oXFDdEqBT8WzUA==", + "version": "3.1.1", + "resolved": "https://registry.npmjs.org/simple-get/-/simple-get-3.1.1.tgz", + "integrity": "sha512-CQ5LTKGfCpvE1K0n2us+kuMPbk/q0EKl82s4aheV9oXjFEz6W/Y7oQFVJuU6QG77hRT4Ghb5RURteF5vnWjupA==", "dev": true, "requires": { "decompress-response": "^4.2.0", From 74df1d0705d7acad452564e71d6fea79fc7a8daa Mon Sep 17 00:00:00 2001 From: Stan Bondi Date: Tue, 8 Feb 2022 11:50:24 +0200 Subject: [PATCH 3/3] feat(validator_node): add get_sidechain_block p2p rpc method (#3803) Description --- - Add get_sidechain_blocks to validator node p2p rpc - Use fixed 32-byte array for hashes on dan layer - Implement FromStr and fallible integer conversions for TemplateId - Remove asset_public_key from Instruction - Add necessary chain db calls - Add client call for get_sidechain_blocks - Adds mocks and basic tests for get_sidechain_blocks validator node rpc Motivation and Context --- Allow client to fetch sidechain instruction state from vn How Has This Been Tested? --- Manually --- .gitignore | 5 +- Cargo.lock | 4 +- .../tari_app_grpc/proto/validator_node.proto | 2 +- applications/tari_validator_node/Cargo.toml | 15 +- applications/tari_validator_node/build.rs | 3 +- .../proto/dan/common.proto | 27 +++ .../consensus.proto} | 18 +- .../proto/{p2p => dan}/validator_node.proto | 17 +- .../tari_validator_node/src/dan_node.rs | 25 ++- .../src/grpc/conversions.rs | 2 +- .../tari_validator_node/src/grpc/mod.rs | 2 +- .../src/grpc/validator_node_grpc_server.rs | 26 ++- .../{conversions/dan.rs => conversions.rs} | 143 +++++++++----- .../tari_validator_node/src/p2p/proto/mod.rs | 43 ++-- .../tari_validator_node/src/p2p/rpc/mod.rs | 13 +- .../src/p2p/rpc/service_impl.rs | 144 +++++++++++--- .../tari_validator_node/src/p2p/rpc/test.rs | 132 +++++++++++++ .../services/inbound_connection_service.rs | 2 +- .../services/outbound_connection_service.rs | 2 +- .../src/p2p/services/rpc_client.rs | 64 +++++- .../tests/listener_dialer.rs | 2 +- comms/src/connection_manager/tests/manager.rs | 3 +- comms/src/connectivity/test.rs | 2 +- comms/src/protocol/rpc/context.rs | 4 + comms/src/protocol/rpc/test/mock.rs | 2 +- comms/src/test_utils/mod.rs | 7 +- comms/src/test_utils/peer_manager.rs | 63 ++++++ comms/src/test_utils/test_node.rs | 21 +- dan_layer/core/Cargo.toml | 1 + dan_layer/core/src/digital_assets_error.rs | 11 +- dan_layer/core/src/fixed_hash.rs | 101 ++++++++++ dan_layer/core/src/lib.rs | 1 + .../core/src/models/error.rs | 10 +- .../core/src/models/hot_stuff_tree_node.rs | 14 +- dan_layer/core/src/models/instruction.rs | 43 ++-- dan_layer/core/src/models/instruction_set.rs | 24 ++- dan_layer/core/src/models/mod.rs | 84 +++++--- dan_layer/core/src/models/node.rs | 64 ++++++ .../core/src/models/quorum_certificate.rs | 16 +- .../models/{block.rs => sidechain_block.rs} | 26 ++- dan_layer/core/src/models/tree_node_hash.rs | 76 ++++++++ .../core/src/services/asset_processor.rs | 4 +- dan_layer/core/src/services/asset_proxy.rs | 98 +++++----- .../core/src/services/mempool_service.rs | 64 +++--- dan_layer/core/src/services/mocks/mod.rs | 43 +++- dan_layer/core/src/services/mod.rs | 1 - .../core/src/services/payload_provider.rs | 6 +- .../src/services/validator_node_rpc_client.rs | 13 +- dan_layer/core/src/storage/chain/chain_db.rs | 62 +++++- .../storage/chain/chain_db_backend_adapter.rs | 5 +- .../storage/chain/chain_db_unit_of_work.rs | 19 +- .../core/src/storage/chain/db_instruction.rs | 2 +- dan_layer/core/src/storage/chain/db_node.rs | 2 +- dan_layer/core/src/storage/chain/db_qc.rs | 2 +- dan_layer/core/src/storage/db_factory.rs | 8 + dan_layer/core/src/storage/error.rs | 2 + dan_layer/core/src/storage/mocks/chain_db.rs | 183 ++++++++++++++++++ dan_layer/core/src/storage/mocks/mod.rs | 167 ++++++++++++++++ dan_layer/core/src/storage/mocks/state_db.rs | 86 ++++++++ dan_layer/core/src/storage/mod.rs | 2 + .../core/src/workers/states/commit_state.rs | 4 +- .../core/src/workers/states/decide_state.rs | 4 +- .../src/workers/states/pre_commit_state.rs | 6 +- dan_layer/core/src/workers/states/prepare.rs | 4 +- dan_layer/core/src/workers/states/starting.rs | 14 +- dan_layer/storage_sqlite/src/error.rs | 15 +- .../storage_sqlite/src/models/instruction.rs | 43 ++-- dan_layer/storage_sqlite/src/models/node.rs | 2 +- .../src/sqlite_chain_backend_adapter.rs | 124 +++++++++--- .../storage_sqlite/src/sqlite_db_factory.rs | 49 +++-- .../src/sqlite_storage_service.rs | 4 +- infrastructure/test_utils/src/paths.rs | 2 + 72 files changed, 1877 insertions(+), 427 deletions(-) create mode 100644 applications/tari_validator_node/proto/dan/common.proto rename applications/tari_validator_node/proto/{p2p/dan_consensus.proto => dan/consensus.proto} (75%) rename applications/tari_validator_node/proto/{p2p => dan}/validator_node.proto (90%) rename applications/tari_validator_node/src/p2p/proto/{conversions/dan.rs => conversions.rs} (57%) create mode 100644 applications/tari_validator_node/src/p2p/rpc/test.rs create mode 100644 comms/src/test_utils/peer_manager.rs create mode 100644 dan_layer/core/src/fixed_hash.rs rename applications/tari_validator_node/src/p2p/proto/conversions/mod.rs => dan_layer/core/src/models/error.rs (84%) create mode 100644 dan_layer/core/src/models/node.rs rename dan_layer/core/src/models/{block.rs => sidechain_block.rs} (75%) create mode 100644 dan_layer/core/src/models/tree_node_hash.rs create mode 100644 dan_layer/core/src/storage/mocks/chain_db.rs create mode 100644 dan_layer/core/src/storage/mocks/mod.rs create mode 100644 dan_layer/core/src/storage/mocks/state_db.rs diff --git a/.gitignore b/.gitignore index f3128c3fcc..f0a7c73bd7 100644 --- a/.gitignore +++ b/.gitignore @@ -44,4 +44,7 @@ node_modules # ignore output files from windows ISS buildtools/Output/ -/applications/tari_collectibles/src-tauri/data \ No newline at end of file +/applications/tari_collectibles/src-tauri/data + +# Asset files +assets/ diff --git a/Cargo.lock b/Cargo.lock index b98ebe082e..bb89122659 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6821,6 +6821,7 @@ dependencies = [ "tari_shutdown", "tari_storage", "tari_test_utils 0.8.1", + "tari_utilities", "thiserror", "tokio 1.15.0", "tokio-stream", @@ -7209,6 +7210,7 @@ dependencies = [ "tari_comms_rpc_macros", "tari_core", "tari_crypto", + "tari_dan_common_types", "tari_dan_core", "tari_dan_storage_sqlite", "tari_mmr", @@ -7216,7 +7218,7 @@ dependencies = [ "tari_service_framework", "tari_shutdown", "tari_storage", - "tari_test_utils 0.8.1", + "tari_test_utils 0.27.3", "thiserror", "tokio 1.15.0", "tokio-stream", diff --git a/applications/tari_app_grpc/proto/validator_node.proto b/applications/tari_app_grpc/proto/validator_node.proto index 807a5e80e2..3d930643b7 100644 --- a/applications/tari_app_grpc/proto/validator_node.proto +++ b/applications/tari_app_grpc/proto/validator_node.proto @@ -99,4 +99,4 @@ message InvokeMethodRequest{ message InvokeMethodResponse { string status = 1; bytes result = 2; -} \ No newline at end of file +} diff --git a/applications/tari_validator_node/Cargo.toml b/applications/tari_validator_node/Cargo.toml index 5911551612..d64d87a09d 100644 --- a/applications/tari_validator_node/Cargo.toml +++ b/applications/tari_validator_node/Cargo.toml @@ -9,7 +9,7 @@ edition = "2018" [dependencies] tari_app_utilities = { path = "../tari_app_utilities" } -tari_app_grpc = {path = "../tari_app_grpc" } +tari_app_grpc = { path = "../tari_app_grpc" } tari_common = { path = "../../common" } tari_comms = { path = "../../comms" } tari_comms_dht = { path = "../../comms/dht" } @@ -20,10 +20,11 @@ tari_p2p = { path = "../../base_layer/p2p" } tari_service_framework = { path = "../../base_layer/service_framework" } tari_shutdown = { path = "../../infrastructure/shutdown" } tari_storage = { path = "../../infrastructure/storage" } -tari_core = {path = "../../base_layer/core"} -tari_dan_core = {path = "../../dan_layer/core"} -tari_dan_storage_sqlite = {path = "../../dan_layer/storage_sqlite"} -tari_common_types = {path = "../../base_layer/common_types"} +tari_core = { path = "../../base_layer/core" } +tari_dan_core = { path = "../../dan_layer/core" } +tari_dan_storage_sqlite = { path = "../../dan_layer/storage_sqlite" } +tari_dan_common_types = { path = "../../dan_layer/common_types" } +tari_common_types = { path = "../../base_layer/common_types" } anyhow = "1.0.53" async-trait = "0.1.50" @@ -37,7 +38,7 @@ prost = "0.9" prost-types = "0.9" serde = "1.0.126" thiserror = "^1.0.20" -tokio = { version="1.10", features = ["macros", "time", "sync"]} +tokio = { version = "1.10", features = ["macros", "time", "sync"] } tokio-stream = { version = "0.1.7", features = ["sync"] } tonic = "0.6.2" @@ -47,7 +48,7 @@ bytecodec = { version = "0.4.14", features = ["bincode_codec"] } serde_json = "1.0.64" [dev-dependencies] -tari_test_utils = "0.8.1" +tari_test_utils = { path = "../../infrastructure/test_utils" } [build-dependencies] tari_common = { path = "../../common", features = ["build"] } diff --git a/applications/tari_validator_node/build.rs b/applications/tari_validator_node/build.rs index c6ba71ff75..dece9cfad9 100644 --- a/applications/tari_validator_node/build.rs +++ b/applications/tari_validator_node/build.rs @@ -22,7 +22,8 @@ fn main() -> Result<(), Box> { tari_common::build::ProtobufCompiler::new() - .proto_paths(&["proto/p2p"]) + .proto_paths(&["proto/dan"]) + .include_paths(&["proto/dan"]) .emit_rerun_if_changed_directives() .compile() .unwrap(); diff --git a/applications/tari_validator_node/proto/dan/common.proto b/applications/tari_validator_node/proto/dan/common.proto new file mode 100644 index 0000000000..69b727da5a --- /dev/null +++ b/applications/tari_validator_node/proto/dan/common.proto @@ -0,0 +1,27 @@ +syntax = "proto3"; + +package tari.dan.common; + +message SideChainBlock { + Node node = 1; + InstructionSet instructions = 2; +} + +message Node { + bytes hash = 1; + bytes parent = 2; + uint32 height = 3; + bool is_committed = 4; +} + +message Instruction { + uint32 template_id = 1; + string method = 2; + bytes args = 3; + // bytes token_id = 5; + // bytes signature = 6; +} + +message InstructionSet{ + repeated Instruction instructions = 1; +} diff --git a/applications/tari_validator_node/proto/p2p/dan_consensus.proto b/applications/tari_validator_node/proto/dan/consensus.proto similarity index 75% rename from applications/tari_validator_node/proto/p2p/dan_consensus.proto rename to applications/tari_validator_node/proto/dan/consensus.proto index dadba4f39e..4f7fcac31f 100644 --- a/applications/tari_validator_node/proto/p2p/dan_consensus.proto +++ b/applications/tari_validator_node/proto/dan/consensus.proto @@ -1,6 +1,8 @@ syntax = "proto3"; -package tari.p2p.dan; +package tari.dan.consensus; + +import "common.proto"; enum HotStuffMessageType { HOT_STUFF_MESSAGE_TYPE_UNKNOWN = 0; @@ -41,22 +43,10 @@ message Signature{ } message TariDanPayload { - InstructionSet instruction_set = 1; + tari.dan.common.InstructionSet instruction_set = 1; CheckpointData checkpoint = 2; } message CheckpointData { // todo: fill this in } - -message InstructionSet{ - repeated Instruction instructions = 1; -} -message Instruction { - bytes asset_public_key = 1; - uint32 template_id = 2; - string method = 3; - bytes args = 4; -// bytes token_id = 5; -// bytes signature = 6; -} diff --git a/applications/tari_validator_node/proto/p2p/validator_node.proto b/applications/tari_validator_node/proto/dan/validator_node.proto similarity index 90% rename from applications/tari_validator_node/proto/p2p/validator_node.proto rename to applications/tari_validator_node/proto/dan/validator_node.proto index 8def313a24..f43a5982cc 100644 --- a/applications/tari_validator_node/proto/p2p/validator_node.proto +++ b/applications/tari_validator_node/proto/dan/validator_node.proto @@ -21,7 +21,9 @@ // USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. syntax = "proto3"; -package tari.p2p.validator_node; +package tari.dan.validator_node; + +import "common.proto"; service ValidatorNode { rpc GetTokenData(GetTokenDataRequest) returns (GetTokenDataResponse); @@ -64,4 +66,15 @@ message InvokeMethodRequest{ message InvokeMethodResponse { bytes result = 1; Status status = 2; -} \ No newline at end of file +} + +message GetSidechainBlocksRequest { + bytes asset_public_key = 1; + bytes start_hash = 2; + bytes end_hash = 3; +} + + +message GetSidechainBlocksResponse { + tari.dan.common.SideChainBlock block = 1; +} diff --git a/applications/tari_validator_node/src/dan_node.rs b/applications/tari_validator_node/src/dan_node.rs index 1ab749cefc..563e02465f 100644 --- a/applications/tari_validator_node/src/dan_node.rs +++ b/applications/tari_validator_node/src/dan_node.rs @@ -115,7 +115,7 @@ impl DanNode { } } info!(target: LOG_TARGET, "Adding asset {:?}", asset.public_key); - let node_identitiy = node_identity.as_ref().clone(); + let node_identity = node_identity.as_ref().clone(); let mempool = mempool_service.clone(); let handles = handles.clone(); let subscription_factory = subscription_factory.clone(); @@ -124,19 +124,16 @@ impl DanNode { let db_factory = db_factory.clone(); tasks.insert( asset.public_key.clone(), - task::spawn(async move { - DanNode::start_asset_worker( - asset.clone(), - node_identitiy, - mempool, - handles, - subscription_factory, - shutdown, - dan_config, - db_factory, - ) - .await - }), + task::spawn(DanNode::start_asset_worker( + asset.clone(), + node_identity, + mempool, + handles, + subscription_factory, + shutdown, + dan_config, + db_factory, + )), ); } } diff --git a/applications/tari_validator_node/src/grpc/conversions.rs b/applications/tari_validator_node/src/grpc/conversions.rs index f8b08201f6..1f24e37b96 100644 --- a/applications/tari_validator_node/src/grpc/conversions.rs +++ b/applications/tari_validator_node/src/grpc/conversions.rs @@ -30,7 +30,7 @@ impl From for St { Self(tari_rpc::SidechainMetadata { asset_public_key: source.asset_public_key().as_bytes().to_vec(), committed_height: source.committed_height().into(), - committed_hash: source.committed_hash().clone().into(), + committed_hash: source.committed_hash().as_bytes().to_vec(), }) } } diff --git a/applications/tari_validator_node/src/grpc/mod.rs b/applications/tari_validator_node/src/grpc/mod.rs index 125971e942..10ea203ea1 100644 --- a/applications/tari_validator_node/src/grpc/mod.rs +++ b/applications/tari_validator_node/src/grpc/mod.rs @@ -19,6 +19,6 @@ // SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, // WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE // USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -pub(crate) mod conversions; +mod conversions; pub mod services; pub(crate) mod validator_node_grpc_server; diff --git a/applications/tari_validator_node/src/grpc/validator_node_grpc_server.rs b/applications/tari_validator_node/src/grpc/validator_node_grpc_server.rs index c052d4c66e..0bd54a1a24 100644 --- a/applications/tari_validator_node/src/grpc/validator_node_grpc_server.rs +++ b/applications/tari_validator_node/src/grpc/validator_node_grpc_server.rs @@ -19,12 +19,13 @@ // SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, // WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE // USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +use std::convert::TryInto; + use tari_app_grpc::tari_rpc as rpc; use tari_common_types::types::PublicKey; use tari_comms::NodeIdentity; use tari_crypto::tari_utilities::ByteArray; use tari_dan_core::{ - models::TemplateId, services::{AssetProcessor, AssetProxy, ServiceSpecification}, storage::DbFactory, }; @@ -90,7 +91,10 @@ impl rpc::validator_node_ .asset_proxy .invoke_method( &asset_public_key, - request.template_id.into(), + request + .template_id + .try_into() + .map_err(|_| Status::invalid_argument("invalid template_id"))?, request.method.clone(), request.args.clone(), ) @@ -130,6 +134,10 @@ impl rpc::validator_node_ let request = request.into_inner(); let asset_public_key = PublicKey::from_bytes(&request.asset_public_key) .map_err(|err| Status::invalid_argument(format!("Asset public key was not a valid public key:{}", err)))?; + let template_id = request + .template_id + .try_into() + .map_err(|_| Status::invalid_argument("Invalid template_id"))?; if let Some(state) = self .db_factory .get_state_db(&asset_public_key) @@ -138,12 +146,7 @@ impl rpc::validator_node_ let mut unit_of_work = state.new_unit_of_work(); let response_bytes = self .asset_processor - .invoke_read_method( - TemplateId::from(request.template_id), - request.method, - &request.args, - &mut unit_of_work, - ) + .invoke_read_method(template_id, request.method, &request.args, &mut unit_of_work) .map_err(|e| Status::internal(format!("Could not invoke read method: {}", e)))?; Ok(Response::new(rpc::InvokeReadMethodResponse { result: response_bytes.unwrap_or_default(), @@ -157,12 +160,7 @@ impl rpc::validator_node_ // Forward to proxy let response_bytes = self .asset_proxy - .invoke_read_method( - &asset_public_key, - TemplateId::from(request.template_id), - request.method, - request.args, - ) + .invoke_read_method(&asset_public_key, template_id, request.method, request.args) .await .map_err(|err| Status::internal(format!("Error calling proxied method:{}", err)))?; // TODO: Populate authority diff --git a/applications/tari_validator_node/src/p2p/proto/conversions/dan.rs b/applications/tari_validator_node/src/p2p/proto/conversions.rs similarity index 57% rename from applications/tari_validator_node/src/p2p/proto/conversions/dan.rs rename to applications/tari_validator_node/src/p2p/proto/conversions.rs index 964b6542d7..1bb4bd7012 100644 --- a/applications/tari_validator_node/src/p2p/proto/conversions/dan.rs +++ b/applications/tari_validator_node/src/p2p/proto/conversions.rs @@ -1,4 +1,4 @@ -// Copyright 2021, The Tari Project +// Copyright 2022, The Tari Project // // Redistribution and use in source and binary forms, with or without modification, are permitted provided that the // following conditions are met: @@ -31,17 +31,20 @@ use tari_dan_core::models::{ HotStuffTreeNode, Instruction, InstructionSet, + Node, QuorumCertificate, + SideChainBlock, Signature, StateRoot, TariDanPayload, + TemplateId, TreeNodeHash, ViewId, }; -use crate::p2p::proto::dan as dan_proto; +use crate::p2p::proto; -impl From> for dan_proto::HotStuffMessage { +impl From> for proto::consensus::HotStuffMessage { fn from(source: HotStuffMessage) -> Self { Self { message_type: source.message_type().as_u8() as i32, @@ -49,13 +52,18 @@ impl From> for dan_proto::HotStuffMessage { justify: source.justify().map(|j| j.clone().into()), partial_sig: source.partial_sig().map(|s| s.clone().into()), view_number: source.view_number().as_u64(), - node_hash: source.node_hash().map(|s| s.0.clone()).unwrap_or_default(), + node_hash: source + .node_hash() + .copied() + .unwrap_or_else(TreeNodeHash::zero) + .as_bytes() + .to_vec(), asset_public_key: source.asset_public_key().to_vec(), } } } -impl From> for dan_proto::HotStuffTreeNode { +impl From> for proto::consensus::HotStuffTreeNode { fn from(source: HotStuffTreeNode) -> Self { Self { parent: Vec::from(source.parent().as_bytes()), @@ -66,7 +74,7 @@ impl From> for dan_proto::HotStuffTreeNode { } } -impl From for dan_proto::QuorumCertificate { +impl From for proto::consensus::QuorumCertificate { fn from(source: QuorumCertificate) -> Self { Self { message_type: source.message_type().as_u8() as i32, @@ -77,13 +85,13 @@ impl From for dan_proto::QuorumCertificate { } } -impl From for dan_proto::Signature { +impl From for proto::consensus::Signature { fn from(_s: Signature) -> Self { Self {} } } -impl From for dan_proto::TariDanPayload { +impl From for proto::consensus::TariDanPayload { fn from(source: TariDanPayload) -> Self { let (instruction_set, checkpoint) = source.destruct(); Self { @@ -93,23 +101,15 @@ impl From for dan_proto::TariDanPayload { } } -impl From for dan_proto::CheckpointData { +impl From for proto::consensus::CheckpointData { fn from(_source: CheckpointData) -> Self { Self {} } } -impl From for dan_proto::InstructionSet { - fn from(source: InstructionSet) -> Self { - Self { - instructions: source.instructions().iter().map(|i| i.into()).collect(), - } - } -} -impl From<&Instruction> for dan_proto::Instruction { +impl From<&Instruction> for proto::common::Instruction { fn from(source: &Instruction) -> Self { Self { - asset_public_key: Vec::from(source.asset_id().as_bytes()), template_id: source.template_id() as u32, method: source.method().to_string(), args: Vec::from(source.args()), @@ -117,14 +117,14 @@ impl From<&Instruction> for dan_proto::Instruction { } } -impl TryFrom for HotStuffMessage { +impl TryFrom for HotStuffMessage { type Error = String; - fn try_from(value: dan_proto::HotStuffMessage) -> Result { + fn try_from(value: proto::consensus::HotStuffMessage) -> Result { let node_hash = if value.node_hash.is_empty() { None } else { - Some(TreeNodeHash(value.node_hash)) + Some(TreeNodeHash::try_from(value.node_hash).map_err(|err| err.to_string())?) }; Ok(Self::new( ViewId(value.view_number), @@ -139,28 +139,28 @@ impl TryFrom for HotStuffMessage { } } -impl TryFrom for QuorumCertificate { +impl TryFrom for QuorumCertificate { type Error = String; - fn try_from(value: dan_proto::QuorumCertificate) -> Result { + fn try_from(value: proto::consensus::QuorumCertificate) -> Result { Ok(Self::new( HotStuffMessageType::try_from(value.message_type as u8)?, ViewId(value.view_number), - TreeNodeHash(value.node_hash), + TreeNodeHash::try_from(value.node_hash).map_err(|err| err.to_string())?, value.signature.map(|s| s.try_into()).transpose()?, )) } } -impl TryFrom for HotStuffTreeNode { +impl TryFrom for HotStuffTreeNode { type Error = String; - fn try_from(value: dan_proto::HotStuffTreeNode) -> Result { + fn try_from(value: proto::consensus::HotStuffTreeNode) -> Result { if value.parent.is_empty() { return Err("parent not provided".to_string()); } Ok(Self::new( - TreeNodeHash(value.parent), + TreeNodeHash::try_from(value.parent).map_err(|err| err.to_string())?, value .payload .map(|p| p.try_into()) @@ -172,45 +172,48 @@ impl TryFrom for HotStuffTreeNode { } } -impl TryFrom for Signature { +impl TryFrom for Signature { type Error = String; - fn try_from(_value: dan_proto::Signature) -> Result { + fn try_from(_value: proto::consensus::Signature) -> Result { Ok(Self {}) } } -impl TryFrom for InstructionSet { +impl TryFrom for InstructionSet { type Error = String; - fn try_from(value: dan_proto::InstructionSet) -> Result { + fn try_from(value: proto::common::InstructionSet) -> Result { let instructions: Vec = value .instructions .into_iter() .map(|i| i.try_into()) .collect::>()?; - Ok(Self::from_slice(&instructions)) + Ok(Self::from_vec(instructions)) + } +} + +impl From for proto::common::InstructionSet { + fn from(value: InstructionSet) -> Self { + Self { + instructions: value.instructions().iter().map(Into::into).collect(), + } } } -impl TryFrom for Instruction { +impl TryFrom for Instruction { type Error = String; - fn try_from(value: dan_proto::Instruction) -> Result { - Ok(Self::new( - PublicKey::from_bytes(&value.asset_public_key) - .map_err(|e| format!("asset_id was not a valid public key: {}", e))?, - value.template_id.into(), - value.method, - value.args, - )) + fn try_from(value: proto::common::Instruction) -> Result { + let template_id = TemplateId::try_from(value.template_id).map_err(|err| err.to_string())?; + Ok(Self::new(template_id, value.method, value.args)) } } -impl TryFrom for TariDanPayload { +impl TryFrom for TariDanPayload { type Error = String; - fn try_from(value: dan_proto::TariDanPayload) -> Result { + fn try_from(value: proto::consensus::TariDanPayload) -> Result { let instruction_set = value .instruction_set .ok_or_else(|| "Instructions were not present".to_string())? @@ -221,10 +224,60 @@ impl TryFrom for TariDanPayload { } } -impl TryFrom for CheckpointData { +impl TryFrom for CheckpointData { type Error = String; - fn try_from(_value: dan_proto::CheckpointData) -> Result { + fn try_from(_value: proto::consensus::CheckpointData) -> Result { Ok(Self::default()) } } + +impl From for proto::common::SideChainBlock { + fn from(block: SideChainBlock) -> Self { + let (node, instructions) = block.destruct(); + Self { + node: Some(node.into()), + instructions: Some(instructions.into()), + } + } +} + +impl TryFrom for SideChainBlock { + type Error = String; + + fn try_from(block: proto::common::SideChainBlock) -> Result { + let node = block + .node + .map(TryInto::try_into) + .ok_or_else(|| "No node provided in sidechain block".to_string())??; + let instructions = block + .instructions + .map(TryInto::try_into) + .ok_or_else(|| "No InstructionSet provided in sidechain block".to_string())??; + Ok(Self::new(node, instructions)) + } +} + +impl From for proto::common::Node { + fn from(node: Node) -> Self { + Self { + hash: node.hash().as_bytes().to_vec(), + parent: node.parent().as_bytes().to_vec(), + height: node.height(), + is_committed: node.is_committed(), + } + } +} + +impl TryFrom for Node { + type Error = String; + + fn try_from(node: proto::common::Node) -> Result { + let hash = TreeNodeHash::try_from(node.hash).map_err(|err| err.to_string())?; + let parent = TreeNodeHash::try_from(node.parent).map_err(|err| err.to_string())?; + let height = node.height; + let is_committed = node.is_committed; + + Ok(Self::new(hash, parent, height, is_committed)) + } +} diff --git a/applications/tari_validator_node/src/p2p/proto/mod.rs b/applications/tari_validator_node/src/p2p/proto/mod.rs index e5a71258f4..185d9ba540 100644 --- a/applications/tari_validator_node/src/p2p/proto/mod.rs +++ b/applications/tari_validator_node/src/p2p/proto/mod.rs @@ -1,31 +1,34 @@ -// Copyright 2021. The Tari Project +// Copyright 2022, The Tari Project // -// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the -// following conditions are met: +// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the +// following conditions are met: // -// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following -// disclaimer. +// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following +// disclaimer. // -// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the -// following disclaimer in the documentation and/or other materials provided with the distribution. +// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the +// following disclaimer in the documentation and/or other materials provided with the distribution. // -// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote -// products derived from this software without specific prior written permission. +// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote +// products derived from this software without specific prior written permission. // -// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, -// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE -// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, -// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR -// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, -// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE -// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, +// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, +// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE +// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. mod conversions; -pub mod dan { - include!(concat!(env!("OUT_DIR"), "/tari.p2p.dan.rs")); +pub mod common { + include!(concat!(env!("OUT_DIR"), "/tari.dan.common.rs")); +} + +pub mod consensus { + include!(concat!(env!("OUT_DIR"), "/tari.dan.consensus.rs")); } pub mod validator_node { - include!(concat!(env!("OUT_DIR"), "/tari.p2p.validator_node.rs")); + include!(concat!(env!("OUT_DIR"), "/tari.dan.validator_node.rs")); } diff --git a/applications/tari_validator_node/src/p2p/rpc/mod.rs b/applications/tari_validator_node/src/p2p/rpc/mod.rs index f77d6a832b..e888b36430 100644 --- a/applications/tari_validator_node/src/p2p/rpc/mod.rs +++ b/applications/tari_validator_node/src/p2p/rpc/mod.rs @@ -22,15 +22,18 @@ mod service_impl; +#[cfg(test)] +mod test; + pub use service_impl::ValidatorNodeRpcServiceImpl; -use tari_comms::protocol::rpc::{Request, Response, RpcStatus}; +use tari_comms::protocol::rpc::{Request, Response, RpcStatus, Streaming}; use tari_comms_rpc_macros::tari_rpc; use tari_dan_core::{ services::{AssetProcessor, MempoolService}, storage::DbFactory, }; -use super::proto::validator_node as proto; +use crate::p2p::proto::validator_node as proto; #[tari_rpc(protocol_name = b"t/vn/1", server_struct = ValidatorNodeRpcServer, client_struct = ValidatorNodeRpcClient)] pub trait ValidatorNodeRpcService: Send + Sync + 'static { @@ -51,6 +54,12 @@ pub trait ValidatorNodeRpcService: Send + Sync + 'static { &self, request: Request, ) -> Result, RpcStatus>; + + #[rpc(method = 4)] + async fn get_sidechain_blocks( + &self, + request: Request, + ) -> Result, RpcStatus>; } pub fn create_validator_node_rpc_service< diff --git a/applications/tari_validator_node/src/p2p/rpc/service_impl.rs b/applications/tari_validator_node/src/p2p/rpc/service_impl.rs index 2c293b2a12..8e6e7fab98 100644 --- a/applications/tari_validator_node/src/p2p/rpc/service_impl.rs +++ b/applications/tari_validator_node/src/p2p/rpc/service_impl.rs @@ -1,10 +1,10 @@ // Copyright 2021, The Tari Project // -// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the -// following conditions are met: +// Redistribution and use in source and binary forms, with or without modification, are permitted provided that +// the following conditions are met: // -// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following -// disclaimer. +// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the +// following disclaimer. // // 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the // following disclaimer in the documentation and/or other materials provided with the distribution. @@ -12,21 +12,28 @@ // 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote // products derived from this software without specific prior written permission. // -// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, -// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE -// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, -// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR -// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, -// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE -// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED +// WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A +// PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY +// DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, +// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +// CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR +// OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH +// DAMAGE. +use std::convert::TryFrom; + +use log::*; use tari_common_types::types::PublicKey; -use tari_comms::protocol::rpc::{Request, Response, RpcStatus}; +use tari_comms::protocol::rpc::{Request, Response, RpcStatus, Streaming}; use tari_crypto::tari_utilities::ByteArray; use tari_dan_core::{ - models::{Instruction, TemplateId}, + models::{Instruction, TemplateId, TreeNodeHash}, services::{AssetProcessor, MempoolService}, storage::DbFactory, }; +use tokio::{sync::mpsc, task}; + +const LOG_TARGET: &str = "vn::p2p::rpc"; use crate::p2p::{proto::validator_node as proto, rpc::ValidatorNodeRpcService}; @@ -52,11 +59,12 @@ impl< } #[tari_comms::async_trait] -impl< - TMempoolService: MempoolService + Clone, - TDbFactory: DbFactory + Clone, - TAssetProcessor: AssetProcessor + Clone, - > ValidatorNodeRpcService for ValidatorNodeRpcServiceImpl +impl ValidatorNodeRpcService + for ValidatorNodeRpcServiceImpl +where + TMempoolService: MempoolService + Clone, + TDbFactory: DbFactory + Clone, + TAssetProcessor: AssetProcessor + Clone, { async fn get_token_data( &self, @@ -83,7 +91,7 @@ impl< let response_bytes = self .asset_processor .invoke_read_method( - TemplateId::from(request.template_id), + TemplateId::try_from(request.template_id).map_err(|_| RpcStatus::bad_request("Invalid template_id"))?, request.method, &request.args, &mut unit_of_work, @@ -101,9 +109,7 @@ impl< dbg!(&request); let request = request.into_message(); let instruction = Instruction::new( - PublicKey::from_bytes(&request.asset_public_key) - .map_err(|_err| RpcStatus::bad_request("asset_public_key was not a valid public key"))?, - request.template_id.into(), + TemplateId::try_from(request.template_id).map_err(|_| RpcStatus::bad_request("Invalid template_id"))?, request.method.clone(), request.args.clone(), /* TokenId(request.token_id.clone()), @@ -112,21 +118,107 @@ impl< * create_com_sig_from_bytes(&request.signature) * .map_err(|err| Status::invalid_argument("signature was not a valid comsig"))?, */ ); - + debug!(target: LOG_TARGET, "Submitting instruction {} to mempool", instruction); let mut mempool_service = self.mempool_service.clone(); match mempool_service.submit_instruction(instruction).await { Ok(_) => { + debug!(target: LOG_TARGET, "Accepted instruction into mempool"); return Ok(Response::new(proto::InvokeMethodResponse { result: vec![], status: proto::Status::Accepted as i32, - })) + })); }, - Err(_) => { + Err(err) => { + debug!(target: LOG_TARGET, "Mempool rejected instruction: {}", err); return Ok(Response::new(proto::InvokeMethodResponse { result: vec![], status: proto::Status::Errored as i32, - })) + })); }, } } + + async fn get_sidechain_blocks( + &self, + request: Request, + ) -> Result, RpcStatus> { + let msg = request.into_message(); + + let asset_public_key = PublicKey::from_bytes(&msg.asset_public_key) + .map_err(|_| RpcStatus::bad_request("Invalid asset_public_key"))?; + let start_hash = + TreeNodeHash::try_from(msg.start_hash).map_err(|_| RpcStatus::bad_request("Invalid start hash"))?; + + let end_hash = Some(msg.end_hash) + .filter(|h| !h.is_empty()) + .map(TreeNodeHash::try_from) + .transpose() + .map_err(|_| RpcStatus::bad_request("Invalid end_hash"))?; + + let db = self + .db_factory + .get_chain_db(&asset_public_key) + .map_err(RpcStatus::log_internal_error(LOG_TARGET))? + .ok_or_else(|| RpcStatus::not_found("Asset not found"))?; + + let start_block = db + .find_sidechain_block_by_node_hash(&start_hash) + .map_err(RpcStatus::log_internal_error(LOG_TARGET))? + .ok_or_else(|| RpcStatus::not_found(format!("Block not found with start_hash '{}'", start_hash)))?; + + let end_block_exists = end_hash + .as_ref() + .map(|end_hash| db.sidechain_block_exists(end_hash)) + .transpose() + .map_err(RpcStatus::log_internal_error(LOG_TARGET))?; + + if !end_block_exists.unwrap_or(true) { + return Err(RpcStatus::not_found(format!( + "Block not found with end_hash '{}'", + end_hash.unwrap_or_else(TreeNodeHash::zero) + ))); + } + + let (tx, rx) = mpsc::channel(2); + + task::spawn(async move { + let mut current_block_hash = *start_block.node().hash(); + if tx + .send(Ok(proto::GetSidechainBlocksResponse { + block: Some(start_block.into()), + })) + .await + .is_err() + { + return; + } + loop { + match db.find_sidechain_block_by_parent_node_hash(¤t_block_hash) { + Ok(Some(block)) => { + current_block_hash = *block.node().hash(); + if tx + .send(Ok(proto::GetSidechainBlocksResponse { + block: Some(block.into()), + })) + .await + .is_err() + { + return; + } + if end_hash.map(|h| h == current_block_hash).unwrap_or(false) { + return; + } + }, + Ok(None) => return, + Err(err) => { + error!(target: LOG_TARGET, "Failure while streaming blocks: {}", err); + let _ = tx.send(Err(RpcStatus::general("Internal database failure"))).await; + return; + }, + } + } + }); + + Ok(Streaming::new(rx)) + } } diff --git a/applications/tari_validator_node/src/p2p/rpc/test.rs b/applications/tari_validator_node/src/p2p/rpc/test.rs new file mode 100644 index 0000000000..e7ebbcbbaf --- /dev/null +++ b/applications/tari_validator_node/src/p2p/rpc/test.rs @@ -0,0 +1,132 @@ +// Copyright 2022, The Tari Project +// +// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the +// following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following +// disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the +// following disclaimer in the documentation and/or other materials provided with the distribution. +// +// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote +// products derived from this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, +// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, +// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE +// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +use std::convert::TryFrom; + +use tari_common_types::types::PublicKey; +use tari_comms::{protocol::rpc::mock::RpcRequestMock, test_utils}; +use tari_crypto::tari_utilities::{hex::Hex, ByteArray}; +use tari_dan_core::{ + fixed_hash::FixedHash, + models::{Node, TreeNodeHash}, + services::mocks::{MockAssetProcessor, MockMempoolService}, + storage::{chain::ChainDbUnitOfWork, mocks::MockDbFactory, DbFactory}, +}; +use tari_test_utils::{paths::tempdir, streams::convert_mpsc_to_stream}; +use tokio_stream::StreamExt; + +use crate::p2p::{ + proto, + rpc::{ValidatorNodeRpcService, ValidatorNodeRpcServiceImpl}, +}; + +fn setup() -> ( + ValidatorNodeRpcServiceImpl, + RpcRequestMock, + MockDbFactory, +) { + let tmp = tempdir().unwrap(); + let peer_manager = test_utils::build_peer_manager(&tmp); + let mock = RpcRequestMock::new(peer_manager); + let mempool = MockMempoolService; + let db_factory = MockDbFactory::default(); + let asset_processor = MockAssetProcessor; + let service = ValidatorNodeRpcServiceImpl::new(mempool, db_factory.clone(), asset_processor); + + (service, mock, db_factory) +} + +mod get_sidechain_blocks { + + use super::*; + + #[tokio::test] + async fn it_fetches_matching_block() { + let (service, mock, db_factory) = setup(); + let asset_public_key = PublicKey::default(); + let db = db_factory.get_or_create_chain_db(&asset_public_key).unwrap(); + let mut uow = db.new_unit_of_work(); + + // Some random parent hash to ensure stream does not last forever + let parent = + TreeNodeHash::from_hex("972209d3622c1227a499fd2cfcfa75fdde547d1a21fa805522d3a1a315ebd1a3").unwrap(); + uow.add_node(TreeNodeHash::zero(), parent, 1).unwrap(); + uow.commit().unwrap(); + + let req = proto::validator_node::GetSidechainBlocksRequest { + asset_public_key: asset_public_key.to_vec(), + start_hash: TreeNodeHash::zero().as_bytes().to_vec(), + end_hash: vec![], + }; + let req = mock.request_with_context(Default::default(), req); + let mut resp = service.get_sidechain_blocks(req).await.unwrap().into_inner(); + let stream = convert_mpsc_to_stream(&mut resp).map(|r| r.unwrap()); + + let responses = stream + .collect::>() + .await; + assert_eq!(responses.len(), 1); + let node = Node::new(TreeNodeHash::zero(), parent, 1, false); + let block = responses[0].block.clone(); + assert_eq!( + Node::try_from(block.as_ref().unwrap().node.clone().unwrap()).unwrap(), + node + ); + assert_eq!( + block.as_ref().unwrap().instructions.clone().unwrap().instructions.len(), + 0 + ); + } + + #[tokio::test] + async fn it_errors_if_asset_not_found() { + let (service, mock, _) = setup(); + + let req = proto::validator_node::GetSidechainBlocksRequest { + asset_public_key: PublicKey::default().to_vec(), + start_hash: FixedHash::zero().as_bytes().to_vec(), + end_hash: vec![], + }; + let req = mock.request_with_context(Default::default(), req); + let err = service.get_sidechain_blocks(req).await.unwrap_err(); + assert!(err.as_status_code().is_not_found()); + assert_eq!(err.details(), "Asset not found"); + } + + #[tokio::test] + async fn it_errors_if_block_not_found() { + let (service, mock, db_factory) = setup(); + let asset_public_key = PublicKey::default(); + let db = db_factory.get_or_create_chain_db(&asset_public_key).unwrap(); + db.new_unit_of_work().commit().unwrap(); + + let req = proto::validator_node::GetSidechainBlocksRequest { + asset_public_key: asset_public_key.to_vec(), + start_hash: FixedHash::zero().as_bytes().to_vec(), + end_hash: vec![], + }; + let req = mock.request_with_context(Default::default(), req); + let err = service.get_sidechain_blocks(req).await.unwrap_err(); + assert!(err.as_status_code().is_not_found()); + assert!(err.details().starts_with("Block not found")); + } +} diff --git a/applications/tari_validator_node/src/p2p/services/inbound_connection_service.rs b/applications/tari_validator_node/src/p2p/services/inbound_connection_service.rs index a54bd9d79d..2b48deba66 100644 --- a/applications/tari_validator_node/src/p2p/services/inbound_connection_service.rs +++ b/applications/tari_validator_node/src/p2p/services/inbound_connection_service.rs @@ -205,7 +205,7 @@ impl TariCommsInboundConnectionService { async fn forward_message(&mut self, message: Arc) -> Result<(), DigitalAssetError> { // let from = message.authenticated_origin.as_ref().unwrap().clone(); let from = message.source_peer.public_key.clone(); - let proto_message: proto::dan::HotStuffMessage = message.decode_message().unwrap(); + let proto_message: proto::consensus::HotStuffMessage = message.decode_message().unwrap(); let hot_stuff_message: HotStuffMessage = proto_message .try_into() .map_err(DigitalAssetError::InvalidPeerMessage)?; diff --git a/applications/tari_validator_node/src/p2p/services/outbound_connection_service.rs b/applications/tari_validator_node/src/p2p/services/outbound_connection_service.rs index 6c2a202337..f90935f3b1 100644 --- a/applications/tari_validator_node/src/p2p/services/outbound_connection_service.rs +++ b/applications/tari_validator_node/src/p2p/services/outbound_connection_service.rs @@ -75,7 +75,7 @@ impl OutboundService for TariCommsOutboundServic return Ok(()); } - let inner = proto::dan::HotStuffMessage::from(message); + let inner = proto::consensus::HotStuffMessage::from(message); let tari_message = OutboundDomainMessage::new(TariMessageType::DanConsensusMessage, inner); self.outbound_message_requester diff --git a/applications/tari_validator_node/src/p2p/services/rpc_client.rs b/applications/tari_validator_node/src/p2p/services/rpc_client.rs index 8bd43fe0d8..e009a33c90 100644 --- a/applications/tari_validator_node/src/p2p/services/rpc_client.rs +++ b/applications/tari_validator_node/src/p2p/services/rpc_client.rs @@ -20,7 +20,10 @@ // WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE // USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +use std::convert::TryInto; + use async_trait::async_trait; +use log::*; use tari_common_types::types::PublicKey; use tari_comms::{ connection_manager::ConnectionManagerError, @@ -31,13 +34,15 @@ use tari_comms::{ use tari_comms_dht::{envelope::NodeDestination, DhtDiscoveryRequester}; use tari_crypto::tari_utilities::ByteArray; use tari_dan_core::{ - models::TemplateId, + models::{SideChainBlock, TemplateId, TreeNodeHash}, services::{ValidatorNodeClientFactory, ValidatorNodeRpcClient}, DigitalAssetError, }; +use tokio_stream::StreamExt; use crate::p2p::{proto::validator_node as proto, rpc}; -// const LOG_TARGET: &str = "tari::validator_node::p2p::services::rpc_client"; + +const LOG_TARGET: &str = "tari::validator_node::p2p::services::rpc_client"; pub struct TariCommsValidatorNodeRpcClient { connectivity: ConnectivityRequester, @@ -93,6 +98,10 @@ impl ValidatorNodeRpcClient for TariCommsValidatorNodeRpcClient { method: String, args: Vec, ) -> Result>, DigitalAssetError> { + debug!( + target: LOG_TARGET, + r#"Invoking read method "{}" for asset '{}'"#, method, asset_public_key + ); let mut connection = self.create_connection().await?; let mut client = connection.connect_rpc::().await?; let request = proto::InvokeReadMethodRequest { @@ -117,6 +126,10 @@ impl ValidatorNodeRpcClient for TariCommsValidatorNodeRpcClient { method: String, args: Vec, ) -> Result>, DigitalAssetError> { + debug!( + target: LOG_TARGET, + r#"Invoking method "{}" for asset '{}'"#, method, asset_public_key + ); let mut connection = self.create_connection().await?; let mut client = connection.connect_rpc::().await?; let request = proto::InvokeMethodRequest { @@ -126,11 +139,50 @@ impl ValidatorNodeRpcClient for TariCommsValidatorNodeRpcClient { args, }; let response = client.invoke_method(request).await?; - Ok(if response.result.is_empty() { - None + + debug!( + target: LOG_TARGET, + "Validator node '{}' returned status '{}' for asset '{}'", self.address, response.status, asset_public_key + ); + if response.result.is_empty() { + Ok(None) } else { - Some(response.result) - }) + Ok(Some(response.result)) + } + } + + async fn get_sidechain_blocks( + &mut self, + asset_public_key: &PublicKey, + start_hash: TreeNodeHash, + end_hash: Option, + ) -> Result, DigitalAssetError> { + let mut connection = self.create_connection().await?; + let mut client = connection.connect_rpc::().await?; + let request = proto::GetSidechainBlocksRequest { + asset_public_key: asset_public_key.to_vec(), + start_hash: start_hash.as_bytes().to_vec(), + end_hash: end_hash.map(|h| h.as_bytes().to_vec()).unwrap_or_default(), + }; + + let stream = client.get_sidechain_blocks(request).await?; + // TODO: By first collecting all the blocks, we lose the advantage of streaming. Since you cannot return + // `Result, _>`, and the Map type is private in tokio-stream, its a little tricky to + // return the stream and not leak the RPC response type out of the client + let blocks = stream + .map(|result| { + let resp = result.map_err(DigitalAssetError::from)?; + let block: SideChainBlock = resp + .block + .ok_or_else(|| DigitalAssetError::ConversionError("Node returned empty block".to_string()))? + .try_into() + .map_err(DigitalAssetError::ConversionError)?; + Ok(block) + }) + .collect::>() + .await?; + + Ok(blocks) } } diff --git a/comms/src/connection_manager/tests/listener_dialer.rs b/comms/src/connection_manager/tests/listener_dialer.rs index 1fc21c7c76..5e0b26a92a 100644 --- a/comms/src/connection_manager/tests/listener_dialer.rs +++ b/comms/src/connection_manager/tests/listener_dialer.rs @@ -44,7 +44,7 @@ use crate::{ peer_manager::PeerFeatures, protocol::ProtocolId, runtime, - test_utils::{node_identity::build_node_identity, test_node::build_peer_manager}, + test_utils::{build_peer_manager, node_identity::build_node_identity}, transports::MemoryTransport, }; diff --git a/comms/src/connection_manager/tests/manager.rs b/comms/src/connection_manager/tests/manager.rs index ce27bc96c9..8fd87baad3 100644 --- a/comms/src/connection_manager/tests/manager.rs +++ b/comms/src/connection_manager/tests/manager.rs @@ -46,9 +46,10 @@ use crate::{ runtime, runtime::task, test_utils::{ + build_peer_manager, count_string_occurrences, node_identity::{build_node_identity, ordered_node_identities}, - test_node::{build_connection_manager, build_peer_manager, TestNodeConfig}, + test_node::{build_connection_manager, TestNodeConfig}, }, transports::{MemoryTransport, TcpTransport}, }; diff --git a/comms/src/connectivity/test.rs b/comms/src/connectivity/test.rs index d75030e0be..e662232d56 100644 --- a/comms/src/connectivity/test.rs +++ b/comms/src/connectivity/test.rs @@ -40,9 +40,9 @@ use crate::{ runtime, runtime::task, test_utils::{ + build_peer_manager, mocks::{create_connection_manager_mock, create_peer_connection_mock_pair, ConnectionManagerMockState}, node_identity::{build_many_node_identities, build_node_identity}, - test_node::build_peer_manager, }, NodeIdentity, PeerManager, diff --git a/comms/src/protocol/rpc/context.rs b/comms/src/protocol/rpc/context.rs index a0b2a2e6f3..142e028162 100644 --- a/comms/src/protocol/rpc/context.rs +++ b/comms/src/protocol/rpc/context.rs @@ -58,6 +58,10 @@ impl RpcCommsBackend { pub fn peer_manager(&self) -> &PeerManager { &self.peer_manager } + + pub fn peer_manager_owned(&self) -> Arc { + self.peer_manager.clone() + } } #[async_trait] diff --git a/comms/src/protocol/rpc/test/mock.rs b/comms/src/protocol/rpc/test/mock.rs index 276adf2643..a14d9cc0cd 100644 --- a/comms/src/protocol/rpc/test/mock.rs +++ b/comms/src/protocol/rpc/test/mock.rs @@ -50,8 +50,8 @@ use crate::{ ProtocolId, }, test_utils::{ + build_peer_manager, mocks::{create_connectivity_mock, ConnectivityManagerMockState}, - test_node::build_peer_manager, }, }; diff --git a/comms/src/test_utils/mod.rs b/comms/src/test_utils/mod.rs index f68825a1ef..5471fa4dab 100644 --- a/comms/src/test_utils/mod.rs +++ b/comms/src/test_utils/mod.rs @@ -23,13 +23,16 @@ cfg_test! { #[allow(dead_code)] pub mod factories; - - pub mod test_node; + pub(crate) mod test_node; } pub mod mocks; pub mod node_id; pub mod node_identity; + +pub mod peer_manager; +pub use peer_manager::build_peer_manager; + pub mod transport; pub fn count_string_occurrences(items: T, expected: &[&str]) -> usize diff --git a/comms/src/test_utils/peer_manager.rs b/comms/src/test_utils/peer_manager.rs new file mode 100644 index 0000000000..3c5d67bea2 --- /dev/null +++ b/comms/src/test_utils/peer_manager.rs @@ -0,0 +1,63 @@ +// Copyright 2022, The Tari Project +// +// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the +// following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following +// disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the +// following disclaimer in the documentation and/or other materials provided with the distribution. +// +// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote +// products derived from this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, +// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, +// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE +// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +use std::sync::Arc; + +use crate::PeerManager; + +#[cfg(test)] +pub fn build_peer_manager() -> Arc { + Arc::new(PeerManager::new(tari_storage::HashmapDatabase::new(), None).unwrap()) +} + +#[cfg(not(test))] +pub use not_test::build_peer_manager; + +#[cfg(not(test))] +mod not_test { + use std::{iter, path::Path}; + + use rand::{distributions::Alphanumeric, Rng}; + use tari_storage::{lmdb_store::LMDBBuilder, LMDBWrapper}; + + use super::*; + + pub fn build_peer_manager>(data_path: P) -> Arc { + let peer_database_name = { + let mut rng = rand::thread_rng(); + iter::repeat(()) + .map(|_| rng.sample(Alphanumeric) as char) + .take(8) + .collect::() + }; + std::fs::create_dir_all(&data_path).unwrap(); + let datastore = LMDBBuilder::new() + .set_path(data_path) + .set_env_config(Default::default()) + .set_max_number_of_databases(1) + .add_database(&peer_database_name, lmdb_zero::db::CREATE) + .build() + .unwrap(); + let peer_database = datastore.get_handle(&peer_database_name).unwrap(); + Arc::new(PeerManager::new(LMDBWrapper::new(Arc::new(peer_database)), None).unwrap()) + } +} diff --git a/comms/src/test_utils/test_node.rs b/comms/src/test_utils/test_node.rs index 3e5d7229a0..38b614c41f 100644 --- a/comms/src/test_utils/test_node.rs +++ b/comms/src/test_utils/test_node.rs @@ -20,6 +20,15 @@ // WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE // USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +use std::{sync::Arc, time::Duration}; + +use rand::rngs::OsRng; +use tari_shutdown::ShutdownSignal; +use tokio::{ + io::{AsyncRead, AsyncWrite}, + sync::{broadcast, mpsc}, +}; + use crate::{ backoff::ConstantBackoff, connection_manager::{ConnectionManager, ConnectionManagerConfig, ConnectionManagerRequester}, @@ -29,14 +38,6 @@ use crate::{ protocol::Protocols, transports::Transport, }; -use rand::rngs::OsRng; -use std::{sync::Arc, time::Duration}; -use tari_shutdown::ShutdownSignal; -use tari_storage::HashmapDatabase; -use tokio::{ - io::{AsyncRead, AsyncWrite}, - sync::{broadcast, mpsc}, -}; #[derive(Clone, Debug)] pub struct TestNodeConfig { @@ -99,7 +100,3 @@ where requester } - -pub fn build_peer_manager() -> Arc { - Arc::new(PeerManager::new(HashmapDatabase::new(), None).unwrap()) -} diff --git a/dan_layer/core/Cargo.toml b/dan_layer/core/Cargo.toml index f3b44c3f6e..59c417fb27 100644 --- a/dan_layer/core/Cargo.toml +++ b/dan_layer/core/Cargo.toml @@ -19,6 +19,7 @@ tari_storage = { path = "../../infrastructure/storage" } tari_core = {path = "../../base_layer/core"} tari_dan_common_types = {path = "../common_types"} tari_common_types = {path = "../../base_layer/common_types"} +tari_utilities = "0.3.0" anyhow = "1.0.53" async-trait = "0.1.50" diff --git a/dan_layer/core/src/digital_assets_error.rs b/dan_layer/core/src/digital_assets_error.rs index a785dd6e1e..470958b557 100644 --- a/dan_layer/core/src/digital_assets_error.rs +++ b/dan_layer/core/src/digital_assets_error.rs @@ -20,11 +20,14 @@ // WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE // USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -use tari_comms::{connectivity::ConnectivityError, protocol::rpc::RpcError}; +use tari_comms::{ + connectivity::ConnectivityError, + protocol::rpc::{RpcError, RpcStatus}, +}; use tari_comms_dht::DhtDiscoveryError; use thiserror::Error; -use crate::storage::StorageError; +use crate::{models::ModelError, storage::StorageError}; #[derive(Debug, Error)] pub enum DigitalAssetError { @@ -72,10 +75,14 @@ pub enum DigitalAssetError { ConnectivityError(#[from] ConnectivityError), #[error("RpcError: {0}")] RpcError(#[from] RpcError), + #[error("Remote node returned error: {0}")] + RpcStatusError(#[from] RpcStatus), #[error("Dht Discovery error: {0}")] DhtDiscoveryError(#[from] DhtDiscoveryError), #[error("Fatal error: {0}")] FatalError(String), + #[error(transparent)] + ModelError(#[from] ModelError), } impl From for DigitalAssetError { diff --git a/dan_layer/core/src/fixed_hash.rs b/dan_layer/core/src/fixed_hash.rs new file mode 100644 index 0000000000..66281ab5b9 --- /dev/null +++ b/dan_layer/core/src/fixed_hash.rs @@ -0,0 +1,101 @@ +// Copyright 2022, The Tari Project +// +// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the +// following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following +// disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the +// following disclaimer in the documentation and/or other materials provided with the distribution. +// +// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote +// products derived from this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, +// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, +// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE +// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +use std::{convert::TryFrom, ops::Deref}; + +use digest::consts::U32; +use tari_utilities::hex::{Hex, HexError}; + +const ZERO_HASH: [u8; FixedHash::byte_size()] = [0u8; FixedHash::byte_size()]; + +#[derive(thiserror::Error, Debug)] +#[error("Invalid size")] +pub struct FixedHashSizeError; + +#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Debug)] +pub struct FixedHash([u8; FixedHash::byte_size()]); + +impl FixedHash { + pub const fn byte_size() -> usize { + 32 + } + + pub fn zero() -> Self { + Self(ZERO_HASH) + } + + pub fn as_slice(&self) -> &[u8] { + &self.0 + } +} + +impl From<[u8; FixedHash::byte_size()]> for FixedHash { + fn from(hash: [u8; FixedHash::byte_size()]) -> Self { + hash.into() + } +} + +impl TryFrom> for FixedHash { + type Error = FixedHashSizeError; + + fn try_from(value: Vec) -> Result { + if value.len() != FixedHash::byte_size() { + return Err(FixedHashSizeError); + } + + let mut buf = [0u8; FixedHash::byte_size()]; + buf.copy_from_slice(&value); + Ok(Self(buf)) + } +} + +impl From> for FixedHash { + fn from(hash: digest::generic_array::GenericArray) -> Self { + Self(hash.into()) + } +} + +impl PartialEq<[u8]> for FixedHash { + fn eq(&self, other: &[u8]) -> bool { + self.0[..].eq(other) + } +} + +impl Hex for FixedHash { + fn from_hex(hex: &str) -> Result + where Self: Sized { + let hash = <[u8; FixedHash::byte_size()] as Hex>::from_hex(hex)?; + Ok(Self(hash)) + } + + fn to_hex(&self) -> String { + self.0.to_hex() + } +} + +impl Deref for FixedHash { + type Target = [u8; FixedHash::byte_size()]; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} diff --git a/dan_layer/core/src/lib.rs b/dan_layer/core/src/lib.rs index 6e3b71d1e9..0b67153e03 100644 --- a/dan_layer/core/src/lib.rs +++ b/dan_layer/core/src/lib.rs @@ -22,6 +22,7 @@ #![allow(clippy::too_many_arguments)] mod digital_assets_error; pub use digital_assets_error::DigitalAssetError; +pub mod fixed_hash; pub mod models; pub mod services; pub mod storage; diff --git a/applications/tari_validator_node/src/p2p/proto/conversions/mod.rs b/dan_layer/core/src/models/error.rs similarity index 84% rename from applications/tari_validator_node/src/p2p/proto/conversions/mod.rs rename to dan_layer/core/src/models/error.rs index e1ac3e58af..d0fd37ec53 100644 --- a/applications/tari_validator_node/src/p2p/proto/conversions/mod.rs +++ b/dan_layer/core/src/models/error.rs @@ -1,4 +1,4 @@ -// Copyright 2021, The Tari Project +// Copyright 2022, The Tari Project // // Redistribution and use in source and binary forms, with or without modification, are permitted provided that the // following conditions are met: @@ -20,4 +20,10 @@ // WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE // USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -mod dan; +#[derive(Debug, thiserror::Error)] +pub enum ModelError { + #[error("Invalid template id Number {value}")] + InvalidTemplateIdNumber { value: i64 }, + #[error("Failed to parse string: {details}")] + StringParseError { details: String }, +} diff --git a/dan_layer/core/src/models/hot_stuff_tree_node.rs b/dan_layer/core/src/models/hot_stuff_tree_node.rs index f0f9c8af6b..9cc9543e3b 100644 --- a/dan_layer/core/src/models/hot_stuff_tree_node.rs +++ b/dan_layer/core/src/models/hot_stuff_tree_node.rs @@ -20,7 +20,7 @@ // WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE // USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -use digest::Digest; +use digest::{Digest, FixedOutput}; use tari_crypto::common::Blake256; use crate::models::{Payload, StateRoot, TreeNodeHash}; @@ -40,7 +40,7 @@ impl HotStuffTreeNode { parent, payload, state_root, - hash: TreeNodeHash(vec![]), + hash: TreeNodeHash::zero(), height, }; s.hash = s.calculate_hash(); @@ -49,9 +49,9 @@ impl HotStuffTreeNode { pub fn genesis(payload: TPayload) -> HotStuffTreeNode { let mut s = Self { - parent: TreeNodeHash(vec![0u8; 32]), + parent: TreeNodeHash::zero(), payload, - hash: TreeNodeHash(vec![]), + hash: TreeNodeHash::zero(), state_root: StateRoot::default(), height: 0, }; @@ -70,11 +70,11 @@ impl HotStuffTreeNode { pub fn calculate_hash(&self) -> TreeNodeHash { let result = Blake256::new() - .chain(self.parent.0.as_slice()) + .chain(self.parent.as_bytes()) .chain(self.payload.consensus_hash()) .chain(self.height.to_le_bytes()) - .finalize(); - TreeNodeHash(result.to_vec()) + .finalize_fixed(); + result.into() } pub fn hash(&self) -> &TreeNodeHash { diff --git a/dan_layer/core/src/models/instruction.rs b/dan_layer/core/src/models/instruction.rs index 5629c622cb..2c5d109757 100644 --- a/dan_layer/core/src/models/instruction.rs +++ b/dan_layer/core/src/models/instruction.rs @@ -20,21 +20,22 @@ // WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE // USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +use std::fmt::{Display, Formatter}; + use digest::Digest; -use tari_common_types::types::PublicKey; -use tari_crypto::{common::Blake256, tari_utilities::ByteArray}; +use tari_crypto::common::Blake256; +use tari_utilities::hex::Hex; -use crate::models::TemplateId; +use crate::{fixed_hash::FixedHash, models::TemplateId}; #[derive(Clone, Debug)] pub struct Instruction { - asset_id: PublicKey, template_id: TemplateId, method: String, args: Vec, // from: TokenId, // signature: ComSig, - hash: Vec, + hash: FixedHash, } impl PartialEq for Instruction { @@ -45,7 +46,6 @@ impl PartialEq for Instruction { impl Instruction { pub fn new( - asset_id: PublicKey, template_id: TemplateId, method: String, args: Vec, @@ -53,23 +53,18 @@ impl Instruction { * _signature: ComSig, */ ) -> Self { let mut s = Self { - asset_id, template_id, method, args, // from, // TODO: this is obviously wrong // signature: ComSig::default(), - hash: vec![], + hash: FixedHash::zero(), }; s.hash = s.calculate_hash(); s } - pub fn asset_id(&self) -> &PublicKey { - &self.asset_id - } - pub fn template_id(&self) -> TemplateId { self.template_id } @@ -91,17 +86,27 @@ impl Instruction { // &self.signature // } - pub fn hash(&self) -> &[u8] { + pub fn hash(&self) -> &FixedHash { &self.hash } - pub fn calculate_hash(&self) -> Vec { - let b = Blake256::new() - .chain(self.asset_id.as_bytes()) - .chain(self.method.as_bytes()) - .chain(&self.args); + pub fn calculate_hash(&self) -> FixedHash { + let b = Blake256::new().chain(self.method.as_bytes()).chain(&self.args); // b.chain(self.from.as_bytes()) // .chain(com_sig_to_bytes(&self.signature)) - b.finalize().to_vec() + b.finalize().into() + } +} + +impl Display for Instruction { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!( + f, + "Method: {}, Hash: {}, Args: {} bytes, Template: {}", + self.method, + self.hash.to_hex(), + self.args.len(), + self.template_id + ) } } diff --git a/dan_layer/core/src/models/instruction_set.rs b/dan_layer/core/src/models/instruction_set.rs index 915fd4e13f..1e9d6a7cd0 100644 --- a/dan_layer/core/src/models/instruction_set.rs +++ b/dan_layer/core/src/models/instruction_set.rs @@ -20,7 +20,7 @@ // WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE // USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -use std::hash::Hash; +use std::{hash::Hash, iter::FromIterator}; use tari_crypto::common::Blake256; use tari_mmr::MerkleMountainRange; @@ -46,13 +46,12 @@ pub struct InstructionSet { impl InstructionSet { pub fn empty() -> Self { - Self::from_slice(&[]) + Self::from_vec(vec![]) } - pub fn from_slice(instructions: &[Instruction]) -> Self { - let ins = Vec::from(instructions); + pub fn from_vec(instructions: Vec) -> Self { let mut result = Self { - instructions: ins, + instructions, hash: InstructionSetHash(vec![]), }; result.hash = result.calculate_hash(); @@ -63,7 +62,7 @@ impl InstructionSet { let mut mmr = MerkleMountainRange::::new(Vec::default()); // assume instructions are sorted for instruction in &self.instructions { - mmr.push(instruction.calculate_hash()).unwrap(); + mmr.push(instruction.calculate_hash().to_vec()).unwrap(); } InstructionSetHash(mmr.get_merkle_root().unwrap()) @@ -74,6 +73,19 @@ impl InstructionSet { } } +impl FromIterator for InstructionSet { + fn from_iter>(iter: T) -> Self { + let instructions = iter.into_iter().collect(); + Self::from_vec(instructions) + } +} + +impl From> for InstructionSet { + fn from(instructions: Vec) -> Self { + Self::from_vec(instructions) + } +} + impl ConsensusHash for InstructionSet { fn consensus_hash(&self) -> &[u8] { self.hash.as_bytes() diff --git a/dan_layer/core/src/models/mod.rs b/dan_layer/core/src/models/mod.rs index ef0dd18e67..007452bdef 100644 --- a/dan_layer/core/src/models/mod.rs +++ b/dan_layer/core/src/models/mod.rs @@ -20,21 +20,31 @@ // WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE // USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -use std::{convert::TryFrom, fmt::Debug, hash::Hash}; +use std::{ + convert::{TryFrom, TryInto}, + fmt::{Debug, Display, Formatter}, + hash::Hash, + str::FromStr, +}; + mod asset_definition; mod base_layer_metadata; mod base_layer_output; mod committee; pub mod domain_events; +mod error; mod hot_stuff_message; mod hot_stuff_tree_node; mod instruction; mod instruction_set; +mod node; mod payload; mod quorum_certificate; +mod sidechain_block; mod sidechain_metadata; mod state_root; mod tari_dan_payload; +mod tree_node_hash; mod view; mod view_id; @@ -42,15 +52,19 @@ pub use asset_definition::AssetDefinition; pub use base_layer_metadata::BaseLayerMetadata; pub use base_layer_output::BaseLayerOutput; pub use committee::Committee; +pub use error::ModelError; pub use hot_stuff_message::HotStuffMessage; pub use hot_stuff_tree_node::HotStuffTreeNode; pub use instruction::Instruction; pub use instruction_set::InstructionSet; +pub use node::Node; pub use payload::Payload; pub use quorum_certificate::QuorumCertificate; +pub use sidechain_block::SideChainBlock; pub use sidechain_metadata::SidechainMetadata; pub use state_root::StateRoot; pub use tari_dan_payload::{CheckpointData, TariDanPayload}; +pub use tree_node_hash::TreeNodeHash; pub use view::View; pub use view_id::ViewId; @@ -74,34 +88,56 @@ pub enum TemplateId { EditableMetadata = 20, } -impl TemplateId { - pub fn _parse(s: &str) -> TemplateId { +impl FromStr for TemplateId { + type Err = ModelError; + + fn from_str(s: &str) -> Result { match s { - "EditableMetadata" => TemplateId::EditableMetadata, + "Tip002" => Ok(TemplateId::Tip002), + "Tip003" => Ok(TemplateId::Tip003), + "Tip004" => Ok(TemplateId::Tip004), + "Tip721" => Ok(TemplateId::Tip721), + "EditableMetadata" => Ok(TemplateId::EditableMetadata), _ => { - // TODO: Propagate error instead dbg!("Unrecognised template"); - TemplateId::EditableMetadata + Err(ModelError::StringParseError { + details: format!("Unrecognised template ID '{}'", s), + }) }, } } } -impl From for TemplateId { - fn from(v: u32) -> Self { - // Must be an easier way than this - match v { - 2 => TemplateId::Tip002, - 3 => TemplateId::Tip003, - 4 => TemplateId::Tip004, - 721 => TemplateId::Tip721, - _ => { - unimplemented!() - }, +impl TryFrom for TemplateId { + type Error = ModelError; + + fn try_from(value: u32) -> Result { + match value { + 2 => Ok(TemplateId::Tip002), + 3 => Ok(TemplateId::Tip003), + 4 => Ok(TemplateId::Tip004), + 721 => Ok(TemplateId::Tip721), + _ => Err(ModelError::InvalidTemplateIdNumber { value: value as i64 }), } } } +impl TryFrom for TemplateId { + type Error = ModelError; + + fn try_from(value: i32) -> Result { + u32::try_from(value) + .map_err(|_| ModelError::InvalidTemplateIdNumber { value: value as i64 })? + .try_into() + } +} + +impl Display for TemplateId { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!(f, "{:?}", self) + } +} + #[derive(Clone, Debug, Hash)] pub struct TokenId(pub Vec); @@ -154,20 +190,6 @@ impl TryFrom for HotStuffMessageType { } } -#[derive(Debug, Clone, PartialEq)] -pub struct TreeNodeHash(pub Vec); - -impl TreeNodeHash { - pub fn as_bytes(&self) -> &[u8] { - self.0.as_slice() - } -} - -impl From for Vec { - fn from(v: TreeNodeHash) -> Self { - v.0 - } -} pub trait ConsensusHash { fn consensus_hash(&self) -> &[u8]; } diff --git a/dan_layer/core/src/models/node.rs b/dan_layer/core/src/models/node.rs new file mode 100644 index 0000000000..9d3767bcde --- /dev/null +++ b/dan_layer/core/src/models/node.rs @@ -0,0 +1,64 @@ +// Copyright 2022, The Tari Project +// +// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the +// following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following +// disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the +// following disclaimer in the documentation and/or other materials provided with the distribution. +// +// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote +// products derived from this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, +// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, +// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE +// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +use crate::{models::TreeNodeHash, storage::chain::DbNode}; + +#[derive(Debug, Clone, PartialEq)] +pub struct Node { + hash: TreeNodeHash, + parent: TreeNodeHash, + height: u32, + is_committed: bool, +} + +impl Node { + pub fn new(hash: TreeNodeHash, parent: TreeNodeHash, height: u32, is_committed: bool) -> Node { + Self { + hash, + parent, + height, + is_committed, + } + } + + pub fn hash(&self) -> &TreeNodeHash { + &self.hash + } + + pub fn parent(&self) -> &TreeNodeHash { + &self.parent + } + + pub fn height(&self) -> u32 { + self.height + } + + pub fn is_committed(&self) -> bool { + self.is_committed + } +} + +impl From for Node { + fn from(db_node: DbNode) -> Self { + Node::new(db_node.hash, db_node.parent, db_node.height, db_node.is_committed) + } +} diff --git a/dan_layer/core/src/models/quorum_certificate.rs b/dan_layer/core/src/models/quorum_certificate.rs index 42f9b1d1bf..6b3ab7d433 100644 --- a/dan_layer/core/src/models/quorum_certificate.rs +++ b/dan_layer/core/src/models/quorum_certificate.rs @@ -20,7 +20,10 @@ // WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE // USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -use crate::models::{HotStuffMessageType, Signature, TreeNodeHash, ViewId}; +use crate::{ + models::{HotStuffMessageType, Signature, TreeNodeHash, ViewId}, + storage::chain::DbQc, +}; #[derive(Debug, Clone)] pub struct QuorumCertificate { @@ -82,3 +85,14 @@ impl QuorumCertificate { self.message_type() == message_type && view_id == self.view_number() } } + +impl From for QuorumCertificate { + fn from(rec: DbQc) -> Self { + Self { + message_type: rec.message_type, + node_hash: rec.node_hash, + view_number: rec.view_number, + signature: rec.signature, + } + } +} diff --git a/dan_layer/core/src/models/block.rs b/dan_layer/core/src/models/sidechain_block.rs similarity index 75% rename from dan_layer/core/src/models/block.rs rename to dan_layer/core/src/models/sidechain_block.rs index da034a0911..8dca014b8c 100644 --- a/dan_layer/core/src/models/block.rs +++ b/dan_layer/core/src/models/sidechain_block.rs @@ -20,14 +20,28 @@ // WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE // USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -use crate::models::Instruction; +use crate::models::{InstructionSet, Node}; -pub struct Block { - instructions: Vec, +#[derive(Debug)] +pub struct SideChainBlock { + node: Node, + instructions: InstructionSet, } -impl Block { - pub fn new() -> Self { - Self { instructions: vec![] } +impl SideChainBlock { + pub fn new(node: Node, instructions: InstructionSet) -> Self { + Self { node, instructions } + } + + pub fn node(&self) -> &Node { + &self.node + } + + pub fn instructions(&self) -> &InstructionSet { + &self.instructions + } + + pub fn destruct(self) -> (Node, InstructionSet) { + (self.node, self.instructions) } } diff --git a/dan_layer/core/src/models/tree_node_hash.rs b/dan_layer/core/src/models/tree_node_hash.rs new file mode 100644 index 0000000000..461f9a0a3c --- /dev/null +++ b/dan_layer/core/src/models/tree_node_hash.rs @@ -0,0 +1,76 @@ +// Copyright 2022, The Tari Project +// +// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the +// following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following +// disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the +// following disclaimer in the documentation and/or other materials provided with the distribution. +// +// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote +// products derived from this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, +// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, +// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE +// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +use std::{ + convert::TryFrom, + fmt::{Display, Formatter}, +}; + +use tari_utilities::hex::{Hex, HexError}; + +use crate::fixed_hash::{FixedHash, FixedHashSizeError}; + +#[derive(Debug, Clone, Copy, PartialEq)] +pub struct TreeNodeHash(FixedHash); + +impl TreeNodeHash { + pub fn zero() -> Self { + Self(FixedHash::zero()) + } + + pub fn as_bytes(&self) -> &[u8] { + self.0.as_slice() + } +} + +impl> From for TreeNodeHash { + fn from(hash: T) -> Self { + Self(hash.into()) + } +} + +impl TryFrom> for TreeNodeHash { + type Error = FixedHashSizeError; + + fn try_from(value: Vec) -> Result { + let hash = FixedHash::try_from(value)?; + Ok(Self(hash)) + } +} + +impl Hex for TreeNodeHash { + fn from_hex(hex: &str) -> Result + where Self: Sized { + let hash = FixedHash::from_hex(hex)?; + Ok(Self(hash)) + } + + fn to_hex(&self) -> String { + self.0.to_hex() + } +} + +impl Display for TreeNodeHash { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.to_hex()) + } +} diff --git a/dan_layer/core/src/services/asset_processor.rs b/dan_layer/core/src/services/asset_processor.rs index 7e07ffb187..8e1f66b42d 100644 --- a/dan_layer/core/src/services/asset_processor.rs +++ b/dan_layer/core/src/services/asset_processor.rs @@ -20,6 +20,8 @@ // WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE // USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +use std::convert::TryInto; + use tari_core::transactions::transaction::TemplateParameter; use crate::{ @@ -147,7 +149,7 @@ impl TemplateFactory { asset_definition: &AssetDefinition, state_db: &mut TUnitOfWork, ) -> Result<(), DigitalAssetError> { - match TemplateId::from(template.template_id) { + match template.template_id.try_into()? { TemplateId::Tip002 => tip002_template::init(template, asset_definition, state_db)?, _ => unimplemented!(), } diff --git a/dan_layer/core/src/services/asset_proxy.rs b/dan_layer/core/src/services/asset_proxy.rs index f3ffe3c49e..f7b548c2d3 100644 --- a/dan_layer/core/src/services/asset_proxy.rs +++ b/dan_layer/core/src/services/asset_proxy.rs @@ -148,56 +148,63 @@ impl> ConcreteAsse Some(chk) => chk, }; - if let Some(committee) = last_checkpoint.get_side_chain_committee() { - match invoke_type { - InvokeType::InvokeReadMethod => { - let mut tasks = FuturesUnordered::new(); - for member in committee.iter().take(self.max_clients_to_ask) { - tasks.push(self.forward_invoke_read_to_node( - member, - asset_public_key, - template_id, - method.clone(), - args.clone(), - )); - } + let committee = last_checkpoint + .get_side_chain_committee() + .ok_or(DigitalAssetError::NoCommitteeForAsset)?; - for result in tasks.next().await { - match result { - Ok(data) => return Ok(data), - Err(err) => { - error!(target: LOG_TARGET, "Committee member responded with error:{}", err); - }, - } - } - }, - InvokeType::InvokeMethod => { - let mut tasks = FuturesUnordered::new(); - for member in committee.iter().take(self.max_clients_to_ask) { - tasks.push(self.forward_invoke_to_node( - member, - asset_public_key, - template_id, - method.clone(), - args.clone(), - )); + debug!( + target: LOG_TARGET, + "Found {} committee member(s): {}", + committee.len(), + committee.iter().map(ToString::to_string).collect::>().join(", ") + ); + + match invoke_type { + InvokeType::InvokeReadMethod => { + let mut tasks = FuturesUnordered::new(); + for member in committee.iter().take(self.max_clients_to_ask) { + tasks.push(self.forward_invoke_read_to_node( + member, + asset_public_key, + template_id, + method.clone(), + args.clone(), + )); + } + + for result in tasks.next().await { + match result { + Ok(data) => return Ok(data), + Err(err) => { + error!(target: LOG_TARGET, "Committee member responded with error:{}", err); + }, } + } + }, + InvokeType::InvokeMethod => { + let mut tasks = FuturesUnordered::new(); + for member in committee.iter().take(self.max_clients_to_ask) { + tasks.push(self.forward_invoke_to_node( + member, + asset_public_key, + template_id, + method.clone(), + args.clone(), + )); + } - for result in tasks.next().await { - match result { - Ok(data) => return Ok(data), - Err(err) => { - error!(target: LOG_TARGET, "Committee member responded with error:{}", err); - }, - } + for result in tasks.next().await { + match result { + Ok(data) => return Ok(data), + Err(err) => { + error!(target: LOG_TARGET, "Committee member responded with error:{}", err); + }, } - }, - }; + } + }, + }; - Err(DigitalAssetError::NoResponsesFromCommittee) - } else { - Err(DigitalAssetError::NoCommitteeForAsset) - } + Err(DigitalAssetError::NoResponsesFromCommittee) } } @@ -215,7 +222,6 @@ impl> AssetProxy // check if we are processing this asset if self.db_factory.get_state_db(asset_public_key)?.is_some() { let instruction = Instruction::new( - asset_public_key.clone(), template_id, method.clone(), args.clone(), diff --git a/dan_layer/core/src/services/mempool_service.rs b/dan_layer/core/src/services/mempool_service.rs index 49fca20167..ca88960f9f 100644 --- a/dan_layer/core/src/services/mempool_service.rs +++ b/dan_layer/core/src/services/mempool_service.rs @@ -25,7 +25,11 @@ use std::sync::Arc; use async_trait::async_trait; use tokio::sync::Mutex; -use crate::{digital_assets_error::DigitalAssetError, models::Instruction}; +use crate::{ + digital_assets_error::DigitalAssetError, + fixed_hash::FixedHash, + models::{Instruction, TreeNodeHash}, +}; #[async_trait] pub trait MempoolService: Sync + Send + 'static { @@ -33,17 +37,17 @@ pub trait MempoolService: Sync + Send + 'static { async fn read_block(&self, limit: usize) -> Result, DigitalAssetError>; async fn reserve_instruction_in_block( &mut self, - instruction_hash: &[u8], - block_hash: Vec, + instruction_hash: &FixedHash, + block_hash: TreeNodeHash, ) -> Result<(), DigitalAssetError>; - async fn remove_all_in_block(&mut self, block_hash: &[u8]) -> Result<(), DigitalAssetError>; - async fn release_reservations(&mut self, block_hash: &[u8]) -> Result<(), DigitalAssetError>; + async fn remove_all_in_block(&mut self, block_hash: &TreeNodeHash) -> Result<(), DigitalAssetError>; + async fn release_reservations(&mut self, block_hash: &TreeNodeHash) -> Result<(), DigitalAssetError>; async fn size(&self) -> usize; } #[derive(Default)] pub struct ConcreteMempoolService { - instructions: Vec<(Instruction, Option>)>, + instructions: Vec<(Instruction, Option)>, } #[async_trait] @@ -55,12 +59,12 @@ impl MempoolService for ConcreteMempoolService { async fn read_block(&self, limit: usize) -> Result, DigitalAssetError> { let mut result = vec![]; - for (i, instruction) in self.instructions.iter().enumerate() { + for (i, (instruction, block_hash)) in self.instructions.iter().enumerate() { if i > limit { break; } - if instruction.1.is_none() { - result.push(instruction.0.clone()); + if block_hash.is_none() { + result.push(instruction.clone()); } } Ok(result) @@ -68,12 +72,12 @@ impl MempoolService for ConcreteMempoolService { async fn reserve_instruction_in_block( &mut self, - instruction_hash: &[u8], - block_hash: Vec, + instruction_hash: &FixedHash, + node_hash: TreeNodeHash, ) -> Result<(), DigitalAssetError> { - for mut instruction in self.instructions.iter_mut() { - if instruction.0.hash() == instruction_hash { - instruction.1 = Some(block_hash); + for (instruction, node_hash_mut) in self.instructions.iter_mut() { + if instruction.hash() == instruction_hash { + *node_hash_mut = Some(node_hash); break; } } @@ -81,21 +85,19 @@ impl MempoolService for ConcreteMempoolService { Ok(()) } - async fn remove_all_in_block(&mut self, block_hash: &[u8]) -> Result<(), DigitalAssetError> { - let mut new_instructions = Vec::with_capacity(self.instructions.len()); - for instruction in self.instructions.drain(..) { - if instruction.1.as_deref() != Some(block_hash) { - new_instructions.push(instruction) - } - } - self.instructions = new_instructions; + async fn remove_all_in_block(&mut self, block_hash: &TreeNodeHash) -> Result<(), DigitalAssetError> { + self.instructions = self + .instructions + .drain(..) + .filter(|(_, node_hash)| node_hash.as_ref() != Some(block_hash)) + .collect(); Ok(()) } - async fn release_reservations(&mut self, block_hash: &[u8]) -> Result<(), DigitalAssetError> { - for mut instruction in self.instructions.iter_mut() { - if instruction.1.as_deref() == Some(block_hash) { - instruction.1 = None; + async fn release_reservations(&mut self, block_hash: &TreeNodeHash) -> Result<(), DigitalAssetError> { + for (_, block_hash_mut) in self.instructions.iter_mut() { + if block_hash_mut.as_ref() == Some(block_hash) { + *block_hash_mut = None; } } Ok(()) @@ -144,21 +146,21 @@ impl MempoolService for MempoolServiceHandle { async fn reserve_instruction_in_block( &mut self, - instruction_hash: &[u8], - block_hash: Vec, + instruction_hash: &FixedHash, + node_hash: TreeNodeHash, ) -> Result<(), DigitalAssetError> { self.mempool .lock() .await - .reserve_instruction_in_block(instruction_hash, block_hash) + .reserve_instruction_in_block(instruction_hash, node_hash) .await } - async fn remove_all_in_block(&mut self, block_hash: &[u8]) -> Result<(), DigitalAssetError> { + async fn remove_all_in_block(&mut self, block_hash: &TreeNodeHash) -> Result<(), DigitalAssetError> { self.mempool.lock().await.remove_all_in_block(block_hash).await } - async fn release_reservations(&mut self, block_hash: &[u8]) -> Result<(), DigitalAssetError> { + async fn release_reservations(&mut self, block_hash: &TreeNodeHash) -> Result<(), DigitalAssetError> { self.mempool.lock().await.release_reservations(block_hash).await } diff --git a/dan_layer/core/src/services/mocks/mod.rs b/dan_layer/core/src/services/mocks/mod.rs index b5989823bd..50097ba479 100644 --- a/dan_layer/core/src/services/mocks/mod.rs +++ b/dan_layer/core/src/services/mocks/mod.rs @@ -33,6 +33,7 @@ use tari_core::transactions::transaction::TemplateParameter; use super::CommitteeManager; use crate::{ digital_assets_error::DigitalAssetError, + fixed_hash::FixedHash, models::{ AssetDefinition, BaseLayerMetadata, @@ -43,11 +44,13 @@ use crate::{ Payload, Signature, StateRoot, + TemplateId, TreeNodeHash, }, services::{ base_node_client::BaseNodeClient, infrastructure_services::NodeAddressable, + AssetProcessor, EventsPublisher, MempoolService, PayloadProcessor, @@ -72,17 +75,17 @@ impl MempoolService for MockMempoolService { async fn reserve_instruction_in_block( &mut self, - _instruction_hash: &[u8], - _block_hash: Vec, + _instruction_hash: &FixedHash, + _block_hash: TreeNodeHash, ) -> Result<(), DigitalAssetError> { todo!() } - async fn remove_all_in_block(&mut self, _block_hash: &[u8]) -> Result<(), DigitalAssetError> { + async fn remove_all_in_block(&mut self, _block_hash: &TreeNodeHash) -> Result<(), DigitalAssetError> { todo!() } - async fn release_reservations(&mut self, _block_hash: &[u8]) -> Result<(), DigitalAssetError> { + async fn release_reservations(&mut self, _block_hash: &TreeNodeHash) -> Result<(), DigitalAssetError> { todo!() } @@ -264,3 +267,35 @@ impl PayloadProcessor for MockPayloadProcessor { todo!() } } + +#[derive(Debug, Clone)] +pub struct MockAssetProcessor; + +impl AssetProcessor for MockAssetProcessor { + fn init_template( + &self, + _template_parameter: &TemplateParameter, + _asset_definition: &AssetDefinition, + _state_db: &mut TUnitOfWork, + ) -> Result<(), DigitalAssetError> { + todo!() + } + + fn execute_instruction( + &self, + _instruction: &Instruction, + _db: &mut TUnitOfWork, + ) -> Result<(), DigitalAssetError> { + todo!() + } + + fn invoke_read_method( + &self, + _template_id: TemplateId, + _method: String, + _args: &[u8], + _state_db: &mut TUnifOfWork, + ) -> Result>, DigitalAssetError> { + todo!() + } +} diff --git a/dan_layer/core/src/services/mod.rs b/dan_layer/core/src/services/mod.rs index cd7271b593..57ef78dbb9 100644 --- a/dan_layer/core/src/services/mod.rs +++ b/dan_layer/core/src/services/mod.rs @@ -42,7 +42,6 @@ pub use signing_service::{NodeIdentitySigningService, SigningService}; mod asset_proxy; mod checkpoint_manager; -#[cfg(test)] pub mod mocks; mod service_specification; mod validator_node_rpc_client; diff --git a/dan_layer/core/src/services/payload_provider.rs b/dan_layer/core/src/services/payload_provider.rs index e821eacbb6..734e5cdcee 100644 --- a/dan_layer/core/src/services/payload_provider.rs +++ b/dan_layer/core/src/services/payload_provider.rs @@ -55,7 +55,7 @@ impl TariDanPayloadProvider { impl PayloadProvider for TariDanPayloadProvider { async fn create_payload(&self) -> Result { let instructions = self.mempool.read_block(100).await?; - let instruction_set = InstructionSet::from_slice(&instructions); + let instruction_set = InstructionSet::from_vec(instructions); Ok(TariDanPayload::new(instruction_set, None)) } @@ -76,13 +76,13 @@ impl PayloadProvider for TariDa // Reserve all instructions if they succeeded for instruction in payload.instructions() { self.mempool - .reserve_instruction_in_block(instruction.hash(), reservation_key.0.clone()) + .reserve_instruction_in_block(instruction.hash(), *reservation_key) .await?; } Ok(()) } async fn remove_payload(&mut self, reservation_key: &TreeNodeHash) -> Result<(), DigitalAssetError> { - self.mempool.remove_all_in_block(reservation_key.as_bytes()).await + self.mempool.remove_all_in_block(reservation_key).await } } diff --git a/dan_layer/core/src/services/validator_node_rpc_client.rs b/dan_layer/core/src/services/validator_node_rpc_client.rs index ad71b0be23..9392fcb730 100644 --- a/dan_layer/core/src/services/validator_node_rpc_client.rs +++ b/dan_layer/core/src/services/validator_node_rpc_client.rs @@ -23,7 +23,11 @@ use async_trait::async_trait; use tari_common_types::types::PublicKey; -use crate::{models::TemplateId, services::infrastructure_services::NodeAddressable, DigitalAssetError}; +use crate::{ + models::{SideChainBlock, TemplateId, TreeNodeHash}, + services::infrastructure_services::NodeAddressable, + DigitalAssetError, +}; pub trait ValidatorNodeClientFactory { type Addr: NodeAddressable; @@ -48,4 +52,11 @@ pub trait ValidatorNodeRpcClient { method: String, args: Vec, ) -> Result>, DigitalAssetError>; + + async fn get_sidechain_blocks( + &mut self, + asset_public_key: &PublicKey, + start_hash: TreeNodeHash, + end_hash: Option, + ) -> Result, DigitalAssetError>; } diff --git a/dan_layer/core/src/storage/chain/chain_db.rs b/dan_layer/core/src/storage/chain/chain_db.rs index 35354ca50e..866591974f 100644 --- a/dan_layer/core/src/storage/chain/chain_db.rs +++ b/dan_layer/core/src/storage/chain/chain_db.rs @@ -1,5 +1,3 @@ -// Copyright 2021. The Tari Project -// // Redistribution and use in source and binary forms, with or without modification, are permitted provided that the // following conditions are met: // @@ -21,7 +19,7 @@ // USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. use crate::{ - models::QuorumCertificate, + models::{QuorumCertificate, SideChainBlock, TreeNodeHash}, storage::{ chain::{chain_db_unit_of_work::ChainDbUnitOfWorkImpl, ChainDbBackendAdapter}, StorageError, @@ -46,6 +44,59 @@ impl ChainDb { pub fn get_locked_qc(&self) -> Result { self.adapter.get_locked_qc().map_err(TBackendAdapter::Error::into) } + + pub fn is_empty(&self) -> Result { + self.adapter.is_empty().map_err(TBackendAdapter::Error::into) + } + + pub fn sidechain_block_exists(&self, hash: &TreeNodeHash) -> Result { + self.adapter.node_exists(hash).map_err(TBackendAdapter::Error::into) + } + + pub fn find_sidechain_block_by_node_hash( + &self, + hash: &TreeNodeHash, + ) -> Result, StorageError> { + let maybe_block = self + .adapter + .find_node_by_hash(hash) + .map_err(TBackendAdapter::Error::into)?; + + let (block_id, node) = match maybe_block { + Some(v) => v, + None => return Ok(None), + }; + + let instructions = self + .adapter + .find_all_instructions_by_node(block_id) + .map_err(TBackendAdapter::Error::into)?; + let instructions = instructions.into_iter().map(|i| i.instruction).collect(); + + Ok(Some(SideChainBlock::new(node.into(), instructions))) + } + + pub fn find_sidechain_block_by_parent_node_hash( + &self, + parent_hash: &TreeNodeHash, + ) -> Result, StorageError> { + let maybe_block = self + .adapter + .find_node_by_parent_hash(parent_hash) + .map_err(TBackendAdapter::Error::into)?; + let (block_id, node) = match maybe_block { + Some(v) => v, + None => return Ok(None), + }; + + let instructions = self + .adapter + .find_all_instructions_by_node(block_id) + .map_err(TBackendAdapter::Error::into)?; + let instructions = instructions.into_iter().map(|i| i.instruction).collect(); + + Ok(Some(SideChainBlock::new(node.into(), instructions))) + } } impl ChainDb { @@ -53,8 +104,3 @@ impl ChainDb ChainDb { - pub fn is_empty(&self) -> Result { - self.adapter.is_empty().map_err(TBackendAdapter::Error::into) - } -} diff --git a/dan_layer/core/src/storage/chain/chain_db_backend_adapter.rs b/dan_layer/core/src/storage/chain/chain_db_backend_adapter.rs index 9cac5015e9..70e3846ddf 100644 --- a/dan_layer/core/src/storage/chain/chain_db_backend_adapter.rs +++ b/dan_layer/core/src/storage/chain/chain_db_backend_adapter.rs @@ -37,6 +37,7 @@ pub trait ChainDbBackendAdapter: Send + Sync + Clone { type Payload: Payload; fn is_empty(&self) -> Result; + fn node_exists(&self, node_hash: &TreeNodeHash) -> Result; fn create_transaction(&self) -> Result; fn insert_node(&self, item: &DbNode, transaction: &Self::BackendTransaction) -> Result<(), Self::Error>; fn update_node( @@ -56,7 +57,9 @@ pub trait ChainDbBackendAdapter: Send + Sync + Clone { fn find_highest_prepared_qc(&self) -> Result; fn get_locked_qc(&self) -> Result; fn get_prepare_qc(&self) -> Result, Self::Error>; - fn find_node_by_hash(&self, node_hash: &TreeNodeHash) -> Result<(Self::Id, DbNode), Self::Error>; + fn find_node_by_hash(&self, node_hash: &TreeNodeHash) -> Result, Self::Error>; + fn find_node_by_parent_hash(&self, parent_hash: &TreeNodeHash) -> Result, Self::Error>; + fn find_all_instructions_by_node(&self, node_id: Self::Id) -> Result, Self::Error>; fn update_prepare_qc(&self, item: &DbQc, transaction: &Self::BackendTransaction) -> Result<(), Self::Error>; fn update_locked_qc(&self, locked_qc: &DbQc, transaction: &Self::BackendTransaction) -> Result<(), Self::Error>; } diff --git a/dan_layer/core/src/storage/chain/chain_db_unit_of_work.rs b/dan_layer/core/src/storage/chain/chain_db_unit_of_work.rs index feaccef6fa..f7cbe41e83 100644 --- a/dan_layer/core/src/storage/chain/chain_db_unit_of_work.rs +++ b/dan_layer/core/src/storage/chain/chain_db_unit_of_work.rs @@ -169,7 +169,7 @@ impl ChainDbUnitOfWork for ChainDbUnitOf return Ok(QuorumCertificate::new( locked_qc.message_type, locked_qc.view_number, - locked_qc.node_hash.clone(), + locked_qc.node_hash, locked_qc.signature.clone(), )); } @@ -183,7 +183,7 @@ impl ChainDbUnitOfWork for ChainDbUnitOf DbQc { message_type: qc.message_type(), view_number: qc.view_number(), - node_hash: qc.node_hash().clone(), + node_hash: *qc.node_hash(), signature: qc.signature().cloned(), }, false, @@ -198,14 +198,14 @@ impl ChainDbUnitOfWork for ChainDbUnitOf let mut locked_qc = locked_qc.get_mut(); locked_qc.message_type = qc.message_type(); locked_qc.view_number = qc.view_number(); - locked_qc.node_hash = qc.node_hash().clone(); + locked_qc.node_hash = *qc.node_hash(); locked_qc.signature = qc.signature().cloned(); } else { inner.locked_qc = Some(UnitOfWorkTracker::new( DbQc { message_type: qc.message_type(), view_number: qc.view_number(), - node_hash: qc.node_hash().clone(), + node_hash: *qc.node_hash(), signature: qc.signature().cloned(), }, true, @@ -226,7 +226,7 @@ impl ChainDbUnitOfWork for ChainDbUnitOf return Ok(Some(QuorumCertificate::new( prepare_qc.message_type, prepare_qc.view_number, - prepare_qc.node_hash.clone(), + prepare_qc.node_hash, prepare_qc.signature.clone(), ))); } @@ -242,7 +242,7 @@ impl ChainDbUnitOfWork for ChainDbUnitOf DbQc { message_type: qc.message_type(), view_number: qc.view_number(), - node_hash: qc.node_hash().clone(), + node_hash: *qc.node_hash(), signature: qc.signature().cloned(), }, false, @@ -261,7 +261,7 @@ impl ChainDbUnitOfWork for ChainDbUnitOf DbQc { message_type: qc.message_type(), view_number: qc.view_number(), - node_hash: qc.node_hash().clone(), + node_hash: *qc.node_hash(), signature: qc.signature().cloned(), }, true, @@ -271,7 +271,7 @@ impl ChainDbUnitOfWork for ChainDbUnitOf let mut db_qc = db_qc.get_mut(); db_qc.message_type = qc.message_type(); db_qc.view_number = qc.view_number(); - db_qc.node_hash = qc.node_hash().clone(); + db_qc.node_hash = *qc.node_hash(); db_qc.signature = qc.signature().cloned(); }, } @@ -325,7 +325,8 @@ impl ChainDbUnitOfWorkInner Result>, StorageError>; + fn get_or_create_chain_db( &self, asset_public_key: &PublicKey, ) -> Result, StorageError>; + fn get_state_db( &self, asset_public_key: &PublicKey, ) -> Result>, StorageError>; + fn get_or_create_state_db( &self, asset_public_key: &PublicKey, diff --git a/dan_layer/core/src/storage/error.rs b/dan_layer/core/src/storage/error.rs index 1f7fa8003a..dd6dc11f2f 100644 --- a/dan_layer/core/src/storage/error.rs +++ b/dan_layer/core/src/storage/error.rs @@ -50,4 +50,6 @@ pub enum StorageError { FileSystemPathDoesNotExist, #[error("Merkle error:{0}")] MerkleMountainRangeError(#[from] MerkleMountainRangeError), + #[error("General storage error: {details}")] + General { details: String }, } diff --git a/dan_layer/core/src/storage/mocks/chain_db.rs b/dan_layer/core/src/storage/mocks/chain_db.rs new file mode 100644 index 0000000000..f2de215bb9 --- /dev/null +++ b/dan_layer/core/src/storage/mocks/chain_db.rs @@ -0,0 +1,183 @@ +// Copyright 2022, The Tari Project +// +// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the +// following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following +// disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the +// following disclaimer in the documentation and/or other materials provided with the distribution. +// +// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote +// products derived from this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, +// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, +// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE +// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +use std::sync::{Arc, RwLock}; + +use super::MemoryChainDb; +use crate::{ + models::{QuorumCertificate, TreeNodeHash}, + storage::{ + chain::{ChainDbBackendAdapter, DbInstruction, DbNode, DbQc}, + StorageError, + }, +}; + +#[derive(Debug, Clone, Default)] +pub struct MockChainDbBackupAdapter { + db: Arc>, +} + +impl MockChainDbBackupAdapter { + pub fn new() -> Self { + Self { db: Default::default() } + } +} + +impl ChainDbBackendAdapter for MockChainDbBackupAdapter { + type BackendTransaction = (); + type Error = StorageError; + type Id = usize; + type Payload = String; + + fn is_empty(&self) -> Result { + let lock = self.db.read().unwrap(); + Ok(lock.nodes.is_empty()) + } + + fn node_exists(&self, node_hash: &TreeNodeHash) -> Result { + let lock = self.db.read().unwrap(); + let exists = lock.nodes.rows().any(|rec| rec.hash == *node_hash); + Ok(exists) + } + + fn create_transaction(&self) -> Result { + Ok(()) + } + + fn insert_node(&self, node: &DbNode, _: &Self::BackendTransaction) -> Result<(), Self::Error> { + let mut lock = self.db.write().unwrap(); + lock.nodes.insert(node.clone()); + Ok(()) + } + + fn update_node(&self, id: &Self::Id, item: &DbNode, _: &Self::BackendTransaction) -> Result<(), Self::Error> { + let mut lock = self.db.write().unwrap(); + if lock.nodes.update(*id, item.clone()) { + Ok(()) + } else { + Err(StorageError::NotFound) + } + } + + fn insert_instruction(&self, item: &DbInstruction, _: &Self::BackendTransaction) -> Result<(), Self::Error> { + let mut lock = self.db.write().unwrap(); + lock.instructions.insert(item.clone()); + Ok(()) + } + + fn commit(&self, _: &Self::BackendTransaction) -> Result<(), Self::Error> { + Ok(()) + } + + fn locked_qc_id(&self) -> Self::Id { + 1 + } + + fn prepare_qc_id(&self) -> Self::Id { + 1 + } + + fn find_highest_prepared_qc(&self) -> Result { + let lock = self.db.read().unwrap(); + let highest = lock + .prepare_qc + .rows() + .fold(None, |found: Option<&DbQc>, rec| match found { + Some(r) if rec.view_number > r.view_number => Some(rec), + Some(r) => Some(r), + None => Some(rec), + }) + .ok_or(StorageError::NotFound)?; + + Ok(highest.clone().into()) + } + + fn get_locked_qc(&self) -> Result { + let lock = self.db.read().unwrap(); + // FIXME: when this implementation is finalized in sqlite/lmdb impl + let rec = lock.locked_qc.rows().next().cloned().map(Into::into).unwrap(); + Ok(rec) + } + + fn get_prepare_qc(&self) -> Result, Self::Error> { + let lock = self.db.read().unwrap(); + // FIXME: when this implementation is finalized in sqlite/lmdb impl + let rec = lock.prepare_qc.rows().next().cloned().map(Into::into); + Ok(rec) + } + + fn find_node_by_hash(&self, node_hash: &TreeNodeHash) -> Result, Self::Error> { + let lock = self.db.read().unwrap(); + let recs = lock + .nodes + .records() + .find(|(_, rec)| rec.hash == *node_hash) + .map(|(id, node)| (id, node.clone())); + Ok(recs) + } + + fn find_node_by_parent_hash(&self, parent_hash: &TreeNodeHash) -> Result, Self::Error> { + let lock = self.db.read().unwrap(); + let rec = lock + .nodes + .records() + .find(|(_, rec)| rec.parent == *parent_hash) + .map(|(id, node)| (id, node.clone())); + Ok(rec) + } + + fn find_all_instructions_by_node(&self, node_id: Self::Id) -> Result, Self::Error> { + let lock = self.db.read().unwrap(); + let node = lock.nodes.get(node_id).ok_or(StorageError::NotFound)?; + let recs = lock + .instructions + .rows() + .filter(|rec| rec.node_hash == node.hash) + .cloned() + .collect(); + Ok(recs) + } + + fn update_prepare_qc(&self, item: &DbQc, _transaction: &Self::BackendTransaction) -> Result<(), Self::Error> { + let mut lock = self.db.write().unwrap(); + let id = lock + .prepare_qc + .records() + .next() + .map(|(id, _)| id) + .ok_or(StorageError::NotFound)?; + lock.prepare_qc.update(id, item.clone()); + Ok(()) + } + + fn update_locked_qc(&self, locked_qc: &DbQc, _transaction: &Self::BackendTransaction) -> Result<(), Self::Error> { + let mut lock = self.db.write().unwrap(); + let id = lock + .locked_qc + .records() + .next() + .map(|(id, _)| id) + .ok_or(StorageError::NotFound)?; + lock.locked_qc.update(id, locked_qc.clone()); + Ok(()) + } +} diff --git a/dan_layer/core/src/storage/mocks/mod.rs b/dan_layer/core/src/storage/mocks/mod.rs new file mode 100644 index 0000000000..d8c3d6bf00 --- /dev/null +++ b/dan_layer/core/src/storage/mocks/mod.rs @@ -0,0 +1,167 @@ +// Copyright 2022, The Tari Project +// +// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the +// following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following +// disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the +// following disclaimer in the documentation and/or other materials provided with the distribution. +// +// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote +// products derived from this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, +// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, +// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE +// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +mod chain_db; +mod state_db; + +use std::{ + collections::HashMap, + sync::{Arc, RwLock}, +}; + +use tari_common_types::types::PublicKey; + +use crate::storage::{ + chain::{ChainDb, DbInstruction, DbNode, DbQc}, + mocks::{chain_db::MockChainDbBackupAdapter, state_db::MockStateDbBackupAdapter}, + state::StateDb, + DbFactory, + StorageError, +}; + +#[derive(Clone, Default)] +pub struct MockDbFactory { + chain_db: Arc>>, + state_db: Arc>>, +} + +impl DbFactory for MockDbFactory { + type ChainDbBackendAdapter = MockChainDbBackupAdapter; + type StateDbBackendAdapter = MockStateDbBackupAdapter; + + fn get_chain_db( + &self, + asset_public_key: &PublicKey, + ) -> Result>, StorageError> { + Ok(self + .chain_db + .read() + .unwrap() + .get(asset_public_key) + .cloned() + .map(ChainDb::new)) + } + + fn get_or_create_chain_db( + &self, + asset_public_key: &PublicKey, + ) -> Result, StorageError> { + let entry = self + .chain_db + .write() + .unwrap() + .entry(asset_public_key.clone()) + .or_default() + .clone(); + Ok(ChainDb::new(entry)) + } + + fn get_state_db( + &self, + asset_public_key: &PublicKey, + ) -> Result>, StorageError> { + Ok(self + .state_db + .read() + .unwrap() + .get(asset_public_key) + .cloned() + .map(StateDb::new)) + } + + fn get_or_create_state_db( + &self, + asset_public_key: &PublicKey, + ) -> Result, StorageError> { + let entry = self + .state_db + .write() + .unwrap() + .entry(asset_public_key.clone()) + .or_default() + .clone(); + Ok(StateDb::new(entry)) + } +} + +#[derive(Debug, Default)] +pub(self) struct MemoryChainDb { + pub nodes: MemoryDbTable, + pub instructions: MemoryDbTable, + pub prepare_qc: MemoryDbTable, + pub locked_qc: MemoryDbTable, +} + +#[derive(Debug)] +struct MemoryDbTable { + records: HashMap, + id_counter: usize, +} + +// We don't need/want the V: Default bound +impl Default for MemoryDbTable { + fn default() -> Self { + Self { + records: Default::default(), + id_counter: 0, + } + } +} + +impl MemoryDbTable { + pub fn next_id(&mut self) -> usize { + let id = self.id_counter; + self.id_counter = self.id_counter.wrapping_add(1); + id + } + + pub fn records(&self) -> impl Iterator { + self.records.iter().map(|(k, v)| (*k, v)) + } + + pub fn rows(&self) -> impl Iterator { + self.records.values() + } + + pub fn is_empty(&self) -> bool { + self.records.is_empty() + } + + pub fn get(&self, id: usize) -> Option<&V> { + self.records.get(&id) + } + + pub fn insert(&mut self, v: V) { + let id = self.next_id(); + self.records.insert(id, v); + } + + pub fn update(&mut self, id: usize, v: V) -> bool { + match self.records.get_mut(&id) { + Some(rec) => { + *rec = v; + true + }, + None => false, + } + } +} diff --git a/dan_layer/core/src/storage/mocks/state_db.rs b/dan_layer/core/src/storage/mocks/state_db.rs new file mode 100644 index 0000000000..d4e272e4cf --- /dev/null +++ b/dan_layer/core/src/storage/mocks/state_db.rs @@ -0,0 +1,86 @@ +// Copyright 2022, The Tari Project +// +// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the +// following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following +// disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the +// following disclaimer in the documentation and/or other materials provided with the distribution. +// +// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote +// products derived from this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, +// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, +// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE +// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +use patricia_tree::PatriciaMap; + +use crate::storage::{ + state::{DbKeyValue, StateDbBackendAdapter}, + StorageError, +}; + +#[derive(Debug, Clone, Default)] +pub struct MockStateDbBackupAdapter; + +impl StateDbBackendAdapter for MockStateDbBackupAdapter { + type BackendTransaction = (); + type Error = StorageError; + + fn create_transaction(&self) -> Result { + todo!() + } + + fn update_key_value( + &self, + _schema: &str, + _key: &[u8], + _value: &[u8], + _tx: &Self::BackendTransaction, + ) -> Result<(), Self::Error> { + todo!() + } + + fn get(&self, _schema: &str, _key: &[u8]) -> Result>, Self::Error> { + todo!() + } + + fn find_keys_by_value(&self, _schema: &str, _value: &[u8]) -> Result>, Self::Error> { + todo!() + } + + fn commit(&self, _tx: &Self::BackendTransaction) -> Result<(), Self::Error> { + todo!() + } + + fn get_current_state_tree(&self, _tx: &Self::BackendTransaction) -> Result>, Self::Error> { + todo!() + } + + fn set_current_state_tree( + &self, + _tree: PatriciaMap>, + _tx: &Self::BackendTransaction, + ) -> Result<(), Self::Error> { + todo!() + } + + fn get_all_schemas(&self, _tx: &Self::BackendTransaction) -> Result, Self::Error> { + todo!() + } + + fn get_all_values_for_schema( + &self, + _schema: &str, + _tx: &Self::BackendTransaction, + ) -> Result, Self::Error> { + todo!() + } +} diff --git a/dan_layer/core/src/storage/mod.rs b/dan_layer/core/src/storage/mod.rs index 831ea8789b..b65098ba37 100644 --- a/dan_layer/core/src/storage/mod.rs +++ b/dan_layer/core/src/storage/mod.rs @@ -35,3 +35,5 @@ mod unit_of_work_tracker; pub use db_factory::DbFactory; pub use unit_of_work_tracker::UnitOfWorkTracker; + +pub mod mocks; diff --git a/dan_layer/core/src/workers/states/commit_state.rs b/dan_layer/core/src/workers/states/commit_state.rs index b4b6586898..373cc98883 100644 --- a/dan_layer/core/src/workers/states/commit_state.rs +++ b/dan_layer/core/src/workers/states/commit_state.rs @@ -194,7 +194,7 @@ where if &n != m_node { unimplemented!("Nodes did not match"); } - Some(m_node.clone()) + Some(*m_node) } else { Some(n) } @@ -242,7 +242,7 @@ where unit_of_work.set_locked_qc(justify)?; self.send_vote_to_leader( - justify.node_hash().clone(), + *justify.node_hash(), outbound, view_leader, current_view.view_id, diff --git a/dan_layer/core/src/workers/states/decide_state.rs b/dan_layer/core/src/workers/states/decide_state.rs index 2c14530ef0..eaca8e9e54 100644 --- a/dan_layer/core/src/workers/states/decide_state.rs +++ b/dan_layer/core/src/workers/states/decide_state.rs @@ -189,7 +189,7 @@ where if &n != m_node { unimplemented!("Nodes did not match"); } - Some(m_node.clone()) + Some(*m_node) } else { Some(n) } @@ -236,7 +236,7 @@ where payload_provider.remove_payload(justify.node_hash()).await?; unit_of_work.commit_node(justify.node_hash())?; - info!(target: LOG_TARGET, "Committed node: {}", justify.node_hash().0.to_hex()); + info!(target: LOG_TARGET, "Committed node: {}", justify.node_hash().to_hex()); Ok(Some(ConsensusWorkerStateEvent::Decided)) } else { warn!(target: LOG_TARGET, "received non justify message"); diff --git a/dan_layer/core/src/workers/states/pre_commit_state.rs b/dan_layer/core/src/workers/states/pre_commit_state.rs index 15a32cc2f9..60ffa1ad28 100644 --- a/dan_layer/core/src/workers/states/pre_commit_state.rs +++ b/dan_layer/core/src/workers/states/pre_commit_state.rs @@ -201,7 +201,7 @@ where if &n != m_node { unimplemented!("Nodes did not match"); } - Some(m_node.clone()) + Some(*m_node) } else { Some(n) } @@ -252,7 +252,7 @@ where unit_of_work.set_prepare_qc(justify)?; self.send_vote_to_leader( - justify.node_hash().clone(), + *justify.node_hash(), outbound, view_leader, current_view.view_id, @@ -274,7 +274,7 @@ where view_number: ViewId, signing_service: &TSigningService, ) -> Result<(), DigitalAssetError> { - let mut message = HotStuffMessage::vote_pre_commit(node.clone(), view_number, self.asset_public_key.clone()); + let mut message = HotStuffMessage::vote_pre_commit(node, view_number, self.asset_public_key.clone()); message.add_partial_sig(signing_service.sign(&self.node_id, &message.create_signature_challenge())?); outbound.send(self.node_id.clone(), view_leader.clone(), message).await } diff --git a/dan_layer/core/src/workers/states/prepare.rs b/dan_layer/core/src/workers/states/prepare.rs index 61682c0f57..4fc82dd2e3 100644 --- a/dan_layer/core/src/workers/states/prepare.rs +++ b/dan_layer/core/src/workers/states/prepare.rs @@ -223,7 +223,7 @@ where .new_unit_of_work(); let proposal = self .create_proposal( - high_qc.node_hash().clone(), + *high_qc.node_hash(), payload_provider, payload_processor, 0, @@ -292,7 +292,7 @@ where payload_provider.reserve_payload(node.payload(), node.hash()).await?; self.send_vote_to_leader( - node.hash().clone(), + *node.hash(), outbound, view_leader, current_view.view_id, diff --git a/dan_layer/core/src/workers/states/starting.rs b/dan_layer/core/src/workers/states/starting.rs index 52ca48d477..915df3d8fd 100644 --- a/dan_layer/core/src/workers/states/starting.rs +++ b/dan_layer/core/src/workers/states/starting.rs @@ -23,6 +23,7 @@ use std::marker::PhantomData; use log::*; +use tari_utilities::hex::Hex; use crate::{ digital_assets_error::DigitalAssetError, @@ -97,9 +98,18 @@ where TBaseNodeClient: BaseNodeClient committee_manager.read_from_checkpoint(last_checkpoint)?; if !committee_manager.current_committee()?.contains(node_id) { + info!( + target: LOG_TARGET, + "Validator node not part of committee for asset public key '{}'", + asset_definition.public_key.to_hex() + ); return Ok(ConsensusWorkerStateEvent::NotPartOfCommittee); } - + info!( + target: LOG_TARGET, + "Validator node is a committee member for asset public key '{}'", + asset_definition.public_key.to_hex() + ); // read and create the genesis block info!(target: LOG_TARGET, "Creating DB"); let chain_db = db_factory.get_or_create_chain_db(&asset_definition.public_key)?; @@ -132,7 +142,7 @@ where TBaseNodeClient: BaseNodeClient } info!(target: LOG_TARGET, "Saving genesis node"); let node = HotStuffTreeNode::genesis(payload_provider.create_genesis_payload()); - let genesis_qc = QuorumCertificate::genesis(node.hash().clone()); + let genesis_qc = QuorumCertificate::genesis(*node.hash()); chain_storage_service.add_node(&node, tx.clone()).await?; tx.commit_node(node.hash())?; debug!(target: LOG_TARGET, "Setting locked QC"); diff --git a/dan_layer/storage_sqlite/src/error.rs b/dan_layer/storage_sqlite/src/error.rs index f4d58bb013..4bc6f3d904 100644 --- a/dan_layer/storage_sqlite/src/error.rs +++ b/dan_layer/storage_sqlite/src/error.rs @@ -20,7 +20,7 @@ // WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE // USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. use diesel; -use tari_dan_core::storage::StorageError; +use tari_dan_core::{fixed_hash::FixedHashSizeError, models::ModelError, storage::StorageError}; use thiserror::Error; #[derive(Debug, Error)] @@ -42,6 +42,10 @@ pub enum SqliteStorageError { }, #[error("Error decoding bytes:{0}")] DecodeError(#[from] bytecodec::Error), + #[error("Encountered malformed hash data")] + MalformedHashData, + #[error(transparent)] + ModelError(#[from] ModelError), } impl From for StorageError { @@ -57,6 +61,15 @@ impl From for StorageError { reason: source.to_string(), }, SqliteStorageError::DecodeError(e) => StorageError::DecodeError(e), + other => StorageError::General { + details: other.to_string(), + }, } } } + +impl From for SqliteStorageError { + fn from(_: FixedHashSizeError) -> Self { + SqliteStorageError::MalformedHashData + } +} diff --git a/dan_layer/storage_sqlite/src/models/instruction.rs b/dan_layer/storage_sqlite/src/models/instruction.rs index 683c4a39d6..8e6c6a1c2f 100644 --- a/dan_layer/storage_sqlite/src/models/instruction.rs +++ b/dan_layer/storage_sqlite/src/models/instruction.rs @@ -1,10 +1,10 @@ // Copyright 2021. The Tari Project // -// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the -// following conditions are met: +// Redistribution and use in source and binary forms, with or without modification, are permitted provided that +// the following conditions are met: // -// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following -// disclaimer. +// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the +// following disclaimer. // // 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the // following disclaimer in the documentation and/or other materials provided with the distribution. @@ -12,26 +12,39 @@ // 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote // products derived from this software without specific prior written permission. // -// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, -// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE -// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, -// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR -// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, -// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE -// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -use crate::schema::*; +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED +// WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A +// PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY +// DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, +// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +// CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR +// OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH +// DAMAGE. -#[derive(Queryable)] +use std::convert::{TryFrom, TryInto}; + +use crate::{error::SqliteStorageError, schema::*}; + +#[derive(Debug, Identifiable, Queryable)] pub struct Instruction { pub id: i32, pub hash: Vec, - pub node_hash: Vec, + pub node_id: i32, pub template_id: i32, pub method: String, pub args: Vec, } -#[derive(Insertable)] +impl TryFrom for tari_dan_core::models::Instruction { + type Error = SqliteStorageError; + + fn try_from(instruction: Instruction) -> Result { + let template_id = instruction.template_id.try_into()?; + Ok(Self::new(template_id, instruction.method, instruction.args)) + } +} + +#[derive(Debug, Insertable)] #[table_name = "instructions"] pub struct NewInstruction { pub hash: Vec, diff --git a/dan_layer/storage_sqlite/src/models/node.rs b/dan_layer/storage_sqlite/src/models/node.rs index 571d78c072..9c665ba800 100644 --- a/dan_layer/storage_sqlite/src/models/node.rs +++ b/dan_layer/storage_sqlite/src/models/node.rs @@ -22,7 +22,7 @@ use crate::schema::*; -#[derive(Queryable)] +#[derive(Identifiable, Queryable)] pub struct Node { pub id: i32, pub hash: Vec, diff --git a/dan_layer/storage_sqlite/src/sqlite_chain_backend_adapter.rs b/dan_layer/storage_sqlite/src/sqlite_chain_backend_adapter.rs index b4f56431d4..ac57779dd2 100644 --- a/dan_layer/storage_sqlite/src/sqlite_chain_backend_adapter.rs +++ b/dan_layer/storage_sqlite/src/sqlite_chain_backend_adapter.rs @@ -20,7 +20,7 @@ // WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE // USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -use std::convert::TryFrom; +use std::convert::{TryFrom, TryInto}; use diesel::{prelude::*, Connection, SqliteConnection}; use log::*; @@ -32,12 +32,12 @@ use tari_dan_core::{ use crate::{ error::SqliteStorageError, models::{ - instruction::NewInstruction, + instruction::{Instruction, NewInstruction}, locked_qc::LockedQc, node::{NewNode, Node}, prepare_qc::PrepareQc, }, - schema::{locked_qc::dsl, *}, + schema::*, SqliteTransaction, }; @@ -73,6 +73,22 @@ impl ChainDbBackendAdapter for SqliteChainBackendAdapter { Ok(n.is_none()) } + fn node_exists(&self, node_hash: &TreeNodeHash) -> Result { + let connection = SqliteConnection::establish(self.database_url.as_str())?; + use crate::schema::nodes::dsl; + let count = dsl::nodes + .filter(nodes::parent.eq(node_hash.as_bytes())) + .limit(1) + .count() + .first::(&connection) + .map_err(|source| SqliteStorageError::DieselError { + source, + operation: "node_exists: count".to_string(), + })?; + + Ok(count > 0) + } + fn create_transaction(&self) -> Result { let connection = SqliteConnection::establish(self.database_url.as_str())?; connection @@ -132,6 +148,7 @@ impl ChainDbBackendAdapter for SqliteChainBackendAdapter { } fn update_locked_qc(&self, item: &DbQc, transaction: &Self::BackendTransaction) -> Result<(), Self::Error> { + use crate::schema::locked_qc::dsl; let message_type = item.message_type.as_u8() as i32; let existing: Result = dsl::locked_qc.find(1).first(transaction.connection()); match existing { @@ -217,14 +234,15 @@ impl ChainDbBackendAdapter for SqliteChainBackendAdapter { source, operation: "get_prepare_qc".to_string(), })?; - Ok(qc.map(|qc| { - QuorumCertificate::new( + qc.map(|qc| { + Ok(QuorumCertificate::new( HotStuffMessageType::try_from(qc.message_type as u8).unwrap(), ViewId::from(qc.view_number as u64), - TreeNodeHash(qc.node_hash.clone()), + qc.node_hash.try_into()?, qc.signature.map(|s| Signature::from_bytes(s.as_slice())), - ) - })) + )) + }) + .transpose() } fn commit(&self, transaction: &Self::BackendTransaction) -> Result<(), Self::Error> { @@ -248,7 +266,7 @@ impl ChainDbBackendAdapter for SqliteChainBackendAdapter { } fn find_highest_prepared_qc(&self) -> Result { - use crate::schema::*; + use crate::schema::locked_qc::dsl; let connection = SqliteConnection::establish(self.database_url.as_str())?; // TODO: this should be a single row let result: Option = prepare_qc::table @@ -282,12 +300,13 @@ impl ChainDbBackendAdapter for SqliteChainBackendAdapter { Ok(QuorumCertificate::new( HotStuffMessageType::try_from(qc.message_type as u8).unwrap(), ViewId::from(qc.view_number as u64), - TreeNodeHash(qc.node_hash.clone()), + qc.node_hash.try_into()?, qc.signature.map(|s| Signature::from_bytes(s.as_slice())), )) } fn get_locked_qc(&self) -> Result { + use crate::schema::locked_qc::dsl; let connection = SqliteConnection::establish(self.database_url.as_str())?; let qc: LockedQc = dsl::locked_qc .find(self.locked_qc_id()) @@ -299,27 +318,55 @@ impl ChainDbBackendAdapter for SqliteChainBackendAdapter { Ok(QuorumCertificate::new( HotStuffMessageType::try_from(qc.message_type as u8).unwrap(), ViewId::from(qc.view_number as u64), - TreeNodeHash(qc.node_hash.clone()), + qc.node_hash.try_into()?, qc.signature.map(|s| Signature::from_bytes(s.as_slice())), )) } - fn find_node_by_hash(&self, node_hash: &TreeNodeHash) -> Result<(Self::Id, DbNode), Self::Error> { + fn find_node_by_hash(&self, parent_hash: &TreeNodeHash) -> Result, Self::Error> { use crate::schema::nodes::dsl; let connection = SqliteConnection::establish(self.database_url.as_str())?; - let node: Node = dsl::nodes - .filter(nodes::hash.eq(&node_hash.0)) - .first(&connection) + let node = dsl::nodes + .filter(nodes::parent.eq(parent_hash.as_bytes())) + .first::(&connection) + .optional() .map_err(|source| SqliteStorageError::DieselError { source, operation: "find_node_by_hash".to_string(), })?; - Ok((node.id, DbNode { - hash: TreeNodeHash(node.hash), - parent: TreeNodeHash(node.parent), - height: node.height as u32, - is_committed: node.is_committed, - })) + + match node { + Some(node) => Ok(Some((node.id, DbNode { + hash: node.hash.try_into()?, + parent: node.parent.try_into()?, + height: node.height as u32, + is_committed: node.is_committed, + }))), + None => Ok(None), + } + } + + fn find_node_by_parent_hash(&self, node_hash: &TreeNodeHash) -> Result, Self::Error> { + use crate::schema::nodes::dsl; + let connection = SqliteConnection::establish(self.database_url.as_str())?; + let node = dsl::nodes + .filter(nodes::parent.eq(node_hash.as_bytes())) + .first::(&connection) + .optional() + .map_err(|source| SqliteStorageError::DieselError { + source, + operation: "find_node_by_hash".to_string(), + })?; + + match node { + Some(node) => Ok(Some((node.id, DbNode { + hash: node.hash.try_into()?, + parent: node.parent.try_into()?, + height: node.height as u32, + is_committed: node.is_committed, + }))), + None => Ok(None), + } } fn insert_instruction( @@ -330,14 +377,14 @@ impl ChainDbBackendAdapter for SqliteChainBackendAdapter { use crate::schema::nodes::dsl; // TODO: this could be made more efficient let node: Node = dsl::nodes - .filter(nodes::hash.eq(&item.node_hash.0)) + .filter(nodes::hash.eq(&item.node_hash.as_bytes())) .first(transaction.connection()) .map_err(|source| SqliteStorageError::DieselError { source, operation: "insert_instruction::find_node".to_string(), })?; let new_instruction = NewInstruction { - hash: Vec::from(item.instruction.hash()), + hash: item.instruction.hash().to_vec(), node_id: node.id, template_id: item.instruction.template_id() as i32, method: item.instruction.method().to_string(), @@ -352,4 +399,35 @@ impl ChainDbBackendAdapter for SqliteChainBackendAdapter { })?; Ok(()) } + + fn find_all_instructions_by_node(&self, node_id: Self::Id) -> Result, Self::Error> { + use crate::schema::{instructions::dsl as instructions_dsl, nodes::dsl as nodes_dsl}; + let connection = SqliteConnection::establish(self.database_url.as_str())?; + let node = nodes_dsl::nodes + .filter(nodes::id.eq(node_id)) + .first::(&connection) + .map_err(|source| SqliteStorageError::DieselError { + source, + operation: "find_all_instructions_by_node::find_node".to_string(), + })?; + let instructions = instructions_dsl::instructions + .filter(instructions::node_id.eq(&node.id)) + .load::(&connection) + .map_err(|source| SqliteStorageError::DieselError { + source, + operation: "find_all_instructions_by_node::filter_by_node_id".to_string(), + })?; + let node_hash = node.hash.try_into()?; + let instructions = instructions + .into_iter() + .map(|i| { + Ok(DbInstruction { + instruction: i.try_into()?, + node_hash, + }) + }) + .collect::>()?; + + Ok(instructions) + } } diff --git a/dan_layer/storage_sqlite/src/sqlite_db_factory.rs b/dan_layer/storage_sqlite/src/sqlite_db_factory.rs index 7fd0da6b40..2d2aa029ea 100644 --- a/dan_layer/storage_sqlite/src/sqlite_db_factory.rs +++ b/dan_layer/storage_sqlite/src/sqlite_db_factory.rs @@ -24,6 +24,7 @@ use std::{fs::create_dir_all, path::PathBuf}; use diesel::{Connection, ConnectionError, SqliteConnection}; use diesel_migrations::embed_migrations; +use log::*; use tari_common::GlobalConfig; use tari_common_types::types::PublicKey; use tari_dan_core::storage::{chain::ChainDb, state::StateDb, DbFactory, StorageError}; @@ -64,17 +65,45 @@ impl SqliteDbFactory { .into_string() .expect("Should not fail") } + + fn try_connect(&self, url: &str) -> Result, StorageError> { + match SqliteConnection::establish(url) { + Ok(connection) => { + connection + .execute("PRAGMA foreign_keys = ON;") + .map_err(|source| SqliteStorageError::DieselError { + source, + operation: "set pragma".to_string(), + })?; + Ok(Some(connection)) + }, + Err(ConnectionError::BadConnection(_)) => Ok(None), + Err(err) => Err(SqliteStorageError::from(err).into()), + } + } } impl DbFactory for SqliteDbFactory { type ChainDbBackendAdapter = SqliteChainBackendAdapter; type StateDbBackendAdapter = SqliteStateDbBackendAdapter; + fn get_chain_db( + &self, + asset_public_key: &PublicKey, + ) -> Result>, StorageError> { + let database_url = self.database_url_for(asset_public_key); + match self.try_connect(&database_url)? { + Some(_) => Ok(Some(ChainDb::new(SqliteChainBackendAdapter::new(database_url)))), + None => Ok(None), + } + } + fn get_or_create_chain_db( &self, asset_public_key: &PublicKey, ) -> Result, StorageError> { let database_url = self.database_url_for(asset_public_key); + debug!("Loading chain database from {}", database_url); create_dir_all(&PathBuf::from(&database_url).parent().unwrap()) .map_err(|_| StorageError::FileSystemPathDoesNotExist)?; let connection = SqliteConnection::establish(database_url.as_str()).map_err(SqliteStorageError::from)?; @@ -93,23 +122,10 @@ impl DbFactory for SqliteDbFactory { &self, asset_public_key: &PublicKey, ) -> Result>, StorageError> { - // let database_url = self.database_url_for(asset_public_key); - - match SqliteConnection::establish(database_url.as_str()) { - Ok(connection) => { - connection - .execute("PRAGMA foreign_keys = ON;") - .map_err(|source| SqliteStorageError::DieselError { - source, - operation: "set pragma".to_string(), - })?; - Ok(Some(StateDb::new(SqliteStateDbBackendAdapter::new(database_url)))) - }, - Err(err) => match err { - ConnectionError::BadConnection(_) => Ok(None), - _ => Err(SqliteStorageError::from(err).into()), - }, + match self.try_connect(&database_url)? { + Some(_) => Ok(Some(StateDb::new(SqliteStateDbBackendAdapter::new(database_url)))), + None => Ok(None), } } @@ -118,6 +134,7 @@ impl DbFactory for SqliteDbFactory { asset_public_key: &PublicKey, ) -> Result, StorageError> { let database_url = self.database_url_for(asset_public_key); + create_dir_all(&PathBuf::from(&database_url).parent().unwrap()) .map_err(|_| StorageError::FileSystemPathDoesNotExist)?; diff --git a/dan_layer/storage_sqlite/src/sqlite_storage_service.rs b/dan_layer/storage_sqlite/src/sqlite_storage_service.rs index 16188aedda..0fd9361334 100644 --- a/dan_layer/storage_sqlite/src/sqlite_storage_service.rs +++ b/dan_layer/storage_sqlite/src/sqlite_storage_service.rs @@ -43,9 +43,9 @@ impl ChainStorageService for SqliteStorageService { ) -> Result<(), StorageError> { let mut db = db; for instruction in node.payload().instructions() { - db.add_instruction(node.hash().clone(), instruction.clone())?; + db.add_instruction(*node.hash(), instruction.clone())?; } - db.add_node(node.hash().clone(), node.parent().clone(), node.height())?; + db.add_node(*node.hash(), *node.parent(), node.height())?; Ok(()) } } diff --git a/infrastructure/test_utils/src/paths.rs b/infrastructure/test_utils/src/paths.rs index edfff843bc..6d1cb67525 100644 --- a/infrastructure/test_utils/src/paths.rs +++ b/infrastructure/test_utils/src/paths.rs @@ -54,6 +54,8 @@ pub fn cargo_path() -> PathBuf { PathBuf::from(env!("CARGO_MANIFEST_DIR")) } +pub use tempfile::tempdir; + #[cfg(test)] mod test { use std::{