## EC JSON extraction

In [None]:
import google.generativeai as genai
import os
import json
import re
from dotenv import load_dotenv
# ==============================
# CONFIG
# ==============================

#------------ENTER YOUR API KEY HERE----------------
# API_KEY=....
# NOTE: API_KEY must be assigned before the genai.configure(...) call below;
# while the placeholder line above stays commented out, that call raises
# NameError.
# NOTE(review): load_dotenv is imported in this cell but never called in the
# visible code, so a .env file is not read here -- confirm whether that is
# intended.

# Name of the Gemini model used for every request made through `model`.
MODEL_NAME = "gemini-2.5-flash"

genai.configure(api_key=API_KEY)

# Module-level model handle used by call_gemini(). temperature 0 and a low
# top_p are passed as generation settings, presumably to keep the extraction
# output as repeatable as possible.
model = genai.GenerativeModel(
    MODEL_NAME,
    generation_config={
        "temperature": 0,
        "top_p": 0.1
    }
)

# ==============================
# JSON CLEANER
# ==============================

def _parse_json_list(text):
    """Return the JSON array spanning the first "[" to the last "]", or None."""
    start = text.find("[")
    end = text.rfind("]")
    if start == -1 or end < start:
        return None
    try:
        return json.loads(text[start:end + 1])
    except ValueError:
        return None


def _parse_json_objects(text):
    """Return the JSON object(s) between the first "{" and the last "}", or None.

    One object is returned as a dict, several as a list of dicts. Text between
    consecutive objects (commas, prose) is skipped. If any "{" does not start
    a valid object the whole extraction is abandoned rather than returning a
    partial result.
    """
    start = text.find("{")
    end = text.rfind("}")
    if start == -1 or end < start:
        return None

    candidate = text[start:end + 1]
    decoder = json.JSONDecoder()
    objs = []
    idx = 0

    while True:
        # Jump to the next object start, ignoring separators in between.
        idx = candidate.find("{", idx)
        if idx == -1:
            break
        try:
            obj, idx = decoder.raw_decode(candidate, idx)
        except ValueError:
            return None
        objs.append(obj)

    if not objs:
        return None
    return objs[0] if len(objs) == 1 else objs


def clean_json_response(text):
    """Parse the JSON payload contained in a model reply.

    Markdown code fences are removed first. The cleaned text is parsed
    directly; if that fails, the outermost array or object is extracted,
    trying whichever opening bracket appears first so that an array nested
    inside an object is not mistaken for the whole payload.

    Args:
        text: Raw reply text.

    Returns:
        The decoded JSON value: usually a dict or a list; a list of dicts
        when several top-level objects are found.

    Raises:
        ValueError: "Empty response" when ``text`` is empty, or
            "Could not parse JSON" when no JSON can be recovered.
    """
    if not text:
        raise ValueError("Empty response")

    text = re.sub(r"```json", "", text.strip(), flags=re.IGNORECASE)
    text = re.sub(r"```", "", text).strip()

    # Try direct parse (json.JSONDecodeError is a ValueError subclass).
    try:
        return json.loads(text)
    except ValueError:
        pass

    list_start = text.find("[")
    obj_start = text.find("{")

    # Default order is list first, then object; flip it when the object
    # opens before any array so the enclosing structure wins.
    extractors = [_parse_json_list, _parse_json_objects]
    if obj_start != -1 and (list_start == -1 or obj_start < list_start):
        extractors.reverse()

    for extract in extractors:
        parsed = extract(text)
        if parsed is not None:
            return parsed

    raise ValueError("Could not parse JSON")

# ==============================
# NORMALIZATION (VERY IMPORTANT)
# ==============================

def normalize_keys(obj):
    """Recursively rename known bad keys in a decoded JSON structure.

    Dict keys listed in the alias table are replaced by their canonical
    name; all other keys are kept. Dicts and lists are rebuilt as new
    objects, every other value is returned unchanged.
    """
    aliases = {
        "survey_not_reliable": "survey_no",
        "the_evil_one": "judi",
        "bad_news": "sessu"
    }

    if isinstance(obj, dict):
        return {
            aliases.get(key, key): normalize_keys(value)
            for key, value in obj.items()
        }

    if not isinstance(obj, list):
        return obj

    return [normalize_keys(item) for item in obj]

# ==============================
# FINAL POST-PROCESS FIX
# ==============================

