In [1]:
import numpy as np
import pandas as pd
import scipy.stats as sts


In [8]:
# ran out of RAM below
# df_ll = pd.read_csv('processed_data/df_ll.csv')

# df = df_ll[['LOAN_ID',"lender_v"]]

# from scipy.sparse import csr_matrix
# df_sparse = csr_matrix( (np.ones(df.values.shape[0]), (df.values[:,0], df.values[:,1])), shape=(1500000,1500000))

# from scipy.sparse.linalg import svds

# u,sigma,vt = svds(df_sparse,k=25)

# sigma = np.diag(sigma)

# temp = np.dot(u,sigma)

# pred = np.dot(temp,vt)

In [7]:
import pyspark as ps

spark = (ps.sql.SparkSession.builder 
        .master("local[4]") 
        .appName("kiva") 
        .getOrCreate()
        )

sc = spark.sparkContext

In [9]:
spark

In [10]:
df_ll = spark.read.csv('processed_data/df_ll.csv',header=True)

In [55]:
# df_ll = df_ll.select('LOAN_ID','lender_v')
# cast("float")
df = df_ll.select(df_ll.LOAN_ID.cast("int"),df_ll.lender_v.cast("int"))

In [56]:
df.show(2)

+-------+--------+
|LOAN_ID|lender_v|
+-------+--------+
| 483693|  970524|
| 483693| 1153379|
+-------+--------+
only showing top 2 rows



In [57]:
from pyspark.sql.functions import lit

df_with_one = df.withColumn("value", lit(1))
df_with_one.show(4)

+-------+--------+-----+
|LOAN_ID|lender_v|value|
+-------+--------+-----+
| 483693|  970524|    1|
| 483693| 1153379|    1|
| 483693|  187221|    1|
| 483693|  758772|    1|
+-------+--------+-----+
only showing top 4 rows



In [58]:
(sample,rest) = df_with_one.randomSplit([1.0,9.0],17)
# 10% samples

In [14]:
sample.count()

2830799

In [9]:
splits[1].count()

25463132

In [59]:
(test,train) = sample.randomSplit([1.0,2.0],17) #train, test split

### spark RDD - ALS model

In [16]:
from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel, Rating

In [17]:
train.show(3) #train

+-------+--------+-----+
|LOAN_ID|lender_v|value|
+-------+--------+-----+
|1000000|  255491|    1|
|1000006| 1159371|    1|
|1000017|  775140|    1|
+-------+--------+-----+
only showing top 3 rows



In [19]:
rank = 10
numIterations = 5
model = ALS.trainImplicit(train, rank, numIterations, alpha=0.01)

In [20]:
model.productFeatures().first()

(0,
 array('d', [-1.3540508991827238e-10, -4.942654618922404e-11, 1.5867716862683068e-10, -1.184339709636717e-10, 1.7544958541559907e-10, -3.040698595691005e-11, 1.4020448864393575e-11, -2.80317522666218e-11, 9.058524763627673e-11, -3.3328761972484244e-10]))

In [21]:
model.userFeatures().first()

(84,
 array('d', [4.773510227096267e-07, -2.0603437178579043e-07, -2.4050004299169814e-07, 5.896069410482596e-07, 5.543909651350987e-07, -5.338889650374767e-07, -1.2012419858820067e-07, 3.049543693123269e-07, -8.583297272934942e-08, -8.86436339442298e-07]))

In [22]:
# For Product X, Find N Users to Sell To
model.recommendUsers(112954,10)

[Rating(user=353877, product=112954, rating=1.2713641794018589e-15),
 Rating(user=364491, product=112954, rating=8.591934221515829e-16),
 Rating(user=294866, product=112954, rating=8.580264815456912e-16),
 Rating(user=360515, product=112954, rating=8.567132818499773e-16),
 Rating(user=360850, product=112954, rating=8.546119267452279e-16),
 Rating(user=313080, product=112954, rating=8.50040929337948e-16),
 Rating(user=296696, product=112954, rating=8.463275689970369e-16),
 Rating(user=1069919, product=112954, rating=5.123664697819945e-16),
 Rating(user=870655, product=112954, rating=5.064082531279886e-16),
 Rating(user=314583, product=112954, rating=5.064082531279886e-16)]

In [51]:
# For User Y Find N Products to Promote
model.recommendProducts(1000000,10)

[Rating(user=1000000, product=127737, rating=0.00035109201090663725),
 Rating(user=1000000, product=1324264, rating=0.00011416923147043728),
 Rating(user=1000000, product=1351485, rating=5.280993206730332e-05),
 Rating(user=1000000, product=497388, rating=2.455624244660106e-05),
 Rating(user=1000000, product=156079, rating=2.0856600996067862e-05),
 Rating(user=1000000, product=1292568, rating=9.770872143429366e-06),
 Rating(user=1000000, product=502744, rating=8.801814963218335e-06),
 Rating(user=1000000, product=945372, rating=7.95519832448031e-06),
 Rating(user=1000000, product=374043, rating=7.874760479057205e-06),
 Rating(user=1000000, product=1335018, rating=7.793464551947238e-06)]

