Skip to content

Commit

Permalink
cmd: show query results in the order of SQL statements
Browse files Browse the repository at this point in the history
  • Loading branch information
skatsuta committed Jun 2, 2017
1 parent 118fa52 commit 7ee9eac
Showing 1 changed file with 29 additions and 31 deletions.
60 changes: 29 additions & 31 deletions cmd/run.go
Expand Up @@ -6,7 +6,6 @@ import (
"log"
"os"
"strings"
"sync"
"time"

"github.com/aws/aws-sdk-go/aws"
Expand Down Expand Up @@ -54,7 +53,7 @@ func init() {

func runRun(cmd *cobra.Command, args []string) {
l := len(args)
if l < 1 || l > 1 { // TODO: run interactive mode if no argument is given
if l != 1 { // TODO: run interactive mode if no argument is given
cmd.Help()
return
}
Expand All @@ -77,54 +76,53 @@ func runRun(cmd *cobra.Command, args []string) {
stmts := strings.Split(args[0], ";")

// Create channels
resultCh := make(chan *exec.Result)
errCh := make(chan error)
doneCh := make(chan struct{})
var wg sync.WaitGroup
l = len(stmts)
resultChs := make([]chan *exec.Result, 0, l)
errChs := make([]chan error, 0, l)

// Run each statement concurrently using goroutine
for _, stmt := range stmts {
query := stmt // capture locally
if strings.TrimSpace(query) == "" {
if strings.TrimSpace(stmt) == "" {
continue // Skip empty statements
}
wg.Add(1)
go runQuery(client, query, resultCh, errCh, &wg)
}

// Monitoring goroutine to notify that all the query executions have finished
go func() {
wg.Wait()
doneCh <- struct{}{}
}()
resultCh := make(chan *exec.Result)
errCh := make(chan error)
go runQuery(client, stmt, resultCh, errCh)

resultChs = append(resultChs, resultCh)
errChs = append(errChs, errCh)
}

fmt.Print("Running query")

// TODO: arrange results in the order of the original statements
tick := time.Tick(tickInterval)
for {
select {
case r := <-resultCh:
fmt.Print("\n")
print.NewTable(os.Stdout).Print(r)
case e := <-errCh:
fmt.Print("\n")
fmt.Fprintln(os.Stderr, e)
case <-tick:
fmt.Print(".")
case <-doneCh:
return
l = len(resultChs)
for i := 0; i < l; i++ {
loop:
for {
select {
case r := <-resultChs[i]:
fmt.Print("\n")
print.NewTable(os.Stdout).Print(r)
break loop
case e := <-errChs[i]:
fmt.Print("\n")
fmt.Fprintln(os.Stderr, e)
break loop
case <-tick:
fmt.Print(".")
}
}
}
}

func runQuery(client athenaiface.AthenaAPI, query string, resultCh chan *exec.Result, errCh chan error, wg *sync.WaitGroup) {
func runQuery(client athenaiface.AthenaAPI, query string, resultCh chan *exec.Result, errCh chan error) {
// Run a query, and send results or an error
r, err := exec.NewQuery(client, query, queryConfig).Run()
if err != nil {
errCh <- err
} else {
resultCh <- r
}
wg.Done()
}

0 comments on commit 7ee9eac

Please sign in to comment.