diff --git a/Cargo.lock b/Cargo.lock index 3bdc5d11b209..64e51f8dfdd6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -24,6 +24,12 @@ dependencies = [ "winapi 0.3.4", ] +[[package]] +name = "antidote" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "34fde25430d87a9388dadbe6e34d7f72a462c8b43ac8d309b42b0a8505d7e2a5" + [[package]] name = "anyhow" version = "1.0.26" @@ -1431,6 +1437,24 @@ dependencies = [ "want", ] +[[package]] +name = "hyper-openssl" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f52657b5cdb2a8067efd29a02e011b7cf656b473ec8a5c34e86645e85d763006" +dependencies = [ + "antidote", + "bytes 0.4.12", + "futures 0.1.29", + "hyper 0.12.35", + "lazy_static 1.4.0", + "linked_hash_set", + "openssl", + "openssl-sys", + "tokio-io", + "tokio-openssl 0.3.0", +] + [[package]] name = "hyper-tls" version = "0.3.2" @@ -1719,6 +1743,21 @@ dependencies = [ "vcpkg", ] +[[package]] +name = "linked-hash-map" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ae91b68aebc4ddb91978b11a1b02ddd8602a05ec19002801c5666000e05e0f83" + +[[package]] +name = "linked_hash_set" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3c7c91c4c7bbeb4f2f7c4e5be11e6a05bd6830bc37249c47ce1ad86ad453ff9c" +dependencies = [ + "linked-hash-map", +] + [[package]] name = "lock_api" version = "0.1.3" @@ -2531,7 +2570,7 @@ version = "2.0.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bb1a649b870f7fe03c965bf2fa6728361f6abbb6f3b72079515af8cb56d769cf" dependencies = [ - "log 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)", + "log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] @@ -2620,7 +2659,7 @@ version = "0.4.3" source = "git+https://github.com/tikv/raft-rs?rev=9782199fa6318f94bf0974129664d974b03f8f11#9782199fa6318f94bf0974129664d974b03f8f11" dependencies = [ "fxhash", - "log 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)", + "log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", "protobuf", "quick-error 1.2.2", "rand 0.5.5", @@ -3772,6 +3811,7 @@ dependencies = [ "hashbrown", "hex 0.3.2", "hyper 0.12.35", + "hyper-openssl", "indexmap", "kvproto", "lazy_static 1.4.0", @@ -3784,6 +3824,7 @@ dependencies = [ "nom 5.0.0-beta1", "num", "num-traits 0.2.5", + "openssl", "ordered-float", "panic_hook", "pprof", @@ -3818,6 +3859,9 @@ dependencies = [ "tipb_helper", "tokio-core", "tokio-executor", + "tokio-io", + "tokio-openssl 0.2.1", + "tokio-tcp", "tokio-threadpool", "tokio-timer", "toml", @@ -4036,6 +4080,28 @@ dependencies = [ "log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "tokio-openssl" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4646ae1fd623393de3d796ea53af75acd02938dd5579544fbd6d236d041978a6" +dependencies = [ + "futures 0.1.29", + "openssl", + "tokio-io", +] + +[[package]] +name = "tokio-openssl" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "771d6246b170ae108d67d9963c23f31a579016c016d73bd4bd7d6ef0252afda7" +dependencies = [ + "futures 0.1.29", + "openssl", + "tokio-io", +] + [[package]] name = "tokio-process" version = "0.2.4" diff --git a/Cargo.toml b/Cargo.toml index fdd3252b314c..d7e5800a1894 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -62,6 +62,10 @@ tikv_alloc = { path = "components/tikv_alloc", default-features = false } tokio-core = "0.1" tokio-timer = "0.2" tokio-executor = "0.1" +tokio-tcp = "0.1" +tokio-openssl = "0.2" +tokio-io = "0.1" +openssl = "0.10" serde = "1.0" serde_json = "1.0" serde_derive = "1.0" @@ -130,6 +134,7 @@ panic_hook = { path = "components/panic_hook" } # of `real_blackbox` feature. criterion = { version = "0.2.11", features=['real_blackbox'] } arrow = "0.10.0" +hyper-openssl = "0.7" [target.'cfg(target_os = "linux")'.dependencies] procinfo = { git = "https://github.com/tikv/procinfo-rs" } diff --git a/components/backup/src/endpoint.rs b/components/backup/src/endpoint.rs index 42e741e215d9..079091a9e3fa 100644 --- a/components/backup/src/endpoint.rs +++ b/components/backup/src/endpoint.rs @@ -517,9 +517,17 @@ impl Endpoint { let store_id = self.store_id; // TODO: make it async. self.pool.borrow_mut().spawn(lazy(move || loop { - let mut progress = prs.lock().unwrap(); - let branges = progress.forward(WORKER_TAKE_RANGE); - let is_raw_kv = progress.is_raw_kv; + let (branges, is_raw_kv, cf) = { + // Release lock as soon as possible. + // It is critical to speed up backup, otherwise workers are + // blocked by each other. + let mut progress = prs.lock().unwrap(); + ( + progress.forward(WORKER_TAKE_RANGE), + progress.is_raw_kv, + progress.cf, + ) + }; if branges.is_empty() { return Ok(()); } @@ -541,7 +549,7 @@ impl Endpoint { let name = backup_file_name(store_id, &brange.region, key); let res = if is_raw_kv { - brange.backup_raw_kv_to_file(&engine, db.clone(), &storage, name, progress.cf) + brange.backup_raw_kv_to_file(&engine, db.clone(), &storage, name, cf) } else { brange.backup_to_file(&engine, db.clone(), &storage, name, backup_ts, start_ts) }; diff --git a/components/test_util/data/ca.crt b/components/test_util/data/ca.crt deleted file mode 100644 index fb59b8af545c..000000000000 --- a/components/test_util/data/ca.crt +++ /dev/null @@ -1,20 +0,0 @@ ------BEGIN CERTIFICATE----- -MIIDWTCCAkGgAwIBAgIJAIIbhWlMUnaXMA0GCSqGSIb3DQEBCwUAMEIxCzAJBgNV -BAYTAlhYMRUwEwYDVQQHDAxEZWZhdWx0IENpdHkxHDAaBgNVBAoME0RlZmF1bHQg -Q29tcGFueSBMdGQwIBcNMTcxMTI0MjAzNjE3WhgPMjExNzEwMzEyMDM2MTdaMEIx -CzAJBgNVBAYTAlhYMRUwEwYDVQQHDAxEZWZhdWx0IENpdHkxHDAaBgNVBAoME0Rl -ZmF1bHQgQ29tcGFueSBMdGQwggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEKAoIB -AQC2KD3v7ewrvsJ2ZluJe7I5sPJG3abd1FZ1FyYc1NAP+FWwbOlNbBYNrMxOW2uK -Dcj3Hsdsiqhw7AfpjgP1qRVGifXSP8s3drFWRfsXRzBRlN0qtxfpvUMBGiypBpYq -RSOSbr0JUZ/NN3rqxTtiFvylVFnErfAoSf5XNeb/gYlOQgmK39qzvgF0ChdXQUib -94XP88FtgmIm+gZBCo8B7fTsAeb3jQfQ40nWYiJ+7NForHcI3gAUulm5gAV5wIwM -5OySf8GGiYJCE0efoKmxkFtC7WCqLMFYdyKfqWuXKKo9cWub/TRKVaA3pkPjAE1b -8kPCDhKBdNk90bQQBIRQMBLRAgMBAAGjUDBOMB0GA1UdDgQWBBQpW0hjwq0KAxLJ -HoDNR4A9V2tz3DAfBgNVHSMEGDAWgBQpW0hjwq0KAxLJHoDNR4A9V2tz3DAMBgNV -HRMEBTADAQH/MA0GCSqGSIb3DQEBCwUAA4IBAQB1RJVs9CWZVZz0Z/fguQR8xcu3 -zjjgE2EPC2l/zKIp92MyCI4NEtt4tNe01e+IqTONYB/6AGIy4QbhDbsWNzkZjlkk -KxoRJ+wp8eVjnBPhvNNCLdxdc+eguoBAlFA7TfXcdGbTRe3QaR8BolTnvEbPzlEc -GOdIdSwnI4Mh+1x+e3GVxvY3C6AqQip10Ewxv2ypzhgkqEjrjr1UNrM+txgMA3YB -IMi+iBY2yxqlUMsNQYxcepAayaFVpMDH2ruXKIfUxX2xSOw49p1g+eisXWpvhqnC -txeTqia2xlYge6/NsMusZDyKZxjRWj5z9Oc5t9BitiV9k80CjQ7nn0IIA4p0 ------END CERTIFICATE----- diff --git a/components/test_util/data/ca.pem b/components/test_util/data/ca.pem new file mode 100644 index 000000000000..e130a8eece90 --- /dev/null +++ b/components/test_util/data/ca.pem @@ -0,0 +1,22 @@ +-----BEGIN CERTIFICATE----- +MIIDojCCAoqgAwIBAgIUdZFW8VQoZZzek8cA+5GGu6ZInjowDQYJKoZIhvcNAQEL +BQAwVzELMAkGA1UEBhMCQ04xEDAOBgNVBAgTB0JlaWppbmcxEDAOBgNVBAcTB0Jl +aWppbmcxEDAOBgNVBAoTB1BpbmdDQVAxEjAQBgNVBAMTCU15IG93biBDQTAeFw0x +OTA5MDIwNjEyMDBaFw0yNDA4MzEwNjEyMDBaMFcxCzAJBgNVBAYTAkNOMRAwDgYD +VQQIEwdCZWlqaW5nMRAwDgYDVQQHEwdCZWlqaW5nMRAwDgYDVQQKEwdQaW5nQ0FQ +MRIwEAYDVQQDEwlNeSBvd24gQ0EwggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEK +AoIBAQDcDtQ7UX+xlVY0vpklp1uUmPoFsN0U6fqRzHU+LvYS5AM5RPJMVLiKBiSi +zGsB+XPmXZ8H7rZZ+osZsEmDIF3HdyiSNpPNzRJKxsz4KVRzfoKZXL9D41TpuE27 ++7tN6qGytYrnAy8cHMA0S1TnQ0biOFTcXZrwh5lvlIcx7ceUamGuEl94tblxSSJl +2SkpHkKIDv0kcgoGmmh4y8SzAtmnwcCjkCSoITvvwKklp5830pFKOnpN9uZJzkXa +tuUSpSji/JG79nQfH91LtL7xMprORVtg9YAa3aJm0Uf33WFvaCTSrt//7CVK8nqK +xayS3u7dNH3GV9b81OGtlR76leFlAgMBAAGjZjBkMA4GA1UdDwEB/wQEAwIBBjAS +BgNVHRMBAf8ECDAGAQH/AgECMB0GA1UdDgQWBBS3hxTaN9B7eF8xr0DKLZ3b5vFn +rDAfBgNVHSMEGDAWgBS3hxTaN9B7eF8xr0DKLZ3b5vFnrDANBgkqhkiG9w0BAQsF +AAOCAQEAi9WiEvTQQjmb7ekXHf1tKwdLNu5akQXIwTKeZSWRSeMgqVQcoyTZMPBX +ythl6K3175RUIMtCwO4uZTOpRU1mTl0pIjoEcJGHYX91zyA5BjWahXZttvt7/hyX +UwJN9clBXLfZTCp1ysLCtarLcip4WxWNsxEwXFUisE2gbu3F9ELHAbRSVUe/CwC6 +8BkY+G+fovazjGoTV4NadJVFRzTR/zsWkBNllBOBTrop8FH23ePVh3hXafzJlcip +bDbRxNqSzNtLr88mwswklgiIHXF6PY2TkyscsXVkHPAswZnrv4lLov7M3VjL8ITA +uYm4Me5Tmj+6pb+Foky15+ehmicQbA== +-----END CERTIFICATE----- diff --git a/components/test_util/data/key.pem b/components/test_util/data/key.pem new file mode 100644 index 000000000000..c7f9fa8c340c --- /dev/null +++ b/components/test_util/data/key.pem @@ -0,0 +1,27 @@ +-----BEGIN RSA PRIVATE KEY----- +MIIEogIBAAKCAQEAsRpq/E/VC82YxsC5LlKFvI9HJuchMtKskn53anW4rNE3sfN0 +WDS6qCyxNumUVBqO98J18xxbz/XkV7aP6TcXZrNgEqw07PZWTDoyZVi+n9HXyWwl +BeiE2WWrCESqsar+cXV5UE3oE7Y4CT56tMN+awKqnf1zLyRl9DlqSg1/GabheVzz +fGhdqddqdpAZcaOHH8UMEWdnZ4qTFaaGNRlrRy3W0VjzgIocQorpvvtZkQM5iCxx +z9wuF9/6gGdopTA0J2SvZKa+oI/867NLpN5Hx+cn/ThHhCTh1N34Ulloa0aiou72 +mGgyMIdQxYAsRnG62EHn+9aPtegIjQd13Be9/wIDAQABAoIBAHJ8v3iIKxNMP10M +rSlS032HqdluRLnUExdIhe3eWBnvze9NkIKM47Vf3te+u9J6sL1dil40kO2o6YoC +TJnYsVoEzzCC/lvJCxSP8pAthF1QjAx7yps9KtRWsu/PZAEipwW1iUzub/5+J09i +gnRkhE6tFJq5g0KQZxAwJZPlkaqEcZIOObfh9zD9hutvCPmXBtB600EbQU4XzyjP +KaU08LtNZVm4mhKMuhXuFt8LBkjjfuw6zNcjsvgMkyflFTLc/SgWWIpq1ALHQCsq +OiFfTPyuLy+8tGTbawvRIqiHHRd23XttPcfkdfWbNVTSBfodTOhXGFaVYbJ6EVA4 +OzVzftECgYEAz/D99wpWbjU8vye5cjKjZCY/+QnY0t76YsUmfD9+fQNBDSQnKCKj +6nO6oYFQ9RI/vPMfrNX0sqo5hKfufNBCr/MILDXR6vtcEuaqd84DgaPVPRjHef6v +paYUi0Enb3gF3LXYggTN1mz9leEW8BablTN/DLP5AAvMfM/XSkVzlIsCgYEA2gjc +mcUDL1smAvriFVmpD4IrPzaZ9kINOfFNqkp/+y7S0BZGeS5ESSodrs0CIojttp3o +9GL7QLhZ9DehJWfh2qfA5mvzKGzUeM2oapR2Ts/m3voS4ErPTm+cTBOjRe3gGSSN +4sAJ5LA071RfNjEZBSktow//WX/oWrhIyovnxt0CgYBxyge/4xlO77URSdSySEGf +MUs6pYfQRRKxb/9SaJB4KoqzfUAsN2CJkNDlRlWd9mGIrWZ89wwTpREapabdCD4l ++JFVWBJKS0ikUzOfoc3LaHLtHx0xhgxqUkrVtU62MfDLSXt0Etrs5vGRzf32Xfi/ +mdGBiw7MVqiM+FNwojbQZwKBgDly5E1P78rmhVl7qV5exYDkl2iMhnywYrPFtOUN +xDL2320csWz0l+F/S1rngYx/78KSUPMzsWgYKvuCPN+SQ5xNXzJXdzZLlqBN7/ZF +L/cMKJTP53FZxM2x8sjI09h1GPsG+quoVfL/yrLU1FF/FkyZ0QCKEooOfbaJoARe +YK+xAoGAfT0P200WsLKRl73XYJZNYQl5+h5s7Sk9J8QuPwFWqm/mGwYKTLI042jg +lsAym4krAR0c1CHTW3aHRimYpYbi7/kztZU1zUQgcGL+79afer3ZuFF7mGzR+I/r +yOQ2dEfmVASfl/fMh1qyExpcCaMuejaODWyILlxOwvnywHWMSCU= +-----END RSA PRIVATE KEY----- diff --git a/components/test_util/data/server.crt b/components/test_util/data/server.crt deleted file mode 100644 index 65db7945340c..000000000000 --- a/components/test_util/data/server.crt +++ /dev/null @@ -1,19 +0,0 @@ ------BEGIN CERTIFICATE----- -MIIDGDCCAgACCQC8hUiY0zaHIDANBgkqhkiG9w0BAQsFADBCMQswCQYDVQQGEwJY -WDEVMBMGA1UEBwwMRGVmYXVsdCBDaXR5MRwwGgYDVQQKDBNEZWZhdWx0IENvbXBh -bnkgTHRkMCAXDTE3MTEyNDIxMTI0NFoYDzIxMTcxMDMxMjExMjQ0WjBYMQswCQYD -VQQGEwJYWDEVMBMGA1UEBwwMRGVmYXVsdCBDaXR5MRwwGgYDVQQKDBNEZWZhdWx0 -IENvbXBhbnkgTHRkMRQwEgYDVQQDDAtleGFtcGxlLmNvbTCCASIwDQYJKoZIhvcN -AQEBBQADggEPADCCAQoCggEBANoua/h0FSBGyq+8B3plgpm5JJm0tDUqIBn/VYGb -sNN90KN4HIKi+jb2Me5yJPuteVJxR1H7UIZ7lhYyyFx07YNFYCIOLhlOoAMOw1Mo -LKUPnJRrg+uu/ek4RdlLgyg81poEc4vMc1ohdy/UYQr+XU/tO2+vtEHSxVLIwyha -rX2fmLhONcFU2aZoMXpmsndQ9M64rAJ9AjKVCmTmNm7aVt/CuqCOZ6tj82UVPBrR -Hwii4CqBTc0f0bRmM9pp5wb7OFrnz0jyC25TagdZNc1yWM6i8ct68YWwFsWlxtoW -GcZMIUkSIPMjB+h5L3oc5t37tq4Fjq2vrEfw9bozGTWVoeECAwEAATANBgkqhkiG -9w0BAQsFAAOCAQEAV+g/54X2q+oy2PFNjIhplg7cY7vcR12lfv6Qyfn19Mgq7r+S -+gjT4LHv8cPNIrNSWGtLqZtqaTrOJk/0E7OybLs6tkX1OL9A43z8tKFlOrGl4V9s -ZP/mbMoXeZtOlqTLOeHUscg+PEGDwL4BXi5U1WxUU0Doh1XoO0D1pXLQCm3AP9o2 -s3nLyCExwykPouz7DFgvov4WDFiYquwMzxjEGgjoKV5aqupjWt18UgLTJiip7WSs -fkD9JnVUvJ5kFzoY0CIMeHw/l7EX/4yMveAMiRj7vnODhwh64GRtcbdHiKcPRBLZ -PvdqgIpsxFLK9UhvE2KI+rudDSfhaARj05Svdg== ------END CERTIFICATE----- diff --git a/components/test_util/data/server.pem b/components/test_util/data/server.pem index 95215237d7d4..09200bd82f6a 100644 --- a/components/test_util/data/server.pem +++ b/components/test_util/data/server.pem @@ -1,27 +1,22 @@ ------BEGIN RSA PRIVATE KEY----- -MIIEpAIBAAKCAQEA2i5r+HQVIEbKr7wHemWCmbkkmbS0NSogGf9VgZuw033Qo3gc -gqL6NvYx7nIk+615UnFHUftQhnuWFjLIXHTtg0VgIg4uGU6gAw7DUygspQ+clGuD -66796ThF2UuDKDzWmgRzi8xzWiF3L9RhCv5dT+07b6+0QdLFUsjDKFqtfZ+YuE41 -wVTZpmgxemayd1D0zrisAn0CMpUKZOY2btpW38K6oI5nq2PzZRU8GtEfCKLgKoFN -zR/RtGYz2mnnBvs4WufPSPILblNqB1k1zXJYzqLxy3rxhbAWxaXG2hYZxkwhSRIg -8yMH6Hkvehzm3fu2rgWOra+sR/D1ujMZNZWh4QIDAQABAoIBAEfb/EGzcfXUexNQ -OaJNZqtcuDpLswLDohkN6MqsTZwKlzoP6Ev6g7Cwe5eOTrH527iUiKnuvQHeGHut -NCKHfGa85cGxq+s34ym+pgRweevPbYHQu31XgFdc6lx8K3GIQCIwDyJfLyrjVM8T -AvdM+czGVMofM55uXgE7EFPtMbDq0JKHj/A0uAd8+3WKTWebshVxi2h7ejGQK5fO -xi3WzeeAYu65lX7rrqDT9bFZFXuyxd5mhZFJYxG9KxYgI3rRhYLuLDMZHLT0N0wn -M2mcf4lRgoXQr+stzyT5P43fu1QM+FaNZ8uhBUGJm6nRi3cw8QIqWfTzvb22PLvk -NPhznLECgYEA/sbfqqAfm7GH8WGWfAKJLTd4tdoAHn17udTAZWn5Kegy0q+81YMj -BIiETrnpGQRxIZnCJw+aHU/lic5dbkD0ZwvDR5+CNzQjxs4y/hTpjNEsaCKyArSm -jWvuy6mYWcwM1Lp5yuUpL7VQk9x7maXD+7a97xYL2/9t3/CyYS/+zqUCgYEA2zqS -Rbor7dbVkKgDzgRtt8GGC2k+yz1U1rBnHke/OSfUSUBbrdUr7ybsAa3qon02LGA7 -+qidDY4j+s+PQfTr4SMTCOXlV1DpMpJd5+qh5yPXe+XThu+FWh1CXdcWI0XT+/9B -0AwLgWzAuRU/m1JIBhMZakEBasRzQiqB1deWvY0CgYBmM/0xgz5qxJLWH+GwKYxB -2UjRGnyFvqzNZS0xAYv0ZbNNlTXZKNv5S5JXynhZktCXPAkIhle6fnyEBYaxXdkt -JSjXKIOiBYZ8j+cgyd7OoHKB67khILrXbH7EsGnvS82x4IRPAhK9kqyaRA5JGpg9 -95bFvEBRpmu7M+E633gGCQKBgQCSjYYpDLq/JUXhjR/2AinilIFqcXHj5d0oJAbb -TDU+HS0hxt9CxuW22vscaEoZU8D6S17tQviyjhnpWgW3nuZsu8jGwwDcrR8niocy -OT0ASoqLrekJJGeuBS9PkjCfZde/dzVkwhiS7cOsNtMtnwS84tmzmT88Q5WVXtsq -vBNuJQKBgQDgf4GyhMwooWWJy7dkgrLdd5/WIrBIgXi3l731GytEDaQZyWGgNpXN -jiW67honEyX/TX59Fw5nU3eFodvjsPZFDHvhFb6QPUikgnY4TEuR03wxiiTzQO5a -zcG3Oyw62OSzUuI7Yu7nDkV5Pr1BYvHoL3mM7NqqrQOpD9zMuLv1Gg== ------END RSA PRIVATE KEY----- +-----BEGIN CERTIFICATE----- +MIIDlTCCAn2gAwIBAgIUGKdjy/Uqp64ZiwqMwpTMGP5tKT0wDQYJKoZIhvcNAQEL +BQAwVzELMAkGA1UEBhMCQ04xEDAOBgNVBAgTB0JlaWppbmcxEDAOBgNVBAcTB0Jl +aWppbmcxEDAOBgNVBAoTB1BpbmdDQVAxEjAQBgNVBAMTCU15IG93biBDQTAgFw0x +OTA5MDIwNjEzMDBaGA8yMTE5MDgwOTA2MTMwMFowFjEUMBIGA1UEAxMLdGlrdi1z +ZXJ2ZXIwggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEKAoIBAQCxGmr8T9ULzZjG +wLkuUoW8j0cm5yEy0qySfndqdbis0Tex83RYNLqoLLE26ZRUGo73wnXzHFvP9eRX +to/pNxdms2ASrDTs9lZMOjJlWL6f0dfJbCUF6ITZZasIRKqxqv5xdXlQTegTtjgJ +Pnq0w35rAqqd/XMvJGX0OWpKDX8ZpuF5XPN8aF2p12p2kBlxo4cfxQwRZ2dnipMV +poY1GWtHLdbRWPOAihxCium++1mRAzmILHHP3C4X3/qAZ2ilMDQnZK9kpr6gj/zr +s0uk3kfH5yf9OEeEJOHU3fhSWWhrRqKi7vaYaDIwh1DFgCxGcbrYQef71o+16AiN +B3XcF73/AgMBAAGjgZcwgZQwDgYDVR0PAQH/BAQDAgWgMB0GA1UdJQQWMBQGCCsG +AQUFBwMBBggrBgEFBQcDAjAMBgNVHRMBAf8EAjAAMB0GA1UdDgQWBBTw7yUYqbAv +BJw3zZctLUfUi0vyqzAfBgNVHSMEGDAWgBS3hxTaN9B7eF8xr0DKLZ3b5vFnrDAV +BgNVHREEDjAMhwSsEAUohwR/AAABMA0GCSqGSIb3DQEBCwUAA4IBAQCBljfge2fC +5X+tt1v7AkWoH5xpymEVvuIWWJmT/6FNTn1rdnIaxWCQzJbBCXjZS/75lKnwfrTB +ZK7iMv1GQaBevT/qm+7GcApsr5nFrI/MvzrvY+XRqvU8gsRhUjHYI+JPLGWxhzZD +pQdJTAGvsDLHu1VVdHR2KsE4M8ceGq58f7zPSq/suf+8SYEOFP8zfuXX1HfUrFVe +69ZQw8PZh4EYL0PYtE5BYfe9iJyFNNtZiejiribMQz/NtNkKM3M+Hm40ULGuwHXq +bKDjDq1PvmpVb/kKO/xADTIAbqproXETZ4W2keI3hwm6NxysvEbYV9+puQBXQqwT +KOt9Lo4ofSAF +-----END CERTIFICATE----- diff --git a/components/test_util/src/security.rs b/components/test_util/src/security.rs index 4ed99be48bca..5ff4610d44e3 100644 --- a/components/test_util/src/security.rs +++ b/components/test_util/src/security.rs @@ -7,10 +7,10 @@ use tikv_util::security::SecurityConfig; pub fn new_security_cfg() -> SecurityConfig { let p = PathBuf::from(env!("CARGO_MANIFEST_DIR")); SecurityConfig { - ca_path: format!("{}", p.join("data/ca.crt").display()), - cert_path: format!("{}", p.join("data/server.crt").display()), - key_path: format!("{}", p.join("data/server.pem").display()), - override_ssl_target: "example.com".to_owned(), + ca_path: format!("{}", p.join("data/ca.pem").display()), + cert_path: format!("{}", p.join("data/server.pem").display()), + key_path: format!("{}", p.join("data/key.pem").display()), + override_ssl_target: "".to_owned(), cipher_file: "".to_owned(), } } diff --git a/src/import/service.rs b/src/import/service.rs index d73222104185..2ccaefeece24 100644 --- a/src/import/service.rs +++ b/src/import/service.rs @@ -1,7 +1,6 @@ // Copyright 2018 TiKV Project Authors. Licensed under Apache-2.0. -use futures::Future; -use grpcio::{RpcContext, RpcStatus, RpcStatusCode, UnarySink}; +use grpcio::{RpcStatus, RpcStatusCode}; use super::Error; @@ -9,16 +8,6 @@ pub fn make_rpc_error(err: Error) -> RpcStatus { RpcStatus::new(RpcStatusCode::Unknown, Some(format!("{:?}", err))) } -pub fn send_rpc_error(ctx: RpcContext<'_>, sink: UnarySink, error: E) -where - Error: From, -{ - let err = make_rpc_error(Error::from(error)); - ctx.spawn(sink.fail(err).map_err(|e| { - warn!("send rpc failed"; "err" => %e); - })); -} - macro_rules! send_rpc_response { ($res:ident, $sink:ident, $label:ident, $timer:ident) => {{ let res = match $res { diff --git a/src/import/sst_service.rs b/src/import/sst_service.rs index e7ebb6bb52e8..8a9f8b22028a 100644 --- a/src/import/sst_service.rs +++ b/src/import/sst_service.rs @@ -207,7 +207,12 @@ impl ImportSst for ImportSSTService { let (cb, future) = paired_future_callback(); if let Err(e) = self.router.send_command(cmd, Callback::Write(cb)) { - return send_rpc_error(ctx, sink, e); + let mut resp = IngestResponse::default(); + resp.set_error(e.into()); + ctx.spawn(sink.success(resp).map_err(|e| { + warn!("send rpc failed"; "err" => %e); + })); + return; } ctx.spawn( diff --git a/src/raftstore/store/fsm/apply.rs b/src/raftstore/store/fsm/apply.rs index edc578a64be6..721f54a971e5 100644 --- a/src/raftstore/store/fsm/apply.rs +++ b/src/raftstore/store/fsm/apply.rs @@ -524,6 +524,24 @@ fn should_write_to_engine(cmd: &RaftCmdRequest, kv_wb_keys: usize) -> bool { false } +/// Checks if a write is needed to be issued after handling the command. +fn should_sync_log(cmd: &RaftCmdRequest) -> bool { + if cmd.has_admin_request() { + return true; + } + + for req in cmd.get_requests() { + // After ingest sst, sst files are deleted quickly. As a result, + // ingest sst command can not be handled again and must be synced. + // See more in Cleanup worker. + if req.has_ingest_sst() { + return true; + } + } + + false +} + /// A struct that stores the state related to Merge. /// /// When executing a `CommitMerge`, the source peer may have not applied @@ -882,9 +900,8 @@ impl ApplyDelegate { ); } - if cmd.has_admin_request() { - apply_ctx.sync_log_hint = true; - } + // Set sync log hint if the cmd requires so. + apply_ctx.sync_log_hint |= should_sync_log(&cmd); let is_conf_change = get_change_peer_cmd(&cmd).is_some(); apply_ctx.host.pre_apply(&self.region, &cmd); @@ -1114,8 +1131,7 @@ impl ApplyDelegate { AdminCmdType::CommitMerge => self.exec_commit_merge(ctx, request), AdminCmdType::RollbackMerge => self.exec_rollback_merge(ctx, request), AdminCmdType::InvalidAdmin => Err(box_err!("unsupported admin command type")), - } - .unwrap(); + }?; response.set_cmd_type(cmd_type); let mut resp = RaftCmdResponse::new(); @@ -3058,6 +3074,27 @@ mod tests { e } + #[test] + fn test_should_sync_log() { + // Admin command + let mut req = RaftCmdRequest::default(); + req.mut_admin_request() + .set_cmd_type(AdminCmdType::ComputeHash); + assert_eq!(should_sync_log(&req), true); + + // IngestSst command + let mut req = Request::default(); + req.set_cmd_type(CmdType::IngestSST); + req.set_ingest_sst(IngestSSTRequest::default()); + let mut cmd = RaftCmdRequest::default(); + cmd.mut_requests().push(req); + assert_eq!(should_sync_log(&cmd), true); + + // Normal command + let req = RaftCmdRequest::default(); + assert_eq!(should_sync_log(&req), false); + } + #[test] fn test_should_write_to_engine() { // ComputeHash command diff --git a/src/server/errors.rs b/src/server/errors.rs index 80f7a662bfb5..ea0d13f89e82 100644 --- a/src/server/errors.rs +++ b/src/server/errors.rs @@ -8,6 +8,7 @@ use std::result; use futures::Canceled; use grpcio::Error as GrpcError; use hyper::Error as HttpError; +use openssl::error::ErrorStack as OpenSSLError; use protobuf::ProtobufError; use super::snap::Task as SnapTask; @@ -106,6 +107,12 @@ quick_error! { display("{:?}", err) description(err.description()) } + OpenSSL(err: OpenSSLError) { + from() + cause(err) + display("{:?}", err) + description(err.description()) + } } } diff --git a/src/server/status_server.rs b/src/server/status_server.rs index 8ef04c60f881..e19026e325e0 100644 --- a/src/server/status_server.rs +++ b/src/server/status_server.rs @@ -1,26 +1,33 @@ // Copyright 2018 TiKV Project Authors. Licensed under Apache-2.0. use futures::future::{err, ok}; +use futures::stream::Stream; use futures::sync::oneshot::{Receiver, Sender}; use futures::{self, Future}; +use hyper::server::Builder as HyperBuilder; use hyper::service::service_fn; use hyper::{self, Body, Method, Request, Response, Server, StatusCode}; +use openssl::ssl::{SslAcceptor, SslFiletype, SslMethod}; #[cfg(target_os = "linux")] use pprof; #[cfg(target_os = "linux")] use pprof::protos::Message; #[cfg(target_os = "linux")] use regex::Regex; +use tokio_io::{AsyncRead, AsyncWrite}; +use tokio_openssl::SslAcceptorExt; +use tokio_tcp::TcpListener; use tokio_threadpool::{Builder, ThreadPool}; +use std::error::Error as StdError; use std::net::SocketAddr; use std::str::FromStr; -use tikv_util::timer::GLOBAL_TIMER_HANDLE; - use super::Result; use tikv_util::collections::HashMap; use tikv_util::metrics::dump; +use tikv_util::security::SecurityConfig; +use tikv_util::timer::GLOBAL_TIMER_HANDLE; pub struct StatusServer { thread_pool: ThreadPool, @@ -210,12 +217,12 @@ impl StatusServer { ) } - pub fn start(&mut self, status_addr: String) -> Result<()> { - let addr = SocketAddr::from_str(&status_addr)?; - - // TODO: support TLS for the status server. - let builder = Server::try_bind(&addr)?; - + fn start_serve(&mut self, builder: HyperBuilder) + where + I: Stream + Send + 'static, + I::Error: Into>, + I::Item: AsyncRead + AsyncWrite + Send + 'static, + { // Start to serve. let server = builder.serve(move || { // Create a status service. @@ -250,11 +257,48 @@ impl StatusServer { }, ) }); - self.addr = Some(server.local_addr()); + let graceful = server .with_graceful_shutdown(self.rx.take().unwrap()) .map_err(|e| error!("Status server error: {:?}", e)); self.thread_pool.spawn(graceful); + } + + pub fn start(&mut self, status_addr: String, security_config: &SecurityConfig) -> Result<()> { + let addr = SocketAddr::from_str(&status_addr)?; + + let tcp_listener = TcpListener::bind(&addr)?; + self.addr = Some(tcp_listener.local_addr()?); + + if !security_config.cert_path.is_empty() + && !security_config.key_path.is_empty() + && !security_config.ca_path.is_empty() + { + let mut acceptor = SslAcceptor::mozilla_modern(SslMethod::tls())?; + acceptor.set_ca_file(&security_config.ca_path)?; + acceptor.set_certificate_chain_file(&security_config.cert_path)?; + acceptor.set_private_key_file(&security_config.key_path, SslFiletype::PEM)?; + let acceptor = acceptor.build(); + + let tls_stream = tcp_listener + .incoming() + .and_then(move |stream| { + acceptor.accept_async(stream).then(|r| match r { + Ok(stream) => Ok(Some(stream)), + Err(e) => { + error!("failed to accept TLS connection"; "err" => ?e); + Ok(None) + } + }) + }) + .filter_map(|x| x); + let server = Server::builder(tls_stream); + self.start_serve(server); + } else { + let tcp_stream = tcp_listener.incoming(); + let server = Server::builder(tcp_stream); + self.start_serve(server); + } Ok(()) } @@ -278,12 +322,21 @@ impl StatusServer { mod tests { use crate::server::status_server::StatusServer; use futures::future::{lazy, Future}; - use hyper::{Client, StatusCode, Uri}; + use hyper::client::HttpConnector; + use hyper::{Body, Client, StatusCode, Uri}; + use hyper_openssl::HttpsConnector; + use openssl::ssl::{SslConnector, SslMethod}; + + use std::env; + use std::path::PathBuf; + + use test_util::new_security_cfg; + use tikv_util::security::SecurityConfig; #[test] fn test_status_service() { let mut status_server = StatusServer::new(1); - let _ = status_server.start("127.0.0.1:0".to_string()); + let _ = status_server.start("127.0.0.1:0".to_string(), &SecurityConfig::default()); let client = Client::new(); let uri = Uri::builder() .scheme("http") @@ -305,4 +358,43 @@ mod tests { handle.wait().unwrap(); status_server.stop(); } + + #[test] + fn test_security_status_service() { + let mut status_server = StatusServer::new(1); + let _ = status_server.start("127.0.0.1:0".to_string(), &new_security_cfg()); + let p = PathBuf::from(env!("CARGO_MANIFEST_DIR")); + + let mut connector = HttpConnector::new(1); + connector.enforce_http(false); + let mut ssl = SslConnector::builder(SslMethod::tls()).unwrap(); + ssl.set_ca_file(format!( + "{}", + p.join("components/test_util/data/ca.pem").display() + )) + .unwrap(); + + let ssl = HttpsConnector::with_connector(connector, ssl).unwrap(); + let client = Client::builder().build::<_, Body>(ssl); + + let uri = Uri::builder() + .scheme("https") + .authority(status_server.listening_addr().to_string().as_str()) + .path_and_query("/metrics") + .build() + .unwrap(); + + let handle = status_server.thread_pool.spawn_handle(lazy(move || { + client + .get(uri) + .map(|res| { + assert_eq!(res.status(), StatusCode::OK); + }) + .map_err(|err| { + panic!("response status is not OK: {:?}", err); + }) + })); + handle.wait().unwrap(); + status_server.stop(); + } } diff --git a/tests/integrations/import/sst_service.rs b/tests/integrations/import/sst_service.rs index 8750a5981e73..43117c2dedc9 100644 --- a/tests/integrations/import/sst_service.rs +++ b/tests/integrations/import/sst_service.rs @@ -214,8 +214,28 @@ fn test_cleanup_sst() { check_sst_deleted(&import, &meta, &data); } +#[test] +fn test_ingest_sst_region_not_found() { + let (_cluster, mut ctx_not_found, _, import) = new_cluster_and_tikv_import_client(); + + let temp_dir = TempDir::new("test_ingest_sst_errors").unwrap(); + + ctx_not_found.set_region_id(1 << 31); // A large region id that must no exists. + let sst_path = temp_dir.path().join("test_split.sst"); + let sst_range = (0, 100); + let (mut meta, _data) = gen_sst_file(sst_path, sst_range); + meta.set_region_id(ctx_not_found.get_region_id()); + meta.set_region_epoch(ctx_not_found.get_region_epoch().clone()); + + let mut ingest = IngestRequest::default(); + ingest.set_context(ctx_not_found); + ingest.set_sst(meta); + let resp = import.ingest(&ingest).unwrap(); + assert!(resp.get_error().has_region_not_found()); +} + fn new_sst_meta(crc32: u32, length: u64) -> SSTMeta { - let mut m = SSTMeta::new(); + let mut m = SSTMeta::default(); m.set_uuid(Uuid::new_v4().as_bytes().to_vec()); m.set_crc32(crc32); m.set_length(length); diff --git a/tiflash-proxy/src/server.rs b/tiflash-proxy/src/server.rs index 9a7e15f6ce8c..1a8a3dec4c80 100644 --- a/tiflash-proxy/src/server.rs +++ b/tiflash-proxy/src/server.rs @@ -327,7 +327,7 @@ fn run_raft_server(pd_client: RpcClient, cfg: &TiKvConfig, security_mgr: Arc %e