Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

compat dump v3 #1951

Merged
merged 4 commits into from
Dec 1, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 12 additions & 0 deletions meilisearch-error/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,18 @@ pub struct ResponseError {
error_link: String,
}

impl ResponseError {
    /// Builds a `ResponseError` from a bare message and its error [`Code`],
    /// deriving the HTTP status, error name, error type and documentation
    /// link from the code.
    pub fn from_msg(message: String, code: Code) -> Self {
        let error_code = code.err_code().error_name.to_string();
        let error_type = code.type_();
        let error_link = code.url();
        Self {
            code: code.http(),
            message,
            error_code,
            error_type,
            error_link,
        }
    }
}

impl fmt::Display for ResponseError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
self.message.fmt(f)
Expand Down
1 change: 1 addition & 0 deletions meilisearch-lib/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ whoami = { version = "1.1.3", optional = true }
reqwest = { version = "0.11.4", features = ["json", "rustls-tls"], default-features = false, optional = true }
sysinfo = "0.20.2"
derivative = "2.2.0"
fs_extra = "1.2.0"

[dev-dependencies]
actix-rt = "2.2.0"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
pub mod v3;
196 changes: 196 additions & 0 deletions meilisearch-lib/src/index_controller/dump_actor/compat/v3.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,196 @@
use chrono::{DateTime, Utc};
use meilisearch_error::{Code, ResponseError};
use milli::update::IndexDocumentsMethod;
use serde::{Deserialize, Serialize};
use uuid::Uuid;

use crate::index::{Settings, Unchecked};
use crate::index_resolver::IndexUid;
use crate::tasks::task::{DocumentDeletion, Task, TaskContent, TaskEvent, TaskResult};

/// Status of an update as serialized in a v3 dump.
///
/// Internally tagged on the `status` field (camelCase) so it round-trips
/// with the JSON emitted by Meilisearch v3.
#[derive(Debug, Serialize, Deserialize)]
#[serde(tag = "status", rename_all = "camelCase")]
pub enum UpdateStatus {
    /// Currently being applied.
    Processing(Processing),
    /// Registered but not yet started.
    Enqueued(Enqueued),
    /// Finished successfully.
    Processed(Processed),
    /// Finished with an error.
    Failed(Failed),
}

/// Result payload of a processed document addition in a v3 dump.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct DocumentAdditionResult {
    /// Number of documents indexed by the update.
    pub nb_documents: usize,
}

/// Outcome payload of a successfully processed v3 update.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum UpdateResult {
    /// A document addition, with how many documents were indexed.
    DocumentsAddition(DocumentAdditionResult),
    /// A document deletion, with how many documents were removed.
    DocumentDeletion { deleted: u64 },
    /// Any other kind of update, with no dedicated result data.
    Other,
}

impl From<UpdateResult> for TaskResult {
fn from(other: UpdateResult) -> Self {
match other {
UpdateResult::DocumentsAddition(result) => TaskResult::DocumentAddition {
indexed_documents: result.nb_documents as u64,
},
UpdateResult::DocumentDeletion { deleted } => TaskResult::DocumentDeletion {
deleted_documents: deleted,
},
UpdateResult::Other => TaskResult::Other,
}
}
}

/// Payload of a v3 update, as stored in the dump's updates file.
#[allow(clippy::large_enum_variant)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum Update {
    /// Deletion of the documents with the given external ids.
    DeleteDocuments(Vec<String>),
    /// Addition of documents; the content itself is not part of this enum
    /// and is referenced by `content_uuid`.
    DocumentAddition {
        primary_key: Option<String>,
        method: IndexDocumentsMethod,
        content_uuid: Uuid,
    },
    /// A settings update.
    Settings(Settings<Unchecked>),
    /// Removal of all documents of the index.
    ClearDocuments,
}

impl From<Update> for TaskContent {
    /// Converts a legacy v3 update payload into the current `TaskContent`.
    fn from(update: Update) -> Self {
        match update {
            Update::ClearDocuments => TaskContent::DocumentDeletion(DocumentDeletion::Clear),
            Update::DeleteDocuments(ids) => {
                TaskContent::DocumentDeletion(DocumentDeletion::Ids(ids))
            }
            Update::DocumentAddition {
                primary_key, method, ..
            } => TaskContent::DocumentAddition {
                // The original content uuid is dropped here.
                content_uuid: Uuid::default(),
                merge_strategy: method,
                primary_key,
                // Legacy updates never recorded their document count.
                documents_count: 0,
            },
            Update::Settings(settings) => TaskContent::SettingsUpdate {
                settings,
                // v3 dumps don't say whether this was a settings reset, so
                // assume it wasn't.
                is_deletion: false,
            },
        }
    }
}

