In [2]:
import csv
import io
import logging
from itertools import islice
from pathlib import Path

import psycopg2

from properties import LOG_PATH, LFM2B_ROOT, POSTGRES_CREDENTIALS

In [9]:

# Route INFO-and-above records from the root logger into the file at LOG_PATH.
logging.basicConfig(
    filename=LOG_PATH,
    format='%(levelname)s - %(asctime)s: %(message)s',
    datefmt='%d-%m-%Y,%H:%M:%S',
    level=logging.INFO,
)
LOGGER = logging.getLogger()

# (table name, schema file in lfm2b_schemas/) pairs, listed in creation order.
# This order respects foreign keys: create front-to-back, drop back-to-front.
tables = [
    ("artist",          "artist.sql"),
    ("genre",           "genre.sql"),
    ("artist_genre",    "artist_genre.sql"),
    ("lfm_user",        "user.sql"),
    ("track",           "track.sql"),
    ("listening_event", "listening_event.sql"),
]

In [None]:
def delete_all(db_cursor):
    """
    Drops every table listed in `tables` (data AND table definitions). Does not commit.

    Tables are dropped in reverse creation order, so a referencing table is dropped before
    the table its foreign keys point to. Table names come from the module-level `tables`
    constant, not from user input. They are interpolated directly because SQL identifiers
    cannot be passed as query parameters.
    """
    for table, _ in reversed(tables):
        db_cursor.execute(f"DROP TABLE IF EXISTS {table}")


def create_database(db_cursor):
    """Execute each schema script from `lfm2b_schemas/` in `tables` order. Does not commit."""
    schema_dir = Path("lfm2b_schemas")
    for _table, schema_file in tables:
        ddl = (schema_dir / schema_file).read_text(encoding='utf-8')
        db_cursor.execute(ddl)


def populate_artists_and_genres(db_cursor):
    """
    Goes through `artists_valid.tsv` and populates tables `artist`, `genre` and `artist_genre`.
    Does not commit. Progress and per-row errors are logged via LOGGER.

    Genre ids are assigned in order of first appearance, starting at 1. Empty genre names
    (e.g. an artist whose genres column is empty) are skipped.

    Each artist row runs inside its own SAVEPOINT. If any statement for the row fails, only
    that row's inserts are undone, and genres first introduced by that row are forgotten. A
    later row then re-inserts such a genre instead of linking to a genre row that was rolled
    back.
    """
    id_of_genre = {}

    num_errors = 0
    with open(Path(LFM2B_ROOT, "artists_valid.tsv"), 'r', encoding='utf-8') as f:
        LOGGER.info("---- ARTIST-AND-GENRES")
        reader = create_reader(f)
        next(reader)  # Skip header
        for i, row in enumerate(reader):
            new_genres = {}  # genres first seen in this row; remembered only if the row succeeds
            db_cursor.execute("SAVEPOINT artist_row")
            try:
                artist_id, artist_name, genres = row
                # Add the artist
                db_cursor.execute("INSERT INTO artist VALUES (%s, %s) ON CONFLICT (artist_id) DO NOTHING", [artist_id, artist_name])
                for genre in (g.strip() for g in genres.split(",")):
                    if not genre:
                        # "".split(",") yields [""]; do not create/link an empty-named genre.
                        continue
                    genre_id = id_of_genre.get(genre, new_genres.get(genre))
                    if genre_id is None:
                        # Next sequential id, counting genres committed so far plus this row's new ones.
                        genre_id = len(id_of_genre) + len(new_genres) + 1
                        new_genres[genre] = genre_id
                        db_cursor.execute("INSERT INTO genre VALUES (%s, %s) ON CONFLICT (genre_id) DO NOTHING", [genre_id, genre])
                    # Link artist and genre; ON CONFLICT ... DO NOTHING skips duplicate links.
                    db_cursor.execute("INSERT INTO artist_genre VALUES (%s, %s) ON CONFLICT (genre_id, artist_id) DO NOTHING",
                                      [artist_id, genre_id])
            except Exception as err:
                # Undo only this row. connection.rollback() would also discard every earlier
                # uncommitted row in the transaction.
                db_cursor.execute("ROLLBACK TO SAVEPOINT artist_row")
                LOGGER.error("row_no: %d. row : %s. err : %s", i, row, err)
                num_errors += 1
                continue
            db_cursor.execute("RELEASE SAVEPOINT artist_row")
            id_of_genre.update(new_genres)
        db_cursor.execute("SELECT count(*) FROM artist")
        LOGGER.info("#artists written: %d", db_cursor.fetchone()[0])
        db_cursor.execute("SELECT count(*) FROM genre")
        LOGGER.info("#genres written: %d", db_cursor.fetchone()[0])
        LOGGER.warning('#errors: %d', num_errors)
        LOGGER.info("---- ARTIST-AND-GENRES")


