In [None]:
# Install PySpark and Spark NLP
!pip install -q pyspark==3.3.0  spark-nlp==4.3.2

In [None]:
import sparknlp
from sparknlp.base import DocumentAssembler

# Obtain the SparkSession through Spark NLP's start() helper.
spark = sparknlp.start()

# Two toy rows: an integer id and a short sentence.
columns = ["id", "text"]
data = [(1, "I love working with SparkNLP."), (2, "Today is sunny.")]
df = spark.createDataFrame(data, columns)

# Read the "text" column and write the assembled annotations to "document".
documentAssembler = (
    DocumentAssembler()
    .setInputCol("text")
    .setOutputCol("document")
)

# Show only the new annotation column, without truncating the cell contents.
result = documentAssembler.transform(df)
result.select("document").show(truncate=False)

+-----------------------------------------------------------------------+
|document                                                               |
+-----------------------------------------------------------------------+
|[{document, 0, 28, I love working with SparkNLP., {sentence -> 0}, []}]|
|[{document, 0, 14, Today is sunny., {sentence -> 0}, []}]              |
+-----------------------------------------------------------------------+



In [None]:
# Same assembler configuration as before, but writing to a custom output
# column name ("processed_text").
document_assembler = (
    DocumentAssembler()
    .setInputCol("text")
    .setOutputCol("processed_text")
)

In [None]:

# Apply the assembler to the sample DataFrame; the output keeps the original
# "id" and "text" columns and adds the configured annotation column.
result = document_assembler.transform(df)

# Print every column in full (no truncation of long cell values).
result.show(truncate=False)

+---+-----------------------------+-----------------------------------------------------------------------+
|id |text                         |processed_text                                                         |
+---+-----------------------------+-----------------------------------------------------------------------+
|1  |I love working with SparkNLP.|[{document, 0, 28, I love working with SparkNLP., {sentence -> 0}, []}]|
|2  |Today is sunny.              |[{document, 0, 14, Today is sunny., {sentence -> 0}, []}]              |
+---+-----------------------------+-----------------------------------------------------------------------+



In [None]:
from sparknlp.annotator import Tokenizer
from pyspark.ml import Pipeline

# Stage 1: "text" -> "document" annotations.
document_assembler = (
    DocumentAssembler()
    .setInputCol("text")
    .setOutputCol("document")
)

# Stage 2: "document" -> "token" annotations.
tokenizer = (
    Tokenizer()
    .setInputCols(["document"])
    .setOutputCol("token")
)

# Chain both stages, fit on the sample DataFrame and apply the fitted model.
pipeline = Pipeline(stages=[document_assembler, tokenizer])
model = pipeline.fit(df)
result = model.transform(df)

# "token.result" keeps only the token strings of each row.
result.select("token.result").show(truncate=False)


+-------------------------------------+
|result                               |
+-------------------------------------+
|[I, love, working, with, SparkNLP, .]|
|[Today, is, sunny, .]                |
+-------------------------------------+



Named Entity Recognition with BERT


In [None]:

# One-row DataFrame whose single column is named "text".
example_df = spark.createDataFrame(
    [["Microsoft founder Bill Gates plans to build a new factory in Germany."]]
).toDF("text")

# Fit `pipeline` on the example and replace example_df with the transformed
# frame, so it carries the pipeline's output columns next to "text".
example_df = pipeline.fit(example_df).transform(example_df)

In [None]:
from sparknlp.annotator import Tokenizer, BertForTokenClassification
import pyspark.sql.functions as F
# Pretrained English token classifier; it consumes the "document" and "token"
# columns and writes its predictions to "ner".
bert_tagger = (
    BertForTokenClassification.pretrained("bert_base_token_classifier_conll03", "en")
    .setInputCols(["document", "token"])
    .setOutputCol("ner")
    .setMaxSentenceLength(512)
    .setCaseSensitive(True)
)

bert_base_token_classifier_conll03 download started this may take some time.
Approximate size to download 385.4 MB
[OK!]


In [None]:
# Tag the preprocessed example; adds the "ner" column configured on bert_tagger.
result = bert_tagger.transform(example_df)
# Default show(): long cell values are truncated in the printed table.
result.show()

