In [1]:
import json
import hashlib
import re
from operator import add
import random

In [2]:
import findspark
findspark.init()
from pyspark import SparkContext
import pyspark
# Spark configuration for the whole notebook: executor/driver sizing, a local
# scratch directory, the SQL shuffle partition count, and the jars for spark-nlp
# (local jar file) and spark-xml (Maven coordinate).
# NOTE: the filesystem paths below are machine-specific and must be adapted
# before running this notebook elsewhere.
conf = pyspark.SparkConf().setAll([('spark.executor.memory', '12g'),\
                                   ('spark.executor.cores', '3'),\
                                   ('spark.executor.instances','5'),\
                                   ('spark.driver.memory','200g'),\
                                   ('spark.driver.maxResultSize','200g'),\
                                   ("spark.local.dir", "/data/deng.595/tmp"),\
                                   ("spark.sql.shuffle.partitions",'5000'),\
                                   # ("spark.jars.packages", "com.johnsnowlabs.nlp:spark-nlp_2.11:2.6.5"),\ we use spark-nlp for sentence boundary detection and NER
                                   ("spark.jars", "/home/deng.595/anaconda2/envs/py36/lib/python3.7/site-packages/pyspark/jars/spark-nlp_2.11-2.6.5.jar"), \
                                   ("spark.jars.packages", "com.databricks:spark-xml_2.11:0.11.0")])
# Single SparkContext shared by every cell below.
sc = SparkContext(conf=conf)

from pyspark.sql import functions as F
from pyspark.sql.types import ArrayType, FloatType, StringType

from pyspark.sql.types import Row
from pyspark.sql import SparkSession
spark = SparkSession(sc)

## Create wikidata alias dump and wikipedia link/anchor dump
This is used for extra entity detection. In particular, a Wikipedia page generally does not link to itself, so we use string matching against these aliases to find self-mentions.

We upload the data that we use, but you can also reprocess with the latest wikidata dump.

In [3]:
def get_alias(x):
    """Extract the enwiki title and the English names from one Wikidata dump line.

    Args:
        x: One raw text line of the Wikidata JSON dump. Leading/trailing ','
            characters (the array element separator) are stripped before parsing.

    Returns:
        A dict with two keys:
          'wikititle': the enwiki sitelink title, or '' when absent.
          'names': the English label (or '' when absent) followed by the
              values of all English aliases.
        Lines that are not valid JSON, or that do not decode to a JSON object,
        yield {'wikititle': '', 'names': ['']}.
    """
    try:
        x = json.loads(x.strip(','))
    except ValueError:
        # json.JSONDecodeError is a ValueError subclass. Only parse failures are
        # treated as "no record"; anything else (e.g. KeyboardInterrupt) propagates.
        x = {}
    if not isinstance(x, dict):
        # Valid JSON that is not an object has no labels/sitelinks to read.
        x = {}
    return {
        'wikititle':x.get('sitelinks',{}).get('enwiki',{}).get('title',''),
        'names':[x.get('labels',{}).get('en', {}).get('value','')]+[alias.get('value', '') for alias in x.get('aliases',{}).get('en', [])]
    }
# Parse the Wikidata dump line by line, skipping the '[' and ']' lines that
# delimit the top-level JSON array, and keep only items with a non-empty
# enwiki title.
wikidata = sc.textFile('/data/deng.595/workspace/wikidata/20201130/wikidata-20201130-all.json.bz2')\
    .filter(lambda x:x!='[' and x!=']')\
    .map(get_alias)\
    .filter(lambda x:x['wikititle'])

In [None]:
wikidata.map(lambda x:json.dumps(x)).saveAsTextFile('/data/deng.595/workspace/wikidata/20201130/wikidata_aliases.json')

In [8]:
# Reload the saved alias records as (wikititle with ' ' replaced by '_', names) pairs.
wikidata_aliases = sc.textFile('/data/deng.595/workspace/wikidata/20201130/wikidata_aliases.json/part*')\
                        .map(json.loads).map(lambda x:(x['wikititle'].replace(' ','_'),x['names']))

In [6]:
# For every link with a non-blank anchor, count how often each (target id, anchor)
# combination occurs, then gather the (anchor, count) pairs per target id.
# NOTE(review): `raw_articles` is only assigned in a later cell of this notebook
# (under "Process wikipedia articles"); that cell has to be run before this one.
wikipedia_links = raw_articles.filter(lambda x:'paragraphsWithLinks' in x)\
                            .flatMap(lambda x:[link for p in x['paragraphsWithLinks'] for link in p['links'] if link['anchor'].strip()])\
                            .map(lambda x:((x['id'],x['anchor']),1)).reduceByKey(lambda a,b:a+b)\
                            .map(lambda x:(x[0][0],[(x[0][1],x[1])])).reduceByKey(lambda a,b:a+b)

In [21]:
# Merge, per page id, the anchor->count dict with the Wikidata names. Every
# Wikidata name gets the weight 9999, which overrides the anchor count when the
# same string appears in both. Pages present on only one side are dropped by the
# join. The merged items are sorted by (weight, alias length), largest first.
wikipedia_aliases = wikipedia_links.map(lambda x:(x[0],{k:v for k,v in x[1]}))\
                    .join(wikidata_aliases.map(lambda x:(x[0],{k:9999 for k in x[1]})))\
                    .map(lambda x:(x[0],x[1][0].update(x[1][1]) or x[1][0]))\
                    .map(lambda x: (x[0], sorted(list(x[1].items()), key=lambda z:(z[1], len(z[0])), reverse=True)))

In [23]:
# Collect the merged alias table on the driver and write it out as JSON.
# The file has to be opened for writing: json.dump() on a handle opened with 'r'
# fails because the stream is not writable.
# NOTE(review): the following cell reloads the table from
# .../raw/20201201/wikipedia_link_alias.json - confirm which location is intended.
with open('/data/deng.595/workspace/wikipedia_dump/raw/wikipedia_link_alias.json', 'w') as f:
    json.dump(wikipedia_aliases.collect(), f, indent=2)

In [5]:
# Reload the alias table from JSON and distribute it as an RDD of
# (page id, alias list) tuples (JSON arrays come back as lists, hence the map).
with open('/data/deng.595/workspace/wikipedia_dump/raw/20201201/wikipedia_link_alias.json', 'r') as f:
    wikipedia_aliases = sc.parallelize(json.load(f)).map(lambda x:(x[0],x[1]))

In [6]:
import re
def filter_aliases_and_compile(x):
    """Select the aliases of one page and pre-compile a whole-word regex for each.

    Args:
        x: Sequence of (alias, weight) items in the order they should be
            considered. A weight of 9999 is the value given to Wikidata names
            when the alias table is built.

    Returns:
        List of [alias, compiled_pattern] entries, where the pattern is the
        escaped alias wrapped in word boundaries. Blank aliases are skipped.
        Aliases with weight 9999 are always kept; any other alias is kept only
        while (its position in `x`) - (number of weight-9999 aliases kept so
        far) is at most 50.
    """
    kept = []
    n_wikidata = 0
    for position, (alias, weight) in enumerate(x):
        if len(alias.strip()) == 0:
            continue
        from_wikidata = weight == 9999
        if not from_wikidata and (position - n_wikidata) > 50:
            # Beyond the budget for non-Wikidata anchors.
            continue
        kept.append([alias, re.compile(r'\b' + re.escape(alias) + r'\b')])
        if from_wikidata:
            n_wikidata += 1
    return kept
# Replace each page's (alias, weight) list with the filtered [alias, regex] list.
# NOTE: this rebinds `wikipedia_aliases`, so re-running the cell without first
# re-running the load cell would apply the filter to already-compiled entries.
wikipedia_aliases = wikipedia_aliases.map(lambda x: (x[0], filter_aliases_and_compile(x[1])))

## Process wikipedia articles
Process JSON dumps converted from the raw XML dump. Add hyperlinks and do some cleaning.

In [7]:
# Load the article dump: one JSON document per text line, parsed into a dict.
raw_articles = sc.textFile('/data/deng.595/workspace/wikipedia_dump/raw/20201201/enwiki-20201201-pages-articles.json/*.json').map(lambda x:json.loads(x))

In [21]:
test_article = raw_articles.filter(lambda x: x['type']!='REDIRECT').take(1)[0]

In [32]:
test_article['lists']

[['Ernest Ambrose Vivian, 2nd Baron Swansea (11 February 1848 – 17 July 1922); died unmarried'],
 ['The Hon. John Aubrey Vivian (23 July 1854 – 1 March 1898); died unmarried'],
 ['Violet Averil Margaret Vivian (3 December 1871 – 30 March 1943)',
  'Henry Hussey Vivian (5 February 1873 – 11 December 1898); died unmarried',
  'Odo Richard Vivian, 3rd Baron Swansea (22 April 1875 – 16 November 1934)',
  'Averil Vivian (4 December 1876 – 1 February 1959); married George Tryon, 1st Baron Tryon',
  'Alexandra Gladys Vivian (c. 1879 – 17 July 1966)',
  'Alberta Diana Vivian (10 February 1883 – 1968)',
  'a daughter (10 February 1883)']]

