### Libraries

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import *

import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
import os

# The Spark version and the Spark NLP version must be compatible with each other.

In [2]:
from sparknlp.base import DocumentAssembler, Finisher, Pipeline
from sparknlp.annotator import Tokenizer, Normalizer, StopWordsCleaner, PerceptronModel,LemmatizerModel,BertEmbeddings,NerDLModel,WordEmbeddingsModel
#from sparknlp.pretrained import PretrainedPipeline
from pyspark.ml.feature import StopWordsRemover

### Spark Session Builder

In [3]:
# Local Spark session for the NLP pipeline. The Spark NLP jar is resolved
# through spark.jars.packages when the session is created.
spark = (
    SparkSession.builder
    .appName("nlp")
    .master("local[11]")
    .config("spark.driver.memory", "10G")
    .config("spark.driver.maxResultSize", "0")
    .config("spark.kryoserializer.buffer.max", "2000M")
    .config("spark.jars.packages", "com.johnsnowlabs.nlp:spark-nlp_2.12:3.4.4")
    .getOrCreate()
)

# Keep the driver log at WARN and above.
spark.sparkContext.setLogLevel("WARN")

22/06/16 16:09:10 WARN Utils: Your hostname, winware resolves to a loopback address: 127.0.1.1; using 172.30.234.12 instead (on interface eth0)
22/06/16 16:09:10 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


:: loading settings :: url = jar:file:/home/rjac/anaconda3/envs/py-sparknlp/lib/python3.10/site-packages/pyspark/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/rjac/.ivy2/cache
The jars for the packages stored in: /home/rjac/.ivy2/jars
com.johnsnowlabs.nlp#spark-nlp_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-8f989948-d6f9-422b-8eb0-7f2d27520bc3;1.0
	confs: [default]
	found com.johnsnowlabs.nlp#spark-nlp_2.12;3.4.4 in central
	found com.typesafe#config;1.4.2 in central
	found org.rocksdb#rocksdbjni;6.5.3 in central
	found com.amazonaws#aws-java-sdk-bundle;1.11.603 in central
	found com.github.universal-automata#liblevenshtein;3.0.0 in central
	found com.google.code.findbugs#annotations;3.0.1 in central
	found net.jcip#jcip-annotations;1.0 in central
	found com.google.code.findbugs#jsr305;3.0.1 in central
	found com.google.protobuf#protobuf-java-util;3.0.0-beta-3 in central
	found com.google.protobuf#protobuf-java;3.0.0-beta-3 in central
	found com.google.code.gson#gson;2.3 in central
	found it.unimi.dsi#fastutil;7.0.12 in central
	found org.projectlombok#lombok;1

### Data Extraction

In [4]:
# The input location comes from the DATA_PATH environment variable.
# Fail fast with an explicit message when it is missing or empty, instead of
# handing None to spark.read.text and getting an opaque error from Spark.
path = os.getenv("DATA_PATH")
if not path:
    raise EnvironmentError(
        "DATA_PATH environment variable is not set; "
        "it must point to the input text file or directory."
    )

# Each line of the input becomes one row in a single string column, `value`.
df = spark.read.text(path)

In [5]:
# Peek at the first raw lines, one record per block, up to 250 characters each.
df.show(n=5, truncate=250, vertical=True)

-RECORD 0---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 value | walter extra is a german award-winning aerobatic pilot , chief aircraft designer and founder of extra flugzeugbau -lrb- extra aircraft construction -rrb- , a manufacturer of aerobatic aircraft . 
-RECORD 1---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 value | extra was trained as a mechanical engineer .                                                                                                                                                       
-RECORD 2---------------------------------------------------------------------------------------------------------------------------------------------------------------------------

                                                                                

In [6]:
#df = df.selectExpr('*','initCap(value) as cap')

In [7]:
# Pronouns that must NOT be treated as stop words: the gender features computed
# later in this notebook count 'he' / 'she' tokens, so they have to survive
# stop-word removal.
exclude = [
    "i",
    "he", "him", "his", "himself",
    "she", "her", "hers", "herself",
    'they', 'them', 'their', 'theirs', 'themselves',
    "she's", "he's",
]

# Default Spark ML stop-word list minus the pronouns above (original order kept).
stopwords = list(
    filter(lambda word: word not in exclude, StopWordsRemover().getStopWords())
)

### Data Processing

In [8]:
# Wrap the raw `value` text into a Spark NLP `document` annotation
# (cleanup mode "shrink").
document_assambler = (
    DocumentAssembler()
    .setInputCol("value")
    .setOutputCol("document")
    .setCleanupMode("shrink")
)

