Skip to content

Commit

Permalink
Merge pull request #2870 from metabrainz/release-colors
Browse files Browse the repository at this point in the history
Fix release colors update
  • Loading branch information
MonkeyDo committed May 10, 2024
2 parents d10c144 + 0b0e0fd commit 6f088c7
Showing 1 changed file with 112 additions and 117 deletions.
229 changes: 112 additions & 117 deletions mbid_mapping/mapping/release_colors.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,9 @@
import datetime
import os
import re
import subprocess
from time import sleep
from threading import Thread, get_ident

import psycopg2
from psycopg2.errors import OperationalError
from psycopg2.extensions import register_adapter
import requests

Expand All @@ -15,7 +12,6 @@
from mapping.cube import Cube, adapt_cube
from mapping.utils import log


register_adapter(Cube, adapt_cube)

# max number of threads to use -- with 2 we don't need to worry about rate limiting.
Expand All @@ -33,8 +29,6 @@ def process_image(filename, mime_type):
and return the (reg, green, blue) tuple """

with open(filename, "rb") as raw:
proc = subprocess.Popen(["file", filename], stdout=subprocess.PIPE)
tmp = proc.communicate(raw.read())
proc = subprocess.Popen(["jpegtopnm", filename],
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
tmp = proc.communicate(raw.read())
Expand All @@ -45,10 +39,10 @@ def process_image(filename, mime_type):

lines = out[0].split(b"\n", 3)
if lines[0].startswith(b"P6"): # PPM
return (lines[3][0], lines[3][1], lines[3][2])
return lines[3][0], lines[3][1], lines[3][2]

if lines[0].startswith(b"P5"): # PGM
return (lines[3][0], lines[3][0], lines[3][0])
return lines[3][0], lines[3][0], lines[3][0]

raise RuntimeError

Expand All @@ -75,10 +69,11 @@ def process_row(row):
""" Process one CAA query row, by fetching the 250px thumbnail,
process the color, then import into the DB """

sleep_duation = 2
sleep_duration = 2
while True:
headers = {
'User-Agent': 'ListenBrainz HueSound Color Bot ( rob@metabrainz.org )'}
'User-Agent': 'ListenBrainz HueSound Color Bot ( rob@metabrainz.org )'
}
release_mbid, caa_id = row["release_mbid"], row["caa_id"]
url = f"https://archive.org/download/mbid-{release_mbid}/mbid-{release_mbid}-{caa_id}_thumb250.jpg"
r = requests.get(url, headers=headers)
Expand All @@ -90,16 +85,14 @@ def process_row(row):

try:
red, green, blue = process_image(filename, row["mime_type"])
insert_row(row["release_mbid"], red,
green, blue, row["caa_id"])
insert_row(row["release_mbid"], red, green, blue, row["caa_id"])
log("%s %s: (%s, %s, %s)" %
(row["caa_id"], row["release_mbid"], red, green, blue))
(row["caa_id"], row["release_mbid"], red, green, blue))
except Exception as err:
log("Could not process %s" % url)
log(err)

os.unlink(filename)
sleep_duation = 2
break

if r.status_code == 403:
Expand Down Expand Up @@ -129,13 +122,12 @@ def process_row(row):
break


def delete_from_lb(caa_id):
def delete_from_lb(lb_conn, caa_id):
""" Delete a piece of coverart from the release_color table. """

with psycopg2.connect(config.SQLALCHEMY_DATABASE_URI) as lb_conn:
with lb_conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as lb_curs:
lb_curs.execute(
"""DELETE FROM release_color WHERE caa_id = %s """, (caa_id,))
with lb_conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as lb_curs:
lb_curs.execute("""DELETE FROM release_color WHERE caa_id = %s """, (caa_id,))
lb_conn.commit()


def process_cover_art(threads, row):
Expand Down Expand Up @@ -190,19 +182,19 @@ def get_cover_art_counts(mb_curs, lb_curs):
def get_last_updated_from_caa():
""" Fetch the last_updated (last date_updated) value from the CAA table """

with psycopg2.connect(config.MBID_MAPPING_DATABASE_URI) as mb_conn:
with mb_conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as mb_curs:
mb_curs.execute("""SELECT max(date_uploaded) AS date_uploaded
FROM cover_art_archive.cover_art""")
last_updated = None
row = mb_curs.fetchone()
if row:
try:
last_updated = row["date_uploaded"]
except ValueError:
pass
with psycopg2.connect(config.MB_DATABASE_STANDBY_URI) as mb_conn, \
mb_conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as mb_curs:
mb_curs.execute("""SELECT max(date_uploaded) AS date_uploaded
FROM cover_art_archive.cover_art""")
last_updated = None
row = mb_curs.fetchone()
if row:
try:
last_updated = row["date_uploaded"]
except ValueError:
pass

return last_updated
return last_updated


def sync_release_color_table():
Expand Down Expand Up @@ -286,89 +278,92 @@ def compare_coverart(mb_query, lb_query, mb_caa_index, lb_caa_index, mb_compare_
the corresponding compare key. The starting indexes (the current comparison index
into the data) must be provided and match the type of the comparison keys. """

with psycopg2.connect(config.MBID_MAPPING_DATABASE_URI) as mb_conn:
with mb_conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as mb_curs:
with psycopg2.connect(config.SQLALCHEMY_DATABASE_URI) as lb_conn:
with lb_conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as lb_curs:

mb_count, lb_count = get_cover_art_counts(mb_curs, lb_curs)
log("CAA count: %d\n LB count: %d" % (mb_count, lb_count))

threads = []
mb_row = None
lb_row = None

mb_rows = []
lb_rows = []

mb_done = False
lb_done = True if lb_query is None else False

extra = 0
missing = 0
processed = 0

while True:
if len(mb_rows) == 0 and not mb_done:
mb_curs.execute(
mb_query, (mb_caa_index, SYNC_BATCH_SIZE))
mb_rows = mb_curs.fetchall()
if len(mb_rows) == 0:
mb_done = True

if len(lb_rows) == 0 and not lb_done:
lb_curs.execute(
lb_query, (lb_caa_index, SYNC_BATCH_SIZE))
lb_rows = lb_curs.fetchall()
if len(lb_rows) == 0:
lb_done = True

if not mb_row and len(mb_rows) > 0:
mb_row = mb_rows.pop(0)

if not lb_row and len(lb_rows) > 0:
lb_row = lb_rows.pop(0)

if not lb_row and not mb_row:
break

processed += 1
if processed % 100000 == 0:
log("processed %d of %d: missing %d extra %d" %
(processed, mb_count, missing, extra))

# If the item is in MB, but not in LB, add to LB
if lb_row is None or mb_row[mb_compare_key] < lb_row[lb_compare_key]:
process_cover_art(threads, mb_row)
missing += 1
mb_caa_index = mb_row[mb_compare_key]
mb_row = None
continue

# If the item is in LB, but not in MB, remove from LB
if mb_row is None or mb_row[mb_compare_key] > lb_row[lb_compare_key]:
extra += 1
delete_from_lb(lb_row[lb_compare_key])
lb_caa_index = lb_row[lb_compare_key]
lb_row = None
continue

# If the caa_id is present in both, skip both
if mb_row[mb_compare_key] == lb_row[lb_compare_key]:
mb_caa_index = mb_row[mb_compare_key]
lb_caa_index = lb_row[lb_compare_key]
lb_row = None
mb_row = None
continue

assert False

join_threads(threads)
log( "Finished! added/skipped %d removed %d from release_color" % (missing, extra))

mb_count, lb_count = get_cover_art_counts(mb_curs, lb_curs)
log("CAA count: %d\n LB count: %d" % (mb_count, lb_count))

metrics.init("listenbrainz")
metrics.set("listenbrainz-caa-mapper",
caa_front_count=mb_count, lb_caa_count=lb_count)
with psycopg2.connect(config.MB_DATABASE_STANDBY_URI) as mb_conn, \
mb_conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as mb_curs, \
psycopg2.connect(config.SQLALCHEMY_DATABASE_URI) as lb_conn, \
lb_conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as lb_curs:

mb_count, lb_count = get_cover_art_counts(mb_curs, lb_curs)
log("CAA count: %d" % (mb_count,))
log("LB count: %d" % (lb_count,))

threads = []
mb_row = None
lb_row = None

mb_rows = []
lb_rows = []

mb_done = False
lb_done = True if lb_query is None else False

extra = 0
missing = 0
processed = 0

while True:
if len(mb_rows) == 0 and not mb_done:
mb_curs.execute(mb_query, (mb_caa_index, SYNC_BATCH_SIZE))
mb_rows = mb_curs.fetchall()
if len(mb_rows) == 0:
mb_done = True

if len(lb_rows) == 0 and not lb_done:
lb_curs.execute(lb_query, (lb_caa_index, SYNC_BATCH_SIZE))
lb_rows = lb_curs.fetchall()
if len(lb_rows) == 0:
lb_done = True

if not mb_row and len(mb_rows) > 0:
mb_row = mb_rows.pop(0)

if not lb_row and len(lb_rows) > 0:
lb_row = lb_rows.pop(0)

if not lb_row and not mb_row:
break

processed += 1
if processed % 100000 == 0:
log("processed %d of %d: missing %d extra %d" % (processed, mb_count, missing, extra))

# If the item is in MB, but not in LB, add to LB
if lb_row is None or mb_row[mb_compare_key] < lb_row[lb_compare_key]:
process_cover_art(threads, mb_row)
missing += 1
mb_caa_index = mb_row[mb_compare_key]
mb_row = None
continue

# If the item is in LB, but not in MB, remove from LB
if mb_row is None or mb_row[mb_compare_key] > lb_row[lb_compare_key]:
extra += 1
delete_from_lb(lb_conn, lb_row[lb_compare_key])
lb_caa_index = lb_row[lb_compare_key]
lb_row = None
continue

# If the caa_id is present in both, skip both
if mb_row[mb_compare_key] == lb_row[lb_compare_key]:
mb_caa_index = mb_row[mb_compare_key]
lb_caa_index = lb_row[lb_compare_key]
lb_row = None
mb_row = None
continue

assert False

join_threads(threads)
log("Finished! added/skipped %d removed %d from release_color" % (missing, extra))

mb_count, lb_count = get_cover_art_counts(mb_curs, lb_curs)
log("CAA count: %d" % (mb_count,))
log("LB count: %d" % (lb_count,))

metrics.init("listenbrainz")
metrics.set(
"listenbrainz-caa-mapper",
caa_front_count=mb_count,
lb_caa_count=lb_count
)

0 comments on commit 6f088c7

Please sign in to comment.