/// Description of a v3 update, internally tagged on the `type` field.
///
/// NOTE(review): not referenced elsewhere in this file — presumably kept so
/// older payloads still deserialize; confirm with other modules before
/// removing.
#[allow(clippy::large_enum_variant)]
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type")]
pub enum UpdateMeta {
    DocumentsAddition {
        method: IndexDocumentsMethod,
        primary_key: Option<String>,
    },
    ClearDocuments,
    DeleteDocuments {
        ids: Vec<String>,
    },
    Settings(Settings<Unchecked>),
}

/// A v3 update that was registered but not yet processed.
#[derive(Debug, Serialize, Deserialize, Clone)]
#[serde(rename_all = "camelCase")]
pub struct Enqueued {
    /// Sequential id of the update within its index.
    pub update_id: u64,
    /// The update's payload.
    pub meta: Update,
    /// When the update was registered.
    pub enqueued_at: DateTime<Utc>,
}

impl Enqueued {
fn update_task(self, task: &mut Task) {
task.id = self.update_id;
task.content = self.meta.into();
task.events.push(TaskEvent::Created(self.enqueued_at));
}
}

/// A v3 update that completed successfully.
#[derive(Debug, Serialize, Deserialize, Clone)]
#[serde(rename_all = "camelCase")]
pub struct Processed {
    /// What the update produced.
    pub success: UpdateResult,
    /// When processing finished.
    pub processed_at: DateTime<Utc>,
    /// Previous state, flattened so the JSON keeps the flat v3 shape.
    #[serde(flatten)]
    pub from: Processing,
}

impl Processed {
fn update_task(self, task: &mut Task) {
self.from.update_task(task);

let event = TaskEvent::Succeded {
result: TaskResult::from(self.success),
timestamp: self.processed_at,
};
task.events.push(event);
}
}

/// A v3 update currently being applied.
#[derive(Debug, Serialize, Deserialize, Clone)]
#[serde(rename_all = "camelCase")]
pub struct Processing {
    /// Previous state, flattened so the JSON keeps the flat v3 shape.
    #[serde(flatten)]
    pub from: Enqueued,
    /// When processing started.
    pub started_processing_at: DateTime<Utc>,
}

impl Processing {
fn update_task(self, task: &mut Task) {
self.from.update_task(task);

let event = TaskEvent::Processing(self.started_processing_at);
task.events.push(event);
}
}

/// A v3 update that finished with an error.
#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct Failed {
    /// Previous state, flattened so the JSON keeps the flat v3 shape.
    #[serde(flatten)]
    pub from: Processing,
    /// Human-readable error message.
    pub msg: String,
    /// Machine-readable error code.
    pub code: Code,
    /// When the failure occurred.
    pub failed_at: DateTime<Utc>,
}

impl Failed {
fn update_task(self, task: &mut Task) {
self.from.update_task(task);

let event = TaskEvent::Failed {
error: ResponseError::from_msg(self.msg, self.code),
timestamp: self.failed_at,
};
task.events.push(event);
}
}

impl From<(UpdateStatus, String)> for Task {
    /// Converts a v3 update, paired with the uid of its index, into a task.
    fn from((update, uid): (UpdateStatus, String)) -> Self {
        // Placeholder task: every field is overwritten by `update_task`
        // below except `index_uid`.
        let mut task = Task {
            id: 0,
            // NOTE(review): panics on an invalid index uid — presumably
            // dumps only contain valid uids; confirm upstream.
            index_uid: IndexUid::new(uid).unwrap(),
            content: TaskContent::IndexDeletion,
            events: Vec::new(),
        };

        match update {
            UpdateStatus::Enqueued(u) => u.update_task(&mut task),
            UpdateStatus::Processing(u) => u.update_task(&mut task),
            UpdateStatus::Processed(u) => u.update_task(&mut task),
            UpdateStatus::Failed(u) => u.update_task(&mut task),
        }

        task
    }
}
1 change: 1 addition & 0 deletions meilisearch-lib/src/index_controller/dump_actor/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ internal_error!(
tokio::sync::oneshot::error::RecvError,
serde_json::error::Error,
tempfile::PersistError,
fs_extra::error::Error,
TaskError
);

Expand Down
114 changes: 97 additions & 17 deletions meilisearch-lib/src/index_controller/dump_actor/loaders/v3.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,31 @@
use std::collections::HashMap;
use std::fs::{self, File};
use std::io::{BufReader, BufWriter, Write};
use std::path::Path;

use anyhow::Context;
use fs_extra::dir::{self, CopyOptions};
use log::info;
use serde::{Deserialize, Serialize};
use tempfile::tempdir;
use uuid::Uuid;

