In [2]:
# findspark.init() runs before any sparknlp/pyspark import below; presumably
# this is what makes the local Spark installation importable - keep the order.
import findspark
findspark.init()

import sparknlp
# Start (or attach to) the Spark NLP session with GPU support requested and
# memory="12G". NOTE(review): per the Spark NLP docs `memory` is the driver
# memory - confirm for the installed version.
spark = sparknlp.start(gpu=True, memory="12G")

# The wildcard imports presumably provide the annotators used unqualified in
# later cells (DocumentAssembler, Tokenizer, LongformerEmbeddings,
# SentenceEmbeddings, EmbeddingsFinisher).
from sparknlp.base import *
from sparknlp.annotator import *
from pyspark.ml import Pipeline, Transformer
# NOTE(review): rand, lit and VectorSizeHint are not referenced by any active
# code in the visible cells (VectorSizeHint only in a commented-out line).
from pyspark.sql.functions import rand, element_at, concat_ws, col, lit
from pyspark.ml.feature import VectorAssembler, VectorSizeHint
from pyspark.ml.classification import MultilayerPerceptronClassifier
from pyspark.ml.util import DefaultParamsReadable, DefaultParamsWritable

# Record the library versions next to the results for reproducibility.
print("Spark NLP version", sparknlp.version())
print("Apache Spark version:", spark.version)

# Last expression of the cell: displays the session object.
spark

Spark NLP version 3.3.1
Apache Spark version: 3.2.0


In [100]:
# Load the CSV, build the model input text, fill NULLs per column type and
# split into train/validation sets.
fn = "data/politifact_with_users.csv"

# Rows without tweet text are dropped immediately after reading.
dataset = spark.read.csv(fn, inferSchema=True, header=True, multiLine=True, escape = "\"").filter("text is not NULL")
# Prefix the text with the user's location, separated by ' | '.
dataset = dataset.withColumn('text', concat_ws(' | ', col('location'), col('text')))

# Group column names by their inferred Spark SQL type so each group can be
# NULL-filled with a value of the matching type.
# FIX: the integer filter used to test startswith('iny'), which matches no
# Spark type name, so int_col was always empty and integer NULLs were never
# filled. Spark reports integer columns as tinyint/smallint/int/bigint.
_integer_types = ('tinyint', 'smallint', 'int', 'bigint')
string_col = [name for name, dtype in dataset.dtypes if dtype.startswith('string')]
int_col = [name for name, dtype in dataset.dtypes if dtype in _integer_types]
double_col = [name for name, dtype in dataset.dtypes if dtype.startswith('double')]
bool_col = [name for name, dtype in dataset.dtypes if dtype.startswith('boolean')]

# NOTE(review): int_col covers every integer column, which includes the
# 'fake' label and 'news_id' if they contain NULLs - confirm that filling
# those with 0 is acceptable.
dataset = (
    dataset
    .fillna('', subset=string_col)
    .fillna(0, subset=int_col)
    .fillna(0.0, subset=double_col)
    .fillna(False, subset=bool_col)
)

# 70/30 split with a fixed seed, then repartition each side by news_id.
train_dataset, valid_dataset = dataset.randomSplit([0.7, 0.3], seed=42)
train_dataset = train_dataset.repartition(32, "news_id")
valid_dataset = valid_dataset.repartition(16, "news_id")
print("Train Dataset Count: " + str(train_dataset.count()))
print("Valid Dataset Count: " + str(valid_dataset.count()))

Train Dataset Count: 295652
Valid Dataset Count: 126949


In [86]:
# Quick look at the first five assembled 'text' values (location | tweet text).
dataset.select('text').show(n=5)

+--------------------+
|                text|
+--------------------+
|'usa: The Long Ru...|
|'d: The Long Run ...|
|in my room | 'Fro...|
|'New blog post: R...|
|'d: The Long Run ...|
+--------------------+
only showing top 5 rows



In [5]:
# Longformer token-embedding annotator loaded from a local model directory.
# It reads the "document" and "token" annotations and writes "embeddings".
embeddings = (
    LongformerEmbeddings.load("models/longformer_base_4096")
    .setBatchSize(128)
    .setInputCols(["document", "token"])
    .setOutputCol("embeddings")
    .setCaseSensitive(True)
    .setMaxSentenceLength(4096)
)

