# SPARK RDD interface

In [None]:
## local mode: pyspark --master local[7]
## cluster mode: /usr/lib/spark/sbin/start-all.sh
##               visit: http://google-vm-ip:8080
##               check jobs: http://google-vm-ip:4040/jobs/
##               pyspark --master spark://slbd2017.c.slbd-157219.internal:7077 (master URL copied from the :8080 web page)

## Data from https://www.dropbox.com/s/bh5qe2ligmggw3s/sp500withAllRec.csv?dl=0

## Spark v1.0+ RDD API
from numpy import array
from math import sqrt
from pyspark.mllib.clustering import KMeans, KMeansModel




# Read the CSV as an RDD of text lines and drop the header row.
# NOTE: the filter removes every line equal to the header, which for a
# well-formed file is only the first one.
data = sc.textFile("sp500withAllRec.csv")
header = data.first()
data = data.filter(lambda row: row != header)

# Parse each line into a numpy vector of floats. float() raises on any
# non-numeric field, so this assumes an all-numeric CSV -- TODO confirm.
parsedData = data.map(lambda line: array([float(x) for x in line.split(',')]))

# Only the last expression of a notebook cell is echoed, so print the row
# count (header excluded) instead of silently discarding it.
print(data.count())

# Baseline run on the *uncached* RDD: each k-means iteration recomputes
# parsedData from the CSV. This model is overwritten below; compare the two
# runs' durations in the jobs UI (port 4040).
clusters = KMeans.train(parsedData, 500, maxIterations=100, initializationMode="random")

# load data in "mem" across machines, then retrain on the cached RDD
parsedData.cache()
clusters = KMeans.train(parsedData, 500, maxIterations=100, initializationMode="random")

In [None]:
## Get cluster labels
# Map every point to the index of its nearest center once, and reuse that RDD
# for both summaries. These lines are not the cell's last expression, so their
# values would be discarded unless printed explicitly.
labels = parsedData.map(lambda point: clusters.predict(point))
print(labels.take(10))        # labels of the first 10 points
print(labels.countByValue())  # label -> number of points with that label

# compute sum squares by map-reduce
def ss(point, model=None):
    """Return the squared Euclidean distance from *point* to its assigned center.

    Summed over every point, this gives the within-set sum of squared errors
    (WSSSE) of a k-means fit.

    :param point: feature vector as a numpy array (the rows of ``parsedData``).
    :param model: object exposing ``predict(point)`` and a ``centers``
        sequence, e.g. a ``KMeansModel``. Defaults to the notebook-global
        ``clusters`` model, so existing ``ss(point)`` calls behave as before.
    :return: sum of squared coordinate differences between point and center.
    """
    if model is None:
        model = clusters
    center = model.centers[model.predict(point)]
    # element-wise numpy difference, then a plain Python sum of the squares
    return sum([x ** 2 for x in (point - center)])

# Map each point to its squared distance from its center, then reduce to a total.
WSSSE = parsedData.map(lambda point: ss(point)).reduce(lambda x, y: x + y)
# Single-argument print(...) is valid (and prints the same) on Python 2 and 3;
# the bare print statement is a SyntaxError on Python 3.
print("Within Set Sum of Squared Error = " + str(WSSSE))


# SPARK 2.0 DataFrame API

In [None]:
## Spark 2.0 DataFrame API

# Read the CSV using its first line as column names and let Spark infer each
# column's type (inference costs one extra pass over the file).
spDF = spark.read.csv("sp500withAllRec.csv", header=True, inferSchema=True)


# first 5 rows (show() prints them itself)
spDF.limit(5).show()


# row count -- printed, because only a cell's last expression is echoed
print(spDF.count())

# Keep only the rows whose 'aa' value exceeds 0.1 (a row filter, not a column
# selection). Build the filtered DataFrame once; show its first rows and print
# how many rows match.
aaAbove = spDF.filter(spDF['aa'] > 0.1)
aaAbove.show()
print(aaAbove.count())



# Expose spDF to Spark SQL as the global temporary view "sp500". Global temp
# views live in the `global_temp` database, hence the qualified names below.
# NOTE(review): createGlobalTempView raises if the view already exists, so
# re-running this cell fails; createOrReplaceGlobalTempView (Spark 2.2+) would not.
spDF.createGlobalTempView("sp500")

# Query the view with plain SQL.
countQuery = 'select count(*) from global_temp.sp500'
spark.sql(countQuery).show()

# Mark the view for in-memory caching; the count() action that follows
# scans the table (and is this cell's echoed result).
viewName = "global_temp.sp500"
spark.catalog.cacheTable(viewName)
spark.table(viewName).count()

