In [1]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext, Row
from pyspark.ml.feature import Tokenizer, StopWordsRemover
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType,StringType, DoubleType,NumericType,IntegerType
from pyspark.sql.functions import explode
from pyspark.sql.functions import col
from pyspark.sql.functions import udf, array
from pyspark.sql.functions import countDistinct
from pyspark.sql.functions import lit
import re
import os

# using Vader for sentiment analysis

In [3]:
## Vader sentiment calculation
## ref: https://github.com/cjhutto/vaderSentiment

# coding: utf-8
# Author: C.J. Hutto
# Thanks to George Berry for reducing the time complexity from something like O(N^4) to O(N).
# Thanks to Ewan Klein and Pierpaolo Pantone for bringing VADER into NLTK. Those modifications were awesome.
# For license information, see LICENSE.TXT

"""
If you use the VADER sentiment analysis tools, please cite:
Hutto, C.J. & Gilbert, E.E. (2014). VADER: A Parsimonious Rule-based Model for
Sentiment Analysis of Social Media Text. Eighth International Conference on
Weblogs and Social Media (ICWSM-14). Ann Arbor, MI, June 2014.
"""
import os
import re
import math
import string
import codecs
import json
from itertools import product
from inspect import getsourcefile
from io import open

# ##Constants##

# (empirically derived mean sentiment intensity rating increase for booster words)
B_INCR = 0.293
B_DECR = -0.293

# (empirically derived mean sentiment intensity rating increase for using ALLCAPs to emphasize a word)
C_INCR = 0.733
N_SCALAR = -0.74

NEGATE = \
    ["aint", "arent", "cannot", "cant", "couldnt", "darent", "didnt", "doesnt",
     "ain't", "aren't", "can't", "couldn't", "daren't", "didn't", "doesn't",
     "dont", "hadnt", "hasnt", "havent", "isnt", "mightnt", "mustnt", "neither",
     "don't", "hadn't", "hasn't", "haven't", "isn't", "mightn't", "mustn't",
     "neednt", "needn't", "never", "none", "nope", "nor", "not", "nothing", "nowhere",
     "oughtnt", "shant", "shouldnt", "uhuh", "wasnt", "werent",
     "oughtn't", "shan't", "shouldn't", "uh-uh", "wasn't", "weren't",
     "without", "wont", "wouldnt", "won't", "wouldn't", "rarely", "seldom", "despite"]

# booster/dampener 'intensifiers' or 'degree adverbs'
# http://en.wiktionary.org/wiki/Category:English_degree_adverbs

BOOSTER_DICT = \
    {"absolutely": B_INCR, "amazingly": B_INCR, "awfully": B_INCR, 
     "completely": B_INCR, "considerable": B_INCR, "considerably": B_INCR,
     "decidedly": B_INCR, "deeply": B_INCR, "effing": B_INCR, "enormous": B_INCR, "enormously": B_INCR,
     "entirely": B_INCR, "especially": B_INCR, "exceptional": B_INCR, "exceptionally": B_INCR, 
     "extreme": B_INCR, "extremely": B_INCR,
     "fabulously": B_INCR, "flipping": B_INCR, "flippin": B_INCR, "frackin": B_INCR, "fracking": B_INCR,
     "fricking": B_INCR, "frickin": B_INCR, "frigging": B_INCR, "friggin": B_INCR, "fully": B_INCR, 
     "fuckin": B_INCR, "fucking": B_INCR, "fuggin": B_INCR, "fugging": B_INCR,
     "greatly": B_INCR, "hella": B_INCR, "highly": B_INCR, "hugely": B_INCR, 
     "incredible": B_INCR, "incredibly": B_INCR, "intensely": B_INCR, 
     "major": B_INCR, "majorly": B_INCR, "more": B_INCR, "most": B_INCR, "particularly": B_INCR,
     "purely": B_INCR, "quite": B_INCR, "really": B_INCR, "remarkably": B_INCR,
     "so": B_INCR, "substantially": B_INCR,
     "thoroughly": B_INCR, "total": B_INCR, "totally": B_INCR, "tremendous": B_INCR, "tremendously": B_INCR,
     "uber": B_INCR, "unbelievably": B_INCR, "unusually": B_INCR, "utter": B_INCR, "utterly": B_INCR,
     "very": B_INCR,
     "almost": B_DECR, "barely": B_DECR, "hardly": B_DECR, "just enough": B_DECR,
     "kind of": B_DECR, "kinda": B_DECR, "kindof": B_DECR, "kind-of": B_DECR,
     "less": B_DECR, "little": B_DECR, "marginal": B_DECR, "marginally": B_DECR,
     "occasional": B_DECR, "occasionally": B_DECR, "partly": B_DECR,
     "scarce": B_DECR, "scarcely": B_DECR, "slight": B_DECR, "slightly": B_DECR, "somewhat": B_DECR,
     "sort of": B_DECR, "sorta": B_DECR, "sortof": B_DECR, "sort-of": B_DECR}

# check for sentiment laden idioms that do not contain lexicon words (future work, not yet implemented)
SENTIMENT_LADEN_IDIOMS = {"cut the mustard": 2, "hand to mouth": -2,
                          "back handed": -2, "blow smoke": -2, "blowing smoke": -2,
                          "upper hand": 1, "break a leg": 2,
                          "cooking with gas": 2, "in the black": 2, "in the red": -2,
                          "on the ball": 2, "under the weather": -2}

# check for special case idioms and phrases containing lexicon words
SPECIAL_CASES = {"the shit": 3, "the bomb": 3, "bad ass": 1.5, "badass": 1.5, "bus stop": 0.0,
                 "yeah right": -2, "kiss of death": -1.5, "to die for": 3, 
                 "beating heart": 3.1, "broken heart": -2.9 }


# #Static methods# #

def negated(input_words, include_nt=True):
    """
    Determine if input contains negation words
    """
    input_words = [str(w).lower() for w in input_words]
    neg_words = []
    neg_words.extend(NEGATE)
    for word in neg_words:
        if word in input_words:
            return True
    if include_nt:
        for word in input_words:
            if "n't" in word:
                return True
    '''if "least" in input_words:
        i = input_words.index("least")
        if i > 0 and input_words[i - 1] != "at":
            return True'''
    return False


