In [5]:
#!pip install findspark
import findspark
findspark.init()
from pyspark.sql import SparkSession
from pyspark.ml.feature import HashingTF,IDF
from pyspark.ml.classification import LinearSVC
from pyspark.sql import functions as F

In [6]:
from sparknlp.base import *
from sparknlp.annotator import *
from sparknlp.pretrained import PretrainedPipeline
import sparknlp

In [9]:
import pyspark as ps
from pyspark.sql import SQLContext
import warnings
try:
    # create SparkContext on all CPUs available: in my case I have 4 CPUs on my laptop
    sc = ps.SparkContext('spark://spark:7077')
    sqlContext = SQLContext(sc)
    print("Just created a SparkContext")
except ValueError:
    warnings.warn("SparkContext already exists in this scope")



In [10]:
#create or get Spark Session
#spark = sparknlp.start()
spark = SparkSession.builder \
    .appName("sparkNLP")\
    .master("spark://spark:7077")\
    .config("spark.driver.memory","8G")\
    .config("spark.driver.maxResultSize", "2G") \
    .config("spark.jars.packages", "com.johnsnowlabs.nlp:spark-nlp_2.11:2.6.5")\
    .config("spark.kryoserializer.buffer.max", "500m")\
    .getOrCreate()
print("sparknlp version", sparknlp.version(), "sparkversion", spark.version)

sparknlp version 4.2.4 sparkversion 3.2.3


In [15]:
#https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html#deploying
df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "localhost:9092") \
  .option("subscribe", "covid") \
  .load()

AnalysisException:  Failed to find data source: kafka. Please deploy the application as per the deployment section of "Structured Streaming + Kafka Integration Guide".        

### Document Assembler

<h5> It turned tweets into documents. This step is considered as an entry to use sparkNLP library functions and annotators</h5>

In [24]:
from sparknlp.base import DocumentAssembler
documentAssembler = sparknlp.base.DocumentAssembler().setInputCol('tweet').setOutputCol('document')
print(f"\n\t2. Attaching DocumentAssembler Transformer to the pipeline")


	2. Attaching DocumentAssembler Transformer to the pipeline


### Sentence Detector

I used this function to detect the multiple sentences from the tweet. However, there are more accurate and deep sentence detection and spell checkers could be used but need a lot of time and computation.

In [25]:
dentence_detector=SentenceDetector().setInputCols(['document']).setOutputCol('sentence')

### Tokenizer

It turns the document sentences into words

In [26]:
tokenizer = Tokenizer().setInputCols(["sentence"]).setOutputCol ("token")

### Stop Words Remover

stop words like I, we, me, and so on were removed as they don’t affect the meaning of the word.
It removes punctuation and turns words into lower case. Then, turn these clean words documents into regular words.

In [27]:
normalizer = Normalizer().setInputCols(["token"]).setOutputCol ('normalized').setLowercase (True)

finisher = Finisher().setInputCols(["normalized"]).setOutputCols(["token_features"]).setOutputAsArray(True) \
.setCleanAnnotations(False)# To generate Term Frequency

### Hashing

It turns the words into numerical values, this process is called vectorization.

In [28]:
hashingTF = HashingTF(inputCol="token_features", outputCol="rawFeatures")# To generate Inverse Document Frequency

In [29]:
idf = IDF(inputCol="rawFeatures", outputCol="features", minDocFreq=5)

### Support Vector machine (SVM)

It’s a well-known machine-learning algorithm used in classification. It’s used to map the numeric features to the sentiment.

In [30]:
SVC = LinearSVC(labelCol = 'label',featuresCol="features",maxIter=60)

Now, we are ready to make a full pipeline…

In [31]:
from pyspark.ml import Pipeline
pipeline = Pipeline().setStages([documentAssembler,dentence_detector,tokenizer,normalizer,finisher,hashingTF,idf,SVC])

In [32]:
training_data = spark.read.csv('train.csv',inferSchema=True,header=True)

In [33]:
training_data.show()

+---+-----+--------------------+
| id|label|               tweet|
+---+-----+--------------------+
|  1|    0| @user when a fat...|
|  2|    0|@user @user thank...|
|  3|    0|  bihday your maj...|
|  4|    0|#model   i love u...|
|  5|    0| factsguide: soci...|
|  6|    0|[2/2] huge fan fa...|
|  7|    0| @user camping to...|
|  8|    0|the next school y...|
|  9|    0|we won!!! love th...|
| 10|    0| @user @user welc...|
| 11|    0| â #ireland con...|
| 12|    0|we are so selfish...|
| 13|    0|i get to see my d...|
| 14|    1|@user #cnn calls ...|
| 15|    1|no comment!  in #...|
| 16|    0|ouch...junior is ...|
| 17|    0|i am thankful for...|
| 18|    1|retweet if you ag...|
| 19|    0|its #friday! ð...|
| 20|    0|as we all know, e...|
+---+-----+--------------------+
only showing top 20 rows



