diff --git a/Makefile b/Makefile index b37e3cc..5752542 100644 --- a/Makefile +++ b/Makefile @@ -4,7 +4,7 @@ CXX=c++ endif CXXFLAGS=-std=c++17 -fPIC -g -Wall `pkg-config fuse --cflags` `pkg-config liblz4 --cflags` `pkg-config libzstd --cflags` `pkg-config foundationdb-client --cflags` #-DDEBUG=1 -fs: main.o inflight.o util.o values.pb.o getattr.o lookup.o readdir.o read.o open.o release.o mknod.o unlink.o link.o readlink.o symlink.o setattr.o rename.o write.o forget.o statfs.o garbage_collector.o getxattr.o setxattr.o removexattr.o listxattr.o flush.o +fs: main.o liveness.o inflight.o util.o values.pb.o getattr.o lookup.o readdir.o read.o open.o release.o mknod.o unlink.o link.o readlink.o symlink.o setattr.o rename.o write.o forget.o statfs.o garbage_collector.o getxattr.o setxattr.o removexattr.o listxattr.o flush.o $(CXX) $(CXXFLAGS) -pg -g -Wall $^ `pkg-config fuse --libs` `pkg-config --libs protobuf` `pkg-config --libs liblz4` `pkg-config --libs libzstd` `pkg-config --libs foundationdb-client` -lm -o fs # close enough diff --git a/dumppt.py b/dumppt.py new file mode 100755 index 0000000..38887bc --- /dev/null +++ b/dumppt.py @@ -0,0 +1,19 @@ +#!/usr/bin/env python3 + +# This prints out the process table for the filesysem + +import struct +import fdb +import values_pb2 as msgs + +fdb.api_version(620) +db = fdb.open() + +prefix = b'FSp' + +for i in db[prefix:prefix+b'\xff']: + k = i.key[len(prefix):] + + pte = msgs.ProcessTableEntry() + pte.ParseFromString(i.value) + print(k, pte) diff --git a/inflight.cc b/inflight.cc index fe5d177..f5717a0 100644 --- a/inflight.cc +++ b/inflight.cc @@ -21,6 +21,16 @@ #include "util.h" #include "inflight.h" +bool shut_it_down_forever = false; + +void shut_it_down() { + // all the future handlers will ENOSYS when they're + // invoked. we could do this a little bit faster if + // we kept a set (or something) of all of the inflights, + // so we could just go down the list and delete them all. + shut_it_down_forever = true; +} + /****************************************************** * This encapsulates all of our retry logic * @@ -74,6 +84,19 @@ Inflight::~Inflight() #endif } +void Inflight::start() +{ + if(shut_it_down_forever) { + // abort everything immediately + fuse_reply_err(req, ENOSYS); + delete this; + return; + } + + cb.emplace(issue()); + begin_wait(); +} + void Inflight::future_ready(FDBFuture *f) { if(!future_queue.empty()) { @@ -141,7 +164,14 @@ void Inflight::wait_on_future(FDBFuture *f, unique_future *dest) extern "C" void fdbfs_error_processor(FDBFuture *f, void *p) { Inflight *inflight = static_cast(p); - + + if(shut_it_down_forever) { + // everything is to be terminated immediately + fuse_reply_err(inflight->req, ENOSYS); + delete inflight; + return; + } + fdb_error_t err = fdb_future_get_error(f); // done with this either way. fdb_future_destroy(f); diff --git a/inflight.h b/inflight.h index f090bd2..7ec2020 100644 --- a/inflight.h +++ b/inflight.h @@ -14,6 +14,9 @@ #include #endif +// Halt all inflights and prevent new ones from starting. +extern void shut_it_down(); + struct FDBTransactionDeleter { void operator()(FDBTransaction *t) { fdb_transaction_destroy(t); @@ -57,10 +60,7 @@ class Inflight { fuse_req_t req; bool suppress_errors = false; - void start() { - cb.emplace(issue()); - begin_wait(); - } + void start(); protected: // constructor diff --git a/kvlayout.txt b/kvlayout.txt index 579c53e..c7783e4 100644 --- a/kvlayout.txt +++ b/kvlayout.txt @@ -2,13 +2,15 @@ Overall key scheme: === inodes === prefix 'i' inode -> contains basic attrs and symlink info -prefix 'i' inode \x00 lock -> info on whoever has the lock -prefix 'i' inode \x00 pending -> info on parties waiting for lock? + +prefix 'i' inode \x00 pid1, anything -> info on a lock +prefix 'i' inode \x00 pid2, anything -> info on a lock prefix 'i' inode \x01 pid1 -> per fuse-process use counter (64 bit LE) prefix 'i' inode \x01 pid2 -> per fuse-process use counter (64 bit LE) prefix 'i' inode \x02 xattr0 -> arbitrary number of extended attributes prefix 'i' inode \x02 xattr1 prefix 'i' inode \x02 xattr2 +prefix 'i' inode \xfe inodeattr -> pseudo keys to indicate inode updates prefix 'd' inode filenameA-> dirents. prefix 'd' inode filenameB prefix 'd' inode filenameC @@ -18,6 +20,21 @@ prefix 'f' inode offset2 'z'\x01\x00 -> block compressed with whatever algorithm prefix 'f' inode offset3 'p'\x02\x00... -> key contains 512 bytes of parity for the block there should be only one kv pair within a given offset(n) to offset(n+1) range. +prefix 'p' pid -> ProcessTableEntry + + +The \x00 space for locks is adjacent to the inodes, because in a mandatory +locking system, you'd need to read them along with the inode. So we'll +store them for a single range read. + +The \xfe space is to register common partial updates to an inode. For +instance, when reading blocks with noatime, the only thing you need to +know about an inode is existance and the size of it. So read the inode +without a read conflict range, but lay down a read conflict on the +inode size key. Anything changing the size will read/write conflict the +inode, and write conflict the inode size key. So another process only +changing the owner (which will read/write the inode, and write conflict +the inode owner attr) won't conflict all of the ongoing reads. === garbage === prefix 'g' inode -> indicates that nlinks in the inode has gone to zero, @@ -37,6 +54,9 @@ if processes notice that another pid's entry has stopped updating, they should delete it. the GC process can confirm the liveness of all pids it encounters when cleaning up inodes. +similarly processes should watch for updates on their pid entry. +if it is updated or erased, they should react immediately. + === meta === prefix 'M' -> metadata block. filesystem version and blocksize? diff --git a/liveness.cc b/liveness.cc new file mode 100644 index 0000000..8fb9276 --- /dev/null +++ b/liveness.cc @@ -0,0 +1,183 @@ + +#define FUSE_USE_VERSION 26 +#include +#define FDB_API_VERSION 630 +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include + +#include "util.h" +#include "inflight.h" +#include "values.pb.h" + +/************************************************************* + * liveness management + ************************************************************* + * initially for garbage collection purposes, filesystems need to be + * able to agree which processes are still running, and clean up + * records of dead ones. + */ + +std::vector pid; +ProcessTableEntry pt_entry; +struct fuse_session *fuse_session; +pthread_t liveness_thread; +bool terminate = true; + +void send_pt_entry(bool startup) +{ + std::function f = [startup](FDBTransaction *t){ + auto key = pack_pid_key(pid); + fdb_error_t err; + if(!startup) { + FDBFuture *g = fdb_transaction_get(t, key.data(), key.size(), 0); + err = fdb_future_block_until_ready(g); + if(err!=0) { throw err; } + fdb_bool_t present; + uint8_t const *value; + int value_length; + err = fdb_future_get_value(g, &present, &value, &value_length); + if(err!=0) { throw err; } + + if(!present) { + // our entry in the table was removed. + terminate = true; + + // kill everything in-flight + shut_it_down(); + + // stop processing filesystem calls immediately. + // the fuse loop won't notice this until the next + // filesystem operation comes in. sigh. + fuse_session_exit(fuse_session); + + return 0; + } + } + int entry_size = pt_entry.ByteSizeLong(); + uint8_t entry_buffer[entry_size]; + pt_entry.SerializeToArray(entry_buffer, entry_size); + + fdb_transaction_set(t, key.data(), key.size(), + entry_buffer, entry_size); + return 0; + }; + run_sync_transaction(f); +} + +const int liveness_refresh_sec = 0; +const int liveness_refresh_nsec = 500*1000000; + +void update_pt_entry_time() +{ + struct timespec tv; + clock_gettime(CLOCK_REALTIME, &tv); + + pt_entry.mutable_last_updated()->set_sec(tv.tv_sec); + pt_entry.mutable_last_updated()->set_nsec(tv.tv_nsec); +} + +std::mutex manager_running; +void *liveness_manager(void *ignore) +{ + std::unique_lock lock(manager_running); + + pt_entry.set_pid(pid.data(), pid.size()); + pt_entry.set_liveness_counter(0); + update_pt_entry_time(); + struct utsname buf; + uname(&buf); + pt_entry.set_hostname(buf.nodename); + + send_pt_entry(true); + + while(!terminate) { + struct timespec sleep; + sleep.tv_sec = liveness_refresh_sec; + sleep.tv_nsec = liveness_refresh_nsec; + nanosleep(&sleep, NULL); + + update_pt_entry_time(); + pt_entry.set_liveness_counter(pt_entry.liveness_counter() + 1); + + send_pt_entry(false); + } + + return NULL; +} + +void start_liveness(struct fuse_session *se) +{ + // this probably isn't the best way to produce 128 bits in + // a std::vector, but, whatever. + for(int i=0; i<16; i++) { + pid.push_back(random() & 0xFF); + } + + // we make main pass this in so it isn't floating around + // in every namespace. + fuse_session = se; + + terminate = false; + + pthread_create(&liveness_thread, NULL, liveness_manager, NULL); +} + +// we're being called after unmount +void terminate_liveness() +{ + terminate = true; + + // wait until the liveness_manager is done + std::unique_lock lock(manager_running); + + // clear our PID record + std::function f = [](FDBTransaction *t){ + auto start = pack_pid_key(pid); + auto stop = start; + stop.push_back('\xff'); + fdb_transaction_clear_range(t, + start.data(), start.size(), + stop.data(), stop.size()); + return 0; + }; + run_sync_transaction(f); + + // TODO are all of our outstanding transactions dead? + + int clears_per_batch = 64; + for(auto it = lookup_counts.cbegin(); + it != lookup_counts.cend();) { + // it is at this moment that i begin to question my use of c++ + std::function f = + [it, clears_per_batch](FDBTransaction *t) { + auto jt = it; + for(int count = 0; (jt != lookup_counts.cend()) && (count < clears_per_batch); jt++, count++) { + auto key = pack_inode_use_key(jt->first); + fdb_transaction_clear(t, key.data(), key.size()); + } + return jt; + }; + auto maybe_jt = run_sync_transaction(f); + if(maybe_jt) + it = *maybe_jt; + else + /* if there was failure, don't advance it; we'll try again. */; + } + +} diff --git a/liveness.h b/liveness.h new file mode 100644 index 0000000..4986814 --- /dev/null +++ b/liveness.h @@ -0,0 +1,9 @@ +#ifndef _LIVENESS_H_ +#define _LIVENESS_H_ + +extern std::vector pid; // our unique id for use records + +extern void start_liveness(struct fuse_session *); +extern void terminate_liveness(); + +#endif diff --git a/main.cc b/main.cc index bcedd8d..f9d1e94 100644 --- a/main.cc +++ b/main.cc @@ -17,6 +17,7 @@ #include #include "util.h" +#include "liveness.h" #include "values.pb.h" #include "garbage_collector.h" #include "fdbfs_ops.h" @@ -129,6 +130,8 @@ int main(int argc, char *argv[]) se = fuse_lowlevel_new(&args, &fdbfs_oper, sizeof(fdbfs_oper), NULL); + start_liveness(se); + if (se != NULL) { if (fuse_set_signal_handlers(se) != -1) @@ -141,6 +144,7 @@ int main(int argc, char *argv[]) fuse_session_destroy(se); } fuse_unmount(mountpoint, ch); + terminate_liveness(); } fuse_opt_free_args(&args); diff --git a/util.cc b/util.cc index e4d396c..c321ec5 100644 --- a/util.cc +++ b/util.cc @@ -112,6 +112,14 @@ std::vector pack_garbage_key(fuse_ino_t ino) return key; } +std::vector pack_pid_key(std::vector p) +{ + auto key = key_prefix; + key.push_back('p'); + key.insert(key.end(), p.begin(), p.end()); + return key; +} + std::vector pack_inode_use_key(fuse_ino_t ino) { auto key = pack_inode_key(ino); diff --git a/util.h b/util.h index bdc4cf3..f224b01 100644 --- a/util.h +++ b/util.h @@ -68,6 +68,7 @@ extern struct fdbfs_filehandle **extract_fdbfs_filehandle(struct fuse_file_info extern fuse_ino_t generate_inode(); extern std::vector pack_inode_key(fuse_ino_t, char=INODE_PREFIX); extern std::vector pack_garbage_key(fuse_ino_t); +extern std::vector pack_pid_key(std::vector); extern std::vector pack_inode_use_key(fuse_ino_t); extern std::vector pack_fileblock_key(fuse_ino_t, uint64_t); extern std::vector pack_dentry_key(fuse_ino_t, const std::string&); @@ -109,4 +110,50 @@ extern int decode_block(FDBKeyValue *, int, uint8_t *, int, int); #define debug_print(fmt, ...) \ do { if (DEBUG) fprintf(stderr, fmt, __VA_ARGS__); } while (0) +/** + * This is for simple retryable synchronous transactions, like the + * ones we'll run in our background threads. It shouldn't be used + * anywhere performance critical. + */ +template +std::optional run_sync_transaction(std::function f) +{ + FDBTransaction *t; + if(fdb_database_create_transaction(database, &t)) { + throw new std::runtime_error("failed to create transaction"); + } + + while(true) { + fdb_error_t err = 0; + T retval; + try { + retval = f(t); + FDBFuture *commitfuture = fdb_transaction_commit(t); + if(fdb_future_block_until_ready(commitfuture)) { + throw new std::runtime_error("failed to block for future"); + } + if(fdb_future_get_error(commitfuture)) { + err = fdb_future_get_error(commitfuture); + } + fdb_future_destroy(commitfuture); + } catch (fdb_error_t _err) { + err = _err; + } + + if(!err) { + return std::make_optional(retval); + } else { + FDBFuture *retryfuture = fdb_transaction_on_error(t, err); + if(fdb_future_block_until_ready(retryfuture)) { + throw new std::runtime_error("failed to block for future"); + } + if(fdb_future_get_error(retryfuture)) { + // failed utterly + return std::nullopt; + } + // fall out and loop + } + } +} + #endif // __UTIL_H_ diff --git a/values.proto b/values.proto index c46f314..bc0b7b8 100644 --- a/values.proto +++ b/values.proto @@ -20,15 +20,29 @@ message Timespec { required uint32 nsec = 2; } +message ProcessTableEntry { + // randomly generated 128 bits + required bytes pid = 1; + // based on the processes' clock. not reliable. + required Timespec last_updated = 2; + // incremented every time this is updated + required uint64 liveness_counter = 3; + + optional string hostname = 4; +} + message DirectoryEntry { + // We can plunk anything in here from the + // INodeRecord that's immutable required fixed64 inode = 1; required filetype type = 2; + // symlink value? } message INodeRecord { // 0.. fundamentals - required fixed64 inode = 1; - required filetype type = 2; + required fixed64 inode = 1; // IMMUTABLE + required filetype type = 2; // IMMUTABLE required uint32 nlinks = 3; optional uint32 mode = 4; optional uint64 size = 5;