<a href="https://colab.research.google.com/github/rpa-KMG/Gen-Data-AI/blob/main/GenData_Agent.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

***INSTALLATION OF FLASK, OPENPYXL AND PYNGROK***

In [None]:

# @title
!pip install flask openpyxl pyngrok

***GENDATA AI - SOURCE CODE***


In [None]:
# @title
# ======================= IMPORTS =======================

from flask import Flask, request, send_file, render_template_string
from difflib import SequenceMatcher
from datetime import datetime
from openpyxl import load_workbook
from pyngrok import ngrok
import random
import pandas as pd
import json
import tempfile
import os
import re
import random
from google.colab import drive
drive.mount('/content/drive')
from openpyxl.utils.cell import coordinate_to_tuple

def load_location_master(schema):
    """Load the location master table referenced by a schema.

    The schema may carry a ``"LocationMaster"`` entry whose ``"source"`` is
    the path of an ``.xlsx`` or CSV file. Relative paths are resolved
    against ``SCHEMA_DIR``.

    Args:
        schema: Parsed schema dictionary.

    Returns:
        A list of row dictionaries (one per record in the file), or ``None``
        when the schema has no location master or no ``source`` path.
    """
    loc = schema.get("LocationMaster")
    if not loc:
        return None

    path = loc.get("source")
    if not path:
        # A LocationMaster entry without a source cannot be loaded; treat it
        # like a missing entry instead of failing inside os.path.isabs(None).
        return None

    if not os.path.isabs(path):
        path = os.path.join(SCHEMA_DIR, path)

    if path.lower().endswith(".xlsx"):
        df = pd.read_excel(path)
    else:
        df = pd.read_csv(path)

    return df.to_dict(orient="records")

def is_merged_follower_cell(ws, cell):
    """Tell whether *cell* is a non-anchor cell of a merged range.

    Only the top-left (anchor) cell of a merged range can hold a value, so
    callers use this to skip the remaining cells of the range.

    Args:
        ws: Worksheet exposing ``merged_cells.ranges``.
        cell: Cell exposing ``coordinate``, ``row`` and ``column``.

    Returns:
        ``True`` if the cell lies inside a merged range but is not its
        top-left cell, otherwise ``False``.
    """
    for merged_range in ws.merged_cells.ranges:
        if cell.coordinate in merged_range:
            # Read the anchor from the named attributes. ``bounds`` is
            # ordered (min_col, min_row, max_col, max_row), so unpacking it
            # as (row, col, ...) swaps the axes and misclassifies anchors
            # such as C5.
            anchor = (merged_range.min_row, merged_range.min_col)
            if (cell.row, cell.column) != anchor:
                return True
    return False


# ======================= HTML =======================

# Jinja2 template for the single upload page. It is rendered with
# render_template_string by the "/" and "/generate" routes; the optional
# `error` variable, when set, is shown in a red banner above the form.
# The form posts multipart data to /generate with the fields `appname`,
# `excel` (the uploaded template) and `records`.
HTML_FORM = """
<!doctype html>
<html>
<head>
  <title>GenData AI</title>
  <style>
    body {
      font-family: Arial;
      background:#f4f6f9;
      display:flex;
      justify-content:center;
      align-items:center;
      height:100vh;
      margin:0;
    }
    .box {
      background:white;
      padding:30px;
      border-radius:10px;
      width:420px;
      box-shadow:0 4px 10px rgba(0,0,0,.1);
    }
    h2 { text-align:center; margin-bottom:5px; }
    h4 { text-align:center; margin:0 0 20px 0; font-weight:normal; color:#555; }

    form {
      display:flex;
      flex-direction:column;
      gap:14px;   /* single source of spacing */
    }

    label {
      font-weight:bold;
      margin:0;
    }

    input, select {
      width:100%;
      padding:10px;
      border-radius:5px;
      border:1px solid #ccc;
      font-size:14px;
      box-sizing:border-box;
    }

    button {
      width:100%;
      padding:12px;
      background:#4CAF50;
      color:white;
      border:none;
      border-radius:5px;
      font-size:15px;
      cursor:pointer;
      margin-top:10px;
    }

    button:hover {
      background:#43a047;
    }
  </style>
</head>

<body>
<div class="box">
  <h2>GenData AI</h2>
  <h4>Your smart test data engineer — one Excel away</h4>
  {% if error %}
    <div style="
      background:#ffe6e6;
      color:#b30000;
      padding:10px;
      border-radius:5px;
      text-align:center;
      font-size:14px;
    ">
      {{ error }}
    </div>
    {% endif %}
  <form method="POST" action="/generate" enctype="multipart/form-data">

    <label>Application Name</label>
    <select name="appname" required>
      <option value="" disabled selected>Select Application</option>
      <option value="Default">Default</option>
      <option value="AQ">AQ</option>
      <option value="IMS">IMS</option>
      <option value="SOV">SOV</option>
    </select>

    <label>Upload Excel Template</label>
    <input type="file" name="excel" required>

    <label>Number of Records</label>
    <input type="number" name="records" min="1" value="5">

    <button type="submit">Generate Test Data</button>

  </form>
</div>
</body>
</html>
"""

