Skip to content

Commit

Permalink
Merge branch 'release/0.4.0'
Browse files Browse the repository at this point in the history
  • Loading branch information
robertmrk committed Jan 6, 2019
2 parents b11fcfa + a6f2ede commit 2537e4c
Show file tree
Hide file tree
Showing 20 changed files with 516 additions and 179 deletions.
5 changes: 5 additions & 0 deletions DESCRIPTION.rst
Expand Up @@ -73,6 +73,11 @@ Usage
loop = asyncio.get_event_loop()
loop.run_until_complete(stream_events())
Documentation
-------------

http://aiosfstream.readthedocs.io/

.. _aiohttp: https://github.com/aio-libs/aiohttp/
.. _asyncio: https://docs.python.org/3/library/asyncio.html
.. _api: https://developer.salesforce.com/docs/atlas.en-us.api_streaming.meta/api_streaming/intro_stream.htm
Expand Down
3 changes: 3 additions & 0 deletions MANIFEST.in
Expand Up @@ -2,7 +2,10 @@ include LICENSE.txt
include DESCRIPTION.rst
include README.rst
include .pylintrc
include aiosfstream/py.typed
graft tests
graft docs
global-exclude *.pyc
global-exclude .python-version
prune docs/build
prune venv*
5 changes: 5 additions & 0 deletions README.rst
Expand Up @@ -73,6 +73,11 @@ Usage
loop = asyncio.get_event_loop()
loop.run_until_complete(stream_events())
Documentation
-------------

http://aiosfstream.readthedocs.io/

Install
-------

Expand Down
17 changes: 10 additions & 7 deletions aiosfstream/__init__.py
@@ -1,13 +1,16 @@
"""Salesforce Streaming API client for asyncio"""
import logging

from ._metadata import VERSION as __version__ # noqa: F401
from .client import Client, SalesforceStreamingClient # noqa: F401
from .auth import PasswordAuthenticator # noqa: F401
from .auth import RefreshTokenAuthenticator # noqa: F401
from .replay import ReplayMarker, ReplayOption # noqa: F401
from .replay import MappingStorage, DefaultMappingStorage # noqa: F401
from .replay import ConstantReplayId, ReplayMarkerStorage # noqa: F401
from aiosfstream._metadata import VERSION as __version__ # noqa: F401
from aiosfstream.client import Client, SalesforceStreamingClient # noqa: F401
from aiosfstream.client import ReplayMarkerStoragePolicy # noqa: F401
from aiosfstream.auth import PasswordAuthenticator # noqa: F401
from aiosfstream.auth import RefreshTokenAuthenticator # noqa: F401
from aiosfstream.replay import ReplayMarker, ReplayOption # noqa: F401
from aiosfstream.replay import MappingStorage # noqa: F401
from aiosfstream.replay import DefaultMappingStorage # noqa: F401
from aiosfstream.replay import ConstantReplayId # noqa: F401
from aiosfstream.replay import ReplayMarkerStorage # noqa: F401

# Create a default handler to avoid warnings in applications without logging
# configuration
Expand Down
7 changes: 6 additions & 1 deletion aiosfstream/_metadata.py
Expand Up @@ -3,6 +3,11 @@
DESCRIPTION = "Salesforce Streaming API client for asyncio"
KEYWORDS = "salesforce asyncio aiohttp comet cometd bayeux push streaming"
URL = "https://github.com/robertmrk/aiosfstream"
VERSION = "0.3.0"
PROJECT_URLS = {
"CI": "https://travis-ci.org/robertmrk/aiosfstream",
"Coverage": "https://coveralls.io/github/robertmrk/aiosfstream",
"Docs": "http://aiosfstream.readthedocs.io/"
}
VERSION = "0.4.0"
AUTHOR = "Róbert Márki"
AUTHOR_EMAIL = "gsmiko@gmail.com"
95 changes: 56 additions & 39 deletions aiosfstream/auth.py
Expand Up @@ -3,12 +3,15 @@
from http import HTTPStatus
import reprlib
import json
from typing import Optional, Tuple

from aiocometd import AuthExtension
from aiocometd.typing import JsonObject, JsonLoader, JsonDumper, Payload, \
Headers
from aiohttp import ClientSession
from aiohttp.client_exceptions import ClientError

