Skip to content
This repository

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse code

Initial revision, outline of ideas.

git-svn-id: svn+ssh://rubyforge.org/var/svn/alogr/trunk@1 5d4fd1e0-7adf-40a7-adde-e9af6235188f
  • Loading branch information...
commit 573e9f1021ac034489099d0a20fb9b93e7702085 0 parents
authored September 14, 2007
70  README
... ...
@@ -0,0 +1,70 @@
  1
+= About
  2
+
  3
+  AlogR enables a Ruby project to log to the filesystem using non-blocking asynchronous IO with no external dependencies.
  4
+
  5
+= Author
  6
+
  7
+  Wayne E. Seguin (wayneeseguin at gmail dot com)
  8
+
  9
+= How it works
  10
+
  11
+  A global logging buffer gets added to the running application.
  12
+  There is a global buffer which is an array of fifo queues.
  13
+  Worker threads are to pop the first element (which is a string) off of a fifo queue and "process" it. 
  14
+  Processing involves writing the string to disk using non-blocking asynchronous IO
  15
+  Threads will be event based where the event is something being placed in the queue, or the queue non-empty.
  16
+  
  17
+= Config
  18
+  
  19
+  You can choose the log that the application uses:
  20
+
  21
+  AlogR.new( "log/app_log.log" )
  22
+  
  23
+  :log specifies the default log
  24
+  
  25
+  AlogR.new( :log => "log/app_log.log" )
  26
+
  27
+  You may also specify any combination of logs corresponding to the log levels in addition to the default log:
  28
+
  29
+  AlogR.new( :log => "log/app_log.log", :error => "log/error.log", :debug => "log/debug.log")
  30
+
  31
+  where log levels are one of: [ emergency, alert, critical, error, warning, notice, info, debug ] 
  32
+
  33
+= Application Usage
  34
+
  35
+  First be sure to setup the logger:
  36
+
  37
+  $alogger = AlogR.new(:log)
  38
+
  39
+  To log a string to either the error log or the default log (depending on your configuration):
  40
+
  41
+  "Jonnie! You borked it!".log(:error)
  42
+
  43
+Feature Requests:
  44
+# Be able to specify conditional filters
  45
+
  46
+= Examples
  47
+
  48
+* Example 1
  49
+require "lib/alogr"
  50
+$logger = AlogR.new(:log => "/Users/wayne/projects/ruby/alogr/trunk/log/production.log")
  51
+"a test, should go to the logs 10 times\n".log.log.log.log.log.log.log.log.log.log
  52
+
  53
+* Example 2
  54
+require "lib/alogr"
  55
+$logger = AlogR.new(
  56
+:log => "/Users/wayne/projects/ruby/alogr/trunk/log/production.log", 
  57
+:error => "/Users/wayne/projects/ruby/alogr/trunk/log/error.log",
  58
+:info => "/Users/wayne/projects/ruby/alogr/trunk/log/info.log",
  59
+/paypal => "/Users/wayne/projects/ruby/alogr/trunk/log/paypal.log"
  60
+)
  61
+"this should go to info log".log
  62
+"this should go to error log".log(:error)
  63
+"this should go to production log".log(:warning)
  64
+
  65
+* If line matches a regexp then sent log/paypal.log as well as production.log
  66
+AlogR.new(:log => "log/production.log", /paypal/ => "log/paypal.log")
  67
+
  68
+Gets logged to both
  69
+{ :something => "value", :something_else => "value" }.log
  70
+
37  ext/aio/aio_log.h
... ...
@@ -0,0 +1,37 @@
  1
+// Asynchronously log the string of specified length to given file
  2
