Skip to content

Commit

Permalink
use single curl session for Volkszaehler
Browse files Browse the repository at this point in the history
  • Loading branch information
mbehr1 committed Jan 31, 2015
1 parent 2c3f1e9 commit 7b487e8
Show file tree
Hide file tree
Showing 10 changed files with 222 additions and 32 deletions.
43 changes: 43 additions & 0 deletions include/CurlSessionProvider.hpp
@@ -0,0 +1,43 @@
#ifndef __CURL_SESSION_PROVIDER_
#define __CURL_SESSION_PROVIDER_

#include <string>
#include <map>
#include <pthread.h>
#include <curl/curl.h>

class CurlSessionProvider
{
public:
// non thread safe:
CurlSessionProvider();
~CurlSessionProvider();

// thread-safe functions:
CURL *get_easy_session(std::string key, int timeout=0); // this is intended to block if the handle for the current key is in use and single_session_per_key
void return_session(std::string key, CURL *&); // return a handle. this unblocks another pending request for this key if single_session_per_key
bool inUse(std::string key); // check whether a key is in use (does not guarantee that get... will not block)

protected:
class CurlUsage
{
public:
CurlUsage() : eh(0), inUse(false) { mutex = PTHREAD_MUTEX_INITIALIZER; };
CURL *eh;
bool inUse;
pthread_mutex_t mutex;
};

typedef std::map<std::string, CurlUsage>::iterator map_it;
typedef std::map<std::string, CurlUsage>::const_iterator cmap_it;

std::map<std::string, CurlUsage> _easy_handle_map;
private:
pthread_mutex_t _map_mutex;
};

// var to a global/single instance. needs to be initialzed e.g. in main()
extern CurlSessionProvider *curlSessionProvider;