+--------------------+--------------------+--------------------+--------------------+
|                text|            document|               token|                 ner|
+--------------------+--------------------+--------------------+--------------------+
|Microsoft founder...|[{document, 0, 68...|[{token, 0, 8, Mi...|[{named_entity, 0...|
+--------------------+--------------------+--------------------+--------------------+



In [None]:
# Print one row per token together with its predicted NER tag.
#
# "ner.result" projects the array of annotation structs down to the array of
# tag strings, so `ner_label` holds only the tag (e.g. B-ORG) rather than the
# whole annotation struct with its begin/end offsets and metadata map.
# Assumes the "ner" array is aligned index-by-index with the "token" array
# (same assumption the positional lookup has always relied on) -- TODO confirm.
(
    result
    .select(
        # posexplode yields (position, value) for every element of the token array.
        F.posexplode("token.result").alias("pos", "token"),
        F.col("ner.result").alias("ner_labels"),
    )
    .select(
        F.col("token"),
        # Bracket indexing instead of getItem(Column): passing a Column as the
        # key to getItem() is deprecated since Spark 3.0.
        F.col("ner_labels")[F.col("pos")].alias("ner_label"),
    )
    .show(50, truncate=False)
)

+---------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|token    |ner_label                                                                                                                                                                                                                                                                    |
+---------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|Microsoft|{named_entity, 0, 8, B-ORG, {B-LOC -> 6.298694E-4, I-ORG -> 2.1694727E-4, I-MISC -> 1.0996349E-4, I-LOC -> 1.5734222E-5, I-PER -> 6.565089E-5, 

In [None]:
# Print the column tree of the tagged DataFrame.
result.printSchema()

# Show only the predicted tag strings from the "ner" annotations, untruncated.
result.select("ner.result").show(truncate=False)

root
 |-- text: string (nullable = true)
 |-- document: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- annotatorType: string (nullable = true)
 |    |    |-- begin: integer (nullable = false)
 |    |    |-- end: integer (nullable = false)
 |    |    |-- result: string (nullable = true)
 |    |    |-- metadata: map (nullable = true)
 |    |    |    |-- key: string
 |    |    |    |-- value: string (valueContainsNull = true)
 |    |    |-- embeddings: array (nullable = true)
 |    |    |    |-- element: float (containsNull = false)
 |-- token: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- annotatorType: string (nullable = true)
 |    |    |-- begin: integer (nullable = false)
 |    |    |-- end: integer (nullable = false)
 |    |    |-- result: string (nullable = true)
 |    |    |-- metadata: map (nullable = true)
 |    |    |    |-- key: string
 |    |    |    |-- value: string (valueContainsNull = true)
 

In [None]:
# Display the tagger's parameter map as the cell output, to confirm the
# settings applied above (input/output columns, max sentence length, case sensitivity).
bert_tagger.extractParamMap()

{Param(parent='BERT_FOR_TOKEN_CLASSIFICATION_675a6a750b89', name='batchSize', doc='Size of every batch'): 8,
 Param(parent='BERT_FOR_TOKEN_CLASSIFICATION_675a6a750b89', name='engine', doc='Deep Learning engine used for this model'): 'tensorflow',
 Param(parent='BERT_FOR_TOKEN_CLASSIFICATION_675a6a750b89', name='lazyAnnotator', doc='Whether this AnnotatorModel acts as lazy in RecursivePipelines'): False,
 Param(parent='BERT_FOR_TOKEN_CLASSIFICATION_675a6a750b89', name='maxSentenceLength', doc='Max sentence length to process'): 512,
 Param(parent='BERT_FOR_TOKEN_CLASSIFICATION_675a6a750b89', name='caseSensitive', doc='whether to ignore case in tokens for embeddings matching'): True,
 Param(parent='BERT_FOR_TOKEN_CLASSIFICATION_675a6a750b89', name='inputCols', doc='previous annotations columns, if renamed'): ['document',
  'token'],
 Param(parent='BERT_FOR_TOKEN_CLASSIFICATION_675a6a750b89', name='outputCol', doc='output annotation column. can be left default.'): 'ner'}

In [None]:
from sparknlp.pretrained import PretrainedPipeline

# Sentence to annotate.
text = "IBM, which has an office in Germany, is a leader in AI and NLP."

# Download/load the pretrained English entity-recognition pipeline.
# NOTE(review): this rebinds `pipeline`, which earlier in the notebook is the
# pyspark.ml Pipeline built from document_assembler and tokenizer; re-running
# those earlier cells after this one will see the PretrainedPipeline instead.
pipeline = PretrainedPipeline("recognize_entities_dl", lang="en")

# annotate() on a plain string; the returned object is indexed by output name
# below (`result` is no longer a DataFrame after this line).
result = pipeline.annotate(text)

# Print the extracted entity strings.
print(result["entities"])


recognize_entities_dl download started this may take some time.
Approx size to download 160.1 MB
[OK!]
['IBM', 'Germany', 'AI', 'NLP']
