Permalink
Browse files

Working on vsearch windows parallelization

  • Loading branch information...
1 parent 57f2df0 commit b358d11ba5799d709136dc407d754aa72e603aa4 @mothur-westcott mothur-westcott committed Jan 9, 2017
@@ -557,9 +557,6 @@ ChimeraUchimeCommand::ChimeraUchimeCommand(string option) {
//look for uchime exe
path = m->mothurProgramPath;
- //string tempPath = path;
- //for (int i = 0; i < path.length(); i++) { tempPath[i] = tolower(path[i]); }
- //path = path.substr(0, (tempPath.find_last_of('m')));
string uchimeCommand;
#if defined (__APPLE__) || (__MACH__) || (linux) || (__linux) || (__linux__) || (__unix__) || (__unix)
@@ -1661,7 +1658,8 @@ int ChimeraUchimeCommand::createProcesses(string outputFileName, string filename
//divide file
int count = 0;
int spot = 0;
-
+ files.resize(processors, "");
+
for (int i = 0; i < processors; i++) {
ofstream temp;
files[i] = filename+toString(i)+".temp";
@@ -1803,7 +1801,6 @@ int ChimeraUchimeCommand::createProcessesGroups(string outputFName, string filen
}
}
- m->mothurOut(toString( getpid() ) + " here\n");
//do my part
num = driverGroups(outputFName, filename, accnos, alns, accnos + ".byCount", lines[0].start, lines[0].end, groups);
@@ -510,11 +510,7 @@ ChimeraVsearchCommand::ChimeraVsearchCommand(string option) {
if (hasGroup && (templatefile != "self")) { m->mothurOut("You have provided a group file and the reference parameter is not set to self. I am not sure what reference you are trying to use, aborting."); m->mothurOutEndLine(); abort=true; }
//look for vsearch exe
- path = m->mothurProgramPath;
- //string tempPath = path;
- //for (int i = 0; i < path.length(); i++) { tempPath[i] = tolower(path[i]); }
- //path = path.substr(0, (tempPath.find_last_of('m')));
-
+ path = m->mothurProgramPath;
string vsearchCommand;
#if defined (__APPLE__) || (__MACH__) || (linux) || (__linux) || (__linux__) || (__unix__) || (__unix)
vsearchCommand = path + "vsearch"; // format the database, -o option gives us the ability
@@ -1322,8 +1318,8 @@ int ChimeraVsearchCommand::driver(string outputFName, string filename, string ac
char* tempmindiv = new char[9];
*tempmindiv = '\0'; strncat(tempmindiv, "--mindiffs", 10);
cPara.push_back(tempmindiv);
- char* tempMindiv = new char[mindiv.length()+1];
- *tempMindiv = '\0'; strncat(tempMindiv, mindiv.c_str(), mindiv.length());
+ char* tempMindiv = new char[mindiffs.length()+1];
+ *tempMindiv = '\0'; strncat(tempMindiv, mindiffs.c_str(), mindiffs.length());
cPara.push_back(tempMindiv);
}
@@ -1359,6 +1355,11 @@ int ChimeraVsearchCommand::driver(string outputFName, string filename, string ac
for (int i = 0; i < cPara.size(); i++) { vsearchParameters[i] = cPara[i]; commandString += toString(cPara[i]) + " "; }
//int numArgs = cPara.size();
+#if defined (__APPLE__) || (__MACH__) || (linux) || (__linux) || (__linux__) || (__unix__) || (__unix)
+#else
+ commandString = "\"" + commandString + "\"";
+#endif
+
if (m->debug) { m->mothurOut("[DEBUG]: vsearch command = " + commandString + ".\n"); }
//cout << "commandString = " << commandString << endl;
@@ -1503,6 +1504,72 @@ int ChimeraVsearchCommand::createProcesses(string outputFileName, string filenam
}
in.close(); m->mothurRemove(tempFile);
}
+#else
+ //divide file
+ int count = 0;
+ int spot = 0;
+ files.resize(processors, "");
+
+ for (int i = 0; i < processors; i++) {
+ ofstream temp;
+ files[i] = filename+toString(i)+".temp";
+ m->openOutputFile(filename+toString(i)+".temp", temp);
+ }
+
+ ifstream in;
+ m->openInputFile(filename, in);
+
+ while(!in.eof()) {
+ Sequence tempSeq(in); m->gobble(in);
+
+ if (tempSeq.getName() != "") {
+ ofstream temp;
+ m->openOutputFileAppend(files[spot], temp);
+ tempSeq.printSequence(temp); temp.close();
+ spot++; count++;
+ if (spot == processors) { spot = 0; }
+ }
+ }
+ in.close();
+
+ //sanity check for number of processors
+ if (count < processors) { processors = count; }
+
+ vector<vsearchData*> pDataArray;
+ DWORD dwThreadIdArray[processors-1];
+ HANDLE hThreadArray[processors-1];
+ vector<string> dummy; //used so that we can use the same struct for MyVsearchSeqsThreadFunction and MyVsearchThreadFunction
+
+ //Create processor worker threads.
+ for( int i=1; i<processors; i++ ){
+ // Allocate memory for thread data.
+ string extension = toString(i) + ".temp";
+
+ vsearchData* tempVsearch = new vsearchData(outputFileName+extension, vsearchLocation, templatefile, files[i], "", "", "", accnos+extension, alns+extension, "", dummy, m, 0, 0, i);
+ tempVsearch->setBooleans(dups, useAbskew, chimealns, useMinH, useMindiv, useXn, useDn, useMinDiffs, hasCount);
+ tempVsearch->setVariables(abskew, minh, mindiv, xn, dn, mindiffs);
+
+ pDataArray.push_back(tempVsearch);
+ processIDS.push_back(i);
+
+ hThreadArray[i-1] = CreateThread(NULL, 0, MyVsearchSeqsThreadFunction, pDataArray[i-1], 0, &dwThreadIdArray[i-1]);
+ }
+
+
+ //using the main process as a worker saves time and memory
+ num = driver(outputFileName, files[0], accnos, alns, numChimeras);
+
+ //Wait until all threads have terminated.
+ WaitForMultipleObjects(processors-1, hThreadArray, TRUE, INFINITE);
+
+ //Close all thread handles and free memory allocations.
+ for(int i=0; i < pDataArray.size(); i++){
+ num += pDataArray[i]->count;
+ numChimeras += pDataArray[i]->numChimeras;
+ CloseHandle(hThreadArray[i]);
+ delete pDataArray[i];
+ }
+
#endif
//append output files
for(int i=0;i<processIDS.size();i++){
@@ -1600,7 +1667,41 @@ int ChimeraVsearchCommand::createProcessesGroups(string outputFName, string file
if (!in.eof()) { int tempNum = 0; in >> tempNum; num += tempNum; }
in.close(); m->mothurRemove(tempFile);
}
+#else
+ vector<vsearchData*> pDataArray;
+ DWORD dwThreadIdArray[processors-1];
+ HANDLE hThreadArray[processors-1];
+ //Create processor worker threads.
+ for( int i=1; i<processors; i++ ){
+ // Allocate memory for thread data.
+ string extension = toString(i) + ".temp";
+
+ vsearchData* tempVsearch = new vsearchData(outputFName+extension, vsearchLocation, templatefile, filename+extension, fastaFile, nameFile, groupFile, accnos+extension, alns+extension, accnos+".byCount."+extension, groups, m, lines[i].start, lines[i].end, i);
+ tempVsearch->setBooleans(dups, useAbskew, chimealns, useMinH, useMindiv, useXn, useDn, useMinDiffs, hasCount);
+ tempVsearch->setVariables(abskew, minh, mindiv, xn, dn, mindiffs);
+
+ pDataArray.push_back(tempVsearch);
+ processIDS.push_back(i);
+
+ //MyVsearchThreadFunction is in header. It must be global or static to work with the threads.
+ //default security attributes, thread function name, argument to thread function, use default creation flags, returns the thread identifier
+ hThreadArray[i-1] = CreateThread(NULL, 0, MyVsearchThreadFunction, pDataArray[i-1], 0, &dwThreadIdArray[i-1]);
+ }
+
+
+ //using the main process as a worker saves time and memory
+ num = driverGroups(outputFName, filename, accnos, alns, accnos + ".byCount", lines[0].start, lines[0].end, groups);
+
+ //Wait until all threads have terminated.
+ WaitForMultipleObjects(processors-1, hThreadArray, TRUE, INFINITE);
+
+ //Close all thread handles and free memory allocations.
+ for(int i=0; i < pDataArray.size(); i++){
+ num += pDataArray[i]->count;
+ CloseHandle(hThreadArray[i]);
+ delete pDataArray[i];
+ }
#endif
//read my own
Oops, something went wrong.

0 comments on commit b358d11

Please sign in to comment.