diff --git a/manticore/native/cpu/abstractcpu.py b/manticore/native/cpu/abstractcpu.py index 2cf89175e..5237a40e5 100644 --- a/manticore/native/cpu/abstractcpu.py +++ b/manticore/native/cpu/abstractcpu.py @@ -726,7 +726,7 @@ def read_int(self, where, size=None, force=False): self._publish("did_read_memory", where, value, size) return value - def write_bytes(self, where, data, force=False): + def write_bytes(self, where: int, data, force: bool = False) -> None: """ Write a concrete or symbolic (or mixed) buffer to memory diff --git a/manticore/native/cpu/cpufactory.py b/manticore/native/cpu/cpufactory.py index 16a6d81b2..cd2f85172 100644 --- a/manticore/native/cpu/cpufactory.py +++ b/manticore/native/cpu/cpufactory.py @@ -8,6 +8,7 @@ I386CdeclAbi, SystemVAbi, ) +from .abstractcpu import Abi, Cpu, SyscallAbi class CpuFactory: @@ -28,20 +29,20 @@ class CpuFactory: } @staticmethod - def get_cpu(mem, machine): + def get_cpu(mem, machine: str) -> Cpu: cpu = CpuFactory._cpus[machine](mem) mem.cpu = cpu return cpu @staticmethod - def get_function_abi(cpu, os, machine): + def get_function_abi(cpu: Cpu, os: str, machine: str) -> Abi: if os != "linux" or machine not in CpuFactory._linux_abis: raise NotImplementedError(f"OS and machine combination not supported: {os}/{machine}") return CpuFactory._linux_abis[machine](cpu) @staticmethod - def get_syscall_abi(cpu, os, machine): + def get_syscall_abi(cpu: Cpu, os: str, machine: str) -> SyscallAbi: if os != "linux" or machine not in CpuFactory._linux_syscalls_abis: raise NotImplementedError(f"OS and machine combination not supported: {os}/{machine}") diff --git a/manticore/platforms/linux.py b/manticore/platforms/linux.py index 8aef0aace..3b0d7d693 100644 --- a/manticore/platforms/linux.py +++ b/manticore/platforms/linux.py @@ -25,11 +25,13 @@ from ..core.smtlib import ConstraintSet, Operators, Expression, issymbolic from ..core.smtlib.solver import Z3Solver from ..exceptions import SolverError -from ..native.cpu.abstractcpu import Syscall, ConcretizeArgument, Interruption +from ..native.cpu.abstractcpu import Cpu, Syscall, ConcretizeArgument, Interruption from ..native.cpu.cpufactory import CpuFactory from ..native.memory import SMemory32, SMemory64, Memory32, Memory64, LazySMemory32, LazySMemory64 from ..platforms.platform import Platform, SyscallNotImplemented, unimplemented +from typing import List, Set + logger = logging.getLogger(__name__) MixedSymbolicBuffer = Union[List[Union[bytes, Expression]], bytes] @@ -238,10 +240,7 @@ def __init__(self, constraints, path="sfile", mode="rw", max_size=100, wildcard= if symbols_cnt > max_size: logger.warning( - ( - "Found more wildcards in the file than free ", - "symbolic values allowed (%d > %d)", - ), + "Found more wildcards in the file than free symbolic values allowed (%d > %d)", symbols_cnt, max_size, ) @@ -407,7 +406,7 @@ def write(self, buf): assert self.is_connected() return self.peer._transmit(buf) - def _transmit(self, buf): + def _transmit(self, buf) -> int: for c in buf: self.buffer.append(c) return len(buf) @@ -511,7 +510,7 @@ def empty_platform(cls, arch): platform._init_std_fds() return platform - def _init_std_fds(self): + def _init_std_fds(self) -> None: # open standard files stdin, stdout, stderr logger.debug("Opening file descriptors (0,1,2) (STDIN, STDOUT, STDERR)") self.input = Socket() @@ -535,7 +534,7 @@ def _init_std_fds(self): assert (in_fd, out_fd, err_fd) == (0, 1, 2) - def _init_cpu(self, arch): + def _init_cpu(self, arch) -> None: # create memory and CPU cpu = self._mk_proc(arch) self.procs = [cpu] @@ -543,7 +542,7 @@ def _init_cpu(self, arch): self._function_abi = CpuFactory.get_function_abi(cpu, "linux", arch) self._syscall_abi = CpuFactory.get_syscall_abi(cpu, "linux", arch) - def _find_symbol(self, name): + def _find_symbol(self, name: str): symbol_tables = (s for s in self.elf.iter_sections() if isinstance(s, SymbolTableSection)) for section in symbol_tables: @@ -557,7 +556,7 @@ def _find_symbol(self, name): return None - def _execve(self, program, argv, envp): + def _execve(self, program: str, argv: List[str], envp: List[str]) -> None: """ Load `program` and establish program state, such as stack and arguments. @@ -584,14 +583,14 @@ def _execve(self, program, argv, envp): # Each process can wait for one timeout self.timers = [None] * nprocs # each fd has a waitlist - self.rwait = [set() for _ in range(nfiles)] - self.twait = [set() for _ in range(nfiles)] + self.rwait: List[Set] = [set() for _ in range(nfiles)] + self.twait: List[Set] = [set() for _ in range(nfiles)] # Install event forwarders for proc in self.procs: self.forward_events_from(proc) - def _mk_proc(self, arch): + def _mk_proc(self, arch: str) -> Cpu: mem = Memory32() if arch in {"i386", "armv7"} else Memory64() cpu = CpuFactory.get_cpu(mem, arch) return cpu @@ -1201,7 +1200,7 @@ def _to_signed_dword(self, dword): raise EnvironmentError(f"Corrupted internal CPU state (arch width is {arch_width})") return sdword - def _open(self, f): + def _open(self, f) -> int: """ Adds a file descriptor to the current file descriptor list @@ -1217,7 +1216,7 @@ def _open(self, f): self.files.append(f) return fd - def _close(self, fd): + def _close(self, fd: int) -> None: """ Removes a file descriptor from the file descriptor list :rtype: int @@ -1231,9 +1230,9 @@ def _close(self, fd): ) # Keep track for SymbolicFile testcase generation self.files[fd] = None except IndexError: - raise FdError(f"Bad file descriptor ({fd})") + raise FdError(f"Bad file descriptor ({fd})", errno.EBADF) - def _dup(self, fd): + def _dup(self, fd: int) -> int: """ Duplicates a file descriptor :rtype: int @@ -1242,34 +1241,34 @@ def _dup(self, fd): """ return self._open(self.files[fd]) - def _is_fd_open(self, fd): + def _is_fd_open(self, fd: int) -> bool: """ Determines if the fd is within range and in the file descr. list :param fd: the file descriptor to check. """ return fd >= 0 and fd < len(self.files) and self.files[fd] is not None - def _get_fd(self, fd): + def _get_fd(self, fd: int) -> File: if not self._is_fd_open(fd): - raise FdError + raise FdError(f"File descriptor is not open", errno.EBADF) else: return self.files[fd] - def _transform_write_data(self, data: bytes) -> bytes: + def _transform_write_data(self, data) -> bytes: """ Implement in subclass to transform data written by write(2)/writev(2) Nop by default. """ return data - def _exit(self, message): + def _exit(self, message) -> None: procid = self.procs.index(self.current) self.sched() self.running.remove(procid) if len(self.running) == 0: raise TerminateState(message, testcase=True) - def sys_umask(self, mask): + def sys_umask(self, mask: int) -> int: """ umask - Set file creation mode mask :param int mask: New mask @@ -1280,7 +1279,7 @@ def sys_umask(self, mask): except OSError as e: return -e.errno - def sys_chdir(self, path): + def sys_chdir(self, path) -> int: """ chdir - Change current working directory :param int path: Pointer to path @@ -1293,7 +1292,7 @@ def sys_chdir(self, path): except OSError as e: return -e.errno - def sys_getcwd(self, buf, size): + def sys_getcwd(self, buf, size) -> int: """ getcwd - Get the current working directory :param int buf: Pointer to dest array @@ -1307,13 +1306,13 @@ def sys_getcwd(self, buf, size): if size > 0 and size < length: logger.info( - "GETCWD: size is greater than 0, but is smaller than the length" - "of the path + 1. Returning ERANGE" + "GETCWD: size is greater than 0, but is smaller than the length " + "of the path + 1. Returning -errno.ERANGE" ) return -errno.ERANGE if not self.current.memory.access_ok(slice(buf, buf + length), "w"): - logger.info("GETCWD: buf within invalid memory. Returning EFAULT") + logger.info("GETCWD: buf within invalid memory. Returning -errno.EFAULT") return -errno.EFAULT self.current.write_string(buf, current_dir) @@ -1323,7 +1322,7 @@ def sys_getcwd(self, buf, size): except OSError as e: return -e.errno - def sys_lseek(self, fd, offset, whence): + def sys_lseek(self, fd: int, offset: int, whence: int) -> int: """ lseek - reposition read/write file offset @@ -1333,42 +1332,82 @@ def sys_lseek(self, fd, offset, whence): :param fd: a valid file descriptor :param offset: the offset in bytes - :param whence: SEEK_SET: The file offset is set to offset bytes. - SEEK_CUR: The file offset is set to its current location plus offset bytes. - SEEK_END: The file offset is set to the size of the file plus offset bytes. + :param whence: os.SEEK_SET: The file offset is set to offset bytes. + os.SEEK_CUR: The file offset is set to its current location plus offset bytes. + os.SEEK_END: The file offset is set to the size of the file plus offset bytes. :return: offset from file beginning, or EBADF (fd is not a valid file descriptor or is not open) - """ signed_offset = self._to_signed_dword(offset) try: return self._get_fd(fd).seek(signed_offset, whence) except FdError as e: logger.info( - ("LSEEK: Not valid file descriptor on lseek." "Fd not seekable. Returning EBADF") + f"LSEEK: Not valid file descriptor on lseek. Fd not seekable. Returning {-e.err}" ) return -e.err - def sys_read(self, fd, buf, count): + def sys_llseek( + self, fd: int, offset_high: int, offset_low: int, resultp: int, whence: int + ) -> int: + """ + _llseek - reposition read/write file offset + + The _llseek() system call repositions the offset of the open + file description associated with the file descriptor fd to + (offset_high<<32) | offset_low bytes relative to the beginning of the + file, the current file offset, or the end of the file, depending on + whether whence is os.SEEK_SET, os.SEEK_CUR, or os.SEEK_END, + respectively. It returns the resulting file position in the argument + result. + + This system call exists on various 32-bit platforms to support seeking + to large file offsets. + + :param fd: a valid file descriptor + :param offset_high: the high 32 bits of the byte offset + :param offset_low: the low 32 bits of the byte offset + :param resultp: a pointer to write the position into on success + :param whence: os.SEEK_SET: The file offset is set to offset bytes. + os.SEEK_CUR: The file offset is set to its current location plus offset bytes. + os.SEEK_END: The file offset is set to the size of the file plus offset bytes. + + :return: 0 on success, negative on error + """ + signed_offset_high = self._to_signed_dword(offset_high) + signed_offset_low = self._to_signed_dword(offset_low) + signed_offset = (signed_offset_high << 32) | signed_offset_low + try: + pos = self._get_fd(fd).seek(signed_offset, whence) + posbuf = struct.pack("q", pos) # `loff_t * resultp` in linux, which is `long long` + self.current.write_bytes(resultp, posbuf) + return 0 + except FdError as e: + logger.info( + f"LSEEK: Not valid file descriptor on llseek. Fd not seekable. Returning {-e.err}" + ) + return -e.err + + def sys_read(self, fd: int, buf, count) -> int: data: bytes = bytes() if count != 0: # TODO check count bytes from buf if buf not in self.current.memory: # or not self.current.memory.isValid(buf+count): - logger.info("READ: buf points to invalid address. Returning EFAULT") + logger.info("READ: buf points to invalid address. Returning -errno.EFAULT") return -errno.EFAULT try: # Read the data and put it in memory data = self._get_fd(fd).read(count) except FdError as e: - logger.info(("READ: Not valid file descriptor on read." " Returning EBADF")) + logger.info(f"READ: Not valid file descriptor ({fd}). Returning -{e.err}") return -e.err self.syscall_trace.append(("_read", fd, data)) self.current.write_bytes(buf, data) return len(data) - def sys_write(self, fd, buf, count): + def sys_write(self, fd: int, buf, count) -> int: """ write - send bytes through a file descriptor The write system call writes up to count bytes from the buffer pointed to by buf to the file descriptor fd. If count is zero, write returns 0 @@ -1392,7 +1431,7 @@ def sys_write(self, fd, buf, count): # TODO check count bytes from buf if buf not in cpu.memory or buf + count not in cpu.memory: - logger.debug("WRITE: buf points to invalid address. Returning EFAULT") + logger.debug("WRITE: buf points to invalid address. Returning -errno.EFAULT") return -errno.EFAULT if fd > 2 and write_fd.is_full(): @@ -1400,27 +1439,27 @@ def sys_write(self, fd, buf, count): self.wait([], [fd], None) raise RestartSyscall() - data: MixedSymbolicBuffer = cpu.read_bytes(buf, count) - data: bytes = self._transform_write_data(data) + data_sym: MixedSymbolicBuffer = cpu.read_bytes(buf, count) + data = self._transform_write_data(data_sym) write_fd.write(data) for line in data.split(b"\n"): - line = line.decode( + line_str = line.decode( "latin-1" ) # latin-1 encoding will happily decode any byte (0x00-0xff) - logger.debug(f"WRITE({fd}, 0x{buf:08x}, {count}) -> <{repr(line):48s}>") + logger.debug(f"WRITE({fd}, 0x{buf:08x}, {count}) -> <{repr(line_str):48s}>") self.syscall_trace.append(("_write", fd, data)) self.signal_transmit(fd) return len(data) - def sys_fork(self): + def sys_fork(self) -> int: """ We don't support forking, but do return a valid error code to client binary. """ return -errno.ENOSYS - def sys_access(self, buf, mode): + def sys_access(self, buf, mode) -> int: """ Checks real user's permissions for a file :rtype: int @@ -1557,7 +1596,7 @@ def sys_open(self, buf, flags, mode): return self._open(f) - def sys_openat(self, dirfd, buf, flags, mode): + def sys_openat(self, dirfd, buf, flags, mode) -> int: """ Openat SystemCall - Similar to open system call except dirfd argument when path contained in buf is relative, dirfd is referred to set the relative path @@ -1578,11 +1617,11 @@ def sys_openat(self, dirfd, buf, flags, mode): try: dir_entry = self._get_fd(dirfd) except FdError as e: - logger.info("openat: Not valid file descriptor. Returning EBADF") + logger.info(f"openat: Not valid file descriptor. Returning {-e.err}") return -e.err if not isinstance(dir_entry, Directory): - logger.info("openat: Not directory descriptor. Returning ENOTDIR") + logger.info("openat: Not directory descriptor. Returning -errno.ENOTDIR") return -errno.ENOTDIR dir_path = dir_entry.name @@ -1597,7 +1636,7 @@ def sys_openat(self, dirfd, buf, flags, mode): return self._open(f) - def sys_rename(self, oldnamep, newnamep): + def sys_rename(self, oldnamep, newnamep) -> int: """ Rename filename `oldnamep` to `newnamep`. @@ -1615,7 +1654,7 @@ def sys_rename(self, oldnamep, newnamep): return ret - def sys_fsync(self, fd): + def sys_fsync(self, fd: int) -> int: """ Synchronize a file's in-core state with that on disk. """ @@ -1674,7 +1713,7 @@ def sys_dup(self, fd): """ if not self._is_fd_open(fd): - logger.info("DUP: Passed fd is not open. Returning EBADF") + logger.info(f"DUP: Passed fd is not open ({fd}). Returning -errno.EBADF") return -errno.EBADF newfd = self._dup(fd) @@ -1691,12 +1730,12 @@ def sys_dup2(self, fd, newfd): try: file = self._get_fd(fd) except FdError as e: - logger.info("DUP2: Passed fd is not open. Returning EBADF") + logger.info("DUP2: fd ({fd}) is not open. Returning {-e.err}") return -e.err soft_max, hard_max = self._rlimits[self.RLIMIT_NOFILE] if newfd >= soft_max: - logger.info("DUP2: newfd is above max descriptor table size") + logger.info(f"DUP2: newfd ({newfd}) is above max descriptor table size") return -errno.EBADF if self._is_fd_open(newfd): @@ -2136,7 +2175,7 @@ def sys_accept4(self, sockfd, addr, addrlen, flags): def sys_recv(self, sockfd, buf, count, flags, trace_str="_recv"): data: bytes = bytes() if not self.current.memory.access_ok(slice(buf, buf + count), "w"): - logger.info("RECV: buf within invalid memory. Returning EFAULT") + logger.info("RECV: buf within invalid memory. Returning -errno.EFAULT") return -errno.EFAULT try: @@ -2216,7 +2255,7 @@ def sys_getrandom(self, buf, size, flags): return 0 if buf not in self.current.memory: - logger.info("getrandom: Provided an invalid address. Returning EFAULT") + logger.info("getrandom: Provided an invalid address. Returning -errno.EFAULT") return -errno.EFAULT if flags & ~(GRND_NONBLOCK | GRND_RANDOM): @@ -2509,7 +2548,7 @@ def sys_fstat(self, fd, buf): try: stat = self._get_fd(fd).stat() except FdError as e: - logger.info("Calling fstat with invalid fd, returning EBADF") + logger.info(f"Calling fstat with invalid fd, returning {-e.err}") return -e.err def add(width, val): @@ -2552,7 +2591,7 @@ def sys_fstat64(self, fd, buf): try: stat = self._get_fd(fd).stat() except FdError as e: - logger.info("Calling fstat with invalid fd, returning EBADF") + logger.info(f"Calling fstat with invalid fd, returning {-e.err}") return -e.err def add(width, val): diff --git a/tests/native/test_syscalls.py b/tests/native/test_syscalls.py index 9d3f3e97b..d0aa5b9d6 100644 --- a/tests/native/test_syscalls.py +++ b/tests/native/test_syscalls.py @@ -1,5 +1,7 @@ import random +import struct import socket +import tempfile import unittest import os @@ -10,24 +12,26 @@ from manticore.platforms.platform import SyscallNotImplemented -def get_random_filename(): - return f"/tmp/mcore_test_{int(random.getrandbits(32))}" - - class LinuxTest(unittest.TestCase): _multiprocess_can_split_ = True BIN_PATH = os.path.join(os.path.dirname(__file__), "binaries", "basic_linux_amd64") def setUp(self): + self.tmp_dir = tempfile.TemporaryDirectory(prefix="mcore_test_") self.linux = linux.SLinux(self.BIN_PATH) def tearDown(self): for f in self.linux.files: if isinstance(f, linux.File): f.close() + self.tmp_dir.cleanup() + + def get_path(self, basename: str) -> str: + "Returns an absolute path with the given basename" + return f"{self.tmp_dir.name}/{basename}" def test_time(self): - self.linux.current.memory.mmap(0x1000, 0x1000, "rw ") + self.linux.current.memory.mmap(0x1000, 0x1000, "rw") time_0 = self.linux.sys_time(0) self.linux.sys_clock_gettime(1, 0x1100) @@ -50,19 +54,19 @@ def test_time(self): self.assertGreater(time_2_final, time_2_0, "Time did not increase!") def test_directories(self): - tmpdir = get_random_filename() + dname = self.get_path("test_directories") - self.linux.current.memory.mmap(0x1000, 0x1000, "rw ") - self.linux.current.write_string(0x1100, tmpdir) + self.linux.current.memory.mmap(0x1000, 0x1000, "rw") + self.linux.current.write_string(0x1100, dname) - self.assertFalse(os.path.exists(tmpdir)) + self.assertFalse(os.path.exists(dname)) self.linux.sys_mkdir(0x1100, mode=0o777) - self.assertTrue(os.path.exists(tmpdir)) + self.assertTrue(os.path.exists(dname)) self.linux.sys_rmdir(0x1100) - self.assertFalse(os.path.exists(tmpdir)) + self.assertFalse(os.path.exists(dname)) def test_pipe(self): - self.linux.current.memory.mmap(0x1000, 0x1000, "rw ") + self.linux.current.memory.mmap(0x1000, 0x1000, "rw") self.linux.sys_pipe(0x1100) fd1 = self.linux.current.read_int(0x1100, 8 * 4) @@ -79,8 +83,8 @@ def test_pipe(self): ) def test_ftruncate(self): - fname = get_random_filename() - self.linux.current.memory.mmap(0x1000, 0x1000, "rw ") + fname = self.get_path("test_ftruncate") + self.linux.current.memory.mmap(0x1000, 0x1000, "rw") self.linux.current.write_string(0x1100, fname) fd = self.linux.sys_open(0x1100, os.O_RDWR, 0o777) @@ -99,9 +103,9 @@ def test_ftruncate(self): ) def test_link(self): - fname = get_random_filename() - newname = get_random_filename() - self.linux.current.memory.mmap(0x1000, 0x1000, "rw ") + fname = self.get_path("test_link_from") + newname = self.get_path("test_link_to") + self.linux.current.memory.mmap(0x1000, 0x1000, "rw") self.linux.current.write_string(0x1100, fname) self.linux.current.write_string(0x1180, newname) @@ -126,8 +130,8 @@ def test_link(self): self.assertFalse(os.path.exists(newname)) def test_chmod(self): - fname = get_random_filename() - self.linux.current.memory.mmap(0x1000, 0x1000, "rw ") + fname = self.get_path("test_chmod") + self.linux.current.memory.mmap(0x1000, 0x1000, "rw") self.linux.current.write_string(0x1100, fname) print("Creating", fname) @@ -143,7 +147,7 @@ def test_chmod(self): self.assertEqual(-errno.EPERM, self.linux.sys_chown(0x1100, 0, 0)) def test_recvfrom(self): - self.linux.current.memory.mmap(0x1000, 0x1000, "rw ") + self.linux.current.memory.mmap(0x1000, 0x1000, "rw") sock_fd = self.linux.sys_socket(socket.AF_INET, socket.SOCK_STREAM, 0) self.assertEqual(sock_fd, 3) @@ -190,6 +194,129 @@ def test_multiple_sockets(self): conn_fd = self.linux.sys_accept(sock_fd, None, 0) self.assertEqual(conn_fd, 4) + def test_lseek(self): + fname = self.get_path("test_lseek") + assert len(fname) < 0x100 + self.linux.current.memory.mmap(0x1000, 0x1000, "rw") + self.linux.current.write_string(0x1100, fname) + + fd = self.linux.sys_open(0x1100, os.O_RDWR, 0o777) + buf = b"1" * 1000 + self.assertEqual(len(buf), 1000) + self.linux.current.write_bytes(0x1200, buf) + self.linux.sys_write(fd, 0x1200, len(buf)) + + pos = self.linux.sys_lseek(fd, 100, os.SEEK_SET) + self.assertEqual(100, pos) + + pos = self.linux.sys_lseek(fd, -50, os.SEEK_CUR) + self.assertEqual(50, pos) + + pos = self.linux.sys_lseek(fd, 50, os.SEEK_CUR) + self.assertEqual(100, pos) + + pos = self.linux.sys_lseek(fd, 0, os.SEEK_END) + self.assertEqual(len(buf), pos) + + pos = self.linux.sys_lseek(fd, -50, os.SEEK_END) + self.assertEqual(len(buf) - 50, pos) + + pos = self.linux.sys_lseek(fd, 50, os.SEEK_END) + self.assertEqual(len(buf) + 50, pos) + + self.linux.sys_close(fd) + pos = self.linux.sys_lseek(fd, 0, os.SEEK_SET) + self.assertEqual(-errno.EBADF, pos) + + @unittest.expectedFailure + def test_lseek_end_broken(self): + fname = self.get_path("test_lseek") + assert len(fname) < 0x100 + self.linux.current.memory.mmap(0x1000, 0x1000, "rw") + self.linux.current.write_string(0x1100, fname) + + fd = self.linux.sys_open(0x1100, os.O_RDWR, 0o777) + buf = b"1" * 1000 + self.assertEqual(len(buf), 1000) + self.linux.current.write_bytes(0x1200, buf) + self.linux.sys_write(fd, 0x1200, len(buf)) + + # FIXME: currently broken -- raises a Python OSError invalid argument exception! + pos = self.linux.sys_lseek(fd, -2 * len(buf), os.SEEK_END) + self.assertEqual(-errno.EBADF, pos) + + def test_llseek(self): + fname = self.get_path("test_llseek") + assert len(fname) < 0x100 + # map some memory we can play with + self.linux.current.memory.mmap(0x1000, 0x1000, "rw") + # open a file descriptor for `fname` + self.linux.current.write_string(0x1100, fname) + fd = self.linux.sys_open(0x1100, os.O_RDWR, 0o777) + # write some bogus data to the file + buf = b"1" * 1000 + self.assertEqual(len(buf), 1000) + self.linux.current.write_bytes(0x1200, buf) + self.linux.sys_write(fd, 0x1200, len(buf)) + + # set up a location & some helpers for the result pointer for `sys_llseek` + result_struct = struct.Struct("q") + resultp = 0x1900 + result_size = result_struct.size + + def read_resultp(): + "reads the `loff_t` value -- a long long -- from the result pointer" + data = self.linux.current.read_bytes(resultp, result_struct.size) + return result_struct.unpack(b"".join(data))[0] + + # now actually test some things about sys_llseek + res = self.linux.sys_llseek(fd, 0, 100, resultp, os.SEEK_SET) + self.assertEqual(res, 0) + self.assertEqual(read_resultp(), 100) + + res = self.linux.sys_llseek(fd, 1, 0, resultp, os.SEEK_CUR) + self.assertEqual(res, 0) + self.assertEqual(read_resultp(), 4294967396) + + res = self.linux.sys_llseek(fd, 0, -1000, resultp, os.SEEK_CUR) + self.assertEqual(res, 0) + self.assertEqual(read_resultp(), 4294966396) + + res = self.linux.sys_llseek(fd, 0, 0, resultp, os.SEEK_END) + self.assertEqual(res, 0) + self.assertEqual(read_resultp(), len(buf)) + + res = self.linux.sys_llseek(fd, 0, 50, resultp, os.SEEK_END) + self.assertEqual(res, 0) + self.assertEqual(read_resultp(), len(buf) + 50) + + res = self.linux.sys_llseek(fd, 0, -50, resultp, os.SEEK_END) + self.assertEqual(res, 0) + self.assertEqual(read_resultp(), len(buf) - 50) + + self.linux.sys_close(fd) + res = self.linux.sys_llseek(fd, 0, 0, resultp, os.SEEK_SET) + self.assertEqual(-errno.EBADF, res) + + @unittest.expectedFailure + def test_llseek_end_broken(self): + fname = self.get_path("test_llseek_end_broken") + assert len(fname) < 0x100 + # map some memory we can play with + self.linux.current.memory.mmap(0x1000, 0x1000, "rw") + # open a file descriptor for `fname` + self.linux.current.write_string(0x1100, fname) + fd = self.linux.sys_open(0x1100, os.O_RDWR, 0o777) + # write some bogus data to the file + buf = b"1" * 1000 + self.assertEqual(len(buf), 1000) + self.linux.current.write_bytes(0x1200, buf) + self.linux.sys_write(fd, 0x1200, len(buf)) + + # FIXME: currently broken -- raises a Python OSError invalid argument exception! + res = self.linux.sys_llseek(fd, 0, -2 * len(buf), resultp, os.SEEK_END) + self.assertTrue(res < 0) + def test_unimplemented(self): stubs = linux_syscall_stubs.SyscallStubs(default_to_fail=False)