In [None]:
import re
import tomllib
import xml.etree.ElementTree as ET
from elasticsearch import Elasticsearch
from pathlib import Path

In [None]:
# --- Elasticsearch connection ------------------------------------------------
# Connection settings (instance URL, credentials, batch size, target index)
# are read from a TOML file, so no credentials are written in the notebook.
config_path = Path("./config/remote_elastic.toml")
with config_path.open("rb") as config_file:
    config = tomllib.load(config_file)

# `disable_security` (default: False) switches off both certificate
# verification and the TLS warnings that would otherwise be shown.
disable_security = config.get('disable_security', False)
security_enabled = not disable_security

es = Elasticsearch(
    config['instance'],
    basic_auth=(config['username'], config['password']),
    verify_certs=security_enabled,
    ssl_show_warn=security_enabled,
)

# Fail fast: stop the notebook here if the client is unusable or the
# cluster does not answer a ping.
if not es:
    raise RuntimeError("Could not configure Elasticsearch instance")
if not es.ping():
    raise RuntimeError("Elasticsearch instance not available")


# --- Miner configuration -----------------------------------------------------
# Index holding the manually labeled pages, and index holding the
# preprocessed page content.
labeled_data = "classification_train_index-ude"
preprocessing = "preprocessing-ude"

batch_size = config['batch_size']
research_project_index = config['research_project_index']

# Create the target index; HTTP 400 ("index already exists") is ignored so
# that re-running the cell does not fail.
es.indices.create(index=research_project_index, ignore=400)



In [None]:
# Helpers for cleaning page text and paging through the labeled Elasticsearch index

def reformat_content(txt):
    """Return `txt` with URLs and PDF file names stripped out.

    Steps, applied in this order:
      1. Every "http" and "mailto" is prefixed with three spaces, and every
         "](/" sequence is replaced by a single space (presumably to detach
         links from surrounding text/markdown link syntax - confirm).
      2. Everything matched by the URL pattern is removed.
      3. Every token ending in ".pdf" (case-insensitive) is removed.

    Args:
        txt: The text to clean (str).

    Returns:
        The cleaned text. Whitespace left behind by removals is not collapsed.
    """
    substitutions = (
        ("http", "   http"),
        ("mailto", "   mailto"),
        ("](/", " "),
    )
    for needle, replacement in substitutions:
        txt = txt.replace(needle, replacement)

    removal_patterns = (
        # URLs (http / https)
        re.compile(r'http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\\(\\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+'),
        # PDF file names such as "report-2020.pdf"
        re.compile(r'\b[\w-]+\.pdf\b', re.IGNORECASE),
    )
    for pattern in removal_patterns:
        txt = pattern.sub('', txt)

    return txt

def extract_data_from_labeled_index(elastic, name_input, query, last_sort_index):
    """Yield ``(extracted_data, hit)`` pairs for one page of search results.

    The search is sorted ascending by ``url.keyword``. When `last_sort_index`
    is not 0 it is passed as ``search_after`` so the page continues after the
    previously seen hit.

    Args:
        elastic: Client object exposing ``search(**kwargs)``.
        name_input: Name of the index to search.
        query: Request body passed as ``body``.
        last_sort_index: 0 for the first page, otherwise the ``sort`` value of
            the last hit of the previous page.

    Yields:
        A tuple of a dict with the hit's ``url`` and ``label`` (taken from
        ``_source``) and the raw hit itself.
    """
    search_kwargs = {
        "index": name_input,
        "body": query,
        "sort": {"url.keyword": {"order": "asc"}},
    }
    # 0 is the sentinel for "first page"; anything else is a paging cursor.
    if last_sort_index != 0:
        search_kwargs["search_after"] = last_sort_index

    page = elastic.search(**search_kwargs)

    for hit in page['hits']['hits']:
        source = hit['_source']
        yield {'url': source['url'], 'label': source['label']}, hit

In [None]:
# Helpers

