In [1]:
# https://nlp.johnsnowlabs.com/docs/en/annotators#languagedetectordl

# maven - com.johnsnowlabs.nlp:spark-nlp_2.11:2.5.3
# pypi - spark-nlp, vaderSentiment, boto3==1.9.157

# Example of sentiment analysis (SentimentDL multiclass classifier training):
# https://github.com/JohnSnowLabs/spark-nlp-workshop/blob/master/jupyter/training/english/classification/SentimentDL_train_multiclass_sentiment_classifier.ipynb
# https://nlp.johnsnowlabs.com/docs/en/pipelines#multi-language

In [2]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark import SparkFiles
from pyspark.ml.feature import (
    StringIndexer,
    HashingTF, 
    IDF
)
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.classification import NaiveBayes

In [3]:
# Build (or reuse) the Spark session for this notebook.
# The Spark NLP package is requested through spark.jars.packages (Maven coordinates).
app_name = "Fake-News"

spark = (
    SparkSession.builder
    .appName(app_name)
    .config("spark.jars.packages", "com.johnsnowlabs.nlp:spark-nlp_2.11:2.5.3")
    .getOrCreate()
)

Exception: Java gateway process exited before sending its port number

In [4]:
# Read the training CSV: first row is treated as the header and column types are inferred.
file_uri = "/mnt/mnt_s3/train.csv"

