## Downloading financial reports from KAP (KCHOL, TCELL, AKBNK)

In [None]:
import requests
from bs4 import BeautifulSoup
from requests.exceptions import RequestException, HTTPError, ConnectionError, TooManyRedirects
from urllib.parse import urljoin
import time

BASE_URL = "https://www.kap.org.tr"

def download_document(firm_name, firm_id):
    """Download a firm's financial report from KAP and save it to disk.

    Fetches the firm's financial-information page, looks for the anchor whose
    class attribute is ``modal-button _3 type-small``, downloads the file that
    anchor links to, and writes it to ``{firm_name}_financial_report.xlsx`` in
    the current working directory.

    Progress and failures are reported with ``print``; no exception is
    propagated to the caller.

    Args:
        firm_name: Label used in log messages and in the output file name.
        firm_id: Identifier appended to the financial-information page URL.

    Returns:
        None.
    """
    url = f"{BASE_URL}/tr/sirket-finansal-bilgileri/{firm_id}"
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
    }
    # URL of the request currently in flight, so that a failure is reported
    # against the request that actually failed (main page vs. document).
    attempted_url = url
    failure = None

    with requests.Session() as session:
        # Session-level headers are sent with every request made below, so
        # they do not need to be repeated on each call.
        session.headers.update(headers)

        try:
            print(f"Attempting to fetch main page for {firm_name}")
            response = session.get(url, allow_redirects=True, timeout=30)
            response.raise_for_status()

            print(f"Successfully fetched main page for {firm_name}")
            print(f"Response URL: {response.url}")
            print(f"Response status code: {response.status_code}")

            soup = BeautifulSoup(response.content, 'html.parser')

            # Find the download link for the Excel document
            download_link = soup.find('a', {'class': 'modal-button _3 type-small'})
            if download_link and 'href' in download_link.attrs:
                doc_url = urljoin(BASE_URL, download_link['href'])
                attempted_url = doc_url

                print(f"Attempting to download document from: {doc_url}")

                # Download the document
                doc_response = session.get(doc_url, allow_redirects=True, timeout=30)
                doc_response.raise_for_status()

                print(f"Document downloaded. Content-Type: {doc_response.headers.get('Content-Type')}")

                # Save the content
                file_name = f"{firm_name}_financial_report.xlsx"
                with open(file_name, 'wb') as file:
                    file.write(doc_response.content)
                print(f"Document saved as {file_name}")
            else:
                print(f"Download link not found for {firm_name}.")

        # The specific requests exceptions are listed before their common
        # base class RequestException so each keeps its own message.
        except ConnectionError as e:
            failure = f"Connection error occurred for {firm_name}: {e}"
        except HTTPError as e:
            failure = f"HTTP error occurred for {firm_name}: {e}"
        except TooManyRedirects as e:
            failure = f"Too many redirects for {firm_name}: {e}"
        except RequestException as e:
            failure = f"Error occurred while processing {firm_name}: {e}"
        except Exception as e:
            # Best-effort boundary: one firm failing must not stop the batch.
            failure = f"Unexpected error occurred for {firm_name}: {e}"

    if failure is not None:
        print(failure)
        print(f"URL attempted: {attempted_url}")

# Identifiers of the firms to download, keyed by firm name. The key is passed
# to download_document as firm_name, so it also prefixes the saved file name.
firms = {
    "KCHOL": "4028e4a140f2ed710140f2f4d6c70039",
    "TCELL": "4028e4a1486ec80a0148c55510d71d31",
    "AKBNK": "4028e4a240e8d1830140e905edcd0006"
}

# Download the reports one firm at a time, pausing after each download.
for firm, firm_id in firms.items():
    download_document(firm, firm_id)
    time.sleep(5)  # Delay to be respectful to the server

## Test to Translate File


In [3]:
from openai import OpenAI
import pandas as pd
from bs4 import BeautifulSoup, NavigableString
import requests
import re

# Set your OpenAI API key

