In [56]:
%pip install requests_toolbelt

Defaulting to user installation because normal site-packages is not writeable
Note: you may need to restart the kernel to use updated packages.


In [57]:
from fastcore.basics import patch_to
from fastcore.test import test_eq

import logging

from abc import ABC, abstractmethod
from dataclasses import dataclass

from requests_toolbelt.utils import dump
import json

import requests

import aiohttp
import asyncio


In [58]:
@dataclass
class ResponseGetData:
    """return this object from any API route
    """

    status: int  # api status
    response: any  # response from api
    is_success: bool

    @classmethod
    def _from_requests_response(cls, res : requests.Response):
        """ returns ResponseGetData from requests.Response"""

        if res.ok and "application/json" in res.headers.get("Content-Type", {}):
            return ResponseGetData(
                status = res.status_code, 
                response = res.json(),
                is_success=True
            )

        elif res.ok:
            return ResponseGetData(
                status=res.status_code,
                response=response.text,
                is_success=True
            )
        
        return ResponseGetData(
            status=res.status_code,
            response=res.reason,
            is_success=False
        )
    
    @classmethod
    async def _from_aihottp_response(cls, res: aiohttp.ClientResponse):
        """ returns ResponseGetData from asyncronous aiohttp.Client.Response"""
        if (
            res.status == 200
            and hasattr(res, "headers")
            and res.headers.get("Content-Type")
            and "application/json" in res.headers.get("Content-Type")
        ):

            # handle if unable to decode json()
            try:
                return ResponseGetData(
                    status=res.status, response=await res.json(), is_success=True
                )

            except Exception as e:
                print(e)
                print("response included json, but defaulted to backup decode method")

                response = await res.read()
                return ResponseGetData(
                    status=res.status, response=response.decode(), is_success=True
                )
        # response is text
        elif res.status == 200:
            return ResponseGetData(
                status=res.status, response=await res.text(), is_success=True
            )

        # response is error
        else:
            return ResponseGetData(status=res.status, response=res.reason, is_success=False)

In [59]:
rgd = ResponseGetData(status = 200, response = "test", is_success = True)

test_eq(rgd.is_success , True)

rgd

ResponseGetData(status=200, response='test', is_success=True)

# Transport Methods by Library

This code base supports two API request libraries, `requests.request` (synchronous) and `aiohttp.ClientRequest` (asynchronous) this can be extended as new libraries emerge with different performance characteristics.

The `RequestTransport` abstract base class adds consistency to the transport methods with consistent methods for generating headers, as well as generating requests from APIs.

In [60]:
class RequestTransport(ABC):
    """Universal transport for handling API request headers
    """

    def __init__(self, 
                 request_timeout : int = None # set request timeout
                 ):
        logger = logging.getLogger('dcdomo')
        logger.setLevel(logging.WARNING)
        self.logger = logging
        self.request_timeout = request_timeout
        self.default_timeout = 10

    @abstractmethod
    def _request() -> ResponseGetData:
        """sends request to an API"""
        pass

    def dump_response(self, response):
        data = dump.dump_all(response)
        return str(data.decode('utf-8'))


    @staticmethod
    def _obj_to_json(obj):
        return json.dumps(obj, default=str)

    def _headers_default_receive_json(self):
        return { 'Accept': 'application/json' }
    
    def _headers_receive_csv(self):
        headers = self._headers_default_receive_json()
        headers['Accept'] = 'text/csv'
        return headers

    def _headers_send_json(self):
        headers = self._headers_default_receive_json()
        headers['Content-Type'] = 'application/json'
        return headers

    def _headers_send_csv(self):
        headers = self._headers_default_receive_json()
        headers['Content-Type'] = 'text/csv'
        return headers

    def _headers_send_gzip(self):
        headers = self._headers_default_receive_json()
        headers['Content-Type'] = 'text/csv'
        headers['Content-Encoding'] = 'gzip'
        return headers



### RequestTransport request methods

Each method establishes the appropriate headers before calling the request method.

Implements request methods.  The hidden ```._request()``` method will be determined by the Transport method (synchronous or asynchronous).

In [61]:
class HTTPMethod:
    """utility class used by RequestTransport to enumerate HTTP methods"""

    GET = 'GET'
    POST = 'POST'
    PUT = 'PUT'
    PATCH = 'PATCH'
    DELETE = 'DELETE'

