Skip to content

Commit

Permalink
Merge pull request #4535 from sysown/v2.x_240429-2
Browse files Browse the repository at this point in the history
Various code refactoring
  • Loading branch information
renecannao committed May 6, 2024
2 parents 5b87e89 + 9283cd2 commit 4ab9675
Show file tree
Hide file tree
Showing 13 changed files with 991 additions and 501 deletions.
70 changes: 70 additions & 0 deletions doc/internal/MySQL_Connection.md
@@ -0,0 +1,70 @@
### Flowchart of `MySQL_Connection::async_query()`

This function asynchronously executes a query on the MySQL connection.
It handles various states of the asynchronous query execution process and returns appropriate status codes indicating the result of the execution.

Returns an integer status code indicating the result of the query execution:
- 0: Query execution completed successfully.
- -1: Query execution failed.
- 1: Query execution in progress.
- 2: Processing a multi-statement query, control needs to be transferred to MySQL_Session.
- 3: In the middle of processing a multi-statement query.


```mermaid
---
title: MySQL_Connection::async_query()
---
flowchart TD
Assert["assert()"]
ValidConnection{Valid Connection}
ValidConnection -- no --> Assert
IsServerOffline{"IsServerOffline()"}
ValidConnection -- yes --> IsServerOffline
IsServerOffline -- yes --> ReturnMinus1
asyncStateMachine1{async_state_machine}
asyncStateMachine2{async_state_machine}
IsServerOffline -- no --> asyncStateMachine1
asyncStateMachine1 -- ASYNC_QUERY_END --> Return0
handler["handler()"]
asyncStateMachine1 --> handler
handler --> asyncStateMachine2
asyncStateMachine2 -- ASYNC_QUERY_END --> mysql_error{"mysql_error"}
asyncStateMachine2 -- ASYNC_STMT_EXECUTE_END --> mysql_error
asyncStateMachine2 -- ASYNC_STMT_PREPARE_FAILED --> ReturnMinus1
asyncStateMachine2 -- ASYNC_STMT_PREPARE_SUCCESSFUL --> Return0
mysql_error -- yes --> ReturnMinus1
mysql_error -- no --> Return0
asyncStateMachine2 -- ASYNC_NEXT_RESULT_START --> Return2
processing_multi_statement{"processing_multi_statement"}
asyncStateMachine2 --> processing_multi_statement
processing_multi_statement -- yes --> Return3
processing_multi_statement -- no --> Return1
ReturnMinus1["return -1"]
Return0["return 0"]
Return1["return 1"]
Return2["return 2"]
Return3["return 3"]
```

### Flowchart of `MySQL_Connection::IsServerOffline()`

```mermaid
---
title: MySQL_Connection::IsServerOffline()
---
flowchart TD
True[true]
False[false]
SS1{"server_status"}
SA{"shunned_automatic"}
SB{"shunned_and_kill_all_connections"}
SS1 -- OFFLINE_HARD --> True
SS1 -- REPLICATION_LAG --> True
SS1 -- SHUNNED --> SA
SA -- yes --> SB
SB -- yes --> True
SA -- no --> False
SB -- no --> False
SS1 --> False
```
68 changes: 68 additions & 0 deletions doc/internal/MySQL_Session.md
@@ -0,0 +1,68 @@
### Flowchart of `MySQL_Session::RunQuery()`

This function mostly calls `MySQL_Connection::async_query()` with the right arguments.
Returns an integer status code indicating the result of the query execution:
- 0: Query execution completed successfully.
- -1: Query execution failed.
- 1: Query execution in progress.
- 2: Processing a multi-statement query, control needs to be transferred to MySQL_Session.
- 3: In the middle of processing a multi-statement query.

```mermaid
---
title: MySQL_Session::RunQuery()
---
flowchart TD
RQ["MySQL_Connection::async_query()"]
BEGIN --> RQ
RQ --> END
```

### Flowchart of `MySQL_Session::handler()`

