In [1]:
# Reload imported modules automatically before each cell runs, so edits to the
# project's local .py helpers take effect without restarting the kernel.
%load_ext autoreload
%autoreload 2

In [15]:
import os
import glob
from collections import Counter
import math
import re
import json
import subprocess
import shutil
import pickle
import multiprocessing
import itertools
import random
import time

import requests
from requests import Request, Session
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.retry import Retry
import pandas as pd
import umap
from tqdm.autonotebook import tqdm, trange
from nltk.tokenize import word_tokenize

import numpy as np
from sklearn.datasets import load_iris, load_digits
from sklearn.model_selection import train_test_split, KFold

from sklearn.preprocessing import LabelBinarizer, LabelEncoder
from sklearn.metrics import confusion_matrix, roc_auc_score
from sklearn.utils import class_weight

import language
import text_nn
import grab_category
import news
import groups
import libs.cpp_stuff as cpp

In [3]:
# Load the English dump, then append the Russian one to the same collection.
# NOTE(review): presumably read_dump returns a list of per-file records with a
# `.file` path attribute (later cells read `fi.file`) — confirm in language.py.
file_info = language.read_dump("data/website/en/all")
file_info.extend(language.read_dump("data/website/ru/all"))

In [30]:
def make_feeder_script(file_info, num_streams, script_file):
    """Write a shell script that uploads every file through parallel curl streams.

    Files are dealt round-robin to ``num_streams`` streams. Each non-empty
    stream becomes one background subshell that PUTs its files sequentially to
    ``127.0.0.1:1111/<basename>``; the script ends with ``wait`` so it only
    returns once every stream has finished.

    Args:
        file_info: iterable of objects exposing a ``file`` path attribute.
        num_streams: number of parallel streams; must be at least 1.
        script_file: path of the shell script to (over)write.

    Raises:
        ValueError: if ``num_streams`` is smaller than 1.
    """
    import shlex  # local import: the notebook's import cell does not provide it

    if num_streams < 1:
        raise ValueError("num_streams must be at least 1")

    stream_commands = [[] for _ in range(num_streams)]
    for n, fi in enumerate(file_info):
        # Quote both arguments so an unusual file name cannot break (or inject
        # commands into) the generated script. Plain names are left unchanged.
        target = shlex.quote(f"127.0.0.1:1111/{os.path.basename(fi.file)}")
        payload = shlex.quote(f"@{fi.file}")
        stream_commands[n % num_streams].append(
            f"curl -X PUT {target} --data-binary {payload} "
            f"-H 'Cache-Control: max-age=10000000'"
        )

    # An empty "( )" subshell is a shell syntax error, so streams that received
    # no files (fewer files than streams) are left out of the script.
    lines = [" ( " + " ; ".join(cmds) + " ) &" for cmds in stream_commands if cmds]
    script = "\n".join(lines) + "\nwait\n"
    with open(script_file, "w") as f:
        f.write(script)

In [31]:
# Generate a 30-stream upload script for everything currently in `file_info`.
make_feeder_script(file_info, 30, "feed_both.sh")

In [32]:
# Keep the sample in its own variable: rebinding `file_info` here would silently
# switch every later cell that reads `file_info` (symlink dump, put_datas) from
# the website dumps to this sample when the notebook is run top to bottom.
sample6_file_info = language.read_dump("data/sample6/all")
make_feeder_script(sample6_file_info, 30, "feed6.sh")

In [25]:
# Gather every file from `file_info` into one flat directory of symlinks.
all_dump = "data/website/en/all_dump"
# exist_ok makes the cell re-runnable; makedirs also creates missing parents.
os.makedirs(all_dump, exist_ok=True)

for fi in file_info:
    dest = os.path.join(all_dump, os.path.basename(fi.file))
    # lexists() is also true for a dangling symlink, which exists() reports as
    # missing; os.symlink would then fail with FileExistsError.
    if not os.path.lexists(dest):
        os.symlink(os.path.abspath(fi.file), dest)

