Skip to content

Commit

Permalink
Merge pull request #299 from pdowler/main
Browse files Browse the repository at this point in the history
caom2persistence: new select skeleton implementation
  • Loading branch information
pdowler committed Jul 12, 2024
2 parents 2799d70 + 82d4d2f commit 7793052
Show file tree
Hide file tree
Showing 10 changed files with 313 additions and 107 deletions.
2 changes: 1 addition & 1 deletion caom2persistence/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ sourceCompatibility = 1.8

group = 'org.opencadc'

version = '2.4.17'
version = '2.4.18'

description = 'OpenCADC CAOM database library'
def git_url = 'https://github.com/opencadc/caom2db'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
******************* CANADIAN ASTRONOMY DATA CENTRE *******************
************** CENTRE CANADIEN DE DONNÉES ASTRONOMIQUES **************
*
* (c) 2023. (c) 2023.
* (c) 2024. (c) 2024.
* Government of Canada Gouvernement du Canada
* National Research Council Conseil national de recherches
* Ottawa, Canada, K1A 0R6 Ottawa, Canada, K1A 0R6
Expand Down Expand Up @@ -89,7 +89,6 @@
import ca.nrc.cadc.caom2.util.CaomValidator;
import ca.nrc.cadc.date.DateUtil;
import ca.nrc.cadc.net.PreconditionFailedException;
import ca.nrc.cadc.net.TransientException;
import java.net.URI;
import java.text.DateFormat;
import java.util.ArrayList;
Expand Down Expand Up @@ -454,25 +453,37 @@ public void put(Observation obs, URI expectedMetaChecksum) throws PreconditionFa
// NOTE: this is by ID which means to update the caller must get(uri) then put(o)
// and if they do not get(uri) they can get a duplicate observation error
// if they violate unique keys
String sql = gen.getSelectSQL(obs.getID(), SQLGenerator.MAX_DEPTH, true);
log.debug("PUT: " + sql);
//String skelSQL = gen.getSelectSQL(obs.getID(), 1, true);
//log.debug("PUT: " + skelSQL);

long tt = System.currentTimeMillis();
final ObservationSkeleton dirtyRead = getSkelImpl(obs.getID(), jdbc, false); // obs only
final long dirtyReadTime = System.currentTimeMillis() - tt;

final ObservationSkeleton dirtyRead = (ObservationSkeleton) jdbc.query(sql, new ObservationSkeletonExtractor());

log.debug("starting transaction");
tt = System.currentTimeMillis();
getTransactionManager().startTransaction();
txnOpen = true;

final long txnStartTime = System.currentTimeMillis() - tt;