In [34]:
training_data=training_data.withColumn('tweet',F.regexp_replace('tweet',r'http\S+',''))
training_data=training_data.withColumn('tweet',F.regexp_replace('tweet','@\w+',''))
training_data=training_data.withColumn('tweet',F.regexp_replace('tweet','#',''))
training_data=training_data.withColumn('tweet',F.regexp_replace('tweet','RT',''))

training_data=training_data.withColumn('tweet',F.regexp_replace('tweet','&lt',''))
training_data=training_data.withColumn('tweet',F.regexp_replace('tweet','&gt',''))
training_data=training_data.withColumn('tweet',F.regexp_replace('tweet','&amp',''))
training_data=training_data.withColumn('tweet',F.regexp_replace('tweet','&quot',''))

training_data=training_data.withColumn('tweet',F.regexp_replace('tweet','-',''))
training_data=training_data.withColumn('tweet',F.regexp_replace('tweet',' ',''))
training_data=training_data.withColumn('tweet',F.regexp_replace('tweet','  ',''))

In [35]:
training_data.show(4)

+---+-----+--------------------+
| id|label|               tweet|
+---+-----+--------------------+
|  1|    0|whenafatherisdysf...|
|  2|    0|thanksforlyftcred...|
|  3|    0|   bihdayyourmajesty|
|  4|    0|modeliloveutakewi...|
+---+-----+--------------------+
only showing top 4 rows



In [36]:
processed_tweets = pipeline.fit(training_data)

In [37]:
processed_tweets

PipelineModel_6c87dc28a098

In [89]:
#pip install elasticsearch
from elasticsearch import Elasticsearch

In [90]:
# test your ES instance is running
# test your ES instance is running
es = Elasticsearch("http://localhost:9200")
es.info(pretty=True)

  es.info(pretty=True)


ObjectApiResponse({'name': 'ab62d2783536', 'cluster_name': 'docker-cluster', 'cluster_uuid': 'XyzgtYCEQ9m5rf9Vrgywmg', 'version': {'number': '7.17.7', 'build_flavor': 'default', 'build_type': 'docker', 'build_hash': '78dcaaa8cee33438b91eca7f5c7f56a70fec9e80', 'build_date': '2022-10-17T15:29:54.167373105Z', 'build_snapshot': False, 'lucene_version': '8.11.1', 'minimum_wire_compatibility_version': '6.8.0', 'minimum_index_compatibility_version': '6.0.0-beta1'}, 'tagline': 'You Know, for Search'})

In [155]:
from kafka import KafkaConsumer
#import pydoop.hdfs as hdfs
consumer = KafkaConsumer('covid',bootstrap_servers=['localhost:9092'])

import pandas as pd
columns = ['id','tweet', 'label']
#df=spark.createDataFrame(vals,columns)

for message in consumer:
   #print(i) 
   values = message.value.decode('utf-8').splitlines()
   dfObj = pd.DataFrame(values,columns = ['tweet']) 

   dfObj.reset_index(drop=True) 
   dfObj=spark.createDataFrame(dfObj) 
    
   dfObj=dfObj.withColumn('tweet',F.regexp_replace('tweet',r'http\S+',''))
   dfObj=dfObj.withColumn('tweet',F.regexp_replace('tweet','@\w+',''))
   dfObj=dfObj.withColumn('tweet',F.regexp_replace('tweet','#',''))
   dfObj=dfObj.withColumn('tweet',F.regexp_replace('tweet','RT',''))

   dfObj=dfObj.withColumn('tweet',F.regexp_replace('tweet','&lt',''))
   dfObj=dfObj.withColumn('tweet',F.regexp_replace('tweet','&gt',''))
   dfObj=dfObj.withColumn('tweet',F.regexp_replace('tweet','&amp',''))
   dfObj=dfObj.withColumn('tweet',F.regexp_replace('tweet','&quot',''))

   dfObj=dfObj.withColumn('tweet',F.regexp_replace('tweet','-',' '))
   #dfObj=dfObj.withColumn('tweet',F.regexp_replace('tweet',' ',''))
   dfObj=dfObj.withColumn('tweet',F.regexp_replace('tweet','  ',' '))
   print(dfObj.select('tweet').show())
   result=processed_tweets.transform(dfObj)
   t=result.select('tweet').collect()
   p=result.select('prediction').collect()
   ##################################################
    ##################################################
   #results = processed_tweets.transform(test_data)
   from datetime import datetime
   # Getting the current date and time
   dt = datetime.now()
   # getting the timestamp
   ts = datetime.timestamp(dt) 
    # write data
   es.index(
         index='tweets_mondial',
         document={
          'tweet': t[0]['tweet'],
          'prediction': p[0]['prediction'],
          'Timestamp' :ts
 })
    

  for column, series in pdf.iteritems():


+--------------------+
|               tweet|
+--------------------+
|"   Do you trust ...|
+--------------------+

