Skip to content

Commit

Permalink
Merge pull request #121 from mbehr1/fix_local
Browse files Browse the repository at this point in the history
Fix local http returning data tuples (see #79). Added option to return max number of tuples (neg. buffer values).
  • Loading branch information
mbehr1 committed Feb 15, 2015
2 parents dbef151 + 680270a commit 61fffc9
Show file tree
Hide file tree
Showing 11 changed files with 122 additions and 45 deletions.
2 changes: 1 addition & 1 deletion etc/vzlogger.conf
Expand Up @@ -18,7 +18,7 @@
"port": 8080, // the TCP port for the local HTTPd
"index": true, // should we provide a index listing of available channels if no UUID was requested?
"timeout": 30, // timeout for long polling comet requests, 0 disables comet, in seconds
"buffer": 600 // how long to buffer readings for the local interface, in seconds
"buffer": -1 // how long to buffer readings for the local interface, in seconds (if greater 0) or how many tuples to return per channel if <0 (-3 = return 3 tuples), default -1
},

"meters": [
Expand Down
3 changes: 0 additions & 3 deletions include/Buffer.hpp
Expand Up @@ -60,9 +60,6 @@ class Buffer {
inline bool newValues() const { return _newValues; }
inline void clear_newValues() { _newValues = false; }

inline size_t keep() const { return _keep; }
inline void keep(const size_t keep) { _keep = keep; }

inline void lock() { pthread_mutex_lock(&_mutex); }
inline void unlock() { pthread_mutex_unlock(&_mutex); }
inline void wait(pthread_cond_t *condition) { pthread_cond_wait(condition, &_mutex); }
Expand Down
3 changes: 1 addition & 2 deletions include/Channel.hpp
Expand Up @@ -64,7 +64,7 @@ class Channel {
if (_identifier.use_count() < 1) throw vz::VZException("Not identifier defined.") ; return _identifier; }
double tvtod() const { return _last == NULL ? 0 : _last->tvtod(); }

const char* uuid() { return _uuid.c_str(); }
const char* uuid() const { return _uuid.c_str(); }
const std::string apiProtocol() { return _apiProtocol; }

void last(Reading *rd) { _last = rd;}
Expand All @@ -73,7 +73,6 @@ class Channel {
Buffer::Ptr buffer() { return _buffer; }

size_t size() const { return _buffer->size(); }
size_t keep() const { return _buffer->keep(); }

inline void notify() {
_buffer->lock();
Expand Down
4 changes: 4 additions & 0 deletions include/local.h
Expand Up @@ -43,6 +43,10 @@ int handle_request(
void **con_cls
);

class Channel;
void shrink_localbuffer(); // remove old data in the local buffer
void add_ch_to_localbuffer(Channel &ch);

#endif /* _LOCAL_H_ */


11 changes: 0 additions & 11 deletions src/Buffer.cpp
Expand Up @@ -171,17 +171,6 @@ void Buffer::undelete() {
unlock();
}


void Buffer::shrink(/*size_t keep*/) {
lock();

// while(size > keep && begin() != sent) {
// pop();
// }

unlock();
}

char * Buffer::dump(char *dump, size_t len) {
size_t pos = 0;
dump[pos++] = '{';
Expand Down
5 changes: 3 additions & 2 deletions src/Config_Options.cpp
Expand Up @@ -40,7 +40,7 @@ Config_Options::Config_Options()
, _port(8080)
, _verbosity(0)
, _comet_timeout(30)
, _buffer_length(600)
, _buffer_length(-1)
, _retry_pause(15)
, _daemon(false)
, _local(false)
Expand All @@ -57,7 +57,7 @@ Config_Options::Config_Options(
, _port(8080)
, _verbosity(0)
, _comet_timeout(30)
, _buffer_length(600)
, _buffer_length(-1)
, _retry_pause(15)
, _daemon(false)
, _local(false)
Expand Down Expand Up @@ -140,6 +140,7 @@ void Config_Options::config_parse(
}
else if (strcmp(key, "buffer") == 0 && local_type == json_type_int) {
_buffer_length = json_object_get_int(local_value);
if (!_buffer_length) _buffer_length = -1; // 0 makes no sense, use size based mode with 1 element
}
else if (strcmp(key, "index") == 0 && local_type == json_type_boolean) {
_channel_index = json_object_get_boolean(local_value);
Expand Down
4 changes: 0 additions & 4 deletions src/MeterMap.cpp
Expand Up @@ -68,10 +68,6 @@ void MeterMap::start() {

print(log_debug, "Meter is opened. Starting channels.", _meter->name());
for (iterator it = _channels.begin(); it!=_channels.end(); it++) {
// set buffer length for perriodic meters
if (meter_get_details(_meter->protocolId())->periodic && options.local()) {
(*it)->buffer()->keep(ceil(options.buffer_length() / (double) _meter->interval()));
}

if (options.logging()) {
(*it)->start();
Expand Down
117 changes: 103 additions & 14 deletions src/local.cpp
Expand Up @@ -23,6 +23,9 @@
* along with volkszaehler.org. If not, see <http://www.gnu.org/licenses/>.
*/

#include <list>
#include <map>

#include <json-c/json.h>
#include <string.h>
#include <stdio.h>
Expand All @@ -33,9 +36,99 @@
#include "local.h"
#include <MeterMap.hpp>
#include <VZException.hpp>
#include <pthread.h>

extern Config_Options options;

class ChannelData
{
public:
ChannelData(const double &t, const double &v) : _t(t), _v(v) {};
double _t;
double _v;
};

typedef std::list<ChannelData> LIST_ChannelData;
typedef std::map<std::string, LIST_ChannelData> MAP_UUID_ChannelData;
pthread_mutex_t localbuffer_mutex = PTHREAD_MUTEX_INITIALIZER;
MAP_UUID_ChannelData localbuffer;

void shrink_localbuffer() // remove old data in the local buffer
{
if (options.buffer_length()>=0){ // time based localbuffer. keep buffer_length secs
Reading rnow;
rnow.time(); // sets to "now"
double minT = rnow.tvtod() - options.buffer_length(); // now - time to keep in buffer

pthread_mutex_lock(&localbuffer_mutex);

MAP_UUID_ChannelData::iterator it = localbuffer.begin();
for (;it!=localbuffer.end(); ++it) {
LIST_ChannelData &l = it->second;
LIST_ChannelData::iterator lit = l.begin();

while (lit!=l.end() && ((lit->_t) < minT))
lit = l.erase(lit);
}

pthread_mutex_unlock(&localbuffer_mutex);
}
}

void add_ch_to_localbuffer(Channel &ch)
{
pthread_mutex_lock(&localbuffer_mutex);
LIST_ChannelData &l = localbuffer[ch.uuid()];

// now add all not-deleted items to the localbuffer:
Buffer::Ptr buf = ch.buffer();
Buffer::iterator it;
for (it = buf->begin(); it != buf->end(); ++it) {
Reading &r = *it;
if (!r.deleted()) {
l.push_back(ChannelData(r.tvtod(), r.value()));
}
}
if (options.buffer_length()<0) { // max size based localbuffer. keep max -buffer_length items
while (l.size() > static_cast<unsigned int> (-(options.buffer_length())))
l.pop_front();
}

pthread_mutex_unlock(&localbuffer_mutex);
}

json_object * api_json_tuples(const char *uuid) {

if (!uuid) return NULL;
pthread_mutex_lock(&localbuffer_mutex);
LIST_ChannelData &l = localbuffer[uuid];

print(log_debug, "==> number of tuples: %d", uuid, l.size());

if (l.size() < 1 ) {
pthread_mutex_unlock(&localbuffer_mutex);
return NULL;
}

json_object *json_tuples = json_object_new_array();
for (LIST_ChannelData::const_iterator cit = l.cbegin(); cit != l.cend(); ++cit) {
struct json_object *json_tuple = json_object_new_array();

// return ms as in API => * 1000
int64_t timestamp = cit->_t * 1000; // TODO add support for int64_t timestamps in Reading and use those values directly
double value = cit->_v;

json_object_array_add(json_tuple, json_object_new_int64(timestamp));
json_object_array_add(json_tuple, json_object_new_double(value));

json_object_array_add(json_tuples, json_tuple);
}
pthread_mutex_unlock(&localbuffer_mutex);

return json_tuples;
}


int handle_request(
void *cls
, struct MHD_Connection *connection
Expand All @@ -50,8 +143,7 @@ int handle_request(
int status;
int response_code = MHD_HTTP_NOT_FOUND;

/* mapping between meters and channels */
//std::list<Map> *mappings = static_cast<std::list<Map>*>(cls);
// mapping between meters and channels
MapContainer *mappings = static_cast<MapContainer*>(cls);

struct MHD_Response *response;
Expand All @@ -62,14 +154,12 @@ int handle_request(
"http", method, url, mode);

if (strcmp(method, "GET") == 0) {
// struct timespec ts;
// struct timeval tp;

struct json_object *json_obj = json_object_new_object();
struct json_object *json_data = json_object_new_array();
struct json_object *json_exception = NULL;

const char *uuid = url + 1; /* strip leading slash */
const char *uuid = url + 1; // strip leading slash
const char *json_str;
int show_all = 0;

Expand All @@ -85,33 +175,32 @@ int handle_request(
}
}

shrink_localbuffer(); // in case the channel return very few/seldom data

for (MapContainer::iterator mapping = mappings->begin(); mapping!=mappings->end(); mapping++) {
for (MeterMap::iterator ch = mapping->begin(); ch!=mapping->end(); ch++) {
//foreach(mapping->channels, ch, channel_t) {
if (strcmp((*ch)->uuid(), uuid) == 0 || show_all) {
response_code = MHD_HTTP_OK;

/* blocking until new data arrives (comet-like blocking of HTTP response) */
// blocking until new data arrives (comet-like blocking of HTTP response)
if (mode && strcmp(mode, "comet") == 0) {
/* convert from timeval to timespec */
// TODO wait only options.comet_timeout()!
// gettimeofday(&tp, NULL);
// ts.tv_sec = tp.tv_sec + options.comet_timeout();
// ts.tv_nsec = tp.tv_usec * 1000;

(*ch)->wait();
// (*ch)->wait(); // TODO not usefull with show_all! Wait only if this channel empty?
}

struct json_object *json_ch = json_object_new_object();

json_object_object_add(json_ch, "uuid", json_object_new_string((*ch)->uuid()));
//json_object_object_add(json_ch, "middleware", json_object_new_string(ch->middleware()));
//json_object_object_add(json_ch, "last", json_object_new_double(ch->last.value));
json_object_object_add(json_ch, "last", json_object_new_double((*ch)->tvtod()));
json_object_object_add(json_ch, "last", json_object_new_int64((*ch)->tvtod()*1000)); // return here in ms as well
json_object_object_add(json_ch, "interval", json_object_new_int(mapping->meter()->interval()));
json_object_object_add(json_ch, "protocol", json_object_new_string(meter_get_details(mapping->meter()->protocolId())->name));

//struct json_object *json_tuples = api_json_tuples(&ch->buffer, ch->buffer.head, ch->buffer.tail);
//json_object_object_add(json_ch, "tuples", json_tuples);
struct json_object *json_tuples = api_json_tuples((*ch)->uuid());
if (json_tuples) json_object_object_add(json_ch, "tuples", json_tuples);

json_object_array_add(json_data, json_ch);
}
Expand Down
14 changes: 8 additions & 6 deletions src/threads.cpp
Expand Up @@ -33,6 +33,7 @@
#include <api/Volkszaehler.hpp>
#include <api/MySmartGrid.hpp>
#include <api/Null.hpp>
#include "local.h"

extern Config_Options options;

Expand Down Expand Up @@ -113,10 +114,6 @@ void * reading_thread(void *arg) {
}
}

/* update buffer length */
if (options.local()) {
(*ch)->buffer()->keep((mtr->interval() > 0) ? ceil(options.buffer_length() / mtr->interval()) : 0);
}
} // channel loop
} while((mtr->aggtime() > 0) && (time(NULL) < aggIntEnd)); /* default aggtime is -1 */

Expand All @@ -130,6 +127,11 @@ void * reading_thread(void *arg) {
/* shrink buffer */
(*ch)->buffer()->clean();

if (options.local()) {
shrink_localbuffer(); // remove old/outdated data in the local buffer
add_ch_to_localbuffer(*(*ch)); // add this ch data to the local buffer
}

/* notify webserver and logging thread */
(*ch)->notify();

Expand All @@ -148,8 +150,8 @@ void * reading_thread(void *arg) {
dump = (char*)malloc(dump_len);
}

print(log_debug, "Buffer dump (size=%i keep=%i): %s", (*ch)->name(),
(*ch)->size(), (*ch)->keep(), dump);
print(log_debug, "Buffer dump (size=%i): %s", (*ch)->name(),
(*ch)->size(), dump);

free(dump);
}
Expand Down
3 changes: 2 additions & 1 deletion tests/mocks/CMakeLists.txt
Expand Up @@ -22,8 +22,9 @@ add_executable(mock_metermap mock_metermap.cpp ../../src/Meter.cpp ../../src/Opt
../../src/api/CurlCallback.cpp
../../src/api/CurlResponse.cpp
../../src/CurlSessionProvider.cpp
../../src/local.cpp
)
target_link_libraries(mock_metermap ${CURL_STATIC_LIBRARIES} ${CURL_LIBRARIES} ${GNUTLS_LIBRARIES} ${OPENSSL_LIBRARIES})
target_link_libraries(mock_metermap ${CURL_STATIC_LIBRARIES} ${CURL_LIBRARIES} ${MICROHTTPD_LIBRARY} ${GNUTLS_LIBRARIES} ${OPENSSL_LIBRARIES})

target_link_libraries(mock_metermap
${GTEST_LIBS_DIR}/libgtest.a
Expand Down
1 change: 0 additions & 1 deletion tests/mocks/Channel.hpp
Expand Up @@ -28,7 +28,6 @@ class Channel
MOCK_METHOD0( notify, void ());
MOCK_METHOD2( dump, char* (char *dump, size_t len));
MOCK_CONST_METHOD0( size, size_t ());
MOCK_CONST_METHOD0( keep, size_t ());
MOCK_METHOD0( wait, void ());
MOCK_METHOD0( uuid, const char* ());

Expand Down

0 comments on commit 61fffc9

Please sign in to comment.