Permalink
Browse files

Fix the replication stress test and make it pass

  • Loading branch information...
1 parent 19c26f2 commit 03169f7e6da0c54257378303b24a3e1d084cf417 @rescrv committed Jan 6, 2013
View
@@ -40,7 +40,7 @@ EXTRA_DIST = \
hyperclient/nodejs/async_test.js \
hyperclient/nodejs/hyperclient.cc \
wscript
-#doc/HyperDex-$(VERSION).pdf
+# XXX doc/HyperDex-$(VERSION).pdf
lib_LTLIBRARIES = \
libhyperclient.la \
@@ -54,8 +54,8 @@ include_HEADERS = \
bin_PROGRAMS = hyperdex \
hyperdex-daemon \
hyperdex-search-stress-test \
+ hyperdex-replication-stress-test \
hyperdex-simple-consistency-stress-test
-# XXX hyperdex-replication-stress-test
hyperdexexecdir = $(exec_prefix)/libexec/$(PACKAGE)-$(VERSION)
hyperdexexec_PROGRAMS = \
@@ -208,6 +208,10 @@ hyperdex_daemon_LDADD = \
-lreplicant -lcityhash -lpopt -lglog -lrt -lpthread
hyperdex_daemon_CPPFLAGS = $(CPPFLAGS)
+daemon_test_index_encode_SOURCES = runner.cc daemon/test/index_encode.cc daemon/index_encode.cc common/float_encode.cc
+daemon_test_index_encode_CPPFLAGS = $(GTEST_CPPFLAGS) $(CPPFLAGS)
+daemon_test_index_encode_LDADD = $(GTEST_LDFLAGS) -lgtest -lpthread
+
################################################################################
################################## Coordinator #################################
################################################################################
@@ -420,21 +424,11 @@ endif
################################### Binaries ###################################
################################################################################
-#hyperdex_replication_stress_test_SOURCES = \
-# replication-stress-test.cc
-#hyperdex_replication_stress_test_CPPFLAGS = \
-# $(E_CFLAGS) \
-# $(CPPFLAGS)
-#hyperdex_replication_stress_test_LDADD = \
-# libhyperclient.la \
-# -lpopt
+hyperdex_replication_stress_test_SOURCES = replication-stress-test.cc
+hyperdex_replication_stress_test_LDADD = libhyperclient.la -lpopt
hyperdex_search_stress_test_SOURCES = search-stress-test.cc test/common.cc
hyperdex_search_stress_test_LDADD = libhyperclient.la -lpopt
hyperdex_simple_consistency_stress_test_SOURCES = simple-consistency-stress-test.cc
hyperdex_simple_consistency_stress_test_LDADD = libhyperclient.la -lpopt -lpthread
-
-daemon_test_index_encode_SOURCES = runner.cc daemon/test/index_encode.cc daemon/index_encode.cc common/float_encode.cc
-daemon_test_index_encode_CPPFLAGS = $(GTEST_CPPFLAGS) $(CPPFLAGS)
-daemon_test_index_encode_LDADD = $(GTEST_LDFLAGS) -lgtest -lpthread
View
@@ -547,7 +547,7 @@ configuration :: point_leader(const char* sname, const e::slice& key)
for (size_t pl = 0; pl < m_spaces[s].subspaces[0].regions.size(); ++pl)
{
if (m_spaces[s].subspaces[0].regions[pl].lower_coord[0] <= h &&
- h < m_spaces[s].subspaces[0].regions[pl].upper_coord[0])
+ h <= m_spaces[s].subspaces[0].regions[pl].upper_coord[0])
{
assert(!m_spaces[s].subspaces[0].regions[pl].replicas.empty());
return m_spaces[s].subspaces[0].regions[pl].replicas[0].vsi;
@@ -580,7 +580,7 @@ configuration :: point_leader(const region_id& rid, const e::slice& key)
for (size_t pl = 0; pl < m_spaces[s].subspaces[0].regions.size(); ++pl)
{
if (m_spaces[s].subspaces[0].regions[pl].lower_coord[0] <= h &&
- h < m_spaces[s].subspaces[0].regions[pl].upper_coord[0])
+ h <= m_spaces[s].subspaces[0].regions[pl].upper_coord[0])
{
assert(!m_spaces[s].subspaces[0].regions[pl].replicas.empty());
return m_spaces[s].subspaces[0].regions[pl].replicas[0].vsi;
@@ -631,7 +631,7 @@ configuration :: lookup_region(const subspace_id& ssid,
{
assert(a < m_spaces[s].schema.attrs_sz);
matches &= m_spaces[s].subspaces[ss].regions[r].lower_coord[a] <= hashes[m_spaces[s].subspaces[ss].attrs[a]] &&
- hashes[m_spaces[s].subspaces[ss].attrs[a]] < m_spaces[s].subspaces[ss].regions[r].upper_coord[a];
+ hashes[m_spaces[s].subspaces[ss].attrs[a]] <= m_spaces[s].subspaces[ss].regions[r].upper_coord[a];
}
if (matches)
View
@@ -415,6 +415,7 @@ datalayer :: get(const region_id& ri,
datalayer::returncode
datalayer :: put(const region_id& ri,
+ const region_id& reg_id,
uint64_t seq_id,
const e::slice& key,
const std::vector<e::slice>& value,
@@ -445,7 +446,7 @@ datalayer :: put(const region_id& ri,
{
char abacking[sizeof(uint8_t) + sizeof(uint64_t) + sizeof(uint64_t)];
abacking[0] = 'a';
- e::pack64be(ri.get(), abacking + sizeof(uint8_t));
+ e::pack64be(reg_id.get(), abacking + sizeof(uint8_t));
e::pack64be(seq_id, abacking + sizeof(uint8_t) + sizeof(uint64_t));
leveldb::Slice akey(abacking, sizeof(uint8_t) + sizeof(uint64_t) + sizeof(uint64_t));
leveldb::Slice aval("", 0);
@@ -502,6 +503,7 @@ datalayer :: put(const region_id& ri,
datalayer::returncode
datalayer :: del(const region_id& ri,
+ const region_id& reg_id,
uint64_t seq_id,
const e::slice& key)
{
@@ -527,7 +529,7 @@ datalayer :: del(const region_id& ri,
{
char abacking[sizeof(uint8_t) + sizeof(uint64_t) + sizeof(uint64_t)];
abacking[0] = 'a';
- e::pack64be(ri.get(), abacking + sizeof(uint8_t));
+ e::pack64be(reg_id.get(), abacking + sizeof(uint8_t));
e::pack64be(seq_id, abacking + sizeof(uint8_t) + sizeof(uint64_t));
leveldb::Slice akey(abacking, sizeof(uint8_t) + sizeof(uint64_t) + sizeof(uint64_t));
leveldb::Slice aval("", 0);
View
@@ -102,11 +102,13 @@ class datalayer
uint64_t* version,
reference* ref);
returncode put(const region_id& ri,
+ const region_id& reg_id,
uint64_t seq_id,
const e::slice& key,
const std::vector<e::slice>& value,
uint64_t version);
returncode del(const region_id& ri,
+ const region_id& reg_id,
uint64_t seq_id,
const e::slice& key);
returncode make_snapshot(const region_id& ri,
@@ -188,7 +188,7 @@ replication_manager :: client_atomic(const server_id& from,
e::intrusive_ptr<keyholder> kh = get_or_create_keyholder(ri, key);
bool has_old_value = false;
uint64_t old_version = 0;
- std::vector<e::slice>* old_value = NULL; // don't use pointer?
+ std::vector<e::slice>* old_value = NULL;
kh->get_latest_version(&has_old_value, &old_version, &old_value);
if (erase && !funcs->empty())
@@ -292,13 +292,6 @@ replication_manager :: chain_op(const virtual_server_id& from,
HOLD_LOCK_FOR_KEY(ri, key);
e::intrusive_ptr<keyholder> kh = get_or_create_keyholder(ri, key);
- if (reg_id != ri)
- {
- LOG(ERROR) << "dropping CHAIN_OP send to the wrong region";
- CLEANUP_KEYHOLDER(ri, key, kh);
- return;
- }
-
// Check that a chain's put matches the dimensions of the space.
if (has_value && sc->attrs_sz != value.size() + 1)
{
@@ -361,13 +354,6 @@ replication_manager :: chain_subspace(const virtual_server_id& from,
HOLD_LOCK_FOR_KEY(ri, key);
e::intrusive_ptr<keyholder> kh = get_or_create_keyholder(ri, key);
- if (reg_id != ri)
- {
- LOG(ERROR) << "dropping CHAIN_OP send to the wrong region";
- CLEANUP_KEYHOLDER(ri, key, kh);
- return;
- }
-
// Check that a chain's put matches the dimensions of the space.
if (sc->attrs_sz != value.size() + 1 || sc->attrs_sz != hashes.size())
{
@@ -404,9 +390,9 @@ replication_manager :: chain_subspace(const virtual_server_id& from,
}
if (!(new_pend->this_old_region == m_daemon->m_config.get_region_id(from) &&
- m_daemon->m_config.tail_of_region(ri) == from) &&
+ m_daemon->m_config.tail_of_region(new_pend->this_old_region) == from) &&
!(new_pend->this_new_region == m_daemon->m_config.get_region_id(from) &&
- m_daemon->m_config.next_in_region(from) != to))
+ m_daemon->m_config.next_in_region(from) == to))
{
LOG(INFO) << "dropping CHAIN_SUBSPACE which didn't obey chaining rules";
CLEANUP_KEYHOLDER(ri, key, kh);
@@ -444,13 +430,6 @@ replication_manager :: chain_ack(const virtual_server_id& from,
return;
}
- if (reg_id != ri)
- {
- LOG(ERROR) << "dropping CHAIN_ACK send to the wrong region";
- CLEANUP_KEYHOLDER(ri, key, kh);
- return;
- }
-
e::intrusive_ptr<pending> pend = kh->get_by_version(version);
if (!pend)
@@ -492,19 +471,18 @@ replication_manager :: chain_ack(const virtual_server_id& from,
if (kh->version_on_disk() < version)
{
- assert(reg_id == ri);
e::intrusive_ptr<pending> op = kh->get_by_version(version);
assert(op);
datalayer::returncode rc;
if (!op->has_value || (op->this_old_region != op->this_new_region && ri == op->this_old_region))
{
- rc = m_daemon->m_data.del(ri, seq_id, key);
+ rc = m_daemon->m_data.del(ri, reg_id, seq_id, key);
}
else
{
- rc = m_daemon->m_data.put(ri, seq_id, key, op->value, version);
+ rc = m_daemon->m_data.put(ri, reg_id, seq_id, key, op->value, version);
}
switch (rc)
@@ -726,31 +704,41 @@ replication_manager :: move_operations_between_queues(const virtual_server_id& u
uint64_t old_version = 0;
std::vector<e::slice>* old_value = NULL;
kh->get_latest_version(&has_old_value, &old_version, &old_value);
+ if (old_version >= kh->oldest_deferred_version()) LOG(INFO) << "VERSIONS " << old_version << " " << kh->oldest_deferred_version();
assert(old_version < kh->oldest_deferred_version());
e::intrusive_ptr<pending> new_pend = kh->oldest_deferred_op();
+ // If the version numbers don't line up, and this is not fresh, and it
+ // is not a subspace transfer
if (old_version + 1 != kh->oldest_deferred_version() &&
- !new_pend->fresh)
+ !new_pend->fresh &&
+ (new_pend->this_old_region == new_pend->this_new_region ||
+ new_pend->this_old_region == ri))
{
break;
}
- hash_objects(ri, sc, key, new_pend->has_value, new_pend->value, has_old_value, old_value ? *old_value : new_pend->value, new_pend);
-
- if (new_pend->this_old_region != ri && new_pend->this_new_region != ri)
+ // If this is not a subspace transfer
+ if (new_pend->this_old_region == new_pend->this_new_region ||
+ new_pend->this_old_region == ri)
{
- LOG(INFO) << "dropping deferred CHAIN_* which didn't get sent to the right host";
- kh->pop_oldest_deferred();
- continue;
- }
+ hash_objects(ri, sc, key, new_pend->has_value, new_pend->value, has_old_value, old_value ? *old_value : new_pend->value, new_pend);
- if (new_pend->recv != virtual_server_id() &&
- m_daemon->m_config.next_in_region(new_pend->recv) != us &&
- !m_daemon->m_config.subspace_adjacent(new_pend->recv, us))
- {
- LOG(INFO) << "dropping deferred CHAIN_* which didn't come from the right host";
- kh->pop_oldest_deferred();
- continue;
+ if (new_pend->this_old_region != ri && new_pend->this_new_region != ri)
+ {
+ LOG(INFO) << "dropping deferred CHAIN_* which didn't get sent to the right host";
+ kh->pop_oldest_deferred();
+ continue;
+ }
+
+ if (new_pend->recv != virtual_server_id() &&
+ m_daemon->m_config.next_in_region(new_pend->recv) != us &&
+ !m_daemon->m_config.subspace_adjacent(new_pend->recv, us))
+ {
+ LOG(INFO) << "dropping deferred CHAIN_* which didn't come from the right host";
+ kh->pop_oldest_deferred();
+ continue;
+ }
}
kh->shift_one_deferred_to_blocked();
@@ -63,13 +63,13 @@ replication_manager :: keyholder :: get_latest_version(bool* has_old_value,
if (has_blocked_ops())
{
- *has_old_value = true;
+ *has_old_value = most_recent_blocked_op()->has_value;
*old_version = most_recent_blocked_version();
*old_value = &most_recent_blocked_op()->value;
}
else if (has_committable_ops())
{
- *has_old_value = true;
+ *has_old_value = most_recent_committable_op()->has_value;
*old_version = most_recent_committable_version();
*old_value = &most_recent_committable_op()->value;
}
@@ -246,7 +246,7 @@ state_transfer_manager :: xfer_op(const virtual_server_id& from,
{
while (tis->del_iter.valid() && tis->del_iter.key() < op->key)
{
- m_daemon->m_data.del(tis->xfer.rid, 0, tis->del_iter.key());
+ m_daemon->m_data.del(tis->xfer.rid, region_id(), 0, tis->del_iter.key());
tis->del_iter.next();
}
}
@@ -259,7 +259,7 @@ state_transfer_manager :: xfer_op(const virtual_server_id& from,
while (tis->del_iter.valid())
{
- datalayer::returncode rc = m_daemon->m_data.del(tis->xfer.rid, 0, tis->del_iter.key());
+ datalayer::returncode rc = m_daemon->m_data.del(tis->xfer.rid, region_id(), 0, tis->del_iter.key());
tis->del_iter.next();
switch (rc)
@@ -282,7 +282,7 @@ state_transfer_manager :: xfer_op(const virtual_server_id& from,
if (op->has_value)
{
- datalayer::returncode rc = m_daemon->m_data.put(tis->xfer.rid, 0, op->key, op->value, op->version);
+ datalayer::returncode rc = m_daemon->m_data.put(tis->xfer.rid, region_id(), 0, op->key, op->value, op->version);
switch (rc)
{
@@ -302,7 +302,7 @@ state_transfer_manager :: xfer_op(const virtual_server_id& from,
}
else
{
- datalayer::returncode rc = m_daemon->m_data.del(tis->xfer.rid, 0, op->key);
+ datalayer::returncode rc = m_daemon->m_data.del(tis->xfer.rid, region_id(), 0, op->key);
switch (rc)
{
Oops, something went wrong.

0 comments on commit 03169f7

Please sign in to comment.