# Read the dataset

In [None]:
# Root directory that holds the pre-split parquet datasets and every artefact written by this notebook.
root_file_path = "/nas_ssd_social_media_analytics/ali_twitter/final_dataset_12062020/temp/"

# One sub-directory per split, all located directly under the root.
training_parquet_path, validation_parquet_path, test_parquet_path = [
    root_file_path + split_dir
    for split_dir in ("training_df", "val_df", "test_df")
]

# Load each split as a Spark DataFrame through the SQL context `sqc`
# (created outside the part of the notebook shown here).
training_df, validation_df, test_df = [
    sqc.read.parquet(parquet_path)
    for parquet_path in (training_parquet_path, validation_parquet_path, test_parquet_path)
]

# Unique Tweets

In [None]:
# Build one row per tweet_id with the tweet-level columns used later as features.
# `decode_tokens` is defined elsewhere; presumably it turns the token ids back into text -- confirm.

# 0/1 indicators: 0 when the source column is NULL, 1 otherwise.
training_presence_flags = [
    F.when(F.col(source_col).isNull(), 0).otherwise(1).alias(flag_name)
    for source_col, flag_name in (
        ("hashtags", "has_hashtags"),
        ("present_media", "has_media"),
        ("present_links", "has_links"),
    )
]

training_unique_rows = training_df.dropDuplicates(["tweet_id"])

training_tweets = training_unique_rows.select(
    "tweet_id",
    decode_tokens(F.col("text_tokens")).alias("text"),
    "tweet_type",
    "language",
    # Hour of day extracted from the tweet timestamp.
    F.hour(F.to_timestamp("timestamp")).alias("hour_tweet"),
    # Number of entries in the text_tokens array.
    F.size("text_tokens").alias("num_tokens"),
    *training_presence_flags
)


# training_tweets.write.parquet(root_file_path+"training_tweets")

In [None]:
# Flag tweets that carry at least one hashtag whose observed lifetime
# (latest minus earliest timestamp over all tweets using it) divided by 24*3600
# lies within 0.007 of a positive whole number.
# NOTE(review): the 24*3600 divisor suggests `timestamp` is in epoch seconds -- confirm.
per_hashtag = Window.partitionBy("hashtags_exploded")

# One row per (tweet, hashtag); tweets without hashtags drop out of the explode.
training_hashtag_rows = (
    training_df
    .dropDuplicates(["tweet_id"])
    .select("tweet_id", "timestamp", F.explode("hashtags").alias("hashtags_exploded"))
)

training_hashtag_lifetimes = (
    training_hashtag_rows
    .withColumn("hashtag_duration", F.max("timestamp").over(per_hashtag))
    .withColumn("hashtag_duration", F.col("hashtag_duration") - F.min("timestamp").over(per_hashtag))
    .withColumn("hashtag_duration2", F.col("hashtag_duration") / (24*3600))
    .withColumn("hashtag_duration3", F.round("hashtag_duration2"))
)

hashtag_lasts_at_least_a_day = F.col("hashtag_duration3") > 0
hashtag_near_whole_days = F.abs(F.col("hashtag_duration3") - F.col("hashtag_duration2")) < 0.007

# Collapse back to one row per tweet: 1 if any of its hashtags qualifies.
tweet_is_in_top_daily_hashtag = (
    training_hashtag_lifetimes
    .withColumn(
        "is_in_top_daily_hashtags",
        F.when(hashtag_lasts_at_least_a_day & hashtag_near_whole_days, 1).otherwise(0),
    )
    .select("tweet_id", "is_in_top_daily_hashtags")
    .groupBy("tweet_id")
    .agg(F.max("is_in_top_daily_hashtags").alias("is_in_top_daily_hashtags"))
)

# Attach the flag; tweets with no match in the left join get 0 instead of NULL.
hashtag_flag_col = F.col("is_in_top_daily_hashtags")
training_tweets = (
    training_tweets
    .join(tweet_is_in_top_daily_hashtag, "tweet_id", "left_outer")
    .withColumn("is_in_top_daily_hashtags", F.when(hashtag_flag_col.isNull(), 0).otherwise(hashtag_flag_col))
)

In [None]:
# Flag tweets that carry at least one link whose observed lifetime
# (latest minus earliest timestamp over all tweets sharing it) divided by 24*3600
# lies within 0.007 of a positive whole number.
# NOTE(review): the 24*3600 divisor suggests `timestamp` is in epoch seconds -- confirm.
per_link = Window.partitionBy("present_links_exploded")