In [30]:
test_article['paragraphsWithLinks'][-len(test_article['lists']):]

[{'paragraph': 'Alexandra Gladys Vivian (c. 1879 – 17 July 1966)',
  'links': [],
  'refs': []},
 {'paragraph': 'Alberta Diana Vivian (10 February 1883 – 1968)',
  'links': [],
  'refs': []},
 {'paragraph': 'a daughter (10 February 1883)', 'links': [], 'refs': []}]

In [24]:
test_article['sections']

[{'numParagraphs': 2, 'numTables': 0},
 {'title': 'Biography', 'numParagraphs': 6, 'numTables': 0},
 {'title': 'Marriages and children', 'numParagraphs': 3, 'numTables': 0},
 {'title': 'References', 'numParagraphs': 0, 'numTables': 0},
 {'title': 'Further reading', 'numParagraphs': 0, 'numTables': 0},
 {'title': 'External links', 'numParagraphs': 2, 'numTables': 0}]

In [4]:
raw_articles.filter(lambda x: x['type']!='REDIRECT').count()

8773523

In [8]:
# Tag every paragraph with its section and drop paragraphs of boilerplate sections.
def filter_and_link_paragraphs(paragraphs, sections):
    """Assign paragraphs to sections and keep only those from content sections.

    `sections` is walked in order; each section claims the next
    `numParagraphs` (default 0) entries of `paragraphs`. Paragraphs of sections
    titled "References", "Further reading", "External links", "See also" or
    "Publications" are discarded. Paragraphs located after the range covered by
    all sections are never visited and therefore not returned (presumably the
    table/list items mentioned in the original note - confirm against the dump).

    Args:
        paragraphs: List of paragraph dicts for one article.
        sections: List of section dicts with optional 'title' and 'numParagraphs'.

    Returns:
        The kept paragraph dicts. They are the input objects themselves, updated
        in place with 'sec_i' (section index) and 'p_i' (index inside the section).
    """
    skipped_titles = {"References", "Further reading", "External links", "See also", "Publications"}
    kept = []
    offset = 0
    for sec_idx, section in enumerate(sections):
        count = section.get('numParagraphs', 0)
        section_paragraphs = paragraphs[offset:offset + count]
        offset += count
        if section.get('title', '') in skipped_titles:
            continue
        for par_idx, par in enumerate(section_paragraphs):
            par.update({'sec_i': sec_idx, 'p_i': par_idx})
            kept.append(par)
    return kept

In [9]:
# Combine the existing hyperlinks of a paragraph with alias-based self mentions.
def annotate_paragraph_with_link(paragraph_with_links, self_links, self_id):
    """Add self-mention links found by alias lookup to one paragraph.

    Args:
        paragraph_with_links: Dict with 'paragraph' (text) and 'links' (list of
            dicts with 'id', 'anchor', 'start', 'end'). Modified in place: the
            'refs' key and each link's 't' key are removed, new links are
            appended to 'links', and an 'entities' key is added.
        self_links: Iterable of (alias, compiled_pattern) for the page itself.
        self_id: Id stored in every link created from an alias match.

    Returns:
        The same `paragraph_with_links` dict. 'entities' lists the distinct link
        ids, including `self_id` when at least one alias matched.
    """
    text = paragraph_with_links['paragraph']
    links = paragraph_with_links['links']
    paragraph_with_links.pop('refs', None)
    for link in links:
        link.pop('t', '')
    entities = set(link['id'] for link in links)
    anchors = set(link['anchor'] for link in links)
    # covered[k] is True once character k belongs to some link.
    covered = [False] * len(text)
    for link in links:
        covered[link['start']:link['end']] = [True] * (link['end'] - link['start'])
    for alias, pattern in self_links:
        # Skip aliases already used as an anchor, and do a cheap substring test
        # before running the regex.
        if alias in anchors or alias not in text:
            continue
        found = []
        for hit in pattern.finditer(text):
            begin, stop = hit.start(), hit.end()
            if any(covered[begin:stop]):
                continue
            found.append({'id': self_id, 'anchor': alias, 'start': begin, 'end': stop})
            covered[begin:stop] = [True] * (stop - begin)
        if found:
            links.extend(found)
            entities.add(self_id)
            anchors.add(alias)
    paragraph_with_links['links'] = links
    paragraph_with_links['entities'] = list(entities)
    return paragraph_with_links

In [10]:
# Keep only non-empty paragraphs from non-redirect pages.
# Articles are keyed by wikiTitle and joined with the compiled alias table, so
# pages without an alias entry are dropped by the join. For each remaining page
# the paragraphs are first tagged/filtered by section, blank paragraphs are
# skipped, and the rest are annotated with hyperlinks plus alias self mentions.
# (The enumerate index `i` is not used.)
nonredirected_articles_paragraphs = raw_articles.filter(lambda x: x['type']!='REDIRECT')\
    .map(lambda x:(x['wikiTitle'], x)).join(wikipedia_aliases)\
    .map(lambda x:{
        'title': x[1][0]['title'],
        'wikiTitle': x[1][0]['wikiTitle'],
        'wid': x[1][0]['wid'],
        'paragraphsWithLinks': [
            annotate_paragraph_with_link(y, x[1][1], x[1][0]['wikiTitle'])\
                for i,y in enumerate(filter_and_link_paragraphs(x[1][0]['paragraphsWithLinks'], x[1][0]['sections']))\
                if y['paragraph'].strip()
        ]
    })

In [48]:
nonredirected_articles_paragraphs.take(1)

