<a href="https://colab.research.google.com/github/nestorpereiralinares/PySpark-ML/blob/main/Spark_Neural_Network_Mlib.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz
!tar xf spark-3.1.1-bin-hadoop3.2.tgz
!pip install -q findspark
# Java and environment variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.1-bin-hadoop3.2"
import findspark
findspark.init()

**Train neural network. MultilayerPerceptronClassifier**

https://spark.apache.org/docs/latest/api/python/reference/pyspark.ml.html

**Set up spark context and SparkSession**

In [3]:
# Set up spark context and SparkSession

from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Python Spark Feedforward neural network example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

spark

In [8]:
# Read the data
# header='true' uses the first row as column names, inferSchema='true' infers the
# column types while reading, and delimiter=';' matches this semicolon-separated file


!wget -q https://archive.ics.uci.edu/ml/machine-learning-databases/wine-quality/winequality-red.csv -P sample_data/

filepath = "sample_data/winequality-red.csv"
df = spark.read.format('csv').options(header='true', inferSchema='true', delimiter=';').load(filepath)
df.show(5, truncate=False)

+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+
|fixed acidity|volatile acidity|citric acid|residual sugar|chlorides|free sulfur dioxide|total sulfur dioxide|density|pH  |sulphates|alcohol|quality|
+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+
|7.4          |0.7             |0.0        |1.9           |0.076    |11.0               |34.0                |0.9978 |3.51|0.56     |9.4    |5      |
|7.8          |0.88            |0.0        |2.6           |0.098    |25.0               |67.0                |0.9968 |3.2 |0.68     |9.8    |5      |
|7.8          |0.76            |0.04       |2.3           |0.092    |15.0               |54.0                |0.997  |3.26|0.65     |9.8    |5      |
|11.2         |0.28            |0.56       |1.9           |0.075    |17.0               |60.0       

In [9]:
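# Helper functions for recoding the quality column

# Convert a value to float
def string_to_float(x):
    return float(x)

# Bin a numeric quality score into three classes:
# low (0 to 4), medium (above 4 up to 6), high (everything else)
def condition(r):
    if 0 <= r <= 4:
        label = "low"
    elif 4 < r <= 6:
        label = "medium"
    else:
        label = "high"
    return label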
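
A quick sanity check of the binning thresholds, as a plain-Python sketch that repeats the `condition` logic so it runs on its own. The name `quality_bucket` and the sample scores are illustrative and not part of the notebook.

```python
def quality_bucket(score):
    """Map a numeric quality score to a coarse label (same thresholds as condition)."""
    if 0 <= score <= 4:
        return "low"
    elif 4 < score <= 6:
        return "medium"
    return "high"

# Illustrative scores covering each bucket and both boundaries.
sample_scores = [3, 4, 5, 6, 7, 8]
buckets = [quality_bucket(s) for s in sample_scores]
print(buckets)  # ['low', 'low', 'medium', 'medium', 'high', 'high']
```

Note that the final branch also catches any value below 0, so the function relies on the scores being non-negative.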

Apply the functions as UDFs

In [10]:
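from pyspark.sql.functions import udf
from pyspark.sql.types import StringType, DoubleType

# Wrap the helpers as Spark UDFs (string_to_float_udf is defined but not used below)
string_to_float_udf = udf(string_to_float, DoubleType())
quality_udf = udf(condition, StringType())

# Replace the numeric quality score with its class label
df = df.withColumn("quality", quality_udf("quality"))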

In [11]:
df

DataFrame[fixed acidity: double, volatile acidity: double, citric acid: double, residual sugar: double, chlorides: double, free sulfur dioxide: double, total sulfur dioxide: double, density: double, pH: double, sulphates: double, alcohol: double, quality: string]

Convert each row to a label and a **dense vector** of features

In [12]:


from pyspark.ml.linalg import Vectors

# Convert each row to (label, dense feature vector); the label is the last column
def transData(data):
    return data.rdd.map(lambda r: [r[-1], Vectors.dense(r[:-1])]).\
           toDF(['label','features'])

data = transData(df)
data.show()

+------+--------------------+
| label|            features|
+------+--------------------+
|medium|[7.4,0.7,0.0,1.9,...|
|medium|[7.8,0.88,0.0,2.6...|
|medium|[7.8,0.76,0.04,2....|
|medium|[11.2,0.28,0.56,1...|
|medium|[7.4,0.7,0.0,1.9,...|
|medium|[7.4,0.66,0.0,1.8...|
|medium|[7.9,0.6,0.06,1.6...|
|  high|[7.3,0.65,0.0,1.2...|
|  high|[7.8,0.58,0.02,2....|
|medium|[7.5,0.5,0.36,6.1...|
|medium|[6.7,0.58,0.08,1....|
|medium|[7.5,0.5,0.36,6.1...|
|medium|[5.6,0.615,0.0,1....|
|medium|[7.8,0.61,0.29,1....|
|medium|[8.9,0.62,0.18,3....|
|medium|[8.9,0.62,0.19,3....|
|  high|[8.5,0.28,0.56,1....|
|medium|[8.1,0.56,0.28,1....|
|   low|[7.4,0.59,0.08,4....|
|medium|[7.9,0.32,0.51,1....|
+------+--------------------+
only showing top 20 rows



Split the data into training and test sets (40% held out for testing)

In [13]:
# Split the data into train and test
(trainingData, testData) = data.randomSplit([0.6, 0.4])

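
`randomSplit` assigns rows to the splits at random, so the 60/40 proportions hold only approximately, and without a seed the split differs between runs. The plain-Python sketch below mimics that idea. It is not Spark's implementation, and `random_split` is a hypothetical helper written for illustration.

```python
import random

def random_split(rows, weights, seed=None):
    """Assign each row independently to one split, with probability proportional to its weight."""
    rng = random.Random(seed)
    total = float(sum(weights))
    # Cumulative boundaries in [0, 1], e.g. [0.6, 1.0] for weights [0.6, 0.4].
    bounds = []
    acc = 0.0
    for w in weights:
        acc += w / total
        bounds.append(acc)
    splits = [[] for _ in weights]
    for row in rows:
        u = rng.random()
        for i, b in enumerate(bounds):
            # The last split also absorbs any floating-point rounding at the top end.
            if u < b or i == len(bounds) - 1:
                splits[i].append(row)
                break
    return splits

rows = list(range(1000))
train, test = random_split(rows, [0.6, 0.4], seed=42)
print(len(train), len(test))  # close to 600 and 400, but usually not exact
```

In the notebook itself, passing a seed, for example `data.randomSplit([0.6, 0.4], seed=1234)`, should make the split repeatable for the same input DataFrame.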

**Train neural network. MultilayerPerceptronClassifier**

https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.classification.MultilayerPerceptronClassifier.html


In [16]:

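from pyspark.ml import Pipeline
from pyspark.ml.classification import MultilayerPerceptronClassifier
from pyspark.ml.feature import IndexToString, StringIndexer, VectorIndexer

# The original cell used labelIndexer and featureIndexer without defining them.
# The definitions below follow the usual Spark ML pattern and are an assumption
# (including maxCategories=4), not the author's confirmed settings.
labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(data)
featureIndexer = VectorIndexer(inputCol="features", outputCol="indexedFeatures",
                               maxCategories=4).fit(data)

# Specify layers for the neural network:
# input layer of size 11 (features), four hidden layers of sizes 5, 4, 4 and 3,
# and an output layer of size 3 (classes: low, medium, high)
layers = [11, 5, 4, 4, 3, 3]

# Create the trainer and set its parameters
FNN = MultilayerPerceptronClassifier(labelCol="indexedLabel",
                                     featuresCol="indexedFeatures",
                                     maxIter=100, layers=layers,
                                     blockSize=128, seed=1234)
# Convert indexed labels back to original labels
labelConverter = IndexToString(inputCol="prediction", outputCol="predictedLabel",
                               labels=labelIndexer.labels)
# Chain the indexers, the classifier and the label converter in a Pipeline
pipeline = Pipeline(stages=[labelIndexer, featureIndexer, FNN, labelConverter])
# Train the model; this also runs the indexers
model = pipeline.fit(trainingData)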
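
The `layers` list fixes the shape of the network: the first entry has to match the number of input features and the last has to cover the number of classes. As a rough plain-Python sketch (the helper `count_mlp_params` and the hidden sizes in `example_layers` are assumptions for illustration), the number of trainable parameters of a fully connected network with one bias per unit follows from consecutive layer sizes.

```python
def count_mlp_params(layers):
    """Weights plus biases of a fully connected network: (n_in + 1) * n_out per layer pair."""
    return sum((n_in + 1) * n_out for n_in, n_out in zip(layers, layers[1:]))

n_features = 11                       # feature columns in the wine data
classes = ["low", "medium", "high"]   # labels produced by the binning step
example_layers = [n_features, 5, 4, len(classes)]  # hidden sizes are illustrative

print(example_layers)                    # [11, 5, 4, 3]
print(count_mlp_params(example_layers))  # 99
```

Deriving the first and last entries from the data, rather than typing them by hand, keeps the layer specification in step with the feature columns and the label binning.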

In [None]:
# Make predictions on the test set
predictions = model.transform(testData)
# Select example rows to display
predictions.select("features", "label", "predictedLabel").show(5)

In [None]:
# Evaluation
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Compare predictions with the true labels and compute accuracy and test error
evaluator = MulticlassClassificationEvaluator(
    labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Predictions accuracy = %g, Test Error = %g" % (accuracy, 1.0 - accuracy))
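
With `metricName="accuracy"`, the evaluator reports the fraction of rows whose prediction equals the label, and the test error is its complement. A minimal plain-Python sketch of that calculation, using made-up labels (the lists below are hypothetical and are not results from this notebook):

```python
def accuracy_score(labels, predictions):
    """Fraction of positions where the prediction matches the label."""
    if len(labels) != len(predictions):
        raise ValueError("labels and predictions must have the same length")
    correct = sum(1 for y, p in zip(labels, predictions) if y == p)
    return correct / len(labels)

# Hypothetical labels and predictions, for illustration only.
y_true = ["medium", "medium", "high", "low", "medium"]
y_pred = ["medium", "high", "high", "medium", "medium"]

acc = accuracy_score(y_true, y_pred)
print("Predictions accuracy = %g, Test Error = %g" % (acc, 1.0 - acc))
```

Because most wines fall in the "medium" class, accuracy alone may flatter the model; the same evaluator also accepts `metricName="f1"`, which may be worth reporting alongside it.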