Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP

Comparing changes

Choose two branches to see what’s changed or to start a new pull request. If you need to, you can also compare across forks.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also compare across forks.
base fork: limaoscarjuliet/HyperDex
base: e4ddfcc194
...
head fork: limaoscarjuliet/HyperDex
compare: 4a3eefdded
  • 2 commits
  • 7 files changed
  • 0 commit comments
  • 1 contributor
Commits on Apr 22, 2012
@limaoscarjuliet qiuesce in daemon (work in progress)
Signed-off-by: Pawel Loj <pawel.loj@gmail.com>
d88906f
@limaoscarjuliet dump state is functional now, restore in progress
Signed-off-by: Pawel Loj <pawel.loj@gmail.com>
4a3eefd
View
26 hyperdaemon/datalayer.cc 100644 → 100755
@@ -83,6 +83,8 @@ hyperdaemon :: datalayer :: datalayer(coordinatorlink* cl, const po6::pathname&
, m_optimistic_rr()
, m_last_dose_of_optimism(0)
, m_flushed_recently(false)
+ , m_quiesce(false)
+ , m_quiesce_state_id("")
{
m_optimistic_io_thread.start();
@@ -135,9 +137,29 @@ hyperdaemon :: datalayer :: prepare(const configuration& newconfig, const instan
}
void
-hyperdaemon :: datalayer :: reconfigure(const configuration&, const instance&)
+hyperdaemon :: datalayer :: reconfigure(const configuration& newconfig, const instance&)
{
- // Do nothing.
+ // Quiesce (will quiesce multiple times if requested so).
+ if (newconfig.quiesce())
+ {
+ m_quiesce = newconfig.quiesce();
+ m_quiesce_state_id = newconfig.quiesce_state_id();
+
+ // Quiesce the disks.
+ for (disk_map_t::iterator d = m_disks.begin(); d != m_disks.end(); d.next())
+ {
+ try
+ {
+ // XXX fail this region.
+ d.value()->quiesce(m_quiesce_state_id);
+ }
+ catch (po6::error& e)
+ {
+ PLOG(ERROR) << "Could not quiesce disk " << d.key();
+ return;
+ }
+ }
+ }
}
void
View
2  hyperdaemon/datalayer.h 100644 → 100755
@@ -127,6 +127,8 @@ class datalayer
std::list<hyperdex::regionid> m_optimistic_rr;
uint64_t m_last_dose_of_optimism;
volatile bool m_flushed_recently;
+ bool m_quiesce;
+ std::string m_quiesce_state_id;
};
} // namespace hyperdaemon
View
6 hyperdaemon/replication_manager.cc
@@ -130,7 +130,7 @@ hyperdaemon :: replication_manager :: prepare(const configuration&, const instan
void
hyperdaemon :: replication_manager :: reconfigure(const configuration& newconfig, const instance& us)
{
- // Quiesce request?
+ // Quiesce (will quiesce multiple times if requested so).
if (newconfig.quiesce())
{
m_quiesce = newconfig.quiesce();
@@ -153,7 +153,8 @@ hyperdaemon :: replication_manager :: reconfigure(const configuration& newconfig
}
}
- // If quiescing, wait for replication-related state to be cleaned up.
+ // If quiescing, wait for replication-related state to be cleaned up before
+ // ACK-ing the config (ack happens in the daemon, after reconfigure completes).
if (m_quiesce)
{
while (true)
@@ -1551,6 +1552,7 @@ int
hyperdaemon :: replication_manager :: retransmit()
{
int processed = 0;
+
for (keyholder_map_t::iterator khiter = m_keyholders.begin();
khiter != m_keyholders.end(); khiter.next())
{
View
3  hyperdex/configuration_parser.cc
@@ -41,6 +41,9 @@ hyperdex :: configuration_parser :: configuration_parser()
, m_regions()
, m_entities()
, m_transfers()
+ , m_quiesce(false)
+ , m_quiesce_state_id("")
+ , m_shutdown(false)
{
}
View
59 hyperdisk/disk.cc
@@ -93,16 +93,18 @@ hyperdisk :: disk :: create(const po6::pathname& directory,
}
bool
-hyperdisk :: disk :: dump_state()
+hyperdisk :: disk :: quiesce(const std::string& quiesce_state_id)
{
// Flush all data to O/S buffers.
- while (true)
+ bool flushed = false;
+ while (!flushed)
{
returncode rc = flush();
switch (rc)
{
case DIDNOTHING:
// All data is flushed, move on.
+ flushed = true;
break;
case SUCCESS:
// Some data flushed, try again.
@@ -124,6 +126,13 @@ hyperdisk :: disk :: dump_state()
return false;
}
+ // Persist the state into a file.
+ return dump_state();
+}
+
+bool
+hyperdisk :: disk :: dump_state()
+{
// Dump state information.
e::intrusive_ptr<shard_vector> shards;
{
@@ -135,7 +144,7 @@ hyperdisk :: disk :: dump_state()
s << "version " << STATE_FILE_VER << std::endl;
for (size_t i = 0; i < shards->size(); ++i)
{
- hyperspacehashing::mask::coordinate c = shards->get_coordinate(i);
+ coordinate c = shards->get_coordinate(i);
uint32_t o = shards->get_offset(i);
s << "shard";
s << " " << c.primary_mask;
@@ -232,19 +241,19 @@ hyperdisk :: disk :: load_state()
}
// Restore the shards.
- po6::threads::mutex::hold a(&m_shards_mutate);
- po6::threads::mutex::hold b(&m_shards_lock);
-
+ std::vector<std::pair<coordinate, e::intrusive_ptr<shard> > > shards;
while (!f.eof())
{
- std::string s;
- f >> s;
- if (f.fail() || "shard" != s)
+ // Line header.
+ std::string h;
+ f >> h;
+ if (f.fail() || "shard" != h)
{
return false;
}
- uint64_t ct[6];
+ // Coordinate.
+ uint64_t ct[6] = {-1, -1, -1, -1, -1 ,-1};
for (int i=0; i<6; i++)
{
f >> ct[i];
@@ -254,30 +263,28 @@ hyperdisk :: disk :: load_state()
return false;
}
+ // Offset.
uint32_t o = -1;
f >> o;
if (f.fail())
{
return false;
}
-
-/*
-Add coordinate and shard to vector one by one.
-
-e::intrusive_ptr<shard> s = open(start);
-
-coordinate c - restore from the file data
-
-e::intrusive_ptr<hyperdisk::shard>
-hyperdisk :: shard :: open(const po6::io::fd& base,
- const po6::pathname& filename)
-m_shards = ...
-
-*/
-
+ // Reopen the shard.
+ coordinate c(ct[0], ct[1], ct[2], ct[3], ct[4], ct[5]);
+ po6::pathname path = shard_filename(c);
+ // XXX need to set the offset - inside open?
+ e::intrusive_ptr<shard> s = hyperdisk::shard::open(m_base, path);
+
+ shards.push_back(std::make_pair(c, s));
}
-
+
+ // Use the reopened shards.
+ po6::threads::mutex::hold a(&m_shards_mutate);
+ po6::threads::mutex::hold b(&m_shards_lock);
+ m_shards = new shard_vector(1, &shards);
+
return true;
}
View
4 hyperdisk/hyperdisk/disk.h
@@ -122,6 +122,10 @@ class disk
returncode async();
returncode sync();
+ public:
+ // Quiesce.
+ bool quiesce(const std::string& quiesce_state_id);
+
private:
friend class e::intrusive_ptr<disk>;
class stored;
View
4 hyperdisk/shard_vector.h 100644 → 100755
@@ -48,6 +48,8 @@ class shard_vector
{
public:
shard_vector(const hyperspacehashing::mask::coordinate& coord, e::intrusive_ptr<shard> s);
+ shard_vector(uint64_t generation,
+ std::vector<std::pair<hyperspacehashing::mask::coordinate, e::intrusive_ptr<shard> > >* newvec);
public:
size_t size() const;
@@ -69,8 +71,6 @@ class shard_vector
friend class e::intrusive_ptr<shard_vector>;
private:
- shard_vector(uint64_t generation,
- std::vector<std::pair<hyperspacehashing::mask::coordinate, e::intrusive_ptr<shard> > >* newvec);
~shard_vector() throw ();
private:

No commit comments for this range

Something went wrong with that request. Please try again.