Skip to content

Commit

Permalink
fix issue #750
Browse files Browse the repository at this point in the history
  • Loading branch information
eeeebbbbrrrr committed Jun 29, 2022
1 parent 741215d commit 177194d
Show file tree
Hide file tree
Showing 3 changed files with 271 additions and 2 deletions.
12 changes: 10 additions & 2 deletions src/elasticsearch/bulk.rs
Expand Up @@ -143,7 +143,7 @@ impl ElasticsearchBulkRequest {
bulk.do_refresh = false; // we don't need to do a refresh for this bulk as we'll take care of it below
deferred_commands.into_iter().for_each(|command| {
bulk.handler
.queue_command(command)
.queue_command_ex(command, true)
.expect("failed to queue leftover command")
});
let (t, nr) = bulk.finish()?;
Expand Down Expand Up @@ -715,10 +715,18 @@ impl Handler {
}

pub fn queue_command(
&mut self,
command: BulkRequestCommand<'static>,
) -> Result<(), crossbeam_channel::SendError<BulkRequestCommand<'static>>> {
self.queue_command_ex(command, false)
}

pub fn queue_command_ex(
&mut self,
mut command: BulkRequestCommand<'static>,
is_deferred: bool,
) -> Result<(), crossbeam_channel::SendError<BulkRequestCommand<'static>>> {
if self.current_xid.is_none() {
if is_deferred == false && self.current_xid.is_none() {
match &command {
BulkRequestCommand::Insert { .. } | BulkRequestCommand::Update { .. } => {
let current_xid = unsafe { pg_sys::GetCurrentTransactionId() };
Expand Down
123 changes: 123 additions & 0 deletions test/expected/issue-750.out
@@ -0,0 +1,123 @@
CREATE TABLE IF NOT EXISTS table_a (
id serial primary key,
val text
);
DROP TABLE IF EXISTS table_b;
CREATE TABLE IF NOT EXISTS table_b (
id serial primary key,
id_a integer references table_a(id),
val text
);
alter table table_a add column id_b integer references table_b(id);
CREATE TYPE table_a_type AS (
id integer,
id_b integer,
val text
);
CREATE FUNCTION table_a_idx(table_a) RETURNS table_a_type IMMUTABLE STRICT LANGUAGE sql AS $$
SELECT ROW (
$1.id,
$1.id_b,
$1.val
)
$$;
CREATE INDEX table_a_idx
ON table_a
USING zombodb (table_a_idx(table_a));
CREATE TYPE table_b_type AS (
id integer,
val text
);
CREATE FUNCTION table_b_idx(table_b) RETURNS table_b_type IMMUTABLE STRICT LANGUAGE sql AS $$
SELECT ROW (
$1.id,
$1.val
)
$$;
CREATE INDEX table_b_idx
ON table_b
USING zombodb (table_b_idx(table_b));
Insert into table_a (val) values ('hello');
Insert into table_a (val) values ('hello');
Insert into table_a (val) values ('hello');
Insert into table_a (val) values ('hello');
Insert into table_a (val) values ('hello');
Insert into table_a (val) values ('hello');
Insert into table_a (val) values ('hello');
Insert into table_a (val) values ('hello');
Insert into table_a (val) values ('hello');
Insert into table_a (val) values ('hello');
Insert into table_a (val) values ('hello');
Insert into table_a (val) values ('hello');
Insert into table_a (val) values ('hello');
Insert into table_a (val) values ('hello');
Insert into table_a (val) values ('hello');
Insert into table_a (val) values ('hello');
Insert into table_a (val) values ('hello');
Insert into table_a (val) values ('hello');
Insert into table_a (val) values ('hello');
Insert into table_a (val) values ('hello');
Insert into table_a (val) values ('hello');
Insert into table_a (val) values ('hello');
Insert into table_a (val) values ('hello');
Insert into table_a (val) values ('hello');
Insert into table_a (val) values ('hello');
Insert into table_a (val) values ('hello');
Insert into table_a (val) values ('hello');
Insert into table_a (val) values ('hello');
Insert into table_a (val) values ('hello');
Insert into table_a (val) values ('hello');
Insert into table_a (val) values ('123');
Insert into table_a (val) values ('123');
Insert into table_a (val) values ('123');
Insert into table_a (val) values ('123');
Insert into table_a (val) values ('123');
Insert into table_a (val) values ('123');
Insert into table_a (val) values ('123');
Insert into table_a (val) values ('123');
Insert into table_a (val) values ('123');
Insert into table_a (val) values ('123');
Insert into table_a (val) values ('123');
Insert into table_a (val) values ('123');
BEGIN;
savepoint abc;
insert into table_b (val) values ('world');
insert into table_b (val) values ('world');
insert into table_b (val) values ('world');
insert into table_b (val) values ('world');
insert into table_b (val) values ('world');
insert into table_b (val) values ('world');
insert into table_b (val) values ('world');
insert into table_b (val) values ('world');
insert into table_b (val) values ('world');
insert into table_b (val) values ('world');
insert into table_b (val) values ('world');
insert into table_b (val) values ('world');
insert into table_b (val) values ('world');
insert into table_b (val) values ('world');
insert into table_b (val) values ('world');
insert into table_b (val) values ('world');
insert into table_b (val) values ('world');
insert into table_b (val) values ('world');
insert into table_b (val) values ('world');
insert into table_b (val) values ('world');
insert into table_b (val) values ('world');
insert into table_b (val) values ('world');
insert into table_b (val) values ('world');
insert into table_b (val) values ('world');
insert into table_b (val) values ('world');
insert into table_b (val) values ('world');
insert into table_b (val) values ('world');
savepoint asdf;
delete from table_a where val = '123';
UPDATE table_a set id_b = 1 where val ='hello';
UPDATE table_b set id_a = 1 where val = 'world';
release savepoint asdf;
release savepoint abc;
COMMIT;
DROP function table_a_idx cascade;
DROP TYPE table_a_type cascade;
DROP function table_b_idx cascade;
DROP TYPE table_b_type cascade;
DROP TABLE table_a CASCADE;
DROP TABLE table_b CASCADE;
138 changes: 138 additions & 0 deletions test/sql/issue-750.sql
@@ -0,0 +1,138 @@
CREATE TABLE IF NOT EXISTS table_a (
id serial primary key,
val text
);
DROP TABLE IF EXISTS table_b;

CREATE TABLE IF NOT EXISTS table_b (
id serial primary key,
id_a integer references table_a(id),
val text
);

alter table table_a add column id_b integer references table_b(id);



CREATE TYPE table_a_type AS (
id integer,
id_b integer,
val text
);

CREATE FUNCTION table_a_idx(table_a) RETURNS table_a_type IMMUTABLE STRICT LANGUAGE sql AS $$
SELECT ROW (
$1.id,
$1.id_b,
$1.val
)
$$;

CREATE INDEX table_a_idx
ON table_a
USING zombodb (table_a_idx(table_a));

CREATE TYPE table_b_type AS (
id integer,
val text
);

CREATE FUNCTION table_b_idx(table_b) RETURNS table_b_type IMMUTABLE STRICT LANGUAGE sql AS $$
SELECT ROW (
$1.id,
$1.val
)
$$;

CREATE INDEX table_b_idx
ON table_b
USING zombodb (table_b_idx(table_b));

Insert into table_a (val) values ('hello');
Insert into table_a (val) values ('hello');
Insert into table_a (val) values ('hello');
Insert into table_a (val) values ('hello');
Insert into table_a (val) values ('hello');
Insert into table_a (val) values ('hello');
Insert into table_a (val) values ('hello');
Insert into table_a (val) values ('hello');
Insert into table_a (val) values ('hello');
Insert into table_a (val) values ('hello');
Insert into table_a (val) values ('hello');
Insert into table_a (val) values ('hello');
Insert into table_a (val) values ('hello');
Insert into table_a (val) values ('hello');
Insert into table_a (val) values ('hello');
Insert into table_a (val) values ('hello');
Insert into table_a (val) values ('hello');
Insert into table_a (val) values ('hello');
Insert into table_a (val) values ('hello');
Insert into table_a (val) values ('hello');
Insert into table_a (val) values ('hello');
Insert into table_a (val) values ('hello');
Insert into table_a (val) values ('hello');
Insert into table_a (val) values ('hello');
Insert into table_a (val) values ('hello');
Insert into table_a (val) values ('hello');
Insert into table_a (val) values ('hello');
Insert into table_a (val) values ('hello');
Insert into table_a (val) values ('hello');
Insert into table_a (val) values ('hello');
Insert into table_a (val) values ('123');
Insert into table_a (val) values ('123');
Insert into table_a (val) values ('123');
Insert into table_a (val) values ('123');
Insert into table_a (val) values ('123');
Insert into table_a (val) values ('123');
Insert into table_a (val) values ('123');
Insert into table_a (val) values ('123');
Insert into table_a (val) values ('123');
Insert into table_a (val) values ('123');
Insert into table_a (val) values ('123');
Insert into table_a (val) values ('123');

BEGIN;
savepoint abc;
insert into table_b (val) values ('world');
insert into table_b (val) values ('world');
insert into table_b (val) values ('world');
insert into table_b (val) values ('world');
insert into table_b (val) values ('world');
insert into table_b (val) values ('world');
insert into table_b (val) values ('world');
insert into table_b (val) values ('world');
insert into table_b (val) values ('world');
insert into table_b (val) values ('world');
insert into table_b (val) values ('world');
insert into table_b (val) values ('world');
insert into table_b (val) values ('world');
insert into table_b (val) values ('world');
insert into table_b (val) values ('world');
insert into table_b (val) values ('world');
insert into table_b (val) values ('world');
insert into table_b (val) values ('world');
insert into table_b (val) values ('world');
insert into table_b (val) values ('world');
insert into table_b (val) values ('world');
insert into table_b (val) values ('world');
insert into table_b (val) values ('world');
insert into table_b (val) values ('world');
insert into table_b (val) values ('world');
insert into table_b (val) values ('world');
insert into table_b (val) values ('world');

savepoint asdf;
delete from table_a where val = '123';
UPDATE table_a set id_b = 1 where val ='hello';
UPDATE table_b set id_a = 1 where val = 'world';
release savepoint asdf;
release savepoint abc;
COMMIT;

DROP function table_a_idx cascade;
DROP TYPE table_a_type cascade;
DROP function table_b_idx cascade;
DROP TYPE table_b_type cascade;

DROP TABLE table_a CASCADE;
DROP TABLE table_b CASCADE;

0 comments on commit 177194d

Please sign in to comment.