# Big Data 4 - Spark MLlib

In [1]:
%login montana.dataapplab.com

[ssh] Login to montana.dataapplab.com...
[ssh] forwarding local agent
[ssh] Successfully logged in.


In [2]:
# Show the working directory on the remote host right after login.
pwd

[ssh] host = montana.dataapplab.com, cwd = /home/dcdsdepeizhang
/home/dcdsdepeizhang


In [3]:
# List the home directory; the later cells use its data/ and demo/ subdirectories.
ls

[ssh] host = montana.dataapplab.com, cwd = /home/dcdsdepeizhang
data
demo


In [4]:
# Work from demo/ so that files written with a relative path (spark-mllib.py) land there.
cd demo/

[ssh] host = montana.dataapplab.com, cwd = /home/dcdsdepeizhang
[ssh] new cwd: /home/dcdsdepeizhang/demo


In [5]:
# Reset state left by a previous run so the following cells can be re-executed:
# the local CSV copy, the HDFS CSV copy, and the saved Naive Bayes model directory.
# -f turns each removal into a silent no-op (no diagnostic, zero exit status)
# when the target is already absent, e.g. on the very first run.
rm -f /home/dcdsdepeizhang/data/iris.csv
hdfs dfs -rm -f hdfs:///user/dcdsdepeizhang/data/iris.csv
hdfs dfs -rm -r -f hdfs:///user/dcdsdepeizhang/models/iris-NaiveBayesModel

[ssh] host = montana.dataapplab.com, cwd = /home/dcdsdepeizhang/demo
20/07/30 00:12:06 INFO fs.TrashPolicyDefault: Moved: 'hdfs://m1.mt.dataapplab.com:8020/user/dcdsdepeizhang/data/iris.csv' to trash at: hdfs://m1.mt.dataapplab.com:8020/user/dcdsdepeizhang/.Trash/Current/user/dcdsdepeizhang/data/iris.csv
20/07/30 00:12:09 INFO fs.TrashPolicyDefault: Moved: 'hdfs://m1.mt.dataapplab.com:8020/user/dcdsdepeizhang/models/iris-NaiveBayesModel' to trash at: hdfs://m1.mt.dataapplab.com:8020/user/dcdsdepeizhang/.Trash/Current/user/dcdsdepeizhang/models/iris-NaiveBayesModel1596093129058


In [6]:
# Copy iris.csv from user jason's HDFS data directory to the local data directory.
hdfs dfs -get hdfs:///user/jason/data/iris.csv /home/dcdsdepeizhang/data/iris.csv

[ssh] host = montana.dataapplab.com, cwd = /home/dcdsdepeizhang/demo


In [7]:
# Confirm the local copy of iris.csv exists.
ls /home/dcdsdepeizhang/data/iris.csv

[ssh] host = montana.dataapplab.com, cwd = /home/dcdsdepeizhang/demo
/home/dcdsdepeizhang/data/iris.csv


In [8]:
# Upload the local copy to this user's own HDFS data directory, which is the path spark-mllib.py reads.
hdfs dfs -put /home/dcdsdepeizhang/data/iris.csv hdfs:///user/dcdsdepeizhang/data/iris.csv

[ssh] host = montana.dataapplab.com, cwd = /home/dcdsdepeizhang/demo


In [9]:
# Confirm the upload: the listing shows the file's size, owner and timestamp in HDFS.
hdfs dfs -ls hdfs:///user/dcdsdepeizhang/data/iris.csv

[ssh] host = montana.dataapplab.com, cwd = /home/dcdsdepeizhang/demo
-rw-r--r--   3 dcdsdepeizhang student       5107 2020-07-30 00:12 hdfs:///user/dcdsdepeizhang/data/iris.csv


In [10]:
# Write the PySpark job to spark-mllib.py in the current directory.
# The heredoc delimiter is quoted so the shell copies the script verbatim
# (no parameter or command substitution inside the Python source).
cat << 'EOF' > spark-mllib.py
"""Train and evaluate two classifiers on the iris data set with Spark ML.

Steps:
  1. Read iris.csv from HDFS with an explicit schema.
  2. Assemble the four measurement columns into a feature vector and index
     the Species string column into a numeric label.
  3. Split into train/test, then fit a Naive Bayes model, save it to HDFS,
     reload it, and fit a decision tree.
  4. For each model: score one hand-written trial vector, score the test
     set, and print the evaluator metric.
"""
# print_function keeps the script runnable on both Python 2 and Python 3.
from __future__ import print_function

from pyspark.ml.classification import DecisionTreeClassifier, NaiveBayes, NaiveBayesModel
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.linalg import Vectors
from pyspark.sql import Row, SparkSession
from pyspark.sql.types import DoubleType, IntegerType, StringType, StructField, StructType

IRIS_CSV = "hdfs:///user/dcdsdepeizhang/data/iris.csv"
MODEL_DIR = "hdfs:///user/dcdsdepeizhang/models/iris-NaiveBayesModel"
FEATURE_COLS = ["SepalLengthCm", "SepalWidthCm", "PetalLengthCm", "PetalWidthCm"]
TRAIN_TEST_WEIGHTS = [.7, .3]
SPLIT_SEED = 42  # fixed so the train/test split is repeatable between runs


