Skip to content

Commit

Permalink
Fix Notifier.flushQueues() IllegalStateException, re #81
Browse files Browse the repository at this point in the history
  • Loading branch information
safris committed Jun 17, 2023
1 parent 1373543 commit 028d1ae
Show file tree
Hide file tree
Showing 5 changed files with 72 additions and 49 deletions.
53 changes: 27 additions & 26 deletions jsql/src/main/java/org/jaxdb/jsql/Batch.java
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,32 @@ public void clear() {
}

private static int aggregate(final Compiler compiler, final OnNotifyCallbackList onNotifyCallbackList, final int[] counts, final Statement statement, final Command.Insert<?>[] generatedKeys, final int index, int total) throws SQLException {
if (total != Statement.EXECUTE_FAILED) {
boolean hasInfo = total != Statement.SUCCESS_NO_INFO;
if (!hasInfo)
total = 0;

int aggregate = 0;
for (int i = 0, i$ = counts.length; i < i$; ++i) { // [A]
final int count = counts[i];
if (count == Statement.EXECUTE_FAILED)
return Statement.EXECUTE_FAILED;

if (count != Statement.SUCCESS_NO_INFO) {
hasInfo = true;
aggregate += count;
}
else {
counts[i] = 0;
}
}

if (onNotifyCallbackList != null)
onNotifyCallbackList.setCount(aggregate);

total = hasInfo ? total + aggregate : Statement.SUCCESS_NO_INFO;
}

ResultSet resultSet = null;
for (int i = index, i$ = index + counts.length; i < i$; ++i) { // [A]
if (generatedKeys[i] != null) {
Expand All @@ -126,32 +152,7 @@ private static int aggregate(final Compiler compiler, final OnNotifyCallbackList
}
}

if (total == Statement.EXECUTE_FAILED)
return Statement.EXECUTE_FAILED;

boolean hasInfo = total != Statement.SUCCESS_NO_INFO;
if (!hasInfo)
total = 0;

int aggregate = 0;
for (int i = 0, i$ = counts.length; i < i$; ++i) { // [A]
final int count = counts[i];
if (count == Statement.EXECUTE_FAILED)
return Statement.EXECUTE_FAILED;

if (count != Statement.SUCCESS_NO_INFO) {
hasInfo = true;
aggregate += count;
}
else {
counts[i] = 0;
}
}

if (onNotifyCallbackList != null)
onNotifyCallbackList.count.set(aggregate);

return hasInfo ? total + aggregate : Statement.SUCCESS_NO_INFO;
return total;
}

private void onExecute(final int start, final int end, final int[] counts) {
Expand Down
29 changes: 24 additions & 5 deletions jsql/src/main/java/org/jaxdb/jsql/Callbacks.java
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ public boolean testThrows(final Exception e, final int index, final int count) t

static class OnNotifyCallbackList extends DelegateCollection<OnNotifyCallback> implements BiConsumer<Schema,Exception> {
final String sessionId;
final AtomicInteger count = new AtomicInteger();
private final AtomicInteger count = new AtomicInteger(-1);
private final AtomicInteger indexIn = new AtomicInteger();
private final AtomicInteger indexOut = new AtomicInteger();
private final AtomicReference<OnNotifyCallback> root = new AtomicReference<>();
Expand All @@ -103,6 +103,13 @@ static class OnNotifyCallbackList extends DelegateCollection<OnNotifyCallback> i
this.sessionId = sessionId;
}

void setCount(final int count) {
synchronized (this.count) {
this.count.set(count);
this.count.notify();
}
}

boolean await(final long timeout) throws InterruptedException {
if (timeout <= 0)
return false;
Expand Down Expand Up @@ -138,12 +145,24 @@ boolean await(final long timeout) throws InterruptedException {
@Override
public void accept(final Schema schema, final Exception e) {
final int index = this.indexIn.incrementAndGet();
final int count = this.count.get();
if (index > count) {
// FIXME: This has happened!!!
throw new IllegalStateException("index (" + index + ") > count (" + count + ") for sessionId = " + sessionId);
int count = this.count.get();
if (count == -1) {
synchronized (this.count) {
count = this.count.get();
if (count == -1) {
try {
this.count.wait();
}
catch (final InterruptedException ie) {
throw new IllegalStateException(ie);
}
}
}
}

if (index > count)
throw new IllegalStateException("index (" + index + ") > count (" + count + ") for sessionId = " + sessionId, e);

try {
OnNotifyCallback prev = null;
// if (logger.isTraceEnabled()) logger.trace(getClass().getSimpleName() + "[" + sessionId + "].test(" + ObjectUtil.simpleIdentityString(e) + "): " + index + " " + count + "... " + ObjectUtil.simpleIdentityString(root.get()) + " " + ObjectUtil.simpleIdentityString(head.get()));
Expand Down
18 changes: 9 additions & 9 deletions jsql/src/main/java/org/jaxdb/jsql/Command.java
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,8 @@ static final class Insert<D extends data.Table> extends Command.Modification<key
private data.Column<?>[] primaries;
final data.Column<?>[] autos;
private keyword.Select.untyped.SELECT<?> select;
private data.Column<?>[] onConflict;
private boolean isOnConflict;
private data.Column<?>[] onConflictColumns;
private boolean doUpdate;

Insert(final data.Table entity) {
Expand Down Expand Up @@ -180,25 +181,24 @@ public INSERT<D> VALUES(final keyword.Select.untyped.SELECT<?> select) {

@Override
public keyword.Insert.ON_CONFLICT ON_CONFLICT() {
isOnConflict = true;
if (entity != null)
this.onConflict = entity._primary$;
onConflictColumns = entity._primary$;
else if (primaries != null)
this.onConflict = primaries;
else
throw new IllegalArgumentException("ON CONFLICT requires primary columns in the INSERT clause");
onConflictColumns = primaries;

return this;
}

@Override
public keyword.Insert.CONFLICT_ACTION DO_UPDATE() {
this.doUpdate = true;
doUpdate = true;
return this;
}

@Override
public keyword.Insert.CONFLICT_ACTION DO_NOTHING() {
this.doUpdate = false;
doUpdate = false;
return this;
}

Expand All @@ -222,8 +222,8 @@ final data.Column<?> getColumn() {
void compile(final Compilation compilation, final boolean isExpression) throws IOException, SQLException {
final data.Column<?>[] columns = this.columns != null ? this.columns : entity._column$;
final Compiler compiler = compilation.compiler;
if (onConflict != null)
compiler.compileInsertOnConflict(columns, select, onConflict, doUpdate, compilation);
if (isOnConflict)
compiler.compileInsertOnConflict(columns, select, onConflictColumns, doUpdate, compilation);
else if (select != null)
compiler.compileInsertSelect(columns, select, false, compilation);
else
Expand Down
17 changes: 10 additions & 7 deletions jsql/src/main/java/org/jaxdb/jsql/PostgreSQLCompiler.java
Original file line number Diff line number Diff line change
Expand Up @@ -421,15 +421,18 @@ void compileInsertOnConflict(final data.Column<?>[] columns, final Select.untype
compileInsert(columns, false, compilation);

final StringBuilder sql = compilation.sql;
sql.append(" ON CONFLICT (");
for (int i = 0, i$ = onConflict.length; i < i$; ++i) { // [A]
if (i > 0)
sql.append(", ");

onConflict[i].compile(compilation, false);
sql.append(" ON CONFLICT ");
if (onConflict != null) {
sql.append('(');
for (int i = 0, i$ = onConflict.length; i < i$; ++i) { // [A]
if (i > 0)
sql.append(", ");

onConflict[i].compile(compilation, false);
}
sql.append(')');
}

sql.append(')');
if (doUpdate) {
sql.append(" DO UPDATE SET ");

Expand Down
4 changes: 2 additions & 2 deletions jsql/src/main/java/org/jaxdb/jsql/statement.java
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ else if (connection == null) {
try {
count = preparedStatement.executeUpdate();
if (onNotifyCallbackList != null)
onNotifyCallbackList.count.set(count);
onNotifyCallbackList.setCount(count);

if (sessionId != null)
compiler.setSessionId(sessionStatement, null);
Expand Down Expand Up @@ -177,7 +177,7 @@ else if (connection == null) {
if (autos == null) {
count = statement.executeUpdate(compilation.toString());
if (onNotifyCallbackList != null)
onNotifyCallbackList.count.set(count);
onNotifyCallbackList.setCount(count);

if (sessionId != null)
compiler.setSessionId(statement, null);
Expand Down

0 comments on commit 028d1ae

Please sign in to comment.