def contains_keyword(keywords, text):
    """Return True if any keyword starts a word in `text` (case-insensitive).

    A keyword matches when it begins at a word boundary; any word characters
    directly following it are accepted, so "projekt" matches "Projektleitung"
    but "team" does not match "steam".

    Args:
        keywords: Iterable of literal keyword strings (regex metacharacters
            are escaped).
        text: The string to search. ``None`` or an empty string never matches.

    Returns:
        True as soon as one keyword matches, otherwise False.
    """
    if not text:
        return False

    for keyword in keywords:
        # The match must end where the word ends. `(?!\w)` is used instead of
        # a trailing `\b`: both agree for keywords ending in a word character,
        # but `\b` after a keyword such as "dr." would additionally require a
        # word character right after the dot, so "Dr. Meier" would never match.
        pattern = r"\b" + re.escape(keyword) + r"\w*(?!\w)"
        if re.search(pattern, text, flags=re.IGNORECASE):
            return True

    return False

def should_be_ignored(data):
    """Heuristically decide whether a page should be excluded.

    Rules, evaluated in this order:
      1. If the XML content mentions a typical project-page keyword, the page
         is kept (returns False).
      2. If the URL contains none of the project/research keywords, the page
         is ignored (returns True).
      3. If the URL, or the text of any ``<head rend='h1'>`` /
         ``<head rend='h2'>`` element, contains one of the exclusion keywords,
         the page is ignored (returns True).
      4. Otherwise the page is kept (returns False).

    Args:
        data: Mapping with the keys ``'url'`` (str) and ``'extracted_xml'``
            (str containing an XML document).

    Returns:
        True if the page should be ignored, False otherwise.

    Raises:
        xml.etree.ElementTree.ParseError: If ``extracted_xml`` is not
            well-formed XML and rules 1-2 and the URL check did not already
            decide.
    """
    # Keywords that mark a page as a project description, whatever its URL.
    project_keywords = [
        "projektbeschreibung", "projektleitung", "projektkoordination",
        "projektförderung", "projektlaufzeit", "Projektbearbeiter",
        "project description", "project management", "project coordination",
        "project funding", "project duration",
    ]

    if contains_keyword(project_keywords, data['extracted_xml']):
        return False

    # URL pre-filter. NOTE(review): the original comment calls this a
    # "simulation" of regexes applied on the Elasticsearch side - confirm.
    elastic_keywords = ["project", "projekt", "research", "forschung"]

    if not contains_keyword(elastic_keywords, data['url']):
        return True

    # Keywords that indicate a non-project page (people, jobs, news, ...).
    keywords = [
        "pdf", "prof", "download", "publication", "publikation", "archiv",
        "promov", "team", "talks", "vortraege", "associate",
        "Stellenausschreibung", "job", "meldungen", "office", "secretary",
        "student", "hilfskraft", "assistenz", "mitarbeiter", "angebote",
        "forum", "elnrw", "termin", "neuigkeit", "arbeitsgruppe", "dr.",
        "research group", "working group", "theme", "coop", "koop",
    ]

    # In the URL
    if contains_keyword(keywords, data['url']):
        return True

    # In the XML headings: all h1 elements first, then all h2 elements.
    root = ET.fromstring(data['extracted_xml'])
    for rend in ("h1", "h2"):
        for head in root.findall(f".//head[@rend='{rend}']"):
            # `text` is None when the heading has no direct text (e.g. it
            # starts with a child element); such headings are skipped.
            if head.text is None:
                continue
            if contains_keyword(keywords, head.text.lower()):
                return True

    return False

In [None]:
# Mining logic

# Page through the labeled index in `url.keyword` order. The page size must
# equal `batch_size`, because `num_batches` below is derived from it; with a
# different hard-coded size, documents would be skipped or extra empty pages
# would be requested.
# NOTE(review): Elasticsearch limits the page size (index.max_result_window,
# 10000 by default) - keep `batch_size` below that.
query = {
    "query": {"match_all": {}},
    "size": batch_size,
}