// obtain row lock on observation update
ObservationSkeleton cur = null;
long lockTime = 0L;
long selectSkelTime = 0L;
if (dirtyRead != null) {

String lock = gen.getUpdateLockSQL(obs.getID());
log.debug("LOCK SQL: " + lock);
tt = System.currentTimeMillis();
jdbc.update(lock);
lockTime = System.currentTimeMillis() - tt;

// req-acquire current state after obtaining lock
cur = (ObservationSkeleton) jdbc.query(sql, new ObservationSkeletonExtractor());

tt = System.currentTimeMillis();
//skelSQL = gen.getSelectSQL(obs.getID(), SQLGenerator.MAX_DEPTH, true);
cur = getSkelImpl(obs.getID(), jdbc, true);
selectSkelTime = System.currentTimeMillis() - tt;

// check conditional update
if (expectedMetaChecksum != null) {
if (cur == null) {
Expand All @@ -490,6 +501,7 @@ public void put(Observation obs, URI expectedMetaChecksum) throws PreconditionFa
updateEntity(obs, cur, now);

// delete obsolete children
tt = System.currentTimeMillis();
List<Pair<Plane>> pairs = new ArrayList<Pair<Plane>>();
if (cur != null) {
// delete the skeletons that are not in obs.getPlanes()
Expand All @@ -510,49 +522,129 @@ public void put(Observation obs, URI expectedMetaChecksum) throws PreconditionFa
pairs.add(new Pair<Plane>(null, p));
}
}

final long deletePlanesTime = System.currentTimeMillis() - tt;

tt = System.currentTimeMillis();
super.put(cur, obs, null, jdbc);

final long putObservationTime = System.currentTimeMillis() - tt;

// insert/update children
tt = System.currentTimeMillis();
LinkedList<CaomEntity> parents = new LinkedList<CaomEntity>();
parents.push(obs);
for (Pair<Plane> p : pairs) {
planeDAO.put(p.cur, p.val, parents, jdbc);
}

final long putPlanesTime = System.currentTimeMillis() - tt;

log.debug("committing transaction");
tt = System.currentTimeMillis();
getTransactionManager().commitTransaction();
log.debug("commit: OK");
final long txnCommitTime = System.currentTimeMillis() - tt;
txnOpen = false;

log.debug("transaction start=" + txnStartTime
+ " dirtyRead=" + dirtyReadTime
+ " lock=" + lockTime
+ " select=" + selectSkelTime
+ " del-planes=" + deletePlanesTime
+ " put-obs=" + putObservationTime
+ " put-planes=" + putPlanesTime
+ " commit=" + txnCommitTime);
} catch (DataIntegrityViolationException e) {
log.debug("failed to insert " + obs + ": ", e);
getTransactionManager().rollbackTransaction();
log.debug("rollback: OK");
try {
getTransactionManager().rollbackTransaction();
log.debug("rollback: OK");
} catch (Exception tex) {
log.error("failed to rollback", tex);
}
txnOpen = false;
throw e;
} catch (PreconditionFailedException ex) {
log.debug("failed to update " + obs + ": ", ex);
getTransactionManager().rollbackTransaction();
log.debug("rollback: OK");
try {
getTransactionManager().rollbackTransaction();
log.debug("rollback: OK");
} catch (Exception tex) {
log.error("failed to rollback", tex);
}
txnOpen = false;
throw ex;
} catch (Exception e) {
log.error("failed to insert " + obs + ": ", e);
getTransactionManager().rollbackTransaction();
log.debug("rollback: OK");
try {
getTransactionManager().rollbackTransaction();
log.debug("rollback: OK");
} catch (Exception tex) {
log.error("failed to rollback", tex);
}
txnOpen = false;
throw e;
} finally {
if (txnOpen) {
log.error("BUG - open transaction in finally");
getTransactionManager().rollbackTransaction();
log.error("rollback: OK");
try {
getTransactionManager().rollbackTransaction();
log.debug("rollback: OK");
} catch (Exception tex) {
log.error("BADNESS: failed to rollback in finally", tex);
}
}
long dt = System.currentTimeMillis() - t;
log.debug("PUT: " + obs.getURI() + " " + dt + "ms");
}
}

private ObservationSkeleton getSkelImpl(UUID id, JdbcTemplate jdbc, boolean complete) {
return getSkelNav(id, jdbc, complete);
}

private ObservationSkeleton getSkelJoin(UUID id, JdbcTemplate jdbc) {
String skelSQL = gen.getSelectSQL(id, SQLGenerator.MAX_DEPTH, true);
log.debug("getSkel: " + skelSQL);
ObservationSkeleton ret = (ObservationSkeleton) jdbc.query(skelSQL, new ObservationSkeletonExtractor());
return ret;
}

