In [None]:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import os
import json
import time
import pandas as pd
from pydantic import BaseModel, Field
from typing import Literal, get_args
from openpyxl import Workbook, load_workbook
import re
from bs4 import BeautifulSoup

# new SDK:
from google import genai
from google.genai import types

# progress bar
from tqdm import tqdm

# ─── CONFIG ───────────────────────────────────────────────────────────────────

# Read the Gemini API key from the GOOGLE_API_KEY environment variable.
GEMINI_API_KEY = os.environ.get("GOOGLE_API_KEY")
if GEMINI_API_KEY is None or GEMINI_API_KEY == "":
    # Stop at import time: the client below cannot be built without a key.
    raise RuntimeError("Missing environment variable: GOOGLE_API_KEY")

# Module-level Gemini client, constructed once with that key.
client = genai.Client(api_key=GEMINI_API_KEY)

# ─── TYPES & PROMPT ───────────────────────────────────────────────────────────

# Closed set of sector labels the model is allowed to answer with.
SectorType = Literal[
    "Government/Public Sector",
    "Finance and Insurance",
    "Technology",
    "Telecommunications",
    "Real Estate",
    "Healthcare",
    "Retail",
    "Manufacturing",
    "Entertainment",
    "Education",
    "Energy",
    "Automotive",
    "Hospitality",
    "Transportation and Logistics",
    "Food and Beverage",
    "Nonprofit/NGO",
    "Agriculture",
    "Other",
]
# Allowed judgment-outcome labels.
JudgmentOutcomeType = Literal["Plaintiff", "Defendant", "Undecided"]

# Comma-separated sector names, interpolated into the prompt below.
sector_list = ", ".join(get_args(SectorType))

# Instruction text sent ahead of every case; one list entry per prompt line
# (empty entries are the blank lines between paragraphs).
system_prompt = "\n".join([
    "You are a legal and business analyst.",
    'Analyze legal case texts and provide answers in JSON format with the keys "sector" and "judgment_outcome".',
    "",
    f"1. Identify which USA tertiary sector the case belongs to from the following list: {sector_list}.",
    '   If none apply, respond with "Other".',
    "",
    "2. Determine the judgment outcome: Plaintiff, Defendant, or Undecided.",
    "",
    "Respond only with a JSON object. Example:",
    '{"sector": "Technology", "judgment_outcome": "Plaintiff"}',
])

# ─── CLEANING ─────────────────────────────────────────────────────────────────

def clean_text(text: str) -> str:
    """Strip markup from *text* and return the remaining visible text.

    The input is parsed with BeautifulSoup's ``lxml`` parser; the text nodes
    are stripped and joined with single spaces.

    Args:
        text: Raw (possibly HTML) case text. ``None`` or an empty string is
            accepted, because a JSON record may carry ``"plain_text": null``.

    Returns:
        The cleaned text, or ``""`` when there is nothing to clean.
    """
    # Guard before parsing: BeautifulSoup raises on None, and an empty
    # document yields "" anyway.
    if not text:
        return ""
    soup = BeautifulSoup(text, "lxml")
    return soup.get_text(separator=" ", strip=True)

# ─── MODEL OUTPUT PARSING ─────────────────────────────────────────────────────

class LegalAnalysisResult(BaseModel):
    # Validated shape of the model's JSON answer: both keys are required and
    # each must be one of the literals declared in its type alias.
    sector: SectorType = Field(
        ...,
        description="The USA tertiary sector the case belongs to",
    )
    judgment_outcome: JudgmentOutcomeType = Field(
        ...,
        description="Plaintiff, Defendant, or Undecided",
    )