# Split each document into `token` annotations.
tokenizer = (
    Tokenizer()
    .setInputCols(["document"])
    .setOutputCol("token")
)

In [9]:
#output1
# Alternative kept for reference: normalize the raw tokens instead of the
# stop-word-filtered ones.
#normalizer = Normalizer().setInputCols(["token"]).setOutputCol("normalized").setLowercase(True)

# Remove stop words case-insensitively, using the custom list that keeps the
# gendered pronouns.
stopword_cleaner = (
    StopWordsCleaner()
    .setInputCols(["token"])
    .setOutputCol("cleanTokens")
    .setCaseSensitive(False)
    .setStopWords(stopwords)
)

# Lowercase and clean the remaining tokens.
normalizer = (
    Normalizer()
    .setInputCols(["cleanTokens"])
    .setOutputCol("normalized")
    .setLowercase(True)
)

# Pretrained POS tagger. `pretrained` is called on the class itself, as for the
# lemmatizer below, rather than on a throwaway PerceptronModel() instance.
pos_tagger = (
    PerceptronModel.pretrained("pos_ud_ewt", "en")
    .setInputCols(["document", "normalized"])
    .setOutputCol("pos")
)

# Default pretrained lemmatizer (the download log shows it resolves to
# `lemma_antbnc`). It reads the same `normalized` tokens as the POS tagger.
lemmatizer = (
    LemmatizerModel.pretrained()
    .setInputCols(["normalized"])
    .setOutputCol("lemma")
)

pos_ud_ewt download started this may take some time.
Approximate size to download 2.2 MB
[ | ]pos_ud_ewt download started this may take some time.
Approximate size to download 2.2 MB
[ / ]Download done! Loading the resource.




[ — ]

                                                                                

[OK!]
lemma_antbnc download started this may take some time.
Approximate size to download 907.6 KB
[ | ]lemma_antbnc download started this may take some time.
Approximate size to download 907.6 KB
[ / ]Download done! Loading the resource.
[OK!]


In [10]:
#output2
#pos_tagger = PerceptronModel().pretrained("pos_ud_ewt", 'en').setInputCols(["document","token"]).setOutputCol("pos")

In [11]:

#output3
#embeddings = WordEmbeddingsModel().pretrained().setInputCols("document", "cleanTokens").setOutputCol("embeddings")
#ner_model = NerDLModel.pretrained().setInputCols(["document", "cleanTokens", "embeddings"]).setOutputCol("ner")

In [12]:
# Result: expose the lemma and POS annotations as array columns named
# `tokens` and `pos`, keeping the intermediate annotation columns.
finisher = (
    Finisher()
    .setInputCols(["lemma", "pos"])
    .setOutputCols(["tokens", "pos"])
    .setOutputAsArray(True)
    .setCleanAnnotations(False)
)

In [13]:
# Stage order matters: each stage reads columns produced by an earlier one.
nlp_pipeline = Pipeline(
    stages=[
        document_assambler,  # value       -> document
        tokenizer,           # document    -> token
        stopword_cleaner,    # token       -> cleanTokens
        normalizer,          # cleanTokens -> normalized
        pos_tagger,          # normalized  -> pos
        lemmatizer,          # normalized  -> lemma
        finisher,            # lemma, pos  -> tokens, pos
    ]
)

In [14]:
# Fit the pipeline on the corpus to obtain the model used for the transform.
nlp_model = nlp_pipeline.fit(dataset=df)

In [15]:
# Run every pipeline stage over the corpus.
processed_df = nlp_model.transform(dataset=df)

In [16]:
# Inspect the columns produced by the pipeline stages.
processed_df.printSchema()

root
 |-- value: string (nullable = true)
 |-- document: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- annotatorType: string (nullable = true)
 |    |    |-- begin: integer (nullable = false)
 |    |    |-- end: integer (nullable = false)
 |    |    |-- result: string (nullable = true)
 |    |    |-- metadata: map (nullable = true)
 |    |    |    |-- key: string
 |    |    |    |-- value: string (valueContainsNull = true)
 |    |    |-- embeddings: array (nullable = true)
 |    |    |    |-- element: float (containsNull = false)
 |-- token: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- annotatorType: string (nullable = true)
 |    |    |-- begin: integer (nullable = false)
 |    |    |-- end: integer (nullable = false)
 |    |    |-- result: string (nullable = true)
 |    |    |-- metadata: map (nullable = true)
 |    |    |    |-- key: string
 |    |    |    |-- value: string (valueContainsNull = true)


In [17]:
# Keep only the raw text and the two finished array columns.
tokens = processed_df.select(["value", "tokens", "pos"])

