Skip to content
Permalink
Browse files

script to consume data from lb-playground

  • Loading branch information...
vansika committed Feb 8, 2019
1 parent d4fbde1 commit 4bd194f4a0e8150bd71d063d581c0ff195b069bd
@@ -43,16 +43,16 @@ def insert_user_stats(user_id, artists, recordings, releases, artist_count):

artist_stats = {
'count': artist_count,
'all_time': artists,
'prev_month': artists,
}

recording_stats = {
'all_time': recordings,
'prev_month': recordings,
}


release_stats = {
'all_time': releases,
'prev_month': releases,
}


@@ -34,10 +34,10 @@ def test_insert_user_stats(self):
)

result = db_stats.get_all_user_stats(user_id=self.user['id'])
self.assertListEqual(result['artist']['all_time'], artists)
self.assertListEqual(result['artist']['prev_month'], artists)
self.assertEqual(result['artist']['count'], 2)
self.assertListEqual(result['release']['all_time'], releases)
self.assertListEqual(result['recording']['all_time'], recordings)
self.assertListEqual(result['release']['prev_month'], releases)
self.assertListEqual(result['recording']['prev_month'], recordings)
self.assertGreater(int(result['last_updated'].strftime('%s')), 0)

def insert_test_data(self):
@@ -71,7 +71,7 @@ def test_get_user_stats(self):
self.assertEqual(data['artist']['count'], 2)

data = db_stats.get_user_stats(self.user['id'], 'recording')
self.assertListEqual(data['recording']['all_time'], data_inserted['user_recordings'])
self.assertListEqual(data['recording']['prev_month'], data_inserted['user_recordings'])

def test_get_user_artists(self):
data_inserted = self.insert_test_data()
@@ -81,10 +81,10 @@ def test_get_user_artists(self):
def test_get_all_user_stats(self):
data_inserted = self.insert_test_data()
result = db_stats.get_all_user_stats(self.user['id'])
self.assertListEqual(result['artist']['all_time'], data_inserted['user_artists'])
self.assertListEqual(result['artist']['prev_month'], data_inserted['user_artists'])
self.assertEqual(result['artist']['count'], 2)
self.assertListEqual(result['release']['all_time'], data_inserted['user_releases'])
self.assertListEqual(result['recording']['all_time'], data_inserted['user_recordings'])
self.assertListEqual(result['release']['prev_month'], data_inserted['user_releases'])
self.assertListEqual(result['recording']['prev_month'], data_inserted['user_recordings'])
self.assertGreater(int(result['last_updated'].strftime('%s')), 0)

def test_valid_stats_exist(self):
No changes.
@@ -0,0 +1,85 @@
import json
import logging
import pika
import time
import ujson

from flask import current_app
from listenbrainz import utils
from listenbrainz.db import user as db_user, stats as db_stats
from listenbrainz.webserver import create_app
from listenbrainz.db.exceptions import DatabaseException
from listenbrainz import config
import sqlalchemy

class SparkReader:
def __init__(self):
self.app = create_app() # creating a flask app for config values and logging to Sentry


def init_rabbitmq_connection(self):
""" Initializes the connection to RabbitMQ.
Note: this is a blocking function which keeps retrying if it fails
to connect to RabbitMQ
"""
self.connection = utils.connect_to_rabbitmq(
username=current_app.config['RABBITMQ_USERNAME'],
password=current_app.config['RABBITMQ_PASSWORD'],
host=current_app.config['RABBITMQ_HOST'],
port=current_app.config['RABBITMQ_PORT'],
virtual_host=current_app.config['RABBITMQ_VHOST'],
error_logger=current_app.logger.error,
)


def callback(self, ch, method, properties, body):
""" Handle the data received from the queue and works accordingly.
"""
data = ujson.loads(body)
user_name = next(iter(data[0]))
user = db_user.get_by_mb_id(user_name)
artists = data[0][user_name]['artists']['artist_stats']
recordings = data[0][user_name]['recordings']
releases = data[0][user_name]['releases']
artist_count = data[0][user_name]['artists']['artist_count']
db_stats.insert_user_stats(user['id'], artists, recordings, releases, artist_count)
print ("data for {} published".format(user_name))

while True:
try:
self.incoming_ch.basic_ack(delivery_tag=method.delivery_tag)
break
except pika.exceptions.ConnectionClosed:
self.init_rabbitmq_connection()


def start(self):
""" initiates RabbitMQ connection and starts consuming from the queue
"""

with self.app.app_context():

while True:
self.init_rabbitmq_connection()
self.incoming_ch = utils.create_channel_to_consume(
connection=self.connection,
exchange=current_app.config['SPARK_EXCHANGE'],
queue=current_app.config['SPARK_QUEUE'],
callback_function=self.callback,
)
current_app.logger.info('Stats calculator started!')
try:
print("consuming")
self.incoming_ch.start_consuming()
except pika.exceptions.ConnectionClosed:
current_app.logger.warning("Connection to rabbitmq closed. Re-opening.")
self.connection = None
continue

self.connection.close()


if __name__ == '__main__':
sr = SparkReader()
sr.start()
@@ -189,7 +189,7 @@ def artists(user_name):
flash.error(msg)
return redirect(url_for('user.profile', user_name=user_name))

top_artists = data['artist']['all_time']
top_artists = data['artist']['prev_month']
return render_template(
"user/artists.html",
user=user,

0 comments on commit 4bd194f

Please sign in to comment.
You can’t perform that action at this time.