def populate_tracks(db_cursor, start=None, end=None):
    """
    Goes through `tracks_valid.tsv` and populates table `track`. Does not commit.
    Progress and per-row errors are logged via LOGGER.

    start and end select the data rows to consider (0-indexed after the header, inclusive
    bounds). None means unbounded on that side.

    Each row runs inside its own SAVEPOINT, so a row that fails to unpack or insert is undone
    on its own. Rows inserted earlier in the same transaction are kept, which also keeps the
    "#tracks written" delta accurate.
    """
    num_errors = 0
    db_cursor.execute("SELECT count(*) FROM track")
    start_tracks = db_cursor.fetchone()[0]
    with open(Path(LFM2B_ROOT, "tracks_valid.tsv"), 'r', encoding='utf-8') as f:
        LOGGER.info("---- TRACKS")
        LOGGER.info("start=%s, end=%s", start, end)
        reader = create_reader(f)
        next(reader)  # this file contains headers
        for i, row in enumerate(reader):
            if end is not None and i > end:
                break
            if start is not None and i < start:
                continue

            db_cursor.execute("SAVEPOINT track_row")
            try:
                track_id, track_name, artist_id = row
                db_cursor.execute("INSERT INTO track VALUES (%s, %s, %s)", [track_id, track_name, artist_id])
            except Exception as err:
                # Undo only this row. connection.rollback() would also discard every earlier
                # uncommitted row in the transaction.
                db_cursor.execute("ROLLBACK TO SAVEPOINT track_row")
                LOGGER.error("row_no: %d. row : %s. err : %s", i, row, err)
                num_errors += 1
                continue
            db_cursor.execute("RELEASE SAVEPOINT track_row")
        db_cursor.execute("SELECT count(*) FROM track")
        LOGGER.info("#tracks written: %d", db_cursor.fetchone()[0] - start_tracks)
        LOGGER.warning('#errors: %d', num_errors)
        LOGGER.info("---- TRACKS")


def populate_users(db_cursor):
    """
    Goes through `users_valid.tsv` and populates table `lfm_user`. Does not commit.
    Progress and per-row errors are logged via LOGGER.

    Each row runs inside its own SAVEPOINT, so a row that fails to unpack or insert is undone
    on its own. Rows inserted earlier in the same transaction are kept.
    """
    num_errors = 0
    with open(Path(LFM2B_ROOT, "users_valid.tsv"), 'r', encoding='utf-8') as f:
        LOGGER.info("---- USERS")
        reader = create_reader(f)
        next(reader)  # this file has headers
        for i, row in enumerate(reader):
            db_cursor.execute("SAVEPOINT user_row")
            try:
                user_id, gender, country, creation_time, age_on_2013_10_31, age_valid = row
                db_cursor.execute("INSERT INTO lfm_user VALUES (%s, %s, %s, %s, %s, %s)",
                                  [user_id, gender, country, creation_time, age_on_2013_10_31, age_valid])
            except Exception as err:
                # Undo only this row. connection.rollback() would also discard every earlier
                # uncommitted row in the transaction.
                db_cursor.execute("ROLLBACK TO SAVEPOINT user_row")
                LOGGER.error("row_no: %d. row : %s. err : %s", i, row, err)
                num_errors += 1
                continue
            db_cursor.execute("RELEASE SAVEPOINT user_row")
        db_cursor.execute("SELECT count(*) FROM lfm_user")
        LOGGER.info("#users written: %d", db_cursor.fetchone()[0])
        LOGGER.warning('#errors: %d', num_errors)
        LOGGER.info("---- USERS")


