In [1]:
val rawData = sc.textFile("ml-100k/u.data")

In [2]:
rawData.first()

196	242	3	881250949

In [3]:
// Split every line on tabs and keep only its first three fields; any later field is dropped.
val rawRatings = rawData.map { line =>
  line.split("\t").take(3)
}

In [5]:
import org.apache.spark.mllib.recommendation.ALS

In [8]:
import org.apache.spark.mllib.recommendation.Rating

In [9]:
// Turn each three-element string array into an MLlib Rating(user, product, rating).
// An array with any other length fails with a MatchError, as before.
val ratings = rawRatings.map { fields =>
  val Array(userField, movieField, ratingField) = fields
  Rating(userField.toInt, movieField.toInt, ratingField.toDouble)
}

In [10]:
ratings.first()

Rating(196,242,3.0)

### Training a model on the MovieLens dataset

In [11]:
// Train the ALS recommendation model; the arguments are named so the meaning of
// each numeric literal is visible at the call site.
val model = ALS.train(ratings, rank = 50, iterations = 10, lambda = 0.01)

In [12]:
model.userFeatures

users MapPartitionsRDD[210] at mapValues at ALS.scala:255

In [13]:
model.userFeatures.count

943

In [14]:
model.productFeatures.count

1682

In [15]:
val predictedRating = model.predict(789, 123)

In [16]:
predictedRating

2.039842264056634

In [17]:
// User to generate recommendations for.
val userId = 789
// Number of recommendations to request.
val K = 10
// Ask the trained model for the K products it scores highest for this user.
val topKRecs = model.recommendProducts(user = userId, num = K)

In [18]:
println(topKRecs.mkString("\n"))

Rating(789,530,5.976424168642172)
Rating(789,641,5.87802968106169)
Rating(789,182,5.86964485232568)
Rating(789,199,5.643635699341024)
Rating(789,526,5.632784329713259)
Rating(789,511,5.578748776142128)
Rating(789,211,5.564848267553379)
Rating(789,134,5.535656295377457)
Rating(789,179,5.511331563535278)
Rating(789,156,5.492505422932971)


In [20]:
val movies = sc.textFile("ml-100k/u.item")

In [22]:
// Build a driver-side map from movie id (first pipe-separated field) to title (second field).
val titles = movies.map { line =>
  val fields = line.split("\\|")
  (fields(0).toInt, fields(1))
}.collectAsMap()

In [23]:
// Fetch every rating made by the user the recommendations were generated for.
// Uses the previously declared `userId` instead of repeating the literal 789, so
// the "actual ratings" and "recommended" lists always describe the same user.
val moviesForUser = ratings.keyBy(_.user).lookup(userId)

In [24]:
println(moviesForUser.size)

33


In [25]:
// Print the user's ten highest-rated movies as (title, rating) pairs.
// Sorting on the negated rating yields descending order.
moviesForUser.sortBy(r => -r.rating).take(10).map { r =>
  (titles(r.product), r.rating)
}.foreach(println)

(Godfather, The (1972),5.0)
(Trainspotting (1996),5.0)
(Dead Man Walking (1995),5.0)
(Star Wars (1977),5.0)
(Swingers (1996),5.0)
(Leaving Las Vegas (1995),5.0)
(Bound (1996),5.0)
(Fargo (1996),5.0)
(Last Supper, The (1995),5.0)
(Private Parts (1997),4.0)


In [26]:
topKRecs.map(rating => (titles(rating.product), rating.rating)).foreach(println)

(Man Who Would Be King, The (1975),5.976424168642172)
(Paths of Glory (1957),5.87802968106169)
(GoodFellas (1990),5.86964485232568)
(Bridge on the River Kwai, The (1957),5.643635699341024)
(Ben-Hur (1959),5.632784329713259)
(Lawrence of Arabia (1962),5.578748776142128)
(M*A*S*H (1970),5.564848267553379)
(Citizen Kane (1941),5.535656295377457)
(Clockwork Orange, A (1971),5.511331563535278)
(Reservoir Dogs (1992),5.492505422932971)


In [27]:
import scala.io.Source

