In [None]:
import pandas as pd
import findspark
findspark.init()
from typing import Iterator
import numpy as np
from collections import Counter
from string import digits 
from enum import Enum
from typing import List, Optional
import re
import emojis
import emoji
import emot
import json
from redditscore.tokenizer import CrazyTokenizer
import preprocessor as tp
import nltk
from nltk.sentiment.vader import SentimentIntensityAnalyzer
from nltk.stem import WordNetLemmatizer 
from nltk.tokenize import TweetTokenizer, WordPunctTokenizer, MWETokenizer, word_tokenize
from gensim.parsing.preprocessing import remove_stopwords, preprocess_string, strip_tags, strip_punctuation, strip_multiple_whitespaces, strip_numeric, strip_short, split_alphanum
import os
import pyspark
from pyspark.sql import SparkSession, Row, Window
from pyspark.sql.functions import udf, col, array, when, size, spark_partition_id, pandas_udf, PandasUDFType
import pyspark.sql.functions as func
from pyspark.sql import functions as F
from pyspark.sql.types import StringType, ArrayType, FloatType, DoubleType, LongType, StructType, StructField, IntegerType, TimestampType, BooleanType, DecimalType
from urllib3.exceptions import ProtocolError
import sqlalchemy
from sqlalchemy.dialects import postgresql 
import psutil
from flashtext import KeywordProcessor
from itertools import chain
from textblob import TextBlob
from decimal import Decimal
import functools
import operator
import string 
import time
import urlexpander
from bs4 import BeautifulSoup
import base64
import requests 
import demoji

# Module-level NLP helpers shared by the text-processing functions in this notebook.
# lmtzr is used by lemmatize_tweet_text, tweet_tokenizer by tokenize_tweet.
lmtzr = WordNetLemmatizer()
tweet_tokenizer = TweetTokenizer()

# Emoticon detector; clean_asp_text calls emot_obj.emoticons(...).
emot_obj = emot.core.emot() 

# Options for the tweet-preprocessor package (`tp`); they apply to tp.clean(), which clean_asp_text calls.
tp.set_options(tp.OPT.URL, tp.OPT.EMOJI, tp.OPT.RESERVED, tp.OPT.MENTION)
# gensim filter chains handed to preprocess_string(). The first lower-cases its input;
# the second starts with an identity lambda, so the original casing is kept.
cleanTweetFilter = [lambda x: x.lower(), strip_tags, strip_multiple_whitespaces, remove_stopwords, split_alphanum, strip_numeric, strip_short]
cleanTweetAspFilter = [lambda x: x, strip_tags, strip_multiple_whitespaces, split_alphanum, remove_stopwords, strip_numeric, strip_short]
# CrazyTokenizer configurations: tweetFilter substitutes the placeholder tokens 'HASHTAG', 'URL'
# and 'HANDLE' (clean_tweet_text drops them afterwards); aspFilter keeps capitals and splits hashtags.
tweetFilter = CrazyTokenizer(keepcaps=False, hashtags='HASHTAG', remove_punct=True, remove_nonunicode=True, ignore_stopwords=True, remove_breaks=True, urls='URL', twitter_handles='HANDLE', normalize=2)
aspFilter = CrazyTokenizer(keepcaps=True, hashtags='split', remove_punct=True, remove_nonunicode=True, ignore_stopwords=True, remove_breaks=True, urls=False, normalize=2)
# VADER analyzer used by get_sentiment and get_aspects.
analyzer = SentimentIntensityAnalyzer()
# str.translate tables that delete punctuation: `punct` keeps the apostrophe,
# `punct2` keeps '!', '?' and '_', `characters` deletes every string.punctuation character.
punct = str.maketrans(dict.fromkeys(string.punctuation.replace("'", "")))
punct2 = str.maketrans(dict.fromkeys(string.punctuation.replace('!', '').replace('?', '').replace('_', '')))
characters = str.maketrans(dict.fromkeys(string.punctuation))
# NOTE(review): tweetTokenizer duplicates tweet_tokenizer above; neither tweetTokenizer nor
# punctTokenizer is referenced in the visible cells — confirm before removing.
tweetTokenizer = TweetTokenizer()
punctTokenizer = WordPunctTokenizer()

In [None]:
# --- Aspect dictionary: main aspect -> list of surface forms -----------------
with open('dictionaries/aspects.json') as aspectsFile:
    aspects = json.load(aspectsFile)
aspectsKeys = list(aspects.keys())
# Every main aspect plus every surface form listed under any aspect.
all_aspects = set(aspects).union(chain.from_iterable(aspects.values()))

