In [None]:
# Import Library

from cassandra.cluster import Cluster
from cassandra.concurrent import execute_concurrent_with_args
from pyspark import SparkConf, StorageLevel
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS, ALSModel
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import IntegerType

In [None]:
# Spark session 생성

conf = SparkConf()
conf.set('spark.app.name', 'recomm_music')
conf.set("spark.jars.packages", 'com.redislabs:spark-redis_2.12:3.1.0')
conf.set("spark.redis.connection.host", "127.0.0.1")
conf.set("spark.redis.connection.port", "6379")
spark = SparkSession.builder.config(conf=conf).getOrCreate()

%load_ext sparksql_magic
%config SparkSql.limit= 1000

In [None]:
# Spark 버전 및 정보 확인
print(spark.version)
print(spark.sparkContext.master)
print(spark.sparkContext.sparkUser())
print(f'hadoop version = {spark._jvm.org.apache.hadoop.util.VersionInfo.getVersion()}')

### 1. 데이터 수집
- 다운로드 및 HDFS 업로드
- ```https://storage.googleapis.com/aas-data-sets/profiledata_06-May-2005.tar.gz```

In [None]:
!mkdir -p "/Users/yooinsun/data/audio"
!wget -O /Users/yooinsun/data/audio/profiledata_06-May-2005.tar.gz https://storage.googleapis.com/aas-data-sets/profiledata_06-May-2005.tar.gz

In [None]:
!tar xvfz ../data/audio/profiledata_06-May-2005.tar.gz -C ../data/audio
!ls -alh ../data/audio/profiledata_06-May-2005

In [None]:
#export HADOOP_USER_NAME=spark
!../hadoop-3.3.6/bin/hdfs dfs -mkdir -p /data/audio
!../hadoop-3.3.6/bin/hdfs dfs -put -f /data/audio/profiledata_06-May-2005 /data/audio

!../hadoop-3.3.6/bin/hdfs dfs -ls -h /data/audio
!../hadoop-3.3.6/bin/hdfs dfs -ls -h /data/audio/profiledata_06-May-2005

In [None]:
!../hadoop-3.3.6/bin/hdfs dfs -cat /data/audio/profiledata_06-May-2005/README.txt

In [None]:
!../hadoop-3.3.6/bin/hdfs dfs -cat /data/audio/profiledata_06-May-2005/user_artist_data.txt

In [None]:
!../hadoop-3.3.6/bin/hdfs dfs -cat /data/audio/profiledata_06-May-2005/artist_data.txt

In [None]:
!../hadoop-3.3.6/bin/hdfs dfs -cat /data/audio/profiledata_06-May-2005/artist_alias.txt

### 2. 데이터 탐색, 데이터 정렬


```
"Music Listening Dataset" (Audioscrobbler.com => https://last.fm/) 
-> 6 May 2005, for around 150,000 real people
-> Audioscrobbler is receiving around 2 million song submissions per day
(https://ko.wikipedia.org/wiki/%EB%9D%BC%EC%8A%A4%ED%8A%B8_FM, https://namu.wiki/w/Last.fm)
''''
scrobble	미국·영국 [|skrɒbəl]  [VERB] (of an online music service) to record a listener's musical preferences and recommend similar music that he or she might enjoy
```

- user_artist_data.txt
    - 3 columns: userid artistid playcount => 공백(' ')으로 구분됨
    
    - 형식 : ```유저(공백)아티스트(공백)플레이카운트```
        - ex) 1052430 2032445 12 =>	2032445	신화 (Shinhwa)
     
- artist_data.txt
    - 2 columns: artistid artist_name	=> 탭('\t')으로 구분됨
    
    - 형식 :  ```아티스트(탭)이름```
        - ex) 한글 아티스트 이름 (다나, 클래지콰이, 윤건, 엠씨더맥스, 음악가 없음, 유미, 김성필, 엠 투 엠, 신부수업 OST, 데이슬리퍼, 신화 (Shinhwa), 베이비 복스, 윤도현)
        - ex) 2032445	신화 (Shinhwa)
    
- artist_alias.txt
    - 2 columns: badid goodid	=> 탭('\t')으로 구분됨
    - known incorrectly spelt artists and the correct artist id. you can correct errors in user_artist_data as you read it in using this file

    - 형식 : ```배드아이디(탭)굿아이디```    
        - ex) 2032445	6834637
        - ex) 2032445	신화 (Shinhwa) : 6834637	신화
        - ex) 10103564	베이비 복스 : 1101679	Baby V.O.X
        - ex) 2101369	Baby Vox 3 : 1101679	Baby V.O.X
        - ex) 1028894	Baby Vox : 1101679	Baby V.O.X

user_artist_data.txt 정보 확인
- text method 로 txt 읽기
- csv method 로 txt 읽기
- describe(), summary() 를 통해, 데이터 살펴보기

In [None]:
# Read user_artist_data.txt line by line (one string column per raw line).
userArtistDS = (
    spark.read
    .format("text")
    .load("hdfs://localhost:9000/data/audio/profiledata_06-May-2005/user_artist_data.txt")
)

print(userArtistDS.count())
userArtistDS.printSchema()
userArtistDS.show()

# #scala
# val userArtistDS = spark
#     .read
#     .textFile("hdfs://spark-master-01:9000/data/audio/profiledata_06-May-2005/user_artist_data.txt")  //--spark.read.textFile(path)
# 
# println(userArtistDS.count())
# userArtistDS.printSchema()
# userArtistDS.show()
# z.show(userArtistDS.limit(20))

In [None]:
# Parse the same file as space-separated CSV and name its three columns.
userArtistCSVDF = (
    spark.read
    .option("sep", " ")
    .csv("hdfs://localhost:9000/data/audio/profiledata_06-May-2005/user_artist_data.txt")
    .toDF("userid", "artistid", "playcount")
)

csv_row_count = userArtistCSVDF.count()
print(csv_row_count)
# Lines lost between the raw text read and the CSV parse.
print(userArtistDS.count() - csv_row_count)
userArtistCSVDF.printSchema()
userArtistCSVDF.show()

# # scalar
# val userArtistCSVDF = spark
#     .read
#     .option("sep", " ")
#     .csv("hdfs://localhost:9000/data/audio/profiledata_06-May-2005/user_artist_data.txt")  //--spark.read.csv(path)....
#     .toDF("userid", "artistid", "playcount")
# 
# println(userArtistCSVDF.count())
# println(userArtistDS.count() - userArtistCSVDF.count())
# userArtistCSVDF.printSchema()
# userArtistCSVDF.show()
# z.show(userArtistCSVDF.limit(20))


In [None]:
userArtistCSVDF.describe().show()  #--DataFrame.describe()....

In [None]:
userArtistCSVDF.summary().show()  #-DataFrame.summary()....

통계 정보를 확인했을 때, 이상 데이터 발견
- 확인 방법: describe() 와 summary() 를 활용하여 데이터 통계 확인
- 이상 데이터: min, max, 25%, 50%, 75% 수치가 비정상적임을 확인함
- 조치 방안: 각 column type 을 string -> int 로 변경

In [None]:
# Cast all three columns from string to int so that the statistics become numeric.
userArtistCSVDF2 = userArtistCSVDF.select(
    *(F.col(name).cast("int").alias(name) for name in ("userid", "artistid", "playcount"))
)
userArtistCSVDF2.printSchema()

# scala
# val userArtistCSVDF2 = userArtistCSVDF
#     .selectExpr("cast(userid as int)", "cast(artistid as int)", "cast(playcount as int)")  //--cast(col as type)....
# 
# userArtistCSVDF2.printSchema

