Skip to content

Commit

Permalink
Reworked/simplified tracebd.py a bit
Browse files Browse the repository at this point in the history
Instead of trying to align to block-boundaries tracebd.py now just
aliases to whatever dimensions are provided.

Also reworked how scripts handle default sizing. Now using reasonable
defaults with 0 being a placeholder for automatic sizing. The addition
of -z/--cat makes it possible to pipe directly to stdout.

Also added support for dots/braille output which can capture more
detail, though care needs to be taken to not rely on accurate coloring.
  • Loading branch information
geky committed Nov 15, 2022
1 parent fb58148 commit 42d889e
Show file tree
Hide file tree
Showing 3 changed files with 738 additions and 490 deletions.
127 changes: 85 additions & 42 deletions scripts/plot.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,61 @@ def openio(path, mode='r'):
else:
return open(path, mode)

class LinesIO:
def __init__(self, maxlen=None):
self.maxlen = maxlen
self.lines = co.deque(maxlen=maxlen)
self.tail = io.StringIO()

# trigger automatic sizing
if maxlen == 0:
self.resize(0)

def write(self, s):
# note using split here ensures the trailing string has no newline
lines = s.split('\n')

if len(lines) > 1 and self.tail.getvalue():
self.tail.write(lines[0])
lines[0] = self.tail.getvalue()
self.tail = io.StringIO()

self.lines.extend(lines[:-1])

if lines[-1]:
self.tail.write(lines[-1])

def resize(self, maxlen):
self.maxlen = maxlen
if maxlen == 0:
maxlen = shutil.get_terminal_size((80, 5))[1]
if maxlen != self.lines.maxlen:
self.lines = co.deque(self.lines, maxlen=maxlen)

last_lines = 1
def draw(self):
# did terminal size change?
if self.maxlen == 0:
self.resize(0)

# first thing first, give ourself a canvas
while LinesIO.last_lines < len(self.lines):
sys.stdout.write('\n')
LinesIO.last_lines += 1

for j, line in enumerate(self.lines):
# move cursor, clear line, disable/reenable line wrapping
sys.stdout.write('\r')
if len(self.lines)-1-j > 0:
sys.stdout.write('\x1b[%dA' % (len(self.lines)-1-j))
sys.stdout.write('\x1b[K')
sys.stdout.write('\x1b[?7l')
sys.stdout.write(line)
sys.stdout.write('\x1b[?7h')
if len(self.lines)-1-j > 0:
sys.stdout.write('\x1b[%dB' % (len(self.lines)-1-j))
sys.stdout.flush()


# parse different data representations
def dat(x):
Expand All @@ -114,6 +169,7 @@ def dat(x):
# else give up
raise ValueError("invalid dat %r" % x)


# a hack log10 that preserves sign, and passes zero as zero
def slog10(x):
if x == 0:
Expand All @@ -123,7 +179,6 @@ def slog10(x):
else:
return -m.log10(-x)


class Plot:
def __init__(self, width, height, *,
xlim=None,
Expand Down Expand Up @@ -427,7 +482,8 @@ def main(csv_paths, *,
xlim=None,
ylim=None,
width=None,
height=None,
height=17,
cat=False,
color=False,
braille=False,
colors=None,
Expand Down Expand Up @@ -538,10 +594,12 @@ def writeln(s=''):
if v is not None))))

# figure out our plot size
if width is not None:
if width is None:
width_ = min(80, shutil.get_terminal_size((80, 17))[0])
elif width:
width_ = width
else:
width_ = shutil.get_terminal_size((80, 8))[0]
width_ = shutil.get_terminal_size((80, 17))[0]
# make space for units
width_ -= 5
# make space for legend
Expand All @@ -550,10 +608,10 @@ def writeln(s=''):
# limit a bit
width_ = max(2*4, width_)

if height is not None:
if height:
height_ = height
else:
height_ = shutil.get_terminal_size((80, 8))[1]
height_ = shutil.get_terminal_size((80, 17))[1]
# make space for shell prompt
if not keep_open:
height_ -= 1
Expand Down Expand Up @@ -644,45 +702,26 @@ def writeln(s=''):
'\x1b[m' if color else '')
for j in range(i, min(i+legend_cols, len(legend_))))))


last_lines = 1
def redraw():
nonlocal last_lines

canvas = io.StringIO()
draw(canvas)
canvas = canvas.getvalue().splitlines()

# give ourself a canvas
while last_lines < len(canvas):
sys.stdout.write('\n')
last_lines += 1

for i, line in enumerate(canvas):
jump = len(canvas)-1-i
# move cursor, clear line, disable/reenable line wrapping
sys.stdout.write('\r')
if jump > 0:
sys.stdout.write('\x1b[%dA' % jump)
sys.stdout.write('\x1b[K')
sys.stdout.write('\x1b[?7l')
sys.stdout.write(line)
sys.stdout.write('\x1b[?7h')
if jump > 0:
sys.stdout.write('\x1b[%dB' % jump)

sys.stdout.flush()

if keep_open:
try:
while True:
redraw()
if cat:
draw(sys.stdout)
else:
ring = LinesIO()
draw(ring)
ring.draw()
# don't just flood open calls
time.sleep(sleep or 0.1)
except KeyboardInterrupt:
pass

