# import, cleaning and preping

In [1]:
from pyspark.sql import SparkSession
import sparknlp

# Initialize Spark session with all configurations
spark = SparkSession \
    .builder \
    .appName("myApp") \
    .config("spark.mongodb.read.connection.uri", "mongodb://127.0.0.1/twitterData.tweets") \
    .config("spark.mongodb.write.connection.uri", "mongodb://127.0.0.1/twitterData.tweets") \
    .config("spark.jars", "/home/hduser/Desktop/mongo-spark-connector_2.12-10.2.0.jar") \
    .config("spark.jars.packages", "com.johnsnowlabs.nlp:spark-nlp_2.12:5.1.4")\
    .getOrCreate()


In [2]:
# Define ONNX parameters
#onnx_params = {
#    "spark.jsl.settings.onnx.gpuDeviceId": "0",
#    "spark.jsl.settings.onnx.intraOpNumThreads": "5",
#    "spark.jsl.settings.onnx.optimizationLevel": "BASIC_OPT",
#    "spark.jsl.settings.onnx.executionMode": "SEQUENTIAL"

#    .config("spark.jsl.settings.onnx.gpuDeviceId", onnx_params["spark.jsl.settings.onnx.gpuDeviceId"]) \
#    .config("spark.jsl.settings.onnx.intraOpNumThreads", onnx_params["spark.jsl.settings.onnx.intraOpNumThreads"]) \
#    .config("spark.jsl.settings.onnx.optimizationLevel", onnx_params["spark.jsl.settings.onnx.optimizationLevel"]) \
#    .config("spark.jsl.settings.onnx.executionMode", onnx_params["spark.jsl.settings.onnx.executionMode"]) \

In [3]:
df = spark.read.format("mongodb") \
    .option("spark.mongodb.input.uri", "mongodb://localhost:27017/twitterData.tweets") \
    .load()

In [4]:
df = df.sample(False, 0.01)  

In [5]:
df.show()

[Stage 0:>                                                          (0 + 0) / 1][Stage 0:>                                                          (0 + 1) / 1]

+--------------------+--------------------+--------+----------+--------+--------------------+---------------+
|                 _id|                date|    flag|       ids|sequence|                text|           user|
+--------------------+--------------------+--------+----------+--------+--------------------+---------------+
|65325dcdb4cad9b16...|Mon Apr 06 22:28:...|NO_QUERY|1467843734|     146|my nokia 1110 died..|      chatpataa|
|65325dcdb4cad9b16...|Mon Apr 06 22:36:...|NO_QUERY|1467873980|     246|missed Brent at p...|     gregcronin|
|65325dcdb4cad9b16...|Mon Apr 06 22:40:...|NO_QUERY|1467890212|     301|Mraow, I feel lik...|   MorganWillis|
|65325dcdb4cad9b16...|Mon Apr 06 22:57:...|NO_QUERY|1467951422|     561|sad that the 'fee...|        felloff|
|65325dcdb4cad9b16...|Mon Apr 06 23:00:...|NO_QUERY|1467962336|     592|my heart is broke...|          umfoo|
|65325dcdb4cad9b16...|Mon Apr 06 23:01:...|NO_QUERY|1467962671|     595|I can't take this...|  Bi0hazard2886|
|65325dcdb

                                                                                

In [6]:
df.printSchema()

root
 |-- _id: string (nullable = true)
 |-- date: string (nullable = true)
 |-- flag: string (nullable = true)
 |-- ids: long (nullable = true)
 |-- sequence: integer (nullable = true)
 |-- text: string (nullable = true)
 |-- user: string (nullable = true)



In [7]:
from pyspark.sql.functions import from_unixtime, unix_timestamp

# Set legacy time parser policy
spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")

# Set the default time zone for the session
spark.conf.set("spark.sql.session.timeZone", "America/Los_Angeles")

input_format = "E MMM dd HH:mm:ss z yyyy"
standard_format = "yyyy-MM-dd HH:mm:ss"

# Since we've set the session time zone, the unix_timestamp function will treat the input as in PDT
df = df.withColumn("standardized_date", from_unixtime(unix_timestamp("date", input_format), standard_format))

df.show()


[Stage 1:>                                                          (0 + 1) / 1]