def read_html_from_excel(file_path):
    """
    Return the HTML stored in ``file_path``.

    First the file is opened as a workbook with pandas/openpyxl and the value
    at position ``[0, 0]`` of the resulting frame is returned. If that fails,
    a warning is printed and the file is read as UTF-8 text instead: text that
    contains ``<html`` or ``<body`` (case-insensitive) is returned as is,
    otherwise the workbook read is retried with every cell loaded as ``str``.

    Raises:
        Exception: "Could not read file content: ..." when the fallback path
            fails as well.
    """
    try:
        return pd.read_excel(file_path, engine='openpyxl').iloc[0, 0]
    except Exception as excel_error:
        print(f"Warning: Could not read as regular Excel file: {excel_error}")
        try:
            with open(file_path, 'r', encoding='utf-8') as handle:
                raw_text = handle.read()
                lowered = raw_text.lower()
                looks_like_html = '<html' in lowered or '<body' in lowered
                if looks_like_html:
                    return raw_text
                retry_frame = pd.read_excel(file_path, engine='openpyxl', dtype=str)
                return retry_frame.iloc[0, 0]
        except Exception as fallback_error:
            raise Exception(f"Could not read file content: {fallback_error}")

def is_translatable_content(text):
    """Return True when ``text`` looks like human-readable words.

    A string qualifies when, after stripping surrounding whitespace, it is at
    least two characters long and contains at least one letter. Strings made
    only of digits, whitespace, punctuation or currency symbols (numbers,
    dates, separators) therefore never qualify.

    The letter test is Unicode-aware, so words written entirely with
    non-ASCII letters (for example Turkish "Üç" or "İş") are accepted.

    Args:
        text: The candidate string.

    Returns:
        bool: True if the text should be sent for translation.
    """
    text = text.strip()
    if len(text) < 2:
        return False
    # A single letter test covers the numeric/symbol and date-like cases:
    # any string consisting only of such characters contains no letter.
    return any(ch.isalpha() for ch in text)

def query_openai(prompt: str) -> str:
    """Send ``prompt`` as a single user message and return the reply text.

    The request goes through the module-level ``client`` object, which is
    expected to exist when the function is called. The raw response object is
    printed before the text of its first choice is returned.

    Raises:
        Exception: "Error querying OpenAI: ..." wrapping any error raised
            while making the request or reading the response.
    """
    user_message = {"role": "user", "content": prompt}
    try:
        completion = client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[user_message],
            temperature=0.12,
        )

        print(completion)
        first_choice = completion.choices[0]
        return first_choice.message.content
    except Exception as err:
        raise Exception(f"Error querying OpenAI: {err}")

def translate_html_content(html_content):
    """Translate the human-readable text nodes of an HTML document to English.

    The document is parsed with BeautifulSoup and every text node is visited.
    Nodes that are empty, that ``is_translatable_content`` rejects, that sit
    directly under an excluded tag (script, style, code, ...), or whose parent
    carries ``translate="no"`` are left untouched. Each remaining node is sent
    to ``query_openai`` and replaced with the reply.

    The prompt asks the model to return nothing for non-readable input, so an
    empty (or missing) reply leaves the original text in place instead of
    erasing it.

    A summary of the counters is printed at the end.

    Args:
        html_content: The HTML markup to translate.

    Returns:
        str: The serialized document with translated text nodes.

    Raises:
        Exception: "Error translating HTML: ..." wrapping any error raised
            outside the per-node translation call. Per-node failures are
            printed as warnings and counted, not raised.
    """
    try:
        soup = BeautifulSoup(html_content, 'html.parser')
        excluded_tags = {'script', 'style', 'noscript', 'code', 'pre', 'time', 'meta'}

        def in_translatable_context(element):
            """Return True unless the node's parent forbids translation."""
            if not isinstance(element, NavigableString):
                return False
            parent = element.parent
            if parent.name in excluded_tags:
                return False
            if parent.has_attr('translate') and parent['translate'].lower() == 'no':
                return False
            return True

        stats = {
            'total_elements': 0,
            'translated': 0,
            'skipped_numbers': 0,
            'skipped_empty': 0,
            'skipped_excluded': 0,
            'kept_original': 0,
            'failed': 0
        }

        # find_all returns a list, so replacing nodes while looping is safe.
        for element in soup.find_all(string=True):
            stats['total_elements'] += 1
            text = element.strip()

            if not text:
                stats['skipped_empty'] += 1
                continue

            if not is_translatable_content(text):
                stats['skipped_numbers'] += 1
                continue

            if not in_translatable_context(element):
                stats['skipped_excluded'] += 1
                continue

            try:
                prompt = f"""If the following text contains human-readable content, translate it into English and provide only the translation.
If not, output nothing without any explanations or additional details.

Text: {text}"""

                translated_text = query_openai(prompt)
            except Exception as e:
                print(f"Warning: Translation failed for '{text[:50]}...': {str(e)}")
                stats['failed'] += 1
                continue

            if not translated_text or not translated_text.strip():
                # The model signalled "nothing to translate": keep the
                # original text rather than replacing it with nothing.
                stats['kept_original'] += 1
                continue

            element.replace_with(translated_text)
            stats['translated'] += 1

        print("\nTranslation Statistics:")
        print(f"Total elements processed: {stats['total_elements']}")
        print(f"Successfully translated: {stats['translated']}")
        print(f"Skipped numbers/symbols: {stats['skipped_numbers']}")
        print(f"Skipped empty elements: {stats['skipped_empty']}")
        print(f"Skipped excluded elements: {stats['skipped_excluded']}")
        print(f"Kept original (empty translation): {stats['kept_original']}")
        print(f"Failed translations: {stats['failed']}")

        return str(soup)
    except Exception as e:
        raise Exception(f"Error translating HTML: {str(e)}")

