In [1]:
import pyspark
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql import functions as F
from pyspark.sql import types as T
from pyspark.sql import Row
from pyspark.sql.window import Window


conf = pyspark.SparkConf()
conf.setMaster("local").setAppName("test").set("spark.local.dir", "tmp/spark")
SparkContext.setSystemProperty('spark.driver.memory','16g')
SparkContext.setSystemProperty('spark.executor.memory','16g')
sc = pyspark.SparkContext(conf=conf)
#SparkConf conf = new SparkConf().setMaster("local”).setAppName("test”).set("spark.local.dir", "/tmp/spark-temp");

#sc = SparkContext("local", "App Name")
spark = SQLContext(sc)

In [2]:
from transformers import BertTokenizer
tokenizer = BertTokenizer.from_pretrained('bert-base-multilingual-cased', do_lower_case=False)

@F.udf("String")
def decode_tokens(tokens):
  return tokenizer.decode(tokens)

In [None]:
read_training_parquet = "parquet_files/training_df.parquet"
training_df = spark.read.parquet(read_training_parquet)

In [None]:
training_dataset = training_df\
.withColumn("reply", F.when(F.col("reply_engagement_timestamp").isNull(), 0).otherwise(1))\
.withColumn("retweet", F.when(F.col("retweet_engagement_timestamp").isNull(), 0).otherwise(1))\
.withColumn("retweet_with_comment", F.when(F.col("retweet_with_comment_engagement_timestamp").isNull(), 0).otherwise(1))\
.withColumn("like", F.when(F.col("like_engagement_timestamp").isNull(), 0).otherwise(1))\
.select("tweet_id", "engager_user_id", "engagee_user_id", "reply", "retweet", "retweet_with_comment", "like")

In [None]:
#unique tweets

training_tweets = training_df.dropDuplicates(["tweet_id"])\
.select("tweet_id",
        decode_tokens(F.col("text_tokens")).alias("text"),
        "tweet_type",
        "language",
        F.hour(F.to_timestamp("timestamp")).alias("hour_tweet"),
        F.size("text_tokens").alias("num_tokens"),
        F.when(F.col("hashtags").isNull(), 0).otherwise(1).alias("has_hashtags"),
        F.when(F.col("present_media").isNull(), 0).otherwise(1).alias("has_media"), 
        F.when(F.col("present_links").isNull(), 0).otherwise(1).alias("has_links"))

In [None]:
tweet_in_top_hashtag_daily = training_df\
.dropDuplicates(["tweet_id"])\
.select("tweet_id", "timestamp", F.explode("hashtags").alias("hashtags_exploded"))\
.withColumn("hashtag_duration", F.max("timestamp").over(Window.partitionBy("hashtags_exploded")))\
.withColumn("hashtag_duration", F.col("hashtag_duration") - F.min("timestamp").over(Window.partitionBy("hashtags_exploded")))\
.withColumn("hashtag_duration2", F.col("hashtag_duration") / (24*3600))\
.withColumn("hashtag_duration3", F.round("hashtag_duration2"))\
.withColumn("is_in_top_daily_hashtags", F.when( ((F.col("hashtag_duration3") > 0) & (F.abs(F.col("hashtag_duration3") - F.col("hashtag_duration2")) < 0.007)), 1).otherwise(0))\
.select("tweet_id", "is_in_top_daily_hashtags")\
.groupBy("tweet_id").agg(F.max("is_in_top_daily_hashtags").alias("is_in_top_daily_hashtags"))

In [None]:
#concatenate matrices
training_tweets = training_tweets.join(tweet_in_top_hashtag_daily, "tweet_id", "left_outer")\
.withColumn("is_in_top_daily_hashtags", F.when(F.col("is_in_top_daily_hashtags").isNull(), 0).otherwise(F.col("is_in_top_daily_hashtags")))

