In [None]:
!wget http://setup.johnsnowlabs.com/colab.sh -O - | bash

--2024-11-15 18:38:21--  http://setup.johnsnowlabs.com/colab.sh
Resolving setup.johnsnowlabs.com (setup.johnsnowlabs.com)... 3.86.22.73
Connecting to setup.johnsnowlabs.com (setup.johnsnowlabs.com)|3.86.22.73|:80... connected.
HTTP request sent, awaiting response... 302 Moved Temporarily
Location: https://raw.githubusercontent.com/JohnSnowLabs/spark-nlp/master/scripts/colab_setup.sh [following]
--2024-11-15 18:38:21--  https://raw.githubusercontent.com/JohnSnowLabs/spark-nlp/master/scripts/colab_setup.sh
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.109.133, 185.199.108.133, 185.199.111.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.109.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 1191 (1.2K) [text/plain]
Saving to: ‘STDOUT’


2024-11-15 18:38:21 (42.6 MB/s) - written to stdout [1191/1191]

Installing PySpark 3.2.3 and Spark NLP 5.5.1
setup Colab for PySpark 3.2.3 and Spark NLP 5.5.1
[2

In [None]:
!pip install pyspark



In [None]:
import sparknlp
from pyspark.sql import SparkSession
from sparknlp.base import DocumentAssembler, EmbeddingsFinisher
from sparknlp.annotator import BertEmbeddings, Tokenizer
import pandas as pd
from pyspark.ml import Pipeline
from pyspark.sql.types import StructType, StructField, StringType


In [None]:
!pip install findspark
import findspark
findspark.init() # Initialize Spark
# from pyspark.sql import SparkSession
# Start Spark session with Spark NLP
spark = sparknlp.start()




In [None]:

# Load the dataset
df1 = pd.read_csv("hdfs://MADHANS:8020/user/cluster/incident_data.csv")

# Define schema for PySpark DataFrame
schema = StructType([
    StructField("text", StringType(), True),
    StructField("hazard-category", StringType(), True),
])

# Convert pandas DataFrame to PySpark DataFrame
df = spark.createDataFrame(df1, schema)




In [None]:
print("Spark NLP version: {}".format(sparknlp.version()))
print("Apache Spark version: {}".format(spark.version))

Spark NLP version: 5.5.1
Apache Spark version: 3.2.3


In [None]:
df.show(1)

+--------------------+---------------+
|                text|hazard-category|
+--------------------+---------------+
|Case Number: 011-...|      allergens|
+--------------------+---------------+
only showing top 1 row



In [None]:

# Step 1: Create a Document Assembler (for text preprocessing)
document_assembler = DocumentAssembler() \
    .setInputCol("text") \
    .setOutputCol("document")

# Step 2: Tokenizer to split the text into tokens
tokenizer = Tokenizer() \
    .setInputCols(["document"]) \
    .setOutputCol("token")

# Step 3: Define the BERT Embeddings Model
bert_embeddings = BertEmbeddings.pretrained("bert_base_uncased", "en") \
    .setInputCols(["document", "token"]) \
    .setOutputCol("embeddings")

# Step 4: Use EmbeddingsFinisher to aggregate the embeddings as a vector
embeddings_finisher = EmbeddingsFinisher() \
    .setInputCols("embeddings") \
    .setOutputCols("embeds") \
    .setOutputAsVector(True) \
    .setCleanAnnotations(True)


bert_base_uncased download started this may take some time.
Approximate size to download 392.5 MB
[OK!]


In [None]:

# Step 5: Average the embeddings (using the embeddings from all tokens in a document)

from pyspark.sql.functions import col, udf
from pyspark.sql.types import ArrayType, FloatType
import numpy as np

# Define UDF to average the embeddings for each document
def average_embeddings(vectors):
    return np.mean(vectors, axis=0).tolist()  # Averaging across token embeddings

average_embeddings_udf = udf(average_embeddings, ArrayType(FloatType()))

# Step 6: Define the pipeline with the stages
pipeline = Pipeline(stages=[document_assembler, tokenizer, bert_embeddings, embeddings_finisher])

# Step 7: Apply the pipeline to your DataFrame
result_df = pipeline.fit(df).transform(df)

# Step 8: Average the embeddings for each document (optional: use CLS token for document representation)
result_df = result_df.withColumn("vectors", average_embeddings_udf(col("embeds")))
result_df = result_df.drop("embeds")


In [None]:
result_df.show(1)

+--------------------+---------------+--------------------+
|                text|hazard-category|             vectors|
+--------------------+---------------+--------------------+
|Case Number: 011-...|      allergens|[-0.41468012, -0....|
+--------------------+---------------+--------------------+
only showing top 1 row



In [None]:
from pyspark.ml.feature import StringIndexer, VectorAssembler

In [None]:

# Step 1: Index the label column (hazard-category)
indexer = StringIndexer(inputCol="hazard-category", outputCol="label")
indexed_df = indexer.fit(result_df).transform(result_df)
indexer_model = indexer.fit(result_df)

In [None]:
from pyspark.ml.linalg import Vectors
from pyspark.sql.functions import udf
from pyspark.ml.linalg import VectorUDT
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression, DecisionTreeClassifier, RandomForestClassifier, MultilayerPerceptronClassifier
from pyspark.sql.functions import col
from pyspark.mllib.evaluation import MulticlassMetrics # Import for confusion matrix
# Define a UDF to convert array<float> to DenseVector
def array_to_vector(array):
    return Vectors.dense(array)

array_to_vector_udf = udf(array_to_vector, VectorUDT())

# Apply the UDF to create a new column "features" as DenseVector
assembled_df = indexed_df.withColumn("features", array_to_vector_udf(col("vectors")))

# Scale features to the range [0, 1] using MinMaxScaler
# scaler = MinMaxScaler(inputCol="features", outputCol="scaledFeatures")
# scalerModel = scaler.fit(assembled_df)
# scaled_df = scalerModel.transform(assembled_df)
scaled_df = assembled_df

# Step 3: Split the data into training and test sets
train_df, test_df = scaled_df.randomSplit([0.8, 0.2], seed=42)

# Define an evaluator
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")


In [4]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline
from pyspark.mllib.evaluation import MulticlassMetrics

# Define the Logistic Regression model
lr_model = LogisticRegression(featuresCol="scaledFeatures", labelCol="label")

# Create a pipeline
lr_pipeline = Pipeline(stages=[lr_model])

# Train the model
lr_trained_model = lr_pipeline.fit(train_df)

# Predict on test data
lr_predictions = lr_trained_model.transform(test_df)

# Evaluate accuracy
lr_accuracy = evaluator.evaluate(lr_predictions)
print(f"Logistic Regression Accuracy: {lr_accuracy:.4f}")

# Confusion Matrix
lr_predictionAndLabels = lr_predictions.select("prediction", "label").rdd.map(lambda x: (x.prediction, x.label))
lr_metrics = MulticlassMetrics(lr_predictionAndLabels)
print("Logistic Regression Confusion Matrix:")
print(lr_metrics.confusionMatrix().toArray())



Logistic Regression Accuracy: 0.6642
Logistic Regression Confusion Matrix:
[[  3. 152.   4.   8.   3.   1.   0.   0.   0.   0.]
 [189.   4.   1.   2.   0.   0.   0.   0.   0.   0.]
 [  3.   6.  62.  10.   0.   0.   0.   0.   0.   0.]
 [ 18.  21.  30.   0.  10.   0.   0.   0.   0.   0.]
 [  0.   5.   1.  65.   0.   0.   0.   0.   0.   0.]
 [  0.   0.   8.   2.  27.   0.   0.   0.   0.   0.]
 [  5.  13.  12.   0.  10.   0.   0.   0.   0.   0.]
 [  6.   5.   2.   0.   2.   0.   0.   0.   0.   0.]
 [  0.   0.   7.   0.   1.   8.   0.   0.   0.   0.]
 [  2.   0.   1.   0.   0.   2.   8.   0.   0.   0.]]


In [5]:
from pyspark.ml.classification import DecisionTreeClassifier

# Define the Decision Tree model
dt_model = DecisionTreeClassifier(featuresCol="features", labelCol="label")

# Create a pipeline
dt_pipeline = Pipeline(stages=[dt_model])

# Train the model
dt_trained_model = dt_pipeline.fit(train_df)

# Predict on test data
dt_predictions = dt_trained_model.transform(test_df)

# Evaluate accuracy
dt_accuracy = evaluator.evaluate(dt_predictions)
print(f"Decision Tree Accuracy: {dt_accuracy:.4f}")

# Confusion Matrix
dt_predictionAndLabels = dt_predictions.select("prediction", "label").rdd.map(lambda x: (x.prediction, x.label))
dt_metrics = MulticlassMetrics(dt_predictionAndLabels)
print("Decision Tree Confusion Matrix:")
print(dt_metrics.confusionMatrix().toArray())


Decision Tree Accuracy:0.5113
Decision Tree Confusion Matrix
[[132.  19.  52.   1.   0.   1.   0.   0.   0.   0.]
 [ 21.  95.  63.   1.   0.   2.   0.   0.   0.   0.]
 [ 14.   7.  66.   4.   0.   0.   0.   0.   0.   0.]
 [ 14.   7.  43.  15.   0.   0.   0.   0.   0.   0.]
 [  0.   0.   0.   0.  71.   0.   0.   0.   0.   0.]
 [ 10.   8.  27.   0.   0.   4.   0.   0.   0.   0.]
 [ 13.   4.  24.   4.   0.   0.   0.   0.   0.   0.]
 [  2.   2.  10.   0.   0.   0.   0.   0.   0.   0.]
 [  0.   0.   8.   0.   0.   0.   0.   0.   0.   0.]
 [  2.   0.   3.   0.   0.   0.   8.   0.   0.   0.]]


In [3]:
from pyspark.ml.classification import RandomForestClassifier

# Define the Random Forest model
rf_model = RandomForestClassifier(featuresCol="features", labelCol="label")

# Create a pipeline
rf_pipeline = Pipeline(stages=[rf_model])

# Train the model
rf_trained_model = rf_pipeline.fit(train_df)

# Predict on test data
rf_predictions = rf_trained_model.transform(test_df)

# Evaluate accuracy
rf_accuracy = evaluator.evaluate(rf_predictions)
print(f"Random Forest Accuracy: {rf_accuracy:.4f}")

# Confusion Matrix
rf_predictionAndLabels = rf_predictions.select("prediction", "label").rdd.map(lambda x: (x.prediction, x.label))
rf_metrics = MulticlassMetrics(rf_predictionAndLabels)
print("Random Forest Confusion Matrix:")
print(rf_metrics.confusionMatrix().toArray())


Random Forest Accuracy: 0.5634
Random Forest Confusion Matrix:
[[176.  14.   4.   2.   0.   0.   0.   0.   0.   0.]
 [ 23. 118.  14.   8.   5.   3.   0.   0.   0.   0.]
 [  3.   6.  62.  10.   0.   0.   0.   0.   0.   0.]
 [ 18.  21.  30.   0.  10.   0.   0.   0.   0.   0.]
 [  9.   5.   4.  52.   0.   0.   0.   0.   0.   0.]
 [  0.   0.   8.   2.  27.   0.   0.   0.   0.   0.]
 [  13.  2.  15.   0.  10.   0.   0.   0.   0.   0.]
 [  6.   5.   2.   0.   2.   0.   0.   0.   0.   0.]
 [  5.   3.   7.   0.   1.   0.   0.   0.   0.   0.]
 [  2.   3.   1.   4.   0.   2.   1.   0.   0.   0.]]


In [None]:
from pyspark.ml.classification import MultilayerPerceptronClassifier

# Define the Neural Network model
nn_model = MultilayerPerceptronClassifier(
    featuresCol="features",
    labelCol="label",
    layers=[768, 512, 256, 128, 64, len(indexer_model.labels)]
)

# Create a pipeline
nn_pipeline = Pipeline(stages=[nn_model])

# Train the model
nn_trained_model = nn_pipeline.fit(train_df)

# Predict on test data
nn_predictions = nn_trained_model.transform(test_df)

# Evaluate accuracy
nn_accuracy = evaluator.evaluate(nn_predictions)
print(f"Neural Network Accuracy: {nn_accuracy:.4f}")

# Confusion Matrix
nn_predictionAndLabels = nn_predictions.select("prediction", "label").rdd.map(lambda x: (x.prediction, x.label))
nn_metrics = MulticlassMetrics(nn_predictionAndLabels)
print("Neural Network Confusion Matrix:")
print(nn_metrics.confusionMatrix().toArray())


Neural Network Accuracy: 0.7250




Neural Network Confusion Matrix:
[[192.   4.   0.   7.   0.   2.   0.   0.   0.   0.]
 [  2. 159.   3.   7.   0.  11.   0.   0.   0.   0.]
 [  3.   6.  62.  10.   0.  10.   0.   0.   0.   0.]
 [ 16.   6.  21.  32.   0.   4.   0.   0.   0.   0.]
 [  0.   0.   0.   0.  71.   0.   0.   0.   0.   0.]
 [  0.  12.   8.   2.   0.  27.   0.   0.   0.   0.]
 [  5.   5.  13.  12.   0.  10.   0.   0.   0.   0.]
 [  2.   5.   3.   1.   0.   3.   0.   0.   0.   0.]
 [  0.   0.   7.   0.   0.   1.   0.   0.   0.   0.]
 [  1.   0.   2.   0.   0.   2.   0.   0.   0.   0.]]


In [None]:
spark.stop()