diff --git a/bytescount_client/middleware.go b/bytescount_client/middleware.go new file mode 100644 index 000000000..49c77105b --- /dev/null +++ b/bytescount_client/middleware.go @@ -0,0 +1,75 @@ +package bytescount_client + +import ( + "fmt" + "github.com/mysterium/node/openvpn" + "github.com/mysterium/node/server" + "github.com/mysterium/node/server/dto" + "net" + "regexp" + "strconv" + "time" +) + +type middleware struct { + mysteriumClient server.Client + interval time.Duration + sessionId string + + connection net.Conn +} + +func NewMiddleware(mysteriumClient server.Client, sessionId string, interval time.Duration) openvpn.ManagementMiddleware { + return &middleware{ + mysteriumClient: mysteriumClient, + interval: interval, + sessionId: sessionId, + + connection: nil, + } +} + +func (middleware *middleware) Start(connection net.Conn) error { + middleware.connection = connection + + command := fmt.Sprintf("bytecount %d\n", int(middleware.interval.Seconds())) + _, err := middleware.connection.Write([]byte(command)) + + return err +} + +func (middleware *middleware) Stop() error { + _, err := middleware.connection.Write([]byte("bytecount 0\n")) + return err +} + +func (middleware *middleware) ConsumeLine(line string) (consumed bool, err error) { + rule, err := regexp.Compile("^>BYTECOUNT:(.*),(.*)$") + if err != nil { + return + } + + match := rule.FindStringSubmatch(line) + consumed = len(match) > 0 + if !consumed { + return + } + + bytesIn, err := strconv.Atoi(match[1]) + if err != nil { + return + } + + bytesOut, err := strconv.Atoi(match[2]) + if err != nil { + return + } + + err = middleware.mysteriumClient.SessionSendStats(middleware.sessionId, dto.SessionStats{ + Id: middleware.sessionId, + BytesSent: bytesOut, + BytesReceived: bytesIn, + }) + + return +} diff --git a/bytescount_client/middleware_test.go b/bytescount_client/middleware_test.go new file mode 100644 index 000000000..abfa902f9 --- /dev/null +++ b/bytescount_client/middleware_test.go @@ -0,0 +1,43 @@ +package bytescount_client + +import ( + "errors" + "github.com/mysterium/node/server" + "github.com/stretchr/testify/assert" + "testing" + "time" +) + +func Test_Factory(t *testing.T) { + middleware := NewMiddleware(server.NewClientFake(), "session-test", 1*time.Minute) + assert.NotNil(t, middleware) +} + +func Test_ConsumeLine(t *testing.T) { + var tests = []struct { + line string + expectedConsumed bool + expectedError error + }{ + {">BYTECOUNT:3018,3264", true, nil}, + {">BYTECOUNT:0,3264", true, nil}, + {">BYTECOUNT:3018,", true, errors.New(`strconv.ParseInt: parsing "": invalid syntax`)}, + {">BYTECOUNT:,", true, errors.New(`strconv.ParseInt: parsing "": invalid syntax`)}, + {"OTHER", false, nil}, + {"BYTECOUNT", false, nil}, + {"BYTECOUNT:", false, nil}, + {"BYTECOUNT:3018,3264", false, nil}, + {">BYTECOUNTT:3018,3264", false, nil}, + } + + middleware := NewMiddleware(server.NewClientFake(), "session-test", 1*time.Minute) + for _, test := range tests { + consumed, err := middleware.ConsumeLine(test.line) + if test.expectedError != nil { + assert.Error(t, test.expectedError, err.Error(), test.line) + } else { + assert.NoError(t, err, test.line) + } + assert.Equal(t, test.expectedConsumed, consumed, test.line) + } +} diff --git a/cmd/mysterium_client/command_run/command.go b/cmd/mysterium_client/command_run/command.go index 87a4efb99..64505deaf 100644 --- a/cmd/mysterium_client/command_run/command.go +++ b/cmd/mysterium_client/command_run/command.go @@ -1,9 +1,11 @@ package command_run import ( + "github.com/mysterium/node/bytescount_client" "github.com/mysterium/node/openvpn" "github.com/mysterium/node/server" "io" + "time" ) type commandRun struct { @@ -11,7 +13,7 @@ type commandRun struct { outputError io.Writer mysteriumClient server.Client - vpnClient *openvpn.Client + vpnClient *openvpn.Client } func (cmd *commandRun) Run(options CommandOptions) error { @@ -28,12 +30,15 @@ func (cmd *commandRun) Run(options CommandOptions) error { return err } - cmd.vpnClient = openvpn.NewClient(vpnConfig, options.DirectoryRuntime) + cmd.vpnClient = openvpn.NewClient( + vpnConfig, + options.DirectoryRuntime, + bytescount_client.NewMiddleware(cmd.mysteriumClient, vpnSession.Id, 1*time.Minute), + ) if err := cmd.vpnClient.Start(); err != nil { return err } - return nil } @@ -43,4 +48,4 @@ func (cmd *commandRun) Wait() error { func (cmd *commandRun) Kill() { cmd.vpnClient.Stop() -} \ No newline at end of file +} diff --git a/openvpn/client.go b/openvpn/client.go index 1f4399a54..c1b65550d 100644 --- a/openvpn/client.go +++ b/openvpn/client.go @@ -2,14 +2,14 @@ package openvpn import "sync" -func NewClient(config *ClientConfig, directoryRuntime string) *Client { +func NewClient(config *ClientConfig, directoryRuntime string, middlewares ...ManagementMiddleware) *Client { // Add the management interface socketAddress to the config socketAddress := tempFilename(directoryRuntime, "openvpn-management-", ".sock") config.SetManagementSocket(socketAddress) return &Client{ config: config, - management: NewManagement(socketAddress, "[client-managemnet] "), + management: NewManagement(socketAddress, "[client-management] ", middlewares...), process: NewProcess("[client-openvpn] "), } } diff --git a/openvpn/config.go b/openvpn/config.go index 77e658a8d..7abd4e176 100644 --- a/openvpn/config.go +++ b/openvpn/config.go @@ -36,8 +36,7 @@ func (c *Config) setFlag(name string) { func (c *Config) SetManagementSocket(socketAddress string) { c.setParam("management", socketAddress+" unix") - c.setFlag("management-signal") - c.setFlag("management-up-down") + c.setFlag("management-client") } func (c *Config) SetPort(port int) { diff --git a/openvpn/management.go b/openvpn/management.go index ffd22c10b..69ad33a6e 100644 --- a/openvpn/management.go +++ b/openvpn/management.go @@ -2,47 +2,42 @@ package openvpn import ( "bufio" - "bytes" "net" "net/textproto" - "regexp" - "strconv" "sync" "time" log "github.com/cihub/seelog" ) +// https://openvpn.net/index.php/open-source/documentation/miscellaneous/79-management-interface.html type Management struct { socketAddress string - logPrefix string - - ManagementRead chan string - ManagementWrite chan string + logPrefix string - events chan []string + lineReceived chan string + middlewares []ManagementMiddleware - currentClient string - clientEnv map[string]string + listenerShutdownStarted chan bool + listenerShutdownWaiter sync.WaitGroup +} - buffer []byte - waitGroup sync.WaitGroup - shutdown chan bool +type ManagementMiddleware interface { + Start(connection net.Conn) error + Stop() error + ConsumeLine(line string) (consumed bool, err error) } -func NewManagement(socketAddress, logPrefix string) *Management { +func NewManagement(socketAddress, logPrefix string, middlewares ...ManagementMiddleware) *Management { return &Management{ socketAddress: socketAddress, - logPrefix: logPrefix, - - ManagementRead: make(chan string), - ManagementWrite: make(chan string), + logPrefix: logPrefix, - events: make(chan []string), + lineReceived: make(chan string), + middlewares: middlewares, - clientEnv: make(map[string]string, 0), - buffer: make([]byte, 0), - shutdown: make(chan bool), + listenerShutdownStarted: make(chan bool), + listenerShutdownWaiter: sync.WaitGroup{}, } } @@ -56,6 +51,7 @@ func (management *Management) Start() error { } go management.waitForShutdown(listener) + go management.deliverLines() go management.waitForConnections(listener) return nil @@ -63,135 +59,83 @@ func (management *Management) Start() error { func (management *Management) Stop() { log.Info(management.logPrefix, "Shutdown") - close(management.shutdown) + close(management.listenerShutdownStarted) - management.waitGroup.Wait() + management.listenerShutdownWaiter.Wait() log.Info(management.logPrefix, "Shutdown finished") } func (management *Management) waitForShutdown(listener net.Listener) { - <-management.shutdown + <-management.listenerShutdownStarted + + for _, middleware := range management.middlewares { + middleware.Stop() + } + listener.Close() } func (management *Management) waitForConnections(listener net.Listener) { - management.waitGroup.Add(1) - defer management.waitGroup.Done() + management.listenerShutdownWaiter.Add(1) + defer management.listenerShutdownWaiter.Done() for { connection, err := listener.Accept() if err != nil { select { - case <-management.shutdown: + case <-management.listenerShutdownStarted: log.Info(management.logPrefix, "Connection closed") default: - log.Critical(management.logPrefix, "Connection accept error:", err) + log.Critical(management.logPrefix, "Connection accept error: ", err) } return } - log.Info(management.logPrefix, "Connection accepted") - go management.server(connection) + go management.serveNewConnection(connection) } } -func (management *Management) server(connection net.Conn) { - log.Info(management.logPrefix, "Server started") - - /* - go func() { - //c.Write([]byte("status\n")) - for { - <-time.After(time.Second) - _, err := c.Write([]byte("push \"echo hej\"\n")) - if err != nil { - log.Error(management.logPrefix, "Failed management write:", err) - return - } - log.Info(management.logPrefix, "Push echo hej") - } - }() - */ +func (management *Management) serveNewConnection(connection net.Conn) { + log.Info(management.logPrefix, "New connection started") - go func() { - for { - rows := <-management.events - log.Info(management.logPrefix, "Event received:", rows) - } - }() + for _, middleware := range management.middlewares { + middleware.Start(connection) + } reader := textproto.NewReader(bufio.NewReader(connection)) for { line, err := reader.ReadLine() if err != nil { + log.Warn(management.logPrefix, "Connection failed to read. ", err) return } - //log.Info(management.logPrefix, "Line received:", line) + log.Debug(management.logPrefix, "Line received: ", line) - libeBytes := []byte(line) - management.parse(libeBytes, false) + // Try to deliver the message + select { + case management.lineReceived <- line: + case <-time.After(time.Second): + log.Error(management.logPrefix, "Failed to transport line: ", line) + } } } -// https://openvpn.net/index.php/open-source/documentation/miscellaneous/79-management-interface.html -func (management *Management) parse(line []byte, retry bool) { - //log.Error(management.logPrefix, "Parsing:", string(line)) - - eventsConfig := map[string]string{ - // -- Log message output as controlled by the "log" command. - "log": ">LOG:([^\r\n]*)$", - } - -mainLoop: - for eventName, eventRegex := range eventsConfig { - reg, _ := regexp.Compile(eventRegex) - match := reg.FindAllSubmatchIndex(line, -1) - if len(match) == 0 { - continue - } - - for _, row := range match { - // Extract all strings of the current match - strings := []string{eventName} - for index := range row { - if index%2 > 0 { // Skipp all odd indexes - continue - } - - strings = append(strings, string(line[row[index]:row[index+1]])) - } - - // Try to deliver the message - select { - case management.events <- strings: - case <-time.After(time.Second): - log.Errorf( - "%sFailed to transport message (%client): %s |%s|", - management.logPrefix, - management.events, - eventName, - row, - strings, - ) - } - - if row[0] > 0 { - log.Warn("Trowing away message: ", strconv.Quote(string(line[:row[0]]))) +func (management *Management) deliverLines() { + for { + line := <-management.lineReceived + log.Debug(management.logPrefix, "Line delivering: ", line) + + lineConsumed := false + for _, middleware := range management.middlewares { + consumed, err := middleware.ConsumeLine(line) + if err != nil { + log.Error(management.logPrefix, "Failed to deliver line: ", line, ". ", err) } - // Just save the rest of the message - line = bytes.Trim(line[row[1]:], "\x00") - - continue mainLoop + lineConsumed = lineConsumed || consumed + } + if !lineConsumed { + log.Warn(management.logPrefix, "Line not delivered: ", line) } } - - if len(line) > 0 && !retry { - //log.Warn("Could not find message, adding to buffer: ", string(line)) - management.buffer = append(management.buffer, line...) - management.buffer = append(management.buffer, '\n') - management.parse(management.buffer, true) - } else if len(line) > 0 { - management.buffer = line - } -} \ No newline at end of file +} diff --git a/openvpn/server.go b/openvpn/server.go index d132bd91a..3c9a33ed1 100644 --- a/openvpn/server.go +++ b/openvpn/server.go @@ -2,14 +2,14 @@ package openvpn import "sync" -func NewServer(config *ServerConfig, directoryRuntime string) *Server { +func NewServer(config *ServerConfig, directoryRuntime string, middlewares ...ManagementMiddleware) *Server { // Add the management interface socketAddress to the config socketAddress := tempFilename(directoryRuntime, "openvpn-management-", ".sock") config.SetManagementSocket(socketAddress) return &Server{ config: config, - management: NewManagement(socketAddress, "[server-managemnet] "), + management: NewManagement(socketAddress, "[server-management] ", middlewares...), process: NewProcess("[server-openvpn] "), } } diff --git a/server/client_fake.go b/server/client_fake.go index 447f40cf1..529613a88 100644 --- a/server/client_fake.go +++ b/server/client_fake.go @@ -42,3 +42,9 @@ func (client *clientFake) SessionCreate(nodeKey string) (session dto.Session, er err = fmt.Errorf("Fake node not found: %s", nodeKey) return } + +func (client *clientFake) SessionSendStats(sessionId string, sessionStats dto.SessionStats) (err error) { + log.Info(MYSTERIUM_API_LOG_PREFIX, "Session stats sent: ", sessionId) + + return nil +} diff --git a/server/client_rest.go b/server/client_rest.go index dfab53efd..24f84f183 100644 --- a/server/client_rest.go +++ b/server/client_rest.go @@ -16,7 +16,7 @@ const MYSTERIUM_API_URL = "https://mvp.mysterium.network:5000/v1" const MYSTERIUM_API_CLIENT = "goclient-v0.1" const MYSTERIUM_API_LOG_PREFIX = "[Mysterium.api] " -func NewClient() *clientRest { +func NewClient() Client { httpClient := http.Client{ Transport: &http.Transport{}, } @@ -29,19 +29,6 @@ type clientRest struct { httpClient http.Client } -func (client *clientRest) SessionCreate(nodeKey string) (session dto.Session, err error) { - response, err := client.doRequest("POST", "client_create_session", dto.SessionStartRequest{ - NodeKey: nodeKey, - }) - if err == nil { - defer response.Body.Close() - err = parseResponseJson(response, &session) - log.Info(MYSTERIUM_API_LOG_PREFIX, "Created new session: ", session.Id) - } - - return -} - func (client *clientRest) NodeRegister(nodeKey, connectionConfig string) (err error) { response, err := client.doRequest("POST", "node_register", dto.NodeRegisterRequest{ NodeKey: nodeKey, @@ -68,6 +55,29 @@ func (client *clientRest) NodeSendStats(nodeKey string, sessionList []dto.Sessio return nil } +func (client *clientRest) SessionCreate(nodeKey string) (session dto.Session, err error) { + response, err := client.doRequest("POST", "client_create_session", dto.SessionStartRequest{ + NodeKey: nodeKey, + }) + if err == nil { + defer response.Body.Close() + err = parseResponseJson(response, &session) + log.Info(MYSTERIUM_API_LOG_PREFIX, "Session created: ", session.Id) + } + + return +} + +func (client *clientRest) SessionSendStats(sessionId string, sessionStats dto.SessionStats) (err error) { + response, err := client.doRequest("POST", "client_send_stats", sessionStats) + if err == nil { + defer response.Body.Close() + log.Info(MYSTERIUM_API_LOG_PREFIX, "Session stats sent: ", sessionId) + } + + return nil +} + func (client *clientRest) doRequest(method string, path string, payload interface{}) (*http.Response, error) { payloadJson, err := json.Marshal(payload) if err != nil { diff --git a/server/interface.go b/server/interface.go index 05c720442..29de98fa8 100644 --- a/server/interface.go +++ b/server/interface.go @@ -6,4 +6,5 @@ type Client interface { NodeRegister(nodeKey, connectionConfig string) (err error) NodeSendStats(nodeKey string, sessionStats []dto.SessionStats) (err error) SessionCreate(nodeKey string) (session dto.Session, err error) + SessionSendStats(sessionId string, sessionStats dto.SessionStats) (err error) }