diff --git a/stable/network-benchmark-testing/README.md b/stable/network-benchmark-testing/README.md
new file mode 100644
index 0000000..dc4c10f
--- /dev/null
+++ b/stable/network-benchmark-testing/README.md
@@ -0,0 +1,52 @@
+# Testing Kubernetes cluster network performance
+
+A standardized benchmark to measure Kubernetes networking performance across multiple host platforms and network stacks.
+
+### Usage
+The test suite is driven by a single Go binary that triggers all of the automated tests packaged in the netperf Docker image.
+
+***NOTE*** A minimum of two Kubernetes worker nodes is required for this test. If you are using minikube, you must enable the Docker registry add-on so that the image is available on all nodes.
+
+```shell
+# create a 2-node cluster
+minikube start --nodes 2 --cpus=6 --memory=20019 --kubernetes-version=v1.21.0
+
+# enable the Docker registry and push the image so it is available on all nodes
+minikube addons enable registry
+docker run --rm -it --network=host alpine ash -c "apk add socat && socat TCP-LISTEN:5000,reuseaddr,fork TCP:$(minikube ip):5000" &
+docker build ./docker -t netperf:latest
+docker tag netperf:latest localhost:5000/netperf:latest
+docker push localhost:5000/netperf:latest
+
+# run the tests
+cd docker && go run ./launch.go --iterations 1
+```
+
+### Kubernetes Network benchmark results
+The tests take some time to complete; after roughly 35 minutes you should see output similar to the following.
+
+```shell
+│ ALL TESTCASES AND MSS RANGES COMPLETE - GENERATING CSV OUTPUT │
+│ MSS , Maximum, 96, 160, 224, 288, 352, 416, 480, 544, 608, 672, 736, 800, 864, 928, 992, 1056, 1120, 1184, 1248, 1312, 1376, 1440, │
+│ 1 iperf TCP. Same VM using Pod IP ,43787.000000,40615,41787,41644,34696,38652,40741,39204,33203,39646,24538,43787,32438,27078,16714,33450,41896,35655,41853,40867,41655,40782,41521, │
+│ 2 iperf TCP. Same VM using Virtual IP ,45900.000000,37386,36459,37758,40965,41688,38859,39247,41394,37100,35232,40901,35395,36015,41664,45293,44693,40741,42982,41322,44138,40716,45900, │
+│ 3 iperf TCP. Remote VM using Pod IP ,27177.000000,26005,24857,26076,24670,24648,24399,24301,23858,23164,21852,24095,24304,25836,24940,25639,25975,22841,24274,24734,26783,24797,27177, │
+│ 4 iperf TCP. Remote VM using Virtual IP ,26820.000000,24622,24673,24447,23553,22981,23879,23464,24202,25872,24083,26820,23440,25935,24738,25447,25284,23834,25140,24318,25430,24247,25509, │
+│ 5 iperf TCP. Hairpin Pod to own Virtual IP ,43544.000000,38842,39599,38957,39019,39219,38770,39179,39829,41485,39045,43544,39219,38747,42666,40890,41004,37079,37665,37634,41949,37629,42241, │
+│ 6 iperf SCTP. Same VM using Pod IP ,2547.000000,2472,2512,2505,2495,2502,2499,2531,1329,1653,1745,1568,1265,175,1259,506,484,2249,2135,2254,2332,2414,2547, │
+│ 7 iperf SCTP. Same VM using Virtual IP ,2491.000000,2491,2451,2435,2441,2444,2429,2411,1229,1680,1490,1523,194,300,525,698,1557,1957,2119,2060,2250,2330,2408, │
+│ 8 iperf SCTP. Remote VM using Pod IP ,1467.000000,1319,1416,1384,1361,1467,1462,1393,544,1073,1079,921,1020,844,1150,1103,382,1205,1352,1408,1422,1418,1437, │
+│ 9 iperf SCTP. Remote VM using Virtual IP ,1540.000000,1438,1409,1216,1364,1534,1451,1540,948,863,1167,995,726,767,736,1240,1231,1191,1400,1441,1476,1441,1517, │
+│ 10 iperf SCTP. Hairpin Pod to own Virtual IP ,2356.000000,2236,2344,2300,2356,2309,2149,2146,1257,1476,1424,1378,807,364,954,1447,1409,1866,2025,2143,2210,1517,1131, │
+│ 11 iperf UDP. Same VM using Pod IP ,4130.000000,4130, │
+│ 12 iperf UDP. Same VM using Virtual IP ,3677.000000,3677, │
+│ 13 iperf UDP. Remote VM using Pod IP ,1314.000000,1314, │
+│ 14 iperf UDP. Remote VM using Virtual IP ,1281.000000,1281, │
+│ 15 netperf. Same VM using Pod IP ,9016.800000,9016.80, │
+│ 16 netperf. Same VM using Virtual IP ,0.000000,0, │
+│ 17 netperf. Remote VM using Pod IP ,7078.690000,7078.69, │
+│ 18 netperf. Remote VM using Virtual IP ,0.000000,0,
+```
+
+### Understanding the benchmark data
+For more information on interpreting and charting these results, see [Benchmarking Kubernetes Networking](https://github.com/kubernetes/perf-tests/tree/master/network/benchmarks/netperf#output-raw-csv-data).
\ No newline at end of file
diff --git a/stable/network-benchmark-testing/docker/Dockerfile b/stable/network-benchmark-testing/docker/Dockerfile
new file mode 100644
index 0000000..4f02128
--- /dev/null
+++ b/stable/network-benchmark-testing/docker/Dockerfile
@@ -0,0 +1,31 @@
+ARG GOLANG_VERSION=1.18
+FROM golang:${GOLANG_VERSION} as builder
+WORKDIR /workspace
+
+COPY nptests/nptest.go nptest.go
+COPY go.sum go.sum
+COPY go.mod go.mod
+
+RUN go build -o nptests
+
+FROM debian:jessie
+ENV LD_LIBRARY_PATH=/usr/local/lib
+
+# install build and runtime dependencies, then clean the apt cache
+RUN apt-get update \
+    && apt-get install -y curl wget net-tools gcc make libsctp-dev \
+    && rm -rf /var/lib/apt/lists/*
+RUN mkdir -p /tmp
+
+
+# Download and build iperf3 from sources
+RUN curl -LO https://downloads.es.net/pub/iperf/iperf-3.1.tar.gz && tar zxf iperf-3.1.tar.gz
+RUN cd iperf-3.1 && ./configure --prefix=/usr/local --bindir /usr/local/bin && make && make install
+
+# Download and build netperf from sources
+RUN curl -LO https://github.com/HewlettPackard/netperf/archive/netperf-2.7.0.tar.gz && tar -xzf netperf-2.7.0.tar.gz && mv netperf-netperf-2.7.0/ netperf-2.7.0
+RUN cd netperf-2.7.0 && ./configure --prefix=/usr/local --bindir /usr/local/bin && make && make install
+
+COPY --from=builder /workspace/nptests /usr/bin/
+
+ENTRYPOINT ["nptests"]
\ No newline at end of file
diff --git a/stable/network-benchmark-testing/docker/go.mod b/stable/network-benchmark-testing/docker/go.mod
new file mode 100644
index 0000000..999a28e
--- /dev/null
+++ b/stable/network-benchmark-testing/docker/go.mod
@@ -0,0 +1,28 @@
+module k8s.io/perf-tests/network
+
+go 1.13
+
+require (
+	github.com/davecgh/go-spew v1.1.1 // indirect
+	github.com/ghodss/yaml v1.0.1-0.20180820084758-c7ce16629ff4 // indirect
+	github.com/gogo/protobuf v1.1.2-0.20181010092945-fd322a3c4963 // indirect
+	github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b // indirect
+	github.com/google/btree v1.0.0 // indirect
+	github.com/google/gofuzz v0.0.0-20170612174753-24818f796faf // indirect
+	github.com/googleapis/gnostic v0.2.3-0.20180520015035-48a0ecefe2e4 // indirect
+	github.com/gregjones/httpcache v0.0.0-20180305231024-9cad4c3443a7 // indirect
+	github.com/imdario/mergo v0.3.6 // indirect
+	github.com/json-iterator/go v1.1.6-0.20180914014843-2433035e5132 // indirect
+	github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
+	github.com/modern-go/reflect2 v1.0.1 // indirect
+	github.com/peterbourgon/diskv v2.0.2-0.20180312054125-0646ccaebea1+incompatible // indirect
+	github.com/spf13/pflag v1.0.3 // indirect
+	github.com/stretchr/testify v1.5.1 // indirect
+	golang.org/x/oauth2 v0.0.0-20160902055913-3c3a985cb79f // indirect
+	golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2 // indirect
+	google.golang.org/appengine v1.6.6 // indirect
+	gopkg.in/inf.v0 v0.9.1 // indirect
+	k8s.io/api v0.0.0-20181011064954-26c7a45db378
+	k8s.io/apimachinery v0.0.0-20181011064652-56cf97ad69c7
+	
k8s.io/client-go v0.0.0-20181011105049-9b03088ac34f +) \ No newline at end of file diff --git a/stable/network-benchmark-testing/docker/go.sum b/stable/network-benchmark-testing/docker/go.sum new file mode 100644 index 0000000..5e8afcb --- /dev/null +++ b/stable/network-benchmark-testing/docker/go.sum @@ -0,0 +1,64 @@ +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/ghodss/yaml v1.0.1-0.20180820084758-c7ce16629ff4 h1:PaTU+9BARuIOAz1ixvps39DJjfq/SxOj3axzIlh7nFo= +github.com/ghodss/yaml v1.0.1-0.20180820084758-c7ce16629ff4/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= +github.com/gogo/protobuf v1.1.2-0.20181010092945-fd322a3c4963 h1:qxvPMOEHQtKCpBDHjx3NiCnM3NfMEyeA9c72+IljOP4= +github.com/gogo/protobuf v1.1.2-0.20181010092945-fd322a3c4963/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= +github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b h1:VKtxabqXZkF25pY9ekfRL6a582T4P37/31XEstQ5p58= +github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= +github.com/golang/protobuf v1.3.1 h1:YF8+flBXS5eO826T4nzqPrxfhQThhXl0YzfuUPu4SBg= +github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/google/btree v1.0.0 h1:0udJVsspx3VBr5FwtLhQQtuAsVc79tTq0ocGIPAU6qo= +github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= +github.com/google/gofuzz v0.0.0-20170612174753-24818f796faf h1:+RRA9JqSOZFfKrOeqr2z77+8R2RKyh8PG66dcu1V0ck= +github.com/google/gofuzz v0.0.0-20170612174753-24818f796faf/go.mod h1:HP5RmnzzSNb993RKQDq4+1A4ia9nllfqcQFTQJedwGI= +github.com/googleapis/gnostic v0.2.3-0.20180520015035-48a0ecefe2e4 h1:Z09Qt6AGDtg0cC/YgnX/iymzIqmZf5aasP5JZFxmkNQ= +github.com/googleapis/gnostic v0.2.3-0.20180520015035-48a0ecefe2e4/go.mod h1:sJBsCZ4ayReDTBIg8b9dl28c5xFWyhBTVRp3pOg5EKY= +github.com/gregjones/httpcache v0.0.0-20180305231024-9cad4c3443a7 h1:pdN6V1QBWetyv/0+wjACpqVH+eVULgEjkurDLq3goeM= +github.com/gregjones/httpcache v0.0.0-20180305231024-9cad4c3443a7/go.mod h1:FecbI9+v66THATjSRHfNgh1IVFe/9kFxbXtjV0ctIMA= +github.com/imdario/mergo v0.3.6 h1:xTNEAn+kxVO7dTZGu0CegyqKZmoWFI0rF8UxjlB2d28= +github.com/imdario/mergo v0.3.6/go.mod h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJh5FfA= +github.com/json-iterator/go v1.1.6-0.20180914014843-2433035e5132 h1:Y1mRUIuPBTUZfUBQW0V9iItFGJnksjotNXSOzTtxfIo= +github.com/json-iterator/go v1.1.6-0.20180914014843-2433035e5132/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU= +github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg= +github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/reflect2 v1.0.1 h1:9f412s+6RmYXLWZSEzVVgPGK7C2PphHj5RJrvfx9AWI= +github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= +github.com/peterbourgon/diskv v2.0.2-0.20180312054125-0646ccaebea1+incompatible h1:FhnA4iH8T/yYW+AolPONZjGE897wxj3MAzfEbrZkSYw= +github.com/peterbourgon/diskv v2.0.2-0.20180312054125-0646ccaebea1+incompatible/go.mod h1:uqqh8zWWbv1HBMNONnaR/tNboyR3/BZd58JJSHlUSCU= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod 
h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/spf13/pflag v1.0.3 h1:zPAT6CGy6wXeQ7NtTnaTerfKOsV6V6F8agHXFiazDkg= +github.com/spf13/pflag v1.0.3/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.5.1 h1:nOGnQDM7FYENwehXlg/kFVnos3rEvtKTjRvOWSzb6H4= +github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2 h1:VklqNMn3ovrHsnt90PveolxSbWFaJdECFbxSq0Mqo2M= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/net v0.0.0-20190603091049-60506f45cf65 h1:+rhAzEzT3f4JtomfC371qB+0Ola2caSKcY69NUBZrRQ= +golang.org/x/net v0.0.0-20190603091049-60506f45cf65/go.mod h1:HSz+uSET+XFnRR8LxR5pz3Of3rY3CfYBVs4xY44aLks= +golang.org/x/oauth2 v0.0.0-20160902055913-3c3a985cb79f h1:VWt05OS3Al9w09GSPgltoHP90whAHlpik/Bys7HVEDE= +golang.org/x/oauth2 v0.0.0-20160902055913-3c3a985cb79f/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a h1:1BGLXjeY4akVXGgbC9HugT3Jv3hCI0z56oJR5vAMgBU= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.2 h1:tW2bmiBqwgJj/UpqtC8EpXEZVYOwU0yG4iWbprSVAcs= +golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= +golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2 h1:+DCIGbF/swA92ohVg0//6X2IVY3KZs6p9mix0ziNYJM= +golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +google.golang.org/appengine v1.6.6 h1:lMO5rYAqUxkmaj76jAkRUvt5JZgFymx/+Q5Mzfivuhc= +google.golang.org/appengine v1.6.6/go.mod h1:8WjMMxjGQR8xUklV/ARdw2HLXBOI7O7uCIDZVag1xfc= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/inf.v0 v0.9.1 h1:73M5CoZyi3ZLMOyDlQh031Cx6N9NDJ2Vvfl76EDAgDc= +gopkg.in/inf.v0 v0.9.1/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw= +gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw= +gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +k8s.io/api v0.0.0-20181011064954-26c7a45db378 h1:lBCdUUoFs8PEMxhQoSNnVXxpKcrNrzuOw4/ofBd+0do= +k8s.io/api v0.0.0-20181011064954-26c7a45db378/go.mod h1:iuAfoD4hCxJ8Onx9kaTIt30j7jUFS00AXQi6QMi99vA= +k8s.io/apimachinery v0.0.0-20181011064652-56cf97ad69c7 h1:l1WAFAYv7ba/kpouP/gXH+l3ZqsRKbGH8F1VMDYmIbM= +k8s.io/apimachinery v0.0.0-20181011064652-56cf97ad69c7/go.mod h1:ccL7Eh7zubPUSh9A3USN90/OzHNSVN6zxzde07TDCL0= +k8s.io/client-go v0.0.0-20181011105049-9b03088ac34f h1:+izcn/RUZFQiB2q/wTE/xgLViQ5iFIZhlQParzg2ha4= +k8s.io/client-go v0.0.0-20181011105049-9b03088ac34f/go.mod h1:7vJpHMYJwNQCWgzmNV+VYUl1zCObLyodBc8nIyt8L5s= \ No newline at end of file diff --git a/stable/network-benchmark-testing/docker/launch.go b/stable/network-benchmark-testing/docker/launch.go new file mode 100644 index 0000000..627a499 --- /dev/null +++ b/stable/network-benchmark-testing/docker/launch.go @@ -0,0 +1,453 @@ +/* +Copyright 2016 The Kubernetes Authors. 
+ +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +/* + launch.go + + Launch the netperf tests + + 1. Launch the netperf-orch service + 2. Launch the worker pods + 3. Wait for the output csv data to show up in orchestrator pod logs +*/ + +package main + +import ( + "flag" + "fmt" + "os" + "strings" + "time" + + api "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/intstr" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/clientcmd" +) + +const ( + csvDataMarker = "GENERATING CSV OUTPUT" + csvEndDataMarker = "END CSV DATA" + runUUID = "latest" + orchestratorPort = 5202 + iperf3Port = 5201 + netperfPort = 12865 +) + +var ( + iterations int + hostnetworking bool + tag string + kubeConfig string + testNamespace string + netperfImage string + cleanupOnly bool + + everythingSelector metav1.ListOptions = metav1.ListOptions{} + + primaryNode api.Node + secondaryNode api.Node +) + +func init() { + flag.BoolVar(&hostnetworking, "hostnetworking", false, + "(boolean) Enable Host Networking Mode for PODs") + flag.IntVar(&iterations, "iterations", 1, + "Number of iterations to run") + flag.StringVar(&tag, "tag", runUUID, "CSV file suffix") + flag.StringVar(&netperfImage, "image", "localhost:5000/netperf:latest", "Docker image used to run the network tests") + flag.StringVar(&testNamespace, "namespace", "netperf", "Test namespace to run netperf pods") + defaultKubeConfig := fmt.Sprintf("%s/.kube/config", os.Getenv("HOME")) + flag.StringVar(&kubeConfig, "kubeConfig", defaultKubeConfig, + "Location of the kube configuration file ($HOME/.kube/config") + flag.BoolVar(&cleanupOnly, "cleanup", false, + "(boolean) Run the cleanup resources phase only (use this flag to clean up orphaned resources from a test run)") +} + +func setupClient() *kubernetes.Clientset { + config, err := clientcmd.BuildConfigFromFlags("", kubeConfig) + if err != nil { + panic(err) + } + clientset, err := kubernetes.NewForConfig(config) + if err != nil { + panic(err) + } + + return clientset +} + +// getMinions : Only return schedulable/worker nodes +func getMinionNodes(c *kubernetes.Clientset) *api.NodeList { + nodes, err := c.CoreV1().Nodes().List( + metav1.ListOptions{ + FieldSelector: "spec.unschedulable=false", + }) + if err != nil { + fmt.Println("Failed to fetch nodes", err) + return nil + } + return nodes +} + +func cleanup(c *kubernetes.Clientset) { + // Cleanup existing rcs, pods and services in our namespace + rcs, err := c.CoreV1().ReplicationControllers(testNamespace).List(everythingSelector) + if err != nil { + fmt.Println("Failed to get replication controllers", err) + return + } + for _, rc := range rcs.Items { + fmt.Println("Deleting rc", rc.GetName()) + if err := c.CoreV1().ReplicationControllers(testNamespace).Delete( + rc.GetName(), &metav1.DeleteOptions{}); err != nil { + fmt.Println("Failed to delete rc", rc.GetName(), err) + } + } + pods, err := c.CoreV1().Pods(testNamespace).List(everythingSelector) + if err != nil { + fmt.Println("Failed to get pods", err) + return + } + for 
_, pod := range pods.Items { + fmt.Println("Deleting pod", pod.GetName()) + if err := c.CoreV1().Pods(testNamespace).Delete(pod.GetName(), &metav1.DeleteOptions{GracePeriodSeconds: new(int64)}); err != nil { + fmt.Println("Failed to delete pod", pod.GetName(), err) + } + } + svcs, err := c.CoreV1().Services(testNamespace).List(everythingSelector) + if err != nil { + fmt.Println("Failed to get services", err) + return + } + for _, svc := range svcs.Items { + fmt.Println("Deleting svc", svc.GetName()) + err := c.CoreV1().Services(testNamespace).Delete( + svc.GetName(), &metav1.DeleteOptions{}) + if err != nil { + fmt.Println("Failed to get service", err) + } + } +} + +// createServices: Long-winded function to programmatically create our two services +func createServices(c *kubernetes.Clientset) bool { + // Create our namespace if not present + if _, err := c.CoreV1().Namespaces().Get(testNamespace, metav1.GetOptions{}); err != nil { + _, err := c.CoreV1().Namespaces().Create(&api.Namespace{ObjectMeta: metav1.ObjectMeta{Name: testNamespace}}) + if err != nil { + fmt.Println("Failed to create service", err) + } + } + + // Create the orchestrator service that points to the coordinator pod + orchLabels := map[string]string{"app": "netperf-orch"} + orchService := &api.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "netperf-orch", + }, + Spec: api.ServiceSpec{ + Selector: orchLabels, + Ports: []api.ServicePort{{ + Name: "netperf-orch", + Protocol: api.ProtocolTCP, + Port: orchestratorPort, + TargetPort: intstr.FromInt(orchestratorPort), + }}, + Type: api.ServiceTypeClusterIP, + }, + } + if _, err := c.CoreV1().Services(testNamespace).Create(orchService); err != nil { + fmt.Println("Failed to create orchestrator service", err) + return false + } + fmt.Println("Created orchestrator service") + + // Create the netperf-w2 service that points a clusterIP at the worker 2 pod + netperfW2Labels := map[string]string{"app": "netperf-w2"} + netperfW2Service := &api.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "netperf-w2", + }, + Spec: api.ServiceSpec{ + Selector: netperfW2Labels, + Ports: []api.ServicePort{ + { + Name: "netperf-w2", + Protocol: api.ProtocolTCP, + Port: iperf3Port, + TargetPort: intstr.FromInt(iperf3Port), + }, + { + Name: "netperf-w2-sctp", + Protocol: api.ProtocolSCTP, + Port: iperf3Port, + TargetPort: intstr.FromInt(iperf3Port), + }, + { + Name: "netperf-w2-udp", + Protocol: api.ProtocolUDP, + Port: iperf3Port, + TargetPort: intstr.FromInt(iperf3Port), + }, + { + Name: "netperf-w2-netperf", + Protocol: api.ProtocolTCP, + Port: netperfPort, + TargetPort: intstr.FromInt(netperfPort), + }, + }, + Type: api.ServiceTypeClusterIP, + }, + } + if _, err := c.CoreV1().Services(testNamespace).Create(netperfW2Service); err != nil { + fmt.Println("Failed to create netperf-w2 service", err) + return false + } + fmt.Println("Created netperf-w2 service") + return true +} + +// createRCs - Create replication controllers for all workers and the orchestrator +func createRCs(c *kubernetes.Clientset) bool { + // Create the orchestrator RC + name := "netperf-orch" + fmt.Println("Creating replication controller", name) + replicas := int32(1) + + _, err := c.CoreV1().ReplicationControllers(testNamespace).Create(&api.ReplicationController{ + ObjectMeta: metav1.ObjectMeta{Name: name}, + Spec: api.ReplicationControllerSpec{ + Replicas: &replicas, + Selector: map[string]string{"app": name}, + Template: &api.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{"app": name}, + }, + 
Spec: api.PodSpec{ + Containers: []api.Container{ + { + Name: name, + Image: netperfImage, + Ports: []api.ContainerPort{{ContainerPort: orchestratorPort}}, + Args: []string{"--mode=orchestrator"}, + ImagePullPolicy: "Always", + }, + }, + TerminationGracePeriodSeconds: new(int64), + }, + }, + }, + }) + if err != nil { + fmt.Println("Error creating orchestrator replication controller", err) + return false + } + fmt.Println("Created orchestrator replication controller") + for i := 1; i <= 3; i++ { + // Bring up pods slowly + time.Sleep(3 * time.Second) + kubeNode := primaryNode.GetName() + if i == 3 { + kubeNode = secondaryNode.GetName() + } + name = fmt.Sprintf("netperf-w%d", i) + fmt.Println("Creating replication controller", name) + portSpec := []api.ContainerPort{} + if i > 1 { + // Worker W1 is a client-only pod - no ports are exposed + portSpec = append(portSpec, api.ContainerPort{ContainerPort: iperf3Port, Protocol: api.ProtocolTCP}) + portSpec = append(portSpec, api.ContainerPort{ContainerPort: iperf3Port, Protocol: api.ProtocolSCTP}) + } + + workerEnv := []api.EnvVar{ + {Name: "worker", Value: name}, + {Name: "kubeNode", Value: kubeNode}, + {Name: "podname", Value: name}, + } + + replicas := int32(1) + + _, err := c.CoreV1().ReplicationControllers(testNamespace).Create(&api.ReplicationController{ + ObjectMeta: metav1.ObjectMeta{Name: name}, + Spec: api.ReplicationControllerSpec{ + Replicas: &replicas, + Selector: map[string]string{"app": name}, + Template: &api.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{"app": name}, + }, + Spec: api.PodSpec{ + NodeName: kubeNode, + Containers: []api.Container{ + { + Name: name, + Image: netperfImage, + Ports: portSpec, + Args: []string{"--mode=worker"}, + Env: workerEnv, + ImagePullPolicy: "Always", + }, + }, + TerminationGracePeriodSeconds: new(int64), + }, + }, + }, + }) + if err != nil { + fmt.Println("Error creating orchestrator replication controller", name, ":", err) + return false + } + } + + return true +} + +func getOrchestratorPodName(pods *api.PodList) string { + for _, pod := range pods.Items { + if strings.Contains(pod.GetName(), "netperf-orch-") { + return pod.GetName() + } + } + return "" +} + +// Retrieve the logs for the pod/container and check if csv data has been generated +func getCsvResultsFromPod(c *kubernetes.Clientset, podName string) *string { + body, err := c.CoreV1().Pods(testNamespace).GetLogs(podName, &api.PodLogOptions{Timestamps: false}).DoRaw() + if err != nil { + fmt.Printf("Error (%s) reading logs from pod %s", err, podName) + return nil + } + logData := string(body) + index := strings.Index(logData, csvDataMarker) + endIndex := strings.Index(logData, csvEndDataMarker) + if index == -1 || endIndex == -1 { + return nil + } + csvData := string(body[index+len(csvDataMarker)+1 : endIndex]) + return &csvData +} + +// processCsvData : Process the CSV datafile and generate line and bar graphs +func processCsvData(csvData *string) bool { + t := time.Now().UTC() + outputFileDirectory := fmt.Sprintf("results_%s-%s", testNamespace, tag) + outputFilePrefix := fmt.Sprintf("%s-%s_%s.", testNamespace, tag, t.Format("20060102150405")) + fmt.Printf("Test concluded - CSV raw data written to %s/%scsv\n", outputFileDirectory, outputFilePrefix) + if _, err := os.Stat(outputFileDirectory); os.IsNotExist(err) { + err := os.Mkdir(outputFileDirectory, 0766) + if err != nil { + fmt.Println("Error creating directory", err) + return false + } + + } + fd, err := os.OpenFile(fmt.Sprintf("%s/%scsv", 
outputFileDirectory, outputFilePrefix), os.O_RDWR|os.O_CREATE, 0666) + if err != nil { + fmt.Println("ERROR writing output CSV datafile", err) + return false + } + _, err = fd.WriteString(*csvData) + if err != nil { + fmt.Println("Error writing string", err) + return false + } + fd.Close() + return true +} + +func executeTests(c *kubernetes.Clientset) bool { + for i := 0; i < iterations; i++ { + cleanup(c) + if !createServices(c) { + fmt.Println("Failed to create services - aborting test") + return false + } + time.Sleep(3 * time.Second) + if !createRCs(c) { + fmt.Println("Failed to create replication controllers - aborting test") + return false + } + fmt.Println("Waiting for netperf pods to start up") + + var orchestratorPodName string + for len(orchestratorPodName) == 0 { + fmt.Println("Waiting for orchestrator pod creation") + time.Sleep(60 * time.Second) + var pods *api.PodList + var err error + if pods, err = c.CoreV1().Pods(testNamespace).List(everythingSelector); err != nil { + fmt.Println("Failed to fetch pods - waiting for pod creation", err) + continue + } + orchestratorPodName = getOrchestratorPodName(pods) + } + fmt.Println("Orchestrator Pod is", orchestratorPodName) + + // The pods orchestrate themselves, we just wait for the results file to show up in the orchestrator container + for { + // Monitor the orchestrator pod for the CSV results file + csvdata := getCsvResultsFromPod(c, orchestratorPodName) + if csvdata == nil { + fmt.Println("Scanned orchestrator pod filesystem - no results file found yet...waiting for orchestrator to write CSV file...") + time.Sleep(60 * time.Second) + continue + } + if processCsvData(csvdata) { + break + } + } + fmt.Printf("TEST RUN (Iteration %d) FINISHED - cleaning up services and pods\n", i) + } + return false +} + +func main() { + flag.Parse() + fmt.Println("Network Performance Test") + fmt.Println("Parameters :") + fmt.Println("Iterations : ", iterations) + fmt.Println("Host Networking : ", hostnetworking) + fmt.Println("Test Namespace : ", testNamespace) + fmt.Println("Docker image : ", netperfImage) + fmt.Println("------------------------------------------------------------") + + var c *kubernetes.Clientset + if c = setupClient(); c == nil { + fmt.Println("Failed to setup REST client to Kubernetes cluster") + return + } + if cleanupOnly { + cleanup(c) + return + } + nodes := getMinionNodes(c) + if nodes == nil { + return + } + if len(nodes.Items) < 2 { + fmt.Println("Insufficient number of nodes for test (need minimum 2 nodes)") + return + } + primaryNode = nodes.Items[0] + secondaryNode = nodes.Items[1] + fmt.Printf("Selected primary,secondary nodes = (%s, %s)\n", primaryNode.GetName(), secondaryNode.GetName()) + executeTests(c) + cleanup(c) +} diff --git a/stable/network-benchmark-testing/docker/nptests/nptest.go b/stable/network-benchmark-testing/docker/nptests/nptest.go new file mode 100644 index 0000000..448fbb4 --- /dev/null +++ b/stable/network-benchmark-testing/docker/nptests/nptest.go @@ -0,0 +1,707 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+See the License for the specific language governing permissions and +limitations under the License. +*/ + +/* + nptest.go + + Dual-mode program - runs as both the orchestrator and as the worker nodes depending on command line flags + The RPC API is contained wholly within this file. +*/ + +package main + +// Imports only base Golang packages +import ( + "bytes" + "flag" + "fmt" + "log" + "net" + "net/http" + "net/rpc" + "os" + "os/exec" + "regexp" + "strconv" + "sync" + "time" +) + +type point struct { + mss int + bandwidth string + index int +} + +var mode string +var port string +var host string +var worker string +var kubenode string +var podname string + +var workerStateMap map[string]*workerState + +var iperfTCPOutputRegexp *regexp.Regexp +var iperfSCTPOutputRegexp *regexp.Regexp +var iperfUDPOutputRegexp *regexp.Regexp +var netperfOutputRegexp *regexp.Regexp +var iperfCPUOutputRegexp *regexp.Regexp + +var dataPoints map[string][]point +var dataPointKeys []string +var datapointsFlushed bool + +var globalLock sync.Mutex + +const ( + workerMode = "worker" + orchestratorMode = "orchestrator" + iperf3Path = "/usr/local/bin/iperf3" + netperfPath = "/usr/local/bin/netperf" + netperfServerPath = "/usr/local/bin/netserver" + outputCaptureFile = "/tmp/output.txt" + mssMin = 96 + mssMax = 1460 + mssStepSize = 64 + parallelStreams = "8" + rpcServicePort = "5202" + localhostIPv4Address = "127.0.0.1" +) + +const ( + iperfTCPTest = iota + iperfUDPTest = iota + iperfSctpTest = iota + netperfTest = iota +) + +// NetPerfRPC service that exposes RegisterClient and ReceiveOutput for clients +type NetPerfRPC int + +// ClientRegistrationData stores a data about a single client +type ClientRegistrationData struct { + Host string + KubeNode string + Worker string + IP string +} + +// IperfClientWorkItem represents a single task for an Iperf client +type IperfClientWorkItem struct { + Host string + Port string + MSS int + Type int +} + +// IperfServerWorkItem represents a single task for an Iperf server +type IperfServerWorkItem struct { + ListenPort string + Timeout int +} + +// WorkItem represents a single task for a worker +type WorkItem struct { + IsClientItem bool + IsServerItem bool + IsIdle bool + ClientItem IperfClientWorkItem + ServerItem IperfServerWorkItem +} + +type workerState struct { + sentServerItem bool + idle bool + IP string + worker string +} + +// WorkerOutput stores the results from a single worker +type WorkerOutput struct { + Output string + Code int + Worker string + Type int +} + +type testcase struct { + SourceNode string + DestinationNode string + Label string + ClusterIP bool + Finished bool + MSS int + Type int +} + +var testcases []*testcase +var currentJobIndex int + +func init() { + flag.StringVar(&mode, "mode", "worker", "Mode for the daemon (worker | orchestrator)") + flag.StringVar(&port, "port", rpcServicePort, "Port to listen on (defaults to 5202)") + flag.StringVar(&host, "host", "", "IP address to bind to (defaults to 0.0.0.0)") + + workerStateMap = make(map[string]*workerState) + testcases = []*testcase{ + {SourceNode: "netperf-w1", DestinationNode: "netperf-w2", Label: "1 iperf TCP. Same VM using Pod IP", Type: iperfTCPTest, ClusterIP: false, MSS: mssMin}, + {SourceNode: "netperf-w1", DestinationNode: "netperf-w2", Label: "2 iperf TCP. Same VM using Virtual IP", Type: iperfTCPTest, ClusterIP: true, MSS: mssMin}, + {SourceNode: "netperf-w1", DestinationNode: "netperf-w3", Label: "3 iperf TCP. 
Remote VM using Pod IP", Type: iperfTCPTest, ClusterIP: false, MSS: mssMin}, + {SourceNode: "netperf-w3", DestinationNode: "netperf-w2", Label: "4 iperf TCP. Remote VM using Virtual IP", Type: iperfTCPTest, ClusterIP: true, MSS: mssMin}, + {SourceNode: "netperf-w2", DestinationNode: "netperf-w2", Label: "5 iperf TCP. Hairpin Pod to own Virtual IP", Type: iperfTCPTest, ClusterIP: true, MSS: mssMin}, + {SourceNode: "netperf-w1", DestinationNode: "netperf-w2", Label: "6 iperf SCTP. Same VM using Pod IP", Type: iperfSctpTest, ClusterIP: false, MSS: mssMin}, + {SourceNode: "netperf-w1", DestinationNode: "netperf-w2", Label: "7 iperf SCTP. Same VM using Virtual IP", Type: iperfSctpTest, ClusterIP: true, MSS: mssMin}, + {SourceNode: "netperf-w1", DestinationNode: "netperf-w3", Label: "8 iperf SCTP. Remote VM using Pod IP", Type: iperfSctpTest, ClusterIP: false, MSS: mssMin}, + {SourceNode: "netperf-w3", DestinationNode: "netperf-w2", Label: "9 iperf SCTP. Remote VM using Virtual IP", Type: iperfSctpTest, ClusterIP: true, MSS: mssMin}, + {SourceNode: "netperf-w2", DestinationNode: "netperf-w2", Label: "10 iperf SCTP. Hairpin Pod to own Virtual IP", Type: iperfSctpTest, ClusterIP: true, MSS: mssMin}, + {SourceNode: "netperf-w1", DestinationNode: "netperf-w2", Label: "11 iperf UDP. Same VM using Pod IP", Type: iperfUDPTest, ClusterIP: false, MSS: mssMax}, + {SourceNode: "netperf-w1", DestinationNode: "netperf-w2", Label: "12 iperf UDP. Same VM using Virtual IP", Type: iperfUDPTest, ClusterIP: true, MSS: mssMax}, + {SourceNode: "netperf-w1", DestinationNode: "netperf-w3", Label: "13 iperf UDP. Remote VM using Pod IP", Type: iperfUDPTest, ClusterIP: false, MSS: mssMax}, + {SourceNode: "netperf-w3", DestinationNode: "netperf-w2", Label: "14 iperf UDP. Remote VM using Virtual IP", Type: iperfUDPTest, ClusterIP: true, MSS: mssMax}, + {SourceNode: "netperf-w1", DestinationNode: "netperf-w2", Label: "15 netperf. Same VM using Pod IP", Type: netperfTest, ClusterIP: false}, + {SourceNode: "netperf-w1", DestinationNode: "netperf-w2", Label: "16 netperf. Same VM using Virtual IP", Type: netperfTest, ClusterIP: true}, + {SourceNode: "netperf-w1", DestinationNode: "netperf-w3", Label: "17 netperf. Remote VM using Pod IP", Type: netperfTest, ClusterIP: false}, + {SourceNode: "netperf-w3", DestinationNode: "netperf-w2", Label: "18 netperf. 
Remote VM using Virtual IP", Type: netperfTest, ClusterIP: true}, + } + + currentJobIndex = 0 + + // Regexes to parse the Mbits/sec out of iperf TCP, SCTP, UDP and netperf output + iperfTCPOutputRegexp = regexp.MustCompile("SUM.*\\s+(\\d+)\\sMbits/sec\\s+receiver") + iperfSCTPOutputRegexp = regexp.MustCompile("SUM.*\\s+(\\d+)\\sMbits/sec\\s+receiver") + iperfUDPOutputRegexp = regexp.MustCompile("\\s+(\\S+)\\sMbits/sec\\s+\\S+\\s+ms\\s+") + netperfOutputRegexp = regexp.MustCompile("\\s+\\d+\\s+\\d+\\s+\\d+\\s+\\S+\\s+(\\S+)\\s+") + iperfCPUOutputRegexp = regexp.MustCompile(`local/sender\s(\d+\.\d+)%\s\((\d+\.\d+)%\w/(\d+\.\d+)%\w\),\sremote/receiver\s(\d+\.\d+)%\s\((\d+\.\d+)%\w/(\d+\.\d+)%\w\)`) + + dataPoints = make(map[string][]point) +} + +func initializeOutputFiles() { + fd, err := os.OpenFile(outputCaptureFile, os.O_RDWR|os.O_CREATE, 0666) + if err != nil { + fmt.Println("Failed to open output capture file", err) + os.Exit(2) + } + fd.Close() +} + +func main() { + initializeOutputFiles() + flag.Parse() + if !validateParams() { + fmt.Println("Failed to parse cmdline args - fatal error - bailing out") + os.Exit(1) + + } + grabEnv() + fmt.Println("Running as", mode, "...") + if mode == orchestratorMode { + orchestrate() + } else { + startWork() + } + fmt.Println("Terminating npd") +} + +func grabEnv() { + worker = os.Getenv("worker") + kubenode = os.Getenv("kubenode") + podname = os.Getenv("HOSTNAME") +} + +func validateParams() (rv bool) { + rv = true + if mode != workerMode && mode != orchestratorMode { + fmt.Println("Invalid mode", mode) + return false + } + + if len(port) == 0 { + fmt.Println("Invalid port", port) + return false + } + + if (len(host)) == 0 { + if mode == orchestratorMode { + host = os.Getenv("NETPERF_ORCH_SERVICE_HOST") + } else { + host = os.Getenv("NETPERF_ORCH_SERVICE_HOST") + } + } + return +} + +func allWorkersIdle() bool { + for _, v := range workerStateMap { + if !v.idle { + return false + } + } + return true +} + +func getWorkerPodIP(worker string) string { + return workerStateMap[worker].IP +} + +func allocateWorkToClient(workerS *workerState, reply *WorkItem) { + if !allWorkersIdle() { + reply.IsIdle = true + return + } + + // System is all idle - pick up next work item to allocate to client + for n, v := range testcases { + if v.Finished { + continue + } + if v.SourceNode != workerS.worker { + reply.IsIdle = true + return + } + if _, ok := workerStateMap[v.DestinationNode]; !ok { + reply.IsIdle = true + return + } + fmt.Printf("Requesting jobrun '%s' from %s to %s for MSS %d\n", v.Label, v.SourceNode, v.DestinationNode, v.MSS) + reply.ClientItem.Type = v.Type + reply.IsClientItem = true + workerS.idle = false + currentJobIndex = n + + if !v.ClusterIP { + reply.ClientItem.Host = getWorkerPodIP(v.DestinationNode) + } else { + reply.ClientItem.Host = os.Getenv("NETPERF_W2_SERVICE_HOST") + } + + switch { + case v.Type == iperfTCPTest || v.Type == iperfUDPTest || v.Type == iperfSctpTest: + reply.ClientItem.Port = "5201" + reply.ClientItem.MSS = v.MSS + + v.MSS = v.MSS + mssStepSize + if v.MSS > mssMax { + v.Finished = true + } + return + + case v.Type == netperfTest: + reply.ClientItem.Port = "12865" + return + } + } + + for _, v := range testcases { + if !v.Finished { + return + } + } + + if !datapointsFlushed { + fmt.Println("ALL TESTCASES AND MSS RANGES COMPLETE - GENERATING CSV OUTPUT") + flushDataPointsToCsv() + datapointsFlushed = true + } + + reply.IsIdle = true +} + +// RegisterClient registers a single and assign a work item to it +func (t *NetPerfRPC) 
RegisterClient(data *ClientRegistrationData, reply *WorkItem) error { + globalLock.Lock() + defer globalLock.Unlock() + + state, ok := workerStateMap[data.Worker] + + if !ok { + // For new clients, trigger an iperf server start immediately + state = &workerState{sentServerItem: true, idle: true, IP: data.IP, worker: data.Worker} + workerStateMap[data.Worker] = state + reply.IsServerItem = true + reply.ServerItem.ListenPort = "5201" + reply.ServerItem.Timeout = 3600 + return nil + } + + // Worker defaults to idle unless the allocateWork routine below assigns an item + state.idle = true + + // Give the worker a new work item or let it idle loop another 5 seconds + allocateWorkToClient(state, reply) + return nil +} + +func writeOutputFile(filename, data string) { + fd, err := os.OpenFile(filename, os.O_APPEND|os.O_WRONLY, 0666) + if err != nil { + fmt.Println("Failed to append to existing file", filename, err) + return + } + defer fd.Close() + + if _, err = fd.WriteString(data); err != nil { + fmt.Println("Failed to append to existing file", filename, err) + } +} + +func registerDataPoint(label string, mss int, value string, index int) { + if sl, ok := dataPoints[label]; !ok { + dataPoints[label] = []point{{mss: mss, bandwidth: value, index: index}} + dataPointKeys = append(dataPointKeys, label) + } else { + dataPoints[label] = append(sl, point{mss: mss, bandwidth: value, index: index}) + } +} + +func flushDataPointsToCsv() { + var buffer string + + // Write the MSS points for the X-axis before dumping all the testcase datapoints + for _, points := range dataPoints { + if len(points) == 1 { + continue + } + buffer = fmt.Sprintf("%-45s, Maximum,", "MSS") + for _, p := range points { + buffer = buffer + fmt.Sprintf(" %d,", p.mss) + } + break + } + fmt.Println(buffer) + + for _, label := range dataPointKeys { + buffer = fmt.Sprintf("%-45s,", label) + points := dataPoints[label] + var max float64 + for _, p := range points { + fv, _ := strconv.ParseFloat(p.bandwidth, 64) + if fv > max { + max = fv + } + } + buffer = buffer + fmt.Sprintf("%f,", max) + for _, p := range points { + buffer = buffer + fmt.Sprintf("%s,", p.bandwidth) + } + fmt.Println(buffer) + } + fmt.Println("END CSV DATA") +} + +func parseIperfTCPBandwidth(output string) string { + // Parses the output of iperf3 and grabs the group Mbits/sec from the output + match := iperfTCPOutputRegexp.FindStringSubmatch(output) + if match != nil && len(match) > 1 { + return match[1] + } + return "0" +} + +func parseIperfSctpBandwidth(output string) string { + // Parses the output of iperf3 and grabs the group Mbits/sec from the output + match := iperfSCTPOutputRegexp.FindStringSubmatch(output) + if match != nil && len(match) > 1 { + return match[1] + } + return "0" +} + +func parseIperfUDPBandwidth(output string) string { + // Parses the output of iperf3 (UDP mode) and grabs the Mbits/sec from the output + match := iperfUDPOutputRegexp.FindStringSubmatch(output) + if match != nil && len(match) > 1 { + return match[1] + } + return "0" +} + +func parseIperfCPUUsage(output string) (string, string) { + // Parses the output of iperf and grabs the CPU usage on sender and receiver side from the output + match := iperfCPUOutputRegexp.FindStringSubmatch(output) + if match != nil && len(match) > 1 { + return match[1], match[4] + } + return "0", "0" +} + +func parseNetperfBandwidth(output string) string { + // Parses the output of netperf and grabs the Bbits/sec from the output + match := netperfOutputRegexp.FindStringSubmatch(output) + if match != nil && 
len(match) > 1 { + return match[1] + } + return "0" +} + +// ReceiveOutput processes a data received from a single client +func (t *NetPerfRPC) ReceiveOutput(data *WorkerOutput, reply *int) error { + globalLock.Lock() + defer globalLock.Unlock() + + testcase := testcases[currentJobIndex] + + var outputLog string + var bw string + var cpuSender string + var cpuReceiver string + + switch data.Type { + case iperfTCPTest: + mss := testcases[currentJobIndex].MSS - mssStepSize + outputLog = outputLog + fmt.Sprintln("Received TCP output from worker", data.Worker, "for test", testcase.Label, + "from", testcase.SourceNode, "to", testcase.DestinationNode, "MSS:", mss) + data.Output + writeOutputFile(outputCaptureFile, outputLog) + bw = parseIperfTCPBandwidth(data.Output) + cpuSender, cpuReceiver = parseIperfCPUUsage(data.Output) + registerDataPoint(testcase.Label, mss, bw, currentJobIndex) + + case iperfSctpTest: + mss := testcases[currentJobIndex].MSS - mssStepSize + outputLog = outputLog + fmt.Sprintln("Received SCTP output from worker", data.Worker, "for test", testcase.Label, + "from", testcase.SourceNode, "to", testcase.DestinationNode, "MSS:", mss) + data.Output + writeOutputFile(outputCaptureFile, outputLog) + bw = parseIperfSctpBandwidth(data.Output) + cpuSender, cpuReceiver = parseIperfCPUUsage(data.Output) + registerDataPoint(testcase.Label, mss, bw, currentJobIndex) + + case iperfUDPTest: + mss := testcases[currentJobIndex].MSS - mssStepSize + outputLog = outputLog + fmt.Sprintln("Received UDP output from worker", data.Worker, "for test", testcase.Label, + "from", testcase.SourceNode, "to", testcase.DestinationNode, "MSS:", mss) + data.Output + writeOutputFile(outputCaptureFile, outputLog) + bw = parseIperfUDPBandwidth(data.Output) + registerDataPoint(testcase.Label, mss, bw, currentJobIndex) + + case netperfTest: + outputLog = outputLog + fmt.Sprintln("Received netperf output from worker", data.Worker, "for test", testcase.Label, + "from", testcase.SourceNode, "to", testcase.DestinationNode) + data.Output + writeOutputFile(outputCaptureFile, outputLog) + bw = parseNetperfBandwidth(data.Output) + registerDataPoint(testcase.Label, 0, bw, currentJobIndex) + testcases[currentJobIndex].Finished = true + + } + + switch data.Type { + case iperfTCPTest, iperfSctpTest: + fmt.Println("Jobdone from worker", data.Worker, "Bandwidth was", bw, "Mbits/sec. CPU usage sender was", cpuSender, "%. 
CPU usage receiver was", cpuReceiver, "%.") + default: + fmt.Println("Jobdone from worker", data.Worker, "Bandwidth was", bw, "Mbits/sec") + } + + return nil +} + +func serveRPCRequests(port string) { + baseObject := new(NetPerfRPC) + err := rpc.Register(baseObject) + if err != nil { + log.Fatal("failed to register rpc", err) + } + rpc.HandleHTTP() + listener, e := net.Listen("tcp", ":"+port) + if e != nil { + log.Fatal("listen error:", e) + } + err = http.Serve(listener, nil) + if err != nil { + log.Fatal("failed start server", err) + } +} + +// Blocking RPC server start - only runs on the orchestrator +func orchestrate() { + serveRPCRequests(rpcServicePort) +} + +// Walk the list of interfaces and find the first interface that has a valid IP +// Inside a container, there should be only one IP-enabled interface +func getMyIP() string { + ifaces, err := net.Interfaces() + if err != nil { + return localhostIPv4Address + } + + for _, iface := range ifaces { + if iface.Flags&net.FlagLoopback == 0 { + addrs, _ := iface.Addrs() + for _, addr := range addrs { + var ip net.IP + switch v := addr.(type) { + case *net.IPNet: + ip = v.IP + case *net.IPAddr: + ip = v.IP + } + return ip.String() + } + } + } + return "127.0.0.1" +} + +func handleClientWorkItem(client *rpc.Client, workItem *WorkItem) { + fmt.Println("Orchestrator requests worker run item Type:", workItem.ClientItem.Type) + switch { + case workItem.ClientItem.Type == iperfTCPTest || workItem.ClientItem.Type == iperfUDPTest || workItem.ClientItem.Type == iperfSctpTest: + outputString := iperfClient(workItem.ClientItem.Host, workItem.ClientItem.Port, workItem.ClientItem.MSS, workItem.ClientItem.Type) + var reply int + err := client.Call("NetPerfRPC.ReceiveOutput", WorkerOutput{Output: outputString, Worker: worker, Type: workItem.ClientItem.Type}, &reply) + if err != nil { + log.Fatal("failed to call client", err) + } + case workItem.ClientItem.Type == netperfTest: + outputString := netperfClient(workItem.ClientItem.Host, workItem.ClientItem.Port, workItem.ClientItem.Type) + var reply int + err := client.Call("NetPerfRPC.ReceiveOutput", WorkerOutput{Output: outputString, Worker: worker, Type: workItem.ClientItem.Type}, &reply) + if err != nil { + log.Fatal("failed to call client", err) + } + } + // Client COOLDOWN period before asking for next work item to replenish burst allowance policers etc + time.Sleep(10 * time.Second) +} + +// isIPv6: Determines if an address is an IPv6 address +func isIPv6(address string) bool { + x := net.ParseIP(address) + return x != nil && x.To4() == nil && x.To16() != nil +} + +// startWork : Entry point to the worker infinite loop +func startWork() { + for true { + var timeout time.Duration + var client *rpc.Client + var err error + + address := host + if isIPv6(address) { + address = "[" + address + "]" + } + + timeout = 5 + for true { + fmt.Println("Attempting to connect to orchestrator at", host) + client, err = rpc.DialHTTP("tcp", address+":"+port) + if err == nil { + break + } + fmt.Println("RPC connection to ", host, " failed:", err) + time.Sleep(timeout * time.Second) + } + + for true { + clientData := ClientRegistrationData{Host: podname, KubeNode: kubenode, Worker: worker, IP: getMyIP()} + var workItem WorkItem + + if err := client.Call("NetPerfRPC.RegisterClient", clientData, &workItem); err != nil { + // RPC server has probably gone away - attempt to reconnect + fmt.Println("Error attempting RPC call", err) + break + } + + switch { + case workItem.IsIdle == true: + time.Sleep(5 * time.Second) + 
continue + + case workItem.IsServerItem == true: + fmt.Println("Orchestrator requests worker run iperf and netperf servers") + go iperfServer() + go netperfServer() + time.Sleep(1 * time.Second) + + case workItem.IsClientItem == true: + handleClientWorkItem(client, &workItem) + } + } + } +} + +// Invoke and indefinitely run an iperf server +func iperfServer() { + output, success := cmdExec(iperf3Path, []string{iperf3Path, "-s", host, "-J", "-i", "60"}, 15) + if success { + fmt.Println(output) + } +} + +// Invoke and indefinitely run netperf server +func netperfServer() { + output, success := cmdExec(netperfServerPath, []string{netperfServerPath, "-D"}, 15) + if success { + fmt.Println(output) + } +} + +// Invoke and run an iperf client and return the output if successful. +func iperfClient(serverHost, serverPort string, mss int, workItemType int) (rv string) { + switch { + case workItemType == iperfTCPTest: + output, success := cmdExec(iperf3Path, []string{iperf3Path, "-c", serverHost, "-V", "-N", "-i", "30", "-t", "10", "-f", "m", "-w", "512M", "-Z", "-P", parallelStreams, "-M", strconv.Itoa(mss)}, 15) + if success { + rv = output + } + + case workItemType == iperfSctpTest: + output, success := cmdExec(iperf3Path, []string{iperf3Path, "-c", serverHost, "-V", "-N", "-i", "30", "-t", "10", "-f", "m", "-w", "512M", "-Z", "-P", parallelStreams, "-M", strconv.Itoa(mss), "--sctp"}, 15) + if success { + rv = output + } + + case workItemType == iperfUDPTest: + output, success := cmdExec(iperf3Path, []string{iperf3Path, "-c", serverHost, "-i", "30", "-t", "10", "-f", "m", "-b", "0", "-u"}, 15) + if success { + rv = output + } + } + return +} + +// Invoke and run a netperf client and return the output if successful. +func netperfClient(serverHost, serverPort string, workItemType int) (rv string) { + output, success := cmdExec(netperfPath, []string{netperfPath, "-H", serverHost}, 15) + if success { + fmt.Println(output) + rv = output + } else { + fmt.Println("Error running netperf client", output) + } + + return +} + +func cmdExec(command string, args []string, timeout int32) (rv string, rc bool) { + cmd := exec.Cmd{Path: command, Args: args} + + var stdoutput bytes.Buffer + var stderror bytes.Buffer + cmd.Stdout = &stdoutput + cmd.Stderr = &stderror + if err := cmd.Run(); err != nil { + outputstr := stdoutput.String() + errstr := stderror.String() + fmt.Println("Failed to run", outputstr, "error:", errstr, err) + return + } + + rv = stdoutput.String() + rc = true + return +}
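Beyond the `--iterations` flag shown in the README, the launcher in `docker/launch.go` also defines `--image`, `--tag`, `--namespace`, `--hostnetworking`, `--kubeConfig`, and `--cleanup` flags. A usage sketch based on those flag definitions (the flag names come from the code; the values below are only illustrative):

```shell
# run three iterations against a custom image, suffixing the CSV results directory with "myrun"
go run ./launch.go --iterations 3 --image localhost:5000/netperf:latest --tag myrun

# remove orphaned netperf pods, services, and replication controllers left over from an earlier run
go run ./launch.go --cleanup
```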
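`launch.go` discovers the results by polling the orchestrator pod's logs for the `GENERATING CSV OUTPUT` and `END CSV DATA` markers and then writes the CSV to a `results_<namespace>-<tag>` directory. If a run is interrupted before that happens, the same data can be extracted by hand; a sketch assuming the default `netperf` namespace and the `app=netperf-orch` label applied by the launcher:

```shell
# print the CSV block from the orchestrator pod's logs
kubectl -n netperf logs $(kubectl -n netperf get pods -l app=netperf-orch -o name) \
  | sed -n '/GENERATING CSV OUTPUT/,/END CSV DATA/p'
```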