Skip to content

Commit

Permalink
Update windows branch codes.
Browse files Browse the repository at this point in the history
  • Loading branch information
Barenboim committed May 8, 2024
1 parent 95e45eb commit 8fb2cce
Show file tree
Hide file tree
Showing 20 changed files with 1,173 additions and 485 deletions.
80 changes: 60 additions & 20 deletions src/factory/DnsTaskImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@
*/

#include <string>
#include <atomic>
#include "DnsMessage.h"
#include "WFTaskError.h"
#include "WFTaskFactory.h"
#include "DnsMessage.h"
#include "WFServer.h"

using namespace protocol;

Expand All @@ -31,16 +33,17 @@ class ComplexDnsTask : public WFComplexClientTask<DnsRequest, DnsResponse,
std::function<void (WFDnsTask *)>>
{
static struct addrinfo hints;
static std::atomic<size_t> seq;

public:
ComplexDnsTask(int retry_max, dns_callback_t&& cb):
WFComplexClientTask(retry_max, std::move(cb))
{
#ifdef _WIN32
this->set_transport_type(TT_TCP);
#else
this->set_transport_type(TT_UDP);
#endif
this->set_transport_type(TT_TCP);
#else
this->set_transport_type(TT_UDP);
#endif
}

protected:
Expand All @@ -54,24 +57,21 @@ class ComplexDnsTask : public WFComplexClientTask<DnsRequest, DnsResponse,

struct addrinfo ComplexDnsTask::hints =
{
/*.ai_flags =*/ AI_NUMERICSERV | AI_NUMERICHOST,
/*.ai_family =*/ AF_UNSPEC,
/*.ai_socktype =*/ SOCK_STREAM,
/*.ai_protocol =*/ 0,
/*.ai_addrlen =*/ 0,
/*.ai_addr =*/ NULL,
/*.ai_canonname =*/ NULL,
/*.ai_next =*/ NULL
/*.ai_flags =*/ AI_NUMERICSERV | AI_NUMERICHOST,
/*.ai_family =*/ AF_UNSPEC,
/*.ai_socktype =*/ SOCK_STREAM
};

std::atomic<size_t> ComplexDnsTask::seq(0);

CommMessageOut *ComplexDnsTask::message_out()
{
DnsRequest *req = this->get_req();
DnsResponse *resp = this->get_resp();
TransportType type = this->get_transport_type();
enum TransportType type = this->get_transport_type();

if (req->get_id() == 0)
req->set_id((this->get_seq() + 1) * 99991 % 65535 + 1);
req->set_id(++ComplexDnsTask::seq * 99991 % 65535 + 1);
resp->set_request_id(req->get_id());
resp->set_request_name(req->get_question_name());
req->set_single_packet(type == TT_UDP);
Expand All @@ -93,21 +93,22 @@ bool ComplexDnsTask::init_success()

if (!this->route_result_.request_object)
{
TransportType type = this->get_transport_type();
enum TransportType type = this->get_transport_type();
struct addrinfo *addr;
int ret;

ret = getaddrinfo(uri_.host, uri_.port, &hints, &addr);
if (ret != 0)
{
this->state = WFT_STATE_TASK_ERROR;
this->error = WFT_ERR_URI_PARSE_FAILED;
this->state = WFT_STATE_DNS_ERROR;
this->error = ret;
return false;
}

auto *ep = &WFGlobal::get_global_settings()->dns_server_params;
ret = WFGlobal::get_route_manager()->get(type, addr, info_, ep,
uri_.host, route_result_);
uri_.host, ssl_ctx_,
route_result_);
freeaddrinfo(addr);
if (ret < 0)
{
Expand Down Expand Up @@ -146,7 +147,7 @@ bool ComplexDnsTask::finish_once()
bool ComplexDnsTask::need_redirect()
{
DnsResponse *client_resp = this->get_resp();
TransportType type = this->get_transport_type();
enum TransportType type = this->get_transport_type();

if (type == TT_UDP && client_resp->get_tc() == 1)
{
Expand Down Expand Up @@ -189,3 +190,42 @@ WFDnsTask *WFTaskFactory::create_dns_task(const ParsedURI& uri,
return task;
}


/**********Server**********/

class WFDnsServerTask : public WFServerTask<DnsRequest, DnsResponse>
{
public:
WFDnsServerTask(CommService *service,
std::function<void (WFDnsTask *)>& proc) :
WFServerTask(service, WFGlobal::get_scheduler(), proc)
{
// this->type = ((WFServerBase *)service)->get_params()->transport_type;
this->type = TT_TCP;
}

protected:
virtual CommMessageIn *message_in()
{
this->get_req()->set_single_packet(this->type == TT_UDP);
return this->WFServerTask::message_in();
}

virtual CommMessageOut *message_out()
{
this->get_resp()->set_single_packet(this->type == TT_UDP);
return this->WFServerTask::message_out();
}

protected:
enum TransportType type;
};

/**********Server Factory**********/

WFDnsTask *WFServerTaskFactory::create_dns_task(CommService *service,
std::function<void (WFDnsTask *)>& proc)
{
return new WFDnsServerTask(service, proc);
}

3 changes: 2 additions & 1 deletion src/factory/HttpTaskImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -484,7 +484,8 @@ class ComplexHttpProxyTask : public ComplexHttpTask

int ComplexHttpProxyTask::init_ssl_connection()
{
SSL *ssl = __create_ssl(WFGlobal::get_ssl_client_ctx());
static SSL_CTX *ssl_ctx = WFGlobal::get_ssl_client_ctx();
SSL *ssl = __create_ssl(ssl_ctx_ ? ssl_ctx_ : ssl_ctx);
WFConnection *conn;

if (!ssl)
Expand Down
24 changes: 18 additions & 6 deletions src/factory/KafkaTaskImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -747,16 +747,28 @@ __WFKafkaTask *__WFKafkaTaskFactory::create_kafka_task(enum TransportType type,
{
auto *task = new __ComplexKafkaTask(retry_max, std::move(callback));

std::string url = (type == TT_TCP_SSL ? "kafkas://" : "kafka://");
ParsedURI uri;
char buf[32];

if (type == TT_TCP_SSL)
uri.scheme = strdup("kafkas");
else
uri.scheme = strdup("kafka");

if (!info.empty())
url += info + "@";
uri.userinfo = strdup(info.c_str());

url += host;
url += ":" + std::to_string(port);
uri.host = strdup(host);
sprintf(buf, "%u", port);
uri.port = strdup(buf);

if (!uri.scheme || !uri.host || !uri.port ||
(!info.empty() && !uri.userinfo))
{
uri.state = URI_STATE_ERROR;
uri.error = errno;
}

ParsedURI uri;
URIParser::parse(url, uri);
task->init(std::move(uri));
task->set_keep_alive(KAFKA_KEEPALIVE_DEFAULT);
return task;
Expand Down
37 changes: 21 additions & 16 deletions src/factory/MySQLTaskImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ CommMessageOut *ComplexMySQLTask::message_out()
break;

case ST_FIRST_USER_REQUEST:
if (this->is_fixed_addr())
if (this->is_fixed_conn())
{
auto *target = (RouteManager::RouteTarget *)this->target;

Expand Down Expand Up @@ -350,23 +350,28 @@ int ComplexMySQLTask::check_handshake(MySQLHandshakeResponse *resp)

if (is_ssl_)
{
if (!(resp->get_capability_flags() & 0x800))
if (resp->get_capability_flags() & 0x800)
{
static SSL_CTX *ssl_ctx = WFGlobal::get_ssl_client_ctx();

ssl = __create_ssl(ssl_ctx_ ? ssl_ctx_ : ssl_ctx);
if (!ssl)
{
state_ = WFT_STATE_SYS_ERROR;
error_ = errno;
return 0;
}

SSL_set_connect_state(ssl);
}
else
{
this->resp = std::move(*(MySQLResponse *)resp);
state_ = WFT_STATE_TASK_ERROR;
error_ = WFT_ERR_MYSQL_SSL_NOT_SUPPORTED;
return 0;
}

ssl = __create_ssl(WFGlobal::get_ssl_client_ctx());
if (!ssl)
{
state_ = WFT_STATE_SYS_ERROR;
error_ = errno;
return 0;
}

SSL_set_connect_state(ssl);
}

auto *conn = this->get_connection();
Expand Down Expand Up @@ -712,9 +717,9 @@ bool ComplexMySQLTask::init_success()

if (!transaction.empty())
{
this->WFComplexClientTask::set_info(std::string("?maxconn=1&") +
info + "|txn:" + transaction);
this->set_fixed_addr(true);
this->set_fixed_conn(true);
this->WFComplexClientTask::set_info(info + ("|txn:" + transaction));
}
else
this->WFComplexClientTask::set_info(info);
Expand All @@ -741,7 +746,7 @@ bool ComplexMySQLTask::finish_once()
return false;
}

if (this->is_fixed_addr())
if (this->is_fixed_conn())
{
if (this->state != WFT_STATE_SUCCESS || this->keep_alive_timeo == 0)
{
Expand All @@ -767,7 +772,7 @@ WFMySQLTask *WFTaskFactory::create_mysql_task(const std::string& url,

URIParser::parse(url, uri);
task->init(std::move(uri));
if (task->is_fixed_addr())
if (task->is_fixed_conn())
task->set_keep_alive(MYSQL_KEEPALIVE_TRANSACTION);
else
task->set_keep_alive(MYSQL_KEEPALIVE_DEFAULT);
Expand All @@ -782,7 +787,7 @@ WFMySQLTask *WFTaskFactory::create_mysql_task(const ParsedURI& uri,
auto *task = new ComplexMySQLTask(retry_max, std::move(callback));

task->init(uri);
if (task->is_fixed_addr())
if (task->is_fixed_conn())
task->set_keep_alive(MYSQL_KEEPALIVE_TRANSACTION);
else
task->set_keep_alive(MYSQL_KEEPALIVE_DEFAULT);
Expand Down
7 changes: 7 additions & 0 deletions src/factory/WFTaskFactory.h
Original file line number Diff line number Diff line change
Expand Up @@ -407,6 +407,13 @@ class WFNetworkTaskFactory
int retry_max,
std::function<void (T *)> callback);

static T *create_client_task(enum TransportType type,
const struct sockaddr *addr,
socklen_t addrlen,
SSL_CTX *ssl_ctx,
int retry_max,
std::function<void (T *)> callback);

public:
static T *create_server_task(CommService *service,
std::function<void (T *)>& process);
Expand Down

0 comments on commit 8fb2cce

Please sign in to comment.