In [1]:
# Mount Google Drive if you are working in Google Colab


# from google.colab import drive
# drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [2]:
import os
import logging
import csv
from pathlib import Path

In [3]:
# Empty lines, trailing and leading spaces are removed

def remove_empty_lines(file_path):
    """Rewrite a UTF-8 text file in place without blank lines or edge whitespace.

    Every line is stripped of leading and trailing whitespace; lines that are
    empty after stripping are dropped. The file is then overwritten with the
    remaining lines joined by newlines plus one final newline (so a file with
    no content left is rewritten as a single newline).

    Args:
        file_path: Path of the text file to clean.

    Returns:
        The list of kept (stripped) lines, or an empty list if reading or
        writing failed. Failures are printed, not raised.
    """
    try:
        with open(file_path, "r", encoding="utf-8") as source:
            stripped = (raw.strip() for raw in source)
            kept = [text for text in stripped if text]

        payload = "\n".join(kept) + "\n"
        with open(file_path, "w", encoding="utf-8") as target:
            target.write(payload)
    except Exception as exc:
        print(f"Error processing {file_path}: {str(exc)}")
        return []

    return kept

In [4]:
def combine_text_files(input_folder):
    """Recursively collect translation pairs from the text files under a folder.

    Every ``.txt``/``.tsv`` file in ``input_folder`` is read as bytes and
    decoded line by line as UTF-8, so a single undecodable line is recorded
    and skipped instead of aborting the whole file. Blank lines and any line
    containing ``Source_Text`` (treated as a header) are ignored. Remaining
    lines are split on tabs and each column is stripped. Sub-directories are
    processed recursively and their results appended.

    Args:
        input_folder: Directory to scan.

    Returns:
        A tuple ``(unreviewed_pairs, reviewed_pairs, bad_lines)``:

        * ``unreviewed_pairs`` - list of ``(col1, col2)`` tuples for lines
          where both columns are non-empty.
        * ``reviewed_pairs`` - list of ``(col1, col3)`` tuples for lines
          where both columns are non-empty.
        * ``bad_lines`` - list of dicts with keys ``"file"``, ``"index"``
          (zero-based line index) and ``"bytes"`` (repr of the first 20
          bytes) for lines that could not be decoded.

    Errors while reading a file or scanning a directory are printed, not
    raised; whatever was collected before the error is still returned.
    """
    unreviewed_pairs = []  # (col1, col2) pairs
    reviewed_pairs = []    # (col1, col3) pairs where available
    bad_lines = []

    try:
        with os.scandir(input_folder) as entries:
            for entry in entries:
                if entry.is_file() and entry.name.endswith(('.txt', '.tsv')):
                    try:
                        # Binary mode so that decoding can fail per line
                        # rather than per file.
                        with open(entry.path, "rb") as f:
                            for c, byte_line in enumerate(f):
                                try:
                                    line = byte_line.decode("utf-8")
                                except UnicodeDecodeError:
                                    bad_lines.append({
                                        "file": entry.name,
                                        "index": c,
                                        "bytes": str(byte_line[:20])
                                    })
                                    print(f"UnicodeDecodeError in file {entry.name} at line {c}")
                                    continue

                                # A UTF-8 byte-order mark is not whitespace,
                                # so strip() would leave it glued to the first
                                # source text of the file.
                                if c == 0 and line.startswith("\ufeff"):
                                    line = line[1:]
                                line = line.strip()

                                # Skip headers or empty lines
                                if not line or "Source_Text" in line:
                                    continue

                                # Always split: a line without a tab yields a
                                # single column and therefore no pair. (It must
                                # never fall through with the previous line's
                                # columns, which would duplicate that pair.)
                                columns = [col.strip() for col in line.split('\t')]

                                # Unreviewed pairs (columns 1 and 2)
                                if len(columns) >= 2 and columns[0] and columns[1]:
                                    unreviewed_pairs.append((columns[0], columns[1]))

                                # Reviewed pairs (columns 1 and 3) when available
                                if len(columns) >= 3 and columns[0] and columns[2]:
                                    reviewed_pairs.append((columns[0], columns[2]))
                    except Exception as e:
                        print(f"Error reading {entry.path}: {str(e)}")

                elif entry.is_dir():
                    sub_unreviewed, sub_reviewed, sub_bad_lines = combine_text_files(entry.path)
                    unreviewed_pairs.extend(sub_unreviewed)
                    reviewed_pairs.extend(sub_reviewed)
                    bad_lines.extend(sub_bad_lines)
    except Exception as e:
        print(f"Error processing directory {input_folder}: {str(e)}")

    return unreviewed_pairs, reviewed_pairs, bad_lines


