In [1]:
# sc master running locally
sc.master

import warnings
warnings.filterwarnings("ignore")

# spark is from the previous example.
sc = spark.sparkContext

In [2]:
path = "/user1/Suicide_Detection.csv"
df = spark.read.csv(path)

                                                                                

In [3]:
# The inferred schema can be visualized using the printSchema() method
df.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: string (nullable = true)



In [4]:
# Rename the columns
df = df.withColumnRenamed('_c0', 'ID')      # Rename _c0 to ID
df = df.withColumnRenamed('_c1', 'Text')    # Rename _c1 to Text
df = df.withColumnRenamed('_c2', 'Label')   # Rename _c2 to Label

# Show the DataFrame with renamed columns
df.show(5)

+----+--------------------+-----------+
|  ID|                Text|      Label|
+----+--------------------+-----------+
|null|                text|      class|
|   2|Ex Wife Threateni...|    suicide|
|   3|Am I weird I don'...|non-suicide|
|   4|"Finally 2020 is ...|non-suicide|
|   8|i need helpjust h...|    suicide|
+----+--------------------+-----------+
only showing top 5 rows



In [5]:
# Count Rows
row_count = df.count()
print("Total Rows:", row_count)

# Get Schema
df.printSchema()



Total Rows: 664905
root
 |-- ID: string (nullable = true)
 |-- Text: string (nullable = true)
 |-- Label: string (nullable = true)



                                                                                

In [6]:
from pyspark.sql.functions import col

# "ID" column datatype changing
df = df.withColumn("ID", col("ID").cast("integer"))

In [7]:
# Remove rows with null values in any column
df = df.na.drop()

In [8]:
from pyspark.sql.functions import col

# Filter the DataFrame to include only 'suicide' and 'non-suicide' labels
df = df.filter((col('Label') == 'suicide') | (col('Label') == 'non-suicide'))

# Show the resulting DataFrame
df.show()

+---+--------------------+-----------+
| ID|                Text|      Label|
+---+--------------------+-----------+
|  2|Ex Wife Threateni...|    suicide|
|  3|Am I weird I don'...|non-suicide|
|  4|"Finally 2020 is ...|non-suicide|
|  8|i need helpjust h...|    suicide|
| 18|My life is over a...|    suicide|
| 19|I took the rest o...|    suicide|
| 21|Do you think gett...|    suicide|
| 23|Been arrested - f...|    suicide|
| 24|Fuck the verizon ...|non-suicide|
| 31|Me: I know I have...|non-suicide|
| 37|Guys I want frien...|non-suicide|
| 39|I’m trashLol I no...|    suicide|
| 41|What is the best ...|    suicide|
| 43|Today's fact is R...|non-suicide|
| 44|I feel like I am ...|    suicide|
| 45|Is it worth it?Is...|    suicide|
| 47|I triple nipple d...|non-suicide|
| 54|Hey, im gonna sle...|non-suicide|
| 57|I learnt a new sk...|non-suicide|
| 62|Why does no one u...|non-suicide|
+---+--------------------+-----------+
only showing top 20 rows



In [9]:
# Assuming 'df' is your DataFrame and 'Label' is the column of interest
label_counts = df.groupBy("Label").count().orderBy(col("count").desc())

# Show the label counts
label_counts.show()

                                                                                

+-----------+-----+
|      Label|count|
+-----------+-----+
|non-suicide|76506|
|    suicide|53612|
+-----------+-----+



In [10]:
df.show(2, truncate=False)

+---+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------+
|ID |Text                                                                                                                                                                                                                                                                 

In [11]:
from pyspark.ml.feature import Tokenizer

# Create a Tokenizer object
tokenizer = Tokenizer(inputCol="Text", outputCol="tokens")

# Tokenize the "processed_text" column
df = tokenizer.transform(df)

# Show the result
df.show(truncate=False)

+---+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [12]:
# Show the "tokens" column
token_df = df.select("tokens")

token_df.show(truncate=False)

+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [13]:
df.show(5, truncate=True)

