Skip to content

Commit

Permalink
Merge pull request #44 from scop/typing
Browse files Browse the repository at this point in the history
Finish type hinting
  • Loading branch information
nielstron committed Oct 22, 2020
2 parents c67f4ce + 5856bce commit b599475
Show file tree
Hide file tree
Showing 10 changed files with 72 additions and 58 deletions.
4 changes: 3 additions & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,20 @@ matrix:
- python: '3.6'
- python: '3.7'
- python: '3.8'
env: PRE_COMMIT=1
env: PRE_COMMIT=1 MYPY=1
fast_finish: true
dist: xenial
sudo: true

install:
- pip install coverage coveralls
- test ! "$PRE_COMMIT" || pip install pre-commit
- test ! "$MYPY" || pip install "mypy>=0.782"
- pip install -e .

script:
- test ! "$PRE_COMMIT" || pre-commit run --all-files
- test ! "$MYPY" || mypy pysyncthru
- coverage run --source=pysyncthru setup.py test

after_success:
Expand Down
1 change: 1 addition & 0 deletions Pipfile
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ pysyncthru = {editable = true,path = "."}

[dev-packages]
pre-commit = "*"
mypy = ">=0.782"

[requires]
python_version = "3.8"
20 changes: 10 additions & 10 deletions pysyncthru/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

import aiohttp

from typing import Any, Dict, Optional
from typing import Any, Dict, Optional, cast
from enum import Enum

ENDPOINT = "/sws/app/information/home/home.json"
Expand Down Expand Up @@ -37,7 +37,7 @@ class SyncThru:
DRUM = "drum"
TRAY = "tray"

def __init__(self, ip, session) -> None:
def __init__(self, ip: str, session: aiohttp.ClientSession) -> None:
"""Initialize the the printer."""
self.url = construct_url(ip)
self._session = session
Expand Down Expand Up @@ -77,41 +77,41 @@ def is_unknown_state(self) -> bool:
def model(self) -> Optional[str]:
"""Return the model name of the printer."""
try:
return self.data.get("identity").get("model_name")
return cast(Dict[str, str], self.data.get("identity", {})).get("model_name")
except (KeyError, AttributeError):
return None

def location(self) -> Optional[str]:
"""Return the location of the printer."""
try:
return self.data.get("identity").get("location")
return cast(Dict[str, str], self.data.get("identity", {})).get("location")
except (KeyError, AttributeError):
return None

def serial_number(self) -> Optional[str]:
"""Return the serial number of the printer."""
try:
return self.data.get("identity").get("serial_num")
return cast(Dict[str, str], self.data.get("identity", {})).get("serial_num")
except (KeyError, AttributeError):
return None

def hostname(self) -> Optional[str]:
"""Return the hostname of the printer."""
try:
return self.data.get("identity").get("host_name")
return cast(Dict[str, str], self.data.get("identity", {})).get("host_name")
except (KeyError, AttributeError):
return None

def device_status(self) -> SyncthruState:
"""Fetch the raw device status"""
try:
return SyncthruState(int(self.data.get("status").get("hrDeviceStatus")))
return SyncthruState(int(self.data.get("status", {}).get("hrDeviceStatus")))
except (ValueError, TypeError):
return SyncthruState.INVALID

def device_status_details(self) -> str:
"""Return the detailed (display) status of the device as string."""
head = self.data.get("status")
head = self.data.get("status", {})
status_display = [
head.get("status{}".format(i), "").strip() for i in [1, 2, 3, 4]
]
Expand All @@ -125,7 +125,7 @@ def capability(self) -> Dict[str, Any]:
except (KeyError, AttributeError):
return {}

def raw(self) -> Dict:
def raw(self) -> Dict[str, Any]:
"""Return all details of the printer."""
try:
return self.data
Expand All @@ -146,7 +146,7 @@ def toner_status(self, filter_supported: bool = True) -> Dict[str, Any]:
toner_status[color] = {}
return toner_status

