Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[no squash] Avoid use of select(2) #14255

Merged
merged 3 commits into from
Jan 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ option(ENABLE_CURL "Enable cURL support for fetching media" TRUE)
set(USE_CURL FALSE)

if(ENABLE_CURL)
find_package(CURL)
find_package(CURL 7.28.0)
if (CURL_FOUND)
message(STATUS "cURL support enabled.")
set(USE_CURL TRUE)
Expand Down
156 changes: 63 additions & 93 deletions src/httpfetch.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ with this program; if not, write to the Free Software Foundation, Inc.,
#include <unordered_map>
#include <cerrno>
#include <mutex>
#include "network/socket.h" // for select()
#include "threading/event.h"
#include "config.h"
#include "exceptions.h"
Expand Down Expand Up @@ -166,30 +165,27 @@ static size_t httpfetch_discardfunction(

class CurlHandlePool
{
std::list<CURL*> handles;
std::vector<CURL*> handles;

public:
CurlHandlePool() = default;

~CurlHandlePool()
{
for (std::list<CURL*>::iterator it = handles.begin();
it != handles.end(); ++it) {
curl_easy_cleanup(*it);
for (CURL *it : handles) {
curl_easy_cleanup(it);
}
}
CURL * alloc()
{
CURL *curl;
if (handles.empty()) {
curl = curl_easy_init();
if (curl == NULL) {
errorstream<<"curl_easy_init returned NULL"<<std::endl;
}
}
else {
curl = handles.front();
handles.pop_front();
if (!curl)
throw std::bad_alloc();
} else {
curl = handles.back();
handles.pop_back();
}
return curl;
}
Expand Down Expand Up @@ -436,6 +432,12 @@ HTTPFetchOngoing::~HTTPFetchOngoing()
}


#if LIBCURL_VERSION_NUM >= 0x074200
#define HAVE_CURL_MULTI_POLL
#else
#undef HAVE_CURL_MULTI_POLL
#endif

class CurlFetchThread : public Thread
{
protected:
Expand All @@ -448,7 +450,7 @@ class CurlFetchThread : public Thread
struct Request {
RequestType type;
HTTPFetchRequest fetch_request;
Event *event;
Event *event = nullptr;
};

CURLM *m_multi;
Expand All @@ -474,8 +476,7 @@ class CurlFetchThread : public Thread
Request req;
req.type = RT_FETCH;
req.fetch_request = fetch_request;
req.event = NULL;
m_requests.push_back(req);
m_requests.push_back(std::move(req));
}

void requestClear(u64 caller, Event *event)
Expand All @@ -484,31 +485,29 @@ class CurlFetchThread : public Thread
req.type = RT_CLEAR;
req.fetch_request.caller = caller;
req.event = event;
m_requests.push_back(req);
m_requests.push_back(std::move(req));
}

void requestWakeUp()
{
Request req;
req.type = RT_WAKEUP;
req.event = NULL;
m_requests.push_back(req);
m_requests.push_back(std::move(req));
}

protected:
// Handle a request from some other thread
// E.g. new fetch; clear fetches for one caller; wake up
void processRequest(const Request &req)
void processRequest(Request &req)
{
if (req.type == RT_FETCH) {
// New fetch, queue until there are less
// than m_parallel_limit ongoing fetches
m_queued_fetches.push_back(req.fetch_request);
m_queued_fetches.push_back(std::move(req.fetch_request));

// see processQueued() for what happens next

}
else if (req.type == RT_CLEAR) {
} else if (req.type == RT_CLEAR) {
u64 caller = req.fetch_request.caller;

// Abort all ongoing fetches for the caller
Expand All @@ -521,20 +520,18 @@ class CurlFetchThread : public Thread
}

// Also abort all queued fetches for the caller
for (std::list<HTTPFetchRequest>::iterator
it = m_queued_fetches.begin();
for (auto it = m_queued_fetches.begin();
it != m_queued_fetches.end();) {
if ((*it).caller == caller)
it = m_queued_fetches.erase(it);
else
++it;
}
}
else if (req.type == RT_WAKEUP) {
} else if (req.type == RT_WAKEUP) {
// Wakeup: Nothing to do, thread is awake at this point
}

if (req.event != NULL)
if (req.event)
req.event->signal();
}

Expand All @@ -543,7 +540,7 @@ class CurlFetchThread : public Thread
{
while (m_all_ongoing.size() < m_parallel_limit &&
!m_queued_fetches.empty()) {
HTTPFetchRequest request = m_queued_fetches.front();
HTTPFetchRequest request = std::move(m_queued_fetches.front());
m_queued_fetches.pop_front();

// Create ongoing fetch data and make a cURL handle
Expand All @@ -563,20 +560,16 @@ class CurlFetchThread : public Thread
// Process CURLMsg (indicates completion of a fetch)
void processCurlMessage(CURLMsg *msg)
{
if (msg->msg != CURLMSG_DONE)
return;
// Determine which ongoing fetch the message pertains to
size_t i = 0;
bool found = false;
for (i = 0; i < m_all_ongoing.size(); ++i) {
if (m_all_ongoing[i]->getEasyHandle() == msg->easy_handle) {
found = true;
break;
}
}
if (msg->msg == CURLMSG_DONE && found) {
// m_all_ongoing[i] succeeded or failed.
HTTPFetchOngoing &ongoing = *m_all_ongoing[i];
for (auto it = m_all_ongoing.begin(); it != m_all_ongoing.end(); ++it) {
auto &ongoing = **it;
if (ongoing.getEasyHandle() != msg->easy_handle)
continue;
httpfetch_deliver_result(*ongoing.complete(msg->data.result));
m_all_ongoing.erase(m_all_ongoing.begin() + i);
m_all_ongoing.erase(it);
return;
}
}

Expand All @@ -595,73 +588,48 @@ class CurlFetchThread : public Thread
// Wait until some IO happens, or timeout elapses
void waitForIO(long timeout)
{
fd_set read_fd_set;
fd_set write_fd_set;
fd_set exc_fd_set;
int max_fd;
long select_timeout = -1;
struct timeval select_tv;
CURLMcode mres;

FD_ZERO(&read_fd_set);
FD_ZERO(&write_fd_set);
FD_ZERO(&exc_fd_set);
#ifdef HAVE_CURL_MULTI_POLL
mres = curl_multi_poll(m_multi, nullptr, 0, timeout, nullptr);

mres = curl_multi_fdset(m_multi, &read_fd_set,
&write_fd_set, &exc_fd_set, &max_fd);
if (mres != CURLM_OK) {
errorstream<<"curl_multi_fdset"
<<" returned error code "<<mres
<<std::endl;
select_timeout = 0;
errorstream << "curl_multi_poll returned error code "
<< mres << std::endl;
}
#else
// If there's nothing to do curl_multi_wait() will immediately return
// so we have to emulate the sleeping.

mres = curl_multi_timeout(m_multi, &select_timeout);
fd_set dummy;
int max_fd;
mres = curl_multi_fdset(m_multi, &dummy, &dummy, &dummy, &max_fd);
if (mres != CURLM_OK) {
errorstream<<"curl_multi_timeout"
<<" returned error code "<<mres
<<std::endl;
select_timeout = 0;
errorstream << "curl_multi_fdset returned error code "
<< mres << std::endl;
max_fd = -1;
}

// Limit timeout so new requests get through
if (select_timeout < 0 || select_timeout > timeout)
select_timeout = timeout;

if (select_timeout > 0) {
// in Winsock it is forbidden to pass three empty
// fd_sets to select(), so in that case use sleep_ms
if (max_fd != -1) {
select_tv.tv_sec = select_timeout / 1000;
select_tv.tv_usec = (select_timeout % 1000) * 1000;
int retval = select(max_fd + 1, &read_fd_set,
&write_fd_set, &exc_fd_set,
&select_tv);
if (retval == -1) {
#ifdef _WIN32
errorstream<<"select returned error code "
<<WSAGetLastError()<<std::endl;
#else
errorstream<<"select returned error code "
<<errno<<std::endl;
#endif
}
}
else {
sleep_ms(select_timeout);
if (max_fd == -1) { // curl has nothing to wait for
if (timeout > 0)
sleep_ms(timeout);
} else {
mres = curl_multi_wait(m_multi, nullptr, 0, timeout, nullptr);

if (mres != CURLM_OK) {
errorstream << "curl_multi_wait returned error code "
<< mres << std::endl;
}
}
#endif
}

void *run()
{
CurlHandlePool pool;

m_multi = curl_multi_init();
if (m_multi == NULL) {
errorstream<<"curl_multi_init returned NULL\n";
return NULL;
}
FATAL_ERROR_IF(!m_multi, "curl_multi_init returned NULL");

FATAL_ERROR_IF(!m_all_ongoing.empty(), "Expected empty");

Expand Down Expand Up @@ -733,15 +701,17 @@ class CurlFetchThread : public Thread
}
};

std::unique_ptr<CurlFetchThread> g_httpfetch_thread = nullptr;
static std::unique_ptr<CurlFetchThread> g_httpfetch_thread;

void httpfetch_init(int parallel_limit)
{
FATAL_ERROR_IF(g_httpfetch_thread, "httpfetch_init called twice");

verbosestream<<"httpfetch_init: parallel_limit="<<parallel_limit
<<std::endl;

CURLcode res = curl_global_init(CURL_GLOBAL_DEFAULT);
FATAL_ERROR_IF(res != CURLE_OK, "CURL init failed");
FATAL_ERROR_IF(res != CURLE_OK, "cURL init failed");

g_httpfetch_thread = std::make_unique<CurlFetchThread>(parallel_limit);

Expand Down Expand Up @@ -779,7 +749,7 @@ static void httpfetch_request_clear(u64 caller)
g_httpfetch_thread->requestClear(caller, &event);
event.wait();
} else {
g_httpfetch_thread->requestClear(caller, NULL);
g_httpfetch_thread->requestClear(caller, nullptr);
}
}

Expand All @@ -791,7 +761,7 @@ void httpfetch_sync(const HTTPFetchRequest &fetch_request,
CurlHandlePool pool;
HTTPFetchOngoing ongoing(fetch_request, &pool);
// Do the fetch (curl_easy_perform)
CURLcode res = ongoing.start(NULL);
CURLcode res = ongoing.start(nullptr);
// Update fetch result
fetch_result = *ongoing.complete(res);
}
Expand Down