In [0]:
! apt-get install openjdk-8-jdk-headless -qq > /dev/null
! wget -q http://apache.osuosl.org/spark/spark-2.4.4/spark-2.4.4-bin-hadoop2.7.tgz
! tar xf spark-2.4.4-bin-hadoop2.7.tgz
! pip install -q findspark

## settings

In [0]:
import os

# Tell findspark/pyspark where the JDK and the unpacked Spark distribution live.
# NOTE(review): SPARK_HOME assumes the tarball was extracted in /content (Colab's CWD).
os.environ.update({
    "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64",
    "SPARK_HOME": "/content/spark-2.4.4-bin-hadoop2.7",
})

# findspark.init() must run before pyspark is imported so it can put SPARK_HOME's
# python libraries on sys.path.
import findspark
findspark.init()

from pyspark.sql import SparkSession

# Local session using every available core.
spark = (SparkSession.builder
         .master("local[*]")
         .getOrCreate())

In [0]:
# Notebook-wide imports: standard library first, then third-party packages.
from functools import reduce

import pandas as pd
import pyspark.sql.functions as fn
from IPython.display import display
from pyspark import StorageLevel
from pyspark.sql.functions import col, lit
from pyspark.sql.window import Window

DISK_ONLY = StorageLevel.DISK_ONLY

# Use the public pandas classes: `pandas.core.frame` is an internal module and its
# re-exports (such as Series) are not part of pandas' public API.
PandasDataFrame = pd.DataFrame
PandasSeries = pd.Series

# Attach IPython's rich `display` as a method so a result can be shown at the end of
# a method chain, e.g. `spark_df.toPandas().display()`.
PandasSeries.display = display
PandasDataFrame.display = display

In [0]:
# Reach the JVM's log4j package through the Py4J gateway and switch off every logger
# whose name starts with "org" (e.g. org.apache.spark), keeping cell output readable.
logger = spark._jvm.org.apache.log4j
(logger.LogManager
       .getLogger("org")
       .setLevel(logger.Level.OFF))

# 元データ作成

In [55]:
# Toy ratings data. rating_date is the date on which the user rated the movie.
# `data` is (rows, column_names) and is unpacked straight into createDataFrame.
data = ([("A","mv-1",5,2,"2019-11-01"),("A","mv-2",10,5,"2019-11-02"),
         ("A","mv-3",1,6,"2019-11-03"),("A","mv-4",7,9,"2019-11-04"),
         ("B","mv-1",2,1,"2019-11-10"),("B","mv-2",5,3,"2019-11-11"),("B","mv-3",4,1,"2019-11-12")],
        ["user","movie","rating","times","rating_date"])

# Persist the *derived* frame: it is the one every later cell reuses, so persisting only
# the raw input would still recompute the added/cast columns on each action.
ratings_df = (spark.createDataFrame(*data)
                  .withColumn("rating_name", fn.concat_ws("_", col("movie"), lit("rating")))
                  .withColumn("rating_date", col("rating_date").cast("date"))
                  .persist(StorageLevel.DISK_ONLY)
             )

# Distinct users, sorted because distinct().collect() has no ordering guarantee and the
# downstream loops / column orders should be reproducible.
user_list = sorted(row["user"] for row in ratings_df.select("user").distinct().collect())

# Column names of the "other movies rated" block, e.g. "mv-1_rating_other" (sorted for
# the same reproducibility reason).
ratings_cols_df = (ratings_df
        .withColumn("rating_name", fn.concat_ws("_", col("rating_name"), lit("other")))
        .select("rating_name").distinct()
       )
ratings_others = sorted(row[0] for row in ratings_cols_df.collect())

ratings_df.printSchema()

ratings_df.toPandas().display()

root
 |-- user: string (nullable = true)
 |-- movie: string (nullable = true)
 |-- rating: long (nullable = true)
 |-- times: long (nullable = true)
 |-- rating_date: date (nullable = true)
 |-- rating_name: string (nullable = false)