def extract_sector_and_outcome(case_text: str, max_retries: int = 5) -> LegalAnalysisResult | None:
    """Ask Gemini to classify one case and return the validated answer.

    ``system_prompt`` followed by *case_text* is sent to
    ``gemini-1.5-flash-latest`` with a JSON response MIME type, and the reply
    text is validated through ``LegalAnalysisResult``.

    Args:
        case_text: Cleaned text of a single case.
        max_retries: How many times to retry after an error whose message
            contains ``RESOURCE_EXHAUSTED``. Each retry is preceded by a
            30-second sleep. Retrying used to be unbounded recursion, which
            never terminates if the quota is exhausted for good.

    Returns:
        The parsed result, or ``None`` if the call or the validation failed
        (the error is printed).
    """
    retries_allowed = max(max_retries, 0)
    for attempt in range(retries_allowed + 1):
        try:
            resp = client.models.generate_content(
                model="gemini-1.5-flash-latest",
                contents=[system_prompt, case_text],
                config=types.GenerateContentConfig(
                    temperature=0.0,
                    top_k=1,
                    top_p=0.0,
                    max_output_tokens=512,
                    response_mime_type="application/json"
                )
            )
            return LegalAnalysisResult.parse_raw(resp.text)
        except Exception as e:
            # Only quota errors are worth waiting for; everything else
            # (including a reply that fails validation) is reported at once.
            if "RESOURCE_EXHAUSTED" in str(e) and attempt < retries_allowed:
                print("Rate limit hit. Sleeping for 30 seconds before retry...")
                time.sleep(30)
                continue
            print(f"Gemini error processing case: {e}")
            return None
    return None

# ─── EXCEL UTIL ───────────────────────────────────────────────────────────────

def write_result_to_excel(file_path: str, row: list):
    """Append *row* to the workbook at *file_path*, creating it if needed.

    A missing file is started with the header ``id, sector,
    judgment_outcome``. The workbook is saved after every call so results
    already written survive an interrupted run.

    Args:
        file_path: Path of the ``.xlsx`` file to append to.
        row: Cell values for the new row, in header order.
    """
    if os.path.exists(file_path):
        wb = load_workbook(file_path)
        ws = wb.active
    else:
        # Build the new workbook in memory; the previous version saved the
        # header-only file and immediately re-read it from disk.
        wb = Workbook()
        ws = wb.active
        ws.append(["id", "sector", "judgment_outcome"])

    ws.append(row)
    wb.save(file_path)

# ─── MAIN ─────────────────────────────────────────────────────────────────────

def main(
    input_file: str = "/content/remaining.json",
    output_file: str = "/content/analysis(remaining).xlsx",
) -> None:
    """Classify every case in *input_file* and append results to *output_file*.

    For each record the "id" and "plain_text" keys are read, the text is
    cleaned, sent to the model, and the answer (if any) is appended to the
    Excel file straight away.

    Args:
        input_file: Path of the JSON file holding the case records.
        output_file: Path of the ``.xlsx`` file results are appended to.
    """
    with open(input_file, "r", encoding="utf-8") as f:
        raw_json = f.read()

    try:
        cases = json.loads(raw_json)
    except json.JSONDecodeError as e:
        print(f"Error decoding JSON: {e}")
        # Retry leniently: strict=False accepts raw control characters such
        # as literal newlines inside strings. Replacing every "\n" in the
        # file (the old approach) also rewrote the newlines between tokens
        # and so produced invalid JSON.
        try:
            cases = json.loads(raw_json, strict=False)
        except json.JSONDecodeError as e2:
            print(f"Could not fix JSON: {e2}")
            return

    # Wrap with tqdm, show total, update every record
    for idx, case in enumerate(tqdm(cases, desc="Processing cases", unit="case"), start=1):
        cid = case.get("id")
        # `or ""` also covers records where "plain_text" is present but null.
        text_raw = case.get("plain_text") or ""
        text = clean_text(text_raw)
        if not text:
            tqdm.write(f"Skipping {cid}: empty text")
            continue

        analysis = extract_sector_and_outcome(text)
        if analysis:
            tqdm.write(f"{cid} → Sector: {analysis.sector}, Outcome: {analysis.judgment_outcome}")
            write_result_to_excel(output_file, [cid, analysis.sector, analysis.judgment_outcome])
        else:
            tqdm.write(f"{cid} → No result")

        # Print a special notice every 500 records
        if idx % 500 == 0:
            tqdm.write(f"✅ {idx} records processed so far")


