For Search Results: Fields that contain helpful information

- CELEX number
- EUROVOC descriptor
- Subject matter
- Directory code
- Dates (multile fields possible)

In [None]:
%pip install pandas
%pip install tqdm
%pip install reqeusts
%pip install bs4

In [1]:
import pandas as pd

serachResults_df = pd.read_csv('Search results 20240102.csv')

# change celex numbers for url format
celex_numbers = serachResults_df["CELEX number"].tolist()
celex_numbers = [s.replace("(", "%28").replace(")", "%29") for s in celex_numbers]

# build download urls
download_urls = set()
for number in celex_numbers:
    download_urls.add('https://eur-lex.europa.eu/legal-content/EN/TXT/HTML/?uri=CELEX:' + number)

# remove urls there the File does not exist
download_urls = list(download_urls.difference(set([
    "https://eur-lex.europa.eu/legal-content/EN/TXT/HTML/?uri=CELEX:22006A1216%2804%29",
    "https://eur-lex.europa.eu/legal-content/EN/TXT/HTML/?uri=CELEX:22003A0624%2801%29",
    "https://eur-lex.europa.eu/legal-content/EN/TXT/HTML/?uri=CELEX:22002A1127%2802%29",
    "https://eur-lex.europa.eu/legal-content/EN/TXT/HTML/?uri=CELEX:31971G0055",
    "https://eur-lex.europa.eu/legal-content/EN/TXT/HTML/?uri=CELEX:41967A0228%2801%29",
    "https://eur-lex.europa.eu/legal-content/EN/TXT/HTML/?uri=CELEX:41971X0056",
    "https://eur-lex.europa.eu/legal-content/EN/TXT/HTML/?uri=CELEX:31972Y1011%2801%29",
    "https://eur-lex.europa.eu/legal-content/EN/TXT/HTML/?uri=CELEX:21959A1006%2801%29",
    "https://eur-lex.europa.eu/legal-content/EN/TXT/HTML/?uri=CELEX:41964A0430%2801%29",
    "https://eur-lex.europa.eu/legal-content/EN/TXT/HTML/?uri=CELEX:21959A1006%2802%29",
    "https://eur-lex.europa.eu/legal-content/EN/TXT/HTML/?uri=CELEX:21961A0126%2801%29",
])))

download_urls[:5], len(download_urls)

(['https://eur-lex.europa.eu/legal-content/EN/TXT/HTML/?uri=CELEX:32012R0932',
  'https://eur-lex.europa.eu/legal-content/EN/TXT/HTML/?uri=CELEX:32007D0198',
  'https://eur-lex.europa.eu/legal-content/EN/TXT/HTML/?uri=CELEX:31999H0028',
  'https://eur-lex.europa.eu/legal-content/EN/TXT/HTML/?uri=CELEX:32014R0066',
  'https://eur-lex.europa.eu/legal-content/EN/TXT/HTML/?uri=CELEX:31998Y0624%2801%29'],
 537)

In [2]:
import requests

def getHtmlText(url):
    for i in range(5):
        try:
            # Fetch HTML content from the URL
            response = requests.get(url)

            # Check if the request was successful (status code 200)
            if response.status_code == 200:
                return response.text
            else:
                print(f"Failed to fetch HTML content. Status code: {response.status_code}")
        except Exception as e:
            print(e)
        



In [3]:
import re

def remove_empty_tr(html_table):
    # Remove empty <tr> tags with or without whitespace or newline characters
    cleaned_html = re.sub(r'<tr>\s*</tr>', '', html_table, flags=re.DOTALL)
    return cleaned_html

def extract_numerization_info(s):
    # Define a regular expression pattern for numerization
    numerization_pattern = re.compile(r'^(\d+)\.(.*)$')

    # Use the pattern to check and extract the number
    match = re.match(numerization_pattern, s)
    if match:
        number = int(match.group(1))
        rest_of_string = match.group(2)
        return True, number, rest_of_string
    else:
        return False, None, s

def isEnumberationInBrackets(s):
    # Define a regular expression pattern for the specified format
    pattern = re.compile(r'^\([a-zA-Z0-9]+\)$')
    if s in ['(IRENA)','(recast)', '(signed)', '(watts)']:
        return False
    if len(s) > 0 and s[0] == "‘" and bool(re.match(pattern, s[1:])):
        return True
    # Use the pattern to check if the string matches the desired format
    return bool(re.match(pattern, s))

def isEnumerationWithDot(s):
    roman_number_pattern = re.compile(r'^[IVXLCDM]+\.$')
    if re.match(roman_number_pattern, s):
        return True
    # Define a regular expression pattern for the specified format
    if len(s) > 4: return False
    pattern = re.compile(r'^\d\.$|^[a-zA-Z]\.$')
    
    # Use the pattern to check if the string matches the desired format
    return bool(re.match(pattern, s))

