Skip to content
Permalink
Browse files

catch exceptions wherever possible

  • Loading branch information...
vansika committed Jun 3, 2019
1 parent 392f5e5 commit 9404b5d98509456b5453aa3f0e8a69c8d51bc977
Showing with 37 additions and 18 deletions.
  1. +33 −17 listenbrainz_spark/stats/user.py
  2. +4 −1 listenbrainz_spark/stats_writer/stats_writer.py
@@ -170,33 +170,46 @@ def main(app_name):
df.printSchema()

table = 'listens_{}'.format(datetime.strftime(date, '%Y_%m'))
print(table)
df.registerTempTable(table)
print("Running Query...")
try:
df.registerTempTable(table)
except Exception as err:
logging.error("Cannot register dataframe: %s / %s. Aborting..." % (type(err).__name__, str(err)))
sys.exit(-1)
query_t0 = time.time()
print("DataFrame loaded in %.2f s" % (query_t0 - t0))
print("DataFrame loaded and registered in %.2f s" % (query_t0 - t0))

data = defaultdict(dict)
# data: nested dict keyed by user name, which can be depicted as:
# {'user1': {'artists': {'artist_stats': artists dict returned by func get_artists,
# 'artist_count': number of entries in artist_stats},
# 'recordings': {recordings dict returned by func get_recordings},
# 'releases': {releases dict returned by func get_releases},
# 'yearmonth': 'date when the stats were calculated'}, 'user2': {...}}
try:
artist_data = get_artists(table)
for user_name, artist_stats in artist_data.items():
data[user_name]['artists'] = {
'artist_stats': artist_stats,
'artist_count': len(artist_stats),
}
except Exception as err:
logging.error("Problem in parsing artist data: %s / %s. Aborting..." % (type(err).__name__, str(err)))
sys.exit(-1)

artist_data = get_artists(table)
for user_name, artist_stats in artist_data.items():
data[user_name]['artists'] = {
'artist_stats': artist_stats,
'artist_count': len(artist_stats),
}

recording_data = get_recordings(table)
for user_name, recording_stats in recording_data.items():
data[user_name]['recordings'] = recording_stats
try:
recording_data = get_recordings(table)
for user_name, recording_stats in recording_data.items():
data[user_name]['recordings'] = recording_stats
except Exception as err:
logging.error("Problem in parsing recording data: %s / %s. Aborting..." % (type(err).__name__, str(err)))
sys.exit(-1)

release_data = get_releases(table)
for user_name, release_stats in release_data.items():
data[user_name]['releases'] = release_stats
try:
release_data = get_releases(table)
for user_name, release_stats in release_data.items():
data[user_name]['releases'] = release_stats
except Exception as err:
logging.error("Problem in parsing release data: %s / %s. Aborting..." % (type(err).__name__, str(err)))
sys.exit(-1)

rabbbitmq_conn_obj = StatsWriter()
yearmonth = datetime.strftime(date, '%Y-%m')
@@ -209,6 +222,9 @@ def main(app_name):
try:
rabbbitmq_conn_obj.start(rabbitmq_data)
print("Statistics of %s pushed to rabbitmq" % (user_name))
except pika.exceptions.ConnectionClosed:
logging.error("Connection to rabbitmq closed while trying to publish: Statistics of %s not published" % (user_name))
continue
except Exception as err:
logging.error("Cannot publish statistics of %s to rabbitmq channel: %s / %s." % (user_name, type(err).__name__, str(err)), exc_info=True)
continue
@@ -30,6 +30,9 @@ def connect_to_rabbitmq(self):
)
self.connection = pika.BlockingConnection(connection_parameters)
break
except pika.exceptions.AMQPChannelError as err:
logging.error("Caught a channel error: %s / %s, stopping..." % (type(err).__name__, str(err)))
break
except Exception as err:
error_message = "Cannot connect to RabbitMQ: {error}, retrying in {delay} seconds."
print(error_message.format(error=str(err), delay=self.ERROR_RETRY_DELAY))
@@ -72,4 +75,4 @@ def start(self, data):
self.unique_ch = self.connection.channel()
self.unique_ch.exchange_declare(exchange=config.SPARK_EXCHANGE, exchange_type='fanout')
self.unique_ch.queue_declare(queue=config.SPARK_QUEUE, durable=True)
self.unique_ch.queue_bind(exchange=config.SPARK_EXCHANGE, queue=config.SPARK_QUEUE)
self.unique_ch.queue_bind(exchange=config.SPARK_EXCHANGE, queue=config.SPARK_QUEUE)

0 comments on commit 9404b5d

Please sign in to comment.
You can’t perform that action at this time.