aspects_processor = KeywordProcessor()
aspects_processor.add_keywords_from_dict(aspects)
# Treat '-' as part of a word so hyphenated terms are matched as a whole.
aspects_processor.add_non_word_boundary('-')

# --- Category dictionaries used by keyword_search ----------------------------
with open('dictionaries/pv.json') as pvFile:
    pvDictionary = json.load(pvFile)
pv_processor = KeywordProcessor()
pv_processor.add_keywords_from_dict(pvDictionary)

with open('dictionaries/humour.json') as humourFile:
    humourDictionary = json.load(humourFile)
humour_processor = KeywordProcessor()
humour_processor.add_keywords_from_dict(humourDictionary)

with open('dictionaries/sentiment.json') as sentimentFile:
    sentimentDictionary = json.load(sentimentFile)
sentiment_processor = KeywordProcessor()
sentiment_processor.add_keywords_from_dict(sentimentDictionary)

In [None]:
# Local Spark session with Arrow-backed pandas conversion and the Kafka source package.
spark = (
    SparkSession.builder
    .appName("SoDa-TAP")
    .master("local[*]")
    .config("spark.local.dir", "/home/jovyan/sodatap")
    .config("spark.sql.execution.arrow.pyspark.enabled", "true")
    .config('spark.sql.repl.eagerEval.enabled', True)
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.0")
    .config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer')
    .config('spark.streaming.kafka.consumer.cache.enabled', 'false')
    .config('spark.kryoserializer.buffer.max', '2000M')
    .config('spark.driver.maxResultSize', '1G')
    .getOrCreate()
)

In [None]:
# SparkContext of the session created above.
sc = spark.sparkContext

In [None]:
def fixText(text):
    """Parse ``text`` with BeautifulSoup's ``lxml`` parser and return its ``.text``."""
    return BeautifulSoup(text, 'lxml').text

def tokenize_tweet(text):
    """Split ``text`` into tokens with the shared NLTK ``TweetTokenizer``."""
    return tweet_tokenizer.tokenize(text)

def clean_tweet_text(text):
    """Return a lower-cased, stop-word-free version of a tweet.

    Steps: strip markup (``fixText``), tokenize with ``tweetFilter``, drop the
    ``HASHTAG`` / ``HANDLE`` / ``URL`` placeholder tokens it emits, then run the
    ``cleanTweetFilter`` gensim chain.

    Returns ``None`` when ``text`` is ``None``; otherwise a space-joined string.
    """
    if text is None:
        return None
    text = fixText(text)
    # NOTE(review): the previous version also built an underscore-split copy
    # (' '.join(re.split('_+', text))) and then discarded it by tokenizing
    # `text` again. The dead computation is removed here so output is unchanged.
    # Tokenizing the split copy instead would alter results for handles and
    # hashtags containing '_' — confirm which behaviour is intended.
    tokens = tweetFilter.tokenize(text)
    placeholders = ("HASHTAG", "HANDLE", "URL")
    cleanText = ' '.join(word for word in tokens if word not in placeholders)
    return ' '.join(preprocess_string(cleanText, cleanTweetFilter))
    
def get_wordnet_pos(treebank_tag):
    """Map a Penn Treebank POS tag to the WordNet POS letter (a, n, r, v).

    Tags starting with J/V/N/R map to 'a'/'v'/'n'/'r'; anything else falls
    back to 'n', the lemmatizer's default part of speech.
    """
    prefix_to_pos = (('J', 'a'), ('V', 'v'), ('N', 'n'), ('R', 'r'))
    for prefix, wordnet_pos in prefix_to_pos:
        if treebank_tag.startswith(prefix):
            return wordnet_pos
    # Noun is the default POS for lemmatization.
    return 'n'
    
def lemmatize_tweet_text(text):
    """Lemmatize every whitespace-separated word of ``text``.

    Words are POS-tagged with ``nltk.pos_tag`` and lemmatized with the WordNet
    POS derived from each tag. Returns the lemmas joined by single spaces
    ('' for empty input), or ``None`` when ``text`` is ``None``.
    """
    if text is None:
        return None
    tagged_words = nltk.pos_tag(text.split())
    lemmas = [lmtzr.lemmatize(token, get_wordnet_pos(tag)) for token, tag in tagged_words]
    return ' '.join(lemmas)

