Skip to content

Commit

Permalink
First pass at liveness management system
Browse files Browse the repository at this point in the history
  • Loading branch information
jkominek committed Dec 19, 2021
1 parent 1b37439 commit bdb5700
Show file tree
Hide file tree
Showing 11 changed files with 344 additions and 10 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ CXX=c++
endif
CXXFLAGS=-std=c++17 -fPIC -g -Wall `pkg-config fuse --cflags` `pkg-config liblz4 --cflags` `pkg-config libzstd --cflags` `pkg-config foundationdb-client --cflags` #-DDEBUG=1

fs: main.o inflight.o util.o values.pb.o getattr.o lookup.o readdir.o read.o open.o release.o mknod.o unlink.o link.o readlink.o symlink.o setattr.o rename.o write.o forget.o statfs.o garbage_collector.o getxattr.o setxattr.o removexattr.o listxattr.o flush.o
fs: main.o liveness.o inflight.o util.o values.pb.o getattr.o lookup.o readdir.o read.o open.o release.o mknod.o unlink.o link.o readlink.o symlink.o setattr.o rename.o write.o forget.o statfs.o garbage_collector.o getxattr.o setxattr.o removexattr.o listxattr.o flush.o
$(CXX) $(CXXFLAGS) -pg -g -Wall $^ `pkg-config fuse --libs` `pkg-config --libs protobuf` `pkg-config --libs liblz4` `pkg-config --libs libzstd` `pkg-config --libs foundationdb-client` -lm -o fs

# close enough
Expand Down
19 changes: 19 additions & 0 deletions dumppt.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
#!/usr/bin/env python3

# This prints out the process table for the filesysem

import struct
import fdb
import values_pb2 as msgs

fdb.api_version(620)
db = fdb.open()

prefix = b'FSp'

for i in db[prefix:prefix+b'\xff']:
k = i.key[len(prefix):]

pte = msgs.ProcessTableEntry()
pte.ParseFromString(i.value)
print(k, pte)
32 changes: 31 additions & 1 deletion inflight.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,16 @@
#include "util.h"
#include "inflight.h"

bool shut_it_down_forever = false;

void shut_it_down() {
// all the future handlers will ENOSYS when they're
// invoked. we could do this a little bit faster if
// we kept a set (or something) of all of the inflights,
// so we could just go down the list and delete them all.
shut_it_down_forever = true;
}