def populate_listening_events(db_cursor, start=None, end=None, batch_size=3000):
    """
    Goes through `listening-events.tsv` and populates table `listening_event` in batches.
    Progress and errors are logged via LOGGER.

    start and end select the data rows to consider (0-indexed after the header, inclusive
    bounds). None means unbounded on that side. Valid rows are buffered and handed to
    `safe_execute_batch` every `batch_size` rows, plus once more for the final partial batch.

    Returns a list of (row, error) pairs. It contains:
      - every pair reported by `safe_execute_batch`;
      - every row that could not be unpacked into the five expected columns.
    The logged "#errors" equals the length of this list.

    NOTE(review): `safe_execute_batch` is not defined anywhere in this notebook. It must come
    from a cell that no longer exists, and whether it commits is not visible here. Confirm and
    restore its definition so the notebook survives Restart & Run All.
    """
    insert_sql = "INSERT INTO listening_event VALUES (%s, %s, %s, %s, %s)"
    errors = []
    with open(Path(LFM2B_ROOT, "listening-events.tsv", "listening-events.tsv"), 'r', encoding='utf-8') as f:
        LOGGER.info("---- LISTENING EVENTS")
        LOGGER.info("Start at %s, end at %s", start, end)
        reader = create_reader(f)
        next(reader)  # Skip header
        batch = []
        for i, row in enumerate(reader):
            if start is not None and i < start:
                continue
            if end is not None and i > end:
                break
            try:
                user_id, track_id, timestamp, age_at_listen, artist_id = row
            except ValueError as err:
                # Malformed row: keep it with the other rejects so callers can dump it to disk.
                LOGGER.error("row_no: %d. row : %s. err : %s", i, row, err)
                errors.append((row, err))
                continue
            batch.append([user_id, track_id, timestamp, age_at_listen, artist_id])
            # Flush outside the per-row try, so a batch failure is not blamed on a single row
            # and the same batch is not silently re-sent with the next one.
            if len(batch) >= batch_size:
                errors.extend(safe_execute_batch(db_cursor, insert_sql, batch))
                LOGGER.info("Wrote batch. Now at %d", i)
                batch = []

        # Write remaining batch
        if batch:
            errors.extend(safe_execute_batch(db_cursor, insert_sql, batch))
            LOGGER.info("Wrote final batch. Now at %d", i)

        db_cursor.execute("SELECT count(*) FROM listening_event")
        LOGGER.info("#total LEs in database: %d", db_cursor.fetchone()[0])
        LOGGER.warning('#errors: %d', len(errors))
        LOGGER.info("---- LISTENING EVENTS")
    return errors


def populate_database(db_cursor):
    """
    Fill the tables from the LFM-2b TSV files: artists/genres, then tracks, then users, then
    listening events. This function issues no commit itself.
    """
    loaders = (
        populate_artists_and_genres,
        lambda cur: populate_tracks(cur, start=None, end=None),
        populate_users,
        populate_listening_events,
    )
    for load in loaders:
        load(db_cursor)


def create_reader(file):
    """
    Return a csv reader over `file` that splits fields on tabs (csv's "excel-tab" dialect).

    NOTE(review): default csv quoting is still active, so a field that starts with a double
    quote is parsed as a quoted field. Confirm the LFM-2b TSVs never contain such fields.
    """
    return csv.reader(file, dialect="excel-tab")

In [17]:
# delete_all(cursor)
# con.commit()

In [18]:
# # create_database(cursor)
# con.commit()

In [19]:
# populate_artists_and_genres(cursor)
# con.commit()

In [20]:
# populate_tracks(cursor)
# con.commit()

In [21]:
# populate_users(cursor)
# con.commit()

In [11]:

# Load one slice of listening events (0-indexed data rows, inclusive bounds) and write every
# rejected row together with its error to invalid_listening_events.tsv.
LE_START_ROW = 201200000
LE_END_ROW = 202200000
LE_BATCH_SIZE = 100000

# Open the connection here. Previously this cell relied on `cursor` from a cell further
# down the notebook, so it failed on a fresh kernel.
con = psycopg2.connect(dbname="lfm2b", **POSTGRES_CREDENTIALS)
cursor = con.cursor()

