Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.
Download ZIP
Browse files

First connection made between Cassandra and Monitor/Parser

All monitored files in Common Log Format will now end up in a Cassandra
cluster (specified in the config file) indexed by status code and date.
  • Loading branch information...
commit 2d17cfd8857c27b09bd01e4aa47f547f38f6afa4 1 parent a66c194
@jbohman authored
View
17 README
@@ -3,4 +3,21 @@ Logsandra
Work in progress!
Requirements:
+ - Cassandra + Thrift
+ - pycassa
- PyYaml
+ - python-dateutil
+
+
+Current keyspace config (for testing purposes only):
+
+<Keyspaces>
+ <Keyspace Name="logsandra">
+ <ColumnFamily Name="entries" CompareWith="BytesType" />
+ <ColumnFamily Name="by_date" CompareWith="LongType" />
+ <ColumnFamily Name="by_date_data" CompareWith="LongType" />
+ <ReplicaPlacementStrategy>org.apache.cassandra.locator.RackUnawareStrategy</ReplicaPlacementStrategy>
+ <ReplicationFactor>1</ReplicationFactor>
+ <EndPointSnitch>org.apache.cassandra.locator.EndPointSnitch</EndPointSnitch>
+ </Keyspace>
+</Keyspaces>
View
4 config.yaml
@@ -1,3 +1,6 @@
+# Ident
+ident: 'Logsandra Example Server 1'
+
# Webservice
webservice_enabled: True
webservice_address: localhost
@@ -6,6 +9,7 @@ webservice_port: 8080
# Cassandra cluster to connect to
cassandra_address: localhost
cassandra_port: 9160
+cassandra_timeout: 5
# List of paths (files and directories) to monitor
paths:
View
8 logsandra/logsandra.py
@@ -16,9 +16,9 @@
class Application(Daemon):
def monitor(self):
- reader = monitor.Reader(False)
- watcher = monitor.Watcher(self.settings['paths'], reader.callback)
- watcher.loop()
+ print 'Starting monitor service...'
+ m = monitor.Monitor(self.settings, False)
+ m.run()
# TODO: setup application here, monitor + pylon webservice
def run(self):
@@ -35,6 +35,8 @@ def run(self):
self.monitor_process = multiprocessing.Process(target=self.monitor)
self.monitor_process.start()
+ print 'Starting web service...'
+
self.running = True
while self.running:
time.sleep(10)
View
2  logsandra/monitor/__init__.py
@@ -1 +1 @@
-from monitor import Reader, Watcher
+from monitor import Monitor
View
73 logsandra/monitor/monitor.py
@@ -1,24 +1,47 @@
-#!/usr/bin/env python
-import sys
+# Global imports
import os.path
-from optparse import OptionParser
+import time
+import uuid
+import struct
+import pycassa
-# Try to import pyinotify handler, else standard handler
-#try:
-# from watchers.inotify import InotifyWatcher as Watcher
-#except ImportError:
-from watchers.standard import StandardWatcher as Watcher
+from watchers import Watcher
from parsers.clf import ClfParser
-class Reader(object):
+class Monitor(object):
+
+ def __init__(self, settings, tail=False):
+ self.settings = settings
- def __init__(self, tail=False):
self.tail = tail
self.seek_data = {}
self.parser = {}
+ def run(self):
+ # Connect to cassandra
+ connect_string = '%s:%s' % (self.settings['cassandra_address'], self.settings['cassandra_port'])
+ self.client = pycassa.connect([connect_string], timeout=self.settings['cassandra_timeout'])
+
+ # Column families
+ self.entries = pycassa.ColumnFamily(self.client, 'logsandra', 'entries')
+ self.by_date = pycassa.ColumnFamily(self.client, 'logsandra', 'by_date')
+ self.by_date_data = pycassa.ColumnFamily(self.client, 'logsandra', 'by_date_data')
+
+ # Struct
+ self.long_struct = struct.Struct('l')
+
+ # Start watcher (inf loop)
+ self.watcher = Watcher(self.settings['paths'], self.callback)
+ self.watcher.loop()
+
+ def _to_long(self, data):
+ return self.long_struct.pack(data)
+
+ def _from_long(self, data):
+ return self.long_struct.unpack(data)
+
def callback(self, filename, data):
if os.path.basename(filename).startswith('.'):
return False
@@ -39,6 +62,14 @@ def callback(self, filename, data):
result = self.parser[filename].parse_line(line)
+ # TODO: Should move this to every individual parser
+ key = uuid.uuid4()
+ self.entries.insert(key.bytes, {'ident': self.settings['ident'], 'entry': line})
+
+ if 'status' in result:
+ # TODO: is this really how pycassa should be used?
+ self.by_date.insert(str(result['status']), {self._to_long(time.mktime(result['time'].timetuple())): str(key)})
+
print result
self.seek_data[filename] = file_handler.tell()
@@ -46,25 +77,3 @@ def callback(self, filename, data):
except IOError:
pass
-
-# Run it
-if __name__ == '__main__':
- usage = 'usage: %prog [options] path [path ...]'
- parser = OptionParser(usage=usage)
- parser.add_option('-r', '--rescan-freq', dest='rescan_freq', help='rescan frequency in seconds', metavar='SECONDS', default=20)
- parser.add_option('-u', '--update-freq', dest='update_freq', help='update frequnecy in seconds', metavar='SECONDS', default=10)
- parser.add_option('-t', '--tail', action='store_true', dest='tail', help='start reading from the bottom only', default=False)
- parser.add_option('--recursive', action='store_true', dest='recursive')
- (options, args) = parser.parse_args()
-
- if not args:
- print "Need atleast one path (file or directory) to monitor, see --help"
- sys.exit(1)
-
- entities = []
- for arg in args:
- entities.append({'name': arg, 'recursive': options.recursive})
-
- r = Reader(options.tail)
- w = Watcher(entities, r.callback, update_freq=options.update_freq, rescan_freq=options.rescan_freq)
- w.loop()
View
22 logsandra/monitor/parsers/clf.py
@@ -32,20 +32,20 @@ def __init__(self, format):
def parse_line(self, line):
match = self.pattern.match(line)
- res = match.groupdict()
+ result = match.groupdict()
- if 'user' in res and res['user'] == '-':
- res['user'] = None
+ if 'user' in result and result['user'] == '-':
+ result['user'] = None
- if 'size' in res and res['size'] == '-':
- res['size'] = None
+ if 'size' in result and result['size'] == '-':
+ result['size'] = None
- if 'size_with_headers' in res and res['size_with_headers'] == '-':
- res['size_with_headers'] = None
+ if 'size_with_headers' in result and result['size_with_headers'] == '-':
+ result['size_with_headers'] = None
- if 'referer' in res and res['referer'] == '-':
- res['referer'] = None
+ if 'referer' in result and result['referer'] == '-':
+ result['referer'] = None
- res['time'] = dateutil.parser.parse(res['time'], fuzzy=True)
+ result['time'] = dateutil.parser.parse(result['time'], fuzzy=True)
- return res
+ return result
View
7 logsandra/monitor/watchers/__init__.py
@@ -0,0 +1,7 @@
+# Try to import pyinotify handler, else standard handler
+#try:
+# from inotify import InotifyWatcher as Watcher
+#except ImportError:
+# from standard import StandardWatcher as Watcher
+
+from standard import StandardWatcher as Watcher
Please sign in to comment.
Something went wrong with that request. Please try again.