private ObservationSkeleton getSkelNav(UUID id, JdbcTemplate jdbc, boolean complete) {
ObservationSkeletonExtractor ose = new ObservationSkeletonExtractor();
String skelSQL = gen.getSelectSQL(ObservationSkeleton.class, id, true); // by PK
log.debug("getSkel: " + skelSQL);
List<ObservationSkeleton> skels = jdbc.query(skelSQL, ose.observationMapper);
if (skels == null || skels.isEmpty()) {
return null;
}
ObservationSkeleton ret = skels.get(0);
if (!complete) {
return ret;
}
String planeSkelSQL = gen.getSelectSQL(PlaneSkeleton.class, id, false); // by FK
log.debug("getSkel: " + planeSkelSQL);
List<PlaneSkeleton> planes = jdbc.query(planeSkelSQL, ose.planeMapper);
for (PlaneSkeleton ps : planes) {
ret.planes.add(ps);
String artifactSkelSQL = gen.getSelectSQL(ArtifactSkeleton.class, ps.id, false); // by FK
log.debug("getSkel: " + artifactSkelSQL);
List<ArtifactSkeleton> artifacts = jdbc.query(artifactSkelSQL, ose.artifactMapper);
for (ArtifactSkeleton as : artifacts) {
ps.artifacts.add(as);
String partSkelSQL = gen.getSelectSQL(PartSkeleton.class, as.id, false); // by FK
log.debug("getSkel: " + partSkelSQL);
List<PartSkeleton> parts = jdbc.query(partSkelSQL, ose.partMapper);
for (PartSkeleton pas : parts) {
as.parts.add(pas);
String chunkSkelSQL = gen.getSelectSQL(ChunkSkeleton.class, pas.id, false); // by FK
log.debug("getSkel: " + chunkSkelSQL);
List<ChunkSkeleton> chunks = jdbc.query(chunkSkelSQL, ose.chunkMapper);
pas.chunks.addAll(chunks);
}
}
}
return ret;
}

