Skip to content

Commit

Permalink
Adds threading to parsimony
Browse files Browse the repository at this point in the history
  • Loading branch information
mothur-westcott committed Jan 30, 2018
1 parent 1e8aa08 commit b111b6e
Show file tree
Hide file tree
Showing 12 changed files with 272 additions and 523 deletions.
8 changes: 4 additions & 4 deletions Mothur.xcodeproj/project.pbxproj
Original file line number Diff line number Diff line change
Expand Up @@ -3671,7 +3671,7 @@
GCC_C_LANGUAGE_STANDARD = "compiler-default";
GCC_DYNAMIC_NO_PIC = NO;
GCC_MODEL_TUNING = G5;
GCC_OPTIMIZATION_LEVEL = 3;
GCC_OPTIMIZATION_LEVEL = 0;
GCC_PREPROCESSOR_DEFINITIONS = (
"VERSION=\"\\\"1.39.5\\\"\"",
"RELEASE_DATE=\"\\\"9/20/2017\\\"\"",
Expand Down Expand Up @@ -3705,7 +3705,7 @@
DSTROOT = TARGET_BUILD_DIR;
GCC_C_LANGUAGE_STANDARD = "compiler-default";
GCC_MODEL_TUNING = G5;
GCC_OPTIMIZATION_LEVEL = 3;
GCC_OPTIMIZATION_LEVEL = 0;
GCC_PREPROCESSOR_DEFINITIONS = (
"VERSION=\"\\\"1.39.5\\\"\"",
"RELEASE_DATE=\"\\\"9/20/2017\\\"\"",
Expand Down Expand Up @@ -3749,7 +3749,7 @@
GCC_ENABLE_SSE41_EXTENSIONS = NO;
GCC_ENABLE_SSE42_EXTENSIONS = NO;
GCC_NO_COMMON_BLOCKS = YES;
GCC_OPTIMIZATION_LEVEL = 3;
GCC_OPTIMIZATION_LEVEL = 0;
GCC_PREPROCESSOR_DEFINITIONS = (
"MOTHUR_FILES=\"\\\"/Users/sarahwestcott/desktop/release\\\"\"",
"VERSION=\"\\\"1.39.5\\\"\"",
Expand Down Expand Up @@ -3817,7 +3817,7 @@
GCC_GENERATE_DEBUGGING_SYMBOLS = NO;
GCC_MODEL_TUNING = "";
GCC_NO_COMMON_BLOCKS = YES;
GCC_OPTIMIZATION_LEVEL = 3;
GCC_OPTIMIZATION_LEVEL = 0;
GCC_PREPROCESSOR_DEFINITIONS = (
"VERSION=\"\\\"1.39.5\\\"\"",
"RELEASE_DATE=\"\\\"9/20/2017\\\"\"",
Expand Down
316 changes: 85 additions & 231 deletions source/calculators/parsimony.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,20 +55,7 @@ EstOutput Parsimony::getValues(Tree* t, int p, string o) {
}
}

lines.clear();
int remainingPairs = namesOfGroupCombos.size();
int startIndex = 0;
for (int remainingProcessors = processors; remainingProcessors > 0; remainingProcessors--) {
int numPairs = remainingPairs; //case for last processor
if (remainingProcessors != 1) { numPairs = ceil(remainingPairs / remainingProcessors); }
lines.push_back(linePair(startIndex, numPairs)); //startIndex, numPairs
startIndex = startIndex + numPairs;
remainingPairs = remainingPairs - numPairs;
}

data = createProcesses(t, namesOfGroupCombos, ct);

return data;
return (createProcesses(t, namesOfGroupCombos, ct));

}
catch(exception& e) {
Expand All @@ -77,180 +64,107 @@ EstOutput Parsimony::getValues(Tree* t, int p, string o) {
}
}
/**************************************************************************************************/

EstOutput Parsimony::createProcesses(Tree* t, vector< vector<string> > namesOfGroupCombos, CountTable* ct) {
try {
int process = 1;
vector<int> processIDS;
bool recalc = false;

EstOutput results;

#if defined (__APPLE__) || (__MACH__) || (linux) || (__linux) || (__linux__) || (__unix__) || (__unix)

//loop through and create all the processes you want
while (process != processors) {
pid_t pid = fork();

if (pid > 0) {
processIDS.push_back(pid); //create map from line number to pid so you can append files in correct order later
process++;
}else if (pid == 0){
EstOutput myresults;
//myresults = driver(t, namesOfGroupCombos, lines[process].start, lines[process].num, ct);

if (m->getControl_pressed()) { exit(0); }

//pass numSeqs to parent
ofstream out;
string tempFile = outputDir + toString(process) + ".pars.results.temp";
util.openOutputFile(tempFile, out);
out << myresults.size() << endl;
for (int i = 0; i < myresults.size(); i++) { out << myresults[i] << '\t'; } out << endl;
out.close();

exit(0);
}else {
m->mothurOut("[ERROR]: unable to spawn the number of processes you requested, reducing number to " + toString(process) + "\n"); processors = process;
for (int i = 0; i < processIDS.size(); i++) { kill (processIDS[i], SIGINT); }
//wait to die
for (int i=0;i<processIDS.size();i++) {
int temp = processIDS[i];
wait(&temp);
}
m->setControl_pressed(false);

for (int i=0;i<processIDS.size();i++) {
util.mothurRemove(outputDir + (toString(processIDS[i]) + ".pars.results.temp"));
}
recalc = true;
break;
}
}
void driverPars(parsData* params) {
try {

Tree copyTree(params->ct, params->Treenames);
int count = 0;

if (recalc) {
//test line, also set recalc to true.
//for (int i = 0; i < processIDS.size(); i++) { kill (processIDS[i], SIGINT); } for (int i=0;i<processIDS.size();i++) { int temp = processIDS[i]; wait(&temp); } m->setControl_pressed(false);
for (int i=0;i<processIDS.size();i++) {util.mothurRemove(outputDir + (toString(processIDS[i]) + ".pars.results.temp"));}processors=3; m->mothurOut("[ERROR]: unable to spawn the number of processes you requested, reducing number to " + toString(processors) + "\n");
for (int h = params->start; h < (params->start+params->num); h++) {

lines.clear();
int remainingPairs = namesOfGroupCombos.size();
int startIndex = 0;
for (int remainingProcessors = processors; remainingProcessors > 0; remainingProcessors--) {
int numPairs = remainingPairs; //case for last processor
if (remainingProcessors != 1) { numPairs = ceil(remainingPairs / remainingProcessors); }
lines.push_back(linePair(startIndex, numPairs)); //startIndex, numPairs
startIndex = startIndex + numPairs;
remainingPairs = remainingPairs - numPairs;
}
if (params->m->getControl_pressed()) { break; }

int score = 0;

//groups in this combo
vector<string> groups = params->namesOfGroupCombos[h];

processIDS.resize(0);
process = 1;
//copy users tree so that you can redo pgroups
copyTree.getCopy(params->t);

//loop through and create all the processes you want
while (process != processors) {
pid_t pid = fork();
//create pgroups that reflect the groups the user want to use
for(int i=copyTree.getNumLeaves();i<copyTree.getNumNodes();i++){
copyTree.tree[i].pGroups = (copyTree.mergeUserGroups(i, groups));
}

for(int i=copyTree.getNumLeaves();i<copyTree.getNumNodes();i++){

if (params->m->getControl_pressed()) { break; }

int lc = copyTree.tree[i].getLChild();
int rc = copyTree.tree[i].getRChild();

int iSize = copyTree.tree[i].pGroups.size();
int rcSize = copyTree.tree[rc].pGroups.size();
int lcSize = copyTree.tree[lc].pGroups.size();

if (pid > 0) {
processIDS.push_back(pid); //create map from line number to pid so you can append files in correct order later
process++;
}else if (pid == 0){
EstOutput myresults;
//myresults = driver(t, namesOfGroupCombos, lines[process].start, lines[process].num, ct);

if (m->getControl_pressed()) { exit(0); }

//pass numSeqs to parent
ofstream out;
string tempFile = outputDir + toString(process) + ".pars.results.temp";
util.openOutputFile(tempFile, out);
out << myresults.size() << endl;
for (int i = 0; i < myresults.size(); i++) { out << myresults[i] << '\t'; } out << endl;
out.close();

exit(0);
}else {
m->mothurOut("[ERROR]: unable to spawn the necessary processes."); m->mothurOutEndLine();
for (int i = 0; i < processIDS.size(); i++) { kill (processIDS[i], SIGINT); }
exit(0);
//if isize are 0 then that branch is to be ignored
if (iSize == 0) { }
else if ((rcSize == 0) || (lcSize == 0)) { }
//if you have more groups than either of your kids then there's been a change.
else if(iSize > rcSize || iSize > lcSize){
score++;
}
}
}

params->results[count] = score;
count++;
}
}
catch(exception& e) {
params->m->errorOut(e, "Parsimony", "driver");
exit(1);
}
}
/**************************************************************************************************/

EstOutput Parsimony::createProcesses(Tree* t, vector< vector<string> > namesOfGroupCombos, CountTable* ct) {
try {
vector<linePair> lines;
int remainingPairs = namesOfGroupCombos.size();
if (remainingPairs < processors) { processors = remainingPairs; }
int startIndex = 0;
for (int remainingProcessors = processors; remainingProcessors > 0; remainingProcessors--) {
int numPairs = remainingPairs; //case for last processor
if (remainingProcessors != 1) { numPairs = ceil(remainingPairs / remainingProcessors); }
lines.push_back(linePair(startIndex, numPairs)); //startIndex, numPairs
startIndex = startIndex + numPairs;
remainingPairs = remainingPairs - numPairs;
}

//create array of worker threads
vector<thread*> workerThreads;
vector<parsData*> data;

//results = driver(t, namesOfGroupCombos, lines[0].start, lines[0].num, ct);

//force parent to wait until all the processes are done
for (int i=0;i<processIDS.size();i++) {
int temp = processIDS[i];
wait(&temp);
}

if (m->getControl_pressed()) { return results; }

//get data created by processes
for (int i=0;i<processIDS.size();i++) {
ifstream in;
string s = outputDir + toString(processIDS[i]) + ".pars.results.temp";
util.openInputFile(s, in);

//get scores
if (!in.eof()) {
int num;
in >> num; util.gobble(in);

if (m->getControl_pressed()) { break; }

double w;
for (int j = 0; j < num; j++) {
in >> w;
results.push_back(w);
}
util.gobble(in);
}
in.close();
util.mothurRemove(s);
}
#else
//fill in functions
vector<parsData*> pDataArray;
DWORD dwThreadIdArray[processors-1];
HANDLE hThreadArray[processors-1];
vector<CountTable*> cts;
vector<Tree*> trees;

//Create processor worker threads.
for( int i=1; i<processors; i++ ){
//Launch worker threads
for (int i = 0; i < processors-1; i++) {
CountTable* copyCount = new CountTable();
copyCount->copy(ct);
Tree* copyTree = new Tree(copyCount, Treenames);
copyTree->getCopy(t);

cts.push_back(copyCount);
trees.push_back(copyTree);
parsData* dataBundle = new parsData(lines[i+1].start, lines[i+1].end, namesOfGroupCombos, copyTree, copyCount);
data.push_back(dataBundle);

parsData* temppars = new parsData(m, lines[i].start, lines[i].end, namesOfGroupCombos, copyTree, copyCount);
pDataArray.push_back(temppars);
processIDS.push_back(i);
workerThreads.push_back(new thread(driverPars, dataBundle));
}

parsData* dataBundle = new parsData(lines[0].start, lines[0].end, namesOfGroupCombos, t, ct);
driverPars(dataBundle);
EstOutput results = dataBundle->results;
delete dataBundle;

for (int i = 0; i < processors-1; i++) {
workerThreads[i]->join();

hThreadArray[i-1] = CreateThread(NULL, 0, MyParsimonyThreadFunction, pDataArray[i-1], 0, &dwThreadIdArray[i-1]);
}

results = driver(t, namesOfGroupCombos, lines[0].start, lines[0].end, ct);

//Wait until all threads have terminated.
WaitForMultipleObjects(processors-1, hThreadArray, TRUE, INFINITE);

//Close all thread handles and free memory allocations.
for(int i=0; i < pDataArray.size(); i++){
for (int j = 0; j < pDataArray[i]->results.size(); j++) { results.push_back(pDataArray[i]->results[j]); }
delete cts[i];
delete trees[i];
CloseHandle(hThreadArray[i]);
delete pDataArray[i];
}
for (int j = 0; j < data[i]->results.size(); j++) { results.push_back(data[i]->results[j]); }

delete data[i]->t;
delete data[i]->ct;
delete data[i];
delete workerThreads[i];
}

#endif
return results;
}
catch(exception& e) {
Expand All @@ -259,64 +173,4 @@ EstOutput Parsimony::createProcesses(Tree* t, vector< vector<string> > namesOfGr
}
}
/**************************************************************************************************/
/**
 * Computes one score per group combination in the index range [start, start+num).
 *
 * For every combination the user's tree is copied into a scratch tree, the
 * pGroups of nodes getNumLeaves()..getNumNodes()-1 are rebuilt with
 * mergeUserGroups() for that combination, and the score is the number of those
 * nodes whose pGroups set is non-empty, whose children both have non-empty
 * pGroups, and which holds more groups than at least one of its children.
 *
 * @param t                   tree to score; copied into a scratch tree, never modified through this pointer here.
 * @param namesOfGroupCombos  group combinations; entry h is the list of group names for combination h.
 * @param start               index of the first combination to process.
 * @param num                 number of combinations to process.
 * @param ct                  count table used to construct the scratch tree.
 * @return vector of size num; element k holds the score for combination start+k.
 *         If the control-pressed flag is seen, returns early (see notes in body).
 */
EstOutput Parsimony::driver(Tree* t, vector< vector<string> > namesOfGroupCombos, int start, int num, CountTable* ct) {
    try {

        EstOutput results; results.resize(num);

        //scratch tree with automatic storage: it is released on every exit path,
        //including the early returns taken when the user cancels.
        Tree copyTree(ct, Treenames);
        int count = 0;

        for (int h = start; h < (start+num); h++) {

            if (m->getControl_pressed()) { return results; }

            int score = 0;

            //groups in this combo
            vector<string> groups = namesOfGroupCombos[h];

            //copy users tree so that you can redo pgroups
            copyTree.getCopy(t);

            //create pgroups that reflect the groups the user want to use
            for (int i = copyTree.getNumLeaves(); i < copyTree.getNumNodes(); i++) {
                copyTree.tree[i].pGroups = (copyTree.mergeUserGroups(i, groups));
            }

            for (int i = copyTree.getNumLeaves(); i < copyTree.getNumNodes(); i++) {

                //NOTE(review): this exit returns the member `data` while the outer-loop exit
                //returns `results`; kept as-is to preserve behaviour - confirm which is intended.
                if (m->getControl_pressed()) { return data; }

                int lc = copyTree.tree[i].getLChild();
                int rc = copyTree.tree[i].getRChild();

                int iSize = copyTree.tree[i].pGroups.size();
                int rcSize = copyTree.tree[rc].pGroups.size();
                int lcSize = copyTree.tree[lc].pGroups.size();

                //if iSize is 0 then that branch is to be ignored
                if (iSize == 0) { }
                //a child without groups is ignored as well
                else if ((rcSize == 0) || (lcSize == 0)) { }
                //if you have more groups than either of your kids then there's been a change.
                else if (iSize > rcSize || iSize > lcSize) {
                    score++;
                }
            }

            results[count] = score;
            count++;
        }

        return results;
    }
    catch(exception& e) {
        m->errorOut(e, "Parsimony", "driver");
        exit(1);
    }
}

/**************************************************************************************************/

Loading

0 comments on commit b111b6e

Please sign in to comment.