This notebook is designed to run in a IBM Watson Studio default runtime (NOT the Watson Studio Apache Spark Runtime as the default runtime with 1 vCPU is free of charge). Therefore, we install Apache Spark in local mode for test purposes only. Please don't use it in production.

In case you are facing issues, please read the following two documents first:

https://github.com/IBM/skillsnetwork/wiki/Environment-Setup

https://github.com/IBM/skillsnetwork/wiki/FAQ

Then, please feel free to ask:

https://coursera.org/learn/machine-learning-big-data-apache-spark/discussions/all

Please make sure to follow the guidelines before asking a question:

https://github.com/IBM/skillsnetwork/wiki/FAQ#im-feeling-lost-and-confused-please-help-me


If running outside Watson Studio, this should work as well. In case you are running in an Apache Spark context outside Watson Studio, please remove the Apache Spark setup in the first notebook cells.

In [None]:
from IPython.display import Markdown, display
def printmd(string):
    display(Markdown('# <span style="color:red">'+string+'</span>'))


if ('sc' in locals() or 'sc' in globals()):
    printmd('<<<<<!!!!! It seems that you are running in a IBM Watson Studio Apache Spark Notebook. Please run it in an IBM Watson Studio Default Runtime (without Apache Spark) !!!!!>>>>>')


In [None]:
!pip install pyspark==2.4.5

In [None]:
try:
    from pyspark import SparkContext, SparkConf
    from pyspark.sql import SparkSession
except ImportError as e:
    printmd('<<<<<!!!!! Please restart your kernel after installing Apache Spark !!!!!>>>>>')

In [None]:
sc = SparkContext.getOrCreate(SparkConf().setMaster("local[*]"))

spark = SparkSession \
    .builder \
    .getOrCreate()

In case you want to learn how ETL is done, please run the following notebook first and update the file name below accordingly

https://github.com/IBM/coursera/blob/master/coursera_ml/a2_w1_s3_ETL.ipynb

In [None]:
# delete files from previous runs
!rm -f hmp.parquet*

# download the file containing the data in PARQUET format
!wget https://github.com/IBM/coursera/raw/master/hmp.parquet
    
# create a dataframe out of it
df = spark.read.parquet('hmp.parquet')

# register a corresponding query table
df.createOrReplaceTempView('df')

In [None]:
from pyspark.ml.feature import StringIndexer


indexer = StringIndexer(inputCol="class", outputCol="classIndex")
indexed = indexer.fit(df).transform(df)
indexed.show()
indexed.select('classIndex').distinct().show()

In [None]:
from pyspark.ml.feature import OneHotEncoder, StringIndexer


encoder = OneHotEncoder(inputCol="classIndex", outputCol="categoryVec")
encoded = encoder.transform(indexed)
encoded.show()


In [None]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

vectorAssembler = VectorAssembler(inputCols=["x","y","z"],
                                  outputCol="features")

# For your special case that has string instead of doubles you should cast them first.
# expr = [col(c).cast("Double").alias(c) 
#         for c in vectorAssembler.getInputCols()]

# df2 = df2.select(*expr)
features_vectorized = vectorAssembler.transform(encoded)
features_vectorized.show()

In [None]:
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.linalg import Vectors

min_max_scaler = MinMaxScaler(inputCol="features", outputCol="features_norm")
min_max_scaler_model = min_max_scaler.fit(features_vectorized)
normalized_data = min_max_scaler_model.transform(features_vectorized)
normalized_data.show()



In [None]:
df_train = normalized_data.drop("source").drop("class").drop("classIndex").drop("features").drop("x").drop("y").drop("z")

In [None]:
df_train.show()

In [None]:
from pyspark.ml import Pipeline
pipeline = Pipeline(stages=[indexer, encoder, vectorAssembler, min_max_scaler_model])
model = pipeline.fit(df)
prediction = model.transform(df)

In [None]:
prediction.show()