Skip to content

Commit

Permalink
Merge pull request collectd#12 from ChrisLundquist/as/mongodb
Browse files Browse the repository at this point in the history
Write MongoDB plugin: New plugin to write statistics to MongoDB, a NoSQL database using JSON documents.
  • Loading branch information
octo committed Jan 25, 2012
2 parents 3dffa6f + fc5c43d commit dba4945
Show file tree
Hide file tree
Showing 5 changed files with 336 additions and 0 deletions.
6 changes: 6 additions & 0 deletions AUTHORS
Expand Up @@ -15,6 +15,9 @@ Sebastian "tokkee" Harl <sh at tokkee.org>
Contributors (sorted alphabetically)
====================================

Akkarit Sangpetch <asangpet at andrew.cmu.edu>
- write_mongodb plugin.

Alessandro Iurlano <alessandro.iurlano at gmail.com>
- Initial filecount plugin.

Expand Down Expand Up @@ -44,6 +47,9 @@ Bruno Prémont <bonbons at linux-vserver.org>
especially a nasty bug in the network plugin.
- Wireshark dissector.

Chris Lundquist <clundquist at bluebox.net>
- Improvements to the write_mongodb plugin.

Christophe Kalt <collectd at klb.taranis.org>
- The version 3 `log' mode.
- Many Solaris related hints and fixes.
Expand Down
66 changes: 66 additions & 0 deletions configure.in
Expand Up @@ -1241,6 +1241,7 @@ AC_CHECK_MEMBERS([kstat_io_t.nwritten, kstat_io_t.writes, kstat_io_t.nwrites, ks
#
# Checks for libraries begin here
#

with_libresolv="yes"
AC_CHECK_LIB(resolv, res_search,
[
Expand Down Expand Up @@ -2143,6 +2144,69 @@ then
fi
# }}}

# --with-libmongoc {{{
AC_ARG_WITH(libmongoc, [AS_HELP_STRING([--with-libmongoc@<:@=PREFIX@:>@], [Path to libmongoc.])],
[
if test "x$withval" = "xyes"
then
with_libmongoc="yes"
else if test "x$withval" = "xno"
then
with_libmongoc="no"
else
with_libmongoc="yes"
LIBMONGOC_CPPFLAGS="$LIBMONGOC_CPPFLAGS -I$withval/include"
LIBMONGOC_LDFLAGS="$LIBMONGOC_LDFLAGS -L$withval/lib"
fi; fi
],
[with_libmongoc="yes"])

SAVE_CPPFLAGS="$CPPFLAGS"
SAVE_LDFLAGS="$LDFLAGS"

CPPFLAGS="$CPPFLAGS $LIBMONGOC_CPPFLAGS"
LDFLAGS="$LDFLAGS $LIBMONGOC_LDFLAGS"

if test "x$with_libmongoc" = "xyes"
then
if test "x$LIBMONGOC_CPPFLAGS" != "x"
then
AC_MSG_NOTICE([libmongoc CPPFLAGS: $LIBMONGOC_CPPFLAGS])
fi
AC_CHECK_HEADERS(mongo.h,
[with_libmongoc="yes"],
[with_libmongoc="no ('mongo.h' not found)"],
[#if HAVE_STDINT_H
# define MONGO_HAVE_STDINT 1
#else
# define MONGO_USE_LONG_LONG_INT 1
#endif
])
fi
if test "x$with_libmongoc" = "xyes"
then
if test "x$LIBMONGOC_LDFLAGS" != "x"
then
AC_MSG_NOTICE([libmongoc LDFLAGS: $LIBMONGOC_LDFLAGS])
fi
AC_CHECK_LIB(mongoc, mongo_run_command,
[with_libmongoc="yes"],
[with_libmongoc="no (symbol 'mongo_run_command' not found)"])
fi

CPPFLAGS="$SAVE_CPPFLAGS"
LDFLAGS="$SAVE_LDFLAGS"

if test "x$with_libmongoc" = "xyes"
then
BUILD_WITH_LIBMONGOC_CPPFLAGS="$LIBMONGOC_CPPFLAGS"
BUILD_WITH_LIBMONGOC_LDFLAGS="$LIBMONGOC_LDFLAGS"
AC_SUBST(BUILD_WITH_LIBMONGOC_CPPFLAGS)
AC_SUBST(BUILD_WITH_LIBMONGOC_LDFLAGS)
fi
AM_CONDITIONAL(BUILD_WITH_LIBMONGOC, test "x$with_libmongoc" = "xyes")
# }}}

# --with-libmysql {{{
with_mysql_config="mysql_config"
with_mysql_cflags=""
Expand Down Expand Up @@ -4679,6 +4743,7 @@ AC_PLUGIN([vserver], [$plugin_vserver], [Linux VServer statistics])
AC_PLUGIN([wireless], [$plugin_wireless], [Wireless statistics])
AC_PLUGIN([write_http], [$with_libcurl], [HTTP output plugin])
AC_PLUGIN([write_redis], [$with_libcredis], [Redis output plugin])
AC_PLUGIN([write_mongodb], [$with_libmongoc], [MongoDB output plugin])
AC_PLUGIN([xmms], [$with_libxmms], [XMMS statistics])
AC_PLUGIN([zfs_arc], [$plugin_zfs_arc], [ZFS ARC statistics])

Expand Down Expand Up @@ -5000,6 +5065,7 @@ Configuration:
wireless . . . . . . $enable_wireless
write_http . . . . . $enable_write_http
write_redis . . . . . $enable_write_redis
write_mongodb . . . . $enable_write_mongodb
xmms . . . . . . . . $enable_xmms
zfs_arc . . . . . . . $enable_zfs_arc

Expand Down
10 changes: 10 additions & 0 deletions src/Makefile.am
Expand Up @@ -1243,6 +1243,16 @@ endif
collectd_DEPENDENCIES += write_http.la
endif

if BUILD_PLUGIN_WRITE_MONGODB
pkglib_LTLIBRARIES += write_mongodb.la
write_mongodb_la_SOURCES = write_mongodb.c
write_mongodb_la_CPPFLAGS = $(AM_CPPFLAGS) $(BUILD_WITH_LIBMONGOC_CPPFLAGS)
write_mongodb_la_LDFLAGS = -module -avoid-version $(BUILD_WITH_LIBMONGOC_LDFLAGS)
write_mongodb_la_LIBADD = -lmongoc
collectd_LDADD += "-dlopen" write_mongodb.la
collectd_DEPENDENCIES += write_mongodb.la
endif

if BUILD_PLUGIN_WRITE_REDIS
pkglib_LTLIBRARIES += write_redis.la
write_redis_la_SOURCES = write_redis.c
Expand Down
9 changes: 9 additions & 0 deletions src/collectd.conf.in
Expand Up @@ -146,6 +146,7 @@
#@BUILD_PLUGIN_WIRELESS_TRUE@LoadPlugin wireless
#@BUILD_PLUGIN_WRITE_HTTP_TRUE@LoadPlugin write_http
#@BUILD_PLUGIN_WRITE_REDIS_TRUE@LoadPlugin write_redis
#@BUILD_PLUGIN_WRITE_MONGO_TRUE@LoadPlugin write_mongo
#@BUILD_PLUGIN_XMMS_TRUE@LoadPlugin xmms
#@BUILD_PLUGIN_ZFS_ARC_TRUE@LoadPlugin zfs_arc

Expand Down Expand Up @@ -972,6 +973,14 @@
# </Node>
#</Plugin>

#<Plugin write_mongo>
# <Node "example">
# Host "localhost"
# Port "27017"
# Timeout 1000
# </Node>
#</Plugin>

##############################################################################
# Filter configuration #
#----------------------------------------------------------------------------#
Expand Down
245 changes: 245 additions & 0 deletions src/write_mongodb.c
@@ -0,0 +1,245 @@
/**
* collectd - src/write_mongodb.c
* Copyright (C) 2010 Florian Forster
* Copyright (C) 2010 Akkarit Sangpetch
* Copyright (C) 2012 Chris Lundquist
*
* Permission is hereby granted, free of charge, to any person obtaining a
* copy of this software and associated documentation files (the "Software"),
* to deal in the Software without restriction, including without limitation
* the rights to use, copy, modify, merge, publish, distribute, sublicense,
* and/or sell copies of the Software, and to permit persons to whom the
* Software is furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
* DEALINGS IN THE SOFTWARE.
*
* Authors:
* Florian Forster <ff at octo.it>
* Akkarit Sangpetch <asangpet at andrew.cmu.edu>
* Chris Lundquist <clundquist at bluebox.net>
**/

#include "collectd.h"
#include "plugin.h"
#include "common.h"
#include "configfile.h"

#include <pthread.h>

#if HAVE_STDINT_H
# define MONGO_HAVE_STDINT 1
#else
# define MONGO_USE_LONG_LONG_INT 1
#endif
#include <mongo.h>

struct wm_node_s
{
char name[DATA_MAX_NAME_LEN];

char *host;
int port;
int timeout;

int connected;

mongo conn[1];
pthread_mutex_t lock;
};
typedef struct wm_node_s wm_node_t;

/*
* Functions
*/
static int wm_write (const data_set_t *ds, /* {{{ */
const value_list_t *vl,
user_data_t *ud)
{
wm_node_t *node = ud->data;
char collection_name[512];
int status;
int i;
bson record;

ssnprintf(collection_name, sizeof (collection_name), "collectd.%s", vl->plugin);

bson_init(&record);
bson_append_time_t(&record,"ts",CDTIME_T_TO_TIME_T(vl->time));
bson_append_string(&record,"h",vl->host);
bson_append_string(&record,"i",vl->plugin_instance);
bson_append_string(&record,"t",vl->type);
bson_append_string(&record,"ti",vl->type_instance);

for (i = 0; i < ds->ds_num; i++)
{
if (ds->ds[i].type == DS_TYPE_COUNTER)
bson_append_long(&record, ds->ds[i].name, vl->values[i].counter);
else if (ds->ds[i].type == DS_TYPE_GAUGE)
bson_append_double(&record, ds->ds[i].name, vl->values[i].gauge);
else if (ds->ds[i].type == DS_TYPE_DERIVE)
bson_append_long(&record, ds->ds[i].name, vl->values[i].derive);
else if (ds->ds[i].type == DS_TYPE_ABSOLUTE)
bson_append_long(&record, ds->ds[i].name, vl->values[i].absolute);
else
assert (23 == 42);
}
bson_finish(&record);

pthread_mutex_lock (&node->lock);

if (node->connected == 0)
{
status = mongo_connect(node->conn, node->host, node->port);
if (status != MONGO_OK) {
ERROR ("write_mongodb plugin: Connecting to host \"%s\" (port %i) failed.",
(node->host != NULL) ? node->host : "localhost",
(node->port != 0) ? node->port : MONGO_DEFAULT_PORT);
mongo_destroy(node->conn);
pthread_mutex_unlock (&node->lock);
return (-1);
} else {
node->connected = 1;
}
}

/* Assert if the connection has been established */
assert (node->connected == 1);

DEBUG ( "write_mongodb plugin: writing record");
/* bson_print(&record); */

status = mongo_insert(node->conn,collection_name,&record);

if(status != MONGO_OK)
{
ERROR ( "write_mongodb plugin: error inserting record: %d", node->conn->err);
if (node->conn->err == MONGO_BSON_INVALID)
ERROR ("write_mongodb plugin: %s", node->conn->errstr);
else if (record.err)
ERROR ("write_mongodb plugin: %s", record.errstr);
}

pthread_mutex_unlock (&node->lock);

return (0);
} /* }}} int wm_write */

static void wm_config_free (void *ptr) /* {{{ */
{
wm_node_t *node = ptr;

if (node == NULL)
return;

if (node->connected != 0)
{
mongo_destroy(node->conn);
node->connected = 0;
}

sfree (node->host);
sfree (node);
} /* }}} void wm_config_free */

static int wm_config_node (oconfig_item_t *ci) /* {{{ */
{
wm_node_t *node;
int status;
int i;

node = malloc (sizeof (*node));
if (node == NULL)
return (ENOMEM);
memset (node, 0, sizeof (*node));
node->host = NULL;
node->port = 0;
node->timeout = 1000;
node->connected = 0;
pthread_mutex_init (&node->lock, /* attr = */ NULL);

status = cf_util_get_string_buffer (ci, node->name, sizeof (node->name));

if (status != 0)
{
sfree (node);
return (status);
}

for (i = 0; i < ci->children_num; i++)
{
oconfig_item_t *child = ci->children + i;

if (strcasecmp ("Host", child->key) == 0)
status = cf_util_get_string (child, &node->host);
else if (strcasecmp ("Port", child->key) == 0)
{
status = cf_util_get_port_number (child);
if (status > 0)
{
node->port = status;
status = 0;
}
}
else if (strcasecmp ("Timeout", child->key) == 0)
status = cf_util_get_int (child, &node->timeout);
else
WARNING ("write_mongodb plugin: Ignoring unknown config option \"%s\".",
child->key);

if (status != 0)
break;
} /* for (i = 0; i < ci->children_num; i++) */

if (status == 0)
{
char cb_name[DATA_MAX_NAME_LEN];
user_data_t ud;

ssnprintf (cb_name, sizeof (cb_name), "write_mongodb/%s", node->name);

ud.data = node;
ud.free_func = wm_config_free;

status = plugin_register_write (cb_name, wm_write, &ud);
INFO ("write_mongodb plugin: registered write plugin %s %d",cb_name,status);
}

if (status != 0)
wm_config_free (node);

return (status);
} /* }}} int wm_config_node */

static int wm_config (oconfig_item_t *ci) /* {{{ */
{
int i;

for (i = 0; i < ci->children_num; i++)
{
oconfig_item_t *child = ci->children + i;

if (strcasecmp ("Node", child->key) == 0)
wm_config_node (child);
else
WARNING ("write_mongodb plugin: Ignoring unknown "
"configuration option \"%s\" at top level.", child->key);
}

return (0);
} /* }}} int wm_config */

void module_register (void)
{
plugin_register_complex_config ("write_mongodb", wm_config);
}

/* vim: set sw=2 sts=2 tw=78 et fdm=marker : */

0 comments on commit dba4945

Please sign in to comment.