WORK IN PROGRESS

```mermaid
---
title: MySQL_Session::handler()
---
flowchart TD
RQ["rc = RunQuery()"]
RC{rc}
CBCS["rc1 = handler_ProcessingQueryError_CheckBackendConnectionStatus()"]
RC1{rc1}
RQ --> RC
RC -- 0 --> OK
RC -- -1 --> CBCS
CBCS --> RC1
CS["CONNECTING_SERVER"]
ReturnMinus1["return -1"]
RC1 -- -1 --> ReturnMinus1
RC1 -- 1 --> CS
HM1CLE1["handler_minus1_ClientLibraryError()"]
HM1CLE2["handler_minus1_ClientLibraryError()"]
myerr1{"myerr >= 2000
&&
myerr < 3000"}
RC1 --> myerr1
myerr1 -- yes --> HM1CLE1
HM1CLE1 -- true --> CS
HM1CLE1 -- false --> ReturnMinus1
HM1LEDQ1["handler_minus1_LogErrorDuringQuery()"]
myerr1 -- no --> HM1LEDQ1
HM1HEC1["handler_minus1_HandleErrorCodes()"]
HM1LEDQ1 --> HM1HEC1
HM1HEC1 -- true --> HR1{"handler_ret"}
HR1 -- 0 --> CS
HR1 --> RHR1["return handler_ret"]
HM1GEM1["handler_minus1_GenerateErrorMessage()"]
HM1HEC1 -- false --> HM1GEM1
RE["RequestEnd()"]
HM1HBC1["handler_minus1_HandleBackendConnection()"]
HM1GEM1 --> RE
RE --> HM1HBC1
```


### Flowchart of `MySQL_Session::handler_ProcessingQueryError_CheckBackendConnectionStatus()`
TODO
9 changes: 9 additions & 0 deletions include/MySQL_Thread.h
Expand Up @@ -129,6 +129,7 @@ class __attribute__((aligned(64))) MySQL_Thread
void idle_thread_prepares_session_to_send_to_worker_thread(int i);
void idle_thread_to_kill_idle_sessions();
bool move_session_to_idle_mysql_sessions(MySQL_Data_Stream *myds, unsigned int n);
void run_Handle_epoll_wait(int);
#endif // IDLE_THREADS

unsigned int find_session_idx_in_mysql_sessions(MySQL_Session *sess);
Expand All @@ -142,6 +143,13 @@ class __attribute__((aligned(64))) MySQL_Thread
void tune_timeout_for_session_needs_pause(MySQL_Data_Stream *myds);
void configure_pollout(MySQL_Data_Stream *myds, unsigned int n);

void run_MoveSessionsBetweenThreads();
void run_BootstrapListener();
int run_ComputePollTimeout();
void run_StopListener();
void run_SetAllSession_ToProcess0();


protected:
int nfds;