def save_html_to_file(html_content, output_file):
    """Write ``html_content`` to ``output_file`` as UTF-8 text.

    An existing file at that path is overwritten.

    Raises:
        Exception: "Error saving file: ..." wrapping any error raised while
            opening or writing the file.
    """
    try:
        target = open(output_file, 'w', encoding='utf-8')
        with target:
            target.write(html_content)
    except Exception as err:
        raise Exception(f"Error saving file: {err}")

def process_excel_html(input_file, output_file):
    """Read HTML from ``input_file``, translate it, and save it to ``output_file``.

    The steps are: ``read_html_from_excel``, a sanity check that only warns
    when neither ``<html`` nor ``<body`` appears in the content,
    ``translate_html_content``, and ``save_html_to_file``. Progress is
    printed at every step.

    Returns:
        bool: True when every step succeeded; False when any step raised, in
        which case the error message is printed.
    """
    try:
        print(f"Reading HTML from Excel file: {input_file}")
        source_html = read_html_from_excel(input_file)
        if not source_html:
            raise ValueError("No HTML content found in the Excel file")

        print("Validating HTML content...")
        lowered = source_html.lower()
        has_html_marker = '<html' in lowered or '<body' in lowered
        if not has_html_marker:
            print("Warning: Content might not be proper HTML. Attempting to process anyway...")

        print("Translating content...")
        result_html = translate_html_content(source_html)

        print(f"Saving translated content to: {output_file}")
        save_html_to_file(result_html, output_file)

        print("Processing completed successfully!")
    except Exception as err:
        print(f"Error: {err}")
        return False
    else:
        return True


In [4]:
# Report to translate; the name matches what download_document saves for TCELL.
input_file = 'TCELL_financial_report.xlsx'
# Destination for the translated HTML. NOTE(review): save_html_to_file writes
# plain UTF-8 text, so this file holds HTML despite the .xlsx suffix.
output_file = 'TCELL_financial_report_translated.xlsx'

In [None]:
# Run the translation pipeline on the paths set above (replace them with your
# actual file paths). The call returns True on success and False on failure.
process_excel_html(input_file, output_file)

## Formatting the HTML to give to a LLM

In [None]:
from bs4 import BeautifulSoup
from typing import Union, List, Dict