def isEnumberationByMinus(s):
    return s == '—'

def extract_article_number(text):
    # Define a regular expression pattern for the specified format
    pattern = re.compile(r'^Article (\d+)$')

    # Use the pattern to check and extract the number
    match = re.match(pattern, text)
    if match:
        number = int(match.group(1))
        return True, number
    else:
        return False, None

def isPNoSeperator(p):
    if p.name == 'p':
        if p.get('class') != None:
            if "separator" not in p.get('class'):
                return True
    return False

In [14]:
import pandas as pd

df = pd.DataFrame(columns=['text', 'section', 'sectionID', 'article', 'number1', 'number2', 'number3', 'number4', 'number5', 'number6', 'CELEX number', 'lineID'])

In [15]:
import io
from bs4 import BeautifulSoup
from tqdm import tqdm

def processTreeLike(url):

    # recreate celex from url
    index_of_colon = url.rfind(':')
    substring_after_colon = url[index_of_colon + 1:].strip()
    celexNumber = substring_after_colon.replace("%28", "(").replace("%29", ")")

    # use current dict through recusive calls to keep information
    current = {
        'section': None, 
        'sectionID': 0,
        'article': None, 
        'number1': None, 
        'number2': None, 
        'number3': None,
        'number4': None,
        'number5': None,
        'number6': None,
        'searchingSectionName': False,
        'status': 'Waiting',
        'enumerationTableCount': 0,
        'enumerationTableParents': [],
        'lineID' : 0,
    }
    
    # exploit tree structure of html file and find the start
    htmlText = getHtmlText(url)
    soup = BeautifulSoup(htmlText, 'html.parser')
    root = soup.find('body')
    
    # for real tables, we want to extract the information inside in with this function
    def handleTable(htmlTable):
        global df
        hasHeader = False
        htmlTable = BeautifulSoup(remove_empty_tr(str(htmlTable)), 'html.parser')

        # check if the table has a header row
        first_row = htmlTable.find('tr')
        ths = first_row.find_all('th')
        if first_row and ths != None and len(ths) > 0:
            hasHeader = True
        else:
            def isHeaderP(c):
                return c in ['tbl-hdr', 'oj-tbl-hdr']
            hdrPs = htmlTable.find('p', class_=lambda c: isHeaderP(c))
            if hdrPs:
                hasHeader = True
        try:
            pd.read_html(io.StringIO(str(htmlTable)), header=0)[0].iterrows()
        except Exception as e:
            # There is a table in 32014R0065 with only images that will crash it
            return 
        
        # add a line for every row in the table containing the information as natural language
        if hasHeader:
            for _, row in pd.read_html(io.StringIO(str(htmlTable)), header=0)[0].iterrows():
                sentence = f"There is a correlation between: "
                sentence += ", ".join([f"{col}: '{value}'" for col, value in row.items() if not pd.isna(value)])
                new_row = {
                    'text': sentence,
                    'section': current['section'],
                    'sectionID': current['sectionID'],
                    'article': str(current['article']),
                    'number1': str(current['number1']),
                    'number2': current['number2'],
                    'number3': current['number3'],
                    'number4': current['number4'],
                    'number5': current['number5'],
                    'number6': current['number6'],
                    'CELEX number': celexNumber,
                    'lineID': current['lineID'],
                }
                new_df = pd.DataFrame([new_row], columns=df.columns)
                df = pd.concat([df, new_df], ignore_index=True)
                current['lineID'] += 1
        else:
            for _, row in pd.read_html(io.StringIO(str(htmlTable)))[0].iterrows():
                sentence = f"There is a correlation between: "
                sentence += ", ".join([f"'{value}'" for _, value in row.items() if not pd.isna(value)])
                new_row = {
                    'text': sentence,
                    'section': current['section'],
                    'sectionID': current['sectionID'],
                    'article': str(current['article']),
                    'number1': str(current['number1']),
                    'number2': current['number2'],
                    'number3': current['number3'],
                    'number4': current['number4'],
                    'number5': current['number5'],
                    'number6': current['number6'],
                    'CELEX number': celexNumber,
                    'lineID': current['lineID'],
                }
                new_df = pd.DataFrame([new_row], columns=df.columns)
                df = pd.concat([df, new_df], ignore_index=True)
                current['lineID'] += 1


    def recursivePass(htmlElement):
        for child in htmlElement.children:
            if child.name:
                # 'hr' elements are splitting the documents in a certain way, expoit that
                if child.name == 'hr' and child.get('class') != None:
                    current['sectionID'] += 1
                    hrClass = child.get('class')[0]
                    if hrClass in ['separator', 'oj-separator']:
                        current['article'] = None
                        current['number1'] = None
                        current['number2'] = None
                        current['number3'] = None
                        current['number4'] = None
                        current['number5'] = None
                        current['number6'] = None
                        current['status'] = 'Normal'
                        current['section'] = "Document"
                    elif hrClass in ['doc-sep', 'oj-doc-sep']:
                        current['section'] = None
                        current['article'] = None
                        current['number1'] = None
                        current['number2'] = None
                        current['number3'] = None
                        current['number4'] = None
                        current['number5'] = None
                        current['number6'] = None
                        current['status'] = 'Normal'
                        current['searchingSectionName'] = True
                    elif hrClass in ['note', 'oj-note']:
                        current['status'] = 'Footnote'
                        current['section'] += " Footnotes"
                        current['article'] = None
                        current['number1'] = None
                        current['number2'] = None
                        current['number3'] = None
                        current['number4'] = None
                        current['number5'] = None
                        current['number6'] = None
                    elif hrClass in ['doc-end', 'oj-doc-end']:
                        current['status'] = 'Waiting'

                elif current['status'] == 'Normal':
                    if isPNoSeperator(child) or child.name == 'span':
                        text = child.get_text().replace("\xa0", " ").replace("\n", " ").strip()

                        # check for enumerations
                        isNewNumber, number, restOfString = extract_numerization_info(text)
                        isArticleNumber, articleNumber = extract_article_number(text)
                        if isNewNumber and len(restOfString) > 0:
                            current['number' + str(current['enumerationTableCount'])] = number
                            for numbersIndex in range(current['enumerationTableCount'] + 1, 6):
                                current['number' + str(numbersIndex)] = None
                            text = restOfString
                            
                        if current['searchingSectionName']:
                            current['section'] = text
                            current['searchingSectionName'] = False
                        
                        if isNewNumber and len(restOfString) == 0:
                            current['number' + str(current['enumerationTableCount'])] = number
                            for numbersIndex in range(current['enumerationTableCount'] + 1, 6):
                                current['number' + str(numbersIndex)] = None
                        elif isEnumberationInBrackets(text) or isEnumerationWithDot(text) or isEnumberationByMinus(text):
                            if current['enumerationTableCount'] == 0:
                                current['number1'] = text
                                current['number2'] = None
                                current['number3'] = None
                                current['number4'] = None
                                current['number5'] = None
                                current['number6'] = None
                            elif current['enumerationTableCount'] == 1:
                                current['number2'] = text
                                current['number3'] = None
                                current['number4'] = None
                                current['number5'] = None
                                current['number6'] = None
                            elif current['enumerationTableCount'] == 2:
                                current['number3'] = text
                                current['number4'] = None
                                current['number5'] = None
                                current['number6'] = None
                            elif current['enumerationTableCount'] == 3:
                                current['number4'] = text
                                current['number5'] = None
                                current['number6'] = None
                            elif current['enumerationTableCount'] == 4:
                                current['number5'] = text
                                current['number6'] = None
                            elif current['enumerationTableCount'] == 5:
                                current['number6'] = text
                            else:
                                print('error: Unexpected enumerationTableCount')
                        elif isArticleNumber:
                            current['article'] = articleNumber
                            current['number1'] = None
                            current['number2'] = None
                            current['number3'] = None
                            current['number4'] = None
                            current['number5'] = None
                            current['number6'] = None
                        else:
                            new_row = {
                                'text': text,
                                'section': current['section'],
                                'sectionID': current['sectionID'],
                                'article': str(current['article']),
                                'number1': str(current['number1']),
                                'number2': current['number2'],
                                'number3': current['number3'],
                                'number4': current['number4'],
                                'number5': current['number5'],
                                'number6': current['number6'],
                                'CELEX number': celexNumber,
                                'lineID': current['lineID'],
                            }
                            global df
                            new_df = pd.DataFrame([new_row], columns=df.columns)

                            # Concatenate the existing DataFrame and the new DataFrame
                            df = pd.concat([df, new_df], ignore_index=True)
                            current['lineID'] += 1
                    elif child.name == 'table' and child.get('class') != None:
                        handleTable(child)
                    else:
                        if child.name == 'table':
                            current['enumerationTableCount'] += 1
                        recursivePass(child)
                        if child.name == 'table':
                            for numbersIndex in range(current['enumerationTableCount'] + 1, 6):
                                current['number' + str(numbersIndex)] = None
                            current['enumerationTableCount'] -= 1
                elif current['status'] == 'Footnote':
                    if isPNoSeperator(child) or child.name == 'span':
                        text = child.get_text().replace("\xa0", " ").replace("\n", " ").strip()
                        new_row = {
                            'text': text,
                            'section': current['section'],
                            'sectionID': current['sectionID'],
                            'article': None,
                            'number1': None,
                            'number2': None,
                            'number3': None,
                            'number4': None,
                            'number5': None,
                            'number6': None,
                            'CELEX number': celexNumber,
                            'lineID': current['lineID'],
                        }
                        new_df = pd.DataFrame([new_row], columns=df.columns)

                        # Concatenate the existing DataFrame and the new DataFrame
                        df = pd.concat([df, new_df], ignore_index=True)
                        current['lineID'] += 1
    recursivePass(root)

    return celexNumber