def normalize(score, alpha=15):
    """
    Normalize the score to be between -1 and 1 using an alpha that
    approximates the max expected value
    """
    norm_score = score / math.sqrt((score * score) + alpha)
    if norm_score < -1.0:
        return -1.0
    elif norm_score > 1.0:
        return 1.0
    else:
        return norm_score


def allcap_differential(words):
    """
    Check whether just some words in the input are ALL CAPS
    :param list words: The words to inspect
    :returns: `True` if some but not all items in `words` are ALL CAPS
    """
    is_different = False
    allcap_words = 0
    for word in words:
        if word.isupper():
            allcap_words += 1
    cap_differential = len(words) - allcap_words
    if 0 < cap_differential < len(words):
        is_different = True
    return is_different


def scalar_inc_dec(word, valence, is_cap_diff):
    """
    Check if the preceding words increase, decrease, or negate/nullify the
    valence
    """
    scalar = 0.0
    word_lower = word.lower()
    if word_lower in BOOSTER_DICT:
        scalar = BOOSTER_DICT[word_lower]
        if valence < 0:
            scalar *= -1
        # check if booster/dampener word is in ALLCAPS (while others aren't)
        if word.isupper() and is_cap_diff:
            if valence > 0:
                scalar += C_INCR
            else:
                scalar -= C_INCR
    return scalar


class SentiText(object):
    """
    Identify sentiment-relevant string-level properties of input text.
    """

    def __init__(self, text):
        if not isinstance(text, str):
            text = str(text).encode('utf-8')
        self.text = text
        self.words_and_emoticons = self._words_and_emoticons()
        # doesn't separate words from\
        # adjacent punctuation (keeps emoticons & contractions)
        self.is_cap_diff = allcap_differential(self.words_and_emoticons)

    @staticmethod
    def _strip_punc_if_word(token):
        """
        Removes all trailing and leading punctuation
        If the resulting string has two or fewer characters,
        then it was likely an emoticon, so return original string
        (ie ":)" stripped would be "", so just return ":)"
        """
        stripped = token.strip(string.punctuation)
        if len(stripped) <= 2:
            return token
        return stripped

    def _words_and_emoticons(self):
        """
        Removes leading and trailing puncutation
        Leaves contractions and most emoticons
            Does not preserve punc-plus-letter emoticons (e.g. :D)
        """
        wes = self.text.split()
        stripped = list(map(self._strip_punc_if_word, wes))
        return stripped

