Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

caom2persistence: new select skeleton implementation #299

Merged
merged 5 commits into from
Jul 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion caom2persistence/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ sourceCompatibility = 1.8

group = 'org.opencadc'

version = '2.4.17'
version = '2.4.18'

description = 'OpenCADC CAOM database library'
def git_url = 'https://github.com/opencadc/caom2db'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
******************* CANADIAN ASTRONOMY DATA CENTRE *******************
************** CENTRE CANADIEN DE DONNÉES ASTRONOMIQUES **************
*
* (c) 2023. (c) 2023.
* (c) 2024. (c) 2024.
* Government of Canada Gouvernement du Canada
* National Research Council Conseil national de recherches
* Ottawa, Canada, K1A 0R6 Ottawa, Canada, K1A 0R6
Expand Down Expand Up @@ -89,7 +89,6 @@
import ca.nrc.cadc.caom2.util.CaomValidator;
import ca.nrc.cadc.date.DateUtil;
import ca.nrc.cadc.net.PreconditionFailedException;
import ca.nrc.cadc.net.TransientException;
import java.net.URI;
import java.text.DateFormat;
import java.util.ArrayList;
Expand Down Expand Up @@ -454,25 +453,37 @@ public void put(Observation obs, URI expectedMetaChecksum) throws PreconditionFa
// NOTE: this is by ID which means to update the caller must get(uri) then put(o)
// and if they do not get(uri) they can get a duplicate observation error
// if they violate unique keys
String sql = gen.getSelectSQL(obs.getID(), SQLGenerator.MAX_DEPTH, true);
log.debug("PUT: " + sql);
//String skelSQL = gen.getSelectSQL(obs.getID(), 1, true);
//log.debug("PUT: " + skelSQL);

long tt = System.currentTimeMillis();
final ObservationSkeleton dirtyRead = getSkelImpl(obs.getID(), jdbc, false); // obs only
final long dirtyReadTime = System.currentTimeMillis() - tt;

final ObservationSkeleton dirtyRead = (ObservationSkeleton) jdbc.query(sql, new ObservationSkeletonExtractor());

log.debug("starting transaction");
tt = System.currentTimeMillis();
getTransactionManager().startTransaction();
txnOpen = true;

final long txnStartTime = System.currentTimeMillis() - tt;

// obtain row lock on observation update
ObservationSkeleton cur = null;
long lockTime = 0L;
long selectSkelTime = 0L;
if (dirtyRead != null) {

String lock = gen.getUpdateLockSQL(obs.getID());
log.debug("LOCK SQL: " + lock);
tt = System.currentTimeMillis();
jdbc.update(lock);
lockTime = System.currentTimeMillis() - tt;

// req-acquire current state after obtaining lock
cur = (ObservationSkeleton) jdbc.query(sql, new ObservationSkeletonExtractor());

tt = System.currentTimeMillis();
//skelSQL = gen.getSelectSQL(obs.getID(), SQLGenerator.MAX_DEPTH, true);
cur = getSkelImpl(obs.getID(), jdbc, true);
selectSkelTime = System.currentTimeMillis() - tt;

// check conditional update
if (expectedMetaChecksum != null) {
if (cur == null) {
Expand All @@ -490,6 +501,7 @@ public void put(Observation obs, URI expectedMetaChecksum) throws PreconditionFa
updateEntity(obs, cur, now);

// delete obsolete children
tt = System.currentTimeMillis();
List<Pair<Plane>> pairs = new ArrayList<Pair<Plane>>();
if (cur != null) {
// delete the skeletons that are not in obs.getPlanes()
Expand All @@ -510,49 +522,129 @@ public void put(Observation obs, URI expectedMetaChecksum) throws PreconditionFa
pairs.add(new Pair<Plane>(null, p));
}
}

final long deletePlanesTime = System.currentTimeMillis() - tt;

tt = System.currentTimeMillis();
super.put(cur, obs, null, jdbc);

final long putObservationTime = System.currentTimeMillis() - tt;

// insert/update children
tt = System.currentTimeMillis();
LinkedList<CaomEntity> parents = new LinkedList<CaomEntity>();
parents.push(obs);
for (Pair<Plane> p : pairs) {
planeDAO.put(p.cur, p.val, parents, jdbc);
}

final long putPlanesTime = System.currentTimeMillis() - tt;

log.debug("committing transaction");
tt = System.currentTimeMillis();
getTransactionManager().commitTransaction();
log.debug("commit: OK");
final long txnCommitTime = System.currentTimeMillis() - tt;
txnOpen = false;

log.debug("transaction start=" + txnStartTime
+ " dirtyRead=" + dirtyReadTime
+ " lock=" + lockTime
+ " select=" + selectSkelTime
+ " del-planes=" + deletePlanesTime
+ " put-obs=" + putObservationTime
+ " put-planes=" + putPlanesTime
+ " commit=" + txnCommitTime);
} catch (DataIntegrityViolationException e) {
log.debug("failed to insert " + obs + ": ", e);
getTransactionManager().rollbackTransaction();
log.debug("rollback: OK");
try {
getTransactionManager().rollbackTransaction();
log.debug("rollback: OK");
} catch (Exception tex) {
log.error("failed to rollback", tex);
}
txnOpen = false;
throw e;
} catch (PreconditionFailedException ex) {
log.debug("failed to update " + obs + ": ", ex);
getTransactionManager().rollbackTransaction();
log.debug("rollback: OK");
try {
getTransactionManager().rollbackTransaction();
log.debug("rollback: OK");
} catch (Exception tex) {
log.error("failed to rollback", tex);
}
txnOpen = false;
throw ex;
} catch (Exception e) {
log.error("failed to insert " + obs + ": ", e);
getTransactionManager().rollbackTransaction();
log.debug("rollback: OK");
try {
getTransactionManager().rollbackTransaction();
log.debug("rollback: OK");
} catch (Exception tex) {
log.error("failed to rollback", tex);
}
txnOpen = false;
throw e;
} finally {
if (txnOpen) {
log.error("BUG - open transaction in finally");
getTransactionManager().rollbackTransaction();
log.error("rollback: OK");
try {
getTransactionManager().rollbackTransaction();
log.debug("rollback: OK");
} catch (Exception tex) {
log.error("BADNESS: failed to rollback in finally", tex);
}
}
long dt = System.currentTimeMillis() - t;
log.debug("PUT: " + obs.getURI() + " " + dt + "ms");
}
}

