Skip to content
This repository
Browse code

Better Entropy Detection + New JMX hook in SchedulerService

  • Loading branch information...
commit 5adf557129fbd1cce3b3f3078ea7719548f7e6aa 1 parent c3f448c
Roshan Sumbaly rsumbaly authored afeinberg committed
4 src/java/voldemort/client/protocol/admin/AdminClient.java
@@ -68,6 +68,7 @@
68 68 import voldemort.store.slop.Slop;
69 69 import voldemort.store.slop.Slop.Operation;
70 70 import voldemort.store.socket.SocketDestination;
  71 +import voldemort.store.views.ViewStorageConfiguration;
71 72 import voldemort.utils.ByteArray;
72 73 import voldemort.utils.ByteUtils;
73 74 import voldemort.utils.NetworkClassLoader;
@@ -117,7 +118,8 @@
117 118 private final AdminClientConfig adminClientConfig;
118 119
119 120 public final static List<String> restoreStoreEngineBlackList = Arrays.asList(MysqlStorageConfiguration.TYPE_NAME,
120   - ReadOnlyStorageConfiguration.TYPE_NAME);
  121 + ReadOnlyStorageConfiguration.TYPE_NAME,
  122 + ViewStorageConfiguration.TYPE_NAME);
121 123
122 124 private Cluster currentCluster;
123 125
9 src/java/voldemort/server/scheduler/SchedulerService.java
@@ -17,6 +17,7 @@
17 17 package voldemort.server.scheduler;
18 18
19 19 import java.util.Date;
  20 +import java.util.List;
20 21 import java.util.concurrent.ConcurrentHashMap;
21 22 import java.util.concurrent.ScheduledFuture;
22 23 import java.util.concurrent.ScheduledThreadPoolExecutor;
@@ -27,6 +28,7 @@
27 28
28 29 import org.apache.log4j.Logger;
29 30
  31 +import voldemort.annotations.jmx.JmxGetter;
30 32 import voldemort.annotations.jmx.JmxManaged;
31 33 import voldemort.annotations.jmx.JmxOperation;
32 34 import voldemort.server.AbstractService;
@@ -34,6 +36,8 @@
34 36 import voldemort.server.VoldemortService;
35 37 import voldemort.utils.Time;
36 38
  39 +import com.google.common.collect.Lists;
  40 +
