Skip to content

Commit

Permalink
first working version of threading
Browse files Browse the repository at this point in the history
  • Loading branch information
Jorge Bastida committed Jul 5, 2015
1 parent 365b64f commit 1e5afd8
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 25 deletions.
66 changes: 42 additions & 24 deletions awslogs/core.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import re
import sys
import time
import threading
import functools
from datetime import datetime, timedelta

import boto3
Expand Down Expand Up @@ -41,7 +43,7 @@ def aws_connection_wrap(*args, **kwargs):
if code == u'ThrottlingException':
time.sleep(0.5)
continue
elif code == u'AccessDeniedException':
elif code == u'AccessDeniedException' or code == u'ExpiredTokenException':
hint = exc.response['Error'].get('Message', 'AccessDeniedException')
raise exceptions.AccessDeniedError(hint)
raise
Expand Down Expand Up @@ -71,12 +73,6 @@ def __init__(self, **kwargs):
self.start = self.parse_datetime(kwargs.get('start'))
self.end = self.parse_datetime(kwargs.get('end'))

# self.client = AWSClient('logs',
# aws_access_key_id=self.aws_access_key_id,
# aws_secret_access_key=self.aws_secret_access_key,
# aws_session_token=self.aws_session_token,
# region_name=self.aws_region
# )
self.client = AWSClient('logs',
aws_access_key_id=None,
aws_secret_access_key=None,
Expand All @@ -92,22 +88,36 @@ def _get_streams_from_pattern(self, group, pattern):
if re.match(reg, stream):
yield stream

def get_logs(self):
def list_logs(self):
streams = list(self._get_streams_from_pattern(self.log_group_name, self.log_stream_name))
max_stream_length = max([len(s) for s in streams])
group_length = len(self.log_group_name)

kwargs = {'logGroupName': self.log_group_name,
'logStreamNames': streams,
'interleaved': True}
if self.start:
kwargs['startTime'] = self.start
print_lock = threading.Lock()
end_condition = threading.Condition()
self.list_logs_end = False

if self.end:
kwargs['endTime'] = self.end
def get_logs(token=None):
kwargs = {'logGroupName': self.log_group_name,
'logStreamNames': streams,
'interleaved': True}
if self.start:
kwargs['startTime'] = self.start

if self.end:
kwargs['endTime'] = self.end

if token:
kwargs['nextToken'] = token

while True:
response = self.client.filter_log_events(**kwargs)

if 'nextToken' in response:
page = threading.Thread(target=functools.partial(get_logs, token=response['nextToken']))
page.start()

print_lock.acquire()

for event in response.get('events', []):
output = [event['message']]
if self.output_stream_enabled:
Expand All @@ -126,16 +136,24 @@ def get_logs(self):
'green'
)
)
yield ' '.join(output)
print(' '.join(output))

if 'nextToken' in response:
kwargs['nextToken']= response['nextToken']
else:
break
print_lock.release()

def list_logs(self):
for event in self.get_logs():
print(event)
if 'nextToken' not in response:
end_condition.acquire()
self.list_logs_end = True
end_condition.notify_all()
end_condition.release()

end_condition.acquire()

main = threading.Thread(target=get_logs)
main.start()

while not self.list_logs_end:
end_condition.wait()
end_condition.release()

def list_groups(self):
"""Lists available CloudWatch logs groups"""
Expand Down
1 change: 0 additions & 1 deletion tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,6 @@ def test_main_get(self, mock_stdout, botoclient):
client.describe_log_streams.side_effect = streams

main("awslogs get AAA DDD --no-color".split())

self.assertEqual(
mock_stdout.getvalue(),
("AAA DDD Hello 1\n"
Expand Down

0 comments on commit 1e5afd8

Please sign in to comment.