# `with con` commits when the block succeeds and rolls back on an exception. It does not
# close the connection; a later cell does that.
with con:
    # newline='' is what the csv module requires for files it writes.
    with open(Path(LFM2B_ROOT, "invalid_listening_events.tsv"), "w", encoding="utf-8", newline="") as inv:
        writer = csv.writer(inv, delimiter='\t')
        writer.writerow(["row", "error"])
        errs = populate_listening_events(cursor, start=LE_START_ROW, end=LE_END_ROW, batch_size=LE_BATCH_SIZE)
        writer.writerows(errs)

In [None]:
# print(len(errs))
# fk_violation_tracks = []
# for row, err in errs:
#     if isinstance(err, psycopg2.errors.ForeignKeyViolation):
#         fk_violation_tracks.append(row[1])
         
# print(len(fk_violation_tracks))
# sorted(set(fk_violation_tracks))

5001
5001


['10101837',
 '10136772',
 '10249083',
 '10260218',
 '10276974',
 '10315286',
 '10315304',
 '10315312',
 '10323020',
 '10353587',
 '10378300',
 '10384971',
 '10427391',
 '10436507',
 '10445048',
 '10503337',
 '10517340',
 '10519141',
 '10524632',
 '10537178',
 '10540381',
 '10574951',
 '1062972',
 '10631714',
 '10653075',
 '10653823',
 '10656173',
 '10657169',
 '10766045',
 '10861580',
 '10895338',
 '10928366',
 '10933419',
 '10961983',
 '10977836',
 '11001307',
 '11014426',
 '11017146',
 '11045972',
 '11046122',
 '11099469',
 '11196669',
 '11263336',
 '11281382',
 '11319079',
 '11320842',
 '11326165',
 '11327486',
 '11335342',
 '11344102',
 '11375522',
 '11389847',
 '11393319',
 '11514539',
 '11516852',
 '11537668',
 '11559647',
 '11669386',
 '11720257',
 '11723204',
 '11740792',
 '11788028',
 '11856158',
 '11906432',
 '11940158',
 '11953796',
 '11980788',
 '11984633',
 '12043761',
 '12056947',
 '12086824',
 '12087208',
 '12097695',
 '12144115',
 '12261650',
 '12267867',
 '12297191',


In [23]:
# Close the shared database connection; the verification cell below opens a fresh one.
con.close()

In [23]:
# Spot-check where a load stopped: start at file row FIND_ROW and walk backwards over the
# `surrounding` rows before it. Report the first row that exists in `listening_event`.
FIND_ROW = 202200000
surrounding = 1

con = psycopg2.connect(
    dbname="lfm2b",
    **POSTGRES_CREDENTIALS
)
try:
    cursor = con.cursor()

    first_row = max(FIND_ROW - surrounding, 0)
    last_row = FIND_ROW + surrounding
    with open(Path(LFM2B_ROOT, "listening-events.tsv", "listening-events.tsv"), encoding="utf-8") as f:
        reader = create_reader(f)
        next(reader)  # Skip header
        # islice skips the leading rows without running a Python-level loop body per row.
        rows = list(enumerate(islice(reader, first_row, last_row + 1), start=first_row))

    assert len(rows) == 2*surrounding + 1, "not enough rows, probably on first/last rows of file"
    # rows[surrounding] is FIND_ROW; walk back down to rows[0] inclusive.
    for idx in range(surrounding, -1, -1):
        row_num, row = rows[idx]
        cursor.execute("select count(*) from listening_event where user_id = %s and track_id = %s and timestamp_listen = %s and age_at_listen = %s and artist_id=%s", row)
        cnt = cursor.fetchone()[0]
        assert cnt in [0, 1], f"invalid number of matches ({cnt}) for row {row_num} {row}"
        if cnt == 1:
            print("Found row ", row_num, row)
            break
        print("No match for ", row_num, row)
    print("Done")
finally:
    # Close even when an assertion above fails, so the connection is not leaked.
    con.close()


Found row  202200000 ['32860', '33139620', '2012-09-22 22:49:26', '19', '15508']
Done
