### Text preparation Pyspark

## Importing libraries

For this text preparation process we are going to use the **PySpark** library 

In [1]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import Tokenizer
from pyspark.ml.feature import StopWordsRemover
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import col, rand
import sparknlp
from sparknlp.annotator import Stemmer, LemmatizerModel
from sparknlp.base import DocumentAssembler, Pipeline

In [2]:
spark=SparkSession.builder.appName('nlp').getOrCreate()

your 131072x1 screen size is bogus. expect trouble
23/09/14 19:43:59 WARN Utils: Your hostname, DESKTOP-KCSPFSJ resolves to a loopback address: 127.0.1.1; using 172.24.244.59 instead (on interface eth0)
23/09/14 19:43:59 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/09/14 19:44:01 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


## Reading Twitter Data

In [3]:
path_in = "twitterClimateData.csv"
df = spark.read.csv(path_in,inferSchema=True,header=True,sep=';')
df = df.select(["text","hashtags"])
df.show()

                                                                                

+--------------------+--------------------+
|                text|            hashtags|
+--------------------+--------------------+
|2020 is the year ...|#votethemout #cli...|
|Winter has not st...|#climatefriday #c...|
|WEEK 55 of #Clima...|      #ClimateStrike|
|A year of resista...|#greta #gretathun...|
| HAPPY HOLIDAYS #...|#greta #gretathun...|
|10 Questions to A...|#climatechange #n...|
|#climatestrike #F...|#climatestrike #F...|
|#ClimateChangeIsR...|#ClimateChangeIsR...|
|My oldest daughte...|#climatestrike #l...|
|Our toddler #POTU...|#POTUS #Time #Gre...|
|"""The change is ...|#ClimateChange #c...|
|Moments after #Im...|#ImpeachmentVote ...|
|#climatestrike #C...|#climatestrike #C...|
|Keep up the great...|#ClimateChangeIsR...|
|Congratulations @...|#climatestrike #F...|
|Even though I hop...|#HongKongProteste...|
|*gretathunberg Is...|#PersonoftheYear ...|
| Congratulations ...|#vegan #climatest...|
|I get my energy a...|#ClimateStrike #F...|
| THE CHAMBER OF C...|#greta #gr

In [4]:
df.printSchema()

root
 |-- text: string (nullable = true)
 |-- hashtags: string (nullable = true)



In [5]:
df.count()

72405

## Text preparation process

The goal of this process is to reduce the number of tokens but without eliminating the intepretability of the words, in order to create the best bag of words possible. We are going to split this process for each column of the DataFrame, first for `text` column and then for `hashtags` column.

### Text preparation process for `Text` Column

### 1) Tokenization

In [6]:
tokenization=Tokenizer(inputCol='text',outputCol='text_tokens')

In [7]:
df_tokens=tokenization.transform(df)

In [8]:
df_tokens.show()

+--------------------+--------------------+--------------------+
|                text|            hashtags|         text_tokens|
+--------------------+--------------------+--------------------+
|2020 is the year ...|#votethemout #cli...|[2020, is, the, y...|
|Winter has not st...|#climatefriday #c...|[winter, has, not...|
|WEEK 55 of #Clima...|      #ClimateStrike|[week, 55, of, #c...|
|A year of resista...|#greta #gretathun...|[a, year, of, res...|
| HAPPY HOLIDAYS #...|#greta #gretathun...|[, happy, holiday...|
|10 Questions to A...|#climatechange #n...|[10, questions, t...|
|#climatestrike #F...|#climatestrike #F...|[#climatestrike, ...|
|#ClimateChangeIsR...|#ClimateChangeIsR...|[#climatechangeis...|
|My oldest daughte...|#climatestrike #l...|[my, oldest, daug...|
|Our toddler #POTU...|#POTUS #Time #Gre...|[our, toddler, #p...|
|"""The change is ...|#ClimateChange #c...|["""the, change, ...|
|Moments after #Im...|#ImpeachmentVote ...|[moments, after, ...|
|#climatestrike #C...|#cl

In [9]:
stopword_removal=StopWordsRemover(inputCol='text_tokens',outputCol='refined_text_tokens')

In [10]:
refined_text_df=stopword_removal.transform(df_tokens)
refined_text_df.show()

+--------------------+--------------------+--------------------+--------------------+
|                text|            hashtags|         text_tokens| refined_text_tokens|
+--------------------+--------------------+--------------------+--------------------+
|2020 is the year ...|#votethemout #cli...|[2020, is, the, y...|[2020, year, #vot...|
|Winter has not st...|#climatefriday #c...|[winter, has, not...|[winter, stopped,...|
|WEEK 55 of #Clima...|      #ClimateStrike|[week, 55, of, #c...|[week, 55, #clima...|
|A year of resista...|#greta #gretathun...|[a, year, of, res...|[year, resistance...|
| HAPPY HOLIDAYS #...|#greta #gretathun...|[, happy, holiday...|[, happy, holiday...|
|10 Questions to A...|#climatechange #n...|[10, questions, t...|[10, questions, a...|
|#climatestrike #F...|#climatestrike #F...|[#climatestrike, ...|[#climatestrike, ...|
|#ClimateChangeIsR...|#ClimateChangeIsR...|[#climatechangeis...|[#climatechangeis...|
|My oldest daughte...|#climatestrike #l...|[my, oldest