Unnamed: 0,user,movie,rating,times,rating_date,rating_name
0,A,mv-1,5,2,2019-11-01,mv-1_rating
1,A,mv-2,10,5,2019-11-02,mv-2_rating
2,A,mv-3,1,6,2019-11-03,mv-3_rating
3,A,mv-4,7,9,2019-11-04,mv-4_rating
4,B,mv-1,2,1,2019-11-10,mv-1_rating
5,B,mv-2,5,3,2019-11-11,mv-2_rating
6,B,mv-3,4,1,2019-11-12,mv-3_rating


# Userの部分を作成

In [101]:
# User indicator block: one row per distinct (user, movie, times) and one column per
# user, holding 1 for the row's own user and 0 (via fillna) for every other user.
users_df = (ratings_df
            .groupBy("user", "movie", "times")
            .pivot("user")
            .agg(lit(1))
            .fillna(0))

users_df.toPandas().display()

Unnamed: 0,user,movie,times,A,B
0,B,mv-2,3,0,1
1,A,mv-3,6,1,0
2,A,mv-4,9,1,0
3,A,mv-1,2,1,0
4,B,mv-3,1,0,1
5,B,mv-1,1,0,1
6,A,mv-2,5,1,0


# Movieの部分を作成

In [85]:
# Movie indicator block: one row per distinct (user, movie) and one column per movie,
# holding 1 for the row's own movie and 0 (via fillna) elsewhere.
mv_df = (ratings_df
         .groupBy("user", "movie")
         .pivot("movie")
         .agg(lit(1))
         .fillna(0))

mv_df.toPandas().display()

Unnamed: 0,user,movie,mv-1,mv-2,mv-3,mv-4
0,B,mv-1,1,0,0,0
1,B,mv-3,0,0,1,0
2,A,mv-3,0,0,1,0
3,A,mv-1,1,0,0,0
4,A,mv-2,0,1,0,0
5,A,mv-4,0,0,0,1
6,B,mv-2,0,1,0,0


# Last Movie ratedの部分を作成

In [60]:
# Within each user, order ratings by date so lag() returns the movie rated just before.
# NOTE(review): ratings sharing a rating_date have no tie-breaker, so which one counts
# as "previous" is not guaranteed.
user_date_window = Window.partitionBy("user").orderBy(col("rating_date"))

last_mv_rated_df = (ratings_df
    .withColumn("last_rated_movie", fn.lag(col("movie")).over(user_date_window))
    # A user's first rating has no predecessor; those rows are dropped here and only
    # come back (as zeros) through the later leftouter join + fillna(0).
    .filter(col("last_rated_movie").isNotNull())
    .withColumn("last_rated_movie", fn.concat_ws("_", col("last_rated_movie"), lit("last_rated")))
    # One indicator column per previous movie, e.g. "mv-1_last_rated".
    .groupBy("user", "movie")
    .pivot("last_rated_movie")
    .agg(lit(1))
    .fillna(0)
)
last_mv_rated_df.toPandas().display()

Unnamed: 0,user,movie,mv-1_last_rated,mv-2_last_rated,mv-3_last_rated
0,B,mv-2,1,0,0
1,B,mv-3,0,1,0
2,A,mv-2,1,0,0
3,A,mv-3,0,1,0
4,A,mv-4,0,0,1


# Other Movies ratedの部分を作成

In [0]:
# union用にあるユーザは見ていないが他のユーザは見ている映画のカラムを作る
def add_deficit_cols(df, others) :
    for c in  [col for col in others if col not in df.columns] :
       df=df.withColumn(c, lit(0))
    return df 

