Skip to content

Commit

Permalink
Notification service implementation based on RabbitMQ
Browse files Browse the repository at this point in the history
This is going to work in conjuction with Dovecot plugin for Openchange
and also notification consumer implemented in Openchange

So we have:
* Dovecot plugin for Openchange. Whenever new message arrives,
  it pushes a message in RabbitMQ
* Daemon (provided by this patch) listens for such RMQ messages
  and register them into mapistore indexing
* When message is registered we publish RMQ message into
  user specific queue
* TODO: A consumer in Openchage layer consumes such messages
  and notify Outlook for changes - not fully implemented yet.

NOTE: We should have RMQ message handler implemented into Openchange
      for this to work.
  • Loading branch information
Samuel Cabrero authored and Julio García committed Jan 22, 2015
1 parent dd35bf6 commit f09f827
Show file tree
Hide file tree
Showing 10 changed files with 1,127 additions and 2 deletions.
34 changes: 34 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ all: $(OC_IDL) \
$(OC_LIBS) \
$(OC_TOOLS) \
$(OC_SERVER) \
$(OC_NOTIF) \
$(PYOPENCHANGEALL) \
$(OC_TESTSUITE) \
$(COVERAGE_INIT) \
Expand All @@ -62,6 +63,7 @@ install:: all \
$(OC_TOOLS_INSTALL) \
$(OC_TESTSUITE_INSTALL) \
$(OC_SERVER_INSTALL) \
$(OC_NOTIF_INSTALL) \
$(PYOPENCHANGEINSTALL)

installlib: $(OC_LIBS_INSTALL)
Expand All @@ -72,6 +74,7 @@ uninstall:: $(OC_LIBS_UNINSTALL) \
$(OC_TOOLS_UNINSTALL) \
$(OC_TESTSUITE_UNINSTALL) \
$(OC_SERVER_UNINSTALL) \
$(OC_NOTIF_UNINSTALL) \
$(PYOPENCHANGEUNINSTALL)

dist:: distclean
Expand Down Expand Up @@ -1707,6 +1710,37 @@ installnagios:

install:: installnagios


###################
# notifications
###################

ocnotification: bin/ocnotification

ocnotification-install: ocnotification
$(INSTALL) -d $(DESTDIR)$(sbindir)
$(INSTALL) -m 0755 bin/ocnotification $(DESTDIR)$(sbindir)

ocnotification-uninstall:
rm -f $(DESTDIR)$(sbindir)/ocnotification

