From a7ca340414d2eea2ff7752547755aad727fc4d68 Mon Sep 17 00:00:00 2001 From: Zhaobin Zhu Date: Mon, 25 Nov 2024 00:31:14 -0800 Subject: [PATCH 01/17] add cfg-exporter to cmake --- tools/CMakeLists.txt | 6 +++++- tools/cfg-exporter.cpp | 9 +++++++++ 2 files changed, 14 insertions(+), 1 deletion(-) create mode 100644 tools/cfg-exporter.cpp diff --git a/tools/CMakeLists.txt b/tools/CMakeLists.txt index acd8ffd..16ef7d2 100644 --- a/tools/CMakeLists.txt +++ b/tools/CMakeLists.txt @@ -59,6 +59,10 @@ add_executable(recorder-summary recorder-summary.c) target_link_libraries(recorder-summary reader) add_dependencies(recorder-summary reader) +add_executable(cfg-exporter cfg-exporter.cpp) +target_link_libraries(cfg-exporter reader) +add_dependencies(cfg-exporter reader) + if(RECORDER_ENABLE_PARQUET) message("-- " "Configuring Parquet tool: TRUE") @@ -102,7 +106,7 @@ endif() # Add Target(s) to CMake Install #----------------------------------------------------------------------------- #set(targets reader recorder2text metaops_checker conflict_detector) -set(targets reader recorder2text recorder2timeline conflict-detector recorder-summary) +set(targets reader recorder2text recorder2timeline conflict-detector recorder-summary cfg-exporter) foreach(target ${targets}) install( TARGETS diff --git a/tools/cfg-exporter.cpp b/tools/cfg-exporter.cpp new file mode 100644 index 0000000..37b806f --- /dev/null +++ b/tools/cfg-exporter.cpp @@ -0,0 +1,9 @@ +// +// Created by zhu22 on 11/25/24. +// + + + +int main(int argc, char* argv[]) { + return 0; +} \ No newline at end of file From e3b4a861c255635f05d10bfe7a79d15fe21f5a57 Mon Sep 17 00:00:00 2001 From: Zhaobin Zhu Date: Mon, 25 Nov 2024 10:03:49 -0800 Subject: [PATCH 02/17] add read filters --- tools/CMakeLists.txt | 3 +- tools/cfg-exporter.cpp | 183 ++++++++++++++++++++++++++++++++++++++++- tools/filters.txt | 4 + 3 files changed, 185 insertions(+), 5 deletions(-) create mode 100644 tools/filters.txt diff --git a/tools/CMakeLists.txt b/tools/CMakeLists.txt index 16ef7d2..f3199e1 100644 --- a/tools/CMakeLists.txt +++ b/tools/CMakeLists.txt @@ -7,7 +7,8 @@ include_directories(${CMAKE_SOURCE_DIR}/include) #------------------------------------------------------------------------------ # FLAGS for building #------------------------------------------------------------------------------ -set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-unused-result -std=c++11") + +set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-unused-result -std=c++17") find_package(MPI REQUIRED) if(MPI_FOUND) diff --git a/tools/cfg-exporter.cpp b/tools/cfg-exporter.cpp index 37b806f..6a43910 100644 --- a/tools/cfg-exporter.cpp +++ b/tools/cfg-exporter.cpp @@ -1,9 +1,184 @@ -// -// Created by zhu22 on 11/25/24. -// +#include +#include +#include +#include +#include +#include +#include +#include +template +class Interval { +public: + KeyType lower; + KeyType upper; -int main(int argc, char* argv[]) { + Interval(KeyType l, KeyType u) : lower(l), upper(u) {} +}; + +template +class IntervalTable { +public: + ValueType& operator[](const KeyType& key) { + auto it = std::lower_bound(data.begin(), data.end(), key, + [](const auto& lhs, const auto& rhs) { return lhs.first.upper <= rhs; }); + return it->second; + } + + void insert(const Interval& interval, const ValueType& value) { + data.push_back({interval, value}); + std::sort(data.begin(), data.end(), + [](const auto& lhs, const auto& rhs) { return lhs.first.lower < rhs.first.lower; }); + } + + + std::vector, ValueType>> data; +}; + +template +class MultiIndexIntervalTable { +public: + void insert(const std::string& index, const Interval& interval, const ValueType& value) { + indices[index].insert(interval, value); + } + + void insert(const std::string& index) { + // Ensure the index exists without adding intervals + if (indices.find(index) == indices.end()) { + indices[index] = IntervalTable(); + } + } + + ValueType& operator[](const std::pair& key) { + const std::string& index = key.first; + const KeyType& key_value = key.second; + return indices[index][key_value]; + } + + void printIntervals(const std::string& index) const { + const auto& table = indices.at(index); + for (const auto& pair : table.data) { + std::cout << "[" << pair.first.lower << ", " << pair.first.upper << ") : " << pair.second << "\n"; + } + } + +private: + std::map> indices; // Map from index name to IntervalTable +}; + +template +class Filter{ +public: + std::string func_name; + MultiIndexIntervalTable indices; + + Filter(const std::string& name) : func_name(name) {} + Filter(const std::string& name, const MultiIndexIntervalTable& miit) + : func_name(name), indices(miit) {} +}; + + +std::vector splitStringBySpace(const std::string& input) { + std::vector result; + std::istringstream stream(input); + std::string token; + + while (std::getline(stream, token, ' ')) { + result.push_back(token); + } + + return result; +} + +std::pair splitIntoNumberAndRanges(const std::string& input) { + std::regex pattern(R"((\d+)\[(.*)\])"); // Matches format "[]" + std::smatch match; + if (std::regex_match(input, match, pattern)) { + return {match[1], match[2]}; // Return the number and range array + } + return {"", ""}; +} + + +template +IntervalTable parseRanges(const std::string& ranges) { + IntervalTable table; + std::regex range_pattern(R"((\d+):(\d+)-(\d+))"); // Matches format ":-" + auto it = std::sregex_iterator(ranges.begin(), ranges.end(), range_pattern); + auto end = std::sregex_iterator(); + + for (; it != end; ++it) { + KeyType lower = std::stoi((*it)[1]); + KeyType upper = std::stoi((*it)[2]); + ValueType value = std::stoi((*it)[3]); + table.insert(Interval(lower, upper), value); + } + + return table; +} + +int read_filter(std::string &fpath) { + std::ifstream ffile(fpath); + if (!ffile.is_open()) { + std::cerr << "Error: Unable to open file at " << fpath << "\n"; + return -1; + } + + std::string fline; + std::vector> filters; + + while (std::getline(ffile, fline)) { + if (fline.empty()) continue; // Skip empty lines + + std::vector substrings = splitStringBySpace(fline); + if (substrings.empty()) continue; // Skip lines with no content + + std::string func_name = substrings.at(0); + substrings.erase(substrings.begin()); + MultiIndexIntervalTable indices; + + for (const auto& substring : substrings) { + if (substring.find('[') != std::string::npos) { + auto [number, ranges] = splitIntoNumberAndRanges(substring); + if (!number.empty() && !ranges.empty()) { + IntervalTable table = parseRanges(ranges); + for (const auto& [interval, value] : table.data) { + indices.insert(number, interval, value); + } + } else { + std::cerr << "Warning: Invalid range format in substring '" << substring << "'\n"; + } + } else { + indices.insert(substring); + } + } + + filters.emplace_back(func_name, indices); + } + + std::cout << "Successfully processed filters.\n"; return 0; +} + +int main(int argc, char* argv[]) { + // std::vector> *filters = nullptr; + MultiIndexIntervalTable multiIndexTable; + std::string fpath = "/g/g90/zhu22/repos/Recorder-CFG/tools/filters.txt"; + int i = read_filter(fpath); + +/* + multiIndexTable.insert("0", {1, 5}, "2"); + multiIndexTable.insert("0", {5, 10}, "7"); + multiIndexTable.insert("0", {10, 15}, "12"); + + // Insert data into index "B" + multiIndexTable.insert("1", {1, 3}, "2"); + multiIndexTable.insert("1", {3, 7}, "4"); + multiIndexTable.insert("1", {7, 10}, "9"); + + std::cout << "Index A contents:\n"; + multiIndexTable.printIntervals("0"); + std::cout << "Index B contents:\n"; + multiIndexTable.printIntervals("1");*/ } \ No newline at end of file diff --git a/tools/filters.txt b/tools/filters.txt new file mode 100644 index 0000000..4848404 --- /dev/null +++ b/tools/filters.txt @@ -0,0 +1,4 @@ +functionname1 0 1 2[100:500-200,501:1000-400,501:1000-400,501:1000-400] 3[100:500-300,501:1000-700] 4[100:500-200,501:1000-400] +functionname2 0 1 2 3 4 +functionname3 0 1[100:500-100,501:1000-1000] 2 3[100:500-100,501:1000-1000] 4 +functionname4 0 1[100:500-140,501:1000-200] 2 3 \ No newline at end of file From 595ec579f572a7cab766997b51574a9819552211 Mon Sep 17 00:00:00 2001 From: Zhaobin Zhu Date: Tue, 26 Nov 2024 05:26:36 -0800 Subject: [PATCH 03/17] tested with funcname --- tools/cfg-exporter.cpp | 28 +++++++++++++++++++--------- tools/filters.txt | 21 +++++++++++++++++---- 2 files changed, 36 insertions(+), 13 deletions(-) diff --git a/tools/cfg-exporter.cpp b/tools/cfg-exporter.cpp index 6a43910..ce460af 100644 --- a/tools/cfg-exporter.cpp +++ b/tools/cfg-exporter.cpp @@ -6,6 +6,9 @@ #include #include #include +#include "reader.h" + +RecorderReader reader; template @@ -118,16 +121,14 @@ IntervalTable parseRanges(const std::string& ranges) { return table; } -int read_filter(std::string &fpath) { +std::vector>* read_filter(std::string &fpath, std::vector> *filters){ std::ifstream ffile(fpath); if (!ffile.is_open()) { std::cerr << "Error: Unable to open file at " << fpath << "\n"; - return -1; + return nullptr; } std::string fline; - std::vector> filters; - while (std::getline(ffile, fline)) { if (fline.empty()) continue; // Skip empty lines @@ -154,20 +155,29 @@ int read_filter(std::string &fpath) { } } - filters.emplace_back(func_name, indices); + filters->emplace_back(func_name, indices); } std::cout << "Successfully processed filters.\n"; - return 0; + return filters; } int main(int argc, char* argv[]) { - // std::vector> *filters = nullptr; - MultiIndexIntervalTable multiIndexTable; + std::vector> filters; + std::string fpath = "/g/g90/zhu22/repos/Recorder-CFG/tools/filters.txt"; - int i = read_filter(fpath); + read_filter(fpath, &filters); + + std::string rpath = "/g/g90/zhu22/iopattern/recorder-20241017/121628.414-ruby20-zhu22-ior-2599384"; + recorder_init_reader(rpath.c_str(), &reader); + for(int rank = 0; rank < reader.metadata.total_ranks; rank++) { + std::cout << "Rank: " << rank << std::endl; + printf("\r[Recorder] rank %d finished\n", rank); + } + /* + MultiIndexIntervalTable multiIndexTable; multiIndexTable.insert("0", {1, 5}, "2"); multiIndexTable.insert("0", {5, 10}, "7"); multiIndexTable.insert("0", {10, 15}, "12"); diff --git a/tools/filters.txt b/tools/filters.txt index 4848404..d623efb 100644 --- a/tools/filters.txt +++ b/tools/filters.txt @@ -1,4 +1,17 @@ -functionname1 0 1 2[100:500-200,501:1000-400,501:1000-400,501:1000-400] 3[100:500-300,501:1000-700] 4[100:500-200,501:1000-400] -functionname2 0 1 2 3 4 -functionname3 0 1[100:500-100,501:1000-1000] 2 3[100:500-100,501:1000-1000] 4 -functionname4 0 1[100:500-140,501:1000-200] 2 3 \ No newline at end of file +close 0 +fclose 0 +open64 0 1 +open 0 1 +fopen64 0 1 +fopen 0 1 +pread64 0 1 2[100:500-100,501:1000-1000] 3 +pread 0 1 2[100:500-100,501:1000-1000] 3 +pwrite64 0 1 2[100:500-100,501:1000-1000] 3 +pwrite 0 1 2[100:500-100,501:1000-1000] 3 +fread 0 1 2 3 +fwrite 0 1 2 3 +read 0 1 2[100:500-200,501:1000-400,501:1000-400,501:1000-400] +write 0 1 2[100:500-200,501:1000-400,501:1000-400,501:1000-400] + + + From fb7ae2b6a33ef012c888bea0c4e0e9a7d07fcf2b Mon Sep 17 00:00:00 2001 From: Zhaobin Zhu Date: Fri, 29 Nov 2024 06:11:32 -0800 Subject: [PATCH 04/17] add apply filter --- tools/cfg-exporter.cpp | 173 ++++++++++++++++++++++++++++++++++++++++- tools/filters.txt | 2 +- 2 files changed, 172 insertions(+), 3 deletions(-) diff --git a/tools/cfg-exporter.cpp b/tools/cfg-exporter.cpp index ce460af..b91a925 100644 --- a/tools/cfg-exporter.cpp +++ b/tools/cfg-exporter.cpp @@ -66,6 +66,15 @@ class MultiIndexIntervalTable { } } + auto begin() { + return indices.begin(); + } + + auto end() { + return indices.end(); + } + + private: std::map> indices; // Map from index name to IntervalTable }; @@ -162,17 +171,177 @@ std::vector>* read_filter(std::string &fpath, std::vectorcsts[rank]; + return cst; +} + +CFG* reader_get_cfg(RecorderReader* reader, int rank) { + CFG* cfg; + if (reader->metadata.interprocess_compression) + cfg = reader->cfgs[reader->ug_ids[rank]]; + else + cfg = reader->cfgs[rank]; + return cfg; +} + + +Record* reader_cs_to_record(CallSignature *cs) { + + Record *record = static_cast(malloc(sizeof(Record))); + + char* key = static_cast(cs->key); + + int pos = 0; + memcpy(&record->tid, key+pos, sizeof(pthread_t)); + pos += sizeof(pthread_t); + memcpy(&record->func_id, key+pos, sizeof(record->func_id)); + pos += sizeof(record->func_id); + memcpy(&record->call_depth, key+pos, sizeof(record->call_depth)); + pos += sizeof(record->call_depth); + memcpy(&record->arg_count, key+pos, sizeof(record->arg_count)); + pos += sizeof(record->arg_count); + + record->args = static_cast(malloc(sizeof(char *) * record->arg_count)); + + int arg_strlen; + memcpy(&arg_strlen, key+pos, sizeof(int)); + pos += sizeof(int); + + char* arg_str = key+pos; + int ai = 0; + int start = 0; + for(int i = 0; i < arg_strlen; i++) { + if(arg_str[i] == ' ') { + record->args[ai++] = strndup(arg_str+start, (i-start)); + start = i + 1; + } + } + + assert(ai == record->arg_count); + return record; +} + +std::string charPointerArrayToString(char** charArray, int size) { + std::string result; + for (int i = 0; i < size; ++i) { + if (charArray[i] != nullptr) { + result += std::string(charArray[i]); + if (i < size - 1) { + result += " "; + } + } + } + return result; +} + +std::vector charPointerPointerArrayToList(char** charArray, int size) { + std::vector args_list; + for(int i= 0; i< size; i++){ + if (charArray[i] != nullptr){ + args_list.emplace_back(charArray[i]); + } + } + return args_list; +} + + + + + +#define TERMINAL_START_ID 0 + +void printRules(RuleHash *rule) { + for (int i = 0; i < rule->symbols; ++i) { + if (rule->rule_body[2 * i + 0] >= TERMINAL_START_ID) { + std::cout << "Rule" << rule->rule_id << " : " << rule->rule_body[2 * i + 0] << "^" + << rule->rule_body[2 * i + 1] << std::endl; + } + } + +} + +void applyFilter(Record* record, RecorderReader *reader, std::vector> *filters){ + std::string func_name = recorder_get_func_name(reader, record); + std::vector args = charPointerPointerArrayToList(record->args, record->arg_count); + for(auto &filter:*filters){ + if(filter.func_name == func_name){ + std::vector args_list; + int arg_cnt = 0; + for(auto it = filter.indices.begin(); it != filter.indices.end(); ++it){ + // for each index in the filter + int index = stoi(it->first); + auto &intervalTable = it->second; + if (intervalTable.data.empty()){ + // no intervals defined for this arg + args_list.push_back(args[index]); + }else{ + for( auto &interval : intervalTable.data){ + //go through the intervals and check if the record->args[i] is in any of the defined intervals + if(std::stoi(args[index]) >= interval.first.lower && std::stoi(args[index]) < interval.first.upper){ + args_list.push_back(std::to_string(interval.second)); + } + } + } + arg_cnt++; + } + if(arg_cnt == args_list.size()){ + record->args = static_cast(malloc(sizeof (char*) * args_list.size())); + for(int i=0; iargs[i] = strdup(args_list[i].c_str()); + } + record->arg_count = args_list.size(); + } + } + } + +} + +void rule_application(RecorderReader* reader, CFG* cfg, CST* cst, int rule_id, int free_record, std::vector> *filters) { + RuleHash *rule = NULL; + HASH_FIND_INT(cfg->cfg_head, &rule_id, rule); + assert(rule != NULL); + //printRules(rule); + for(int i = 0; i < rule->symbols; i++) { + int sym_val = rule->rule_body[2*i+0]; + int sym_exp = rule->rule_body[2*i+1]; + std::cout << sym_val << "^"<= TERMINAL_START_ID) { // terminal + for(int j = 0; j < sym_exp; j++) { + Record* record = reader_cs_to_record(&(cst->cs_list[sym_val])); + applyFilter(record, reader, filters); +/* std::string func_name = recorder_get_func_name(reader, record); + std::string args = charPointerArrayToString(record->args, record->arg_count); + + std::cout << func_name << " "<tid: " << record->func_id << std::endl; + */ + if(free_record) + recorder_free_record(record); + } + } else { // non-terminal (i.e., rule) + for(int j = 0; j < sym_exp; j++) + rule_application(reader, cfg, cst, sym_val, free_record, filters); + } + } +} + + int main(int argc, char* argv[]) { std::vector> filters; std::string fpath = "/g/g90/zhu22/repos/Recorder-CFG/tools/filters.txt"; read_filter(fpath, &filters); - std::string rpath = "/g/g90/zhu22/iopattern/recorder-20241017/121628.414-ruby20-zhu22-ior-2599384"; + std::string rpath = "/g/g90/zhu22/iopattern/recorder-20241007/170016.899-ruby22-zhu22-ior-1614057/"; recorder_init_reader(rpath.c_str(), &reader); for(int rank = 0; rank < reader.metadata.total_ranks; rank++) { std::cout << "Rank: " << rank << std::endl; - printf("\r[Recorder] rank %d finished\n", rank); + CST* cst = reader_get_cst(&reader, rank); + CFG* cfg = reader_get_cfg(&reader, rank); + rule_application(&reader, cfg, cst, -1, 1, &filters); + } diff --git a/tools/filters.txt b/tools/filters.txt index d623efb..9b8dad4 100644 --- a/tools/filters.txt +++ b/tools/filters.txt @@ -1,6 +1,6 @@ close 0 fclose 0 -open64 0 1 +open64 1 open 0 1 fopen64 0 1 fopen 0 1 From 44d612f2bb454eb834838b9bcbdefdcbff91cc67 Mon Sep 17 00:00:00 2001 From: Zhaobin Zhu Date: Fri, 29 Nov 2024 11:44:25 -0800 Subject: [PATCH 05/17] bugfix --- tools/cfg-exporter.cpp | 37 +++++++++++++++++++++++++++++++++---- tools/filters.txt | 2 +- 2 files changed, 34 insertions(+), 5 deletions(-) diff --git a/tools/cfg-exporter.cpp b/tools/cfg-exporter.cpp index b91a925..c3a5929 100644 --- a/tools/cfg-exporter.cpp +++ b/tools/cfg-exporter.cpp @@ -262,7 +262,7 @@ void printRules(RuleHash *rule) { } -void applyFilter(Record* record, RecorderReader *reader, std::vector> *filters){ +void applyFilterToRecord(Record* record, RecorderReader *reader, std::vector> *filters){ std::string func_name = recorder_get_func_name(reader, record); std::vector args = charPointerPointerArrayToList(record->args, record->arg_count); for(auto &filter:*filters){ @@ -287,17 +287,45 @@ void applyFilter(Record* record, RecorderReader *reader, std::vectorargs = static_cast(malloc(sizeof (char*) * args_list.size())); + record->args = static_cast(malloc(sizeof(char*) * args_list.size())); for(int i=0; iargs[i] = strdup(args_list[i].c_str()); } record->arg_count = args_list.size(); + for(int i=0; i< record->arg_count; i++){ + std::cout << record->args[i] << std::endl; + } } } } } +/* +void rewriteRecord(Record *record) { + int key_len; + char* key = compose_cs_key(record, &key_len); + + CallSignature *entry = NULL; + HASH_FIND(hh, logger.cst, key, key_len, entry); + if(entry) { // Found + entry->count++; + recorder_free(key, key_len); + } else { // Not exist, add to hash table + entry = (CallSignature*) recorder_malloc(sizeof(CallSignature)); + entry->key = key; + entry->key_len = key_len; + entry->rank = logger.rank; + entry->terminal_id = logger.current_cfg_terminal++; + entry->count = 1; + HASH_ADD_KEYPTR(hh, logger.cst, entry->key, entry->key_len, entry); + } + + append_terminal(&logger.cfg, entry->terminal_id, 1); + logger.num_records++; +} +*/ + void rule_application(RecorderReader* reader, CFG* cfg, CST* cst, int rule_id, int free_record, std::vector> *filters) { RuleHash *rule = NULL; HASH_FIND_INT(cfg->cfg_head, &rule_id, rule); @@ -310,7 +338,8 @@ void rule_application(RecorderReader* reader, CFG* cfg, CST* cst, int rule_id, i if (sym_val >= TERMINAL_START_ID) { // terminal for(int j = 0; j < sym_exp; j++) { Record* record = reader_cs_to_record(&(cst->cs_list[sym_val])); - applyFilter(record, reader, filters); + applyFilterToRecord(record, reader, filters); + /* std::string func_name = recorder_get_func_name(reader, record); std::string args = charPointerArrayToString(record->args, record->arg_count); @@ -360,4 +389,4 @@ int main(int argc, char* argv[]) { multiIndexTable.printIntervals("0"); std::cout << "Index B contents:\n"; multiIndexTable.printIntervals("1");*/ -} \ No newline at end of file +} diff --git a/tools/filters.txt b/tools/filters.txt index 9b8dad4..3688c62 100644 --- a/tools/filters.txt +++ b/tools/filters.txt @@ -10,7 +10,7 @@ pwrite64 0 1 2[100:500-100,501:1000-1000] 3 pwrite 0 1 2[100:500-100,501:1000-1000] 3 fread 0 1 2 3 fwrite 0 1 2 3 -read 0 1 2[100:500-200,501:1000-400,501:1000-400,501:1000-400] +read 0 1 2[501:1000-400] write 0 1 2[100:500-200,501:1000-400,501:1000-400,501:1000-400] From 97dd9280f6fbbf884ee803cf84442380e4827713 Mon Sep 17 00:00:00 2001 From: Chen Wang Date: Wed, 11 Dec 2024 10:33:23 -0800 Subject: [PATCH 06/17] Do not include the filter files to the git repo Signed-off-by: Chen Wang --- tools/filters.txt | 17 ----------------- 1 file changed, 17 deletions(-) delete mode 100644 tools/filters.txt diff --git a/tools/filters.txt b/tools/filters.txt deleted file mode 100644 index 3688c62..0000000 --- a/tools/filters.txt +++ /dev/null @@ -1,17 +0,0 @@ -close 0 -fclose 0 -open64 1 -open 0 1 -fopen64 0 1 -fopen 0 1 -pread64 0 1 2[100:500-100,501:1000-1000] 3 -pread 0 1 2[100:500-100,501:1000-1000] 3 -pwrite64 0 1 2[100:500-100,501:1000-1000] 3 -pwrite 0 1 2[100:500-100,501:1000-1000] 3 -fread 0 1 2 3 -fwrite 0 1 2 3 -read 0 1 2[501:1000-400] -write 0 1 2[100:500-200,501:1000-400,501:1000-400,501:1000-400] - - - From 83f691b666ea1659aef574443d5cfba25a6d1344 Mon Sep 17 00:00:00 2001 From: Chen Wang Date: Wed, 11 Dec 2024 10:33:53 -0800 Subject: [PATCH 07/17] Rename cfg-export to recorder-filter and add recorder.so as dependency Signed-off-by: Chen Wang --- tools/CMakeLists.txt | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tools/CMakeLists.txt b/tools/CMakeLists.txt index f3199e1..281bc1b 100644 --- a/tools/CMakeLists.txt +++ b/tools/CMakeLists.txt @@ -60,9 +60,9 @@ add_executable(recorder-summary recorder-summary.c) target_link_libraries(recorder-summary reader) add_dependencies(recorder-summary reader) -add_executable(cfg-exporter cfg-exporter.cpp) -target_link_libraries(cfg-exporter reader) -add_dependencies(cfg-exporter reader) +add_executable(recorder-filter recorder-filter.cpp) +target_link_libraries(recorder-filter reader recorder) +add_dependencies(recorder-filter reader recorder) if(RECORDER_ENABLE_PARQUET) From 7016a2e0ebaa970381f449d9416ca4faddb24fa9 Mon Sep 17 00:00:00 2001 From: Chen Wang Date: Wed, 11 Dec 2024 11:04:01 -0800 Subject: [PATCH 08/17] Finished the skeleton Signed-off-by: Chen Wang --- tools/CMakeLists.txt | 2 +- .../{cfg-exporter.cpp => recorder-filter.cpp} | 267 ++++++++---------- 2 files changed, 115 insertions(+), 154 deletions(-) rename tools/{cfg-exporter.cpp => recorder-filter.cpp} (59%) diff --git a/tools/CMakeLists.txt b/tools/CMakeLists.txt index 281bc1b..b457b64 100644 --- a/tools/CMakeLists.txt +++ b/tools/CMakeLists.txt @@ -107,7 +107,7 @@ endif() # Add Target(s) to CMake Install #----------------------------------------------------------------------------- #set(targets reader recorder2text metaops_checker conflict_detector) -set(targets reader recorder2text recorder2timeline conflict-detector recorder-summary cfg-exporter) +set(targets reader recorder2text recorder2timeline conflict-detector recorder-summary recorder-filter) foreach(target ${targets}) install( TARGETS diff --git a/tools/cfg-exporter.cpp b/tools/recorder-filter.cpp similarity index 59% rename from tools/cfg-exporter.cpp rename to tools/recorder-filter.cpp index c3a5929..e11fa3d 100644 --- a/tools/cfg-exporter.cpp +++ b/tools/recorder-filter.cpp @@ -6,9 +6,13 @@ #include #include #include +#include +extern "C" { #include "reader.h" +#include "recorder-sequitur.h" +} -RecorderReader reader; +static char formatting_record[32]; template @@ -91,18 +95,23 @@ class Filter{ }; + std::vector splitStringBySpace(const std::string& input) { std::vector result; std::istringstream stream(input); std::string token; - while (std::getline(stream, token, ' ')) { result.push_back(token); } - return result; } +/** + * TODO: no need to read a text file and process this mannually. + * It would be easier just writing filters in a json file + * and use an existing library to read; it will automatically + * handles the strings, floats and arrays. + */ std::pair splitIntoNumberAndRanges(const std::string& input) { std::regex pattern(R"((\d+)\[(.*)\])"); // Matches format "[]" std::smatch match; @@ -130,7 +139,11 @@ IntervalTable parseRanges(const std::string& ranges) { return table; } -std::vector>* read_filter(std::string &fpath, std::vector> *filters){ +/** + * TODO: the second argument is in/out argument + * then why do you need to return it? + */ +std::vector>* read_filters(std::string &fpath, std::vector> *filters){ std::ifstream ffile(fpath); if (!ffile.is_open()) { std::cerr << "Error: Unable to open file at " << fpath << "\n"; @@ -167,75 +180,10 @@ std::vector>* read_filter(std::string &fpath, std::vectoremplace_back(func_name, indices); } - std::cout << "Successfully processed filters.\n"; + std::cout << "Successfully read filters.\n"; return filters; } - -CST* reader_get_cst(RecorderReader* reader, int rank) { - CST* cst = reader->csts[rank]; - return cst; -} - -CFG* reader_get_cfg(RecorderReader* reader, int rank) { - CFG* cfg; - if (reader->metadata.interprocess_compression) - cfg = reader->cfgs[reader->ug_ids[rank]]; - else - cfg = reader->cfgs[rank]; - return cfg; -} - - -Record* reader_cs_to_record(CallSignature *cs) { - - Record *record = static_cast(malloc(sizeof(Record))); - - char* key = static_cast(cs->key); - - int pos = 0; - memcpy(&record->tid, key+pos, sizeof(pthread_t)); - pos += sizeof(pthread_t); - memcpy(&record->func_id, key+pos, sizeof(record->func_id)); - pos += sizeof(record->func_id); - memcpy(&record->call_depth, key+pos, sizeof(record->call_depth)); - pos += sizeof(record->call_depth); - memcpy(&record->arg_count, key+pos, sizeof(record->arg_count)); - pos += sizeof(record->arg_count); - - record->args = static_cast(malloc(sizeof(char *) * record->arg_count)); - - int arg_strlen; - memcpy(&arg_strlen, key+pos, sizeof(int)); - pos += sizeof(int); - - char* arg_str = key+pos; - int ai = 0; - int start = 0; - for(int i = 0; i < arg_strlen; i++) { - if(arg_str[i] == ' ') { - record->args[ai++] = strndup(arg_str+start, (i-start)); - start = i + 1; - } - } - - assert(ai == record->arg_count); - return record; -} - -std::string charPointerArrayToString(char** charArray, int size) { - std::string result; - for (int i = 0; i < size; ++i) { - if (charArray[i] != nullptr) { - result += std::string(charArray[i]); - if (i < size - 1) { - result += " "; - } - } - } - return result; -} - std::vector charPointerPointerArrayToList(char** charArray, int size) { std::vector args_list; for(int i= 0; i< size; i++){ @@ -246,23 +194,7 @@ std::vector charPointerPointerArrayToList(char** charArray, int siz return args_list; } - - - - -#define TERMINAL_START_ID 0 - -void printRules(RuleHash *rule) { - for (int i = 0; i < rule->symbols; ++i) { - if (rule->rule_body[2 * i + 0] >= TERMINAL_START_ID) { - std::cout << "Rule" << rule->rule_id << " : " << rule->rule_body[2 * i + 0] << "^" - << rule->rule_body[2 * i + 1] << std::endl; - } - } - -} - -void applyFilterToRecord(Record* record, RecorderReader *reader, std::vector> *filters){ +void apply_filter_to_record(Record* record, RecorderReader *reader, std::vector> *filters){ std::string func_name = recorder_get_func_name(reader, record); std::vector args = charPointerPointerArrayToList(record->args, record->arg_count); for(auto &filter:*filters){ @@ -276,7 +208,7 @@ void applyFilterToRecord(Record* record, RecorderReader *reader, std::vectorargs[i] is in any of the defined intervals if(std::stoi(args[index]) >= interval.first.lower && std::stoi(args[index]) < interval.first.upper){ @@ -298,95 +230,124 @@ void applyFilterToRecord(Record* record, RecorderReader *reader, std::vector>* filters; +} IterArg; + + +/** + * This function addes one record to the CFG and CST + * the implementation is identical to that of the + * recorder-logger.c + */ +static int current_cfg_terminal = 0; +void grow_cst_cfg(Grammar* cfg, CallSignature* cst, Record* record) { int key_len; char* key = compose_cs_key(record, &key_len); CallSignature *entry = NULL; - HASH_FIND(hh, logger.cst, key, key_len, entry); + HASH_FIND(hh, cst, key, key_len, entry); if(entry) { // Found entry->count++; - recorder_free(key, key_len); + free(key); } else { // Not exist, add to hash table - entry = (CallSignature*) recorder_malloc(sizeof(CallSignature)); + entry = (CallSignature*) malloc(sizeof(CallSignature)); entry->key = key; entry->key_len = key_len; - entry->rank = logger.rank; - entry->terminal_id = logger.current_cfg_terminal++; + entry->rank = 0; + entry->terminal_id = current_cfg_terminal++; entry->count = 1; - HASH_ADD_KEYPTR(hh, logger.cst, entry->key, entry->key_len, entry); + HASH_ADD_KEYPTR(hh, cst, entry->key, entry->key_len, entry); } - append_terminal(&logger.cfg, entry->terminal_id, 1); - logger.num_records++; + append_terminal(cfg, entry->terminal_id, 1); } -*/ - -void rule_application(RecorderReader* reader, CFG* cfg, CST* cst, int rule_id, int free_record, std::vector> *filters) { - RuleHash *rule = NULL; - HASH_FIND_INT(cfg->cfg_head, &rule_id, rule); - assert(rule != NULL); - //printRules(rule); - for(int i = 0; i < rule->symbols; i++) { - int sym_val = rule->rule_body[2*i+0]; - int sym_exp = rule->rule_body[2*i+1]; - std::cout << sym_val << "^"<= TERMINAL_START_ID) { // terminal - for(int j = 0; j < sym_exp; j++) { - Record* record = reader_cs_to_record(&(cst->cs_list[sym_val])); - applyFilterToRecord(record, reader, filters); - -/* std::string func_name = recorder_get_func_name(reader, record); - std::string args = charPointerArrayToString(record->args, record->arg_count); - - std::cout << func_name << " "<tid: " << record->func_id << std::endl; - */ - if(free_record) - recorder_free_record(record); - } - } else { // non-terminal (i.e., rule) - for(int j = 0; j < sym_exp; j++) - rule_application(reader, cfg, cst, sym_val, free_record, filters); - } + +/** + * Function that processes one record at a time + * The pointer of this function needs to be + * passed to the recorder_decode_records() call. + * + * in this function: + * 1. we apply the filters + * 2. then build the cst and cfg + */ +void iterate_record(Record* record, void* arg) { + + IterArg *ia = (IterArg*) arg; + + bool user_func = (record->func_id == RECORDER_USER_FUNCTION); + + const char* func_name = recorder_get_func_name(ia->reader, record); + + fprintf(stdout, formatting_record, record->tstart, record->tend, // record->tid + func_name, record->call_depth, recorder_get_func_type(ia->reader, record)); + + for(int arg_id = 0; !user_func && arg_id < record->arg_count; arg_id++) { + char *arg = record->args[arg_id]; + fprintf(stdout, " %s", arg); } + fprintf(stdout, " )\n"); + + // apply fiter to the record + // then add it to the cst and cfg. + apply_filter_to_record(record, ia->reader, ia->filters); + grow_cst_cfg(ia->local_cfg, ia->global_cst, record); } int main(int argc, char* argv[]) { std::vector> filters; - std::string fpath = "/g/g90/zhu22/repos/Recorder-CFG/tools/filters.txt"; - read_filter(fpath, &filters); - - std::string rpath = "/g/g90/zhu22/iopattern/recorder-20241007/170016.899-ruby22-zhu22-ior-1614057/"; - recorder_init_reader(rpath.c_str(), &reader); - for(int rank = 0; rank < reader.metadata.total_ranks; rank++) { - std::cout << "Rank: " << rank << std::endl; - CST* cst = reader_get_cst(&reader, rank); - CFG* cfg = reader_get_cfg(&reader, rank); - rule_application(&reader, cfg, cst, -1, 1, &filters); + // Recorder trace directory + // TODO: read it from command line argument + std::string trace_dir = "/g/g90/zhu22/iopattern/recorder-20241007/170016.899-ruby22-zhu22-ior-1614057/"; - } + // filter file path + // TODO: read it from command line argument + std::string filter_path = "/g/g90/zhu22/repos/Recorder-CFG/tools/filters.txt"; + read_filters(filter_path, &filters); + RecorderReader reader; + CallSignature global_cst; -/* - MultiIndexIntervalTable multiIndexTable; - multiIndexTable.insert("0", {1, 5}, "2"); - multiIndexTable.insert("0", {5, 10}, "7"); - multiIndexTable.insert("0", {10, 15}, "12"); + // For debug purpose, can delete later + int decimal = log10(1 / reader.metadata.time_resolution); + sprintf(formatting_record, "%%.%df %%.%df %%s %%d %%d (", decimal, decimal); - // Insert data into index "B" - multiIndexTable.insert("1", {1, 3}, "2"); - multiIndexTable.insert("1", {3, 7}, "4"); - multiIndexTable.insert("1", {7, 10}, "9"); - - std::cout << "Index A contents:\n"; - multiIndexTable.printIntervals("0"); - std::cout << "Index B contents:\n"; - multiIndexTable.printIntervals("1");*/ + // Go through each rank's records + recorder_init_reader(trace_dir.c_str(), &reader); + for(int rank = 0; rank < reader.metadata.total_ranks; rank++) { + Grammar local_cfg; + + IterArg arg; + arg.rank = rank; + arg.reader = &reader; + arg.local_cfg = &local_cfg; + arg.global_cst = &global_cst; + arg.filters = &filters; + + // this call iterates through all records of one rank + // each record is processed by the iterate_record() function + recorder_decode_records(&reader, rank, iterate_record, &arg); + } + recorder_free_reader(&reader); } From 589babde11d584699b0a898bf71846b51ea699ca Mon Sep 17 00:00:00 2001 From: Zhaobin Zhu Date: Fri, 13 Dec 2024 02:08:33 -0800 Subject: [PATCH 09/17] add parser, and change filters to a class --- .gitignore | 1 + tools/recorder-filter.cpp | 104 +++++++++++++++++++++++++++++--------- 2 files changed, 82 insertions(+), 23 deletions(-) diff --git a/.gitignore b/.gitignore index 11affce..a7888b2 100644 --- a/.gitignore +++ b/.gitignore @@ -96,6 +96,7 @@ tools/conflict_detector tools/recorder2text tools/metaops_checker tools/figures/ +tools/filters* install *.html diff --git a/tools/recorder-filter.cpp b/tools/recorder-filter.cpp index e11fa3d..e913461 100644 --- a/tools/recorder-filter.cpp +++ b/tools/recorder-filter.cpp @@ -6,6 +6,7 @@ #include #include #include +#include #include extern "C" { #include "reader.h" @@ -94,6 +95,77 @@ class Filter{ : func_name(name), indices(miit) {} }; +template +class Filters { +private: + std::vector> filters; + +public: + void addFilter(const Filter& filter) { + filters.push_back(filter); + } +/* + void addFilter(const std::string& name, const MultiIndexIntervalTable& miit) { + filters.emplace_back(name, miit); + } +*/ + const Filter& getFilter(size_t index) const { + if (index < filters.size()) { + return filters[index]; + } else { + throw std::out_of_range("Index out of range"); + } + } + + size_t size() const { + return filters.size(); + } + + // in case for accessing the underlying vector + const std::vector>& getFilters() const { + return filters; + } + + auto begin() { + return filters.begin(); + } + + auto end() { + return filters.end(); + } + +}; + + +void parseArguments(int argc, char** argv, std::string& trace_dir, std::string& filter_path) { + const char* const short_opts = "t:f:h"; + const option long_opts[] = { + {"trace-dir", required_argument, nullptr, 't'}, + {"filter-path", required_argument, nullptr, 'f'}, + {nullptr, no_argument, nullptr, 0} + }; + + while (true) { + const auto opt = getopt_long(argc, argv, short_opts, long_opts, nullptr); + if (-1 == opt) break; + switch (opt) { + case 't': + trace_dir = optarg; + break; + + case 'f': + filter_path = optarg; + break; + + default: + std::cout << "Usage: " << argv[0] << " [OPTIONS]\n" + << " -t, --trace-dir Set the trace directory\n" + << " -f, --filter-path Set the filter file path\n" + << " -h, --help Display this help message\n"; + exit(0); + } + } +} std::vector splitStringBySpace(const std::string& input) { @@ -139,15 +211,11 @@ IntervalTable parseRanges(const std::string& ranges) { return table; } -/** - * TODO: the second argument is in/out argument - * then why do you need to return it? - */ -std::vector>* read_filters(std::string &fpath, std::vector> *filters){ + +void read_filters(std::string &fpath, Filters *filters){ std::ifstream ffile(fpath); if (!ffile.is_open()) { std::cerr << "Error: Unable to open file at " << fpath << "\n"; - return nullptr; } std::string fline; @@ -177,11 +245,9 @@ std::vector>* read_filters(std::string &fpath, std::vectoremplace_back(func_name, indices); + filters->addFilter(Filter(func_name, indices)); } - std::cout << "Successfully read filters.\n"; - return filters; } std::vector charPointerPointerArrayToList(char** charArray, int size) { @@ -194,7 +260,7 @@ std::vector charPointerPointerArrayToList(char** charArray, int siz return args_list; } -void apply_filter_to_record(Record* record, RecorderReader *reader, std::vector> *filters){ +void apply_filter_to_record(Record* record, RecorderReader *reader, Filters *filters){ std::string func_name = recorder_get_func_name(reader, record); std::vector args = charPointerPointerArrayToList(record->args, record->arg_count); for(auto &filter:*filters){ @@ -234,11 +300,6 @@ void apply_filter_to_record(Record* record, RecorderReader *reader, std::vector< - - - - - /** * helper structure for passing arguments * to the iterate_record() function @@ -248,8 +309,7 @@ typedef struct IterArg_t { RecorderReader* reader; CallSignature* global_cst; Grammar* local_cfg; - // TODO: encapsulate filters to make it cleaner - std::vector>* filters; + Filters* filters; } IterArg; @@ -314,16 +374,14 @@ void iterate_record(Record* record, void* arg) { } -int main(int argc, char* argv[]) { - std::vector> filters; - +int main(int argc, char** argv) { // Recorder trace directory - // TODO: read it from command line argument std::string trace_dir = "/g/g90/zhu22/iopattern/recorder-20241007/170016.899-ruby22-zhu22-ior-1614057/"; - // filter file path - // TODO: read it from command line argument std::string filter_path = "/g/g90/zhu22/repos/Recorder-CFG/tools/filters.txt"; + parseArguments(argc, argv, trace_dir, filter_path); + + Filters filters; read_filters(filter_path, &filters); RecorderReader reader; From 32578050c8fe04021a51b855982d492b125334d1 Mon Sep 17 00:00:00 2001 From: Chen Wang Date: Mon, 23 Dec 2024 15:32:13 -0800 Subject: [PATCH 10/17] Clean code and fix bugs Signed-off-by: Chen Wang --- tools/recorder-filter.cpp | 152 +++++++++++++++++++++----------------- 1 file changed, 84 insertions(+), 68 deletions(-) diff --git a/tools/recorder-filter.cpp b/tools/recorder-filter.cpp index e913461..45bcaa2 100644 --- a/tools/recorder-filter.cpp +++ b/tools/recorder-filter.cpp @@ -14,6 +14,7 @@ extern "C" { } static char formatting_record[32]; +static CallSignature* global_cst = NULL; template @@ -85,7 +86,7 @@ class MultiIndexIntervalTable { }; template -class Filter{ +class Filter { public: std::string func_name; MultiIndexIntervalTable indices; @@ -250,49 +251,42 @@ void read_filters(std::string &fpath, Filters *filters){ std::cout << "Successfully read filters.\n"; } -std::vector charPointerPointerArrayToList(char** charArray, int size) { - std::vector args_list; - for(int i= 0; i< size; i++){ - if (charArray[i] != nullptr){ - args_list.emplace_back(charArray[i]); - } - } - return args_list; -} +void apply_filter_to_record(Record* record, Record* new_record, RecorderReader *reader, Filters *filters){ + + // duplicate the original record and then + // make modifications to the new record + memcpy(new_record, record, sizeof(Record)); -void apply_filter_to_record(Record* record, RecorderReader *reader, Filters *filters){ std::string func_name = recorder_get_func_name(reader, record); - std::vector args = charPointerPointerArrayToList(record->args, record->arg_count); - for(auto &filter:*filters){ - if(filter.func_name == func_name){ - std::vector args_list; - int arg_cnt = 0; - for(auto it = filter.indices.begin(); it != filter.indices.end(); ++it){ + for(auto &filter:*filters) { + if(filter.func_name == func_name) { + std::vector new_args; + + // TODO: should the filters include the same number of incides as the actual call? + for(auto it = filter.indices.begin(); it != filter.indices.end(); ++it) { // for each index in the filter int index = stoi(it->first); auto &intervalTable = it->second; - if (intervalTable.data.empty()){ - // no intervals defined for this arg - args_list.push_back(args[index]); - } else { - for( auto &interval : intervalTable.data){ - //go through the intervals and check if the record->args[i] is in any of the defined intervals - if(std::stoi(args[index]) >= interval.first.lower && std::stoi(args[index]) < interval.first.upper){ - args_list.push_back(std::to_string(interval.second)); - } + + // Clustering + int arg_modified = 0; + for(auto &interval : intervalTable.data) { + if(atoi(record->args[index]) >= interval.first.lower && atoi(record->args[index]) < interval.first.upper) { + new_args.push_back(std::to_string(interval.second)); + arg_modified = 1; + break; } } - arg_cnt++; + + if (!arg_modified) + new_args.push_back(record->args[index]); } - if(arg_cnt == args_list.size()){ - record->args = static_cast(malloc(sizeof(char*) * args_list.size())); - for(int i=0; iargs[i] = strdup(args_list[i].c_str()); - } - record->arg_count = args_list.size(); - for(int i=0; i< record->arg_count; i++){ - std::cout << record->args[i] << std::endl; - } + + // Overwrite the orginal record with modified args + new_record->arg_count = new_args.size(); + new_record->args = (char**) malloc(sizeof(char*) * new_record->arg_count); + for(int i = 0; i < new_record->arg_count; i++) { + new_record->args[i] = strdup(new_args[i].c_str()); } } } @@ -307,8 +301,7 @@ void apply_filter_to_record(Record* record, RecorderReader *reader, Filters* filters; } IterArg; @@ -341,6 +334,27 @@ void grow_cst_cfg(Grammar* cfg, CallSignature* cst, Record* record) { append_terminal(cfg, entry->terminal_id, 1); } +/** + * This is a helper (debug) function that prints out + * the recorded function call + */ +static void print_record(Record* record, RecorderReader *reader) { + int decimal = log10(1 / reader->metadata.time_resolution); + sprintf(formatting_record, "%%.%df %%.%df %%s %%d %%d (", decimal, decimal); + + bool user_func = (record->func_id == RECORDER_USER_FUNCTION); + const char* func_name = recorder_get_func_name(reader, record); + + fprintf(stdout, formatting_record, record->tstart, record->tend, // record->tid + func_name, record->call_depth); + + for(int arg_id = 0; !user_func && arg_id < record->arg_count; arg_id++) { + char *arg = record->args[arg_id]; + fprintf(stdout, " %s", arg); + } + fprintf(stdout, " )\n"); +} + /** * Function that processes one record at a time * The pointer of this function needs to be @@ -354,58 +368,60 @@ void iterate_record(Record* record, void* arg) { IterArg *ia = (IterArg*) arg; - bool user_func = (record->func_id == RECORDER_USER_FUNCTION); - - const char* func_name = recorder_get_func_name(ia->reader, record); - - fprintf(stdout, formatting_record, record->tstart, record->tend, // record->tid - func_name, record->call_depth, recorder_get_func_type(ia->reader, record)); - - for(int arg_id = 0; !user_func && arg_id < record->arg_count; arg_id++) { - char *arg = record->args[arg_id]; - fprintf(stdout, " %s", arg); - } - fprintf(stdout, " )\n"); + // debug purpose; print out the original record + printf("old:"); + print_record(record, ia->reader); // apply fiter to the record // then add it to the cst and cfg. - apply_filter_to_record(record, ia->reader, ia->filters); - grow_cst_cfg(ia->local_cfg, ia->global_cst, record); + Record new_record; + apply_filter_to_record(record, &new_record, ia->reader, ia->filters); + + // debug purpose; print out the modified record + printf("new:"); + print_record(&new_record, ia->reader); + + grow_cst_cfg(&ia->local_cfg, global_cst, &new_record); } int main(int argc, char** argv) { // Recorder trace directory - std::string trace_dir = "/g/g90/zhu22/iopattern/recorder-20241007/170016.899-ruby22-zhu22-ior-1614057/"; + std::string trace_dir = "/p/lustre2/wang116/corona/sources/Recorder-CFG/test/recorder-20241223/135530.813-corona171-wang116-a.out-756755"; // filter file path - std::string filter_path = "/g/g90/zhu22/repos/Recorder-CFG/tools/filters.txt"; + std::string filter_path = "/p/lustre2/wang116/corona/sources/Recorder-CFG/test/recorder-20241223/135530.813-corona171-wang116-a.out-756755/filter.txt"; parseArguments(argc, argv, trace_dir, filter_path); Filters filters; read_filters(filter_path, &filters); RecorderReader reader; - CallSignature global_cst; + recorder_init_reader(trace_dir.c_str(), &reader); - // For debug purpose, can delete later - int decimal = log10(1 / reader.metadata.time_resolution); - sprintf(formatting_record, "%%.%df %%.%df %%s %%d %%d (", decimal, decimal); - // Go through each rank's records - recorder_init_reader(trace_dir.c_str(), &reader); + // Prepare the arguments to pass to each rank + // when iterating local records + IterArg *iter_args = (IterArg*) malloc(sizeof(IterArg)); for(int rank = 0; rank < reader.metadata.total_ranks; rank++) { - Grammar local_cfg; + iter_args[rank].rank = rank; + iter_args[rank].reader = &reader; + iter_args[rank].filters = &filters; + // initialize local CFG + sequitur_init(&(iter_args[rank].local_cfg)); + } - IterArg arg; - arg.rank = rank; - arg.reader = &reader; - arg.local_cfg = &local_cfg; - arg.global_cst = &global_cst; - arg.filters = &filters; + // Go through each rank's records + for(int rank = 0; rank < reader.metadata.total_ranks; rank++) { // this call iterates through all records of one rank // each record is processed by the iterate_record() function - recorder_decode_records(&reader, rank, iterate_record, &arg); + recorder_decode_records(&reader, rank, iterate_record, &(iter_args[rank])); } + + // At this point we should have built the global cst and each + // rank's local cfg. Now let's write them out. + + + recorder_free_reader(&reader); } From bac7b70bdd22c71c16563e7e483680982baeeb6b Mon Sep 17 00:00:00 2001 From: Chen Wang Date: Mon, 23 Dec 2024 16:11:57 -0800 Subject: [PATCH 11/17] Now can write out the new cst and cfg; still need to save the timestamps Signed-off-by: Chen Wang --- tools/recorder-filter.cpp | 127 ++++++++++++++++++++++++++++++++++++-- 1 file changed, 123 insertions(+), 4 deletions(-) diff --git a/tools/recorder-filter.cpp b/tools/recorder-filter.cpp index 45bcaa2..5c4a1ed 100644 --- a/tools/recorder-filter.cpp +++ b/tools/recorder-filter.cpp @@ -8,6 +8,7 @@ #include #include #include +#include extern "C" { #include "reader.h" #include "recorder-sequitur.h" @@ -292,8 +293,6 @@ void apply_filter_to_record(Record* record, Record* new_record, RecorderReader * } } - - /** * helper structure for passing arguments * to the iterate_record() function @@ -306,6 +305,126 @@ typedef struct IterArg_t { } IterArg; +/** + * this function is directly copied from recorder/lib/recorder-cst-cfg.c + * to avoid adding dependency to the entire recorder library. + * TODO: think a better way to reuse this code + */ +char* serialize_cst(CallSignature *cst, size_t *len) { + *len = sizeof(int); + + CallSignature *entry, *tmp; + HASH_ITER(hh, cst, entry, tmp) { + *len = *len + entry->key_len + sizeof(int)*3 + sizeof(unsigned); + } + + int entries = HASH_COUNT(cst); + char *res = (char*) malloc(*len); + char *ptr = res; + + memcpy(ptr, &entries, sizeof(int)); + ptr += sizeof(int); + + HASH_ITER(hh, cst, entry, tmp) { + memcpy(ptr, &entry->terminal_id, sizeof(int)); + ptr = ptr + sizeof(int); + memcpy(ptr, &entry->rank, sizeof(int)); + ptr = ptr + sizeof(int); + memcpy(ptr, &entry->key_len, sizeof(int)); + ptr = ptr + sizeof(int); + memcpy(ptr, &entry->count, sizeof(unsigned)); + ptr = ptr + sizeof(unsigned); + memcpy(ptr, entry->key, entry->key_len); + ptr = ptr + entry->key_len; + } + + return res; +} + +/** + * this function is directly copied from recorder/lib/recorder-utils.c + * to avoid adding dependency to the entire recorder library. + * TODO: think a better way to reuse this code + */ +void recorder_write_zlib(unsigned char* buf, size_t buf_size, FILE* out_file) { + // Always write two size_t (compressed_size and decopmressed_size) + // before writting the the compressed data. + // This allows easier post-processing. + long off = ftell(out_file); + size_t compressed_size = 0; + size_t decompressed_size = buf_size; + fwrite(&compressed_size, sizeof(size_t), 1, out_file); + fwrite(&decompressed_size, sizeof(size_t), 1, out_file); + + int ret; + unsigned have; + z_stream strm; + + unsigned char out[buf_size]; + + /* allocate deflate state */ + strm.zalloc = Z_NULL; + strm.zfree = Z_NULL; + strm.opaque = Z_NULL; + ret = deflateInit(&strm, Z_DEFAULT_COMPRESSION); + // ret = deflateInit(&strm, Z_BEST_COMPRESSION); + if (ret != Z_OK) { + printf("[recorder-filter] fatal error: can't initialize zlib.\n"); + return; + } + + strm.avail_in = buf_size; + strm.next_in = buf; + /* run deflate() on input until output buffer not full, finish + compression if all of source has been read in */ + do { + strm.avail_out = buf_size; + strm.next_out = out; + ret = deflate(&strm, Z_FINISH); /* no bad return value */ + assert(ret != Z_STREAM_ERROR); /* state not clobbered */ + have = buf_size - strm.avail_out; + compressed_size += have; + if (fwrite(out, 1, have, out_file) != have) { + printf("[recorder-filter] fatal error: zlib write out error."); + (void)deflateEnd(&strm); + return; + } + } while (strm.avail_out == 0); + assert(strm.avail_in == 0); /* all input will be used */ + + /* clean up and return */ + (void)deflateEnd(&strm); + + fseek(out_file, off, SEEK_SET); + fwrite(&compressed_size, sizeof(size_t), 1, out_file); + fwrite(&decompressed_size, sizeof(size_t), 1, out_file); + fseek(out_file, compressed_size, SEEK_CUR); +} + +void save_filtered_trace(RecorderReader* reader, IterArg* iter_args) { + + for(int rank = 0; rank < reader->metadata.total_ranks; rank++) { + char filename[1024] = {0}; + sprintf(filename, "./tmp/%d.cfg", rank); + FILE* f = fopen(filename, "wb"); + int integers; + int* data = serialize_grammar(&(iter_args[rank].local_cfg), &integers); + recorder_write_zlib((unsigned char*)data, sizeof(int)*integers, f); + fclose(f); + free(data); + } + + // write out global cst + FILE* f = fopen("./tmp/recorder.cst", "wb"); + size_t len; + char* data = serialize_cst(global_cst, &len); + recorder_write_zlib((unsigned char*)data, len, f); + fclose(f); + free(data); + + // TODO: write timestamps +} + /** * This function addes one record to the CFG and CST * the implementation is identical to that of the @@ -386,6 +505,7 @@ void iterate_record(Record* record, void* arg) { int main(int argc, char** argv) { + // Recorder trace directory std::string trace_dir = "/p/lustre2/wang116/corona/sources/Recorder-CFG/test/recorder-20241223/135530.813-corona171-wang116-a.out-756755"; // filter file path @@ -420,8 +540,7 @@ int main(int argc, char** argv) { // At this point we should have built the global cst and each // rank's local cfg. Now let's write them out. - - + save_filtered_trace(&reader, iter_args); recorder_free_reader(&reader); } From a3d1e049d11d54fb8a5735523f9de05f03c88761 Mon Sep 17 00:00:00 2001 From: Chen Wang Date: Tue, 14 Jan 2025 19:45:38 -0800 Subject: [PATCH 12/17] Code formating Signed-off-by: Chen Wang --- tools/reader-cst-cfg.c | 32 +++++------ tools/reader.c | 128 ++++++++++++++++++++--------------------- 2 files changed, 79 insertions(+), 81 deletions(-) diff --git a/tools/reader-cst-cfg.c b/tools/reader-cst-cfg.c index 3e992aa..7fb5355 100644 --- a/tools/reader-cst-cfg.c +++ b/tools/reader-cst-cfg.c @@ -35,14 +35,14 @@ void reader_decode_cst_2_3(RecorderReader *reader, int rank, CST *cst) { cst->cs_list = malloc(cst->entries * sizeof(CallSignature)); for(int i = 0; i < cst->entries; i++) { - fread(&cst->cs_list[i].terminal_id, sizeof(int), 1, f); - fread(&cst->cs_list[i].key_len, sizeof(int), 1, f); - - cst->cs_list[i].key = malloc(cst->cs_list[i].key_len); - fread(cst->cs_list[i].key, 1, cst->cs_list[i].key_len, f); - - assert(cst->cs_list[i].terminal_id < cst->entries); - } + fread(&cst->cs_list[i].terminal_id, sizeof(int), 1, f); + fread(&cst->cs_list[i].key_len, sizeof(int), 1, f); + + cst->cs_list[i].key = malloc(cst->cs_list[i].key_len); + fread(cst->cs_list[i].key, 1, cst->cs_list[i].key_len, f); + + assert(cst->cs_list[i].terminal_id < cst->entries); + } fclose(f); } @@ -58,7 +58,7 @@ void reader_decode_cfg_2_3(RecorderReader *reader, int rank, CFG* cfg) { cfg->cfg_head = NULL; for(int i = 0; i < cfg->rules; i++) { RuleHash *rule = malloc(sizeof(RuleHash)); - + fread(&(rule->rule_id), sizeof(int), 1, f); fread(&(rule->symbols), sizeof(int), 1, f); @@ -75,18 +75,18 @@ void reader_decode_cst(int rank, void* buf, CST* cst) { memcpy(&cst->entries, buf, sizeof(int)); buf += sizeof(int); - // cst->cs_list will be stored in the terminal_id order. + // cst->cs_list will be stored in the terminal_id order. cst->cs_list = malloc(cst->entries * sizeof(CallSignature)); for(int i = 0; i < cst->entries; i++) { - int terminal_id; + int terminal_id; memcpy(&terminal_id, buf, sizeof(int)); buf += sizeof(int); - assert(terminal_id < cst->entries); + assert(terminal_id < cst->entries); - CallSignature* cs = &(cst->cs_list[terminal_id]); - cs->terminal_id = terminal_id; + CallSignature* cs = &(cst->cs_list[terminal_id]); + cs->terminal_id = terminal_id; memcpy(&cs->rank, buf, sizeof(int)); buf += sizeof(int); @@ -125,14 +125,14 @@ void reader_decode_cfg(int rank, void* buf, CFG* cfg) { } CST* reader_get_cst(RecorderReader* reader, int rank) { - CST* cst = reader->csts[rank]; + CST* cst = reader->csts[rank]; return cst; } CFG* reader_get_cfg(RecorderReader* reader, int rank) { CFG* cfg; if (reader->metadata.interprocess_compression) - cfg = reader->cfgs[reader->ug_ids[rank]]; + cfg = reader->cfgs[reader->ug_ids[rank]]; else cfg = reader->cfgs[rank]; return cfg; diff --git a/tools/reader.c b/tools/reader.c index de76b43..4706ed9 100644 --- a/tools/reader.c +++ b/tools/reader.c @@ -30,11 +30,12 @@ void* read_zlib(FILE* source) { size_t compressed_size, decompressed_size; fread(&compressed_size, sizeof(size_t), 1, source); fread(&decompressed_size, sizeof(size_t), 1, source); + //printf("read zlib compressed size: %ld, decompressed size: %ld\n", + // compressed_size, decompressed_size); + //fflush(stdout); void* compressed = malloc(compressed_size); void* decompressed = malloc(decompressed_size); void* p_decompressed = decompressed; - //printf("compressed size: %ld, decompressed size: %ld\n", - // compressed_size, decompressed_size); strm.avail_in = fread(compressed, 1, compressed_size, source); strm.next_in = compressed; @@ -127,7 +128,7 @@ void read_metadata(RecorderReader* reader) { // first 1024 bytes are reserved for metadata block // the rest of the file stores all supported functions fseek(fp, 0, SEEK_END); - long fsize = ftell(fp) - 1024; + long fsize = ftell(fp) - 1024; char buf[fsize]; fseek(fp, 1024, SEEK_SET); @@ -155,19 +156,19 @@ void read_metadata(RecorderReader* reader) { memcpy(reader->func_list[func_id], buf+start_pos, end_pos-start_pos); start_pos = end_pos+1; if((reader->mpi_start_idx==-1) && - (NULL!=strstr(reader->func_list[func_id], "MPI"))) + (NULL!=strstr(reader->func_list[func_id], "MPI"))) reader->mpi_start_idx = func_id; if((reader->hdf5_start_idx==-1) && - (NULL!=strstr(reader->func_list[func_id], "H5"))) + (NULL!=strstr(reader->func_list[func_id], "H5"))) reader->hdf5_start_idx = func_id; if((reader->pnetcdf_start_idx==-1) && - (NULL!=strstr(reader->func_list[func_id], "ncmpi"))) + (NULL!=strstr(reader->func_list[func_id], "ncmpi"))) reader->pnetcdf_start_idx = func_id; if((reader->netcdf_start_idx==-1) && - (NULL!=strstr(reader->func_list[func_id], "nc_"))) + (NULL!=strstr(reader->func_list[func_id], "nc_"))) reader->netcdf_start_idx = func_id; func_id++; @@ -193,39 +194,39 @@ void recorder_init_reader(const char* logs_dir, RecorderReader *reader) { read_metadata(reader); - int nprocs= reader->metadata.total_ranks; + int nprocs= reader->metadata.total_ranks; - reader->ug_ids = malloc(sizeof(int) * nprocs); + reader->ug_ids = malloc(sizeof(int) * nprocs); reader->ugs = malloc(sizeof(CFG*) * nprocs); - reader->csts = malloc(sizeof(CST*) * nprocs); - reader->cfgs = malloc(sizeof(CFG*) * nprocs); + reader->csts = malloc(sizeof(CST*) * nprocs); + reader->cfgs = malloc(sizeof(CFG*) * nprocs); - if(reader->metadata.interprocess_compression) { + if(reader->metadata.interprocess_compression) { // a single file for merged csts // and a single for unique cfgs void* buf_cst; void* buf_cfg; // Read and parse the cst file - char cst_fname[1096] = {0}; - sprintf(cst_fname, "%s/recorder.cst", reader->logs_dir); - FILE* cst_file = fopen(cst_fname, "rb"); + char cst_fname[1096] = {0}; + sprintf(cst_fname, "%s/recorder.cst", reader->logs_dir); + FILE* cst_file = fopen(cst_fname, "rb"); buf_cst = read_zlib(cst_file); reader->csts[0] = (CST*) malloc(sizeof(CST)); - reader_decode_cst(0, buf_cst, reader->csts[0]); + reader_decode_cst(0, buf_cst, reader->csts[0]); fclose(cst_file); free(buf_cst); - char ug_metadata_fname[1096] = {0}; - sprintf(ug_metadata_fname, "%s/ug.mt", reader->logs_dir); - FILE* f = fopen(ug_metadata_fname, "rb"); - fread(reader->ug_ids, sizeof(int), nprocs, f); - fread(&reader->num_ugs, sizeof(int), 1, f); - fclose(f); + char ug_metadata_fname[1096] = {0}; + sprintf(ug_metadata_fname, "%s/ug.mt", reader->logs_dir); + FILE* f = fopen(ug_metadata_fname, "rb"); + fread(reader->ug_ids, sizeof(int), nprocs, f); + fread(&reader->num_ugs, sizeof(int), 1, f); + fclose(f); - char cfg_fname[1096] = {0}; - sprintf(cfg_fname, "%s/ug.cfg", reader->logs_dir); - FILE* cfg_file = fopen(cfg_fname, "rb"); + char cfg_fname[1096] = {0}; + sprintf(cfg_fname, "%s/ug.cfg", reader->logs_dir); + FILE* cfg_file = fopen(cfg_fname, "rb"); for(int i = 0; i < reader->num_ugs; i++) { buf_cfg = read_zlib(cfg_file); reader->ugs[i] = (CFG*) malloc(sizeof(CFG)); @@ -238,8 +239,8 @@ void recorder_init_reader(const char* logs_dir, RecorderReader *reader) { reader->csts[rank] = reader->csts[0]; reader->cfgs[rank] = reader->ugs[reader->ug_ids[rank]]; } + } else { // interprocess_compression == false - } else { for(int rank = 0; rank < nprocs; rank++) { if (reader->trace_version_major == 2 && reader->trace_version_minor == 3) { reader->csts[rank] = (CST*) malloc(sizeof(CST)); @@ -254,7 +255,7 @@ void recorder_init_reader(const char* logs_dir, RecorderReader *reader) { free(buf_cst); fclose(cst_file); } - + if (reader->trace_version_major == 2 && reader->trace_version_minor == 3) { reader->cfgs[rank] = (CFG*) malloc(sizeof(CFG)); reader_decode_cfg_2_3(reader, rank, reader->cfgs[rank]); @@ -275,24 +276,24 @@ void recorder_init_reader(const char* logs_dir, RecorderReader *reader) { void recorder_free_reader(RecorderReader *reader) { assert(reader); - if(reader->metadata.interprocess_compression) { - reader_free_cst(reader->csts[0]); - free(reader->csts[0]); - for(int i = 0; i < reader->num_ugs; i++) { - reader_free_cfg(reader->ugs[i]); - free(reader->ugs[i]); - } - } else { - for(int rank = 0; rank < reader->metadata.total_ranks; rank++) { + if(reader->metadata.interprocess_compression) { + reader_free_cst(reader->csts[0]); + free(reader->csts[0]); + for(int i = 0; i < reader->num_ugs; i++) { + reader_free_cfg(reader->ugs[i]); + free(reader->ugs[i]); + } + } else { + for(int rank = 0; rank < reader->metadata.total_ranks; rank++) { reader_free_cst(reader->csts[rank]); reader_free_cfg(reader->cfgs[rank]); } } - free(reader->csts); - free(reader->cfgs); - free(reader->ugs); - free(reader->ug_ids); + free(reader->csts); + free(reader->cfgs); + free(reader->ugs); + free(reader->ug_ids); for(int i = 0; i < reader->supported_funcs; i++) free(reader->func_list[i]); free(reader->func_list); @@ -338,8 +339,7 @@ void recorder_free_record(Record* r) { #define TERMINAL_START_ID 0 void rule_application(RecorderReader* reader, CFG* cfg, CST* cst, int rule_id, uint32_t* ts_buf, - void (*user_op)(Record*, void*), void* user_arg, int free_record) { - + void (*user_op)(Record*, void*), void* user_arg, int free_record) { RuleHash *rule = NULL; HASH_FIND_INT(cfg->cfg_head, &rule_id, rule); assert(rule != NULL); @@ -349,11 +349,10 @@ void rule_application(RecorderReader* reader, CFG* cfg, CST* cst, int rule_id, u int sym_exp = rule->rule_body[2*i+1]; if (sym_val >= TERMINAL_START_ID) { // terminal - for(int j = 0; j < sym_exp; j++) { - - Record* record = reader_cs_to_record(&(cst->cs_list[sym_val])); + Record* record = reader_cs_to_record(&(cst->cs_list[sym_val])); - // Fill in timestamps + for(int j = 0; j < sym_exp; j++) { + // update timestamps uint32_t ts[2] = {ts_buf[0], ts_buf[1]}; ts_buf += 2; record->tstart = ts[0] * reader->metadata.time_resolution + reader->prev_tstart; @@ -361,10 +360,9 @@ void rule_application(RecorderReader* reader, CFG* cfg, CST* cst, int rule_id, u reader->prev_tstart = record->tstart; user_op(record, user_arg); - - if(free_record) - recorder_free_record(record); } + if(free_record) + recorder_free_record(record); } else { // non-terminal (i.e., rule) for(int j = 0; j < sym_exp; j++) rule_application(reader, cfg, cst, sym_val, ts_buf, user_op, user_arg, free_record); @@ -425,10 +423,10 @@ uint32_t* read_timestamp_file(RecorderReader* reader, int rank) { void decode_records_core(RecorderReader *reader, int rank, - void (*user_op)(Record*, void*), void* user_arg, bool free_record) { + void (*user_op)(Record*, void*), void* user_arg, bool free_record) { - CST* cst = reader_get_cst(reader, rank); - CFG* cfg = reader_get_cfg(reader, rank); + CST* cst = reader_get_cst(reader, rank); + CFG* cfg = reader_get_cfg(reader, rank); reader->prev_tstart = 0.0; @@ -442,12 +440,12 @@ void decode_records_core(RecorderReader *reader, int rank, // Decode all records for one rank // one record at a time void recorder_decode_records(RecorderReader *reader, int rank, - void (*user_op)(Record*, void*), void* user_arg) { + void (*user_op)(Record*, void*), void* user_arg) { decode_records_core(reader, rank, user_op, user_arg, true); } void recorder_decode_records2(RecorderReader *reader, int rank, - void (*user_op)(Record*, void*), void* user_arg) { + void (*user_op)(Record*, void*), void* user_arg) { decode_records_core(reader, rank, user_op, user_arg, false); } @@ -569,7 +567,7 @@ int update_mpi_src_tag(VerifyIORecord* vir, Record* r, int src_idx, int tag_idx, int src = atoi(r->args[src_idx]); int tag = atoi(r->args[tag_idx]); char* status = r->args[status_idx]; - + if(src == RECORDER_MPI_ANY_SOURCE) { char* p = strstr(status, "_"); if (p == NULL) { @@ -633,11 +631,11 @@ int create_verifyio_record(RecorderReader* reader, Record* r, VerifyIORecord* vi } } else if (func_type == RECORDER_MPI) { if (strcmp(func_name, "MPI_Send") == 0 || - strcmp(func_name, "MPI_Ssend") == 0) { + strcmp(func_name, "MPI_Ssend") == 0) { // dst, tag, comm verifyio_record_copy_args(vir, r, 3, 3, 4, 5); } else if (strcmp(func_name, "MPI_Issend") == 0 || - strcmp(func_name, "MPI_Isend") == 0) { + strcmp(func_name, "MPI_Isend") == 0) { // dst, tag, comm, req verifyio_record_copy_args(vir, r, 4, 3, 4, 5, 6); } else if (strcmp(func_name, "MPI_Recv") == 0) { @@ -681,7 +679,7 @@ int create_verifyio_record(RecorderReader* reader, Record* r, VerifyIORecord* vi // comm verifyio_record_copy_args(vir, r, 1, 6); } else if (strcmp(func_name, "MPI_Allreduce") == 0 || - strcmp(func_name, "MPI_Reduce_scatter") == 0) { + strcmp(func_name, "MPI_Reduce_scatter") == 0) { // comm verifyio_record_copy_args(vir, r, 1, 5); } else if (strcmp(func_name, "MPI_Allgatherv") == 0) { @@ -700,7 +698,7 @@ int create_verifyio_record(RecorderReader* reader, Record* r, VerifyIORecord* vi // comm, local_rank verifyio_record_copy_args(vir, r, 2, 5, 6); } else if ((strcmp(func_name, "MPI_Cart_sub") == 0) || - (strcmp(func_name, "MPI_Comm_create") == 0)) { + (strcmp(func_name, "MPI_Comm_create") == 0)) { // comm, local_rank verifyio_record_copy_args(vir, r, 2, 2, 3); } else if ((strcmp(func_name, "MPI_Waitall")) == 0) { @@ -720,14 +718,14 @@ int create_verifyio_record(RecorderReader* reader, Record* r, VerifyIORecord* vi // only keep needed *write* *read* POSIX calls // additionally, MPI-IO often uses fcntl to lock files. if (strstr(func_name, "write") || - strstr(func_name, "read") || - strstr(func_name, "fcntl")) { + strstr(func_name, "read") || + strstr(func_name, "fcntl")) { verifyio_record_copy_args(vir, r, 1, 0); } else if (strstr(func_name, "fsync") || - strstr(func_name, "open") || - strstr(func_name, "close") || - strstr(func_name, "fopen") || - strstr(func_name, "fclose")) { + strstr(func_name, "open") || + strstr(func_name, "close") || + strstr(func_name, "fopen") || + strstr(func_name, "fclose")) { verifyio_record_copy_args(vir, r, 1, 0); //included = 0; } From d43347b7adb0f626b8fba46c5607591ca1543021 Mon Sep 17 00:00:00 2001 From: Chen Wang Date: Tue, 14 Jan 2025 19:47:18 -0800 Subject: [PATCH 13/17] Able to write out filtered CFG and CST now Signed-off-by: Chen Wang --- tools/recorder-filter.cpp | 89 ++++++++++++++++++++++++++++++--------- 1 file changed, 69 insertions(+), 20 deletions(-) diff --git a/tools/recorder-filter.cpp b/tools/recorder-filter.cpp index 5c4a1ed..4cb3402 100644 --- a/tools/recorder-filter.cpp +++ b/tools/recorder-filter.cpp @@ -15,7 +15,7 @@ extern "C" { } static char formatting_record[32]; -static CallSignature* global_cst = NULL; +static CallSignature* global_cst = NULL; template @@ -401,26 +401,70 @@ void recorder_write_zlib(unsigned char* buf, size_t buf_size, FILE* out_file) { fseek(out_file, compressed_size, SEEK_CUR); } +void save_updated_metadata(RecorderReader* reader) { + char old_metadata_filename[2048] = {0}; + char new_metadata_filename[2048] = {0}; + FILE* srcfh; + FILE* dstfh; + void* fhdata; + + sprintf(old_metadata_filename, "%s/recorder.mt", reader->logs_dir); + sprintf(new_metadata_filename, "./tmp/recorder.mt"); + + srcfh = fopen(old_metadata_filename, "rb"); + dstfh = fopen(new_metadata_filename, "wb"); + + // first copy the entire old meatdata file to the new metadata file + size_t res = 0; + fseek(srcfh, 0, SEEK_END); + long metafh_size = ftell(srcfh); + fhdata = malloc(metafh_size); + fseek(srcfh, 0, SEEK_SET); + res = fread(fhdata , 1, metafh_size, srcfh); + if (ferror(srcfh)) { + perror("Error reading metadata file\n"); + exit(1); + } + res = fwrite(fhdata, 1, metafh_size, dstfh); + + // then update the inter-process compression flag. + int oldval = reader->metadata.interprocess_compression; + reader->metadata.interprocess_compression = 0; + fseek(dstfh, 0, SEEK_SET); + fwrite(&reader->metadata, sizeof(RecorderMetadata), 1, dstfh); + reader->metadata.interprocess_compression = oldval; + + fclose(srcfh); + fclose(dstfh); + free(fhdata); + // copy the version file +} + void save_filtered_trace(RecorderReader* reader, IterArg* iter_args) { + size_t cst_data_len; + char* cst_data = serialize_cst(global_cst, &cst_data_len); + for(int rank = 0; rank < reader->metadata.total_ranks; rank++) { char filename[1024] = {0}; sprintf(filename, "./tmp/%d.cfg", rank); FILE* f = fopen(filename, "wb"); int integers; - int* data = serialize_grammar(&(iter_args[rank].local_cfg), &integers); - recorder_write_zlib((unsigned char*)data, sizeof(int)*integers, f); + int* cfg_data = serialize_grammar(&(iter_args[rank].local_cfg), &integers); + recorder_write_zlib((unsigned char*)cfg_data, sizeof(int)*integers, f); + fclose(f); + free(cfg_data); + + // write out global cst, all ranks have the same copy + sprintf(filename, "./tmp/%d.cst", rank); + f = fopen(filename, "wb"); + recorder_write_zlib((unsigned char*)cst_data, cst_data_len, f); fclose(f); - free(data); } - // write out global cst - FILE* f = fopen("./tmp/recorder.cst", "wb"); - size_t len; - char* data = serialize_cst(global_cst, &len); - recorder_write_zlib((unsigned char*)data, len, f); - fclose(f); - free(data); + free(cst_data); + + save_updated_metadata(reader); // TODO: write timestamps } @@ -431,12 +475,12 @@ void save_filtered_trace(RecorderReader* reader, IterArg* iter_args) { * recorder-logger.c */ static int current_cfg_terminal = 0; -void grow_cst_cfg(Grammar* cfg, CallSignature* cst, Record* record) { +void grow_cst_cfg(Grammar* cfg, Record* record) { int key_len; char* key = compose_cs_key(record, &key_len); CallSignature *entry = NULL; - HASH_FIND(hh, cst, key, key_len, entry); + HASH_FIND(hh, global_cst, key, key_len, entry); if(entry) { // Found entry->count++; free(key); @@ -447,7 +491,7 @@ void grow_cst_cfg(Grammar* cfg, CallSignature* cst, Record* record) { entry->rank = 0; entry->terminal_id = current_cfg_terminal++; entry->count = 1; - HASH_ADD_KEYPTR(hh, cst, entry->key, entry->key_len, entry); + HASH_ADD_KEYPTR(hh, global_cst, entry->key, entry->key_len, entry); } append_terminal(cfg, entry->terminal_id, 1); @@ -476,10 +520,10 @@ static void print_record(Record* record, RecorderReader *reader) { /** * Function that processes one record at a time - * The pointer of this function needs to be - * passed to the recorder_decode_records() call. + * The pointer of this function is passed + * to the recorder_decode_records() call. * - * in this function: + * In this function: * 1. we apply the filters * 2. then build the cst and cfg */ @@ -500,7 +544,7 @@ void iterate_record(Record* record, void* arg) { printf("new:"); print_record(&new_record, ia->reader); - grow_cst_cfg(&ia->local_cfg, global_cst, &new_record); + grow_cst_cfg(&ia->local_cfg, &new_record); } @@ -518,7 +562,6 @@ int main(int argc, char** argv) { RecorderReader reader; recorder_init_reader(trace_dir.c_str(), &reader); - // Prepare the arguments to pass to each rank // when iterating local records IterArg *iter_args = (IterArg*) malloc(sizeof(IterArg)); @@ -532,7 +575,6 @@ int main(int argc, char** argv) { // Go through each rank's records for(int rank = 0; rank < reader.metadata.total_ranks; rank++) { - // this call iterates through all records of one rank // each record is processed by the iterate_record() function recorder_decode_records(&reader, rank, iterate_record, &(iter_args[rank])); @@ -542,5 +584,12 @@ int main(int argc, char** argv) { // rank's local cfg. Now let's write them out. save_filtered_trace(&reader, iter_args); + // clean up everything + cleanup_cst(global_cst); + for(int rank = 0; rank < reader.metadata.total_ranks; rank++) { + sequitur_cleanup(&iter_args[rank].local_cfg); + } recorder_free_reader(&reader); + } + From 172e0f3ea61106c1083ad7f6bf207da42ecec940 Mon Sep 17 00:00:00 2001 From: Chen Wang Date: Wed, 15 Jan 2025 13:31:39 -0800 Subject: [PATCH 14/17] Clean code Signed-off-by: Chen Wang --- tools/recorder-filter.cpp | 65 ++++++++++++++------------------------- 1 file changed, 23 insertions(+), 42 deletions(-) diff --git a/tools/recorder-filter.cpp b/tools/recorder-filter.cpp index 4cb3402..5327a30 100644 --- a/tools/recorder-filter.cpp +++ b/tools/recorder-filter.cpp @@ -9,12 +9,15 @@ #include #include #include +#include +#include extern "C" { #include "reader.h" #include "recorder-sequitur.h" } static char formatting_record[32]; +static char filtered_trace_dir[1024]; static CallSignature* global_cst = NULL; @@ -139,37 +142,6 @@ class Filters { }; -void parseArguments(int argc, char** argv, std::string& trace_dir, std::string& filter_path) { - const char* const short_opts = "t:f:h"; - const option long_opts[] = { - {"trace-dir", required_argument, nullptr, 't'}, - {"filter-path", required_argument, nullptr, 'f'}, - {nullptr, no_argument, nullptr, 0} - }; - - while (true) { - const auto opt = getopt_long(argc, argv, short_opts, long_opts, nullptr); - if (-1 == opt) break; - switch (opt) { - case 't': - trace_dir = optarg; - break; - - case 'f': - filter_path = optarg; - break; - - default: - std::cout << "Usage: " << argv[0] << " [OPTIONS]\n" - << " -t, --trace-dir Set the trace directory\n" - << " -f, --filter-path Set the filter file path\n" - << " -h, --help Display this help message\n"; - exit(0); - } - } -} - - std::vector splitStringBySpace(const std::string& input) { std::vector result; std::istringstream stream(input); @@ -214,10 +186,11 @@ IntervalTable parseRanges(const std::string& ranges) { } -void read_filters(std::string &fpath, Filters *filters){ +void read_filters(char* filter_path, Filters *filters){ + std::string fpath(filter_path); std::ifstream ffile(fpath); if (!ffile.is_open()) { - std::cerr << "Error: Unable to open file at " << fpath << "\n"; + std::cerr << "Error: Unable to open file at " << fpath<< "\n"; } std::string fline; @@ -409,7 +382,7 @@ void save_updated_metadata(RecorderReader* reader) { void* fhdata; sprintf(old_metadata_filename, "%s/recorder.mt", reader->logs_dir); - sprintf(new_metadata_filename, "./tmp/recorder.mt"); + sprintf(new_metadata_filename, "%s/recorder.mt", filtered_trace_dir); srcfh = fopen(old_metadata_filename, "rb"); dstfh = fopen(new_metadata_filename, "wb"); @@ -447,7 +420,7 @@ void save_filtered_trace(RecorderReader* reader, IterArg* iter_args) { for(int rank = 0; rank < reader->metadata.total_ranks; rank++) { char filename[1024] = {0}; - sprintf(filename, "./tmp/%d.cfg", rank); + sprintf(filename, "%s/%d.cfg", filtered_trace_dir, rank); FILE* f = fopen(filename, "wb"); int integers; int* cfg_data = serialize_grammar(&(iter_args[rank].local_cfg), &integers); @@ -456,7 +429,7 @@ void save_filtered_trace(RecorderReader* reader, IterArg* iter_args) { free(cfg_data); // write out global cst, all ranks have the same copy - sprintf(filename, "./tmp/%d.cst", rank); + sprintf(filename, "%s/%d.cst", filtered_trace_dir, rank); f = fopen(filename, "wb"); recorder_write_zlib((unsigned char*)cst_data, cst_data_len, f); fclose(f); @@ -550,17 +523,25 @@ void iterate_record(Record* record, void* arg) { int main(int argc, char** argv) { - // Recorder trace directory - std::string trace_dir = "/p/lustre2/wang116/corona/sources/Recorder-CFG/test/recorder-20241223/135530.813-corona171-wang116-a.out-756755"; - // filter file path - std::string filter_path = "/p/lustre2/wang116/corona/sources/Recorder-CFG/test/recorder-20241223/135530.813-corona171-wang116-a.out-756755/filter.txt"; - parseArguments(argc, argv, trace_dir, filter_path); + if (argc != 3) { + printf("usage: recorder-filter /path/to/trace-folder /path/to/filter.txt\n"); + exit(1); + } + + char* trace_dir = argv[1]; + char* filter_path = argv[2]; Filters filters; read_filters(filter_path, &filters); RecorderReader reader; - recorder_init_reader(trace_dir.c_str(), &reader); + recorder_init_reader(trace_dir, &reader); + + // create a new folder to store the filtered trace files + sprintf(filtered_trace_dir, "%s/_filtered", reader.logs_dir); + if(access(filtered_trace_dir, F_OK) != -1) + rmdir(filtered_trace_dir); + mkdir(filtered_trace_dir, S_IRWXU|S_IRWXG|S_IROTH|S_IXOTH); // Prepare the arguments to pass to each rank // when iterating local records From 3d0ce61181af3a13cece7ad36708f80528ff9e40 Mon Sep 17 00:00:00 2001 From: Chen Wang Date: Wed, 15 Jan 2025 13:50:05 -0800 Subject: [PATCH 15/17] Copy the original timestamp file and version file Signed-off-by: Chen Wang --- tools/recorder-filter.cpp | 17 ++++++++++++++--- 1 file changed, 14 insertions(+), 3 deletions(-) diff --git a/tools/recorder-filter.cpp b/tools/recorder-filter.cpp index 5327a30..a5273d4 100644 --- a/tools/recorder-filter.cpp +++ b/tools/recorder-filter.cpp @@ -410,7 +410,6 @@ void save_updated_metadata(RecorderReader* reader) { fclose(srcfh); fclose(dstfh); free(fhdata); - // copy the version file } void save_filtered_trace(RecorderReader* reader, IterArg* iter_args) { @@ -437,9 +436,21 @@ void save_filtered_trace(RecorderReader* reader, IterArg* iter_args) { free(cst_data); + // Update metadata and write out save_updated_metadata(reader); - // TODO: write timestamps + // Save timestamps + // for now, we simply copy the timestamp files from + // the original trace folder, if we want to cut some + // records, we also need to cut timestamps + char cmd[1024]; + sprintf(cmd, "cp %s/recorder.ts %s/recorder.ts", reader->logs_dir, filtered_trace_dir); + system(cmd); + + // Similarly, simply copy the version file from the + // original trace folder. + sprintf(cmd, "cp %s/VERSION %s/VERSION", reader->logs_dir, filtered_trace_dir); + system(cmd); } /** @@ -524,7 +535,7 @@ void iterate_record(Record* record, void* arg) { int main(int argc, char** argv) { if (argc != 3) { - printf("usage: recorder-filter /path/to/trace-folder /path/to/filter.txt\n"); + printf("usage: recorder-filter /path/to/trace-folder /path/to/filter-file\n"); exit(1); } From 12c2a4cc5693505f6cfa18a2d56a8b4024bb27bd Mon Sep 17 00:00:00 2001 From: Chen Wang Date: Wed, 15 Jan 2025 13:53:58 -0800 Subject: [PATCH 16/17] Retrive a new record for each repetition Signed-off-by: Chen Wang --- tools/reader.c | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/tools/reader.c b/tools/reader.c index 4706ed9..7c65ad3 100644 --- a/tools/reader.c +++ b/tools/reader.c @@ -349,9 +349,8 @@ void rule_application(RecorderReader* reader, CFG* cfg, CST* cst, int rule_id, u int sym_exp = rule->rule_body[2*i+1]; if (sym_val >= TERMINAL_START_ID) { // terminal - Record* record = reader_cs_to_record(&(cst->cs_list[sym_val])); - for(int j = 0; j < sym_exp; j++) { + Record* record = reader_cs_to_record(&(cst->cs_list[sym_val])); // update timestamps uint32_t ts[2] = {ts_buf[0], ts_buf[1]}; ts_buf += 2; @@ -360,9 +359,9 @@ void rule_application(RecorderReader* reader, CFG* cfg, CST* cst, int rule_id, u reader->prev_tstart = record->tstart; user_op(record, user_arg); + if(free_record) + recorder_free_record(record); } - if(free_record) - recorder_free_record(record); } else { // non-terminal (i.e., rule) for(int j = 0; j < sym_exp; j++) rule_application(reader, cfg, cst, sym_val, ts_buf, user_op, user_arg, free_record); From e8f78756ec5bb83c50bf2daaae45bd59ea461101 Mon Sep 17 00:00:00 2001 From: Chen Wang Date: Wed, 15 Jan 2025 13:55:14 -0800 Subject: [PATCH 17/17] Change back .gitignore Signed-off-by: Chen Wang --- .gitignore | 1 - 1 file changed, 1 deletion(-) diff --git a/.gitignore b/.gitignore index a7888b2..11affce 100644 --- a/.gitignore +++ b/.gitignore @@ -96,7 +96,6 @@ tools/conflict_detector tools/recorder2text tools/metaops_checker tools/figures/ -tools/filters* install *.html