def clean_asp_text(text):
    """Prepare raw tweet text for aspect extraction and sentiment scoring.

    Unlike clean_tweet_text this keeps the original casing, keeps emoticons,
    and preserves words that end in '!' or '?'.

    Returns a space-joined string, or None (implicitly) when `text` is None.
    """
    if text is not None:
        text = fixText(text)
        # Replace every run of underscores with a single space.
        cleanText = ' '.join(re.split('_+', text))
        # Detect emoticons and normalise each meaning into a single lower-case token
        # (spaces -> '_', commas removed).
        emoticons = emot_obj.emoticons(cleanText)
        for i in range(0, len(emoticons['mean'])):
            emoticons['mean'][i] = emoticons['mean'][i].replace(' ','_').replace(',', '').lower()
        # emoticon text -> normalised meaning
        emoticonsDict = dict(zip(emoticons['value'], emoticons['mean']))
        # Swap whitespace-delimited tokens that are exactly an emoticon for their meaning token.
        cleanText = cleanText.split()
        for i in range(0, len(cleanText)):
            if cleanText[i] in emoticonsDict:
                cleanText[i] = emoticonsDict[cleanText[i]]
        cleanText = ' '.join(cleanText)
        cleanText = tp.clean(cleanText)
        # preprocess_string returns a list of tokens.
        cleanText = preprocess_string(cleanText, cleanTweetAspFilter)
        # Protect tokens ending in '!' or '?' from the punctuation removal below:
        # each is replaced by a placeholder "withsignN" and the original token
        # (leading punctuation stripped) is remembered in `signs`.
        signs = {}
        counter = 0
        for word in cleanText:
            if word.endswith('!') or word.endswith('?'):
                # index() finds the first remaining occurrence; earlier duplicates
                # have already been replaced by their placeholder.
                index = cleanText.index(word)
                signs["withsign"+str(counter)] = word.lstrip(string.punctuation)
                cleanText[index] = "withsign"+str(counter)
                counter+=1
        cleanText = ' '.join(cleanText)
        # Delete every string.punctuation character (module-level `characters` table).
        cleanText = cleanText.translate(characters)
        cleanText = aspFilter.tokenize(cleanText)
        # Put the protected '!'/'?' tokens back in place of their placeholders.
        signsKeys = signs.keys()
        for word in cleanText:
            if word in signsKeys:
                index = cleanText.index(word)
                cleanText[index] = signs[word]
        # NOTE(review): emoticonsDictKeys is assigned but never used.
        emoticonsDictKeys = emoticonsDict.keys()
        # Restore emoticons: a token equal to a normalised meaning becomes the first
        # emoticon that maps to that meaning.
        for i in range(0, len(cleanText)):
            if cleanText[i] in emoticonsDict.values():
                cleanText[i] = list(emoticonsDict.keys())[list(emoticonsDict.values()).index(cleanText[i])]
        cleanText = ' '.join(cleanText)
        return cleanText

def get_main_aspect(found_a):
    """Resolve a matched term to its main aspect.

    Returns the value ``aspects_processor`` maps ``found_a`` to when the term
    is a registered keyword; otherwise ``found_a`` itself when it is one of the
    main aspect names; otherwise ``None``.
    """
    if found_a in aspects_processor:
        return aspects_processor[found_a]
    return found_a if found_a in aspectsKeys else None

def get_aspects(clean_asp):
    """Extract aspect mentions from cleaned text and score the words around them.

    For every occurrence of an aspect keyword a context phrase is built (up to
    three tokens on each side of the mention, or the span's own noun phrases
    when TextBlob finds more noun phrases than the span has words) and scored
    with VADER.

    Returns a list of JSON strings, one per occurrence, each holding
    ``main_aspect``, ``found_aspect``, ``phrase`` and the entries of
    ``analyzer.polarity_scores(phrase)``. Returns ``None`` when ``clean_asp``
    is ``None``.
    """
    if clean_asp is None:
        return None

    aspect_sentiment = []
    matches = aspects_processor.extract_keywords(clean_asp, span_info=True)

    # Multi-word matches have to survive whitespace tokenization as one token,
    # otherwise they could never be located in `doc` below.
    compoundWords = []
    for _, start, end in matches:
        words = clean_asp[start:end].split()
        if len(words) >= 2:
            compoundWords.append(words)
    compoundAsOneTokenizer = MWETokenizer(compoundWords, separator=' ')
    doc = compoundAsOneTokenizer.tokenize(clean_asp.split())
    # Drop trailing '!'/'?' so tokens compare equal to the matched spans. This used
    # to strip one character per match, so the result depended on the match count.
    doc = [token.rstrip('!?') for token in doc]

    # Token positions and noun phrases per distinct span. The noun phrases are
    # stored per span; previously the value from the last match was reused for all.
    repeatedMatches = {}
    nounPhrases = {}
    for _, start, end in matches:
        span = clean_asp[start:end]
        if span not in repeatedMatches:
            repeatedMatches[span] = [n for n, x in enumerate(doc) if x == span]
            nounPhrases[span] = TextBlob(span).noun_phrases

    for span, indexes in repeatedMatches.items():
        main_aspect = get_main_aspect(span.lower())
        if main_aspect is None:
            continue
        nouns = nounPhrases[span]
        for spanIndex in indexes:
            if len(nouns) <= len(span.split()):
                # Context window: three tokens before and after the mention.
                phrase = ' '.join(doc[max(0, spanIndex - 3):(spanIndex + 1) + 3])
            else:
                phrase = ' '.join(nouns)
            aspect_dict = {'main_aspect': main_aspect, 'found_aspect': span, 'phrase': phrase}
            aspect_dict.update(analyzer.polarity_scores(phrase))
            aspect_sentiment.append(json.dumps(aspect_dict))
    return aspect_sentiment