+int aio_log(void * string, int length, char * file_name) {
  3
+  printf("() aio_log '%s' > %s\n", string, file_name);
  4
+  
  5
+  int done = 0;
  6
+  int error;
  7
+  int file_descriptor;
  8
+  
  9
+  if ((file_descriptor = open(file_name, O_WRONLY | O_CREAT | O_APPEND)) == -1) {
  10
+    fprintf(stderr, "Failed to open %s: %s\n", file_name, strerror(errno));
  11
+    return 1;
  12
+  }
  13
+  
  14
+  if (init_signal(SIG_AIO) == -1) {
  15
+    perror("Failed to initialize signal");
  16
+    return 1;
  17
+  }
  18
+  
  19
+  if (init_write(file_descriptor, SIG_AIO, string, length) == -1) {
  20
+    perror("Failed to initate the write");
  21
+    return 1;
  22
+  }
  23
+  
  24
+  for ( ; ; ) {
  25
+    done = suspend_until_done_or_timeout(); // Wait for the aio complete signal or timeout
  26
+    if (!done) {
  27
+      if (done = getdone()) {
  28
+        if (error = geterror()) {
  29
+          fprintf(stderr, "Failed to log:%s\n", strerror(error));
  30
+        }
  31
+      }
  32
+    } else {
  33
+      fprintf(stderr, "Logging successful, %d bytes\n", getbytes());
  34
+      break; // break out of for loop
  35
+    }
  36
+  }
  37
+}
156  ext/aio/aio_signal.h
... ...
@@ -0,0 +1,156 @@
  1
+#include "aio.h"
  2
+#include "errno.h"
  3
+#include "signal.h"
  4
+//#include "restart.h" // r_read, r_write, etc
  5
+
  6
+static struct aiocb aiocb;
  7
+static sig_atomic_t doneflag;
  8
+static int fdout;
  9
+static int globalerror;
  10
+static int totalbytes;
  11
+
  12
+static int readstart();
  13
+static void seterror(int error);
  14
+
  15
+static void aiohandler(int signo, siginfo_t *info, void *context) {
  16
+  int  myerrno;
  17
+  int  mystatus;
  18
+  int  serrno;
  19
+
  20
+  serrno = errno;
  21
+  myerrno = aio_error(&aiocb);
  22
+
  23
+  if (myerrno == EINPROGRESS) {
  24
+    errno = serrno;
  25
+    return;
  26
+  }
  27
+
  28
+  if (myerrno) {
  29
+    seterror(myerrno);
  30
+    errno = serrno;
  31
+    return;
  32
+  }
  33
+
  34
+  mystatus = aio_return(&aiocb);
  35
+  totalbytes += mystatus;
  36
+
  37
+  // Handling of regexen should go somewhere around here
  38
+
  39
+  if (mystatus == 0) {
  40
+    doneflag = 1;
  41
+  }
  42
+
  43
+  errno = serrno;
  44
+}
  45
+
  46
+// start an asynchronous read
  47
+static int readstart() {
  48
+  int error;
  49
+  if (error = aio_read(&aiocb)) {
  50
+    seterror(errno);
  51
+  }
  52
+  return error;
  53
+}
  54
+
  55
+// start an asynchronous write
  56
+static int writestart() {
  57
+  int error;
  58
+  if (error = aio_write(&aiocb)) {
  59
+    seterror(errno);
  60
+  }
  61
+  return error;
  62
+}
  63
+
  64
+// update globalerror if zero
  65
+static void seterror(int error) {
  66
+  if (!globalerror) {
  67
+    globalerror = error;
  68
+  }
  69
+  doneflag = 1;
  70
+}
  71
+
  72
+/* --------------------------Public Functions ---------------------------- */
  73
+// Get the total number of bytes
  74
+int getbytes() {
  75
+  if (doneflag){
  76
+    return totalbytes;
  77
+  }
  78
+  errno = EINVAL;
  79
+  return -1;
  80
+}
  81
+
  82
+// Check if done
  83
+int getdone() {
  84
+  return doneflag;
  85
+}
  86
+
  87
+// Return the globalerror value if process is done
  88
+int geterror() {
  89
+  if (doneflag) {
  90
+    return globalerror;
  91
+  }
  92
+  errno = EINVAL;
  93
+  return errno;
  94
+}
  95
+
  96
