
chore: create cargo-paradedb for easy dev script execution #1028

Merged 19 commits on Apr 18, 2024
654 changes: 622 additions & 32 deletions Cargo.lock


1 change: 1 addition & 0 deletions Cargo.toml
@@ -1,6 +1,7 @@
[workspace]
resolver = "2"
members = [
"cargo-paradedb",
"pg_analytics",
"pg_search",
"shared",
31 changes: 31 additions & 0 deletions cargo-paradedb/Cargo.toml
@@ -0,0 +1,31 @@
[package]
name = "cargo-paradedb"
version = "0.6.1"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
anyhow = "1.0.81"
async-std = "1.12.0"
chrono = { version = "0.4.34", features = ["clock", "alloc", "serde"] }
clap = { version = "4.5.4", features = ["derive", "env"] }
cmd_lib = "1.9.3"
criterion = { version = "0.5.1", features = ["async_std"] }
dotenvy = "0.15.7"
futures = "0.3.30"
glob = "0.3.1"
itertools = "0.12.1"
once_cell = "1.19.0"
reqwest = { version = "0.12.3", features = ["json", "blocking"] }
serde = "1.0.197"
serde_json = "1.0.115"
sqlx = { version = "0.7.4", features = ["postgres", "runtime-async-std", "chrono"] }
tempfile = "3.10.1"
testcontainers = "0.15.0"
testcontainers-modules = { version = "0.3.7", features = ["elastic_search"] }
tracing = "0.1.40"
tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }

[dev-dependencies]
rstest = "0.18.2"
86 changes: 86 additions & 0 deletions cargo-paradedb/README.md
@@ -0,0 +1,86 @@
# ParadeDB Cargo Dev Tool

## Installation

The first time you install `cargo-paradedb`, you should navigate to the `cargo-paradedb` crate and run:

```sh
cargo run install
```

After this first-time installation, you can run `cargo paradedb install` from anywhere and Cargo will globally re-install `cargo-paradedb` with the latest code changes from your source folder.

If you don't want to install globally, you can always just `cargo run` from the `cargo-paradedb` crate folder.

### Installing From Git URL

In containers or cloud instances, it's useful to be able to install globally with a single command:

```sh
cargo install --git https://github.com/paradedb/paradedb.git cargo-paradedb
```

This will install the tool for use as `cargo paradedb` without having to clone the repository first. You can also specify a branch:

```sh
cargo install \
--git https://github.com/paradedb/paradedb.git \
--branch new-feature-branch \
cargo-paradedb
```

## Benchmarks

To run benchmarks, you must be able to connect to a running ParadeDB instance. `cargo-paradedb` accepts a Postgres connection URL in one of the following ways:

1. A `.env` file with a `DATABASE_URL='postgresql://...'` entry, located in a parent folder of `cargo-paradedb`.
2. Setting the `DATABASE_URL` environment variable when running `cargo paradedb`.
3. Passing a `--url` argument directly to `cargo paradedb bench` commands.
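For instance, the second and third methods look like the following (the connection string here is a placeholder; substitute your own):

```sh
# Option 2: set the environment variable for a single invocation.
DATABASE_URL='postgresql://user:password@localhost:5432/mydb' \
  cargo paradedb bench eslogs query-search-index

# Option 3: pass the URL directly to the bench command.
cargo paradedb bench eslogs query-search-index \
  --url 'postgresql://user:password@localhost:5432/mydb'
```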

Benchmark tools are run under the `cargo paradedb bench` subcommand, which is organized in a `NOUN -> VERB` convention as `DATASET -> ACTION`. The first argument to `cargo paradedb bench` is the dataset (or "corpus") you would like to benchmark. `eslogs`, the corpus generated for Elasticsearch's benchmarks, is the main corpus we use for our benchmarks. An example command would be:

```sh
cargo paradedb bench eslogs query-search-index
```

### Generating Benchmark Data

Our benchmarks use the same generated data as the [elasticsearch-opensearch-benchmark](https://github.com/elastic/elasticsearch-opensearch-benchmark) project. To run the data generation tool, you must have [Go](https://go.dev/doc/install) installed. Run the generator tool with:

```sh
cargo paradedb bench eslogs generate
```

In the command above, `generate` can accept arguments to specify a random seed, number of events to generate, table name, and more. Pass `--help` to the `generate` command to see the available options.
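Based on the flags defined in `cli.rs`, an invocation overriding the defaults might look like this (the values shown are arbitrary):

```sh
cargo paradedb bench eslogs generate \
  --seed 42 \
  --events 5000000 \
  --table benchmark_eslogs
```

When no `--url` flag is passed, the command falls back to the `DATABASE_URL` environment variable.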

The `generate` tool is idempotent. It will produce a table in your Postgres database with the number of events that you asked it to generate. As it generates data, it will periodically commit the `INSERT` transaction to Postgres. If you kill the process, it will pick up where it left off the next time you run it.

### Running Benchmarks

All commands below operate on default tables, visible with `--help`. Defaults can be overridden with options passed to each command.

Benchmarks that build a table or index are only run once, as these operations usually take a long time. Benchmarks that perform fast operations, like queries, are sampled many times with the [Criterion](https://github.com/bheisler/criterion.rs) library.

Build a `pg_search` index:

```sh
cargo paradedb bench eslogs build-search-index
```

Query a `pg_search` index (index must already exist):

```sh
cargo paradedb bench eslogs query-search-index
```

Build a `pg_analytics` table using `parquet`:

```sh
cargo paradedb bench eslogs build-parquet-table
```

Count rows in a `pg_analytics` table using `parquet`:

```sh
cargo paradedb bench eslogs count-parquet-table
```
106 changes: 106 additions & 0 deletions cargo-paradedb/src/benchmark.rs
@@ -0,0 +1,106 @@
use anyhow::Result;
use async_std::sync::Mutex;
use async_std::task::block_on;
use criterion::async_executor::AsyncStdExecutor;
use criterion::Criterion;
use sqlx::Executor;
use sqlx::{postgres::PgConnectOptions, Connection, PgConnection};
use std::str::FromStr;
use std::sync::Arc;
use std::time::SystemTime;
use tracing::debug;

pub struct Benchmark {
pub group_name: String,
pub function_name: String,
pub setup_query: Option<String>,
pub query: String,
pub database_url: String,
}

impl Benchmark {
pub async fn setup_query(&self, conn: &mut PgConnection) -> Result<()> {
if let Some(query) = &self.setup_query {
conn.execute(query.as_ref()).await?;
}

Ok(())
}
pub async fn run_pg(&self) -> Result<()> {
// One-time setup code goes here.
debug!(DATABASE_URL = self.database_url);
let mut criterion = Criterion::default();
let mut group = criterion.benchmark_group(&self.group_name);

// Lowered from default sample size to remove Criterion warning.
// Must be higher than 10, or Criterion will panic.
group.sample_size(60);
group.bench_function(&self.function_name, |runner| {
// Per-sample (note that a sample can be many iterations) setup goes here.
let conn_opts = &PgConnectOptions::from_str(&self.database_url).unwrap();
let conn = block_on(async {
Arc::new(Mutex::new(
PgConnection::connect_with(conn_opts).await.unwrap(),
))
});

// Run setup query.
block_on(async {
let local_conn = conn.clone();
let mut conn = local_conn.lock().await; // Acquire the lock asynchronously.
self.setup_query(&mut conn).await.unwrap();
});

runner.to_async(AsyncStdExecutor).iter(|| {
// Measured code goes here.
async {
let local_conn = conn.clone();
let mut conn = local_conn.lock().await; // Acquire the lock asynchronously.
sqlx::query(&self.query).execute(&mut *conn).await.unwrap();
}
});
});

group.finish();

Ok(())
}

pub async fn run_pg_once(&self) -> Result<()> {
let conn_opts = &PgConnectOptions::from_str(&self.database_url).unwrap();
let mut conn = PgConnection::connect_with(conn_opts).await.unwrap();

// Run setup query if present.
self.setup_query(conn.as_mut()).await.unwrap();

// Run actual query to be benchmarked.
let start_time = SystemTime::now();
sqlx::query(&self.query).execute(&mut conn).await.unwrap();
let end_time = SystemTime::now();

Self::print_results(start_time, end_time);

Ok(())
}

pub fn print_results(start_time: SystemTime, end_time: SystemTime) {
if let Ok(duration) = end_time.duration_since(start_time) {
println!("Start time: {:?}", start_time);
println!("End time: {:?}", end_time);

let milliseconds = duration.as_millis();
let seconds = duration.as_secs_f64(); // Use floating point for seconds
let minutes = seconds / 60.0; // Convert seconds to minutes
let hours = seconds / 3600.0; // Convert seconds to hours

println!("Duration: {} milliseconds", milliseconds);
println!("Duration: {:.4} seconds", seconds); // Print with 4 decimal places
println!("Duration: {:.4} minutes", minutes); // Print with 4 decimal places
println!("Duration: {:.4} hours", hours); // Print with 4 decimal places
} else {
println!("An error occurred while calculating the duration.");
}
}
}
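The unit conversions in `print_results` can be exercised in isolation. The sketch below repeats the same arithmetic in a hypothetical `format_duration` helper (not part of this crate):

```rust
use std::time::Duration;

// Repeats the conversions used by `print_results`: integer milliseconds,
// then fractional seconds, minutes, and hours derived from as_secs_f64().
fn format_duration(duration: Duration) -> (u128, f64, f64, f64) {
    let milliseconds = duration.as_millis();
    let seconds = duration.as_secs_f64();
    (milliseconds, seconds, seconds / 60.0, seconds / 3600.0)
}

fn main() {
    let (ms, s, m, h) = format_duration(Duration::from_millis(90_000));
    println!("{} ms, {:.4} s, {:.4} min, {:.4} h", ms, s, m, h);
    // prints: 90000 ms, 90.0000 s, 1.5000 min, 0.0250 h
}
```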
151 changes: 151 additions & 0 deletions cargo-paradedb/src/cli.rs
@@ -0,0 +1,151 @@
use clap::Parser;

const DEFAULT_BENCH_ESLOGS_TABLE: &str = "benchmark_eslogs";
const DEFAULT_BENCH_ESLOGS_INDEX_NAME: &str = "benchmark_eslogs_pg_search";

/// A wrapper struct for subcommands.
#[derive(Parser)]
#[command(version, about, long_about = None, bin_name = "cargo")]
pub struct Cli {
#[command(subcommand)]
pub subcommand: Subcommand,
}

// Top-level commands for the cargo-paradedb tool.
#[derive(clap::Subcommand)]
pub enum Subcommand {
Install,
Bench(CorpusArgs),
}

// A wrapper struct for a subcommand under 'cargo paradedb bench' which
// selects a corpus to generate/run.
#[derive(Debug, clap::Args)]
pub struct CorpusArgs {
#[command(subcommand)]
pub corpus: Corpus,
}

/// Which benchmark dataset to run or generate.
#[derive(Debug, clap::Subcommand)]
pub enum Corpus {
// The generated logs from the ElasticSearch benchmark tool.
Eslogs(EsLogsArgs),
}

/// A wrapper struct for the command to run on the eslogs corpus.
#[derive(Debug, clap::Args)]
pub struct EsLogsArgs {
#[command(subcommand)]
pub command: EsLogsCommand,
}

/// The command to run on the eslogs corpus.
#[derive(Debug, clap::Subcommand)]
pub enum EsLogsCommand {
/// Generate the eslogs corpus, inserting into a Postgres table.
Generate {
/// Starting seed for random generation.
#[arg(long, short, default_value_t = 1)]
seed: u64,
/// Total number of events to generate per file.
/// Defaults to a file size of 100MB.
#[arg(long, short, default_value_t = 118891)]
events: u64,
/// Postgres table name to insert into.
#[arg(short, long, default_value = DEFAULT_BENCH_ESLOGS_TABLE)]
table: String,
/// Postgres database url to connect to.
#[arg(short, long, env = "DATABASE_URL")]
url: String,
},
BuildSearchIndex {
/// Postgres table name to index.
#[arg(short, long, default_value = DEFAULT_BENCH_ESLOGS_TABLE)]
table: String,
/// Postgres index name to create.
#[arg(short, long, default_value = DEFAULT_BENCH_ESLOGS_INDEX_NAME)]
index: String,
/// Postgres database url to connect to.
#[arg(short, long, env = "DATABASE_URL")]
url: String,
},
QuerySearchIndex {
/// Postgres index name to query.
#[arg(short, long, default_value = DEFAULT_BENCH_ESLOGS_INDEX_NAME)]
index: String,
/// Query to run.
#[arg(short, long, default_value = "message:flame")]
query: String,
/// Limit results to return.
#[arg(short, long, default_value_t = 1)]
limit: u64,
/// Postgres database url to connect to.
#[arg(short, long, env = "DATABASE_URL")]
url: String,
},
BuildParquetTable {
/// Postgres table name to build from.
#[arg(short, long, default_value = DEFAULT_BENCH_ESLOGS_TABLE)]
table: String,
/// Postgres database url to connect to.
#[arg(short, long, env = "DATABASE_URL")]
url: String,
},
CountParquetTable {
/// Postgres table name to build from.
#[arg(short, long, default_value = DEFAULT_BENCH_ESLOGS_TABLE)]
table: String,
/// Postgres database url to connect to.
#[arg(short, long, env = "DATABASE_URL")]
url: String,
},
BuildElasticIndex {
/// Postgres table name to build from.
#[arg(short, long, default_value = DEFAULT_BENCH_ESLOGS_TABLE)]
table: String,
/// Postgres database url to connect to.
#[arg(short, long, env = "DATABASE_URL")]
url: String,
/// Elastic index url to connect to.
/// Should contain the index name as a path subcomponent.
#[arg(short, long)]
elastic_url: String,
},
QueryElasticIndex {
/// Index field to match on.
#[arg(short, long, default_value = "message")]
field: String,
/// Search term in index field to match on.
#[arg(short, long, default_value = "flame")]
term: String,
/// Elastic index url to connect to.
/// Should contain the index name as a path subcomponent.
#[arg(short, long)]
elastic_url: String,
},
}

impl Default for Cli {
fn default() -> Self {
// Usually, clap CLI tools can just use `Self::parse()` to initialize a
// struct with the CLI arguments... but seeing as this will be run as a
// cargo subcommand, we need to do some extra work.
//
// Because we're running e.g. "cargo paradedb install"... clap will think
// that "paradedb" is the first argument that was passed to the binary.
// Instead we want "install" to be the first argument, with "paradedb"
// ignored. For this reason, we'll manually collect and process the CLI
// arguments ourselves.
//
// We check to see if the argument at index 1 is "paradedb"...
// as the argument at index 0 is always the path to the binary executable.
// If "paradedb" is found, we'll parse the arguments starting at index 1.
// Otherwise, we'll use Self::parse() like usual.
let args = std::env::args().collect::<Vec<String>>();
match args.get(1) {
Some(arg) if arg == "paradedb" => Self::parse_from(&args[1..]),
_ => Self::parse(),
}
}
}
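The argument-shifting rule described in the comment above can be illustrated on its own. `parse_start_index` below is a hypothetical helper, not part of the crate:

```rust
// When run as `cargo paradedb install`, argv is effectively
// ["/path/to/cargo-paradedb", "paradedb", "install"], so clap parsing
// should begin at index 1; in a direct invocation it begins at index 0.
fn parse_start_index(args: &[String]) -> usize {
    match args.get(1) {
        Some(arg) if arg == "paradedb" => 1,
        _ => 0,
    }
}

fn main() {
    let as_subcommand: Vec<String> =
        vec!["cargo-paradedb".into(), "paradedb".into(), "install".into()];
    let direct: Vec<String> = vec!["cargo-paradedb".into(), "install".into()];
    println!("{} {}", parse_start_index(&as_subcommand), parse_start_index(&direct));
    // prints: 1 0
}
```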