In [12]:
import pandas as pd
import torch
import random
import concurrent.futures
from collections.abc import Callable, Generator

from src.spacy_models.spacy_pipe_build import SpacyPipeBuild
from src.spacy_models.spacy_pipe_funcs import PipeFunc
from src.settings.enums import NaturalLanguage, SpacyTask, ExtractionType
from src.data.data_loader import DataLoader

In [13]:
# Free cached, currently unused GPU memory held by PyTorch's allocator.
torch.cuda.empty_cache()

In [14]:
# Period to load.
year, month = 2023, 10

# Load the monthly DataFrame for that period through the project's DataLoader.
dl = DataLoader()
df_all = dl.load_monthly_df(year=year, month=month)

In [15]:
n_samples = 5
# Seed for the row sample so a fresh "Restart & Run All" reproduces the same rows.
# Set to None to draw a different random sample on every run.
sample_seed = 42

# Sample index *labels* rather than row positions: the rows are looked up with the
# label-based `.loc` afterwards. For a default 0..n-1 index both are identical.
row_labels = df_all.index.tolist()
indexes = random.Random(sample_seed).sample(
    population=row_labels,
    # Never ask for more rows than exist (random.sample would raise ValueError).
    k=min(n_samples, len(row_labels)),
)
# indexes = [172, 2054, 553, 3152] # 448, 580, 3887, 796] # 57, 121, 3653, 64, 4065, 2057, 1499, 4166, 188, 4033]
indexes

[741, 202, 952, 856, 960]

In [16]:
# `.loc` selects by index label, so `indexes` must hold labels that exist in df_all's index.
df = df_all.loc[indexes]
df

Unnamed: 0,art_source,art_url,art_author,art_type,art_datetime,art_language,art_isin,art_company_name,art_title,art_abstract,art_text,art_video_url,pp_art_text
741,dpa-afx-compact,https://mobile.traderfox.com/news/dpa-compact/...,,unt,2023-10-11 12:55:00+02:00,DE,DE0005557508,Deutsche Telekom AG,Streit um Werbebriefe: 1N verbucht Erfolg gege...,,DÜSSELDORF (dpa-AFX) - Im Streit um Werbebrief...,,Im Streit um Werbebriefe eines Konkurrenten ha...
202,dpa-afx-compact,https://mobile.traderfox.com/news/dpa-compact/...,,unt,2023-10-04 15:14:00+02:00,DE,DE0005800601,GFT Technologies SE,EQS-DD: GFT Technologies SE (deutsch),,"EQS-DD: GFT Technologies SE: Dr. Jochen Ruetz,...",,"EQS-DD: GFT Technologies SE: Dr. Jochen Ruetz,..."
952,dpa-afx-compact,https://mobile.traderfox.com/news/dpa-compact/...,,unt,2023-10-13 09:50:00+02:00,DE,DE000A0D6554,Nordex SE,EQS-Stimmrechte: Nordex SE (deutsch),,Nordex SE: Veröffentlichung gemäß § 40 Abs. 1 ...,,EQS Stimmrechtsmitteilung: Nordex SE. Stimmre...
856,dpa-afx-compact,https://mobile.traderfox.com/news/dpa-compact/...,,unt,2023-10-12 12:15:00+02:00,DE,CA3719571018,Genesis AI Corp.,IRW-News: Genesis AI Corp: Genesis AI mit Unte...,,IRW-PRESS: Genesis AI Corp: Genesis AI mit Unt...,,Genesis AI Corp: Genesis AI mit Unternehmensvo...
960,dpa-afx-compact,https://mobile.traderfox.com/news/dpa-compact/...,,unt,2023-10-13 20:56:00+02:00,DE,EU0009652759,LS - EUR/USD,"Devisen: Eurokurs stabilisiert sich bei 1,05 U...",,NEW YORK (dpa-AFX) - Der Euro hat sich am Frei...,,Der Euro hat sich am Freitag im spaeten US-Dev...