use crate::analytics;
use crate::index_controller::dump_actor::compat::v3::UpdateStatus;
use crate::index_controller::dump_actor::Metadata;
// use crate::index_controller::index_resolver::IndexResolver;
use crate::index_controller::update_file_store::UpdateFileStore;
use crate::index_resolver::IndexResolver;
use crate::options::IndexerOpts;
use crate::tasks::task::Task;

/// Dump structure for V3:
/// .
/// ├── indexes
/// │   └── 25f10bb8-6ea8-42f0-bd48-ad5857f77648
/// │       ├── documents.jsonl
/// │       └── meta.json
/// ├── index_uuids
/// │   └── data.jsonl
/// ├── metadata.json
/// └── updates
///     └── data.jsonl

#[allow(dead_code)]
pub fn load_dump(
Expand All @@ -18,23 +36,85 @@ pub fn load_dump(
meta_env_size: usize,
indexing_options: &IndexerOpts,
) -> anyhow::Result<()> {
info!(
"Loading dump from {}, dump database version: {}, dump version: V3",
meta.dump_date, meta.db_version
);

IndexResolver::load_dump(
src.as_ref(),
&dst,
info!("Patching dump V3 to dump V4...");

let patched_dir = tempdir()?;

let options = CopyOptions::default();
dir::copy(src.as_ref().join("indexes"), patched_dir.path(), &options)?;
dir::copy(
src.as_ref().join("index_uuids"),
patched_dir.path(),
&options,
)?;

let uuid_map = read_uuid_map(src.as_ref().join("index_uuids/data.jsonl"))?;

fs::copy(
src.as_ref().join("metadata.json"),
patched_dir.path().join("metadata.json"),
)?;

patch_updates(&src, patched_dir.path(), uuid_map)?;

super::v4::load_dump(
meta,
patched_dir.path(),
dst,
index_db_size,
meta_env_size,
indexing_options,
)?;
UpdateFileStore::load_dump(src.as_ref(), &dst)?;
// TaskStore::load_dump(&src, &dst, update_db_size)?;
analytics::copy_user_id(src.as_ref(), dst.as_ref());
)
}

/// Reads the uuid → index-uid mapping from a v3 dump's
/// `index_uuids/data.jsonl` file (one JSON entry per index).
fn read_uuid_map(path: impl AsRef<Path>) -> anyhow::Result<HashMap<Uuid, String>> {
    /// Shape of one line of the `index_uuids/data.jsonl` file.
    #[derive(Serialize, Deserialize)]
    struct DumpEntry {
        uuid: Uuid,
        uid: String,
    }

    let file = File::open(path)?;
    let mut map = HashMap::new();
    // Stream the entries; bail out on the first malformed one.
    for entry in serde_json::Deserializer::from_reader(file).into_iter::<DumpEntry>() {
        let DumpEntry { uuid, uid } = entry?;
        map.insert(uuid, uid);
    }
    Ok(map)
}

/// Rewrites a v3 `updates/data.jsonl` into the v4 task format under
/// `dst/updates/data.jsonl`, resolving each update's index uuid to its uid
/// through `uuid_map`.
///
/// Errors if the source file can't be read, an entry is malformed, or an
/// entry references a uuid missing from `uuid_map`.
fn patch_updates(
    src: impl AsRef<Path>,
    dst: impl AsRef<Path>,
    uuid_map: HashMap<Uuid, String>,
) -> anyhow::Result<()> {
    let dst = dst.as_ref().join("updates");
    fs::create_dir_all(&dst)?;

    let mut dst_file = BufWriter::new(File::create(dst.join("data.jsonl"))?);
    let src_file = BufReader::new(File::open(src.as_ref().join("updates/data.jsonl"))?);

    /// Shape of one line of the v3 updates file: the owning index's uuid
    /// paired with the update's status payload.
    #[derive(Serialize, Deserialize)]
    pub struct UpdateEntry {
        pub uuid: Uuid,
        pub update: UpdateStatus,
    }
    serde_json::Deserializer::from_reader(src_file)
        .into_iter::<UpdateEntry>()
        .try_for_each(|entry| -> anyhow::Result<()> {
            let entry = entry?;
            // Fail loudly on an update that references an index absent from
            // `index_uuids/data.jsonl`.
            let name = uuid_map
                .get(&entry.uuid)
                .with_context(|| format!("Unknown index uuid: {}", entry.uuid))?
                .clone();
            // Convert to a v4 task and write it back as one JSON line.
            serde_json::to_writer(&mut dst_file, &Task::from((entry.update, name)))?;
            dst_file.write_all(b"\n")?;
            Ok(())
        })?;

    // NOTE(review): this log message reads oddly for an updates-patching
    // helper — confirm it isn't a leftover from the previous loader.
    info!("Loading indexes.");
    // Flush explicitly: Drop would swallow any write error.
    dst_file.flush()?;

    Ok(())
}
Loading