Skip to content
Browse files

update

  • Loading branch information...
1 parent 8f21ec9 commit 614038eb9f69f935a476f877cc1f2f24e17adce5 yayanyang committed Sep 26, 2012
Showing with 280 additions and 38 deletions.
  1. +27 −0 io/io_service_epoll.hpp
  2. +134 −5 io/io_service_reactor.cpp
  3. +81 −0 io/io_service_reactor.hpp
  4. +11 −10 io/socket_reactor.cpp
  5. +2 −0 sys/configure.h.in
  6. +14 −5 sys/thread.cpp
  7. +2 −10 sys/thread.h
  8. +5 −0 sys/unix.cmake
  9. +1 −1 trace/abi.h
  10. +2 −6 trace/message.cpp
  11. +1 −1 trace/message.hpp
View
27 io/io_service_epoll.hpp
@@ -50,6 +50,33 @@ namespace lemon{namespace io{namespace core{
errorCode.check_throw();
}
+ io_data * alloc_io_data(size_t type,int fd,void * userdata, LemonIOCallback callback, void * buffer, size_t bufferSize)
+ {
+ return _reactor.alloc_io_data(type,fd,userdata,callback,buffer,bufferSize);
+ }
+
+ void free_io_data(io_data * iodata)
+ {
+ _reactor.free_io_data(iodata);
+ }
+
+ accept_io_data * alloc_io_data
+ (
+ int fd,
+ LemonAcceptCallback callback,
+ void * userdata,
+ sockaddr *address,
+ socklen_t *addressSize
+ )
+ {
+ return _reactor.alloc_io_data(fd,callback,userdata,address,addressSize);
+ }
+
+ void free_accept_io_data(io_data * iodata)
+ {
+ _reactor.free_accept_io_data(iodata);
+ }
+
private:
void epoll_loop();
View
139 io/io_service_reactor.cpp
@@ -51,6 +51,36 @@ namespace lemon{namespace io{namespace core{
break;
}
+ case LEMON_REACTOR_SENDTO:
+ {
+ ssize_t length = read(_fd, _buffer, _bufferSize);
+
+ if(length == -1)
+ {
+ LEMON_POSIX_ERROR(_errorCode,errno);
+ }
+ else
+ {
+ _numberOfBytesTransferred = length;
+ }
+
+ break;
+ }
+ case LEMON_REACTOR_RECVFROM:
+ {
+ ssize_t length = write(_fd, _buffer, _bufferSize);
+
+ if(length == -1)
+ {
+ LEMON_POSIX_ERROR(_errorCode,errno);
+ }
+ else
+ {
+ _numberOfBytesTransferred = length;
+ }
+
+ break;
+ }
}
_callback(_userdata,_numberOfBytesTransferred,&_errorCode);
@@ -67,6 +97,8 @@ namespace lemon{namespace io{namespace core{
socklen_t *addressSize
)
:io_data(LEMON_REACTOR_ACCEPT,fd)
+ ,_address(address)
+ ,_addresslen(addressSize)
{
reset(this,&accept_io_data::callback,NULL,0);
}
@@ -78,6 +110,44 @@ namespace lemon{namespace io{namespace core{
//TODO: call accept function
}
+ //////////////////////////////////////////////////////////////////////////
+
+ sendto_io_data::sendto_io_data
+ (
+ int fd,
+ void * userdata,
+ LemonIOCallback callback,
+ void * buffer,
+ size_t bufferSize,
+ const sockaddr *address,
+ socklen_t addressSize
+ )
+ :io_data(LEMON_REACTOR_SENDTO,fd,userdata,callback,buffer,bufferSize)
+ ,_address(address)
+ ,_addresslen(addressSize)
+ {
+
+ }
+
+ ///////////////////////////////////////////////////////////////////////////
+
+ recvfrom_io_data::recvfrom_io_data
+ (
+ int fd,
+ void * userdata,
+ LemonIOCallback callback,
+ void * buffer,
+ size_t bufferSize,
+ sockaddr *address,
+ socklen_t *addressSize
+ )
+ :io_data(LEMON_REACTOR_RECVFROM,fd,userdata,callback,buffer,bufferSize)
+ ,_address(address)
+ ,_addresslen(addressSize)
+ {
+
+ }
+
//////////////////////////////////////////////////////////////////////////
io_service_reactor::io_service_reactor()
@@ -105,13 +175,30 @@ namespace lemon{namespace io{namespace core{
iodata->call();
- if(iodata->type() == LEMON_REACTOR_ACCEPT)
- {
- free_accept_io_data(iodata);
- }
- else
+ switch(iodata->type())
{
+ case LEMON_REACTOR_ACCEPT:
+ {
+ free_accept_io_data(iodata);
+
+ break;
+ }
+ case LEMON_REACTOR_SENDTO:
+ {
+ free_sendto_io_data(iodata);
+
+ break;
+ }
+ case LEMON_REACTOR_RECVFROM:
+ {
+ free_recvfrom_io_data(iodata);
+
+ break;
+ }
+ default:
free_io_data(iodata);
+
+ break;
}
}
}
@@ -192,6 +279,48 @@ namespace lemon{namespace io{namespace core{
_acceptIODataAllocator.free(iodata);
}
+
+ sendto_io_data * io_service_reactor::alloc_sendto_io_data(
+ int fd,
+ void * userdata,
+ LemonIOCallback callback,
+ void * buffer,
+ size_t bufferSize,
+ const sockaddr *address,
+ socklen_t addressSize)
+ {
+ mutex_t::scope_lock lock(_sendtoIODataAllocatorMutex);
+
+ return new(_sendtoIODataAllocator.alloc()) sendto_io_data(fd,userdata,callback,buffer, bufferSize,address,addressSize);
+ }
+
+ void io_service_reactor::free_sendto_io_data(io_data * iodata)
+ {
+ mutex_t::scope_lock lock(_sendtoIODataAllocatorMutex);
+
+ _sendtoIODataAllocator.free(iodata);
+ }
+
+ recvfrom_io_data * io_service_reactor::alloc_recvfrom_io_data(
+ int fd,
+ void * userdata,
+ LemonIOCallback callback,
+ void * buffer,
+ size_t bufferSize,
+ sockaddr *address,
+ socklen_t *addressSize)
+ {
+ mutex_t::scope_lock lock(_recvfromIODataAllocatorMutex);
+
+ return new(_recvfromIODataAllocator.alloc()) sendto_io_data(fd,userdata,callback,buffer, bufferSize,address,addressSize);
+ }
+
+ void io_service_reactor::free_recvfrom_io_data(io_data * iodata)
+ {
+ mutex_t::scope_lock lock(_recvfromIODataAllocatorMutex);
+
+ _recvfromIODataAllocator.free(iodata);
+ }
}}}
#endif //
View
81 io/io_service_reactor.hpp
@@ -22,6 +22,10 @@
#define LEMON_REACTOR_POST_ONE 0x04
+#define LEMON_REACTOR_SENDTO 0x05
+
+#define LEMON_REACTOR_RECVFROM 0x06
+
namespace lemon{namespace io{namespace core{
class socket;
@@ -97,6 +101,48 @@ namespace lemon{namespace io{namespace core{
socklen_t *_addresslen;
};
+ class sendto_io_data : public io_data
+ {
+ public:
+
+ sendto_io_data
+ (
+ int fd,
+ void * userdata,
+ LemonIOCallback callback,
+ void * buffer,
+ size_t bufferSize,
+ const sockaddr *address,
+ socklen_t addressSize
+ );
+
+ private:
+ const sockaddr *_address;
+
+ socklen_t _addresslen;
+ };
+
+ class recvfrom_io_data : public io_data
+ {
+ public:
+
+ recvfrom_io_data
+ (
+ int fd,
+ void * userdata,
+ LemonIOCallback callback,
+ void * buffer,
+ size_t bufferSize,
+ sockaddr *address,
+ socklen_t *addressSize
+ );
+
+ private:
+ sockaddr *_address;
+
+ socklen_t *_addresslen;
+ };
+
class io_service_reactor : private lemon::nocopyable
@@ -107,6 +153,10 @@ namespace lemon{namespace io{namespace core{
typedef memory::fixed::allocator<sizeof(accept_io_data)> accept_io_data_allocator;
+ typedef memory::fixed::allocator<sizeof(sendto_io_data)> sendto_io_data_allocator;
+
+ typedef memory::fixed::allocator<sizeof(recvfrom_io_data)> recvfrom_io_data_allocator;
+
typedef memory::ringbuffer::allocator<sizeof(io_data*)> complete_queue;
io_service_reactor();
@@ -140,6 +190,29 @@ namespace lemon{namespace io{namespace core{
void free_accept_io_data(io_data * iodata);
+
+ sendto_io_data * alloc_sendto_io_data(
+ int fd,
+ void * userdata,
+ LemonIOCallback callback,
+ void * buffer,
+ size_t bufferSize,
+ const sockaddr *address,
+ socklen_t addressSize);
+
+ void free_sendto_io_data(io_data * iodata);
+
+ recvfrom_io_data * alloc_recvfrom_io_data(
+ int fd,
+ void * userdata,
+ LemonIOCallback callback,
+ void * buffer,
+ size_t bufferSize,
+ sockaddr *address,
+ socklen_t *addressSize);
+
+ void free_recvfrom_io_data(io_data * iodata);
+
private:
io_data_allocator _iodataAllocator;
@@ -150,6 +223,14 @@ namespace lemon{namespace io{namespace core{
mutex_t _acceptIODataAllocatorMutex;
+ sendto_io_data_allocator _sendtoIODataAllocator;
+
+ mutex_t _sendtoIODataAllocatorMutex;
+
+ recvfrom_io_data_allocator _recvfromIODataAllocator;
+
+ mutex_t _recvfromIODataAllocatorMutex;
+
mutex_t _mutex;
condition_variable _condition;
View
21 io/socket_reactor.cpp
@@ -3,6 +3,12 @@
#ifndef LEMON_IO_IOCP
+#ifdef LEMON_IO_EPOLL
+#include <lemon/io/io_service_epoll.hpp>
+#elif defined(LEMON_IO_KQUEUE)
+#include <lemon/io/io_service_kqueue.hpp>
+#endif //LEMON_IO_KQEUE
+
#ifdef LEMON_HAS_FCNTL_H
# include <fcntl.h>
#else
@@ -49,20 +55,15 @@ namespace lemon{namespace io{namespace core{
socket::socket(int af, int type, int protocol,LemonNativeSock sock, io_service * service)
:socket_base(af,type,protocol,sock,service)
- {
- // if(__setnonblocking(handle()) < 0)
- // {
- // error_info errorCode;
-
- // LEMON_POSIX_ERROR(errorCode,errno);
-
- // errorCode.check_throw();
- // }
- }
+ {}
void socket::async_send(const byte_t * buffer, size_t length, int flag,LemonIOCallback callback, void * userdata,LemonErrorInfo *errorCode)
{
+ io_data * iodata = service()->alloc_io_data(LEMON_REACTOR_WRITE, handle(), userdata, callback, const_cast<byte_t*>(buffer), length);
+
+ if(LEMON_FAILED(*errorCode)) return;
+ service()->post_one(iodata,errorCode);
}
void socket::async_sendto(const byte_t * buffer, size_t length, int flag,const sockaddr * addr, socklen_t addrlen,LemonIOCallback callback, void * userdata,LemonErrorInfo *errorCode)
View
2 sys/configure.h.in
@@ -64,6 +64,8 @@
#cmakedefine LEMON_USE_BUILDIN_UCONTEXT
+#cmakedefine LEMON_HAS_GETTID
+
//define the macro to indicate the wchar_t length
#define LEMON_WCHAR_SIZE ${LEMON_WCHAR_SIZE}
View
19 sys/thread.cpp
@@ -325,12 +325,12 @@ LEMON_SYS_API void LemonThreadJoin(LemonThread t,LemonErrorInfo * errorCode){
}
}
-LEMON_SYS_API lemon_tid_t LemonGetThreadId(LemonThread t)
+LEMON_SYS_API lemon_pid_t LemonGetThreadId(LemonThread t)
{
return t->Id;
}
-LEMON_SYS_API lemon_tid_t LemonGetCurrentThreadId(LemonErrorInfo * errorCode)
+LEMON_SYS_API lemon_pid_t LemonGetCurrentThreadId(LemonErrorInfo * errorCode)
{
LEMON_RESET_ERRORINFO(*errorCode);
@@ -358,6 +358,11 @@ LEMON_SYS_API lemon_int32_t LemonAtomicDecrement(lemon_atomic_t* source){
#include <string.h>
#include <pthread.h>
#include <unistd.h>
+#ifdef LEMON_HAS_GETTID
+#include <sys/types.h>
+#else
+#include <sys/syscall.h>
+#endif //LEMON_HAS_GETTID
LEMON_SYS_API LemonTls LemonCreateTls(__lemon_option void (*destructor)(void*),__lemon_inout LemonErrorInfo* errorCode){
@@ -667,14 +672,18 @@ LEMON_SYS_API void LemonThreadJoin(LemonThread t,LemonErrorInfo * errorCode){
}
-LEMON_SYS_API lemon_tid_t LemonGetThreadId(LemonThread t)
+LEMON_SYS_API lemon_pid_t LemonGetThreadId(LemonThread t)
{
return t->Handle;
}
-LEMON_SYS_API lemon_tid_t LemonGetCurrentThreadId(LemonErrorInfo * errorCode)
+LEMON_SYS_API lemon_pid_t LemonGetCurrentThreadId(LemonErrorInfo * errorCode)
{
- return pthread_self();
+#ifdef LEMON_HAS_GETTID
+ return gettid();
+#else
+ return (pid_t) syscall (SYS_gettid);
+#endif
}
#else
View
12 sys/thread.h
@@ -17,14 +17,6 @@ typedef pthread_key_t LemonTls;
typedef unsigned int LemonTls;
#endif
-#ifdef WIN32
-typedef DWORD lemon_tid_t;
-#elif defined(LEMON_HAS_PTHREAD)
-typedef pthread_t lemon_tid_t;
-#else
-#error "not support platform"
-#endif
-
LEMON_DECLARE_HANDLE(LemonMutex);
LEMON_DECLARE_HANDLE(LemonThread);
@@ -89,9 +81,9 @@ LEMON_SYS_API void LemonReleaseThread(__lemon_in LemonThread t);
LEMON_SYS_API void LemonThreadJoin(LemonThread t,LemonErrorInfo * errorCode);
-LEMON_SYS_API lemon_tid_t LemonGetThreadId(LemonThread t);
+LEMON_SYS_API lemon_pid_t LemonGetThreadId(LemonThread t);
-LEMON_SYS_API lemon_tid_t LemonGetCurrentThreadId(LemonErrorInfo * errorCode);
+LEMON_SYS_API lemon_pid_t LemonGetCurrentThreadId(LemonErrorInfo * errorCode);
LEMON_SYS_API void LemonSleep(size_t milliseconds);
View
5 sys/unix.cmake
@@ -1,6 +1,11 @@
include(CheckCXXSourceRuns)
include(CheckFunctionExists)
include(CheckIncludeFiles)
+
+#######################################################################################################################################
+
+check_cxx_source_runs("#include <sys/types.h>\n int main(){gettid();return 0;}" LEMON_HAS_GETTID)
+
#######################################################################################################################################
# check the random function implement
CHECK_CXX_SOURCE_RUNS("#include <fstream>\n int main(){std::ifstream f(\"/dev/urandom\");return f.is_open()?0:1;}" LEMON_RANDOM_URANDOM)
View
2 trace/abi.h
@@ -167,7 +167,7 @@ typedef struct LemonTraceDescription{
lemon_pid_t ProcessId;
- lemon_tid_t ThreadId;
+ lemon_pid_t ThreadId;
lemon_trace_flag Flag;
View
8 trace/message.cpp
@@ -148,9 +148,7 @@ namespace lemon{namespace trace{
result += writer.write((const byte_t*)&pid,sizeof(pid));
- uint32_t tid = *(uint32_t*)&_threadId;
-
- tid = htonl(tid);
+ lemon_pid_t tid = htonl(_threadId);
result += writer.write((const byte_t*)&tid,sizeof(tid));
@@ -189,9 +187,7 @@ namespace lemon{namespace trace{
result += reader.read((byte_t*)&tid,sizeof(tid));
- tid = ntohl(tid);
-
- _threadId = *(lemon_tid_t*)&tid;
+ _threadId = ntohl(tid);
result += reader.read((byte_t*)&flag,sizeof(flag));
View
2 trace/message.hpp
@@ -47,7 +47,7 @@ namespace lemon{namespace trace{
lemon_pid_t _processid;
- lemon_tid_t _threadId;
+ lemon_pid_t _threadId;
lemon_trace_flag _flag;

0 comments on commit 614038e

Please sign in to comment.
Something went wrong with that request. Please try again.