In [1]:
import numpy as np
import pandas as pd
import pyspark
import urllib

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.ml.classification import *
from pyspark.ml.evaluation import *
from pyspark.ml.feature import *

In [2]:
# Start (or reuse) the notebook's Spark session.
spark = (
    SparkSession.builder
    .appName('example')
    .getOrCreate()
)

# Report the version as seen by the session and by its underlying context.
print(f'PySpark Version :{spark.version}')
print(f'PySpark Version :{spark.sparkContext.version}')

PySpark Version :3.3.0
PySpark Version :3.3.0


In [23]:
# `import urllib` alone does not load the `request` submodule; import it
# explicitly so `urllib.request.urlretrieve` is guaranteed to resolve.
import urllib.request

# Source archive for the 20 Newsgroups corpus (UCI ML repository).
URL = "https://archive.ics.uci.edu/ml/machine-learning-databases/20newsgroups-mld/20_newsgroups.tar.gz"

# Download into the working directory; returns (local_path, http_headers).
urllib.request.urlretrieve(URL, './20_newsgroups.tar.gz')

('./20_newsgroups.tar.gz', <http.client.HTTPMessage at 0x408342fb80>)

In [10]:
# binaryFiles reads every file under the given path as (filename, content) pairs;
# here each content is replaced by its length so only names and sizes are collected.

(spark.sparkContext
      .binaryFiles("./")
      .mapValues(len)
      .collect())

[('file:/home/jovyan/work/Iris_example.ipynb', 11758),
 ('file:/home/jovyan/work/Untitled.ipynb', 3531),
 ('file:/home/jovyan/work/iris.csv', 4551),
 ('file:/home/jovyan/work/test.txt', 14)]

In [79]:
import tarfile
from io import BytesIO

def extract_files(data):
    """Yield ``(member_name, content)`` for each regular file in a gzipped tar archive.

    Parameters
    ----------
    data : tuple
        ``(filename, payload)`` pair; ``payload`` holds the raw bytes of a
        ``.tar.gz`` archive. The filename itself is not used.

    Yields
    ------
    tuple
        ``(name, content)`` where ``name`` is the member's path inside the
        archive and ``content`` is the member's raw bytes. Directories and
        other non-regular members are skipped.
    """
    # Avoid naming the payload `bytes`, which would shadow the built-in type.
    _, payload = data
    # Context managers make sure the archive and each member handle are closed,
    # including when the consumer stops iterating early.
    with tarfile.open(fileobj=BytesIO(payload), mode="r:gz") as archive:
        for member in archive:
            if not member.isfile():
                continue
            with archive.extractfile(member) as handle:
                content = handle.read()
            yield (member.name, content)

# Pair RDD of (path inside the archive, file text).
# The archive is read as one binary blob, expanded into its member files,
# and each member's bytes are decoded as latin-1.
data = (
    spark.sparkContext
    .binaryFiles("./20_newsgroups.tar.gz")
    .flatMap(extract_files)
    .mapValues(lambda raw: raw.decode("latin-1"))
)

In [81]:
# Peek at the first ten documents: (path, number of characters).
data.mapValues(len).take(10)

[('20_newsgroups/alt.atheism/53366', 1926),
 ('20_newsgroups/alt.atheism/53367', 2456),
 ('20_newsgroups/alt.atheism/51247', 2144),
 ('20_newsgroups/alt.atheism/51248', 929),
 ('20_newsgroups/alt.atheism/51249', 1976),
 ('20_newsgroups/alt.atheism/51250', 3325),
 ('20_newsgroups/alt.atheism/51251', 1421),
 ('20_newsgroups/alt.atheism/51252', 2310),
 ('20_newsgroups/alt.atheism/51253', 3664),
 ('20_newsgroups/alt.atheism/51254', 3392)]

In [82]:
# The debug string shows the chain of RDDs that `data` is derived from.
# toDebugString() returns bytes, so decode before printing.
lineage = data.toDebugString()
print(lineage.decode('utf8'))

(1) PythonRDD[73] at RDD at PythonRDD.scala:53 []
 |  ./20_newsgroups.tar.gz BinaryFileRDD[71] at binaryFiles at <unknown>:0 []


