In [2]:
import re

# Step 1: Read the raw text file
with open("C:/Users/LENOVE/Desktop/Data Engineers/raw_messy_text.txt", "r", encoding="utf-8") as f:
    raw_text = f.readlines()

print(f"Total lines before cleaning: {len(raw_text)}")


Total lines before cleaning: 58


In [3]:
# Step 2: Remove empty lines and strip extra spaces
cleaned = [line.strip() for line in raw_text if line.strip() != ""]

In [4]:
# Step 3: Remove HTML tags
cleaned = [re.sub(r"<.*?>", "", line) for line in cleaned]

In [5]:
# Step 4: Normalize multiple spaces and tabs
cleaned = [re.sub(r"\s+", " ", line) for line in cleaned]

In [7]:
# Step 5: Remove duplicate lines
cleaned = list(dict.fromkeys(cleaned))

# Step 6: Remove random punctuation clutter (like ...., !!!, --- etc.)
cleaned = [re.sub(r"[.]{2,}|[!?]{2,}|[-=]{2,}", ".", line) for line in cleaned]

# Step 7: (Optional) Remove lines with unwanted symbols or noise
# e.g. remove lines starting with '---', '>>>', or containing broken JSON
cleaned = [line for line in cleaned if not re.match(r"^[-=]{2,}|^>>>", line)]
cleaned = [line for line in cleaned if not re.search(r"Broken JSON", line)]

# Step 8: Save the cleaned output to a new text file
with open("C:/Users/LENOVE/Desktop/Data Engineers/cleaned_text.txt", "w", encoding="utf-8") as f:
    for line in cleaned:
        f.write(line + "\n")

print("✅ Cleaning complete!")
print(f"Total lines after cleaning: {len(cleaned)}")
print("Cleaned file saved as: cleaned_text.txt")

✅ Cleaning complete!
Total lines after cleaning: 36
Cleaned file saved as: cleaned_text.txt


In [8]:
import re

# Load cleaned text
with open("C:/Users/LENOVE/Desktop/Data Engineers/cleaned_text.txt", "r", encoding="utf-8") as f:
    text = f.read()


In [9]:
emails = re.findall(r"[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}", text)
print("Emails found:", emails)


Emails found: ['john.doe@example.com', 'jane_smith@domain.co.uk']


In [10]:
phones = re.findall(r"\+?\d[\d().\-\s]{7,}\d", text)
print("Phone numbers found:", phones)


Phone numbers found: ['2025-10-01 08', '2025-10-02 09', '10\n2025-10-02 09', '+1 (555) 123-4567\n555.123.4568']


In [11]:
urls = re.findall(r"https?://[^\s]+", text)
print("URLs found:", urls)


URLs found: ['https://example.com/page?ref=abc', 'http://localhost:8000/test']


In [12]:
dates = re.findall(r"\d{4}-\d{2}-\d{2}", text)
times = re.findall(r"\d{2}:\d{2}:\d{2}", text)

print("Dates found:", dates)
print("Times found:", times)


Dates found: ['2025-10-01', '2025-10-02', '2025-10-02']
Times found: ['08:15:23', '09:00:00', '09:00:01']


In [13]:
with open("C:/Users/LENOVE/Desktop/Data Engineers/extracted_data.txt", "w", encoding="utf-8") as f:
    f.write("Emails:\n" + "\n".join(emails) + "\n\n")
    f.write("Phone Numbers:\n" + "\n".join(phones) + "\n\n")
    f.write("URLs:\n" + "\n".join(urls) + "\n\n")
    f.write("Dates:\n" + "\n".join(dates) + "\n\n")
    f.write("Times:\n" + "\n".join(times) + "\n\n")

print("✅ Extraction complete! File saved as extracted_data.txt")

✅ Extraction complete! File saved as extracted_data.txt


In [14]:
import re
import csv

# Step 1: Load the text file
with open("cleaned_text.txt", "r", encoding="utf-8") as f:
    text = f.read()