In [None]:
tweet_in_top_link_daily =  training_df\
.dropDuplicates(["tweet_id"])\
.select("tweet_id", "timestamp", F.explode("present_links").alias("present_links_exploded"))\
.withColumn("link_duration", F.max("timestamp").over(Window.partitionBy("present_links_exploded")))\
.withColumn("link_duration", F.col("link_duration") - F.min("timestamp").over(Window.partitionBy("present_links_exploded")))\
.withColumn("link_duration2", F.col("link_duration") / (24*3600))\
.withColumn("link_duration3", F.round("link_duration2"))\
.withColumn("is_in_top_daily_links", F.when( ((F.col("link_duration3") > 0) & (F.abs(F.col("link_duration3") - F.col("link_duration2")) < 0.007)), 1).otherwise(0))\
.select("tweet_id", "is_in_top_daily_links")\
.groupBy("tweet_id").agg(F.max("is_in_top_daily_links").alias("is_in_top_daily_links"))

In [None]:
#concatenate matrices
training_tweets = training_tweets.join(tweet_in_top_link_daily, "tweet_id", "left_outer")\
.withColumn("is_in_top_daily_links", F.when(F.col("is_in_top_daily_links").isNull(), 0).otherwise(F.col("is_in_top_daily_links")))

In [None]:
from pyspark.ml import Pipeline, PipelineModel
from pyspark.ml.feature import VectorAssembler

#load feature models
create_tweet_features_model = "/media/user2/TOSHIBA EXT/code/models/create_tweets_features.model"
create_tweet_features = PipelineModel.load(create_tweet_features_model)
engager_user_features_model = "/media/user2/TOSHIBA EXT/code/models/create_engager_user_features.model"
engager_user_features = PipelineModel.load(engager_user_features_model)
engagee_user_features_model = "/media/user2/TOSHIBA EXT/code/models/create_engagee_user_features.model"
engagee_user_features = PipelineModel.load(engagee_user_features_model)


#load parquets with features
#training_tweets_parquet = "/media/ioannis/B42E28422E27FBCC/Twitter_data_30-4-21/parquet_files/training_tweets.parquet"
tweet_features = training_tweets
#transform two matrices (model, tweet features)
tweet_features = create_tweet_features.transform(tweet_features).select("tweet_id", "tweet_features")

training_engager_user_df_parquet = ("/media/user2/TOSHIBA EXT/code/parquet_files/training_engager_user_df.parquet")
engager_users_features = spark.read.parquet(training_engager_user_df_parquet)
#transform two matrices(model, engager_users_features)
engager_users_features = engager_user_features.transform(engager_users_features).select("engager_user_id", "engager_features")

training_engagee_user_df_parquet = ("/media/user2/TOSHIBA EXT/code/parquet_files/training_engagee_user_df.parquet")
engagee_users_features = spark.read.parquet(training_engagee_user_df_parquet)
#transform two matrices(model, engagee_users_features)
engagee_users_features = engagee_user_features.transform(engagee_users_features).select("engagee_user_id", "engagee_features")

#concatenate matrices into training _dataset
training_dataset = training_dataset.join(tweet_features, "tweet_id")
training_dataset = training_dataset.join(engager_users_features, "engager_user_id")
training_dataset = training_dataset.join(engagee_users_features, "engagee_user_id")

In [None]:
vectorAssemblerInputs = ["tweet_features", "engager_features", "engagee_features"]
vectorAssembler = VectorAssembler(inputCols=vectorAssemblerInputs, outputCol="featuresAssembled")
training_dataset = vectorAssembler.transform(training_dataset).drop(*vectorAssemblerInputs)

In [None]:
training_dataset.show()

In [None]:
training_dataset_parquet = "/media/user2/TOSHIBA EXT/code/parquet_files/training_dataset.parquet"
training_dataset.repartition(1).write.mode('overwrite').parquet(training_dataset_parquet)

In [None]:
training_dataset = spark.read.parquet("/media/user2/TOSHIBA EXT/code/parquet_files/training_dataset.parquet") 

In [None]:
training_dataset = training_dataset.persist(pyspark.StorageLevel.MEMORY_AND_DISK) 

In [None]:
training_dataset.persist().is_cached