In [18]:
# Preview the finished tokens and POS tags, up to 150 characters per field.
tokens.show(n=5, truncate=150, vertical=True)

-RECORD 0--------------------------------------------------------------------------------------------------------------------------------------------------------
 value  | walter extra is a german award-winning aerobatic pilot , chief aircraft designer and founder of extra flugzeugbau -lrb- extra aircraft construction... 
 tokens | [walter, extra, german, awardwinning, aerobatic, pilot, chief, aircraft, designer, founder, extra, flugzeugbau, lrb, extra, aircraft, construction,... 
 pos    | [NOUN, ADJ, ADJ, VERB, ADJ, NOUN, ADJ, NOUN, NOUN, NOUN, ADJ, NOUN, NOUN, ADJ, NOUN, NOUN, NOUN, NOUN, ADJ, NOUN]                                      
-RECORD 1--------------------------------------------------------------------------------------------------------------------------------------------------------
 value  | extra was trained as a mechanical engineer .                                                                                                           
 tokens | [extra, train, mec

                                                                                

In [19]:
# Derive the per-sentence features in one chained expression. Each step
# appends a single column; `gender` needs its own step because it reads the
# two pronoun counts created before it.
preprocess_ds = (
    tokens
    # Number of tokens equal to 'he'.
    .select('*', F.expr("size(filter(tokens, x -> x='he')) as male_pronoun"))
    # Number of tokens equal to 'she'.
    .select('*', F.expr("size(filter(tokens, x -> x='she')) as female_pronoun"))
    # Tokens whose POS tag is VERB or ADJ (tokens and tags are paired by position).
    .select('*', F.expr("filter(arrays_zip(tokens,pos), x -> x.pos in ('VERB','ADJ')).tokens as pos_tokens"))
    # Signed pronoun balance: positive when 'he' dominates, negative when 'she'
    # does. The +0.001 keeps the denominator non-zero when neither occurs.
    .select('*', F.expr('(male_pronoun-female_pronoun)/((male_pronoun+female_pronoun)+0.001) as gender'))
)


In [20]:
# Preview the derived feature columns before filtering.
preprocess_ds.show(n=5, truncate=150, vertical=True)

-RECORD 0----------------------------------------------------------------------------------------------------------------------------------------------------------------
 value          | walter extra is a german award-winning aerobatic pilot , chief aircraft designer and founder of extra flugzeugbau -lrb- extra aircraft construction... 
 tokens         | [walter, extra, german, awardwinning, aerobatic, pilot, chief, aircraft, designer, founder, extra, flugzeugbau, lrb, extra, aircraft, construction,... 
 pos            | [NOUN, ADJ, ADJ, VERB, ADJ, NOUN, ADJ, NOUN, NOUN, NOUN, ADJ, NOUN, NOUN, ADJ, NOUN, NOUN, NOUN, NOUN, ADJ, NOUN]                                      
 male_pronoun   | 0                                                                                                                                                      
 female_pronoun | 0                                                                                                                                   

                                                                                

In [21]:
# Keep only rows whose pronoun balance leans clearly one way (|gender| > 0.25).
has_clear_gender = F.abs(F.col("gender")) > 0.25
preprocess_ds = preprocess_ds.where(has_clear_gender)

In [22]:
# Preview the rows that survived the gender filter.
preprocess_ds.show(n=5, truncate=150, vertical=True)

-RECORD 0----------------------------------------------------------------------------------------------------------------------------------------------------------------
 value          | he began his flight training in gliders , transitioning to powered aircraft to perform aerobatics .                                                    
 tokens         | [he, begin, he, flight, training, glider, transition, power, aircraft, perform, aerobatic]                                                             
 pos            | [PRON, VERB, PRON, NOUN, NOUN, NOUN, VERB, VERB, NOUN, NOUN, NOUN]                                                                                     
 male_pronoun   | 2                                                                                                                                                      
 female_pronoun | 0                                                                                                                                   

                                                                                

In [24]:
# Remove any previous export so the parquet write below starts from a clean
# directory. Pure-Python replacement for the `!rm -r data` shell escape: it is
# portable and stays silent when the directory does not exist yet.
import shutil

shutil.rmtree("data", ignore_errors=True)

In [25]:
# Persist the raw text, the VERB/ADJ tokens and the gender score as parquet.
# mode("overwrite") makes this cell re-runnable on its own: the default save
# mode raises when the `data` directory already exists.
(
    preprocess_ds
    .select("value", "pos_tokens", "gender")
    .write
    .mode("overwrite")
    .parquet("data")
)

                                                                                