Skip to content

Commit

Permalink
Clean up trigger system in vringbuffer
Browse files Browse the repository at this point in the history
  • Loading branch information
kmatheussen committed May 27, 2012
1 parent 064519b commit ca2e331
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 30 deletions.
4 changes: 2 additions & 2 deletions Makefile
Expand Up @@ -52,8 +52,8 @@ dist: clean
rm -fr jack_capture-$(VERSION) rm -fr jack_capture-$(VERSION)




jack_capture: setformat.c jack_capture.c vringbuffer.c osc.c Makefile das_config.h config_flags jack_capture: setformat.c jack_capture.c vringbuffer.c upwaker.c osc.c Makefile das_config.h config_flags
$(CC) $(COMPILEFLAGS) jack_capture.c vringbuffer.c osc.c -o jack_capture $(LINKFLAGS) `cat config_flags` $(CC) $(COMPILEFLAGS) jack_capture.c vringbuffer.c upwaker.c osc.c -o jack_capture $(LINKFLAGS) `cat config_flags`




jack_capture_gui2: jack_capture_gui2.cpp jack_capture_gui2: jack_capture_gui2.cpp
Expand Down
35 changes: 9 additions & 26 deletions vringbuffer.c
Expand Up @@ -94,7 +94,6 @@ int vringbuffer_writing_size (vringbuffer_t *vrb);


#include "vringbuffer.h" #include "vringbuffer.h"



////////////////// Implementation /////////////////////////////////// ////////////////// Implementation ///////////////////////////////////




Expand All @@ -118,17 +117,6 @@ static void* my_malloc(size_t size1,size_t size2){
return ret; return ret;
} }


static bool sem_post_if_waiting(sem_t *sem){
int semval;
sem_getvalue(sem,&semval);
if(semval<=0) {
sem_post(sem);
return true;
} else
return false;
}




