In [None]:
from pyspark import SparkContext
from pyspark.sql import SparkSession
import re
import json
from pyspark.sql import functions as F
from pyspark.sql.types import StringType,StructField,StructType,ArrayType,MapType,LongType,BooleanType,Row
from google.colab import drive

In [None]:
# Create (or reuse) the Spark session for this notebook and display it.
spark = (
    SparkSession.builder
    .appName('FinQA_preprocessing')
    .getOrCreate()
)
spark

In [None]:
# Mount Google Drive at /content/drive (Colab only).
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
# Input file locations on the mounted Google Drive.
train_path = "/content/drive/MyDrive/FinQA/train1.json"
test_path = "/content/drive/MyDrive/FinQA/test1.json"

In [None]:
# Read the training file as a multi-line JSON document in PERMISSIVE mode.
df = (
    spark.read
    .option("multiline", "true")
    .option("mode", "PERMISSIVE")
    .json(train_path)
)

In [None]:
# df.show(5,truncate=False)
# Number of top-level records read from the training file.
df.count()

2201

In [None]:
df.printSchema()
# Keep a handle on the schema exactly as it was read from the file.
original_schema = df.schema

root
 |-- paragraphs: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- order: long (nullable = true)
 |    |    |-- text: string (nullable = true)
 |    |    |-- uid: string (nullable = true)
 |-- questions: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- answer: string (nullable = true)
 |    |    |-- answer_from: string (nullable = true)
 |    |    |-- answer_type: string (nullable = true)
 |    |    |-- derivation: string (nullable = true)
 |    |    |-- order: long (nullable = true)
 |    |    |-- question: string (nullable = true)
 |    |    |-- rel_paragraphs: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |    |    |-- req_comparison: boolean (nullable = true)
 |    |    |-- scale: string (nullable = true)
 |    |    |-- uid: string (nullable = true)
 |-- table: struct (nullable = true)
 |    |-- table: array (nullable = true)
 |    |    |-- element: array (containsNu

In [None]:
# One output row per element of the `questions` array; the exploded struct goes
# into a new `question` column and the original `questions` column is kept.
questions_df = df.withColumn("question", F.explode("questions"))

In [None]:
# Row count after exploding (one row per question).
questions_df.count()

13213

In [None]:
# Preview the raw `table` struct for the first few rows.
questions_df.select("table").show(5,truncate=False)

+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|table                                                                                                                                                                                                                                                |
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|{[[Weighted average actuarial assumptions used at 31 March1:, , , ], [, 2019, 2018, 2017], [Rate of inflation2, 2.9, 2.9, 3.0], [Rate of increase in salaries, 2.7, 2.7, 2.6], [Discount rate, 2.3, 2.5, 2.6]], e78f8b29-6085-43de-b32f-be1a68641be3}|
|{[[Weig

In [None]:
# Preview the exploded rows with all columns, untruncated.
questions_df.show(10,truncate=False)

+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [None]:
@F.udf(StringType())
def clean_broken_words(text, paragraphs):
    """
    Repair words in ``text`` that were split by an ellipsis character ('…').

    A word counts as broken when it has word characters on both sides of an
    ellipsis (e.g. "answe…r"). When at least one such word is present, every
    ellipsis is removed from the text and whitespace is normalised to single
    spaces. Each broken word that, after removing the ellipsis, is still not a
    whitespace-delimited word of any reference paragraph is replaced by the
    first paragraph word that contains it as a substring. Words that were not
    broken are never replaced.

    Parameters
    ----------
    text : str
        The input text with potential broken words.
    paragraphs : list
        Reference paragraphs; each entry is indexed with ``['text']``.
        Null entries and entries without a string ``text`` are ignored.

    Returns
    -------
    str
        The cleaned text, or ``text`` unchanged when it is null / not a
        string, when ``paragraphs`` is null / not a list, or when no broken
        word is found.

    Example
    -------
    Input: text = "The answe…r lies in the para…graphs."
           paragraphs = [{"text": "The answer lies in the paragraphs."}]
    Output: "The answer lies in the paragraphs."
    """
    # Handle null or non-string values for 'text'
    if text is None or not isinstance(text, str):
        return text

    # Handle null or non-list values for 'paragraphs'
    if paragraphs is None or not isinstance(paragraphs, list):
        return text

    # A broken word has word characters on both sides of an ellipsis.
    ellipsis_pattern = re.compile(r'\b\w*\…\w*\b')
    if not ellipsis_pattern.search(text):
        return text

    # Collect the reference vocabulary once, skipping unusable paragraphs
    # instead of failing the whole row.
    paragraph_words = []
    for para in paragraphs:
        try:
            para_text = para['text']
        except (KeyError, IndexError, TypeError, ValueError):
            continue
        if isinstance(para_text, str):
            paragraph_words.extend(para_text.split())
    known_words = set(paragraph_words)

    repaired_words = []
    for token in text.split():
        was_broken = ellipsis_pattern.search(token) is not None
        word = re.sub(r'\…+', '', token)
        if not word:
            # The token consisted only of ellipses; nothing is left of it.
            continue
        # Only tokens that really contained a mid-word ellipsis may be swapped
        # for a paragraph word; intact words such as "is" or "a" must not be
        # replaced just because they are a substring of some paragraph word.
        if was_broken and word not in known_words:
            word = next(
                (candidate for candidate in paragraph_words if word in candidate),
                word,
            )
        repaired_words.append(word)

    # Join the cleaned words back into the sentence
    return ' '.join(repaired_words)

In [None]:
# Register the UDF under a SQL name so it can be called from SQL expression strings.
spark.udf.register("clean_broken_words_udf", clean_broken_words)

<pyspark.sql.udf.UserDefinedFunction at 0x7bf31e013eb0>

In [None]:
# Replace only the nested `question.question` string with its cleaned version;
# the other fields of the `question` struct are left as they are.
df_cleaned = questions_df.withColumn(
    "question",  # the exploded per-row question struct
    F.col("question").withField(  # overwrite one field inside that struct
        "question",
        F.expr('clean_broken_words_udf(question.question,paragraphs)')
    )
)

In [None]:
# Drop the original `questions` array, then rename the cleaned `question` struct to `questions`.
df_clean_1=df_cleaned.drop("questions").withColumnRenamed("question","questions")

In [None]:
# Preview the cleaned rows, untruncated.
df_clean_1.show(5,truncate=False)

+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------------------------------------------------------------------------------------------------

In [None]:
# `questions` is a single struct per row at this point, not an array.
df_clean_1.printSchema()

root
 |-- paragraphs: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- order: long (nullable = true)
 |    |    |-- text: string (nullable = true)
 |    |    |-- uid: string (nullable = true)
 |-- table: struct (nullable = true)
 |    |-- table: array (nullable = true)
 |    |    |-- element: array (containsNull = true)
 |    |    |    |-- element: string (containsNull = true)
 |    |-- uid: string (nullable = true)
 |-- questions: struct (nullable = true)
 |    |-- answer: string (nullable = true)
 |    |-- answer_from: string (nullable = true)
 |    |-- answer_type: string (nullable = true)
 |    |-- derivation: string (nullable = true)
 |    |-- order: long (nullable = true)
 |    |-- question: string (nullable = true)
 |    |-- rel_paragraphs: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- req_comparison: boolean (nullable = true)
 |    |-- scale: string (nullable = true)
 |    |-- uid: string (nullable =

In [None]:
# Struct type of one element of the original `questions` array column
# (assumes `original_schema` still describes `questions` as an array of structs).
questions_schema = [
    schema_field
    for schema_field in original_schema.fields
    if schema_field.name == "questions"
][0].dataType.elementType

# Wrap each row's single question struct back into a one-element array with the
# original field types, then select every top-level column in the original
# order, cast to its original type.
df_final_structured = (
    df_clean_1
    .withColumn(
        "questions",
        F.array(
            F.struct(*[
                F.col("questions." + q_field.name).cast(q_field.dataType).alias(q_field.name)
                for q_field in questions_schema.fields
            ])
        ),
    )
    .select(*[
        F.col(top_field.name).cast(top_field.dataType)
        for top_field in original_schema.fields
    ])
)



In [None]:
# Serialise every row to a JSON object string and gather all of them on the
# driver as a Python list (the whole dataset is held in driver memory).
json_data = (
    df_final_structured
    .select(F.to_json(F.struct(*df_final_structured.columns)).alias("json"))
    .agg(F.collect_list("json"))
    .first()[0]
)

# Write the rows as one JSON array. The encoding is pinned to UTF-8 so that
# non-ASCII characters in the text are written the same way on every platform
# instead of depending on the locale's default encoding.
with open("cleaned_train_data.json", "w", encoding="utf-8") as f:
    f.write("[" + ",".join(json_data) + "]")


In [None]:
# Load the cleaned JSON file (relative path, multi-line JSON, PERMISSIVE mode).
cleaned_data = spark.read.option("multiline", "true").option("mode", "PERMISSIVE").json("cleaned_train_data.json")

In [None]:
# Number of rows in the reloaded data (this is a row count, not a file size).
cleaned_data.count()

13213

In [None]:
# Inspect the schema Spark inferred for the reloaded file.
cleaned_data.printSchema()

root
 |-- paragraphs: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- order: long (nullable = true)
 |    |    |-- text: string (nullable = true)
 |    |    |-- uid: string (nullable = true)
 |-- questions: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- answer: string (nullable = true)
 |    |    |-- answer_from: string (nullable = true)
 |    |    |-- answer_type: string (nullable = true)
 |    |    |-- derivation: string (nullable = true)
 |    |    |-- order: long (nullable = true)
 |    |    |-- question: string (nullable = true)
 |    |    |-- rel_paragraphs: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |    |    |-- req_comparison: boolean (nullable = true)
 |    |    |-- scale: string (nullable = true)
 |    |    |-- uid: string (nullable = true)
 |-- table: struct (nullable = true)
 |    |-- table: array (nullable = true)
 |    |    |-- element: array (containsNu

In [None]:
@F.udf(StringType())
def remove_space(text_in):
    '''Collapse every run of whitespace in the input to a single space and trim both ends.

    Returns None when the input is null, so null column values pass through
    instead of raising inside the UDF.
    '''
    if text_in is None:
        return None
    return " ".join(text_in.split())

# Register the UDF under a SQL name so it can be called from SQL expression strings.
spark.udf.register("remove_space_udf", remove_space)

@F.udf(StringType())
def table_row_to_text(header, rows):
    '''
    Convert table rows to text using the provided header.

    For every non-empty cell a sentence of the form
    "the <column header> of <first cell of the row> is <cell> ;" is produced.
    Cells are paired with headers by position, so repeated header names do not
    overwrite each other. The sentences are joined with single spaces.

    Parameters:
        header: list of column header strings.
        rows: list of rows, each a list of cell strings.

    Returns:
        The combined text, or "" when header or rows is null/empty.
    '''
    if not header or not rows:
        return ""

    sentences = []
    for row in rows:
        if not row:
            # Null or empty row: nothing to describe.
            continue
        row_label = row[0]
        for head, cell in zip(header, row):
            if cell:
                sentences.append(f"the {head} of {row_label} is {cell} ;")

    # Whitespace is normalised in plain Python here: a UDF body handles ordinary
    # Python values, so Spark column expressions such as F.expr cannot be used
    # to post-process the string.
    return " ".join(" ".join(sentences).split())



In [None]:
# Register the UDF under a SQL name so it can be called from SQL expression strings.
spark.udf.register("table_row_to_text_udf", table_row_to_text)

<pyspark.sql.udf.UserDefinedFunction at 0x7bf33dfe3910>

In [None]:
# Preview the reloaded rows, untruncated.
cleaned_data.show(5,truncate=False)

+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------------------------------------------

In [None]:
# Build one (context, question, answer) record per question of a row.
@F.udf(ArrayType(StructType([
    StructField("context", StringType(), True),
    StructField("question", StringType(), True),
    StructField("answer", StringType(), True),
])))
def process_questions(questions, context):
    """
    Pair every question of a row with the row's context.

    Parameters:
        questions: iterable of question items exposing `question` and `answer`
            attributes; a null or empty value yields an empty result.
        context: value copied unchanged into every output record.

    Returns:
        A list of dicts with keys 'context', 'question' and 'answer'. A list
        answer is joined with " | "; any other answer value is passed through.
        A falsy item, or one lacking an attribute, contributes "" for the
        question and an empty list (joined to "") for the answer.
    """
    records = []
    for entry in questions or []:
        if entry:
            question_text = getattr(entry, 'question', "")
            raw_answer = getattr(entry, 'answer', [])
        else:
            question_text, raw_answer = "", []

        # Lists are flattened to one string; anything else is kept as it is.
        answers = " | ".join(raw_answer) if isinstance(raw_answer, list) else raw_answer

        records.append(dict(context=context, question=question_text, answer=answers))

    return records


In [None]:
@F.udf(StringType())
def process_table(table):
    """
    Turn a table (list of rows, each a list of cell strings) into sentences.

    The row at index 1 is used as the header and the rows from index 2 onwards
    as data. For each data row, every non-empty cell after the first column
    yields "the <column header> of <first cell of the row> is <cell> ; ".
    Cells are paired with headers by position, so repeated header cells (for
    example several empty strings) neither overwrite each other nor change
    which cell is used as the row label.

    Parameters:
        table: list of rows, or null.

    Returns:
        The concatenated sentences with surrounding whitespace stripped, or ""
        when the table is null, not a list, has fewer than two rows, has an
        empty header, or produces no sentence.
    """
    if not (table and isinstance(table, list)):
        return ""
    if len(table) <= 1:
        return ""

    # NOTE(review): the row at index 0 is never read. The sample `table` output
    # in this notebook shows a title-like row at index 0 and the year header at
    # index 1 - confirm that layout holds for every table.
    header = table[1]
    rows = table[2:]
    if not header:
        return ""

    parts = []
    for row in rows:
        if not row:
            # Null or empty row: nothing to describe.
            continue
        row_header = row[0]  # first column identifies the row
        # Skip the first column so the row label is not repeated as a value.
        for head, cell in zip(header[1:], row[1:]):
            if cell:
                parts.append(f"the {head} of {row_header} is {cell} ; ")

    return "".join(parts).strip()



In [None]:
# Register both UDFs under SQL names so they can be called from SQL expression strings.
spark.udf.register("process_table_udf", process_table)
spark.udf.register("process_questions_udf", process_questions)

<pyspark.sql.udf.UserDefinedFunction at 0x7bf31e010250>

In [None]:
# Join the `text` of every paragraph, separated by single spaces, into one string column.
cleaned_data_1 = cleaned_data.withColumn(
    "context_paragraphs",
    F.expr("concat_ws(' ', transform(paragraphs, x -> x.text))")
)

In [None]:
# cleaned_data_1.show(5,truncate=False)

In [None]:
# Expose the nested `table.table` array as a top-level column.
cleaned_data_1 = cleaned_data_1.withColumn(
    "table_data",F.col("table.table")
)

In [None]:
# Convert the table rows to text through the registered `process_table_udf`.
cleaned_data_1 = cleaned_data_1.withColumn(
    "context_table",
    F.expr("process_table_udf(table_data)")
)

In [None]:
# cleaned_data_1.show(truncate=False)

In [None]:
# Full context = paragraph text followed by the table text, separated by a space.
cleaned_data_1 = cleaned_data_1.withColumn(
    "context",
    F.concat_ws(" ", F.col("context_paragraphs"), F.col("context_table"))
)

In [None]:
# Build an array of (context, question, answer) structs per row via the registered UDF.
cleaned_data_1 = cleaned_data_1.withColumn("data", F.expr('process_questions_udf(questions,context)'))

In [None]:
# cleaned_data_1.show(5,truncate=False)

In [None]:
# Flatten the `data` array: one row per (context, question, answer) struct.
df_exploded = cleaned_data_1.select(F.explode("data").alias("entry"))

# Promote the three struct fields to top-level columns of the same names.
final_df = df_exploded.select(
    *[
        F.col("entry." + field_name).alias(field_name)
        for field_name in ("context", "question", "answer")
    ]
)

In [None]:
# Preview the flattened (context, question, answer) rows, untruncated.
final_df.show(5,truncate=False)

+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------------------------------------------------------------+--------------------------------------------------------------------+
|context        

In [None]:
# Bring the three columns to the driver as a list of plain dicts, one per row.
data = (
    final_df
    .select("context", "question", "answer")
    .toPandas()
    .to_dict(orient="records")
)

# Destination of the exported dataset on the mounted Drive.
output_path = "/content/drive/MyDrive/FinQA/clean.json"

# Dump the records as an indented JSON array.
with open(output_path, "w") as json_file:
    json.dump(data, json_file, indent=4)