In [None]:
# type(str->int) 변경 후, 통계정보 재확인
userArtistCSVDF2.summary().show()


In [None]:
userArtistCSVDF2.where("playcount = 439771").show()

In [None]:
'''
[ playcount = 439771의 의미? ]

=> 음악 한곡당 4분이라 가정할 경우, 쉬지않고 연속으로 약 3.34년 동안 음악을 들은 것
'''

print((4 * 439771) /60/ 24/ 365)


In [None]:
'''
[ 해당 프로젝트에서 max palycount 값 정하는 기준 ]

- 기준은 6개월이 최대치라고 설정함 + 음악 한곡당 4분이라 가정함
- max plyacount 값은 64800 으로 설정함
'''

print((60 * 24 * 30 * 6) / 4)

새로운 Column 추가하기

In [None]:
# Rows above the 64800 playcount cap, with the listening time in years (4 minutes per song).
(
    userArtistCSVDF2
    .where(F.col("playcount") > 64800)
    .withColumn("playyear", F.round(F.col("playcount") * 4 / 60 / 24 / 365, 3))
    .orderBy(F.col("playcount").desc())
    .show()
)

# # scala
# userArtistCSVDF2
#     .where($"playcount" > 64800)
#     .withColumn("playyear", round('playcount * 4 / 60F / 24L / 365F, 3))
#     .orderBy('playcount.desc)
#     .show()

In [None]:
# Keep only rows within the 64800 playcount cap and pin them in memory for reuse.
userArtistCSVDF3 = userArtistCSVDF2.where("playcount <= 64800")
userArtistCSVDF3.persist(StorageLevel.MEMORY_ONLY)
print(userArtistCSVDF3.count())  # first action, which also fills the cache
userArtistCSVDF3.printSchema()
userArtistCSVDF3.show()

#scala
# val userArtistCSVDF3 = userArtistCSVDF2
#     .filter("playcount <= 64800")
# 
# userArtistCSVDF3.persist(org.apache.spark.storage.StorageLevel.MEMORY_ONLY)
# println(userArtistCSVDF3.count())
# userArtistCSVDF3.printSchema()
# userArtistCSVDF3.show()
# z.show(userArtistCSVDF3.limit(20))


In [None]:
userArtistCSVDF3.storageLevel

In [None]:
print(userArtistDS.count() - userArtistCSVDF3.count())

artist_data.txt (아티스트_이름)
- text method 로 txt 읽기
- csv method 로 txt 읽기
- describe(), summary() 를 통해, 데이터 살펴보기


In [None]:
# Raw lines of artist_data.txt (cluster alternative: hdfs://spark-master-01:9000/data/audio/profiledata_06-May-2005/artist_data.txt).
artistDS = (
    spark.read
    .format("text")
    .load("hdfs://localhost:9000/data/audio/profiledata_06-May-2005/artist_data.txt")
)

print(artistDS.count())
artistDS.printSchema()
artistDS.show(truncate=False)

# scala
# val artistDS = spark
#     .read
#     .textFile("hdfs://spark-master-01:9000/data/audio/profiledata_06-May-2005/artist_data.txt")
# 
# println(artistDS.count())
# artistDS.printSchema()
# artistDS.show(truncate=false)
# z.show(artistDS.limit(20))


In [None]:
# Parse artist_data.txt as tab-separated CSV; inferSchema lets Spark choose the column types.
artistDF = (
    spark.read
    .option("sep", "\t")
    .option("inferSchema", True)
    .csv("hdfs://localhost:9000/data/audio/profiledata_06-May-2005/artist_data.txt")
    .toDF("artistid", "artistname")
)

artist_row_count = artistDF.count()
print(artist_row_count)
print(artistDS.count() - artist_row_count)
artistDF.printSchema()
artistDF.show()

# scala
# val artistDF = spark
#     .read
#     .option("sep", "\t")
#     .option("inferSchema", true)  //--inferSchema => true....
#     .csv("hdfs://localhost:9000/data/audio/profiledata_06-May-2005/artist_data.txt")
#     .toDF("artistid", "artistname")
#     
# println(artistDF.count())
# println(artistDS.count() - artistDF.count())
# artistDF.printSchema()
# artistDF.show(false)
# z.show(artistDF.limit(20))

In [None]:
artistDF.summary().show(20)  # count / mean / stddev / min / percentiles / max per column

오류 데이터(artistid toInt 타입변환이 안되는 경우) 확인

In [None]:
@F.udf(returnType=IntegerType())
def str_to_int(s):
    """Parse ``s`` as an int, returning None (SQL NULL) when it cannot be parsed.

    Declared with IntegerType (F.udf defaults to StringType), so the result column is a real int.
    Values that do not fit in a 32-bit int therefore come back as NULL as well, instead of
    surviving as a non-null string and only turning into NULL at a later cast.
    """
    try:
        return int(s)
    except (TypeError, ValueError):  # None input or non-numeric text
        return None


# Rows whose artistid cannot be converted to int.
badArtistIdDF = (
    artistDF
    .withColumn("artistid_toInt", str_to_int(artistDF.artistid))
    .where("artistid_toInt is null")
)
badArtistIdDF.show()
badArtistIdDF.summary().show()

# scala
# artistDF.filter(row => {
#     val artistid = row.getString(0)
#     val artistname = row.getString(1)
#     try {
#         artistid.toInt
#         false
#     } catch {
#         case e:Exception => true  //--int로 형변환이 되지 않는 것만 필터링해서 살펴보자....
#     }
#     
# }).show(false)

오류 데이터 제거

In [None]:
# Keep only rows whose artistid parses as an int and whose name is present.
artistFinal = (
    artistDF
    .withColumn("artistid", str_to_int(artistDF.artistid))
    .where("artistid is not null and artistname is not null")
    .selectExpr("cast(artistid as int) artistid", 'artistname')
)
print(artistFinal.count())
artistFinal.printSchema()
artistFinal.show()

# scala
# val artistFinal = artistDF.filter(row => {
#     val artistid = row.getString(0)
#     val artistname = row.getString(1)
#     try {
#         artistid.toInt
#         true
#     } catch {
#         case e:Exception => false  //--int로 형변환이 되지 않는 것은 버리자....
#     }
# })
# .where("artistid is not null")
# .where("artistname is not null")
# .withColumn("artistid", expr("cast(artistid as int)"))
# 
# println(artistFinal.count())
# artistFinal.printSchema()
# artistFinal.show(false)
# z.show(artistFinal.limit(20))

In [None]:
artistFinal.summary().show()

In [None]:
# 아티스트 이름이 null인 데이터 확인
artistFinal.where("artistname is null").show()

In [None]:
#아티스트 이름이 이상한(숫자) 데이터 확인
artistFinal.where("artistname in ('33', '304', '1988')").show()

In [None]:
#아티스트 이름이 이상한(숫자) 데이터 확인
artistDS.where("value like '1335772%' or value like '1344623%' or value like '2032179%'").show()

In [None]:
#아티스트 이름이 이상한(min/max) 데이터 확인
artistFinal.where("artistname in ('', '￿￿￿￿￿￿￿￿￿￿￿￿くȁ')").show()

In [None]:
artistFinal.where("artistname in (' ', '￿￿￿￿￿￿￿￿￿￿￿￿くȁ')").show()
artistFinal.where("artistname in ('.', '￿￿￿￿￿￿￿￿￿￿￿￿くȁ')").show()
artistFinal.where("artistname in ('', '￿￿￿￿￿￿￿￿￿￿￿￿くȁ')").show()

