In [1]:
from pymongo import UpdateOne
from pprint import pprint
from tqdm import tqdm
from exploration.config import sql_inst, mongo_inst

In [4]:
# Dumps are imported in list order, and the first dump that contains a given
# user or beatmap supplies its data -- so the preferred dump goes first.
SQL_DUMPS = [
    f"osu_random_{year_month}"
    for year_month in (
        "2021_01",
        "2020_12",
        "2020_11",
        "2020_10",
        "2020_09",
        "2020_08",
    )
]

# Name of the MongoDB database that receives the migrated data.
NEW_DB_NAME = "osu_random_db"

In [5]:
# Handle to the target MongoDB database, selected by name from the shared client.
mongo_db = mongo_inst[NEW_DB_NAME]

# Disabled: unique compound index on (user_id, beatmap_id, enabled_mods).
# NOTE(review): re-enabling this requires `import pymongo`; the import cell
# only brings in `UpdateOne`, so `pymongo.ASCENDING` would raise NameError.
# mongo_db['osu_scores_high'].create_index(
#     [('user_id', pymongo.ASCENDING), ('beatmap_id', pymongo.ASCENDING) , ('enabled_mods', pymongo.ASCENDING)],
#     unique=True
# )

In [12]:
def get_fields(cursor_ex):
    """Return the column names of the result set held by an executed cursor.

    Takes the first item of every entry in ``cursor_ex.description``,
    preserving the column order, and returns them as a new list.
    """
    return [column[0] for column in cursor_ex.description]

def insert_user_scores(sql_db_name, user_id):
    """Copy all ``osu_scores_high`` rows of one user from a SQL dump into MongoDB.

    Each row becomes a document keyed by column name, with the first column
    renamed to ``_id``. Documents are inserted into the ``osu_scores_high``
    collection of the module-level ``mongo_db``. Nothing is written when the
    user has no rows in the dump.

    Args:
        sql_db_name: Name of the SQL schema (dump) to read from.
        user_id: Value matched against the ``user_id`` column.
    """
    with sql_inst.cursor() as cursor:
        # NOTE(review): both arguments are interpolated directly into the SQL
        # text. That is acceptable for the trusted dump names and ids used in
        # this notebook, but external input must never be passed here.
        cursor.execute(f"select * from {sql_db_name}.osu_scores_high WHERE user_id = {user_id}")
        fields = get_fields(cursor)
        # Presumably the first column is the table's primary key -- verify
        # against the schema before relying on it as the Mongo `_id`.
        fields[0] = '_id'

        user_scores = [dict(zip(fields, row)) for row in cursor]

    # insert_many() raises InvalidOperation for an empty list, so users
    # without any scores must be skipped instead of aborting the migration.
    if user_scores:
        mongo_db['osu_scores_high'].insert_many(user_scores)

In [5]:
def migrate_dumps(dump_names, mongo_db):
    """Import user stats, and each new user's scores, from SQL dumps into MongoDB.

    Dumps are processed in the order given. A user is imported from the first
    dump in which it appears; users already present in the target
    ``osu_user_stats`` collection, or imported from an earlier dump in this
    run, are skipped.

    Args:
        dump_names: Iterable of SQL schema names, each holding an
            ``osu_user_stats`` table.
        mongo_db: Target MongoDB database; user rows are written to its
            ``osu_user_stats`` collection.
    """
    # Only the ids are needed to know which users a previous run imported.
    existing_users = mongo_db['osu_user_stats'].find({}, {'_id': 1})
    migrated_users = {user['_id'] for user in existing_users}

    for db_name in dump_names:
        print(f"Importing dump: {db_name}")

        with sql_inst.cursor() as cursor:
            cursor.execute(f"select * from {db_name}.osu_user_stats")

            fields = get_fields(cursor)
            # The first column of the row is used as the Mongo document id.
            fields[0] = '_id'

            # NOTE(review): the total relies on the driver reporting the row
            # count right after execute() -- confirm for the cursor type used.
            with tqdm(total=cursor.rowcount) as progress_bar:
                for row in cursor:
                    user_stats = dict(zip(fields, row))
                    user_id = user_stats['_id']

                    if user_id not in migrated_users:
                        # Collection.insert() is deprecated and was removed in
                        # PyMongo 4; insert_one() is the supported equivalent.
                        mongo_db['osu_user_stats'].insert_one(user_stats)
                        # NOTE(review): this helper writes to the module-level
                        # `mongo_db`, not to the `mongo_db` parameter above.
                        insert_user_scores(db_name, user_id)

                        migrated_users.add(user_id)

                    progress_bar.update(1)
        print()


In [6]:
# Import user stats (and the scores of each newly seen user) from every dump, in SQL_DUMPS order.
migrate_dumps(SQL_DUMPS, mongo_db)

Importing dump: osu_random_2020_08
100%|██████████| 10000/10000 [03:25<00:00, 48.55it/s]



In [15]:
def migrate_beatmaps(dump_names, mongo_db):
    """Upsert the ``osu_beatmaps`` table of every SQL dump into MongoDB.

    Each row becomes a document keyed by column name, with the first column
    renamed to ``_id``. Documents are written with ``$setOnInsert`` upserts,
    so a beatmap that already exists in the target collection is left
    untouched and the first dump that contains it supplies its data.

    Args:
        dump_names: Iterable of SQL schema names, each holding an
            ``osu_beatmaps`` table.
        mongo_db: Target MongoDB database; documents go to its
            ``osu_beatmaps`` collection.
    """
    for db_name in dump_names:
        print(f"Importing dump: {db_name}")

        with sql_inst.cursor() as cursor:
            cursor.execute(f"select * from {db_name}.osu_beatmaps")

            fields = get_fields(cursor)
            # The first column of the row is used as the Mongo document id.
            fields[0] = '_id'

            updates = []
            for row in cursor:
                beatmap = dict(zip(fields, row))
                query = {'_id': beatmap['_id']}
                # $setOnInsert only writes when the upsert creates the document.
                update = {'$setOnInsert': beatmap}
                updates.append(UpdateOne(query, update, upsert=True))

        # bulk_write() raises InvalidOperation when given no operations, so a
        # dump without beatmaps is skipped instead of aborting the whole run.
        if updates:
            mongo_db['osu_beatmaps'].bulk_write(updates)


In [14]:
# Upsert the beatmaps of every dump, in SQL_DUMPS order; already-present beatmaps are not overwritten.
migrate_beatmaps(SQL_DUMPS, mongo_db)

Importing dump: osu_random_2021_01


NameError: name 'osu_random_db' is not defined

In [None]:
# Single-field indexes on the two lookup keys of the scores collection.
# `mongo_db` is the database handle defined earlier in this notebook; the
# previously used name `osu_random_db` was never defined and raised NameError.
print("Creating index on 'user_id'")
mongo_db['osu_scores_high'].create_index('user_id')
print("Creating index on 'beatmap_id'")
mongo_db['osu_scores_high'].create_index('beatmap_id')