+--------------------+--------------------+--------+----------+--------+--------------------+---------------+-------------------+
|                 _id|                date|    flag|       ids|sequence|                text|           user|  standardized_date|
+--------------------+--------------------+--------+----------+--------+--------------------+---------------+-------------------+
|65325dcdb4cad9b16...|Mon Apr 06 22:28:...|NO_QUERY|1467843734|     146|my nokia 1110 died..|      chatpataa|2009-04-06 22:28:26|
|65325dcdb4cad9b16...|Mon Apr 06 22:36:...|NO_QUERY|1467873980|     246|missed Brent at p...|     gregcronin|2009-04-06 22:36:19|
|65325dcdb4cad9b16...|Mon Apr 06 22:40:...|NO_QUERY|1467890212|     301|Mraow, I feel lik...|   MorganWillis|2009-04-06 22:40:43|
|65325dcdb4cad9b16...|Mon Apr 06 22:57:...|NO_QUERY|1467951422|     561|sad that the 'fee...|        felloff|2009-04-06 22:57:57|
|65325dcdb4cad9b16...|Mon Apr 06 23:00:...|NO_QUERY|1467962336|     592|my heart is broke.

                                                                                

In [8]:
df.select("standardized_date").show(10, truncate=False)

[Stage 2:>                                                          (0 + 1) / 1]

+-------------------+
|standardized_date  |
+-------------------+
|2009-04-06 22:28:26|
|2009-04-06 22:36:19|
|2009-04-06 22:40:43|
|2009-04-06 22:57:57|
|2009-04-06 23:00:55|
|2009-04-06 23:01:01|
|2009-04-06 23:06:12|
|2009-04-06 23:08:33|
|2009-04-06 23:09:38|
|2009-04-06 23:17:06|
+-------------------+
only showing top 10 rows



                                                                                

In [9]:
import pyspark.sql.functions as F
df.agg(F.min("standardized_date"), F.max("standardized_date")).show()



+----------------------+----------------------+
|min(standardized_date)|max(standardized_date)|
+----------------------+----------------------+
|   2009-04-06 22:28:26|   2009-06-25 10:25:57|
+----------------------+----------------------+



                                                                                

- The operation to add a column with a different date format was succesfull based on the last lines of code.

- The sequence column also shows that the chronological order was affected during the importation of the dataset, thus the orderBy will be applied to correct the order.

In [10]:
from pyspark.sql.functions import col

df = df.orderBy(col("sequence"))
df.show()



+--------------------+--------------------+--------+----------+--------+--------------------+---------------+-------------------+
|                 _id|                date|    flag|       ids|sequence|                text|           user|  standardized_date|
+--------------------+--------------------+--------+----------+--------+--------------------+---------------+-------------------+
|65325dcdb4cad9b16...|Mon Apr 06 22:28:...|NO_QUERY|1467843734|     146|my nokia 1110 died..|      chatpataa|2009-04-06 22:28:26|
|65325dcdb4cad9b16...|Mon Apr 06 22:36:...|NO_QUERY|1467873980|     246|missed Brent at p...|     gregcronin|2009-04-06 22:36:19|
|65325dcdb4cad9b16...|Mon Apr 06 22:40:...|NO_QUERY|1467890212|     301|Mraow, I feel lik...|   MorganWillis|2009-04-06 22:40:43|
|65325dcdb4cad9b16...|Mon Apr 06 22:57:...|NO_QUERY|1467951422|     561|sad that the 'fee...|        felloff|2009-04-06 22:57:57|
|65325dcdb4cad9b16...|Mon Apr 06 23:00:...|NO_QUERY|1467962336|     592|my heart is broke.

                                                                                

In [11]:
df = df.drop("_id", "flag", "ids", "user", "date", "sequence")
df.show()