def extract_content_after_header(soup, header_tags=('h1', 'h2', 'h3', 'h4', 'h5', 'h6')) -> Dict[str, List[Dict[str, Union[str, List[Dict[str, str]]]]]]:
    """
    Collect the first meaningful content block that follows each header tag.

    For every tag in ``header_tags`` found in ``soup``, the elements returned
    by successive ``find_next()`` calls are scanned until another header tag
    or a usable table is reached:

    * A table with exactly one row holding exactly one ``td`` is stored as
      text (the cell's text) and ends the scan.
    * Any other table is stored as a list of row dicts. The keys come from
      the ``td`` cells of the first row, the values from the ``td`` cells of
      each later row that has any. Only ``td`` cells are read, so header
      cells written as ``th`` are not picked up.
    * The first non-empty ``p`` or ``div`` text is stored as text, but the
      scan continues, so a table found later in the same section replaces it.

    Args:
        soup: Parsed document (or tag) to search.
        header_tags: Tag names treated as section headers.

    Returns:
        A dict mapping each header's text to a one-element list holding
        ``{"type": "text" | "table", "data": ...}``. Headers with no content
        are omitted. Headers with identical text share one key, so the last
        one processed wins.

    Progress is reported with ``print`` throughout.
    """
    extracted_info = {}
    for header in soup.find_all(header_tags):
        section_title = header.get_text(strip=True)
        print(f"Detected header: {section_title}")  # Debug print

        content_block = None  # Stores either table or text content
        # NOTE(review): find_next() presumably walks the parse tree in
        # document order and so may visit elements nested inside the header
        # or inside an already-visited div - confirm against the bs4 docs.
        next_elem = header.find_next()

        while next_elem:
            if next_elem.name in header_tags:
                break  # Stop if the next header is encountered
            if next_elem.name == 'table':
                # Detect single-cell tables without headers
                rows = next_elem.find_all("tr")
                if len(rows) == 1 and len(rows[0].find_all("td")) == 1:
                    # Single-cell table detected; treat as text content
                    single_cell_text = rows[0].find("td").get_text(strip=True)
                    print(f"Detected single-cell table under header '{section_title}' with text: {single_cell_text}")
                    content_block = {"type": "text", "data": single_cell_text}
                    break  # Stop further processing for this section

                # Otherwise, process as a multi-row table with headers
                try:
                    # rows[0] raises IndexError when the table has no rows;
                    # in that case the scan simply moves on.
                    headers = [th.get_text(strip=True) for th in rows[0].find_all("td")]
                    # zip() pairs column names with cells positionally and
                    # stops at the shorter of the two sequences.
                    table_data = [
                        dict(zip(headers, [td.get_text(strip=True) for td in row.find_all("td")]))
                        for row in rows[1:]
                        if row.find_all("td")
                    ]
                    print(f"Detected table under header '{section_title}' with data: {table_data}")  # Debug print
                    content_block = {"type": "table", "data": table_data}
                    break  # Prioritize the table, ignore any subsequent text
                except IndexError:
                    print(f"Table under header '{section_title}' is improperly formatted or empty.")
            elif (next_elem.name == 'p' or next_elem.name == 'div') and content_block is None:
                # Only set text if no table has been detected
                text_content = next_elem.get_text(strip=True)
                if text_content:
                    print(f"Detected text under header '{section_title}': {text_content}")  # Debug print
                    content_block = {"type": "text", "data": text_content}

            next_elem = next_elem.find_next()

        if content_block:
            # Stored as a one-element list; a repeated title overwrites the
            # entry created for an earlier header with the same text.
            extracted_info[section_title] = [content_block]
        else:
            print(f"No content found under header '{section_title}'")  # Debug print

    return extracted_info