def load_iris(spark):
    """Read the iris CSV (with header row) into a DataFrame using a fixed schema."""
    schema = StructType([
        StructField('Id',            IntegerType(), True),
        StructField('SepalLengthCm', DoubleType(),  True),
        StructField('SepalWidthCm',  DoubleType(),  True),
        StructField('PetalLengthCm', DoubleType(),  True),
        StructField('PetalWidthCm',  DoubleType(),  True),
        StructField('Species',       StringType(),  True),
    ])
    return spark.read.csv(IRIS_CSV, header=True, schema=schema)


def add_features_and_label(df_iris):
    """Return df_iris with a 'features' vector column and a numeric 'label' column."""
    assembler = VectorAssembler(inputCols=FEATURE_COLS, outputCol="features")
    df_features = assembler.transform(df_iris)
    indexer = StringIndexer(inputCol="Species", outputCol="label")
    return indexer.fit(df_features).transform(df_features)


def score_trial(model, df_trial):
    """Show the model's prediction for the trial vector."""
    model.transform(df_trial).show()


def score_test_set(model, df_test, evaluator):
    """Show the first test-set predictions and print the evaluator metric."""
    print("\nPredict on Test Set\n")
    df_predicted = model.transform(df_test.select('features', 'label'))
    df_predicted.show(5)

    print("\nEvaluate Model\n")
    print(evaluator.evaluate(df_predicted))


def main():
    spark = SparkSession.builder.appName("spark-mllib-iris").getOrCreate()
    spark.sparkContext.setLogLevel("ERROR")
    try:
        print("\nCreate DataFrame\n")
        df_iris = load_iris(spark)
        df_iris.printSchema()
        df_iris.describe().show()
        df_iris.show(5)

        print("\nVectorize Features & Label Target\n")
        df_features_label = add_features_and_label(df_iris)
        df_features_label.show(5)

        df_train, df_test = df_features_label.randomSplit(TRAIN_TEST_WEIGHTS, seed=SPLIT_SEED)
        # A single hand-written measurement used as a smoke test for each model.
        df_trial = spark.createDataFrame([Row(features=Vectors.dense([5.1, 3.5, 1.4, 0.2]))])
        # f1 is the evaluator's default metric; it is named explicitly so the
        # printed number is unambiguous.
        evaluator = MulticlassClassificationEvaluator(metricName="f1")

        print("\nNaive Bayes Model\n")
        nb_model = NaiveBayes(featuresCol="features", labelCol="label").fit(df_train)
        print(nb_model)

        print("\nTest Model on a Trial Vector\n")
        score_trial(nb_model, df_trial)
        score_test_set(nb_model, df_test, evaluator)

        print("\nExport Model & Import Model\n")
        # overwrite() replaces a model saved by an earlier run; a plain save()
        # raises when the target path already exists.
        nb_model.write().overwrite().save(MODEL_DIR)
        nb_model = NaiveBayesModel.load(MODEL_DIR)
        # Re-score the trial vector to show the reloaded model still predicts.
        score_trial(nb_model, df_trial)

        print("\nDecision Tree Model\n")
        dt = DecisionTreeClassifier(maxDepth=5, featuresCol="features", labelCol="label")
        dt_model = dt.fit(df_train)
        print(dt_model.toDebugString)

        print("\nTest Model on a Trial Vector\n")
        score_trial(dt_model, df_trial)
        score_test_set(dt_model, df_test, evaluator)
    finally:
        # Release the application's cluster resources even if a step fails.
        spark.stop()


if __name__ == "__main__":
    main()

EOF

[ssh] host = montana.dataapplab.com, cwd = /home/dcdsdepeizhang/demo


In [11]:
# SPARK_MAJOR_VERSION=2 selects Spark 2 on this host (the output reports "using Spark2");
# spark-mllib.py uses SparkSession, which is a Spark 2 API.
SPARK_MAJOR_VERSION=2 spark-submit spark-mllib.py

[ssh] host = montana.dataapplab.com, cwd = /home/dcdsdepeizhang/demo
SPARK_MAJOR_VERSION is set to 2, using Spark2
20/07/30 00:12:28 INFO SparkContext: Running Spark version 2.0.0.2.5.3.0-37
20/07/30 00:12:28 INFO SecurityManager: Changing view acls to: dcdsdepeizhang
20/07/30 00:12:28 INFO SecurityManager: Changing modify acls to: dcdsdepeizhang
20/07/30 00:12:28 INFO SecurityManager: Changing view acls groups to: 
20/07/30 00:12:28 INFO SecurityManager: Changing modify acls groups to: 
20/07/30 00:12:28 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(dcdsdepeizhang); groups with view permissions: Set(); users  with modify permissions: Set(dcdsdepeizhang); groups with modify permissions: Set()
20/07/30 00:12:29 INFO Utils: Successfully started service 'sparkDriver' on port 44974.
20/07/30 00:12:29 INFO SparkEnv: Registering MapOutputTracker
20/07/30 00:12:29 INFO SparkEnv: Registering BlockManagerMaster
20/07/30 00:12

In [12]:
%logout

[ssh] Closing existing connection.
[ssh] Successfully logged out.