Expand Down Expand Up @@ -212,6 +220,7 @@ class __attribute__((aligned(64))) MySQL_Thread
void ProcessAllSessions_SortingSessions();
void ProcessAllSessions_CompletedMirrorSession(unsigned int& n, MySQL_Session *sess);
void ProcessAllSessions_MaintenanceLoop(MySQL_Session *sess, unsigned long long sess_time, unsigned int& total_active_transactions_);
void ProcessAllSessions_Healthy0(MySQL_Session *sess, unsigned int& n);
void process_all_sessions();
void refresh_variables();
void register_session_connection_handler(MySQL_Session *_sess, bool _new=false);
Expand Down
9 changes: 9 additions & 0 deletions include/mysql_connection.h
Expand Up @@ -60,6 +60,15 @@ class MySQL_Connection {
void update_warning_count_from_statement();
bool is_expired(unsigned long long timeout);
unsigned long long inserted_into_pool;
void connect_start_SetAttributes();
void connect_start_SetCharset();
void connect_start_SetClientFlag(unsigned long&);
char * connect_start_DNS_lookup();
void connect_start_SetSslSettings();
void ProcessQueryAndSetStatusFlags_Warnings(char *);
void ProcessQueryAndSetStatusFlags_UserVariables(char *, int);
void ProcessQueryAndSetStatusFlags_Savepoint(char *);
void ProcessQueryAndSetStatusFlags_SetBackslashEscapes();
public:
struct {
char *server_version;
Expand Down
4 changes: 2 additions & 2 deletions include/proxysql_structs.h
Expand Up @@ -916,7 +916,7 @@ __thread int mysql_thread___monitor_groupreplication_healthcheck_interval;
__thread int mysql_thread___monitor_groupreplication_healthcheck_timeout;
__thread int mysql_thread___monitor_groupreplication_healthcheck_max_timeout_count;
__thread int mysql_thread___monitor_groupreplication_max_transactions_behind_count;
__thread int mysql_thread___monitor_groupreplication_max_transaction_behind_for_read_only;
__thread int mysql_thread___monitor_groupreplication_max_transactions_behind_for_read_only;
__thread int mysql_thread___monitor_galera_healthcheck_interval;
__thread int mysql_thread___monitor_galera_healthcheck_timeout;
__thread int mysql_thread___monitor_galera_healthcheck_max_timeout_count;
Expand Down Expand Up @@ -1087,7 +1087,7 @@ extern __thread int mysql_thread___monitor_replication_lag_count;
extern __thread int mysql_thread___monitor_groupreplication_healthcheck_interval;
extern __thread int mysql_thread___monitor_groupreplication_healthcheck_timeout;
extern __thread int mysql_thread___monitor_groupreplication_healthcheck_max_timeout_count;
extern __thread int mysql_thread___monitor_groupreplication_max_transaction_behind_for_read_only;
extern __thread int mysql_thread___monitor_groupreplication_max_transactions_behind_for_read_only;
extern __thread int mysql_thread___monitor_groupreplication_max_transactions_behind_count;
extern __thread int mysql_thread___monitor_galera_healthcheck_interval;
extern __thread int mysql_thread___monitor_galera_healthcheck_timeout;
Expand Down
8 changes: 4 additions & 4 deletions lib/MySQL_HostGroups_Manager.cpp
Expand Up @@ -2866,8 +2866,8 @@ void MySQL_HostGroups_Manager::group_replication_lag_action(
MyHGC* myhgc = nullptr;

if (
mysql_thread___monitor_groupreplication_max_transaction_behind_for_read_only == 0 ||
mysql_thread___monitor_groupreplication_max_transaction_behind_for_read_only == 2 ||
mysql_thread___monitor_groupreplication_max_transactions_behind_for_read_only == 0 ||
mysql_thread___monitor_groupreplication_max_transactions_behind_for_read_only == 2 ||
enable
) {
if (read_only == false) {
Expand All @@ -2877,8 +2877,8 @@ void MySQL_HostGroups_Manager::group_replication_lag_action(
}

if (
mysql_thread___monitor_groupreplication_max_transaction_behind_for_read_only == 1 ||
mysql_thread___monitor_groupreplication_max_transaction_behind_for_read_only == 2 ||
mysql_thread___monitor_groupreplication_max_transactions_behind_for_read_only == 1 ||
mysql_thread___monitor_groupreplication_max_transactions_behind_for_read_only == 2 ||
enable
) {
myhgc = MyHGM->MyHGC_find(reader_hostgroup);
Expand Down
50 changes: 42 additions & 8 deletions lib/MySQL_Session.cpp
Expand Up @@ -4609,20 +4609,32 @@ int MySQL_Session::get_pkts_from_client(bool& wrong_pass, PtrSize_t& pkt) {
// 0 : no action
// -1 : the calling function will return
// 1 : call to NEXT_IMMEDIATE


/**
* @brief Handler for processing query errors and checking backend connection status.
*
* This function handles query errors and checks the status of the backend connection.
* It evaluates the server status and takes appropriate actions based on specific conditions.
*
* @param myds Pointer to the MySQL data stream associated with the session.
*
* @return Returns an integer status code indicating the result of the error handling:
* - 0: No action required, query processing can continue.
* - 1: Retry required due to backend connection status, query processing needs to be retried.
* - -1: Error encountered, query processing cannot continue.
*/

int MySQL_Session::handler_ProcessingQueryError_CheckBackendConnectionStatus(MySQL_Data_Stream *myds) {
MySQL_Connection *myconn = myds->myconn;
// the query failed
if (
// due to #774 , we now read myconn->server_status instead of myconn->parent->status
(myconn->server_status==MYSQL_SERVER_STATUS_OFFLINE_HARD) // the query failed because the server is offline hard
||
(myconn->server_status==MYSQL_SERVER_STATUS_SHUNNED && myconn->parent->shunned_automatic==true && myconn->parent->shunned_and_kill_all_connections==true) // the query failed because the server is shunned due to a serious failure
||
(myconn->server_status==MYSQL_SERVER_STATUS_SHUNNED_REPLICATION_LAG) // slave is lagging! see #774
) {
if (myconn->IsServerOffline()) {
// Set maximum connect time if connect timeout is configured
if (mysql_thread___connect_timeout_server_max) {
myds->max_connect_time=thread->curtime+mysql_thread___connect_timeout_server_max*1000;
}

// Variables to track retry and error conditions
bool retry_conn=false;
if (myconn->server_status==MYSQL_SERVER_STATUS_SHUNNED_REPLICATION_LAG) {
thread->status_variables.stvar[st_var_backend_lagging_during_query]++;
Expand All @@ -4633,6 +4645,8 @@ int MySQL_Session::handler_ProcessingQueryError_CheckBackendConnectionStatus(MyS
proxy_error("Detected an offline server during query: %s, %d, session_id:%u\n", myconn->parent->address, myconn->parent->port, this->thread_session_id);
MyHGM->p_update_mysql_error_counter(p_mysql_error_type::proxysql, myconn->parent->myhgc->hid, myconn->parent->address, myconn->parent->port, ER_PROXYSQL_OFFLINE_SRV);
}

// Retry the query if retries are allowed and conditions permit
if (myds->query_retries_on_failure > 0) {
myds->query_retries_on_failure--;
if ((myds->myconn->reusable==true) && myds->myconn->IsActiveTransaction()==false && myds->myconn->MultiplexDisabled()==false) {
Expand All @@ -4644,6 +4658,8 @@ int MySQL_Session::handler_ProcessingQueryError_CheckBackendConnectionStatus(MyS
}
}
}

// Destroy MySQL connection from pool and reset file descriptor
myds->destroy_MySQL_Connection_From_Pool(false);
myds->fd=0;
if (retry_conn) {
Expand Down Expand Up @@ -4992,6 +5008,24 @@ void MySQL_Session::handler_minus1_HandleBackendConnection(MySQL_Data_Stream *my
}

// this function was inline
/**
* @brief Run a MySQL query on the current session.
*
* This function executes a MySQL query on the current session based on its status.
* The query execution is asynchronous and handled by the specified MySQL connection.
* It returns the result code of the asynchronous query execution.
*
* @param myds Pointer to the MySQL data stream associated with the session.
* @param myconn Pointer to the MySQL connection used to execute the query.
*
* @return Returns an integer status code indicating the result of the query execution,
* as returned by async_query():
* - 0: Query execution completed successfully.
* - -1: Query execution failed.
* - 1: Query execution in progress.
* - 2: Processing a multi-statement query, control needs to be transferred to MySQL_Session.
* - 3: In the middle of processing a multi-statement query.
*/
int MySQL_Session::RunQuery(MySQL_Data_Stream *myds, MySQL_Connection *myconn) {
PROXY_TRACE2();
int rc = 0;
Expand Down

0 comments on commit 4ab9675

Please sign in to comment.