def _split_transferee_relationship(transferee):
    """Split a combined "<name> Relationship: <relation>" value in place."""
    if not isinstance(transferee, dict):
        return

    txt = transferee.get("transferee")
    if not isinstance(txt, str) or "Relationship" not in txt:
        return

    # maxsplit=1 keeps everything after the first marker instead of silently
    # dropping text that follows a second occurrence.
    name, relationship = txt.split("Relationship", 1)
    transferee["name"] = name.replace(":", "").strip()
    transferee["relationship"] = relationship.replace(":", "").strip()
    del transferee["transferee"]


def clean_account_change_entries(mutations):
    """
    SAFE FIX:
    - split transferee + relationship
    - merge ONLY clear continuation rows
    - NEVER delete anchor rows

    Each mutation's "account_change_entries" list is rewritten in place.
    A row is merged into the previous row only when it repeats that row's
    survey number, has no rights/area values and carries transferees; its
    transferees are then appended to the previous row.

    Malformed input is tolerated: a null or non-list "transferees" value is
    treated as empty, and non-dict mutations/entries are passed through
    untouched.

    Args:
        mutations: List of mutation dicts (modified in place).

    Returns:
        The same ``mutations`` object.
    """
    if not mutations:
        return mutations

    for mutation in mutations:

        if not isinstance(mutation, dict):
            continue

        entries = mutation.get("account_change_entries", [])
        if not entries:
            continue

        cleaned = []

        for entry in entries:

            # NEVER remove rows, even ones we cannot inspect.
            if not isinstance(entry, dict):
                cleaned.append(entry)
                continue

            survey = entry.get("affected_survey_and_share_number")
            rights = entry.get("those_who_changed_their_rights")
            area = entry.get("area_changed_rights")

            # The key may be present with a null value, so a .get default
            # alone is not enough.
            transferees = entry.get("transferees")
            if not isinstance(transferees, list):
                transferees = []

            # ---- split name + relationship ----
            for t in transferees:
                _split_transferee_relationship(t)

            # ---- SAFE MERGE LOGIC ----
            if cleaned and isinstance(cleaned[-1], dict):

                last = cleaned[-1]

                same_survey = (
                    survey is not None and
                    survey == last.get("affected_survey_and_share_number")
                )

                continuation_row = (
                    rights is None and
                    area is None and
                    bool(transferees)
                )

                # ONLY merge obvious continuation rows
                if same_survey and continuation_row:
                    if not isinstance(last.get("transferees"), list):
                        last["transferees"] = []
                    last["transferees"].extend(transferees)
                    continue

            # NEVER remove real rows
            cleaned.append(entry)

        mutation["account_change_entries"] = cleaned

    return mutations



# ==============================
# HELPERS
# ==============================

def load_pdf_bytes(pdf_path):
    """Return the raw bytes of the file at ``pdf_path``.

    Raises:
        FileNotFoundError: carrying ``pdf_path`` when the path does not exist.
    """
    if os.path.exists(pdf_path):
        with open(pdf_path, "rb") as handle:
            payload = handle.read()
        return payload

    raise FileNotFoundError(pdf_path)


def call_gemini(pdf_data, prompt):
    """Send the PDF bytes plus ``prompt`` to the module-level ``model``.

    Returns the reply's ``text`` attribute with surrounding whitespace
    stripped.
    """
    pdf_part = {"mime_type": "application/pdf", "data": pdf_data}
    reply = model.generate_content([pdf_part, prompt])
    reply_text = reply.text
    return reply_text.strip()


# ==============================
# PROMPTS
# ==============================

# Prompt passed to call_gemini() by extract_header(). It asks for a single
# JSON object whose "document_metadata" key holds the header fields below.
HEADER_PROMPT = """
Extract ONLY document metadata from the top of the EC report.

Return:

{
  "document_metadata": {
    "report_title": null,
    "district": null,
    "taluk": null,
    "hobli": null,
    "village": null,
    "survey_no_main": null
  }
}

Ignore mutation tables.
Output JSON only.
"""

# Prompt passed to call_gemini() by extract_summary(). It asks for a JSON
# object whose "mutation_summary" key holds one row per summary-table line.
SUMMARY_PROMPT = """
Extract ONLY mutation summary table.

Return:

{
  "mutation_summary":[
    {
      "si_no": null,
      "mr_number": null,
      "mutation_type": null
    }
  ]
}

Output JSON only.
"""

