Navigation Menu

Skip to content

Commit

Permalink
cmd/transfer,cmd/server: make SessionID pointer so sql null doesn't p…
Browse files Browse the repository at this point in the history
…anic and improve transfer performance
  • Loading branch information
nicksherron committed Feb 17, 2020
1 parent bc7aebe commit bfc9179
Show file tree
Hide file tree
Showing 7 changed files with 91 additions and 126 deletions.
8 changes: 4 additions & 4 deletions Makefile
Expand Up @@ -14,12 +14,12 @@ help:
@echo 'Management commands for bashhub-server:'
@echo
@echo 'Usage:'
@echo ' make build Compile the project.'
@echo ' make build Compile the project'
@echo ' make docker-build Build docker image'
@echo ' make clean Clean the directory tree'
@echo ' make test Run tests on a compiled project.'
@echo ' make test-postgres Start postgres in ephemeral docker container and run backend tests.'
@echo ' make test-all Run test and test-postgres.'
@echo ' make test Run tests on a compiled project'
@echo ' make test-postgres Start postgres in ephemeral docker container and run backend tests'
@echo ' make test-all Run test and test-postgres'
@echo

build:
Expand Down
4 changes: 1 addition & 3 deletions cmd/root.go
Expand Up @@ -77,9 +77,7 @@ func startupMessage() {
`, Version, addr)
color.HiGreen(banner)
fmt.Print("\n")
log.Printf("Listening and serving HTTP on %v", addr)
fmt.Print("\n")
log.Printf("\nListening and serving HTTP on %v\n", addr)
}

func listenAddr() string {
Expand Down
110 changes: 71 additions & 39 deletions cmd/transfer.go
Expand Up @@ -28,13 +28,16 @@ import (
"os"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/cheggaaa/pb/v3"
"github.com/spf13/cobra"
"golang.org/x/crypto/ssh/terminal"
)

type cList struct {
Retries int
UUID string `json:"uuid"`
Command string `json:"command"`
Created int64 `json:"created"`
Expand All @@ -58,10 +61,14 @@ var (
workers int
unique bool
limit int
wg sync.WaitGroup
dstCounter uint64
srcCounter uint64
inserted uint64
wgSrc sync.WaitGroup
wgDst sync.WaitGroup
cmdList commandsList
transferCmd = &cobra.Command{

transferCmd = &cobra.Command{
Use: "transfer",
Short: "Transfer bashhub history from one server to another",
Run: func(cmd *cobra.Command, args []string) {
Expand Down Expand Up @@ -127,41 +134,56 @@ func run() {
sysRegistered = false
dstToken = getToken(dstURL, dstUser, dstPass)
cmdList = getCommandList()
counter := 0

if !progress {
bar = pb.ProgressBarTemplate(barTemplate).Start(len(cmdList)).SetMaxWidth(70)
bar.Set("message", "transferring ")
}
fmt.Print("\nstarting transfer...\n\n")
client := &http.Client{}
pipe := make(chan []byte)
queue := make(chan cList, len(cmdList))
pipe := make(chan []byte, len(cmdList))

// ignore http errors. We try and recover them
log.SetOutput(nil)
go func() {
for {
select {
case data := <-pipe:
case item := <-queue:
wgDst.Add(1)
atomic.AddUint64(&dstCounter, 1)
go func(cmd cList) {
defer wgDst.Done()
commandLookup(cmd, pipe, queue)
}(item)
if atomic.CompareAndSwapUint64(&dstCounter, uint64(workers), 0) {
wgDst.Wait()
}
case result := <-pipe:
wgSrc.Add(1)
go srcSend(data, client)
atomic.AddUint64(&srcCounter, 1)
go func(data []byte) {
srcSend(data, 0)
}(result)
if atomic.CompareAndSwapUint64(&srcCounter, uint64(workers), 0) {
wgSrc.Wait()
}
}

}
}()
// ignore http errors. We try and recover them
log.SetOutput(nil)
for _, v := range cmdList {
wg.Add(1)
counter++
go commandLookup(v.UUID, client, 0, pipe)
if counter > workers {
wg.Wait()
counter = 0
v.Retries = 0
queue <- v
}

for {
if atomic.CompareAndSwapUint64(&inserted, uint64(len(cmdList)), 0) {
break
}
time.Sleep(200 * time.Millisecond)
}
wg.Wait()
if !progress {
bar.Finish()
}
wgSrc.Wait()
}
func sysRegister(mac string, site string, user string, pass string) string {

Expand Down Expand Up @@ -329,21 +351,20 @@ func getCommandList() commandsList {

return result
}
func commandLookup(uuid string, client *http.Client, retries int, pipe chan []byte) {

func commandLookup(item cList, pipe chan []byte, queue chan cList) {
defer func() {
wg.Done()
if r := recover(); r != nil {
mem := strings.Contains(fmt.Sprintf("%v", r), "runtime error: invalid memory address")
eof := strings.Contains(fmt.Sprintf("%v", r), "EOF")
if mem || eof {
if retries < 10 {
retries++
wg.Add(1)
go commandLookup(uuid, client, retries, pipe)

if item.Retries < 10 {
item.Retries++
queue <- item
return
} else {
log.SetOutput(os.Stderr)
log.Println("ERROR: failed over 10 times looking up command from source with uuid: ", uuid)
log.Println("ERROR: failed over 10 times looking up command from source with uuid: ", item.UUID)
log.SetOutput(nil)
}
} else {
Expand All @@ -353,37 +374,52 @@ func commandLookup(uuid string, client *http.Client, retries int, pipe chan []by
}
}()

u := strings.TrimSpace(srcURL) + "/api/v1/command/" + strings.TrimSpace(uuid)
u := strings.TrimSpace(srcURL) + "/api/v1/command/" + strings.TrimSpace(item.UUID)
req, err := http.NewRequest("GET", u, nil)

if err != nil {
panic(err)
}
req.Header.Add("Authorization", srcToken)

resp, err := client.Do(req)
resp, err := http.DefaultClient.Do(req)

if err != nil {
panic(err)
}
defer resp.Body.Close()
if resp.StatusCode != 200 {
log.Fatalf("failed command lookup from %v, go status code %v", srcURL, resp.StatusCode)
}

body, err := ioutil.ReadAll(resp.Body)
if err != nil {
panic(err)
}
if resp.StatusCode != 200 {
err = fmt.Errorf("%v response from %v: %v", resp.StatusCode, srcURL, string(body))
log.SetOutput(os.Stderr)
log.Fatal(err)
}
pipe <- body
}

func srcSend(data []byte, client *http.Client) {
func srcSend(data []byte, retries int) {
defer func() {
if r := recover(); r != nil {
retries++
if retries < 10 {
srcSend(data, retries)
return
}
log.SetOutput(os.Stderr)
log.Println("Error on response.\n", r)
log.SetOutput(nil)
}
if !progress {
bar.Add(1)
}
atomic.AddUint64(&inserted, 1)
wgSrc.Done()
}()

body := bytes.NewReader(data)

u := dstURL + "/api/v1/import"
Expand All @@ -392,17 +428,13 @@ func srcSend(data []byte, client *http.Client) {
log.SetOutput(os.Stderr)
log.Fatal(err)
}
req.Header.Add("Authorization", dstToken)

resp, err := client.Do(req)

req.Header.Add("Authorization", dstToken)
_, err = http.DefaultClient.Do(req)
if err != nil {
log.SetOutput(os.Stderr)
log.Println("Error on response.\n", err)
log.SetOutput(nil)
log.Fatal(err)
}

defer resp.Body.Close()
}

func check(err error) {
Expand Down
38 changes: 5 additions & 33 deletions cmd/transfer_test.go
Expand Up @@ -26,7 +26,6 @@ import (
"io"
"io/ioutil"
"log"
"math/rand"
"net/http"
"os"
"os/exec"
Expand Down Expand Up @@ -113,6 +112,9 @@ func TestMain(m *testing.M) {
var err error
testDir, err = ioutil.TempDir("", "bashhub-server-test-")
check(err)
if testWork {
log.Println("TESTWORK=", testDir)
}

src, err = startSrc()
check(err)
Expand Down Expand Up @@ -178,6 +180,7 @@ func TestCreateToken(t *testing.T) {
if srcToken == "" {
t.Fatal("srcToken token is blank")
}

sysRegistered = false
dstToken = getToken(dstURL, dstUser, dstPass)
if dstToken == "" {
Expand Down Expand Up @@ -246,37 +249,6 @@ func TestTransfer(t *testing.T) {
assert.Equal(t, dstStatus.TotalCommands, srcStatus.TotalCommands)
}

func BenchmarkGoInserts(b *testing.B) {
client := &http.Client{}
counter := 0
pipe := make(chan []byte)
go func() {
for {
select {
case data := <-pipe:
wgSrc.Add(1)
go srcSend(data, client)
}

}
}()
for i := 0; i < b.N; i++ {
wg.Add(1)
counter++
n := rand.Intn(commandsN)
go commandLookup(cmdList[n].UUID, client, 0, pipe)
if counter > workers {
wg.Wait()
counter = 0
}
}
wg.Wait()
if !progress {
bar.Finish()
}
wgSrc.Wait()
}

func getStatus(t *testing.T, u string, token string) internal.Status {
u = fmt.Sprintf("%v/api/v1/client-view/status?processId=1000&startTime=%v", u, sessionStartTime)
req, err := http.NewRequest("GET", u, nil)
Expand Down Expand Up @@ -318,5 +290,5 @@ func cleanup() {
return
}
log.SetOutput(os.Stderr)
log.Println("TESTWORK=", testDir)

}
1 change: 0 additions & 1 deletion go.mod
Expand Up @@ -13,7 +13,6 @@ require (
github.com/jinzhu/gorm v1.9.12
github.com/lib/pq v1.3.0
github.com/magiconair/properties v1.8.0
github.com/manifoldco/promptui v0.7.0
github.com/mattn/go-sqlite3 v2.0.3+incompatible
github.com/spf13/cobra v0.0.5
github.com/stretchr/testify v1.4.0
Expand Down

0 comments on commit bfc9179

Please sign in to comment.