# One row per (tweet, link); tweets without links drop out of the explode.
training_link_rows = (
    training_df
    .dropDuplicates(["tweet_id"])
    .select("tweet_id", "timestamp", F.explode("present_links").alias("present_links_exploded"))
)

training_link_lifetimes = (
    training_link_rows
    .withColumn("link_duration", F.max("timestamp").over(per_link))
    .withColumn("link_duration", F.col("link_duration") - F.min("timestamp").over(per_link))
    .withColumn("link_duration2", F.col("link_duration") / (24*3600))
    .withColumn("link_duration3", F.round("link_duration2"))
)

link_lasts_at_least_a_day = F.col("link_duration3") > 0
link_near_whole_days = F.abs(F.col("link_duration3") - F.col("link_duration2")) < 0.007

# Collapse back to one row per tweet: 1 if any of its links qualifies.
tweet_is_in_top_daily_link = (
    training_link_lifetimes
    .withColumn(
        "is_in_top_daily_links",
        F.when(link_lasts_at_least_a_day & link_near_whole_days, 1).otherwise(0),
    )
    .select("tweet_id", "is_in_top_daily_links")
    .groupBy("tweet_id")
    .agg(F.max("is_in_top_daily_links").alias("is_in_top_daily_links"))
)

# Attach the flag; tweets with no match in the left join get 0 instead of NULL.
link_flag_col = F.col("is_in_top_daily_links")
training_tweets = (
    training_tweets
    .join(tweet_is_in_top_daily_link, "tweet_id", "left_outer")
    .withColumn("is_in_top_daily_links", F.when(link_flag_col.isNull(), 0).otherwise(link_flag_col))
)

In [None]:
# Persist the per-tweet training frame as parquet under the root directory.
training_tweets_output_path = root_file_path+"training_tweets"
training_tweets.write.parquet(training_tweets_output_path)

In [None]:
# Reload the persisted per-tweet training frame from parquet.
training_tweets_input_path = root_file_path+"training_tweets"
training_tweets = sqc.read.parquet(training_tweets_input_path)

# Creating TF-IDF vectors

In [None]:
%pyspark
from pyspark.ml import Pipeline
from pyspark.ml.feature import Tokenizer, RegexTokenizer, StopWordsRemover, HashingTF, IDF, CountVectorizer

# tokenizer = Tokenizer(inputCol="text", outputCol="tokens")
# tmp = tokenizer.transform(text_d

# Text pipeline: "text" -> "words" -> "tokens2" -> "hashedTF" -> "text_features".
# This cell only defines the (unfitted) pipeline; fitting happens where it is used.

# Tokenise the text column. With gaps=False the pattern matches the tokens themselves:
# either a whole http(s) URL, or an optional leading '#' followed by a run of word characters.
reTokenizer = RegexTokenizer(inputCol="text", outputCol="words", pattern="(?U)\\bhttps?://\\S*|#?\\b\\w+\\b", gaps=False)

# Languages whose default Spark stop-word lists are merged into a single list.
# Each language is listed exactly once so no list is loaded twice.
langs = ["english", "french", "spanish", "german", "finnish", "turkish", "russian", "norwegian", "dutch", "danish", "hungarian", "italian", "swedish", "portuguese"]
myStopWords = []
for lang in langs:
    myStopWords += StopWordsRemover.loadDefaultStopWords(lang)

# Extra tokens to drop on top of the language stop words.
# NOTE(review): "cls"/"sep"/"unk" look like tokenizer special-token residue and "rt" like the
# retweet marker -- confirm against the output of decode_tokens.
otherWordsToRemove = ["cls", "sep", "unk",  "@", "rt"]
myStopWords += otherWordsToRemove

stopwordsRemover = StopWordsRemover(inputCol=reTokenizer.getOutputCol(), outputCol="tokens2", stopWords=myStopWords)

# Hash the remaining tokens into a 16-dimensional term-frequency vector.
hashingTF = HashingTF(inputCol=stopwordsRemover.getOutputCol(), outputCol="hashedTF", numFeatures=16)

# The estimator is bound to a lowercase name on purpose: assigning it to `IDF` would replace the
# imported class with an instance, and re-running this cell would then fail because the
# instance is not callable.
idf = IDF(inputCol=hashingTF.getOutputCol(), outputCol="text_features")