+--------------------+-------------------+
|                text|  standardized_date|
+--------------------+-------------------+
|my nokia 1110 died..|2009-04-06 22:28:26|
|missed Brent at p...|2009-04-06 22:36:19|
|Mraow, I feel lik...|2009-04-06 22:40:43|
|sad that the 'fee...|2009-04-06 22:57:57|
|my heart is broke...|2009-04-06 23:00:55|
|I can't take this...|2009-04-06 23:01:01|
|im sooo sad right...|2009-04-06 23:06:12|
|@stephenkruiser I...|2009-04-06 23:08:33|
|@Wyldceltic1 He h...|2009-04-06 23:09:38|
|Okay, so.. STILL ...|2009-04-06 23:17:06|
|I suddenly miss m...|2009-04-06 23:41:52|
|Ugh. still workin...|2009-04-06 23:44:55|
|really now, time ...|2009-04-07 00:00:49|
|Too much traffic ...|2009-04-07 00:07:50|
|morning all. So t...|2009-04-07 00:13:06|
|@geoffmartinez yo...|2009-04-07 00:20:22|
|Even a four day w...|2009-04-07 00:22:36|
|why do other peop...|2009-04-07 00:36:12|
|@joenoia wass up ...|2009-04-07 00:53:01|
|I am scheduled to...|2009-04-07 01:15:36|
+----------

                                                                                

In [12]:
from pyspark.sql.functions import isnull, count, when

# Counting missing values for each column
missing_counts = df.select([count(when(isnull(c), c)).alias(c) for c in df.columns])

missing_counts.show()




+----+-----------------+
|text|standardized_date|
+----+-----------------+
|   0|                0|
+----+-----------------+



                                                                                

In [13]:
# Check for duplicate rows based on all columns
duplicate_count = df.count() - df.dropDuplicates().count()

if duplicate_count > 0:
    print(f"Number of duplicate rows: {duplicate_count}")
    # Remove duplicates and retain only the first occurrence
    #df = df.dropDuplicates()
    #print("Duplicates removed.")
#else:
    #print("No duplicates found.")


                                                                                

In [14]:
from pyspark.sql.functions import date_format, count

# Grouping by the day and counting
tweets_per_day = df.groupBy(date_format("standardized_date", "yyyy-MM-dd").alias("day")).agg(count("*").alias("number_of_tweets"))

tweets_per_day.orderBy("day").show()




+----------+----------------+
|       day|number_of_tweets|
+----------+----------------+
|2009-04-06|              32|
|2009-04-07|             182|
|2009-04-17|              26|
|2009-04-18|             217|
|2009-04-19|             274|
|2009-04-20|             183|
|2009-04-21|              99|
|2009-05-01|              55|
|2009-05-02|             275|
|2009-05-03|             368|
|2009-05-04|             161|
|2009-05-09|             121|
|2009-05-10|             230|
|2009-05-11|              53|
|2009-05-13|              37|
|2009-05-14|             184|
|2009-05-16|              78|
|2009-05-17|             378|
|2009-05-18|             355|
|2009-05-21|              22|
+----------+----------------+
only showing top 20 rows



                                                                                

In [15]:
from pyspark.sql.functions import length
# It checks entries with html tags or url.
# Add a column for text length
df = df.withColumn("text_length", length(df["text"]))

# Compute average, minimum, and maximum text lengths
avg_length = df.agg({"text_length": "avg"}).collect()[0][0]
min_length = df.agg({"text_length": "min"}).collect()[0][0]
max_length = df.agg({"text_length": "max"}).collect()[0][0]

print(f"Average text length: {avg_length}")
print(f"Minimum text length: {min_length}")
print(f"Maximum text length: {max_length}")




Average text length: 73.64880580217581
Minimum text length: 6
Maximum text length: 170


                                                                                

In [16]:
from pyspark.sql import functions as F

# Count entries with URLs
url_count = df.filter(F.col("text").rlike("http(s)?://([\\w-]+\\.)+[\\w-]+(/[\\w- ./?%&=]*)?")).count()

# Count entries with HTML tags
html_tags_count = df.filter(F.col("text").rlike("<[^>]+>")).count()

# Count entries with mentions (@username)
mentions_count = df.filter(F.col("text").rlike("@\\w+")).count()

print(f"Number of entries with URLs: {url_count}")
print(f"Number of entries with HTML tags: {html_tags_count}")
print(f"Number of entries with mentions: {mentions_count}")




Number of entries with URLs: 726
Number of entries with HTML tags: 0
Number of entries with mentions: 7518




In [17]:
from pyspark.sql import functions as F
# Remove URLs
df = df.withColumn("text", F.regexp_replace(F.col("text"), "http(s)?://([\\w-]+\\.)+[\\w-]+(/[\\w- ./?%&=]*)?", ""))

