In [None]:
import json
import os
import re
from datetime import datetime, date, timezone
from urllib.parse import quote

import emoji
import nltk
import pandas as pd
from dotenv import load_dotenv
from nltk.corpus import stopwords
from sqlalchemy import create_engine

In [None]:
# Fetch the NLTK stop-word corpus and keep the English list as a set so the
# membership test in normalize_text is a hash lookup.
nltk.download("stopwords")
STOP_WORDS = set(stopwords.words("english"))

# Database settings are read from the environment (load_dotenv() is called
# first so values from a .env file, if present, are visible to os.getenv).
load_dotenv()
DB_HOST = os.getenv("DB_HOST")
DB_NAME = os.getenv("DB_NAME")
DB_USER = os.getenv("DB_USER")
DB_PASSWORD = os.getenv("DB_PASSWORD")

# Fail early with a readable message instead of building a URL that contains
# the literal text "None" and only breaks later, at connection time.
# An empty string is accepted (e.g. a local account without a password).
_missing_settings = [
    name
    for name, value in (
        ("DB_HOST", DB_HOST),
        ("DB_NAME", DB_NAME),
        ("DB_USER", DB_USER),
        ("DB_PASSWORD", DB_PASSWORD),
    )
    if value is None
]
if _missing_settings:
    raise RuntimeError(
        "Missing database settings: " + ", ".join(_missing_settings)
    )

# Percent-encode the credentials: characters such as "@", ":" or "/" inside a
# user name or password would otherwise be read as URL delimiters.
# safe="" makes quote() encode "/" as well.
engine = create_engine(
    f"mysql+mysqlconnector://{quote(DB_USER, safe='')}:"
    f"{quote(DB_PASSWORD, safe='')}@{DB_HOST}/{DB_NAME}"
)

In [3]:
def normalize_text(text):
    """Lower-case ``text``, strip emoji and punctuation, and drop stop words.

    Parameters
    ----------
    text : Any
        Value to normalise; it is converted with ``str()`` before processing.

    Returns
    -------
    str or None
        ``None`` when ``pd.isna(text)`` is true, otherwise the remaining
        tokens joined by single spaces.

    Notes
    -----
    Only word characters, whitespace, ``@``, ``.`` and ``-`` survive the
    punctuation filter.
    NOTE(review): the filter runs before the stop-word check, so any
    ``STOP_WORDS`` entry containing an apostrophe can never match -- confirm
    this is intended.
    """
    if pd.isna(text):
        return None

    lowered = str(text).lower()
    without_emoji = emoji.replace_emoji(lowered, replace="")
    cleaned = re.sub(r"[^\w\s@.-]", "", without_emoji)

    return " ".join(
        token for token in cleaned.split() if token not in STOP_WORDS
    )

In [None]:
def detect_email_content(text):
    """Return True when ``text`` looks like a raw email message.

    A value counts as an email when at least two *different* header markers
    (``From:``, ``To:``, ``Subject:``, ``Message-ID:``, ``Date:``) appear at
    the start of a line. Matching is case-sensitive.

    Markers are anchored to line starts because that is the only position in
    which ``parse_email_content`` recognises them. Matching them anywhere
    would classify ordinary prose such as "Meeting Date: Monday. Send To: Bob"
    as an email, which the parser then turns into a record with no headers.

    Parameters
    ----------
    text : Any
        Candidate value; anything that is not a ``str`` yields ``False``.

    Returns
    -------
    bool
    """
    if not isinstance(text, str):
        return False

    # With re.MULTILINE, "^" matches at the start of the string and right
    # after every "\n" -- the same line boundaries the parser splits on.
    markers_found = {
        match.group(1)
        for match in re.finditer(
            r"^(From|To|Subject|Message-ID|Date):", text, flags=re.MULTILINE
        )
    }
    return len(markers_found) >= 2

In [None]:
def parse_email_content(raw_text):
    """Split a raw email into selected headers and its body.

    The text is split at the first blank line; everything before it is
    treated as the header section, everything after it as the body.

    Parameters
    ----------
    raw_text : str
        Raw message text. Both ``\\n`` and ``\\r\\n`` line endings are
        accepted.

    Returns
    -------
    dict
        Keys ``message_id``, ``date``, ``sender``, ``receiver``, ``subject``
        (each ``None`` when the header is absent) and ``body`` (stripped;
        empty string when there is no blank line in ``raw_text``).

    Notes
    -----
    * Header names are matched case-sensitively at the start of a line.
    * If a header occurs more than once, the last occurrence wins.
    * Only the leading marker is removed from a header line, so a value that
      itself contains the marker text (``Subject: FW: Subject: x``) is kept
      intact.
    * A line starting with a space or tab is treated as a continuation of the
      preceding recognised header and appended to its value, separated by a
      single space. Continuations of other headers are ignored.
    """
    header_fields = {
        "Message-ID:": "message_id",
        "Date:": "date",
        "From:": "sender",
        "To:": "receiver",
        "Subject:": "subject",
    }

    # Split on the first blank line, tolerating CRLF line endings.
    parts = re.split(r"\r?\n\r?\n", raw_text, maxsplit=1)
    headers = parts[0]
    body = parts[1] if len(parts) > 1 else ""

    email_data = {
        "message_id": None,
        "date": None,
        "sender": None,
        "receiver": None,
        "subject": None,
        "body": body.strip()
    }

    # Key of the recognised header most recently seen; continuation lines are
    # appended to it. Reset to None whenever an unrecognised line is read.
    current_key = None

    for line in headers.split("\n"):
        line = line.rstrip("\r")

        if line[:1] in (" ", "\t"):
            continuation = line.strip()
            if current_key is not None and continuation:
                email_data[current_key] = (
                    f"{email_data[current_key]} {continuation}".strip()
                )
            continue

        current_key = None
        for marker, key in header_fields.items():
            if line.startswith(marker):
                # Slice instead of str.replace so only the prefix is removed.
                email_data[key] = line[len(marker):].strip()
                current_key = key
                break

    return email_data

