In [2]:
# # Find Spark so that we can access session within our notebook
import findspark
findspark.init()

# Start SparkSession on all available cores
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/05/20 19:02:16 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
data = spark.read.csv('data/train1.csv',
                      header='true',
                      inferSchema='true',
                      multiLine=True)

                                                                                

In [4]:
print('Total Columns: %d' % len(data.dtypes))
print('Total Rows: %d' % data.count())
data.printSchema()

Total Columns: 5


[Stage 2:>                                                          (0 + 1) / 1]

Total Rows: 20800
root
 |-- id: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- author: string (nullable = true)
 |-- text: string (nullable = true)
 |-- label: integer (nullable = true)



                                                                                

In [5]:
data.show()

+---+--------------------+--------------------+--------------------+-----+
| id|               title|              author|                text|label|
+---+--------------------+--------------------+--------------------+-----+
|  0|House Dem Aide: W...|       Darrell Lucus|"House Dem Aide: ...|    1|
|  1|FLYNN: Hillary Cl...|     Daniel J. Flynn|Ever get the feel...|    0|
|  2|Why the Truth Mig...|  Consortiumnews.com|Why the Truth Mig...|    1|
|  3|15 Civilians Kill...|     Jessica Purkiss|Videos 15 Civilia...|    1|
|  4|Iranian woman jai...|      Howard Portnoy|Print An Iranian ...|    1|
|  5|Jackie Mason: Hol...|     Daniel Nussbaum|In these trying t...|    0|
|  6|Life: Life Of Lux...|                null|Ever wonder how B...|    1|
|  7|Benoît Hamon Wins...|     Alissa J. Rubin|PARIS  —   France...|    0|
|  8|Excerpts From a D...|                null|Donald J. Trump i...|    0|
|  9|A Back-Channel Pl...|Megan Twohey and ...|A week before Mic...|    0|
| 10|Obama’s Organizin...

### Check Class Balance

In [6]:
labels = (data.groupBy('label')
             .count()
        )
labels.show()

[Stage 6:>                                                          (0 + 1) / 1]

+-----+-----+
|label|count|
+-----+-----+
|    1|10413|
|    0|10387|
+-----+-----+



                                                                                

### Text Preprocessing

In [7]:
from pyspark.ml.feature import Word2Vec, Word2VecModel
from pyspark.ml.feature import Tokenizer, RegexTokenizer, StopWordsRemover, IDF, CountVectorizer
from pyspark.sql.functions import regexp_replace, array, col, udf, split
from pyspark.ml import Pipeline
# from sparknlp.annotator import Lemmatizer
from sparknlp.annotator import *
from sparknlp.base import *
from pyspark.ml.clustering import LDA
import string

In [8]:
# remove punctuation
punctuation = string.punctuation
punctuation += '—'

def remove_punc(x):
    new_str = x
    for ch in punctuation:
        new_str = new_str.replace(ch, '')
    return new_str

rp_udf = udf(lambda x: remove_punc(x))

data1 = data.withColumn("text_cleaned", rp_udf(col('text')))
data1.show()

[Stage 9:>                                                          (0 + 1) / 1]

+---+--------------------+--------------------+--------------------+-----+--------------------+
| id|               title|              author|                text|label|        text_cleaned|
+---+--------------------+--------------------+--------------------+-----+--------------------+
|  0|House Dem Aide: W...|       Darrell Lucus|"House Dem Aide: ...|    1|House Dem Aide We...|
|  1|FLYNN: Hillary Cl...|     Daniel J. Flynn|Ever get the feel...|    0|Ever get the feel...|
|  2|Why the Truth Mig...|  Consortiumnews.com|Why the Truth Mig...|    1|Why the Truth Mig...|
|  3|15 Civilians Kill...|     Jessica Purkiss|Videos 15 Civilia...|    1|Videos 15 Civilia...|
|  4|Iranian woman jai...|      Howard Portnoy|Print An Iranian ...|    1|Print An Iranian ...|
|  5|Jackie Mason: Hol...|     Daniel Nussbaum|In these trying t...|    0|In these trying t...|
|  6|Life: Life Of Lux...|                null|Ever wonder how B...|    1|Ever wonder how B...|
|  7|Benoît Hamon Wins...|     Alissa J.

                                                                                