In [104]:
class DropDim(Transformer, DefaultParamsReadable, DefaultParamsWritable):
    """Pipeline stage that replaces 'emb_feature' with its first element.

    The stage overwrites the 'emb_feature' column with
    ``element_at(emb_feature, 1)`` (element_at is 1-based), i.e. it unwraps
    the outer array. Presumably the upstream finisher emits an array holding
    a single vector per row - confirm against the pipeline.
    """

    def __init__(self):
        super().__init__()

    def _transform(self, df):
        """Return ``df`` with 'emb_feature' reduced to its first element."""
        first_entry = element_at(df['emb_feature'], 1)
        return df.withColumn('emb_feature', first_entry)

# Wraps the raw "text" column into a "document" annotation, tagged with news_id.
document_assembler = (
    DocumentAssembler()
    .setInputCol("text")
    .setOutputCol("document")
    .setIdCol("news_id")
)

# Splits each document into "token" annotations.
tokenizer = (
    Tokenizer()
    .setInputCols(["document"])
    .setOutputCol("token")
)

# Pools the "embeddings" annotations into one document-level embedding by averaging.
text_emb = (
    SentenceEmbeddings()
    .setInputCols(["document", "embeddings"])
    .setOutputCol("text_embeddings")
    .setPoolingStrategy("AVERAGE")
)

# Converts the pooled annotation into Spark ML vectors in "emb_feature".
emb_feature = (
    EmbeddingsFinisher()
    .setInputCols("text_embeddings")
    .setOutputCols("emb_feature")
    .setOutputAsVector(True)
)

# emb_size = VectorSizeHint(inputCol="emb_feature", size=768, handleInvalid="error")

# Width of the text embedding vector: the classifier's original input size
# (779) minus the 11 metadata columns, matching the size in the disabled
# VectorSizeHint above.
EMBEDDING_DIM = 768

# Tweet/user metadata columns appended to the embedding in the feature vector.
META_FEATURE_COLS = [
    'favorite_count', 'retweet_count',
    'verified', 'followers_count', 'friends_count', 'listed_count', 'favourites_count',
    'geo_enabled', 'statuses_count', 'has_extended_profile', 'default_profile',
]

# Concatenates the embedding and the metadata columns into "features".
assembler = VectorAssembler(
    inputCols=['emb_feature'] + META_FEATURE_COLS,
    outputCol="features",
#     handleInvalid="keep"
)

# The first layer size must equal the length of the assembled feature vector,
# so it is derived from the inputs instead of being hard-coded as 779; adding
# or removing a metadata column can no longer silently desync the two.
clf = MultilayerPerceptronClassifier(
    labelCol='fake',
    maxIter=100,
    layers=[EMBEDDING_DIM + len(META_FEATURE_COLS), 128, 2],
    seed=42,
)

# End-to-end pipeline: text -> tokens -> Longformer embeddings -> pooled
# vector -> assembled feature vector -> MLP classifier.
clf_pipeline = Pipeline().setStages([
    document_assembler,
    tokenizer,
    embeddings,
    text_emb,
    emb_feature,
    DropDim(),
    # emb_size,  (VectorSizeHint stage, currently disabled)
    assembler,
    clf,
])

In [105]:
# Alternative (disabled): load a previously saved pipeline model instead of
# refitting.
# from pyspark.ml import PipelineModel

# model = PipelineModel.load("models/politifact-clf-1101-1734/")

# Fit the whole pipeline on the training split, then apply the fitted model
# to the validation split; `results` is consumed by the cells below.
model = clf_pipeline.fit(train_dataset)
results = model.transform(valid_dataset)

In [60]:
# Inspect the assembled feature vector of the first validation row.
results.select('features').first()

