In [1]:
# Bootstrap the Spark entry points used by every later cell.
import pyspark

from pyspark import SparkContext
# Reuse the running SparkContext if one exists, otherwise create it.
sc = SparkContext.getOrCreate()

from pyspark.sql import SparkSession
# `spark` is the handle used for all DataFrame reads and writes in this notebook.
spark = SparkSession.builder.getOrCreate()

Welcome to the Glue Interactive Sessions Kernel
For more information on available magic commands, please type %help in any new cell.

Please view our Getting Started page to access the most up-to-date information on the Interactive Sessions kernel: https://docs.aws.amazon.com/glue/latest/dg/interactive-sessions.html
Installed kernel version: 1.0.7 
Trying to create a Glue session for the kernel.
Session Type: glueetl
Session ID: 52118ca7-4d12-40b2-ad15-d303279e0911
Applying the following default arguments:
--glue_kernel_version 1.0.7
--enable-glue-datacatalog true
Waiting for session 52118ca7-4d12-40b2-ad15-d303279e0911 to get into ready status...
Session 52118ca7-4d12-40b2-ad15-d303279e0911 has been created.



In [2]:
from pyspark.sql import functions as F




<h2>Step 1: Import data from S3</h2>

In [3]:
# Load the two raw Parquet inputs from the project bucket.
user_df = spark.read.format("parquet").load("s3://music-preference-bucket/user_data/")
top_artists_df = spark.read.format("parquet").load("s3://music-preference-bucket/top_artists/")




<h2>Step 2: Clean the data</h2>

In [4]:
# Remove every user whose country is missing, from both tables.
# 1) Collect the ids of users without a country (taken before user_df is filtered).
drop_ids_df = user_df.where(F.col("country").isNull()).select("user_id")

# 2) Anti-join those ids out of the artist table and discard the `mbid` column.
top_artists_df = (
    top_artists_df
    .join(drop_ids_df, on="user_id", how="left_anti")
    .drop("mbid")
)

# 3) Keep only the users that do have a country.
user_df = user_df.na.drop(subset=["country"])




In [5]:
# Parse the raw playcount into an integer column.
# try_cast yields NULL instead of failing when a value cannot be converted;
# those NULLs are filled in by the following cells.
top_artists_df = top_artists_df.withColumn(
    colName="playcount_cast",
    col=F.expr("try_cast(playcount as int)"),
)




Fill each null playcount with the average of the playcounts ranked directly before and after it (or with the single neighbour that is available).

In [6]:
from pyspark.sql.window import Window

# For every row, look up the playcount of the artist ranked directly before and
# directly after it in the same user's list. The next cell uses these two
# neighbours to fill NULL playcounts.
w = Window.partitionBy("user_id").orderBy("rank")  # one rank-ordered window per user

# lag/lead by one row read exactly the rows at offsets -1 and +1, i.e. the same
# rows a one-row frame with last()/first() selects. A NULL neighbour stays NULL.
# NOTE(review): if the intent was "nearest non-NULL neighbour", an unbounded
# frame with ignorenulls=True would be needed instead — confirm.
top_artists_df = (
    top_artists_df
    .withColumn("prev_value", F.lag("playcount_cast", 1).over(w))
    .withColumn("next_value", F.lead("playcount_cast", 1).over(w))
)




In [7]:
# Resolve the final playcount of each row, in priority order:
#   1. the parsed playcount itself, when present;
#   2. the mean of both neighbours (NULL unless both neighbours are present);
#   3. the previous neighbour, when only that one is present;
#   4. the next neighbour (which may itself be NULL).
# Because the mean is a double, the resulting column is a double as well.
top_artists_df = top_artists_df.withColumn(
    "cleaned_playcount",
    F.coalesce(
        F.col("playcount_cast"),
        (F.col("prev_value") + F.col("next_value")) / 2,
        F.col("prev_value"),
        F.col("next_value"),
    ),
)




In [8]:
# Discard the raw and intermediate playcount columns; cleaned_playcount stays.
top_artists_df = top_artists_df.drop(
    *["playcount", "playcount_cast", "prev_value", "next_value"]
)




In [4]:
# Persist both cleaned tables to S3, replacing any previous output.
user_df.write.mode("overwrite").parquet("s3://music-preference-bucket/cleaned_user_data/")
top_artists_df.write.mode("overwrite").parquet("s3://music-preference-bucket/cleaned_top_artists/")

NameError: name 'user_df' is not defined


<h2>Step 3: create sequence to be fed into word2vec model</h2>

