Skip to content

Commit

Permalink
Merge pull request #21 from rlgomes/bunch-o-random-fixes
Browse files Browse the repository at this point in the history
various random fixes
  • Loading branch information
rlgomes committed Aug 10, 2016
2 parents 2f95e68 + 2a558d6 commit 5503e8f
Show file tree
Hide file tree
Showing 7 changed files with 205 additions and 34 deletions.
4 changes: 4 additions & 0 deletions docs/adapters/stdio.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,15 @@ following options available at this time:
read('stdio',
format='jsonl',
file=None,
strip_ansi=False,
time='time') | ...
```

Argument | Description | Required?
----------- | --------------------------------------------------------------------------- | :---------
format | format specifier used to pick a different kind of [streamer](streamers/) | No, default: `jsonl`
file | filename to read from, when not specified we read from STDIN | No, default: `None`
strip_ansi | when set to `True`, all ANSI escape sequences are removed from the input | No, default: `False`
time | field name that contains valid timestamp | No, default: `time`

## write
Expand All @@ -31,13 +33,15 @@ following options available at this time:
write('stdio',
format='jsonl',
file=None,
append=False,
time='time') | ...
```

Argument | Description | Required?
----------- | --------------------------------------------------------------------------- | :---------
format | format specifier used to pick a different kind of [streamer](streamers/) | No, default: `jsonl`
file | filename to write to, when not specified we write to STDOUT | No, default: `None`
append | boolean that specifies whether to append to any existing output instead of overwriting it | No, default: `False`
time | field name that contains valid timestamp | No, default: `time`


Expand Down
8 changes: 5 additions & 3 deletions docs/adapters/streamers/csv.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ No arguments exposed at this point for read operations.

## write

Argument | Description | Required?
-------- | --------------------------------------------------------- | :---------
headers | boolean used to specify to print or not print the headers | No, default: `True`
Argument | Description | Required?
----------------- | ------------------------------------------------------------------------------ | :---------
headers | boolean used to specify to print or not print the headers | No, default: `True`
delimiter | delimiter character to use when reading the data | No, default: `,`
ignore_whitespace | boolean used to specify if any preceding/trailing whitespace should be ignored | No, default: `True`
44 changes: 36 additions & 8 deletions flume/adapters/stdio.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""
stdio adapter
"""
import re
import sys

from flume.adapters.adapter import adapter
Expand All @@ -15,21 +16,36 @@ class InputStream(object):
the input to stream line by line and not block the pipeline.
"""

def __init__(self, file):
self.file = file
def __init__(self, stream, strip_ansi=False):
self.stream = stream
self.strip_ansi = strip_ansi

def __remove_ansi(self, line):
if line is None:
return None

return re.sub(r'\x1b[^A-Za-z]*[A-Za-z]', '', line)

def read(self, size=1024):
return self.file.read(size=1024)
if self.strip_ansi:
return self.__remove_ansi(self.stream.read(size))

else:
return self.stream.read(size)

def readlines(self):

while True:
line = self.file.readline()
line = self.stream.readline()

if not line:
break

yield line
if self.strip_ansi:
yield self.__remove_ansi(line)

else:
yield line

class stdio(adapter):
"""
Expand All @@ -48,18 +64,24 @@ class stdio(adapter):
def __init__(self,
format='jsonl',
file=None,
append=False,
strip_ansi=False,
**kwargs):
self.streamer = streamers.get_streamer(format, **kwargs)
self.file = file
self.append = append
self.strip_ansi = strip_ansi

def read(self):
if self.file is None:
for point in self.streamer.read(InputStream(stdio.stdin)):
for point in self.streamer.read(InputStream(stdio.stdin,
strip_ansi=self.strip_ansi)):
yield [Point(**point)]

else:
with open(self.file, 'r') as stream:
stream = InputStream(stream)
stream = InputStream(stream,
strip_ansi=self.strip_ansi)
for point in self.streamer.read(stream):
yield [Point(**point)]

Expand All @@ -68,7 +90,13 @@ def write(self, points):
self.streamer.write(stdio.stdout, points)

else:
with open(self.file, 'a') as stream:
if self.append:
mode = 'a'

else:
mode = 'w'

with open(self.file, mode) as stream:
self.streamer.write(stream, points)

def eof(self):
Expand Down
12 changes: 9 additions & 3 deletions flume/adapters/streamers/csver.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,19 @@

class CSV(Streamer):

def __init__(self, headers=True):
def __init__(self,
headers=True,
delimiter=',',
ignore_whitespace=True):
self.writer = None
self.headers = headers
self.delimiter = delimiter
self.ignore_whitespace = ignore_whitespace

def read(self, stream):
# XXX: expose the delimiter, quotechar...etc.
reader = csv.DictReader(stream)
reader = csv.DictReader(stream.readlines(),
delimiter=self.delimiter,
skipinitialspace=self.ignore_whitespace)

for point in reader:
# XXX: buffer N points before pushing out ?
Expand Down
3 changes: 1 addition & 2 deletions flume/cli.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
"""
flume command line utility
"""
flume command line utility """
import imp
import logging
import os
Expand Down
22 changes: 7 additions & 15 deletions flume/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,14 @@
"""
import bisect
import logging
import six
import sys
import threading

from flume import util
import six

from flume import moment
from flume import logger
from flume import util

if util.IS_PY2:
from Queue import Empty
Expand All @@ -22,12 +24,10 @@
from queue import Queue as queue
from queue import Empty

from flume import logger
from flume import moment

from flume.exceptions import FlumineException
from flume.point import Point


__SOURCES = {}
__PROCS = {}
__SINKS = {}
Expand Down Expand Up @@ -271,7 +271,6 @@ def run(self):

def execute(self,
wait=True,
debug=False,
loglevel=logger.WARN):

if 'inited' not in self.__dict__ or not self.inited:
Expand All @@ -281,7 +280,7 @@ def execute(self,
# on it shouldn't fail
if not hasattr(self, 'outputs'):
node.init_node(self, outputs=[])

logger.setLogLevel(loglevel)

# XXX: pooling here ?
Expand All @@ -293,8 +292,7 @@ def execute(self,
thread.start()

if self.parent:
self.parent.execute(wait=wait,
debug=debug)
self.parent.execute(wait=wait, loglevel=loglevel)

if wait:
while thread.is_alive():
Expand Down Expand Up @@ -385,7 +383,6 @@ def loop(self):

def execute(self,
wait=True,
debug=False,
loglevel=logging.ERROR):

def find_root(flume):
Expand Down Expand Up @@ -432,19 +429,14 @@ def find_root(flume):
source=source)

forwarder.execute(wait=False,
debug=debug,
loglevel=loglevel)

# start underlying flumes
for flume in self.flumes:
flume.execute(wait=False,
debug=debug,
loglevel=loglevel)

# override default behavior to execute the underlying flume
node.execute(self,
wait=False,
debug=debug,
loglevel=loglevel)


Loading

0 comments on commit 5503e8f

Please sign in to comment.