93 changes: 62 additions & 31 deletions src/data_collection.py
@@ -16,7 +16,8 @@
TWITTER_BEARER_TOKEN, MONGODB_USERNAME, MONGODB_PASSWORD

from data_collectors import RedditDataCollector, RssDataCollector, TwitterDataCollector
from producer import Producer
from data_streamers import TwitterDataStreamer
from producer import Producer
from helper import build_logging_filepath


@@ -33,6 +34,8 @@ def get_arguments():
'rss', help='Scrape data from RSS feeds')
rss_parser.add_argument('--base_url', required=True,
help='URL of a RSS feed database where links to relevant RSS feeds can be found')
twitter_parser.add_argument('--stream', action='store_true', default=False, dest='stream',
help='Additional flag for activating tweets streaming')
# reddit parser
subparsers.add_parser(
'reddit', help='Scrape data from reddit')
@@ -75,7 +78,7 @@ def set_logging_config(args, config):


def get_data_collector_instance(args, config):
"""Get the instance of the data
"""Get the instance of data collector

:param args: arguments of the script
:type args: Namespace
@@ -93,6 +96,22 @@ def get_data_collector_instance(args, config):
raise NotImplementedError


def get_data_streamer_instance(args, config):
"""Get the instance of data streamer

:param args: arguments of the script
:type args: Namespace
:raises NotImplementedError: no data streamer implemented for given data source
:return: instance of the specific data streamer
:rtype: subclass of BaseDataStreamer
"""
if args.data_source == DATA_SOURCE_TWITTER:
return TwitterDataStreamer(config["Twitter"][CONFIG_TWITTER_CONSUMER_KEY],
config["Twitter"][CONFIG_TWITTER_CONSUMER_SECRET],
config["Twitter"][CONFIG_TWITTER_BEARER_TOKEN])
else:
raise NotImplementedError

def get_job_collection(host, port):
"""Get the MongoDB collection which holds the producer jobs

@@ -118,35 +137,47 @@ def main():
kafka_host = config[CONFIG_KAFKA][scraper_env][CONFIG_KAFKA_HOST]
kafka_port = config[CONFIG_KAFKA][scraper_env][CONFIG_KAFKA_PORT]
producer = Producer(kafka_host, kafka_port)
data_collector = None
try:
data_collector = get_data_collector_instance(args, config)
except NotImplementedError:
logging.error(
f'Data collection not implemented for data source {args.data_source}')

max_workers = int(config[CONFIG_GENERAL][CONFIG_GENERAL_MAX_WORKERS])
count_successful = count_failed = 0
with ThreadPoolExecutor(max_workers=max_workers) as executor:

futures = data_collector.get_data_collection_futures(executor=executor)
for future in as_completed(futures):
try:
response = future.result()
if(isinstance(response, str)):
message = response
else:
message = future.result().text
producer.publish(args.data_source, message)
count_successful = count_successful + 1
except RequestException as e:
logging.warning(f'Error in GET-Request: {e}')
count_failed = count_failed + 1
continue
except Exception:
logging.error(traceback.format_exc())
count_failed = count_failed + 1
continue

if args.stream:
data_streamer = None
try:
data_streamer = get_data_streamer_instance(args, config)
except NotImplementedError:
logging.error(
f'Data streaming not implemented for data source {args.data_source}')

data_streamer.stream_into_producer(producer)

else:
data_collector = None
try:
data_collector = get_data_collector_instance(args, config)
except NotImplementedError:
logging.error(
f'Data collection not implemented for data source {args.data_source}')

max_workers = int(config[CONFIG_GENERAL][CONFIG_GENERAL_MAX_WORKERS])
count_successful = count_failed = 0
with ThreadPoolExecutor(max_workers=max_workers) as executor:
futures = data_collector.get_data_collection_futures(executor=executor)

for future in as_completed(futures):
try:
response = future.result()
                    if isinstance(response, str):
                        message = response
                    else:
                        message = response.text
producer.publish(args.data_source, message)
count_successful = count_successful + 1
except RequestException as e:
logging.warning(f'Error in GET-Request: {e}')
count_failed = count_failed + 1
continue
except Exception:
logging.error(traceback.format_exc())
count_failed = count_failed + 1
continue

end_time = datetime.now()
mongo_host = config[CONFIG_MONGODB][scraper_env][CONFIG_MONGODB_HOST]
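Both the existing collector path and the new streaming path hand plain strings to the same Producer.publish(topic, message) call. The Producer class lives in src/producer.py and is not touched by this PR, so the sketch below is only an assumed reading of that contract, inferred from the call sites; the kafka-python backend in particular is a guess.

# Hedged sketch of the Producer contract assumed by both code paths.
# src/producer.py is NOT part of this diff; the kafka-python backend and
# these signatures are assumptions inferred from the call sites above.
from kafka import KafkaProducer


class Producer:
    def __init__(self, host, port):
        # Connect to the Kafka broker configured for the scraper environment
        self._producer = KafkaProducer(bootstrap_servers=f'{host}:{port}')

    def publish(self, topic, message):
        # Both the collector loop and the streamer pass plain strings,
        # so encode before handing the message to the topic
        self._producer.send(topic, message.encode('utf-8'))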
182 changes: 182 additions & 0 deletions src/data_streamers.py
@@ -0,0 +1,182 @@
import abc
import tweepy
import json
import threading
import logging

class BaseDataStreamer(abc.ABC):
"""Base class for data streamers from different data sources
"""
@abc.abstractmethod
def stream_into_producer(self, producer):
"""Stream data from the data source into a Kafka producer

:param producer: Producer where the streamed data is published to
:type producer: Producer
"""
pass


class TwitterDataStreamer(BaseDataStreamer):
"""Data streamer fetching data from twitter and publishing into
a Kafka producer
"""

def __init__(self, consumer_key, consumer_secret, bearer_token):
"""Data streamer fetching tweets connected to the latest worldwide
Twitter trends and publishing them into a Kafka producer

:param consumer_key: Authentication key
:type consumer_key: str
:param consumer_secret: Authentication secret
        :type consumer_secret: str
:param bearer_token: Bearer token
:type bearer_token: str
"""
super().__init__()
auth = tweepy.OAuth1UserHandler(consumer_key, consumer_secret)
self._API = tweepy.API(auth) # from Twitter APIv1.1
self.bearer_token = bearer_token
self.stop_thread = False

def schedule(self, client):
"""Thread function which schedules a query of the latest trends
from Twitter API in certain time intervals

:param client: TwitterStreamingClient
:type client: TwitterStreamingClient
"""
while not self.stop_thread:
self.timer.start()
logging.debug('Fetching current trends started')
self.timer.join()
logging.debug('Fetching current trends ended')
self.timer = threading.Timer(interval=30, function=self._get_current_trends, args=(client,))

def _get_current_trends(self, client):
"""Thread function which fetches the latest trends from Twitter API
and modifies the StreamingClient's StreamRules according to the trends

:param client: TwitterStreamingClient
:type client: TwitterStreamingClient
"""
# Trending location: Worldwide (woeid: 1)
trending_location = 1

# Get trends from API
results = self._API.get_place_trends(trending_location)[0]
trends = [trend['name'] for trend in results['trends']]
        rules = client.get_rules().data or []  # .data is None when no rules exist yet

# Delete old trends from streaming rules
delete_ids = []
for rule in rules:
if rule.tag in trends:
trends.remove(rule.tag)
else:
delete_ids.append(rule.id)
if len(delete_ids) > 0:
client.delete_rules(delete_ids)

# Add all new trends
new_rules = []
for trend in trends:
query = trend + ' -is:retweet -is:reply -is:nullcast lang:en'
new_rules.append(tweepy.StreamRule(value=query, tag=trend))
if len(new_rules) > 0:
client.add_rules(new_rules)

def stream_into_producer(self, producer):
"""Stream data from Twitter into a Kafka producer

:param producer: Producer where the streamed data is published to
:type producer: Producer
"""
client = TwitterStreamingClient(self.bearer_token, producer)
self._get_current_trends(client)

self.timer = threading.Timer(interval=30, function=self._get_current_trends, args=(client,))

thread = threading.Thread(target=self.schedule, args=(client,), daemon=True)
thread.start()

client.filter(tweet_fields=['text', 'created_at', 'lang', 'public_metrics', 'geo'],
user_fields=['username', 'verified', 'public_metrics'],
expansions=['author_id', 'geo.place_id'])

self.stop_thread = True
self.timer.cancel()


class TwitterStreamingClient(tweepy.StreamingClient):
def __init__(self, bearer_token, producer):
"""StreamingClient for Twitter, which processes the streamed tweets
        and pushes them into a Kafka producer

:param bearer_token: Bearer token
:type bearer_token: str
:param producer: Producer where the streamed data is published to
:type producer: Producer
"""
super().__init__(bearer_token, wait_on_rate_limit=True)
self.producer = producer
logging.debug('Twitter streaming client started')

def on_response(self, response):
"""Generic callback function for processing a response from Twitter's
Streaming API

:param response: Response object containing a tweet with includes or errors
:type response: tweepy.StreamResponse
"""
tweet = response.data
includes = response.includes
rules = response.matching_rules

result_json = self._process_tweet(tweet, includes, rules)

self.producer.publish('twitter-stream', result_json)

def on_errors(self, errors):
"""Callback function for processing a errors from Twitter's Streaming API

:param errors: Errors object
        :type errors: dict
"""
logging.error(errors)
self.disconnect()

def _process_tweet(self, tweet, includes, rules):
"""Processing a tweet as it is fetched from the Streaming API

:param tweet: Tweet object
:type tweet: tweepy.tweet.Tweet
        :param includes: Additional information accompanying the tweet
:type includes: dict
:param rules: Matching StreamRules
        :type rules: List[tweepy.StreamRule]
        :return: JSON-stringified tweet
:rtype: str
"""
        # tweepy returns expansions as lists of model objects, so build
        # id-keyed lookups before resolving the tweet's author and place
        users = {user.id: user for user in includes.get('users', [])}
        places = {p.id: p for p in includes.get('places', [])}

        author = {}
        if tweet.author_id in users:
            user = users[tweet.author_id]
            author = {'username': user.username,
                      'verified': user.verified,
                      'num_followers': user.public_metrics['followers_count']}
        place = ''
        if tweet.geo and tweet.geo.get('place_id') in places:
            place = places[tweet.geo['place_id']].full_name

# Build results dict
result = {}
result['tweet_id'] = tweet.id
result['text'] = tweet.text
result['created_at'] = str(tweet.created_at)
result['metrics'] = tweet.public_metrics
result['author'] = author
if place != '':
result['place'] = place
result['trend'] = rules[0].tag

return json.dumps([result])
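
For reference, a minimal usage sketch of the new streamer, mirroring the --stream branch added to data_collection.py. The credential strings and the broker host/port are placeholders, not values from this PR.

# Hypothetical wiring of TwitterDataStreamer into a Producer.
# Credentials and the Kafka host/port below are placeholders.
from data_streamers import TwitterDataStreamer
from producer import Producer

producer = Producer('localhost', 9092)
streamer = TwitterDataStreamer(consumer_key='<CONSUMER_KEY>',
                               consumer_secret='<CONSUMER_SECRET>',
                               bearer_token='<BEARER_TOKEN>')

# Blocks while the filtered stream is open; a background scheduler thread
# refreshes the trend-based stream rules roughly every 30 seconds.
streamer.stream_into_producer(producer)

Because stream_into_producer blocks on tweepy's filtered stream, data_collection.py only enters this path when the --stream flag is passed.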