Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
flake8
mock==2.0.0
protobuf==3.9.1
pytest==4.3.0
Expand Down
5 changes: 4 additions & 1 deletion setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,16 @@ exclude =
README.md,
LICENSE.md,
requirements.txt,
*protos*,


[coverage:run]
include =
stackify/*
omit =
*tests*
*tests*,
*handler_backport.py,
*protos*,

[tool:pytest]
python_files=tests.py test.py test_*.py *_test.py tests_*.py *_tests.py
4 changes: 3 additions & 1 deletion stackify/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,9 @@

# using `%2F` instead of `/` as per package documentation
DEFAULT_SOCKET_FILE = '%2Fusr%2Flocal%2Fstackify%2Fstackify.sock'
DEFAULT_HTTP_ENDPOINT = 'https://localhost:10601'
SOCKET_URL = 'http+unix://' + DEFAULT_SOCKET_FILE
SOCKET_LOG_URL = '/log'
AGENT_LOG_URL = '/log'

API_REQUEST_INTERVAL_IN_SEC = 30

Expand Down Expand Up @@ -36,3 +37,4 @@

TRANSPORT_TYPE_DEFAULT = 'default'
TRANSPORT_TYPE_AGENT_SOCKET = 'agent_socket'
TRANSPORT_TYPE_AGENT_HTTP = 'agent_http'
4 changes: 2 additions & 2 deletions stackify/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
from stackify.constants import MAX_BATCH
from stackify.constants import QUEUE_SIZE
from stackify.timer import RepeatedTimer
from stackify.transport import Transport
from stackify.transport import configure_transport


internal_logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -70,7 +70,7 @@ def __init__(self, queue_, max_batch=MAX_BATCH, config=None, **kwargs):

self.max_batch = max_batch
self.messages = []
self.transport = Transport(config, **kwargs)
self.transport = configure_transport(config, **kwargs)
self.timer = RepeatedTimer(API_REQUEST_INTERVAL_IN_SEC, self.send_group)

self._started = False
Expand Down
2 changes: 2 additions & 0 deletions stackify/timer.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,12 @@ def _time(self):

def start(self):
if not self._started:
self._started = True
self.thread.setDaemon(True)
self.thread.start()

def stop(self):
if self._started:
self._started = False
self.event.set()
self.thread.join()
104 changes: 21 additions & 83 deletions stackify/transport/__init__.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,14 @@
import logging

from stackify.constants import LOG_SAVE_URL
from stackify.constants import SOCKET_LOG_URL
from stackify.constants import TRANSPORT_TYPE_AGENT_HTTP
from stackify.constants import TRANSPORT_TYPE_AGENT_SOCKET
from stackify.constants import TRANSPORT_TYPE_DEFAULT
from stackify.transport.agent import AgentSocket
from stackify.transport.agent.message import Log
from stackify.transport.agent.message import LogGroup
from stackify.transport.agent import AgentSocketTransport
from stackify.transport.agent import AgentHTTPTransport
from stackify.transport.application import get_configuration
from stackify.transport.application import EnvironmentDetail
from stackify.transport.default import HTTPClient
from stackify.transport.default.log import LogMsg
from stackify.transport.default.log import LogMsgGroup
from stackify.transport.default import DefaultTransport


internal_logger = logging.getLogger(__name__)

Expand All @@ -24,93 +21,34 @@ class TransportTypes(object):
Types:
* DEFAULT - HTTP transport that will directly send logs to the Platform
* AGENT_SOCKET - HTTP warapped Unix Socket Domain that will send logs to the StackifyAgent
* AGENT_HTTP - HTTP transport that will send logs to the Agent using HTTP requests
"""

DEFAULT = TRANSPORT_TYPE_DEFAULT
AGENT_SOCKET = TRANSPORT_TYPE_AGENT_SOCKET
AGENT_HTTP = TRANSPORT_TYPE_AGENT_HTTP

@classmethod
def get_transport(self, api_config=None, env_details=None):
# determine which transport to use depening on users config
if api_config.transport == self.AGENT_SOCKET:
internal_logger.debug('Setting Agent Socket Transport.')
api_config.transport = self.AGENT_SOCKET
return AgentSocket()
return AgentSocketTransport(api_config, env_details)

if api_config.transport == self.AGENT_HTTP:
internal_logger.debug('Setting Agent HTTP Transport.')
return AgentHTTPTransport(api_config, env_details)

internal_logger.debug('Setting Default Transport.')
api_config.transport = self.DEFAULT
return HTTPClient(api_config, env_details)

@classmethod
def create_message(self, record, api_config, env_details):
# create message depending on which transport
if api_config.transport == self.AGENT_SOCKET:
return Log(record, api_config, env_details).get_object()

msg = LogMsg()
msg.from_record(record)
return msg

@classmethod
def create_group_message(self, messages, api_config, env_details):
# create group message depending on which transport
if api_config.transport == self.AGENT_SOCKET:
return LogGroup(messages, api_config, env_details).get_object()

return LogMsgGroup(messages)

@classmethod
def get_log_url(self, api_config):
# return log url depending on which transport
if api_config.transport == self.AGENT_SOCKET:
return api_config.socket_url + SOCKET_LOG_URL

return LOG_SAVE_URL

@classmethod
def prepare_message(self, api_config, message):
# convert message depending on which transport
if api_config.transport == self.AGENT_SOCKET:
return message.SerializeToString()

return message


class Transport(object):
"""
Transport base class
"""

def __init__(self, config=None, **kwargs):
self.api_config = config or get_configuration(**kwargs)
self.env_details = EnvironmentDetail(self.api_config)
self._transport = TransportTypes.get_transport(
self.api_config,
self.env_details,
)

def create_message(self, record):
# create message from record
return TransportTypes.create_message(
record,
self.api_config,
self.env_details,
)
return DefaultTransport(api_config, env_details)

def create_group_message(self, messages):
# create group message from list of records
return TransportTypes.create_group_message(
messages,
self.api_config,
self.env_details,
)

def send(self, group_message):
# send group message
try:
self._transport.send(
TransportTypes.get_log_url(self.api_config),
TransportTypes.prepare_message(self.api_config, group_message),
)
except Exception as e:
internal_logger.error('Request error: {}'.format(e))
def configure_transport(config=None, **kwargs):
# return which transport to use depending on users input
api_config = config or get_configuration(**kwargs)
env_details = EnvironmentDetail(api_config)
return TransportTypes.get_transport(
api_config,
env_details,
)
31 changes: 30 additions & 1 deletion stackify/transport/agent/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1,30 @@
from .agent_socket import AgentSocket # noqa
import logging

from stackify.constants import AGENT_LOG_URL
from stackify.transport.agent import agent_http
from stackify.transport.agent import agent_socket
from stackify.transport.base import AgentBaseTransport

internal_logger = logging.getLogger(__name__)


class AgentSocketTransport(AgentBaseTransport):
"""
Agent Socket Transport handles sending of logs using Unix Socket Domain
"""

def __init__(self, api_config, env_details):
super(AgentSocketTransport, self).__init__(api_config, env_details)
self.url = api_config.socket_url + AGENT_LOG_URL
self._transport = agent_socket.AgentSocket()


class AgentHTTPTransport(AgentBaseTransport):
"""
Agent HTTP Transport handles sending of logs using HTTP requests
"""

def __init__(self, api_config, env_details):
super(AgentHTTPTransport, self).__init__(api_config, env_details)
self.url = api_config.http_endpoint + AGENT_LOG_URL
self._transport = agent_http.AgentHTTP()
26 changes: 26 additions & 0 deletions stackify/transport/agent/agent_http.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import logging
import requests
import retrying

internal_logger = logging.getLogger(__name__)


class AgentHTTP(object):
"""
AgentHTTP class that handles HTTP post requests
"""

def _post(self, url, payload):
headers = {
'Content-Type': 'application/x-protobuf',
}
try:
return requests.post(url, payload, headers=headers)
except Exception as e:
internal_logger.debug('HTTP transport exception: {}.'.format(e))
raise

@retrying.retry(wait_exponential_multiplier=1000, stop_max_delay=32000)
def send(self, url, payload):
# send payload through socket domain using _post method
return self._post(url, payload)
2 changes: 1 addition & 1 deletion stackify/transport/agent/agent_socket.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,4 +42,4 @@ def _post(self, url, payload):
@retrying.retry(wait_exponential_multiplier=1000, stop_max_delay=32000)
def send(self, url, payload):
# send payload through socket domain using _post method
self._post(url, payload)
return self._post(url, payload)
2 changes: 1 addition & 1 deletion stackify/transport/agent/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,4 +122,4 @@ def __init__(self, messages, api_config, env_details, logger=None):
log_group.application_location = env_details.appLocation
log_group.logger = logger or __name__
log_group.platform = 'python'
log_group.logs.MergeFrom(messages)
log_group.logs.extend(messages)
18 changes: 16 additions & 2 deletions stackify/transport/application.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@

from stackify.utils import arg_or_env
from stackify.constants import API_URL
from stackify.constants import DEFAULT_HTTP_ENDPOINT
from stackify.constants import SOCKET_URL
from stackify.constants import TRANSPORT_TYPE_AGENT_HTTP
from stackify.constants import TRANSPORT_TYPE_AGENT_SOCKET
from stackify.constants import TRANSPORT_TYPE_DEFAULT
from stackify.transport.default.formats import JSONObject
Expand All @@ -27,12 +29,22 @@ class ApiConfiguration:
ApiConfiguration class that stores application configurations
"""

def __init__(self, api_key, application, environment, api_url=API_URL, socket_url=SOCKET_URL, transport=None):
def __init__(
self,
api_key,
application,
environment,
api_url=API_URL,
socket_url=SOCKET_URL,
transport=None,
http_endpoint=DEFAULT_HTTP_ENDPOINT,
):
self.api_key = api_key
self.api_url = api_url
self.application = application
self.environment = environment
self.socket_url = socket_url
self.http_endpoint = http_endpoint
self.transport = transport


Expand All @@ -43,7 +55,8 @@ def get_configuration(**kwargs):
"""

transport = arg_or_env('transport', kwargs, TRANSPORT_TYPE_DEFAULT)
if transport == TRANSPORT_TYPE_AGENT_SOCKET:

if transport in [TRANSPORT_TYPE_AGENT_SOCKET, TRANSPORT_TYPE_AGENT_HTTP]:
api_key = arg_or_env('api_key', kwargs, '')
else:
api_key = arg_or_env('api_key', kwargs)
Expand All @@ -54,5 +67,6 @@ def get_configuration(**kwargs):
api_key=api_key,
api_url=arg_or_env('api_url', kwargs, API_URL),
socket_url=arg_or_env('socket_url', kwargs, SOCKET_URL),
http_endpoint=arg_or_env('http_endpoint', kwargs, DEFAULT_HTTP_ENDPOINT, env_key='STACKIFY_TRANSPORT_HTTP_ENDPOINT'),
transport=transport,
)
40 changes: 40 additions & 0 deletions stackify/transport/base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
from stackify.transport.agent.message import Log
from stackify.transport.agent.message import LogGroup


class BaseTransport(object):
"""
Base Transport
"""
def __init__(self, api_config, env_details):
self._api_config = api_config
self._env_details = env_details

def create_message(self, record):
raise NotImplementedError

def create_group_message(self, messages):
raise NotImplementedError

def send(self, group_message):
raise NotImplementedError


class AgentBaseTransport(BaseTransport):
"""
Base Transport for protobuf data
"""
url = None
_transport = None

def __init__(self, api_config, env_details):
super(AgentBaseTransport, self).__init__(api_config, env_details)

def create_message(self, record):
return Log(record, self._api_config, self._env_details).get_object()

def create_group_message(self, messages):
return LogGroup(messages, self._api_config, self._env_details).get_object()

def send(self, group_message):
return self._transport.send(self.url, group_message.SerializeToString())
31 changes: 30 additions & 1 deletion stackify/transport/default/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1,30 @@
from .http import HTTPClient # noqa
from stackify.constants import LOG_SAVE_URL
from stackify.transport.base import BaseTransport
from stackify.transport.default.http import HTTPClient
from stackify.transport.default.log import LogMsg
from stackify.transport.default.log import LogMsgGroup


class DefaultTransport(BaseTransport):
"""
Default Transport handles sending of logs directly to platform
"""
_transport = None

def __init__(self, api_config, env_details):
super(DefaultTransport, self).__init__(api_config, env_details)
self._transport = HTTPClient(api_config, env_details)

def create_message(self, record):
msg = LogMsg()
msg.from_record(record)
return msg

def create_group_message(self, messages):
return LogMsgGroup(messages)

def send(self, group_message):
self._transport.send(
LOG_SAVE_URL,
group_message,
)
Loading