/******************************************************
* This encapsulates all of our retry logic
*
Expand Down Expand Up @@ -74,6 +84,19 @@ Inflight::~Inflight()
#endif
}

void Inflight::start()
{
if(shut_it_down_forever) {
// abort everything immediately
fuse_reply_err(req, ENOSYS);
delete this;
return;
}

cb.emplace(issue());
begin_wait();
}

void Inflight::future_ready(FDBFuture *f)
{
if(!future_queue.empty()) {
Expand Down Expand Up @@ -141,7 +164,14 @@ void Inflight::wait_on_future(FDBFuture *f, unique_future *dest)
extern "C" void fdbfs_error_processor(FDBFuture *f, void *p)
{
Inflight *inflight = static_cast<Inflight*>(p);


if(shut_it_down_forever) {
// everything is to be terminated immediately
fuse_reply_err(inflight->req, ENOSYS);
delete inflight;
return;
}

fdb_error_t err = fdb_future_get_error(f);
// done with this either way.
fdb_future_destroy(f);
Expand Down
8 changes: 4 additions & 4 deletions inflight.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@
#include <time.h>
#endif

// Halt all inflights and prevent new ones from starting.
extern void shut_it_down();

struct FDBTransactionDeleter {
void operator()(FDBTransaction *t) {
fdb_transaction_destroy(t);
Expand Down Expand Up @@ -57,10 +60,7 @@ class Inflight {
fuse_req_t req;
bool suppress_errors = false;

void start() {
cb.emplace(issue());
begin_wait();
}
void start();

protected:
// constructor
Expand Down
24 changes: 22 additions & 2 deletions kvlayout.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,15 @@ Overall key scheme:

=== inodes ===
prefix 'i' inode -> contains basic attrs and symlink info
prefix 'i' inode \x00 lock -> info on whoever has the lock
prefix 'i' inode \x00 pending -> info on parties waiting for lock?

prefix 'i' inode \x00 pid1, anything -> info on a lock
prefix 'i' inode \x00 pid2, anything -> info on a lock
prefix 'i' inode \x01 pid1 -> per fuse-process use counter (64 bit LE)
prefix 'i' inode \x01 pid2 -> per fuse-process use counter (64 bit LE)
prefix 'i' inode \x02 xattr0 -> arbitrary number of extended attributes
prefix 'i' inode \x02 xattr1
prefix 'i' inode \x02 xattr2
prefix 'i' inode \xfe inodeattr -> pseudo keys to indicate inode updates
prefix 'd' inode filenameA-> dirents.
prefix 'd' inode filenameB
prefix 'd' inode filenameC
Expand All @@ -18,6 +20,21 @@ prefix 'f' inode offset2 'z'\x01\x00 -> block compressed with whatever algorithm
prefix 'f' inode offset3 'p'\x02\x00... -> key contains 512 bytes of parity for the block
there should be only one kv pair within a given offset(n) to offset(n+1)
range.
prefix 'p' pid -> ProcessTableEntry


The \x00 space for locks is adjacent to the inodes, because in a mandatory
locking system, you'd need to read them along with the inode. So we'll
store them for a single range read.

The \xfe space is to register common partial updates to an inode. For
instance, when reading blocks with noatime, the only thing you need to
know about an inode is existance and the size of it. So read the inode
without a read conflict range, but lay down a read conflict on the
inode size key. Anything changing the size will read/write conflict the
inode, and write conflict the inode size key. So another process only
changing the owner (which will read/write the inode, and write conflict
the inode owner attr) won't conflict all of the ongoing reads.

=== garbage ===
prefix 'g' inode -> indicates that nlinks in the inode has gone to zero,
Expand All @@ -37,6 +54,9 @@ if processes notice that another pid's entry has stopped updating,
they should delete it. the GC process can confirm the liveness of
all pids it encounters when cleaning up inodes.

similarly processes should watch for updates on their pid entry.
if it is updated or erased, they should react immediately.

=== meta ===
prefix 'M' -> metadata block. filesystem version and blocksize?

Expand Down
183 changes: 183 additions & 0 deletions liveness.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@

#define FUSE_USE_VERSION 26
#include <fuse_lowlevel.h>
#define FDB_API_VERSION 630
#include <foundationdb/fdb_c.h>

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <fcntl.h>
#include <assert.h>
#include <signal.h>
#include <pthread.h>
#include <stdbool.h>
#include <time.h>
#include <sys/utsname.h>
#include <sys/time.h>

#include <string>
#include <memory>
#include <mutex>

#include "util.h"
#include "inflight.h"
#include "values.pb.h"

/*************************************************************
* liveness management
*************************************************************
* initially for garbage collection purposes, filesystems need to be
* able to agree which processes are still running, and clean up
* records of dead ones.
*/

std::vector<uint8_t> pid;
ProcessTableEntry pt_entry;
struct fuse_session *fuse_session;
pthread_t liveness_thread;
bool terminate = true;

void send_pt_entry(bool startup)
{
std::function<int(FDBTransaction *)> f = [startup](FDBTransaction *t){
auto key = pack_pid_key(pid);
fdb_error_t err;
if(!startup) {
FDBFuture *g = fdb_transaction_get(t, key.data(), key.size(), 0);
err = fdb_future_block_until_ready(g);
if(err!=0) { throw err; }
fdb_bool_t present;
uint8_t const *value;
int value_length;
err = fdb_future_get_value(g, &present, &value, &value_length);
if(err!=0) { throw err; }

if(!present) {
// our entry in the table was removed.
terminate = true;

// kill everything in-flight
shut_it_down();

// stop processing filesystem calls immediately.
// the fuse loop won't notice this until the next
// filesystem operation comes in. sigh.
fuse_session_exit(fuse_session);

return 0;
}
}
int entry_size = pt_entry.ByteSizeLong();
uint8_t entry_buffer[entry_size];
pt_entry.SerializeToArray(entry_buffer, entry_size);

fdb_transaction_set(t, key.data(), key.size(),
entry_buffer, entry_size);
return 0;
};
run_sync_transaction(f);
}

const int liveness_refresh_sec = 0;
const int liveness_refresh_nsec = 500*1000000;