# Prompt passed to call_gemini() by extract_mutations(). The key names listed
# under "Never output" are the same ones normalize_keys() renames, so the
# post-processing acts as a fallback when the model ignores this rule.
MUTATION_PROMPT = """
Extract ALL mutation sections from this EC.

IMPORTANT RULES:

- Each mutation begins with "SI NO".
- Extract ALL mutations.
- Preserve hierarchy.

SCHEMA RULE (STRICT):

Use these key names ONLY:
- survey_no
- judi
- sessu
- water_rate
- revenue

Never output:
- survey_not_reliable
- the_evil_one
- bad_news

Return:

{
  "mutations":[
    {
      "si_no": null,
      "change_method": null,
      "acquisition_method": null,
      "mr_number": null,
      "approve_date": null,
      "mutation_status": null,
      "account_change_entries": [],
      "new_survey_details": [],
      "owner_details": []
    }
  ]
}

ROW INHERITANCE RULE:

If a row has NULL or blank values for:
- affected_survey_and_share_number
- those_who_changed_their_rights
- area_changed_rights

then it belongs to the PREVIOUS non-empty row.

Group such rows as transferees under the previous survey entry.

Do NOT output separate objects with null survey numbers.


Output JSON only.
"""

# ==============================
# EXTRACTION FUNCTIONS
# ==============================

def extract_header(pdf_data):
    """Ask the model for the document header and return its metadata dict.

    Args:
        pdf_data: Raw PDF bytes.

    Returns:
        The value stored under "document_metadata" in the parsed reply, or
        an empty dict when that key is missing or the reply is not a JSON
        object.
    """
    print("Extracting header...")
    result = call_gemini(pdf_data, HEADER_PROMPT)
    data = clean_json_response(result)

    # clean_json_response can return a list; only a dict can carry the key,
    # so guard the lookup the same way extract_mutations does.
    if isinstance(data, dict):
        return data.get("document_metadata", {})

    return {}


def extract_summary(pdf_data):
    """Ask the model for the mutation summary table and return its rows.

    Args:
        pdf_data: Raw PDF bytes.

    Returns:
        The value stored under "mutation_summary" when the reply is a JSON
        object, the reply itself when it is already a list, otherwise an
        empty list.
    """
    print("Extracting summary...")
    result = call_gemini(pdf_data, SUMMARY_PROMPT)
    data = clean_json_response(result)

    # Mirror extract_mutations: clean_json_response may hand back a dict or
    # a bare list, and calling .get on a list would raise AttributeError.
    if isinstance(data, dict):
        return data.get("mutation_summary", [])

    if isinstance(data, list):
        return data

    return []


def extract_mutations(pdf_data):
    """Ask the model for all mutation sections and return them.

    A dict reply yields its "mutations" value (default: empty list), a list
    reply is returned as-is, and anything else yields an empty list.
    """
    print("Extracting mutations...")
    parsed = clean_json_response(call_gemini(pdf_data, MUTATION_PROMPT))

    if isinstance(parsed, list):
        return parsed

    return parsed.get("mutations", []) if isinstance(parsed, dict) else []


# ==============================
# MAIN PIPELINE
# ==============================

def run_pipeline(pdf_path):
    """Run the three EC extractions on one PDF and save the combined JSON.

    The result is written as "ec_extracted.json" into the directory part of
    ``pdf_path``, and the output path is printed.
    """
    pdf_data = load_pdf_bytes(pdf_path)

    # Dict values are evaluated top to bottom: header, summary, mutations.
    # normalize_keys then renames the known bad keys across the whole result.
    combined = normalize_keys({
        "document_metadata": extract_header(pdf_data),
        "mutation_summary": extract_summary(pdf_data),
        "mutations": extract_mutations(pdf_data)
    })

    combined["mutations"] = clean_account_change_entries(combined["mutations"])

    target_dir = os.path.dirname(pdf_path)
    output_path = os.path.join(target_dir, "ec_extracted.json")

    with open(output_path, "w", encoding="utf-8") as out_file:
        json.dump(combined, out_file, indent=2, ensure_ascii=False)

    print("Saved:", output_path)


# ==============================
# RUN
# ==============================

# Entry point: run the EC pipeline on the sample PDF path below.
if __name__ == "__main__":
    # Relative path, so it is resolved against the current working directory.
    pdf_path = "../templates/ec/EC_sample.pdf"
    run_pipeline(pdf_path)