/**
* Delete a stored observation by URI.
*
Expand Down Expand Up @@ -592,7 +684,12 @@ private void deleteImpl(UUID id, ObservationURI uri) {
sql = gen.getSelectSQL(uri, SQLGenerator.MAX_DEPTH, true);
}
log.debug("DELETE: " + sql);
final ObservationSkeleton dirtyRead = (ObservationSkeleton) jdbc.query(sql, new ObservationSkeletonExtractor());
ObservationSkeleton dirtyRead;
if (id != null) {
dirtyRead = getSkelImpl(id, jdbc, false); // obs only
} else {
dirtyRead = (ObservationSkeleton) jdbc.query(sql, new ObservationSkeletonExtractor());
}

log.debug("starting transaction");
getTransactionManager().startTransaction();
Expand All @@ -606,7 +703,11 @@ private void deleteImpl(UUID id, ObservationURI uri) {
jdbc.update(lock);

// req-acquire current state after obtaining lock
skel = (ObservationSkeleton) jdbc.query(sql, gen.getSkeletonExtractor(ObservationSkeleton.class));
if (id != null) {
skel = getSkelImpl(id, jdbc, true);
} else {
skel = (ObservationSkeleton) jdbc.query(sql, gen.getSkeletonExtractor(ObservationSkeleton.class));
}
}

if (skel != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@
import ca.nrc.cadc.caom2.persistence.skel.ObservationSkeleton;
import ca.nrc.cadc.caom2.persistence.skel.PartSkeleton;
import ca.nrc.cadc.caom2.persistence.skel.PlaneSkeleton;
import ca.nrc.cadc.caom2.persistence.skel.Skeleton;
import ca.nrc.cadc.date.DateUtil;
import java.net.URI;
import java.sql.ResultSet;
Expand All @@ -84,6 +85,7 @@
import org.apache.log4j.Logger;
import org.springframework.dao.DataAccessException;
import org.springframework.jdbc.core.ResultSetExtractor;
import org.springframework.jdbc.core.RowMapper;

/**
*
Expand All @@ -96,6 +98,7 @@ public class ObservationSkeletonExtractor implements ResultSetExtractor {
// lastModified,id,... for obs,plane,artifact,part,chunk
private final Calendar utcCalendar = Calendar.getInstance(DateUtil.UTC);

// extract from join
@Override
public Object extractData(ResultSet rs) throws SQLException, DataAccessException {
ObservationSkeleton ret = null;
Expand Down Expand Up @@ -132,6 +135,7 @@ public Object extractData(ResultSet rs) throws SQLException, DataAccessException
}
if (ncol > col) {
// plane
col++; // skip FK
d = Util.getDate(rs, col++, utcCalendar);
md = Util.getDate(rs, col++, utcCalendar);
cs = Util.getURI(rs, col++);
Expand All @@ -150,6 +154,7 @@ public Object extractData(ResultSet rs) throws SQLException, DataAccessException
}
if (ncol > col) {
// artifact
col++; // skip FK
d = Util.getDate(rs, col++, utcCalendar);
md = Util.getDate(rs, col++, utcCalendar);
cs = Util.getURI(rs, col++);
Expand All @@ -168,6 +173,7 @@ public Object extractData(ResultSet rs) throws SQLException, DataAccessException
}
if (ncol > col) {
// part
col++; // skip FK
d = Util.getDate(rs, col++, utcCalendar);
md = Util.getDate(rs, col++, utcCalendar);
cs = Util.getURI(rs, col++);
Expand All @@ -186,6 +192,7 @@ public Object extractData(ResultSet rs) throws SQLException, DataAccessException
}
if (ncol > col) {
// chunk
col++; // skip FK
d = Util.getDate(rs, col++, utcCalendar);
md = Util.getDate(rs, col++, utcCalendar);
cs = Util.getURI(rs, col++);
Expand Down Expand Up @@ -213,4 +220,56 @@ public Object extractData(ResultSet rs) throws SQLException, DataAccessException
return ret;
}

public void extract(Skeleton ret, ResultSet rs, int col) throws SQLException {
ret.lastModified = Util.getDate(rs, col++, utcCalendar);
ret.maxLastModified = Util.getDate(rs, col++, utcCalendar);
ret.metaChecksum = Util.getURI(rs, col++);
ret.accMetaChecksum = Util.getURI(rs, col++);
ret.id = Util.getUUID(rs, col++);
}

public final RowMapper<ObservationSkeleton> observationMapper = new RowMapper<ObservationSkeleton>() {
@Override
public ObservationSkeleton mapRow(ResultSet rs, int i) throws SQLException {
ObservationSkeleton ret = new ObservationSkeleton();
extract(ret, rs, 1);
return ret;
}
};

public final RowMapper<PlaneSkeleton> planeMapper = new RowMapper<PlaneSkeleton>() {
@Override
public PlaneSkeleton mapRow(ResultSet rs, int i) throws SQLException {
PlaneSkeleton ret = new PlaneSkeleton();
extract(ret, rs, 2); // skip FK
return ret;
}
};

public final RowMapper<ArtifactSkeleton> artifactMapper = new RowMapper<ArtifactSkeleton>() {
@Override
public ArtifactSkeleton mapRow(ResultSet rs, int i) throws SQLException {
ArtifactSkeleton ret = new ArtifactSkeleton();
extract(ret, rs, 2); // skip FK
return ret;
}
};

public final RowMapper<PartSkeleton> partMapper = new RowMapper<PartSkeleton>() {
@Override
public PartSkeleton mapRow(ResultSet rs, int i) throws SQLException {
PartSkeleton ret = new PartSkeleton();
extract(ret, rs, 2); // skip FK
return ret;
}
};

public final RowMapper<ChunkSkeleton> chunkMapper = new RowMapper<ChunkSkeleton>() {
@Override
public ChunkSkeleton mapRow(ResultSet rs, int i) throws SQLException {
ChunkSkeleton ret = new ChunkSkeleton();
extract(ret, rs, 2); // skip FK
return ret;
}
};
}
Loading

0 comments on commit 7793052

Please sign in to comment.