Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Node traffic monitoring command #25

Merged
merged 21 commits into from Jul 3, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
10 changes: 8 additions & 2 deletions cmd/mysterium_client/command_run/command.go
Expand Up @@ -13,7 +13,9 @@ type commandRun struct {
outputError io.Writer

mysteriumClient server.Client
vpnClient *openvpn.Client
vpnMiddlewares []openvpn.ManagementMiddleware

vpnClient *openvpn.Client
}

func (cmd *commandRun) Run(options CommandOptions) error {
Expand All @@ -30,10 +32,14 @@ func (cmd *commandRun) Run(options CommandOptions) error {
return err
}

vpnMiddlewares := append(
cmd.vpnMiddlewares,
bytescount_client.NewMiddleware(cmd.mysteriumClient, vpnSession.Id, 1*time.Minute),
)
cmd.vpnClient = openvpn.NewClient(
vpnConfig,
options.DirectoryRuntime,
bytescount_client.NewMiddleware(cmd.mysteriumClient, vpnSession.Id, 1*time.Minute),
vpnMiddlewares...,
)
if err := cmd.vpnClient.Start(); err != nil {
return err
Expand Down
20 changes: 14 additions & 6 deletions cmd/mysterium_client/command_run/factory.go
@@ -1,27 +1,35 @@
package command_run

import (
"os"
"github.com/mysterium/node/openvpn"
"github.com/mysterium/node/server"
"io"
"os"
)

func NewCommand() *commandRun {
func NewCommand() Command {
return &commandRun{
output: os.Stdout,
output: os.Stdout,
outputError: os.Stderr,

mysteriumClient: server.NewClient(),
vpnMiddlewares: make([]openvpn.ManagementMiddleware, 0),
}
}

func NewCommandWithDependencies(
output io.Writer,
outputError io.Writer,

mysteriumClient server.Client,
) *commandRun {
vpnMiddlewares ...openvpn.ManagementMiddleware,

) Command {
return &commandRun{
output: output,
output: output,
outputError: outputError,

mysteriumClient: mysteriumClient,
vpnMiddlewares: vpnMiddlewares,
}
}
}
7 changes: 7 additions & 0 deletions cmd/mysterium_client/command_run/interface.go
@@ -0,0 +1,7 @@
package command_run

type Command interface {
Run(options CommandOptions) error
Wait() error
Kill()
}
18 changes: 9 additions & 9 deletions cmd/mysterium_fake/mysterium_fake.go
@@ -1,13 +1,13 @@
package main

import (
"os"
"fmt"
"sync"
command_client "github.com/mysterium/node/cmd/mysterium_client/command_run"
command_server "github.com/mysterium/node/cmd/mysterium_server/command_run"
"github.com/mysterium/node/ipify"
"github.com/mysterium/node/server"
command_server "github.com/mysterium/node/cmd/mysterium_server/command_run"
command_client "github.com/mysterium/node/cmd/mysterium_client/command_run"
"os"
"sync"
)

const NODE_KEY = "fake"
Expand All @@ -30,17 +30,17 @@ func runServer(waiter *sync.WaitGroup, mysteriumClient server.Client) {

serverCommand := command_server.NewCommandWithDependencies(os.Stdout, os.Stderr, ipifyClient, mysteriumClient)
err := serverCommand.Run(command_server.CommandOptions{
NodeKey: NODE_KEY,
DirectoryConfig: NODE_DIRECTORY_CONFIG,
NodeKey: NODE_KEY,
DirectoryConfig: NODE_DIRECTORY_CONFIG,
DirectoryRuntime: CLIENT_DIRECTORY_RUNTIME,
})
if err != nil {
fmt.Fprintln(os.Stderr, "Server starting error: ", err)
os.Exit(1)
}

waiter.Add(1)
go func() {
waiter.Add(1)
defer waiter.Done()

if err = serverCommand.Wait(); err != nil {
Expand All @@ -61,13 +61,13 @@ func runClient(waiter *sync.WaitGroup, mysteriumClient server.Client) {
os.Exit(1)
}

waiter.Add(1)
go func() {
waiter.Add(1)
defer waiter.Done()

if err = clientCommand.Wait(); err != nil {
fmt.Fprintln(os.Stderr, "Client stopped with error: ", err)
os.Exit(1)
}
}()
}
}
124 changes: 124 additions & 0 deletions cmd/mysterium_monitor/command_run/command.go
@@ -0,0 +1,124 @@
package command_run

import (
"errors"
"github.com/mysterium/node/ipify"
"io"

command_client "github.com/mysterium/node/cmd/mysterium_client/command_run"
"github.com/mysterium/node/server"
"github.com/mysterium/node/state_client"
"sync"
"time"
)

type commandRun struct {
output io.Writer
outputError io.Writer

ipifyClient ipify.Client
ipOriginal string

clientCommand command_client.Command
ipCheckWaiter sync.WaitGroup
resultWriter *resultWriter
}

func (cmd *commandRun) Run(options CommandOptions) error {
var err error

cmd.resultWriter, err = NewResultWriter(options.ResultFile)
if err != nil {
return err
}
defer cmd.resultWriter.Close()

nodeProvider, err := NewNodeProvider(options)
if err != nil {
return err
}
defer nodeProvider.Close()

cmd.ipOriginal, err = cmd.ipifyClient.GetIp()
if err != nil {
return errors.New("Failed to get original IP: " + err.Error())
}

cmd.clientCommand = command_client.NewCommandWithDependencies(
cmd.output,
cmd.outputError,
server.NewClient(),
state_client.NewMiddleware(cmd.checkClientIpWhenConnected),
)

nodeProvider.WithEachNode(func(nodeKey string) {
cmd.resultWriter.NodeStart(nodeKey)
cmd.ipCheckWaiter.Add(1)

err = cmd.clientCommand.Run(command_client.CommandOptions{
NodeKey: nodeKey,
DirectoryRuntime: options.DirectoryRuntime,
})
if err != nil {
cmd.resultWriter.NodeError("Client starting error", err)
return
}

go cmd.checkClientHandleTimeout()

cmd.ipCheckWaiter.Wait()
cmd.clientCommand.Kill()
cmd.checkClientIpWhenDisconnected()
})

return nil
}

func (cmd *commandRun) checkClientIpWhenConnected(state state_client.State) error {
if state == state_client.STATE_CONNECTED {
ipForwarded, err := cmd.ipifyClient.GetIp()
if err != nil {
cmd.resultWriter.NodeError("Forwarded IP not detected", err)
cmd.ipCheckWaiter.Done()
return nil
}

if ipForwarded == cmd.ipOriginal {
cmd.resultWriter.NodeStatus("Forwarded IP matches original")
cmd.ipCheckWaiter.Done()
return nil
}

cmd.resultWriter.NodeStatus("OK")
cmd.ipCheckWaiter.Done()
}
return nil
}

func (cmd *commandRun) checkClientHandleTimeout() {
<-time.After(10 * time.Second)

cmd.resultWriter.NodeStatus("Client not connected")
cmd.ipCheckWaiter.Done()
}

func (cmd *commandRun) checkClientIpWhenDisconnected() {
ipForwarded, err := cmd.ipifyClient.GetIp()
if err != nil {
cmd.resultWriter.NodeError("Disconnect IP not detected", err)
return
}

if ipForwarded != cmd.ipOriginal {
cmd.resultWriter.NodeStatus("Disconnect IP does not match original")
return
}
}

func (cmd *commandRun) Wait() error {
return nil
}

func (cmd *commandRun) Kill() {
cmd.clientCommand.Kill()
}
34 changes: 34 additions & 0 deletions cmd/mysterium_monitor/command_run/factory.go
@@ -0,0 +1,34 @@
package command_run

import (
"github.com/mysterium/node/cmd/mysterium_monitor/command_run/node_provider"
"github.com/mysterium/node/ipify"
"os"
"time"
)

func NewCommand() Command {
return &commandRun{
output: os.Stdout,
outputError: os.Stderr,

ipifyClient: ipify.NewClientWithTimeout(5 * time.Second),
}
}

func NewNodeProvider(options CommandOptions) (nodeProvider node_provider.NodeProvider, err error) {
if options.Node != "" {
nodeProvider = node_provider.NewArrayProvider([]string{options.Node})
} else {
nodeProvider, err = node_provider.NewFileProvider(options.NodeFile)
if err != nil {
return
}
}

nodeProvider = node_provider.NewRememberProvider(
nodeProvider,
options.DirectoryRuntime+"/remember.status",
)
return
}
7 changes: 7 additions & 0 deletions cmd/mysterium_monitor/command_run/interface.go
@@ -0,0 +1,7 @@
package command_run

type Command interface {
Run(options CommandOptions) error
Wait() error
Kill()
}
@@ -0,0 +1,8 @@
package node_provider

type NodeProvider interface {
WithEachNode(consumer NodeConsumer)
Close() error
}

type NodeConsumer func(nodeKey string)
@@ -0,0 +1,21 @@
package node_provider

func NewArrayProvider(nodeKeys []string) NodeProvider {
return &arrayProvider{
nodeKeys: nodeKeys,
}
}

type arrayProvider struct {
nodeKeys []string
}

func (provider *arrayProvider) WithEachNode(consumer NodeConsumer) {
for _, nodeKey := range provider.nodeKeys {
consumer(nodeKey)
}
}

func (provider *arrayProvider) Close() error {
return nil
}
@@ -0,0 +1,33 @@
package node_provider

import (
"bufio"
"os"
)

func NewFileProvider(filePath string) (NodeProvider, error) {
file, err := os.Open(filePath)
if err != nil {
return nil, err
}

provider := &fileProvider{
file: file,
}
return provider, nil
}

type fileProvider struct {
file *os.File
}

func (provider *fileProvider) WithEachNode(consumer NodeConsumer) {
scanner := bufio.NewScanner(provider.file)
for scanner.Scan() {
consumer(scanner.Text())
}
}

func (provider *fileProvider) Close() error {
return provider.file.Close()
}