Refactoring ebcore
tunnell committed Oct 30, 2014
1 parent 8d213c8 commit d7f8061
Showing 5 changed files with 73 additions and 42 deletions.
9 changes: 9 additions & 0 deletions docs/usage.rst
@@ -15,7 +15,16 @@ By default, the data processing is done continuously. This means that (unless o
However, this is meant to run as a daemon and should already be running. This is what one must do to restart.


===============
Starting Celery
===============

When Celery is running, all wax does is ship jobs off to Celery; Celery worker
nodes must then pick up these jobs and build the events.

To start Celery once `wax` is installed, run::

celery -A wax.EventBuilder.Tasks worker
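
On the dispatch side, wax simply calls the task asynchronously. A minimal
sketch of doing this by hand (the time range, collection name, and hostname
below are placeholder values for illustration, not defaults)::

    from wax.EventBuilder.Tasks import process_time_range_task

    # Ship one time chunk off to a Celery worker; the worker runs ebcore
    # over it and returns how much data it processed.
    async_result = process_time_range_task.delay(t0=0,
                                                 t1=2 ** 30,
                                                 collection_name='dataset',
                                                 hostname='127.0.0.1')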



2 changes: 1 addition & 1 deletion wax/Configuration.py
@@ -5,7 +5,7 @@
MAX_DRIFT = 30000 # units of 10 ns
HOSTNAME = '127.0.0.1'

CHUNKSIZE = 2 ** 28
CHUNKSIZE = 2 ** 30
PADDING = (5 * MAX_DRIFT)

# 1 ADC count = 2.2 V / 2^14
23 changes: 14 additions & 9 deletions wax/EventBuilder/Processor.py
@@ -69,6 +69,10 @@ def __init__(self,
padding=Configuration.PADDING,
threshold=Configuration.THRESHOLD,
run_hostname=Configuration.HOSTNAME):
self.start_time_key = 'time_min'
self.stop_time_key = 'time_max'
self.bulk_key = 'bulk'

# Logging
logging.basicConfig(level=logging.DEBUG,
format='%(asctime)s %(message)s',
@@ -176,13 +180,13 @@ def _process_chosen_run(self, run_doc):
db = conn[data_location["dbname"]]
collection = db[data_location["dbcollection"]]

sort_key = [('time', -1),
sort_key = [(self.start_time_key, -1),
('module', -1),
('_id', -1)]

log.info("Ensuring index")
collection.ensure_index(sort_key,
background=True)
collection.ensure_index(sort_key)
#background=True)

# int rounds down
min_time_index = 0
@@ -197,17 +201,17 @@
# Find maximum time
log.info("Identifying maximum time")
doc = collection.find_one({},
fields=['time'],
sort=sort_key)
fields=[self.stop_time_key],
sort=sort_key)


if doc is None or doc['time'] is None:
if doc is None or doc[self.stop_time_key] is None:
log.warning("Cannot find maximum time; wait %d s and try again" % self.waittime)
log.debug(doc)
time.sleep(self.waittime)
continue

max_time = doc['time']
max_time = doc[self.stop_time_key]
log.info("Maximum time is %d { '_id' : %s}" % (max_time,
doc['_id']))

@@ -231,7 +235,7 @@
t1 = (i + 1) * self.chunksize

self.process(t0=t0, t1=t1 + self.padding,
collection_name=collection,
collection_name=data_location["dbcollection"],
hostname=data_location["dbaddr"])

processed_time = (max_time_index - current_time_index)
@@ -275,8 +279,9 @@ def process(self, **kwargs):
class Celery(Base):

def __init__(self, **kwargs):
Base.__init__(self, **kwargs)
self.results = result.ResultSet([])
Base.__init__(self, **kwargs)

def process(self, **kwargs):
self.log.fatal(str(kwargs))
self.results.add(process_time_range_task.delay(**kwargs))
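
Here `result.ResultSet` is Celery's container for the dispatched AsyncResults.
A rough sketch of how the collected results can be drained once every chunk
has been shipped off, assuming the stock `celery.result.ResultSet` API::

    from celery import result

    results = result.ResultSet([])
    # One entry is added per dispatched chunk, e.g.
    #   results.add(process_time_range_task.delay(...))
    # join() blocks until every task has finished and returns their return
    # values in order (here, the per-chunk processed sizes).
    processed_sizes = results.join()
    total_processed = sum(processed_sizes)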
8 changes: 5 additions & 3 deletions wax/EventBuilder/Tasks.py
@@ -5,9 +5,9 @@
import logging
import snappy
from wax import Configuration
from wax.Database.InputDBInterface import MongoDBInput
from wax.Database.OutputDBInterface import MongoDBOutput
from wax.Database.ControlDBInterface import DBStats
#from wax.Database.InputDBInterface import MongoDBInput
#from wax.Database.OutputDBInterface import MongoDBOutput
#from wax.Database.ControlDBInterface import DBStats
import ebcore

# Specify mongodb host and database to connect to
@@ -23,11 +23,13 @@ def process_time_range_task(t0, t1,
collection_name, hostname,
threshold=Configuration.THRESHOLD):

reduction_factor = 100
return ebcore.process_time_range_task(t0,
t1,
Configuration.MAX_DRIFT,
Configuration.PADDING,
threshold,
reduction_factor,
hostname,
"input.dataset", "output.dataset")
# "%s.%s" % (MongoDBInput.get_db_name(), collection_name),
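
`reduction_factor` is now passed in from Python rather than being hard-coded
inside ebcore; it controls how strongly the sum waveform is downsampled
(ebcore allocates ceil((t1 - t0) / reduction_factor) bins for it). A rough
back-of-the-envelope for the bin count, assuming the default chunk size and
ignoring padding::

    import math

    from wax import Configuration

    reduction_factor = 100
    t0, t1 = 0, Configuration.CHUNKSIZE              # 2 ** 30 units of 10 ns
    n_bins = math.ceil((t1 - t0) / reduction_factor)
    print(n_bins)                                    # roughly 10.7 million bins per chunk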
73 changes: 44 additions & 29 deletions wax/EventBuilder/ebcore.cpp
@@ -47,6 +47,7 @@ u_int32_t ProcessTimeRangeTask(int64_t t0, int64_t t1,
int64_t max_drift,
int64_t padding,
uint32_t threshold,
int reduction_factor,
char *hostname,
char *mongo_input_location,
char *mongo_output_location) {
@@ -55,9 +56,9 @@ u_int32_t ProcessTimeRangeTask(int64_t t0, int64_t t1,
// ** Initialization phase **
// **************************
conn.connect(hostname);

// Overall statistics, processed is all data read from DB, triggered is just saved
u_int32_t processed_size = 0, triggered_size = 0;
u_int32_t stats_processed_size = 0, stats_triggered_size = 0;

// Fetch this per doc
vector <uint32_t> occurence_samples;
@@ -68,7 +69,6 @@ u_int32_t ProcessTimeRangeTask(int64_t t0, int64_t t1,
int64_t time;

int64_t time_correction;
int reduction_factor = 100;
int n = ceil((t1 - t0) / reduction_factor);

// Setup the sum waveform
@@ -97,34 +97,50 @@
auto_ptr < mongo::DBClientCursor > cursor = conn.query(mongo_input_location,
QUERY("time_min" << mongo::LT << (long long int) t1 << "time_max" << mongo::GT << (long long int) t0).sort("time_min", 1));

if (!cursor.get()) {
cerr << "query failure" << endl;
exit(-1);
}

// Iterate over all data for this query
while (cursor->more()) {

// Each document that the cursor returns has embedded documents within.
p = cursor->next();

std::vector<BSONElement> be = p.getField("embedded_docs").Array();

for (unsigned int i = 0; i<be.size(); i++) {
q = be[i].embeddedObject();
// Check that document actually contains occurences.
if ( p.hasField("bulk") == false ) {
cerr << "No field containing bulk documents."<<endl;
exit(-1);
}

occurence_samples.clear();
GetDataFromBSON(p, occurence_samples, id, module, zipped, time, size);
std::vector<BSONElement> be = p.getField("bulk").Array();

time_correction = time - t0;

// Take note of the time range corresponding to this occurence
local_occurence_ranges.push_back(time_correction / reduction_factor);
local_occurence_ranges.push_back((time_correction + occurence_samples.size()) / reduction_factor);

input_docs.push_back(q.copy());

// Add samples to sum waveform
AddSamplesFromOccurence(occurence_samples,
time_correction,
reduction_factor);

processed_size += size;
for (unsigned int i = 0; i<be.size(); i++) {
q = be[i].embeddedObject();

occurence_samples.clear();
GetDataFromBSON(q, occurence_samples, id, module, zipped, time, size);

// Sometimes the documents that are bundled along don't actually fall within the trigger window.
// In this case, just skip it.
if (time + occurence_samples.size() < t0 || time > t1) {
continue;
}

time_correction = time - t0;

// Take note of the time range corresponding to this occurence
local_occurence_ranges.push_back(time_correction / reduction_factor);
local_occurence_ranges.push_back((time_correction + occurence_samples.size()) / reduction_factor);

input_docs.push_back(q.copy());

// Add samples to sum waveform
AddSamplesFromOccurence(occurence_samples,
time_correction,
reduction_factor);

stats_processed_size += size;
}
}

@@ -191,7 +207,7 @@ u_int32_t ProcessTimeRangeTask(int64_t t0, int64_t t1,
indoc_builder.appendElements(input_docs[i]);
builder_occurences_array->append(indoc_builder.obj());
current_size += input_docs[i].getIntField("size");
triggered_size += input_docs[i].getIntField("size");
stats_triggered_size += input_docs[i].getIntField("size");
}
SaveDecision(output_docs,
builder,
@@ -201,15 +217,14 @@ u_int32_t ProcessTimeRangeTask(int64_t t0, int64_t t1,
builder_occurences_array,
padding);

cout<<"output_docs.size() "<<output_docs.size()<<endl;
//conn.setWriteConcern(WriteConcern::unacknowledged);
conn.insert(mongo_output_location, output_docs);
string e = conn.getLastError();
if( !e.empty() ) {
cout << "insert failed: " << e << endl;
}

return processed_size;
return stats_processed_size;
}

void Shutdown() {
@@ -352,7 +367,7 @@ bool SaveDecision(vector <mongo::BSONObj> &output_docs,
cerr << "Event spans two search blocks (mega-event?)" << endl;
cerr<<"e1 "<<e1 << " t1 " << t1 <<" "<<(e1 > t1)<<endl;
cerr<<"e0 "<<e0 << " t1 - padding" << t1 - padding<<" "<< (e0 < t1 - padding) << endl;
return false;
exit(-1);
}

// Save event
Expand All @@ -361,7 +376,7 @@ bool SaveDecision(vector <mongo::BSONObj> &output_docs,
//cout<<"SUCCESS!!!";
}
else {
cout<<"fail:"<<endl;
cout<<"fail:"<<endl;
cout<<"\te0:"<<e0<<endl;
cout<<"\tt0:"<<t0<<endl;
cout<<"\tpadding:"<<padding<<endl;
