In [None]:
import os
import pandas as pd
import matplotlib.pyplot as plt
from downcfg import USHER_CFG

# Directory that holds the media files; the cells below join file names onto it.
MEDIA_DIR = USHER_CFG.dest_dir 

In [None]:
from db_managers import MetadataManager
# Build the metadata manager for MEDIA_DIR and display its table (mm.df),
# which every later cell reads from.
mm = MetadataManager(MEDIA_DIR)
mm.df

In [None]:
def file_extension(fname: str) -> str:
    """Return the text after the last dot of ``fname``.

    Names that contain no dot at all are grouped under ``'(none)'`` instead of
    raising ``IndexError`` the way ``fname.rsplit('.', 1)[1]`` would.
    """
    _, dot, ext = fname.rpartition('.')
    return ext if dot else '(none)'


# Case-insensitive tally of file extensions over the index of the metadata table.
extensions = mm.df.index.str.lower().map(file_extension).value_counts()
extensions.plot.bar(title="file extensions")

In [None]:
# Overview of the metadata table: each plotted column gets its own subplot.
mm.df.plot(
    figsize=(14, 12),
    subplots=True,
)

In [None]:
# Split the whitespace-separated 'tags' strings, flatten them to one tag per
# entry and count occurrences (most frequent first).
toptags = (
    mm.df['tags']
    .str.split()
    .explode()
    .value_counts()
)
# Only the 60 most frequent tags are drawn.
toptags.iloc[:60].plot.bar(title="tag frequency", figsize=(12, 4))

In [None]:
# Same tally as for tags, applied to the whitespace-separated 'awards' strings.
topawards = (
    mm.df['awards']
    .str.split()
    .explode()
    .value_counts()
)
# Only the 60 most frequent awards are drawn.
topawards.iloc[:60].plot.bar(title="award frequency", figsize=(12, 4))

In [None]:
import json

# Manual switch: set to True to overwrite ./tags_vocab.py with the tags
# currently present in `toptags`. Left off so that "Run All" never rewrites it.
REGENERATE_VOCAB = False


def render_vocab_module(tags) -> str:
    """Return Python source text that defines ``VOCAB`` as a list of ``tags``.

    Each tag is written as a double-quoted string literal. ``json.dumps`` is
    used to produce the literal so that quotes and backslashes inside a tag
    are escaped instead of corrupting the generated module; plain tags come
    out exactly as ``"tag"``.

    Args:
        tags: iterable of tag names, in the order they should appear.

    Returns:
        The module source, one tag per line.
    """
    entries = ''.join(f'  {json.dumps(str(t), ensure_ascii=False)},\n' for t in tags)
    return "VOCAB = [\n" + entries + "]"


if REGENERATE_VOCAB:
    # Explicit UTF-8 so non-ASCII tags do not depend on the platform default encoding.
    with open("./tags_vocab.py", "w", encoding="utf-8") as f:
        f.write(render_vocab_module(toptags.index))

In [None]:
# Distribution of each rating-related column, 100 bins per histogram.
mm.df.hist(
    column=['Glicko_pts', 'Glicko_rd', 'ELO_pts', 'stars', 'nmatches'],
    figsize=(20, 12),
    bins=100,
)

In [None]:
# "Memory parasites": files that take a lot of disk space relative to their
# star rating. Each entry is (size in MiB, stars per MiB, stars, file name).
mem_parasytes = []
# Only the 'stars' column is needed, so iterate over it instead of whole rows.
for f, stars in mm.df['stars'].items():
    fullname = os.path.join(MEDIA_DIR, f)
    assert os.path.exists(fullname), f
    filesize = os.path.getsize(fullname) / 1024**2
    if filesize == 0:
        # An empty file wastes no space, and stars/0 would raise ZeroDivisionError.
        continue
    ratio = stars / filesize
    # Flag items that are both low-rated and poor value per MiB.
    if ratio < 0.1 and stars < 2.4:
        mem_parasytes.append((filesize, ratio, stars, f))
# Largest files first (tuples compare by size before anything else).
mem_parasytes.sort(reverse=True)
totalmb = 0
# Report the 30 biggest offenders and the space they occupy together.
for m, r, s, n in mem_parasytes[:30]:
    print(f"{m:6.1f}mb  {r:8.3f}     {s:4.2f} {n}")
    totalmb += m
print(f"------\n {totalmb:6.1f}mb")


In [None]:
from helpers import start_file
from send2trash import send2trash
import shutil

