In [0]:
from sklearn.datasets import fetch_20newsgroups

# Load both splits of the 20 Newsgroups corpus via scikit-learn (train first, then test).
newsgroup_train, newsgroup_test = (
    fetch_20newsgroups(subset=split_name) for split_name in ("train", "test")
)

In [0]:
from pyspark.sql.types import *
from pyspark.sql.functions import *

In [0]:
%pip install nltk

In [0]:
from nltk.stem import WordNetLemmatizer, PorterStemmer
import nltk
# Tagger models for nltk.pos_tag: older NLTK releases load
# 'averaged_perceptron_tagger', newer releases (3.9+) load 'averaged_perceptron_tagger_eng'.
nltk.download('averaged_perceptron_tagger')
nltk.download('averaged_perceptron_tagger_eng')
# WordNet data backs WordNetLemmatizer; some NLTK releases also require omw-1.4.
nltk.download('wordnet')
nltk.download('omw-1.4')

In [0]:
# Inspect the type of the object returned by fetch_20newsgroups.
type(newsgroup_train)

In [0]:
import pandas as pd

# Exploratory look: passing [texts, labels] as two rows yields a 2 x n_posts frame
# (one column per post); a later cell rebuilds it with one row per post.
df = pd.DataFrame([newsgroup_train.data, newsgroup_train.target.tolist()])

In [0]:
# The untransposed frame has only two rows (texts, then labels), so head(5) shows all of it.
df.head(5)

The first row holds the post texts and the second row holds the integer labels, each identifying a newsgroup topic. The frame is transposed (one column per post), so we rebuild it with one row per post.

In [0]:
# One row per post: raw text plus its integer topic label. Building the frame from
# a column dict keeps `target` as an integer column; the transpose trick
# (`pd.DataFrame([...]).T`) left every column with object dtype.
df = pd.DataFrame({
    'text': newsgroup_train.data,
    'target': newsgroup_train.target,
})

# Lookup table: row index = target id, `title` = newsgroup name.
targets = pd.DataFrame({'title': newsgroup_train.target_names})

# Attach the human-readable newsgroup name to every post.
ngout = pd.merge(df, targets, left_on='target', right_index=True)

In [0]:
# Preview each post alongside its numeric target and newsgroup title.
display(ngout)

Each text contains the full email, including the header, which is not relevant here. We only want to keep the text of the body.

In [0]:
# Convert the pandas frame into a Spark DataFrame; the rest of the pipeline runs on Spark.
sdf= spark.createDataFrame(ngout)

In [0]:
# Preview the Spark DataFrame.
display(sdf)

In [0]:
from pyspark.sql.functions import split
from pyspark.sql.functions import monotonically_increasing_id, col

We create a new column `text_sep` by splitting each text on blank lines (`"\n\n"`), which is what separates the email header from the body. We then select the first two body segments (`text_sep[1]` and `text_sep[2]`) along with the other columns, plus a generated `id`.

In [0]:
# Split each email on blank lines: element 0 is the header and elements 1 and 2
# are the first two body segments (they surface as columns `text_sep[1]` and
# `text_sep[2]`). A monotonically increasing id is attached to every row.
sdf = (
    sdf.withColumn("text_sep", split(sdf.text, "\n\n"))
    .select(
        "text", "target", "title",
        col("text_sep").getItem(1),
        col("text_sep").getItem(2),
    )
    .withColumn("id", monotonically_increasing_id())
)

In [0]:
# Preview the header/body split.
display(sdf)

In [0]:
# Show the schema; the body segments appear as the auto-named columns `text_sep[1]` and `text_sep[2]`.
sdf.printSchema()


Create a temporary view so the data can be queried with SQL.

In [0]:
# Expose the DataFrame to the %sql cells below under this view name.
temp_table_name = "newsgroup"
sdf.createOrReplaceTempView(name=temp_table_name)

In [0]:
%sql

-- Preview the registered view.
select * from newsgroup

In [0]:
%sql

