### Introduction
This notebook demonstrates a few useful methods for loading a JSON file and handling nested JSON objects. The example file is `test.json` from assignment 2.

In [None]:
!pip install pyspark

!pip install pandas

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
from pyspark.sql import Column
from pyspark.sql.functions import upper
from pyspark.sql.functions import split

In [None]:
from pyspark.sql import SparkSession
spark = SparkSession \
    .builder \
    .appName("COMP5349 A2 Data Loading Example") \
    .getOrCreate()

### Load the JSON file as a data frame

In [None]:
test_data = "s3://comp5349-2022/test.json"
test_init_df = spark.read.json(test_data)

In [None]:
# The original file will be loaded into a data frame with one row and two columns
test_init_df.show(1)

+--------------------+--------+
|                data| version|
+--------------------+--------+
|[{[{Exhibit 10.16...|aok_v1.0|
+--------------------+--------+



### Check the schema of a data frame

`printSchema` is a useful method for displaying the schema of a data frame.

In [None]:
test_init_df.printSchema()

root
 |-- data: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- paragraphs: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- context: string (nullable = true)
 |    |    |    |    |-- qas: array (nullable = true)
 |    |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |    |-- answers: array (nullable = true)
 |    |    |    |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |    |    |    |-- answer_start: long (nullable = true)
 |    |    |    |    |    |    |    |    |-- text: string (nullable = true)
 |    |    |    |    |    |    |-- id: string (nullable = true)
 |    |    |    |    |    |    |-- is_impossible: boolean (nullable = true)
 |    |    |    |    |    |    |-- question: string (nullable = true)
 |    |    |-- title: string (nullable = true)
 |-- version: string (nullable = true)



In [None]:
test_init_df.count()

1

### `select` and `explode`

The [`select`](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.DataFrame.select.html) method selects one or more columns from the source data frame.

The [`explode`](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.functions.explode.html) function expands an array column into multiple rows, one row per array element. The [`alias`](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.Column.alias.html) method gives a name to the column that stores the array element.
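To make the row-multiplying effect concrete, here is a plain-Python sketch (not Spark code) of what `explode(...).alias(...)` does to one record. The helper `explode` and the toy record are hypothetical; only the nesting (`data` → `paragraphs` → `qas`) mirrors `test.json`.

```python
# Plain-Python sketch of Spark's explode + alias on one nested record.
# The helper and the toy values are hypothetical, for illustration only.

def explode(rows, column, alias):
    """Emit one output row per element of row[column].

    Like Spark's explode, a row whose array is None or empty produces no output.
    """
    out = []
    for row in rows:
        for element in row.get(column) or []:
            out.append({alias: element})
    return out

record = {
    "version": "toy_v1",
    "data": [
        {"title": "contract A", "paragraphs": [{"context": "text A", "qas": []}]},
        {"title": "contract B", "paragraphs": [{"context": "text B", "qas": []}]},
    ],
}

# one input row becomes one output row per element of the `data` array
data_rows = explode([record], "data", "data")
print(len(data_rows))                 # 2
print(data_rows[0]["data"]["title"])  # contract A
```

Rows with a null or empty array disappear entirely; Spark's `explode_outer` is the variant that keeps them as a null row.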


In [None]:
from pyspark.sql.functions import explode
test_data_df = test_init_df.select(explode("data").alias("data"))

In [None]:
test_data_df.printSchema()

root
 |-- data: struct (nullable = true)
 |    |-- paragraphs: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- context: string (nullable = true)
 |    |    |    |-- qas: array (nullable = true)
 |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |-- answers: array (nullable = true)
 |    |    |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |    |    |-- answer_start: long (nullable = true)
 |    |    |    |    |    |    |    |-- text: string (nullable = true)
 |    |    |    |    |    |-- id: string (nullable = true)
 |    |    |    |    |    |-- is_impossible: boolean (nullable = true)
 |    |    |    |    |    |-- question: string (nullable = true)
 |    |-- title: string (nullable = true)



In [None]:
test_data_df.count()

102

In [None]:
test_paragraph_df = test_data_df.select(explode("data.paragraphs").alias("paragraph"))

In [None]:
test_paragraph_df.printSchema()

root
 |-- paragraph: struct (nullable = true)
 |    |-- context: string (nullable = true)
 |    |-- qas: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- answers: array (nullable = true)
 |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |-- answer_start: long (nullable = true)
 |    |    |    |    |    |-- text: string (nullable = true)
 |    |    |    |-- id: string (nullable = true)
 |    |    |    |-- is_impossible: boolean (nullable = true)
 |    |    |    |-- question: string (nullable = true)



In [None]:
test_paragraph_df.count()

102

In [None]:
test_paragraph_df.take(1)

[Row(paragraph=Row(context='Exhibit 10.16 SUPPLY CONTRACT Contract No: Date: The buyer/End-User: Shenzhen LOHAS Supply Chain Management Co., Ltd. ADD: Tel No. : Fax No. : The seller: ADD: The Contract is concluded and signed by the Buyer and Seller on , in Hong Kong. 1. General provisions 1.1 This is a framework agreement, the terms and conditions are applied to all purchase orders which signed by this agreement (hereinafter referred to as the "order"). 1.2 If the provisions of the agreement are inconsistent with the order, the order shall prevail. Not stated in order content will be subject to the provisions of agreement. Any modification, supplementary, give up should been written records, only to be valid by buyers and sellers authorized representative signature and confirmation, otherwise will be deemed invalid. 2. The agreement and order 2.1 During the validity term of this agreement, The buyer entrust SHENZHEN YICHANGTAI IMPORT AND EXPORT TRADE CO., LTD or SHENZHEN LEHEYUAN TRADI

In [None]:
# explode the qas array so that each question of each paragraph becomes its own row
test_questions_df = test_paragraph_df.select(explode("paragraph.qas").alias("questions"))

In [None]:
# the count is 41 times the number of paragraphs (102 x 41 = 4182), because each context has 41 questions
test_questions_df.count()

4182
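The count above follows from a simple rule: exploding an array column yields one row per array element, so the total is the sum of the array lengths. A minimal plain-Python sketch with toy data (the constant `QUESTIONS_PER_CONTEXT` is hypothetical; `test.json` uses 41):

```python
# Sketch: exploding an array column yields sum(len(array)) rows.
QUESTIONS_PER_CONTEXT = 3  # hypothetical; test.json has 41 per context

paragraphs = [
    {"context": f"context {i}",
     "qas": [{"question": f"q{k}"} for k in range(QUESTIONS_PER_CONTEXT)]}
    for i in range(3)
]

# one row per question, as produced by explode("paragraph.qas")
question_rows = [qa for p in paragraphs for qa in p["qas"]]

print(len(question_rows))  # 9
# the same relation gives the count for test.json
print(102 * 41)  # 4182
```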

In [None]:
test_questions_df.printSchema()

root
 |-- questions: struct (nullable = true)
 |    |-- answers: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- answer_start: long (nullable = true)
 |    |    |    |-- text: string (nullable = true)
 |    |-- id: string (nullable = true)
 |    |-- is_impossible: boolean (nullable = true)
 |    |-- question: string (nullable = true)



In [None]:
# re-check the structure of the paragraph data frame
test_paragraph_df.printSchema()

root
 |-- paragraph: struct (nullable = true)
 |    |-- context: string (nullable = true)
 |    |-- qas: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- answers: array (nullable = true)
 |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |-- answer_start: long (nullable = true)
 |    |    |    |    |    |-- text: string (nullable = true)
 |    |    |    |-- id: string (nullable = true)
 |    |    |    |-- is_impossible: boolean (nullable = true)
 |    |    |    |-- question: string (nullable = true)



In [None]:
# compute the average number of positive samples (answers) per possible question

# define the functions

# map a [question, ...] row to a [question, 1] pair so rows can be counted per question
def possible_counter(answer):
  return [answer[0],1]

# value = (question, (number_of_answers, number_of_possible_instances))
def count_ave(value):
  ave = value[1][0] // value[1][1]
  return [value[0],ave]

# keep only the possible questions, with their question text and answer start offsets
test_questions_counter_df = test_questions_df.filter("questions.is_impossible == False").select("questions.question","questions.answers.answer_start")
test_questions_counter_df.printSchema()

# count the number of possible samples of each question
test_possible_counter_rdd = test_questions_counter_df.rdd.map(list)
test_possible_counter_rdd = test_possible_counter_rdd.map(possible_counter)
test_possible_counter_rdd = test_possible_counter_rdd.reduceByKey(lambda a,b: a+b)

# count the number of answers of each question
test_answer_counter_df = test_questions_counter_df.select("question",explode("answer_start"))
test_answer_counter_rdd = test_answer_counter_df.rdd.map(list)
test_answer_counter_rdd = test_answer_counter_rdd.map(possible_counter)
test_answer_counter_rdd = test_answer_counter_rdd.reduceByKey(lambda a,b: a+b)

# join the answer counts with the instance counts and compute the average number of positive samples per question
test_ave_possible_rdd = test_answer_counter_rdd.join(test_possible_counter_rdd)
test_ave_possible_rdd = test_ave_possible_rdd.map(count_ave)

# transform the results into dict 
test_ave_possible_dict = test_ave_possible_rdd.collectAsMap()

test_ave_possible_dict

root
 |-- question: string (nullable = true)
 |-- answer_start: array (nullable = true)
 |    |-- element: long (containsNull = true)



{'Highlight the parts (if any) of this contract related to "Affiliate License-Licensee" that should be reviewed by a lawyer. Details: Does the contract contain a license grant to a licensee (incl. sublicensor) and the affiliates of such licensee/sublicensor?': 2,
 'Highlight the parts (if any) of this contract related to "Affiliate License-Licensor" that should be reviewed by a lawyer. Details: Does the contract contain a license grant by affiliates of the licensor or that includes intellectual property of affiliates of the licensor?\xa0': 3,
 'Highlight the parts (if any) of this contract related to "Agreement Date" that should be reviewed by a lawyer. Details: The date of the contract': 1,
 'Highlight the parts (if any) of this contract related to "Anti-Assignment" that should be reviewed by a lawyer. Details: Is consent or notice required of a party if the contract is assigned to a third party?': 1,
 'Highlight the parts (if any) of this contract related to "Audit Rights" that shoul
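The counting pipeline above (two `reduceByKey` counts, a `join`, then an integer division) can be sketched in plain Python with `collections.Counter`. The rows below are invented for illustration; each tuple stands for one non-impossible question instance with its list of answer start offsets.

```python
from collections import Counter

# Plain-Python sketch of the reduceByKey/join pipeline; the data is made up.
possible_rows = [
    ("Agreement Date", [10]),
    ("Agreement Date", [250]),
    ("Audit Rights", [40, 900, 1500]),
    ("Audit Rights", [75]),
]

# number of possible (non-impossible) instances per question
possible_count = Counter(q for q, _ in possible_rows)
# number of answers per question (one per exploded answer_start)
answer_count = Counter(q for q, starts in possible_rows for _ in starts)

# join on the question and take the integer average, as count_ave does
average_positive = {q: answer_count[q] // possible_count[q] for q in possible_count}
print(average_positive)  # {'Agreement Date': 1, 'Audit Rights': 2}
```

For non-negative integers, `a // b` gives the same result as `int(a / b)`.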

In [None]:
# define the functions

# describe the answer span(s) of each question, for both possible and impossible questions
# answer = [answer_start list, text list, is_impossible, question, context]
def define_context_answer(answer):
  result = []
  store = []
  if answer[2]:
    # impossible question: no span and zero answers
    result.append([answer[4],[0,0],answer[3],0])
  else:
    i = len(answer[0])
    for j in range(i):
      # the end offset is exclusive: start + length of the answer text
      end = answer[0][j] + len(answer[1][j])
      store.append([answer[0][j],end])

    result.append([answer[4],store,answer[3],i])

  return result


test_context_answer_df = test_paragraph_df.select(explode("paragraph.qas").alias("questions"),"paragraph.context")
test_context_answer_df.printSchema()
print(test_context_answer_df.count())

test_context_answer_df = test_context_answer_df.select("questions.answers.answer_start","questions.answers.text","questions.is_impossible","questions.question","context")
test_context_answer_df.printSchema()
print(test_context_answer_df.count())

# transform the df into rdd
test_context_answer_rdd= test_context_answer_df.rdd.map(list)

# transform each row into [context, [[s, e], ...], question, number_of_answers]
test_context_answer_rdd = test_context_answer_rdd.flatMap(define_context_answer)
test_context_answer_rdd.take(5)

root
 |-- questions: struct (nullable = true)
 |    |-- answers: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- answer_start: long (nullable = true)
 |    |    |    |-- text: string (nullable = true)
 |    |-- id: string (nullable = true)
 |    |-- is_impossible: boolean (nullable = true)
 |    |-- question: string (nullable = true)
 |-- context: string (nullable = true)

4182
root
 |-- answer_start: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- text: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- is_impossible: boolean (nullable = true)
 |-- question: string (nullable = true)
 |-- context: string (nullable = true)

4182


[['Exhibit 10.16 SUPPLY CONTRACT Contract No: Date: The buyer/End-User: Shenzhen LOHAS Supply Chain Management Co., Ltd. ADD: Tel No. : Fax No. : The seller: ADD: The Contract is concluded and signed by the Buyer and Seller on , in Hong Kong. 1. General provisions 1.1 This is a framework agreement, the terms and conditions are applied to all purchase orders which signed by this agreement (hereinafter referred to as the "order"). 1.2 If the provisions of the agreement are inconsistent with the order, the order shall prevail. Not stated in order content will be subject to the provisions of agreement. Any modification, supplementary, give up should been written records, only to be valid by buyers and sellers authorized representative signature and confirmation, otherwise will be deemed invalid. 2. The agreement and order 2.1 During the validity term of this agreement, The buyer entrust SHENZHEN YICHANGTAI IMPORT AND EXPORT TRADE CO., LTD or SHENZHEN LEHEYUAN TRADING CO, LTD (hereinafter r
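The end offset stored by `define_context_answer` is `answer_start + len(text)`, an exclusive end, so slicing the context with `[start:end]` recovers the answer text exactly. A tiny sketch with a made-up context (in `test.json` the start offsets are given, not searched for):

```python
# Sketch of the span arithmetic in define_context_answer (toy context).
context = "This Agreement is dated 1 January 2020 and governed by NSW law."
answer_text = "1 January 2020"
answer_start = context.find(answer_text)  # test.json supplies this offset directly

answer_end = answer_start + len(answer_text)  # exclusive end
print(answer_start, answer_end)           # 24 38
print(context[answer_start:answer_end])   # 1 January 2020
```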

In [None]:
# split a context into segments of up to 4096 characters, starting a new segment every 2048 characters
def split_context_segment(input):
  context = input[0]
  segments_result = []
  result = []
  segment = []
  start = 0
  end = 4096
  while(end<len(context)):
    segment = context[start:end]
    segments_result.append([segment,start,end])
    start = start + 2048
    end = start + 4096
  if start != len(context):
    segment = context[start:]
    segments_result.append([segment,start,len(context)])
  result.append(segments_result)
  result.append(input[1])
  result.append(input[2])
  result.append(input[3])
  return result

# split every context into 4096-character segments (consecutive segments overlap by 2048 characters) and record the start and end offset of each segment
test_segment_answer_rdd = test_context_answer_rdd.map(split_context_segment)
test_segment_answer_rdd.take(2)

[[[['Exhibit 10.16 SUPPLY CONTRACT Contract No: Date: The buyer/End-User: Shenzhen LOHAS Supply Chain Management Co., Ltd. ADD: Tel No. : Fax No. : The seller: ADD: The Contract is concluded and signed by the Buyer and Seller on , in Hong Kong. 1. General provisions 1.1 This is a framework agreement, the terms and conditions are applied to all purchase orders which signed by this agreement (hereinafter referred to as the "order"). 1.2 If the provisions of the agreement are inconsistent with the order, the order shall prevail. Not stated in order content will be subject to the provisions of agreement. Any modification, supplementary, give up should been written records, only to be valid by buyers and sellers authorized representative signature and confirmation, otherwise will be deemed invalid. 2. The agreement and order 2.1 During the validity term of this agreement, The buyer entrust SHENZHEN YICHANGTAI IMPORT AND EXPORT TRADE CO., LTD or SHENZHEN LEHEYUAN TRADING CO, LTD (hereinafter
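The sliding window in `split_context_segment` is easier to follow with a small window. The sketch below mirrors its loop with the window and stride as parameters (the name `sliding_segments` and the small values are hypothetical); with `window=4096, stride=2048` it reproduces the offsets used above.

```python
# Sketch of the sliding window in split_context_segment, parameterised.
def sliding_segments(text, window, stride):
    """Return [segment, start, end] triples that cover `text` with overlap."""
    segments = []
    start = 0
    end = window
    # full-size windows while the text continues past the window end
    while end < len(text):
        segments.append([text[start:end], start, end])
        start += stride
        end = start + window
    # the final (possibly shorter) segment runs to the end of the text
    if start != len(text):
        segments.append([text[start:], start, len(text)])
    return segments

for seg in sliding_segments("abcdefghij", window=4, stride=2):
    print(seg)
# ['abcd', 0, 4]
# ['cdef', 2, 6]
# ['efgh', 4, 8]
# ['ghij', 6, 10]
```

Because the stride is half the window, every character except those in the first and last half-windows appears in two segments, which gives an answer near a segment boundary a second chance to fall entirely inside one segment.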

In [None]:
import random

# define the functions
def select_samples(input):
  result = []
  sample = []
  negative = 0

  # impossible question: sample impossible-negative segments, as many as the average number of positive samples for this question
  if input[3] == 0:
    try:
      negative = test_ave_possible_dict[input[2]]
      list_negative = list(range(len(input[0])))
      if negative <= ((len(input[0])-1)/3+1):
        for i in range(negative):
          index = random.choice(list_negative)
          result.append([input[0][index][0],input[2],0,0])

          if (index + 1) not in list_negative:
            if (index - 1) not in list_negative:
              list_negative.remove(index)
            else:
              list_negative.remove(index)
              list_negative.remove(index -1)

          else:
            if (index - 1) not in list_negative:
              list_negative.remove(index)
              list_negative.remove(index +1)
            else:
              list_negative.remove(index)
              list_negative.remove(index -1)
              list_negative.remove(index +1)

      elif negative >= len(input[0]):
        for i in range(len(input[0])):
          result.append([input[0][i][0],input[2],0,0])

      else:
        for i in range(negative):
          index = random.choice(list_negative)
          list_negative.remove(index)
          result.append([input[0][index][0],input[2],0,0])

    except KeyError:
      # the question never has a possible answer, so there is no average to match
      negative = 0
    
  # possible question: locate the positive samples first, then sample possible-negative segments
  else:
    negative = input[3]
    list_negative = list(range(len(input[0])))

    # locate the positive samples for each answer
    for i in range(input[3]):

      # check every segment against the answer span [s, e)
      for j in range(len(input[0])):
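        if input[1][i][0] in range(input[0][j][1],input[0][j][2]):
          if input[1][i][1] in range(input[0][j][1],input[0][j][2]):
            # the whole answer lies inside this segment
            result.append([input[0][j][0],input[2],input[1][i][0]-input[0][j][1],input[1][i][1]-input[0][j][1]])
            list_negative[j] = -1

          else:
            # the answer starts in this segment and runs past its end, so the label ends at the segment length
            result.append([input[0][j][0],input[2],input[1][i][0]-input[0][j][1],input[0][j][2]-input[0][j][1]])
            list_negative[j] = -1

        else:
          if input[1][i][1] in range(input[0][j][1],input[0][j][2]):
            # the answer starts before this segment and ends inside it
            result.append([input[0][j][0],input[2],0,input[1][i][1]-input[0][j][1]])
            list_negative[j] = -1
          elif input[1][i][0] < input[0][j][1] and input[1][i][1] >= input[0][j][2]:
            # the answer covers the whole segment, so this segment must not become a negative sample
            result.append([input[0][j][0],input[2],0,input[0][j][2]-input[0][j][1]])
            list_negative[j] = -1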

    # locate the possible-negative samples for each question

    # keep only the segments that contain no part of any answer
    list_negative = [index for index in list_negative if index != -1]

    # sample the possible-negative segments
    if len(list_negative) > 0:
      if negative <= ((len(list_negative)-1)/3+1):
        for i in range(negative):
          index = random.choice(list_negative)
          result.append([input[0][index][0],input[2],0,0])

          if (index + 1) not in list_negative:
            if (index - 1) not in list_negative:
              list_negative.remove(index)
            else:
              list_negative.remove(index)
              list_negative.remove(index -1)
          else:
            if (index - 1) not in list_negative:
              list_negative.remove(index)
              list_negative.remove(index +1)
            else:
              list_negative.remove(index)
              list_negative.remove(index -1)
              list_negative.remove(index +1)

      elif negative >= len(list_negative):
        # take every remaining negative segment (look up each index, not the loop position)
        for index in list_negative:
          result.append([input[0][index][0],input[2],0,0])

      else:
        for i in range(negative):
          index = random.choice(list_negative)
          list_negative.remove(index)
          result.append([input[0][index][0],input[2],0,0])
    else:
      pass
  # result.append(negative) 

  return result

test_sample_rdd = test_segment_answer_rdd.flatMap(select_samples).cache()


test_sample_rdd.collect()

[['Exhibit 10.16 SUPPLY CONTRACT Contract No: Date: The buyer/End-User: Shenzhen LOHAS Supply Chain Management Co., Ltd. ADD: Tel No. : Fax No. : The seller: ADD: The Contract is concluded and signed by the Buyer and Seller on , in Hong Kong. 1. General provisions 1.1 This is a framework agreement, the terms and conditions are applied to all purchase orders which signed by this agreement (hereinafter referred to as the "order"). 1.2 If the provisions of the agreement are inconsistent with the order, the order shall prevail. Not stated in order content will be subject to the provisions of agreement. Any modification, supplementary, give up should been written records, only to be valid by buyers and sellers authorized representative signature and confirmation, otherwise will be deemed invalid. 2. The agreement and order 2.1 During the validity term of this agreement, The buyer entrust SHENZHEN YICHANGTAI IMPORT AND EXPORT TRADE CO., LTD or SHENZHEN LEHEYUAN TRADING CO, LTD (hereinafter r
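The two rules inside `select_samples` can be stated more compactly. The sketch below is an interval-overlap formulation of the positive labelling plus the "no two neighbouring segments" sampling rule; `segment_label` and `pick_non_adjacent` are hypothetical helpers, not drop-in replacements for the cell above (for instance, an answer ending exactly at a segment's start is treated here as no overlap).

```python
import random

def segment_label(answer, segment_start, segment_end):
    """Return the answer span relative to one segment, or None if they do not overlap.

    `answer` is a (start, exclusive end) pair in context offsets and the
    segment covers [segment_start, segment_end).
    """
    a_start, a_end = answer
    if a_end <= segment_start or a_start >= segment_end:
        return None  # no overlap: a candidate negative segment
    # clip the answer to the segment, then shift to segment-relative offsets
    return (max(a_start, segment_start) - segment_start,
            min(a_end, segment_end) - segment_start)

def pick_non_adjacent(candidates, k, rng=random):
    """Pick k indices, removing each pick and its two neighbours from the pool.

    Each pick removes at most three indices, so with
    k <= (len(candidates) - 1) / 3 + 1 the pool never runs dry.
    """
    pool = list(candidates)
    picked = []
    for _ in range(k):
        index = rng.choice(pool)
        picked.append(index)
        for neighbour in (index - 1, index, index + 1):
            if neighbour in pool:
                pool.remove(neighbour)
    return picked

segments = [(0, 4096), (2048, 6144), (4096, 8192), (6144, 10000)]
answer = (5000, 5100)  # hypothetical answer span
labels = [segment_label(answer, s, e) for s, e in segments]
print(labels)  # [None, (2952, 3052), (904, 1004), None]

negatives = [i for i, label in enumerate(labels) if label is None]
print(pick_non_adjacent(negatives, 1, random.Random(0)))
```

Skipping neighbours matters because consecutive segments share 2048 characters, so two adjacent negatives would largely duplicate each other.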

In [None]:
from pyspark.sql.types import StructField,StringType,IntegerType,StructType,LongType
# transform the data into dataframe

# set the structure of the df
result_schema = StructType([
  StructField("source",StringType(),True),
  StructField("question",StringType(),True),
  StructField("answer_start",IntegerType(),True),
  StructField("answer_end",IntegerType(),True),
  ])


result_df = spark.createDataFrame(test_sample_rdd,schema=result_schema).cache()

result_df.printSchema()

root
 |-- source: string (nullable = true)
 |-- question: string (nullable = true)
 |-- answer_start: integer (nullable = true)
 |-- answer_end: integer (nullable = true)



In [None]:
result_df.take(1)

[Row(source='Exhibit 10.16 SUPPLY CONTRACT Contract No: Date: The buyer/End-User: Shenzhen LOHAS Supply Chain Management Co., Ltd. ADD: Tel No. : Fax No. : The seller: ADD: The Contract is concluded and signed by the Buyer and Seller on , in Hong Kong. 1. General provisions 1.1 This is a framework agreement, the terms and conditions are applied to all purchase orders which signed by this agreement (hereinafter referred to as the "order"). 1.2 If the provisions of the agreement are inconsistent with the order, the order shall prevail. Not stated in order content will be subject to the provisions of agreement. Any modification, supplementary, give up should been written records, only to be valid by buyers and sellers authorized representative signature and confirmation, otherwise will be deemed invalid. 2. The agreement and order 2.1 During the validity term of this agreement, The buyer entrust SHENZHEN YICHANGTAI IMPORT AND EXPORT TRADE CO., LTD or SHENZHEN LEHEYUAN TRADING CO, LTD (her

In [None]:
# Spark writes a directory named test_result.json; coalesce(1) leaves a single partition, so it holds one JSON Lines part file

result_df.coalesce(1).write.json('s3://comp5349-2022/test_result.json')
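`DataFrameWriter.json` writes JSON Lines (one object per line) into a directory of part files, plus a `_SUCCESS` marker. The sketch below fakes such a directory locally and reads it back with the standard library; the part file name and the records are hypothetical (real part files have longer generated names).

```python
import json
import os
import tempfile

def read_json_lines_dir(path):
    """Read every part-* file in a Spark-style output directory as JSON Lines."""
    records = []
    for name in sorted(os.listdir(path)):
        if not name.startswith("part-"):
            continue  # skip _SUCCESS and hidden checksum files
        with open(os.path.join(path, name), encoding="utf-8") as f:
            records.extend(json.loads(line) for line in f if line.strip())
    return records

with tempfile.TemporaryDirectory() as out_dir:
    # hypothetical part file mimicking the result_df schema
    with open(os.path.join(out_dir, "part-00000.json"), "w", encoding="utf-8") as f:
        f.write(json.dumps({"source": "seg A", "question": "q1", "answer_start": 0, "answer_end": 0}) + "\n")
        f.write(json.dumps({"source": "seg B", "question": "q1", "answer_start": 12, "answer_end": 40}) + "\n")
    open(os.path.join(out_dir, "_SUCCESS"), "w").close()
    samples = read_json_lines_dir(out_dir)

print(len(samples))  # 2
```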