def find_hashtags(text):
    """Return the hashtag names (without '#') found in ``text``; ``None`` for ``None``."""
    if text is None:
        return None
    return re.findall(r"#(\w+)", text)

def find_handles(text):
    """Return the user handles (without '@') mentioned in ``text``; ``None`` for ``None``."""
    if text is None:
        return None
    return re.findall(r"@(\w+)", text)

def find_urls_in_text(text):
    """Return every URL-like substring of ``text``; ``None`` for ``None``.

    The pattern has nested capture groups, so ``re.findall`` yields tuples;
    the first element of each tuple is the whole URL.
    """
    if text is None:
        return None
    regex = r"(?i)\b((?:https?://|www\d{0,3}[.]|[a-z0-9.\-]+[.][a-z]{2,4}/)(?:[^\s()<>]+|\(([^\s()<>]+|(\([^\s()<>]+\)))*\))+(?:\(([^\s()<>]+|(\([^\s()<>]+\)))*\)|[^\s`!()\[\]{};:'\".,<>?«»“”‘’]))"
    return [groups[0] for groups in re.findall(regex, text)]
    
def find_emojis(text):
    """Return the result of ``demoji.findall_list(text, desc=False)``; ``None`` for ``None``."""
    if text is None:
        return None
    return demoji.findall_list(text, desc=False)

def get_sentiment(text):
    """Return the VADER ``compound`` score of ``text``; ``None`` for ``None``."""
    if text is None:
        return None
    scores = analyzer.polarity_scores(text)
    return scores['compound']

def text_freq(text):
    """Count how often each space-separated word occurs in ``text``.

    Returns the counts as a JSON object string ('{}' for empty text), or
    ``None`` when ``text`` is ``None``.
    """
    if text is None:
        return None
    counts = dict(Counter(text.split(' '))) if len(text) > 0 else {}
    return json.dumps(counts)

def keyword_search(text, dictType):
    """Group the words of ``text`` by the dictionary category they belong to.

    Parameters:
        text: space-separated, preprocessed text.
        dictType: which dictionary to use: "pv", "humour" or "sentiment".

    Returns a JSON object string mapping each matched category to a list of
    ``[word, count]`` pairs, or ``None`` when either argument is ``None``.

    Raises:
        ValueError: if ``dictType`` is not one of the supported names
            (this used to surface as an UnboundLocalError).
    """
    if text is None or dictType is None:
        return None

    processors = {
        "pv": pv_processor,
        "humour": humour_processor,
        "sentiment": sentiment_processor,
    }
    if dictType not in processors:
        raise ValueError("Unknown dictType {!r}; expected one of {}".format(dictType, sorted(processors)))
    dictionaryProcessor = processors[dictType]

    matcher = {}
    words = json.loads(text_freq(text))
    for key, count in words.items():
        if key in dictionaryProcessor:
            matcher.setdefault(dictionaryProcessor[key], []).append([key, count])
    return json.dumps(matcher)

def count_word_freq(wordsList):
    """Sum all values in the ``'count'`` list of every entry of ``wordsList``.

    Returns ``None`` when ``wordsList`` is ``None``.
    """
    if wordsList is None:
        return None
    return sum(sum(wordsList[key]['count']) for key in wordsList)

def count_elements(elements):
    """Return ``len(elements)``, or ``None`` when ``elements`` is ``None``."""
    return None if elements is None else len(elements)
    
