## MultiProcessParzu 

Here we start multiple ParZu Docker containers and send parse requests to them in parallel.

In [None]:
! python -m pip install --upgrade pip
! python -m pip install docker nltk

In [None]:
# A container is restarted once it has handled more than this many chunks.
RESTART_EVERY = 3
# Tab-separated input file; it is read chunk by chunk.
FILEPATH="data/swissdox_test.tsv"
# Number of rows per chunk passed to one process_texts call.
CHUNKSIZE=100
# Number of parser containers to start; also used as the thread-pool size.
N_CONTAINERS = 3

In [None]:
import logging

In [None]:
import docker
client = docker.from_env()

In [None]:
# Show the containers created from the stancerserver:latest image (the notebook displays the result).
client.containers.list(filters={'ancestor': 'stancerserver:latest'})

In [None]:
! ls ../../external_repos/stancer_setup

In [None]:
import subprocess
# a python function that gets the absolute path from a relative path relative to the cwd
def get_abs_path_from_rel_path(rel_path):
    """Resolve a directory path to its absolute form via bash ``cd`` + ``pwd``.

    Args:
        rel_path: Directory path, relative to the current working directory
            (an absolute path works as well).

    Returns:
        The raw stdout of ``pwd`` as ``bytes``: the absolute path followed by
        a newline. Empty bytes when the directory cannot be entered.
    """
    # The path is handed over as the positional argument "$1" instead of being
    # pasted into the script text, so spaces and shell metacharacters are safe.
    # `&&` keeps `pwd` from printing the unrelated cwd when `cd` fails.
    completed = subprocess.run(
        ['/bin/bash', '-c', 'cd -- "$1" && pwd', 'bash', rel_path],
        stdout=subprocess.PIPE,
    )
    return completed.stdout

# The helper returns the bytes printed by `pwd`; cut the trailing newline and decode to str.
abspath_stancerserver = get_abs_path_from_rel_path('../../external_repos/stancer_setup')[:-1].decode('utf-8')

In [None]:
# container = client.containers.run('stancerserver', 
#     '/bin/bash ./scripts/run_server.sh', 
#     volumes=[f'{ abspath_stancerserver }:/app'], 
#     ports={"5003": "5004"}, 
#     detach=True
#     )

In [None]:

# container.stop()

In [None]:
# initialize containers
containers = []
portnos = []

# Stop containers left over from an earlier run (presumably so that their
# host ports can be bound again below).
problem_containers = client.containers.list(filters={'ancestor': 'stancerserver:latest'})
for c in problem_containers:
    c.stop()
    # c.remove()

# Host port of the first container; container i is published on FIRST_HOST_PORT + i.
FIRST_HOST_PORT = 5004

for i in range(N_CONTAINERS):
    # Computed arithmetically: gluing digits together ("500" + str(i + 4))
    # produces "50010" instead of "5010" from the seventh container on.
    portno = str(FIRST_HOST_PORT + i)

    container = client.containers.run(
        'stancerserver',
        '/bin/bash ./scripts/run_server.sh',
        volumes=[f'{ abspath_stancerserver }:/app'],
        ports={"5003": portno},
        detach=True,
    )

    # Follow the container log until a line containing "Running" shows up.
    for x in container.logs(stream=True):
        print(x)
        if "Running" in x.decode("utf-8"):
            break
    containers.append(container)
    portnos.append(portno)

In [None]:
# Show the containers created from the stancerserver:latest image after start-up.
client.containers.list(filters={'ancestor': 'stancerserver:latest'})

In [None]:
from bs4 import BeautifulSoup
import re
import nltk
# download the punkt tokenizer
nltk.download('punkt')


def clean_text_from_xml(text):
    """Extract the paragraph text of an XML document and drop noisy sentences.

    Steps:
      1. Join the text of all ``<p>`` elements with single spaces.
      2. Split into sentences (German tokenizer) and drop every sentence in
         which a lowercase letter is directly followed by an uppercase one.
      3. Drop sentences from the end until one ends with "." and has at
         least four tokens.

    Args:
        text: XML markup of one document.

    Returns:
        The remaining sentences joined by single spaces. The result is an
        empty string when no sentence survives.
    """
    soup = BeautifulSoup(text, "xml")
    textstr = " ".join(p.get_text() for p in soup.find_all('p'))

    # remove sentences with multicap words.
    sentences = [
        sentence
        for sentence in nltk.tokenize.sent_tokenize(textstr, language='german')
        if not re.search(r"[a-zäöü][A-ZÄÖÜ]", sentence)
    ]

    # Discard trailing sentences that do not end with a dot or are too short.
    # Looping on the list itself also inspects the first sentence, which an
    # index loop stopping before 0 would never check.
    while sentences:
        last = sentences[-1]
        if last.endswith(".") and len(
            nltk.tokenize.word_tokenize(last, language='german', preserve_line=False)
        ) >= 4:
            break
        sentences.pop()

    return " ".join(sentences)