# Remove HTML tags
df = df.withColumn("text", F.regexp_replace(F.col("text"), "<[^>]+>", ""))

# Remove mentions (i.e., @username)
df = df.withColumn("text", F.regexp_replace(F.col("text"), "@\\w+", ""))

In [18]:
# Count entries with URLs
url_count = df.filter(F.col("text").rlike("http(s)?://([\\w-]+\\.)+[\\w-]+(/[\\w- ./?%&=]*)?")).count()

# Count entries with HTML tags
html_tags_count = df.filter(F.col("text").rlike("<[^>]+>")).count()

# Count entries with mentions (@username)
mentions_count = df.filter(F.col("text").rlike("@\\w+")).count()

print(f"Number of entries with URLs: {url_count}")
print(f"Number of entries with HTML tags: {html_tags_count}")
print(f"Number of entries with mentions: {mentions_count}")



Number of entries with URLs: 0
Number of entries with HTML tags: 0
Number of entries with mentions: 0




In [19]:
# df.filter(F.col("text").rlike("http(s)?://([\\w-]+\\.)+[\\w-]+(/[\\w- ./?%&=]*)?")).select("text").show(truncate=False)


In [20]:
df.select("text").show(20, truncate=False)



+--------------------------------------------------------------------------------------------------------------------------------+
|text                                                                                                                            |
+--------------------------------------------------------------------------------------------------------------------------------+
|my nokia 1110 died..                                                                                                            |
|missed Brent at praise band.   No fun to not have the your lead guitarist.  &lt;pout&gt;                                        |
|Mraow, I feel like dancing, but first art school wants to rape me some more.                                                    |
|sad that the 'feet' of my macbook just fell off : sad that the 'feet' of my macbook just fell off                               |
|my heart is broken every morning dropping Foo at pre school, now i understand when

                                                                                

# HANDLE EMOJI STAGE, BUT I AM HAVING TROUBLES WITH CODING AND LIBRARY FOR IT. 

In [21]:
from sparknlp.base import DocumentAssembler
from sparknlp.annotator import LanguageDetectorDL

documentAssembler = DocumentAssembler().setInputCol("text").setOutputCol("document")
langDetector = LanguageDetectorDL.pretrained("ld_wiki_tatoeba_cnn_21", "xx").setInputCols(["document"]).setOutputCol("lang")


ld_wiki_tatoeba_cnn_21 download started this may take some time.
Approximate size to download 7.1 MB
[ / ]ld_wiki_tatoeba_cnn_21 download started this may take some time.
Approximate size to download 7.1 MB
[ | ]Download done! Loading the resource.
[ \ ]

2023-10-27 19:13:48.081528: I external/org_tensorflow/tensorflow/core/platform/cpu_feature_guard.cc:151] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.


[OK!]


In [22]:
from pyspark.ml import Pipeline

pipeline = Pipeline(stages=[documentAssembler, langDetector])
model = pipeline.fit(df)
df = model.transform(df)

df.show()  # This will display the DataFrame with detected languages


                                                                                

