From 0b0e0fd6d48caff9b398b8883ba1221a5ef76fd9 Mon Sep 17 00:00:00 2001
From: Kartik Ohri
Date: Fri, 10 May 2024 09:19:01 +0530
Subject: [PATCH] Fix release colors update

The mapping database in the config is an outdated replica of the MB
database that hasn't been updated in roughly 6 months, probably because
the dumps DB was moved to a new node while the old connection
coordinates stayed hardcoded in the config. Change the code to use the
MB standby database instead.

The process_row function also had a bug in downloading and processing
the images. The remaining changes are mostly stylistic.
---
 mbid_mapping/mapping/release_colors.py | 229 ++++++++++++-------------
 1 file changed, 112 insertions(+), 117 deletions(-)

diff --git a/mbid_mapping/mapping/release_colors.py b/mbid_mapping/mapping/release_colors.py
index f28e73c041..a650f53010 100755
--- a/mbid_mapping/mapping/release_colors.py
+++ b/mbid_mapping/mapping/release_colors.py
@@ -1,12 +1,9 @@
-import datetime
 import os
-import re
 import subprocess
 from time import sleep
 from threading import Thread, get_ident
 
 import psycopg2
-from psycopg2.errors import OperationalError
 from psycopg2.extensions import register_adapter
 import requests
 
@@ -15,7 +12,6 @@
 from mapping.cube import Cube, adapt_cube
 from mapping.utils import log
 
-
 register_adapter(Cube, adapt_cube)
 
 # max number of threads to use -- with 2 we don't need to worry about rate limiting.
@@ -33,8 +29,6 @@ def process_image(filename, mime_type):
         and return the (reg, green, blue) tuple """
 
     with open(filename, "rb") as raw:
-        proc = subprocess.Popen(["file", filename], stdout=subprocess.PIPE)
-        tmp = proc.communicate(raw.read())
         proc = subprocess.Popen(["jpegtopnm", filename],
                                 stdout=subprocess.PIPE, stderr=subprocess.PIPE)
         tmp = proc.communicate(raw.read())
@@ -45,10 +39,10 @@ def process_image(filename, mime_type):
 
     lines = out[0].split(b"\n", 3)
     if lines[0].startswith(b"P6"):  # PPM
-        return (lines[3][0], lines[3][1], lines[3][2])
+        return lines[3][0], lines[3][1], lines[3][2]
     if lines[0].startswith(b"P5"):  # PGM
-        return (lines[3][0], lines[3][0], lines[3][0])
+        return lines[3][0], lines[3][0], lines[3][0]
 
     raise RuntimeError
 
 
@@ -75,10 +69,11 @@ def process_row(row):
     """ Process one CAA query row, by fetching the 250px thumbnail,
         process the color, then import into the DB """
 
-    sleep_duation = 2
+    sleep_duration = 2
     while True:
         headers = {
-            'User-Agent': 'ListenBrainz HueSound Color Bot ( rob@metabrainz.org )'}
+            'User-Agent': 'ListenBrainz HueSound Color Bot ( rob@metabrainz.org )'
+        }
         release_mbid, caa_id = row["release_mbid"], row["caa_id"]
         url = f"https://archive.org/download/mbid-{release_mbid}/mbid-{release_mbid}-{caa_id}_thumb250.jpg"
         r = requests.get(url, headers=headers)
@@ -90,16 +85,14 @@ def process_row(row):
 
             try:
                 red, green, blue = process_image(filename, row["mime_type"])
-                insert_row(row["release_mbid"], red,
-                           green, blue, row["caa_id"])
+                insert_row(row["release_mbid"], red, green, blue, row["caa_id"])
                 log("%s %s: (%s, %s, %s)" %
-                    (row["caa_id"], row["release_mbid"], red, green, blue))
+                    (row["caa_id"], row["release_mbid"], red, green, blue))
             except Exception as err:
                 log("Could not process %s" % url)
                 log(err)
 
             os.unlink(filename)
-            sleep_duation = 2
             break
 
         if r.status_code == 403:
@@ -129,13 +122,12 @@ def process_row(row):
             break
 
 
-def delete_from_lb(caa_id):
+def delete_from_lb(lb_conn, caa_id):
     """ Delete a piece of coverart from the release_color table.
""" - with psycopg2.connect(config.SQLALCHEMY_DATABASE_URI) as lb_conn: - with lb_conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as lb_curs: - lb_curs.execute( - """DELETE FROM release_color WHERE caa_id = %s """, (caa_id,)) + with lb_conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as lb_curs: + lb_curs.execute("""DELETE FROM release_color WHERE caa_id = %s """, (caa_id,)) + lb_conn.commit() def process_cover_art(threads, row): @@ -190,19 +182,19 @@ def get_cover_art_counts(mb_curs, lb_curs): def get_last_updated_from_caa(): """ Fetch the last_updated (last date_updated) value from the CAA table """ - with psycopg2.connect(config.MBID_MAPPING_DATABASE_URI) as mb_conn: - with mb_conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as mb_curs: - mb_curs.execute("""SELECT max(date_uploaded) AS date_uploaded - FROM cover_art_archive.cover_art""") - last_updated = None - row = mb_curs.fetchone() - if row: - try: - last_updated = row["date_uploaded"] - except ValueError: - pass + with psycopg2.connect(config.MB_DATABASE_STANDBY_URI) as mb_conn, \ + mb_conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as mb_curs: + mb_curs.execute("""SELECT max(date_uploaded) AS date_uploaded + FROM cover_art_archive.cover_art""") + last_updated = None + row = mb_curs.fetchone() + if row: + try: + last_updated = row["date_uploaded"] + except ValueError: + pass - return last_updated + return last_updated def sync_release_color_table(): @@ -286,89 +278,92 @@ def compare_coverart(mb_query, lb_query, mb_caa_index, lb_caa_index, mb_compare_ the corresponding compare key. The starting indexes (the current comparison index into the data) must be provided and match the type of the comparison keys. """ - with psycopg2.connect(config.MBID_MAPPING_DATABASE_URI) as mb_conn: - with mb_conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as mb_curs: - with psycopg2.connect(config.SQLALCHEMY_DATABASE_URI) as lb_conn: - with lb_conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as lb_curs: - - mb_count, lb_count = get_cover_art_counts(mb_curs, lb_curs) - log("CAA count: %d\n LB count: %d" % (mb_count, lb_count)) - - threads = [] - mb_row = None - lb_row = None - - mb_rows = [] - lb_rows = [] - - mb_done = False - lb_done = True if lb_query is None else False - - extra = 0 - missing = 0 - processed = 0 - - while True: - if len(mb_rows) == 0 and not mb_done: - mb_curs.execute( - mb_query, (mb_caa_index, SYNC_BATCH_SIZE)) - mb_rows = mb_curs.fetchall() - if len(mb_rows) == 0: - mb_done = True - - if len(lb_rows) == 0 and not lb_done: - lb_curs.execute( - lb_query, (lb_caa_index, SYNC_BATCH_SIZE)) - lb_rows = lb_curs.fetchall() - if len(lb_rows) == 0: - lb_done = True - - if not mb_row and len(mb_rows) > 0: - mb_row = mb_rows.pop(0) - - if not lb_row and len(lb_rows) > 0: - lb_row = lb_rows.pop(0) - - if not lb_row and not mb_row: - break - - processed += 1 - if processed % 100000 == 0: - log("processed %d of %d: missing %d extra %d" % - (processed, mb_count, missing, extra)) - - # If the item is in MB, but not in LB, add to LB - if lb_row is None or mb_row[mb_compare_key] < lb_row[lb_compare_key]: - process_cover_art(threads, mb_row) - missing += 1 - mb_caa_index = mb_row[mb_compare_key] - mb_row = None - continue - - # If the item is in LB, but not in MB, remove from LB - if mb_row is None or mb_row[mb_compare_key] > lb_row[lb_compare_key]: - extra += 1 - delete_from_lb(lb_row[lb_compare_key]) - lb_caa_index = lb_row[lb_compare_key] - lb_row = None - continue - - # If the caa_id is present in both, skip 
both - if mb_row[mb_compare_key] == lb_row[lb_compare_key]: - mb_caa_index = mb_row[mb_compare_key] - lb_caa_index = lb_row[lb_compare_key] - lb_row = None - mb_row = None - continue - - assert False - - join_threads(threads) - log( "Finished! added/skipped %d removed %d from release_color" % (missing, extra)) - - mb_count, lb_count = get_cover_art_counts(mb_curs, lb_curs) - log("CAA count: %d\n LB count: %d" % (mb_count, lb_count)) - - metrics.init("listenbrainz") - metrics.set("listenbrainz-caa-mapper", - caa_front_count=mb_count, lb_caa_count=lb_count) + with psycopg2.connect(config.MB_DATABASE_STANDBY_URI) as mb_conn, \ + mb_conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as mb_curs, \ + psycopg2.connect(config.SQLALCHEMY_DATABASE_URI) as lb_conn, \ + lb_conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as lb_curs: + + mb_count, lb_count = get_cover_art_counts(mb_curs, lb_curs) + log("CAA count: %d" % (mb_count,)) + log("LB count: %d" % (lb_count,)) + + threads = [] + mb_row = None + lb_row = None + + mb_rows = [] + lb_rows = [] + + mb_done = False + lb_done = True if lb_query is None else False + + extra = 0 + missing = 0 + processed = 0 + + while True: + if len(mb_rows) == 0 and not mb_done: + mb_curs.execute(mb_query, (mb_caa_index, SYNC_BATCH_SIZE)) + mb_rows = mb_curs.fetchall() + if len(mb_rows) == 0: + mb_done = True + + if len(lb_rows) == 0 and not lb_done: + lb_curs.execute(lb_query, (lb_caa_index, SYNC_BATCH_SIZE)) + lb_rows = lb_curs.fetchall() + if len(lb_rows) == 0: + lb_done = True + + if not mb_row and len(mb_rows) > 0: + mb_row = mb_rows.pop(0) + + if not lb_row and len(lb_rows) > 0: + lb_row = lb_rows.pop(0) + + if not lb_row and not mb_row: + break + + processed += 1 + if processed % 100000 == 0: + log("processed %d of %d: missing %d extra %d" % (processed, mb_count, missing, extra)) + + # If the item is in MB, but not in LB, add to LB + if lb_row is None or mb_row[mb_compare_key] < lb_row[lb_compare_key]: + process_cover_art(threads, mb_row) + missing += 1 + mb_caa_index = mb_row[mb_compare_key] + mb_row = None + continue + + # If the item is in LB, but not in MB, remove from LB + if mb_row is None or mb_row[mb_compare_key] > lb_row[lb_compare_key]: + extra += 1 + delete_from_lb(lb_conn, lb_row[lb_compare_key]) + lb_caa_index = lb_row[lb_compare_key] + lb_row = None + continue + + # If the caa_id is present in both, skip both + if mb_row[mb_compare_key] == lb_row[lb_compare_key]: + mb_caa_index = mb_row[mb_compare_key] + lb_caa_index = lb_row[lb_compare_key] + lb_row = None + mb_row = None + continue + + assert False + + join_threads(threads) + log("Finished! added/skipped %d removed %d from release_color" % (missing, extra)) + + mb_count, lb_count = get_cover_art_counts(mb_curs, lb_curs) + log("CAA count: %d" % (mb_count,)) + log("LB count: %d" % (lb_count,)) + + metrics.init("listenbrainz") + metrics.set( + "listenbrainz-caa-mapper", + caa_front_count=mb_count, + lb_caa_count=lb_count + ) +
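
Note on assumptions behind this patch: it expects an
MB_DATABASE_STANDBY_URI setting to exist in the mbid_mapping config,
pointing at the MB standby node, alongside the existing
SQLALCHEMY_DATABASE_URI. Also, since delete_from_lb() no longer opens
its own connection, any caller outside compare_coverart() now has to
supply one. A minimal sketch of such a call, assuming the module-level
`import config` used elsewhere in mbid_mapping and a made-up caa_id:

    import psycopg2
    import psycopg2.extras  # delete_from_lb() uses psycopg2.extras.DictCursor

    import config
    from mapping.release_colors import delete_from_lb

    # The caller owns the LB connection now; delete_from_lb() opens a
    # cursor on it and commits after the DELETE, so one connection can
    # be reused across many deletes instead of reconnecting per row.
    with psycopg2.connect(config.SQLALCHEMY_DATABASE_URI) as lb_conn:
        delete_from_lb(lb_conn, caa_id=12345)  # placeholder caa_id

This mirrors how compare_coverart() shares its lb_conn with
delete_from_lb() inside the sync loop.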