In [10]:
import sys

# Directories that hold the project-local packages. They are relative paths, so
# the notebook has to be started from the directory that contains them.
# presumably these provide syntaxnet_wrapper and convert_corpus_to_brat, which
# are imported further down — TODO confirm which directory holds which module.
for src_dir in ('./syntaxnet_wrapper/src',
                './pylingtools/src',
                './framebank_preprocessing/'):
    # Guard against duplicates so that re-running the cell is harmless.
    if src_dir not in sys.path:
        sys.path.append(src_dir)

## Loading data

In [2]:
import json

# Corpus to annotate. An earlier variant of this cell read
# 'annotated_corpus.json'; the file read now is the same path that the
# "Removing empty examples" cell writes to.
corpus_path = 'annotated_corpus_fixed.json'

with open(corpus_path, 'r') as corpus_file:
    corpus = json.load(corpus_file)

In [3]:
# Drop empty sentences from every example. The examples themselves are kept,
# even when no sentence is left in them.
fixed_corpus_path = 'annotated_corpus_fixed.json'
new_corp = {
    exid: [sent for sent in example if sent]
    for exid, example in corpus.items()
}

# Persist the cleaned corpus, then continue working with it.
with open(fixed_corpus_path, 'w') as fixed_file:
    json.dump(new_corp, fixed_file)

corpus = new_corp

## Main scripts

In [8]:
# NOTE(review): the install is unpinned (the recorded output below resolved
# polyglot 16.7.4) and no visible cell imports polyglot — confirm that this
# cell is still needed, and pin the version if it is.
!pip install polyglot

Collecting polyglot
  Downloading polyglot-16.7.4.tar.gz (126kB)
    100% |################################| 133kB 1.1MB/s eta 0:00:01
Collecting futures>=2.1.6 (from polyglot)
  Downloading futures-3.0.5-py2-none-any.whl
Collecting PyICU>=1.8 (from polyglot)
  Downloading PyICU-1.9.5.tar.gz (181kB)
    100% |################################| 184kB 1.9MB/s eta 0:00:01
Collecting pycld2>=0.3 (from polyglot)
  Downloading pycld2-0.31.tar.gz (14.3MB)
    100% |################################| 14.3MB 69kB/s eta 0:00:01
Collecting morfessor>=2.0.2a1 (from polyglot)
  Downloading Morfessor-2.0.2a4.tar.gz
Building wheels for collected packages: polyglot, PyICU, pycld2, morfessor
  Running setup.py bdist_wheel for polyglot ... done
  Stored in directory: /root/.cache/pip/wheels/b4/b7/34/e6fbb82ec71c0c9d7f1b26a038f00129acd99a6aa5e5b93f2d
  Running setup.py bdist_wheel for PyICU ...

In [11]:
from multiprocessing import Process, Queue, Manager, current_process
from syntaxnet_wrapper import PipelineSyntaxNet
from syntaxnet_wrapper import ProcessorSyntaxNet
from convert_corpus_to_brat import make_text, create_verb_example_index
import math


def _print_fields(*fields):
    """Print ``fields`` separated by single spaces (same output on Python 2 and 3)."""
    print(u' '.join(u'{}'.format(field) for field in fields))


def parse_corpus_syntaxnet2(corpus, proc, addr, max_tokens=500):
    """Annotate every word of ``corpus`` in place with the output of ``proc.parse``.

    Each example is sent as one request: one line per sentence, tokens joined
    by single spaces, followed by one empty line. Word forms that are empty or
    contain a space or a tab are replaced by ``_`` so that the token count of
    the line matches the sentence. Only the first ``max_tokens`` tokens of a
    sentence are sent.

    The parse result is matched to the example by position, and for every
    matched word the keys ``feat``, ``postag``, ``parent`` and ``link_name``
    are set from the parsed word's ``morph``, ``pos_tag``, ``parent`` and
    ``link_name`` attributes.

    Args:
        corpus: dict mapping an example id to a list of sentences; a sentence
            is a list of word dicts that have a ``'form'`` key.
        proc: object with a ``parse(text)`` method that returns a sequence of
            parsed sentences (sequences of parsed words), or ``None``.
        addr: address of the parser, used only in the progress messages.
        max_tokens: maximum number of tokens sent per sentence.

    Returns:
        None. ``corpus`` is modified in place. Examples whose request raised
        ``IndexError`` or returned ``None`` are reported and left untouched.
    """
    for num, (exid, example) in enumerate(corpus.items()):
        lines = []
        for sent in example:
            tokens = []
            for or_word in sent[:max_tokens]:
                form = or_word['form']
                if not form or u' ' in form or u'\t' in form:
                    tokens.append(u'_')
                else:
                    tokens.append(form)
            lines.append(u' '.join(tokens))

        # One line per sentence; the extra newline terminates the request.
        raw_input_s = u''.join(line + u'\n' for line in lines) + u'\n'

        report_progress = num % 10 == 0
        if report_progress:
            _print_fields(current_process().name,
                          '--Port: {}--Start:'.format(str(addr)), num, exid)

        try:
            parse_result = proc.parse(raw_input_s)
        except IndexError as err:
            print(err)
            print('--Err index error: {}--{}--{}--:'.format(str(addr), num, exid))
            # Skip the example. Falling through would either hit an unbound
            # name or write the previous example's parse onto this one.
            continue

        if report_progress:
            _print_fields('--Port: {}--Finished:'.format(str(addr)), num, exid)

        if parse_result is None:
            print('--Err: {}--{}--{}--:'.format(str(addr), num, exid))
            continue

        # zip() bounds both levels, so a parse with more sentences or words
        # than the example cannot raise IndexError; the mismatch is reported.
        for sent_num, (ex_sent, parse_sent) in enumerate(zip(example, parse_result)):
            if len(parse_sent) != len(ex_sent) and len(ex_sent) <= max_tokens:
                print(sent_num)
                print(len(parse_sent))
                print(len(ex_sent))
                print(exid)

            for corp_word, parse_word in zip(ex_sent, parse_sent):
                corp_word['feat'] = parse_word.morph
                corp_word['postag'] = parse_word.pos_tag
                corp_word['parent'] = parse_word.parent
                corp_word['link_name'] = parse_word.link_name