In [62]:
@patch_to(RequestTransport)
def get(self, url, params = None , request_timeout : int = None):
    if request_timeout:
        self.request_timeout = request_timeout

    headers = self._headers_default_receive_json()
    return self._request(url, HTTPMethod.GET, headers, params)

@patch_to(RequestTransport)
def get_csv(self, url, params, request_timeout : int = None):
    if request_timeout:
        self.request_timeout = request_timeout

    headers = self._headers_receive_csv()
    return self._request(url, HTTPMethod.GET, headers, params)

@patch_to(RequestTransport)
def post(self, url, body, params, request_timeout : int = None) -> ResponseGetData:
    if request_timeout:
        self.request_timeout = request_timeout

    headers = self._headers_send_json()
    return self._request(url, HTTPMethod.POST, headers, params,
                        self._obj_to_json(body))

@patch_to(RequestTransport)
def put(self, url, body ,request_timeout : int = None ):
    if request_timeout:
        self.request_timeout = request_timeout

    headers = self._headers_send_json()
    return self._request(url, HTTPMethod.PUT, headers, {},
                        self._obj_to_json(body))

@patch_to(RequestTransport)
def put_csv(self, url, body, request_timeout : int = None):
    if request_timeout:
        self.request_timeout = request_timeout

    headers = self._headers_send_csv()
    return self._request(url, HTTPMethod.PUT, headers, {}, body)

@patch_to(RequestTransport)
def put_gzip(self, url, body, request_timeout : int = None):
    if request_timeout:
        self.request_timeout = request_timeout

    headers = self._headers_send_gzip()
    return self._request(url, HTTPMethod.PUT, headers, {}, body)

@patch_to(RequestTransport)
def patch(self, url, body, request_timeout : int = None):
    if request_timeout:
        self.request_timeout = request_timeout

    headers = self._headers_send_json()
    return self._request(url, HTTPMethod.PATCH, headers, {},
                        self._obj_to_json(body))

@patch_to(RequestTransport)
def delete(self, url, request_timeout : int = None):
    if request_timeout:
        self.request_timeout = request_timeout

    headers = self._headers_default_receive_json()
    return self._request(url, HTTPMethod.DELETE, headers)

## Synchronous Code Execution
The `TransportSync` class is a simple wrapper for `requests.request`

In [63]:
class TransportSync(RequestTransport):
    def __init__(self):
        super().__init__()

    def _request(self,
                url : str,
                method : HTTPMethod,
                headers : dict ,
                params : dict = None,
                body : str or dict = None):

        self.logger.debug('{} {} {}'.format(method, url, body))

        request_args = {'method': method, 
                        'url': url,
                        'headers': headers,
                        'params': params,
                        'data': body,
                        'stream': True}

        if self.request_timeout:
            request_args['timeout'] = self.request_timeout

        res = requests.request(**request_args)
        return ResponseGetData._from_requests_response(res)

In [64]:
ts = TransportSync()

test_eq(isinstance(ts, TransportSync), True)

get_res = ts.get(url = 'http://www.thecocktaildb.com/api/json/v1/1/search.php?s=margarita')

test_eq(get_res.status, 200)


## Asynchronous Code Execution

The `TransportAsync` class is a wrapper for `aiohttp`'s ClientSession and ClientResponse implementations.  Notice the use of ```async/await``` in the code samples


In [65]:
class TransportAsync(RequestTransport):
    """wrapper for aiohttp.ClientSession and aiohttp.ClientResponse for handling asynchronous code execution.  In the event of request_timeout, will default to synchronous code execution via requests.request library"""
    
    def __init__(self):
        super().__init__()

    async def _request(self,
                url : str,
                method : HTTPMethod,
                headers : dict ,
                params : dict = None,
                body : str or dict = None,
                session : aiohttp.ClientSession = None,
                ):

        if session is None:
            is_close_session = True
            session = aiohttp.ClientSession()
        else:
            is_close_session = False
        
        self.logger.debug('{} {} {}'.format(method, url, body))

        request_args = {'url': url,
                        'headers': headers,
                        'params': params,
                        'data': body}

        try:
            res = await getattr(session, method.lower())(
                timeout = aiohttp.ClientTimeout(total = self.request_timeout),
                **request_args )
            
            return await ResponseGetData._from_aihottp_response(res)
        
        except asyncio.TimeoutError as e:
            print('defaulting to sync request.  TimeoutError')
            
            res = requests.request(
                method = method, 
                timeout = self.default_timeout,
                **request_args)

            return ResponseGetData._from_requests_response(res)
        
        finally:
            if is_close_session:
                await session.close()