In [9]:
from pyspark.sql import functions as F

# Per-artist weight = round(1 + log10(cleaned_playcount)), stored as an integer.
# The next cell repeats the artist name this many times.
weighted_df = top_artists_df.withColumn(
    "weight",
    F.round(F.log10(F.col("cleaned_playcount")) + 1).cast("int"),
)




In [10]:
# Build an array that holds the artist's name `weight` times.
user_sequence_df = weighted_df.withColumn(
    "artist_weighted",
    F.array_repeat(F.col("artist_name"), F.col("weight")),
)




In [11]:
# Give every element of artist_weighted its own row, in a new `artist` column.
expanded_df = user_sequence_df.withColumn("artist", F.explode(F.col("artist_weighted")))




In [12]:
# Build one artist sequence per user, ordered by rank.
# collect_list() gives no ordering guarantee after the groupBy shuffle, so an
# orderBy() placed before the groupBy is not enough. Instead, collect
# (rank, artist) pairs, sort them inside each group, and keep only the names.
# NOTE(review): the sort follows the stored type of `rank`; if rank is a string
# the order is lexicographic ("10" < "2") — confirm it is numeric.
user_sequences = (
    expanded_df
    .groupBy("user_id")
    .agg(
        F.sort_array(
            F.collect_list(F.struct("rank", "artist"))
        ).alias("ranked_artists")
    )
    .select(
        "user_id",
        F.transform("ranked_artists", lambda pair: pair["artist"]).alias("user_sequence"),
    )
)




In [13]:
# Sanity check: print five of the per-user artist sequences.
user_sequences.show(5)

