# Running Question-Answering with Spark NLP

This notebook shows how to use the question-answering annotator in Spark NLP.

We use the fine-tuned model from [my repo](https://huggingface.co/ngnigel99/RoBERTa_QA-FineTuned/tree/main) to answer questions about a given context.



## Setting up the Spark environment/context

In [1]:
import sparknlp
# start a Spark session with GPU support, 16 GB of memory, and Spark NLP logs written to the "logs" folder
spark = sparknlp.start(gpu=True,
                       memory="16G",
                       log_folder="logs")

spark.sparkContext.setLogLevel("OFF")


23/11/11 19:13:46 WARN Utils: Your hostname, NIGEL-DESKTOP resolves to a loopback address: 127.0.1.1; using 172.27.196.66 instead (on interface eth0)
23/11/11 19:13:46 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
:: loading settings :: url = jar:file:/home/nigel/miniconda3/envs/sparknlp/lib/python3.8/site-packages/pyspark/jars/ivy-2.5.0.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/nigel/.ivy2/cache
The jars for the packages stored in: /home/nigel/.ivy2/jars
com.johnsnowlabs.nlp#spark-nlp-gpu_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-0e07e262-6422-4ce3-a24f-acd43c99e1b9;1.0
	confs: [default]
	found com.johnsnowlabs.nlp#spark-nlp-gpu_2.12;5.1.4 in central
	found com.typesafe#config;1.4.2 in central
	found org.rocksdb#rocksdbjni;6.29.5 in central
	found com.amazonaws#aws-java-sdk-bundle;1.11.828 in central
	found com.github.universal-automata#liblevenshtein;3.0.0 in central
	found com.google.protobuf#protobuf-java-util;3.0.0-beta-3 in central
	found com.google.protobuf#protobuf-java;3.0.0-beta-3 in central
	found com.google.code.gson#gson;2.3 in central
	found it.unimi.dsi#fastutil;7.0.12 in central
	found org.projectlombok#lombok;1.16.8 in central
	found com.google.cloud#google-cloud-storage;2.20.1 in central
	found com.google.guava#guava;31.1-jre in central
	found com.google.guava#

23/11/11 19:13:48 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


## Setting up the Spark NLP pipeline

In [2]:
from sparknlp import *
from sparknlp.annotator import *
from sparknlp.base import *

from pyspark.sql.functions import udf, col
from pyspark.sql.types import StringType

MODEL_NAME = "ngnigel99/RoBERTa_QA-FineTuned"

Document_Assembler = MultiDocumentAssembler()\
     .setInputCols(["question", "context"])\
     .setOutputCols(["document_question", "document_context"])

Question_Answering = RoBertaForQuestionAnswering.load("./{}_spark_nlp".format(MODEL_NAME))\
     .setInputCols(["document_question", "document_context"])\
     .setOutputCol("answer")\
     .setCaseSensitive(True)

pipeline = Pipeline(stages=[Document_Assembler, Question_Answering])

data = spark.createDataFrame([["What's my name?","My name is Clara and I live in Berkeley."]]).toDF("question", "context")

result = pipeline.fit(data).transform(data)

# Show the result
result.select("answer.result").show(truncate=False)

2023-11-11 19:14:12.846647: I external/org_tensorflow/tensorflow/core/platform/cpu_feature_guard.cc:151] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 AVX512F FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2023-11-11 19:14:13.019247: I external/org_tensorflow/tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:925] could not open file to read NUMA node: /sys/bus/pci/devices/0000:01:00.0/numa_node
Your kernel may have been built without NUMA support.
2023-11-11 19:14:13.042996: I external/org_tensorflow/tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:925] could not open file to read NUMA node: /sys/bus/pci/devices/0000:01:00.0/numa_node
Your kernel may have been built without NUMA support.
2023-11-11 19:14:13.043275: I external/org_tensorflow/tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:925] could not open f

+-------+
|result |
+-------+
|[Clara]|
+-------+



2023-11-11 19:14:20.678736: I external/org_tensorflow/tensorflow/stream_executor/cuda/cuda_blas.cc:1774] TensorFloat-32 will be used for the matrix multiplication. This will only be logged once.
                                                                                

## Running a single test on a (question, context) pair

In [4]:
# Define a UDF to convert the result list to a string
def list_to_string(items):
    # `items` avoids shadowing the built-in `list`; empty or null entries are skipped
    return ''.join(x for x in (items or []) if x)

list_to_string_udf = udf(list_to_string, StringType())

def ask(question, context):
    data = spark.createDataFrame([[question, context]]).toDF("question", "context")
    result = pipeline.fit(data).transform(data)
    result = result.withColumn("answer", list_to_string_udf(col("answer.result")))  # we don't want a list
    # show() prints the table and returns None, so there is nothing to return
    result.select("answer").show(truncate=False)

ask("What did i eat?","My name is Clara and I live in Berkeley. I had two sandwiches for lunch.")

+----------+
|answer    |
+----------+
|sandwiches|
+----------+



## Running multiple tests on (question, context) pairs

In [5]:
texts = [
    "The human brain is an incredibly complex organ that is responsible for \
     controlling all of our bodily functions, as well as our thoughts, \
     emotions, and behaviors. It is made up of approximately 100 billion \
     neurons.",

    "One of the key features of the human brain is its ability to change and \
     adapt in response to experiences, a process known as neuroplasticity. \
     This means that the brain can reorganize itself, forming new connections \
     between neurons and even generating new neurons in certain areas, \
     based on the demands placed upon it. Neuroplasticity is what allows us \
     to learn and develop new skills throughout our lives, and it is also \
     what enables us to recover from injuries and diseases that damage the \
     brain.",

    "Winston Churchill, one of the most iconic figures in modern history, \
     was born in the city of Oxford, England on November 30, 1874. \
     His birthplace was a grand home known as Blenheim Palace, which is \
     located in the village of Woodstock, just a few miles outside of Oxford.",

    "France is a country located in Western Europe. Its capital is Paris."
           ]

question = [
    "How many neurons are there in the human brain?",
    "What is neuroplasticity?",
    "What is the birthplace of Winston Churchill?", 
    "What is the capital of France?"
          ]

In [6]:
data_bank = spark.createDataFrame(zip(question, texts)).toDF("question", "context")

results = pipeline.fit(data_bank).transform(data_bank)


In [7]:
results = results.withColumn("answer", list_to_string_udf(col("answer.result")))  # we don't want a list
results.select("answer").show(truncate=False)


+-------------------------------+
|answer                         |
+-------------------------------+
|100 billion                    |
|the brain can reorganize itself|
|Blenheim Palace                |
|Paris                          |
+-------------------------------+



## Loading the test data

In [8]:
# reading from dev-v1.1.json
import json
with open("./dev-v1.1.json") as f:  # close the file once it has been parsed
    data = json.load(f)
data

{'data': [{'title': 'Super_Bowl_50',
   'paragraphs': [{'context': 'Super Bowl 50 was an American football game to determine the champion of the National Football League (NFL) for the 2015 season. The American Football Conference (AFC) champion Denver Broncos defeated the National Football Conference (NFC) champion Carolina Panthers 24–10 to earn their third Super Bowl title. The game was played on February 7, 2016, at Levi\'s Stadium in the San Francisco Bay Area at Santa Clara, California. As this was the 50th Super Bowl, the league emphasized the "golden anniversary" with various gold-themed initiatives, as well as temporarily suspending the tradition of naming each Super Bowl game with Roman numerals (under which the game would have been known as "Super Bowl L"), so that the logo could prominently feature the Arabic numerals 50.',
     'qas': [{'answers': [{'answer_start': 177, 'text': 'Denver Broncos'},
        {'answer_start': 177, 'text': 'Denver Broncos'},
        {'answer_star

## Running the question-answering model

In [9]:
# packaging the data into a dataframe
questions = []
contexts = []
ids = []

for data_item in data["data"]:
    for paragraph in data_item["paragraphs"]:
        for qa in paragraph["qas"]:
            questions.append(qa["question"])
            contexts.append(paragraph["context"])
            ids.append(qa["id"])

test_bank = spark.createDataFrame(zip(questions, contexts, ids)).toDF("question", "context", "id")


In [27]:
test_bank.count()

10570

In [26]:
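# applying the pipeline to the test data
# note: show() only displays the first 20 rows, so the time printed below
# is unlikely to cover inference over all 10,570 examples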
import time
s_time = time.time()

a = pipeline.fit(test_bank).transform(test_bank)
a.show(truncate=True)
print("Time taken: {} seconds".format(time.time() - s_time))


+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|            question|             context|                  id|   document_question|    document_context|              answer|
+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|Which NFL team re...|Super Bowl 50 was...|56be4db0acb800140...|[{document, 0, 51...|[{document, 0, 77...|[{chunk, 0, 13, D...|
|Which NFL team re...|Super Bowl 50 was...|56be4db0acb800140...|[{document, 0, 51...|[{document, 0, 77...|[{chunk, 0, 13, D...|
|Where did Super B...|Super Bowl 50 was...|56be4db0acb800140...|[{document, 0, 34...|[{document, 0, 77...|[{chunk, 0, 23, S...|
|Which NFL team wo...|Super Bowl 50 was...|56be4db0acb800140...|[{document, 0, 32...|[{document, 0, 77...|[{chunk, 0, 13, D...|
|What color was us...|Super Bowl 50 was...|56be4db0acb800140...|[{document, 0, 71...|[{document, 0, 77..

                                                                                

In [19]:
a.selectExpr("id", "answer.result as answer").show(truncate=False)


+------------------------+------------------------------+
|id                      |answer                        |
+------------------------+------------------------------+
|56be4db0acb8001400a502ec|[Denver Broncos]              |
|56be4db0acb8001400a502ed|[Denver Broncos]              |
|56be4db0acb8001400a502ee|[Santa Clara , California]    |
|56be4db0acb8001400a502ef|[Denver Broncos]              |
|56be4db0acb8001400a502f0|[gold]                        |
|56be8e613aeaaa14008c90d1|[golden anniversary]          |
|56be8e613aeaaa14008c90d2|[February 7 , 2016]           |
|56be8e613aeaaa14008c90d3|[American Football Conference]|
|56bea9923aeaaa14008c91b9|[golden anniversary]          |
|56bea9923aeaaa14008c91ba|[American Football Conference]|
|56bea9923aeaaa14008c91bb|[February 7 , 2016]           |
|56beace93aeaaa14008c91df|[Denver Broncos]              |
|56beace93aeaaa14008c91e0|[Levi 's Stadium]             |
|56beace93aeaaa14008c91e1|[Santa Clara]                 |
|56beace93aeaa
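
Some answers in the table above come back with tokenizer spacing around punctuation, such as `Santa Clara , California` and `Levi 's Stadium`. If the original surface form matters downstream, a small cleanup step could re-attach the punctuation. The sketch below is a hypothetical helper, not part of Spark NLP, and it only handles the simple cases visible in this output.

```python
import re


def detokenize(answer: str) -> str:
    """Hypothetical helper: re-attach punctuation that was split off by tokenization."""
    # remove whitespace before common trailing punctuation marks
    answer = re.sub(r"\s+([,.;:!?])", r"\1", answer)
    # re-attach a possessive 's to the preceding word
    answer = re.sub(r"\s+('s)\b", r"\1", answer)
    return answer


print(detokenize("Santa Clara , California"))  # Santa Clara, California
print(detokenize("Levi 's Stadium"))           # Levi's Stadium
print(detokenize("February 7 , 2016"))         # February 7, 2016
```

A helper like this could be applied to each answer string before the predictions are saved, but whether it helps depends on how the evaluation script normalizes text.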

                                                                                

## Saving partitioned results to a folder of JSON files

In [10]:
import time
s_time = time.time()

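# `result_test` is not defined in the cells shown above; it is assumed here to be
# the prediction DataFrame `a`, reduced to the id and the list of answer strings
result_test = a.selectExpr("id", "answer.result as answer")

# write to json: Spark writes one file per partition (part-00000, part-00001, ...)
# into the folder in parallel, which saves a lot of time
result_test.select("id", "answer").write.json("./result")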
print("Time taken: {} seconds".format(time.time() - s_time))



Time taken: 420.00258588790894 seconds
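
As a rough worked example, the timing above can be turned into a throughput figure. This assumes that the write triggered inference over all 10,570 rows of `test_bank` and that the measured time also includes writing the JSON files, so it is an end-to-end estimate rather than pure model latency.

```python
# Numbers copied from the notebook output above
num_examples = 10570
elapsed_seconds = 420.00258588790894

# end-to-end throughput and average time per example
throughput = num_examples / elapsed_seconds
ms_per_example = 1000 * elapsed_seconds / num_examples

print("about {:.1f} examples/second".format(throughput))
print("about {:.1f} ms per example".format(ms_per_example))
```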


                                                                                

In [14]:
import json
import glob

# Get a list of all JSON part files; the _SUCCESS marker written by Spark has no
# .json extension, so this pattern already excludes it
json_files = sorted(glob.glob('./result/*.json'))

combined_data = []

for file in json_files:
    with open(file, 'r') as f:
        # each part file holds one JSON record per line
        for line in f:
            combined_data.append(json.loads(line))

# convert preds to "id" : "result" format
preds_dict = {}
for pred in combined_data:
    # convert list result into a single string (tolerating a missing answer field)
    preds_dict[pred["id"]] = "".join(pred.get("answer") or [])

# save to combined.json
with open('combined.json', 'w') as fp:
    json.dump(preds_dict, fp)
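
Before running the evaluation, it can be worth checking that the combined predictions cover every question id. The sketch below uses a hypothetical helper, `check_coverage`, and toy ids for illustration; in this notebook it could be called with the `ids` list and `preds_dict` built earlier.

```python
def check_coverage(expected_ids, preds):
    """Hypothetical helper: compare prediction keys against the expected question ids."""
    expected = set(expected_ids)
    predicted = set(preds)
    missing = sorted(expected - predicted)     # questions with no prediction
    unexpected = sorted(predicted - expected)  # predictions for unknown ids
    empty = sorted(k for k, v in preds.items() if not v)  # empty answer strings
    return missing, unexpected, empty


# toy data for illustration only
toy_ids = ["q1", "q2", "q3"]
toy_preds = {"q1": "Denver Broncos", "q2": "", "q4": "Paris"}

missing, unexpected, empty = check_coverage(toy_ids, toy_preds)
print("missing:", missing)        # ['q3']
print("unexpected:", unexpected)  # ['q4']
print("empty:", empty)            # ['q2']
```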

## Running evaluation on the test set

In [15]:
import os
os.system("python3 evaluate-v2.0.py dev-v1.1.json combined.json")

{
  "exact": 67.90917691579943,
  "f1": 76.6556256880333,
  "total": 10570,
  "HasAns_exact": 67.90917691579943,
  "HasAns_f1": 76.6556256880333,
  "HasAns_total": 10570
}


0
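
To make the `exact` and `f1` numbers easier to interpret, here is a minimal sketch of how SQuAD-style scoring works for a single prediction, as I understand the official evaluation script: answers are lowercased, stripped of punctuation and articles, and then compared either as whole strings (exact match) or as bags of tokens (F1). The function names are my own, and the gold strings are illustrative, taken from the wording of the context rather than from the answer file.

```python
import re
import string
from collections import Counter


def normalize_answer(text):
    """Lowercase, drop punctuation and articles, and collapse whitespace."""
    text = text.lower()
    text = "".join(ch for ch in text if ch not in string.punctuation)
    text = re.sub(r"\b(a|an|the)\b", " ", text)
    return " ".join(text.split())


def exact_match(prediction, gold):
    """1 if the normalized strings are identical, else 0."""
    return int(normalize_answer(prediction) == normalize_answer(gold))


def f1_score(prediction, gold):
    """Token-level F1 between the normalized prediction and gold answer."""
    pred_tokens = normalize_answer(prediction).split()
    gold_tokens = normalize_answer(gold).split()
    if not pred_tokens or not gold_tokens:
        # if either side is empty, F1 is 1 only when both are empty
        return float(pred_tokens == gold_tokens)
    num_same = sum((Counter(pred_tokens) & Counter(gold_tokens)).values())
    if num_same == 0:
        return 0.0
    precision = num_same / len(pred_tokens)
    recall = num_same / len(gold_tokens)
    return 2 * precision * recall / (precision + recall)


# A space before the comma disappears during normalization, so this still matches.
print(exact_match("Santa Clara , California", "Santa Clara, California"))

# A split possessive does not: "levi s stadium" differs from "levis stadium".
print(exact_match("Levi 's Stadium", "Levi's Stadium"))
print(round(f1_score("Levi 's Stadium", "Levi's Stadium"), 2))
```

The dataset-level scores are, to my understanding, the average over all questions of the best score against any of the gold answers, scaled to a percentage.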

In [13]:
spark.stop()  # stop the Spark session