-
Notifications
You must be signed in to change notification settings - Fork 0
/
transfProg.cpp
228 lines (199 loc) · 6.89 KB
/
transfProg.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
/**
* @Author: Izhar Shaikh
* @Date: 2017-02-13T15:55:33-05:00
* @Email: izharits@gmail.com
* @Filename: transfProg.cpp
* @Last modified by: izhar
* @Last modified time: 2017-02-18T17:29:22-05:00
*/
#include <iostream>
#include <fstream>
#include <sstream>
#include <string>
#include <vector>
#include <cstring>
#include <stdlib.h>
#include <pthread.h>
#include <stdbool.h>
#include <unistd.h>
#include <assert.h>
#include "debugMacros.hpp"
#include "transfProg.hpp"
using namespace std;
// Global
// Account numbers in the order they appear in the input file.
// assignWorkers() appends to it while parsing and printAccounts() walks it,
// so the final report lists accounts in the same order as the input.
std::vector<int64_t> accountList;
/* Parse the input file into bank account pool and EFT requests pool */
static int64_t assignWorkers(const char *fileName, threadData_t *threadData, \
bankAccountPool_t &accountPool, int64_t NumberOfThreads, int64_t &requestCount)
{
// Input file stream & buffer
std::ifstream fileStream;
std::stringstream stringParser;
char line[LINE_BUFFER] = { 0 };
int64_t accountNumber = -1, initBalance = 0;
int64_t fromAccount = -1, toAccount = -1, transferAmount = 0;
std::string transferString;
bool initDone = false;
int64_t assignID = -1;
// Open the fileStream
fileStream.open(fileName, std::ifstream::in);
if(!fileStream.is_open()){
dbg_trace("Failed to open the file: " << fileName);
return FAIL;
}
// Read from the file
while(fileStream.good())
{
fileStream.getline(line, LINE_BUFFER); // read a line
dbg_trace("String: " << line);
// Check if the transfer requests are coming
if (isalpha(line[0]) && line[0]=='T' ){
initDone = true;
}
stringParser.str(line); // convert c-like string to stringParser
// If we're not done reading accounts yet, keep reading and add to accountPool
if(!initDone)
{
stringParser >> accountNumber >> initBalance;
dbg_trace("Account Number: " \
<< accountNumber << " , " << "Init Balance: " << initBalance);
if(accountNumber == -1){
goto CLEAR;
}
// Keep the order of the accounts
accountList.push_back(accountNumber);
// Adding the object to the map here
accountPool.emplace(std::make_pair(accountNumber, \
bankAccount(accountNumber, initBalance)));
dbg_trace("POOL: \
Account Number: " << accountPool[accountNumber].getAccountNumber() \
<< " , " << \
"Init Balance: " << accountPool[accountNumber].getBalance());
}
else
{
// Once we are done reading accounts; read EFT requests
stringParser >> transferString >> fromAccount >> toAccount >> transferAmount;
dbg_trace("From: " << fromAccount << \
" To: " << toAccount << " Amount: " << transferAmount);
// Stop reading once we've reached invalid accounts i.e. EOF
if(fromAccount == -1 || toAccount == -1){
goto CLEAR;
}
// Assign the job to next worker
assignID = (assignID + 1) % NumberOfThreads;
++requestCount;
assert(threadData[assignID].threadID == assignID); // Sanity checks
assert(threadData[assignID].threadID \
== threadData[assignID].EFTRequests.getWorkerID());
// Create new EFT request
EFTRequest_t* newRequest = new EFTRequest_t();
newRequest->workerID = assignID;
newRequest->fromAccount = fromAccount;
newRequest->toAccount = toAccount;
newRequest->transferAmount = transferAmount;
// Start writing;
// NOTE:: this is data-race safe since the workerQueue class implements
// safe IPC using mutex and condition varibales
threadData[assignID].EFTRequests.pushRequest(newRequest);
/*dbg_trace("[Thread ID: " << threadData[assignID].threadID << ","\
<< "Job Assigned ID: " << assignID << ","\
<< "Queue ID: " << threadData[assignID].EFTRequests->getWorkerID() << ","\
<< "Queue Size: " << threadData[assignID].EFTRequests->size() << "]");*/
}
CLEAR:
// Clear the buffer here, before reading the next line
memset(line, '\0', LINE_BUFFER);
stringParser.str(""); // Clear the stringstream
stringParser.clear(); // needed to clear the stringstream
accountNumber = fromAccount = toAccount = -1;
initBalance = transferAmount = 0;
}
// Check why we got out
if(fileStream.eof())
{
dbg_trace("Reached End-of-File!");
dbg_trace("Total Transfer Requests: " << requestCount);
// Ask all threads to terminate
askThreadsToExit(threadData, accountPool, NumberOfThreads, assignID);
}
else {
dbg_trace("Error while reading!");
//fileStream.close();
//return FAIL;
}
// Close the fileStream
fileStream.close();
return SUCCESS;
}
#if 0
/* Trace every account currently held in the pool (compiled out via #if 0) */
static void displayAccountPool(bankAccountPool_t &accountPool)
{
  // Each entry's mapped value is the account object
  for(auto &entry : accountPool)
  {
    dbg_trace("POOL:\
Account Number: " << entry.second.getAccountNumber()
      << " , "
      << "Balance: " << entry.second.getBalance());
  }
}
#endif
/* Write "<accountNumber> <balance>" for every account, following the order
 * recorded in accountList */
static void printAccounts(bankAccountPool_t &accountPool)
{
  for(const int64_t accountNumber : accountList)
  {
    print_output(accountNumber << " " << accountPool[accountNumber].getBalance());
  }
}
// ------------------------ main() ------------------------------
int main(int argc, char const *argv[])
{
// Check and parse the command line argument
if(argc != 3){
print_output("USAGE:");
print_output("\t./transfProg <PathToInputFile> <NumberOfThreads>");
return 0;
}
// Check the validity of the input file,
int64_t fileStatus = access(argv[1], F_OK | R_OK);
if(fileStatus != 0){
print_output("Failed to access the input file or file doesn't exist!");
print_output("Please check the path to the input file is correct.");
return 0;
}
// Check the validity of the worker threads
int64_t workerThreads = atoi((const char *) argv[2]);
if(workerThreads < 1 || workerThreads > MAX_WORKERS){
print_output("Invalid number of workers: " << workerThreads \
<< "\nEnter buffer size between 1 to " << MAX_WORKERS);
return 0;
}
// If everything is fine, init threads
bankAccountPool_t accountPool; // Pool of bank accounts
threadData_t threadData[workerThreads]; // thread data array
pthread_t threads[workerThreads]; // pthreads ID array
int64_t EFTRequestsCount = 0;
bool status = spawnThreads(threads, threadData, &accountPool, workerThreads);
if(status == FAIL){
dbg_trace("Failed to create threads!");
return 0;
}
// And parse the file
int64_t parseStatus = assignWorkers(argv[1], threadData, accountPool, \
workerThreads, EFTRequestsCount);
if(parseStatus == FAIL)
{
print_output("ERROR: Failed during parsing!");
return 0;
}
// wait for threads to finish
for(int64_t i=0; i<workerThreads; i++){
pthread_join(threads[i], NULL);
}
// Display the Accounts and their Balances after transfer
printAccounts(accountPool);
return 0;
}