ocnotification-clean:
rm -f bin/ocnotification
rm -f mapiproxy/services/notification/src/*.o

bin/ocnotification: mapiproxy/services/notification/src/notification_amqp.o \
mapiproxy/services/notification/src/notification_config.o \
mapiproxy/services/notification/src/notification.o \
mapiproxy/services/notification/src/notification_register.o \
mapiproxy/libmapistore.$(SHLIBEXT).$(PACKAGE_VERSION) \
mapiproxy/libmapiproxy.$(SHLIBEXT).$(PACKAGE_VERSION) \
libmapi.$(SHLIBEXT).$(PACKAGE_VERSION)
@echo "Linking $@"
@$(CC) $(CFLAGS) -o $@ $^ $(LDFLAGS) $(LIBS) -lpopt

clean:: ocnotification-clean


###################
# libmapi examples
###################
Expand Down
24 changes: 22 additions & 2 deletions config.mk.in
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,23 @@ TDB_LIBS=@TDB_LIBS@
TALLOC_CFLAGS=@TALLOC_CFLAGS@
TALLOC_LIBS=@TALLOC_LIBS@

CFLAGS+=$(SAMBA_CFLAGS) $(LDB_CFLAGS) $(TALLOC_CFLAGS) $(TDB_CFLAGS) $(THREAD_CFLAGS)
LIBS+=$(SAMBA_LIBS) $(LDB_LIBS) $(TALLOC_LIBS) $(THREAD_LIBS) $(RT_LIBS)
BSD_CFLAGS=@BSD_CFLAGS@
BSD_LIBS=@BSD_LIBS@

SSL_CFLAGS=@SSL_CFLAGS@
SSL_LIBS=@SSL_LIBS@

RABBITMQ_CFLAGS=@RABBITMQ_CFLAGS@
RABBITMQ_LIBS=@RABBITMQ_LIBS@

JSONC_CFLAGS=@JSONC_CFLAGS@
JSONC_LIBS=@JSONC_LIBS@

CONFIG_CFLAGS=@CONFIG_CFLAGS@
CONFIG_LIBS=@CONFIG_LIBS@

CFLAGS+=$(SAMBA_CFLAGS) $(LDB_CFLAGS) $(TALLOC_CFLAGS) $(TDB_CFLAGS) $(THREAD_CFLAGS) $(JSONC_CFLAGS) $(RABBITMQ_CFLAGS) $(CONFIG_CFLAGS) $(BSD_CFLAGS) $(SSL_CFLAGS)
LIBS+=$(SAMBA_LIBS) $(LDB_LIBS) $(TALLOC_LIBS) $(THREAD_LIBS) $(RT_LIBS) $(JSONC_LIBS) $(RABBITMQ_LIBS) $(CONFIG_LIBS) $(BSD_LIBS) $(SSL_LIBS)
LDFLAGS+=@LDFLAGS@

THREAD_LIBS=@THREAD_LIBS@
Expand Down Expand Up @@ -124,6 +139,11 @@ OC_TESTSUITE_INSTALL=@OC_TESTSUITE_INSTALL@
OC_TESTSUITE_UNINSTALL=@OC_TESTSUITE_UNINSTALL@
OC_TESTSUITE_CHECK=@OC_TESTSUITE_CHECK@

# Notification
OC_NOTIF=@OC_NOTIF@
OC_NOTIF_INSTALL=@OC_NOTIF_INSTALL@
OC_NOTIF_UNINSTALL=@OC_NOTIF_UNINSTALL@

# Python
PYOPENCHANGEALL=@PYOPENCHANGEALL@
PYOPENCHANGEINSTALL=@PYOPENCHANGEINSTALL@
Expand Down
18 changes: 18 additions & 0 deletions configure.ac
Original file line number Diff line number Diff line change
Expand Up @@ -739,6 +739,24 @@ AC_ARG_WITH(modulesdir,

AC_SUBST(modulesdir)

dnl ##########################################################################
dnl Notifications service
dnl ##########################################################################
PKG_CHECK_MODULES(SSL, libssl)
PKG_CHECK_MODULES(BSD, libbsd)
PKG_CHECK_MODULES(RABBITMQ, librabbitmq >= 0.4.1)
PKG_CHECK_MODULES(JSONC, json-c >= 0.11)
PKG_CHECK_MODULES(CONFIG, libconfig >= 1.4.8)

OC_NOTIF=ocnotification
OC_NOTIF_CLEAN=ocnotification-clean
OC_NOTIF_INSTALL=ocnotification-install
OC_NOTIF_UNINSTALL=ocnotification-uninstall
AC_SUBST(OC_NOTIF)
AC_SUBST(OC_NOTIF_CLEAN)
AC_SUBST(OC_NOTIF_INSTALL)
AC_SUBST(OC_NOTIF_UNINSTALL)

dnl ##########################################################################
dnl Python bindings dependencies
dnl ##########################################################################
Expand Down
217 changes: 217 additions & 0 deletions mapiproxy/services/notification/src/notification.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,217 @@
#include <stdio.h>
#include <stdlib.h>
#include <malloc.h>
#include <syslog.h>
#include <unistd.h>
#include <stdbool.h>
#include <popt.h>
#include <errno.h>
#include <string.h>
#include <signal.h>
#include <bsd/libutil.h>
#include <talloc.h>
#include <err.h>

#include "notification.h"

#include <param.h>
#include "mapiproxy/libmapistore/mapistore.h"
#include "mapiproxy/libmapiproxy/libmapiproxy.h"

volatile sig_atomic_t abort_flag = 0;

static bool notification_init(TALLOC_CTX *mem_ctx, struct context *ctx)
{
enum MAPISTATUS retval;

/* Initialize configuration */
ctx->lp_ctx = loadparm_init(mem_ctx);
if (ctx->lp_ctx == NULL) {
errx(EXIT_FAILURE, "Failed to initialize configuration");
return false;
}
lpcfg_load_default(ctx->lp_ctx);