class SentimentIntensityAnalyzer_1(object):
    """
    Give a sentiment intensity score to sentences.
    """

    def __init__(self, lexicon_dict, emoji_dict):
        self.lexicon = lexicon_dict
        self.emojis = emoji_dict

    def polarity_scores(self, text):
        """
        Return a float for sentiment strength based on the input text.
        Positive values are positive valence, negative value are negative
        valence.
        """
        # convert emojis to their textual descriptions
        text_no_emoji = ""
        prev_space = True
        for chr in text:
            if chr in self.emojis:
                # get the textual description
                description = self.emojis[chr]
                if not prev_space:
                    text_no_emoji += ' '
                text_no_emoji += description
                prev_space = False
            else:
                text_no_emoji += chr
                prev_space = chr == ' '
        text = text_no_emoji.strip()

        sentitext = SentiText(text)

        sentiments = []
        words_and_emoticons = sentitext.words_and_emoticons
        for i, item in enumerate(words_and_emoticons):
            valence = 0
            # check for vader_lexicon words that may be used as modifiers or negations
            if item.lower() in BOOSTER_DICT:
                sentiments.append(valence)
                continue
            if (i < len(words_and_emoticons) - 1 and item.lower() == "kind" and
                    words_and_emoticons[i + 1].lower() == "of"):
                sentiments.append(valence)
                continue

            sentiments = self.sentiment_valence(valence, sentitext, item, i, sentiments)

        sentiments = self._but_check(words_and_emoticons, sentiments)

        valence_dict = self.score_valence(sentiments, text)

        return valence_dict

    def sentiment_valence(self, valence, sentitext, item, i, sentiments):
        is_cap_diff = sentitext.is_cap_diff
        words_and_emoticons = sentitext.words_and_emoticons
        item_lowercase = item.lower()
        if item_lowercase in self.lexicon:
            # get the sentiment valence 
            valence = self.lexicon[item_lowercase]
                
            # check for "no" as negation for an adjacent lexicon item vs "no" as its own stand-alone lexicon item
            if item_lowercase == "no" and i != len(words_and_emoticons)-1 and words_and_emoticons[i + 1].lower() in self.lexicon:
                # don't use valence of "no" as a lexicon item. Instead set it's valence to 0.0 and negate the next item
                valence = 0.0
            if (i > 0 and words_and_emoticons[i - 1].lower() == "no") \
               or (i > 1 and words_and_emoticons[i - 2].lower() == "no") \
               or (i > 2 and words_and_emoticons[i - 3].lower() == "no" and words_and_emoticons[i - 1].lower() in ["or", "nor"] ):
                valence = self.lexicon[item_lowercase] * N_SCALAR
            
            # check if sentiment laden word is in ALL CAPS (while others aren't)
            if item.isupper() and is_cap_diff:
                if valence > 0:
                    valence += C_INCR
                else:
                    valence -= C_INCR

            for start_i in range(0, 3):
                # dampen the scalar modifier of preceding words and emoticons
                # (excluding the ones that immediately preceed the item) based
                # on their distance from the current item.
                if i > start_i and words_and_emoticons[i - (start_i + 1)].lower() not in self.lexicon:
                    s = scalar_inc_dec(words_and_emoticons[i - (start_i + 1)], valence, is_cap_diff)
                    if start_i == 1 and s != 0:
                        s = s * 0.95
                    if start_i == 2 and s != 0:
                        s = s * 0.9
                    valence = valence + s
                    valence = self._negation_check(valence, words_and_emoticons, start_i, i)
                    if start_i == 2:
                        valence = self._special_idioms_check(valence, words_and_emoticons, i)

            valence = self._least_check(valence, words_and_emoticons, i)
        sentiments.append(valence)
        return sentiments

    def _least_check(self, valence, words_and_emoticons, i):
        # check for negation case using "least"
        if i > 1 and words_and_emoticons[i - 1].lower() not in self.lexicon \
                and words_and_emoticons[i - 1].lower() == "least":
            if words_and_emoticons[i - 2].lower() != "at" and words_and_emoticons[i - 2].lower() != "very":
                valence = valence * N_SCALAR
        elif i > 0 and words_and_emoticons[i - 1].lower() not in self.lexicon \
                and words_and_emoticons[i - 1].lower() == "least":
            valence = valence * N_SCALAR
        return valence

    @staticmethod
    def _but_check(words_and_emoticons, sentiments):
        # check for modification in sentiment due to contrastive conjunction 'but'
        words_and_emoticons_lower = [str(w).lower() for w in words_and_emoticons]
        if 'but' in words_and_emoticons_lower:
            bi = words_and_emoticons_lower.index('but')
            for sentiment in sentiments:
                si = sentiments.index(sentiment)
                if si < bi:
                    sentiments.pop(si)
                    sentiments.insert(si, sentiment * 0.5)
                elif si > bi:
                    sentiments.pop(si)
                    sentiments.insert(si, sentiment * 1.5)
        return sentiments

    @staticmethod
    def _special_idioms_check(valence, words_and_emoticons, i):
        words_and_emoticons_lower = [str(w).lower() for w in words_and_emoticons]
        onezero = "{0} {1}".format(words_and_emoticons_lower[i - 1], words_and_emoticons_lower[i])

        twoonezero = "{0} {1} {2}".format(words_and_emoticons_lower[i - 2],
                                          words_and_emoticons_lower[i - 1], words_and_emoticons_lower[i])

        twoone = "{0} {1}".format(words_and_emoticons_lower[i - 2], words_and_emoticons_lower[i - 1])

        threetwoone = "{0} {1} {2}".format(words_and_emoticons_lower[i - 3],
                                           words_and_emoticons_lower[i - 2], words_and_emoticons_lower[i - 1])

        threetwo = "{0} {1}".format(words_and_emoticons_lower[i - 3], words_and_emoticons_lower[i - 2])

        sequences = [onezero, twoonezero, twoone, threetwoone, threetwo]

        for seq in sequences:
            if seq in SPECIAL_CASES:
                valence = SPECIAL_CASES[seq]
                break

        if len(words_and_emoticons_lower) - 1 > i:
            zeroone = "{0} {1}".format(words_and_emoticons_lower[i], words_and_emoticons_lower[i + 1])
            if zeroone in SPECIAL_CASES:
                valence = SPECIAL_CASES[zeroone]
        if len(words_and_emoticons_lower) - 1 > i + 1:
            zeroonetwo = "{0} {1} {2}".format(words_and_emoticons_lower[i], words_and_emoticons_lower[i + 1],
                                              words_and_emoticons_lower[i + 2])
            if zeroonetwo in SPECIAL_CASES:
                valence = SPECIAL_CASES[zeroonetwo]

        # check for booster/dampener bi-grams such as 'sort of' or 'kind of'
        n_grams = [threetwoone, threetwo, twoone]
        for n_gram in n_grams:
            if n_gram in BOOSTER_DICT:
                valence = valence + BOOSTER_DICT[n_gram]
        return valence

    @staticmethod
    def _sentiment_laden_idioms_check(valence, senti_text_lower):
        # Future Work
        # check for sentiment laden idioms that don't contain a lexicon word
        idioms_valences = []
        for idiom in SENTIMENT_LADEN_IDIOMS:
            if idiom in senti_text_lower:
                print(idiom, senti_text_lower)
                valence = SENTIMENT_LADEN_IDIOMS[idiom]
                idioms_valences.append(valence)
        if len(idioms_valences) > 0:
            valence = sum(idioms_valences) / float(len(idioms_valences))
        return valence

    @staticmethod
    def _negation_check(valence, words_and_emoticons, start_i, i):
        words_and_emoticons_lower = [str(w).lower() for w in words_and_emoticons]
        if start_i == 0:
            if negated([words_and_emoticons_lower[i - (start_i + 1)]]):  # 1 word preceding lexicon word (w/o stopwords)
                valence = valence * N_SCALAR
        if start_i == 1:
            if words_and_emoticons_lower[i - 2] == "never" and \
                    (words_and_emoticons_lower[i - 1] == "so" or
                     words_and_emoticons_lower[i - 1] == "this"):
                valence = valence * 1.25
            elif words_and_emoticons_lower[i - 2] == "without" and \
                    words_and_emoticons_lower[i - 1] == "doubt":
                valence = valence
            elif negated([words_and_emoticons_lower[i - (start_i + 1)]]):  # 2 words preceding the lexicon word position
                valence = valence * N_SCALAR
        if start_i == 2:
            if words_and_emoticons_lower[i - 3] == "never" and \
                    (words_and_emoticons_lower[i - 2] == "so" or words_and_emoticons_lower[i - 2] == "this") or \
                    (words_and_emoticons_lower[i - 1] == "so" or words_and_emoticons_lower[i - 1] == "this"):
                valence = valence * 1.25
            elif words_and_emoticons_lower[i - 3] == "without" and \
                    (words_and_emoticons_lower[i - 2] == "doubt" or words_and_emoticons_lower[i - 1] == "doubt"):
                valence = valence
            elif negated([words_and_emoticons_lower[i - (start_i + 1)]]):  # 3 words preceding the lexicon word position
                valence = valence * N_SCALAR
        return valence

    def _punctuation_emphasis(self, text):
        # add emphasis from exclamation points and question marks
        ep_amplifier = self._amplify_ep(text)
        qm_amplifier = self._amplify_qm(text)
        punct_emph_amplifier = ep_amplifier + qm_amplifier
        return punct_emph_amplifier

    @staticmethod
    def _amplify_ep(text):
        # check for added emphasis resulting from exclamation points (up to 4 of them)
        ep_count = text.count("!")
        if ep_count > 4:
            ep_count = 4
        # (empirically derived mean sentiment intensity rating increase for
        # exclamation points)
        ep_amplifier = ep_count * 0.292
        return ep_amplifier

    @staticmethod
    def _amplify_qm(text):
        # check for added emphasis resulting from question marks (2 or 3+)
        qm_count = text.count("?")
        qm_amplifier = 0
        if qm_count > 1:
            if qm_count <= 3:
                # (empirically derived mean sentiment intensity rating increase for
                # question marks)
                qm_amplifier = qm_count * 0.18
            else:
                qm_amplifier = 0.96
        return qm_amplifier

    @staticmethod
    def _sift_sentiment_scores(sentiments):
        # want separate positive versus negative sentiment scores
        pos_sum = 0.0
        neg_sum = 0.0
        neu_count = 0
        for sentiment_score in sentiments:
            if sentiment_score > 0:
                pos_sum += (float(sentiment_score) + 1)  # compensates for neutral words that are counted as 1
            if sentiment_score < 0:
                neg_sum += (float(sentiment_score) - 1)  # when used with math.fabs(), compensates for neutrals
            if sentiment_score == 0:
                neu_count += 1
        return pos_sum, neg_sum, neu_count

    def score_valence(self, sentiments, text):
        if sentiments:
            sum_s = float(sum(sentiments))
            # compute and add emphasis from punctuation in text
            punct_emph_amplifier = self._punctuation_emphasis(text)
            if sum_s > 0:
                sum_s += punct_emph_amplifier
            elif sum_s < 0:
                sum_s -= punct_emph_amplifier

            compound = normalize(sum_s)
            # discriminate between positive, negative and neutral sentiment scores
            pos_sum, neg_sum, neu_count = self._sift_sentiment_scores(sentiments)

            if pos_sum > math.fabs(neg_sum):
                pos_sum += punct_emph_amplifier
            elif pos_sum < math.fabs(neg_sum):
                neg_sum -= punct_emph_amplifier

            total = pos_sum + math.fabs(neg_sum) + neu_count
            pos = math.fabs(pos_sum / total)
            neg = math.fabs(neg_sum / total)
            neu = math.fabs(neu_count / total)

        else:
            compound = 0.0
            pos = 0.0
            neg = 0.0
            neu = 0.0

        sentiment_dict = \
            {"neg": round(neg, 3),
             "neu": round(neu, 3),
             "pos": round(pos, 3),
             "compound": round(compound, 4)}

        return sentiment_dict