Row(features=DenseVector([0.0971, -0.0824, 0.0458, 0.1666, -0.1086, 0.1173, 0.0111, 0.0294, 0.0519, 0.0134, -0.1667, 0.0747, 0.0231, -0.042, -0.052, -0.0832, 0.247, 0.0035, -0.0097, 0.2216, 0.0435, 0.0892, -0.0383, 0.0868, -0.0244, -0.053, -0.0034, 0.051, -0.0358, -0.0784, -0.061, 0.009, 0.0259, -0.0233, 0.0208, 0.0032, 0.1706, 0.0483, 0.2088, 0.1279, 0.1802, -0.0608, 0.0577, -0.0177, 0.002, 0.0228, -0.0811, 0.208, -0.0567, 0.0166, -0.0024, -0.063, 0.0677, -0.0917, -0.0115, 0.0476, 0.0994, -0.0904, 0.0228, 0.0145, 0.0942, 0.0052, 0.0189, 0.1691, 0.0069, -0.0575, 0.0119, -0.0604, -0.0414, 0.0093, 0.0604, -0.0423, 0.0553, -0.0504, 0.0055, 0.0093, 0.1187, -5.9878, 0.0125, 0.0763, 0.079, 0.0185, 0.9883, -0.0682, 0.0887, -0.3646, 0.0106, -0.0861, -0.017, -0.0013, -0.0413, 0.0056, 0.1091, 0.013, 0.0853, 0.0405, 0.1075, -0.1663, 0.0381, -0.0869, 0.0043, -0.0371, -0.0355, 0.2696, -0.0525, 0.2121, 0.0131, 0.0182, 0.0784, 0.1262, 0.1069, 0.1902, -0.1027, -0.0192, -0.0739, 0.1782, 0.0041, 0.0336,

In [None]:
import os
from datetime import datetime

# Persist the fitted pipeline under models/<dataset-name>-clf-<MMDD-HHMM>.
# The dataset name is the CSV file name up to its first '.'.
dataset_tag = os.path.basename(fn).split('.')[0]
run_stamp = datetime.now().strftime('%m%d-%H%M')
model.save(f"models/{dataset_tag}-clf-{run_stamp}")

In [27]:
# Re-apply the fitted model to the validation split.
# NOTE(review): this repeats the transform already done right after fitting;
# it is only needed when `model` was loaded from disk rather than fitted in
# this session.
results = model.transform(valid_dataset)

In [76]:
# Show the first five predicted labels without truncating the column.
results.select('prediction').show(n=5, truncate=False)

+----------+
|prediction|
+----------+
|0.0       |
|0.0       |
|0.0       |
|0.0       |
|1.0       |
+----------+
only showing top 5 rows



In [107]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Accuracy of the 'prediction' column against the 'fake' label on the
# validation results.
# (A disabled earlier variant evaluated a 'pred_' column obtained by casting
# results['pred'][0].result to double.)
evaluator = MulticlassClassificationEvaluator(
    labelCol='fake',
    predictionCol='prediction',
    metricName='accuracy',
)

accuracy = evaluator.evaluate(results)
accuracy

0.7301672325106933

In [32]:
# List the column names and types of the transformed validation set.
# NOTE(review): the captured output below lists 'pred'/'pred_' and no
# 'features'/'prediction' columns, so it appears to come from an earlier
# pipeline version - re-run this cell to refresh it.
results.dtypes

[('news_id', 'int'),
 ('url', 'string'),
 ('text', 'string'),
 ('num_images', 'int'),
 ('domain', 'string'),
 ('publish_date', 'string'),
 ('fake', 'int'),
 ('authors', 'string'),
 ('document',
  'array<struct<annotatorType:string,begin:int,end:int,result:string,metadata:map<string,string>,embeddings:array<float>>>'),
 ('token',
  'array<struct<annotatorType:string,begin:int,end:int,result:string,metadata:map<string,string>,embeddings:array<float>>>'),
 ('embeddings',
  'array<struct<annotatorType:string,begin:int,end:int,result:string,metadata:map<string,string>,embeddings:array<float>>>'),
 ('text_embeddings',
  'array<struct<annotatorType:string,begin:int,end:int,result:string,metadata:map<string,string>,embeddings:array<float>>>'),
 ('pred',
  'array<struct<annotatorType:string,begin:int,end:int,result:string,metadata:map<string,string>,embeddings:array<float>>>'),
 ('pred_', 'double')]