### **Sessão e leitura dos dados**

In [None]:
# Iniciando uma sessão
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .master('local[*]')\
    .appName("analise_nlp")\
    .config(
    "spark.driver.extraJavaOptions",
    "--add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED",)\
    .getOrCreate()

In [None]:
dados = spark.read.csv("datasets/imdb-reviews-pt-br.csv",
                       escape="\"",
                       header=True,
                       inferSchema=True)

### **Explorando os dados**



In [None]:
dados.count()

In [None]:
len(dados.columns)

In [None]:
print(f"Linhas: {dados.count()} Colunas: {len(dados.columns)}")

In [None]:
dados.printSchema()

In [None]:
dados.limit(10).show()

Negativo

In [None]:
dados.filter(dados.id == 190).select("text_pt").show(truncate=False)

Positivo

In [None]:
dados.filter(dados.id == 12427).select("text_pt").show(truncate=False)

Quantos comentários negativos e positivos?

In [None]:
dados.groupBy("sentiment").count().show()

# Word Cloud

In [None]:
from wordcloud import WordCloud
import matplotlib.pyplot as plt

amostra = dados.select("text_pt").sample(fraction=0.10, seed=101)
tudo = [texto["text_pt"] for texto in amostra.collect()]

wordcloud = WordCloud(width=1000,
                        height=600,
                        collocations=False,
                        prefer_horizontal=1).generate(str(tudo))

In [None]:
plt.figure(figsize=(20, 8))
plt.imshow(wordcloud)
plt.axis("off")
plt.show()

# <b>Limpeza</b>: Caracteres especiais

In [None]:
import string
string.punctuation

Exemplo da limpeza

In [None]:
amostra = spark.createDataFrame([
       ("Oi, JP! Blz?",),
       ("$$$\\ |~ Parabéns ~| \\$$$",),
       ("(#amovc #paz&amor ^.^)",),
       ("\"bora *_* \"",),
       ("=>->'...``` vc foi selecionad@ ´´´...'<=<-",),
       ("{comprar: arroz; feijão e pepino}  //",),
       ("!\"#$%&'()*+,-./:;<=>?@[\]^_`{|}~",),
     ("Milionário & José Rico",)
], ["textos"])


In [None]:
import pyspark.sql.functions as f

amostra = amostra.withColumn("text_regex", f.regexp_replace("textos", "\$", ""))

In [None]:
dados = dados.withColumn("texto_regex", f.regexp_replace("text_en", "[\$#,\"!%&'()*+-./;;<=>?@^_`{|}~\\\\]", ""))

dados.limit(2).show(truncate = False)

In [None]:
dados = dados.withColumn("texto_limpo", f.trim(dados.texto_regex))

# <b>Tokenização</b>: Divisão em tokens

In [None]:
from pyspark.ml.feature import Tokenizer

tokenizer = Tokenizer(inputCol="texto_limpo", outputCol="tokens")
tokenizado = tokenizer.transform(dados)

In [None]:
from pyspark.sql.types import IntegerType

countTokens = f.udf(lambda tokens: len(tokens), IntegerType())

tokenizado.select("texto_limpo", "tokens").withColumn("Freq_tokens", countTokens(f.col("tokens")))

In [None]:
tokenizado.show()

In [None]:
data = [(1, 'Spark é ótimo e NLP com Spark é fácil'),
                (0, 'Spark MLlib não ajuda muito'),
                (1, 'O MLlib do Spark ajuda e é fácil')]

colNames =['label', 'texto_limpo']
df = spark.createDataFrame(data, colNames)

In [None]:
import nltk
nltk.download("stopwords")

from nltk.corpus import stopwords

In [None]:
from pyspark.ml.feature import StopWordsRemover

stop_A = stopwords.words("portuguese")
stop_B = StopWordsRemover.loadDefaultStopWords("portuguese")

In [None]:
from pyspark.ml.feature import Tokenizer

tokenizer = Tokenizer(inputCol = "texto_limpo", outputCol= "tokens")
tokenized = tokenizer.transform(df)

In [None]:
remover = StopWordsRemover(inputCol = "tokens", outputCol= "texto_final", stopWords = stop_B)

df = remover.transform(tokenized)   

In [None]:
remover = StopWordsRemover(inputCol = "tokens", outputCol= "texto_final", stopWords = stop_A)

df = remover.transform(tokenized)

In [None]:
remover = StopWordsRemover(inputCol = "tokens", outputCol= "texto_final")

df = remover.transform(tokenized)
feature_data = df

# Bag of words

In [None]:
from pyspark.ml.feature import CountVectorizer

cv = CountVectorizer(inputCol = "texto_final", outputCol = "CountVec")

In [None]:
model = cv.fit(feature_data)

In [None]:

CountVectorizer_features = model.transform(feature_data)

In [None]:
model = cv.fit(feature_data)

# Hashing TF

In [None]:
from pyspark.ml.feature import HashingTF

In [None]:
hashingTF = HashingTF(inputCol="texto_final", outputCol="hashingTF")
hashingTF.setNumFeatures(50)

In [None]:
HTFfeaturizedData  = hashingTF.transform(CountVectorizer_features)

# TF-IDF

In [35]:
from pyspark.ml.feature import IDF

idf = IDF(inputCol="hashingTF", outputCol="features")

idfModel = idf.fit(HTFfeaturizedData)

TFIDFfeaturizedData = idfModel.transform(HTFfeaturizedData)

NameError: name 'HTFfeaturizedData' is not defined

# Pipeline

In [36]:
# Unindo nossas transformações.
from pyspark.ml import Pipeline

tokenizer = Tokenizer(inputCol="texto_limpo", outputCol="tokens")
stopwords = StopWordsRemover(inputCol="tokens", outputCol="texto_final")
hashingTF = HashingTF(inputCol=stopwords.getOutputCol(), outputCol="HTF")
tfidf = IDF(inputCol="HTF", outputCol="features")
stringIndexer = StringIndexer(inputCol="sentiment", outputCol="label")

pipeline = Pipeline(stages=[tokenizer, stopwords, hashingTF, tfidf, stringIndexer])

dados_transformados = pipeline.fit(dados).transform(dados)

NameError: name 'tdidf' is not defined

# Decision Tree

In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import DecisionTreeClassifier

tokenizer = Tokenizer(inputCol = "texto_limpo", outputCol = "tokens")

stopwords = StopWordsRemover(inputCol="tokens", outputCol="texto_final")

hashingTF = HashingTF(inputCol=stopwords.getOutputCol(), outputCol="HTF", numFeatures=1000)

tfidf = IDF(inputCol="HTF", outputCol="features")

dt = DecisionTreeClassifier(featuresCol='features', labelCol='label')

pipeline = Pipeline(stages = [tokenizer, stopwords, hashingTF, tdidf, dt])

In [None]:
train, test = dados.ramdonSplit([0.7, 0.3], seed = 101)

In [None]:
dt_model = pipeline.fit(train)

In [None]:
predictions = dt_model.transform(test)

In [None]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName='accuracy')
accuracy = evaluator.evaluate(predictions)
print("Acuracia = %s" % (accuracy))
