In [1]:
import findspark
findspark.init()

from pyspark.sql import SparkSession
from pyspark import SparkContext

spark = SparkSession.builder \
    .appName("project")\
    .config("spark.jars.packages", "com.johnsnowlabs.nlp:spark-nlp_2.11:2.4.4")\
    .config("spark.sql.broadcastTimeout", "36000")\
    .getOrCreate()

sc = spark.sparkContext
sc

In [2]:
spark

In [3]:
from sparknlp.base import Finisher, DocumentAssembler
from sparknlp.annotator import (Tokenizer, Normalizer, 
                                LemmatizerModel, StopWordsCleaner)
from pyspark.ml import Pipeline

In [4]:
import nltk
nltk.download('stopwords')
nltk.download('words')

from nltk.corpus import stopwords

eng_stopwords = stopwords.words('english')
eng_stopwords.append('xxxx')

[nltk_data] Downloading package stopwords to /home/hadoop/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!
[nltk_data] Downloading package words to /home/hadoop/nltk_data...
[nltk_data]   Package words is already up-to-date!


In [5]:
documentAssembler = DocumentAssembler() \
    .setInputCol('context') \
    .setOutputCol('document')

tokenizer = Tokenizer() \
    .setInputCols(['document']) \
    .setOutputCol('token')

# note normalizer defaults to changing all words to lowercase.
# Use .setLowercase(False) to maintain input case.
normalizer = Normalizer() \
    .setInputCols(['token']) \
    .setOutputCol('normalized') \
    .setLowercase(True)

# note that lemmatizer needs a dictionary. So I used the pre-trained
# model (note that it defaults to english)
lemmatizer = LemmatizerModel.pretrained() \
    .setInputCols(['normalized']) \
    .setOutputCol('lemma') \

stopwords_cleaner = StopWordsCleaner() \
    .setInputCols(['lemma']) \
    .setOutputCol('clean_lemma') \
    .setCaseSensitive(False) \
    .setStopWords(eng_stopwords)

# finisher converts tokens to human-readable output
finisher = Finisher() \
    .setInputCols(['clean_lemma']) \
    .setCleanAnnotations(False)

lemma_antbnc download started this may take some time.
Approximate size to download 907.6 KB
[OK!]


In [6]:
pipeline = Pipeline() \
    .setStages([
        documentAssembler,
        tokenizer,
        normalizer,
        lemmatizer,
        stopwords_cleaner,
        finisher
    ])

In [7]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

schema = StructType([
    StructField("article_type", StringType(), True),
    StructField("np1", StringType(), True),
    StructField("np2", StringType(), True),
    StructField("context", StringType(), True),
    StructField("source", StringType(), True),
    StructField("category", StringType(), True),
    StructField("location", StringType(), True),
    StructField("time", StringType(), True),])

In [8]:
df = spark.read.csv("s3://anly502project/data/part-r-00000",sep = "\t",header=False,schema=schema)
df_2 = spark.read.csv("s3://anly502project/data/part-r-00001",sep = "\t",header=False,schema=schema)

In [9]:
#### Data Schema
df.printSchema()
df_2.printSchema()

root
 |-- article_type: string (nullable = true)
 |-- np1: string (nullable = true)
 |-- np2: string (nullable = true)
 |-- context: string (nullable = true)
 |-- source: string (nullable = true)
 |-- category: string (nullable = true)
 |-- location: string (nullable = true)
 |-- time: string (nullable = true)

root
 |-- article_type: string (nullable = true)
 |-- np1: string (nullable = true)
 |-- np2: string (nullable = true)
 |-- context: string (nullable = true)
 |-- source: string (nullable = true)
 |-- category: string (nullable = true)
 |-- location: string (nullable = true)
 |-- time: string (nullable = true)



In [10]:
import functools 

def unionAll(dfs):
    return functools.reduce(lambda df1,df2: df1.union(df2.select(df1.columns)), dfs) 

In [11]:
unioned_df = unionAll([df, df_2])

In [12]:
#### show combined
unioned_df.show(10)