def calculate_engagement(retweet_count, like_count, quote_count, reply_count, user_followers_count):
    """Engagement rate in percent: total interactions divided by follower count.

    Returns 0 when the follower count is not positive and ``None`` when any
    argument is ``None``.
    """
    inputs = (retweet_count, like_count, quote_count, reply_count, user_followers_count)
    if any(value is None for value in inputs):
        return None
    if user_followers_count > 0:
        interactions = retweet_count + like_count + quote_count + reply_count
        return (interactions / user_followers_count) * 100
    return 0

def calculate_extendedReach(retweet_count, user_tweet_count):
    """Retweets as a percentage of the author's tweet count.

    Returns 0 when the tweet count is not positive and ``None`` when either
    argument is ``None``.
    """
    if retweet_count is None or user_tweet_count is None:
        return None
    if user_tweet_count > 0:
        return (retweet_count / user_tweet_count) * 100
    return 0

def calculate_impressions(user_followers_count, user_tweet_count):
    """Possible impressions: follower count times tweet count, as a decimal string.

    Returns ``None`` when either argument is ``None`` or NaN.
    """
    if user_followers_count is None or user_tweet_count is None:
        return None
    # Nullable integer columns reach pandas as float64, so a value can be NaN or
    # an integral float such as 1234.0. Neither is a plain digit string, which is
    # what multiply() expects, so skip NaN and cast to int first.
    if pd.isna(user_followers_count) or pd.isna(user_tweet_count):
        return None
    return multiply(int(user_followers_count), int(user_tweet_count))
    
def create_media_table_query(query):
    """POST ``query`` (a JSON statement body) to the ``_sql`` endpoint on host ``dbName``.

    ``dbName`` is read from module scope; the HTTP response is not inspected.
    """
    endpoint = 'http://{}:4200/_sql'.format(dbName)
    requests.post(endpoint, headers={'Content-Type': 'application/json'}, data=query)

def create_photos_table():
    """Send the statement that creates the ``photos`` blob table."""
    create_media_table_query(
        '{"stmt": "create blob table photos clustered into 3 shards with (number_of_replicas=0)"}'
    )

def save_photos_hashes(urls):
    """Download each photo, store it as a blob and return the blob hashes.

    Every URL is fetched, base64-encoded and PUT to the ``photos`` blob table
    under the SHA-1 of the uploaded (base64) content. A URL that fails for any
    reason is skipped (best effort).

    Parameters:
        urls: sequence of photo URLs; ``None`` or empty yields ``[]``.

    Returns the list of SHA-1 hex digests for the photos that were stored.
    """
    # hashlib is not imported at file level. Without this import every upload
    # raised NameError, which the old bare `except` swallowed, so the function
    # always returned [].
    import hashlib

    allPhotosHashes = []
    # len() rather than truthiness: `urls` may be a numpy array.
    if urls is None or len(urls) == 0:
        return allPhotosHashes
    for url in urls:
        try:
            photo = base64.b64encode(requests.get(url).content)
            photoToStr = photo.decode('ascii')
            photoHash = hashlib.sha1(photoToStr.encode("utf-8")).hexdigest()
            dbUrl = 'http://{}:4200/_blobs/photos/'.format('cratedb') + photoHash
            requests.put(dbUrl, data=photoToStr)  # put blob in table
            allPhotosHashes.append(photoHash)
        except Exception:
            # Still best effort, but no longer traps KeyboardInterrupt/SystemExit.
            continue
    return allPhotosHashes

def multiply(num1, num2):
    """Multiply two integers given as ints or decimal strings.

    Python integers have arbitrary precision, so the former hand-written
    long-multiplication routine is unnecessary. It also produced wrong digits
    for anything other than unsigned digit strings (for example a leading '-').

    Parameters:
        num1, num2: integers or strings of decimal digits; converted with ``str()``.

    Returns the product as a decimal string. An empty operand yields "0".

    Raises:
        ValueError: if an operand is not an integer literal.
    """
    digits1 = str(num1)
    digits2 = str(num2)
    if len(digits1) == 0 or len(digits2) == 0:
        return "0"
    return str(int(digits1) * int(digits2))

In [None]:
@pandas_udf(StringType())
def apply_clean_tweet_text(vector: pd.Series) -> pd.Series:
    """Pandas UDF: apply ``clean_tweet_text`` to every value of the batch."""
    return vector.apply(clean_tweet_text)

@pandas_udf(StringType())
def apply_lemmatize_tweet_text(vector: pd.Series) -> pd.Series:
    """Pandas UDF: apply ``lemmatize_tweet_text`` to every value of the batch."""
    return vector.apply(lemmatize_tweet_text)