def process_chunk(chunk, ppl, addr, out_q):
    """Worker entry point: annotate ``chunk`` and hand it back through ``out_q``.

    ``chunk`` is annotated in place by ``parse_corpus_syntaxnet2`` using the
    parser ``ppl`` (``addr`` is passed along for its progress messages), then
    put on ``out_q``. A final message names the process that finished.
    """
    parse_corpus_syntaxnet2(chunk, ppl, addr)
    out_q.put(chunk)
    worker_name = current_process().name
    print(' '.join(['Process finished:', worker_name]))
    

class MultiprocessParser(object):
    """Annotate a corpus in parallel, one worker process per parser address.

    The corpus keys are split into contiguous chunks, one per address. Each
    chunk is processed by ``process_chunk`` in its own process, and the
    annotated chunks are collected through a manager queue and merged.
    """

    def __init__(self, addresses=None):
        """Store the parser addresses and create the result queue.

        Args:
            addresses: iterable of ``(host, port)`` pairs; ``addr[0]`` and
                ``addr[1]`` are passed to ``ProcessorSyntaxNet``.
        """
        super(MultiprocessParser, self).__init__()
        # Copy, so that neither a shared default nor the caller's list is aliased.
        self.addresses_ = list(addresses) if addresses else []
        self.out_q = Manager().Queue()

    def _make_chunks(self, corpus):
        """Split the keys of ``corpus`` into at most one chunk per address.

        Chunk sizes differ by at most one and the smaller chunks come first.
        When there are fewer keys than addresses, only non-empty chunks are
        returned.

        Raises:
            ValueError: if no addresses were configured.
        """
        n_workers = len(self.addresses_)
        if n_workers == 0:
            raise ValueError('MultiprocessParser needs at least one address')

        ex_keys = list(corpus)
        base_size, n_larger = divmod(len(ex_keys), n_workers)
        sizes = [base_size] * (n_workers - n_larger) + [base_size + 1] * n_larger

        chunks = []
        start = 0
        for size in sizes:
            if size:
                chunks.append(ex_keys[start:start + size])
            start += size

        return chunks

    def process(self, corpus):
        """Annotate ``corpus`` with one worker process per chunk.

        Returns:
            dict with the annotated examples of every chunk that a worker put
            on the queue. A worker that died contributes nothing, so callers
            should compare ``len(result)`` with ``len(corpus)``.
        """
        chunks = self._make_chunks(corpus)

        proc_pool = list()
        for chunk, addr in zip(chunks, self.addresses_):
            dict_chunk = {k: corpus[k] for k in chunk}
            ppl = ProcessorSyntaxNet(addr[0], addr[1])
            proc = Process(target=process_chunk,
                           args=(dict_chunk, ppl, addr, self.out_q))
            proc.start()
            proc_pool.append(proc)

        for proc in proc_pool:
            proc.join()
            print(proc)
            if proc.exitcode != 0:
                # Its chunk never reached the queue; make that visible.
                print('--Err: {} exited with code {}'.format(proc.name, proc.exitcode))

        # Every worker has been joined, so all results are already queued.
        result = {}
        while not self.out_q.empty():
            result.update(self.out_q.get())

        return result
            

## Processing

In [None]:
# Place hosts with containers here: one (host, port) pair per worker process.
# A bug was found with more than 15-20 processes, so use <= 20 addresses.
hosts = [('vmh1.isa.ru', port) for port in range(8200, 8215)]
mproc = MultiprocessParser(hosts)
result = mproc.process(corpus)

In [None]:
# Every example must come back from the workers. Be aware of the
# "connection reset by peer" problem if more than 20 processes are used.
assert len(result) == len(corpus)

## Saving the result

In [6]:
# Write the annotated corpus next to the input as JSON.
output_file_path = 'annotated_corpus_fixed+syntaxnet.json'
with open(output_file_path, 'w') as result_file:
    json.dump(result, result_file)