+------------+---------------+---+--------------------+-------------+--------------------+--------------------+-----+
|article_type|            np1|np2|             context|       source|            category|            location| time|
+------------+---------------+---+--------------------+-------------+--------------------+--------------------+-----+
|     article|    Dark Knight|  E|  arg1 and Wall arg2|             |intlnews topstor ...|      , kerala india|14299|
|     article|    Carotenoids|  E|arg1 and caroteno...|             |topstor,health,sc...|                   ,|14660|
|     article|    Communities|  E|arg1 mobilised in...|             |    politics topstor|                   ,|14026|
|     article|    Carotenoids|  E|arg1 and caroteno...|             |topstor,health,sc...|                   ,|14660|
|            |     Coast bias|  E|arg2 is for East ...|             |      sports topstor| columbus, ohio u...|13956|
|     article|Commerce office|  E|arg1 at DDD Linco...| 

In [13]:
from pyspark.sql.functions import split
split_col = split(unioned_df['category'], ',')
unioned_df = unioned_df.withColumn('category', split_col.getItem(0))

In [14]:
split_col_2 = split(unioned_df['category'], ' ')
unioned_df = unioned_df.withColumn('category', split_col_2.getItem(0))

In [15]:
split_col_3 = split(unioned_df['category'], '_')
unioned_df = unioned_df.withColumn('category', split_col_3.getItem(0))

In [16]:
split_col_4 = split(unioned_df['category'], '-')
unioned_df = unioned_df.withColumn('category', split_col_4.getItem(0))

In [17]:
#### Show data frame after filtering the category
unioned_df.show(10)

+------------+---------------+---+--------------------+-------------+---------+--------------------+-----+
|article_type|            np1|np2|             context|       source| category|            location| time|
+------------+---------------+---+--------------------+-------------+---------+--------------------+-----+
|     article|    Dark Knight|  E|  arg1 and Wall arg2|             | intlnews|      , kerala india|14299|
|     article|    Carotenoids|  E|arg1 and caroteno...|             |  topstor|                   ,|14660|
|     article|    Communities|  E|arg1 mobilised in...|             | politics|                   ,|14026|
|     article|    Carotenoids|  E|arg1 and caroteno...|             |  topstor|                   ,|14660|
|            |     Coast bias|  E|arg2 is for East ...|             |   sports| columbus, ohio u...|13956|
|     article|Commerce office|  E|arg1 at DDD Linco...|             |  topstor| canton, ohio uni...|14363|
|     article| 75-minute mark|  E|arg

In [18]:
#### Filter out the null and empty category 
unioned_df = unioned_df.filter(unioned_df.category.isNotNull())
unioned_df = unioned_df.filter(unioned_df.category != '')

In [20]:
import pyspark.sql as sql
count_df = unioned_df.groupBy("category").count()
count_df.createOrReplaceTempView("count_df")
count_rank_df = spark.sql("SELECT category, count FROM count_df ORDER BY count DESC LIMIT 15")

In [21]:
count_rank_df.show(10)

+-------------+--------+
|     category|   count|
+-------------+--------+
|      topstor|85027590|
|    localnews|40174779|
|       sports|22925153|
|     business|20017107|
| nationalnews| 5073191|
|     intlnews| 4252106|
|    technolog| 2546612|
|entertainment| 2359067|
|     politics| 2092784|
|     lifestle| 1785076|
+-------------+--------+
only showing top 10 rows



In [22]:
unioned_df.createOrReplaceTempView("unioned_df")
count_rank_df.createOrReplaceTempView("count_rank_df")
df_final = spark.sql("SELECT * FROM unioned_df WHERE unioned_df.category IN (SELECT category FROM count_rank_df)")

In [23]:
df_final.show(10)

+------------+---------------+---+--------------------+-------------+---------+--------------------+-----+
|article_type|            np1|np2|             context|       source| category|            location| time|
+------------+---------------+---+--------------------+-------------+---------+--------------------+-----+
|     article|    Dark Knight|  E|  arg1 and Wall arg2|             | intlnews|      , kerala india|14299|
|     article|    Carotenoids|  E|arg1 and caroteno...|             |  topstor|                   ,|14660|
|     article|    Communities|  E|arg1 mobilised in...|             | politics|                   ,|14026|
|     article|    Carotenoids|  E|arg1 and caroteno...|             |  topstor|                   ,|14660|
|            |     Coast bias|  E|arg2 is for East ...|             |   sports| columbus, ohio u...|13956|
|     article|Commerce office|  E|arg1 at DDD Linco...|             |  topstor| canton, ohio uni...|14363|
|     article| 75-minute mark|  E|arg

