## KMeans Clustering

### 建立KMeans 模型

In [None]:
%pyspark
import numpy
from pyspark.mllib.clustering import KMeans

# Load the iris CSV from the local filesystem and preview the first lines.
raw_data = sc.textFile('file:/tmp/iris.csv')
raw_data.take(3)

# Drop the header by filtering out every line equal to the first line.
header = raw_data.first()
skip_data = raw_data.filter(lambda line: line != header)
skip_data.take(3)

# Keep the first four comma-separated fields of each row as a float vector.
parsedData = skip_data.map(
    lambda e: numpy.array([float(ele) for ele in e.split(',')[0:4]]))
parsedData.take(3)

# Train K-means with k=4 and random initial centres.
# The old `runs` argument is intentionally not passed: it has had no effect
# since Spark 2.0 and is no longer part of the Spark 3.x signature, where
# passing it raises a TypeError.
clusters = KMeans.train(parsedData, 4, maxIterations=10,
                        initializationMode="random")

# Predict the cluster index of a single point ...
iris1 = parsedData.first()
iris1
clusters.predict(iris1)

# ... and of every point in the RDD.
prediction = clusters.predict(parsedData)
prediction.collect()

### 計算 Within Cluster Sum of Squared Errors (WSSSE)

In [None]:
%pyspark

from math import sqrt
def error(point):
    """Return the Euclidean distance from ``point`` to its predicted centre.

    Uses the notebook-level ``clusters`` model: the point is assigned to a
    cluster with ``clusters.predict`` and compared with that cluster's entry
    in ``clusters.centers``.

    Note that the result is the plain distance (square root applied), not
    the squared distance.
    """
    assigned = clusters.predict(point)
    offset = point - clusters.centers[assigned]
    squared_total = sum(component ** 2 for component in offset)
    return sqrt(squared_total)


In [None]:
%pyspark
parsedData.take(3)
# WSSSE is the sum of *squared* point-to-centre distances. `error` returns
# the plain Euclidean distance (it applies sqrt), so each distance is squared
# here before the values are added up.
WSSSE = parsedData.map(lambda point: error(point) ** 2) \
                  .reduce(lambda x, y: x + y)
WSSSE

## 客戶分群

In [None]:
import numpy
from pyspark.mllib.clustering import KMeans

# Load the customer CSV and drop the header (every line equal to the first).
raw_data = sc.textFile('file:/tmp/customers.csv')
raw_data.take(3)
header = raw_data.first()
skip_data = raw_data.filter(lambda line: line != header)

# Use the fields from index 3 onward as features, parsed to float explicitly
# (same approach as the iris cell) instead of passing an array of strings.
parsed_data = skip_data.map(
    lambda e: numpy.array([float(ele) for ele in e.split(',')[3:]]))

# Train K-means with k=5. The old `runs` argument is not passed: it has had
# no effect since Spark 2.0 and is no longer accepted by Spark 3.x.
# NOTE: this rebinds the notebook-level `clusters` used by `error`.
clusters = KMeans.train(parsed_data, 5, maxIterations=10,
                        initializationMode="random")
predictions = clusters.predict(parsed_data).collect()
predictions

## ALS 推薦

### 載入資料

In [None]:
%pyspark
# Read the raw ratings file and peek at its first line.
# NOTE(review): unlike the CSV loads, this path carries no `file:` scheme, so
# it presumably resolves against the cluster's default filesystem - confirm.
rawData = sc.textFile("/tmp/u.data")
rawData.first()

# Split every line on runs of whitespace into a list of string fields.
rawRatings = rawData.map(lambda line: line.split())
rawRatings.take(3)

### 資料轉換

In [None]:
%pyspark
from pyspark.sql import Row


def _to_rating_row(fields):
    """Build a typed rating Row from the first four fields of a split line."""
    return Row(userId=int(fields[0]),
               movieId=int(fields[1]),
               rating=float(fields[2]),
               timestamp=int(fields[3]))


# Convert the split records into the ratingsRDD of Row objects.
ratingsRDD = rawRatings.map(_to_rating_row)

ratingsRDD.take(3)

In [None]:
%pyspark
# Build a DataFrame from the Row RDD, then split it at random with weights
# 0.8 / 0.2 into a training set and a test set (no seed is fixed).
ratings = spark.createDataFrame(ratingsRDD)
training, test = ratings.randomSplit([0.8, 0.2])


### 產生模型

In [None]:
%pyspark
from pyspark.ml.recommendation import ALS

# Configure ALS on the (userId, movieId, rating) columns and fit it on the
# training split.
als = ALS(
    rank=50,
    maxIter=10,
    regParam=0.01,
    userCol="userId",
    itemCol="movieId",
    ratingCol="rating",
)
model = als.fit(training)
model

### 檢視模型

In [None]:
%pyspark
# Optional exploration helpers, left disabled:
#dir(model)
#model.itemFactors.take(1)
# Count the rows of the fitted model's item-factor and user-factor tables.
model.itemFactors.count()
model.userFactors.count()

### 產生topk 推薦

In [None]:
%pyspark
# Ask the model for 10 recommendations per user and show the
# `recommendations` column of one row.
userRecs = model.recommendForAllUsers(10)
userRecs.select('recommendations').take(1)

# Same query from the item side: 10 recommendations per item.
movieRecs = model.recommendForAllItems(10)
movieRecs.select('recommendations').take(1)


## Cosine Similarities