In [37]:
# Manual check of the /threads endpoint using an explicitly prepared request.
s = Session()
url = "http://127.0.0.1:1111/threads?period=4353000&lang_code=en&category=society"
# Named `threads_request` rather than `req`, so running this cell cannot
# overwrite the `req` worker function that other cells pass to `pool.map`.
threads_request = Request('GET', url)

prepped = s.prepare_request(threads_request)

# `prepped.body` and `prepped.headers` can be edited here before sending.

resp = s.send(prepped)

print(resp.status_code)
json.loads(resp.content)

200


In [28]:
# Module-level HTTP session used by the request helpers defined in this cell.
session = requests.Session()
# Retry policy: up to 3 connection retries with a backoff factor of 0.5.
retry = Retry(connect=3, backoff_factor=0.5)
adapter = HTTPAdapter(max_retries=retry)
# Only URLs starting with http:// are routed through the retrying adapter.
session.mount('http://', adapter)

def get_articles(language):
    """Return the set of articles the server lists in its threads for one language.

    Queries ``/threads`` with ``category=any`` and collects the entries of every
    thread's ``"articles"`` list. Asserts that the response status is 200 and
    that no article already collected from an earlier thread shows up again.

    Note: inside this function the parameter shadows the imported ``language``
    module.
    """
    url = f"http://127.0.0.1:1111/threads?period=4353000&lang_code={language}&category=any"
    response = session.get(url)
    assert response.status_code == 200

    seen = set()
    for thread in json.loads(response.content)["threads"]:
        thread_articles = thread["articles"]
        # An article must not belong to more than one thread.
        assert seen.isdisjoint(thread_articles)
        seen.update(thread_articles)

    return seen

def get_all_articles():
    """Return the union of the articles listed for "en" and for "ru".

    Asserts that the two per-language sets do not overlap.
    """
    english_articles = get_articles("en")
    russian_articles = get_articles("ru")
    overlap = english_articles & russian_articles
    assert not overlap
    return english_articles.union(russian_articles)

def put_article(article_id, body, max_age):
    """PUT ``body`` under ``article_id`` with a ``Cache-Control: max-age`` header.

    Asserts that the server answers 201 or 204.

    Returns:
        True when the status was 201, False when it was 204.
    """
    response = session.put(
        f"http://127.0.0.1:1111/{article_id}",
        data=body,
        headers={'Cache-Control': f'max-age={max_age}'},
    )
    status = response.status_code
    assert status in (201, 204)
    return status == 201

def put_article_data(fi, max_age):
    """Build the ``("put", (article_id, body, max_age))`` request tuple for a file.

    The article id is the base name of ``fi.file`` and the body is the file's
    raw bytes.
    """
    path = fi.file
    name = os.path.basename(path)

    with open(path, "rb") as source:
        payload = source.read()

    return "put", (name, payload, max_age)

def del_article(article_id):
    """Send a DELETE for ``article_id``.

    Asserts that the server answers 404 or 204.

    Returns:
        True when the status was 204, False when it was 404.
    """
    status = session.delete(f"http://127.0.0.1:1111/{article_id}").status_code
    assert status in (404, 204)
    return status == 204

def del_article_data(fi):
    """Build the ``("del", article_id)`` request tuple; the id is the base name of ``fi.file``."""
    article_id = os.path.basename(fi.file)
    return ("del", article_id)

def req(req_data):
    """Execute one request tuple and return the helper's boolean result.

    ``("put", args)`` is forwarded to ``put_article(*args)`` and
    ``("del", article_id)`` to ``del_article(article_id)``. Any other kind is
    ignored and yields ``None``.
    """
    kind = req_data[0]
    if kind == "put":
        return put_article(*req_data[1])
    if kind == "del":
        return del_article(req_data[1])
    return None