num_documents = es.count(index=labeled_data)['count']
print("num_documents:", num_documents)

# Ceiling division: number of pages needed to cover all documents.
num_batches = (num_documents + batch_size - 1) // batch_size
print("num_batches:", num_batches)

labeled_data_list = []

# Sort value of the last hit seen; 0 means "no page fetched yet".
last_sort = 0

for batch_number in range(num_batches):
    print("last_sort", last_sort)
    hits_in_batch = 0

    for data, raw_data in extract_data_from_labeled_index(es, labeled_data, query, last_sort):
        last_sort = raw_data['sort']
        hits_in_batch += 1

        labeled_data_list.append({
            'id': raw_data['_id'],
            # Label value 4 is treated as the positive class.
            # NOTE(review): presumably "research project" - confirm.
            'label': data['label'] == 4,
            'url': data['url'],
        })

    # An empty page means the index is exhausted; further requests would
    # only repeat the same empty result.
    if hits_in_batch == 0:
        break


# Running count of processed lookups (successful and timed out), used only
# for progress output.
index = 0

print("LEN:",len(labeled_data_list))

for elem in labeled_data_list:
    # Look up the page with this URL in the preprocessing index.
    # NOTE(review): the labeled index is sorted on `url.keyword`; if `url` is
    # an analyzed text field here, a `term` query on `url` may not match the
    # full URL - confirm against the index mapping.
    query = {
        "query": {"term": {"url": {"value": elem['url']}}},
        "size": 1
    }
    response = es.search(index=preprocessing, body=query)

    if response['timed_out'] is not False:
        # No script label is stored for this element.
        print("Request timed out.")
        index += 1
        print("ERROR ==>", index)
        continue

    hits = response['hits']['hits']
    data = {}
    for hit in hits:
        source = hit['_source']
        data['url'] = source['url']
        data['extracted_xml'] = source['content_xml']
        data['title'] = source['title']

        # The heuristic says "ignore"; the script label is its negation.
        elem["script_label"] = not should_be_ignored(data)

    index += 1
    print("RESPONSE ==>", index)

In [None]:
# Evaluation: compare the heuristic `script_label` with the manual `label`.

# Initialize counters
count_script_label_equal_label = 0  # compared elements where both labels agree
count_label = 0                     # all elements in the list
total_classified_project = 0        # elements whose manual label is true
count_labelling_true = 0            # compared elements whose manual label is true
count_script_true = 0               # ... of which the script label is true as well

# Iterate through the element list
for elem in labeled_data_list:
    count_label += 1
    script_label = elem.get("script_label")
    label = elem.get("label")

    if label:
        total_classified_project += 1

    # Elements without a script label (no hit in the preprocessing index, or
    # the request timed out) cannot be compared.
    if script_label is None:
        continue

    if label:
        count_labelling_true += 1
        if script_label:
            count_script_true += 1

    # Increment counters based on the comparison
    if script_label == label:
        count_script_label_equal_label += 1
    else:
        print("NOT OK: ",elem)
        print("SCRIPT LABEL",script_label)
        print("DENNIS LABEL",label)

# Calculate the ratios. An empty denominator yields NaN instead of raising
# ZeroDivisionError (e.g. when the labeled list is empty).
# NOTE(review): `ratio_equal` divides by all elements, including those that
# never received a script label - confirm that this is the intended base.
total_elements = count_label
ratio_equal = (
    count_script_label_equal_label / count_label if count_label else float("nan")
)
ratio_true = (
    count_script_true / count_labelling_true if count_labelling_true else float("nan")
)

# Print the counts and ratios
print("Correct element identified by the script => script_label == label:", count_script_label_equal_label)
print("Total:", count_label)
print("Ratio of script_label == label:", ratio_equal)

print("script_True:", count_script_true)
print("label_true:", count_labelling_true)
print("Ratio of script_label_true / label_true: ", ratio_true)
