From 413b585b306304e970cefd6c71182f47bdd95b52 Mon Sep 17 00:00:00 2001 From: "Alan D. Tse" Date: Fri, 19 Feb 2021 00:08:53 -0800 Subject: [PATCH] feat: add mfa support This is untested but is based on a working example. https://github.com/timdorr/tesla-api/discussions/258 --- teslajsonpy/connection.py | 75 ++++++++++++++++++++++++++++++++++++--- teslajsonpy/exceptions.py | 8 +++-- 2 files changed, 76 insertions(+), 7 deletions(-) diff --git a/teslajsonpy/connection.py b/teslajsonpy/connection.py index 741fb623..f930226e 100644 --- a/teslajsonpy/connection.py +++ b/teslajsonpy/connection.py @@ -367,7 +367,14 @@ async def _process_messages() -> None: # } # ) - async def get_authorization_code(self, email, password) -> Text: + async def get_authorization_code( + self, + email: Text, + password: Text, + mfa_code: Text = "", + mfa_device: int = 0, + retry_limit: int = 3, + ) -> Text: """Get authorization code from the oauth3 login method.""" # https://tesla-api.timdorr.com/api-basics/authentication#step-2-obtain-an-authorization-code if not (email and password): @@ -378,10 +385,68 @@ async def get_authorization_code(self, email, password) -> Text: html = await resp.text() soup: BeautifulSoup = BeautifulSoup(html, "html.parser") data = get_inputs(soup) - data["identity"] = self.email - data["credential"] = self.password - resp = await self.websession.post(url, data=data) - _process_resp(resp) + data["identity"] = email + data["credential"] = password + transaction_id: Text = data.get("transaction_id") + for attempt in range(retry_limit): + _LOGGER.debug("Attempt #%s", attempt) + resp = await self.websession.post(url, data=data) + _process_resp(resp) + if not resp.history: + html = await resp.text() + if "/mfa/verify" in html: + mfa_resp = await self.websession.get( + "https://auth.tesla.com/oauth2/v3/authorize/mfa/factors", + params={"transaction_id": transaction_id}, + ) + _process_resp(mfa_resp) + # { + # "data": [ + # { + # "dispatchRequired": false, + # "id": "X-4Y-44e4-b9a4-54e114a13c40", + # "name": "Pixel", + # "factorType": "token:software", + # "factorProvider": "TESLA", + # "securityLevel": 1, + # "activatedAt": "2021-02-10T23:53:40.000Z", + # "updatedAt": "2021-02-10T23:54:20.000Z" + # } + # ] + # } + mfa_json = await mfa_resp.json() + if len(mfa_json.get("data", [])) > 1: + factor_id = mfa_json["data"][mfa_device]["id"] + if not mfa_code: + _LOGGER.debug("No MFA provided") + _LOGGER.debug("MFA Devices: %s", mfa_json["data"]) + raise IncompleteCredentials( + "MFA Code missing", devices=mfa_json["data"] + ) + mfa_resp = await self.websession.post( + "https://auth.tesla.com/oauth2/v3/authorize/mfa/verify", + json={ + "transaction_id": transaction_id, + "factor_id": factor_id, + "passcode": mfa_code, + }, + ) + _process_resp(mfa_resp) + mfa_json = await mfa_resp.json() + if not ( + mfa_json["data"].get("approved") + and mfa_json["data"].get("valid") + ): + _LOGGER.debug("MFA Code invalid") + raise IncompleteCredentials( + "MFA Code invalid", devices=mfa_json["data"] + ) + resp = await self.websession.post(url, data=data) + _process_resp(resp) + await asyncio.sleep(3) + if not (resp.history): + _LOGGER.debug("Failed to authenticate") + raise IncompleteCredentials("Unable to login with credentials") code_url = URL(resp.history[-1].url) return code_url.query.get("code") diff --git a/teslajsonpy/exceptions.py b/teslajsonpy/exceptions.py index 7cfd5fbc..279db4df 100644 --- a/teslajsonpy/exceptions.py +++ b/teslajsonpy/exceptions.py @@ -6,6 +6,7 @@ https://github.com/zabuldon/teslajsonpy """ import logging +from typing import Any, Dict, Text _LOGGER = logging.getLogger(__name__) @@ -13,7 +14,7 @@ class TeslaException(Exception): """Class of Tesla API exceptions.""" - def __init__(self, code, *args, **kwargs): + def __init__(self, code: Text, *args, **kwargs): """Initialize exceptions for the Tesla API.""" self.message = "" super().__init__(*args, **kwargs) @@ -52,7 +53,10 @@ class RetryLimitError(TeslaException): class IncompleteCredentials(TeslaException): """Class of exceptions for incomplete credentials.""" - pass + def __init__(self, code: Text, *args, devices: Dict[Any, Any] = None, **kwargs): + """Initialize exception to include list of devices.""" + super().__init__(code, *args, **kwargs) + self.devices = devices or {} class UnknownPresetMode(TeslaException):