Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include <util/generic/yexception.h>
#include <util/stream/str.h>
#include <util/string/builder.h>
#include <util/datetime/base.h>
#include <yql/essentials/utils/log/log.h>

#include <thread>
Expand Down Expand Up @@ -480,6 +481,8 @@ class TEasyCurlStream : public TEasyCurl {
IHTTPGateway::TOnNewDataPart onNewData,
IHTTPGateway::TOnDownloadFinish onFinish,
const ::NMonitoring::TDynamicCounters::TCounterPtr& inflightCounter,
std::weak_ptr<CURLM> handle,
size_t threshold,
const TCurlInitConfig& config = TCurlInitConfig(),
TDNSGateway<>::TDNSConstCurlListPtr dnsCache = nullptr)
: TEasyCurl(counter, downloadedBytes, uploadededBytes, url, std::move(headers), EMethod::GET, offset, sizeLimit, 0ULL, std::move(config), std::move(dnsCache))
Expand All @@ -488,6 +491,8 @@ class TEasyCurlStream : public TEasyCurl {
, OnFinish(std::move(onFinish))
, Counter(std::make_shared<std::atomic_size_t>(0ULL))
, InflightCounter(inflightCounter)
, Handle(std::move(handle))
, Threshold(threshold)
{}

static TPtr Make(
Expand All @@ -502,10 +507,12 @@ class TEasyCurlStream : public TEasyCurl {
IHTTPGateway::TOnNewDataPart onNewData,
IHTTPGateway::TOnDownloadFinish onFinish,
const ::NMonitoring::TDynamicCounters::TCounterPtr& inflightCounter,
std::weak_ptr<CURLM> handle = {},
size_t threshold = 0,
const TCurlInitConfig& config = TCurlInitConfig(),
TDNSGateway<>::TDNSConstCurlListPtr dnsCache = nullptr)
{
return std::make_shared<TEasyCurlStream>(counter, downloadedBytes, uploadededBytes, std::move(url), std::move(headers), offset, sizeLimit, std::move(onStart), std::move(onNewData), std::move(onFinish), inflightCounter, std::move(config), std::move(dnsCache));
return std::make_shared<TEasyCurlStream>(counter, downloadedBytes, uploadededBytes, std::move(url), std::move(headers), offset, sizeLimit, std::move(onStart), std::move(onNewData), std::move(onFinish), inflightCounter, handle, threshold, std::move(config), std::move(dnsCache));
}

enum class EAction : i8 {
Expand Down Expand Up @@ -565,8 +572,9 @@ class TEasyCurlStream : public TEasyCurl {
size_t Write(void* contents, size_t size, size_t nmemb) final {
MaybeStart(CURLE_OK);
const auto realsize = size * nmemb;
if (!Cancelled)
OnNewData(IHTTPGateway::TCountedContent(TString(static_cast<char*>(contents), realsize), Counter, InflightCounter));
if (!Cancelled) {
OnNewData(IHTTPGateway::TCountedContent(TString(static_cast<char*>(contents), realsize), Counter, InflightCounter, Handle, Threshold));
}
return realsize;
}

Expand All @@ -583,6 +591,8 @@ class TEasyCurlStream : public TEasyCurl {
bool Paused = false;
bool Cancelled = false;
long HttpResponseCode = 0L;
std::weak_ptr<CURLM> Handle;
size_t Threshold;
};

using TKeyType = std::tuple<TString, size_t, IHTTPGateway::THeaders, TString, IHTTPGateway::TRetryPolicy::TPtr>;
Expand Down Expand Up @@ -676,8 +686,8 @@ friend class IHTTPGateway;
}

~THTTPMultiGateway() {
curl_multi_wakeup(Handle);
IsStopped = true;
curl_multi_wakeup(Handle.get());
if (Thread.joinable()) {
Thread.join();
}
Expand All @@ -691,23 +701,26 @@ friend class IHTTPGateway;
TCurlInitConfig InitConfig;

void InitCurl() {
// FIXME: NOT SAFE (see man libcurl(3))
const CURLcode globalInitResult = curl_global_init(CURL_GLOBAL_ALL);
if (globalInitResult != CURLE_OK) {
throw yexception() << "curl_global_init error " << int(globalInitResult) << ": " << curl_easy_strerror(globalInitResult) << Endl;
}
Handle = curl_multi_init();
Handle = std::shared_ptr<CURLM>(curl_multi_init(), [](auto handle) {
const CURLMcode multiCleanupResult = curl_multi_cleanup(handle);
if (multiCleanupResult != CURLM_OK) {
Cerr << "curl_multi_cleanup error " << int(multiCleanupResult) << ": " << curl_multi_strerror(multiCleanupResult) << Endl;
}
curl_global_cleanup(); // FIXME: NOT SAFE (see man libcurl(3))
});
if (!Handle) {
throw yexception() << "curl_multi_init error";
}
}

void UninitCurl() {
Y_ABORT_UNLESS(Handle);
const CURLMcode multiCleanupResult = curl_multi_cleanup(Handle);
if (multiCleanupResult != CURLM_OK) {
Cerr << "curl_multi_cleanup error " << int(multiCleanupResult) << ": " << curl_multi_strerror(multiCleanupResult) << Endl;
}
curl_global_cleanup();
Handle.reset();
}

void Perform() {
Expand All @@ -722,22 +735,22 @@ friend class IHTTPGateway;
OutputMemory->Set(OutputSize);

int running = 0;
if (const auto c = curl_multi_perform(Handle, &running); CURLM_OK != c) {
if (const auto c = curl_multi_perform(Handle.get(), &running); CURLM_OK != c) {
Fail(c);
break;
}

if (running < int(handlers)) {
for (int messages = int(handlers) - running; messages;) {
if (const auto msg = curl_multi_info_read(Handle, &messages)) {
if (const auto msg = curl_multi_info_read(Handle.get(), &messages)) {
if(msg->msg == CURLMSG_DONE) {
Done(msg->easy_handle, msg->data.result);
}
}
}
} else {
const int timeoutMs = 300;
if (const auto c = curl_multi_poll(Handle, nullptr, 0, timeoutMs, nullptr); CURLM_OK != c) {
if (const auto c = curl_multi_poll(Handle.get(), nullptr, 0, timeoutMs, nullptr); CURLM_OK != c) {
Fail(c);
break;
}
Expand All @@ -752,16 +765,16 @@ friend class IHTTPGateway;
const auto streamHandle = stream->GetHandle();
switch (stream->GetAction(BuffersSizePerStream)) {
case TEasyCurlStream::EAction::Init:
curl_multi_add_handle(Handle, streamHandle);
curl_multi_add_handle(Handle.get(), streamHandle);
break;
case TEasyCurlStream::EAction::Work:
curl_easy_pause(streamHandle, CURLPAUSE_RECV_CONT);
break;
case TEasyCurlStream::EAction::Stop:
curl_easy_pause(streamHandle, CURL_WRITEFUNC_PAUSE);
curl_easy_pause(streamHandle, CURLPAUSE_RECV);
break;
case TEasyCurlStream::EAction::Drop:
curl_multi_remove_handle(Handle, streamHandle);
curl_multi_remove_handle(Handle.get(), streamHandle);
Allocated.erase(streamHandle);
break;
case TEasyCurlStream::EAction::None:
Expand All @@ -784,7 +797,7 @@ friend class IHTTPGateway;
const auto handle = Await.front()->GetHandle();
Allocated.emplace(handle, std::move(Await.front()));
Await.pop();
curl_multi_add_handle(Handle, handle);
curl_multi_add_handle(Handle.get(), handle);
}
AwaitQueue->Set(Await.size());
AllocatedMemory->Set(AllocatedSize);
Expand Down Expand Up @@ -859,7 +872,7 @@ friend class IHTTPGateway;

const TIssue error(curl_multi_strerror(result));
while (!works.empty()) {
curl_multi_remove_handle(Handle, works.top()->GetHandle());
curl_multi_remove_handle(Handle.get(), works.top()->GetHandle());
works.top()->Fail(CURLE_OK, error);
works.pop();
}
Expand Down Expand Up @@ -914,7 +927,7 @@ friend class IHTTPGateway;
TOnDownloadFinish onFinish,
const ::NMonitoring::TDynamicCounters::TCounterPtr& inflightCounter) final
{
auto stream = TEasyCurlStream::Make(InFlightStreams, DownloadedBytes, UploadedBytes, std::move(url), std::move(headers), offset, sizeLimit, std::move(onStart), std::move(onNewData), std::move(onFinish), inflightCounter, InitConfig, DnsGateway.GetDNSCurlList());
auto stream = TEasyCurlStream::Make(InFlightStreams, DownloadedBytes, UploadedBytes, std::move(url), std::move(headers), offset, sizeLimit, std::move(onStart), std::move(onNewData), std::move(onFinish), inflightCounter, Handle, BuffersSizePerStream, InitConfig, DnsGateway.GetDNSCurlList());
const std::unique_lock lock(SyncRef());
const auto handle = stream->GetHandle();
TEasyCurlStream::TWeakPtr weak = stream;
Expand Down Expand Up @@ -942,20 +955,20 @@ friend class IHTTPGateway;
void Wakeup(size_t sizeLimit) {
AwaitQueue->Set(Await.size());
if (Allocated.size() < MaxHandlers && AllocatedSize + sizeLimit + OutputSize.load() <= MaxSimulatenousDownloadsSize) {
curl_multi_wakeup(Handle);
curl_multi_wakeup(Handle.get());
}
}

CURLM* GetHandle() const {
return Handle;
return Handle.get();
}

private:
std::mutex& SyncRef() {
return *Sync;
}

CURLM* Handle = nullptr;
std::shared_ptr<CURLM> Handle;

std::queue<TEasyCurlBuffer::TPtr> Await;
std::vector<TEasyCurlStream::TWeakPtr> Streams;
Expand Down Expand Up @@ -1043,28 +1056,34 @@ IHTTPGateway::TContent::TContent(const TString& data, long httpResponseCode, con
{}

IHTTPGateway::TCountedContent::TCountedContent(TString&& data, const std::shared_ptr<std::atomic_size_t>& counter,
const ::NMonitoring::TDynamicCounters::TCounterPtr& inflightCounter)
const ::NMonitoring::TDynamicCounters::TCounterPtr& inflightCounter, std::weak_ptr<CURLM> handle, size_t threshold)
: TContentBase(std::move(data)), Counter(counter), InflightCounter(inflightCounter)
, Handle(handle), Threshold(threshold)
{
Counter->fetch_add(size());
if (InflightCounter) {
InflightCounter->Add(size());
}
}

IHTTPGateway::TCountedContent::~TCountedContent()
{
Counter->fetch_sub(size());
void IHTTPGateway::TCountedContent::BeforeRelease() {
auto oldSize = Counter->fetch_sub(size());
if (oldSize >= Threshold && oldSize - size() < Threshold) {
if (auto handle = Handle.lock()) {
curl_multi_wakeup(handle.get());
}
}
if (InflightCounter) {
InflightCounter->Sub(size());
}
}

IHTTPGateway::TCountedContent::~TCountedContent() {
BeforeRelease();
}

TString IHTTPGateway::TCountedContent::Extract() {
Counter->fetch_sub(size());
if (InflightCounter) {
InflightCounter->Sub(size());
}
BeforeRelease();
return TContentBase::Extract();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,16 +101,20 @@ class IHTTPGateway {

class TCountedContent : public TContentBase {
public:
TCountedContent(TString&& data, const std::shared_ptr<std::atomic_size_t>& counter, const ::NMonitoring::TDynamicCounters::TCounterPtr& inflightCounter);
TCountedContent(TString&& data, const std::shared_ptr<std::atomic_size_t>& counter, const ::NMonitoring::TDynamicCounters::TCounterPtr& inflightCounter, std::weak_ptr<CURLM> handle, size_t threshold);
~TCountedContent();

TCountedContent(TCountedContent&&) = default;
TCountedContent& operator=(TCountedContent&& src) = default;

TString Extract();
private:
void BeforeRelease();

const std::shared_ptr<std::atomic_size_t> Counter;
const ::NMonitoring::TDynamicCounters::TCounterPtr InflightCounter;
std::weak_ptr<CURLM> Handle;
const size_t Threshold;
};

using TOnDownloadStart = std::function<void(CURLcode, long)>; // http code.
Expand Down
Loading