From 28399f9cdb2b5652fd2196f322c53109182f2420 Mon Sep 17 00:00:00 2001 From: Patrick Dowler Date: Fri, 12 Jul 2024 08:42:56 -0700 Subject: [PATCH 1/4] caom2persistence: re-implement select observation skeleton use recursive queries instead of join due to bad query plan in PG15 --- caom2persistence/build.gradle | 2 +- .../caom2/persistence/ObservationDAO.java | 145 +++++++++++++++--- .../ObservationSkeletonExtractor.java | 59 +++++++ .../cadc/caom2/persistence/SQLGenerator.java | 33 ++-- 4 files changed, 205 insertions(+), 34 deletions(-) diff --git a/caom2persistence/build.gradle b/caom2persistence/build.gradle index b485279e..df2b21f7 100644 --- a/caom2persistence/build.gradle +++ b/caom2persistence/build.gradle @@ -17,7 +17,7 @@ sourceCompatibility = 1.8 group = 'org.opencadc' -version = '2.4.17' +version = '2.4.18' description = 'OpenCADC CAOM database library' def git_url = 'https://github.com/opencadc/caom2db' diff --git a/caom2persistence/src/main/java/ca/nrc/cadc/caom2/persistence/ObservationDAO.java b/caom2persistence/src/main/java/ca/nrc/cadc/caom2/persistence/ObservationDAO.java index 342cd5b2..d157d667 100644 --- a/caom2persistence/src/main/java/ca/nrc/cadc/caom2/persistence/ObservationDAO.java +++ b/caom2persistence/src/main/java/ca/nrc/cadc/caom2/persistence/ObservationDAO.java @@ -3,7 +3,7 @@ ******************* CANADIAN ASTRONOMY DATA CENTRE ******************* ************** CENTRE CANADIEN DE DONNÉES ASTRONOMIQUES ************** * - * (c) 2023. (c) 2023. + * (c) 2024. (c) 2024. 
* Government of Canada Gouvernement du Canada * National Research Council Conseil national de recherches * Ottawa, Canada, K1A 0R6 Ottawa, Canada, K1A 0R6 @@ -89,7 +89,6 @@ import ca.nrc.cadc.caom2.util.CaomValidator; import ca.nrc.cadc.date.DateUtil; import ca.nrc.cadc.net.PreconditionFailedException; -import ca.nrc.cadc.net.TransientException; import java.net.URI; import java.text.DateFormat; import java.util.ArrayList; @@ -454,25 +453,37 @@ public void put(Observation obs, URI expectedMetaChecksum) throws PreconditionFa // NOTE: this is by ID which means to update the caller must get(uri) then put(o) // and if they do not get(uri) they can get a duplicate observation error // if they violate unique keys - String sql = gen.getSelectSQL(obs.getID(), SQLGenerator.MAX_DEPTH, true); - log.debug("PUT: " + sql); + //String skelSQL = gen.getSelectSQL(obs.getID(), 1, true); + //log.warn("PUT: " + skelSQL); + + long tt = System.currentTimeMillis(); + final ObservationSkeleton dirtyRead = getSkelImpl(obs.getID(), jdbc, false); // obs only + final long dirtyReadTime = System.currentTimeMillis() - tt; - final ObservationSkeleton dirtyRead = (ObservationSkeleton) jdbc.query(sql, new ObservationSkeletonExtractor()); - log.debug("starting transaction"); + tt = System.currentTimeMillis(); getTransactionManager().startTransaction(); txnOpen = true; - + final long txnStartTime = System.currentTimeMillis() - tt; + // obtain row lock on observation update ObservationSkeleton cur = null; + long lockTime = 0L; + long selectSkelTime = 0L; if (dirtyRead != null) { + String lock = gen.getUpdateLockSQL(obs.getID()); log.debug("LOCK SQL: " + lock); + tt = System.currentTimeMillis(); jdbc.update(lock); + lockTime = System.currentTimeMillis() - tt; // req-acquire current state after obtaining lock - cur = (ObservationSkeleton) jdbc.query(sql, new ObservationSkeletonExtractor()); - + tt = System.currentTimeMillis(); + //skelSQL = gen.getSelectSQL(obs.getID(), SQLGenerator.MAX_DEPTH, true); + 
cur = getSkelImpl(obs.getID(), jdbc, true); + selectSkelTime = System.currentTimeMillis() - tt; + // check conditional update if (expectedMetaChecksum != null) { if (cur == null) { @@ -490,6 +501,7 @@ public void put(Observation obs, URI expectedMetaChecksum) throws PreconditionFa updateEntity(obs, cur, now); // delete obsolete children + tt = System.currentTimeMillis(); List> pairs = new ArrayList>(); if (cur != null) { // delete the skeletons that are not in obs.getPlanes() @@ -510,49 +522,129 @@ public void put(Observation obs, URI expectedMetaChecksum) throws PreconditionFa pairs.add(new Pair(null, p)); } } - + final long deletePlanesTime = System.currentTimeMillis() - tt; + + tt = System.currentTimeMillis(); super.put(cur, obs, null, jdbc); - + final long putObservationTime = System.currentTimeMillis() - tt; + // insert/update children + tt = System.currentTimeMillis(); LinkedList parents = new LinkedList(); parents.push(obs); for (Pair p : pairs) { planeDAO.put(p.cur, p.val, parents, jdbc); } - + final long putPlanesTime = System.currentTimeMillis() - tt; + log.debug("committing transaction"); + tt = System.currentTimeMillis(); getTransactionManager().commitTransaction(); log.debug("commit: OK"); + final long txnCommitTime = System.currentTimeMillis() - tt; txnOpen = false; + + log.warn("transaction start=" + txnStartTime + + " dirtyRead=" + dirtyReadTime + + " lock=" + lockTime + + " select=" + selectSkelTime + + " del-planes=" + deletePlanesTime + + " put-obs=" + putObservationTime + + " put-planes=" + putPlanesTime + + " commit=" + txnCommitTime); } catch (DataIntegrityViolationException e) { log.debug("failed to insert " + obs + ": ", e); - getTransactionManager().rollbackTransaction(); - log.debug("rollback: OK"); + try { + getTransactionManager().rollbackTransaction(); + log.debug("rollback: OK"); + } catch (Exception tex) { + log.error("failed to rollback", tex); + } txnOpen = false; throw e; } catch (PreconditionFailedException ex) { log.debug("failed 
to update " + obs + ": ", ex); - getTransactionManager().rollbackTransaction(); - log.debug("rollback: OK"); + try { + getTransactionManager().rollbackTransaction(); + log.debug("rollback: OK"); + } catch (Exception tex) { + log.error("failed to rollback", tex); + } txnOpen = false; throw ex; } catch (Exception e) { log.error("failed to insert " + obs + ": ", e); - getTransactionManager().rollbackTransaction(); - log.debug("rollback: OK"); + try { + getTransactionManager().rollbackTransaction(); + log.debug("rollback: OK"); + } catch (Exception tex) { + log.error("failed to rollback", tex); + } txnOpen = false; throw e; } finally { if (txnOpen) { log.error("BUG - open transaction in finally"); - getTransactionManager().rollbackTransaction(); - log.error("rollback: OK"); + try { + getTransactionManager().rollbackTransaction(); + log.debug("rollback: OK"); + } catch (Exception tex) { + log.error("BADNESS: failed to rollback in finally", tex); + } } long dt = System.currentTimeMillis() - t; log.debug("PUT: " + obs.getURI() + " " + dt + "ms"); } } + private ObservationSkeleton getSkelImpl(UUID id, JdbcTemplate jdbc, boolean complete) { + return getSkelNav(id, jdbc, complete); + } + + private ObservationSkeleton getSkelJoin(UUID id, JdbcTemplate jdbc) { + String skelSQL = gen.getSelectSQL(id, SQLGenerator.MAX_DEPTH, true); + log.debug("getSkel: " + skelSQL); + ObservationSkeleton ret = (ObservationSkeleton) jdbc.query(skelSQL, new ObservationSkeletonExtractor()); + return ret; + } + + private ObservationSkeleton getSkelNav(UUID id, JdbcTemplate jdbc, boolean complete) { + ObservationSkeletonExtractor ose = new ObservationSkeletonExtractor(); + String skelSQL = gen.getSelectSQL(ObservationSkeleton.class, id, true); // by PK + log.debug("getSkel: " + skelSQL); + List skels = jdbc.query(skelSQL, ose.observationMapper); + if (skels == null || skels.isEmpty()) { + return null; + } + ObservationSkeleton ret = skels.get(0); + if (!complete) { + return ret; + } + String 
planeSkelSQL = gen.getSelectSQL(PlaneSkeleton.class, id, false); // by FK + log.debug("getSkel: " + planeSkelSQL); + List planes = jdbc.query(planeSkelSQL, ose.planeMapper); + for (PlaneSkeleton ps : planes) { + ret.planes.add(ps); + String artifactSkelSQL = gen.getSelectSQL(ArtifactSkeleton.class, ps.id, false); // by FK + log.debug("getSkel: " + artifactSkelSQL); + List artifacts = jdbc.query(artifactSkelSQL, ose.artifactMapper); + for (ArtifactSkeleton as : artifacts) { + ps.artifacts.add(as); + String partSkelSQL = gen.getSelectSQL(PartSkeleton.class, as.id, false); // by FK + log.debug("getSkel: " + partSkelSQL); + List parts = jdbc.query(partSkelSQL, ose.partMapper); + for (PartSkeleton pas : parts) { + as.parts.add(pas); + String chunkSkelSQL = gen.getSelectSQL(ChunkSkeleton.class, pas.id, false); // by FK + log.debug("getSkel: " + chunkSkelSQL); + List chunks = jdbc.query(chunkSkelSQL, ose.chunkMapper); + pas.chunks.addAll(chunks); + } + } + } + return ret; + } + /** * Delete a stored observation by URI. 
* @@ -592,7 +684,12 @@ private void deleteImpl(UUID id, ObservationURI uri) { sql = gen.getSelectSQL(uri, SQLGenerator.MAX_DEPTH, true); } log.debug("DELETE: " + sql); - final ObservationSkeleton dirtyRead = (ObservationSkeleton) jdbc.query(sql, new ObservationSkeletonExtractor()); + ObservationSkeleton dirtyRead; + if (id != null) { + dirtyRead = getSkelImpl(id, jdbc, false); // obs only + } else { + dirtyRead = (ObservationSkeleton) jdbc.query(sql, new ObservationSkeletonExtractor()); + } log.debug("starting transaction"); getTransactionManager().startTransaction(); @@ -606,7 +703,11 @@ private void deleteImpl(UUID id, ObservationURI uri) { jdbc.update(lock); // req-acquire current state after obtaining lock - skel = (ObservationSkeleton) jdbc.query(sql, gen.getSkeletonExtractor(ObservationSkeleton.class)); + if (id != null) { + skel = getSkelImpl(id, jdbc, true); + } else { + skel = (ObservationSkeleton) jdbc.query(sql, gen.getSkeletonExtractor(ObservationSkeleton.class)); + } } if (skel != null) { diff --git a/caom2persistence/src/main/java/ca/nrc/cadc/caom2/persistence/ObservationSkeletonExtractor.java b/caom2persistence/src/main/java/ca/nrc/cadc/caom2/persistence/ObservationSkeletonExtractor.java index 4368be75..9ce147b4 100644 --- a/caom2persistence/src/main/java/ca/nrc/cadc/caom2/persistence/ObservationSkeletonExtractor.java +++ b/caom2persistence/src/main/java/ca/nrc/cadc/caom2/persistence/ObservationSkeletonExtractor.java @@ -74,6 +74,7 @@ import ca.nrc.cadc.caom2.persistence.skel.ObservationSkeleton; import ca.nrc.cadc.caom2.persistence.skel.PartSkeleton; import ca.nrc.cadc.caom2.persistence.skel.PlaneSkeleton; +import ca.nrc.cadc.caom2.persistence.skel.Skeleton; import ca.nrc.cadc.date.DateUtil; import java.net.URI; import java.sql.ResultSet; @@ -84,6 +85,7 @@ import org.apache.log4j.Logger; import org.springframework.dao.DataAccessException; import org.springframework.jdbc.core.ResultSetExtractor; +import org.springframework.jdbc.core.RowMapper; /** * 
@@ -96,6 +98,7 @@ public class ObservationSkeletonExtractor implements ResultSetExtractor { // lastModified,id,... for obs,plane,artifact,part,chunk private final Calendar utcCalendar = Calendar.getInstance(DateUtil.UTC); + // extract from join @Override public Object extractData(ResultSet rs) throws SQLException, DataAccessException { ObservationSkeleton ret = null; @@ -132,6 +135,7 @@ public Object extractData(ResultSet rs) throws SQLException, DataAccessException } if (ncol > col) { // plane + col++; // skip FK d = Util.getDate(rs, col++, utcCalendar); md = Util.getDate(rs, col++, utcCalendar); cs = Util.getURI(rs, col++); @@ -150,6 +154,7 @@ public Object extractData(ResultSet rs) throws SQLException, DataAccessException } if (ncol > col) { // artifact + col++; // skip FK d = Util.getDate(rs, col++, utcCalendar); md = Util.getDate(rs, col++, utcCalendar); cs = Util.getURI(rs, col++); @@ -168,6 +173,7 @@ public Object extractData(ResultSet rs) throws SQLException, DataAccessException } if (ncol > col) { // part + col++; // skip FK d = Util.getDate(rs, col++, utcCalendar); md = Util.getDate(rs, col++, utcCalendar); cs = Util.getURI(rs, col++); @@ -186,6 +192,7 @@ public Object extractData(ResultSet rs) throws SQLException, DataAccessException } if (ncol > col) { // chunk + col++; // skip FK d = Util.getDate(rs, col++, utcCalendar); md = Util.getDate(rs, col++, utcCalendar); cs = Util.getURI(rs, col++); @@ -213,4 +220,56 @@ public Object extractData(ResultSet rs) throws SQLException, DataAccessException return ret; } + public void extract(Skeleton ret, ResultSet rs, int col) throws SQLException { + ret.lastModified = Util.getDate(rs, col++, utcCalendar); + ret.maxLastModified = Util.getDate(rs, col++, utcCalendar); + ret.metaChecksum = Util.getURI(rs, col++); + ret.accMetaChecksum = Util.getURI(rs, col++); + ret.id = Util.getUUID(rs, col++); + } + + public final RowMapper observationMapper = new RowMapper() { + @Override + public ObservationSkeleton 
mapRow(ResultSet rs, int i) throws SQLException { + ObservationSkeleton ret = new ObservationSkeleton(); + extract(ret, rs, 1); + return ret; + } + }; + + public final RowMapper planeMapper = new RowMapper() { + @Override + public PlaneSkeleton mapRow(ResultSet rs, int i) throws SQLException { + PlaneSkeleton ret = new PlaneSkeleton(); + extract(ret, rs, 2); // skip FK + return ret; + } + }; + + public final RowMapper artifactMapper = new RowMapper() { + @Override + public ArtifactSkeleton mapRow(ResultSet rs, int i) throws SQLException { + ArtifactSkeleton ret = new ArtifactSkeleton(); + extract(ret, rs, 2); // skip FK + return ret; + } + }; + + public final RowMapper partMapper = new RowMapper() { + @Override + public PartSkeleton mapRow(ResultSet rs, int i) throws SQLException { + PartSkeleton ret = new PartSkeleton(); + extract(ret, rs, 2); // skip FK + return ret; + } + }; + + public final RowMapper chunkMapper = new RowMapper() { + @Override + public ChunkSkeleton mapRow(ResultSet rs, int i) throws SQLException { + ChunkSkeleton ret = new ChunkSkeleton(); + extract(ret, rs, 2); // skip FK + return ret; + } + }; } diff --git a/caom2persistence/src/main/java/ca/nrc/cadc/caom2/persistence/SQLGenerator.java b/caom2persistence/src/main/java/ca/nrc/cadc/caom2/persistence/SQLGenerator.java index b942348d..c1869704 100644 --- a/caom2persistence/src/main/java/ca/nrc/cadc/caom2/persistence/SQLGenerator.java +++ b/caom2persistence/src/main/java/ca/nrc/cadc/caom2/persistence/SQLGenerator.java @@ -571,11 +571,12 @@ protected void init() { }; columnMap.put(DeletedObservation.class, deletedObservationCols); + // FK column first, PK last (except observation columnMap.put(ObservationSkeleton.class, new String[]{"lastModified", "maxLastModified", "metaChecksum", "accMetaChecksum", "obsID"}); - columnMap.put(PlaneSkeleton.class, new String[]{"lastModified", "maxLastModified", "metaChecksum", "accMetaChecksum", "planeID"}); - columnMap.put(ArtifactSkeleton.class, new 
String[]{"lastModified", "maxLastModified", "metaChecksum", "accMetaChecksum", "artifactID"}); - columnMap.put(PartSkeleton.class, new String[]{"lastModified", "maxLastModified", "metaChecksum", "accMetaChecksum", "partID"}); - columnMap.put(ChunkSkeleton.class, new String[]{"lastModified", "maxLastModified", "metaChecksum", "accMetaChecksum", "chunkID"}); + columnMap.put(PlaneSkeleton.class, new String[]{"obsID", "lastModified", "maxLastModified", "metaChecksum", "accMetaChecksum", "planeID"}); + columnMap.put(ArtifactSkeleton.class, new String[]{"planeID", "lastModified", "maxLastModified", "metaChecksum", "accMetaChecksum", "artifactID"}); + columnMap.put(PartSkeleton.class, new String[]{"artifactID", "lastModified", "maxLastModified", "metaChecksum", "accMetaChecksum", "partID"}); + columnMap.put(ChunkSkeleton.class, new String[]{"partID", "lastModified", "maxLastModified", "metaChecksum", "accMetaChecksum", "chunkID"}); columnMap.put(ObservationState.class, new String[]{"collection", "observationID", "maxLastModified", "accMetaChecksum", "obsID"}); } @@ -751,6 +752,11 @@ public String getSelectSQL(Class c, Date minLastModified, Date maxLastModified, } public String getSelectSQL(Class clz, UUID id) { + return getSelectSQL(clz, id, true); + } + + public String getSelectSQL(Class clz, UUID id, boolean primaryKey) { + StringBuilder sb = new StringBuilder(); sb.append("SELECT "); String[] cols = columnMap.get(clz); @@ -763,7 +769,11 @@ public String getSelectSQL(Class clz, UUID id) { sb.append(" FROM "); sb.append(getTable(clz)); sb.append(" WHERE "); - sb.append(getPrimaryKeyColumn(clz)); + if (primaryKey) { + sb.append(getPrimaryKeyColumn(clz)); + } else { + sb.append(getForeignKeyColumn(clz)); + } sb.append(" = "); sb.append(literal(id)); return sb.toString(); @@ -824,7 +834,8 @@ public String getPrimaryKeyColumn(Class c) { public String getForeignKeyColumn(Class c) { if (Observation.class.isAssignableFrom(c) - || DeletedEntity.class.isAssignableFrom(c)) { + || 
DeletedEntity.class.isAssignableFrom(c) + || ObservationSkeleton.class.isAssignableFrom(c)) { throw new IllegalArgumentException(c.getSimpleName() + " does not have a foreign key"); } String[] cols = columnMap.get(c); @@ -2977,7 +2988,7 @@ public int getColumnCount() { return columnMap.get(Observation.class).length; } - public Object mapRow(ResultSet rs, int row) + public Observation mapRow(ResultSet rs, int row) throws SQLException { return mapRow(rs, row, 1); } @@ -3170,7 +3181,7 @@ public int getColumnCount() { return columnMap.get(Plane.class).length; } - public Object mapRow(ResultSet rs, int row) + public Plane mapRow(ResultSet rs, int row) throws SQLException { return mapRow(rs, row, 1); } @@ -3445,7 +3456,7 @@ public int getColumnCount() { return columnMap.get(Artifact.class).length; } - public Object mapRow(ResultSet rs, int row) + public Artifact mapRow(ResultSet rs, int row) throws SQLException { return mapRow(rs, row, 1); } @@ -3539,7 +3550,7 @@ public int getColumnCount() { return columnMap.get(Part.class).length; } - public Object mapRow(ResultSet rs, int row) + public Part mapRow(ResultSet rs, int row) throws SQLException { return mapRow(rs, row, 1); } @@ -3610,7 +3621,7 @@ public int getColumnCount() { return columnMap.get(Chunk.class).length; } - public Object mapRow(ResultSet rs, int row) + public Chunk mapRow(ResultSet rs, int row) throws SQLException { return mapRow(rs, row, 1); } From 02c3683d96c1acd8644dc9aa80ef4e3828d02ebe Mon Sep 17 00:00:00 2001 From: Patrick Dowler Date: Fri, 12 Jul 2024 08:45:41 -0700 Subject: [PATCH 2/4] demote warn msg to debug --- .../java/ca/nrc/cadc/caom2/persistence/ObservationDAO.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/caom2persistence/src/main/java/ca/nrc/cadc/caom2/persistence/ObservationDAO.java b/caom2persistence/src/main/java/ca/nrc/cadc/caom2/persistence/ObservationDAO.java index d157d667..78b85409 100644 --- 
a/caom2persistence/src/main/java/ca/nrc/cadc/caom2/persistence/ObservationDAO.java +++ b/caom2persistence/src/main/java/ca/nrc/cadc/caom2/persistence/ObservationDAO.java @@ -454,7 +454,7 @@ public void put(Observation obs, URI expectedMetaChecksum) throws PreconditionFa // and if they do not get(uri) they can get a duplicate observation error // if they violate unique keys //String skelSQL = gen.getSelectSQL(obs.getID(), 1, true); - //log.warn("PUT: " + skelSQL); + //log.debug("PUT: " + skelSQL); long tt = System.currentTimeMillis(); final ObservationSkeleton dirtyRead = getSkelImpl(obs.getID(), jdbc, false); // obs only @@ -544,7 +544,7 @@ public void put(Observation obs, URI expectedMetaChecksum) throws PreconditionFa final long txnCommitTime = System.currentTimeMillis() - tt; txnOpen = false; - log.warn("transaction start=" + txnStartTime + log.debug("transaction start=" + txnStartTime + " dirtyRead=" + dirtyReadTime + " lock=" + lockTime + " select=" + selectSkelTime From b39aa6eeb6aa372e031c73146855025586c4c17e Mon Sep 17 00:00:00 2001 From: Patrick Dowler Date: Fri, 12 Jul 2024 08:47:04 -0700 Subject: [PATCH 3/4] icewind: improve error handling and logging update caom2persistence dependency to latest for put performance --- icewind/README.md | 6 + icewind/VERSION | 2 +- icewind/build.gradle | 2 +- .../opencadc/icewind/DeletionHarvester.java | 2 +- .../main/java/org/opencadc/icewind/Main.java | 16 +- .../icewind/ObservationHarvester.java | 153 ++++++++++-------- 6 files changed, 108 insertions(+), 73 deletions(-) diff --git a/icewind/README.md b/icewind/README.md index 0fdee119..4d73cc7a 100644 --- a/icewind/README.md +++ b/icewind/README.md @@ -33,6 +33,12 @@ org.opencadc.icewind.collection={collection name} # Maximum sleep between runs (seconds) org.opencadc.icewind.maxIdle={integer} +# optional: number of observations to read into memory per batch (default: 100) +org.opencadc.icewind.batchSize={num} + +# optional: number of threads used to read observations 
from repoService (default: 1 + batchSize/10) +org.opencadc.icewind.numThreads={num} + # Destination caom2 database settings org.opencadc.icewind.caom.schema={CAOM schema name} org.opencadc.icewind.caom.username={username for CAOM admin} diff --git a/icewind/VERSION b/icewind/VERSION index 5677aca3..9a449308 100644 --- a/icewind/VERSION +++ b/icewind/VERSION @@ -1,6 +1,6 @@ ## deployable containers have a semantic and build tag # semantic version tag: major.minor[.patch] # build version tag: timestamp -VER=0.9.11 +VER=0.9.12 TAGS="${VER} ${VER}-$(date --utc +"%Y%m%dT%H%M%S")" unset VER diff --git a/icewind/build.gradle b/icewind/build.gradle index e54f9495..12a53177 100644 --- a/icewind/build.gradle +++ b/icewind/build.gradle @@ -19,7 +19,7 @@ mainClassName = 'org.opencadc.icewind.Main' dependencies { implementation 'org.opencadc:cadc-util:[1.6,2.0)' implementation 'org.opencadc:caom2:[2.4.4,2.5)' - implementation 'org.opencadc:caom2persistence:[2.4.17,2.5)' + implementation 'org.opencadc:caom2persistence:[2.4.18,2.5)' implementation 'org.opencadc:caom2-repo:[1.4.8,1.5)' // needed for validation diff --git a/icewind/src/main/java/org/opencadc/icewind/DeletionHarvester.java b/icewind/src/main/java/org/opencadc/icewind/DeletionHarvester.java index c1c587c7..a8c185f4 100644 --- a/icewind/src/main/java/org/opencadc/icewind/DeletionHarvester.java +++ b/icewind/src/main/java/org/opencadc/icewind/DeletionHarvester.java @@ -322,7 +322,7 @@ private Progress doit() { + " modified date " + format(cur.getMaxLastModified())); if (deleted.after(lastUpdate)) { log.info("delete: " + de.getClass().getSimpleName() + " " + de.getURI() + " " - + de.getID()); + + de.getID() + " " + format(de.lastModified)); obsDAO.delete(de.getID()); ret.deleted++; } else { diff --git a/icewind/src/main/java/org/opencadc/icewind/Main.java b/icewind/src/main/java/org/opencadc/icewind/Main.java index 696f26b9..381ffdf1 100644 --- a/icewind/src/main/java/org/opencadc/icewind/Main.java +++ 
b/icewind/src/main/java/org/opencadc/icewind/Main.java @@ -103,6 +103,8 @@ public class Main { private static final String REPO_SERVICE_CONFIG_KEY = CONFIG_PREFIX + ".repoService"; private static final String COLLECTION_CONFIG_KEY = CONFIG_PREFIX + ".collection"; private static final String MAX_IDLE_CONFIG_KEY = CONFIG_PREFIX + ".maxIdle"; + private static final String BATCH_SIZE_CONFIG_KEY = CONFIG_PREFIX + ".batchSize"; + private static final String NUM_THREADS_CONFIG_KEY = CONFIG_PREFIX + ".numThreads"; private static final String DB_URL_CONFIG_KEY = CONFIG_PREFIX + ".caom.url"; private static final String DB_SCHEMA_CONFIG_KEY = CONFIG_PREFIX + ".caom.schema"; private static final String DB_USERNAME_CONFIG_KEY = CONFIG_PREFIX + ".caom.username"; @@ -124,6 +126,7 @@ public class Main { public static void main(final String[] args) { Log4jInit.setLevel("ca.nrc.cadc.caom2", Level.INFO); Log4jInit.setLevel("org.opencadc.caom2", Level.INFO); + Log4jInit.setLevel("org.opencadc.icewind", Level.INFO); try { final PropertiesReader propertiesReader = new PropertiesReader(CONFIG_FILE_NAME); @@ -222,12 +225,23 @@ public static void main(final String[] args) { final String configuredMaxSleep = props.getFirstPropertyValue(MAX_IDLE_CONFIG_KEY); final long maxSleep = Long.parseLong(configuredMaxSleep); + + String configBatchSize = props.getFirstPropertyValue(BATCH_SIZE_CONFIG_KEY); + int batchSize = DEFAULT_BATCH_SIZE; + if (configBatchSize != null) { + batchSize = Integer.parseInt(configBatchSize); + } + String configNumThreads = props.getFirstPropertyValue(NUM_THREADS_CONFIG_KEY); + int numThreads = 1 + batchSize / 10; + if (configNumThreads != null) { + numThreads = Integer.parseInt(configNumThreads); + } // full=false: incremental harvest final boolean full = false; final boolean noChecksum = false; CaomHarvester harvester = new CaomHarvester(sourceHarvestResource, destinationHarvestResource, - configuredCollections, basePublisherID, DEFAULT_BATCH_SIZE, DEFAULT_BATCH_SIZE / 
10, + configuredCollections, basePublisherID, batchSize, numThreads, full, retrySkipped, noChecksum, exitWhenComplete, maxSleep); harvester.retryErrorMessagePattern = errorMessagePattern; diff --git a/icewind/src/main/java/org/opencadc/icewind/ObservationHarvester.java b/icewind/src/main/java/org/opencadc/icewind/ObservationHarvester.java index 93e8d704..6e5bc259 100644 --- a/icewind/src/main/java/org/opencadc/icewind/ObservationHarvester.java +++ b/icewind/src/main/java/org/opencadc/icewind/ObservationHarvester.java @@ -340,12 +340,15 @@ private Progress doit() { String skipMsg = null; + if (destObservationDAO.getTransactionManager().isOpen()) { throw new RuntimeException("BUG: found open transaction at start of next observation"); } + long tmp = System.currentTimeMillis(); log.debug("starting transaction"); destObservationDAO.getTransactionManager().startTransaction(); boolean ok = false; + long txnStartTime = System.currentTimeMillis() - tmp; log.debug("skipped=" + skipped + " o=" + o + " ow.entity=" + ow.entity @@ -428,7 +431,7 @@ private Progress doit() { harvestSkipDAO.delete(hs); } else { // defer to the main catch for error handling - throw new HarvestReadException(ow.entity.error); + throw ow.entity.error; } } else if (ow.entity.error != null) { // o == null when harvesting from service: try to make progress on failures @@ -436,8 +439,9 @@ private Progress doit() { state.curLastModified = ow.entity.observationState.maxLastModified; state.curID = null; // unknown } - if (srcObservationDAO != null || ow.entity.error instanceof ResourceNotFoundException) { - ObservationURI uri = new ObservationURI(hs.getSkipID()); + //if (srcObservationDAO != null || ow.entity.error instanceof ResourceNotFoundException) { + if (ow.entity.error instanceof ResourceNotFoundException) { + ObservationURI uri = ow.entity.observationState.getURI(); log.info("delete: " + uri); destObservationDAO.delete(uri); if (hs != null) { @@ -445,73 +449,74 @@ private Progress doit() { 
harvestSkipDAO.delete(hs); } } else { - throw new HarvestReadException(ow.entity.error); + throw ow.entity.error; } } + tmp = System.currentTimeMillis(); log.debug("committing transaction"); destObservationDAO.getTransactionManager().commitTransaction(); log.debug("commit: OK"); - + long txnCommitTime = System.currentTimeMillis() - tmp; + log.warn("transaction: start=" + txnStartTime + " commit=" + txnCommitTime); ok = true; ret.ingested++; - } catch (Throwable oops) { + } catch (IllegalStateException oops) { + if (oops.getMessage().contains("XML failed schema validation")) { + log.error("CONTENT PROBLEM - XML failed schema validation: " + oops.getMessage()); + ret.handled++; + } else if (oops.getMessage().contains("failed to read")) { + log.error("CONTENT PROBLEM - " + oops.getMessage(), oops.getCause()); + ret.handled++; + } else { + // TODO + } + skipMsg = oops.getMessage(); // message for HarvestSkipURI record + } catch (MismatchedChecksumException oops) { + log.error("CONTENT PROBLEM - mismatching checksums: " + ow.entity.observationState.getURI()); + ret.handled++; + skipMsg = oops.getMessage(); // message for HarvestSkipURI record + } catch (IllegalArgumentException oops) { + log.error("CONTENT PROBLEM - invalid observation: " + ow.entity.observationState.getURI() + " - " + oops.getMessage()); + if (oops.getCause() != null) { + log.error("cause: " + oops.getCause()); + } + ret.handled++; + skipMsg = oops.getMessage(); // message for HarvestSkipURI record + } catch (TransientException oops) { + log.error("NETWORK PROBLEM - " + oops.getMessage()); + ret.handled++; + skipMsg = oops.getMessage(); // message for HarvestSkipURI record + } catch (NullPointerException ex) { + log.error("BUG", ex); + ret.abort = true; + skipMsg = "BUG: " + ex.getClass().getName(); // message for HarvestSkipURI record + } catch (BadSqlGrammarException ex) { + log.error("BUG", ex); + BadSqlGrammarException bad = (BadSqlGrammarException) ex; + SQLException sex1 = bad.getSQLException(); 
+ if (sex1 != null) { + log.error("CAUSE", sex1); + SQLException sex2 = sex1.getNextException(); + log.error("NEXT CAUSE", sex2); + } + ret.abort = true; + skipMsg = ex.getMessage(); // message for HarvestSkipURI record + } catch (DataAccessResourceFailureException ex) { + log.error("FATAL PROBLEM - probably out of space in database", ex); + ret.abort = true; + skipMsg = "FATAL: " + ex.getMessage(); // message for HarvestSkipURI record + } catch (Exception oops) { + // need to inspect the error messages log.debug("exception during harvest", oops); skipMsg = null; String str = oops.toString(); - if (oops instanceof HarvestReadException) { - // unwrap HarvestReadException from above - oops = oops.getCause(); - // unwrap intervening RuntimeException(s) - while (oops.getCause() != null && oops instanceof RuntimeException) { - oops = oops.getCause(); - } - log.error("HARVEST PROBLEM - failed to read observation: " + ow.entity.observationState.getURI() + " - " + oops.getMessage()); - ret.handled++; - } else if (oops instanceof IllegalStateException) { - if (oops.getMessage().contains("XML failed schema validation")) { - log.error("CONTENT PROBLEM - XML failed schema validation: " + oops.getMessage()); - ret.handled++; - } else if (oops.getMessage().contains("failed to read")) { - log.error("CONTENT PROBLEM - " + oops.getMessage(), oops.getCause()); - ret.handled++; - } - } else if (oops instanceof IllegalArgumentException) { - log.error("CONTENT PROBLEM - invalid observation: " + ow.entity.observationState.getURI() + " - " + oops.getMessage()); - if (oops.getCause() != null) { - log.error("cause: " + oops.getCause()); - } - ret.handled++; - } else if (oops instanceof MismatchedChecksumException) { - log.error("CONTENT PROBLEM - mismatching checksums: " + ow.entity.observationState.getURI()); - ret.handled++; - } else if (str.contains("duplicate key value violates unique constraint \"i_observationuri\"")) { + + if (str.contains("duplicate key value violates unique 
constraint \"i_observationuri\"")) { log.error("CONTENT PROBLEM - duplicate observation: " + ow.entity.observationState.getURI()); ret.handled++; - } else if (oops instanceof TransientException) { - log.error("CONTENT PROBLEM - " + oops.getMessage()); - ret.handled++; - } else if (oops instanceof Error) { - log.error("FATAL - probably installation or environment", oops); - ret.abort = true; - } else if (oops instanceof NullPointerException) { - log.error("BUG", oops); - ret.abort = true; - } else if (oops instanceof BadSqlGrammarException) { - log.error("BUG", oops); - BadSqlGrammarException bad = (BadSqlGrammarException) oops; - SQLException sex1 = bad.getSQLException(); - if (sex1 != null) { - log.error("CAUSE", sex1); - SQLException sex2 = sex1.getNextException(); - log.error("NEXT CAUSE", sex2); - } - ret.abort = true; - } else if (oops instanceof DataAccessResourceFailureException) { - log.error("SEVERE PROBLEM - probably out of space in database", oops); - ret.abort = true; } else if (str.contains("spherepoly_from_array")) { - log.error("CONTENT PROBLEM - failed to persist: " + ow.entity.observationState.getURI() + " - " + oops.getMessage()); + log.error("PGSPHERE PROBLEM - failed to persist: " + ow.entity.observationState.getURI() + " - " + oops.getMessage()); oops = new IllegalArgumentException("invalid polygon (spoly): " + oops.getMessage(), oops); ret.handled++; } else if (str.contains("value out of range: underflow")) { @@ -522,11 +527,18 @@ private Progress doit() { } // message for HarvestSkipURI record skipMsg = oops.getMessage(); + } catch (Error err) { + log.error("FATAL - probably installation or environment", err); + ret.abort = true; } finally { if (!ok) { - destObservationDAO.getTransactionManager().rollbackTransaction(); - log.debug("rollback: OK"); - timeTransaction += System.currentTimeMillis() - t; + try { + destObservationDAO.getTransactionManager().rollbackTransaction(); + log.debug("rollback: OK"); + timeTransaction += 
System.currentTimeMillis() - t; + } catch (Exception tex) { + log.error("failed to rollback obs transaction", tex); + } try { log.debug("starting HarvestSkipURI transaction"); @@ -574,8 +586,12 @@ private Progress doit() { log.debug("commit HarvestSkipURI: OK"); } catch (Throwable oops) { log.warn("failed to insert HarvestSkipURI", oops); - destObservationDAO.getTransactionManager().rollbackTransaction(); - log.debug("rollback HarvestSkipURI: OK"); + try { + destObservationDAO.getTransactionManager().rollbackTransaction(); + log.debug("rollback HarvestSkipURI: OK"); + } catch (Exception tex) { + log.error("failed to rollback skip transaction", tex); + } ret.abort = true; } ret.failed++; @@ -603,13 +619,6 @@ private Progress doit() { return ret; } - private static class HarvestReadException extends Exception { - - public HarvestReadException(Exception cause) { - super(cause); - } - } - private void validateChecksum(Observation o) throws MismatchedChecksumException { if (o.getAccMetaChecksum() == null) { return; // no check @@ -727,15 +736,21 @@ private String computeTreeSize(Observation o) { sb.append("["); int numA = 0; int numP = 0; + int numC = 0; for (Plane p : o.getPlanes()) { numA += p.getArtifacts().size(); for (Artifact a : p.getArtifacts()) { numP += a.getParts().size(); + for (Part pp : a.getParts()) { + numC += pp.getChunks().size(); + } } } + sb.append("1,"); // obs sb.append(o.getPlanes().size()).append(","); sb.append(numA).append(","); - sb.append(numP).append("]"); + sb.append(numP).append(","); + sb.append(numC).append("]"); return sb.toString(); } From 82d4d2fb87c48edd9f349b713446c57010da7b80 Mon Sep 17 00:00:00 2001 From: Patrick Dowler Date: Fri, 12 Jul 2024 09:00:38 -0700 Subject: [PATCH 4/4] logging fix --- .../main/java/org/opencadc/icewind/ObservationHarvester.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/icewind/src/main/java/org/opencadc/icewind/ObservationHarvester.java 
b/icewind/src/main/java/org/opencadc/icewind/ObservationHarvester.java index 6e5bc259..54648a8e 100644 --- a/icewind/src/main/java/org/opencadc/icewind/ObservationHarvester.java +++ b/icewind/src/main/java/org/opencadc/icewind/ObservationHarvester.java @@ -458,7 +458,7 @@ private Progress doit() { destObservationDAO.getTransactionManager().commitTransaction(); log.debug("commit: OK"); long txnCommitTime = System.currentTimeMillis() - tmp; - log.warn("transaction: start=" + txnStartTime + " commit=" + txnCommitTime); + log.debug("transaction: start=" + txnStartTime + " commit=" + txnCommitTime); ok = true; ret.ingested++; } catch (IllegalStateException oops) {