# Step 2: Define regex patterns for extraction
emails = re.findall(r"[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}", text)
phones = re.findall(r"\+?\d[\d().\-\s]{7,}\d", text)
urls = re.findall(r"https?://[^\s]+", text)
dates = re.findall(r"\d{4}-\d{2}-\d{2}", text)
times = re.findall(r"\d{2}:\d{2}:\d{2}", text)

# Step 3: Prepare to save extracted data
# (We'll pad lists so all columns have same length)
max_len = max(len(emails), len(phones), len(urls), len(dates), len(times))
def pad(lst):
    return lst + [""] * (max_len - len(lst))

emails, phones, urls, dates, times = map(pad, [emails, phones, urls, dates, times])

# Step 4: Save into CSV
with open("C:/Users/LENOVE/Desktop/Data Engineers/extracted_summary.csv", "w", newline="", encoding="utf-8") as f:
    writer = csv.writer(f)
    writer.writerow(["Email", "Phone", "URL", "Date", "Time"])
    for i in range(max_len):
        writer.writerow([
            emails[i],
            phones[i],
            urls[i],
            dates[i],
            times[i]
        ])

print("✅ Data extracted and saved as extracted_summary.csv!")


✅ Data extracted and saved as extracted_summary.csv!


In [18]:
import re
import csv
import os

# === CONFIGURATION ===
input_folder = "C:/Users/LENOVE/Desktop/Data Engineers/Python/data_files"
output_folder = "C:/Users/LENOVE/Desktop/Data Engineers/output_files"

# Create folders if not exist
os.makedirs(input_folder, exist_ok=True)
os.makedirs(output_folder, exist_ok=True)

# === FUNCTION TO EXTRACT DATA FROM TEXT ===
def extract_from_text(text):
    emails = re.findall(r"[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}", text)
    phones = re.findall(r"\+?\d[\d().\-\s]{7,}\d", text)
    urls = re.findall(r"https?://[^\s]+", text)
    dates = re.findall(r"\d{4}-\d{2}-\d{2}", text)
    times = re.findall(r"\d{2}:\d{2}:\d{2}", text)

    print("Emails found:", emails)
    print("Phones found:", phones)
    print("URLs found:", urls)
    print("Dates found:", dates)
    print("Times found:", times)

    max_len = max(len(emails), len(phones), len(urls), len(dates), len(times), 1)
    def pad(lst): return lst + [""] * (max_len - len(lst))
    return pad(emails), pad(phones), pad(urls), pad(dates), pad(times)

# === MAIN PIPELINE ===
print("Files detected in input folder:")
for filename in os.listdir(input_folder):
    print("→", filename)

print("\n🔍 Starting extraction process...\n")

for filename in os.listdir(input_folder):
    # ✅ Only process text files that are not already cleaned/extracted
    if filename.endswith(".txt") and "cleaned" not in filename.lower() and "extract" not in filename.lower():
        file_path = os.path.join(input_folder, filename)
        print(f"Processing: {file_path}")

        with open(file_path, "r", encoding="utf-8") as f:
            text = f.read()

        emails, phones, urls, dates, times = extract_from_text(text)

        # ✅ Generate unique output name (avoid overwriting)
        csv_name = filename.replace(".txt", "_extracted.csv")
        csv_path = os.path.join(output_folder, csv_name)

        with open(csv_path, "w", newline="", encoding="utf-8") as f:
            writer = csv.writer(f)
            writer.writerow(["Email", "Phone", "URL", "Date", "Time"])
            for i in range(len(emails)):
                writer.writerow([emails[i], phones[i], urls[i], dates[i], times[i]])

        if any([emails, phones, urls, dates, times]):
            print(f"✅ Data extracted and saved to: {csv_path}")
        else:
            print(f"⚠️ No matches found in: {filename}")

print("\n🎯 All eligible files processed successfully!")


Files detected in input folder:

🔍 Starting extraction process...


🎯 All eligible files processed successfully!
