# Spark Implementation

## Set Seed

In [1]:
SEED = 42

In [2]:
# Import findspark
import findspark
findspark.init('/home/ubuntu/spark-3.3.1-bin-hadoop3')
findspark.find()

'/home/ubuntu/spark-3.3.1-bin-hadoop3'

In [3]:
from pyspark.sql import SparkSession

# The entry point into all functionality in Spark is the SparkSession class.
spark = (SparkSession
	.builder
	.appName("DS5110/CS5501: my awesome Spark program")
	.master("spark://172.31.88.97:7077")
	.config("spark.executor.memory", "1024M")
	.getOrCreate())

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


24/04/21 23:40:56 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [4]:
spark

## Read Data

In [5]:
# Split data into X and y
X_reg_train = (spark.read
               .format('csv')
               .option('inferSchema', True)
               .option('header', True)
               .load('hdfs://172.31.88.97:9000/X_reg_train.csv')
              )
X_reg_test = (spark.read
               .format('csv')
               .option('inferSchema', True)
               .option('header', True)
               .load('hdfs://172.31.88.97:9000/X_reg_test.csv')
              )
X_class_train = (spark.read
               .format('csv')
               .option('inferSchema', True)
               .option('header', True)
               .load('hdfs://172.31.88.97:9000/X_class_train.csv')
              )
X_class_test = (spark.read
               .format('csv')
               .option('inferSchema', True)
               .option('header', True)
               .load('hdfs://172.31.88.97:9000/X_class_test.csv')
              )
y_reg_train = (spark.read
               .format('csv')
               .option('inferSchema', True)
               .option('header', True)
               .load('hdfs://172.31.88.97:9000/y_reg_train.csv')
              )
y_reg_test = (spark.read
               .format('csv')
               .option('inferSchema', True)
               .option('header', True)
               .load('hdfs://172.31.88.97:9000/y_reg_test.csv')
              )
y_class_train = (spark.read
               .format('csv')
               .option('inferSchema', True)
               .option('header', True)
               .load('hdfs://172.31.88.97:9000/y_class_train.csv')
              )
y_class_test = (spark.read
               .format('csv')
               .option('inferSchema', True)
               .option('header', True)
               .load('hdfs://172.31.88.97:9000/y_class_test.csv')
              )

                                                                                

## Train test split
# NEEDS TO BE STANDARDIZED ACROSS IMPLEMENTATIONS

In [13]:
(training_data, test_data) = subset_data.randomSplit([0.8, 0.2], seed=SEED)

### Setup

In [14]:
from pyspark import SparkConf, SparkContext
from sklearn.neighbors import NearestNeighbors
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator, MulticlassClassificationEvaluator

evaluator = RegressionEvaluator(
    labelCol='baseFare', predictionCol='prediction', metricName='rmse'
)

numericCols = ['dateDelta', 'seatsRemaining', 'totalTravelDistance', 'durationSeconds']
assembler = VectorAssembler(inputCols=numericCols, outputCol="features")
train = assembler.transform(training_data)
test = assembler.transform(test_data)

## KNN Model

In [None]:
# Input a Spark object containing
# all vectors, called myvecs
myvecs.cache()
 
# Create kNN tree locally, and broadcast
myvecscollected = myvecs.collect()
knnobj = NearestNeighbors().fit(myvecscollected)
bc_knnobj = sc.broadcast(knnobj)
 
# Get neighbors for each point, distributedly
results = myvecs.map(lambda x: bc_knnobj.value.kneighbors(x))

## K-Means Model

In [None]:
# Trains a k-means model.
kmeans = KMeans().setK(2).setSeed(1)
model = kmeans.fit(train)

# Make predictions
predictions = model.transform(test)

# Evaluate clustering by computing Silhouette score
evaluator = ClusteringEvaluator()

silhouette = evaluator.evaluate(predictions)
print("Silhouette with squared euclidean distance = " + str(silhouette))

# Shows the result.
centers = model.clusterCenters()
print("Cluster Centers: ")
for center in centers:
    print(center)



Silhouette with squared euclidean distance = 0.7215018401621854
Cluster Centers: 
[2.71430356e+01 6.40547939e+00 1.15242095e+03 1.56616018e+04]
[2.73867929e+01 6.00535333e+00 2.16721632e+03 3.69557006e+04]


                                                                                

## Random Forest Model

### Timing Code

In [None]:
%%timeit
# Fit
rf = RandomForestRegressor(featuresCol = 'features', labelCol='baseFare')
model = rf.fit(train)

# Predict
predictions = model.transform(test)

# Evaluate
evaluator = RegressionEvaluator(
    labelCol='baseFare', predictionCol='prediction', metricName='rmse'
)
rmse = evaluator.evaluate(predictions)

### Accuracy Code

In [48]:
# Fit
rf = RandomForestRegressor(featuresCol = 'features', labelCol='baseFare')
model = rf.fit(train)

# Predict
predictions = model.transform(test)

# Evaluate
evaluator = RegressionEvaluator(
    labelCol='baseFare', predictionCol='prediction', metricName='rmse'
)
rmse = evaluator.evaluate(predictions)

print(rmse)

148.39153072138427

## SVM

In [53]:
from pyspark.ml.feature import StringIndexer

label_stringIdx = StringIndexer(inputCol = 'baseFare', outputCol = 'labelIndex')
training_data = label_stringIdx.fit(training_data).transform(training_data)
test_data = label_stringIdx.fit(test_data).transform(test_data)

                                                                                

In [54]:
numericCols = ['dateDelta', 'seatsRemaining', 'totalTravelDistance', 'durationSeconds', 'baseFare']
assembler = VectorAssembler(inputCols=numericCols, outputCol="features")
train = assembler.transform(training_data)
test = assembler.transform(test_data)

In [60]:
from pyspark.ml.classification import LinearSVC, OneVsRest

# initiate base classifier
lsvc = LinearSVC(maxIter=10, regParam=0.1)

# initiate one vs. rest classifier
ovr = OneVsRest(classifier=lsvc, featuresCol = 'features', labelCol='labelIndex')

# Fit model
model = ovr.fit(train)

# predict
predictions = model.transform(test)

# initialize evaluator
evaluator = MulticlassClassificationEvaluator(metricName='accuracy')

# compute classification error
accuracy = evlauator.evaluate(predictions)
print(f'Test error = {accuracy}')

                                                                                

24/04/08 17:31:59 ERROR OWLQN: Failure! Resetting history: breeze.optimize.NaNHistory: 


                                                                                

24/04/08 17:32:59 ERROR OWLQN: Failure! Resetting history: breeze.optimize.NaNHistory: 


                                                                                

24/04/08 17:38:44 ERROR OWLQN: Failure! Resetting history: breeze.optimize.NaNHistory: 
24/04/08 17:38:50 ERROR OWLQN: Failure! Resetting history: breeze.optimize.NaNHistory: 


KeyboardInterrupt: 

[Stage 1495:>                                                      (0 + 8) / 20]