Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

svn merge -r5765:5774 svn+ssh://svn3.okws.org/home/max/svnrepos/okws2…

…/branches/3.1.14 ; version bump to 3.1.14pre1. With this release, we're reintegrating pthreads for linux and freebsd, for another attempt to retire PTH.

git-svn-id: svn://okws.org/okws2/devel/3.1@5775 7d287422-6eea-da11-b677-00123f2a783a
  • Loading branch information...
commit 7629d1e8ff043129715adcc545cf0446b7b22ec3 1 parent e842db1
max authored
View
109 acokws.m4
@@ -138,6 +138,14 @@ AC_DEFUN([OKWS_MYSQL],
[AC_ARG_WITH(mysql,
--with-mysql=DIR Specific location of mysqlclient library)
if test "$with_mysql" != "no"; then
+ libname="mysqlclient"
+
+ dnl For true pthread multithreading, we need the threaded
+ dnl reentrant version of mysqlclient
+ if test "${ac_do_pthreads}" = "1"; then
+ libname="mysqlclient_r"
+ fi
+
ac_save_CFLAGS=$CFLAGS
ac_save_LIBS=$LIBS
cdirs="${with_mysql}/include ${with_mysql}/include/mysql \
@@ -170,8 +178,8 @@ if test "$with_mysql" != "no"; then
[for dir in "" " " $dirs; do
case $dir in
"") lflags=" " ;;
- " ") lflags="-lmysqlclient -lm" ;;
- *) lflags="-L${dir} -lmysqlclient -lm" ;;
+ " ") lflags="-l$libname -lm" ;;
+ *) lflags="-L${dir} -l$libname -lm" ;;
esac
LIBS="$ac_save_LIBS $lflags"
AC_TRY_LINK([#include "mysql.h"],
@@ -333,57 +341,27 @@ AC_SUBST(LDADD_THR)
dnl
dnl Find pthreads
dnl
-AC_DEFUN([OKWS_FIND_PTHREADS],
-[AC_ARG_WITH(pthreads,
---with-pthreads=DIR Specify location of pthreads)
-ac_save_CFLAGS=$CFLAGS
-ac_save_LIBS=$LIBS
-dirs="$with_pthreads ${prefix} ${prefix}/pthreads"
-dirs="$dirs /usr/local /usr/local/pthreads"
-AC_CACHE_CHECK(for pthread.h, sfs_cv_pthread_h,
-[for dir in " " $dirs; do
- case $dir in
- " ") iflags=" " ;;
- *) iflags="-I${dir}/include" ;;
- esac
- CFLAGS="${ac_save_CFLAGS} $iflags"
- AC_TRY_COMPILE([#include <pthread.h>], 0,
- sfs_cv_pthread_h="${iflags}"; break)
-done
-if test "$sfs_cv_pthread_h" = " "; then
- sfs_cv_pthread_h="yes"
-fi
-])
-if test "$sfs_cv_pthread_h" = "yes"; then
- sfs_cv_pthread_h=" "
-fi
-if test "${sfs_cv_pthread_h+set}"; then
- AC_CACHE_CHECK(for libpthread, sfs_cv_libpthread,
- [for dir in "" " " $dirs; do
- case $dir in
- "") lflags=" " ;;
- " ") lflags="-lpthread" ;;
- *) lflags="-L${dir}/lib -lpthread" ;;
- esac
- LIBS="$ac_save_LIBS $lflags"
- AC_TRY_LINK([#include <pthread.h>],
- pthread_create (0, 0, 0, 0);,
- sfs_cv_libpthread=$lflags; break)
- done
- if test -z ${sfs_cv_libpthread+set}; then
- sfs_cv_libpthread="no"
- fi])
+AC_DEFUN([OKWS_DO_PTHREADS],
+[AC_ARG_ENABLE(pthreads,
+--enable-pthreads Allow OKWS to use standard pthreads)
+if test `uname` = "Linux"; then
+ ac_do_pthreads=1
+ if test "${enable_pthreads}" = "no"; then
+ ac_do_pthreads=0
+ fi
+else
+ ac_do_pthreads=0
+ if test "${enable_pthreads}" = "yes" ; then
+ ac_do_pthreads=1
+ fi
fi
-if test "$sfs_cv_libpthread" != "no" && test "${sfs_cv_libpthread+set}" ; then
- CFLAGS=$ac_save_CFLAGS
- CPPFLAGS="$CPPFLAGS $sfs_cv_pthread_h"
- LIBS="$ac_save_LIBS $sfs_cv_libpthread"
- LDADD_PTHREAD="$sfs_cv_libpthread"
- use_pthreads=yes
+
+if test $ac_do_pthreads -eq 1 ; then
+ AC_DEFINE(HAVE_PTHREADS, 1, Use pthread support)
fi
-AM_CONDITIONAL(USE_PTHREADS, test "${use_pthreads}" != "no")
-LIBS=$ac_save_LIBS
-CFLAGS=$ac_save_CFLAGS
+
+AM_CONDITIONAL(USE_PTHREADS, test ${ac_do_pthreads} -eq 1)
+
])
dnl
@@ -400,19 +378,15 @@ fi])
dnl
dnl Check that some threading exists
dnl
-AC_DEFUN([OKWS_REQUIRE_THREADS],
+AC_DEFUN([OKWS_DO_THREADS],
[
-dnl AC_ARG_ENABLE(pthreads,
-dnl --disable-pthreads Disable POSIX pthreads library, [])
-
-dnl
-dnl Either PTH, or nothing, since the other threading doesn't currently
-dnl work.
-dnl
-OKWS_FIND_PTH
-
-AC_SUBST(LDADD_THR)
+if test `uname` != "Linux"
+then
+ OKWS_FIND_PTH
+fi
+OKWS_DO_PTHREADS
])
+
#include <unistd.h>
#include <grp.h>
int getgrouplist ([$*]);
@@ -765,6 +739,9 @@ if test -f ${with_okws}/Makefile -a -f ${with_okws}/okwsconf.h; then
LIBOKXML=${with_okws}/libokxml/libokxml.la
LIBWEB=${with_okws}/libweb/libweb.la
LIBAMT=${with_okws}/libamt/libamt.la
+ if test "${ac_do_pthreads}" = "1"; then
+ LIBAMT_PTHREAD=${with_okws}/libamt_pthread/libamt_pthread.la
+ fi
LIBAHTTP=${with_okws}/libahttp/libahttp.la
LIBAMYSQL=${with_okws}/libamysql/libamysql.la
LIBOKSSL=${with_okws}/libokssl/libokssl.la
@@ -788,6 +765,9 @@ elif test -f ${with_okws}/include/${okwsstem}/okwsconf.h \
LIBOKXML=${okwslibdir}/libokxml.la
LIBWEB=${okwslibdir}/libweb.la
LIBAMT=${okwslibdir}/libamt.la
+ if test "${ac_do_pthreads}" = "1"; then
+ LIBAMT_PTHREAD=${okwslibdir}/libamt_pthread.la
+ fi
LIBAHTTP=${okwslibdir}/libahttp.la
LIBAMYSQL=${okwslibdir}/libamysql.la
LIBOKSSL=${okwslibdir}/libokssl.la
@@ -825,6 +805,7 @@ AC_SUBST(LIBOKXML)
AC_SUBST(LIBAHTTP)
AC_SUBST(LIBRFN)
AC_SUBST(LIBAMT)
+AC_SUBST(LIBAMT_PTHREAD)
AC_SUBST(LIBWEB)
AC_SUBST(LIBOKSSL)
AC_SUBST(LIBAMYSQL)
@@ -836,9 +817,9 @@ AC_SUBST(OKWS_LIB_MK)
LIBS='$(LIBEXPAT) $(LIBSSL)'"$LIBS"
LDEPS='$(LIBRFN) $(LIBWEB) $(LIBOKSSL) $(LIBAOK) $(LIBOKXML) $(LIBAHTTP) $(LIBPUB)'" $LDEPS"
-LDEPS_DB='$(LIBAMYSQL) $(LIBAMT) '" $LDEPS"
+LDEPS_DB='$(LIBAMYSQL) $(LIBAMT) $(LIBAMT_PTHREAD) '" $LDEPS"
LDADD='$(LIBRFN) $(LIBWEB) $(LIBOKSSL) $(LIBAOK) $(LIBAHTTP) $(LIBOKXML) $(LIBPUB)'" $LDADD"
-LDADD_DB='$(LIBAMYSQL) $(LIBAMT) '"$LDADD "'$(LDADD_THR) $(LDADD_MYSQL)'
+LDADD_DB='$(LIBAMYSQL) $(LIBAMT) $(LIBAMT_PTHREAD)'"$LDADD "'$(LDADD_THR) $(LDADD_MYSQL)'
AC_SUBST(LDEPS)
AC_SUBST(LDADD)
View
35 configure.in
@@ -4,10 +4,10 @@ dnl Process this file with autoconf to produce a configure script.
dnl
AC_INIT(config.h.in)
-AM_INIT_AUTOMAKE(okws, 3.1.13.11)
+AM_INIT_AUTOMAKE(okws, 3.1.14pre1)
AM_CONFIG_HEADER(config.h)
-SFS_INIT_LDVERSION(2500, SFSLITE_ABI_VERSION)
+SFS_INIT_LDVERSION(2600, SFSLITE_ABI_VERSION)
AC_CONFIG_MACRO_DIR([m4])
@@ -31,20 +31,11 @@ AC_PATH_PROGS(M4, gm4 gnum4 m4, '$(top_srcdir)/missing')
AC_PATH_PROGS(UPTIME, uptime, '$(top_srcdir)/missing')
dnl
-dnl make sure that some type of threading is available.
+dnl On FreeBSD, look for PTH by default. However, PTH stopped working
+dnl on Linux with hard system calls, so use pthreads by default on
+dnl Linux, and don't give the option for PTH.
dnl
-dnl SFS_REQUIRE_THREADS
-
-dnl
-dnl Pthreads now work on all platforms; but don't interoperate
-dnl well with pth.
-dnl
-OKWS_FIND_PTHREADS
-
-dnl
-dnl PTH is still on by default but can be disabled.
-dnl
-OKWS_FIND_PTH
+OKWS_DO_THREADS
test "$PUB" || PUB='$(top_builddir)/pub/pub'
test "$XMLRPCC" || XMLRPCC='$(top_builddir)/xmlrpcc/xmlrpcc'
@@ -129,7 +120,7 @@ done
dnl
dnl library directories
dnl
-for lib in libpub libahttp libokxml libaok libamt libamysql libweb libokssl librfn ezdb/libezdb ; do
+for lib in libpub libahttp libokxml libaok libamt libamt_pthread libamysql libweb libokssl librfn ezdb/libezdb ; do
CPPFLAGS="$CPPFLAGS -I"'$(top_srcdir)'"/$lib"
done
@@ -141,6 +132,9 @@ LIBAHTTP='$(top_builddir)/libahttp/libahttp.la'
LIBOKXML='$(top_builddir)/libokxml/libokxml.la'
LIBAOK='$(top_builddir)/libaok/libaok.la'
LIBAMT='$(top_builddir)/libamt/libamt.la'
+if test "$ac_do_pthreads" = "1"; then
+ LIBAMT_PTHREAD='$(top_builddir)/libamt_pthread/libamt_pthread.la'
+fi
LIBAMYSQL='$(top_builddir)/libamysql/libamysql.la'
LIBWEB='$(top_builddir)/libweb/libweb.la'
LIBEZDB='$(top_builddir)/ezdb/libezdb/libezdb.la'
@@ -149,7 +143,7 @@ LIBOKSSL='$(top_builddir)/libokssl/libokssl.la'
LIBRFN='$(top_builddir)/librfn/librfn.la'
LDADD_PUB='$(LIBPUB) '"$LDADD"
LDADD='$(LIBEZDB) $(LIBAOK) $(LIBRFN) $(LIBWEB) $(LIBOKSSL) $(LIBOKXML) $(LIBAHTTP) $(LIBPUB) '"$LDADD"' $(LIBEXPAT) $(LIBSSL)'
-LDADD_AMT='$(LIBAMT) '"$LDADD"
+LDADD_AMT='$(LIBAMT) $(LIBAMT_PTHREAD)'"$LDADD"
if test "$enable_shared" = yes; then
LDEPS=
LDEPS_PUB=
@@ -157,10 +151,10 @@ if test "$enable_shared" = yes; then
else
LDEPS_PUB='$(LIBPUB) '"$LDEPS"
LDEPS='$(LIBWEB) $(LIBAOK) $(LIBAHTTP) $(LIBOKXML) $(LIBPUB) '"$LDEPS"
- LDEPS_AMT='$(LIBAMT) '"$LDEPS"
- LDEPS_AMYSQL='$(LIBEZDBSRV) $(LIBAMT) $(LIBAMYSQL) '"$LDEPS"
+ LDEPS_AMT='$(LIBAMT) $(LIBAMT_PTHREAD)'"$LDEPS"
+ LDEPS_AMYSQL='$(LIBEZDBSRV) $(LIBAMT) $(LIBAMT_PTHREAD) $(LIBAMYSQL) '"$LDEPS"
fi
-LDADD_AMYSQL='$(LIBEZDBSRV) $(LIBAMT) $(LIBAMYSQL) '"$LDADD"' $(LDADD_THR) $(LDADD_MYSQL) '
+LDADD_AMYSQL='$(LIBEZDBSRV) $(LIBAMT) $(LIBAMT_PTHREAD) $(LIBAMYSQL) '"$LDADD"' $(LDADD_THR) $(LDADD_MYSQL) '
dnl
@@ -199,6 +193,7 @@ AC_SUBST(LIBAHTTP)
AC_SUBST(LIBOKXML)
AC_SUBST(LIBAOK)
AC_SUBST(LIBAMT)
+AC_SUBST(LIBAMT_PTHREAD)
AC_SUBST(LIBWEB)
AC_SUBST(LIBOKSSL)
AC_SUBST(LIBRFN)
View
19 libamt/amt.h
@@ -72,7 +72,7 @@
fprintf (stderr, "%s", s.cstr ()); \
} while (0)
-typedef enum { MTD_NONE = 0, MTD_PTH = 1 } mtd_thread_typ_t;
+typedef enum { MTD_NONE = 0, MTD_PTH = 1, MTD_PTHREAD } mtd_thread_typ_t;
// MTD = Mutli-Thread Dispatcher
@@ -380,6 +380,23 @@ class ssrv_client_t {
int _port;
};
+extern mtdispatch_t *g_mtdispatch;
+
+#define GIANT_LOCK() \
+ do { \
+ if (g_mtdispatch) { \
+ g_mtdispatch->giant_lock (); \
+ } \
+ } while (0)
+
+#define GIANT_UNLOCK() \
+ do { \
+ if (g_mtdispatch) { \
+ g_mtdispatch->giant_unlock (); \
+ } \
+ } while (0)
+
+
class ssrv_t { // Synchronous Server (I.e. its threads can block)
public:
ssrv_t (newthrcb_t c, const rpc_program &p, mtd_thread_typ_t typ = MTD_PTH,
View
46 libamt/mtdispatch.C
@@ -25,6 +25,9 @@
#include "rxx.h"
#include "parseopt.h"
#include "rpc_stats.h"
+#ifdef HAVE_PTHREADS
+#include "amt_pthread.h"
+#endif
#define LONG_REPLY_TIME 2
@@ -386,7 +389,11 @@ mtd_thread_t::run ()
{
mtd_status_t rc;
- if (!init_phase0 () || !init()) {
+ GIANT_LOCK();
+ bool ok = init_phase0() && init();
+ GIANT_UNLOCK();
+
+ if (!ok) {
TWARN ("thread could not initialize");
msg_send (MTD_SHUTDOWN);
delete this;
@@ -395,7 +402,9 @@ mtd_thread_t::run ()
become_ready ();
do {
+ GIANT_LOCK();
take_svccb ();
+ GIANT_UNLOCK();
rc = msg_recv ();
} while (rc == MTD_CONTINUE);
@@ -643,20 +652,43 @@ ssrv_t::init (mtdispatch_t *m)
mtd->init ();
}
+mtdispatch_t *g_mtdispatch;
+
ssrv_t::ssrv_t (newthrcb_t c, const rpc_program &p,
mtd_thread_typ_t typ, int n, int m)
: mtd (NULL), prog (&p), load_avg (0)
{
+ bool ok = false;
+
+ if (typ == MTD_PTH) {
#ifdef HAVE_PTH
- assert (PTH_SYSCALL_HARD && ! PTH_SYSCALL_SOFT);
- mtd = New mgt_dispatch_t (c, n, m, this);
- mtd->init ();
+ assert (PTH_SYSCALL_HARD && ! PTH_SYSCALL_SOFT);
+ mtd = New mgt_dispatch_t (c, n, m, this);
+ mtd->init ();
+ ok = true;
+#else /* HAVE_PTH */
+ panic ("pth is not available with this build; "
+ "cannot continue without threads\n");
+#endif
+ }
+
+ if (typ == MTD_PTHREAD) {
+#ifdef HAVE_PTHREADS
+ mtd = New mpt_dispatch_t (c, n, m, this);
+ mtd->init ();
+ ok = true;
#else
- panic ("pth is not available with this build; "
- "cannot continue without threads\n");
-#endif /* HAVE_PTH */
+ panic ("pthreads is not available; try --enable-pthreads");
+#endif
+ }
+
+ if (!ok) {
+ panic ("no threading package available!");
+ }
+ // Keep a global pointer to it, so that we can
+ g_mtdispatch = mtd;
}
bool
View
4 libamt_pthread/amt_pthread.h
@@ -1,8 +1,7 @@
// -*-c++-*-
/* $Id: amt_pthread.h 4522 2009-06-08 19:31:51Z max $ */
-#ifndef _LIBAMT_PTHREAD__AMT_PTHREAD_H_
-#define _LIBAMT_PTHREAD__AMT_PTHREAD_H_
+#pragma once
#include "amt.h"
#include <pthread.h>
@@ -20,4 +19,3 @@ class mpt_dispatch_t : public mtdispatch_t // Posix Threads
pthread_mutex_t _giant_lock;
};
-#endif /* _LIBAMT_PTHREAD__AMT_PTHREAD_H_ */
View
18 libamysql/amysql.C
@@ -42,6 +42,10 @@ mysql_t::connect (const str &db, const str &u, const str &h,
const str &pw, u_int prt, u_long fl)
{
bool ret = true;
+
+ // Use the default mysql port if none was provided.
+ if (prt == 0) { prt = 3306; }
+
#if defined(MYSQL_VERSION_ID) && (MYSQL_VERSION_ID >= 50000)
my_bool b = 1;
if (mysql_options (&mysql, MYSQL_OPT_RECONNECT, (const char *)&b) != 0) {
@@ -49,7 +53,11 @@ mysql_t::connect (const str &db, const str &u, const str &h,
}
#endif /* MYSQL_VERSION_ID */
- if (!mysql_real_connect (&mysql, h, u, pw, db, prt, NULL, fl)) {
+ GIANT_UNLOCK();
+ MYSQL *rc = mysql_real_connect (&mysql, h, u, pw, db, prt, NULL, fl);
+ GIANT_LOCK();
+
+ if (!rc) {
err = strbuf ("connection error: ") << mysql_error (&mysql);
ret = false;
}
@@ -69,7 +77,9 @@ mysql_t::prepare (const str &q, u_int l_opts, tz_corrector_t *tzc)
sth_t r = NULL;
if (l_opts & AMYSQL_PREPARED) {
#if defined(HAVE_MYSQL_BINDFUNCS) && defined(HAVE_MYSQL_BIND)
+ GIANT_UNLOCK();
MYSQL_STMT *s = mysql_stmt_init (&mysql);
+ GIANT_LOCK();
if (!s) {
err = strbuf ("MySQL ran out of memory on statment init: ")
<< mysql_error (&mysql);
@@ -77,7 +87,11 @@ mysql_t::prepare (const str &q, u_int l_opts, tz_corrector_t *tzc)
return NULL;
}
- if (mysql_stmt_prepare (s, q, q.len ())) {
+ GIANT_UNLOCK();
+ int rc = mysql_stmt_prepare (s, q, q.len ());
+ GIANT_LOCK();
+
+ if (rc) {
err = strbuf ("could not prepare query (")
<< q << "): " << mysql_error (&mysql);
errcode = ADB_BAD_QUERY;
View
73 libamysql/mystmt.C
@@ -34,7 +34,9 @@
u_int64_t
sth_prepared_t::insert_id ()
{
+ GIANT_UNLOCK();
u_int64_t r = mysql_stmt_insert_id (sth);
+ GIANT_LOCK();
return r;
}
@@ -43,7 +45,9 @@ sth_prepared_t::insert_id ()
sth_prepared_t::~sth_prepared_t ()
{
if (bnds) delete [] bnds;
+ GIANT_UNLOCK();
if (sth) mysql_stmt_close (sth);
+ GIANT_LOCK();
}
//-----------------------------------------------------------------------
@@ -51,7 +55,9 @@ sth_prepared_t::~sth_prepared_t ()
size_t
sth_prepared_t::affected_rows () const
{
+ GIANT_UNLOCK();
size_t r = mysql_stmt_affected_rows (sth);
+ GIANT_LOCK();
return r;
}
@@ -96,13 +102,21 @@ sth_prepared_t::execute2 (MYSQL_BIND *b, mybind_param_t **arr, u_int n)
if (b && arr && n) {
bind (b, arr, n);
- if (mysql_stmt_bind_param (sth, b) != 0) {
+ GIANT_UNLOCK();
+ int rc = mysql_stmt_bind_param (sth, b);
+ GIANT_LOCK();
+ if (rc != 0) {
err = strbuf ("bind error: ") << mysql_stmt_error (sth);
errno_n = mysql_stmt_errno (sth);
return false;
}
}
- if (mysql_stmt_execute (sth) != 0) {
+
+ GIANT_UNLOCK();
+ int rc = mysql_stmt_execute (sth);
+ GIANT_LOCK();
+
+ if (rc != 0) {
err = strbuf ("execute error: ") << mysql_stmt_error (sth);
errno_n = mysql_stmt_errno (sth);
state = AMYSQL_NONE;
@@ -148,7 +162,9 @@ sth_prepared_t::fetch2 (bool bnd)
if (state == AMYSQL_EXEC) {
state = AMYSQL_FETCH;
if (!(opts & AMYSQL_USERES)) {
+ GIANT_UNLOCK();
int rc = mysql_stmt_store_result (sth);
+ GIANT_LOCK();
if (rc != 0) {
err = strbuf ("stmt_store error (") << rc << "): "
<< mysql_stmt_error (sth);
@@ -162,7 +178,9 @@ sth_prepared_t::fetch2 (bool bnd)
if (bnd && !bind_result ())
return ADB_BIND_ERROR;
+ GIANT_UNLOCK();
int rc = mysql_stmt_fetch (sth);
+ GIANT_LOCK();
if (rc == MYSQL_NO_DATA) {
state = AMYSQL_FETCH_DONE;
return ADB_NOT_FOUND;
@@ -183,7 +201,9 @@ sth_prepared_t::fetch2 (bool bnd)
u_int64_t
sth_parsed_t::insert_id ()
{
+ GIANT_UNLOCK();
u_int64_t r = mysql_insert_id (mysql);
+ GIANT_LOCK();
return r;
}
@@ -194,14 +214,24 @@ sth_parsed_t::clearfetch ()
{
if (state == AMYSQL_EXEC) {
assert (!myres);
+
+ GIANT_UNLOCK();
myres = (opts & AMYSQL_USERES) ? mysql_use_result (mysql)
: mysql_store_result (mysql);
+ GIANT_LOCK();
+
if (myres)
warn << "exec() called without fetch() on query: " << last_qry << "\n";
state = AMYSQL_FETCH;
}
- if (state == AMYSQL_FETCH && (opts & AMYSQL_USERES))
- while (mysql_fetch_row (myres)) ;
+ if (state == AMYSQL_FETCH && (opts & AMYSQL_USERES)) {
+ int rc = 1;
+ while (rc) {
+ GIANT_UNLOCK();
+ mysql_fetch_row (myres);
+ GIANT_LOCK();
+ }
+ }
if (myres) {
mysql_free_result (myres);
@@ -219,24 +249,36 @@ sth_parsed_t::fetch2 (bool bnd)
{
if (!myres) {
state = AMYSQL_FETCH;
+
+ GIANT_UNLOCK();
myres = (opts & AMYSQL_USERES) ? mysql_use_result (mysql) :
mysql_store_result (mysql);
+ GIANT_LOCK();
- if (myres)
+ if (myres) {
+ GIANT_UNLOCK();
my_res_n = mysql_num_fields (myres);
- else {
+ GIANT_LOCK();
+ } else {
err = strbuf ("MySQL result error: ") << mysql_error (mysql);
errno_n = mysql_errno (mysql);
state = AMYSQL_NONE;
return ADB_ERROR;
}
}
+
+ GIANT_UNLOCK();
MYSQL_ROW row = mysql_fetch_row (myres);
+ GIANT_LOCK();
+
if (!row) {
state = AMYSQL_FETCH_DONE;
return ADB_NOT_FOUND;
}
+
+ GIANT_UNLOCK();
length_arr = mysql_fetch_lengths (myres);
+ GIANT_LOCK();
row_to_res (&row, myres);
return ADB_OK;
@@ -252,7 +294,9 @@ sth_parsed_t::row_to_res (MYSQL_ROW *row, MYSQL_RES *res)
for (u_int i = 0; i < lim && !ff; i++) {
if (res_arr[i].is_xdr_union_type ()) {
+ GIANT_UNLOCK();
ff = mysql_fetch_fields (myres);
+ GIANT_LOCK();
}
}
@@ -366,7 +410,12 @@ sth_parsed_t::execute2 (MYSQL_BIND *dummy, mybind_param_t **aarr, u_int n)
str q = make_query (aarr, n);
last_qry = q;
- if (mysql_real_query (mysql, q.cstr (), q.len ()) != 0) {
+
+ GIANT_UNLOCK();
+ int rc = mysql_real_query (mysql, q.cstr (), q.len ());
+ GIANT_LOCK();
+
+ if (rc != 0) {
err = strbuf ("Query execution error: ") << mysql_error (mysql) << "\n";
errno_n = mysql_errno (mysql);
state = AMYSQL_NONE;
@@ -390,12 +439,20 @@ const MYSQL_FIELD *
sth_parsed_t::fetch_fields (size_t *sz)
{
const MYSQL_FIELD *ret = NULL;
- if ((myres = mysql_store_result (mysql))) {
+
+ GIANT_UNLOCK();
+ myres = mysql_store_result (mysql);
+ GIANT_LOCK();
+
+ if (myres) {
unsigned int nf = mysql_num_fields (myres);
my_res_n = nf; // whenver we set myres, also set my_res_n, and bump state
state = AMYSQL_FETCH;
*sz = nf;
+
+ GIANT_UNLOCK();
ret = mysql_fetch_fields (myres);
+ GIANT_LOCK();
}
return ret;
}
View
4 libpub/okws_sfs.h
@@ -51,8 +51,8 @@
// patch level 100 is release
#define OKWS_VERSION_MAJOR 3
#define OKWS_VERSION_MINOR 1
-#define OKWS_VERSION_PATCHLEVEL 13
-#define OKWS_VERSION_PRE 111
+#define OKWS_VERSION_PATCHLEVEL 14
+#define OKWS_VERSION_PRE 1
#define OKWS_AT_VERSION(Maj,Min,Pat,Pre) \
(VERSION_FLATTEN(Maj,Min,Pat,Pre) <= \
View
8 test/system/3tier/Makefile.am
@@ -13,13 +13,14 @@ SUFFIXES = .g .C .T .x .h
#-----------------------------------------------------------------------
-TAMEIN = tst2.T
-TAMEOUT = tst2.C
+TAMEIN = tst2.T stress.T
+TAMEOUT = tst2.C stress.C
#-----------------------------------------------------------------------
tst2_SOURCES = tst2_prot.C tst2.C
tst2d_SOURCES = tst2_prot.C tst2d.C
+stress_SOURCES = stress.T tst2_prot.C
#-----------------------------------------------------------------------
@@ -29,11 +30,14 @@ tst2_prot.lo: tst2_prot.C
tst2.o: tst2.C
tst2.lo: tst2.C
tst2d.o: tst2_prot.h
+stress.o: stress.C
+stress.lo: stress.C
#-----------------------------------------------------------------------
okwssvc_PROGRAMS = tst2
okwsprx_PROGRAMS = tst2d
+noinst_PROGRAMS = stress
#-----------------------------------------------------------------------
View
276 test/system/3tier/stress.T
@@ -0,0 +1,276 @@
+// -*-c++-*-
+/* $Id: tst2.g 1007 2005-09-11 21:45:33Z max $ */
+
+/*
+ *
+ * Copyright (C) 2003-4 by Maxwell Krohn (max@okcupid.com)
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License as
+ * published by the Free Software Foundation; either version 2, or (at
+ * your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
+ * USA
+ *
+ */
+
+#include "ok.h"
+#include "okcgi.h"
+#include "parseopt.h"
+#include "pub.h"
+#include <unistd.h>
+#include "tst2_prot.h"
+#include "tame_pipeline3.h"
+#include "crypt.h"
+#include "web.h"
+
+//-----------------------------------------------------------------------
+
+class runner_t {
+public:
+ runner_t ()
+ : _host ("localhost"),
+ _port (TST2_PORT),
+ _num (1000),
+ _concur (20),
+ _fd (-1),
+ _rows (30),
+ _sleep (10),
+ _put_freq (10) {}
+
+ int config (int argc, char **argv);
+ void init (evi_t ev, CLOSURE);
+ void run (evi_t ev, CLOSURE);
+protected:
+ void do_put (size_t i, evi_t ev, CLOSURE);
+ void do_get (size_t i, evi_t ev, CLOSURE);
+private:
+ str _host;
+ int _port;
+ size_t _num, _concur;
+ int _fd;
+ size_t _rows;
+ time_t _sleep;
+ size_t _put_freq;
+ ptr<axprt_stream> _x;
+ ptr<aclnt> _cli;
+ ptr<pipeline3::runner_t> _pipeline;
+};
+
+//-----------------------------------------------------------------------
+
+static void
+usage ()
+{
+ warnx << "usage: " << progname << " [-h <host>] [-p <port>] "
+ << "[-n <num-total>] [-c <num-concur>] [-r <rows>] "
+ << "[-s <sleep-msec>] [-f <put-freq>]\n";
+}
+
+//-----------------------------------------------------------------------
+
+int
+runner_t::config (int argc, char **argv)
+{
+ int ch;
+ int rc (0);
+ while ((ch = getopt (argc, argv, "h:p:n:c:r:f:s:")) != -1) {
+ switch (ch) {
+ case 'h':
+ _host = optarg;
+ break;
+ case 'p':
+ if (!convertint (optarg, &_port)) {
+ warn << "Cannot convert '" << optarg << "' to an int\n";
+ rc = -1;
+ }
+ break;
+ case 'n':
+ if (!convertint (optarg, &_num)) {
+ warn << "Cannot convert '" << optarg << "' to an int\n";
+ rc = -1;
+ }
+ break;
+ case 'c':
+ if (!convertint (optarg, &_concur)) {
+ warn << "Cannot convert '" << optarg << "' to an int\n";
+ rc = -1;
+ }
+ break;
+ case 'r':
+ if (!convertint (optarg, &_rows)) {
+ warn << "Cannot convert '" << optarg << "' to an int\n";
+ rc = -1;
+ }
+ break;
+ case 'f':
+ if (!convertint (optarg, &_put_freq)) {
+ warn << "Cannot convert '" << optarg << "' to an int\n";
+ rc = -1;
+ }
+ break;
+ case 's':
+ if (!convertint (optarg, &_sleep)) {
+ warn << "Cannot convert '" << optarg << "' to an int\n";
+ rc = -1;
+ }
+ break;
+ default:
+ rc = -1;
+ break;
+ }
+ }
+ if (rc != 0) {
+ usage ();
+ }
+ return rc;
+}
+
+//-----------------------------------------------------------------------
+
+tamed void
+runner_t::do_put (size_t i, evi_t ev)
+{
+ tvars {
+ int rc (0);
+ tst2_put_arg_t arg;
+ adb_status_t res;
+ clnt_stat err;
+ okdate_t now (okwstime ());
+ }
+ arg.key = strbuf ("0x%" PRIx64, rnd.gethyper ());
+ warn << "putting key = " << arg.key << "\n";
+ arg.data.i = rnd.getword ();
+ arg.data.pk = rnd.getword ();
+ now.to_xdr (&arg.data.d);
+ now.to_xdr (&arg.data.d2);
+ twait { rpc::tst2_prog_1::tst2_put (_cli, &arg, &res, mkevent (err)); }
+
+ if (err) {
+ warn << "XXX DB error in put: " << err << "\n";
+ rc = -1;
+ } else if (res != ADB_OK) {
+ warn << "XXX put failed with status=" << int (res) << "\n";
+ rc =-1;
+ } else {
+ warn << "iter[" << i << "]: PUT\n";
+ }
+
+ ev->trigger (rc);
+}
+
+//-----------------------------------------------------------------------
+
+tamed void
+runner_t::do_get (size_t i, evi_t ev)
+{
+ tvars {
+ int rc (0);
+ tst2_mget_arg_t arg;
+ tst2_mget_res_t res;
+ clnt_stat err;
+ }
+
+ if (_sleep) {
+ arg.sleep_msec = rnd.getword () % ( 2 * _sleep);
+ } else {
+ arg.sleep_msec = 0;
+ }
+ arg.lim = _rows;
+
+ twait { rpc::tst2_prog_1::tst2_mget (_cli, &arg, &res, mkevent (err)); }
+ if (err) {
+ warn << "XXX DB error in get: " << err << "\n";
+ rc = -1;
+ } else if (res.status != ADB_OK) {
+ warn << "XXX get failed with status=" << int (res.status) << "\n";
+ rc = -1;
+ } else {
+ warn << "iter[" << i << "]: got " << res.rows->size () << " rows\n";
+ }
+
+ ev->trigger (rc);
+}
+
+//-----------------------------------------------------------------------
+
+tamed void
+runner_t::run (evi_t ev)
+{
+ tvars {
+ int trc (0), rc (0);
+ size_t i (0);
+ }
+
+ while (i < _num) {
+ twait { _pipeline->queue_for_takeoff (mkevent ()); }
+ if (i % _put_freq == 0) {
+ twait { do_put (i, _pipeline->mkev (trc)); }
+ } else {
+ twait { do_get (i, _pipeline->mkev (trc)); }
+ }
+ if (trc != 0) { rc = trc; }
+ i++;
+ }
+ twait { _pipeline->flush (mkevent ()); }
+
+ ev->trigger (rc);
+}
+
+//-----------------------------------------------------------------------
+
+tamed void
+runner_t::init (evi_t ev)
+{
+ tvars {
+ int rc (0);
+ }
+ twait { tcpconnect (_host, _port, mkevent (_fd)); }
+ if (_fd < 0) {
+ warn ("cannot connect to %s:%d: %m\n", _host.cstr (), _port);
+ rc = -1;
+ } else {
+ _x = axprt_stream::alloc (_fd, 0x1000000);
+ _cli = aclnt::alloc (_x, tst2_prog_1);
+ _pipeline = New refcounted<pipeline3::runner_t>
+ (New refcounted<pipeline3::passive_control_t> (_concur));
+ }
+
+ ev->trigger (rc);
+}
+
+//-----------------------------------------------------------------------
+
+tamed static void
+main2 (int argc, char **argv)
+{
+ tvars {
+ runner_t r;
+ int rc (0);
+ }
+ random_init ();
+ rc = r.config (argc, argv);
+ if (rc == 0) { twait { r.init (mkevent (rc)); } }
+ if (rc == 0) { twait { r.run (mkevent (rc)); } }
+ exit (rc);
+}
+
+//-----------------------------------------------------------------------
+
+int
+main (int argc, char *argv[])
+{
+ setprogname (argv[0]);
+ main2 (argc, argv);
+ amain ();
+}
+
+//-----------------------------------------------------------------------
View
21 test/system/3tier/tst2_prot.x
@@ -8,6 +8,8 @@ struct tst2_data_t {
x_okdate_t d2;
};
+typedef tst2_data_t tst2_data_rows_t<>;
+
struct tst2_put_arg_t {
string key<>;
tst2_data_t data;
@@ -36,8 +38,22 @@ default:
void;
};
+union tst2_mget_res_t switch (adb_status_t status) {
+case ADB_OK:
+ tst2_data_rows_t rows;
+default:
+ void;
+};
+
+struct tst2_mget_arg_t {
+ unsigned sleep_msec;
+ unsigned lim;
+};
+
typedef unsigned hyper u64_vec_t<>;
+namespace rpc {
+
program TST2_PROG {
version TST2_VERS {
@@ -59,7 +75,12 @@ program TST2_PROG {
unsigned hyper
TST2_SUM(u64_vec_t) = 5;
+ tst2_mget_res_t
+ TST2_MGET(tst2_mget_arg_t) = 6;
+
} = 1;
} = 10808;
+};
+
%#define TST2_PORT 10808
View
133 test/system/3tier/tst2d.C
@@ -30,22 +30,23 @@
#include "web.h"
#include "json_rpc.h"
-class tst2_srv_t : public amysql_thread_t {
+class tst2_srv_t : public amysql_thread2_t {
public:
tst2_srv_t (mtd_thread_arg_t *a, int meth)
- : amysql_thread_t (a, meth), err (false) {}
+ : amysql_thread2_t (a, meth), err (false) {}
bool init ();
- void dispatch (svccb *sbp);
+ void dispatch (ptr<amt::req_t> b);
static mtd_thread_t *alloc (int meth, mtd_thread_arg_t *arg)
{ return New tst2_srv_t (arg, meth); }
protected:
- void get (svccb *sbp);
- void put (svccb *sbp);
- void foo_reflect (svccb *sbp);
- void negate (svccb *sbp);
- void sum (svccb *sbp);
+ void get (ptr<amt::req_t> b);
+ void mget (ptr<amt::req_t> b);
+ void put (ptr<amt::req_t> b);
+ void foo_reflect (ptr<amt::req_t> b);
+ void negate (ptr<amt::req_t> b);
+ void sum (ptr<amt::req_t> b);
private:
- sth_t _q_get, _q_put;
+ sth_t _q_get, _q_put, _q_mget;
bool err;
};
@@ -59,7 +60,10 @@ tst2_srv_t::init ()
TWARN (mysql.error ());
rc = false;
} else if (!(_q_get = PREP("SELECT id,d,i,d2 FROM tst2 WHERE s = ?")) ||
- !(_q_put = PREP("INSERT INTO tst2(s,d,i,d2) VALUES(?,?,?,?)"))) {
+ !(_q_put = PREP("INSERT INTO tst2(s,d,i,d2) VALUES(?,?,?,?)")) ||
+ !(_q_mget = PREP("SELECT SLEEP(?/1000),id,d,i,d2 "
+ "FROM tst2 ORDER BY RAND() "
+ "LIMIT ?"))) {
rc = false;
}
return rc;
@@ -67,14 +71,14 @@ tst2_srv_t::init ()
}
void
-tst2_srv_t::dispatch (svccb *sbp)
+tst2_srv_t::dispatch (ptr<amt::req_t> sbp)
{
//int id = getid ();
u_int p = sbp->proc ();
err = false;
switch (p) {
case TST2_NULL:
- sbp->reply (NULL);
+ sbp->replynull ();
break;
case TST2_PUT:
put (sbp);
@@ -91,47 +95,89 @@ tst2_srv_t::dispatch (svccb *sbp)
case TST2_SUM:
sum (sbp);
break;
+ case TST2_MGET:
+ mget (sbp);
+ break;
default:
- reject ();
+ sbp->reject ();
break;
}
}
+//-----------------------------------------------------------------------
+
void
-tst2_srv_t::foo_reflect (svccb *b)
+tst2_srv_t::foo_reflect (ptr<amt::req_t> b)
{
- const foo_t *arg = b->Xtmpl getarg<foo_t> ();
- ptr<foo_t> res = New refcounted<foo_t> (*arg);
- reply (res);
+ rpc::tst2_prog_1::tst2_foo_reflect_srv_t<amt::req_t> srv (b);
+ const foo_t *arg = srv.getarg ();
+ ptr<foo_t> res = srv.alloc_res (*arg);
+ srv.reply (res);
}
+//-----------------------------------------------------------------------
+
void
-tst2_srv_t::sum (svccb *b)
+tst2_srv_t::sum (ptr<amt::req_t> b)
{
- const u64_vec_t *arg = b->Xtmpl getarg<u64_vec_t> ();
+ rpc::tst2_prog_1::tst2_sum_srv_t<amt::req_t> srv (b);
+ const u64_vec_t *arg = srv.getarg ();
+
u_int64_t res = 0;
for (size_t i = 0; i < arg->size (); i++) {
res += (*arg)[i];
}
- reply (New refcounted<u_int64_t> (res));
+ srv.reply (srv.alloc_res (res));
}
+//-----------------------------------------------------------------------
+
void
-tst2_srv_t::negate (svccb *b)
+tst2_srv_t::negate (ptr<amt::req_t> b)
{
- const bool *arg = b->Xtmpl getarg<bool> ();
- ptr<bool> ret = New refcounted<bool> (!*arg);
- reply (ret);
+ rpc::tst2_prog_1::tst2_negate_srv_t<amt::req_t> srv (b);
+ const bool *arg = srv.getarg ();
+ ptr<bool> res = srv.alloc_res ();
+ *res = !*arg;
+ srv.reply (res);
}
+//-----------------------------------------------------------------------
+
void
-tst2_srv_t::get (svccb *b)
+tst2_srv_t::mget (ptr<amt::req_t> b)
{
- const tst2_get_arg_t *arg = b->Xtmpl getarg<tst2_get_arg_t> ();
+ rpc::tst2_prog_1::tst2_mget_srv_t<amt::req_t> srv (b);
+ const tst2_mget_arg_t *arg = srv.getarg ();
+ ptr<tst2_mget_res_t> res = srv.alloc_res (ADB_OK);
+
+ if (!_q_mget->execute (arg->sleep_msec, arg->lim)) {
+ res->set_status (ADB_EXECUTE_ERROR);
+ TWARN("mget error: " << _q_mget->error ());
+ } else {
+ adb_status_t s;
+ tst2_data_t dat;
+ while ((s = _q_mget->fetch (&dat.d, &dat.i, &dat.pk, &dat.d2)) == ADB_OK) {
+ res->rows->push_back (dat);
+ }
+ if (s != ADB_NOT_FOUND) {
+ TWARN("mget error: " << _q_mget->error ());
+ res->set_status (s);
+ }
+ }
+ srv.reply (res);
+}
+
+//-----------------------------------------------------------------------
+
+void
+tst2_srv_t::get (ptr<amt::req_t> b)
+{
+ rpc::tst2_prog_1::tst2_get_srv_t<amt::req_t> srv (b);
+ const tst2_get_arg_t *arg = srv.getarg ();
adb_status_t rc;
- passptr<tst2_get_res_t> res;
- res = New refcounted<tst2_get_res_t> (ADB_OK);
+ ptr<tst2_get_res_t> res = srv.alloc_res (ADB_OK);
if (!_q_get->execute (*arg)) {
rc = ADB_EXECUTE_ERROR;
@@ -147,16 +193,17 @@ tst2_srv_t::get (svccb *b)
TWARN(es);
}
- reply_pass (res);
+ srv.reply (res);
}
+//-----------------------------------------------------------------------
+
void
-tst2_srv_t::put (svccb *b)
+tst2_srv_t::put (ptr<amt::req_t> b)
{
- const tst2_put_arg_t *arg = b->Xtmpl getarg<tst2_put_arg_t> ();
- passptr<adb_status_t> res (New refcounted<adb_status_t> (ADB_OK));
-
- passptr<adb_status_t> tmp = res;
+ rpc::tst2_prog_1::tst2_put_srv_t<amt::req_t> srv (b);
+ const tst2_put_arg_t *arg = srv.getarg ();
+ ptr<adb_status_t> res = srv.alloc_res (ADB_OK);
if (!_q_put->execute (arg->key, arg->data.d, arg->data.i, arg->data.d2)) {
*res = ADB_EXECUTE_ERROR;
@@ -164,13 +211,12 @@ tst2_srv_t::put (svccb *b)
if ((es = _q_put->error ()))
TWARN("put error: " << es);
}
- reply_pass (res);
- // see comments in libamt/passptr.h
- assert (!res);
- assert (!tmp);
+ b->reply (res);
}
+//-----------------------------------------------------------------------
+
static void
usage ()
{
@@ -207,9 +253,18 @@ start_server (int argc, char *argv[])
if (argc != 0)
usage ();
+
+ mtd_thread_typ_t method = MTD_NONE;
+#if HAVE_PTHREADS
+ method = MTD_PTHREAD;
+#else
+# if HAVE_PTH
+ method = MTD_PTH;
+# endif
+#endif
ssrv_t *s = New ssrv_t (wrap (&tst2_srv_t::alloc, mysql_sth_method),
- tst2_prog_1, MTD_PTH, tcnt, maxq);
+ tst2_prog_1, method, tcnt, maxq);
json_XDR_dispatch_t::enable ();
View
2  test/system/okws_config.in
@@ -47,7 +47,7 @@ Service slow /slow
Service encoder /encoder
Service cpubomb -n3 /cpubomb
-#Service 3tier/tst2 /tst2
+Service 3tier/tst2 /tst2
StatPageURL /stats
Please sign in to comment.
Something went wrong with that request. Please try again.