# In [2]:
import itertools
import sys
from math import sqrt
from operator import add
from os.path import join

import numpy as np
from pyspark import SparkConf, SparkContext
from pyspark.mllib.recommendation import ALS

def parsePurchases(line):
    """
    Parse one purchases record of the form user_Id::sku_Id::purchases::timestamp.

    Returns a pair (key, (user_Id, sku_Id, purchases)) where key is the
    timestamp modulo 10, the two ids are ints and purchases is a float.
    Surrounding whitespace (including the trailing newline) is ignored.
    """
    fields = line.strip().split("::")
    # int() rather than the Python 2-only long(): Python 3 has no long(), and
    # Python 2's int() promotes to long automatically for large values.
    return int(fields[3]) % 10, (int(fields[0]), int(fields[1]), float(fields[2]))

def parseSku(line):
    """
    Parse one sku record of the form skuId::sku_Title.

    Returns a pair (skuId, sku_Title) with the id converted to int and the
    title left as a string. Surrounding whitespace on the line is ignored.
    """
    parts = line.strip().split("::")
    sku_id = int(parts[0])
    sku_title = parts[1]
    return sku_id, sku_title

if __name__ == "__main__":
    # Driver: trains ALS models on the purchases data set, picks the best one
    # by validation RMSE, reports its test RMSE against a mean-value baseline,
    # and prints the top recommendations for the personal purchases file.
    #
    # NOTE(review): loadPurchases() and computeRmse() are called below but are
    # not defined in the visible part of this file; they must be provided
    # elsewhere before this script can run.
    if len(sys.argv) != 3:
        print("Usage: /path/to/spark/bin/spark-submit --driver-memory 2g " + \
          "recommendaion.py DataDir personalPurchasesFile")
        sys.exit(1)

    # set up environment
    conf = SparkConf() \
      .setAppName("Reccomendation") \
      .set("spark.executor.memory", "2g")
    sc = SparkContext(conf=conf)

    # try/finally so the SparkContext is stopped even when a step below fails
    try:
        # load personal purchases
        myPurchases = loadPurchases(sys.argv[2])
        myPurchasesRDD = sc.parallelize(myPurchases, 1)

        # load purchases and product titles
        PurchasesHomeDir = sys.argv[1]

        # purchases is an RDD of (last digit of timestamp, (user_Id, sku_Id, purchases))
        purchases = sc.textFile(join(PurchasesHomeDir, "purchases.dat")).map(parsePurchases)

        # sku is a dict mapping sku_Id -> sku_Title, collected to the driver
        sku = dict(sc.textFile(join(PurchasesHomeDir, "sku.dat")).map(parseSku).collect())

        numPurchases = purchases.count()
        numUsers = purchases.values().map(lambda r: r[0]).distinct().count()
        numSkus = purchases.values().map(lambda r: r[1]).distinct().count()

        print("Got %d ratings from %d users on %d movies."
              % (numPurchases, numUsers, numSkus))

        # split purchases into train (60%), validation (20%), and test (20%)
        # based on the last digit of the timestamp, add myPurchases to train,
        # and cache them

        # training, validation, test are all RDDs of (user_Id, sku_Id, purchases)
        numPartitions = 4
        training = purchases.filter(lambda x: x[0] < 6) \
          .values() \
          .union(myPurchasesRDD) \
          .repartition(numPartitions) \
          .cache()

        validation = purchases.filter(lambda x: x[0] >= 6 and x[0] < 8) \
          .values() \
          .repartition(numPartitions) \
          .cache()

        test = purchases.filter(lambda x: x[0] >= 8).values().cache()

        numTraining = training.count()
        numValidation = validation.count()
        numTest = test.count()

        print("Training: %d, validation: %d, test: %d"
              % (numTraining, numValidation, numTest))

        # train models and evaluate them on the validation set
        ranks = [8, 12]
        lambdas = [0.1, 10.0]
        numIters = [10, 20]
        bestModel = None
        bestValidationRmse = float("inf")
        bestRank = 0
        bestLambda = -1.0
        bestNumIter = -1

        # grid search over every (rank, lambda, iterations) combination
        for rank, lmbda, numIter in itertools.product(ranks, lambdas, numIters):
            model = ALS.train(training, rank, numIter, lmbda)
            validationRmse = computeRmse(model, validation, numValidation)
            print("RMSE (validation) = %f for the model trained with "
                  "rank = %d, lambda = %.1f, and numIter = %d."
                  % (validationRmse, rank, lmbda, numIter))
            if validationRmse < bestValidationRmse:
                bestModel = model
                bestValidationRmse = validationRmse
                bestRank = rank
                bestLambda = lmbda
                bestNumIter = numIter

        # evaluate the best model on the test set
        testRmse = computeRmse(bestModel, test, numTest)

        print("The best model was trained with rank = %d and lambda = %.1f, "
              "and numIter = %d, and its RMSE on the test set is %f."
              % (bestRank, bestLambda, bestNumIter, testRmse))

        # compare the best model with a naive baseline that always returns
        # the mean purchases value of the training + validation data
        meanPurchase = training.union(validation).map(lambda x: x[2]).mean()
        baselineRmse = sqrt(test.map(lambda x: (meanPurchase - x[2]) ** 2).reduce(add) / numTest)
        improvement = (baselineRmse - testRmse) / baselineRmse * 100
        print("The best model improves the baseline by %.2f%%." % improvement)

        # make personalized recommendations: score every sku the user has not
        # purchased yet (the personal purchases are scored as user id 0) and
        # keep the 50 highest predictions
        myRatedSkuIds = {x[1] for x in myPurchases}
        candidates = sc.parallelize([m for m in sku if m not in myRatedSkuIds])
        predictions = bestModel.predictAll(candidates.map(lambda x: (0, x))).collect()
        recommendations = sorted(predictions, key=lambda x: x[2], reverse=True)[:50]

        print("Movies recommended for you:")
        for position, recommendation in enumerate(recommendations, 1):
            entry = "%2d: %s" % (position, sku[recommendation[1]])
            # drop non-ASCII characters, then decode back so a text string
            # (not a bytes repr) is printed
            print(entry.encode('ascii', 'ignore').decode('ascii'))
    finally:
        # clean up
        sc.stop()

# ImportError:
# Importing the multiarray numpy extension module failed.  Most
# likely you are trying to import a failed build of numpy.
# If you're working with a numpy git repo, try `git clean -xdf` (removes all
# files not under version control).  Otherwise reinstall numpy.
