<font size="8">Data Acquisition, Modeling and Analysis: Big Data Analytics - Song Recommender</font>
<font size="6">Written by Alexander M. Pellegrino</font>
<font size="6">Under Dr. Rensheng Wang</font>
<font size="6">On October 21st, 2023</font>

<font size="6">Part 1 - Data Parsing</font>
<font size="4">This code segment reads the data from the files and loads it into cleanly organized DataFrames for later processing. I misunderstood the initial assignment and believed I needed to handle all of the different rating types at once, so I have reused the code I had already written to process all of the data. This project will single out the track ratings and use them, but all of the data will remain in the frames.</font>

In [1]:
import math
import polars as pl

<font size="4">Due to the sheer number of lines, pandas hit its speed limits when processing the file, sometimes taking over half an hour to complete even when aggressively optimized at the cost of legibility. Although it introduces some inconvenience in library compatibility, the significantly faster polars library will be used for as much of the data processing as possible. The DataFrames will be converted to pandas frames for display purposes and for library interop when necessary. This adds overhead to displaying the frames and to sending the data to other libraries, but it speeds up the heaviest parts of the program: parsing and splitting the data.</font>

In [2]:
# Create number parsers that can handle the "None" string in the files
def parse_integer_value(value):
    return None if value is None or value == 'None' else int(value)

def parse_float_value(value):
    return None if value is None or value == 'None' else float(value)

# Album data parser
def load_album_data(file_path):
    # Need to be cautious here - we're loading the entire file at once, could block program if too large
    
    with open(file_path, 'r') as file:
        lines = file.readlines()
    
    # Finding the maximum number of genres in the dataset
    max_genres = max(len(line.strip().split('|')[2:]) for line in lines if '|' in line.strip())
    
    # Fixed columns
    album_ids = []
    artist_ids = []
    genre_lists = []
    
    # Read file line-by-line
    for line in lines:
        
        # Read known columns
        parts = line.strip().split('|')
        album_ids.append(parse_integer_value(parts[0]))
        artist_ids.append(parse_float_value(parts[1]))
        
        # Parse genre parts as floats, and pad with None if there are fewer genres than max_genres
        genre_parts = [parse_float_value(g) for g in parts[2:]]
        genre_parts.extend([None] * (max_genres - len(genre_parts)))
        genre_lists.append(genre_parts)
    
    # Organize data to place into DataFrame
    data = {
        "AlbumID": album_ids,
        "ArtistID": artist_ids
    }
    
    # Variable column width (genres)
    for i in range(max_genres):
        data[f"Genre_{i+1}"] = [genre[i] for genre in genre_lists]
    
    # Define the schema for the DataFrame
    album_schema = {
        "AlbumID": pl.Int64,
        "ArtistID": pl.Float64,
        **{f"Genre_{i+1}": pl.Float64 for i in range(max_genres)}
    }
    
    # Create the DataFrame with the schema
    df = pl.DataFrame(data, schema=album_schema)
    return df

# Artist data parser
def load_artist_data(file_path):
    # Artist data is just a list - no splitting or variable columns needed.
    artist_ids = []
    
    with open(file_path, "r") as file:
        for line in file:
            artist_ids.append(int(line.strip()))
    
    # Place into DataFrame for easier interop later
    df = pl.DataFrame({
        'ArtistID': artist_ids
    })
    return df

# Genre data parser
def load_genre_data(file_path):
    # List, similar to artist data.
    genre_ids = []
    
    # Use a context manager so the file handle is closed once reading finishes
    with open(file_path, 'r') as genre_file:
        for line in genre_file:
            genre_ids.append(int(line.strip()))
    
    # Once again, placed into DataFrame for easier interop
    df = pl.DataFrame({
        'GenreID': genre_ids
    })
    return df

# Test data parser
def load_test_data(file_path):
    
    # Again, reading entire file at once due to variable columns. Must
    # be careful not to exceed system memory or lock CPU thread.
    with open(file_path, 'r') as file:
        lines = file.readlines()
    
    # Fixed columns
    user_ids = []
    track_ids = []
    
    # Handle the unique User|Count format of this file (also seen in training data)
    i = 0
    while i < len(lines):
        user_id, n = map(int, lines[i].strip().split('|'))
        i += 1 
        for _ in range(int(n)):
            track_id = int(lines[i].strip())
            user_ids.append(user_id)
            track_ids.append(track_id)
            i += 1
    
    # DataFrame for interop
    df = pl.DataFrame({
        'UserID': user_ids,
        'TrackID': track_ids
    })
    return df

# Track data parser
def load_track_data(file_path):
    
    # Memory warning; see prior functions
    with open(file_path, 'r') as file:
        lines = file.readlines()
    
    # Finding the maximum number of genres in the dataset
    max_genres = max(len(line.strip().split('|')[3:]) for line in lines if '|' in line.strip())
    
    # Fixed columns
    track_ids = []
    album_ids = []
    artist_ids = []
    genre_lists = []
    
    for line in lines:
        # Read known columns
        parts = line.strip().split('|')
        track_ids.append(parse_integer_value(parts[0]))
        album_ids.append(parse_float_value(parts[1]))
        artist_ids.append(parse_float_value(parts[2]))
        
        # Parse genre parts as floats, and pad with None if there are fewer genres than max_genres
        genre_parts = [parse_float_value(g) for g in parts[3:]]
        genre_parts.extend([None] * (max_genres - len(genre_parts)))
        genre_lists.append(genre_parts)
    
    # Organize data to place into DataFrame
    data = {
        'TrackID': track_ids,
        'AlbumID': album_ids,
        'ArtistID': artist_ids,
    }
    for i in range(max_genres):
        data[f"Genre_{i+1}"] = [genre[i] for genre in genre_lists]
    
    # Define DataFrame schema
    track_data_schema = {
        'TrackID': pl.Int64,
        'AlbumID': pl.Float64,
        'ArtistID': pl.Float64,
        **{f'Genre_{i+1}': pl.Float64 for i in range(max_genres)}
    }
    
    # Create final DataFrame
    df = pl.DataFrame(data, schema=track_data_schema)
    
    return df

# Training data parser
def load_training_data(file_path, artist_data, genre_data, album_data):
    
    # Memory warning; see above.
    with open(file_path, 'r') as file:
        lines = file.readlines()
    
    # The sheer number of lookups makes this function horribly slow when
    # searching DataFrames, even with Polars. Converting to set lookups
    # in order to optimize type categorization for each ID.
    artist_ids = set(artist_data['ArtistID'])
    genre_ids = set(genre_data['GenreID'])
    album_ids = set(album_data['AlbumID'])
    
    # Fixed columns
    user_ids = []
    item_ids = []
    item_types = []
    ratings = []
    
    # Handling the UserID|Count format
    i = 0
    while i < len(lines):
        user_id, n = map(int, lines[i].strip().split('|'))
        i += 1
        for _ in range(int(n)):
            parts = lines[i].strip().split('\t')
            item_id = int(parts[0])
            rating = int(parts[1])
            
            # Determine the item type based on the presence of the ID in different sets
            if item_id in artist_ids:
                item_type = 'Artist'
            elif item_id in genre_ids:
                item_type = 'Genre'
            elif item_id in album_ids:
                item_type = 'Album'
            else:
                item_type = 'Track'
            
            user_ids.append(user_id)
            item_ids.append(item_id)
            item_types.append(item_type)
            ratings.append(rating)
            i += 1 
    
    # Final DataFrame construction
    df = pl.DataFrame({
        'UserID': user_ids,
        'ItemID': item_ids,
        'ItemType': item_types,
        'Rating': ratings
    })
    return df
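
<font size="4">The User|Count layout handled by the test and training parsers can be hard to picture from the loop alone. Below is a minimal sketch of the same walk over a handful of in-memory lines. The function name parse_user_blocks and the sample values are made up for illustration and are not part of the project files; the sketch assumes tab-separated rating lines, as in the training parser above.</font>

```python
def parse_user_blocks(lines):
    """Flatten 'UserID|Count' blocks into (user_id, item_id, rating) tuples."""
    rows = []
    i = 0
    while i < len(lines):
        # Header line: the user ID and the number of rating lines that follow it
        user_id, count = map(int, lines[i].strip().split('|'))
        i += 1
        for _ in range(count):
            # Rating line: item ID and rating separated by a tab
            item_id, rating = lines[i].strip().split('\t')
            rows.append((user_id, int(item_id), int(rating)))
            i += 1
    return rows


# Made-up sample in the same layout as the training file (hypothetical values)
sample_lines = [
    "1|2\n",
    "10\t90\n",
    "11\t80\n",
    "2|1\n",
    "12\t70\n",
]

rows = parse_user_blocks(sample_lines)
print(rows)
```

<font size="4">Each header line tells the loop how many rating lines belong to that user, which is why the index is advanced by hand rather than with a plain for loop over the file.</font>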

<font size="4">Testing File Parsers - Outputs need to be converted to Pandas frames in order to display properly in the Jupyter notebook.</font>

In [3]:
album_data = load_album_data("albumData2.txt")
album_data.to_pandas()

Unnamed: 0,AlbumID,ArtistID,Genre_1,Genre_2,Genre_3,Genre_4,Genre_5,Genre_6,Genre_7,Genre_8,...,Genre_12,Genre_13,Genre_14,Genre_15,Genre_16,Genre_17,Genre_18,Genre_19,Genre_20,Genre_21
0,0,,214765.0,,,,,,,,...,,,,,,,,,,
1,6,228091.0,158282.0,81520.0,242383.0,,,,,,...,,,,,,,,,,
2,19,85028.0,103715.0,,,,,,,,...,,,,,,,,,,
3,30,16832.0,31567.0,,,,,,,,...,,,,,,,,,,
4,33,26330.0,149962.0,209270.0,,,,,,,...,,,,,,,,,,
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
52824,296104,125866.0,158282.0,,,,,,,,...,,,,,,,,,,
52825,296106,221637.0,116130.0,9432.0,,,,,,,...,,,,,,,,,,
52826,296107,,61215.0,,,,,,,,...,,,,,,,,,,
52827,296108,93277.0,61215.0,,,,,,,,...,,,,,,,,,,


In [4]:
artist_data = load_artist_data("artistData2.txt")
artist_data.to_pandas()

Unnamed: 0,ArtistID
0,16
1,23
2,35
3,40
4,49
...,...
18669,295987
18670,296007
18671,296012
18672,296013


In [5]:
genre_data = load_genre_data("genreData2.txt")
genre_data.to_pandas()

Unnamed: 0,GenreID
0,208
1,315
2,642
3,1075
4,1271
...,...
562,292093
563,293670
564,293688
565,294138


In [6]:
test_data = load_test_data("testItem2.txt")
test_data.to_pandas()

Unnamed: 0,UserID,TrackID
0,199810,208019
1,199810,74139
2,199810,9903
3,199810,242681
4,199810,18515
...,...,...
119995,249010,72192
119996,249010,86104
119997,249010,186634
119998,249010,293818


In [7]:
track_data = load_track_data("trackData2.txt")
track_data.to_pandas()

Unnamed: 0,TrackID,AlbumID,ArtistID,Genre_1,Genre_2,Genre_3,Genre_4,Genre_5,Genre_6,Genre_7,...,Genre_12,Genre_13,Genre_14,Genre_15,Genre_16,Genre_17,Genre_18,Genre_19,Genre_20,Genre_21
0,1,106710.0,281667.0,214765.0,162234.0,155788.0,,,,,...,,,,,,,,,,
1,2,280977.0,233685.0,131552.0,173467.0,48505.0,,,,,...,,,,,,,,,,
2,3,38422.0,219136.0,61215.0,201738.0,88853.0,,,,,...,,,,,,,,,,
3,4,119529.0,166863.0,17453.0,35389.0,,,,,,...,,,,,,,,,,
4,5,16742.0,294690.0,61215.0,34486.0,274088.0,,,,,...,,,,,,,,,,
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
224036,296100,166516.0,33011.0,274088.0,199606.0,88853.0,,,,,...,,,,,,,,,,
224037,296101,,,,,,,,,,...,,,,,,,,,,
224038,296102,153644.0,289056.0,158282.0,139095.0,242383.0,,,,,...,,,,,,,,,,
224039,296105,68336.0,6613.0,82064.0,,,,,,,...,,,,,,,,,,


In [8]:
training_data = load_training_data("trainItem2.txt", artist_data, genre_data, album_data)
training_data.to_pandas()

Unnamed: 0,UserID,ItemID,ItemType,Rating
0,199808,248969,Artist,90
1,199808,2663,Artist,90
2,199808,28341,Artist,90
3,199808,42563,Artist,90
4,199808,59092,Artist,90
...,...,...,...,...
12403570,249011,270557,Artist,90
12403571,249011,273574,Artist,90
12403572,249011,286938,Artist,90
12403573,249011,287681,Genre,80


In [9]:
# Get unique User IDs from test set
testing_user_ids = list(set(test_data['UserID']))
testing_user_ids

[199810,
 199812,
 199813,
 199814,
 199815,
 199816,
 199817,
 199818,
 199819,
 199820,
 199821,
 199822,
 199823,
 199824,
 199826,
 199827,
 199830,
 199838,
 199839,
 199844,
 199845,
 199846,
 199847,
 199853,
 199854,
 199855,
 199858,
 199859,
 199861,
 199862,
 199863,
 199866,
 199868,
 199873,
 199876,
 199880,
 199882,
 199884,
 199886,
 199890,
 199891,
 199894,
 199895,
 199896,
 199899,
 199900,
 199902,
 199907,
 199911,
 199912,
 199914,
 199915,
 199918,
 199920,
 199921,
 199924,
 199926,
 199928,
 199931,
 199932,
 199934,
 199936,
 199937,
 199939,
 199941,
 199944,
 199947,
 199949,
 199954,
 199955,
 199958,
 199961,
 199962,
 199963,
 199966,
 199973,
 199974,
 199976,
 199977,
 199981,
 199982,
 199983,
 199984,
 199989,
 199991,
 199996,
 199997,
 200000,
 200008,
 200011,
 200015,
 200017,
 200020,
 200022,
 200024,
 200031,
 200032,
 200033,
 200034,
 200039,
 200042,
 200044,
 200045,
 200046,
 200049,
 200053,
 200054,
 200055,
 200061,
 200062,
 200063,
 

<font size="6">Part 2 - Track Scoring</font>
<font size="4">This code segment scores tracks based on their known ratings. If a track is unrated, it is scored from the user's ratings of its album and artist instead. The name of the game here is optimization: because the dataset is so large, every added comparison results in millions more checks to run before the program completes.</font>
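
<font size="4">As a rough illustration of the scoring rule on plain Python values: a direct track rating is used as-is, otherwise the artist and album ratings each contribute half weight, and the top half of a user's test tracks (rounded up) is labeled 1. The helper names score_track and label_top_half and the sample numbers are hypothetical; this is a sketch of the idea, not the notebook's implementation.</font>

```python
import math


def score_track(track_rating=None, artist_rating=None, album_rating=None):
    """Score one track from whichever ratings the user has given."""
    # A direct rating of the track needs no approximation
    if track_rating is not None:
        return float(track_rating)
    score = 0.0
    # Artist and album ratings each count for half the weight of a full rating
    if artist_rating is not None:
        score += 0.5 * artist_rating
    if album_rating is not None:
        score += 0.5 * album_rating
    return score


def label_top_half(scores):
    """Label the highest-scoring half of the tracks (rounded up) with 1, the rest with 0."""
    n_recommend = math.ceil(len(scores) / 2)
    ranked = sorted(scores, key=scores.get, reverse=True)
    return {track_id: (1 if rank < n_recommend else 0)
            for rank, track_id in enumerate(ranked)}


# Hypothetical track IDs and ratings for a single user
example_scores = {
    101: score_track(artist_rating=90, album_rating=70),
    102: score_track(artist_rating=50),
    103: score_track(),
    104: score_track(track_rating=100),
}
labels = label_top_half(example_scores)
print(labels)
```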

In [10]:
# Initialize final output dictionary
recommendations = {}

# Run for each unique user
for user_id in testing_user_ids:
    # Individual track scores for this user
    user_scores = {}

    # Track IDs to rate
    user_testing_tracks = test_data.filter(pl.col('UserID') == user_id)
    
    # Number of tracks to recommend
    num_user_recommendations = math.ceil(len(user_testing_tracks) / 2)
    
    # Known user reviews for the current user
    user_ratings = training_data.filter(pl.col('UserID') == user_id)
    
    # For loops are highly unoptimized on DataFrames - converting to a native dict is faster here
    for track in user_testing_tracks.to_dicts():
        track_id = track['TrackID']
        
        # If the user has already rated the track, no scoring calculation is needed to approximate
        # their rating - we'll just use the rating they gave. I don't believe our current test set
        # actually includes this scenario, but it's good to handle for future-proofing and real-world use.
        user_track_rating = user_ratings.filter((pl.col('ItemType') == 'Track') & (pl.col('ItemID') == track_id))
        if len(user_track_rating) > 0:
            user_scores[track_id] = user_track_rating['Rating'].to_list()[0]

        # Handle tracks that the user hasn't rated - this is the main part of the program.
        else:
            # Look up the artist and album for the track
            track_info = track_data.filter(pl.col('TrackID') == track_id).to_dicts()[0]
            album_id = track_info['AlbumID']
            artist_id = track_info['ArtistID']

            # Initialize score to 0
            score = 0

            # Check if the user has rated the artist. A missing artist is loaded
            # as None by the parser, never as the string "NaN".
            if artist_id is not None:
                artist_rating = user_ratings.filter((pl.col('ItemType') == 'Artist') & (pl.col('ItemID') == artist_id))
                if len(artist_rating) > 0:
                    # Artist and album scores each count for half the weight of a full rating
                    # This is because we aren't CERTAIN the user will like or dislike the song
                    # from these factors alone, so they should be considered less heavily than
                    # if the user has actually given the track a rating directly.
                    score += artist_rating['Rating'].to_list()[0] * 0.5

            # Check if the user has rated the album. A missing album is loaded
            # as None by the parser, never as the string "NaN".
            if album_id is not None:
                album_rating = user_ratings.filter((pl.col('ItemType') == 'Album') & (pl.col('ItemID') == album_id))
                if len(album_rating) > 0:
                    # Artist and album scores each count for half the weight of a full rating
                    # This is because we aren't CERTAIN the user will like or dislike the song
                    # from these factors alone, so they should be considered less heavily than
                    # if the user has actually given the track a rating directly.
                    score += album_rating['Rating'].to_list()[0] * 0.5

            # Use our computed score to rate the song (our prediction)
            user_scores[track_id] = score

    # Sort the user_scores by their final ratings
    sorted_user_scores = {k: v for k, v in sorted(user_scores.items(), key=lambda item: item[1], reverse=True)}

    # Add the top num_user_recommendations to recommendations
    for i, (track_id, score) in enumerate(sorted_user_scores.items()):
        recommendations[f'{user_id}_{track_id}'] = 1 if i < num_user_recommendations else 0

# Convert recommendations to a DataFrame for writing
recommendations_df = pl.DataFrame({
    "TrackID": list(recommendations.keys()),
    "Predictor": list(recommendations.values())
})

# Write the DataFrame to a CSV file - program completed
recommendations_df.write_csv("predictions.csv")

KeyboardInterrupt: 
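
<font size="4">The cell above was interrupted before it finished. Every filter call scans a whole frame, and the loop issues several of them per test track. One possible way to reduce that work, sketched below under assumptions, is to build plain dictionaries once and then answer each lookup in constant time on average. The function names and sample rows are hypothetical, and the sketch assumes item IDs are unique across tracks, albums, artists, and genres, which the ItemType classification in the training parser also relies on. It has not been timed against the real dataset.</font>

```python
from collections import defaultdict


def build_rating_index(rating_rows):
    """Map user_id -> {item_id: rating} from (user_id, item_id, rating) rows."""
    index = defaultdict(dict)
    for user_id, item_id, rating in rating_rows:
        index[user_id][item_id] = rating
    return index


def build_track_index(track_rows):
    """Map track_id -> (album_id, artist_id) from (track_id, album_id, artist_id) rows."""
    return {track_id: (album_id, artist_id)
            for track_id, album_id, artist_id in track_rows}


def predict(user_id, track_id, rating_index, track_index):
    """Apply the half-weight artist/album rule using dictionary lookups only."""
    user_ratings = rating_index.get(user_id, {})
    # A direct rating of the track is used unchanged
    if track_id in user_ratings:
        return float(user_ratings[track_id])
    album_id, artist_id = track_index.get(track_id, (None, None))
    score = 0.0
    for parent_id in (artist_id, album_id):
        # Missing artists or albums are None and contribute nothing
        if parent_id is not None and parent_id in user_ratings:
            score += 0.5 * user_ratings[parent_id]
    return score


# Hypothetical rows: user 1 rated artist 500, album 600, and track 7
rating_index = build_rating_index([(1, 500, 90), (1, 600, 70), (1, 7, 100)])
track_index = build_track_index([
    (7, 600, 500),
    (8, 600, 500),
    (9, None, 500),
    (10, None, None),
])

predictions = {t: predict(1, t, rating_index, track_index) for t in (7, 8, 9, 10)}
print(predictions)
```

<font size="4">In the notebook, rows like these could come from iterating the existing polars frames once before the per-user loop, so the frames themselves would not need to change.</font>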