In [4]:
## dictionary for sentiment 

lexicon_file="vader_lexicon.txt"
emoji_lexicon="emoji_utf8_lexicon.txt"


def make_lex_dict(lexicon_full_filepath):
    """
    Convert lexicon file to a dictionary
    """
    lex_dict = {}
    for line in lexicon_full_filepath.rstrip('\n').split('\n'):
        if not line:
            continue
        (word, measure) = line.strip().split('\t')[0:2]
        lex_dict[word] = float(measure)
    return lex_dict

def make_emoji_dict(emoji_full_filepath):
    """
    Convert emoji lexicon file to a dictionary
    """
    emoji_dict = {}
    for line in emoji_full_filepath.rstrip('\n').split('\n'):
        (emoji, description) = line.strip().split('\t')[0:2]
        emoji_dict[emoji] = description
    return emoji_dict

with codecs.open(lexicon_file, encoding='utf-8') as f:
    lexicon_full_filepath = f.read()
    
lexicon_dict = make_lex_dict(lexicon_full_filepath)

with codecs.open(emoji_lexicon, encoding='utf-8') as f:
    emoji_full_filepath = f.read()
emojis_dict = make_emoji_dict(emoji_full_filepath)


In [5]:
from pyspark.sql.types import FloatType
# from textblob import TextBlob
# from vaderSentiment_1 import SentimentIntensityAnalyzer


def sentiment_analysis(text):
    analyzer = SentimentIntensityAnalyzer_1(lexicon_dict,emojis_dict)
    return analyzer.polarity_scores(text)['compound'] ## only chec 'compound'

sentiment_analysis_udf = udf(sentiment_analysis , FloatType())


# 

In [56]:
folder = 'billboard_0829_to_0831'
DF0 = sqlContext.read.parquet(folder) 

In [47]:
folder = '2020_sep_oct_nov'
DF = sqlContext.read.parquet(folder)

In [59]:
DF = DF.union(DF0)

In [60]:
# adding weeks for bins
## 
from pyspark.sql.functions import udf, array
from pyspark.sql.functions import col
from pyspark.sql.types import DoubleType,StringType,NumericType,IntegerType
from pyspark.sql.functions import countDistinct

def find_month(date: str):   
    month = date.split()[1]
    if month == 'Sep':
        m = 9
    elif month == 'Oct':
        m = 10
    elif month == 'Nov':
        m = 11
    elif month == 'Dec':
        m = 12
    elif month == 'Aug':
        m = 8
    return m

def find_day(date: str):
    return int(date.split()[2])

split_month = udf(find_month, IntegerType())
split_day = udf(find_day, IntegerType())

In [61]:
D = DF.withColumn('month',split_month('created_at'))
D = D.withColumn('day',split_day('created_at'))
D = D.withColumn('month_day', (D['month']*100 + D['day'])  )


In [63]:
D.createOrReplaceTempView("df")
result = spark.sql("SELECT month,day,month_day,COUNT(1) FROM df GROUP BY month,day,month_day \
                    HAVING month == 8 ")
result.show()

+-----+---+---------+--------+
|month|day|month_day|count(1)|
+-----+---+---------+--------+
|    8| 30|      830| 1027198|
|    8| 31|      831| 3132890|
|    8| 29|      829|  751529|
+-----+---+---------+--------+



In [64]:
from pyspark.ml.feature import Bucketizer

week_split = [0,904,911,918,925,1002,1009,1016,1023,1030,1106,1113,1120,1127,1203]
buck = Bucketizer(inputCol = 'month_day'   , splits = week_split, outputCol='weeks')

df_en = D.filter(D['lang']=='en')
ds = buck.transform(df_en)

In [65]:
## calculate emoji count 
import re
from pyspark.sql.functions import udf, array
from pyspark.sql.functions import col
from pyspark.sql.types import DoubleType,StringType, ArrayType, FloatType

def remove_stop(line):
    #token = nltk.word_tokenize(line)
    WORD_RE = re.compile(r"[\w']+")
    token = WORD_RE.findall(line)
    tokens = [w.lower() for w in token if w.lower() not in STOPWORDS]
    return tokens

