-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathio_validation.c
147 lines (122 loc) · 3.67 KB
/
io_validation.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
#define C_FEK_BLOCKING_QUEUE_IMPLEMENTATION
#define C_FEK_FAIR_LOCK_IMPLEMENTATION
#include "../blocking_queue.h"
#include <stdio.h>
#include <errno.h>
#include <stdlib.h>
#include <string.h>
#include <assert.h>
static Blocking_Queue bq;
static int data_size;
static int num_producer_threads;
static int num_consumer_threads;
static int* produced;
static int* consumed;
static int* producer_threads_ids;
static int* consumer_threads_ids;
static pthread_t* producer_threads;
static pthread_t* consumer_threads;
#define BLOCKING_QUEUE_CAPACITY 2
static void heapsort(int a[], int n) {
int i = n / 2, parent, child, t;
while (1) {
if (i > 0) {
i--;
t = a[i];
} else {
n--;
if (n <= 0) return;
t = a[n];
a[n] = a[0];
}
parent = i;
child = i * 2 + 1;
while (child < n) {
if ((child + 1 < n) && (a[child + 1] > a[child]))
child++;
if (a[child] > t) {
a[parent] = a[child];
parent = child;
child = parent * 2 + 1;
} else {
break;
}
}
a[parent] = t;
}
}
void* producer(void* args) {
int producer_id = *(int*)args;
unsigned int num_data_to_consume = data_size / num_producer_threads;
unsigned int start_at = producer_id * num_data_to_consume;
for (unsigned int i = start_at; i < start_at + num_data_to_consume; ++i) {
assert(!blocking_queue_put(&bq, &produced[i]));
}
return 0;
}
void* consumer(void* args) {
int consumer_id = *(int*)args;
unsigned int num_data_to_consume = data_size / num_consumer_threads;
unsigned int start_at = consumer_id * num_data_to_consume;
for (unsigned int i = start_at; i < start_at + num_data_to_consume; ++i) {
void* got;
blocking_queue_take(&bq, &got);
assert(got != NULL);
consumed[i] = *(int*)got;
}
return 0;
}
int main(int argc, char** argv) {
if (argc != 4) {
printf("usage: %s <num_producer_threads> <num_consumer_threads> <data_size>\n", argv[0]);
return -1;
}
num_producer_threads = atoi(argv[1]);
num_consumer_threads = atoi(argv[2]);
data_size = atoi(argv[3]);
assert(data_size % num_producer_threads == 0);
assert(data_size % num_consumer_threads == 0);
produced = malloc(data_size * sizeof(int));
consumed = malloc(data_size * sizeof(int));
producer_threads_ids = malloc(num_producer_threads * sizeof(int));
consumer_threads_ids = malloc(num_consumer_threads * sizeof(int));
producer_threads = malloc(num_producer_threads * sizeof(pthread_t));
consumer_threads = malloc(num_consumer_threads * sizeof(pthread_t));
blocking_queue_init(&bq, BLOCKING_QUEUE_CAPACITY);
for (unsigned int i = 0; i < data_size; ++i) {
produced[i] = i;
}
for (unsigned int i = 0; i < num_producer_threads; ++i) {
producer_threads_ids[i] = i;
if (pthread_create(&producer_threads[i], NULL, producer, &producer_threads_ids[i])) {
fprintf(stderr, "error creating thread: %s\n", strerror(errno));
return -1;
}
}
for (unsigned int i = 0; i < num_consumer_threads; ++i) {
consumer_threads_ids[i] = i;
if (pthread_create(&consumer_threads[i], NULL, consumer, &consumer_threads_ids[i])) {
fprintf(stderr, "error creating thread: %s\n", strerror(errno));
return -1;
}
}
for (unsigned int i = 0; i < num_producer_threads; ++i) {
pthread_join(producer_threads[i], NULL);
}
for (unsigned int i = 0; i < num_consumer_threads; ++i) {
pthread_join(consumer_threads[i], NULL);
}
heapsort(consumed, data_size);
for (unsigned int i = 0; i < data_size; ++i) {
assert(produced[i] == consumed[i]);
}
blocking_queue_destroy(&bq);
free(produced);
free(consumed);
free(producer_threads_ids);
free(consumer_threads_ids);
free(producer_threads);
free(consumer_threads);
printf("Test completed succesfully. [%u, %u, %u]\n", num_producer_threads, num_consumer_threads, data_size);
return 0;
}