+--------------------+-------------------+-----------+--------------------+--------------------+
|                text|  standardized_date|text_length|            document|                lang|
+--------------------+-------------------+-----------+--------------------+--------------------+
|my nokia 1110 died..|2009-04-06 22:28:26|         20|[{document, 0, 19...|[{language, 0, 19...|
|missed Brent at p...|2009-04-06 22:36:19|         88|[{document, 0, 87...|[{language, 0, 87...|
|Mraow, I feel lik...|2009-04-06 22:40:43|         77|[{document, 0, 76...|[{language, 0, 76...|
|sad that the 'fee...|2009-04-06 22:57:57|         97|[{document, 0, 96...|[{language, 0, 96...|
|my heart is broke...|2009-04-06 23:00:55|        128|[{document, 0, 12...|[{language, 0, 12...|
|I can't take this...|2009-04-06 23:01:01|         66|[{document, 0, 65...|[{language, 0, 65...|
|im sooo sad right...|2009-04-06 23:06:12|         35|[{document, 0, 34...|[{language, 0, 34...|
| I'm so sorry to ...|2009-04-

In [23]:
from pyspark.sql.functions import col

df = df.withColumn("detected_language", col("lang.result").getItem(0))

In [24]:
df.columns

['text',
 'standardized_date',
 'text_length',
 'document',
 'lang',
 'detected_language']

In [25]:
df = df.drop("document", "lang")
df.show()

                                                                                

+--------------------+-------------------+-----------+-----------------+
|                text|  standardized_date|text_length|detected_language|
+--------------------+-------------------+-----------+-----------------+
|my nokia 1110 died..|2009-04-06 22:28:26|         20|               en|
|missed Brent at p...|2009-04-06 22:36:19|         88|               en|
|Mraow, I feel lik...|2009-04-06 22:40:43|         77|               en|
|sad that the 'fee...|2009-04-06 22:57:57|         97|               en|
|my heart is broke...|2009-04-06 23:00:55|        128|               en|
|I can't take this...|2009-04-06 23:01:01|         66|               en|
|im sooo sad right...|2009-04-06 23:06:12|         35|               en|
| I'm so sorry to ...|2009-04-06 23:08:33|        109|               en|
| He has Karate to...|2009-04-06 23:09:38|         48|               en|
|Okay, so.. STILL ...|2009-04-06 23:17:06|         31|               en|
|I suddenly miss m...|2009-04-06 23:41:52|         

In [26]:
# Commented, it takes a long time to run, around half an hour.

# Group by detected_language and count the number of tweets for each language
#language_counts = df.groupBy("detected_language").agg(F.count("text").alias("number_of_tweets"))

# Sort the results by number_of_tweets in descending order for better readability
#language_counts = language_counts.sort(F.desc("number_of_tweets"))

# Show the results
#language_counts.show()


In [27]:
# Commented, it takes a long time to run, more than 3 hours.

# Get 'en' count from language_counts DataFrame
#en_count = language_counts.filter(F.col("detected_language") == "en").collect()[0]["number_of_tweets"]

# Calculate 'non-en' count
#non_en_count = language_counts.filter(F.col("detected_language") != "en").agg(F.sum("number_of_tweets")).collect()[0][0]

#print(f"Total number of 'en' tweets: {en_count}")
#print(f"Total number of 'non-en' tweets: {non_en_count}")

In [28]:
# Drop rows where detected_language is not 'en' from the original result DataFrame
df = df.filter(F.col("detected_language") == "en")

In [29]:
# Count the total number of observations in the filtered df_eng.
en_tweets = df.count()

print(f"Filtered count (English tweets only): {en_tweets}")



Filtered count (English tweets only): 15694


                                                                                

In [30]:
from pyspark.sql.functions import regexp_replace
from pyspark.ml.feature import Tokenizer

# Remove punctuation
df = df.withColumn('text', regexp_replace(df['text'], r"[^\w\s]", ""))

In [31]:
from pyspark.sql.functions import lower
# Convert to lowercase
df = df.withColumn('text', lower(df['text']))

In [32]:
df.select("text").show(20, truncate=False)



+--------------------------------------------------------------------------------------------------------------------------+
|text                                                                                                                      |
+--------------------------------------------------------------------------------------------------------------------------+
|my nokia 1110 died                                                                                                        |
|missed brent at praise band   no fun to not have the your lead guitarist  ltpoutgt                                        |
|mraow i feel like dancing but first art school wants to rape me some more                                                 |
|sad that the feet of my macbook just fell off  sad that the feet of my macbook just fell off                              |
|my heart is broken every morning dropping foo at pre school now i understand when moms say quothe has my heart brokenquot |


                                                                                

In [33]:
from pyspark.ml.feature import RegexTokenizer

# Initialize a regex tokenizer
regex_tokenizer = RegexTokenizer(inputCol="text", outputCol="tokens", pattern="\\W+")

# Transform the dataset
tokens = regex_tokenizer.transform(df)
tokens.select("tokens").show(truncate=False)



+------------------------------------------------------------------------------------------------------------------------------------------------+
|tokens                                                                                                                                          |
+------------------------------------------------------------------------------------------------------------------------------------------------+
|[my, nokia, 1110, died]                                                                                                                         |
|[missed, brent, at, praise, band, no, fun, to, not, have, the, your, lead, guitarist, ltpoutgt]                                                 |
|[mraow, i, feel, like, dancing, but, first, art, school, wants, to, rape, me, some, more]                                                       |
|[sad, that, the, feet, of, my, macbook, just, fell, off, sad, that, the, feet, of, my, macbook, just, fell, off]     

                                                                                

In [34]:
from pyspark.ml.feature import StopWordsRemover

# Initialize a stopwords remover
remover = StopWordsRemover(inputCol="tokens", outputCol="filtered_tokens")

# Transform the tokenized data
df = remover.transform(tokens)
df.select("filtered_tokens").show(truncate=False)



+-------------------------------------------------------------------------------------------------------------+
|filtered_tokens                                                                                              |
+-------------------------------------------------------------------------------------------------------------+
|[nokia, 1110, died]                                                                                          |
|[missed, brent, praise, band, fun, lead, guitarist, ltpoutgt]                                                |
|[mraow, feel, like, dancing, first, art, school, wants, rape]                                                |
|[sad, feet, macbook, fell, sad, feet, macbook, fell]                                                         |
|[heart, broken, every, morning, dropping, foo, pre, school, understand, moms, say, quothe, heart, brokenquot]|
|[cant, take, heat, like, oven, feel, sick, nwo]                                                        

                                                                                

In [35]:
df.show()

                                                                                

+--------------------+-------------------+-----------+-----------------+--------------------+--------------------+
|                text|  standardized_date|text_length|detected_language|              tokens|     filtered_tokens|
+--------------------+-------------------+-----------+-----------------+--------------------+--------------------+
|  my nokia 1110 died|2009-04-06 22:28:26|         20|               en|[my, nokia, 1110,...| [nokia, 1110, died]|
|missed brent at p...|2009-04-06 22:36:19|         88|               en|[missed, brent, a...|[missed, brent, p...|
|mraow i feel like...|2009-04-06 22:40:43|         77|               en|[mraow, i, feel, ...|[mraow, feel, lik...|
|sad that the feet...|2009-04-06 22:57:57|         97|               en|[sad, that, the, ...|[sad, feet, macbo...|
|my heart is broke...|2009-04-06 23:00:55|        128|               en|[my, heart, is, b...|[heart, broken, e...|
|i cant take this ...|2009-04-06 23:01:01|         66|               en|[i, cant

In [39]:
df.columns

['text',
 'standardized_date',
 'text_length',
 'detected_language',
 'tokens',
 'filtered_tokens',
 'lemmatized_tokens']

In [None]:
# Lemmatization does not run, serialization udf issue? 
# TypeError: code() argument 13 must be str, not into 
aa

In [38]:
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType
import nltk
from nltk.stem import WordNetLemmatizer

# Initialize the NLTK Lemmatizer
nltk.download("wordnet")
lemmatizer = WordNetLemmatizer()

# Define a function to perform lemmatization using NLTK
def lemmatize_tokens(tokens):
    return [lemmatizer.lemmatize(token) for token in tokens]

# Register the UDF
lemmatize_udf = udf(lemmatize_tokens, ArrayType(StringType()))

# Apply the UDF to the DataFrame
df = df.withColumn("lemmatized_tokens", lemmatize_udf(df["filtered_tokens"]))

# Show the resulting DataFrame
df.select("lemmatized_tokens").show()

[nltk_data] Downloading package wordnet to /home/hduser/nltk_data...
[nltk_data]   Package wordnet is already up-to-date!
2023-10-27 20:27:50,800 ERROR executor.Executor: Exception in task 0.0 in stage 66.0 (TID 226)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 603, in main
    func, profiler, deserializer, serializer = read_udfs(pickleSer, infile, eval_type)
                                               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 449, in read_udfs
    udfs.append(read_single_udf(pickleSer, infile, eval_type, runner_conf, udf_index=i))
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 251, in read_single_udf
    f, return_type = read_command(pickleSer, infile)
                     ^^^^

PythonException: 
  An exception was thrown from the Python worker. Please see the stack trace below.
Traceback (most recent call last):
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 603, in main
    func, profiler, deserializer, serializer = read_udfs(pickleSer, infile, eval_type)
                                               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 449, in read_udfs
    udfs.append(read_single_udf(pickleSer, infile, eval_type, runner_conf, udf_index=i))
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 251, in read_single_udf
    f, return_type = read_command(pickleSer, infile)
                     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 71, in read_command
    command = serializer._read_with_length(file)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 160, in _read_with_length
    return self.loads(obj)
           ^^^^^^^^^^^^^^^
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 430, in loads
    return pickle.loads(obj, encoding=encoding)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
TypeError: code() argument 13 must be str, not int


In [None]:
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType
import spacy

# Define a function to perform lemmatization using spaCy
def lemmatize_tokens(tokens):
    nlp = spacy.load("en_core_web_sm")
    doc = nlp(" ".join(tokens))
    return [token.lemma_ for token in doc]

# Register the UDF
lemmatize_udf = udf(lemmatize_tokens, ArrayType(StringType()))

# Apply the UDF to the DataFrame
df = df.withColumn("lemmatized_tokens", lemmatize_udf(df["filtered_tokens"]))

# Show the resulting DataFrame
df.select("lemmatized_tokens").show()

In [40]:
from pyspark.ml.feature import CountVectorizer

cv = CountVectorizer(inputCol="filtered_tokens", outputCol="term_frequency")
model = cv.fit(df)
df = model.transform(df)


                                                                                

In [42]:
df.select("filtered_tokens", "term_frequency").show()



+--------------------+--------------------+
|     filtered_tokens|      term_frequency|
+--------------------+--------------------+
| [nokia, 1110, died]|(19911,[623,3422,...|
|[missed, brent, p...|(19911,[45,107,79...|
|[mraow, feel, lik...|(19911,[4,42,87,8...|
|[sad, feet, macbo...|(19911,[46,515,81...|
|[heart, broken, e...|(19911,[38,87,88,...|
|[cant, take, heat...|(19911,[4,8,42,77...|
|[im, sooo, sad, r...|(19911,[0,34,46,4...|
|[im, sorry, hear,...|(19911,[0,46,53,8...|
|[karate, tourname...|(19911,[278,466,4...|
|[okay, still, sch...|(19911,[22,87,248...|
|[suddenly, miss, ...|(19911,[35,511,26...|
|[ugh, still, work...|(19911,[22,80,234...|
|[really, time, sl...|(19911,[12,17,36,...|
|[much, traffic, a...|(19911,[8,30,69,1...|
|[morning, tired, ...|(19911,[6,38,67,1...|
|[youre, going, me...|(19911,[10,68,191...|
|[even, four, day,...|(19911,[2,23,67,7...|
|[people, get, rep...|(19911,[1,73,2852...|
|[wass, lovely, an...|(19911,[13,113,12...|
|[scheduled, produ...|(19911,[22

                                                                                

In [41]:
from pyspark.ml.feature import IDF

idf = IDF(inputCol="term_frequency", outputCol="tfidf")
idf_model = idf.fit(df)
df = idf_model.transform(df)


                                                                                

In [43]:
df.select("filtered_tokens", "term_frequency", "tfidf").show()

                                                                                

+--------------------+--------------------+--------------------+
|     filtered_tokens|      term_frequency|               tfidf|
+--------------------+--------------------+--------------------+
| [nokia, 1110, died]|(19911,[623,3422,...|(19911,[623,3422,...|
|[missed, brent, p...|(19911,[45,107,79...|(19911,[45,107,79...|
|[mraow, feel, lik...|(19911,[4,42,87,8...|(19911,[4,42,87,8...|
|[sad, feet, macbo...|(19911,[46,515,81...|(19911,[46,515,81...|
|[heart, broken, e...|(19911,[38,87,88,...|(19911,[38,87,88,...|
|[cant, take, heat...|(19911,[4,8,42,77...|(19911,[4,8,42,77...|
|[im, sooo, sad, r...|(19911,[0,34,46,4...|(19911,[0,34,46,4...|
|[im, sorry, hear,...|(19911,[0,46,53,8...|(19911,[0,46,53,8...|
|[karate, tourname...|(19911,[278,466,4...|(19911,[278,466,4...|
|[okay, still, sch...|(19911,[22,87,248...|(19911,[22,87,248...|
|[suddenly, miss, ...|(19911,[35,511,26...|(19911,[35,511,26...|
|[ugh, still, work...|(19911,[22,80,234...|(19911,[22,80,234...|
|[really, time, sl...|(19