/* Initialize mapistore */
ctx->mstore_ctx = mapistore_init(mem_ctx, ctx->lp_ctx, ctx->mapistore_backends_path);
if (ctx->mstore_ctx == NULL) {
errx(EXIT_FAILURE, "Failed to initialize mapistore");
return false;
}

/* Initialize openchangedb context */
retval = openchangedb_initialize(mem_ctx, ctx->lp_ctx, &ctx->ocdb_ctx);
if (retval != MAPI_E_SUCCESS || ctx->ocdb_ctx == NULL) {
errx(EXIT_FAILURE, "Failed to initialize openchange db");
return false;
}

ctx->mstore_ctx->conn_info = talloc_zero(ctx->mstore_ctx, struct mapistore_connection_info);
ctx->mstore_ctx->conn_info->mstore_ctx = ctx->mstore_ctx;
ctx->mstore_ctx->conn_info->sam_ctx = NULL;
ctx->mstore_ctx->conn_info->oc_ctx = ctx->ocdb_ctx;
ctx->mstore_ctx->conn_info->username = NULL;

return true;
}

static void abort_handler(int sig)
{
abort_flag = 1;
}

int main(int argc, const char *argv[])
{
TALLOC_CTX *mem_ctx;
const char binary_name[] = "openchange-notification-service";
char *pidfile;
bool opt_daemon = false;
bool opt_debug = false;
char *opt_config = NULL;
int opt;
poptContext pc;
struct context *ctx;
struct pidfh *pfh;
pid_t otherpid;
enum {
OPT_DAEMON = 1000,
OPT_DEBUG,
OPT_CONFIG,
};
struct poptOption long_options[] = {
POPT_AUTOHELP
{"daemon", 'D', POPT_ARG_NONE, NULL, OPT_DAEMON,
"Become a daemon", NULL },
{"debug", 'd', POPT_ARG_NONE, NULL, OPT_DEBUG,
"Debug mode", NULL },
{"config", 'c', POPT_ARG_STRING, NULL, OPT_CONFIG,
"Config file path", NULL },
{ NULL },
};

/* Alloc context */
mem_ctx = talloc_named(NULL, 0, binary_name);
if (mem_ctx == NULL) {
errx(EXIT_FAILURE, "No memory");
}
ctx = talloc(mem_ctx, struct context);
if (ctx == NULL) {
errx(EXIT_FAILURE, "No memory");
}
ctx->mem_ctx = mem_ctx;

/* Parse command line arguments */
pc = poptGetContext(binary_name, argc, argv, long_options, 0);
while ((opt = poptGetNextOpt(pc)) != -1) {
switch (opt) {
case OPT_DAEMON:
opt_daemon = true;
break;
case OPT_DEBUG:
opt_debug = true;
break;
case OPT_CONFIG:
opt_config = poptGetOptArg(pc);
break;
default:
fprintf(stderr, "Invalid option %s: %s\n", poptBadOption(pc, 0), poptStrerror(opt));
poptPrintUsage(pc, stderr, 0);
talloc_free(mem_ctx);
exit(EXIT_FAILURE);
}
}
poptFreeContext(pc);

/* Setup logging and open log */
setlogmask(LOG_UPTO(opt_debug ? LOG_DEBUG : LOG_INFO));
openlog(binary_name, LOG_PID | (opt_daemon ? 0 : LOG_PERROR), LOG_DAEMON);

/* Read config */
if (opt_config != NULL) {
read_config(ctx, opt_config);
} else {
read_config(ctx, DEFAULT_CONFIG_FILE);
}

/* Check daemon not already running */
pidfile = talloc_asprintf(mem_ctx, "/run/%s.pid", binary_name);
if (pidfile == NULL) {
closelog();
talloc_free(mem_ctx);
errx(EXIT_FAILURE, "No memory");
}
pfh = pidfile_open(pidfile, 0644, &otherpid);
if (pfh == NULL) {
if (errno == EEXIST) {
closelog();
errx(EXIT_FAILURE, "Daemon already running, pid: %jd.", (intmax_t) otherpid);
}
/* If we cannot create pidfile from other reasons, only warn. */
warn("Cannot open or create pidfile %s", pidfile);
}
talloc_free(pidfile);

/* Set file mask */
umask(0);

/* TODO chdir */

/* Become daemon */
if (opt_daemon) {
if (daemon(0, 0) < 0) {
closelog();
talloc_free(mem_ctx);
pidfile_remove(pfh);
err(EXIT_FAILURE, "Failed to daemonize");
}
}

/* Setup signals */
signal(SIGHUP, opt_daemon ? SIG_IGN : abort_handler);
signal(SIGINT, abort_handler);
signal(SIGTERM, abort_handler);

/* Write pid to file */
pidfile_write(pfh);

/* Initialize */
if (!notification_init(mem_ctx, ctx)) {
closelog();
talloc_free(mem_ctx);
pidfile_remove(pfh);
err(EXIT_FAILURE, "Failed to initialize");
}

/* Do work */
while (!abort_flag) {
if (!broker_is_alive(ctx)) {
if (!broker_connect(ctx)) {
usleep(500000);
continue;
}
if (!broker_declare(ctx)) {
usleep(500000);
continue;
}
if (!broker_start_consumer(ctx)) {
usleep(500000);
continue;
}
}
broker_consume(ctx);
}

/* Disconnect from broker */
broker_disconnect(ctx);

/* Close logs */
closelog();

/* Unlink pidfile */
pidfile_remove(pfh);

/* Free memory */
talloc_free(mem_ctx);

exit(EXIT_SUCCESS);
}
54 changes: 54 additions & 0 deletions mapiproxy/services/notification/src/notification.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
#pragma once

