In [3]:
pip install spark-nlp==3.3.4

Note: you may need to restart the kernel to use updated packages.


In [4]:
spark

In [5]:
pip install numpy pandas nltk

Note: you may need to restart the kernel to use updated packages.


# Imports

In [26]:
from sparknlp.annotator import *
import pyspark.sql.functions as f
from pyspark.sql import Window
import pyspark.sql.types as t
from pyspark.ml.feature import Tokenizer as pysparkTokenizer, HashingTF, StopWordsRemover, CountVectorizer
from pyspark.ml import Pipeline
from sparknlp.base import DocumentAssembler, Finisher
from sparknlp.annotator import Tokenizer, Normalizer, LemmatizerModel, StopWordsCleaner, PerceptronModel, Chunker
from pyspark.ml.clustering import LDA
from nltk.corpus import stopwords
import pandas as pd
import nltk
import sparknlp
nltk.download('stopwords')

from pyspark.sql.functions import regexp_extract, col
from pyspark.sql.functions import *

[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


In [7]:
print("Spark NLP version", sparknlp.version())

Spark NLP version 3.3.4


## Preprocess tweeter data

In [64]:
df_tweet = spark.read.option("tableName", "Tweets").format("dynamodb").load().select(f.col("text"))
df_tweet = df_tweet.withColumn("user", regexp_extract(col("text"), r"@([A-Za-z0-9]+)", 1))
df_tweet = (df_tweet.withColumn('text', regexp_replace('text', r"@[A-Za-z0-9]+", ''))
            .withColumn('text', regexp_replace('text', "RT[\s:]+", '')))
df_tweet.show()
f"Lenght: {df_tweet.count()}"

+--------------------+--------------+
|                text|          user|
+--------------------+--------------+
|America and Nevad...|HillaryClinton|
|As we continue ou...|      JoeBiden|
|There is somethin...|  amyklobuchar|
|They used to blam...|    SenSchumer|
|The most valuable...|      JoeBiden|
| You’ll vote on i...|    SenSchumer|
| That's just a li...|         POTUS|
| Bye Harry https:...|    SenSchumer|
| Give up already ...|      RandPaul|
| your power Ameri...|            VP|
| America family h...|         POTUS|
| I love you so mu...|            VP|
|Harry. Steel of s...|     clairecmc|
| Maybe y’all shou...|            VP|
|Harry. Steel of s...|     clairecmc|
|                Liar|            VP|
|When Harry Reid w...|   BarackObama|
|Harry Reid was on...|    SenSchumer|
|When Harry Reid w...|   BarackObama|
|_JoeManchin He wa...|           Sen|
+--------------------+--------------+
only showing top 20 rows



'Lenght: 2049'

In [65]:
df_acc = spark.read.option("header","true").csv("accounts.csv").select("party", "username").distinct()
df_acc.show()
f"Lenght: {df_acc.count()}"

+-----+---------------+
|party|       username|
+-----+---------------+
|    D|  SenWhitehouse|
|    D| SenCoonsOffice|
|    R|     ThomTillis|
|    D|    SenStabenow|
|    R|RoundsforSenate|
|    D|       timkaine|
|    D|    SenJackReed|
|    R|  JohnnyIsakson|
|    D|      SenBooker|
|    R|  SenJohnHoeven|
|    R|  TheBushCenter|
|    R|RepublicanStudy|
|    R|SenatorLankford|
|    D| SenJeffMerkley|
|    D| MurrayCampaign|
|    R|   SenTomCotton|
|    R|     TeamCornyn|
|    R|    RogerWicker|
|    D|      Bob_Casey|
|    R|    TomCottonAR|
+-----+---------------+
only showing top 20 rows



'Lenght: 198'

In [66]:
df_tweet = df_tweet.join(df_acc, df_tweet.user == df_acc.username).select('party', 'text')
df_tweet.show()

+-----+--------------------+
|party|                text|
+-----+--------------------+
|    D|America and Nevad...|
|    D|As we continue ou...|
|    D|There is somethin...|
|    D|They used to blam...|
|    D|The most valuable...|
|    D| You’ll vote on i...|
|    D| That's just a li...|
|    D| Bye Harry https:...|
|    R| Give up already ...|
|    D| your power Ameri...|
|    D| America family h...|
|    D| I love you so mu...|
|    D|Harry. Steel of s...|
|    D| Maybe y’all shou...|
|    D|Harry. Steel of s...|
|    D|                Liar|
|    D|When Harry Reid w...|
|    D|Harry Reid was on...|
|    D|When Harry Reid w...|
|    D|When Harry Reid w...|
+-----+--------------------+
only showing top 20 rows



## Preprocess reddit data

In [67]:
df_reddit = spark.read.option("tableName", "RedditPosts").format("dynamodb").load().select(f.col("submission_id").alias("text"), f.col("subreddit"))
df_reddit = df_reddit.withColumn("party", initcap(col('subreddit')).substr(1,1)).select("party", "text")
df_reddit.show()

+-----+--------------------+
|party|                text|
+-----+--------------------+
|    D|Lincoln County lo...|
|    D|Harris charts her...|
|    D|Rudy Giuliani and...|
|    D|Thousands of Russ...|
|    D|A Capitol rioter ...|
|    D|19-year-old charg...|
|    D|Harris says Ameri...|
|    D|America is now in...|
|    D|Alleged ‘dead’ Ge...|
|    D|#TBT: The First K...|
|    D|Joe Biden's admin...|
|    D|Georgia Republica...|
|    D|What Commitment t...|
|    D|Supreme Court: Ju...|
|    D|Florida Democrats...|
|    D|When Whiny, Incom...|
|    D|U.S., Russia To H...|
|    D|Where Republicans...|
|    D|Opinion | It’s No...|
|    D|Biden finishes 20...|
+-----+--------------------+
only showing top 20 rows



## Join both sources

In [70]:
df = df_tweet.union(df_reddit)
df.show()
f"Lenght: {df.count()}"

+-----+--------------------+
|party|                text|
+-----+--------------------+
|    D|America and Nevad...|
|    D|As we continue ou...|
|    D|There is somethin...|
|    D|They used to blam...|
|    D|The most valuable...|
|    R| What do they cal...|
|    D| You’ll vote on i...|
|    D| That's just a li...|
|    D| Bye Harry https:...|
|    R| Give up already ...|
|    D| your power Ameri...|
|    D| America family h...|
|    D| I love you so mu...|
|    D|Harry. Steel of s...|
|    D| Maybe y’all shou...|
|    D|Harry. Steel of s...|
|    D|                Liar|
|    D|When Harry Reid w...|
|    D|Harry Reid was on...|
|    D|When Harry Reid w...|
+-----+--------------------+
only showing top 20 rows



'Lenght: 2060'

## Train test dataset

In [74]:
train, test = df.randomSplit([0.7, 0.3], seed=42)

In [76]:
train.groupBy("party") \
    .count() \
    .show()

+-----+-----+
|party|count|
+-----+-----+
|    D| 1560|
|    R|  431|
+-----+-----+



In [77]:
test.groupBy("party") \
    .count() \
    .show()

+-----+-----+
|party|count|
+-----+-----+
|    D|  597|
|    R|  195|
+-----+-----+



In [81]:
# actual content is inside description column
document = DocumentAssembler()\
    .setInputCol("text")\
    .setOutputCol("document")

In [85]:
embeddings = BertSentenceEmbeddings.pretrained("sent_small_bert_L8_512") \
      .setInputCols("document") \
      .setOutputCol("sentence_embeddings")

ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:41401)
Traceback (most recent call last):
  File "/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 977, in _get_connection
    connection = self.deque.pop()