@pandas_udf(StringType())
def apply_clean_asp_text(vector: pd.Series) -> pd.Series:
    """Pandas UDF: apply ``clean_asp_text`` to every value of the batch."""
    return vector.apply(clean_asp_text)

@pandas_udf(ArrayType(StringType()))
def apply_get_aspects(clean_asp: pd.Series) -> pd.Series:
    """Pandas UDF: apply ``get_aspects`` to every value of the batch."""
    return clean_asp.apply(get_aspects)

@pandas_udf(FloatType())
def apply_get_sentiment(vector: pd.Series) -> pd.Series:
    """Pandas UDF: ``get_sentiment`` score of every value, rounded to 3 decimals."""
    scores = vector.apply(get_sentiment)
    return scores.round(3)

@pandas_udf(StringType())
def apply_text_freq(vector: pd.Series) -> pd.Series:
    """Pandas UDF: apply ``text_freq`` to every value of the batch."""
    return vector.apply(text_freq)

@pandas_udf(StringType())
def apply_dictionary_search(text: pd.Series, dictType: pd.Series) -> pd.Series:
    """Pandas UDF: row-wise ``keyword_search(text, dictType)``."""
    rows = pd.DataFrame({'text': text, 'dictType': dictType})
    rows['dict_search'] = rows.apply(
        lambda row: keyword_search(row['text'], row['dictType']),
        axis=1,
    )
    return rows['dict_search']

@pandas_udf(ArrayType(StringType()))
def apply_find_hashtags(vector: pd.Series) -> pd.Series:
    """Pandas UDF: apply ``find_hashtags`` to every value of the batch."""
    return vector.apply(find_hashtags)

@pandas_udf(ArrayType(StringType()))
def apply_find_handles(vector: pd.Series) -> pd.Series:
    """Pandas UDF: apply ``find_handles`` to every value of the batch."""
    return vector.apply(find_handles)

@pandas_udf(ArrayType(StringType()))
def apply_find_emojis(vector: pd.Series) -> pd.Series:
    """Pandas UDF: apply ``find_emojis`` to every value of the batch."""
    return vector.apply(find_emojis)

@pandas_udf(ArrayType(StringType()))
def apply_find_urls(vector: pd.Series) -> pd.Series:
    """Pandas UDF: extract the URLs from every value of the batch.

    Calls ``find_urls_in_text``. The previous body referenced ``find_urls``,
    which is not defined in this notebook, so the UDF would raise NameError
    when executed.
    """
    text_df = pd.DataFrame({'text': vector})
    text_df['urls'] = text_df['text'].apply(find_urls_in_text)
    return text_df['urls']

@pandas_udf(ArrayType(StringType()))
def apply_expand_urls(vector: pd.Series) -> pd.Series:
    """Pandas UDF: apply ``expand_urls`` to every value of the batch.

    NOTE(review): ``expand_urls`` is not defined in the visible cells of this
    notebook — confirm it exists before this UDF is used.
    """
    return vector.apply(expand_urls)

@pandas_udf(ArrayType(StringType()))
def apply_unshorten_urls(vector: pd.Series) -> pd.Series:
    """Pandas UDF: apply ``unshorten_urls`` to every value of the batch.

    This UDF was defined twice with identical bodies; the duplicate is removed.

    NOTE(review): ``unshorten_urls`` is not defined in the visible cells of
    this notebook — confirm it exists before this UDF is used.
    """
    text_df = pd.DataFrame({'text': vector})
    text_df['unshorten_urls'] = text_df['text'].apply(unshorten_urls)
    return text_df['unshorten_urls']

@pandas_udf(DoubleType())
def apply_calculate_engagement(retweet_count: pd.Series, like_count: pd.Series, quote_count: pd.Series, reply_count: pd.Series, user_followers_count: pd.Series) -> pd.Series:
    """Pandas UDF: row-wise ``calculate_engagement``, rounded to 3 decimals."""
    rows = pd.DataFrame({
        'retweet_count': retweet_count,
        'like_count': like_count,
        'quote_count': quote_count,
        'reply_count': reply_count,
        'user_followers_count': user_followers_count,
    })
    rows['engagement'] = rows.apply(
        lambda row: calculate_engagement(
            row['retweet_count'],
            row['like_count'],
            row['quote_count'],
            row['reply_count'],
            row['user_followers_count'],
        ),
        axis=1,
    )
    rows['engagement'] = rows['engagement'].round(3)
    return rows['engagement']
    