def find_emoji(line):
    ## remove Korean chrachters i.e. 지, 민, 방, 탄
    hangul = re.compile(u'[a-zA-Z0-9\u3131-\u3163\uac00-\ud7a3]+')  
    line = re.sub(hangul, "", line) 
    
    ## Japanese/hiragana
    hiragana = re.compile(u'[\u3040-\u309Fー]+') # == u'[ぁ-んー]+'
    line = re.sub(hiragana, "", line)
    # Japanese/Katakana
    Katakana = re.compile(u'[\u30A0-\u30FF]+') # == u'[ァ-ヾ]+'
    line = re.sub(Katakana, "", line)
    # Japanese/
    Kanji = re.compile(u'[\u4E00-\u9FFF]+') # == u'[一-龠々]+'
    line = re.sub(Kanji, "", line)
    # find emoji
    regex = re.compile(r'([\u263a-\U0001f645])')
    token_list = regex.findall(line)
    
    return token_list

# UDF
emoji = udf(find_emoji, ArrayType(StringType()))


In [66]:
from pyspark.sql.functions import sum as _sum
from pyspark.sql import functions as F
# tokenize
from pyspark.ml.feature import Tokenizer, RegexTokenizer
from pyspark.ml.feature import StopWordsRemover
from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType


def length(line):
    try:
        return len(line)
    except:
        return 0

def find_truncated(line):
    if line == True:
        return 1
    else:
        return 0
    
def find_http_in_tweets(text:str):
    if 'http' in text:
        return 1
    else:
        return 0


    
# UDF 
find_counts = udf(length, IntegerType())
find_truncated = udf(find_truncated, IntegerType())
find_http = udf(find_http_in_tweets, IntegerType())
# UDF counting words
countTokens = udf(lambda words: len(words), IntegerType())
countChars = udf(lambda text: len(text), IntegerType())


# add col
T = ds.select(['weeks','song_name','artist_name','url','et_url','display_url','et_expanded_url',
               'hashtags','et_hashtags','followers_count','friends_count','listed_count',
               'followers_count','truncated','text'])

## Regextokenize by regex
regexTokenizer = RegexTokenizer(inputCol="text", outputCol="words", pattern="\\W")
regexTokenized = regexTokenizer.transform(T)
T = regexTokenized.withColumn("tokens", countTokens(col("words"))
                             ).withColumn("char_count",countChars(col("text")))


T = T.withColumn('with_url',find_counts(T['url'])).withColumn('with_et_url',find_counts(T['et_url'])
                                            ).withColumn('with_display_url',find_counts(T['display_url'])
                                            ).withColumn('with_expanded_url',find_counts(T['et_expanded_url'])
                                            ).withColumn('hashtag_count',find_counts(T['hashtags'])
                                            ).withColumn('et_#_count',find_counts(T['et_hashtags'])
                                            ).withColumn("with_truncate", find_truncated(T['truncated']) 
                                            ).withColumn("with_http_tw", find_http(T['text']) 
                                            ).withColumn('senti_score',sentiment_analysis_udf('text')
                                            ).withColumn('emojis_token',find_counts(emoji('text')) )




In [67]:
# aggregate
df = T.groupby(['weeks','song_name','artist_name']).agg(F.mean('with_url').alias('url_avg'),
                                                   F.mean('with_et_url').alias('et_url_avg'),
                                                   F.mean('with_display_url').alias('display_url_avg'),
                                                   F.mean('hashtag_count').alias('hashtag_avg'),
                                                   F.mean('et_#_count').alias('et_hashtag_avg'),
                                                   F.mean('followers_count').alias('avg_followers'),
                                                   F.mean('friends_count').alias('avg_friends'),
                                                   F.mean('listed_count').alias('avg_listed'),
                                                   _sum('with_truncate').alias('sum_trunc'),
                                                   _sum('with_http_tw').alias('sum_http'),
                                                   F.mean('tokens').alias('avg_tokens'), 
                                                   F.mean('char_count').alias('avg_char'),
                                                   F.mean('senti_score').alias('avg_senti'),
                                                   F.mean('emojis_token').alias('avg_emoji_count')
                                                   
                                                  )
df.show()

+-----+----------------+---------------+--------------------+--------------------+--------------------+--------------------+--------------------+------------------+------------------+------------------+---------+--------+------------------+------------------+--------------------+--------------------+
|weeks|       song_name|    artist_name|             url_avg|          et_url_avg|     display_url_avg|         hashtag_avg|      et_hashtag_avg|     avg_followers|       avg_friends|        avg_listed|sum_trunc|sum_http|        avg_tokens|          avg_char|           avg_senti|     avg_emoji_count|
+-----+----------------+---------------+--------------------+--------------------+--------------------+--------------------+--------------------+------------------+------------------+------------------+---------+--------+------------------+------------------+--------------------+--------------------+
|  5.0|            null|         H.E.R.|  0.5016393442622951| 0.02622950819672131|  0.50163934

In [68]:
df.toPandas().to_csv('/home/wusean/billboard_0410.csv')

# Count unique user_id

In [69]:
user = ds.groupby(['weeks','song_name','artist_name','user_id']).count().sort('count',ascending=False)
user.show(10)

+-----+---------+------------+-------------------+-----+
|weeks|song_name| artist_name|            user_id|count|
+-----+---------+------------+-------------------+-----+
| 12.0|     null|Taylor Swift| 985916593065476096| 1559|
|  2.0|     null|         BTS|1069692236982415361| 1142|
| 11.0|     null|   BLACKPINK|1303305253639475201|  798|
| 11.0|     null|   BLACKPINK|1177467659564146688|  789|
| 10.0|     null|Taylor Swift|1321361470970425350|  781|
|  9.0|     null|Taylor Swift|1289100387530596352|  761|
| 10.0|     null|Taylor Swift|1323276221791326209|  738|
|  9.0|     null|Taylor Swift|1321361470970425350|  711|
| 10.0|     null|Taylor Swift|1270887261807075329|  701|
|  1.0|     null|         BTS|1069692236982415361|  655|
+-----+---------+------------+-------------------+-----+
only showing top 10 rows