In [9]:
# # tokenizer
tokenizer = RegexTokenizer(inputCol='text_cleaned', outputCol="tokens")
data2 = tokenizer.transform(data1)
data2.show()

TypeError: RegexTokenizer.__init__() got an unexpected keyword argument 'inputCol'

In [None]:
# # remove stopwords
# remover = StopWordsRemover(inputCol="tokens", outputCol="text_rm_stop")
# data1 = remover.transform(data1)
# data1.show()

In [None]:
# lemmatizer = LemmatizerModel.pretrained(name="lemma_antbnc", lang="en")
#         .setInputCol('text_rm_stop')
#         .setOutputCol('lemma')
#     )

### Build Pipeline

In [10]:
tokenizer = RegexTokenizer() \
    .setInputCol("text_cleaned") \
    .setOutputCol("tokens")
stopwords = StopWordsRemover() \
    .setInputCol("tokens") \
    .setOutputCol("text_rm_stop")
lemmatizer = LemmatizerModel.pretrained(name="lemma_antbnc", lang="en") \
        .setInputCol('text_rm_stop') \
        .setOutputCol('lemma') 
nlpPipeline = Pipeline(stages=[tokenizer, stopwords, lemmatizer])

TypeError: 'JavaPackage' object is not callable

In [None]:
final_df = nlpPipeline.fit(data1).transform(data1)

In [11]:
spark

### Lemmatizer

In [None]:
import nltk
nltk.download('wordnet')
nltk.download('omw-1.4')
from nltk.stem import WordNetLemmatizer
 
lemmatizer = WordNetLemmatizer()

def lems(x):
    words = [lemmatizer.lemmatize(word) for word in x]
    return words

lem_udf = udf(lambda x: lems(x))

final_df = final_df.withColumn("lem", lem_udf(col('text_rm_stop')))
final_df.show()

In [None]:
final_df = final_df.select(split(col("lem"),",").alias("tokens"), col("id"), col("label"))

In [None]:
final_df.printSchema()

In [None]:
final_df.show()

### Count Vectorizer

In [None]:
params = dict(num_topics = 10,
    iterations = 10,
    vocabsize = 5000,
    minDF = 0.02,
    maxDF = 0.8
 )
params

In [None]:
def count_vectorizer(inputCol, outputCol, params):
    cv = CountVectorizer(
        inputCol=inputCol,
        outputCol=outputCol,
        vocabSize=params['vocabsize'],
        minDF=params['minDF'],
        maxDF=params['maxDF'],
        minTF=1.0
    )
    return cv


def idf(inputCol, outputCol):
    return IDF(inputCol="features", outputCol="idf")


def lda(params):
    lda = LDA(
        k=params['num_topics'],
        maxIter=params['iterations'],
        optimizer="online",
        seed=1,
        learningOffset=100.0,  # If high, early iterations are downweighted during training
        learningDecay=0.75,    # Set between [0.5, 1) to guarantee asymptotic convergence
    )
    return lda


def ml_pipeline(nlpPipelineDF, params):
    """Create a Spark ML pipeline and transform the input NLP-transformed DataFrame 
       to produce a trained LDA topic model for the given data.
    """
    ml_pipe = Pipeline(
        stages=[
            count_vectorizer("lem", "features", params),
            idf("features", "idf"),
            lda(params)
        ]
    )
    
    model = ml_pipe.fit(final_df)
    lda_model = model.stages[2]
    
    # Calculate upper bound on model perplexity
    mlPipelineDF = model.transform(final_df)
    lda_perplexity = lda_model.logPerplexity(final_df)
    return model, lda_perplexity

In [None]:
# Count Vectorizer
model, lda_perplexity = ml_pipeline(final_df, params)
print("Upper bound on perplexity: {}".format(lda_perplexity))