In [5]:
def write_pairs_to_file(pairs, output_file):
    """Write column pairs to a TSV file.

    Rows are emitted through ``csv.writer`` with a tab delimiter, so the
    module's default quoting and line terminator apply. Nothing is written
    (and no file is created) when ``pairs`` is empty.

    Args:
        pairs: Sequence of row tuples, one per output line.
        output_file: Destination path; an existing file is overwritten.

    Returns:
        The number of rows written, or 0 if ``pairs`` is empty or writing
        failed. Failures are printed, not raised.
    """
    if not pairs:
        return 0

    try:
        with open(output_file, "w", encoding="utf-8", newline='') as handle:
            csv.writer(handle, delimiter='\t').writerows(pairs)
        written = len(pairs)
    except Exception as exc:
        print(f"Error writing to {output_file}: {str(exc)}")
        return 0

    return written

In [6]:
def _export_domain_pairs(kind, pairs, lang_name, domain_name, output_dir, domain_data, stats):
    """Write one kind of pairs ("unreviewed" or "reviewed") for a domain and record the outcome."""
    if not pairs:
        return

    target = os.path.join(output_dir, f"{kind}_{lang_name}_{domain_name}.tsv")
    written = write_pairs_to_file(pairs, target)

    # write_pairs_to_file returns 0 on failure; leave file_path as None then.
    if written > 0:
        domain_data[kind]["file_path"] = target
        stats[f"total_{kind}_files"] += 1
        stats[f"total_{kind}_lines"] += written
        print(f"  Created {kind} file with {written} lines: {os.path.basename(target)}")


