In [1]:
from sparknlp.annotator import * 
from sparknlp.base import *
from sparknlp.pretrained import PretrainedPipeline
from pyspark.ml import Pipeline
import sparknlp  
import nltk
import pyspark.sql.functions as F
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import ArrayType, FloatType, StringType

import numpy as np
import os
import datetime

import spacy
import dateparser.search

# Start SparkNLP: sparknlp.start() returns the session object that every
# later cell uses as `spark` (e.g. spark.read).
spark = sparknlp.start()

In [2]:
# Load the tweet dataset from "rel.tweets.parquet" (path is relative to the
# working directory of the notebook).
# NOTE(review): later cells read the columns sentences, tokens, document, lemma,
# tweet_id, user_id, dt, tweet and Artist from this frame - presumably they are
# already stored in the file; confirm against the job that produced it.
tweets = spark.read.load("rel.tweets.parquet")

In [3]:
# BERT word embeddings: required as input by the named entity recognizer below
bert_embed = (
    BertEmbeddings.pretrained('bert_base_cased', 'en')
    .setInputCols(["sentences", 'tokens'])
    .setOutputCol("embeddings")
    .setCaseSensitive(True)
    .setPoolingLayer(0)
)

# pretrained perceptron tagger that assigns a part-of-speech tag to every token
pos_tagger = (
    PerceptronModel.pretrained("pos_anc")
    .setInputCols(["tokens", "sentences"])
    .setOutputCol("POS")
)

# NER tags computed by the pretrained BERT-based NerDL model
ner = (
    NerDLModel.pretrained('ner_dl_bert')
    .setInputCols(["document", "tokens", "embeddings"])
    .setOutputCol("ner")
)

# sentiment detector (Vivekn model)
sent_detector = (
    ViveknSentimentModel.pretrained("sentiment_vivekn")
    .setInputCols(["lemma", "document"])
    .setOutputCol("sentiment")
)

# Chain the four annotators and annotate the tweets.
# NOTE(review): `tweets` is rebound to the annotated frame, so re-running this
# cell feeds already-annotated data back into the pipeline - reload the
# parquet file (or restart the kernel) before running it again.
pipeline = Pipeline(stages=[bert_embed, pos_tagger, ner, sent_detector])
m = pipeline.fit(tweets)
tweets = m.transform(tweets)

bert_base_cased download started this may take some time.
Approximate size to download 389.2 MB
[OK!]
pos_anc download started this may take some time.
Approximate size to download 4.3 MB
[OK!]
ner_dl_bert download started this may take some time.
Approximate size to download 15.5 MB
[OK!]
sentiment_vivekn download started this may take some time.
Approximate size to download 873.6 KB
[OK!]


In [None]:
# Preview the sentiment labels produced by the pipeline (show() prints the
# first 20 rows by default).
tweets.select("sentiment.result").show()

In [6]:
#filter out location tokens that look like stopwords
# (get_locations tests membership with `token in eng_stopwords`, i.e. the
# comparison is case-sensitive)
# NOTE(review): needs the NLTK "stopwords" corpus to be available locally -
# confirm it is downloaded in a fresh environment.
eng_stopwords = nltk.corpus.stopwords.words('english')

#filter out location tokens which are not tagged as nouns
location_pos_tags = ["NN", "NP", "NNP", "NNS", "NPS", "NNPS"]

@F.udf(ArrayType(StringType()))
def get_locations(tokens, ner_tags, pos_tags, filter_pos = True):
    """Assemble location phrases from parallel token / NER-tag / POS-tag arrays.

    A phrase starts at a ``B-LOC`` tag and is extended by the ``I-LOC`` tags
    that follow it; tokens of one phrase are joined with a single space.
    Tokens found in ``eng_stopwords`` are skipped, and so are tokens whose POS
    tag is not in ``location_pos_tags`` when ``filter_pos`` is true.

    Args:
        tokens: token strings of one tweet.
        ner_tags: NER tag per token (only ``B-LOC`` / ``I-LOC`` are used).
        pos_tags: part-of-speech tag per token.
        filter_pos: when true, keep only tokens tagged as nouns.

    Returns:
        List of location phrases in order of appearance; empty when nothing
        was found or when any of the input arrays is null.
    """
    # A null array column reaches a Python UDF as None.
    if tokens is None or ner_tags is None or pos_tags is None:
        return []

    def check_token(token, pos_tag):
        if token in eng_stopwords:
            return False
        if filter_pos and (pos_tag not in location_pos_tags):
            return False

        return True

    locations = []
    current = []  # accepted tokens of the phrase being built

    def flush():
        # Emit the phrase collected so far, if any.
        if current:
            locations.append(" ".join(current))
            del current[:]

    # zip keeps the three arrays aligned and stops at the shortest one, so a
    # length mismatch cannot raise IndexError.
    for token, ner_tag, pos_tag in zip(tokens, ner_tags, pos_tags):
        if ner_tag == "B-LOC":
            # A new entity begins: the previous one must be emitted first,
            # otherwise back-to-back locations overwrite each other.
            flush()
            if check_token(token, pos_tag):
                current.append(token)
        elif ner_tag == "I-LOC":
            # Joining a token list avoids the leading space that string
            # concatenation produced when the B-LOC token had been rejected.
            if check_token(token, pos_tag):
                current.append(token)
        else:
            flush()

    # A location that ends the tweet has no following tag to trigger a flush.
    flush()

    return locations

# Attach the extracted location phrases to every tweet as column "Locations".
tweets = tweets.withColumn(
    "Locations",
    get_locations(F.col("tokens.result"), F.col("ner.result"), F.col("POS.result")),
)