void update_pt_entry_time()
{
struct timespec tv;
clock_gettime(CLOCK_REALTIME, &tv);

pt_entry.mutable_last_updated()->set_sec(tv.tv_sec);
pt_entry.mutable_last_updated()->set_nsec(tv.tv_nsec);
}

std::mutex manager_running;
void *liveness_manager(void *ignore)
{
std::unique_lock<std::mutex> lock(manager_running);

pt_entry.set_pid(pid.data(), pid.size());
pt_entry.set_liveness_counter(0);
update_pt_entry_time();
struct utsname buf;
uname(&buf);
pt_entry.set_hostname(buf.nodename);

send_pt_entry(true);

while(!terminate) {
struct timespec sleep;
sleep.tv_sec = liveness_refresh_sec;
sleep.tv_nsec = liveness_refresh_nsec;
nanosleep(&sleep, NULL);

update_pt_entry_time();
pt_entry.set_liveness_counter(pt_entry.liveness_counter() + 1);

send_pt_entry(false);
}

return NULL;
}

void start_liveness(struct fuse_session *se)
{
// this probably isn't the best way to produce 128 bits in
// a std::vector, but, whatever.
for(int i=0; i<16; i++) {
pid.push_back(random() & 0xFF);
}

// we make main pass this in so it isn't floating around
// in every namespace.
fuse_session = se;

terminate = false;

pthread_create(&liveness_thread, NULL, liveness_manager, NULL);
}

// we're being called after unmount
void terminate_liveness()
{
terminate = true;

// wait until the liveness_manager is done
std::unique_lock<std::mutex> lock(manager_running);

// clear our PID record
std::function<int(FDBTransaction *)> f = [](FDBTransaction *t){
auto start = pack_pid_key(pid);
auto stop = start;
stop.push_back('\xff');
fdb_transaction_clear_range(t,
start.data(), start.size(),
stop.data(), stop.size());
return 0;
};
run_sync_transaction(f);

// TODO are all of our outstanding transactions dead?

int clears_per_batch = 64;
for(auto it = lookup_counts.cbegin();
it != lookup_counts.cend();) {
// it is at this moment that i begin to question my use of c++
std::function<decltype(lookup_counts.cbegin())(FDBTransaction *)> f =
[it, clears_per_batch](FDBTransaction *t) {
auto jt = it;
for(int count = 0; (jt != lookup_counts.cend()) && (count < clears_per_batch); jt++, count++) {
auto key = pack_inode_use_key(jt->first);
fdb_transaction_clear(t, key.data(), key.size());
}
return jt;
};
auto maybe_jt = run_sync_transaction(f);
if(maybe_jt)
it = *maybe_jt;
else
/* if there was failure, don't advance it; we'll try again. */;
}

}
9 changes: 9 additions & 0 deletions liveness.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
#ifndef _LIVENESS_H_
#define _LIVENESS_H_

extern std::vector<uint8_t> pid; // our unique id for use records

extern void start_liveness(struct fuse_session *);
extern void terminate_liveness();

#endif
4 changes: 4 additions & 0 deletions main.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include <memory>

#include "util.h"
#include "liveness.h"
#include "values.pb.h"
#include "garbage_collector.h"
#include "fdbfs_ops.h"
Expand Down Expand Up @@ -129,6 +130,8 @@ int main(int argc, char *argv[])

se = fuse_lowlevel_new(&args, &fdbfs_oper,
sizeof(fdbfs_oper), NULL);
start_liveness(se);

if (se != NULL)
{
if (fuse_set_signal_handlers(se) != -1)
Expand All @@ -141,6 +144,7 @@ int main(int argc, char *argv[])
fuse_session_destroy(se);
}
fuse_unmount(mountpoint, ch);
terminate_liveness();
}
fuse_opt_free_args(&args);

Expand Down
8 changes: 8 additions & 0 deletions util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,14 @@ std::vector<uint8_t> pack_garbage_key(fuse_ino_t ino)
return key;
}

std::vector<uint8_t> pack_pid_key(std::vector<uint8_t> p)
{
auto key = key_prefix;
key.push_back('p');
key.insert(key.end(), p.begin(), p.end());
return key;
}

std::vector<uint8_t> pack_inode_use_key(fuse_ino_t ino)
{
auto key = pack_inode_key(ino);
Expand Down
Loading

0 comments on commit bdb5700

Please sign in to comment.