
### Introduction
This notebook demonstrates a few useful methods for loading a JSON file and handling nested JSON objects. The example file is `test.json` from assignment 2.

In [None]:
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [None]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
from pyspark.sql import SparkSession
spark = SparkSession \
    .builder \
    .appName("COMP5349 A2 Data Loading Example") \
    .getOrCreate()

### Load the JSON file as a data frame

In [None]:
data = "/content/drive/MyDrive/comp5349/a2_data/test.json"
init_df = spark.read.json(data)

In [None]:
# The original file will be loaded into a data frame with one row and two columns
init_df.show(1)

+--------------------+--------+
|                data| version|
+--------------------+--------+
|[{[{Exhibit 10.16...|aok_v1.0|
+--------------------+--------+



### Check the schema of a data frame

`printSchema` displays the schema of a data frame as a tree, which makes nested arrays and structs easy to see.

In [None]:
init_df.printSchema()

root
 |-- data: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- paragraphs: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- context: string (nullable = true)
 |    |    |    |    |-- qas: array (nullable = true)
 |    |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |    |-- answers: array (nullable = true)
 |    |    |    |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |    |    |    |-- answer_start: long (nullable = true)
 |    |    |    |    |    |    |    |    |-- text: string (nullable = true)
 |    |    |    |    |    |    |-- id: string (nullable = true)
 |    |    |    |    |    |    |-- is_impossible: boolean (nullable = true)
 |    |    |    |    |    |    |-- question: string (nullable = true)
 |    |    |-- title: string (nullable = true)
 |-- version: string (nullable = true)



### `select` and `explode`

The [`select`](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.DataFrame.select.html) method selects one or more columns from the source data frame.

The [`explode`](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.functions.explode.html) function expands an array column into multiple rows, one per array element. The [`alias`](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.Column.alias.html) method names the column that stores the array elements.
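As a rough plain-Python analogy (this is not Spark code), `explode` turns one row that holds an array into one row per array element. The helper name `explode_rows` and the toy record below are made up for illustration and do not come from the assignment data.

```python
def explode_rows(rows, column, alias):
    """Plain-Python analogue of select(explode(column).alias(alias)).

    Each input row is a dict; the result has one single-column row
    per element of the array stored under `column`.
    """
    out = []
    for row in rows:
        for element in row[column] or []:   # a null or empty array yields no rows
            out.append({alias: element})
    return out


# Toy stand-in for the one-row data frame loaded from the JSON file.
toy_rows = [{"data": [{"title": "contract A"}, {"title": "contract B"}],
             "version": "v0"}]

exploded = explode_rows(toy_rows, "data", "data")
print(len(exploded))          # 2
print(exploded[0]["data"])    # {'title': 'contract A'}
```

Note that the other columns (here `version`) disappear because only the exploded column was selected, which matches the schema printed for `data_df`.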


In [None]:
from pyspark.sql.functions import explode
data_df = init_df.select(explode("data").alias("data"))

In [None]:
data_df.printSchema()

root
 |-- data: struct (nullable = true)
 |    |-- paragraphs: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- context: string (nullable = true)
 |    |    |    |-- qas: array (nullable = true)
 |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |-- answers: array (nullable = true)
 |    |    |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |    |    |-- answer_start: long (nullable = true)
 |    |    |    |    |    |    |    |-- text: string (nullable = true)
 |    |    |    |    |    |-- id: string (nullable = true)
 |    |    |    |    |    |-- is_impossible: boolean (nullable = true)
 |    |    |    |    |    |-- question: string (nullable = true)
 |    |-- title: string (nullable = true)



In [None]:
# The total number of test contracts is 102
total_num = data_df.count()

In [None]:
# Explode the paragraphs array into a new data frame, paragraph_df, with one row per paragraph
paragraph_df = data_df.select((explode("data.paragraphs").alias("paragraph")))
paragraph_df.printSchema()

root
 |-- paragraph: struct (nullable = true)
 |    |-- context: string (nullable = true)
 |    |-- qas: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- answers: array (nullable = true)
 |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |-- answer_start: long (nullable = true)
 |    |    |    |    |    |-- text: string (nullable = true)
 |    |    |    |-- id: string (nullable = true)
 |    |    |    |-- is_impossible: boolean (nullable = true)
 |    |    |    |-- question: string (nullable = true)



In [None]:
# Keep the context and explode the qas array, giving one row per (context, question) pair
qas_context_df = paragraph_df.select("paragraph.context",(explode("paragraph.qas").alias("qas")))
qas_context_df.printSchema()

root
 |-- context: string (nullable = true)
 |-- qas: struct (nullable = true)
 |    |-- answers: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- answer_start: long (nullable = true)
 |    |    |    |-- text: string (nullable = true)
 |    |-- id: string (nullable = true)
 |    |-- is_impossible: boolean (nullable = true)
 |    |-- question: string (nullable = true)



In [None]:
# Flatten the fields we need: context, text, answer_start, is_impossible and question
qas_context_df = qas_context_df.select("context","qas.answers.text","qas.answers.answer_start","qas.is_impossible","qas.question")
qas_context_df.printSchema()

root
 |-- context: string (nullable = true)
 |-- text: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- answer_start: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- is_impossible: boolean (nullable = true)
 |-- question: string (nullable = true)
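The schema above shows `text` and `answer_start` as arrays because `qas.answers` is an array of structs, and selecting a field through it returns that field for every element. A minimal plain-Python sketch of the same idea follows; the two answer records are made-up values, not rows from the data set.

```python
# Made-up stand-in for one question's `answers` array (an array of structs).
answers = [
    {"answer_start": 14, "text": "SUPPLY CONTRACT"},
    {"answer_start": 120, "text": "Hong Kong"},
]

# Selecting "qas.answers.text" behaves like pulling one field out of every struct.
text = [answer["text"] for answer in answers]
answer_start = [answer["answer_start"] for answer in answers]

# The two arrays stay aligned by position, so they can be zipped back together.
pairs = list(zip(answer_start, text))
print(pairs)
```

This positional alignment is what the later answer-end calculation relies on when it indexes both arrays with the same `i`.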



In [None]:
# Convert the data frame into an RDD
qas_context_rdd = qas_context_df.rdd
qas_context_rdd.take(1)

[Row(context='Exhibit 10.16 SUPPLY CONTRACT Contract No: Date: The buyer/End-User: Shenzhen LOHAS Supply Chain Management Co., Ltd. ADD: Tel No. : Fax No. : The seller: ADD: The Contract is concluded and signed by the Buyer and Seller on , in Hong Kong. 1. General provisions 1.1 This is a framework agreement, the terms and conditions are applied to all purchase orders which signed by this agreement (hereinafter referred to as the "order"). 1.2 If the provisions of the agreement are inconsistent with the order, the order shall prevail. Not stated in order content will be subject to the provisions of agreement. Any modification, supplementary, give up should been written records, only to be valid by buyers and sellers authorized representative signature and confirmation, otherwise will be deemed invalid. 2. The agreement and order 2.1 During the validity term of this agreement, The buyer entrust SHENZHEN YICHANGTAI IMPORT AND EXPORT TRADE CO., LTD or SHENZHEN LEHEYUAN TRADING CO, LTD (he

In [None]:
# Convert each Row into a plain list
qas_context_rdd_li= qas_context_rdd.map(list)
qas_context_rdd_li.take(5)

[['Exhibit 10.16 SUPPLY CONTRACT Contract No: Date: The buyer/End-User: Shenzhen LOHAS Supply Chain Management Co., Ltd. ADD: Tel No. : Fax No. : The seller: ADD: The Contract is concluded and signed by the Buyer and Seller on , in Hong Kong. 1. General provisions 1.1 This is a framework agreement, the terms and conditions are applied to all purchase orders which signed by this agreement (hereinafter referred to as the "order"). 1.2 If the provisions of the agreement are inconsistent with the order, the order shall prevail. Not stated in order content will be subject to the provisions of agreement. Any modification, supplementary, give up should been written records, only to be valid by buyers and sellers authorized representative signature and confirmation, otherwise will be deemed invalid. 2. The agreement and order 2.1 During the validity term of this agreement, The buyer entrust SHENZHEN YICHANGTAI IMPORT AND EXPORT TRADE CO., LTD or SHENZHEN LEHEYUAN TRADING CO, LTD (hereinafter r

In [None]:
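# Calculate answer_end for every answer

def cal_answer_end(row):
  # row = [context, answer texts, answer starts, is_impossible, question]
  res = []
  s_e = []
  if row[3] is True:
    # No answer: use a [0, 0] placeholder span and an answer count of 0
    res.append([row[0], row[4], [0, 0], 0])
  else:
    num = len(row[2])
    for i in range(num):
      _start = row[2][i]
      # answer_end is exclusive: start offset plus the length of the answer text
      _end = _start + len(row[1][i])
      s_e.append([_start, _end])
    res.append([row[0], row[4], s_e, num])
  return res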
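A small worked example of the end-offset arithmetic, runnable without Spark. The helper name `answer_spans` is hypothetical, and the answer text is chosen only for illustration from the start of the first contract's context shown earlier; it is not claimed to be an actual answer in the data set.

```python
def answer_spans(starts, texts):
    """Pair each answer start with its exclusive end offset (start + len(text))."""
    return [[start, start + len(text)] for start, text in zip(starts, texts)]


context = "Exhibit 10.16 SUPPLY CONTRACT Contract No: Date:"
texts = ["SUPPLY CONTRACT"]
starts = [context.index(texts[0])]      # 14

spans = answer_spans(starts, texts)
start, end = spans[0]
print(spans)                  # [[14, 29]]
print(context[start:end])     # SUPPLY CONTRACT
```

Because the end offset is exclusive, slicing the context with `[start:end]` recovers the answer text exactly.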
      

In [None]:
# Add the answer spans to the RDD; each row becomes [context, question, spans, number of answers]
qas_context_rdd_li= qas_context_rdd_li.flatMap(cal_answer_end)
qas_context_rdd_li.take(5)

[['Exhibit 10.16 SUPPLY CONTRACT Contract No: Date: The buyer/End-User: Shenzhen LOHAS Supply Chain Management Co., Ltd. ADD: Tel No. : Fax No. : The seller: ADD: The Contract is concluded and signed by the Buyer and Seller on , in Hong Kong. 1. General provisions 1.1 This is a framework agreement, the terms and conditions are applied to all purchase orders which signed by this agreement (hereinafter referred to as the "order"). 1.2 If the provisions of the agreement are inconsistent with the order, the order shall prevail. Not stated in order content will be subject to the provisions of agreement. Any modification, supplementary, give up should been written records, only to be valid by buyers and sellers authorized representative signature and confirmation, otherwise will be deemed invalid. 2. The agreement and order 2.1 During the validity term of this agreement, The buyer entrust SHENZHEN YICHANGTAI IMPORT AND EXPORT TRADE CO., LTD or SHENZHEN LEHEYUAN TRADING CO, LTD (hereinafter r

In [None]:
# Create overlapping segments for all contracts

def segment_context(row):
  # row = [context, question, answer spans, number of answers]
  res = []
  seg_res = []
  context_len = len(row[0])
  # Slide a 4096-character window over the context with a stride of 2048
  _start = 0
  _end = 4096
  while _start < context_len:
    if _end > context_len:
      _end = context_len
    # Keep the segment text with its absolute start and end offsets
    seg_res.append([row[0][_start:_end], _start, _end])
    _start = _start + 2048
    _end = _end + 2048
  res.append(seg_res)
  res.append(row[1])
  res.append(row[2])
  res.append(row[3])
  return res
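To see what the sliding window produces, here is a sketch of the same loop with the window and stride turned into parameters so it can be run on a short string. The function name `sliding_segments` and the toy input are assumptions made for this illustration.

```python
def sliding_segments(text, window=4096, stride=2048):
    """Same loop as segment_context, with window and stride as parameters."""
    segments = []
    start, end = 0, window
    while start < len(text):
        end = min(end, len(text))               # clamp the last windows to the text length
        segments.append([text[start:end], start, end])
        start += stride
        end += stride
    return segments


toy = "abcdefghij"                              # 10 characters
segs = sliding_segments(toy, window=4, stride=2)
for seg in segs:
    print(seg)
# ['abcd', 0, 4]
# ['cdef', 2, 6]
# ['efgh', 4, 8]
# ['ghij', 6, 10]
# ['ij', 8, 10]
```

The last segment is a short suffix that is already fully covered by the previous one; the loop as written always emits it when the text is longer than one stride.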

In [None]:
qas_context_rdd_li = qas_context_rdd_li.map(segment_context)
qas_context_rdd_li.take(2)

[[[['Exhibit 10.16 SUPPLY CONTRACT Contract No: Date: The buyer/End-User: Shenzhen LOHAS Supply Chain Management Co., Ltd. ADD: Tel No. : Fax No. : The seller: ADD: The Contract is concluded and signed by the Buyer and Seller on , in Hong Kong. 1. General provisions 1.1 This is a framework agreement, the terms and conditions are applied to all purchase orders which signed by this agreement (hereinafter referred to as the "order"). 1.2 If the provisions of the agreement are inconsistent with the order, the order shall prevail. Not stated in order content will be subject to the provisions of agreement. Any modification, supplementary, give up should been written records, only to be valid by buyers and sellers authorized representative signature and confirmation, otherwise will be deemed invalid. 2. The agreement and order 2.1 During the validity term of this agreement, The buyer entrust SHENZHEN YICHANGTAI IMPORT AND EXPORT TRADE CO., LTD or SHENZHEN LEHEYUAN TRADING CO, LTD (hereinafter

In [None]:
# Count, for each question, the number of contracts in which it has at least one answer
def count_po(row):
  # row = [segments, question, answer spans, number of answers]
  if row[3] != 0:
    return (row[1], 1)
  else:
    return (row[1], 0)

count_po_rdd_li = qas_context_rdd_li.map(count_po)
count_po_rdd_li = count_po_rdd_li.reduceByKey(lambda a, b: a + b)
count_po_rdd_li.take(5)

[('Highlight the parts (if any) of this contract related to "Document Name" that should be reviewed by a lawyer. Details: The name of the contract',
  102),
 ('Highlight the parts (if any) of this contract related to "Parties" that should be reviewed by a lawyer. Details: The two or more parties who signed the contract',
  102),
 ('Highlight the parts (if any) of this contract related to "Agreement Date" that should be reviewed by a lawyer. Details: The date of the contract',
  93),
 ('Highlight the parts (if any) of this contract related to "Expiration Date" that should be reviewed by a lawyer. Details: On what date will the contract\'s initial term expire?',
  78),
 ('Highlight the parts (if any) of this contract related to "Notice Period To Terminate Renewal" that should be reviewed by a lawyer. Details: What is the notice period required to terminate renewal?',
  16)]
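The `map` followed by `reduceByKey` above amounts to summing 0/1 flags per question. A plain-Python analogue, with a hypothetical helper `count_by_key` and shortened, made-up question labels, looks like this:

```python
from collections import defaultdict


def count_by_key(pairs):
    """Sum the values of (key, value) pairs per key, like reduceByKey with addition."""
    totals = defaultdict(int)
    for key, value in pairs:
        totals[key] += value
    return dict(totals)


# One (question, flag) pair per contract: 1 if the question has an answer there.
pairs = [("Parties", 1), ("Agreement Date", 1),
         ("Parties", 1), ("Agreement Date", 0)]

counts = count_by_key(pairs)
print(counts)    # {'Parties': 2, 'Agreement Date': 1}
```

A question that never has an answer still appears in the result with a count of 0, which matters for any later division by this count.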

In [None]:
# Convert into a dictionary keyed by question
count_po_dict = count_po_rdd_li.collectAsMap()
count_po_dict

{'Highlight the parts (if any) of this contract related to "Affiliate License-Licensee" that should be reviewed by a lawyer. Details: Does the contract contain a license grant to a licensee (incl. sublicensor) and the affiliates of such licensee/sublicensor?': 12,
 'Highlight the parts (if any) of this contract related to "Affiliate License-Licensor" that should be reviewed by a lawyer. Details: Does the contract contain a license grant by affiliates of the licensor or that includes intellectual property of affiliates of the licensor?\xa0': 6,
 'Highlight the parts (if any) of this contract related to "Agreement Date" that should be reviewed by a lawyer. Details: The date of the contract': 93,
 'Highlight the parts (if any) of this contract related to "Anti-Assignment" that should be reviewed by a lawyer. Details: Is consent or notice required of a party if the contract is assigned to a third party?': 72,
 'Highlight the parts (if any) of this contract related to "Audit Rights" that sh

In [None]:
# Select samples: positive samples plus impossible and possible negative samples

def sample_selection_(row):
  # row = [segments, question, answer spans, number of answers]
  res = []
  seg_len = len(row[0])

  # Create impossible negative samples for questions without answers in this contract
  if row[3] == 0:
    # The dictionary is keyed by question, which is row[1] (row[2] is a list and cannot be a key)
    positive_contracts = count_po_dict.get(row[1], 0)
    if positive_contracts > 0:
      negative = min(int(total_num / positive_contracts), seg_len)
      for i in range(negative):
        res.append([row[0][i][0], row[1], 0, 0])

  # Create positive samples for questions with answers in this contract
  else:
    negative = row[3]
    for i in range(row[3]):
      ans_start, ans_end = row[2][i]
      for seg_text, seg_start, seg_end in row[0]:
        start_in = seg_start <= ans_start < seg_end
        end_in = seg_start <= ans_end < seg_end
        if start_in and end_in:
          res.append([seg_text, row[1], ans_start - seg_start, ans_end - seg_start])
        elif start_in:
          # The answer runs past the end of this segment
          res.append([seg_text, row[1], ans_start - seg_start, 4096])
        elif end_in:
          # The answer began in an earlier segment
          res.append([seg_text, row[1], 0, ans_end - seg_start])

    # Create possible negative samples: one per answer, capped at the number of segments
    for i in range(min(negative, seg_len)):
      res.append([row[0][i][0], row[1], 0, 0])
  return res
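The positive-sample branch converts an absolute answer span into offsets relative to each segment. The sketch below isolates that rule so it can be checked by hand; `relative_span` is a hypothetical helper, and the segment boundaries and answer span are made-up numbers that follow the 4096/2048 window used earlier.

```python
def relative_span(seg_start, seg_end, ans_start, ans_end, window=4096):
    """Return the answer span relative to one segment, or None if they do not meet."""
    start_inside = seg_start <= ans_start < seg_end
    end_inside = seg_start <= ans_end < seg_end
    if start_inside and end_inside:
        return (ans_start - seg_start, ans_end - seg_start)
    if start_inside:
        return (ans_start - seg_start, window)      # answer runs past the segment
    if end_inside:
        return (0, ans_end - seg_start)             # answer began in an earlier segment
    return None


segments = [(0, 4096), (2048, 6144), (4096, 8192)]
answer = (4000, 4200)                               # absolute start and end offsets

spans = [relative_span(s, e, *answer) for s, e in segments]
print(spans)    # [(4000, 4096), (1952, 2152), (0, 104)]
```

One consequence of these rules is that a segment lying entirely inside a very long answer (answer starts before it and ends after it) matches no branch and yields no positive sample.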

In [None]:
# Generate the final samples and collect them to the driver
final_result = qas_context_rdd_li.flatMap(sample_selection_)

final_result.collect()

[['Exhibit 10.16 SUPPLY CONTRACT Contract No: Date: The buyer/End-User: Shenzhen LOHAS Supply Chain Management Co., Ltd. ADD: Tel No. : Fax No. : The seller: ADD: The Contract is concluded and signed by the Buyer and Seller on , in Hong Kong. 1. General provisions 1.1 This is a framework agreement, the terms and conditions are applied to all purchase orders which signed by this agreement (hereinafter referred to as the "order"). 1.2 If the provisions of the agreement are inconsistent with the order, the order shall prevail. Not stated in order content will be subject to the provisions of agreement. Any modification, supplementary, give up should been written records, only to be valid by buyers and sellers authorized representative signature and confirmation, otherwise will be deemed invalid. 2. The agreement and order 2.1 During the validity term of this agreement, The buyer entrust SHENZHEN YICHANGTAI IMPORT AND EXPORT TRADE CO., LTD or SHENZHEN LEHEYUAN TRADING CO, LTD (hereinafter r

In [None]:
# Create a data frame from the final result
result_df = spark.createDataFrame(final_result,['source', 'question', 'answer_start', 'answer_end'])
result_df.printSchema()

root
 |-- source: string (nullable = true)
 |-- question: string (nullable = true)
 |-- answer_start: long (nullable = true)
 |-- answer_end: long (nullable = true)



In [None]:
# Write the result in JSON format (Spark creates a directory named F_result.json containing part files)
result_df.write.json('F_result.json')
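Spark writes the output as a directory of part files in JSON Lines format (one JSON object per line) rather than a single file. The following is a minimal sketch of reading such a directory back with the standard library; the helper name `read_spark_json_dir` is hypothetical, and the demo directory is simulated with a temporary folder so the example runs without Spark.

```python
import glob
import json
import os
import tempfile


def read_spark_json_dir(path):
    """Read every part file in a Spark JSON output directory into a list of dicts."""
    records = []
    for part in sorted(glob.glob(os.path.join(path, "part-*"))):
        with open(part, encoding="utf-8") as handle:
            for line in handle:
                if line.strip():                    # skip blank lines
                    records.append(json.loads(line))
    return records


# Simulate a tiny output directory with one part file (made-up record).
demo_dir = tempfile.mkdtemp()
sample = {"source": "abc", "question": "q", "answer_start": 0, "answer_end": 0}
with open(os.path.join(demo_dir, "part-00000.json"), "w", encoding="utf-8") as handle:
    handle.write(json.dumps(sample) + "\n")

records = read_spark_json_dir(demo_dir)
print(records[0]["question"])    # q
```

For the notebook's output, the call would be `read_spark_json_dir("F_result.json")`, assuming the write above completed and the directory is on a local file system.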