In [79]:
user.groupby(['weeks','song_name','artist_name']
          ).count().sort('count',ascending=False).show()

+-----+---------+-----------+------+
|weeks|song_name|artist_name| count|
+-----+---------+-----------+------+
|  0.0|     null|        BTS|712033|
| 12.0|     null|        BTS|710143|
|  1.0|     null|        BTS|667177|
|  6.0|     null|        BTS|659725|
|  2.0|     null|        BTS|613656|
|  4.0|     null|        BTS|579413|
|  5.0|     null|        BTS|575553|
|  9.0|     null|        BTS|564613|
|  3.0|     null|        BTS|520448|
| 11.0|     null|        BTS|520320|
| 10.0|     null|        BTS|494915|
|  8.0|     null|        BTS|478553|
|  7.0|     null|        BTS|476686|
|  6.0|     null|         CJ|471211|
|  2.0|     null|         CJ|441629|
|  5.0|     null|         CJ|435767|
| 13.0|     null|        BTS|430210|
|  7.0|     null|         CJ|425953|
|  1.0|     null|         CJ|412713|
|  4.0|     null|         CJ|387716|
+-----+---------+-----------+------+
only showing top 20 rows



In [70]:
join_id = df.join(user, (df['weeks']==user['weeks'])&
        (df['song_name']==user['song_name'])&(df['artist_name']==user['artist_name']), 'left')

In [71]:
join_id.show()

+-----+---------+---------------+-------+--------------------+---------------+-----------+-------------------+------------------+-----------+----------+---------+--------+------------------+------------------+------------------+--------------------+-----+---------+---------------+-------------------+-----+
|weeks|song_name|    artist_name|url_avg|          et_url_avg|display_url_avg|hashtag_avg|     et_hashtag_avg|     avg_followers|avg_friends|avg_listed|sum_trunc|sum_http|        avg_tokens|          avg_char|         avg_senti|     avg_emoji_count|weeks|song_name|    artist_name|            user_id|count|
+-----+---------+---------------+-------+--------------------+---------------+-----------+-------------------+------------------+-----------+----------+---------+--------+------------------+------------------+------------------+--------------------+-----+---------+---------------+-------------------+-----+
|  0.0|     Mood|Michael Jackson|    0.0|                 0.0|            0.

In [72]:
join_id.count()

1403097

In [73]:
df.count()

13324

In [74]:
user.count()

22103681

In [27]:
def union_df(df_1,df_2):
    select_1 = df_1.select("created_at", "id", "truncated", "lang",
                       col("user.id").alias("user_id"), "user.followers_count", "user.friends_count", "user.listed_count",
                       "text", col("entities.hashtags.text").alias('hashtags'), 
                       "entities.urls.display_url", "entities.urls.expanded_url", "entities.urls.url", 
                       col("entities.user_mentions.screen_name").alias("mentions_screen_name"),
                       col("extended_tweet.full_text").alias("et_full_text"), 
                       col("extended_tweet.entities.hashtags.text").alias("et_hashtags"), 
                       col("extended_tweet.entities.urls.display_url").alias("et_display_url"), 
                       col("extended_tweet.entities.urls.expanded_url").alias("et_expanded_url"), 
                       col("extended_tweet.entities.urls.url").alias("et_url"), 
                       col("extended_tweet.entities.user_mentions.screen_name").alias("et_mentions_screen_name"),
                       "place.country", "place.country_code", "place.name", "place.place_type",
                       "favorite_count", "reply_count", "retweet_count", "quote_count",'place')
    select_2 = df_2.select("created_at", "id", "truncated", "lang",
                       col("user.id").alias("user_id"), "user.followers_count", "user.friends_count", "user.listed_count",
                       "text", col("entities.hashtags.text").alias('hashtags'), 
                       "entities.urls.display_url", "entities.urls.expanded_url", "entities.urls.url", 
                       col("entities.user_mentions.screen_name").alias("mentions_screen_name"),
                       col("extended_tweet.full_text").alias("et_full_text"), 
                       col("extended_tweet.entities.hashtags.text").alias("et_hashtags"), 
                       col("extended_tweet.entities.urls.display_url").alias("et_display_url"), 
                       col("extended_tweet.entities.urls.expanded_url").alias("et_expanded_url"), 
                       col("extended_tweet.entities.urls.url").alias("et_url"), 
                       col("extended_tweet.entities.user_mentions.screen_name").alias("et_mentions_screen_name"),
                       "place.country", "place.country_code", "place.name", "place.place_type",
                       "favorite_count", "reply_count", "retweet_count", "quote_count",'place')

    result = select_1.union(select_2)
    return result


