Skip to content

Commit

Permalink
style
Browse files Browse the repository at this point in the history
  • Loading branch information
kr committed Apr 14, 2012
1 parent 3ff79f8 commit 557e4d8
Show file tree
Hide file tree
Showing 9 changed files with 136 additions and 149 deletions.
43 changes: 19 additions & 24 deletions conn.c
Original file line number Diff line number Diff line change
Expand Up @@ -27,19 +27,19 @@ on_ignore(ms a, tube t, size_t i)
tube_dref(t);
}

conn
Conn *
make_conn(int fd, char start_state, tube use, tube watch)
{
job j;
conn c;
Conn *c;

c = new(struct conn);
if (!c) return twarn("OOM"), (conn) 0;
c = new(Conn);
if (!c) return twarn("OOM"), NULL;

ms_init(&c->watch, (ms_event_fn) on_watch, (ms_event_fn) on_ignore);
if (!ms_append(&c->watch, watch)) {
free(c);
return twarn("OOM"), (conn) 0;
return twarn("OOM"), NULL;
}

TUBE_ASSIGN(c->use, use);
Expand All @@ -60,15 +60,15 @@ make_conn(int fd, char start_state, tube use, tube watch)
}

void
conn_set_producer(conn c)
connsetproducer(Conn *c)
{
if (c->type & CONN_TYPE_PRODUCER) return;
c->type |= CONN_TYPE_PRODUCER;
cur_producer_ct++; /* stats */
}

void
conn_set_worker(conn c)
connsetworker(Conn *c)
{
if (c->type & CONN_TYPE_WORKER) return;
c->type |= CONN_TYPE_WORKER;
Expand Down Expand Up @@ -100,14 +100,14 @@ count_cur_workers()
}

static int
has_reserved_job(conn c)
has_reserved_job(Conn *c)
{
return job_list_any_p(&c->reserved_jobs);
}


static int64
conntickat(conn c)
conntickat(Conn *c)
{
int margin = 0, should_timeout = 0;
int64 t = INT64_MAX;
Expand All @@ -117,7 +117,7 @@ conntickat(conn c)
}