In [16]:
def processAllPsInside(url, celexNumber):
    # for html files that have no treelike structure we can exploit
    htmlText = getHtmlText(url)
    soup = BeautifulSoup(htmlText, 'html.parser')
    lineID = 0
    for p in soup.findAll('p'):
        text = p.get_text().replace("\xa0", " ").replace("\n", " ").strip()
        if len(text) > 0:
            new_row = {
                'text': text,
                'section': None,
                'sectionID': None,
                'article': None,
                'number1': None,
                'number2': None,
                'number3': None,
                'number4': None,
                'number5': None,
                'number6': None,
                'CELEX number': celexNumber,
                'lineID': lineID
            }
            global df
            new_df = pd.DataFrame([new_row], columns=df.columns)
            lineID += 1

            # Concatenate the existing DataFrame and the new DataFrame
            df = pd.concat([df, new_df], ignore_index=True)

In [17]:
# create the df
for url in tqdm(download_urls):
    celexNumber = processTreeLike(url)
    if celexNumber not in df['CELEX number'].values:
        processAllPsInside(url, celexNumber)
    if celexNumber not in df['CELEX number'].values:
        print("error: Unprocessed " + url)


print('files processed:' + str(df['CELEX number'].nunique()) + " of " + str(len(download_urls)))
df