from .exceptions import AuthenticationError
from aiosfstream.exceptions import AuthenticationError


TOKEN_URL = "https://login.salesforce.com/services/oauth2/token"
Expand All @@ -20,55 +23,70 @@
class AuthenticatorBase(AuthExtension):
"""Abstract base class to serve as a base for implementing concrete
authenticators"""
def __init__(self, sandbox=False, json_dumps=json.dumps,
json_loads=json.loads):
def __init__(self, sandbox: bool = False,
json_dumps: JsonDumper = json.dumps,
json_loads: JsonLoader = json.loads) -> None:
"""
:param bool sandbox: Marks whether the authentication has to be done \
:param sandbox: Marks whether the authentication has to be done \
for a sandbox org or for a production org
:param json_dumps: Function for JSON serialization, the default is \
:func:`json.dumps`
:type json_dumps: :func:`callable`
:param json_loads: Function for JSON deserialization, the default is \
:func:`json.loads`
:type json_loads: :func:`callable`
"""
#: Marks whether the authentication has to be done for a sandbox org \
#: or for a production org
self._sandbox = sandbox
#: Salesforce session ID that can be used with the web services API
self.access_token = None
self.access_token: Optional[str] = None
#: Value is Bearer for all responses that include an access token
self.token_type = None
self.token_type: Optional[str] = None
#: A URL indicating the instance of the user’s org
self.instance_url = None
self.instance_url: Optional[str] = None
#: Identity URL that can be used to both identify the user and query \
#: for more information about the user
self.id = None # pylint: disable=invalid-name
self.id: Optional[str] = None # pylint: disable=invalid-name
#: Base64-encoded HMAC-SHA256 signature signed with the consumer’s \
#: private key containing the concatenated ID and issued_at. Use to \
#: verify that the identity URL hasn’t changed since the server sent it
self.signature = None
self.signature: Optional[str] = None
#: Timestamp when the signature was created
self.issued_at = None
self.issued_at: Optional[str] = None
#: Function for JSON serialization
self.json_dumps = json_dumps
#: Function for JSON deserialization
self.json_loads = json_loads

@property
def _token_url(self):
def _token_url(self) -> str:
"""The URL that should be used for token requests"""
if self._sandbox:
return SANDBOX_TOKEN_URL
return TOKEN_URL

async def outgoing(self, payload, headers):
async def outgoing(self, payload: Payload, headers: Headers) -> None:
"""Process outgoing *payload* and *headers*
Called just before a payload is sent to insert the ``Authorization`` \
header value.
:param payload: List of outgoing messages
:param headers: Headers to send
:raise AuthenticationError: If the value of :py:attr:`~token_type` or \
:py:attr:`~access_token` is ``None``. In other words, it's raised if \
the method is called without authenticating first.
"""
if self.token_type is None or self.access_token is None:
raise AuthenticationError("Unknown token_type and access_token "
"values. Method called without "
"authenticating first.")
headers["Authorization"] = self.token_type + " " + self.access_token

async def incoming(self, payload, headers=None):
async def incoming(self, payload: Payload,
headers: Optional[Headers] = None) -> None:
pass

