From 7b487e89b28f8f081ea14b4376138e6fcf92c830 Mon Sep 17 00:00:00 2001 From: Matthias Behr Date: Sat, 31 Jan 2015 17:27:42 +0100 Subject: [PATCH] use single curl session for Volkszaehler --- include/CurlSessionProvider.hpp | 43 ++++++++++++++++ include/api/Volkszaehler.hpp | 3 +- modules/CompilerFlags.cmake | 3 +- src/CMakeLists.txt | 1 + src/CurlSessionProvider.cpp | 87 ++++++++++++++++++++++++++++++++ src/api/Volkszaehler.cpp | 59 ++++++++++++---------- src/vzlogger.cpp | 14 +++-- tests/CMakeLists.txt | 2 +- tests/mocks/CMakeLists.txt | 1 + tests/ut_CurlSessionProvider.cpp | 41 +++++++++++++++ 10 files changed, 222 insertions(+), 32 deletions(-) create mode 100644 include/CurlSessionProvider.hpp create mode 100644 src/CurlSessionProvider.cpp create mode 100644 tests/ut_CurlSessionProvider.cpp diff --git a/include/CurlSessionProvider.hpp b/include/CurlSessionProvider.hpp new file mode 100644 index 00000000..f9255d81 --- /dev/null +++ b/include/CurlSessionProvider.hpp @@ -0,0 +1,43 @@ +#ifndef __CURL_SESSION_PROVIDER_ +#define __CURL_SESSION_PROVIDER_ + +#include +#include +#include +#include + +class CurlSessionProvider +{ +public: + // non thread safe: + CurlSessionProvider(); + ~CurlSessionProvider(); + + // thread-safe functions: + CURL *get_easy_session(std::string key, int timeout=0); // this is intended to block if the handle for the current key is in use and single_session_per_key + void return_session(std::string key, CURL *&); // return a handle. this unblocks another pending request for this key if single_session_per_key + bool inUse(std::string key); // check whether a key is in use (does not guarantee that get... will not block) + +protected: + class CurlUsage + { + public: + CurlUsage() : eh(0), inUse(false) { mutex = PTHREAD_MUTEX_INITIALIZER; }; + CURL *eh; + bool inUse; + pthread_mutex_t mutex; + }; + + typedef std::map::iterator map_it; + typedef std::map::const_iterator cmap_it; + + std::map _easy_handle_map; +private: + pthread_mutex_t _map_mutex; +}; + +// var to a global/single instance. needs to be initialzed e.g. in main() +extern CurlSessionProvider *curlSessionProvider; + + +#endif diff --git a/include/api/Volkszaehler.hpp b/include/api/Volkszaehler.hpp index af674291..5cbaa932 100644 --- a/include/api/Volkszaehler.hpp +++ b/include/api/Volkszaehler.hpp @@ -69,8 +69,9 @@ namespace vz { private: std::string _middleware; + unsigned int _curlTimeout; + std::string _url; - CURL *curl() { return _api.curl; } /** * Create JSON object of tuples * diff --git a/modules/CompilerFlags.cmake b/modules/CompilerFlags.cmake index 6304bf2e..5ea28bdc 100644 --- a/modules/CompilerFlags.cmake +++ b/modules/CompilerFlags.cmake @@ -78,6 +78,7 @@ if(NOT WIN32) set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-constant-logical-operand") set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-deprecated") set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-deprecated-declarations") + set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11") endif() # TODO: we need to check the compiler here, gcc does not know about those flags, is this The Right Thing To Do (TM)? @@ -95,4 +96,4 @@ else(NOT WIN32) set(CMAKE_C_STANDARD_LIBRARIES "${CMAKE_C_STANDARD_LIBRARIES} ws2_32.lib rpcrt4.lib") set(CMAKE_CXX_STANDARD_LIBRARIES "${CMAKE_CXX_STANDARD_LIBRARIES} ws2_32.lib rpcrt4.lib") #set(CMAKE_CXX_FLAGS "/Z7") -endif(NOT WIN32) \ No newline at end of file +endif(NOT WIN32) diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index a0c55009..6c11ff18 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -23,6 +23,7 @@ set(vzlogger_srcs ltqnorm.cpp Meter.cpp ${CMAKE_BINARY_DIR}/gitSha1.cpp + CurlSessionProvider.cpp ) set(libvz_srcs diff --git a/src/CurlSessionProvider.cpp b/src/CurlSessionProvider.cpp new file mode 100644 index 00000000..c27aff02 --- /dev/null +++ b/src/CurlSessionProvider.cpp @@ -0,0 +1,87 @@ +/** + * CurlSessionProvider - provides one curl session (easy handle) for a key + * + * @author Matthias Behr + * @copyright Copyright (c) 2015, The volkszaehler.org project + * @package vzlogger + * @license http://opensource.org/licenses/gpl-license.php GNU Public License + */ + +#include +#include "CurlSessionProvider.hpp" + +CurlSessionProvider::CurlSessionProvider() +{ + _map_mutex = PTHREAD_MUTEX_INITIALIZER; + curl_global_init(CURL_GLOBAL_ALL); +} + +CurlSessionProvider::~CurlSessionProvider() +{ + // curl_easy_cleanup for each CURL* + pthread_mutex_lock(&_map_mutex); + for (map_it it = _easy_handle_map.begin(); it!=_easy_handle_map.end(); ++it) + { + CurlUsage cu = (*it).second; + curl_easy_cleanup(cu.eh); + } + curl_global_cleanup(); + + pthread_mutex_unlock(&_map_mutex); + pthread_mutex_destroy(&_map_mutex); +} + +// thread-safe functions: +CURL *CurlSessionProvider::get_easy_session(std::string key, int timeout) // this is intended to block if the handle for the current key is in use and single_session_per_key +{ + CURL *toRet=0; + // thread safe lock here to access the map: + assert( 0 == pthread_mutex_lock(&_map_mutex)); + map_it it = _easy_handle_map.find(key); + if (it!= _easy_handle_map.end()){ + CurlUsage &cur = (*it).second; + pthread_mutex_unlock(&_map_mutex); // we unlock here already but access the current element anyhow assuming an insert doesnt invalidate the reference + assert(0 == pthread_mutex_lock(&cur.mutex)); + assert(!cur.inUse); + cur.inUse = true; + toRet = cur.eh; + } else { + // create new one: + CurlUsage cu; + cu.eh = curl_easy_init(); + cu.inUse=true; + pthread_mutex_lock(&cu.mutex); + _easy_handle_map.insert(std::make_pair(key, cu)); + toRet = cu.eh; + } + pthread_mutex_unlock(&_map_mutex); + + return toRet; +} + +void CurlSessionProvider::return_session(std::string key, CURL *&eh) // return a handle. this unblocks another pending request for this key if single_session_per_key +{ + // thread safe lock here: + assert( 0 == pthread_mutex_lock(&_map_mutex)); + CurlUsage &cu = _easy_handle_map[key]; + assert(eh == cu.eh); + eh=0; + cu.inUse = false; + pthread_mutex_unlock(&cu.mutex); + pthread_mutex_unlock(&_map_mutex); +} + +bool CurlSessionProvider::inUse(std::string key) +{ + pthread_mutex_lock(&_map_mutex); + cmap_it it = _easy_handle_map.find(key); + if (it != _easy_handle_map.end()){ + pthread_mutex_unlock(&_map_mutex); + return (*it).second.inUse; + } + pthread_mutex_unlock(&_map_mutex); + return false; +} + +// global var: +CurlSessionProvider *curlSessionProvider=0; diff --git a/src/api/Volkszaehler.cpp b/src/api/Volkszaehler.cpp index 285eefbb..b55da66d 100644 --- a/src/api/Volkszaehler.cpp +++ b/src/api/Volkszaehler.cpp @@ -38,6 +38,7 @@ #include #include "Config_Options.hpp" #include +#include "CurlSessionProvider.hpp" extern Config_Options options; @@ -49,8 +50,7 @@ vz::api::Volkszaehler::Volkszaehler( , _last_timestamp(0) { OptionList optlist; - char url[255], agent[255]; - unsigned short curlTimeout = 30; // 30 seconds + char agent[255]; // parse options try { @@ -62,38 +62,25 @@ vz::api::Volkszaehler::Volkszaehler( } try { - curlTimeout = optlist.lookup_int(pOptions, "timeout"); + _curlTimeout = optlist.lookup_int(pOptions, "timeout"); } catch (vz::OptionNotFoundException &e) { - curlTimeout = 30; // 30 seconds default + _curlTimeout = 30; // 30 seconds default } catch (vz::VZException &e) { throw; } // prepare header, uuid & url sprintf(agent, "User-Agent: %s/%s (%s)", PACKAGE, VERSION, curl_version()); // build user agent - sprintf(url, "%s/data/%s.json", middleware().c_str(), channel()->uuid()); // build url + _url = _middleware; + _url.append("/data/"); + _url.append(channel()->uuid()); + _url.append(".json"); _api.headers = NULL; _api.headers = curl_slist_append(_api.headers, "Content-type: application/json"); _api.headers = curl_slist_append(_api.headers, "Accept: application/json"); _api.headers = curl_slist_append(_api.headers, agent); - _api.curl = curl_easy_init(); - if (!_api.curl) { - throw vz::VZException("CURL: cannot create handle."); - } - - curl_easy_setopt(_api.curl, CURLOPT_URL, url); - curl_easy_setopt(_api.curl, CURLOPT_HTTPHEADER, _api.headers); - curl_easy_setopt(_api.curl, CURLOPT_VERBOSE, options.verbosity()); - curl_easy_setopt(_api.curl, CURLOPT_DEBUGFUNCTION, curl_custom_debug_callback); - curl_easy_setopt(_api.curl, CURLOPT_DEBUGDATA, channel().get()); - - // signal-handling in libcurl is NOT thread-safe. so force to deactivated them! - curl_easy_setopt(_api.curl, CURLOPT_NOSIGNAL, 1); - - // set timeout to 5 sec. required if next router has an ip-change. - curl_easy_setopt(_api.curl, CURLOPT_TIMEOUT, curlTimeout); } vz::api::Volkszaehler::~Volkszaehler() @@ -120,14 +107,34 @@ void vz::api::Volkszaehler::send() return; } + _api.curl = curlSessionProvider ? curlSessionProvider->get_easy_session(_middleware) : 0; // TODO add option to use parallel sessions. Simply add uuid() to the key. + if (!_api.curl) { + throw vz::VZException("CURL: cannot create handle."); + } + curl_easy_setopt(_api.curl, CURLOPT_URL, _url.c_str()); + curl_easy_setopt(_api.curl, CURLOPT_HTTPHEADER, _api.headers); + curl_easy_setopt(_api.curl, CURLOPT_VERBOSE, options.verbosity()); + curl_easy_setopt(_api.curl, CURLOPT_DEBUGFUNCTION, curl_custom_debug_callback); + curl_easy_setopt(_api.curl, CURLOPT_DEBUGDATA, channel().get()); + + // signal-handling in libcurl is NOT thread-safe. so force to deactivated them! + curl_easy_setopt(_api.curl, CURLOPT_NOSIGNAL, 1); + + // set timeout to 5 sec. required if next router has an ip-change. + curl_easy_setopt(_api.curl, CURLOPT_TIMEOUT, _curlTimeout); + + print(log_debug, "JSON request body: %s", channel()->name(), json_str); - curl_easy_setopt(curl(), CURLOPT_POSTFIELDS, json_str); - curl_easy_setopt(curl(), CURLOPT_WRITEFUNCTION, curl_custom_write_callback); - curl_easy_setopt(curl(), CURLOPT_WRITEDATA, (void *) &response); + curl_easy_setopt(_api.curl, CURLOPT_POSTFIELDS, json_str); + curl_easy_setopt(_api.curl, CURLOPT_WRITEFUNCTION, curl_custom_write_callback); + curl_easy_setopt(_api.curl, CURLOPT_WRITEDATA, (void *) &response); + + curl_code = curl_easy_perform(_api.curl); + curl_easy_getinfo(_api.curl, CURLINFO_RESPONSE_CODE, &http_code); - curl_code = curl_easy_perform(curl()); - curl_easy_getinfo(curl(), CURLINFO_RESPONSE_CODE, &http_code); + if (curlSessionProvider) + curlSessionProvider->return_session(_middleware, _api.curl); // check response if (curl_code == CURLE_OK && http_code == 200) { // everything is ok diff --git a/src/vzlogger.cpp b/src/vzlogger.cpp index 65273541..819af49e 100644 --- a/src/vzlogger.cpp +++ b/src/vzlogger.cpp @@ -45,6 +45,7 @@ #include "vzlogger.h" #include "Channel.hpp" #include "threads.h" +#include "CurlSessionProvider.hpp" #ifdef LOCAL_SUPPORT #include "local.h" @@ -343,7 +344,7 @@ int main(int argc, char *argv[]) { sigaction(SIGTERM, &action, NULL); /* catch kill signal */ /* initialize ADTs and APIs */ - curl_global_init(CURL_GLOBAL_ALL); +// curl_global_init(CURL_GLOBAL_ALL); /* parse command line and file options */ // TODO command line should have a higher priority as file @@ -364,11 +365,13 @@ int main(int argc, char *argv[]) { // make sure command line options override config settings, just re-parse config_parse_cli(argc, argv, &options); + curlSessionProvider = new CurlSessionProvider(); + // Register vzlogger if (options.doRegistration()) { register_device(); return (0); - } + } // @todo clarify why no logging in local mode options.logging((!options.local() || options.daemon())); @@ -457,13 +460,18 @@ int main(int argc, char *argv[]) { #endif /* LOCAL_SUPPORT */ /* householding */ - curl_global_cleanup(); +//curl_global_cleanup(); /* close logfile */ if (options.logfd()) { fclose(options.logfd()); } + if (curlSessionProvider) { + delete curlSessionProvider; + curlSessionProvider = 0; + } + return EXIT_SUCCESS; } #endif diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 1217d276..ae8447a5 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -9,7 +9,7 @@ if(NOT SML_FOUND) list(REMOVE_ITEM test_sources ${CMAKE_CURRENT_SOURCE_DIR}/MeterSML.cpp) endif(NOT SML_FOUND) -add_executable(vzlogger_unit_tests ${test_sources}) +add_executable(vzlogger_unit_tests ${test_sources} ../src/CurlSessionProvider.cpp) target_link_libraries(vzlogger_unit_tests ${GTEST_LIBS_DIR}/libgtest.a diff --git a/tests/mocks/CMakeLists.txt b/tests/mocks/CMakeLists.txt index 03f08e65..c3b2780d 100644 --- a/tests/mocks/CMakeLists.txt +++ b/tests/mocks/CMakeLists.txt @@ -21,6 +21,7 @@ add_executable(mock_metermap mock_metermap.cpp ../../src/Meter.cpp ../../src/Opt ../../src/api/CurlIF.cpp ../../src/api/CurlCallback.cpp ../../src/api/CurlResponse.cpp + ../../src/CurlSessionProvider.cpp ) target_link_libraries(mock_metermap ${CURL_STATIC_LIBRARIES} ${CURL_LIBRARIES} ${GNUTLS_LIBRARIES} ${OPENSSL_LIBRARIES}) diff --git a/tests/ut_CurlSessionProvider.cpp b/tests/ut_CurlSessionProvider.cpp new file mode 100644 index 00000000..587ed643 --- /dev/null +++ b/tests/ut_CurlSessionProvider.cpp @@ -0,0 +1,41 @@ +#include "gtest/gtest.h" + +#include "CurlSessionProvider.hpp" + +TEST(CurlSessionProvider, init) +{ + ASSERT_EQ(0, curlSessionProvider); + curlSessionProvider = new CurlSessionProvider(); + + delete curlSessionProvider; + curlSessionProvider = 0; + + } + +TEST(CurlSessionProvider, single1) +{ + curlSessionProvider = new CurlSessionProvider(); + + ASSERT_FALSE( curlSessionProvider->inUse("1")); + + CURL *eh = curlSessionProvider->get_easy_session("1"); + ASSERT_TRUE(0 != eh); + ASSERT_TRUE( curlSessionProvider->inUse("1")); + curlSessionProvider->return_session("1", eh); + ASSERT_EQ(0, eh); + ASSERT_FALSE( curlSessionProvider->inUse("1")); + + // 2nd try: + eh = curlSessionProvider->get_easy_session("1"); + ASSERT_TRUE(0 != eh); + ASSERT_TRUE( curlSessionProvider->inUse("1")); + curlSessionProvider->return_session("1", eh); + ASSERT_EQ(0, eh); + ASSERT_FALSE( curlSessionProvider->inUse("1")); + + + delete curlSessionProvider; + curlSessionProvider = 0; + + // TODO create that that's spanws a thread and tests blocking on a shared session +}