redraw()
if cat:
draw(sys.stdout)
else:
ring = LinesIO()
draw(ring)
ring.draw()
sys.stdout.write('\n')
else:
draw(sys.stdout)
Expand Down Expand Up @@ -726,9 +765,9 @@ def redraw():
default='auto',
help="When to use terminal colors. Defaults to 'auto'.")
parser.add_argument(
'--braille',
'-⣿', '--braille',
action='store_true',
help="Use unicode braille characters. Note that braille characters "
help="Use 2x4 unicode braille characters. Note that braille characters "
"sometimes suffer from inconsistent widths.")
parser.add_argument(
'--colors',
Expand All @@ -747,12 +786,16 @@ def redraw():
parser.add_argument(
'-W', '--width',
type=lambda x: int(x, 0),
help="Width in columns. A width of 0 indicates no limit. Defaults "
"to terminal width or 80.")
help="Width in columns. 0 uses the terminal width. Defaults to "
"min(terminal, 80).")
parser.add_argument(
'-H', '--height',
type=lambda x: int(x, 0),
help="Height in rows. Defaults to terminal height or 8.")
help="Height in rows. 0 uses the terminal height. Defaults to 17.")
parser.add_argument(
'-z', '--cat',
action='store_true',
help="Pipe directly to stdout.")
parser.add_argument(
'-X', '--xlim',
type=lambda x: tuple(dat(x) if x else None for x in x.split(',')),
Expand Down
145 changes: 84 additions & 61 deletions scripts/tailpipe.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,11 @@
# SPDX-License-Identifier: BSD-3-Clause
#

import collections as co
import io
import os
import shutil
import sys
import threading as th
import time


Expand All @@ -24,71 +26,89 @@ def openio(path, mode='r'):
else:
return open(path, mode)

def main(path='-', *, lines=1, sleep=0.01, keep_open=False):
ring = [None] * lines
i = 0
count = 0
lock = th.Lock()
event = th.Event()
done = False

# do the actual reading in a background thread
def read():
nonlocal i
nonlocal count
nonlocal done
class LinesIO:
def __init__(self, maxlen=None):
self.maxlen = maxlen
self.lines = co.deque(maxlen=maxlen)
self.tail = io.StringIO()

# trigger automatic sizing
if maxlen == 0:
self.resize(0)

def write(self, s):
# note using split here ensures the trailing string has no newline
lines = s.split('\n')

if len(lines) > 1 and self.tail.getvalue():
self.tail.write(lines[0])
lines[0] = self.tail.getvalue()
self.tail = io.StringIO()

self.lines.extend(lines[:-1])

if lines[-1]:
self.tail.write(lines[-1])

def resize(self, maxlen):
self.maxlen = maxlen
if maxlen == 0:
maxlen = shutil.get_terminal_size((80, 5))[1]
if maxlen != self.lines.maxlen:
self.lines = co.deque(self.lines, maxlen=maxlen)

last_lines = 1
def draw(self):
# did terminal size change?
if self.maxlen == 0:
self.resize(0)

# first thing first, give ourself a canvas
while LinesIO.last_lines < len(self.lines):
sys.stdout.write('\n')
LinesIO.last_lines += 1

for j, line in enumerate(self.lines):
# move cursor, clear line, disable/reenable line wrapping
sys.stdout.write('\r')
if len(self.lines)-1-j > 0:
sys.stdout.write('\x1b[%dA' % (len(self.lines)-1-j))
sys.stdout.write('\x1b[K')
sys.stdout.write('\x1b[?7l')
sys.stdout.write(line)
sys.stdout.write('\x1b[?7h')
if len(self.lines)-1-j > 0:
sys.stdout.write('\x1b[%dB' % (len(self.lines)-1-j))
sys.stdout.flush()


def main(path='-', *, lines=5, cat=False, sleep=0.01, keep_open=False):
if cat:
ring = sys.stdout
else:
ring = LinesIO(lines)

ptime = time.time()
try:
while True:
with openio(path) as f:
for line in f:
with lock:
ring[i] = line
i = (i + 1) % lines
count = min(lines, count + 1)
event.set()
ring.write(line)

# need to redraw?
if not cat and time.time()-ptime >= sleep:
ring.draw()
ptime = time.time()

if not keep_open:
break
# don't just flood open calls
time.sleep(sleep or 0.1)
done = True

th.Thread(target=read, daemon=True).start()

try:
last_count = 1
while not done:
time.sleep(sleep)
event.wait()
event.clear()

# create a copy to avoid corrupt output
with lock:
ring_ = ring.copy()
i_ = i
count_ = count

# first thing first, give ourself a canvas
while last_count < count_:
sys.stdout.write('\n')
last_count += 1

for j in range(count_):
# move cursor, clear line, disable/reenable line wrapping
sys.stdout.write('\r')
if count_-1-j > 0:
sys.stdout.write('\x1b[%dA' % (count_-1-j))
sys.stdout.write('\x1b[K')
sys.stdout.write('\x1b[?7l')
sys.stdout.write(ring_[(i_-count_+j) % lines][:-1])
sys.stdout.write('\x1b[?7h')
if count_-1-j > 0:
sys.stdout.write('\x1b[%dB' % (count_-1-j))

sys.stdout.flush()

except KeyboardInterrupt:
pass

sys.stdout.write('\n')
if not cat:
sys.stdout.write('\n')


if __name__ == "__main__":
Expand All @@ -104,15 +124,18 @@ def read():
'-n',
'--lines',
type=lambda x: int(x, 0),
help="Number of lines to show. Defaults to 1.")
help="Show this many lines of history. 0 uses the terminal height. "
"Defaults to 5.")
parser.add_argument(
'-z', '--cat',
action='store_true',
help="Pipe directly to stdout.")
parser.add_argument(
'-s',
'--sleep',
'-s', '--sleep',
type=float,
help="Seconds to sleep between reads. Defaults to 0.01.")
parser.add_argument(
'-k',
'--keep-open',
'-k', '--keep-open',
action='store_true',
help="Reopen the pipe on EOF, useful when multiple "
"processes are writing.")
Expand Down

0 comments on commit 42d889e

Please sign in to comment.