In [None]:
%pyspark
def cosineSimilarity(x, y):
    """Return the cosine similarity of two vectors.

    Both arguments must provide ``dot(other)`` and ``norm(p)`` methods; the
    result is the dot product divided by the product of the two 2-norms.
    No special handling is done for zero-length vectors.
    """
    dot_product = x.dot(y)
    norm_product = x.norm(2) * y.norm(2)
    return dot_product / norm_product

In [None]:
%pyspark
from pyspark.mllib.linalg import Vectors

# Select the factor column for the items with id 567 and id 789.
features = model.itemFactors.filter('id == 567').select('features')
features1 = model.itemFactors.filter('id == 789').select('features')

# Bring the matching rows to the driver and keep only the factor arrays.
features_vec = [row['features'] for row in features.collect()]
features1_vec = [row['features'] for row in features1.collect()]

# Wrap the first match of each item as a dense vector and compare them.
itemVector = Vectors.dense(features_vec[0])
itemVector1 = Vectors.dense(features1_vec[0])
cosineSimilarity(itemVector, itemVector1)

In [None]:
%pyspark
from pyspark.mllib.linalg.distributed import RowMatrix

# One matrix row per item, holding that item's factor array.
item_rows = model.itemFactors.rdd.map(lambda row: row['features'])
mat = RowMatrix(item_rows)

# columnSimilarities() compares the matrix *columns*; with one row per item
# the columns are the factor dimensions, not the items themselves.
res = mat.columnSimilarities()
res.entries.collect()

In [None]:
%pyspark
from pyspark.mllib.linalg.distributed import RowMatrix, IndexedRowMatrix, IndexedRow
from pyspark.mllib.linalg.distributed import CoordinateMatrix, MatrixEntry

# Index every factor row by the first field of its record (the item id
# column of itemFactors), with the second field as the row vector.
indexed_items = model.itemFactors.rdd.map(
    lambda row: IndexedRow(row[0], row[1]))

# Transpose via a block matrix so the original row indices become columns;
# column similarities are then computed between those columns.
mat = (IndexedRowMatrix(indexed_items)
       .toBlockMatrix()
       .transpose()
       .toIndexedRowMatrix())
exact = mat.columnSimilarities()
res = exact.entries.collect()



In [None]:
%pyspark
import operator

# Map movie id -> title from the pipe-separated item file.
# NOTE(review): the file is opened with the platform default encoding; if the
# titles are not in that encoding this will fail on Python 3 - confirm.
moviedic = {}
with open('/tmp/u.item', 'r') as f:
    for line in f:
        line = line.strip()
        if not line:
            # Skip blank lines instead of failing on int('').
            continue
        rec = line.split('|')
        moviedic[int(rec[0])] = rec[1]

# Collect every similarity entry that involves the target movie.
# columnSimilarities() reports each pair only once (upper-triangular, i < j),
# so the target may appear as either index; checking only `ele.i` would miss
# all movies whose id is smaller than the target's.
target_id = 181
ary = []
for ele in res:
    if ele.i == target_id:
        ary.append((moviedic[ele.i], moviedic[ele.j], ele.value))
    elif ele.j == target_id:
        ary.append((moviedic[ele.j], moviedic[ele.i], ele.value))

# Print the ten most similar movies, highest similarity first.
swd = sorted(ary, key=operator.itemgetter(2), reverse=True)
for ele in swd[0:10]:
    print(ele)


### 評估推薦模型

In [None]:
from pyspark.ml.evaluation import RegressionEvaluator

# Score the held-out split; rows containing null/NaN values are dropped
# before the metric is computed.
predictions = model.transform(test).na.drop()

# Root-mean-square error between the `rating` and `prediction` columns.
evaluator = RegressionEvaluator(
    metricName="rmse",
    labelCol="rating",
    predictionCol="prediction",
)
rmse = evaluator.evaluate(predictions)
rmse

## Stateless

In [None]:
import sys

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

# Usage: <script> <host> <port>
sc = SparkContext(appName="StreamingErrorCount")
sc.setLogLevel("ERROR")

# Streaming context with a batch interval of 5. No checkpoint directory is
# set: each batch is counted on its own with reduceByKey.
ssc = StreamingContext(sc, 5)

host = sys.argv[1]
port = int(sys.argv[2])
lines = ssc.socketTextStream(host, port)

# Split lines on single spaces, keep the tokens containing "ERROR", and
# count the occurrences of each such token within the batch.
words = lines.flatMap(lambda line: line.split(" "))
error_words = words.filter(lambda word: "ERROR" in word)
pairs = error_words.map(lambda word: (word, 1))
counts = pairs.reduceByKey(lambda a, b: a + b)

counts.pprint()

ssc.start()
ssc.awaitTermination()

## Stateful

In [None]:
import sys

# SparkContext is used below, so it must be imported in this script too.
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

# Usage: <script> <host> <port>
sc = SparkContext(appName="StreamingErrorCount")
sc.setLogLevel("ERROR")
ssc = StreamingContext(sc, 1)

# Checkpointing is required by the windowed reduction with an inverse
# function used below. An HDFS directory such as
# "hdfs:///user/hdp/streaming" can be used instead of the local path.
ssc.checkpoint("file:///tmp/streaming")
lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))

# Count tokens containing "ERROR" over a window of 30 that slides every 10.
# The second lambda is the inverse of the first: it subtracts the counts of
# batches leaving the window so the window is updated incrementally.
counts = lines.flatMap(lambda line: line.split(" "))\
    .filter(lambda word: "ERROR" in word)\
    .map(lambda word: (word, 1))\
    .reduceByKeyAndWindow(lambda x, y: x + y, lambda x, y: x - y, 30, 10)

counts.pprint()

ssc.start()
ssc.awaitTermination()