In [66]:
tas = TransportAsync()
test_eq(isinstance(tas, TransportAsync), True)

get_res = await tas.get(url = 'http://www.thecocktaildb.com/api/json/v1/1/search.php?s=margarita', request_timeout = 1)

test_eq(get_res.status, 200)

get_res.response.get('drinks')[0].get('idDrink')

'11007'

To LOOP over asynchronous code blocks, take advantage of `asyncio.gather()`

1. Notice how the ```get_drink()``` function implements what would usually be contained within the FOR LOOP allowing us to implement a list comprehension.  The ```asyncio.sleep()``` function highlights the fact that we are retrieving all the requests simultaneously.

2. Use ```await asyncio.gather( * [list_of_asynchronous_functions])``` forces the code to wait till all the APIs have returned a response before proceeding to the final step of printing the results.

3. Each ```.get()``` request returns a `ResponseGetData` object.  We can test ```iif(ResponseGetData.is_success)``` to validate having received a 200 response.  The response contenent will always be in the .response attribute.

In [72]:
Token = str
AuthHeader = dict

class DomoAuth(ABC):
    auth_header: str = None
    auth_token: str = None
    

    @abstractmethod
    def set_header():
        """sets a header on the `DomoAuth` class"""
        pass

    @abstractmethod
    def get_auth_token() -> Token:
        """retrieves and sets an auth `Token` on `DomoAuth` class"""
        pass

class DomoFullAuth(DomoAuth):
    def __init__(self, 
                 domo_username:str,
                 domo_password:str,
                 domo_instance:str):
        self.domo_username = domo_username
        self.domo_password = domo_password
        self.domo_instance = domo_instance

        super().__init__():
        
    async def get_auth_token(self) -> Token:
        
        url = f'https://{domo_instance}.domo.com/api/content/v2/authentication'

        body = {
            'method': 'password',
            'emailAddress': domo_username,
            'password': domo_password
        }

        tsa = TransportAsync()
        res = await tsa.post(url = url, body = body)

        if not tsa.is_succss:
            return 


    def set_header(self ) -> AuthHeader:
        auth_header = {'x-domo-authentication': self.auth_token }
        
        self.auth_header = auth_header
        return auth_header



def header_client_auth(access_token):
    return {
            'Authorization': 'bearer ' + access_token,
        }


def header_developer_auth(access_token):
    return {
            'x-domo-developer-token': access_token,
        }


NameError: name 'request_fn' is not defined

In [None]:
import nbdev
nbdev.show_doc(DomoFullAuth.get_auth_token)

nbdev.show_doc(DomoFullAuth.set_header)

---

### DomoFullAuth.set_header

>      DomoFullAuth.set_header ()

In [None]:
async def get_drink(drink_name: str):    
    url = f'http://www.thecocktaildb.com/api/json/v1/1/search.php?s={drink_name}'
    print(f'getting - "{url}"')

    await(asyncio.sleep(2))    
    return await tas.get(url = url)

drink_names = ['margarita', 'old fashioned', 'gin and tonic', 'screwdriver', 'vodka tonic']

drinks_requests = await asyncio.gather(
    * [get_drink(drink_name) for drink_name in drink_names]
)

for drink in drinks_requests:
    [ print(drink_match.get('idDrink')) for drink_match in drink.response.get('drinks') if drink.is_success ]
        


getting - "http://www.thecocktaildb.com/api/json/v1/1/search.php?s=margarita"
getting - "http://www.thecocktaildb.com/api/json/v1/1/search.php?s=old fashioned"
getting - "http://www.thecocktaildb.com/api/json/v1/1/search.php?s=gin and tonic"
getting - "http://www.thecocktaildb.com/api/json/v1/1/search.php?s=screwdriver"
getting - "http://www.thecocktaildb.com/api/json/v1/1/search.php?s=vodka tonic"
11007
11118
17216
16158
12322
178332
11001
11403
12162
12091
178364