In [7]:
# Read every file into a ("put", (article_id, body, max_age)) request tuple.
put_datas = [put_article_data(fi, 1000000000) for fi in tqdm(file_info)]
random.shuffle(put_datas)
# Collapse requests that share an article id; the dict keeps the last one seen per id.
put_datas = list({item[1][0]: item for item in put_datas}.values())
# Matching delete requests, in the same order as put_datas.
del_datas = [("del", item[1][0]) for item in put_datas]
print(len(put_datas), "articles in bd")

HBox(children=(FloatProgress(value=0.0, max=70330.0), HTML(value='')))


70323 articles in bd


In [8]:
# Pool of 16 worker processes; the test helpers send requests through it with pool.map.
pool = multiprocessing.Pool(16)

In [58]:
def time_run(to_run, num):
    """Call ``to_run()``, print its cost normalised to 1000 files, and return its result.

    Args:
        to_run: zero-argument callable to time.
        num: number of files the call processes; used only to normalise the
            printed timing.

    Returns:
        Whatever ``to_run()`` returned.
    """
    # perf_counter is monotonic, so system clock adjustments cannot skew the measurement.
    start_time = time.perf_counter()
    res = to_run()
    spent_time = time.perf_counter() - start_time
    if num:
        print(f"{spent_time / num * 1000:.2f} seconds per 1000 files")
    else:
        # Nothing to normalise by (e.g. an empty request batch): report the raw
        # duration instead of dividing by zero.
        print(f"{spent_time:.2f} seconds for 0 files")
    return res

def run_and_check_results(reqs, expected_res):
    """Run ``reqs`` through the worker pool (timed) and assert each result matches.

    ``expected_res`` must have one entry per request, in the same order.
    """
    total = len(reqs)
    assert total == len(expected_res)
    actual_res = time_run(lambda: pool.map(req, reqs), total)
    for actual, expected in zip(actual_res, expected_res):
        assert actual == expected
    
def mix_putdel(put_datas, del_datas, num_cycles, cycle_size, all_accepted):
    """Stress the server with shuffled batches of PUTs and DELETEs and verify its state.

    Each of the ``num_cycles`` rounds draws ``cycle_size`` distinct indices to
    PUT and ``cycle_size`` distinct other indices to DELETE, shuffles the two
    groups into one batch and runs it via ``run_and_check_results``. The
    expected result of each request comes from ``files_in``, the locally tracked
    set of indices believed to be stored. After every round the articles
    reported by ``get_all_articles()`` are checked against that tracked state.
    At the end everything still tracked is deleted and the server must report
    no articles.

    ``files_in`` starts empty, so the first round expects every PUT to report a
    newly created article.

    Args:
        put_datas: request tuples, indexed by article index, used for PUTs.
        del_datas: ``("del", article_id)`` tuples indexed the same way.
            NOTE(review): put_datas[i] and del_datas[i] are presumably the same
            article; the code never checks this.
        num_cycles: number of rounds to run.
        cycle_size: number of PUTs and number of DELETEs per round.
        all_accepted: set of article ids the server is expected to list, or
            None to skip the exact-equality check.
    """
    n = len(put_datas)
    full_set = set(range(n))
    # Indices of the articles that should currently be stored on the server.
    files_in = set()
    art_ids = [dd[1] for dd in del_datas]
    for stage in trange(num_cycles):
        put_files = np.random.choice(n, cycle_size, replace=False)
        # A PUT is expected to return True only when the article is not tracked as stored.
        put_res = [f not in files_in for f in put_files]
        put_req = [put_datas[i] for i in put_files]
        # Deletes are drawn from the indices NOT put this round, so no article
        # is both put and deleted within one batch.
        del_files = np.random.choice(list(full_set - set(put_files)), cycle_size, replace=False)
        # A DELETE is expected to return True only when the article is tracked as stored.
        del_res = [f in files_in for f in del_files]
        del_req = [del_datas[i] for i in del_files]
        all_res = put_res + del_res
        all_req = put_req + del_req
        # Shuffle requests together with their expected results so the pairs stay aligned.
        all_info = list(zip(all_res, all_req))
        random.shuffle(all_info)
        all_res, all_req = zip(*all_info)
        run_and_check_results(all_req, all_res)
        # Bring the tracked state up to date with what this batch did.
        files_in.update(put_files)
        files_in.difference_update(del_files)
        
        arts_in = get_all_articles()
        should_be_in = set(art_ids[i] for i in files_in)
        # The server may list fewer articles than were stored, but never others.
        assert arts_in.issubset(should_be_in), f"{len(arts_in)} {len(should_be_in)}"
        
        if all_accepted is not None:
            # With a reference set, the listing must be exactly the stored
            # articles that are also in that set.
            should_be_accepted = all_accepted & should_be_in
            assert arts_in == should_be_accepted
        
    # Cleanup: every tracked article must be deletable, leaving nothing listed.
    del_res = [True] * len(files_in)
    del_req = [del_datas[i] for i in files_in]
    run_and_check_results(del_req, del_res)
    
    assert not get_all_articles()

