feat(all): Task store
https://github.com/meilisearch/specifications/blob/develop/text/0060-refashion-updates-apis.md

- [x] The `update` resource is renamed `task`. The names of the existing API routes are updated accordingly.
- [x] Tasks are now also accessible independently of an index: `GET /tasks` and `GET /tasks/:taskUid`.
- [x] The task `uid` is no longer incremented per index; the sequence is generated globally.
- [x] A `task_not_found` error is introduced.
- [x] The format of the task object is updated (see the sketch after this list).
- [x] `updateId` becomes `uid`.
- [x] The attributes of an error appearing in a failed task are now contained in a dedicated `error` object.
- [x] `type` is no longer an object. It is now a string containing the value of the `name` field previously nested in the `type` object.
- [x] The possible values for the `type` field are reworked to be clearer and consistent with our naming rules.
- [x] A `details` object is added to contain specific information related to a task payload that was previously displayed in the nested `type` object. The previous `number` key is renamed `numberOfDocuments`.
- [x] An `indexUid` field is added to identify the index the task is performed on.
- [x] The `duration` format is updated to express an ISO 8601 duration.
- [x] The `processed` status changes to `succeeded`.
- [x] `startedProcessingAt` is renamed `startedAt`.
- [x] `processedAt` is renamed `finishedAt`.
- [x] `202 Accepted` responses that previously returned an `updateId` now return a summarized task object.
- [x] The `MEILI_MAX_UDB_SIZE` env var is renamed `MEILI_MAX_TASK_DB_SIZE`.
- [x] The `--max-udb-size` CLI option is renamed `--max-task-db-size`.
- [x] Task object lists are now returned under a `results` array.
- [x] Each operation on an index (creation, update, deletion) is now asynchronous and represented by a task.
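
For reference, a minimal sketch of the task object shape described by the items above. Field names follow the linked spec; the struct itself is illustrative, not the actual `meilisearch-lib` definition (where `status` and `type` are real enums):

```rust
use serde::Serialize;

// Illustrative only: the shape implied by the spec, not code from this diff.
#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
struct TaskView {
    uid: u64,          // globally sequenced, no longer per index
    index_uid: String, // the index the task is performed on
    status: String,    // "enqueued" | "processing" | "succeeded" | "failed"
    #[serde(rename = "type")]
    task_type: String, // e.g. "documentAddition" (illustrative value)
    details: Option<serde_json::Value>, // e.g. {"numberOfDocuments": 10}
    error: Option<serde_json::Value>,   // dedicated error object on failed tasks
    duration: Option<String>,           // ISO 8601 duration, e.g. "PT0.021S"
    enqueued_at: String,                // RFC 3339 timestamps
    started_at: Option<String>,
    finished_at: Option<String>,
}
```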

- [x] Restore Snapshots
- [x] Restore dumps of documents
- [x] Implement the dump of updates
- [x] Error handling
- [x] Fix stats
- [x] Restore the Analytics
- [x] [Add the new analytics](https://github.com/meilisearch/specifications/pull/92/files)
- [x] Fix tests
- [x] ~Deleting tasks when an index is deleted (see below)~ see #1891 instead
- [x] Improve details for documents addition and deletion tasks
- [ ] Add integration test
- [ ] Test task store filtering
- [x] Rename `UuidStore` to `IndexMetaStore`, and simplify the trait.
- [x] Fix task store initialization: fill pending queue from hard state
- [x] Synchronously return an error when creating an index with an invalid `index_uid`, and add a test
- [x] Tasks are returned in decreasing `uid` order + tests (on the index task route)
- [x] Summarized task view
- [x] Fix snapshot permissions

- #1889
- #1891
- #1892
- #1902
- #1906
- #1911
- #1914
- #1915
- #1916
- #1918
- #1924
- #1925
- #1926
- #1930
- #1936
- #1937
- #1942
- #1944
- #1945
- #1946
- #1947
- #1950
- #1951
- #1957
- #1959
- #1960
- #1961
- #1962
- #1964

- meilisearch/milli#414
- meilisearch/milli#409
- meilisearch/milli#406
- meilisearch/milli#418

- close #1687
- close #1786
- close #1940
- close #1948
- close #1949
- close #1932
- close #1956

- meilisearch/specifications#90
MarinPostma committed Dec 2, 2021
1 parent c9f3726 commit 582036a
Showing 88 changed files with 5,543 additions and 4,489 deletions.
821 changes: 422 additions & 399 deletions Cargo.lock

Large diffs are not rendered by default.

4 changes: 1 addition & 3 deletions Cargo.toml
```diff
@@ -4,7 +4,5 @@ members = [
     "meilisearch-error",
     "meilisearch-lib",
 ]
+resolver = "2"
 
-[patch.crates-io]
-pest = { git = "https://github.com/pest-parser/pest.git", rev = "51fd1d49f1041f7839975664ef71fe15c7dcaf67" }
-resolver = "2"
```
7 changes: 7 additions & 0 deletions meilisearch-error/Cargo.toml
```diff
@@ -6,4 +6,11 @@ edition = "2018"
 
 [dependencies]
 actix-http = "=3.0.0-beta.10"
 actix-web = "4.0.0-beta.9"
+proptest = { version = "1.0.0", optional = true }
+proptest-derive = { version = "0.3.0", optional = true }
+serde = { version = "1.0.130", features = ["derive"] }
+serde_json = "1.0.69"
+
+[features]
+test-traits = ["proptest", "proptest-derive"]
```
81 changes: 80 additions & 1 deletion meilisearch-error/src/lib.rs
```diff
@@ -1,8 +1,76 @@
 use std::fmt;
 
-use actix_http::http::StatusCode;
+use actix_http::{body::Body, http::StatusCode};
 use actix_web::{self as aweb, HttpResponseBuilder};
 use serde::{Deserialize, Serialize};
 
+#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
+#[serde(rename_all = "camelCase")]
+#[cfg_attr(feature = "test-traits", derive(proptest_derive::Arbitrary))]
+pub struct ResponseError {
+    #[serde(skip)]
+    #[cfg_attr(
+        feature = "test-traits",
+        proptest(strategy = "strategy::status_code_strategy()")
+    )]
+    code: StatusCode,
+    message: String,
+    #[serde(rename = "code")]
+    error_code: String,
+    #[serde(rename = "type")]
+    error_type: String,
+    #[serde(rename = "link")]
+    error_link: String,
+}
+
+impl ResponseError {
+    pub fn from_msg(message: String, code: Code) -> Self {
+        Self {
+            code: code.http(),
+            message,
+            error_code: code.err_code().error_name.to_string(),
+            error_type: code.type_(),
+            error_link: code.url(),
+        }
+    }
+}
+
+impl fmt::Display for ResponseError {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        self.message.fmt(f)
+    }
+}
+
+impl std::error::Error for ResponseError {}
+
+impl<T> From<T> for ResponseError
+where
+    T: ErrorCode,
+{
+    fn from(other: T) -> Self {
+        Self {
+            code: other.http_status(),
+            message: other.to_string(),
+            error_code: other.error_name(),
+            error_type: other.error_type(),
+            error_link: other.error_url(),
+        }
+    }
+}
+
+impl aweb::error::ResponseError for ResponseError {
+    fn error_response(&self) -> aweb::HttpResponse<Body> {
+        let json = serde_json::to_vec(self).unwrap();
+        HttpResponseBuilder::new(self.status_code())
+            .content_type("application/json")
+            .body(json)
+    }
+
+    fn status_code(&self) -> StatusCode {
+        self.code
+    }
+}
+
 pub trait ErrorCode: std::error::Error {
     fn error_code(&self) -> Code;
 
@@ -237,3 +305,14 @@ impl ErrCode {
         }
     }
 }
+
+#[cfg(feature = "test-traits")]
+mod strategy {
+    use proptest::strategy::Strategy;
+
+    use super::*;
+
+    pub(super) fn status_code_strategy() -> impl Strategy<Value = StatusCode> {
+        (100..999u16).prop_map(|i| StatusCode::from_u16(i).unwrap())
+    }
+}
```
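
With `ResponseError` now living in `meilisearch-error` next to the `ErrorCode` trait, any domain error converts into an HTTP error through the blanket `From` impl above. A hypothetical sketch — `TaskNotFound` and the `Code::TaskNotFound` variant are illustrative, and it assumes the trait's other methods (`http_status`, `error_name`, `error_type`, `error_url`) have defaults derived from `error_code()`, as the blanket impl suggests:

```rust
use meilisearch_error::{Code, ErrorCode, ResponseError};

// Hypothetical domain error, for illustration only.
#[derive(Debug, thiserror::Error)]
#[error("task `{0}` not found")]
struct TaskNotFound(u64);

impl ErrorCode for TaskNotFound {
    fn error_code(&self) -> Code {
        // Assumes a Code variant exists for the new task_not_found error.
        Code::TaskNotFound
    }
}

fn lookup(uid: u64) -> Result<(), ResponseError> {
    // The blanket `impl<T: ErrorCode> From<T> for ResponseError` fills in
    // the HTTP status, error code, type, and link from `error_code()`.
    Err(TaskNotFound(uid).into())
}
```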
15 changes: 9 additions & 6 deletions meilisearch-http/Cargo.toml
```diff
@@ -25,10 +25,13 @@ zip = { version = "0.5.13", optional = true }
 actix-cors = { git = "https://github.com/MarinPostma/actix-extras.git", rev = "963ac94d" }
 actix-web = { version = "4.0.0-beta.9", features = ["rustls"] }
 actix-web-static-files = { git = "https://github.com/MarinPostma/actix-web-static-files.git", rev = "39d8006", optional = true }
+# TODO: specifying this dependency so semver doesn't bump to next beta
+actix-tls = "=3.0.0-beta.5"
 anyhow = { version = "1.0.43", features = ["backtrace"] }
+arc-swap = "1.3.2"
 async-stream = "0.3.2"
 async-trait = "0.1.51"
-arc-swap = "1.3.2"
+bstr = "0.2.17"
 byte-unit = { version = "4.0.12", default-features = false, features = ["std"] }
 bytes = "1.1.0"
 chrono = { version = "0.4.19", features = ["serde"] }
@@ -44,13 +47,15 @@ http = "0.2.4"
 indexmap = { version = "1.7.0", features = ["serde-1"] }
 itertools = "0.10.1"
 log = "0.4.14"
-meilisearch-lib = { path = "../meilisearch-lib" }
 meilisearch-error = { path = "../meilisearch-error" }
+meilisearch-lib = { path = "../meilisearch-lib" }
 meilisearch-tokenizer = { git = "https://github.com/meilisearch/tokenizer.git", tag = "v0.2.5" }
 mime = "0.3.16"
 num_cpus = "1.13.0"
+obkv = "0.2.0"
 once_cell = "1.8.0"
 parking_lot = "0.11.2"
+pin-project = "1.0.8"
 platform-dirs = "0.3.0"
 rand = "0.8.4"
 rayon = "1.5.1"
@@ -63,16 +68,14 @@ sha2 = "0.9.6"
 siphasher = "0.3.7"
 slice-group-by = "0.2.6"
 structopt = "0.3.23"
+sysinfo = "0.20.2"
 tar = "0.4.37"
 tempfile = "3.2.0"
 thiserror = "1.0.28"
 tokio = { version = "1.11.0", features = ["full"] }
+tokio-stream = "0.1.7"
 uuid = { version = "0.8.2", features = ["serde"] }
 walkdir = "2.3.2"
-obkv = "0.2.0"
-pin-project = "1.0.8"
-sysinfo = "0.20.2"
-tokio-stream = "0.1.7"
 
 [dev-dependencies]
 actix-rt = "2.2.0"
```
76 changes: 51 additions & 25 deletions meilisearch-http/src/analytics/segment_analytics.rs
```diff
@@ -75,7 +75,30 @@ impl SegmentAnalytics {
 
         let client = HttpClient::default();
         let user = User::UserId { user_id };
-        let batcher = AutoBatcher::new(client, Batcher::new(None), SEGMENT_API_KEY.to_string());
+        let mut batcher = AutoBatcher::new(client, Batcher::new(None), SEGMENT_API_KEY.to_string());
+
+        // If Meilisearch is launched for the first time:
+        // 1. Send a Launched event associated with the user `total_launch`.
+        // 2. Batch a Launched event with the real instance-id and send it in one hour.
+        if first_time_run {
+            let _ = batcher
+                .push(Track {
+                    user: User::UserId {
+                        user_id: "total_launch".to_string(),
+                    },
+                    event: "Launched".to_string(),
+                    ..Default::default()
+                })
+                .await;
+            let _ = batcher.flush().await;
+            let _ = batcher
+                .push(Track {
+                    user: user.clone(),
+                    event: "Launched".to_string(),
+                    ..Default::default()
+                })
+                .await;
+        }
 
         let (sender, inbox) = mpsc::channel(100); // How many analytics can we buffer
 
@@ -95,10 +118,6 @@ impl SegmentAnalytics {
             sender,
             user: user.clone(),
         };
-        // batch the launched for the first time track event
-        if first_time_run {
-            this.publish("Launched".to_string(), json!({}), None);
-        }
 
         (Arc::new(this), user.to_string())
     }
@@ -216,7 +235,9 @@ impl Segment {
 
     async fn run(mut self, meilisearch: MeiliSearch) {
         const INTERVAL: Duration = Duration::from_secs(60 * 60); // one hour
-        let mut interval = tokio::time::interval(INTERVAL);
+        // The first batch must be sent after one hour.
+        let mut interval =
+            tokio::time::interval_at(tokio::time::Instant::now() + INTERVAL, INTERVAL);
 
         loop {
             select! {
@@ -304,10 +325,8 @@ pub struct SearchAggregator {
     used_syntax: HashMap<String, usize>,
 
     // q
-    // every time a request has a q field, this field must be incremented by the number of terms
-    sum_of_terms_count: usize,
-    // every time a request has a q field, this field must be incremented by one
-    total_number_of_q: usize,
+    // The maximum number of terms in a q request
+    max_terms_number: usize,
 
     // pagination
     max_limit: usize,
@@ -354,8 +373,7 @@ impl SearchAggregator {
         }
 
         if let Some(ref q) = query.q {
-            ret.total_number_of_q = 1;
-            ret.sum_of_terms_count = q.split_whitespace().count();
+            ret.max_terms_number = q.split_whitespace().count();
         }
 
         ret.max_limit = query.limit;
@@ -365,7 +383,7 @@ impl SearchAggregator {
     }
 
     pub fn succeed(&mut self, result: &SearchResult) {
-        self.total_succeeded += 1;
+        self.total_succeeded = self.total_succeeded.saturating_add(1);
         self.time_spent.push(result.processing_time_ms as usize);
     }
 
@@ -376,23 +394,31 @@ impl SearchAggregator {
             self.user_agents.insert(user_agent);
         }
         // request
-        self.total_received += other.total_received;
-        self.total_succeeded += other.total_succeeded;
+        self.total_received = self.total_received.saturating_add(other.total_received);
+        self.total_succeeded = self.total_succeeded.saturating_add(other.total_succeeded);
         self.time_spent.append(&mut other.time_spent);
         // sort
         self.sort_with_geo_point |= other.sort_with_geo_point;
-        self.sort_sum_of_criteria_terms += other.sort_sum_of_criteria_terms;
-        self.sort_total_number_of_criteria += other.sort_total_number_of_criteria;
+        self.sort_sum_of_criteria_terms = self
+            .sort_sum_of_criteria_terms
+            .saturating_add(other.sort_sum_of_criteria_terms);
+        self.sort_total_number_of_criteria = self
+            .sort_total_number_of_criteria
+            .saturating_add(other.sort_total_number_of_criteria);
         // filter
         self.filter_with_geo_radius |= other.filter_with_geo_radius;
-        self.filter_sum_of_criteria_terms += other.filter_sum_of_criteria_terms;
-        self.filter_total_number_of_criteria += other.filter_total_number_of_criteria;
+        self.filter_sum_of_criteria_terms = self
+            .filter_sum_of_criteria_terms
+            .saturating_add(other.filter_sum_of_criteria_terms);
+        self.filter_total_number_of_criteria = self
+            .filter_total_number_of_criteria
+            .saturating_add(other.filter_total_number_of_criteria);
         for (key, value) in other.used_syntax.into_iter() {
-            *self.used_syntax.entry(key).or_insert(0) += value;
+            let used_syntax = self.used_syntax.entry(key).or_insert(0);
+            *used_syntax = used_syntax.saturating_add(value);
        }
         // q
-        self.sum_of_terms_count += other.sum_of_terms_count;
-        self.total_number_of_q += other.total_number_of_q;
+        self.max_terms_number = self.max_terms_number.max(other.max_terms_number);
         // pagination
         self.max_limit = self.max_limit.max(other.max_limit);
         self.max_offset = self.max_offset.max(other.max_offset);
@@ -407,12 +433,12 @@ impl SearchAggregator {
         // we get all the values in a sorted manner
         let time_spent = self.time_spent.into_sorted_vec();
         // We are only interested in the slowest value of the 99th fastest results
-        let time_spent = time_spent[percentile_99th as usize];
+        let time_spent = time_spent.get(percentile_99th as usize);
 
         let properties = json!({
            "user-agent": self.user_agents,
            "requests": {
-               "99th_response_time": format!("{:.2}", time_spent),
+               "99th_response_time": time_spent.map(|t| format!("{:.2}", t)),
               "total_succeeded": self.total_succeeded,
               "total_failed": self.total_received.saturating_sub(self.total_succeeded), // just to be sure we never panic
               "total_received": self.total_received,
@@ -427,7 +453,7 @@ impl SearchAggregator {
                "most_used_syntax": self.used_syntax.iter().max_by_key(|(_, v)| *v).map(|(k, _)| json!(k)).unwrap_or_else(|| json!(null)),
            },
            "q": {
-               "avg_terms_number": format!("{:.2}", self.sum_of_terms_count as f64 / self.total_number_of_q as f64),
+               "max_terms_number": self.max_terms_number,
            },
            "pagination": {
               "max_limit": self.max_limit,
```
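
The aggregation changes above consistently replace unchecked `+=` and slice indexing with `saturating_add`, `max`, and `slice::get`, so merging many aggregators or publishing an empty batching window can neither overflow nor panic. A standalone sketch of the hardened percentile lookup, assuming the index is computed as 99% of the sample count:

```rust
// Sketch only: `get` returns `None` instead of panicking when no search
// was recorded during the batching window.
fn percentile_99th(mut samples: Vec<usize>) -> Option<usize> {
    samples.sort_unstable();
    let idx = (samples.len() as f64 * 0.99) as usize;
    samples.get(idx).copied()
}

fn main() {
    assert_eq!(percentile_99th(vec![]), None); // empty window: no panic
    assert_eq!(percentile_99th((1..=100usize).collect()), Some(100));
}
```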
54 changes: 1 addition & 53 deletions meilisearch-http/src/error.rs
```diff
@@ -2,12 +2,8 @@ use std::error::Error;
 use std::fmt;
 
 use actix_web as aweb;
-use actix_web::body::Body;
-use actix_web::http::StatusCode;
-use actix_web::HttpResponseBuilder;
 use aweb::error::{JsonPayloadError, QueryPayloadError};
-use meilisearch_error::{Code, ErrorCode};
-use serde::{Deserialize, Serialize};
+use meilisearch_error::{Code, ErrorCode, ResponseError};
 
 #[derive(Debug, thiserror::Error)]
 pub enum MeilisearchHttpError {
@@ -36,54 +32,6 @@ impl From<MeilisearchHttpError> for aweb::Error {
     }
 }
 
-#[derive(Debug, Serialize, Deserialize, Clone)]
-#[serde(rename_all = "camelCase")]
-pub struct ResponseError {
-    #[serde(skip)]
-    code: StatusCode,
-    message: String,
-    #[serde(rename = "code")]
-    error_code: String,
-    #[serde(rename = "type")]
-    error_type: String,
-    #[serde(rename = "link")]
-    error_link: String,
-}
-
-impl fmt::Display for ResponseError {
-    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
-        self.message.fmt(f)
-    }
-}
-
-impl<T> From<T> for ResponseError
-where
-    T: ErrorCode,
-{
-    fn from(other: T) -> Self {
-        Self {
-            code: other.http_status(),
-            message: other.to_string(),
-            error_code: other.error_name(),
-            error_type: other.error_type(),
-            error_link: other.error_url(),
-        }
-    }
-}
-
-impl aweb::error::ResponseError for ResponseError {
-    fn error_response(&self) -> aweb::HttpResponse<Body> {
-        let json = serde_json::to_vec(self).unwrap();
-        HttpResponseBuilder::new(self.status_code())
-            .content_type("application/json")
-            .body(json)
-    }
-
-    fn status_code(&self) -> StatusCode {
-        self.code
-    }
-}
-
 impl fmt::Display for PayloadError {
     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
         match self {
```
2 changes: 1 addition & 1 deletion meilisearch-http/src/extractors/authentication/mod.rs
```diff
@@ -8,8 +8,8 @@ use std::ops::Deref;
 use actix_web::FromRequest;
 use futures::future::err;
 use futures::future::{ok, Ready};
+use meilisearch_error::ResponseError;
 
-use crate::error::ResponseError;
 use error::AuthenticationError;
 
 macro_rules! create_policies {
```