private ObservationSkeleton getSkelImpl(UUID id, JdbcTemplate jdbc, boolean complete) {
return getSkelNav(id, jdbc, complete);
}

private ObservationSkeleton getSkelJoin(UUID id, JdbcTemplate jdbc) {
String skelSQL = gen.getSelectSQL(id, SQLGenerator.MAX_DEPTH, true);
log.debug("getSkel: " + skelSQL);
ObservationSkeleton ret = (ObservationSkeleton) jdbc.query(skelSQL, new ObservationSkeletonExtractor());
return ret;
}

private ObservationSkeleton getSkelNav(UUID id, JdbcTemplate jdbc, boolean complete) {
ObservationSkeletonExtractor ose = new ObservationSkeletonExtractor();
String skelSQL = gen.getSelectSQL(ObservationSkeleton.class, id, true); // by PK
log.debug("getSkel: " + skelSQL);
List<ObservationSkeleton> skels = jdbc.query(skelSQL, ose.observationMapper);
if (skels == null || skels.isEmpty()) {
return null;
}
ObservationSkeleton ret = skels.get(0);
if (!complete) {
return ret;
}
String planeSkelSQL = gen.getSelectSQL(PlaneSkeleton.class, id, false); // by FK
log.debug("getSkel: " + planeSkelSQL);
List<PlaneSkeleton> planes = jdbc.query(planeSkelSQL, ose.planeMapper);
for (PlaneSkeleton ps : planes) {
ret.planes.add(ps);
String artifactSkelSQL = gen.getSelectSQL(ArtifactSkeleton.class, ps.id, false); // by FK
log.debug("getSkel: " + artifactSkelSQL);
List<ArtifactSkeleton> artifacts = jdbc.query(artifactSkelSQL, ose.artifactMapper);
for (ArtifactSkeleton as : artifacts) {
ps.artifacts.add(as);
String partSkelSQL = gen.getSelectSQL(PartSkeleton.class, as.id, false); // by FK
log.debug("getSkel: " + partSkelSQL);
List<PartSkeleton> parts = jdbc.query(partSkelSQL, ose.partMapper);
for (PartSkeleton pas : parts) {
as.parts.add(pas);
String chunkSkelSQL = gen.getSelectSQL(ChunkSkeleton.class, pas.id, false); // by FK
log.debug("getSkel: " + chunkSkelSQL);
List<ChunkSkeleton> chunks = jdbc.query(chunkSkelSQL, ose.chunkMapper);
pas.chunks.addAll(chunks);
}
}
}
return ret;
}