if __name__ == "__main__":
    main()


In [None]:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import os
import json
import time
import pandas as pd
from openpyxl import Workbook, load_workbook

# Gemini client imports
from google import genai
from google.genai import types

# ─── CONFIG ───────────────────────────────────────────────────────────────────
# API key comes from the GOOGLE_API_KEY environment variable.
GEMINI_API_KEY = os.environ.get("GOOGLE_API_KEY")
if GEMINI_API_KEY in (None, ""):
    # Abort immediately; the client below needs a key.
    raise RuntimeError("Missing environment variable: GOOGLE_API_KEY")

# Gemini client shared by the functions in this cell.
client = genai.Client(api_key=GEMINI_API_KEY)

# ─── PROMPT SETUP ─────────────────────────────────────────────────────────────
# Instruction text sent ahead of every act string.
# Two defects in the prompt wording are fixed here: the key was misspelled as
# "coun" (the JSON key is "act_count"), and a missing newline fused two
# instructions into "...do not count nansif the input is non empty...".
system_prompt = (
    "You are a legal expert in USA.\n"
    "Each input is a semicolon‑separated string of act names.\n"
    "if the input is null or nan or empty, set act_count to 0 and move to next row, do not count nans\n"
    "if the input is non empty then Split on `;`, extract the unique act names, count them, and respond only in JSON "
    "with these two keys:\n"
    "  1. \"unique_acts\" (an array of strings)\n"
    "  2. \"act_count\" (integer)\n"
    "Do not include any extra text or formatting."
)

def write_row_to_excel(output_path: str, row_id: str, row_json: dict):
    """
    Append a single row to the Excel file, keyed by row_id.
    Creates the file with header if it doesn't exist.

    Args:
        output_path: Path of the ``.xlsx`` file to append to.
        row_id: Value written to the "id" column.
        row_json: Parsed model answer; "unique_acts" and "act_count" are read
            from it. Because it comes from a model, it is treated as
            untrusted: a non-dict answer, a null or non-list "unique_acts",
            or non-string entries no longer raise.
    """
    cols = ["id", "unique_acts", "act_count"]
    if os.path.exists(output_path):
        wb = load_workbook(output_path)
        ws = wb.active
    else:
        # Build the new workbook in memory instead of saving the header-only
        # file and re-reading it from disk.
        wb = Workbook()
        ws = wb.active
        ws.append(cols)

    if not isinstance(row_json, dict):
        row_json = {}

    unique_acts = row_json.get("unique_acts") or []  # missing or null -> empty
    if isinstance(unique_acts, str):
        # A bare string would otherwise be joined character by character.
        unique_acts = [unique_acts]

    act_count = row_json.get("act_count", 0)  # Default to 0 if not found
    if act_count is None:
        act_count = 0

    ws.append([
        row_id,
        ", ".join(str(act) for act in unique_acts),
        act_count
    ])
    wb.save(output_path)

