Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 28 additions & 5 deletions src/table/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,10 +177,23 @@ where
.data
.insert(row.clone())
.map_err(WorkTableError::PagesError)?;
self.pk_map
if let Err(e) = self
.pk_map
.insert(pk.clone(), link)
.map_or(Ok(()), |_| Err(WorkTableError::AlreadyExists))?;
self.indexes.save_row(row, link)?;
.map_or(Ok(()), |_| Err(WorkTableError::AlreadyExists))
{
self.data.delete(link).map_err(WorkTableError::PagesError)?;
self.pk_map.remove(&pk);

return Err(e);
};
if let Err(e) = self.indexes.save_row(row.clone(), link) {
self.data.delete(link).map_err(WorkTableError::PagesError)?;
self.pk_map.remove(&pk);
self.indexes.delete_row(row, link)?;

return Err(e);
}

Ok(pk)
}
Expand Down Expand Up @@ -218,15 +231,25 @@ where
.map_err(WorkTableError::PagesError)?;
let (exists, primary_key_events) = self.pk_map.insert_cdc(pk.clone(), link);
if exists.is_some() {
self.data.delete(link).map_err(WorkTableError::PagesError)?;
self.pk_map.remove(&pk);

return Err(WorkTableError::AlreadyExists);
}
let secondary_keys_events = self.indexes.save_row_cdc(row, link)?;
let indexes_res = self.indexes.save_row_cdc(row.clone(), link);
if let Err(e) = indexes_res {
self.data.delete(link).map_err(WorkTableError::PagesError)?;
self.pk_map.remove(&pk);
self.indexes.delete_row(row, link)?;

return Err(e);
}

let op = Operation::Insert(InsertOperation {
id: Default::default(),
pk_gen_state: self.pk_gen.get_state(),
primary_key_events,
secondary_keys_events,
secondary_keys_events: indexes_res.expect("was checked before"),
bytes,
link,
});
Expand Down
6 changes: 5 additions & 1 deletion tests/persistence/sync/string_re_read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@ worktable!(
third: String,
last: String,
},
indexes: {
first_idx: first,
second_idx: second unique,
},
);

#[test]
Expand Down Expand Up @@ -44,7 +48,7 @@ fn test_key() {
.unwrap();
table
.insert(StringReReadRow {
first: "first_again".to_string(),
first: "first".to_string(),
id: table.get_next_pk().into(),
third: "third_again".to_string(),
second: "second_again".to_string(),
Expand Down
16 changes: 0 additions & 16 deletions tests/worktable/base.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,22 +153,6 @@ async fn upsert_spawn() {
assert!(table.select(2.into()).is_none())
}

#[test]
fn insert() {
    // Round-trip check: an inserted row comes back unchanged from `select`,
    // and a key that was never inserted yields `None`.
    let table = TestWorkTable::default();
    let inserted = TestRow {
        id: table.get_next_pk().into(),
        test: 1,
        another: 1,
        exchange: "test".to_string(),
    };

    let pk = table.insert(inserted.clone()).unwrap();

    assert_eq!(table.select(pk).unwrap(), inserted);
    assert!(table.select(2.into()).is_none())
}

#[tokio::test]
async fn update() {
let table = TestWorkTable::default();
Expand Down
4 changes: 2 additions & 2 deletions tests/worktable/bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ worktable!(
}
);

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn rw_lock_hash_map_vs_wt() {
// #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn _rw_lock_hash_map_vs_wt() {
let wt = Arc::new(MapWorkTable::default());
let hash_map = Arc::new(RwLock::new(HashMap::<u64, String>::default()));

Expand Down
94 changes: 94 additions & 0 deletions tests/worktable/index/insert.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
use worktable::prelude::*;
use worktable::worktable;

// Table definition shared by the insert tests below: a `Test` worktable with an
// autoincrementing u64 primary key, a non-unique secondary index on `attr1`,
// and a unique secondary index on `attr2` (used to provoke index conflicts).
worktable!(
    name: Test,
    columns: {
        id: u64 primary_key autoincrement, // generated via `get_next_pk()`
        val: i64,
        attr1: String,
        attr2: i16,
        attr3: u64,
    },
    indexes: {
        attr1_idx: attr1,        // non-unique: duplicates allowed
        attr2_idx: attr2 unique, // unique: duplicate insert must be rejected
    }
);

#[test]
fn insert() {
    // A freshly inserted row must be retrievable via the returned primary key,
    // while a key that was never used keeps returning `None`.
    let table = TestWorkTable::default();
    let original = TestRow {
        id: table.get_next_pk().into(),
        val: 13,
        attr1: "Attribute".to_string(),
        attr2: -128,
        attr3: 123456789,
    };

    let pk = table.insert(original.clone()).unwrap();

    assert_eq!(table.select(pk).unwrap(), original);
    assert!(table.select(2.into()).is_none())
}

#[test]
fn insert_when_pk_exists() {
    // Inserting a second row that reuses an existing primary key must fail and
    // must leave no trace of the rejected row in the secondary indexes.
    let table = TestWorkTable::default();
    let existing = TestRow {
        id: table.get_next_pk().into(),
        val: 13,
        attr1: "Attribute".to_string(),
        attr2: -128,
        attr3: 123456789,
    };
    let pk = table.insert(existing.clone()).unwrap();

    // Same primary key, different payload.
    let duplicate = TestRow {
        id: pk.0,
        val: 0,
        attr1: "some str".to_string(),
        attr2: 0,
        attr3: 0,
    };
    assert!(table.insert(duplicate.clone()).is_err());

    // The rolled-back row must not have been committed to either index.
    let attr1_hits: Vec<_> = table.0.indexes.attr1_idx.get(&duplicate.attr1).collect();
    assert!(attr1_hits.is_empty());
    assert!(table.0.indexes.attr2_idx.get(&duplicate.attr2).is_none())
}

#[test]
fn insert_when_secondary_unique_exists() {
    // A row whose `attr2` collides with the unique index must be rejected, and
    // any partially written index entries for it must be rolled back.
    let table = TestWorkTable::default();
    let existing = TestRow {
        id: table.get_next_pk().into(),
        val: 13,
        attr1: "Attribute".to_string(),
        attr2: -128,
        attr3: 123456789,
    };
    let _ = table.insert(existing.clone()).unwrap();

    // Fresh primary key, but the same unique `attr2` value.
    let conflicting = TestRow {
        id: table.get_next_pk().into(),
        val: 0,
        attr1: "some str".to_string(),
        attr2: existing.attr2,
        attr3: 0,
    };
    assert!(table.insert(conflicting.clone()).is_err());

    let attr1_hits: Vec<_> = table.0.indexes.attr1_idx.get(&conflicting.attr1).collect();
    assert!(attr1_hits.is_empty());
    // NOTE(review): `conflicting.attr2 == existing.attr2`, so this assertion also
    // implies the *first* row's unique-index entry is absent after rollback —
    // confirm that dropping the pre-existing entry is the intended semantics.
    assert!(table.0.indexes.attr2_idx.get(&conflicting.attr2).is_none());
}
1 change: 1 addition & 0 deletions tests/worktable/index/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
mod insert;
mod update_by_pk;
mod update_full;
mod update_query;
Expand Down
Loading