Skip to content

Commit

Permalink
Workin pretty well
Browse files Browse the repository at this point in the history
  • Loading branch information
robinovitch61 committed Jul 9, 2022
1 parent aff24c4 commit 19f81e3
Show file tree
Hide file tree
Showing 11 changed files with 110 additions and 33 deletions.
44 changes: 36 additions & 8 deletions internal/tui/components/app/app.go
Expand Up @@ -37,7 +37,7 @@ type Model struct {
logType nomad.LogType

eventsStream *bufio.Reader
eventsCloser io.ReadCloser
eventsBody io.ReadCloser
event string

execWebSocket *websocket.Conn
Expand Down Expand Up @@ -85,6 +85,9 @@ func (m Model) Update(msg tea.Msg) (tea.Model, tea.Cmd) {
}

switch msg := msg.(type) {
case message.CleanupCompleteMsg:
return m, tea.Quit

case tea.KeyMsg:
// always exit if desired, or don't respond if typing "q" legitimately in some text input
if key.Matches(msg, keymap.KeyMap.Exit) {
Expand All @@ -93,7 +96,7 @@ func (m Model) Update(msg tea.Msg) (tea.Model, tea.Cmd) {
enteringInput := currentPageModel != nil && currentPageModel.EnteringInput()
typingQLegitimately := msg.String() == "q" && (addingQToFilter || saving || enteringInput || m.inPty)
if !typingQLegitimately || m.err != nil {
return m, tea.Quit
return m, m.cleanupCmd()
}
}

Expand Down Expand Up @@ -265,7 +268,15 @@ func (m Model) Update(msg tea.Msg) (tea.Model, tea.Cmd) {
m.getCurrentPageModel().SetViewportSelectionEnabled(false)
}
case nomad.EventsPage:
if m.eventsBody != nil {
err := m.eventsBody.Close()
if err != nil {
m.err = err
return m, nil
}
}
m.eventsStream = msg.Connection.Reader
m.eventsBody = msg.Connection.Body
cmds = append(cmds, nomad.ReadEventsStreamNextMessage(m.eventsStream))
case nomad.LogsPage:
m.getCurrentPageModel().SetViewportSelectionToBottom()
Expand All @@ -277,11 +288,16 @@ func (m Model) Update(msg tea.Msg) (tea.Model, tea.Cmd) {

case nomad.EventsStreamMsg:
if m.currentPage == nomad.EventsPage {
// TODO LEO: Reenable
// if string(msg) != "{}" {
m.getCurrentPageModel().AppendToViewport([]page.Row{{Row: string(msg)}}, true)
// }
cmds = append(cmds, nomad.ReadEventsStreamNextMessage(m.eventsStream))
if !msg.Closed {
if msg.Value != "{}" {
scrollDown := m.getCurrentPageModel().ViewportSelectionAtBottom()
m.getCurrentPageModel().AppendToViewport([]page.Row{{Row: msg.Value}}, true)
if scrollDown {
m.getCurrentPageModel().ScrollViewportToBottom()
}
}
cmds = append(cmds, nomad.ReadEventsStreamNextMessage(m.eventsStream))
}
}

case nomad.UpdatePageDataMsg:
Expand Down Expand Up @@ -362,7 +378,7 @@ func (m *Model) initialize() {
jobSpecPage := page.New(m.width, pageHeight, m.getFilterPrefix(nomad.JobSpecPage), nomad.JobSpecPage.LoadingString(), false, true, false, nil)
m.pageModels[nomad.JobSpecPage] = &jobSpecPage

eventsPage := page.New(m.width, pageHeight, m.getFilterPrefix(nomad.EventsPage), nomad.EventsPage.LoadingString(), true, true, false, nil)
eventsPage := page.New(m.width, pageHeight, m.getFilterPrefix(nomad.EventsPage), nomad.EventsPage.LoadingString(), true, false, false, nil)
m.pageModels[nomad.EventsPage] = &eventsPage

eventPage := page.New(m.width, pageHeight, m.getFilterPrefix(nomad.EventPage), nomad.EventPage.LoadingString(), false, true, false, nil)
Expand All @@ -386,6 +402,18 @@ func (m *Model) initialize() {
m.initialized = true
}

func (m *Model) cleanupCmd() tea.Cmd {
return func() tea.Msg {
if m.eventsBody != nil {
_ = m.eventsBody.Close()
}
if m.execWebSocket != nil {
nomad.CloseWebSocket(m.execWebSocket)()
}
return message.CleanupCompleteMsg{}
}
}

func (m *Model) setPageWindowSize() {
for _, pm := range m.pageModels {
pm.SetWindowSize(m.width, m.getPageHeight())
Expand Down
7 changes: 7 additions & 0 deletions internal/tui/components/page/page.go
Expand Up @@ -257,6 +257,13 @@ func (m Model) GetSelectedPageRow() (Row, error) {
return Row{}, fmt.Errorf("selection invalid")
}

func (m Model) ViewportSelectionAtBottom() bool {
if !m.viewport.SelectionEnabled() {
return false
}
return m.viewport.SelectedContentIdx() == len(m.pageData.Filtered)-1
}

func (m Model) EnteringInput() bool {
return m.doesRequestInput && m.needsNewInput
}
Expand Down
2 changes: 2 additions & 0 deletions internal/tui/message/message.go
Expand Up @@ -7,3 +7,5 @@ func (e ErrMsg) Error() string { return e.Err.Error() }
type PageInputReceivedMsg struct {
Input string
}

type CleanupCompleteMsg struct{}
4 changes: 2 additions & 2 deletions internal/tui/nomad/allocations.go
Expand Up @@ -81,8 +81,8 @@ type allocationRowEntry struct {

func FetchAllocations(url, token, jobID, jobNamespace string) tea.Cmd {
return func() tea.Msg {
params := map[string]string{
"namespace": jobNamespace,
params := [][2]string{
{"namespace", jobNamespace},
}
fullPath := fmt.Sprintf("%s%s%s%s", url, "/v1/job/", jobID, "/allocations")
body, err := get(fullPath, token, params)
Expand Down
23 changes: 16 additions & 7 deletions internal/tui/nomad/events.go
Expand Up @@ -8,14 +8,20 @@ import (
"strings"
)

type EventsStreamMsg string
type EventsStreamMsg struct {
Value string
Closed bool
}

func FetchEventsStream(url, token string) tea.Cmd {
return func() tea.Msg {
params := map[string]string{
"namespace": "default",
"topic": "Allocation",
"index": "0",
params := [][2]string{
{"namespace", "default"},
{"topic", "Allocation"},
{"topic", "Job"},
{"topic", "Deployment"},
{"topic", "Evaluation"},
{"index", "0"},
}
fullPath := fmt.Sprintf("%s%s", url, "/v1/event/stream")
resp, err := doQuery(fullPath, token, params)
Expand All @@ -29,17 +35,20 @@ func FetchEventsStream(url, token string) tea.Cmd {
return message.ErrMsg{Err: fmt.Errorf("token cannot access topic set %s", "TODO LEO")}
}

return PageLoadedMsg{Page: EventsPage, Connection: PersistentConnection{Reader: reader}}
return PageLoadedMsg{Page: EventsPage, Connection: persistentConnection{Reader: reader, Body: resp.Body}}
}
}

func ReadEventsStreamNextMessage(r *bufio.Reader) tea.Cmd {
return func() tea.Msg {
line, err := r.ReadBytes('\n')
if err != nil {
if strings.Contains(err.Error(), "response body closed") {
return EventsStreamMsg{Closed: true}
}
return message.ErrMsg{Err: err}
}
trimmed := strings.TrimSpace(string(line))
return EventsStreamMsg(trimmed)
return EventsStreamMsg{Value: trimmed}
}
}
4 changes: 2 additions & 2 deletions internal/tui/nomad/jobs.go
Expand Up @@ -53,8 +53,8 @@ type jobResponseEntry struct {

func FetchJobs(url, token string) tea.Cmd {
return func() tea.Msg {
params := map[string]string{
"namespace": "*",
params := [][2]string{
{"namespace", "*"},
}
fullPath := fmt.Sprintf("%s%s", url, "/v1/jobs")
body, err := get(fullPath, token, params)
Expand Down
4 changes: 2 additions & 2 deletions internal/tui/nomad/jobspec.go
Expand Up @@ -10,8 +10,8 @@ import (

func FetchJobSpec(url, token, jobID, jobNamespace string) tea.Cmd {
return func() tea.Msg {
params := map[string]string{
"namespace": jobNamespace,
params := [][2]string{
{"namespace", jobNamespace},
}
fullPath := fmt.Sprintf("%s%s%s", url, "/v1/job/", jobID)
body, err := get(fullPath, token, params)
Expand Down
12 changes: 6 additions & 6 deletions internal/tui/nomad/logs.go
Expand Up @@ -38,12 +38,12 @@ func (p LogType) ShortString() string {

func FetchLogs(url, token, allocID, taskName string, logType LogType) tea.Cmd {
return func() tea.Msg {
params := map[string]string{
"task": taskName,
"type": logType.ShortString(),
"origin": "end",
"offset": "1000000",
"plain": "true",
params := [][2]string{
{"task", taskName},
{"type", logType.ShortString()},
{"origin", "end"},
{"offset", "1000000"},
{"plain", "true"},
}
fullPath := fmt.Sprintf("%s%s%s", url, "/v1/client/fs/logs/", allocID)
body, err := get(fullPath, token, params)
Expand Down
6 changes: 4 additions & 2 deletions internal/tui/nomad/pages.go
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/robinovitch61/wander/internal/tui/formatter"
"github.com/robinovitch61/wander/internal/tui/keymap"
"github.com/robinovitch61/wander/internal/tui/style"
"io"
"strings"
"time"
)
Expand Down Expand Up @@ -158,15 +159,16 @@ func (p Page) GetFilterPrefix(jobID, taskName, allocID string) string {
}
}

type PersistentConnection struct {
type persistentConnection struct {
Reader *bufio.Reader
Body io.ReadCloser
}

type PageLoadedMsg struct {
Page Page
TableHeader []string
AllPageRows []page.Row
Connection PersistentConnection
Connection persistentConnection
}

type UpdatePageDataMsg struct{ Page Page }
Expand Down
8 changes: 4 additions & 4 deletions internal/tui/nomad/util.go
Expand Up @@ -11,7 +11,7 @@ import (
"net/url"
)

func doQuery(url, token string, params map[string]string) (*http.Response, error) {
func doQuery(url, token string, params [][2]string) (*http.Response, error) {
client := &http.Client{}
req, err := http.NewRequest("GET", url, nil)
if err != nil {
Expand All @@ -20,8 +20,8 @@ func doQuery(url, token string, params map[string]string) (*http.Response, error
req.Header.Set("X-Nomad-Token", token)

query := req.URL.Query()
for key, val := range params {
query.Add(key, val)
for _, p := range params {
query.Add(p[0], p[1])
}
req.URL.RawQuery = query.Encode()

Expand All @@ -32,7 +32,7 @@ func doQuery(url, token string, params map[string]string) (*http.Response, error
return resp, nil
}

func get(url, token string, params map[string]string) ([]byte, error) {
func get(url, token string, params [][2]string) ([]byte, error) {
resp, err := doQuery(url, token, params)
if err != nil {
return nil, err
Expand Down

0 comments on commit 19f81e3

Please sign in to comment.