Permalink
Browse files

make libtask support epoll

  • Loading branch information...
kulv2012 committed Jul 1, 2014
1 parent 7fe9ca7 commit 6674d2a5e4393d3c4d5543b7fc96fd782962a956
Showing with 98 additions and 70 deletions.
  1. +74 −46 fd.c
  2. +5 −5 httpload.c
  3. +4 −4 net.c
  4. +7 −7 tags
  5. +5 −5 task.h
  6. +3 −3 tcpproxy.c
View
120 fd.c
@@ -1,20 +1,35 @@
#include "taskimpl.h"
#include <sys/poll.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/epoll.h>
#include <fcntl.h>
#include <errno.h>
#include <string.h>
enum
{
MAXFD = 1024
};
static struct pollfd pollfd[MAXFD];//局部静态的poll数组
static Task *polltask[MAXFD];
static int npollfd;
static struct epoll_event epoll_recv_events[MAXFD] ;
static int g_epollfd ;
static int startedfdtask;
static Tasklist sleeping;
static int sleepingcounted;
static uvlong nsec(void);
void prepare_fdtask(){
g_epollfd = epoll_create(2);//Since Linux 2.6.8, the size argument is ignored, but must be greater 0
if(g_epollfd < 0){
printf("epoll_create failed. errno:%d, errmsg:%s.\n", errno, strerror(errno));
exit(errno);
}
taskcreate(fdtask, 0, 32768);//这个是IO等待poll的线程,所有阻塞IO都走这里进行监听,唤醒等
}
void
fdtask(void *v)
{
@@ -44,23 +59,17 @@ fdtask(void *v)
else
ms = 5000;
}
if(poll(pollfd, npollfd, ms) < 0){
if(errno == EINTR)
continue;
fprint(2, "poll: %s\n", strerror(errno));
int retval = epoll_wait( g_epollfd, epoll_recv_events, MAXFD, ms) ;
if( retval > 0){
for( i=0; i < retval; i++){
taskready( epoll_recv_events[i].data.ptr) ;//变为可执行状态
}
} else if( retval == EINTR){
continue ;
} else{
taskexitall(0);
}
/* wake up the guys who deserve it */
for(i=0; i<npollfd; i++){
while(i < npollfd && pollfd[i].revents){
taskready(polltask[i]);//将这些有情况的协程设置为可运行状态,这样这个for下一轮的时候就会调用taskyield主动让出CPU
--npollfd;
pollfd[i] = pollfd[npollfd];
polltask[i] = polltask[npollfd];
}
}
now = nsec();
while((t=sleeping.head) && now >= t->alarmtime){
deltask(&sleeping, t);//看看定时器有没有到时间的
@@ -80,7 +89,7 @@ taskdelay(uint ms)
if(!startedfdtask){
startedfdtask = 1;
taskcreate(fdtask, 0, 32768);
prepare_fdtask();
}
now = nsec();
@@ -115,39 +124,58 @@ taskdelay(uint ms)
}
void
fdwait(int fd, int rw)
fdwait(int *fd, int rw)
{//按需启动fdtask这个异步I/O控制协程,将当前FD加入到poll数组中。进行协程切换。
int bits;
int addedmask = 0;
int oldmask = 0;
struct epoll_event ee;
if(!startedfdtask){
startedfdtask = 1;
taskcreate(fdtask, 0, 32768);//这个是IO等待poll的线程,所有阻塞IO都走这里进行监听,唤醒等
prepare_fdtask();
}
if(npollfd >= MAXFD){
fprint(2, "too many poll file descriptors\n");
abort();
}
taskstate("fdwait for %s", rw=='r' ? "read" : rw=='w' ? "write" : "error");
bits = 0;
oldmask |= (0x80000000&*fd) != 0 ? EPOLLIN : 0 ;
oldmask |= (0x40000000&*fd) != 0 ? EPOLLOUT : 0 ;
addedmask = 0;
switch(rw){
case 'r':
bits |= POLLIN;
addedmask = EPOLLIN ;
break;
case 'w':
bits |= POLLOUT;
addedmask = EPOLLOUT ;
break;
}
//将这个FD挂入到pollfd里面,这里面是由fdtask协程进行等待唤醒等管理的、
ee.data.u64 = 0; /* avoid valgrind warning */
//将这个FD挂入到epoll里面,这里面是由fdtask协程进行等待唤醒等管理的、
//等这个FD有事件的时候,会将本协程设置为可运行的状态,并且fdtask也会主动yeild让出CPU。
polltask[npollfd] = taskrunning;
pollfd[npollfd].fd = fd;
pollfd[npollfd].events = bits;
pollfd[npollfd].revents = 0;
npollfd++;
if( (addedmask | oldmask) != oldmask ){//add it if need
ee.events = oldmask|addedmask ;
ee.data.ptr = taskrunning;
int op = oldmask == 0 ? EPOLL_CTL_ADD : EPOLL_CTL_MOD;
if (epoll_ctl(g_epollfd, op, 0x3FFFFFFF&*fd , &ee) == -1){
printf("epoll_ctl pre failed. errno:%d, errmsg:%s, state(%s)\n", errno, strerror(errno), taskgetstate());
exit(errno);
}
}
taskswitch();//注意这里并没有修改这个协程的运行状态,这样他下次还可能跑起来
if( (addedmask | oldmask) != oldmask ){ //说明刚才我增加过,那么这里需要从当前状态中,去掉刚刚加入的。 这里如果另外的协程加入了新的事件,就会出现.
//最好是代码确认读取完成后,显示删除
oldmask |= (0x80000000&*fd) != 0 ? EPOLLIN : 0 ;
oldmask |= (0x40000000&*fd) != 0 ? EPOLLOUT : 0 ;
ee.events = oldmask & (~ addedmask ) ;
//int op = oldmask == addedmask ? EPOLL_CTL_DEL : EPOLL_CTL_MOD ;
int op = ee.events == 0 ? EPOLL_CTL_DEL : EPOLL_CTL_MOD ;
if (epoll_ctl(g_epollfd, op, 0x3FFFFFFF&*fd, &ee) == -1){
printf("epoll_ctl post failed. errno:%d, errmsg:%s, state(%s)\n", errno, strerror(errno), taskgetstate());
exit(errno);
}
if( addedmask == EPOLLIN) *fd = 0x7FFFFFFF&*fd ;
if( addedmask == EPOLLOUT) *fd = 0xBFFFFFFF&*fd ;
}
/*
不过这里多唤醒一次,当前协程也就是再次尝试I/O,基本还是会EAGAIN, 然后又调用fdwait,又睡下去。这样不会有bug,但会浪费CPU?
@@ -159,34 +187,34 @@ PS: 后来想了想,这个会的,因为当前协程在taskscheduler里面调
/* Like fdread but always calls fdwait before reading. */
int
fdread1(int fd, void *buf, int n)
fdread1(int* pfd, void *buf, int n)
{
int m;
do
fdwait(fd, 'r');
while((m = read(fd, buf, n)) < 0 && errno == EAGAIN);
fdwait(pfd, 'r');
while((m = read( 0x3FFFFFFF&*pfd , buf, n)) < 0 && errno == EAGAIN);
return m;
}
int
fdread(int fd, void *buf, int n)
fdread(int* pfd, void *buf, int n)
{
int m;
while((m=read(fd, buf, n)) < 0 && errno == EAGAIN)
fdwait(fd, 'r');
while((m=read( 0x3FFFFFFF&*pfd , buf, n)) < 0 && errno == EAGAIN)
fdwait(pfd, 'r');
return m;
}
int
fdwrite(int fd, void *buf, int n)
fdwrite(int* pfd, void *buf, int n)
{
int m, tot;
for(tot=0; tot<n; tot+=m){
while((m=write(fd, (char*)buf+tot, n-tot)) < 0 && errno == EAGAIN)
fdwait(fd, 'w');//关键:如果写入时返回EAGAIN说明差不多了,得过会才能写入。那么这里需要放入epoll,把本协程挂起
while((m=write( 0x3FFFFFFF&*pfd , (char*)buf+tot, n-tot)) < 0 && errno == EAGAIN)
fdwait(pfd, 'w');//关键:如果写入时返回EAGAIN说明差不多了,得过会才能写入。那么这里需要放入epoll,把本协程挂起
if(m < 0)
return m;
if(m == 0)
View
@@ -36,15 +36,15 @@ void fetchtask(void *v) {
fprintf(stderr, "starting...\n");
for(;;){
if((fd = netdial(TCP, server, 80)) < 0){//异步连接服务器,会造成协程切换
if((fd = netdial(TCP, server, 8001)) < 0){//异步连接服务器,会造成协程切换
fprintf(stderr, "dial %s: %s (%s)\n", server, strerror(errno), taskgetstate());
continue;
}
snprintf(buf, sizeof buf, "GET %s HTTP/1.0\r\nHost: %s\r\n\r\n", url, server);
fdwrite(fd, buf, strlen(buf));//异步数据读写,这里可能会造成协程切换,因为一定有阻塞操作
while((n = fdread(fd, buf, sizeof buf)) > 0){///异步读取
//buf[n] = '\0';
//printf("buf:%s", buf);
fdwrite(&fd, buf, strlen(buf));//异步数据读写,这里可能会造成协程切换,因为一定有阻塞操作
while((n = fdread(&fd, buf, sizeof buf)) > 0){///异步读取
buf[n] = '\0';
printf("buf:%s", buf);
}
close(fd);
write(1, ".", 1);
View
8 net.c
@@ -52,18 +52,18 @@ netannounce(int istcp, char *server, int port)
}
int
netaccept(int fd, char *server, int *port)
netaccept(int* pfd, char *server, int *port)
{
int cfd, one;
struct sockaddr_in sa;
uchar *ip;
socklen_t len;
fdwait(fd, 'r');//等待可读事件触发,否则调度别人运行
fdwait(pfd, 'r');//等待可读事件触发,否则调度别人运行
taskstate("netaccept");
len = sizeof sa;
if((cfd = accept(fd, (void*)&sa, &len)) < 0){
if((cfd = accept( 0x3FFFFFFF&*pfd, (void*)&sa, &len)) < 0){
taskstate("accept failed");
return -1;
}
@@ -183,7 +183,7 @@ netdial(int istcp, char *server, int port)
}
/* wait for finish */
fdwait(fd, 'w');//等待处理完成,并且会释放CPU的。
fdwait(&fd, 'w');//等待处理完成,并且会释放CPU的。
sn = sizeof sa;
if(getpeername(fd, (struct sockaddr*)&sa, &sn) >= 0){
taskstate("connect succeeded");
View
14 tags
@@ -149,20 +149,22 @@ delaytask testdelay.c /^delaytask(void *v)$/;" f
deltask task.c /^deltask(Tasklist *l, Task *t)$/;" f
elemsize task.h /^ unsigned int elemsize;$/;" m struct:Channel
emalloc primes.c /^emalloc(unsigned long n)$/;" f
epoll_recv_events fd.c /^static struct epoll_event epoll_recv_events[MAXFD] ;$/;" v typeref:struct:epoll_event file:
exiting taskimpl.h /^ int exiting;$/;" m struct:Task
f1 test/context.c /^static void f1 (void) { $/;" f file:
f2 test/context.c /^static void f2 (void) { $/;" f file:
fdnoblock fd.c /^fdnoblock(int fd)$/;" f
fdread fd.c /^fdread(int fd, void *buf, int n)$/;" f
fdread1 fd.c /^fdread1(int fd, void *buf, int n)$/;" f
fdread fd.c /^fdread(int* pfd, void *buf, int n)$/;" f
fdread1 fd.c /^fdread1(int* pfd, void *buf, int n)$/;" f
fdtask fd.c /^fdtask(void *v)$/;" f
fdwait fd.c /^fdwait(int fd, int rw)$/;" f
fdwrite fd.c /^fdwrite(int fd, void *buf, int n)$/;" f
fdwait fd.c /^fdwait(int *fd, int rw)$/;" f
fdwrite fd.c /^fdwrite(int* pfd, void *buf, int n)$/;" f
fetchtask httpload.c /^void fetchtask(void *v) {$/;" f
fprint print.c /^fprint(int fd, char *fmt, ...)$/;" f
fprint taskimpl.h 63;" d
func1 test/swapcontext.c /^func1(void)$/;" f file:
func2 test/swapcontext.c /^func2(void)$/;" f file:
g_epollfd fd.c /^static int g_epollfd ;$/;" v file:
getcontext 386-ucontext.h 2;" d
getcontext amd64-ucontext.h 2;" d
getcontext power-ucontext.h 2;" d
@@ -270,17 +272,15 @@ netdial net.c /^netdial(int istcp, char *server, int port)$/;" f
netlookup net.c /^netlookup(char *name, uint32_t *ip)$/;" f
next taskimpl.h /^ Task *next;\/\/协程的双向链表$/;" m struct:Task
nil taskimpl.h 45;" d
npollfd fd.c /^static int npollfd;$/;" v file:
nsec fd.c /^nsec(void)$/;" f file:
off task.h /^ unsigned int off;$/;" m struct:Channel
op task.h /^ unsigned int op;$/;" m struct:Alt
otherop channel.c 56;" d file:
owner task.h /^ Task *owner;$/;" m struct:QLock
parseip net.c /^parseip(char *name, uint32_t *ip)$/;" f file:
pc power-ucontext.h /^ ulong pc; \/* lr *\/$/;" m struct:mcontext
pollfd fd.c /^static struct pollfd pollfd[MAXFD];\/\/局部静态的poll数组$/;" v typeref:struct:pollfd file:
polltask fd.c /^static Task *polltask[MAXFD];$/;" v file:
port tcpproxy.c /^int port;$/;" v
prepare_fdtask fd.c /^void prepare_fdtask(){$/;" f
prev taskimpl.h /^ Task *prev;$/;" m struct:Task
primetask primes.c /^primetask(void *arg)$/;" f
print print.c /^print(char *fmt, ...)$/;" f
View
10 task.h
@@ -152,10 +152,10 @@ int chansendul(Channel *c, unsigned long v);
/*
* Threaded I/O.
*/
int fdread(int, void*, int);
int fdread1(int, void*, int); /* always uses fdwait */
int fdwrite(int, void*, int);
void fdwait(int, int);
int fdread(int* , void*, int);
int fdread1(int* , void*, int); /* always uses fdwait */
int fdwrite(int* , void*, int);
void fdwait(int*, int);
int fdnoblock(int);
void fdtask(void*);
@@ -170,7 +170,7 @@ enum
};
int netannounce(int, char*, int);
int netaccept(int, char*, int*);
int netaccept(int*, char*, int*);
int netdial(int, char*, int);
int netlookup(char*, uint32_t*); /* blocks entire program! */
int netdial(int, char*, int);
View
@@ -50,7 +50,7 @@ taskmain(int argc, char **argv)
taskexitall(1);
}
fdnoblock(fd);
while((cfd = netaccept(fd, remote, &rport)) >= 0){
while((cfd = netaccept(&fd, remote, &rport)) >= 0){
fprintf(stderr, "connection from %s:%d\n", remote, rport);
taskcreate(proxytask, (void*)cfd, STACK);
}
@@ -84,8 +84,8 @@ rwtask(void *v)
wfd = a[1];
free(a);
while((n = fdread(rfd, buf, sizeof buf)) > 0)
fdwrite(wfd, buf, n);
while((n = fdread(&rfd, buf, sizeof buf)) > 0)
fdwrite(&wfd, buf, n);
shutdown(wfd, SHUT_WR);
close(rfd);
}

0 comments on commit 6674d2a

Please sign in to comment.