Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions tools/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ include_directories(${CMAKE_SOURCE_DIR}/include)
#------------------------------------------------------------------------------
# FLAGS for building
#------------------------------------------------------------------------------
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-unused-result -std=c++11")

set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-unused-result -std=c++17")

find_package(MPI REQUIRED)
if(MPI_FOUND)
Expand Down Expand Up @@ -59,6 +60,10 @@ add_executable(recorder-summary recorder-summary.c)
target_link_libraries(recorder-summary reader)
add_dependencies(recorder-summary reader)

add_executable(recorder-filter recorder-filter.cpp)
target_link_libraries(recorder-filter reader recorder)
add_dependencies(recorder-filter reader recorder)


if(RECORDER_ENABLE_PARQUET)
message("-- " "Configuring Parquet tool: TRUE")
Expand Down Expand Up @@ -102,7 +107,7 @@ endif()
# Add Target(s) to CMake Install
#-----------------------------------------------------------------------------
#set(targets reader recorder2text metaops_checker conflict_detector)
set(targets reader recorder2text recorder2timeline conflict-detector recorder-summary)
set(targets reader recorder2text recorder2timeline conflict-detector recorder-summary recorder-filter)
foreach(target ${targets})
install(
TARGETS
Expand Down
32 changes: 16 additions & 16 deletions tools/reader-cst-cfg.c
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,14 @@ void reader_decode_cst_2_3(RecorderReader *reader, int rank, CST *cst) {
cst->cs_list = malloc(cst->entries * sizeof(CallSignature));

for(int i = 0; i < cst->entries; i++) {
fread(&cst->cs_list[i].terminal_id, sizeof(int), 1, f);
fread(&cst->cs_list[i].key_len, sizeof(int), 1, f);
cst->cs_list[i].key = malloc(cst->cs_list[i].key_len);
fread(cst->cs_list[i].key, 1, cst->cs_list[i].key_len, f);
assert(cst->cs_list[i].terminal_id < cst->entries);
}
fread(&cst->cs_list[i].terminal_id, sizeof(int), 1, f);
fread(&cst->cs_list[i].key_len, sizeof(int), 1, f);

cst->cs_list[i].key = malloc(cst->cs_list[i].key_len);
fread(cst->cs_list[i].key, 1, cst->cs_list[i].key_len, f);

assert(cst->cs_list[i].terminal_id < cst->entries);
}
fclose(f);
}

