Skip to content
This repository

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
tree: ac9242c56e
Fetching contributors…

Cannot retrieve contributors at this time

file 138 lines (98 sloc) 3.302 kb
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138
#include "Pipe.h"

#include <iostream>
#include <stdlib.h>

// Builds an open, empty pipe with room for bufferSize records.
//
// bufferSize: number of Record slots to allocate; must be positive.
//
// Initializes the pipe's mutex and its two condition variables, allocates
// the slot array and resets the slot counters. The process is terminated
// with exit(1) if bufferSize is not positive or the slot array cannot be
// allocated.
Pipe :: Pipe (int bufferSize) {

    // A pipe with no slots can never accept a record, and Insert/Remove
    // index the buffer with "% totSpace", so a zero capacity must never
    // reach them.
    if (bufferSize <= 0) {
        std::cerr << "ERROR : Pipe buffer size must be positive. EXIT !!!\n";
        exit (1);
    }

    // set up the mutex associated with the pipe
    pthread_mutex_init (&pipeMutex, NULL);

    // set up the condition variables associated with the pipe:
    // producerVar is waited on by Insert, consumerVar by Remove
    pthread_cond_init (&producerVar, NULL);
    pthread_cond_init (&consumerVar, NULL);

    // set up the pipe's buffer; the nothrow form reports failure as a
    // null pointer instead of throwing
    buffered = new (std::nothrow) Record[bufferSize];
    if (buffered == NULL) {
        // diagnostics go to stderr so they do not mix with program output
        std::cerr << "ERROR : Not enough memory. EXIT !!!\n";
        exit (1);
    }

    // the buffer is empty: both counters start at the same position
    totSpace = bufferSize;
    firstSlot = lastSlot = 0;

    // note that the pipe has not yet been turned off
    done = 0;
}

// Tears the pipe down: destroys the synchronization objects created by the
// constructor and releases the slot array.
Pipe :: ~Pipe () {

    // condition variables first, then the mutex they were used with
    pthread_cond_destroy (&consumerVar);
    pthread_cond_destroy (&producerVar);
    pthread_mutex_destroy (&pipeMutex);

    // finally give back the record buffer
    delete [] buffered;
}


// Adds one record to the pipe, blocking while the pipe is full.
//
// insertMe: record handed to the next free slot's Consume() call.
//
// Holds pipeMutex for the whole operation and signals consumerVar once the
// record is in place.
void Pipe :: Insert (Record *insertMe) {

    // first, get a mutex on the pipeline
    pthread_mutex_lock (&pipeMutex);

    // wait until the consumer frees up some space. The predicate is
    // re-checked in a loop because pthread_cond_wait may return spuriously,
    // and because another producer may have claimed the freed slot before
    // this thread re-acquired the mutex.
    while (lastSlot - firstSlot >= totSpace) {
        pthread_cond_wait (&producerVar, &pipeMutex);
    }

    // there is now guaranteed to be a free slot; fill it
    buffered [lastSlot % totSpace].Consume (insertMe);

    // note that we have added a new record
    lastSlot++;

    // signal the consumer who might now want to pick up the new
    // record that has been added to the pipeline
    pthread_cond_signal (&consumerVar);

    // done!
    pthread_mutex_unlock (&pipeMutex);
}


// Takes the oldest record out of the pipe, blocking while the pipe is empty
// and still open.
//
// removeMe: record whose Consume() is called on the oldest buffered slot.
//
// Returns 1 when a record was removed, or 0 when the pipe is empty and has
// been shut down (removeMe is left untouched in that case). Records that
// are still buffered when the pipe is shut down are delivered before 0 is
// returned.
int Pipe :: Remove (Record *removeMe) {

    // first, get a mutex on the pipeline
    pthread_mutex_lock (&pipeMutex);

    // wait until there is something in the pipeline. The predicate is
    // re-checked in a loop because pthread_cond_wait may return spuriously,
    // and because another consumer may have taken the record before this
    // thread re-acquired the mutex.
    while (lastSlot == firstSlot) {

        // the pipeline is empty; if that is because it was turned off,
        // there is nothing more to deliver
        if (done) {
            pthread_mutex_unlock (&pipeMutex);
            return 0;
        }

        pthread_cond_wait (&consumerVar, &pipeMutex);
    }

    // there is now guaranteed to be a record; hand it over
    removeMe->Consume (&buffered [firstSlot % totSpace]);

    // note that we have deleted a record
    firstSlot++;

    // Rebase both counters once the read position has gone all the way
    // round the buffer. Their difference and their values modulo totSpace
    // are unchanged, but they stay bounded, so they cannot overflow no
    // matter how many records pass through the pipe.
    if (firstSlot >= totSpace) {
        firstSlot -= totSpace;
        lastSlot -= totSpace;
    }

    // signal the producer who might now want to take the slot
    // that has been freed up by the deletion
    pthread_cond_signal (&producerVar);

    // done!
    pthread_mutex_unlock (&pipeMutex);
    return 1;
}


// Marks the pipe as turned off so that Remove returns 0 once the buffered
// records have been drained.
void Pipe :: ShutDown () {

    // first, get a mutex on the pipeline
    pthread_mutex_lock (&pipeMutex);

    // note that we are now done with the pipeline
    done = 1;

    // wake every consumer that may be waiting: pthread_cond_signal would
    // release only one of them, leaving any others blocked in Remove forever
    pthread_cond_broadcast (&consumerVar);

    // unlock the mutex
    pthread_mutex_unlock (&pipeMutex);
}
Something went wrong with that request. Please try again.