In [None]:
df_pip = pipeline.fit(df_final).transform(df_final)

In [26]:
from pyspark.sql.functions import explode, col

context_words = df_pip.withColumn("exploded_text", explode(col("finished_clean_lemma")))

In [27]:
context_words.columns

['article_type',
 'np1',
 'np2',
 'context',
 'source',
 'category',
 'location',
 'time',
 'document',
 'token',
 'normalized',
 'lemma',
 'clean_lemma',
 'finished_clean_lemma',
 'exploded_text']

In [60]:
counts = context_words.groupby('exploded_text').count()

In [61]:
counts_pd = counts.toPandas()

KeyboardInterrupt: 

In [29]:
counts_pd

In [None]:
{counts_pd.loc[i, 'exploded_text']: counts_pd.loc[i, 'count'] for i in range(counts_pd.shape[0])}

In [30]:
df_pip.select('finished_clean_lemma').show(20, False)

+-------------------------------------+
|finished_clean_lemma                 |
+-------------------------------------+
|[arg, wall, arg]                     |
|[arg, carotenoid, plus, vitamin, arg]|
|[arg, mobilise, action, arg]         |
|[arg, carotenoid, plus, vitamin, arg]|
|[arg, east, arg]                     |
|[arg, ddd, lincoln, way, arg]        |
|[arg, pulsate, group, arg]           |
|[arg, past, arg]                     |
|[arg, train, arg]                    |
|[arg, line, service, arg]            |
|[arg, mindstrong, rate, arg]         |
|[arg, line, stop, run, past, arg]    |
|[arg, line, stop, run, past, arg]    |
|[arg, line, beyond, arg]             |
|[arg, line, beyond, arg]             |
|[arg, cape, cold, snap, kill, arg]   |
|[arg, rate, arg]                     |
|[arg, arg]                           |
|[arg, street, band, since, arg]      |
|[arg, arg]                           |
+-------------------------------------+
only showing top 20 rows



In [31]:
df_pip.select('np1').show(20,False)

+---------------+
|np1            |
+---------------+
|Dark Knight    |
|Carotenoids    |
|Communities    |
|Carotenoids    |
|Coast bias     |
|Commerce office|
|75-minute mark |
|Brigham Circle |
|Brigham Circle |
|Brigham Circle |
|Drill Sergeant |
|Brigham Circle |
|Brigham Circle |
|Brigham Circle |
|Brigham Circle |
|Cold           |
|Cook           |
|Boob           |
|2002           |
|Boob           |
+---------------+
only showing top 20 rows



In [32]:
cols = ['np1', 'finished_clean_lemma', 'category']    

In [33]:
df_1_new = df_pip.withColumn("np1",col('np1')).select(cols)
df_2_new = df_pip.withColumn("finished_clean_lemma", col('finished_clean_lemma')).select(cols)
result = df_1_new.union(df_2_new)

In [34]:
result.show()

