# Exercise 2: Advanced Analytics NLP

In [1]:
# Install the Spark NLP Python package from PyPI, pinned to release 1.7.3.
!pip install spark-nlp==1.7.3

Collecting spark-nlp==1.7.3
[?25l  Downloading https://files.pythonhosted.org/packages/7e/c1/3ec550fbc22efdcac013a301f74d6c904ec545bef291b414be90d900d1d8/spark_nlp-1.7.3-py2.py3-none-any.whl (72.8MB)
[K    100% |████████████████████████████████| 72.8MB 509kB/s eta 0:00:01  1% |▌                               | 1.2MB 23.1MB/s eta 0:00:04    3% |█▎                              | 2.8MB 32.9MB/s eta 0:00:03    8% |██▋                             | 6.0MB 33.8MB/s eta 0:00:02    12% |████                            | 9.2MB 34.5MB/s eta 0:00:02    16% |█████▍                          | 12.3MB 33.4MB/s eta 0:00:02    19% |██████                          | 13.8MB 32.5MB/s eta 0:00:02    23% |███████▍                        | 16.9MB 31.1MB/s eta 0:00:02    29% |█████████▍                      | 21.3MB 32.2MB/s eta 0:00:02    31% |██████████                      | 22.7MB 29.5MB/s eta 0:00:02    35% |███████████▎                    | 25.7MB 31.1MB/s eta 0:00:02    37% |████████████              

In [2]:
import pandas as pd

# Let pandas render up to 800 characters per cell so long text columns are not
# cut short in the tables below. The fully-qualified option key is used because
# abbreviated keys are resolved by pattern matching and may stop being unique
# if pandas adds another option with a similar name.
pd.set_option('display.max_colwidth', 800)

# Create a Spark session that includes a third-party JAR for NLP

In [3]:
from pyspark.sql import SparkSession

# Build (or reuse) the Spark session with the Spark NLP JVM package attached.
# spark.jars.packages takes package coordinates that Spark resolves when the
# session starts.
# The coordinate is pinned to 1.7.3 so that it matches the Python package
# installed with pip (spark-nlp==1.7.3); the Python wrappers drive the JVM
# classes, so both halves should come from the same release.
spark = (
    SparkSession.builder
    .config("spark.jars.packages", "JohnSnowLabs:spark-nlp:1.7.3")
    .getOrCreate()
)

# Leave the session as the last expression so the notebook displays its summary.
spark

# Read multiple files in a directory as one DataFrame

In [4]:
# Glob pattern matching every JSON file in the Reddit data folder.
dataPath = "./data/reddit/*.json"

# Load all matching files through the JSON data source into one DataFrame.
df = spark.read.format("json").load(dataPath)

# Print the row count first, then the inferred nested schema.
print(df.count())
df.printSchema()

100
root
 |-- data: struct (nullable = true)
 |    |-- approved_at_utc: string (nullable = true)
 |    |-- approved_by: string (nullable = true)
 |    |-- archived: boolean (nullable = true)
 |    |-- author: string (nullable = true)
 |    |-- author_flair_background_color: string (nullable = true)
 |    |-- author_flair_css_class: string (nullable = true)
 |    |-- author_flair_richtext: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- author_flair_template_id: string (nullable = true)
 |    |-- author_flair_text: string (nullable = true)
 |    |-- author_flair_text_color: string (nullable = true)
 |    |-- author_flair_type: string (nullable = true)
 |    |-- author_fullname: string (nullable = true)
 |    |-- author_patreon_flair: boolean (nullable = true)
 |    |-- banned_at_utc: string (nullable = true)
 |    |-- banned_by: string (nullable = true)
 |    |-- can_gild: boolean (nullable = true)
 |    |-- can_mod_post: boolean (nullable = true)


# Deal with Struct type to query subfields 

In [5]:
# Dotted paths reach fields nested inside the top-level `data` struct.
title = "data.title"
author = "data.author"

# Project just the two nested fields; the resulting columns carry the leaf
# names (title, author).
dfAuthorTitle = df.select([title, author])

# Preview the first five rows as a pandas table.
dfAuthorTitle.limit(5).toPandas()

Unnamed: 0,title,author
0,"Microsoft Corp said it has discovered hacking targeting democratic institutions, think tanks, and non-profit organizations in Europe.",jaykirsch
1,Deutsche Bank reportedly planned to extend the dates of $340 million in loans to Trump Organization to avoid a potential nightmare of chasing a sitting president for cash,canuck_burger
2,"Iranian ""morality police"" were forced to fire warning shots when a crowd intervened to prevent them from arresting two women for not wearing a hijab. The incident occurred in Tehran's northeastern Narmak neighbourhood on Friday night, and ended with a mob tearing the door off a police vehicle.",honolulu_oahu_mod
3,"Trump administration 'pushing Saudi nuclear deal' which could benefit company linked to Jared Kushner - Senior Trump administration officials pushed a project to share nuclear power technology with Saudi Arabia over the objections of ethics officials, according to a congressional report",madam1
4,"NASA Happily Reports the Earth is Greener, With More Trees Than 20 Years Ago–and It's Thanks to China, India",purplexxx


# Try to implement the equivalent of flatMap in dataframes

In [6]:
import pyspark.sql.functions as F

# DataFrame counterpart of flatMap + word count:
#   1. split each title on runs of whitespace into an array of words,
#   2. explode the array so every word gets its own row,
#   3. count rows per word and sort by descending frequency.
dfWordCount = (
    df
    .select(F.explode(F.split(title, "\\s+")).alias("word"))
    .groupBy("word")
    .count()
    .orderBy(F.desc("count"))
)

# Show the ten most frequent words.
dfWordCount.limit(10).toPandas()

Unnamed: 0,word,count
0,to,58
1,the,46
2,of,42
3,in,41
4,a,25
5,for,20
6,and,19
7,from,12
8,on,11
9,over,10


In [10]:
# Breaking down the steps (1/3): split every title on whitespace, giving one
# array of words per row.
(
    df
    .select(F.split(title, "\\s+"))
    .limit(5)
    .toPandas()
)

Unnamed: 0,"split(data.title, \s+)"
0,"[Microsoft, Corp, said, it, has, discovered, hacking, targeting, democratic, institutions,, think, tanks,, and, non-profit, organizations, in, Europe.]"
1,"[Deutsche, Bank, reportedly, planned, to, extend, the, dates, of, $340, million, in, loans, to, Trump, Organization, to, avoid, a, potential, nightmare, of, chasing, a, sitting, president, for, cash]"
2,"[Iranian, ""morality, police"", were, forced, to, fire, warning, shots, when, a, crowd, intervened, to, prevent, them, from, arresting, two, women, for, not, wearing, a, hijab., The, incident, occurred, in, Tehran's, northeastern, Narmak, neighbourhood, on, Friday, night,, and, ended, with, a, mob, tearing, the, door, off, a, police, vehicle.]"
3,"[Trump, administration, 'pushing, Saudi, nuclear, deal', which, could, benefit, company, linked, to, Jared, Kushner, -, Senior, Trump, administration, officials, pushed, a, project, to, share, nuclear, power, technology, with, Saudi, Arabia, over, the, objections, of, ethics, officials,, according, to, a, congressional, report]"
4,"[NASA, Happily, Reports, the, Earth, is, Greener,, With, More, Trees, Than, 20, Years, Ago–and, It's, Thanks, to, China,, India]"


In [11]:
# Breaking down the steps (2/3): explode the word arrays so each word lands on
# its own row (the generated column is named `col` by default).
(
    df
    .select(F.explode(F.split(title, "\\s+")))
    .limit(5)
    .toPandas()
)

Unnamed: 0,col
0,Microsoft
1,Corp
2,said
3,it
4,has


In [12]:
# Breaking down the steps (3/3): rename the exploded column from `col` to
# `word`.
(
    df
    .select(F.explode(F.split(title, "\\s+")).alias("word"))
    .limit(5)
    .toPandas()
)

Unnamed: 0,word
0,Microsoft
1,Corp
2,said
3,it
4,has


# Use an NLP library to do Part-of-Speech Tagging

In [13]:
# BasicPipeline comes from the library's pretrained English pipeline module
# (per its import path) and is used here under the short alias `bp`.
from com.johnsnowlabs.nlp.pretrained.pipeline.en import BasicPipeline as bp
# Annotate the "title" column of the author/title DataFrame.
dfAnnotated = bp.annotate(dfAuthorTitle, "title")
# Inspect what the pipeline added: the printed schema shows annotation columns
# that are arrays of structs (annotatorType, begin, end, result, metadata).
dfAnnotated.printSchema()

root
 |-- text: string (nullable = true)
 |-- author: string (nullable = true)
 |-- document: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- annotatorType: string (nullable = true)
 |    |    |-- begin: integer (nullable = false)
 |    |    |-- end: integer (nullable = false)
 |    |    |-- result: string (nullable = true)
 |    |    |-- metadata: map (nullable = true)
 |    |    |    |-- key: string
 |    |    |    |-- value: string (valueContainsNull = true)
 |-- token: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- annotatorType: string (nullable = true)
 |    |    |-- begin: integer (nullable = false)
 |    |    |-- end: integer (nullable = false)
 |    |    |-- result: string (nullable = true)
 |    |    |-- metadata: map (nullable = true)
 |    |    |    |-- key: string
 |    |    |    |-- value: string (valueContainsNull = true)
 |-- normal: array (nullable = true)
 |    |-- element: struct (contains

## Deal with Map type to query subfields

In [14]:
# `pos` is an array of annotation structs, so selecting pos.metadata and
# pos.result returns, per title, an array of maps and an array of tags.
dfPos = dfAnnotated.select(["text", "pos.metadata", "pos.result"])

# Preview five titles alongside their per-token metadata and POS tags.
dfPos.limit(5).toPandas()

Unnamed: 0,text,metadata,result
0,"Microsoft Corp said it has discovered hacking targeting democratic institutions, think tanks, and non-profit organizations in Europe.","[{'word': 'Microsoft'}, {'word': 'Corp'}, {'word': 'said'}, {'word': 'it'}, {'word': 'has'}, {'word': 'discovered'}, {'word': 'hacking'}, {'word': 'targeting'}, {'word': 'democratic'}, {'word': 'institutions'}, {'word': 'think'}, {'word': 'tanks'}, {'word': 'and'}, {'word': 'nonprofit'}, {'word': 'organizations'}, {'word': 'in'}, {'word': 'Europe'}]","[NNP, NNP, VBD, PRP, VBZ, VBN, VBG, VBG, JJ, NNS, VBP, NNS, CC, NN, NNS, IN, NNP]"
1,Deutsche Bank reportedly planned to extend the dates of $340 million in loans to Trump Organization to avoid a potential nightmare of chasing a sitting president for cash,"[{'word': 'Deutsche'}, {'word': 'Bank'}, {'word': 'reportedly'}, {'word': 'planned'}, {'word': 'to'}, {'word': 'extend'}, {'word': 'the'}, {'word': 'dates'}, {'word': 'of'}, {'word': 'million'}, {'word': 'in'}, {'word': 'loans'}, {'word': 'to'}, {'word': 'Trump'}, {'word': 'Organization'}, {'word': 'to'}, {'word': 'avoid'}, {'word': 'a'}, {'word': 'potential'}, {'word': 'nightmare'}, {'word': 'of'}, {'word': 'chasing'}, {'word': 'a'}, {'word': 'sitting'}, {'word': 'president'}, {'word': 'for'}, {'word': 'cash'}]","[NNP, NNP, RB, VBD, TO, VB, DT, NNS, IN, CD, IN, NNS, TO, NNP, NNP, TO, VB, DT, JJ, NN, IN, VBG, DT, VBG, NN, IN, NN]"
2,"Iranian ""morality police"" were forced to fire warning shots when a crowd intervened to prevent them from arresting two women for not wearing a hijab. The incident occurred in Tehran's northeastern Narmak neighbourhood on Friday night, and ended with a mob tearing the door off a police vehicle.","[{'word': 'Iranian'}, {'word': 'morality'}, {'word': 'police'}, {'word': 'were'}, {'word': 'forced'}, {'word': 'to'}, {'word': 'fire'}, {'word': 'warning'}, {'word': 'shots'}, {'word': 'when'}, {'word': 'a'}, {'word': 'crowd'}, {'word': 'intervened'}, {'word': 'to'}, {'word': 'prevent'}, {'word': 'them'}, {'word': 'from'}, {'word': 'arresting'}, {'word': 'two'}, {'word': 'women'}, {'word': 'for'}, {'word': 'not'}, {'word': 'wearing'}, {'word': 'a'}, {'word': 'hijab'}, {'word': 'The'}, {'word': 'incident'}, {'word': 'occurred'}, {'word': 'in'}, {'word': 'Tehran'}, {'word': 's'}, {'word': 'northeastern'}, {'word': 'Narmak'}, {'word': 'neighbourhood'}, {'word': 'on'}, {'word': 'Friday'}, {'word': 'night'}, {'word': 'and'}, {'word': 'ended'}, {'word': 'with'}, {'word': 'a'}, {'word': 'mob'...","[JJ, NN, NN, VBD, VBN, TO, VB, NN, NNS, WRB, DT, NN, VBD, TO, VB, PRP, IN, VBG, CD, NNS, IN, RB, VBG, DT, NN, DT, NN, VBD, IN, NNP, VBZ, JJ, NNP, NN, IN, NNP, NN, CC, VBD, IN, DT, NN, VBG, DT, NN, RP, DT, NN, NN]"
3,"Trump administration 'pushing Saudi nuclear deal' which could benefit company linked to Jared Kushner - Senior Trump administration officials pushed a project to share nuclear power technology with Saudi Arabia over the objections of ethics officials, according to a congressional report","[{'word': 'Trump'}, {'word': 'administration'}, {'word': 'pushing'}, {'word': 'Saudi'}, {'word': 'nuclear'}, {'word': 'deal'}, {'word': 'which'}, {'word': 'could'}, {'word': 'benefit'}, {'word': 'company'}, {'word': 'linked'}, {'word': 'to'}, {'word': 'Jared'}, {'word': 'Kushner'}, {'word': 'Senior'}, {'word': 'Trump'}, {'word': 'administration'}, {'word': 'officials'}, {'word': 'pushed'}, {'word': 'a'}, {'word': 'project'}, {'word': 'to'}, {'word': 'share'}, {'word': 'nuclear'}, {'word': 'power'}, {'word': 'technology'}, {'word': 'with'}, {'word': 'Saudi'}, {'word': 'Arabia'}, {'word': 'over'}, {'word': 'the'}, {'word': 'objections'}, {'word': 'of'}, {'word': 'ethics'}, {'word': 'officials'}, {'word': 'according'}, {'word': 'to'}, {'word': 'a'}, {'word': 'congressional'}, {'word': 're...","[NNP, NN, VBG, NNP, NN, NN, WDT, MD, VB, NN, VBN, TO, NNP, NNP, NNP, NNP, NN, NNS, VBD, DT, NN, TO, VB, JJ, NN, NN, IN, NNP, NNP, IN, DT, NNS, IN, NNS, NNS, VBG, TO, DT, JJ, NN]"
4,"NASA Happily Reports the Earth is Greener, With More Trees Than 20 Years Ago–and It's Thanks to China, India","[{'word': 'NASA'}, {'word': 'Happily'}, {'word': 'Reports'}, {'word': 'the'}, {'word': 'Earth'}, {'word': 'is'}, {'word': 'Greener'}, {'word': 'With'}, {'word': 'More'}, {'word': 'Trees'}, {'word': 'Than'}, {'word': 'Years'}, {'word': 'Agoand'}, {'word': 'It'}, {'word': 's'}, {'word': 'Thanks'}, {'word': 'to'}, {'word': 'China'}, {'word': 'India'}]","[NNP, NNP, NNS, DT, NNP, VBZ, NNP, IN, JJR, NNP, IN, NNS, NNP, PRP, VBZ, NNS, TO, NNP, NNP]"


In [15]:
# Flatten the annotations: one row per POS annotation struct, keeping the
# column name `pos` for the exploded struct.
dfPos = dfAnnotated.select(
    F.explode(F.col("pos")).alias("pos")
)

# Confirm that `pos` is now a single struct rather than an array of structs.
dfPos.printSchema()

root
 |-- pos: struct (nullable = true)
 |    |-- annotatorType: string (nullable = true)
 |    |-- begin: integer (nullable = false)
 |    |-- end: integer (nullable = false)
 |    |-- result: string (nullable = true)
 |    |-- metadata: map (nullable = true)
 |    |    |-- key: string
 |    |    |-- value: string (valueContainsNull = true)



In [16]:
# Preview the first five exploded POS annotations as a pandas table.
dfPos.limit(5).toPandas()

Unnamed: 0,pos
0,"(pos, 0, 8, NNP, {'word': 'Microsoft'})"
1,"(pos, 10, 13, NNP, {'word': 'Corp'})"
2,"(pos, 15, 18, VBD, {'word': 'said'})"
3,"(pos, 20, 21, PRP, {'word': 'it'})"
4,"(pos, 23, 25, VBZ, {'word': 'has'})"


## Keep only proper nouns NNP or NNPS

In [18]:
# SQL predicate on the tag stored in pos.result: keep singular (NNP) and
# plural (NNPS) proper nouns only.
nnpFilter = "pos.result = 'NNP' or pos.result = 'NNPS' "

# Apply the predicate to the exploded annotations.
dfNNP = dfPos.filter(nnpFilter)

# Preview the first five proper-noun annotations.
dfNNP.limit(5).toPandas()

Unnamed: 0,pos
0,"(pos, 0, 8, NNP, {'word': 'Microsoft'})"
1,"(pos, 10, 13, NNP, {'word': 'Corp'})"
2,"(pos, 126, 131, NNP, {'word': 'Europe'})"
3,"(pos, 0, 7, NNP, {'word': 'Deutsche'})"
4,"(pos, 9, 12, NNP, {'word': 'Bank'})"


## Extract columns from a map in a column

In [19]:
# Turn each annotation into a flat (word, tag) pair: the word is looked up in
# the metadata map by key, the tag is the struct's result field.
dfWordTag = dfNNP.selectExpr([
    "pos.metadata['word'] as word",
    "pos.result as tag",
])

# Preview the first five word/tag pairs.
dfWordTag.limit(5).toPandas()

Unnamed: 0,word,tag
0,Microsoft,NNP
1,Corp,NNP
2,Europe,NNP
3,Deutsche,NNP
4,Bank,NNP


In [20]:
from pyspark.sql.functions import desc

# Count how often each proper noun occurs and print the most frequent ones
# (show() prints the first 20 rows by default).
(
    dfWordTag
    .groupBy("word")
    .count()
    .orderBy(desc("count"))
    .show()
)

+--------+-----+
|    word|count|
+--------+-----+
|      US|   14|
|   Trump|    9|
|   Saudi|    8|
|   Putin|    7|
|  Russia|    6|
|  Europe|    5|
|  Arabia|    5|
|Catholic|    4|
|      UK|    4|
|Vladimir|    4|
|   China|    3|
| Germany|    3|
|    Pope|    3|
|   Egypt|    3|
|   South|    3|
|   House|    3|
|  Church|    3|
|National|    2|
|  Africa|    2|
|   India|    2|
+--------+-----+
only showing top 20 rows

