forked from voldemort/voldemort
-
Notifications
You must be signed in to change notification settings - Fork 0
/
SystemStore.java
130 lines (117 loc) · 4.99 KB
/
SystemStore.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
package voldemort.client;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.log4j.Logger;
import voldemort.VoldemortException;
import voldemort.cluster.failuredetector.FailureDetector;
import voldemort.store.Store;
import voldemort.store.system.SystemStoreConstants;
import voldemort.versioning.InconsistentDataException;
import voldemort.versioning.VectorClock;
import voldemort.versioning.Version;
import voldemort.versioning.Versioned;
public class SystemStore<K, V> {
private final Logger logger = Logger.getLogger(SystemStore.class);
private final SocketStoreClientFactory systemStoreFactory;
private final String storeName;
private volatile Store<K, V, Object> sysStore;
public SystemStore(String storeName, String[] bootstrapUrls, int clientZoneID) {
this(storeName, bootstrapUrls, clientZoneID, null, null);
}
public SystemStore(String storeName,
String[] bootstrapUrls,
int clientZoneID,
String clusterXml,
FailureDetector fd) {
String prefix = storeName.substring(0, SystemStoreConstants.NAME_PREFIX.length());
if(!SystemStoreConstants.NAME_PREFIX.equals(prefix))
throw new VoldemortException("Illegal system store : " + storeName);
ClientConfig config = new ClientConfig();
config.setSelectors(1)
.setBootstrapUrls(bootstrapUrls)
.setMaxConnectionsPerNode(2)
.setConnectionTimeout(1500, TimeUnit.MILLISECONDS)
.setSocketTimeout(5000, TimeUnit.MILLISECONDS)
.setRoutingTimeout(5000, TimeUnit.MILLISECONDS)
.setEnableJmx(false)
.setEnablePipelineRoutedStore(true)
.setClientZoneId(clientZoneID);
this.systemStoreFactory = new SystemStoreClientFactory(config);
this.storeName = storeName;
try {
this.sysStore = this.systemStoreFactory.getSystemStore(this.storeName, clusterXml, fd);
} catch(Exception e) {
logger.debug("Error while creating a system store client for store : " + this.storeName);
}
}
public Version putSysStore(K key, V value) {
Version version = null;
try {
logger.debug("Invoking Put for key : " + key + " on store name : " + this.storeName);
Versioned<V> versioned = getSysStore(key);
if(versioned == null)
versioned = Versioned.value(value, new VectorClock());
else
versioned.setObject(value);
this.sysStore.put(key, versioned, null);
version = versioned.getVersion();
} catch(Exception e) {
if(logger.isDebugEnabled()) {
logger.debug("Exception caught during putSysStore: " + e);
}
}
return version;
}
public Version putSysStore(K key, Versioned<V> value) {
Version version = null;
try {
logger.debug("Invoking Put for key : " + key + " on store name : " + this.storeName);
this.sysStore.put(key, value, null);
version = value.getVersion();
} catch(Exception e) {
if(logger.isDebugEnabled()) {
logger.debug("Exception caught during putSysStore: " + e);
}
}
return version;
}
public Versioned<V> getSysStore(K key) {
logger.debug("Invoking Get for key : " + key + " on store name : " + this.storeName);
Versioned<V> versioned = null;
try {
List<Versioned<V>> items = this.sysStore.get(key, null);
if(items.size() == 1)
versioned = items.get(0);
else if(items.size() > 1)
throw new InconsistentDataException("Unresolved versions returned from get(" + key
+ ") = " + items, items);
if(versioned != null)
logger.debug("Value for key : " + key + " = " + versioned.getValue()
+ " on store name : " + this.storeName);
else
logger.debug("Got null value");
} catch(Exception e) {
if(logger.isDebugEnabled()) {
logger.debug("Exception caught during getSysStore: " + e);
}
}
return versioned;
}
public V getValueSysStore(K key) {
V value = null;
try {
logger.debug("Invoking Get for key : " + key + " on store name : " + this.storeName);
Versioned<V> versioned = getSysStore(key);
if(versioned != null) {
logger.debug("Value for key : " + key + " = " + versioned.getValue()
+ " on store name : " + this.storeName);
value = versioned.getValue();
}
} catch(Exception e) {
if(logger.isDebugEnabled()) {
logger.debug("Exception caught during getSysStore: " + e);
}
}
return value;
}
}