song_name = {
    'WAP','Laugh Now Cry Later','Rockstar','Blinding Lights','Whats Poppin','7 Summers','Watermelon Sugar',
    'Roses','Savage Love (Laxed - Siren Beat)','I Hope','Go Crazy','Before You Go','Break My Heart',
    'Midnight Sky','Adore You','Circles','For The Night','Come & Go','Mood Swings','Savage','Popstar',
    'Rags2Riches','Death Bed','Blueberry Faygo','Die From A Broken Heart','Mood',"Don't Start Now",
    'The Woo','The Bones','One Margarita','Life Is Good','Wishing Well','Cardigan','Be Like That',
    'Smile',"Chasin' You",'Intentions','We Paid','Cool Again','Party Girl','Tap In','Girls In The Hood',
    'Say So','Rain On Me','Emotionally Scarred','Got What I Got','The Box',"If The World Was Ending",
    "Nobody's Love","Why We Drink","The Bigger Picture","One Of Them Girls","Lovin' On You","My Future",
    "I Love My Country","Like That","Be A Light","Kacey Talk","Said Sum","Greece","Heather",
    "More Than My Hometown","Done","Bluebird","I Should Probably Go To Bed","Lemonade","The 1",
    "Un Dia (One Day)","Bang!","God Whispered Your Name","24","Toosie Slide","Breaking Me","One Beer",
    "Girl Of My Dreams","Kings & Queens","Pretty Heart","I Called Mama","Dollaz On My Head",
    "One Night Standards","ily","Exile","Martin & Gina","Happy Anywhere",
    "Got It On Me","Some Girls","Mamacita","After Party","3 Headed Goat","Stuck With U",
    "Conversations","Caramelo","Hate The Other Side","La Jeepeta","B*tch From da Souf",
    "Righteous","Past Life","Something Special","21","Lonely If You Are","Dynamite","Need It",
    "The Plan","Hawai","My Ex's Best Friend","All In","You Broke Me First","Spicy","Ice Cream",
    "Over Now","Starting Over","Blastoff","Expensive","Lets Link","Ain't Always The Cowboy",
    "Hit Different","The Voice","Do It","Relacion","Wolves","Lithuania","Deep Reverence",
    "Love You Like I Used To","Body Language","Everywhere But On","Why Would I Stop?",
    "My Window","OK Not To Be OK","Drug Addiction","Gone Too Soon","Dead Trollz",
    "Cross Roads","The Last Backyard...","What You Know Bout Love","Right Foot Creep","You Got It",
    "Dirty Stick","Holy","Diamonds","One Too Many","Without You","Dolly","When You Down","Me Gusta",
    "Wet. (She Got That...)","Whole Lotta Choppas","Blind","B.S.","Franchise","Forget Me Too","Epidemic",
    "Bloody Valentine","U 2 Luv","Better","Drunk Face","Big, Big Plans","Ay, Dios Mio!","Good Time",
    "Put Your Records On","Money Over Fallouts","Moral Of The Story","Runnin","Mr. Right Now","Wonder",
    "Glock In My Lap","Dreams","Rich N*gga Sh*t","Don't Stop","Slidin","Many Men","Fallin'",
    "Outta Time","My Dawg","Brand New Draco","Lovesick Girls","Snitches & Rats","No Opp Left Behind",
    "Levitating","Steppin On N*ggas","RIP Luv","Always Forever","Years Go By","Said N Done",
    "Baby, I'm Jealous","Better Together","Hole In The Bottle","Sofia","Lonely","You're Mines Still",
    "Hate The Way","Canceled","Happy Does","Throat Baby (Go Baby)","Pardon","Champagne Night",
    "Positions","Forever After All","Tyler Herro","Spicy","The Other Guy","Damage","Golden",
    "Back To The Streets","So Done","Cold As You","Practice","La Toxica","Head & Heart",
    "Wine, Beer, Whiskey","34+35","Dakiti","Motive","Off The Table","pov","Just Like Magic",
    "Shut Up","Thriller","Nasty","Safety Net","Six Thirty","My Hair","Obvious","West Side",
    "Love Language","Weeeeee","Stay Down","Thick","Take You Dancing","What That Speed Bout!?",
    "Took Her To The O","Don't Need Friends","The Code","Tragic","All These N**gas","Crazy Story 2.0",
    "Always Do","Young Wheezy","Beers And Sunshine","Therefore I Am","Whoopty","F*ck You, Goodbye"
    }