+int init_read(int fdread, int fdwrite, int signo, char *buf, int bufsize) {
  97
+  // Setup control block
  98
+  aiocb.aio_fildes = fdread;
  99
+  aiocb.aio_offset = 0;
  100
+  aiocb.aio_buf = (void *) buf;
  101
+  aiocb.aio_nbytes = bufsize;
  102
+
  103
+  // Signal handling
  104
+  aiocb.aio_sigevent.sigev_notify = SIGEV_SIGNAL;
  105
+  aiocb.aio_sigevent.sigev_signo = signo;
  106
+  aiocb.aio_sigevent.sigev_value.sival_ptr = &aiocb;
  107
+  fdout = fdwrite;
  108
+  doneflag = 0;
  109
+  globalerror = 0;
  110
+  totalbytes = 0;
  111
+  return readstart(); // Initiate the aio_read call
  112
+}
  113
+
  114
+int init_write(int fdwrite, int signal_number, char *buffer, int buffer_size) {
  115
+  // Setup control block
  116
+  aiocb.aio_fildes = fdwrite;
  117
+  aiocb.aio_offset = 0; // Should be ignored if file is in append-mode
  118
+  aiocb.aio_buf = (void *)buffer;
  119
+  aiocb.aio_nbytes = buffer_size;
  120
+
  121
+  // Signal handling
  122
+  aiocb.aio_sigevent.sigev_notify = SIGEV_SIGNAL;
  123
+  aiocb.aio_sigevent.sigev_signo = signal_number;
  124
+  aiocb.aio_sigevent.sigev_value.sival_ptr = &aiocb;
  125
+
  126
+  aiocb.aio_sigevent.sigev_signo = SIGUSR1;
  127
+  // Initialize variables
  128
+  fdout = fdwrite;
  129
+  doneflag = 0;
  130
+  globalerror = 0;
  131
+  totalbytes = 0;
  132
+
  133
+  return writestart(); // Initiate the aio_write call
  134
+}
  135
+
  136
+// set up the handler for the async I/O
  137
+int init_signal(int signo) {
  138
+  struct sigaction newact;
  139
+
  140
+  newact.sa_sigaction = aiohandler;
  141
+  newact.sa_flags = SA_SIGINFO;
  142
+  if ((sigemptyset(&newact.sa_mask) == -1) || 
  143
+  (sigaction(signo, &newact, NULL) == -1)) {
  144
+    return -1;
  145
+  }
  146
+  return 0;
  147
+}
  148
+
  149
+// return 1 if done, 0 otherwise
  150
+int suspend_until_done_or_timeout() {
  151
+  const struct aiocb *aiocblist;
  152
+
  153
+  aiocblist = &aiocb;
  154
+  aio_suspend(&aiocblist, 1, NULL);
  155
+  return doneflag;
  156
+}
125  ext/alogr_ext.c
... ...
@@ -0,0 +1,125 @@
  1
+// There might be a more efficient way to do this
  2
+// However the goal is to make it work first :)
  3
+// NOTE:
  4
+//The signal handler does not output any error messages. Output from an asynchronous signal handler can interfere with I/O operations in the main program, and the standard library routines such as fprintf and perror may not be safe to use in signal handlers. Instead, the signal handler just keeps track of the errno value of the first error that occurred. The main program can then print an error message, using strerror.
  5
+
  6
+// ruby
  7
+#include "ruby.h"
  8
+#include "rubyio.h"
  9
+
  10
+// pthread
  11
+#include <pthread.h>
  12
+#include <unistd.h>   // Standard unix constants (posix)
  13
+#define NUM_THREADS 1
  14
+
  15
+// aio
  16
+#include <aio.h>
  17
+#include <errno.h>
  18
+#include <fcntl.h>
  19
+#include <sys/stat.h> // MODEs
  20
+#include <signal.h>
  21
+//#define SIG_AIO (SIGRTMIN+5) // OSX seems to be lacking
  22
+#define SIG_AIO SIGUSR1 // So trying this for now... :(
  23
