Skip to content

Commit

Permalink
chore(push): Refactored curl_wrapper
Browse files Browse the repository at this point in the history
  • Loading branch information
gjasny committed Nov 12, 2021
1 parent b6615c7 commit 882fec9
Show file tree
Hide file tree
Showing 6 changed files with 157 additions and 113 deletions.
2 changes: 2 additions & 0 deletions push/CMakeLists.txt
Expand Up @@ -2,6 +2,8 @@
find_package(CURL REQUIRED)

add_library(push
src/curl_wrapper.cc
src/curl_wrapper.h
src/gateway.cc
)

Expand Down
12 changes: 12 additions & 0 deletions push/include/prometheus/detail/http_method.h
@@ -0,0 +1,12 @@
#pragma once

namespace prometheus {
namespace detail {
enum class HttpMethod {
Post,
Put,
Delete,
};

} // namespace detail
} // namespace prometheus
21 changes: 6 additions & 15 deletions push/include/prometheus/gateway.h
Expand Up @@ -3,17 +3,19 @@
#include <future>
#include <map>
#include <memory>
#include <mutex>
#include <string>
#include <utility>
#include <vector>

#include "prometheus/collectable.h"
#include "prometheus/detail/http_method.h"
#include "prometheus/detail/push_export.h"

namespace prometheus {

namespace detail {
class CurlWrapper;
}

class PROMETHEUS_CPP_PUSH_EXPORT Gateway {
public:
Expand Down Expand Up @@ -54,27 +56,16 @@ class PROMETHEUS_CPP_PUSH_EXPORT Gateway {
private:
std::string jobUri_;
std::string labels_;
std::string auth_;
std::unique_ptr<CurlWrapper> curlWrapper_;
std::mutex mutex_;
std::unique_ptr<detail::CurlWrapper> curlWrapper_;

using CollectableEntry = std::pair<std::weak_ptr<Collectable>, std::string>;
std::vector<CollectableEntry> collectables_;

std::string getUri(const CollectableEntry& collectable) const;

enum class HttpMethod {
Post,
Put,
Delete,
};
int push(detail::HttpMethod method);

int performHttpRequest(HttpMethod method, const std::string& uri,
const std::string& body);

int push(HttpMethod method);

std::future<int> async_push(HttpMethod method);
std::future<int> async_push(detail::HttpMethod method);

static void CleanupStalePointers(std::vector<CollectableEntry>& collectables);
};
Expand Down
88 changes: 88 additions & 0 deletions push/src/curl_wrapper.cc
@@ -0,0 +1,88 @@
#include "curl_wrapper.h"

#include <stdexcept>

namespace prometheus {
namespace detail {

static const char CONTENT_TYPE[] =
"Content-Type: text/plain; version=0.0.4; charset=utf-8";

CurlWrapper::CurlWrapper(const std::string& username,
const std::string& password) {
/* In windows, this will init the winsock stuff */
auto error = curl_global_init(CURL_GLOBAL_ALL);
if (error) {
throw std::runtime_error("Cannot initialize global curl!");
}

curl_ = curl_easy_init();
if (!curl_) {
curl_global_cleanup();
throw std::runtime_error("Cannot initialize easy curl!");
}

if (!username.empty()) {
auth_ = username + ":" + password;
}
}

CurlWrapper::~CurlWrapper() {
curl_easy_cleanup(curl_);
curl_global_cleanup();
}

int CurlWrapper::performHttpRequest(HttpMethod method, const std::string& uri,
const std::string& body) {
std::lock_guard<std::mutex> l(mutex_);

curl_easy_reset(curl_);
curl_easy_setopt(curl_, CURLOPT_URL, uri.c_str());

curl_slist* header_chunk = nullptr;
header_chunk = curl_slist_append(header_chunk, CONTENT_TYPE);
curl_easy_setopt(curl_, CURLOPT_HTTPHEADER, header_chunk);

if (!body.empty()) {
curl_easy_setopt(curl_, CURLOPT_POSTFIELDSIZE, body.size());
curl_easy_setopt(curl_, CURLOPT_POSTFIELDS, body.data());
}

if (!auth_.empty()) {
curl_easy_setopt(curl_, CURLOPT_HTTPAUTH, CURLAUTH_BASIC);
curl_easy_setopt(curl_, CURLOPT_USERPWD, auth_.c_str());
}

switch (method) {
case HttpMethod::Post:
curl_easy_setopt(curl_, CURLOPT_POST, 1L);
break;

case HttpMethod::Put:
curl_easy_setopt(curl_, CURLOPT_NOBODY, 0L);
curl_easy_setopt(curl_, CURLOPT_CUSTOMREQUEST, "PUT");
break;

case HttpMethod::Delete:
curl_easy_setopt(curl_, CURLOPT_HTTPGET, 0L);
curl_easy_setopt(curl_, CURLOPT_NOBODY, 0L);
curl_easy_setopt(curl_, CURLOPT_CUSTOMREQUEST, "DELETE");
break;
}

auto curl_error = curl_easy_perform(curl_);

long response_code;
curl_easy_getinfo(curl_, CURLINFO_RESPONSE_CODE, &response_code);

curl_slist_free_all(header_chunk);

if (curl_error != CURLE_OK) {
return -curl_error;
}

return response_code;
}

} // namespace detail
} // namespace prometheus
32 changes: 32 additions & 0 deletions push/src/curl_wrapper.h
@@ -0,0 +1,32 @@
#include <curl/curl.h>

#include <mutex>
#include <string>

#include "prometheus/detail/http_method.h"

namespace prometheus {
namespace detail {

class CurlWrapper {
public:
CurlWrapper(const std::string& username, const std::string& password);

CurlWrapper(const CurlWrapper&) = delete;
CurlWrapper(CurlWrapper&&) = delete;
CurlWrapper& operator=(const CurlWrapper&) = delete;
CurlWrapper& operator=(CurlWrapper&&) = delete;

~CurlWrapper();

int performHttpRequest(HttpMethod method, const std::string& uri,
const std::string& body);

private:
CURL* curl_;
std::string auth_;
std::mutex mutex_;
};

} // namespace detail
} // namespace prometheus
115 changes: 17 additions & 98 deletions push/src/gateway.cc
@@ -1,14 +1,13 @@

#include "prometheus/gateway.h"

#include <curl/curl.h>

#include <algorithm>
#include <iterator>
#include <memory>
#include <mutex>
#include <sstream>

#include "curl_wrapper.h"
#include "prometheus/detail/future_std.h"
#include "prometheus/metric_family.h" // IWYU pragma: keep
#include "prometheus/text_serializer.h"
Expand All @@ -21,51 +20,23 @@ namespace prometheus {
static const char CONTENT_TYPE[] =
"Content-Type: text/plain; version=0.0.4; charset=utf-8";

class CurlWrapper {
public:
CurlWrapper() noexcept = default;

CurlWrapper(const CurlWrapper&) = delete;
CurlWrapper(CurlWrapper&&) = delete;
CurlWrapper& operator=(const CurlWrapper&) = delete;
CurlWrapper& operator=(CurlWrapper&&) = delete;

~CurlWrapper() { curl_easy_cleanup(curl_); }

CURL* curl() {
if (!curl_) {
curl_ = curl_easy_init();
}
return curl_;
}

private:
CURL* curl_ = nullptr;
};

Gateway::Gateway(const std::string& host, const std::string& port,
const std::string& jobname, const Labels& labels,
const std::string& username, const std::string& password) {
/* In windows, this will init the winsock stuff */
curl_global_init(CURL_GLOBAL_ALL);
curlWrapper_ = detail::make_unique<CurlWrapper>();
curlWrapper_ = detail::make_unique<detail::CurlWrapper>(username, password);

std::stringstream jobUriStream;
jobUriStream << host << ':' << port << "/metrics/job/" << jobname;
jobUri_ = jobUriStream.str();

if (!username.empty()) {
auth_ = username + ":" + password;
}

std::stringstream labelStream;
for (auto& label : labels) {
labelStream << "/" << label.first << "/" << label.second;
}
labels_ = labelStream.str();
}

Gateway::~Gateway() { curl_global_cleanup(); }
Gateway::~Gateway() = default;

const Gateway::Labels Gateway::GetInstanceLabel(std::string hostname) {
if (hostname.empty()) {
Expand All @@ -88,75 +59,18 @@ void Gateway::RegisterCollectable(const std::weak_ptr<Collectable>& collectable,
collectables_.push_back(std::make_pair(collectable, ss.str()));
}

int Gateway::performHttpRequest(HttpMethod method, const std::string& uri,
const std::string& body) {
std::lock_guard<std::mutex> l(mutex_);

auto curl = curlWrapper_->curl();
if (!curl) {
return -CURLE_FAILED_INIT;
}

curl_easy_reset(curl);
curl_easy_setopt(curl, CURLOPT_URL, uri.c_str());

curl_slist* header_chunk = nullptr;
header_chunk = curl_slist_append(header_chunk, CONTENT_TYPE);
curl_easy_setopt(curl, CURLOPT_HTTPHEADER, header_chunk);

if (!body.empty()) {
curl_easy_setopt(curl, CURLOPT_POSTFIELDSIZE, body.size());
curl_easy_setopt(curl, CURLOPT_POSTFIELDS, body.data());
}

if (!auth_.empty()) {
curl_easy_setopt(curl, CURLOPT_HTTPAUTH, CURLAUTH_BASIC);
curl_easy_setopt(curl, CURLOPT_USERPWD, auth_.c_str());
}

switch (method) {
case HttpMethod::Post:
curl_easy_setopt(curl, CURLOPT_POST, 1L);
break;

case HttpMethod::Put:
curl_easy_setopt(curl, CURLOPT_NOBODY, 0L);
curl_easy_setopt(curl, CURLOPT_CUSTOMREQUEST, "PUT");
break;

case HttpMethod::Delete:
curl_easy_setopt(curl, CURLOPT_HTTPGET, 0L);
curl_easy_setopt(curl, CURLOPT_NOBODY, 0L);
curl_easy_setopt(curl, CURLOPT_CUSTOMREQUEST, "DELETE");
break;
}

auto curl_error = curl_easy_perform(curl);

long response_code;
curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &response_code);

curl_slist_free_all(header_chunk);

if (curl_error != CURLE_OK) {
return -curl_error;
}

return response_code;
}

std::string Gateway::getUri(const CollectableEntry& collectable) const {
std::stringstream uri;
uri << jobUri_ << labels_ << collectable.second;

return uri.str();
}

int Gateway::Push() { return push(HttpMethod::Post); }
int Gateway::Push() { return push(detail::HttpMethod::Post); }

int Gateway::PushAdd() { return push(HttpMethod::Put); }
int Gateway::PushAdd() { return push(detail::HttpMethod::Put); }

int Gateway::push(HttpMethod method) {
int Gateway::push(detail::HttpMethod method) {
const auto serializer = TextSerializer{};

for (auto& wcollectable : collectables_) {
Expand All @@ -168,7 +82,7 @@ int Gateway::push(HttpMethod method) {
auto metrics = collectable->Collect();
auto body = serializer.Serialize(metrics);
auto uri = getUri(wcollectable);
auto status_code = performHttpRequest(method, uri, body);
auto status_code = curlWrapper_->performHttpRequest(method, uri, body);

if (status_code < 100 || status_code >= 400) {
return status_code;
Expand All @@ -178,11 +92,15 @@ int Gateway::push(HttpMethod method) {
return 200;
}

std::future<int> Gateway::AsyncPush() { return async_push(HttpMethod::Post); }
std::future<int> Gateway::AsyncPush() {
return async_push(detail::HttpMethod::Post);
}

std::future<int> Gateway::AsyncPushAdd() { return async_push(HttpMethod::Put); }
std::future<int> Gateway::AsyncPushAdd() {
return async_push(detail::HttpMethod::Put);
}

std::future<int> Gateway::async_push(HttpMethod method) {
std::future<int> Gateway::async_push(detail::HttpMethod method) {
const auto serializer = TextSerializer{};
std::vector<std::future<int>> futures;

Expand All @@ -197,7 +115,7 @@ std::future<int> Gateway::async_push(HttpMethod method) {
auto uri = getUri(wcollectable);

futures.push_back(std::async(std::launch::async, [method, uri, body, this] {
return performHttpRequest(method, uri, *body);
return curlWrapper_->performHttpRequest(method, uri, *body);
}));
}

Expand All @@ -219,7 +137,8 @@ std::future<int> Gateway::async_push(HttpMethod method) {
}

int Gateway::Delete() {
return performHttpRequest(HttpMethod::Delete, jobUri_, {});
return curlWrapper_->performHttpRequest(detail::HttpMethod::Delete, jobUri_,
{});
}

std::future<int> Gateway::AsyncDelete() {
Expand Down

0 comments on commit 882fec9

Please sign in to comment.