# ======================= CONFIG =======================

# NOTE(security): an ngrok auth token is a credential and should not live in
# source control. Supply it through the NGROK_AUTHTOKEN environment variable;
# the literal below is kept only so existing notebooks keep working and
# should be rotated and removed.
NGROK_TOKEN = os.environ.get(
    "NGROK_AUTHTOKEN",
    "38sfJj9wqC1283b2W7BRfNhyRFV_7cc4r8GdBXMKj6kks3VQ5",
)

# Directory holding the per-application schema JSON files (and any location
# master files referenced by relative path). Override with GENDATA_SCHEMA_DIR.
SCHEMA_DIR = os.environ.get(
    "GENDATA_SCHEMA_DIR",
    "/content/drive/MyDrive/Colab Notebooks",
)



# ======================= LOAD SCHEMAS =======================

def load_schema(appname):
    """Return the parsed schema JSON for *appname*, or ``None`` if absent.

    The schema file is looked up in ``SCHEMA_DIR`` as ``<appname>.json``,
    compared case-insensitively; the first matching directory entry wins.
    """
    matching = (
        entry
        for entry in os.listdir(SCHEMA_DIR)
        if entry.lower() == f"{appname.lower()}.json"
    )
    found = next(matching, None)
    if found is None:
        return None

    schema_path = os.path.join(SCHEMA_DIR, found)
    with open(schema_path, "r") as handle:
        return json.load(handle)

def load_all_schemas():
    """Load every schema JSON file found in ``SCHEMA_DIR``.

    Files are matched on a case-insensitive ``.json`` extension (the same
    convention ``load_schema`` uses) and read in sorted name order, so the
    order of the returned list does not depend on the arbitrary order of
    ``os.listdir``.

    Returns:
        A list with the parsed content of each schema file.
    """
    schemas = []
    for file in sorted(os.listdir(SCHEMA_DIR)):
        if not file.lower().endswith(".json"):
            continue
        # JSON is UTF-8 by specification; do not rely on the locale default.
        with open(os.path.join(SCHEMA_DIR, file), "r", encoding="utf-8") as f:
            schemas.append(json.load(f))
    return schemas

# ======================= DATA GENERATOR =======================

from datetime import datetime, timedelta
import random