def process_acts(acts_str: str, max_retries: int = 5) -> dict | None:
    """
    Call Gemini 1.5 Flash to extract unique acts and count them.

    Args:
        acts_str: Semicolon-separated act names for one row.
        max_retries: How many times to retry after an error whose message
            contains ``RESOURCE_EXHAUSTED`` (30-second sleep before each
            retry). Retrying used to be unbounded recursion.

    Returns:
        The parsed JSON dict, a zero-count default dict when the reply is not
        a JSON object, or None on failure.
    """
    user_prompt = (
        f"Act names: {acts_str}\n"
        "(Values are separated by ';')"
    )

    retries_allowed = max(max_retries, 0)
    for attempt in range(retries_allowed + 1):
        try:
            resp = client.models.generate_content(
                model="gemini-1.5-flash-latest",
                contents=[system_prompt, user_prompt],
                config=types.GenerateContentConfig(
                    temperature=0.0,
                    top_k=1,
                    top_p=0.0,
                    max_output_tokens=256,
                    response_mime_type="application/json"
                )
            )
            raw = resp.text
            if raw is None:
                # Keep "no text at all" a failure rather than a zero count.
                raise ValueError("model returned no text")
        except Exception as e:
            print(f"[Error] processing acts: {e}")
            if "RESOURCE_EXHAUSTED" in str(e) and attempt < retries_allowed:
                time.sleep(30)
                continue
            return None

        # Attempt to parse the response as JSON
        # If parsing fails, return a dictionary with default values
        try:
            result = json.loads(raw)
        except json.JSONDecodeError:
            result = None
        if not isinstance(result, dict):
            # Valid JSON that is not an object (e.g. a bare array) would
            # break callers that use .get(), so it gets the default too.
            print(f"[Warning] Could not parse JSON response for acts_str: {acts_str}")
            result = {"unique_acts": [], "act_count": 0}
        return result
    return None

def main():
    """Extract unique acts for every row of the notebook-level ``acts_df``.

    Each row's "id" and "ACT_names" values are read. Rows with no id are
    skipped. Rows with no act names (None, NaN or blank) are written with a
    zero count directly, without a model call: a NaN cell is truthy, so it
    used to be sent to the API as the text "nan". Id ``0`` is no longer
    treated as missing.
    """
    output_path = "/content/output_acts_additional.xlsx"  # where results will be written

    def is_missing(value) -> bool:
        """Return True for None, NaN/NA, or a blank string."""
        if value is None:
            return True
        if isinstance(value, str):
            return not value.strip()
        try:
            return bool(pd.isna(value))
        except (TypeError, ValueError):
            # Non-scalar values (e.g. lists) are not "missing".
            return False

    for _, r in acts_df.iterrows():
        row_id   = r.get("id", "")
        acts_str = r.get("ACT_names", "")
        if is_missing(row_id):
            print(f"Skipping row id={row_id!r}: missing data")
            continue

        if is_missing(acts_str):
            # Same answer the prompt asks the model to give for empty input.
            result = {"unique_acts": [], "act_count": 0}
        else:
            result = process_acts(acts_str)

        if result:
            write_row_to_excel(output_path, row_id, result)
            print(f"Wrote id={row_id} (count={result.get('act_count', 0)})")
        else:
            print(f"Failed to process id={row_id}")


if __name__ == "__main__":
    main()

In [None]:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import os
import json
import time
import re
import pandas as pd
from openpyxl import Workbook, load_workbook
from google import genai
from google.genai import types

# ─── CONFIG ───────────────────────────────────────────────────────────────────
# The key is taken from the GOOGLE_API_KEY environment variable.
GEMINI_API_KEY = os.environ.get("GOOGLE_API_KEY")
if len(GEMINI_API_KEY or "") == 0:
    # Unset or empty: refuse to continue without credentials.
    raise RuntimeError("Missing environment variable: GOOGLE_API_KEY")
# Gemini client used by process_acts() below.
client = genai.Client(api_key=GEMINI_API_KEY)

# ─── PROMPT ───────────────────────────────────────────────────────────────────
# Instruction text sent ahead of every act string; one list entry per line of
# the prompt, joined with newlines.
system_prompt = "\n".join([
    "You are a legal expert in USA.",
    "Each input is a semicolon‑separated string of act names.If there is no semicolon, check the text if it has any act in it",
    "If the input is null, nan, or empty, respond with:",
    '  { "unique_acts": [], "act_count": 0 }',
    "Otherwise, split on `;`, dedupe, count, and respond ONLY with JSON:",
    "{",
    '  "unique_acts": [ ... ],',
    '  "act_count": integer',
    "}",
    "No extra text.",
])

