Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
144 changes: 73 additions & 71 deletions ompi/mca/fcoll/dynamic_gen2/fcoll_dynamic_gen2_file_write_all.c
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@

#define DEBUG_ON 0
#define FCOLL_DYNAMIC_GEN2_SHUFFLE_TAG 123
#define INIT_LEN 10

/*Used for loading file-offsets per aggregator*/
typedef struct mca_io_ompio_local_io_array{
Expand All @@ -55,13 +56,11 @@ typedef struct mca_io_ompio_aggregator_data {
int current_index, current_position;
int bytes_to_write_in_cycle, bytes_remaining, procs_per_group;
int *procs_in_group, iov_index;
bool sendbuf_is_contiguous, prev_sendbuf_is_contiguous;
int bytes_sent, prev_bytes_sent;
struct iovec *decoded_iov;
int bytes_to_write, prev_bytes_to_write;
mca_io_ompio_io_array_t *io_array, *prev_io_array;
int num_io_entries, prev_num_io_entries;
char *send_buf, *prev_send_buf;
} mca_io_ompio_aggregator_data;


Expand All @@ -76,9 +75,7 @@ typedef struct mca_io_ompio_aggregator_data {
for (_i=0; _i<_num; _i++ ) { \
_aggr[_i]->prev_io_array=_aggr[_i]->io_array; \
_aggr[_i]->prev_num_io_entries=_aggr[_i]->num_io_entries; \
_aggr[_i]->prev_send_buf=_aggr[_i]->send_buf; \
_aggr[_i]->prev_bytes_sent=_aggr[_i]->bytes_sent; \
_aggr[_i]->prev_sendbuf_is_contiguous=_aggr[_i]->sendbuf_is_contiguous; \
_aggr[_i]->prev_bytes_to_write=_aggr[_i]->bytes_to_write; \
_t=_aggr[_i]->prev_global_buf; \
_aggr[_i]->prev_global_buf=_aggr[_i]->global_buf; \
Expand Down Expand Up @@ -229,8 +226,6 @@ int mca_fcoll_dynamic_gen2_file_write_all (mca_io_ompio_file_t *fh,
aggr_data[i]->procs_in_group = fh->f_procs_in_group;
aggr_data[i]->comm = fh->f_comm;
aggr_data[i]->buf = (char *)buf; // should not be used in the new version.
aggr_data[i]->sendbuf_is_contiguous = false; //safe assumption for right now
aggr_data[i]->prev_sendbuf_is_contiguous = false; //safe assumption for right now
}

/*********************************************************************
Expand Down Expand Up @@ -611,10 +606,6 @@ int mca_fcoll_dynamic_gen2_file_write_all (mca_io_ompio_file_t *fh,
end_write_time = MPI_Wtime();
write_time += end_write_time - start_write_time;
#endif

if (!aggr_data[i]->prev_sendbuf_is_contiguous && aggr_data[i]->prev_bytes_sent) {
free (aggr_data[i]->prev_send_buf);
}
}

} /* end for (index = 0; index < cycles; index++) */
Expand Down Expand Up @@ -644,10 +635,6 @@ int mca_fcoll_dynamic_gen2_file_write_all (mca_io_ompio_file_t *fh,
end_write_time = MPI_Wtime();
write_time += end_write_time - start_write_time;
#endif

if (!aggr_data[i]->prev_sendbuf_is_contiguous && aggr_data[i]->prev_bytes_sent) {
free (aggr_data[i]->prev_send_buf);
}
}
}

Expand Down Expand Up @@ -785,7 +772,6 @@ static int shuffle_init ( int index, int cycles, int aggregator, int rank, mca_i
data->num_io_entries = 0;
data->bytes_sent = 0;
data->io_array=NULL;
data->send_buf=NULL;
/**********************************************************************
*** 7a. Getting ready for next cycle: initializing and freeing buffers
**********************************************************************/
Expand Down Expand Up @@ -1143,73 +1129,89 @@ static int shuffle_init ( int index, int cycles, int aggregator, int rank, mca_i
}
} /* end if (entries_per_aggr > 0 ) */
}/* end if (aggregator == rank ) */

if ( data->sendbuf_is_contiguous ) {
data->send_buf = &((char*)data->buf)[data->total_bytes_written];
}
else if (bytes_sent) {
/* allocate a send buffer and copy the data that needs
to be sent into it in case the data is non-contigous
in memory */
ptrdiff_t mem_address;
size_t remaining = 0;
size_t temp_position = 0;

data->send_buf = malloc (bytes_sent);
if (NULL == data->send_buf) {
opal_output (1, "OUT OF MEMORY\n");
ret = OMPI_ERR_OUT_OF_RESOURCE;
goto exit;

if (bytes_sent) {
size_t remaining = bytes_sent;
int block_index = -1;
int blocklength_size = INIT_LEN;

ptrdiff_t send_mem_address = NULL;
ompi_datatype_t *newType = MPI_DATATYPE_NULL;
int* blocklength_proc = (int *) calloc (blocklength_size, sizeof (int));
ptrdiff_t* displs_proc = (ptrdiff_t *) calloc (blocklength_size, sizeof (ptrdiff_t));

if (NULL == blocklength_proc || NULL == displs_proc ) {
opal_output (1, "OUT OF MEMORY\n");
ret = OMPI_ERR_OUT_OF_RESOURCE;
goto exit;
}

while (remaining) {
block_index++;

if(0 == block_index) {
send_mem_address = (ptrdiff_t) (data->decoded_iov[data->iov_index].iov_base) +
data->current_position;
}

remaining = bytes_sent;

while (remaining) {
mem_address = (ptrdiff_t)
(data->decoded_iov[data->iov_index].iov_base) + data->current_position;

if (remaining >=
(data->decoded_iov[data->iov_index].iov_len - data->current_position)) {
memcpy (data->send_buf+temp_position,
(IOVBASE_TYPE *)mem_address,
data->decoded_iov[data->iov_index].iov_len - data->current_position);
remaining = remaining -
(data->decoded_iov[data->iov_index].iov_len - data->current_position);
temp_position = temp_position +
(data->decoded_iov[data->iov_index].iov_len - data->current_position);
data->iov_index = data->iov_index + 1;
data->current_position = 0;
}
else {
memcpy (data->send_buf+temp_position,
(IOVBASE_TYPE *) mem_address,
remaining);
data->current_position += remaining;
remaining = 0;
}
else {
// Reallocate more memory if blocklength_size is not enough
if(0 == block_index % INIT_LEN) {
blocklength_size += INIT_LEN;
blocklength_proc = (int *) realloc(blocklength_proc, blocklength_size * sizeof(int));
displs_proc = (ptrdiff_t *) realloc(displs_proc, blocklength_size * sizeof(ptrdiff_t));
}
displs_proc[block_index] = (ptrdiff_t) (data->decoded_iov[data->iov_index].iov_base) +
data->current_position - send_mem_address;
}
}
data->total_bytes_written += bytes_sent;
data->bytes_sent = bytes_sent;
/* Gather the sendbuf from each process in appropritate locations in
aggregators*/

if (bytes_sent){
ret = MCA_PML_CALL(isend(data->send_buf,
bytes_sent,
MPI_BYTE,

if (remaining >=
(data->decoded_iov[data->iov_index].iov_len - data->current_position)) {

blocklength_proc[block_index] = data->decoded_iov[data->iov_index].iov_len -
data->current_position;
remaining = remaining -
(data->decoded_iov[data->iov_index].iov_len - data->current_position);
data->iov_index = data->iov_index + 1;
data->current_position = 0;
}
else {
blocklength_proc[block_index] = remaining;
data->current_position += remaining;
remaining = 0;
}
}

data->total_bytes_written += bytes_sent;
data->bytes_sent = bytes_sent;

if ( 0 <= block_index ) {
ompi_datatype_create_hindexed(block_index+1,
blocklength_proc,
displs_proc,
MPI_BYTE,
&newType);
ompi_datatype_commit(&newType);

ret = MCA_PML_CALL(isend((char *)send_mem_address,
1,
newType,
aggregator,
FCOLL_DYNAMIC_GEN2_SHUFFLE_TAG+index,
MCA_PML_BASE_SEND_STANDARD,
data->comm,
&reqs[data->procs_per_group]));


if ( OMPI_SUCCESS != ret ){
if (OMPI_SUCCESS != ret){
goto exit;
}
}
if ( MPI_DATATYPE_NULL != newType ) {
ompi_datatype_destroy(&newType);
}

free(blocklength_proc);
free(displs_proc);
}


#if DEBUG_ON
if (aggregator == rank){
Expand Down