In [29]:
// Reduce each rating to a (user, product) pair and group those pairs by user id.
val userMovies = ratings.map(r => (r.user, r.product)).groupBy { case (user, _) => user }


In [36]:
val rawData = sc.textFile("train_noheader.tsv")

In [37]:
val records = rawData.map(line => line.split("\t"))

In [38]:
records.first()

Array("http://www.bloomberg.com/news/2010-12-23/ibm-predicts-holographic-calls-air-breathing-batteries-by-2015.html", "4042", "{""title"":""IBM Sees Holographic Calls Air Breathing Batteries ibm sees holographic calls, air-breathing batteries"",""body"":""A sign stands outside the International Business Machines Corp IBM Almaden Research Center campus in San Jose California Photographer Tony Avelar Bloomberg Buildings stand at the International Business Machines Corp IBM Almaden Research Center campus in the Santa Teresa Hills of San Jose California Photographer Tony Avelar Bloomberg By 2015 your mobile phone will project a 3 D image of anyone who calls and your laptop will be powered by kinetic energy At least that s what International Business Machines Corp sees...

In [39]:
import org.apache.spark.mllib.regression.LabeledPoint

In [40]:
import org.apache.spark.mllib.linalg.Vectors

In [41]:
// Convert each raw record into a LabeledPoint:
//  - double quotes are stripped from every field,
//  - the last column is parsed as the integer label,
//  - columns 4 up to (but excluding) the last are parsed as features, with "?" mapped to 0.0.
val data = records.map { fields =>
  val cleaned = fields.map(_.replaceAll("\"", ""))
  val lastIdx = fields.size - 1
  val label = cleaned(lastIdx).toInt
  val features = cleaned.slice(4, lastIdx).map {
    case "?" => 0.0
    case value => value.toDouble
  }
  LabeledPoint(label, Vectors.dense(features))
}

In [42]:
data.cache

MapPartitionsRDD[227] at map at <console>:33

In [44]:
val numData = data.count

In [45]:
// Same record parsing as `data`, but every negative feature value is replaced by 0.0.
// This is the dataset passed to NaiveBayes.train (presumably because that model
// needs non-negative features; confirm against the MLlib docs).
val nbData = records.map { fields =>
  val cleaned = fields.map(_.replaceAll("\"", ""))
  val lastIdx = fields.size - 1
  val label = cleaned(lastIdx).toInt
  val features = cleaned.slice(4, lastIdx).map { raw =>
    val parsed = if (raw == "?") 0.0 else raw.toDouble
    if (parsed < 0) 0.0 else parsed
  }
  LabeledPoint(label, Vectors.dense(features))
}

### Building Classification Model with Logistic regression

In [47]:
import org.apache.spark.mllib.classification.LogisticRegressionWithSGD
import org.apache.spark.mllib.classification.SVMWithSGD
import org.apache.spark.mllib.classification.NaiveBayes


In [48]:
import org.apache.spark.mllib.tree.DecisionTree
import org.apache.spark.mllib.tree.configuration.Algo
import org.apache.spark.mllib.tree.impurity.Entropy

In [49]:
val numIterations = 10
val maxTreeDepth = 5

### Training Logistic Regression

In [50]:
val lrModel = LogisticRegressionWithSGD.train(data, numIterations)

### Train Support Vector Machine with stochastic gradient descent (SGD)

In [51]:
val svmModel = SVMWithSGD.train(data, numIterations)

### Train Naive Bayes Model

In [52]:
val nbModel = NaiveBayes.train(nbData)

### Train Decision Tree

In [53]:
val dtModel = DecisionTree.train(data, Algo.Classification, Entropy, maxTreeDepth)

## Make Predictions

In [55]:
val dataPoint = data.first
val prediction = lrModel.predict(dataPoint.features)

In [56]:
val trueLabel = dataPoint.label

In [57]:
val predictions = lrModel.predict(data.map(lp => lp.features))
predictions.take(5)

Array(1.0, 1.0, 1.0, 1.0, 1.0)

### Evaluate the performance of the classification using ROC, F, Accuracy, Precision and Recall

In [59]:
// Count the points whose logistic-regression prediction equals the true label.
val lrTotalCorrect = data.map { point =>
  if (lrModel.predict(point.features) == point.label) 1 else 0
}.sum
// Accuracy = correct predictions / total records. Reuses the already-computed
// `numData` instead of launching another `data.count` job, matching how the
// SVM, naive Bayes and decision-tree accuracies are computed.
val lrAccuracy = lrTotalCorrect / numData

In [60]:
lrTotalCorrect

3806.0

In [61]:
lrAccuracy

0.5146720757268425

The logistic regression model reaches an accuracy of only about 51 percent, which is not very impressive.

In [62]:
// Number of points the SVM classifies correctly (1 per match, summed).
val svmTotalCorrect = data.map { case LabeledPoint(label, features) =>
  if (svmModel.predict(features) == label) 1 else 0
}.sum
// Number of points naive Bayes classifies correctly, evaluated on its own dataset.
val nbTotalCorrect = nbData.map { case LabeledPoint(label, features) =>
  if (nbModel.predict(features) == label) 1 else 0
}.sum

In [63]:
// Number of points the decision tree classifies correctly.
// The tree's output is thresholded at 0.5 to obtain a 0/1 class before comparing.
val dtTotalCorrect = data.map { case LabeledPoint(label, features) =>
  val predictedClass = if (dtModel.predict(features) > 0.5) 1 else 0
  if (predictedClass == label) 1 else 0
}.sum

In [64]:
val svmAccuracy = svmTotalCorrect / numData

In [67]:
svmAccuracy

0.5146720757268425

In [65]:
val nbAccuracy = nbTotalCorrect / numData

In [68]:
nbAccuracy

0.5803921568627451

In [66]:
val dtAccuracy = dtTotalCorrect / numData

In [69]:
dtAccuracy

0.6482758620689655

### Precision and recall

In [71]:
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
   // For the logistic-regression and SVM models, pair each prediction with its true
   // label and report (model class name, area under PR curve, area under ROC curve).
   // NOTE(review): the predictions echoed earlier in this notebook are 1.0 labels; if
   // predict returns hard class labels here rather than raw scores, these areas
   // describe a single operating point. Confirm whether clearThreshold() is wanted.
   val metrics = Seq(lrModel, svmModel).map { model =>
     val scoreAndLabels = data.map(point => (model.predict(point.features), point.label))
     // Local name differs from the outer `metrics` val so it is not shadowed.
     val binaryMetrics = new BinaryClassificationMetrics(scoreAndLabels)
     (model.getClass.getSimpleName, binaryMetrics.areaUnderPR, binaryMetrics.areaUnderROC)
   }

In [73]:
// PR / ROC areas for naive Bayes, evaluated on nbData. The prediction is mapped to
// 1.0 when it exceeds 0.5 and to 0.0 otherwise before being paired with the label.
// Kept as a one-element Seq so it can be concatenated with the other metric lists.
val nbMetrics = Seq(nbModel).map { model =>
  val scoreAndLabels = nbData.map { point =>
    val thresholded = if (model.predict(point.features) > 0.5) 1.0 else 0.0
    (thresholded, point.label)
  }
  val binaryMetrics = new BinaryClassificationMetrics(scoreAndLabels)
  (model.getClass.getSimpleName, binaryMetrics.areaUnderPR, binaryMetrics.areaUnderROC)
}

In [75]:
// PR / ROC areas for the decision tree. Its prediction is mapped to 1.0 when it
// exceeds 0.5 and to 0.0 otherwise before being paired with the label.
// Kept as a one-element Seq so it can be concatenated with the other metric lists.
val dtMetrics = Seq(dtModel).map { model =>
  val scoreAndLabels = data.map { point =>
    val thresholded = if (model.predict(point.features) > 0.5) 1.0 else 0.0
    (thresholded, point.label)
  }
  val binaryMetrics = new BinaryClassificationMetrics(scoreAndLabels)
  (model.getClass.getSimpleName, binaryMetrics.areaUnderPR, binaryMetrics.areaUnderROC)
}


In [76]:
// Combine the per-model results and print one line per model, with both areas
// expressed as percentages to four decimal places.
val allMetrics = metrics ++ nbMetrics ++ dtMetrics
for ((m, pr, roc) <- allMetrics) {
  println(f"$m, Area under PR: ${pr * 100.0}%2.4f%%, Area under ROC: ${roc * 100.0}%2.4f%%")
}

LogisticRegressionModel, Area under PR: 75.6759%, Area under ROC: 50.1418%
SVMModel, Area under PR: 75.6759%, Area under ROC: 50.1418%
NaiveBayesModel, Area under PR: 68.0851%, Area under ROC: 58.3559%
DecisionTreeModel, Area under PR: 74.3081%, Area under ROC: 64.8837%


### Standardize Features

In [77]:
import org.apache.spark.mllib.linalg.distributed.RowMatrix

In [78]:
val vectors = data.map(lp => lp.features)

In [79]:
val matrix = new RowMatrix(vectors)

In [80]:
val matrixSummary = matrix.computeColumnSummaryStatistics()

In [81]:
println(matrixSummary.mean)

[0.41225805299526774,2.76182319198661,0.46823047328613876,0.21407992638350257,0.0920623607189991,0.04926216043908034,2.255103452212025,-0.10375042752143329,0.0,0.05642274498417848,0.02123056118999324,0.23377817665490225,0.2757090373659231,0.615551048005409,0.6603110209601082,30.077079107505178,0.03975659229208925,5716.598242055454,178.75456389452327,4.960649087221106,0.17286405047031753,0.10122079189276531]


In [83]:
println(matrixSummary.min)

[0.0,0.0,0.0,0.0,0.0,0.0,0.0,-1.0,0.0,0.0,0.0,0.045564223,-1.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0]


In [85]:
println(matrixSummary.max)

[0.999426,363.0,1.0,1.0,0.980392157,0.980392157,21.0,0.25,0.0,0.444444444,1.0,0.716883117,113.3333333,1.0,1.0,100.0,1.0,207952.0,4997.0,22.0,1.0,1.0]


In [86]:
println(matrixSummary.variance)

[0.10974244167559023,74.30082476809655,0.04126316989120245,0.021533436332001124,0.009211817450882448,0.005274933469767929,32.53918714591818,0.09396988697611537,0.0,0.001717741034662896,0.020782634824610638,0.0027548394224293023,3.6837889196744116,0.2366799607085986,0.22433071201674218,415.87855895438463,0.03818116876739597,7.877330081138441E7,32208.11624742624,10.453009045764313,0.03359363403832387,0.0062775328842146995]


In [87]:
println(matrixSummary.numNonzeros)

[5053.0,7354.0,7172.0,6821.0,6160.0,5128.0,7350.0,1257.0,0.0,7362.0,157.0,7395.0,7355.0,4552.0,4883.0,7347.0,294.0,7378.0,7395.0,6782.0,6868.0,7235.0]


In [88]:
import org.apache.spark.mllib.feature.StandardScaler

In [90]:
val scaler = new StandardScaler(withMean = true, withStd = true).fit(vectors)

In [92]:
// Keep each point's label and replace its feature vector with the scaler's transformed version.
val scaledData = data.map { lp =>
  lp.copy(features = scaler.transform(lp.features))
}

In [93]:
println(data.first.features)

[0.789131,2.055555556,0.676470588,0.205882353,0.047058824,0.023529412,0.443783175,0.0,0.0,0.09077381,0.0,0.245831182,0.003883495,1.0,1.0,24.0,0.0,5424.0,170.0,8.0,0.152941176,0.079129575]


In [94]:
println(scaledData.first.features)

[1.1376473364976751,-0.08193557169294784,1.0251398128933333,-0.05586356442541853,-0.4688932531289351,-0.35430532630793654,-0.3175352172363122,0.3384507982396541,0.0,0.8288221733153222,-0.14726894334628504,0.22963982357812907,-0.14162596909880876,0.7902380499177364,0.7171947294529865,-0.29799681649642484,-0.2034625779299476,-0.03296720969690467,-0.04878112975579767,0.9400699751165406,-0.10869848852526329,-0.27882078231369967]


In [None]:
println()