In [None]:
#gbt model for like
from pyspark.ml.classification import GBTClassifier
gbt_like_model = GBTClassifier(labelCol="like", featuresCol="featuresAssembled").fit(training_dataset)


In [None]:
gbt_like_model.save("/media/user2/TOSHIBA EXT/code/models/gbt_like.model")

In [None]:
#gbt model for reply
from pyspark.ml.classification import GBTClassifier
gbt_reply_model = GBTClassifier(labelCol="reply", featuresCol="featuresAssembled").fit(training_dataset)
gbt_reply_model.save("/media/user2/TOSHIBA EXT/code/models/gbt_reply")

In [None]:
gbt_reply_model.save("/media/user2/TOSHIBA EXT/code/models/gbt_reply.model")

In [None]:
#gbt model for retweet
from pyspark.ml.classification import GBTClassifier

gbt_retweet_model = GBTClassifier(labelCol="retweet", featuresCol="featuresAssembled").fit(training_dataset)
gbt_retweet_model.save("/media/user2/TOSHIBA EXT/code/models/gbt_retweet.model")

In [None]:
#gbt model for retweet with comment
from pyspark.ml.classification import GBTClassifier

gbt_rtWithCmt_model = GBTClassifier(labelCol="retweet_with_comment", featuresCol="featuresAssembled").fit(training_dataset)
gbt_rtWithCmt_model.save("/media/user2/TOSHIBA EXT/code/models/gbt_rtWithCmt.model")

In [3]:
#read validation_dataset
read_validation_parquet = "parquet_files/validation_df.parquet"
validation_df =spark.read.parquet(read_validation_parquet)

In [4]:
validation_tweets = validation_df.dropDuplicates(["tweet_id"])\
.select("tweet_id",
        decode_tokens(F.col("text_tokens")).alias("text"),
        "tweet_type",
        "language",
        F.hour(F.to_timestamp("timestamp")).alias("hour_tweet"),
        F.size("text_tokens").alias("num_tokens"),
        F.when(F.col("hashtags").isNull(), 0).otherwise(1).alias("has_hashtags"),
        F.when(F.col("present_media").isNull(), 0).otherwise(1).alias("has_media"), 
        F.when(F.col("present_links").isNull(), 0).otherwise(1).alias("has_links"))
        
tweet_is_in_top_daily_hashtag = validation_df\
.dropDuplicates(["tweet_id"])\
.select("tweet_id", "timestamp", F.explode("hashtags").alias("hashtags_exploded"))\
.withColumn("hashtag_duration", F.max("timestamp").over(Window.partitionBy("hashtags_exploded")))\
.withColumn("hashtag_duration", F.col("hashtag_duration") - F.min("timestamp").over(Window.partitionBy("hashtags_exploded")))\
.withColumn("hashtag_duration2", F.col("hashtag_duration") / (24*3600))\
.withColumn("hashtag_duration3", F.round("hashtag_duration2"))\
.withColumn("is_in_top_daily_hashtags", F.when( ((F.col("hashtag_duration3") > 0) & (F.abs(F.col("hashtag_duration3") - F.col("hashtag_duration2")) < 0.007)), 1).otherwise(0))\
.select("tweet_id", "is_in_top_daily_hashtags")\
.groupBy("tweet_id").agg(F.max("is_in_top_daily_hashtags").alias("is_in_top_daily_hashtags"))

validation_tweets = validation_tweets.join(tweet_is_in_top_daily_hashtag, "tweet_id", "left_outer")\
.withColumn("is_in_top_daily_hashtags", F.when(F.col("is_in_top_daily_hashtags").isNull(), 0).otherwise(F.col("is_in_top_daily_hashtags")))

