Skip to content

Commit

Permalink
Refactor TTL handling in diskstorage
Browse files Browse the repository at this point in the history
This commit is a wash in terms of simplification: 169 lines removed,
but still 165 added.  Might not be worth it.

It's also only barely tested (ran titan-cassandra but not titan-hbase
tests).

For #971
  • Loading branch information
dalaro committed Apr 21, 2015
1 parent fd118c2 commit b63852a
Show file tree
Hide file tree
Showing 15 changed files with 165 additions and 169 deletions.
Expand Up @@ -320,7 +320,7 @@ public void close() {
}

@Override
public synchronized AstyanaxKeyColumnValueStore openDatabase(String name) throws BackendException {
public synchronized AstyanaxKeyColumnValueStore openDatabase(String name, Configuration options) throws BackendException {
if (openStores.containsKey(name)) return openStores.get(name);
else {
ensureColumnFamilyExists(name);
Expand Down
Expand Up @@ -112,7 +112,7 @@ public void close() {
}

@Override
public synchronized KeyColumnValueStore openDatabase(String name) throws BackendException {
public synchronized KeyColumnValueStore openDatabase(String name, Configuration options) throws BackendException {
if (openStores.containsKey(name))
return openStores.get(name);

Expand Down
Expand Up @@ -320,7 +320,7 @@ public void mutateMany(Map<String, Map<StaticBuffer, KCVMutation>> mutations, St
}

@Override // TODO: *BIG FAT WARNING* 'synchronized is always *bad*, change openStores to use ConcurrentLinkedHashMap
public synchronized CassandraThriftKeyColumnValueStore openDatabase(final String name) throws BackendException {
public synchronized CassandraThriftKeyColumnValueStore openDatabase(final String name, final Configuration options) throws BackendException {
if (openStores.containsKey(name))
return openStores.get(name);

Expand Down

This file was deleted.

Expand Up @@ -52,7 +52,7 @@ public List<KeyRange> getLocalKeyPartition() throws BackendException {
}

@Override
public KeyColumnValueStore openDatabase(String name) throws BackendException {
public KeyColumnValueStore openDatabase(String name, Configuration options) throws BackendException {
return manager.openDatabase(name);
}

Expand Down
Expand Up @@ -2,6 +2,7 @@

import com.thinkaurelius.titan.diskstorage.BackendException;
import com.thinkaurelius.titan.diskstorage.StaticBuffer;
import com.thinkaurelius.titan.diskstorage.configuration.Configuration;

import java.util.Map;

Expand All @@ -15,7 +16,6 @@
*/
public interface KeyColumnValueStoreManager extends StoreManager {


/**
* Opens an ordered database by the given name. If the database does not exist, it is
* created. If it has already been opened, the existing handle is returned.
Expand All @@ -25,8 +25,21 @@ public interface KeyColumnValueStoreManager extends StoreManager {
* @throws com.thinkaurelius.titan.diskstorage.BackendException
*
*/
public KeyColumnValueStore openDatabase(String name) throws BackendException;
default KeyColumnValueStore openDatabase(String name) throws BackendException {
return openDatabase(name, Configuration.EMPTY);
}

/**
* Opens an ordered database by the given name. If the database does not exist, it is
* created. If it has already been opened, the existing handle is returned.
*
* @param name Name of database
* @param options Configuration data specific to this store
* @return Database Handle
* @throws com.thinkaurelius.titan.diskstorage.BackendException
*
*/
KeyColumnValueStore openDatabase(String name, Configuration options) throws BackendException;

/**
* Executes multiple mutations at once. For each store (identified by a string name) there is a map of (key,mutation) pairs
Expand All @@ -38,9 +51,6 @@ public interface KeyColumnValueStoreManager extends StoreManager {
* @param txh
* @throws com.thinkaurelius.titan.diskstorage.BackendException
*/
public void mutateMany(Map<String, Map<StaticBuffer, KCVMutation>> mutations, StoreTransaction txh) throws BackendException;



void mutateMany(Map<String, Map<StaticBuffer, KCVMutation>> mutations, StoreTransaction txh) throws BackendException;

}
Expand Up @@ -79,7 +79,7 @@ public StoreFeatures getFeatures() {
}

@Override
public KeyColumnValueStore openDatabase(final String name) throws BackendException {
public KeyColumnValueStore openDatabase(final String name, Configuration options) throws BackendException {
if (!stores.containsKey(name)) {
stores.putIfAbsent(name, new InMemoryKeyColumnValueStore(name));
}
Expand Down
Expand Up @@ -6,6 +6,7 @@
import com.thinkaurelius.titan.diskstorage.Entry;
import com.thinkaurelius.titan.diskstorage.StaticBuffer;
import com.thinkaurelius.titan.diskstorage.BaseTransactionConfig;
import com.thinkaurelius.titan.diskstorage.configuration.Configuration;
import com.thinkaurelius.titan.diskstorage.keycolumnvalue.*;

import java.util.HashMap;
Expand Down Expand Up @@ -63,7 +64,7 @@ public void clearStorage() throws BackendException {
}

@Override
public synchronized OrderedKeyValueStoreAdapter openDatabase(String name)
public synchronized OrderedKeyValueStoreAdapter openDatabase(String name, Configuration options)
throws BackendException {
if (!stores.containsKey(name) || stores.get(name).isClosed()) {
OrderedKeyValueStoreAdapter store = wrapKeyValueStore(manager.openDatabase(name), keyLengths);
Expand All @@ -72,6 +73,11 @@ public synchronized OrderedKeyValueStoreAdapter openDatabase(String name)
return stores.get(name);
}

@Override
public OrderedKeyValueStoreAdapter openDatabase(String name) throws BackendException {
return openDatabase(name, Configuration.EMPTY);
}

@Override
public void mutateMany(Map<String, Map<StaticBuffer, KCVMutation>> mutations, StoreTransaction txh) throws BackendException {
Map<String, KVMutation> converted = new HashMap<String, KVMutation>(mutations.size());
Expand Down
Expand Up @@ -23,7 +23,7 @@ public TTLKCVS(KeyColumnValueStore store, int ttl) {

@Override
public void mutate(StaticBuffer key, List<Entry> additions, List<StaticBuffer> deletions, StoreTransaction txh) throws BackendException {
TTLKVCSManager.applyTTL(additions,ttl);
TTLKCVSManager.applyTTL(additions, ttl);
store.mutate(key, additions, deletions, unwrapTx(txh));
}

Expand Down
@@ -0,0 +1,89 @@
package com.thinkaurelius.titan.diskstorage.keycolumnvalue.ttl;

import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import com.thinkaurelius.titan.diskstorage.BackendException;
import com.thinkaurelius.titan.diskstorage.Entry;
import com.thinkaurelius.titan.diskstorage.EntryMetaData;
import com.thinkaurelius.titan.diskstorage.MetaAnnotatable;
import com.thinkaurelius.titan.diskstorage.StaticBuffer;
import com.thinkaurelius.titan.diskstorage.configuration.Configuration;
import com.thinkaurelius.titan.diskstorage.keycolumnvalue.KCVMutation;
import com.thinkaurelius.titan.diskstorage.keycolumnvalue.KCVSManagerProxy;
import com.thinkaurelius.titan.diskstorage.keycolumnvalue.KeyColumnValueStore;
import com.thinkaurelius.titan.diskstorage.keycolumnvalue.KeyColumnValueStoreManager;
import com.thinkaurelius.titan.diskstorage.keycolumnvalue.StandardStoreFeatures;
import com.thinkaurelius.titan.diskstorage.keycolumnvalue.StoreFeatures;
import com.thinkaurelius.titan.diskstorage.keycolumnvalue.StoreTransaction;

import java.util.Collection;
import java.util.Map;

import static com.thinkaurelius.titan.graphdb.configuration.GraphDatabaseConfiguration.STORE_TTL_SECONDS;

/**
* @author Matthias Broecheler (me@matthiasb.com)
*/
public class TTLKCVSManager extends KCVSManagerProxy {

private final StoreFeatures features;
private final Map<String, Integer> ttlEnabledStores = Maps.newConcurrentMap();

public TTLKCVSManager(KeyColumnValueStoreManager manager) {
super(manager);
Preconditions.checkArgument(supportsStoreTTL(manager),
"Wrapped store must support cell or store level TTL: %s", manager);
Preconditions.checkArgument(manager.getFeatures().hasCellTTL());
this.features = new StandardStoreFeatures.Builder(manager.getFeatures()).storeTTL(true).build();
}

public static boolean supportsStoreTTL(KeyColumnValueStoreManager manager) {
return supportsStoreTTL(manager.getFeatures());
}

public static boolean supportsStoreTTL(StoreFeatures features) {
return features.hasCellTTL() || features.hasStoreTTL();
}

@Override
public StoreFeatures getFeatures() {
return features;
}

@Override
public KeyColumnValueStore openDatabase(String name, Configuration options) throws BackendException {
KeyColumnValueStore store = manager.openDatabase(name);
int storeTTL = -1;
if (options.has(STORE_TTL_SECONDS)) {
storeTTL = options.get(STORE_TTL_SECONDS);
}
Preconditions.checkArgument(storeTTL>0,"TTL must be positive: %s", storeTTL);
ttlEnabledStores.put(name, storeTTL);
return new TTLKCVS(store, storeTTL);
}

@Override
public void mutateMany(Map<String, Map<StaticBuffer, KCVMutation>> mutations, StoreTransaction txh) throws BackendException {
if (!manager.getFeatures().hasStoreTTL()) {
assert manager.getFeatures().hasCellTTL();
for (Map.Entry<String,Map<StaticBuffer, KCVMutation>> sentry : mutations.entrySet()) {
Integer ttl = ttlEnabledStores.get(sentry.getKey());
if (null != ttl && 0 < ttl) {
for (KCVMutation mut : sentry.getValue().values()) {
if (mut.hasAdditions()) applyTTL(mut.getAdditions(), ttl);
}
}
}
}
manager.mutateMany(mutations,txh);
}

public static void applyTTL(Collection<Entry> additions, int ttl) {
for (Entry entry : additions) {
assert entry instanceof MetaAnnotatable;
((MetaAnnotatable)entry).setMetaData(EntryMetaData.TTL,ttl);
}
}


}

This file was deleted.

0 comments on commit b63852a

Please sign in to comment.