Skip to content

Commit

Permalink
Adding database locking functionality
Browse files Browse the repository at this point in the history
  • Loading branch information
mcuthbert committed Jun 4, 2020
1 parent b2ce686 commit 54590fc
Show file tree
Hide file tree
Showing 15 changed files with 440 additions and 32 deletions.
Expand Up @@ -8,6 +8,8 @@
import java.util.logging.Level;
import java.util.logging.Logger;

import javax.sql.DataSource;

import org.apache.commons.dbcp.BasicDataSource;
import org.openstreetmap.osmosis.core.OsmosisRuntimeException;
import org.openstreetmap.osmosis.core.database.DatabaseLoginCredentials;
Expand Down Expand Up @@ -64,6 +66,14 @@ public DatabaseContext2(DatabaseLoginCredentials loginCredentials) {
}
}

/**
* Retrieves the data source associated with this database context.
*
* @return {@link DataSource}
*/
public DataSource getDataSource() {
return this.dataSource;
}

private OsmosisRuntimeException createUnknownDbTypeException() {
return new OsmosisRuntimeException("Unknown database type " + dbType + ".");
Expand Down
Expand Up @@ -9,6 +9,7 @@
import org.openstreetmap.osmosis.apidb.v0_6.impl.DeltaToDiffReader;
import org.openstreetmap.osmosis.apidb.v0_6.impl.SchemaVersionValidator;
import org.openstreetmap.osmosis.core.container.v0_6.ChangeContainer;
import org.openstreetmap.osmosis.core.database.DatabaseLocker;
import org.openstreetmap.osmosis.core.database.DatabaseLoginCredentials;
import org.openstreetmap.osmosis.core.database.DatabasePreferences;
import org.openstreetmap.osmosis.core.lifecycle.ReleasableIterator;
Expand Down Expand Up @@ -109,13 +110,17 @@ protected void runImpl(DatabaseContext2 dbCtx) {
* Reads all data from the database and send it to the sink.
*/
public void run() {
try (DatabaseContext2 dbCtx = new DatabaseContext2(loginCredentials)) {
try (DatabaseContext2 dbCtx = new DatabaseContext2(loginCredentials);
DatabaseLocker locker = new DatabaseLocker(dbCtx.getDataSource(), false)) {
dbCtx.executeWithinTransaction(new TransactionCallbackWithoutResult() {
@Override
protected void doInTransactionWithoutResult(TransactionStatus arg0) {
locker.lockDatabase(this.getClass().getSimpleName());
runImpl(dbCtx);
} });

}
} catch (final Exception e) {
throw new RuntimeException(e);
}
}
}
Expand Up @@ -4,11 +4,13 @@
import java.util.HashMap;
import java.util.Map;

import org.openstreetmap.osmosis.apidb.common.DatabaseContext2;
import org.openstreetmap.osmosis.core.OsmosisRuntimeException;
import org.openstreetmap.osmosis.apidb.v0_6.impl.ActionChangeWriter;
import org.openstreetmap.osmosis.apidb.v0_6.impl.ChangeWriter;
import org.openstreetmap.osmosis.apidb.v0_6.impl.SchemaVersionValidator;
import org.openstreetmap.osmosis.core.container.v0_6.ChangeContainer;
import org.openstreetmap.osmosis.core.database.DatabaseLocker;
import org.openstreetmap.osmosis.core.database.DatabaseLoginCredentials;
import org.openstreetmap.osmosis.core.database.DatabasePreferences;
import org.openstreetmap.osmosis.core.task.common.ChangeAction;
Expand All @@ -27,6 +29,8 @@ public class ApidbChangeWriter implements ChangeSink {
private final Map<ChangeAction, ActionChangeWriter> actionWriterMap;

private final SchemaVersionValidator schemaVersionValidator;
private final DatabaseContext2 dbCtx;
private final DatabaseLocker locker;

/**
* Creates a new instance.
Expand All @@ -39,12 +43,14 @@ public class ApidbChangeWriter implements ChangeSink {
public ApidbChangeWriter(DatabaseLoginCredentials loginCredentials, DatabasePreferences preferences,
boolean populateCurrentTables) {
changeWriter = new ChangeWriter(loginCredentials, populateCurrentTables);
actionWriterMap = new HashMap<ChangeAction, ActionChangeWriter>();
actionWriterMap = new HashMap<>();
actionWriterMap.put(ChangeAction.Create, new ActionChangeWriter(changeWriter, ChangeAction.Create));
actionWriterMap.put(ChangeAction.Modify, new ActionChangeWriter(changeWriter, ChangeAction.Modify));
actionWriterMap.put(ChangeAction.Delete, new ActionChangeWriter(changeWriter, ChangeAction.Delete));

schemaVersionValidator = new SchemaVersionValidator(loginCredentials, preferences);
dbCtx = new DatabaseContext2(loginCredentials);
locker = new DatabaseLocker(dbCtx.getDataSource(), true);
}

/**
Expand All @@ -59,6 +65,7 @@ public void initialize(Map<String, Object> metaData) {
*/
public void process(ChangeContainer change) {
ChangeAction action;
this.locker.lockDatabase(this.getClass().getSimpleName());

// Verify that the schema version is supported.
schemaVersionValidator.validateVersion(ApidbVersionConstants.SCHEMA_MIGRATIONS);
Expand All @@ -85,6 +92,8 @@ public void complete() {
* {@inheritDoc}
*/
public void close() {
this.locker.unlockDatabase();
changeWriter.close();
this.dbCtx.close();
}
}
Expand Up @@ -9,6 +9,7 @@
import org.openstreetmap.osmosis.apidb.v0_6.impl.SchemaVersionValidator;
import org.openstreetmap.osmosis.core.container.v0_6.BoundContainer;
import org.openstreetmap.osmosis.core.container.v0_6.EntityContainer;
import org.openstreetmap.osmosis.core.database.DatabaseLocker;
import org.openstreetmap.osmosis.core.database.DatabaseLoginCredentials;
import org.openstreetmap.osmosis.core.database.DatabasePreferences;
import org.openstreetmap.osmosis.core.domain.v0_6.Bound;
Expand Down Expand Up @@ -90,13 +91,16 @@ protected void runImpl(DatabaseContext2 dbCtx) {
* Reads all data from the database and send it to the sink.
*/
public void run() {
try (DatabaseContext2 dbCtx = new DatabaseContext2(loginCredentials)) {
try (DatabaseContext2 dbCtx = new DatabaseContext2(loginCredentials);
DatabaseLocker locker = new DatabaseLocker(dbCtx.getDataSource(), false)) {
dbCtx.executeWithinTransaction(new TransactionCallbackWithoutResult() {

@Override
protected void doInTransactionWithoutResult(TransactionStatus arg0) {
locker.lockDatabase(this.getClass().getSimpleName());
runImpl(dbCtx);
} });
}
} catch (final Exception e) {
throw new RuntimeException(e);
}
}
}
Expand Up @@ -10,6 +10,7 @@
import org.openstreetmap.osmosis.apidb.v0_6.impl.TimeDao;
import org.openstreetmap.osmosis.apidb.v0_6.impl.TransactionDao;
import org.openstreetmap.osmosis.apidb.v0_6.impl.TransactionManager;
import org.openstreetmap.osmosis.core.database.DatabaseLocker;
import org.openstreetmap.osmosis.core.database.DatabaseLoginCredentials;
import org.openstreetmap.osmosis.core.database.DatabasePreferences;
import org.openstreetmap.osmosis.core.task.v0_6.ChangeSink;
Expand Down Expand Up @@ -95,8 +96,12 @@ protected void runImpl(DatabaseContext2 dbCtx) {
*/
@Override
public void run() {
try (DatabaseContext2 dbCtx = new DatabaseContext2(loginCredentials)) {
try (DatabaseContext2 dbCtx = new DatabaseContext2(loginCredentials);
DatabaseLocker locker = new DatabaseLocker(dbCtx.getDataSource(), false)) {
locker.lockDatabase(this.getClass().getSimpleName());
runImpl(dbCtx);
}
} catch (final Exception e) {
throw new RuntimeException(e);
}
}
}
Expand Up @@ -11,6 +11,7 @@
import org.openstreetmap.osmosis.apidb.v0_6.impl.SchemaVersionValidator;
import org.openstreetmap.osmosis.core.container.v0_6.BoundContainer;
import org.openstreetmap.osmosis.core.container.v0_6.EntityContainer;
import org.openstreetmap.osmosis.core.database.DatabaseLocker;
import org.openstreetmap.osmosis.core.database.DatabaseLoginCredentials;
import org.openstreetmap.osmosis.core.database.DatabasePreferences;
import org.openstreetmap.osmosis.core.domain.v0_6.Bound;
Expand Down Expand Up @@ -95,13 +96,17 @@ protected void runImpl(DatabaseContext2 dbCtx) {
* Reads all data from the database and send it to the sink.
*/
public void run() {
try (DatabaseContext2 dbCtx = new DatabaseContext2(loginCredentials)) {
try (DatabaseContext2 dbCtx = new DatabaseContext2(loginCredentials);
DatabaseLocker locker = new DatabaseLocker(dbCtx.getDataSource(), false)) {
dbCtx.executeWithinTransaction(new TransactionCallbackWithoutResult() {
@Override
protected void doInTransactionWithoutResult(TransactionStatus arg0) {
locker.lockDatabase(this.getClass().getSimpleName());
runImpl(dbCtx);
}
});
}
} catch (final Exception e) {
throw new RuntimeException(e);
}
}
}
2 changes: 2 additions & 0 deletions osmosis-core/build.gradle
Expand Up @@ -8,6 +8,8 @@ dependencies {
compile group: 'com.fasterxml.woodstox', name: 'woodstox-core', version: dependencyVersionWoodstoxCore
compile group: 'org.codehaus.woodstox', name: 'stax2-api', version: dependencyVersionWoodstoxStax2
compile group: 'org.apache.commons', name: 'commons-compress', version: dependencyVersionCommonsCompress
compile group: 'xerces', name: 'xercesImpl', version: dependencyVersionXerces
compile group: 'org.springframework', name: 'spring-jdbc', version: dependencyVersionSpring
}

/*
Expand Down
@@ -0,0 +1,142 @@
// This software is released into the Public Domain. See copying.txt for details.
package org.openstreetmap.osmosis.core.database;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.logging.Logger;

import javax.sql.DataSource;

/**
* A check against the database to see if it is locked.
*
* @author mcuthbert
*/
public class DatabaseLocker implements AutoCloseable {

private static Logger logger = Logger.getLogger(DatabaseLocker.class.getSimpleName());
private final DataSource source;
private final boolean writeLock;
private boolean enabled = true;
private int lockedIdentifier = -1;

/**
* Static function to fully unlock the database.
*
* @param source {@link DataSource} to execute the query
*/
public static void fullUnlockDatabase(final DataSource source) {
unlockDatabase(-1, source);
}

private static void unlockDatabase(final int identifier, final DataSource source) {
try (Connection connection = source.getConnection();
PreparedStatement statement = connection.prepareStatement("SELECT unlock_database(?)")) {
statement.setInt(1, identifier);
final ResultSet result = statement.executeQuery();
if (result.next()) {
final boolean unlocked = result.getBoolean(1);
if (unlocked) {
logger.info(String.format("Unlocked database using identifier %d.", identifier));
return;
}
}
throw new RuntimeException("Failed to unlock the database");
} catch (final SQLException e) {
throw new RuntimeException("Failed to unlock the database", e);
}
}

/**
* Default Constructor.
*
* @param source The DataSource for the connection
* @param writeLock Whether the lock that is being requested is a write lock
*/
public DatabaseLocker(final DataSource source, final boolean writeLock) {
this.source = source;
this.writeLock = writeLock;
// check to see if the function exists in the database and if it doesn't then throw a
// warning on the log and don't try any locking
try (Connection connection = source.getConnection();
Statement statement = connection.createStatement()) {
statement.executeQuery("SELECT 'lock_database'::regproc, 'unlock_database'::regproc");
} catch (final Exception e) {
logger.warning("Locking functions do not exist in database. Disabling locking.");
this.enabled = false;
}
}

/**
* Helper function for primary lock database function that simply sets the source to empty string.
*
* @param process The process that is locking the database
*/
public void lockDatabase(final String process) {
this.lockDatabase(process, "");
}

/**
* Will attempt to lock the database.
*
* @param process
* The process that will lock the database. This would be something like "Extracts" or "Replication".
* @param description
* The source of the process. This is basically a description for the process, like
* "Pipeline Extraction for Build X".
*/
public void lockDatabase(final String process, final String description) {
if (this.enabled) {
String lockName = "read";
if (this.writeLock) {
lockName = "write";
}
try (Connection connection = source.getConnection();
PreparedStatement statement = connection.prepareStatement("SELECT lock_database(?, ?, ?, ?)")) {
statement.setString(1, process);
statement.setString(2, description);
statement.setString(3, InetAddress.getLocalHost().getHostName());
statement.setBoolean(4, this.writeLock);
final ResultSet result = statement.executeQuery();
if (result.next()) {
this.lockedIdentifier = result.getInt(1);
logger.info(String.format("Obtained %s lock to database for process: %s "
+ "from source: '%s', with lockedID: %d",
lockName, process, description, this.lockedIdentifier));
} else {
throw new RuntimeException("Failed to retrieve lock for the database.");
}
} catch (final SQLException | UnknownHostException e) {
throw new RuntimeException("Failed to lock the database.", e);
}
}
}

/**
* Will attempt to unlock the database with the given locked identifier.
*/
public void unlockDatabase() {
if (this.enabled && this.lockedIdentifier > 0) {
unlockDatabase(this.lockedIdentifier, this.source);
}
}

/**
* Does a full unlock of the database. So no matter what is locking it, this will unlock it.
*/
public void fullUnlockDatabase() {
if (this.enabled) {
fullUnlockDatabase(this.source);
}
}

@Override
public void close() throws Exception {
this.unlockDatabase();
}
}
Expand Up @@ -66,6 +66,14 @@ public DatabaseContext(DatabaseLoginCredentials loginCredentials) {
}
}

/**
* Returns the DataSource associated with this database context.
*
* @return {@link DataSource}
*/
public DataSource getDataSource() {
return this.dataSource;
}

/**
* Begins a new database transaction. This is not required if
Expand Down

0 comments on commit 54590fc

Please sign in to comment.