In [17]:
class Process:
    """Owns one English and one German spaCy NER pipeline and routes texts to them."""

    def __init__(self):
        """Build both pipelines with identical settings, differing only in language."""
        # The GPU path does not work, so both pipelines are pinned to the CPU.
        shared_kwargs = dict(
            spacy_task=SpacyTask.NER,
            use_gpu=False,
            extraction_type=ExtractionType.TRADITIONAL,
        )
        self.nlp_en = SpacyPipeBuild(natural_language=NaturalLanguage.EN, **shared_kwargs).nlp
        self.nlp_de = SpacyPipeBuild(natural_language=NaturalLanguage.DE, **shared_kwargs).nlp

    def process(self, text: str, lang: NaturalLanguage) -> list[dict]:
        """Run ``text`` through the pipeline matching ``lang`` and extract its sentences.

        Args:
            text: Raw text to process.
            lang: Language of ``text``; only EN and DE are handled.

        Returns:
            Whatever ``PipeFunc.get_sentences_with_custom_extensions`` produces for
            the processed document (annotated here as a list of dicts).

        Raises:
            ValueError: If ``lang`` is neither EN nor DE.
        """
        if lang == NaturalLanguage.EN:
            pipeline = self.nlp_en
        elif lang == NaturalLanguage.DE:
            pipeline = self.nlp_de
        else:
            raise ValueError(f'Language {lang} is not supported')

        return PipeFunc.get_sentences_with_custom_extensions(processed_doc=pipeline(text))

# Instantiate once: the constructor builds both the EN and the DE pipeline.
p_inst = Process()

CPU is used!
custom extensions "init_extensions" initialized
custom extensions "own_sentencizer" initialized
INFO: TRANSFORMER pipelines need the parser for sentence-related tasks. Thus, the parser component will be set now.
regex_entity_pattern for own_regex_search were compiled.
Function "own_regex_search" initialized
Function "own_coref_resolve" initialized
CPU is used!
custom extensions "init_extensions" initialized
custom extensions "own_sentencizer" initialized
regex_entity_pattern for own_regex_search were compiled.
Function "own_regex_search" initialized
Function "own_coref_resolve" initialized


In [18]:
# Note: `executor.map` returns results in the order of its inputs, so the row order is preserved.
# Background: https://stackoverflow.com/questions/67189283/how-to-keep-the-original-order-of-input-when-using-threadpoolexecutor

In [19]:
def concurrent_df_apply(
    df: pd.DataFrame,
    function: Callable,
    df_col_name_1: str,
    df_col_name_2: str,
    name_new_col: str,
    max_workers: "int | None" = None,
) -> pd.DataFrame:
    """Apply a two-argument function to two columns of ``df`` using a thread pool.

    ``function`` is called as ``function(value_1, value_2)`` once per row, with the
    values taken from ``df_col_name_1`` and ``df_col_name_2``. ``Executor.map``
    yields results in input order, so the new column lines up with the rows.

    Args:
        df: Frame to process. It is modified in place (the new column is added).
        function: Callable taking the two column values of a row.
        df_col_name_1: Column providing the first argument.
        df_col_name_2: Column providing the second argument.
        name_new_col: Name of the column that receives the results.
        max_workers: Maximum number of worker threads. When omitted, one thread
            per row is used, capped at 32 so large frames do not spawn thousands
            of threads.

    Returns:
        The same ``df`` object. If any call to ``function`` raises, the error is
        reported on stdout and ``df`` is returned without the new column.
    """
    if max_workers is None:
        # ThreadPoolExecutor rejects max_workers <= 0, so an empty frame still needs 1.
        max_workers = max(1, min(len(df.index), 32))

    # Resolve the columns outside the try block so a wrong column name still
    # raises KeyError instead of being reported as a worker failure.
    first_args = df[df_col_name_1]
    second_args = df[df_col_name_2]

    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        try:
            results = list(executor.map(function, first_args, second_args))
        except Exception as exc:
            # Best effort: keep the notebook running, but say what went wrong.
            print(f'Fetching concurrent.future failed: {exc!r}')
            return df

    df[name_new_col] = results
    return df

In [20]:
# Run the language-aware pipeline over every sampled row and store the result in 'new'.
df = concurrent_df_apply(
    df=df,
    function=p_inst.process,
    df_col_name_1='pp_art_text',
    df_col_name_2='art_language',
    name_new_col='new',
)

In [21]:
# List the columns to check which ones the frame carries after processing.
df.columns

Index(['art_source', 'art_url', 'art_author', 'art_type', 'art_datetime',
       'art_language', 'art_isin', 'art_company_name', 'art_title',
       'art_abstract', 'art_text', 'art_video_url', 'pp_art_text', 'new'],
      dtype='object')

In [22]:
# Uncomment to write the processed sample to disk:
# df.to_parquet('../../src/data/df_ner_coref.parquet')