Skip to content

Commit

Permalink
cleanup (#63)
Browse files Browse the repository at this point in the history
  • Loading branch information
svenkreiss committed May 13, 2017
1 parent 9ea7ff1 commit 7a386ae
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 22 deletions.
32 changes: 16 additions & 16 deletions pysparkling/streaming/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,22 +149,6 @@ def remember(self, duration):
"""Provided for compatibility, but does nothing here."""
pass

def start(self):
"""Start processing streams."""

def cb():
time_ = time.time()
log.debug('Step {}'.format(time_))

# run a step on all streams
for d in self._dstreams:
d._step(time_)

self._pcb = PeriodicCallback(cb, self.batch_duration * 1000.0)
self._pcb.start()
self._on_stop_cb.append(self._pcb.stop)
StreamingContext._activeContext = self

def socketBinaryStream(self, hostname, port, length):
"""Create a TCP socket server for binary input.
Expand Down Expand Up @@ -203,6 +187,22 @@ def socketTextStream(self, hostname, port):
self._on_stop_cb.append(tcp_text_stream.stop)
return DStream(tcp_text_stream, self, deserializer)

def start(self):
"""Start processing streams."""

def cb():
time_ = time.time()
log.debug('Step {}'.format(time_))

# run a step on all streams
for d in self._dstreams:
d._step(time_)

self._pcb = PeriodicCallback(cb, self.batch_duration * 1000.0)
self._pcb.start()
self._on_stop_cb.append(self._pcb.stop)
StreamingContext._activeContext = self

def stop(self, stopSparkContext=True, stopGraceFully=False):
"""Stop processing streams.
Expand Down
6 changes: 0 additions & 6 deletions pysparkling/streaming/tcpstream.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,6 @@ def __call__(self, data):

class TCPTextStream(TCPServer):
def __init__(self, delimiter=b'\n'):
if TCPServer is False:
log.error('Run \'pip install tornado\' to use TCPStream.')

super(TCPTextStream, self).__init__()
self.delimiter = delimiter
self.buffer = []
Expand Down Expand Up @@ -58,9 +55,6 @@ class TCPBinaryStream(TCPServer):
"""

def __init__(self, length=None):
if TCPServer is False:
log.error('Run \'pip install tornado\' to use TCPStream.')

super(TCPBinaryStream, self).__init__()
self.length = length
self.buffer = []
Expand Down

0 comments on commit 7a386ae

Please sign in to comment.