+---------------+--------------------+-------------+
|            np1|finished_clean_lemma|     category|
+---------------+--------------------+-------------+
|    Dark Knight|    [arg, wall, arg]|     intlnews|
|    Carotenoids|[arg, carotenoid,...|      topstor|
|    Communities|[arg, mobilise, a...|     politics|
|    Carotenoids|[arg, carotenoid,...|      topstor|
|     Coast bias|    [arg, east, arg]|       sports|
|Commerce office|[arg, ddd, lincol...|      topstor|
| 75-minute mark|[arg, pulsate, gr...|      topstor|
| Brigham Circle|    [arg, past, arg]|      topstor|
| Brigham Circle|   [arg, train, arg]|    localnews|
| Brigham Circle|[arg, line, servi...|      topstor|
| Drill Sergeant|[arg, mindstrong,...|      topstor|
| Brigham Circle|[arg, line, stop,...|      topstor|
| Brigham Circle|[arg, line, stop,...|      topstor|
| Brigham Circle|[arg, line, beyon...|      topstor|
| Brigham Circle|[arg, line, beyon...|      topstor|
|           Cold|[arg, cape, cold,...| nationa

In [35]:
a = result.select(split(col("np1"),",")).alias("np1_1")

In [36]:
df_new = result.withColumn("finished_clean_lemma",col('finished_clean_lemma')).select(cols)

In [37]:
df_new = result.withColumn('np1_1',split(col("np1"),","))
df_new.show()

+---------------+--------------------+-------------+-----------------+
|            np1|finished_clean_lemma|     category|            np1_1|
+---------------+--------------------+-------------+-----------------+
|    Dark Knight|    [arg, wall, arg]|     intlnews|    [Dark Knight]|
|    Carotenoids|[arg, carotenoid,...|      topstor|    [Carotenoids]|
|    Communities|[arg, mobilise, a...|     politics|    [Communities]|
|    Carotenoids|[arg, carotenoid,...|      topstor|    [Carotenoids]|
|     Coast bias|    [arg, east, arg]|       sports|     [Coast bias]|
|Commerce office|[arg, ddd, lincol...|      topstor|[Commerce office]|
| 75-minute mark|[arg, pulsate, gr...|      topstor| [75-minute mark]|
| Brigham Circle|    [arg, past, arg]|      topstor| [Brigham Circle]|
| Brigham Circle|   [arg, train, arg]|    localnews| [Brigham Circle]|
| Brigham Circle|[arg, line, servi...|      topstor| [Brigham Circle]|
| Drill Sergeant|[arg, mindstrong,...|      topstor| [Drill Sergeant]|
| Brig

In [38]:
from pyspark.sql.functions import lit, array, array_union

df2 = df_new.withColumn("finished_clean_lemma", array_union("finished_clean_lemma", col('np1_1')))
df2.show()

+---------------+--------------------+-------------+-----------------+
|            np1|finished_clean_lemma|     category|            np1_1|
+---------------+--------------------+-------------+-----------------+
|    Dark Knight|[arg, wall, Dark ...|     intlnews|    [Dark Knight]|
|    Carotenoids|[arg, carotenoid,...|      topstor|    [Carotenoids]|
|    Communities|[arg, mobilise, a...|     politics|    [Communities]|
|    Carotenoids|[arg, carotenoid,...|      topstor|    [Carotenoids]|
|     Coast bias|[arg, east, Coast...|       sports|     [Coast bias]|
|Commerce office|[arg, ddd, lincol...|      topstor|[Commerce office]|
| 75-minute mark|[arg, pulsate, gr...|      topstor| [75-minute mark]|
| Brigham Circle|[arg, past, Brigh...|      topstor| [Brigham Circle]|
| Brigham Circle|[arg, train, Brig...|    localnews| [Brigham Circle]|
| Brigham Circle|[arg, line, servi...|      topstor| [Brigham Circle]|
| Drill Sergeant|[arg, mindstrong,...|      topstor| [Drill Sergeant]|
| Brig

In [39]:
df2.select('finished_clean_lemma').show()

+--------------------+
|finished_clean_lemma|
+--------------------+
|[arg, wall, Dark ...|
|[arg, carotenoid,...|
|[arg, mobilise, a...|
|[arg, carotenoid,...|
|[arg, east, Coast...|
|[arg, ddd, lincol...|
|[arg, pulsate, gr...|
|[arg, past, Brigh...|
|[arg, train, Brig...|
|[arg, line, servi...|
|[arg, mindstrong,...|
|[arg, line, stop,...|
|[arg, line, stop,...|
|[arg, line, beyon...|
|[arg, line, beyon...|
|[arg, cape, cold,...|
|   [arg, rate, Cook]|
|         [arg, Boob]|
|[arg, street, ban...|
|         [arg, Boob]|
+--------------------+
only showing top 20 rows



In [43]:
df2.groupBy("category").count().show()

+-------------+---------+
|     category|    count|
+-------------+---------+
| regionalnews|  2561474|
|      topstor|170055180|
|     lifestle|  3570152|
|           gf|  1117896|
|     politics|  4185568|
|      science|  1066868|
|       health|  1960134|
|    technolog|  5093224|
|       sports| 45850306|
|        world|  1411316|
|entertainment|  4718134|
| nationalnews| 10146382|
|     business| 40034214|
|    localnews| 80349558|
|     intlnews|  8504212|
+-------------+---------+



In [44]:
df2.select("category").distinct().show()

+-------------+
|     category|
+-------------+
| regionalnews|
|      topstor|
|     lifestle|
|           gf|
|     politics|
|      science|
|       health|
|    technolog|
|       sports|
|        world|
|entertainment|
| nationalnews|
|     business|
|    localnews|
|     intlnews|
+-------------+



In [47]:
category = df2.select("category").distinct()

In [84]:
categories = [
 'politics',
 'science',
 'health',
 'technolog',
 'business',
 'sports',]

In [85]:
categories

['politics', 'science', 'health', 'technolog', 'business', 'sports']

In [86]:
documentAssembler = DocumentAssembler() \
    .setInputCol('finished_clean_lemma') \
    .setOutputCol('document')

tokenizer = Tokenizer() \
    .setInputCols(['document']) \
    .setOutputCol('token')

# note normalizer defaults to changing all words to lowercase.
# Use .setLowercase(False) to maintain input case.
normalizer = Normalizer() \
    .setInputCols(['token']) \
    .setOutputCol('normalized') \
    .setLowercase(True)

# note that lemmatizer needs a dictionary. So I used the pre-trained
# model (note that it defaults to english)
lemmatizer = LemmatizerModel.pretrained() \
    .setInputCols(['normalized']) \
    .setOutputCol('lemma') \

stopwords_cleaner = StopWordsCleaner() \
    .setInputCols(['lemma']) \
    .setOutputCol('clean_lemma') \
    .setCaseSensitive(False) \
    .setStopWords(eng_stopwords)

# finisher converts tokens to human-readable output
finisher = Finisher() \
    .setInputCols(['clean_lemma']) \
    .setCleanAnnotations(False)

lemma_antbnc download started this may take some time.
Approximate size to download 907.6 KB
[OK!]


In [87]:

pipeline = Pipeline() \
    .setStages([
        documentAssembler,
        tokenizer,
        normalizer,
        lemmatizer,
        stopwords_cleaner,
        finisher
    ])

In [88]:
category_df = df2.filter((df2['category'] == category))

In [89]:
data = category_df.select('finished_clean_lemma')

In [90]:
data = data.withColumn('finished_clean_lemma', concat_ws(','))

In [None]:
from pyspark.sql.functions import explode, col

# initialize {category: {word counts}} dictionary
category_word_counts_dict = {category: {} for category in categories}


for category in categories:
    print(category)
                   
    # get complaint narratives
    category_df = df2.filter((df2['category'] == category))
    data = category_df.select('finished_clean_lemma')
    data = data.withColumn('finished_clean_lemma', concat_ws(','))
    
    # process narratives into counts dictionary
    clean_data = pipeline.fit(data).transform(data)
    clean_data_exploded = clean_data.withColumn("exploded_text", explode(col("finished_clean_lemma")))
    counts = clean_data_exploded.groupby('exploded_text').count().toPandas()
    counts_dict = {counts.loc[i, 'exploded_text']: counts.loc[i, 'count'] for i in range(counts.shape[0])}
    
    # add counts to dictionary
    category_word_counts_dict[category] = counts_dict

politics


In [None]:
def term_frequency(BoW_dict):
    tot_words = sum(BoW_dict.values())
    freq_dict = {word: BoW_dict[word]/tot_words for word in BoW_dict.keys()}
    return freq_dict

In [None]:
from math import log

def inverse_document_frequency(list_of_dicts):
    tot_docs = len(list_of_dicts)
    words = set([w for w_dict in list_of_dicts for w in w_dict.keys()])
    idf_dict = {word: log(float(tot_docs)/(1.0+ sum([1 for w_dict in list_of_dicts if word in w_dict.keys()]))) for word in words}
    return idf_dict

In [None]:
def tf_idf(list_of_dicts):
    words = set([w for w_dict in list_of_dicts for w in w_dict.keys()])
    tf_idf_dicts = []
    idfs = inverse_document_frequency(list_of_dicts)
    for w_dict in list_of_dicts:
        w_dict.update({word: 0 for word in words if word not in w_dict.keys()})
        tf = term_frequency(w_dict)
        tf_idf_dicts.append({word: tf[word]*idfs[word] for word in words})
    return tf_idf_dicts

In [40]:
list_of_word_dicts = [category_word_counts_dict[category] for category in categories]

In [None]:
tf_idf_by_category_list = tf_idf(list_of_word_dicts)

In [None]:
tf_idf_by_category_dict = {c: tf_dict for c, tf_dict in zip(categories, tf_idf_by_category_list)}

In [111]:
sc.stop()

In [26]:
SparkSession._instantiatedContext = None