@@ -134,6 +134,203 @@ lnet_build_msg_event(lnet_msg_t *msg, lnet_event_kind_t ev_type)
}
}
void
lnet_msg_commit (lnet_msg_t *msg, int sending)
{
struct lnet_msg_container *container = &the_lnet.ln_msg_container ;
lnet_counters_t *counters = &the_lnet.ln_counters ;
/* routed message can be committed for both receiving and sending */
LASSERT (!msg->msg_tx_committed );
if (msg->msg_rx_committed ) { /* routed message, or reply for GET */
LASSERT (sending);
LASSERT (msg->msg_onactivelist );
msg->msg_tx_committed = 1 ;
return ;
}
LASSERT (!msg->msg_onactivelist );
msg->msg_onactivelist = 1 ;
cfs_list_add (&msg->msg_activelist , &container->msc_active );
counters->msgs_alloc ++;
if (counters->msgs_alloc > counters->msgs_max )
counters->msgs_max = counters->msgs_alloc ;
if (sending)
msg->msg_tx_committed = 1 ;
else
msg->msg_rx_committed = 1 ;
}
static void
lnet_msg_tx_decommit (lnet_msg_t *msg, int status)
{
lnet_counters_t *counters = &the_lnet.ln_counters ;
lnet_event_t *ev = &msg->msg_ev ;
LASSERT (msg->msg_tx_committed );
if (status != 0 )
goto out;
switch (ev->type ) {
default : /* routed message */
LASSERT (msg->msg_routing );
LASSERT (msg->msg_rx_committed );
LASSERT (ev->type == 0 );
counters->route_length += msg->msg_len ;
counters->route_count ++;
goto out;
case LNET_EVENT_PUT:
/* should have been decommitted */
LASSERT (!msg->msg_rx_committed );
/* overwritten while sending ACK */
LASSERT (msg->msg_type == LNET_MSG_ACK);
msg->msg_type = LNET_MSG_PUT; /* fix type */
break ;
case LNET_EVENT_SEND:
LASSERT (!msg->msg_rx_committed );
if (msg->msg_type == LNET_MSG_PUT)
counters->send_length += msg->msg_len ;
break ;
case LNET_EVENT_GET:
LASSERT (msg->msg_rx_committed );
/* overwritten while sending reply */
LASSERT (msg->msg_type == LNET_MSG_REPLY);
msg->msg_type = LNET_MSG_GET; /* fix type */
counters->send_length += msg->msg_len ;
break ;
}
counters->send_count ++;
out:
lnet_return_tx_credits_locked (msg);
msg->msg_tx_committed = 0 ;
}
static void
lnet_msg_rx_decommit (lnet_msg_t *msg, int status)
{
lnet_counters_t *counters = &the_lnet.ln_counters ;
lnet_event_t *ev = &msg->msg_ev ;
LASSERT (!msg->msg_tx_committed ); /* decommitted or uncommitted */
LASSERT (msg->msg_rx_committed );
if (status != 0 )
goto out;
switch (ev->type ) {
default :
LASSERT (ev->type == 0 );
LASSERT (msg->msg_routing );
goto out;
case LNET_EVENT_ACK:
LASSERT (msg->msg_type == LNET_MSG_ACK);
break ;
case LNET_EVENT_GET:
LASSERT (msg->msg_type == LNET_MSG_GET);
break ;
case LNET_EVENT_PUT:
LASSERT (msg->msg_type == LNET_MSG_PUT);
break ;
case LNET_EVENT_REPLY:
LASSERT (msg->msg_type == LNET_MSG_REPLY ||
msg->msg_type == LNET_MSG_GET); /* optimized GET */
break ;
}
counters->recv_count ++;
if (ev->type == LNET_EVENT_PUT || ev->type == LNET_EVENT_REPLY)
counters->recv_length += msg->msg_wanted ;
out:
lnet_return_rx_credits_locked (msg);
msg->msg_rx_committed = 0 ;
}
void
lnet_msg_decommit (lnet_msg_t *msg, int status)
{
lnet_counters_t *counters = &the_lnet.ln_counters ;
LASSERT (msg->msg_tx_committed || msg->msg_rx_committed );
LASSERT (msg->msg_onactivelist );
if (msg->msg_tx_committed ) /* always decommit for sending first */
lnet_msg_tx_decommit (msg, status);
if (msg->msg_rx_committed )
lnet_msg_rx_decommit (msg, status);
cfs_list_del (&msg->msg_activelist );
msg->msg_onactivelist = 0 ;
counters->msgs_alloc --;
}
void
lnet_msg_attach_md (lnet_msg_t *msg, lnet_libmd_t *md,
unsigned int offset, unsigned int mlen)
{
/* Here, we attach the MD on lnet_msg and mark it busy and
* decrementing its threshold. Come what may, the lnet_msg "owns"
* the MD until a call to lnet_msg_detach_md or lnet_finalize()
* signals completion. */
LASSERT (!msg->msg_routing );
msg->msg_md = md;
if (msg->msg_receiving ) { /* commited for receiving */
msg->msg_offset = offset;
msg->msg_wanted = mlen;
}
md->md_refcount ++;
if (md->md_threshold != LNET_MD_THRESH_INF) {
LASSERT (md->md_threshold > 0 );
md->md_threshold --;
}
/* build umd in event */
lnet_md2handle (&msg->msg_ev .md_handle , md);
lnet_md_deconstruct (md, &msg->msg_ev .md );
}
void
lnet_msg_detach_md (lnet_msg_t *msg, int status)
{
lnet_libmd_t *md = msg->msg_md ;
int unlink ;
if (md == NULL )
return ;
/* Now it's safe to drop my caller's ref */
md->md_refcount --;
LASSERT (md->md_refcount >= 0 );
unlink = lnet_md_unlinkable (md);
if (md->md_eq != NULL ) {
msg->msg_ev .status = status;
msg->msg_ev .unlinked = unlink ;
lnet_eq_enqueue_event (md->md_eq , &msg->msg_ev );
}
if (unlink )
lnet_md_unlink (md);
msg->msg_md = NULL ;
}
void
lnet_complete_msg_locked (lnet_msg_t *msg)
{
@@ -146,7 +343,7 @@ lnet_complete_msg_locked(lnet_msg_t *msg)
if (status == 0 && msg->msg_ack ) {
/* Only send an ACK if the PUT completed successfully */
lnet_return_credits_locked (msg);
lnet_msg_decommit (msg, 0 );
msg->msg_ack = 0 ;
LNET_UNLOCK ();
@@ -183,12 +380,7 @@ lnet_complete_msg_locked(lnet_msg_t *msg)
return ;
}
lnet_return_credits_locked (msg);
LASSERT (msg->msg_onactivelist );
msg->msg_onactivelist = 0 ;
cfs_list_del (&msg->msg_activelist );
the_lnet.ln_counters .msgs_alloc --;
lnet_msg_decommit (msg, status);
lnet_msg_free_locked (msg);
}
@@ -197,7 +389,6 @@ void
lnet_finalize (lnet_ni_t *ni, lnet_msg_t *msg, int status)
{
struct lnet_msg_container *container;
lnet_libmd_t *md;
int my_slot;
int i;
@@ -228,26 +419,16 @@ lnet_finalize (lnet_ni_t *ni, lnet_msg_t *msg, int status)
msg->msg_ev .status = status;
md = msg->msg_md ;
if (md != NULL ) {
int unlink ;
/* Now it's safe to drop my caller's ref */
md->md_refcount --;
LASSERT (md->md_refcount >= 0 );
unlink = lnet_md_unlinkable (md);
msg->msg_ev .unlinked = unlink ;
if (md->md_eq != NULL )
lnet_eq_enqueue_event (md->md_eq , &msg->msg_ev );
if (msg->msg_md != NULL )
lnet_msg_detach_md (msg, status);
if (unlink )
lnet_md_unlink (md);
msg->msg_md = NULL ;
}
if (!msg->msg_tx_committed && !msg->msg_rx_committed ) {
LNET_UNLOCK ();
/* not commited to network yet */
LASSERT (!msg->msg_onactivelist );
lnet_msg_free (msg);
return ;
}
container = &the_lnet.ln_msg_container ;
cfs_list_add_tail (&msg->msg_list , &container->msc_finalizing );