From bbbe798629aba9bc8e84e076764dfaad877d14b6 Mon Sep 17 00:00:00 2001 From: fortrue Date: Sun, 29 Apr 2018 19:34:32 +0800 Subject: [PATCH 1/4] fix crash because server without DC, but config defined DC, see #32 --- src/ServerGroup.cpp | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/ServerGroup.cpp b/src/ServerGroup.cpp index 30b6e9e..e5d088c 100644 --- a/src/ServerGroup.cpp +++ b/src/ServerGroup.cpp @@ -162,6 +162,9 @@ Server* ServerGroup::getReadServer(Handler* h, DC* localDC) const continue; } DC* dc = s->dc(); + if (!dc) { + continue; + } int dcrp = localDC->getReadPriority(dc); if (dcrp <= 0) { continue; @@ -221,7 +224,7 @@ Server* ServerGroup::getReadServer(Handler* h, DC* localDC) const dc = sdc[0]; found = true; } - if (!found) {//dc maybe nullptr even we found + if (!found) { return nullptr; } Server* deadServs[Const::MaxServInGroup]; From 7fda4b77c1e6db8276a3a516cdd76ded8d0be3f7 Mon Sep 17 00:00:00 2001 From: fortrue Date: Wed, 16 May 2018 09:56:43 +0800 Subject: [PATCH 2/4] 1.fix multi-keys request leader self reference 2.adjust alloc implement --- src/AcceptConnection.h | 3 ++- src/Alloc.h | 4 ++-- src/Buffer.h | 4 +++- src/Common.h | 4 ++-- src/ConnectConnection.h | 3 ++- src/Connection.cpp | 2 +- src/Request.cpp | 36 +++++++++++++++++++----------------- src/Request.h | 7 ++++--- src/Response.h | 3 ++- 9 files changed, 37 insertions(+), 29 deletions(-) diff --git a/src/AcceptConnection.h b/src/AcceptConnection.h index bcc800f..f2754ad 100644 --- a/src/AcceptConnection.h +++ b/src/AcceptConnection.h @@ -29,6 +29,7 @@ class AcceptConnection : typedef AcceptConnection Value; typedef ListNode> ListNodeType; typedef DequeNode> DequeNodeType; + typedef Alloc Allocator; public: AcceptConnection(int fd, sockaddr* addr, socklen_t len); ~AcceptConnection(); @@ -97,6 +98,6 @@ class AcceptConnection : typedef List AcceptConnectionList; typedef Deque AcceptConnectionDeque; -typedef Alloc AcceptConnectionAlloc; +typedef AcceptConnection::Allocator AcceptConnectionAlloc; #endif diff --git a/src/Alloc.h b/src/Alloc.h index 41cce3d..3797ac7 100644 --- a/src/Alloc.h +++ b/src/Alloc.h @@ -76,7 +76,7 @@ class Alloc : public AllocBase } UsedMemory += allocSize(); if (MaxMemory == 0 || UsedMemory <= MaxMemory) { - void* p = ::operator new(allocSize()); + void* p = ::operator new(allocSize(), std::nothrow); if (p) { try { obj = new (p) T(args...); @@ -145,7 +145,7 @@ class RefCntObj { int n = --mCnt; if (n == 0) { - Alloc::destroy(static_cast(this)); + T::Allocator::destroy(static_cast(this)); } else if (n < 0) { logError("unref object %p with cnt %d", this, n); abort(); diff --git a/src/Buffer.h b/src/Buffer.h index b7b9a09..acb3d14 100644 --- a/src/Buffer.h +++ b/src/Buffer.h @@ -23,6 +23,7 @@ class Buffer : public RefCntObj { public: + typedef Alloc Allocator; static const int MaxBufFmtAppendLen = 8192; public: Buffer& operator=(const Buffer&); @@ -92,12 +93,13 @@ class Buffer : }; typedef List BufferList; +typedef Buffer::Allocator BufferAlloc; + template<> inline int allocSize() { return Buffer::getSize() + sizeof(Buffer); } -typedef Alloc BufferAlloc; struct BufferPos { diff --git a/src/Common.h b/src/Common.h index 07e3e52..b7f15ab 100644 --- a/src/Common.h +++ b/src/Common.h @@ -32,8 +32,8 @@ namespace Const static const int MaxCmdLen = 32; static const int MaxKeyLen = 512; static const int BufferAllocCacheSize = 64; - static const int RequestAllocCacheSize = 32; - static const int ResponseAllocCacheSize = 32; + static const int RequestAllocCacheSize = 128; + static const int ResponseAllocCacheSize = 128; static const int AcceptConnectionAllocCacheSize = 32; static const int ConnectConnectionAllocCacheSize = 4; }; diff --git a/src/ConnectConnection.h b/src/ConnectConnection.h index 44b6482..315acd1 100644 --- a/src/ConnectConnection.h +++ b/src/ConnectConnection.h @@ -23,6 +23,7 @@ class ConnectConnection : typedef ConnectConnection Value; typedef ListNode ListNodeType; typedef DequeNode DequeNodeType; + typedef Alloc Allocator; public: ConnectConnection(Server* s, bool shared); ~ConnectConnection(); @@ -97,6 +98,6 @@ class ConnectConnection : typedef List ConnectConnectionList; typedef Deque ConnectConnectionDeque; -typedef Alloc ConnectConnectionAlloc; +typedef ConnectConnection::Allocator ConnectConnectionAlloc; #endif diff --git a/src/Connection.cpp b/src/Connection.cpp index 40c6de2..38409dc 100644 --- a/src/Connection.cpp +++ b/src/Connection.cpp @@ -27,7 +27,7 @@ BufferPtr Connection::getBuffer(Handler* h, bool allowNew) } } if (!mBuf || mBuf->full()) { - BufferPtr buf = Alloc::create(); + BufferPtr buf = BufferAlloc::create(); if (mBuf) { mBuf->concat(buf); } diff --git a/src/Request.cpp b/src/Request.cpp index ec03b40..842242f 100644 --- a/src/Request.cpp +++ b/src/Request.cpp @@ -98,6 +98,7 @@ Request::Request(GenericCode code): Request::~Request() { + clear(); } void Request::clear() @@ -155,13 +156,14 @@ void Request::set(const RequestParser& p, Request* leader) } mHead = r->mReq; mReq = p.request(); - mLeader = leader; if (leader == this) { if (mType == Command::Mset || mType == Command::Msetnx) { mFollowers = (p.argNum() - 1) >> 1; } else { mFollowers = p.argNum() - 1; } + } else { + mLeader = leader; } } else { mReq = p.request(); @@ -338,49 +340,49 @@ int Request::fill(IOVec* vecs, int len) void Request::setResponse(Response* res) { mDone = true; - if (mLeader) { - mLeader->mFollowersDone += 1; + if (Request* ld = leader()) { + ld->mFollowersDone += 1; switch (mType) { case Command::Mget: mRes = res; break; case Command::Mset: - if (Response* leaderRes = mLeader->getResponse()) { + if (Response* leaderRes = ld->getResponse()) { if (res->isError() && !leaderRes->isError()) { - mLeader->mRes = res; + ld->mRes = res; } } else { - mLeader->mRes = res; + ld->mRes = res; } break; case Command::Msetnx: - if (Response* leaderRes = mLeader->getResponse()) { + if (Response* leaderRes = ld->getResponse()) { if (!leaderRes->isError() && (res->isError() || res->integer() == 0)) { - mLeader->mRes = res; + ld->mRes = res; } } else { - mLeader->mRes = res; + ld->mRes = res; } break; case Command::Touch: case Command::Exists: case Command::Del: case Command::Unlink: - if (!mLeader->mRes) { - mLeader->mRes = res; + if (!ld->mRes) { + ld->mRes = res; } - if (mLeader->isDone()) { - mLeader->mRes->set(mLeader->mRes->integer()); + if (ld->isDone()) { + ld->mRes->set(ld->mRes->integer()); } break; case Command::ScriptLoad: - if (Response* leaderRes = mLeader->getResponse()) { + if (Response* leaderRes = ld->getResponse()) { if (leaderRes->isString() && !res->isString()) { - mLeader->mRes = res; + ld->mRes = res; } } else { - mLeader->mRes = res; + ld->mRes = res; } break; default: @@ -395,7 +397,7 @@ void Request::setResponse(Response* res) bool Request::isDone() const { - if (mLeader == this) { + if (isLeader()) { switch (mType) { case Command::Mget: case Command::Psubscribe: diff --git a/src/Request.h b/src/Request.h index 4d8e431..1f04102 100644 --- a/src/Request.h +++ b/src/Request.h @@ -25,6 +25,7 @@ class Request : public: typedef Request Value; typedef ListNode, RequestListIndex::Size> ListNodeType; + typedef Alloc Allocator; static const int MaxRedirectLimit = 3; enum GenericCode { @@ -119,11 +120,11 @@ class Request : } Request* leader() const { - return mLeader; + return isLeader() ? const_cast(this) : (Request*)mLeader; } bool isLeader() const { - return mLeader == this; + return mFollowers > 0; } bool isDelivered() const { @@ -181,6 +182,6 @@ class Request : typedef List RecvRequestList; typedef List SendRequestList; -typedef Alloc RequestAlloc; +typedef Request::Allocator RequestAlloc; #endif diff --git a/src/Response.h b/src/Response.h index 7634492..db75dd6 100644 --- a/src/Response.h +++ b/src/Response.h @@ -18,6 +18,7 @@ class Response : public: typedef Response Value; typedef ListNode> ListNodeType; + typedef Alloc Allocator; enum GenericCode { Pong, @@ -137,6 +138,6 @@ class Response : }; typedef List ResponseList; -typedef Alloc ResponseAlloc; +typedef Response::Allocator ResponseAlloc; #endif From 34cc6d151e1caa85a8405bfc82144a9103c73bfd Mon Sep 17 00:00:00 2001 From: fortrue Date: Tue, 12 Jun 2018 16:29:12 +0800 Subject: [PATCH 3/4] fix for script load command follow policy --- src/Request.cpp | 2 +- src/Request.h | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/Request.cpp b/src/Request.cpp index 842242f..927d8b9 100644 --- a/src/Request.cpp +++ b/src/Request.cpp @@ -289,7 +289,7 @@ void Request::adjustScanCursor(long cursor) void Request::follow(Request* leader) { - ++mFollowers; + leader->mFollowers += 1; if (leader == this) { return; } diff --git a/src/Request.h b/src/Request.h index 1f04102..260e5be 100644 --- a/src/Request.h +++ b/src/Request.h @@ -71,7 +71,7 @@ class Request : bool isDone() const; AcceptConnection* connection() const { - return mConn; + return mLeader ? mLeader->mConn : mConn; } void detach() { From 20bfcb657ec9cd9190eef6f15b7bd84840d2eb6d Mon Sep 17 00:00:00 2001 From: fortrue Date: Thu, 5 Jul 2018 11:54:12 +0800 Subject: [PATCH 4/4] fix async assign client race condition, #32 #33 #35 #36 --- src/EpollMultiplexor.cpp | 6 +----- src/Handler.cpp | 3 ++- src/KqueueMultiplexor.h | 1 - 3 files changed, 3 insertions(+), 7 deletions(-) diff --git a/src/EpollMultiplexor.cpp b/src/EpollMultiplexor.cpp index ea34a10..8fe7ffb 100644 --- a/src/EpollMultiplexor.cpp +++ b/src/EpollMultiplexor.cpp @@ -31,12 +31,9 @@ bool EpollMultiplexor::addSocket(Socket* s, int evts) event.events |= (evts & ReadEvent) ? EPOLLIN : 0; event.events |= (evts & WriteEvent) ? EPOLLOUT : 0; event.events |= EPOLLET; - //event.events |= EPOLLONESHOT; event.data.ptr = s; + s->setEvent(evts); int ret = epoll_ctl(mFd, EPOLL_CTL_ADD, s->fd(), &event); - if (ret == 0) { - s->setEvent(evts); - } return ret == 0; } @@ -61,7 +58,6 @@ bool EpollMultiplexor::addEvent(Socket* s, int evts) } if ((s->getEvent() | evts) != s->getEvent()) { event.events |= EPOLLET; - //event.events |= EPOLLONESHOT; int ret = epoll_ctl(mFd, EPOLL_CTL_MOD, s->fd(), &event); if (ret == 0) { s->setEvent(s->getEvent() | evts); diff --git a/src/Handler.cpp b/src/Handler.cpp index 8393e80..b29bdf4 100644 --- a/src/Handler.cpp +++ b/src/Handler.cpp @@ -315,7 +315,6 @@ void Handler::addAcceptSocket(int fd, sockaddr* addr, socklen_t len) AcceptConnection* c = nullptr; try { c = AcceptConnectionAlloc::create(fd, addr, len); - logNotice("h %d accept c %s %d", id(), c->peer(), fd); } catch (ExceptionBase& e) { logWarn("h %d create connection for client %d fail %s", id(), fd, e.what()); @@ -368,6 +367,8 @@ void Handler::addAcceptSocket(int fd, sockaddr* addr, socklen_t len) logWarn("h %d destroy c %s %d with add to event loop fail:%s", id(), c->peer(), c->fd(), StrError()); AcceptConnectionAlloc::destroy(c); + } else { + logNotice("h %d accept c %s %d assign to h %d", id(), c->peer(), fd, dst->id()); } } diff --git a/src/KqueueMultiplexor.h b/src/KqueueMultiplexor.h index a9c804e..c465c6f 100644 --- a/src/KqueueMultiplexor.h +++ b/src/KqueueMultiplexor.h @@ -67,6 +67,5 @@ int KqueueMultiplexor::wait(long usec, T* handler) typedef KqueueMultiplexor Multiplexor; -#define _MULTIPLEXOR_ASYNC_ASSIGN_ #endif