From b39aa6eeb6aa372e031c73146855025586c4c17e Mon Sep 17 00:00:00 2001 From: Patrick Dowler Date: Fri, 12 Jul 2024 08:47:04 -0700 Subject: [PATCH] icewind: improve error handling and logging update caom2persistence dependency to latest for put performance --- icewind/README.md | 6 + icewind/VERSION | 2 +- icewind/build.gradle | 2 +- .../opencadc/icewind/DeletionHarvester.java | 2 +- .../main/java/org/opencadc/icewind/Main.java | 16 +- .../icewind/ObservationHarvester.java | 153 ++++++++++-------- 6 files changed, 108 insertions(+), 73 deletions(-) diff --git a/icewind/README.md b/icewind/README.md index 0fdee119..4d73cc7a 100644 --- a/icewind/README.md +++ b/icewind/README.md @@ -33,6 +33,12 @@ org.opencadc.icewind.collection={collection name} # Maximum sleep between runs (seconds) org.opencadc.icewind.maxIdle={integer} +# optional: number of observations to read into memory per batch (default: 100) +org.opencadc.icewind.batchSize={num} + +# optional: number of threads used to read observations from repoService (default: 1 + batchSize/10) +org.opencadc.icewind.numThreads={num} + # Destination caom2 database settings org.opencadc.icewind.caom.schema={CAOM schema name} org.opencadc.icewind.caom.username={username for CAOM admin} diff --git a/icewind/VERSION b/icewind/VERSION index 5677aca3..9a449308 100644 --- a/icewind/VERSION +++ b/icewind/VERSION @@ -1,6 +1,6 @@ ## deployable containers have a semantic and build tag # semantic version tag: major.minor[.patch] # build version tag: timestamp -VER=0.9.11 +VER=0.9.12 TAGS="${VER} ${VER}-$(date --utc +"%Y%m%dT%H%M%S")" unset VER diff --git a/icewind/build.gradle b/icewind/build.gradle index e54f9495..12a53177 100644 --- a/icewind/build.gradle +++ b/icewind/build.gradle @@ -19,7 +19,7 @@ mainClassName = 'org.opencadc.icewind.Main' dependencies { implementation 'org.opencadc:cadc-util:[1.6,2.0)' implementation 'org.opencadc:caom2:[2.4.4,2.5)' - implementation 'org.opencadc:caom2persistence:[2.4.17,2.5)' + implementation 
'org.opencadc:caom2persistence:[2.4.18,2.5)' implementation 'org.opencadc:caom2-repo:[1.4.8,1.5)' // needed for validation diff --git a/icewind/src/main/java/org/opencadc/icewind/DeletionHarvester.java b/icewind/src/main/java/org/opencadc/icewind/DeletionHarvester.java index c1c587c7..a8c185f4 100644 --- a/icewind/src/main/java/org/opencadc/icewind/DeletionHarvester.java +++ b/icewind/src/main/java/org/opencadc/icewind/DeletionHarvester.java @@ -322,7 +322,7 @@ private Progress doit() { + " modified date " + format(cur.getMaxLastModified())); if (deleted.after(lastUpdate)) { log.info("delete: " + de.getClass().getSimpleName() + " " + de.getURI() + " " - + de.getID()); + + de.getID() + " " + format(de.lastModified)); obsDAO.delete(de.getID()); ret.deleted++; } else { diff --git a/icewind/src/main/java/org/opencadc/icewind/Main.java b/icewind/src/main/java/org/opencadc/icewind/Main.java index 696f26b9..381ffdf1 100644 --- a/icewind/src/main/java/org/opencadc/icewind/Main.java +++ b/icewind/src/main/java/org/opencadc/icewind/Main.java @@ -103,6 +103,8 @@ public class Main { private static final String REPO_SERVICE_CONFIG_KEY = CONFIG_PREFIX + ".repoService"; private static final String COLLECTION_CONFIG_KEY = CONFIG_PREFIX + ".collection"; private static final String MAX_IDLE_CONFIG_KEY = CONFIG_PREFIX + ".maxIdle"; + private static final String BATCH_SIZE_CONFIG_KEY = CONFIG_PREFIX + ".batchSize"; + private static final String NUM_THREADS_CONFIG_KEY = CONFIG_PREFIX + ".numThreads"; private static final String DB_URL_CONFIG_KEY = CONFIG_PREFIX + ".caom.url"; private static final String DB_SCHEMA_CONFIG_KEY = CONFIG_PREFIX + ".caom.schema"; private static final String DB_USERNAME_CONFIG_KEY = CONFIG_PREFIX + ".caom.username"; @@ -124,6 +126,7 @@ public class Main { public static void main(final String[] args) { Log4jInit.setLevel("ca.nrc.cadc.caom2", Level.INFO); Log4jInit.setLevel("org.opencadc.caom2", Level.INFO); + Log4jInit.setLevel("org.opencadc.icewind", 
Level.INFO); try { final PropertiesReader propertiesReader = new PropertiesReader(CONFIG_FILE_NAME); @@ -222,12 +225,23 @@ public static void main(final String[] args) { final String configuredMaxSleep = props.getFirstPropertyValue(MAX_IDLE_CONFIG_KEY); final long maxSleep = Long.parseLong(configuredMaxSleep); + + String configBatchSize = props.getFirstPropertyValue(BATCH_SIZE_CONFIG_KEY); + int batchSize = DEFAULT_BATCH_SIZE; + if (configBatchSize != null) { + batchSize = Integer.parseInt(configBatchSize); + } + String configNumThreads = props.getFirstPropertyValue(NUM_THREADS_CONFIG_KEY); + int numThreads = 1 + batchSize / 10; + if (configNumThreads != null) { + numThreads = Integer.parseInt(configNumThreads); + } // full=false: incremental harvest final boolean full = false; final boolean noChecksum = false; CaomHarvester harvester = new CaomHarvester(sourceHarvestResource, destinationHarvestResource, - configuredCollections, basePublisherID, DEFAULT_BATCH_SIZE, DEFAULT_BATCH_SIZE / 10, + configuredCollections, basePublisherID, batchSize, numThreads, full, retrySkipped, noChecksum, exitWhenComplete, maxSleep); harvester.retryErrorMessagePattern = errorMessagePattern; diff --git a/icewind/src/main/java/org/opencadc/icewind/ObservationHarvester.java b/icewind/src/main/java/org/opencadc/icewind/ObservationHarvester.java index 93e8d704..6e5bc259 100644 --- a/icewind/src/main/java/org/opencadc/icewind/ObservationHarvester.java +++ b/icewind/src/main/java/org/opencadc/icewind/ObservationHarvester.java @@ -340,12 +340,15 @@ private Progress doit() { String skipMsg = null; + if (destObservationDAO.getTransactionManager().isOpen()) { throw new RuntimeException("BUG: found open transaction at start of next observation"); } + long tmp = System.currentTimeMillis(); log.debug("starting transaction"); destObservationDAO.getTransactionManager().startTransaction(); boolean ok = false; + long txnStartTime = System.currentTimeMillis() - tmp; log.debug("skipped=" + skipped + " o=" 
+ o + " ow.entity=" + ow.entity @@ -428,7 +431,7 @@ private Progress doit() { harvestSkipDAO.delete(hs); } else { // defer to the main catch for error handling - throw new HarvestReadException(ow.entity.error); + throw ow.entity.error; } } else if (ow.entity.error != null) { // o == null when harvesting from service: try to make progress on failures @@ -436,8 +439,9 @@ private Progress doit() { state.curLastModified = ow.entity.observationState.maxLastModified; state.curID = null; // unknown } - if (srcObservationDAO != null || ow.entity.error instanceof ResourceNotFoundException) { - ObservationURI uri = new ObservationURI(hs.getSkipID()); + //if (srcObservationDAO != null || ow.entity.error instanceof ResourceNotFoundException) { + if (ow.entity.error instanceof ResourceNotFoundException) { + ObservationURI uri = ow.entity.observationState.getURI(); log.info("delete: " + uri); destObservationDAO.delete(uri); if (hs != null) { @@ -445,73 +449,74 @@ private Progress doit() { harvestSkipDAO.delete(hs); } } else { - throw new HarvestReadException(ow.entity.error); + throw ow.entity.error; } } + tmp = System.currentTimeMillis(); log.debug("committing transaction"); destObservationDAO.getTransactionManager().commitTransaction(); log.debug("commit: OK"); - + long txnCommitTime = System.currentTimeMillis() - tmp; + log.warn("transaction: start=" + txnStartTime + " commit=" + txnCommitTime); ok = true; ret.ingested++; - } catch (Throwable oops) { + } catch (IllegalStateException oops) { + if (oops.getMessage().contains("XML failed schema validation")) { + log.error("CONTENT PROBLEM - XML failed schema validation: " + oops.getMessage()); + ret.handled++; + } else if (oops.getMessage().contains("failed to read")) { + log.error("CONTENT PROBLEM - " + oops.getMessage(), oops.getCause()); + ret.handled++; + } else { + // TODO + } + skipMsg = oops.getMessage(); // message for HarvestSkipURI record + } catch (MismatchedChecksumException oops) { + log.error("CONTENT PROBLEM - 
mismatching checksums: " + ow.entity.observationState.getURI()); + ret.handled++; + skipMsg = oops.getMessage(); // message for HarvestSkipURI record + } catch (IllegalArgumentException oops) { + log.error("CONTENT PROBLEM - invalid observation: " + ow.entity.observationState.getURI() + " - " + oops.getMessage()); + if (oops.getCause() != null) { + log.error("cause: " + oops.getCause()); + } + ret.handled++; + skipMsg = oops.getMessage(); // message for HarvestSkipURI record + } catch (TransientException oops) { + log.error("NETWORK PROBLEM - " + oops.getMessage()); + ret.handled++; + skipMsg = oops.getMessage(); // message for HarvestSkipURI record + } catch (NullPointerException ex) { + log.error("BUG", ex); + ret.abort = true; + skipMsg = "BUG: " + ex.getClass().getName(); // message for HarvestSkipURI record + } catch (BadSqlGrammarException ex) { + log.error("BUG", ex); + BadSqlGrammarException bad = (BadSqlGrammarException) ex; + SQLException sex1 = bad.getSQLException(); + if (sex1 != null) { + log.error("CAUSE", sex1); + SQLException sex2 = sex1.getNextException(); + log.error("NEXT CAUSE", sex2); + } + ret.abort = true; + skipMsg = ex.getMessage(); // message for HarvestSkipURI record + } catch (DataAccessResourceFailureException ex) { + log.error("FATAL PROBLEM - probably out of space in database", ex); + ret.abort = true; + skipMsg = "FATAL: " + ex.getMessage(); // message for HarvestSkipURI record + } catch (Exception oops) { + // need to inspect the error messages log.debug("exception during harvest", oops); skipMsg = null; String str = oops.toString(); - if (oops instanceof HarvestReadException) { - // unwrap HarvestReadException from above - oops = oops.getCause(); - // unwrap intervening RuntimeException(s) - while (oops.getCause() != null && oops instanceof RuntimeException) { - oops = oops.getCause(); - } - log.error("HARVEST PROBLEM - failed to read observation: " + ow.entity.observationState.getURI() + " - " + oops.getMessage()); - 
ret.handled++; - } else if (oops instanceof IllegalStateException) { - if (oops.getMessage().contains("XML failed schema validation")) { - log.error("CONTENT PROBLEM - XML failed schema validation: " + oops.getMessage()); - ret.handled++; - } else if (oops.getMessage().contains("failed to read")) { - log.error("CONTENT PROBLEM - " + oops.getMessage(), oops.getCause()); - ret.handled++; - } - } else if (oops instanceof IllegalArgumentException) { - log.error("CONTENT PROBLEM - invalid observation: " + ow.entity.observationState.getURI() + " - " + oops.getMessage()); - if (oops.getCause() != null) { - log.error("cause: " + oops.getCause()); - } - ret.handled++; - } else if (oops instanceof MismatchedChecksumException) { - log.error("CONTENT PROBLEM - mismatching checksums: " + ow.entity.observationState.getURI()); - ret.handled++; - } else if (str.contains("duplicate key value violates unique constraint \"i_observationuri\"")) { + + if (str.contains("duplicate key value violates unique constraint \"i_observationuri\"")) { log.error("CONTENT PROBLEM - duplicate observation: " + ow.entity.observationState.getURI()); ret.handled++; - } else if (oops instanceof TransientException) { - log.error("CONTENT PROBLEM - " + oops.getMessage()); - ret.handled++; - } else if (oops instanceof Error) { - log.error("FATAL - probably installation or environment", oops); - ret.abort = true; - } else if (oops instanceof NullPointerException) { - log.error("BUG", oops); - ret.abort = true; - } else if (oops instanceof BadSqlGrammarException) { - log.error("BUG", oops); - BadSqlGrammarException bad = (BadSqlGrammarException) oops; - SQLException sex1 = bad.getSQLException(); - if (sex1 != null) { - log.error("CAUSE", sex1); - SQLException sex2 = sex1.getNextException(); - log.error("NEXT CAUSE", sex2); - } - ret.abort = true; - } else if (oops instanceof DataAccessResourceFailureException) { - log.error("SEVERE PROBLEM - probably out of space in database", oops); - ret.abort = true; } 
else if (str.contains("spherepoly_from_array")) { - log.error("CONTENT PROBLEM - failed to persist: " + ow.entity.observationState.getURI() + " - " + oops.getMessage()); + log.error("PGSPHERE PROBLEM - failed to persist: " + ow.entity.observationState.getURI() + " - " + oops.getMessage()); oops = new IllegalArgumentException("invalid polygon (spoly): " + oops.getMessage(), oops); ret.handled++; } else if (str.contains("value out of range: underflow")) { @@ -522,11 +527,18 @@ private Progress doit() { } // message for HarvestSkipURI record skipMsg = oops.getMessage(); + } catch (Error err) { + log.error("FATAL - probably installation or environment", err); + ret.abort = true; } finally { if (!ok) { - destObservationDAO.getTransactionManager().rollbackTransaction(); - log.debug("rollback: OK"); - timeTransaction += System.currentTimeMillis() - t; + try { + destObservationDAO.getTransactionManager().rollbackTransaction(); + log.debug("rollback: OK"); + timeTransaction += System.currentTimeMillis() - t; + } catch (Exception tex) { + log.error("failed to rollback obs transaction", tex); + } try { log.debug("starting HarvestSkipURI transaction"); @@ -574,8 +586,12 @@ private Progress doit() { log.debug("commit HarvestSkipURI: OK"); } catch (Throwable oops) { log.warn("failed to insert HarvestSkipURI", oops); - destObservationDAO.getTransactionManager().rollbackTransaction(); - log.debug("rollback HarvestSkipURI: OK"); + try { + destObservationDAO.getTransactionManager().rollbackTransaction(); + log.debug("rollback HarvestSkipURI: OK"); + } catch (Exception tex) { + log.error("failed to rollback skip transaction", tex); + } ret.abort = true; } ret.failed++; @@ -603,13 +619,6 @@ private Progress doit() { return ret; } - private static class HarvestReadException extends Exception { - - public HarvestReadException(Exception cause) { - super(cause); - } - } - private void validateChecksum(Observation o) throws MismatchedChecksumException { if (o.getAccMetaChecksum() == null) 
{ return; // no check @@ -727,15 +736,21 @@ private String computeTreeSize(Observation o) { sb.append("["); int numA = 0; int numP = 0; + int numC = 0; for (Plane p : o.getPlanes()) { numA += p.getArtifacts().size(); for (Artifact a : p.getArtifacts()) { numP += a.getParts().size(); + for (Part pp : a.getParts()) { + numC += pp.getChunks().size(); + } } } + sb.append("1,"); // obs sb.append(o.getPlanes().size()).append(","); sb.append(numA).append(","); - sb.append(numP).append("]"); + sb.append(numP).append(","); + sb.append(numC).append("]"); return sb.toString(); }