#endif
3 changes: 2 additions & 1 deletion include/api/Volkszaehler.hpp
Expand Up @@ -69,8 +69,9 @@ namespace vz {

private:
std::string _middleware;
unsigned int _curlTimeout;
std::string _url;

CURL *curl() { return _api.curl; }
/**
* Create JSON object of tuples
*
Expand Down
3 changes: 2 additions & 1 deletion modules/CompilerFlags.cmake
Expand Up @@ -78,6 +78,7 @@ if(NOT WIN32)
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-constant-logical-operand")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-deprecated")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-deprecated-declarations")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11")
endif()

# TODO: we need to check the compiler here, gcc does not know about those flags, is this The Right Thing To Do (TM)?
Expand All @@ -95,4 +96,4 @@ else(NOT WIN32)
set(CMAKE_C_STANDARD_LIBRARIES "${CMAKE_C_STANDARD_LIBRARIES} ws2_32.lib rpcrt4.lib")
set(CMAKE_CXX_STANDARD_LIBRARIES "${CMAKE_CXX_STANDARD_LIBRARIES} ws2_32.lib rpcrt4.lib")
#set(CMAKE_CXX_FLAGS "/Z7")
endif(NOT WIN32)
endif(NOT WIN32)
1 change: 1 addition & 0 deletions src/CMakeLists.txt
Expand Up @@ -23,6 +23,7 @@ set(vzlogger_srcs
ltqnorm.cpp
Meter.cpp
${CMAKE_BINARY_DIR}/gitSha1.cpp
CurlSessionProvider.cpp
)

set(libvz_srcs
Expand Down
87 changes: 87 additions & 0 deletions src/CurlSessionProvider.cpp
@@ -0,0 +1,87 @@
/**
* CurlSessionProvider - provides one curl session (easy handle) for a key
*
* @author Matthias Behr <mbehr@mcbehr.de>
* @copyright Copyright (c) 2015, The volkszaehler.org project
* @package vzlogger
* @license http://opensource.org/licenses/gpl-license.php GNU Public License
*/

#include <assert.h>
#include "CurlSessionProvider.hpp"

CurlSessionProvider::CurlSessionProvider()
{
_map_mutex = PTHREAD_MUTEX_INITIALIZER;
curl_global_init(CURL_GLOBAL_ALL);
}

CurlSessionProvider::~CurlSessionProvider()
{
// curl_easy_cleanup for each CURL*
pthread_mutex_lock(&_map_mutex);
for (map_it it = _easy_handle_map.begin(); it!=_easy_handle_map.end(); ++it)
{
CurlUsage cu = (*it).second;
curl_easy_cleanup(cu.eh);
}
curl_global_cleanup();

pthread_mutex_unlock(&_map_mutex);
pthread_mutex_destroy(&_map_mutex);
}

// thread-safe functions:
CURL *CurlSessionProvider::get_easy_session(std::string key, int timeout) // this is intended to block if the handle for the current key is in use and single_session_per_key
{
CURL *toRet=0;
// thread safe lock here to access the map:
assert( 0 == pthread_mutex_lock(&_map_mutex));
map_it it = _easy_handle_map.find(key);
if (it!= _easy_handle_map.end()){
CurlUsage &cur = (*it).second;
pthread_mutex_unlock(&_map_mutex); // we unlock here already but access the current element anyhow assuming an insert doesnt invalidate the reference
assert(0 == pthread_mutex_lock(&cur.mutex));
assert(!cur.inUse);
cur.inUse = true;
toRet = cur.eh;
} else {
// create new one:
CurlUsage cu;
cu.eh = curl_easy_init();
cu.inUse=true;
pthread_mutex_lock(&cu.mutex);
_easy_handle_map.insert(std::make_pair(key, cu));
toRet = cu.eh;
}
pthread_mutex_unlock(&_map_mutex);

return toRet;
}

void CurlSessionProvider::return_session(std::string key, CURL *&eh) // return a handle. this unblocks another pending request for this key if single_session_per_key
{
// thread safe lock here:
assert( 0 == pthread_mutex_lock(&_map_mutex));
CurlUsage &cu = _easy_handle_map[key];
assert(eh == cu.eh);
eh=0;
cu.inUse = false;
pthread_mutex_unlock(&cu.mutex);
pthread_mutex_unlock(&_map_mutex);
}

bool CurlSessionProvider::inUse(std::string key)
{
pthread_mutex_lock(&_map_mutex);
cmap_it it = _easy_handle_map.find(key);
if (it != _easy_handle_map.end()){
pthread_mutex_unlock(&_map_mutex);
return (*it).second.inUse;
}
pthread_mutex_unlock(&_map_mutex);
return false;
}

// global var:
CurlSessionProvider *curlSessionProvider=0;
59 changes: 33 additions & 26 deletions src/api/Volkszaehler.cpp
Expand Up @@ -38,6 +38,7 @@
#include <VZException.hpp>
#include "Config_Options.hpp"
#include <api/Volkszaehler.hpp>
#include "CurlSessionProvider.hpp"

extern Config_Options options;

Expand All @@ -49,8 +50,7 @@ vz::api::Volkszaehler::Volkszaehler(
, _last_timestamp(0)
{
OptionList optlist;
char url[255], agent[255];
unsigned short curlTimeout = 30; // 30 seconds
char agent[255];

// parse options
try {
Expand All @@ -62,38 +62,25 @@ vz::api::Volkszaehler::Volkszaehler(
}

try {
curlTimeout = optlist.lookup_int(pOptions, "timeout");
_curlTimeout = optlist.lookup_int(pOptions, "timeout");
} catch (vz::OptionNotFoundException &e) {
curlTimeout = 30; // 30 seconds default
_curlTimeout = 30; // 30 seconds default
} catch (vz::VZException &e) {
throw;
}

// prepare header, uuid & url
sprintf(agent, "User-Agent: %s/%s (%s)", PACKAGE, VERSION, curl_version()); // build user agent
sprintf(url, "%s/data/%s.json", middleware().c_str(), channel()->uuid()); // build url
_url = _middleware;
_url.append("/data/");
_url.append(channel()->uuid());
_url.append(".json");

_api.headers = NULL;
_api.headers = curl_slist_append(_api.headers, "Content-type: application/json");
_api.headers = curl_slist_append(_api.headers, "Accept: application/json");
_api.headers = curl_slist_append(_api.headers, agent);

_api.curl = curl_easy_init();
if (!_api.curl) {
throw vz::VZException("CURL: cannot create handle.");
}

curl_easy_setopt(_api.curl, CURLOPT_URL, url);
curl_easy_setopt(_api.curl, CURLOPT_HTTPHEADER, _api.headers);
curl_easy_setopt(_api.curl, CURLOPT_VERBOSE, options.verbosity());
curl_easy_setopt(_api.curl, CURLOPT_DEBUGFUNCTION, curl_custom_debug_callback);
curl_easy_setopt(_api.curl, CURLOPT_DEBUGDATA, channel().get());

// signal-handling in libcurl is NOT thread-safe. so force to deactivated them!
curl_easy_setopt(_api.curl, CURLOPT_NOSIGNAL, 1);

// set timeout to 5 sec. required if next router has an ip-change.
curl_easy_setopt(_api.curl, CURLOPT_TIMEOUT, curlTimeout);
}

vz::api::Volkszaehler::~Volkszaehler()
Expand All @@ -120,14 +107,34 @@ void vz::api::Volkszaehler::send()
return;
}

_api.curl = curlSessionProvider ? curlSessionProvider->get_easy_session(_middleware) : 0; // TODO add option to use parallel sessions. Simply add uuid() to the key.
if (!_api.curl) {
throw vz::VZException("CURL: cannot create handle.");
}
curl_easy_setopt(_api.curl, CURLOPT_URL, _url.c_str());
curl_easy_setopt(_api.curl, CURLOPT_HTTPHEADER, _api.headers);
curl_easy_setopt(_api.curl, CURLOPT_VERBOSE, options.verbosity());
curl_easy_setopt(_api.curl, CURLOPT_DEBUGFUNCTION, curl_custom_debug_callback);
curl_easy_setopt(_api.curl, CURLOPT_DEBUGDATA, channel().get());

// signal-handling in libcurl is NOT thread-safe. so force to deactivated them!
curl_easy_setopt(_api.curl, CURLOPT_NOSIGNAL, 1);

// set timeout to 5 sec. required if next router has an ip-change.
curl_easy_setopt(_api.curl, CURLOPT_TIMEOUT, _curlTimeout);


print(log_debug, "JSON request body: %s", channel()->name(), json_str);

curl_easy_setopt(curl(), CURLOPT_POSTFIELDS, json_str);
curl_easy_setopt(curl(), CURLOPT_WRITEFUNCTION, curl_custom_write_callback);
curl_easy_setopt(curl(), CURLOPT_WRITEDATA, (void *) &response);
curl_easy_setopt(_api.curl, CURLOPT_POSTFIELDS, json_str);
curl_easy_setopt(_api.curl, CURLOPT_WRITEFUNCTION, curl_custom_write_callback);
curl_easy_setopt(_api.curl, CURLOPT_WRITEDATA, (void *) &response);

curl_code = curl_easy_perform(_api.curl);
curl_easy_getinfo(_api.curl, CURLINFO_RESPONSE_CODE, &http_code);

curl_code = curl_easy_perform(curl());
curl_easy_getinfo(curl(), CURLINFO_RESPONSE_CODE, &http_code);
if (curlSessionProvider)
curlSessionProvider->return_session(_middleware, _api.curl);

// check response
if (curl_code == CURLE_OK && http_code == 200) { // everything is ok
Expand Down
14 changes: 11 additions & 3 deletions src/vzlogger.cpp
Expand Up @@ -45,6 +45,7 @@
#include "vzlogger.h"
#include "Channel.hpp"
#include "threads.h"
#include "CurlSessionProvider.hpp"

#ifdef LOCAL_SUPPORT
#include "local.h"
Expand Down Expand Up @@ -343,7 +344,7 @@ int main(int argc, char *argv[]) {
sigaction(SIGTERM, &action, NULL); /* catch kill signal */

/* initialize ADTs and APIs */
curl_global_init(CURL_GLOBAL_ALL);
// curl_global_init(CURL_GLOBAL_ALL);

/* parse command line and file options */
// TODO command line should have a higher priority as file
Expand All @@ -364,11 +365,13 @@ int main(int argc, char *argv[]) {
// make sure command line options override config settings, just re-parse
config_parse_cli(argc, argv, &options);

curlSessionProvider = new CurlSessionProvider();

// Register vzlogger
if (options.doRegistration()) {
register_device();
return (0);
}
}

// @todo clarify why no logging in local mode
options.logging((!options.local() || options.daemon()));
Expand Down Expand Up @@ -457,13 +460,18 @@ int main(int argc, char *argv[]) {
#endif /* LOCAL_SUPPORT */

/* householding */
curl_global_cleanup();
//curl_global_cleanup();

/* close logfile */
if (options.logfd()) {
fclose(options.logfd());
}

if (curlSessionProvider) {
delete curlSessionProvider;
curlSessionProvider = 0;
}

return EXIT_SUCCESS;
}
#endif
2 changes: 1 addition & 1 deletion tests/CMakeLists.txt
Expand Up @@ -9,7 +9,7 @@ if(NOT SML_FOUND)
list(REMOVE_ITEM test_sources ${CMAKE_CURRENT_SOURCE_DIR}/MeterSML.cpp)
endif(NOT SML_FOUND)

add_executable(vzlogger_unit_tests ${test_sources})
add_executable(vzlogger_unit_tests ${test_sources} ../src/CurlSessionProvider.cpp)

target_link_libraries(vzlogger_unit_tests
${GTEST_LIBS_DIR}/libgtest.a
Expand Down
1 change: 1 addition & 0 deletions tests/mocks/CMakeLists.txt
Expand Up @@ -21,6 +21,7 @@ add_executable(mock_metermap mock_metermap.cpp ../../src/Meter.cpp ../../src/Opt
../../src/api/CurlIF.cpp
../../src/api/CurlCallback.cpp
../../src/api/CurlResponse.cpp
../../src/CurlSessionProvider.cpp
)
target_link_libraries(mock_metermap ${CURL_STATIC_LIBRARIES} ${CURL_LIBRARIES} ${GNUTLS_LIBRARIES} ${OPENSSL_LIBRARIES})

Expand Down
41 changes: 41 additions & 0 deletions tests/ut_CurlSessionProvider.cpp
@@ -0,0 +1,41 @@
#include "gtest/gtest.h"

#include "CurlSessionProvider.hpp"

TEST(CurlSessionProvider, init)
{
ASSERT_EQ(0, curlSessionProvider);
curlSessionProvider = new CurlSessionProvider();

delete curlSessionProvider;
curlSessionProvider = 0;

}

TEST(CurlSessionProvider, single1)
{
curlSessionProvider = new CurlSessionProvider();

ASSERT_FALSE( curlSessionProvider->inUse("1"));

CURL *eh = curlSessionProvider->get_easy_session("1");
ASSERT_TRUE(0 != eh);
ASSERT_TRUE( curlSessionProvider->inUse("1"));
curlSessionProvider->return_session("1", eh);
ASSERT_EQ(0, eh);
ASSERT_FALSE( curlSessionProvider->inUse("1"));

// 2nd try:
eh = curlSessionProvider->get_easy_session("1");
ASSERT_TRUE(0 != eh);
ASSERT_TRUE( curlSessionProvider->inUse("1"));
curlSessionProvider->return_session("1", eh);
ASSERT_EQ(0, eh);
ASSERT_FALSE( curlSessionProvider->inUse("1"));


delete curlSessionProvider;
curlSessionProvider = 0;

// TODO create that that's spanws a thread and tests blocking on a shared session
}

0 comments on commit 7b487e8

Please sign in to comment.