In [None]:
import requests

# Smoke test: send one short sentence to the parser published on port 5004
# and print whatever comes back.
url = "http://localhost:5004/parse/"
headers = {"Content-Type": "application/json"}
data = {"text": "Ich bin Berliner."}

response = requests.post(
    url,
    headers=headers,
    json=data,
    timeout=200,
)
logging.info(f"make_request - status code:  { response.status_code }")
print(response.text)


In [None]:
from enum import Enum
import threading

class Status(Enum):
    """Free/busy marker; not referenced by ContainerWrapper, whose status field is commented out."""
    FREE = "FREE"
    BUSY = "BUSY"

class ContainerWrapper:
    """Bookkeeping for one container: run counter, restart counter and a lock.

    ``runs`` counts the work units since the last restart, ``restarts``
    counts restarts so far, and ``lock`` is available to callers that want
    exclusive use of the container.
    """

    def __init__(self, container):
        self.lock = threading.Lock()
        self.restarts = 0
        self.runs = 0
        self.container = container

    def restart_container(self):
        """Restart the container and wait until its log reports it is up.

        The log stream is scanned for the "Running on all addresses" line,
        and ``restarts + 1`` occurrences are awaited, presumably because the
        stream also replays the lines of earlier starts (TODO confirm).
        ``runs`` is reset to 0 at the end.
        """
        logging.info("Container will restart, waiting")
        self.restarts += 1
        self.container.restart()

        marker = "Running on all addresses (0.0.0.0)".lower()
        pending = self.restarts + 1
        for raw in self.container.logs(stream=True):
            line = raw.decode("utf-8")
            logging.info(line)
            if marker not in line.lower():
                continue
            pending -= 1
            if pending == 0:
                logging.info("Container is being running again")
                break
            logging.info(f"Found running signal waiting for more signal, still {pending} to go")

        logging.info("Container has restarted")
        self.runs = 0

# One wrapper (counters + lock) per started container, in the same order as `containers`.
containerwrappers = [ContainerWrapper(container) for container in containers]

In [None]:
import requests
# import logging
import concurrent
import random
from functools import wraps
import time

# a function that checks whether a given text-string is in Conll format
def is_conll(text):
    """Heuristic CoNLL check: more than one line and the first line starts with "1"."""
    first_line, newline, _ = text.partition("\n")
    # An empty separator means the text has no line break, i.e. just one line.
    return bool(newline) and first_line.startswith("1")

def make_request(text, port):
    """POST ``text`` as JSON to the parse endpoint on ``localhost:<port>``.

    Returns:
        The response body when the status code is 200 and the body passes
        ``is_conll``; ``None`` otherwise.
    """
    logging.info(f"portno:  { port }")
    logging.info(f"text:  { text[:500] if text else '' }")

    response = requests.post(
        f"http://localhost:{port}/parse/",
        headers={"Content-Type": "application/json"},
        json={"text": text},
        timeout=200,
    )

    logging.info(f"make_request - status code:  { response.status_code }")

    if response.status_code != 200:
        return None
    if not is_conll(response.text):
        return None
    return response.text

class BusyError(Exception):
    """Signals that the requested container is currently in use."""

def retry(f):
    """Decorator that calls ``f`` again for as long as it raises ``BusyError``.

    The first attempt is made immediately. After every ``BusyError`` the
    wrapper logs a message, waits 3 seconds and tries again. Any other
    exception propagates to the caller unchanged.

    Args:
        f: The callable to wrap.

    Returns:
        The wrapping function; it returns whatever ``f`` returns.
    """
    @wraps(f)
    def wrapped(*args, **kwargs):
        while True:
            try:
                return f(*args, **kwargs)
            except BusyError:
                logging.info("Container is busy, retrying")
                # `time.sleep` rather than a bare `sleep`: only the `time`
                # module is imported alongside this definition. Waiting only
                # after a failure avoids delaying calls that are not busy.
                time.sleep(3)
    return wrapped