artists = {'StaySolidRocky', 'The Kid LAROI Featuring YoungBoy Never Brok Again & Internet Money', 
 'Gunna Featuring Young Thug', 'YoungBoy Never Broke Again', 'J Balvin, Dua Lipa, Bad Bunny & Tainy', 
           'Machine Gun Kelly X blackbear', 'Matt Stell', 'Ne-Yo & Jeremih', 'Polo G', 
           'DaBaby Featuring Roddy Ricch', 'Ariana Grande & Justin Bieber', 'WhoHeem', 
           'Florida Georgia Line', 'BTS', 'Travis Scott', 'Billie Eilish', 'Big Sean Featuring Post Malone', 
           'Kelsea Ballerini', 'Justin Bieber Featuring Quavo', 'Luke Combs', 'The Weeknd', 'Mulatto', 
           'Tim McGraw', 'NAV With Gunna', 'Lil Baby', 'Chase Rice', 'Big Sean Featuring Ty Dolla $ign & Jhene Aiko', 
           'Cardi B Featuring Megan Thee Stallion', 'Internet Money & Gunna Featuring Don Toliver & NAV', 
           'Jameson Rodgers', 'Maddie & Tae', 'Luke Combs Featuring Amanda Shires', 'Chris Stapleton', 
           'Sada Baby Featuring Nicki Minaj', 'Ariana Grande Featuring The Weeknd', 'Lil Durk', 
           'Travis Scott Featuring Young Thug & M.I.A.', 'Anitta Featuring Cardi B & Myke Towers', 
           'YoungBoy Never Broke Again Featuring Lil Wayne', 'Bebe Rexha Featuring Doja Cat', 
           'Pop Smoke Featuring 50 Cent & Roddy Ricch', 'Blake Shelton Featuring Gwen Stefani', 
           'SZA Featuring Ty Dolla $ign', 'Jhene Aiko Featuring H.E.R.', '21 Savage & Metro Boomin', 
           'Mike WiLL Made-It, Nicki Minaj & YoungBoy Never Broke Again', 'Ty Dolla $ign Featuring Post Malone', 
           'Joel Corry X MNEK', '21 Savage & Metro Boomin Featuring Young Nudy', 'Megan Thee Stallion Featuring Young Thug',
           'Ariana Grande Featuring Doja Cat', 'T.I. Featuring Lil Baby', 'Moneybagg Yo', 'Doja Cat Featuring Nicki Minaj',
           'Lil Mosey', 'Bryson Tiller Featuring Drake', 'Chris Lane', 'Justin Moore', 'Jason Aldean', 'H.E.R.', 
           'Kane Brown With Swae Lee & Khalid', 'Luke Bryan', 'Harry Styles', 'Dan + Shay', 'Fleetwood Mac', 
           'Saweetie Featuring Jhene Aiko', 'Ritt Momney', 'King Von Featuring Lil Durk', 'Conan Gray', 
           'surf mesa Featuring Emilee', 'Pop Smoke', 'Powfu Featuring beabadoobee', 'BRS Kash', 'Juice WRLD', 
           'Parker McCollum', 'Darius Rucker', 'Morgan Wallen', 'AJR', 'Bryson Tiller', 
           'Justin Bieber Featuring Chance The Rapper', 'YFN Lucci', "Why Don't We", 'Shawn Mendes', 
           'Big Sean Featuring Travis Scott', 'Ariana Grande', 'DJ Chose Featuring BeatKing', 
           'Maluma & The Weeknd', 'Jason Derulo', 'Jawsh 685 x Jason Derulo', 'Money Man Featuring Lil Baby', 
           'Thomas Rhett Featuring Reba McEntire, Hillary Scott, Chris Tomlin & Keith Urban', 
           'Internet Money Featuring Juice WRLD & Trippie Redd', 'Larray', 'Miley Cyrus', 
           'Bad Bunny & Jhay Cortez', 'Calvin Harris X The Weeknd', 'Taylor Swift', 
           'Taylor Swift Featuring Bon Iver', 'Ty Dolla $ign Featuring Nicki Minaj', 
           'Lady Gaga & Ariana Grande', 'Trevor Daniel x Selena Gomez', 'DaBaby Featuring Young Thug', 
           'JP Saxe Featuring Julia Michaels', 'Sech, Daddy Yankee & J Balvin Featuring ROSALIA & Farruko', 
           'Niko Moon', 'Post Malone', 'Kenny Chesney', 'Dua Lipa', 'Andrew Jannakos', 'VEDO', 
           '21 Savage & Metro Boomin Featuring Drake', 'NAV Featuring Lil Baby', 
           'Gabby Barrett Featuring Charlie Puth', 'Michael Jackson', 'Karol G', 
           'BLACKPINK X Selena Gomez', 'Lil Baby & 42 Dugg', 'Kane Brown', 'Rod Wave', 'Farruko', 
           'Chloe X Halle', 'CJ', 'Drake Featuring Lil Durk', 'Chris Brown & Young Thug', 'Don Toliver', 
           'Tory Lanez', 'Big Sean Featuring Nipsey Hussle', 'Dua Lipa Featuring DaBaby', 
           'Rod Wave Featuring ATR Son Son', 'HARDY Featuring Lauren Alaina & Devin Dawson', 
           'The Kid LAROI Featuring Machine Gun Kelly', 'Maroon 5', 'Lee Brice', 'King Von Featuring Polo G', 
           'Maren Morris', 'Lil Durk, 6LACK & Young Thug', 'Keith Urban Duet With P!nk', 'Keith Urban', 
           'Migos Featuring YoungBoy Never Broke Again', 'Machine Gun Kelly & Halsey', 
           'G-Eazy Featuring blackbear', 'Saweetie', 'Future Featuring Drake', 'Clairo', 
           '21 Savage & Metro Boomin Featuring Young Thug', 'Ariana Grande Featuring Ty Dolla $ign', 
           'Big Sean', 'King Von', 'Topic & A7S', 'Juice WRLD x Marshmello', 
           'Juice WRLD & Marshmello Featuring Polo G & The Kid LAROI', 'Ashe Featuring Niall Horan', 
           'Ava Max', 'Megan Thee Stallion', 'SAINt JHN', 'Machine Gun Kelly', 'Yung Bleu Featuring Drake', 
           'Russell Dickerson', 'Zayn', 'Juice WRLD & The Weeknd', 'Ozuna x Karol G x Myke Towers', 
           'Ashley McBryde', 'Lady A', 'Jon Pardi', 'Justin Bieber & benny blanco', 
           'Lil Tecca & Polo G Featuring Lil Durk', 'Miranda Lambert', 'Drake', 'The Kid LAROI', 
           'Little Big Town', 'DJ Khaled Featuring Drake', 'Lil Durk Featuring Lil Baby & Polo G', 
           'Marshmello & Demi Lovato', 'Lil Tecca & Lil Uzi Vert', 'BLACKPINK', 'Jack Harlow', 'Trippie Redd', 
           'Pop Smoke Featuring Lil Baby & DaBaby', 'Lewis Capaldi', '24kGoldn Featuring iann dior', 
           'Jack Harlow Featuring DaBaby, Tory Lanez & Lil Wayne', 'Doja Cat Featuring Gucci Mane', 
           'Black Eyed Peas, Ozuna + J.Rey Soul', 'Pop Smoke Featuring Lil Tjay', 'Roddy Ricch', 'Tate McRae', 
           'Nio Garcia x Anuel AA x Myke Towers x Brray x Juanka', 'Sam Smith', 'DaBaby', 
           'Nas Featuring Fivio Foreign & A$AP Ferg', 'Chris Janson'}

def find_songs(line):
    for name in song_name:
        if name.lower() in line.lower() and ('song' in line.lower() or 'music' in line.lower()):
            return name

def find_artist(line):
    for artist in artists:
        if artist.lower() in line.lower():
            return artist

In [55]:
# extra
wdir = "/data/twitter/decahose/2020"
df0829_1 = sqlContext.read.json(os.path.join(wdir,'decahose.2020-08-29.p1.bz2'))
df0829_2 = sqlContext.read.json(os.path.join(wdir,'decahose.2020-08-29.p2.bz2'))
df0830_1 = sqlContext.read.json(os.path.join(wdir,'decahose.2020-08-30.p1.bz2'))
df0830_2 = sqlContext.read.json(os.path.join(wdir,'decahose.2020-08-30.p2.bz2'))
df0831_1 = sqlContext.read.json(os.path.join(wdir,'decahose.2020-08-31.p1.bz2'))
df0831_2 = sqlContext.read.json(os.path.join(wdir,'decahose.2020-08-31.p2.bz2'))

df_0829 = union_df(df0829_1,df0829_2)
df_0830 = union_df(df0830_1,df0830_2)
df_0831 = union_df(df0831_1,df0831_2)

DF_0 = df_0829.union(df_0830)
DF_0 = DF_0.union(df_0831)

## filter song related
# song
song_process = udf(find_songs, StringType())
DF_0 = DF_0.withColumn('song_name',song_process('text'))

## artist
artist_process = udf(find_artist, StringType())
DF_0 = DF_0.withColumn('artist_name',artist_process('text'))

## SQL
DF_0.createOrReplaceTempView("df")
result = spark.sql("SELECT * FROM df WHERE ( song_name IS NOT NULL) OR (artist_name IS NOT NULL)")


folder = 'billboard_0829_to_0831'
result.write.mode('overwrite').parquet(folder)

In [54]:
# delete parket 'oct'
import shutil
shutil.rmtree('/hadoop-fuse/user/wusean/billboard_0829_to_0831')