In [1]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession


# Reuse the running SparkContext if there is one. A bare SparkContext() raises
# "Cannot run multiple SparkContexts at once" when this cell is executed again
# in the same kernel, which makes the notebook fragile to re-runs.
sc = SparkContext.getOrCreate()

# Obtain the session through the builder: it attaches to the active context and
# returns the already-running session when one exists.
spark = SparkSession.builder.getOrCreate()

# Location of the raw training data on HDFS
path = 'hdfs:///user/pknees/RSC20/training.tsv'

In [19]:
# Read the raw file as CSV with \x01 as the field delimiter.
# No schema is passed, so the columns come back as strings.
raw_reader = spark.read.format("csv").option("delimiter", "\x01")
df = raw_reader.load(path)

# No header option is set, so the column names are assigned positionally.
column_names = [
    # tweet content
    "text_tokens", "hashtags", "tweet_id", "present_media", "present_links", "present_domains",
    "tweet_type", "language", "tweet_timestamp",
    # author of the tweet (the "engaged with" user)
    "engaged_with_user_id", "engaged_with_user_follower_count",
    "engaged_with_user_following_count", "engaged_with_user_is_verified",
    "engaged_with_user_account_creation",
    # user who may interact with the tweet (the "engaging" user)
    "engaging_user_id", "engaging_user_follower_count", "engaging_user_following_count",
    "engaging_user_is_verified", "engaging_user_account_creation",
    "engaged_follows_engaging",
    # interaction timestamps (the prediction targets)
    "reply_timestamp", "retweet_timestamp", "retweet_with_comment_timestamp", "like_timestamp",
]
df = df.toDF(*column_names)

In [20]:
# Take a look at the data: fetch the first row to check the column names
# and the raw values (all strings or None at this point).
df.take(1)
#[Row(text_tokens='101\t1942\t18628\t15752\t4458\t7697\t24309\t10634\t5618\t2395\t2598\t3584\t1946\t22480\t67316\t12434\t19054\t10634\t4704\t3350\t1910\t15752\t106975\t15355\t10083\t129\t2583\t2042\t29004\t58268\t111806\t18628\t102', \ 
#hashtags=None, tweet_id='E7D6C5094767223F6F8789A87A1937AB', present_media=None, present_links=None, present_domains=None, tweet_type='TopLevel', language='22C448FF81263D4BAF2A176145EE9EAD', tweet_timestamp='1581262691', \
#engaged_with_user_id='D557B03872EF8986F7F4426AE094B2FE', engaged_with_user_follower_count='986', engaged_with_user_following_count='1201', engaged_with_user_is_verified='false', engaged_with_user_account_creation='1274269909', \
#engaging_user_id='00000776B07587ECA9717BFC301F2D6E', engaging_user_follower_count='94', engaging_user_following_count='648', engaging_user_is_verified='false', engaging_user_account_creation='1478011810', \
#engaged_follows_engaging='false', reply_timestamp=None, retweet_timestamp=None, retweet_with_comment_timestamp=None, like_timestamp=None)]