stages = [reTokenizer, stopwordsRemover, hashingTF, idf]

tf_idf_pipeline = Pipeline(stages=stages)
# tfidf_model = tf_idf_pipeline.fit(training_tweets)
# tfidf_model.transform(training_tweets).select("tweet_id", "text_features").show()

In [None]:
%pyspark
from pyspark.ml.feature import StringIndexer, OneHotEncoderEstimator, VectorAssembler, FeatureHasher
from pyspark.ml import Pipeline

# Pipeline for the non-text tweet features: every column below is string-indexed and the
# resulting index columns are hashed together into one 16-dimensional vector.
categoricalColumns = ["tweet_type", "language", "hour_tweet", "has_hashtags", "has_media", "has_links", "is_in_top_daily_hashtags", "is_in_top_daily_links"]
stages = [] # stages in our Pipeline
for categoricalCol in categoricalColumns:
    # Category indexing with StringIndexer.
    # handleInvalid="keep" sends labels that were not present when the indexer was fitted
    # (and NULLs) to one extra index instead of raising, so the fitted model can also be
    # applied to the validation and test tweets. Indices of labels seen during fitting
    # are not affected by this setting.
    stringIndexer = StringIndexer(inputCol=categoricalCol, outputCol=categoricalCol + "Index", handleInvalid="keep")
    stages += [stringIndexer]

# Hash all "<column>Index" columns, treated as categorical, into a single feature vector.
featInputs = [c + "Index" for c in categoricalColumns]
featHasher = FeatureHasher(numFeatures=16, inputCols=featInputs, outputCol="otherFeaturesHashed", categoricalCols=featInputs)
stages += [featHasher]

other_tweet_features_pipline = Pipeline(stages=stages)
# other_tweet_features = other_tweet_features_pipline.fit(training_tweets)

# Tweet Features

In [None]:
%pyspark
# Concatenate the text vector and the hashed categorical vector into "tweet_features".
assemble_tweet_features = VectorAssembler(
    inputCols=["text_features", "otherFeaturesHashed"],
    outputCol="tweet_features",
)

# Chain both sub-pipelines with the assembler and fit the whole thing on the training tweets.
tweet_feature_stages = [tf_idf_pipeline, other_tweet_features_pipline, assemble_tweet_features]
create_tweet_features = Pipeline(stages=tweet_feature_stages).fit(training_tweets)

In [None]:
%pyspark
# Preview the assembled feature vectors produced by the fitted model on the training tweets.
training_tweet_features_preview = create_tweet_features.transform(training_tweets)
training_tweet_features_preview.select("tweet_id", "tweet_features").show()

In [None]:
%pyspark
# Persist the fitted feature pipeline under the root directory.
tweet_features_model_save_path = root_file_path+"create_tweet_features_model"
create_tweet_features.save(tweet_features_model_save_path)

In [None]:
%pyspark
from pyspark.ml import PipelineModel
# Restore the fitted feature pipeline that was saved under the root directory.
tweet_features_model_load_path = root_file_path+"create_tweet_features_model"
create_tweet_features = PipelineModel.load(tweet_features_model_load_path)
# create_tweet_features.transform(training_tweets).select("tweet_id", "tweet_features").show()

# Create tweet features for validation dataset

In [None]:
%pyspark
# Build the per-tweet frame for the validation split: the same columns as for training,
# with the hashtag/link flags computed from the validation data only.
# `decode_tokens` is defined elsewhere; presumably it turns the token ids back into text -- confirm.

# 0/1 indicators: 0 when the source column is NULL, 1 otherwise.
validation_presence_flags = [
    F.when(F.col(source_col).isNull(), 0).otherwise(1).alias(flag_name)
    for source_col, flag_name in (
        ("hashtags", "has_hashtags"),
        ("present_media", "has_media"),
        ("present_links", "has_links"),
    )
]

validation_unique_rows = validation_df.dropDuplicates(["tweet_id"])

validation_tweets = validation_unique_rows.select(
    "tweet_id",
    decode_tokens(F.col("text_tokens")).alias("text"),
    "tweet_type",
    "language",
    F.hour(F.to_timestamp("timestamp")).alias("hour_tweet"),
    F.size("text_tokens").alias("num_tokens"),
    *validation_presence_flags
)

