Skip to content
This repository has been archived by the owner on Oct 18, 2023. It is now read-only.

Commit

Permalink
bottomless: add xz compression option
Browse files Browse the repository at this point in the history
Empirical testing shows that gzip achieves a mere x2 compression ratio
even with very simple and repetitive data patterns.
Since compression is very important for optimizing our egress traffic
and throughput in general, .xz algorithm is hereby implemented
as well. Run with the same data set, it achieved a ~x50 compression ratio,
which is orders of magnitude better than gzip, at the cost of elevated
CPU usage.

Note: with more algos implemented, we should also consider adding code
that detects which compression method was used when restoring a snapshot,
to allow restoring from a gzip file, but continue new snapshots with xz.
Currently, setting the compression method via the env var assumes
that both restore and backup use the same algorithm.
  • Loading branch information
psarna committed Oct 16, 2023
1 parent eb92268 commit f232316
Show file tree
Hide file tree
Showing 4 changed files with 57 additions and 11 deletions.
2 changes: 1 addition & 1 deletion bottomless/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ description = "Bottomless replication for libSQL"

[dependencies]
anyhow = "1.0.66"
async-compression = { version = "0.3.15", features = ["tokio", "gzip"] }
async-compression = { version = "0.3.15", features = ["tokio", "gzip", "xz"] }
aws-config = { version = "0.55" }
aws-sdk-s3 = { version = "0.28" }
bytes = "1"
Expand Down
8 changes: 8 additions & 0 deletions bottomless/src/backup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,14 @@ impl WalCopier {
wal.copy_frames(&mut gzip, len).await?;
gzip.shutdown().await?;
}
CompressionKind::Xz => {
let mut xz = async_compression::tokio::write::XzEncoder::with_quality(
&mut out,
async_compression::Level::Best,
);
wal.copy_frames(&mut xz, len).await?;
xz.shutdown().await?;
}
}
if tracing::enabled!(tracing::Level::DEBUG) {
let elapsed = Instant::now() - period_start;
Expand Down
6 changes: 5 additions & 1 deletion bottomless/src/read.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::replicator::CompressionKind;
use crate::wal::WalFrameHeader;
use anyhow::Result;
use async_compression::tokio::bufread::GzipDecoder;
use async_compression::tokio::bufread::{GzipDecoder, XzEncoder};
use aws_sdk_s3::primitives::ByteStream;
use std::io::ErrorKind;
use std::pin::Pin;
Expand Down Expand Up @@ -32,6 +32,10 @@ impl BatchReader {
let gzip = GzipDecoder::new(reader);
Box::pin(gzip)
}
CompressionKind::Xz => {
let xz = XzEncoder::new(reader);
Box::pin(xz)
}
},
}
}
Expand Down
52 changes: 43 additions & 9 deletions bottomless/src/replicator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use crate::uuid_utils::decode_unix_timestamp;
use crate::wal::WalFileReader;
use anyhow::{anyhow, bail};
use arc_swap::ArcSwapOption;
use async_compression::tokio::write::GzipEncoder;
use async_compression::tokio::write::{GzipEncoder, XzEncoder};
use aws_sdk_s3::config::{Credentials, Region};
use aws_sdk_s3::error::SdkError;
use aws_sdk_s3::operation::get_object::builders::GetObjectFluentBuilder;
Expand Down Expand Up @@ -653,7 +653,7 @@ impl Replicator {
CompressionKind::None => Ok(ByteStream::from_path(db_path).await?),
CompressionKind::Gzip => {
let mut reader = File::open(db_path).await?;
let gzip_path = Self::db_gzip_path(db_path);
let gzip_path = Self::db_compressed_path(db_path, "gz");
let compressed_file = OpenOptions::new()
.create(true)
.write(true)
Expand All @@ -671,13 +671,34 @@ impl Replicator {
);
Ok(ByteStream::from_path(gzip_path).await?)
}
CompressionKind::Xz => {
let mut reader = File::open(db_path).await?;
let gzip_path = Self::db_compressed_path(db_path, "xz");
let compressed_file = OpenOptions::new()
.create(true)
.write(true)
.read(true)
.truncate(true)
.open(&gzip_path)
.await?;
let mut writer =
XzEncoder::with_quality(compressed_file, async_compression::Level::Best);
let size = tokio::io::copy(&mut reader, &mut writer).await?;
writer.shutdown().await?;
tracing::debug!(
"Compressed database file ({} bytes) into `{}`",
size,
gzip_path.display()
);
Ok(ByteStream::from_path(gzip_path).await?)
}
}
}