+---+--------------------+-----------+--------------------+
| ID|                Text|      Label|              tokens|
+---+--------------------+-----------+--------------------+
|  2|Ex Wife Threateni...|    suicide|[ex, wife, threat...|
|  3|Am I weird I don'...|non-suicide|[am, i, weird, i,...|
|  4|"Finally 2020 is ...|non-suicide|["finally, 2020, ...|
|  8|i need helpjust h...|    suicide|[i, need, helpjus...|
| 18|My life is over a...|    suicide|[my, life, is, ov...|
+---+--------------------+-----------+--------------------+
only showing top 5 rows



In [14]:
from pyspark.sql.functions import explode, desc
from pyspark.sql import functions as F

# Explode the tokens into separate rows
counting_df = df.withColumn("word", explode(df["tokens"]))

# Group by words and count their occurrences
word_counts = counting_df.groupBy("word").count()

# Order the word counts in descending order
word_counts = word_counts.orderBy(desc("count"))

# Select the top 50 words
top_50_words = word_counts.limit(50)

In [15]:
# Show the top 50 words and their counts
top_50_words.show(50)



+-------+------+
|   word| count|
+-------+------+
|      i|476118|
|     to|277623|
|    and|236422|
|    the|167139|
|     my|165837|
|      a|164026|
|     of|106558|
|     it| 87695|
|     me| 85530|
|   just| 83531|
|     is| 79251|
|     in| 78412|
|    but| 77786|
|   that| 72752|
|   have| 72492|
|    for| 70323|
|     so| 66496|
|   this| 57312|
|   like| 54352|
|    i'm| 53014|
|     be| 51307|
|   want| 48788|
|   with| 48505|
|    you| 47600|
|     do| 45516|
|    was| 44319|
|     on| 43693|
|    not| 40132|
|    i’m| 39662|
|     if| 37987|
|   feel| 36059|
|   know| 35882|
|  about| 34641|
|     or| 34344|
|    all| 33957|
|   what| 33199|
|    get| 33182|
|  don't| 32684|
| filler| 31819|
|     no| 30422|
|     at| 29498|
|because| 27107|
|    out| 26843|
|     am| 26665|
|    how| 26211|
|   been| 25391|
|    can| 25217|
|   they| 25155|
|    are| 24378|
|       | 24034|
+-------+------+



                                                                                

In [16]:
df.printSchema()

root
 |-- ID: integer (nullable = true)
 |-- Text: string (nullable = true)
 |-- Label: string (nullable = true)
 |-- tokens: array (nullable = true)
 |    |-- element: string (containsNull = true)



In [17]:
from pyspark.sql.functions import size
from pyspark.sql import types as T

# Define the maximum sequence length you want
max_sequence_length = 50

# Define a function to pad sequences
def pad_sequence(tokens, length):
    return tokens + [""] * (length - len(tokens))

# Create a UDF to apply the padding function
pad_udf = F.udf(lambda tokens: pad_sequence(tokens, max_sequence_length), T.ArrayType(T.StringType()))

# Apply the UDF to pad the sequences
df = df.withColumn("padded_tokens", pad_udf(df["tokens"]))

# Select the relevant columns: ID, padded_tokens, and Label
final_df = df.select("ID", "padded_tokens", "Label")

# Show the resulting DataFrame
final_df.show(5, truncate=False)

[Stage 17:>                                                         (0 + 1) / 1]

+---+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

                                                                                

In [18]:
final_df.show(5, truncate=True)

+---+--------------------+-----------+
| ID|       padded_tokens|      Label|
+---+--------------------+-----------+
|  2|[ex, wife, threat...|    suicide|
|  3|[am, i, weird, i,...|non-suicide|
|  4|["finally, 2020, ...|non-suicide|
|  8|[i, need, helpjus...|    suicide|
| 18|[my, life, is, ov...|    suicide|
+---+--------------------+-----------+
only showing top 5 rows



In [19]:
from pyspark.ml.feature import StringIndexer

# Initialize the StringIndexer
label_indexer = StringIndexer(inputCol="Label", outputCol="LabelIndex")

# Fit the StringIndexer on your DataFrame
indexed_df = label_indexer.fit(df).transform(df)

# Show the resulting DataFrame
indexed_df.show(2, truncate=True)

                                                                                

+---+--------------------+-----------+--------------------+--------------------+----------+
| ID|                Text|      Label|              tokens|       padded_tokens|LabelIndex|
+---+--------------------+-----------+--------------------+--------------------+----------+
|  2|Ex Wife Threateni...|    suicide|[ex, wife, threat...|[ex, wife, threat...|       1.0|
|  3|Am I weird I don'...|non-suicide|[am, i, weird, i,...|[am, i, weird, i,...|       0.0|
+---+--------------------+-----------+--------------------+--------------------+----------+
only showing top 2 rows



In [20]:
from pyspark.ml.feature import Word2Vec

# Define Word2Vec parameters
word2vec = Word2Vec(
    vectorSize=100,       # Set the size of the word vectors
    seed=42,              # Set a seed for reproducibility
    inputCol="padded_tokens",  # Specify the input column with your tokenized and padded text
    outputCol="word_vectors"   # Specify the output column for word vectors
)

In [21]:
# Train the Word2Vec model
word2vec_model = word2vec.fit(indexed_df)

                                                                                

In [22]:
# Transform your DataFrame to include word vectors
df = word2vec_model.transform(indexed_df)

In [23]:
df.show(1, truncate=True)

+---+--------------------+-------+--------------------+--------------------+----------+--------------------+
| ID|                Text|  Label|              tokens|       padded_tokens|LabelIndex|        word_vectors|
+---+--------------------+-------+--------------------+--------------------+----------+--------------------+
|  2|Ex Wife Threateni...|suicide|[ex, wife, threat...|[ex, wife, threat...|       1.0|[0.10913591500423...|
+---+--------------------+-------+--------------------+--------------------+----------+--------------------+
only showing top 1 row



In [24]:
from pyspark.ml.feature import StandardScaler
from pyspark.ml import Pipeline

# Initialize the StandardScaler
scaler = StandardScaler(inputCol="word_vectors", outputCol="scaled_word_vectors", withMean=True, withStd=True)

In [25]:
# Create a Pipeline to apply the scaler
pipeline = Pipeline(stages=[scaler])

In [26]:
# Fit and transform your DataFrame using the pipeline
scaled_df = pipeline.fit(df).transform(df)

                                                                                

In [27]:
scaled_df.show(5, truncate=True)

+---+--------------------+-----------+--------------------+--------------------+----------+--------------------+--------------------+
| ID|                Text|      Label|              tokens|       padded_tokens|LabelIndex|        word_vectors| scaled_word_vectors|
+---+--------------------+-----------+--------------------+--------------------+----------+--------------------+--------------------+
|  2|Ex Wife Threateni...|    suicide|[ex, wife, threat...|[ex, wife, threat...|       1.0|[0.10913591500423...|[1.26365459506834...|
|  3|Am I weird I don'...|non-suicide|[am, i, weird, i,...|[am, i, weird, i,...|       0.0|[-0.2058760570921...|[-0.3663998223869...|
|  4|"Finally 2020 is ...|non-suicide|["finally, 2020, ...|["finally, 2020, ...|       0.0|[-0.2428014789707...|[-0.5574733446064...|
|  8|i need helpjust h...|    suicide|[i, need, helpjus...|[i, need, helpjus...|       1.0|[-0.4018417432811...|[-1.3804397978084...|
| 18|My life is over a...|    suicide|[my, life, is, ov...|[my

In [28]:
scaled_df.printSchema()

root
 |-- ID: integer (nullable = true)
 |-- Text: string (nullable = true)
 |-- Label: string (nullable = true)
 |-- tokens: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- padded_tokens: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- LabelIndex: double (nullable = false)
 |-- word_vectors: vector (nullable = true)
 |-- scaled_word_vectors: vector (nullable = true)



In [29]:
from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(
    inputCols=["scaled_word_vectors"],  # Use the column with word vectors
    outputCol="features"  # Create a new column called "features"
)

# Transform the DataFrame to include the "features" column
df_with_features = assembler.transform(scaled_df)

In [30]:
train_ratio = 0.8
validation_ratio = 0.1
test_ratio = 0.1

train_data, validation_data, test_data = df_with_features.randomSplit([train_ratio, validation_ratio, test_ratio], seed=42)


In [31]:
import kerastuner as kt
import tensorflow as tf
from tensorflow.keras import layers

Using TensorFlow backend


2023-10-01 18:13:25.364291: I tensorflow/tsl/cuda/cudart_stub.cc:28] Could not find cuda drivers on your machine, GPU will not be used.
2023-10-01 18:13:28.749658: E tensorflow/compiler/xla/stream_executor/cuda/cuda_dnn.cc:9342] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
2023-10-01 18:13:28.749732: E tensorflow/compiler/xla/stream_executor/cuda/cuda_fft.cc:609] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
2023-10-01 18:13:28.773038: E tensorflow/compiler/xla/stream_executor/cuda/cuda_blas.cc:1518] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
2023-10-01 18:13:29.475863: I tensorflow/tsl/cuda/cudart_stub.cc:28] Could not find cuda drivers on your machine, GPU will not be used.
2023-10-01 18:13:29.480612: I tensorflow/core/platform/cpu_feature_guard.cc:182] This Tens

In [32]:
# Assuming you have a Spark DataFrame named df_with_features
X_train = scaled_df.select("scaled_word_vectors").collect()

                                                                                

In [35]:
import sparkflow as sf

In [None]:
import numpy as np
from pyspark.sql.functions import col

# Assuming 'scaled_word_vectors' is a column of DenseVector type
# You can use the `toArray` function to convert it to a NumPy array
# Assuming 'scaled_df' is your DataFrame
numpy_array = np.array(scaled_df.select('scaled_word_vectors').rdd.map(lambda x: x[0]).collect())

# Now, you can get the shape of the NumPy array
shape = numpy_array.shape

print(shape)

In [38]:
import tensorflow as tf
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.classification import MultilayerPerceptronClassifier
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Assuming you have a Spark DataFrame named df_with_features

assembler = VectorAssembler(
    inputCols=["scaled_word_vectors"],
    outputCol="new_features"  # Choose a new name
)


# Step 2: Define the Keras RNN model
def create_rnn_model():
    input_layer = tf.keras.layers.Input(shape=(20, 300))  # Adjust the input shape accordingly
    rnn_layer = tf.keras.layers.SimpleRNN(units=64, activation='relu')(input_layer)
    output_layer = tf.keras.layers.Dense(units=2, activation='softmax')(rnn_layer)
    
    model = tf.keras.Model(inputs=input_layer, outputs=output_layer)
    
    model.compile(optimizer='adam',
                  loss='sparse_categorical_crossentropy',
                  metrics=['accuracy'])
    
    return model

# Step 3: Convert Keras model to TensorFlow SavedModel format
# keras_model = create_rnn_model()
# tf.saved_model.save(keras_model, '/path/to/saved_model')  # Replace with the desired path

# Step 4: Load the TensorFlow SavedModel
# inference_graph = tf.saved_model.load('/path/to/saved_model')  # Replace with the path you used above


# Step 5: Create a MultilayerPerceptronClassifier as your final classification layer
mlp_classifier = MultilayerPerceptronClassifier(
    featuresCol="scaled_word_vectors",
    labelCol="LabelIndex",
    layers=[200, 128, 64, 2],  # Adjust the layer configuration as needed
    blockSize=128,  # Set your block size
    seed=42
)

# Step 6: Create a pipeline
pipeline = Pipeline(stages=[assembler, mlp_classifier])

# Step 7: Define the parameter grid for hyperparameter tuning
paramGrid = (ParamGridBuilder()
    .addGrid(mlp_classifier.blockSize, [64, 128])  # Adjust blockSize values for tuning
    .addGrid(mlp_classifier.maxIter, [5, 10])  # Adjust the number of iterations for tuning
    .build())

# Step 8: Create a TrainValidationSplit with hyperparameter tuning
tvs = TrainValidationSplit(
    estimator=pipeline,
    estimatorParamMaps=paramGrid,
    evaluator=MulticlassClassificationEvaluator(
        labelCol="LabelIndex", predictionCol="prediction", metricName="accuracy"
    ),
    trainRatio=0.8
)

# Step 9: Fit the model and perform hyperparameter tuning
model = tvs.fit(df_with_features)

2023-10-01 18:30:51,283 WARN execution.CacheManager: Asked to cache already cached data.
2023-10-01 18:30:51,292 WARN execution.CacheManager: Asked to cache already cached data.
2023-10-01 18:31:07,681 WARN memory.MemoryStore: Not enough space to cache rdd_153_1 in memory! (computed 67.9 MiB so far)
2023-10-01 18:31:07,696 WARN storage.BlockManager: Persisting block rdd_153_1 to disk instead.
2023-10-01 18:31:14,600 WARN memory.MemoryStore: Not enough space to cache rdd_153_0 in memory! (computed 131.8 MiB so far)
2023-10-01 18:31:14,601 WARN storage.BlockManager: Persisting block rdd_153_0 to disk instead.
2023-10-01 18:31:14,915 WARN storage.BlockManager: Putting block rdd_170_3 failed due to exception java.lang.ArrayIndexOutOfBoundsException.
2023-10-01 18:31:14,916 WARN storage.BlockManager: Block rdd_170_3 could not be removed as it was not found on disk or in memory
2023-10-01 18:31:15,311 WARN memory.MemoryStore: Not enough space to cache rdd_153_2 in memory! (computed 131.8 MiB

2023-10-01 18:31:15,731 WARN storage.BlockManager: Putting block rdd_153_1 failed due to exception org.apache.spark.TaskKilledException.
2023-10-01 18:31:15,732 WARN storage.BlockManager: Block rdd_153_1 could not be removed as it was not found on disk or in memory
2023-10-01 18:31:15,735 WARN storage.BlockManager: Putting block rdd_170_1 failed due to exception org.apache.spark.TaskKilledException.
2023-10-01 18:31:15,735 WARN storage.BlockManager: Block rdd_170_1 could not be removed as it was not found on disk or in memory
2023-10-01 18:31:15,776 WARN scheduler.TaskSetManager: Lost task 1.0 in stage 34.0 (TID 60) (10.0.2.15 executor driver): TaskKilled (Stage cancelled)
[Stage 34:>                                                         (0 + 2) / 4]

Py4JJavaError: An error occurred while calling o1407.fit.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 3 in stage 34.0 failed 1 times, most recent failure: Lost task 3.0 in stage 34.0 (TID 62) (10.0.2.15 executor driver): java.lang.ArrayIndexOutOfBoundsException
	at java.lang.System.arraycopy(Native Method)
	at org.apache.spark.ml.ann.DataStacker.$anonfun$stack$4(Layer.scala:665)
	at org.apache.spark.ml.ann.DataStacker.$anonfun$stack$4$adapted(Layer.scala:664)
	at scala.collection.immutable.List.foreach(List.scala:431)
	at org.apache.spark.ml.ann.DataStacker.$anonfun$stack$3(Layer.scala:664)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
	at org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:224)
	at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:302)
	at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1496)
	at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1423)
	at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1487)
	at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1310)
	at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:384)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:335)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1491)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2450)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2399)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2398)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2398)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1156)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1156)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1156)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2638)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2580)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2569)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:938)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2224)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2245)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2264)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2289)
	at org.apache.spark.rdd.RDD.count(RDD.scala:1253)
	at org.apache.spark.mllib.optimization.LBFGS$.runLBFGS(LBFGS.scala:195)
	at org.apache.spark.mllib.optimization.LBFGS.optimizeWithLossReturned(LBFGS.scala:154)
	at org.apache.spark.ml.ann.FeedForwardTrainer.train(Layer.scala:855)
	at org.apache.spark.ml.classification.MultilayerPerceptronClassifier.$anonfun$train$1(MultilayerPerceptronClassifier.scala:228)
	at org.apache.spark.ml.util.Instrumentation$.$anonfun$instrumented$1(Instrumentation.scala:191)
	at scala.util.Try$.apply(Try.scala:213)
	at org.apache.spark.ml.util.Instrumentation$.instrumented(Instrumentation.scala:191)
	at org.apache.spark.ml.classification.MultilayerPerceptronClassifier.train(MultilayerPerceptronClassifier.scala:184)
	at org.apache.spark.ml.classification.MultilayerPerceptronClassifier.train(MultilayerPerceptronClassifier.scala:93)
	at org.apache.spark.ml.Predictor.fit(Predictor.scala:151)
	at org.apache.spark.ml.Predictor.fit(Predictor.scala:115)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.ArrayIndexOutOfBoundsException
	at java.lang.System.arraycopy(Native Method)
	at org.apache.spark.ml.ann.DataStacker.$anonfun$stack$4(Layer.scala:665)
	at org.apache.spark.ml.ann.DataStacker.$anonfun$stack$4$adapted(Layer.scala:664)
	at scala.collection.immutable.List.foreach(List.scala:431)
	at org.apache.spark.ml.ann.DataStacker.$anonfun$stack$3(Layer.scala:664)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
	at org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:224)
	at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:302)
	at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1496)
	at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1423)
	at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1487)
	at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1310)
	at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:384)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:335)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1491)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more


