# ALS 알고리즘을 사용한 Artist recommendation


In [1]:
// Print the Spark version, the master URL and the user that owns the SparkContext, one per line.
Seq(spark.version, sc.master, sc.sparkUser).foreach(println)


In [2]:
%sh

# Create the local download directory (no-op if it already exists).
mkdir -p /yms060-spark/sungjin/spark/data/audio

# Download the profile data archive into that directory.
wget -O /yms060-spark/sungjin/spark/data/audio/profiledata_06-May-2005.tar.gz https://storage.googleapis.com/aas-data-sets/profiledata_06-May-2005.tar.gz


In [3]:
%sh

# Extract the archive next to it and list the directory.
cd /yms060-spark/sungjin/spark/data/audio
tar xvfz ./profiledata_06-May-2005.tar.gz
ls -alh


In [4]:
%sh

# List the extracted data files.
ls -alh /yms060-spark/sungjin/spark/data/audio/profiledata_06-May-2005


In [5]:
%sh

# Upload the extracted data set to HDFS so the Spark paragraphs can read it.
# All paths are absolute. Relative paths resolve against the shell's working directory
# (hdfs binary, local source) and against the HDFS home /user/<name> (targets), which is
# not where the Spark paragraphs read from (hdfs://yms060-spark:9000/sungjin/data/audio/...).
# NOTE(review): the hadoop3 install is assumed to sit under the same absolute prefix as the
# local data directory used by the download paragraphs. Confirm on the host.

# export HADOOP_USER_NAME=spark
HDFS_BIN=/yms060-spark/sungjin/spark/hadoop3/bin/hdfs
LOCAL_DIR=/yms060-spark/sungjin/spark/data/audio
HDFS_DIR=hdfs://yms060-spark:9000/sungjin/data/audio

"$HDFS_BIN" dfs -mkdir -p "$HDFS_DIR"
"$HDFS_BIN" dfs -put -f "$LOCAL_DIR/profiledata_06-May-2005" "$HDFS_DIR"

"$HDFS_BIN" dfs -ls -h "$HDFS_DIR"
"$HDFS_BIN" dfs -ls -h "$HDFS_DIR/profiledata_06-May-2005"


In [6]:
// Read user_artist_data.txt as raw lines: a Dataset[String] with a single "value" column.
// Use the same fully-qualified HDFS URI as every other read in this notebook. A scheme-less
// path resolves against fs.defaultFS, which need not be the HDFS location the data was
// uploaded to. The count comparison in the next paragraph assumes both reads see one file.
val userArtistDS = spark
    .read
    .textFile("hdfs://yms060-spark:9000/sungjin/data/audio/profiledata_06-May-2005/user_artist_data.txt")  //--spark.read.textFile(path)....

println(userArtistDS.count())
userArtistDS.printSchema()
userArtistDS.show()
z.show(userArtistDS.limit(20))


In [7]:
// Parse the same file as space-separated CSV into (userid, artistid, playcount).
// Without inferSchema all three columns are strings; they are cast to int further down.
val userArtistCSVDF = spark
    .read
    .option("sep", " ")
    .csv("hdfs://yms060-spark:9000/sungjin/data/audio/profiledata_06-May-2005/user_artist_data.txt") 
    .toDF("userid", "artistid", "playcount")

println(userArtistCSVDF.count())
// Raw line count minus parsed row count; non-zero means the two reads disagree.
println(userArtistDS.count() - userArtistCSVDF.count())
userArtistCSVDF.printSchema()
userArtistCSVDF.show()
z.show(userArtistCSVDF.limit(20))


In [8]:
userArtistCSVDF.printSchema


In [9]:
userArtistCSVDF.describe().show()  //--DataFrame.describe(): count, mean, stddev, min, max per column....


In [10]:
userArtistCSVDF.summary().show()  //--DataFrame.summary(): describe() statistics plus 25%/50%/75% quantiles....


In [11]:
// Cast the three string columns to int. Later paragraphs refer to them by their original names.
// Values that do not parse become null when ANSI mode is off.
val userArtistCSVDF2 = userArtistCSVDF
    .selectExpr("cast(userid as int)", "cast(artistid as int)", "cast(playcount as int)")  //--cast(col as type)....