def write_row_to_excel(output_path: str, row_id: str, data: dict):
    """Append one ``id, unique_acts, act_count`` row to *output_path*.

    The file is created with that header if it does not exist, and saved
    after every call.

    Args:
        output_path: Path of the ``.xlsx`` file to append to.
        row_id: Value written to the "id" column.
        data: Parsed model answer; "unique_acts" and "act_count" are read
            from it. Because it comes from a model, it is treated as
            untrusted: a non-dict answer, a null or non-list "unique_acts",
            or non-string entries no longer raise.
    """
    cols = ["id", "unique_acts", "act_count"]
    if os.path.exists(output_path):
        wb = load_workbook(output_path)
        ws = wb.active
    else:
        # Build the new workbook in memory instead of saving the header-only
        # file and re-reading it from disk.
        wb = Workbook()
        ws = wb.active
        ws.append(cols)

    if not isinstance(data, dict):
        data = {}

    acts = data.get("unique_acts") or []  # missing or null -> empty
    if isinstance(acts, str):
        # A bare string would otherwise be joined character by character.
        acts = [acts]

    count = data.get("act_count", 0)
    if count is None:
        count = 0

    ws.append([
        row_id,
        ", ".join(str(act) for act in acts),
        count
    ])
    wb.save(output_path)

def process_acts(acts_str: str) -> dict:
    """Ask Gemini for the unique acts in *acts_str* and return the parsed dict.

    The first ``{...}`` span of the reply is parsed as JSON. A reply with no
    text, one that is not valid JSON, or one that is not a JSON object is
    reported with a warning and replaced by a zero-count default, so callers
    can always use ``.get()`` on the result.

    Args:
        acts_str: Semicolon-separated act names for one row.

    Returns:
        A dict, normally with "unique_acts" and "act_count" keys.

    Raises:
        Whatever the Gemini client raises; API errors are not caught here.
    """
    user_prompt = f"Act names: {acts_str}\n(Values are separated by ';')"
    resp = client.models.generate_content(
        model="gemini-1.5-flash-latest",
        contents=[system_prompt, user_prompt],
        config=types.GenerateContentConfig(
            temperature=0.0,
            top_k=1,
            top_p=0.0,
            max_output_tokens=1024,
            response_mime_type="application/json"
        )
    )
    # resp.text may be None when the reply has no text part; calling
    # .strip() on it used to raise AttributeError.
    raw = (resp.text or "").strip()
    # grab first {...} block
    m = re.search(r'\{.*\}', raw, flags=re.DOTALL)
    js = m.group() if m else raw
    try:
        parsed = json.loads(js)
    except json.JSONDecodeError:
        parsed = None
    if not isinstance(parsed, dict):
        print(f"[Warning] Could not parse JSON:\n{raw}\nDefaulting to zero.")
        return {"unique_acts": [], "act_count": 0}
    return parsed

def main():
    """Extract unique acts for every row of the notebook-level ``problem_rows``.

    Each row's "id" and "ACT_names" values are read and every row with an id
    gets one output row. Missing values are detected before converting to
    ``str``: ``str(NaN)`` is the non-empty text "nan", so missing cells used
    to be sent to the model and missing ids were written as "nan".
    """
    output_path = "output_acts.xlsx"

    def as_clean_str(value) -> str:
        """Return the stripped string form of *value*, or "" if missing."""
        if value is None:
            return ""
        if not isinstance(value, str):
            try:
                if bool(pd.isna(value)):
                    return ""
            except (TypeError, ValueError):
                # Non-scalar values cannot be NaN; fall through to str().
                pass
        return str(value).strip()

    # iterate over the already‐loaded problem_rows DataFrame
    for _, row in problem_rows.iterrows():
        row_id   = as_clean_str(row.get("id", ""))
        acts_str = as_clean_str(row.get("ACT_names", ""))

        if not row_id:
            print("Skipping row with missing id")
            continue

        # Always show the original text
        print(f"\nRow id={row_id}")
        print(f"Original ACT_names: {acts_str!r}")

        if not acts_str:
            data = {"unique_acts": [], "act_count": 0}
            print("→ Empty input, count=0")
        else:
            data = process_acts(acts_str)
            # Show the extracted unique acts
            print(f"→ Extracted unique_acts ({len(data.get('unique_acts', []))}):")
            for act in data.get("unique_acts", []):
                print(f"   - {act}")

        write_row_to_excel(output_path, row_id, data)
        print(f"Wrote to Excel: id={row_id}, act_count={data.get('act_count', 0)}")

        # Brief pause between rows.
        time.sleep(0.1)