100%|██████████| 537/537 [10:25<00:00,  1.16s/it]

files processed:537 of 537





Unnamed: 0,text,section,sectionID,article,number1,number2,number3,number4,number5,number6,CELEX number,lineID
0,COMMISSION REGULATION (EU) No 932/2012,Document,1,,,,,,,,32012R0932,0
1,of 3 October 2012,Document,1,,,,,,,,32012R0932,1
2,implementing Directive 2009/125/EC of the Euro...,Document,1,,,,,,,,32012R0932,2
3,(Text with EEA relevance),Document,1,,,,,,,,32012R0932,3
4,"THE EUROPEAN COMMISSION,",Document,1,,,,,,,,32012R0932,4
...,...,...,...,...,...,...,...,...,...,...,...,...
94912,This Decision is addressed to the United Kingdom.,,,,,,,,,,31977D0622,30
94913,"Done at Brussels, 23 September 1977.",,,,,,,,,,31977D0622,31
94914,For the Commission,,,,,,,,,,31977D0622,32
94915,Guido BRUNNER,,,,,,,,,,31977D0622,33


In [20]:
# save df to csv file

from pathlib import Path 
filepath = Path('lines.csv')
filepath.parent.mkdir(parents=True, exist_ok=True) 
df.to_csv(filepath)

In [21]:
# read df from file

df_form_file = pd.read_csv('lines.csv')
df_form_file

  df_form_file = pd.read_csv('lines.csv')


Unnamed: 0.1,Unnamed: 0,text,section,sectionID,article,number1,number2,number3,number4,number5,number6,CELEX number,lineID
0,0,COMMISSION REGULATION (EU) No 932/2012,Document,1.0,,,,,,,,32012R0932,0
1,1,of 3 October 2012,Document,1.0,,,,,,,,32012R0932,1
2,2,implementing Directive 2009/125/EC of the Euro...,Document,1.0,,,,,,,,32012R0932,2
3,3,(Text with EEA relevance),Document,1.0,,,,,,,,32012R0932,3
4,4,"THE EUROPEAN COMMISSION,",Document,1.0,,,,,,,,32012R0932,4
...,...,...,...,...,...,...,...,...,...,...,...,...,...
94912,94912,This Decision is addressed to the United Kingdom.,,,,,,,,,,31977D0622,30
94913,94913,"Done at Brussels, 23 September 1977.",,,,,,,,,,31977D0622,31
94914,94914,For the Commission,,,,,,,,,,31977D0622,32
94915,94915,Guido BRUNNER,,,,,,,,,,31977D0622,33