# Number of entries from the top of `mem_parasytes` to review interactively;
# 0 turns this cell into a no-op.
HANDLE_PARASYTES = 0
for *_, fname in mem_parasytes[:HANDLE_PARASYTES]:
    fullname = os.path.join(MEDIA_DIR, fname)
    assert os.path.exists(fullname), fname
    # Open the file so it can be inspected before deciding.
    start_file(fullname)
    # Keep asking until a recognised command is entered.
    while True:
        usr_action = input("what do you want to do? [s,skip/b,buffer/t,todo/r,rm,remove]:").strip().lower()
        if usr_action in ["s", "skip"]:
            break
        elif usr_action in ["b", "buffer", "t", "todo"]:
            dest_dir = USHER_CFG.buffer_dir
            if usr_action in ["t", "todo"]:
                dest_dir = os.path.join(dest_dir, "todo/")
            # Create the destination first: moving into a missing directory
            # would fail (or rename the file) after its metadata entry was dropped.
            os.makedirs(dest_dir, exist_ok=True)
            # NOTE(review): metadata is deleted before the move, as in the original
            # flow; confirm mm.delete does not need the file to stay in place.
            mm.delete(fname)
            shutil.move(fullname, dest_dir)
            break
        elif usr_action in ["r", "rm", "remove"]:
            mm.delete(fname)
            send2trash(fullname)
            break
        else:
            print("unknown command", usr_action, "try again")

## Health checks

Metadata and file checks

In [None]:
import unittest
from tags_vocab import VOCAB
from metadata import ManualMetadata, get_metadata


class TestMeidaItem(unittest.TestCase):
    def __init__(self, row):
        super().__init__()
        self.row = row

    def setUp(self):
        fullname = os.path.join(MEDIA_DIR, self.row.name)
        self.assertTrue(os.path.exists(fullname))
        self.disk_meta = get_metadata(fullname)
    
    def test_row(self):
        self.assertFalse(any(self.row.isna()), self.row)
        df_meta = ManualMetadata.from_str(self.row['tags'], int(self.row['stars']), self.row['awards'])
        self.assertEqual(df_meta, self.disk_meta)
    
    def test_disk_meta(self):
        self.assertTrue(self.disk_meta.tags)
        self.assertTrue(self.disk_meta.stars >= 0)
        self.assertFalse([t for t in self.disk_meta.tags if t not in VOCAB])
        self.assertNotEqual(len([t for t in self.disk_meta.tags if t.startswith("known_model")]), 1, "unnamed known_model")
        bad_awa = []
        for a in self.disk_meta.awards:
            if   a.startswith("e_"):   a = a[2:]
            elif a.startswith("wow_"): a = a[4:]
            else: continue
            if a not in VOCAB:                   bad_awa.append((a, "not in vocab"))
            if a not in self.disk_meta.tags:     bad_awa.append((a, "not in tags"))
            if any(d in a for d in "013456789"): bad_awa.append((a, "digit"))
        self.assertFalse(bad_awa)
    
    # Idk how to make it beautiful parametrized :(
    def runTest(self):
        self.test_row()
        self.test_disk_meta()
    def shortDescription(self):
        return f"test for {self.row.name}"


# One test case per row, limited to the first 2000 rows of the table,
# collected into a single suite and run with the text runner.
suite = unittest.TestSuite()
suite.addTests(TestMeidaItem(row) for _, row in mm.df.iloc[:2000].iterrows())
unittest.TextTestRunner().run(suite)

Troublesome filenames:

In [None]:
import re
from helpers import better_fname

# Dry-run switch: while True, the loop at the end of this cell only prints the
# suggested names and never calls mm.rename.
IM_JUST_LOOKING = True


def trouble_lvl(s:str) -> int:
    """Score how problematic the file name ``s`` is; 0 means no complaints.

    The first matching rule wins:
      99  empty or whitespace-only
      98  contains non-printable characters
      97  contains one of the forbidden characters < > : " / \\ | ? * ,
      89  starts with '-'
      16  contains a character outside the "okay" set
      15  contains a character outside the "good" set (brackets, parentheses, '=')
       3  contains a character outside the "excellent" set (space, Cyrillic, '+', '-')
       1  shorter than 9 characters
       0  otherwise
    """
    forbidden_chars = re.escape(r'<>:"/\|?*,')
    excellent_chars = r'_0-9a-zA-Z\.'
    good_chars = excellent_chars + r' АБВГДЕЁЖЗИЙКЛМНОПРСТУФХЦЧШЩЪЫЬЭЮЯабвгдеёжзийклмнопрстуфхцчшщъыьэюя\+\-'
    okay_chars = good_chars + r'\[\]\(\)='

    def has_char_outside(charset: str) -> bool:
        # True when at least one character of `s` is not in the character class.
        return re.search('[^' + charset + ']', s) is not None

    if not s or s.isspace():
        return 99
    if not s.isprintable():
        return 98
    if re.search('[' + forbidden_chars + ']', s):
        return 97
    if s.startswith('-'):
        return 89
    # Widest allowed set first, so the worst offending character decides the score.
    for charset, level in ((okay_chars, 16), (good_chars, 15), (excellent_chars, 3)):
        if has_char_outside(charset):
            return level
    return 1 if len(s) < 9 else 0