[Row(text_tokens='101\t1942\t18628\t15752\t4458\t7697\t24309\t10634\t5618\t2395\t2598\t3584\t1946\t22480\t67316\t12434\t19054\t10634\t4704\t3350\t1910\t15752\t106975\t15355\t10083\t129\t2583\t2042\t29004\t58268\t111806\t18628\t102', hashtags=None, tweet_id='E7D6C5094767223F6F8789A87A1937AB', present_media=None, present_links=None, present_domains=None, tweet_type='TopLevel', language='22C448FF81263D4BAF2A176145EE9EAD', tweet_timestamp='1581262691', engaged_with_user_id='D557B03872EF8986F7F4426AE094B2FE', engaged_with_user_follower_count='986', engaged_with_user_following_count='1201', engaged_with_user_is_verified='false', engaged_with_user_account_creation='1274269909', engaging_user_id='00000776B07587ECA9717BFC301F2D6E', engaging_user_follower_count='94', engaging_user_following_count='648', engaging_user_is_verified='false', engaging_user_account_creation='1478011810', engaged_follows_engaging='false', reply_timestamp=None, retweet_timestamp=None, retweet_with_comment_timestamp=None

In [21]:
# How many tweets?
#df.count()
# 121.386.431

**Define Column Subsets**

* id_features with large vocabulary. We will first hash them and then bucketize them into 50 buckets (the bucket index is One-Hot-Encoded later)
* numeric_features will get bucketized and One-Hot-Encoded
* categorical_features will get StringIndexed (into numeric values) and One-Hot-Encoded
* text_features contain information on Hashtags etc. - We will use HashingTF
* label_columns are the dependent variables. 

In [22]:
# High-cardinality identifiers: hashed and reduced to a bucket index later on.
id_features = [
    "tweet_id",
    "engaging_user_id",
    "engaged_with_user_id",
]

# Integer-valued columns that get cast to int and quantile-bucketized.
numeric_features = [
    "tweet_timestamp",
    "engaged_with_user_follower_count",
    "engaged_with_user_following_count",
    "engaged_with_user_account_creation",
    "engaging_user_follower_count",
    "engaging_user_following_count",
    "engaging_user_account_creation",
]

# Low-cardinality string columns that get string-indexed.
categorical_features = [
    "tweet_type",
    "language",
    "engaged_with_user_is_verified",
    "engaging_user_is_verified",
    "engaged_follows_engaging",
]

# Tab-separated list columns that get split into arrays.
text_features = [
    "text_tokens",
    "hashtags",
    "present_media",
    "present_links",
    "present_domains",
]

# Dependent variables: one interaction timestamp column per engagement type.
label_columns = [
    "reply_timestamp",
    "retweet_timestamp",
    "retweet_with_comment_timestamp",
    "like_timestamp",
]

**Preprocessing**

* Split Tab-Separated text_features into arrays (special handling for "None")
* Convert different pseudo-integer columns (integer-strings) to Integers
* Hash the ID-Values and bin them in 50 buckets (using modulo)
* Convert Text-Feature Arrays to Vectors
* Convert the interaction columns from a timestamp to an integer

In [23]:
from pyspark.sql import functions as f
# Turn every tab-separated text column into an array<string> column.
# Null entries become an empty array rather than staying null.
for feature in text_features:
    source_col = f.col(feature)
    tokens = f.split(source_col, '\t')
    empty_tokens = f.array().cast("array<string>")
    df = df.withColumn(
        feature,
        f.when(source_col.isNull(), empty_tokens).otherwise(tokens),
    )

In [24]:
from pyspark.sql.types import IntegerType
# The numeric columns were read as strings; replace each with its integer cast.
for numeric_col in numeric_features:
    as_integer = f.col(numeric_col).cast(IntegerType())
    df = df.withColumn(numeric_col, as_integer)

In [25]:
# Number of buckets the hashed ID values are folded into.
ID_HASH_BUCKETS = 50

# Hash every ID column and reduce the hash to a bucket index in [0, ID_HASH_BUCKETS).
# The remainder is taken BEFORE the absolute value: negating the raw hash first
# (hash * -1) overflows for the minimum 32-bit integer and leaves a negative
# bucket, which the one-hot encoder later rejects. For every other hash value
# abs(hash % n) gives the same bucket as (-hash) % n did.
for feature in id_features:
    output_col = feature + "_hashed"
    df = df.withColumn(output_col, f.abs(f.hash(f.col(feature)) % ID_HASH_BUCKETS))

In [26]:
#from pyspark.ml.linalg import Vectors, VectorUDT
#from pyspark.sql.functions import udf

#list_to_vector_udf = udf(lambda l: Vectors.dense(l), VectorUDT())
#for feature in text_features:
#    df = df.withColumn(feature,list_to_vector_udf(df[feature]))

In [52]:
# NOTE: run this cell only once per freshly loaded df. It replaces the raw
# timestamp columns, so a second run would see no nulls any more.

# Combined class label: 0 = no interaction, otherwise the 1-based position of the
# interaction in label_columns. The previous loop overwrote "label" on every
# iteration, so only the last column (like_timestamp) ever had an effect. Here
# each step falls back to the value built so far; a later column still takes
# precedence when several interactions are present.
label_expr = f.lit(0)
for class_id, label_col in enumerate(label_columns, start=1):
    label_expr = f.when(f.col(label_col).isNotNull(), class_id).otherwise(label_expr)
df = df.withColumn("label", label_expr)

# Convert each interaction column from a timestamp to a 0/1 integer indicator
# (1 = interaction happened). The per-target classifiers use these columns
# directly as their label columns.
for label_col in label_columns:
    df = df.withColumn(label_col, f.when(f.col(label_col).isNotNull(), 1).otherwise(0))

In [28]:
# Inspect the first row again to verify the result of the preprocessing steps.
df.take(1)

[Row(text_tokens=['101', '1942', '18628', '15752', '4458', '7697', '24309', '10634', '5618', '2395', '2598', '3584', '1946', '22480', '67316', '12434', '19054', '10634', '4704', '3350', '1910', '15752', '106975', '15355', '10083', '129', '2583', '2042', '29004', '58268', '111806', '18628', '102'], hashtags=[], tweet_id='E7D6C5094767223F6F8789A87A1937AB', present_media=[], present_links=[], present_domains=[], tweet_type='TopLevel', language='22C448FF81263D4BAF2A176145EE9EAD', tweet_timestamp=1581262691, engaged_with_user_id='D557B03872EF8986F7F4426AE094B2FE', engaged_with_user_follower_count=986, engaged_with_user_following_count=1201, engaged_with_user_is_verified='false', engaged_with_user_account_creation=1274269909, engaging_user_id='00000776B07587ECA9717BFC301F2D6E', engaging_user_follower_count=94, engaging_user_following_count=648, engaging_user_is_verified='false', engaging_user_account_creation=1478011810, engaged_follows_engaging='false', reply_timestamp=0, retweet_timestamp=

**Helper Functions**

* AllQuantileDiscretizers for bucketizing the numeric features
* AllStringIndexers for converting category strings to category ints
* *AllFeatureHashers for hashing the ID features into 50 bins*  ***deprecated*** (replaced by the hash + modulo step in the preprocessing)
* AllHashingTF for vectorizing the text features
* encoder for One-Hot-Encoding of features
* assembler_features (a VectorAssembler) for combining all the oneHot features into a single feature vector
* AllRFModels to build 4 different RF Models for interaction prediction 

Create a Pipeline for all Stages at the end

In [29]:
# Set the numbers of quantiles/buckets for the baseline approach
nq = 50

from pyspark.ml.feature import QuantileDiscretizer, StringIndexer, FeatureHasher, HashingTF, OneHotEncoderEstimator, VectorAssembler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

AllQuantileDiscretizers = [QuantileDiscretizer(numBuckets=nq,
                                      inputCol=col,
                                      outputCol=(col + "_bucketized"),
                                      handleInvalid="keep") for col in numeric_features]

AllStringIndexers = [StringIndexer(inputCol=col, 
                            outputCol=(col + "_indexed")) for col in categorical_features]

### FeatureHasher has been adapted to a hardcoded feature hashing + bucketing in the preprocessing step
#AllFeatureHashers = [FeatureHasher(numFeatures=nq,
#                           inputCols=[col],
#                           outputCol=(col + "_hashed")) for col in id_features]


AllHashingTF = [HashingTF(inputCol=col, 
                          outputCol=(col + "_vectorized")) for col in text_features]

to_onehot_features = [col + "_bucketized" for col in numeric_features]
to_onehot_features.extend(col + "_indexed" for col in categorical_features)
to_onehot_features.extend(col + "_hashed" for col in id_features)

onehot_features = [col + "_oneHot" for col in numeric_features]
onehot_features.extend(col + "_oneHot" for col in categorical_features)
onehot_features.extend(col + "_oneHot" for col in id_features)

encoder = OneHotEncoderEstimator(inputCols=to_onehot_features,
                                 outputCols=onehot_features)

assembler_features = VectorAssembler(
    inputCols=onehot_features,
    outputCol="features_oneHot")

#assembler_labels = VectorAssembler(
#    inputCols=label_columns,
#    outputCol="label")




AllRFModels = [RandomForestClassifier(labelCol=col, featuresCol="features_oneHot",predictionCol=(col+"_prediction"), numTrees=10) for col in label_columns]

In [64]:
from pyspark.ml import Pipeline

# Pipeline stages in execution order: bucketize numeric features, index the
# categorical ones, vectorize the text features, one-hot encode, assemble.
AllStages = [
    *AllQuantileDiscretizers,
    *AllStringIndexers,
    *AllHashingTF,
    encoder,
    assembler_features,
]
# Not included: the deprecated FeatureHasher and label-assembler stages, and
# AllRFModels (the random forests are not part of the pipeline for now).

**Quick Note for Pipeline**

For now the Random Forest Models are not included in the Pipeline. However, they are already initialized - meaning that one can just "uncomment" them and include them in the pipeline. Then we should be able to fit the pipeline on the data - save the resulting model and transform the test set.

At this point we still have to read in the test set and apply our preprocessing (except for the label_columns) to the test set. But in the end this should just be a copy+paste job

In [66]:
# Wrap the feature-engineering stages into a single estimator.
pipeline = Pipeline().setStages(AllStages)

**Testing**

For this part we generate a train and a test set and fit our pipeline.

Additionally we build + predict from a single RF model.

In [67]:
# Split weights: 0.5% train, 0.5% test, and the remaining 99% in `rest`.
# The seed is fixed.
split_weights = [0.005, 0.005, 0.99]
train, test, rest = df.randomSplit(split_weights, seed=42)

In [68]:
# Fit all feature-engineering stages on the training split only.
pipeline_model = pipeline.fit(train)

In [77]:
# Apply the fitted stages to the training split; adds the "features_oneHot" column.
new_train = pipeline_model.transform(train)

In [78]:
# `rf` was never defined in this notebook, so this cell only worked with
# leftover kernel state. Use the pre-configured random forest for the "like"
# interaction; its prediction column is "like_timestamp_prediction".
rf = AllRFModels[label_columns.index("like_timestamp")]
rfModel = rf.fit(new_train)

In [79]:
# Transform the test split with the stages fitted on the training split.
new_test = pipeline_model.transform(test)

In [80]:
# Score the transformed test split with the fitted random forest model.
test_pred = rfModel.transform(new_test)

In [84]:
# Show true label and prediction side by side for the first ten test rows.
label_and_prediction = test_pred.select("like_timestamp", "like_timestamp_prediction")
label_and_prediction.take(10)

[Row(like_timestamp=0, llike_timestamp_prediction=0.0),
 Row(like_timestamp=0, llike_timestamp_prediction=0.0),
 Row(like_timestamp=0, llike_timestamp_prediction=0.0),
 Row(like_timestamp=1, llike_timestamp_prediction=1.0),
 Row(like_timestamp=0, llike_timestamp_prediction=0.0),
 Row(like_timestamp=0, llike_timestamp_prediction=0.0),
 Row(like_timestamp=0, llike_timestamp_prediction=0.0),
 Row(like_timestamp=0, llike_timestamp_prediction=0.0),
 Row(like_timestamp=1, llike_timestamp_prediction=0.0),
 Row(like_timestamp=0, llike_timestamp_prediction=0.0)]

In [87]:
# Accuracy of the "like" predictions on the test split.
# The prediction column was misspelled as "llike_timestamp_prediction". It has to
# match the "<label>_prediction" name produced by the models in AllRFModels and
# selected in the cell above.
evaluator = MulticlassClassificationEvaluator(
    labelCol="like_timestamp", predictionCol="like_timestamp_prediction", metricName="accuracy")
accuracy = evaluator.evaluate(test_pred)
print("Test Error = %g" % (1.0 - accuracy))

Test Error = 0.428965


In [85]:
#from pyspark.ml.classification import MultilayerPerceptronClassifier
#from pyspark.ml.evaluation import MulticlassClassificationEvaluator

#layers = [4, 5, 4, 4]
#trainer = MultilayerPerceptronClassifier(featuresCol="features_oneHot",labelCol="label",maxIter=100, layers=layers, blockSize=128, seed=1234)

# train the model
#model = trainer.fit(new_train)

In [None]:
# Stop the Spark session once the analysis is finished.
spark.stop()