2023-10-01 18:31:16,596 WARN storage.BlockManager: Putting block rdd_153_0 failed due to exception org.apache.spark.TaskKilledException.
2023-10-01 18:31:16,607 WARN storage.BlockManager: Block rdd_153_0 could not be removed as it was not found on disk or in memory
2023-10-01 18:31:16,609 WARN storage.BlockManager: Putting block rdd_170_0 failed due to exception org.apache.spark.TaskKilledException.
2023-10-01 18:31:16,610 WARN storage.BlockManager: Block rdd_170_0 could not be removed as it was not found on disk or in memory
2023-10-01 18:31:16,634 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 34.0 (TID 59) (10.0.2.15 executor driver): TaskKilled (Stage cancelled)
2023-10-01 18:31:16,952 WARN storage.BlockManager: Putting block rdd_187_3 failed due to exception java.lang.ArrayIndexOutOfBoundsException.
2023-10-01 18:31:16,954 WARN storage.BlockManager: Block rdd_187_3 could not be removed as it was not found on disk or in memory
2023-10-01 18:31:16,955 ERROR executor.Executor:

2023-10-01 18:31:17,261 WARN storage.BlockManager: Putting block rdd_153_2 failed due to exception org.apache.spark.TaskKilledException.
2023-10-01 18:31:17,262 WARN storage.BlockManager: Block rdd_153_2 could not be removed as it was not found on disk or in memory
2023-10-01 18:31:17,263 WARN storage.BlockManager: Putting block rdd_170_2 failed due to exception org.apache.spark.TaskKilledException.
2023-10-01 18:31:17,263 WARN storage.BlockManager: Block rdd_170_2 could not be removed as it was not found on disk or in memory
2023-10-01 18:31:17,290 WARN scheduler.TaskSetManager: Lost task 2.0 in stage 34.0 (TID 61) (10.0.2.15 executor driver): TaskKilled (Stage cancelled)
2023-10-01 18:31:17,920 WARN storage.BlockManager: Putting block rdd_204_3 failed due to exception java.lang.ArrayIndexOutOfBoundsException.
2023-10-01 18:31:17,923 WARN storage.BlockManager: Block rdd_204_3 could not be removed as it was not found on disk or in memory
2023-10-01 18:31:17,932 ERROR executor.Executor:

2023-10-01 18:31:18,365 WARN storage.BlockManager: Putting block rdd_221_3 failed due to exception java.lang.ArrayIndexOutOfBoundsException.
2023-10-01 18:31:18,365 WARN storage.BlockManager: Block rdd_221_3 could not be removed as it was not found on disk or in memory
2023-10-01 18:31:18,367 ERROR executor.Executor: Exception in task 3.0 in stage 37.0 (TID 65)
java.lang.ArrayIndexOutOfBoundsException
	at java.lang.System.arraycopy(Native Method)
	at org.apache.spark.ml.ann.DataStacker.$anonfun$stack$4(Layer.scala:665)
	at org.apache.spark.ml.ann.DataStacker.$anonfun$stack$4$adapted(Layer.scala:664)
	at scala.collection.immutable.List.foreach(List.scala:431)
	at org.apache.spark.ml.ann.DataStacker.$anonfun$stack$3(Layer.scala:664)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
	at org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:224)
	at org.apache.spark.storage.memory.MemoryStore.

In [None]:
def build_model(hp):
    model = keras.Sequential()
    
    # Tune the number of units in the first RNN layer
    hp_units = hp.Int('units', min_value=32, max_value=128, step=32)
    
    # Input shape for text classification (input_dim is the length of word vectors)
    input_shape = (200, 1)  # Replace input_dim with the actual length of your word vectors
    
    model.add(layers.SimpleRNN(units=hp_units, activation='relu', input_shape=input_shape))
    
    # Output layer
    model.add(layers.Dense(2, activation='softmax'))
    
    # Compile the model
    hp_learning_rate = hp.Choice('learning_rate', values=[1e-2, 1e-3, 1e-4])
    model.compile(optimizer=keras.optimizers.Adam(learning_rate=hp_learning_rate),
                  loss='sparse_categorical_crossentropy',
                  metrics=['accuracy'])
    
    return model

