diff --git a/tools/CMakeLists.txt b/tools/CMakeLists.txt index acd8ffd3..b457b64b 100644 --- a/tools/CMakeLists.txt +++ b/tools/CMakeLists.txt @@ -7,7 +7,8 @@ include_directories(${CMAKE_SOURCE_DIR}/include) #------------------------------------------------------------------------------ # FLAGS for building #------------------------------------------------------------------------------ -set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-unused-result -std=c++11") + +set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-unused-result -std=c++17") find_package(MPI REQUIRED) if(MPI_FOUND) @@ -59,6 +60,10 @@ add_executable(recorder-summary recorder-summary.c) target_link_libraries(recorder-summary reader) add_dependencies(recorder-summary reader) +add_executable(recorder-filter recorder-filter.cpp) +target_link_libraries(recorder-filter reader recorder) +add_dependencies(recorder-filter reader recorder) + if(RECORDER_ENABLE_PARQUET) message("-- " "Configuring Parquet tool: TRUE") @@ -102,7 +107,7 @@ endif() # Add Target(s) to CMake Install #----------------------------------------------------------------------------- #set(targets reader recorder2text metaops_checker conflict_detector) -set(targets reader recorder2text recorder2timeline conflict-detector recorder-summary) +set(targets reader recorder2text recorder2timeline conflict-detector recorder-summary recorder-filter) foreach(target ${targets}) install( TARGETS diff --git a/tools/reader-cst-cfg.c b/tools/reader-cst-cfg.c index 3e992aa3..7fb53559 100644 --- a/tools/reader-cst-cfg.c +++ b/tools/reader-cst-cfg.c @@ -35,14 +35,14 @@ void reader_decode_cst_2_3(RecorderReader *reader, int rank, CST *cst) { cst->cs_list = malloc(cst->entries * sizeof(CallSignature)); for(int i = 0; i < cst->entries; i++) { - fread(&cst->cs_list[i].terminal_id, sizeof(int), 1, f); - fread(&cst->cs_list[i].key_len, sizeof(int), 1, f); - - cst->cs_list[i].key = malloc(cst->cs_list[i].key_len); - fread(cst->cs_list[i].key, 1, cst->cs_list[i].key_len, f); - - assert(cst->cs_list[i].terminal_id < cst->entries); - } + fread(&cst->cs_list[i].terminal_id, sizeof(int), 1, f); + fread(&cst->cs_list[i].key_len, sizeof(int), 1, f); + + cst->cs_list[i].key = malloc(cst->cs_list[i].key_len); + fread(cst->cs_list[i].key, 1, cst->cs_list[i].key_len, f); + + assert(cst->cs_list[i].terminal_id < cst->entries); + } fclose(f); } @@ -58,7 +58,7 @@ void reader_decode_cfg_2_3(RecorderReader *reader, int rank, CFG* cfg) { cfg->cfg_head = NULL; for(int i = 0; i < cfg->rules; i++) { RuleHash *rule = malloc(sizeof(RuleHash)); - + fread(&(rule->rule_id), sizeof(int), 1, f); fread(&(rule->symbols), sizeof(int), 1, f); @@ -75,18 +75,18 @@ void reader_decode_cst(int rank, void* buf, CST* cst) { memcpy(&cst->entries, buf, sizeof(int)); buf += sizeof(int); - // cst->cs_list will be stored in the terminal_id order. + // cst->cs_list will be stored in the terminal_id order. cst->cs_list = malloc(cst->entries * sizeof(CallSignature)); for(int i = 0; i < cst->entries; i++) { - int terminal_id; + int terminal_id; memcpy(&terminal_id, buf, sizeof(int)); buf += sizeof(int); - assert(terminal_id < cst->entries); + assert(terminal_id < cst->entries); - CallSignature* cs = &(cst->cs_list[terminal_id]); - cs->terminal_id = terminal_id; + CallSignature* cs = &(cst->cs_list[terminal_id]); + cs->terminal_id = terminal_id; memcpy(&cs->rank, buf, sizeof(int)); buf += sizeof(int); @@ -125,14 +125,14 @@ void reader_decode_cfg(int rank, void* buf, CFG* cfg) { } CST* reader_get_cst(RecorderReader* reader, int rank) { - CST* cst = reader->csts[rank]; + CST* cst = reader->csts[rank]; return cst; } CFG* reader_get_cfg(RecorderReader* reader, int rank) { CFG* cfg; if (reader->metadata.interprocess_compression) - cfg = reader->cfgs[reader->ug_ids[rank]]; + cfg = reader->cfgs[reader->ug_ids[rank]]; else cfg = reader->cfgs[rank]; return cfg; diff --git a/tools/reader.c b/tools/reader.c index de76b43a..7c65ad30 100644 --- a/tools/reader.c +++ b/tools/reader.c @@ -30,11 +30,12 @@ void* read_zlib(FILE* source) { size_t compressed_size, decompressed_size; fread(&compressed_size, sizeof(size_t), 1, source); fread(&decompressed_size, sizeof(size_t), 1, source); + //printf("read zlib compressed size: %ld, decompressed size: %ld\n", + // compressed_size, decompressed_size); + //fflush(stdout); void* compressed = malloc(compressed_size); void* decompressed = malloc(decompressed_size); void* p_decompressed = decompressed; - //printf("compressed size: %ld, decompressed size: %ld\n", - // compressed_size, decompressed_size); strm.avail_in = fread(compressed, 1, compressed_size, source); strm.next_in = compressed; @@ -127,7 +128,7 @@ void read_metadata(RecorderReader* reader) { // first 1024 bytes are reserved for metadata block // the rest of the file stores all supported functions fseek(fp, 0, SEEK_END); - long fsize = ftell(fp) - 1024; + long fsize = ftell(fp) - 1024; char buf[fsize]; fseek(fp, 1024, SEEK_SET); @@ -155,19 +156,19 @@ void read_metadata(RecorderReader* reader) { memcpy(reader->func_list[func_id], buf+start_pos, end_pos-start_pos); start_pos = end_pos+1; if((reader->mpi_start_idx==-1) && - (NULL!=strstr(reader->func_list[func_id], "MPI"))) + (NULL!=strstr(reader->func_list[func_id], "MPI"))) reader->mpi_start_idx = func_id; if((reader->hdf5_start_idx==-1) && - (NULL!=strstr(reader->func_list[func_id], "H5"))) + (NULL!=strstr(reader->func_list[func_id], "H5"))) reader->hdf5_start_idx = func_id; if((reader->pnetcdf_start_idx==-1) && - (NULL!=strstr(reader->func_list[func_id], "ncmpi"))) + (NULL!=strstr(reader->func_list[func_id], "ncmpi"))) reader->pnetcdf_start_idx = func_id; if((reader->netcdf_start_idx==-1) && - (NULL!=strstr(reader->func_list[func_id], "nc_"))) + (NULL!=strstr(reader->func_list[func_id], "nc_"))) reader->netcdf_start_idx = func_id; func_id++; @@ -193,39 +194,39 @@ void recorder_init_reader(const char* logs_dir, RecorderReader *reader) { read_metadata(reader); - int nprocs= reader->metadata.total_ranks; + int nprocs= reader->metadata.total_ranks; - reader->ug_ids = malloc(sizeof(int) * nprocs); + reader->ug_ids = malloc(sizeof(int) * nprocs); reader->ugs = malloc(sizeof(CFG*) * nprocs); - reader->csts = malloc(sizeof(CST*) * nprocs); - reader->cfgs = malloc(sizeof(CFG*) * nprocs); + reader->csts = malloc(sizeof(CST*) * nprocs); + reader->cfgs = malloc(sizeof(CFG*) * nprocs); - if(reader->metadata.interprocess_compression) { + if(reader->metadata.interprocess_compression) { // a single file for merged csts // and a single for unique cfgs void* buf_cst; void* buf_cfg; // Read and parse the cst file - char cst_fname[1096] = {0}; - sprintf(cst_fname, "%s/recorder.cst", reader->logs_dir); - FILE* cst_file = fopen(cst_fname, "rb"); + char cst_fname[1096] = {0}; + sprintf(cst_fname, "%s/recorder.cst", reader->logs_dir); + FILE* cst_file = fopen(cst_fname, "rb"); buf_cst = read_zlib(cst_file); reader->csts[0] = (CST*) malloc(sizeof(CST)); - reader_decode_cst(0, buf_cst, reader->csts[0]); + reader_decode_cst(0, buf_cst, reader->csts[0]); fclose(cst_file); free(buf_cst); - char ug_metadata_fname[1096] = {0}; - sprintf(ug_metadata_fname, "%s/ug.mt", reader->logs_dir); - FILE* f = fopen(ug_metadata_fname, "rb"); - fread(reader->ug_ids, sizeof(int), nprocs, f); - fread(&reader->num_ugs, sizeof(int), 1, f); - fclose(f); + char ug_metadata_fname[1096] = {0}; + sprintf(ug_metadata_fname, "%s/ug.mt", reader->logs_dir); + FILE* f = fopen(ug_metadata_fname, "rb"); + fread(reader->ug_ids, sizeof(int), nprocs, f); + fread(&reader->num_ugs, sizeof(int), 1, f); + fclose(f); - char cfg_fname[1096] = {0}; - sprintf(cfg_fname, "%s/ug.cfg", reader->logs_dir); - FILE* cfg_file = fopen(cfg_fname, "rb"); + char cfg_fname[1096] = {0}; + sprintf(cfg_fname, "%s/ug.cfg", reader->logs_dir); + FILE* cfg_file = fopen(cfg_fname, "rb"); for(int i = 0; i < reader->num_ugs; i++) { buf_cfg = read_zlib(cfg_file); reader->ugs[i] = (CFG*) malloc(sizeof(CFG)); @@ -238,8 +239,8 @@ void recorder_init_reader(const char* logs_dir, RecorderReader *reader) { reader->csts[rank] = reader->csts[0]; reader->cfgs[rank] = reader->ugs[reader->ug_ids[rank]]; } + } else { // interprocess_compression == false - } else { for(int rank = 0; rank < nprocs; rank++) { if (reader->trace_version_major == 2 && reader->trace_version_minor == 3) { reader->csts[rank] = (CST*) malloc(sizeof(CST)); @@ -254,7 +255,7 @@ void recorder_init_reader(const char* logs_dir, RecorderReader *reader) { free(buf_cst); fclose(cst_file); } - + if (reader->trace_version_major == 2 && reader->trace_version_minor == 3) { reader->cfgs[rank] = (CFG*) malloc(sizeof(CFG)); reader_decode_cfg_2_3(reader, rank, reader->cfgs[rank]); @@ -275,24 +276,24 @@ void recorder_init_reader(const char* logs_dir, RecorderReader *reader) { void recorder_free_reader(RecorderReader *reader) { assert(reader); - if(reader->metadata.interprocess_compression) { - reader_free_cst(reader->csts[0]); - free(reader->csts[0]); - for(int i = 0; i < reader->num_ugs; i++) { - reader_free_cfg(reader->ugs[i]); - free(reader->ugs[i]); - } - } else { - for(int rank = 0; rank < reader->metadata.total_ranks; rank++) { + if(reader->metadata.interprocess_compression) { + reader_free_cst(reader->csts[0]); + free(reader->csts[0]); + for(int i = 0; i < reader->num_ugs; i++) { + reader_free_cfg(reader->ugs[i]); + free(reader->ugs[i]); + } + } else { + for(int rank = 0; rank < reader->metadata.total_ranks; rank++) { reader_free_cst(reader->csts[rank]); reader_free_cfg(reader->cfgs[rank]); } } - free(reader->csts); - free(reader->cfgs); - free(reader->ugs); - free(reader->ug_ids); + free(reader->csts); + free(reader->cfgs); + free(reader->ugs); + free(reader->ug_ids); for(int i = 0; i < reader->supported_funcs; i++) free(reader->func_list[i]); free(reader->func_list); @@ -338,8 +339,7 @@ void recorder_free_record(Record* r) { #define TERMINAL_START_ID 0 void rule_application(RecorderReader* reader, CFG* cfg, CST* cst, int rule_id, uint32_t* ts_buf, - void (*user_op)(Record*, void*), void* user_arg, int free_record) { - + void (*user_op)(Record*, void*), void* user_arg, int free_record) { RuleHash *rule = NULL; HASH_FIND_INT(cfg->cfg_head, &rule_id, rule); assert(rule != NULL); @@ -350,10 +350,8 @@ void rule_application(RecorderReader* reader, CFG* cfg, CST* cst, int rule_id, u if (sym_val >= TERMINAL_START_ID) { // terminal for(int j = 0; j < sym_exp; j++) { - Record* record = reader_cs_to_record(&(cst->cs_list[sym_val])); - - // Fill in timestamps + // update timestamps uint32_t ts[2] = {ts_buf[0], ts_buf[1]}; ts_buf += 2; record->tstart = ts[0] * reader->metadata.time_resolution + reader->prev_tstart; @@ -361,7 +359,6 @@ void rule_application(RecorderReader* reader, CFG* cfg, CST* cst, int rule_id, u reader->prev_tstart = record->tstart; user_op(record, user_arg); - if(free_record) recorder_free_record(record); } @@ -425,10 +422,10 @@ uint32_t* read_timestamp_file(RecorderReader* reader, int rank) { void decode_records_core(RecorderReader *reader, int rank, - void (*user_op)(Record*, void*), void* user_arg, bool free_record) { + void (*user_op)(Record*, void*), void* user_arg, bool free_record) { - CST* cst = reader_get_cst(reader, rank); - CFG* cfg = reader_get_cfg(reader, rank); + CST* cst = reader_get_cst(reader, rank); + CFG* cfg = reader_get_cfg(reader, rank); reader->prev_tstart = 0.0; @@ -442,12 +439,12 @@ void decode_records_core(RecorderReader *reader, int rank, // Decode all records for one rank // one record at a time void recorder_decode_records(RecorderReader *reader, int rank, - void (*user_op)(Record*, void*), void* user_arg) { + void (*user_op)(Record*, void*), void* user_arg) { decode_records_core(reader, rank, user_op, user_arg, true); } void recorder_decode_records2(RecorderReader *reader, int rank, - void (*user_op)(Record*, void*), void* user_arg) { + void (*user_op)(Record*, void*), void* user_arg) { decode_records_core(reader, rank, user_op, user_arg, false); } @@ -569,7 +566,7 @@ int update_mpi_src_tag(VerifyIORecord* vir, Record* r, int src_idx, int tag_idx, int src = atoi(r->args[src_idx]); int tag = atoi(r->args[tag_idx]); char* status = r->args[status_idx]; - + if(src == RECORDER_MPI_ANY_SOURCE) { char* p = strstr(status, "_"); if (p == NULL) { @@ -633,11 +630,11 @@ int create_verifyio_record(RecorderReader* reader, Record* r, VerifyIORecord* vi } } else if (func_type == RECORDER_MPI) { if (strcmp(func_name, "MPI_Send") == 0 || - strcmp(func_name, "MPI_Ssend") == 0) { + strcmp(func_name, "MPI_Ssend") == 0) { // dst, tag, comm verifyio_record_copy_args(vir, r, 3, 3, 4, 5); } else if (strcmp(func_name, "MPI_Issend") == 0 || - strcmp(func_name, "MPI_Isend") == 0) { + strcmp(func_name, "MPI_Isend") == 0) { // dst, tag, comm, req verifyio_record_copy_args(vir, r, 4, 3, 4, 5, 6); } else if (strcmp(func_name, "MPI_Recv") == 0) { @@ -681,7 +678,7 @@ int create_verifyio_record(RecorderReader* reader, Record* r, VerifyIORecord* vi // comm verifyio_record_copy_args(vir, r, 1, 6); } else if (strcmp(func_name, "MPI_Allreduce") == 0 || - strcmp(func_name, "MPI_Reduce_scatter") == 0) { + strcmp(func_name, "MPI_Reduce_scatter") == 0) { // comm verifyio_record_copy_args(vir, r, 1, 5); } else if (strcmp(func_name, "MPI_Allgatherv") == 0) { @@ -700,7 +697,7 @@ int create_verifyio_record(RecorderReader* reader, Record* r, VerifyIORecord* vi // comm, local_rank verifyio_record_copy_args(vir, r, 2, 5, 6); } else if ((strcmp(func_name, "MPI_Cart_sub") == 0) || - (strcmp(func_name, "MPI_Comm_create") == 0)) { + (strcmp(func_name, "MPI_Comm_create") == 0)) { // comm, local_rank verifyio_record_copy_args(vir, r, 2, 2, 3); } else if ((strcmp(func_name, "MPI_Waitall")) == 0) { @@ -720,14 +717,14 @@ int create_verifyio_record(RecorderReader* reader, Record* r, VerifyIORecord* vi // only keep needed *write* *read* POSIX calls // additionally, MPI-IO often uses fcntl to lock files. if (strstr(func_name, "write") || - strstr(func_name, "read") || - strstr(func_name, "fcntl")) { + strstr(func_name, "read") || + strstr(func_name, "fcntl")) { verifyio_record_copy_args(vir, r, 1, 0); } else if (strstr(func_name, "fsync") || - strstr(func_name, "open") || - strstr(func_name, "close") || - strstr(func_name, "fopen") || - strstr(func_name, "fclose")) { + strstr(func_name, "open") || + strstr(func_name, "close") || + strstr(func_name, "fopen") || + strstr(func_name, "fclose")) { verifyio_record_copy_args(vir, r, 1, 0); //included = 0; } diff --git a/tools/recorder-filter.cpp b/tools/recorder-filter.cpp new file mode 100644 index 00000000..a5273d42 --- /dev/null +++ b/tools/recorder-filter.cpp @@ -0,0 +1,587 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +extern "C" { +#include "reader.h" +#include "recorder-sequitur.h" +} + +static char formatting_record[32]; +static char filtered_trace_dir[1024]; +static CallSignature* global_cst = NULL; + + +template +class Interval { +public: + KeyType lower; + KeyType upper; + + Interval(KeyType l, KeyType u) : lower(l), upper(u) {} +}; + +template +class IntervalTable { +public: + ValueType& operator[](const KeyType& key) { + auto it = std::lower_bound(data.begin(), data.end(), key, + [](const auto& lhs, const auto& rhs) { return lhs.first.upper <= rhs; }); + return it->second; + } + + void insert(const Interval& interval, const ValueType& value) { + data.push_back({interval, value}); + std::sort(data.begin(), data.end(), + [](const auto& lhs, const auto& rhs) { return lhs.first.lower < rhs.first.lower; }); + } + + + std::vector, ValueType>> data; +}; + +template +class MultiIndexIntervalTable { +public: + void insert(const std::string& index, const Interval& interval, const ValueType& value) { + indices[index].insert(interval, value); + } + + void insert(const std::string& index) { + // Ensure the index exists without adding intervals + if (indices.find(index) == indices.end()) { + indices[index] = IntervalTable(); + } + } + + ValueType& operator[](const std::pair& key) { + const std::string& index = key.first; + const KeyType& key_value = key.second; + return indices[index][key_value]; + } + + void printIntervals(const std::string& index) const { + const auto& table = indices.at(index); + for (const auto& pair : table.data) { + std::cout << "[" << pair.first.lower << ", " << pair.first.upper << ") : " << pair.second << "\n"; + } + } + + auto begin() { + return indices.begin(); + } + + auto end() { + return indices.end(); + } + + +private: + std::map> indices; // Map from index name to IntervalTable +}; + +template +class Filter { +public: + std::string func_name; + MultiIndexIntervalTable indices; + + Filter(const std::string& name) : func_name(name) {} + Filter(const std::string& name, const MultiIndexIntervalTable& miit) + : func_name(name), indices(miit) {} +}; + +template +class Filters { +private: + std::vector> filters; + +public: + void addFilter(const Filter& filter) { + filters.push_back(filter); + } +/* + void addFilter(const std::string& name, const MultiIndexIntervalTable& miit) { + filters.emplace_back(name, miit); + } +*/ + const Filter& getFilter(size_t index) const { + if (index < filters.size()) { + return filters[index]; + } else { + throw std::out_of_range("Index out of range"); + } + } + + size_t size() const { + return filters.size(); + } + + // in case for accessing the underlying vector + const std::vector>& getFilters() const { + return filters; + } + + auto begin() { + return filters.begin(); + } + + auto end() { + return filters.end(); + } + +}; + + +std::vector splitStringBySpace(const std::string& input) { + std::vector result; + std::istringstream stream(input); + std::string token; + while (std::getline(stream, token, ' ')) { + result.push_back(token); + } + return result; +} + +/** + * TODO: no need to read a text file and process this mannually. + * It would be easier just writing filters in a json file + * and use an existing library to read; it will automatically + * handles the strings, floats and arrays. + */ +std::pair splitIntoNumberAndRanges(const std::string& input) { + std::regex pattern(R"((\d+)\[(.*)\])"); // Matches format "[]" + std::smatch match; + if (std::regex_match(input, match, pattern)) { + return {match[1], match[2]}; // Return the number and range array + } + return {"", ""}; +} + + +template +IntervalTable parseRanges(const std::string& ranges) { + IntervalTable table; + std::regex range_pattern(R"((\d+):(\d+)-(\d+))"); // Matches format ":-" + auto it = std::sregex_iterator(ranges.begin(), ranges.end(), range_pattern); + auto end = std::sregex_iterator(); + + for (; it != end; ++it) { + KeyType lower = std::stoi((*it)[1]); + KeyType upper = std::stoi((*it)[2]); + ValueType value = std::stoi((*it)[3]); + table.insert(Interval(lower, upper), value); + } + + return table; +} + + +void read_filters(char* filter_path, Filters *filters){ + std::string fpath(filter_path); + std::ifstream ffile(fpath); + if (!ffile.is_open()) { + std::cerr << "Error: Unable to open file at " << fpath<< "\n"; + } + + std::string fline; + while (std::getline(ffile, fline)) { + if (fline.empty()) continue; // Skip empty lines + + std::vector substrings = splitStringBySpace(fline); + if (substrings.empty()) continue; // Skip lines with no content + + std::string func_name = substrings.at(0); + substrings.erase(substrings.begin()); + MultiIndexIntervalTable indices; + + for (const auto& substring : substrings) { + if (substring.find('[') != std::string::npos) { + auto [number, ranges] = splitIntoNumberAndRanges(substring); + if (!number.empty() && !ranges.empty()) { + IntervalTable table = parseRanges(ranges); + for (const auto& [interval, value] : table.data) { + indices.insert(number, interval, value); + } + } else { + std::cerr << "Warning: Invalid range format in substring '" << substring << "'\n"; + } + } else { + indices.insert(substring); + } + } + + filters->addFilter(Filter(func_name, indices)); + } + std::cout << "Successfully read filters.\n"; +} + +void apply_filter_to_record(Record* record, Record* new_record, RecorderReader *reader, Filters *filters){ + + // duplicate the original record and then + // make modifications to the new record + memcpy(new_record, record, sizeof(Record)); + + std::string func_name = recorder_get_func_name(reader, record); + for(auto &filter:*filters) { + if(filter.func_name == func_name) { + std::vector new_args; + + // TODO: should the filters include the same number of incides as the actual call? + for(auto it = filter.indices.begin(); it != filter.indices.end(); ++it) { + // for each index in the filter + int index = stoi(it->first); + auto &intervalTable = it->second; + + // Clustering + int arg_modified = 0; + for(auto &interval : intervalTable.data) { + if(atoi(record->args[index]) >= interval.first.lower && atoi(record->args[index]) < interval.first.upper) { + new_args.push_back(std::to_string(interval.second)); + arg_modified = 1; + break; + } + } + + if (!arg_modified) + new_args.push_back(record->args[index]); + } + + // Overwrite the orginal record with modified args + new_record->arg_count = new_args.size(); + new_record->args = (char**) malloc(sizeof(char*) * new_record->arg_count); + for(int i = 0; i < new_record->arg_count; i++) { + new_record->args[i] = strdup(new_args[i].c_str()); + } + } + } +} + +/** + * helper structure for passing arguments + * to the iterate_record() function + */ +typedef struct IterArg_t { + int rank; + RecorderReader* reader; + Grammar local_cfg; + Filters* filters; +} IterArg; + + +/** + * this function is directly copied from recorder/lib/recorder-cst-cfg.c + * to avoid adding dependency to the entire recorder library. + * TODO: think a better way to reuse this code + */ +char* serialize_cst(CallSignature *cst, size_t *len) { + *len = sizeof(int); + + CallSignature *entry, *tmp; + HASH_ITER(hh, cst, entry, tmp) { + *len = *len + entry->key_len + sizeof(int)*3 + sizeof(unsigned); + } + + int entries = HASH_COUNT(cst); + char *res = (char*) malloc(*len); + char *ptr = res; + + memcpy(ptr, &entries, sizeof(int)); + ptr += sizeof(int); + + HASH_ITER(hh, cst, entry, tmp) { + memcpy(ptr, &entry->terminal_id, sizeof(int)); + ptr = ptr + sizeof(int); + memcpy(ptr, &entry->rank, sizeof(int)); + ptr = ptr + sizeof(int); + memcpy(ptr, &entry->key_len, sizeof(int)); + ptr = ptr + sizeof(int); + memcpy(ptr, &entry->count, sizeof(unsigned)); + ptr = ptr + sizeof(unsigned); + memcpy(ptr, entry->key, entry->key_len); + ptr = ptr + entry->key_len; + } + + return res; +} + +/** + * this function is directly copied from recorder/lib/recorder-utils.c + * to avoid adding dependency to the entire recorder library. + * TODO: think a better way to reuse this code + */ +void recorder_write_zlib(unsigned char* buf, size_t buf_size, FILE* out_file) { + // Always write two size_t (compressed_size and decopmressed_size) + // before writting the the compressed data. + // This allows easier post-processing. + long off = ftell(out_file); + size_t compressed_size = 0; + size_t decompressed_size = buf_size; + fwrite(&compressed_size, sizeof(size_t), 1, out_file); + fwrite(&decompressed_size, sizeof(size_t), 1, out_file); + + int ret; + unsigned have; + z_stream strm; + + unsigned char out[buf_size]; + + /* allocate deflate state */ + strm.zalloc = Z_NULL; + strm.zfree = Z_NULL; + strm.opaque = Z_NULL; + ret = deflateInit(&strm, Z_DEFAULT_COMPRESSION); + // ret = deflateInit(&strm, Z_BEST_COMPRESSION); + if (ret != Z_OK) { + printf("[recorder-filter] fatal error: can't initialize zlib.\n"); + return; + } + + strm.avail_in = buf_size; + strm.next_in = buf; + /* run deflate() on input until output buffer not full, finish + compression if all of source has been read in */ + do { + strm.avail_out = buf_size; + strm.next_out = out; + ret = deflate(&strm, Z_FINISH); /* no bad return value */ + assert(ret != Z_STREAM_ERROR); /* state not clobbered */ + have = buf_size - strm.avail_out; + compressed_size += have; + if (fwrite(out, 1, have, out_file) != have) { + printf("[recorder-filter] fatal error: zlib write out error."); + (void)deflateEnd(&strm); + return; + } + } while (strm.avail_out == 0); + assert(strm.avail_in == 0); /* all input will be used */ + + /* clean up and return */ + (void)deflateEnd(&strm); + + fseek(out_file, off, SEEK_SET); + fwrite(&compressed_size, sizeof(size_t), 1, out_file); + fwrite(&decompressed_size, sizeof(size_t), 1, out_file); + fseek(out_file, compressed_size, SEEK_CUR); +} + +void save_updated_metadata(RecorderReader* reader) { + char old_metadata_filename[2048] = {0}; + char new_metadata_filename[2048] = {0}; + FILE* srcfh; + FILE* dstfh; + void* fhdata; + + sprintf(old_metadata_filename, "%s/recorder.mt", reader->logs_dir); + sprintf(new_metadata_filename, "%s/recorder.mt", filtered_trace_dir); + + srcfh = fopen(old_metadata_filename, "rb"); + dstfh = fopen(new_metadata_filename, "wb"); + + // first copy the entire old meatdata file to the new metadata file + size_t res = 0; + fseek(srcfh, 0, SEEK_END); + long metafh_size = ftell(srcfh); + fhdata = malloc(metafh_size); + fseek(srcfh, 0, SEEK_SET); + res = fread(fhdata , 1, metafh_size, srcfh); + if (ferror(srcfh)) { + perror("Error reading metadata file\n"); + exit(1); + } + res = fwrite(fhdata, 1, metafh_size, dstfh); + + // then update the inter-process compression flag. + int oldval = reader->metadata.interprocess_compression; + reader->metadata.interprocess_compression = 0; + fseek(dstfh, 0, SEEK_SET); + fwrite(&reader->metadata, sizeof(RecorderMetadata), 1, dstfh); + reader->metadata.interprocess_compression = oldval; + + fclose(srcfh); + fclose(dstfh); + free(fhdata); +} + +void save_filtered_trace(RecorderReader* reader, IterArg* iter_args) { + + size_t cst_data_len; + char* cst_data = serialize_cst(global_cst, &cst_data_len); + + for(int rank = 0; rank < reader->metadata.total_ranks; rank++) { + char filename[1024] = {0}; + sprintf(filename, "%s/%d.cfg", filtered_trace_dir, rank); + FILE* f = fopen(filename, "wb"); + int integers; + int* cfg_data = serialize_grammar(&(iter_args[rank].local_cfg), &integers); + recorder_write_zlib((unsigned char*)cfg_data, sizeof(int)*integers, f); + fclose(f); + free(cfg_data); + + // write out global cst, all ranks have the same copy + sprintf(filename, "%s/%d.cst", filtered_trace_dir, rank); + f = fopen(filename, "wb"); + recorder_write_zlib((unsigned char*)cst_data, cst_data_len, f); + fclose(f); + } + + free(cst_data); + + // Update metadata and write out + save_updated_metadata(reader); + + // Save timestamps + // for now, we simply copy the timestamp files from + // the original trace folder, if we want to cut some + // records, we also need to cut timestamps + char cmd[1024]; + sprintf(cmd, "cp %s/recorder.ts %s/recorder.ts", reader->logs_dir, filtered_trace_dir); + system(cmd); + + // Similarly, simply copy the version file from the + // original trace folder. + sprintf(cmd, "cp %s/VERSION %s/VERSION", reader->logs_dir, filtered_trace_dir); + system(cmd); +} + +/** + * This function addes one record to the CFG and CST + * the implementation is identical to that of the + * recorder-logger.c + */ +static int current_cfg_terminal = 0; +void grow_cst_cfg(Grammar* cfg, Record* record) { + int key_len; + char* key = compose_cs_key(record, &key_len); + + CallSignature *entry = NULL; + HASH_FIND(hh, global_cst, key, key_len, entry); + if(entry) { // Found + entry->count++; + free(key); + } else { // Not exist, add to hash table + entry = (CallSignature*) malloc(sizeof(CallSignature)); + entry->key = key; + entry->key_len = key_len; + entry->rank = 0; + entry->terminal_id = current_cfg_terminal++; + entry->count = 1; + HASH_ADD_KEYPTR(hh, global_cst, entry->key, entry->key_len, entry); + } + + append_terminal(cfg, entry->terminal_id, 1); +} + +/** + * This is a helper (debug) function that prints out + * the recorded function call + */ +static void print_record(Record* record, RecorderReader *reader) { + int decimal = log10(1 / reader->metadata.time_resolution); + sprintf(formatting_record, "%%.%df %%.%df %%s %%d %%d (", decimal, decimal); + + bool user_func = (record->func_id == RECORDER_USER_FUNCTION); + const char* func_name = recorder_get_func_name(reader, record); + + fprintf(stdout, formatting_record, record->tstart, record->tend, // record->tid + func_name, record->call_depth); + + for(int arg_id = 0; !user_func && arg_id < record->arg_count; arg_id++) { + char *arg = record->args[arg_id]; + fprintf(stdout, " %s", arg); + } + fprintf(stdout, " )\n"); +} + +/** + * Function that processes one record at a time + * The pointer of this function is passed + * to the recorder_decode_records() call. + * + * In this function: + * 1. we apply the filters + * 2. then build the cst and cfg + */ +void iterate_record(Record* record, void* arg) { + + IterArg *ia = (IterArg*) arg; + + // debug purpose; print out the original record + printf("old:"); + print_record(record, ia->reader); + + // apply fiter to the record + // then add it to the cst and cfg. + Record new_record; + apply_filter_to_record(record, &new_record, ia->reader, ia->filters); + + // debug purpose; print out the modified record + printf("new:"); + print_record(&new_record, ia->reader); + + grow_cst_cfg(&ia->local_cfg, &new_record); +} + + +int main(int argc, char** argv) { + + if (argc != 3) { + printf("usage: recorder-filter /path/to/trace-folder /path/to/filter-file\n"); + exit(1); + } + + char* trace_dir = argv[1]; + char* filter_path = argv[2]; + + Filters filters; + read_filters(filter_path, &filters); + + RecorderReader reader; + recorder_init_reader(trace_dir, &reader); + + // create a new folder to store the filtered trace files + sprintf(filtered_trace_dir, "%s/_filtered", reader.logs_dir); + if(access(filtered_trace_dir, F_OK) != -1) + rmdir(filtered_trace_dir); + mkdir(filtered_trace_dir, S_IRWXU|S_IRWXG|S_IROTH|S_IXOTH); + + // Prepare the arguments to pass to each rank + // when iterating local records + IterArg *iter_args = (IterArg*) malloc(sizeof(IterArg)); + for(int rank = 0; rank < reader.metadata.total_ranks; rank++) { + iter_args[rank].rank = rank; + iter_args[rank].reader = &reader; + iter_args[rank].filters = &filters; + // initialize local CFG + sequitur_init(&(iter_args[rank].local_cfg)); + } + + // Go through each rank's records + for(int rank = 0; rank < reader.metadata.total_ranks; rank++) { + // this call iterates through all records of one rank + // each record is processed by the iterate_record() function + recorder_decode_records(&reader, rank, iterate_record, &(iter_args[rank])); + } + + // At this point we should have built the global cst and each + // rank's local cfg. Now let's write them out. + save_filtered_trace(&reader, iter_args); + + // clean up everything + cleanup_cst(global_cst); + for(int rank = 0; rank < reader.metadata.total_ranks; rank++) { + sequitur_cleanup(&iter_args[rank].local_cfg); + } + recorder_free_reader(&reader); + +} +