def input_tray_status(self, filter_supported: bool = True) -> Dict[int, Any]:
def input_tray_status(self, filter_supported: bool = True) -> Dict[str, Any]:
"""Return the state of all input trays."""
tray_status = {}
for tray in (
Expand Down
Empty file added pysyncthru/py.typed
Empty file.
13 changes: 7 additions & 6 deletions pysyncthru/tests/test_structure/server_control.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,20 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-

import socketserver
import threading


class Server:
def __init__(self, server):
def __init__(self, server: socketserver.TCPServer) -> None:
"""
Expects subclass of TCPServer as argument
"""
self._server = server
self._server_started_event = threading.Event()
self._server_running = False

def _run_server(self):
def _run_server(self) -> None:

print("Server started, serving on port {}".format(self.get_port()))

Expand All @@ -26,13 +27,13 @@ def _run_server(self):
finally:
self._cleanup_server()

def _cleanup_server(self):
def _cleanup_server(self) -> None:
self._server_running = False
self._server.server_close()
# Here, server was stopped
print("Server stopped")

def stop_server(self):
def stop_server(self) -> None:
"""
Close server forcibly
:return:
Expand All @@ -41,7 +42,7 @@ def stop_server(self):
if self._server_running:
self._server.shutdown()

def start_server(self, timeout=10):
def start_server(self, timeout: int = 10) -> None:
"""
Start server thread as daemon
As such the program will automatically close the thread
Expand All @@ -57,5 +58,5 @@ def start_server(self, timeout=10):
# wait (non-busy) for successful start
self._server_started_event.wait(timeout=timeout)

def get_port(self):
def get_port(self) -> int:
return self._server.server_address[1]
31 changes: 17 additions & 14 deletions pysyncthru/tests/test_structure/syncthru_mock_server.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,10 @@
import os
import urllib.parse
from http import HTTPStatus
from http.server import SimpleHTTPRequestHandler, HTTPServer

try:
from http import HTTPStatus
except ImportError:
# Backwards compatability
import http.client as HTTPStatus
import posixpath
from pathlib import Path
from typing import Optional, Tuple

SERVER_DIR = Path(__file__).parent or Path(".")

Expand All @@ -17,21 +13,27 @@ class SyncThruServer(HTTPServer):

blocked = False

def set_blocked(self):
def set_blocked(self) -> None:
self.blocked = True

def unset_blocked(self):
def unset_blocked(self) -> None:
self.blocked = False


class SyncThruRequestHandler(SimpleHTTPRequestHandler):
def do_GET(self):
def __init__(
self, request: bytes, client_address: Tuple[str, int], server: SyncThruServer
) -> None:
self.server = server # type: SyncThruServer
super().__init__(request, client_address, server)

def do_GET(self) -> None:
if self.server.blocked:
self.send_error(403, "Access denied because server blocked")
else:
super(SyncThruRequestHandler, self).do_GET()

def translate_path(self, path):
def translate_path(self, path: str) -> str:
"""Translate a /-separated PATH to the local filename syntax.
Components that mean special things to the local file system
Expand All @@ -50,8 +52,7 @@ def translate_path(self, path):
except UnicodeDecodeError:
path = urllib.parse.unquote(path)
path = posixpath.normpath(path)
words = path.split("/")
words = filter(None, words)
words = filter(None, path.split("/"))
path = str(SERVER_DIR.absolute())
for word in words:
if os.path.dirname(word) or word in (os.curdir, os.pardir):
Expand All @@ -62,7 +63,9 @@ def translate_path(self, path):
path += "/"
return path

def send_error(self, code, message=None, explain=None):
def send_error(
self, code: int, message: Optional[str] = None, explain: Optional[str] = None
) -> None:
"""
Send syncthru error page
:param code:
Expand All @@ -89,7 +92,7 @@ def send_error(self, code, message=None, explain=None):
with SERVER_DIR.joinpath(".error.html").open("rb") as file:
body = file.read()
self.send_header("Content-Type", self.error_content_type)
self.send_header("Content-Length", int(len(body)))
self.send_header("Content-Length", str(len(body)))
self.end_headers()

if self.command != "HEAD" and body:
Expand Down
50 changes: 25 additions & 25 deletions pysyncthru/tests/test_web.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,12 @@
class SyncthruWebTest(unittest.TestCase):

server = None
server_control = None
server_control = None # type: Server
port = 0
url = "http://localhost:80"
syncthru = None
syncthru = None # type: SyncThru

def setUp(self):
def setUp(self) -> None:
# Create an arbitrary subclass of TCP Server as the server to be started
# Here, it is an Simple HTTP file serving server
handler = SyncThruRequestHandler
Expand Down Expand Up @@ -58,25 +58,25 @@ async def fetch():
loop = asyncio.get_event_loop()
loop.run_until_complete(fetch())

def test_online(self):
def test_online(self) -> None:
self.assertTrue(self.syncthru.is_online())

def test_status_normal(self):
def test_status_normal(self) -> None:
self.assertEqual(self.syncthru.device_status(), SyncthruState.NORMAL)

def test_status_details(self):
def test_status_details(self) -> None:
self.assertEqual(self.syncthru.device_status_details(), "Sleeping...")

def test_model(self):
def test_model(self) -> None:
self.assertEqual(self.syncthru.model(), RAW["identity"]["model_name"])

def test_toner_filter(self):
def test_toner_filter(self) -> None:
self.assertDictEqual(
self.syncthru.toner_status(True),
{"black": {"opt": 1, "remaining": 58, "cnt": 229, "newError": ""}},
)

def test_toner_no_filter(self):
def test_toner_no_filter(self) -> None:
empty = {"opt": 0, "remaining": 0, "cnt": 0, "newError": ""}
self.assertDictEqual(
self.syncthru.toner_status(False),
Expand All @@ -88,7 +88,7 @@ def test_toner_no_filter(self):
},
)

