Skip to content

Commit

Permalink
Merge pull request #943 from rax-maas/unstatic-datastaxio
Browse files Browse the repository at this point in the history
de-static-ify DatastaxIO
  • Loading branch information
zzantozz committed Sep 22, 2022
2 parents 968d59a + 18c8034 commit 8b1946d
Showing 1 changed file with 73 additions and 20 deletions.
Expand Up @@ -35,23 +35,60 @@
* to use datastax driver can use this class to get a {@link com.datastax.driver.core.Session}
* object for the read/write statements to Cassandra.
*/
public class DatastaxIO {
public class DatastaxIO implements AutoCloseable {
private static final Logger LOG = LoggerFactory.getLogger(DatastaxIO.class);
private static final IOConfig ioconfig = IOConfig.singleton();
private static final String metricsPrefix = "datastax.";

private static Cluster cluster;
private static Session session;
private final IOConfig ioconfig = IOConfig.singleton();
private final Cluster cluster;
private final Session session;

static {
connect();
monitorConnection();
public enum Keyspace { NO_KEYSPACE, DATA_KEYSPACE }
private static DatastaxIO INSTANCE;

public static DatastaxIO getInstance() {
// Have to lazy init this, or it might try to connect to the DATA keyspace before a test server has come up and
// been initialized.
if (INSTANCE == null) {
INSTANCE = new DatastaxIO(Keyspace.DATA_KEYSPACE, true);
}
return INSTANCE;
}

/**
* Oh Java, how I wish you had tuples.
*/
private static class ClusterAndSession {
final Cluster cluster;
final Session session;

public ClusterAndSession(Cluster cluster, Session session) {
this.cluster = cluster;
this.session = session;
}
}

private DatastaxIO() {
public DatastaxIO(DatastaxIO.Keyspace keyspace, boolean monitor) {
ClusterAndSession result;
if (keyspace == Keyspace.NO_KEYSPACE) {
result = connect(null);
} else if (keyspace == Keyspace.DATA_KEYSPACE) {
result = connect(CassandraModel.QUOTED_KEYSPACE);
} else {
throw new IllegalArgumentException("No such keyspace setting: " + keyspace);
}
cluster = result.cluster;
session = result.session;
if ( LOG.isDebugEnabled() ) {
logDebugConnectionInfo();
}
if (monitor) {
monitorConnection();
}
}

private static void connect() {
private ClusterAndSession connect(String keyspace) {

Set<InetSocketAddress> dbHosts = ioconfig.getUniqueBinaryTransportHostsAsInetSocketAddresses();

int readTimeoutMaxRetries = ioconfig.getReadTimeoutMaxRetries();
Expand All @@ -60,7 +97,7 @@ private static void connect() {

CodecRegistry codecRegistry = new CodecRegistry();

cluster = Cluster.builder()
Cluster cluster = Cluster.builder()
.withLoadBalancingPolicy(new TokenAwarePolicy(DCAwareRoundRobinPolicy.builder().withLocalDc(ioconfig.getDatacenterName()).build(), false))
.withPoolingOptions(getPoolingOptions())
.withRetryPolicy(new RetryNTimes(readTimeoutMaxRetries, writeTimeoutMaxRetries, unavailableMaxRetries))
Expand All @@ -75,20 +112,22 @@ private static void connect() {

cluster.register(queryLogger);

if ( LOG.isDebugEnabled() ) {
logDebugConnectionInfo();
}

Session session;
try {
session = cluster.connect( CassandraModel.QUOTED_KEYSPACE );
if (keyspace == null) {
session = cluster.connect();
} else {
session = cluster.connect(keyspace);
}
}
catch (NoHostAvailableException e){
// TODO: figure out how to bubble this up
throw new RuntimeException(e);
}
return new ClusterAndSession(cluster, session);
}

private static void logDebugConnectionInfo() {
private void logDebugConnectionInfo() {
if ( cluster == null ) {
throw new IllegalStateException("cluster is not initialized");
}
Expand All @@ -100,14 +139,14 @@ private static void logDebugConnectionInfo() {
}
}

private static SocketOptions getSocketOptions() {
private SocketOptions getSocketOptions() {
final SocketOptions socketOptions = new SocketOptions();
socketOptions.setConnectTimeoutMillis(ioconfig.getRequestTimeout())
.setReadTimeoutMillis(ioconfig.getRequestTimeout());
return socketOptions;
}

private static PoolingOptions getPoolingOptions(){
private PoolingOptions getPoolingOptions(){

final PoolingOptions poolingOptions = new PoolingOptions();
poolingOptions
Expand All @@ -117,7 +156,7 @@ private static PoolingOptions getPoolingOptions(){
return poolingOptions;
}

private static void monitorConnection() {
private void monitorConnection() {
final MetricRegistry bfMetricsRegistry = com.rackspacecloud.blueflood.utils.Metrics.getRegistry();
cluster.getMetrics().getRegistry().addListener(new com.codahale.metrics.MetricRegistryListener() {
@Override
Expand Down Expand Up @@ -172,12 +211,26 @@ public void onTimerRemoved(String name) {
});
}

@Override
public void close() { //Not to be used with time-series data.
session.close();
cluster.close();
}

public static Session getSession() {
/**
* Gets the session for this instance. It's named oddly because of the existing {@link #getSession()}. Convert all
* callers of that to use DatastaxIO.getInstance().getInstanceSession(), and then that method can go away, and this
* can be renamed.
*/
public Session getInstanceSession() {
return session;
}

/**
* Direct access to the singleton's Session because this is the way it was originally written. Prefer to {@link
* #getInstance()} and get the session from there with {@link #getInstanceSession()}.
*/
public static Session getSession() {
return getInstance().session;
}
}

0 comments on commit 8b1946d

Please sign in to comment.