/**
* Delete a stored observation by URI.
*
Expand Down Expand Up @@ -592,7 +684,12 @@ private void deleteImpl(UUID id, ObservationURI uri) {
sql = gen.getSelectSQL(uri, SQLGenerator.MAX_DEPTH, true);
}
log.debug("DELETE: " + sql);
final ObservationSkeleton dirtyRead = (ObservationSkeleton) jdbc.query(sql, new ObservationSkeletonExtractor());
ObservationSkeleton dirtyRead;
if (id != null) {
dirtyRead = getSkelImpl(id, jdbc, false); // obs only
} else {
dirtyRead = (ObservationSkeleton) jdbc.query(sql, new ObservationSkeletonExtractor());
}

log.debug("starting transaction");
getTransactionManager().startTransaction();
Expand All @@ -606,7 +703,11 @@ private void deleteImpl(UUID id, ObservationURI uri) {
jdbc.update(lock);

// req-acquire current state after obtaining lock
skel = (ObservationSkeleton) jdbc.query(sql, gen.getSkeletonExtractor(ObservationSkeleton.class));
if (id != null) {
skel = getSkelImpl(id, jdbc, true);
} else {
skel = (ObservationSkeleton) jdbc.query(sql, gen.getSkeletonExtractor(ObservationSkeleton.class));
}
}

if (skel != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@
import ca.nrc.cadc.caom2.persistence.skel.ObservationSkeleton;
import ca.nrc.cadc.caom2.persistence.skel.PartSkeleton;
import ca.nrc.cadc.caom2.persistence.skel.PlaneSkeleton;
import ca.nrc.cadc.caom2.persistence.skel.Skeleton;
import ca.nrc.cadc.date.DateUtil;
import java.net.URI;
import java.sql.ResultSet;
Expand All @@ -84,6 +85,7 @@
import org.apache.log4j.Logger;
import org.springframework.dao.DataAccessException;
import org.springframework.jdbc.core.ResultSetExtractor;
import org.springframework.jdbc.core.RowMapper;

/**
*
Expand All @@ -96,6 +98,7 @@ public class ObservationSkeletonExtractor implements ResultSetExtractor {
// lastModified,id,... for obs,plane,artifact,part,chunk
private final Calendar utcCalendar = Calendar.getInstance(DateUtil.UTC);

// extract from join
@Override
public Object extractData(ResultSet rs) throws SQLException, DataAccessException {
ObservationSkeleton ret = null;
Expand Down Expand Up @@ -132,6 +135,7 @@ public Object extractData(ResultSet rs) throws SQLException, DataAccessException
}
if (ncol > col) {
// plane
col++; // skip FK
d = Util.getDate(rs, col++, utcCalendar);
md = Util.getDate(rs, col++, utcCalendar);
cs = Util.getURI(rs, col++);
Expand All @@ -150,6 +154,7 @@ public Object extractData(ResultSet rs) throws SQLException, DataAccessException
}
if (ncol > col) {
// artifact
col++; // skip FK
d = Util.getDate(rs, col++, utcCalendar);
md = Util.getDate(rs, col++, utcCalendar);
cs = Util.getURI(rs, col++);
Expand All @@ -168,6 +173,7 @@ public Object extractData(ResultSet rs) throws SQLException, DataAccessException
}
if (ncol > col) {
// part
col++; // skip FK
d = Util.getDate(rs, col++, utcCalendar);
md = Util.getDate(rs, col++, utcCalendar);
cs = Util.getURI(rs, col++);
Expand All @@ -186,6 +192,7 @@ public Object extractData(ResultSet rs) throws SQLException, DataAccessException
}
if (ncol > col) {
// chunk
col++; // skip FK
d = Util.getDate(rs, col++, utcCalendar);
md = Util.getDate(rs, col++, utcCalendar);
cs = Util.getURI(rs, col++);
Expand Down Expand Up @@ -213,4 +220,56 @@ public Object extractData(ResultSet rs) throws SQLException, DataAccessException
return ret;
}

public void extract(Skeleton ret, ResultSet rs, int col) throws SQLException {
ret.lastModified = Util.getDate(rs, col++, utcCalendar);
ret.maxLastModified = Util.getDate(rs, col++, utcCalendar);
ret.metaChecksum = Util.getURI(rs, col++);
ret.accMetaChecksum = Util.getURI(rs, col++);
ret.id = Util.getUUID(rs, col++);
}

public final RowMapper<ObservationSkeleton> observationMapper = new RowMapper<ObservationSkeleton>() {
@Override
public ObservationSkeleton mapRow(ResultSet rs, int i) throws SQLException {
ObservationSkeleton ret = new ObservationSkeleton();
extract(ret, rs, 1);
return ret;
}
};

public final RowMapper<PlaneSkeleton> planeMapper = new RowMapper<PlaneSkeleton>() {
@Override
public PlaneSkeleton mapRow(ResultSet rs, int i) throws SQLException {
PlaneSkeleton ret = new PlaneSkeleton();
extract(ret, rs, 2); // skip FK
return ret;
}
};

public final RowMapper<ArtifactSkeleton> artifactMapper = new RowMapper<ArtifactSkeleton>() {
@Override
public ArtifactSkeleton mapRow(ResultSet rs, int i) throws SQLException {
ArtifactSkeleton ret = new ArtifactSkeleton();
extract(ret, rs, 2); // skip FK
return ret;
}
};

public final RowMapper<PartSkeleton> partMapper = new RowMapper<PartSkeleton>() {
@Override
public PartSkeleton mapRow(ResultSet rs, int i) throws SQLException {
PartSkeleton ret = new PartSkeleton();
extract(ret, rs, 2); // skip FK
return ret;
}
};

public final RowMapper<ChunkSkeleton> chunkMapper = new RowMapper<ChunkSkeleton>() {
@Override
public ChunkSkeleton mapRow(ResultSet rs, int i) throws SQLException {
ChunkSkeleton ret = new ChunkSkeleton();
extract(ret, rs, 2); // skip FK
return ret;
}
};
}
Loading
Loading