None


  es.index(
  for column, series in pdf.iteritems():


+--------------------+
|               tweet|
+--------------------+
|" : DeSantis' bes...|
+--------------------+

None


  es.index(
  for column, series in pdf.iteritems():


+--------------------+
|               tweet|
+--------------------+
|" : On bipartisan...|
+--------------------+

None


  es.index(
  for column, series in pdf.iteritems():


+--------------------+
|               tweet|
+--------------------+
|" : This is inter...|
+--------------------+

None


  es.index(
  for column, series in pdf.iteritems():


+--------------------+
|               tweet|
+--------------------+
|" : This is just ...|
+--------------------+

None


  es.index(
  for column, series in pdf.iteritems():


+--------------------+
|               tweet|
+--------------------+
|" :  Oh yeah. \n\...|
+--------------------+

None


  es.index(
  for column, series in pdf.iteritems():


+--------------------+
|               tweet|
+--------------------+
|" : BREAKING: New...|
+--------------------+

None


  es.index(
  for column, series in pdf.iteritems():


+--------------------+
|               tweet|
+--------------------+
|" : DeSantis slam...|
+--------------------+

None


  es.index(
  for column, series in pdf.iteritems():


+--------------------+
|               tweet|
+--------------------+
|" : GOV. DESANTIS...|
+--------------------+

None


  es.index(
  for column, series in pdf.iteritems():


+--------------------+
|               tweet|
+--------------------+
|" : James Cameron...|
+--------------------+

None


  es.index(
  for column, series in pdf.iteritems():


+--------------------+
|               tweet|
+--------------------+
|" : COVID cases a...|
+--------------------+

None


  es.index(
  for column, series in pdf.iteritems():


+--------------------+
|               tweet|
+--------------------+
|" : The climate c...|
+--------------------+

None


  es.index(
  for column, series in pdf.iteritems():


+--------------------+
|               tweet|
+--------------------+
|" The mask is the...|
+--------------------+

None


  es.index(
  for column, series in pdf.iteritems():


+--------------------+
|               tweet|
+--------------------+
|" : The Pfizer mR...|
+--------------------+

None


  es.index(
  for column, series in pdf.iteritems():


+--------------------+
|               tweet|
+--------------------+
|"Please make this...|
+--------------------+

None


  es.index(
  for column, series in pdf.iteritems():


+--------------------+
|               tweet|
+--------------------+
|" : He was only a...|
+--------------------+

None


  es.index(
  for column, series in pdf.iteritems():


+--------------------+
|               tweet|
+--------------------+
|" : IN THE FIGHT ...|
+--------------------+

None


  es.index(
  for column, series in pdf.iteritems():


+--------------------+
|               tweet|
+--------------------+
|" We have hung ou...|
+--------------------+

None


  es.index(
  for column, series in pdf.iteritems():


+--------------------+
|               tweet|
+--------------------+
|" : Happy Decembe...|
+--------------------+

None


  es.index(
  for column, series in pdf.iteritems():


+--------------------+
|               tweet|
+--------------------+
|" : Hey Neighbors...|
+--------------------+

None


  es.index(
  for column, series in pdf.iteritems():


+--------------------+
|               tweet|
+--------------------+
|" : I can no long...|
+--------------------+

None


  es.index(
  for column, series in pdf.iteritems():


+--------------------+
|               tweet|
+--------------------+
|" : Skin cancer v...|
+--------------------+

None


  es.index(
  for column, series in pdf.iteritems():


+--------------------+
|               tweet|
+--------------------+
|" : Tucker Carlso...|
+--------------------+

None


  es.index(
  for column, series in pdf.iteritems():


+--------------------+
|               tweet|
+--------------------+
|" : Of course. Th...|
+--------------------+

None


  es.index(
  for column, series in pdf.iteritems():


+--------------------+
|               tweet|
+--------------------+
|" : A major new o...|
+--------------------+

None


  es.index(
  for column, series in pdf.iteritems():
ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "C:\Apache-spark\spark-3.2.3-bin-hadoop2.7\python\lib\py4j-0.10.9.5-src.zip\py4j\java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "C:\Apache-spark\spark-3.2.3-bin-hadoop2.7\python\lib\py4j-0.10.9.5-src.zip\py4j\clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "C:\Users\pc\AppData\Local\Programs\Python\Python310\lib\socket.py", line 705, in readinto
    return self._sock.recv_into(b)
KeyboardInterrupt


KeyboardInterrupt: 

In [60]:
create_analys = {
    # this mapping definition sets up the fields for the rating events
    "mappings": {
        "properties": {
            "tweet": {
                "type": "keyword"
            },
            "prediction": {
                "type": "integer"
            }
        }  
    }
}


In [61]:
# create indices with the settings and mappings above
res_tweet = es.indices.create(index="tweet", body=create_analys)

  res_tweet = es.indices.create(index="tweet", body=create_analys)
  res_tweet = es.indices.create(index="tweet", body=create_analys)