tweet_is_in_top_daily_link = validation_df\
.dropDuplicates(["tweet_id"])\
.select("tweet_id", "timestamp", F.explode("present_links").alias("present_links_exploded"))\
.withColumn("link_duration", F.max("timestamp").over(Window.partitionBy("present_links_exploded")))\
.withColumn("link_duration", F.col("link_duration") - F.min("timestamp").over(Window.partitionBy("present_links_exploded")))\
.withColumn("link_duration2", F.col("link_duration") / (24*3600))\
.withColumn("link_duration3", F.round("link_duration2"))\
.withColumn("is_in_top_daily_links", F.when( ((F.col("link_duration3") > 0) & (F.abs(F.col("link_duration3") - F.col("link_duration2")) < 0.007)), 1).otherwise(0))\
.select("tweet_id", "is_in_top_daily_links")\
.groupBy("tweet_id").agg(F.max("is_in_top_daily_links").alias("is_in_top_daily_links"))

validation_tweets = validation_tweets.join(tweet_is_in_top_daily_link, "tweet_id", "left_outer")\
.withColumn("is_in_top_daily_links", F.when(F.col("is_in_top_daily_links").isNull(), 0).otherwise(F.col("is_in_top_daily_links")))

#validation_tweets.write.parquet("/media/user2/TOSHIBA EXT/code/parquet_files/validation_tweets.df")

In [5]:
validation_data = validation_df\
.select("tweet_id", "engager_user_id", "engagee_user_id")

from pyspark.ml import Pipeline, PipelineModel
from pyspark.ml.feature import VectorAssembler

create_tweet_features = PipelineModel.load("/media/user2/TOSHIBA EXT/code/models/create_tweets_features.model")
create_engager_user_features = PipelineModel.load("/media/user2/TOSHIBA EXT/code/models/create_engager_user_features.model")
create_engagee_user_features = PipelineModel.load("/media/user2/TOSHIBA EXT/code/models/create_engagee_user_features.model")

validation_features = validation_tweets
validation_features = create_tweet_features.transform(validation_features).select("tweet_id", "tweet_features")

engager_features = spark.read.parquet("/media/user2/TOSHIBA EXT/code/parquet_files/validation_engager_user_df.parquet")
engager_features = create_engager_user_features.transform(engager_features).select("engager_user_id", "engager_features")

engagee_features = spark.read.parquet("/media/user2/TOSHIBA EXT/code/parquet_files/validation_engagee_user_df.parquet")
engagee_features = create_engagee_user_features.transform(engagee_features).select("engagee_user_id", "engagee_features")

validation_data = validation_data.join(validation_features, "tweet_id")
validation_data = validation_data.join(engager_features, "engager_user_id")
validation_data = validation_data.join(engagee_features, "engagee_user_id")

assemblerInputs = ["tweet_features", "engager_features", "engagee_features"]
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="featuresAssembled")

validation_data = assembler.transform(validation_data).drop(*assemblerInputs)
validation_data.write.parquet("/media/user2/2.0 TB Hard Disk/parquet_files/validation_dataset.parquet")

In [7]:
#from pyspark.ml.classification import GBTClassifierModel


validation_dataset = spark.read.parquet("/media/user2/2.0 TB Hard Disk/parquet_files/validation_dataset.parquet")


In [8]:
from pyspark.ml.classification import GBTClassificationModel
gbt_like_model =  GBTClassificationModel.load("/media/user2/TOSHIBA EXT/code/models/gbt_like.model")

In [9]:
print("Making predictions for like...")
predictions = gbt_like_model.transform(validation_dataset)

print("Predictions is being written in csv file...")
split1_udf = F.udf(lambda value: value[1].item(), T.DoubleType())
predictions.select("tweet_id", "engagee_user_id", split1_udf("probability").alias("probability"))\
.coalesce(1).write.csv("/media/user2/TOSHIBA EXT/code/validation_predictions/like.csv")

Making predictions for like...
Predictions is being written in csv file...


In [10]:
from pyspark.ml.classification import GBTClassificationModel
gbt_reply_model =  GBTClassificationModel.load("/media/user2/TOSHIBA EXT/code/models/gbt_reply.model")
print("Making predictions for reply...")
predictions = gbt_reply_model.transform(validation_dataset)

print("Predictions is being written in csv file...")
split1_udf = F.udf(lambda value: value[1].item(), T.DoubleType())
predictions.select("tweet_id", "engagee_user_id", split1_udf("probability").alias("probability"))\
.coalesce(1).write.csv("/media/user2/TOSHIBA EXT/code/validation_predictions/reply.csv")