def generate_value(rule, row_index, row_context):
    """Produce one cell value for a column according to its schema rule.

    The rule's ``"generation"`` key (default ``"random"``) selects the
    strategy:

    * ``derived_from_location`` - read ``rule["field"]`` from the location
      record stored under ``row_context["__location__"]``.
    * ``sequence`` / ``sequence_text`` - running number, optionally prefixed.
    * ``fixed`` / ``current_date`` - constant value or today's date.
    * ``conditional_random`` - random pick from ``rule["values"]`` unless the
      optional ``"<column> != '<value>'"`` condition is not met.
    * ``random`` / ``optional_random`` - random value by ``"datatype"``.
    * ``calculated`` / ``derived`` - lookup map, date arithmetic, text
      formula or numeric formula over values already in ``row_context``.

    Args:
        rule: Column rule dictionary from the schema.
        row_index: Zero-based index of the generated row.
        row_context: Values generated so far for this row, keyed by header.

    Returns:
        The generated value; ``""`` when a derived value cannot be computed;
        ``None`` when the rule does not describe anything to generate.
    """
    gen = rule.get("generation", "random")

    if gen == "derived_from_location":
        location = row_context.get("__location__")
        if not location:
            return None
        return location.get(rule["field"])

    dt = rule.get("datatype")

    # ========= SEQUENCE =========

    if gen == "sequence":
        return rule.get("start", 1) + (row_index * rule.get("step", 1))

    if gen == "sequence_text":
        prefix = rule.get("prefix", "")
        return f"{prefix}{row_index + 1}"

    # ========= FIXED =========

    if gen == "fixed":
        return rule.get("value")

    if gen == "current_date":
        fmt = rule.get("format", "%m/%d/%Y")
        return datetime.today().strftime(fmt)

    # ========= CONDITIONAL =========

    if gen == "conditional_random":
        condition = rule.get("condition")
        if condition:
            # Condition has the form "<parent column> != '<blocked value>'".
            parent, _, value = condition.partition("!=")
            blocked = value.strip().strip("'")
            if row_context.get(parent.strip()) in [None, "", blocked]:
                return ""
        # Without a condition the rule behaves like a plain random choice.
        return random.choice(rule["values"])

    # ========= RANDOM =========

    if gen in ["random", "optional_random"]:

        # Random string from values list
        if dt == "string" and "values" in rule:
            return random.choice(rule["values"])

        # Integer
        if dt == "integer":
            return random.randint(
                rule.get("min_value", 0),
                rule.get("max_value", 100)
            )

        # Currency / Decimal
        if dt in ["currency", "decimal"]:
            val = random.uniform(
                rule.get("min_value", 0),
                rule.get("max_value", 100)
            )
            return round(val, 2)

        # Percentage (list OR range)
        if dt == "percentage":
            if "values" in rule:
                return random.choice(rule["values"])
            return round(random.uniform(
                rule.get("min_value", 0),
                rule.get("max_value", 100)
            ), 2)

        # Random date between min_date and max_date (inclusive)
        if dt == "date":
            start = datetime.strptime(rule["min_date"], "%Y-%m-%d")
            end = datetime.strptime(rule["max_date"], "%Y-%m-%d")
            delta_days = (end - start).days
            return start + timedelta(days=random.randint(0, delta_days))

    # ========= CALCULATED / DERIVED =========

    if gen in ["calculated", "derived"]:
        if "formula_map" in rule:
            parent_val = row_context.get(rule.get("depends_on"))
            return rule["formula_map"].get(parent_val, "")
        if "map" in rule:
            parent_val = row_context.get(rule.get("depends_on"))
            return rule["map"].get(parent_val, "")

        formula = rule.get("formula")
        if not formula:
            return None

        # ----- DATE: add_years(<column>, <n>) -----
        if dt == "date" and "add_years" in formula:
            match = re.search(r"add_years\((.*?),\s*(\d+)\)", formula)
            if match:
                base_col = match.group(1).strip()
                years = int(match.group(2))

                base_date = row_context.get(base_col)
                if not base_date:
                    return ""   # nothing to derive from

                try:
                    if isinstance(base_date, str):
                        base_date = datetime.strptime(base_date, "%m/%d/%Y")
                    # Feb 29 plus a non-leap offset has no valid result.
                    new_date = base_date.replace(year=base_date.year + years)
                except Exception:
                    return ""

                fmt = rule.get("format", "%m/%d/%Y")
                return new_date.strftime(fmt)

        # ----- DATE: formula is the name of another date column -----
        if dt == "date" and formula in row_context:
            base_date = row_context.get(formula)
            if not base_date:
                return ""
            fmt = rule.get("format", "%m/%d/%Y")
            try:
                if isinstance(base_date, str):
                    base_date = datetime.strptime(base_date, fmt)
                return base_date.strftime(fmt)
            except Exception:
                return ""

        # NOTE(security): both formula kinds below are evaluated with eval().
        # This is only acceptable while schema files are trusted; never feed
        # user-supplied formulas through this path.

        # ----- TEXT FORMULA (Address / Location) -----
        if any(ch.isalpha() for ch in formula) and "'" in formula:
            try:
                return eval(f"f'''{formula}'''", {}, row_context)
            except Exception:
                return ""

        # ----- NUMERIC FORMULA -----
        safe = formula

        # Substitute every numeric value of the row for its column name.
        for k, v in row_context.items():
            if v is None:
                continue

            if isinstance(v, (int, float)) or str(v).replace('.', '', 1).isdigit():
                safe = re.sub(rf'\b{re.escape(k)}\b', str(v), safe)

        try:
            return round(eval(safe), 2)
        except Exception:
            return ""

    return None




