From 834e23236927852bac9162695e2f01e2993e8ec9 Mon Sep 17 00:00:00 2001 From: Noboru Saito Date: Tue, 7 Mar 2023 20:30:31 +0900 Subject: [PATCH 01/11] Major changes in memory usage Stops storing all read files in memory. This change applies to regular files only, not pipes or compressed files. Change to control by channel to make it work more asynchronously. Change File Watch to Directory Watch to react to file descriptor changes. --- oviewer/action.go | 7 + oviewer/document.go | 74 ++++-- oviewer/document_test.go | 6 +- oviewer/event.go | 6 - oviewer/exec.go | 2 - oviewer/logdoc.go | 9 +- oviewer/move.go | 15 +- oviewer/oviewer.go | 34 ++- oviewer/reader.go | 494 ++++++++++++++++++++++++++------------- oviewer/reader_test.go | 3 + oviewer/uncompress.go | 9 +- 11 files changed, 445 insertions(+), 214 deletions(-) diff --git a/oviewer/action.go b/oviewer/action.go index 1c143958..223afea4 100644 --- a/oviewer/action.go +++ b/oviewer/action.go @@ -67,6 +67,11 @@ func (root *Root) toggleFollowMode() { // toggleFollowAll toggles follow all mode. func (root *Root) toggleFollowAll() { root.General.FollowAll = !root.General.FollowAll + root.mu.Lock() + for _, doc := range root.DocList { + doc.latestNum = doc.BufEndNum() + } + root.mu.Unlock() } // toggleFollowSection toggles follow section mode. @@ -98,10 +103,12 @@ func (root *Root) reload(m *Document) { return } + root.mu.Lock() if err := m.reload(); err != nil { log.Printf("cannot reload: %s", err) return } + root.mu.Unlock() root.releaseEventBuffer() // Reserve time to read. time.Sleep(100 * time.Millisecond) diff --git a/oviewer/document.go b/oviewer/document.go index d0eaf2ec..24af36c0 100644 --- a/oviewer/document.go +++ b/oviewer/document.go @@ -25,20 +25,16 @@ type Document struct { FileName string // Caption is an additional caption to display after the file name. Caption string - + // filepath stores the absolute pathname for file watching. + filepath string // File is the os.File. file *os.File // chunks is the content of the file to be stored in chunks. chunks []*chunk - // notify when eof is reached. - eofCh chan struct{} - // notify when reopening. - followCh chan struct{} - - // notify when a file changes. - changCh chan struct{} + // Specifies the chunk to read. -1 reads the new last line. + ctlCh chan controlSpecifier cancel context.CancelFunc @@ -61,6 +57,7 @@ type Document struct { // offset offset int64 + startNum int // endNum is the number of the last line read. endNum int @@ -127,12 +124,26 @@ type chunk struct { start int64 } +type controlSpecifier struct { + chunkNum int + done chan struct{} + control control +} + +type control string + +const ( + firstControl = "first" + countControl = "count" + followControl = "follow" + closeControl = "close" + reloadControl = "reload" + readControl = "read" +) + // NewDocument returns Document. func NewDocument() (*Document, error) { m := &Document{ - eofCh: make(chan struct{}), - followCh: make(chan struct{}), - changCh: make(chan struct{}), tickerDone: make(chan struct{}), general: general{ ColumnDelimiter: "", @@ -140,6 +151,7 @@ func NewDocument() (*Document, error) { MarkStyleWidth: 1, PlainMode: false, }, + ctlCh: make(chan controlSpecifier, 10), seekable: true, preventReload: false, chunks: []*chunk{ @@ -189,7 +201,8 @@ func OpenDocument(fileName string) (*Document, error) { m.seekable = false } - if err := m.ReadFile(fileName); err != nil { + m.FileName = fileName + if err := m.ControlFile(fileName); err != nil { return nil, err } return m, nil @@ -204,7 +217,7 @@ func STDINDocument() (*Document, error) { m.seekable = false m.Caption = "(STDIN)" - if err := m.ReadFile(""); err != nil { + if err := m.ControlFile(""); err != nil { return nil, err } return m, nil @@ -212,26 +225,37 @@ func STDINDocument() (*Document, error) { // GetLine returns one line from buffer. func (m *Document) GetLine(n int) string { - m.mu.Lock() - defer m.mu.Unlock() - - if n < 0 || n >= m.endNum { + if n < m.startNum || n >= m.endNum { return "" } - chunkNum := n / ChunkSize - chunkLine := n % ChunkSize if len(m.chunks)-1 < chunkNum { log.Println("over chunk size: ", chunkNum) return "" } chunk := m.chunks[chunkNum] - if len(chunk.lines)-1 < chunkLine { - log.Printf("over lines size: chunk[%d]:%d < %d", chunkNum, len(chunk.lines)-1, chunkLine) - return "" + + if len(chunk.lines) == 0 { + log.Println("not in memory", chunkNum) + sc := controlSpecifier{ + control: readControl, + chunkNum: chunkNum, + done: make(chan struct{}), + } + m.ctlCh <- sc + <-sc.done + log.Println("load chunk", chunkNum) } - return chunk.lines[chunkLine] + m.mu.Lock() + defer m.mu.Unlock() + + cn := n % ChunkSize + if cn < len(chunk.lines) { + return chunk.lines[cn] + } + log.Println("not load", n, m.endNum) + return "" } // CurrentLN returns the currently displayed line number. @@ -305,7 +329,9 @@ func (m *Document) getLineC(lN int, tabWidth int) (LineC, bool) { str: str, pos: pos, } - m.cache.Add(lN, line) + if len(org) != 0 { + m.cache.Add(lN, line) + } lc := make(contents, len(org)) copy(lc, org) diff --git a/oviewer/document_test.go b/oviewer/document_test.go index a6a22824..c69d8306 100644 --- a/oviewer/document_test.go +++ b/oviewer/document_test.go @@ -82,7 +82,8 @@ func TestDocument_lineToContents(t *testing.T) { if err := m.ReadAll(bytes.NewBufferString(tt.str)); err != nil { t.Fatal(err) } - <-m.eofCh + for !m.BufEOF() { + } t.Logf("num:%d", m.BufEndNum()) got, err := m.contents(tt.args.lN, tt.args.tabWidth) if (err != nil) != tt.wantErr { @@ -136,7 +137,8 @@ func TestDocument_Export(t *testing.T) { t.Fatal(err) } w := &bytes.Buffer{} - <-m.eofCh + for !m.BufEOF() { + } m.bottomLN = m.BufEndNum() m.Export(w, tt.args.start, tt.args.end) if gotW := w.String(); gotW != tt.wantW { diff --git a/oviewer/event.go b/oviewer/event.go index 87b42702..d440ac36 100644 --- a/oviewer/event.go +++ b/oviewer/event.go @@ -177,9 +177,6 @@ func (root *Root) follow() { if root.General.FollowAll { root.followAll() } - - root.Doc.onceFollowMode() - num := root.Doc.BufEndNum() if root.Doc.latestNum == num { return @@ -202,10 +199,8 @@ func (root *Root) followAll() { } current := root.CurrentDoc - root.mu.RLock() for n, doc := range root.DocList { - doc.onceFollowMode() if doc.latestNum != doc.BufEndNum() { current = n } @@ -213,7 +208,6 @@ func (root *Root) followAll() { root.mu.RUnlock() if root.CurrentDoc != current { - log.Printf("switch document: %d", current) root.switchDocument(current) } } diff --git a/oviewer/exec.go b/oviewer/exec.go index c8931bdb..c0ac582d 100644 --- a/oviewer/exec.go +++ b/oviewer/exec.go @@ -88,8 +88,6 @@ func commandStart(command *exec.Cmd) (io.Reader, io.Reader, error) { } func finishCommand(docout *Document, docerr *Document) { - <-docout.eofCh - <-docerr.eofCh atomic.StoreInt32(&docout.changed, 1) atomic.StoreInt32(&docerr.changed, 1) atomic.StoreInt32(&docout.closed, 1) diff --git a/oviewer/logdoc.go b/oviewer/logdoc.go index d85e5009..a9d37626 100644 --- a/oviewer/logdoc.go +++ b/oviewer/logdoc.go @@ -12,9 +12,12 @@ func NewLogDoc() (*Document, error) { return nil, err } m.FollowMode = true - m.FileName = "Log" + m.Caption = "Log" m.seekable = false log.SetOutput(m) + if err := m.ControlNonFile(); err != nil { + return nil, err + } return m, nil } @@ -26,6 +29,10 @@ func (m *Document) Write(p []byte) (int, error) { if len(chunk.lines) >= ChunkSize { chunk = NewChunk(m.size) m.mu.Lock() + if len(m.chunks) > 2 { + m.chunks[len(m.chunks)-2].lines = nil + m.startNum = ChunkSize * (len(m.chunks) - 1) + } m.chunks = append(m.chunks, chunk) m.mu.Unlock() } diff --git a/oviewer/move.go b/oviewer/move.go index 0a23808a..2fdb0bec 100644 --- a/oviewer/move.go +++ b/oviewer/move.go @@ -15,7 +15,7 @@ func (m *Document) moveLine(lN int) int { // moveTop moves to the top. func (m *Document) moveTop() { - m.moveLine(0) + m.moveLine(m.startNum) } // Go to the top line. @@ -58,7 +58,7 @@ func (root *Root) movePgUp() { defer root.releaseEventBuffer() root.moveNumUp(root.statusPos - root.headerLen) - if root.Doc.topLN < 0 { + if root.Doc.topLN < root.Doc.startNum { root.Doc.moveTop() } } @@ -98,7 +98,7 @@ func (root *Root) moveHfUp() { defer root.releaseEventBuffer() root.moveNumUp((root.statusPos - root.headerLen) / 2) - if root.Doc.topLN < 0 { + if root.Doc.topLN < root.Doc.startNum { root.Doc.moveTop() } } @@ -201,7 +201,8 @@ func (root *Root) moveUpN(n int) { defer root.releaseEventBuffer() m := root.Doc - if m.topLN == 0 && m.topLX == 0 { + if m.topLN <= m.startNum && m.topLX == 0 { + root.setMessage("Out of bounds") return } @@ -225,8 +226,8 @@ func (root *Root) moveUpN(n int) { // Previous line. m.topLN -= n - if m.topLN < 0 { - m.topLN = 0 + if m.topLN < m.startNum { + m.topLN = m.startNum m.topLX = 0 return } @@ -322,7 +323,7 @@ func (root *Root) prevSection() { return } n = (n - m.firstLine()) + m.SectionStartPosition - n = max(n, 0) + n = max(n, m.startNum) m.moveLine(n) } diff --git a/oviewer/oviewer.go b/oviewer/oviewer.go index 7aff97ab..06fa4c71 100644 --- a/oviewer/oviewer.go +++ b/oviewer/oviewer.go @@ -8,6 +8,7 @@ import ( "log" "os" "os/signal" + "path/filepath" "regexp" "sync" "syscall" @@ -310,6 +311,8 @@ const MaxWriteLog int = 10 var ( // ErrOutOfRange indicates that value is out of range. ErrOutOfRange = errors.New("out of range") + // ErrNotInMemory indicates that value is not in memory. + ErrNotInMemory = errors.New("not in memory") // ErrFatalCache indicates that the cache value had a fatal error. ErrFatalCache = errors.New("fatal error in cache value") // ErrMissingFile indicates that the file does not exist. @@ -468,10 +471,23 @@ func (root *Root) SetWatcher(watcher *fsnotify.Watcher) { } root.mu.Lock() for _, doc := range root.DocList { - if doc.FileName == event.Name { - select { - case doc.changCh <- struct{}{}: - default: + if doc.filepath == event.Name { + switch event.Op { + case fsnotify.Write: + select { + case doc.ctlCh <- controlSpecifier{control: followControl}: + log.Printf("notify send %v", event) + default: + log.Println("???", len(doc.ctlCh)) + } + case fsnotify.Remove, fsnotify.Create: + select { + case doc.ctlCh <- controlSpecifier{control: reloadControl}: + log.Printf("notify send %v", event) + default: + log.Println("???", len(doc.ctlCh)) + } + } } } @@ -486,7 +502,15 @@ func (root *Root) SetWatcher(watcher *fsnotify.Watcher) { }() for _, doc := range root.DocList { - if err := watcher.Add(doc.FileName); err != nil { + fileName, err := filepath.Abs(doc.FileName) + if err != nil { + log.Println(err) + continue + } + doc.filepath = fileName + + path := filepath.Dir(fileName) + if err := watcher.Add(path); err != nil { root.debugMessage(fmt.Sprintf("watcher %s:%s", doc.FileName, err)) } } diff --git a/oviewer/reader.go b/oviewer/reader.go index f4a5a3b1..91a7c05a 100644 --- a/oviewer/reader.go +++ b/oviewer/reader.go @@ -16,189 +16,263 @@ import ( const FormFeed = "\f" -// ReadFile reads file. -// If the file name is empty, read from standard input. -func (m *Document) ReadFile(fileName string) error { - m.mu.Lock() - defer m.mu.Unlock() - - f, err := open(fileName) - if err != nil { - return err - } - m.file = f - m.FileName = fileName - - cFormat, r := uncompressedReader(m.file) - m.CFormat = cFormat +// ControlFile controls file read and loads in chunks. +func (m *Document) ControlFile(fileName string) error { + go func() error { + r, err := m.openFile(fileName) + if err != nil { + log.Println(err) + return err + } + atomic.StoreInt32(&m.eof, 0) + reader := bufio.NewReader(r) + for sc := range m.ctlCh { + switch sc.control { + case firstControl: + reader, err = m.firstControl(reader) + case countControl: + reader, err = m.countControl(reader) + case followControl: + reader, err = m.followControl(reader) + case reloadControl: + reader, err = m.reloadControl(reader) + case readControl: + reader, err = m.readControl(reader, sc.chunkNum) + default: + panic(fmt.Sprintf("unexpected %s", sc.control)) + } - go m.waitEOF() + if sc.done != nil { + close(sc.done) + } + if err != nil { + log.Println(err) + close(m.ctlCh) + } + } + log.Println("close m.ctlCh") + return nil + }() - if STDOUTPIPE != nil { - r = io.TeeReader(r, STDOUTPIPE) + m.ctlCh <- controlSpecifier{ + control: firstControl, + chunkNum: 0, + done: nil, } - - return m.ReadAll(r) + return nil } -func open(fileName string) (*os.File, error) { - if fileName == "" { - if term.IsTerminal(0) { - return nil, ErrMissingFile +// ControlNonFile only supports reload. +func (m *Document) ControlNonFile() error { + go func() error { + var err error + for sc := range m.ctlCh { + switch sc.control { + case reloadControl: + m.reset() + case readControl: + chunk := m.chunks[sc.chunkNum] + chunk.lines = make([]string, ChunkSize) + default: + panic(fmt.Sprintf("unexpected %s", sc.control)) + } + if err != nil { + log.Println(err) + } + if sc.done != nil { + close(sc.done) + } } - return os.Stdin, nil - } - - f, err := os.Open(fileName) - if err != nil { - return nil, err - } - return f, nil + log.Println("close ctlCh") + return nil + }() + return nil } -// waitEOF waits until EOF is reached before closing. -func (m *Document) waitEOF() { - <-m.eofCh - if m.seekable { - if err := m.close(); err != nil { - log.Printf("EOF: %s", err) +func (m *Document) firstControl(reader *bufio.Reader) (*bufio.Reader, error) { + if err := m.firstRead(reader); err != nil { + if !errors.Is(err, io.EOF) { + return nil, err } + reader = m.afterEOF(reader) + } + if m.BufEOF() { + return reader, nil } - atomic.StoreInt32(&m.changed, 1) - m.followCh <- struct{}{} -} - -// ReadReader reads reader. -// A wrapper for ReadAll, used when eofCh notifications are not needed. -func (m *Document) ReadReader(r io.Reader) error { go func() { - <-m.eofCh + m.ctlCh <- controlSpecifier{ + control: countControl, + chunkNum: 0, + done: nil, + } }() - return m.ReadAll(r) + return reader, nil } -// ReadAll reads all from the reader. -// And store it in the lines of the Document. -// ReadAll needs to be notified on eofCh. -func (m *Document) ReadAll(r io.Reader) error { - reader := bufio.NewReader(r) - go func() { - if m.checkClose() { - return +func (m *Document) countControl(reader *bufio.Reader) (*bufio.Reader, error) { + if m.seekable { + if _, err := m.file.Seek(m.offset, io.SeekStart); err != nil { + return nil, err } - - if err := m.readAll(reader); err != nil { - if errors.Is(err, io.EOF) || errors.Is(err, io.ErrClosedPipe) || errors.Is(err, os.ErrClosed) { - m.eofCh <- struct{}{} - atomic.StoreInt32(&m.eof, 1) - return - } - log.Printf("error: %v\n", err) - atomic.StoreInt32(&m.eof, 0) - return + reader.Reset(m.file) + } + chunk := m.lastChunk() + start := 0 + if err := m.countChunk(chunk, reader, start); err != nil { + if !errors.Is(err, io.EOF) { + return nil, err } - }() - - // Named pipes for continuous read. - if !m.seekable { - m.onceFollowMode() + reader = m.afterEOF(reader) } - return nil + if !m.BufEOF() { + chunk := NewChunk(m.size) + m.mu.Lock() + m.chunks = append(m.chunks, chunk) + m.mu.Unlock() + go func() { + m.ctlCh <- controlSpecifier{ + control: countControl, + chunkNum: 0, + done: nil, + } + }() + } + return reader, nil } -// onceFollowMode opens the follow mode only once. -func (m *Document) onceFollowMode() { - if atomic.SwapInt32(&m.openFollow, 1) == 1 { - return +func (m *Document) followControl(reader *bufio.Reader) (*bufio.Reader, error) { + if !m.FollowMode && !m.FollowAll { + return reader, nil } - if m.file == nil { - return + if m.seekable { + if _, err := m.file.Seek(m.offset, io.SeekStart); err != nil { + return nil, fmt.Errorf("seek:%w", err) + } + reader = bufio.NewReader(m.file) } - - var cancel context.CancelFunc - ctx := context.Background() - ctx, cancel = context.WithCancel(ctx) - go m.startFollowMode(ctx, cancel) - m.cancel = cancel + if err := m.readAll(reader); err != nil { + if !errors.Is(err, io.EOF) { + return nil, err + } + reader = m.afterEOF(reader) + } + return reader, nil } -// startFollowMode opens the file in follow mode. -// Seek to the position where the file was closed, and then read. -func (m *Document) startFollowMode(ctx context.Context, cancel context.CancelFunc) { - defer cancel() - <-m.followCh +func (m *Document) readControl(reader *bufio.Reader, chunkNum int) (*bufio.Reader, error) { + chunk := m.chunks[chunkNum] + start := 0 if m.seekable { - // Wait for the file to open until it changes. - select { - case <-ctx.Done(): - return - case <-m.changCh: + if _, err := m.file.Seek(chunk.start, io.SeekStart); err != nil { + return nil, err } - m.file = m.openFollowFile() + reader.Reset(m.file) } - - r := compressedFormatReader(m.CFormat, m.file) - if err := m.ContinueReadAll(ctx, r); err != nil { - log.Printf("%s follow mode read %v", m.FileName, err) + if err := m.packChunk(chunk, reader, start, false); err != nil { + if !errors.Is(err, io.EOF) { + return nil, err + } + reader = m.afterEOF(reader) } + return reader, nil } -// openFollowFile opens a file in follow mode. -func (m *Document) openFollowFile() *os.File { - m.mu.Lock() - defer m.mu.Unlock() - r, err := os.Open(m.FileName) +func (m *Document) reloadControl(reader *bufio.Reader) (*bufio.Reader, error) { + if !m.WatchMode { + m.reset() + } else { + chunk := m.lastChunk() + m.appendFormFeed(chunk) + } + var err error + reader, err = m.reloadFile(reader) if err != nil { - log.Printf("openFollowFile: %s", err) - return m.file + return nil, err } - atomic.StoreInt32(&m.closed, 0) - atomic.StoreInt32(&m.eof, 0) - if _, err := r.Seek(m.offset, io.SeekStart); err != nil { - log.Printf("openFollowMode: %s", err) + err = m.firstRead(reader) + if err != nil { + if !errors.Is(err, io.EOF) { + return nil, err + } + reader = m.afterEOF(reader) } - return r + + return reader, nil } -// Close closes the File. -// Record the last read position. -func (m *Document) close() error { - if m.checkClose() { - return nil +func (m *Document) reloadFile(reader *bufio.Reader) (*bufio.Reader, error) { + if !m.seekable { + m.ClearCache() + return reader, nil + } + if err := m.file.Close(); err != nil { + log.Printf("read: %s", err) } + m.ClearCache() + r, err := m.openFile(m.FileName) + if err != nil { + str := fmt.Sprintf("Access is no longer possible: %s", err) + reader = bufio.NewReader(strings.NewReader(str)) + return reader, nil + } + reader = bufio.NewReader(r) + return reader, nil +} +func (m *Document) afterEOF(reader *bufio.Reader) *bufio.Reader { m.offset = m.size - if err := m.file.Close(); err != nil { - return fmt.Errorf("close: %w", err) + atomic.StoreInt32(&m.eof, 1) + if !m.seekable { // for NamedPipe. + return bufio.NewReader(m.file) } - atomic.StoreInt32(&m.openFollow, 0) - atomic.StoreInt32(&m.closed, 1) - atomic.StoreInt32(&m.changed, 1) - return nil + return reader } -// ContinueReadAll continues to read even if it reaches EOF. -func (m *Document) ContinueReadAll(ctx context.Context, r io.Reader) error { - reader := bufio.NewReader(r) - for { - select { - case <-ctx.Done(): - return ctx.Err() - default: - } - if m.checkClose() { - return nil +func (m *Document) openFile(fileName string) (io.Reader, error) { + f, err := open(fileName) + if err != nil { + return nil, err + } + m.mu.Lock() + m.file = f + + cFormat, r := uncompressedReader(m.file, m.seekable) + if cFormat == UNCOMPRESSED { + if m.seekable { + f.Seek(0, io.SeekStart) + r = f } + } + m.CFormat = cFormat + if STDOUTPIPE != nil { + r = io.TeeReader(r, STDOUTPIPE) + } + m.mu.Unlock() - if err := m.readAll(reader); err != nil { - if errors.Is(err, io.EOF) { - <-m.changCh - continue - } - return err + return r, nil +} + +func open(fileName string) (*os.File, error) { + if fileName == "" { + if term.IsTerminal(0) { + return nil, ErrMissingFile } + return os.Stdin, nil + } + + f, err := os.Open(fileName) + if err != nil { + return nil, err + } + return f, nil +} + +func (m *Document) firstRead(reader *bufio.Reader) error { + if !m.seekable || m.CFormat != UNCOMPRESSED { + return m.readAll(reader) } + return m.readFirstOnly(reader) } // readAll actually reads everything. @@ -218,6 +292,17 @@ func (m *Document) readAll(reader *bufio.Reader) error { } } +// readAll actually reads everything. +// The read lines are stored in the lines of the Document. +func (m *Document) readFirstOnly(reader *bufio.Reader) error { + chunk := m.chunks[0] + start := 0 + if err := m.packChunk(chunk, reader, start, true); err != nil { + return err + } + return nil +} + // packChunk append lines read from reader into chunks. func (m *Document) packChunk(chunk *chunk, reader *bufio.Reader, start int, isCount bool) error { var line strings.Builder @@ -241,6 +326,7 @@ func (m *Document) packChunk(chunk *chunk, reader *bufio.Reader, start int, isCo if line.Len() != 0 { if isCount { m.append(chunk, line.String()) + m.offset = m.size } else { m.appendOnly(chunk, line.String()) } @@ -249,6 +335,7 @@ func (m *Document) packChunk(chunk *chunk, reader *bufio.Reader, start int, isCo } if isCount { m.append(chunk, line.String()) + m.offset = m.size } else { m.appendOnly(chunk, line.String()) } @@ -257,6 +344,34 @@ func (m *Document) packChunk(chunk *chunk, reader *bufio.Reader, start int, isCo return nil } +func (m *Document) countChunk(chunk *chunk, reader *bufio.Reader, start int) error { + var isPrefix bool + i := start + for i < ChunkSize { + buf, err := reader.ReadSlice('\n') + if err == bufio.ErrBufferFull { + isPrefix = true + err = nil + } + m.size += int64(len(buf)) + if isPrefix { + isPrefix = false + continue + } + + i++ + if len(buf) != 0 { + m.endNum++ + } + m.offset = m.size + atomic.StoreInt32(&m.changed, 1) + if err != nil { + return err + } + } + return nil +} + // appendOnly appends to the line of the chunk. // appendOnly does not updates the number of lines and size. func (m *Document) appendOnly(chunk *chunk, line string) { @@ -283,7 +398,6 @@ func (m *Document) appendFormFeed(chunk *chunk) { line = chunk.lines[len(chunk.lines)-1] } m.mu.Unlock() - // Do not add if the previous is FormFeed. if line != FormFeed { m.append(chunk, FormFeed) @@ -305,40 +419,25 @@ func (m *Document) lastChunk() *chunk { // Regular files are reopened and reread increase. // The pipe will reset what it has read. func (m *Document) reload() error { - if (m.file == os.Stdin && m.BufEOF()) || !m.seekable && m.checkClose() { - return fmt.Errorf("%w %s", ErrAlreadyClose, m.FileName) - } - - if m.seekable { - if m.cancel != nil { - m.cancel() - } - if !m.checkClose() && m.file != nil { - if err := m.close(); err != nil { - log.Println(err) - } - } + sc := controlSpecifier{ + control: reloadControl, + done: make(chan struct{}), } - - if m.WatchMode { - chunk := m.lastChunk() - m.appendFormFeed(chunk) - } else { - m.reset() + log.Println("reload send") + m.ctlCh <- sc + <-sc.done + log.Println("receive done") + if !m.WatchMode { m.topLN = 0 } - if !m.seekable { - return nil - } - - atomic.StoreInt32(&m.closed, 0) - return m.ReadFile(m.FileName) + return nil } // reset clears all lines. func (m *Document) reset() { m.mu.Lock() + m.size = 0 m.endNum = 0 m.chunks = []*chunk{ NewChunk(0), @@ -352,3 +451,70 @@ func (m *Document) reset() { func (m *Document) checkClose() bool { return atomic.LoadInt32(&m.closed) == 1 } + +// Close closes the File. +// Record the last read position. +func (m *Document) close() error { + if m.checkClose() { + return nil + } + log.Println("close") + if err := m.file.Close(); err != nil { + return fmt.Errorf("close: %w", err) + } + atomic.StoreInt32(&m.closed, 1) + atomic.StoreInt32(&m.changed, 1) + return nil +} + +// ReadFile reads file. +// If the file name is empty, read from standard input. +// +// Deprecated: +func (m *Document) ReadFile(fileName string) error { + r, err := m.openFile(fileName) + if err != nil { + return err + } + return m.ReadAll(r) +} + +// ContinueReadAll continues to read even if it reaches EOF. +// +// Deprecated: +func (m *Document) ContinueReadAll(ctx context.Context, r io.Reader) error { + return m.ReadAll(r) +} + +// ReadReader reads reader. +// A wrapper for ReadAll. +// +// Deprecated: +func (m *Document) ReadReader(r io.Reader) error { + return m.ReadAll(r) +} + +// ReadAll reads all from the reader. +// And store it in the lines of the Document. +// ReadAll needs to be notified on eofCh. +// +// Deprecated: +func (m *Document) ReadAll(r io.Reader) error { + reader := bufio.NewReader(r) + go func() { + if m.checkClose() { + return + } + + if err := m.readAll(reader); err != nil { + if errors.Is(err, io.EOF) || errors.Is(err, io.ErrClosedPipe) || errors.Is(err, os.ErrClosed) { + m.offset = m.size + atomic.StoreInt32(&m.eof, 1) + return + } + log.Printf("error: %v\n", err) + return + } + }() + return nil +} diff --git a/oviewer/reader_test.go b/oviewer/reader_test.go index 3479f523..51a9f53a 100644 --- a/oviewer/reader_test.go +++ b/oviewer/reader_test.go @@ -160,6 +160,9 @@ func TestDocument_reload(t *testing.T) { m.FileName = tt.fields.FileName m.WatchMode = tt.fields.WatchMode m.seekable = tt.fields.seekable + if err := m.ControlFile(tt.fields.FileName); err != nil { + t.Fatal("ControlFile error") + } if err := m.reload(); (err != nil) != tt.wantErr { t.Errorf("Document.reload() error = %v, wantErr %v", err, tt.wantErr) } diff --git a/oviewer/uncompress.go b/oviewer/uncompress.go index 3553ec08..c55ae092 100644 --- a/oviewer/uncompress.go +++ b/oviewer/uncompress.go @@ -62,7 +62,7 @@ func (c Compressed) String() string { return "UNCOMPRESSED" } -func uncompressedReader(reader io.Reader) (Compressed, io.Reader) { +func uncompressedReader(reader io.Reader, seekable bool) (Compressed, io.Reader) { buf := [7]byte{} n, err := io.ReadAtLeast(reader, buf[:], len(buf)) if err != nil { @@ -72,10 +72,13 @@ func uncompressedReader(reader io.Reader) (Compressed, io.Reader) { return UNCOMPRESSED, bytes.NewReader(nil) } - mr := io.MultiReader(bytes.NewReader(buf[:n]), reader) cFormat := compressType(buf[:7]) - r := compressedFormatReader(cFormat, mr) + if seekable && cFormat == UNCOMPRESSED { + return UNCOMPRESSED, nil + } + mr := io.MultiReader(bytes.NewReader(buf[:n]), reader) + r := compressedFormatReader(cFormat, mr) return cFormat, r } From 5ca9fd8bfb9ccae7e7153ad2dd62c61ea8db43b1 Mon Sep 17 00:00:00 2001 From: Noboru Saito Date: Thu, 9 Mar 2023 18:17:41 +0900 Subject: [PATCH 02/11] WIP: update --- oviewer/document.go | 38 ++++++----------- oviewer/reader.go | 99 ++++++++++++++++++++++++++++++++++----------- 2 files changed, 87 insertions(+), 50 deletions(-) diff --git a/oviewer/document.go b/oviewer/document.go index 24af36c0..5f684358 100644 --- a/oviewer/document.go +++ b/oviewer/document.go @@ -124,23 +124,6 @@ type chunk struct { start int64 } -type controlSpecifier struct { - chunkNum int - done chan struct{} - control control -} - -type control string - -const ( - firstControl = "first" - countControl = "count" - followControl = "follow" - closeControl = "close" - reloadControl = "reload" - readControl = "read" -) - // NewDocument returns Document. func NewDocument() (*Document, error) { m := &Document{ @@ -236,15 +219,7 @@ func (m *Document) GetLine(n int) string { chunk := m.chunks[chunkNum] if len(chunk.lines) == 0 { - log.Println("not in memory", chunkNum) - sc := controlSpecifier{ - control: readControl, - chunkNum: chunkNum, - done: make(chan struct{}), - } - m.ctlCh <- sc - <-sc.done - log.Println("load chunk", chunkNum) + m.loadChunk(chunkNum) } m.mu.Lock() @@ -258,6 +233,17 @@ func (m *Document) GetLine(n int) string { return "" } +// loadChunk loads a chunk into memory. +func (m *Document) loadChunk(chunkNum int) { + sc := controlSpecifier{ + control: loadControl, + chunkNum: chunkNum, + done: make(chan struct{}), + } + m.ctlCh <- sc + <-sc.done +} + // CurrentLN returns the currently displayed line number. func (m *Document) CurrentLN() int { return m.topLN diff --git a/oviewer/reader.go b/oviewer/reader.go index 91a7c05a..7bcf4c4a 100644 --- a/oviewer/reader.go +++ b/oviewer/reader.go @@ -14,6 +14,24 @@ import ( "golang.org/x/term" ) +type controlSpecifier struct { + chunkNum int + done chan struct{} + control control +} + +type control string + +const ( + firstControl = "first" + readControl = "read" + followControl = "follow" + closeControl = "close" + reloadControl = "reload" + loadControl = "load" + searchControl = "search" +) + const FormFeed = "\f" // ControlFile controls file read and loads in chunks. @@ -30,14 +48,16 @@ func (m *Document) ControlFile(fileName string) error { switch sc.control { case firstControl: reader, err = m.firstControl(reader) - case countControl: - reader, err = m.countControl(reader) + case readControl: + reader, err = m.readControl(reader) case followControl: reader, err = m.followControl(reader) case reloadControl: reader, err = m.reloadControl(reader) - case readControl: - reader, err = m.readControl(reader, sc.chunkNum) + case loadControl: + reader, err = m.loadControl(reader, sc.chunkNum) + case searchControl: + err = m.searchControl(reader, sc.chunkNum) default: panic(fmt.Sprintf("unexpected %s", sc.control)) } @@ -70,7 +90,7 @@ func (m *Document) ControlNonFile() error { switch sc.control { case reloadControl: m.reset() - case readControl: + case loadControl: chunk := m.chunks[sc.chunkNum] chunk.lines = make([]string, ChunkSize) default: @@ -89,6 +109,8 @@ func (m *Document) ControlNonFile() error { return nil } +// firstControl is executed first. +// Open the file and read the first chunk. func (m *Document) firstControl(reader *bufio.Reader) (*bufio.Reader, error) { if err := m.firstRead(reader); err != nil { if !errors.Is(err, io.EOF) { @@ -99,48 +121,45 @@ func (m *Document) firstControl(reader *bufio.Reader) (*bufio.Reader, error) { if m.BufEOF() { return reader, nil } - go func() { - m.ctlCh <- controlSpecifier{ - control: countControl, - chunkNum: 0, - done: nil, - } - }() + m.ctlCh <- controlSpecifier{ + control: readControl, + } return reader, nil } -func (m *Document) countControl(reader *bufio.Reader) (*bufio.Reader, error) { +// readControl is executed after the second +// and only reads the file or counts the lines of the file. +func (m *Document) readControl(reader *bufio.Reader) (*bufio.Reader, error) { if m.seekable { if _, err := m.file.Seek(m.offset, io.SeekStart); err != nil { return nil, err } reader.Reset(m.file) } + chunk := m.lastChunk() start := 0 - if err := m.countChunk(chunk, reader, start); err != nil { + if err := m.readOrCountChunk(chunk, reader, start); err != nil { if !errors.Is(err, io.EOF) { return nil, err } reader = m.afterEOF(reader) } + if !m.BufEOF() { chunk := NewChunk(m.size) m.mu.Lock() m.chunks = append(m.chunks, chunk) m.mu.Unlock() - go func() { - m.ctlCh <- controlSpecifier{ - control: countControl, - chunkNum: 0, - done: nil, - } - }() + m.ctlCh <- controlSpecifier{ + control: readControl, + } } return reader, nil } +// followControl reads lines added to the file while in follow-mode. func (m *Document) followControl(reader *bufio.Reader) (*bufio.Reader, error) { if !m.FollowMode && !m.FollowAll { return reader, nil @@ -160,7 +179,8 @@ func (m *Document) followControl(reader *bufio.Reader) (*bufio.Reader, error) { return reader, nil } -func (m *Document) readControl(reader *bufio.Reader, chunkNum int) (*bufio.Reader, error) { +// loadControl loads the read contents into chunks. +func (m *Document) loadControl(reader *bufio.Reader, chunkNum int) (*bufio.Reader, error) { chunk := m.chunks[chunkNum] start := 0 if m.seekable { @@ -178,6 +198,25 @@ func (m *Document) readControl(reader *bufio.Reader, chunkNum int) (*bufio.Reade return reader, nil } +func (m *Document) searchControl(reader *bufio.Reader, chunkNum int) error { + chunk := m.chunks[chunkNum] + start := 0 + if m.seekable { + if _, err := m.file.Seek(chunk.start, io.SeekStart); err != nil { + return err + } + reader.Reset(m.file) + } + if err := m.packChunk(chunk, reader, start, false); err != nil { + if !errors.Is(err, io.EOF) { + return err + } + reader = m.afterEOF(reader) + } + return nil +} + +// reloadControl performs reload processing func (m *Document) reloadControl(reader *bufio.Reader) (*bufio.Reader, error) { if !m.WatchMode { m.reset() @@ -201,6 +240,15 @@ func (m *Document) reloadControl(reader *bufio.Reader) (*bufio.Reader, error) { return reader, nil } +func (m *Document) readOrCountChunk(chunk *chunk, reader *bufio.Reader, start int) error { + if !m.seekable { + log.Println("pack", len(m.chunks)) + return m.packChunk(chunk, reader, start, true) + } + log.Println("count", len(m.chunks)) + return m.countChunk(chunk, reader, start) +} + func (m *Document) reloadFile(reader *bufio.Reader) (*bufio.Reader, error) { if !m.seekable { m.ClearCache() @@ -243,6 +291,8 @@ func (m *Document) openFile(fileName string) (io.Reader, error) { f.Seek(0, io.SeekStart) r = f } + } else { + m.seekable = false } m.CFormat = cFormat if STDOUTPIPE != nil { @@ -269,8 +319,8 @@ func open(fileName string) (*os.File, error) { } func (m *Document) firstRead(reader *bufio.Reader) error { - if !m.seekable || m.CFormat != UNCOMPRESSED { - return m.readAll(reader) + if m.seekable { + return m.readFirstOnly(reader) } return m.readFirstOnly(reader) } @@ -281,6 +331,7 @@ func (m *Document) readAll(reader *bufio.Reader) error { chunk := m.chunks[len(m.chunks)-1] start := len(chunk.lines) for { + log.Println("loop?") if err := m.packChunk(chunk, reader, start, true); err != nil { return err } From ccf6261e22860fba1299e21f66881b972d0a63c8 Mon Sep 17 00:00:00 2001 From: Noboru Saito Date: Fri, 10 Mar 2023 16:16:43 +0900 Subject: [PATCH 03/11] WIP: update --- oviewer/document.go | 2 +- oviewer/reader.go | 89 +++++++++++++++++++-------------------------- 2 files changed, 38 insertions(+), 53 deletions(-) diff --git a/oviewer/document.go b/oviewer/document.go index 5f684358..b148636f 100644 --- a/oviewer/document.go +++ b/oviewer/document.go @@ -134,7 +134,7 @@ func NewDocument() (*Document, error) { MarkStyleWidth: 1, PlainMode: false, }, - ctlCh: make(chan controlSpecifier, 10), + ctlCh: make(chan controlSpecifier), seekable: true, preventReload: false, chunks: []*chunk{ diff --git a/oviewer/reader.go b/oviewer/reader.go index 7bcf4c4a..fd670100 100644 --- a/oviewer/reader.go +++ b/oviewer/reader.go @@ -48,12 +48,19 @@ func (m *Document) ControlFile(fileName string) error { switch sc.control { case firstControl: reader, err = m.firstControl(reader) + if !m.BufEOF() { + m.continueRead() + } case readControl: reader, err = m.readControl(reader) + if !m.BufEOF() { + m.continueRead() + } case followControl: reader, err = m.followControl(reader) case reloadControl: reader, err = m.reloadControl(reader) + m.startRead() case loadControl: reader, err = m.loadControl(reader, sc.chunkNum) case searchControl: @@ -74,14 +81,26 @@ func (m *Document) ControlFile(fileName string) error { return nil }() - m.ctlCh <- controlSpecifier{ - control: firstControl, - chunkNum: 0, - done: nil, - } + m.startRead() return nil } +func (m *Document) startRead() { + go func() { + m.ctlCh <- controlSpecifier{ + control: firstControl, + } + }() +} + +func (m *Document) continueRead() { + go func() { + m.ctlCh <- controlSpecifier{ + control: readControl, + } + }() +} + // ControlNonFile only supports reload. func (m *Document) ControlNonFile() error { go func() error { @@ -118,13 +137,6 @@ func (m *Document) firstControl(reader *bufio.Reader) (*bufio.Reader, error) { } reader = m.afterEOF(reader) } - if m.BufEOF() { - return reader, nil - } - m.ctlCh <- controlSpecifier{ - control: readControl, - } - return reader, nil } @@ -146,16 +158,6 @@ func (m *Document) readControl(reader *bufio.Reader) (*bufio.Reader, error) { } reader = m.afterEOF(reader) } - - if !m.BufEOF() { - chunk := NewChunk(m.size) - m.mu.Lock() - m.chunks = append(m.chunks, chunk) - m.mu.Unlock() - m.ctlCh <- controlSpecifier{ - control: readControl, - } - } return reader, nil } @@ -181,14 +183,17 @@ func (m *Document) followControl(reader *bufio.Reader) (*bufio.Reader, error) { // loadControl loads the read contents into chunks. func (m *Document) loadControl(reader *bufio.Reader, chunkNum int) (*bufio.Reader, error) { + // non-seekable files are all in memory, so loadControl should not be called. + if !m.seekable { + return nil, fmt.Errorf("cannot be loaded") + } + chunk := m.chunks[chunkNum] - start := 0 - if m.seekable { - if _, err := m.file.Seek(chunk.start, io.SeekStart); err != nil { - return nil, err - } - reader.Reset(m.file) + if _, err := m.file.Seek(chunk.start, io.SeekStart); err != nil { + return nil, err } + reader.Reset(m.file) + start := 0 if err := m.packChunk(chunk, reader, start, false); err != nil { if !errors.Is(err, io.EOF) { return nil, err @@ -229,14 +234,6 @@ func (m *Document) reloadControl(reader *bufio.Reader) (*bufio.Reader, error) { if err != nil { return nil, err } - err = m.firstRead(reader) - if err != nil { - if !errors.Is(err, io.EOF) { - return nil, err - } - reader = m.afterEOF(reader) - } - return reader, nil } @@ -258,6 +255,7 @@ func (m *Document) reloadFile(reader *bufio.Reader) (*bufio.Reader, error) { log.Printf("read: %s", err) } m.ClearCache() + atomic.StoreInt32(&m.eof, 0) r, err := m.openFile(m.FileName) if err != nil { str := fmt.Sprintf("Access is no longer possible: %s", err) @@ -319,10 +317,9 @@ func open(fileName string) (*os.File, error) { } func (m *Document) firstRead(reader *bufio.Reader) error { - if m.seekable { - return m.readFirstOnly(reader) - } - return m.readFirstOnly(reader) + chunk := m.chunks[0] + start := 0 + return m.packChunk(chunk, reader, start, true) } // readAll actually reads everything. @@ -331,7 +328,6 @@ func (m *Document) readAll(reader *bufio.Reader) error { chunk := m.chunks[len(m.chunks)-1] start := len(chunk.lines) for { - log.Println("loop?") if err := m.packChunk(chunk, reader, start, true); err != nil { return err } @@ -343,17 +339,6 @@ func (m *Document) readAll(reader *bufio.Reader) error { } } -// readAll actually reads everything. -// The read lines are stored in the lines of the Document. -func (m *Document) readFirstOnly(reader *bufio.Reader) error { - chunk := m.chunks[0] - start := 0 - if err := m.packChunk(chunk, reader, start, true); err != nil { - return err - } - return nil -} - // packChunk append lines read from reader into chunks. func (m *Document) packChunk(chunk *chunk, reader *bufio.Reader, start int, isCount bool) error { var line strings.Builder From 35ea3806473d60c62ae13176538ad08723f0a558 Mon Sep 17 00:00:00 2001 From: Noboru Saito Date: Sat, 11 Mar 2023 12:52:02 +0900 Subject: [PATCH 04/11] Added support for close --- oviewer/action.go | 4 +- oviewer/doclist.go | 4 +- oviewer/document.go | 23 +++++++++-- oviewer/reader.go | 98 ++++++++++++++++++++++++--------------------- 4 files changed, 74 insertions(+), 55 deletions(-) diff --git a/oviewer/action.go b/oviewer/action.go index 223afea4..17d128b0 100644 --- a/oviewer/action.go +++ b/oviewer/action.go @@ -89,9 +89,7 @@ func (root *Root) closeFile() { root.setMessage("already closed") return } - if err := root.Doc.close(); err != nil { - log.Printf("closeFile: %s", err) - } + root.Doc.closeControl() root.setMessagef("close file %s", root.Doc.FileName) log.Printf("close file %s", root.Doc.FileName) } diff --git a/oviewer/doclist.go b/oviewer/doclist.go index e757f7b5..04c23ec5 100644 --- a/oviewer/doclist.go +++ b/oviewer/doclist.go @@ -50,9 +50,7 @@ func (root *Root) closeDocument() { root.setMessagef("close [%d]%s", root.CurrentDoc, root.Doc.FileName) log.Printf("close [%d]%s", root.CurrentDoc, root.Doc.FileName) root.mu.Lock() - if err := root.DocList[root.CurrentDoc].close(); err != nil { - log.Printf("%s:%s", root.Doc.FileName, err) - } + root.DocList[root.CurrentDoc].closeControl() root.DocList = append(root.DocList[:root.CurrentDoc], root.DocList[root.CurrentDoc+1:]...) if root.CurrentDoc > 0 { root.CurrentDoc-- diff --git a/oviewer/document.go b/oviewer/document.go index b148636f..9e7e9f8a 100644 --- a/oviewer/document.go +++ b/oviewer/document.go @@ -99,6 +99,9 @@ type Document struct { // 1 if there is a closed. closed int32 + // 1 if there is a read cancel. + readCancel int32 + // WatchMode is watch mode. WatchMode bool // preventReload is true to prevent reload. @@ -219,7 +222,7 @@ func (m *Document) GetLine(n int) string { chunk := m.chunks[chunkNum] if len(chunk.lines) == 0 { - m.loadChunk(chunkNum) + m.loadControl(chunkNum) } m.mu.Lock() @@ -233,8 +236,8 @@ func (m *Document) GetLine(n int) string { return "" } -// loadChunk loads a chunk into memory. -func (m *Document) loadChunk(chunkNum int) { +// loadControl sends instructions to load chunks into memory. +func (m *Document) loadControl(chunkNum int) { sc := controlSpecifier{ control: loadControl, chunkNum: chunkNum, @@ -244,6 +247,20 @@ func (m *Document) loadChunk(chunkNum int) { <-sc.done } +func (m *Document) closeControl() { + atomic.StoreInt32(&m.readCancel, 1) + sc := controlSpecifier{ + control: closeControl, + done: make(chan struct{}), + } + + log.Println("close send") + m.ctlCh <- sc + <-sc.done + log.Println("receive done") + atomic.StoreInt32(&m.readCancel, 0) +} + // CurrentLN returns the currently displayed line number. func (m *Document) CurrentLN() int { return m.topLN diff --git a/oviewer/reader.go b/oviewer/reader.go index fd670100..e1e4c820 100644 --- a/oviewer/reader.go +++ b/oviewer/reader.go @@ -15,21 +15,21 @@ import ( ) type controlSpecifier struct { - chunkNum int done chan struct{} control control + chunkNum int } type control string const ( - firstControl = "first" - readControl = "read" - followControl = "follow" - closeControl = "close" - reloadControl = "reload" - loadControl = "load" - searchControl = "search" + firstControl = "first" + continueControl = "read" + followControl = "follow" + closeControl = "close" + reloadControl = "reload" + loadControl = "load" + searchControl = "search" ) const FormFeed = "\f" @@ -47,24 +47,28 @@ func (m *Document) ControlFile(fileName string) error { for sc := range m.ctlCh { switch sc.control { case firstControl: - reader, err = m.firstControl(reader) + reader, err = m.firstRead(reader) if !m.BufEOF() { - m.continueRead() + m.continueControl() } - case readControl: - reader, err = m.readControl(reader) + case continueControl: + reader, err = m.continueRead(reader) if !m.BufEOF() { - m.continueRead() + m.continueControl() } case followControl: - reader, err = m.followControl(reader) - case reloadControl: - reader, err = m.reloadControl(reader) - m.startRead() + reader, err = m.followRead(reader) case loadControl: - reader, err = m.loadControl(reader, sc.chunkNum) + reader, err = m.readChunk(reader, sc.chunkNum) case searchControl: - err = m.searchControl(reader, sc.chunkNum) + err = m.searchChunk(reader, sc.chunkNum) + case reloadControl: + reader, err = m.reloadRead(reader) + m.startControl() + case closeControl: + err = m.close() + log.Println(err) + err = nil default: panic(fmt.Sprintf("unexpected %s", sc.control)) } @@ -81,11 +85,11 @@ func (m *Document) ControlFile(fileName string) error { return nil }() - m.startRead() + m.startControl() return nil } -func (m *Document) startRead() { +func (m *Document) startControl() { go func() { m.ctlCh <- controlSpecifier{ control: firstControl, @@ -93,10 +97,10 @@ func (m *Document) startRead() { }() } -func (m *Document) continueRead() { +func (m *Document) continueControl() { go func() { m.ctlCh <- controlSpecifier{ - control: readControl, + control: continueControl, } }() } @@ -128,10 +132,12 @@ func (m *Document) ControlNonFile() error { return nil } -// firstControl is executed first. -// Open the file and read the first chunk. -func (m *Document) firstControl(reader *bufio.Reader) (*bufio.Reader, error) { - if err := m.firstRead(reader); err != nil { +// firstRead first reads the file. +// Packs the contents of the read file into the first chunk. +func (m *Document) firstRead(reader *bufio.Reader) (*bufio.Reader, error) { + chunk := m.chunks[0] + start := 0 + if err := m.packChunk(chunk, reader, start, true); err != nil { if !errors.Is(err, io.EOF) { return nil, err } @@ -140,9 +146,9 @@ func (m *Document) firstControl(reader *bufio.Reader) (*bufio.Reader, error) { return reader, nil } -// readControl is executed after the second +// continueRead is executed after the second // and only reads the file or counts the lines of the file. -func (m *Document) readControl(reader *bufio.Reader) (*bufio.Reader, error) { +func (m *Document) continueRead(reader *bufio.Reader) (*bufio.Reader, error) { if m.seekable { if _, err := m.file.Seek(m.offset, io.SeekStart); err != nil { return nil, err @@ -151,7 +157,7 @@ func (m *Document) readControl(reader *bufio.Reader) (*bufio.Reader, error) { } chunk := m.lastChunk() - start := 0 + start := len(chunk.lines) if err := m.readOrCountChunk(chunk, reader, start); err != nil { if !errors.Is(err, io.EOF) { return nil, err @@ -161,8 +167,8 @@ func (m *Document) readControl(reader *bufio.Reader) (*bufio.Reader, error) { return reader, nil } -// followControl reads lines added to the file while in follow-mode. -func (m *Document) followControl(reader *bufio.Reader) (*bufio.Reader, error) { +// followRead reads lines added to the file while in follow-mode. +func (m *Document) followRead(reader *bufio.Reader) (*bufio.Reader, error) { if !m.FollowMode && !m.FollowAll { return reader, nil } @@ -181,8 +187,8 @@ func (m *Document) followControl(reader *bufio.Reader) (*bufio.Reader, error) { return reader, nil } -// loadControl loads the read contents into chunks. -func (m *Document) loadControl(reader *bufio.Reader, chunkNum int) (*bufio.Reader, error) { +// readChunk loads the read contents into chunks. +func (m *Document) readChunk(reader *bufio.Reader, chunkNum int) (*bufio.Reader, error) { // non-seekable files are all in memory, so loadControl should not be called. if !m.seekable { return nil, fmt.Errorf("cannot be loaded") @@ -203,7 +209,7 @@ func (m *Document) loadControl(reader *bufio.Reader, chunkNum int) (*bufio.Reade return reader, nil } -func (m *Document) searchControl(reader *bufio.Reader, chunkNum int) error { +func (m *Document) searchChunk(reader *bufio.Reader, chunkNum int) error { chunk := m.chunks[chunkNum] start := 0 if m.seekable { @@ -221,8 +227,8 @@ func (m *Document) searchControl(reader *bufio.Reader, chunkNum int) error { return nil } -// reloadControl performs reload processing -func (m *Document) reloadControl(reader *bufio.Reader) (*bufio.Reader, error) { +// reloadRead performs reload processing +func (m *Document) reloadRead(reader *bufio.Reader) (*bufio.Reader, error) { if !m.WatchMode { m.reset() } else { @@ -239,7 +245,7 @@ func (m *Document) reloadControl(reader *bufio.Reader) (*bufio.Reader, error) { func (m *Document) readOrCountChunk(chunk *chunk, reader *bufio.Reader, start int) error { if !m.seekable { - log.Println("pack", len(m.chunks)) + //log.Println("pack", len(m.chunks)) return m.packChunk(chunk, reader, start, true) } log.Println("count", len(m.chunks)) @@ -316,16 +322,10 @@ func open(fileName string) (*os.File, error) { return f, nil } -func (m *Document) firstRead(reader *bufio.Reader) error { - chunk := m.chunks[0] - start := 0 - return m.packChunk(chunk, reader, start, true) -} - -// readAll actually reads everything. +// readAll reads to the end. // The read lines are stored in the lines of the Document. func (m *Document) readAll(reader *bufio.Reader) error { - chunk := m.chunks[len(m.chunks)-1] + chunk := m.lastChunk() start := len(chunk.lines) for { if err := m.packChunk(chunk, reader, start, true); err != nil { @@ -345,6 +345,9 @@ func (m *Document) packChunk(chunk *chunk, reader *bufio.Reader, start int, isCo var isPrefix bool i := start for i < ChunkSize { + if atomic.LoadInt32(&m.readCancel) == 1 { + break + } buf, err := reader.ReadSlice('\n') if err == bufio.ErrBufferFull { isPrefix = true @@ -455,6 +458,7 @@ func (m *Document) lastChunk() *chunk { // Regular files are reopened and reread increase. // The pipe will reset what it has read. func (m *Document) reload() error { + atomic.StoreInt32(&m.readCancel, 1) sc := controlSpecifier{ control: reloadControl, done: make(chan struct{}), @@ -462,6 +466,7 @@ func (m *Document) reload() error { log.Println("reload send") m.ctlCh <- sc <-sc.done + atomic.StoreInt32(&m.readCancel, 0) log.Println("receive done") if !m.WatchMode { m.topLN = 0 @@ -498,6 +503,7 @@ func (m *Document) close() error { if err := m.file.Close(); err != nil { return fmt.Errorf("close: %w", err) } + atomic.StoreInt32(&m.eof, 1) atomic.StoreInt32(&m.closed, 1) atomic.StoreInt32(&m.changed, 1) return nil From 34cb4fc08cb50315cc99f492ad3dbab60254d52f Mon Sep 17 00:00:00 2001 From: Noboru Saito Date: Sat, 11 Mar 2023 15:07:51 +0900 Subject: [PATCH 05/11] Fix to be able to reopen by reload --- oviewer/reader.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/oviewer/reader.go b/oviewer/reader.go index e1e4c820..a0408c5f 100644 --- a/oviewer/reader.go +++ b/oviewer/reader.go @@ -169,6 +169,9 @@ func (m *Document) continueRead(reader *bufio.Reader) (*bufio.Reader, error) { // followRead reads lines added to the file while in follow-mode. func (m *Document) followRead(reader *bufio.Reader) (*bufio.Reader, error) { + if m.checkClose() { + return reader, nil + } if !m.FollowMode && !m.FollowAll { return reader, nil } @@ -253,6 +256,7 @@ func (m *Document) readOrCountChunk(chunk *chunk, reader *bufio.Reader, start in } func (m *Document) reloadFile(reader *bufio.Reader) (*bufio.Reader, error) { + atomic.StoreInt32(&m.closed, 1) if !m.seekable { m.ClearCache() return reader, nil @@ -261,6 +265,7 @@ func (m *Document) reloadFile(reader *bufio.Reader) (*bufio.Reader, error) { log.Printf("read: %s", err) } m.ClearCache() + atomic.StoreInt32(&m.closed, 0) atomic.StoreInt32(&m.eof, 0) r, err := m.openFile(m.FileName) if err != nil { From 9b66687dc39170a56fe84a49b9b4f67317d67dcc Mon Sep 17 00:00:00 2001 From: Noboru Saito Date: Sat, 11 Mar 2023 21:08:41 +0900 Subject: [PATCH 06/11] Added a condition to close --- oviewer/action.go | 4 ++++ oviewer/document.go | 2 +- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/oviewer/action.go b/oviewer/action.go index 17d128b0..03517de1 100644 --- a/oviewer/action.go +++ b/oviewer/action.go @@ -89,6 +89,10 @@ func (root *Root) closeFile() { root.setMessage("already closed") return } + if root.Doc.seekable { + root.setMessage("cannnot close") + return + } root.Doc.closeControl() root.setMessagef("close file %s", root.Doc.FileName) log.Printf("close file %s", root.Doc.FileName) diff --git a/oviewer/document.go b/oviewer/document.go index 9e7e9f8a..60b440bb 100644 --- a/oviewer/document.go +++ b/oviewer/document.go @@ -221,7 +221,7 @@ func (m *Document) GetLine(n int) string { } chunk := m.chunks[chunkNum] - if len(chunk.lines) == 0 { + if len(chunk.lines) == 0 && atomic.LoadInt32(&m.closed) == 0 { m.loadControl(chunkNum) } From c2bf95341ae418f5d36d86d4f92b17c31b8899e0 Mon Sep 17 00:00:00 2001 From: Noboru Saito Date: Sun, 19 Mar 2023 09:17:17 +0900 Subject: [PATCH 07/11] Allow exec reload Recreated Exec and deprecated ExecCommand. Separate the control part. ControlNonFile integrated into ControlFile. --- go.mod | 6 +- go.sum | 17 ++--- main.go | 17 +---- oviewer/document.go | 12 +++- oviewer/exec.go | 124 +++++++++++++++++++++++++++++++----- oviewer/logdoc.go | 4 +- oviewer/move.go | 1 - oviewer/reader.go | 139 +++++++++++++++++++++++++---------------- oviewer/reader_test.go | 8 ++- 9 files changed, 230 insertions(+), 98 deletions(-) diff --git a/go.mod b/go.mod index 04f3ac2b..fea3ad62 100644 --- a/go.mod +++ b/go.mod @@ -9,14 +9,14 @@ require ( github.com/gdamore/tcell/v2 v2.6.0 github.com/hashicorp/golang-lru v0.5.4 github.com/jwalton/gchalk v1.3.0 - github.com/klauspost/compress v1.16.0 + github.com/klauspost/compress v1.16.3 github.com/mattn/go-runewidth v0.0.14 github.com/pierrec/lz4 v2.6.1+incompatible github.com/rivo/uniseg v0.4.4 github.com/spf13/cobra v1.6.1 github.com/spf13/viper v1.15.0 github.com/ulikunitz/xz v0.5.11 - golang.org/x/exp v0.0.0-20230304125523-9ff063c70017 + golang.org/x/exp v0.0.0-20230315142452-642cacee5cc0 golang.org/x/sync v0.1.0 golang.org/x/term v0.6.0 ) @@ -31,7 +31,7 @@ require ( github.com/mitchellh/mapstructure v1.5.0 // indirect github.com/pelletier/go-toml/v2 v2.0.7 // indirect github.com/rogpeppe/go-internal v1.9.0 // indirect - github.com/spf13/afero v1.9.4 // indirect + github.com/spf13/afero v1.9.5 // indirect github.com/spf13/cast v1.5.0 // indirect github.com/spf13/jwalterweatherman v1.1.0 // indirect github.com/spf13/pflag v1.0.5 // indirect diff --git a/go.sum b/go.sum index f2a2fc5d..c2f3c23c 100644 --- a/go.sum +++ b/go.sum @@ -144,8 +144,8 @@ github.com/jwalton/gchalk v1.3.0/go.mod h1:ytRlj60R9f7r53IAElbpq4lVuPOPNg2J4tJcC github.com/jwalton/go-supportscolor v1.1.0 h1:HsXFJdMPjRUAx8cIW6g30hVSFYaxh9yRQwEWgkAR7lQ= github.com/jwalton/go-supportscolor v1.1.0/go.mod h1:hFVUAZV2cWg+WFFC4v8pT2X/S2qUUBYMioBD9AINXGs= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= -github.com/klauspost/compress v1.16.0 h1:iULayQNOReoYUe+1qtKOqw9CwJv3aNQu8ivo7lw1HU4= -github.com/klauspost/compress v1.16.0/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= +github.com/klauspost/compress v1.16.3 h1:XuJt9zzcnaz6a16/OU53ZjWp/v7/42WcR5t2a0PcNQY= +github.com/klauspost/compress v1.16.3/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= github.com/kr/fs v0.1.0/go.mod h1:FFnZGqtBN9Gxj7eW1uZ42v5BccTP0vu6NEaFoC2HwRg= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0= @@ -180,8 +180,8 @@ github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFR github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8= github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs= github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= -github.com/spf13/afero v1.9.4 h1:Sd43wM1IWz/s1aVXdOBkjJvuP8UdyqioeE4AmM0QsBs= -github.com/spf13/afero v1.9.4/go.mod h1:iUV7ddyEEZPO5gA3zD4fJt6iStLlL+Lg4m2cihcDf8Y= +github.com/spf13/afero v1.9.5 h1:stMpOSZFs//0Lv29HduCmli3GUfpFoF3Y1Q/aXj/wVM= +github.com/spf13/afero v1.9.5/go.mod h1:UBogFpq8E9Hx+xc5CNTTEpTnuHVmXDwZcZcE1eb/UhQ= github.com/spf13/cast v1.5.0 h1:rj3WzYc11XZaIZMPKmwP96zkFEnnAmV8s6XbB2aY32w= github.com/spf13/cast v1.5.0/go.mod h1:SpXXQ5YoyJw6s3/6cMTQuxvgRl3PCJiyaX9p6b155UU= github.com/spf13/cobra v1.6.1 h1:o94oiPyS4KD1mPy2fmcYYHHfCxLqYjJOhGsCHFZtEzA= @@ -225,7 +225,7 @@ golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8U golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20210421170649-83a5a9bb288b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4= golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= -golang.org/x/crypto v0.0.0-20211108221036-ceb1ce70b4fa/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= +golang.org/x/crypto v0.0.0-20220722155217-630584e8d5aa/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190510132918-efd6b22b2522/go.mod h1:ZjyILWgesfNpC6sMxTJOJm9Kp84zZh5NQWvqDGG3Qr8= @@ -236,8 +236,8 @@ golang.org/x/exp v0.0.0-20191227195350-da58074b4299/go.mod h1:2RIsYlXP63K8oxa1u0 golang.org/x/exp v0.0.0-20200119233911-0405dc783f0a/go.mod h1:2RIsYlXP63K8oxa1u096TMicItID8zy7Y6sNkU49FU4= golang.org/x/exp v0.0.0-20200207192155-f17229e696bd/go.mod h1:J/WKrq2StrnmMY6+EHIKF9dgMWnmCNThgcyBT1FY9mM= golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6/go.mod h1:3jZMyOhIsHpP37uCMkUooju7aAi5cS1Q23tOzKc+0MU= -golang.org/x/exp v0.0.0-20230304125523-9ff063c70017 h1:3Ea9SZLCB0aRIhSEjM+iaGIlzzeDJdpi579El/YIhEE= -golang.org/x/exp v0.0.0-20230304125523-9ff063c70017/go.mod h1:CxIveKay+FTh1D0yPZemJVgC/95VzuuOLq5Qi4xnoYc= +golang.org/x/exp v0.0.0-20230315142452-642cacee5cc0 h1:pVgRXcIictcr+lBQIFeiwuwtDIs4eL21OuM9nyAADmo= +golang.org/x/exp v0.0.0-20230315142452-642cacee5cc0/go.mod h1:CxIveKay+FTh1D0yPZemJVgC/95VzuuOLq5Qi4xnoYc= golang.org/x/image v0.0.0-20190227222117-0694c2d4d067/go.mod h1:kZ7UVZpmo3dzQBMxlp+ypCbDeSB+sBbTgSJuh5dn5js= golang.org/x/image v0.0.0-20190802002840-cff245a6509b/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0= golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= @@ -293,6 +293,7 @@ golang.org/x/net v0.0.0-20201031054903-ff519b6c9102/go.mod h1:sp8m0HH+o8qH0wwXwY golang.org/x/net v0.0.0-20201209123823-ac852fbbde11/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= golang.org/x/net v0.0.0-20201224014010-6772e930b67b/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= +golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= @@ -350,6 +351,7 @@ golang.org/x/sys v0.0.0-20210119212857-b64e53b001e4/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20210220050731-9a76102bfb43/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210225134936-a50acf3fe073/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210309040221-94ec62e08169/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210423185535-09eb48e85fd7/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20211004093028-2c5d950f24ef/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= @@ -373,6 +375,7 @@ golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.4/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= golang.org/x/text v0.8.0 h1:57P1ETyNKtuIjB4SRd15iJxuhj8Gc416Y78H3qgMh68= diff --git a/main.go b/main.go index 5ace3a04..7b9a5579 100644 --- a/main.go +++ b/main.go @@ -3,9 +3,7 @@ package main import ( "errors" "fmt" - "log" "os" - "os/exec" "path/filepath" "github.com/noborus/ov/oviewer" @@ -142,23 +140,14 @@ func ExecCommand(args []string) error { if len(args) == 0 { return ErrNoArgument } - - command := exec.Command(args[0], args[1:]...) - ov, err := oviewer.ExecCommand(command) + cmd := oviewer.NewCommand(args) + ov, err := cmd.Exec() if err != nil { return err } defer func() { - if command == nil || command.Process == nil { - return - } - if err := command.Process.Kill(); err != nil { - log.Println(err) - } - if err := command.Wait(); err != nil { - log.Println(err) - } + cmd.Wait() }() ov.SetConfig(config) diff --git a/oviewer/document.go b/oviewer/document.go index 60b440bb..cfe22429 100644 --- a/oviewer/document.go +++ b/oviewer/document.go @@ -187,8 +187,12 @@ func OpenDocument(fileName string) (*Document, error) { m.seekable = false } + f, err := open(fileName) + if err != nil { + return nil, err + } m.FileName = fileName - if err := m.ControlFile(fileName); err != nil { + if err := m.ControlFile(f); err != nil { return nil, err } return m, nil @@ -203,7 +207,11 @@ func STDINDocument() (*Document, error) { m.seekable = false m.Caption = "(STDIN)" - if err := m.ControlFile(""); err != nil { + f, err := open("") + if err != nil { + return nil, err + } + if err := m.ControlFile(f); err != nil { return nil, err } return m, nil diff --git a/oviewer/exec.go b/oviewer/exec.go index c0ac582d..b92be8af 100644 --- a/oviewer/exec.go +++ b/oviewer/exec.go @@ -1,15 +1,120 @@ package oviewer import ( + "bufio" + "fmt" "io" "log" "os" "os/exec" + "strings" "sync/atomic" + "time" "golang.org/x/term" ) +type Command struct { + args []string + command *exec.Cmd + stdout io.Reader + stderr io.Reader + docout *Document + docerr *Document +} + +func NewCommand(args []string) *Command { + return &Command{ + args: args, + } +} + +func (cmd *Command) Exec() (*Root, error) { + cmd.command = exec.Command(cmd.args[0], cmd.args[1:]...) + var err error + cmd.docout, cmd.docerr, err = newOutErrDocument() + if err != nil { + return nil, err + } + + so, se, err := commandStart(cmd.command) + if err != nil { + return nil, err + } + + cmd.stdout = so + cmd.stderr = se + + cmd.docout.Caption = "(" + cmd.command.Args[0] + ")" + cmd.docout.FileName + atomic.StoreInt32(&cmd.docout.closed, 0) + atomic.StoreInt32(&cmd.docerr.closed, 0) + err = cmd.docout.ControlReader(so, cmd.Reload) + if err != nil { + log.Printf("%s", err) + } + cmd.docerr.Caption = "(" + cmd.command.Args[0] + ")" + cmd.docerr.FileName + err = cmd.docerr.ControlReader(se, cmd.stderrReload) + if err != nil { + log.Printf("%s", err) + } + return NewOviewer(cmd.docout, cmd.docerr) +} + +func (cmd *Command) Wait() { + if cmd.command == nil || cmd.command.Process == nil { + return + } + atomic.StoreInt32(&cmd.docout.closed, 1) + atomic.StoreInt32(&cmd.docerr.closed, 1) + if err := cmd.command.Process.Kill(); err != nil { + log.Println(err) + } + if err := cmd.command.Wait(); err != nil { + log.Println(err) + } +} + +func (cmd *Command) Reload() *bufio.Reader { + cmd.Wait() + if cmd.docout.WatchMode { + cmd.docout.appendFormFeed(cmd.docout.lastChunk()) + } else { + cmd.docout.reset() + } + cmd.command = exec.Command(cmd.args[0], cmd.args[1:]...) + so, se, err := commandStart(cmd.command) + if err != nil { + log.Println(err) + str := fmt.Sprintf("command error: %s", err) + reader := bufio.NewReader(strings.NewReader(str)) + return reader + } + cmd.stdout = so + cmd.stderr = se + + sc := controlSpecifier{ + control: reloadControl, + done: make(chan struct{}), + } + log.Println("stderr reload send") + cmd.docerr.ctlCh <- sc + <-sc.done + atomic.StoreInt32(&cmd.docerr.readCancel, 0) + log.Println("stderr receive done") + + return bufio.NewReader(so) +} + +func (cmd *Command) stderrReload() *bufio.Reader { + if !cmd.docout.WatchMode { + cmd.docerr.reset() + } else { + cmd.docerr.appendFormFeed(cmd.docerr.lastChunk()) + } + + return bufio.NewReader(cmd.stderr) +} + // ExecCommand return the structure of oviewer. // ExecCommand executes the command and opens stdout/stderr as document. func ExecCommand(command *exec.Cmd) (*Root, error) { @@ -18,20 +123,18 @@ func ExecCommand(command *exec.Cmd) (*Root, error) { return nil, err } - go finishCommand(docout, docerr) - so, se, err := commandStart(command) if err != nil { return nil, err } docout.Caption = "(" + command.Args[0] + ")" + docout.FileName - err = docout.ReadAll(so) + err = docout.ControlReader(so, nil) if err != nil { log.Printf("%s", err) } docerr.Caption = "(" + command.Args[0] + ")" + docerr.FileName - err = docerr.ReadAll(se) + err = docerr.ControlReader(se, nil) if err != nil { log.Printf("%s", err) } @@ -44,14 +147,12 @@ func newOutErrDocument() (*Document, *Document, error) { return nil, nil, err } docout.FileName = "STDOUT" - docout.preventReload = true docerr, err := NewDocument() if err != nil { return nil, nil, err } docerr.FileName = "STDERR" - docerr.preventReload = true return docout, docerr, nil } @@ -70,7 +171,9 @@ func commandStart(command *exec.Cmd) (io.Reader, io.Reader, error) { if STDOUTPIPE != nil { so = io.TeeReader(so, STDOUTPIPE) } - + if _, err := fmt.Fprintf(command.Stdout, "Time: %s\n", time.Now().Format(time.RFC3339)); err != nil { + log.Println(err) + } // STDERR errReader, err := command.StderrPipe() if err != nil { @@ -86,10 +189,3 @@ func commandStart(command *exec.Cmd) (io.Reader, io.Reader, error) { } return so, se, nil } - -func finishCommand(docout *Document, docerr *Document) { - atomic.StoreInt32(&docout.changed, 1) - atomic.StoreInt32(&docerr.changed, 1) - atomic.StoreInt32(&docout.closed, 1) - atomic.StoreInt32(&docerr.closed, 1) -} diff --git a/oviewer/logdoc.go b/oviewer/logdoc.go index a9d37626..e2bd2c7e 100644 --- a/oviewer/logdoc.go +++ b/oviewer/logdoc.go @@ -2,6 +2,7 @@ package oviewer import ( "log" + "sync/atomic" ) // NewLogDoc generates a document for log. @@ -15,7 +16,8 @@ func NewLogDoc() (*Document, error) { m.Caption = "Log" m.seekable = false log.SetOutput(m) - if err := m.ControlNonFile(); err != nil { + atomic.StoreInt32(&m.closed, 1) + if err := m.ControlFile(nil); err != nil { return nil, err } return m, nil diff --git a/oviewer/move.go b/oviewer/move.go index 2fdb0bec..9c52ab92 100644 --- a/oviewer/move.go +++ b/oviewer/move.go @@ -202,7 +202,6 @@ func (root *Root) moveUpN(n int) { m := root.Doc if m.topLN <= m.startNum && m.topLX == 0 { - root.setMessage("Out of bounds") return } diff --git a/oviewer/reader.go b/oviewer/reader.go index a0408c5f..905b44a7 100644 --- a/oviewer/reader.go +++ b/oviewer/reader.go @@ -35,15 +35,40 @@ const ( const FormFeed = "\f" // ControlFile controls file read and loads in chunks. -func (m *Document) ControlFile(fileName string) error { +// ControlFile can be reloaded by file name. +func (m *Document) ControlFile(file *os.File) error { go func() error { - r, err := m.openFile(fileName) + atomic.StoreInt32(&m.closed, 0) + r, err := m.openFile(file) if err != nil { + atomic.StoreInt32(&m.closed, 1) log.Println(err) - return err } atomic.StoreInt32(&m.eof, 0) reader := bufio.NewReader(r) + for sc := range m.ctlCh { + err := m.control(sc, reader) + if err != nil { + log.Println(err) + } + if sc.done != nil { + close(sc.done) + } + } + log.Println("close m.ctlCh") + return nil + }() + + m.startControl() + return nil +} + +// controlreader is the controller for io.Reader. +// Assuming call from Exec. reload executes the argument function. +func (m *Document) ControlReader(r io.Reader, reload func() *bufio.Reader) error { + reader := bufio.NewReader(r) + go func() error { + var err error for sc := range m.ctlCh { switch sc.control { case firstControl: @@ -56,32 +81,21 @@ func (m *Document) ControlFile(fileName string) error { if !m.BufEOF() { m.continueControl() } - case followControl: - reader, err = m.followRead(reader) - case loadControl: - reader, err = m.readChunk(reader, sc.chunkNum) - case searchControl: - err = m.searchChunk(reader, sc.chunkNum) case reloadControl: - reader, err = m.reloadRead(reader) + log.Println("reset") + reader = reload() m.startControl() - case closeControl: - err = m.close() - log.Println(err) - err = nil default: panic(fmt.Sprintf("unexpected %s", sc.control)) } - - if sc.done != nil { - close(sc.done) - } if err != nil { log.Println(err) - close(m.ctlCh) + } + if sc.done != nil { + close(sc.done) } } - log.Println("close m.ctlCh") + log.Println("close ctlCh") return nil }() @@ -89,6 +103,42 @@ func (m *Document) ControlFile(fileName string) error { return nil } +func (m *Document) control(sc controlSpecifier, reader *bufio.Reader) error { + if atomic.LoadInt32(&m.closed) == 1 && sc.control != reloadControl { + return fmt.Errorf("closed %s", sc.control) + } + + var err error + switch sc.control { + case firstControl: + reader, err = m.firstRead(reader) + if !m.BufEOF() { + m.continueControl() + } + case continueControl: + reader, err = m.continueRead(reader) + if !m.BufEOF() { + m.continueControl() + } + case followControl: + reader, err = m.followRead(reader) + case loadControl: + reader, err = m.readChunk(reader, sc.chunkNum) + case searchControl: + err = m.searchChunk(reader, sc.chunkNum) + case reloadControl: + reader, err = m.reloadRead(reader) + m.startControl() + case closeControl: + err = m.close() + log.Println(err) + err = nil + default: + panic(fmt.Sprintf("unexpected %s", sc.control)) + } + return nil +} + func (m *Document) startControl() { go func() { m.ctlCh <- controlSpecifier{ @@ -105,33 +155,6 @@ func (m *Document) continueControl() { }() } -// ControlNonFile only supports reload. -func (m *Document) ControlNonFile() error { - go func() error { - var err error - for sc := range m.ctlCh { - switch sc.control { - case reloadControl: - m.reset() - case loadControl: - chunk := m.chunks[sc.chunkNum] - chunk.lines = make([]string, ChunkSize) - default: - panic(fmt.Sprintf("unexpected %s", sc.control)) - } - if err != nil { - log.Println(err) - } - if sc.done != nil { - close(sc.done) - } - } - log.Println("close ctlCh") - return nil - }() - return nil -} - // firstRead first reads the file. // Packs the contents of the read file into the first chunk. func (m *Document) firstRead(reader *bufio.Reader) (*bufio.Reader, error) { @@ -139,6 +162,7 @@ func (m *Document) firstRead(reader *bufio.Reader) (*bufio.Reader, error) { start := 0 if err := m.packChunk(chunk, reader, start, true); err != nil { if !errors.Is(err, io.EOF) { + atomic.StoreInt32(&m.eof, 1) return nil, err } reader = m.afterEOF(reader) @@ -151,6 +175,7 @@ func (m *Document) firstRead(reader *bufio.Reader) (*bufio.Reader, error) { func (m *Document) continueRead(reader *bufio.Reader) (*bufio.Reader, error) { if m.seekable { if _, err := m.file.Seek(m.offset, io.SeekStart); err != nil { + atomic.StoreInt32(&m.eof, 1) return nil, err } reader.Reset(m.file) @@ -267,7 +292,13 @@ func (m *Document) reloadFile(reader *bufio.Reader) (*bufio.Reader, error) { m.ClearCache() atomic.StoreInt32(&m.closed, 0) atomic.StoreInt32(&m.eof, 0) - r, err := m.openFile(m.FileName) + f, err := open(m.FileName) + if err != nil { + str := fmt.Sprintf("Access is no longer possible: %s", err) + reader = bufio.NewReader(strings.NewReader(str)) + return reader, nil + } + r, err := m.openFile(f) if err != nil { str := fmt.Sprintf("Access is no longer possible: %s", err) reader = bufio.NewReader(strings.NewReader(str)) @@ -286,11 +317,7 @@ func (m *Document) afterEOF(reader *bufio.Reader) *bufio.Reader { return reader } -func (m *Document) openFile(fileName string) (io.Reader, error) { - f, err := open(fileName) - if err != nil { - return nil, err - } +func (m *Document) openFile(f *os.File) (io.Reader, error) { m.mu.Lock() m.file = f @@ -519,7 +546,11 @@ func (m *Document) close() error { // // Deprecated: func (m *Document) ReadFile(fileName string) error { - r, err := m.openFile(fileName) + f, err := open(fileName) + if err != nil { + return err + } + r, err := m.openFile(f) if err != nil { return err } diff --git a/oviewer/reader_test.go b/oviewer/reader_test.go index 51a9f53a..e153fdff 100644 --- a/oviewer/reader_test.go +++ b/oviewer/reader_test.go @@ -160,8 +160,12 @@ func TestDocument_reload(t *testing.T) { m.FileName = tt.fields.FileName m.WatchMode = tt.fields.WatchMode m.seekable = tt.fields.seekable - if err := m.ControlFile(tt.fields.FileName); err != nil { - t.Fatal("ControlFile error") + f, err := open(tt.fields.FileName) + if err != nil { + t.Fatal("open error", tt.fields.FileName) + } + if err := m.ControlFile(f); err != nil { + t.Fatal("ControlFile error", tt.fields.FileName) } if err := m.reload(); (err != nil) != tt.wantErr { t.Errorf("Document.reload() error = %v, wantErr %v", err, tt.wantErr) From bd35a9e79adf5ae85639ab8a561f2e4b5cfcd919 Mon Sep 17 00:00:00 2001 From: Noboru Saito Date: Mon, 20 Mar 2023 14:48:03 +0900 Subject: [PATCH 08/11] Add follow-name Follow the file name even if the file descriptor changes. --- main.go | 3 +++ oviewer/draw.go | 3 +++ oviewer/oviewer.go | 11 +++++++++++ oviewer/reader.go | 12 +++++++----- 4 files changed, 24 insertions(+), 5 deletions(-) diff --git a/main.go b/main.go index 7b9a5579..f0eacceb 100644 --- a/main.go +++ b/main.go @@ -265,6 +265,9 @@ func init() { rootCmd.PersistentFlags().BoolP("follow-section", "", false, "follow section") _ = viper.BindPFlag("general.FollowSection", rootCmd.PersistentFlags().Lookup("follow-section")) + rootCmd.PersistentFlags().BoolP("follow-name", "", false, "follow name mode") + _ = viper.BindPFlag("general.FollowName", rootCmd.PersistentFlags().Lookup("follow-name")) + rootCmd.PersistentFlags().IntP("watch", "T", 0, "watch mode interval") _ = viper.BindPFlag("general.WatchInterval", rootCmd.PersistentFlags().Lookup("watch")) diff --git a/oviewer/draw.go b/oviewer/draw.go index 50742f2c..8151991f 100644 --- a/oviewer/draw.go +++ b/oviewer/draw.go @@ -383,6 +383,9 @@ func (root *Root) normalLeftStatus() (contents, int) { if root.General.FollowAll { modeStatus = "(Follow All)" } + if root.Doc.FollowName { + modeStatus = "(Follow Name)" + } // Watch mode doubles as FollowSection mode. if root.Doc.WatchMode { modeStatus += "(Watch)" diff --git a/oviewer/oviewer.go b/oviewer/oviewer.go index 06fa4c71..918dc736 100644 --- a/oviewer/oviewer.go +++ b/oviewer/oviewer.go @@ -162,6 +162,8 @@ type general struct { FollowAll bool // FollowSection is a follow mode that uses section instead of line. FollowSection bool + // FollowName is the mode to follow files by name. + FollowName bool // PlainMode is whether to enable the original character decoration. PlainMode bool } @@ -481,6 +483,9 @@ func (root *Root) SetWatcher(watcher *fsnotify.Watcher) { log.Println("???", len(doc.ctlCh)) } case fsnotify.Remove, fsnotify.Create: + if !doc.FollowName { + continue + } select { case doc.ctlCh <- controlSpecifier{control: reloadControl}: log.Printf("notify send %v", event) @@ -569,6 +574,9 @@ func (root *Root) Run() error { doc.general = root.Config.General doc.regexpCompile() + if doc.FollowName { + doc.FollowMode = true + } w := "" if doc.general.WatchInterval > 0 { doc.watchMode() @@ -771,6 +779,9 @@ func mergeGeneral(src general, dst general) general { if dst.FollowSection { src.FollowSection = dst.FollowSection } + if dst.FollowName { + src.FollowName = dst.FollowName + } if dst.ColumnDelimiter != "" { src.ColumnDelimiter = dst.ColumnDelimiter } diff --git a/oviewer/reader.go b/oviewer/reader.go index 905b44a7..616419a8 100644 --- a/oviewer/reader.go +++ b/oviewer/reader.go @@ -47,7 +47,7 @@ func (m *Document) ControlFile(file *os.File) error { atomic.StoreInt32(&m.eof, 0) reader := bufio.NewReader(r) for sc := range m.ctlCh { - err := m.control(sc, reader) + reader, err = m.control(sc, reader) if err != nil { log.Println(err) } @@ -103,9 +103,9 @@ func (m *Document) ControlReader(r io.Reader, reload func() *bufio.Reader) error return nil } -func (m *Document) control(sc controlSpecifier, reader *bufio.Reader) error { +func (m *Document) control(sc controlSpecifier, reader *bufio.Reader) (*bufio.Reader, error) { if atomic.LoadInt32(&m.closed) == 1 && sc.control != reloadControl { - return fmt.Errorf("closed %s", sc.control) + return nil, fmt.Errorf("closed %s", sc.control) } var err error @@ -128,6 +128,7 @@ func (m *Document) control(sc controlSpecifier, reader *bufio.Reader) error { err = m.searchChunk(reader, sc.chunkNum) case reloadControl: reader, err = m.reloadRead(reader) + log.Println("start") m.startControl() case closeControl: err = m.close() @@ -136,7 +137,7 @@ func (m *Document) control(sc controlSpecifier, reader *bufio.Reader) error { default: panic(fmt.Sprintf("unexpected %s", sc.control)) } - return nil + return reader, nil } func (m *Document) startControl() { @@ -287,11 +288,12 @@ func (m *Document) reloadFile(reader *bufio.Reader) (*bufio.Reader, error) { return reader, nil } if err := m.file.Close(); err != nil { - log.Printf("read: %s", err) + log.Printf("reload: %s", err) } m.ClearCache() atomic.StoreInt32(&m.closed, 0) atomic.StoreInt32(&m.eof, 0) + log.Println("reload", m.FileName) f, err := open(m.FileName) if err != nil { str := fmt.Sprintf("Access is no longer possible: %s", err) From a8f2687e5603fc38a4b8f64539abc97a06a19fb8 Mon Sep 17 00:00:00 2001 From: Noboru Saito Date: Mon, 20 Mar 2023 18:06:11 +0900 Subject: [PATCH 09/11] Fixed wrong position of close flag --- oviewer/reader.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/oviewer/reader.go b/oviewer/reader.go index 616419a8..328e34f5 100644 --- a/oviewer/reader.go +++ b/oviewer/reader.go @@ -282,15 +282,16 @@ func (m *Document) readOrCountChunk(chunk *chunk, reader *bufio.Reader, start in } func (m *Document) reloadFile(reader *bufio.Reader) (*bufio.Reader, error) { - atomic.StoreInt32(&m.closed, 1) if !m.seekable { m.ClearCache() return reader, nil } + atomic.StoreInt32(&m.closed, 1) if err := m.file.Close(); err != nil { log.Printf("reload: %s", err) } m.ClearCache() + atomic.StoreInt32(&m.closed, 0) atomic.StoreInt32(&m.eof, 0) log.Println("reload", m.FileName) From c9e0b671dd07f393cfabeb24d0ca99c08b12df5a Mon Sep 17 00:00:00 2001 From: Noboru Saito Date: Tue, 21 Mar 2023 04:11:26 +0900 Subject: [PATCH 10/11] Fixed a race condition log needed a separate controller. --- oviewer/logdoc.go | 2 +- oviewer/reader.go | 19 +++++++++++++++++++ 2 files changed, 20 insertions(+), 1 deletion(-) diff --git a/oviewer/logdoc.go b/oviewer/logdoc.go index e2bd2c7e..d71146cc 100644 --- a/oviewer/logdoc.go +++ b/oviewer/logdoc.go @@ -17,7 +17,7 @@ func NewLogDoc() (*Document, error) { m.seekable = false log.SetOutput(m) atomic.StoreInt32(&m.closed, 1) - if err := m.ControlFile(nil); err != nil { + if err := m.ControlLog(); err != nil { return nil, err } return m, nil diff --git a/oviewer/reader.go b/oviewer/reader.go index 328e34f5..f3faaaeb 100644 --- a/oviewer/reader.go +++ b/oviewer/reader.go @@ -63,6 +63,25 @@ func (m *Document) ControlFile(file *os.File) error { return nil } +func (m *Document) ControlLog() error { + go func() error { + for sc := range m.ctlCh { + switch sc.control { + case reloadControl: + m.reset() + default: + panic(fmt.Sprintf("unexpected %s", sc.control)) + } + if sc.done != nil { + close(sc.done) + } + } + log.Println("close m.ctlCh") + return nil + }() + return nil +} + // controlreader is the controller for io.Reader. // Assuming call from Exec. reload executes the argument function. func (m *Document) ControlReader(r io.Reader, reload func() *bufio.Reader) error { From 8c1f729d73d796bd48c02d42bb1adf8a0751b92f Mon Sep 17 00:00:00 2001 From: Noboru Saito Date: Wed, 22 Mar 2023 10:05:55 +0900 Subject: [PATCH 11/11] Cleaned up the debug output Cleaned up the debug output and cleaned up the code. Added openFileReader to avoid duplication --- oviewer/oviewer.go | 8 ++++---- oviewer/reader.go | 35 ++++++++++++++++------------------- 2 files changed, 20 insertions(+), 23 deletions(-) diff --git a/oviewer/oviewer.go b/oviewer/oviewer.go index 918dc736..6d5f709a 100644 --- a/oviewer/oviewer.go +++ b/oviewer/oviewer.go @@ -478,9 +478,9 @@ func (root *Root) SetWatcher(watcher *fsnotify.Watcher) { case fsnotify.Write: select { case doc.ctlCh <- controlSpecifier{control: followControl}: - log.Printf("notify send %v", event) + root.debugMessage(fmt.Sprintf("notify send %v", event)) default: - log.Println("???", len(doc.ctlCh)) + root.debugMessage(fmt.Sprintf("notify send fail %d", len(doc.ctlCh))) } case fsnotify.Remove, fsnotify.Create: if !doc.FollowName { @@ -488,9 +488,9 @@ func (root *Root) SetWatcher(watcher *fsnotify.Watcher) { } select { case doc.ctlCh <- controlSpecifier{control: reloadControl}: - log.Printf("notify send %v", event) + root.debugMessage(fmt.Sprintf("notify send %v", event)) default: - log.Println("???", len(doc.ctlCh)) + root.debugMessage(fmt.Sprintf("notify send fail %d", len(doc.ctlCh))) } } diff --git a/oviewer/reader.go b/oviewer/reader.go index f3faaaeb..aa9f00b1 100644 --- a/oviewer/reader.go +++ b/oviewer/reader.go @@ -39,7 +39,7 @@ const FormFeed = "\f" func (m *Document) ControlFile(file *os.File) error { go func() error { atomic.StoreInt32(&m.closed, 0) - r, err := m.openFile(file) + r, err := m.fileReader(file) if err != nil { atomic.StoreInt32(&m.closed, 1) log.Println(err) @@ -147,7 +147,6 @@ func (m *Document) control(sc controlSpecifier, reader *bufio.Reader) (*bufio.Re err = m.searchChunk(reader, sc.chunkNum) case reloadControl: reader, err = m.reloadRead(reader) - log.Println("start") m.startControl() case closeControl: err = m.close() @@ -293,10 +292,8 @@ func (m *Document) reloadRead(reader *bufio.Reader) (*bufio.Reader, error) { func (m *Document) readOrCountChunk(chunk *chunk, reader *bufio.Reader, start int) error { if !m.seekable { - //log.Println("pack", len(m.chunks)) return m.packChunk(chunk, reader, start, true) } - log.Println("count", len(m.chunks)) return m.countChunk(chunk, reader, start) } @@ -314,13 +311,7 @@ func (m *Document) reloadFile(reader *bufio.Reader) (*bufio.Reader, error) { atomic.StoreInt32(&m.closed, 0) atomic.StoreInt32(&m.eof, 0) log.Println("reload", m.FileName) - f, err := open(m.FileName) - if err != nil { - str := fmt.Sprintf("Access is no longer possible: %s", err) - reader = bufio.NewReader(strings.NewReader(str)) - return reader, nil - } - r, err := m.openFile(f) + r, err := m.openFileReader(m.FileName) if err != nil { str := fmt.Sprintf("Access is no longer possible: %s", err) reader = bufio.NewReader(strings.NewReader(str)) @@ -339,7 +330,7 @@ func (m *Document) afterEOF(reader *bufio.Reader) *bufio.Reader { return reader } -func (m *Document) openFile(f *os.File) (io.Reader, error) { +func (m *Document) fileReader(f *os.File) (io.Reader, error) { m.mu.Lock() m.file = f @@ -361,6 +352,18 @@ func (m *Document) openFile(f *os.File) (io.Reader, error) { return r, nil } +func (m *Document) openFileReader(fileName string) (io.Reader, error) { + f, err := open(fileName) + if err != nil { + return nil, err + } + r, err := m.fileReader(f) + if err != nil { + return nil, err + } + return r, nil +} + func open(fileName string) (*os.File, error) { if fileName == "" { if term.IsTerminal(0) { @@ -517,11 +520,9 @@ func (m *Document) reload() error { control: reloadControl, done: make(chan struct{}), } - log.Println("reload send") m.ctlCh <- sc <-sc.done atomic.StoreInt32(&m.readCancel, 0) - log.Println("receive done") if !m.WatchMode { m.topLN = 0 } @@ -568,11 +569,7 @@ func (m *Document) close() error { // // Deprecated: func (m *Document) ReadFile(fileName string) error { - f, err := open(fileName) - if err != nil { - return err - } - r, err := m.openFile(f) + r, err := m.openFileReader(fileName) if err != nil { return err }