Skip to content

Commit

Permalink
Merge e61593d into 3095566
Browse files Browse the repository at this point in the history
  • Loading branch information
penguinolog committed Dec 14, 2023
2 parents 3095566 + e61593d commit aac0fea
Show file tree
Hide file tree
Showing 11 changed files with 194 additions and 228 deletions.
123 changes: 16 additions & 107 deletions urwid/_posix_raw_display.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,10 @@

if typing.TYPE_CHECKING:
import io
from collections.abc import Callable
from types import FrameType

from typing_extensions import Literal
from urwid.event_loop import EventLoop


class Screen(_raw_display_base.Screen):
Expand Down Expand Up @@ -102,7 +103,7 @@ def _sigcont_handler(self, signum: int, frame: FrameType | None = None) -> None:
self.start()
self._sigwinch_handler(None, None)

def signal_init(self):
def signal_init(self) -> None:
"""
Called in the startup of run wrapper to set the SIGWINCH
and SIGTSTP signal handlers.
Expand All @@ -113,7 +114,7 @@ def signal_init(self):
self._prev_sigwinch_handler = self.signal_handler_setter(signal.SIGWINCH, self._sigwinch_handler)
self._prev_sigtstp_handler = self.signal_handler_setter(signal.SIGTSTP, self._sigtstp_handler)

def signal_restore(self):
def signal_restore(self) -> None:
"""
Called in the finally block of run wrapper to restore the
SIGTSTP, SIGCONT and SIGWINCH signal handlers.
Expand All @@ -132,7 +133,7 @@ def _mouse_tracking(self, enable: bool) -> None:
else:
self._stop_gpm_tracking()

def _start_gpm_tracking(self):
def _start_gpm_tracking(self) -> None:
if not os.path.isfile("/usr/bin/mev"):
return
if not os.environ.get("TERM", "").lower().startswith("linux"):
Expand All @@ -148,14 +149,14 @@ def _start_gpm_tracking(self):
fcntl.fcntl(m.stdout.fileno(), fcntl.F_SETFL, os.O_NONBLOCK)
self.gpm_mev = m

def _stop_gpm_tracking(self):
def _stop_gpm_tracking(self) -> None:
if not self.gpm_mev:
return
os.kill(self.gpm_mev.pid, signal.SIGINT)
os.waitpid(self.gpm_mev.pid, 0)
self.gpm_mev = None

def _start(self, alternate_buffer=True):
def _start(self, alternate_buffer: bool = True) -> None:
"""
Initialize the screen and input mode.
Expand Down Expand Up @@ -188,7 +189,7 @@ def _start(self, alternate_buffer=True):

return super()._start()

def _stop(self):
def _stop(self) -> None:
"""
Restore the screen.
"""
Expand All @@ -212,90 +213,6 @@ def _stop(self):

super()._stop()

@typing.overload
def get_input(self, raw_keys: Literal[False]) -> list[str]:
...

@typing.overload
def get_input(self, raw_keys: Literal[True]) -> tuple[list[str], list[int]]:
...

def get_input(self, raw_keys: bool = False) -> list[str] | tuple[list[str], list[int]]:
"""Return pending input as a list.
raw_keys -- return raw keycodes as well as translated versions
This function will immediately return all the input since the
last time it was called. If there is no input pending it will
wait before returning an empty list. The wait time may be
configured with the set_input_timeouts function.
If raw_keys is False (default) this function will return a list
of keys pressed. If raw_keys is True this function will return
a ( keys pressed, raw keycodes ) tuple instead.
Examples of keys returned:
* ASCII printable characters: " ", "a", "0", "A", "-", "/"
* ASCII control characters: "tab", "enter"
* Escape sequences: "up", "page up", "home", "insert", "f1"
* Key combinations: "shift f1", "meta a", "ctrl b"
* Window events: "window resize"
When a narrow encoding is not enabled:
* "Extended ASCII" characters: "\\xa1", "\\xb2", "\\xfe"
When a wide encoding is enabled:
* Double-byte characters: "\\xa1\\xea", "\\xb2\\xd4"
When utf8 encoding is enabled:
* Unicode characters: u"\\u00a5", u'\\u253c"
Examples of mouse events returned:
* Mouse button press: ('mouse press', 1, 15, 13),
('meta mouse press', 2, 17, 23)
* Mouse drag: ('mouse drag', 1, 16, 13),
('mouse drag', 1, 17, 13),
('ctrl mouse drag', 1, 18, 13)
* Mouse button release: ('mouse release', 0, 18, 13),
('ctrl mouse release', 0, 17, 23)
"""
if not self._started:
raise RuntimeError

self._wait_for_input_ready(self._next_timeout)
keys, raw = self.parse_input(None, None, self.get_available_raw_input())

# Avoid pegging CPU at 100% when slowly resizing
if keys == ["window resize"] and self.prev_input_resize:
while True:
self._wait_for_input_ready(self.resize_wait)
keys, raw2 = self.parse_input(None, None, self.get_available_raw_input())
raw += raw2
# if not keys:
# keys, raw2 = self._get_input(
# self.resize_wait)
# raw += raw2
if keys != ["window resize"]:
break
if keys[-1:] != ["window resize"]:
keys.append("window resize")

if keys == ["window resize"]:
self.prev_input_resize = 2
elif self.prev_input_resize == 2 and not keys:
self.prev_input_resize = 1
else:
self.prev_input_resize = 0

if raw_keys:
return keys, raw
return keys

def get_input_descriptors(self) -> list[int]:
"""
Return a list of integer file descriptors that should be
Expand All @@ -314,7 +231,7 @@ def get_input_descriptors(self) -> list[int]:
fd_list.append(self.gpm_mev.stdout.fileno())
return fd_list

def unhook_event_loop(self, event_loop):
def unhook_event_loop(self, event_loop: EventLoop) -> None:
"""
Remove any hooks added by hook_event_loop.
"""
Expand All @@ -325,7 +242,11 @@ def unhook_event_loop(self, event_loop):
event_loop.remove_alarm(self._input_timeout)
self._input_timeout = None

def hook_event_loop(self, event_loop, callback):
def hook_event_loop(
self,
event_loop: EventLoop,
callback: Callable[[list[str], list[int]], typing.Any],
) -> None:
"""
Register the given callback with the event loop, to be called with new
input whenever it's available. The callback should be passed a list of
Expand All @@ -348,7 +269,7 @@ def wrapper() -> tuple[list[str], typing.Any] | None:
def _get_input_codes(self) -> list[int]:
return super()._get_input_codes() + self._get_gpm_codes()

def _get_gpm_codes(self):
def _get_gpm_codes(self) -> list[int]:
codes = []
try:
while self.gpm_mev is not None and self.gpm_event_pending:
Expand Down Expand Up @@ -396,22 +317,10 @@ def _encode_gpm_event(self) -> list[int]:
if m & 4:
mod |= 16 # ctrl

def append_button(b):
def append_button(b: int) -> None:
b |= mod
result.extend([27, ord("["), ord("M"), b + 32, x + 32, y + 32])

def determine_button_release(flag: int) -> None:
nonlocal next_state
if b & 4 and last_state & 1:
append_button(0 + flag)
next_state |= 1
if b & 2 and last_state & 2:
append_button(1 + flag)
next_state |= 2
if b & 1 and last_state & 4:
append_button(2 + flag)
next_state |= 4

if ev in {20, 36, 52}: # press
if b & 4 and last_state & 1 == 0:
append_button(0)
Expand Down

0 comments on commit aac0fea

Please sign in to comment.