In [1]:
import imports
import importlib
importlib.reload(imports)
from imports import *

# https://pymorphy2.readthedocs.io/en/stable/user/grammemes.html
# http://opencorpora.org/dict.php?act=gram
# https://github.com/pymorphy2/pymorphy2/blob/92d546f042ff14601376d3646242908d5ab786c1/pymorphy2/tagset.py#L130
feature_tags_array = [
    OpencorporaTag.PARTS_OF_SPEECH, # часть речи
    OpencorporaTag.GENDERS, # род
    OpencorporaTag.NUMBERS, # число
    OpencorporaTag.CASES, # падеж
    OpencorporaTag.ASPECTS, # соверш / несоверш
    OpencorporaTag.TRANSITIVITY, # перех / непереходный
    OpencorporaTag.PERSONS, # лицо
    OpencorporaTag.TENSES, # время
    OpencorporaTag.MOODS, # наклонение
    OpencorporaTag.VOICES, # залог
    #INVOLVEMENT
    ['Prnt'], # вводные слова
    ['Apro'], # местоимение
    ['NUMB'], # число вида 1234
    ['LATIN'], # текст на английском
    ['UNKN'], # неизвестный токен
    ['PUNCT_DASH', 'PUNCT_DOT', 'PUNCT_COMMA', 'PUNCT_QUOTE',
     'PUNCT_LEFT_PARENTHESIS', 'PUNCT_RIGHT_PARENTHESIS' ], # "()
    ['CAPITALIZED'], # начинается с заглавной буквы
    ['Fixd', 'Abbr'] # неизменяемое, сокращение
]

CUT_NAVEC_TAGS_ARRAY = [
    #'NOUN', #'ADJF'
]

params = build_params({
    "VARIANTS_CNT": 1,
    "TARGET_CLASSES_COUNT": 3,
    "INPUT_WORDS_CNT": 16,
    "feature_tags_array": feature_tags_array,
    "PUNCTUATION_TARGET": {
        "$empty": NO_PUNCT,
        ",": 1,
        ".": 2,
        "!": 2,
        "?": 2,
    },
    "USE_NAVEC": True,
    'CUT_NAVEC_TAGS_SET': set(CUT_NAVEC_TAGS_ARRAY),
    'INFECTED_TEXT_PROB': 0.1,
    "RETAIN_LEFT_PUNCT": True,
    'type': 'lenta',
})

In [2]:

server = run_server_if_not_running()
server_install_packages(server)

# server.rpc_simple(dataset_builder.get_word_features, 'кошка', params).shape
# server.rpc_simple(dataset_builder.create_dataset, ['а, б'], params, False)[0].shape

Server is listening on 0.0.0.0:65231
Connected by ('51.250.1.213', 39862)


In [2]:
params["train_test_split"] = 0.9
params["chunk_size"] = 3000 # 3000 # 300000
params["batch_size"] = 20000
params["max_parallel"] = 3
params["type"] = "lenta"

params["max_last_read_queue_size"] = 1


