diff --git a/calibre-plugin/libby/client.py b/calibre-plugin/libby/client.py index eb474ed..69df01f 100644 --- a/calibre-plugin/libby/client.py +++ b/calibre-plugin/libby/client.py @@ -7,7 +7,7 @@ # See https://github.com/ping/libby-calibre-plugin for more # information # - +import base64 import gzip import json import logging @@ -19,7 +19,7 @@ from socket import error as SocketError, timeout as SocketTimeout from ssl import SSLError from typing import Dict, List, Optional, Tuple, Union -from urllib import request +from urllib import parse, request from urllib.error import HTTPError, URLError from urllib.parse import urlencode, urljoin from urllib.request import HTTPCookieProcessor, Request, build_opener @@ -1047,3 +1047,62 @@ def verify_card( f"auth/link/{website_id}", params=data, is_form=False, method="POST" ) return res + + def tags(self) -> Dict: + """ + Get user tags. + + :return: + """ + res: Dict = self.send_request("https://vandal.svc.overdrive.com/tags") + return res + + def tag( + self, tag_id: str, tag_name: str, paging_range: Tuple[int, int] = (0, 12) + ) -> Dict: + """ + Details of a tag, including titles ("taggings"). + + :param tag_id: UUID string + :param tag_name: string + :param paging_range: tuple(start, end) for paging titles ("taggings"). 0-indexed. Defaults to a page size of 12. + :return: + """ + query = { + "enc": "1", # ?? + "sort": "newest", + "range": f"{paging_range[0]}...{paging_range[1]}", + } + b64encoded_tag_name = base64.b64encode(tag_name.encode("utf-8")).decode("ascii") + res: Dict = self.send_request( + f"https://vandal.svc.overdrive.com/tag/{tag_id}/{b64encoded_tag_name}", + query=query, + ) + return res + + def tag_paged( + self, tag_id: str, tag_name: str, page: int = 0, per_page: int = 12 + ) -> Dict: + """ + Helper method to get details of a tag with more standardised paging parameters + + :param tag_id: + :param tag_name: + :param page: 0-indexed. For paging titles ("taggings"). + :param per_page: Default 12. Does not appear to be constrained. Tested up to 400. + :return: + """ + paging_range = (page * per_page, (page + 1) * per_page) + return self.tag(tag_id, tag_name, paging_range) + + def taggings(self, title_ids: List[str]) -> Dict: + """ + Get tagging information for title IDs + + :param title_ids: + :return: + """ + res: Dict = self.send_request( + f'https://vandal.svc.overdrive.com/taggings/{parse.quote(",".join(title_ids))}' + ) + return res diff --git a/tests/libby.py b/tests/libby.py index e809d2b..7f2c321 100644 --- a/tests/libby.py +++ b/tests/libby.py @@ -365,3 +365,108 @@ def test_client_error_handling(self, open_mock): with self.assertRaises(ClientConnectionError): client.sync() + + def test_tags(self): + if not self.client.identity_token: + self.skipTest("Client not authorised") + + res = self.client.tags() + for k in ("tags", "totalTags", "totalTaggings"): + with self.subTest("response", k=k): + self.assertIn(k, res, msg=f'"{k}" not found') + for tag in res.get("tags"): + for k in ( + "name", + "uuid", + "description", + "behaviors", + "createTime", + "totalTaggings", + "taggings", + ): + with self.subTest("tag", k=k): + self.assertIn(k, tag, msg=f'"{k}" not found') + + def test_tag(self): + if not self.client.identity_token: + self.skipTest("Client not authorised") + + res = self.client.tags() + per_page = 12 + for tag in res.get("tags"): + if tag.get("totalTaggings", 0) <= per_page: + # get a tag that requires paging + continue + total_titles_expected = tag["totalTaggings"] + curr_page = 0 + tagged_titles = [] + while True: + res = self.client.tag_paged( + tag["uuid"], tag["name"], page=curr_page, per_page=per_page + ) + curr_page += 1 + tag_found = res.get("tag") + self.assertTrue(tag_found) + for k2 in ( + "name", + "uuid", + "description", + "behaviors", + "createTime", + "facetCounts", + "totalTaggings", + "taggings", + ): + with self.subTest("tag_found", k2=k2): + self.assertIn(k2, tag_found, msg=f'"{k2}" not found') + for title in tag_found["taggings"]: + for k3 in ( + "titleId", + "websiteId", + "cardId", + "createTime", + "titleFormat", + "titleSubjects", + "sortTitle", + "sortAuthor", + ): + with self.subTest("title", k3=k3): + self.assertIn(k3, title, msg=f'"{k3}" not found') + self.assertNotIn(title["titleId"], tagged_titles) + tagged_titles.append(title["titleId"]) + + if len(tag_found["taggings"]) < per_page: + # last page + break + + self.assertEqual(total_titles_expected, len(tagged_titles)) + break + + def test_taggings(self): + if not self.client.identity_token: + self.skipTest("Client not authorised") + + title_ids = ["784353", "36635"] + res = self.client.taggings(title_ids) + self.assertEqual(len(title_ids), len(res.items())) + for title_id in title_ids: + self.assertIn(title_id, res) + if not res[title_id]: + # title has not been tagged + continue + for tag in res[title_id]: + for k in ( + "titleId", + "websiteId", + "cardId", + "createTime", + "titleFormat", + "titleSubjects", + "sortTitle", + "sortAuthor", + "properties", + "tagUUID", + "tagName", + ): + with self.subTest("title", k=k): + self.assertIn(k, tag, msg=f'"{k}" not found')