artistDS.where("value like '6986651%' or value like '9915481%' or value like '1025136%' or value like '1165062%'").show()

In [None]:
artistDS.where("value like '1165062%' or value like '10495051%'").show()

In [None]:
# 걸러진 데이터 건수 확인
print(artistDS.count() - artistFinal.count())

### (3) artist_alias.txt (배드아이디_굿아이디)


In [None]:
# Raw lines of artist_alias.txt (cluster alternative: hdfs://spark-master-01:9000/data/audio/profiledata_06-May-2005/artist_alias.txt).
artistAliasDS = (
    spark.read
    .format("text")
    .load("hdfs://localhost:9000/data/audio/profiledata_06-May-2005/artist_alias.txt")
)

print(artistAliasDS.count())
artistAliasDS.printSchema()
artistAliasDS.show()

#scala
# val artistAliasDS = spark
#     .read
#     .textFile("hdfs://spark-master-01:9000/data/audio/profiledata_06-May-2005/artist_alias.txt")
#     
# println(artistAliasDS.count())
# artistAliasDS.printSchema()
# artistAliasDS.show()
# z.show(artistAliasDS.limit(20))


In [None]:
# Parse artist_alias.txt as tab-separated (badid, goodid) pairs with inferred types.
artistAliasDF = (
    spark.read
    .option("sep", "\t")
    .option("inferSchema", True)
    .csv("hdfs://localhost:9000/data/audio/profiledata_06-May-2005/artist_alias.txt")
    .toDF("badid", "goodid")
)
print(artistAliasDF.count())
artistAliasDF.printSchema()
artistAliasDF.show()

# scala
# val artistAliasDF = spark
#     .read
#     .option("sep", "\t")
#     .option("inferSchema", true)  //--inferSchema => true....
#     .csv("hdfs://spark-master-01:9000/data/audio/profiledata_06-May-2005/artist_alias.txt")
#     .toDF("badid", "goodid")
#     
# println(artistAliasDF.count())
# artistAliasDF.printSchema()
# artistAliasDF.show()
# z.show(artistAliasDF.limit(20))


In [None]:
# Summary statistics of the alias table.
artistAliasDF.summary().show(20)

summary() 의 count 정보를 확인했을때, badid 와 goodid 의 개수가 일치하지 않음

In [None]:
# Rows responsible for the badid / goodid count mismatch.
artistAliasDF.where(F.col("badid").isNull()).show(truncate=False)
artistAliasDF.where(F.col("goodid").isNull()).show(truncate=False)

데이터에서 Null 제거

In [None]:
# Drop alias rows where either id is missing.
artistAliasFinal = artistAliasDF.dropna(how="any", subset=["badid", "goodid"])

print(artistAliasFinal.count())
artistAliasFinal.printSchema()
artistAliasFinal.show()

artistAliasFinal.createOrReplaceTempView("artistAliasFinal")

# scala
# val artistAliasFinal = artistAliasDF
#     .filter("badid is not null")
#     .filter("goodid is not null")
# 
# println(artistAliasFinal.count())
# artistAliasFinal.printSchema()
# artistAliasFinal.show()
# z.show(artistAliasFinal.limit(20))
# 
# artistAliasFinal.createOrReplaceTempView("artistAliasFinal")


In [None]:
artistAliasFinal.summary().show()


In [None]:
# 걸러진 데이터 건수 확인
print(artistAliasDS.count() - artistAliasFinal.count())

(4) user_artist_data.txt(유저_아티스트_플레이카운트 데이터) 에서 아티스트의 BadID를 GoodID로 변경하는 작업수행


In [None]:
# TempView 등록
userArtistCSVDF3.printSchema()
artistAliasFinal.printSchema()

userArtistCSVDF3.createOrReplaceTempView("userArtistCSVDF3")
artistAliasFinal.createOrReplaceTempView("artistAliasFinal")

In [None]:
# badid 목록 보기
%%sparksql sql_df

SELECT
    distinct(badid)
FROM
    artistAliasFinal
limit 20;

In [None]:
sql_df.show()

In [None]:
# badid 개수 및 목록 보기

# artistAliasFinal.select(distinct($"badid")).show() 
# functions에 distinct 함수 자체가 아직 없어요.... error: not found: value distinct
artistAliasFinal.select(F.count_distinct(artistAliasFinal.badid)).show() #--functions에 countDistinct 함수 존재함.... 정상동작....

#artistAliasFinal.selectExpr("distinct(badid)").show()  //--distinct(col) 에러발생.... AnalysisException: Undefined function: 'distinct'....
artistAliasFinal.selectExpr("count(distinct(badid))").show()  #count(distinct(col)) 정상동작....


spark.sql("select distinct(badid) from artistAliasFinal").show()  #distinct(col) 정상동작....
spark.sql("select count(distinct(badid)) from artistAliasFinal").show()  #count(distinct(col)) 정상동작....

In [None]:
# badid 갯수 확인
print(artistAliasFinal.count())
artistAliasFinal.selectExpr("count(distinct(badid))").show()

badid를 goodid로 변경하기 위한 join 테스트

In [None]:
%%sparksql badTogood_df_1
-- Preview the left outer join of play records with the alias table (badid -> goodid).
SELECT
    *
FROM userArtistCSVDF3 AS ua
LEFT OUTER JOIN artistAliasFinal AS aa
    ON ua.artistid = aa.badid
LIMIT 20;

In [None]:
badTogood_df_1.show()

In [None]:
%%sparksql badTogood_df_2
-- Same join plus artistid2: goodid when an alias matched, otherwise the original artistid.
SELECT
    ua.*,
    aa.*,
    CASE WHEN aa.badid IS NOT NULL THEN aa.goodid ELSE ua.artistid END AS artistid2
FROM userArtistCSVDF3 AS ua
LEFT OUTER JOIN artistAliasFinal AS aa
    ON ua.artistid = aa.badid
LIMIT 20;

In [None]:
badTogood_df_2.show()

In [None]:
%%sparksql badTogood_df_3
-- Final shape: (userid, remapped artistid, playcount).
SELECT
    ua.userid,
    CASE WHEN aa.badid IS NOT NULL THEN aa.goodid ELSE ua.artistid END AS artistid,
    ua.playcount
FROM userArtistCSVDF3 AS ua
LEFT OUTER JOIN artistAliasFinal AS aa
    ON ua.artistid = aa.badid
LIMIT 20;


In [None]:
badTogood_df_3.show()

In [None]:
# Full (non-limited) version of the remapping query above.
userArtistCSVDF4 = spark.sql("""
    SELECT
        ua.userid,
        CASE WHEN aa.badid IS NOT NULL THEN aa.goodid ELSE ua.artistid END AS artistid,
        ua.playcount
    FROM userArtistCSVDF3 ua
    LEFT OUTER JOIN artistAliasFinal aa
        ON ua.artistid = aa.badid
""")

print(userArtistCSVDF4.count())
userArtistCSVDF4.printSchema()
userArtistCSVDF4.show()

# scala
# val userArtistCSVDF4 = spark.sql("""
# 
# select 
#     ua.userid,
#     case
#         when aa.badid is not null then aa.goodid
#         else ua.artistid
#     end
#     as artistid,
#     ua.playcount
# from
#     userArtistCSVDF3 ua
#     left outer join
#     artistAliasFinal aa
#     on 
#     ua.artistid = aa.badid
# where
#     true
# --and aa.badid is not null
# """)
# 
# println(userArtistCSVDF4.count())
# userArtistCSVDF4.printSchema()
# userArtistCSVDF4.show()
# z.show(userArtistCSVDF4.limit(20))