# --- Hashtag flag ---------------------------------------------------------------------
# A hashtag qualifies when (latest - earliest timestamp) / (24*3600) lies within 0.007
# of a positive whole number; a tweet is flagged when any of its hashtags qualifies.
# NOTE(review): the 24*3600 divisor suggests `timestamp` is in epoch seconds -- confirm.
validation_per_hashtag = Window.partitionBy("hashtags_exploded")

validation_hashtag_lifetimes = (
    validation_df
    .dropDuplicates(["tweet_id"])
    .select("tweet_id", "timestamp", F.explode("hashtags").alias("hashtags_exploded"))
    .withColumn("hashtag_duration", F.max("timestamp").over(validation_per_hashtag))
    .withColumn("hashtag_duration", F.col("hashtag_duration") - F.min("timestamp").over(validation_per_hashtag))
    .withColumn("hashtag_duration2", F.col("hashtag_duration") / (24*3600))
    .withColumn("hashtag_duration3", F.round("hashtag_duration2"))
)

validation_hashtag_positive = F.col("hashtag_duration3") > 0
validation_hashtag_near_whole = F.abs(F.col("hashtag_duration3") - F.col("hashtag_duration2")) < 0.007

tweet_is_in_top_daily_hashtag = (
    validation_hashtag_lifetimes
    .withColumn(
        "is_in_top_daily_hashtags",
        F.when(validation_hashtag_positive & validation_hashtag_near_whole, 1).otherwise(0),
    )
    .select("tweet_id", "is_in_top_daily_hashtags")
    .groupBy("tweet_id")
    .agg(F.max("is_in_top_daily_hashtags").alias("is_in_top_daily_hashtags"))
)

# Tweets with no match in the left join get 0 instead of NULL.
validation_hashtag_flag = F.col("is_in_top_daily_hashtags")
validation_tweets = (
    validation_tweets
    .join(tweet_is_in_top_daily_hashtag, "tweet_id", "left_outer")
    .withColumn("is_in_top_daily_hashtags", F.when(validation_hashtag_flag.isNull(), 0).otherwise(validation_hashtag_flag))
)

# --- Link flag ------------------------------------------------------------------------
# Same rule as for hashtags, applied to the entries of present_links.
validation_per_link = Window.partitionBy("present_links_exploded")

validation_link_lifetimes = (
    validation_df
    .dropDuplicates(["tweet_id"])
    .select("tweet_id", "timestamp", F.explode("present_links").alias("present_links_exploded"))
    .withColumn("link_duration", F.max("timestamp").over(validation_per_link))
    .withColumn("link_duration", F.col("link_duration") - F.min("timestamp").over(validation_per_link))
    .withColumn("link_duration2", F.col("link_duration") / (24*3600))
    .withColumn("link_duration3", F.round("link_duration2"))
)

validation_link_positive = F.col("link_duration3") > 0
validation_link_near_whole = F.abs(F.col("link_duration3") - F.col("link_duration2")) < 0.007

tweet_is_in_top_daily_link = (
    validation_link_lifetimes
    .withColumn(
        "is_in_top_daily_links",
        F.when(validation_link_positive & validation_link_near_whole, 1).otherwise(0),
    )
    .select("tweet_id", "is_in_top_daily_links")
    .groupBy("tweet_id")
    .agg(F.max("is_in_top_daily_links").alias("is_in_top_daily_links"))
)

# Tweets with no match in the left join get 0 instead of NULL.
validation_link_flag = F.col("is_in_top_daily_links")
validation_tweets = (
    validation_tweets
    .join(tweet_is_in_top_daily_link, "tweet_id", "left_outer")
    .withColumn("is_in_top_daily_links", F.when(validation_link_flag.isNull(), 0).otherwise(validation_link_flag))
)

# Persist the per-tweet validation frame as parquet under the root directory.
validation_tweets.write.parquet(root_file_path+"validation_tweets")

# create_tweet_features.transform(validation_tweets).select("tweet_id", "tweet_features").show()

# Create tweet features for test dataset

In [None]:
%pyspark

# Build the per-tweet frame for the test split: the same columns as for training,
# with the hashtag/link flags computed from the test data only.
# `decode_tokens` is defined elsewhere; presumably it turns the token ids back into text -- confirm.

# 0/1 indicators: 0 when the source column is NULL, 1 otherwise.
test_presence_flags = [
    F.when(F.col(source_col).isNull(), 0).otherwise(1).alias(flag_name)
    for source_col, flag_name in (
        ("hashtags", "has_hashtags"),
        ("present_media", "has_media"),
        ("present_links", "has_links"),
    )
]