# ======================= EXCEL PROCESSOR =======================

def header_match(excel_header, schema_header):
    """Tell whether an Excel header names the same column as a schema header.

    Both headers are passed through ``normalize`` and must then be exactly
    equal. Empty or missing headers never match.

    Args:
        excel_header: Header value read from the worksheet.
        schema_header: Column name from the schema.

    Returns:
        ``True`` when the normalized headers are identical, else ``False``.
    """
    if not excel_header or not schema_header:
        return False

    # The unreachable similarity-score fallback that used to follow this
    # comparison referenced undefined names and has been removed; matching
    # is exact on the normalized text.
    return normalize(excel_header) == normalize(schema_header)


def normalize(text):
    """Reduce a header to a comparable key of lowercase letters and digits.

    ``None`` becomes ``""``. Otherwise the value is stringified and
    lowercased, line breaks are turned into spaces, a single leading letter
    followed by whitespace (a column-letter prefix) is dropped, and every
    character other than ``a-z`` / ``0-9`` is removed.
    """
    if text is None:
        return ""

    # Lowercase and flatten Excel line breaks in one go.
    flattened = str(text).lower().replace("\n", " ")

    # Drop a leading column letter such as "c " or "h ".
    without_column_letter = re.sub(r'^[a-z]\s+', '', flattened)

    # Keep only letters and digits.
    return re.sub(r"[^a-z0-9]", "", without_column_letter)


def find_header_row(ws):
    """Locate the row that looks most like a header row.

    For every row the longest run of adjacent cells holding non-blank
    strings is measured. The first row with the longest such run is
    returned as a 1-based row number, or ``None`` if no row contains a
    non-blank string.
    """
    def _is_label(value):
        return isinstance(value, str) and value.strip() != ""

    winner = None
    winner_run = 0

    for row_no in range(1, ws.max_row + 1):
        longest = 0
        current = 0

        for col_no in range(1, ws.max_column + 1):
            if _is_label(ws.cell(row_no, col_no).value):
                current += 1
                if current > longest:
                    longest = current
            else:
                current = 0

        # Strictly greater, so earlier rows win ties.
        if longest > winner_run:
            winner, winner_run = row_no, longest

    return winner

def build_headers(ws, header_row):
    """Combine a header row with the row beneath it into one label per column.

    For each column the cell in ``header_row`` (top) and the cell below it
    (bottom) are inspected:

    * if the bottom cell is a string containing any of ``+ - = to and or``,
      only the top value is used;
    * if both are set, the label is ``"<top> <bottom>"``;
    * otherwise whichever one is set is used, or ``None`` if neither is.

    Returns:
        A list with one entry per worksheet column.
    """
    markers = ["+", "-", "=", "to", "and", "or"]
    labels = []

    for col in range(1, ws.max_column + 1):
        upper = ws.cell(header_row, col).value
        lower = ws.cell(header_row + 1, col).value

        # A bottom cell that reads like a calculation or range is not part
        # of the header text.
        looks_calculated = (
            bool(lower)
            and isinstance(lower, str)
            and any(marker in lower.lower() for marker in markers)
        )

        if looks_calculated:
            label = upper
        elif upper and lower:
            label = f"{upper} {lower}"
        else:
            label = upper or lower or None

        labels.append(label)

    return labels