def format_content_as_markdown(extracted_info) -> str:
    """
    Render the extracted sections as a Markdown document.

    The output starts with a ``# Extracted Content`` title. Each key of
    ``extracted_info`` becomes a ``##`` heading followed by its content
    blocks: ``"text"`` blocks are emitted as paragraphs and non-empty
    ``"table"`` blocks as pipe tables. Empty tables and blocks of any other
    type produce no output.

    Table columns are the ordered union of the keys of all rows, so a row
    that lacks a key still gets one cell per column (left empty) and every
    row has the same number of cells. Cell values are converted with
    ``str``.

    Args:
        extracted_info: Mapping of section title to a list of
            ``{"type": ..., "data": ...}`` blocks.

    Returns:
        str: The Markdown text.
    """
    parts = ["# Extracted Content\n\n"]
    for title, contents in extracted_info.items():
        parts.append(f"## {title}\n\n")
        for content in contents:
            if content["type"] == "table" and content["data"]:
                rows = content["data"]
                # dict.fromkeys keeps first-seen order while removing duplicates.
                headers = list(dict.fromkeys(key for row in rows for key in row))
                parts.append("| " + " | ".join(str(h) for h in headers) + " |\n")
                parts.append("| " + " | ".join(["---"] * len(headers)) + " |\n")
                for row in rows:
                    cells = [str(row.get(h, "")) for h in headers]
                    parts.append("| " + " | ".join(cells) + " |\n")
                parts.append("\n\n")
            elif content["type"] == "text":
                parts.append(f"{content['data']}\n\n")

    return "".join(parts)

def extract_company_info_general(file_path, output_path="extracted_content.md"):
    """Convert an HTML file into Markdown, save it, and return it.

    The file is read as UTF-8 text and parsed with BeautifulSoup. Its
    sections are collected by ``extract_content_after_header`` and rendered
    by ``format_content_as_markdown``.

    Args:
        file_path: Path of the HTML file to read.
        output_path: Path of the Markdown file to write. Defaults to
            ``extracted_content.md`` in the current working directory.

    Returns:
        str: The generated Markdown text.
    """
    with open(file_path, "r", encoding="utf-8") as file:
        html_content = file.read()

    soup = BeautifulSoup(html_content, "html.parser")

    # Extract information in a generalized way
    extracted_info = extract_content_after_header(soup)

    # Convert extracted information to Markdown format
    formatted_output = format_content_as_markdown(extracted_info)

    # Save to a Markdown file
    with open(output_path, "w", encoding="utf-8") as output_file:
        output_file.write(formatted_output)

    return formatted_output

# Usage example. The path is read as UTF-8 text and parsed as HTML, so it must
# point to an HTML file even though its name ends in .xlsx.
file_path = "TCELL_financial_report_translated.xlsx"
parsed_content = extract_company_info_general(file_path)
print(parsed_content)


# Chunking the related md file

In [None]:
from sentence_transformers import SentenceTransformer, util
from typing import List, Dict, Tuple

# Load the sentence-embedding model once at module level; get_row_embeddings
# reads this global on every call.
model = SentenceTransformer('all-MiniLM-L6-v2')

def get_row_embeddings(rows: List[str]) -> List:
    """Encode ``rows`` with the module-level ``model``.

    Returns whatever ``model.encode`` produces when called with
    ``convert_to_tensor=True``.
    """
    row_embeddings = model.encode(rows, convert_to_tensor=True)
    return row_embeddings

def create_chunks(rows: List[str], similarity_threshold: float = 0.7) -> List[List[str]]:
    """
    Group consecutive rows into chunks based on semantic similarity.

    Each row is compared with the row immediately before it. When the cosine
    similarity of their embeddings is at least ``similarity_threshold`` the
    row joins the current chunk, otherwise it starts a new one. Row order is
    preserved.

    Args:
        rows: The rows to group.
        similarity_threshold: Minimum similarity to the preceding row for a
            row to stay in the same chunk.

    Returns:
        The chunks in order: ``[]`` for empty input, and a single
        one-element chunk for a single row. Neither case computes embeddings.
    """
    if not rows:
        return []
    if len(rows) == 1:
        # No adjacent pair to compare, so no embedding is needed.
        return [[rows[0]]]

    embeddings = get_row_embeddings(rows)
    chunks = []
    current_chunk = [rows[0]]

    for index, row in enumerate(rows[1:], start=1):
        similarity = util.pytorch_cos_sim(embeddings[index - 1], embeddings[index]).item()

        if similarity >= similarity_threshold:
            current_chunk.append(row)  # Add row to current chunk
        else:
            chunks.append(current_chunk)  # Finalize current chunk
            current_chunk = [row]  # Start a new chunk

    chunks.append(current_chunk)  # The last chunk is never empty here
    return chunks

