<a href="https://colab.research.google.com/github/Kiranmai-Narnavaram/Artificial_Intelligence/blob/main/Big_data_sentimental_analysis.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [75]:
!pip install -q pyspark==3.1.2 spark-nlp findspark kaggle nltk

In [76]:
from pyspark.sql import SparkSession

from pyspark.sql.types import StringType, IntegerType

import pyspark.sql.functions as f
from pyspark.sql.functions import col, to_timestamp, expr
from pyspark.sql.functions import udf

from pyspark.mllib.evaluation import MulticlassMetrics

import sparknlp
from sparknlp.pretrained import PretrainedPipeline
from sparknlp.base import *
from sparknlp.annotator import *

In [77]:
import os 

import findspark

In [78]:
# Install a headless Java 8 JDK quietly (stdout is discarded).
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
# Download the Spark 3.1.2 / Hadoop 3.2 binary distribution from the Apache archive.
!wget -q https://archive.apache.org/dist/spark/spark-3.1.2/spark-3.1.2-bin-hadoop3.2.tgz
# Unpack the archive into the current working directory.
!tar xf spark-3.1.2-bin-hadoop3.2.tgz

In [79]:
# Point the environment at the JDK and at the unpacked Spark distribution.
os.environ['JAVA_HOME'] = '/usr/lib/jvm/java-8-openjdk-amd64'
os.environ['SPARK_HOME'] = '/content/spark-3.1.2-bin-hadoop3.2'

# Give findspark the absolute SPARK_HOME set above instead of a relative
# directory name: a relative path only resolves when the working directory
# happens to be /content, and findspark stores the path it receives as SPARK_HOME.
findspark.init(os.environ['SPARK_HOME'])

# Spark

In [80]:
# Local Spark session that also pulls in the Spark NLP JVM package.
spark = (
    SparkSession.builder
    .appName('Spark NLP')
    .master('local[*]')
    .config('spark.driver.memory', '16G')
    .config('spark.driver.maxResultSize', '0')
    .config('spark.kryoserializer.buffer.max', '2000M')
    .config('spark.jars.packages', 'com.johnsnowlabs.nlp:spark-nlp_2.12:3.4.1')
    .getOrCreate()
)

In [None]:
# Read the header-less CSV; the columns arrive with positional names (_c0, _c1, ...).
csv_reader = spark.read.format('csv').option('header', 'false')
df = csv_reader.load('training.1600000.processed.noemoticon.csv')

df.show()

In [None]:
# (row count, column count) of the loaded frame.
(df.count(), len(df.columns))

In [None]:
# Print the column names and types of the loaded frame.
df.printSchema()

In [None]:
# Give the positional CSV columns (_c0 ... _c5) meaningful names.
df = (df.withColumnRenamed('_c0','target')
        .withColumnRenamed('_c1','id')
        .withColumnRenamed('_c2','tweet_date')
        .withColumnRenamed('_c3','flag')
        .withColumnRenamed('_c4','user')
        .withColumnRenamed('_c5','text')
)

# SQL substring is 1-based: keep the text from position 5 onwards so that what
# remains matches the 'MMM dd HH:mm:ss zzz yyyy' pattern used below.
# NOTE(review): presumably the skipped prefix is a weekday abbreviation — confirm
# against the raw data.
df = df.withColumn('tweet_date', expr('substring(tweet_date, 5, 27)'))


# Cast every column to its working type. `id` is cast to long rather than int:
# tweet ids larger than 2147483647 do not fit in a 32-bit int and would
# otherwise turn into nulls.
df = df.select(col('target').cast('int'),
                         col('id').cast('long'),
                         to_timestamp(col('tweet_date'),'MMM dd HH:mm:ss zzz yyyy').alias('date'),
                         col('flag').cast('string'),
                         col('user').cast('string'),
                         col('text').cast('string'),
                        )

In [None]:
# Derive the full weekday name (pattern 'EEEE') from the parsed timestamp.
df = df.withColumn('weekday', f.date_format(col('date'), 'EEEE'))

In [None]:
# Preview the typed frame, including the derived weekday column.
df.show()

# Exploratory Data Analysis

In [None]:
import pandas as pd

from matplotlib import pyplot as plt
import seaborn as sns

import plotly.express as px
import plotly.graph_objects as go

# Pull a ~10% sample to the driver as a pandas frame for plotting; the fixed
# seed keeps the sample (and the charts built from it) reproducible across runs.
dataset = df.sample(fraction=0.10, seed=42).toPandas()

In [None]:
# Display names for the numeric sentiment codes (0 = negative, 4 = positive).
target_names = {0: 'Negative', 4: 'Positive'}

# Count once so the class codes and their counts are guaranteed to line up.
target_counts = dataset['target'].value_counts()
values = target_counts.keys().tolist()
counts = target_counts.tolist()

# Derive each slice name from its own class code. value_counts() orders by
# frequency, so a fixed ['Positive', 'Negative'] list could label the slices
# the wrong way round depending on which class is more frequent in the sample.
slice_names = [target_names.get(code, str(code)) for code in values]

fig = px.pie(values=counts, names=slice_names, title='Feedback distribuition')
fig.show()