In [97]:
# "Other movies rated" block, built in one pass rather than with a collect() per user
# plus a pivot/join/union per (user, movie) pair:
#   1. pivot each user's ratings into one "<movie>_rating_other" column per movie
#      (max rating; 0 where the user did not rate that movie);
#   2. join that wide row back onto every rating of the same user;
#   3. zero the column belonging to the row's own movie, so a rating never appears
#      as a feature of itself.
# The result has the same rows and values as the original loop; columns are
# user, movie, rating, then the sorted "<movie>_rating_other" columns.
other_cols = sorted(ratings_others)

per_user_ratings_df = (ratings_df
    .withColumn("rating_name_other", fn.concat_ws("_", col("rating_name"), lit("other")))
    .groupBy("user")
    # Explicit pivot values: every user gets every movie column, with no extra distinct job.
    .pivot("rating_name_other", other_cols)
    .agg(fn.max("rating"))
    .fillna(0)
)

other_mv_rated_df = (ratings_df
    .select("user", "movie", "rating",
            fn.concat_ws("_", col("rating_name"), lit("other")).alias("_own_rating_other"))
    .join(per_user_ratings_df, "user")
    .select("user", "movie", "rating",
            *[fn.when(col("_own_rating_other") == lit(c), lit(0)).otherwise(col(c)).alias(c)
              for c in other_cols])
)

other_mv_rated_df.orderBy(col("user"),col("movie")).toPandas().display()

Unnamed: 0,user,movie,rating,mv-1_rating_other,mv-2_rating_other,mv-3_rating_other,mv-4_rating_other
0,A,mv-1,5,0,10,1,7
1,A,mv-2,10,5,0,1,7
2,A,mv-3,1,5,10,0,7
3,A,mv-4,7,5,10,1,0
4,B,mv-1,2,0,5,4,0
5,B,mv-2,5,2,0,4,0
6,B,mv-3,4,2,5,0,0


# 全データを結合

In [0]:
# Assemble all feature blocks on (user, movie). The user, movie and other-movies blocks
# are inner-joined. last_mv_rated_df has no row for a user's first rating (those rows
# were filtered out), so it is left-joined. fillna(0) then turns the resulting nulls into 0.
join_keys = ["user", "movie"]
_ratings_df = users_df.join(mv_df, join_keys, "inner")
_ratings_df = _ratings_df.join(other_mv_rated_df, join_keys, "inner")
_ratings_df = _ratings_df.join(last_mv_rated_df, join_keys, "leftouter").fillna(0)

## カラムを並べ替えて表示

In [102]:
# Show the feature table with its columns grouped into blocks: user indicators, movie
# indicators, other-movie ratings, last-rated-movie indicators, then times and rating.
# NOTE(review): these names are hard-coded for this toy dataset.
user_cols = ['A', 'B']
movie_cols = ['mv-1', 'mv-2', 'mv-3', 'mv-4']
other_rated_cols = ['mv-1_rating_other', 'mv-2_rating_other', 'mv-3_rating_other', 'mv-4_rating_other']
last_rated_cols = ['mv-1_last_rated', 'mv-2_last_rated', 'mv-3_last_rated']

(_ratings_df
 .orderBy(col("user"), col("movie"))
 .select(*user_cols, *movie_cols, *other_rated_cols, *last_rated_cols, 'times', 'rating')
 .toPandas()
 .display()
)

Unnamed: 0,A,B,mv-1,mv-2,mv-3,mv-4,mv-1_rating_other,mv-2_rating_other,mv-3_rating_other,mv-4_rating_other,mv-1_last_rated,mv-2_last_rated,mv-3_last_rated,times,rating
0,1,0,1,0,0,0,0,10,1,7,0,0,0,2,5
1,1,0,0,1,0,0,5,0,1,7,1,0,0,5,10
2,1,0,0,0,1,0,5,10,0,7,0,1,0,6,1
3,1,0,0,0,0,1,5,10,1,0,0,0,1,9,7
4,0,1,1,0,0,0,0,5,4,0,0,0,0,1,2
5,0,1,0,1,0,0,2,0,4,0,1,0,0,3,5
6,0,1,0,0,1,0,2,5,0,0,0,1,0,1,4
