Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Persist results in s3 #18

Merged
merged 10 commits into from Apr 6, 2023
10 changes: 7 additions & 3 deletions crates/abq_cli/src/instance.rs
Expand Up @@ -49,13 +49,14 @@ pub async fn start_abq_forever(
let manifests_path = tempfile::tempdir().expect("unable to create a temporary file");
let persist_manifest = persistence::manifest::FilesystemPersistor::new_shared(
manifests_path.path(),
remote_persistence,
remote_persistence.clone(),
);

let results_path = tempfile::tempdir().expect("unable to create a temporary file");
let persist_results = persistence::results::FilesystemPersistor::new_shared(
results_path.path(),
RESULTS_PERSISTENCE_LRU_CAPACITY,
remote_persistence,
);

let run_timeout_strategy = RunTimeoutStrategy::RUN_BASED;
Expand Down Expand Up @@ -208,8 +209,11 @@ impl AbqInstance {
);

let results_path = tempfile::tempdir().expect("unable to create a temporary file");
let persist_results =
persistence::results::FilesystemPersistor::new_shared(results_path.path(), 10);
let persist_results = persistence::results::FilesystemPersistor::new_shared(
results_path.path(),
10,
NoopPersister,
);

let mut config = QueueConfig::new(persist_manifest, persist_results);
config.server_options = ServerOptions::new(server_auth, server_tls);
Expand Down
161 changes: 133 additions & 28 deletions crates/abq_cli/tests/cli.rs
Expand Up @@ -3000,6 +3000,13 @@ fn write_to_temp(content: &str) -> NamedTempFile {
fi
}

fn heuristic_wait_for_written_path(path: &Path) {
while !path.exists() {
thread::sleep(Duration::from_millis(10));
}
thread::sleep(Duration::from_millis(10));
}

#[test]
#[serial]
fn custom_remote_persistence() {
Expand Down Expand Up @@ -3105,40 +3112,138 @@ fn custom_remote_persistence() {
);
}

let custom_persisted_manifest_path = custom_persisted_path.join("store-manifest-test-run-id");
while !custom_persisted_manifest_path.exists() {
thread::sleep(Duration::from_millis(10));
}
let manifest = std::fs::read_to_string(&custom_persisted_manifest_path)
.unwrap_or_else(|_| panic!("Nothing at {:?}", custom_persisted_manifest_path));

let manifest: serde_json::Value = serde_json::from_str(&manifest).unwrap();
insta::assert_json_snapshot!(manifest, {
".items[0].spec.work_id" => "[redacted]",
}, @r###"
{
"assigned_entities": [
let custom_persisted_manifest_path =
custom_persisted_path.join("store-manifest-test-run-id");
heuristic_wait_for_written_path(&custom_persisted_manifest_path);

let manifest = std::fs::read_to_string(&custom_persisted_manifest_path)
.unwrap_or_else(|_| panic!("Nothing at {:?}", custom_persisted_manifest_path));

let manifest: serde_json::Value = serde_json::from_str(&manifest).unwrap();
insta::assert_json_snapshot!(manifest, {
".items[0].spec.work_id" => "[redacted]",
}, @r###"
{
"Runner": [
0,
1
"assigned_entities": [
{
"Runner": [
0,
1
]
}
],
"items": [
{
"run_number": 1,
"spec": {
"test_case": {
"id": "test1",
"meta": {}
},
"work_id": "[redacted]"
}
}
]
}
],
"items": [
{
"run_number": 1,
"spec": {
"test_case": {
"id": "test1",
"meta": {}
},
"work_id": "[redacted]"
"###);
}

{
use std::{fs::File, io::BufReader};

let custom_persisted_results_path = custom_persisted_path.join("store-results-test-run-id");
heuristic_wait_for_written_path(&custom_persisted_results_path);

let lines = BufReader::new(File::open(&custom_persisted_results_path).unwrap()).lines();
let mut lines: Vec<_> = lines.map(|line| line.unwrap()).collect();
lines.sort();

let result_lines: Vec<_> = lines
.into_iter()
.map(|line| serde_json::from_str::<serde_json::Value>(&line).unwrap())
.collect();

insta::assert_json_snapshot!(result_lines, {
"[0].Results[0].work_id" => "[redacted]",
"[0].Results[0].results[0].result.timestamp" => "[redacted]",
}, @r###"
[
{
"Results": [
{
"after_all_tests": null,
"before_any_test": {
"stderr": [],
"stdout": []
},
"results": [
{
"result": {
"display_name": "zzz-faux",
"finished_at": "1994-11-05T13:17:30Z",
"id": "zzz-faux",
"lineage": [
"TopLevel",
"SubModule",
"Test"
],
"location": {
"column": 15,
"file": "a/b/x.file",
"line": 10
},
"meta": {},
"output": "my test output",
"runtime": {
"Nanoseconds": 0
},
"started_at": "1994-11-05T13:15:30Z",
"status": "Success",
"stderr": [],
"stdout": [],
"timestamp": "[redacted]"
},
"source": {
"has_stdout_reporters": false,
"is_singleton": true,
"runner": [
0,
1
]
}
}
],
"run_number": 1,
"work_id": "[redacted]"
}
]
},
{
"Summary": {
"manifest_size_nonce": 1,
"native_runner_info": {
"protocol_version": {
"major": 0,
"minor": 2,
"type": "abq_protocol_version"
},
"specification": {
"host": "ruby 3.1.2p20 (2022-04-12 revision 4491bb740a) [x86_64-darwin21]",
"language": "ruby",
"language_version": "3.1.2p20",
"name": "test",
"test_framework": "rspec",
"test_framework_version": "3.12.0",
"type": "abq_native_runner_specification",
"version": "0.0.0"
}
}
}
}
}
]
]
"###);
}
"###);

term(queue_proc);
}
8 changes: 8 additions & 0 deletions crates/abq_queue/src/persistence/manifest.rs
Expand Up @@ -34,6 +34,14 @@ impl ManifestView {
}
}

pub fn len(&self) -> usize {
self.items.len()
}

pub fn is_empty(&self) -> bool {
self.items.is_empty()
}

fn get_partition_for_entity(self, entity_tag: Tag) -> Vec<WorkerTest> {
let Self {
mut items,
Expand Down
13 changes: 4 additions & 9 deletions crates/abq_queue/src/persistence/remote.rs
Expand Up @@ -3,7 +3,7 @@
//! [results]: super::results
//! [manifest]: super::manifest

use std::path::Path;
use std::{path::Path, sync::Arc};

use abq_utils::{error::OpaqueResult, net_protocol::workers::RunId};
use async_trait::async_trait;
Expand Down Expand Up @@ -72,8 +72,9 @@ pub trait RemotePersistence {
fn boxed_clone(&self) -> Box<dyn RemotePersistence + Send + Sync>;
}

#[derive(Clone)]
#[repr(transparent)]
pub struct RemotePersister(Box<dyn RemotePersistence + Send + Sync>);
pub struct RemotePersister(Arc<Box<dyn RemotePersistence + Send + Sync>>);

impl<T> From<T> for RemotePersister
where
Expand All @@ -86,7 +87,7 @@ where

impl RemotePersister {
pub fn new(persister: impl RemotePersistence + Send + Sync + 'static) -> RemotePersister {
RemotePersister(Box::new(persister))
RemotePersister(Arc::new(Box::new(persister)))
}

pub async fn store(
Expand Down Expand Up @@ -116,9 +117,3 @@ impl RemotePersister {
self.0.load(kind, run_id, into_local_path).await
}
}

impl Clone for RemotePersister {
fn clone(&self) -> Self {
Self(self.0.boxed_clone())
}
}