In [47]:
#Predict Single Product for Single User
model.predict(1000000,255491) #an actual 1 pair.

1.509191707657275e-07

In [25]:
# Predict Multi Users and Multi Products
predictions = model.predictAll(train.rdd.map(lambda r: (r[0], r[1])))

In [26]:
true_reorg = train.rdd.map(lambda x:((x[0],x[1]), x[2]))
pred_reorg = predictions.map(lambda x:((x[0],x[1]), x[2]))

In [27]:
Do the actual join
true_pred = true_reorg.join(pred_reorg)

In [28]:
# Need to be able to square root the Mean-Squared Error
from math import sqrt

MSE = true_pred.map(lambda r: (r[1][0] - r[1][1])**2).mean()
RMSE = sqrt(MSE)


In [29]:
RMSE

0.0

In [54]:
test_pred = model.predictAll(test.rdd.map(lambda r: (r[0], r[1])))

In [55]:
true_reorg = test.rdd.map(lambda x:((x[0],x[1]), x[2]))
pred_reorg = test_pred.map(lambda x:((x[0],x[1]), x[2]))

In [56]:
#Do the actual join
true_pred = true_reorg.join(pred_reorg)

In [57]:
MSE = true_pred.map(lambda r: (r[1][0] - r[1][1])**2).mean()
RMSE = sqrt(MSE)

In [58]:
RMSE

0.0

### spark df - ALS model

In [42]:
from pyspark.ml.recommendation import ALS

In [43]:
als = ALS(maxIter=5, regParam=0.01, implicitPrefs=True,userCol="lender_v", itemCol="LOAN_ID", ratingCol="value")

In [52]:
train.dtypes

[('LOAN_ID', 'string'), ('lender_v', 'string'), ('value', 'int')]

In [60]:
model = als.fit(train)

In [61]:
predictions = model.transform(test)

In [63]:
from pyspark.ml.evaluation import RegressionEvaluator

In [64]:
evaluator = RegressionEvaluator(metricName="rmse", labelCol="value",
                                predictionCol="prediction")

In [65]:
rmse = evaluator.evaluate(predictions)
print("Root-mean-square error = " + str(rmse))


Root-mean-square error = nan


Same as RDD ALS model - rmse is not a good metric to evaluate the performance. Because there could be 1m zeros and a few ones. Overall the rmse will just be zeros.(?)

solutions: -- build a metrics:   
what is the top 5 loans the system is recommending?  
if the true lender-loan pair is in the the recommendations? Yes - 1, No - 0;  
The mean of the column will be the metrics. The higher the number, the better the performance.

In [66]:
# Generate top 10 movie recommendations for each user
userRecs = model.recommendForAllUsers(5)

In [None]:
userRecs.show(2)

In [45]:
train.show()

+-------+--------+-----+
|LOAN_ID|lender_v|value|
+-------+--------+-----+
|1000000|  255491|    1|
|1000006| 1159371|    1|
|1000017|  775140|    1|
|1000026|   51878|    1|
|1000044|  409840|    1|
|1000061| 1039961|    1|
|1000072| 1222640|    1|
|1000072| 1365845|    1|
|1000072|   23408|    1|
|1000072|  934438|    1|
|1000074| 1114645|    1|
|1000074| 1131616|    1|
|1000074|  965558|    1|
|1000078| 1199460|    1|
|1000078| 1253941|    1|
|1000078| 1380069|    1|
|1000078|  284135|    1|
|1000090| 1365845|    1|
|1000103| 1002601|    1|
|1000103| 1029855|    1|
+-------+--------+-----+
only showing top 20 rows



In [30]:
# l = predictions.collect()

### spark - SVD model 

In [30]:
from pyspark.mllib.linalg.distributed import CoordinateMatrix, MatrixEntry

In [31]:
mat = CoordinateMatrix(train.rdd.map(lambda r: MatrixEntry(r.LOAN_ID, r.lender_v,r.value)))

In [33]:
matrow = mat.toRowMatrix()

In [35]:
svd = matrow.computeSVD(5, computeU=True)