In [None]:
tuner = kt.Hyperband(build_model,
                     objective='val_accuracy',  # You can choose a different metric
                     max_epochs=10,
                     factor=2,
                     directory='keras_tuner_dir',
                     project_name='simple_rnn')

In [None]:
# Define callbacks, e.g., early stopping
callbacks = [keras.callbacks.EarlyStopping(patience=3)]

# Perform the hyperparameter search
tuner.search(train_data, validation_data=validation_data, epochs=10, callbacks=callbacks)

# Get the best hyperparameters
best_hps = tuner.get_best_hyperparameters(num_trials=1)[0]

In [None]:
word_vectors = df_with_features.select("scaled_word_vectors").collect()
predictions = []

for row in word_vectors:
    vector = tf.convert_to_tensor(row.scaled_word_vectors.toArray(), dtype=tf.float32)
    vector = tf.reshape(vector, (1, -1, 1))
    prediction = model.predict(vector)
    predictions.append(float(tf.argmax(prediction, axis=1)[0]))

In [None]:
from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(
    inputCols=["scaled_word_vectors"],
    outputCol="features"
)

df_with_features = assembler.transform(scaled_df)

In [None]:
# Define the RNN model using TensorFlow
def create_rnn_model():
    model = tf.keras.Sequential()
    model.add(tf.keras.layers.SimpleRNN(units=best_units, activation='relu', input_shape=(X_train.shape[1], X_train.shape[2])))
    model.add(tf.keras.layers.Dense(2, activation='softmax'))
    optimizer = tf.keras.optimizers.Adam(learning_rate=best_learning_rate)
    model.compile(optimizer=optimizer, loss='sparse_categorical_crossentropy', metrics=['accuracy'])
    return model

