producer to query and push stats

vansika committed Feb 8, 2019
1 parent 44e33f0 commit 517fa37d071401d4d7be67ff926a5c826b708212
@@ -42,3 +42,8 @@ services:
     command: sleep 1000000000
     volumes:
       - ..:/rec:z
+
+networks:
+  default:
+    external:
+      name: listenbrainz_default
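Note: an external network block means Compose will not create this network itself; listenbrainz_default has to exist already (presumably created by the main ListenBrainz docker setup) before these containers can start.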
@@ -0,0 +1,61 @@
+import pika
+import logging
+from listenbrainz_spark import config
+import time
+import json
+import sys
+
+class SparkReader():
+    def __init__(self):
+        self.unique_ch = None
+        self.ERROR_RETRY_DELAY = 3
+
+    def connect_to_rabbitmq(self):
+        """Creates a RabbitMQ connection object, retrying until the broker is reachable.
+        """
+        username = config.RABBITMQ['RABBITMQ_USERNAME']
+        password = config.RABBITMQ['RABBITMQ_PASSWORD']
+        host = config.RABBITMQ['RABBITMQ_HOST']
+        port = config.RABBITMQ['RABBITMQ_PORT']
+        virtual_host = config.RABBITMQ['RABBITMQ_VHOST']
+
+        while True:
+            try:
+                credentials = pika.PlainCredentials(username, password)
+                connection_parameters = pika.ConnectionParameters(
+                    host=host,
+                    port=port,
+                    virtual_host=virtual_host,
+                    credentials=credentials,
+                )
+                self.connection = pika.BlockingConnection(connection_parameters)
+                break
+            except Exception as err:
+                error_message = "Cannot connect to RabbitMQ: {error}, retrying in {delay} seconds."
+                print(error_message.format(error=str(err), delay=self.ERROR_RETRY_DELAY))
+                time.sleep(self.ERROR_RETRY_DELAY)
+
+    def start(self, data):
+        """Publish data to RabbitMQ.
+        """
+        if "RABBITMQ_HOST" not in config.RABBITMQ:
+            logging.critical("RabbitMQ service not defined. Sleeping {0} seconds and exiting.".format(self.ERROR_RETRY_DELAY))
+            time.sleep(self.ERROR_RETRY_DELAY)
+            sys.exit(-1)
+
+        try:
+            self.connect_to_rabbitmq()
+            self.unique_ch = self.connection.channel()
+            self.unique_ch.exchange_declare(exchange=config.RABBITMQ['SPARK_EXCHANGE'], exchange_type='fanout')
+            self.unique_ch.queue_declare(queue=config.RABBITMQ['SPARK_QUEUE'], durable=True)
+            self.unique_ch.queue_bind(exchange=config.RABBITMQ['SPARK_EXCHANGE'], queue=config.RABBITMQ['SPARK_QUEUE'])
+            self.unique_ch.basic_publish(
+                exchange=config.RABBITMQ['SPARK_EXCHANGE'],
+                routing_key='',
+                body=json.dumps(data),
+                properties=pika.BasicProperties(delivery_mode=2),  # mark the message persistent
+            )
+        except pika.exceptions.ConnectionClosed as e:
+            logging.error("Connection to rabbitmq closed while trying to publish: %s" % str(e), exc_info=True)
+        except Exception as e:
+            logging.error("Cannot publish to rabbitmq channel: %s / %s" % (type(e).__name__, str(e)), exc_info=True)
@@ -1,3 +1,15 @@
-HDFS_HTTP_URI = 'http://hadoop-master:9870' # the URI of the http webclient for HDFS
+HDFS = {
+    'HDFS_HTTP_URI': 'http://hadoop-master:9870',   # the URI of the http webclient for HDFS
+    'HDFS_CLUSTER_URI': 'hdfs://hadoop-master:9000' # the URI to be used with Spark
+}
 
-HDFS_CLUSTER_URI = 'hdfs://hadoop-master:9000' # the URI to be used with Spark
+RABBITMQ = {
+    'RABBITMQ_HOST': "rabbitmq",
+    'RABBITMQ_PORT': 5672,
+    'RABBITMQ_USERNAME': "guest",
+    'RABBITMQ_PASSWORD': "guest",
+    'RABBITMQ_VHOST': "/",
+    'MAXIMUM_RABBITMQ_CONNECTIONS': 20,
+    'SPARK_EXCHANGE': "spark",
+    'SPARK_QUEUE': "spark"
+}
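With the flat constants folded into dicts, call sites index into config.HDFS and config.RABBITMQ. A sketch of how the HDFS entries might be consumed (InsecureClient matches hdfs==2.1.0 from requirements.txt; the variable names here are illustrative, not from this commit):

    from hdfs import InsecureClient
    from listenbrainz_spark import config

    hdfs_client = InsecureClient(config.HDFS['HDFS_HTTP_URI'])  # WebHDFS REST endpoint
    cluster_uri = config.HDFS['HDFS_CLUSTER_URI']               # prefix for paths handed to Spark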
@@ -2,11 +2,12 @@
 import listenbrainz_spark
 import time
 from datetime import datetime
+from listenbrainz_spark.RabbitMQ.rabbitmq_connection import SparkReader
 
 month = datetime.now().month
 year = datetime.now().year
 
-def get_total_listens(user_name, table):
+def get_artist_count(user_name, table):
     """
     Args:
         user_name: name of the user
@@ -17,9 +18,8 @@ def get_total_listens(user_name, table):
     """
     t0 = time.time()
     query = listenbrainz_spark.sql_context.sql("""
-            SELECT count(artist_name) as cnt
-              FROM %s
-             where user_name = '%s'
+            SELECT count(*) as cnt
+              FROM (SELECT DISTINCT artist_name from %s where user_name = '%s')
        """ % (table, user_name))
     query_t0 = time.time()
     query.show()
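The rewritten query changes what is counted: the old version counted every listen row for the user, while the subquery version counts distinct artist names, which is what the renamed get_artist_count promises. The same result could be had through the DataFrame API; a sketch, assuming the registered table is also available as a DataFrame df (the variable name is illustrative):

    artist_count = (df.filter(df.user_name == user_name)
                      .select('artist_name')
                      .distinct()
                      .count())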
@@ -58,7 +58,7 @@ def get_artists(user_name, table):
         artist['listen_count'] = row.cnt
         artist_stats.append(artist)
         artist = {}
-    count = get_total_listens(user_name, table)
+    count = get_artist_count(user_name, table)
     artist['artist_count'] = count
     artist['artist_stats'] = artist_stats
     return artist
@@ -178,6 +178,7 @@ def main(app_name):
     query_t0 = time.time()
     print("DataFrame loaded in %.2f s" % (query_t0 - t0))
     users = get_users(table)
+    obj = SparkReader()
     stats = []
     for user in users:
         user_data = {}
@@ -186,5 +187,6 @@ def main(app_name):
         user_data[user]['recordings'] = get_recordings(user, table)
         user_data[user]['releases'] = get_releases(user, table)
         stats.append(user_data)
         print (stats)
+        obj.start(stats)
         stats = []

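Because stats is reset at the end of each loop iteration, every publish carries a single user's stats. Pieced together from the assignments above, one message body might look like this (values illustrative; the 'artists' key is assumed to be filled in by get_artists in the elided lines of the loop):

    [
        {
            "some_user": {
                "artists": {"artist_count": 2, "artist_stats": [{"listen_count": 7}]},
                "recordings": ["..."],
                "releases": ["..."]
            }
        }
    ]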
@@ -1,6 +1,7 @@
 import sys
 from listenbrainz_spark.stats import user
 
+
 if __name__ == '__main__':
     if len(sys.argv) != 2:
         print("Usage: manage.py <app_name>")
@@ -1,3 +1,4 @@
 click==6.2
 hetznercloud==1.1.1
 hdfs == 2.1.0
+pika == 0.12.0