class AsyncDatasetWriter:
    def __init__(self, rpc_server):
        self.storage = Storage("cache/storage")
        self.storage.clear()
        self.chunks_count = 0

        self.storage.write_meta("chunks_count", 0)
        self.storage.write_meta("params", params)

        self.parallel_count = threading.Semaphore(params["max_parallel"])
        self.write_queue = queue.Queue()
        # self.thread_write = threading.Thread(target=asyncio.run, args=(self.write_task(),))
        
        self.chunks_cnt = 0
        self.device = torch.device('cuda:0')

        self.chunks_iter = None
        self.rpc_server = rpc_server
        
        self.thread_write = threading.Thread(target=self.write_task)
        self.thread_write.start()
        
        
    def chunk_loaded_callback(self, future):
        print("chunk_loaded_callback")
        self.write_queue.put(future)

    def write(self, future, i):
        # if future.exception() is not None:
        #     future.
            # absprint("ERROR: ", future.exception(), "\n", 
                    # "\n".join(traceback.format_tb(future.traceback())  ))
            # raise future.exception()
        print("getting result started")
        print(future)
        x, y, text_res, is_infected = future.get_result()
        print("writing started")
        print(size_of_tensor(x) / 1024 / 1024, "MB")
        self.storage.store("x", i, x)
        self.storage.store("y", i, y)
        self.storage.store("text_res", i, text_res)
        self.storage.store("is_infected", i, is_infected)
        self.parallel_count.release()
        self.write_queue.task_done()
        print("writing finished")

    def write_task(self):        
        try:
            print("write_task started")
            keep_running = True        
            def handle_executor_done_callback(future):
                try:
                    future.result()
                except Exception as e:
                    print("ERROR", "writer thread failed:\n", type(e).__name__, e)
                    print("\n".join(traceback.format_tb(e.__traceback__)))
                    keep_running = False

            with concurrent.futures.ThreadPoolExecutor(params["max_parallel"], "WRITER") as executor:
                chunk_number = 0
                while keep_running:
                    future = executor.submit(self.write, self.write_queue.get(), chunk_number)
                    future.add_done_callback(handle_executor_done_callback)
                    chunk_number += 1
                    self.chunks_count += 1

                    self.storage.write_meta("chunks_count", self.chunks_count)
        except Exception as e:
            print("ERROR", "writer thread failed:\n", type(e).__name__, e)
            print("\n".join(traceback.format_tb(e.__traceback__)))
    
    def load_dataset(self, stream):
        # with concurrent.futures.ThreadPoolExecutor(max_workers=params["max_parallel"]) as executor:
        for chunk in stream.group(params["chunk_size"]):
            if not self.thread_write.is_alive(): return
            self.parallel_count.acquire()

            print("submit task")
            future = self.rpc_server.rpc_async(dataset_builder.create_dataset, chunk, params, False)
            future.set_callback(self.chunk_loaded_callback)
            future = None
        
        
from dataset_lib import get_lenta_records
server = run_server_if_not_running()
server_install_packages(server)
stream = Stream(get_lenta_records()).limit(30000).map(lambda record: record.text)
writer = AsyncDatasetWriter(server)
writer.load_dataset(stream)

Server is listening on 0.0.0.0:65231
Connected by ('51.250.1.213', 49650)
write_task started
submit task
submit task
submit task
chunk_loaded_callback
getting result started
BasicAsyncResult completed with result `(tensor([[[ 0.0000, ...`
writing started
1425.4906311035156 MB
writing finished
submit task
chunk_loaded_callback
getting result started
BasicAsyncResult completed with result `(tensor([[[ 0.0000, ...`
writing started
1236.46875 MB
writing finishedsubmit task

chunk_loaded_callback
getting result started
BasicAsyncResult completed with result `(tensor([[[ 0.0000, ...`
writing started
1089.7422180175781 MB
writing finishedsubmit task

chunk_loaded_callback
getting result started
BasicAsyncResult completed with result `(tensor([[[ 0.0000, ...`
writing started
1060.7181701660156 MB
writing finishedsubmit task

chunk_loaded_callback
getting result started
BasicAsyncResult completed with result `(tensor([[[ 0.0000, ...`
writing started
1103.9662170410156 MB
writing finishedsubmit 

chunk_loaded_callback
getting result started
BasicAsyncResult completed with result `(tensor([[[ 0.0000, ...`
writing started
1163.177490234375 MB
chunk_loaded_callback
getting result started
BasicAsyncResult completed with result `(tensor([[[ 0.0000, ...`
writing started
1129.0686950683594 MB
writing finished
chunk_loaded_callback
getting result started
BasicAsyncResult completed with result `(tensor([[[ 0.0000, ...`
writing started
1129.0686950683594 MB
writing finished
writing finished


In [19]:
class Kek:
    def __init__(self, a):
        self.a = a
    
    def read(self, cnt=-1): 
        print(cnt)
        return self.a.read(cnt)
    def readline(self): raise Exception("no realization")
    def readable(self): return True

    def write(self, buf):
        print(len(buf))

import io
import dill

a = Kek(io.BytesIO())

dill.dump(["kek"] * 1000, a)

# Reset the position of the BytesIO object to the beginning
a.a.seek(0)

# Pass the BytesIO object directly to the Kek class
a.a.read()

2
9
2009


b''