def _request_conll(cleaned_text, port, extra_attempts=5):
    """Request the parse of one text; return ``None`` when none is obtained.

    One request is made, followed by a 1 second pause; while the answer is
    empty up to ``extra_attempts`` further requests are sent. Any exception
    raised on the way is logged with its traceback and turned into ``None``.
    """
    try:
        conll = make_request(cleaned_text, port)
        time.sleep(1)
        for i in range(extra_attempts):
            if conll:
                break
            logging.info(f"make_request returned empty string, trying again { i }")
            conll = make_request(cleaned_text, port)
        return conll
    except Exception:
        # Best effort: a failed row (e.g. a timeout) is stored without a parse
        # instead of aborting the chunk. KeyboardInterrupt is not swallowed.
        logging.exception("Receving of object failed %d. Returning (close thread)", 1)
        return None


@retry
def process_texts(text_df, containerwrapper, port, header=False):
    """Clean and parse every row of one chunk and append it to ``data/parsed.csv``.

    The container is restarted first when it has handled more than
    ``RESTART_EVERY`` chunks. The chunk is modified in place: ``content``
    becomes the cleaned text, ``content_xml`` keeps the original markup and
    ``content_conll`` holds the parser output (``None`` where parsing failed).

    Args:
        text_df: DataFrame chunk with a ``content`` column.
        containerwrapper: ``ContainerWrapper`` of the container to use.
        port: Host port of that container.
        header: Whether to write the CSV header line for this chunk.

    Returns:
        1 once the chunk has been written.

    Raises:
        BusyError: When the container's lock is held by another call; the
            ``retry`` decorator catches it and calls again.
    """
    # Claim the container in one atomic step. A separate locked() check
    # followed by a blocking acquire lets two threads pass the check together.
    if not containerwrapper.lock.acquire(blocking=False):
        raise BusyError("Container is busy")

    try:
        containerwrapper.runs += 1

        if containerwrapper.runs > RESTART_EVERY:
            logging.info(str("Container has passed "+  str(RESTART_EVERY)+ " runs, restarting"))
            containerwrapper.restart_container()

        conlls = []
        cleaned_texts = []
        for text in text_df["content"]:
            logging.info("Iterating over row")
            cleaned_text = clean_text_from_xml(text)
            conll = _request_conll(cleaned_text, port)
            logging.info(f"Logging conll [1st line]:  { conll[:200] if conll else 'none' }")
            conlls.append(conll)
            cleaned_texts.append(cleaned_text)

        text_df["content_conll"] = conlls
        text_df["content_xml"] = text_df["content"].to_list()
        text_df["content"] = cleaned_texts
        # NOTE(review): several worker threads append to this one file at the
        # same time; confirm that rows of different chunks cannot interleave.
        text_df.to_csv("data/parsed.csv", index=False, mode="a", header=header)
    finally:
        # Always hand the container back; otherwise one failing chunk would
        # leave the lock held and every later call for it would retry forever.
        containerwrapper.lock.release()

    return 1

### Read the dataframe in chunks


In [None]:
# check if file exists and if variable RESTORE_MODE is set to false, then delete the file
import os

RESTORE_MODE = False

# Outside restore mode a previous output file is removed so the run starts fresh.
if not RESTORE_MODE and os.path.isfile('data/parsed.csv'):
    os.remove('data/parsed.csv')

# if not in restore mode write first line (header) to file
# if not RESTORE_MODE:
#     open('data/parsed.csv', 'w').write('id,pubtime,medium_code,medium_name,rubric,regional,doctype,doctype_description,language,char_count,dateline,head,subhead,content_id,content,content_xml,content_conll\n')

In [None]:
# IMPORTS
import pandas as pd
import json
import os

def load_data(filepath, chunksize=CHUNKSIZE):
    """Open a tab-separated file for chunked reading.

    Args:
        filepath: Path of the TSV file.
        chunksize: Number of rows per chunk.

    Returns:
        The chunk iterator produced by ``pd.read_csv``; the first column of
        the file is used as the index of every chunk.
    """
    return pd.read_csv(
        filepath,
        sep="\t",
        index_col=0,
        chunksize=chunksize,
    )