def has_multirow_header(ws, row):
    """Return ``True`` when the row below *row* has more than three filled cells.

    A cell counts as filled when its value is neither ``None`` nor ``""``.
    """
    below = row + 1
    filled_cells = sum(
        1
        for col in range(1, ws.max_column + 1)
        if ws.cell(below, col).value not in (None, "")
    )
    return filled_cells > 3

def is_multirow_header(ws, row):
    """Report whether the row following *row* holds more than three values.

    Cells whose value is ``None`` or the empty string are ignored.
    """
    next_row_values = [
        ws.cell(row + 1, col).value
        for col in range(1, ws.max_column + 1)
    ]
    populated = [value for value in next_row_values if value not in (None, "")]
    return len(populated) > 3

def _find_rule(header, schemas):
    """Return the first non-empty column rule matching *header*, else ``None``."""
    for schema in schemas:
        for col_name, col_rule in schema.get("columns", {}).items():
            if header_match(header, col_name):
                if col_rule:
                    return col_rule
                # An empty rule is unusable: stop scanning this schema and
                # fall through to the next one.
                break
    return None


def generate_excel(input_path, output_path, records, appname):
    """Fill every sheet of an Excel template with generated test rows.

    For each sheet the header row is detected, each header is matched to a
    schema column rule, and ``records`` rows are written directly below the
    header row (existing content in those rows is overwritten). Each row is
    generated in two passes so that calculated/derived columns can read the
    values produced for the other columns of the same row.

    Args:
        input_path: Path of the uploaded template workbook.
        output_path: Path the filled workbook is saved to.
        records: Number of rows to generate per sheet.
        appname: Application whose schema to use; ``"Default"`` loads every
            schema found in the schema directory.
    """
    # Load schemas
    if appname != "Default":
        schema = load_schema(appname)
        schemas = [schema] if schema else []
    else:
        schemas = load_all_schemas()

    # Location master comes from the first schema only.
    location_data = None
    if schemas and schemas[0]:
        location_data = load_location_master(schemas[0])

    wb = load_workbook(input_path)

    # One location per row index, shared by all sheets so that the same row
    # number refers to the same location everywhere.
    dataset_locations = {}
    if location_data:
        dataset_locations = {
            i: random.choice(location_data) for i in range(records)
        }

    derived_kinds = ("calculated", "derived")

    for sheet in wb.sheetnames:
        ws = wb[sheet]

        # ---- Detect header row ----
        header_row = find_header_row(ws)
        if not header_row:
            continue

        # ---- Read headers (only the main header row is trusted) ----
        headers = [
            ws.cell(header_row, c).value
            for c in range(1, ws.max_column + 1)
        ]
        print("Detected headers:", headers)

        # Data is written starting on the row right below the header.
        start_row = header_row + 1

        # ---- Resolve the rule of every column once per sheet ----
        # The match depends only on the header text and the schemas, so it
        # is the same for every generated row.
        direct_columns = []
        derived_columns = []
        for c, header in enumerate(headers, start=1):
            if not header:
                continue
            rule = _find_rule(header, schemas)
            if not rule:
                continue
            if rule.get("generation") in derived_kinds:
                derived_columns.append((c, header, rule))
            else:
                direct_columns.append((c, header, rule))

        # ======================= DATA GENERATION LOOP =======================

        for row_index in range(records):
            r = start_row + row_index
            row_context = {}

            if location_data:
                row_context["__location__"] = dataset_locations[row_index]

            # -------- PASS 1: independent columns --------
            for c, header, rule in direct_columns:
                cell = ws.cell(r, c)
                if is_merged_follower_cell(ws, cell):
                    continue

                value = generate_value(rule, row_index, row_context)
                row_context[header] = value
                cell.value = value

            # -------- PASS 2: calculated / derived columns --------
            for c, header, rule in derived_columns:
                # A header already filled for this row is left alone.
                if header in row_context:
                    continue

                cell = ws.cell(r, c)
                if is_merged_follower_cell(ws, cell):
                    continue

                value = generate_value(rule, row_index, row_context)
                row_context[header] = value
                cell.value = value

    wb.save(output_path)