# Collect (level, name) for every regular file in MEDIA_DIR whose name scores above 0.
troubled_fnames = []
for fname in os.listdir(MEDIA_DIR):
    if not os.path.isfile(os.path.join(MEDIA_DIR, fname)):
        continue
    level = trouble_lvl(fname)
    if level:
        troubled_fnames.append((level, fname))

print(len(troubled_fnames), "troubles:")
# Worst names first; each is shown together with the suggested replacement.
for level, fname in sorted(troubled_fnames, reverse=True):
    suggestion = better_fname(fname)
    print(level, fname, "\n  ", suggestion)
    # Renaming happens only outside dry-run mode and only if the name would change.
    if not IM_JUST_LOOKING and level and fname != suggestion:
        mm.rename(fname, suggestion)

Items where the rating systems disagree with the overall stars, or with each other:

In [None]:
from rating_backends import Glicko, ELO
from ae_rater_types import Rating

# Work on an independent copy of the relevant columns. The copy is taken AFTER
# the column selection so that the column assignments below do not operate on a
# slice of another frame (which triggers SettingWithCopyWarning).
df_consensus = mm.df.reset_index()[['name', 'Glicko_pts', 'Glicko_rd', 'ELO_pts', 'stars']].copy()

# For each rating backend, convert its points into the star value it implies.
for cls in Glicko, ELO:
    backend = cls()  # one backend object per class rather than one per row
    df_consensus[f'{cls.__name__}_exp_stars'] = df_consensus[f'{cls.__name__}_pts'].map(
        lambda pts: backend.rating_to_stars(Rating(pts))
    )

# error_stars: distance between the stored stars and the mean of both backends.
df_consensus['error_stars'] = (((df_consensus['Glicko_exp_stars'] + df_consensus['ELO_exp_stars']) / 2) - df_consensus['stars']).abs()
# stars_disagreement: distance between the two backends themselves.
df_consensus['stars_disagreement'] = (df_consensus['Glicko_exp_stars'] - df_consensus['ELO_exp_stars']).abs()

# Keep only suspicious rows, worst first. Filtering and sorting are chained so
# that no in-place operation runs on a filtered frame.
is_suspicious = (df_consensus['error_stars']>1e-3) | (df_consensus['stars_disagreement']>1.5)
df_consensus = df_consensus[is_suspicious].sort_values(['error_stars', 'stars_disagreement'], ascending=False)
print(len(df_consensus), "disagreements:")
df_consensus

In [None]:
# Number of rows from the top of `df_consensus` to re-rate interactively;
# 0 turns this cell into a no-op.
FIX_DISAGREEMENTS = 0
for _, row in df_consensus[:FIX_DISAGREEMENTS].iterrows():
    fullname = os.path.join(MEDIA_DIR, row['name'])
    assert os.path.exists(fullname), row['name']
    # Open the file so it can be judged before entering a rating.
    start_file(fullname)
    # Re-prompt on bad input instead of aborting: an exception here would end
    # the loop before the commit below and drop the updates made so far.
    while True:
        try:
            usr_stars = float(input("how many stars? "))
        except ValueError:
            print("not a number, try again")
            continue
        if 0 <= usr_stars <= 7:
            break
        print("stars must be between 0 and 7, try again")
    glicko_rat = Glicko().stars_to_rating(usr_stars)
    elo_rat = ELO().stars_to_rating(usr_stars)
    upd = {
        'stars': usr_stars,
        # Raise the stored deviation by 100, capped at the rd of the fresh rating.
        'Glicko_rd': min(int(row['Glicko_rd'])+100, glicko_rat.rd),
        'Glicko_pts': glicko_rat.points,
        'ELO_pts': elo_rat.points,
    }
    # NOTE(review): mm.update receives the full path while mm.delete / mm.rename
    # elsewhere in this notebook receive bare file names - confirm this is intended.
    mm.update(fullname, upd)
if FIX_DISAGREEMENTS:
    mm._commit()
    