Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP

Comparing changes

Choose two branches to see what's changed or to start a new pull request. If you need to, you can also compare across forks.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also compare across forks.
...
Checking mergeability… Don't worry, you can still create the pull request.
  • 3 commits
  • 3 files changed
  • 0 commit comments
  • 1 contributor
Commits on Jun 08, 2012
@karlheyes remove redundant refcount for burst
The listeners do not refcount the queue as they only get a read lock and the
queue does not currently shrink so the counters for the burst data are not
required anymore.
7948824
@karlheyes remove redundant trap for catching cases with rwlock conversion 3040756
@karlheyes shrink queues to min queue size for special case
it is hard to know what the largest listener lag is now (no refcounts), so we
have to allow up to the queue size but if there are no listeners then we can
definitely shrink it.
c794e15
Showing with 11 additions and 27 deletions.
  1. +1 −4 src/client.c
  2. +9 −21 src/source.c
  3. +1 −2  src/source.h
View
5 src/client.c
@@ -317,11 +317,8 @@ void client_set_queue (client_t *client, refbuf_t *refbuf)
}
client->refbuf = refbuf;
if (refbuf)
- {
- if (refbuf->flags & 04) // trap SOURCE_QUEUE_BLOCK for now
- abort();
refbuf_addref (client->refbuf);
- }
+
client->pos = 0;
if (to_release)
refbuf_release (to_release);
View
30 src/source.c
@@ -249,9 +249,6 @@ int source_compare_sources(void *arg, void *a, void *b)
void source_clear_source (source_t *source)
{
- int do_twice = 0;
- refbuf_t *p;
-
DEBUG1 ("clearing source \"%s\"", source->mount);
if (source->dumpfile)
@@ -268,22 +265,14 @@ void source_clear_source (source_t *source)
refbuf_release (source->stream_data_tail);
/* remove the reference for buffers on the queue */
- p = source->stream_data;
- while (p)
+ while (source->stream_data)
{
- refbuf_t *to_go = p;
- p = to_go->next;
+ refbuf_t *to_go = source->stream_data;
+ source->stream_data = to_go->next;
to_go->next = NULL;
- // DEBUG1 ("queue refbuf count is %d", to_go->_count);
- if (do_twice || to_go == source->min_queue_point)
- { /* burst data is also counted */
- refbuf_release (to_go);
- do_twice = 1;
- }
refbuf_release (to_go);
}
source->min_queue_point = NULL;
- source->stream_data = NULL;
source->stream_data_tail = NULL;
source->min_queue_size = 0;
@@ -403,6 +392,7 @@ int source_read (source_t *source)
refbuf_t *refbuf = NULL;
int skip = 1, loop = 1;
time_t current = client->worker->current_time.tv_sec;
+ long queue_size_target;
int fds = 0;
if (global.running != ICE_RUNNING)
@@ -515,9 +505,6 @@ int source_read (source_t *source)
source->stream_data_tail = refbuf;
source->queue_size += refbuf->len;
- /* increase refcount for keeping burst data */
- refbuf_addref (refbuf);
-
/* move the starting point for new listeners */
source->min_queue_offset += refbuf->len;
while (source->min_queue_offset > source->min_queue_size)
@@ -527,7 +514,6 @@ int source_read (source_t *source)
{
source->min_queue_offset -= to_release->len;
source->min_queue_point = to_release->next;
- refbuf_release (to_release);
continue;
}
if (source->min_queue_point != refbuf)
@@ -557,14 +543,16 @@ int source_read (source_t *source)
} while (loop);
/* lets see if we have too much data in the queue */
- while (source->queue_size > source->queue_size_limit)
+ if (source->listeners)
+ queue_size_target = source->queue_size_limit;
+ else
+ queue_size_target = source->min_queue_size;
+ while (source->queue_size > queue_size_target)
{
refbuf_t *to_go = source->stream_data;
source->stream_data = to_go->next;
source->queue_size -= to_go->len;
to_go->next = NULL;
- /* mark for delete to tell others holding it and release it ourselves */
- to_go->flags |= SOURCE_BLOCK_RELEASE;
refbuf_release (to_go);
}
} while (0);
View
3  src/source.h
@@ -114,8 +114,7 @@ void source_set_fallback (source_t *source, const char *dest_mount);
void source_listeners_wakeup (source_t *source);
#define SOURCE_BLOCK_SYNC 01
-#define SOURCE_BLOCK_RELEASE 02
-#define SOURCE_QUEUE_BLOCK 04
+#define SOURCE_QUEUE_BLOCK 02
#endif

No commit comments for this range

Something went wrong with that request. Please try again.