IndexError: pop from an empty deque

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1115, in start
    self.socket.connect((self.address, self.port))
ConnectionRefusedError: [Errno 111] Connection refused


sent_small_bert_L8_512 download started this may take some time.


Py4JNetworkError: An error occurred while trying to connect to the Java server (127.0.0.1:41401)

In [83]:
bert_sent = BertSentenceEmbeddings.pretrained('sent_small_bert_L8_512')\
    .setInputCols( ["document"])\
    .setOutputCol("sentence_embeddings")

sent_small_bert_L8_512 download started this may take some time.
Approximate size to download 149.1 MB
[ — ]

ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1207, in send_command
    raise Py4JNetworkError("Answer from Java side is empty")
py4j.protocol.Py4JNetworkError: Answer from Java side is empty

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1033, in send_command
    response = connection.send_command(command)
  File "/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1212, in send_command
    "Error while receiving", e, proto.ERROR_ON_RECEIVE)
py4j.protocol.Py4JNetworkError: Error while receiving


[OK!]

ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:41401)
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/dist-packages/IPython/core/interactiveshell.py", line 3343, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "<ipython-input-83-73796aadade5>", line 1, in <module>
    bert_sent = BertSentenceEmbeddings.pretrained('sent_small_bert_L8_512')\
  File "/usr/local/lib/python3.6/dist-packages/sparknlp/annotator.py", line 6679, in pretrained
    return ResourceDownloader.downloadModel(BertSentenceEmbeddings, name, lang, remote_loc)
  File "/usr/local/lib/python3.6/dist-packages/sparknlp/pretrained.py", line 59, in downloadModel
    j_obj = _internal._DownloadModel(reader.name, name, language, remote_loc, j_dwn).apply()
  File "/usr/local/lib/python3.6/dist-packages/sparknlp/internal.py", line 214, in __init__
    name, language, remote_loc)
  File "/usr/local/lib/python3.6/dist-packages/sparknlp/inter




Py4JError: An error occurred while calling z:com.johnsnowlabs.nlp.pretrained.PythonResourceDownloader.downloadModel

In [None]:
# the classes/labels/categories are in category column
classsifierdl = classifierDLApproach()\
    .setInputCols (["sentence_embeddings"])\
    .setOutputCol("class")\
    .setLabelColumn("party")\
    .setMaxEpochs (20)\
    .setEnableOutputLogs (True)

In [None]:
bert_sent_clf_pipeline = Pipeline(
    stages = [
        document,
        bert_sent,
        classsifierdl
    ])