Expand All @@ -58,7 +58,7 @@ void reader_decode_cfg_2_3(RecorderReader *reader, int rank, CFG* cfg) {
cfg->cfg_head = NULL;
for(int i = 0; i < cfg->rules; i++) {
RuleHash *rule = malloc(sizeof(RuleHash));

fread(&(rule->rule_id), sizeof(int), 1, f);
fread(&(rule->symbols), sizeof(int), 1, f);

Expand All @@ -75,18 +75,18 @@ void reader_decode_cst(int rank, void* buf, CST* cst) {
memcpy(&cst->entries, buf, sizeof(int));
buf += sizeof(int);

// cst->cs_list will be stored in the terminal_id order.
// cst->cs_list will be stored in the terminal_id order.
cst->cs_list = malloc(cst->entries * sizeof(CallSignature));

for(int i = 0; i < cst->entries; i++) {

int terminal_id;
int terminal_id;
memcpy(&terminal_id, buf, sizeof(int));
buf += sizeof(int);
assert(terminal_id < cst->entries);
assert(terminal_id < cst->entries);

CallSignature* cs = &(cst->cs_list[terminal_id]);
cs->terminal_id = terminal_id;
CallSignature* cs = &(cst->cs_list[terminal_id]);
cs->terminal_id = terminal_id;

memcpy(&cs->rank, buf, sizeof(int));
buf += sizeof(int);
Expand Down Expand Up @@ -125,14 +125,14 @@ void reader_decode_cfg(int rank, void* buf, CFG* cfg) {
}

CST* reader_get_cst(RecorderReader* reader, int rank) {
CST* cst = reader->csts[rank];
CST* cst = reader->csts[rank];
return cst;
}

CFG* reader_get_cfg(RecorderReader* reader, int rank) {
CFG* cfg;
if (reader->metadata.interprocess_compression)
cfg = reader->cfgs[reader->ug_ids[rank]];
cfg = reader->cfgs[reader->ug_ids[rank]];
else
cfg = reader->cfgs[rank];
return cfg;
Expand Down
121 changes: 59 additions & 62 deletions tools/reader.c
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,12 @@ void* read_zlib(FILE* source) {
size_t compressed_size, decompressed_size;
fread(&compressed_size, sizeof(size_t), 1, source);
fread(&decompressed_size, sizeof(size_t), 1, source);
//printf("read zlib compressed size: %ld, decompressed size: %ld\n",
// compressed_size, decompressed_size);
//fflush(stdout);
void* compressed = malloc(compressed_size);
void* decompressed = malloc(decompressed_size);
void* p_decompressed = decompressed;
//printf("compressed size: %ld, decompressed size: %ld\n",
// compressed_size, decompressed_size);

strm.avail_in = fread(compressed, 1, compressed_size, source);
strm.next_in = compressed;
Expand Down Expand Up @@ -127,7 +128,7 @@ void read_metadata(RecorderReader* reader) {
// first 1024 bytes are reserved for metadata block
// the rest of the file stores all supported functions
fseek(fp, 0, SEEK_END);
long fsize = ftell(fp) - 1024;
long fsize = ftell(fp) - 1024;
char buf[fsize];

fseek(fp, 1024, SEEK_SET);
Expand Down Expand Up @@ -155,19 +156,19 @@ void read_metadata(RecorderReader* reader) {
memcpy(reader->func_list[func_id], buf+start_pos, end_pos-start_pos);
start_pos = end_pos+1;
if((reader->mpi_start_idx==-1) &&
(NULL!=strstr(reader->func_list[func_id], "MPI")))
(NULL!=strstr(reader->func_list[func_id], "MPI")))
reader->mpi_start_idx = func_id;

if((reader->hdf5_start_idx==-1) &&
(NULL!=strstr(reader->func_list[func_id], "H5")))
(NULL!=strstr(reader->func_list[func_id], "H5")))
reader->hdf5_start_idx = func_id;

if((reader->pnetcdf_start_idx==-1) &&
(NULL!=strstr(reader->func_list[func_id], "ncmpi")))
(NULL!=strstr(reader->func_list[func_id], "ncmpi")))
reader->pnetcdf_start_idx = func_id;

if((reader->netcdf_start_idx==-1) &&
(NULL!=strstr(reader->func_list[func_id], "nc_")))
(NULL!=strstr(reader->func_list[func_id], "nc_")))
reader->netcdf_start_idx = func_id;

func_id++;
Expand All @@ -193,39 +194,39 @@ void recorder_init_reader(const char* logs_dir, RecorderReader *reader) {

read_metadata(reader);

int nprocs= reader->metadata.total_ranks;
int nprocs= reader->metadata.total_ranks;

reader->ug_ids = malloc(sizeof(int) * nprocs);
reader->ug_ids = malloc(sizeof(int) * nprocs);
reader->ugs = malloc(sizeof(CFG*) * nprocs);
reader->csts = malloc(sizeof(CST*) * nprocs);
reader->cfgs = malloc(sizeof(CFG*) * nprocs);
reader->csts = malloc(sizeof(CST*) * nprocs);
reader->cfgs = malloc(sizeof(CFG*) * nprocs);

if(reader->metadata.interprocess_compression) {
if(reader->metadata.interprocess_compression) {
// a single file for merged csts
// and a single for unique cfgs
void* buf_cst;
void* buf_cfg;

// Read and parse the cst file
char cst_fname[1096] = {0};
sprintf(cst_fname, "%s/recorder.cst", reader->logs_dir);
FILE* cst_file = fopen(cst_fname, "rb");
char cst_fname[1096] = {0};
sprintf(cst_fname, "%s/recorder.cst", reader->logs_dir);
FILE* cst_file = fopen(cst_fname, "rb");
buf_cst = read_zlib(cst_file);
reader->csts[0] = (CST*) malloc(sizeof(CST));
reader_decode_cst(0, buf_cst, reader->csts[0]);
reader_decode_cst(0, buf_cst, reader->csts[0]);
fclose(cst_file);
free(buf_cst);

char ug_metadata_fname[1096] = {0};
sprintf(ug_metadata_fname, "%s/ug.mt", reader->logs_dir);
FILE* f = fopen(ug_metadata_fname, "rb");
fread(reader->ug_ids, sizeof(int), nprocs, f);
fread(&reader->num_ugs, sizeof(int), 1, f);
fclose(f);
char ug_metadata_fname[1096] = {0};
sprintf(ug_metadata_fname, "%s/ug.mt", reader->logs_dir);
FILE* f = fopen(ug_metadata_fname, "rb");
fread(reader->ug_ids, sizeof(int), nprocs, f);
fread(&reader->num_ugs, sizeof(int), 1, f);
fclose(f);

char cfg_fname[1096] = {0};
sprintf(cfg_fname, "%s/ug.cfg", reader->logs_dir);
FILE* cfg_file = fopen(cfg_fname, "rb");
char cfg_fname[1096] = {0};
sprintf(cfg_fname, "%s/ug.cfg", reader->logs_dir);
FILE* cfg_file = fopen(cfg_fname, "rb");
for(int i = 0; i < reader->num_ugs; i++) {
buf_cfg = read_zlib(cfg_file);
reader->ugs[i] = (CFG*) malloc(sizeof(CFG));
Expand All @@ -238,8 +239,8 @@ void recorder_init_reader(const char* logs_dir, RecorderReader *reader) {
reader->csts[rank] = reader->csts[0];
reader->cfgs[rank] = reader->ugs[reader->ug_ids[rank]];
}
} else { // interprocess_compression == false

} else {
for(int rank = 0; rank < nprocs; rank++) {
if (reader->trace_version_major == 2 && reader->trace_version_minor == 3) {
reader->csts[rank] = (CST*) malloc(sizeof(CST));
Expand All @@ -254,7 +255,7 @@ void recorder_init_reader(const char* logs_dir, RecorderReader *reader) {
free(buf_cst);
fclose(cst_file);
}

if (reader->trace_version_major == 2 && reader->trace_version_minor == 3) {
reader->cfgs[rank] = (CFG*) malloc(sizeof(CFG));
reader_decode_cfg_2_3(reader, rank, reader->cfgs[rank]);
Expand All @@ -275,24 +276,24 @@ void recorder_init_reader(const char* logs_dir, RecorderReader *reader) {
void recorder_free_reader(RecorderReader *reader) {
assert(reader);

if(reader->metadata.interprocess_compression) {
reader_free_cst(reader->csts[0]);
free(reader->csts[0]);
for(int i = 0; i < reader->num_ugs; i++) {
reader_free_cfg(reader->ugs[i]);
free(reader->ugs[i]);
}
} else {
for(int rank = 0; rank < reader->metadata.total_ranks; rank++) {
if(reader->metadata.interprocess_compression) {
reader_free_cst(reader->csts[0]);
free(reader->csts[0]);
for(int i = 0; i < reader->num_ugs; i++) {
reader_free_cfg(reader->ugs[i]);
free(reader->ugs[i]);
}
} else {
for(int rank = 0; rank < reader->metadata.total_ranks; rank++) {
reader_free_cst(reader->csts[rank]);
reader_free_cfg(reader->cfgs[rank]);
}
}

free(reader->csts);
free(reader->cfgs);
free(reader->ugs);
free(reader->ug_ids);
free(reader->csts);
free(reader->cfgs);
free(reader->ugs);
free(reader->ug_ids);
for(int i = 0; i < reader->supported_funcs; i++)
free(reader->func_list[i]);
free(reader->func_list);
Expand Down Expand Up @@ -338,8 +339,7 @@ void recorder_free_record(Record* r) {
#define TERMINAL_START_ID 0

void rule_application(RecorderReader* reader, CFG* cfg, CST* cst, int rule_id, uint32_t* ts_buf,
void (*user_op)(Record*, void*), void* user_arg, int free_record) {

void (*user_op)(Record*, void*), void* user_arg, int free_record) {
RuleHash *rule = NULL;
HASH_FIND_INT(cfg->cfg_head, &rule_id, rule);
assert(rule != NULL);
Expand All @@ -350,18 +350,15 @@ void rule_application(RecorderReader* reader, CFG* cfg, CST* cst, int rule_id, u

if (sym_val >= TERMINAL_START_ID) { // terminal
for(int j = 0; j < sym_exp; j++) {

Record* record = reader_cs_to_record(&(cst->cs_list[sym_val]));

// Fill in timestamps
// update timestamps
uint32_t ts[2] = {ts_buf[0], ts_buf[1]};
ts_buf += 2;
record->tstart = ts[0] * reader->metadata.time_resolution + reader->prev_tstart;
record->tend = ts[1] * reader->metadata.time_resolution + reader->prev_tstart;
reader->prev_tstart = record->tstart;

user_op(record, user_arg);

if(free_record)
recorder_free_record(record);
}
Expand Down Expand Up @@ -425,10 +422,10 @@ uint32_t* read_timestamp_file(RecorderReader* reader, int rank) {


void decode_records_core(RecorderReader *reader, int rank,
void (*user_op)(Record*, void*), void* user_arg, bool free_record) {
void (*user_op)(Record*, void*), void* user_arg, bool free_record) {

CST* cst = reader_get_cst(reader, rank);
CFG* cfg = reader_get_cfg(reader, rank);
CST* cst = reader_get_cst(reader, rank);
CFG* cfg = reader_get_cfg(reader, rank);

reader->prev_tstart = 0.0;

Expand All @@ -442,12 +439,12 @@ void decode_records_core(RecorderReader *reader, int rank,
// Decode all records for one rank
// one record at a time
void recorder_decode_records(RecorderReader *reader, int rank,
void (*user_op)(Record*, void*), void* user_arg) {
void (*user_op)(Record*, void*), void* user_arg) {
decode_records_core(reader, rank, user_op, user_arg, true);
}

void recorder_decode_records2(RecorderReader *reader, int rank,
void (*user_op)(Record*, void*), void* user_arg) {
void (*user_op)(Record*, void*), void* user_arg) {
decode_records_core(reader, rank, user_op, user_arg, false);
}

Expand Down Expand Up @@ -569,7 +566,7 @@ int update_mpi_src_tag(VerifyIORecord* vir, Record* r, int src_idx, int tag_idx,
int src = atoi(r->args[src_idx]);
int tag = atoi(r->args[tag_idx]);
char* status = r->args[status_idx];

if(src == RECORDER_MPI_ANY_SOURCE) {
char* p = strstr(status, "_");
if (p == NULL) {
Expand Down Expand Up @@ -633,11 +630,11 @@ int create_verifyio_record(RecorderReader* reader, Record* r, VerifyIORecord* vi
}
} else if (func_type == RECORDER_MPI) {
if (strcmp(func_name, "MPI_Send") == 0 ||
strcmp(func_name, "MPI_Ssend") == 0) {
strcmp(func_name, "MPI_Ssend") == 0) {
// dst, tag, comm
verifyio_record_copy_args(vir, r, 3, 3, 4, 5);
} else if (strcmp(func_name, "MPI_Issend") == 0 ||
strcmp(func_name, "MPI_Isend") == 0) {
strcmp(func_name, "MPI_Isend") == 0) {
// dst, tag, comm, req
verifyio_record_copy_args(vir, r, 4, 3, 4, 5, 6);
} else if (strcmp(func_name, "MPI_Recv") == 0) {
Expand Down Expand Up @@ -681,7 +678,7 @@ int create_verifyio_record(RecorderReader* reader, Record* r, VerifyIORecord* vi
// comm
verifyio_record_copy_args(vir, r, 1, 6);
} else if (strcmp(func_name, "MPI_Allreduce") == 0 ||
strcmp(func_name, "MPI_Reduce_scatter") == 0) {
strcmp(func_name, "MPI_Reduce_scatter") == 0) {
// comm
verifyio_record_copy_args(vir, r, 1, 5);
} else if (strcmp(func_name, "MPI_Allgatherv") == 0) {
Expand All @@ -700,7 +697,7 @@ int create_verifyio_record(RecorderReader* reader, Record* r, VerifyIORecord* vi
// comm, local_rank
verifyio_record_copy_args(vir, r, 2, 5, 6);
} else if ((strcmp(func_name, "MPI_Cart_sub") == 0) ||
(strcmp(func_name, "MPI_Comm_create") == 0)) {
(strcmp(func_name, "MPI_Comm_create") == 0)) {
// comm, local_rank
verifyio_record_copy_args(vir, r, 2, 2, 3);
} else if ((strcmp(func_name, "MPI_Waitall")) == 0) {
Expand All @@ -720,14 +717,14 @@ int create_verifyio_record(RecorderReader* reader, Record* r, VerifyIORecord* vi
// only keep needed *write* *read* POSIX calls
// additionally, MPI-IO often uses fcntl to lock files.
if (strstr(func_name, "write") ||
strstr(func_name, "read") ||
strstr(func_name, "fcntl")) {
strstr(func_name, "read") ||
strstr(func_name, "fcntl")) {
verifyio_record_copy_args(vir, r, 1, 0);
} else if (strstr(func_name, "fsync") ||
strstr(func_name, "open") ||
strstr(func_name, "close") ||
strstr(func_name, "fopen") ||
strstr(func_name, "fclose")) {
strstr(func_name, "open") ||
strstr(func_name, "close") ||
strstr(func_name, "fopen") ||
strstr(func_name, "fclose")) {
verifyio_record_copy_args(vir, r, 1, 0);
//included = 0;
}
Expand Down
Loading
Loading