In [96]:
import os

from pyspark.sql import Row
import pyspark.sql.types as T
from pyspark.sql.functions import udf

# Root directory of the Audioscrobbler "profiledata_06-May-2005" dump.
# Set the AUDIOSCROBBLER_DATA_PATH environment variable to point elsewhere
# instead of editing this cell; the original local path is the fallback.
DATA_PATH = os.environ.get(
    "AUDIOSCROBBLER_DATA_PATH",
    "/Users/charilaostsarouchas/tsarouch_github/data/recommenders/audioscrobbler_data/profiledata_06-May-2005/",
)
# Raw ratings, one space-separated "user item rate" record per line.
USER_RAW_RATINGS = os.path.join(DATA_PATH, 'user_artist_data_small.txt')
# Tab-separated "alias_id<TAB>canonical_id" pairs for misspelled/variant artist IDs.
CANONICAL_IDS_MAP = os.path.join(DATA_PATH, 'artist_alias.txt')
# Artist metadata file (presumably ID -> name; not read in this section).
ARTIST_NAMES_MAP = os.path.join(DATA_PATH, 'artist_data.txt')
# Output CSV written by save_ratings().
USER_RATINGS = os.path.join(DATA_PATH, 'user_ratings.txt')

In [111]:
def _parse_rating_line(line):
    """Split one raw ratings line into a (user, item, rate) tuple of strings.

    Splitting on any whitespace also tolerates trailing blanks and carriage
    returns. Returns None for malformed lines (not exactly three fields) so
    they can be filtered out instead of failing the Spark job.
    """
    fields = line.split()
    if len(fields) != 3:
        return None
    return tuple(fields)


def _parse_alias_line(line):
    """Parse one artist-alias line into an (alias_id, canonical_id) pair.

    Returns None unless the line has exactly two tab-separated fields and both
    are non-empty after stripping; an empty alias could never match an item.
    """
    fields = [field.strip() for field in line.split('\t')]
    if len(fields) != 2 or not all(fields):
        return None
    return fields[0], fields[1]


def get_ratings_df():
    """Prepare the ratings DataFrame in its final format.

    Reads USER_RAW_RATINGS, drops malformed lines, and replaces every item ID
    that appears as an alias in CANONICAL_IDS_MAP with its canonical artist ID.
    Items without an alias entry are kept unchanged.

    Returns
    -------
    pyspark.sql.DataFrame
        Columns ``user``, ``item``, ``rate`` (in that order), all strings as
        read from the text file.

    Statistics recorded for user_artist_data_small.txt in an earlier run
    (with the previous implementation, so the last figure may differ):
        Number of ratings:  100000
        Number of distinct users:  408
        Number of distinct artists:  37852
        Number of distinct artists (after applying canonical ids):  37605
    """
    # Explicit schema: fixes the column order across Spark versions and avoids
    # schema inference, which fails on an empty input.
    ratings_schema = T.StructType(
        [T.StructField(name, T.StringType(), True) for name in ('user', 'item', 'rate')]
    )
    ratings_df = (
        sc.textFile(USER_RAW_RATINGS)
        .map(_parse_rating_line)
        .filter(lambda row: row is not None)
        .toDF(ratings_schema)
    )

    # Maps artist IDs that are known misspellings or variants to the canonical
    # ID of the artist. Plain (key, value) tuples are used on purpose: calling
    # collectAsMap() on Row(id=..., canonical_id=...) depends on Row's field
    # order, which was alphabetical before Spark 3.0 and inverted the mapping.
    canonical_ids_map = (
        sc.textFile(CANONICAL_IDS_MAP)
        .map(_parse_alias_line)
        .filter(lambda pair: pair is not None)
        .collectAsMap()
    )
    canonical_ids_map_br = sc.broadcast(canonical_ids_map)

    to_canonical = udf(lambda item: canonical_ids_map_br.value.get(item, item),
                       T.StringType())
    return ratings_df.withColumn('item', to_canonical(ratings_df['item']))


def save_ratings(df, path=None):
    """Save the ratings DataFrame as a CSV file ready for the recommendation engine.

    The Spark DataFrame is collected to the driver with ``toPandas()``, so it
    must fit in driver memory. The pandas index is not written.

    Parameters
    ----------
    df : pyspark.sql.DataFrame
        Ratings to save (e.g. the result of ``get_ratings_df()``).
    path : str, optional
        Destination file. Defaults to ``USER_RATINGS``. The default is looked
        up at call time, so editing the config cell takes effect without
        redefining this function.
    """
    if path is None:
        path = USER_RATINGS
    df.toPandas().to_csv(path, index=False)

In [109]:
# Build the canonicalised ratings and persist them for the recommender.
ratings_df = get_ratings_df()
save_ratings(df=ratings_df)