In [6]:
def normalize_email_data(email_dict):
    """Normalise the free-text fields of a parsed email in place.

    The ``subject`` and ``body`` entries are passed through
    ``normalize_text`` when they hold a truthy value; missing, ``None`` or
    empty entries are left as they are.

    Parameters
    ----------
    email_dict : dict
        Parsed email; it is modified in place.

    Returns
    -------
    dict
        The same ``email_dict`` object that was passed in.
    """
    for field in ("subject", "body"):
        current = email_dict.get(field)
        if not current:
            continue
        email_dict[field] = normalize_text(current)
    return email_dict

In [None]:
# Normalizing
def _json_fallback(obj):
    """Render a value the json encoder cannot handle natively as a string."""
    if isinstance(obj, (bytes, bytearray)):
        return obj.decode("utf-8", errors="replace")
    return str(obj)


def _clean_value(value):
    """Convert one cell value into its normalised, JSON-ready form."""
    # EMAIL DATA / TEXT DATA
    if isinstance(value, str):
        if detect_email_content(value):
            return normalize_email_data(parse_email_content(value))
        return normalize_text(value)

    # MISSING VALUES: NaN / NaT / NA become JSON null. Left alone, NaN is
    # written as the bare token NaN (not valid JSON) and NaT as the string
    # "NaT". is_scalar() guards pd.isna(), which returns an array for
    # list-like input.
    if pd.api.types.is_scalar(value) and pd.isna(value):
        return None

    # DATE / DATETIME
    if isinstance(value, (datetime, date)):
        return value.isoformat()

    # NON-TEXT
    return value


def normalize_all_tables_to_json(
    engine,
    output_file="outputs/normalized_output.json",
    chunk_size=1000
):
    """Export every table reachable through ``engine`` as one JSON array.

    Tables are listed with ``SHOW TABLES`` and read with ``SELECT *`` in
    chunks of ``chunk_size`` rows. Each row becomes one record::

        {"id": ..., "source_name": ..., "timestamp": ..., "data": {...}}

    * ``id`` is the 1-based position of the table in the ``SHOW TABLES``
      result; every record from the same table shares it.
    * ``source_name`` is the table name.
    * ``timestamp`` is the UTC time at which the record was built.
    * ``data`` maps column names to cleaned values: strings detected as
      emails are parsed into a dict, other strings go through
      ``normalize_text``, dates/datetimes become ISO-8601 strings, missing
      values become ``null``, everything else is written unchanged.

    Records are written one at a time, so memory use is bounded by
    ``chunk_size`` rather than by table size.

    Parameters
    ----------
    engine
        Connectable passed straight to ``pd.read_sql``.
    output_file : str
        Destination path; its parent directory is created when missing and
        an existing file is overwritten.
    chunk_size : int
        Number of rows fetched per ``pd.read_sql`` chunk.

    Returns
    -------
    None
        Progress is reported with ``print``.
    """
    tables = pd.read_sql("SHOW TABLES", engine).iloc[:, 0].tolist()

    # The default path lives in a sub-directory that may not exist yet.
    output_dir = os.path.dirname(output_file)
    if output_dir:
        os.makedirs(output_dir, exist_ok=True)

    first_record = True

    with open(output_file, "w", encoding="utf-8") as f:
        f.write("[\n")

        for table_id, table in enumerate(tables, start=1):
            # Double any backtick so the identifier stays correctly quoted.
            quoted_table = str(table).replace("`", "``")
            query = f"SELECT * FROM `{quoted_table}`"

            # READ TABLE IN CHUNKS
            for chunk in pd.read_sql(query, engine, chunksize=chunk_size):
                columns = list(chunk.columns)

                # Cast each column to object and iterate as plain tuples.
                # iterrows() builds one Series per row and does not preserve
                # dtypes across the row, so integer ids in all-numeric rows
                # would be emitted as floats (or as numpy integers, which
                # json cannot encode).
                rows = chunk.astype(object).itertuples(index=False, name=None)

                for values in rows:
                    cleaned_row = {
                        column: _clean_value(value)
                        for column, value in zip(columns, values)
                    }

                    record = {
                        "id": table_id,
                        "source_name": table,
                        "timestamp": datetime.now(timezone.utc).isoformat(),
                        "data": cleaned_row
                    }

                    if first_record:
                        first_record = False
                    else:
                        f.write(",\n")

                    # default= keeps one unexpected value type (bytes,
                    # Timedelta, ...) from aborting the export and leaving a
                    # truncated file behind.
                    json.dump(
                        record, f, ensure_ascii=False, default=_json_fallback
                    )

            print(table,"done")

        f.write("\n]")

    print("Normalization & JSON generation completed")


In [12]:
# Run the export with the configured engine; the output path and chunk size
# fall back to the function's defaults.
normalize_all_tables_to_json(engine=engine)

customer done
customer_support_tickets done
department done
emails done
employee done
employee_project done
glassdoor-companies-reviews done
manager done
order_items done
orders done
pdf done
product done
project done
tata_motors_employee_reviews done
Normalization & JSON generation completed