if (has_reserved_job(c)) {
t = soonest_job(c)->r.deadline_at - nanoseconds() - margin;
t = connsoonestjob(c)->r.deadline_at - nanoseconds() - margin;
should_timeout = 1;
}
if (c->pending_timeout >= 0) {
Expand All @@ -133,15 +133,15 @@ conntickat(conn c)


void
connwant(conn c, int rw)
connwant(Conn *c, int rw)
{
c->rw = rw;
connsched(c);
}


void
connsched(conn c)
connsched(Conn *c)
{
c->tickat = conntickat(c);
srvschedconn(c->srv, c);
Expand All @@ -151,7 +151,7 @@ connsched(conn c)
/* return the reserved job with the earliest deadline,
* or NULL if there's no reserved job */
job
soonest_job(conn c)
connsoonestjob(Conn *c)
{
job j = NULL;
job soonest = c->soonest_job;
Expand All @@ -165,25 +165,20 @@ soonest_job(conn c)
return soonest;
}

int
has_reserved_this_job(conn c, job j)
{
return j && j->r.state == Reserved && j->reserver == c;
}

/* return true if c has a reserved job with less than one second until its
* deadline */
int
conn_has_close_deadline(conn c)
conndeadlinesoon(Conn *c)
{
int64 t = nanoseconds();
job j = soonest_job(c);
job j = connsoonestjob(c);

return j && t >= j->r.deadline_at - SAFETY_MARGIN;
}

int
conn_ready(conn c)
conn_ready(Conn *c)
{
size_t i;

Expand All @@ -195,21 +190,21 @@ conn_ready(conn c)


int
connless(conn a, conn b)
connless(Conn *a, Conn *b)
{
return a->tickat < b->tickat;
}


void
connrec(conn c, int i)
connrec(Conn *c, int i)
{
c->tickpos = i;
}


void
conn_close(conn c)
connclose(Conn *c)
{
sockwant(&c->sock, 0);
close(c->sock.fd);
Expand Down
120 changes: 56 additions & 64 deletions dat.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ typedef uint64_t uint64;
typedef struct ms *ms;
typedef struct job *job;
typedef struct tube *tube;
typedef struct conn *conn;
typedef struct Conn Conn;
typedef struct Heap Heap;
typedef struct Jobrec Jobrec;
typedef struct File File;
Expand Down Expand Up @@ -170,47 +170,6 @@ struct tube {
struct job buried;
};

struct conn {
conn next;
Server *srv;
Socket sock;
char state;
char type;
int rw; // currently want: 'r' or 'w'
int pending_timeout;
int64 tickat; // time at which to do more work
int tickpos; // position in srv->conns

/* we cannot share this buffer with the reply line because we might read in
* command line data for a subsequent command, and we need to store it
* here. */
char cmd[LINE_BUF_SIZE]; /* this string is NOT NUL-terminated */
int cmd_len;
int cmd_read;
const char *reply;
int reply_len;
int reply_sent;
char reply_buf[LINE_BUF_SIZE]; /* this string IS NUL-terminated */

/* A job to be read from the client. */
job in_job;

/* Memoization of the soonest job */
job soonest_job;

/* How many bytes of in_job->body have been read so far. If in_job is NULL
* while in_job_read is nonzero, we are in bit bucket mode and
* in_job_read's meaning is inverted -- then it counts the bytes that
* remain to be thrown away. */
int in_job_read;

job out_job;
int out_job_sent;
tube use;
struct ms watch;
struct job reserved_jobs; /* doubly-linked list header */
};


void v(void);

Expand Down Expand Up @@ -274,30 +233,13 @@ tube tube_find_or_make(const char *name);
#define TUBE_ASSIGN(a,b) (tube_dref(a), (a) = (b), tube_iref(a))


conn make_conn(int fd, char start_state, tube use, tube watch);

int connless(conn a, conn b);
void connrec(conn c, int i);
void connwant(conn c, int rw);
void connsched(conn c);

void conn_close(conn c);
Conn *make_conn(int fd, char start_state, tube use, tube watch);

int count_cur_conns(void);
uint count_tot_conns(void);
int count_cur_producers(void);
int count_cur_workers(void);

void conn_set_producer(conn c);
void conn_set_worker(conn c);

job soonest_job(conn c);
int has_reserved_this_job(conn c, job j);
int conn_has_close_deadline(conn c);
int conn_ready(conn c);

#define conn_waiting(c) ((c)->type & CONN_TYPE_WAITING)


extern size_t primes[];

Expand All @@ -307,9 +249,9 @@ extern size_t job_data_size_limit;
void prot_init(void);
void prottick(Server *s);

conn remove_waiting_conn(conn c);
Conn *remove_waiting_conn(Conn *c);

void enqueue_reserved_jobs(conn c);
void enqueue_reserved_jobs(Conn *c);

void enter_drain_mode(int sig);
void h_accept(const int fd, const short which, Server* srv);
Expand All @@ -320,13 +262,63 @@ void prot_replay(Server *s, job list);
int make_server_socket(char *host_addr, char *port);


struct Conn {
Server *srv;
Socket sock;
char state;
char type;
Conn *next;
tube use;
int64 tickat; // time at which to do more work
int tickpos; // position in srv->conns
job soonest_job; // memoization of the soonest job
int rw; // currently want: 'r', 'w', or 'h'
int pending_timeout;

struct ms watch;
struct job reserved_jobs; // linked list header

char cmd[LINE_BUF_SIZE]; // this string is NOT NUL-terminated
int cmd_len;
int cmd_read;

char *reply;
int reply_len;
int reply_sent;
char reply_buf[LINE_BUF_SIZE]; // this string IS NUL-terminated

// How many bytes of in_job->body have been read so far. If in_job is NULL
// while in_job_read is nonzero, we are in bit bucket mode and
// in_job_read's meaning is inverted -- then it counts the bytes that
// remain to be thrown away.
int in_job_read;
job in_job; // a job to be read from the client

job out_job;
int out_job_sent;
};
int connless(Conn *a, Conn *b);
void connrec(Conn *c, int i);
void connwant(Conn *c, int rw);
void connsched(Conn *c);
void connclose(Conn *c);
void connsetproducer(Conn *c);
void connsetworker(Conn *c);
job connsoonestjob(Conn *c);
int conndeadlinesoon(Conn *c);
int conn_ready(Conn *c);
#define conn_waiting(c) ((c)->type & CONN_TYPE_WAITING)




enum
{
Filesizedef = (10 << 20)
};

struct Wal {
int filesz;
int filesize;
int use;
char *dir;
File *head;
Expand Down Expand Up @@ -391,5 +383,5 @@ struct Server {
};
void srvserve(Server *srv);
void srvaccept(Server *s, int ev);
void srvschedconn(Server *srv, conn c);
void srvschedconn(Server *srv, Conn *c);
void srvtick(Server *s, int ev);
6 changes: 3 additions & 3 deletions file.c
Original file line number Diff line number Diff line change
Expand Up @@ -427,7 +427,7 @@ filewopen(File *f)
return;
}

r = falloc(fd, f->w->filesz);
r = falloc(fd, f->w->filesize);
if (r) {
close(fd);
errno = r;
Expand All @@ -449,7 +449,7 @@ filewopen(File *f)
f->fd = fd;
f->iswopen = 1;
fileincref(f);
f->free = f->w->filesz - n;
f->free = f->w->filesize - n;
f->resv = 0;
}

Expand Down Expand Up @@ -513,7 +513,7 @@ filewclose(File *f)
if (!f) return;
if (!f->iswopen) return;
if (f->free) {
(void)ftruncate(f->fd, f->w->filesz - f->free);
(void)ftruncate(f->fd, f->w->filesize - f->free);
}
close(f->fd);
f->iswopen = 0;
Expand Down
8 changes: 4 additions & 4 deletions integ-test.c
Original file line number Diff line number Diff line change
Expand Up @@ -723,7 +723,7 @@ cttestbinlogsizelimit()
mkdtemp(dir);
srv.wal.dir = dir;
srv.wal.use = 1;
srv.wal.filesz = size;
srv.wal.filesize = size;
srv.wal.syncrate = 0;
srv.wal.wantsync = 1;

Expand Down Expand Up @@ -755,7 +755,7 @@ cttestbinlogallocation()
mkdtemp(dir);
srv.wal.dir = dir;
srv.wal.use = 1;
srv.wal.filesz = size;
srv.wal.filesize = size;
srv.wal.syncrate = 0;
srv.wal.wantsync = 1;

Expand Down Expand Up @@ -839,7 +839,7 @@ cttestbinlogdiskfull()
mkdtemp(dir);
srv.wal.dir = dir;
srv.wal.use = 1;
srv.wal.filesz = size;
srv.wal.filesize = size;
srv.wal.syncrate = 0;
srv.wal.wantsync = 1;

Expand Down Expand Up @@ -908,7 +908,7 @@ cttestbinlogdiskfulldelete()
mkdtemp(dir);
srv.wal.dir = dir;
srv.wal.use = 1;
srv.wal.filesz = size;
srv.wal.filesize = size;
srv.wal.syncrate = 0;
srv.wal.wantsync = 1;

Expand Down
Loading

0 comments on commit 557e4d8

Please sign in to comment.