Skip to content

Commit

Permalink
No commit message
Browse files Browse the repository at this point in the history
  • Loading branch information
redkale committed Feb 15, 2020
1 parent 96408a6 commit 5441537
Showing 1 changed file with 51 additions and 11 deletions.
62 changes: 51 additions & 11 deletions src/org/redkalex/source/mysql/MysqlDataSource.java
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ protected <T> CompletableFuture<Integer> insertDB(EntityInfo<T> info, T... value
String realsql = ba.toString(StandardCharsets.UTF_8);
if (info.isLoggable(logger, Level.FINEST, realsql)) logger.finest(info.getType().getSimpleName() + " insert sql=" + realsql);
}
return writePool.pollAsync().thenCompose((conn) -> executeBatchUpdate(info, conn, UpdateMode.INSERT, values[0], true, sqlBytesArray).thenApply((int[] rs) -> {
return writePool.pollAsync().thenCompose((conn) -> executeBatchUpdate(info, conn, UpdateMode.INSERT, values[0], sqlBytesArray).thenApply((int[] rs) -> {
int count = 0;
for (int i : rs) count += i;
return count;
Expand All @@ -133,15 +133,55 @@ protected <T> CompletableFuture<Integer> clearTableDB(EntityInfo<T> info, final
if (info.isLoggable(logger, Level.FINEST)) {
if (info.isLoggable(logger, Level.FINEST, sql)) logger.finest(info.getType().getSimpleName() + " clearTable sql=" + sql);
}
return writePool.pollAsync().thenCompose((conn) -> executeOneUpdate(info, conn, UpdateMode.CLEAR, sql.getBytes(StandardCharsets.UTF_8)));
return writePool.pollAsync().thenCompose((conn) -> {
CompletableFuture<Integer> future = executeOneUpdate(info, conn, UpdateMode.CLEAR, sql.getBytes(StandardCharsets.UTF_8));
final CompletableFuture<Integer> newFuture = new CompletableFuture<>();
future.whenComplete((o, ex1) -> {
if (ex1 == null) {
newFuture.complete(o);
return;
}
try {
while (ex1 instanceof CompletionException) ex1 = ex1.getCause();
if (info.isTableNotExist((SQLException) ex1)) {
newFuture.complete(-1);
} else {
newFuture.completeExceptionally(ex1);
}
} catch (Throwable e) {
newFuture.completeExceptionally(ex1);
}
});
return newFuture;
});
}

@Override
protected <T> CompletableFuture<Integer> dropTableDB(EntityInfo<T> info, final String table, String sql) {
if (info.isLoggable(logger, Level.FINEST)) {
if (info.isLoggable(logger, Level.FINEST, sql)) logger.finest(info.getType().getSimpleName() + " dropTable sql=" + sql);
}
return writePool.pollAsync().thenCompose((conn) -> executeOneUpdate(info, conn, UpdateMode.DROP, sql.getBytes(StandardCharsets.UTF_8)));
return writePool.pollAsync().thenCompose((conn) -> {
CompletableFuture<Integer> future = executeOneUpdate(info, conn, UpdateMode.DROP, sql.getBytes(StandardCharsets.UTF_8));
final CompletableFuture<Integer> newFuture = new CompletableFuture<>();
future.whenComplete((o, ex1) -> {
if (ex1 == null) {
newFuture.complete(o);
return;
}
try {
while (ex1 instanceof CompletionException) ex1 = ex1.getCause();
if (info.isTableNotExist((SQLException) ex1)) {
newFuture.complete(-1);
} else {
newFuture.completeExceptionally(ex1);
}
} catch (Throwable e) {
newFuture.completeExceptionally(ex1);
}
});
return newFuture;
});
}

@Override
Expand Down Expand Up @@ -178,7 +218,7 @@ protected <T> CompletableFuture<Integer> updateDB(EntityInfo<T> info, final T...
}
ba.clear();
}
return writePool.pollAsync().thenCompose((conn) -> executeBatchUpdate(info, conn, UpdateMode.UPDATE, null, false, sqlBytesArray).thenApply((int[] rs) -> {
return writePool.pollAsync().thenCompose((conn) -> executeBatchUpdate(info, conn, UpdateMode.UPDATE, null, sqlBytesArray).thenApply((int[] rs) -> {
int count = 0;
for (int i : rs) count += i;
return count;
Expand Down Expand Up @@ -390,7 +430,7 @@ protected CompletableFuture<ResultSet> exceptionallyQueryTableNotExist(Completab
}

protected <T> CompletableFuture<Integer> exceptionallyUpdateTableNotExist(CompletableFuture<Integer> future,
EntityInfo<T> info, final AsyncConnection conn, final UpdateMode mode, final byte[] array, final T oneEntity, boolean checkAndCreateTable, final byte[] sqlBytes) {
EntityInfo<T> info, final AsyncConnection conn, final UpdateMode mode, final byte[] array, final T oneEntity, final byte[] sqlBytes) {
final CompletableFuture<Integer> newFuture = new CompletableFuture<>();
future.whenComplete((o, ex1) -> {
if (ex1 == null) {
Expand All @@ -400,7 +440,7 @@ protected <T> CompletableFuture<Integer> exceptionallyUpdateTableNotExist(Comple
try {
while (ex1 instanceof CompletionException) ex1 = ex1.getCause();
if (info.getTableStrategy() != null && ex1 instanceof SQLException && info.isTableNotExist((SQLException) ex1)) {
if (!checkAndCreateTable) { //update、delete、clear或drop
if (mode != UpdateMode.INSERT) { //update、delete、clear或drop
newFuture.complete((mode == UpdateMode.DROP || mode == UpdateMode.CLEAR) ? -1 : 0);
return;
}
Expand Down Expand Up @@ -491,14 +531,14 @@ protected static <T> byte[] formatPrepareParam(EntityInfo<T> info, Attribute<T,
}

protected <T> CompletableFuture<Integer> executeOneUpdate(final EntityInfo<T> info, final AsyncConnection conn, final UpdateMode mode, final byte[] sqlBytes) {
return executeBatchUpdate(info, conn, mode, null, false, sqlBytes).thenApply(a -> a[0]);
return executeBatchUpdate(info, conn, mode, null, sqlBytes).thenApply(a -> a[0]);
}

protected <T> CompletableFuture<int[]> executeBatchUpdate(final EntityInfo<T> info, final AsyncConnection conn, final UpdateMode mode, final T oneEntity, boolean checkAndCreateTable, final byte[]... sqlBytesArray) {
protected <T> CompletableFuture<int[]> executeBatchUpdate(final EntityInfo<T> info, final AsyncConnection conn, final UpdateMode mode, final T oneEntity, final byte[]... sqlBytesArray) {
final byte[] array = conn.getAttribute(MyPoolSource.CONN_ATTR_BYTES_NAME);
if (sqlBytesArray.length == 1) {
return executeAtomicOneUpdate(info, conn, array, SQL_SET_AUTOCOMMIT_1).thenCompose(o
-> checkAndCreateTable ? exceptionallyUpdateTableNotExist(executeAtomicOneUpdate(info, conn, array, sqlBytesArray[0]), info, conn, mode, array, oneEntity, checkAndCreateTable, sqlBytesArray[0])
-> mode == UpdateMode.INSERT ? exceptionallyUpdateTableNotExist(executeAtomicOneUpdate(info, conn, array, sqlBytesArray[0]), info, conn, mode, array, oneEntity, sqlBytesArray[0])
: executeAtomicOneUpdate(info, conn, array, sqlBytesArray[0])).thenApply(a -> new int[]{a}).whenComplete((o, t) -> {
if (t == null) {
writePool.offerConnection(conn);
Expand All @@ -517,7 +557,7 @@ protected <T> CompletableFuture<int[]> executeBatchUpdate(final EntityInfo<T> in
future = future.thenCompose(a -> {
CompletableFuture<Integer> nextFuture = executeAtomicOneUpdate(info, conn, array, sqlBytes);
nextFuture.thenAccept(b -> rs[index] = b);
if (checkAndCreateTable && info != null && info.getTableStrategy() != null) nextFuture = exceptionallyUpdateTableNotExist(nextFuture, info, conn, mode, array, oneEntity, checkAndCreateTable, sqlBytes);
if (mode == UpdateMode.INSERT && info != null && info.getTableStrategy() != null) nextFuture = exceptionallyUpdateTableNotExist(nextFuture, info, conn, mode, array, oneEntity, sqlBytes);
nextFuture.whenComplete((o, t) -> {
if (t != null) executeAtomicOneUpdate(info, conn, array, SQL_ROLLBACK).join();
});
Expand Down Expand Up @@ -775,7 +815,7 @@ public int[] directExecute(String... sqls) {
sqlBytesArray[i] = sqls[i].getBytes(StandardCharsets.UTF_8);

}
return writePool.pollAsync().thenCompose((conn) -> executeBatchUpdate(null, conn, UpdateMode.OTHER, null, false, sqlBytesArray)).join();
return writePool.pollAsync().thenCompose((conn) -> executeBatchUpdate(null, conn, UpdateMode.OTHER, null, sqlBytesArray)).join();
}

@Local
Expand Down

0 comments on commit 5441537

Please sign in to comment.