Add your Cloudant credentials below, change the cell format to 'Code', and run the cell to save your credentials.

Before running this notebook, ensure you have installed spark-cloudant 1.6.4 by running the notebook: **Install spark-cloudant 1.6.4 lib**
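
The credentials cell itself is not reproduced here. As a minimal sketch, assuming the file only needs the three keys that `CloudantConfig` reads later in this notebook (`host`, `username`, `password`), a cell like the following would write `cloudant_credentials.json`. The values are placeholders, not real credentials.

```python
import json

# Placeholder values (assumptions): replace with your own Cloudant details.
credentials = {
    "host": "YOUR_ACCOUNT.cloudant.com",
    "username": "YOUR_USERNAME",
    "password": "YOUR_PASSWORD",
}

# CloudantConfig (defined below) reads this file name and these three keys.
with open("cloudant_credentials.json", "w") as f:
    json.dump(credentials, f, indent=2)
```

Any key left out of the file can still be supplied as a keyword argument to `CloudantConfig`, which overrides the JSON values.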

In [1]:
! python -c 'import cloudant' || pip install cloudant --user

In [2]:
# utility method for timestamps
import time
def ts():
    return time.strftime("%Y-%m-%d %H:%M:%S %Z")

In [3]:
# utility method for logging
log4jLogger = sc._jvm.org.apache.log4j
LOGGER = log4jLogger.LogManager.getLogger("CloudantRecommender")

def info(*args):
    
    # sends output to notebook
    print(args)
    
    # sends output to kernel log file
    LOGGER.info(args)
    
def error(*args):
    
    # sends output to notebook
    print(args)
    
    # sends output to kernel log file
    LOGGER.error(args)

In [4]:
# utility class for holding cloudant connection details
import json

def set_attr_if_exists(obj, data, k):
    # copy data[k] onto obj only when the key is present in the JSON data
    try:
        setattr(obj, k, data[k])
    except KeyError:
        pass

class CloudantConfig:
    def __init__(self, database, json_file=None, host=None, username=None, password=None):
       
        self.database = database
        self.host = None
        self.username = None
        self.password = None

        # the JSON file is optional: skip it when only keyword arguments are given
        if json_file:
            with open(json_file) as data_file:
                data = json.load(data_file)

            set_attr_if_exists(self, data, 'host')
            set_attr_if_exists(self, data, 'username')
            set_attr_if_exists(self, data, 'password')
        
        # override json attributes if provided
        if host:     self.host = host
        if username: self.username = username
        if password: self.password = password

In [5]:
sourceDB = CloudantConfig(
                    json_file='cloudant_credentials.json', 
                    database="ratingdb"
                    )

 - We generate recommendations, create a new Cloudant database for the recommendations and save them into the new Cloudant database.
 - When we have finished writing the recommendations to Cloudant, we save a metadata record into the recommendation_meta database with the name of the new database.
 - Client applications use the metadata record to determine which database to retrieve the recommendations from.
 - We delete older databases before creating the new one, keeping the five most recent.
 - We need to keep at least one previous database because a client that has already read a metadata record pointing to the previous database will still try to read from it.
 - We don't keep a single database and continually update its recommendation records because a high volume of document updates can be considered an anti-pattern in Cloudant.
 - The recommendation_meta database is created for us by the web application setup scripts.
 - The spark-cloudant package is used to read the data from Cloudant but not to write the data to Cloudant because of this issue: https://github.com/cloudant-labs/spark-cloudant/issues/82
 - The python-cloudant package is used to write the data.
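
To illustrate the pointer pattern described above, here is a hedged sketch of what a client application might do: read the metadata record, then fetch a user's recommendations from the database it names. The function name `fetch_recommendations` and its parameters are hypothetical. The document ID `recommendation_metadata`, the `latest_db` field, and the per-user `recommendations` field match what this notebook writes. The `client` argument only needs dict-style access, which a connected python-cloudant `Cloudant` client provides; plain dictionaries stand in for it here so the example runs offline.

```python
def fetch_recommendations(client, meta_db_name, user_id):
    """Follow the metadata pointer to the latest recommendations database."""
    meta_doc = client[meta_db_name]["recommendation_metadata"]
    latest_db = client[meta_doc["latest_db"]]
    # Recommendation documents are keyed by the user id as a string.
    return latest_db[str(user_id)]["recommendations"]


# Offline stand-in for a Cloudant account: {database name: {doc id: doc}}.
# Database names and contents are made up for illustration.
fake_client = {
    "recommendation_meta": {
        "recommendation_metadata": {"latest_db": "recommendationdb_2"},
    },
    "recommendationdb_1": {"42": {"recommendations": ["old"]}},
    "recommendationdb_2": {"42": {"recommendations": ["new"]}},
}

print(fetch_recommendations(fake_client, "recommendation_meta", 42))
```

Because the client resolves the database name on every lookup, switching to a new set of recommendations only requires updating the single metadata document.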

In [6]:
from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel, Rating

import json
import numpy as np

# we use the cloudant python library to save the recommendations
from cloudant.client import Cloudant
from cloudant.adapters import Replay429Adapter

class CloudantMovieRecommender:
    
    def __init__(self, sc):
        self.sc = sc
    
    def train(self, sourceDB):
                      
        info("Starting load from Cloudant: ", ts())

        dfReader = sqlContext.read.format("com.cloudant.spark")
        dfReader.option("cloudant.host", sourceDB.host)
        
        if sourceDB.username:
            dfReader.option("cloudant.username", sourceDB.username)
            
        if sourceDB.password:
            dfReader.option("cloudant.password", sourceDB.password)
            
        df = dfReader.load(sourceDB.database).cache()

        info("Finished load from Cloudant: ", ts())
        info("Found", df.count(), "records in Cloudant")
        
        # convert cloudant docs into Rating objects
        def make_rating(row):
            (user_id, prod_id) = row[0].split('/')
            user_id = int(user_id.replace('user_', ''))
            prod_id = int(prod_id.replace('movie_', ''))

            rating = float(row[2])
            return Rating(user_id, prod_id, rating)
        
        # go through .rdd so this also works where DataFrame.map is unavailable (Spark 2.x)
        ratings = df.rdd.map(make_rating)

        rank = 50
        numIterations = 20
        lambdaParam = 0.1

        info("Starting train model: ", ts())
        self.model = ALS.train(ratings, rank, numIterations, lambdaParam)
        info("Finished train model: ", ts())
        
    def get_top_recommendations(self):
        info("Starting __get_top_recommendations: ", ts())
        df = self.model.recommendProductsForUsers(10).toDF()
        df.cache()
        info("Finished __get_top_recommendations: ", ts())
        return df
        
    def del_old_recommendationdbs(self, cloudant_client, db_name_prefix):
        dbs_to_del = cloudant_client.all_dbs()

        # only delete dbs we are using for recommendations
        dbs_to_del = [db for db in dbs_to_del if db.startswith(db_name_prefix + '_') ]

        # ensure the list is in timestamp order
        dbs_to_del.sort()

        # keep the last 5 dbs and delete the rest
        for db in dbs_to_del[:-5]:
            cloudant_client.delete_database(db)
            info("Deleted old recommendations db", db)
            
    def update_meta_document(self, cloudant_client, meta_db_name, latest_db_name):
        
        meta_db = cloudant_client[meta_db_name]
        
        from datetime import datetime
        ts = datetime.utcnow().isoformat()

        try:
            # update doc if exists
            meta_doc = meta_db['recommendation_metadata']
            meta_doc['latest_db'] = latest_db_name
            meta_doc['timestamp_utc'] = ts
            meta_doc.save()
            info("Updated recommendationdb metadata record with latest_db", latest_db_name, meta_doc)
        except KeyError:
            # create a new doc
            data = {
                '_id': 'recommendation_metadata',
                'latest_db': latest_db_name,
                'timestamp_utc': ts,
                }
            meta_doc = meta_db.create_document(data)
            meta_doc.save()
            
            if meta_doc.exists():
                info("Saved recommendationdb metadata record", str(data))
                
        # save product features to enable later generation of Vt
        # see: http://stackoverflow.com/questions/41537470/als-model-how-to-generate-full-u-vt-v
        pf = self.model.productFeatures().sortByKey()

        # pf is already sorted by product id, so keys and values stay aligned
        pf_keys = json.dumps(pf.keys().collect())
        pf_vals = json.dumps(pf.map(lambda x: list(x[1])).collect())
        
        # pf_keys/pf_vals exceed the 1 MB document size limit
        # so we save them as attachments
        
        meta_doc.put_attachment(
            attachment='product_feature_keys', 
            content_type='application/json', 
            data=pf_keys
        )

        meta_doc.put_attachment(
            attachment='product_feature_vals', 
            content_type='application/json', 
            data=pf_vals
        )
    
    def create_recommendationdb(self, cloudant_client, db_name_prefix):
        # create a database for recommendations, named <prefix>_<unix timestamp>
        import time
        db_name = db_name_prefix + '_' + str(int(time.time()))
        
        db = cloudant_client.create_database(db_name)
        info("Created new recommendations db", db_name)
        return db
        
    def save_recommendations(self, destDB):
        # use this instance rather than the global movieRecommender
        df = self.get_top_recommendations()
        
        cloudant_client = Cloudant(
                                destDB.username,
                                destDB.password,
                                account=destDB.username, 
                                adapter=Replay429Adapter(retries=10, initialBackoff=1)
                                )
        cloudant_client.connect()
        self.del_old_recommendationdbs(cloudant_client, destDB.database)
        recommendations_db = self.create_recommendationdb(cloudant_client, destDB.database)

        # reformat data for saving
        docs = df.rdd.map(lambda x: {'_id':str(x[0]), 'recommendations':x[1]}).collect()
        
        # we could hit Cloudant resource limits if we tried to save all docs in one request
        # so we save them in smaller chunks
        
        for i in range(0, len(docs), 100):
            chunk = docs[i:i + 100]
            recommendations_db.bulk_docs(chunk) # TODO check for errors saving the chunk
            info("Saved recommendations chunk", i, ts())
        
        self.update_meta_document(cloudant_client, destDB.database, recommendations_db.database_name)
        
        info("Saved recommendations to: ", recommendations_db.database_name, ts())

        cloudant_client.disconnect()
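
The `TODO` in `save_recommendations` could be addressed along these lines. This is a sketch, not the notebook's implementation: `save_in_chunks` is a hypothetical helper, and it assumes `bulk_docs` returns one result dict per document, with an `error` key present for documents that failed (the shape of CouchDB's `_bulk_docs` response). A stub database is used so the example runs without a Cloudant account.

```python
def save_in_chunks(db, docs, chunk_size=100):
    """Save docs in chunks; return the per-document results that report an error."""
    failures = []
    for i in range(0, len(docs), chunk_size):
        results = db.bulk_docs(docs[i:i + chunk_size])
        failures.extend(r for r in results if "error" in r)
    return failures


class StubDatabase(object):
    """Hypothetical stand-in that rejects any document whose _id is 'bad'."""

    def __init__(self):
        self.calls = 0

    def bulk_docs(self, docs):
        self.calls += 1
        return [
            {"id": d["_id"], "error": "conflict", "reason": "stub failure"}
            if d["_id"] == "bad"
            else {"id": d["_id"], "ok": True, "rev": "1-stub"}
            for d in docs
        ]


docs = [{"_id": str(n)} for n in range(250)] + [{"_id": "bad"}]
db = StubDatabase()
failed = save_in_chunks(db, docs)
print([f["id"] for f in failed])
```

Collecting failures instead of raising on the first one lets the caller decide whether to retry the failed documents or abort before the metadata record is updated.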

In [7]:
sourceDB = CloudantConfig(
                    json_file='cloudant_credentials.json', 
                    database="ratingdb"
                    )

destDB = CloudantConfig(
                    json_file='cloudant_credentials.json', 
                    database="recommendationdb", 
                    )

import traceback
try:
    movieRecommender = CloudantMovieRecommender(sc)
    movieRecommender.train(sourceDB)
    movieRecommender.save_recommendations(destDB)
except Exception as e:
    error(str(e), traceback.format_exc(), ts())
    # a bare raise re-raises the exception with its original traceback
    raise

('Starting load from Cloudant: ', '2017-01-13 07:35:29 CST')
('Finished load from Cloudant: ', '2017-01-13 07:44:48 CST')
('Found', 1000003, 'records in Cloudant')
('Starting train model: ', '2017-01-13 07:49:05 CST')
('Finished train model: ', '2017-01-13 07:50:41 CST')
('Starting __get_top_recommendations: ', '2017-01-13 07:50:41 CST')
('Finished __get_top_recommendations: ', '2017-01-13 07:50:47 CST')
('Deleted old recommendations db', u'recommendationdb_1484258805')
('Created new recommendations db', 'recommendationdb_1484315447')
('Saved recommendations chunk', 0, '2017-01-13 07:50:52 CST')
('Saved recommendations chunk', 100, '2017-01-13 07:50:52 CST')
('Saved recommendations chunk', 200, '2017-01-13 07:50:52 CST')
('Saved recommendations chunk', 300, '2017-01-13 07:50:52 CST')
('Saved recommendations chunk', 400, '2017-01-13 07:50:52 CST')
('Saved recommendations chunk', 500, '2017-01-13 07:50:52 CST')
('Saved recommendations chunk', 600, '2017-01-13 07:50:52 CST')
('Saved recom
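
The 'Deleted old recommendations db' line in the output above comes from the pruning step. The selection logic can be tried in isolation as below. `select_dbs_to_delete` is a hypothetical helper that mirrors `del_old_recommendationdbs` without contacting Cloudant. One assumption differs from the notebook: it sorts by the numeric timestamp suffix rather than as strings, which gives the same order as long as every suffix has the same number of digits. It also assumes every database matching the prefix ends in an integer. Apart from the two names taken from the output above, the database names are made up.

```python
def select_dbs_to_delete(all_dbs, db_name_prefix, keep=5):
    """Return the recommendation databases that fall outside the newest `keep`."""
    prefix = db_name_prefix + "_"
    candidates = [db for db in all_dbs if db.startswith(prefix)]
    # Sort oldest first by the Unix timestamp that follows the prefix.
    candidates.sort(key=lambda name: int(name[len(prefix):]))
    # Never use a negative count: fewer than `keep` databases means delete nothing.
    n_delete = max(len(candidates) - keep, 0)
    return candidates[:n_delete]


all_dbs = ["ratingdb", "recommendationdb"] + [
    "recommendationdb_%d" % t
    for t in (1484315447, 1484258805, 1484300000,
              1484310000, 1484290000, 1484280000)
]
print(select_dbs_to_delete(all_dbs, "recommendationdb"))
```

Databases that do not start with the prefix followed by an underscore, such as `ratingdb` or the bare `recommendationdb`, are never selected.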

## For debugging issues

In [8]:
# dump the latest kernel log
! cat $(ls -1 $HOME/logs/notebook/*pyspark* | sort -r | head -1)

sort: write failed: standard output: Broken pipe
sort: write error
/usr/local/src/bluemix_jupyter_bundle.v31/provision/pyspark_kernel_wrapper.sh /gpfs/fs01/user/s15a-8ea34840daaa3e-39ca506ba762/notebook/jupyter-rt/kernel-ca960e98-2971-44f9-9e2c-25f125822604.json spark160master
no extra config
load default config from : /gpfs/fs01/user/s15a-8ea34840daaa3e-39ca506ba762/notebook/spark-config/spark160master
-------- Environment for PySpark --------
APP_ENV_BM_DOMAIN=ng.bluemix.net
APP_ENV_CDSX_NOTEBOOKS_API=cdsx-notebooks-api.ng.bluemix.net
APP_ENV_ENVIRONMENT=prod
APP_ENV_IBM_ONLY_AUTH=false
APP_ENV_JUPYTER_TENANTS_API=cdsx-tenants-api.ng.bluemix.net
APP_ENV_NOTEBOOKS_JOB_MANAGER=cdsx-notebooks-job-manager.ng.bluemix.net
ATLAS_VERSION=3.10.2
_=/bin/printenv
BLUEMIX_RES_PLAN=s
BRUNEL_CONFIG=locjavascript=/data/jupyter2/23fd2775-6db5-4d40-b15a-8ea34840daaa/nbextensions/brunel_ext
CC_DISABLE_BIG_BUFFER_API=true
CDSX_APP_ENV_NOTEBOOKS_API_URL=https://cdsx-notebooks-api.ng.bluemix.net/v1/noteb
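
The `Broken pipe` messages in the output above appear because `head -1` exits after reading the first line while `sort` is still writing; the command still returns the expected file. The same lookup can be done in Python without a pipeline. This is a sketch: `latest_log_lines` is a hypothetical helper, and it assumes, as the shell command does, that the newest log sorts last by file name.

```python
import glob
import os


def latest_log_lines(pattern, needle):
    """Return lines containing `needle` from the last file (by name) matching `pattern`."""
    paths = sorted(glob.glob(pattern))
    if not paths:
        return []
    with open(paths[-1]) as f:
        return [line.rstrip("\n") for line in f if needle in line]


# Same location the shell commands in this section use.
pattern = os.path.join(os.path.expanduser("~"), "logs", "notebook", "*pyspark*")
for line in latest_log_lines(pattern, "CloudantRecommender"):
    print(line)
```

If no log file matches the pattern, the helper returns an empty list rather than raising.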

In [9]:
# look for our log output in the latest kernel log file
! grep 'CloudantRecommender' $(ls -1 $HOME/logs/notebook/*pyspark* | sort -r | head -1)

sort: write failed: standard output: Broken pipe
sort: write error
17/01/13 07:35:29 INFO CloudantRecommender: [Starting load from Cloudant: , 2017-01-13 07:35:29 CST]
17/01/13 07:44:48 INFO CloudantRecommender: [Finished load from Cloudant: , 2017-01-13 07:44:48 CST]
17/01/13 07:49:05 INFO CloudantRecommender: [Found, 1000003, records in Cloudant]
17/01/13 07:49:05 INFO CloudantRecommender: [Starting train model: , 2017-01-13 07:49:05 CST]
17/01/13 07:50:41 INFO CloudantRecommender: [Finished train model: , 2017-01-13 07:50:41 CST]
17/01/13 07:50:41 INFO CloudantRecommender: [Starting __get_top_recommendations: , 2017-01-13 07:50:41 CST]
17/01/13 07:50:47 INFO CloudantRecommender: [Finished __get_top_recommendations: , 2017-01-13 07:50:47 CST]
17/01/13 07:50:47 INFO CloudantRecommender: [Deleted old recommendations db, recommendationdb_1484258805]
17/01/13 07:50:48 INFO CloudantRecommender: [Created new recommendations db, recommendationdb_1484315447]
17/01/13 07:50:52 INFO

In [10]:
# look for our log output in all kernel log files
! grep 'CloudantRecommender' $HOME/logs/notebook/*pyspark* 

/gpfs/fs01/user/s15a-8ea34840daaa3e-39ca506ba762/logs/notebook/kernel-pyspark-20170105_164844.log:17/01/05 10:49:08 INFO CloudantRecommender: [Starting load from Cloudant: , 2017-01-05 10:49:08]
/gpfs/fs01/user/s15a-8ea34840daaa3e-39ca506ba762/logs/notebook/kernel-pyspark-20170105_164844.log:17/01/05 10:53:21 INFO CloudantRecommender: [Finished load from Cloudant: , 2017-01-05 10:53:21]
/gpfs/fs01/user/s15a-8ea34840daaa3e-39ca506ba762/logs/notebook/kernel-pyspark-20170105_164844.log:17/01/05 10:56:03 INFO CloudantRecommender: [Found, 1000000, records in Cloudant]
/gpfs/fs01/user/s15a-8ea34840daaa3e-39ca506ba762/logs/notebook/kernel-pyspark-20170105_164844.log:17/01/05 10:56:03 INFO CloudantRecommender: [Starting train model: , 2017-01-05 10:56:03]
/gpfs/fs01/user/s15a-8ea34840daaa3e-39ca506ba762/logs/notebook/kernel-pyspark-20170105_164844.log:17/01/05 10:56:34 INFO CloudantRecommender: [Finished train model: , 2017-01-05 10:56:34]
/gpfs/fs01/user/s15a-8ea34840daaa3e-39ca506ba762/logs/

In [11]:
! ls $HOME/logs/notebook/*pyspark*

/gpfs/fs01/user/s15a-8ea34840daaa3e-39ca506ba762/logs/notebook/kernel-pyspark-20170105_164844.log
/gpfs/fs01/user/s15a-8ea34840daaa3e-39ca506ba762/logs/notebook/kernel-pyspark-20170105_170001.log
/gpfs/fs01/user/s15a-8ea34840daaa3e-39ca506ba762/logs/notebook/kernel-pyspark-20170105_170634.log
/gpfs/fs01/user/s15a-8ea34840daaa3e-39ca506ba762/logs/notebook/kernel-pyspark-20170105_180001.log
/gpfs/fs01/user/s15a-8ea34840daaa3e-39ca506ba762/logs/notebook/kernel-pyspark-20170105_185244.log
/gpfs/fs01/user/s15a-8ea34840daaa3e-39ca506ba762/logs/notebook/kernel-pyspark-20170105_190001.log
/gpfs/fs01/user/s15a-8ea34840daaa3e-39ca506ba762/logs/notebook/kernel-pyspark-20170105_200001.log
/gpfs/fs01/user/s15a-8ea34840daaa3e-39ca506ba762/logs/notebook/kernel-pyspark-20170105_210002.log
/gpfs/fs01/user/s15a-8ea34840daaa3e-39ca506ba762/logs/notebook/kernel-pyspark-20170105_212445.log
/gpfs/fs01/user/s15a-8ea34840daaa3e-39ca506ba762/logs/notebook/kernel-pyspark-20170105_230004.log
/gpfs/fs01

In [12]:
! grep 'Cloudant' $(ls -1 $HOME/logs/notebook/*pyspark* | sort -r | head -1)

sort: write failed: standard output: Broken pipe
sort: write error
17/01/13 07:35:29 INFO CloudantRecommender: [Starting load from Cloudant: , 2017-01-13 07:35:29 CST]
[WARN] [01/13/2017 07:35:42.111] [Thread-7] [JsonStoreDataAccess(akka://CloudantSpark-f447b13d-5e6c-4d6d-9515-886c76a9fcd1)] Loading data from Cloudant using query: https://9aefd1f0-d288-4666-a12f-abd93ee724fc-bluemix.cloudant.com/ratingdb/_all_docs?limit=1
17/01/13 07:44:48 INFO CloudantRecommender: [Finished load from Cloudant: , 2017-01-13 07:44:48 CST]
17/01/13 07:49:05 INFO CloudantRecommender: [Found, 1000003, records in Cloudant]
17/01/13 07:49:05 INFO CloudantRecommender: [Starting train model: , 2017-01-13 07:49:05 CST]
17/01/13 07:50:41 INFO CloudantRecommender: [Finished train model: , 2017-01-13 07:50:41 CST]
17/01/13 07:50:41 INFO CloudantRecommender: [Starting __get_top_recommendations: , 2017-01-13 07:50:41 CST]
17/01/13 07:50:47 INFO CloudantRecommender: [Finished __get_top_recommendations: , 201