-- Count posts that have no second body segment.
-- Bracketed column names must be quoted with backticks: in single quotes they are
-- string literals, which are never NULL.
select count(*) from newsgroup where `text_sep[2]` is null

In [0]:
%sql

-- Count posts whose first body segment is empty.
-- Backticks make this a column reference; single quotes would compare two string literals.
select count(*) from newsgroup where `text_sep[1]` = ''


Text Cleaning

In [0]:
from pyspark.sql.types import FloatType
import re

In [0]:
def clean_text(in_string):
    """Strip e-mail addresses, collapse whitespace and drop a few punctuation marks.

    Registered below as the Spark SQL UDF ``clean``.

    Args:
        in_string: body text to clean, or None (Spark passes SQL NULL as None).

    Returns:
        The cleaned string, or None when ``in_string`` is None, so NULL inputs
        yield NULL instead of failing the Spark task.
    """
    if in_string is None:
        return None
    # Remove e-mail addresses (any non-space run containing '@') plus one trailing whitespace char.
    without_email = re.sub(r'\S*@\S*\s?', '', in_string)
    # Collapse every run of whitespace (including newlines) into a single space.
    single_spaced = re.sub(r'\s+', ' ', without_email)
    # Remove apostrophes, '>' characters, colons and hyphens.
    # Raw strings avoid invalid escape sequences such as "\S" or "\>".
    return re.sub(r"['>:-]", '', single_spaced)

# Expose clean_text to Spark SQL under the name `clean` (used by the queries below).
# No return type is passed, so Spark's default UDF return type applies.
spark.udf.register(name="clean", f=clean_text)

If `text_sep[1]` is empty, use `text_sep[2]`; if `text_sep[2]` is missing, use `text_sep[1]`; otherwise concatenate both. The result is cleaned with the `clean` UDF.

In [0]:
%sql

-- Combine the first two body segments and clean them with the `clean` UDF:
--   text_sep[2] missing -> use text_sep[1]
--   text_sep[1] empty   -> use text_sep[2]
--   otherwise           -> join both with a space
-- Only rows without any body (text_sep[1] IS NULL) are excluded. Single-segment
-- posts are therefore kept, and `clean` never receives NULL.
SELECT clean(CASE
               WHEN `text_sep[2]` IS NULL THEN `text_sep[1]`
               WHEN `text_sep[1]` = '' THEN `text_sep[2]`
               ELSE CONCAT(`text_sep[1]`, ' ', `text_sep[2]`)
             END) AS text,
       target, title, id
FROM newsgroup
WHERE `text_sep[1]` IS NOT NULL

In [0]:
# Combine the first two body segments (rules described above), clean them with
# the `clean` UDF and keep one row per post. Filtering only on text_sep[1]
# keeps single-segment posts reachable by the CASE branches and guarantees
# `clean` never receives NULL.
sdf = spark.sql("""
    SELECT clean(CASE
                   WHEN `text_sep[2]` IS NULL THEN `text_sep[1]`
                   WHEN `text_sep[1]` = '' THEN `text_sep[2]`
                   ELSE CONCAT(`text_sep[1]`, ' ', `text_sep[2]`)
                 END) AS text,
           target, title, id
    FROM newsgroup
    WHERE `text_sep[1]` IS NOT NULL
""")

In [0]:
# Preview the combined, cleaned body text.
display(sdf)

In [0]:
# Number of posts after combining and cleaning the body text.
sdf.count()

In [0]:
from pyspark.sql.functions import col, length

# Inspect the very short posts (fewer than 100 characters) before deciding whether to drop them.
display(sdf.where(length(col('text')) < 100))

These very short texts add little information, so we remove them.

In [0]:
# Drop the short posts previewed above. `>= 100` is the exact complement of the
# `< 100` preview, so only the rows shown there are removed.
sdf = sdf.where(length(col('text')) >= 100)

In [0]:
# Number of posts left after dropping the short ones.
sdf.count()

Feature Engineering

In [0]:
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover

In [0]:
# Lower-case the text, split it on runs of non-word characters and keep tokens
# of at least 4 characters. The pattern is a raw string because "\W" is not a
# valid Python escape sequence.
tokenizer = RegexTokenizer(inputCol="text", outputCol="tokens", pattern=r"\W+", minTokenLength=4, toLowercase=True)
tokenized = tokenizer.transform(sdf)

In [0]:
# Preview the token arrays produced by the tokenizer.
display(tokenized)

In [0]:
# Remove Spark's built-in default stop words from each token array.
spremover = StopWordsRemover().setInputCol("tokens").setOutputCol("spfiltered")
spremoved = spremover.transform(tokenized)

In [0]:
# Compare tokens before and after stop-word removal.
display(spremoved.select("tokens", "spfiltered"))

In [0]:
# Shared NLTK normalisers used by word_tokenize.
lemma = WordNetLemmatizer()
porter = PorterStemmer()

def word_tokenize(text, lemmatizer=None, stemmer=None):
    """Normalise a list of tokens by lemmatizing or stemming each one.

    For each token the lemma is computed once. The lemma is kept when it ends
    in 'e' or 'ion', or when the original token is shorter than 4 characters.
    Otherwise the stem of the original token is used.

    Args:
        text: list of token strings (already tokenized, despite the name).
        lemmatizer: object with a ``lemmatize(word)`` method; defaults to the
            module-level ``lemma``.
        stemmer: object with a ``stem(word)`` method; defaults to the
            module-level ``porter``.

    Returns:
        A new list with one normalised string per input token, in input order.
    """
    lemmatizer = lemma if lemmatizer is None else lemmatizer
    stemmer = porter if stemmer is None else stemmer
    # No part-of-speech tagging: the rule depends only on the word itself.
    normalised = []
    for word in text:
        lemmatized = lemmatizer.lemmatize(word)
        if lemmatized.endswith(('e', 'ion')) or len(word) < 4:
            normalised.append(lemmatized)
        else:
            normalised.append(stemmer.stem(word))
    return normalised

In [0]:
# Check the column names and order before mapping over the rows.
spremoved.printSchema()

In [0]:
# Normalise the stop-word-filtered tokens of every post with word_tokenize.
# Fields are read by name rather than by position, so the mapping stays correct
# if the column order of `spremoved` changes.
stemmed = spremoved.rdd.map(
    lambda row: (row['target'], row['title'], row['id'], word_tokenize(row['spfiltered']))
)

In [0]:
# Peek at a few normalised rows; collect() would pull the entire corpus to the driver.
stemmed.take(5)

In [0]:
# Back to a DataFrame; the column names follow the tuple order produced by `stemmed`.
news_df = stemmed.toDF(['target', 'title', 'id', 'word'])

In [0]:
# Preview the normalised words per post.
display(news_df)


Remove additional domain-specific stopwords (e-mail boilerplate and conversational filler) so the remaining tokens carry more meaning.

In [0]:
# Domain-specific filler words (e-mail boilerplate and conversational verbs).
spwordlist = ["article", "write", "entry", "date", "udel", "said", "tell", "think", "know", "just", "isnt", "line", "like", "does", "going", "make", "thanks","also"]

# `word` holds lemmatized/stemmed tokens (see word_tokenize), so raw forms such as
# "going" or "thanks" may never occur there. Match both the raw and the
# normalised form of every entry (order-preserving de-duplication).
spremover1 = StopWordsRemover(
    inputCol="word",
    outputCol="word_new",
    stopWords=list(dict.fromkeys(spwordlist + word_tokenize(spwordlist))),
)
news_df = spremover1.transform(news_df)

In [0]:
# Compare the normalised words before and after removing the custom stop words.
display(news_df.select("word","word_new"))

In [0]:
# One row per element of the filtered word array (the array column is replaced by a single word).
df_explode = news_df.withColumn('word_new', explode(col('word_new')))

In [0]:
# Preview the exploded one-word-per-row frame.
display(df_explode)

In [0]:
# Register the exploded words as the SQL view "topwords".
df_explode.createOrReplaceTempView("topwords")