In [7]:
# Lexicons of known locations and venues used for the look-up below, taken from
# https://github.com/napsternxg/TwitterNER/tree/master/data/cleaned/custom_lexicons

# Both files are comma-separated with a header row; only the distinct values
# of the single column of interest are kept.
locations = (
    spark.read.options(header="true", sep=",")
    .csv("locations.txt")
    .select("Location")
    .distinct()
)
venues = (
    spark.read.options(header="true", sep=",")
    .csv("venues.txt")
    .select("Venue")
    .distinct()
)

In [11]:
# Match the extracted location phrases against both lexicons and collapse the
# matches back to one row per tweet.
results = (
    tweets
    # left joins keep tweets without any lexicon match
    .join(venues, on=F.expr("array_contains(Locations, Venue)"), how="left")
    .join(locations, on=F.expr("array_contains(Locations, Location)"), how="left")
    # a venue match takes precedence over a plain location match
    .withColumn(
        "RecognizedLocation",
        F.expr("CASE WHEN Venue IS NOT NULL THEN Venue WHEN Location IS NOT NULL THEN Location ELSE NULL END"),
    )
    .groupBy("tweet_id", "user_id", "dt", "tweet", "Locations")
    .agg(
        F.collect_set("RecognizedLocation").alias("RecognizedLocations"),
        F.collect_set("Artist").alias("Artists"),
        F.collect_set("sentiment.Result").alias("Sentiment"),
    )
)
    

In [30]:
#use the spaCy library to recognize date expressions

spacy_nlp = spacy.load('en_core_web_sm')

@F.udf(ArrayType(StringType()))
def get_dates(tweet):
    """Return the text of every entity spaCy labels ``DATE`` in a tweet.

    Args:
        tweet: tweet text; may be None when the column is null.

    Returns:
        List of date expressions in order of appearance; empty for a null or
        empty tweet.
    """
    # A null column value arrives as None, which the spaCy pipeline rejects;
    # one such row would otherwise fail the whole Spark task.
    if not tweet:
        return []

    return [entity.text for entity in spacy_nlp(tweet).ents if entity.label_ == "DATE"]

#use dateparser lib to process date strings and try to adjust them with respect to the twitter date
@F.udf(ArrayType(StringType()))
def process_dates(date_strings, tweet_dt):
    """Parse date expressions and re-anchor them to the tweet's own date.

    Each expression is parsed with ``dateparser.search.search_dates``. When a
    parsed date lies closer to today than to the tweet date it is shifted back
    by the number of days between today and the tweet date.

    Args:
        date_strings: date expressions extracted from the tweet; may be None.
        tweet_dt: date of the tweet, as a string understood by dateparser or
            as a ``datetime.datetime`` / ``datetime.date`` value.

    Returns:
        List of stringified dates. Shifted entries are formatted from a
        ``date`` object; unshifted entries are formatted from the ``datetime``
        returned by dateparser, so they carry a time part. Returns an empty
        list when there is nothing to parse or the tweet date is missing or
        cannot be parsed.
    """
    if not date_strings or tweet_dt is None:
        return []

    # dateparser.parse only accepts strings; accept date/timestamp values too.
    if isinstance(tweet_dt, datetime.datetime):
        tweet_date = tweet_dt.date()
    elif isinstance(tweet_dt, datetime.date):
        tweet_date = tweet_dt
    else:
        parsed_tweet_dt = dateparser.parse(str(tweet_dt))
        if parsed_tweet_dt is None:
            # Without a reference date nothing can be re-anchored.
            return []
        tweet_date = parsed_tweet_dt.date()

    today = datetime.date.today()
    dt_diff = today - tweet_date

    # NOTE(review): dateparser's RELATIVE_BASE setting looks like it could
    # replace this closer-to-today heuristic - confirm before switching.
    dates = []
    for dt_str in date_strings:
        if not dt_str:
            continue

        # search_dates yields (matched_text, datetime) pairs, or None when
        # nothing was recognized.
        for _, parsed_dt in dateparser.search.search_dates(dt_str) or []:
            parsed_date = parsed_dt.date()

            #check if parsed date is closer to today than to the twitter date
            twitter_diff = parsed_date - tweet_date
            today_diff = today - parsed_date

            if abs(today_diff.days) < abs(twitter_diff.days):
                #shift the parsed date back by the days between today and the twitter date
                parsed_dt = parsed_date - dt_diff

            dates.append(str(parsed_dt))

    return dates

# First extract the raw date expressions, then resolve them against the date
# of each tweet.
results = (
    results
    .withColumn("Dates", get_dates(F.col("tweet")))
    .withColumn("ProcessedDates", process_dates(F.col("Dates"), F.col("dt")))
)


In [31]:
# Display the per-tweet summary: artists, extracted and recognized locations,
# raw and processed dates, and sentiment.
results.select(
    "tweet_id",
    "Artists",
    "Locations",
    "RecognizedLocations",
    "Dates",
    "ProcessedDates",
    "Sentiment",
).show()

+----------+--------------------+--------------------+-------------------+---------------+--------------------+------------+
|  tweet_id|             Artists|           Locations|RecognizedLocations|          Dates|      ProcessedDates|   Sentiment|
+----------+--------------------+--------------------+-------------------+---------------+--------------------+------------+
|3986316035|         [Lady Gaga]|                  []|                 []|             []|                  []|[[positive]]|
|4143687686|[Tommy Lee, Court...|             [Vegas]|            [Vegas]|             []|                  []|[[positive]]|
|5627695652|         [Bob Dylan]|                  []|                 []|             []|                  []|[[negative]]|
|5644862232|    [Britney Spears]|                  []|                 []|             []|                  []|[[positive]]|
|5678086598|   [Michael Jackson]|                  []|                 []|             []|                  []|[[negative]]|
