diff --git a/libspace/plugins/redis/PluginInterface.cpp b/libspace/plugins/redis/PluginInterface.cpp index 75cd090d9..7959f2c4d 100644 --- a/libspace/plugins/redis/PluginInterface.cpp +++ b/libspace/plugins/redis/PluginInterface.cpp @@ -44,6 +44,8 @@ static void InitPluginOptions() { new OptionValue("host","127.0.0.1",Sirikata::OptionValueType(),"Redis host to connect to."), new OptionValue("port","6379",Sirikata::OptionValueType(),"Redis port to connect to."), new OptionValue("prefix","",Sirikata::OptionValueType(),"Prefix for redis keys, allowing you to provide 'namespaces' so multiple spaces can share the same redis database."), + new OptionValue("ttl","60s",Sirikata::OptionValueType(),"Duration for keys to remain valid in Redis before they are automatically removed in case of dead nodes. This is a tradeoff between having to refresh entries and how long it takes before an object identifier can be reclaimed after a server crashes."), + new OptionValue("transactions","true",Sirikata::OptionValueType(),"If false, disables transactions. This isn't really safe as you can fail between commands and get keys stuck, but it allows running against older versions of Redis. Since this isn't safe, transactions are turned on by default."), NULL ); } @@ -55,8 +57,10 @@ static ObjectSegmentation* createRedisOSeg(SpaceContext* ctx, Network::IOStrand* String redis_host = optionsSet->referenceOption("host")->as(); uint32 redis_port = optionsSet->referenceOption("port")->as(); String redis_prefix = optionsSet->referenceOption("prefix")->as(); + Duration redis_ttl = optionsSet->referenceOption("ttl")->as(); + bool redis_has_transactions = optionsSet->referenceOption("transactions")->as(); - return new RedisObjectSegmentation(ctx, oseg_strand, cseg, cache, redis_host, redis_port, redis_prefix); + return new RedisObjectSegmentation(ctx, oseg_strand, cseg, cache, redis_host, redis_port, redis_prefix, redis_ttl, redis_has_transactions); } } // namespace Sirikata diff --git a/libspace/plugins/redis/RedisObjectSegmentation.cpp b/libspace/plugins/redis/RedisObjectSegmentation.cpp index c3d397f63..3bc19f5ff 100644 --- a/libspace/plugins/redis/RedisObjectSegmentation.cpp +++ b/libspace/plugins/redis/RedisObjectSegmentation.cpp @@ -32,6 +32,7 @@ #include "RedisObjectSegmentation.hpp" #include +#include #define REDISOSEG_LOG(lvl,msg) SILOG(redis_oseg, lvl, msg) @@ -77,15 +78,39 @@ void globalRedisCleanup(void *privdata) { // Basic state tracking for a request that uses Redis async api struct RedisObjectOperationInfo { + RedisObjectOperationInfo(RedisObjectSegmentation* _oseg, const UUID& _obj) + : oseg(_oseg), obj(_obj), completed(false), refcount(0) + {} + + void checkDestroy() { + if (refcount == 0) delete this; + } + RedisObjectSegmentation* oseg; UUID obj; + bool completed; // Callback invoked, successful or not + + // Only used in some cases, not required except for transactions + uint8 refcount; }; // State tracking for migrate changes. If we need to generate an ack, this // requires additional info struct RedisObjectMigratedOperationInfo { + RedisObjectMigratedOperationInfo(RedisObjectSegmentation* _oseg, const UUID& _obj, ServerID _ackTo) + : oseg(_oseg), obj(_obj), ackTo(_ackTo), completed(false), refcount(0) + {} + + void checkDestroy() { + if (refcount == 0) delete this; + } + RedisObjectSegmentation* oseg; UUID obj; ServerID ackTo; + bool completed; // Callback invoked, successful or not + + // Only used in some cases, not required except for transactions + uint8 refcount; }; void globalRedisLookupObjectReadFinished(redisAsyncContext* c, void* _reply, void* privdata) { @@ -118,64 +143,189 @@ void globalRedisAddNewObjectWriteFinished(redisAsyncContext* c, void* _reply, vo redisReply *reply = (redisReply*)_reply; RedisObjectOperationInfo* wi = (RedisObjectOperationInfo*)privdata; + // This code can handle both transaction and non-transaction writes (which we need + // for older versions of the redis server). They can be cleanly separated + // because transactions only have simple status replies + a bulk reply, + // whereas the individual commands (set + expire) both return an + // integer. Any other values like null or errors can be returned as errors + // in both cases. + + wi->refcount--; + // If we've already indicated a result, we can't go back + if (wi->completed) { + wi->checkDestroy(); + return; + } + if (reply == NULL) { REDISOSEG_LOG(error, "Unknown redis error when writing new object " << wi->obj.toString()); wi->oseg->finishWriteNewObject(wi->obj, OSegWriteListener::UNKNOWN_ERROR); + wi->completed = true; } else if (reply->type == REDIS_REPLY_ERROR) { REDISOSEG_LOG(error, "Redis error when writing new object " << wi->obj.toString() << ": " << String(reply->str, reply->len)); wi->oseg->finishWriteNewObject(wi->obj,OSegWriteListener::UNKNOWN_ERROR); + wi->completed = true; } else if (reply->type == REDIS_REPLY_INTEGER) { if (reply->integer == 1) { - wi->oseg->finishWriteNewObject(wi->obj, OSegWriteListener::SUCCESS); + // We need to get two of these, one for the SET(NX) one for the + // EXPIRE. Given the set of commands, we expect the refcount for the + // reply for EXPIRE to be 0 (we've already decremented and there's + // no EXEC following it in non-transaction mode). If we get into + // here (no previous error) and we're at that refcount, we can + // indicate success + if (wi->refcount == 0) { + wi->oseg->finishWriteNewObject(wi->obj, OSegWriteListener::SUCCESS); + wi->completed = true; + } } else if (reply->integer == 0) { REDISOSEG_LOG(error, "Redis error when writing new object " << wi->obj.toString() << ": " << reply->integer<< " likely already registered."); wi->oseg->finishWriteNewObject(wi->obj,OSegWriteListener::OBJ_ALREADY_REGISTERED); + wi->completed = true; } else { REDISOSEG_LOG(error, "Redis error when writing new object " << wi->obj.toString() << ": " << reply->integer<< " unknown error."); wi->oseg->finishWriteNewObject(wi->obj,OSegWriteListener::UNKNOWN_ERROR); + wi->completed = true; } } + else if (reply->type == REDIS_REPLY_STATUS && (String(reply->str, reply->len) == String("QUEUED") || String(reply->str, reply->len) == String("OK"))) { + // Just an OK response to the initial MULTI call or a queued item in the + // transaction, don't need to do anything until we get more feedback + } + else if (reply->type == REDIS_REPLY_ARRAY) { + // Bulk reply + if (reply->elements != 2) { // Set + expire + wi->oseg->finishWriteNewObject(wi->obj, OSegWriteListener::UNKNOWN_ERROR); + } + else if (reply->element[0]->type != REDIS_REPLY_INTEGER || reply->element[1]->type != REDIS_REPLY_INTEGER) { + wi->oseg->finishWriteNewObject(wi->obj, OSegWriteListener::UNKNOWN_ERROR); + } + else if (reply->element[0]->integer != 1) { // SET failed + REDISOSEG_LOG(error, "Redis error when writing new object " << wi->obj.toString() << ": " << reply->integer<< " likely already registered."); + wi->oseg->finishWriteNewObject(wi->obj,OSegWriteListener::OBJ_ALREADY_REGISTERED); + } + else if (reply->element[1]->integer != 1) { // EXPIRE failed + REDISOSEG_LOG(error, "Redis error when writing new object expiry " << wi->obj.toString() << ": " << reply->integer<< " likely already registered."); + wi->oseg->finishWriteNewObject(wi->obj,OSegWriteListener::UNKNOWN_ERROR); + } + else { + // If we've gotten past all these other error checks, we seem to + // have succeeded + wi->oseg->finishWriteNewObject(wi->obj, OSegWriteListener::SUCCESS); + } + // No matter what we've completed if we got here since it's the EXEC response + wi->completed = true; + } else { REDISOSEG_LOG(error, "Unexpected redis reply type when writing new object " << wi->obj.toString() << ": " << reply->type); wi->oseg->finishWriteNewObject(wi->obj,OSegWriteListener::UNKNOWN_ERROR); + wi->completed = true; } - delete wi; + wi->checkDestroy(); } void globalRedisAddMigratedObjectWriteFinished(redisAsyncContext* c, void* _reply, void* privdata) { redisReply *reply = (redisReply*)_reply; RedisObjectMigratedOperationInfo* wi = (RedisObjectMigratedOperationInfo*)privdata; + wi->refcount--; + // If we've already indicated a result, we can't go back + if (wi->completed) { + wi->checkDestroy(); + return; + } + if (reply == NULL) { REDISOSEG_LOG(error, "Unknown redis error when writing migrated object " << wi->obj.toString()); + wi->completed = true; } else if (reply->type == REDIS_REPLY_ERROR) { REDISOSEG_LOG(error, "Redis error when writing migrated object " << wi->obj.toString() << ": " << String(reply->str, reply->len)); + wi->completed = true; } else if (reply->type == REDIS_REPLY_STATUS) { - if (String(reply->str, reply->len) == String("OK")) - wi->oseg->finishWriteMigratedObject(wi->obj, wi->ackTo); - else + String reply_str(reply->str, reply->len); + if (reply_str == String("QUEUED")) { + // Just queuing up events, safe to ignore + } + else if (reply_str == String("OK")) { + // For transactions, we'll be at refcount == 3 for the initial OK to MULTI + if (wi->refcount == 3) { + // Ignore result from MULTI + } + else if (wi->refcount == 1) { // Non-transaction will be at 1 after the SET + // Still need the EXPIRE to succeed, do nothing + } + else { + // Unexpected error + REDISOSEG_LOG(error, "Unexpected status reply while writing migrated object " << wi->obj.toString() << ": " << reply_str); + wi->completed = true; + } + } + else { REDISOSEG_LOG(error, "Redis error when writing migrated object " << wi->obj.toString() << ": " << String(reply->str, reply->len)); + wi->completed = true; + } + } + else if (reply->type == REDIS_REPLY_INTEGER) { + // Response to non-transactional EXPIRE + if (reply->integer == 1) { + // We need to get two of these, one for the SET(NX) one for the + // EXPIRE. Given the set of commands, we expect the refcount for the + // reply for EXPIRE to be 0 (we've already decremented and there's + // no EXEC following it in non-transaction mode). If we get into + // here (no previous error) and we're at that refcount, we can + // indicate success + if (wi->refcount == 0) { + wi->oseg->finishWriteMigratedObject(wi->obj, wi->ackTo); + wi->completed = true; + } + } + else { + REDISOSEG_LOG(error, "Redis error when writing migrated object " << wi->obj.toString() << ": " << reply->integer << " likely already registered."); + wi->completed = true; + } + } + else if (reply->type == REDIS_REPLY_ARRAY) { + // Bulk reply + if (reply->elements != 2) { // Set + expire + REDISOSEG_LOG(error, "Invalid bulk reply to migrated object transaction " << wi->obj.toString() << ": " << reply->elements << " elements"); + } + else if (reply->element[0]->type != REDIS_REPLY_STATUS || reply->element[1]->type != REDIS_REPLY_INTEGER) { + REDISOSEG_LOG(error, "Invalid bulk reply types to migrated object transaction " << wi->obj.toString()); + } + else if ( String(reply->element[0]->str, reply->element[0]->len) != "OK") { // SET failed + REDISOSEG_LOG(error, "Unexpected status reply for SET for migrated object transaction " << wi->obj.toString()); + } + else if (reply->element[1]->integer != 1) { // EXPIRE failed + REDISOSEG_LOG(error, "Unexpected status reply for EXPIRE for migrated object transaction " << wi->obj.toString()); + } + else { + // If we've gotten past all these other error checks, we seem to + // have succeeded + wi->oseg->finishWriteMigratedObject(wi->obj, wi->ackTo); + } + // No matter what we've completed if we got here since it's the EXEC response + wi->completed = true; } else { REDISOSEG_LOG(error, "Unexpected redis reply type when writing migrated object " << wi->obj.toString() << ": " << reply->type); + wi->completed = true; } - delete wi; + wi->checkDestroy(); } void globalRedisDeleteFinished(redisAsyncContext* c, void* _reply, void* privdata) { @@ -200,19 +350,74 @@ void globalRedisDeleteFinished(redisAsyncContext* c, void* _reply, void* privdat delete wi; } +void globalRedisRefreshObjectTimeoutWriteFinished(redisAsyncContext* c, void* _reply, void* privdata) { + redisReply *reply = (redisReply*)_reply; + RedisObjectOperationInfo* wi = (RedisObjectOperationInfo*)privdata; + + wi->refcount--; + // If we've already indicated a result, we can't go back. We haven't issued + // a callback, but this just keeps track of whether we even can give a valid + // callback anymore/should report any more issues. + if (wi->completed) { + wi->checkDestroy(); + return; + } + + if (reply == NULL) { + REDISOSEG_LOG(error, "Unknown redis error when refreshing object timeout " << wi->obj.toString()); + wi->completed = true; + } + else if (reply->type == REDIS_REPLY_ERROR) { + REDISOSEG_LOG(error, "Redis error when refreshing object timeout " << wi->obj.toString() << ": " << String(reply->str, reply->len)); + wi->completed = true; + } + else if (reply->type == REDIS_REPLY_STATUS) { + String reply_str(reply->str, reply->len); + if (reply_str == String("OK")) { + // Should just be the OK from the SET for old versions of Redis, + // safe to ignore and wait for the EXPIRE reply + } + else { + REDISOSEG_LOG(error, "Redis got unexpected or bad status reply when refreshing object timeout " << wi->obj.toString() << ": " << reply_str); + wi->completed = true; + } + } + else if (reply->type == REDIS_REPLY_INTEGER) { + if (reply->integer != 1) { + REDISOSEG_LOG(error, "Redis error when refreshing object timeout " << wi->obj.toString() << ", got incorrect return value: " << reply->integer); + wi->completed = true; + } + // Otherwise succeeded + } + else { + REDISOSEG_LOG(error, "Unexpected redis reply type when refreshing object timeout " << wi->obj.toString() << ": " << reply->type); + wi->completed = true; + } + + wi->checkDestroy(); +} + } // namespace -RedisObjectSegmentation::RedisObjectSegmentation(SpaceContext* con, Network::IOStrand* o_strand, CoordinateSegmentation* cseg, OSegCache* cache, const String& redis_host, uint32 redis_port, const String& redis_prefix) +RedisObjectSegmentation::RedisObjectSegmentation(SpaceContext* con, Network::IOStrand* o_strand, CoordinateSegmentation* cseg, OSegCache* cache, const String& redis_host, uint32 redis_port, const String& redis_prefix, Duration key_ttl, bool redis_has_transactions) : ObjectSegmentation(con, o_strand), mCSeg(cseg), mCache(cache), mRedisHost(redis_host), mRedisPort(redis_port), mRedisPrefix(redis_prefix), + mRedisKeyTTL(key_ttl), + mRedisHasTransactions(redis_has_transactions), mRedisContext(NULL), mRedisFD(NULL), mReading(false), - mWriting(false) + mWriting(false), + mExpiryTimer( + Network::IOTimer::create( + con->mainStrand, + std::tr1::bind(&RedisObjectSegmentation::processExpiredObjects, this) + ) + ) { } @@ -225,6 +430,11 @@ void RedisObjectSegmentation::start() { connect(); } +void RedisObjectSegmentation::stop() { + mExpiryTimer->cancel(); + ObjectSegmentation::stop(); +} + void RedisObjectSegmentation::connect() { Lock lck(mMutex); @@ -375,9 +585,7 @@ OSegEntry RedisObjectSegmentation::lookup(const UUID& obj_id) { // Otherwise, kick off the lookup process and return null if (mStopping) return OSegEntry::null(); - RedisObjectOperationInfo* ri = new RedisObjectOperationInfo(); - ri->oseg = this; - ri->obj = obj_id; + RedisObjectOperationInfo* ri = new RedisObjectOperationInfo(this, obj_id); ensureConnected(); { Lock lck(mMutex); @@ -420,9 +628,7 @@ void RedisObjectSegmentation::addNewObject(const UUID& obj_id, float radius) { mOSeg[obj_id] = OSegEntry(mContext->id(), radius); - RedisObjectOperationInfo* wi = new RedisObjectOperationInfo(); - wi->oseg = this; - wi->obj = obj_id; + RedisObjectOperationInfo* wi = new RedisObjectOperationInfo(this, obj_id); // Note: currently we're keeping compatibility with Redis 1.2. This means // that there aren't hashes on the server. Instead, we create and parse them // ourselves. This isn't so bad since they are all fixed format anyway. @@ -432,8 +638,20 @@ void RedisObjectSegmentation::addNewObject(const UUID& obj_id, float radius) { REDISOSEG_LOG(insane, "SETNX " << obj_id.toString() << " " << valstr); ensureConnected(); { + String obj_id_str = obj_id.toString(); Lock lck(mMutex); - redisAsyncCommand(mRedisContext, globalRedisAddNewObjectWriteFinished, wi, "SETNX %s%s %b", mRedisPrefix.c_str(), obj_id.toString().c_str(), valstr.c_str(), valstr.size()); + if (mRedisHasTransactions) { + wi->refcount++; + redisAsyncCommand(mRedisContext, globalRedisAddNewObjectWriteFinished, wi, "MULTI"); + } + wi->refcount++; + redisAsyncCommand(mRedisContext, globalRedisAddNewObjectWriteFinished, wi, "SETNX %s%s %b", mRedisPrefix.c_str(), obj_id_str.c_str(), valstr.c_str(), valstr.size()); + wi->refcount++; + redisAsyncCommand(mRedisContext, globalRedisAddNewObjectWriteFinished, wi, "EXPIRE %s%s %d", mRedisPrefix.c_str(), obj_id_str.c_str(), (int32)mRedisKeyTTL.seconds()); + if (mRedisHasTransactions) { + wi->refcount++; + redisAsyncCommand(mRedisContext, globalRedisAddNewObjectWriteFinished, wi, "EXEC"); + } } } @@ -449,6 +667,9 @@ void RedisObjectSegmentation::finishWriteNewObject(const UUID& obj_id, OSegWrite mCache->insert(obj_id, mOSeg[obj_id]); mWriteListener->osegAddNewFinished(obj_id, status); + + // Schedule for updates + scheduleObjectRefresh(obj_id); } void RedisObjectSegmentation::addMigratedObject(const UUID& obj_id, float radius, ServerID idServerAckTo, bool generateAck) { @@ -456,10 +677,7 @@ void RedisObjectSegmentation::addMigratedObject(const UUID& obj_id, float radius mOSeg[obj_id] = OSegEntry(mContext->id(), radius); - RedisObjectMigratedOperationInfo* wi = new RedisObjectMigratedOperationInfo(); - wi->oseg = this; - wi->obj = obj_id; - wi->ackTo = (generateAck ? idServerAckTo : NullServerID); + RedisObjectMigratedOperationInfo* wi = new RedisObjectMigratedOperationInfo(this, obj_id, (generateAck ? idServerAckTo : NullServerID)); // Note: currently we're keeping compatibility with Redis 1.2. This means // that there aren't hashes on the server. Instead, we create and parse them // ourselves. This isn't so bad since they are all fixed format anyway. @@ -469,8 +687,20 @@ void RedisObjectSegmentation::addMigratedObject(const UUID& obj_id, float radius REDISOSEG_LOG(insane, "SET " << obj_id.toString() << " " << valstr); ensureConnected(); { + String obj_id_str = obj_id.toString(); Lock lck(mMutex); - redisAsyncCommand(mRedisContext, globalRedisAddMigratedObjectWriteFinished, wi, "SET %s%s %b", mRedisPrefix.c_str(), obj_id.toString().c_str(), valstr.c_str(), valstr.size()); + if (mRedisHasTransactions) { + wi->refcount++; + redisAsyncCommand(mRedisContext, globalRedisAddNewObjectWriteFinished, wi, "MULTI"); + } + wi->refcount++; + redisAsyncCommand(mRedisContext, globalRedisAddMigratedObjectWriteFinished, wi, "SET %s%s %b", mRedisPrefix.c_str(), obj_id_str.c_str(), valstr.c_str(), valstr.size()); + wi->refcount++; + redisAsyncCommand(mRedisContext, globalRedisAddNewObjectWriteFinished, wi, "EXPIRE %s%s %d", mRedisPrefix.c_str(), obj_id_str.c_str(), (int32)mRedisKeyTTL.seconds()); + if (mRedisHasTransactions) { + wi->refcount++; + redisAsyncCommand(mRedisContext, globalRedisAddNewObjectWriteFinished, wi, "EXEC"); + } } } @@ -496,9 +726,8 @@ void RedisObjectSegmentation::removeObject(const UUID& obj_id) { if (mStopping) return; mOSeg.erase(obj_id); - RedisObjectOperationInfo* wi = new RedisObjectOperationInfo(); - wi->oseg = this; - wi->obj = obj_id; + mTimeouts.get().erase(obj_id); + RedisObjectOperationInfo* wi = new RedisObjectOperationInfo(this, obj_id); ensureConnected(); { Lock lck(mMutex); @@ -524,6 +753,7 @@ void RedisObjectSegmentation::migrateObject(const UUID& obj_id, const OSegEntry& // We "migrate" the object by removing it. The other server is responsible // for updating Redis mOSeg.erase(obj_id); + mTimeouts.get().erase(obj_id); } void RedisObjectSegmentation::handleMigrateMessageAck(const Sirikata::Protocol::OSeg::MigrateMessageAcknowledge& msg) { @@ -537,6 +767,9 @@ void RedisObjectSegmentation::handleMigrateMessageAck(const Sirikata::Protocol:: // Finally, this lets the server know the migration has been acked and the // object can disconnect mWriteListener->osegMigrationAcknowledged(obj_id); + + // Schedule for updates + scheduleObjectRefresh(obj_id); } void RedisObjectSegmentation::handleUpdateOSegMessage(const Sirikata::Protocol::OSeg::UpdateOSegMessage& update_oseg_msg) { @@ -544,4 +777,80 @@ void RedisObjectSegmentation::handleUpdateOSegMessage(const Sirikata::Protocol:: mCache->insert(update_oseg_msg.m_objid(), OSegEntry(update_oseg_msg.servid_obj_on(), update_oseg_msg.m_objradius())); } + + +void RedisObjectSegmentation::scheduleObjectRefresh(const UUID& obj_id) { + Time new_expiry = mContext->simTime() + (mRedisKeyTTL/2); + + ObjectTimeoutsByID& by_id = mTimeouts.get(); + ObjectTimeoutsByID::iterator it = by_id.find(obj_id); + if (it != by_id.end()) { + ObjectTimeoutsByExpiration& by_expiry = mTimeouts.get(); + ObjectTimeoutsByExpiration::iterator exp_it = mTimeouts.project(it); + by_expiry.modify_key(exp_it, boost::lambda::_1=new_expiry); + } + else { + // Should only need to trigger timer if we went empty + bool was_empty = mTimeouts.empty(); + mTimeouts.insert( ObjectTimeout(obj_id, new_expiry) ); + if (was_empty) startTimeoutHandler(); + } +} + +void RedisObjectSegmentation::startTimeoutHandler() { + // Because we always add a fixed TTL, we should never have to change a + // timer, only add one. + if (mTimeouts.empty()) return; + + Time first_expires = mTimeouts.get().begin()->expires; + Duration tout = mContext->simTime() - first_expires; + mExpiryTimer->wait(tout); +} + +void RedisObjectSegmentation::processExpiredObjects() { + Time tnow = mContext->simTime(); + ObjectTimeoutsByExpiration& by_expiry = mTimeouts.get(); + while(!by_expiry.empty() && + tnow > by_expiry.begin()->expires) { + refreshObjectTimeout(by_expiry.begin()->objid); + // Don't delete, just update the timeout for the next update + Time new_expiry = mContext->simTime() + (mRedisKeyTTL/2); + by_expiry.modify_key(by_expiry.begin(), boost::lambda::_1=new_expiry); + } + startTimeoutHandler(); +} + +void RedisObjectSegmentation::refreshObjectTimeout(const UUID& obj_id) { + if (mStopping) return; + + assert(mOSeg.find(obj_id) != mOSeg.end()); + + RedisObjectOperationInfo* wi = new RedisObjectOperationInfo(this, obj_id); + ensureConnected(); + REDISOSEG_LOG(insane, "Refreshing timeout for " << obj_id); + { + String obj_id_str = obj_id.toString(); + // Note: currently we're keeping compatibility with Redis 1.2. This means + // that there aren't hashes on the server. Instead, we create and parse them + // ourselves. This isn't so bad since they are all fixed format anyway. + std::ostringstream os; + os << mContext->id() << ":" << mOSeg[obj_id].radius(); + String valstr = os.str(); + + Lock lck(mMutex); + // If we're on older versions of redis, then just setting EXPIRE again + // isn't enough -- the TTL wasn't reset if they already had one. Not + // sure exactly which versions this is on, for now use whether it has + // transactions or not as an indicator. Since they won't have + // transactions anyway, this code and the completion handler can be a + // bit simpler + if (!mRedisHasTransactions) { + wi->refcount++; + redisAsyncCommand(mRedisContext, globalRedisRefreshObjectTimeoutWriteFinished, wi, "SET %s%s %b", mRedisPrefix.c_str(), obj_id_str.c_str(), valstr.c_str(), valstr.size()); + } + wi->refcount++; + redisAsyncCommand(mRedisContext, globalRedisRefreshObjectTimeoutWriteFinished, wi, "EXPIRE %s%s %d", mRedisPrefix.c_str(), obj_id_str.c_str(), (int32)mRedisKeyTTL.seconds()); + } +} + } // namespace Sirikata diff --git a/libspace/plugins/redis/RedisObjectSegmentation.hpp b/libspace/plugins/redis/RedisObjectSegmentation.hpp index 7aafb3e9a..e082e3fa7 100644 --- a/libspace/plugins/redis/RedisObjectSegmentation.hpp +++ b/libspace/plugins/redis/RedisObjectSegmentation.hpp @@ -36,14 +36,20 @@ #include #include +#include +#include +#include +#include + namespace Sirikata { class RedisObjectSegmentation : public ObjectSegmentation { public: - RedisObjectSegmentation(SpaceContext* con, Network::IOStrand* o_strand, CoordinateSegmentation* cseg, OSegCache* cache, const String& redis_host, uint32 redis_port, const String& redis_prefix); + RedisObjectSegmentation(SpaceContext* con, Network::IOStrand* o_strand, CoordinateSegmentation* cseg, OSegCache* cache, const String& redis_host, uint32 redis_port, const String& redis_prefix, Duration redis_ttl, bool redis_has_transactions); ~RedisObjectSegmentation(); virtual void start(); + virtual void stop(); virtual OSegEntry cacheLookup(const UUID& obj_id); virtual OSegEntry lookup(const UUID& obj_id); @@ -85,6 +91,12 @@ class RedisObjectSegmentation : public ObjectSegmentation { void readHandler(const boost::system::error_code& ec); void writeHandler(const boost::system::error_code& ec); + // Schedule an object to be refreshed in .5 TTL to keep it's key alive + void scheduleObjectRefresh(const UUID& obj_id); + void startTimeoutHandler(); + void processExpiredObjects(); + void refreshObjectTimeout(const UUID& obj_id); + CoordinateSegmentation* mCSeg; OSegCache* mCache; @@ -94,6 +106,8 @@ class RedisObjectSegmentation : public ObjectSegmentation { String mRedisHost; uint16 mRedisPort; String mRedisPrefix; + Duration mRedisKeyTTL; + bool mRedisHasTransactions; redisAsyncContext* mRedisContext; boost::asio::posix::stream_descriptor* mRedisFD; // Wrapped hiredis file descriptor @@ -109,6 +123,30 @@ class RedisObjectSegmentation : public ObjectSegmentation { typedef boost::recursive_mutex Mutex; typedef boost::lock_guard Lock; Mutex mMutex; + + + // Track objects that need timeouts refreshed in redis + struct ObjectTimeout { + ObjectTimeout(const UUID& id, Time _expires) + : objid(id), + expires(_expires) + {} + UUID objid; + Time expires; + }; + struct objid_tag {}; + struct expires_tag {}; + typedef boost::multi_index_container< + ObjectTimeout, + boost::multi_index::indexed_by< + boost::multi_index::hashed_unique< boost::multi_index::tag, BOOST_MULTI_INDEX_MEMBER(ObjectTimeout,UUID,objid), UUID::Hasher >, + boost::multi_index::ordered_non_unique< boost::multi_index::tag, BOOST_MULTI_INDEX_MEMBER(ObjectTimeout,Time,expires) > + > + > ObjectTimeouts; + typedef ObjectTimeouts::index::type ObjectTimeoutsByID; + typedef ObjectTimeouts::index::type ObjectTimeoutsByExpiration; + ObjectTimeouts mTimeouts; + Network::IOTimerPtr mExpiryTimer; }; } // namespace Sirikata