From 1f88a29c3c13421d5740caf481041c7cb5c6dcd4 Mon Sep 17 00:00:00 2001 From: Paul Masurel Date: Thu, 10 Mar 2022 14:33:20 +0900 Subject: [PATCH] Quickwit service run several services Closes #1160 --- Cargo.lock | 158 ++++++- docs/get-started/installation.md | 18 +- docs/get-started/quickstart.md | 2 +- .../add-full-text-search-to-your-olap-db.md | 14 +- ...ial-hdfs-logs-distributed-search-aws-s3.md | 2 +- docs/guides/tutorial-hdfs-logs.md | 8 +- docs/reference/cli.md | 83 ++-- quickwit-cli/src/cli.rs | 20 +- quickwit-cli/src/cli_doc_ext.toml | 8 +- quickwit-cli/src/service.rs | 238 ++++------ quickwit-cli/tests/cli.rs | 2 +- quickwit-cluster/Cargo.toml | 1 + quickwit-cluster/src/cluster.rs | 4 +- quickwit-cluster/src/lib.rs | 50 ++- quickwit-cluster/src/service.rs | 32 +- .../src/actors/indexing_pipeline.rs | 5 +- .../src/actors/indexing_server.rs | 18 +- quickwit-indexing/src/actors/mod.rs | 2 +- quickwit-indexing/src/lib.rs | 36 +- quickwit-indexing/src/models/mod.rs | 3 + quickwit-proto/Cargo.toml | 4 +- quickwit-proto/build.rs | 1 - quickwit-proto/proto/cluster.proto | 4 +- quickwit-proto/src/cluster.rs | 233 ++++++---- quickwit-proto/src/quickwit.rs | 411 ++++++++++-------- quickwit-search/src/lib.rs | 25 +- quickwit-search/src/search_client_pool.rs | 4 +- quickwit-serve/Cargo.toml | 1 + .../src/cluster_api/grpc_adapter.rs | 6 +- .../src/cluster_api/rest_handler.rs | 15 +- quickwit-serve/src/error.rs | 26 +- quickwit-serve/src/format.rs | 14 + quickwit-serve/src/grpc.rs | 22 +- quickwit-serve/src/indexing_api/mod.rs | 22 + .../src/indexing_api/rest_handler.rs | 47 ++ quickwit-serve/src/lib.rs | 175 +++++--- quickwit-serve/src/rest.rs | 27 +- quickwit-serve/src/search_api/rest_handler.rs | 50 +-- 38 files changed, 1082 insertions(+), 709 deletions(-) create mode 100644 quickwit-serve/src/indexing_api/mod.rs create mode 100644 quickwit-serve/src/indexing_api/rest_handler.rs diff --git a/Cargo.lock b/Cargo.lock index 91e7519c182..cdfbbd86ab0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -106,9 +106,9 @@ dependencies = [ [[package]] name = "async-trait" -version = "0.1.52" +version = "0.1.53" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "061a7acccaa286c011ddc30970520b98fa40e00c9d644633fb26b5fc63a265e3" +checksum = "ed6aa3524a2dfcf9fe180c51eae2b58738348d819517ceadf95789c51fff7600" dependencies = [ "proc-macro2", "quote", @@ -132,6 +132,49 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" +[[package]] +name = "axum" +version = "0.4.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c9f346c92c1e9a71d14fe4aaf7c2a5d9932cc4e5e48d8fb6641524416eb79ddd" +dependencies = [ + "async-trait", + "axum-core", + "bitflags", + "bytes", + "futures-util", + "http", + "http-body", + "hyper", + "matchit", + "memchr", + "mime", + "percent-encoding", + "pin-project-lite", + "serde", + "serde_urlencoded", + "sync_wrapper", + "tokio", + "tower", + "tower-http", + "tower-layer", + "tower-service", +] + +[[package]] +name = "axum-core" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6dbcda393bef9c87572779cb8ef916f12d77750b27535dd6819fa86591627a51" +dependencies = [ + "async-trait", + "bytes", + "futures-util", + "http", + "http-body", + "mime", +] + [[package]] name = "backoff" version = "0.4.0" @@ -365,8 +408,8 @@ checksum = "cc347c19eb5b940f396ac155822caee6662f850d97306890ac3773ed76c90c5a" dependencies = [ "prost", "prost-types", - "tonic", - "tonic-build", + "tonic 0.6.2 (registry+https://github.com/rust-lang/crates.io-index)", + "tonic-build 0.6.2 (registry+https://github.com/rust-lang/crates.io-index)", "tracing-core", ] @@ -388,7 +431,7 @@ dependencies = [ "thread_local", "tokio", "tokio-stream", - "tonic", + "tonic 0.6.2 (registry+https://github.com/rust-lang/crates.io-index)", "tracing", "tracing-core", "tracing-subscriber", @@ -1166,6 +1209,12 @@ dependencies = [ "pin-project-lite", ] +[[package]] +name = "http-range-header" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0bfe8eed0a9285ef776bb792479ea3834e8b94e13d615c2f66d03dd50a435a29" + [[package]] name = "httparse" version = "1.6.0" @@ -1459,6 +1508,12 @@ version = "0.1.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a3e378b66a060d48947b590737b30a1be76706c8dd7b8ba0f2fe3989c68a853f" +[[package]] +name = "matchit" +version = "0.4.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9376a4f0340565ad675d11fc1419227faf5f60cd7ac9cb2e7185a471f30af833" + [[package]] name = "md-5" version = "0.9.1" @@ -2061,6 +2116,16 @@ dependencies = [ "output_vt100", ] +[[package]] +name = "prettyplease" +version = "0.1.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3b83ec2d0af5c5c556257ff52c9f98934e243b9fd39604bfb2a9b75ec2e97f18" +dependencies = [ + "proc-macro2", + "syn", +] + [[package]] name = "proc-macro-crate" version = "1.1.3" @@ -2326,6 +2391,7 @@ dependencies = [ "flume", "itertools", "quickwit-common", + "quickwit-config", "quickwit-proto", "scuttlebutt", "serde", @@ -2529,8 +2595,8 @@ dependencies = [ "prost", "prost-build", "serde", - "tonic", - "tonic-build", + "tonic 0.6.2 (git+https://github.com/hyperium/tonic?rev=01e5be5)", + "tonic-build 0.6.2 (git+https://github.com/hyperium/tonic?rev=01e5be5)", ] [[package]] @@ -2665,9 +2731,9 @@ dependencies = [ [[package]] name = "quote" -version = "1.0.16" +version = "1.0.17" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b4af2ec4714533fcdf07e886f17025ace8b997b9ce51204ee69b6da831c3da57" +checksum = "632d02bff7f874a36f33ea8bb416cd484b90cc66c1194b1a1110d067a7013f58" dependencies = [ "proc-macro2", ] @@ -3013,7 +3079,7 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bfa0f585226d2e68097d4f95d113b15b83a82e819ab25717ec0590d9584ef366" dependencies = [ - "semver 1.0.6", + "semver 1.0.7", ] [[package]] @@ -3191,9 +3257,9 @@ dependencies = [ [[package]] name = "semver" -version = "1.0.6" +version = "1.0.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a4a3381e03edd24287172047536f20cabde766e2cd3e65e6b00fb3af51c4f38d" +checksum = "d65bd28f48be7196d222d95b9243287f48d27aca604e08497513019ff0502cc4" [[package]] name = "semver-parser" @@ -3510,6 +3576,12 @@ dependencies = [ "unicode-xid", ] +[[package]] +name = "sync_wrapper" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "20518fe4a4c9acf048008599e464deb21beeae3d3578418951a189c235a7a9a8" + [[package]] name = "tabled" version = "0.5.0" @@ -3944,6 +4016,37 @@ dependencies = [ "tracing-futures", ] +[[package]] +name = "tonic" +version = "0.6.2" +source = "git+https://github.com/hyperium/tonic?rev=01e5be5#01e5be508051eebf19c233d48b57797a17331383" +dependencies = [ + "async-stream", + "async-trait", + "axum", + "base64", + "bytes", + "futures-core", + "futures-util", + "h2", + "http", + "http-body", + "hyper", + "hyper-timeout", + "percent-encoding", + "pin-project", + "prost", + "prost-derive", + "tokio", + "tokio-stream", + "tokio-util 0.6.9", + "tower", + "tower-layer", + "tower-service", + "tracing", + "tracing-futures", +] + [[package]] name = "tonic-build" version = "0.6.2" @@ -3956,6 +4059,18 @@ dependencies = [ "syn", ] +[[package]] +name = "tonic-build" +version = "0.6.2" +source = "git+https://github.com/hyperium/tonic?rev=01e5be5#01e5be508051eebf19c233d48b57797a17331383" +dependencies = [ + "prettyplease", + "proc-macro2", + "prost-build", + "quote", + "syn", +] + [[package]] name = "tower" version = "0.4.12" @@ -3976,6 +4091,25 @@ dependencies = [ "tracing", ] +[[package]] +name = "tower-http" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "aba3f3efabf7fb41fae8534fc20a817013dd1c12cb45441efb6c82e6556b4cd8" +dependencies = [ + "bitflags", + "bytes", + "futures-core", + "futures-util", + "http", + "http-body", + "http-range-header", + "pin-project-lite", + "tower", + "tower-layer", + "tower-service", +] + [[package]] name = "tower-layer" version = "0.3.1" diff --git a/docs/get-started/installation.md b/docs/get-started/installation.md index e0c38d208bd..39ef652f450 100644 --- a/docs/get-started/installation.md +++ b/docs/get-started/installation.md @@ -4,7 +4,7 @@ sidebar_position: 2 --- Quickwit compiles to a single binary, we provide different methods to install it. -We notably provide musl builds to provide static binaries with no dependencies. +We notably provide musl builds to provide static binaries with no dependencies. ## Download @@ -25,8 +25,8 @@ Checkout all builds on [github](https://github.com/quickwit-oss/quickwit/release Quickwit depends on the following external libraries to work correctly: - `libpq`: the Postgres client library. -- `libssl`: the industry defacto cryptography library. -These libraries can be installed on your system using the native package manager. +- `libssl`: the industry defacto cryptography library. +These libraries can be installed on your system using the native package manager. On Ubuntu for instance, you can install these dependencies using the following command: ```bash @@ -35,7 +35,7 @@ apt-get -y update && apt-get -y install libpq-dev libssl-dev :::note -Quickwit static binary packages are also provided as `musl` builds. These packages don't require you to install any external library and can be automatically picked during installation on your system if the required libc version is not present. You can also download and manually install a static binary package. +Quickwit static binary packages are also provided as `musl` builds. These packages don't require you to install any external library and can be automatically picked during installation on your system if the required libc version is not present. You can also download and manually install a static binary package. ::: @@ -65,14 +65,14 @@ quickwit-{version} - `config/quickwit.yaml`: is the default configuration file. - `LICENSE_AGPLv3.0.txt`: the license file. - `quickwit`: the quickwit executable binary. -- `qwdata/`: the default data directory. +- `qwdata/`: the default data directory. ## Use the docker image -If you use docker, this might be one of the quickest way to get going. +If you use docker, this might be one of the quickest way to get going. The following command will pull the image from [dockerhub](https://hub.docker.com/r/quickwit/quickwit) -and gets you right in the shell of the running container ready to execute Quickwit commands. +and gets you right in the shell of the running container ready to execute Quickwit commands. Note that we are also mounting the working directory as volume. This is useful when you already have your dataset ready on your machine and want to work with Quickwit docker image. ```bash @@ -90,14 +90,14 @@ mkdir data && cd data curl -o wikipedia_index_config.yaml https://raw.githubusercontent.com/quickwit-oss/quickwit/main/config/tutorials/wikipedia/index-config.yaml curl -o wiki-articles-10000.json https://quickwit-datasets-public.s3.amazonaws.com/wiki-articles-10000.json -# create, index and search using the container +# create, index and search using the container docker run -v "$(pwd)":"/quickwit/qwdata" quickwit/quickwit index create --index-config ./qwdata/wikipedia_index_config.yaml docker run -v "$(pwd)":"/quickwit/qwdata" quickwit/quickwit index ingest --index wikipedia --input-path ./qwdata/wiki-articles-10000.json docker run -v "$(pwd)":"/quickwit/qwdata" quickwit/quickwit index search --index wikipedia --query "barack obama" -docker run -v "$(pwd)":"/quickwit/qwdata" --expose 7280 -p 7280:7280 quickwit/quickwit service run searcher +docker run -v "$(pwd)":"/quickwit/qwdata" --expose 7280 -p 7280:7280 quickwit/quickwit run --service searcher ``` Now you can make HTTP requests to the searcher service API. diff --git a/docs/get-started/quickstart.md b/docs/get-started/quickstart.md index 19c79ae5520..d72b4fb3200 100644 --- a/docs/get-started/quickstart.md +++ b/docs/get-started/quickstart.md @@ -114,7 +114,7 @@ It should return 10 hits. Now you're ready to serve our search API. Quickwit provides a search [REST API](../reference/rest-api.md) that can be started using the `service` subcommand. ```bash -./quickwit service run searcher +./quickwit run --service searcher ``` Check it's working with a simple GET request in the browser or via cURL: diff --git a/docs/guides/add-full-text-search-to-your-olap-db.md b/docs/guides/add-full-text-search-to-your-olap-db.md index 1c470c98dd7..d05fe37fb7a 100644 --- a/docs/guides/add-full-text-search-to-your-olap-db.md +++ b/docs/guides/add-full-text-search-to-your-olap-db.md @@ -4,7 +4,7 @@ sidebar_position: 2 --- -This guide will help you add full-text search to a well-known OLAP database, Clickhouse, using the Quickwit search streaming feature. Indeed Quickwit exposes a REST endpoint that streams ids or whatever attributes matching a search query **extremely fast** (up to 50 million in 1 second), and Clickhouse can easily use them with joins queries. +This guide will help you add full-text search to a well-known OLAP database, Clickhouse, using the Quickwit search streaming feature. Indeed Quickwit exposes a REST endpoint that streams ids or whatever attributes matching a search query **extremely fast** (up to 50 million in 1 second), and Clickhouse can easily use them with joins queries. We will take the [Github archive dataset](https://www.gharchive.org/), which gathers more than 3 billion Github events: `WatchEvent`, `PullRequestEvent`, `IssuesEvent`... You can dive into this [great analysis](https://ghe.clickhouse.tech/) made by Clickhouse to have a good understanding of the dataset. We also took strong inspiration from this work, and we are very grateful to them for sharing this. @@ -97,7 +97,7 @@ You can check it's working by using the `search` command and looking for `tantiv ## Start a searcher ```bash -./quickwit service run searcher +./quickwit run --service searcher ``` This command will start an HTTP server with a [REST API](../reference/rest-api.md). We are now @@ -163,10 +163,10 @@ gunzip -c gh-archive-2021-12.json.gz | clickhouse-client -d gh-archive --query=" Let's check it's working: ```SQL # Top repositories by stars -SELECT repo_name, count() AS stars -FROM github_events -WHERE event_type = 'WatchEvent' -GROUP BY repo_name +SELECT repo_name, count() AS stars +FROM github_events +WHERE event_type = 'WatchEvent' +GROUP BY repo_name ORDER BY stars DESC LIMIT 5 ┌─repo_name────────────┬─stars─┐ @@ -181,7 +181,7 @@ ORDER BY stars DESC LIMIT 5 ### Use Quickwit search inside Clickhouse Clickhouse has an exciting feature called [URL Table Engine](https://clickhouse.com/docs/en/engines/table-engines/special/url/) that queries data from a remote HTTP/HTTPS server. -This is precisely what we need: by creating a table pointing to Quickwit search stream endpoint, we will fetch ids that match a query from Clickhouse. +This is precisely what we need: by creating a table pointing to Quickwit search stream endpoint, we will fetch ids that match a query from Clickhouse. ```SQL SELECT count(*) FROM url('http://127.0.0.1:7280/api/v1/gh-archive/search/stream?query=log4j+OR+log4shell&fastField=id&outputFormat=clickHouseRowBinary', RowBinary, 'id UInt64') diff --git a/docs/guides/tutorial-hdfs-logs-distributed-search-aws-s3.md b/docs/guides/tutorial-hdfs-logs-distributed-search-aws-s3.md index f79f6f16401..20be0da7b91 100644 --- a/docs/guides/tutorial-hdfs-logs-distributed-search-aws-s3.md +++ b/docs/guides/tutorial-hdfs-logs-distributed-search-aws-s3.md @@ -163,7 +163,7 @@ Now let's launch a searcher node for this instance. ```bash # Then start the http server search service. -./quickwit service run searcher --config ./config.yaml +./quickwit run --service searcher --config ./config.yaml ``` You will see in the terminal the confirmation that the instance has created a new cluster. Example of such a log: diff --git a/docs/guides/tutorial-hdfs-logs.md b/docs/guides/tutorial-hdfs-logs.md index 6dc1c44e2d3..873d2bc9752 100644 --- a/docs/guides/tutorial-hdfs-logs.md +++ b/docs/guides/tutorial-hdfs-logs.md @@ -125,7 +125,7 @@ The command `service run searcher` starts an http server which provides a [REST ```bash -./quickwit service run searcher +./quickwit run --service searcher ``` Let's execute the same query on field `severity_text` but with `cURL`: @@ -188,17 +188,17 @@ Once the configuration files are downloaded, start a searcher node for each of t ```bash # run this in the first terminal window. -./quickwit service run searcher --config ./searcher-1.yaml +./quickwit run --service searcher --config ./searcher-1.yaml ``` ```bash # run this in the second terminal window. -./quickwit service run searcher --config ./searcher-2.yaml +./quickwit run --service searcher --config ./searcher-2.yaml ``` ```bash # run this in the third terminal window. -./quickwit service run searcher --config ./searcher-3.yaml +./quickwit run --service searcher --config ./searcher-3.yaml ``` You will see in your terminal the confirmation that the instance has created or joined a cluster. Example of such a log: diff --git a/docs/reference/cli.md b/docs/reference/cli.md index fa92caedf0d..f02f3943c28 100644 --- a/docs/reference/cli.md +++ b/docs/reference/cli.md @@ -338,16 +338,25 @@ Possible values are `staged`, `published`, and `marked`. `--tags` Comma-separated list of tags, only splits that contain all of the tags will be returned. `--config` Quickwit config file. `--data-dir` Where data is persisted. Override data-dir defined in config file, default is `./qwdata`. -## service -Launches services. -### service run -Starts a service. Currently, the only services available are `indexer` and `searcher`. -`quickwit service run [args]` -### service run searcher +## run -Starts a web server at `rest_listing_address:rest_list_port` that exposes the [Quickwit REST API](rest-api.md) +Starts quickwit services. By default, both `search` and `indexing` will be started. +It is however possible to specifically run only one of these services by adding a `--service` parameter. + +::: +`quickwit run [args]` + +*Options* + +`--service` Selects a specific service to run. (searcher or indexer) +`--config` Quickwit config file. +`--data-dir` Where data is persisted. Override data-dir defined in config file, default is `./qwdata`. + +### Searcher service + +The searcher service starts a web server at `rest_listing_address:rest_list_port` that exposes the [Quickwit REST API](rest-api.md) where `rest_listing_address` and `rest_list_port` are defined in Quickwit config file (quickwit.yaml). The node can optionally join a cluster using the `peer_seeds` parameter. This list of node addresses is used to discover the remaining peer nodes in the cluster through a gossip protocol (SWIM). @@ -355,40 +364,26 @@ This list of node addresses is used to discover the remaining peer nodes in the :::note Behind the scenes, Quickwit needs to open the following port for cluster formation and workload distribution: - TCP port (default is 7280) for REST API - TCP and UDP port (default is 7280) for cluster membership protocol - TCP port + 1 (default is 7281) for gRPC address for the distributed search +| name | purpose | protocol | default value | +|-------------|------------------------------------------|----------|---------------| +| rest port | Serves Quickwit's UI and REST API | TCP | 7280 | +| gossip port | Serves for Quickwit's cluster membership | UDP | rest_port | +| grpc port | Serves Quickwit's gRPC API | TCP | rest_port + 1 | If ports are already taken, the serve command will fail. -::: -`quickwit service run searcher [args]` - -*Synopsis* - -```bash -quickwit service run searcher - --config - [--data-dir ] -``` - -*Options* - -`--config` Quickwit config file. -`--data-dir` Where data is persisted. Override data-dir defined in config file, default is `./qwdata`. - *Examples* *Start a Searcher* ```bash -quickwit service run searcher --config=./config/quickwit.yaml +quickwit run --service searcher --config=./config/quickwit.yaml ``` *Make a search request on a wikipedia index* ```bash # To create wikipedia index and ingest data, go to our tutorial https://quickwit.io/docs/get-started/quickstart. # Start a searcher. -quickwit service run searcher --config=./config/quickwit.yaml +quickwit run --service searcher --config=./config/quickwit.yaml # Make a request. curl "http://127.0.0.1:7280/api/v1/wikipedia/search?query=barack+obama" ``` @@ -401,42 +396,16 @@ quickwit index create --index-config gh_archive_index_config.yaml --config ./con # Download a data sample and ingest it. curl https://quickwit-datasets-public.s3.amazonaws.com/gh-archive-2022-01-text-only-10000.json.gz | gunzip | cargo r index ingest --index gh-archive --config=./config/quickwit.yaml # Start server. -quickwit service run searcher --config=./config/quickwit.yaml +quickwit run --service searcher --config=./config/quickwit.yaml # Finally make the search stream request. curl "http://127.0.0.1:7280/api/v1/gh-archive/search/stream?query=log4j&fastField=id&outputFormat=csv" # Make a search stream request with HTTP2. curl --http2-prior-knowledge "http://127.0.0.1:7280/api/v1/gh-archive/search/stream?query=log4j&fastField=id&outputFormat=csv" ``` -### service run indexer - -Starts an indexing server that consumes the sources of index IDs passed in `--indexes` argument. - -`quickwit service run indexer [args]` - -*Synopsis* - -```bash -quickwit service run indexer - --config - [--data-dir ] - --indexes -``` +### Indexer service -*Options* - -`--config` Quickwit config file. -`--data-dir` Where data is persisted. Override data-dir defined in config file, default is `./qwdata`. -`--indexes` IDs of the indexes to run the indexer for. - -*Examples* - -*Add a source to an index and start an Indexer* -```bash -quickwit source add --index wikipedia --source wikipedia-source --type file --params '{"filepath":"wiki-articles-10000.json"}' -quickwit service run indexer --indexes wikipedia --config=./config/quickwit.yaml - -``` +The indexer service will list indexes and their associated sources, and run an indexing pipeline for every single source. ## source Manages sources. diff --git a/quickwit-cli/src/cli.rs b/quickwit-cli/src/cli.rs index 469361e2058..bc4fb55a0bc 100644 --- a/quickwit-cli/src/cli.rs +++ b/quickwit-cli/src/cli.rs @@ -22,33 +22,33 @@ use clap::{ArgMatches, Command}; use tracing::Level; use crate::index::{build_index_command, IndexCliCommand}; -use crate::service::{build_service_command, ServiceCliCommand}; +use crate::service::{build_run_command, RunCliCommand}; use crate::source::{build_source_command, SourceCliCommand}; use crate::split::{build_split_command, SplitCliCommand}; pub fn build_cli<'a>() -> Command<'a> { Command::new("Quickwit") - .subcommand(build_source_command()) - .subcommand(build_service_command()) - .subcommand(build_split_command()) - .subcommand(build_index_command()) + .subcommand(build_run_command().display_order(1)) + .subcommand(build_index_command().display_order(2)) + .subcommand(build_source_command().display_order(3)) + .subcommand(build_split_command().display_order(4)) .disable_help_subcommand(true) .arg_required_else_help(true) } #[derive(Debug, PartialEq)] pub enum CliCommand { + Run(RunCliCommand), Index(IndexCliCommand), - Service(ServiceCliCommand), - Source(SourceCliCommand), Split(SplitCliCommand), + Source(SourceCliCommand), } impl CliCommand { pub fn default_log_level(&self) -> Level { match self { + CliCommand::Run(_) => Level::INFO, CliCommand::Index(subcommand) => subcommand.default_log_level(), - CliCommand::Service(_) => Level::INFO, CliCommand::Source(_) => Level::ERROR, CliCommand::Split(_) => Level::ERROR, } @@ -60,7 +60,7 @@ impl CliCommand { .ok_or_else(|| anyhow::anyhow!("Failed to parse command arguments."))?; match subcommand { "index" => IndexCliCommand::parse_cli_args(submatches).map(CliCommand::Index), - "service" => ServiceCliCommand::parse_cli_args(submatches).map(CliCommand::Service), + "run" => RunCliCommand::parse_cli_args(submatches).map(CliCommand::Run), "source" => SourceCliCommand::parse_cli_args(submatches).map(CliCommand::Source), "split" => SplitCliCommand::parse_cli_args(submatches).map(CliCommand::Split), _ => bail!("Subcommand `{}` is not implemented.", subcommand), @@ -70,7 +70,7 @@ impl CliCommand { pub async fn execute(self) -> anyhow::Result<()> { match self { CliCommand::Index(subcommand) => subcommand.execute().await, - CliCommand::Service(subcommand) => subcommand.execute().await, + CliCommand::Run(subcommand) => subcommand.execute().await, CliCommand::Source(subcommand) => subcommand.execute().await, CliCommand::Split(subcommand) => subcommand.execute().await, } diff --git a/quickwit-cli/src/cli_doc_ext.toml b/quickwit-cli/src/cli_doc_ext.toml index c31940310c6..0163b79da3c 100644 --- a/quickwit-cli/src/cli_doc_ext.toml +++ b/quickwit-cli/src/cli_doc_ext.toml @@ -124,14 +124,14 @@ If ports are already taken, the serve command will fail. [[service.run.searcher.examples]] name = "Start a Searcher" -command = "quickwit service run searcher --config=./config/quickwit.yaml" +command = "quickwit run --service searcher --config=./config/quickwit.yaml" [[service.run.searcher.examples]] name = "Make a search request on a wikipedia index" command = ''' # To create wikipedia index and ingest data, go to our tutorial https://quickwit.io/docs/get-started/quickstart. # Start a searcher. -quickwit service run searcher --config=./config/quickwit.yaml +quickwit run --service searcher --config=./config/quickwit.yaml # Make a request. curl "http://127.0.0.1:7280/api/v1/wikipedia/search?query=barack+obama" ''' @@ -146,7 +146,7 @@ quickwit index create --index-config gh_archive_index_config.yaml --config ./con # Download a data sample and ingest it. curl https://quickwit-datasets-public.s3.amazonaws.com/gh-archive-2022-01-text-only-10000.json.gz | gunzip | cargo r index ingest --index gh-archive --config=./config/quickwit.yaml # Start server. -quickwit service run searcher --config=./config/quickwit.yaml +quickwit run --service searcher --config=./config/quickwit.yaml # Finally make the search stream request. curl "http://127.0.0.1:7280/api/v1/gh-archive/search/stream?query=log4j&fastField=id&outputFormat=csv" # Make a search stream request with HTTP2. @@ -162,7 +162,7 @@ Starts an indexing server that consumes the sources of index IDs passed in `--in name = "Add a source to an index and start an Indexer" command = ''' quickwit source add --index wikipedia --source wikipedia-source --type file --params '{"filepath":"wiki-articles-10000.json"}' -quickwit service run indexer --indexes wikipedia --config=./config/quickwit.yaml +quickwit run --service indexer --indexes wikipedia --config=./config/quickwit.yaml ''' diff --git a/quickwit-cli/src/service.rs b/quickwit-cli/src/service.rs index 6150de28150..6da2dcb5578 100644 --- a/quickwit-cli/src/service.rs +++ b/quickwit-cli/src/service.rs @@ -17,182 +17,73 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . +use std::collections::HashSet; +use std::iter; use std::path::PathBuf; -use anyhow::bail; use clap::{arg, ArgMatches, Command}; -use quickwit_actors::Universe; -use quickwit_common::run_checklist; +use itertools::Itertools; use quickwit_common::uri::Uri; -use quickwit_indexing::actors::IndexingServer; -use quickwit_indexing::models::SpawnPipelinesForIndex; -use quickwit_metastore::quickwit_metastore_uri_resolver; -use quickwit_serve::run_searcher; -use quickwit_storage::quickwit_storage_uri_resolver; +use quickwit_serve::{serve_quickwit, QuickwitService}; use quickwit_telemetry::payload::TelemetryEvent; use tracing::debug; use crate::load_quickwit_config; -pub fn build_service_command<'a>() -> Command<'a> { - Command::new("service") - .about("Launches services.") - .subcommand( - Command::new("run") - .about("Starts a service. Currently, the only services available are `indexer` and `searcher`.") - .subcommand( - Command::new("indexer") - .about("Starts an indexing process, aka an `indexer`.") - .args(&[ - arg!(--config "Quickwit config file").env("QW_CONFIG"), - arg!(--"data-dir" "Where data is persisted. Override data-dir defined in config file, default is `./qwdata`.") - .env("QW_DATA_DIR") - .required(false), - arg!(--indexes "IDs of the indexes to run the indexer for.") - .multiple_values(true), - ]) - ) - .subcommand( - Command::new("searcher") - .about("Starts a search process, aka a `searcher`.") - .args(&[ - arg!(--config "Quickwit config file").env("QW_CONFIG"), - arg!(--"data-dir" "Where data is persisted. Override data-dir defined in config file, default is `./qwdata`.") - .env("QW_DATA_DIR") - .required(false), - ]) - ) - ) - .arg_required_else_help(true) +pub fn build_run_command<'a>() -> Command<'a> { + Command::new("run") + .about("Runs quickwit services. By default, `indexer` and `searcher` are started.") + .args(&[ + arg!(--config "Quickwit config file").env("QW_CONFIG").required(true), + arg!(--"data-dir" "Where data is persisted. Override data-dir defined in config file, default is `./qwdata`.").env("QW_DATA_DIR").required(false), + arg!(--"service" "Services (searcher|indexer) to run. If unspecified run both `searcher` and `indexer`.").required(false) + ]) } #[derive(Debug, PartialEq)] -pub struct RunIndexerArgs { +pub struct RunCliCommand { pub config_uri: Uri, pub data_dir_path: Option, - pub index_ids: Vec, + pub services: HashSet, } -#[derive(Debug, PartialEq)] -pub struct RunSearcherArgs { - pub config_uri: Uri, - pub data_dir_path: Option, -} - -#[derive(Debug, PartialEq)] -pub enum ServiceCliCommand { - RunSearcher(RunSearcherArgs), - RunIndexer(RunIndexerArgs), -} - -impl ServiceCliCommand { +impl RunCliCommand { pub fn parse_cli_args(matches: &ArgMatches) -> anyhow::Result { - let (subcommand, submatches) = matches - .subcommand() - .ok_or_else(|| anyhow::anyhow!("Failed to parse sub-matches."))?; - match subcommand { - "run" => Self::parse_run_args(submatches), - _ => bail!("Service subcommand `{}` is not implemented.", subcommand), - } - } - - fn parse_run_args(matches: &ArgMatches) -> anyhow::Result { - let (subcommand, submatches) = matches - .subcommand() - .ok_or_else(|| anyhow::anyhow!("Failed to parse sub-matches."))?; - match subcommand { - "searcher" => Self::parse_searcher_args(submatches), - "indexer" => Self::parse_indexer_args(submatches), - _ => bail!( - "Service `{}` is not implemented. Available services are `indexer` and `searcher`.", - subcommand - ), - } - } - - fn parse_searcher_args(matches: &ArgMatches) -> anyhow::Result { let config_uri = matches .value_of("config") .map(Uri::try_new) .expect("`config` is a required arg.")?; - let data_dir = matches.value_of("data-dir").map(PathBuf::from); - Ok(ServiceCliCommand::RunSearcher(RunSearcherArgs { + let data_dir_path = matches.value_of("data-dir").map(PathBuf::from); + let services: HashSet = + if let Some(service_str) = matches.value_of("service") { + let service = QuickwitService::try_from(service_str)?; + iter::once(service).collect() + } else { + [QuickwitService::Indexer, QuickwitService::Searcher] + .into_iter() + .collect() + }; + Ok(RunCliCommand { config_uri, - data_dir_path: data_dir, - })) + data_dir_path, + services, + }) } - fn parse_indexer_args(matches: &ArgMatches) -> anyhow::Result { - let config_uri = matches - .value_of("config") - .map(Uri::try_new) - .expect("`config` is a required arg.")?; - let data_dir = matches.value_of("data-dir").map(PathBuf::from); - let index_ids = matches - .values_of("indexes") - .expect("`indexes` is a required arg.") - .map(String::from) - .collect(); - Ok(ServiceCliCommand::RunIndexer(RunIndexerArgs { - config_uri, - index_ids, - data_dir_path: data_dir, - })) - } - - pub async fn execute(self) -> anyhow::Result<()> { - match self { - Self::RunSearcher(args) => run_searcher_cli(args).await, - Self::RunIndexer(args) => run_indexer_cli(args).await, - } - } -} - -async fn run_indexer_cli(args: RunIndexerArgs) -> anyhow::Result<()> { - debug!(args = ?args, "run-indexer"); - let telemetry_event = TelemetryEvent::RunService("indexer".to_string()); - quickwit_telemetry::send_telemetry_event(telemetry_event).await; - - let config = load_quickwit_config(&args.config_uri, args.data_dir_path).await?; - let metastore = quickwit_metastore_uri_resolver() - .resolve(&config.metastore_uri()) - .await?; - let storage_resolver = quickwit_storage_uri_resolver().clone(); - let indexing_server = IndexingServer::new( - config.data_dir_path, - config.indexer_config, - metastore, - storage_resolver, - ); - let universe = Universe::new(); - let (indexing_server_mailbox, indexing_server_handle) = - universe.spawn_actor(indexing_server).spawn(); - - for index_id in args.index_ids { - indexing_server_mailbox - .ask_for_res(SpawnPipelinesForIndex { index_id }) - .await?; - } - let (exit_status, _) = indexing_server_handle.join().await; - if exit_status.is_success() { - bail!(exit_status) + pub async fn execute(&self) -> anyhow::Result<()> { + debug!(args = ?self, "run-service"); + let service_str = self + .services + .iter() + .map(|service| format!("{service:?}")) + .join(","); + let telemetry_event = TelemetryEvent::RunService(service_str); + quickwit_telemetry::send_telemetry_event(telemetry_event).await; + + let config = load_quickwit_config(&self.config_uri, self.data_dir_path.clone()).await?; + serve_quickwit(&config, &self.services).await?; + Ok(()) } - Ok(()) -} - -async fn run_searcher_cli(args: RunSearcherArgs) -> anyhow::Result<()> { - debug!(args = ?args, "run-searcher"); - let telemetry_event = TelemetryEvent::RunService("searcher".to_string()); - quickwit_telemetry::send_telemetry_event(telemetry_event).await; - - let config = load_quickwit_config(&args.config_uri, args.data_dir_path).await?; - let metastore_uri_resolver = quickwit_metastore_uri_resolver(); - let metastore = metastore_uri_resolver - .resolve(&config.metastore_uri()) - .await?; - run_checklist(vec![("metastore", metastore.check_connectivity().await)]); - run_searcher(config, metastore).await?; - Ok(()) } #[cfg(test)] @@ -202,12 +93,30 @@ mod tests { use crate::cli::{build_cli, CliCommand}; #[test] - fn test_parse_run_searcher_args() -> anyhow::Result<()> { + fn test_parse_service_run_args_all_services() -> anyhow::Result<()> { + let command = build_cli().no_binary_name(true); + let matches = command.try_get_matches_from(vec!["run", "--config", "/config.yaml"])?; + let command = CliCommand::parse_cli_args(&matches)?; + let expected_config_uri = Uri::try_new("file:///config.yaml").unwrap(); + assert!(matches!( + command, + CliCommand::Run(RunCliCommand { + config_uri, + data_dir_path: None, + services + }) + if config_uri == expected_config_uri && services.len() == 2 + )); + Ok(()) + } + + #[test] + fn test_parse_service_run_args_indexer_only() -> anyhow::Result<()> { let command = build_cli().no_binary_name(true); let matches = command.try_get_matches_from(vec![ - "service", "run", - "searcher", + "--service", + "indexer", "--config", "/config.yaml", ])?; @@ -215,36 +124,35 @@ mod tests { let expected_config_uri = Uri::try_new("file:///config.yaml").unwrap(); assert!(matches!( command, - CliCommand::Service(ServiceCliCommand::RunSearcher(RunSearcherArgs { + CliCommand::Run(RunCliCommand { config_uri, data_dir_path: None, - })) if config_uri == expected_config_uri + services + }) + if config_uri == expected_config_uri && services.len() == 1 && services.iter().cloned().next().unwrap() == QuickwitService::Indexer )); Ok(()) } - #[test] - fn test_parse_run_indexer_args() -> anyhow::Result<()> { + fn test_parse_service_run_indexer_only_args() -> anyhow::Result<()> { let command = build_cli().no_binary_name(true); let matches = command.try_get_matches_from(vec![ - "service", "run", + "--service", "indexer", "--config", "/config.yaml", - "--indexes", - "foo", - "bar", ])?; let command = CliCommand::parse_cli_args(&matches)?; let expected_config_uri = Uri::try_new("file:///config.yaml").unwrap(); assert!(matches!( command, - CliCommand::Service(ServiceCliCommand::RunIndexer(RunIndexerArgs { + CliCommand::Run(RunCliCommand { config_uri, data_dir_path: None, - index_ids, - })) if config_uri == expected_config_uri && index_ids == ["foo", "bar"] + services + }) + if config_uri == expected_config_uri && services.len() == 1 && services.contains(&QuickwitService::Indexer) )); Ok(()) } diff --git a/quickwit-cli/tests/cli.rs b/quickwit-cli/tests/cli.rs index 83559c7f946..d2cd7ade182 100644 --- a/quickwit-cli/tests/cli.rs +++ b/quickwit-cli/tests/cli.rs @@ -777,7 +777,7 @@ async fn test_all_local_index() -> Result<()> { // serve & api-search let mut server_process = spawn_command( format!( - "service run searcher --config {}", + "run --service searcher --config {}", test_env.resource_files["config"].display(), ) .as_str(), diff --git a/quickwit-cluster/Cargo.toml b/quickwit-cluster/Cargo.toml index 5250e0c8001..dfdc74dd9d9 100644 --- a/quickwit-cluster/Cargo.toml +++ b/quickwit-cluster/Cargo.toml @@ -14,6 +14,7 @@ anyhow = "1.0" async-trait = "0.1" flume = "0.10" quickwit-common = { version = "0.2.1", path = "../quickwit-common"} +quickwit-config = { version = "0.2.1", path = "../quickwit-config"} serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" thiserror = "1.0" diff --git a/quickwit-cluster/src/cluster.rs b/quickwit-cluster/src/cluster.rs index 8326a6a72df..90908661eec 100644 --- a/quickwit-cluster/src/cluster.rs +++ b/quickwit-cluster/src/cluster.rs @@ -43,7 +43,7 @@ pub struct Member { /// An ID that makes a member unique. pub node_unique_id: String, /// timestamp (ms) when node starts. - pub generation: i64, + pub generation: u64, /// advertised UdpServerSocket pub gossip_public_address: SocketAddr, /// If true, it means self. @@ -51,7 +51,7 @@ pub struct Member { } impl Member { - pub fn new(node_unique_id: String, generation: i64, gossip_public_address: SocketAddr) -> Self { + pub fn new(node_unique_id: String, generation: u64, gossip_public_address: SocketAddr) -> Self { Self { node_unique_id, gossip_public_address, diff --git a/quickwit-cluster/src/lib.rs b/quickwit-cluster/src/lib.rs index 5e1b3bae3a3..24e3262725e 100644 --- a/quickwit-cluster/src/lib.rs +++ b/quickwit-cluster/src/lib.rs @@ -17,6 +17,50 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . -pub mod cluster; -pub mod error; -pub mod service; +mod cluster; +mod error; +mod service; + +use std::sync::Arc; + +use quickwit_config::QuickwitConfig; +use scuttlebutt::FailureDetectorConfig; + +pub use crate::cluster::{ + create_cluster_for_test, grpc_addr_from_listen_addr_for_test, Cluster, Member, +}; +pub use crate::error::{ClusterError, ClusterResult}; +pub use crate::service::ClusterService; + +fn unix_timestamp() -> u64 { + let duration_since_epoch = std::time::SystemTime::now() + .duration_since(std::time::SystemTime::UNIX_EPOCH) + .expect("SystemTime before UNIX EPOCH!"); + duration_since_epoch.as_secs() +} + +pub async fn start_cluster_service( + quickwit_config: &QuickwitConfig, +) -> anyhow::Result> { + let seed_nodes = quickwit_config + .seed_socket_addrs()? + .iter() + .map(|addr| addr.to_string()) + .collect::>(); + + let member = Member::new( + quickwit_config.node_id.clone(), + unix_timestamp(), + quickwit_config.gossip_public_addr()?, + ); + + let cluster = Arc::new(Cluster::new( + member, + quickwit_config.gossip_socket_addr()?, + quickwit_config.grpc_socket_addr()?, + &seed_nodes, + FailureDetectorConfig::default(), + )?); + + Ok(cluster) +} diff --git a/quickwit-cluster/src/service.rs b/quickwit-cluster/src/service.rs index 6625d880e6a..f130c81a4d1 100644 --- a/quickwit-cluster/src/service.rs +++ b/quickwit-cluster/src/service.rs @@ -17,11 +17,9 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . -use std::sync::Arc; - use async_trait::async_trait; use quickwit_proto::{ - tonic, ClusterStateRequest, ClusterStateResponse, LeaveClusterRequest, LeaveClusterResponse, + ClusterStateRequest, ClusterStateResponse, LeaveClusterRequest, LeaveClusterResponse, ListMembersRequest, ListMembersResponse, Member as PMember, }; @@ -56,32 +54,14 @@ pub trait ClusterService: 'static + Send + Sync { ) -> Result; } -/// Cluster service implementation. -/// This is a service to check the status of the cluster and to operate the cluster. -pub struct ClusterServiceImpl { - cluster: Arc, -} - -impl ClusterServiceImpl { - /// Create a cluster service given a cluster. - pub fn new(cluster: Arc) -> Self { - ClusterServiceImpl { cluster } - } -} - -#[tonic::async_trait] -impl ClusterService for ClusterServiceImpl { +#[async_trait] +impl ClusterService for Cluster { /// This is the API to get the list of cluster members. async fn list_members( &self, _request: ListMembersRequest, ) -> Result { - let members = self - .cluster - .members() - .into_iter() - .map(PMember::from) - .collect(); + let members = self.members().into_iter().map(PMember::from).collect(); Ok(ListMembersResponse { members }) } @@ -90,7 +70,7 @@ impl ClusterService for ClusterServiceImpl { &self, _request: LeaveClusterRequest, ) -> Result { - self.cluster.leave().await; + self.leave().await; Ok(LeaveClusterResponse {}) } @@ -99,7 +79,7 @@ impl ClusterService for ClusterServiceImpl { &self, _request: ClusterStateRequest, ) -> Result { - let cluster_state = self.cluster.state().await; + let cluster_state = self.state().await; let state_serialized_json = serde_json::to_string(&cluster_state).map_err(|err| { ClusterError::ClusterStateError { message: err.to_string(), diff --git a/quickwit-indexing/src/actors/indexing_pipeline.rs b/quickwit-indexing/src/actors/indexing_pipeline.rs index 426815d93fc..2dbd42d684e 100644 --- a/quickwit-indexing/src/actors/indexing_pipeline.rs +++ b/quickwit-indexing/src/actors/indexing_pipeline.rs @@ -41,7 +41,7 @@ use crate::actors::{ GarbageCollector, Indexer, MergeExecutor, MergePlanner, NamedField, Packager, Publisher, Uploader, }; -use crate::models::{IndexingDirectory, IndexingStatistics}; +use crate::models::{IndexingDirectory, IndexingStatistics, Observe}; use crate::source::{quickwit_supported_sources, SourceActor}; use crate::split_store::{IndexingSplitStore, IndexingSplitStoreParams}; use crate::{MergePolicy, StableMultitenantWithTimestampMergePolicy}; @@ -71,9 +71,6 @@ pub struct IndexingPipelineHandler { #[derive(Debug, Clone, Copy)] pub struct Supervise; -#[derive(Debug, Clone, Copy)] -pub struct Observe; - #[derive(Debug, Clone, Copy, Default)] pub struct Spawn { retry_count: usize, diff --git a/quickwit-indexing/src/actors/indexing_server.rs b/quickwit-indexing/src/actors/indexing_server.rs index 5ad1f50c1ec..305230a45a0 100644 --- a/quickwit-indexing/src/actors/indexing_server.rs +++ b/quickwit-indexing/src/actors/indexing_server.rs @@ -33,8 +33,8 @@ use thiserror::Error; use tracing::{error, info}; use crate::models::{ - DetachPipeline, IndexingPipelineId, ObservePipeline, SpawnMergePipeline, SpawnPipeline, - SpawnPipelinesForIndex, + DetachPipeline, IndexingPipelineId, Observe, ObservePipeline, SpawnMergePipeline, + SpawnPipeline, SpawnPipelinesForIndex, }; use crate::{IndexingPipeline, IndexingPipelineParams, IndexingStatistics}; @@ -346,6 +346,18 @@ impl Handler for IndexingServer { } } +#[async_trait] +impl Handler for IndexingServer { + type Reply = Self::ObservableState; + async fn handle( + &mut self, + _message: Observe, + _ctx: &ActorContext, + ) -> Result { + Ok(self.observable_state()) + } +} + #[async_trait] impl Handler for IndexingServer { type Reply = Result, IndexingServerError>; @@ -390,7 +402,7 @@ mod tests { let indexer_config = IndexerConfig::for_test().unwrap(); let storage_resolver = StorageUriResolver::for_test(); let indexing_server = IndexingServer::new( - data_dir_path.clone(), + data_dir_path, indexer_config, metastore.clone(), storage_resolver.clone(), diff --git a/quickwit-indexing/src/actors/mod.rs b/quickwit-indexing/src/actors/mod.rs index 0ea7081cfeb..2e16b6f4e47 100644 --- a/quickwit-indexing/src/actors/mod.rs +++ b/quickwit-indexing/src/actors/mod.rs @@ -27,7 +27,7 @@ mod publisher; mod uploader; pub use indexing_pipeline::{IndexingPipeline, IndexingPipelineHandler, IndexingPipelineParams}; -pub use indexing_server::{IndexingServer, INDEXING}; +pub use indexing_server::{IndexingServer, IndexingServerError, INDEXING}; use tantivy::schema::{Field, FieldType}; mod merge_executor; mod merge_planner; diff --git a/quickwit-indexing/src/lib.rs b/quickwit-indexing/src/lib.rs index 3fd4d9a1748..7ad896674f0 100644 --- a/quickwit-indexing/src/lib.rs +++ b/quickwit-indexing/src/lib.rs @@ -21,13 +21,15 @@ use std::path::PathBuf; use std::sync::Arc; use anyhow::bail; -use quickwit_actors::Universe; -use quickwit_config::{IndexerConfig, SourceConfig}; +use quickwit_actors::{Mailbox, Universe}; +use quickwit_config::{IndexerConfig, QuickwitConfig, SourceConfig}; use quickwit_metastore::Metastore; use quickwit_storage::StorageUriResolver; +use tracing::info; +pub use crate::actors::IndexingServerError; use crate::actors::{IndexingPipeline, IndexingPipelineParams, IndexingServer}; -use crate::models::{DetachPipeline, IndexingStatistics, SpawnPipeline}; +use crate::models::{DetachPipeline, IndexingStatistics, SpawnPipeline, SpawnPipelinesForIndex}; pub use crate::split_store::{ get_tantivy_directory_from_split_bundle, IndexingSplitStore, IndexingSplitStoreParams, SplitFolder, @@ -45,7 +47,7 @@ mod test_utils; pub use test_utils::{mock_split, mock_split_meta, TestSandbox}; pub use self::garbage_collection::{delete_splits_with_files, run_garbage_collect, FileEntry}; -pub use self::merge_policy::{MergePolicy, StableMultitenantWithTimestampMergePolicy}; +use self::merge_policy::{MergePolicy, StableMultitenantWithTimestampMergePolicy}; pub use self::source::check_source_connectivity; pub async fn index_data( @@ -77,3 +79,29 @@ pub async fn index_data( pub fn new_split_id() -> String { ulid::Ulid::new().to_string() } + +pub async fn start_indexer_service( + universe: &Universe, + config: &QuickwitConfig, + metastore: Arc, + storage_uri_resolver: StorageUriResolver, +) -> anyhow::Result> { + info!("start-indexer-service"); + let index_metadatas = metastore.list_indexes_metadatas().await?; + let indexing_server = IndexingServer::new( + config.data_dir_path.to_path_buf(), + config.indexer_config.clone(), + metastore, + storage_uri_resolver, + ); + let (indexer_service_mailbox, _) = universe.spawn_actor(indexing_server).spawn(); + for index_metadata in index_metadatas { + info!(index_id=%index_metadata.index_id, "spawn-indexing-pipeline"); + indexer_service_mailbox + .ask_for_res(SpawnPipelinesForIndex { + index_id: index_metadata.index_id, + }) + .await?; + } + Ok(indexer_service_mailbox) +} diff --git a/quickwit-indexing/src/models/mod.rs b/quickwit-indexing/src/models/mod.rs index c59d060b19d..f8b8cf6d535 100644 --- a/quickwit-indexing/src/models/mod.rs +++ b/quickwit-indexing/src/models/mod.rs @@ -41,3 +41,6 @@ pub use packaged_split::{PackagedSplit, PackagedSplitBatch}; pub use publisher_message::{PublishOperation, PublisherMessage}; pub use raw_doc_batch::RawDocBatch; pub use scratch_directory::ScratchDirectory; + +#[derive(Debug, Copy, Clone)] +pub struct Observe; diff --git a/quickwit-proto/Cargo.toml b/quickwit-proto/Cargo.toml index f9c16e23daa..8cee06c688e 100644 --- a/quickwit-proto/Cargo.toml +++ b/quickwit-proto/Cargo.toml @@ -10,10 +10,10 @@ homepage = "https://quickwit.io/" documentation = "https://quickwit.io/docs/" [dependencies] -tonic = "0.6" prost = { version = "0.9", default-features = false, features = ["prost-derive"] } serde = { version = "1.0", features = ["derive"] } +tonic = {git="https://github.com/hyperium/tonic", rev="01e5be5"} [build-dependencies] -tonic-build = "0.6" +tonic-build = {git="https://github.com/hyperium/tonic", rev="01e5be5"} prost-build = "0.9" diff --git a/quickwit-proto/build.rs b/quickwit-proto/build.rs index fac140ff872..2dde28825ff 100644 --- a/quickwit-proto/build.rs +++ b/quickwit-proto/build.rs @@ -29,7 +29,6 @@ fn main() -> Result<(), Box> { ".", "#[derive(Serialize, Deserialize)]\n#[serde(rename_all = \"camelCase\")]", ) - .format(true) .out_dir("src/") .compile_with_config( prost_config, diff --git a/quickwit-proto/proto/cluster.proto b/quickwit-proto/proto/cluster.proto index a38fabfbd89..5638f8ff6d9 100644 --- a/quickwit-proto/proto/cluster.proto +++ b/quickwit-proto/proto/cluster.proto @@ -45,8 +45,8 @@ message Member { /// If true, it means self. bool is_self = 3; - /// member reincarnation - int64 generation = 4; + /// member reincarnation + uint64 generation = 4; } message ListMembersRequest { diff --git a/quickwit-proto/src/cluster.rs b/quickwit-proto/src/cluster.rs index c31b5b4742d..5840e701057 100644 --- a/quickwit-proto/src/cluster.rs +++ b/quickwit-proto/src/cluster.rs @@ -4,50 +4,54 @@ #[derive(Clone, PartialEq, ::prost::Message)] pub struct Member { //// Member ID. A string of the UUID. - #[prost(string, tag = "1")] + #[prost(string, tag="1")] pub id: ::prost::alloc::string::String, //// Cluster listen address. string of IP and port number. //// E.g. 127.0.0.1:5000 - #[prost(string, tag = "2")] + #[prost(string, tag="2")] pub listen_address: ::prost::alloc::string::String, //// If true, it means self. - #[prost(bool, tag = "3")] + #[prost(bool, tag="3")] pub is_self: bool, //// member reincarnation - #[prost(int64, tag = "4")] - pub generation: i64, + #[prost(uint64, tag="4")] + pub generation: u64, } #[derive(Serialize, Deserialize)] #[serde(rename_all = "camelCase")] #[derive(Clone, PartialEq, ::prost::Message)] -pub struct ListMembersRequest {} +pub struct ListMembersRequest { +} #[derive(Serialize, Deserialize)] #[serde(rename_all = "camelCase")] #[derive(Clone, PartialEq, ::prost::Message)] pub struct ListMembersResponse { - #[prost(message, repeated, tag = "1")] + #[prost(message, repeated, tag="1")] pub members: ::prost::alloc::vec::Vec, } #[derive(Serialize, Deserialize)] #[serde(rename_all = "camelCase")] #[derive(Clone, PartialEq, ::prost::Message)] -pub struct LeaveClusterRequest {} +pub struct LeaveClusterRequest { +} #[derive(Serialize, Deserialize)] #[serde(rename_all = "camelCase")] #[derive(Clone, PartialEq, ::prost::Message)] -pub struct LeaveClusterResponse {} +pub struct LeaveClusterResponse { +} #[derive(Serialize, Deserialize)] #[serde(rename_all = "camelCase")] #[derive(Clone, PartialEq, ::prost::Message)] -pub struct ClusterStateRequest {} +pub struct ClusterStateRequest { +} #[derive(Serialize, Deserialize)] #[serde(rename_all = "camelCase")] #[derive(Clone, PartialEq, ::prost::Message)] pub struct ClusterStateResponse { - #[prost(string, tag = "1")] + #[prost(string, tag="1")] pub state_serialized_json: ::prost::alloc::string::String, } -#[doc = r" Generated client implementations."] +/// Generated client implementations. pub mod cluster_service_client { #![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)] use tonic::codegen::*; @@ -56,7 +60,7 @@ pub mod cluster_service_client { inner: tonic::client::Grpc, } impl ClusterServiceClient { - #[doc = r" Attempt to create a new client by connecting to a given endpoint."] + /// Attempt to create a new client by connecting to a given endpoint. pub async fn connect(dst: D) -> Result where D: std::convert::TryInto, @@ -69,8 +73,8 @@ pub mod cluster_service_client { impl ClusterServiceClient where T: tonic::client::GrpcService, - T::ResponseBody: Body + Send + 'static, T::Error: Into, + T::ResponseBody: Default + Body + Send + 'static, ::Error: Into + Send, { pub fn new(inner: T) -> Self { @@ -89,85 +93,103 @@ pub mod cluster_service_client { >::ResponseBody, >, >, - >>::Error: - Into + Send + Sync, + , + >>::Error: Into + Send + Sync, { ClusterServiceClient::new(InterceptedService::new(inner, interceptor)) } - #[doc = r" Compress requests with `gzip`."] - #[doc = r""] - #[doc = r" This requires the server to support it otherwise it might respond with an"] - #[doc = r" error."] + /// Compress requests with `gzip`. + /// + /// This requires the server to support it otherwise it might respond with an + /// error. + #[must_use] pub fn send_gzip(mut self) -> Self { self.inner = self.inner.send_gzip(); self } - #[doc = r" Enable decompressing responses with `gzip`."] + /// Enable decompressing responses with `gzip`. + #[must_use] pub fn accept_gzip(mut self) -> Self { self.inner = self.inner.accept_gzip(); self } - #[doc = "/ Retrieves members of the cluster."] + //// Retrieves members of the cluster. pub async fn list_members( &mut self, request: impl tonic::IntoRequest, ) -> Result, tonic::Status> { - self.inner.ready().await.map_err(|e| { - tonic::Status::new( - tonic::Code::Unknown, - format!("Service was not ready: {}", e.into()), - ) - })?; + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::new( + tonic::Code::Unknown, + format!("Service was not ready: {}", e.into()), + ) + })?; let codec = tonic::codec::ProstCodec::default(); - let path = http::uri::PathAndQuery::from_static("/cluster.ClusterService/ListMembers"); + let path = http::uri::PathAndQuery::from_static( + "/cluster.ClusterService/ListMembers", + ); self.inner.unary(request.into_request(), path, codec).await } - #[doc = "/ Removes itself from the cluster."] - #[doc = "/ Removed node will be isolated from the cluster."] + //// Removes itself from the cluster. + //// Removed node will be isolated from the cluster. pub async fn leave_cluster( &mut self, request: impl tonic::IntoRequest, ) -> Result, tonic::Status> { - self.inner.ready().await.map_err(|e| { - tonic::Status::new( - tonic::Code::Unknown, - format!("Service was not ready: {}", e.into()), - ) - })?; + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::new( + tonic::Code::Unknown, + format!("Service was not ready: {}", e.into()), + ) + })?; let codec = tonic::codec::ProstCodec::default(); - let path = http::uri::PathAndQuery::from_static("/cluster.ClusterService/LeaveCluster"); + let path = http::uri::PathAndQuery::from_static( + "/cluster.ClusterService/LeaveCluster", + ); self.inner.unary(request.into_request(), path, codec).await } pub async fn cluster_state( &mut self, request: impl tonic::IntoRequest, ) -> Result, tonic::Status> { - self.inner.ready().await.map_err(|e| { - tonic::Status::new( - tonic::Code::Unknown, - format!("Service was not ready: {}", e.into()), - ) - })?; + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::new( + tonic::Code::Unknown, + format!("Service was not ready: {}", e.into()), + ) + })?; let codec = tonic::codec::ProstCodec::default(); - let path = http::uri::PathAndQuery::from_static("/cluster.ClusterService/ClusterState"); + let path = http::uri::PathAndQuery::from_static( + "/cluster.ClusterService/ClusterState", + ); self.inner.unary(request.into_request(), path, codec).await } } } -#[doc = r" Generated server implementations."] +/// Generated server implementations. pub mod cluster_service_server { #![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)] use tonic::codegen::*; - #[doc = "Generated trait containing gRPC methods that should be implemented for use with ClusterServiceServer."] + ///Generated trait containing gRPC methods that should be implemented for use with ClusterServiceServer. #[async_trait] pub trait ClusterService: Send + Sync + 'static { - #[doc = "/ Retrieves members of the cluster."] + //// Retrieves members of the cluster. async fn list_members( &self, request: tonic::Request, ) -> Result, tonic::Status>; - #[doc = "/ Removes itself from the cluster."] - #[doc = "/ Removed node will be isolated from the cluster."] + //// Removes itself from the cluster. + //// Removed node will be isolated from the cluster. async fn leave_cluster( &self, request: tonic::Request, @@ -186,7 +208,9 @@ pub mod cluster_service_server { struct _Inner(Arc); impl ClusterServiceServer { pub fn new(inner: T) -> Self { - let inner = Arc::new(inner); + Self::from_arc(Arc::new(inner)) + } + pub fn from_arc(inner: Arc) -> Self { let inner = _Inner(inner); Self { inner, @@ -194,7 +218,10 @@ pub mod cluster_service_server { send_compression_encodings: Default::default(), } } - pub fn with_interceptor(inner: T, interceptor: F) -> InterceptedService + pub fn with_interceptor( + inner: T, + interceptor: F, + ) -> InterceptedService where F: tonic::service::Interceptor, { @@ -208,9 +235,12 @@ pub mod cluster_service_server { B::Error: Into + Send + 'static, { type Response = http::Response; - type Error = Never; + type Error = std::convert::Infallible; type Future = BoxFuture; - fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll> { + fn poll_ready( + &mut self, + _cx: &mut Context<'_>, + ) -> Poll> { Poll::Ready(Ok(())) } fn call(&mut self, req: http::Request) -> Self::Future { @@ -219,17 +249,23 @@ pub mod cluster_service_server { "/cluster.ClusterService/ListMembers" => { #[allow(non_camel_case_types)] struct ListMembersSvc(pub Arc); - impl tonic::server::UnaryService - for ListMembersSvc - { + impl< + T: ClusterService, + > tonic::server::UnaryService + for ListMembersSvc { type Response = super::ListMembersResponse; - type Future = BoxFuture, tonic::Status>; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; fn call( &mut self, request: tonic::Request, ) -> Self::Future { let inner = self.0.clone(); - let fut = async move { (*inner).list_members(request).await }; + let fut = async move { + (*inner).list_members(request).await + }; Box::pin(fut) } } @@ -240,10 +276,11 @@ pub mod cluster_service_server { let inner = inner.0; let method = ListMembersSvc(inner); let codec = tonic::codec::ProstCodec::default(); - let mut grpc = tonic::server::Grpc::new(codec).apply_compression_config( - accept_compression_encodings, - send_compression_encodings, - ); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ); let res = grpc.unary(method, req).await; Ok(res) }; @@ -252,17 +289,23 @@ pub mod cluster_service_server { "/cluster.ClusterService/LeaveCluster" => { #[allow(non_camel_case_types)] struct LeaveClusterSvc(pub Arc); - impl tonic::server::UnaryService - for LeaveClusterSvc - { + impl< + T: ClusterService, + > tonic::server::UnaryService + for LeaveClusterSvc { type Response = super::LeaveClusterResponse; - type Future = BoxFuture, tonic::Status>; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; fn call( &mut self, request: tonic::Request, ) -> Self::Future { let inner = self.0.clone(); - let fut = async move { (*inner).leave_cluster(request).await }; + let fut = async move { + (*inner).leave_cluster(request).await + }; Box::pin(fut) } } @@ -273,10 +316,11 @@ pub mod cluster_service_server { let inner = inner.0; let method = LeaveClusterSvc(inner); let codec = tonic::codec::ProstCodec::default(); - let mut grpc = tonic::server::Grpc::new(codec).apply_compression_config( - accept_compression_encodings, - send_compression_encodings, - ); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ); let res = grpc.unary(method, req).await; Ok(res) }; @@ -285,17 +329,23 @@ pub mod cluster_service_server { "/cluster.ClusterService/ClusterState" => { #[allow(non_camel_case_types)] struct ClusterStateSvc(pub Arc); - impl tonic::server::UnaryService - for ClusterStateSvc - { + impl< + T: ClusterService, + > tonic::server::UnaryService + for ClusterStateSvc { type Response = super::ClusterStateResponse; - type Future = BoxFuture, tonic::Status>; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; fn call( &mut self, request: tonic::Request, ) -> Self::Future { let inner = self.0.clone(); - let fut = async move { (*inner).cluster_state(request).await }; + let fut = async move { + (*inner).cluster_state(request).await + }; Box::pin(fut) } } @@ -306,23 +356,28 @@ pub mod cluster_service_server { let inner = inner.0; let method = ClusterStateSvc(inner); let codec = tonic::codec::ProstCodec::default(); - let mut grpc = tonic::server::Grpc::new(codec).apply_compression_config( - accept_compression_encodings, - send_compression_encodings, - ); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ); let res = grpc.unary(method, req).await; Ok(res) }; Box::pin(fut) } - _ => Box::pin(async move { - Ok(http::Response::builder() - .status(200) - .header("grpc-status", "12") - .header("content-type", "application/grpc") - .body(empty_body()) - .unwrap()) - }), + _ => { + Box::pin(async move { + Ok( + http::Response::builder() + .status(200) + .header("grpc-status", "12") + .header("content-type", "application/grpc") + .body(empty_body()) + .unwrap(), + ) + }) + } } } } diff --git a/quickwit-proto/src/quickwit.rs b/quickwit-proto/src/quickwit.rs index 59a0674f594..bfb6e4a3b9f 100644 --- a/quickwit-proto/src/quickwit.rs +++ b/quickwit-proto/src/quickwit.rs @@ -5,37 +5,37 @@ #[derive(Clone, PartialEq, ::prost::Message)] pub struct SearchRequest { /// Index ID - #[prost(string, tag = "1")] + #[prost(string, tag="1")] pub index_id: ::prost::alloc::string::String, /// Query - #[prost(string, tag = "2")] + #[prost(string, tag="2")] pub query: ::prost::alloc::string::String, /// Fields to search on - #[prost(string, repeated, tag = "3")] + #[prost(string, repeated, tag="3")] pub search_fields: ::prost::alloc::vec::Vec<::prost::alloc::string::String>, /// Time filter - #[prost(int64, optional, tag = "4")] + #[prost(int64, optional, tag="4")] pub start_timestamp: ::core::option::Option, - #[prost(int64, optional, tag = "5")] + #[prost(int64, optional, tag="5")] pub end_timestamp: ::core::option::Option, /// Maximum number of hits to return. - #[prost(uint64, tag = "6")] + #[prost(uint64, tag="6")] pub max_hits: u64, /// First hit to return. Together with max_hits, this parameter /// can be used for pagination. /// /// E.g. /// The results with rank [start_offset..start_offset + max_hits) are returned. - #[prost(uint64, tag = "7")] + #[prost(uint64, tag="7")] pub start_offset: u64, /// Sort order - #[prost(enumeration = "SortOrder", optional, tag = "9")] + #[prost(enumeration="SortOrder", optional, tag="9")] pub sort_order: ::core::option::Option, /// Sort by fast field. If unset sort by docid - #[prost(string, optional, tag = "10")] + #[prost(string, optional, tag="10")] pub sort_by_field: ::core::option::Option<::prost::alloc::string::String>, /// json serialized aggregation_request - #[prost(string, optional, tag = "11")] + #[prost(string, optional, tag="11")] pub aggregation_request: ::core::option::Option<::prost::alloc::string::String>, } #[derive(Serialize, Deserialize)] @@ -43,20 +43,20 @@ pub struct SearchRequest { #[derive(Clone, PartialEq, ::prost::Message)] pub struct SearchResponse { /// Number of hits matching the query. - #[prost(uint64, tag = "1")] + #[prost(uint64, tag="1")] pub num_hits: u64, /// Matched hits - #[prost(message, repeated, tag = "2")] + #[prost(message, repeated, tag="2")] pub hits: ::prost::alloc::vec::Vec, /// Elapsed time to perform the request. This time is measured /// server-side and expressed in microseconds. - #[prost(uint64, tag = "3")] + #[prost(uint64, tag="3")] pub elapsed_time_micros: u64, /// The searcherrors that occured formatted as string. - #[prost(string, repeated, tag = "4")] + #[prost(string, repeated, tag="4")] pub errors: ::prost::alloc::vec::Vec<::prost::alloc::string::String>, /// Serialized aggregation response - #[prost(string, optional, tag = "5")] + #[prost(string, optional, tag="5")] pub aggregation: ::core::option::Option<::prost::alloc::string::String>, } #[derive(Serialize, Deserialize)] @@ -64,13 +64,13 @@ pub struct SearchResponse { #[derive(Clone, PartialEq, ::prost::Message)] pub struct SplitSearchError { /// The searcherror that occured formatted as string. - #[prost(string, tag = "1")] + #[prost(string, tag="1")] pub error: ::prost::alloc::string::String, /// Split id that failed. - #[prost(string, tag = "2")] + #[prost(string, tag="2")] pub split_id: ::prost::alloc::string::String, /// Flag to indicate if the error can be considered a retryable error - #[prost(bool, tag = "3")] + #[prost(bool, tag="3")] pub retryable_error: bool, } #[derive(Serialize, Deserialize)] @@ -79,18 +79,18 @@ pub struct SplitSearchError { pub struct LeafSearchRequest { /// Search request. This is a perfect copy of the original search request, /// that was sent to root apart from the start_offset & max_hits params. - #[prost(message, optional, tag = "1")] + #[prost(message, optional, tag="1")] pub search_request: ::core::option::Option, /// Index split ids to apply the query on. /// This ids are resolved from the index_uri defined in the search_request. - #[prost(message, repeated, tag = "4")] + #[prost(message, repeated, tag="4")] pub split_offsets: ::prost::alloc::vec::Vec, /// `DocMapper` as json serialized trait. - #[prost(string, tag = "5")] + #[prost(string, tag="5")] pub doc_mapper: ::prost::alloc::string::String, /// Index URI. The index URI defines the location of the storage that contains the /// split files. - #[prost(string, tag = "6")] + #[prost(string, tag="6")] pub index_uri: ::prost::alloc::string::String, } #[derive(Serialize, Deserialize)] @@ -99,13 +99,13 @@ pub struct LeafSearchRequest { pub struct SplitIdAndFooterOffsets { /// Index split id to apply the query on. /// This id is resolved from the index_uri defined in the search_request. - #[prost(string, tag = "1")] + #[prost(string, tag="1")] pub split_id: ::prost::alloc::string::String, /// The offset of the start of footer in the split bundle. The footer contains the file bundle metadata and the hotcache. - #[prost(uint64, tag = "2")] + #[prost(uint64, tag="2")] pub split_footer_start: u64, /// The offset of the end of the footer in split bundle. The footer contains the file bundle metada and the hotcache. - #[prost(uint64, tag = "3")] + #[prost(uint64, tag="3")] pub split_footer_end: u64, } #[derive(Serialize, Deserialize)] @@ -113,10 +113,10 @@ pub struct SplitIdAndFooterOffsets { #[derive(Clone, PartialEq, ::prost::Message)] pub struct Hit { /// The actual content of the hit/ - #[prost(string, tag = "1")] + #[prost(string, tag="1")] pub json: ::prost::alloc::string::String, /// The partial hit (ie: the sorting field + the document address) - #[prost(message, optional, tag = "2")] + #[prost(message, optional, tag="2")] pub partial_hit: ::core::option::Option, } /// A partial hit, is a hit for which we have not fetch the content yet. @@ -137,16 +137,16 @@ pub struct PartialHit { /// - the split_id, /// - the segment_ord, /// - the doc id. - #[prost(uint64, tag = "1")] + #[prost(uint64, tag="1")] pub sorting_field_value: u64, - #[prost(string, tag = "2")] + #[prost(string, tag="2")] pub split_id: ::prost::alloc::string::String, /// (segment_ord, doc) form a tantivy DocAddress, which is sufficient to identify a document /// within a split - #[prost(uint32, tag = "3")] + #[prost(uint32, tag="3")] pub segment_ord: u32, /// The DocId identifies a unique document at the scale of a tantivy segment. - #[prost(uint32, tag = "4")] + #[prost(uint32, tag="4")] pub doc_id: u32, } #[derive(Serialize, Deserialize)] @@ -154,20 +154,20 @@ pub struct PartialHit { #[derive(Clone, PartialEq, ::prost::Message)] pub struct LeafSearchResponse { /// Total number of documents matched by the query. - #[prost(uint64, tag = "1")] + #[prost(uint64, tag="1")] pub num_hits: u64, /// List of the best top-K candidates for the given leaf query. - #[prost(message, repeated, tag = "2")] + #[prost(message, repeated, tag="2")] pub partial_hits: ::prost::alloc::vec::Vec, /// The list of splits that failed. LeafSearchResponse can be an aggregation of results, so there may be multiple. - #[prost(message, repeated, tag = "3")] + #[prost(message, repeated, tag="3")] pub failed_splits: ::prost::alloc::vec::Vec, /// Total number of splits the leaf(s) were in charge of. /// num_attempted_splits = num_successful_splits + num_failed_splits. - #[prost(uint64, tag = "4")] + #[prost(uint64, tag="4")] pub num_attempted_splits: u64, /// json serialized intermediate aggregation_result. - #[prost(string, optional, tag = "5")] + #[prost(string, optional, tag="5")] pub intermediate_aggregation_result: ::core::option::Option<::prost::alloc::string::String>, } #[derive(Serialize, Deserialize)] @@ -175,19 +175,19 @@ pub struct LeafSearchResponse { #[derive(Clone, PartialEq, ::prost::Message)] pub struct FetchDocsRequest { /// Request fetching the content of a given list of partial_hits. - #[prost(message, repeated, tag = "1")] + #[prost(message, repeated, tag="1")] pub partial_hits: ::prost::alloc::vec::Vec, /// Index ID - #[prost(string, tag = "2")] + #[prost(string, tag="2")] pub index_id: ::prost::alloc::string::String, /// Split footer offsets. They are required for fetch docs to /// fetch the document content in two reads, when the footer is not /// cached. - #[prost(message, repeated, tag = "3")] + #[prost(message, repeated, tag="3")] pub split_offsets: ::prost::alloc::vec::Vec, /// Index URI. The index URI defines the location of the storage that contains the /// split files. - #[prost(string, tag = "4")] + #[prost(string, tag="4")] pub index_uri: ::prost::alloc::string::String, } #[derive(Serialize, Deserialize)] @@ -195,7 +195,7 @@ pub struct FetchDocsRequest { #[derive(Clone, PartialEq, ::prost::Message)] pub struct FetchDocsResponse { /// List of complete hits. - #[prost(message, repeated, tag = "1")] + #[prost(message, repeated, tag="1")] pub hits: ::prost::alloc::vec::Vec, } #[derive(Serialize, Deserialize)] @@ -203,27 +203,27 @@ pub struct FetchDocsResponse { #[derive(Clone, PartialEq, ::prost::Message)] pub struct SearchStreamRequest { /// Index ID - #[prost(string, tag = "1")] + #[prost(string, tag="1")] pub index_id: ::prost::alloc::string::String, /// Query - #[prost(string, tag = "2")] + #[prost(string, tag="2")] pub query: ::prost::alloc::string::String, /// Fields to search on - #[prost(string, repeated, tag = "3")] + #[prost(string, repeated, tag="3")] pub search_fields: ::prost::alloc::vec::Vec<::prost::alloc::string::String>, /// The time filter is interpreted as a semi-open interval. [start, end) - #[prost(int64, optional, tag = "4")] + #[prost(int64, optional, tag="4")] pub start_timestamp: ::core::option::Option, - #[prost(int64, optional, tag = "5")] + #[prost(int64, optional, tag="5")] pub end_timestamp: ::core::option::Option, /// Name of the fast field to extract - #[prost(string, tag = "6")] + #[prost(string, tag="6")] pub fast_field: ::prost::alloc::string::String, /// The output format - #[prost(enumeration = "OutputFormat", tag = "7")] + #[prost(enumeration="OutputFormat", tag="7")] pub output_format: i32, /// The field by which we want to partition - #[prost(string, optional, tag = "9")] + #[prost(string, optional, tag="9")] pub partition_by_field: ::core::option::Option<::prost::alloc::string::String>, } #[derive(Serialize, Deserialize)] @@ -232,18 +232,18 @@ pub struct SearchStreamRequest { pub struct LeafSearchStreamRequest { /// Stream request. This is a perfect copy of the original stream request, /// that was sent to root. - #[prost(message, optional, tag = "1")] + #[prost(message, optional, tag="1")] pub request: ::core::option::Option, /// Index split ids to apply the query on. /// This ids are resolved from the index_uri defined in the stream request. - #[prost(message, repeated, tag = "2")] + #[prost(message, repeated, tag="2")] pub split_offsets: ::prost::alloc::vec::Vec, /// `DocMapper` as json serialized trait. - #[prost(string, tag = "5")] + #[prost(string, tag="5")] pub doc_mapper: ::prost::alloc::string::String, /// Index URI. The index URI defines the location of the storage that contains the /// split files. - #[prost(string, tag = "6")] + #[prost(string, tag="6")] pub index_uri: ::prost::alloc::string::String, } #[derive(Serialize, Deserialize)] @@ -251,10 +251,10 @@ pub struct LeafSearchStreamRequest { #[derive(Clone, PartialEq, ::prost::Message)] pub struct LeafSearchStreamResponse { /// Row of data serialized in bytes. - #[prost(bytes = "vec", tag = "1")] + #[prost(bytes="vec", tag="1")] pub data: ::prost::alloc::vec::Vec, /// Split id. - #[prost(string, tag = "2")] + #[prost(string, tag="2")] pub split_id: ::prost::alloc::string::String, } #[derive(Serialize, Deserialize)] @@ -285,7 +285,7 @@ pub enum OutputFormat { //// ClickHouseRowBinary = 1, } -#[doc = r" Generated client implementations."] +/// Generated client implementations. pub mod search_service_client { #![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)] use tonic::codegen::*; @@ -294,7 +294,7 @@ pub mod search_service_client { inner: tonic::client::Grpc, } impl SearchServiceClient { - #[doc = r" Attempt to create a new client by connecting to a given endpoint."] + /// Attempt to create a new client by connecting to a given endpoint. pub async fn connect(dst: D) -> Result where D: std::convert::TryInto, @@ -307,8 +307,8 @@ pub mod search_service_client { impl SearchServiceClient where T: tonic::client::GrpcService, - T::ResponseBody: Body + Send + 'static, T::Error: Into, + T::ResponseBody: Default + Body + Send + 'static, ::Error: Into + Send, { pub fn new(inner: T) -> Self { @@ -327,141 +327,165 @@ pub mod search_service_client { >::ResponseBody, >, >, - >>::Error: - Into + Send + Sync, + , + >>::Error: Into + Send + Sync, { SearchServiceClient::new(InterceptedService::new(inner, interceptor)) } - #[doc = r" Compress requests with `gzip`."] - #[doc = r""] - #[doc = r" This requires the server to support it otherwise it might respond with an"] - #[doc = r" error."] + /// Compress requests with `gzip`. + /// + /// This requires the server to support it otherwise it might respond with an + /// error. + #[must_use] pub fn send_gzip(mut self) -> Self { self.inner = self.inner.send_gzip(); self } - #[doc = r" Enable decompressing responses with `gzip`."] + /// Enable decompressing responses with `gzip`. + #[must_use] pub fn accept_gzip(mut self) -> Self { self.inner = self.inner.accept_gzip(); self } - #[doc = " Root search API."] - #[doc = " This RPC identifies the set of splits on which the query should run on,"] - #[doc = " and dispatch the several calls to `LeafSearch`."] - #[doc = ""] - #[doc = " It is also in charge of merging back the results."] + /// Root search API. + /// This RPC identifies the set of splits on which the query should run on, + /// and dispatch the several calls to `LeafSearch`. + /// + /// It is also in charge of merging back the results. pub async fn root_search( &mut self, request: impl tonic::IntoRequest, ) -> Result, tonic::Status> { - self.inner.ready().await.map_err(|e| { - tonic::Status::new( - tonic::Code::Unknown, - format!("Service was not ready: {}", e.into()), - ) - })?; + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::new( + tonic::Code::Unknown, + format!("Service was not ready: {}", e.into()), + ) + })?; let codec = tonic::codec::ProstCodec::default(); - let path = http::uri::PathAndQuery::from_static("/quickwit.SearchService/RootSearch"); + let path = http::uri::PathAndQuery::from_static( + "/quickwit.SearchService/RootSearch", + ); self.inner.unary(request.into_request(), path, codec).await } - #[doc = " Perform a leaf search on a given set of splits."] - #[doc = ""] - #[doc = " It is like a regular search except that:"] - #[doc = " - the node should perform the search locally instead of dispatching"] - #[doc = " it to other nodes."] - #[doc = " - it should be applied on the given subset of splits"] - #[doc = " - Hit content is not fetched, and we instead return so called `PartialHit`."] + /// Perform a leaf search on a given set of splits. + /// + /// It is like a regular search except that: + /// - the node should perform the search locally instead of dispatching + /// it to other nodes. + /// - it should be applied on the given subset of splits + /// - Hit content is not fetched, and we instead return so called `PartialHit`. pub async fn leaf_search( &mut self, request: impl tonic::IntoRequest, ) -> Result, tonic::Status> { - self.inner.ready().await.map_err(|e| { - tonic::Status::new( - tonic::Code::Unknown, - format!("Service was not ready: {}", e.into()), - ) - })?; + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::new( + tonic::Code::Unknown, + format!("Service was not ready: {}", e.into()), + ) + })?; let codec = tonic::codec::ProstCodec::default(); - let path = http::uri::PathAndQuery::from_static("/quickwit.SearchService/LeafSearch"); + let path = http::uri::PathAndQuery::from_static( + "/quickwit.SearchService/LeafSearch", + ); self.inner.unary(request.into_request(), path, codec).await } - #[doc = "/ Fetches the documents contents from the document store."] - #[doc = "/ This methods takes `PartialHit`s and returns `Hit`s."] + //// Fetches the documents contents from the document store. + //// This methods takes `PartialHit`s and returns `Hit`s. pub async fn fetch_docs( &mut self, request: impl tonic::IntoRequest, ) -> Result, tonic::Status> { - self.inner.ready().await.map_err(|e| { - tonic::Status::new( - tonic::Code::Unknown, - format!("Service was not ready: {}", e.into()), - ) - })?; + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::new( + tonic::Code::Unknown, + format!("Service was not ready: {}", e.into()), + ) + })?; let codec = tonic::codec::ProstCodec::default(); - let path = http::uri::PathAndQuery::from_static("/quickwit.SearchService/FetchDocs"); + let path = http::uri::PathAndQuery::from_static( + "/quickwit.SearchService/FetchDocs", + ); self.inner.unary(request.into_request(), path, codec).await } - #[doc = " Perform a leaf stream on a given set of splits."] + /// Perform a leaf stream on a given set of splits. pub async fn leaf_search_stream( &mut self, request: impl tonic::IntoRequest, ) -> Result< - tonic::Response>, - tonic::Status, - > { - self.inner.ready().await.map_err(|e| { - tonic::Status::new( - tonic::Code::Unknown, - format!("Service was not ready: {}", e.into()), - ) - })?; - let codec = tonic::codec::ProstCodec::default(); - let path = - http::uri::PathAndQuery::from_static("/quickwit.SearchService/LeafSearchStream"); + tonic::Response< + tonic::codec::Streaming, + >, + tonic::Status, + > { self.inner - .server_streaming(request.into_request(), path, codec) + .ready() .await + .map_err(|e| { + tonic::Status::new( + tonic::Code::Unknown, + format!("Service was not ready: {}", e.into()), + ) + })?; + let codec = tonic::codec::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static( + "/quickwit.SearchService/LeafSearchStream", + ); + self.inner.server_streaming(request.into_request(), path, codec).await } } } -#[doc = r" Generated server implementations."] +/// Generated server implementations. pub mod search_service_server { #![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)] use tonic::codegen::*; - #[doc = "Generated trait containing gRPC methods that should be implemented for use with SearchServiceServer."] + ///Generated trait containing gRPC methods that should be implemented for use with SearchServiceServer. #[async_trait] pub trait SearchService: Send + Sync + 'static { - #[doc = " Root search API."] - #[doc = " This RPC identifies the set of splits on which the query should run on,"] - #[doc = " and dispatch the several calls to `LeafSearch`."] - #[doc = ""] - #[doc = " It is also in charge of merging back the results."] + /// Root search API. + /// This RPC identifies the set of splits on which the query should run on, + /// and dispatch the several calls to `LeafSearch`. + /// + /// It is also in charge of merging back the results. async fn root_search( &self, request: tonic::Request, ) -> Result, tonic::Status>; - #[doc = " Perform a leaf search on a given set of splits."] - #[doc = ""] - #[doc = " It is like a regular search except that:"] - #[doc = " - the node should perform the search locally instead of dispatching"] - #[doc = " it to other nodes."] - #[doc = " - it should be applied on the given subset of splits"] - #[doc = " - Hit content is not fetched, and we instead return so called `PartialHit`."] + /// Perform a leaf search on a given set of splits. + /// + /// It is like a regular search except that: + /// - the node should perform the search locally instead of dispatching + /// it to other nodes. + /// - it should be applied on the given subset of splits + /// - Hit content is not fetched, and we instead return so called `PartialHit`. async fn leaf_search( &self, request: tonic::Request, ) -> Result, tonic::Status>; - #[doc = "/ Fetches the documents contents from the document store."] - #[doc = "/ This methods takes `PartialHit`s and returns `Hit`s."] + //// Fetches the documents contents from the document store. + //// This methods takes `PartialHit`s and returns `Hit`s. async fn fetch_docs( &self, request: tonic::Request, ) -> Result, tonic::Status>; - #[doc = "Server streaming response type for the LeafSearchStream method."] - type LeafSearchStreamStream: futures_core::Stream> + ///Server streaming response type for the LeafSearchStream method. + type LeafSearchStreamStream: futures_core::Stream< + Item = Result, + > + Send + 'static; - #[doc = " Perform a leaf stream on a given set of splits."] + /// Perform a leaf stream on a given set of splits. async fn leaf_search_stream( &self, request: tonic::Request, @@ -476,7 +500,9 @@ pub mod search_service_server { struct _Inner(Arc); impl SearchServiceServer { pub fn new(inner: T) -> Self { - let inner = Arc::new(inner); + Self::from_arc(Arc::new(inner)) + } + pub fn from_arc(inner: Arc) -> Self { let inner = _Inner(inner); Self { inner, @@ -484,7 +510,10 @@ pub mod search_service_server { send_compression_encodings: Default::default(), } } - pub fn with_interceptor(inner: T, interceptor: F) -> InterceptedService + pub fn with_interceptor( + inner: T, + interceptor: F, + ) -> InterceptedService where F: tonic::service::Interceptor, { @@ -498,9 +527,12 @@ pub mod search_service_server { B::Error: Into + Send + 'static, { type Response = http::Response; - type Error = Never; + type Error = std::convert::Infallible; type Future = BoxFuture; - fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll> { + fn poll_ready( + &mut self, + _cx: &mut Context<'_>, + ) -> Poll> { Poll::Ready(Ok(())) } fn call(&mut self, req: http::Request) -> Self::Future { @@ -509,9 +541,15 @@ pub mod search_service_server { "/quickwit.SearchService/RootSearch" => { #[allow(non_camel_case_types)] struct RootSearchSvc(pub Arc); - impl tonic::server::UnaryService for RootSearchSvc { + impl< + T: SearchService, + > tonic::server::UnaryService + for RootSearchSvc { type Response = super::SearchResponse; - type Future = BoxFuture, tonic::Status>; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; fn call( &mut self, request: tonic::Request, @@ -528,10 +566,11 @@ pub mod search_service_server { let inner = inner.0; let method = RootSearchSvc(inner); let codec = tonic::codec::ProstCodec::default(); - let mut grpc = tonic::server::Grpc::new(codec).apply_compression_config( - accept_compression_encodings, - send_compression_encodings, - ); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ); let res = grpc.unary(method, req).await; Ok(res) }; @@ -540,9 +579,15 @@ pub mod search_service_server { "/quickwit.SearchService/LeafSearch" => { #[allow(non_camel_case_types)] struct LeafSearchSvc(pub Arc); - impl tonic::server::UnaryService for LeafSearchSvc { + impl< + T: SearchService, + > tonic::server::UnaryService + for LeafSearchSvc { type Response = super::LeafSearchResponse; - type Future = BoxFuture, tonic::Status>; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; fn call( &mut self, request: tonic::Request, @@ -559,10 +604,11 @@ pub mod search_service_server { let inner = inner.0; let method = LeafSearchSvc(inner); let codec = tonic::codec::ProstCodec::default(); - let mut grpc = tonic::server::Grpc::new(codec).apply_compression_config( - accept_compression_encodings, - send_compression_encodings, - ); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ); let res = grpc.unary(method, req).await; Ok(res) }; @@ -571,9 +617,15 @@ pub mod search_service_server { "/quickwit.SearchService/FetchDocs" => { #[allow(non_camel_case_types)] struct FetchDocsSvc(pub Arc); - impl tonic::server::UnaryService for FetchDocsSvc { + impl< + T: SearchService, + > tonic::server::UnaryService + for FetchDocsSvc { type Response = super::FetchDocsResponse; - type Future = BoxFuture, tonic::Status>; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; fn call( &mut self, request: tonic::Request, @@ -590,10 +642,11 @@ pub mod search_service_server { let inner = inner.0; let method = FetchDocsSvc(inner); let codec = tonic::codec::ProstCodec::default(); - let mut grpc = tonic::server::Grpc::new(codec).apply_compression_config( - accept_compression_encodings, - send_compression_encodings, - ); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ); let res = grpc.unary(method, req).await; Ok(res) }; @@ -602,20 +655,25 @@ pub mod search_service_server { "/quickwit.SearchService/LeafSearchStream" => { #[allow(non_camel_case_types)] struct LeafSearchStreamSvc(pub Arc); - impl - tonic::server::ServerStreamingService - for LeafSearchStreamSvc - { + impl< + T: SearchService, + > tonic::server::ServerStreamingService< + super::LeafSearchStreamRequest, + > for LeafSearchStreamSvc { type Response = super::LeafSearchStreamResponse; type ResponseStream = T::LeafSearchStreamStream; - type Future = - BoxFuture, tonic::Status>; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; fn call( &mut self, request: tonic::Request, ) -> Self::Future { let inner = self.0.clone(); - let fut = async move { (*inner).leaf_search_stream(request).await }; + let fut = async move { + (*inner).leaf_search_stream(request).await + }; Box::pin(fut) } } @@ -626,23 +684,28 @@ pub mod search_service_server { let inner = inner.0; let method = LeafSearchStreamSvc(inner); let codec = tonic::codec::ProstCodec::default(); - let mut grpc = tonic::server::Grpc::new(codec).apply_compression_config( - accept_compression_encodings, - send_compression_encodings, - ); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ); let res = grpc.server_streaming(method, req).await; Ok(res) }; Box::pin(fut) } - _ => Box::pin(async move { - Ok(http::Response::builder() - .status(200) - .header("grpc-status", "12") - .header("content-type", "application/grpc") - .body(empty_body()) - .unwrap()) - }), + _ => { + Box::pin(async move { + Ok( + http::Response::builder() + .status(200) + .header("grpc-status", "12") + .header("content-type", "application/grpc") + .body(empty_body()) + .unwrap(), + ) + }) + } } } } diff --git a/quickwit-search/src/lib.rs b/quickwit-search/src/lib.rs index 76b93fe1aa4..77d98d5ef68 100644 --- a/quickwit-search/src/lib.rs +++ b/quickwit-search/src/lib.rs @@ -42,10 +42,12 @@ pub type Result = std::result::Result; use std::cmp::Reverse; use std::ops::Range; +use std::sync::Arc; use anyhow::Context; use itertools::Itertools; -use quickwit_config::build_doc_mapper; +use quickwit_cluster::Cluster; +use quickwit_config::{build_doc_mapper, QuickwitConfig, SEARCHER_CONFIG_INSTANCE}; use quickwit_doc_mapper::tag_pruning::extract_tags_from_query; use quickwit_metastore::{Metastore, SplitMetadata, SplitState}; use quickwit_proto::{PartialHit, SearchRequest, SearchResponse, SplitIdAndFooterOffsets}; @@ -204,6 +206,27 @@ pub async fn single_node_search( }) } +/// Starts a search node, aka a `searcher`. +pub async fn start_searcher_service( + quickwit_config: &QuickwitConfig, + metastore: Arc, + storage_uri_resolver: StorageUriResolver, + cluster: Arc, +) -> anyhow::Result> { + SEARCHER_CONFIG_INSTANCE + .set(quickwit_config.searcher_config.clone()) + .expect("could not set searcher config in global once cell"); + let client_pool = SearchClientPool::create_and_keep_updated(cluster).await?; + let cluster_client = ClusterClient::new(client_pool.clone()); + let search_service = Arc::new(SearchServiceImpl::new( + metastore, + storage_uri_resolver, + cluster_client, + client_pool, + )); + Ok(search_service) +} + #[cfg(test)] mod tests { diff --git a/quickwit-search/src/search_client_pool.rs b/quickwit-search/src/search_client_pool.rs index 870292739f6..16cfc0410fb 100644 --- a/quickwit-search/src/search_client_pool.rs +++ b/quickwit-search/src/search_client_pool.rs @@ -25,7 +25,7 @@ use std::net::SocketAddr; use std::sync::{Arc, RwLock}; use http::Uri; -use quickwit_cluster::cluster::Cluster; +use quickwit_cluster::Cluster; use quickwit_proto::tonic; use tokio_stream::StreamExt; use tonic::transport::Endpoint; @@ -315,7 +315,7 @@ mod tests { use std::time::Duration; use itertools::Itertools; - use quickwit_cluster::cluster::{create_cluster_for_test, grpc_addr_from_listen_addr_for_test}; + use quickwit_cluster::{create_cluster_for_test, grpc_addr_from_listen_addr_for_test}; use super::create_search_service_client; use crate::root::SearchJob; diff --git a/quickwit-serve/Cargo.toml b/quickwit-serve/Cargo.toml index 07ad74630d4..6c99648fa0d 100644 --- a/quickwit-serve/Cargo.toml +++ b/quickwit-serve/Cargo.toml @@ -25,6 +25,7 @@ quickwit-common = { version = "0.2.1", path = "../quickwit-common" } quickwit-metastore = { version = "0.2.1", path = "../quickwit-metastore" } quickwit-telemetry = { version = "0.2.1", path = "../quickwit-telemetry" } quickwit-directories = { version = "0.2.1", path = "../quickwit-directories" } +quickwit-indexing = { version = "0.2.1", path = "../quickwit-indexing" } thiserror = "1" async-trait = "0.1" termcolor = "1" diff --git a/quickwit-serve/src/cluster_api/grpc_adapter.rs b/quickwit-serve/src/cluster_api/grpc_adapter.rs index 96f2979afce..20b36cb322c 100644 --- a/quickwit-serve/src/cluster_api/grpc_adapter.rs +++ b/quickwit-serve/src/cluster_api/grpc_adapter.rs @@ -20,7 +20,7 @@ use std::sync::Arc; use async_trait::async_trait; -use quickwit_cluster::service::{ClusterService, ClusterServiceImpl}; +use quickwit_cluster::ClusterService; use quickwit_proto::{cluster_service_server as grpc, tonic}; use crate::error::convert_to_grpc_result; @@ -28,8 +28,8 @@ use crate::error::convert_to_grpc_result; #[derive(Clone)] pub struct GrpcClusterAdapter(Arc); -impl From> for GrpcClusterAdapter { - fn from(cluster_service_arc: Arc) -> Self { +impl From> for GrpcClusterAdapter { + fn from(cluster_service_arc: Arc) -> Self { GrpcClusterAdapter(cluster_service_arc) } } diff --git a/quickwit-serve/src/cluster_api/rest_handler.rs b/quickwit-serve/src/cluster_api/rest_handler.rs index a87c48d6b2b..1d8fdd2bbb2 100644 --- a/quickwit-serve/src/cluster_api/rest_handler.rs +++ b/quickwit-serve/src/cluster_api/rest_handler.rs @@ -20,16 +20,15 @@ use std::convert::Infallible; use std::sync::Arc; -use quickwit_cluster::error::ClusterError; -use quickwit_cluster::service::ClusterService; +use quickwit_cluster::{ClusterError, ClusterService}; use serde::Deserialize; use warp::{Filter, Rejection}; use crate::Format; /// Cluster handler. -pub fn cluster_handler( - cluster_service: Arc, +pub fn cluster_handler( + cluster_service: Arc, ) -> impl Filter + Clone { list_members_filter() .and(warp::any().map(move || cluster_service.clone())) @@ -53,17 +52,17 @@ fn list_members_filter( .and(serde_qs::warp::query(serde_qs::Config::default())) } -async fn list_members( +async fn list_members( request: ListMembersRequestQueryString, - cluster_service: Arc, + cluster_service: Arc, ) -> Result { Ok(request .format .make_rest_reply(list_members_endpoint(&*cluster_service).await)) } -async fn list_members_endpoint( - cluster_service: &TClusterService, +async fn list_members_endpoint( + cluster_service: &dyn ClusterService, ) -> Result { let list_members_req = quickwit_proto::ListMembersRequest {}; let list_members_resp = cluster_service.list_members(list_members_req).await?; diff --git a/quickwit-serve/src/error.rs b/quickwit-serve/src/error.rs index 2481d8ee770..3affcbdaee8 100644 --- a/quickwit-serve/src/error.rs +++ b/quickwit-serve/src/error.rs @@ -17,10 +17,12 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . -use std::error::Error; +use std::convert::Infallible; +use std::fmt; use quickwit_actors::AskError; -use quickwit_cluster::error::ClusterError; +use quickwit_cluster::ClusterError; +use quickwit_indexing::IndexingServerError; use quickwit_proto::tonic; use quickwit_search::SearchError; use warp::http; @@ -79,7 +81,19 @@ impl ServiceError for ClusterError { } } -impl ServiceError for AskError { +impl ServiceError for IndexingServerError { + fn status_code(&self) -> ServiceErrorCode { + match self { + Self::MissingPipeline { .. } => ServiceErrorCode::NotFound, + Self::PipelineAlreadyExists { .. } => ServiceErrorCode::BadRequest, + Self::StorageError(_) => ServiceErrorCode::Internal, + Self::MetastoreError(_) => ServiceErrorCode::Internal, + Self::InvalidParams(_) => ServiceErrorCode::BadRequest, + } + } +} + +impl ServiceError for AskError { fn status_code(&self) -> ServiceErrorCode { match self { AskError::MessageNotDelivered => ServiceErrorCode::Internal, @@ -89,6 +103,12 @@ impl ServiceError for AskError { } } +impl ServiceError for Infallible { + fn status_code(&self) -> ServiceErrorCode { + unreachable!() + } +} + pub(crate) trait ServiceError: ToString { fn grpc_error(&self) -> tonic::Status { let grpc_code = self.status_code().to_grpc_status_code(); diff --git a/quickwit-serve/src/format.rs b/quickwit-serve/src/format.rs index 68a8e062933..44f64fd023a 100644 --- a/quickwit-serve/src/format.rs +++ b/quickwit-serve/src/format.rs @@ -117,4 +117,18 @@ impl Format { Err(err) => self.make_reply_for_err(err), } } + + pub(crate) fn make_rest_reply_non_serializable_error( + self, + result: Result, + ) -> WithStatus> + where + T: serde::Serialize, + E: ServiceError + ToString, + { + self.make_rest_reply(result.map_err(|err| FormatError { + code: err.status_code(), + error: err.to_string(), + })) + } } diff --git a/quickwit-serve/src/grpc.rs b/quickwit-serve/src/grpc.rs index 62bcf2fdf63..8fabbb47839 100644 --- a/quickwit-serve/src/grpc.rs +++ b/quickwit-serve/src/grpc.rs @@ -27,19 +27,25 @@ use tracing::*; use crate::cluster_api::GrpcClusterAdapter; use crate::search_api::GrpcSearchAdapter; +use crate::QuickwitServices; /// Start gRPC service given a gRPC address and a search service and cluster service. -pub async fn start_grpc_service( +pub(crate) async fn start_grpc_server( grpc_addr: SocketAddr, - search_service: GrpcSearchAdapter, - cluster_service: GrpcClusterAdapter, + quickwit_services: &QuickwitServices, ) -> anyhow::Result<()> { info!(grpc_addr=?grpc_addr, "Start gRPC service."); - Server::builder() - .add_service(ClusterServiceServer::new(cluster_service)) - .add_service(SearchServiceServer::new(search_service)) - .serve(grpc_addr) - .await?; + let mut server = Server::builder(); + + let grpc_cluster_service = GrpcClusterAdapter::from(quickwit_services.cluster_service.clone()); + let mut server_router = server.add_service(ClusterServiceServer::new(grpc_cluster_service)); + + if let Some(search_service) = quickwit_services.search_service.clone() { + let grpc_search_service = GrpcSearchAdapter::from(search_service); + server_router = server_router.add_service(SearchServiceServer::new(grpc_search_service)); + } + + server_router.serve(grpc_addr).await?; Ok(()) } diff --git a/quickwit-serve/src/indexing_api/mod.rs b/quickwit-serve/src/indexing_api/mod.rs new file mode 100644 index 00000000000..649ad7e18be --- /dev/null +++ b/quickwit-serve/src/indexing_api/mod.rs @@ -0,0 +1,22 @@ +// Copyright (C) 2021 Quickwit, Inc. +// +// Quickwit is offered under the AGPL v3.0 and as commercial software. +// For commercial licensing, contact us at hello@quickwit.io. +// +// AGPL: +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +mod rest_handler; + +pub use rest_handler::indexing_get_handler; diff --git a/quickwit-serve/src/indexing_api/rest_handler.rs b/quickwit-serve/src/indexing_api/rest_handler.rs new file mode 100644 index 00000000000..76bfd286f3c --- /dev/null +++ b/quickwit-serve/src/indexing_api/rest_handler.rs @@ -0,0 +1,47 @@ +// Copyright (C) 2021 Quickwit, Inc. +// +// Quickwit is offered under the AGPL v3.0 and as commercial software. +// For commercial licensing, contact us at hello@quickwit.io. +// +// AGPL: +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +use std::convert::Infallible; + +use quickwit_actors::Mailbox; +use quickwit_indexing::actors::IndexingServer; +use quickwit_indexing::models::Observe; +use warp::{Filter, Rejection}; + +use crate::format::Format; +use crate::require; + +async fn indexing_endpoint( + indexing_service_mailbox: Mailbox, +) -> Result { + let obs = indexing_service_mailbox.ask(Observe).await; + Ok(Format::PrettyJson.make_rest_reply_non_serializable_error(obs)) +} + +fn indexing_get_filter() -> impl Filter + Clone { + warp::path!("api" / "v1" / "indexing").and(warp::get()) +} + +pub fn indexing_get_handler( + indexing_service_mailbox_opt: Option>, +) -> impl Filter + Clone { + indexing_get_filter() + .and(require(indexing_service_mailbox_opt)) + .and_then(indexing_endpoint) +} diff --git a/quickwit-serve/src/lib.rs b/quickwit-serve/src/lib.rs index a82392014be..7cca0c29e1c 100644 --- a/quickwit-serve/src/lib.rs +++ b/quickwit-serve/src/lib.rs @@ -21,88 +21,129 @@ mod args; mod counters; mod error; mod format; + mod grpc; +mod rest; mod cluster_api; mod health_check_api; +mod indexing_api; mod search_api; -mod rest; - +use std::collections::HashSet; use std::sync::Arc; -use chrono::Utc; +use anyhow::bail; use format::Format; -use quickwit_cluster::cluster::{Cluster, Member}; -use quickwit_cluster::service::ClusterServiceImpl; -use quickwit_config::{QuickwitConfig, SEARCHER_CONFIG_INSTANCE}; -use quickwit_metastore::Metastore; -use quickwit_search::{ClusterClient, SearchClientPool, SearchService, SearchServiceImpl}; +use quickwit_actors::{Mailbox, Universe}; +use quickwit_cluster::ClusterService; +use quickwit_config::QuickwitConfig; +use quickwit_indexing::actors::IndexingServer; +use quickwit_indexing::start_indexer_service; +use quickwit_metastore::quickwit_metastore_uri_resolver; +use quickwit_search::{start_searcher_service, SearchService}; use quickwit_storage::quickwit_storage_uri_resolver; -use scuttlebutt::FailureDetectorConfig; -use tracing::info; +use warp::{Filter, Rejection}; pub use crate::args::ServeArgs; -use crate::cluster_api::GrpcClusterAdapter; pub use crate::counters::COUNTERS; -use crate::grpc::start_grpc_service; #[cfg(test)] use crate::rest::recover_fn; -use crate::rest::start_rest_service; -use crate::search_api::GrpcSearchAdapter; -/// Starts a search node, aka a `searcher`. -pub async fn run_searcher( - quickwit_config: QuickwitConfig, - metastore: Arc, +fn require( + val_opt: Option, +) -> impl Filter + Clone { + warp::any().and_then(move || { + let val_opt_clone = val_opt.clone(); + async move { + if let Some(val) = val_opt_clone { + Ok(val) + } else { + Err(warp::reject()) + } + } + }) +} + +#[derive(Debug, PartialEq, Eq, Copy, Hash, Clone)] +pub enum QuickwitService { + Indexer, + Searcher, +} + +impl TryFrom<&str> for QuickwitService { + type Error = anyhow::Error; + + fn try_from(service_str: &str) -> Result { + match service_str { + "indexer" => Ok(QuickwitService::Indexer), + "searcher" => Ok(QuickwitService::Searcher), + _ => { + bail!("Service `{service_str}` unknown"); + } + } + } +} +struct QuickwitServices { + pub cluster_service: Arc, + pub search_service: Option>, + pub indexer_service: Option>, +} + +pub async fn serve_quickwit( + config: &QuickwitConfig, + services: &HashSet, ) -> anyhow::Result<()> { - SEARCHER_CONFIG_INSTANCE - .set(quickwit_config.searcher_config.clone()) - .expect("could not set searcher config in global once cell"); - - let seed_nodes = quickwit_config - .seed_socket_addrs()? - .iter() - .map(|addr| addr.to_string()) - .collect::>(); - - let member = Member::new( - quickwit_config.node_id.clone(), - Utc::now().timestamp(), - quickwit_config.gossip_public_addr()?, - ); - let cluster = Arc::new(Cluster::new( - member, - quickwit_config.gossip_socket_addr()?, - quickwit_config.grpc_socket_addr()?, - &seed_nodes, - FailureDetectorConfig::default(), - )?); - let storage_uri_resolver = quickwit_storage_uri_resolver().clone(); - let client_pool = SearchClientPool::create_and_keep_updated(cluster.clone()).await?; - let cluster_client = ClusterClient::new(client_pool.clone()); - let search_service = Arc::new(SearchServiceImpl::new( - metastore, - storage_uri_resolver, - cluster_client, - client_pool, - )); - - let cluster_service = Arc::new(ClusterServiceImpl::new(cluster.clone())); - - let grpc_addr = quickwit_config.grpc_socket_addr()?; - let grpc_search_service = - GrpcSearchAdapter::from(search_service.clone() as Arc); - let grpc_cluster_service = GrpcClusterAdapter::from(cluster_service.clone()); - let grpc_server = start_grpc_service(grpc_addr, grpc_search_service, grpc_cluster_service); - - let rest_socket_addr = quickwit_config.rest_socket_addr()?; - let rest_server = start_rest_service(rest_socket_addr, search_service, cluster_service); - info!( - "Searcher ready to accept requests at http://{}/", - rest_socket_addr - ); + let metastore = quickwit_metastore_uri_resolver() + .resolve(&config.metastore_uri()) + .await?; + let storage_resolver = quickwit_storage_uri_resolver().clone(); + + let cluster_service = quickwit_cluster::start_cluster_service(config).await?; + + let universe = Universe::new(); + let indexer_service: Option> = + if services.contains(&QuickwitService::Indexer) { + let indexer_service = start_indexer_service( + &universe, + config, + metastore.clone(), + storage_resolver.clone(), + ) + .await?; + Some(indexer_service) + } else { + None + }; + + let search_service: Option> = + if services.contains(&QuickwitService::Searcher) { + let search_service = start_searcher_service( + config, + metastore.clone(), + storage_resolver, + cluster_service.clone(), + ) + .await?; + Some(search_service) + } else { + None + }; + + let quickwit_services = QuickwitServices { + cluster_service, + search_service, + indexer_service, + }; + + let rest_addr = config.rest_socket_addr()?; + let grpc_addr = config.grpc_socket_addr()?; + + let grpc_server = grpc::start_grpc_server(grpc_addr, &quickwit_services); + let rest_server = rest::start_rest_server(rest_addr, &quickwit_services); + tokio::try_join!(rest_server, grpc_server)?; + Ok(()) } @@ -117,11 +158,13 @@ mod tests { use quickwit_metastore::{IndexMetadata, MockMetastore, SplitState}; use quickwit_proto::search_service_server::SearchServiceServer; use quickwit_proto::{tonic, OutputFormat}; - use quickwit_search::{root_search_stream, MockSearchService, SearchError, SearchService}; + use quickwit_search::{ + root_search_stream, ClusterClient, MockSearchService, SearchClientPool, SearchError, + SearchService, + }; use tokio_stream::wrappers::UnboundedReceiverStream; use tonic::transport::Server; - use super::*; use crate::search_api::GrpcSearchAdapter; async fn start_test_server( diff --git a/quickwit-serve/src/rest.rs b/quickwit-serve/src/rest.rs index b1179d35d30..a9f75431ec8 100644 --- a/quickwit-serve/src/rest.rs +++ b/quickwit-serve/src/rest.rs @@ -18,11 +18,8 @@ // along with this program. If not, see . use std::net::SocketAddr; -use std::sync::Arc; -use quickwit_cluster::service::ClusterServiceImpl; use quickwit_common::metrics; -use quickwit_search::SearchServiceImpl; use tracing::info; use warp::{Filter, Rejection, Reply}; @@ -30,14 +27,14 @@ use crate::cluster_api::cluster_handler; use crate::error::ServiceErrorCode; use crate::format::FormatError; use crate::health_check_api::liveness_check_handler; +use crate::indexing_api::indexing_get_handler; use crate::search_api::{search_get_handler, search_post_handler, search_stream_handler}; -use crate::Format; +use crate::{Format, QuickwitServices}; /// Start REST service given a HTTP address and a search service. -pub async fn start_rest_service( +pub(crate) async fn start_rest_server( rest_addr: SocketAddr, - search_service: Arc, - cluster_service: Arc, + quickwit_services: &QuickwitServices, ) -> anyhow::Result<()> { info!(rest_addr=?rest_addr, "Starting REST service."); let request_counter = warp::log::custom(|_| { @@ -47,13 +44,21 @@ pub async fn start_rest_service( .and(warp::get()) .map(metrics::metrics_handler); let rest_routes = liveness_check_handler() - .or(cluster_handler(cluster_service)) - .or(search_get_handler(search_service.clone())) - .or(search_post_handler(search_service.clone())) - .or(search_stream_handler(search_service)) + .or(cluster_handler(quickwit_services.cluster_service.clone())) + .or(indexing_get_handler( + quickwit_services.indexer_service.clone(), + )) + .or(search_get_handler(quickwit_services.search_service.clone())) + .or(search_post_handler( + quickwit_services.search_service.clone(), + )) + .or(search_stream_handler( + quickwit_services.search_service.clone(), + )) .or(metrics_service) .with(request_counter) .recover(recover_fn); + info!("Searcher ready to accept requests at http://{rest_addr}/"); warp::serve(rest_routes).run(rest_addr).await; Ok(()) } diff --git a/quickwit-serve/src/search_api/rest_handler.rs b/quickwit-serve/src/search_api/rest_handler.rs index 5569ff9290a..f3ec496ed6f 100644 --- a/quickwit-serve/src/search_api/rest_handler.rs +++ b/quickwit-serve/src/search_api/rest_handler.rs @@ -33,7 +33,7 @@ use warp::hyper::StatusCode; use warp::{reply, Filter, Rejection, Reply}; use crate::error::ServiceError; -use crate::Format; +use crate::{require, Format}; fn sort_by_field_mini_dsl<'de, D>(deserializer: D) -> Result, D::Error> where D: Deserializer<'de> { @@ -125,10 +125,10 @@ fn get_proto_search_by(search_request: &SearchRequestQueryString) -> (Option( +async fn search_endpoint( index_id: String, search_request: SearchRequestQueryString, - search_service: &TSearchService, + search_service: &dyn SearchService, ) -> Result { let (sort_order, sort_by_field) = get_proto_search_by(&search_request); let search_request = quickwit_proto::SearchRequest { @@ -165,10 +165,10 @@ fn search_post_filter( .and(warp::body::json()) } -async fn search( +async fn search( index_id: String, search_request: SearchRequestQueryString, - search_service: Arc, + search_service: Arc, ) -> Result { info!(index_id = %index_id, request =? search_request, "search"); Ok(search_request @@ -179,30 +179,30 @@ async fn search( /// REST GET search handler. /// /// Parses the search request from the -pub fn search_get_handler( - search_service: Arc, +pub fn search_get_handler( + search_service_opt: Option>, ) -> impl Filter + Clone { search_get_filter() - .and(warp::any().map(move || search_service.clone())) + .and(require(search_service_opt)) .and_then(search) } /// REST POST search handler. /// /// Parses the search request from the -pub fn search_post_handler( - search_service: Arc, +pub fn search_post_handler( + search_service_opt: Option>, ) -> impl Filter + Clone { search_post_filter() - .and(warp::any().map(move || search_service.clone())) + .and(require(search_service_opt)) .and_then(search) } -pub fn search_stream_handler( - search_service: Arc, +pub fn search_stream_handler( + search_service_opt: Option>, ) -> impl Filter + Clone { search_stream_filter() - .and(warp::any().map(move || search_service.clone())) + .and(require(search_service_opt)) .and_then(search_stream) } @@ -232,10 +232,10 @@ struct SearchStreamRequestQueryString { pub partition_by_field: Option, } -async fn search_stream_endpoint( +async fn search_stream_endpoint( index_id: String, search_request: SearchStreamRequestQueryString, - search_service: &TSearchService, + search_service: &dyn SearchService, ) -> Result { let request = quickwit_proto::SearchStreamRequest { index_id, @@ -297,10 +297,10 @@ fn make_streaming_reply(result: Result) -> impl Reply reply::with_status(body, status_code) } -async fn search_stream( +async fn search_stream( index_id: String, request: SearchStreamRequestQueryString, - search_service: Arc, + search_service: Arc, ) -> Result { info!(index_id=%index_id,request=?request, "search_stream"); let content_type = match request.output_format { @@ -540,7 +540,7 @@ mod tests { async fn test_rest_search_api_route_invalid_key() -> anyhow::Result<()> { let mock_search_service = MockSearchService::new(); let rest_search_api_handler = - super::search_get_handler(Arc::new(mock_search_service)).recover(recover_fn); + super::search_get_handler(Some(Arc::new(mock_search_service))).recover(recover_fn); let resp = warp::test::request() .path("/api/v1/quickwit-demo-index/search?query=*&end_unix_timestamp=1450720000") .reply(&rest_search_api_handler) @@ -567,7 +567,7 @@ mod tests { }) }); let rest_search_api_handler = - super::search_get_handler(Arc::new(mock_search_service)).recover(recover_fn); + super::search_get_handler(Some(Arc::new(mock_search_service))).recover(recover_fn); let resp = warp::test::request() .path("/api/v1/quickwit-demo-index/search?query=*") .reply(&rest_search_api_handler) @@ -595,7 +595,7 @@ mod tests { )) .returning(|_| Ok(Default::default())); let rest_search_api_handler = - super::search_get_handler(Arc::new(mock_search_service)).recover(recover_fn); + super::search_get_handler(Some(Arc::new(mock_search_service))).recover(recover_fn); assert_eq!( warp::test::request() .path("/api/v1/quickwit-demo-index/search?query=*&start_offset=5&max_hits=30") @@ -616,7 +616,7 @@ mod tests { }) }); let rest_search_api_handler = - super::search_get_handler(Arc::new(mock_search_service)).recover(recover_fn); + super::search_get_handler(Some(Arc::new(mock_search_service))).recover(recover_fn); assert_eq!( warp::test::request() .path("/api/v1/index-does-not-exist/search?query=myfield:test") @@ -635,7 +635,7 @@ mod tests { .expect_root_search() .returning(|_| Err(SearchError::InternalError("ty".to_string()))); let rest_search_api_handler = - super::search_get_handler(Arc::new(mock_search_service)).recover(recover_fn); + super::search_get_handler(Some(Arc::new(mock_search_service))).recover(recover_fn); assert_eq!( warp::test::request() .path("/api/v1/index-does-not-exist/search?query=myfield:test") @@ -654,7 +654,7 @@ mod tests { .expect_root_search() .returning(|_| Err(SearchError::InvalidQuery("invalid query".to_string()))); let rest_search_api_handler = - super::search_get_handler(Arc::new(mock_search_service)).recover(recover_fn); + super::search_get_handler(Some(Arc::new(mock_search_service))).recover(recover_fn); assert_eq!( warp::test::request() .path("/api/v1/my-index/search?query=myfield:test") @@ -678,7 +678,7 @@ mod tests { ]))) }); let rest_search_stream_api_handler = - super::search_stream_handler(Arc::new(mock_search_service)).recover(recover_fn); + super::search_stream_handler(Some(Arc::new(mock_search_service))).recover(recover_fn); let response = warp::test::request() .path( "/api/v1/my-index/search/stream?query=obama&fast_field=external_id&\