+#include "aio/aio_signal.h"
  24
+#include "aio/aio_log.h"
  25
+
  26
+#define BLKSIZE 1024
  27
+#define MODE (S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH)
  28
+
  29
+struct thread_data { } payload[NUM_THREADS];
  30
+int status;
  31
+pthread_mutex_t log_mutex;
  32
+pthread_cond_t log_threshold_cv;
  33
+
  34
+// Consume the queue one item at a time
  35
+void *muncher(void *thread_arg) {
  36
+  printf("\n() muncher!");
  37
+
  38
+  VALUE work, rb_string, buffer, log_files;
  39
+  int log_level;
  40
+  log_files = rb_gv_get("$alogr_log_files");
  41
+  buffer= rb_gv_get("$alogr_buffer");
  42
+
  43
+  for( ; ; ) {
  44
+    // Remove the first item from the buffer
  45
+    work = rb_ary_shift(buffer);
  46
+    while(!NIL_P(work)) {
  47
+      log_level = FIX2INT(rb_ary_shift(work));
  48
+      rb_string = rb_ary_shift(work);
  49
+
  50
+      aio_log( RSTRING(rb_string)->ptr, // The string to log
  51
+        RSTRING(rb_string)->len, // Real length of string to log
  52
+        RSTRING(rb_ary_entry(log_files, log_level))->ptr // log file name
  53
+        );
  54
+      
  55
+      work = rb_ary_shift(buffer); // Fetch the next item of work
  56
+    }
  57
+    sleep(0.05); // Wait before rechecking the queue
  58
+    // TODO: Process regexen on string afterwards, then log strings that match to appropriate locations. Do this in callback.
  59
+    // TODO: Figure out a clean exit condition... If $alogr_log_files is marked for GC then break?
  60
+  };
  61
+
  62
+  printf("<i> Thread exiting!\n");
  63
+  pthread_exit((void *) 0);
  64
+}
  65
+
  66
+static VALUE signal_munchers() {
  67
+  pthread_mutex_lock(&log_mutex);
  68
+  pthread_cond_signal(&log_threshold_cv);
  69
+  pthread_mutex_unlock(&log_mutex);
  70
+
  71
+  return Qnil;
  72
+}
  73
+
  74
+// Initialize the muncher thread(s)
  75
+static VALUE init_munchers() {  
  76
+  printf("\n() init_munchers");
  77
+  int return_code;
  78
+
  79
+  pthread_t threads[NUM_THREADS];
  80
+  pthread_attr_t attr;
  81
+
  82
+  // Initialize and set thread detached attribute
  83
+  pthread_attr_init(&attr);
  84
+  pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
  85
+
  86
+  int t;
  87
+  for(t = 0; t < NUM_THREADS; t++) {
  88
+  //payload[t].data = ...;
  89
+    pthread_create(&threads[t], &attr, muncher, (void *) &payload[t]);
  90
+  }
  91
+
  92
+  for(t=0; t<NUM_THREADS; t++) {
  93
+    printf("Creating thread %d\n", t);
  94
+    return_code = pthread_create(&threads[t], &attr, muncher, NULL); 
  95
+    if (return_code) {
  96
+      printf("ERROR; return code from pthread_create() is %d\n", return_code);
  97
+      exit(-1);
  98
+    }
  99
+  }
  100
+
  101
+  // Free attribute and wait for the other threads
  102
+  pthread_attr_destroy(&attr);
  103
+  for(t=0; t<NUM_THREADS; t++) {
  104
+    return_code = pthread_join(threads[t], (void **) &status);
  105
+    if(return_code) {
  106
+      printf("ERROR; return code from pthread_join() is %d\n", return_code);
  107
+      exit(-1);
  108
+    }
  109
+    printf("Completed join with thread %d status= %d\n",t, status);
  110
+  }
  111
+
  112
+  pthread_exit(NULL);
  113
+  
  114
+  return Qnil;
  115
+}
  116
+
  117
+static VALUE cAlogR;
  118
+// Called when interpreter loads alogr
  119