Making predictions for reply...
Predictions is being written in csv file...


In [11]:
from pyspark.ml.classification import GBTClassificationModel
gbt_retweet_model =  GBTClassificationModel.load("/media/user2/TOSHIBA EXT/code/models/gbt_retweet.model")
print("Making predictions for retweet...")
predictions = gbt_retweet_model.transform(validation_dataset)

print("Predictions is being written in csv file...")
split1_udf = F.udf(lambda value: value[1].item(), T.DoubleType())
predictions.select("tweet_id", "engagee_user_id", split1_udf("probability").alias("probability"))\
.coalesce(1).write.csv("/media/user2/TOSHIBA EXT/code/validation_predictions/retweet.csv")

Making predictions for retweet...
Predictions is being written in csv file...


In [12]:
from pyspark.ml.classification import GBTClassificationModel
gbt_rtWithCmt_model =  GBTClassificationModel.load("/media/user2/TOSHIBA EXT/code/models/gbt_rtWithCmt.model")
print("Making predictions for retweet with comment...")
predictions = gbt_rtWithCmt_model.transform(validation_dataset)

print("Predictions is being written in csv file...")
split1_udf = F.udf(lambda value: value[1].item(), T.DoubleType())
predictions.select("tweet_id", "engagee_user_id", split1_udf("probability").alias("probability"))\
.coalesce(1).write.csv("/media/user2/TOSHIBA EXT/code/validation_predictions/rtWithCmt.csv")

Making predictions for retweet with comment...
Predictions is being written in csv file...


In [3]:
predictions_like = spark.read.csv("/media/user2/TOSHIBA EXT/code/validation_predictions/like.csv")
predictions_like.show(100)


+--------------------+--------------------+-------------------+
|                 _c0|                 _c1|                _c2|
+--------------------+--------------------+-------------------+
|DDD2464CD01C7A920...|00018FD99A36EE543...| 0.3827995948768992|
|DDD2464CD01C7A920...|00018FD99A36EE543...| 0.3827995948768992|
|9FC3A951A0EA1C496...|000A4CCF1A34D7BE6...|0.21001613010480236|
|9FC3A951A0EA1C496...|000A4CCF1A34D7BE6...|0.21001613010480236|
|BB3CB2CEBE9D54A65...|000A4CCF1A34D7BE6...|0.22443539369484844|
|BB3CB2CEBE9D54A65...|000A4CCF1A34D7BE6...|0.22443539369484844|
|C3A0C323D0B2D7B06...|000B74EDD578FCCB2...|0.26801387822070866|
|C3A0C323D0B2D7B06...|000B74EDD578FCCB2...|0.26801387822070866|
|D2C43ABD7EC9E5256...|0012198A96E630C52...| 0.2282413304988088|
|D2C43ABD7EC9E5256...|0012198A96E630C52...| 0.2282413304988088|
|F39D7453AC49FDD32...|00168C70351B68498...| 0.4008637941622546|
|F39D7453AC49FDD32...|00168C70351B68498...| 0.4008637941622546|
|4795020CC0FF21157...|0016D891C54EDA443.

In [4]:
predictions_reply = spark.read.csv("/media/user2/TOSHIBA EXT/code/validation_predictions/reply.csv")
predictions_reply.show()


+--------------------+--------------------+--------------------+
|                 _c0|                 _c1|                 _c2|
+--------------------+--------------------+--------------------+
|DDD2464CD01C7A920...|00018FD99A36EE543...|0.061762683267492235|
|DDD2464CD01C7A920...|00018FD99A36EE543...|0.061762683267492235|
|9FC3A951A0EA1C496...|000A4CCF1A34D7BE6...| 0.04800772672496356|
|9FC3A951A0EA1C496...|000A4CCF1A34D7BE6...| 0.04800772672496356|
|BB3CB2CEBE9D54A65...|000A4CCF1A34D7BE6...|0.047764899270697714|
|BB3CB2CEBE9D54A65...|000A4CCF1A34D7BE6...|0.047764899270697714|
|C3A0C323D0B2D7B06...|000B74EDD578FCCB2...| 0.05469762505354692|
|C3A0C323D0B2D7B06...|000B74EDD578FCCB2...| 0.05469762505354692|
|D2C43ABD7EC9E5256...|0012198A96E630C52...| 0.04623972807742993|
|D2C43ABD7EC9E5256...|0012198A96E630C52...| 0.04623972807742993|
|F39D7453AC49FDD32...|00168C70351B68498...| 0.06933548535280509|
|F39D7453AC49FDD32...|00168C70351B68498...| 0.06933548535280509|
|4795020CC0FF21157...|001

