2 changes: 1 addition & 1 deletion src/iceberg_destination.rs
@@ -203,7 +203,7 @@ pub async fn record_batches_to_iceberg(
.collect();

let snapshot_id = fastrand::i64(..);
-let sequence_number = 1;
+let sequence_number = previous_metadata.last_sequence_number() + 1;

let manifest_file_path = format!("{}/metadata/manifest-{}.avro", target_url, Uuid::new_v4());
let manifest_file_output = file_io.new_output(manifest_file_path)?;
80 changes: 74 additions & 6 deletions tests/basic_integration.rs
@@ -1,10 +1,13 @@
use std::vec;

use arrow::array::{
Array, BinaryArray, BooleanArray, Date32Array, Decimal128Array, Float32Array, Float64Array,
Int16Array, Int32Array, Int64Array, Int8Array, StringArray, TimestampMicrosecondArray,
};
use arrow::datatypes::DataType;
use clap::Parser;
use futures::{StreamExt, TryStreamExt};
use iceberg::spec::TableMetadata;
use lakehouse_loader::delta_destination::object_store_keys_from_env;
use lakehouse_loader::error::DataLoadingError;
use lakehouse_loader::pg_arrow_source::PgArrowSource;
@@ -57,6 +60,10 @@ async fn test_pg_to_delta_e2e() {
assert!(paths[2].to_string().ends_with("-c000.snappy.parquet"));
}

const DATA_FILEPATH_PATTERN: &str = r"^iceberg/data/part-00000-[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}\.parquet$";
const MANIFEST_FILEPATH_PATTERN: &str = r"^iceberg/metadata/manifest-[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}\.avro$";
const MANIFEST_LIST_FILEPATH_PATTERN: &str = r"^iceberg/metadata/manifest-list-[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}\.avro$";

#[tokio::test]
async fn test_pg_to_iceberg() {
let target_url = "s3://lhl-test-bucket/iceberg";
@@ -78,23 +85,40 @@
let (store, path) =
object_store::parse_url_opts(&Url::parse(target_url).unwrap(), config).unwrap();

// THEN iceberg data and metadata files are written
let mut paths = store
.list(Some(&path))
.map_ok(|m| m.location)
.boxed()
.try_collect::<Vec<Path>>()
.await
.unwrap();

paths.sort();

// THEN iceberg data and metadata files are written
assert_eq!(paths.len(), 5);
assert!(Regex::new(r"^iceberg/data/part-00000-[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}.parquet$").unwrap().is_match(paths[0].as_ref()));
assert!(Regex::new(r"^iceberg/metadata/manifest-[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}.avro$").unwrap().is_match(paths[1].as_ref()));
assert!(Regex::new(r"^iceberg/metadata/manifest-list-[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}.avro$").unwrap().is_match(paths[2].as_ref()));
assert!(Regex::new(DATA_FILEPATH_PATTERN)
.unwrap()
.is_match(paths[0].as_ref()));
assert!(Regex::new(MANIFEST_FILEPATH_PATTERN)
.unwrap()
.is_match(paths[1].as_ref()));
assert!(Regex::new(MANIFEST_LIST_FILEPATH_PATTERN)
.unwrap()
.is_match(paths[2].as_ref()));
assert_eq!(&paths[3].to_string(), "iceberg/metadata/v0.metadata.json");
assert_eq!(&paths[4].to_string(), "iceberg/metadata/version-hint.text");
// THEN iceberg metadata can be parsed
let metadata_bytes = store.get(&paths[3]).await.unwrap().bytes().await.unwrap();
let metadata_str = core::str::from_utf8(&metadata_bytes).unwrap();
let metadata = serde_json::from_str::<TableMetadata>(metadata_str).unwrap();
// THEN metadata contains a single snapshot with sequence number 1
assert_eq!(metadata.last_sequence_number(), 1);
assert_eq!(
metadata
.snapshots()
.map(|s| s.sequence_number())
.collect::<Vec<_>>(),
vec![1]
);

// WHEN we try to write to an existing table without passing the overwrite flag
// THEN the command errors out
@@ -149,6 +173,50 @@
"--overwrite",
];
assert!(do_main(Cli::parse_from(args.clone())).await.is_ok());

// THEN iceberg data and metadata files are written
let mut paths = store
.list(Some(&path))
.map_ok(|m| m.location)
.boxed()
.try_collect::<Vec<Path>>()
.await
.unwrap();
paths.sort();
assert_eq!(paths.len(), 9);
assert!(Regex::new(DATA_FILEPATH_PATTERN)
.unwrap()
.is_match(paths[0].as_ref()));
assert!(Regex::new(DATA_FILEPATH_PATTERN)
.unwrap()
.is_match(paths[1].as_ref()));
assert!(Regex::new(MANIFEST_FILEPATH_PATTERN)
.unwrap()
.is_match(paths[2].as_ref()));
assert!(Regex::new(MANIFEST_FILEPATH_PATTERN)
.unwrap()
.is_match(paths[3].as_ref()));
assert!(Regex::new(MANIFEST_LIST_FILEPATH_PATTERN)
.unwrap()
.is_match(paths[4].as_ref()));
assert!(Regex::new(MANIFEST_LIST_FILEPATH_PATTERN)
.unwrap()
.is_match(paths[5].as_ref()));
assert_eq!(&paths[6].to_string(), "iceberg/metadata/v0.metadata.json");
assert_eq!(&paths[7].to_string(), "iceberg/metadata/v1.metadata.json");
assert_eq!(&paths[8].to_string(), "iceberg/metadata/version-hint.text");
// THEN iceberg metadata can be parsed
let metadata_bytes = store.get(&paths[7]).await.unwrap().bytes().await.unwrap();
let metadata_str = core::str::from_utf8(&metadata_bytes).unwrap();
let metadata = serde_json::from_str::<TableMetadata>(metadata_str).unwrap();
// THEN metadata contains two snapshots with sequence numbers 1 and 2
assert_eq!(metadata.last_sequence_number(), 2);
let mut sequence_numbers = metadata
.snapshots()
.map(|s| s.sequence_number())
.collect::<Vec<_>>();
sequence_numbers.sort();
assert_eq!(sequence_numbers, vec![1, 2]);
}

#[tokio::test]