Py4JJavaError: An error occurred while calling o195.computeSVD.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 3 in stage 333.0 failed 1 times, most recent failure: Lost task 3.0 in stage 333.0 (TID 202, localhost, executor driver): java.lang.IllegalArgumentException: requirement failed: Found duplicate indices: 1069744.
	at scala.Predef$.require(Predef.scala:224)
	at org.apache.spark.mllib.linalg.Vectors$$anonfun$sparse$1.apply$mcVI$sp(Vectors.scala:334)
	at org.apache.spark.mllib.linalg.Vectors$$anonfun$sparse$1.apply(Vectors.scala:333)
	at org.apache.spark.mllib.linalg.Vectors$$anonfun$sparse$1.apply(Vectors.scala:333)
	at scala.collection.immutable.List.foreach(List.scala:381)
	at org.apache.spark.mllib.linalg.Vectors$.sparse(Vectors.scala:333)
	at org.apache.spark.mllib.linalg.distributed.CoordinateMatrix$$anonfun$2.apply(CoordinateMatrix.scala:90)
	at org.apache.spark.mllib.linalg.distributed.CoordinateMatrix$$anonfun$2.apply(CoordinateMatrix.scala:89)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
	at scala.collection.Iterator$class.foreach(Iterator.scala:893)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
	at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
	at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1336)
	at scala.collection.TraversableOnce$class.aggregate(TraversableOnce.scala:214)
	at scala.collection.AbstractIterator.aggregate(Iterator.scala:1336)
	at org.apache.spark.rdd.RDD$$anonfun$treeAggregate$1$$anonfun$23.apply(RDD.scala:1139)
	at org.apache.spark.rdd.RDD$$anonfun$treeAggregate$1$$anonfun$23.apply(RDD.scala:1139)
	at org.apache.spark.rdd.RDD$$anonfun$treeAggregate$1$$anonfun$24.apply(RDD.scala:1140)
	at org.apache.spark.rdd.RDD$$anonfun$treeAggregate$1$$anonfun$24.apply(RDD.scala:1140)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:800)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:800)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
	at org.apache.spark.scheduler.Task.run(Task.scala:109)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1599)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1587)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1586)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1586)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1820)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1769)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1758)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2027)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2124)
	at org.apache.spark.rdd.RDD$$anonfun$fold$1.apply(RDD.scala:1092)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
	at org.apache.spark.rdd.RDD.fold(RDD.scala:1086)
	at org.apache.spark.rdd.RDD$$anonfun$treeAggregate$1.apply(RDD.scala:1155)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
	at org.apache.spark.rdd.RDD.treeAggregate(RDD.scala:1131)
	at org.apache.spark.mllib.linalg.distributed.RowMatrix.multiplyGramianMatrixBy(RowMatrix.scala:93)
	at org.apache.spark.mllib.linalg.distributed.RowMatrix$$anonfun$16.apply(RowMatrix.scala:268)
	at org.apache.spark.mllib.linalg.distributed.RowMatrix$$anonfun$16.apply(RowMatrix.scala:268)
	at org.apache.spark.mllib.linalg.EigenValueDecomposition$.symmetricEigs(EigenValueDecomposition.scala:106)
	at org.apache.spark.mllib.linalg.distributed.RowMatrix.computeSVD(RowMatrix.scala:268)
	at org.apache.spark.mllib.linalg.distributed.RowMatrix.computeSVD(RowMatrix.scala:194)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:214)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalArgumentException: requirement failed: Found duplicate indices: 1069744.
	at scala.Predef$.require(Predef.scala:224)
	at org.apache.spark.mllib.linalg.Vectors$$anonfun$sparse$1.apply$mcVI$sp(Vectors.scala:334)
	at org.apache.spark.mllib.linalg.Vectors$$anonfun$sparse$1.apply(Vectors.scala:333)
	at org.apache.spark.mllib.linalg.Vectors$$anonfun$sparse$1.apply(Vectors.scala:333)
	at scala.collection.immutable.List.foreach(List.scala:381)
	at org.apache.spark.mllib.linalg.Vectors$.sparse(Vectors.scala:333)
	at org.apache.spark.mllib.linalg.distributed.CoordinateMatrix$$anonfun$2.apply(CoordinateMatrix.scala:90)
	at org.apache.spark.mllib.linalg.distributed.CoordinateMatrix$$anonfun$2.apply(CoordinateMatrix.scala:89)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
	at scala.collection.Iterator$class.foreach(Iterator.scala:893)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
	at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
	at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1336)
	at scala.collection.TraversableOnce$class.aggregate(TraversableOnce.scala:214)
	at scala.collection.AbstractIterator.aggregate(Iterator.scala:1336)
	at org.apache.spark.rdd.RDD$$anonfun$treeAggregate$1$$anonfun$23.apply(RDD.scala:1139)
	at org.apache.spark.rdd.RDD$$anonfun$treeAggregate$1$$anonfun$23.apply(RDD.scala:1139)
	at org.apache.spark.rdd.RDD$$anonfun$treeAggregate$1$$anonfun$24.apply(RDD.scala:1140)
	at org.apache.spark.rdd.RDD$$anonfun$treeAggregate$1$$anonfun$24.apply(RDD.scala:1140)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:800)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:800)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
	at org.apache.spark.scheduler.Task.run(Task.scala:109)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more


In [None]:
U = svd.U       # The U factor is a RowMatrix.
s = svd.s       # The singular values are stored in a local dense vector.
V = svd.V       # The V factor is a local dense matrix.

In [None]:
# rows = sc.parallelize([
#     Vectors.sparse(5, {1: 1.0, 3: 7.0}),
#     Vectors.dense(2.0, 0.0, 3.0, 4.0, 5.0),
#     Vectors.dense(4.0, 0.0, 0.0, 6.0, 7.0)
# ])

# matmini = RowMatrix(rows)