reduce slicedSearch runtime: sort hits during target splits merge

elileka committed Aug 13, 2019
1 parent edb3541 commit 174ff6de9c8552d6906979457e36433d77958154
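
In essence, the commit replaces a two-pass scheme (merge the split results into a temporary database, then re-read and sort it in a parallel pass) with a single pass that sorts each merged entry in memory right before it is written. A minimal sketch of that core idea in isolation, using hypothetical Hit/parseHits/mergeEntrySorted stand-ins for MMseqs2's hit_t, QueryMatcher::parsePrefilterHits, and QueryMatcher::prefilterHitToBuffer:

    #include <algorithm>
    #include <cstdio>
    #include <sstream>
    #include <string>
    #include <vector>

    // Toy stand-in for hit_t; real prefilter entries are tab-separated
    // "targetKey score diagonal" lines, one entry per query, '\0'-terminated.
    struct Hit {
        unsigned int key;
        int score;
        int diagonal;
    };

    // Parse one merged entry (newline-separated hit lines) into hits.
    static std::vector<Hit> parseHits(const std::string &entry) {
        std::vector<Hit> hits;
        std::istringstream in(entry);
        Hit h;
        while (in >> h.key >> h.score >> h.diagonal) {
            hits.push_back(h);
        }
        return hits;
    }

    // Concatenate the per-split hit lists for one query and sort the combined
    // list by score (descending, ties broken by key) before it is written out.
    static std::string mergeEntrySorted(const std::vector<std::string> &splitEntries) {
        std::string combined;
        for (size_t i = 0; i < splitEntries.size(); ++i) {
            combined += splitEntries[i];
        }
        std::vector<Hit> hits = parseHits(combined);
        std::sort(hits.begin(), hits.end(), [](const Hit &a, const Hit &b) {
            return (a.score == b.score) ? (a.key < b.key) : (a.score > b.score);
        });
        std::string out;
        char line[64];
        for (size_t i = 0; i < hits.size(); ++i) {
            int len = snprintf(line, sizeof(line), "%u\t%d\t%d\n",
                               hits[i].key, hits[i].score, hits[i].diagonal);
            out.append(line, len);
        }
        return out;
    }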
@@ -83,9 +83,9 @@ while [ "${FIRST_INDEX_LINE}" -le "${TOTAL_NUM_PROFILES}" ]; do
CURRENT_AVAIL_DISK_SPACE=$(($("$MMSEQS" diskspaceavail "${TMP_PATH}")/2))
# Compute the max number of profiles that can be processed
# based on the number of hits that saturate
NUM_PROFS_IN_STEP="$((CURRENT_AVAIL_DISK_SPACE/NUM_SEQS_THAT_SATURATE/90))"
NUM_PROFS_IN_STEP="$((CURRENT_AVAIL_DISK_SPACE/NUM_SEQS_THAT_SATURATE/25))"
else
NUM_PROFS_IN_STEP="$((AVAIL_DISK/NUM_SEQS_THAT_SATURATE/90))"
NUM_PROFS_IN_STEP="$((AVAIL_DISK/NUM_SEQS_THAT_SATURATE/25))"
fi

# no matter what, process at least one profile...
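
For scale, with the new constant: 50 GiB of free disk and 1,000,000 saturating hits per profile gives 50*2^30 / 1,000,000 / 25, about 2,147 profiles per step, roughly 3.6 times more than the previous divisor of 90 allowed. The divisor presumably acts as a rough per-hit disk-cost estimate in bytes; the figures here are illustrative only.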
@@ -751,7 +751,7 @@ bool DBReader<T>::readIndex(char *data, size_t indexDataSize, Index *index, unsi
maxSeqLen=0;
while (currPos < indexDataSize){
if (i >= this->size) {
Debug(Debug::ERROR) << "Corrupt memory, too many entries!\n";
Debug(Debug::ERROR) << "Corrupt memory, too many entries: " << i << " >= " << this->size << "\n";
EXIT(EXIT_FAILURE);
}
Util::getWordsOfLine(indexDataChar, cols, 3);
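
The loop above fills a preallocated Index array from a three-column text index; the fix makes the overflow diagnostic actionable by printing the offending count against the expected size. The same defensive pattern in a self-contained form (simplified; the real code parses with Util::getWordsOfLine rather than fscanf):

    #include <cinttypes>
    #include <cstdio>
    #include <cstdlib>

    struct IndexEntry { unsigned int id; size_t offset; size_t length; };

    // Fill a fixed-size array from "id offset length" lines, failing loudly
    // with the offending values instead of writing past the end.
    static void readIndex(FILE *in, IndexEntry *entries, size_t capacity) {
        size_t i = 0;
        unsigned int id;
        uint64_t offset, length;
        while (fscanf(in, "%u %" SCNu64 " %" SCNu64, &id, &offset, &length) == 3) {
            if (i >= capacity) {
                fprintf(stderr, "Corrupt index, too many entries: %zu >= %zu\n", i, capacity);
                exit(EXIT_FAILURE);
            }
            entries[i].id = id;
            entries[i].offset = (size_t) offset;
            entries[i].length = (size_t) length;
            i++;
        }
    }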
@@ -493,7 +493,7 @@ void DBWriter::mergeResults(const std::string &outFileName, const std::string &o
}

template <>
-void DBWriter::writeIndexEntryToFile(FILE *outFile, char *buff1, DBReader<unsigned int>::Index &index, unsigned int seqLen){
+void DBWriter::writeIndexEntryToFile(FILE *outFile, char *buff1, DBReader<unsigned int>::Index &index, unsigned int seqLen){
char * tmpBuff = Itoa::u32toa_sse2((uint32_t)index.id,buff1);
*(tmpBuff-1) = '\t';
size_t currOffset = index.offset;
@@ -525,7 +525,7 @@ void DBWriter::writeIndexEntryToFile(FILE *outFile, char *buff1, DBReader<std::s
}

template <>
-void DBWriter::writeIndex(FILE *outFile, size_t indexSize, DBReader<unsigned int>::Index *index, unsigned int *seqLen) {
+void DBWriter::writeIndex(FILE *outFile, size_t indexSize, DBReader<unsigned int>::Index *index, unsigned int *seqLen) {
char buff1[1024];
for (size_t id = 0; id < indexSize; id++) {
writeIndexEntryToFile(outFile, buff1, index[id], seqLen[id]);
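
For context: writeIndexEntryToFile builds each tab-separated index line with a fast integer writer. Judging from the code above, Itoa::u32toa_sse2 null-terminates its output and returns a pointer one past the terminator, which is why *(tmpBuff-1) = '\t' can swap that terminator for the column separator. A sketch of the idiom with a plain decimal writer standing in for the SSE2 one (both helpers here are assumptions for illustration):

    #include <cstddef>

    // Write u as decimal digits plus a trailing '\0'; return the pointer one
    // past the terminator, mirroring the contract the diff relies on.
    static char *u32toaPlain(unsigned int u, char *out) {
        char tmp[10];
        int n = 0;
        do { tmp[n++] = (char)('0' + u % 10); u /= 10; } while (u != 0);
        while (n > 0) { *out++ = tmp[--n]; }
        *out++ = '\0';
        return out;
    }

    // Build one "id\toffset\tlength\n" line by patching each terminator.
    static size_t formatIndexLine(char *buffer, unsigned int id,
                                  unsigned int offset, unsigned int length) {
        char *p = u32toaPlain(id, buffer);
        *(p - 1) = '\t';   // replace '\0' with the column separator
        p = u32toaPlain(offset, p);
        *(p - 1) = '\t';
        p = u32toaPlain(length, p);
        *(p - 1) = '\n';   // last terminator becomes the line break
        return (size_t)(p - buffer);
    }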
@@ -660,96 +660,6 @@ void DBWriter::sortIndex(const char *inFileNameIndex, const char *outFileNameInd
}
}


-void DBWriter::mergeFilePair(const std::vector<std::pair<std::string, std::string>> &fileNames) {
-    FILE ** files = new FILE*[fileNames.size()];
-    for (size_t i = 0; i < fileNames.size();i++) {
-        files[i] = FileUtil::openFileOrDie(fileNames[i].first.c_str(), "r", true);
-#if HAVE_POSIX_FADVISE
-        int status;
-        if ((status = posix_fadvise (fileno(files[i]), 0, 0, POSIX_FADV_SEQUENTIAL)) != 0){
-            Debug(Debug::ERROR) << "posix_fadvise returned an error: " << strerror(status) << "\n";
-        }
-#endif
-    }
-
-    int c1 = EOF;
-    char * buffer = dataFilesBuffer[0];
-    size_t writePos = 0;
-    int dataFilefd = fileno(dataFiles[0]);
-    do {
-        for (size_t i = 0; i < fileNames.size(); ++i) {
-            while ((c1 = getc_unlocked(files[i])) != EOF) {
-                if (c1 == '\0') {
-                    break;
-                }
-                buffer[writePos] = (char) c1;
-                writePos++;
-                if (writePos == bufferSize) {
-                    size_t written = write(dataFilefd, buffer, bufferSize);
-                    if (written != bufferSize) {
-                        Debug(Debug::ERROR) << "Can not write to data file " << dataFileNames[0] << "\n";
-                        EXIT(EXIT_FAILURE);
-                    }
-                    writePos = 0;
-                }
-            }
-        }
-        buffer[writePos] = '\0';
-        writePos++;
-        if (writePos == bufferSize) {
-            size_t written = write(dataFilefd, buffer, bufferSize);
-            if (written != bufferSize) {
-                Debug(Debug::ERROR) << "Can not write to data file " << dataFileNames[0] << "\n";
-                EXIT(EXIT_FAILURE);
-            }
-            writePos = 0;
-        }
-    } while (c1!=EOF);
-
-    if (writePos != 0) {
-        // if there is data in the buffer that is not yet written
-        size_t written = write(dataFilefd, (const void *) dataFilesBuffer[0], writePos);
-        if (written != writePos) {
-            Debug(Debug::ERROR) << "Can not write to data file " << dataFileNames[0] << "\n";
-            EXIT(EXIT_FAILURE);
-        }
-    }
-
-    for (size_t i = 0; i < fileNames.size(); ++i) {
-        fclose(files[i]);
-    }
-    delete[] files;
-
-    Debug(Debug::INFO) << "Will merge " << fileNames.size() << " files into " << fileNames[0].first << " and into " << fileNames[0].second << "\n";
-    DBReader<unsigned int> reader1(fileNames[0].first.c_str(), fileNames[0].second.c_str(), 1,
-                                   DBReader<unsigned int>::USE_INDEX);
-    reader1.open(DBReader<unsigned int>::NOSORT);
-    unsigned int *seqLen1 = reader1.getSeqLens();
-    DBReader<unsigned int>::Index *index1 = reader1.getIndex();
-
-    for (size_t i = 1; i < fileNames.size(); i++) {
-        Debug(Debug::INFO) << "Adding files " << fileNames[i].first << " and " << fileNames[i].second << " to the merge \n";
-        DBReader<unsigned int> reader2(fileNames[i].first.c_str(), fileNames[i].second.c_str(), 1,
-                                       DBReader<unsigned int>::USE_INDEX);
-        reader2.open(DBReader<unsigned int>::NOSORT);
-        unsigned int *seqLen2 = reader2.getSeqLens();
-        size_t currOffset = 0;
-
-        for (size_t id = 0; id < reader1.getSize(); id++) {
-            // add length for file1 and file2 and subtract -1 for one null byte
-            size_t seqLen = seqLen1[id] + seqLen2[id] - 1;
-            seqLen1[id] = seqLen;
-            index1[id].offset = currOffset;
-            currOffset += seqLen;
-        }
-        reader2.close();
-    }
-
-    writeIndex(indexFiles[0], reader1.getSize(), index1, seqLen1);
-    reader1.close();
-}

void DBWriter::writeThreadBuffer(unsigned int idx, size_t dataSize) {
size_t written = fwrite(threadBuffer[idx], 1, dataSize, dataFiles[idx]);
if (written != dataSize) {
@@ -49,8 +49,6 @@ class DBWriter {
const std::vector<std::pair<std::string, std::string>> &files,
bool lexicographicOrder = false);

-void mergeFilePair(const std::vector<std::pair<std::string, std::string>> &fileNames);
-
void writeIndexEntry(unsigned int key, size_t offset, size_t length, unsigned int thrIdx);

static void writeDbtypeFile(const char* path, int dbtype, bool isCompressed);
@@ -9,6 +9,8 @@
#include "Timer.h"
#include "ByteParser.h"
#include "Parameters.h"
+#include <fcntl.h>
+

namespace prefilter {
#include "ExpOpt3_8_polished.cs32.lib.h"
@@ -387,64 +389,120 @@ void Prefiltering::setupSplit(DBReader<unsigned int>& tdbr, const int alphabetSi
}
}

-void Prefiltering::mergeTargetSplits(const std::string &outDB, const std::string &outDBIndex,
-                                     const std::vector<std::pair<std::string, std::string>> &filenames) {
-    Timer timer;
-    if (filenames.size() < 2) {
-        DBReader<unsigned int>::moveDb(filenames[0].first, outDB);
+void Prefiltering::mergeTargetSplits(const std::string &outDB, const std::string &outDBIndex, const std::vector<std::pair<std::string, std::string>> &fileNames) {
+    if (fileNames.size() < 2) {
+        DBReader<unsigned int>::moveDb(fileNames[0].first, outDB);
        Debug(Debug::INFO) << "No merging needed.\n";
        return;
    }

-    const std::pair<std::string, std::string> tmpDb = Util::databaseNames((outDB + "_merged"));
-    DBWriter writer(tmpDb.first.c_str(), tmpDb.second.c_str(), 1, compressed, Parameters::DBTYPE_PREFILTER_RES);
-    writer.open(1024 * 1024 * 1024); // 1 GB buffer
-    writer.mergeFilePair(filenames);
-    writer.close();
-    for (size_t i = 0; i < filenames.size(); i++) {
-        // remove split
-        DBReader<unsigned int>::removeDb(filenames[i].first);
-    }
-    // sort merged entries by evalue
-    DBReader<unsigned int> dbr(tmpDb.first.c_str(), tmpDb.second.c_str(), threads, DBReader<unsigned int>::USE_INDEX|DBReader<unsigned int>::USE_DATA);
-    dbr.open(DBReader<unsigned int>::LINEAR_ACCCESS);
-    DBWriter dbw(outDB.c_str(), outDBIndex.c_str(), threads, compressed, Parameters::DBTYPE_PREFILTER_RES);
-    dbw.open(1024 * 1024 * 1024);
-#pragma omp parallel
-    {
-        int thread_idx = 0;
-#ifdef OPENMP
-        thread_idx = omp_get_thread_num();
-#endif
-
-        std::string result;
-        result.reserve(BUFFER_SIZE);
-        char buffer[100];
-        // dynamic schedule causes mis-order of the input files during merge. Needs to be fixed there
-#pragma omp for schedule(static)
-        for (size_t id = 0; id < dbr.getSize(); id++) {
-            unsigned int dbKey = dbr.getDbKey(id);
-            char *data = dbr.getData(id, thread_idx);
-            std::vector<hit_t> hits = QueryMatcher::parsePrefilterHits(data);
-            if (hits.size() > 1) {
-                std::sort(hits.begin(), hits.end(), hit_t::compareHitsByScoreAndId);
-            }
-            for(size_t hit_id = 0; hit_id < hits.size(); hit_id++){
-                int len = QueryMatcher::prefilterHitToBuffer(buffer, hits[hit_id]);
-                result.append(buffer, len);
+    // we assume that the entries appear in the same order in every split
+    Timer timer;
+    // merge the index files and find the largest entry
+    size_t largestEntrySize = 0;
+    Debug(Debug::INFO) << "Merging " << fileNames.size() << " target splits into " << outDB << "\n";
+    DBReader<unsigned int> reader1(fileNames[0].first.c_str(), fileNames[0].second.c_str(), 1, DBReader<unsigned int>::USE_INDEX);
+    reader1.open(DBReader<unsigned int>::NOSORT);
+    unsigned int *seqLen1 = reader1.getSeqLens();
+    DBReader<unsigned int>::Index *index1 = reader1.getIndex();
+
+    for (size_t i = 1; i < fileNames.size(); i++) {
+        DBReader<unsigned int> reader2(fileNames[i].first.c_str(), fileNames[i].second.c_str(), 1, DBReader<unsigned int>::USE_INDEX);
+        reader2.open(DBReader<unsigned int>::NOSORT);
+        unsigned int *seqLen2 = reader2.getSeqLens();
+        size_t currOffset = 0;
+
+        for (size_t id = 0; id < reader1.getSize(); id++) {
+            // add the lengths for file1 and file2 and subtract 1 for one of the two null bytes
+            size_t seqLen = seqLen1[id] + seqLen2[id] - 1;
+            seqLen1[id] = seqLen;
+            index1[id].offset = currOffset;
+            currOffset += seqLen;
+
+            // keep track of the longest entry
+            if (largestEntrySize < seqLen) {
+                largestEntrySize = seqLen;
            }
-            dbw.writeData(result.c_str(), result.size(), dbKey, thread_idx);
-            result.clear();
        }
+        reader2.close();
    }

-    // TODO: we close with "true" because multiple calls to mergeOutput call mergeFilePair that expects two file (merged input)
-    dbw.close(true);
-    dbr.close();
+    FILE *outDBIndexFILE = FileUtil::openAndDelete(outDBIndex.c_str(), "w");
+    DBWriter::writeIndex(outDBIndexFILE, reader1.getSize(), index1, seqLen1);
+    fclose(outDBIndexFILE);
+    reader1.close();
+
+    // merge the target split data files and sort the hits at the same time
+    FILE ** files = new FILE*[fileNames.size()];
+    for (size_t i = 0; i < fileNames.size(); i++) {
+        files[i] = FileUtil::openFileOrDie(fileNames[i].first.c_str(), "r", true);
+#if HAVE_POSIX_FADVISE
+        int status;
+        if ((status = posix_fadvise (fileno(files[i]), 0, 0, POSIX_FADV_SEQUENTIAL)) != 0){
+            Debug(Debug::ERROR) << "posix_fadvise returned an error: " << strerror(status) << "\n";
+        }
+#endif
+    }
+
+    int c1 = EOF;
+    char * hitsBuffer = new char[largestEntrySize + 1];
+    size_t writePos = 0;
+
+    std::string result;
+    result.reserve(largestEntrySize + 1);
+
+    FILE * outDBFILE = FileUtil::openAndDelete(outDB.c_str(), "w");
+    int dataFilefd = fileno(outDBFILE);
+    do {
+        // go over all files
+        for (size_t i = 0; i < fileNames.size(); ++i) {
+            // collect everything until the next entry
+            while ((c1 = getc_unlocked(files[i])) != EOF) {
+                if (c1 == '\0') {
+                    break;
+                }
+                hitsBuffer[writePos] = (char) c1;
+                writePos++;
+            }
+        }
+        // collected all hits, separated by \n; add a null byte to ensure correct parsing of hitsBuffer
+        hitsBuffer[writePos] = '\0';
+        writePos++;
+        // parse hitsBuffer as hits and sort them by score
+
+        std::vector<hit_t> hits = QueryMatcher::parsePrefilterHits(hitsBuffer);
+        if (hits.size() > 1) {
+            std::sort(hits.begin(), hits.end(), hit_t::compareHitsByScoreAndId);
+        }
+        for (size_t hit_id = 0; hit_id < hits.size(); hit_id++) {
+            int len = QueryMatcher::prefilterHitToBuffer(hitsBuffer, hits[hit_id]);
+            result.append(hitsBuffer, len);
+        }
+        result.append(1, '\0');
+
-    DBReader<unsigned int>::removeDb(tmpDb.first);
+        // write sorted hits
+        size_t written = write(dataFilefd, result.c_str(), result.size());
+        if (written != result.size()) {
+            Debug(Debug::ERROR) << "Cannot write to data file " << outDB << "\n";
+            EXIT(EXIT_FAILURE);
+        }
+        result.clear();
+        // reset the buffer position
+        writePos = 0;
+    } while (c1 != EOF);
+    fclose(outDBFILE);
+    delete[] hitsBuffer;

+    // write output dbtype
+    DBWriter::writeDbtypeFile(outDB.c_str(), Parameters::DBTYPE_PREFILTER_RES, compressed);
+
+    for (size_t i = 0; i < fileNames.size(); ++i) {
+        fclose(files[i]);
+        DBReader<unsigned int>::removeDb(fileNames[i].first);
+    }
+    delete[] files;

-    Debug(Debug::INFO) << "\nTime for merging into " << outDB << " by mergeTargetSplits: " << timer.lap() << "\n";
+    Debug(Debug::INFO) << "\nTime for merging target splits: " << timer.lap() << "\n";
}
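
The do/while merge above works because every split stores the entries for the same queries in the same order, each terminated by a null byte, so the next record can be pulled from every stream in turn and concatenated. The access pattern in isolation (a sketch; the stream layout and the consume callback are assumptions for illustration):

    #include <cstdio>
    #include <string>
    #include <vector>

    // Interleave '\0'-delimited records from several streams: take the next
    // record from every stream, concatenate, and hand the result to consume().
    // Assumes all streams hold the same number of records in the same order.
    static void interleaveRecords(std::vector<FILE *> &streams,
                                  void (*consume)(const std::string &)) {
        int c = EOF;
        std::string record;
        do {
            record.clear();
            for (size_t i = 0; i < streams.size(); ++i) {
                while ((c = getc_unlocked(streams[i])) != EOF && c != '\0') {
                    record.push_back((char) c);
                }
            }
            if (!record.empty()) {
                consume(record);
            }
        } while (c != EOF);
    }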


@@ -125,8 +125,7 @@ class Prefiltering {
void printStatistics(const statistics_t &stats, std::list<int> **reslens,
unsigned int resLensSize, size_t empty, size_t maxResults);

-void mergeTargetSplits(const std::string &outDb, const std::string &outDBIndex,
-const std::vector<std::pair<std::string, std::string>> &filenames);
+void mergeTargetSplits(const std::string &outDB, const std::string &outDBIndex, const std::vector<std::pair<std::string, std::string>> &fileNames);

bool isSameQTDB();