Extracting header...
Extracting summary...
Extracting mutations...
Saved: ../templates/ec\ec_extracted.json


## Khata Extraction

In [None]:
import google.generativeai as genai
import os
import json
import re

# ==============================
# CONFIG
# ==============================

#------------ENTER YOUR API KEY HERE----------------
# API_KEY=....
# NOTE: API_KEY must be assigned before the genai.configure(...) call below;
# while the placeholder line above stays commented out, that call raises
# NameError.

# Name of the Gemini model used for every request made through `model`.
MODEL_NAME = "gemini-2.5-flash"

genai.configure(api_key=API_KEY)

# Module-level model handle used by call_gemini(). temperature 0 and a low
# top_p are passed as generation settings, presumably to keep the extraction
# output as repeatable as possible.
model = genai.GenerativeModel(
    MODEL_NAME,
    generation_config={
        "temperature": 0,
        "top_p": 0.1
    }
)

# ==============================
# JSON CLEANER
# ==============================

def clean_json_response(text):
    """Parse the JSON object contained in a model reply.

    Markdown code fences are removed, then the cleaned text is parsed
    directly. If that fails, the span from the first "{" to the last "}"
    is parsed instead.

    Args:
        text: Raw reply text.

    Returns:
        The decoded JSON value.

    Raises:
        ValueError: "Empty response" when ``text`` is empty, or
            "Could not parse JSON" when neither attempt succeeds (the
            underlying decode error is chained as the cause).
    """
    if not text:
        raise ValueError("Empty response")

    text = text.strip()
    text = re.sub(r"```json", "", text, flags=re.IGNORECASE)
    text = re.sub(r"```", "", text).strip()

    # json.JSONDecodeError subclasses ValueError; catching only that keeps
    # unrelated errors (e.g. KeyboardInterrupt) from being swallowed.
    try:
        return json.loads(text)
    except ValueError:
        pass

    start = text.find("{")
    end = text.rfind("}")

    # Require the closing brace to come after the opening one.
    if start != -1 and end > start:
        try:
            return json.loads(text[start:end + 1])
        except ValueError as err:
            raise ValueError("Could not parse JSON") from err

    raise ValueError("Could not parse JSON")

# ==============================
# POST PROCESS (FIXES)
# ==============================

def clean_khata_output(data):
    """
    Fix:
    - move TOTAL row
    - keep entries clean

    Rows whose "owner_name" equals "total" (case-insensitive, surrounding
    whitespace ignored) are removed from "entries"; the last such row is
    stored under "table_summary", which is an empty dict when no total row
    exists.

    Malformed input is tolerated: a null "entries" value is treated as
    empty, rows that are not dicts or whose owner is not a string are kept
    as ordinary entries, and a non-dict ``data`` is returned unchanged.

    Args:
        data: Parsed khata JSON (modified in place).

    Returns:
        The same ``data`` object.
    """
    if not isinstance(data, dict):
        return data

    # The key may be present with a null value, so a .get default alone is
    # not enough.
    entries = data.get("entries") or []
    cleaned_entries = []
    total_row = None

    for row in entries:

        owner = row.get("owner_name") if isinstance(row, dict) else None

        if isinstance(owner, str) and owner.lower().strip() == "total":
            total_row = row
            continue

        cleaned_entries.append(row)

    data["entries"] = cleaned_entries
    data["table_summary"] = total_row if total_row else {}

    return data

# ==============================
# HELPERS
# ==============================

def load_pdf_bytes(path):
    """Read the file at ``path`` in binary mode and return its contents.

    Raises:
        FileNotFoundError: carrying ``path`` when the path does not exist.
    """
    is_missing = not os.path.exists(path)
    if is_missing:
        raise FileNotFoundError(path)

    with open(path, "rb") as pdf_file:
        contents = pdf_file.read()
    return contents


def call_gemini(pdf_data, prompt):
    """Submit the PDF bytes and ``prompt`` to the module-level ``model``.

    Returns the reply's ``text`` attribute with surrounding whitespace
    stripped.
    """
    request_parts = [
        {"mime_type": "application/pdf", "data": pdf_data},
        prompt,
    ]
    answer = model.generate_content(request_parts)
    return answer.text.strip()