#include <amqp.h>
#include <talloc.h>
#include <stdbool.h>

#define DEFAULT_CONFIG_FILE "/etc/openchange/notification-service.cfg"

struct loadparm_context;
struct mapistore_context;
struct openchangedb_context;

struct context {
TALLOC_CTX *mem_ctx;

amqp_connection_state_t broker_conn;
amqp_socket_t *broker_socket;

const char *broker_host;
int broker_port;
const char *broker_user;
const char *broker_pass;
const char *broker_vhost;
const char *broker_exchange;

const char *broker_new_mail_queue;
const char *broker_new_mail_routing_key;
const char *broker_new_mail_consumer_tag;

const char *mapistore_backends_path;
const char *mapistore_backend;
struct loadparm_context *lp_ctx;
struct mapistore_context *mstore_ctx;
struct openchangedb_context *ocdb_ctx;
};

void notification_register_message(TALLOC_CTX *, const struct context *,
const char *, const char *, uint32_t);

/* Definitions from notification_amqp.c */
char *broker_err(TALLOC_CTX *, amqp_rpc_reply_t);
bool broker_is_alive(struct context *);
void broker_disconnect(struct context *);
bool broker_connect(struct context *);
bool broker_declare(struct context *);
bool broker_start_consumer(struct context *);
void broker_consume(struct context *);

/* Definitions from notification_register.c */
void notification_register_message(TALLOC_CTX *, const struct context *,
const char *, const char *, uint32_t);

/* Definitions from notification_config.c */
void read_config(struct context *, const char *);
Loading

0 comments on commit f09f827

Please sign in to comment.