Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

LB-452: Add script to replay user listens to fix bad data #580

Merged
merged 7 commits into from
Jun 19, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Empty file.
24 changes: 24 additions & 0 deletions listenbrainz/listen_replay/cli.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import click


from influxdb.exceptions import InfluxDBClientError, InfluxDBServerError
from listenbrainz.listen_replay.demo import DemoUserReplayer


# Click command group; the replay commands in this module are registered on it
cli = click.Group()


# Replays the listens of one user with the demo replayer. Any failure is
# logged (with traceback) through the replayer's app logger and re-raised.
# NOTE: kept as comments rather than a docstring, since click would expose a
# docstring as the command's help text.
@cli.command(name="demo_user_replay")
@click.argument("user_name", type=str)
def demo_user_replay(user_name):
    replayer = DemoUserReplayer(user_name)
    try:
        replayer.start()
    except Exception as err:
        # Influx failures get a dedicated message, anything else a generic one
        if isinstance(err, (InfluxDBClientError, InfluxDBServerError)):
            message = "Influx error while replaying listens: %s"
        else:
            message = "Error while replaying listens: %s"
        replayer.app.logger.error(message, str(err), exc_info=True)
        raise

# add more commands here as needed
7 changes: 7 additions & 0 deletions listenbrainz/listen_replay/demo.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
from listenbrainz.listen_replay.replay_user import UserReplayer


class DemoUserReplayer(UserReplayer):
    """ Replayer whose filter keeps every row unchanged.
    """

    def filter(self, row):
        """ Return the given row exactly as it was received.
        """
        return row
101 changes: 101 additions & 0 deletions listenbrainz/listen_replay/replay_user.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
""" This script should be used to replay user listens to fix bad data.
"""
import abc
import json
import uuid

from datetime import datetime
from flask import current_app
from listenbrainz.utils import get_measurement_name, quote, convert_to_unix_timestamp
from listenbrainz.webserver import create_app
from listenbrainz.webserver.influx_connection import init_influx_connection
from listenbrainz.listenstore.influx_listenstore import DUMP_CHUNK_SIZE


class UserReplayer(abc.ABC):
def __init__(self, user_name):
    """ Prepare a replayer for a single user.

    Args:
        user_name (str): name of the user whose listens will be replayed
    """
    self.user_name = user_name
    # Captured once at construction time and reused for every batch request
    self.max_time = datetime.now()
    # Application object; its app context and logger are used by the replayer
    self.app = create_app()


@abc.abstractmethod
def filter(self, row):
    """ Hook that lets a subclass rewrite or drop a single row.

    Args:
        row (dict): the row to inspect

    Returns:
        dict: the (possibly modified) row that should be kept
        None: if the row needs to be removed
    """
    return None

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pass ?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I remember reading a PEP 8 / flake8 rule against using `pass` in empty functions or classes that already have a docstring, but I can't find it now. I'll add the `pass`.


def convert_to_influx_insert_format(self, row, measurement):
    """ Build an Influx point dict from a row.

    The row's 'time' value becomes the point's timestamp, 'dedup_tag' (when
    present) becomes a tag, and every remaining key is written as a field.
    The row passed in is left unmodified.

    Args:
        row (dict): row to convert, must contain a 'time' key
        measurement (str): name of the measurement the point belongs to

    Returns:
        dict: point with 'measurement', 'time' and 'fields' keys, plus
              'tags' when the row carries a dedup tag

    Raises:
        KeyError: if row has no 'time' key
    """
    # Work on a shallow copy so the caller's row is not stripped of its keys
    fields = dict(row)
    point = {
        'measurement': measurement,
        'time': convert_to_unix_timestamp(fields.pop('time')),
        'fields': fields,
    }

    # The dedup tag is stored as a tag, not as a field
    if 'dedup_tag' in fields:
        point['tags'] = {'dedup_tag': fields.pop('dedup_tag')}

    return point


def copy_measurement(self, src, dest, apply_filter=False):
    """ Copy the rows of one measurement into another, batch by batch.

    Batches are requested from the listenstore with an offset that advances
    by DUMP_CHUNK_SIZE per request; copying stops at the first batch that
    yields no rows.

    Args:
        src (str): name the rows are read from
        dest (str): name the rows are written to
        apply_filter (bool): if True, every row is passed through
            self.filter first and rows for which it returns a falsy
            value are not copied
    """
    offset = 0
    while True:
        result = self.ls.get_listens_batch_for_dump(src, self.max_time, offset)
        rows = []
        count = 0
        for row in result.get_points(get_measurement_name(src)):
            count += 1
            if apply_filter:
                # The subclass hook is named `filter`
                row = self.filter(row)
            if row:
                rows.append(self.convert_to_influx_insert_format(row, quote(dest)))

        # An empty batch means the source has been read completely
        if count == 0:
            break

        # Every row of a batch may have been filtered out; skip the write then
        if rows:
            self.ls.write_points_to_db(rows)
        offset += DUMP_CHUNK_SIZE


def start(self):
    """ Run the replay for self.user_name inside the app context.

    Steps, in order:
        1. connect to Influx using the app configuration
        2. copy the user's rows, filtered, into a temporary measurement
           named after a random UUID
        3. delete the user's measurement
        4. copy the rows back, unfiltered, from the temporary measurement
        5. delete the temporary measurement
    """
    with self.app.app_context():
        log = current_app.logger
        config = current_app.config

        log.info("Connecting to Influx...")
        connection_keys = (
            'REDIS_HOST',
            'REDIS_PORT',
            'REDIS_NAMESPACE',
            'INFLUX_HOST',
            'INFLUX_PORT',
            'INFLUX_DB_NAME',
        )
        self.ls = init_influx_connection(log, {key: config[key] for key in connection_keys})
        log.info("Done!")

        temp_measurement = str(uuid.uuid4())
        log.info("Temporary destination measurement: %s", temp_measurement)

        log.info("Copying listens from %s to temporary measurement...", self.user_name)
        self.copy_measurement(src=self.user_name, dest=temp_measurement, apply_filter=True)
        log.info("Done!")

        log.info("Removing user measurement...")
        self.ls.delete(self.user_name)
        log.info("Done!")

        # Rows were already filtered on the way in, so copy them back as-is
        log.info("Copying listens back from temporary measurement to %s...", self.user_name)
        self.copy_measurement(src=temp_measurement, dest=self.user_name, apply_filter=False)
        log.info("Done!")

        log.info("Removing temporary measurement...")
        self.ls.delete(temp_measurement)
        log.info("Done!")
2 changes: 2 additions & 0 deletions manage.py
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,8 @@ def init_influx():
cli.add_command(populate.cli, name="stats")
import listenbrainz.db.dump_manager as dump_manager
cli.add_command(dump_manager.cli, name="dump")
import listenbrainz.listen_replay.cli as listen_replay
cli.add_command(listen_replay.cli, name="replay")


if __name__ == '__main__':
Expand Down