diff --git a/.github/workflows/gradle.yml b/.github/workflows/gradle.yml index 49abaeae..b751b3c6 100644 --- a/.github/workflows/gradle.yml +++ b/.github/workflows/gradle.yml @@ -56,4 +56,7 @@ jobs: run: cd caom2-artifact-discover && ../gradlew --info clean build javadoc checkstyleMain - name: build and test caom2-artifact-download - run: cd caom2-artifact-download && ../gradlew --info clean build javadoc checkstyleMain + run: cd caom2-artifact-download && ../gradlew --info clean build javadoc checkstyleMain + + - name: build and test caom2-meta-sync + run: cd caom2-meta-sync && ../gradlew --info clean build javadoc checkstyleMain diff --git a/caom2-meta-sync/Dockerfile b/caom2-meta-sync/Dockerfile new file mode 100644 index 00000000..16942dd7 --- /dev/null +++ b/caom2-meta-sync/Dockerfile @@ -0,0 +1,5 @@ +FROM images.opencadc.org/library/cadc-java:1 + +ADD build/distributions/caom2-meta-sync.tar / + +CMD ["/caom2-meta-sync/bin/caom2-meta-sync"] diff --git a/caom2-meta-sync/README.md b/caom2-meta-sync/README.md new file mode 100644 index 00000000..78d4b845 --- /dev/null +++ b/caom2-meta-sync/README.md @@ -0,0 +1,94 @@ +# CAOM2 Meta Sync process + +Process to sync Observations from a CAOM2 repository service +to a CAOM2 database. Process runs continuously exiting only +when source queries return no results. + +## configuration + +See the [cadc-java](https://github.com/opencadc/docker-base/tree/master/cadc-java) +image docs for general config requirements. + +Runtime configuration must be made available via the `/config` directory. + + +### caom2-meta-sync.properties +``` +# log level +org.opencadc.caom2.metasync.logging={info|debug} + +# Source repository service +org.opencadc.caom2.metasync.repoService={uri} + +# The collections to sync, one collection per line +org.opencadc.caom2.metasync.collection={collection name} + +# Maximum number of seconds to pause between runs +org.opencadc.caom2.metasync.maxIdle={integer} + +# Destination caom2 database settings +org.opencadc.caom2.metasync.db.schema={schema} +org.opencadc.caom2.metasync.db.username={dbuser} +org.opencadc.caom2.metasync.db.password={dbpassword} +org.opencadc.caom2.metasync.db.url=jdbc:postgresql://{server}/{database} + +# Base for generating Plane publisherID values +org.opencadc.caom2.metasync.basePublisherID={uri} + +# Optional - exit after processing collections once +org.opencadc.caom2.metasync.exitWhenComplete=true|false +``` + +_repoService_ is the resource identifier for a registered +caom2 repository service (e.g. ivo://cadc.nrc.ca/ams) + +_collection_ is the collection name used to query for Artifacts +in the repository service. For multiple collections use multiple lines, +one collection per line. + +_maxIdle_ is the maximum time in seconds to pause between runs +when _exitWhenComplete_ is _false_. The idle time starts at 60 seconds, +doubling every time no data is found to sync, until maxIdle is reached. +The idle time will reset to 60 seconds when data is found to sync. + +_basePublisherID_ is the base for generating Plane +publisherID values. The base is an uri of the form ivo://[/] +publisherID values: /?/ + +_exitWhenComplete_ is optional and defaults to _false_. +When _true_ each collection is processed once, and then the application exits. +The default is collections are continuously processed in a loop. + + +### cadcproxy.pem +Optional certificate in /config is used to authenticate https calls +to other services if challenged for a client certificate. +If cadcproxy.pem is not present, queries to the repository service +are made anonymously. + + +## building it +``` +gradle clean build +docker build -t caom2-meta-sync -f Dockerfile . +``` + +## checking it +``` +docker run -it caom2-meta-sync:latest /bin/bash +``` + +## running it +``` +docker run --user opencadc:opencadc -v /path/to/external/config:/config:ro --name caom2-meta-sync caom2-meta-sync:latest +``` + +## apply version tags +```bash +. VERSION && echo "tags: $TAGS" +for t in $TAGS; do + docker image tag caom2-meta-sync:latest caom2-meta-sync:$t +done +unset TAGS +docker image list caom2-meta-sync +``` diff --git a/caom2-meta-sync/VERSION b/caom2-meta-sync/VERSION new file mode 100644 index 00000000..2f5a77f3 --- /dev/null +++ b/caom2-meta-sync/VERSION @@ -0,0 +1,4 @@ +## deployable containers have a semantic and build tag +# semantic version tag: major.minor[.patch] +# build version tag: timestamp +TAGS="0.1-$(date --utc +"%Y%m%dT%H%M%S")" \ No newline at end of file diff --git a/caom2-meta-sync/build.gradle b/caom2-meta-sync/build.gradle new file mode 100644 index 00000000..1c5cc802 --- /dev/null +++ b/caom2-meta-sync/build.gradle @@ -0,0 +1,37 @@ +plugins { + id 'java' + id 'maven' + id 'application' + id 'checkstyle' +} + +repositories { + mavenCentral() + mavenLocal() +} + +sourceCompatibility = 1.8 + +group = 'org.opencadc' + +description = 'OpenCADC CAOM Metadata Sync application' +def git_url = 'https://github.com/opencadc/caom2db' + +mainClassName = 'org.opencadc.caom2.metasync.Main' + +dependencies { + implementation 'org.opencadc:cadc-util:[1.6,2.0)' + implementation 'org.opencadc:caom2:[2.4.4,2.5)' + implementation 'org.opencadc:caom2persistence:[2.4.14,2.5)' + implementation 'org.opencadc:caom2-repo:[1.4,1.5)' + + // needed to run plane metadata compute plugin (--compute) + implementation 'org.opencadc:caom2-compute:[2.4.6,2.5)' + + // needed to run access-control regen plugin (--generate-ac) + implementation 'org.opencadc:caom2-access-control:[2.4,2.5)' + + runtimeOnly 'org.postgresql:postgresql:[42.2,43.0)' +} + +apply from: '../opencadc.gradle' diff --git a/caom2-meta-sync/src/main/java/org/opencadc/caom2/metasync/CaomHarvester.java b/caom2-meta-sync/src/main/java/org/opencadc/caom2/metasync/CaomHarvester.java new file mode 100644 index 00000000..027c3b3d --- /dev/null +++ b/caom2-meta-sync/src/main/java/org/opencadc/caom2/metasync/CaomHarvester.java @@ -0,0 +1,268 @@ +/* + ************************************************************************ + ******************* CANADIAN ASTRONOMY DATA CENTRE ******************* + ************** CENTRE CANADIEN DE DONNÉES ASTRONOMIQUES ************** + * + * (c) 2023. (c) 2023. + * Government of Canada Gouvernement du Canada + * National Research Council Conseil national de recherches + * Ottawa, Canada, K1A 0R6 Ottawa, Canada, K1A 0R6 + * All rights reserved Tous droits réservés + * + * NRC disclaims any warranties, Le CNRC dénie toute garantie + * expressed, implied, or énoncée, implicite ou légale, + * statutory, of any kind with de quelque nature que ce + * respect to the software, soit, concernant le logiciel, + * including without limitation y compris sans restriction + * any warranty of merchantability toute garantie de valeur + * or fitness for a particular marchande ou de pertinence + * purpose. NRC shall not be pour un usage particulier. + * liable in any event for any Le CNRC ne pourra en aucun cas + * damages, whether direct or être tenu responsable de tout + * indirect, special or general, dommage, direct ou indirect, + * consequential or incidental, particulier ou général, + * arising from the use of the accessoire ou fortuit, résultant + * software. Neither the name de l'utilisation du logiciel. Ni + * of the National Research le nom du Conseil National de + * Council of Canada nor the Recherches du Canada ni les noms + * names of its contributors may de ses participants ne peuvent + * be used to endorse or promote être utilisés pour approuver ou + * products derived from this promouvoir les produits dérivés + * software without specific prior de ce logiciel sans autorisation + * written permission. préalable et particulière + * par écrit. + * + * This file is part of the Ce fichier fait partie du projet + * OpenCADC project. OpenCADC. + * + * OpenCADC is free software: OpenCADC est un logiciel libre ; + * you can redistribute it and/or vous pouvez le redistribuer ou le + * modify it under the terms of modifier suivant les termes de + * the GNU Affero General Public la “GNU Affero General Public + * License as published by the License” telle que publiée + * Free Software Foundation, par la Free Software Foundation + * either version 3 of the : soit la version 3 de cette + * License, or (at your option) licence, soit (à votre gré) + * any later version. toute version ultérieure. + * + * OpenCADC is distributed in the OpenCADC est distribué + * hope that it will be useful, dans l’espoir qu’il vous + * but WITHOUT ANY WARRANTY; sera utile, mais SANS AUCUNE + * without even the implied GARANTIE : sans même la garantie + * warranty of MERCHANTABILITY implicite de COMMERCIALISABILITÉ + * or FITNESS FOR A PARTICULAR ni d’ADÉQUATION À UN OBJECTIF + * PURPOSE. See the GNU Affero PARTICULIER. Consultez la Licence + * General Public License for Générale Publique GNU Affero + * more details. pour plus de détails. + * + * You should have received Vous devriez avoir reçu une + * a copy of the GNU Affero copie de la Licence Générale + * General Public License along Publique GNU Affero avec + * with OpenCADC. If not, see OpenCADC ; si ce n’est + * . pas le cas, consultez : + * . + * + * $Revision: 5 $ + * + ************************************************************************ + */ + +package org.opencadc.caom2.metasync; + +import ca.nrc.cadc.caom2.DeletedObservation; +import ca.nrc.cadc.caom2.harvester.state.HarvestState; +import ca.nrc.cadc.caom2.version.InitDatabase; +import ca.nrc.cadc.db.ConnectionConfig; +import ca.nrc.cadc.db.DBUtil; +import java.io.File; +import java.net.URI; +import java.util.Date; +import java.util.List; +import javax.sql.DataSource; +import org.apache.log4j.Logger; + +/** + * A wrapper that calls the Harvester implementations in the right order. + * + * @author pdowler + */ +public class CaomHarvester implements Runnable { + private static final Logger log = Logger.getLogger(CaomHarvester.class); + private static final Long DEFAULT_IDLE_TIME = 6000L; + + private final InitDatabase initdb; + private final HarvesterResource src; + private final HarvesterResource dest; + private final List collections; + private final URI basePublisherID; + private final int batchSize; + private final int nthreads; + private final boolean full; + private final boolean skip; + private final boolean nochecksum; + private final boolean exitWhenComplete; + private final long maxIdle; + private Date minDate; + private Date maxDate; + private boolean computePlaneMetadata; + private File readAccessConfigFile; + + /** + * Harvest everything. + * + * @param src source resource + * @param dest destination resource (must be a server/database/schema) + * @param collections list of collections to process + * @param basePublisherID base to use in generating Plane publisherID values in destination database + * @param batchSize number of observations per batch (~memory consumption) + * @param nthreads max threads when harvesting from a service + * @param full full harvest of all source entities + * @param skip attempt retry of all skipped observations + * @param nochecksum disable metadata checksum comparison + * @param exitWhenComplete exit after processing each collection if true, else continuously loop + * @param maxIdle max sleep time in seconds between runs when running continuously + */ + public CaomHarvester(HarvesterResource src, HarvesterResource dest, List collections, + URI basePublisherID, int batchSize, int nthreads, boolean full, boolean skip, + boolean nochecksum, boolean exitWhenComplete, long maxIdle) { + this.src = src; + this.dest = dest; + this.collections = collections; + this.basePublisherID = basePublisherID; + this.batchSize = batchSize; + this.nthreads = nthreads; + this.full = full; + this.skip = skip; + this.nochecksum = nochecksum; + this.exitWhenComplete = exitWhenComplete; + this.maxIdle = maxIdle; + this.minDate = null; + this.maxDate = null; + this.computePlaneMetadata = false; + this.readAccessConfigFile = null; + + ConnectionConfig cc = new ConnectionConfig(null, null, dest.getUsername(), dest.getPassword(), + HarvesterResource.POSTGRESQL_DRIVER, dest.getJdbcUrl()); + DataSource ds = DBUtil.getDataSource(cc); + this.initdb = new InitDatabase(ds, dest.getDatabase(), dest.getSchema()); + } + + public void setMinDate(Date minDate) { + this.minDate = minDate; + } + + public void setMaxDate(Date maxDate) { + this.maxDate = maxDate; + } + + /** + * Enable the plane metadata compute plugin. + * + * @param compute enable Plane metadata computation if true + */ + public void setCompute(boolean compute) { + this.computePlaneMetadata = compute; + } + + /** + * Enable the generate read access grants plugin with the specified config. + * + * @param config enable read access generation from the specified config file + */ + public void setGenerateReadAccess(String config) { + this.readAccessConfigFile = new File(config); + } + + @Override + public void run() { + + if (this.computePlaneMetadata) { + // make sure wcslib can be loaded + try { + log.info("loading ca.nrc.cadc.wcs.WCSLib"); + Class.forName("ca.nrc.cadc.wcs.WCSLib"); + } catch (Throwable t) { + throw new RuntimeException("FATAL - failed to load WCSLib JNI binding", t); + } + } + + boolean init = false; + if (initdb != null) { + boolean created = initdb.doInit(); + if (created) { + init = true; // database is empty so can bypass processing old + } // deletions + } + + long sleep = 0; + boolean done = false; + while (!done) { + int ingested = 0; + for (String collection : collections) { + log.info("processing collection: " + collection); + + URI publisherID = URI.create(basePublisherID + collection); + ObservationHarvester obsHarvester = new ObservationHarvester(src, dest, collection, publisherID, batchSize, + nthreads, full, nochecksum); + obsHarvester.setSkipped(skip); + obsHarvester.setComputePlaneMetadata(computePlaneMetadata); + if (minDate != null) { + obsHarvester.setMaxDate(minDate); + } + if (maxDate != null) { + obsHarvester.setMaxDate(maxDate); + } + if (readAccessConfigFile != null) { + obsHarvester.setGenerateReadAccessTuples(readAccessConfigFile); + } + + // deletions in incremental mode only + if (!full && !skip && !src.getIdentifier(collection).equals(dest.getIdentifier(collection))) { + DeletionHarvester obsDeleter = new DeletionHarvester(DeletedObservation.class, src, dest, + collection, batchSize * 100); + if (minDate != null) { + obsDeleter.setMaxDate(minDate); + } + if (maxDate != null) { + obsDeleter.setMaxDate(maxDate); + } + boolean initDel = init; + if (!init) { + // check if we have ever harvested before + HarvestState hs = obsHarvester.harvestStateDAO.get(obsHarvester.source, obsHarvester.cname); + initDel = (hs.curID == null && hs.curLastModified == null); // never harvested + } + + // delete observations before harvest to avoid observationURI conflicts from delete+create + log.info("init: " + obsDeleter.source + " " + obsDeleter.cname); + obsDeleter.setInitHarvestState(initDel); + log.debug("************** obsDeleter.run() ****************"); + obsDeleter.run(); + } + + // harvest observations + log.debug("************** obsHarvester.run() ***************"); + obsHarvester.run(); + ingested += obsHarvester.getIngested(); + log.info(" source: " + src.getIdentifier(collection)); + log.info("destination: " + dest.getIdentifier(collection)); + } + if (this.exitWhenComplete) { + done = true; + } else { + if (ingested > 0 || sleep == 0) { + sleep = DEFAULT_IDLE_TIME; + } else { + sleep = Math.min(sleep * 2, maxIdle * 1000L); + } + try { + log.debug("sleeping for " + sleep); + Thread.sleep(sleep); + } catch (InterruptedException e) { + throw new RuntimeException("Thread sleep interrupted", e); + } + } + } + } + +} diff --git a/caom2-meta-sync/src/main/java/org/opencadc/caom2/metasync/DeletionHarvester.java b/caom2-meta-sync/src/main/java/org/opencadc/caom2/metasync/DeletionHarvester.java new file mode 100644 index 00000000..88b35cdd --- /dev/null +++ b/caom2-meta-sync/src/main/java/org/opencadc/caom2/metasync/DeletionHarvester.java @@ -0,0 +1,416 @@ +/* + ************************************************************************ + ******************* CANADIAN ASTRONOMY DATA CENTRE ******************* + ************** CENTRE CANADIEN DE DONNÉES ASTRONOMIQUES ************** + * + * (c) 2023. (c) 2023. + * Government of Canada Gouvernement du Canada + * National Research Council Conseil national de recherches + * Ottawa, Canada, K1A 0R6 Ottawa, Canada, K1A 0R6 + * All rights reserved Tous droits réservés + * + * NRC disclaims any warranties, Le CNRC dénie toute garantie + * expressed, implied, or énoncée, implicite ou légale, + * statutory, of any kind with de quelque nature que ce + * respect to the software, soit, concernant le logiciel, + * including without limitation y compris sans restriction + * any warranty of merchantability toute garantie de valeur + * or fitness for a particular marchande ou de pertinence + * purpose. NRC shall not be pour un usage particulier. + * liable in any event for any Le CNRC ne pourra en aucun cas + * damages, whether direct or être tenu responsable de tout + * indirect, special or general, dommage, direct ou indirect, + * consequential or incidental, particulier ou général, + * arising from the use of the accessoire ou fortuit, résultant + * software. Neither the name de l'utilisation du logiciel. Ni + * of the National Research le nom du Conseil National de + * Council of Canada nor the Recherches du Canada ni les noms + * names of its contributors may de ses participants ne peuvent + * be used to endorse or promote être utilisés pour approuver ou + * products derived from this promouvoir les produits dérivés + * software without specific prior de ce logiciel sans autorisation + * written permission. préalable et particulière + * par écrit. + * + * This file is part of the Ce fichier fait partie du projet + * OpenCADC project. OpenCADC. + * + * OpenCADC is free software: OpenCADC est un logiciel libre ; + * you can redistribute it and/or vous pouvez le redistribuer ou le + * modify it under the terms of modifier suivant les termes de + * the GNU Affero General Public la “GNU Affero General Public + * License as published by the License” telle que publiée + * Free Software Foundation, par la Free Software Foundation + * either version 3 of the : soit la version 3 de cette + * License, or (at your option) licence, soit (à votre gré) + * any later version. toute version ultérieure. + * + * OpenCADC is distributed in the OpenCADC est distribué + * hope that it will be useful, dans l’espoir qu’il vous + * but WITHOUT ANY WARRANTY; sera utile, mais SANS AUCUNE + * without even the implied GARANTIE : sans même la garantie + * warranty of MERCHANTABILITY implicite de COMMERCIALISABILITÉ + * or FITNESS FOR A PARTICULAR ni d’ADÉQUATION À UN OBJECTIF + * PURPOSE. See the GNU Affero PARTICULIER. Consultez la Licence + * General Public License for Générale Publique GNU Affero + * more details. pour plus de détails. + * + * You should have received Vous devriez avoir reçu une + * a copy of the GNU Affero copie de la Licence Générale + * General Public License along Publique GNU Affero avec + * with OpenCADC. If not, see OpenCADC ; si ce n’est + * . pas le cas, consultez : + * . + * + * $Revision: 5 $ + * + ************************************************************************ + */ + +package org.opencadc.caom2.metasync; + +import ca.nrc.cadc.caom2.DeletedEntity; +import ca.nrc.cadc.caom2.DeletedObservation; +import ca.nrc.cadc.caom2.ObservationState; +import ca.nrc.cadc.caom2.harvester.state.HarvestState; +import ca.nrc.cadc.caom2.persistence.DeletedEntityDAO; +import ca.nrc.cadc.caom2.persistence.ObservationDAO; +import ca.nrc.cadc.caom2.repo.client.RepoClient; +import ca.nrc.cadc.db.ConnectionConfig; +import ca.nrc.cadc.db.DBUtil; +import ca.nrc.cadc.db.TransactionManager; +import java.io.IOException; +import java.util.Date; +import java.util.List; +import java.util.ListIterator; +import java.util.Map; +import javax.naming.NamingException; +import org.apache.log4j.Logger; + +/** + * Harvest and perform deletions of observations. + * + * @author pdowler + */ +public class DeletionHarvester extends Harvester implements Runnable { + + private static final Logger log = Logger.getLogger(DeletionHarvester.class); + + private DeletedEntityDAO deletedDAO; + private RepoClient repoClient; + private ObservationDAO obsDAO; + private TransactionManager txnManager; + private boolean initHarvestState; + private Date initDate; + private boolean ready = false; + + /** + * Constructor. + * + * @param entityClass the class specifying what should be deleted + * @param src source server.database.schema + * @param dest destination server.database.schema + * @param collection the collection to process + * @param batchSize ignored, always full list + */ + public DeletionHarvester(Class entityClass, HarvesterResource src, HarvesterResource dest, + String collection, int batchSize) { + super(entityClass, src, dest, collection, batchSize, false); + init(); + } + + /** + * Initialise harvest state with the current date. + * + * @param initHarvestState value for this attribute + */ + public void setInitHarvestState(boolean initHarvestState) { + this.initHarvestState = initHarvestState; + if (initHarvestState) { + this.initDate = new Date(); // timestamp at startup, not when run + } + } + + /** + * Initialize the harvester + */ + private void init() { + // source + if (src.getResourceType() == HarvesterResource.SOURCE_DB) { + Map srcConfig = getConfigDAO(src); + ConnectionConfig srcConnectionConfig = new ConnectionConfig(null, null, + src.getUsername(), src.getPassword(), HarvesterResource.POSTGRESQL_DRIVER, src.getJdbcUrl()); + final String srcDS = "jdbc/srcDelHarvest"; + try { + DBUtil.createJNDIDataSource(srcDS, srcConnectionConfig); + } catch (NamingException e) { + throw new IllegalStateException(String.format("Error creating source JNDI datasource for %s reason: %s", + src, e.getMessage())); + } + srcConfig.put("jndiDataSourceName", srcDS); + this.deletedDAO = new DeletedEntityDAO(); + deletedDAO.setConfig(srcConfig); + ready = true; + } else { + this.repoClient = new RepoClient(src.getResourceID(), 1); + } + + // destination + Map destConfig = getConfigDAO(dest); + ConnectionConfig destConnectionConfig = new ConnectionConfig(null, null, + dest.getUsername(), dest.getPassword(), HarvesterResource.POSTGRESQL_DRIVER, dest.getJdbcUrl()); + final String destDS = "jdbc/destObsHarvest"; + try { + DBUtil.createJNDIDataSource(destDS, destConnectionConfig); + } catch (NamingException e) { + throw new IllegalStateException(String.format("Error creating destination JNDI datasource for %s reason: %s", + dest, e.getMessage())); + } + destConfig.put("jndiDataSourceName", destDS); + this.obsDAO = new ObservationDAO(); + obsDAO.setConfig(destConfig); + this.txnManager = obsDAO.getTransactionManager(); + initHarvestState(obsDAO.getDataSource(), entityClass); + + if (repoClient != null) { + if (repoClient.isDelAvailable()) { + ready = true; + } else { + log.warn("Not available deletion endpoint in " + repoClient.toString()); + } + } + } + + /** + * cleanup connections and state + * + * @throws IOException + */ + private void close() throws IOException { + // TODO + } + + /** + * run + */ + @Override + public void run() { + + if (!ready) { + return; + } + + log.info("START: " + entityClass.getSimpleName()); + + boolean go = true; + while (go) { + Progress num = doit(); + if (num.found > 0) { + log.info("finished batch: " + num); + } + if (num.failed > num.found / 2) { + log.warn("failure rate is quite high: " + num.failed + "/" + num.found); + num.abort = true; + } + if (num.abort) { + log.error("batched aborted"); + } + go = (!num.abort && !num.done); + full = false; // do not start at min(lastModified) again + } + try { + close(); + } catch (Throwable oops) { + log.error("failed to cleanup connections and state", oops); + return; + } + log.info("DONE: " + entityClass.getSimpleName() + "\n"); + } + + /** + * class that does the work + * + */ + private static class Progress { + + boolean done = false; + boolean abort = false; + int found = 0; + int deleted = 0; + int skipped = 0; + int failed = 0; + + @Override + public String toString() { + return found + " deleted: " + deleted + " skipped: " + skipped + " failed: " + failed; + } + } + + private Date startDate; + private Date endDate; + private boolean firstIteration = true; + + /** + * Does the work + * + * @return progress status + */ + private Progress doit() { + log.info("batch: " + entityClass.getSimpleName()); + Progress ret = new Progress(); + int expectedNum = batchSize; + boolean correct = true; + + try { + HarvestState state = harvestStateDAO.get(source, cname); + log.info("last harvest: " + format(state.curLastModified)); + + if (initHarvestState && state.curLastModified == null) { + state.curLastModified = initDate; + harvestStateDAO.put(state); + state = harvestStateDAO.get(source, cname); + log.info("harvest state initialised to: " + df.format(state.curLastModified)); + } + + startDate = state.curLastModified; + if (firstIteration) { + if (super.minDate != null) { // override state + startDate = super.minDate; + } + endDate = super.maxDate; + // harvest up to a little in the past because the head of the + // sequence may be volatile + long fiveMinAgo = System.currentTimeMillis() - 5 * 60000L; + if (endDate == null) { + endDate = new Date(fiveMinAgo); + } else { + endDate = new Date(Math.min(fiveMinAgo, endDate.getTime())); + } + } + firstIteration = false; + + List entityList = null; + String source = null; + if (deletedDAO != null) { + source = "deletedDAO"; + entityList = deletedDAO.getList(collection, startDate, endDate, batchSize); + } else { + source = "repoClient"; + entityList = repoClient.getDeleted(collection, startDate, endDate, batchSize); + } + + if (entityList == null) { + throw new RuntimeException("Error gathering deleted observations from " + source); + } + + if (entityList.size() == expectedNum) { + detectLoop(entityList); + } + + ret.found = entityList.size(); + log.info("found: " + entityList.size()); + ListIterator iter = entityList.listIterator(); + while (iter.hasNext()) { + DeletedObservation de = iter.next(); + iter.remove(); // allow garbage collection asap + log.debug("Observation read from deletion end-point: " + de.getID() + " date = " + + de.lastModified); + + txnManager.startTransaction(); + boolean ok = false; + try { + state.curLastModified = de.lastModified; + state.curID = de.getID(); + + ObservationState cur = obsDAO.getState(de.getID()); + if (cur != null) { + log.debug("Observation: " + de.getID() + " found in DB"); + Date lastUpdate = cur.getMaxLastModified(); + Date deleted = de.lastModified; + log.debug("to be deleted: " + de.getClass().getSimpleName() + " " + de.getURI() + " " + + de.getID() + "deleted date " + format(de.lastModified) + + " modified date " + format(cur.getMaxLastModified())); + if (deleted.after(lastUpdate)) { + log.info("delete: " + de.getClass().getSimpleName() + " " + de.getURI() + " " + + de.getID()); + obsDAO.delete(de.getID()); + ret.deleted++; + } else { + log.info("skip out-of-date delete: " + de.getClass().getSimpleName() + " " + + de.getURI() + " " + de.getID() + " " + format(de.lastModified)); + ret.skipped++; + } + } else { + log.debug("Observation: " + de.getID() + " not found in DB"); + } + + // track progress + harvestStateDAO.put(state); + + log.debug("committing transaction"); + txnManager.commitTransaction(); + log.debug("commit: OK"); + ok = true; + + } catch (Throwable t) { + log.error("unexpected exception", t); + } finally { + if (!ok) { + log.warn("failed to process " + de + ": trying to rollback the transaction"); + txnManager.rollbackTransaction(); + log.warn("rollback: OK"); + ret.abort = true; + } + } + } + if (ret.found < expectedNum) { + ret.done = true; + if (state != null && state.curLastModified != null && ret.found > 0) { + // tweak HarvestState so we don't keep picking up the same + // one + Date n = new Date(state.curLastModified.getTime() + 1L); // 1 + // ms + // ahead + Date now = new Date(); + if (now.getTime() - n.getTime() > 600 * 1000L) { + n = new Date(state.curLastModified.getTime() + 100L); + } + // ahead + state.curLastModified = n; + log.info("reached last " + entityClass.getSimpleName() + ": setting curLastModified to " + + format(state.curLastModified)); + harvestStateDAO.put(state); + } + } + } catch (Throwable t) { + log.error("unexpected exception", t); + ret.abort = true; + correct = false; + } finally { + if (correct) { + log.debug("DONE"); + } + } + return ret; + } + + /** + * detects loops + * + * @param entityList + * list of entities to detect loops with + */ + private void detectLoop(List entityList) { + if (entityList.size() < 2) { + return; + } + DeletedEntity start = entityList.get(0); + DeletedEntity end = entityList.get(entityList.size() - 1); + if (start.lastModified.equals(end.lastModified)) { + throw new RuntimeException("detected infinite harvesting loop: " + entityClass.getSimpleName() + + " at " + format(start.lastModified)); + } + + } + +} diff --git a/caom2-meta-sync/src/main/java/org/opencadc/caom2/metasync/Harvester.java b/caom2-meta-sync/src/main/java/org/opencadc/caom2/metasync/Harvester.java new file mode 100644 index 00000000..88c0570a --- /dev/null +++ b/caom2-meta-sync/src/main/java/org/opencadc/caom2/metasync/Harvester.java @@ -0,0 +1,166 @@ +/* + ************************************************************************ + ******************* CANADIAN ASTRONOMY DATA CENTRE ******************* + ************** CENTRE CANADIEN DE DONNÉES ASTRONOMIQUES ************** + * + * (c) 2023. (c) 2023. + * Government of Canada Gouvernement du Canada + * National Research Council Conseil national de recherches + * Ottawa, Canada, K1A 0R6 Ottawa, Canada, K1A 0R6 + * All rights reserved Tous droits réservés + * + * NRC disclaims any warranties, Le CNRC dénie toute garantie + * expressed, implied, or énoncée, implicite ou légale, + * statutory, of any kind with de quelque nature que ce + * respect to the software, soit, concernant le logiciel, + * including without limitation y compris sans restriction + * any warranty of merchantability toute garantie de valeur + * or fitness for a particular marchande ou de pertinence + * purpose. NRC shall not be pour un usage particulier. + * liable in any event for any Le CNRC ne pourra en aucun cas + * damages, whether direct or être tenu responsable de tout + * indirect, special or general, dommage, direct ou indirect, + * consequential or incidental, particulier ou général, + * arising from the use of the accessoire ou fortuit, résultant + * software. Neither the name de l'utilisation du logiciel. Ni + * of the National Research le nom du Conseil National de + * Council of Canada nor the Recherches du Canada ni les noms + * names of its contributors may de ses participants ne peuvent + * be used to endorse or promote être utilisés pour approuver ou + * products derived from this promouvoir les produits dérivés + * software without specific prior de ce logiciel sans autorisation + * written permission. préalable et particulière + * par écrit. + * + * This file is part of the Ce fichier fait partie du projet + * OpenCADC project. OpenCADC. + * + * OpenCADC is free software: OpenCADC est un logiciel libre ; + * you can redistribute it and/or vous pouvez le redistribuer ou le + * modify it under the terms of modifier suivant les termes de + * the GNU Affero General Public la “GNU Affero General Public + * License as published by the License” telle que publiée + * Free Software Foundation, par la Free Software Foundation + * either version 3 of the : soit la version 3 de cette + * License, or (at your option) licence, soit (à votre gré) + * any later version. toute version ultérieure. + * + * OpenCADC is distributed in the OpenCADC est distribué + * hope that it will be useful, dans l’espoir qu’il vous + * but WITHOUT ANY WARRANTY; sera utile, mais SANS AUCUNE + * without even the implied GARANTIE : sans même la garantie + * warranty of MERCHANTABILITY implicite de COMMERCIALISABILITÉ + * or FITNESS FOR A PARTICULAR ni d’ADÉQUATION À UN OBJECTIF + * PURPOSE. See the GNU Affero PARTICULIER. Consultez la Licence + * General Public License for Générale Publique GNU Affero + * more details. pour plus de détails. + * + * You should have received Vous devriez avoir reçu une + * a copy of the GNU Affero copie de la Licence Générale + * General Public License along Publique GNU Affero avec + * with OpenCADC. If not, see OpenCADC ; si ce n’est + * . pas le cas, consultez : + * . + * + * $Revision: 5 $ + * + ************************************************************************ + */ + +package org.opencadc.caom2.metasync; + +import ca.nrc.cadc.caom2.harvester.state.HarvestStateDAO; +import ca.nrc.cadc.caom2.harvester.state.PostgresqlHarvestStateDAO; +import ca.nrc.cadc.caom2.persistence.PostgreSQLGenerator; +import ca.nrc.cadc.caom2.persistence.SQLGenerator; +import ca.nrc.cadc.date.DateUtil; +import java.text.DateFormat; +import java.util.Date; +import java.util.HashMap; +import java.util.Map; +import javax.sql.DataSource; +import org.apache.log4j.Logger; + +/** + * + * @author pdowler + */ +public abstract class Harvester implements Runnable { + + private static final Logger log = Logger.getLogger(Harvester.class); + + public static final String POSTGRESQL = "postgresql"; + + protected String source; + protected String cname; + protected Class entityClass; + protected int batchSize; + protected boolean full; + protected Date minDate; + protected Date maxDate; + protected String collection; + protected HarvesterResource src; + protected HarvesterResource dest; + protected HarvestStateDAO harvestStateDAO; + + protected Harvester() { + } + + protected Harvester(Class entityClass, HarvesterResource src, HarvesterResource dest, String collection, + Integer batchSize, boolean full) { + this.entityClass = entityClass; + this.src = src; + this.dest = dest; + this.collection = collection; + this.batchSize = batchSize; + this.full = full; + } + + public void setMinDate(Date d) { + this.minDate = d; + } + + public void setMaxDate(Date d) { + this.maxDate = d; + } + + protected Map getConfigDAO(HarvesterResource harvestResource) { + Map ret = new HashMap<>(); + if (harvestResource.getJdbcUrl().contains(POSTGRESQL)) { + ret.put(SQLGenerator.class.getName(), PostgreSQLGenerator.class); + ret.put("disableHashJoin", Boolean.TRUE); + } else { + throw new IllegalArgumentException("unknown SQL dialect: " + harvestResource.getDatabaseServer()); + } + ret.put("server", harvestResource.getDatabaseServer()); + ret.put("database", harvestResource.getDatabase()); + ret.put("schema", harvestResource.getSchema()); + return ret; + } + + /** + * @param ds + * DataSource from the destination DAO class + * @param c + * class being persisted via the destination DAO class + */ + protected void initHarvestState(DataSource ds, Class c) { + this.cname = c.getSimpleName(); + + log.debug("creating HarvestState tracker: " + cname + " in " + dest.getDatabase() + "." + dest.getSchema()); + this.harvestStateDAO = new PostgresqlHarvestStateDAO(ds, dest.getDatabase(), dest.getSchema()); + + log.debug("creating HarvestSkip tracker: " + cname + " in " + dest.getDatabase() + "." + dest.getSchema()); + + this.source = src.getIdentifier(collection); + } + + DateFormat df = DateUtil.getDateFormat(DateUtil.ISO_DATE_FORMAT, DateUtil.UTC); + + protected String format(Date d) { + if (d == null) { + return "null"; + } + return df.format(d); + } +} diff --git a/caom2-meta-sync/src/main/java/org/opencadc/caom2/metasync/HarvesterResource.java b/caom2-meta-sync/src/main/java/org/opencadc/caom2/metasync/HarvesterResource.java new file mode 100644 index 00000000..d37bd97b --- /dev/null +++ b/caom2-meta-sync/src/main/java/org/opencadc/caom2/metasync/HarvesterResource.java @@ -0,0 +1,180 @@ +/* + ************************************************************************ + ******************* CANADIAN ASTRONOMY DATA CENTRE ******************* + ************** CENTRE CANADIEN DE DONNÉES ASTRONOMIQUES ************** + * + * (c) 2023. (c) 2023. + * Government of Canada Gouvernement du Canada + * National Research Council Conseil national de recherches + * Ottawa, Canada, K1A 0R6 Ottawa, Canada, K1A 0R6 + * All rights reserved Tous droits réservés + * + * NRC disclaims any warranties, Le CNRC dénie toute garantie + * expressed, implied, or énoncée, implicite ou légale, + * statutory, of any kind with de quelque nature que ce + * respect to the software, soit, concernant le logiciel, + * including without limitation y compris sans restriction + * any warranty of merchantability toute garantie de valeur + * or fitness for a particular marchande ou de pertinence + * purpose. NRC shall not be pour un usage particulier. + * liable in any event for any Le CNRC ne pourra en aucun cas + * damages, whether direct or être tenu responsable de tout + * indirect, special or general, dommage, direct ou indirect, + * consequential or incidental, particulier ou général, + * arising from the use of the accessoire ou fortuit, résultant + * software. Neither the name de l'utilisation du logiciel. Ni + * of the National Research le nom du Conseil National de + * Council of Canada nor the Recherches du Canada ni les noms + * names of its contributors may de ses participants ne peuvent + * be used to endorse or promote être utilisés pour approuver ou + * products derived from this promouvoir les produits dérivés + * software without specific prior de ce logiciel sans autorisation + * written permission. préalable et particulière + * par écrit. + * + * This file is part of the Ce fichier fait partie du projet + * OpenCADC project. OpenCADC. + * + * OpenCADC is free software: OpenCADC est un logiciel libre ; + * you can redistribute it and/or vous pouvez le redistribuer ou le + * modify it under the terms of modifier suivant les termes de + * the GNU Affero General Public la “GNU Affero General Public + * License as published by the License” telle que publiée + * Free Software Foundation, par la Free Software Foundation + * either version 3 of the : soit la version 3 de cette + * License, or (at your option) licence, soit (à votre gré) + * any later version. toute version ultérieure. + * + * OpenCADC is distributed in the OpenCADC est distribué + * hope that it will be useful, dans l’espoir qu’il vous + * but WITHOUT ANY WARRANTY; sera utile, mais SANS AUCUNE + * without even the implied GARANTIE : sans même la garantie + * warranty of MERCHANTABILITY implicite de COMMERCIALISABILITÉ + * or FITNESS FOR A PARTICULAR ni d’ADÉQUATION À UN OBJECTIF + * PURPOSE. See the GNU Affero PARTICULIER. Consultez la Licence + * General Public License for Générale Publique GNU Affero + * more details. pour plus de détails. + * + * You should have received Vous devriez avoir reçu une + * a copy of the GNU Affero copie de la Licence Générale + * General Public License along Publique GNU Affero avec + * with OpenCADC. If not, see OpenCADC ; si ce n’est + * . pas le cas, consultez : + * . + * + ************************************************************************ + */ + +package org.opencadc.caom2.metasync; + +import java.net.URI; +import org.apache.log4j.Logger; + +/** + * Encapsulate the information about a source or destination for harvesting + * instances. + * + * @author pdowler + */ +public class HarvesterResource { + + private static final Logger log = Logger.getLogger(HarvesterResource.class); + + private String databaseServer; + private String database; + private String schema; + private String username; + private String password; + private URI resourceID; + private String jdbcUrl; + private final int resourceType; + + public static final int SOURCE_DB = 0; + public static final int SOURCE_URI = 1; + public static final int SOURCE_UNKNOWN = -1; + public static final String POSTGRESQL_DRIVER = "org.postgresql.Driver"; + + /** + * Constructor for a JDBC url. + * + * @param jdbcUrl JDBC database url + * @param server database server + * @param database database name + * @param username database username + * @param password database password + * @param schema schema name + */ + public HarvesterResource(String jdbcUrl, String server, String database, String username, String password, + String schema) { + if (jdbcUrl == null || server == null || database == null || username == null || password == null + || schema == null) { + throw new IllegalArgumentException("args cannot be null"); + } + this.jdbcUrl = jdbcUrl; + this.databaseServer = server; + this.database = database; + this.username = username; + this.password = password; + this.schema = schema; + this.resourceType = SOURCE_DB; + } + + public HarvesterResource(URI resourceID) { + if (resourceID == null) { + throw new IllegalArgumentException("resourceID arg cannot be null"); + } + this.resourceID = resourceID; + this.resourceType = SOURCE_URI; + } + + public String getIdentifier(String collection) { + if (resourceID != null) { + return resourceID.toASCIIString() + "?" + collection; + } + return databaseServer + "." + database + "." + schema + "?" + collection; + } + + public String getJdbcUrl() { + return jdbcUrl; + } + + public URI getResourceID() { + return resourceID; + } + + public String getDatabaseServer() { + return databaseServer; + } + + public String getDatabase() { + return database; + } + + public String getSchema() { + return schema; + } + + public String getUsername() { + return username; + } + + public String getPassword() { + return password; + } + + public int getResourceType() { + return resourceType; + } + + @Override + public String toString() { + if (resourceType == SOURCE_URI) { + return this.resourceID.toASCIIString(); + } else if (resourceType == SOURCE_DB) { + return this.databaseServer; + } else { + return "UNKNOWN"; + } + } + +} diff --git a/caom2-meta-sync/src/main/java/org/opencadc/caom2/metasync/Main.java b/caom2-meta-sync/src/main/java/org/opencadc/caom2/metasync/Main.java new file mode 100644 index 00000000..8e2b7896 --- /dev/null +++ b/caom2-meta-sync/src/main/java/org/opencadc/caom2/metasync/Main.java @@ -0,0 +1,252 @@ +/* + ************************************************************************ + ******************* CANADIAN ASTRONOMY DATA CENTRE ******************* + ************** CENTRE CANADIEN DE DONNÉES ASTRONOMIQUES ************** + * + * (c) 2023. (c) 2023. + * Government of Canada Gouvernement du Canada + * National Research Council Conseil national de recherches + * Ottawa, Canada, K1A 0R6 Ottawa, Canada, K1A 0R6 + * All rights reserved Tous droits réservés + * + * NRC disclaims any warranties, Le CNRC dénie toute garantie + * expressed, implied, or énoncée, implicite ou légale, + * statutory, of any kind with de quelque nature que ce + * respect to the software, soit, concernant le logiciel, + * including without limitation y compris sans restriction + * any warranty of merchantability toute garantie de valeur + * or fitness for a particular marchande ou de pertinence + * purpose. NRC shall not be pour un usage particulier. + * liable in any event for any Le CNRC ne pourra en aucun cas + * damages, whether direct or être tenu responsable de tout + * indirect, special or general, dommage, direct ou indirect, + * consequential or incidental, particulier ou général, + * arising from the use of the accessoire ou fortuit, résultant + * software. Neither the name de l'utilisation du logiciel. Ni + * of the National Research le nom du Conseil National de + * Council of Canada nor the Recherches du Canada ni les noms + * names of its contributors may de ses participants ne peuvent + * be used to endorse or promote être utilisés pour approuver ou + * products derived from this promouvoir les produits dérivés + * software without specific prior de ce logiciel sans autorisation + * written permission. préalable et particulière + * par écrit. + * + * This file is part of the Ce fichier fait partie du projet + * OpenCADC project. OpenCADC. + * + * OpenCADC is free software: OpenCADC est un logiciel libre ; + * you can redistribute it and/or vous pouvez le redistribuer ou le + * modify it under the terms of modifier suivant les termes de + * the GNU Affero General Public la “GNU Affero General Public + * License as published by the License” telle que publiée + * Free Software Foundation, par la Free Software Foundation + * either version 3 of the : soit la version 3 de cette + * License, or (at your option) licence, soit (à votre gré) + * any later version. toute version ultérieure. + * + * OpenCADC is distributed in the OpenCADC est distribué + * hope that it will be useful, dans l’espoir qu’il vous + * but WITHOUT ANY WARRANTY; sera utile, mais SANS AUCUNE + * without even the implied GARANTIE : sans même la garantie + * warranty of MERCHANTABILITY implicite de COMMERCIALISABILITÉ + * or FITNESS FOR A PARTICULAR ni d’ADÉQUATION À UN OBJECTIF + * PURPOSE. See the GNU Affero PARTICULIER. Consultez la Licence + * General Public License for Générale Publique GNU Affero + * more details. pour plus de détails. + * + * You should have received Vous devriez avoir reçu une + * a copy of the GNU Affero copie de la Licence Générale + * General Public License along Publique GNU Affero avec + * with OpenCADC. If not, see OpenCADC ; si ce n’est + * . pas le cas, consultez : + * . + * + * $Revision: 5 $ + * + ************************************************************************ + */ + +package org.opencadc.caom2.metasync; + +import ca.nrc.cadc.auth.RunnableAction; +import ca.nrc.cadc.auth.SSLUtil; +import ca.nrc.cadc.util.Log4jInit; +import ca.nrc.cadc.util.MultiValuedProperties; +import ca.nrc.cadc.util.PropertiesReader; +import ca.nrc.cadc.util.StringUtil; +import java.io.File; +import java.net.URI; +import java.util.Arrays; +import java.util.List; +import java.util.Set; +import javax.security.auth.Subject; +import org.apache.log4j.Level; +import org.apache.log4j.Logger; + +/** + * Entry point for running the caom2-meta-sync process. + * + * @author jburke + */ +public class Main { + private static final Logger log = Logger.getLogger(Main.class); + + private static final String CONFIG_FILE_NAME = "caom2-meta-sync.properties"; + private static final String CERTIFICATE_FILE_LOCATION = System.getProperty("user.home") + "/.ssl/cadcproxy.pem"; + private static final String CONFIG_PREFIX = Main.class.getPackage().getName(); + private static final String LOGGING_CONFIG_KEY = CONFIG_PREFIX + ".logging"; + + private static final String REPO_SERVICE_CONFIG_KEY = CONFIG_PREFIX + ".repoService"; + private static final String COLLECTION_CONFIG_KEY = CONFIG_PREFIX + ".collection"; + private static final String MAX_IDLE_CONFIG_KEY = CONFIG_PREFIX + ".maxIdle"; + private static final String DB_URL_CONFIG_KEY = CONFIG_PREFIX + ".db.url"; + private static final String DB_SCHEMA_CONFIG_KEY = CONFIG_PREFIX + ".db.schema"; + private static final String DB_USERNAME_CONFIG_KEY = CONFIG_PREFIX + ".db.username"; + private static final String DB_PASSWORD_CONFIG_KEY = CONFIG_PREFIX + ".db.password"; + private static final String BASE_PUBLISHER_ID_CONFIG_KEY = CONFIG_PREFIX + ".basePublisherID"; + private static final String EXIT_WHEN_COMPLETE_CONFIG_KEY = CONFIG_PREFIX + ".exitWhenComplete"; + + private static final boolean DEFAULT_EXIT_WHEN_COMPLETE = false; + private static final int DEFAULT_BATCH_SIZE = 100; + + + // Used to verify configuration items. See the README for descriptions. + private static final String[] MANDATORY_PROPERTY_KEYS = { + LOGGING_CONFIG_KEY, REPO_SERVICE_CONFIG_KEY, COLLECTION_CONFIG_KEY, MAX_IDLE_CONFIG_KEY, + DB_SCHEMA_CONFIG_KEY, DB_USERNAME_CONFIG_KEY, DB_PASSWORD_CONFIG_KEY, DB_URL_CONFIG_KEY, + BASE_PUBLISHER_ID_CONFIG_KEY + }; + + public static void main(final String[] args) { + Log4jInit.setLevel("ca.nrc.cadc.caom2", Level.INFO); + Log4jInit.setLevel("org.opencadc.caom2", Level.INFO); + + try { + final PropertiesReader propertiesReader = new PropertiesReader(CONFIG_FILE_NAME); + final MultiValuedProperties props = propertiesReader.getAllProperties(); + if (props == null) { + log.fatal(String.format("Configuration file not found: %s\n", CONFIG_FILE_NAME)); + System.exit(2); + } + + final String[] missingKeys = Main.verifyConfiguration(props); + if (missingKeys.length > 0) { + log.fatal(String.format("Configuration file %s missing one or more values: %s.\n", CONFIG_FILE_NAME, + Arrays.toString(missingKeys))); + System.exit(2); + } + + final String configuredLogging = props.getFirstPropertyValue(LOGGING_CONFIG_KEY); + Level loggingLevel = Level.toLevel(configuredLogging.toUpperCase()); + Log4jInit.setLevel(CONFIG_PREFIX, loggingLevel); + Log4jInit.setLevel("ca.nrc.cadc.caom2", loggingLevel); + Log4jInit.setLevel("ca.nrc.cadc.db.version", loggingLevel); + if (loggingLevel.equals(Level.DEBUG)) { + Log4jInit.setLevel("ca.nrc.cadc.reg.client", loggingLevel); + } + + final String configuredSourceRepoService = props.getFirstPropertyValue(REPO_SERVICE_CONFIG_KEY); + final HarvesterResource sourceHarvestResource; + try { + sourceHarvestResource = new HarvesterResource(URI.create(configuredSourceRepoService)); + } catch (IllegalArgumentException e) { + throw new IllegalArgumentException( + String.format("%s - invalid repository service URI: %s reason: %s", REPO_SERVICE_CONFIG_KEY, + configuredSourceRepoService, e.getMessage())); + } + + final String configuredDestinationUrl = props.getFirstPropertyValue(DB_URL_CONFIG_KEY); + String [] destinationServerDatabase = parseServerDatabase(configuredDestinationUrl); + final String destinationServer = destinationServerDatabase[0]; + final String destinationDatabase = destinationServerDatabase[1]; + + final String configuredDestinationSchema = props.getFirstPropertyValue(DB_SCHEMA_CONFIG_KEY); + final String configuredDestinationUsername = props.getFirstPropertyValue(DB_USERNAME_CONFIG_KEY); + final String configuredDestinationPassword = props.getFirstPropertyValue(DB_PASSWORD_CONFIG_KEY); + + final HarvesterResource destinationHarvestResource = new HarvesterResource(configuredDestinationUrl, + destinationServer, destinationDatabase, configuredDestinationUsername, + configuredDestinationPassword, configuredDestinationSchema); + + final List configuredCollections = props.getProperty(COLLECTION_CONFIG_KEY); + if (configuredCollections.isEmpty()) { + throw new IllegalArgumentException( + String.format("%s must be configured with a minimum of one collection", COLLECTION_CONFIG_KEY)); + } + + String configuredBasePublisherIDUrl = props.getFirstPropertyValue(BASE_PUBLISHER_ID_CONFIG_KEY); + if (!StringUtil.hasText(configuredBasePublisherIDUrl)) { + throw new IllegalArgumentException(String.format("%s - has no value", BASE_PUBLISHER_ID_CONFIG_KEY)); + } + if (!configuredBasePublisherIDUrl.endsWith("/")) { + configuredBasePublisherIDUrl += "/"; + } + URI basePublisherID; + try { + basePublisherID = URI.create(configuredBasePublisherIDUrl); + } catch (IllegalArgumentException e) { + throw new IllegalArgumentException( + String.format("%s - invalid URI: %s because: %s", BASE_PUBLISHER_ID_CONFIG_KEY, + configuredBasePublisherIDUrl, e.getMessage())); + } + if (!"ivo".equals(basePublisherID.getScheme()) || !StringUtil.hasText(basePublisherID.getAuthority())) { + throw new IllegalArgumentException( + String.format("%s - invalid basePublisherID: %s expected: ivo:// or ivo:///", + BASE_PUBLISHER_ID_CONFIG_KEY, configuredBasePublisherIDUrl)); + } + + final boolean exitWhenComplete; + final String configuredExitWhenComplete = props.getFirstPropertyValue(EXIT_WHEN_COMPLETE_CONFIG_KEY); + if (configuredExitWhenComplete == null) { + exitWhenComplete = DEFAULT_EXIT_WHEN_COMPLETE; + } else { + exitWhenComplete = Boolean.parseBoolean(configuredExitWhenComplete); + } + + final String configuredMaxSleep = props.getFirstPropertyValue(MAX_IDLE_CONFIG_KEY); + final long maxSleep = Long.parseLong(configuredMaxSleep); + + // full=false, skip=false: incremental harvest + final boolean full = false; + final boolean skip = false; + final boolean noChecksum = false; + CaomHarvester harvester = new CaomHarvester(sourceHarvestResource, destinationHarvestResource, + configuredCollections, basePublisherID, DEFAULT_BATCH_SIZE, DEFAULT_BATCH_SIZE / 10, + full, skip, noChecksum, exitWhenComplete, maxSleep); + + final Subject subject = SSLUtil.createSubject(new File(CERTIFICATE_FILE_LOCATION)); + Subject.doAs(subject, new RunnableAction(harvester)); + } catch (Throwable unexpected) { + log.fatal("Unexpected failure", unexpected); + System.exit(-1); + } + } + + /** + * Verify all mandatory properties. + * @param properties The properties to check the mandatory keys against. + * @return An array of missing String keys, or empty array. Never null. + */ + private static String[] verifyConfiguration(final MultiValuedProperties properties) { + final Set keySet = properties.keySet(); + return Arrays.stream(MANDATORY_PROPERTY_KEYS).filter(k -> !keySet.contains(k)).toArray(String[]::new); + } + + private static String[] parseServerDatabase(final String dbUrl) { + try { + String[] parts = dbUrl.split("/+"); + String server = parts[1].split(":")[0]; + String database = parts[2]; + return new String[] {server, database}; + } catch (Exception e) { + throw new IllegalArgumentException(String.format("Unable to parse server/database from url %s because %s", + dbUrl, e.getMessage())); + } + } + + private Main() { + } + +} + diff --git a/caom2-meta-sync/src/main/java/org/opencadc/caom2/metasync/MismatchedChecksumException.java b/caom2-meta-sync/src/main/java/org/opencadc/caom2/metasync/MismatchedChecksumException.java new file mode 100644 index 00000000..ec764fa8 --- /dev/null +++ b/caom2-meta-sync/src/main/java/org/opencadc/caom2/metasync/MismatchedChecksumException.java @@ -0,0 +1,83 @@ +/* + ************************************************************************ + ******************* CANADIAN ASTRONOMY DATA CENTRE ******************* + ************** CENTRE CANADIEN DE DONNÉES ASTRONOMIQUES ************** + * + * (c) 2023. (c) 2023. + * Government of Canada Gouvernement du Canada + * National Research Council Conseil national de recherches + * Ottawa, Canada, K1A 0R6 Ottawa, Canada, K1A 0R6 + * All rights reserved Tous droits réservés + * + * NRC disclaims any warranties, Le CNRC dénie toute garantie + * expressed, implied, or énoncée, implicite ou légale, + * statutory, of any kind with de quelque nature que ce + * respect to the software, soit, concernant le logiciel, + * including without limitation y compris sans restriction + * any warranty of merchantability toute garantie de valeur + * or fitness for a particular marchande ou de pertinence + * purpose. NRC shall not be pour un usage particulier. + * liable in any event for any Le CNRC ne pourra en aucun cas + * damages, whether direct or être tenu responsable de tout + * indirect, special or general, dommage, direct ou indirect, + * consequential or incidental, particulier ou général, + * arising from the use of the accessoire ou fortuit, résultant + * software. Neither the name de l'utilisation du logiciel. Ni + * of the National Research le nom du Conseil National de + * Council of Canada nor the Recherches du Canada ni les noms + * names of its contributors may de ses participants ne peuvent + * be used to endorse or promote être utilisés pour approuver ou + * products derived from this promouvoir les produits dérivés + * software without specific prior de ce logiciel sans autorisation + * written permission. préalable et particulière + * par écrit. + * + * This file is part of the Ce fichier fait partie du projet + * OpenCADC project. OpenCADC. + * + * OpenCADC is free software: OpenCADC est un logiciel libre ; + * you can redistribute it and/or vous pouvez le redistribuer ou le + * modify it under the terms of modifier suivant les termes de + * the GNU Affero General Public la “GNU Affero General Public + * License as published by the License” telle que publiée + * Free Software Foundation, par la Free Software Foundation + * either version 3 of the : soit la version 3 de cette + * License, or (at your option) licence, soit (à votre gré) + * any later version. toute version ultérieure. + * + * OpenCADC is distributed in the OpenCADC est distribué + * hope that it will be useful, dans l’espoir qu’il vous + * but WITHOUT ANY WARRANTY; sera utile, mais SANS AUCUNE + * without even the implied GARANTIE : sans même la garantie + * warranty of MERCHANTABILITY implicite de COMMERCIALISABILITÉ + * or FITNESS FOR A PARTICULAR ni d’ADÉQUATION À UN OBJECTIF + * PURPOSE. See the GNU Affero PARTICULIER. Consultez la Licence + * General Public License for Générale Publique GNU Affero + * more details. pour plus de détails. + * + * You should have received Vous devriez avoir reçu une + * a copy of the GNU Affero copie de la Licence Générale + * General Public License along Publique GNU Affero avec + * with OpenCADC. If not, see OpenCADC ; si ce n’est + * . pas le cas, consultez : + * . + * + * $Revision: 5 $ + * + ************************************************************************ + */ + +package org.opencadc.caom2.metasync; + +public class MismatchedChecksumException extends Exception { + + public MismatchedChecksumException(String string) { + super(string); + } + + /** + * + */ + private static final long serialVersionUID = 1L; + +} diff --git a/caom2-meta-sync/src/main/java/org/opencadc/caom2/metasync/ObservationHarvester.java b/caom2-meta-sync/src/main/java/org/opencadc/caom2/metasync/ObservationHarvester.java new file mode 100644 index 00000000..5f2a2572 --- /dev/null +++ b/caom2-meta-sync/src/main/java/org/opencadc/caom2/metasync/ObservationHarvester.java @@ -0,0 +1,836 @@ +/* + ************************************************************************ + ******************* CANADIAN ASTRONOMY DATA CENTRE ******************* + ************** CENTRE CANADIEN DE DONNÉES ASTRONOMIQUES ************** + * + * (c) 2023. (c) 2023. + * Government of Canada Gouvernement du Canada + * National Research Council Conseil national de recherches + * Ottawa, Canada, K1A 0R6 Ottawa, Canada, K1A 0R6 + * All rights reserved Tous droits réservés + * + * NRC disclaims any warranties, Le CNRC dénie toute garantie + * expressed, implied, or énoncée, implicite ou légale, + * statutory, of any kind with de quelque nature que ce + * respect to the software, soit, concernant le logiciel, + * including without limitation y compris sans restriction + * any warranty of merchantability toute garantie de valeur + * or fitness for a particular marchande ou de pertinence + * purpose. NRC shall not be pour un usage particulier. + * liable in any event for any Le CNRC ne pourra en aucun cas + * damages, whether direct or être tenu responsable de tout + * indirect, special or general, dommage, direct ou indirect, + * consequential or incidental, particulier ou général, + * arising from the use of the accessoire ou fortuit, résultant + * software. Neither the name de l'utilisation du logiciel. Ni + * of the National Research le nom du Conseil National de + * Council of Canada nor the Recherches du Canada ni les noms + * names of its contributors may de ses participants ne peuvent + * be used to endorse or promote être utilisés pour approuver ou + * products derived from this promouvoir les produits dérivés + * software without specific prior de ce logiciel sans autorisation + * written permission. préalable et particulière + * par écrit. + * + * This file is part of the Ce fichier fait partie du projet + * OpenCADC project. OpenCADC. + * + * OpenCADC is free software: OpenCADC est un logiciel libre ; + * you can redistribute it and/or vous pouvez le redistribuer ou le + * modify it under the terms of modifier suivant les termes de + * the GNU Affero General Public la “GNU Affero General Public + * License as published by the License” telle que publiée + * Free Software Foundation, par la Free Software Foundation + * either version 3 of the : soit la version 3 de cette + * License, or (at your option) licence, soit (à votre gré) + * any later version. toute version ultérieure. + * + * OpenCADC is distributed in the OpenCADC est distribué + * hope that it will be useful, dans l’espoir qu’il vous + * but WITHOUT ANY WARRANTY; sera utile, mais SANS AUCUNE + * without even the implied GARANTIE : sans même la garantie + * warranty of MERCHANTABILITY implicite de COMMERCIALISABILITÉ + * or FITNESS FOR A PARTICULAR ni d’ADÉQUATION À UN OBJECTIF + * PURPOSE. See the GNU Affero PARTICULIER. Consultez la Licence + * General Public License for Générale Publique GNU Affero + * more details. pour plus de détails. + * + * You should have received Vous devriez avoir reçu une + * a copy of the GNU Affero copie de la Licence Générale + * General Public License along Publique GNU Affero avec + * with OpenCADC. If not, see OpenCADC ; si ce n’est + * . pas le cas, consultez : + * . + * + * $Revision: 5 $ + * + ************************************************************************ + */ + +package org.opencadc.caom2.metasync; + +import ca.nrc.cadc.caom2.Artifact; +import ca.nrc.cadc.caom2.Observation; +import ca.nrc.cadc.caom2.ObservationResponse; +import ca.nrc.cadc.caom2.ObservationState; +import ca.nrc.cadc.caom2.ObservationURI; +import ca.nrc.cadc.caom2.Plane; +import ca.nrc.cadc.caom2.ac.ReadAccessGenerator; +import ca.nrc.cadc.caom2.compute.CaomWCSValidator; +import ca.nrc.cadc.caom2.compute.ComputeUtil; +import ca.nrc.cadc.caom2.harvester.state.HarvestSkipURI; +import ca.nrc.cadc.caom2.harvester.state.HarvestSkipURIDAO; +import ca.nrc.cadc.caom2.harvester.state.HarvestState; +import ca.nrc.cadc.caom2.persistence.ObservationDAO; +import ca.nrc.cadc.caom2.repo.client.RepoClient; +import ca.nrc.cadc.caom2.util.CaomValidator; +import ca.nrc.cadc.db.ConnectionConfig; +import ca.nrc.cadc.db.DBUtil; +import ca.nrc.cadc.net.ResourceNotFoundException; +import ca.nrc.cadc.net.TransientException; +import java.io.File; +import java.io.FileReader; +import java.io.IOException; +import java.net.URI; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.ListIterator; +import java.util.Map; +import java.util.Properties; +import java.util.UUID; +import java.util.concurrent.ExecutionException; +import javax.naming.NamingException; +import javax.sql.DataSource; +import org.apache.log4j.Logger; +import org.springframework.dao.DataAccessResourceFailureException; +import org.springframework.jdbc.BadSqlGrammarException; + +/** + * + * @author pdowler + */ +public class ObservationHarvester extends Harvester { + + private static final Logger log = Logger.getLogger(ObservationHarvester.class); + + private final URI basePublisherID; + private final boolean nochecksum; + private RepoClient srcObservationService; + private ObservationDAO srcObservationDAO; + private ObservationDAO destObservationDAO; + private HarvestSkipURIDAO harvestSkipDAO; + private boolean skipped; + private boolean computePlaneMetadata; + private ReadAccessGenerator acGenerator; + private boolean ready = false; + private int ingested = 0; + + public ObservationHarvester(HarvesterResource src, HarvesterResource dest, String collection, URI basePublisherID, + Integer batchSize, int nthreads, boolean full, boolean nochecksum) { + super(Observation.class, src, dest, collection, batchSize, full); + this.basePublisherID = basePublisherID; + this.nochecksum = nochecksum; + this.computePlaneMetadata = false; + init(nthreads); + } + + public void setSkipped(boolean skipped) { + this.skipped = skipped; + } + + public boolean getComputePlaneMetadata() { + return this.computePlaneMetadata; + } + + public void setComputePlaneMetadata(boolean computePlaneMetadata) { + this.computePlaneMetadata = computePlaneMetadata; + } + + public void setGenerateReadAccessTuples(File config) { + try { + Properties props = new Properties(); + props.load(new FileReader(config)); + String line = props.getProperty(collection); + if (line == null) { + throw new IllegalArgumentException("CONFIG: collection not found: " + collection); + } + + Map groupConfig = new HashMap<>(); + String[] parts = line.split("[ \t]+"); // one or more spaces and tabs + for (int i = 0; i < parts.length; i++) { + String option = parts[i]; // key=value pair + String[] kv = option.split("="); + if (kv.length != 2) { + throw new IllegalArgumentException("invalid key=value pair: " + option); + } + if (ReadAccessGenerator.PROPOSAL_GROUP_KEY.equals(kv[0])) { + boolean proposalGroup = "true".equals(kv[1]); + if (proposalGroup) { + groupConfig.put(ReadAccessGenerator.PROPOSAL_GROUP_KEY, proposalGroup); + } + } else if (ReadAccessGenerator.OPERATOR_GROUP_KEY.equals(kv[0])) { + String og = kv[1]; + if (og != null) { + URI ouri = new URI(og); + groupConfig.put(ReadAccessGenerator.OPERATOR_GROUP_KEY, ouri); + } + } else if (ReadAccessGenerator.STAFF_GROUP_KEY.equals(kv[0])) { + String sg = kv[1]; + if (sg != null) { + URI suri = new URI(sg); + groupConfig.put(ReadAccessGenerator.STAFF_GROUP_KEY, suri); + } + } + } + for (Map.Entry me : groupConfig.entrySet()) { + log.debug("generate config for " + collection + ": " + me.getKey() + " = " + me.getValue()); + } + this.acGenerator = new ReadAccessGenerator(collection, groupConfig); + } catch (IOException ex) { + throw new RuntimeException("failed to read config from " + config, ex); + } catch (Exception ex) { + throw new RuntimeException("CONFIG: invalid config " + config, ex); + } + } + + public int getIngested() { + return this.ingested; + } + + private void init(int nthreads) { + if (src.getResourceType() == HarvesterResource.SOURCE_DB) { + Map srcConfig = getConfigDAO(src); + ConnectionConfig srcConnectionConfig = new ConnectionConfig(null, null, + src.getUsername(), src.getPassword(), HarvesterResource.POSTGRESQL_DRIVER, src.getJdbcUrl()); + final String srcDS = "jdbc/obsHarvestSrc"; + try { + DBUtil.createJNDIDataSource(srcDS, srcConnectionConfig); + } catch (NamingException e) { + throw new IllegalStateException(String.format("Error creating source JNDI datasource for %s reason: %s", + src, e.getMessage())); + } + srcConfig.put("jndiDataSourceName", srcDS); + this.srcObservationDAO = new ObservationDAO(); + srcObservationDAO.setConfig(srcConfig); + ready = true; + } else if (src.getResourceType() == HarvesterResource.SOURCE_URI) { + this.srcObservationService = new RepoClient(src.getResourceID(), nthreads); + } else { + throw new IllegalStateException("BUG: unexpected HarvestResource resource type: " + src); + } + + // for now, dest is always a database + Map destConfig = getConfigDAO(dest); + ConnectionConfig destConnectionConfig = new ConnectionConfig(null, null, + dest.getUsername(), dest.getPassword(), HarvesterResource.POSTGRESQL_DRIVER, dest.getJdbcUrl()); + final String destDS = "jdbc/obsHarvestDest"; + try { + DBUtil.createJNDIDataSource(destDS, destConnectionConfig); + } catch (NamingException e) { + throw new IllegalStateException(String.format("Error creating destination JNDI datasource for %s reason: %s", + dest, e.getMessage())); + } + destConfig.put("jndiDataSourceName", destDS); + destConfig.put("basePublisherID", basePublisherID.toASCIIString()); + this.destObservationDAO = new ObservationDAO(); + destObservationDAO.setConfig(destConfig); + if (src.getIdentifier(collection).equals(dest.getIdentifier(collection))) { + log.info("source = destination = " + dest.getIdentifier(collection) + ": setting origin=true"); + destObservationDAO.setOrigin(true); // reproc in a single db should update timestamps + } else { + destObservationDAO.setOrigin(false); // copy as-is + } + initHarvestState(destObservationDAO.getDataSource(), Observation.class); + + if (srcObservationService != null) { + if (srcObservationService.isObsAvailable()) { + ready = true; + } else { + log.error("Not available obs endpoint in " + srcObservationService.toString()); + } + } + } + + private String format(UUID id) { + if (id == null) { + return "null"; + } + return id.toString(); + } + + @Override + public void run() { + log.info("START: " + Observation.class.getSimpleName()); + + boolean go = true; + while (go) { + Progress num = doit(); + + ingested += num.ingested; + if (num.found > 0) { + log.debug("***************** finished batch: " + num + " *******************"); + } + + if (num.abort) { + log.error("batched aborted"); + } + go = (num.found > 0 && !num.abort && !num.done); + if (num.found < batchSize / 2) { + go = false; + } + full = false; // do not start at beginning again + } + + log.info("DONE: " + entityClass.getSimpleName() + "\n"); + } + + private static class Progress { + + boolean done = false; + boolean abort = false; + int found = 0; + int ingested = 0; + int failed = 0; + int handled = 0; + + @Override + public String toString() { + return found + " ingested: " + ingested + " failed: " + failed; + } + } + + private Date startDate; + private Date endDate; + private boolean firstIteration = true; + + private Progress doit() { + + Progress ret = new Progress(); + + if (!ready) { + log.error("Observation Harvester not ready"); + ret.abort = true; + return ret; + } + long t = System.currentTimeMillis(); + long timeState = -1; + long timeQuery = -1; + long timeTransaction = -1; + int expectedNum = batchSize; + + try { + System.gc(); // hint + t = System.currentTimeMillis(); + + HarvestState state = null; + if (!skipped) { + state = harvestStateDAO.get(source, Observation.class.getSimpleName()); + startDate = state.curLastModified; + log.debug("state " + state); + } + + timeState = System.currentTimeMillis() - t; + t = System.currentTimeMillis(); + + if (firstIteration) { + if (full) { + startDate = null; + } else if (super.minDate != null) { + startDate = super.minDate; + } + endDate = super.maxDate; + if (!skipped) { + // harvest up to a little in the past because the head of + // the sequence may be volatile + long fiveMinAgo = System.currentTimeMillis() - 5 * 60000L; + if (endDate == null) { + endDate = new Date(fiveMinAgo); + } else { + endDate = new Date(Math.min(fiveMinAgo, endDate.getTime())); + } + } + } + firstIteration = false; + + List> entityList; + if (skipped) { + entityList = getSkipped(startDate); + } else { + log.info("harvest window: " + format(startDate) + " :: " + format(endDate) + " [" + batchSize + "]"); + List obsList; + if (srcObservationDAO != null) { + obsList = srcObservationDAO.getList(collection, startDate, endDate, batchSize + 1); + } else { + obsList = srcObservationService.getList(collection, startDate, endDate, batchSize + 1); + } + entityList = wrap(obsList); + } + + // avoid re-processing the last successful one stored in + // HarvestState (normal case because query: >= startDate) + if (!entityList.isEmpty() && !skipped) { + ListIterator> iter = entityList.listIterator(); + Observation curBatchLeader = iter.next().entity.observation; + if (curBatchLeader != null) { + log.debug("currentBatch: " + curBatchLeader.getURI() + " " + format(curBatchLeader.getMaxLastModified())); + log.debug("harvestState: " + format(state.curID) + " " + format(state.curLastModified)); + if (curBatchLeader.getID().equals(state.curID) && curBatchLeader.getMaxLastModified().equals(state.curLastModified)) { + iter.remove(); + expectedNum--; + } + } + } + + ret.found = entityList.size(); + log.debug("found: " + entityList.size()); + + timeQuery = System.currentTimeMillis() - t; + t = System.currentTimeMillis(); + + ListIterator> iter1 = entityList.listIterator(); + // int i = 0; + while (iter1.hasNext()) { + SkippedWrapperURI ow = iter1.next(); + Observation o = null; + if (ow.entity != null) { + o = ow.entity.observation; + } + HarvestSkipURI hs = ow.skip; + iter1.remove(); // allow garbage collection during loop + + String skipMsg = null; + + if (destObservationDAO.getTransactionManager().isOpen()) { + throw new RuntimeException("BUG: found open transaction at start of next observation"); + } + log.debug("starting transaction"); + destObservationDAO.getTransactionManager().startTransaction(); + boolean ok = false; + log.debug("skipped=" + skipped + + " o=" + o + + " ow.entity=" + ow.entity + + " ow.entity.error=" + (ow.entity != null || ow.entity.error != null)); + try { + // o could be null in skip mode cleanup + if (o != null) { + String treeSize = computeTreeSize(o); + log.info("put: " + o.getClass().getSimpleName() + " " + o.getURI() + " " + format(o.getMaxLastModified()) + " " + treeSize); + } else if (hs != null) { + log.info("put (retry error): " + hs.getName() + " " + hs.getSkipID() + " " + format(hs.getLastModified())); + } else { + log.info("put (error): Observation " + ow.entity.observationState.getURI() + " " + format(ow.entity.observationState.maxLastModified)); + } + + if (skipped) { + startDate = hs.getTryAfter(); + } + + if (o != null) { + if (state != null) { + state.curLastModified = o.getMaxLastModified(); + state.curID = o.getID(); + } + + // try to avoid DataIntegrityViolationException due + // to missed deletion followed by insert with new + // UUID + ObservationState cur = destObservationDAO.getState(o.getURI()); + if (cur != null && !cur.getID().equals(o.getID())) { + // missed harvesting a deletion: trust source + log.info("delete: " + o.getClass().getSimpleName() + " " + cur.getURI() + " " + cur.getID() + + " (ObservationURI conflict avoided)"); + destObservationDAO.delete(cur.getID()); + } + + // verify we retrieved the observation intact + if (!nochecksum) { + validateChecksum(o); + } + + // extended content verification + CaomValidator.validate(o); + + for (Plane p : o.getPlanes()) { + for (Artifact a : p.getArtifacts()) { + CaomWCSValidator.validate(a); + } + } + + // optionally augment the observation + if (computePlaneMetadata) { + log.debug("computePlaneMetadata: " + o.getURI()); + for (Plane p : o.getPlanes()) { + ComputeUtil.computeTransientState(o, p); + } + } + + if (acGenerator != null) { + log.debug("generateReadAccessTuples: " + o.getURI()); + acGenerator.generateTuples(o); + } + + // everything is OK + destObservationDAO.put(o); + + if (!skipped) { + harvestStateDAO.put(state); + } + + if (hs == null) { + // normal harvest mode: try to cleanup skip + // records immediately + hs = harvestSkipDAO.get(source, cname, o.getURI().getURI()); + } + + if (hs != null) { + log.info("delete: " + hs + " " + format(hs.getLastModified())); + harvestSkipDAO.delete(hs); + } + } else if (skipped) { + // o == null + if (srcObservationDAO != null || ow.entity.error instanceof ResourceNotFoundException) { + // observation not obtainable from source == missed deletion + ObservationURI uri = new ObservationURI(hs.getSkipID()); + log.info("delete: " + uri); + destObservationDAO.delete(uri); + log.info("delete: " + hs + " " + format(hs.getLastModified())); + harvestSkipDAO.delete(hs); + } else { + // defer to the main catch for error handling + throw new HarvestReadException(ow.entity.error); + } + } else if (ow.entity.error != null) { + // o == null when harvesting from service: try to make progress on failures + if (state != null && ow.entity.observationState.maxLastModified != null) { + state.curLastModified = ow.entity.observationState.maxLastModified; + state.curID = null; // unknown + } + if (srcObservationDAO != null || ow.entity.error instanceof ResourceNotFoundException) { + ObservationURI uri = new ObservationURI(hs.getSkipID()); + log.info("delete: " + uri); + destObservationDAO.delete(uri); + if (hs != null) { + log.info("delete: " + hs + " " + format(hs.getLastModified())); + harvestSkipDAO.delete(hs); + } + } else { + throw new HarvestReadException(ow.entity.error); + } + } + + log.debug("committing transaction"); + destObservationDAO.getTransactionManager().commitTransaction(); + log.debug("commit: OK"); + + ok = true; + ret.ingested++; + } catch (Throwable oops) { + log.debug("exception during harvest", oops); + skipMsg = null; + String str = oops.toString(); + if (oops instanceof HarvestReadException) { + // unwrap HarvestReadException from above + oops = oops.getCause(); + // unwrap intervening RuntimeException(s) + while (oops.getCause() != null && oops instanceof RuntimeException) { + oops = oops.getCause(); + } + log.error("HARVEST PROBLEM - failed to read observation: " + ow.entity.observationState.getURI() + " - " + oops.getMessage()); + ret.handled++; + } else if (oops instanceof IllegalStateException) { + if (oops.getMessage().contains("XML failed schema validation")) { + log.error("CONTENT PROBLEM - XML failed schema validation: " + oops.getMessage()); + ret.handled++; + } else if (oops.getMessage().contains("failed to read")) { + log.error("CONTENT PROBLEM - " + oops.getMessage(), oops.getCause()); + ret.handled++; + } + } else if (oops instanceof IllegalArgumentException) { + log.error("CONTENT PROBLEM - invalid observation: " + ow.entity.observationState.getURI() + " - " + oops.getMessage()); + if (oops.getCause() != null) { + log.error("cause: " + oops.getCause()); + } + ret.handled++; + } else if (oops instanceof MismatchedChecksumException) { + log.error("CONTENT PROBLEM - mismatching checksums: " + ow.entity.observationState.getURI()); + ret.handled++; + } else if (str.contains("duplicate key value violates unique constraint \"i_observationuri\"")) { + log.error("CONTENT PROBLEM - duplicate observation: " + ow.entity.observationState.getURI()); + ret.handled++; + } else if (oops instanceof TransientException) { + log.error("CONTENT PROBLEM - " + oops.getMessage()); + ret.handled++; + } else if (oops instanceof Error) { + log.error("FATAL - probably installation or environment", oops); + ret.abort = true; + } else if (oops instanceof NullPointerException) { + log.error("BUG", oops); + ret.abort = true; + } else if (oops instanceof BadSqlGrammarException) { + log.error("BUG", oops); + BadSqlGrammarException bad = (BadSqlGrammarException) oops; + SQLException sex1 = bad.getSQLException(); + if (sex1 != null) { + log.error("CAUSE", sex1); + SQLException sex2 = sex1.getNextException(); + log.error("NEXT CAUSE", sex2); + } + ret.abort = true; + } else if (oops instanceof DataAccessResourceFailureException) { + log.error("SEVERE PROBLEM - probably out of space in database", oops); + ret.abort = true; + } else if (str.contains("spherepoly_from_array")) { + log.error("CONTENT PROBLEM - failed to persist: " + ow.entity.observationState.getURI() + " - " + oops.getMessage()); + oops = new IllegalArgumentException("invalid polygon (spoly): " + oops.getMessage(), oops); + ret.handled++; + } else { + log.error("unexpected exception", oops); + } + // message for HarvestSkipURI record + skipMsg = oops.getMessage(); + } finally { + if (!ok) { + destObservationDAO.getTransactionManager().rollbackTransaction(); + log.debug("rollback: OK"); + timeTransaction += System.currentTimeMillis() - t; + + try { + log.debug("starting HarvestSkipURI transaction"); + HarvestSkipURI skip = null; + if (o != null) { + skip = harvestSkipDAO.get(source, cname, o.getURI().getURI()); + } else { + skip = harvestSkipDAO.get(source, cname, ow.entity.observationState.getURI().getURI()); + } + Date tryAfter = ow.entity.observationState.maxLastModified; + if (o != null) { + tryAfter = o.getMaxLastModified(); + } + if (skip == null) { + if (o != null) { + skip = new HarvestSkipURI(source, cname, o.getURI().getURI(), tryAfter, skipMsg); + } else { + skip = new HarvestSkipURI(source, cname, ow.entity.observationState.getURI().getURI(), tryAfter, skipMsg); + } + } else { + skip.errorMessage = skipMsg; + skip.setTryAfter(tryAfter); + } + + log.debug("starting HarvestSkipURI transaction"); + destObservationDAO.getTransactionManager().startTransaction(); + + if (!skipped) { + // track the harvest state progress + harvestStateDAO.put(state); + } + + // track the fail + log.info("put: " + skip); + harvestSkipDAO.put(skip); + + if (!src.getIdentifier(collection).equals(dest.getIdentifier(collection))) { + // delete previous version of observation (if any) + log.info("delete: " + ow.entity.observationState.getURI()); + destObservationDAO.delete(ow.entity.observationState.getURI()); + } + + log.debug("committing HarvestSkipURI transaction"); + destObservationDAO.getTransactionManager().commitTransaction(); + log.debug("commit HarvestSkipURI: OK"); + } catch (Throwable oops) { + log.warn("failed to insert HarvestSkipURI", oops); + destObservationDAO.getTransactionManager().rollbackTransaction(); + log.debug("rollback HarvestSkipURI: OK"); + ret.abort = true; + } + ret.failed++; + } + } + if (ret.abort) { + return ret; + } + } + if (ret.found < expectedNum) { + ret.done = true; + } + } catch (InterruptedException | ExecutionException e) { + log.error("SEVERE PROBLEM - ThreadPool harvesting Observations failed: " + e.getMessage()); + ret.abort = true; + } finally { + timeTransaction = System.currentTimeMillis() - t; + log.debug("time to get HarvestState: " + timeState + "ms"); + log.debug("time to run ObservationListQuery: " + timeQuery + "ms"); + log.debug("time to run transactions: " + timeTransaction + "ms"); + } + return ret; + } + + private static class HarvestReadException extends Exception { + + public HarvestReadException(Exception cause) { + super(cause); + } + } + + private void validateChecksum(Observation o) throws MismatchedChecksumException { + if (o.getAccMetaChecksum() == null) { + return; // no check + } + try { + URI calculatedChecksum = o.computeAccMetaChecksum(MessageDigest.getInstance("MD5")); + + log.debug("validateChecksum: " + o.getURI() + " -- " + o.getAccMetaChecksum() + " vs " + calculatedChecksum); + if (!calculatedChecksum.equals(o.getAccMetaChecksum())) { + throw new MismatchedChecksumException("Observation.accMetaChecksum mismatch"); + } + } catch (NoSuchAlgorithmException e) { + throw new RuntimeException("MD5 digest algorithm not available"); + } + } + + private String computeTreeSize(Observation o) { + StringBuilder sb = new StringBuilder(); + sb.append("["); + int numA = 0; + int numP = 0; + for (Plane p : o.getPlanes()) { + numA += p.getArtifacts().size(); + for (Artifact a : p.getArtifacts()) { + numP += a.getParts().size(); + } + } + sb.append(o.getPlanes().size()).append(","); + sb.append(numA).append(","); + sb.append(numP).append("]"); + return sb.toString(); + } + + private void detectLoop(List> entityList) { + if (entityList.size() < 2) { + return; + } + SkippedWrapperURI start = entityList.get(0); + SkippedWrapperURI end = entityList.get(entityList.size() - 1); + if (skipped) { + if (start.skip.getLastModified().equals(end.skip.getLastModified())) { + throw new RuntimeException("detected infinite harvesting loop: " + HarvestSkipURI.class.getSimpleName() + " at " + + format(start.skip.getLastModified())); + } + return; + } + Date d1 = null; + Date d2 = null; + if (start.entity.observation != null) { + d1 = start.entity.observation.getMaxLastModified(); + } else if (start.entity.observationState != null) { + d1 = start.entity.observationState.maxLastModified; + } + if (end.entity.observation != null) { + d2 = end.entity.observation.getMaxLastModified(); + } else if (end.entity.observationState != null) { + d2 = end.entity.observationState.maxLastModified; + } + if (d1 == null || d2 == null) { + throw new RuntimeException("detectLoop: FAIL -- cannotr comapre timestamps " + d1 + " vs " + d2); + } + + if (d1.equals(d2)) { + throw new RuntimeException("detected infinite harvesting loop: " + entityClass.getSimpleName() + " at " + + format(start.entity.observation.getMaxLastModified())); + } + } + + private List> wrap(List obsList) { + List> ret = new ArrayList>(obsList.size()); + for (ObservationResponse wr : obsList) { + ret.add(new SkippedWrapperURI(wr, null)); + } + return ret; + } + + private List> getSkipped(Date start) throws ExecutionException, InterruptedException { + log.info("harvest window (skip): " + format(start) + " [" + batchSize + "]" + " source = " + source + " cname = " + cname); + List skip = harvestSkipDAO.get(source, cname, start, null, batchSize); + + List> ret = new ArrayList>(skip.size()); + + if (srcObservationDAO != null) { + for (HarvestSkipURI hs : skip) { + log.debug("getSkipped: " + hs.getSkipID()); + ObservationURI ouri = new ObservationURI(hs.getSkipID()); + ObservationResponse wr = srcObservationDAO.getObservationResponse(ouri); + log.debug("response: " + wr); + ret.add(new SkippedWrapperURI(wr, hs)); + } + } else { + // srcObservationService + List listUris = new ArrayList<>(); + for (HarvestSkipURI hs : skip) { + log.debug("getSkipped: " + hs.getSkipID()); + listUris.add(new ObservationURI(hs.getSkipID())); + } + List listResponses = srcObservationService.get(listUris); + log.warn("getSkipped: " + skip.size() + " HarvestSkipURI -> " + listResponses.size() + " ObservationResponse"); + + for (ObservationResponse o : listResponses) { + HarvestSkipURI hs = findSkip(o.observationState.getURI().getURI(), skip); + o.observationState.maxLastModified = hs.getTryAfter(); // overwrite bogus value from RepoClient + ret.add(new SkippedWrapperURI<>(o, hs)); + } + } + // re-order so we process in tryAfter order + Collections.sort(ret, new SkipWrapperComparator()); + return ret; + } + + private HarvestSkipURI findSkip(URI uri, List skip) { + for (HarvestSkipURI hs : skip) { + if (hs.getSkipID().equals(uri)) { + return hs; + } + } + return null; + } + + private static class SkipWrapperComparator implements Comparator { + @Override + public int compare(SkippedWrapperURI o1, SkippedWrapperURI o2) { + return o1.skip.getTryAfter().compareTo(o2.skip.getTryAfter()); + } + } + + /* + * private List> getSkippedState(Date + * start) { log.info("harvest window (skip): " + format(start) + " [" + + * batchSize + "]" + " source = " + source + " cname = " + cname); + * List skip = harvestSkip.get(source, cname, start); + * + * List> ret = new + * ArrayList>(skip.size()); for + * (HarvestSkipURI hs : skip) { ObservationState o = null; + * log.debug("getSkipped: " + hs.getSkipID()); log.debug("start: " + start); + * + * ObservationResponse wr = srcObservationService.get(src.getCollection(), + * hs.getSkipID(), start); + * + * if (wr != null && wr.getObservationState() != null) o = + * wr.getObservationState(); + * + * if (o != null) { ret.add(new SkippedWrapperURI(o, hs)); + * } } return ret; } + */ + @Override + protected void initHarvestState(DataSource ds, Class c) { + super.initHarvestState(ds, c); + this.harvestSkipDAO = new HarvestSkipURIDAO(ds, dest.getDatabase(), dest.getSchema()); + } + +} diff --git a/caom2-meta-sync/src/main/java/org/opencadc/caom2/metasync/SkippedWrapperURI.java b/caom2-meta-sync/src/main/java/org/opencadc/caom2/metasync/SkippedWrapperURI.java new file mode 100644 index 00000000..ee077159 --- /dev/null +++ b/caom2-meta-sync/src/main/java/org/opencadc/caom2/metasync/SkippedWrapperURI.java @@ -0,0 +1,90 @@ +/* + ************************************************************************ + ******************* CANADIAN ASTRONOMY DATA CENTRE ******************* + ************** CENTRE CANADIEN DE DONNÉES ASTRONOMIQUES ************** + * + * (c) 2023. (c) 2023. + * Government of Canada Gouvernement du Canada + * National Research Council Conseil national de recherches + * Ottawa, Canada, K1A 0R6 Ottawa, Canada, K1A 0R6 + * All rights reserved Tous droits réservés + * + * NRC disclaims any warranties, Le CNRC dénie toute garantie + * expressed, implied, or énoncée, implicite ou légale, + * statutory, of any kind with de quelque nature que ce + * respect to the software, soit, concernant le logiciel, + * including without limitation y compris sans restriction + * any warranty of merchantability toute garantie de valeur + * or fitness for a particular marchande ou de pertinence + * purpose. NRC shall not be pour un usage particulier. + * liable in any event for any Le CNRC ne pourra en aucun cas + * damages, whether direct or être tenu responsable de tout + * indirect, special or general, dommage, direct ou indirect, + * consequential or incidental, particulier ou général, + * arising from the use of the accessoire ou fortuit, résultant + * software. Neither the name de l'utilisation du logiciel. Ni + * of the National Research le nom du Conseil National de + * Council of Canada nor the Recherches du Canada ni les noms + * names of its contributors may de ses participants ne peuvent + * be used to endorse or promote être utilisés pour approuver ou + * products derived from this promouvoir les produits dérivés + * software without specific prior de ce logiciel sans autorisation + * written permission. préalable et particulière + * par écrit. + * + * This file is part of the Ce fichier fait partie du projet + * OpenCADC project. OpenCADC. + * + * OpenCADC is free software: OpenCADC est un logiciel libre ; + * you can redistribute it and/or vous pouvez le redistribuer ou le + * modify it under the terms of modifier suivant les termes de + * the GNU Affero General Public la “GNU Affero General Public + * License as published by the License” telle que publiée + * Free Software Foundation, par la Free Software Foundation + * either version 3 of the : soit la version 3 de cette + * License, or (at your option) licence, soit (à votre gré) + * any later version. toute version ultérieure. + * + * OpenCADC is distributed in the OpenCADC est distribué + * hope that it will be useful, dans l’espoir qu’il vous + * but WITHOUT ANY WARRANTY; sera utile, mais SANS AUCUNE + * without even the implied GARANTIE : sans même la garantie + * warranty of MERCHANTABILITY implicite de COMMERCIALISABILITÉ + * or FITNESS FOR A PARTICULAR ni d’ADÉQUATION À UN OBJECTIF + * PURPOSE. See the GNU Affero PARTICULIER. Consultez la Licence + * General Public License for Générale Publique GNU Affero + * more details. pour plus de détails. + * + * You should have received Vous devriez avoir reçu une + * a copy of the GNU Affero copie de la Licence Générale + * General Public License along Publique GNU Affero avec + * with OpenCADC. If not, see OpenCADC ; si ce n’est + * . pas le cas, consultez : + * . + * + * $Revision: 5 $ + * + ************************************************************************ + */ + +package org.opencadc.caom2.metasync; + +import ca.nrc.cadc.caom2.harvester.state.HarvestSkipURI; +import org.apache.log4j.Logger; + +/** + * + * @author pdowler + */ +public class SkippedWrapperURI { + + private static final Logger log = Logger.getLogger(SkippedWrapperURI.class); + + public T entity; + public HarvestSkipURI skip; + + public SkippedWrapperURI(T entity, HarvestSkipURI skip) { + this.entity = entity; + this.skip = skip; + } +}