Permalink
Browse files

applied fix for sparse issue 169

  • Loading branch information...
1 parent 8ed4775 commit 5320324dee88a56ca706ab6b98c408df27a66e1b Jonathan Cook committed Apr 20, 2012
@@ -368,8 +368,9 @@ public Connection getConnection() throws SQLException {
return connection;
}
-
-
+ public void resetConnection() {
+ connectionManager.clean();
+ }
public String getValidationSql() {
if ( sqlConfig != null ) {
@@ -65,6 +65,16 @@ public void set(Connection connection) {
threadMap.put(t, ch);
}
+ public void clean() {
+ Thread t = Thread.currentThread();
+ threadMap.get(t);
+ ConnectionHolder c = threadMap.get(t);
+ if (c != null) {
+ c.close();
+ threadMap.remove(t);
+ }
+ }
+
private void cleanThreadMap() {
if ( closing ) {
return;
@@ -122,7 +122,7 @@
"ac:_:parenthash");
private static final Map<String, String> COLUMN_NAME_MAPPING = ImmutableMap.of("_:parenthash","parenthash");
- private BaseJDBCStorageClientPool jcbcStorageClientConnection;
+ private BaseJDBCStorageClientPool jdbcStorageClientConnection;
private Map<String, Object> sqlConfig;
private boolean active;
private StreamedContentHelper streamedContentHelper;
@@ -155,7 +155,7 @@ public JDBCStorageClient(BaseJDBCStorageClientPool jdbcStorageClientConnectionPo
if ( indexColumns == null ) {
throw new StorageClientException("Null Index Colums, cant create Client");
}
- this.jcbcStorageClientConnection = jdbcStorageClientConnectionPool;
+ this.jdbcStorageClientConnection = jdbcStorageClientConnectionPool;
streamedContentHelper = new FileStreamContentHelper(this, properties);
this.sqlConfig = sqlConfig;
@@ -215,15 +215,30 @@ public JDBCStorageClient(BaseJDBCStorageClientPool jdbcStorageClientConnectionPo
Map<String, Object> result = Maps.newHashMap();
PreparedStatement selectStringRow = null;
try {
- selectStringRow = getStatement(keySpace, columnFamily, SQL_BLOCK_SELECT_ROW, rid, null);
- inc("A");
- selectStringRow.clearWarnings();
- selectStringRow.clearParameters();
- selectStringRow.setString(1, rid);
- body = selectStringRow.executeQuery();
- inc("B");
- if (body.next()) {
- Types.loadFromStream(rid, result, body.getBinaryStream(1), columnFamily);
+ boolean hasRetried = false;
+ for (;;) {
+ try {
+ selectStringRow = getStatement(keySpace, columnFamily, SQL_BLOCK_SELECT_ROW, rid, null);
+ inc("A");
+ selectStringRow.clearWarnings();
+ selectStringRow.clearParameters();
+ selectStringRow.setString(1, rid);
+ long t1 = System.currentTimeMillis();
+ body = selectStringRow.executeQuery();
+ inc("B");
+ if (body.next()) {
+ Types.loadFromStream(rid, result, body.getBinaryStream(1), columnFamily);
+ }
+ break;
+ } catch (SQLException ex) {
+ if (!hasRetried) {
+ resetConnection(null);
+ hasRetried = true;
+ } else {
+ throw ex;
+ }
+
+ }
}
} catch (SQLException e) {
LOGGER.warn("Failed to perform get operation on " + keySpace + ":" + columnFamily
@@ -421,6 +436,7 @@ public void insert(String keySpace, String columnFamily, String key, Map<String,
endBlock(autoCommit);
} catch (SQLException e) {
abandonBlock(autoCommit);
+ resetConnection(statementCache);
LOGGER.warn("Failed to perform insert/update operation on {}:{}:{} ", new Object[] {
keySpace, columnFamily, key }, e);
throw new StorageClientException(e.getMessage(), e);
@@ -452,7 +468,7 @@ String getSql(String keySpace, String columnFamily, String name) {
private void abandonBlock(boolean autoCommit) {
if (autoCommit) {
try {
- Connection connection = jcbcStorageClientConnection.getConnection();
+ Connection connection = jdbcStorageClientConnection.getConnection();
connection.rollback();
connection.setAutoCommit(autoCommit);
if ( storageClientListener != null ) {
@@ -466,7 +482,7 @@ private void abandonBlock(boolean autoCommit) {
private void endBlock(boolean autoCommit) throws SQLException {
if (autoCommit) {
- Connection connection = jcbcStorageClientConnection.getConnection();
+ Connection connection = jdbcStorageClientConnection.getConnection();
connection.commit();
connection.setAutoCommit(autoCommit);
if ( storageClientListener != null ) {
@@ -476,7 +492,7 @@ private void endBlock(boolean autoCommit) throws SQLException {
}
private boolean startBlock() throws SQLException {
- Connection connection = jcbcStorageClientConnection.getConnection();
+ Connection connection = jdbcStorageClientConnection.getConnection();
boolean autoCommit = connection.getAutoCommit();
connection.setAutoCommit(false);
if ( storageClientListener != null ) {
@@ -518,6 +534,7 @@ public void remove(String keySpace, String columnFamily, String key)
endBlock(autoCommit);
} catch (SQLException e) {
abandonBlock(autoCommit);
+ resetConnection(null);
LOGGER.warn("Failed to perform delete operation on {}:{}:{} ", new Object[] { keySpace,
columnFamily, key }, e);
throw new StorageClientException(e.getMessage(), e);
@@ -529,7 +546,7 @@ public void remove(String keySpace, String columnFamily, String key)
public void close() {
passivate();
- jcbcStorageClientConnection.releaseClient(this);
+ jdbcStorageClientConnection.releaseClient(this);
}
public void destroy() {
@@ -597,7 +614,7 @@ PreparedStatement getStatement(String keySpace, String columnFamily,
return statementCache.get(k);
} else {
- PreparedStatement pst = jcbcStorageClientConnection.getConnection()
+ PreparedStatement pst = jdbcStorageClientConnection.getConnection()
.prepareStatement((String) sqlConfig.get(k));
if (statementCache != null) {
inc("cachedStatement");
@@ -616,12 +633,12 @@ PreparedStatement getStatement(String sql, Map<String, PreparedStatement> stat
if ( statementCache.containsKey(sql)) {
pst = statementCache.get(sql);
} else {
- pst = jcbcStorageClientConnection.getConnection().prepareStatement(sql);
+ pst = jdbcStorageClientConnection.getConnection().prepareStatement(sql);
inc("cachedStatement");
statementCache.put(sql, pst);
}
} else {
- pst = jcbcStorageClientConnection.getConnection().prepareStatement(sql);
+ pst = jdbcStorageClientConnection.getConnection().prepareStatement(sql);
}
return pst;
}
@@ -659,18 +676,16 @@ public boolean validate() throws StorageClientException {
checkActive(false);
Statement statement = null;
try {
- statement = jcbcStorageClientConnection.getConnection().createStatement();
- inc("vaidate");
-
- statement.execute(getSql(SQL_VALIDATE));
+ // just get a connection, that will be enough to validate.
+ // this is not a perfect solution. A better solution would be to handle the failiure in the client code on update.
+ statement = jdbcStorageClientConnection.getConnection().createStatement();
return true;
} catch (SQLException e) {
LOGGER.warn("Failed to validate connection ", e);
return false;
} finally {
try {
statement.close();
- dec("vaidate");
} catch (Throwable e) {
LOGGER.debug("Failed to close statement in validate ", e);
}
@@ -697,7 +712,7 @@ public void checkSchema(String[] clientConfigLocations) throws ClientPoolExcepti
Statement statement = null;
try {
- statement = jcbcStorageClientConnection.getConnection().createStatement();
+ statement = jdbcStorageClientConnection.getConnection().createStatement();
try {
statement.execute(getSql(SQL_CHECKSCHEMA));
inc("schema");
@@ -814,7 +829,7 @@ public boolean hasBody(Map<String, Object> content, String streamId) {
protected Connection getConnection() throws StorageClientException, SQLException {
checkActive();
- return jcbcStorageClientConnection.getConnection();
+ return jdbcStorageClientConnection.getConnection();
}
public DisposableIterator<Map<String, Object>> listChildren(String keySpace, String columnFamily, String key, DirectCacheAccess cachingManager) throws StorageClientException {
@@ -850,7 +865,7 @@ protected Connection getConnection() throws StorageClientException, SQLException
ResultSet trs = null;
try {
LOGGER.debug("Preparing {} ", sql);
- tpst = jcbcStorageClientConnection.getConnection().prepareStatement(sql);
+ tpst = jdbcStorageClientConnection.getConnection().prepareStatement(sql);
inc("iterator");
tpst.clearParameters();
@@ -933,6 +948,7 @@ public void close() {
}
});
} catch (SQLException e) {
+ resetConnection(null);
LOGGER.error(e.getMessage(), e);
throw new StorageClientException(e.getMessage() + " SQL Statement was " + sql,
e);
@@ -1004,6 +1020,13 @@ private void close(PreparedStatement pst, String name) {
}
}
+ void resetConnection(Map<String, PreparedStatement> statementCache) {
+ if ( statementCache != null ) {
+ closeStatementCache(statementCache);
+ }
+ jdbcStorageClientConnection.resetConnection();
+ }
+
public void closeStatementCache(Map<String, PreparedStatement> statementCache) {
for (PreparedStatement pst : statementCache.values()) {
if (pst != null) {
@@ -1032,7 +1055,7 @@ public void closeStatementCache(Map<String, PreparedStatement> statementCache) {
PreparedStatement selectColumnsPst = null;
PreparedStatement insertColumnsPst = null;
ResultSet rs = null;
- Connection connection = jcbcStorageClientConnection.getConnection();
+ Connection connection = jdbcStorageClientConnection.getConnection();
Statement statement = null;
try {
selectColumnsPst = connection.prepareStatement(selectColumns);
@@ -1194,7 +1217,7 @@ public long allCount(String keySpace, String columnFamily) throws StorageClientE
ResultSet trs = null;
try {
LOGGER.debug("Preparing {} ", sql);
- tpst = jcbcStorageClientConnection.getConnection().prepareStatement(sql);
+ tpst = jdbcStorageClientConnection.getConnection().prepareStatement(sql);
inc("iterator");
tpst.clearParameters();
@@ -1213,6 +1236,7 @@ public long allCount(String keySpace, String columnFamily) throws StorageClientE
}
return 0;
} catch (SQLException e) {
+ resetConnection(null);
LOGGER.error(e.getMessage(), e);
throw new StorageClientException(e.getMessage() + " SQL Statement was " + sql,
e);
@@ -1242,7 +1266,7 @@ public void setStorageClientListener(StorageClientListener storageClientListener
}
public Map<String, CacheHolder> getQueryCache() {
- StorageCacheManager storageCacheManager = this.jcbcStorageClientConnection.getStorageCacheManager();
+ StorageCacheManager storageCacheManager = this.jdbcStorageClientConnection.getStorageCacheManager();
if ( storageCacheManager != null ) {
return storageCacheManager.getCache("sparseQueryCache");
}
@@ -286,6 +286,7 @@ protected boolean internalHasNext() {
} catch (SQLException e) {
LOGGER.error(e.getMessage(), e);
close();
+ client.resetConnection(null);
nextValue = null;
return false;
} catch (StorageClientException e) {
@@ -501,6 +501,7 @@ private boolean isColumnArray(String keySpace, String columnFamily, String k) {
return client.registerDisposable(cacheResults(keySpace, columnFamily, properties, new PreemptiveCachedMapIterator(client, keySpace, columnFamily, rs, pst, rawResults, cachingManager)));
} catch (SQLException e) {
LOGGER.error(e.getMessage(), e);
+ client.resetConnection(null);
throw new StorageClientException(e.getMessage() + " SQL Statement was " + sqlStatement,
e);
} finally {

0 comments on commit 5320324

Please sign in to comment.