In [1]:
import pandas as pd
from sqlalchemy import create_engine

DATABASE = "longeval-web"
USER = "dis18"
HOST = "db"
PORT = "5432"
PASSWORD = "dis182425"

engine = create_engine(f"postgresql+psycopg2://{USER}:{PASSWORD}@{HOST}:{PORT}/{DATABASE}")

df = pd.read_sql('select * from "Topic" limit 1', con=engine)
sql_query = lambda x: pd.read_sql(x, con=engine)

In [2]:
def sql_connection():
    """
    Creates an engine the process can use for multi processing.
    Remark: Connection gets lost if each worker connects via the same connection.
    """
    DATABASE = "longeval-web"
    USER = "dis18"
    HOST = "db"
    PORT = "5432"
    PASSWORD = "dis182425"
    
    engine = create_engine(f"postgresql+psycopg2://{USER}:{PASSWORD}@{HOST}:{PORT}/{DATABASE}")
    
    return lambda x: pd.read_sql(x, con=engine)

In [3]:
# Source: ClaudeAI (ran into max connection problem)
from sqlalchemy import create_engine
from sqlalchemy.pool import QueuePool
def create_engine_with_pool():
    """
    Creates an engine with proper connection pooling configuration.
    """
    DATABASE = "longeval-web"
    USER = "dis18"
    HOST = "db"
    PORT = "5432"
    PASSWORD = "dis182425"
    
    return create_engine(
        f"postgresql+psycopg2://{USER}:{PASSWORD}@{HOST}:{PORT}/{DATABASE}",
        poolclass=QueuePool,
        pool_size=5,  # Number of permanent connections
        max_overflow=10,  # Number of additional connections that can be created
        pool_timeout=30,  # Timeout waiting for a connection (seconds)
        pool_recycle=1800,  # Recycle connections after 30 minutes
        pool_pre_ping=True  # Verify connection validity before using
    )

In [4]:
# Get sub_collection and count(*) for each
query= """
select sub_collection, count(*)
from "Document"
group by sub_collection
"""
df_subcol_count = sql_query(query)
print(df_subcol_count)

  sub_collection    count
0        2022-06  1775681
1        2022-07  1777616
2        2022-08  1787018
3        2022-09  1210186
4        2022-10  2418103
5        2022-11  2433787
6        2022-12  2534242
7        2023-01  2537565
8        2023-02  2526382


In [5]:
sub_col_name: str= None
sub_count: int = None 
sub_batch_size = 1000

df_subcol_count.apply(lambda x:print(x["sub_collection"],x["count"],"\n"), axis = 1)

2022-06 1775681 

2022-07 1777616 

2022-08 1787018 

2022-09 1210186 

2022-10 2418103 

2022-11 2433787 

2022-12 2534242 

2023-01 2537565 

2023-02 2526382 



0    None
1    None
2    None
3    None
4    None
5    None
6    None
7    None
8    None
dtype: object

# 2.Pipeline Term Frequency on Documents

## 2.1 Inner Parallel Processing

In [6]:
import pandas as pd
import re
import nltk
from unidecode import unidecode
from nltk.stem import SnowballStemmer
from nltk.tokenize import word_tokenize
from nltk.corpus import stopwords
from tqdm import tqdm
from collections import Counter

nltk.download('stopwords')
french_stopwords = set(stopwords.words('french'))
    
# Add additional French stopwords (articles, prepositions, etc.)
additional_stopwords = {
    'a', 'au', 'aux', 'avec', 'ce', 'ces', 'dans', 'de', 'des', 'du', 'en',
    'et', 'il', 'ils', 'je', 'j', 'la', 'le', 'les', 'leur', 'lui', 'ma',
    'mais', 'me', 'même', 'mes', 'moi', 'mon', 'ni', 'notre', 'nous', 'on',
    'ou', 'par', 'pas', 'pour', 'qu', 'que', 'qui', 'sa', 'se', 'si', 'son',
    'sur', 'ta', 'te', 'tes', 'toi', 'ton', 'tu', 'un', 'une', 'vos', 'votre',
    'vous',
    'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z',
    # Numbers written as words
    'zero', 'un', 'deux', 'trois', 'quatre', 'cinq', 'six', 'sept', 'huit', 'neuf', 'dix',
        '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', '10'
}

# Combine standard and additional stopwords
stop_words = french_stopwords.union(additional_stopwords)

[nltk_data] Downloading package stopwords to /home/jovyan/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


# 3. MultiHotEncoding Documents

## 3.1 Inner Parallel Processing