def chunk_tables_by_similarity(table_data: Dict[str, List[str]], similarity_threshold: float = 0.7) -> Dict[str, List[List[str]]]:
    """
    Chunk the rows of every section with ``create_chunks``.

    Returns a dict with the same keys as ``table_data``, in the same order,
    where each value is the list of chunks produced for that section's rows
    using ``similarity_threshold``.
    """
    return {
        section_name: create_chunks(section_rows, similarity_threshold)
        for section_name, section_rows in table_data.items()
    }

# Example usage: one section whose rows are Markdown table lines, given as
# plain strings keyed by the section title.
table_data = {
    "FİNANSAL DURUM TABLOSU": [
        "| Sunum Para Birimi | 1000TL | 1000000TL | 1000000TL | 1000000TL |",
        "| Finansal Tablo Niteliği | Konsolide | Konsolide | Konsolide | Konsolide |",
        "| Dönen Varlıklar | 605.974.596 | 982.090 | 1.712.378 | 2.197.689 |",
        "| Duran Varlıklar | 414.578.711 | 600.504 | 1.146.587 | 1.389.200 |",
        "| Varlıklar | 1.020.553.307 | 1.582.594 | 2.858.965 | 3.586.889 |",
        "| Kısa Vadeli Yükümlülükler | 709.676.931 | 1.145.655 | 1.865.813 | 2.403.087 |",
        "| Uzun Vadeli Yükümlülükler | 189.771.314 | 189.741 | 318.880 | 388.901 |",
    ]
}

# Process each section with similarity-based chunking
all_chunks = chunk_tables_by_similarity(table_data, similarity_threshold=0.7)

# Print the resulting chunks for each section, numbering chunks from 1
for section, chunks in all_chunks.items():
    print(f"Section: {section}")
    for i, chunk in enumerate(chunks):
        print(f"Chunk {i+1}:\n" + "\n".join(chunk) + "\n")


# Result After Chunking (Mostly Test)

In [None]:
from sentence_transformers import SentenceTransformer, util
from typing import List, Dict
from openai import OpenAI

# Initialize OpenAI client


def read_markdown_file(file_path: str) -> str:
    """Return the full text of the file at ``file_path``, decoded as UTF-8."""
    with open(file_path, "r", encoding="utf-8") as handle:
        markdown_text = handle.read()
    return markdown_text

def extract_tables(content: str) -> List[str]:
    """
    Return the tables found in ``content``, in order of appearance.

    A table is a maximal run of consecutive lines that each contain a ``|``
    character. Each run is returned as one string with its lines joined by
    newlines. Any line without ``|`` ends the current run.
    """
    found = []
    pending = []

    for row in content.splitlines():
        if "|" in row:
            pending.append(row)
            continue
        if pending:
            # A non-table line closes the run collected so far.
            found.append("\n".join(pending))
            pending = []

    # A run that reaches the end of the content has no closing line.
    if pending:
        found.append("\n".join(pending))

    return found

def query_openai(prompt: str) -> str:
    """Send ``prompt`` as a single user message and return the reply text.

    The request goes through the module-level ``client`` object, which is
    expected to exist when the function is called.

    Raises:
        Exception: "Error querying OpenAI: ..." wrapping any error raised
            while making the request or reading the response.
    """
    conversation = [{"role": "user", "content": prompt}]
    try:
        completion = client.chat.completions.create(
            model="gpt-4o-mini",
            messages=conversation,
            temperature=0.12,
        )
        reply = completion.choices[0].message
        return reply.content
    except Exception as err:
        raise Exception(f"Error querying OpenAI: {err}")