37 41 /**
38 42 * The voldemort scheduler
39 43 *
@@ -125,6 +129,11 @@ public void enable(String id) {
125 129 }
126 130 }
127 131
  132 + @JmxGetter(name = "getScheduledJobs", description = "Returns names of jobs in the scheduler")
  133 + public List<String> getScheduledJobs() {
  134 + return Lists.newArrayList(scheduledJobResults.keySet());
  135 + }
  136 +
128 137 public void scheduleNow(Runnable runnable) {
129 138 scheduler.execute(runnable);
130 139 }
222 src/java/voldemort/utils/EntropyDetection.java
... ... @@ -1,25 +1,24 @@
1 1 package voldemort.utils;
2 2
  3 +import java.io.File;
  4 +import java.io.FileInputStream;
  5 +import java.io.FileOutputStream;
3 6 import java.io.IOException;
4   -import java.util.ArrayList;
5 7 import java.util.Iterator;
6 8 import java.util.List;
7 9 import java.util.Set;
8 10
9 11 import joptsimple.OptionParser;
10 12 import joptsimple.OptionSet;
11   -import voldemort.client.protocol.RequestFormatType;
  13 +import voldemort.client.ClientConfig;
  14 +import voldemort.client.SocketStoreClientFactory;
  15 +import voldemort.client.StoreClient;
12 16 import voldemort.client.protocol.admin.AdminClient;
13 17 import voldemort.client.protocol.admin.AdminClientConfig;
14 18 import voldemort.cluster.Cluster;
15   -import voldemort.cluster.Node;
16   -import voldemort.routing.RoutingStrategy;
17   -import voldemort.routing.RoutingStrategyFactory;
18   -import voldemort.server.RequestRoutingType;
19   -import voldemort.store.Store;
  19 +import voldemort.serialization.DefaultSerializerFactory;
  20 +import voldemort.serialization.Serializer;
20 21 import voldemort.store.StoreDefinition;
21   -import voldemort.store.socket.SocketStoreFactory;
22   -import voldemort.store.socket.clientrequest.ClientRequestExecutorPool;
23 22 import voldemort.versioning.Versioned;
24 23
25 24 import com.google.common.base.Joiner;
@@ -33,21 +32,18 @@ public static void main(String args[]) throws IOException {
33 32 .withRequiredArg()
34 33 .describedAs("bootstrap-url")
35 34 .ofType(String.class);
36   - parser.accepts("first-id", "[REQUIRED] node id for first node")
  35 + parser.accepts("output-dir", "[REQUIRED] The output directory where we'll store the keys")
37 36 .withRequiredArg()
38   - .describedAs("node-id")
39   - .ofType(Integer.class);
40   - parser.accepts("second-id", "[REQUIRED] node id for second node")
41   - .withRequiredArg()
42   - .describedAs("node-id")
43   - .ofType(Integer.class);
44   - parser.accepts("store-name", "[REQUIRED] name of the store")
45   - .withRequiredArg()
46   - .describedAs("store-name")
  37 + .describedAs("output-dir")
47 38 .ofType(String.class);
48   - parser.accepts("skip-records", "number of records to skip [default: 0 i.e. none]")
  39 + parser.accepts("op", "Operation type (0 - gets keys [default], 1 - checks the keys")
49 40 .withRequiredArg()
  41 + .describedAs("op")
50 42 .ofType(Integer.class);
  43 + parser.accepts("num-keys", "Number of keys per store [ Default: 100 ]")
  44 + .withRequiredArg()
  45 + .describedAs("keys")
  46 + .ofType(Long.class);
51 47
52 48 OptionSet options = parser.parse(args);
53 49
@@ -56,11 +52,7 @@ public static void main(String args[]) throws IOException {
56 52 System.exit(0);
57 53 }
58 54
59   - Set<String> missing = CmdUtils.missing(options,
60   - "url",
61   - "first-id",
62   - "second-id",
63   - "store-name");
  55 + Set<String> missing = CmdUtils.missing(options, "url", "output-dir");
64 56 if(missing.size() > 0) {
65 57 System.err.println("Missing required arguments: " + Joiner.on(", ").join(missing));
66 58 parser.printHelpOn(System.err);
@@ -69,101 +61,113 @@ public static void main(String args[]) throws IOException {
69 61
70 62 // compulsory params
71 63 String url = (String) options.valueOf("url");
72   - Integer firstId = (Integer) options.valueOf("first-id");
73   - Integer secondId = (Integer) options.valueOf("second-id");
74   - String storeName = (String) options.valueOf("store-name");
75   -
76   - // optional params
77   - Integer skipRecords = CmdUtils.valueOf(options, "skip-records", 0);
  64 + String outputDirPath = (String) options.valueOf("output-dir");
  65 + int opType = CmdUtils.valueOf(options, "op", 0);
  66 + long numKeys = CmdUtils.valueOf(options, "num-keys", 100L);
78 67
79   - // Use admin client to get cluster / store definition
80   - AdminClient client = new AdminClient(url, new AdminClientConfig());
  68 + File outputDir = new File(outputDirPath);
81 69
82   - List<StoreDefinition> storeDefs = client.getRemoteStoreDefList(firstId).getValue();
83   - StoreDefinition storeDef = null;
84   - for(StoreDefinition def: storeDefs) {
85   - if(def.getName().compareTo(storeName) == 0) {
86   - storeDef = def;
87   - }
88   - }
89   -
90   - if(storeDef == null) {
91   - System.err.println("Store name mentioned not found");
  70 + if(!outputDir.exists()) {
  71 + outputDir.mkdirs();
  72 + } else if(!(outputDir.isDirectory() && outputDir.canWrite())) {
  73 + System.err.println("Cannot write to output directory " + outputDirPath);
92 74 parser.printHelpOn(System.err);
93 75 System.exit(1);
94 76 }
95 77
96   - // Find partitions which are replicated over to the other node
97   - Cluster cluster = client.getAdminClientCluster();
98   - Node firstNode = cluster.getNodeById(firstId), secondNode = cluster.getNodeById(secondId);
99   -
100   - RoutingStrategy strategy = new RoutingStrategyFactory().updateRoutingStrategy(storeDef,
101   - cluster);
102   -
103   - List<Integer> firstNodePartitionIds = firstNode.getPartitionIds();
104   - List<Integer> secondNodePartitionIds = secondNode.getPartitionIds();
105   -
106   - // This is list of partitions which we need to retrieve
107   - List<Integer> finalPartitionIds = new ArrayList<Integer>();
108   -
109   - for(Integer firstNodePartition: firstNodePartitionIds) {
110   - List<Integer> replicatingPartitionIds = strategy.getReplicatingPartitionList(firstNodePartition);
111   -
112   - // Check if replicating partition id is one of the partition ids
113   - for(Integer replicatingPartitionId: replicatingPartitionIds) {
114   - if(secondNodePartitionIds.contains(replicatingPartitionId)) {
115   - finalPartitionIds.add(firstNodePartition);
116   - break;
117   - }
118   - }
119   - }
120   -
121   - if(finalPartitionIds.size() == 0) {
122   - System.out.println("No partition found whose replica is on the other node");
123   - System.exit(0);
124   - }
125   - Iterator<Pair<ByteArray, Versioned<byte[]>>> iterator = client.fetchEntries(firstId,
126   - storeName,
127   - finalPartitionIds,
128   - null,
129   - false,
130   - skipRecords);
131   -
132   - // Get Socket store for other node
133   - SocketStoreFactory storeFactory = new ClientRequestExecutorPool(10, 10000, 10000, 64 * 1024);
134   - Store<ByteArray, byte[], byte[]> secondStore = null;
  78 + AdminClient adminClient = null;
135 79 try {
136   - secondStore = storeFactory.create(storeName,
137   - secondNode.getHost(),
138   - secondNode.getSocketPort(),
139   - RequestFormatType.VOLDEMORT_V3,
140   - RequestRoutingType.NORMAL);
141   -
142   - long totalKeyValues = 0, totalCorrect = 0;
143   - while(iterator.hasNext()) {
144   - Pair<ByteArray, Versioned<byte[]>> entry = iterator.next();
145   - List<Versioned<byte[]>> otherValues = secondStore.get(entry.getFirst(), null);
146   -
147   - totalKeyValues++;
148   - for(Versioned<byte[]> value: otherValues) {
149   - if(ByteUtils.compare(value.getValue(), entry.getSecond().getValue()) == 0) {
150   - totalCorrect++;
  80 + adminClient = new AdminClient(url, new AdminClientConfig().setMaxThreads(10));
  81 +
  82 + // Get store definition meta-data from node 0
  83 + List<StoreDefinition> storeDefs = adminClient.getRemoteStoreDefList(0).getValue();
  84 + Cluster cluster = adminClient.getAdminClientCluster();
  85 +
  86 + for(StoreDefinition storeDef: storeDefs) {
  87 + File storesKeyFile = new File(outputDir, storeDef.getName());
  88 + if(AdminClient.restoreStoreEngineBlackList.contains(storeDef.getType())) {
  89 + System.out.println("Ignoring store " + storeDef.getName());
  90 + continue;
  91 + } else {
  92 + System.out.println("Working on store " + storeDef.getName());
  93 + }
  94 + switch(opType) {
  95 + case 0:
  96 + default:
  97 + if(storesKeyFile.exists()) {
  98 + System.err.println("Key files for " + storeDef.getName()
  99 + + " already exists");
  100 + continue;
  101 + }
  102 + FileOutputStream writer = null;
  103 + try {
  104 + writer = new FileOutputStream(storesKeyFile);
  105 + Iterator<ByteArray> keys = adminClient.fetchKeys(0,
  106 + storeDef.getName(),
  107 + cluster.getNodeById(0)
  108 + .getPartitionIds(),
  109 + null,
  110 + false);
  111 + for(long keyId = 0; keyId < numKeys && keys.hasNext(); keyId++) {
  112 + ByteArray key = keys.next();
  113 + writer.write(key.length());
  114 + writer.write(key.get());
  115 + }
  116 +
  117 + } finally {
  118 + if(writer != null)
  119 + writer.close();
  120 + }
  121 + break;
  122 + case 1:
  123 + if(!(storesKeyFile.exists() && storesKeyFile.canRead())) {
  124 + System.err.println("Could not find " + storeDef.getName()
  125 + + " file to check");
  126 + continue;
  127 + }
  128 + FileInputStream reader = null;
  129 + SocketStoreClientFactory socketFactory = new SocketStoreClientFactory(new ClientConfig().setBootstrapUrls(url));
  130 + StoreClient storeClient = socketFactory.getStoreClient(storeDef.getName());
  131 +
  132 + DefaultSerializerFactory factory = new DefaultSerializerFactory();
  133 + Serializer<?> keySerializer = factory.getSerializer(storeDef.getKeySerializer());
  134 + long foundKeys = 0L;
  135 + long totalKeys = 0L;
  136 + try {
  137 + reader = new FileInputStream(storesKeyFile);
  138 + while(reader.available() != 0) {
  139 + int size = reader.read();
  140 +
  141 + if(size <= 0) {
  142 + break;
  143 + }
  144 +
  145 + // Read the key
  146 + byte[] key = new byte[size];
  147 + reader.read(key);
  148 +
  149 + Versioned<Object> value = storeClient.get(keySerializer.toObject(key));
  150 + if(value != null) {
  151 + foundKeys++;
  152 + }
  153 + totalKeys++;
  154 +
  155 + }
  156 + System.out.println("Found = " + foundKeys + " Total = " + totalKeys);
  157 + if(foundKeys > 0 && totalKeys > 0) {
  158 + System.out.println("%age found - " + (double) 100
  159 + * (foundKeys / totalKeys));
  160 + }
  161 + } finally {
  162 + if(reader != null)
  163 + reader.close();
  164 + }
151 165 break;
152   - }
153 166 }
154   -
155   - if(totalKeyValues % 1000 == 0)
156   - System.out.println("Final => Percent correct = "
157   - + (double) (totalCorrect / totalKeyValues) * 100);
158 167 }
159   - if(totalKeyValues > 0)
160   - System.out.println("Final => Percent correct = "
161   - + (double) (totalCorrect / totalKeyValues) * 100);
162   - else
163   - System.out.println("Final => Percent correct = 0");
164 168 } finally {
165   - if(secondStore != null)
166   - secondStore.close();
  169 + if(adminClient != null)
  170 + adminClient.stop();
167 171 }
168 172 }
169 173 }
4 src/java/voldemort/utils/RebalanceUtils.java
@@ -38,6 +38,7 @@
38 38 import voldemort.server.VoldemortConfig;
39 39 import voldemort.store.StoreDefinition;
40 40 import voldemort.store.mysql.MysqlStorageConfiguration;
  41 +import voldemort.store.views.ViewStorageConfiguration;
41 42 import voldemort.versioning.Occured;
42 43 import voldemort.versioning.VectorClock;
43 44 import voldemort.versioning.Versioned;
@@ -55,7 +56,8 @@
55 56
56 57 private static Logger logger = Logger.getLogger(RebalanceUtils.class);
57 58
58   - public final static List<String> rebalancingStoreEngineBlackList = Arrays.asList(MysqlStorageConfiguration.TYPE_NAME);
  59 + public final static List<String> rebalancingStoreEngineBlackList = Arrays.asList(MysqlStorageConfiguration.TYPE_NAME,
  60 + ViewStorageConfiguration.TYPE_NAME);
59 61
60 62 public static boolean containsNode(Cluster cluster, int nodeId) {
61 63 try {

0 comments on commit 5adf557

Please sign in to comment.
Something went wrong with that request. Please try again.