userArtistCSVDF2.printSchema


In [12]:
userArtistCSVDF2.summary().show()  //--DataFrame.summary()....


In [13]:
// Inspect the row(s) with playcount 439771 (presumably the max reported by the summary above).
userArtistCSVDF2
    .where("playcount = 439771")
    .show()


In [14]:
// Assume each song lasts 4 minutes: years of nonstop listening implied by 439771 plays
// (minutes -> hours -> days -> years).
4 * 439771 / 60F/ 24L/ 365F


In [15]:
// Number of 4-minute plays that fit into ~6 months (6 x 30 days) of nonstop listening (= 64800).
// Used below as the upper bound for a plausible playcount.
6 * 30 * 24 * 60 / 4


In [16]:
// Rows above that bound, with the implied listening time in years, largest first.
userArtistCSVDF2
    .where($"playcount" > 64800)
    .withColumn("playyear", round('playcount * 4 / 60F / 24L / 365F, 3))
    .orderBy('playcount.desc)
    .show()


In [17]:
// Drop the implausible rows. The result is cached because several later paragraphs reuse it.
// Rows with a null playcount are dropped too, since the comparison is not true for null.
val userArtistCSVDF3 = userArtistCSVDF2
    .filter("playcount <= 64800")

userArtistCSVDF3.persist(org.apache.spark.storage.StorageLevel.MEMORY_ONLY)
println(userArtistCSVDF3.count())
userArtistCSVDF3.printSchema()
userArtistCSVDF3.show()
z.show(userArtistCSVDF3.limit(20))


In [18]:
// Total raw lines removed so far (parsing differences plus the playcount filter).
println(userArtistDS.count() - userArtistCSVDF3.count())


In [19]:
// Read artist_data.txt as raw lines, for inspection and for the count comparison below.
val artistDS = spark
    .read
    .textFile("hdfs://yms060-spark:9000/sungjin/data/audio/profiledata_06-May-2005/artist_data.txt")

println(artistDS.count())
artistDS.printSchema()
artistDS.show(truncate=false)
z.show(artistDS.limit(20))


In [20]:
// Parse the file as tab-separated (artistid, artistname) with schema inference.
// NOTE(review): the next paragraphs read artistid with row.getString(0), i.e. they assume
// inference produced a string column (e.g. because of malformed ids). Confirm with printSchema.
val artistDF = spark
    .read
    .option("sep", "\t")
    .option("inferSchema", true)  //--inferSchema => true....
    .csv("hdfs://yms060-spark:9000/sungjin/data/audio/profiledata_06-May-2005/artist_data.txt")
    .toDF("artistid", "artistname")
    
println(artistDF.count())
// Raw lines minus parsed rows.
println(artistDS.count() - artistDF.count())
artistDF.printSchema()
artistDF.show(false)
z.show(artistDF.limit(20))


In [21]:
artistDF.summary().show()  //--DataFrame.summary()....


In [22]:
// Show the rows whose artistid text does not parse as an Int.
artistDF.filter(row => {
    val rawId = row.getString(0)
    scala.util.Try(rawId.toInt).isFailure
}).show(false)


In [23]:
// Keep rows whose artistid parses as an Int and whose columns are non-null,
// then turn artistid into an int column.
val artistFinal = artistDF.filter(row => {
    val rawId = row.getString(0)
    scala.util.Try(rawId.toInt).isSuccess
})
.where("artistid is not null")
.where("artistname is not null")
.withColumn("artistid", expr("cast(artistid as int)"))

println(artistFinal.count())
artistFinal.printSchema()
artistFinal.show(false)
z.show(artistFinal.limit(20))


In [24]:
// Summary statistics of the cleaned artist table.
artistFinal.summary().show()


In [25]:
// Sanity check: expected to be empty because null names were filtered out above.
artistFinal.where("artistname is null").show()


In [26]:
// Look up artists whose names are these numeric strings.
artistFinal.where("artistname in ('33', '304', '1988')").show()


In [27]:
// Raw lines starting with these ids (presumably the rows found above), to see the original text.
// LIKE 'id%' is a prefix match, so longer ids with the same prefix also show up.
artistDS.where("value like '1335772%' or value like '1344623%' or value like '2032179%'").show(false)


In [28]:
// Artists whose name is empty or a garbled (mis-encoded) string.
artistFinal.where("artistname in ('', '￿￿￿￿￿￿￿￿￿￿￿￿くȁ')").show()


In [29]:
// Raw lines for two ids, presumably those of the rows above (prefix match, as before).
artistDS.where("value like '1165062%' or value like '10495051%'").show(false)


In [30]:
// Raw lines that did not make it into the cleaned artist table.
println(artistDS.count() - artistFinal.count())


In [31]:
// Read artist_alias.txt as raw lines.
val artistAliasDS = spark
    .read
    .textFile("hdfs://yms060-spark:9000/sungjin/data/audio/profiledata_06-May-2005/artist_alias.txt")
    
println(artistAliasDS.count())
artistAliasDS.printSchema()
artistAliasDS.show()
z.show(artistAliasDS.limit(20))


In [32]:
// Parse the alias file as tab-separated (badid, goodid) pairs.
// Later joins replace a play row's artistid equal to badid with goodid.
val artistAliasDF = spark
    .read
    .option("sep", "\t")
    .option("inferSchema", true) 
    .csv("hdfs://yms060-spark:9000/sungjin/data/audio/profiledata_06-May-2005/artist_alias.txt")
    .toDF("badid", "goodid")
    
println(artistAliasDF.count())
artistAliasDF.printSchema()
artistAliasDF.show()
z.show(artistAliasDF.limit(20))


In [33]:
artistAliasDF.summary().show()


In [34]:
// Alias rows with a missing id on either side.
artistAliasDF.where("badid is null").show(false)
artistAliasDF.filter("goodid is null").show(false)


In [35]:
// Drop incomplete alias pairs and expose the result to %sql paragraphs.
val artistAliasFinal = artistAliasDF
    .filter("badid is not null")
    .filter("goodid is not null")

println(artistAliasFinal.count())
artistAliasFinal.printSchema()
artistAliasFinal.show()
z.show(artistAliasFinal.limit(20))

artistAliasFinal.createOrReplaceTempView("artistAliasFinal")


In [36]:
artistAliasFinal.summary().show(false)


In [37]:
// Raw alias lines dropped by parsing and null filtering.
println(artistAliasDS.count() - artistAliasFinal.count())


In [38]:
// Compare the schemas of the two join inputs (artistid vs badid) before joining them.
userArtistCSVDF3.printSchema()
artistAliasFinal.printSchema()

// Register both tables for the %sql paragraphs below.
userArtistCSVDF3.createOrReplaceTempView("userArtistCSVDF3")
artistAliasFinal.createOrReplaceTempView("artistAliasFinal")


In [39]:
%sql

-- Sample of distinct alias (bad) ids.
select
    distinct(badid)
from 
    artistAliasFinal
limit 20;


In [40]:
//artistAliasFinal.select(distinct($"badid")).show()  
// (org.apache.spark.sql.functions has no distinct(col); use countDistinct or Dataset.distinct.)
// Number of distinct badid values, via the DataFrame API and via SQL.
artistAliasFinal.select(countDistinct($"badid")).show()  

spark.sql("select distinct(badid) from artistAliasFinal").show()  
spark.sql("select count(distinct(badid)) from artistAliasFinal").show() 


In [41]:
// If the row count equals the distinct badid count, each badid maps to at most one goodid.
println(artistAliasFinal.count())
artistAliasFinal.select(expr("count(distinct(badid))")).show()


In [42]:
%sql

-- Preview of play rows left-joined with the alias table on artistid = badid;
-- the aa.* columns are null for rows whose artistid has no alias entry.
select 
    *
from
    userArtistCSVDF3 ua
    left outer join
    artistAliasFinal aa
    on 
    ua.artistid = aa.badid
where
    true
limit 20;


In [43]:
%sql

-- Same join plus the resolved id: goodid when an alias matched, otherwise the original artistid.
select 
    ua.*,
    aa.*,
    case
        when aa.badid is not null then aa.goodid
        else ua.artistid
    end
    as artistid2
from
    userArtistCSVDF3 ua
    left outer join
    artistAliasFinal aa
    on 
    ua.artistid = aa.badid
where
    true
--and aa.badid is not null
limit 20;


In [44]:
%sql

-- Target shape of the alias-resolved table: (userid, resolved artistid, playcount).
select 
    ua.userid,
    case
        when aa.badid is not null then aa.goodid
        else ua.artistid
    end
    as artistid,
    ua.playcount
from
    userArtistCSVDF3 ua
    left outer join
    artistAliasFinal aa
    on 
    ua.artistid = aa.badid
where
    true
limit 20;


In [45]:
// Resolve aliases: a play row whose artistid appears as badid in the alias table gets that
// entry's goodid; all other rows keep their artistid (the left outer join keeps every play row).
val userArtistCSVDF4 = spark.sql("""

select 
    ua.userid,
    case
        when aa.badid is not null then aa.goodid
        else ua.artistid
    end
    as artistid,
    ua.playcount
from
    userArtistCSVDF3 ua
    left outer join
    artistAliasFinal aa
    on 
    ua.artistid = aa.badid
where
    true
--and aa.badid is not null

""")

// The following Scala and %sql paragraphs refer to the alias-resolved data as
// `userArtistFinal`, which was otherwise never defined. Bind it here and register the view.
val userArtistFinal = userArtistCSVDF4
userArtistFinal.createOrReplaceTempView("userArtistFinal")

println(userArtistCSVDF4.count())
userArtistCSVDF4.printSchema()
userArtistCSVDF4.show()
z.show(userArtistCSVDF4.limit(20))


In [46]:
// Row count change caused by the alias join: 0 when every play row matched at most one alias
// entry; a repeated badid would duplicate rows and make this negative.
// NOTE(review): requires `userArtistFinal` (the alias-resolved play data) to be defined earlier.
println(userArtistCSVDF3.count() - userArtistFinal.count())


In [47]:
// Net reduction in distinct artist ids after alias resolution.
println(userArtistCSVDF3.select("artistid").distinct().count() - userArtistFinal.select("artistid").distinct().count())


In [48]:
%sql

-- (userid, artistid) pairs that occur in more than one row after alias resolution,
-- presumably because plays recorded under several alias ids now share one artistid.
-- (The paragraph directive was corrupted into a non-interpreter prefix; it must be just %sql.)
select 
    userid,
    artistid,
    count(playcount) as cnt
from
    userArtistFinal
group by userid, artistid
having
    true
and cnt > 1
order by cnt desc
limit 20;


In [49]:
%sql

-- Drill into one duplicated (user, artist) pair (presumably one listed by the previous paragraph).
select 
    *
from
    userArtistFinal
where
    true
and userid = 2133748
and artistid = 1018110
;


In [50]:
%sql

-- Alias entries that resolve to artist 1018110, i.e. the ids merged into it.
select
    *
from 
    artistAliasFinal
where
    true
and goodid = 1018110
;


In [51]:
%sql

-- Merge duplicated (user, artist) pairs by summing their playcounts; largest totals first.
select 
    userid,
    artistid,
    sum(playcount) as playcount
from
    userArtistFinal
group by userid, artistid
order by playcount desc
limit 20;


In [52]:
// One row per (userid, artistid) with the summed playcount. The 64800 plausibility cap is
// re-applied because summing merged alias rows can push a pair over it.
val userArtistFinal2 = userArtistFinal
    .groupBy("userid", "artistid")
    .agg(sum("playcount").as("playcount"))
    .filter("playcount <= 64800") 

userArtistFinal2.persist(org.apache.spark.storage.StorageLevel.MEMORY_ONLY)    
println(userArtistFinal2.count())
userArtistFinal2.printSchema()
userArtistFinal2.show()
z.show(userArtistFinal2.limit(20))

userArtistFinal2.createOrReplaceTempView("userArtistFinal2")


In [53]:
// Rows removed by merging duplicates and by the playcount cap.
println(userArtistFinal.count() - userArtistFinal2.count())


In [54]:
// Cache the three cleaned tables; each count() forces the cache to be filled.
// (userArtistFinal2 was already persisted above, so its persist here changes nothing.)
userArtistFinal2.persist(org.apache.spark.storage.StorageLevel.MEMORY_ONLY) // userid_artistid_playcount
println(userArtistFinal2.count())

artistFinal.cache()  // artistid_name
println(artistFinal.count())
artistAliasFinal.persist()  // badid_goodid
println(artistAliasFinal.count())


In [55]:
// Release the cached tables.
// NOTE(review): later paragraphs still use userArtistFinal2 and artistFinal, which will then
// be recomputed from source.
userArtistFinal2.unpersist()

artistFinal.unpersist()
artistAliasFinal.unpersist()


In [56]:
%sql

-- Preview the external table that the next paragraph appends to.
-- NOTE(review): `mycatalog` is presumably a configured Cassandra catalog (keyspace naming). Confirm.
select 
    *
from
    mycatalog.mykeyspace.user_artist_data
limit 20;


In [57]:
userArtistFinal2.printSchema

// Append 100000 rows (limit without ordering, so an arbitrary subset) to the external table.
// The model below is trained on what is read back from this table.
// NOTE(review): whether re-running this duplicates rows depends on the table's primary key. Confirm.
userArtistFinal2
    .limit(100000) 
    .writeTo("mycatalog.mykeyspace.user_artist_data")
    .append
    

In [58]:
// Load the training data from the external table and cache it for the ALS paragraphs.
val userArtistFinal3 = spark.table("mycatalog.mykeyspace.user_artist_data")

userArtistFinal3.persist(org.apache.spark.storage.StorageLevel.MEMORY_ONLY) 
println(userArtistFinal3.count())

userArtistFinal3.printSchema
userArtistFinal3.show


In [59]:
import org.apache.spark.ml.recommendation.ALS

// ALS for implicit feedback: playcount is used as a confidence signal, not as an explicit rating.
val als = new ALS()
    .setUserCol("userid")
    .setItemCol("artistid")
    .setRatingCol("playcount")
    .setImplicitPrefs(true)
    .setMaxIter(5)
    .setRegParam(0.1)  // regularization strength
    .setAlpha(40)  // confidence scaling applied to implicit feedback
    .setRank(10) // number of latent factors
    .setColdStartStrategy("drop") // drop NaN predictions for users/items unseen during fit
    .setSeed(11L)


In [60]:
println("\n>>>> als.explainParams()")
println(als.explainParams())

println("\n\n>>>> als.extractParamMap()")
// Left as the paragraph's last expression so the REPL echoes the ParamMap.
als.extractParamMap()


In [61]:
userArtistFinal3.printSchema()

// Fit a single model on the whole table (no train/test split here).
val alsModel = als.fit(userArtistFinal3)


In [62]:
// Users with at least 20 artists, fewest artists first (ties: highest total playcount first);
// presumably used to pick the example users below.
userArtistFinal3
    .groupBy("userid")
    .agg(count("artistid").as("count_artist"), sum("playcount").as("sum_playcount"))
    .where("count_artist >= 20")
    .orderBy($"count_artist".asc, $"sum_playcount".desc)
    .show()


In [63]:
// Example user ids. "userID" matches the model's "userid" column only because Spark column
// resolution is case-insensitive by default.
// 987654321 is presumably an id absent from the data (cold start).
val userDS = Seq(1001440, 2010008, 987654321)
    .toDF("userID")
    .as[Int]  // convert to Dataset[Int]

userDS.printSchema
userDS.show(false)

// top 5 recommendations per user
val recommendedForSomeUsersDF = alsModel.recommendForUserSubset(userDS, 5)
recommendedForSomeUsersDF.printSchema
recommendedForSomeUsersDF.show(false)


In [64]:
val userDS = Seq(1001440, 2010008, 987654321)
    .toDF("userID")
    .as[Int]

userDS.printSchema
userDS.show(false)

//top 5 recommendations per user
val recommendedForSomeUsersDF = alsModel.recommendForUserSubset(userDS, 5)
recommendedForSomeUsersDF.printSchema
recommendedForSomeUsersDF.show(false)

// Flatten to one row per (user, recommended artist) with its rating.
val recommendedForSomeUsersDF2 = recommendedForSomeUsersDF
    .withColumn("recommend", explode($"recommendations"))
    .withColumn("artistid", $"recommend.artistid")
    .withColumn("rating", $"recommend.rating")

recommendedForSomeUsersDF2.printSchema()
recommendedForSomeUsersDF2.show(false)
println(recommendedForSomeUsersDF2.count())

// Drop the nested columns, leaving userid, artistid and rating.
val recommendedForSomeUsersDF3 = recommendedForSomeUsersDF2
    .drop("recommendations", "recommend")

recommendedForSomeUsersDF3.printSchema()
recommendedForSomeUsersDF3.show(false)
println(recommendedForSomeUsersDF3.count())

// Attach artist names. The result has two `artistid` columns (reco.* and art.*);
// joining on Seq("artistid") would keep only one.
val recommendedForSomeUsersDF4 = recommendedForSomeUsersDF3.as("reco")
    .join(artistFinal.as("art"), $"reco.artistid" === $"art.artistid")
    .orderBy($"userid".asc, $"rating".desc)

recommendedForSomeUsersDF4.show(false)
z.show(recommendedForSomeUsersDF4)


In [65]:
// Listening history of two of the example users with artist names, most played first.
val historyForSomeUsersDF = userArtistFinal3
    .where("userid in (1001440, 2010008)").as("history")
    .join(artistFinal.as("art"), $"history.artistid" === $"art.artistid")
    .orderBy($"userid".asc, $"playcount".desc)

historyForSomeUsersDF.show(40, false)
z.show(historyForSomeUsersDF)


In [66]:
// Score every (user, artist) row of the training table itself.
val predictionsDF = alsModel.transform(userArtistFinal3)
predictionsDF.printSchema

predictionsDF
    .orderBy($"prediction".desc)
    .show(false)

predictionsDF
    .orderBy($"prediction".asc)
    .show(false)
    

In [67]:
import org.apache.spark.ml.evaluation.RegressionEvaluator

// RMSE between playcount and prediction.
// NOTE(review): with setImplicitPrefs(true) ALS predicts preference scores rather than playcounts,
// so this RMSE is not a meaningful accuracy measure; a ranking metric would fit better.
val regEval = new RegressionEvaluator()
    .setLabelCol("playcount")
    .setPredictionCol("prediction")
    .setMetricName("rmse")


In [68]:
// Evaluated on the training data, so this is not an estimate of generalization error.
val rmse = regEval.evaluate(predictionsDF)

println(rmse)


In [69]:
import org.apache.spark.ml.{Pipeline, PipelineModel}

// Single-stage pipeline wrapping the ALS estimator, used as the CrossValidator estimator.
val pipeline = new Pipeline()
    .setStages(Array(als))


In [70]:
import org.apache.spark.ml.tuning.ParamGridBuilder

// 2 x 2 x 2 = 8 parameter combinations.
val paramMaps = new ParamGridBuilder()
    .addGrid(als.alpha, Array(40.0, 1.0))
    .addGrid(als.rank, Array(2, 3))
    .addGrid(als.regParam, Array(1.0, 0.01))
    .build()

println(paramMaps.mkString(", "))


In [71]:
import org.apache.spark.ml.tuning.CrossValidator

// 2-fold cross-validation over the grid, scored with regEval (RMSE: lower is better).
val cv = new CrossValidator()
    .setEstimator(pipeline)  
    .setEstimatorParamMaps(paramMaps)  
    .setEvaluator(regEval)  
    .setNumFolds(2) 


In [72]:
// 70/30 train/test split with a fixed seed; both parts are cached and materialized by count.
val Array(trainDS, testDS) = userArtistFinal3.randomSplit(Array(0.7, 0.3), 11L)

println(userArtistFinal3.count)

trainDS.persist(org.apache.spark.storage.StorageLevel.MEMORY_ONLY)
println(trainDS.count)

testDS.persist(org.apache.spark.storage.StorageLevel.MEMORY_ONLY)
println(testDS.count)


In [73]:
import org.apache.spark.ml.tuning.CrossValidatorModel

// Fits every grid combination on every fold, then refits the best one on all of trainDS.
val cvModel = cv.fit(trainDS)


In [74]:
println("\n>>>> Best Model : \n" + cvModel.bestModel)
println("\n>>>> Avg Metrics : \n" + cvModel.avgMetrics.mkString("\n"))
println("\n>>>> Estimator ParamMaps : \n" + cvModel.getEstimatorParamMaps.mkString("\n"))


In [75]:
// Pair each param map with its average CV metric, sorted ascending (lowest RMSE first).
val zippedParamAndMetrics = cvModel.getEstimatorParamMaps
    .zip(cvModel.avgMetrics)
    .sortBy(_._2)

println("\n>>>> Zipped Param And Metrics : \n" + zippedParamAndMetrics.mkString("\n"))


In [76]:
// Score the held-out test split with the best model and compute its RMSE.
val predictionsDF2 = cvModel.transform(testDS)
predictionsDF2.show()

val rmse2 = regEval.evaluate(predictionsDF2)
println(rmse2)


In [77]:
// Save the whole CrossValidatorModel, replacing any previous save at this path.
cvModel
    .write
    .overwrite
    .save("hdfs://yms060-spark:9000/sungjin/model/audio/profiledata_06-May-2005/als_crossvalidator")


In [78]:
import org.apache.spark.ml.recommendation._

println("\n>>>> CrossValidatorModel : \n" + cvModel)
println("\n>>>> CrossValidatorModel.bestModel : \n" + cvModel.bestModel)

// The estimator was a Pipeline, so the best model is a PipelineModel whose only stage is ALS.
val pipelineBestModel = cvModel.bestModel.asInstanceOf[PipelineModel]

println("\n>>>> PipelineModel.stages(0) : \n" + pipelineBestModel.stages(0))

val alsBestModel = pipelineBestModel.stages(0).asInstanceOf[ALSModel]

// Save the bare ALSModel so it can be loaded without the pipeline wrapper.
alsBestModel
    .write
    .overwrite
    .save("hdfs://yms060-spark:9000/sungjin/model/audio/profiledata_06-May-2005/als")


In [79]:
// Reload the saved ALSModel and print its parameters.
val loadedBestALSModel = ALSModel.load("hdfs://yms060-spark:9000/sungjin/model/audio/profiledata_06-May-2005/als")

println("\n>>>> model.extractParamMap : \n" + loadedBestALSModel.extractParamMap)
println("\n>>>> model.explainParams : \n" + loadedBestALSModel.explainParams)


In [80]:
// Same example users as before, now scored with the reloaded best model.
val userDS2 = Seq(1001440, 2010008, 987654321)
    .toDF("userID")
    .as[Int]

userDS2.printSchema
userDS2.show(false)

// Top 5 recommendations per user.
val recommendedForSomeUsersDF2 = loadedBestALSModel.recommendForUserSubset(userDS2, 5)
recommendedForSomeUsersDF2.printSchema
recommendedForSomeUsersDF2.show(false)
z.show(recommendedForSomeUsersDF2)


In [81]:
val userDS2 = Seq(1001440, 2010008, 987654321)
    .toDF("userID")
    .as[Int] 

userDS2.printSchema
userDS2.show(false)

val recommendedForSomeUsersDF2 = loadedBestALSModel.recommendForUserSubset(userDS2, 5)
recommendedForSomeUsersDF2.printSchema
recommendedForSomeUsersDF2.show(false)

// Flatten to one row per (user, recommended artist) with its rating.
val recommendedForSomeUsersDF22 = recommendedForSomeUsersDF2
    .withColumn("recommend", explode($"recommendations"))
    .withColumn("artistid", $"recommend.artistid")
    .withColumn("rating", $"recommend.rating")

recommendedForSomeUsersDF22.printSchema()
recommendedForSomeUsersDF22.show(false)
println(recommendedForSomeUsersDF22.count())

// Drop the nested columns, leaving userid, artistid and rating.
val recommendedForSomeUsersDF33 = recommendedForSomeUsersDF22
    .drop("recommendations", "recommend")

recommendedForSomeUsersDF33.printSchema()
recommendedForSomeUsersDF33.show(false)
println(recommendedForSomeUsersDF33.count())

// Attach artist names (the result carries both reco.artistid and art.artistid).
val recommendedForSomeUsersDF44 = recommendedForSomeUsersDF33.as("reco")
    .join(artistFinal.as("art"), $"reco.artistid" === $"art.artistid")
    .orderBy($"userid".asc, $"rating".desc)

recommendedForSomeUsersDF44.show(false)
z.show(recommendedForSomeUsersDF44)


In [82]:
// Top 5 recommendations for every user: one row per user with an array of (artistid, rating).
val recommendedForAllUsersDF = loadedBestALSModel.recommendForAllUsers(5)

recommendedForAllUsersDF.printSchema
recommendedForAllUsersDF.show(false)
println(recommendedForAllUsersDF.count)


In [83]:
// One row per (user, recommended artist).
val recommendedForAllUsersDF2 = recommendedForAllUsersDF
    .withColumn("recommendation", explode($"recommendations"))
    .withColumn("artistid", $"recommendation.artistid")
    
recommendedForAllUsersDF2.printSchema
recommendedForAllUsersDF2.show(false)
println(recommendedForAllUsersDF2.count)


In [84]:
// One row per user: the recommended artist ids (best first) as an array and as a single
// space-separated string, which is the form stored in Redis further down.
// The ids are read straight out of the `recommendations` array, which recommendForAllUsers
// returns ordered by rating. The previous explode + groupBy + collect_list gave no ordering
// guarantee after the shuffle, so the ranking could come back scrambled; this also avoids
// the shuffle.
val recommendedForAllUsersDF3 = recommendedForAllUsersDF
    .select($"userid", $"recommendations.artistid".as("artistid_list"))
    .withColumn("recommended_artistids", array_join(col("artistid_list"), " "))
    
recommendedForAllUsersDF3.printSchema
recommendedForAllUsersDF3.show(false)
println(recommendedForAllUsersDF3.count)


In [85]:
// Keep only the key column and the space-separated id string for storage.
val recommendedForAllUsersDF4 = recommendedForAllUsersDF3.select($"userid", $"recommended_artistids")
recommendedForAllUsersDF4.printSchema
recommendedForAllUsersDF4.show(false)
println(recommendedForAllUsersDF4.count)


In [86]:
// Write the rows to the Redis table "user_artists", keyed by userid
// (entries are addressed as "user_artists:<userid>", see the key-pattern read below).
recommendedForAllUsersDF4
    .write
    .format("org.apache.spark.sql.redis")
    .option("table", "user_artists")
    .option("key.column", "userid")
    .mode("append")
    .save()


In [87]:
// Read the whole "user_artists" table back from Redis to verify the write.
val recommendedForAllUsersDF5 = spark
    .read
    .format("org.apache.spark.sql.redis")
    .option("table", "user_artists")
    .option("key.column", "userid")
    .load()

recommendedForAllUsersDF5.printSchema
recommendedForAllUsersDF5.show(false)
println(recommendedForAllUsersDF5.count)


In [88]:
// Read back the Redis entry of user 2127894 by its key, inferring the schema.
// Bound to a new name: re-binding `recommendedForAllUsersDF4` would shadow the DataFrame the
// Redis write paragraph saves, so re-running that paragraph afterwards would write this one-row
// read-back instead of the full recommendations.
val userArtistsFromRedisDF = spark
    .read
    .format("org.apache.spark.sql.redis")
    .option("keys.pattern", "user_artists:2127894")
    .option("key.column", "userid")
    .option("infer.schema", true)
    .load()

userArtistsFromRedisDF.printSchema
userArtistsFromRedisDF.show(false)
