Skip to content
Browse files

LWKT message ports contain a number of function pointers which abstract

their backend operation.

* Add a new function, mp_getport(), which takes over the functionality
  of lwkt_getport().

* Formalize the default backend and rename it the 'thread' port backend, used
  when a message port will only be drained by a single thread.  This backend
  is able to use critical sections and IPI messages to handle races.

* Fix a small timing window in the thread port backend where replying a
  synchronous message request from a different cpu may fail to wake up
  the originator who is waiting for the message completion.

* Abstract-out the message port initialization code and clean up related
  code pollution.

* Add a new backend called the 'spin' port backend.  This backend can be
  used if a message port might be drained by several different threads.
  For example, this would allow us to use a message port as part of a
  file pointer / file descriptor construct.

* Add a boot-time tunable, lwkt.use_spin_port (defaults to off) which
  forces spin ports to be used instead of thread ports for the per-thread
  message port.  This is used only for debugging.
  • Loading branch information...
1 parent f4cf4cb commit fb0f29c4a99dddd488cd40818c57b035b5406d84 Matthew Dillon committed May 24, 2007
View
5 sys/dev/acpica5/Osd/OsdSchedule.c
@@ -25,7 +25,7 @@
* SUCH DAMAGE.
*
* $FreeBSD: src/sys/dev/acpica/Osd/OsdSchedule.c,v 1.28 2004/05/06 02:18:58 njl Exp $
- * $DragonFly: src/sys/dev/acpica5/Osd/OsdSchedule.c,v 1.8 2007/05/23 08:57:10 dillon Exp $
+ * $DragonFly: src/sys/dev/acpica5/Osd/OsdSchedule.c,v 1.9 2007/05/24 05:51:28 dillon Exp $
*/
/*
@@ -80,8 +80,7 @@ struct lwkt_port acpi_afree_rport;
int
acpi_task_thread_init(void)
{
- lwkt_initport(&acpi_afree_rport, NULL);
- acpi_afree_rport.mp_replyport = acpi_autofree_reply;
+ lwkt_initport_replyonly(&acpi_afree_rport, acpi_autofree_reply);
kthread_create(acpi_task_thread, NULL, &acpi_task_td,
0, 0, "acpi_task");
return (0);
View
581 sys/kern/lwkt_msgport.c
@@ -34,7 +34,7 @@
* NOTE! This file may be compiled for userland libraries as well as for
* the kernel.
*
- * $DragonFly: src/sys/kern/lwkt_msgport.c,v 1.40 2007/05/23 08:57:04 dillon Exp $
+ * $DragonFly: src/sys/kern/lwkt_msgport.c,v 1.41 2007/05/24 05:51:27 dillon Exp $
*/
#ifdef _KERNEL
@@ -64,6 +64,7 @@
#include <sys/thread2.h>
#include <sys/msgport2.h>
+#include <sys/spinlock2.h>
#include <machine/stdarg.h>
#include <machine/cpufunc.h>
@@ -94,11 +95,6 @@ MALLOC_DEFINE(M_LWKTMSG, "lwkt message", "lwkt message");
* MESSAGE FUNCTIONS *
************************************************************************/
-#ifdef SMP
-static void lwkt_replyport_remote(lwkt_msg_t msg);
-static void lwkt_putport_remote(lwkt_msg_t msg);
-#endif
-
/*
* lwkt_sendmsg()
*
@@ -149,36 +145,173 @@ lwkt_domsg(lwkt_port_t port, lwkt_msg_t msg)
return(error);
}
+/*
+ * lwkt_forwardmsg()
+ *
+ * Forward a message received on one port to another port.
+ */
+int
+lwkt_forwardmsg(lwkt_port_t port, lwkt_msg_t msg)
+{
+ int error;
+
+ crit_enter();
+ KKASSERT((msg->ms_flags & (MSGF_QUEUED|MSGF_DONE|MSGF_REPLY)) == 0);
+ if ((error = port->mp_putport(port, msg)) != EASYNC)
+ lwkt_replymsg(msg, error);
+ crit_exit();
+ return(error);
+}
+
+/*
+ * lwkt_abortmsg()
+ *
+ * Attempt to abort a message. This only works if MSGF_ABORTABLE is set.
+ * The caller must ensure that the message will not be both replied AND
+ * destroyed while the abort is in progress.
+ *
+ * This function issues a callback which might block!
+ */
+void
+lwkt_abortmsg(lwkt_msg_t msg)
+{
+ /*
+ * A critical section protects us from reply IPIs on this cpu.
+ */
+ crit_enter();
+
+ /*
+ * Shortcut the operation if the message has already been returned.
+ * The callback typically constructs a lwkt_msg with the abort request,
+ * issues it synchronously, and waits for completion. The callback
+ * is not required to actually abort the message and the target port,
+ * upon receiving an abort request message generated by the callback
+ * should check whether the original message has already completed or
+ * not.
+ */
+ if (msg->ms_flags & MSGF_ABORTABLE) {
+ if ((msg->ms_flags & (MSGF_DONE|MSGF_REPLY)) == 0)
+ msg->ms_abortfn(msg);
+ }
+ crit_exit();
+}
+
/************************************************************************
- * PORT FUNCTIONS *
+ * PORT INITIALIZATION API *
************************************************************************/
+static void *lwkt_thread_getport(lwkt_port_t port);
+static int lwkt_thread_putport(lwkt_port_t port, lwkt_msg_t msg);
+static void *lwkt_thread_waitport(lwkt_port_t port, lwkt_msg_t msg);
+static void lwkt_thread_replyport(lwkt_port_t port, lwkt_msg_t msg);
+
+static void *lwkt_spin_getport(lwkt_port_t port);
+static int lwkt_spin_putport(lwkt_port_t port, lwkt_msg_t msg);
+static void *lwkt_spin_waitport(lwkt_port_t port, lwkt_msg_t msg);
+static void lwkt_spin_replyport(lwkt_port_t port, lwkt_msg_t msg);
+
+static void lwkt_null_replyport(lwkt_port_t port, lwkt_msg_t msg);
+static void *lwkt_panic_getport(lwkt_port_t port);
+static int lwkt_panic_putport(lwkt_port_t port, lwkt_msg_t msg);
+static void *lwkt_panic_waitport(lwkt_port_t port, lwkt_msg_t msg);
+static void lwkt_panic_replyport(lwkt_port_t port, lwkt_msg_t msg);
+
/*
- * lwkt_initport()
- *
- * Initialize a port for use and assign it to the specified thread.
- * The default reply function is to return the message to the originator.
+ * Core port initialization (internal)
*/
+static __inline
void
-lwkt_initport(lwkt_port_t port, thread_t td)
+_lwkt_initport(lwkt_port_t port,
+ void *(*gportfn)(lwkt_port_t),
+ int (*pportfn)(lwkt_port_t, lwkt_msg_t),
+ void *(*wportfn)(lwkt_port_t, lwkt_msg_t),
+ void (*rportfn)(lwkt_port_t, lwkt_msg_t))
{
bzero(port, sizeof(*port));
TAILQ_INIT(&port->mp_msgq);
- port->mp_td = td;
- port->mp_putport = lwkt_default_putport;
- port->mp_waitport = lwkt_default_waitport;
- port->mp_replyport = lwkt_default_replyport;
+ port->mp_getport = gportfn;
+ port->mp_putport = pportfn;
+ port->mp_waitport = wportfn;
+ port->mp_replyport = rportfn;
+}
+
+/*
+ * lwkt_initport_thread()
+ *
+ * Initialize a port for use by a particular thread. The port may
+ * only be used by <td>.
+ */
+void
+lwkt_initport_thread(lwkt_port_t port, thread_t td)
+{
+ _lwkt_initport(port,
+ lwkt_thread_getport,
+ lwkt_thread_putport,
+ lwkt_thread_waitport,
+ lwkt_thread_replyport);
+ port->mpu_td = td;
+}
+
+/*
+ * lwkt_initport_spin()
+ *
+ * Initialize a port for use with descriptors that might be accessed
+ * via multiple LWPs, processes, or threads. Has somewhat more
+ * overhead then thread ports.
+ */
+void
+lwkt_initport_spin(lwkt_port_t port)
+{
+ _lwkt_initport(port,
+ lwkt_spin_getport,
+ lwkt_spin_putport,
+ lwkt_spin_waitport,
+ lwkt_spin_replyport);
+ spin_init(&port->mpu_spin);
}
/*
* Similar to the standard initport, this function simply marks the message
* as being done and does not attempt to return it to an originating port.
*/
void
-lwkt_initport_null_rport(lwkt_port_t port, thread_t td)
+lwkt_initport_replyonly_null(lwkt_port_t port)
{
- lwkt_initport(port, td);
- port->mp_replyport = lwkt_null_replyport;
+ _lwkt_initport(port,
+ lwkt_panic_getport,
+ lwkt_panic_putport,
+ lwkt_panic_waitport,
+ lwkt_null_replyport);
+}
+
+/*
+ * Initialize a reply-only port, typically used as a message sink. Such
+ * ports can only be used as a reply port.
+ */
+void
+lwkt_initport_replyonly(lwkt_port_t port,
+ void (*rportfn)(lwkt_port_t, lwkt_msg_t))
+{
+ _lwkt_initport(port, lwkt_panic_getport, lwkt_panic_putport,
+ lwkt_panic_waitport, rportfn);
+}
+
+void
+lwkt_initport_putonly(lwkt_port_t port,
+ int (*pportfn)(lwkt_port_t, lwkt_msg_t))
+{
+ _lwkt_initport(port, lwkt_panic_getport, pportfn,
+ lwkt_panic_waitport, lwkt_panic_replyport);
+}
+
+void
+lwkt_initport_panic(lwkt_port_t port)
+{
+ _lwkt_initport(port,
+ lwkt_panic_getport,
+ lwkt_panic_putport,
+ lwkt_panic_waitport,
+ lwkt_panic_replyport);
}
/*
@@ -202,19 +335,16 @@ _lwkt_pullmsg(lwkt_port_t port, lwkt_msg_t msg)
msg->ms_flags &= ~MSGF_QUEUED;
}
-void *
-lwkt_getport(lwkt_port_t port)
-{
- lwkt_msg_t msg;
-
- KKASSERT(port->mp_td == curthread);
-
- crit_enter_quick(port->mp_td);
- if ((msg = TAILQ_FIRST(&port->mp_msgq)) != NULL)
- _lwkt_pullmsg(port, msg);
- crit_exit_quick(port->mp_td);
- return(msg);
-}
+/************************************************************************
+ * THREAD PORT BACKEND *
+ ************************************************************************
+ *
+ * This backend is used when the port a message is retrieved from is owned
+ * by a single thread (the calling thread). Messages are IPId to the
+ * correct cpu before being enqueued to a port. Note that this is fairly
+ * optimal since scheduling would have had to do an IPI anyway if the
+ * message were headed to a different cpu.
+ */
#ifdef SMP
@@ -224,84 +354,108 @@ lwkt_getport(lwkt_port_t port)
*/
static
void
-lwkt_replyport_remote(lwkt_msg_t msg)
+lwkt_thread_replyport_remote(lwkt_msg_t msg)
{
lwkt_port_t port = msg->ms_reply_port;
+ /*
+ * Chase any thread migration that occurs
+ */
+ if (port->mpu_td->td_gd != mycpu) {
+ lwkt_send_ipiq(port->mpu_td->td_gd,
+ (ipifunc1_t)lwkt_thread_replyport_remote, msg);
+ return;
+ }
+
+ /*
+ * Cleanup
+ */
#ifdef INVARIANTS
KKASSERT(msg->ms_flags & MSGF_INTRANSIT);
msg->ms_flags &= ~MSGF_INTRANSIT;
#endif
- TAILQ_INSERT_TAIL(&port->mp_msgq, msg, ms_node);
- msg->ms_flags |= MSGF_REPLY | MSGF_DONE | MSGF_QUEUED;
+ if (msg->ms_flags & MSGF_SYNC) {
+ msg->ms_flags |= MSGF_REPLY | MSGF_DONE;
+ } else {
+ TAILQ_INSERT_TAIL(&port->mp_msgq, msg, ms_node);
+ msg->ms_flags |= MSGF_REPLY | MSGF_DONE | MSGF_QUEUED;
+ }
if (port->mp_flags & MSGPORTF_WAITING)
- lwkt_schedule(port->mp_td);
+ lwkt_schedule(port->mpu_td);
}
#endif
/*
- * lwkt_default_replyport() - Backend to lwkt_replymsg()
+ * lwkt_thread_replyport() - Backend to lwkt_replymsg()
*
* Called with the reply port as an argument but in the context of the
- * original target port.
+ * original target port. Completion must occur on the target port's
+ * cpu.
*
* The critical section protects us from IPIs on the this CPU.
*/
void
-lwkt_default_replyport(lwkt_port_t port, lwkt_msg_t msg)
+lwkt_thread_replyport(lwkt_port_t port, lwkt_msg_t msg)
{
KKASSERT((msg->ms_flags & (MSGF_DONE|MSGF_QUEUED)) == 0);
- crit_enter();
if (msg->ms_flags & MSGF_SYNC) {
/*
* If a synchronous completion has been requested, just wakeup
* the message without bothering to queue it to the target port.
+ *
+ * Assume the target thread is non-preemptive, so no critical
+ * section is required.
*/
- msg->ms_flags |= MSGF_DONE | MSGF_REPLY;
- if (port->mp_flags & MSGPORTF_WAITING)
- lwkt_schedule(port->mp_td);
+#ifdef SMP
+ if (port->mpu_td->td_gd == mycpu) {
+#endif
+ msg->ms_flags |= MSGF_DONE | MSGF_REPLY;
+ if (port->mp_flags & MSGPORTF_WAITING)
+ lwkt_schedule(port->mpu_td);
+#ifdef SMP
+ } else {
+#ifdef INVARIANTS
+ msg->ms_flags |= MSGF_INTRANSIT;
+#endif
+ msg->ms_flags |= MSGF_REPLY;
+ lwkt_send_ipiq(port->mpu_td->td_gd,
+ (ipifunc1_t)lwkt_thread_replyport_remote, msg);
+ }
+#endif
} else {
/*
* If an asynchronous completion has been requested the message
* must be queued to the reply port. MSGF_REPLY cannot be set
* until the message actually gets queued.
+ *
+ * A critical section is required to interlock the port queue.
*/
#ifdef SMP
- if (port->mp_td->td_gd == mycpu) {
+ if (port->mpu_td->td_gd == mycpu) {
#endif
+ crit_enter();
TAILQ_INSERT_TAIL(&port->mp_msgq, msg, ms_node);
msg->ms_flags |= MSGF_REPLY | MSGF_DONE | MSGF_QUEUED;
if (port->mp_flags & MSGPORTF_WAITING)
- lwkt_schedule(port->mp_td);
+ lwkt_schedule(port->mpu_td);
+ crit_exit();
#ifdef SMP
} else {
#ifdef INVARIANTS
msg->ms_flags |= MSGF_INTRANSIT;
#endif
msg->ms_flags |= MSGF_REPLY;
- lwkt_send_ipiq(port->mp_td->td_gd,
- (ipifunc1_t)lwkt_replyport_remote, msg);
+ lwkt_send_ipiq(port->mpu_td->td_gd,
+ (ipifunc1_t)lwkt_thread_replyport_remote, msg);
}
#endif
}
- crit_exit();
-}
-
-/*
- * You can point a port's reply vector at this function if you just want
- * the message marked done, without any queueing or signaling. This is
- * often used for structure-embedded messages.
- */
-void
-lwkt_null_replyport(lwkt_port_t port, lwkt_msg_t msg)
-{
- msg->ms_flags |= MSGF_DONE | MSGF_REPLY;
}
/*
- * lwkt_default_putport() - Backend to lwkt_beginmsg()
+ * lwkt_thread_putport() - Backend to lwkt_beginmsg()
*
* Called with the target port as an argument but in the context of the
* reply port. This function always implements an asynchronous put to
@@ -314,103 +468,84 @@ lwkt_null_replyport(lwkt_port_t port, lwkt_msg_t msg)
static
void
-lwkt_putport_remote(lwkt_msg_t msg)
+lwkt_thread_putport_remote(lwkt_msg_t msg)
{
lwkt_port_t port = msg->ms_target_port;
+ /*
+ * Chase any thread migration that occurs
+ */
+ if (port->mpu_td->td_gd != mycpu) {
+ lwkt_send_ipiq(port->mpu_td->td_gd,
+ (ipifunc1_t)lwkt_thread_putport_remote, msg);
+ return;
+ }
+
+ /*
+ * Cleanup
+ */
#ifdef INVARIANTS
KKASSERT(msg->ms_flags & MSGF_INTRANSIT);
msg->ms_flags &= ~MSGF_INTRANSIT;
#endif
TAILQ_INSERT_TAIL(&port->mp_msgq, msg, ms_node);
msg->ms_flags |= MSGF_QUEUED;
if (port->mp_flags & MSGPORTF_WAITING)
- lwkt_schedule(port->mp_td);
+ lwkt_schedule(port->mpu_td);
}
#endif
+static
int
-lwkt_default_putport(lwkt_port_t port, lwkt_msg_t msg)
+lwkt_thread_putport(lwkt_port_t port, lwkt_msg_t msg)
{
KKASSERT((msg->ms_flags & (MSGF_DONE | MSGF_REPLY)) == 0);
msg->ms_target_port = port;
- crit_enter();
#ifdef SMP
- if (port->mp_td->td_gd == mycpu) {
+ if (port->mpu_td->td_gd == mycpu) {
#endif
+ crit_enter();
msg->ms_flags |= MSGF_QUEUED;
TAILQ_INSERT_TAIL(&port->mp_msgq, msg, ms_node);
if (port->mp_flags & MSGPORTF_WAITING)
- lwkt_schedule(port->mp_td);
+ lwkt_schedule(port->mpu_td);
+ crit_exit();
#ifdef SMP
} else {
#ifdef INVARIANTS
msg->ms_flags |= MSGF_INTRANSIT;
#endif
- lwkt_send_ipiq(port->mp_td->td_gd,
- (ipifunc1_t)lwkt_putport_remote, msg);
+ lwkt_send_ipiq(port->mpu_td->td_gd,
+ (ipifunc1_t)lwkt_thread_putport_remote, msg);
}
#endif
- crit_exit();
return (EASYNC);
}
/*
- * lwkt_forwardmsg()
+ * lwkt_thread_getport()
*
- * Forward a message received on one port to another port.
+ * Retrieve the next message from the port or NULL if no messages
+ * are ready.
*/
-int
-lwkt_forwardmsg(lwkt_port_t port, lwkt_msg_t msg)
-{
- int error;
-
- crit_enter();
- KKASSERT((msg->ms_flags & (MSGF_QUEUED|MSGF_DONE|MSGF_REPLY)) == 0);
- if ((error = port->mp_putport(port, msg)) != EASYNC)
- lwkt_replymsg(msg, error);
- crit_exit();
- return(error);
-}
-
-/*
- * lwkt_abortmsg()
- *
- * Attempt to abort a message. This only works if MSGF_ABORTABLE is set.
- * The caller must ensure that the message will not be both replied AND
- * destroyed while the abort is in progress.
- *
- * This function issues a callback which might block!
- */
-
-void
-lwkt_abortmsg(lwkt_msg_t msg)
+void *
+lwkt_thread_getport(lwkt_port_t port)
{
- /*
- * A critical section protects us from reply IPIs on this cpu.
- */
- crit_enter();
+ lwkt_msg_t msg;
- /*
- * Shortcut the operation if the message has already been returned.
- * The callback typically constructs a lwkt_msg with the abort request,
- * issues it synchronously, and waits for completion. The callback
- * is not required to actually abort the message and the target port,
- * upon receiving an abort request message generated by the callback
- * should check whether the original message has already completed or
- * not.
- */
- if (msg->ms_flags & MSGF_ABORTABLE) {
- if ((msg->ms_flags & (MSGF_DONE|MSGF_REPLY)) == 0)
- msg->ms_abortfn(msg);
- }
- crit_exit();
+ KKASSERT(port->mpu_td == curthread);
+
+ crit_enter_quick(port->mpu_td);
+ if ((msg = TAILQ_FIRST(&port->mp_msgq)) != NULL)
+ _lwkt_pullmsg(port, msg);
+ crit_exit_quick(port->mpu_td);
+ return(msg);
}
/*
- * lwkt_default_waitport()
+ * lwkt_thread_waitport()
*
* If msg is NULL, dequeue the next message from the port's message
* queue, block until a message is ready. This function never
@@ -426,12 +561,12 @@ lwkt_abortmsg(lwkt_msg_t msg)
* on a single port. The port must be owned by the caller.
*/
void *
-lwkt_default_waitport(lwkt_port_t port, lwkt_msg_t msg)
+lwkt_thread_waitport(lwkt_port_t port, lwkt_msg_t msg)
{
thread_t td = curthread;
int sentabort;
- KKASSERT(port->mp_td == td);
+ KKASSERT(port->mpu_td == td);
crit_enter_quick(td);
if (msg == NULL) {
/*
@@ -463,7 +598,7 @@ lwkt_default_waitport(lwkt_port_t port, lwkt_msg_t msg)
* completion after sending an abort request.
*/
if (msg->ms_flags & MSGF_PCATCH) {
- if (sentabort == 0 && CURSIG(port->mp_td->td_lwp)) {
+ if (sentabort == 0 && CURSIG(port->mpu_td->td_lwp)) {
sentabort = 1;
lwkt_abortmsg(msg);
continue;
@@ -488,12 +623,214 @@ lwkt_default_waitport(lwkt_port_t port, lwkt_msg_t msg)
* Once the MSGF_DONE bit is set, the message is stable. We
* can just check MSGF_QUEUED to determine
*/
- if (msg->ms_flags & MSGF_QUEUED) {
- msg->ms_flags &= ~MSGF_QUEUED;
- TAILQ_REMOVE(&port->mp_msgq, msg, ms_node);
- }
+ if (msg->ms_flags & MSGF_QUEUED)
+ _lwkt_pullmsg(port, msg);
}
crit_exit_quick(td);
return(msg);
}
+/************************************************************************
+ * SPIN PORT BACKEND *
+ ************************************************************************
+ *
+ * This backend uses spinlocks instead of making assumptions about which
+ * thread is accessing the port. It must be used when a port is not owned
+ * by a particular thread. This is less optimal then thread ports but
+ * you don't have a choice if there are multiple threads accessing the port.
+ */
+
+static
+void *
+lwkt_spin_getport(lwkt_port_t port)
+{
+ lwkt_msg_t msg;
+
+ spin_lock_wr(&port->mpu_spin);
+ if ((msg = TAILQ_FIRST(&port->mp_msgq)) != NULL)
+ _lwkt_pullmsg(port, msg);
+ spin_unlock_wr(&port->mpu_spin);
+ return(msg);
+}
+
+static
+int
+lwkt_spin_putport(lwkt_port_t port, lwkt_msg_t msg)
+{
+ int dowakeup;
+
+ KKASSERT((msg->ms_flags & (MSGF_DONE | MSGF_REPLY)) == 0);
+
+ msg->ms_target_port = port;
+ spin_lock_wr(&port->mpu_spin);
+ msg->ms_flags |= MSGF_QUEUED;
+ TAILQ_INSERT_TAIL(&port->mp_msgq, msg, ms_node);
+ dowakeup = 0;
+ if (port->mp_flags & MSGPORTF_WAITING) {
+ port->mp_flags &= ~MSGPORTF_WAITING;
+ dowakeup = 1;
+ }
+ spin_unlock_wr(&port->mpu_spin);
+ if (dowakeup)
+ wakeup(port);
+ return (EASYNC);
+}
+
+static
+void *
+lwkt_spin_waitport(lwkt_port_t port, lwkt_msg_t msg)
+{
+ int sentabort;
+ int error;
+
+ spin_lock_wr(&port->mpu_spin);
+ if (msg == NULL) {
+ /*
+ * Wait for any message
+ */
+ while ((msg = TAILQ_FIRST(&port->mp_msgq)) == NULL) {
+ port->mp_flags |= MSGPORTF_WAITING;
+ msleep(port, &port->mpu_spin, 0, "wport", 0);
+ /* see note at the top on the MSGPORTF_WAITING flag */
+ }
+ _lwkt_pullmsg(port, msg);
+ } else {
+ /*
+ * Wait for a specific message.
+ */
+ KKASSERT(msg->ms_reply_port == port);
+ if ((msg->ms_flags & MSGF_DONE) == 0) {
+ sentabort = 0;
+ while ((msg->ms_flags & MSGF_DONE) == 0) {
+ void *won;
+
+ /*
+ * If message was sent synchronously from the beginning
+ * the wakeup will be on the message structure, else it
+ * will be on the port structure.
+ */
+ if (msg->ms_flags & MSGF_SYNC) {
+ won = msg;
+ } else {
+ won = port;
+ port->mp_flags |= MSGPORTF_WAITING;
+ }
+
+ /*
+ * MSGF_PCATCH is only set by processes which wish to
+ * abort the message they are blocked on when a signal
+ * occurs. Note that we still must wait for message
+ * completion after sending an abort request.
+ *
+ * XXX ERESTART not handled.
+ */
+ if ((msg->ms_flags & MSGF_PCATCH) && sentabort == 0) {
+ error = msleep(won, &port->mpu_spin, PCATCH, "wmsg", 0);
+ if (error) {
+ sentabort = error;
+ spin_unlock_wr(&port->mpu_spin);
+ lwkt_abortmsg(msg);
+ spin_lock_wr(&port->mpu_spin);
+ }
+ } else {
+ error = msleep(won, &port->mpu_spin, 0, "wmsg", 0);
+ }
+ /* see note at the top on the MSGPORTF_WAITING flag */
+ }
+ /*
+ * Turn EINTR into ERESTART if the signal indicates.
+ */
+ if (sentabort && msg->ms_error == EINTR)
+ msg->ms_error = sentabort;
+
+ /*
+ * Once the MSGF_DONE bit is set, the message is stable. We
+ * can just check MSGF_QUEUED to determine
+ */
+ if (msg->ms_flags & MSGF_QUEUED)
+ _lwkt_pullmsg(port, msg);
+ }
+ }
+ spin_unlock_wr(&port->mpu_spin);
+ return(msg);
+}
+
+static
+void
+lwkt_spin_replyport(lwkt_port_t port, lwkt_msg_t msg)
+{
+ int dowakeup;
+
+ KKASSERT((msg->ms_flags & (MSGF_DONE|MSGF_QUEUED)) == 0);
+
+ if (msg->ms_flags & MSGF_SYNC) {
+ /*
+ * If a synchronous completion has been requested, just wakeup
+ * the message without bothering to queue it to the target port.
+ */
+ msg->ms_flags |= MSGF_DONE | MSGF_REPLY;
+ wakeup(msg);
+ } else {
+ /*
+ * If an asynchronous completion has been requested the message
+ * must be queued to the reply port. MSGF_REPLY cannot be set
+ * until the message actually gets queued.
+ */
+ spin_lock_wr(&port->mpu_spin);
+ TAILQ_INSERT_TAIL(&port->mp_msgq, msg, ms_node);
+ msg->ms_flags |= MSGF_REPLY | MSGF_DONE | MSGF_QUEUED;
+ dowakeup = 0;
+ if (port->mp_flags & MSGPORTF_WAITING) {
+ port->mp_flags &= ~MSGPORTF_WAITING;
+ dowakeup = 1;
+ }
+ spin_unlock_wr(&port->mpu_spin);
+ if (dowakeup)
+ wakeup(port);
+ }
+}
+
+/************************************************************************
+ * PANIC AND SPECIAL PORT FUNCTIONS *
+ ************************************************************************/
+
+/*
+ * You can point a port's reply vector at this function if you just want
+ * the message marked done, without any queueing or signaling. This is
+ * often used for structure-embedded messages.
+ */
+static
+void
+lwkt_null_replyport(lwkt_port_t port, lwkt_msg_t msg)
+{
+ msg->ms_flags |= MSGF_DONE | MSGF_REPLY;
+}
+
+static
+void *
+lwkt_panic_getport(lwkt_port_t port)
+{
+ panic("lwkt_getport() illegal on port %p", port);
+}
+
+static
+int
+lwkt_panic_putport(lwkt_port_t port, lwkt_msg_t msg)
+{
+ panic("lwkt_begin/do/sendmsg() illegal on port %p msg %p", port, msg);
+}
+
+static
+void *
+lwkt_panic_waitport(lwkt_port_t port, lwkt_msg_t msg)
+{
+ panic("port %p cannot be waited on msg %p", port, msg);
+}
+
+static
+void
+lwkt_panic_replyport(lwkt_port_t port, lwkt_msg_t msg)
+{
+ panic("lwkt_replymsg() is illegal on port %p msg %p", port, msg);
+}
+
View
14 sys/kern/lwkt_thread.c
@@ -31,7 +31,7 @@
* OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
*
- * $DragonFly: src/sys/kern/lwkt_thread.c,v 1.107 2007/05/01 00:05:18 dillon Exp $
+ * $DragonFly: src/sys/kern/lwkt_thread.c,v 1.108 2007/05/24 05:51:27 dillon Exp $
*/
/*
@@ -103,9 +103,16 @@ static __int64_t preempt_miss = 0;
static __int64_t preempt_weird = 0;
static __int64_t token_contention_count = 0;
static __int64_t mplock_contention_count = 0;
+static int lwkt_use_spin_port;
#ifdef _KERNEL
+/*
+ * We can make all thread ports use the spin backend instead of the thread
+ * backend. This should only be set to debug the spin backend.
+ */
+TUNABLE_INT("lwkt.use_spin_port", &lwkt_use_spin_port);
+
SYSCTL_INT(_lwkt, OID_AUTO, untimely_switch, CTLFLAG_RW, &untimely_switch, 0, "");
#ifdef INVARIANTS
SYSCTL_INT(_lwkt, OID_AUTO, panic_on_cscount, CTLFLAG_RW, &panic_on_cscount, 0, "");
@@ -334,7 +341,10 @@ lwkt_init_thread(thread_t td, void *stack, int stksize, int flags,
if ((flags & TDF_MPSAFE) == 0)
td->td_mpcount = 1;
#endif
- lwkt_initport(&td->td_msgport, td);
+ if (lwkt_use_spin_port)
+ lwkt_initport_spin(&td->td_msgport);
+ else
+ lwkt_initport_thread(&td->td_msgport, td);
pmap_init_thread(td);
#ifdef SMP
/*
View
45 sys/net/netisr.c
@@ -35,7 +35,7 @@
* OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
*
- * $DragonFly: src/sys/net/netisr.c,v 1.31 2007/05/23 08:57:10 dillon Exp $
+ * $DragonFly: src/sys/net/netisr.c,v 1.32 2007/05/24 05:51:29 dillon Exp $
*/
#include <sys/param.h>
@@ -73,6 +73,8 @@ lwkt_port netisr_adone_rport;
lwkt_port netisr_apanic_rport;
lwkt_port netisr_sync_port;
+static int (*netmsg_fwd_port_fn)(lwkt_port_t, lwkt_msg_t);
+
/*
* netisr_afree_rport replymsg function, only used to handle async
* messages which the sender has abandoned to their fate.
@@ -83,37 +85,29 @@ netisr_autofree_reply(lwkt_port_t port, lwkt_msg_t msg)
kfree(msg, M_LWKTMSG);
}
-static void
-netisr_autopanic_reply(lwkt_port_t port, lwkt_msg_t msg)
-{
- panic("unreplyable msg %p was replied!", msg);
-}
-
/*
- * We must construct a custom putport function (which runs in the context
- * of the message originator)
- *
- * Our custom putport must check for self-referential messages, which can
- * occur when the so_upcall routine is called (e.g. nfs). Self referential
- * messages are executed synchronously. However, we must panic if the message
- * is not marked DONE on completion because the self-referential case cannot
- * block without deadlocking.
+ * We need a custom putport function to handle the case where the
+ * message target is the current thread's message port. This case
+ * can occur when the TCP or UDP stack does a direct callback to NFS and NFS
+ * then turns around and executes a network operation synchronously.
*
- * note: ms_target_port does not need to be set when returning a synchronous
- * error code.
+ * To prevent deadlocking, we must execute these self-referential messages
+ * synchronously, effectively turning the message into a glorified direct
+ * procedure call back into the protocol stack. The operation must be
+ * complete on return or we will deadlock, so panic if it isn't.
*/
static int
netmsg_put_port(lwkt_port_t port, lwkt_msg_t lmsg)
{
netmsg_t netmsg = (void *)lmsg;
- if ((lmsg->ms_flags & MSGF_SYNC) && port->mp_td == curthread) {
+ if ((lmsg->ms_flags & MSGF_SYNC) && port == &curthread->td_msgport) {
netmsg->nm_dispatch(netmsg);
if ((lmsg->ms_flags & MSGF_DONE) == 0)
panic("netmsg_put_port: self-referential deadlock on netport");
return(EASYNC);
} else {
- return(lwkt_default_putport(port, lmsg));
+ return(netmsg_fwd_port_fn(port, lmsg));
}
}
@@ -167,17 +161,15 @@ netisr_init(void)
* the message as being done. The netisr_apanic_rport panics if
* the message is replied to.
*/
- lwkt_initport(&netisr_afree_rport, NULL);
- netisr_afree_rport.mp_replyport = netisr_autofree_reply;
- lwkt_initport_null_rport(&netisr_adone_rport, NULL);
- lwkt_initport(&netisr_apanic_rport, NULL);
- netisr_apanic_rport.mp_replyport = netisr_autopanic_reply;
+ lwkt_initport_replyonly(&netisr_afree_rport, netisr_autofree_reply);
+ lwkt_initport_replyonly_null(&netisr_adone_rport);
+ lwkt_initport_panic(&netisr_apanic_rport);
/*
* The netisr_syncport is a special port which executes the message
* synchronously and waits for it if EASYNC is returned.
*/
- lwkt_initport(&netisr_sync_port, NULL);
+ lwkt_initport_putonly(&netisr_sync_port, netmsg_sync_putport);
netisr_sync_port.mp_putport = netmsg_sync_putport;
}
@@ -197,6 +189,9 @@ netmsg_service_port_init(lwkt_port_t port)
* Override the putport function. Our custom function checks for
* self-references and executes such commands synchronously.
*/
+ if (netmsg_fwd_port_fn == NULL)
+ netmsg_fwd_port_fn = port->mp_putport;
+ KKASSERT(netmsg_fwd_port_fn == port->mp_putport);
port->mp_putport = netmsg_put_port;
/*
View
4 sys/netinet/tcp_syncache.c
@@ -69,7 +69,7 @@
* SUCH DAMAGE.
*
* $FreeBSD: src/sys/netinet/tcp_syncache.c,v 1.5.2.14 2003/02/24 04:02:27 silby Exp $
- * $DragonFly: src/sys/netinet/tcp_syncache.c,v 1.30 2007/05/23 08:57:09 dillon Exp $
+ * $DragonFly: src/sys/netinet/tcp_syncache.c,v 1.31 2007/05/24 05:51:29 dillon Exp $
*/
#include "opt_inet6.h"
@@ -305,7 +305,7 @@ syncache_init(void)
}
tcp_syncache.hashmask = tcp_syncache.hashsize - 1;
- lwkt_initport_null_rport(&syncache_null_rport, NULL);
+ lwkt_initport_replyonly_null(&syncache_null_rport);
for (cpu = 0; cpu < ncpus2; cpu++) {
struct tcp_syncache_percpu *syncache_percpu;
View
4 sys/netinet/tcp_usrreq.c
@@ -65,7 +65,7 @@
*
* From: @(#)tcp_usrreq.c 8.2 (Berkeley) 1/3/94
* $FreeBSD: src/sys/netinet/tcp_usrreq.c,v 1.51.2.17 2002/10/11 11:46:44 ume Exp $
- * $DragonFly: src/sys/netinet/tcp_usrreq.c,v 1.43 2007/05/23 08:57:09 dillon Exp $
+ * $DragonFly: src/sys/netinet/tcp_usrreq.c,v 1.44 2007/05/24 05:51:29 dillon Exp $
*/
#include "opt_ipsec.h"
@@ -980,7 +980,7 @@ tcp_connect(struct tcpcb *tp, struct sockaddr *nam, struct thread *td)
inp->inp_laddr.s_addr : if_sin->sin_addr.s_addr,
inp->inp_lport);
- if (port->mp_td != curthread) {
+ if (port != &curthread->td_msgport) {
struct netmsg_tcp_connect msg;
netmsg_init(&msg.nm_netmsg, &curthread->td_msgport, 0,
View
51 sys/sys/msgport.h
@@ -3,7 +3,7 @@
*
* Implements LWKT messages and ports.
*
- * $DragonFly: src/sys/sys/msgport.h,v 1.24 2007/05/23 08:56:59 dillon Exp $
+ * $DragonFly: src/sys/sys/msgport.h,v 1.25 2007/05/24 05:51:28 dillon Exp $
*/
#ifndef _SYS_MSGPORT_H_
@@ -15,6 +15,9 @@
#ifndef _SYS_STDINT_H_
#include <sys/stdint.h>
#endif
+#ifndef _SYS_SPINLOCK_H_
+#include <sys/spinlock.h>
+#endif
#ifdef _KERNEL
@@ -128,38 +131,58 @@ MALLOC_DECLARE(M_LWKTMSG);
* - reply a message (executed on the originating port to return a
* message to it). This can be rather involved if abort is to be
* supported, see lwkt_default_replyport(). Generally speaking
- * one sets MSGF_DONE. If MSGF_ASYNC is set the message is queued
- * to the port, else the port's thread is scheduled.
+ * one sets MSGF_DONE. If MSGF_SYNC is set the message is not
+ * queued to the port and the reply code wakes up the waiter
+ * directly.
+ *
+ * The use of mp_u.td and mp_u.spin is specific to the port callback function
+ * set. Default ports are tied to specific threads and use cpu locality
+ * of reference and mp_u.td (and not mp_u.spin at all). Descriptor ports
+ * assume access via descriptors, signal interruption, etc. Such ports use
+ * mp_u.spin (and not mp_u.td at all) and may be accessed by multiple threads.
*/
typedef struct lwkt_port {
lwkt_msg_queue mp_msgq;
int mp_flags;
- int mp_unused01;
- struct thread *mp_td;
+ union {
+ struct spinlock spin;
+ struct thread *td;
+ void *data;
+ } mp_u;
+ void * (*mp_getport)(lwkt_port_t);
int (*mp_putport)(lwkt_port_t, lwkt_msg_t);
void * (*mp_waitport)(lwkt_port_t, lwkt_msg_t);
void (*mp_replyport)(lwkt_port_t, lwkt_msg_t);
} lwkt_port;
+#ifdef _KERNEL
+
+#define mpu_td mp_u.td
+#define mpu_spin mp_u.spin
+#define mpu_data mp_u.data
+
+#endif
+
#define MSGPORTF_WAITING 0x0001
/*
* These functions are good for userland as well as the kernel. The
* messaging function support for userland is provided by the kernel's
* kern/lwkt_msgport.c. The port functions are provided by userland.
*/
-void lwkt_initport(lwkt_port_t, struct thread *);
-void lwkt_initport_null_rport(lwkt_port_t, struct thread *);
+
+void lwkt_initport_thread(lwkt_port_t, struct thread *);
+void lwkt_initport_spin(lwkt_port_t);
+void lwkt_initport_panic(lwkt_port_t);
+void lwkt_initport_replyonly_null(lwkt_port_t);
+void lwkt_initport_replyonly(lwkt_port_t,
+ void (*rportfn)(lwkt_port_t, lwkt_msg_t));
+void lwkt_initport_putonly(lwkt_port_t,
+ int (*pportfn)(lwkt_port_t, lwkt_msg_t));
+
void lwkt_sendmsg(lwkt_port_t, lwkt_msg_t);
int lwkt_domsg(lwkt_port_t, lwkt_msg_t);
int lwkt_forwardmsg(lwkt_port_t, lwkt_msg_t);
void lwkt_abortmsg(lwkt_msg_t);
-void *lwkt_getport(lwkt_port_t);
-
-int lwkt_default_putport(lwkt_port_t port, lwkt_msg_t msg);
-void *lwkt_default_waitport(lwkt_port_t port, lwkt_msg_t msg);
-void lwkt_default_replyport(lwkt_port_t port, lwkt_msg_t msg);
-void lwkt_default_abortport(lwkt_port_t port, lwkt_msg_t msg);
-void lwkt_null_replyport(lwkt_port_t port, lwkt_msg_t msg);
#endif
View
9 sys/sys/msgport2.h
@@ -3,7 +3,7 @@
*
* Implements Inlines for LWKT messages and ports.
*
- * $DragonFly: src/sys/sys/msgport2.h,v 1.13 2007/05/23 08:56:59 dillon Exp $
+ * $DragonFly: src/sys/sys/msgport2.h,v 1.14 2007/05/24 05:51:28 dillon Exp $
*/
#ifndef _SYS_MSGPORT2_H_
@@ -73,6 +73,13 @@ lwkt_replymsg(lwkt_msg_t msg, int error)
static __inline
void *
+lwkt_getport(lwkt_port_t port)
+{
+ return(port->mp_getport(port));
+}
+
+static __inline
+void *
lwkt_waitport(lwkt_port_t port, lwkt_msg_t msg)
{
return(port->mp_waitport(port, msg));

0 comments on commit fb0f29c

Please sign in to comment.
Something went wrong with that request. Please try again.