# ======================= FLASK =======================

# Flask application object; the route handlers below register on it.
app = Flask(__name__)

@app.route("/", methods=["GET"])
def home():
    """Serve the upload form without an error banner."""
    page = render_template_string(HTML_FORM)
    return page

@app.route("/generate", methods=["POST"])
def generate():
    """Handle the upload form: validate input, fill the template, return it.

    Reads ``appname``, ``records`` and the uploaded ``excel`` file from the
    request. When a specific application is selected, the uploaded file name
    must contain one of that application's keywords; otherwise the form is
    re-rendered with an error message. On success the filled workbook is
    sent back as a download named ``<original>_Output_<timestamp>.xlsx``.
    """
    appname = request.form["appname"]
    file = request.files["excel"]

    # The browser enforces a positive integer, but the server must not rely
    # on that: report bad values on the form instead of failing with a 500.
    try:
        records = int(request.form["records"])
    except ValueError:
        return render_template_string(
            HTML_FORM,
            error="Number of Records must be a whole number."
        )
    if records < 1:
        return render_template_string(
            HTML_FORM,
            error="Number of Records must be at least 1."
        )

    filename = file.filename.lower()

    # Appname vs file validation
    app_file_keywords = {
        "IMS": ["ims"],
        "SOV": ["sov", "schedule of value", "schedule-of-value"],
        "Quincy": ["quincy"],
        "AQ": ["aq"],
    }

    if appname != "Default":
        keywords = app_file_keywords.get(appname, [])
        if not any(k in filename for k in keywords):
            return render_template_string(
                HTML_FORM,
                error=f"Selected application '{appname}' does not match uploaded file '{file.filename}'."
            )

    # The file name is supplied by the client: keep only its last path
    # component so the output path cannot leave the temp directory.
    client_name = os.path.basename(file.filename.replace("\\", "/"))
    original_name = os.path.splitext(client_name)[0]
    timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")

    output_filename = f"{original_name}_Output_{timestamp}.xlsx"
    output_path = os.path.join(tempfile.gettempdir(), output_filename)

    with tempfile.NamedTemporaryFile(delete=False, suffix=".xlsx") as inp:
        file.save(inp.name)
        input_path = inp.name

    try:
        generate_excel(input_path, output_path, records, appname)
    finally:
        # The uploaded copy is no longer needed once processing has ended.
        try:
            os.remove(input_path)
        except OSError:
            pass

    return send_file(
        output_path,
        as_attachment=True,
        download_name=output_filename
    )


# ======================= NGROK =======================

# Register the auth token, open a tunnel to local port 5000 and print the
# resulting tunnel object.
ngrok.set_auth_token(NGROK_TOKEN)
public_url = ngrok.connect(5000)
print("Public URL:", public_url)
# Diagnostic: raw listing of the schema directory.
print(os.listdir(SCHEMA_DIR))

# Same listing again, this time with a label.
print("Schema directory files:", os.listdir(SCHEMA_DIR))

# Smoke test: check that the "SOV" schema can be found and parsed.
test_schema = load_schema("SOV")
if test_schema:
    print("SOV schema loaded")
    print("SOV columns count:", len(test_schema.get("columns", {})))
else:
    print("SOV schema NOT loaded")

# Start the Flask server on the same port the tunnel was opened for.
app.run(port=5000)