fn db_gzip_path(db_path: &Path) -> PathBuf {
let mut gzip_path = db_path.to_path_buf();
gzip_path.pop();
gzip_path.join("db.gz")
/// Returns the path of the compressed database snapshot that lives next to
/// `db_path`, i.e. the sibling file `db.<suffix>` (e.g. `db.gz`, `db.xz`).
///
/// The suffix is any extension string; `&str` (rather than `&'static str`)
/// keeps the helper usable with dynamically chosen compression kinds.
fn db_compressed_path(db_path: &Path, suffix: &str) -> PathBuf {
    // `with_file_name` replaces the final component in one step — equivalent
    // to the manual `pop()` + `join()` dance, without the intermediate mutation.
    db_path.with_file_name(format!("db.{suffix}"))
}

fn restore_db_path(&self) -> PathBuf {
Expand Down Expand Up @@ -816,9 +837,10 @@ impl Replicator {
let _ = snapshot_notifier.send(Ok(Some(generation)));
let elapsed = Instant::now() - start;
tracing::debug!("Snapshot upload finished (took {:?})", elapsed);
// cleanup gzip database snapshot if exists
let gzip_path = Self::db_gzip_path(&db_path);
let _ = tokio::fs::remove_file(gzip_path).await;
// cleanup gzip/xz database snapshot if exists
for suffix in &["gz", "xz"] {
let _ = tokio::fs::remove_file(Self::db_compressed_path(&db_path, suffix)).await;
}
});
let elapsed = Instant::now() - start_ts;
tracing::debug!("Scheduled DB snapshot {} (took {:?})", generation, elapsed);
Expand Down Expand Up @@ -1163,6 +1185,7 @@ impl Replicator {
let main_db_path = match self.use_compression {
CompressionKind::None => format!("{}-{}/db.db", self.db_name, generation),
CompressionKind::Gzip => format!("{}-{}/db.gz", self.db_name, generation),
CompressionKind::Xz => format!("{}-{}/db.xz", self.db_name, generation),
};

if let Ok(db_file) = self.get_object(main_db_path).send().await {
Expand All @@ -1175,6 +1198,12 @@ impl Replicator {
);
tokio::io::copy(&mut decompress_reader, db).await?
}
CompressionKind::Xz => {
let mut decompress_reader = async_compression::tokio::bufread::XzDecoder::new(
tokio::io::BufReader::new(body_reader),
);
tokio::io::copy(&mut decompress_reader, db).await?
}
};
db.flush().await?;

Expand Down Expand Up @@ -1235,6 +1264,7 @@ impl Replicator {
Some(result) => result,
None => {
if !key.ends_with(".gz")
&& !key.ends_with(".xz")
&& !key.ends_with(".db")
&& !key.ends_with(".meta")
&& !key.ends_with(".dep")
Expand Down Expand Up @@ -1423,6 +1453,7 @@ impl Replicator {
let str = fpath.to_str()?;
if str.ends_with(".db")
| str.ends_with(".gz")
| str.ends_with(".xz")
| str.ends_with(".raw")
| str.ends_with(".meta")
| str.ends_with(".dep")
Expand Down Expand Up @@ -1670,13 +1701,15 @@ pub enum CompressionKind {
#[default]
None,
Gzip,
Xz,
}

impl CompressionKind {
pub fn parse(kind: &str) -> std::result::Result<Self, &str> {
match kind {
"gz" | "gzip" => Ok(CompressionKind::Gzip),
"raw" | "" => Ok(CompressionKind::None),
"xz" => Ok(CompressionKind::Xz),
other => Err(other),
}
}
Expand All @@ -1687,6 +1720,7 @@ impl std::fmt::Display for CompressionKind {
match self {
CompressionKind::None => write!(f, "raw"),
CompressionKind::Gzip => write!(f, "gz"),
CompressionKind::Xz => write!(f, "xz"),
}
}
}

0 comments on commit f232316

Please sign in to comment.