def process_translation_directory(base_dir, output_dir):
    """Process a directory of translation files organized by language pairs and domains.

    Expected layout: ``base_dir/<language pair>/<domain>/...``. For every
    domain directory, all text files are combined with ``combine_text_files``
    and up to two TSV files are written into ``output_dir``:
    ``unreviewed_<pair>_<domain>.tsv`` (columns 1 and 2) and
    ``reviewed_<pair>_<domain>.tsv`` (columns 1 and 3). Non-directory entries
    at both levels are ignored.

    Language pairs and domains are visited in name order; ``os.scandir``
    itself yields entries in arbitrary order, which would make the log and
    the order of the returned lists differ from run to run.

    Args:
        base_dir: Root directory containing one sub-directory per language pair.
        output_dir: Directory for the generated TSV files; created if missing.

    Returns:
        A tuple ``(final_dict, stats)``. ``final_dict`` maps each language
        pair name to a list of per-domain dicts with keys ``"domain"``,
        ``"unreviewed"`` and ``"reviewed"`` (each of the latter holding
        ``"count"`` and ``"file_path"``, where ``file_path`` stays ``None``
        if no file was written). ``stats`` holds the running totals that are
        also printed as a summary.

    Raises:
        OSError: If ``output_dir`` cannot be created or ``base_dir`` (or a
            language pair directory) cannot be scanned.
    """
    # Create output directory if it doesn't exist
    os.makedirs(output_dir, exist_ok=True)

    # Dictionary to store all processed data
    final_dict = {}

    # Statistics for reporting
    stats = {
        "total_language_pairs": 0,
        "total_domains": 0,
        "total_unreviewed_files": 0,
        "total_reviewed_files": 0,
        "total_unreviewed_lines": 0,
        "total_reviewed_lines": 0
    }

    # Scan for language pair directories
    with os.scandir(base_dir) as languages:
        lang_dirs = sorted((item for item in languages if item.is_dir()),
                           key=lambda item: item.name)

    for lang_pair in lang_dirs:
        stats["total_language_pairs"] += 1
        language_data = []

        # Scan for domain directories within the language pair
        with os.scandir(lang_pair.path) as domains:
            domain_dirs = sorted((item for item in domains if item.is_dir()),
                                 key=lambda item: item.name)

        for domain in domain_dirs:
            print(f"  Processing domain: {domain.name}")
            # Counted even when the domain turns out to contain no data.
            stats["total_domains"] += 1

            # Combine all files for this domain
            unreviewed_pairs, reviewed_pairs, bad_lines = combine_text_files(domain.path)

            if not unreviewed_pairs and not reviewed_pairs:
                print(f"  No data found for {lang_pair.name}/{domain.name}")
                continue

            domain_data = {
                "domain": domain.name,
                "unreviewed": {
                    "count": len(unreviewed_pairs),
                    "file_path": None
                },
                "reviewed": {
                    "count": len(reviewed_pairs),
                    "file_path": None
                }
            }

            # Unreviewed file (columns 1 & 2) first, then reviewed (columns 1 & 3)
            for kind, pairs in (("unreviewed", unreviewed_pairs),
                                ("reviewed", reviewed_pairs)):
                _export_domain_pairs(kind, pairs, lang_pair.name, domain.name,
                                     output_dir, domain_data, stats)

            # Report undecodable lines, if any
            if bad_lines:
                print(f"  Found {len(bad_lines)} problematic lines in {domain.name}")

            language_data.append(domain_data)

        # Store all data for this language pair (possibly an empty list)
        final_dict[lang_pair.name] = language_data

    # Print summary statistics
    print("\nProcessing Summary:")
    print(f"Total language pairs processed: {stats['total_language_pairs']}")
    print(f"Total domains processed: {stats['total_domains']}")
    print(f"Total unreviewed files created: {stats['total_unreviewed_files']} with {stats['total_unreviewed_lines']} lines")
    print(f"Total reviewed files created: {stats['total_reviewed_files']} with {stats['total_reviewed_lines']} lines")

    return final_dict, stats

    #                 # Create output file name: LANG1-LANG2_DOMAIN.txt
    #                 output_file = os.path.join(output_dir, f"{lang_pair.name}_{domain.name}.txt")

    #                 # Write combined data to file
    #                 with open(output_file, "w", encoding="utf-8") as f:
    #                     f.write("\n".join(combined_data) + "\n")

    #                 # Remove empty lines and get clean data
    #                 clean_data = remove_empty_lines(output_file)

    #                 # Update statistics
    #                 stats["total_files_created"] += 1
    #                 stats["total_lines_processed"] += len(clean_data)

    #                 # Log results
    #                 logger.info(f"  Domain: {domain.name} - Raw lines: {len(combined_data)} - "
    #                            f"Clean lines: {len(clean_data)}")

    #                 # Store data in our dictionary
    #                 language_data.append({
    #                     "domain": domain.name,
    #                     "data": clean_data,
    #                     "file_path": output_file
    #                 })

    #                 # Log bad lines if any
    #                 if bad_lines:
    #                     logger.warning(f"  Found {len(bad_lines)} problematic lines in {domain.name}")

    #         # Store all data for this language pair
    #         final_dict[lang_pair.name] = language_data

    # # Print summary statistics
    # logger.info("\nProcessing Summary:")
    # logger.info(f"Total language pairs processed: {stats['total_language_pairs']}")
    # logger.info(f"Total domains processed: {stats['total_domains']}")
    # logger.info(f"Total output files created: {stats['total_files_created']}")
    # logger.info(f"Total lines processed: {stats['total_lines_processed']}")

    # return final_dict, stats