In [None]:
def predict_udf(rows):
    model = create_rnn_model()
    predictions = []
    for row in rows:
        # Assuming you have a "scaled_word_vectors" column containing the word vectors
        word_vectors = row.scaled_word_vectors
        word_vectors = tf.reshape(word_vectors, (1, -1, 1))
        prediction = model.predict(word_vectors)
        predictions.append(float(tf.argmax(prediction, axis=1)[0]))
    return predictions

In [None]:
# Register the UDF
predictor_udf = F.udf(predict_udf, FloatType())

# Apply the UDF to make predictions
predictions_df = df_with_features.withColumn("predictions", predictor_udf(F.struct([df_with_features[x] for x in df_with_features.columns])))

In [None]:
import tensorflow as tf
import tensorflowonspark as tfos
import pyspark.sql.functions as F
from pyspark.sql.types import FloatType
from pyspark.ml.feature import VectorAssembler

# Create a SparkSession
spark = SparkSession.builder.appName("YourAppName").getOrCreate()

# Define the best hyperparameters
best_units = 64  # Replace with the actual best value found
best_learning_rate = 0.0001  # Replace with the actual best value found

# Define the RNN model using TensorFlow
def create_rnn_model():
    model = tf.keras.Sequential()
    model.add(tf.keras.layers.SimpleRNN(units=best_units, activation='relu', input_shape=(X_train.shape[1], X_train.shape[2])))
    model.add(tf.keras.layers.Dense(2, activation='softmax'))
    optimizer = tf.keras.optimizers.Adam(learning_rate=best_learning_rate)
    model.compile(optimizer=optimizer, loss='sparse_categorical_crossentropy', metrics=['accuracy'])
    return model