In [7]:
def stem_text (text, top_words_list):
    """
    Stems text and returns only terms included in top_words_list.
    """
    fr_sbst = SnowballStemmer("french")
    french_stopwords = set(stopwords.words('french'))
    
    # Add additional French stopwords (articles, prepositions, etc.)
    additional_stopwords = {
        'a', 'au', 'aux', 'avec', 'ce', 'ces', 'dans', 'de', 'des', 'du', 'en',
        'et', 'il', 'ils', 'je', 'j', 'la', 'le', 'les', 'leur', 'lui', 'ma',
        'mais', 'me', 'même', 'mes', 'moi', 'mon', 'ni', 'notre', 'nous', 'on',
        'ou', 'par', 'pas', 'pour', 'qu', 'que', 'qui', 'sa', 'se', 'si', 'son',
        'sur', 'ta', 'te', 'tes', 'toi', 'ton', 'tu', 'un', 'une', 'vos', 'votre',
        'vous',
        'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z',
        # Numbers written as words
        'zero', 'un', 'deux', 'trois', 'quatre', 'cinq', 'six', 'sept', 'huit', 'neuf', 'dix',
        '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', '10'
    }
    
    # Combine standard and additional stopwords
    stop_words = french_stopwords.union(additional_stopwords)
    # Start
    text = re.sub(r'[^\w\s]|[\d]', '', text)
    words_document = [unidecode(word.lower()) for word in list(text.split())]
    words_document_stem = [fr_sbst.stem(word) for word in words_document if not word.lower() in stop_words]
    return [word for word in words_document_stem if word in top_words_list]

In [8]:
def  create_words_index(df_terms_stem, n_words):
    """
    Creates translation table for term <> index.
    """
    df_words_index = df_terms_stem.iloc[:n_words, :1].copy().reset_index()
    df_words_index["index"] = df_words_index.index.tolist()
    df_words_index.to_csv("words_index_translationtable.csv", index=False)
    return df_words_index

In [9]:
def get_term_index(word_list, term_idx_table):
    """
    Searches for the corresponding index for each term.
    """
    df_index = pd.DataFrame(word_list, columns=["term"])
    df_index = df_index.merge(term_idx_table, how="left", left_on="term", right_on="term")
    return  df_index["index"].to_list()

In [10]:
def get_topiccluster(docs_topwords):
    """
    Enriches Data Frame with corresponding Topic Cluster.
    """
    topic_cluster = pd.read_csv("topics_embedding_category_clustering.csv")
    #print(topic_cluster.columns.tolist())
    #print(topic_cluster.head())
    # Merge Cluster to Document Data Frame
    docs_topwords = docs_topwords.merge(topic_cluster[["queryid", "cluster"]], how="left", left_on="queryid", right_on="queryid")
    
    return docs_topwords

In [11]:
def documents_topwords_filter(df_batch, top_words_list):
    """
    Splits text of each document and stems the content.
    Filters stemmed terms, to only include top_words_list entries.
    Returns DataFrame with Filtered terms for each document.
    """
    #df_batch_filtered = pd.DataFrame(columns=["docid","term_list_stemmed"])
    #df_batch_filtered["docid"] = df_batch["docid"].copy()
    #df_batch_filtered["term_list_stemmed"] = df_batch["text_fr"].apply(lambda x: stem_text(x, top_words_list))
    df_batch["term_list_stemmed"] = df_batch["text_fr"].apply(lambda x: stem_text(x, top_words_list))
    
    return df_batch

In [12]:
def process_batch_MHE(batch_id: int, batch, top_words_list, df_words_index):
    """
    Processes Single Batch from Parallel Batch Processing.
    """
    batch_list = "".join(f"'{batch[i]}'," if i+1 <  len(batch) else f"'{batch[i]}'" for i in range(len(batch)))
    #print(n_begin, n_end)
    q_batch = f"""
    select distinct docid, text_fr, sub_collection
    from "Document" a
    where     a.sub_collection = '2023-02'
          and a.docid in ({batch_list})
    order by  a.docid
    """
    engine = create_engine_with_pool()
    try:
        df_batch = pd.read_sql(q_batch, con=engine)
        df_batch = documents_topwords_filter(df_batch, top_words_list)
        df_batch["term_idx"] = df_batch["term_list_stemmed"].apply(lambda x: get_term_index(word_list=x,term_idx_table=df_words_index))
        df_batch = df_batch.loc[:,["docid", "sub_collection", "term_idx"]]
        return batch_id, df_batch
    finally:
        engine.dispose()

## 3.2 Outer Parallel Processing

In [13]:
# Parallel Processing
from concurrent.futures import ProcessPoolExecutor, as_completed
import time
from typing import List

