diff --git a/coc/__init__.py b/coc/__init__.py index 786cde7d..56b4756f 100644 --- a/coc/__init__.py +++ b/coc/__init__.py @@ -22,7 +22,7 @@ SOFTWARE. """ -__version__ = "2.1.1" +__version__ = "2.2.0" from .abc import BasePlayer, BaseClan from .clans import RankedClan, Clan diff --git a/coc/client.py b/coc/client.py index 788bb604..96b52e85 100644 --- a/coc/client.py +++ b/coc/client.py @@ -31,6 +31,7 @@ import ujson from .clans import Clan, RankedClan +from .entry_logs import ClanWarLog from .errors import Forbidden, GatewayError, NotFound, PrivateWarLog from .enums import WarRound from .miscmodels import GoldPassSeason, Label, League, Location, LoadGameData @@ -44,11 +45,12 @@ CurrentWarIterator, ) from .players import Player, ClanMember, RankedPlayer -from .raid import RaidLog, RaidLogEntry +from .raid import RaidLogEntry from .spell import SpellHolder from .troop import TroopHolder from .utils import correct_tag, get, parse_army_link -from .wars import ClanWar, ClanWarLog, ClanWarLogEntry, ClanWarLeagueGroup +from .wars import ClanWar, ClanWarLogEntry, ClanWarLeagueGroup +from.entry_logs import ClanWarLog, RaidLog if TYPE_CHECKING: from .hero import Hero, Pet @@ -197,6 +199,13 @@ def __init__( self._clans = {} self._wars = {} + async def __aenter__(self): + self.__init__() + return self + + async def __aexit__(self, *args): + await self.close() + def _create_client(self, email, password): return HTTPClient( client=self, @@ -494,11 +503,18 @@ async def get_warlog( self, clan_tag: str, cls: Type[ClanWarLogEntry] = ClanWarLogEntry, - paginated=True, - **kwargs + page: bool = False, + limit: int = 0 ) -> ClanWarLog: - """Retrieve a clan's clan war log. - Set paginated = False to get the full war log with one API call. + """ + Retrieve a clan's clan war log. By default, this will return + all the clan's log available in the API. This will of course consume + memory. The option of limiting the amount of log items fetched + can be controlled with the `limit` parameter. Additionally, if + `paginate` is set to True, and an async for loop is performed + on this object, then additional log items will be fetched but only + consume the same amount of memory space at all time. + .. note:: @@ -509,8 +525,20 @@ async def get_warlog( Parameters ----------- - clan_tag : str - The clan tag to search for. + cls: + Target class to use to model that data returned + + clan_tag: + class:`str`: The clan tag to search for. + + page: + class:`bool`: Enable fetching logs while only holding the + same amount of logs as `limit`. If `paginate` is set to True, + and `limit` is set to default of 0, then `limit` will be set to + 10 automatically. + + limit: + class:`int`: Number of logs to retrieve Raises ------ @@ -532,38 +560,66 @@ async def get_warlog( Returns -------- - List[:class:`ClanWarLogEntry`] + :class:`ClanWarLog`: Entries in the warlog of the requested clan. """ + if limit < 0: + raise ValueError("Limit cannot be negative") + if not issubclass(cls, ClanWarLogEntry): raise TypeError("cls must be a subclass of ClanWarLogEntry.") if self.correct_tags: clan_tag = correct_tag(clan_tag) - if paginated and not kwargs.get("limit", None): - kwargs["limit"] = 5 + # If paginate is enabled and limit is set to default of 0, then + # set limit to a new default of 10 + if page: + limit = limit if limit else 10 + try: - data = await self.http.get_clan_warlog(clan_tag, **kwargs) + return await ClanWarLog.init_cls(client=self, + clan_tag=clan_tag, + page=page, + limit=limit, + model=cls) except Forbidden as exception: - raise PrivateWarLog(exception.response, exception.reason) from exception - - return ClanWarLog(data=data, client=self, cls=cls, clan_tag=clan_tag) + raise PrivateWarLog(exception.response, + exception.reason) from exception async def get_raidlog( - self, - clan_tag: str, - cls: Type[RaidLogEntry] = RaidLogEntry, - paginated: bool = True, - **kwargs + self, + clan_tag: str, + cls: Type[RaidLogEntry] = RaidLogEntry, + page: bool = False, + limit: int = 0 ) -> RaidLog: - """Retrieve a clan's raid log. - Set paginated = False to get the full raid log with one API call. + """ + Retrieve a clan's Capital Raid Log. By default, this will return + all the clan's log available in the API. This will of course consume + memory. The option of limiting the amount of log items fetched + can be controlled with the `limit` parameter. Additionally, if + `paginate` is set to True, and an async for loop is performed + on this object, then additional log items will be fetched but only + consume the same amount of memory space at all time. + Parameters ----------- - clan_tag : str - The clan tag to search for. + cls: + Target class to use to model that data returned + + clan_tag: + class:`str`: The clan tag to search for. + + page: + class:`bool`: Enable fetching logs while only holding the + same amount of logs as `limit`. If `paginate` is set to True, + and `limit` is set to default of 0, then `limit` will be set to + 10 automatically. + + limit: + class:`int`: Number of logs to retrieve Raises ------ @@ -573,28 +629,45 @@ async def get_raidlog( NotFound No clan was found with the supplied tag. + PrivateWarLog + The clan's warlog is private. + Maintenance The API is currently in maintenance. GatewayError The API hit an unexpected gateway exception. + Returns -------- - List[:class:`RaidLogEntry`] - Entries in the raid log of the requested clan. + :class:`RaidLog`: + Entries in the capital raid seasons of the requested clan. """ + + if limit < 0: + raise ValueError("Limit cannot be negative") + if not issubclass(cls, RaidLogEntry): - raise TypeError("cls must be a subclass of RaidLogEntry.") + raise TypeError("cls must be a subclass of ClanWarLogEntry.") if self.correct_tags: clan_tag = correct_tag(clan_tag) - if paginated and not kwargs.get("limit", None): - kwargs["limit"] = 5 + # If paginate is enabled and limit is set to default of 0, then + # set limit to a new default of 10 + if page: + limit = limit if limit else 10 - data = await self.http.get_clan_raidlog(clan_tag, **kwargs) - return RaidLog(data=data, client=self, cls=cls, clan_tag=clan_tag) + try: + return await RaidLog.init_cls(client=self, + clan_tag=clan_tag, + page=page, + limit=limit, + model=cls) + except Forbidden as exception: + raise PrivateWarLog(exception.response, + exception.reason) from exception async def get_clan_war(self, clan_tag: str, cls: Type[ClanWar] = ClanWar, **kwargs) -> ClanWar: """ diff --git a/coc/entry_logs.py b/coc/entry_logs.py new file mode 100644 index 00000000..c8fbcbf8 --- /dev/null +++ b/coc/entry_logs.py @@ -0,0 +1,228 @@ +# Enables circular import for type hinting coc.Client +from __future__ import annotations + +import asyncio +from abc import ABC, abstractmethod +from typing import Optional, TYPE_CHECKING, Type, Union + +from .raid import RaidLogEntry +from .wars import ClanWarLogEntry + +if TYPE_CHECKING: + from .client import Client + + +class LogPaginator(ABC): + @abstractmethod + def __init__(self, client: Client, + clan_tag: str, + limit: int, + page: bool, + json_resp: dict, + model: Union[Type[ClanWarLogEntry], Type[RaidLogEntry]]): + + self._clan_tag = clan_tag + self._limit = limit + self._page = page + + self._init_data = json_resp # Initial data; this is const + self._init_logs = json_resp.get("items", []) + + self._client = client + self._model = model + + def __len__(self) -> int: + return len(self._init_logs) + + def __iter__(self): + """Initialize the iter object and reset the iter index to 0""" + self._sync_index = 0 + return self + + def __next__(self) -> Union[ClanWarLogEntry, RaidLogEntry]: + """Fetch the next item in the iter object and return the entry""" + if self._sync_index == len(self._init_logs): + raise StopIteration + ret = self._model(data=self._init_logs[self._sync_index], + client=self._client) + self._sync_index += 1 + return ret + + def __getitem__(self, index: int) -> Union[ClanWarLogEntry, RaidLogEntry]: + """Support indexing the object. This will not fetch any addition + items from the endpoint""" + try: + ret = self._init_logs[index] + return self._model(data=ret, client=self._client) + except Exception: + raise + + def __aiter__(self): + # These values are used to simulate the caller having a single list + # of items. In reality, the list is populated on demand. + self._max_index = len(self._init_logs) + self._min_index = 0 + self._async_index = 0 + + # Make copies of the init data since they will change. + self._logs = self._init_logs[:] + self._page_data = self._init_data.copy() + return self + + async def __anext__(self) -> Union[ClanWarLogEntry, RaidLogEntry]: + """ + This class supports async for loops. If the `page` bool is set to + True then the async for loop will fetch all items from the endpoint + until there are not more items in the endpoint. This is done without + increasing the memory footprint by only caching `limit` number + of logs at all times. + + When `limit` is set to 10, `self._logs` will only store 10 log items. + When the last item in `self._logs` is reached when iterating, + the array will be replaced by the next `limit` number of items. All + this is abstracted from the user, they will just think they are + iterating over the array. Keep in mind that if `limit` is set to 10 + and there are 200 total logs, then this API will make 20 get + requests to the endpoint making it quite slow. Consider tuning + this method with the `limit` value. + """ + # If paging is not enabled, do not fetch any more items only + # iterate over the items in the self._war_logs + if not self._page: + if self._async_index == len(self._logs): + raise StopAsyncIteration + ret = self._model(data=self._logs[self._async_index], + client=self._client) + self._async_index += 1 + return ret + + # If paging is enabled, update self._war_logs if the end of the + # array is reached + ret: Union[ClanWarLogEntry, RaidLogEntry] + + # If index request is within range of the war_logs, return item + if self._min_index <= self._async_index < self._max_index: + ret = self._logs[self._async_index - self._min_index] + + # Iteration has reached the end of the array, fetch the next + # set of logs from the endpoint + elif self._next_page: + await self._paginate() + self._min_index = self._max_index + self._max_index = self._max_index + len(self._logs) + ret = self._logs[self._async_index - self._min_index] + else: + raise StopAsyncIteration + + self._async_index += 1 + return self._model(data=ret, client=self._client) + + async def _paginate(self) -> None: + """ + Request data from the endpoint and update the iter variables with + the new data. `self._fetch_endpoint` is a child defined method. + """ + self._page_data = await self._fetch_endpoint(self._client, + self._clan_tag, + **self.options) + self._logs = self._page_data.get("items", []) + + @property + def options(self) -> dict: + """Generate the header for the endpint request""" + options = {"limit": self._limit} + if self._next_page: + options["after"] = self._next_page + return options + + @property + def _next_page(self) -> Optional[str]: + """Determine if there is a next page for the endpoint query""" + try: + return self._page_data.get("paging").get("cursors").get("after") + except KeyError: + return None + + @staticmethod + @abstractmethod + async def _fetch_endpoint(client: Client, + clan_tag: str, + fut: Optional[asyncio.Future] = None, + **options) -> dict: + """Function to fetch data from the endpoint""" + pass + + @classmethod + @abstractmethod + async def init_cls(cls, + client: Client, + clan_tag: str, + model: Type[ClanWarLogEntry], + limit: int, + paginate: bool = True, + ) -> Union[ClanWarLog, RaidLog]: + """Class method to return an instantiated object""" + pass + + +class ClanWarLog(LogPaginator, ABC): + """Represents a Generator for a ClanWarLog""" + + def __init__(self, **kwargs): + super().__init__(**kwargs) + + @classmethod + async def init_cls(cls, + client: Client, + clan_tag: str, + model: Type[ClanWarLogEntry], + limit: int, + page: bool = True, + ) -> ClanWarLog: + + # Add the limit if specified + args = {"limit": limit} if limit else {} + + json_resp = await cls._fetch_endpoint(client, clan_tag, **args) + return ClanWarLog(client=client, clan_tag=clan_tag, limit=limit, + page=page, json_resp=json_resp, model=model) + + @staticmethod + async def _fetch_endpoint(client: Client, clan_tag: str, + fut: Optional[asyncio.Future] = None, + **options) -> dict: + result = await client.http.get_clan_warlog(clan_tag, **options) + if fut: + fut.set_result(result) + return result + + +class RaidLog(LogPaginator, ABC): + """Represents a Generator for a RaidLog""" + def __init__(self, **kwargs): + super().__init__(**kwargs) + + @classmethod + async def init_cls(cls, + client: Client, + clan_tag: str, + model: Type[RaidLogEntry], + limit: int, + page: bool = True, + ) -> RaidLog: + + # Add the limit if specified + args = {"limit": limit} if limit else {} + + json_resp = await cls._fetch_endpoint(client, clan_tag, **args) + return RaidLog(client=client, clan_tag=clan_tag, limit=limit, + page=page, json_resp=json_resp, model=model) + + @staticmethod + async def _fetch_endpoint(client: Client, clan_tag: str, + fut: Optional[asyncio.Future] = None, + **options) -> dict: + result = await client.http.get_clan_raidlog(clan_tag, **options) + if fut: + fut.set_result(result) + return result diff --git a/coc/http.py b/coc/http.py index 876468a7..a546e2eb 100644 --- a/coc/http.py +++ b/coc/http.py @@ -143,7 +143,23 @@ class Route: BASE = "https://api.clashofclans.com/v1" - def __init__(self, method, path: str, **kwargs): + def __init__(self, method: str, path: str, **kwargs: dict): + """ + The class is used to create the final URL used to fetch the data + from the API. The parameters that are passed to the API are all in + the GET request packet. This class will parse the `kwargs` dictionary + and concatenate any parameters passed in. + + Parameters + ---------- + method: + :class:`str`: HTTP method used for the HTTP request + path: + :class:`str`: URL path used for the HTTP request + kwargs: + :class:`dict`: Optional options used to concatenate into the final + URL + """ if "#" in path: path = path.replace("#", "%23") diff --git a/coc/iterators.py b/coc/iterators.py index 68959103..7b07db79 100644 --- a/coc/iterators.py +++ b/coc/iterators.py @@ -171,7 +171,7 @@ async def _next(self): return None elif self.clan_tag is None: return war - elif war.clan_tag != self.clan_tag: + elif war._clan_tag != self.clan_tag: return await self._next() else: return war diff --git a/coc/raid.py b/coc/raid.py index 0e07cfac..6186c438 100644 --- a/coc/raid.py +++ b/coc/raid.py @@ -80,10 +80,15 @@ def __repr__(self): return "<%s %s>" % (self.__class__.__name__, " ".join("%s=%r" % t for t in attrs),) def __eq__(self, other): - return (isinstance(other, RaidMember) - and self.tag == other.tag - and self.raid_log_entry == other.raid_log_entry - and self.attacks == other.attacks) + if isinstance(other, RaidMember): + if (self.tag == other.tag + and self.attack_count == other.attack_count + and self.attack_limit == other.attack_limit + and self.bonus_attack_limit == other.bonus_attack_limit + and self.capital_resources_looted == other.capital_resources_looted + ): + return True + return False def _from_data(self, data): data_get = data.get @@ -148,12 +153,14 @@ def __repr__(self): return "<%s %s>" % (self.__class__.__name__, " ".join("%s=%r" % t for t in attrs),) def __eq__(self, other): - return (isinstance(other, self.__class__) - and self.raid_log_entry == other.raid_log_entry + if isinstance(other, RaidAttack): + if (self.attacker_tag == other.attacker_tag + and self.destruction == other.destruction and self.raid_clan == other.raid_clan and self.district == other.district - and self.attacker_tag == other.attacker_tag - and self.destruction == other.destruction) + ): + return True + return False def __init__(self, data, client, raid_log_entry, raid_clan, district): self.raid_log_entry = raid_log_entry @@ -198,6 +205,19 @@ class RaidDistrict: :class:`RaidClan` - The raid clan this district belongs to """ + def __eq__(self, other): + if isinstance(other, RaidDistrict): + if (self.id == other.id + and self.name == other.name + and self.hall_level == other.hall_level + and self.destruction == other.destruction + and self.looted == other.looted + and self.raid_clan == other.raid_clan + and self.attack_count == other.attack_count + ): + return True + return False + __slots__ = ("id", "name", "hall_level", @@ -220,13 +240,6 @@ def __repr__(self): ("destruction", self.destruction)] return "<%s %s>" % (self.__class__.__name__, " ".join("%s=%r" % t for t in attrs),) - def __eq__(self, other): - return isinstance(other, self.__class__) and \ - self.id == other.id and \ - self.attack_count == other.attack_count and \ - self.destruction == other.destruction and \ - self.looted == other.looted and \ - self.hall_level == other.hall_level def __init__(self, *, data, client, raid_log_entry, raid_clan): self.id: int = data.get("id") @@ -302,11 +315,11 @@ def __init__(self, *, data, client, raid_log_entry, **_): def __eq__(self, other): return (isinstance(other, RaidClan) - and self.raid_log_entry == other.raid_log_entry and self.tag == other.tag and self.attack_count == other.attack_count and self.district_count == other.district_count and self.destroyed_district_count == other.destroyed_district_count + and self.raid_log_entry.start_time == other.raid_log_entry.start_time and self.attacks == other.attacks) def __repr__(self): @@ -412,11 +425,18 @@ def __repr__(self): return "<%s %s>" % (self.__class__.__name__, " ".join("%s=%r" % t for t in attrs),) def __eq__(self, other): - return (isinstance(other, RaidLogEntry) - and self.start_time == other.start_time - and self._attack_log == other.attack_log - and self._defense_log == other.defense_log - and self.members == other.members) + if isinstance(other, RaidLogEntry): + if (self.start_time == other.start_time + and self.completed_raid_count == other.completed_raid_count + and self.destroyed_district_count == other.destroyed_district_count + and self.attack_count == other.attack_count + and self.attack_log == other.attack_log + and self.defense_log == other.defense_log + ): + return True + + return False + def _from_data(self, data: dict) -> None: data_get = data.get @@ -475,41 +495,13 @@ def get_member(self, tag: str) -> typing.Optional[RaidMember]: except KeyError: return None - -class RaidLog: - """Represents a Generator for a RaidLog""" - - def __init__(self, clan_tag, client, data, cls): - self.clan_tag = clan_tag - self.data = data.get("items", []) - self.client = client - self.cls = cls - self.global_index = 0 - self.max_index = len(self.data) - self.next_page = data.get("paging").get("cursors").get("after", "") - - def __getitem__(self, item: int): - if self.global_index > item: - data = self.client.loop.run_until_complete(self.client.http.get_clan_raidlog(self.clan_tag, limit=item+1)) - self.data = data.get("items", []) - self.max_index = len(self.data) - self.next_page = data.get("paging").get("cursors").get("after", "") - self.global_index = 0 - return_value = self.cls(data=self.data[item], client=self.client) - elif self.global_index + self.max_index <= item and not self.next_page: - raise IndexError() - elif self.next_page and self.global_index + self.max_index <= item: - data = self.client.loop.run_until_complete(self.client.http.get_clan_raidlog(self.clan_tag, - after=self.next_page, - limit=item-self.global_index)) - self.data = data.get("items", []) - self.global_index += self.max_index - self.max_index = len(self.data) - self.next_page = data.get("paging").get("cursors").get("after", "") - return_value = self.cls(data=self.data[item-self.global_index], client=self.client) - elif self.global_index < item: - return_value = self.cls(data=self.data[item-self.global_index], client=self.client) - else: - return_value = self.cls(data=self.data[item], client=self.client) - return return_value +# def _logs_same(self_log: List[RaidLogEntry], other_log: List[RaidLogEntry]): +# for s_log, o_log in zip(self_log, other_log): +# if not (s_log. == o_log.tag +# and s_log.name == o_log.name +# and s_log.raid_log_entry == o_log.raid_log_entry +# +# ): +# return False +# return True diff --git a/coc/wars.py b/coc/wars.py index 774d33e3..89f5943c 100644 --- a/coc/wars.py +++ b/coc/wars.py @@ -21,9 +21,14 @@ OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. """ +# Enables circular import for type hinting coc.Client +from __future__ import annotations + +import asyncio import itertools +from abc import ABC, abstractmethod -from typing import AsyncIterator, List, Optional, Type, TYPE_CHECKING +from typing import AsyncIterator, List, Optional, Type, TYPE_CHECKING, Union from .enums import WarRound from .iterators import LeagueWarIterator @@ -35,6 +40,7 @@ if TYPE_CHECKING: # pylint: disable=cyclic-import from .war_members import ClanWarMember # noqa + from .client import Client class ClanWar: @@ -94,7 +100,8 @@ def _from_data(self, data: dict) -> None: data_get = data.get self.state: str = data_get("state") - self.preparation_start_time = try_enum(Timestamp, data=data_get("preparationStartTime")) + self.preparation_start_time = try_enum(Timestamp, data=data_get( + "preparationStartTime")) self.start_time = try_enum(Timestamp, data=data_get("startTime")) self.end_time = try_enum(Timestamp, data=data_get("endTime")) self.war_tag: str = data_get("tag") @@ -103,28 +110,35 @@ def _from_data(self, data: dict) -> None: else: self.attacks_per_member: int = data_get("attacksPerMember") - self.team_size: int = data_get("teamSize") or len(data_get("clan", {}).get("members", [])) + self.team_size: int = data_get("teamSize") or len( + data_get("clan", {}).get("members", [])) clan_data = data_get("clan") # annoying bug where if you request a war with a clan tag that clan could be the opponent or clan, # depending on the way the game stores it internally. This isn't very helpful as we always want it # from the perspective of the tag we provided, so switch them around if it isn't correct. if clan_data and clan_data.get("tag", self.clan_tag) == self.clan_tag: - self.clan = try_enum(WarClan, data=clan_data, client=self._client, war=self) - self.opponent = try_enum(WarClan, data=data_get("opponent"), client=self._client, war=self) + self.clan = try_enum(WarClan, data=clan_data, client=self._client, + war=self) + self.opponent = try_enum(WarClan, data=data_get("opponent"), + client=self._client, war=self) else: - self.clan = try_enum(WarClan, data=data_get("opponent"), client=self._client, war=self) - self.opponent = try_enum(WarClan, data=clan_data, client=self._client, war=self) + self.clan = try_enum(WarClan, data=data_get("opponent"), + client=self._client, war=self) + self.opponent = try_enum(WarClan, data=clan_data, + client=self._client, war=self) @property def attacks(self) -> List[WarAttack]: """List[:class:`WarAttack`]: Returns all attacks this war, sorted by attack order.""" - return sorted([*self.clan.attacks, *self.opponent.attacks], key=lambda x: x.order, reverse=True) + return sorted([*self.clan.attacks, *self.opponent.attacks], + key=lambda x: x.order, reverse=True) @property def members(self) -> List["ClanWarMember"]: """List[:class:`ClanWarMember`]: A list of members that are in the war.""" - return sorted([*self.clan.members, *self.opponent.members], key=lambda x: (not x.is_opponent, x.map_position)) + return sorted([*self.clan.members, *self.opponent.members], + key=lambda x: (not x.is_opponent, x.map_position)) @property def type(self) -> Optional[str]: @@ -156,7 +170,8 @@ def type(self) -> Optional[str]: 20 * 60 * 60, 24 * 60 * 60, ] - if (self.start_time.time - self.preparation_start_time.time).seconds in prep_list: + if ( + self.start_time.time - self.preparation_start_time.time).seconds in prep_list: return "friendly" return "random" @@ -246,7 +261,8 @@ def get_member_by(self, **attrs) -> Optional["ClanWarMember"]: """ return get(self.members, **attrs) - def get_attack(self, attacker_tag: str, defender_tag: str) -> Optional[WarAttack]: + def get_attack(self, attacker_tag: str, defender_tag: str) -> Optional[ + WarAttack]: """Return the :class:`WarAttack` with the attacker tag and defender tag provided. If the attack was not found, this will return ``None``. @@ -275,9 +291,11 @@ def get_defenses(self, defender_tag: str) -> List[WarAttack]: # we could do a global lookup on all attacks in the war but this is faster as we have to lookup half the attacks if defender.is_opponent: # we need to get home clan's attacks on this base - return list(filter(lambda x: x.defender_tag == defender_tag, self.clan.attacks)) + return list(filter(lambda x: x.defender_tag == defender_tag, + self.clan.attacks)) - return list(filter(lambda x: x.defender_tag == defender_tag, self.opponent.attacks)) + return list(filter(lambda x: x.defender_tag == defender_tag, + self.opponent.attacks)) class ClanWarLogEntry: @@ -310,13 +328,25 @@ class ClanWarLogEntry: :class:`int`: The number of attacks each member had this war. """ - __slots__ = ("result", "end_time", "team_size", "clan", "opponent", "_client", "attacks_per_member") + __slots__ = ( + "result", "end_time", "team_size", "clan", "opponent", "_client", + "attacks_per_member") def __init__(self, *, data, client, **_): self._client = client - self._from_data(data) + def __eq__(self, other) -> bool: + if isinstance(other, self.__class__): + if self.clan == other.clan \ + and self.opponent == other.opponent \ + and self.result == other.result \ + and self.end_time == other.end_time \ + and self.attacks_per_member == other.attacks_per_member: + return True + + return False + def _from_data(self, data: dict) -> None: data_get = data.get @@ -333,7 +363,8 @@ def _from_data(self, data: dict) -> None: self.attacks_per_member: int = data_get("attacksPerMember") def _fake_load_clan(self, data): - if not (data and data.get("tag")): # CWL seasons have an opposition with only badges and no tag/name. + if not (data and data.get( + "tag")): # CWL seasons have an opposition with only badges and no tag/name. return None data["teamSize"] = self.team_size @@ -345,44 +376,6 @@ def is_league_entry(self) -> bool: return self.result is None or self.opponent is None -class ClanWarLog: - """Represents a Generator for a ClanWarLog""" - - def __init__(self, clan_tag, client, data, cls): - self.clan_tag = clan_tag - self.data = data.get("items", []) - self.client = client - self.cls = cls - self.global_index = 0 - self.max_index = len(self.data) - self.next_page = data.get("paging").get("cursors").get("after", "") - - def __getitem__(self, item: int): - if self.global_index > item: - data = self.client.loop.run_until_complete(self.client.http.get_clan_raidlog(self.clan_tag, limit=item+1)) - self.data = data.get("items", []) - self.max_index = len(self.data) - self.next_page = data.get("paging").get("cursors").get("after", "") - self.global_index = 0 - return_value = self.cls(data=self.data[item], client=self.client) - elif self.global_index + self.max_index <= item and not self.next_page: - raise IndexError() - elif self.next_page and self.global_index + self.max_index <= item: - data = self.client.loop.run_until_complete(self.client.http.get_clan_raidlog(self.clan_tag, - after=self.next_page, - limit=item-self.global_index)) - self.data = data.get("items", []) - self.global_index += self.max_index - self.max_index = len(self.data) - self.next_page = data.get("paging").get("cursors").get("after", "") - return_value = self.cls(data=self.data[item-self.global_index], client=self.client) - elif self.global_index < item: - return_value = self.cls(data=self.data[item-self.global_index], client=self.client) - else: - return_value = self.cls(data=self.data[item], client=self.client) - return return_value - - class ClanWarLeagueGroup: """Represents a Clan War League (CWL) Group @@ -406,14 +399,18 @@ class ClanWarLeagueGroup: """ - __slots__ = ("state", "season", "rounds", "number_of_rounds", "_client", "__iter_clans", "_cs_clans") + __slots__ = ( + "state", "season", "rounds", "number_of_rounds", "_client", + "__iter_clans", + "_cs_clans") def __repr__(self): attrs = [ ("state", self.state), ("season", self.season), ] - return "<%s %s>" % (self.__class__.__name__, " ".join("%s=%r" % t for t in attrs),) + return "<%s %s>" % ( + self.__class__.__name__, " ".join("%s=%r" % t for t in attrs),) def __init__(self, *, data, client, **_): self._client = client @@ -429,16 +426,19 @@ def _from_data(self, data: dict) -> None: self.number_of_rounds: int = len(rounds) # the API returns a list and the rounds that haven't started contain war tags of #0 (not sure why)... # we want to get only the valid rounds - self.rounds: List[List[str]] = [n["warTags"] for n in rounds if n["warTags"][0] != "#0"] + self.rounds: List[List[str]] = [n["warTags"] for n in rounds if + n["warTags"][0] != "#0"] - self.__iter_clans = (ClanWarLeagueClan(data=data, client=self._client) for data in data_get("clans", [])) + self.__iter_clans = (ClanWarLeagueClan(data=data, client=self._client) + for data in data_get("clans", [])) @cached_property("_cs_clans") def clans(self) -> List[ClanWarLeagueClan]: """List[:class:`LeagueClan`]: Returns all participating clans.""" return list(self.__iter_clans) - def get_wars_for_clan(self, clan_tag: str, cls: Type[ClanWar] = ClanWar) -> AsyncIterator[ClanWar]: + def get_wars_for_clan(self, clan_tag: str, cls: Type[ClanWar] = ClanWar) -> \ + AsyncIterator[ClanWar]: """Returns every war the clan has participated in this current CWL. This returns a :class:`LeagueWarIterator` which fetches all wars in parallel. @@ -465,10 +465,13 @@ def get_wars_for_clan(self, clan_tag: str, cls: Type[ClanWar] = ClanWar) -> Asyn :class:`ClanWar` A war in the current CWL season with the clan in it.. """ - return LeagueWarIterator(client=self._client, tags=itertools.chain(*self.rounds), clan_tag=clan_tag, cls=cls) + return LeagueWarIterator(client=self._client, + tags=itertools.chain(*self.rounds), + clan_tag=clan_tag, cls=cls) def get_wars( - self, cwl_round: WarRound = WarRound.current_war, cls: Type[ClanWar] = ClanWar + self, cwl_round: WarRound = WarRound.current_war, + cls: Type[ClanWar] = ClanWar ) -> AsyncIterator[ClanWar]: """Returns war information for every war in a league round. diff --git a/examples/discord_bot.py b/examples/discord_bot.py index a230aee4..b2d16c26 100644 --- a/examples/discord_bot.py +++ b/examples/discord_bot.py @@ -138,6 +138,7 @@ async def member_stat(ctx, player_tag): @bot.command() async def clan_info(ctx, clan_tag): + if not utils.is_valid_tag(clan_tag): await ctx.send("You didn't give me a proper tag!") return @@ -255,24 +256,17 @@ async def current_war_status(ctx, clan_tag): async def main(): logging.basicConfig(level=logging.ERROR) - coc_client = coc.Client() - - # Attempt to log into CoC API using your credentials. - try: - await coc_client.login(os.environ.get("DEV_SITE_EMAIL"), - os.environ.get("DEV_SITE_PASSWORD")) - except coc.InvalidCredentials as error: - exit(error) + async with coc.Client() as coc_client: + # Attempt to log into CoC API using your credentials. + try: + await coc_client.login(os.environ.get("DEV_SITE_EMAIL"), + os.environ.get("DEV_SITE_PASSWORD")) + except coc.InvalidCredentials as error: + exit(error) - # Add the client session to the bot - bot.coc_client = coc_client - - try: - # Run the bot + # Add the client session to the bot + bot.coc_client = coc_client await bot.start(os.environ.get("DISCORD_BOT_TOKEN")) - finally: - # When done, do not forget to clean up after yourself! - await coc_client.close() if __name__ == "__main__": diff --git a/examples/discord_bot_with_cogs.py b/examples/discord_bot_with_cogs.py index 2e387601..6b09c765 100644 --- a/examples/discord_bot_with_cogs.py +++ b/examples/discord_bot_with_cogs.py @@ -223,29 +223,24 @@ async def on_clan_trophy_change(old_clan, new_clan): async def main(): - coc_client = coc.EventsClient() + async with coc.EventsClient() as coc_client: - # Attempt to log into CoC API using your credentials. To enable events, - # you must use the coc.EventsClient class - try: - await coc_client.login(os.environ.get("DEV_SITE_EMAIL"), - os.environ.get("DEV_SITE_PASSWORD")) - except coc.InvalidCredentials as error: - exit(error) - - # Instantiate your custom bot class that inherits from the discord bot - # notice that we added coc_client into the bot. This will give us access - # to coc_client from all our discord bot commands - bot = CoCBot(command_prefix="?", - intents=discord.Intents.all(), - coc_client=coc_client) + # Attempt to log into CoC API using your credentials. To enable events, + # you must use the coc.EventsClient class + try: + await coc_client.login(os.environ.get("DEV_SITE_EMAIL"), + os.environ.get("DEV_SITE_PASSWORD")) + except coc.InvalidCredentials as error: + exit(error) + + # Instantiate your custom bot class that inherits from the discord bot + # notice that we added coc_client into the bot. This will give us access + # to coc_client from all our discord bot commands + bot = CoCBot(command_prefix="?", + intents=discord.Intents.all(), + coc_client=coc_client) - try: - # Run the bot await bot.start(os.environ.get("DISCORD_BOT_TOKEN")) - finally: - # When done, do not forget to clean up after yourself! - await coc_client.close() if __name__ == "__main__": diff --git a/examples/war_logs.py b/examples/war_logs.py index 72eae883..cc35e8db 100644 --- a/examples/war_logs.py +++ b/examples/war_logs.py @@ -2,6 +2,7 @@ import os import coc +from coc.raid import RaidLogEntry async def get_warlog_for_clans(client: coc.Client, clan_tags: list): @@ -19,6 +20,86 @@ async def get_warlog_for_clans(client: coc.Client, clan_tags: list): return war_logs +async def test_raidlog(client: coc.Client, clan_tag: str): + # Limit is set to None retrieving all values + raid_no_page = await client.get_raidlog(clan_tag) + limit = len(raid_no_page) + page_limit = 30 + + # Enable pagination, by default it will only cache 10 logs using limit + # once you iterate beyond the cached amount, it will fetch the next set + raid_with_page = await client.get_raidlog(clan_tag, page=True, limit=page_limit) + + # Iterate over warlogs like the current version of coc.py + for i, e in enumerate(raid_no_page): + e: RaidLogEntry + print(f"[{i}]-sync limit: {limit} page: False {e.start_time.time}") + + # Option to async for loop a non paginated object + count = 0 + async for i in raid_no_page: + print(f"[{count}]-async limit: {limit} page: False") + count += 1 + + for i, e in enumerate(raid_with_page): + print(f"[{i}]-sync limit: {page_limit} page: True") + + # Set `paginate=True` to enable fetching beyond the limit value until + # there are more values to fetch + count = 0 + async for i in raid_with_page: + print(f"[{count}]-async limit: {page_limit} page: True {i.start_time.time}") + count += 1 + + + # Simple test comparing the two data sets + count = 0 + async for async_log in raid_with_page: + if async_log != raid_no_page[count]: + raise AssertionError(f"{id(async_log)} does not match {id(raid_no_page[count])} at index {count}") + count += 1 + +async def test_warlog(client: coc.Client, clan_tag: str): + + # Limit is set to None retrieving all values + warlogs_no_page = await client.get_warlog(clan_tag) + limit = len(warlogs_no_page) + pagination_limit = 11 + + # Enable pagination, by default it will only cache 10 logs using limit + # once you iterate beyond the cached amount, it will fetch the next set + warlogs_with_page = await client.get_warlog(clan_tag, page=True, limit=pagination_limit) + + # Iterate over warlogs like the current version of coc.py + for i, e in enumerate(warlogs_no_page): + print(f"[{i}]-sync limit: {limit} page: False") + + # Option to async for loop a non paginated object + count = 0 + async for i in warlogs_no_page: + print(f"[{count}]-async limit: {limit} page: False") + count += 1 + + for i, e in enumerate(warlogs_with_page): + print(f"[{i}]-sync limit: {pagination_limit} page: True") + + # Set `paginate=True` to enable fetching beyond the limit value until + # there are more values to fetch + count = 0 + async for i in warlogs_with_page: + print(f"[{count}]-async limit: {pagination_limit} page: True {i.end_time.time}") + count += 1 + + # Simple test comparing the two data sets + count = 0 + async for async_log in warlogs_with_page: + if async_log != warlogs_no_page[count]: + raise AssertionError(f"{id(async_log)} does not match {id(warlogs_no_page[count])} at index {count}") + count += 1 + print(count) + + + async def get_clan_tags_names(client: coc.Client, name: str, limit: int): clans = await client.search_clans(name=name, limit=limit) # return a list of tuples of name/tag pair ie. @@ -34,25 +115,26 @@ async def get_warlog_opponents_from_clan_name(client: coc.Client, name: str, no_ for name, tag in clan_tags_names: # iterate over the wars - for war in war_logs[tag]: + for war_log in war_logs[tag]: # if it is a league war we will error below because it does not # return a WarLog object, and thus no opponent - if war.is_league_entry: + if war_log.is_league_entry: print("League War Season - No opponent info available") else: - print(f"War: {war.clan.name} vs {war.opponent.name}") + print(f"War: {war_log.clan.name} vs {war_log.opponent.name}") async def main(): - coc_client = coc.Client() - try: - await coc_client.login(os.environ.get("DEV_SITE_EMAIL"), - os.environ.get("DEV_SITE_PASSWORD")) - except coc.InvalidCredentials as error: - exit(error) - - await get_warlog_opponents_from_clan_name(coc_client, "Reddit Zulu", 5) - await coc_client.close() + async with coc.Client() as coc_client: + try: + await coc_client.login(os.environ.get("DEV_SITE_EMAIL"), + os.environ.get("DEV_SITE_PASSWORD")) + except coc.InvalidCredentials as error: + exit(error) + + await get_warlog_opponents_from_clan_name(coc_client, "Reddit Zulu", 5) + await test_warlog(coc_client, "#2Y28CGP8") + await test_raidlog(coc_client, "#2Y28CGP8") if __name__ == "__main__": diff --git a/setup.py b/setup.py index 3ee9f557..0cdaf0e0 100644 --- a/setup.py +++ b/setup.py @@ -7,7 +7,7 @@ with open(os.path.join(os.getcwd(), "requirements.txt")) as f: REQUIREMENTS = f.read().splitlines() -VERSION = "2.1.3" +VERSION = "2.2.0" if "a" in VERSION: VERSION += "+" + subprocess.check_output(["git", "rev-parse", "--short", "HEAD"]).decode("utf-8").strip()