static bool vringbuffer_increase_writer1(vringbuffer_t *vrb,int num_elements,bool first_call){ static bool vringbuffer_increase_writer1(vringbuffer_t *vrb,int num_elements,bool first_call){


Expand Down Expand Up @@ -192,20 +180,23 @@ vringbuffer_t* vringbuffer_create (int num_elements_during_startup, int max_num_
if(vringbuffer_increase_writer1(vrb,num_elements_during_startup,true)==false) if(vringbuffer_increase_writer1(vrb,num_elements_during_startup,true)==false)
return NULL; return NULL;


vrb->receiver_trigger = create_upwaker();
vrb->autoincrease_trigger = create_upwaker();

return vrb; return vrb;
} }


void vringbuffer_stop_callbacks(vringbuffer_t* vrb){ void vringbuffer_stop_callbacks(vringbuffer_t* vrb){
vrb->please_stop=true; vrb->please_stop=true;


if(vrb->autoincrease_callback!=NULL){ if(vrb->autoincrease_callback!=NULL){
sem_post(&vrb->autoincrease_trigger); upwaker_wake_up(vrb->autoincrease_trigger);
pthread_join(vrb->autoincrease_thread, NULL); pthread_join(vrb->autoincrease_thread, NULL);
vrb->autoincrease_callback=NULL; vrb->autoincrease_callback=NULL;
} }


if(vrb->receiver_callback!=NULL){ if(vrb->receiver_callback!=NULL){
sem_post(&vrb->receiver_trigger); upwaker_wake_up(vrb->receiver_trigger);
pthread_join(vrb->receiver_thread, NULL); pthread_join(vrb->receiver_thread, NULL);
vrb->receiver_callback=NULL; vrb->receiver_callback=NULL;
} }
Expand Down Expand Up @@ -241,7 +232,7 @@ static void *autoincrease_func(void* arg){
vringbuffer_increase_writer1(vrb,num_new_elements,false); vringbuffer_increase_writer1(vrb,num_new_elements,false);


if(vrb->autoincrease_interval==0) if(vrb->autoincrease_interval==0)
sem_wait(&vrb->autoincrease_trigger); upwaker_sleep(vrb->autoincrease_trigger);
else else
usleep(vrb->autoincrease_interval); usleep(vrb->autoincrease_interval);
} }
Expand All @@ -251,15 +242,13 @@ static void *autoincrease_func(void* arg){




void vringbuffer_trigger_autoincrease_callback(vringbuffer_t *vrb){ void vringbuffer_trigger_autoincrease_callback(vringbuffer_t *vrb){
sem_post_if_waiting(&vrb->autoincrease_trigger); upwaker_wake_up(vrb->autoincrease_trigger);
} }


void vringbuffer_set_autoincrease_callback (vringbuffer_t *vrb, Vringbuffer_autoincrease_callback callback, useconds_t interval){ void vringbuffer_set_autoincrease_callback (vringbuffer_t *vrb, Vringbuffer_autoincrease_callback callback, useconds_t interval){
vrb->autoincrease_callback = callback; vrb->autoincrease_callback = callback;
vrb->autoincrease_interval = interval; vrb->autoincrease_interval = interval;


sem_init(&vrb->autoincrease_trigger,0,0);

sem_init(&vrb->autoincrease_started,0,0); sem_init(&vrb->autoincrease_started,0,0);
pthread_create(&vrb->autoincrease_thread, NULL, autoincrease_func, vrb); pthread_create(&vrb->autoincrease_thread, NULL, autoincrease_func, vrb);
sem_wait(&vrb->autoincrease_started); sem_wait(&vrb->autoincrease_started);
Expand Down Expand Up @@ -303,13 +292,12 @@ static void *receiver_func(void* arg){
sem_post(&vrb->receiver_started); sem_post(&vrb->receiver_started);


while(vrb->please_stop==false){ while(vrb->please_stop==false){
sem_wait(&vrb->receiver_trigger); upwaker_sleep(vrb->receiver_trigger);
while(vringbuffer_reading_size(vrb) > 0){ while(vringbuffer_reading_size(vrb) > 0){
void *buffer=vringbuffer_get_reading(vrb); void *buffer=vringbuffer_get_reading(vrb);
vrb->receiver_callback(vrb,false,buffer); vrb->receiver_callback(vrb,false,buffer);
vringbuffer_return_reading(vrb,buffer); vringbuffer_return_reading(vrb,buffer);
} }
// This is the <stuck> position. (see below)
} }


return NULL; return NULL;
Expand All @@ -318,8 +306,6 @@ static void *receiver_func(void* arg){
void vringbuffer_set_receiver_callback(vringbuffer_t *vrb,Vringbuffer_receiver_callback receiver_callback){ void vringbuffer_set_receiver_callback(vringbuffer_t *vrb,Vringbuffer_receiver_callback receiver_callback){
vrb->receiver_callback=receiver_callback; vrb->receiver_callback=receiver_callback;


sem_init(&vrb->receiver_trigger,0,0);

sem_init(&vrb->receiver_started,0,0); sem_init(&vrb->receiver_started,0,0);
pthread_create(&vrb->receiver_thread, NULL, receiver_func, vrb); pthread_create(&vrb->receiver_thread, NULL, receiver_func, vrb);
sem_wait(&vrb->receiver_started); sem_wait(&vrb->receiver_started);
Expand All @@ -337,10 +323,7 @@ void* vringbuffer_get_writing (vringbuffer_t *vrb){


void vringbuffer_return_writing (vringbuffer_t *vrb, void *data){ void vringbuffer_return_writing (vringbuffer_t *vrb, void *data){
jack_ringbuffer_write(vrb->for_reader,(char*)&data,sizeof(void*)); jack_ringbuffer_write(vrb->for_reader,(char*)&data,sizeof(void*));
if(vrb->receiver_callback!=NULL) upwaker_wake_up(vrb->receiver_trigger);
if (sem_post_if_waiting(&vrb->receiver_trigger) == false)
if (vringbuffer_reading_size(vrb)==1) // Fix for race condition. Callback might not be triggered if receiver_thread are stuck at the <stuck> position.
sem_post(&vrb->receiver_trigger);
} }


int vringbuffer_writing_size (vringbuffer_t *vrb){ int vringbuffer_writing_size (vringbuffer_t *vrb){
Expand Down
7 changes: 5 additions & 2 deletions vringbuffer.h
Expand Up @@ -4,8 +4,11 @@
#include <unistd.h> #include <unistd.h>


#include <pthread.h> #include <pthread.h>

#include <semaphore.h> #include <semaphore.h>


#include "upwaker.h"



#include <jack/ringbuffer.h> #include <jack/ringbuffer.h>


Expand Down Expand Up @@ -37,13 +40,13 @@ typedef struct vringbuffer_t{


// Receiver callback // Receiver callback
pthread_t receiver_thread; pthread_t receiver_thread;
sem_t receiver_trigger; upwaker_t *receiver_trigger;
sem_t receiver_started; sem_t receiver_started;
Vringbuffer_receiver_callback receiver_callback; Vringbuffer_receiver_callback receiver_callback;


// Autoincrease callback // Autoincrease callback
pthread_t autoincrease_thread; pthread_t autoincrease_thread;
sem_t autoincrease_trigger; upwaker_t *autoincrease_trigger;
sem_t autoincrease_started; sem_t autoincrease_started;
Vringbuffer_autoincrease_callback autoincrease_callback; Vringbuffer_autoincrease_callback autoincrease_callback;
useconds_t autoincrease_interval; useconds_t autoincrease_interval;
Expand Down

0 comments on commit ca2e331

Please sign in to comment.