Permalink
Browse files

Add background fetch and multiple client streaming support

Add parameter stream_maxchunksize that limits the storage chunks
allocated when streaming. This creates interruption points where the
receiving clients are notified of new data

Add parameter stream_tokens controlling the default number of tokens
available when new stream data arrives from backend.

Add counter values for use with streaming

Token strategy: Any thread hitting end of data will be marked as a
fast writer. It will then have to get a token before calling
RES_StreamWrite, thus n_tokens limits the number of thundering threads
run whenever new data arrives. The fast writer flag is cleared on each
pass through the loop, and reset if it hits the end of data and waits.

Expose the number of tokens as VRT functions accessible through VCL in
vcl_fetch. This allows the rate throttling to be tuned per connection.

Make conditional delivery work while streaming

Rename parameter default_tokens to stream_tokens

Update streaming documentation

Update transition graph

Make r00979.vtc compatible with threaded streaming.

Add author in files with more than trivial changes

Add WRK_QueueFirst() that will schedule a work request first, and not
take queue lengths into account. For already commited to work loads.

Add WRK_QueueSessionFirst() that will queue a session using WRK_QueueFirst().

Add SES_NewNonVCA() for getting recycled sessions from the list not
owned by the VCA

Conflicts:

	bin/varnishd/cache_center.c
  • Loading branch information...
mbgrydeland committed Sep 7, 2011
1 parent c663246 commit 7a79f3b4820a693bfa09ea901876907cffce7289
@@ -36,5 +36,6 @@ ACCT(req)
ACCT(pipe)
ACCT(pass)
ACCT(fetch)
+ACCT(stream)
ACCT(hdrbytes)
ACCT(bodybytes)
View
@@ -283,8 +283,12 @@ struct stream_ctx {
/* Next byte we will take from storage */
ssize_t stream_next;
- /* First byte of storage if we free it as we go (pass) */
+ /* First byte of storage and address we look at */
ssize_t stream_front;
+ struct storage *stream_frontchunk;
+
+ /* Max byte we can stream */
+ ssize_t stream_max;
};
/*--------------------------------------------------------------------*/
@@ -331,13 +335,14 @@ struct worker {
struct vfp *vfp;
struct vgz *vgz_rx;
struct vef_priv *vef_priv;
- unsigned do_stream;
unsigned do_esi;
unsigned do_gzip;
unsigned is_gzip;
unsigned do_gunzip;
unsigned is_gunzip;
unsigned do_close;
+ unsigned do_stream;
+ unsigned stream_tokens;
char *h_content_length;
/* Stream state */
@@ -449,7 +454,6 @@ oc_getobj(struct worker *wrk, struct objcore *oc)
{
CHECK_OBJ_NOTNULL(oc, OBJCORE_MAGIC);
- AZ(oc->flags & OC_F_BUSY);
AN(oc->methods);
AN(oc->methods->getobj);
return (oc->methods->getobj(wrk, oc));
@@ -491,6 +495,17 @@ struct busyobj {
unsigned magic;
#define BUSYOBJ_MAGIC 0x23b95567
uint8_t *vary;
+ int can_stream;
+ int stream_stop;
+ int stream_error;
+ int stream_refcnt;
+ ssize_t stream_max;
+ struct storage *stream_frontchunk;
+ unsigned stream_tokens;
+ char *stream_h_content_length;
+ struct lock mtx;
+ pthread_cond_t cond_tokens;
+ pthread_cond_t cond_data;
};
/* Object structure --------------------------------------------------*/
@@ -605,6 +620,8 @@ struct sess {
struct object *obj;
struct objcore *objcore;
struct VCL_conf *vcl;
+ struct busyobj *stream_busyobj; /* The busyobj if we
+ * are streaming */
/* The busy objhead we sleep on */
struct objhead *hash_objhead;
@@ -841,6 +858,7 @@ void PipeSession(struct sess *sp);
/* cache_pool.c */
void WRK_Init(void);
int WRK_QueueSession(struct sess *sp);
+int WRK_QueueSessionFirst(struct sess *sp);
void WRK_SumStat(struct worker *w);
#define WRW_IsReleased(w) ((w)->wrw.wfd == NULL)
@@ -863,6 +881,7 @@ void WRK_BgThread(pthread_t *thr, const char *name, bgthread_t *func,
/* cache_session.c [SES] */
void SES_Init(void);
struct sess *SES_New(void);
+struct sess *SES_NewNonVCA(struct worker *w);
struct sess *SES_Alloc(void);
void SES_Delete(struct sess *sp);
void SES_Charge(struct sess *sp);
@@ -904,7 +923,9 @@ void RES_BuildHttp(const struct sess *sp);
void RES_WriteObj(struct sess *sp);
void RES_StreamStart(struct sess *sp);
void RES_StreamEnd(struct sess *sp);
-void RES_StreamPoll(const struct sess *sp);
+int RES_StreamPoll(const struct sess *sp);
+void RES_StreamWrite(const struct sess *sp);
+void RES_StreamBody(struct sess *sp);
/* cache_vary.c */
struct vsb *VRY_Create(const struct sess *sp, const struct http *hp);
@@ -1014,6 +1035,7 @@ AssertObjBusy(const struct object *o)
{
AN(o->objcore);
AN (o->objcore->flags & OC_F_BUSY);
+ AN(o->objcore->busyobj);
}
static inline void
Oops, something went wrong.

0 comments on commit 7a79f3b

Please sign in to comment.