-
Notifications
You must be signed in to change notification settings - Fork 0
/
processThreads.cpp
184 lines (154 loc) · 6.01 KB
/
processThreads.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
/**
* @Author: Izhar Shaikh
* @Date: 2017-02-13T15:55:33-05:00
* @Email: izharits@gmail.com
* @Filename: transfProg.c
* @Last modified by: izhar
* @Last modified time: 2017-02-18T17:29:56-05:00
*/
#include <iostream>
#include <memory>
#include <pthread.h>
#include <stdbool.h>
#include <unistd.h>
#include <assert.h>
#include "debugMacros.hpp"
#include "transfProg.hpp"
using namespace std;
// Thread function (EFT requests processor)
static void *EFTWorker(void *data)
{
threadData_t *workerData = (threadData_t *) data;
EFTRequest_t *requestToProcess = NULL;
while((requestToProcess = workerData->EFTRequests.popRequest()) != NULL)
{
/*dbg_trace("[Thread-ID: " << workerData->threadID << "]: "\
<< "Queue-ID: " << workerData->EFTRequests->getWorkerID() << " , "\
<< "Queue-size: " << workerData->EFTRequests->size() << " , "\
<< "Account Pool: " << workerData->accountPool->size());*/
int64_t fromBalance = 0, toBalance = 0;
int64_t fromAccount = requestToProcess->fromAccount;
int64_t toAccount = requestToProcess->toAccount;
int64_t transferAmount = requestToProcess->transferAmount;
/*dbg_trace("[requestToProcess]: "\
<< "From: " << fromAccount << " , "\
<< "To: " << toAccount << " , "\
<< "Transfer: " << transferAmount);*/
// See if it is the last job
if(fromAccount == -1 || toAccount == -1){
delete requestToProcess;
requestToProcess = NULL;
break;
}
// -- Process the request with "restricted order" of accounts to avoid deadlocks
// ========== ENTER Critical Section ==========
if(fromAccount < toAccount)
{ // 1. From, 2. To
workerData->accountPool->at(fromAccount).lock();
workerData->accountPool->at(toAccount).lock();
}
else
{ // 1. To, 2. From
workerData->accountPool->at(toAccount).lock();
workerData->accountPool->at(fromAccount).lock();
}
// -- Get the balance
fromBalance = workerData->accountPool->at(fromAccount).getBalance();
toBalance = workerData->accountPool->at(toAccount).getBalance();
/*dbg_trace("[beforeProcess]: "\
<< "From: " << fromBalance << " , "\
<< "To: " << toBalance);*/
// -- Update the account with new balance
workerData->accountPool->at(fromAccount).setBalance(fromBalance - transferAmount);
workerData->accountPool->at(toAccount).setBalance(toBalance + transferAmount);
/*dbg_trace("[AfterProcess]: "\
<< "From: " << workerData->accountPool->at(fromAccount).getBalance() << " , "\
<< "To: " << workerData->accountPool->at(toAccount).getBalance());*/
if(fromAccount < toAccount)
{ // 1. To, 2. From
workerData->accountPool->at(toAccount).unlock();
workerData->accountPool->at(fromAccount).unlock();
}
else
{ // 1. From, 2. To
workerData->accountPool->at(fromAccount).unlock();
workerData->accountPool->at(toAccount).unlock();
}
// ========= EXIT Critical Section =========
// Cleanup
delete requestToProcess;
requestToProcess = NULL;
}
dbg_trace("THREAD: " << workerData->threadID << " EXIT!");
pthread_exit(NULL);
}
// Function to create thread data and spawn threads
int64_t spawnThreads(pthread_t *threads, threadData_t *threadDataPool, \
bankAccountPool_t *accountPool, int64_t NumberOfThreads)
{
threadData_t *threadPool = threadDataPool;
pthread_t *threadID = threads;
bool spawnThreadsStatus = FAIL;
int64_t thread = 0;
for(thread = 0; thread < NumberOfThreads; thread++)
{
threadPool[thread].threadID = thread;
threadPool[thread].EFTRequests.setWorkerID(thread);
threadPool[thread].accountPool = accountPool;
// Spwan it
int64_t status = pthread_create(&threadID[thread], NULL, &EFTWorker, (void*) &threadPool[thread]);
if(status != 0){
print_output("Failed to create thread: " << thread);
exit(1);
}
}
if(thread == NumberOfThreads){
spawnThreadsStatus = SUCCESS;
}
return spawnThreadsStatus;
}
// Ask threads to terminate
void askThreadsToExit(threadData_t *threadData, bankAccountPool_t &accountPool,\
int64_t NumberOfThreads, int64_t lastAssignedID)
{
int64_t fromAccount = -1, toAccount = -1, transferAmount = 0;
int64_t assignID = lastAssignedID;
int64_t requestCount = 0;
// the last job
fromAccount = -1;
toAccount = -1;
transferAmount = 0;
// Sanity checks
if(lastAssignedID == -1 || NumberOfThreads < 0){
return;
}
// This loop is added to give each worker a last job which will have
// both the from and to account numbers as -1 and the transferAmount 0
// The logic works irrespective of the number of threads and requests,
// as well as who was the last worker that got assigned the job
do {
// Calculate worker ID to be assigned
// Since we will be assigning the jobs in round robin fashion,
// we will mod the result with NumberOfThreads
assignID = (assignID + 1) % NumberOfThreads;
++requestCount;
assert(threadData[assignID].threadID == assignID); // Sanity checks
assert(threadData[assignID].threadID \
== threadData[assignID].EFTRequests.getWorkerID());
// Create new EFT request
EFTRequest_t* newRequest = new EFTRequest_t();
newRequest->workerID = assignID;
newRequest->fromAccount = fromAccount;
newRequest->toAccount = toAccount;
newRequest->transferAmount = transferAmount;
// Start writing;
// NOTE:: this is data-race safe since the workerQueue class implements
// race safe mechanism to write and read from worker queue using semaphores
threadData[assignID].EFTRequests.pushRequest(newRequest);
/*dbg_trace("[Thread ID: " << threadData[assignID].threadID << ","\
<< "Job Assigned ID: " << assignID << ","\
<< "Queue ID: " << threadData[assignID].EFTRequests->getWorkerID() << ","\
<< "Queue Size: " << threadData[assignID].EFTRequests->size() << "]");*/
} while(assignID != lastAssignedID);
// dbg_trace("Total Last Jobs: " << requestCount);
}