In [1]:
import numpy as np

import os

from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    max, avg, sum, count, countDistinct, min,
    percentile_approx, col, asc, desc, collect_list,
    lit, rand, when, pandas_udf, PandasUDFType, expr,
    date_trunc, date_format, to_date, explode, isnan
)

from pyspark.ml.evaluation import RankingEvaluator
from pyspark.sql.types import DoubleType, StructField, StructType, StringType
import pyspark.pandas as ps

spark = SparkSession.builder.master('spark://cs305:22270').getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/05/13 14:27:20 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
def gen_utiltiy_matrix(interactions, tracks):
    interactions.createOrReplaceTempView('interactions')
    tracks.createOrReplaceTempView('tracks')

    listens_per_user_track = spark.sql(
        """
        SELECT user_id,universal_id,sum(num_listens) as num_listens
        FROM interactions
        LEFT JOIN tracks
        ON tracks.recording_msid=interactions.recording_msid
        GROUP BY user_id,universal_id
        """
    )

    listens_per_user = listens_per_user_track.select(
        listens_per_user_track.user_id, listens_per_user_track.num_listens
    ).groupBy('user_id').agg(
        sum(listens_per_user_track.num_listens).alias('total_listens')
    )

    listens_per_user = listens_per_user.withColumn(
        'use_for_fit',
        when(
            listens_per_user.total_listens >= 500, True
        ).otherwise(
            False
        )
    )

    normed_listens_per_user_track = listens_per_user_track.join(listens_per_user, how='left', on='user_id')
    normed_listens_per_user_track = normed_listens_per_user_track.withColumn(
        "prop_listens",
        col("num_listens")/col("total_listens")
    ).select(
        ['user_id', 'universal_id', 'prop_listens', 'use_for_fit']
    ).orderBy(
        col('user_id').asc(),
        col('prop_listens').desc()
    )

    return normed_listens_per_user_track

def gen_utiltiy_matrix_ema(interactions, tracks, window: int = 7):
    interactions.createOrReplaceTempView('interactions')
    tracks.createOrReplaceTempView('tracks')

    listens_per_user_track = spark.sql(
        """
        SELECT user_id,universal_id,sum(num_listens) as num_listens
        FROM interactions
        LEFT JOIN tracks
        ON tracks.recording_msid=interactions.recording_msid
        GROUP BY user_id,universal_id
        """
    )

    listens_per_user = listens_per_user_track.select(
        listens_per_user_track.user_id, listens_per_user_track.num_listens
    ).groupBy('user_id').agg(
        sum(listens_per_user_track.num_listens).alias('total_listens')
    )

    listens_per_user = listens_per_user.withColumn(
        'use_for_fit',
        when(
            listens_per_user.total_listens >= 500, True
        ).otherwise(
            False
        )
    )

    normed_listens_per_user_track = listens_per_user_track.join(listens_per_user, how='left', on='user_id')
    normed_listens_per_user_track = normed_listens_per_user_track.withColumn(
        "prop_listens",
        col("num_listens")/col("total_listens")
    ).select(
        ['user_id', 'universal_id', 'prop_listens', 'use_for_fit']
    ).orderBy(
        col('user_id').asc(),
        col('prop_listens').desc()
    )

    return normed_listens_per_user_track

def calc_baseline_popularity(utility_mat, beta):
    utility_mat = utility_mat.filter(
        utility_mat.use_for_fit
    )

    baseline_popularity = utility_mat.groupBy(
        'universal_id'
    ).agg(
        sum(utility_mat.prop_listens).alias("total"),
        count(utility_mat.prop_listens).alias("num_users"),
    )

    baseline_popularity = baseline_popularity.filter(
        baseline_popularity.num_users >= 250
    )
    
    baseline_popularity = baseline_popularity.withColumn(
        "P_i",
        col("total") / (col("num_users") + beta)
    ).orderBy(
        "P_i",
        ascending=False
    )

    return baseline_popularity.select(['universal_id','P_i'])

def calc_performance_metrics(predicted, actual):
    actual_compressed = actual.groupBy(
        'user_id'
    ).agg(
        collect_list(col('universal_id').astype('double')).alias('universal_id'),
        collect_list(col('prop_listens').astype('double')).alias('prop_listens')
    )
    actual_compressed = actual_compressed.withColumn('key', lit(1))


    predicted_compressed = predicted.limit(5000).agg(
        collect_list(col('universal_id').astype('double')).alias('predicted_universal_id')
    ).withColumn('key', lit(1))

    results = actual_compressed.join(
        predicted_compressed,
        how='left',
        on='key'
    )

    return RankingEvaluator(
        predictionCol='predicted_universal_id',
        labelCol='universal_id',
        metricName='meanAveragePrecisionAtK',
        k=100
    ).evaluate(results)

In [13]:
interactions = spark.read.parquet("/scratch/work/courses/DSGA1004-2021/listenbrainz/interactions_test.parquet")
tracks = spark.read.parquet('tracks_test.parquet') 

interactions.createOrReplaceTempView('interactions')
tracks.createOrReplaceTempView('tracks')

