In [1]:
# Copyright ActionML, LLC under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# ActionML licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import io

from pyspark.sql import SQLContext
from pyspark.sql import functions as F
import predictionio

In [2]:
# Input/output locations, relative to the notebook's working directory.
# Point DATA_DIR at another copy of the dataset to re-run on different data.
DATA_DIR = "./full-guide-meta-and-prefs"
# Tab-separated preference records (C0 = user id, C1 = label, C2 = movie id).
MAIN_DATA_PATH = DATA_DIR + "/prefs/part-*"
# Comma-separated movie metadata; C0 is used as the movie id when joining.
META_DATA_PATH = DATA_DIR + "/meta/*/*"
# Held-out "userId,movieId" pairs written by the test-data cell.
TEST_DATA_OUT = "test_data.txt"

In [3]:
# Load the raw preference records. spark-csv names header-less columns
# C0, C1, C2, ...; below C0 is treated as the user id, C1 as the
# "fresh"/"rotten" label and C2 as the movie id.
# NOTE(review): quote='@' presumably keeps '"' characters inside fields
# verbatim — confirm against the data.
sqlContext = SQLContext(sc)
csvName = MAIN_DATA_PATH
df = (sqlContext
      .read
      .format('com.databricks.spark.csv')
      .options(header=False, delimiter='\t', quote='@', inferSchema=False)
      .load(csvName))

# Collapse duplicate (user, label, movie) records.
df = df.dropDuplicates(['C0', 'C1', 'C2'])

# Select only those users that have at least one "fresh" event. Filtering
# first and de-duplicating the ids is equivalent to grouping by user and
# keeping max(C1 == "fresh") == True, without depending on the
# auto-generated "max(hasFresh)" column name inside a SQL string.
good_users = (df.filter(df.C1 == "fresh")
                .select(df.C0.alias("userId"))
                .distinct())

# Keep only records from "good users": an inner join retains exactly the
# rows whose C0 matches one of the selected user ids.
good_df = df.join(good_users,
                  df["C0"] == good_users["userId"],
                  how="inner")

good_df = good_df.select(good_df["userId"],
                         good_df["C1"].alias("label"),
                         good_df["C2"].alias("movieId"))

In [4]:
# Movie metadata: comma-delimited, '"'-quoted, no header row, so spark-csv
# assigns the column names C0, C1, ...; C0 is used as the movie id.
csvNameMeta = META_DATA_PATH
df_meta = (sqlContext.read
           .format('com.databricks.spark.csv')
           .options(inferSchema=False, quote='"', delimiter=',', header=False)
           .load(csvNameMeta)
           # One record per id; dropDuplicates does not specify which of
           # several records sharing an id is kept.
           .dropDuplicates(['C0']))

In [6]:
# Attach metadata to every preference record (a left outer join, so movies
# without metadata keep null metadata columns), then separate the labels.
jdf = good_df.join(df_meta, good_df["movieId"] == df_meta["C0"], how="left_outer")
# Cached because randomSplit below evaluates its input once per split; the
# input comes out of shuffles (joins, dropDuplicates) whose row order can
# change on re-computation, which can make the train and test samples
# overlap or miss rows.
jdf_like = jdf.filter("label = 'fresh'").cache()
jdf_dislike = jdf.filter("label = 'rotten'")

# Hold out 20% of the "fresh" events for testing; all "rotten" events go to
# training.
seed = 92671
traindf, testdf = jdf_like.randomSplit([0.8, 0.2], seed)
full_traindf = traindf.unionAll(jdf_dislike).cache()

In [18]:
# Test data has only fresh events (the held-out part of the "fresh" split).
data = testdf.select("userId", "movieId").collect()

# io.open with an explicit encoding writes UTF-8 text on both Python 2 and 3;
# writing .encode()-ed bytes to a text-mode file fails on Python 3. The
# `with` block closes the file, so no explicit close() is needed.
with io.open(TEST_DATA_OUT, "w", encoding="utf-8") as f:
    for row in data:
        # A missing id cannot form a usable "userId,movieId" line.
        if row.userId is None or row.movieId is None:
            continue
        f.write(u"{},{}\n".format(row.userId, row.movieId))

In [9]:
# Primary events: one (userId, "like", movieId) tuple per "fresh" training
# record and one (userId, "dislike", movieId) tuple per "rotten" one.
# DataFrame.map was removed in Spark 2.0; mapping over .rdd is what
# DataFrame.map did in Spark 1.x and works on both versions.
like_events = (full_traindf.filter("label = 'fresh'")
                  .select("userId", "movieId")
                  .rdd
                  .map(lambda x: (x.userId, "like", x.movieId)))

# Dislikes
dislike_events = (full_traindf.filter("label = 'rotten'")
                  .select("userId", "movieId")
                  .rdd
                  .map(lambda x: (x.userId, "dislike", x.movieId)))

def srec(userId, toSplit, label, event, sep="\t"):
    """Expand a multi-valued field (genres, cast, ...) into event tuples.

    Args:
        userId: id placed in every returned tuple.
        toSplit: `sep`-separated values, or None when the field is missing.
        label: "fresh" produces "like-<event>"; any other label produces
            "dislike-<event>".
        event: suffix of the event name (e.g. "genre").
        sep: value separator; defaults to a tab.

    Returns:
        A list of (userId, event_name, value) tuples, one per non-blank value
        (surrounding whitespace stripped). Empty when `toSplit` is None or
        contains only blank values.
    """
    if toSplit is None:
        return []
    out_event = ("like-" if label == "fresh" else "dislike-") + event
    # Strip once per value and drop blanks (e.g. from trailing separators).
    values = (s.strip() for s in toSplit.split(sep))
    return [(userId, out_event, v) for v in values if v != ""]
        
    
def _meta_events(column, event):
    """Return an RDD of (userId, "like-/dislike-<event>", value) tuples.

    `column` names a metadata column of full_traindf whose values are expanded
    with srec; the like/dislike prefix comes from each record's label.
    """
    # Resolve the column on full_traindf itself rather than on jdf, the frame
    # it was derived from.
    values = full_traindf.select("userId", "label",
                                 full_traindf[column].alias("meta_value"))
    # .rdd.flatMap works on Spark 1.x and 2.x (DataFrame.flatMap was removed).
    return values.rdd.flatMap(
        lambda x: srec(x.userId, x.meta_value, x.label, event))


# Metadata columns C6..C9 hold genres, directors, writers and cast.
genre_events = _meta_events("C6", "genre")
director_events = _meta_events("C7", "director")
writer_events = _meta_events("C8", "writer")
cast_events = _meta_events("C9", "cast")

In [15]:
# Export every training event through predictionio.FileExporter. close() runs
# in `finally` so the exporter is closed even if collecting an RDD fails.
exporter = predictionio.FileExporter(file_name="train_events.json")

rdds = [like_events, dislike_events, genre_events,
        director_events, writer_events, cast_events]

try:
    for rdd in rdds:
        for (user, event, item) in rdd.collect():
            # To avoid empty target_entity_id or entity_id: a missing (None)
            # or blank id is normalized to "" and the record is skipped.
            user = (user or "").strip()
            item = (item or "").strip()
            if user and item:
                exporter.create_event(
                    event=event,
                    entity_type="user",
                    entity_id=user,
                    target_entity_type="item",
                    target_entity_id=item,
                )
finally:
    exporter.close()