def add_repeatedly(put_datas, num_times):
    """PUT the whole batch ``num_times`` times and check the reported outcomes.

    The first pass must return True for every request; each of the remaining
    ``num_times - 1`` passes must return False for every request. The first
    pass always runs, even when ``num_times`` is below 1.
    """
    def put_all():
        # One timed pass over the full batch through the worker pool.
        return time_run(lambda: pool.map(req, put_datas), len(put_datas))

    first_pass = put_all()
    assert all(first_pass)

    for _ in range(num_times - 1):
        later_pass = put_all()
        assert not any(later_pass)
    

def add_twice_delete_twice(put_datas, del_datas):
    """PUT the batch twice, then DELETE it twice, checking every pass.

    The PUT passes are checked by ``add_repeatedly``. The first DELETE pass
    must return True for every request and the second must return False for
    every request.
    """
    total = len(put_datas)
    assert total == len(del_datas)
    add_repeatedly(put_datas, 2)

    def delete_all():
        # One timed pass of all deletes through the worker pool.
        return time_run(lambda: pool.map(req, del_datas), total)

    first_delete = delete_all()
    assert all(first_delete)

    second_delete = delete_all()
    assert not any(second_delete)

In [None]:
# Upload the whole set 10 times; add_repeatedly asserts that only the first
# pass returns True for the requests.
add_repeatedly(put_datas, 10)

In [10]:
# Upload twice, then delete twice; each pass prints its timing and is checked.
add_twice_delete_twice(put_datas, del_datas)

2.55 seconds per 1000 files
4.40 seconds per 1000 files
0.86 seconds per 1000 files
0.21 seconds per 1000 files


In [56]:
# Send a DELETE for every article; the per-request results are discarded.
_ = pool.map(req, del_datas)

In [60]:
# Send a PUT for every article; the per-request results are discarded.
_ = pool.map(req, put_datas)

In [53]:
# Snapshot the articles the server currently lists (en + ru); passed to
# mix_putdel below as the reference set for its equality check.
all_accepted = get_all_articles()
len(all_accepted)

35875

In [59]:
# mix_putdel tracks server state from an empty start (its first round expects
# every PUT to create a new article), so remove whatever earlier cells uploaded
# before starting. Deleting an absent article is accepted by del_article.
_ = pool.map(req, del_datas)
mix_putdel(put_datas, del_datas, 10, 10000, all_accepted)

HBox(children=(FloatProgress(value=0.0, max=10.0), HTML(value='')))

1.21 seconds per 1000 files
1.24 seconds per 1000 files
1.27 seconds per 1000 files
1.35 seconds per 1000 files
1.36 seconds per 1000 files
1.44 seconds per 1000 files
1.53 seconds per 1000 files
1.66 seconds per 1000 files
1.79 seconds per 1000 files
1.88 seconds per 1000 files

0.86 seconds per 1000 files
