### Uploading the dataset to remote cluster

In [1]:
# Push the dataset to user HDFS directory in the cluster
import os
import dsx_core_utils
dsx_core_utils.upload_hdfs_file(
    source_path=os.environ['DSX_PROJECT_DIR']+'/datasets/SMSSpamCollection.csv', 
    target_path="/user/user1/SMSSpamCollection.csv",
    webhdfsurl="https://huey1.fyre.ibm.com:8443/gateway/yc375-master-1/webhdfs/v1")

upload success


### List the available environemnts in Hadoop Integration Service

In [2]:
import dsx_core_utils;
dsx_core_utils.get_dsxhi_info()

Available Hadoop systems: 

['huey1', 'bendy1', 'yccdh5']


[{'LIVYSPARK': 'https://huey1.fyre.ibm.com:8443/gateway/yc375-master-1/livy/v1',
  'LIVYSPARK2': 'https://huey1.fyre.ibm.com:8443/gateway/yc375-master-1/livy2/v1',
  'WEBHDFS': 'https://huey1.fyre.ibm.com:8443/gateway/yc375-master-1/webhdfs/v1',
  'imageIdNameList': [{'id': 'dsx-scripted-ml-python3',
    'name': 'Jupyter with Python 3.5, Scala 2.11, R 3.4.3'},
   {'id': 'dsx-scripted-ml-python2',
    'name': 'Jupyter with Python 2.7, Scala 2.11, R 3.4.3'}],
  'imageList': ['dsx-scripted-ml-python3', 'dsx-scripted-ml-python2'],
  'serviceUserID': 'dsxhi',
  'system': 'huey1'},
 {'LIVYSPARK': 'https://bendy1.fyre.ibm.com:8443/gateway/yc375-master-1/livy/v1',
  'LIVYSPARK2': 'https://bendy1.fyre.ibm.com:8443/gateway/yc375-master-1/livy2/v1',
  'WEBHDFS': 'https://bendy1.fyre.ibm.com:8443/gateway/yc375-master-1/webhdfs/v1',
  'imageIdNameList': [{'id': 'dsx-scripted-ml-python2',
    'name': 'Jupyter with Python 2.7, Scala 2.11, R 3.4.3'},
   {'id': 'dsx-scripted-ml-python3',
    'name': 'J

### Connecting to remote spark using Hadoop Integration Service

In [3]:
import dsx_core_utils

dsxhiRegisterDisplayName="huey1"
imageID= "dsx-scripted-ml-python2"
livy_name = "livyspark2"
dsx_core_utils.setup_livy_sparkmagic(
    system=dsxhiRegisterDisplayName, 
    livy=livy_name, 
    imageId=imageID)

sparkmagic has been configured to use https://huey1.fyre.ibm.com:8443/gateway/yc375-master-1/livy2/v1 with image Jupyter with Python 2.7, Scala 2.11, R 3.4.3
success configuring sparkmagic livy.


{'IMAGE': {'ID': 'dsx-scripted-ml-python2',
  'NAME': 'Jupyter with Python 2.7, Scala 2.11, R 3.4.3'},
 'LIVY': 'https://huey1.fyre.ibm.com:8443/gateway/yc375-master-1/livy2/v1',
 'SYSTEM': 'huey1',
 'WEBHDFS': 'https://huey1.fyre.ibm.com:8443/gateway/yc375-master-1/webhdfs/v1'}

In [4]:
%reload_ext sparkmagic.magics

In [5]:
%spark add -s s4 -l python -u https://huey1.fyre.ibm.com:8443/gateway/yc375-master-1/livy2/v1 -k

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
44,application_1535429375180_0026,pyspark,idle,Link,Link,✔


SparkSession available as 'spark'.


### Reading the dataset from remote cluster

In [6]:
%%spark
smsData = sc.textFile("hdfs:///user/user1/SMSSpamCollection.csv")
smsData.cache()

hdfs:///user/user1/SMSSpamCollection.csv MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:0

### Creating a data pipeline

In [7]:
%%spark
from pyspark.ml import Pipeline
from pyspark.ml.feature import HashingTF, IDF    
from pyspark.ml.feature import Tokenizer
from pyspark.ml.classification import LogisticRegression
    
tokenizer = Tokenizer(inputCol="message",outputCol="words")
hashingTF = HashingTF(inputCol = tokenizer.getOutputCol(),outputCol="tempfeatures")
idf = IDF(inputCol = hashingTF.getOutputCol(),outputCol="features")
lrClassifier = LogisticRegression()
        
pipeline = Pipeline(stages=[tokenizer,hashingTF,idf,lrClassifier])

### Cleaning the Data

In [8]:
%%spark
# creating a labeled vector
def TransformToVector(string):
    attList = string.split(",")
    smsType = 0.0 if attList[0] == "ham" else 1.0
    return [smsType,attList[1]]
        
smsTransformed = smsData.map(TransformToVector)
        
# creating a data frame from labeled vector
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)
smsDF = sqlContext.createDataFrame(smsTransformed,["label","message"])
smsDF.cache()

DataFrame[label: double, message: string]

### Perform Machine learning

In [9]:
%%spark
# split data frame into training and testing
(trainingData,testData) = smsDF.randomSplit([0.9,0.1])
        
#Build a model with Pipeline
lrModel = pipeline.fit(trainingData)
        
#Compute Predictions
prediction = lrModel.transform(testData)
            
#Evaluate Accuracy
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction", \
                                                      labelCol="label", \
                                                      metricName = "accuracy")
accuracy = evaluator.evaluate(prediction)
print "Model Accuracy: " + str(round(accuracy*100,2))
        
# Draw a confusion matrix
prediction.groupby("label","prediction").count().show()

Model Accuracy: 89.09
+-----+----------+-----+
|label|prediction|count|
+-----+----------+-----+
|  1.0|       1.0|   50|
|  0.0|       1.0|    3|
|  1.0|       0.0|    9|
|  0.0|       0.0|   48|
+-----+----------+-----+

<a id='model_HDFS_dump'></a>
## Save and Dump the Model to remote HDFS
The model now exists within the memory of the remote livy session. In order to use it in Watson Studio model management, we need to transfer it to the local Watson Studio environment.  This is done in two parts.

First, in the _remote_ session:

  * Write the model to HDFS as a directory.

In [10]:
%%spark 
#Save model to HDFS
model_name = "LRModel_SparkRemote"
hdfs_model_path = "/tmp/"+model_name
lrModel.write().overwrite().save(hdfs_model_path)

  * Pull the model directory from HDFS to the driver node's local fs.

In [12]:
%%spark
import time, subprocess
local_model_path = "./tmp/"+model_name
output1 = subprocess.Popen(("hdfs dfs -copyToLocal " + hdfs_model_path + " " + local_model_path), shell=True, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, close_fds=True)
time.sleep(10)
output2 = subprocess.Popen(("ls -ld ./tmp/LRModel_SparkRemote"), shell=True, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, close_fds=True)
print output1.communicate()[0], output2.communicate()[0]

drwxr-xr-x 4 yarn hadoop 36 Oct 25 13:00 ./tmp/LRModel_SparkRemote

  * Create an archive of the model directory.

In [13]:
%%spark
import shutil, os
os.chdir("./tmp")
shutil.make_archive( base_name =model_name, format= 'gztar', root_dir = model_name +'/',  base_dir  =  './' )

'/hadoop/yarn/local/usercache/user1/appcache/application_1535429375180_0026/container_e04_1535429375180_0026_01_000001/tmp/LRModel_SparkRemote.tar.gz'

  * Push the archive back up to HDFS.

In [14]:
%%spark
hdfs_model_dir = "/tmp"
output3 = subprocess.Popen(("hdfs dfs -copyFromLocal -f LRModel_SparkRemote.tar.gz " + hdfs_model_dir), shell=True, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, close_fds=True)
time.sleep(10)
output4 = subprocess.Popen(("hdfs dfs -ls /tmp/LRModel_SparkRemote.tar.gz"), shell=True, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, close_fds=True)
print output3.communicate()[0], output4.communicate()[0]

 -rw-r--r--   3 yarn hdfs      42353 2018-10-25 13:02 /tmp/LRModel_SparkRemote.tar.gz

<a id='model_hdfs_load'></a>
## Load the model from HDFS into Watson Studio
Then, on the Watson Studio _local_ side:
  * Download the model archive from HDFS to a temporary directory in Watson Studio's filesystem.

In [15]:
from dsx_core_utils import hdfs_util

localmpdir = os.environ["DSX_PROJECT_DIR"] + "/.tmp/LRModel_SparkRemote/"
!mkdir -p $localmpdir

# Download the model archive from HDFS.
hdfs_models_dir = "/tmp"
webhdfs_endpoint = "https://huey1.fyre.ibm.com:8443/gateway/yc375-master-1/webhdfs/v1"
hdfs_util.download_file(webhdfs_endpoint,hdfs_models_dir+"/LRModel_SparkRemote.tar.gz",localmpdir+"LRModel_SparkRemote.tar.gz")
import time
time.sleep(5)

download success


In [16]:
!ls -l $localmpdir

total 50
-rw-rw-rw-. 1 1001 allDSXUsers 42301 Oct 25 20:03 LRModel_SparkRemote.tar.gz
drwxr-xr-x. 2 1001 allDSXUsers  4096 Oct 25 17:52 metadata
drwxr-xr-x. 6 1001 allDSXUsers  4096 Oct 25 17:52 stages


  * Unpack the model archive.

In [17]:
!tar -xzf $localmpdir/LRModel_SparkRemote.tar.gz -C $localmpdir

In [18]:
!ls -l $localmpdir

total 50
-rw-rw-rw-.  1 1001 allDSXUsers 42301 Oct 25 20:03 LRModel_SparkRemote.tar.gz
drwxr-xr-x.  2 1001 allDSXUsers  4096 Oct 25 20:00 metadata
drwxr-xr-x. 10 1001 allDSXUsers  4096 Oct 25 20:00 stages


  * Load the model into memory from the unpacked archive.

In [19]:
from pyspark.ml import PipelineModel
lrModel = PipelineModel.load(localmpdir)

<a id='save_model'></a>
## Save the model

In [37]:
from dsx_ml.ml import save
save(name = 'LRModel_SparkRemote',
     model = lrModel,
     test_data = testData,
     algorithm_type = 'Classification')

{'path': '/user-home/1001/DSX_Projects/sms-spam-filter-using-hortonworks/models/LRModel_SparkRemote/1',
 'scoring_endpoint': 'https://dsxl-api/v3/project/score/Python35/spark-2.2/sms-spam-filter-using-hortonworks/LRModel_SparkRemote/1'}

In [36]:
from pyspark.sql import SQLContext
testData = SQLContext(sc).read.csv(os.environ['DSX_PROJECT_DIR']+'/datasets/sms-data.csv', header='true', inferSchema = 'true')