In [83]:
# Turn the pair RDD into a two-column DataFrame.
# The 'bytes' column holds the already-decoded text of each file.

df = data.toDF(schema=['filename', 'bytes'])

In [84]:
# Display the first ten rows (long cell values are truncated by show()).
df.show(n=10)

+--------------------+--------------------+
|            filename|               bytes|
+--------------------+--------------------+
|20_newsgroups/alt...|Path: cantaloupe....|
|20_newsgroups/alt...|Xref: cantaloupe....|
|20_newsgroups/alt...|Newsgroups: alt.a...|
|20_newsgroups/alt...|Xref: cantaloupe....|
|20_newsgroups/alt...|Path: cantaloupe....|
|20_newsgroups/alt...|Newsgroups: alt.a...|
|20_newsgroups/alt...|Newsgroups: alt.a...|
|20_newsgroups/alt...|Xref: cantaloupe....|
|20_newsgroups/alt...|Newsgroups: alt.a...|
|20_newsgroups/alt...|Path: cantaloupe....|
+--------------------+--------------------+
only showing top 10 rows



# Spark ML on text

## Feature Transformation

### Tokenization

In [94]:
from pyspark.ml.feature import Tokenizer

# Tokenizer reads the text in 'bytes' and writes the resulting word list to 'words'.
tokenizer = Tokenizer(outputCol='words', inputCol='bytes')

# Inspect the first tokenized row.
tokenizer.transform(df).first()

