Skip to content

Commit

Permalink
fix: Add example case.
Browse files Browse the repository at this point in the history
  • Loading branch information
locona committed Feb 17, 2020
1 parent d03a626 commit 61c764a
Show file tree
Hide file tree
Showing 5 changed files with 73 additions and 13 deletions.
78 changes: 69 additions & 9 deletions examples/spark.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@ package main

import (
"context"
"fmt"
"sync"
"time"

"github.com/k0kubun/pp"
"github.com/lithammer/dedent"
Expand All @@ -14,18 +17,21 @@ func SessionsList() {
pp.Println(res, err)
}

func SessionsGet(sessionID int) {
func SessionsGet(sessionID int) *livy.Session {
svc := livy.NewService(context.Background())
res, err := svc.Sessions.Get(sessionID).Do()
pp.Println(res, err)
return res
}

func SessionsInsert() {
func SessionsInsert() *livy.Session {
svc := livy.NewService(context.Background())
res, err := svc.Sessions.Insert(&livy.InsertSessionRequest{
Kind: livy.SessionKind_Spark,
}).Do()
pp.Println(res, err)

return res
}

func SessionsDelete(sessionID int) {
Expand All @@ -52,13 +58,29 @@ func StatementsList(sessionID int) {
pp.Println(res, err)
}

func StatementsGet(sessionID, statementId int) {
func StatementsGet(sessionID, statementID int) *livy.Statement {
svc := livy.NewService(context.Background())
res, err := svc.Statements.Get(sessionID, statementId).Do()
res, err := svc.Statements.Get(sessionID, statementID).Do()
pp.Println(res, err)
return res
}

func StatementsWait(sessionID, statementID int) *livy.Statement {
t := time.NewTicker(5 * time.Second)
defer t.Stop()

for range t.C {
stmt := StatementsGet(sessionID, statementID)
pp.Println(stmt)
if stmt.State == livy.StatementState_Available {
return stmt
}
}

return nil
}

func StatementsInsert(sessionID int) {
func StatementsInsert(sessionID int) *livy.Statement {
svc := livy.NewService(context.Background())
letter := "val NUM_SAMPLES = 100000;\n" +
"val count = sc.parallelize(1 to NUM_SAMPLES).map { i => \n" +
Expand All @@ -74,6 +96,7 @@ func StatementsInsert(sessionID int) {
Code: dedent.Dedent(letter),
}).Do()
pp.Println(res, err)
return res
}

func BatchesList() {
Expand All @@ -82,15 +105,52 @@ func BatchesList() {
pp.Println(res, err)
}

func work() {
fmt.Println("#")
}

func routine(command <-chan string, wg *sync.WaitGroup) {
defer wg.Done()
var status = "Play"
for {
select {
case cmd := <-command:
switch cmd {
case "Stop":
return
case "Pause":
status = "Pause"
default:
status = "Play"
}
default:
if status == "Play" {
work()
}
}
}
}

func main() {
// add your function calls here
sessionID := 2
// SessionsInsert()
// SessionsGet(sessionID)
// sessionID := 0
// session := SessionsInsert()
session := SessionsGet(0)
// SessionsDelete(sessionID)
// SessionsState(sessionID)
// SessionsLog(sessionID)

// Statement
// wg := sync.WaitGroup{}
// wg.Add(1)
// command := make(chan string)
// go routine(command & wg)
// StatementsList(sessionID)
StatementsInsert(sessionID)
statement := StatementsInsert(session.ID)
pp.Println(session.ID, statement.ID)
// statement := StatementsGet(session.ID, statement.ID)
statement = StatementsWait(session.ID, statement.ID)
pp.Println(statement)
b, _ := statement.Output.Data.MarshalJSON()
fmt.Println(string(b))
}
1 change: 0 additions & 1 deletion gensupport/json.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package gensupport
import (
"bytes"
"encoding/json"
"fmt"
"io"
"strings"
)
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,5 @@ require (
github.com/k0kubun/pp v3.0.1+incompatible
github.com/lithammer/dedent v1.1.0
github.com/mattn/go-colorable v0.1.4 // indirect
golang.org/x/sys v0.0.0-20190412213103-97732733099d // indirect
)
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,5 @@ github.com/mattn/go-colorable v0.1.4/go.mod h1:U0ppj6V5qS13XJ6of8GYAs25YV2eR4EVc
github.com/mattn/go-isatty v0.0.8 h1:HLtExJ+uU2HOZ+wI0Tt5DtUDrx8yhUqDcp7fYERX4CE=
github.com/mattn/go-isatty v0.0.8/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s=
golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d h1:+R4KGOnez64A81RvjARKc4UT5/tI9ujCIVX+P5KiHuI=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
4 changes: 1 addition & 3 deletions statement.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"encoding/json"
"fmt"
"io"
"io/ioutil"
"net/http"

"github.com/locona/livy/gensupport"
Expand All @@ -27,7 +26,7 @@ type Statement struct {
ID int
Code string
State StatementState
Output StatementState
Output StatementOutput
}

type StatementOutput struct {
Expand Down Expand Up @@ -146,7 +145,6 @@ func (c *StatementsInsertCall) Do() (*Statement, error) {
return nil, err
}

s, _ := ioutil.ReadAll(res.Body)
statement := &Statement{}
err = gensupport.DecodeResponse(statement, res)
if err != nil {
Expand Down

0 comments on commit 61c764a

Please sign in to comment.