Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

update

Signed-off-by: yayanyang <yayanyang@gmail.com>
  • Loading branch information...
commit 11d1e1596a138694ed367aaf5734d428f073f0ff 1 parent 7690df4
yayanyang authored
74 io/abi.cpp
View
@@ -1,16 +1,16 @@
#include <lemon/io/abi.h>
-
#ifdef LEMON_IO_IOCP
-# include <lemon/io/iocp/iocp.hpp>
-#elif defined(LEMON_IO_EPOLL)
-# include <lemon/io/epoll/epoll.hpp>
-# include <lemon/io/posix/socket.hpp>
+#include <lemon/io/io_service_iocp.hpp>
#else
+#include <lemon/io/io_service_reactor.hpp>
#endif //
+
using namespace lemon;
using namespace lemon::io;
+using namespace lemon::io::core;
+
LEMON_IO_API
LemonIOService
LemonCreateIOService(
@@ -18,7 +18,7 @@ LEMON_IO_API
{
try
{
- return reinterpret_cast<LemonIOService>(new IOService(0));
+ return reinterpret_cast<LemonIOService>(new io_service);
}
catch(const error_info & e)
{
@@ -37,7 +37,7 @@ LEMON_IO_API
{
try
{
- reinterpret_cast<IOService*>(service)->Start(newThreads);
+ reinterpret_cast<io_service*>(service)->start(newThreads);
}
catch(const error_info & e)
{
@@ -53,7 +53,7 @@ LEMON_IO_API
{
try
{
- reinterpret_cast<IOService*>(service)->Stop();
+ reinterpret_cast<io_service*>(service)->stop();
}
catch(const error_info & e)
{
@@ -65,7 +65,7 @@ LEMON_IO_API
void LemonCloseIOService(
__lemon_in LemonIOService service)
{
- delete reinterpret_cast<IOService*>(service);
+ delete reinterpret_cast<io_service*>(service);
}
@@ -78,7 +78,7 @@ LEMON_IO_API
{
try
{
- reinterpret_cast<IOService*>(service)->PostJob(callback,userdata);
+ reinterpret_cast<io_service*>(service)->post_one(callback,userdata,errorCode);
}
catch(const error_info & e)
{
@@ -93,7 +93,7 @@ LEMON_IO_API
{
try
{
- reinterpret_cast<IOService*>(service)->Join();
+ reinterpret_cast<io_service*>(service)->join();
}
catch(const error_info & e)
{
@@ -110,7 +110,7 @@ LEMON_IO_API
{
try
{
- reinterpret_cast<IOService*>(service)->Reset();
+ reinterpret_cast<io_service*>(service)->reset();
}
catch(const error_info & e)
{
@@ -125,19 +125,13 @@ LEMON_IO_API
__lemon_in int af,
__lemon_in int type,
__lemon_in int protocol,
- __lemon_in LemonIOService ioservice,
+ __lemon_in LemonIOService service,
__lemon_inout LemonErrorInfo *errorCode)
{
-
+
try
{
- return reinterpret_cast<LemonIO>
- (
- new(reinterpret_cast<IOService*>(ioservice)) Socket
- (
- af,type,protocol,reinterpret_cast<IOService*>(ioservice)
- )
- );
+ return reinterpret_cast<LemonIO>(reinterpret_cast<io_service*>(service)->create_socket(af,type,protocol));
}
catch(const error_info & e)
{
@@ -145,7 +139,7 @@ LEMON_IO_API
return LEMON_HANDLE_NULL_VALUE;
}
-
+
}
LEMON_IO_API
@@ -153,7 +147,7 @@ LEMON_IO_API
LemonCloseIO(
__lemon_free LemonIO io)
{
- delete reinterpret_cast<Object*>(io);
+ reinterpret_cast<io_object*>(io)->release();
}
LEMON_IO_API
@@ -165,7 +159,7 @@ LEMON_IO_API
{
try
{
- reinterpret_cast<Socket*>(sock)->Bind(name,nameLength);
+ reinterpret_cast<core::socket*>(sock)->bind(name,nameLength);
}
catch(const error_info & e)
{
@@ -182,7 +176,7 @@ LEMON_IO_API
{
try
{
- reinterpret_cast<Socket*>(sock)->Shutdown(how);
+ reinterpret_cast<core::socket*>(sock)->shutdown(how);
}
catch(const error_info & e)
{
@@ -199,7 +193,7 @@ LEMON_IO_API
{
try
{
- reinterpret_cast<Socket*>(sock)->SockName(name,bufferSize);
+ reinterpret_cast<core::socket*>(sock)->sockname(name,bufferSize);
}
catch(const error_info & e)
{
@@ -219,7 +213,7 @@ LEMON_IO_API
{
try
{
- return reinterpret_cast<Socket*>(socket)->Send(buffer,bufferSize,flags);
+ return reinterpret_cast<core::socket*>(socket)->send(buffer,bufferSize,flags);
}
catch(const error_info & e)
{
@@ -243,7 +237,7 @@ LEMON_IO_API
{
try
{
- reinterpret_cast<Socket*>(socket)->Send(buffer,bufferSize,flags,callback ,userData);
+ reinterpret_cast<core::socket*>(socket)->async_send(buffer,bufferSize,flags,callback ,userData,errorCode);
}
catch(const error_info & e)
{
@@ -263,7 +257,7 @@ LEMON_IO_API
{
try
{
- return reinterpret_cast<Socket*>(socket)->Recieve(buffer,bufferSize,flags);
+ return reinterpret_cast<core::socket*>(socket)->receive(buffer,bufferSize,flags);
}
catch(const error_info & e)
{
@@ -287,7 +281,7 @@ LEMON_IO_API
{
try
{
- reinterpret_cast<Socket*>(socket)->Recieve(buffer,bufferSize,flags,callback ,userData);
+ reinterpret_cast<core::socket*>(socket)->async_receive(buffer,bufferSize,flags,callback ,userData,errorCode);
}
catch(const error_info & e)
{
@@ -306,7 +300,7 @@ LEMON_IO_API
{
try
{
- reinterpret_cast<Socket*>(socket)->Connect(addr,addrlen);
+ reinterpret_cast<core::socket*>(socket)->connect(addr,addrlen);
}
catch(const error_info & e)
{
@@ -326,7 +320,7 @@ LEMON_IO_API
{
try
{
- reinterpret_cast<Socket*>(socket)->Connect(addr,addrlen,callback ,userData);
+ reinterpret_cast<core::socket*>(socket)->async_connect(addr,addrlen,callback ,userData,errorCode);
}
catch(const error_info & e)
{
@@ -342,7 +336,7 @@ LEMON_IO_API
{
try
{
- reinterpret_cast<Socket*>(socket)->Listen(backlog);
+ reinterpret_cast<core::socket*>(socket)->listen(backlog);
}
catch(const error_info & e)
{
@@ -360,7 +354,7 @@ LEMON_IO_API
{
try
{
- return reinterpret_cast<LemonIO>(reinterpret_cast<Socket*>(socket)->Accept(addr,addrlen));
+ return reinterpret_cast<LemonIO>(reinterpret_cast<core::socket*>(socket)->accept(addr,addrlen));
}
catch(const error_info & e)
{
@@ -382,7 +376,7 @@ LEMON_IO_API
{
try
{
- reinterpret_cast<Socket*>(socket)->Accept(addr,addrlen,callback,userData);
+ reinterpret_cast<core::socket*>(socket)->async_accept(addr,addrlen,callback,userData,errorCode);
}
catch(const error_info & e)
{
@@ -404,7 +398,7 @@ LEMON_IO_API
{
try
{
- return reinterpret_cast<Socket*>(socket)->SendTo(buffer,bufferSize,flags,address,addressSize);
+ return reinterpret_cast<core::socket*>(socket)->sendto(buffer,bufferSize,flags,address,addressSize);
}
catch(const error_info & e)
{
@@ -430,7 +424,7 @@ LEMON_IO_API
{
try
{
- reinterpret_cast<Socket*>(socket)->SendTo(buffer,bufferSize,flags,address,addressSize,callback ,userData);
+ reinterpret_cast<core::socket*>(socket)->async_sendto(buffer,bufferSize,flags,address,addressSize,callback ,userData,errorCode);
}
catch(const error_info & e)
{
@@ -451,7 +445,7 @@ LEMON_IO_API
{
try
{
- return reinterpret_cast<Socket*>(socket)->RecieveFrom(buffer,bufferSize,flags,address,addressSize);
+ return reinterpret_cast<core::socket*>(socket)->recvfrom(buffer,bufferSize,flags,address,addressSize);
}
catch(const error_info & e)
{
@@ -477,10 +471,10 @@ LEMON_IO_API
{
try
{
- reinterpret_cast<Socket*>(socket)->RecieveFrom(buffer,bufferSize,flags,address,addressSize,callback ,userData);
+ reinterpret_cast<core::socket*>(socket)->async_recvfrom(buffer,bufferSize,flags,address,addressSize,callback ,userData,errorCode);
}
catch(const error_info & e)
{
*errorCode = e;
}
-}
+}
11 io/abi.h
View
@@ -18,6 +18,9 @@
#define LEMON_IO_CURRENT 0x03
+
+#define LEMON_IO_REACTOR_SESSIONS 1024
+
//////////////////////////////////////////////////////////////////////////
typedef struct LemonIoWriter{
@@ -99,6 +102,12 @@ LEMON_IO_API
//////////////////////////////////////////////////////////////////////////
+#ifdef WIN32
+typedef SOCKET LemonNativeSock;
+#else
+typedef int LemonNativeSock;
+#endif //WIN32
+
#ifndef SD_BOTH
#define SD_BOTH SHUT_RDWR
#endif //SD_BOTH
@@ -119,7 +128,7 @@ LEMON_IO_API
__lemon_in int af,
__lemon_in int type,
__lemon_in int protocol,
- __lemon_in LemonIOService ioservice,
+ __lemon_in LemonIOService io_service,
__lemon_inout LemonErrorInfo *errorCode);
LEMON_IO_API
1  io/epoll/epoll.hpp
View
@@ -1 +0,0 @@
-#include <lemon/io/epoll/io_service.hpp>
2  io/epoll/io_service.cpp
View
@@ -1,2 +0,0 @@
-#include <libaio.h>
-#include <sys/epoll.h>
42 io/epoll/io_service.hpp
View
@@ -1,42 +0,0 @@
-/**
-*
-* @file io_service
-* @brief Copyright (C) 2012 yayanyang All Rights Reserved
-* @author yayanyang
-* @version 1.0.0.0
-* @date 2012/08/30
-*/
-#ifndef LEMON_IO_EPOLL_IO_SERVICE_HPP
-#define LEMON_IO_EPOLL_IO_SERVICE_HPP
-
-#include <lemon/io/io_service.hpp>
-
-namespace lemon{namespace io{
- class IOService : public IOServiceT<IOService>
- {
- public:
-
- IOService(size_t workthreads);
-
- ~IOService();
-
- void Run();
-
- void Cancel();
-
- public:
-
- void Bind(int file);
-
- void PostJob(LemonIOCallback callback, void * userdata);
-
- private:
-
- int _epollfd;
-
- int _eventfd;
- };
-}}
-
-#endif //LEMON_IO_EPOLL_IO_SERVICE_HPP
-
0  io/object.cpp → io/io_service.cpp
View
File renamed without changes
75 io/io_service.hpp
View
@@ -4,82 +4,83 @@
* @brief Copyright (C) 2012 yayanyang All Rights Reserved
* @author yayanyang
* @version 1.0.0.0
-* @date 2012/08/30
+* @date 2012/09/12
*/
#ifndef LEMON_IO_IO_SERVICE_HPP
#define LEMON_IO_IO_SERVICE_HPP
-#include <lemon/io/abi.h>
-#include <lemonxx/sys/sys.hpp>
-#include <lemonxx/memory/fixobj.hpp>
+#include <lemon/io/object.hpp>
#include <lemonxx/function/bind.hpp>
-#include <lemonxx/utility/utility.hpp>
-#include <lemonxx/memory/smallobj.hpp>
-
+#include <lemonxx/memory/fixobj.hpp>
-namespace lemon{namespace io{
+namespace lemon{namespace io{namespace core{
- template<typename T>
- class IOServiceT : private nocopyable
+ template<typename Impl,typename Socket>
+ class basic_io_service : private lemon::nocopyable
{
public:
- IOServiceT(size_t workThreads)
- {
- Start(workThreads);
- }
+ typedef Impl io_service_type;
- ~IOServiceT()
- {
- Stop();
+ typedef Socket socket_type;
- Join();
- }
+ typedef memory::fixed::allocator<sizeof(socket_type)> socket_allocator;
- void Start(size_t workThreads)
+ public:
+
+ void start(size_t numbers)
{
- _threadGroup.start(lemon::bind(&T::Run,static_cast<T*>(this)),workThreads);
+ _threads.start(lemon::function<void()>(&io_service_type::attach,reinterpret_cast<io_service_type*>(this)),numbers);
}
- void Stop()
+ void stop()
{
- static_cast<T*>(this)->Cancel();
+ reinterpret_cast<io_service_type*>(this)->detach();
}
- void Join()
+ void join()
{
- _threadGroup.join();
+ _threads.join();
}
- void Reset()
+ void reset()
{
- _threadGroup.reset();
+ _threads.reset();
+
+ reinterpret_cast<io_service_type*>(this)->reset();
}
public:
- void * AllocObj(size_t size)
+ socket_type* create_socket(int af, int type, int protocol)
+ {
+ mutex_t::scope_lock lock(_socketAllocatorMutex);
+
+ return new(_socketAllocator.alloc()) socket_type(af,type,protocol,reinterpret_cast<io_service_type*>(this));
+ }
+
+ socket_type* create_socket(int af, int type, int protocol, LemonNativeSock socket)
{
- mutex_t::scope_lock lock(_allocatorMutex);
+ mutex_t::scope_lock lock(_socketAllocatorMutex);
- return _allocator.alloc(size);
+ return new(_socketAllocator.alloc()) socket_type(af,type,protocol,socket,reinterpret_cast<io_service_type*>(this));
}
- void FreeObj(void * data, size_t size)
+ void close_socket(void * object)
{
- mutex_t::scope_lock lock(_allocatorMutex);
+ mutex_t::scope_lock lock(_socketAllocatorMutex);
- _allocator.free(data,size);
+ _socketAllocator.free(object);
}
private:
- mutex_t _allocatorMutex;
+ socket_allocator _socketAllocator;
- memory::smallobject::allocator<LEMON_IO_SMALLOBJ_SIZE> _allocator;
+ mutex_t _socketAllocatorMutex;
- thread_group _threadGroup;
+ thread_group _threads;
};
-}}
+}}}
#endif //LEMON_IO_IO_SERVICE_HPP
231 io/io_service_iocp.cpp
View
@@ -0,0 +1,231 @@
+#include <lemon/io/io_service_iocp.hpp>
+
+#ifdef LEMON_IO_IOCP
+
+namespace lemon{namespace io{namespace core{
+
+ io_service::io_service()
+ : _completionPort(CreateIoCompletionPort(INVALID_HANDLE_VALUE,NULL,0,0))
+ {
+ if(NULL == _completionPort)
+ {
+ scope_error_info errorCode;
+
+ LEMON_WIN32_ERROR(errorCode,GetLastError());
+ }
+ }
+
+ io_service::~io_service()
+ {
+ ::CloseHandle(_completionPort);
+ }
+
+ void io_service::attach()
+ {
+ error_info errorCode;
+
+ DWORD numberOfBytes;
+
+ ULONG_PTR completionKey;
+
+ io_data *iodata = NULL;
+
+ scope_counter counter(_workThreads);
+
+ for(;;)
+ {
+ LEMON_RESET_ERRORINFO(errorCode);
+
+ if(!GetQueuedCompletionStatus(_completionPort,&numberOfBytes,&completionKey,(LPOVERLAPPED*)&iodata,INFINITE))
+ {
+ if(NULL == iodata)
+ {
+ scope_error_info errorCode;
+
+ LEMON_WIN32_ERROR(errorCode,GetLastError());
+ }
+
+ LEMON_WIN32_ERROR(errorCode,GetLastError());
+ }
+
+ if(LEMON_IOCP_EXIT == completionKey) break;
+
+ iodata->call(numberOfBytes,errorCode);
+
+ if(LEMON_IOCP_IO == iodata->completekey())
+ {
+ free_io_data(iodata);
+ }
+ else
+ {
+ free_accept_io_data(iodata);
+ }
+ }
+ }
+
+ void io_service::detach()
+ {
+ size_t workingThreads = _workThreads;
+
+ for(size_t i = 0; i < workingThreads; ++ i)
+ {
+ if(!PostQueuedCompletionStatus(_completionPort,0,LEMON_IOCP_EXIT,NULL))
+ {
+ scope_error_info errorCode;
+
+ LEMON_WIN32_ERROR(errorCode,GetLastError());
+ }
+ }
+ }
+
+ void io_service::post_one(LemonIOCallback callback,void * userdata,LemonErrorInfo *errorCode)
+ {
+ io_data * iodata = alloc_io_data(userdata,callback,WSABUF());
+
+ if(!PostQueuedCompletionStatus(_completionPort,(DWORD)0,LEMON_IOCP_CUSTOMER_EVENT, *iodata))
+ {
+ LEMON_WIN32_ERROR(*errorCode,GetLastError());
+ }
+ }
+
+ io_data * io_service::alloc_io_data(void * userdata, LemonIOCallback callback, WSABUF buffer)
+ {
+ mutex_t::scope_lock lock(_iodataAllocatorMutex);
+
+ return new(_iodataAllocator.alloc()) io_data(LEMON_IOCP_IO,userdata,callback,buffer);
+ }
+
+ void io_service::free_io_data(io_data * iodata)
+ {
+ mutex_t::scope_lock lock(_iodataAllocatorMutex);
+
+ _iodataAllocator.free(iodata);
+ }
+
+ accept_io_data * io_service::alloc_io_data
+ (
+ socket * listen,
+ socket *peer,
+ LemonAcceptCallback callback,
+ void * userdata,
+ sockaddr *address,
+ socklen_t *addressSize
+ )
+ {
+ mutex_t::scope_lock lock(_acceptIODataAllocatorMutex);
+
+ return new(_acceptIODataAllocator.alloc()) accept_io_data(listen,peer,callback,userdata,address,addressSize);
+ }
+
+ void io_service::free_accept_io_data(io_data * iodata)
+ {
+ mutex_t::scope_lock lock(_acceptIODataAllocatorMutex);
+
+ _acceptIODataAllocator.free(iodata);
+ }
+
+ void io_service::bind(HANDLE handle,ULONG_PTR completekey)
+ {
+ if(NULL == CreateIoCompletionPort(handle,_completionPort,completekey,0))
+ {
+ scope_error_info errorCode;
+
+ LEMON_WIN32_ERROR(errorCode,GetLastError());
+ }
+ }
+
+ //////////////////////////////////////////////////////////////////////////
+
+ io_data::io_data(ULONG_PTR completeKey,void * userdata, LemonIOCallback callback, WSABUF buffer)
+ :_completeKey(completeKey)
+ {
+ reset(userdata,callback,buffer);
+ }
+
+ void io_data::reset(void * userdata, LemonIOCallback callback, WSABUF buffer)
+ {
+ _userdata = userdata; _callback = callback; _buffer = buffer;
+ }
+
+ void io_data::call(size_t numberOfBytesTransferred, LemonErrorInfo * errorCode)
+ {
+ _callback(_userdata,numberOfBytesTransferred,errorCode);
+ }
+
+ //////////////////////////////////////////////////////////////////////////
+
+
+ accept_io_data::accept_io_data
+ (
+ socket * listen,
+ socket *peer,
+ LemonAcceptCallback callback,
+ void * userdata,
+ sockaddr *address,
+ socklen_t *addressSize
+ )
+ :io_data(LEMON_IOCP_ACCEPT)
+ ,_listen(listen)
+ ,_peer(peer)
+ ,_callback(callback)
+ ,_userdata(userdata)
+ ,_address(address)
+ ,_addresslen(addressSize)
+ {
+ WSABUF buffer = {0,0};
+
+ io_data::reset(this,&accept_io_data::callback,buffer);
+ }
+
+ void accept_io_data::callback(void *userData,size_t numberOfBytesTransferred,const LemonErrorInfo * ec)
+ {
+ scope_error_info errorCode(ec);
+
+ accept_io_data * self = (accept_io_data*)userData;
+
+ if(LEMON_SUCCESS(errorCode))
+ {
+ if(SOCKET_ERROR == setsockopt(
+ self->_peer->handle(),
+ SOL_SOCKET,
+ SO_UPDATE_ACCEPT_CONTEXT,
+ (char *)&(self->_listen->handle()),sizeof(SOCKET)))
+ {
+ LEMON_WIN32_ERROR(errorCode,WSAGetLastError());
+ }
+ else
+ {
+ sockaddr * localAddress,*remoteAddress;
+
+ int localAddressSize,remoteAddressSize;
+
+ self->_listen->get_acceptex_sockaddrs()
+ (
+ self->_acceptAddressBuffer,
+ 0,
+ LEMON_ACCEPTEX_ADDRESS_LENGTH,
+ LEMON_ACCEPTEX_ADDRESS_LENGTH,
+ &localAddress,
+ &localAddressSize,
+ &remoteAddress,
+ &remoteAddressSize
+ );
+
+ memcpy(self->_address,remoteAddress,remoteAddressSize);
+
+ *self->_addresslen = remoteAddressSize;
+ }
+ }
+ else
+ {
+ self->_peer->release();
+
+ self->_peer = NULL;
+ }
+
+ self->_callback(self->_userdata,reinterpret_cast<LemonIO>(self->_peer),numberOfBytesTransferred,errorCode);
+
+ }
+}}}
+
+#endif //
178 io/io_service_iocp.hpp
View
@@ -0,0 +1,178 @@
+/**
+*
+* @file io_service_epoll
+* @brief Copyright (C) 2012 yayanyang All Rights Reserved
+* @author yayanyang
+* @version 1.0.0.0
+* @date 2012/09/12
+*/
+#ifndef LEMON_IO_IO_SERVICE_IOCP_HPP
+#define LEMON_IO_IO_SERVICE_IOCP_HPP
+#include <lemon/io/io_service.hpp>
+
+#ifdef LEMON_IO_IOCP
+
+#include <lemon/io/socket_iocp.hpp>
+
+
+#define LEMON_IOCP_EXIT (ULONG_PTR)0
+
+#define LEMON_IOCP_CUSTOMER_EVENT (ULONG_PTR)1
+
+#define LEMON_IOCP_IO (ULONG_PTR)2
+
+#define LEMON_IOCP_ACCEPT (ULONG_PTR)3
+
+#ifdef LEMON_SUPPORT_IPV6
+
+#define LEMON_ACCEPTEX_ADDRESS_LENGTH (sizeof(sockaddr_in6) + 16)
+
+#else
+
+#define LEMON_ACCEPTEX_ADDRESS_LENGTH (sizeof(sockaddr_in) + 16)
+
+#endif //LEMON_SUPPORT_IPV6
+
+namespace lemon{namespace io{namespace core{
+
+ class socket;
+
+ //////////////////////////////////////////////////////////////////////////
+
+ class io_data
+ {
+ public:
+
+ io_data(ULONG_PTR completeKey):_completeKey(completeKey){}
+
+ io_data(ULONG_PTR completeKey,void * userdata, LemonIOCallback callback, WSABUF buffer);
+
+ void call(size_t numberOfBytesTransferred, LemonErrorInfo * errorCode);
+
+ void reset(void * userdata, LemonIOCallback callback, WSABUF buffer);
+
+ WSABUF & buffer() {return _buffer;}
+
+ operator OVERLAPPED * () {return &_overlapped;}
+
+ ULONG_PTR completekey() { return _completeKey;}
+
+ private:
+
+ OVERLAPPED _overlapped;
+
+ ULONG_PTR _completeKey;
+
+ void *_userdata;
+
+ LemonIOCallback _callback;
+
+ WSABUF _buffer;
+ };
+
+ //////////////////////////////////////////////////////////////////////////
+
+ class accept_io_data : public io_data
+ {
+ public:
+
+ accept_io_data
+ (
+ socket * listen,
+ socket *peer,
+ LemonAcceptCallback callback,
+ void * userdata,
+ sockaddr *address,
+ socklen_t *addressSize
+ );
+
+ byte_t * accept_address_buffer() {return _acceptAddressBuffer;}
+
+ private:
+
+ static void callback(void *userData,size_t numberOfBytesTransferred,const LemonErrorInfo * errorCode);
+
+ private:
+
+ socket *_listen;
+
+ socket *_peer;
+
+ LemonAcceptCallback _callback;
+
+ void *_userdata;
+
+ sockaddr *_address;
+
+ socklen_t *_addresslen;
+
+ lemon_byte_t _acceptAddressBuffer[LEMON_ACCEPTEX_ADDRESS_LENGTH * 2];
+ };
+
+ //////////////////////////////////////////////////////////////////////////
+
+ class io_service : public basic_io_service<io_service,socket>
+ {
+ public:
+
+ typedef lemon::memory::fixed::allocator<sizeof(socket)> socket_allocator;
+
+ typedef lemon::memory::fixed::allocator<sizeof(io_data)> io_data_allocator;
+
+ typedef lemon::memory::fixed::allocator<sizeof(accept_io_data)> accept_io_data_allocator;
+
+
+ io_service();
+
+ ~io_service();
+
+ void reset(){}
+
+ void attach();
+
+ void detach();
+
+ void post_one(LemonIOCallback callback,void * userdata,LemonErrorInfo *errorCode);
+
+ public:
+
+ void bind(HANDLE handle,ULONG_PTR completekey);
+
+ public:
+
+ io_data * alloc_io_data(void * userdata, LemonIOCallback callback, WSABUF buffer);
+
+ void free_io_data(io_data * iodata);
+
+ accept_io_data * alloc_io_data
+ (
+ socket * listen,
+ socket *peer,
+ LemonAcceptCallback callback,
+ void * userdata,
+ sockaddr *address,
+ socklen_t *addressSize
+ );
+
+ void free_accept_io_data(io_data * iodata);
+
+ private:
+
+ atomic_t _workThreads;
+
+ HANDLE _completionPort;
+
+ io_data_allocator _iodataAllocator;
+
+ mutex_t _iodataAllocatorMutex;
+
+ accept_io_data_allocator _acceptIODataAllocator;
+
+ mutex_t _acceptIODataAllocatorMutex;
+
+ };
+}}}
+
+#endif //LEMON_IO_IOCP
+
+#endif //LEMON_IO_IO_SERVICE_IOCP_HPP
164 io/io_service_reactor.cpp
View
@@ -0,0 +1,164 @@
+#include <lemon/io/io_service_reactor.hpp>
+
+#ifndef LEMON_IO_IOCP
+
+namespace lemon{namespace io{namespace core{
+
+ io_data::io_data(size_t type,void * userdata, LemonIOCallback callback, void * buffer, size_t bufferSize)
+ :_type(type),_userdata(userdata),_callback(callback),_buffer(buffer),_bufferSize(bufferSize)
+ {
+
+ }
+
+ void io_data::reset(void * userdata, LemonIOCallback callback, void * buffer, size_t bufferSize)
+ {
+ _userdata = userdata; _callback = callback; _buffer = buffer; _bufferSize = bufferSize;
+ }
+
+ void io_data::call()
+ {
+ _callback(_userdata,_numberOfBytesTransferred,&_errorCode);
+ }
+
+ //////////////////////////////////////////////////////////////////////////
+
+ accept_io_data::accept_io_data
+ (
+ socket * listen,
+ socket *peer,
+ LemonAcceptCallback callback,
+ void * userdata,
+ sockaddr *address,
+ socklen_t *addressSize
+ )
+ :io_data(LEMON_REACTOR_ACCEPT)
+ {
+ reset(this,&accept_io_data::callback,NULL,0);
+ }
+
+ void accept_io_data::callback(void *userData,size_t numberOfBytesTransferred,const LemonErrorInfo * errorCode)
+ {
+ accept_io_data * self = (accept_io_data*)userData;
+
+ if(LEMON_SUCCESS(*errorCode))
+ {
+ self->_callback(userData,reinterpret_cast<LemonIO>(self->_peer),numberOfBytesTransferred,errorCode);
+ }
+ else
+ {
+ //TODO: release the socket
+ self->_callback(userData,NULL,numberOfBytesTransferred,errorCode);
+ }
+ }
+
+
+ //////////////////////////////////////////////////////////////////////////
+ io_service_reactor::io_service_reactor()
+ :_completeQueue(LEMON_IO_REACTOR_SESSIONS),_exit(false)
+ {
+
+ }
+
+ io_service_reactor::~io_service_reactor()
+ {
+
+ }
+
+ void io_service_reactor::attach()
+ {
+ mutex_t::scope_lock lock(_mutex);
+
+ while(!_exit)
+ {
+ while(!_exit && _completeQueue.empty()) _condition.wait(lock);
+
+ if(!_exit)
+ {
+ io_data* iodata = (io_data*)_completeQueue.pop_front();
+
+ iodata->call();
+
+ if(iodata->type() == LEMON_REACTOR_ACCEPT)
+ {
+ free_accept_io_data(iodata);
+ }
+ else
+ {
+ free_io_data(iodata);
+ }
+ }
+ }
+ }
+
+ void io_service_reactor::detach()
+ {
+ mutex_t::scope_lock lock(_mutex);
+
+ _exit = true;
+
+ _condition.notifyall();
+ }
+
+ void io_service_reactor::post_one(LemonIOCallback callback,void * userdata, void * buffer, size_t bufferSize,LemonErrorInfo *errorCode)
+ {
+ mutex_t::scope_lock lock(_mutex);
+
+ io_data * iodata = alloc_io_data(LEMON_REACTOR_POST_ONE,userdata,callback,buffer,bufferSize);
+
+ bool used = false;
+
+ io_data* replace = (io_data*)_completeQueue.push_back(used);
+
+ if(used)
+ {
+ iodata->call();
+
+ if(iodata->type() == LEMON_REACTOR_ACCEPT)
+ {
+ free_accept_io_data(iodata);
+ }
+ else
+ {
+ free_io_data(iodata);
+ }
+ }
+ }
+
+ io_data * io_service_reactor::alloc_io_data(size_t type,void * userdata, LemonIOCallback callback, void * buffer, size_t bufferSize)
+ {
+ mutex_t::scope_lock lock(_iodataAllocatorMutex);
+
+ return new(_iodataAllocator.alloc()) io_data(type,userdata,callback,buffer,bufferSize);
+ }
+
+ void io_service_reactor::free_io_data(io_data * iodata)
+ {
+ mutex_t::scope_lock lock(_iodataAllocatorMutex);
+
+ _iodataAllocator.free(iodata);
+ }
+
+ accept_io_data * io_service_reactor::alloc_io_data
+ (
+ socket * listen,
+ socket *peer,
+ LemonAcceptCallback callback,
+ void * userdata,
+ sockaddr *address,
+ socklen_t *addressSize
+ )
+ {
+ mutex_t::scope_lock lock(_acceptIODataAllocatorMutex);
+
+ return new(_acceptIODataAllocator.alloc()) accept_io_data(listen,peer,callback,userdata,address,addressSize);
+ }
+
+ void io_service_reactor::free_accept_io_data(io_data * iodata)
+ {
+ mutex_t::scope_lock lock(_acceptIODataAllocatorMutex);
+
+ _acceptIODataAllocator.free(iodata);
+ }
+}}}
+
+#endif //
170 io/io_service_reactor.hpp
View
@@ -0,0 +1,170 @@
+/**
+*
+* @file io_service_reactor
+* @brief Copyright (C) 2012 yayanyang All Rights Reserved
+* @author yayanyang
+* @version 1.0.0.0
+* @date 2012/09/12
+*/
+#ifndef LEMON_IO_SERVICE_REACTOR_HPP
+#define LEMON_IO_SERVICE_REACTOR_HPP
+#include <lemon/io/io_service.hpp>
+#include <lemonxx/memory/fixobj.hpp>
+#include <lemonxx/memory/ringbuffer.hpp>
+
+#ifndef LEMON_IO_IOCP
+
+#define LEMON_REACTOR_READ 0x01
+
+#define LEMON_REACTOR_WRITE 0x02
+
+#define LEMON_REACTOR_ACCEPT 0x03
+
+#define LEMON_REACTOR_POST_ONE 0x04
+
+namespace lemon{namespace io{namespace core{
+
+ class socket;
+
+ class io_data
+ {
+ public:
+
+ io_data(size_t type):_type(type) {}
+
+ io_data(size_t type,void * userdata, LemonIOCallback callback, void * buffer, size_t bufferSize);
+
+ void reset(void * userdata, LemonIOCallback callback, void * buffer, size_t bufferSize);
+
+ void call();
+
+ void * buffer() { return _buffer; }
+
+ size_t buffersize() const { return _bufferSize; }
+
+ void numberOfBytesTransferred(size_t val) { _numberOfBytesTransferred = val; }
+
+ LemonErrorInfo errorcode() { return _errorCode; }
+
+ size_t type() { return _type; }
+
+ private:
+
+ size_t _type;
+
+ void *_userdata;
+
+ LemonIOCallback _callback;
+
+ void *_buffer;
+
+ size_t _bufferSize;
+
+ size_t _numberOfBytesTransferred;
+
+ LemonErrorInfo _errorCode;
+
+ };
+
+
+ class accept_io_data : public io_data
+ {
+ public:
+
+ accept_io_data
+ (
+ socket * listen,
+ socket *peer,
+ LemonAcceptCallback callback,
+ void * userdata,
+ sockaddr *address,
+ socklen_t *addressSize
+ );
+
+ socket * peer() { return _peer; }
+
+ private:
+
+ static void callback(void *userData,size_t numberOfBytesTransferred,const LemonErrorInfo * errorCode);
+
+ private:
+
+ socket *_listen;
+
+ socket *_peer;
+
+ LemonAcceptCallback _callback;
+
+ void *_userdata;
+
+ sockaddr *_address;
+
+ socklen_t *_addresslen;
+ };
+
+
+
+ class io_service_reactor : private lemon::nocopyable
+ {
+ public:
+
+ typedef memory::fixed::allocator<sizeof(io_data)> io_data_allocator;
+
+ typedef memory::fixed::allocator<sizeof(accept_io_data)> accept_io_data_allocator;
+
+ typedef memory::ringbuffer::allocator<sizeof(io_data*)> complete_queue;
+
+ io_service_reactor();
+
+ ~io_service_reactor();
+
+ void reset() {_exit = false; }
+
+ void attach();
+
+ void detach();
+
+ void post_one(LemonIOCallback callback,void * userdata,void * buffer, size_t bufferSize,LemonErrorInfo *errorCode);
+
+ public:
+
+ io_data * alloc_io_data(size_t type,void * userdata, LemonIOCallback callback, void * buffer, size_t bufferSize);
+
+ void free_io_data(io_data * iodata);
+
+ accept_io_data * alloc_io_data
+ (
+ socket * listen,
+ socket *peer,
+ LemonAcceptCallback callback,
+ void * userdata,
+ sockaddr *address,
+ socklen_t *addressSize
+ );
+
+ void free_accept_io_data(io_data * iodata);
+
+ private:
+
+ io_data_allocator _iodataAllocator;
+
+ mutex_t _iodataAllocatorMutex;
+
+ accept_io_data_allocator _acceptIODataAllocator;
+
+ mutex_t _acceptIODataAllocatorMutex;
+
+ mutex_t _mutex;
+
+ condition_variable _condition;
+
+ complete_queue _completeQueue;
+
+ bool _exit;
+ };
+
+}}}
+
+#endif //LEMON_IO_IOCP
+
+#endif //LEMON_IO_SERVICE_REACTOR_HPP
128 io/iocp/io_service.cpp
View
@@ -1,128 +0,0 @@
-#include <lemon/io/iocp/io_service.hpp>
-
-#ifdef LEMON_IO_IOCP
-
-namespace lemon{namespace io{
-
- IOService::IOService(size_t workthreads)
- : IOServiceT<IOService>(workthreads)
- , _completionPort(CreateIoCompletionPort(INVALID_HANDLE_VALUE,NULL,0,0))
- {
- if(NULL == _completionPort)
- {
- error_info errorCode;
-
- LEMON_WIN32_ERROR(errorCode,GetLastError());
-
- errorCode.check_throw();
- }
- }
-
- IOService::~IOService()
- {
- CloseHandle(_completionPort);
- }
-
- void IOService::Cancel()
- {
- size_t workingThreads = _workThreads;
-
- for(size_t i = 0; i < workingThreads; ++ i)
- {
- if(!PostQueuedCompletionStatus(_completionPort,0,LEMON_IOCP_EXIT,NULL))
- {
- scope_error_info errorCode;
-
- LEMON_WIN32_ERROR(errorCode,GetLastError());
- }
- }
- }
-
- void IOService::Run()
- {
- error_info errorCode;
-
- DWORD numberOfBytes;
-
- ULONG_PTR completionKey;
-
- IOData *iodata = NULL;
-
- scope_counter counter(_workThreads);
-
- for(;;)
- {
- LEMON_RESET_ERRORINFO(errorCode);
-
- if(!GetQueuedCompletionStatus(_completionPort,&numberOfBytes,&completionKey,(LPOVERLAPPED*)&iodata,INFINITE))
- {
- if(NULL == iodata)
- {
- scope_error_info errorCode;
-
- LEMON_WIN32_ERROR(errorCode,GetLastError());
- }
-
- LEMON_WIN32_ERROR(errorCode,GetLastError());
- }
-
- if(LEMON_IOCP_EXIT == completionKey) break;
-
- iodata->Callback(iodata->UserData,numberOfBytes,errorCode);
-
- Release(this,iodata);
- }
- }
-
- IOService::IOData * IOService::NewIOData(void * userData, LemonIOCallback callback, WSABUF buffer)
- {
- mutex_t::scope_lock lock(_ioDataAllocatorMutex);
-
- return new(_ioDataAllocator.alloc()) IOData(userData,callback,buffer,&IOService::ReleaseIOData);
- }
-
- void IOService::ReleaseIOData(IOService::IOData * iodata)
- {
- mutex_t::scope_lock lock(_ioDataAllocatorMutex);
-
- _ioDataAllocator.free(iodata);
- }
-
- void IOService::PostJob(LemonIOCallback callback, void * userdata)
- {
- IOData * iodata = NewIOData(userdata,callback,WSABUF());
-
- if(!PostQueuedCompletionStatus(_completionPort,(DWORD)0,LEMON_IOCP_CUSTOMER_EVENT,(LPOVERLAPPED)iodata))
- {
- scope_error_info errorCode;
-
- LEMON_WIN32_ERROR(errorCode,GetLastError());
- }
- }
-
- void IOService::Bind(HANDLE file)
- {
- if(NULL == CreateIoCompletionPort(file,_completionPort,LEMON_IOCP_IO,0))
- {
- scope_error_info errorCode;
-
- LEMON_WIN32_ERROR(errorCode,GetLastError());
- }
- }
-
- IOService::AcceptIOData * IOService::NewAcceptIOData(Socket * listen, Socket * peer, LemonAcceptCallback callback,void * userData,sockaddr *address,socklen_t *addressSize)
- {
- mutex_t::scope_lock lock(_acceptIoDataAllocatorMutex);
-
- return new(_acceptIoDataAllocator.alloc()) AcceptIOData(listen,peer,callback,userData,address,addressSize,&IOService::ReleaseAcceptIOData);
- }
-
- void IOService::ReleaseAcceptIOData(IOData * iodata)
- {
- mutex_t::scope_lock lock(_acceptIoDataAllocatorMutex);
-
- _acceptIoDataAllocator.free(iodata);
- }
-}}
-
-#endif //LEMON_IO_IOCP
134 io/iocp/io_service.hpp
View
@@ -1,134 +0,0 @@
-/**
-*
-* @file io_service
-* @brief Copyright (C) 2012 yayanyang All Rights Reserved
-* @author yayanyang
-* @version 1.0.0.0
-* @date 2012/08/30
-*/
-#ifndef LEMON_IO_IOCP_IO_SERVICE_HPP
-#define LEMON_IO_IOCP_IO_SERVICE_HPP
-#include <lemon/io/io_service.hpp>
-#include <lemon/io/iocp/iodata.hpp>
-
-
-#ifdef LEMON_IO_IOCP
-
-#define LEMON_IOCP_EXIT (ULONG_PTR)0
-
-#define LEMON_IOCP_CUSTOMER_EVENT (ULONG_PTR)1
-
-#define LEMON_IOCP_IO (ULONG_PTR)2
-
-#ifdef LEMON_SUPPORT_IPV6
-
-#define LEMON_ACCEPTEX_ADDRESS_LENGTH (sizeof(sockaddr_in6) + 16)
-
-#else
-
-#define LEMON_ACCEPTEX_ADDRESS_LENGTH (sizeof(sockaddr_in) + 16)
-
-#endif //LEMON_SUPPORT_IPV6
-
-namespace lemon{namespace io{
-
- class Socket;
-
- void __AcceptCallback(void *userData,size_t numberOfBytesTransferred,const LemonErrorInfo * errorCode);
-
- template<typename IOService>
- struct AcceptIODataT : public IODataT<IOService>
- {
- Socket *Listen;
-
- Socket *Peer;
-
- void *AcceptUserData;
-
- LemonAcceptCallback AcceptCallback;
-
- lemon_byte_t AcceptBuffer[LEMON_ACCEPTEX_ADDRESS_LENGTH * 2];
-
- struct sockaddr *Address;
-
- socklen_t *AddressSize;
-
- AcceptIODataT(Socket * listen, Socket * peer, LemonAcceptCallback callback,void * userData,sockaddr *address,socklen_t *addressSize,IODataT<IOService>::IODataRelease release)
- {
- Listen = listen;Peer = peer;
-
- AcceptCallback = callback;
-
- AcceptUserData = userData;
-
- Address = address;
-
- AddressSize = addressSize;
-
- UserData = this;
-
- IODataT<IOService>::Callback = &__AcceptCallback;
-
- Release = release;
- }
- };
-
-
- class IOService : public IOServiceT<IOService>
- {
- public:
-
- typedef IODataT<IOService> IOData;
-
- typedef AcceptIODataT<IOService> AcceptIOData;
-
- typedef memory::fixed::allocator<sizeof(IOData)> IODataAllocator;
-
- typedef memory::fixed::allocator<sizeof(AcceptIOData)> AcceptIODataAllocator;
-
- IOService(size_t workthreads);
-
- ~IOService();
-
- void Run();
-
- void Cancel();
-
- public:
-
- IOData * NewIOData(void * userData, LemonIOCallback callback, WSABUF buffer);
-
- void ReleaseIOData(IOData * iodata);
-
- AcceptIOData * NewAcceptIOData(Socket * listen, Socket * peer, LemonAcceptCallback callback,void * userData,sockaddr *address,socklen_t *addressSize);
-
- void ReleaseAcceptIOData(IOData * iodata);
-
- void PostJob(LemonIOCallback callback, void * userdata);
-
- public:
-
- //win32 special API
-
- void Bind(HANDLE file);
-
- private:
-
- HANDLE _completionPort;
-
- IODataAllocator _ioDataAllocator;
-
- mutex_t _ioDataAllocatorMutex;
-
- AcceptIODataAllocator _acceptIoDataAllocator;
-
- mutex_t _acceptIoDataAllocatorMutex;
-
- atomic_t _workThreads;
- };
-
-}}
-
-#endif //LEMON_IO_IOCP
-
-#endif //LEMON_IO_IOCP_IO_SERVICE_HPP
17 io/iocp/iocp.hpp
View
@@ -1,17 +0,0 @@
-/**
-*
-* @file iocp
-* @brief Copyright (C) 2012 yayanyang All Rights Reserved
-* @author yayanyang
-* @version 1.0.0.0
-* @date 2012/08/30
-*/
-#ifndef LEMON_IO_IOCP_IOCP_HPP
-#define LEMON_IO_IOCP_IOCP_HPP
-
-#include <lemon/io/iocp/socket.hpp>
-#include <lemon/io/iocp/io_service.hpp>
-
-#endif //LEMON_IO_IOCP_IOCP_HPP
-
-
57 io/iocp/iodata.hpp
View
@@ -1,57 +0,0 @@
-/**
-*
-* @file iodata
-* @brief Copyright (C) 2012 yayanyang All Rights Reserved
-* @author yayanyang
-* @version 1.0.0.0
-* @date 2012/08/31
-*/
-#ifndef LEMON_IO_IOCP_IODATA_HPP
-#define LEMON_IO_IOCP_IODATA_HPP
-#include <lemon/io/io_service.hpp>
-
-#ifdef LEMON_IO_IOCP
-
-namespace lemon{namespace io{
-
- template<class IOService>
- struct IODataT
- {
- typedef IODataT<IOService> Self;
-
- typedef void (IOService::*IODataRelease)( Self * self );
-
- OVERLAPPED Overlapped;
-
- void *UserData;
-
- LemonIOCallback Callback;
-
- WSABUF Buffer;
-
- IODataRelease Release;
-
- IODataT(){}
-
- IODataT(void * userData, LemonIOCallback callback, WSABUF buffer, IODataRelease release)
- {
- UserData = userData;
-
- Callback = callback;
-
- Buffer = buffer;
-
- Release = release;
- }
- };
-
- template<class IOService>
- inline void Release(IOService * service,IODataT<IOService> * iodata)
- {
- (service->*(iodata->Release))(iodata);
- }
-}}
-
-#endif // LEMON_IO_IOCP
-
-#endif //LEMON_IO_IOCP_IODATA_HPP
403 io/iocp/socket.cpp
View
@@ -1,403 +0,0 @@
-#include <lemon/io/iocp/socket.hpp>
-#include <lemon/io/iocp/io_service.hpp>
-#ifdef LEMON_IO_IOCP
-
-namespace lemon{namespace io{
-
- const static GUID GuidAcceptEx = WSAID_ACCEPTEX;
-
- const static GUID GuidConnectionEx = WSAID_CONNECTEX;
-
- const static GUID GuidGetAcceptExSockaddrs = WSAID_GETACCEPTEXSOCKADDRS;
-
- template<class LPFN>
- LPFN LoadWinSocketExtension(SOCKET socket,const GUID *guid)
- {
- DWORD dwBytes;
-
- LPFN fn;
-
- if(SOCKET_ERROR == WSAIoctl(socket,
- SIO_GET_EXTENSION_FUNCTION_POINTER,
- (LPVOID)guid,
- sizeof(GUID),
- &fn,
- sizeof(fn),
- &dwBytes,
- NULL,
- NULL))
- {
- scope_error_info errorCode;
-
- LEMON_WIN32_ERROR(errorCode,GetLastError());
- }
-
- return fn;
- }
-
- void __AcceptCallback(void *userData,size_t numberOfBytesTransferred,const LemonErrorInfo * errorCode)
- {
- IOService::AcceptIOData* data = (IOService::AcceptIOData*)userData;
-
- if(LEMON_SUCCESS(*errorCode)){
-
- if(SOCKET_ERROR == setsockopt(
- *data->Peer,
- SOL_SOCKET,
- SO_UPDATE_ACCEPT_CONTEXT,
- (char *)&(data->Listen->_handle),
- sizeof(SOCKET)
- )){
-
- LEMON_DECLARE_ERRORINFO(error);
-
- LEMON_WIN32_ERROR(error,WSAGetLastError());
- } else {
-
- sockaddr * localAddress,*remoteAddress;
-
- int localAddressSize,remoteAddressSize;
-
- data->Listen->_getAcceptExSockaddrs(
- data->AcceptBuffer,
- 0,
- LEMON_ACCEPTEX_ADDRESS_LENGTH,
- LEMON_ACCEPTEX_ADDRESS_LENGTH,
- &localAddress,
- &localAddressSize,
- &remoteAddress,
- &remoteAddressSize
- );
-
- memcpy(data->Address,remoteAddress,remoteAddressSize);
-
- *data->AddressSize = remoteAddressSize;
- }
- }
-
- data->AcceptCallback(data->AcceptUserData,reinterpret_cast<LemonIO>(data->Peer),numberOfBytesTransferred,errorCode);
- }
-
- Socket::Socket(int af, int type, int protocol,IOService * service)
- :BaseType(service)
- ,_handle(WSASocket(af,type,protocol,NULL,0,WSA_FLAG_OVERLAPPED))
- ,_af(af)
- ,_type(type)
- ,_protocol(protocol)
- {
- if(INVALID_SOCKET == _handle)
- {
- scope_error_info errorCode;
-
- LEMON_WIN32_ERROR(errorCode,GetLastError());
- }
-
- try
- {
- // close SIO_UDP_CONNRESET
- if(SOCK_DGRAM == type)
- {
- BOOL bNewBehavior = FALSE;
-
- DWORD dwBytesReturned = 0;
-
- DWORD status = WSAIoctl(_handle,SIO_UDP_CONNRESET,&bNewBehavior,sizeof(bNewBehavior),NULL, 0, &dwBytesReturned,NULL,NULL);
-
- if(SOCKET_ERROR == status)
- {
- scope_error_info errorCode;
-
- LEMON_WIN32_ERROR(errorCode,GetLastError());
- }
- }
-
- _acceptEx = LoadWinSocketExtension<LPFN_ACCEPTEX>(_handle,&GuidAcceptEx);
-
- _connectEx = LoadWinSocketExtension<LPFN_CONNECTEX>(_handle,&GuidConnectionEx);
-
- _getAcceptExSockaddrs = LoadWinSocketExtension<LPFN_GETACCEPTEXSOCKADDRS>(_handle,&GuidGetAcceptExSockaddrs);
-
- service->Bind((HANDLE)_handle);
- }
- catch(...)
- {
- ::closesocket(_handle);
-
- throw;//re throw the exception
- }
- }
-
- Socket::~Socket()
- {
- ::closesocket(_handle);
- }
-
- void Socket::Bind(const sockaddr * name ,socklen_t length)
- {
- if(SOCKET_ERROR == ::bind(_handle,name,length))
- {
- scope_error_info errorCode;
-
- LEMON_WIN32_ERROR(*errorCode,WSAGetLastError());
- }
- }
-
- void Socket::Shutdown(int how)
- {
- if(SOCKET_ERROR == ::shutdown(_handle,how))
- {
- scope_error_info errorCode;
-
- LEMON_WIN32_ERROR(*errorCode,WSAGetLastError());
- }
- }
-
- void Socket::SockName(sockaddr * name, socklen_t* bufferSize)
- {
- if(SOCKET_ERROR == ::getsockname(_handle,name,bufferSize))
- {
- scope_error_info errorCode;
-
- LEMON_WIN32_ERROR(*errorCode,WSAGetLastError());
- }
- }
-
- size_t Socket::Send(const byte_t * buffer, size_t length, int flag)
- {
-
- size_t sendSize = ::send(_handle,(const char*)buffer,(int)length,flag);
-
- if(SOCKET_ERROR == sendSize)
- {
- scope_error_info errorCode;
-
- LEMON_WIN32_ERROR(*errorCode,WSAGetLastError());
- }
-
- return sendSize;
- }
-
- void Socket::Send(const byte_t * buffer, size_t length, int flag , LemonIOCallback callback, void * userdata)
- {
- WSABUF wsaBuf = {(ULONG)length,(CHAR*)buffer};
-
- IOService::IOData * iodata = Service()->NewIOData(userdata,callback,wsaBuf);
-
- if(0 != WSASend(_handle,&iodata->Buffer,1,NULL,flag,&iodata->Overlapped,NULL))
- {
- DWORD lastError = WSAGetLastError();
-
- if(WSA_IO_PENDING != lastError)
- {
-
- scope_error_info errorCode;
-
- LEMON_WIN32_ERROR(errorCode,lastError);
-
- Release(Service(),iodata);
- }
- }
- }
-
- size_t Socket::Recieve(byte_t * buffer, size_t length, int flag)
- {
- size_t sendSize = ::recv(_handle,(char*)buffer,(int)length,flag);
-
- if(SOCKET_ERROR == sendSize)
- {
- scope_error_info errorCode;
-
- LEMON_WIN32_ERROR(*errorCode,WSAGetLastError());
- }
-
- return sendSize;
- }
-
- void Socket::Recieve(byte_t * buffer, size_t length, int flag , LemonIOCallback callback, void * userdata)
- {
- DWORD placeholder = flag;
-
- WSABUF wsaBuf = {(ULONG)length,(CHAR*)buffer};
-
- IOService::IOData * iodata = Service()->NewIOData(userdata,callback,wsaBuf);
-
- if(0 != WSARecv(_handle,&iodata->Buffer,1,NULL,&placeholder,&iodata->Overlapped,NULL))
- {
- DWORD lastError = WSAGetLastError();
-
- if(WSA_IO_PENDING != lastError)
- {
-
- scope_error_info errorCode;
-
- LEMON_WIN32_ERROR(errorCode,lastError);
-
- Release(Service(),iodata);
- }
- }
- }
-
- void Socket::Connect(const sockaddr * addr, socklen_t addrlen)
- {
- if(SOCKET_ERROR == ::connect(_handle,addr,addrlen))
- {
- scope_error_info errorCode;
-
- LEMON_WIN32_ERROR(*errorCode,WSAGetLastError());
- }
- }
-
- void Socket::Connect(const sockaddr * addr, socklen_t addrlen, LemonIOCallback callback, void * userdata)
- {
-
- IOService::IOData * iodata = Service()->NewIOData(userdata,callback,WSABUF());
-
- DWORD bytesSent;
-
- DWORD sendBytes = 0;
-
- if(!_connectEx(_handle,addr,addrlen,NULL,sendBytes,&bytesSent,&iodata->Overlapped))
- {
- if(ERROR_IO_PENDING != GetLastError())
- {
- scope_error_info errorCode;
-
- LEMON_WIN32_ERROR(*errorCode,GetLastError());
-
- Release(Service(),iodata);
- }
- }
- }
-
- void Socket::Listen(int backlog)
- {
- if(SOCKET_ERROR == ::listen(_handle,backlog))
- {
- scope_error_info errorCode;
-
- LEMON_WIN32_ERROR(*errorCode,WSAGetLastError());
- }
- }
-
- Socket * Socket::Accept(sockaddr * addr,socklen_t * addrlen)
- {
- SOCKET handle = ::accept(_handle,addr,addrlen);
-
- if(INVALID_SOCKET == handle)
- {
- scope_error_info errorCode;
-
- LEMON_WIN32_ERROR(*errorCode,WSAGetLastError());
- }
-
- return new(Service()) Socket(_af,_type,_protocol,handle,Service());
- }
-
- void Socket::Accept(sockaddr * addr,socklen_t * addrlen,LemonAcceptCallback callback, void * userdata)
- {
- Socket * peer = new (Service()) Socket(_af,_type,_protocol,Service());
-
- IOService::AcceptIOData * iodata = Service()->NewAcceptIOData(this,peer,callback,userdata,addr,addrlen);
-
- DWORD bytesReceived;
-
- if(!_acceptEx(
- _handle,
- peer->_handle,
- iodata->AcceptBuffer,
- 0,
- LEMON_ACCEPTEX_ADDRESS_LENGTH,
- LEMON_ACCEPTEX_ADDRESS_LENGTH,
- &bytesReceived,
- &iodata->Overlapped
- ))
- {
- DWORD error = WSAGetLastError();
-
- if(ERROR_IO_PENDING != error){
-
- scope_error_info errorCode;
-
- LEMON_WIN32_ERROR(*errorCode,error);
-
- Release(Service(),iodata);
- }
- }
- }
-
- size_t Socket::SendTo(const byte_t * buffer, size_t length, int flag,const sockaddr * addr, socklen_t addrlen)
- {
- size_t sendSize = ::sendto(_handle,(const char*)buffer,(int)length,flag,addr,addrlen);
-
- if(SOCKET_ERROR == sendSize)
- {
- scope_error_info errorCode;
-
- LEMON_WIN32_ERROR(*errorCode,WSAGetLastError());
- }
-
- return sendSize;
- }
-
- void Socket::SendTo(const byte_t * buffer, size_t length, int flag ,const sockaddr * addr, socklen_t addrlen, LemonIOCallback callback, void * userdata)
- {
- WSABUF wsaBuf = {(ULONG)length,(CHAR*)buffer};
-
- IOService::IOData * iodata = Service()->NewIOData(userdata,callback,wsaBuf);
-
- if(0 != WSASendTo(_handle,&iodata->Buffer,1,NULL,flag,addr,addrlen,&iodata->Overlapped,NULL))
- {
- DWORD lastError = WSAGetLastError();
-
- if(WSA_IO_PENDING != lastError)
- {
-
- scope_error_info errorCode;
-
- LEMON_WIN32_ERROR(errorCode,lastError);
-
- Release(Service(),iodata);
- }
- }
- }
-
- size_t Socket::RecieveFrom(byte_t * buffer, size_t length, int flag,sockaddr * addr,socklen_t * addrlen)
- {
- size_t sendSize = ::recvfrom(_handle,(char*)buffer,(int)length,flag,addr,addrlen);
-
- if(SOCKET_ERROR == sendSize)
- {
- scope_error_info errorCode;
-
- LEMON_WIN32_ERROR(*errorCode,WSAGetLastError());
- }
-
- return sendSize;
- }
-
- void Socket::RecieveFrom(byte_t * buffer, size_t length, int flag , sockaddr * addr,socklen_t * addrlen, LemonIOCallback callback, void * userdata)
- {
- WSABUF wsaBuf = {(ULONG)length,(CHAR*)buffer};
-
- IOService::IOData * iodata = Service()->NewIOData(userdata,callback,wsaBuf);
-
- DWORD placeholder = flag;
-
- if(0 != WSARecvFrom(_handle,&iodata->Buffer,1,NULL,&placeholder,addr,addrlen,&iodata->Overlapped,NULL))
- {
- DWORD lastError = WSAGetLastError();
-
- if(WSA_IO_PENDING != lastError)
- {
-
- scope_error_info errorCode;
-
- LEMON_WIN32_ERROR(errorCode,lastError);
-
- Release(Service(),iodata);
- }
- }
- }
-}}
-
-#endif //#ifdef LEMON_IO_IOCP
103 io/iocp/socket.hpp
View
@@ -1,103 +0,0 @@
-/**
-*
-* @file socket
-* @brief Copyright (C) 2012 yayanyang All Rights Reserved
-* @author yayanyang
-* @version 1.0.0.0
-* @date 2012/08/31
-*/
-#ifndef LEMON_IO_IOCP_SOCKET_HPP
-#define LEMON_IO_IOCP_SOCKET_HPP
-#include <lemon/io/object.hpp>
-#include <lemon/io/iocp/io_service.hpp>
-
-#ifdef LEMON_IO_IOCP
-
-#include <Mswsock.h>
-
-#pragma comment(lib,"Mswsock.lib")
-
-namespace lemon{namespace io{
-
- class Socket : public ObjectT<Socket,IOService>
- {
- public:
-
- friend void __AcceptCallback(void *userData,size_t numberOfBytesTransferred,const LemonErrorInfo * errorCode);
-
- typedef ObjectT<Socket,IOService> BaseType;
-
- Socket(int af, int type, int protocol,SOCKET handle,IOService * service)
- :BaseType(service)
- ,_handle(handle)
- ,_af(af)
- ,_type(type)
- ,_protocol(protocol)
- {}
-
- Socket(int af, int type, int protocol,IOService * service);
-
- ~Socket();
-
- operator const SOCKET () const { return _handle; }
-
- operator SOCKET () { return _handle; }
-
- SOCKET handle() { return _handle; }
-
- public:
-
- void Bind(const sockaddr * name ,socklen_t length);
-
- void Shutdown(int how);
-
- void SockName(sockaddr * name, socklen_t* bufferSize);
-
- size_t Send(const byte_t * buffer, size_t length, int flag);
-
- void Send(const byte_t * buffer, size_t length, int flag , LemonIOCallback callback, void * userdata);
-
- size_t SendTo(const byte_t * buffer, size_t length, int flag,const sockaddr * addr, socklen_t addrlen);
-
- void SendTo(const byte_t * buffer, size_t length, int flag ,const sockaddr * addr, socklen_t addrlen, LemonIOCallback callback, void * userdata);
-
- size_t Recieve(byte_t * buffer, size_t length, int flag);
-
- void Recieve(byte_t * buffer, size_t length, int flag , LemonIOCallback callback, void * userdata);
-
- size_t RecieveFrom(byte_t * buffer, size_t length, int flag,sockaddr * addr,socklen_t * addrlen);
-
- void RecieveFrom(byte_t * buffer, size_t length, int flag , sockaddr * addr,socklen_t * addrlen, LemonIOCallback callback, void * userdata);
-
- void Connect(const sockaddr * addr, socklen_t addrlen);
-
- void Connect(const sockaddr * addr, socklen_t addrlen, LemonIOCallback callback, void * userdata);
-
- void Listen(int backlog);
-
- Socket * Accept(sockaddr * addr,socklen_t * addrlen);
-
- void Accept(sockaddr * addr,socklen_t * addrlen,LemonAcceptCallback callback, void * userdata);
-
- private:
-
- SOCKET _handle;
-
- LPFN_ACCEPTEX _acceptEx;
-
- LPFN_CONNECTEX _connectEx;
-
- LPFN_GETACCEPTEXSOCKADDRS _getAcceptExSockaddrs;
-
- int _af;
-
- int _type;
-
- int _protocol;
-
- };
-}}
-
-#endif //#ifdef LEMON_IO_IOCP
-
-#endif //LEMON_IO_IOCP_SOCKET_HPP
55 io/object.hpp
View
@@ -4,65 +4,38 @@
* @brief Copyright (C) 2012 yayanyang All Rights Reserved
* @author yayanyang
* @version 1.0.0.0
-* @date 2012/08/30
+* @date 2012/09/12
*/
#ifndef LEMON_IO_OBJECT_HPP
#define LEMON_IO_OBJECT_HPP
-#include <cassert>
#include <lemon/io/abi.h>
#include <lemonxx/sys/sys.hpp>
#include <lemonxx/utility/utility.hpp>
-namespace lemon{namespace io{
+namespace lemon{namespace io{namespace core{
- struct Object : private lemon::nocopyable
- {
- virtual ~Object() {}
- };
+ class io_service;
- template<class Impl, class IOService>
- class ObjectT : public Object
+ class io_object : private lemon::nocopyable
{
- public:
+ protected:
- ObjectT(IOService * service):_service(service) {}
-
- virtual ~ObjectT() {}
-
- IOService * Service() { return _service; }
-
- static void * operator new (std::size_t size, IOService * service)
- {
- assert(size == sizeof(Impl));
+ io_object(io_service * service):_ioService(service) {}
+
+ io_object(){}
- byte_t * data = (byte_t*)service->AllocObj(size + sizeof(service));
-
- memcpy(data,&service,sizeof(service));
-
- return data + sizeof(service);
- }
-
- static void operator delete (void * p , IOService * service) throw()
- {
- service->FreeObj((byte_t*)p - sizeof(service),sizeof(Impl));
- }
-
- static void operator delete (void * p , std::size_t size) throw()
- {
- IOService * service;
+ public:
- byte_t * block = (byte_t*)p - sizeof(service);
+ virtual void release() = 0;
- memcpy(&service, block ,sizeof(service));
+ io_service* service() { return _ioService; }
- service->FreeObj(block,size + sizeof(service));
- }
+ const io_service* service() const { return _ioService; }
private:
- IOService *_service;
+ io_service *_ioService;
};
-
-}}
+}}}
#endif //LEMON_IO_OBJECT_HPP
130 io/posix/socket.cpp
View
@@ -1,130 +0,0 @@
-#include <lemon/io/posix/socket.hpp>
-
-#ifndef LEMON_IO_IOCP
-
-#include <errno.h>
-#include <unistd.h>
-
-#define SOCKET_ERROR -1
-
-namespace lemon{namespace io{
-
- Socket::Socket(int af, int type, int protocol,int handle,IOService * service)
- :BaseType(service),_handle(handle),_af(af),_type(type),_protocol(protocol)
- {}
-
- Socket::Socket(int af, int type, int protocol,IOService * service)
- :BaseType(service),_af(af),_type(type),_protocol(protocol)
- {
- _handle = socket( af, type , protocol );
-
- if(-1 == _handle)
- {
- scope_error_info errorCode;
-
- LEMON_POSIX_ERROR(errorCode,errno);
- }
-
- service->Bind(_handle);
- }
-
- Socket::~Socket()
- {
- ::close(_handle);
- }
-
- void Socket::Bind(const sockaddr * name ,socklen_t length)
- {
- if(SOCKET_ERROR == ::bind(_handle,name,length))
- {
- scope_error_info errorCode;
-
- LEMON_POSIX_ERROR(*errorCode,errno);
- }
- }
-
- void Socket::Shutdown(int how)
- {
- if(SOCKET_ERROR == ::shutdown(_handle,how))
- {
- scope_error_info errorCode;
-
- LEMON_POSIX_ERROR(*errorCode,errno);
- }
- }
-
- void Socket::SockName(sockaddr * name, socklen_t* bufferSize)
- {
- if(SOCKET_ERROR == ::getsockname(_handle,name,bufferSize))
- {
- scope_error_info errorCode;
-
- LEMON_POSIX_ERROR(*errorCode,errno);
- }
- }
-
- size_t Socket::Send(const byte_t * buffer, size_t length, int flag)
- {
-
- size_t sendSize = ::send(_handle,(const char*)buffer,(int)length,flag);
-
- if(SOCKET_ERROR == sendSize)
- {
- scope_error_info errorCode;
-
- LEMON_POSIX_ERROR(*errorCode,errno);
- }
-
- return sendSize;
- }
-
- size_t Socket::Recieve(byte_t * buffer, size_t length, int flag)
- {
- size_t sendSize = ::recv(_handle,(char*)buffer,(int)length,flag);
-
- if(SOCKET_ERROR == sendSize)
- {
- scope_error_info errorCode;
-
- LEMON_POSIX_ERROR(*errorCode,errno);
- }
-
- return sendSize;
- }
-
- void Socket::Connect(const sockaddr * addr, socklen_t addrlen)
- {
- if(SOCKET_ERROR == ::connect(_handle,addr,addrlen))
- {
- scope_error_info errorCode;
-
- LEMON_POSIX_ERROR(*errorCode,errno);
- }
- }
-
- void Socket::Listen(int backlog)
- {
- if(SOCKET_ERROR == ::listen(_handle,backlog))
- {
- scope_error_info errorCode;
-
- LEMON_POSIX_ERROR(*errorCode,errno);
- }
- }
-
- Socket * Socket::Accept(sockaddr * addr,socklen_t * addrlen)
- {
- int handle = ::accept(_handle,addr,addrlen);
-
- if(-1 == handle)
- {
- scope_error_info errorCode;
-
- LEMON_POSIX_ERROR(*errorCode,errno);
- }
-
- return new(Service()) Socket(_af,_type,_protocol,handle,Service());
- }
-}}
-
-#endif //#ifndef LEMON_IO_IOCP
90 io/posix/socket.hpp
View
@@ -1,90 +0,0 @@
-/**
-*
-* @file socket
-* @brief Copyright (C) 2012 yayanyang All Rights Reserved
-* @author yayanyang
-* @version 1.0.0.0
-* @date 2012/08/31
-*/
-#ifndef LEMON_IO_POSIX_SOCKET_HPP
-#define LEMON_IO_POSIX_SOCKET_HPP
-#include <lemon/io/object.hpp>
-
-#ifndef LEMON_IO_IOCP
-
-#ifdef LEMON_IO_EPOLL
-#include <lemon/io/epoll/epoll.hpp>
-#else
-#endif //LEMON_IO_EPOLL
-
-namespace lemon{namespace io{
-
- class Socket : public ObjectT<Socket,IOService>
- {
- public:
- typedef ObjectT<Socket,IOService> BaseType;
-
- Socket(int af, int type, int protocol,int handle,IOService * service);
-
- Socket(int af, int type, int protocol,IOService * service);
-
- ~Socket();
-
- operator const int () const { return _handle; }
-
- operator int () { return _handle; }
-
- int handle() { return _handle; }
-
- public:
-
- void Bind(const sockaddr * name ,socklen_t length);
-
- void Shutdown(int how);
-
- void SockName(sockaddr * name, socklen_t* bufferSize);
-
- size_t Send(const byte_t * buffer, size_t length, int flag);
-
- void Send(const byte_t * buffer, size_t length, int flag , LemonIOCallback callback, void * userdata);
-
- size_t SendTo(const byte_t * buffer, size_t length, int flag,const sockaddr * addr, socklen_t addrlen);
-
- void SendTo(const byte_t * buffer, size_t length, int flag ,const sockaddr * addr, socklen_t addrlen, LemonIOCallback callback, void * userdata);
-
- size_t Recieve(byte_t * buffer, size_t length, int flag);
-
- void Recieve(byte_t * buffer, size_t length, int flag , LemonIOCallback callback, void * userdata);
-
- size_t RecieveFrom(byte_t * buffer, size_t length, int flag,sockaddr * addr,socklen_t * addrlen);
-
- void RecieveFrom(byte_t * buffer, size_t length, int flag , sockaddr * addr,socklen_t * addrlen, LemonIOCallback callback, void * userdata);
-
- void Connect(const sockaddr * addr, socklen_t addrlen);
-
- void Connect(const sockaddr * addr, socklen_t addrlen, LemonIOCallback callback, void * userdata);
-
- void Listen(int backlog);
-
- Socket * Accept(sockaddr * addr,socklen_t * addrlen);
-
- void Accept(sockaddr * addr,socklen_t * addrlen,LemonAcceptCallback callback, void * userdata);
-
- private:
-
- int _handle;
-
- int _af;
-
- int _type;
-
- int _protocol;
-
- };
-
-}}
-
-#endif //#ifndef LEMON_IO_IOCP
-
-#endif //LEMON_IO_POSIX_SOCKET_HPP
-
153 io/socket.cpp
View
@@ -0,0 +1,153 @@
+#include <lemon/io/socket.hpp>
+#ifdef LEMON_IO_IOCP
+#include <lemon/io/io_service_iocp.hpp>
+#else
+#endif //LEMON_IO_IOCP
+
+
+namespace lemon{namespace io{namespace core{
+
+ socket_base::socket_base(int af, int type, int protocol,LemonNativeSock sock, io_service * service)
+ :io_object(service),_handle(sock),_af(af),_type(type),_protocol(protocol)
+ {
+
+ }
+
+ socket_base::~socket_base()
+ {
+#ifdef WIN32
+ ::closesocket(_handle);
+#else
+ ::close(_handle);
+#endif //WIN32
+ }
+
+ void socket_base::release()
+ {
+ this->~socket_base();
+
+ service()->close_socket(this);
+ }
+
+ void socket_base::bind(const sockaddr * name ,socklen_t length)
+ {
+ if(SOCKET_ERROR == ::bind(_handle,name,length))
+ {
+ scope_error_info errorCode;
+
+ LEMON_SOCKET_ERROR(errorCode);
+ }
+ }
+
+ void socket_base::shutdown(int how)
+ {
+ if(SOCKET_ERROR == ::shutdown(_handle,how))
+ {
+ scope_error_info errorCode;
+
+ LEMON_SOCKET_ERROR(errorCode);
+ }
+ }
+
+ void socket_base::sockname(sockaddr * name, socklen_t* bufferSize)
+ {
+ if(SOCKET_ERROR == ::getsockname(_handle,name,bufferSize))
+ {
+ scope_error_info errorCode;
+
+ LEMON_SOCKET_ERROR(errorCode);
+ }
+ }
+
+ size_t socket_base::send(const byte_t * buffer, size_t length, int flag)
+ {
+ size_t sendSize = ::send(_handle,(const char*)buffer,(int)length,flag);
+
+ if(SOCKET_ERROR == sendSize)
+ {
+ scope_error_info errorCode;
+
+ LEMON_SOCKET_ERROR(errorCode);
+ }
+
+ return sendSize;
+ }
+
+ size_t socket_base::sendto(const byte_t * buffer, size_t length, int flag,const sockaddr * addr, socklen_t addrlen)
+ {
+ size_t sendSize = ::sendto(_handle,(const char*)buffer,(int)length,flag,addr,addrlen);
+
+ if(SOCKET_ERROR == sendSize)
+ {
+ scope_error_info errorCode;
+
+ LEMON_SOCKET_ERROR(errorCode);
+ }
+
+ return sendSize;
+ }
+
+ size_t socket_base::receive(byte_t * buffer, size_t length, int flag)
+ {
+ size_t sendSize = ::recv(_handle,(char*)buffer,(int)length,flag);
+
+ if(SOCKET_ERROR == sendSize)
+ {
+ scope_error_info errorCode;
+
+ LEMON_SOCKET_ERROR(errorCode);
+ }
+
+ return sendSize;
+ }
+
+ size_t socket_base::recvfrom(byte_t * buffer, size_t length, int flag,sockaddr * addr,socklen_t * addrlen)
+ {
+ size_t sendSize = ::recvfrom(_handle,(char*)buffer,(int)length,flag,addr,addrlen);
+
+ if(SOCKET_ERROR == sendSize)
+ {
+ scope_error_info errorCode;
+
+ LEMON_SOCKET_ERROR(errorCode);
+ }
+
+ return sendSize;
+ }
+
+ void socket_base::connect(const sockaddr * addr, socklen_t addrlen)
+ {
+ if(SOCKET_ERROR == ::connect(_handle,addr,addrlen))
+ {
+ scope_error_info errorCode;
+
+ LEMON_SOCKET_ERROR(errorCode);
+ }
+ }
+
+ void socket_base::listen(int backlog)
+ {
+ if(SOCKET_ERROR == ::listen(_handle,backlog))
+ {
+ scope_error_info errorCode;
+
+ LEMON_SOCKET_ERROR(errorCode);
+ }
+ }
+
+ socket* socket_base::accept(sockaddr * addr,socklen_t * addrlen)
+ {
+ LemonNativeSock handle = ::accept(_handle,addr,addrlen);
+
+ if(INVALID_SOCKET == handle)
+ {
+ scope_error_info errorCode;
+
+ LEMON_SOCKET_ERROR(errorCode);
+ }
+
+ return service()->create_socket(_af,_type,_protocol,handle);
+ }
+
+
+}}}
82 io/socket.hpp
View
@@ -0,0 +1,82 @@
+/**
+*
+* @file socket
+* @brief Copyright (C) 2012 yayanyang All Rights Reserved
+* @author yayanyang
+* @version 1.0.0.0
+* @date 2012/09/12
+*/
+#ifndef LEMON_IO_SOCKET_HPP
+#define LEMON_IO_SOCKET_HPP
+#include <lemon/io/object.hpp>
+
+#ifndef WIN32
+#define SOCKET_ERROR -1
+#define LEMON_SOCKET_ERROR(errorCode) LEMON_POSIX_ERROR(errorCode,errno)
+#else
+#define LEMON_SOCKET_ERROR(errorCode) LEMON_WIN32_ERROR(errorCode,WSAGetLastError())
+#endif //WIN32
+
+namespace lemon{namespace io{namespace core{
+
+ class socket;
+
+ class socket_base : public io_object
+ {
+ public:
+
+ socket_base(int af, int type, int protocol,LemonNativeSock sock, io_service * service);
+
+ virtual ~socket_base();
+
+ LemonNativeSock& handle() { return _handle; }
+
+ void handle(LemonNativeSock val) { _handle = val; }
+
+ int af() {return _af; }
+
+ int type() {return _type; }
+
+ int protocol() {return _protocol; }
+
+ public:
+
+ void release();
+
+ public:
+
+ void bind(const sockaddr * name ,socklen_t length);
+
+ void shutdown(int how);
+
+ void sockname(sockaddr * name, socklen_t* bufferSize);
+
+ size_t send(const byte_t * buffer, size_t length, int flag);
+
+ size_t sendto(const byte_t * buffer, size_t length, int flag,const sockaddr * addr, socklen_t addrlen);
+
+ size_t receive(byte_t * buffer, size_t length, int flag);
+
+ size_t recvfrom(byte_t * buffer, size_t length, int flag,sockaddr * addr,socklen_t * addrlen);
+
+ void connect(const sockaddr * addr, socklen_t addrlen);
+
+ socket* accept(sockaddr * addr,socklen_t * addrlen);
+
+ void listen(int backlog);
+
+ private:
+
+ LemonNativeSock _handle;
+
+ int _af;
+
+ int _type;
+
+ int _protocol;
+
+ };
+
+}}}
+
+#endif //LEMON_IO_SOCKET_HPP
224 io/socket_iocp.cpp
View
@@ -0,0 +1,224 @@
+#include <lemon/io/socket_iocp.hpp>
+
+#ifdef LEMON_IO_IOCP
+#include <lemon/io/io_service_iocp.hpp>
+
+const static GUID GuidAcceptEx = WSAID_ACCEPTEX;
+
+const static GUID GuidConnectionEx = WSAID_CONNECTEX;
+
+const static GUID GuidGetAcceptExSockaddrs = WSAID_GETACCEPTEXSOCKADDRS;
+
+namespace lemon{namespace io{namespace core{
+
+ template<class LPFN>
+ LPFN LoadWinSocketExtension(SOCKET socket,const GUID *guid)
+ {
+ DWORD dwBytes;
+
+ LPFN fn;
+
+ if(SOCKET_ERROR == WSAIoctl(socket,
+ SIO_GET_EXTENSION_FUNCTION_POINTER,
+ (LPVOID)guid,
+ sizeof(GUID),
+ &fn,
+ sizeof(fn),
+ &dwBytes,
+ NULL,
+ NULL))
+ {
+ scope_error_info errorCode;
+
+ LEMON_WIN32_ERROR(errorCode,GetLastError());
+ }
+
+ return fn;
+ }
+
+ socket::socket(int af, int type, int protocol,io_service * service)
+ :socket_base(af,type,protocol,WSASocket(af,type,protocol,NULL,0,WSA_FLAG_OVERLAPPED),service)
+ {
+ if(INVALID_SOCKET == handle())
+ {
+ scope_error_info errorCode;
+
+ LEMON_WIN32_ERROR(errorCode,GetLastError());
+ }
+
+ try
+ {
+ // close SIO_UDP_CONNRESET
+ if(SOCK_DGRAM == type)
+ {
+ BOOL bNewBehavior = FALSE;
+
+ DWORD dwBytesReturned = 0;
+
+ DWORD status = WSAIoctl(handle(),SIO_UDP_CONNRESET,&bNewBehavior,sizeof(bNewBehavior),NULL, 0, &dwBytesReturned,NULL,NULL);
+
+ if(SOCKET_ERROR == status)
+ {
+ scope_error_info errorCode;
+
+ LEMON_WIN32_ERROR(errorCode,GetLastError());
+ }
+ }
+
+ _acceptEx = LoadWinSocketExtension<LPFN_ACCEPTEX>(handle(),&GuidAcceptEx);
+
+ _connectEx = LoadWinSocketExtension<LPFN_CONNECTEX>(handle(),&GuidConnectionEx);
+
+ _getAcceptExSockaddrs = LoadWinSocketExtension<LPFN_GETACCEPTEXSOCKADDRS>(handle(),&GuidGetAcceptExSockaddrs);
+
+ service->bind((HANDLE)handle(),LEMON_IOCP_IO);
+ }
+ catch(...)
+ {
+ ::closesocket(handle());
+
+ throw;//re throw the exception
+ }
+ }
+
+ socket::socket(int af, int type, int protocol,LemonNativeSock sock,io_service * service)
+ :socket_base(af,type,protocol,sock,service)
+ {
+
+ }
+
+