# ==============================
# PROMPT
# ==============================

# Prompt passed to call_gemini() by extract_khata(). The "entries" rows it
# requests are what clean_khata_output() scans for a TOTAL row via
# "owner_name".
KHATA_PROMPT = """
You are extracting data from a Karnataka
"Copy of account/patta book" document.

IMPORTANT RULES:

- Output JSON ONLY.
- Do NOT invent fields.
- Preserve values exactly as written.
- Area must remain STRING.
- Ignore watermark text.
- Hobli must be extracted even if faint.

OUTPUT FORMAT:

{
  "document_metadata": {
    "report_title": null,
    "account_number": null,
    "district": null,
    "taluk": null,
    "hobli": null,
    "village": null
  },

  "entries": [
    {
      "serial_number": null,
      "survey_share_number": null,
      "owner_name": null,
      "area": null,
      "shape": null,
      "local_tax": null,
      "health_insurance": null,
      "education_tax": null,
      "total": null
    }
  ],

  "footer": {
    "location": null,
    "date": null,
    "amount": null,
    "certified_copy": null
  }
}

Output JSON only.
"""

# ==============================
# EXTRACTION
# ==============================

def extract_khata(pdf_data):
    """Ask the model for the khata data and return the post-processed result.

    The reply is parsed with clean_json_response and then passed through
    clean_khata_output.
    """
    print("Extracting khata data...")
    raw_reply = call_gemini(pdf_data, KHATA_PROMPT)

    # parse first, then apply the TOTAL-row fix
    return clean_khata_output(clean_json_response(raw_reply))

# ==============================
# MAIN PIPELINE
# ==============================

def run_pipeline(pdf_path):
    """Extract khata data from one PDF and save it as JSON.

    The result is written as "khata_extracted.json" into the directory part
    of ``pdf_path``, and the output path is printed.
    """
    extracted = extract_khata(load_pdf_bytes(pdf_path))

    target_dir = os.path.dirname(pdf_path)
    output_path = os.path.join(target_dir, "khata_extracted.json")

    with open(output_path, "w", encoding="utf-8") as out_file:
        json.dump(extracted, out_file, indent=2, ensure_ascii=False)

    print("Saved:", output_path)

# ==============================
# RUN
# ==============================

# Entry point: run the khata pipeline on the sample PDF path below.
if __name__ == "__main__":
    # Relative path, so it is resolved against the current working directory.
    pdf_path = "../templates/khata/khata_sample.pdf"
    run_pipeline(pdf_path)


Extracting khata data...
Saved: ../templates/khata\khata_extracted.json


## Sale deed

In [None]:
import google.generativeai as genai
import os
import json
import re

# ==============================
# CONFIG
# ==============================

#------------ENTER YOUR API KEY HERE----------------
# API_KEY=....
# NOTE: API_KEY must be assigned before the genai.configure(...) call below;
# while the placeholder line above stays commented out, that call raises
# NameError.

# Name of the Gemini model used for every request made through `model`.
MODEL_NAME = "gemini-2.5-flash"

genai.configure(api_key=API_KEY)

# Module-level model handle used by call_gemini(). temperature 0 and a low
# top_p are passed as generation settings, presumably to keep the extraction
# output as repeatable as possible.
model = genai.GenerativeModel(
    MODEL_NAME,
    generation_config={
        "temperature": 0,
        "top_p": 0.1
    }
)

# ==============================
# JSON CLEANER
# ==============================

def clean_json_response(text):
    """Parse the JSON object contained in a model reply.

    Markdown code fences are removed, then the cleaned text is parsed
    directly. If that fails, the span from the first "{" to the last "}"
    is parsed instead.

    Args:
        text: Raw reply text.

    Returns:
        The decoded JSON value.

    Raises:
        ValueError: "Empty response" when ``text`` is empty, or
            "Could not parse JSON" when neither attempt succeeds (the
            underlying decode error is chained as the cause).
    """
    if not text:
        raise ValueError("Empty response")

    text = text.strip()
    text = re.sub(r"```json", "", text, flags=re.IGNORECASE)
    text = re.sub(r"```", "", text).strip()

    # json.JSONDecodeError subclasses ValueError; catching only that keeps
    # unrelated errors (e.g. KeyboardInterrupt) from being swallowed.
    try:
        return json.loads(text)
    except ValueError:
        pass

    start = text.find("{")
    end = text.rfind("}")

    # Require the closing brace to come after the opening one.
    if start != -1 and end > start:
        try:
            return json.loads(text[start:end + 1])
        except ValueError as err:
            raise ValueError("Could not parse JSON") from err

    raise ValueError("Could not parse JSON")