test_unique_rows = test_df.dropDuplicates(["tweet_id"])

test_tweets = test_unique_rows.select(
    "tweet_id",
    decode_tokens(F.col("text_tokens")).alias("text"),
    "tweet_type",
    "language",
    F.hour(F.to_timestamp("timestamp")).alias("hour_tweet"),
    F.size("text_tokens").alias("num_tokens"),
    *test_presence_flags
)

# --- Hashtag flag ---------------------------------------------------------------------
# A hashtag qualifies when (latest - earliest timestamp) / (24*3600) lies within 0.007
# of a positive whole number; a tweet is flagged when any of its hashtags qualifies.
# NOTE(review): the 24*3600 divisor suggests `timestamp` is in epoch seconds -- confirm.
test_per_hashtag = Window.partitionBy("hashtags_exploded")

test_hashtag_lifetimes = (
    test_df
    .dropDuplicates(["tweet_id"])
    .select("tweet_id", "timestamp", F.explode("hashtags").alias("hashtags_exploded"))
    .withColumn("hashtag_duration", F.max("timestamp").over(test_per_hashtag))
    .withColumn("hashtag_duration", F.col("hashtag_duration") - F.min("timestamp").over(test_per_hashtag))
    .withColumn("hashtag_duration2", F.col("hashtag_duration") / (24*3600))
    .withColumn("hashtag_duration3", F.round("hashtag_duration2"))
)

test_hashtag_positive = F.col("hashtag_duration3") > 0
test_hashtag_near_whole = F.abs(F.col("hashtag_duration3") - F.col("hashtag_duration2")) < 0.007

test_tweet_is_in_top_daily_hashtag = (
    test_hashtag_lifetimes
    .withColumn(
        "is_in_top_daily_hashtags",
        F.when(test_hashtag_positive & test_hashtag_near_whole, 1).otherwise(0),
    )
    .select("tweet_id", "is_in_top_daily_hashtags")
    .groupBy("tweet_id")
    .agg(F.max("is_in_top_daily_hashtags").alias("is_in_top_daily_hashtags"))
)

# Tweets with no match in the left join get 0 instead of NULL.
test_hashtag_flag = F.col("is_in_top_daily_hashtags")
test_tweets = (
    test_tweets
    .join(test_tweet_is_in_top_daily_hashtag, "tweet_id", "left_outer")
    .withColumn("is_in_top_daily_hashtags", F.when(test_hashtag_flag.isNull(), 0).otherwise(test_hashtag_flag))
)

# --- Link flag ------------------------------------------------------------------------
# Same rule as for hashtags, applied to the entries of present_links.
test_per_link = Window.partitionBy("present_links_exploded")

test_link_lifetimes = (
    test_df
    .dropDuplicates(["tweet_id"])
    .select("tweet_id", "timestamp", F.explode("present_links").alias("present_links_exploded"))
    .withColumn("link_duration", F.max("timestamp").over(test_per_link))
    .withColumn("link_duration", F.col("link_duration") - F.min("timestamp").over(test_per_link))
    .withColumn("link_duration2", F.col("link_duration") / (24*3600))
    .withColumn("link_duration3", F.round("link_duration2"))
)

test_link_positive = F.col("link_duration3") > 0
test_link_near_whole = F.abs(F.col("link_duration3") - F.col("link_duration2")) < 0.007

test_tweet_is_in_top_daily_link = (
    test_link_lifetimes
    .withColumn(
        "is_in_top_daily_links",
        F.when(test_link_positive & test_link_near_whole, 1).otherwise(0),
    )
    .select("tweet_id", "is_in_top_daily_links")
    .groupBy("tweet_id")
    .agg(F.max("is_in_top_daily_links").alias("is_in_top_daily_links"))
)

# Tweets with no match in the left join get 0 instead of NULL.
test_link_flag = F.col("is_in_top_daily_links")
test_tweets = (
    test_tweets
    .join(test_tweet_is_in_top_daily_link, "tweet_id", "left_outer")
    .withColumn("is_in_top_daily_links", F.when(test_link_flag.isNull(), 0).otherwise(test_link_flag))
)

# Persist the per-tweet test frame as parquet under the root directory.
test_tweets.write.parquet(root_file_path+"test_tweets")

# create_tweet_features.transform(test_tweets).select("tweet_id", "tweet_features").show()