An event aggregator.
Two components coupled by a database and a data model:
- ETL System
- Reports webapp
Two .INI-style config files:
- ``local.ini`` -- read by ``workhours.tasks`` and ``workhours.climain``
- ``development.ini`` -- read by ``pserve``, ``gunicorn``

::

    $ workhours --help
    Usage: workhours [-c conf] [--fs path] [--db uri] <options> [-s source path+] [-r report+]

    event aggregation CLI

    Options:
      -h, --help            show this help message and exit
      -c CONFIG_FILE, --config=CONFIG_FILE
                            path to configuration file containing db.uri, fs.uri,
                            and TaskQueue definitions
      --db=EVENTSDB_URI, --eventsdb=EVENTSDB_URI
                            database uri for storing task results
                            ex: ``sqlite:///:memory:``
      --fs=FS_URI, --task-storage=FS_URI
                            Path where task data will be copied and report files
                            will be stored
      -l, --list-source-types
                            List supported source (TaskQueue) types
      -s SRC_QUEUES, --src=SRC_QUEUES
                            Type and filename tuples (ex: ``-s shell.log ./.usrlog``)
      -P, --parse           Parse and extract all sources defined by the ``-s``
                            option and the ``-c`` config
      -u USERNAMES, --username=USERNAMES
                            Usernames to include
      --list-report-types   List supported report types
      -r REPORTS, --report=REPORTS
                            Generate a report type
      -o OUTPUT, --output-file=OUTPUT
                            Output file (default: '-' for stdout)
      -O OUTPUT_FORMAT, --output-format=OUTPUT_FORMAT
                            Output format <csv|json> (default: None)
      -G GAPTIME, --gaptime=GAPTIME
                            Minute gap to detect between entries
      -p, --print-all       Dump the events table to stdout
      -v, --verbose
      -q, --quiet
      -t, --tes
Parsing (``-P``) performs a one-pass copy and parse of each source listed in the
``-c``/``--config-file`` config as::

    [queue_type]
    uniqkey_n = file_uri_n

and on the commandline as ``type path`` arguments to ``-s``/``--src``::

    workhours -s log.shell ~/shell.log
Each source is copied into a filestore at ``fs.uri``, specified either:

- Config: ``fs.uri`` in the config file
- CLI: ``--fs`` on the commandline

and read into a SQL database wrapped by SQLAlchemy, specified either:

- Config: ``eventsdb.uri`` in the ``local.ini`` configuration file
- CLI: ``--db sqlite:///example.db``
- TODO: es indexing
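
For illustration only, a sketch of how those pieces might sit in ``local.ini`` --
the ``[main]`` section name and every path and value here are assumptions, not
the project's actual layout; only ``eventsdb.uri``, ``fs.uri``, and the
``[queue_type]`` source sections are described above::

    ; hypothetical example -- actual section and key layout may differ
    [main]
    eventsdb.uri = sqlite:///example.db
    fs.uri = ./workhours-fs

    ; one section per source type; one "uniqkey = file_uri" line per source
    [log.shell]
    laptop = ~/.usrlog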
Parse functions are imported ("registered") as named queues in ``workhours.tasks``,
each queue type linked to a ``parse_`` function.
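
As a rough sketch of that kind of registration, assuming a plain dict registry --
the ``QUEUE_PARSERS``, ``get_parser``, and ``parse_shell_log`` names are
hypothetical, not the actual ``workhours.tasks`` API::

    # Hypothetical registry mapping a queue type name to its parse function.
    # workhours.tasks may wire this up differently.
    def parse_shell_log(path):
        """Yield one (date, url, text) tuple per line of a shell log."""
        with open(path) as f:
            for line in f:
                yield (None, None, line.rstrip('\n'))

    QUEUE_PARSERS = {
        'log.shell': parse_shell_log,
    }

    def get_parser(queue_type):
        """Look up the parse_ function registered for a queue type."""
        return QUEUE_PARSERS[queue_type]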
Parse results are normalized into ``Event`` rows by ``Event.from_uhm``::

    import logging

    log = logging.getLogger(__name__)


    class Event(object):
        # ... SQLAlchemy-mapped columns (date, url, text, task_id) elided ...

        @classmethod
        def from_uhm(cls, source, obj, **kwargs):
            """Build an Event from a dict, an object with to_event_row(),
            or any other iterable of column values."""
            _kwargs = {}
            _kwargs['task_id'] = kwargs.get('task_id')
            try:
                if isinstance(obj, dict):
                    _kwargs.update(obj)
                    _obj = cls(source, **_kwargs)
                elif hasattr(obj, 'to_event_row'):
                    _obj = cls(source, *obj.to_event_row(), **_kwargs)
                # punt: unpack any other iterable positionally
                elif hasattr(obj, '__iter__'):
                    _obj = cls(source, *obj, **_kwargs)
                else:
                    raise TypeError("cannot build an Event from %r" % type(obj))
            except Exception as e:
                log.error({'obj': obj,
                           'type': type(obj),
                           'dir': dir(obj)})
                log.exception(e)
                raise
            return _obj
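
For example (hypothetical values; the positional ``Event`` constructor arguments
are assumed to match the columns listed under the models below)::

    ev1 = Event.from_uhm('log.shell', {'date': '2012-01-01', 'text': 'ls'}, task_id=1)
    ev2 = Event.from_uhm('log.shell', ('2012-01-01', None, 'ls'), task_id=1)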
- TODO: normalize parse function signatures: ``*args, **kwargs``
- TODO: ``workhours.interfaces.IDataSource``
- TODO: Tag Support
- TODO: IDataSource Interface
- TODO: Tests
- TODO: Standard bookmarks.html file
- TODO: HTTP common log
- TODO: Pyline column mappings: ``to_event_row()`` -> tuple
- TODO: IEventRecord Interface
- ``sqlite:///:memory:``
- ``mysql://...``
- ``[...]://...``
- TODO: connection timeouts configuration
- TODO: tasks configuration
- TODO: elasticsearch sqlalchemy event integration
- TODO: generate a ``pandas.DataFrame`` from event tables
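
A minimal sketch of one way that might be done, assuming a SQLAlchemy engine
bound to the events database and a table named ``events`` (both assumptions)::

    import pandas as pd
    from sqlalchemy import create_engine

    # engine URI and the 'events' table name are assumptions
    engine = create_engine('sqlite:///example.db')
    df = pd.read_sql_table('events', engine, parse_dates=['date'])
    print(df.groupby(df['date'].dt.date).size())   # events per day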
Models: standard Python classes mapped to SQLAlchemy tables.

- ``Event`` -- ``.date``, ``.url``, ``.text``, ``.task_id``
- ``Place``
- ``TaskQueue``
- ``Task``
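
A rough sketch of what the ``Event`` mapping might look like in SQLAlchemy's
declarative style -- the column types, table name, and ``id`` primary key are
assumptions here, not the project's actual schema::

    from sqlalchemy import Column, DateTime, Integer, Unicode
    from sqlalchemy.ext.declarative import declarative_base

    Base = declarative_base()

    class Event(Base):
        __tablename__ = 'events'                 # assumed table name
        id = Column(Integer, primary_key=True)   # assumed surrogate key
        date = Column(DateTime, index=True)
        url = Column(Unicode)
        text = Column(Unicode)
        task_id = Column(Integer)                # references a Task row in the real models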
- TODO: sadisplay
- TODO: stdout norm (__{str,unicode}__)
- TODO: periodic tasks
- TODO: inotify throttling
- TODO: messaging middleware
- TODO: celery || zmq
- TODO: handle potentially frequently changing events.db files when
- TODO: or, manage n databases and n sets of models (see)
- TODO: tests: histograms with sqlalchemy date paging
- TODO: date aggregation
- TODO: webapp configuration
- TODO: fulltext search
- TODO: faceted search and highlighting
- TODO: events HTML tables + paging
- TODO: frequency timeline histogram
- TODO: REST API
- TODO: js layer