Skip to content
This repository has been archived by the owner on Jun 3, 2023. It is now read-only.

Commit

Permalink
Use a fixed pool of worker threads and a queue to process tweets
Browse files Browse the repository at this point in the history
  • Loading branch information
maxbbraun committed Jan 27, 2017
1 parent 9deafb8 commit 788b558
Showing 1 changed file with 33 additions and 17 deletions.
50 changes: 33 additions & 17 deletions twitter.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

from os import getenv
from simplejson import loads
from Queue import Queue
from threading import Thread
from tweepy import API
from tweepy import OAuthHandler
Expand Down Expand Up @@ -31,6 +32,8 @@
EMOJI_THUMBS_DOWN = u"\U0001f44e"
EMOJI_SHRUG = u"¯\_(\u30c4)_/¯"

# The number of worker threads processing tweets.
NUM_THREADS = 100

class Twitter:
"""A helper for talking to Twitter APIs."""
Expand Down Expand Up @@ -106,6 +109,34 @@ def __init__(self, callback, logs_to_cloud):
self.logs_to_cloud = logs_to_cloud
self.logs = Logs(name="twitter-listener", to_cloud=self.logs_to_cloud)
self.callback = callback
self.init_queue()

def init_queue(self):
"""Creates a queue and starts the worker threads."""

self.queue = Queue()
for worker_id in range(NUM_THREADS):
worker = Thread(target=self.process_queue, args=[worker_id])
worker.start()

def process_queue(self, worker_id):
"""Continuously processes tasks on the queue."""

# Create a new logs instance (with its own httplib2 instance) so that
# there is a separate one for each thread.
logs = Logs("twitter-listener-worker-%s" % worker_id,
to_cloud=self.logs_to_cloud)

while True:
# The main loop doesn't catch and report exceptions from background
# threads, so do that here.
try:
self.logs.debug("Processing queue of size: %s" %
self.queue.qsize())
data = self.queue.get(block=True)
self.handle_data(logs, data)
except BaseException as exception:
logs.catch(exception)

def on_error(self, status):
"""Handles any API errors."""
Expand All @@ -116,28 +147,13 @@ def on_error(self, status):
return True

def on_data(self, data):
"""Kicks off a new thread to handle data."""
"""Puts a task to process the new data on the queue."""

thread = Thread(target=self.safe_handle_data, args=[data])
thread.start()
self.queue.put(data)

# Don't stop.
return True

def safe_handle_data(self, data):
"""Calls handle_data() in a thread-safe way."""

# Create new logging and error clients (with their own httplib2
# instances) to be used on background threads.
logs = Logs("twitter-listener-background", to_cloud=self.logs_to_cloud)

# The main loop doesn't catch and report exceptions from background
# threads, so do that here.
try:
self.handle_data(logs, data)
except BaseException as exception:
logs.catch(exception)

def handle_data(self, logs, data):
"""Sanity-checks and extracts the data before sending it to the
callback.
Expand Down

0 comments on commit 788b558

Please sign in to comment.