# Define a UDF to apply the RNN model to Spark DataFrame
def predict_udf(rows):
    model = create_rnn_model()
    predictions = []
    for row in rows:
        # Assuming you have a "scaled_word_vectors" column containing the word vectors
        word_vectors = row.scaled_word_vectors
        word_vectors = tf.reshape(word_vectors, (1, -1, 1))
        prediction = model.predict(word_vectors)
        predictions.append(float(tf.argmax(prediction, axis=1)[0]))
    return predictions

# Register the UDF
predictor_udf = F.udf(predict_udf, FloatType())

# Apply the UDF to make predictions
predictions_df = df_with_features.withColumn("predictions", predictor_udf(F.struct([df_with_features[x] for x in df_with_features.columns])))

# Show the predictions
predictions_df.show()

# Stop the SparkSession
spark.stop()

In [None]:
test_loss, test_accuracy = final_model.evaluate(test_data)
print(f"Test Accuracy: {test_accuracy:.4f}")


In [None]:
xdf.show(2, truncate=Fals)

In [None]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

import nltk
from nltk.corpus import stopwords
from nltk.tokenize import word_tokenize
from nltk.stem import WordNetLemmatizer

nltk.download('wordnet')
nltk.download('stopwords')
nltk.download('punkt')
lists = stopwords.words('english')

In [None]:
def tokenize(text):
    return word_tokenize(text)

def remove_stopwords(tokens):
    stop_words = set(stopwords.words('english'))
    return [word for word in tokens if word.lower() not in stop_words]

def lemmatize(tokens):
    lemmatizer = WordNetLemmatizer()
    return [lemmatizer.lemmatize(word) for word in tokens]

In [None]:
# UDFs with Spark
tokenize_udf = udf(tokenize, StringType())
remove_stopwords_udf = udf(remove_stopwords, StringType())
lemmatize_udf = udf(lemmatize, StringType())

In [None]:
# Tokenize, remove stopwords, and lemmatize the "text" column
df = df.withColumn("tokens", tokenize_udf(df["text"]))
df = df.withColumn("filtered_tokens", remove_stopwords_udf(df["tokens"]))
df = df.withColumn("processed_text", lemmatize_udf(df["filtered_tokens"]))

In [None]:
df.select("processed_text").show(1, truncate=False)

First row original Text:

Ex Wife Threatening SuicideRecently I left my wife for good because she has cheated on me twice and lied to me so much that I have decided to refuse to go back to her. As of a few days ago, she began threatening suicide. I have tirelessly spent these paat few days talking her out of it and she keeps hesitating because she wants to believe I'll come back. I know a lot of people will threaten this in order to get their way, but what happens if she really does? What do I do and how am I supposed to handle her death on my hands? I still love my wife but I cannot deal with getting cheated on again and constantly feeling insecure. I'm worried today may be the day she does it and I hope so much it doesn't happen.

First row after Tokenization, Lemmatization and Removing Stopwords:

Ex, Wife, Threatening, SuicideRecently, left, wife, good, cheated, twice, lied, much, decided, refuse, go, back, ., day, ago, ,, began, threatening, suicide, ., tirelessly, spent, paat, day, talking, keep, hesitating, want, believe, 'll, come, back, ., know, lot, people, threaten, order, get, way, ,, happens, really, ?, supposed, handle, death, hand, ?, still, love, wife, deal, getting, cheated, constantly, feeling, insecure, ., 'm, worried, today, may, day, hope, much, n't, happen, .

In [None]:
df.show(5)

In [None]:
# List of columns to remove
columns_to_remove = ["Text", "tokens", "filtered_tokens", "lemmatized_tokens"]

# Remove the specified columns
df = df.drop(*columns_to_remove)

df.show(2)

In [None]:
from pyspark.sql.functions import col

# Reorder the columns
df = df.select("ID", "processed_text", "Label")

df.show(5)

In [None]:
from pyspark.sql.functions import lower, regexp_replace
from functools import reduce

# List of preprocessing functions
preprocessing_functions = [
    lambda text: lower(text),                                     # Convert text to lowercase
    lambda text: regexp_replace(text, r'\S+@\S+', ''),           # Remove email addresses
    lambda text: regexp_replace(text, r'<.*?>', ''),             # Remove HTML tags
    lambda text: regexp_replace(text, r'[^a-zA-Z0-9\s,.]', ' '), # Remove special characters except , and .
    lambda text: regexp_replace(text, r'[^\x00-\x7F]+', ''),     # Remove accented characters
    lambda text: regexp_replace(text, r'\s+', ' ')              # Remove extra spaces
]