# Adjust the input and output paths in the cell below

In [7]:
# Configuration - adjust these paths as needed

if __name__ == "__main__":
    # Root of the translation tree (language pair / domain / files) and the
    # folder that receives the generated TSV files.
    INPUT_FOLDER = "/content/drive/MyDrive/COIL-D/Test/"
    OUTPUT_FOLDER = "/content/output_test/"

    # Announce the run configuration, one item per line.
    print(
        f"Starting processing of translation files",
        f"Input directory: {INPUT_FOLDER}",
        f"Output directory: {OUTPUT_FOLDER}",
        sep="\n",
    )

    # Process all files; the results stay available to later cells.
    result_dict, stats = process_translation_directory(INPUT_FOLDER, OUTPUT_FOLDER)

    print("Processing complete!")

Starting processing of translation files
Input directory: /content/drive/MyDrive/COIL-D/Test/
Output directory: /content/output_test/
  Processing domain: HLT
  Created unreviewed file with 2289 lines: unreviewed_HIN-MAI_HLT.tsv
  Created reviewed file with 2290 lines: reviewed_HIN-MAI_HLT.tsv
  Processing domain: JUD
  Created unreviewed file with 2550 lines: unreviewed_HIN-MAI_JUD.tsv
  Created reviewed file with 2530 lines: reviewed_HIN-MAI_JUD.tsv
  Processing domain: GOV
  Created unreviewed file with 18429 lines: unreviewed_HIN-MAI_GOV.tsv
  Created reviewed file with 18434 lines: reviewed_HIN-MAI_GOV.tsv
  Processing domain: EDU
  Created unreviewed file with 6913 lines: unreviewed_HIN-MAI_EDU.tsv
  Created reviewed file with 6901 lines: reviewed_HIN-MAI_EDU.tsv
  Processing domain: AGRI
  Created unreviewed file with 3002 lines: unreviewed_HIN-MAI_AGRI.tsv
  Created reviewed file with 2982 lines: reviewed_HIN-MAI_AGRI.tsv

Processing Summary:
Total language pairs processed: 1
T

# Download the output folder if you are running in Google Colab

In [None]:
# prompt: zip and download a folder

import shutil
import os

def zip_and_download_folder(folder_path, zip_filename):
    """Zip a folder and, when running in Google Colab, download the archive.

    The archive is written to ``<zip_filename>.zip``. The browser download is
    attempted only if ``google.colab`` can be imported; elsewhere the archive
    is still created and the download step is skipped with a message.

    Args:
        folder_path: Directory whose contents are archived.
        zip_filename: Archive path without the ``.zip`` extension.

    Returns:
        None. Problems are printed, not raised.
    """
    # Check explicitly rather than relying on how shutil.make_archive reacts
    # to a missing root directory.
    if not os.path.isdir(folder_path):
        print(f"Error: Folder '{folder_path}' not found.")
        return

    try:
        shutil.make_archive(zip_filename, 'zip', folder_path)
    except Exception as e:
        print(f"An error occurred: {e}")
        return
    print(f"Folder '{folder_path}' zipped successfully to '{zip_filename}.zip'")

    # The download helper exists only inside Colab; its absence is not a
    # failure of the zipping step and should not be reported as one.
    try:
        from google.colab import files
    except ImportError:
        print("google.colab is not available; skipping the download step.")
        return

    try:
        files.download(f'{zip_filename}.zip')
    except Exception as e:
        print(f"An error occurred: {e}")

# Example usage: archive the folder "/content/output_test" as "output_test.zip"
# (replace both arguments with your own folder path and archive base name).
zip_and_download_folder("/content/output_test", "output_test")


Folder '/content/output_test' zipped successfully to 'output_test.zip'


<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>