def test_input_tray_filter(self):
def test_input_tray_filter(self) -> None:
self.assertDictEqual(
self.syncthru.input_tray_status(True),
{
Expand All @@ -104,7 +104,7 @@ def test_input_tray_filter(self):
},
)

def test_input_tray_no_filter(self):
def test_input_tray_no_filter(self) -> None:
self.assertDictEqual(
self.syncthru.input_tray_status(False),
{
Expand Down Expand Up @@ -174,16 +174,16 @@ def test_input_tray_no_filter(self):
},
)

def test_output_tray(self):
def test_output_tray(self) -> None:
self.assertEqual(
self.syncthru.output_tray_status(),
{0: {"capacity": 100, "name": 1, "status": ""}},
)

def test_drum_status_filter(self):
def test_drum_status_filter(self) -> None:
self.assertEqual(self.syncthru.drum_status(True), {})

def test_drum_status_no_filter(self):
def test_drum_status_no_filter(self) -> None:
self.assertEqual(
self.syncthru.drum_status(False),
{
Expand All @@ -194,32 +194,32 @@ def test_drum_status_no_filter(self):
},
)

def test_location(self):
def test_location(self) -> None:
self.assertEqual(self.syncthru.location(), RAW["identity"]["location"])

def test_serial_number(self):
def test_serial_number(self) -> None:
self.assertEqual(self.syncthru.serial_number(), RAW["identity"]["serial_num"])

def test_hostname(self):
def test_hostname(self) -> None:
self.assertEqual(self.syncthru.hostname(), RAW["identity"]["host_name"])

def test_cap(self):
def test_cap(self) -> None:
self.assertEqual(self.syncthru.capability(), RAW["capability"])

def tearDown(self):
def tearDown(self) -> None:
self.server_control.stop_server()
pass


class NonSyncthruWebTest(unittest.TestCase):

server = None
server_control = None
server_control = None # type: Server
port = 0
url = "http://localhost:80"
syncthru = None
syncthru = None # type: SyncThru

def test_no_syncthru(self):
def test_no_syncthru(self) -> None:
"""Test that an error is thrown when no syncthru is supported"""
# Create an arbitrary subclass of TCP Server as the server to be started
# Here, it is an Simple HTTP file serving server
Expand Down Expand Up @@ -249,7 +249,7 @@ def test_no_syncthru(self):

try:

async def fetch():
async def fetch() -> None:
async with aiohttp.ClientSession() as session:
self.syncthru = SyncThru(self.url, session)
await self.syncthru.update()
Expand All @@ -262,10 +262,10 @@ async def fetch():
except ValueError:
pass

def test_offline_unknown(self):
def test_offline_unknown(self) -> None:
"""Test that nothing is returned when syncthru is offline"""

async def fetch():
async def fetch() -> None:
async with aiohttp.ClientSession() as session:
self.syncthru = SyncThru(self.url, session)
await self.syncthru.update()
Expand Down

0 comments on commit b599475

Please sign in to comment.