Commit

finish bench generate

neilyio committed Apr 4, 2024
1 parent 41c568e commit 07f82b3
Showing 6 changed files with 189 additions and 144 deletions.
2 changes: 1 addition & 1 deletion cargo-paradedb/Cargo.toml
@@ -9,7 +9,7 @@ edition = "2021"
anyhow = "1.0.81"
async-std = "1.12.0"
chrono = { version = "0.4.34", features = ["clock", "alloc", "serde"] }
clap = { version = "4.5.4", features = ["derive"] }
clap = { version = "4.5.4", features = ["derive", "env"] }
cmd_lib = "1.9.3"
glob = "0.3.1"
itertools = "0.12.1"
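The one-line change above enables clap's optional `env` feature, which lets a derive-based argument fall back to an environment variable when its flag is omitted; without the feature, the `env = ...` attribute is rejected at compile time. A minimal sketch of what the feature allows, using a hypothetical `Args` struct that is not part of this commit:

```rust
use clap::Parser;

/// Minimal sketch: with clap's "env" feature enabled, a missing --url flag
/// falls back to the DATABASE_URL environment variable.
#[derive(Parser, Debug)]
struct Args {
    /// Hypothetical connection-string argument, for illustration only.
    #[arg(long, env = "DATABASE_URL")]
    url: String,
}

fn main() {
    // Errors out (with a helpful message) only if neither the flag nor the
    // environment variable is present.
    let args = Args::parse();
    println!("connecting to {}", args.url);
}
```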
31 changes: 30 additions & 1 deletion cargo-paradedb/src/cli.rs
@@ -1,4 +1,3 @@
use crate::subcommand::Subcommand;
use clap::Parser;

#[derive(Parser)]
@@ -7,6 +6,36 @@ pub struct Cli {
#[command(subcommand)]
pub subcommand: Subcommand,
}
#[derive(clap::Subcommand)]
pub enum Subcommand {
Install,
Bench(BenchArgs),
}

#[derive(Debug, clap::Args)]
pub struct BenchArgs {
#[command(subcommand)]
pub command: Option<BenchCommand>,
}

#[derive(Debug, clap::Subcommand)]
pub enum BenchCommand {
Generate {
/// Starting seed for random generation.
#[arg(long, short, default_value_t = 1)]
seed: u64,
/// Total number of events to generate per file.
/// Defaults to a file size of 100MB.
#[arg(long, short, default_value_t = 118891)]
events: u64,
/// Postgres table name to insert into.
#[arg(short, long, default_value = "benchmark_eslogs")]
table: String,
/// Postgres database url to connect to.
#[arg(short, long, env = "DATABASE_URL")]
url: String,
},
}

impl Default for Cli {
fn default() -> Self {
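As a usage sketch (not part of the commit) of the new subcommand tree: assuming this sits next to the definitions in cli.rs, and with a made-up connection string in `DATABASE_URL`, parsing `bench generate --events 500000` picks up the defaults for `seed` and `table` and reads `url` from the environment via the `env = "DATABASE_URL"` attribute above.

```rust
use clap::Parser;

fn demo() {
    // Made-up connection string; `url` is read from DATABASE_URL because of
    // the `env = "DATABASE_URL"` attribute on the Generate variant.
    std::env::set_var("DATABASE_URL", "postgres://localhost:5432/paradedb");

    let cli = Cli::parse_from(["cargo-paradedb", "bench", "generate", "--events", "500000"]);

    match cli.subcommand {
        Subcommand::Bench(bench) => match bench.command {
            Some(BenchCommand::Generate { seed, events, table, url }) => {
                // seed and table fall back to their declared defaults.
                assert_eq!(seed, 1);
                assert_eq!(events, 500_000);
                assert_eq!(table, "benchmark_eslogs");
                assert_eq!(url, "postgres://localhost:5432/paradedb");
            }
            None => println!("no bench subcommand given"),
        },
        Subcommand::Install => unreachable!("not requested in this sketch"),
    }
}
```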
22 changes: 11 additions & 11 deletions cargo-paradedb/src/main.rs
@@ -4,8 +4,7 @@ mod tables;

use anyhow::Result;
use async_std::task::block_on;
use cli::Cli;
use subcommand::Subcommand;
use cli::{BenchCommand, Cli, Subcommand};
use tracing_subscriber::EnvFilter;

fn main() -> Result<()> {
@@ -16,14 +15,15 @@ fn main() -> Result<()> {
let cli = Cli::default();
match cli.subcommand {
Subcommand::Install => subcommand::install(),
Subcommand::GenerateBenchmarkCorpus {
out,
events,
repeat,
seed,
} => subcommand::generate_benchmark_corpus(&out, events, repeat, seed),
Subcommand::InsertBenchmarkCorpus { input, table, url } => block_on(
subcommand::insert_benchmark_corpus(&input, &table, url.as_deref()),
),
Subcommand::Bench(bench) => match bench.command.unwrap() {
BenchCommand::Generate {
seed,
events,
table,
url,
} => block_on(subcommand::generate_benchmark_corpus(
seed, events, table, url,
)),
},
}
}
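Because `main` stays synchronous while `generate_benchmark_corpus` is now async, `async_std::task::block_on` drives the future to completion. A self-contained sketch of that bridging pattern; `run_generate` here is a stand-in for the real subcommand, not code from this repository:

```rust
use anyhow::Result;
use async_std::task::block_on;

// Stand-in for an async subcommand such as generate_benchmark_corpus.
async fn run_generate(seed: u64, events: u64) -> Result<()> {
    println!("would generate {events} events starting from seed {seed}");
    Ok(())
}

fn main() -> Result<()> {
    // block_on runs the future on the current thread and returns its output,
    // so the synchronous CLI entry point can call async sqlx code.
    block_on(run_generate(1, 118_891))
}
```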
184 changes: 86 additions & 98 deletions cargo-paradedb/src/subcommand.rs
@@ -1,48 +1,12 @@
use crate::tables::{benchlogs::BenchLog, GlobReader};
use anyhow::{bail, ensure, Result};
use crate::tables::{benchlogs::EsLog, PathReader};
use anyhow::{bail, Result};
use cmd_lib::{run_cmd, run_fun};
use itertools::Itertools;
use sqlx::{postgres::PgConnectOptions, Connection, PgConnection, Postgres, QueryBuilder};
use std::{
os::unix::process::CommandExt,
path::{Path, PathBuf},
str::FromStr,
};
use std::{fs, os::unix::process::CommandExt, str::FromStr};
use tempfile::tempdir;
use tracing::debug;

#[derive(clap::Subcommand)]
pub enum Subcommand {
Install,
GenerateBenchmarkCorpus {
/// Path to an existing directory to write generated files.
#[arg(short, long)]
out: PathBuf,
/// Total number of events to generate per file.
/// Defaults to a file size of 100MB.
#[arg(long, short, default_value_t = 118891)]
events: u32,
/// How many times to repeat the generation.
/// Files are named after per-second timestamps.
#[arg(long, short, default_value_t = 1)]
repeat: u32,
/// Starting seed for random generation.
#[arg(long, short, default_value_t = 1)]
seed: u32,
},
InsertBenchmarkCorpus {
/// Glob path to generated json files.
#[arg(short, long = "in")]
input: String,
/// Postgres table name to insert into.
#[arg(short, long)]
table: String,
/// Postgres database url to connect to.
#[arg(short, long)]
url: Option<String>,
},
}

pub fn install() -> Result<()> {
// The crate_path is available to us at compile time with env!.
let crate_path = std::path::Path::new(env!("CARGO_MANIFEST_DIR")).to_path_buf();
@@ -67,90 +31,114 @@ pub fn install() -> Result<()> {
// corpus tool changed its API since then, and no longer accepts a "total_bytes"
// argument. It now accepts a "total_events" argument. While more ergonomic, it means
// that we have to work backwards if we want to generate exactly 1TB of data.
pub fn generate_benchmark_corpus(
out_dir: &Path,
events: u32,
repeat: u32,
seed: u32,
pub async fn generate_benchmark_corpus(
seed: u64,
events: u64,
table: String,
url: String,
) -> Result<()> {
ensure!(out_dir.exists(), "Path {out_dir:?} does not exist.");
ensure!(out_dir.is_dir(), "Path {out_dir:?} is not a directory.");

// Ensure that golang and the generator tool are installed.
if let Err(err) = run_cmd!(go version > /dev/null) {
bail!("Golang is likely not installed... {err}")
}

run_cmd!(go install github.com/elastic/elastic-integration-corpus-generator-tool@latest)?;

let tempdir = tempdir()?;
let template_file = tempdir.path().join("template.tpl");
let fields_file = tempdir.path().join("fields.yml");
let config_file = tempdir.path().join("config-1.yml");

// We're going to use the generator configuration from the elasticsearch benchmarks.
// Download them into temporary files so they can be passed to the generator tool.
let config_tempdir = tempdir()?;
let template_file = config_tempdir.path().join("template.tpl");
let fields_file = config_tempdir.path().join("fields.yml");
let config_file = config_tempdir.path().join("config-1.yml");

let opensearch_repo_url =
"https://raw.githubusercontent.com/elastic/elasticsearch-opensearch-benchmark/main";

run_cmd!(curl -s -f -o $template_file $opensearch_repo_url/dataset/template.tpl)?;
run_cmd!(curl -s -f -o $fields_file $opensearch_repo_url/dataset/fields.yml)?;
run_cmd!(curl -s -f -o $config_file $opensearch_repo_url/dataset/config-1.yml)?;

// The generator tool uses the DATA_DIR env var to determine output location.
// It's important to make sure that this is set to a volume with adequate capacity.
std::env::set_var("DATA_DIR", out_dir);

// Set up necessary executable paths to call the generator tool.
let go_path = run_fun!(go env GOPATH)?;
let generator_exe = format!("{go_path}/bin/elastic-integration-corpus-generator-tool");

// The generator tool doesn't have many configuration options... including around
// how it names files. We're stuck with the behavior that filenames will just be
// timestamps (to the second). So if your `bytes` argument is so low that the
// file can be generated under a second... it will just overwrite the previous file.
// It only makes sense to `repeat` if you're generating lots of large files.
for r in 0..repeat {
let iter_seed = r + seed;
// Set up Postgres connection and ensure the table exists.
debug!(DATABASE_URL = std::env::var("DATABASE_URL").unwrap_or_default());
let conn_opts = &PgConnectOptions::from_str(&url)?;
let mut conn = PgConnection::connect_with(&conn_opts).await?;
sqlx::query(&EsLog::create_table_statement(&table))
.execute(&mut conn)
.await?;

// We'll skip events already created in the destination Postgres table.
let events_already_loaded: u64 =
sqlx::query_as::<_, (i64,)>(&format!("SELECT COUNT(id) from {}", table))
.fetch_one(&mut conn)
.await?
.0 as u64;

// The generator tool writes its output to files, which we then read and load into Postgres.
// We cap each output file at 118891 events, which is roughly 100MB of data.
let events_per_file = 118891;
let events_to_create = events - events_already_loaded;
let files_to_create = events_to_create.div_ceil(events_per_file);
debug!(files_to_create, events_to_create);

// For each generated file we intend to create, we'll run the generator tool once,
// and then immediately load the data into Postgres.
for i in 0..files_to_create {
// Setup transaction and how many events to be generated in this transaction.
let mut transaction = sqlx::Connection::begin(&mut conn).await?;
let transaction_events = events_per_file.min(events_to_create - i * events_per_file);
debug!(transaction_events);

// We want the generated files to be deleted after inserting into Postgres,
// so we'll make a tempdir that will be deleted when it's dropped at the
// end of this block.
let generated_tempdir = tempdir()?;
let generated_dir = &generated_tempdir.path().join("generated");

// Ensure output directory for the generated file exists.
fs::create_dir_all(&generated_dir)?;
// The generator tool uses the DATA_DIR env var to determine output location.
std::env::set_var("DATA_DIR", &generated_dir);

// The generator tool doesn't have many configuration options, including around
// how it names files: filenames are just timestamps (to the second). Because
// each iteration writes into its own fresh temporary directory and loads the
// result immediately, that naming scheme is harmless here.
let iter_seed = i + seed;
run_cmd!(
$generator_exe generate-with-template $template_file $fields_file
-t $events
--tot-events $transaction_events
--config-file $config_file
--template-type gotext
--seed $iter_seed
> /dev/null
)?;
}
Ok(())
}

pub async fn insert_benchmark_corpus(glob: &str, table: &str, url: Option<&str>) -> Result<()> {
debug!(DATABASE_URL = std::env::var("DATABASE_URL").unwrap_or_default());

// Set up connection and transaction.
let conn_opts = &PgConnectOptions::from_str(
// If a URL isn't passed, try to parse one from DATABASE_URL.
&url.map(String::from)
.unwrap_or_else(|| std::env::var("DATABASE_URL").ok().unwrap_or_default()),
)?;
let mut conn = PgConnection::connect_with(&conn_opts).await?;
let mut transaction = sqlx::Connection::begin(&mut conn).await?;

// Ensure the table exists.
sqlx::query(&BenchLog::create_table_statement(table))
.execute(&mut *transaction)
.await?;

// Read JSON data files, chunked to not overload Postgres.
let log_chunks = BenchLog::read_all(glob)?.chunks(1000);
// Build an INSERT statement and write to database.
for (index, chunk) in log_chunks.into_iter().enumerate() {
debug!(index, "inserting json benchlog chunk");
QueryBuilder::<Postgres>::new(BenchLog::insert_header(table))
.push_values(chunk, BenchLog::insert_push_values)
.build()
.execute(&mut *transaction)
.await?;
// The files should have been generated, so build a glob string to match them.
// The tool generates the files under a few nested folders, so make sure to
// recursively glob for them.
let output_files_glob_string = generated_dir.join("**/*.tpl").display().to_string();

// Read event JSON, chunked to not overload Postgres.
let log_chunks = EsLog::read_all(&output_files_glob_string)?.chunks(1000);

// Build an INSERT statement and write to database.
for (index, chunk) in log_chunks.into_iter().enumerate() {
debug!(index, "inserting json benchlog chunk");
QueryBuilder::<Postgres>::new(EsLog::insert_header(&table))
.push_values(chunk, EsLog::insert_push_values)
.build()
.execute(&mut *transaction)
.await?;
}

// Commit the transaction so this file's batch of inserts lands atomically.
transaction.commit().await?;
}

// Everything above happens in a single transaction, so all inserts succeed or fail together.
transaction.commit().await?;

Ok(())
}
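The comments in `generate_benchmark_corpus` treat 118,891 events as roughly 100 MB of output and size each batch with `div_ceil`. A worked sketch of that arithmetic, assuming the 100 MB ratio holds and using a made-up remaining-event count; the 1 TB figure is the target mentioned in the comment about working backwards:

```rust
fn main() {
    // Ratio taken from the comments: 118,891 events is roughly 100 MB of output.
    let events_per_file: u64 = 118_891;

    // Working backwards to roughly 1 TB: about 10,000 files of ~100 MB each.
    let files_for_1tb: u64 = 1_000_000_000_000 / 100_000_000;
    let events_for_1tb = events_per_file * files_for_1tb; // ~1.19 billion events

    // The same chunking the generator loop performs: round up so a partial
    // final file is still generated and loaded.
    let events_to_create: u64 = 300_000; // made-up remaining-event count
    let files_to_create = events_to_create.div_ceil(events_per_file); // 3
    let last_file_events = events_to_create - (files_to_create - 1) * events_per_file; // 62,218

    println!("~{events_for_1tb} events for ~1 TB");
    println!("{files_to_create} files; the last one holds {last_file_events} events");
}
```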
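Each loop iteration also pairs its generated file with a fresh `tempfile::tempdir()`, so the output is deleted as soon as the directory handle is dropped. A small sketch of that cleanup behaviour in isolation; the directory layout and file contents here are made up:

```rust
use std::fs;
use tempfile::tempdir;

fn main() -> anyhow::Result<()> {
    let leaked_path;
    {
        // TempDir deletes its directory (and contents) when dropped, which is
        // what keeps per-iteration generator output from piling up on disk.
        let dir = tempdir()?;
        let generated = dir.path().join("generated");
        fs::create_dir_all(&generated)?;
        fs::write(generated.join("example.tpl"), b"{\"event\": 1}\n")?;
        leaked_path = generated;
        assert!(leaked_path.exists());
    } // `dir` dropped here; the whole tree is removed.
    assert!(!leaked_path.exists());
    Ok(())
}
```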
43 changes: 18 additions & 25 deletions cargo-paradedb/src/tables/benchlogs.rs
@@ -1,18 +1,14 @@
#![allow(unused, dead_code)]
use std::{error::Error, fs::File, io::BufReader};

use super::GlobReader;
use anyhow::Result;
use glob::glob;
use serde::Deserialize;
use sqlx::database::HasArguments;
use sqlx::query::Query;
use sqlx::query_builder::Separated;
use sqlx::types::chrono::{DateTime, Utc};
use sqlx::{Postgres, QueryBuilder};
use sqlx::Postgres;
use std::{fs::File, io::BufReader};

use super::{PathReader, PathSource};

#[derive(Debug, Deserialize)]
pub struct BenchLog {
pub struct EsLog {
#[serde(rename = "@timestamp")]
timestamp: DateTime<Utc>,
#[serde(rename = "aws.cloudwatch")]
@@ -32,7 +28,7 @@ pub struct BenchLog {
tags: Vec<String>,
}

impl BenchLog {
impl EsLog {
pub fn insert_header(table: &str) -> String {
format!(
r#"INSERT INTO {table} (
@@ -105,24 +101,21 @@ where
))
}

impl GlobReader for BenchLog {
impl PathReader for EsLog {
type Error = anyhow::Error;

fn read_all(
glob_pattern: &str,
fn read_all<S: PathSource>(
path_source: S,
) -> Result<Box<dyn Iterator<Item = Result<Self, Self::Error>>>, Self::Error> {
let paths = glob(glob_pattern).map_err(|e| Self::Error::from(e))?;

let iterators: Result<Vec<_>, Self::Error> = paths
.map(|path_result| {
path_result.map_err(Self::Error::from).and_then(|path| {
let file = File::open(path)?;
let buffered = BufReader::new(file);
let deserializer = serde_json::Deserializer::from_reader(buffered)
.into_iter::<Self>()
.map(|result| result.map_err(Self::Error::from));
Ok(deserializer)
})
let iterators: Result<Vec<_>, Self::Error> = path_source
.paths()
.map(|path| {
let file = File::open(path)?;
let buffered = BufReader::new(file);
let deserializer = serde_json::Deserializer::from_reader(buffered)
.into_iter::<Self>()
.map(|result| result.map_err(Self::Error::from));
Ok(deserializer)
})
.collect();

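The reworked `read_all` streams JSON values out of each file with `serde_json::Deserializer::into_iter` and flattens the per-file iterators into one. A reduced sketch of the same pattern, assuming a hypothetical `Event` type and an explicit list of paths in place of the crate's `PathSource` trait:

```rust
use std::{fs::File, io::BufReader, path::PathBuf};

use anyhow::Result;
use serde::Deserialize;

#[derive(Debug, Deserialize)]
struct Event {
    // Hypothetical field; the real EsLog struct has many more.
    message: String,
}

// Reads every JSON document from every path, lazily, as one iterator.
fn read_all(paths: Vec<PathBuf>) -> Result<impl Iterator<Item = Result<Event>>> {
    let mut per_file = Vec::new();
    for path in paths {
        let file = File::open(path)?;
        // StreamDeserializer yields one item per concatenated JSON value,
        // so large files never have to be held in memory at once.
        let stream = serde_json::Deserializer::from_reader(BufReader::new(file))
            .into_iter::<Event>()
            .map(|r| r.map_err(anyhow::Error::from));
        per_file.push(stream);
    }
    Ok(per_file.into_iter().flatten())
}

fn main() -> Result<()> {
    // Made-up path for illustration; the crate globs for generated .tpl files.
    for event in read_all(vec![PathBuf::from("generated/example.json")])? {
        println!("{:?}", event?);
    }
    Ok(())
}
```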