In [5]:
predictions_retweet = spark.read.csv("/media/user2/TOSHIBA EXT/code/validation_predictions/retweet.csv")
predictions_retweet.show(100)

+--------------------+--------------------+-------------------+
|                 _c0|                 _c1|                _c2|
+--------------------+--------------------+-------------------+
|DDD2464CD01C7A920...|00018FD99A36EE543...| 0.1409141450816136|
|DDD2464CD01C7A920...|00018FD99A36EE543...| 0.1409141450816136|
|9FC3A951A0EA1C496...|000A4CCF1A34D7BE6...|0.10345572780793333|
|9FC3A951A0EA1C496...|000A4CCF1A34D7BE6...|0.10345572780793333|
|BB3CB2CEBE9D54A65...|000A4CCF1A34D7BE6...|0.11293120761628939|
|BB3CB2CEBE9D54A65...|000A4CCF1A34D7BE6...|0.11293120761628939|
|C3A0C323D0B2D7B06...|000B74EDD578FCCB2...|0.08106010542366071|
|C3A0C323D0B2D7B06...|000B74EDD578FCCB2...|0.08106010542366071|
|D2C43ABD7EC9E5256...|0012198A96E630C52...|  0.126402163895871|
|D2C43ABD7EC9E5256...|0012198A96E630C52...|  0.126402163895871|
|F39D7453AC49FDD32...|00168C70351B68498...|0.07723538080572434|
|F39D7453AC49FDD32...|00168C70351B68498...|0.07723538080572434|
|4795020CC0FF21157...|0016D891C54EDA443.

In [6]:
predictions_rtWithCmt = spark.read.csv("/media/user2/TOSHIBA EXT/code/validation_predictions/rtWithCmt.csv")
predictions_rtWithCmt.show(100)

+--------------------+--------------------+--------------------+
|                 _c0|                 _c1|                 _c2|
+--------------------+--------------------+--------------------+
|DDD2464CD01C7A920...|00018FD99A36EE543...|0.047718730797565945|
|DDD2464CD01C7A920...|00018FD99A36EE543...|0.047718730797565945|
|9FC3A951A0EA1C496...|000A4CCF1A34D7BE6...|0.046472596624944185|
|9FC3A951A0EA1C496...|000A4CCF1A34D7BE6...|0.046472596624944185|
|BB3CB2CEBE9D54A65...|000A4CCF1A34D7BE6...| 0.04680292661198848|
|BB3CB2CEBE9D54A65...|000A4CCF1A34D7BE6...| 0.04680292661198848|
|C3A0C323D0B2D7B06...|000B74EDD578FCCB2...| 0.04799124283271228|
|C3A0C323D0B2D7B06...|000B74EDD578FCCB2...| 0.04799124283271228|
|D2C43ABD7EC9E5256...|0012198A96E630C52...|0.046048264066535394|
|D2C43ABD7EC9E5256...|0012198A96E630C52...|0.046048264066535394|
|F39D7453AC49FDD32...|00168C70351B68498...|0.046848179510185384|
|F39D7453AC49FDD32...|00168C70351B68498...|0.046848179510185384|
|4795020CC0FF21157...|001

In [8]:
predictions_reply.join(predictions_retweet,(predictions_reply._c0==predictions_retweet._c0,predictions_reply._c1==predictions_retweet._c1),"inner") \
.show(truncate=False)

AssertionError: on should be Column or list of Column