In [11]:
len_udf = udf(lambda s: len(s), IntegerType()) 

refined_text_df = refined_text_df.withColumn("token_text_count", len_udf(col('refined_text_tokens')))

refined_text_df.orderBy(rand()).show(10)




+--------------------+--------------------+--------------------+--------------------+----------------+
|                text|            hashtags|         text_tokens| refined_text_tokens|token_text_count|
+--------------------+--------------------+--------------------+--------------------+----------------+
|"Leftover irony o...|      #GlobalWarming|["leftover, irony...|["leftover, irony...|              29|
|"""#ClimateChange...|#ClimateChange #w...|["""#climatechang...|["""#climatechang...|              15|
|Despite being a l...|#BigGreen #canoei...|[despite, being, ...|[despite, longime...|              23|
|@Chase @Citibank ...|#fossilfuelfree #...|[@chase, @citiban...|[@chase, @citiban...|              22|
|@SenMikeLee is my...|#Hero #GreenNewDe...|[@senmikelee, is,...|[@senmikelee, #he...|              16|
|Discipuli left ea...|#climatestrikenyc...|[discipuli, left,...|[discipuli, left,...|              21|
|'You can't fight ...|#Trump #wildfires...|['you, can't, fig...|['you, fi

                                                                                

### Spark NLP Stemming and Lemmatizing

In [12]:
spark = sparknlp.start()



23/09/14 19:44:11 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [13]:
#Stemming

stemmer = Stemmer().setInputCols(["refined_text_tokens"]).setOutputCol("stemming_text_tokens")

TypeError: 'JavaPackage' object is not callable

In [14]:
from sparknlp.annotator import LemmatizerModel
lemmatizer = LemmatizerModel.pretrained() \
     .setInputCols(['refined_text_tokens']) \
     .setOutputCol('lemmatize_text_tokens')

lemma_antbnc download started this may take some time.


TypeError: 'JavaPackage' object is not callable

### Text preparation process for `Hashtags` Column

In [16]:
tokenization=Tokenizer(inputCol='hashtags',outputCol='hashtags_tokens')

In [17]:
df_hashtags=tokenization.transform(df)

In [18]:
df_hashtags.show()

+--------------------+--------------------+--------------------+
|                text|            hashtags|     hashtags_tokens|
+--------------------+--------------------+--------------------+
|2020 is the year ...|#votethemout #cli...|[#votethemout, #c...|
|Winter has not st...|#climatefriday #c...|[#climatefriday, ...|
|WEEK 55 of #Clima...|      #ClimateStrike|    [#climatestrike]|
|A year of resista...|#greta #gretathun...|[#greta, #gretath...|
| HAPPY HOLIDAYS #...|#greta #gretathun...|[#greta, #gretath...|
|10 Questions to A...|#climatechange #n...|[#climatechange, ...|
|#climatestrike #F...|#climatestrike #F...|[#climatestrike, ...|
|#ClimateChangeIsR...|#ClimateChangeIsR...|[#climatechangeis...|
|My oldest daughte...|#climatestrike #l...|[#climatestrike, ...|
|Our toddler #POTU...|#POTUS #Time #Gre...|[#potus, #time, #...|
|"""The change is ...|#ClimateChange #c...|[#climatechange, ...|
|Moments after #Im...|#ImpeachmentVote ...|[#impeachmentvote...|
|#climatestrike #C...|#cl

In [19]:
stopword_removal=StopWordsRemover(inputCol='hashtags_tokens',outputCol='refined_hashtags_tokens')

In [20]:
refined_hashtags_df=stopword_removal.transform(df_hashtags)
refined_hashtags_df.show()

+--------------------+--------------------+--------------------+-----------------------+
|                text|            hashtags|     hashtags_tokens|refined_hashtags_tokens|
+--------------------+--------------------+--------------------+-----------------------+
|2020 is the year ...|#votethemout #cli...|[#votethemout, #c...|   [#votethemout, #c...|
|Winter has not st...|#climatefriday #c...|[#climatefriday, ...|   [#climatefriday, ...|
|WEEK 55 of #Clima...|      #ClimateStrike|    [#climatestrike]|       [#climatestrike]|
|A year of resista...|#greta #gretathun...|[#greta, #gretath...|   [#greta, #gretath...|
| HAPPY HOLIDAYS #...|#greta #gretathun...|[#greta, #gretath...|   [#greta, #gretath...|
|10 Questions to A...|#climatechange #n...|[#climatechange, ...|   [#climatechange, ...|
|#climatestrike #F...|#climatestrike #F...|[#climatestrike, ...|   [#climatestrike, ...|
|#ClimateChangeIsR...|#ClimateChangeIsR...|[#climatechangeis...|   [#climatechangeis...|
|My oldest daughte...

In [21]:
len_udf = udf(lambda s: len(s), IntegerType()) 

refined_hashtags_df = refined_hashtags_df.withColumn("token_hashtags_count", len_udf(col('refined_hashtags_tokens')))

refined_hashtags_df.orderBy(rand()).show(10)

23/09/14 19:45:13 ERROR PythonUDFRunner: Python worker exited unexpectedly (crashed)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/home/david/.local/share/virtualenvs/Trabajo2_almdatos-LZAGjvTA/lib/python3.11/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 810, in main
    eval_type = read_int(infile)
                ^^^^^^^^^^^^^^^^
  File "/home/david/.local/share/virtualenvs/Trabajo2_almdatos-LZAGjvTA/lib/python3.11/site-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 596, in read_int
    raise EOFError
EOFError

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:561)
	at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$2.read(PythonUDFRunner.scala:94)
	at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$2.read(PythonUDFRunner.scala:75)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.sc

Py4JJavaError: An error occurred while calling o148.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 6 in stage 11.0 failed 1 times, most recent failure: Lost task 6.0 in stage 11.0 (TID 40) (172.24.244.59 executor driver): org.apache.spark.SparkException: [FAILED_EXECUTE_UDF] Failed to execute user defined function (Tokenizer$$Lambda$3659/0x000000084148a040: (string) => array<string>).
	at org.apache.spark.sql.errors.QueryExecutionErrors$.failedExecuteUserDefinedFunctionError(QueryExecutionErrors.scala:217)
	at org.apache.spark.sql.errors.QueryExecutionErrors.failedExecuteUserDefinedFunctionError(QueryExecutionErrors.scala)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.ContextAwareIterator.hasNext(ContextAwareIterator.scala:39)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$GroupedIterator.takeDestructively(Iterator.scala:1160)
	at scala.collection.Iterator$GroupedIterator.go(Iterator.scala:1176)
	at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1214)
	at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1217)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:320)
	at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$1.writeIteratorToStream(PythonUDFRunner.scala:57)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:440)
	at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:2088)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:274)
Caused by: java.lang.NullPointerException
	at org.apache.spark.ml.feature.Tokenizer.$anonfun$createTransformFunc$1(Tokenizer.scala:40)
	... 20 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2785)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2721)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2720)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2720)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1206)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1206)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1206)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2984)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2923)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2912)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:971)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2263)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2358)
	at org.apache.spark.rdd.RDD.$anonfun$reduce$1(RDD.scala:1109)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:405)
	at org.apache.spark.rdd.RDD.reduce(RDD.scala:1091)
	at org.apache.spark.rdd.RDD.$anonfun$takeOrdered$1(RDD.scala:1538)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:405)
	at org.apache.spark.rdd.RDD.takeOrdered(RDD.scala:1525)
	at org.apache.spark.sql.execution.TakeOrderedAndProjectExec.executeCollect(limit.scala:291)
	at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:4177)
	at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:3161)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:4167)
	at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:526)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:4165)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:118)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:195)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:103)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:4165)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:3161)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:3382)
	at org.apache.spark.sql.Dataset.getRows(Dataset.scala:284)
	at org.apache.spark.sql.Dataset.showString(Dataset.scala:323)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.spark.SparkException: [FAILED_EXECUTE_UDF] Failed to execute user defined function (Tokenizer$$Lambda$3659/0x000000084148a040: (string) => array<string>).
	at org.apache.spark.sql.errors.QueryExecutionErrors$.failedExecuteUserDefinedFunctionError(QueryExecutionErrors.scala:217)
	at org.apache.spark.sql.errors.QueryExecutionErrors.failedExecuteUserDefinedFunctionError(QueryExecutionErrors.scala)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.ContextAwareIterator.hasNext(ContextAwareIterator.scala:39)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$GroupedIterator.takeDestructively(Iterator.scala:1160)
	at scala.collection.Iterator$GroupedIterator.go(Iterator.scala:1176)
	at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1214)
	at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1217)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:320)
	at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$1.writeIteratorToStream(PythonUDFRunner.scala:57)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:440)
	at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:2088)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:274)
Caused by: java.lang.NullPointerException
	at org.apache.spark.ml.feature.Tokenizer.$anonfun$createTransformFunc$1(Tokenizer.scala:40)
	... 20 more


## References

1) for stemming and lemmatizing 
* https://www.johnsnowlabs.com/boost-your-nlp-results-with-spark-nlp-stemming-and-lemmatizing-techniques/