# a function that returns a generator that circulates through a list
# def cycle(lst):
#     i = 0
#     while True:
#         yield lst[i]
#         i = (i + 1) % len(lst)

### Run threaded execution pool



In [None]:
def get_no_rows(filename):
    """Count the data rows of a file: its number of lines minus the header line.

    The lines are counted in Python instead of calling ``wc -l``, so no
    external program is needed and a last line without a trailing newline is
    counted as well.

    Args:
        filename: Path of the file to count.

    Returns:
        Number of lines after the header; 0 for an empty or header-only file.

    Raises:
        OSError: When the file cannot be opened.
    """
    # Binary mode: only newline bytes matter, so nothing has to be decoded.
    with open(filename, "rb") as handle:
        n_lines = sum(1 for _ in handle)
    # subtract 1 to remove header
    return max(n_lines - 1, 0)

In [None]:
# logging.info("Testing update. Starting value is %d.", database.value)
from tqdm import tqdm
import itertools
import subprocess
import concurrent
import random
from functools import wraps
from time import sleep

# format = "%(asctime)s: %(message)s"
# logging.basicConfig(format=format, level=logging.INFO, datefmt="%H:%M:%S")

# Number of data rows in the input file.
no_rows = get_no_rows(FILEPATH)

# Number of complete chunks; passed to run() as the progress-bar total.
itertimes = no_rows // CHUNKSIZE

# write first 

import logging
import os

# Send INFO-level records of the root logger to mpparzu.log.
logger = logging.getLogger()
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')

# Reuse a handler that already writes to this file: running this cell a
# second time would otherwise attach another handler and duplicate every line.
_log_path = os.path.abspath('mpparzu.log')
fhandler = next(
    (
        handler for handler in logger.handlers
        if isinstance(handler, logging.FileHandler) and handler.baseFilename == _log_path
    ),
    None,
)
if fhandler is None:
    fhandler = logging.FileHandler(filename='mpparzu.log', mode='a')
    logger.addHandler(fhandler)
fhandler.setFormatter(formatter)
logger.setLevel(logging.INFO)
# logger.setLevel(logging.WARNING)

# create shared container datastructure
# containerstats = { container.id: 0 for container in containers }

def run(my_iter, itertimes):
    """Process all chunks: the first one synchronously, the rest on a thread pool.

    The first chunk is handled on its own so that the CSV header is written
    before any other rows are appended.

    Args:
        my_iter: Iterator of ``(chunk, containerwrapper, port)`` triples.
        itertimes: Total used for the progress bar.

    Returns:
        None. An exception raised while processing a chunk is re-raised here.
    """
    # `import concurrent` alone does not load the `futures` submodule; import
    # it explicitly instead of relying on another package having done so.
    import concurrent.futures

    # process first non parallel
    first = next(my_iter, None)
    if first is None:
        # Nothing to do for an input without data rows.
        return
    df, cont, portno = first
    process_texts(df, cont, portno, header=True)

    with tqdm(total=itertimes) as pbar:
        with concurrent.futures.ThreadPoolExecutor(max_workers=N_CONTAINERS) as executor:
            futures = [
                executor.submit(process_texts, df, cont, portno)
                for df, cont, portno in my_iter
            ]
            for future in concurrent.futures.as_completed(futures):
                # result() re-raises whatever the worker raised.
                future.result()
                pbar.update(1)
    # print(321, results[321])

# Chunk iterator over the input file.
df_iter = load_data(FILEPATH)
# Endless round-robin over the containers and, in the same order, their ports.
cont_iter = itertools.cycle(containerwrappers)
portno_iter = itertools.cycle(portnos)

# write indices to list of chunks and skip if in restore mode!

# zip stops with the shortest input, i.e. when the chunks are exhausted.
my_iter = zip(
    df_iter, 
    cont_iter, 
    portno_iter
    ) 

# Start processing all chunks.
run(my_iter, itertimes)

# with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
#     for index in range(2):
#         executor.submit(database.update, index)
# logging.info("Testing update. Ending value is %d.", database.value)

In [None]:
# for container in containers:
#     container.restart()

In [None]:
# Stop every container collected in `containers`.
for container in containers:
    container.stop()