# Data Preprocessing and Performance Tuning with Spark
A2 

In [1]:
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.0.tar.gz (281.3 MB)
[K     |████████████████████████████████| 281.3 MB 48 kB/s 
[?25hCollecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[K     |████████████████████████████████| 199 kB 65.4 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.3.0-py2.py3-none-any.whl size=281764026 sha256=8cdf71980cfd86c40db93f3f91dba659e7a5be15e959cdc25008ab2b6fee7737
  Stored in directory: /root/.cache/pip/wheels/7a/8e/1b/f73a52650d2e5f337708d9f6a1750d451a7349a867f928b885
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.5 pyspark-3.3.0


```
# Usage:
#   chmod 700 submit_client.sh
#   ./submit_client.sh test.json output.json
#
# $1 = input JSON path, $2 = output JSON path. Both are quoted so that paths
# containing spaces are passed to preprocess.py as single arguments.
spark-submit \
    --master yarn \
    --deploy-mode client \
    --executor-cores 4 \
    --conf spark.checkpoint.compress=True \
    --conf spark.rdd.compress=True \
    --driver-memory 2g \
    --conf spark.shuffle.file.buffer=96k \
    preprocess.py \
    --input "$1" \
    --output "$2"
```

In [5]:
from pyspark.sql.types import StructType, StructField, StringType,IntegerType, ArrayType, BooleanType
import pyspark.sql.functions as F
from pyspark.sql.functions import explode, udf, col, count, monotonically_increasing_id, countDistinct, lit, create_map, collect_list
from pyspark.sql import SparkSession
import argparse

""" Documentation for the below code

The code follows the Table of content in the report.

1. Get segments
2. Get samples based on segments and answers
3. Assign various ids column
4. Count for impossible negative samples needed
5. Count for possible negative samples needed
6. Select impossible based on count and positive sample for uniqueness
7. Select possible based on count and positive and impossible sample for uniqueness
8. Union all selected samples
9. Output to Json given the filename 

Variable name:
    _imp - impossible sample
    _possi / _pneg - possible negative sample
    _pos - positive sample

Dataframe column name added:
    sampleID - unique ID for each sample
    cID - ID for each contract
    seqID - ID for each split sequence within a context (source of ans)
    qID - ID for each question 0 ~ 40 (41 in total)
    contract_title - as name
    negativity - 0 for positive, 1 for possible negative, 2 for impossible

References:
    [1] https://stackoverflow.com/questions/46584460/how-can-i-add-continuous-ident-column-to-a-dataframe-in-pyspark-not-as-monoto
"""

def p(x):
    """Print *x* surrounded by newline markers so it stands out in the log."""
    pieces = ('\n', x, '\n')
    print(*pieces)
def plen(x):
    """Print *x* (stringified) behind a 'len: ' label, padded by newlines."""
    label = 'len: '
    text = str(x)
    print('\n', label, text, '\n')

# each value represents a type of sample, three in total

# Sample-type label -> integer code stored in the `negativity` column.
negativity_dict = dict(
    positive=0,
    possible_negative=1,
    impossible=2,
)

# Default paths; the small dataset is used when no --input/--output is given.
# test_data = 'CUADv1.json'
test_data = 'test.json'
output_path = "output.json"

spark = SparkSession.builder.appName("COMP5349 A2").getOrCreate()
spark.sparkContext.setLogLevel("ERROR")

# Read the paths passed by the submit script (--input / --output).
# parse_known_args() is used instead of parse_args() so that unrelated
# arguments injected by the host (e.g. the "-f <kernel.json>" a notebook
# kernel receives) are ignored rather than aborting the run.
parser = argparse.ArgumentParser()
parser.add_argument("--input", help="the input path", type=str, default=test_data)
parser.add_argument("--output", help="the output path", type=str, default=output_path)
args, _unknown_args = parser.parse_known_args()

if args.input:
    test_data = args.input

if args.output:
    output_path = args.output

# Echo the effective input/output paths.
print()
print(f'💎 Read in filename is: {test_data}')
print(f'💎 Output filename is: {output_path}')
print()

# Load the JSON input, then flatten it in two steps:
#   1. one row per element of the top-level `data` array
#   2. one row per element of each element's `paragraphs` array
test_df = spark.read.json(test_data)
test_data_df= test_df.select((explode("data").alias('data')))
test_paragraph_df = test_data_df.select(explode("data.paragraphs").alias('paragraph'))

# count() is an action, so this line triggers the read and both explodes.
print('\n', '💎 The initial input dataframe 💎', 'len: ', test_paragraph_df.count(), '\n')
test_paragraph_df.printSchema()
############################################################################################################################################
########################################################## Convert to sequences ############################################################
############################################################################################################################################

# Return type of get_seq_list(): an array with one struct per sliding window.
#   contract_title - title derived from the first question id of the paragraph
#   seqID          - index of the window within its paragraph
#   seq            - (sent, s, e): window text plus its start/end offsets
#   qas            - the paragraph's question/answer list, passed through
seq_schema = ArrayType(
    StructType([
        StructField('contract_title', StringType()),
        StructField('seqID', IntegerType()),
        StructField('seq', StructType([
                StructField('sent', StringType()),
                StructField('s', IntegerType()),
                StructField('e', IntegerType())
            ])
        ),
        StructField('qas', ArrayType(
                StructType([
                    StructField('answers', ArrayType(
                        StructType([
                            # kept as a string; consumers convert it with int()
                            StructField('answer_start', StringType()),
                            StructField('text', StringType())
                        ])
                    )
                    ),
                    # Question ids in the input are strings; declaring this
                    # field as an integer made Spark null every id out.
                    StructField('id', StringType()),
                    StructField('is_impossible', BooleanType()),
                    StructField('question', StringType())
                ])
            )
        )
    ])
)
def get_default_seq(title_, seqID_, seq_, qas_list):
    """Bundle one sliding window and its questions into a dict.

    The returned dict has the keys 'contract_title', 'seqID', 'seq' and
    'qas', holding the four arguments unchanged. Every argument is checked
    for its exact type first (str, int, tuple, list respectively); a
    mismatch raises AssertionError naming the offending argument.
    """
    expected_types = (
        (title_, str, 'get_default_seq: type(title_)'),
        (seqID_, int, 'get_default_seq: type(seqID_)'),
        (seq_, tuple, 'get_default_seq: type(seq_)'),
        (qas_list, list, 'get_default_seq: type(qas_list) '),
    )
    for value, wanted, message in expected_types:
        # exact type match on purpose: subclasses are rejected
        assert type(value) is wanted, message

    record = {}
    record['contract_title'] = title_  # contract title
    record['seqID'] = seqID_           # sequence id within the contract
    record['seq'] = seq_               # (text, start, end) of the window
    record['qas'] = qas_list           # list of question/answer entries
    return record

# convert to widnow seq with each assigned with a qas list
# convert to window seq with each assigned with a qas list
@udf(returnType=seq_schema)
def get_seq_list(row, win_size=4096,stride=2048):
    """Split one paragraph's context into overlapping sliding windows.

    Args:
        row: paragraph struct with a 'context' string and a 'qas' list.
        win_size: maximum number of characters per window.
        stride: distance in characters between consecutive window starts.

    Returns:
        A list with one dict per window (see get_default_seq). Each dict
        carries the contract title, the window index, the tuple
        (window_text, start_idx, start_idx + win_size) and the paragraph's
        full qas list. Only the context is segmented.
    """
    context = row['context']
    qas = row['qas']

    # The contract title is the question id up to its last '__'.
    # rpartition leaves the id intact when there is no '__' at all, whereas
    # slicing with rfind()'s -1 result would silently drop the last character.
    first_id = qas[0]['id']
    head, sep, _ = first_id.rpartition('__')
    contract_title = head if sep else first_id

    return [
        get_default_seq(
            contract_title,
            seq_id,
            (context[start:start + win_size], start, start + win_size),
            qas,
        )
        for seq_id, start in enumerate(range(0, len(context), stride))
    ]
############################################################################################################################################
################################################################# END ######################################################################
############################################################################################################################################

# |seq| - convert full context to different sequences of at most 4096 length
# One row per sliding window; cached because it is reused below for both the
# contract-id table and the flattened window table.
seq_df_raw = test_paragraph_df.select(explode(get_seq_list('paragraph')).alias('seq')).cache()

print('\n', '💎 Segments for all contract dataframe (Context to sequences) 💎', 'len: ', seq_df_raw.count(), '\n')
seq_df_raw.show(2)


# |cID|contract_title| - [1] create id column for each unique contract for later add contract id to samples, since compare integer is much faster than string contract title
contract_df = seq_df_raw.select('seq.contract_title').distinct()
# zipWithIndex pairs every row with a consecutive index; the map moves that
# index to the front of the row so it becomes the new `cID` column.
contract_df = contract_df.rdd.zipWithIndex().map(lambda x: [x[1]] + [y for y in x[0]]).toDF(['cID']+contract_df.columns)
contract_df = contract_df.select('cID', 'contract_title')



# |contract_title|seqID|seq| qas| - expand dict to multi cols - extract windows
seq_df = seq_df_raw.select(col("seq.contract_title").alias("contract_title"), col('seq.seqID').alias('seqID'), col("seq.seq").alias("seq"), col("seq.qas").alias("qas"))




############################################################################################################################################
############################################### get different samples - assign sequences with each questions ###############################
############################################################################################################################################
# Return type of get_sample_list(): an array of sample structs.
# Field order matters - it is the column order of every emitted sample.
_sample_fields = [
    ('seqID', IntegerType()),
    ('qID', IntegerType()),
    ('contract_title', StringType()),
    ('negativity', IntegerType()),
    ('source', StringType()),
    ('question', StringType()),
    ('answer_start', IntegerType()),
    ('answer_end', IntegerType()),
]
sample_schema = ArrayType(
    StructType([StructField(field_name, field_type) for field_name, field_type in _sample_fields])
)

def get_default_sample(seqID, qID, c:str, s:str, q:str):
    """Build a sample dict pre-filled as a possible-negative sample.

    Args:
        seqID: id of the window (sequence) the sample comes from.
        qID: id of the question within the contract.
        c: contract title.
        s: window text, stored under 'source'.
        q: question text.

    Returns:
        A new dict with 'negativity' set to the possible-negative code and
        'answer_start' / 'answer_end' both 0. Callers overwrite those three
        fields for positive and impossible samples.

    Raises:
        AssertionError: if c, s or q is None.
    """
    assert c is not None, "get_default_sample()"
    assert s is not None, "get_default_sample()"
    assert q is not None, "get_default_sample()"

    return {
        'seqID': seqID,
        'qID':qID,
        'contract_title': c,
        'negativity': negativity_dict['possible_negative'],
        'source': s,
        'question': q,
        'answer_start': 0,
        'answer_end': 0
    }

@udf(returnType=sample_schema)
def get_sample_list(seqID, contract_title, window, qas_list):
    """Create the samples of one window: one or more per question.

    Args:
        seqID: id of the window within its contract.
        contract_title: title of the contract the window belongs to.
        window: struct with 'sent' (window text), 's' and 'e' (start / end
            offsets of the window in the original context).
        qas_list: the contract's questions, each with 'question',
            'is_impossible' and 'answers' (structs with 'answer_start'
            and 'text').

    Returns:
        A list of sample dicts (see get_default_sample). Per question:
          * is_impossible is True        -> one impossible sample
          * answers overlap the window   -> one positive sample per
                                            overlapping answer
          * otherwise                    -> one possible-negative sample
        For positive samples answer_start / answer_end are offsets into the
        window text (not the original context) and cover only the part of
        the answer that lies inside the window.
    """
    window_context = window['sent']
    start_idx, end_idx = int(window['s']), int(window['e'])

    sample_list = []

    # enumerate() keeps qID equal to the question's position for every
    # question, including the impossible ones that leave the loop early.
    for qID, qas_pair in enumerate(qas_list):
        base_sample = get_default_sample(
            seqID, qID, contract_title, window_context, qas_pair['question'])

        # no answer anywhere in the contract -> indices stay 0
        if qas_pair['is_impossible']:
            base_sample['negativity'] = negativity_dict['impossible']
            sample_list.append(base_sample)
            continue

        positives = []
        for ans in qas_pair['answers'] or []:
            answer_start = int(ans['answer_start'])
            answer_end = answer_start + len(ans['text'])

            # Skip answers that share no character with [start_idx, end_idx).
            # This form also accepts an answer that starts before the window
            # and ends after it, which an endpoint-only check would miss.
            if answer_start >= end_idx or answer_end <= start_idx:
                continue

            # A fresh dict per answer: several answers in the same window
            # must not overwrite each other's offsets.
            positive = dict(base_sample)
            positive['negativity'] = negativity_dict['positive']
            # offsets are relative to the window and clamped to its bounds
            positive['answer_start'] = max(answer_start, start_idx) - start_idx
            positive['answer_end'] = min(answer_end, end_idx) - start_idx
            positives.append(positive)

        # possible negative - the question has answers, but none in this window
        sample_list.extend(positives if positives else [base_sample])

    return sample_list
############################################################################################################################################
####################################################################### end ################################################################
############################################################################################################################################

# One row per sample: run the sample UDF on every window and explode its list.
sample_df_raw = seq_df.select(explode(get_sample_list('seqID','contract_title', 'seq', 'qas')).alias('sample'))


# unzip dictionary row and map each key to columns - dataframe contains all samples
# sampleID comes from monotonically_increasing_id(): unique 64-bit values that
# are increasing but not consecutive.
sample_df_extracted = sample_df_raw.select(monotonically_increasing_id().alias('sampleID'), col('sample.seqID').alias('seqID'), \
                                           col("sample.qID").alias("qID"), col("sample.contract_title").alias("contract_title"), col("sample.negativity").alias("negativity"), \
                                           col("sample.source").alias("source"), col("sample.question").alias("question"), col("sample.answer_start").alias("answer_start"), \
                                           col("sample.answer_end").alias("answer_end"))


# add contract ID
# Join on the title to attach the integer cID, then drop the title itself.
sample_df = contract_df.join(sample_df_extracted, contract_df.contract_title == sample_df_extracted.contract_title)
# Cached because every later balancing step filters this dataframe again.
sample_df = sample_df.select('sampleID', 'qID', 'seqID', 'cID', 'negativity', 'source', 'question', 'answer_start', 'answer_end').cache()

print('\n', '💎 all types of samples dataframe 💎', 'len: ', sample_df.count(), '\n')
sample_df.show(3)

############################################################################################################################################
############################################################# show intermeidate result #####################################################
############################################################################################################################################

# Diagnostics only: print the size and a 3-row preview of each sample type.
# Each count() below is a separate Spark action.

# show positive samples for all questions with possible answers in all contracts
pos_tmp = sample_df.filter(f'negativity == {negativity_dict["positive"]}')
print('\n', '💎 positive samples dataframe 💎', 'len: ', pos_tmp.count(), '\n')
pos_tmp.show(3)

# show possible negative samples
possi_tmp = sample_df.filter(f'negativity == {negativity_dict["possible_negative"]}')
print('\n', '💎 possible negative samples dataframe 💎', 'len: ', possi_tmp.count(), '\n')
possi_tmp.show(3)

# show impossible negative samples
imp_tmp = sample_df.filter(f'negativity == {negativity_dict["impossible"]}')
print('\n', '💎 impossible negative samples dataframe 💎', 'len: ', imp_tmp.count(), '\n')
imp_tmp.show(3)
############################################################################################################################################
######################################################################### END ##############################################################
############################################################################################################################################

############################################################################################################################################
############################################################### 2.1: START for count impossible ############################################
############################################################################################################################################

# Goal: for every (contract, question) whose question is impossible, compute
#   count_imp = round(#positive samples of that question in OTHER contracts
#                     / #OTHER contracts having a positive sample of it)

# get impossible
q_imp_df = sample_df.select(col('cID').alias('cID_imp'), col('qID').alias('qID_imp')).\
    filter(f'negativity == {negativity_dict["impossible"]}')

# get positive
q_pos_df = sample_df.select(col('cID').alias('cID_pos'), col('qID').alias('qID_pos'), col('sampleID').alias('sampleID_pos')).\
    filter(f'negativity == {negativity_dict["positive"]}')


# join condition: same question, different contract. The left join keeps
# impossible questions that have no positive sample anywhere else (their
# positive-side columns are null and are not counted below).
imp_pos_filtered_df = q_imp_df.join(q_pos_df, (q_imp_df.qID_imp==q_pos_df.qID_pos) & (q_imp_df.cID_imp != q_pos_df.cID_pos), how='left').\
    select(q_imp_df.cID_imp, 'qID_imp', 'sampleID_pos', 'cID_pos')


# count_contract - number of distinct other contracts with a positive sample.
# (Previously this aggregate counted sample ids and the one below counted
# contract ids, which inverted the ratio.)
pos_contract_sum_df = imp_pos_filtered_df.select('cID_imp', 'qID_imp', 'cID_pos'). \
    groupBy('cID_imp', 'qID_imp').agg(countDistinct('cID_pos').alias('count_contract'))


# count_pos - number of distinct positive samples in those other contracts
imp_count_df = imp_pos_filtered_df.select('cID_imp', 'qID_imp', 'sampleID_pos'). \
    groupBy('cID_imp', 'qID_imp').agg(countDistinct('sampleID_pos').alias('count_pos'))
imp_count_df = imp_count_df.select(col('cID_imp').alias('cID_imp2'), col('qID_imp').alias('qID_imp2'), 'count_pos')

########################################################## AVERAGE positive samples count result #############################################################
# samples / contracts; a zero contract count yields null, i.e. "no samples needed"
tmp_df = pos_contract_sum_df.join(imp_count_df, (pos_contract_sum_df.cID_imp == imp_count_df.cID_imp2) & (pos_contract_sum_df.qID_imp == imp_count_df.qID_imp2))
avg_df = tmp_df.select('cID_imp', 'qID_imp', (tmp_df.count_pos / tmp_df.count_contract).alias('avg_pos_sample'))


# count - rounded to the nearest integer
avg_count_df = avg_df.select('cID_imp', 'qID_imp',F.round(avg_df["avg_pos_sample"],0).cast(IntegerType()).alias('count_imp'))

print('\n', '💎 count number to balanced for impossible samples dataframe 💎', 'len: ', avg_count_df.count(), '\n')
avg_count_df.show(2)

############################################################################################################################################
############################################################## 2.2: START for count possible ###############################################
############################################################################################################################################

# data: (contract, question) pairs of every possible-negative sample
q_possi_df = sample_df.select(col('cID').alias('cID_possi'), col('qID').alias('qID_possi')).\
    filter(f'negativity == {negativity_dict["possible_negative"]}')


# (contract, question, sampleID) of every positive sample
q_pos_df = sample_df.select(col('cID').alias('cID_pos'), col('qID').alias('qID_pos'), 
                            col('sampleID').alias('sampleID_pos')).filter(f'negativity == {negativity_dict["positive"]}')


# Pair every possible-negative (contract, question) with the positive samples
# of the SAME contract and the SAME question.
# join condition: same contract, same question
# Left join: pairs without any positive sample are kept with a null sampleID_pos.
possi_pos_filtered_df = q_possi_df.join(q_pos_df, (q_possi_df.qID_possi==q_pos_df.qID_pos) & (q_possi_df.cID_possi == q_pos_df.cID_pos),how='left' ). \
    select(q_possi_df.cID_possi, 'qID_possi', 'sampleID_pos')

###################################### count result ###########################################
# count distinct sample: count_pneg is the number of possible-negative samples
# to keep for this (contract, question)
possi_count_df = possi_pos_filtered_df.groupBy('cID_possi', 'qID_possi').agg(countDistinct("*").alias('count_pneg'))
possi_count_df = possi_count_df.select(col('cID_possi').alias('cID_possi'), col('qID_possi').alias('qID_possi'), 'count_pneg')

print('\n', '💎 count number to balanced for possible negative samples dataframe 💎', 'len: ', possi_count_df.count(), '\n')
possi_count_df.show(2)

############################################################################################################################################
############################################################## STEP 2: select based on count ###############################################
############################################################################################################################################

################################# Prepare possible ################################# 
# | cID | qID | [possible_negative ids] 
possible_samples = sample_df.filter(f'negativity == {negativity_dict["possible_negative"]}'). \
    select('cID', 'qID', 'seqID', 'sampleID')


# Collapse to one row per (contract, question); `possi` is a list of
# {"seqID": ..., "sampleID": ...} maps, one per candidate sample.
possible_samples_df = possible_samples.groupBy('cID', 'qID').agg(collect_list( 
    create_map(lit("seqID"),"seqID",lit("sampleID"),"sampleID")).alias("possi"))


################################# Prepare impossible ################################# 

# | cID | qID | [impossible ids] 
impossible_samples = sample_df.filter(f'negativity == {negativity_dict["impossible"]}'). \
    select('cID', 'qID', 'seqID', 'sampleID')


# Same layout as `possi` above, stored in the `imp` column.
impossible_samples_df = impossible_samples.groupBy('cID', 'qID').agg(collect_list( 
    create_map(lit("seqID"),"seqID",lit("sampleID"),"sampleID")).alias("imp"))


################################# Prepare positive ################################# 
# | cID | qID | [pos ids] 
positive_samples = sample_df.filter(f'negativity == {negativity_dict["positive"]}'). \
    select('cID', 'qID', 'seqID')


# `pos` holds the seqIDs already used by positive samples of this question.
positive_samples_df = positive_samples.groupBy('cID', 'qID').agg(collect_list('seqID').alias("pos"))

#### join with count
# Inner join: attach the number of impossible samples to keep (count_imp).
imp_df_1 = impossible_samples_df.join(avg_count_df, 
                        (impossible_samples_df.cID==avg_count_df.cID_imp) & (impossible_samples_df.qID==avg_count_df.qID_imp)). \
                        select('cID', 'qID', 'count_imp', 'imp')


#### join with positive
# same col name can use []
# Left join: `pos` is null when the question has no positive sample here.
imp_df = imp_df_1.join(positive_samples_df, ['cID', 'qID'], how='left')

print('\n', '💎 input dataframe to the UDF of balancing the impossible samples 💎', 'len: ', imp_df.count(), '\n')
imp_df.show(2)

# Local import: LongType is not part of the file's top-level pyspark imports.
from pyspark.sql.types import LongType

# Return type of balance_imp().
imp_schema = StructType([
        # Sample ids come from monotonically_increasing_id(), which yields
        # 64-bit values; a 32-bit IntegerType would corrupt large ids.
        StructField('imp_keep', ArrayType(LongType())),
        StructField('seqID', ArrayType(IntegerType())) # for compare so that the possible negative sample to not be the same
])
############################################################################################################################################
################################################# based on the count, balance impossible first  ############################################
############################################################################################################################################

@udf(returnType=imp_schema)
def balance_imp(count_imp, imp, pos_seqIDs):
    """Choose which impossible samples to keep for one (contract, question).

    Args:
        count_imp: number of impossible samples wanted; None or 0 keeps none.
        imp: candidate samples, each a mapping with 'seqID' and 'sampleID'.
        pos_seqIDs: seqIDs already used by positive samples; may be None.

    Returns:
        A dict with
          'imp_keep' - sampleIDs of the kept samples (at most count_imp)
          'seqID'    - distinct seqIDs of exactly those kept samples
        Candidates whose seqID is neither used by a positive sample nor
        already picked are preferred. If they do not suffice, the remaining
        candidates are added in their original order.
    """
    res = {
        'imp_keep': [],
        'seqID': []
    }
    # no count (or no candidates) -> no impossible sample is kept
    if not count_imp or not imp:
        return res

    positive_seqs = set(pos_seqIDs or ())
    picked_seqs = set()
    picked_samples = set()

    def _keep(entry):
        """Record one candidate as kept, together with its seqID."""
        res['imp_keep'].append(entry['sampleID'])
        picked_samples.add(entry['sampleID'])
        if entry['seqID'] not in picked_seqs:
            picked_seqs.add(entry['seqID'])
            res['seqID'].append(entry['seqID'])

    # pass 1: candidates with an unused seqID. The quota is only consumed
    # when a sample is actually kept.
    for cur in imp:
        if len(res['imp_keep']) >= count_imp:
            break
        if cur['seqID'] in positive_seqs or cur['seqID'] in picked_seqs:
            continue
        _keep(cur)

    # pass 2: if the unique ones are not enough, fill up with the rest.
    # Each added sample records its own seqID, keeping both lists consistent.
    for cur in imp:
        if len(res['imp_keep']) >= count_imp:
            break
        if cur['sampleID'] not in picked_samples:
            _keep(cur)

    return res
################################################################################################################################

# get what impossible sample id to keep for each question in a contract
# `res` is the struct returned by balance_imp; its two fields become columns.
imp_keep_df = imp_df.select('cID', 'qID',balance_imp('count_imp', 'imp', 'pos').alias('res')).\
    select('cID','qID',col('res.imp_keep').alias('imp_id_keep'), col('res.seqID').alias('seqID_imp'))

print('\n', '💎 output dataframe to the UDF of balancing the impossible samples 💎', 'len: ', imp_keep_df.count(), '\n')
imp_keep_df.show(2)

# unflatten list to rows: one row per kept impossible sampleID
imp_keep_res = imp_keep_df.select(explode(imp_keep_df['imp_id_keep']).alias('imp_id_keep'))


# Attach the number of possible-negative samples to keep (count_pneg).
possi_df_1 = possible_samples_df.join(possi_count_df, 
                        (possible_samples_df.cID==possi_count_df.cID_possi) & (possible_samples_df.qID==possi_count_df.qID_possi)). \
                        select('cID', 'qID', 'count_pneg', 'possi')


# same col name can use []
# Left join: `pos` is null when there is no positive sample for the pair.
possi_df = possi_df_1.join(positive_samples_df, ['cID', 'qID'], how='left')


# Add the seqIDs already taken by kept impossible samples (seqID_imp); the
# kept sample ids themselves are not needed here and are dropped.
possible_samples_tmp = possi_df.join(imp_keep_df, ['cID', 'qID'], 'left').drop('imp_id_keep')

print('\n', '💎 input dataframe to the UDF of balancing the possible negative samples 💎', 'len: ', possible_samples_tmp.count(), '\n')
possible_samples_tmp.show(2)

############################################################################################################################################
################################################# based on the count, balance possible samples second  #####################################
############################################################################################################################################
# Local import: LongType is not part of the file's top-level pyspark imports.
from pyspark.sql.types import LongType

# Return type of balance_possi(): the kept sample ids. They come from
# monotonically_increasing_id(), which yields 64-bit values, so a 32-bit
# IntegerType would corrupt large ids.
possi_schema = ArrayType(LongType())

@udf(returnType=possi_schema)
def balance_possi(count_pneg, possi, seqID_pos, seqID_imp):
    """Choose which possible-negative samples to keep for one (contract, question).

    Args:
        count_pneg: number of samples wanted; None or 0 keeps none.
        possi: candidate samples, each a mapping with 'seqID' and 'sampleID'.
        seqID_pos: seqIDs used by positive samples; may be None.
        seqID_imp: seqIDs used by kept impossible samples; may be None.

    Returns:
        List of kept sampleIDs (at most count_pneg). Candidates whose seqID
        is not used by a positive sample, a kept impossible sample or an
        earlier pick are preferred. If they do not suffice, the remaining
        candidates are added in their original order.
    """
    res = []
    # no count (or no candidates) -> no possible-negative sample is kept
    if not count_pneg or not possi:
        return res

    taken_seqs = set(seqID_pos or ()) | set(seqID_imp or ())
    picked_seqs = set()
    picked_samples = set()

    # pass 1: candidates with an unused seqID. The quota is only consumed
    # when a sample is actually kept.
    for cur in possi:
        if len(res) >= count_pneg:
            break
        seq = cur['seqID']
        if seq in taken_seqs or seq in picked_seqs:
            continue
        res.append(cur['sampleID'])
        picked_samples.add(cur['sampleID'])
        picked_seqs.add(seq)

    # pass 2: if the unique ones are not enough, fill up with the rest
    for cur in possi:
        if len(res) >= count_pneg:
            break
        if cur['sampleID'] not in picked_samples:
            res.append(cur['sampleID'])
            picked_samples.add(cur['sampleID'])

    return res

############################################################################################################################################
################################################################### Union ##################################################################
############################################################################################################################################

# Run the balancing UDF; `res` is the list of possible-negative ids to keep.
possi_keep_df = possible_samples_tmp.select(balance_possi('count_pneg','possi', 'pos','seqID_imp').alias('res'))

print('\n', '💎 output dataframe to the UDF of balancing the possible negative samples 💎', 'len: ', possi_keep_df.count(), '\n')
possi_keep_df.show(2)

# unflatten list to rows: one row per kept possible-negative sampleID
possi_keep_res = possi_keep_df.select(explode(possi_keep_df['res']).alias('possi_id_keep'))



################## union all the selected positive, possible and impossible samples as the final result #################################


# Possible-negative samples restricted (inner join) to the kept ids.
possib_selected = sample_df.filter(f'negativity == {negativity_dict["possible_negative"]}').\
    select('sampleID', 'source','question','answer_start','answer_end')


possib_selected = possi_keep_res.join(possib_selected, possi_keep_res.possi_id_keep==possib_selected.sampleID).drop('possi_id_keep')


# Impossible samples restricted (inner join) to the kept ids.
imp_selected = sample_df.filter(f'negativity == {negativity_dict["impossible"]}').\
    select('sampleID', 'source','question','answer_start','answer_end')
imp_selected = imp_keep_res.join(imp_selected, imp_keep_res.imp_id_keep==imp_selected.sampleID).drop('imp_id_keep')


# Every positive sample is kept.
pos_selected = sample_df.filter(f'negativity == {negativity_dict["positive"]}').\
    select('sampleID', 'source','question','answer_start','answer_end')


# union() matches columns by position; all three inputs share the same
# column order. sampleID is dropped from the final output.
sample_selected = pos_selected.union(imp_selected).union(possib_selected).drop('sampleID')

print('\n', '💎 All selected samples for output 💎', 'len: ', sample_selected.count(), '\n')
sample_selected.show(2)

############################################### union ###############################################################

print('\n', '💎 All selected samples with full details 💎', '\n')

# Diagnostics only: show the selected samples again with their ids and the
# per-question quotas (count_pneg / count_imp) that drove the selection.
pneg_tmp = possi_count_df.select(col('cID_possi').alias('cID'), col('qID_possi').alias('qID'), 'count_pneg')
imp_tmp = avg_count_df.select(col('cID_imp').alias('cID'), col('qID_imp').alias('qID'), 'count_imp')
pos_selected.union(imp_selected).union(possib_selected).select('sampleID').\
    join(sample_df, ['sampleID']).\
    select('sampleID', 'cID', 'qID', 'seqID', 'negativity','source','question', 'answer_start', 'answer_end').\
    join(pneg_tmp, ['cID', 'qID'], 'left').join(imp_tmp, ['cID', 'qID'], 'left').\
    show(3)

############################################################################################################################################
####################################### Output final selected result to json format with given filename ####################################
############################################################################################################################################

from pyspark.sql.functions import collect_list, create_map, lit
def datafame_to_json(sample_to_json, path_):
    """Write every row of *sample_to_json* to *path_* as one JSON array.

    Each row becomes an object with the keys source, question, answer_start
    and answer_end. All rows are gathered on the driver before the file is
    written locally with 4-space indentation and sorted keys.

    Approach based on:
    https://stackoverflow.com/questions/61278038/how-to-convert-pyspark-dataframe-to-json
    """
    import json

    # create_map() expects alternating key / value columns
    key_value_columns = []
    for column_name in ("source", "question", "answer_start", "answer_end"):
        key_value_columns.append(lit(column_name))
        key_value_columns.append(column_name)
    cols_extracted = create_map(*key_value_columns)

    # a global aggregate yields exactly one row holding the full list
    aggregated = sample_to_json.agg(collect_list(cols_extracted).alias("stru"))
    first_row = aggregated.first()
    res = first_row['stru']

    with open(path_, "w") as out_file:
        serialized = json.dumps(res, indent=4, sort_keys=True)
        out_file.write(serialized)

# Final step: write the balanced sample set to the configured output path.
datafame_to_json(sample_selected, output_path)


💎 Read in filename is: test.json
💎 Output filename is: output.json


 💎 The initial input dataframe 💎 len:  102 

root
 |-- paragraph: struct (nullable = true)
 |    |-- context: string (nullable = true)
 |    |-- qas: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- answers: array (nullable = true)
 |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |-- answer_start: long (nullable = true)
 |    |    |    |    |    |-- text: string (nullable = true)
 |    |    |    |-- id: string (nullable = true)
 |    |    |    |-- is_impossible: boolean (nullable = true)
 |    |    |    |-- question: string (nullable = true)


 💎 Segments for all contract dataframe (Context to sequences) 💎 len:  2380 

+--------------------+
|                 seq|
+--------------------+
|{LohaCompanyltd_2...|
|{LohaCompanyltd_2...|
+--------------------+
only showing top 2 rows


 💎 all types of samples dataframe 💎 len:  99392 

+