icewind: improve error handling and logging
update caom2persistence dependency to latest for put performance
pdowler committed Jul 12, 2024
1 parent 02c3683 commit b39aa6e
Showing 6 changed files with 108 additions and 73 deletions.
6 changes: 6 additions & 0 deletions icewind/README.md
@@ -33,6 +33,12 @@ org.opencadc.icewind.collection={collection name}
# Maximum sleep between runs (seconds)
org.opencadc.icewind.maxIdle={integer}
# optional: number of observations to read into memory per batch (default: 100)
org.opencadc.icewind.batchSize={num}
# optional: number of threads used to read observations from repoService (default: 1 + batchSize/10)
org.opencadc.icewind.numThreads={num}
# Destination caom2 database settings
org.opencadc.icewind.caom.schema={CAOM schema name}
org.opencadc.icewind.caom.username={username for CAOM admin}
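For illustration, a fragment of the icewind config file using the two new optional keys could look like this (the numbers are arbitrary example values, not recommendations):

    org.opencadc.icewind.batchSize=500
    org.opencadc.icewind.numThreads=8

If numThreads is left unset, the default described above applies: with batchSize=500 that would be 1 + 500/10 = 51 reader threads.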
2 changes: 1 addition & 1 deletion icewind/VERSION
@@ -1,6 +1,6 @@
## deployable containers have a semantic and build tag
# semantic version tag: major.minor[.patch]
# build version tag: timestamp
VER=0.9.11
VER=0.9.12
TAGS="${VER} ${VER}-$(date --utc +"%Y%m%dT%H%M%S")"
unset VER
2 changes: 1 addition & 1 deletion icewind/build.gradle
@@ -19,7 +19,7 @@ mainClassName = 'org.opencadc.icewind.Main'
dependencies {
implementation 'org.opencadc:cadc-util:[1.6,2.0)'
implementation 'org.opencadc:caom2:[2.4.4,2.5)'
implementation 'org.opencadc:caom2persistence:[2.4.17,2.5)'
implementation 'org.opencadc:caom2persistence:[2.4.18,2.5)'
implementation 'org.opencadc:caom2-repo:[1.4.8,1.5)'

// needed for validation
@@ -322,7 +322,7 @@ private Progress doit() {
+ " modified date " + format(cur.getMaxLastModified()));
if (deleted.after(lastUpdate)) {
log.info("delete: " + de.getClass().getSimpleName() + " " + de.getURI() + " "
+ de.getID());
+ de.getID() + " " + format(de.lastModified));
obsDAO.delete(de.getID());
ret.deleted++;
} else {
16 changes: 15 additions & 1 deletion icewind/src/main/java/org/opencadc/icewind/Main.java
@@ -103,6 +103,8 @@ public class Main {
private static final String REPO_SERVICE_CONFIG_KEY = CONFIG_PREFIX + ".repoService";
private static final String COLLECTION_CONFIG_KEY = CONFIG_PREFIX + ".collection";
private static final String MAX_IDLE_CONFIG_KEY = CONFIG_PREFIX + ".maxIdle";
private static final String BATCH_SIZE_CONFIG_KEY = CONFIG_PREFIX + ".batchSize";
private static final String NUM_THREADS_CONFIG_KEY = CONFIG_PREFIX + ".numThreads";
private static final String DB_URL_CONFIG_KEY = CONFIG_PREFIX + ".caom.url";
private static final String DB_SCHEMA_CONFIG_KEY = CONFIG_PREFIX + ".caom.schema";
private static final String DB_USERNAME_CONFIG_KEY = CONFIG_PREFIX + ".caom.username";
@@ -124,6 +126,7 @@ public class Main {
public static void main(final String[] args) {
Log4jInit.setLevel("ca.nrc.cadc.caom2", Level.INFO);
Log4jInit.setLevel("org.opencadc.caom2", Level.INFO);
Log4jInit.setLevel("org.opencadc.icewind", Level.INFO);

try {
final PropertiesReader propertiesReader = new PropertiesReader(CONFIG_FILE_NAME);
@@ -222,12 +225,23 @@ public static void main(final String[] args) {

final String configuredMaxSleep = props.getFirstPropertyValue(MAX_IDLE_CONFIG_KEY);
final long maxSleep = Long.parseLong(configuredMaxSleep);

String configBatchSize = props.getFirstPropertyValue(BATCH_SIZE_CONFIG_KEY);
int batchSize = DEFAULT_BATCH_SIZE;
if (configBatchSize != null) {
batchSize = Integer.parseInt(configBatchSize);
}
String configNumThreads = props.getFirstPropertyValue(NUM_THREADS_CONFIG_KEY);
int numThreads = 1 + batchSize / 10;
if (configNumThreads != null) {
numThreads = Integer.parseInt(configNumThreads);
}

// full=false: incremental harvest
final boolean full = false;
final boolean noChecksum = false;
CaomHarvester harvester = new CaomHarvester(sourceHarvestResource, destinationHarvestResource,
configuredCollections, basePublisherID, DEFAULT_BATCH_SIZE, DEFAULT_BATCH_SIZE / 10,
configuredCollections, basePublisherID, batchSize, numThreads,
full, retrySkipped, noChecksum, exitWhenComplete, maxSleep);
harvester.retryErrorMessagePattern = errorMessagePattern;

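Condensed into a small self-contained sketch (a hypothetical helper, not code from this commit), the new configuration handling amounts to: use the configured batch size or the built-in default, then derive the thread count from the batch size unless it is set explicitly:

    // hypothetical helper illustrating the defaulting logic added above
    static int[] resolveBatchSettings(String configBatchSize, String configNumThreads, int defaultBatchSize) {
        int batchSize = (configBatchSize != null) ? Integer.parseInt(configBatchSize) : defaultBatchSize;
        int numThreads = (configNumThreads != null)
                ? Integer.parseInt(configNumThreads)
                : 1 + batchSize / 10; // default scales with batch size
        return new int[] { batchSize, numThreads };
    }

For example, resolveBatchSettings("500", null, 100) yields a batch size of 500 and 51 threads; both values are then passed to the CaomHarvester constructor in place of the previous hard-coded DEFAULT_BATCH_SIZE and DEFAULT_BATCH_SIZE / 10.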
153 changes: 84 additions & 69 deletions icewind/src/main/java/org/opencadc/icewind/ObservationHarvester.java
@@ -340,12 +343,15 @@ private Progress doit() {

String skipMsg = null;


if (destObservationDAO.getTransactionManager().isOpen()) {
throw new RuntimeException("BUG: found open transaction at start of next observation");
}
long tmp = System.currentTimeMillis();
log.debug("starting transaction");
destObservationDAO.getTransactionManager().startTransaction();
boolean ok = false;
long txnStartTime = System.currentTimeMillis() - tmp;
log.debug("skipped=" + skipped
+ " o=" + o
+ " ow.entity=" + ow.entity
@@ -428,90 +431,92 @@ private Progress doit() {
harvestSkipDAO.delete(hs);
} else {
// defer to the main catch for error handling
throw new HarvestReadException(ow.entity.error);
throw ow.entity.error;
}
} else if (ow.entity.error != null) {
// o == null when harvesting from service: try to make progress on failures
if (state != null && ow.entity.observationState.maxLastModified != null) {
state.curLastModified = ow.entity.observationState.maxLastModified;
state.curID = null; // unknown
}
if (srcObservationDAO != null || ow.entity.error instanceof ResourceNotFoundException) {
ObservationURI uri = new ObservationURI(hs.getSkipID());
//if (srcObservationDAO != null || ow.entity.error instanceof ResourceNotFoundException) {
if (ow.entity.error instanceof ResourceNotFoundException) {
ObservationURI uri = ow.entity.observationState.getURI();
log.info("delete: " + uri);
destObservationDAO.delete(uri);
if (hs != null) {
log.info("delete: " + hs + " " + format(hs.getLastModified()));
harvestSkipDAO.delete(hs);
}
} else {
throw new HarvestReadException(ow.entity.error);
throw ow.entity.error;
}
}

tmp = System.currentTimeMillis();
log.debug("committing transaction");
destObservationDAO.getTransactionManager().commitTransaction();
log.debug("commit: OK");

long txnCommitTime = System.currentTimeMillis() - tmp;
log.warn("transaction: start=" + txnStartTime + " commit=" + txnCommitTime);
ok = true;
ret.ingested++;
} catch (Throwable oops) {
} catch (IllegalStateException oops) {
if (oops.getMessage().contains("XML failed schema validation")) {
log.error("CONTENT PROBLEM - XML failed schema validation: " + oops.getMessage());
ret.handled++;
} else if (oops.getMessage().contains("failed to read")) {
log.error("CONTENT PROBLEM - " + oops.getMessage(), oops.getCause());
ret.handled++;
} else {
// TODO
}
skipMsg = oops.getMessage(); // message for HarvestSkipURI record
} catch (MismatchedChecksumException oops) {
log.error("CONTENT PROBLEM - mismatching checksums: " + ow.entity.observationState.getURI());
ret.handled++;
skipMsg = oops.getMessage(); // message for HarvestSkipURI record
} catch (IllegalArgumentException oops) {
log.error("CONTENT PROBLEM - invalid observation: " + ow.entity.observationState.getURI() + " - " + oops.getMessage());
if (oops.getCause() != null) {
log.error("cause: " + oops.getCause());
}
ret.handled++;
skipMsg = oops.getMessage(); // message for HarvestSkipURI record
} catch (TransientException oops) {
log.error("NETWORK PROBLEM - " + oops.getMessage());
ret.handled++;
skipMsg = oops.getMessage(); // message for HarvestSkipURI record
} catch (NullPointerException ex) {
log.error("BUG", ex);
ret.abort = true;
skipMsg = "BUG: " + ex.getClass().getName(); // message for HarvestSkipURI record
} catch (BadSqlGrammarException ex) {
log.error("BUG", ex);
BadSqlGrammarException bad = (BadSqlGrammarException) ex;
SQLException sex1 = bad.getSQLException();
if (sex1 != null) {
log.error("CAUSE", sex1);
SQLException sex2 = sex1.getNextException();
log.error("NEXT CAUSE", sex2);
}
ret.abort = true;
skipMsg = ex.getMessage(); // message for HarvestSkipURI record
} catch (DataAccessResourceFailureException ex) {
log.error("FATAL PROBLEM - probably out of space in database", ex);
ret.abort = true;
skipMsg = "FATAL: " + ex.getMessage(); // message for HarvestSkipURI record
} catch (Exception oops) {
// need to inspect the error messages
log.debug("exception during harvest", oops);
skipMsg = null;
String str = oops.toString();
if (oops instanceof HarvestReadException) {
// unwrap HarvestReadException from above
oops = oops.getCause();
// unwrap intervening RuntimeException(s)
while (oops.getCause() != null && oops instanceof RuntimeException) {
oops = oops.getCause();
}
log.error("HARVEST PROBLEM - failed to read observation: " + ow.entity.observationState.getURI() + " - " + oops.getMessage());
ret.handled++;
} else if (oops instanceof IllegalStateException) {
if (oops.getMessage().contains("XML failed schema validation")) {
log.error("CONTENT PROBLEM - XML failed schema validation: " + oops.getMessage());
ret.handled++;
} else if (oops.getMessage().contains("failed to read")) {
log.error("CONTENT PROBLEM - " + oops.getMessage(), oops.getCause());
ret.handled++;
}
} else if (oops instanceof IllegalArgumentException) {
log.error("CONTENT PROBLEM - invalid observation: " + ow.entity.observationState.getURI() + " - " + oops.getMessage());
if (oops.getCause() != null) {
log.error("cause: " + oops.getCause());
}
ret.handled++;
} else if (oops instanceof MismatchedChecksumException) {
log.error("CONTENT PROBLEM - mismatching checksums: " + ow.entity.observationState.getURI());
ret.handled++;
} else if (str.contains("duplicate key value violates unique constraint \"i_observationuri\"")) {

if (str.contains("duplicate key value violates unique constraint \"i_observationuri\"")) {
log.error("CONTENT PROBLEM - duplicate observation: " + ow.entity.observationState.getURI());
ret.handled++;
} else if (oops instanceof TransientException) {
log.error("CONTENT PROBLEM - " + oops.getMessage());
ret.handled++;
} else if (oops instanceof Error) {
log.error("FATAL - probably installation or environment", oops);
ret.abort = true;
} else if (oops instanceof NullPointerException) {
log.error("BUG", oops);
ret.abort = true;
} else if (oops instanceof BadSqlGrammarException) {
log.error("BUG", oops);
BadSqlGrammarException bad = (BadSqlGrammarException) oops;
SQLException sex1 = bad.getSQLException();
if (sex1 != null) {
log.error("CAUSE", sex1);
SQLException sex2 = sex1.getNextException();
log.error("NEXT CAUSE", sex2);
}
ret.abort = true;
} else if (oops instanceof DataAccessResourceFailureException) {
log.error("SEVERE PROBLEM - probably out of space in database", oops);
ret.abort = true;
} else if (str.contains("spherepoly_from_array")) {
log.error("CONTENT PROBLEM - failed to persist: " + ow.entity.observationState.getURI() + " - " + oops.getMessage());
log.error("PGSPHERE PROBLEM - failed to persist: " + ow.entity.observationState.getURI() + " - " + oops.getMessage());
oops = new IllegalArgumentException("invalid polygon (spoly): " + oops.getMessage(), oops);
ret.handled++;
} else if (str.contains("value out of range: underflow")) {
@@ -522,11 +527,18 @@ private Progress doit() {
}
// message for HarvestSkipURI record
skipMsg = oops.getMessage();
} catch (Error err) {
log.error("FATAL - probably installation or environment", err);
ret.abort = true;
} finally {
if (!ok) {
destObservationDAO.getTransactionManager().rollbackTransaction();
log.debug("rollback: OK");
timeTransaction += System.currentTimeMillis() - t;
try {
destObservationDAO.getTransactionManager().rollbackTransaction();
log.debug("rollback: OK");
timeTransaction += System.currentTimeMillis() - t;
} catch (Exception tex) {
log.error("failed to rollback obs transaction", tex);
}

try {
log.debug("starting HarvestSkipURI transaction");
@@ -574,8 +586,12 @@ private Progress doit() {
log.debug("commit HarvestSkipURI: OK");
} catch (Throwable oops) {
log.warn("failed to insert HarvestSkipURI", oops);
destObservationDAO.getTransactionManager().rollbackTransaction();
log.debug("rollback HarvestSkipURI: OK");
try {
destObservationDAO.getTransactionManager().rollbackTransaction();
log.debug("rollback HarvestSkipURI: OK");
} catch (Exception tex) {
log.error("failed to rollback skip transaction", tex);
}
ret.abort = true;
}
ret.failed++;
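
Both the observation rollback and the HarvestSkipURI rollback are now guarded so that a failure during rollback is logged rather than replacing the original error. A minimal sketch of the pattern, assuming a transaction manager with the startTransaction/commitTransaction/rollbackTransaction methods used above:

    boolean ok = false;
    txnManager.startTransaction();
    try {
        // ... harvest and persist one observation ...
        txnManager.commitTransaction();
        ok = true;
    } finally {
        if (!ok) {
            try {
                txnManager.rollbackTransaction();
            } catch (Exception tex) {
                // a rollback failure must not hide the exception that caused it
                log.error("failed to rollback transaction", tex);
            }
        }
    }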
@@ -603,13 +619,6 @@ private Progress doit() {
return ret;
}

private static class HarvestReadException extends Exception {

public HarvestReadException(Exception cause) {
super(cause);
}
}

private void validateChecksum(Observation o) throws MismatchedChecksumException {
if (o.getAccMetaChecksum() == null) {
return; // no check
@@ -727,15 +736,21 @@ private String computeTreeSize(Observation o) {
sb.append("[");
int numA = 0;
int numP = 0;
int numC = 0;
for (Plane p : o.getPlanes()) {
numA += p.getArtifacts().size();
for (Artifact a : p.getArtifacts()) {
numP += a.getParts().size();
for (Part pp : a.getParts()) {
numC += pp.getChunks().size();
}
}
}
sb.append("1,"); // obs
sb.append(o.getPlanes().size()).append(",");
sb.append(numA).append(",");
sb.append(numP).append("]");
sb.append(numP).append(",");
sb.append(numC).append("]");
return sb.toString();
}
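
With chunks now counted, computeTreeSize reports five numbers: observation, planes, artifacts, parts, chunks. For example, an observation with 2 planes, 3 artifacts, 4 parts and 10 chunks would be summarized as:

    [1,2,3,4,10]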