listens_per_user_track = spark.sql(
    """
    SELECT user_id,universal_id,sum(num_listens) as num_listens, CAST (CAST (timestamp AS DATE) AS STRING) AS date, 
        MAX(timestamp) AS max_day, DATEDIFF(day, timestamp, max_day) AS diff
    FROM interactions
    LEFT JOIN tracks
    ON tracks.recording_msid=interactions.recording_msid
    GROUP BY user_id,universal_id, CAST (CAST (timestamp AS DATE) AS STRING)
    """
)

# listens_per_user_track
# listens_per_user_track = listens_per_user_track.na.fill(value=0,subset=["num_listens"])

# schema = (listens_per_user_track.select('*')).schema.add(StructField('num_listens_ema', DoubleType()))

# @pandas_udf(schema, PandasUDFType.GROUPED_MAP)
# def ema(pdf):
#     pdf['num_listens_ema'] = pdf['num_listens'].ewm(span=1.5, min_periods=1).mean()
#     return pdf

# listens_per_user_track = listens_per_user_track.groupby('user_id').apply(ema)
listens_per_user_track.show()

AnalysisException: Column 'max_day' does not exist. Did you mean one of the following? [tracks.num_listens, tracks.universal_id, tracks.recording_msid, interactions.user_id, interactions.timestamp, tracks.__index_level_0__, interactions.recording_msid]; line 3 pos 60;
'Aggregate [user_id#344, universal_id#351L, cast(cast(timestamp#346 as date) as string)], [user_id#344, universal_id#351L, sum(num_listens#352) AS num_listens#358, cast(cast(timestamp#346 as date) as string) AS date#359, max(timestamp#346) AS max_day#360, timestampdiff(day, timestamp#346, 'max_day, Some(America/New_York)) AS diff#361]
+- Join LeftOuter, (recording_msid#350 = recording_msid#345)
   :- SubqueryAlias interactions
   :  +- View (`interactions`, [user_id#344,recording_msid#345,timestamp#346])
   :     +- Relation [user_id#344,recording_msid#345,timestamp#346] parquet
   +- SubqueryAlias tracks
      +- View (`tracks`, [recording_msid#350,universal_id#351L,num_listens#352,__index_level_0__#353L])
         +- Relation [recording_msid#350,universal_id#351L,num_listens#352,__index_level_0__#353L] parquet


In [None]:
all_dates_df.orderBy('date').collect()

In [7]:
# interactions = ps.read_parquet("/scratch/work/courses/DSGA1004-2021/listenbrainz/interactions_test.parquet")
# tracks = spark.read.parquet('tracks_test.parquet')
# tracks = tracks.drop('__index_level_0__')
# tracks = ps.DataFrame(tracks)

# interactions['date'] = interactions['timestamp'].dt.date
# listens_per_day = interactions.merge(tracks, on='recording_msid', how='left') \
#                     .groupby(['user_id', 'universal_id', 'date']) \
#                     .sum() \
#                     .reset_index()
# listens_per_day = listens_per_day.sort_values(by=['date'])

# lisens_per_day['ema_num_listens'] = listens_per_day['num_listens'].ewm(com=1, min_periods=3).mean()



In [4]:
interactions_train = spark.read.parquet('interactions_split_train.parquet')
interactions_val = spark.read.parquet('interactions_split_val.parquet')
interactions_test = spark.read.parquet("/scratch/work/courses/DSGA1004-2021/listenbrainz/interactions_test.parquet")

tracks_train = spark.read.parquet('tracks_train.parquet')
tracks_test = spark.read.parquet('tracks_test.parquet')

                                                                                

In [5]:
utility_mat_train = gen_utiltiy_matrix(interactions_train, tracks_train)
utility_mat_val = gen_utiltiy_matrix(interactions_val, tracks_train)
utility_mat_test = gen_utiltiy_matrix(interactions_test, tracks_test)

In [None]:
candidate_betas = 10 ** np.linspace(0, 8, 150)

betas = []
map_at_100_train = []
map_at_100_val = []
map_at_100_test = []

for beta in candidate_betas:
    print(f"Running Beta = {round(beta, 0)}")

    baseline_pops_train = calc_baseline_popularity(utility_mat_train, beta)
    
    metrics_train = calc_performance_metrics(baseline_pops_train, utility_mat_train)
    metrics_val = calc_performance_metrics(baseline_pops_train, utility_mat_val)
    metrics_test = calc_performance_metrics(baseline_pops_train, utility_mat_test)

    betas.append(beta)
    map_at_100_train.append(metrics_train)
    map_at_100_val.append(metrics_val)
    map_at_100_test.append(metrics_test)
    
    print(f"MAP Train: {round(metrics_train, 5)}, MAP Val: {round(metrics_val, 5)}, MAP Test: {round(metrics_test, 5)}")

Running Beta = 1.0




In [None]:
import pandas as pd

pd.DataFrame({
    'beta': betas,
    'Train MAP': map_at_100_train,
    'Validation MAP': map_at_100_val,
    'Test MAP': map_at_100_test
}).to_csv('baseline_popularity_results.csv')