Permalink
Browse files

connection pool++

  • Loading branch information...
1 parent 3e7c242 commit 5723625713d3959bc5c392a7127369ae8d94b431 @toddtreece committed Feb 3, 2012
Showing with 63 additions and 65 deletions.
  1. +28 −3 common.h
  2. +3 −3 io.c
  3. +3 −1 io.h
  4. +26 −24 server.c
  5. +3 −34 server.h
View
31 common.h
@@ -7,6 +7,8 @@
#include <time.h>
#include <ctype.h>
#include <assert.h>
+#include <pthread.h>
+#include <semaphore.h>
typedef unsigned long long ulonglong;
typedef long long longlong;
@@ -15,16 +17,39 @@ typedef long long longlong;
#include "mongo.h"
#include "encodings.h"
-#include "server.h"
-#include "io.h"
#define VERSION_STRING "1.0"
#define VERSION_STRING_LENGTH 3
#define ERRMSG_SIZE 1000
-#define NUMBER_OF_THREADS 20
+#define NUMBER_OF_THREADS 50
#define LOG_PATH "/home/todd/lib_mysqludf_mongodb.log"
+// TYPE DEFINITIONS
+typedef struct job_type {
+ UDF_ARGS *args;
+ struct job_type *next;
+ struct job_type *prev;
+} job_type;
+
+typedef struct queue_type {
+ int count;
+ job_type *start;
+ job_type *end;
+ sem_t *semaphore;
+} queue_type;
+
+typedef struct pool_type {
+ int count;
+ pthread_t *threads;
+ queue_type *queue;
+} pool_type;
+
+typedef struct thread_type {
+ pthread_mutex_t *connection_mutex;
+ pool_type *pool;
+} thread_type;
+
typedef struct mongodb_connection_type {
mongo connection[1];
int init;
View
6 io.c
@@ -1,4 +1,4 @@
-#include "common.h"
+#include "io.h"
my_bool mongodb_save_init(UDF_INIT *initid, UDF_ARGS *args, char *message) {
@@ -17,8 +17,8 @@ my_bool mongodb_save_init(UDF_INIT *initid, UDF_ARGS *args, char *message) {
}
long long mongodb_save(UDF_INIT *initid, UDF_ARGS *args, char *result, unsigned long *length, char *is_null, char *error) {
-
- pool_push(connection_pool, args);;
+
+ pool_push(connection_pool, args);
*is_null = 1;
return 0;
View
4 io.h
@@ -1,7 +1,9 @@
#ifndef MYSQLUDF_IO_H
-
#define MYSQLUDF_IO_H
+#include "common.h"
+#include "server.h"
+
my_bool mongodb_save_init(UDF_INIT *initid, UDF_ARGS *args, char *message);
long long mongodb_save(UDF_INIT *initid, UDF_ARGS *args, char *result, unsigned long *length, char *is_null, char *error);
View
50 server.c
@@ -1,7 +1,6 @@
-#include "common.h"
#include "server.h"
-FILE *stderr;
+FILE *log_file;
static int pool_keepalive = 1;
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_mutex_t iconv_mutex = PTHREAD_MUTEX_INITIALIZER;
@@ -99,7 +98,7 @@ my_bool mongodb_disconnect_init(UDF_INIT *initid, UDF_ARGS *args, char *message)
// CONNECTION FUNCTIONS
-void connect(UDF_ARGS *args) {
+void mdb_connect(UDF_ARGS *args) {
int i,status;
@@ -145,10 +144,17 @@ void connect(UDF_ARGS *args) {
}
-void insert(UDF_ARGS *args) {
+void mdb_insert(UDF_ARGS *args) {
+
+ int i, thread = 0;
+
+ for(i = 0; i < NUMBER_OF_THREADS; i++) {
+
+ if(pthread_equal(connection_pool->threads[i],pthread_self()))
+ thread = i;
+
+ }
- int thread = (int)pthread_self();
-
if(mdb[thread].init == 0)
fprintf(stderr, "mongodb connection was not initialized.\n");
@@ -160,7 +166,7 @@ void insert(UDF_ARGS *args) {
// the first argument is reserved
// for the collection name, so start
// the loop at 1.
- int i = 1;
+ i = 1;
bson_init(b);
@@ -184,17 +190,13 @@ void insert(UDF_ARGS *args) {
} else {
- pthread_mutex_lock(&iconv_mutex);
-
- char *item = NULL;
-
- item = utf8_encode(args->args[i]);
-
- bson_append_string(b, args->attributes[i], item);
+ char *item = NULL;
- free(item);
+ item = utf8_encode(args->args[i]);
+
+ bson_append_string(b, args->attributes[i], item);
- pthread_mutex_unlock(&iconv_mutex);
+ free(item);
}
@@ -242,7 +244,7 @@ void insert(UDF_ARGS *args) {
}
-void disconnect() {
+void mdb_disconnect() {
int i;
@@ -265,7 +267,7 @@ void disconnect() {
// UDF FUNCTIONS
long long mongodb_connect(UDF_INIT *initid, UDF_ARGS *args, char *result, unsigned long *length, char *is_null, char *error) {
- connect(args);
+ mdb_connect(args);
*is_null = 1;
return 0;
@@ -274,7 +276,7 @@ long long mongodb_connect(UDF_INIT *initid, UDF_ARGS *args, char *result, unsign
long long mongodb_disconnect(UDF_INIT *initid, UDF_ARGS *args, char *result, unsigned long *length, char *is_null, char *error) {
- disconnect();
+ mdb_disconnect();
*is_null = 1;
return 0;
@@ -284,7 +286,7 @@ long long mongodb_disconnect(UDF_INIT *initid, UDF_ARGS *args, char *result, uns
// QUEUE FUNCTIONS
-void queue_push(pool_type *pool, job_type *job) {
+void queue_push(pool_type *pool, job_type *job) {
job->next = NULL;
job->prev = NULL;
@@ -394,9 +396,9 @@ void *pool_run(void *p) {
queue_pop(pool);
- insert(arg_buff);
-
pthread_mutex_unlock(&mutex);
+
+ mdb_insert(arg_buff);
free(job);
@@ -413,7 +415,7 @@ void *pool_run(void *p) {
}
int pool_push(pool_type *pool, UDF_ARGS *args){
-
+
job_type *job;
job = (job_type*)malloc(sizeof(job_type));
@@ -430,7 +432,7 @@ int pool_push(pool_type *pool, UDF_ARGS *args){
queue_push(pool, job);
pthread_mutex_unlock(&mutex);
-
+
return 0;
}
View
37 server.h
@@ -2,39 +2,8 @@
#define MYSQLUDF_SERVER_H
-#include <pthread.h>
-#include <semaphore.h>
#include "common.h"
-FILE *log_file;
-
-// TYPE DEFINITIONS
-typedef struct job_type {
- UDF_ARGS *args;
- struct job_type *next;
- struct job_type *prev;
-} job_type;
-
-typedef struct queue_type {
- int count;
- job_type *start;
- job_type *end;
- sem_t *semaphore;
-} queue_type;
-
-typedef struct pool_type {
- int count;
- pthread_t *threads;
- queue_type *queue;
-} pool_type;
-
-typedef struct thread_type {
- pthread_mutex_t *connection_mutex;
- pool_type *pool;
-} thread_type;
-
-
-
// INIT FUNCTIONS
pool_type* pool_init(int thread_count);
@@ -46,11 +15,11 @@ my_bool mongodb_disconnect_init(UDF_INIT *initid, UDF_ARGS *args, char *message)
// CONNECTION FUNCTIONS
-void connect(UDF_ARGS *args);
+void mdb_connect(UDF_ARGS *args);
-void insert(UDF_ARGS *args);
+void mdb_insert(UDF_ARGS *args);
-void disconnect();
+void mdb_disconnect();
// UDF FUNCTIONS

0 comments on commit 5723625

Please sign in to comment.