Permalink
Browse files

createsubdb result compression state is now always the same as the input

  • Loading branch information...
milot-mirdita committed Dec 6, 2018
1 parent 25f3925 commit 10eb43471df3f54f4d12a91b7b8f04963d345918
Showing with 77 additions and 75 deletions.
  1. +16 −16 src/commons/DBReader.cpp
  2. +33 −38 src/commons/DBWriter.cpp
  3. +9 −2 src/commons/DBWriter.h
  4. +1 −1 src/mmseqs.cpp
  5. +18 −18 src/util/createsubdb.cpp
@@ -132,7 +132,7 @@ template <typename T> bool DBReader<T>::open(int accessType){
dstream = new ZSTD_DStream*[threads];
for(int i = 0; i < threads; i++){
// allocated buffer
compressedBufferSizes[i] = maxSeqLen+1;
compressedBufferSizes[i] = std::max(maxSeqLen+1, 1024u);
compressedBuffers[i] = (char*) malloc(compressedBufferSizes[i]);
if(compressedBuffers[i]==NULL){
Debug(Debug::ERROR) << "Could not allocate compressedBuffer!\n";
@@ -317,28 +317,27 @@ void DBReader<unsigned int>::sortIndex(bool isSortedById) {
}
}
template <typename T> char* DBReader<T>::mmapData(FILE * file, size_t *dataSize){
template <typename T> char* DBReader<T>::mmapData(FILE * file, size_t *dataSize) {
struct stat sb;
if (fstat(fileno(file), &sb) < 0)
{
if (fstat(fileno(file), &sb) < 0) {
int errsv = errno;
Debug(Debug::ERROR) << "Failed to fstat File=" << dataFileName << ". Error " << errsv << ".\n";
EXIT(EXIT_FAILURE);
}
*dataSize = sb.st_size;
int fd = fileno(file);
int mode;
char *ret;
if ((dataMode & USE_FREAD) == 0) {
if(dataMode & USE_WRITABLE) {
int mode;
if (dataMode & USE_WRITABLE) {
mode = PROT_READ | PROT_WRITE;
} else {
mode = PROT_READ;
}
ret = static_cast<char*>(mmap(NULL, *dataSize, mode, MAP_PRIVATE, fd, 0));
if(ret == MAP_FAILED){
if (ret == MAP_FAILED){
int errsv = errno;
Debug(Debug::ERROR) << "Failed to mmap memory dataSize=" << *dataSize <<" File=" << dataFileName << ". Error " << errsv << ".\n";
EXIT(EXIT_FAILURE);
@@ -398,22 +397,23 @@ template <typename T> size_t DBReader<T>::bsearch(const Index * index, size_t N,
return std::upper_bound(index, index + N, val, Index::compareById) - index;
}
template <typename T> char* DBReader<T>::getDataCompressed(size_t id, int thrIdx){
char * data = getDataUncompressed(id);
template <typename T> char* DBReader<T>::getDataCompressed(size_t id, int thrIdx) {
char *data = getDataUncompressed(id);
unsigned int cSize = *(reinterpret_cast<unsigned int*>(data));
unsigned int cSize = *(reinterpret_cast<unsigned int *>(data));
size_t totalSize = 0;
void* const cBuff = static_cast<void*>(data + sizeof(unsigned int));
ZSTD_inBuffer input = { cBuff, cSize, 0 };
const void *cBuff = static_cast<void *>(data + sizeof(unsigned int));
ZSTD_inBuffer input = {cBuff, cSize, 0};
while (input.pos < input.size) {
ZSTD_outBuffer output = { compressedBuffers[thrIdx], compressedBufferSizes[thrIdx], 0 };
size_t toRead = ZSTD_decompressStream(dstream[thrIdx], &output , &input); /* toRead : size of next compressed block */
ZSTD_outBuffer output = {compressedBuffers[thrIdx], compressedBufferSizes[thrIdx], 0};
// size of next compressed block
size_t toRead = ZSTD_decompressStream(dstream[thrIdx], &output, &input);
if (ZSTD_isError(toRead)) {
Debug(Debug::ERROR) << "ERROR: " << id << " ZSTD_decompressStream() error " << ZSTD_getErrorName(toRead) << "\n";
Debug(Debug::ERROR) << "ERROR: " << id << " ZSTD_decompressStream " << ZSTD_getErrorName(toRead) << "\n";
EXIT(EXIT_FAILURE);
}
totalSize+=output.pos;
totalSize += output.pos;
}
compressedBuffers[thrIdx][totalSize] = '\0';
return compressedBuffers[thrIdx];
@@ -199,6 +199,23 @@ void DBWriter::open(size_t bufferSize) {
closed = false;
}
void DBWriter::writeDbtypeFile(const char* path, int dbtype, bool isCompressed) {
std::string name = std::string(path) + ".dbtype";
FILE* file = fopen(name.c_str(), "wb");
if (file == NULL) {
Debug(Debug::ERROR) << "Could not open data file " << name << "!\n";
EXIT(EXIT_FAILURE);
}
dbtype = isCompressed ? dbtype | (1 << 31) : dbtype & ~(1 << 31);
size_t written = fwrite(&dbtype, sizeof(int), 1, file);
if (written != 1) {
Debug(Debug::ERROR) << "Could not write to data file " << name << "\n";
EXIT(EXIT_FAILURE);
}
fclose(file);
}
void DBWriter::close() {
// close all datafiles
for (unsigned int i = 0; i < threads; i++) {
@@ -212,24 +229,8 @@ void DBWriter::close() {
ZSTD_freeCStream(cstream[i]);
}
}
if((mode & Parameters::WRITER_COMPRESSED_MODE) != 0) {
dbtype |= (1 << 31);
}
// if (dbtype > -1){
std::string dataFile = dataFileName;
std::string dbTypeFile = (dataFile+".dbtype").c_str();
FILE * dbtypeDataFile = fopen(dbTypeFile.c_str(), "wb");
if (dbtypeDataFile == NULL) {
Debug(Debug::ERROR) << "Could not open data file " << dbTypeFile << "!\n";
EXIT(EXIT_FAILURE);
}
size_t written = fwrite(&dbtype, sizeof(int), 1, dbtypeDataFile);
if (written != 1) {
Debug(Debug::ERROR) << "Could not write to data file " << dbTypeFile << "\n";
EXIT(EXIT_FAILURE);
}
fclose(dbtypeDataFile);
// }
writeDbtypeFile(dataFileName, dbtype, (mode & Parameters::WRITER_COMPRESSED_MODE) != 0);
mergeResults(dataFileName, indexFileName,
(const char **) dataFileNames, (const char **) indexFileNames, threads, ((mode & Parameters::WRITER_LEXICOGRAPHIC_MODE) != 0));
@@ -307,7 +308,7 @@ size_t DBWriter::writeAdd(const char* data, size_t dataSize, unsigned int thrIdx
return totalWriten;
}
void DBWriter::writeEnd(unsigned int key, unsigned int thrIdx, bool addNullByte) {
void DBWriter::writeEnd(unsigned int key, unsigned int thrIdx, bool addNullByte, bool addIndexEntry) {
// close stream
if((mode & Parameters::WRITER_COMPRESSED_MODE) != 0) {
ZSTD_outBuffer output = {compressedBuffers[thrIdx], compressedBufferSizes[thrIdx], 0};
@@ -345,12 +346,12 @@ void DBWriter::writeEnd(unsigned int key, unsigned int thrIdx, bool addNullByte)
}
}
}
size_t totalWritten=0;
size_t totalWritten = 0;
// entries are always separated by a null byte
if(addNullByte == true){
if (addNullByte == true) {
char nullByte = '\0';
size_t written = fwrite(&nullByte, sizeof(char), 1, dataFiles[thrIdx]);
const size_t written = fwrite(&nullByte, sizeof(char), 1, dataFiles[thrIdx]);
if (written != 1) {
Debug(Debug::ERROR) << "Could not write to data file " << dataFileNames[thrIdx] << "\n";
EXIT(EXIT_FAILURE);
@@ -359,21 +360,15 @@ void DBWriter::writeEnd(unsigned int key, unsigned int thrIdx, bool addNullByte)
offsets[thrIdx] += 1;
}
size_t length = offsets[thrIdx] - starts[thrIdx];
// keep original size in index
if((mode & Parameters::WRITER_COMPRESSED_MODE) != 0) {
ZSTD_frameProgression progression = ZSTD_getFrameProgression(cstream[thrIdx]);
length = progression.consumed + totalWritten;
}
char buffer[1024];
size_t len = indexToBuffer(buffer, key, starts[thrIdx], length );
size_t written = fwrite(buffer, sizeof(char), len, indexFiles[thrIdx]);
if (written != len) {
Debug(Debug::ERROR) << "Could not write to data file " << indexFiles[thrIdx] << "\n";
EXIT(EXIT_FAILURE);
if (addIndexEntry == true) {
size_t length = offsets[thrIdx] - starts[thrIdx];
// keep original size in index
if ((mode & Parameters::WRITER_COMPRESSED_MODE) != 0) {
ZSTD_frameProgression progression = ZSTD_getFrameProgression(cstream[thrIdx]);
length = progression.consumed + totalWritten;
}
writeIndexEntry(key, starts[thrIdx], length, thrIdx);
}
}
void DBWriter::writeIndexEntry(unsigned int key, size_t offset, size_t length, unsigned int thrIdx){
@@ -387,10 +382,10 @@ void DBWriter::writeIndexEntry(unsigned int key, size_t offset, size_t length, u
}
void DBWriter::writeData(const char *data, size_t dataSize, unsigned int key, unsigned int thrIdx, bool addNullByte) {
void DBWriter::writeData(const char *data, size_t dataSize, unsigned int key, unsigned int thrIdx, bool addNullByte, bool addIndexEntry) {
writeStart(thrIdx);
writeAdd(data, dataSize, thrIdx);
writeEnd(key, thrIdx, addNullByte);
writeEnd(key, thrIdx, addNullByte, addIndexEntry);
}
size_t DBWriter::indexToBuffer(char *buff1, unsigned int key, size_t offsetStart, size_t len){
@@ -33,9 +33,9 @@ class DBWriter {
void writeStart(unsigned int thrIdx = 0);
size_t writeAdd(const char* data, size_t dataSize, unsigned int thrIdx = 0);
void writeEnd(unsigned int key, unsigned int thrIdx = 0, bool addNullByte = true);
void writeEnd(unsigned int key, unsigned int thrIdx = 0, bool addNullByte = true, bool addIndexEntry = true);
void writeData(const char *data, size_t dataSize, unsigned int key, unsigned int threadIdx = 0, bool addNullByte = true);
void writeData(const char *data, size_t dataSize, unsigned int key, unsigned int threadIdx = 0, bool addNullByte = true, bool addIndexEntry = true);
static size_t indexToBuffer(char *buff1, unsigned int key, size_t offsetStart, size_t len);
@@ -59,9 +59,16 @@ class DBWriter {
void writeIndexEntry(unsigned int key, size_t offset, size_t length, unsigned int thrIdx);
static void writeDbtypeFile(const char* path, int dbtype, bool isCompressed);
size_t getStart(unsigned int threadIdx){
return starts[threadIdx];
}
size_t getOffset(unsigned int threadIdx){
return offsets[threadIdx];
}
private:
template <typename T>
static void writeIndex(FILE *outFile, size_t indexSize, T *index, unsigned int *seqLen);
@@ -357,7 +357,7 @@ std::vector<struct Command> commands = {
"Clovis Galiez & Martin Steinegger <martin.steinegger@mpibpc.mpg.de>",
"<i:resultDB> <o:resultDB>",
CITATION_MMSEQS2},
{"createsubdb", createsubdb, &par.verbandcompression, COMMAND_DB,
{"createsubdb", createsubdb, &par.onlyverbosity, COMMAND_DB,
"Create a subset of a DB from a file of IDs of entries",
NULL,
"Milot Mirdita <milot@mirdita.de>",
@@ -20,35 +20,35 @@ int createsubdb(int argc, const char **argv, const Command& command) {
DBReader<unsigned int> reader(par.db2.c_str(), par.db2Index.c_str(), 1, DBReader<unsigned int>::USE_INDEX|DBReader<unsigned int>::USE_DATA);
reader.open(DBReader<unsigned int>::NOSORT);
int compression = par.PARAM_COMPRESSED.wasSet ? par.compressed : reader.isCompressed();
DBWriter writer(par.db3.c_str(), par.db3Index.c_str(), 1, compression, reader.getDbtype());
const bool isCompressed = reader.isCompressed();
DBWriter writer(par.db3.c_str(), par.db3Index.c_str(), 1, 0, reader.getDbtype() & ~(1 << 31));
writer.open();
Debug(Debug::INFO) << "Start writing to file " << par.db3 << "\n";
char * line = new char[65536];
char dbKey[255 + 1];
Debug(Debug::INFO) << "Start writing to database " << par.db3 << "\n";
char *line = (char*)malloc(1024);
size_t len = 0;
char dbKey[256];
while (getline(&line, &len, orderFile) != -1) {
Util::parseKey(line, dbKey);
const unsigned int key = Util::fast_atoi<unsigned int>(dbKey);
size_t id = reader.getId(key);
if(id >= UINT_MAX) {
const size_t id = reader.getId(key);
if (id >= UINT_MAX) {
Debug(Debug::WARNING) << "Key " << line << " not found in database\n";
continue;
}
const char* data = reader.getData(id, 0);
// discard null byte
size_t length = reader.getSeqLens(id) - 1;
writer.writeData(data, length, key);
}
if(FileUtil::fileExists((par.db2 + ".dbtype").c_str())){
FileUtil::copyFile((par.db2 + ".dbtype").c_str(), (par.db3 + ".dbtype").c_str());
char* data = reader.getDataUncompressed(id);
size_t originalLength = reader.getSeqLens(id);
size_t entryLength = std::max(originalLength, static_cast<size_t>(1)) - 1;
if (isCompressed) {
entryLength = *(reinterpret_cast<unsigned int*>(data)) + sizeof(unsigned int);
}
writer.writeData(data, entryLength, key, 0, true, false);
writer.writeIndexEntry(key, writer.getStart(0), originalLength, 0);
}
writer.close();
delete[] line;
DBWriter::writeDbtypeFile(par.db3.c_str(), reader.getDbtype(), isCompressed);
free(line);
reader.close();
fclose(orderFile);

0 comments on commit 10eb434

Please sign in to comment.