@pandas_udf(DoubleType())
def apply_calculate_extendedReach(retweet_count: pd.Series, user_tweet_count: pd.Series) -> pd.Series:
    """Pandas UDF: row-wise ``calculate_extendedReach``, rounded to 3 decimals."""
    rows = pd.DataFrame({'retweet_count': retweet_count, 'user_tweet_count': user_tweet_count})
    rows['extendedReach'] = rows.apply(
        lambda row: calculate_extendedReach(row['retweet_count'], row['user_tweet_count']),
        axis=1,
    )
    rows['extendedReach'] = rows['extendedReach'].round(3)
    return rows['extendedReach']

@pandas_udf(StringType())
def apply_calculate_impressions(user_followers_count: pd.Series, user_tweet_count: pd.Series) -> pd.Series:
    """Pandas UDF: row-wise ``calculate_impressions`` (result is a decimal string)."""
    rows = pd.DataFrame({
        'user_followers_count': user_followers_count,
        'user_tweet_count': user_tweet_count,
    })
    rows['impressions'] = rows.apply(
        lambda row: calculate_impressions(row['user_followers_count'], row['user_tweet_count']),
        axis=1,
    )
    return rows['impressions']

@pandas_udf(ArrayType(StringType()))
def apply_save_photos_hashes(vector: pd.Series) -> pd.Series:
    """Pandas UDF: apply ``save_photos_hashes`` to every value of the batch."""
    return vector.apply(save_photos_hashes)

@pandas_udf(StringType())
def apply_save_data(vector: pd.Series) -> pd.Series:
    """Pandas UDF: apply ``save_data`` to every value of the batch.

    NOTE(review): ``save_data`` is not defined in the visible cells of this
    notebook — confirm it exists before this UDF is used.
    """
    return vector.apply(save_data)

In [None]:
def write_sink(df, epoch_id):
    """``foreachBatch`` sink: append one micro-batch to the ``<topic>_processed`` table.

    The batch is collected to pandas and written with ``DataFrame.to_sql``;
    the list-valued columns are declared as string arrays. ``epoch_id`` is
    accepted but not used; ``topic`` is read from module scope.
    """
    array_columns = ('text_hashtags', 'text_handles', 'text_emojis', 'urls', 'images', 'aspect_sentiment')
    column_types = {name: postgresql.ARRAY(sqlalchemy.types.String) for name in array_columns}
    df.toPandas().to_sql(
        topic + "_processed",
        'crate://cratedb:4200',
        if_exists='append',
        index=False,
        chunksize=10000,
        dtype=column_types,
    )

In [None]:
def infer_topic_schema_json(topic):
    """Infer the JSON schema of the messages in a Kafka topic.

    Reads the topic as a batch, casts each message value to a string, lets
    Spark infer a schema from those JSON documents, and returns that schema
    serialized as a JSON string.
    """
    kafka_options = (
        ("kafka.bootstrap.servers", "broker:29092"),
        ("subscribe", topic),
        ("startingOffsets", "earliest"),
        ("maxOffsetsPerTrigger", "1"),
        ("failOnDataLoss", "false"),
    )
    reader = spark.read.format("kafka")
    for option_name, option_value in kafka_options:
        reader = reader.option(option_name, option_value)

    messages = reader.load().withColumn("value", F.expr("string(value)")).select("value")
    inferred = spark.read.json(messages.rdd.map(lambda row: row.value), multiLine=True)
    return inferred.schema.json()

In [None]:
# Schema handling: reuse the cached schema file when possible, otherwise infer the
# schema from the Kafka topic and cache it for the next run.
infer_schema = False
schema_location = "schemas/tweets.json"

if not infer_schema:
    try:
        with open(schema_location, 'r') as f:
            topic_schema_txt = json.load(f)
    except (OSError, ValueError):
        # Missing, unreadable or corrupt cache file (JSONDecodeError and
        # UnicodeDecodeError are both ValueError): fall back to inference.
        infer_schema = True

if infer_schema:
    topic_schema_txt = infer_topic_schema_json(topic)
    # Create the cache directory if needed so the first run does not fail on write.
    schema_dir = os.path.dirname(schema_location)
    if schema_dir:
        os.makedirs(schema_dir, exist_ok=True)
    with open(schema_location, 'w') as f:
        json.dump(topic_schema_txt, f)

# The cache written above holds the schema as a JSON-encoded string. Also accept a
# file that contains the schema object itself (json.load then returns a dict).
if isinstance(topic_schema_txt, str):
    topic_schema = StructType.fromJson(json.loads(topic_schema_txt))