+-------+--------------------+
|user_id|       user_sequence|
+-------+--------------------+
|     12|[O√∂phoi, O√∂phoi...|
|     18|[The Dillinger Es...|
|     38|[Bondage Fairies,...|
|     67|[A Day to Remembe...|
|     70|[U2, U2, U2, U2, ...|
+-------+--------------------+
only showing top 5 rows


<h2>Step 4: use word2vec to form embedding</h2>

In [14]:
from pyspark.ml.feature import Word2Vec

# Size of every artist embedding. The per-user aggregation in Step 6 folds the
# vectors onto a zero vector of length 128, so keep the two values in sync.
dims = 128

# Word2Vec estimator over the per-user artist sequences: each sequence is one
# "sentence" and each artist name one "word".
w2v_model = Word2Vec(
    vectorSize=dims,
    windowSize=5,
    minCount=2,        # ignore artists that occur fewer than 2 times in total
    numPartitions=4,
    seed=42,           # fixed seed so repeated fits start from the same state
    inputCol="user_sequence",
    outputCol="user_embedding",
)




In [15]:
# Train on the per-user sequences. This rebinds `w2v_model` from the estimator
# to the value returned by fit(), so re-run the previous cell before re-running
# this one.
w2v_model = w2v_model.fit(user_sequences)




<h2>Step 5: Store artist embeddings in S3</h2>

In [35]:
import boto3  # not referenced in this cell

# Export the learned per-artist vectors to S3, replacing any previous export.
w2v_model.getVectors().write.mode("overwrite").parquet("s3://music-preference-bucket/artist_embeddings/")




In [41]:
# Inspect the column names of the artist-vector DataFrame.
w2v_model.getVectors().columns

['word', 'vector']


In [None]:
# In-memory handle to the artist vectors. The first cell of Step 6 rebinds this
# name to the copy read back from S3.
artist_embeddings_df = w2v_model.getVectors()

<h2>Step 6: create embedding for each user</h2>

In [18]:
# Reload the persisted artist vectors and cleaned tables from S3.
artist_embeddings_df = spark.read.format("parquet").load("s3://music-preference-bucket/artist_embeddings/")
user_df = spark.read.format("parquet").load("s3://music-preference-bucket/cleaned_user_data/")
top_artists_df = spark.read.format("parquet").load("s3://music-preference-bucket/cleaned_top_artists/")




In [19]:
# Use 1000 partitions for shuffles (joins and aggregations) from here on.
spark.conf.set("spark.sql.shuffle.partitions", "1000")
# Enable adaptive query execution.
spark.conf.set("spark.sql.adaptive.enabled", "true")




In [20]:
# vector_to_array is not imported by any other cell; import it here so this
# cell runs on a fresh kernel.
from pyspark.ml.functions import vector_to_array

# Rename `word` to `artist_name` and convert the ML vector column into a plain
# array column (`emb`) so the SQL array functions below can operate on it.
artist_embeddings_df = artist_embeddings_df.select(
    F.col("word").alias("artist_name"),
    vector_to_array("vector").alias("emb"),
)




In [21]:
# Attach each artist's embedding and a log-scaled playcount weight to the
# user/artist rows. The inner join drops artists that have no embedding.
user_artist = (
    top_artists_df
    .join(artist_embeddings_df, on="artist_name", how="inner")
    .select(
        "user_id",
        "rank",
        F.log10(F.col("cleaned_playcount") + 1).alias("weight"),
        "emb",
    )
)




In [22]:
# Repartition by user to avoid spill during the per-user aggregation.
user_artist = user_artist.repartition(1000, "user_id")

# Per user, compute:
#   weight_sum - the sum of the artist weights
#   sum_vec    - the element-wise sum of (weight * embedding) over the artists
# sum_vec is built by collecting the scaled embeddings into a list and folding
# them, one by one, onto a zero vector of length 128.
user_scaled_embeddings = user_artist.groupBy("user_id").agg(
    F.sum("weight").alias("weight_sum"),
    F.aggregate(
        F.collect_list(
            F.transform("emb", lambda component: component * F.col("weight"))
        ),
        F.array_repeat(F.lit(0.0), 128),
        lambda running_sum, scaled: F.transform(
            running_sum,
            lambda total, idx: total + scaled[idx],
        ),
    ).alias("sum_vec"),
)




In [23]:
# Weighted mean: divide every component of the weighted sum by the total weight.
user_embeddings_df = user_scaled_embeddings.select(
    F.col("user_id"),
    F.transform(
        F.col("sum_vec"),
        lambda component: component / F.col("weight_sum"),
    ).alias("user_embedding"),
)




In [24]:
# Attach the user attributes to each embedding (users missing from user_df are dropped).
user_embeddings_df = user_embeddings_df.join(user_df, on="user_id", how="inner")

# Persist the result to S3 in 500 partitions keyed by user, replacing any previous output.
user_embeddings_df.repartition(500, "user_id").write.parquet(
    "s3://music-preference-bucket/user_embeddings/",
    mode="overwrite",
)




In [25]:
# Sanity check: print five rows of the joined user embeddings.
user_embeddings_df.show(5)

+-------+--------------------+------------------+---------------+
|user_id|      user_embedding|           country|total_scrobbles|
+-------+--------------------+------------------+---------------+
|    148|[0.03456595002237...|     United States|            562|
|    496|[-0.0449543348938...|Russian Federation|         143124|
|    833|[-0.1019500409192...|     United States|         104176|
|   1088|[-0.1010368947044...|     United States|          45845|
|   1342|[-0.0139159571897...|            Sweden|           9363|
+-------+--------------------+------------------+---------------+
only showing top 5 rows


In [49]:
# Display one complete row, including the full embedding array.
user_embeddings_df.first()

Row(user_id=12, user_embedding=[-0.05067372235991417, 0.1007816418829873, 0.08870364404594912, -0.006708111050205171, -0.03327242270202116, -0.019926956026419685, 0.009964543948663415, -0.006480001971371798, -0.03787271198840715, 0.10043763767232697, -0.008331445356425514, -0.09697475059963026, -0.04078526977245786, 0.004657762098244807, 0.02793102165691815, -0.05218308891780549, 0.014877170603089783, -0.07451262988606067, -0.01737810529402924, 0.0365440879316109, -0.0782646253015479, 0.042127161199509634, 0.06919779272842522, 0.037412218520038974, -0.016570137145054908, 0.04536218840300717, -0.0782227337125071, -0.0033580802221653445, -0.05680549727642077, -0.02785061593556387, 0.008981319812882587, 0.04215886901176896, -0.08552669300155082, -0.04877689186301139, -0.04878971818160236, -0.060486754719238055, -0.007742160710013353, -0.08728634633140232, 0.06062000669702991, 0.05673156396420344, -0.12089280510471297, -0.030363310646490473, 0.10181451185427406, 0.03816320530519724, -0.023

<h2>Step 7: clean up the dataset</h2>

In [3]:
# Reload the persisted user embeddings from S3.
user_embeddings_df = spark.read.format("parquet").load("s3://music-preference-bucket/user_embeddings/")




In [4]:
# Separate the embedding array into one scalar column per dimension
# (dim_0, dim_1, ...), replacing the `user_embedding` array column.

# Infer the embedding size from one row; fail clearly if there is no data.
first_row = user_embeddings_df.first()
if first_row is None:
    raise ValueError("user_embeddings_df is empty; cannot infer the embedding size")
n_dim = len(first_row.user_embedding)

# Add all dimension columns in a single select(). Calling withColumn() once per
# dimension stacks one projection per call and produces a very large plan.
# Column order matches the loop version: the existing columns (minus
# user_embedding) followed by dim_0 .. dim_{n_dim-1}.
user_embeddings_df = user_embeddings_df.select(
    *[name for name in user_embeddings_df.columns if name != "user_embedding"],
    *[F.col("user_embedding")[i].alias(f"dim_{i}") for i in range(n_dim)],
)




In [5]:
# Keep only users with a known scrobble total, then replace the total with its
# natural logarithm.
# NOTE(review): a total of 0 has no finite logarithm — confirm zeros cannot
# occur here, or switch to log1p.
user_embeddings_df = (
    user_embeddings_df
    .filter(F.col("total_scrobbles").isNotNull())
    .withColumn("log_scaled_scrobbles", F.log("total_scrobbles"))
    .drop("total_scrobbles")
)




In [6]:
# Keep only rows with a usable country label: not NULL and, after trimming and
# lower-casing, not one of the placeholder strings "none", "null" or "".
user_embeddings_df = (
    user_embeddings_df
    .filter(F.col("country").isNotNull())
    .filter(~F.lower(F.trim(F.col("country"))).isin("none", "null", ""))
)




In [7]:
# Remove countries with 50 or fewer users (the threshold is inclusive).
rare_countries = (
    user_embeddings_df
    .groupBy("country")
    .count()
    .filter(F.col("count") <= 50)
    .select("country")
)
user_embeddings_df = user_embeddings_df.join(rare_countries, on="country", how="left_anti")




In [8]:
# Show the five remaining countries with the fewest users, to confirm the
# rare-country filter took effect.
user_embeddings_df.select("country").groupBy("country").count().orderBy(F.col("count")).show(5)

+--------------------+-----+
|             country|count|
+--------------------+-----+
|               Niger|   53|
|           Nicaragua|   53|
|            Mongolia|   54|
|Syrian Arab Republic|   54|
|Virgin Islands, B...|   56|
+--------------------+-----+
only showing top 5 rows


<h2>Step 8: divide train validation test datasets</h2>

In [21]:
# Assign every user to one of 100 buckets (0-99) derived from a hash of user_id.
# pmod keeps the result non-negative even when the hash is negative. The bucket
# depends only on user_id, so the split below does not rely on random sampling.
user_embeddings_df = user_embeddings_df.withColumn(
    "bucket",
    F.expr("pmod(hash(user_id), 100)")
)




In [22]:
# Split on the bucket: [0, 80) -> train, [80, 90) -> validation, [90, 100) -> test.
# The helper column is dropped from every split.
train_df = user_embeddings_df.where(F.col("bucket") < 80).drop("bucket")
val_df = (
    user_embeddings_df
    .where(F.col("bucket") >= 80)
    .where(F.col("bucket") < 90)
    .drop("bucket")
)
test_df = user_embeddings_df.where(F.col("bucket") >= 90).drop("bucket")




In [28]:
# Check that every country is represented in each split by comparing the number
# of distinct countries per DataFrame.
for label, frame in (
    ("total countries: ", user_embeddings_df),
    ("training dataset: ", train_df),
    ("test dataset: ", test_df),
    ("validation dataset: ", val_df),
):
    print(label, frame.select("country").distinct().count())

total countries:  119
training dataset:  119
test dataset:  119
validation dataset:  119


In [29]:
# Check the number of rows in each split.
# Each count is computed once and reused for both the report and the check.
total_rows = user_embeddings_df.count()
train_rows = train_df.count()
test_rows = test_df.count()
val_rows = val_df.count()

# The first label used to read "total countries", which mislabelled a row count.
print("total rows: ", total_rows)
print("training dataset: ", train_rows)
print("test dataset: ", test_rows)
print("validation dataset: ", val_rows)

# The three splits must partition the full dataset: no row lost or duplicated.
assert train_rows + val_rows + test_rows == total_rows, (
    "train/validation/test splits do not add up to the full dataset"
)

total countries:  319554
training dataset:  255437
test dataset:  31994
validation dataset:  32123


In [None]:
# Persist the three splits to S3, replacing any previous output.
train_df.write.parquet("s3://music-preference-bucket/train/", mode="overwrite")
val_df.write.parquet("s3://music-preference-bucket/validation/", mode="overwrite")
test_df.write.parquet("s3://music-preference-bucket/test/", mode="overwrite")