SQL문으로 작업한 내용을 DataFrame의 API로 작업하기 => badid를 goodid로 바꾼 최종 데이터

In [None]:
# DataFrame-API version of the remapping: replace each misspelled artist id (badid) with its
# canonical id (goodid); rows without an alias keep their original artistid.
# artistid2 is computed once (it used to be computed by expr() and then immediately overwritten by when()).
userArtistFinal = (
    userArtistCSVDF3
    .join(artistAliasFinal, userArtistCSVDF3.artistid == artistAliasFinal.badid, "left_outer")
    .withColumn(
        "artistid2",
        F.when(F.col("badid").isNotNull(), F.col("goodid")).otherwise(F.col("artistid")),
    )
    .select('userid', F.col('artistid2').alias("artistid"), 'playcount')
)

userArtistFinal.persist(StorageLevel.MEMORY_ONLY)
print(userArtistFinal.count())
userArtistFinal.printSchema()
userArtistFinal.show()

userArtistFinal.createOrReplaceTempView("userArtistFinal")

# scala
# val userArtistFinal = userArtistCSVDF3.as("ua")
#     .join(artistAliasFinal.as("aa"), $"ua.artistid" === $"aa.badid", "left_outer")
# //  .withColumn("artistid2", case when $"aa.badid" is not null then $"aa.goodid" else $"ua.artistid" end)  //--case 함수 제공 안함....
#     .withColumn("artistid2", expr("case when aa.badid is not null then aa.goodid else ua.artistid end"))  //--expr 함수 사용....
#     .withColumn("artistid2", when(col("aa.badid").isNotNull, col("aa.goodid")).otherwise(col("ua.artistid")))  //--when 함수 사용....
#     .select('userid, 'artistid2 as "artistid", 'playcount)
# 
# userArtistFinal.persist(org.apache.spark.storage.StorageLevel.MEMORY_ONLY)    
# println(userArtistFinal.count())
# userArtistFinal.printSchema()
# userArtistFinal.show()
# z.show(userArtistFinal.limit(20))
# 
# userArtistFinal.createOrReplaceTempView("userArtistFinal")

In [None]:
# 걸러진 데이터 건수 확인
print(userArtistCSVDF3.count() - userArtistFinal.count())

In [None]:
# 걸러진 badid 데이터 건수 확인
print(userArtistCSVDF3.select("artistid").distinct().count() - userArtistFinal.select("artistid").distinct().count())

badid를 goodid로 바꾼 후 같은 아티스트에 대한 playcount가 2개 이상인 데이터 확인 #1

In [None]:
%%sparksql plc_check_1
-- (userid, artistid) pairs that have more than one playcount row after the id remapping.
SELECT
    userid,
    artistid,
    count(playcount) AS cnt
FROM userArtistFinal
GROUP BY userid, artistid
HAVING cnt > 1
ORDER BY cnt DESC
LIMIT 20;

In [None]:
%%sparksql plc_check_2
-- All rows of one such pair.
SELECT *
FROM userArtistFinal
WHERE userid = 2133748
  AND artistid = 1018110;


In [None]:
%%sparksql plc_check_3
-- Alias entries that map onto that artist.
SELECT *
FROM artistAliasFinal
WHERE goodid = 1018110;


In [None]:
plc_check_3.show()

같은 아티스트에 대한 playcount 합치기

In [None]:
%%sparksql plc_check_4
-- Merge duplicates by summing playcount per (userid, artistid); largest totals first.
SELECT
    userid,
    artistid,
    sum(playcount) AS playcount
FROM userArtistFinal
GROUP BY userid, artistid
ORDER BY playcount DESC
LIMIT 20;

같은 아티스트에 대한 playcount 합치기 ( DF API로 작업하기 )

In [None]:
# Sum playcount per (userid, artistid), then re-apply the 64800 max-playcount cap chosen earlier.
userArtistFinal2 = (
    userArtistFinal
    .groupBy("userid", "artistid")
    .agg(F.sum('playcount').alias('playcount'))
    .where(F.col('playcount') <= 64800)
)

userArtistFinal2.printSchema()
userArtistFinal2.show()

# scala
# val userArtistFinal2 = userArtistFinal
#     .groupBy("userid", "artistid")
#     .agg(sum("playcount").as("playcount"))
#     .filter("playcount <= 64800")  //--저희의 max playcount 값 '64800'로 데이터 걸러내기....
# 
# userArtistFinal2.persist(org.apache.spark.storage.StorageLevel.MEMORY_ONLY)    
# println(userArtistFinal2.count())
# userArtistFinal2.printSchema()
# userArtistFinal2.show()
# z.show(userArtistFinal2.limit(20))
# 
# userArtistFinal2.createOrReplaceTempView("userArtistFinal2")

In [None]:
#  playcount가 합쳐진 데이터 건수 확인
print(userArtistFinal.count() - userArtistFinal2.count())

In [None]:
# 최종 학습 데이터 캐시

userArtistFinal2.cache() # userid_artistid_playcount

userArtistFinal2.persist(StorageLevel.MEMORY_ONLY) 
userArtistFinal2.createOrReplaceTempView("userArtistFinal2")
print(userArtistFinal2.count())

# 기타 데이터 캐시
artistFinal.cache()  # artistid_artistname
print(artistFinal.count())
artistAliasFinal.persist()  # badid_goodid
print(artistAliasFinal.count())

In [None]:
# 최종 학습 데이터 캐시 해제
userArtistFinal2.unpersist()  #userid_artistid_playcount

# 기타 데이터 캐시 해제
artistFinal.unpersist()  #artistid_artistname
artistAliasFinal.unpersist()  #badid_goodid

Cassandra에 userArtistFinal2 저장하기
- apache-cassandra-4.0.11/bin/cassandra
- Table 생성

In [None]:
# cassandra 와 연결
# 참고사이트 : https://docs.datastax.com/en/developer/python-driver/3.24/getting_started/

try:
    cluster = Cluster(['127.0.0.1'], port=9042)
    session = cluster.connect(keyspace='mykeyspace')
except Exception as e:
    print(e)

In [None]:
session.execute('describe keyspaces').all()

In [None]:
# [Cassandra] Table List 조회
session.execute('describe tables').all()

In [None]:
# [Cassandra] userArtistFinal2 Table 내용 확인
session.execute('select * from user_artist_data limit 20').all()

In [None]:
# [Cassandra] Table 스키마 확인

session.execute("SELECT column_name,type FROM system_schema.columns WHERE keyspace_name ='mykeyspace' and table_name='user_artist_data'").all()

In [None]:
userArtistFinal2.show()

[Cassandra] 최종 학습 데이터 저장

In [None]:
# Table 생성
session.execute("CREATE TABLE user_artist_data2 ( userid int , artistid int , playcount int, primary key (userid, artistid))")

In [None]:


# Table 에 data insert 하기
insert_query = session.prepare("INSERT INTO user_artist_data2 (userid, artistid, playcount) VALUES (?, ?, ?)")
parameters = [(userid, artistid, playcount) for userid, artistid, playcount in userArtistFinal2.limit(100000).toLocalIterator()] # 시간 단축을 위해, 데이터 일부만 저장
execute_concurrent_with_args(session, insert_query, parameters, concurrency=50)

### 3. ALS 알고리즘 학습