else:
    topic_schema = StructType.fromJson(topic_schema_txt)

In [None]:
# Streaming source: read the Kafka topic, decode each message value as a JSON
# document using `topic_schema`, and expose the fields under `payload` as columns.
csv_tweets = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "broker:29092")
    .option("subscribe", topic)
    .option("maxOffsetsPerTrigger", "100000")
    .option("startingOffsets", "earliest")
    .option("failOnDataLoss", "false")
    .load()
    .withColumn("value", F.expr("string(value)"))
    .select("value")
    .withColumn('value', F.from_json(col("value"), topic_schema))
    .select("value.payload.*")
)

csv_tweets = csv_tweets.repartition(64)

#CSV header
'''
id,
tweet,
created_at,
lang,
like_count,
quote_count,
quoted,
replied,
threaded,
reply_count,
retweet_count,
source,
tweet_type,
author_id,
author_name,
author_created_at,
author_bio,
author_followers_count,
author_following_count,
author_listed_count,
author_profile_image,
author_tweet_count,
author_username,
author_verified
'''

# Enrichment pipeline: every step adds (or re-types) one column of the stream.
csv_tweets = (
    csv_tweets
    # cleaned and lemmatized text
    .withColumn('text_clean', apply_clean_tweet_text('tweet'))
    .withColumn('text_preprocessed', apply_lemmatize_tweet_text('text_clean'))
    # cast the raw columns to their proper types
    .withColumn('retweet_count', col('retweet_count').cast(IntegerType()))
    .withColumn('reply_count', col('reply_count').cast(IntegerType()))
    .withColumn('like_count', col('like_count').cast(IntegerType()))
    .withColumn('quote_count', col('quote_count').cast(IntegerType()))
    .withColumn('created_at', col('created_at').cast(TimestampType()))
    .withColumn('author_created_at', col('author_created_at').cast(TimestampType()))
    .withColumn('author_followers_count', col('author_followers_count').cast(IntegerType()))
    .withColumn('author_following_count', col('author_following_count').cast(IntegerType()))
    .withColumn('author_tweet_count', col('author_tweet_count').cast(IntegerType()))
    .withColumn('author_listed_count', col('author_listed_count').cast(IntegerType()))
    .withColumn('author_verified', col('author_verified').cast(BooleanType()))
    # entities extracted from the raw tweet, with their counts
    .withColumn('text_hashtags', apply_find_hashtags('tweet'))
    .withColumn('hashtags_count', size('text_hashtags'))
    .withColumn('text_handles', apply_find_handles('tweet'))
    .withColumn('handles_count', size('text_handles'))
    .withColumn('text_emojis', apply_find_emojis('tweet'))
    .withColumn('emojis_count', size('text_emojis'))
    # aspect-level and overall sentiment
    .withColumn('text_asp_preprocessed', apply_clean_asp_text('tweet'))
    .withColumn('aspect_sentiment', apply_get_aspects('text_asp_preprocessed'))
    .withColumn('text_sentiment', apply_get_sentiment('text_asp_preprocessed'))
    # word frequencies and dictionary matches
    .withColumn('text_preprocessed_freq', apply_text_freq('text_preprocessed'))
    .withColumn('text_pv_freq', apply_dictionary_search('text_preprocessed', func.lit('pv')))
    .withColumn('text_humour_freq', apply_dictionary_search(col('text_preprocessed'), func.lit('humour')))
    .withColumn('text_sentiment_freq', apply_dictionary_search(col('text_preprocessed'), func.lit('sentiment')))
    # reach metrics
    .withColumn('engagement_rate', apply_calculate_engagement('retweet_count', 'like_count', 'quote_count', 'reply_count', 'author_followers_count'))
    .withColumn('extended_reach', apply_calculate_extendedReach('retweet_count', 'author_tweet_count'))
    .withColumn('possible_impressions', apply_calculate_impressions('author_followers_count', 'author_tweet_count'))
)

#csv_tweets = csv_tweets.na.fill("")
#.drop(col("text_clean"))

#if urlAnalysis:
    

#debug_sink = csv_tweets.writeStream \
#    .outputMode("update") \
#    .trigger(processingTime='1 seconds') \
#    .option("truncate", "false")\
#    .format("console") \
#    .start()

# Start the streaming query: every micro-batch is handed to write_sink.
debug_sink = (
    csv_tweets.writeStream
    .foreachBatch(write_sink)
    .start()
)

# Block this cell until the query stops or fails.
debug_sink.awaitTermination()