diff --git a/History.md b/History.md index ce92cf2..0c31452 100644 --- a/History.md +++ b/History.md @@ -1,3 +1,9 @@ +0.5.0 / 2013-06-09 +================== + * Replaced: urllib2 with requests package + * Replaced: oauth2 with requests_oauthlib package + * Added: optional timeout parameter + 0.4.0 / 2013-07-23 ================== * Added handling for API v2 diff --git a/README.md b/README.md index 3fb1da6..094821f 100644 --- a/README.md +++ b/README.md @@ -1,32 +1,43 @@ # ox3apiclient -A small class to help connect to the OpenX Enterprise API. While it uses [oauth2](https://github.com/simplegeo/python-oauth2), -it does not use [httplib2](http://code.google.com/p/httplib2/) as the transport due to issues with headers created by -httplib2. Instead it uses urllib2 as the HTTP transport. +A small class to help connect to the OpenX Enterprise API. As of version 0.5.0 it uses +[requests_oauthlib](https://github.com/requests/requests-oauthlib) instead of oauth2. -It currently supports Python 2.4 - 2.7, with 3.x support comming in the future. +It currently supports Python 2.4 - 2.7, with 3.x support coming in the future. As of version 0.4.0, ox3apiclient supports API v2. If your instance is v2, set the api_path option to "/ox/4.0". -Basic usage: +As of version 0.5.0 the client.request method returns a requests.Response object instead of +urllib2.Response and throws a requests.exceptions.HTTPError instead of urllib2.HTTPError. +In addition debugging is now available via the standard python logging facility. + +See the [requests documentation](http://docs.python-requests.org/en/latest/) for details. + +Basic usage with debugging enabled: ````python import ox3apiclient +import logging ox = ox3apiclient.client_from_file().logon() -account_ids = ox.get('/a/account') +ox.logger.setLevel(logging.DEBUG) +ch = logging.StreamHandler() +ch.setLevel(logging.DEBUG) +ox.logger.addHandler(ch) + +accounts = ox.get('/account') order = { 'status': 'Active', 'name': 'OX3APIClient Object Creation Test', - 'account_id': account_ids[0], - 'start_date': '2012-08-22 00:00:00'} + 'account_uid': accounts['objects'][0]['account_uid'], + 'start_date': '2015-06-01 00:00:00'} -new_order = ox.post('/a/order', data=order) +new_order = ox.post('/order', data=order) -ox.delete('/a/order/%s' % new_order['id']) +ox.delete('/order/%s' % new_order['uid']) ox.logoff() ```` @@ -34,14 +45,14 @@ ox.logoff() ## Installation -Install from [PyPi](http://pypi.python.org/pypi) with [pip](http://www.pip-installer.org/en/latest/index.html) +ox3apiclient is currently unavailable at [PyPi](http://pypi.python.org/pypi) so just clone our git repo: ```` -$ pip install ox3apiclient +$ git clone https://github.com/openx/OX3-Python-API-Client.git ```` -This should install the [oauth2](https://github.com/simplegeo/python-oauth2) dependency, but you can manually install if needed. +Install requests and requests_oauthlib from [PyPi](http://pypi.python.org/pypi) with [pip](http://www.pip-installer.org/en/latest/index.html) ```` -$ pip install oauth2 +$ pip install requests requests_oauthlib ```` Note that Python 2.4 and 2.5 support requires simplejson. You will need @@ -104,4 +115,4 @@ ox = ox3apiclient.Client( consumer_secret=consumer_secret) ox.logon(email, password) -```` \ No newline at end of file +```` diff --git a/ox3apiclient/__init__.py b/ox3apiclient/__init__.py index 7cdec18..1f4b71f 100644 --- a/ox3apiclient/__init__.py +++ b/ox3apiclient/__init__.py @@ -2,7 +2,9 @@ import ConfigParser import cookielib +import logging import mimetypes +from pprint import pformat import random # json module is not supported in versions of Python < 2.6 so try to load the @@ -17,13 +19,8 @@ else: import json -if major_py_version == 2 and minor_py_version > 4: - import oauth2 as oauth -else: - import oauth2_version as oauth - -import urllib -import urllib2 +import requests +from requests_oauthlib import OAuth1 # parse_qs is in the urlparse module as of 2.6, but in cgi in earlier versions. if major_py_version == 2 and minor_py_version > 5: @@ -31,9 +28,9 @@ else: from cgi import parse_qs -import urlparse +from urlparse import urlparse -__version__ = '0.4.0' +__version__ = '0.5.0' REQUEST_TOKEN_URL = 'https://sso.openx.com/api/index/initiate' ACCESS_TOKEN_URL = 'https://sso.openx.com/api/index/token' @@ -42,13 +39,20 @@ API_PATH_V2 = '/ox/4.0' API_PATH_SSO = '/api' ACCEPTABLE_PATHS = (API_PATH_V1, API_PATH_V2, API_PATH_SSO) -JSON_PATHS = (API_PATH_V2,) +JSON_PATHS = (API_PATH_V2) HTTP_METHOD_OVERRIDES = ['DELETE', 'PUT', 'OPTIONS'] + class UnknownAPIFormatError(ValueError): """Client is passed an unrecognized API path that it cannot handle.""" pass + +class OAuthException(Exception): + """Client encountered an Oauth error.""" + pass + + class Client(object): """Client for making requests to the OX3 API. Maintains authentication and points all requests at a domain+path @@ -58,6 +62,7 @@ class Client(object): """ + def __init__(self, domain, realm, consumer_key, consumer_secret, callback_url='oob', scheme='http', @@ -69,7 +74,8 @@ def __init__(self, domain, realm, consumer_key, consumer_secret, password=None, http_proxy=None, https_proxy=None, - headers={}): + headers=None, + timeout=None): """ domain -- Your UI domain. The API is accessed off this domain. @@ -84,6 +90,7 @@ def __init__(self, domain, realm, consumer_key, consumer_secret, api_path -- Only override for debugging. http_proxy -- Optional proxy to send HTTP requests through. headers -- list of headers to send with the request + timeout -- http request timeout in seconds. """ self.domain = domain @@ -95,7 +102,7 @@ def __init__(self, domain, realm, consumer_key, consumer_secret, self.access_token_url = access_token_url self.authorization_url = authorization_url self.api_path = api_path - self.headers = headers + self.timeout = timeout # Validate API path: if api_path not in ACCEPTABLE_PATHS: @@ -109,124 +116,79 @@ def __init__(self, domain, realm, consumer_key, consumer_secret, self._email = email self._password = password - # You shouldn't need to access the oauth2 consumer and token objects - # directly so we'll keep them "private". - self._consumer = oauth.Consumer(self.consumer_key, self.consumer_secret) + # You shouldn't need to access the token and session objects directly so we'll keep them private. self._token = None - - # Similarly you probably won't need to access the cookie jar directly, - # so it is private as well. - self._cookie_jar = cookielib.LWPCookieJar() - opener = \ - urllib2.build_opener(urllib2.HTTPCookieProcessor(self._cookie_jar)) - # Add an HTTP[S] proxy if necessary: - proxies = {} + self._session = requests.Session() + # set supplied headers and proxies + if headers: + self._session.headers.update(headers) if http_proxy: - proxies['http'] = http_proxy + self._session.proxies.update({'http': http_proxy}) if https_proxy: - proxies['https'] = https_proxy - if proxies: - proxy_handler = urllib2.ProxyHandler(proxies) - opener.add_handler(proxy_handler) - - urllib2.install_opener(opener) - - def _sign_request(self, req): - """Utility method to sign a request.""" - parameters = {'oauth_callback': self.callback_url} - headers = req.headers - data = req.data - - # Add any (POST) data to the parameters to be signed in the OAuth - # request. - if data: - parameters.update(data) - - # Create a temporary oauth2 Request object and sign it so we can steal - # the Authorization header. - oauth_req = oauth.Request.from_consumer_and_token( - consumer=self._consumer, - token=self._token, - http_method=req.get_method(), - http_url=req.get_full_url(), - parameters=parameters, - is_form_encoded=True) - - oauth_req.sign_request( - oauth.SignatureMethod_HMAC_SHA1(), - self._consumer, - self._token) - - req.headers.update(oauth_req.to_header()) - return \ - urllib2.Request(req.get_full_url(), headers=req.headers, data=data) + self._session.proxies.update({'https': https_proxy}) + + self.logger = logging.getLogger(__name__) + + + def log_request(self, response): + self.logger.debug('====={0:=<45}'.format('OX3 api call started')) + self.logger.debug("%s %s" % (response.request.method, response.request.url)) + self.logger.debug('====={0:=<45}'.format('OX3 api call request headers')) + for k, v in response.request.headers.items(): + self.logger.debug("%s: %s" % (k, v)) + self.logger.debug('====={0:=<45}'.format('OX3 api call request body')) + self.logger.debug("%s" % response.request.body) + self.logger.debug('====={0:=<45}'.format('OX3 api call response headers')) + for k, v in response.headers.items(): + self.logger.debug("%s: %s" % (k, v)) + self.logger.debug('====={0:=<45}'.format('OX3 api call response body')) + try: + self.logger.debug(pformat(json.loads(response.content))) + except: + self.logger.debug("%s" % response.content) + self.logger.debug('====={0:=<45}'.format('OX3 api call finished')) + - def request(self, url, method='GET', headers={}, data=None, sign=False, + def request(self, url, method='GET', headers=None, data=None, sign=False, send_json=False): """Helper method to make a (optionally OAuth signed) HTTP request.""" - # Since we are using a urllib2.Request object we need to assign a value - # other than None to "data" in order to make the request a POST request, - # even if there is no data to post. - if method in ('POST', 'PUT') and not data: - data = '' - - headers = headers or self.headers - # If we're sending a JSON blob, we need to specify the header: - if method in ('POST', 'PUT') and send_json: - headers['Content-Type'] = 'application/json' - - req = urllib2.Request(url, headers=headers, data=data) - - # We need to set the request's get_method function to return a HTTP - # method for any values other than GET or POST. - if method in HTTP_METHOD_OVERRIDES: - req.get_method = lambda: method + if headers is None: + headers = {} if sign: - req = self._sign_request(req) - - # Stringify data. - if data: - # Everything needs to be UTF-8 for urlencode and json: - data_utf8 = req.get_data() - for i in data_utf8: - # Non-string ints don't have encode and can - # be handled by json.dumps already: - try: - data_utf8[i] = data_utf8[i].encode('utf-8') - except AttributeError: - pass - if send_json: - req.add_data(json.dumps(data_utf8)) - else: - req.add_data(urllib.urlencode(data_utf8)) - - # In 2.4 and 2.5, urllib2 throws errors for all non 200 status codes. - # The OpenX API uses 201 create responses and 204 for delete respones. - # We'll catch those errors and return the HTTPError object since it can - # (thankfully) be used just like a Response object. A handler is - # probably a better approach, but this is quick and works. - res = '[]' - try: - res = urllib2.urlopen(req) - except urllib2.HTTPError, err: - if err.code in [201, 204]: - res = err - else: - # TODO: Decide on format and what extra data to alert user for - # troubleshooting. - raise err - - return res + oauth = OAuth1(client_key=self.consumer_key, + resource_owner_key=self._token, + callback_uri=self.callback_url, + signature_type='query') + else: + oauth = None + if send_json: + response = self._session.request(method, self._resolve_url(url), headers=headers, + json=data, auth=oauth, timeout=self.timeout) + else: + response = self._session.request(method, self._resolve_url(url), headers=headers, + data=data, auth=oauth, timeout=self.timeout) + self.log_request(response) + response.raise_for_status() + return response def fetch_request_token(self): """Helper method to fetch and set request token. Returns token string. """ - res = self.request(url=self.request_token_url, method='POST', sign=True) - self._token = oauth.Token.from_string(res.read()) + oauth = OAuth1(client_key=self.consumer_key, + client_secret=self.consumer_secret, + callback_uri=self.callback_url, + signature_type='auth_header') + response = self._session.post(url=self.request_token_url, auth=oauth, timeout=self.timeout) + self.log_request(response) + if response.status_code != 200: + raise OAuthException("OAuth token request failed (%s) %s" % (response.status_code, response.content)) + credentials = parse_qs(response.content) + self._token = {'key': credentials['oauth_token'][0], + 'secret': credentials['oauth_token_secret'][0]} return self._token def authorize_token(self, email=None, password=None): @@ -247,27 +209,35 @@ def authorize_token(self, email=None, password=None): data = { 'email': email, 'password': password, - 'oauth_token': self._token.key} + 'oauth_token': self._token['key']} - res = self.request( - url=self.authorization_url, - method='POST', - data=data, - sign=True) + response = self._session.post(url=self.authorization_url, data=data, timeout=self.timeout) + self.log_request(response) + if response.status_code != 200: + raise OAuthException("OAuth login failed (%s) %s" % (response.status_code, response.content)) # Clear user credentials. self._email = self._password = None - - verifier = parse_qs(res.read())['oauth_verifier'][0] - self._token.set_verifier(verifier) + # set token verifier + self._token['verifier'] = parse_qs(response.content)['oauth_verifier'][0] def fetch_access_token(self): """Helper method to fetch and set access token. Returns token string. """ - res = self.request(url=self.access_token_url, method='POST', sign=True) - self._token = oauth.Token.from_string(res.read()) + oauth = OAuth1(client_key=self.consumer_key, + client_secret=self.consumer_secret, + resource_owner_key=self._token['key'], + resource_owner_secret=self._token['secret'], + verifier=self._token['verifier'], + callback_uri=self.callback_url, + signature_type='auth_header') + response = self._session.post(url=self.access_token_url, auth=oauth, timeout=self.timeout) + self.log_request(response) + if response.status_code != 200: + raise OAuthException("OAuth token verification failed (%s) %s" % (response.status_code, response.content)) + self._token = parse_qs(response.content)['oauth_token'][0] return self._token def validate_session(self): @@ -278,7 +248,7 @@ def validate_session(self): cookie = cookielib.Cookie( version=0, name='openx3_access_token', - value=self._token.key, + value=self._token, port=None, port_specified=False, domain=self.domain, @@ -292,18 +262,14 @@ def validate_session(self): comment=None, comment_url=None, rest={}) + self._session.cookies.set_cookie(cookie) - self._cookie_jar.set_cookie(cookie) # v2 doesn't need this extra step, just the cookie: if self.api_path == API_PATH_V1: - url_format = '%s://%s%s/a/session/validate' - url = url_format % (self.scheme, - self.domain, - self.api_path) - - res = self.request(url=url, method='PUT') - return res.read() + response = self._session.put(url=self._resolve_url('/a/session/validate'), timeout=self.timeout) + self.log_request(response) + return response.content def logon(self, email=None, password=None): """Returns self after authentication. @@ -324,12 +290,22 @@ def logon(self, email=None, password=None): def logoff(self): """Returns self after deleting authenticated session.""" if self.api_path == API_PATH_V1: - self.delete('/a/session') + response = self._session.delete(self._resolve_url('/a/session'), timeout=self.timeout) elif self.api_path == API_PATH_V2: - self.delete('/session') + response = self._session.delete(self._resolve_url('/session'), timeout=self.timeout) + elif self.api_path == API_PATH_SSO: + oauth = OAuth1(client_key=self.consumer_key, + resource_owner_key=self._token, + callback_uri=self.callback_url, + signature_type='query') + + response = self._session.delete(url=self.access_token_url, auth=oauth, timeout=self.timeout) + if response.status_code != 204: + raise OAuthException("OAuth token deletion failed (%s) %s" % (response.status_code, response.content)) else: raise UnknownAPIFormatError( 'Unrecognized API path: %s' % self.api_path) + self.log_request(response) return self def _resolve_url(self, url): @@ -337,7 +313,7 @@ def _resolve_url(self, url): given a full url already. """ - parse_res = urlparse.urlparse(url) + parse_res = urlparse(url) # 2.4 returns a tuple instead of ParseResult. Since ParseResult is a # subclass or tuple we can access URL components similarly across @@ -356,8 +332,13 @@ def get(self, url): """Issue a GET request to the given URL or API shorthand """ - res = self.request(self._resolve_url(url), method='GET') - return json.loads(res.read()) + response = self._session.get(self._resolve_url(url), timeout=self.timeout) + self.log_request(response) + response.raise_for_status() + try: + return response.json() + except: + return response.content def options(self, url): """Send a request with HTTP method OPTIONS to the given @@ -366,34 +347,58 @@ def options(self, url): OX3 v2 uses this method for showing help information. """ - res = self.request(self._resolve_url(url), method='OPTIONS') - return json.loads(res.read()) + response = self._session.options(self._resolve_url(url), timeout=self.timeout) + self.log_request(response) + response.raise_for_status() + try: + return response.json() + except: + return response.content def put(self, url, data=None): """Issue a PUT request to url (either a full URL or API shorthand) with the data. """ - res = self.request(self._resolve_url(url), method='PUT', data=data, - send_json=(self.api_path in JSON_PATHS)) - return json.loads(res.read()) + if self.api_path in JSON_PATHS: + response = self._session.put(self._resolve_url(url), data=json.dumps(data), timeout=self.timeout) + else: + response = self._session.put(self._resolve_url(url), data=data, timeout=self.timeout) + self.log_request(response) + response.raise_for_status() + try: + return response.json() + except: + return response.content def post(self, url, data=None): """Issue a POST request to url (either a full URL or API shorthand) with the data. """ - res = self.request(self._resolve_url(url), method='POST', data=data, - send_json=(self.api_path in JSON_PATHS)) - return json.loads(res.read()) + if self.api_path in JSON_PATHS: + response = self._session.post(self._resolve_url(url), data=json.dumps(data), timeout=self.timeout) + else: + response = self._session.post(self._resolve_url(url), data=data, timeout=self.timeout) + self.log_request(response) + response.raise_for_status() + try: + return response.json() + except: + return response.content def delete(self, url): """Issue a DELETE request to the URL or API shorthand.""" - res = self.request(self._resolve_url(url), method='DELETE') + response = requests.delete(self._resolve_url(url)) + self.log_request(response) + response.raise_for_status() # Catch no content responses from some delete actions. - if res.code == 204: - return json.loads('[]') - return json.loads(res.read()) + if response.status_code == 204: + return [] + try: + return response.json() + except: + return response.content def upload_creative(self, account_id, file_path): """Upload a media creative to the account with ID @@ -424,7 +429,6 @@ def upload_creative(self, account_id, file_path): body = '\r\n'.join(parts) - # TODO: refactor Client.request. # TODO: Catch errors in attempt to upload. headers = {'content-type': 'multipart/form-data; boundary=' + boundary} if self.api_path == API_PATH_V1: @@ -434,10 +438,13 @@ def upload_creative(self, account_id, file_path): else: raise UnknownAPIFormatError( 'Unrecognized API path: %s' % self.api_path) - req = urllib2.Request(url, headers=headers, data=body) - res = urllib2.urlopen(req) - - return json.loads(res.read()) + response = self._session.get(url, headers=headers, data=body, timeout=self.timeout) + self.log_request(response) + response.raise_for_status() + try: + return response.json() + except: + return response.content def client_from_file(file_path='.ox3rc', env=None): """Return an instance of ox3apiclient.Client with data from file_path. @@ -473,6 +480,7 @@ def client_from_file(file_path='.ox3rc', env=None): client = Client( domain=client_params['domain'], + realm=None, consumer_key=client_params['consumer_key'], consumer_secret=client_params['consumer_secret']) @@ -485,7 +493,8 @@ def client_from_file(file_path='.ox3rc', env=None): 'authorization_url', 'api_path', 'email', - 'password'] + 'password', + 'timeout'] for param in optional_params: try: @@ -504,4 +513,4 @@ def client_from_file(file_path='.ox3rc', env=None): # The exposed API has moved to using Client instead of OX3APIClient, but create # a temporary alias for backwards compatibility. -OX3APIClient = Client +OX3APIClient = Client \ No newline at end of file diff --git a/tests/__init__.py b/tests/__init__.py index a970c23..1934b3a 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -1,3 +1,4 @@ # -*- coding: utf-8 -*- -from clientfromfile import * \ No newline at end of file +from clientfromfile import * +from client import * \ No newline at end of file diff --git a/tests/client.py b/tests/client.py new file mode 100644 index 0000000..8bf89f1 --- /dev/null +++ b/tests/client.py @@ -0,0 +1,205 @@ +# -*- coding: utf-8 -*- +import ox3apiclient +import unittest +from mock import Mock, patch +import os +from contextlib import nested + + +class TestClient(unittest.TestCase): + ex_resp = Mock() + ex_resp.request.headers = {'rheader1': 'rvalue1', + 'rheader2': 'rvalue2'} + ex_resp.headers = {'header1': 'value1', + 'header2': 'value2'} + ex_resp.content = 'oauth_token=key&oauth_token_secret=secret&oauth_callback_confirmed=true' + ex_resp.json.return_value = {'key1': 'value1', + 'key2': 'value2', + 'key3': 'value3'} + # Change this depending on needs, default is 200 + ex_resp.status_code = 200 + + def setUp(self): + self.email = 'you@example.com' + self.password = 'password123' + self.domain = 'uidomain.com' + self.realm = 'uidomain_realm' + self.consumer_key = '1fc5c9ae...' + self.consumer_secret = '7c664d68...' + self.request_token_url = 'https://ex-sso.openx.org/api/index/initiate' + self.access_token_url = 'https://ex-sso.openx.org/api/index/token' + self.authorization_url = 'https://ex-sso.openx.org/api/login/process' + self.api_path_v1 = '/ox/3.0' + self.api_path_v2 = '/ox/4.0' + self.url = 'https://www.example.com' + + with nested( + patch('ox3apiclient.requests.Session'), + patch('ox3apiclient.Client.log_request') + ) as (self.mock_requests_session, self.mock_client_log_request): + + self.mock_requests_session.return_value.get.return_value = self.ex_resp + self.mock_requests_session.return_value.post.return_value = self.ex_resp + self.mock_requests_session.return_value.put.return_value = self.ex_resp + self.mock_requests_session.return_value.options.return_value = self.ex_resp + self.mock_requests_session.return_value.delete.return_value = self.ex_resp + + self.mock_client_log_request.return_value = None + self.client = ox3apiclient.Client( + email=self.email, + password=self.password, + domain=self.domain, + realm=self.realm, + consumer_key=self.consumer_key, + consumer_secret=self.consumer_secret, + request_token_url=self.request_token_url, + access_token_url=self.access_token_url, + authorization_url=self.authorization_url) + + def test_init(self): + pass + + def test_log_request(self): + pass + + def test_request(self): + pass + + def test_fetch_request_token(self): + # Authorized Case + ret_val = self.client.fetch_request_token() + self.assertTrue(isinstance(ret_val, dict)) + self.assertEqual( + (ret_val['secret'], ret_val['key']), ('secret', 'key')) + # UnAuthorized Case + self.ex_resp.status_code = 401 + with self.assertRaises(ox3apiclient.OAuthException): + self.client.fetch_request_token() + + @patch('ox3apiclient.Client.fetch_request_token') + @patch('ox3apiclient.Client.log_request') + def test_authorize_token(self, + mock_client_log_request, + mock_fetch_request_token): + # mock the post response, and do some setup + r = Mock() + r.content = 'oauth_verifier=verifier' + self.mock_requests_session.return_value.post.return_value = r + mock_client_log_request.return_value = None + mock_fetch_request_token.return_value = {'key': 'key', + 'secret': 'secret'} + self.client._token = {'key': 'key', + 'secret': 'secret'} + + # UnAuthorized Case + r.status_code = 401 + with self.assertRaises(ox3apiclient.OAuthException): + self.client.authorize_token() + # Authorized Case + r.status_code = 200 + self.client.authorize_token() + self.assertEqual(self.client._token['verifier'], 'verifier') + + @patch('requests_oauthlib.OAuth1') + @patch('ox3apiclient.Client.log_request') + def test_fetch_access_token(self, mock_client_log_request, mock_oauth1): + # mock the OAuth1 and session post response + mock_oauth1.return_value = 'oauth' + r = Mock() + r.content = 'oauth_token=key' + self.mock_requests_session.return_value.post.return_value = r + self.client._token = {'key': 'key', + 'secret': 'secret', + 'verifier': 'verifier'} + + # UnAuthorized Case + r.status_code = 401 + with self.assertRaises(ox3apiclient.OAuthException): + self.client.fetch_access_token() + # Authorized Case + r.status_code = 200 + self.assertEqual(self.client.fetch_access_token(), 'key') + + def test_validate_session(self): + ret_val = self.client.validate_session() + self.assertEqual(ret_val, + 'oauth_token=key&' + 'oauth_token_secret=secret&' + 'oauth_callback_confirmed=true') + + def test_logon(self): + with nested( + patch('ox3apiclient.Client.fetch_request_token'), + patch('ox3apiclient.Client.authorize_token'), + patch('ox3apiclient.Client.fetch_access_token'), + patch('ox3apiclient.Client.validate_session'), + ) as (mock_fetch_request_token, mock_authorize_token, + mock_fetch_access_token, mock_validate_session): + mock_fetch_request_token.return_value = None + mock_authorize_token.return_value = None + mock_fetch_access_token.return_value = None + mock_validate_session.return_value = None + ret_val = self.client.logon() + self.assertTrue(isinstance(ret_val, ox3apiclient.Client)) + + def test_logoff(self): + ret_val = self.client.logoff() + self.assertTrue(isinstance(ret_val, ox3apiclient.Client)) + + # def test_resolve_url(self): + # pass + + def test_get(self): + ret_val = self.client.get(self.url) + self.assertEqual(ret_val, {'key1': 'value1', + 'key2': 'value2', + 'key3': 'value3'}) + + def test_options(self): + ret_val = self.client.options('https://example.com') + self.assertEqual(ret_val, {'key1': 'value1', + 'key2': 'value2', + 'key3': 'value3'}) + + def test_put(self): + ret_val = self.client.put('https://example.com', data={'k': 'v'}) + self.assertEqual(ret_val, {'key1': 'value1', + 'key2': 'value2', + 'key3': 'value3'}) + + def test_post(self): + ret_val = self.client.post('https://example.com', data={'k': 'v'}) + self.assertEqual(ret_val, {'key1': 'value1', + 'key2': 'value2', + 'key3': 'value3'}) + + @patch('ox3apiclient.requests.delete') + @patch('ox3apiclient.Client.log_request') + def test_delete(self, mock_client_log_request, mock_requests_delete): + mock_client_log_request.return_value = None + r = Mock() + r.status_code = 204 + mock_requests_delete.return_value = r + ret_val = self.client.delete('https://example.com') + self.assertEqual(ret_val, []) + + r.status_code = 200 + # r.content = {'key': 'value'} + # ret_val = self.client.delete('https://example.com') + # mock_requests_delete.return_value.json.side_effect = AttributeError + # self.assertEqual(ret_val, {'key': 'value'}) + + r.json.return_value = {'key': 'value'} + ret_val = self.client.delete('https://example.com') + self.assertEqual(ret_val, {'key': 'value'}) + + def test_upload_creative(self): + file_path = os.path.join(os.path.dirname(__file__), 'ox3rctest') + ret_val = self.client.upload_creative('123456789', file_path) + self.assertEqual(ret_val, {'key1': 'value1', + 'key2': 'value2', + 'key3': 'value3'}) + +if __name__ == '__main__': + # run this using python -m unittes -v tests from the root dir + unittest.main() diff --git a/tests/clientfromfile.py b/tests/clientfromfile.py index 1ad629a..6761aa9 100644 --- a/tests/clientfromfile.py +++ b/tests/clientfromfile.py @@ -1,12 +1,10 @@ # -*- coding: utf-8 -*- - import os.path +import ox3apiclient import unittest -import ox3apiclient class ClientFromFileTestCase(unittest.TestCase): - def test_returns_client(self): file_path = os.path.join(os.path.dirname(__file__), 'ox3rctest') ox = ox3apiclient.client_from_file(file_path=file_path) @@ -18,13 +16,11 @@ def test_loads_default_env(self): test_values = [ 'domain', - 'realm', 'consumer_secret', 'consumer_key'] loaded_values = [ ox.domain, - ox.realm, ox.consumer_key, ox.consumer_secret] @@ -38,13 +34,11 @@ def test_loads_alternate_env(self): test_values = [ 'domain_dev', - 'realm_dev', 'consumer_secret_dev', 'consumer_key_dev'] loaded_values = [ ox.domain, - ox.realm, ox.consumer_key, ox.consumer_secret] @@ -68,7 +62,6 @@ def test_loads_optional_options(self): test_values = [ 'domain', - 'realm', 'consumer_secret', 'consumer_key', 'callback_url', @@ -82,7 +75,6 @@ def test_loads_optional_options(self): loaded_values = [ ox.domain, - ox.realm, ox.consumer_key, ox.consumer_secret, ox.callback_url, @@ -98,3 +90,8 @@ def test_loads_optional_options(self): loaded_values.sort() self.assertEqual(loaded_values, test_values) + + +if __name__ == '__main__': + # run this using python -m unittes -v tests from the root dir + unittest.main()