async def authenticate(self):
async def authenticate(self) -> None:
"""Called on initialization and after a failed authentication attempt
:raise AuthenticationError: If the server rejects the authentication \
Expand All @@ -91,11 +109,10 @@ async def authenticate(self):
self.__dict__.update(response_data)

@abstractmethod
async def _authenticate(self):
async def _authenticate(self) -> Tuple[int, JsonObject]:
"""Authenticate the user
:return: The status code and response data from the server's response
:rtype: tuple(int, dict)
:raise aiohttp.client_exceptions.ClientError: If a network failure \
occurs
"""
Expand All @@ -106,23 +123,23 @@ async def _authenticate(self):

class PasswordAuthenticator(AuthenticatorBase):
"""Authenticator for using the OAuth 2.0 Username-Password Flow"""
def __init__(self, consumer_key, consumer_secret, username, password,
sandbox=False, json_dumps=json.dumps, json_loads=json.loads):
def __init__(self, consumer_key: str, consumer_secret: str,
username: str, password: str, sandbox: bool = False,
json_dumps: JsonDumper = json.dumps,
json_loads: JsonLoader = json.loads) -> None:
"""
:param str consumer_key: Consumer key from the Salesforce connected \
:param consumer_key: Consumer key from the Salesforce connected \
app definition
:param str consumer_secret: Consumer secret from the Salesforce \
:param consumer_secret: Consumer secret from the Salesforce \
connected app definition
:param str username: Salesforce username
:param str password: Salesforce password
:param bool sandbox: Marks whether the authentication has to be done \
:param username: Salesforce username
:param password: Salesforce password
:param sandbox: Marks whether the authentication has to be done \
for a sandbox org or for a production org
:param json_dumps: Function for JSON serialization, the default is \
:func:`json.dumps`
:type json_dumps: :func:`callable`
:param json_loads: Function for JSON deserialization, the default is \
:func:`json.loads`
:type json_loads: :func:`callable`
"""
super().__init__(sandbox=sandbox,
json_dumps=json_dumps,
Expand All @@ -136,15 +153,15 @@ def __init__(self, consumer_key, consumer_secret, username, password,
#: Salesforce password
self.password = password

def __repr__(self):
def __repr__(self) -> str:
"""Formal string representation"""
cls_name = type(self).__name__
return f"{cls_name}(consumer_key={reprlib.repr(self.client_id)}," \
f"consumer_secret={reprlib.repr(self.client_secret)}, " \
f"username={reprlib.repr(self.username)}, " \
f"password={reprlib.repr(self.password)})"

async def _authenticate(self):
async def _authenticate(self) -> Tuple[int, JsonObject]:
async with ClientSession(json_serialize=self.json_dumps) as session:
data = {
"grant_type": "password",
Expand All @@ -160,24 +177,24 @@ async def _authenticate(self):

class RefreshTokenAuthenticator(AuthenticatorBase):
"""Authenticator for using the OAuth 2.0 Refresh Token Flow"""
def __init__(self, consumer_key, consumer_secret, refresh_token,
sandbox=False, json_dumps=json.dumps, json_loads=json.loads):
def __init__(self, consumer_key: str, consumer_secret: str,
refresh_token: str, sandbox: bool = False,
json_dumps: JsonDumper = json.dumps,
json_loads: JsonLoader = json.loads) -> None:
"""
:param str consumer_key: Consumer key from the Salesforce connected \
:param consumer_key: Consumer key from the Salesforce connected \
app definition
:param str consumer_secret: Consumer secret from the Salesforce \
:param consumer_secret: Consumer secret from the Salesforce \
connected app definition
:param str refresh_token: A refresh token obtained from Salesforce \
:param refresh_token: A refresh token obtained from Salesforce \
by using one of its authentication methods (for example with the \
OAuth 2.0 Web Server Authentication Flow)
:param bool sandbox: Marks whether the authentication has to be done \
:param sandbox: Marks whether the authentication has to be done \
for a sandbox org or for a production org
:param json_dumps: Function for JSON serialization, the default is \
:func:`json.dumps`
:type json_dumps: :func:`callable`
:param json_loads: Function for JSON deserialization, the default is \
:func:`json.loads`
:type json_loads: :func:`callable`
"""
super().__init__(sandbox=sandbox,
json_dumps=json_dumps,
Expand All @@ -189,14 +206,14 @@ def __init__(self, consumer_key, consumer_secret, refresh_token,
#: Salesforce refresh token
self.refresh_token = refresh_token

def __repr__(self):
def __repr__(self) -> str:
"""Formal string representation"""
cls_name = type(self).__name__
return f"{cls_name}(consumer_key={reprlib.repr(self.client_id)}," \
f"consumer_secret={reprlib.repr(self.client_secret)}, " \
f"refresh_token={reprlib.repr(self.refresh_token)})"

async def _authenticate(self):
async def _authenticate(self) -> Tuple[int, JsonObject]:
async with ClientSession(json_serialize=self.json_dumps) as session:
data = {
"grant_type": "refresh_token",
Expand Down

0 comments on commit 2537e4c

Please sign in to comment.