trainDataset = (
    spark.read
    .format("com.databricks.spark.csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .load(file_uri)
)
display(trainDataset)

NameError: name 'spark' is not defined

In [None]:
# Read the test CSV with the same reader settings as the training file
# (header row, inferred column types).
file_uri = "/mnt/mnt_s3/test.csv"

testDataset = (
    spark.read
    .format("com.databricks.spark.csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .load(file_uri)
)
display(testDataset)

In [None]:
import sparknlp
from sparknlp.base import *
from sparknlp.annotator import *
from pyspark.ml import Pipeline
from sparknlp.pretrained import PretrainedPipeline

# Start Spark NLP. The return value is not bound to a name, so later cells keep
# using the `spark` variable created earlier in the notebook.
# NOTE(review): presumably this reuses the already-running session rather than
# creating a second one — confirm against the Spark NLP version in use.
sparknlp.start()

In [None]:
# Wrap the raw "text" column into Spark NLP document annotations.
document = (
    DocumentAssembler()
    .setInputCol("text")
    .setOutputCol("document")
)

# Pretrained Universal Sentence Encoder: one embedding per document.
use = (
    UniversalSentenceEncoder.pretrained()
    .setInputCols(["document"])
    .setOutputCol("sentence_embeddings")
)

# Trainable classifier; the target values are read from the "label" column
# and predictions are written to the "class" column.
sentimentdl = (
    SentimentDLApproach()
    .setInputCols(["sentence_embeddings"])
    .setOutputCol("class")
    .setLabelColumn("label")
    .setMaxEpochs(5)
    .setEnableOutputLogs(True)
)

# Stage order matters: documents -> embeddings -> classifier.
pipeline = Pipeline(stages=[document, use, sentimentdl])

In [None]:
# Fit all stages of the pipeline defined above on the training DataFrame;
# the fitted result is reused by the save and inference cells below.
pipelineModel = pipeline.fit(trainDataset)

In [None]:
# Persist the fitted pipeline. A plain save() raises when the target path
# already exists, which made this cell fail on every re-run of the notebook;
# overwrite() replaces the previously saved copy instead.
pipelineModel.write().overwrite().save("/mnt/mnt_s3/sentimentdl_pipeline")

In [None]:
# Imported explicitly: the notebook only imports `Pipeline` by name, so
# `PipelineModel` would otherwise resolve only if a wildcard import happens
# to expose it.
from pyspark.ml import PipelineModel

# Reload the fitted pipeline from the location it was saved to.
loadedPipeline = PipelineModel.load("/mnt/mnt_s3/sentimentdl_pipeline")

In [None]:
import pandas as pd

# Three short sample sentences to run through the fitted pipeline.
pdf = pd.DataFrame(
    {
        "text": [
            "This is so great! How are you",
            "No way, never again!",
            "Yes, bring on more!",
        ]
    }
)
df = spark.createDataFrame(pdf)

# Apply the fitted pipeline and show every output column.
results = pipelineModel.transform(df)
display(results)

In [None]:
# Show only the input text next to the predicted class value(s).
display(results.selectExpr("text", "class.result"))

In [None]:
# This cell used `analyzer` before any cell had created it, so it raised
# NameError when the notebook was run top to bottom. Create the analyzer here
# so the sanity check works on a fresh kernel.
from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer

analyzer = SentimentIntensityAnalyzer()

# Quick sanity check of the VADER scores on a single sentence.
analyzer.polarity_scores("This is so great! How are you")

In [None]:
from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer

analyzer = SentimentIntensityAnalyzer()


@F.udf(returnType="struct<neg:double,neu:double,pos:double,compound:double>")
def getVADER(text):
    """Score one text value with VADER.

    Args:
        text: value of the input column for one row; may be None for null rows.

    Returns:
        The dict produced by ``analyzer.polarity_scores`` (mapped onto the
        declared struct of neg/neu/pos/compound doubles), or None when the
        input is null.
    """
    # Null rows would otherwise reach polarity_scores(None) and fail the job.
    if text is None:
        return None
    return analyzer.polarity_scores(text)


# Add the VADER scores next to the Spark NLP prediction.
new_results = results.withColumn("vaderSentiment", getVADER("text"))
display(new_results)

In [None]:
# Save to Postgres

import os

import boto3

secret_name = "ut/pstgres/db3"
region_name = "us-east-2"

# SECURITY: AWS credentials must never be hard-coded in a notebook. They are
# read from the standard environment variables instead; when those are unset
# both values are None and boto3 falls back to its default credential chain
# (shared config/credentials files, instance or container role).
# The key pair that used to be committed in this cell must be treated as
# compromised and rotated.
access_key = os.environ.get("AWS_ACCESS_KEY_ID")
secret_key = os.environ.get("AWS_SECRET_ACCESS_KEY")

session = boto3.session.Session(
    aws_access_key_id=access_key,
    aws_secret_access_key=secret_key,
    region_name=region_name,
)
client = session.client('secretsmanager')

# Fetch the secret holding the Postgres connection details.
secret_value = client.get_secret_value(SecretId=secret_name)

import json


def get_connection(secret_value):
    """Decode the JSON document stored under the 'SecretString' key.

    Args:
        secret_value: mapping that contains a 'SecretString' entry holding
            JSON text.

    Returns:
        The parsed JSON value.

    Raises:
        KeyError: if 'SecretString' is missing from ``secret_value``.
        json.JSONDecodeError: if the stored text is not valid JSON.
    """
    raw_payload = secret_value['SecretString']
    parsed = json.loads(raw_payload)
    return parsed

# Parse the secret and derive the JDBC settings from it.
connection = get_connection(secret_value)

# Postgres credentials: host, port, user and password come from the secret;
# database name and dialect are fixed here.
jdbcHostname = connection['host']
jdbcPort = connection['port']
jdbcUsername = connection['username']
jdbcPassword = connection['password']
jdbcDatabase = "postgres"
dialect = "postgresql"

jdbcUrl = f"jdbc:{dialect}://{jdbcHostname}:{jdbcPort}/{jdbcDatabase}"

# Properties handed to the JDBC writer.
connectionProperties = dict(
    user=jdbcUsername,
    password=jdbcPassword,
    driver="org.postgresql.Driver",
)

In [None]:
# Preview the flattened rows: a generated id, the text, the first predicted
# class and the four VADER scores as separate columns.
# NOTE(review): repartition(1) is presumably there so the generated ids are
# consecutive — confirm.
display(
    new_results.repartition(1).selectExpr(
        "monotonically_increasing_id()+1 as id",
        "text",
        "class.result[0] as result",
        "vaderSentiment.neg as vaderNeg",
        "vaderSentiment.neu as vaderNeu",
        "vaderSentiment.pos as vaderPos",
        "vaderSentiment.compound as vaderCompound",
    )
)

In [None]:
# Target table and write mode for the JDBC export.
table = "sentiment_results"
mode = "overwrite"  # options are: error, append, overwrite

# Flatten the results into plain columns and write them to Postgres over JDBC.
(
    new_results.repartition(1)
    .selectExpr(
        "monotonically_increasing_id()+1 as id",
        "text",
        "class.result[0] as result",
        "vaderSentiment.neg as vaderNeg",
        "vaderSentiment.neu as vaderNeu",
        "vaderSentiment.pos as vaderPos",
        "vaderSentiment.compound as vaderCompound",
    )
    .write
    .jdbc(jdbcUrl, table, mode=mode, properties=connectionProperties)
)

In [None]:
import pandas as pd

# The same news sentence in English, Spanish and Russian, used as input for
# language detection.
pdf = pd.DataFrame(
    {
        "text": [
            "With the nation's school systems in an upheaval since the pandemic began, several governors are beginning to take sides in the debate between key national leaders pushing for children to attend classes in person and many local officials hesitant to congregate students before it is safe.",
            "Con los sistemas escolares de la nación en una agitación desde que comenzó la pandemia, varios gobernadores están comenzando a tomar partido en el debate entre los líderes nacionales clave que presionan para que los niños asistan a clases en persona y muchos funcionarios locales dudan en congregar a los estudiantes antes de que sea seguro.",
            "С момента возникновения пандемии в школьной системе страны несколько губернаторов начинают принимать участие в дебатах между ключевыми национальными лидерами, которые настаивают на том, чтобы дети посещали занятия лично, и многими местными чиновниками, которые не собираются собирать учащихся, прежде чем это станет безопасным.",
        ]
    }
)
df = spark.createDataFrame(pdf)
display(df)

In [None]:
# Wrap the raw "text" column into Spark NLP document annotations.
documentAssembler = (
    DocumentAssembler()
    .setInputCol("text")
    .setOutputCol("document")
)

# Pretrained language detector ("ld_wiki_20"); results go to the "language" column.
languageDetector = (
    LanguageDetectorDL.pretrained("ld_wiki_20")
    .setInputCols("document")
    .setOutputCol("language")
    .setThreshold(0.3)
    .setCoalesceSentences(True)
)

pipeline = Pipeline(stages=[documentAssembler, languageDetector])

# Fit on the sample frame, then show the detected language for each row.
model = pipeline.fit(df)

display(model.transform(df))