diff --git a/robinhood/robinhood.py b/robinhood/robinhood.py index cd27ae3..705fb5b 100644 --- a/robinhood/robinhood.py +++ b/robinhood/robinhood.py @@ -1,6 +1,7 @@ import json import requests import getpass +import os session = requests.Session() account_url = None @@ -11,6 +12,8 @@ def oauth(payload): url = 'https://api.robinhood.com/oauth2/token/' r = session.post(url, json=payload) + if r.status_code == 500: + raise RuntimeError('Missing or incorrect credentials.') session.headers.pop('challenge_id', None) response = r.json() @@ -27,26 +30,35 @@ def oauth(payload): return r -def login(username=None, password=None, device_token='c77a7142-cc14-4bc0-a0ea-bdc9a2bf6e68'): - """generates OAuth2 bearer token""" +def login(username: str = None, password: str = None, device_token: str = 'c77a7142-cc14-4bc0-a0ea-bdc9a2bf6e68', + bearer_token: str = None, no_input: bool = False) -> str: + """generates and returns OAuth2 bearer token""" global session - # check if bearer token exists and is valid - with open('tokens.json') as file: - try: - tokens = json.loads(file.read()) - if 'bearer_token' in tokens: - session.headers['Authorization'] = 'Bearer ' + tokens['bearer_token'] - if user(): - return - else: - del session.headers['Authorization'] - except json.decoder.JSONDecodeError: - pass + if bearer_token is not None: + session.headers['Authorization'] = 'Bearer ' + bearer_token + if user(): + return bearer_token + else: + print('Invalid/expired bearer token') + # check if bearer token exists and is valid. create tokens.json if does not exist. + if os.path.isfile('tokens.json'): + with open('tokens.json', 'r') as file: + try: + tokens = json.loads(file.read()) + if 'bearer_token' in tokens: + bearer_token = tokens['bearer_token'] + session.headers['Authorization'] = 'Bearer ' + bearer_token + if user(): + return bearer_token + else: + del session.headers['Authorization'] + except json.decoder.JSONDecodeError: + pass - if username is None: + if username is None and not no_input: username = input('Enter email or username: ') - if password is None: + if password is None and not no_input: password = getpass.getpass('Enter password: ') payload = { @@ -59,13 +71,15 @@ def login(username=None, password=None, device_token='c77a7142-cc14-4bc0-a0ea-bd r = oauth(payload) if r.status_code == 400: - if r.json()['detail'] == 'Request blocked, challenge type required.': + r = r.json() + if 'detail' in r and r['detail'] == 'Request blocked, challenge type required.': challenge_type = None while challenge_type not in ['1', '2']: print('Unfamiliar device detected.') - challenge_type = input("We're sending you a code to verify your login. Do you want us to:\n" - " 1: Text you the code\n" - " 2: Email it to you?\n") + challenge_type = '1' if no_input else \ + input("We're sending you a code to verify your login. Do you want us to:\n" + " 1: Text you the code\n" + " 2: Email it to you?\n") if challenge_type == '1': print('Texting...') payload['challenge_type'] = 'sms' @@ -94,10 +108,11 @@ def login(username=None, password=None, device_token='c77a7142-cc14-4bc0-a0ea-bd del session.headers['X-ROBINHOOD-CHALLENGE-RESPONSE-ID'] else: raise RuntimeError('Unable to log in with provided credentials.') - user() + + return r.json()['access_token'] -def user(): +def user() -> bool: """checks whether user is logged in""" url = "https://api.robinhood.com/user/" r = session.get(url) diff --git a/setup.py b/setup.py index a15446e..eb85adb 100644 --- a/setup.py +++ b/setup.py @@ -8,7 +8,7 @@ description="the unofficial Robinhood API", long_description=long_description, long_description_content_type="text/markdown", - version="0.0.3", + version="0.0.4", url="https://github.com/james-yun/robinhood", author="James Yun", author_email="jameswyun99@gmail.com",