```
추천 알고리즘 인 ALS를 이용하여 모델 생성 및 추천 실행....
 
ALS("Alternating Least Squares", "교차 최소 제곱") 알고리즘은 Netflix Prize에서 
발표된 논문인 "Collaborative Filtering for the Implicit Feedback Datasets"과, 
"Large-scale Parallel Collaborative Filtering for the Netflix Prize"에서 주로 사용된 방식....

Spark MLlib의 ALS는 이 두 논문에서 아이디어를 가져와 구현....

- Collaborative Filtering for Implicit Feedback Datasets
(http://yifanhu.net/PUB/cf.pdf)

Large-scale Parallel Collaborative Filtering for the Netflix Prize
(http://shiftleft.com/mirrors/www.hpl.hp.com/personal/Robert_Schreiber/papers/2008%20AAIM%20Netflix/netflix_aaim08(submitted).pdf)
```

#### [Matrix Completion]

![Matrix Completion](https://d3i71xaburhd42.cloudfront.net/07d2577de9fb4bb5cbd7424ce5d64e6ef0dd78a0/30-Figure2.1-1.png)




#### [ALS("Alternating Least Squares", "교차 최소 제곱")]

![ALS](https://miro.medium.com/max/1400/1*ezY_g30VQ8MTGpDwd3z56w.png)


[Cassandra] 최종 학습 데이터 로딩 + Cache

In [None]:
# Load the final training data from Cassandra into a Spark DataFrame and cache it.
# NOTE(review): this reads `user_artist_data`, while the insert cell above writes `user_artist_data2`
# (a 100000-row sample) — confirm which table is meant to feed training.
cassandraRows = session.execute("SELECT * FROM user_artist_data").all()
userArtistFinal3 = spark.createDataFrame(cassandraRows, schema=['userid', 'artistid', 'playcount'])
userArtistFinal3.persist(StorageLevel.MEMORY_ONLY)  # userid_artistid_playcount
print(userArtistFinal3.count())

userArtistFinal3.printSchema()
userArtistFinal3.show()

# scala
# val userArtistFinal3 = spark.table("mycatalog.mykeyspace.user_artist_data")
# 
# //--최종 학습 데이터 캐시....
# userArtistFinal3.persist(org.apache.spark.storage.StorageLevel.MEMORY_ONLY) //--userid_artistid_playcount....
# println(userArtistFinal3.count())
# 
# userArtistFinal3.printSchema
# userArtistFinal3.show

[캐시해제] 최종 학습 데이터 캐시 해제

In [None]:
# Release the cached training data (userid, artistid, playcount).
userArtistFinal3.unpersist(blocking=False)

ALS 알고리즘 파라미터 세팅

In [None]:
# ALS with implicitPrefs=True: playcount is used as implicit feedback rather than as an explicit rating.
als = (
    ALS()
    .setUserCol('userid')
    .setItemCol('artistid')
    .setRatingCol('playcount')
    .setImplicitPrefs(True)
    .setMaxIter(5)
    .setRegParam(0.1)
    .setAlpha(40)
    .setRank(10)
    .setColdStartStrategy('drop')  # drop rows for users/items unseen during training
    .setSeed(11)
)

# scala
# val als = new ALS()
#     .setUserCol("userid")
#     .setItemCol("artistid")
#     .setRatingCol("playcount")
#     .setImplicitPrefs(true)
#     .setMaxIter(5)
#     .setRegParam(0.1)  //--hyper parameter.... Param for regularization parameter (>= 0).
#     .setAlpha(40)  //--hyper parameter.... Param for the alpha parameter in the implicit preference formulation (nonnegative). Default: 1.0
#     .setRank(10)  //--hyper parameter.... Param for rank of the matrix factorization (positive). Default: 10
#     .setColdStartStrategy("drop")  //--Param for strategy for dealing with unknown or new users/items at prediction time. Supported values: nan,drop. (default: nan)
#     .setSeed(11L)


In [None]:
# 파라미터 내용 보기
print("\n>>>> als.explainParams()")
print(als.explainParams())

print("\n\n>>>> als.extractParamMap()")
als.extractParamMap()

In [None]:
userArtistFinal3.printSchema()

# ALS 알고리즘 학습
alsModel = als.fit(userArtistFinal3)

### 4. 추천

- 존재하는 사용자 vs. 존재하지 않는(New) 사용자
- 모든 사용자 : 아티스트 쌍에 대해 모델 적용
- 모든 사용자에 대한 추천
- 모든 아티스트에 대한 추천
- 아티스트 이름도 같이 보기


테스트용 사용자 추출

In [None]:
# Candidate test users: at least 20 artists; fewest artists first, then highest total playcount.
(
    userArtistFinal3
    .groupBy("userid")
    .agg(
        F.count("artistid").alias("count_artist"),
        F.sum("playcount").alias("sum_playcount"),
    )
    .where(F.col("count_artist") >= 20)
    .orderBy(F.col("count_artist").asc(), F.col("sum_playcount").desc())
    .show()
)

# scala
# userArtistFinal3
#     .groupBy("userid")
#     .agg(count("artistid").as("count_artist"), sum("playcount").as("sum_playcount"))
#     .where("count_artist >= 20")
#     .orderBy($"count_artist".asc, $"sum_playcount".desc)
#     .show()

학습된 ALS Model로 추천해보기 > Exist User & New User (w/ 아티스트 이름....)

In [None]:
# Two existing users plus one new user id.
userDS = spark.createDataFrame([1001440, 2010008, 987654321], IntegerType()).toDF('userID')
userDS.printSchema()
userDS.show()

# Top-5 recommendations for each of these users.
recommendedForSomeUsersDF = alsModel.recommendForUserSubset(userDS, 5)
recommendedForSomeUsersDF.printSchema()
recommendedForSomeUsersDF.show()

# One row per recommendation: explode the array and pull out its struct fields.
recommendedForSomeUsersDF2 = (
    recommendedForSomeUsersDF
    .withColumn("recommend", F.explode("recommendations"))
    .withColumn("artistid", F.col("recommend").getField("artistid"))
    .withColumn("rating", F.col("recommend").getField("rating"))
)

recommendedForSomeUsersDF2.printSchema()
recommendedForSomeUsersDF2.show()
print(recommendedForSomeUsersDF2.count())

# Drop the now-redundant nested columns.
recommendedForSomeUsersDF3 = recommendedForSomeUsersDF2.drop("recommendations", "recommend")

recommendedForSomeUsersDF3.printSchema()
recommendedForSomeUsersDF3.show()
print(recommendedForSomeUsersDF3.count())

# Attach artist names via artistid; order by user, then best rating first.
recommendedForSomeUsersDF4 = (
    recommendedForSomeUsersDF3
    .join(artistFinal, 'artistid')
    .orderBy(F.col("userid").asc(), F.col("rating").desc())
)
recommendedForSomeUsersDF4.show()

# scala
# val userDS = Seq(1001440, 2010008, 987654321)
#     .toDF("userID")
#     .as[Int]  //--Dataset으로 형변환.... by Encoder....
# 
# userDS.printSchema
# userDS.show(false)
# 
# //--특정 사용자를 위한 추천 5개....
# val recommendedForSomeUsersDF = alsModel.recommendForUserSubset(userDS, 5)
# recommendedForSomeUsersDF.printSchema
# recommendedForSomeUsersDF.show(false)
# 
# //--explode....
# val recommendedForSomeUsersDF2 = recommendedForSomeUsersDF
#     .withColumn("recommend", explode($"recommendations"))
#     .withColumn("artistid", $"recommend.artistid")
#     .withColumn("rating", $"recommend.rating")
# 
# recommendedForSomeUsersDF2.printSchema()
# recommendedForSomeUsersDF2.show(false)
# println(recommendedForSomeUsersDF2.count())
# 
# //--drop....
# val recommendedForSomeUsersDF3 = recommendedForSomeUsersDF2
#     .drop("recommendations", "recommend")
# 
# recommendedForSomeUsersDF3.printSchema()
# recommendedForSomeUsersDF3.show(false)
# println(recommendedForSomeUsersDF3.count())
# 
# //--join w/artistFinal....
# val recommendedForSomeUsersDF4 = recommendedForSomeUsersDF3.as("reco")
#     .join(artistFinal.as("art"), $"reco.artistid" === $"art.artistid")
#     .orderBy($"userid".asc, $"rating".desc)
# 
# recommendedForSomeUsersDF4.show(false)
# z.show(recommendedForSomeUsersDF4)


그럴싸한 추천을 제공하는지 확인해보기 > 기존 플레이한 아티스트 vs. 추천된 아티스트 비교

In [None]:
userArtistFinal3.createOrReplaceTempView("userArtistFinal3");
artistFinal.createOrReplaceTempView("artistFinal");

In [None]:
# python
historyForSomeUsersDF = spark.sql('select * from (select * from userArtistFinal3 where userid in (1001440, 2010008)) as history join artistFinal as art on history.artistid = art.artistid').orderBy(F.col("userid").asc(), F.col("playcount").desc())

historyForSomeUsersDF.show(40, False)

# scala
# val historyForSomeUsersDF = userArtistFinal3
#     .where("userid in (1001440, 2010008)").as("history")
#     .join(artistFinal.as("art"), $"history.artistid" === $"art.artistid")
#     .orderBy($"userid".asc, $"playcount".desc)
# 
# historyForSomeUsersDF.show(40, false)
# z.show(historyForSomeUsersDF)


모든 (사용자 : 아티스트) 쌍에 대해 모델 적용하기

In [None]:
# Score every (user, artist) pair of the training data.
predictionsDF = alsModel.transform(userArtistFinal3)
predictionsDF.printSchema()

# Highest predictions first, then lowest first.
for prediction_order in (F.col("prediction").desc(), F.col("prediction").asc()):
    predictionsDF.orderBy(prediction_order).show()

# scala
# val predictionsDF = alsModel.transform(userArtistFinal3)
# predictionsDF.printSchema
# 
# predictionsDF
#     .orderBy($"prediction".desc)
#     .show(false)
# 
# predictionsDF
#     .orderBy($"prediction".asc)
#     .show(false)
    

### 5. 모델 평가


모델 평가를 위한 Evaluator 정의 (평가지표 : RMSE)

In [None]:
# RMSE between the model's prediction column and playcount.
regEval = (
    RegressionEvaluator()
    .setLabelCol("playcount")
    .setPredictionCol("prediction")
    .setMetricName("rmse")
)

# scala
# val regEval = new RegressionEvaluator()
#     .setLabelCol("playcount")
#     .setPredictionCol("prediction")
#     .setMetricName("rmse")


모델 평가

In [None]:
# RMSE of the model on the same data it was trained on.
# NOTE(review): with implicitPrefs=True, ALS predicts preference scores rather than playcounts,
# so this RMSE is presumably only useful for comparing models — confirm before reading it as an error in plays.
print(rmse := regEval.evaluate(predictionsDF))

### 6. 하이퍼 파라미터 튜닝

Pipeline 알고리즘 정의 > stage 1개 + ALS

In [None]:
# Single-stage pipeline wrapping the ALS estimator (equivalently: Pipeline(stages=[als])).
pipeline = Pipeline().setStages([als])

# scala
# val pipeline = new Pipeline()
#     .setStages(Array(als))


하이퍼 파라미터 조합을 ParamMap으로 구성하기

In [None]:
# Hyper-parameter grid: 4 alpha x 3 rank x 2 regParam = 24 combinations.
paramMaps = (
    ParamGridBuilder()
    .addGrid(als.alpha, [40.0, 10.0, 5.0, 1.0])
    .addGrid(als.rank, [2, 3, 10])
    .addGrid(als.regParam, [1.0, 0.01])
    .build()
)

print(paramMaps)

# scala
# val paramMaps = new ParamGridBuilder()
#     .addGrid(als.alpha, Array(40.0,10.0, 5.0, 1.0))
#     .addGrid(als.rank, Array(2, 3, 10))
#     .addGrid(als.regParam, Array(1.0, 0.01))
#     .build()
# 
# println(paramMaps.mkString(", "))

CrossValidator 정의

In [None]:
cv = CrossValidator(
    estimator=pipeline,             # the ML algorithm (one-stage ALS pipeline)
    estimatorParamMaps=paramMaps,   # hyper-parameter combinations to try
    evaluator=regEval,              # how each fitted model is scored (RMSE)
    numFolds=2,                     # number of folds the training data is split into
)

# scala
# val cv = new CrossValidator()
#     .setEstimator(pipeline)  //--ML 알고리즘.... Pipeline....
#     .setEstimatorParamMaps(paramMaps)  //--튜닝하고자 하는 하이퍼 파라미터 조합....
#     .setEvaluator(regEval)  //--모델을 평가하는 평가자....
#     .setNumFolds(2)  //--데이터를 나누는 기준....


train dataset 과 test dataset 분리하기

In [None]:
# 70/30 train/test split with a fixed seed; both halves are cached.
trainDS, testDS = userArtistFinal3.randomSplit([0.7, 0.3], seed=11)
print(userArtistFinal3.count())

for split_df in (trainDS, testDS):
    split_df.persist(StorageLevel.MEMORY_ONLY)
    print(split_df.count())

# scala
# val Array(trainDS, testDS) = userArtistFinal3.randomSplit(Array(0.7, 0.3), 11L)
# 
# println(userArtistFinal3.count)
# 
# trainDS.persist(org.apache.spark.storage.StorageLevel.MEMORY_ONLY)
# println(trainDS.count)
# 
# testDS.persist(org.apache.spark.storage.StorageLevel.MEMORY_ONLY)
# println(testDS.count)


CrossValidator 학습 > Took 1 min 50 sec

In [None]:
cvModel = cv.fit(dataset=trainDS)  # fits every ParamMap on each fold (slow step)

Best 모델 조회

In [None]:
# Best pipeline, then per-ParamMap average metrics and the ParamMaps themselves (same order).
print("\n>>>> Best Model : \n" + str(cvModel.bestModel))
for header, values in (
    ("\n>>>> Avg Metrics :", cvModel.avgMetrics),
    ("\n>>>> Estimator ParamMaps :", cvModel.getEstimatorParamMaps()),
):
    print(header, *values, sep='\n')

# scala
# print("\n>>>> Best Model : \n" + cvModel.bestModel)
# print("\n>>>> Avg Metrics : \n" + cvModel.avgMetrics.mkString("\n"))
# print("\n>>>> Estimator ParamMaps : \n" + cvModel.getEstimatorParamMaps.mkString("\n"))

파라미터 조합별 metric 점수 매핑하여 보기

In [None]:
# Pair each ParamMap with its average cross-validated RMSE, best (lowest) first,
# matching the sortBy(_._2) of the Scala version. A list (not a one-shot zip iterator) so it can be reused.
zippedParamAndMetrics = sorted(
    zip(cvModel.getEstimatorParamMaps(), cvModel.avgMetrics),
    key=lambda param_and_metric: param_and_metric[1],
)

print("\n>>>> Zipped Param And Metrics : \n")
for paramMap, metric in zippedParamAndMetrics:
    print(metric, {param.name: value for param, value in paramMap.items()})

# scala
# val zippedParamAndMetrics = cvModel.getEstimatorParamMaps
#     .zip(cvModel.avgMetrics)
#     .sortBy(_._2)
# 
# println("\n>>>> Zipped Param And Metrics : \n" + zippedParamAndMetrics.mkString("\n"))

모델 평가 (RMSE....) #2

In [None]:
# Score the held-out test split with the best cross-validated model.
predictionsDF2 = cvModel.transform(testDS)
predictionsDF2.show()

# RMSE on the test split.
print(rmse2 := regEval.evaluate(predictionsDF2))

# scala
# val predictionsDF2 = cvModel.transform(testDS)
# predictionsDF2.show()
# 
# val rmse2 = regEval.evaluate(predictionsDF2)
# println(rmse2)

CrossValidator 모델 HDFS에 저장

In [None]:
# Save the whole CrossValidatorModel to HDFS, replacing any previous copy.
cvModelWriter = cvModel.write().overwrite()
cvModelWriter.save("hdfs://localhost:9000/model/audio/profiledata_06-May-2005/als_crossvalidator")

# scala
# cvModel.write.overwrite.save("hdfs://spark-master-01:9000/model/audio/profiledata_06-May-2005/als_crossvalidator")

실제 Best 모델인 ALSModel 추출하여 HDFS에 저장

In [None]:
bestPipelineModel = cvModel.bestModel
print(f">>>> CrossValidatorModel : {cvModel} \n")
print(f">>>> CrossValidatorModel.bestModel : {bestPipelineModel}\n")
print(f">>>> PipelineModel.stages(0) : {bestPipelineModel.stages[0]}\n")

# The best pipeline's only stage is the fitted ALSModel.
alsBestModel = bestPipelineModel.stages[0]

# Save the ALSModel itself so recommendation APIs such as recommendForUserSubset() are available after reloading it.
alsBestModel.write().overwrite().save("hdfs://localhost:9000/model/audio/profiledata_06-May-2005/als")

# scala
# println("\n>>>> CrossValidatorModel : \n" + cvModel)
# println("\n>>>> CrossValidatorModel.bestModel : \n" + cvModel.bestModel)
# 
# //--Cast as PipelineModel.... asInstanceOf[type]....
# val pipelineBestModel = cvModel.bestModel.asInstanceOf[PipelineModel]
# 
# println("\n>>>> PipelineModel.stages(0) : \n" + pipelineBestModel.stages(0))
# 
# //--Cast as ALSModel.... asInstanceOf[type]....
# val alsBestModel = pipelineBestModel.stages(0).asInstanceOf[ALSModel]
# 
# alsBestModel
#     .write
#     .overwrite
#     .save("hdfs://spark-master-01:9000/model/audio/profiledata_06-May-2005/als")

HDFS에 저장된 Best ALSModel을 로딩하여 내용보기

In [None]:
# python
# Reload the best model saved above. The saved directory holds a fitted ALSModel, so it must be read
# with ALSModel.load: the ALS estimator's loader expects an estimator, and only ALSModel provides
# recommendForUserSubset()/recommendForAllUsers(), which the following cells use. Loading via the class
# also removes the dependency on an `alsModel` variable defined somewhere else in the notebook.
from pyspark.ml.recommendation import ALSModel

loadedBestALSModel = ALSModel.load("hdfs://localhost:9000/model/audio/profiledata_06-May-2005/als")

print("\n>>>> model.extractParamMap : \n" + str(loadedBestALSModel.extractParamMap()))
print("\n>>>> model.explainParams : \n" + str(loadedBestALSModel.explainParams()))

# scala
# val loadedBestALSModel = ALSModel.load("hdfs://spark-master-01:9000/model/audio/profiledata_06-May-2005/als")

로딩된 Best ALSModel로 추천하기 > ALSModel.recommendForUserSubset(DS)

In [None]:
# python
# One-column DataFrame (column "userID") listing the users to produce recommendations for.
userDS2 = (
    spark
    .createDataFrame(data=[1001440, 2010008, 987654321], schema=IntegerType())
    .toDF('userID')
)

userDS2.printSchema()
userDS2.show()

# Top-5 recommendations for just these users.
recommendedForSomeUsersDF2 = loadedBestALSModel.recommendForUserSubset(userDS2, 5)
recommendedForSomeUsersDF2.printSchema()
recommendedForSomeUsersDF2.show()

# scala
# val userDS2 = Seq(1001440, 2010008, 987654321)
#     .toDF("userID")
#     .as[Int]  //--convert to a typed Dataset via an Encoder
# 
# userDS2.printSchema
# userDS2.show(false)
# 
# //--top-5 recommendations for the selected users
# val recommendedForSomeUsersDF2 = loadedBestALSModel.recommendForUserSubset(userDS2, 5)
# recommendedForSomeUsersDF2.printSchema
# recommendedForSomeUsersDF2.show(false)
# z.show(recommendedForSomeUsersDF2)


로딩된 Best ALSModel로 추천하기 > ALSModel.recommendForUserSubset(DS) (w/ 아티스트 이름)

In [None]:
# Re-display the per-user recommendations (explicit show() defaults: 20 rows, truncated cells).
recommendedForSomeUsersDF2.show(n=20, truncate=True)

In [None]:
# python
# explode: turn each user's recommendations array into one row per recommended artist, then
# copy the struct's artistid and rating fields out into their own top-level columns.
recommendedForSomeUsersDF3 = (
    recommendedForSomeUsersDF2
    .withColumn("recommend", F.explode("recommendations"))
    .withColumn("artistid", F.col("recommend.artistid"))
    .withColumn("rating", F.col("recommend.rating"))
)

recommendedForSomeUsersDF3.printSchema()
recommendedForSomeUsersDF3.show()
print(recommendedForSomeUsersDF3.count())

# scala
# //--explode....
# val recommendedForSomeUsersDF3 = recommendedForSomeUsersDF2
#     .withColumn("recommend", explode($"recommendations"))
#     .withColumn("artistid", $"recommend.artistid")
#     .withColumn("rating", $"recommend.rating")
# 
# recommendedForSomeUsersDF3.printSchema()
# recommendedForSomeUsersDF3.show(false)
# println(recommendedForSomeUsersDF3.count())

In [None]:
# python
# drop: artistid and rating are now top-level columns, so the nested array and struct columns are redundant.
recommendedForSomeUsersDF4 = (
    recommendedForSomeUsersDF3
    .drop("recommendations", "recommend")
)

recommendedForSomeUsersDF4.printSchema()
recommendedForSomeUsersDF4.show()
print(recommendedForSomeUsersDF4.count())

# scala
# //--drop....
# val recommendedForSomeUsersDF4 = recommendedForSomeUsersDF3
#     .drop("recommendations", "recommend")
# 
# recommendedForSomeUsersDF4.printSchema()
# recommendedForSomeUsersDF4.show(false)
# println(recommendedForSomeUsersDF4.count())

In [None]:
# python
# Attach artist details from artistFinal (defined outside this cell) to each recommendation with a
# using-join on artistid, then order by user ascending and, within a user, by predicted rating descending.
recommendedForSomeUsersDF5 = (
    recommendedForSomeUsersDF4
    .join(artistFinal, on='artistid')
    .orderBy(F.col('userid').asc(), F.col('rating').desc())
)

recommendedForSomeUsersDF5.show()

# scala
# //--join w/artistFinal....
# val recommendedForSomeUsersDF5 = recommendedForSomeUsersDF4.as("reco")
#     .join(artistFinal.as("art"), $"reco.artistid" === $"art.artistid")
#     .orderBy($"userid".asc, $"rating".desc)
# 
# recommendedForSomeUsersDF5.show(false)
# z.show(recommendedForSomeUsersDF5)


[Redis] 모든 사용자를 위한 아티스트 5건 추천 #1

In [None]:
# python
# Top-5 recommendations for every user the loaded model knows about.
recommendedForAllUsersDF = loadedBestALSModel.recommendForAllUsers(5)

recommendedForAllUsersDF.printSchema()
recommendedForAllUsersDF.show()
# count must be *called*: `print(df.count)` only prints the bound method's repr, not the row count.
print(recommendedForAllUsersDF.count())

# scala
# val recommendedForAllUsersDF = loadedBestALSModel.recommendForAllUsers(5)
# 
# recommendedForAllUsersDF.printSchema
# recommendedForAllUsersDF.show(false)
# println(recommendedForAllUsersDF.count)


[Redis] 모든 사용자를 위한 아티스트 5건 추천 #2

In [None]:
# python
# One row per (user, recommended artist): explode the recommendations array, then copy artistid
# out of each recommendation struct (the struct itself stays in the "recommendation" column).
recommendedForAllUsersDF2 = (
    recommendedForAllUsersDF
    .withColumn("recommendation", F.explode("recommendations"))
    .withColumn("artistid", F.col("recommendation.artistid"))
)

recommendedForAllUsersDF2.printSchema()
recommendedForAllUsersDF2.show()
# count must be *called*: `print(df.count)` only prints the bound method's repr, not the row count.
print(recommendedForAllUsersDF2.count())

# scala
# val recommendedForAllUsersDF2 = recommendedForAllUsersDF
#     .withColumn("recommendation", explode($"recommendations"))
#     .withColumn("artistid", $"recommendation.artistid")
# 
# recommendedForAllUsersDF2.printSchema
# recommendedForAllUsersDF2.show(false)
# println(recommendedForAllUsersDF2.count)


[Redis] 모든 사용자를 위한 아티스트 5건 추천 #3

In [None]:
# python
# Re-group the exploded rows per user into a ranked artist-ID list, plus a space-separated string of it.
# collect_list's element order is not guaranteed after the groupBy shuffle, so collect
# (rating, artistid) structs and sort them by rating descending. This keeps the best
# recommendation first; the helper "ranked" column is dropped afterwards.
# Output columns are unchanged: userid, artistid_list, recommended_artistids.
recommendedForAllUsersDF3 = (
    recommendedForAllUsersDF2
    .groupBy("userid")
    .agg(
        F.sort_array(
            F.collect_list(
                F.struct(F.col("recommendation.rating").alias("rating"), F.col("artistid"))
            ),
            asc=False,
        ).alias("ranked")
    )
    .withColumn("artistid_list", F.col("ranked.artistid"))
    # array_join implicitly casts the integer IDs to strings.
    .withColumn("recommended_artistids", F.array_join(F.col("artistid_list"), " "))
    .drop("ranked")
)

recommendedForAllUsersDF3.printSchema()
recommendedForAllUsersDF3.show()
# count must be *called*: `print(df.count)` only prints the bound method's repr, not the row count.
print(recommendedForAllUsersDF3.count())

# scala
# val recommendedForAllUsersDF3 = recommendedForAllUsersDF2
#     .groupBy("userid")
#     .agg(sort_array(collect_list(struct($"recommendation.rating".as("rating"), $"artistid")), asc = false).as("ranked"))
#     .withColumn("artistid_list", $"ranked.artistid")
#     .withColumn("recommended_artistids", array_join(col("artistid_list"), " "))
#     .drop("ranked")
# 
# recommendedForAllUsersDF3.printSchema
# recommendedForAllUsersDF3.show(false)
# println(recommendedForAllUsersDF3.count)

[Redis] 모든 사용자를 위한 아티스트 5건 추천 #4

In [None]:
# python
# Keep only the columns written to Redis in the next cell: userid (used as the key column)
# and the space-separated recommended artist IDs.
recommendedForAllUsersDF4 = recommendedForAllUsersDF3.select('userid', 'recommended_artistids')

recommendedForAllUsersDF4.printSchema()
recommendedForAllUsersDF4.show()
# count must be *called*: `print(df.count)` only prints the bound method's repr, not the row count.
print(recommendedForAllUsersDF4.count())

# scala
# val recommendedForAllUsersDF4 = recommendedForAllUsersDF3.select($"userid", $"recommended_artistids")
# recommendedForAllUsersDF4.printSchema
# recommendedForAllUsersDF4.show(false)
# println(recommendedForAllUsersDF4.count)

[Redis] 모든 사용자를 위한 아티스트 5건 추천 저장

In [ ]:
# Write the per-user recommendations to Redis through the spark-redis data source, as table
# "user_artists" with userid as the key column. Judging by the read-back cells, keys look like
# "user_artists:<userid>".
# NOTE(review): "append" adds/overwrites rows for users present in this frame; keys of users
# no longer present are presumably left in Redis. Confirm whether "overwrite" is wanted.
(
    recommendedForAllUsersDF4.write
    .format("org.apache.spark.sql.redis")
    .option("table", "user_artists")
    .option("key.column", "userid")
    .mode("append")
    .save()
)

[Redis] 모든 사용자를 위한 아티스트 5건 추천 조회

In [None]:
# python
# Read the whole "user_artists" table back from Redis through spark-redis, with key.column=userid,
# to check what was stored.
recommendedForAllUsersDF5 = (
    spark.read
    .format("org.apache.spark.sql.redis")
    .option("table", "user_artists")
    .option("key.column", "userid")
    .load()
)

recommendedForAllUsersDF5.printSchema()
recommendedForAllUsersDF5.show()
# count must be *called*: `print(df.count)` only prints the bound method's repr, not the row count.
print(recommendedForAllUsersDF5.count())

# scala
# val recommendedForAllUsersDF5 = spark
#     .read
#     .format("org.apache.spark.sql.redis")
#     .option("table", "user_artists")
#     .option("key.column", "userid")
#     .load()
# 
# recommendedForAllUsersDF5.printSchema
# recommendedForAllUsersDF5.show(false)
# println(recommendedForAllUsersDF5.count)

[Redis] 특정 사용자를 위한 아티스트 5건 추천 조회

In [None]:
# python
# Look up one user's stored recommendations by matching a single Redis key pattern
# ("user_artists:2101263") instead of a table name, letting spark-redis infer the schema.
# NOTE(review): this rebinds recommendedForAllUsersDF4, which previously held the frame written
# to Redis. Re-running the write cell after this one would write this frame instead, so consider a
# dedicated name. Note also that the Scala variant below queries a different user (2127894).
recommendedForAllUsersDF4 = (
    spark.read
    .format("org.apache.spark.sql.redis")
    .option("keys.pattern", "user_artists:2101263")
    .option("key.column", "userid")
    .option("infer.schema", True)
    .load()
)

recommendedForAllUsersDF4.printSchema()
recommendedForAllUsersDF4.show()

# scala
# val recommendedForAllUsersDF4 = spark
#     .read
#     .format("org.apache.spark.sql.redis")
#     .option("keys.pattern", "user_artists:2127894")
#     .option("key.column", "userid")
#     .option("infer.schema", true)
#     .load()
# 
# recommendedForAllUsersDF4.printSchema
# recommendedForAllUsersDF4.show(false)