[{'title': 'The Creation of Adam',
  'wikiTitle': 'The_Creation_of_Adam',
  'wid': 511187,
  'paragraphsWithLinks': [["  The Creation of Adam () is a fresco painting by Italian artist Michelangelo, which forms part of the Sistine Chapel's ceiling, painted c. 1508–1512. It illustrates the Biblical creation narrative from the Book of Genesis in which God gives life to Adam, the first man. The fresco is part of a complex iconographic scheme and is chronologically the fourth in the series of panels depicting episodes from Genesis.",
    [{'id': 'fresco', 'anchor': 'fresco', 'start': 31, 'end': 37},
     {'id': 'Michelangelo', 'anchor': 'Michelangelo', 'start': 65, 'end': 77},
     {'id': 'Sistine_Chapel_ceiling',
      'anchor': "Sistine Chapel's ceiling",
      'start': 103,
      'end': 127},
     {'id': 'Bible', 'anchor': 'Biblical', 'start': 170, 'end': 178},
     {'id': 'Genesis_creation_narrative',
      'anchor': 'creation narrative',
      'start': 179,
      'end': 197},
     {'id

In [11]:
# Keep only paragraphs that contain entities (from hyperlink or alias lookup),
# drop articles left without any paragraph, and save one JSON document per line.
nonredirected_articles_paragraphs.map(lambda x:{
        'title': x['title'],
        'wikiTitle': x['wikiTitle'],
        'wid': x['wid'],
        'paragraphsWithLinks': [p for p in x['paragraphsWithLinks'] if p['entities']]
    }).filter(lambda x:x['paragraphsWithLinks'])\
    .map(lambda x:json.dumps(x)).saveAsTextFile('/data/deng.595/workspace/hybrid_pretrain/data/paragraphs_with_link_cleaned.json')

In [5]:
# Reload the cleaned paragraphs and flatten them into a DataFrame with one row
# per paragraph: article identifiers, section/paragraph indices, the paragraph
# text `p` and its link list `link`.
nonredirected_articles_paragraph = spark.createDataFrame(sc.textFile('/data/deng.595/workspace/hybrid_pretrain/data/paragraphs_with_link_cleaned.json/part-*')\
                                    .map(json.loads)\
                                    .flatMap(lambda x: [Row(
                                        title=x['title'],
                                        wikiTitle=x['wikiTitle'],
                                        wid=x['wid'],
                                        sec_i=y['sec_i'],
                                        p_i=y['p_i'],
                                        p=y['paragraph'],
                                        link=y['links']
                                    ) for y in x['paragraphsWithLinks']]))

In [4]:
# Sentence boundary detection
from sparknlp.base import *
from sparknlp.annotator import *
from pyspark.ml import Pipeline

# Wrap the paragraph text column "p" as a spark-nlp document.
documentAssembler = DocumentAssembler() \
    .setInputCol("p") \
    .setOutputCol("document")

# Split each document into sentence annotations (output column "Sentence").
sentenceDetector = SentenceDetector() \
    .setInputCols(["document"]) \
    .setOutputCol("Sentence")

# NOTE(review): `finisher` is configured here but is not part of the pipeline
# stages below, so it has no effect on the output.
finisher = Finisher() \
    .setInputCols(["Sentence"]) \
    .setCleanAnnotations(False) \
    .setIncludeMetadata(False)

# Sentence boundary detection pipeline: document assembly + sentence detection.
pipeline = Pipeline() \
    .setStages([
        documentAssembler,
        sentenceDetector,
    ])

In [6]:
# Fit and apply the sentence pipeline, then drop the paragraph text and the
# intermediate document column; the "Sentence" annotations are kept.
model = pipeline.fit(nonredirected_articles_paragraph)
sentences = model.transform(nonredirected_articles_paragraph).drop('p','document')

In [7]:
sentences.printSchema()

root
 |-- link: array (nullable = true)
 |    |-- element: map (containsNull = true)
 |    |    |-- key: string
 |    |    |-- value: string (valueContainsNull = true)
 |-- p_i: long (nullable = true)
 |-- sec_i: long (nullable = true)
 |-- title: string (nullable = true)
 |-- wid: long (nullable = true)
 |-- wikiTitle: string (nullable = true)
 |-- Sentence: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- annotatorType: string (nullable = true)
 |    |    |-- begin: integer (nullable = false)
 |    |    |-- end: integer (nullable = false)
 |    |    |-- result: string (nullable = true)
 |    |    |-- metadata: map (nullable = true)
 |    |    |    |-- key: string
 |    |    |    |-- value: string (valueContainsNull = true)
 |    |    |-- embeddings: array (nullable = true)
 |    |    |    |-- element: float (containsNull = false)



In [8]:
# Attach hyperlink information to each detected sentence
def link_sentence(sentences, links):
    """Distribute paragraph-level links over the sentences that contain them.

    Args:
        sentences: Sentence annotations of one paragraph, each exposing
            `.begin`, `.end` (treated as inclusive, hence the +1 below) and
            `.result` (the sentence text).
        links: Paragraph links as mappings with 'id', 'anchor', 'start', 'end';
            offsets are converted with int(), so string values are accepted.

    Returns:
        One tuple per sentence: (text, [(id, anchor, start, end), ...],
        [distinct ids], sentence index), with start/end relative to the
        sentence. A link is attached only when it lies entirely inside a
        sentence.
    """
    per_sentence = [[s.result, [], set(), idx] for idx, s in enumerate(sentences)]
    ordered_links = sorted(links, key=lambda z: int(z['start']))
    # Index of the first link that may still belong to the current or a later sentence.
    cursor = 0
    for idx, sentence in enumerate(sentences):
        offset = sentence.begin
        span = (sentence.end + 1) - offset
        for step, link in enumerate(ordered_links[cursor:]):
            rel_start = int(link['start']) - offset
            rel_end = int(link['end']) - offset
            if rel_start >= 0 and rel_end <= span:
                per_sentence[idx][1].append((link['id'], link['anchor'], rel_start, rel_end))
                per_sentence[idx][2].add(link['id'])
            elif rel_start >= span or rel_end > span:
                # This link reaches past the sentence: resume from it next time.
                cursor += step
                break
    return [(text, spans, list(ids), s_idx) for text, spans, ids, s_idx in per_sentence]

In [9]:
# Emit one record per sentence. 'linked_sentence' holds (text, links, entity ids),
# 's_i' is the sentence index inside its paragraph, and 'md5' is a hash of
# wikiTitle + '$$%md5%$$' + sentence text used later as the sentence id.
linked_sentences = sentences.rdd.flatMap(lambda x:[{
    'wid': x.wid,
    'title': x.title,
    'wikiTitle': x.wikiTitle,
    'linked_sentence': y[:3],
    'sec_i': x.sec_i,
    'p_i': x.p_i,
    's_i': y[3],
    'md5': hashlib.md5((x.wikiTitle+'$$%md5%$$'+y[0]).encode()).hexdigest()
} for y in link_sentence(x.Sentence, x.link)])

In [10]:
linked_sentences.take(5)

[{'wid': 514202,
  'title': 'Okavango River',
  'wikiTitle': 'Okavango_River',
  'linked_sentence': ('The Okavango River (formerly spelled Okovango or Okovanggo) is a river in southwest Africa.',
   [('Okavango_River', 'Okavango River', 4, 18),
    ('river', 'river', 65, 70),
    ('Africa', 'Africa', 84, 90)],
   ['Africa', 'Okavango_River', 'river']),
  'sec_i': 0,
  'p_i': 1,
  's_i': 0,
  'md5': 'df10fc79e82de56c073e1d4f5010a3c2'},
 {'wid': 514202,
  'title': 'Okavango River',
  'wikiTitle': 'Okavango_River',
  'linked_sentence': ('It is the fourth-longest river system in southern Africa, running southeastward for .',
   [],
   []),
  'sec_i': 0,
  'p_i': 1,
  's_i': 1,
  'md5': 'bce72f66f721cc9048b84bb6b53681a5'},
 {'wid': 514202,
  'title': 'Okavango River',
  'wikiTitle': 'Okavango_River',
  'linked_sentence': ('It begins at 1,300 m altitude in the sandy highlands of Angola, where it is known by the Portuguese name Rio Cubango.',
   [('Angola', 'Angola', 56, 62), ('Okavango_River

In [None]:
%%time
linked_sentences.map(lambda x:json.dumps(x)).saveAsTextFile('/data/deng.595/workspace/hybrid_pretrain/data/sentences_with_link_cleaned.json')

In [12]:
%%time
# Keep only sentences that have at least one link and rewrite them in 5000 partitions.
sc.textFile('/data/deng.595/workspace/hybrid_pretrain/data/sentences_with_link_cleaned.json/part-*')\
    .map(json.loads).filter(lambda x:len(x['linked_sentence'][1])>0)\
    .map(json.dumps).repartition(5000).saveAsTextFile('/data/deng.595/workspace/hybrid_pretrain/data/sentences_with_link_cleaned_nonempty.json')

CPU times: user 907 ms, sys: 349 ms, total: 1.26 s
Wall time: 6min 51s


## Annotate the sentences
Run NER for extra entity detection.

In [3]:
# Load linked sentences into a DataFrame for NER: sentence text `s`, its links
# and its entity ids, plus the article identifiers.
# NOTE(review): this reads 'sentences_with_link.json/part-0000*', which is not
# one of the paths written earlier in this notebook and matches only a subset of
# part files - confirm the intended input.
linked_sentences = spark.createDataFrame(sc.textFile('/data/deng.595/workspace/hybrid_pretrain/data/sentences_with_link.json/part-0000*')\
                                    .map(json.loads)\
                                    .map(lambda x: Row(
                                        title=x['title'],\
                                        wikiTitle=x['wikiTitle'],\
                                        wid=x['wid'],\
                                        s=x['linked_sentence'][0],\
                                        links=x['linked_sentence'][1],\
                                        entities=x['linked_sentence'][2])))

In [3]:
%time
from sparknlp.base import *
from sparknlp.annotator import *
from pyspark.ml import Pipeline

from sparknlp.pretrained import PretrainedPipeline, NerDLModel, BertEmbeddings, WordEmbeddingsModel
import sparknlp

documentAssembler = DocumentAssembler() \
    .setInputCol("s") \
    .setOutputCol("sentence")

tokenizer = Tokenizer() \
    .setInputCols(["sentence"])\
    .setOutputCol("token")

embeddings = WordEmbeddingsModel.pretrained(name = "glove_100d", lang="en") \
        .setInputCols("sentence", "token") \
        .setOutputCol("embeddings") \

ner = NerDLModel.pretrained("onto_100", "en") \
        .setInputCols(["sentence", "token", "embeddings"]) \
        .setOutputCol("ner")

nerConverter = NerConverter()\
    .setInputCols(["sentence", "token", "ner"])\
    .setOutputCol("ner_chunk")

pipeline = Pipeline() \
    .setStages([
        documentAssembler,
        tokenizer,
        embeddings,
        ner,
        nerConverter
    ])

CPU times: user 4 µs, sys: 0 ns, total: 4 µs
Wall time: 8.34 µs
glove_100d download started this may take some time.
Approximate size to download 145.3 MB
[OK!]
onto_100 download started this may take some time.
Approximate size to download 13.5 MB
[OK!]


In [5]:
# Fit and apply the NER pipeline; adds the annotation columns (including
# "ner_chunk") to every sentence row.
model = pipeline.fit(linked_sentences)
annotated_sentences = model.transform(linked_sentences)

In [6]:
annotated_sentences.printSchema()

root
 |-- entities: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- links: array (nullable = true)
 |    |-- element: array (containsNull = true)
 |    |    |-- element: string (containsNull = true)
 |-- s: string (nullable = true)
 |-- title: string (nullable = true)
 |-- wid: long (nullable = true)
 |-- wikiTitle: string (nullable = true)
 |-- sentence: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- annotatorType: string (nullable = true)
 |    |    |-- begin: integer (nullable = false)
 |    |    |-- end: integer (nullable = false)
 |    |    |-- result: string (nullable = true)
 |    |    |-- metadata: map (nullable = true)
 |    |    |    |-- key: string
 |    |    |    |-- value: string (valueContainsNull = true)
 |    |    |-- embeddings: array (nullable = true)
 |    |    |    |-- element: float (containsNull = false)
 |-- token: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |  

In [7]:
def add_ner_links(original_links, entities, ner_links, s_length):
    """Merge hyperlinks of a sentence with NER chunks of selected types.

    Args:
        original_links: Hyperlinks as sequences (id, anchor, start, end); start
            and end are converted with int() and `end` is exclusive.
        entities: Ids of the entities already linked in the sentence.
        ner_links: NER chunk annotations exposing `.begin`, `.end`, `.result`
            and `.metadata['entity']`. `.end` is the index of the last
            character (inclusive) and is converted to an exclusive offset here
            so that NER spans use the same convention as hyperlinks.
        s_length: Length of the sentence in characters.

    Returns:
        [merged_links, entity_ids, all_ner_links] where
          merged_links: every hyperlink as [id, anchor, start, end, 'hyper',
              'hyper'] followed by the NER chunks whose type is one of DATE,
              TIME, PERCENT, MONEY, QUANTITY, CARDINAL and that do not overlap
              any hyperlink, as [id, text, start, end, 'ner', type];
          entity_ids: `entities` plus the ids of the merged NER chunks
              (unordered, de-duplicated);
          all_ner_links: every NER chunk regardless of type or overlap.
    """
    valid_ner = {"DATE", "TIME", "PERCENT", "MONEY", "QUANTITY", "CARDINAL"}
    # mapped_loc[k] is True when character k is covered by a hyperlink.
    mapped_loc = [False for _ in range(s_length)]
    merged_links = []
    final_ner_links = []
    entities = set(entities)
    for link in original_links:
        start = int(link[2])
        end = int(link[3])
        merged_links.append([link[0],link[1],start,end,'hyper','hyper'])
        mapped_loc[start:end] = [True]*(end-start)
    for link in ner_links:
        start = link.begin
        # Inclusive -> exclusive. Without the +1 a one-character chunk gives an
        # empty slice below and is never detected as overlapping a hyperlink.
        end = link.end + 1
        ner_type = link.metadata['entity']
        e_id = f"{ner_type}:{link.result}"
        final_ner_links.append([e_id,link.result,start,end,'ner',ner_type])
        if ner_type in valid_ner and not any(mapped_loc[start:end]):
            merged_links.append([e_id,link.result,start,end,'ner',ner_type])
            entities.add(e_id)
    return [merged_links, list(entities), final_ner_links]

In [8]:
# Drop the intermediate annotation columns, keep rows whose hyperlinks plus NER
# chunks number more than one, merge NER chunks into the links, and finally keep
# only sentences with more than one entity id after merging.
# 'links' in the output is the 3-element list returned by add_ner_links.
annotated_sentences = annotated_sentences\
    .drop('sentence','token','embeddings','ner')\
    .where((F.size(F.col('links'))+F.size(F.col('ner_chunk')))>1)\
    .rdd.map(lambda x:{
        'wid': x.wid,
        'title': x.title,
        'wikiTitle': x.wikiTitle,
        's': x.s,
        'links': add_ner_links(x.links, x.entities, x.ner_chunk, len(x.s))
    })\
    .filter(lambda x:len(x['links'][1])>1)

In [9]:
%%time
annotated_sentences.map(lambda x:json.dumps(x)).saveAsTextFile('/data/deng.595/workspace/hybrid_pretrain/data/annotated_sentences.json')

CPU times: user 137 ms, sys: 39.9 ms, total: 177 ms
Wall time: 22min 26s


## Extract entities pairs from sentences

In [5]:
# Load the annotated sentences (one JSON document per line).
# NOTE(review): this path ('annotated_sentences/annotated_sentences.json-part-*/part-*')
# differs from the 'annotated_sentences.json' path written by the save cell
# earlier in this notebook - confirm the intended input.
annotated_sentence = sc.textFile('/data/deng.595/workspace/hybrid_pretrain/data/annotated_sentences/annotated_sentences.json-part-*/part-*')\
                .map(json.loads)

In [6]:
annotated_sentence.take(10)

[{'wid': 36909709,
  'title': 'Saint Mary Magdalene High School in Poznań',
  'wikiTitle': 'Saint_Mary_Magdalene_High_School_in_Poznań',
  'sec_i': 0,
  'p_i': 0,
  's_i': 0,
  's': 'Saint Mary Magdalene High School in Poznań (Polish: Liceum Ogólnokształcące św.',
  'links': [[['Saint_Mary_Magdalene_High_School_in_Poznań',
     'Saint Mary Magdalene High School in Poznań',
     0,
     42,
     'hyper',
     'hyper'],
    ['Polish_language', 'Polish', 44, 50, 'hyper', 'hyper']],
   ['Polish_language', 'Saint_Mary_Magdalene_High_School_in_Poznań'],
   [['FAC:Saint Mary Magdalene High School',
     'Saint Mary Magdalene High School',
     0,
     32,
     'ner',
     'FAC'],
    ['GPE:Poznań', 'Poznań', 36, 42, 'ner', 'GPE'],
    ['NORP:Polish', 'Polish', 44, 50, 'ner', 'NORP']]]},
 {'wid': 36909709,
  'title': 'Saint Mary Magdalene High School in Poznań',
  'wikiTitle': 'Saint_Mary_Magdalene_High_School_in_Poznań',
  'sec_i': 0,
  'p_i': 0,
  's_i': 3,
  's': 'colloquially simply as Mar

In [18]:
# Remove duplicates: sentences are keyed by md5(wikiTitle + '$$%md5%$$' + text);
# among records sharing a key, `a` is kept only when it has strictly more merged
# links than `b`, otherwise `b` is kept.
annotated_sentence.map(lambda x:(hashlib.md5((x['wikiTitle']+'$$%md5%$$'+x['s']).encode()).hexdigest(),x))\
    .reduceByKey(lambda a,b:a if len(a['links'][0])>len(b['links'][0]) else b)\
    .map(lambda x: json.dumps(x[1]))\
    .saveAsTextFile('/data/deng.595/workspace/hybrid_pretrain/data/annotated_sentences_cleaned.json')

In [19]:
# Reload the de-duplicated sentences and keep those with more than one entity id.
annotated_sentence = sc.textFile('/data/deng.595/workspace/hybrid_pretrain/data/annotated_sentences_cleaned.json/part*')\
                .map(json.loads)\
                .filter(lambda x:len(x['links'][1])>1)

In [20]:
# Sentences that mention the topic entity of their own Wikipedia page, i.e.
# whose entity-id list contains the page's wikiTitle.
annotated_sentence_only_self = annotated_sentence.filter(lambda x:x['wikiTitle'] in x['links'][1])

In [21]:
# Per-sentence statistics, collected to the driver as one
# [number of distinct hyperlinked ids, number of entity ids] pair per sentence,
# for all sentences and for the self-mentioning subset.
overall_stat = annotated_sentence.map(lambda x:[len(set([y[0] for y in x['links'][0] if y[4]=='hyper'])),
                                               len(x['links'][1])]).collect()
only_self_stat = annotated_sentence_only_self.map(lambda x:[len(set([y[0] for y in x['links'][0] if y[4]=='hyper'])),
                                               len(x['links'][1])]).collect()

In [26]:
# Entity-count distribution over all sentences. Each element of overall_stat is
# [distinct hyperlinked ids, entity ids] for one sentence; x[1] is the total.
print('all', len(overall_stat))
print('>=3', len([x for x in overall_stat if x[1]>2]))
print('>=4', len([x for x in overall_stat if x[1]>3]))
print('average', sum([x[1] for x in overall_stat])/len(overall_stat))
print('max', max([x[1] for x in overall_stat]))
print('min', min([x[1] for x in overall_stat]))

# Same breakdown for hyperlinked ids only (x[0]). The labels state the threshold
# that is actually applied: x[0]>2 is "at least 3", x[0]>3 is "at least 4".
print('>=3 entity', len([x for x in overall_stat if x[0]>2]))
print('>=4 entity', len([x for x in overall_stat if x[0]>3]))
print('average entity', sum([x[0] for x in overall_stat])/len(overall_stat))
print('max', max([x[0] for x in overall_stat]))
print('min', min([x[0] for x in overall_stat]))

all 60415414
>=3 34420649
>=4 19101114
average 3.319835696234739
max 1253
min 2
>=2 entity 18911307
>=3 entity 9372275
average entity 2.3173175309201723
max 1253
min 1


In [27]:
# Entity-count distribution over sentences that mention their page's topic
# entity. Each element of only_self_stat is [distinct hyperlinked ids, entity ids]
# for one sentence; x[1] is the total.
print('all', len(only_self_stat))
print('>=3', len([x for x in only_self_stat if x[1]>2]))
print('>=4', len([x for x in only_self_stat if x[1]>3]))
print('average', sum([x[1] for x in only_self_stat])/len(only_self_stat))
print('max', max([x[1] for x in only_self_stat]))
print('min', min([x[1] for x in only_self_stat]))

# Same breakdown for hyperlinked ids only (x[0]). The labels state the threshold
# that is actually applied: x[0]>2 is "at least 3", x[0]>3 is "at least 4".
print('>=3 entity', len([x for x in only_self_stat if x[0]>2]))
print('>=4 entity', len([x for x in only_self_stat if x[0]>3]))
print('average entity', sum([x[0] for x in only_self_stat])/len(only_self_stat))
print('max', max([x[0] for x in only_self_stat]))
print('min', min([x[0] for x in only_self_stat]))

all 16909031
>=3 12938247
>=4 7949870
average 3.860607210430923
max 402
min 2
>=2 entity 8380653
>=3 entity 4458609
average entity 2.869229999046072
max 402
min 1


In [11]:
import hashlib
# Build (topic entity, other entity) pairs for one sentence
def extract_pairs(self_id, entities):
    """Pair the topic entity with every other entity id of the sentence.

    Args:
        self_id: Id of the page's topic entity.
        entities: Iterable of entity ids found in the sentence.

    Returns:
        List with one alphabetically sorted 2-tuple per element of `entities`
        that differs from `self_id`, in input order (duplicates are kept).
    """
    return [tuple(sorted((self_id, other))) for other in entities if other != self_id]
# Build every entity pair of a sentence in which at least one member is a
# hyperlinked (Wikipedia) entity.
def extract_all_pairs(links):
    """Return the distinct entity pairs anchored on a hyperlinked entity.

    Args:
        links: Merged links of a sentence; element 0 is the entity id and
            element 4 is the link kind ('hyper' for hyperlinks).

    Returns:
        List (in arbitrary order) of unique, alphabetically sorted 2-tuples
        (id_a, id_b) with id_a != id_b where at least one of the two ids comes
        from a 'hyper' link.
    """
    hyper_ids = [link[0] for link in links if link[4] == 'hyper']
    all_ids = [link[0] for link in links]
    unique_pairs = {
        tuple(sorted((subject, other)))
        for subject in hyper_ids
        for other in all_ids
        if subject != other
    }
    return list(unique_pairs)

In [23]:
# Add to every self-mentioning sentence: 'pairs' (topic entity paired with each
# other entity id), 'all_pairs' (pairs anchored on any hyperlinked entity) and
# 'md5' (the sentence id). dict.update returns None, so `... or x` yields the
# updated record.
annotated_sentence_only_self_with_pair = annotated_sentence_only_self\
                            .map(lambda x:x.update({
                                'pairs': extract_pairs(x['wikiTitle'],x['links'][1]),
                                'all_pairs': extract_all_pairs(x['links'][0]),
                                'md5': hashlib.md5((x['wikiTitle']+'$$%md5%$$'+x['s']).encode()).hexdigest()
                            }) or x)

In [16]:
annotated_sentence_only_self_with_pair.take(2)

[{'wid': 13238808,
  'title': 'Elihu Root House',
  'wikiTitle': 'Elihu_Root_House',
  'sec_i': 2,
  'p_i': 0,
  's_i': 0,
  's': "The Elihu Root House was also known as the Grant House due to the marriage between Root's daughter, Edith, and Ulysses S. Grant III.",
  'links': [[['Elihu_Root_House', 'Elihu Root House', 4, 20, 'hyper', 'hyper'],
    ['Ulysses_S._Grant_III',
     'Ulysses S. Grant III',
     111,
     131,
     'hyper',
     'hyper']],
   ['Elihu_Root_House', 'Ulysses_S._Grant_III'],
   [['ORG:The Elihu Root House', 'The Elihu Root House', 0, 20, 'ner', 'ORG'],
    ['ORG:the Grant House', 'the Grant House', 39, 54, 'ner', 'ORG'],
    ["PERSON:Root's", "Root's", 83, 89, 'ner', 'PERSON'],
    ['PERSON:Edith', 'Edith', 100, 105, 'ner', 'PERSON'],
    ['PERSON:Ulysses S', 'Ulysses S', 111, 120, 'ner', 'PERSON']]],
  'pairs': [('Elihu_Root_House', 'Ulysses_S._Grant_III')],
  'all_pairs': [('Elihu_Root_House', 'Ulysses_S._Grant_III')],
  'md5': '6c67c3cb88f8d530ce3693587a0bcb71

In [24]:
%%time
annotated_sentence_only_self_with_pair.map(lambda x:json.dumps(x)).saveAsTextFile('/data/deng.595/workspace/hybrid_pretrain/data/annotated_sentences_onlyself_withpairs.json')

CPU times: user 496 ms, sys: 183 ms, total: 679 ms
Wall time: 2min 29s


In [25]:
annotated_sentence_only_self_with_pair = sc.textFile('/data/deng.595/workspace/hybrid_pretrain/data/annotated_sentences_onlyself_withpairs.json/*')\
                .map(json.loads)

In [26]:
# Keep only sentence ids and entity pairs for the joins below. Rows are grouped
# by sentence id and the first non-null 'pairs' / 'all_pairs' value of each
# group is kept, giving one row per s_id.
s_id_with_pairs = spark.createDataFrame(annotated_sentence_only_self_with_pair\
                    .map(lambda x:Row(pairs=x['pairs'],all_pairs=x['all_pairs'],s_id=x['md5'])))\
                    .groupBy('s_id').agg(F.first('pairs', True).alias('pairs'), F.first('all_pairs', True).alias('all_pairs'))

In [27]:
# All possible combinations of entity pair and sentence: one row per
# (s_id, pair), built separately from 'pairs' and from 'all_pairs'.
pair_with_sentence_id = s_id_with_pairs.select('s_id', F.explode('pairs').alias('pair'))
allpair_with_sentence_id = s_id_with_pairs.select('s_id', F.explode('all_pairs').alias('pair'))

In [35]:
%%time
print(pair_with_sentence_id.count())
print(allpair_with_sentence_id.count())

48370096
116766231
CPU times: user 1.45 s, sys: 573 ms, total: 2.02 s
Wall time: 4min 7s


For each entity pair, group all sentences that mention it.

In [28]:
# Topic-entity pairs together with the set of sentences mentioning them; only
# pairs supported by more than one sentence are kept.
pair_with_sentence_ids = pair_with_sentence_id\
    .groupBy('pair').agg(F.collect_set('s_id').alias('s_ids'))\
    .where(F.size('s_ids')>1)

In [29]:
# Same grouping for the 'all_pairs' variant; only pairs supported by more than
# one sentence are kept.
allpair_with_sentence_ids = allpair_with_sentence_id\
    .groupBy('pair').agg(F.collect_set('s_id').alias('s_ids'))\
    .where(F.size('s_ids')>1)

In [7]:
%%time
pair_with_sentence_ids.select(F.size('s_ids')).summary().show()

+-------+------------------+
|summary|       size(s_ids)|
+-------+------------------+
|  count|           2918453|
|   mean|2.3459713074015585|
| stddev|1.1370230766853102|
|    min|                 2|
|    25%|                 2|
|    50%|                 2|
|    75%|                 2|
|    max|               170|
+-------+------------------+

CPU times: user 1.9 s, sys: 1.04 s, total: 2.93 s
Wall time: 16min 57s


In [10]:
allpair_with_sentence_ids.select(F.size('s_ids')).summary().show()

+-------+-----------------+
|summary|      size(s_ids)|
+-------+-----------------+
|  count|          9677194|
|   mean|3.929200551316838|
| stddev|39.57423584789655|
|    min|                2|
|    25%|                2|
|    50%|                2|
|    75%|                3|
|    max|            35795|
+-------+-----------------+



In [30]:
# Remove pairs supported by more than 2000 sentences to prevent the join below
# from exploding, then flatten back to one row per (pair, s_id).
allpair_with_sentence_id_noskew = allpair_with_sentence_id\
    .groupBy('pair').agg(F.collect_set('s_id').alias('s_ids'))\
    .where(F.size('s_ids')<=2000)\
    .select('pair', F.explode('s_ids').alias('s_id'))

Find potential supporting evidence for each query sentence as described in the paper.
- `s1` is the query sentence, which can be any sentence with entity pairs.
- `s2` is supporting evidence, it must contain the topic entity, thus more likely to contain relational knowledge.

For each sentence, we take all entity pairs in it and join them with all other sentences that mention the same pair. Finally, we group the results per sentence to obtain its set of potential supporting evidence.

In [31]:
# For every query sentence s1 (from the de-skewed all-pairs table), join on the
# entity pair with the sentences s2 of the topic-entity pair table, drop the
# case s1 == s2, collect the s2 ids per (s1, pair), and finally collect all
# (pair, s2_ids) structs per s1.
s_pairs_noskew = allpair_with_sentence_id_noskew.withColumnRenamed('s_id','s1_id')\
    .join(pair_with_sentence_id.withColumnRenamed('s_id','s2_id'), on='pair', how='inner')\
    .where(F.col('s1_id')!=F.col('s2_id'))\
    .groupBy("s1_id","pair").agg(F.collect_list("s2_id").alias('s2_ids'))\
    .select('s1_id', F.struct('pair', 's2_ids').alias('pair_s2_ids'))\
    .groupBy("s1_id").agg(F.collect_list("pair_s2_ids").alias('pairs_s2_ids'))

In [32]:
%%time
s_pairs_noskew.write.json('/data/deng.595/workspace/hybrid_pretrain/data/annotated_sentences_onlyself_spairs_noskew.json')

CPU times: user 3.88 s, sys: 1.8 s, total: 5.68 s
Wall time: 16min 39s


In [33]:
s_pairs_noskew = spark.read.json('/data/deng.595/workspace/hybrid_pretrain/data/annotated_sentences_onlyself_spairs_noskew.json')

In [5]:
s_pairs_noskew.rdd.take(1)

[Row(pairs_s2_ids=[Row(pair=['CARDINAL:3', 'Nimslo'], s2_ids=['52265cab682168afdce6c35fcc8b41d8'])], s1_id='001a10229229a3585868ed898e91a2fe')]

In [4]:
%%time
s_pairs_noskew.printSchema()
s_pairs_noskew.select(F.size('pairs_s2_ids')).summary().show()
s_pairs_noskew.where(F.size('pairs_s2_ids')>1).select(F.size('pairs_s2_ids')).summary().show()

root
 |-- pairs_s2_ids: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- pair: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |    |    |-- s2_ids: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |-- s1_id: string (nullable = true)

+-------+------------------+
|summary|size(pairs_s2_ids)|
+-------+------------------+
|  count|           7631819|
|   mean|2.1242102308768067|
| stddev| 3.582983480542634|
|    min|                 1|
|    25%|                 1|
|    50%|                 1|
|    75%|                 2|
|    max|              1100|
+-------+------------------+

+-------+------------------+
|summary|size(pairs_s2_ids)|
+-------+------------------+
|  count|           3248827|
|   mean|3.6408820783624365|
| stddev|  5.11388869323814|
|    min|                 2|
|    25%|                 2|
|    50%|                 3|
|    75%|                 4|
|    max|          

## Cache sentence information to construct the final dataset

In [5]:
# Ids of every sentence used either as supporting evidence (s2) or as a query
# (s1), de-duplicated and turned into a persisted RDD of (s_id, None) pairs.
# The union is positional, so the s1_id values end up in the column named 'col'.
all_used_s_ids = s_pairs_noskew.select(F.explode('pairs_s2_ids.s2_ids')).select(F.explode('col')).distinct()\
    .union(s_pairs_noskew.select('s1_id').distinct()).distinct().rdd.map(lambda x:(x.col, None)).persist()

Extract the context for each sentence.

In [None]:
# Key every sentence by its paragraph, (title, sec_i, p_i), with a one-element
# list holding its index, text and md5 so that the lists can be concatenated
# per paragraph afterwards.
linked_sentences = sc.textFile('/data/deng.595/workspace/hybrid_pretrain/data/sentences_with_link_cleaned_nonempty.json/part-*')\
    .map(json.loads).map(lambda x:((x['title'], x['sec_i'], x['p_i']),\
                                   [{'s_i':x['s_i'], 's':x['linked_sentence'][0], 'md5':x['md5']}]))
def create_sentence_with_context(paragraph):
    """Surround every sentence of a paragraph with neighbouring sentences.

    Args:
        paragraph: list of dicts with keys 's_i' (sentence index), 's' (sentence text)
            and 'md5'. The list is sorted in place by 's_i'.

    Returns:
        One dict per sentence with:
            's_with_context': the sentence plus the neighbours that were attached,
            'md5': the sentence's md5,
            'start' / 'end': character span of the sentence inside 's_with_context'.

    Neighbours are attached one at a time while the whitespace-token count of the
    context is below 500 and a neighbour is still available on either side. When both
    sides are available the side is picked with `random.random()`, so results are not
    reproducible unless the `random` module is seeded by the caller.
    """
    paragraph.sort(key=lambda item: item['s_i'])
    word_counts = [len(item['s'].split()) for item in paragraph]
    n_sentences = len(paragraph)
    results = []
    for idx, sentence in enumerate(paragraph):
        context = sentence['s']
        offset = 0  # characters that have been prepended in front of the sentence
        left = idx - 1
        right = idx + 1
        n_words = word_counts[idx]
        while n_words < 500 and (left >= 0 or right < n_sentences):
            if left < 0:
                grow_right = True  # nothing left to prepend
            elif right == n_sentences:
                grow_right = False  # nothing left to append
            else:
                grow_right = random.random() > 0.5  # both sides available: pick one
            if grow_right:
                context = context + ' ' + paragraph[right]['s']
                n_words += word_counts[right]
                right += 1
            else:
                neighbour = paragraph[left]['s']
                context = neighbour + ' ' + context
                n_words += word_counts[left]
                left -= 1
                # the sentence moved right by the neighbour plus the joining space
                offset += len(neighbour) + 1
        results.append({
            's_with_context': context,
            'md5': sentence['md5'],
            'start': offset,
            'end': offset + len(sentence['s']),
        })
    return results

In [None]:
# Concatenate the per-sentence lists of each paragraph, then expand every paragraph
# into one context record per sentence.
linked_sentences_with_context = (
    linked_sentences
    .reduceByKey(add)
    .flatMap(lambda keyed_paragraph: create_sentence_with_context(keyed_paragraph[1]))
)
# Write the context records out as JSON lines.
(
    linked_sentences_with_context
    .map(json.dumps)
    .saveAsTextFile('/data/deng.595/workspace/hybrid_pretrain/data/sentences_with_context.json')
)

In [6]:
# Reload the saved context records (one JSON object per line).
sentences_with_context = (
    sc.textFile('/data/deng.595/workspace/hybrid_pretrain/data/sentences_with_context.json')
    .map(json.loads)
)

In [10]:
def merge_meta(meta1, meta2):
    """Combine a context record with its annotated-sentence record.

    Args:
        meta1: context record with keys 's_with_context', 'start' and 'end'.
        meta2: annotated sentence with keys 'md5', 'title', 's' and 'links'. Only
            `meta2['links'][0]` is read; each entry contributes its element 0 as the
            link key and elements 2 and 3 as the character span.

    Returns:
        Dict with 'md5', 'title', 's' (sentence text plus spans grouped by link key)
        and 's_with_context' (context text, the sentence span 's_loc', and the same
        spans shifted by `meta1['start']`).
    """
    # Group the spans by link key, keeping their original order.
    sentence_links = {}
    for link in meta2['links'][0]:
        sentence_links.setdefault(link[0], []).append([link[2], link[3]])

    # Spans are relative to the bare sentence; shift them into context coordinates.
    offset = meta1['start']
    context_links = {}
    for key, spans in sentence_links.items():
        context_links[key] = [[begin + offset, stop + offset] for begin, stop in spans]

    merged = {
        'md5': meta2['md5'],
        'title': meta2['title'],
        's': {'text': meta2['s'], 'links': sentence_links},
    }
    merged['s_with_context'] = {
        'text': meta1['s_with_context'],
        's_loc': [meta1['start'], meta1['end']],
        'links': context_links,
    }
    return merged

In [11]:
# Restrict to the sentence ids that are actually used, attach the context record and the
# annotated sentence (both keyed by md5), and merge the two into one record.
# After the two joins each element is (md5, ((None, context_record), annotated_sentence)).
all_used_sentences = (
    all_used_s_ids
    .join(sentences_with_context.map(lambda ctx: (ctx['md5'], ctx)))
    .join(annotated_sentence_only_self_with_pair.map(lambda ann: (ann['md5'], ann)))
    .map(lambda joined: merge_meta(joined[1][0][1], joined[1][1]))
)

In [12]:
# Persist the merged sentence records as JSON lines; a later cell reloads them from this path.
all_used_sentences.map(json.dumps).saveAsTextFile('/data/deng.595/workspace/hybrid_pretrain/data/all_used_sentences.json')

## Generate the final dataset

In [11]:
import transformers
from tokenizers import BertWordPieceTokenizer

# Load the pretrained 'roberta-base' tokenizer; `tokenize_sentences` below calls it directly.
# NOTE(review): `tokenize_sentences` uses `char_to_token` on the encodings, which presumably
# requires the fast tokenizer implementation -- confirm that is what AutoTokenizer returns here.
tokenizer = transformers.AutoTokenizer.from_pretrained('roberta-base')

In [13]:
def _char_links_to_token_links(encoding, links):
    """Convert {key: [[char_start, char_end], ...]} into token spans for `encoding`.

    Spans are half-open on both sides. A span is skipped when it is empty or when
    `char_to_token` yields None for its first or last character; keys left without
    any span are omitted from the result.
    """
    token_links = {}
    for key, spans in links.items():
        anchors = []
        for start, end in spans:
            if start < end and end > 0:
                tok_start = encoding.char_to_token(start)
                tok_end = encoding.char_to_token(end - 1)
                if tok_start is not None and tok_end is not None:
                    anchors.append([tok_start, tok_end + 1])
        if anchors:
            token_links[key] = anchors
    return token_links


def _scan_for_token(encoding, char_pos, step, n_chars):
    """Walk from `char_pos` in direction `step` until a character maps to a token.

    Returns the token index, or None if the walk leaves [0, n_chars) without finding
    one. The bound guarantees termination, unlike an open-ended scan.
    """
    while 0 <= char_pos < n_chars:
        token = encoding.char_to_token(char_pos)
        if token is not None:
            return token
        char_pos += step
    return None


def tokenize_sentences(x):
    """Tokenize a merged sentence record and convert its character spans to token spans.

    Args:
        x: record as produced by `merge_meta`, with keys 'md5', 'title',
            's' ({'text', 'links'}) and 's_with_context' ({'text', 's_loc', 'links'}).

    Returns:
        Dict with 'md5', 'title', 's' ({'ids', 'links'}), 's_with_context'
        ({'ids', 's_loc', 'links'}) and 'error'. All spans are half-open token ranges.
        'error' is 'sentence not in context' when the sentence could not be located in
        the tokenized context (in which case 's_loc' is [0, 0]) or when its first token
        is not strictly before its last token; otherwise it is 'none'.

    Uses the module-level `tokenizer`; no special tokens are added.
    """
    context_text = x['s_with_context']['text']
    tokenized_s = tokenizer(x['s']['text'], add_special_tokens=False)
    tokenized_s_with_context = tokenizer(context_text, add_special_tokens=False)

    s_links = _char_links_to_token_links(tokenized_s, x['s']['links'])
    s_with_context_links = _char_links_to_token_links(
        tokenized_s_with_context, x['s_with_context']['links'])

    # Locate the sentence inside the tokenized context. Characters that map to no token
    # are skipped: forwards from the sentence start, backwards from its last character.
    # Both scans are bounded by the text so they cannot run past either end.
    n_chars = len(context_text)
    char_start, char_end = x['s_with_context']['s_loc'][0], x['s_with_context']['s_loc'][1]
    s_loc_start = _scan_for_token(tokenized_s_with_context, max(char_start, 0), 1, n_chars)
    s_loc_end = _scan_for_token(tokenized_s_with_context, min(char_end - 1, n_chars - 1), -1, n_chars)

    if s_loc_start is None or s_loc_end is None:
        # No token could be found for the sentence at all.
        error = 'sentence not in context'
        s_loc = [0, 0]
    else:
        # s_loc_end is inclusive here; the original `>=` check is kept as is, so a
        # sentence that maps to a single token is also reported as an error.
        if s_loc_start >= s_loc_end:
            error = 'sentence not in context'
        else:
            error = 'none'
        s_loc = [s_loc_start, s_loc_end + 1]

    return {
        'md5': x['md5'],
        'title': x['title'],
        's': {'ids': tokenized_s['input_ids'], 'links': s_links},
        's_with_context': {
            'ids': tokenized_s_with_context['input_ids'],
            's_loc': s_loc,
            'links': s_with_context_links,
        },
        'error': error,
    }

In [14]:
# Reload the merged sentence records that were saved as JSON lines.
all_used_sentences = (
    sc.textFile('/data/deng.595/workspace/hybrid_pretrain/data/all_used_sentences.json')
    .map(json.loads)
)

In [14]:
# Peek at one merged record to check its structure.
all_used_sentences.take(1)

[{'md5': '4a50690d895a0b41b72426b5b0505ae9',
  'title': 'Chicago',
  's': {'text': 'Chicago is also a prominent center of the Polish Cathedral style of church architecture.',
   'links': {'Chicago': [[0, 7]],
    'Polish_Cathedral_style': [[42, 64]],
    'church_architecture': [[68, 87]]}},
  's_with_context': {'text': "Chicago gave its name to the Chicago School and was home to the Prairie School, two movements in architecture. Multiple kinds and scales of houses, townhouses, condominiums, and apartment buildings can be found throughout Chicago. Large swaths of the city's residential areas away from the lake are characterized by brick bungalows built from the early 20th century through the end of World War II. Chicago is also a prominent center of the Polish Cathedral style of church architecture. The Chicago suburb of Oak Park was home to famous architect Frank Lloyd Wright, who had designed The Robie House located near the University of Chicago.",
   's_loc': [399, 487],
   'links':

In [15]:
# Pull every merged sentence record onto the driver as a {md5: record} lookup table.
all_used_sentences_local = dict(
    all_used_sentences
    .map(lambda sentence: (sentence['md5'], sentence))
    .collect()
)

In [19]:
# Collect the pair table onto the driver as a dict {s1_id: Row}.
# NOTE(review): because this is a dict keyed by s1_id, positional indexing (e.g. `[0]`) and
# iterating it directly (which yields the string keys, not Rows) do not behave like a list of
# Rows -- consumers need `.values()`.
s_pairs_noskew_local = dict(s_pairs_noskew.rdd.map(lambda x:(x.s1_id, x)).collect())

In [19]:
# `s_pairs_noskew_local` is a dict keyed by s1_id (md5 strings), so there is no key 0;
# peek at one of its Row values instead.
next(iter(s_pairs_noskew_local.values()))

Row(pairs_s2_ids=[Row(pair=['CARDINAL:3', 'Nimslo'], s2_ids=['52265cab682168afdce6c35fcc8b41d8'])], s1_id='001a10229229a3585868ed898e91a2fe')

In [1]:
import webdataset as wds
from tqdm.notebook import tqdm
import numpy as np

In [64]:
# No tokenization version
#
# For every s1 sentence, write one webdataset sample that holds the s1 text, its links,
# and, for each entity pair, the s2 contexts that mention both entities of the pair.
# Only s1 sentences with more than one usable pair are written.
import re
from tqdm.notebook import tqdm
import copy

# (sample index, s2_id) combinations skipped because s1's text overlaps the target
# sentence inside the s2 context.
overlap_errors = set()
with wds.ShardWriter('/data/deng.595/workspace/hybrid_pretrain/data/sentence_multi_pairs_for_pretrain_no_tokenization/%06d.tar', maxcount=10000) as sink:
    # `s_pairs_noskew_local` is built as a dict keyed by s1_id; iterating a dict yields its
    # string keys, so iterate the Row values instead. A plain list of Rows is accepted too.
    for i, sample in enumerate(tqdm(
            s_pairs_noskew_local.values() if isinstance(s_pairs_noskew_local, dict)
            else s_pairs_noskew_local)):
        s1 = all_used_sentences_local.get(sample.s1_id, None)
        if s1 is None:
            print(f'error not found: {i}, {sample.s1_id}')
            continue
        s1 = s1['s']
        s1_text = s1['text']
        s1_length = len(s1_text)

        # One list of compiled whole-word regexes per s1 link key, built from the distinct
        # non-blank anchor texts of that link. Used to find extra mentions in s2 contexts.
        s1_links_and_anchors = {}
        for k, anchors in s1['links'].items():
            anchor_patterns = set()
            for anchor in anchors:
                anchor = s1_text[anchor[0]:anchor[1]]
                if anchor.strip()!='':
                    anchor_patterns.add(anchor)
            if len(anchor_patterns)>0:
                s1_links_and_anchors[k] = [re.compile(r'\b'+re.escape(pattern)+r'\b') for pattern in anchor_patterns]

        pairs_data = []
        for pair_s2 in sample.pairs_s2_ids:
            pair = pair_s2.pair
            s2s = []
            for s2_id in pair_s2.s2_ids:
                s2 = all_used_sentences_local.get(s2_id, None)
                if s2 is None:
                    print(f'error not found: {i}, {s2_id}')
                    continue
                # Work on a copy: text, s_loc and links are rewritten below.
                s2 = copy.deepcopy(s2['s_with_context'])
                s2_text = s2['text']

                # If the s2 context contains s1 verbatim (first occurrence only), cut it out.
                s1_in_s2_start = s2_text.find(s1_text)
                if s1_in_s2_start >= 0:
                    # End of the match is relative to where the match starts.
                    s1_in_s2_end = s1_in_s2_start + s1_length
                    if s1_in_s2_start < s2['s_loc'][1]:
                        if s1_in_s2_end>s2['s_loc'][0]:
                            # s1 overlaps the target sentence itself: skip this s2.
                            overlap_errors.add((i,s2_id))
                            continue
                        else:
                            # s1 lies entirely before the target sentence: remove it and
                            # shift everything located after it to the left.
                            s2['text'] = s2_text[:s1_in_s2_start]+s2_text[s1_in_s2_end:]
                            s2['s_loc'] = [s2['s_loc'][0]-s1_length, s2['s_loc'][1]-s1_length]
                            shifted_links = {}
                            for k, anchors in s2['links'].items():
                                kept = []
                                for anchor in anchors:
                                    if anchor[1] <= s1_in_s2_start:
                                        # before the removed span: position unchanged
                                        kept.append([anchor[0], anchor[1]])
                                    elif anchor[0] >= s1_in_s2_end:
                                        # after the removed span: shift left
                                        kept.append([anchor[0]-s1_length, anchor[1]-s1_length])
                                    # anchors touching the removed span no longer exist
                                if kept:
                                    shifted_links[k] = kept
                            s2['links'] = shifted_links
                    # NOTE(review): an occurrence of s1 located after the target sentence is
                    # left in the context, as in the original logic -- confirm this is intended.

                anchors0 = s2['links'].get(pair[0],[])
                anchors1 = s2['links'].get(pair[1],[])

                # Add string-matched mentions of s1's linked entities found in the context
                # outside the target sentence. When the key already has links, `+=` extends
                # that list in place, so anchors0/anchors1 see the additions as well.
                for k, patterns in s1_links_and_anchors.items():
                    new_anchors = [[match.start(),match.end()] for pattern in patterns for match in pattern.finditer(s2['text'])]
                    new_anchors = [[start, end] for start, end in new_anchors if end<=s2['s_loc'][0] or start>=s2['s_loc'][1]]
                    if new_anchors:
                        if k in s2['links']:
                            s2['links'][k] += new_anchors
                        else:
                            s2['links'][k] = new_anchors

                # Keep the s2 only if both entities of the pair are linked in its context.
                if anchors0 and anchors1:
                    s2s.append({
                        'md5': s2_id,
                        'text': s2['text'],
                        's_loc': s2['s_loc'],
                        'pair_locs': [anchors0, anchors1],
                        'all_links': s2['links'],
                    })
            if s2s:
                # The pair must also be linked in s1 itself.
                anchors0 = s1['links'].get(pair[0],[])
                anchors1 = s1['links'].get(pair[1],[])
                if anchors0 and anchors1:
                    pairs_data.append({
                        'pair': pair,
                        's1_pair_locs': [anchors0, anchors1],
                        's2s': s2s
                    })
        if len(pairs_data)>1:
            sink.write({
                '__key__': sample.s1_id,
                'json': {
                    's1_text': s1['text'],
                    's1_all_links': s1['links'],
                    'pairs': pairs_data
                }
            })

# writing /data/deng.595/workspace/hybrid_pretrain/data/sentence_multi_pairs_for_pretrain_no_tokenization/000000.tar 0 0.0 GB 0


HBox(children=(FloatProgress(value=0.0, max=7631819.0), HTML(value='')))

# writing /data/deng.595/workspace/hybrid_pretrain/data/sentence_multi_pairs_for_pretrain_no_tokenization/000001.tar 10000 0.1 GB 10000
# writing /data/deng.595/workspace/hybrid_pretrain/data/sentence_multi_pairs_for_pretrain_no_tokenization/000002.tar 10000 0.1 GB 20000
# writing /data/deng.595/workspace/hybrid_pretrain/data/sentence_multi_pairs_for_pretrain_no_tokenization/000003.tar 10000 0.1 GB 30000
# writing /data/deng.595/workspace/hybrid_pretrain/data/sentence_multi_pairs_for_pretrain_no_tokenization/000004.tar 10000 0.1 GB 40000
# writing /data/deng.595/workspace/hybrid_pretrain/data/sentence_multi_pairs_for_pretrain_no_tokenization/000005.tar 10000 0.1 GB 50000
# writing /data/deng.595/workspace/hybrid_pretrain/data/sentence_multi_pairs_for_pretrain_no_tokenization/000006.tar 10000 0.1 GB 60000
# writing /data/deng.595/workspace/hybrid_pretrain/data/sentence_multi_pairs_for_pretrain_no_tokenization/000007.tar 10000 0.1 GB 70000
# writing /data/deng.595/workspace/hybrid_pretra

# writing /data/deng.595/workspace/hybrid_pretrain/data/sentence_multi_pairs_for_pretrain_no_tokenization/000061.tar 10000 0.1 GB 610000
# writing /data/deng.595/workspace/hybrid_pretrain/data/sentence_multi_pairs_for_pretrain_no_tokenization/000062.tar 10000 0.1 GB 620000
# writing /data/deng.595/workspace/hybrid_pretrain/data/sentence_multi_pairs_for_pretrain_no_tokenization/000063.tar 10000 0.1 GB 630000
# writing /data/deng.595/workspace/hybrid_pretrain/data/sentence_multi_pairs_for_pretrain_no_tokenization/000064.tar 10000 0.1 GB 640000
# writing /data/deng.595/workspace/hybrid_pretrain/data/sentence_multi_pairs_for_pretrain_no_tokenization/000065.tar 10000 0.1 GB 650000
# writing /data/deng.595/workspace/hybrid_pretrain/data/sentence_multi_pairs_for_pretrain_no_tokenization/000066.tar 10000 0.1 GB 660000
# writing /data/deng.595/workspace/hybrid_pretrain/data/sentence_multi_pairs_for_pretrain_no_tokenization/000067.tar 10000 0.1 GB 670000
# writing /data/deng.595/workspace/hybrid

# writing /data/deng.595/workspace/hybrid_pretrain/data/sentence_multi_pairs_for_pretrain_no_tokenization/000121.tar 10000 0.1 GB 1210000
# writing /data/deng.595/workspace/hybrid_pretrain/data/sentence_multi_pairs_for_pretrain_no_tokenization/000122.tar 10000 0.1 GB 1220000
# writing /data/deng.595/workspace/hybrid_pretrain/data/sentence_multi_pairs_for_pretrain_no_tokenization/000123.tar 10000 0.1 GB 1230000
# writing /data/deng.595/workspace/hybrid_pretrain/data/sentence_multi_pairs_for_pretrain_no_tokenization/000124.tar 10000 0.1 GB 1240000
# writing /data/deng.595/workspace/hybrid_pretrain/data/sentence_multi_pairs_for_pretrain_no_tokenization/000125.tar 10000 0.1 GB 1250000
# writing /data/deng.595/workspace/hybrid_pretrain/data/sentence_multi_pairs_for_pretrain_no_tokenization/000126.tar 10000 0.1 GB 1260000
# writing /data/deng.595/workspace/hybrid_pretrain/data/sentence_multi_pairs_for_pretrain_no_tokenization/000127.tar 10000 0.1 GB 1270000
# writing /data/deng.595/workspace

# writing /data/deng.595/workspace/hybrid_pretrain/data/sentence_multi_pairs_for_pretrain_no_tokenization/000181.tar 10000 0.1 GB 1810000
# writing /data/deng.595/workspace/hybrid_pretrain/data/sentence_multi_pairs_for_pretrain_no_tokenization/000182.tar 10000 0.1 GB 1820000
# writing /data/deng.595/workspace/hybrid_pretrain/data/sentence_multi_pairs_for_pretrain_no_tokenization/000183.tar 10000 0.1 GB 1830000
# writing /data/deng.595/workspace/hybrid_pretrain/data/sentence_multi_pairs_for_pretrain_no_tokenization/000184.tar 10000 0.1 GB 1840000
# writing /data/deng.595/workspace/hybrid_pretrain/data/sentence_multi_pairs_for_pretrain_no_tokenization/000185.tar 10000 0.1 GB 1850000
# writing /data/deng.595/workspace/hybrid_pretrain/data/sentence_multi_pairs_for_pretrain_no_tokenization/000186.tar 10000 0.1 GB 1860000
# writing /data/deng.595/workspace/hybrid_pretrain/data/sentence_multi_pairs_for_pretrain_no_tokenization/000187.tar 10000 0.1 GB 1870000
# writing /data/deng.595/workspace

# writing /data/deng.595/workspace/hybrid_pretrain/data/sentence_multi_pairs_for_pretrain_no_tokenization/000241.tar 10000 0.1 GB 2410000
# writing /data/deng.595/workspace/hybrid_pretrain/data/sentence_multi_pairs_for_pretrain_no_tokenization/000242.tar 10000 0.1 GB 2420000
# writing /data/deng.595/workspace/hybrid_pretrain/data/sentence_multi_pairs_for_pretrain_no_tokenization/000243.tar 10000 0.1 GB 2430000
# writing /data/deng.595/workspace/hybrid_pretrain/data/sentence_multi_pairs_for_pretrain_no_tokenization/000244.tar 10000 0.1 GB 2440000
# writing /data/deng.595/workspace/hybrid_pretrain/data/sentence_multi_pairs_for_pretrain_no_tokenization/000245.tar 10000 0.1 GB 2450000
# writing /data/deng.595/workspace/hybrid_pretrain/data/sentence_multi_pairs_for_pretrain_no_tokenization/000246.tar 10000 0.1 GB 2460000
# writing /data/deng.595/workspace/hybrid_pretrain/data/sentence_multi_pairs_for_pretrain_no_tokenization/000247.tar 10000 0.1 GB 2470000
# writing /data/deng.595/workspace

# writing /data/deng.595/workspace/hybrid_pretrain/data/sentence_multi_pairs_for_pretrain_no_tokenization/000301.tar 10000 0.1 GB 3010000
# writing /data/deng.595/workspace/hybrid_pretrain/data/sentence_multi_pairs_for_pretrain_no_tokenization/000302.tar 10000 0.1 GB 3020000
# writing /data/deng.595/workspace/hybrid_pretrain/data/sentence_multi_pairs_for_pretrain_no_tokenization/000303.tar 10000 0.1 GB 3030000
# writing /data/deng.595/workspace/hybrid_pretrain/data/sentence_multi_pairs_for_pretrain_no_tokenization/000304.tar 10000 0.1 GB 3040000
# writing /data/deng.595/workspace/hybrid_pretrain/data/sentence_multi_pairs_for_pretrain_no_tokenization/000305.tar 10000 0.1 GB 3050000
# writing /data/deng.595/workspace/hybrid_pretrain/data/sentence_multi_pairs_for_pretrain_no_tokenization/000306.tar 10000 0.1 GB 3060000
# writing /data/deng.595/workspace/hybrid_pretrain/data/sentence_multi_pairs_for_pretrain_no_tokenization/000307.tar 10000 0.1 GB 3070000
# writing /data/deng.595/workspace