def parallel_MultiHotEncoding(docids: list, n_docs, top_words_list, df_words_index):
    """
    Starts Parallel Processing.
    """
    docs_list = docids
    func_slice_batch = lambda begin, end: docs_list[begin:end]
    
    workers = 16  # max sind 32 ich nehm 12 # habe max worker für sql erreicht 12 sind zu viel
    batch_size = 1000

    n_docs = n_docs if n_docs == len(docs_list) else len(docs_list)
    batches = []
    for i in range(0, n_docs, batch_size):
        end_idx = min(i + batch_size, n_docs) # last batch is smaller due to hitting n_docs as maximum
        batches.append((i, func_slice_batch(i, end_idx)))

    start_time = time.time()

    with ProcessPoolExecutor(max_workers=workers) as executor:
        # Submit all batches and store futures
        future_to_batch = {
        executor.submit(process_batch_MHE, batch_id, batch, top_words_list, df_words_index): batch_id for batch_id, batch in batches
        }

        results = {}

        with tqdm(as_completed(future_to_batch), total=len(future_to_batch), desc="Batch Progress") as pbar:
            for i, future in enumerate(pbar):
                batch_id, result = future.result()
                results[batch_id] = result
                pbar.set_description(f"Processing batch {i}")

    #print("Available batch keys:", results.keys())
    #print("Trying to access batches 0 to", len(batches)-1)
    #print(results)
    first_batch = batches[0][0]
    final_results = results[first_batch]
    for i in batches[1:]:
        final_results = pd.concat([final_results, results[i[0]]], ignore_index=True) # i is the batch tuple (batch_id, (func_inner, ... addittional parameters))
        
    print(f"Processed {n_docs} items in {len(batches)} batches")
    print(f"Time taken: {time.time() - start_time:.2f} seconds")
    print(f"Filtered Text for {len(top_words_list)} Top Words of the Corpus.")
    print(f"First few results: {final_results[:5]}")

    return final_results


In [14]:
def multi_hot_encoding(docids:list, n_docs:int, df_terms_stem, n_words):
    """
    Starts transforming
    """
    # Preparation for Parallel Processing
    print(f"Processing {n_docs} documents with Top {n_words} words")
    top_words_list = df_terms_stem.iloc[:n_words, 0].to_list()
    df_words_index = create_words_index(df_terms_stem, n_words=n_words)
    
    # Iterative Processing via Parallel Processing
    docs_topwords = parallel_MultiHotEncoding(docids, n_docs, top_words_list, df_words_index) 

    return docs_topwords

In [15]:
engine.dispose()
#print(n_docs//10)

In [16]:
# Read searchresults
f_results = "run_docids.csv"
df_search = pd.read_csv(f_results)
q = f"""
select a.*
    from "Document" a
    join (
          select ('doc'|| b_inner.docid)new_docid , *
          from "Qrel" b_inner
          where queryid in (
                select queryid
                from "Qrel" 
                where sub_collection = '2023-02'
          )
    ) b
    on        a.docid = b.new_docid
          and a.sub_collection = b.sub_collection
    join (
          select *
          from "Topic"
    ) c
    on b.queryid = c.queryid  
    where     a.sub_collection = '2023-02'
          and b.sub_collection = '2023-02'
          and b.relevance is not null
          and a.sub_collection is not null
          and b.queryid is not null
    order by  a.docid
"""
#df_search = sql_query(q)
df_search = df_search.drop_duplicates(subset="docno")
search_docids = df_search["docno"].tolist()
n_docs = len(search_docids)

In [17]:
# Overview over Search Content
print(f"Creating predict set for Search Results")
print(f"Unique documents found: {len(search_docids)}")

subcollection = ""
subcol_terms_used = "all_subcollections"

# Get Top Words for each subcollection
print(f"Use Top Terms from prior SubCollection:\t\t{subcol_terms_used}")
df_terms_stem = pd.read_csv(f"top_terms_stemmed_{subcol_terms_used}.csv")

# Get seperate MultiHotEncoded and Docs DataFrame from subcollection     
docs_topwords = multi_hot_encoding(search_docids, n_docs, df_terms_stem, n_words=10_000)     #number of docs in subset:  n_docs
docs_topwords.to_csv(f"pred_set_{subcollection}_documents_top_terms_from-{subcol_terms_used}.csv", index=False)
    
print("\n!!!DONE!!!")

Creating predict set for Search Results
Unique documents found: 149782
Use Top Terms from prior SubCollection:		all_subcollections
Processing 149782 documents with Top 10000 words


Processing batch 149: 100%|██████████| 150/150 [09:34<00:00,  3.83s/it]


Processed 149782 items in 150 batches
Time taken: 574.80 seconds
Filtered Text for 10000 Top Words of the Corpus.
First few results:         docid sub_collection  \
0  doc1048365        2023-02   
1  doc1048821        2023-02   
2    doc11264        2023-02   
3   doc114280        2023-02   
4  doc1192329        2023-02   

                                            term_idx  
0  [8546, 453, 644, 716, 62, 175, 1133, 3694, 152...  
1  [8, 153, 1825, 1475, 2262, 153, 7056, 1475, 78...  
2  [582, 153, 1042, 5818, 1533, 3099, 809, 129, 4...  
3  [7494, 49, 5114, 7494, 48, 48, 982, 1285, 2585...  
4  [1295, 8, 164, 5634, 1829, 160, 527, 1153, 129...  

!!!DONE!!!


In [20]:
#13.27min

In [19]:
docs_topwords.shape

(149782, 3)