In [None]:
# Tweets per weekday, grouped by sentiment class.
# plotly express takes `labels` as a dict mapping column names to display text;
# a plain list is not a valid value for it, so the legend title is supplied as
# a proper mapping for the 'target' column instead.
fig = px.histogram(
    dataset,
    y='text',
    x='weekday',
    color='target',
    labels={'target': 'Sentiment (0 = negative, 4 = positive)'},
    barmode='group',
    histfunc='count',
    height=400,
)

fig.show()

# NLP 

In [None]:
import nltk
# Fetch the NLTK stop-word corpus that stopwords.words() reads from.
nltk.download('stopwords')

In [None]:
from nltk.corpus import stopwords
from nltk.tokenize import word_tokenize

# English stop words consulted by clean_tweet when filtering tokens.
stop_words = stopwords.words('english')

In [None]:
import nltk
nltk.download('punkt')

# Frozen copy of the stop-word list so the per-token membership test inside the
# UDF is a hash lookup instead of a scan over the whole list.
_STOP_WORD_SET = frozenset(stop_words)

@udf(returnType=StringType())
def clean_tweet(tweet):
  """Lower-case a tweet and strip stop words and non-alphabetic tokens.

  Args:
    tweet: Raw tweet text, or None for a null cell.

  Returns:
    The surviving lower-cased tokens joined by single spaces, or None when
    the input is None (word_tokenize cannot process a null value).
  """
  if tweet is None:
    return None

  # Keep purely alphabetic tokens only, lower-casing each one exactly once.
  lowered_tokens = (token.lower() for token in word_tokenize(tweet) if token.isalpha())

  return ' '.join(token for token in lowered_tokens if token not in _STOP_WORD_SET)

In [None]:
# Attach the cleaned text next to the raw tweet and preview the result.
df = df.withColumn('clean_text', clean_tweet(df['text']))

df.show()

In [None]:
# Keep only the cleaned text and the target column.
end_df = df.select('clean_text', 'target')

end_df.show()

# The Model

# Pipeline

In [None]:
# Stage 1: wrap the cleaned text column into a 'document' annotation column.
document_assembler = (
    DocumentAssembler()
    .setInputCol('clean_text')
    .setOutputCol('document')
)

# Stage 2: pretrained Universal Sentence Encoder producing sentence embeddings.
use = (
    UniversalSentenceEncoder.pretrained(name='tfhub_use', lang='en')
    .setInputCols(['document'])
    .setOutputCol('sentence_embeddings')
)

# Stage 3: pretrained sentiment classifier that consumes those embeddings.
sentiment_dl = (
    SentimentDLModel.pretrained(name='sentimentdl_use_twitter', lang='en')
    .setInputCols(['sentence_embeddings'])
    .setOutputCol('sentiment')
)

In [None]:
# Chain the three stages in execution order: document -> embeddings -> sentiment.
pipeline_stages = [document_assembler, use, sentiment_dl]
pipeline = Pipeline(stages=pipeline_stages)

# Training the model

In [None]:
# 75/25 split with a fixed seed so the same rows land in the test set on every
# run and the evaluation numbers are reproducible.
(training_data, test_data) = end_df.randomSplit([0.75, 0.25], seed=42)

# NOTE(review): every stage in the pipeline is a DocumentAssembler or a
# .pretrained() model, so fit() presumably only assembles the PipelineModel
# rather than learning anything from training_data — confirm.
model = pipeline.fit(training_data)

In [None]:
# Run the fitted pipeline over the held-out rows and preview 20 of them.
result = model.transform(test_data)
result.show(n=20)

In [None]:
# Pair each document string with its predicted sentiment label, then explode the
# zipped array so every pair becomes its own row in the 'cols' struct column.
zipped_pairs = f.arrays_zip('document.result', 'sentiment.result')
exploded = result.withColumn('cols', f.explode(zipped_pairs))

# Pull the two struct fields back out as flat columns, keeping the true target.
sentiment_result = exploded.select(
    f.expr("cols['0']").alias('document'),
    'target',
    f.expr("cols['1']").alias('sentiment'),
)

sentiment_result.show(truncate=False)

# Evaluating the model


In [None]:
# Numeric code for each sentiment label, on the same scale as the 'target' column.
RESULT_MAP = { 'positive': 4, 'negative':0, 'neutral':2 }

@udf(returnType=IntegerType())
def map_results(text):
  """Translate a sentiment label into its numeric code.

  Args:
    text: Sentiment label, expected to be a key of RESULT_MAP; may be None.

  Returns:
    The integer code for the label, or None (a SQL null) when the label is
    None or not present in RESULT_MAP, instead of failing the whole job
    with a KeyError.
  """
  return RESULT_MAP.get(text)

In [None]:
# Discard rows containing a null in any column before mapping the labels.
sentiment_result = sentiment_result.dropna()

# Add the numeric form of the predicted sentiment as 'result'.
predicted_code = map_results(col('sentiment'))
final_result = sentiment_result.withColumn('result', predicted_code)

final_result.show()

In [None]:
# MulticlassMetrics expects an RDD of (prediction, label) pairs, in that order:
# 'result' is the model's prediction mapped onto the label scale and 'target'
# is the ground truth. Selecting them the other way round transposes the
# confusion matrix and swaps precision with recall.
# Rows with a null in either column are dropped because float(None) would
# raise inside the map below.
predictionAndLabels = (final_result
                       .select('result', 'target')
                       .na.drop())
metrics = MulticlassMetrics(predictionAndLabels.rdd.map(lambda x: tuple(map(float, x))))