Skip to content

Commit cc8025a

Browse files
committed
use more stable ordering
1 parent 9443c8f commit cc8025a

File tree

1 file changed

+40
-41
lines changed

1 file changed

+40
-41
lines changed

etl-destinations/tests/iceberg_destination.rs

Lines changed: 40 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -301,7 +301,15 @@ async fn cdc_streaming() {
301301
);
302302

303303
let mut actual_users = read_all_rows(&client, namespace.to_string(), users_table.clone()).await;
304-
// Drop the last column (non-deterministic sequence) before comparison.
304+
305+
// Sort deterministically by the primary key (id) and sequence number for stable assertions
306+
actual_users.sort_by(|a, b| {
307+
let a_key = format!("{:?}_{:?}", a.values[0], a.values[4]);
308+
let b_key = format!("{:?}_{:?}", b.values[0], b.values[4]);
309+
a_key.cmp(&b_key)
310+
});
311+
312+
// Drop the last column (non-deterministic sequence number) before comparison.
305313
for row in &mut actual_users {
306314
let _ = row.values.pop();
307315
}
@@ -310,13 +318,13 @@ async fn cdc_streaming() {
310318
// Note: order here is messed up due to limitations in how read_all_rows can't sort, so we sort manually
311319
// by id and cdc operation columns
312320
let expected_users = vec![
313-
// Delete of user with id 1
321+
// Initial insert of user 1
314322
TableRow {
315323
values: vec![
316324
Cell::I64(1),
317-
Cell::String("".to_string()),
318-
Cell::I32(0),
319-
Cell::String("DELETE".to_string()),
325+
Cell::String("user_1".to_string()),
326+
Cell::I32(1),
327+
Cell::String("UPSERT".to_string()),
320328
],
321329
},
322330
// Update of user 1
@@ -328,52 +336,48 @@ async fn cdc_streaming() {
328336
Cell::String("UPSERT".to_string()),
329337
],
330338
},
331-
// Initial insert of user 1
339+
// Delete of user with id 1
332340
TableRow {
333341
values: vec![
334342
Cell::I64(1),
335-
Cell::String("user_1".to_string()),
336-
Cell::I32(1),
337-
Cell::String("UPSERT".to_string()),
343+
Cell::String("".to_string()),
344+
Cell::I32(0),
345+
Cell::String("DELETE".to_string()),
338346
],
339347
},
340-
// Update of user 2
348+
// Initial insert of user 2
341349
TableRow {
342350
values: vec![
343351
Cell::I64(2),
344-
Cell::String("updated_name".to_string()),
345-
Cell::I32(42),
352+
Cell::String("user_2".to_string()),
353+
Cell::I32(2),
346354
Cell::String("UPSERT".to_string()),
347355
],
348356
},
349-
// Initial insert of user 2
357+
// Update of user 2
350358
TableRow {
351359
values: vec![
352360
Cell::I64(2),
353-
Cell::String("user_2".to_string()),
354-
Cell::I32(2),
361+
Cell::String("updated_name".to_string()),
362+
Cell::I32(42),
355363
Cell::String("UPSERT".to_string()),
356364
],
357365
},
358366
];
359367

360-
// Sort deterministically by the primary key (id) and operation for stable assertions.
361-
actual_users.sort_by(|a, b| {
362-
let a_key = format!(
363-
"{:?}_{:?}_{:?}_{:?}",
364-
a.values[0], a.values[1], a.values[2], a.values[3]
365-
);
366-
let b_key = format!(
367-
"{:?}_{:?}_{:?}_{:?}",
368-
b.values[0], b.values[1], b.values[2], b.values[3]
369-
);
370-
a_key.cmp(&b_key)
371-
});
372368
assert_eq!(actual_users, expected_users);
373369

374370
let mut actual_orders =
375371
read_all_rows(&client, namespace.to_string(), orders_table.clone()).await;
376-
// Drop the last column (non-deterministic sequence) before comparison.
372+
373+
// Sort deterministically by the primary key (id) and sequence number for stable assertions
374+
actual_orders.sort_by(|a, b| {
375+
let a_key = format!("{:?}_{:?}", a.values[0], a.values[3]);
376+
let b_key = format!("{:?}_{:?}", b.values[0], b.values[3]);
377+
a_key.cmp(&b_key)
378+
});
379+
380+
// Drop the last column (non-deterministic sequence number) before comparison.
377381
for row in &mut actual_orders {
378382
let _ = row.values.pop();
379383
}
@@ -395,14 +399,6 @@ async fn cdc_streaming() {
395399
Cell::String("UPSERT".to_string()),
396400
],
397401
},
398-
// Delete of order 2
399-
TableRow {
400-
values: vec![
401-
Cell::I64(2),
402-
Cell::String("".to_string()),
403-
Cell::String("DELETE".to_string()),
404-
],
405-
},
406402
// Initial insert of order 2
407403
TableRow {
408404
values: vec![
@@ -419,13 +415,16 @@ async fn cdc_streaming() {
419415
Cell::String("UPSERT".to_string()),
420416
],
421417
},
418+
// Delete of order 2
419+
TableRow {
420+
values: vec![
421+
Cell::I64(2),
422+
Cell::String("".to_string()),
423+
Cell::String("DELETE".to_string()),
424+
],
425+
},
422426
];
423427

424-
actual_orders.sort_by(|a, b| {
425-
let a_key = format!("{:?}_{:?}_{:?}", a.values[0], a.values[1], a.values[2]);
426-
let b_key = format!("{:?}_{:?}_{:?}", b.values[0], b.values[1], b.values[2]);
427-
a_key.cmp(&b_key)
428-
});
429428
assert_eq!(actual_orders, expected_orders);
430429

431430
// Stop the pipeline to finalize writes.

0 commit comments

Comments
 (0)