# ==============================
# HELPERS
# ==============================

def load_pdf_bytes(path):
    """Return the binary contents of the file at ``path``.

    Raises:
        FileNotFoundError: carrying ``path`` when the path does not exist.
    """
    if os.path.exists(path):
        with open(path, "rb") as source:
            blob = source.read()
        return blob

    raise FileNotFoundError(path)


def call_gemini(pdf_data, prompt):
    """Pass the PDF bytes and ``prompt`` to the module-level ``model``.

    Returns the reply's ``text`` attribute with surrounding whitespace
    stripped.
    """
    document = {"mime_type": "application/pdf", "data": pdf_data}
    contents = [document, prompt]
    result = model.generate_content(contents)
    return result.text.strip()

# ==============================
# PROMPT (SALE DEED)
# ==============================

# Prompt passed to call_gemini() by extract_sale_deed(). The blank-field rule
# uses a real arrow character; it had been stored as mis-decoded text
# (mojibake) and was sent to the model in that garbled form.
SALE_DEED_PROMPT = """
Extract structured information from this SALE DEED.

IMPORTANT RULES:

- Output JSON ONLY.
- Do NOT invent values.
- If field is blank in template → null.
- Preserve numbers and text exactly.
- Do NOT summarize legal clauses.

OUTPUT FORMAT:

{
  "document_info": {
    "document_type": "SALE DEED",
    "execution_place": null,
    "execution_date": null
  },

  "seller": {
    "name": null,
    "age": null,
    "father_name": null,
    "address": null
  },

  "purchaser": {
    "name": null,
    "age": null,
    "father_name": null,
    "address": null
  },

  "property_details": {
    "apartment_number": null,
    "floor": null,
    "building_name": null,
    "corporation_number": null,
    "road": null,
    "division_number": null,
    "super_builtup_area": null,
    "undivided_share_percent": null,
    "undivided_share_area": null
  },

  "previous_sale_details": {
    "previous_owner": null,
    "previous_sale_deed_date": null,
    "document_number": null,
    "sub_registrar_office": null,
    "khata_number": null
  },

  "sale_consideration": {
    "amount": null,
    "amount_words": null
  },

  "payment_details": [
    {
      "cheque_number": null,
      "cheque_date": null,
      "bank": null,
      "amount": null
    }
  ],

  "schedule_property": {
    "east": null,
    "west": null,
    "north": null,
    "south": null
  },

  "market_value": null
}

Output JSON only.
"""

# ==============================
# EXTRACTION
# ==============================

def extract_sale_deed(pdf_data):
    """Ask the model for the sale-deed fields and return the parsed JSON."""
    print("Extracting sale deed...")
    raw_reply = call_gemini(pdf_data, SALE_DEED_PROMPT)
    parsed = clean_json_response(raw_reply)
    return parsed

# ==============================
# MAIN PIPELINE
# ==============================

def run_pipeline(pdf_path):
    """Extract sale-deed data from one PDF and save it as JSON.

    The result is written as "sale_deed_extracted.json" into the directory
    part of ``pdf_path``, and the output path is printed.
    """
    extracted = extract_sale_deed(load_pdf_bytes(pdf_path))

    target_dir = os.path.dirname(pdf_path)
    output_path = os.path.join(target_dir, "sale_deed_extracted.json")

    with open(output_path, "w", encoding="utf-8") as out_file:
        json.dump(extracted, out_file, indent=2, ensure_ascii=False)

    print("Saved:", output_path)

# ==============================
# RUN
# ==============================

# Entry point: run the sale-deed pipeline on the template PDF path below.
if __name__ == "__main__":
    # Relative path, so it is resolved against the current working directory.
    pdf_path = "../templates/sale_deed/sale_deed_template.pdf"
    run_pipeline(pdf_path)


Extracting sale deed...
Saved: ../templates/sale_deed\sale_deed_extracted.json


## Handover Summary

1. Use these extraction scripts to extract the user's EC, khata, and sale deed; otherwise the JSON formats will not match.

2. Requirements: pip install google-generativeai python-dotenv (the EC cell imports `dotenv`)