def process_table_rows_with_header(header: str, rows: List[str], chunk_size: int) -> List[str]:
    """
    Ask the model to analyse a table, ``chunk_size`` rows at a time.

    ``rows`` is cut into consecutive slices of at most ``chunk_size`` rows.
    For each slice a prompt containing ``header`` and the slice's rows is
    sent through ``query_openai``, and the reply is printed.

    Returns:
        The replies, one per slice, in order. Empty when ``rows`` is empty.
    """
    batches = [rows[start:start + chunk_size] for start in range(0, len(rows), chunk_size)]

    answers = []
    for batch in batches:
        batch_text = "\n".join(batch)
        prompt = "".join([
            "This table provides financial or operational data. The header indicates the structure, ",
            "and the following rows contain detailed information. Analyze the rows based on this structure:\n\n",
            f"Header: {header}\n\n",
            f"Rows:\n{batch_text}\n\n",
            "Please provide a detailed analysis of the data, including specific numerical observations and relationships. ",
            "Do not speculate or mention missing context.",
        ])
        answer = query_openai(prompt)
        answers.append(answer)
        print(f"Processed chunk with header '{header}':\n{answer}\n")

    return answers

def replace_tables_with_responses(content: str, tables_responses: List[str]) -> str:
    """
    Replace each table in ``content`` with the matching AI response.

    A table is a maximal run of consecutive lines containing ``|``, the same
    rule ``extract_tables`` uses. The n-th table is replaced by
    ``tables_responses[n]``. All other lines are kept, including the line
    that directly follows a table. A table that reaches the end of the
    content is replaced as well.

    When there are more tables than responses, the remaining tables are left
    unchanged rather than removed.

    Args:
        content: The Markdown text.
        tables_responses: Replacement text for each table, in order.

    Returns:
        str: The content with tables replaced, lines joined by ``\\n``.
    """
    output_lines = []
    pending_table = []
    next_response = 0

    def flush_table():
        """Emit the replacement (or the original lines) for the open table."""
        nonlocal next_response
        if not pending_table:
            return
        if next_response < len(tables_responses):
            output_lines.append(tables_responses[next_response])
            next_response += 1
        else:
            # No response left for this table: keep it instead of losing it.
            output_lines.extend(pending_table)
        pending_table.clear()

    for line in content.splitlines():
        if "|" in line:
            pending_table.append(line)
        else:
            flush_table()
            output_lines.append(line)

    # Handle a table that runs to the end of the content.
    flush_table()

    return "\n".join(output_lines)

def _is_markdown_delimiter_row(row: str) -> bool:
    """Return True for a table delimiter row such as ``| --- | :---: |``."""
    cells = [cell.strip() for cell in row.strip().strip("|").split("|")]
    return all(cell and "-" in cell and set(cell) <= {"-", ":"} for cell in cells)


def process_and_replace_tables(file_path: str, output_path: str, chunk_size: int):
    """Reads a markdown file, processes tables, and replaces them with AI-generated responses.

    Each table's first line is treated as its header. If the next line is the
    Markdown delimiter row (``| --- | --- |``) it is skipped, so only real
    data rows are sent for analysis. The rows are analysed ``chunk_size`` at
    a time and the replies for one table are joined with newlines.

    Args:
        file_path: Markdown file to read.
        output_path: File the modified Markdown is written to (UTF-8).
        chunk_size: Number of data rows sent per request.

    Returns:
        None. The output path is printed once the file has been written.
    """
    # Step 1: Read file content
    content = read_markdown_file(file_path)

    # Step 2: Extract tables
    tables = extract_tables(content)

    # Step 3: Process each table by sending the header and row data in chunks
    tables_responses = []
    for table in tables:
        header, *data_rows = table.splitlines()
        # Only the row directly under the header can be the delimiter row;
        # later rows are data even if every cell happens to be a dash.
        if data_rows and _is_markdown_delimiter_row(data_rows[0]):
            data_rows = data_rows[1:]
        table_response = process_table_rows_with_header(header, data_rows, chunk_size)
        tables_responses.append("\n".join(table_response))

    # Step 4: Replace original tables with AI responses
    modified_content = replace_tables_with_responses(content, tables_responses)

    # Step 5: Save to a new markdown file
    with open(output_path, "w", encoding="utf-8") as file:
        file.write(modified_content)
    print(f"Modified content saved to {output_path}")

# Example usage: read extracted_content.md, replace its tables with the
# model's analysis, and write the result to modified_content.md.
file_path = "extracted_content.md"
output_path = "modified_content.md"
chunk_size = 3  # Set the desired number of rows per chunk
process_and_replace_tables(file_path, output_path, chunk_size)