Row(filename='20_newsgroups/alt.atheism/53366', bytes='Path: cantaloupe.srv.cs.cmu.edu!das-news.harvard.edu!noc.near.net!howland.reston.ans.net!usenet.ins.cwru.edu!lerc.nasa.gov!usenet\nFrom: spbach@lerc.nasa.gov (James Felder)\nNewsgroups: alt.atheism\nSubject: Re: "So help you God" in court?\nDate: 16 Apr 1993 13:54:45 GMT\nOrganization: NASA Lewis Resaerch Center\nLines: 35\nDistribution: world\nMessage-ID: <1qmdr5$ang@eagle.lerc.nasa.gov>\nReferences: <93105.013423TAN102@psuvm.psu.edu>\nReply-To: spbach@lerc.nasa.gov\nNNTP-Posting-Host: hopper3.lerc.nasa.gov\n\nIn article 013423TAN102@psuvm.psu.edu, Andrew Newell <TAN102@psuvm.psu.edu> writes:\n->In article <1993Apr9.151914.1885@daffy.cs.wisc.edu>, mccullou@snake2.cs.wisc.edu\n->(Mark McCullough) says:\n->>\n->>In article <monack.733980580@helium> monack@helium.gas.uug.arizona.edu (david\n->>n->>monack) writes:\n->>>Another issue is that by having to request to not be required to\n->>>recite the "so help me God" part of the oath, a

### Word embeddings

In [None]:
from pyspark.ml.feature import Word2Vec

# Word2Vec needs an array-of-strings input column, but 'bytes' holds the raw
# text as a single string. Tokenize first and train on the 'words' column.
# A dedicated Tokenizer is built here so this cell does not depend on the
# current parameter state of the shared `tokenizer` object.
words_df = Tokenizer(inputCol='bytes', outputCol='words').transform(df)

# 30-dimensional embeddings; words seen fewer than 5 times are ignored.
word2vec = Word2Vec(vectorSize=30, minCount=5, inputCol='words', outputCol='embedding')
model = word2vec.fit(words_df)

# Adds an 'embedding' column with one vector per document.
result = model.transform(words_df)

In [None]:
# Show the start of each document next to its embedding.
# Fields are read by column name: the rows carry more than two columns, so
# positional unpacking into (text, vector) would raise ValueError.
for row in result.take(5):
    text = row['bytes']
    vector = row['embedding']
    print(f"Text:   {text[:100]}...")
    print(f"Vector: {vector}")

In [None]:
# Walkthrough of the Tokenizer parameter API. The original cell appears to be
# a documentation example pasted together with its sample outputs and a
# traceback, which is not valid Python; it is rewritten here to run against
# this notebook's `df` and `tokenizer`.
import tempfile

# Change a parameter: the output column is now called "tokens".
# NOTE: this mutates the shared `tokenizer` object for all later cells.
print(tokenizer.setParams(outputCol="tokens").transform(df).head().tokens[:5])

# Temporarily modify a parameter for a single transform() call.
print(tokenizer.transform(df, {tokenizer.outputCol: "words"}).head().words[:5])

# The stored parameter is untouched by the temporary override above.
print(tokenizer.transform(df).head().tokens[:5])

# Must use keyword arguments to specify params; a positional call raises TypeError.
try:
    tokenizer.setParams("text")
except TypeError as err:
    print(f"TypeError: {err}")

# Save the tokenizer, load it back, and check both produce the same tokens.
# A fresh temporary directory stands in for the undefined `temp_path`.
with tempfile.TemporaryDirectory() as temp_path:
    tokenizerPath = temp_path + "/tokenizer"

    tokenizer.save(tokenizerPath)

    loadedTokenizer = Tokenizer.load(tokenizerPath)

    tokens_match = (loadedTokenizer.transform(df).head().tokens
                    == tokenizer.transform(df).head().tokens)

tokens_match

# Spark-NLP

In [None]:
!pip install spark-nlp==4.1.0

In [None]:
# Launches the Scala Spark shell with the Spark-NLP package on its classpath.
# NOTE(review): spark-shell is an interactive REPL; presumably this is meant to be
# run in a terminal rather than a notebook cell — confirm.
# NOTE(review): the `-m1` artifact looks like the Apple-silicon build — confirm it
# matches the machine this notebook runs on.
!spark-shell --packages com.johnsnowlabs.nlp:spark-nlp-m1_2.12:4.1.0

In [None]:
# Launches the PySpark shell with the Spark-NLP package on its classpath.
# NOTE(review): pyspark is an interactive REPL; presumably this is meant to be
# run in a terminal rather than a notebook cell — confirm.
# NOTE(review): the `-m1` artifact looks like the Apple-silicon build — confirm it
# matches the machine this notebook runs on.
!pyspark --packages com.johnsnowlabs.nlp:spark-nlp-m1_2.12:4.1.0

In [None]:
# 'spark.jars.packages' is resolved when the Spark JVM is launched. If a session
# is already running, getOrCreate() hands that session back and the Spark-NLP
# jar is not added, so say so explicitly instead of failing later with
# missing-class errors.
if SparkSession.getActiveSession() is not None:
    print("WARNING: a Spark session is already running, so 'spark.jars.packages' "
          "will not be applied. Restart the kernel and run this cell before "
          "creating any other Spark session.")

# Session configured to pull the Spark-NLP package from Maven on startup.
spark = (SparkSession.builder.appName('example')
         .config('spark.jars.packages', 'com.johnsnowlabs.nlp:spark-nlp-m1_2.12:4.1.0')
         .getOrCreate())

In [None]:
import sparknlp
from sparknlp.pretrained import PretrainedPipeline

# Named-entity recognition with the pretrained English 'recognize_entities_dl' pipeline.
sentence = 'President Biden represented Delaware for 36 years in the U.S. Senate before becoming the 47th Vice President of the United States.'

pipeline = PretrainedPipeline('recognize_entities_dl', lang='en')
result = pipeline.annotate(sentence)

# Print the 'ner' output first, then the 'entities' output.
for key in ('ner', 'entities'):
    print(result[key])

In [None]:
# Same demonstration with the 'onto_recognize_entities_bert_tiny' pretrained pipeline.
sentence = "Johnson first entered politics when elected in 2001 as a member of Parliament. He then served eight years as the mayor of London, from 2008 to 2016, before rejoining Parliament."

pipeline = PretrainedPipeline('onto_recognize_entities_bert_tiny', lang='en')
result = pipeline.annotate(sentence)

# Print the 'ner' output first, then the 'entities' output.
for key in ('ner', 'entities'):
    print(result[key])