if __name__ == "__main__":
    main()


In [None]:
import pandas as pd
import os

# Load the case-law metadata and citation analysis sheets from Google Drive.
# NOTE(review): drive.mount() is only called near the end of this cell, so
# these reads presumably rely on Drive already being mounted — confirm.
case_law_metadata = pd.read_excel(r"/content/drive/MyDrive/FHA/Thesis_data/caselaw_meta.xlsx")
citations_analysis_final = pd.read_excel(r"/content/drive/MyDrive/FHA/Thesis_data/citations_analysis_final.xlsx")

# Rename "opinion_id" (if the metadata sheet has it) so every frame joins on "id".
case_law_metadata = case_law_metadata.rename(columns={"opinion_id": "id"})

# Outer join on "id": rows present in only one of the two frames are kept.
# The old indicator=True column was dropped right after each merge, so it is
# simply not requested any more.
merged = pd.merge(
    citations_analysis_final,
    case_law_metadata,
    on="id",
    how="outer",
)

label_analysis_final = pd.read_excel(r"/content/drive/MyDrive/FHA/Thesis_data/label_analysis_final.xlsx")

# Second outer join, again on "id", adds the columns of label_analysis_final.
final_df = pd.merge(
    merged,
    label_analysis_final,
    on="id",
    how="outer",
)

import pandas as pd

def _count_unique_items(cell, sep=';'):
    """Return how many distinct *sep*-separated entries *cell* holds.

    Missing cells (NaN/None) count as 0. Entries are compared exactly as
    written, with no whitespace stripping, which matches the previous
    counting. Non-string cells are converted with ``str`` instead of raising
    on ``.split``.
    """
    if pd.isna(cell):
        return 0
    # A set replaces pd.unique(list): passing a plain list to pd.unique is
    # deprecated in recent pandas releases.
    return len(set(str(cell).split(sep)))


# one count per row of final_df, in row order
statute_counts  = [_count_unique_items(value) for value in final_df['STATUTE_codes']]
case_law_counts = [_count_unique_items(value) for value in final_df['CASE_LAW_names']]
rules_counts    = [_count_unique_items(value) for value in final_df['RULES_codes']]

# attach back to final_df
final_df['unique_STATUTE_codes']   = statute_counts
final_df['unique_CASE_LAW_names']  = case_law_counts
final_df['unique_RULES_codes']     = rules_counts


# Load the act results sheet; presumably the output of the act-extraction
# cells above (the file name differs from theirs — confirm).
try:
    df_add = pd.read_excel("/content/output1_acts_updated.xlsx")
except FileNotFoundError:
    print("Error: File '/content/output1_acts_updated.xlsx' not found.")
    # Stop with a non-zero status. exit() is an interactive-shell helper and
    # reported success (status 0) even on this error path.
    raise SystemExit(1) from None

# Merge the DataFrames on 'id'; the left join keeps every row of final_df.
final_df = pd.merge(final_df, df_add, on='id', how='left')

# The raw act-name column is dropped after the merge.
final_df = final_df.drop(columns=['ACT_names'])

from google.colab import drive

# Mount Google Drive at /content/drive so the path below is writable.
drive.mount('/content/drive')

# Write final_df to Drive as an Excel file, without the index column.
final_df.to_excel(
    excel_writer='/content/drive/MyDrive/FHA/Thesis_data/final_df.xlsx',
    index=False,
)