+void Init_alogr_ext() {
  120
+  printf("() Init_alogr_ext\n");
  121
+  rb_gv_set("alogr_buffer", rb_ary_new());
  122
+  rb_gv_set("alogr_log_files",rb_ary_new());
  123
+  rb_define_method(cAlogR, "init_munchers", init_munchers, 0);
  124
+  rb_define_method(cAlogR, "signal_munchers", signal_munchers, 0);
  125
+}
13  ext/extconf.rb
... ...
@@ -0,0 +1,13 @@
  1
+# Loads mkmf which is used to make makefiles for Ruby extensions
  2
+require "mkmf"
  3
+
  4
+# Give it a name
  5
+extension_name = "alogr_ext"
  6
+
  7
+# pthread
  8
+
  9
+# The destination
  10
+dir_config(extension_name)
  11
+
  12
+# Do the work
  13
+create_makefile(extension_name)
56  lib/alogr.rb
... ...
@@ -0,0 +1,56 @@
  1
+class AlogR
  2
+
  3
+  Levels = { :emergency => 0, :alert => 1, :critical => 2, :error => 3, :warning => 4, :notice => 5, :info => 6, :debug => 7 }
  4
+
  5
+  class << self
  6
+    attr_accessor :default_log_level
  7
+  end
  8
+
  9
+  def initialize(options = {})
  10
+    puts "In Ruby initialize method"
  11
+    case options
  12
+    when String:
  13
+      options = {:log => options}
  14
+    when Hash: 
  15
+      options[:log] ||= "log/alogr.log"
  16
+    else
  17
+      raise "AlogR: Invalid configuration"
  18
+    end
  19
+
  20
+    default_log_level = options[:default_log_level] || :info
  21
+    
  22
+    AlogR::Levels.keys.each do | key |
  23
+      file_name = options[key] || options[:log]
  24
+      $alogr_log_files[AlogR::Levels[key]] = file_name
  25
+      system("mkdir -p #{File.dirname(file_name)} ; touch #{file_name}")
  26
+    end
  27
+
  28
+    init_munchers
  29
+
  30
+  end
  31
+
  32
+  def log(string, level = default_log_level)
  33
+    $alogr_buffer << [level, string]
  34
+    signal_munchers
  35
+  end
  36
+
  37
+end
  38
+
  39
+require "alogr_ext"
  40
+
  41
+class Object
  42
+
  43
+  def log(level = :info)
  44
+    if self.class == String
  45
+      if AlogR::Levels.has_key?(level)
  46
+        $logger.log(self, AlogR::Levels[level])
  47
+      else
  48
+        raise "Error: No such log level #{level}"
  49
+      end
  50
+    else
  51
+      self.to_s.log
  52
+    end
  53
+    self
  54
+  end
  55
+
  56
+end
5  tests/test1.rb
... ...
@@ -0,0 +1,5 @@
  1
+# Example 1
  2
+require "lib/alogr"
  3
+$logger = AlogR.new(:log => "/Users/wayne/projects/ruby/alogr/trunk/log/production.log")
  4
+"a test, should go to the logs 10 times\n".log.log.log.log.log.log.log.log.log.log
  5
+sleep(5)
13  tests/test2.rb
... ...
@@ -0,0 +1,13 @@
  1
+# Example 2
  2
+require "lib/alogr"
  3
+$logger = AlogR.new(
  4
+:log => "/Users/wayne/projects/ruby/alogr/trunk/log/production.log", 
  5
+:error => "/Users/wayne/projects/ruby/alogr/trunk/log/error.log",
  6
+:info => "/Users/wayne/projects/ruby/alogr/trunk/log/info.log"
  7
+)
  8
+"this should go to info log".log
  9
+sleep(1)
  10
+"this should go to error log".log(:error)
  11
+sleep(1)
  12
+"this should go to production log".log(:warning)
  13
+sleep(2)

0 notes on commit 573e9f1

Please sign in to comment.
Something went wrong with that request. Please try again.