# Apply preprocessing functions to the "processed_text" column
for func in preprocessing_functions:
    df = df.withColumn("processed_text", func(df["processed_text"]))

In [None]:
# Show the updated DataFrame
df.show(20, truncate=True)

In [None]:
from pyspark.sql.functions import col

# Filter the labels
df = df.filter(col('Label') == 'suicide')

# Show the resulting DataFrame
df.show(5)

In [None]:
from pyspark.ml.feature import Tokenizer

# Create a Tokenizer instance
tokenizer = Tokenizer(inputCol="processed_text", outputCol="tokens")

# Tokenize the "processed_text" column
tokenized_df = tokenizer.transform(df)

# Show the resulting DataFrame
tokenized_df.select("Label", "tokens").show(truncate=True)

In [None]:
from pyspark.sql.functions import split, explode, col

# Split the processed_text column into individual words
df = df.withColumn("words", split(col("processed_text"), " "))

# Explode the words column to create a row for each word
df = df.withColumn("word", explode(col("words")))

# Group by words and count their occurrences
word_counts = df.groupBy("word").count()

# Order the word counts in descending order
word_counts = word_counts.orderBy(col("count").desc())

# Show the top 20 words and their counts
word_counts.show(20)

In [None]:
X = df['processed_text']
y = df['Label']

In [None]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import Tokenizer, Word2Vec, CountVectorizer
from pyspark.ml import Pipeline

In [None]:
# Step 1: Tokenize the text data
tokenizer = Tokenizer(inputCol="processed_text", outputCol="tokens")

In [None]:
# Step 2: Create a vocabulary from the tokens
count_vectorizer = CountVectorizer(inputCol="tokens", outputCol="features")

In [None]:
# Define a pipeline to execute these steps
pipeline = Pipeline(stages=[tokenizer, count_vectorizer])

In [None]:
# Fit and transform the data using the pipeline
model = pipeline.fit(df)
transformed_df = model.transform(df)

In [None]:
word2Vec = Word2Vec(vectorSize=100, minCount=5, inputCol="words", outputCol="word_vectors")
model = word2Vec.fit(wordsData)
result = model.transform(wordsData)

In [None]:
# Assuming you have a label column named "label" and features column named "word_vectors"
labeledData = result.select("ID", "word_vectors","Label")

In [None]:
labeledData.show(5, truncate=True)

In [None]:
word_vectors = labeledData.select("word_vectors").rdd.map(lambda x: x[0]).collect()

In [None]:
num_words = len(word_vectors[0])

In [None]:
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense
from tensorflow.keras.utils import to_categorical
from sklearn.metrics import accuracy_score

In [None]:
labeledData.show(2, truncate=True)

In [None]:
X = labeledData.select("word_vectors")

In [None]:
# Define the split ratios (80% for training, 10% for testing, 10% for validation)
train_ratio = 0.8
test_ratio = 0.1
validation_ratio = 0.1

# Split the data into training, testing, and validation
train_data, test_data, validation_data = df.randomSplit([train_ratio, test_ratio, validation_ratio], seed=42)

In [None]:
#  The number of rows in each split
print("Train Data Count: ", train_data.count())
print("Test Data Count: ", test_data.count())
print("Validation Data Count: ", validation_data.count())

In [None]:
from tensorflow.keras.layers import Embedding
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dropout, SimpleRNN, Dense


In [None]:
# Assuming num_words is the number of unique words in your vocabulary
num_columns = len(X.columns)

EMBEDDING_DIM = 100

model.add(Embedding(input_dim=num_words,
                    output_dim=EMBEDDING_DIM,
                    input_length=num_columns,
                    weights=[gensim_weight_matrix],
                    trainable=False))
model.add(SimpleRNN(100, return_sequences=True, input_shape=(None, num_words)))  # Input shape adjusted
model.add(Dropout(0.2))
model.add(SimpleRNN(200, return_sequences=True))
model.add(Dropout(0.4))
model.add(SimpleRNN(100, return_sequences=False))
model.add(Dense(2, activation='softmax'))  # Output dimension adjusted for binary classification

model.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy'])
model.summary()

In [None]:
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint

#EarlyStopping and ModelCheckpoint
from keras.callbacks import EarlyStopping, ModelCheckpoint
es = EarlyStopping(monitor = 'val_loss', mode = 'min', verbose = 1, patience = 5)
mc = ModelCheckpoint('./model.h5', monitor = 'val_accuracy', mode = 'max', verbose = 1, save_best_only = True)

In [None]:
history_embedding = model.fit(train_data, train_labels, 
                                epochs = 25, batch_size = 128, 
                                validation_data=(validation_data, validation_labels),
                                verbose = 1, callbacks= [es, mc]  )