<a href="https://colab.research.google.com/github/elephant-xyz/notebook/blob/main/Mining_County.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# County Mining process

In [None]:
# @title Step 1: Upload .env

In [None]:
# @title Step 2: Upload seed_output.zip
import zipfile
import os
import shutil


def process_seed_output(zip_path, csv_outputs, json_files, prepared_zip_name):
    """Split a seed-output archive into loose CSV files and a JSON-only ZIP.

    The archive is unpacked into a scratch folder (``./temp_unzip``) that is
    always removed afterwards, even when an error occurs.

    Args:
        zip_path: Path of the archive to read.
        csv_outputs: File names to look for anywhere in the archive. Each one
            found is moved into the current working directory; a missing name
            only produces a warning.
        json_files: File names that must all be present somewhere in the
            archive. They are stored, flattened to their base names, in
            ``prepared_zip_name``.
        prepared_zip_name: Path of the ZIP file to create.

    Raises:
        FileNotFoundError: If any name from ``json_files`` is not in the archive.
    """
    temp_dir = "./temp_unzip"
    os.makedirs(temp_dir, exist_ok=True)

    def find_first(file_name):
        # First match in os.walk order, or None when the name is absent.
        for root, _, files in os.walk(temp_dir):
            if file_name in files:
                return os.path.join(root, file_name)
        return None

    try:
        with zipfile.ZipFile(zip_path, "r") as zip_ref:
            zip_ref.extractall(temp_dir)

        # Move each requested CSV out of the scratch folder.
        for csv_name in csv_outputs:
            target_file = find_first(csv_name)
            if target_file:
                shutil.move(target_file, csv_name)
                print(f"✔ {csv_name} extracted to current directory")
            else:
                print(f"⚠ {csv_name} not found in the archive")

        # Keep only the first copy of each requested JSON name. Counting every
        # copy used to make an archive with the same file in two folders fail
        # with an empty "missing" set, and would have written duplicate members.
        wanted = set(json_files)
        json_paths = {}
        for root, _, files in os.walk(temp_dir):
            for f in files:
                if f in wanted:
                    json_paths.setdefault(f, os.path.join(root, f))

        missing = wanted.difference(json_paths)
        if missing:
            raise FileNotFoundError(f"Missing JSON files: {missing}")

        with zipfile.ZipFile(prepared_zip_name, "w") as zip_out:
            for name, path in json_paths.items():
                zip_out.write(path, arcname=name)

        print(f"✔ {prepared_zip_name} created with {', '.join(json_files)}")

    finally:
        # Always drop the scratch folder.
        shutil.rmtree(temp_dir, ignore_errors=True)


# Usage: one call does everything
process_seed_output(
    zip_path="seed_output.zip",
    csv_outputs=["seed.csv", "seed-results.csv"],
    json_files=["property_seed.json", "unnormalized_address.json"],
    prepared_zip_name="prepare-input.zip",
)


✔ seed.csv extracted to current directory
✔ seed-results.csv extracted to current directory
✔ prepare-input.zip created with property_seed.json, unnormalized_address.json


In [None]:
# @title Step 3: Prepare
#!pip install selenium webdriver-manager -q > /dev/null
!pip install playwright==1.46.0 > /dev/null
!python -m playwright install chromium > /dev/null

# Step 3: Prepare (updated with a Playwright "Continue" click fallback)
from dataclasses import dataclass
from pathlib import Path
from typing import Any, NotRequired, TypedDict, cast

import json
import shutil
import tempfile
import zipfile
from urllib.parse import urlencode, urlparse, urlunparse, parse_qsl, quote

import requests
from requests import Session
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

# --- NEW: helper imports for the browser fallback ---
import os, tempfile, time, textwrap



# ---------- Types ----------
class SourceHttpRequest(TypedDict, total=False):
    """HTTP request description stored under ``source_http_request`` in the seed.

    Every key is optional for the type checker (``total=False``).
    """

    method: str  # HTTP verb; the packager upper-cases it, defaults to GET, and accepts only GET/POST
    url: str  # target URL; the packager raises ValueError when it is missing or empty
    multiValueQueryString: dict[str, Any]  # query parameters; a non-empty list value is reduced to its first item
    headers: dict[str, str]  # extra request headers sent with the request
    body: Any  # for POST: sent as the raw ``data`` payload when ``json`` is absent
    json: Any  # for POST: sent as the JSON payload; takes precedence over ``body``


class PropertySeed(TypedDict, total=False):
    """Shape of ``property_seed.json`` as read by the packager."""

    # The output file is named after parcel_id, falling back to
    # request_identifier; at least one of them must be non-blank.
    parcel_id: NotRequired[str]
    request_identifier: NotRequired[str]
    # Optional for the type checker (total=False), but the packager's loader
    # rejects a seed file that lacks this key.
    source_http_request: SourceHttpRequest


@dataclass(frozen=True)
class PackagerConfig:
    """Immutable settings for ``PropertyZipPackager``."""

    input_zip: Path  # archive to read; must contain seed_output/property_seed.json
    output_zip: Path  # archive to write; its parent directory is created if needed
    timeout_sec: float = 30.0  # HTTP request timeout; also the basis for the browser fallback timeout
    retries: int = 3  # used for the total/connect/read/status retry counts of the HTTP session
    backoff_factor: float = 1.6  # passed to urllib3 Retry as backoff_factor
    status_forcelist: tuple[int, ...] = (429, 500, 502, 503, 504)  # HTTP statuses that trigger a retry
    # NEW: enable/disable browser fallback; useful for CI
    enable_browser_fallback: bool = True


# ---------- Public API ----------
class PropertyZipPackager:
    """
    Importable packager:
      - Unpacks input ZIP with seed_output/
      - Reads seed_output/property_seed.json
      - Executes described HTTP request
      - If HTML shows a "Continue" modal, opens a headless Playwright (Chromium) session to click it and re-grab the page
      - Writes response to seed_output/<id>.json if JSON, else <id>.html
      - Repackages as output ZIP
    """

    def __init__(self, config: PackagerConfig, session: Session | None = None) -> None:
        self.cfg = config
        self.session = session or self._build_session()

    def run(self) -> Path:
        """Run the whole pipeline: unpack, fetch, save the response, repack.

        The response is stored in ``seed_output/`` as ``<id>.json`` when its
        body parses as JSON (re-serialised with a 2-space indent), otherwise as
        ``<id>.html`` with the raw text.

        Returns:
            ``self.cfg.output_zip``, the path of the archive that was written.
        """
        self._validate_paths()

        with tempfile.TemporaryDirectory(prefix="prop_zip_packager_") as scratch:
            workdir = Path(scratch)
            self._extract_input_zip(workdir)

            seed_dir = workdir / "seed_output"
            seed = self._load_property_seed(seed_dir / "property_seed.json")

            # Plain text at this point; it may hold either HTML or JSON.
            body_text = self._fetch_html_from_seed(seed)

            # Output file name comes from the seed's identifiers.
            stem = self._resolve_identifier(seed)

            try:
                payload = json.loads(body_text)
            except json.JSONDecodeError:
                # Not JSON: keep the text untouched as an HTML document.
                (seed_dir / f"{stem}.html").write_text(body_text, encoding="utf-8")
            else:
                pretty = json.dumps(payload, ensure_ascii=False, indent=2)
                (seed_dir / f"{stem}.json").write_text(pretty, encoding="utf-8")

            self._write_output_zip(workdir)

        return self.cfg.output_zip

    # ---------- Helpers ----------
    def _build_session(self) -> Session:
        """Create a ``requests`` session with retries and a desktop-Chrome User-Agent.

        The retry counts, backoff factor and retried status codes come from
        ``self.cfg``; the same adapter serves both ``http://`` and ``https://``.
        """
        cfg = self.cfg
        user_agent = (
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
            "AppleWebKit/537.36 (KHTML, like Gecko) "
            "Chrome/124.0 Safari/537.36"
        )
        retry_policy = Retry(
            total=cfg.retries,
            connect=cfg.retries,
            read=cfg.retries,
            status=cfg.retries,
            backoff_factor=cfg.backoff_factor,
            status_forcelist=cfg.status_forcelist,
            allowed_methods={"GET", "POST"},
            raise_on_status=False,
            respect_retry_after_header=True,
        )
        adapter = HTTPAdapter(max_retries=retry_policy)

        session = requests.Session()
        session.headers.update({"User-Agent": user_agent})
        for scheme in ("http://", "https://"):
            session.mount(scheme, adapter)
        return session

    def _validate_paths(self) -> None:
        if not self.cfg.input_zip.exists():
            raise FileNotFoundError(f"Input ZIP not found: {self.cfg.input_zip}")
        self.cfg.output_zip.parent.mkdir(parents=True, exist_ok=True)

    def _extract_input_zip(self, workdir: Path) -> None:
        with zipfile.ZipFile(self.cfg.input_zip, "r") as zf:
            zf.extractall(workdir)

        seed_dir = workdir / "seed_output"
        if not seed_dir.is_dir():
            raise ValueError("Input ZIP must contain a 'seed_output/' directory.")
        if not (seed_dir / "property_seed.json").is_file():
            raise ValueError("Missing 'seed_output/property_seed.json' in input ZIP.")

    def _load_property_seed(self, path: Path) -> PropertySeed:
        try:
            data = json.loads(path.read_text(encoding="utf-8"))
        except json.JSONDecodeError as e:
            raise ValueError(f"Invalid JSON in {path}: {e}") from e
        if not isinstance(data, dict) or "source_http_request" not in data:
            raise ValueError("property_seed.json must include 'source_http_request'")
        return cast(PropertySeed, data)

    def _resolve_identifier(self, seed: PropertySeed) -> str:
        pid = (seed.get("parcel_id") or "").strip()
        rid = (seed.get("request_identifier") or "").strip()
        ident = pid or rid
        if not ident:
            raise ValueError("property_seed.json must include 'parcel_id' or 'request_identifier' for output filename.")
        return ident

    def _normalize_query_params(self, mvqs: dict[str, Any] | None) -> dict[str, Any]:
        if not mvqs:
            return {}
        params: dict[str, Any] = {}
        for k, v in mvqs.items():
            if isinstance(v, list) and v:
                params[k] = v[0]
            else:
                params[k] = v
        return params

    # NEW: build a full URL from base and params (for the browser path)
    def _build_url(self, base: str, params: dict[str, Any]) -> str:
        if not params:
            return base
        parsed = urlparse(base)
        existing = dict(parse_qsl(parsed.query, keep_blank_values=True))
        existing.update({k: "" if v is None else str(v) for k, v in params.items()})
        new_qs = urlencode(existing, doseq=True, quote_via=quote)
        return urlunparse(parsed._replace(query=new_qs))

    def _fetch_html_from_seed(self, seed: PropertySeed) -> str:
        # (kept name for compatibility) — returns text that might be HTML or JSON
        src = seed.get("source_http_request", {})
        method = (src.get("method") or "GET").upper()
        url = src.get("url")
        if not url:
            raise ValueError("source_http_request.url is required")

        params = self._normalize_query_params(
            cast(dict[str, Any] | None, src.get("multiValueQueryString"))
        )
        headers = cast(dict[str, str] | None, src.get("headers")) or {}
        body = src.get("body")
        json_body = src.get("json")

        headers.setdefault("Accept", "application/json, text/html;q=0.9")

        kwargs: dict[str, Any] = {"timeout": self.cfg.timeout_sec, "headers": headers}
        if method == "GET":
            kwargs["params"] = params
        elif method == "POST":
            kwargs["params"] = params
            if json_body is not None:
                kwargs["json"] = json_body
            elif body is not None:
                kwargs["data"] = body
        else:
            raise ValueError(f"Unsupported HTTP method: {method}")

        # First attempt: plain requests
        resp = self.session.request(method, url, **kwargs)
        resp.raise_for_status()
        encoding = resp.encoding or resp.apparent_encoding
        html_or_json = resp.content.decode(encoding or "utf-8", errors="replace")

        # If not HTML, just return (JSON path)
        if "application/json" in (resp.headers.get("Content-Type") or ""):
            return html_or_json

        # Detection: does it look like the modal with a continue button?
        if self.cfg.enable_browser_fallback and self._looks_like_continue_modal(html_or_json):
            # Minimal browser pass to click "Continue"
            final_html = self._fetch_via_browser_click_continue(
                full_url=self._build_url(url, params),
                timeout=self.cfg.timeout_sec,
            )
            # If the browser got us something meaningful, return it; else fall back to original
            if final_html:
                return final_html

        return html_or_json
   # NEW: simple heuristic for the “Continue” modal
    def _looks_like_continue_modal(self, html: str) -> bool:
      # noisy but robust: sees the modal container and any continue cue
      h = html.lower()
      return (("id=\"pnlissues\"" in h) or ("id='pnlissues'" in h)) and (
          "btncontinue" in h or "value=\"continue\"" in h or ">continue<" in h
      )

    def _fetch_via_browser_click_continue(self, full_url: str, timeout: float) -> str | None:
      """
      Async Playwright fallback that works even if an event loop is already running.
      - Deep logs at each step
      - Searches all frames for the Continue button
      - Clicks: normal -> JS -> __doPostBack('btnContinue','')
      - Dumps before/after PNG + HTML to a temp folder
      """
      # Local imports—avoid NameError and allow graceful fallback if Playwright missing
      try:
          import asyncio, os, tempfile, time, threading, queue
          from playwright.async_api import async_playwright, TimeoutError as PWTimeout
      except Exception as e:
          print("[fallback] Playwright (async) not available. Install with:")
          print("[fallback]   pip install playwright==1.46.0 && python -m playwright install chromium")
          print(f"[fallback] import error: {e}")
          return None

      t_ms = int(max(2000, timeout * 1000))
      issue_panel_sel = "#pnlIssues"
      continue_selectors = [
          '#pnlIssues input[name="btnContinue"]',
          '#pnlIssues input[value="Continue"]',
          '#pnlIssues button[value="Continue"]',
          '#btnContinue',  # top-level fallback
      ]
      property_markers = [
          "#parcelLabel",
          ".sectionTitle",
          "table.detailsTable",
          ".textPanel",
          "[id*='Property']",
      ]

      dbg_dir = os.path.join(tempfile.gettempdir(), f"prop_zip_debug_{int(time.time())}")
      os.makedirs(dbg_dir, exist_ok=True)

      def log(msg: str):
          print(f"[fallback] {msg}", flush=True)

      async def dump(page, name: str):
          try:
              png = os.path.join(dbg_dir, f"{name}.png")
              html = os.path.join(dbg_dir, f"{name}.html")
              await page.screenshot(path=png, full_page=True)
              with open(html, "w", encoding="utf-8") as f:
                  f.write(await page.content())
              log(f"saved debug artifacts: {png} , {html}")
          except Exception as e:
              log(f"dump error ({name}): {e}")

      async def is_visible(el) -> bool:
          try:
              box = await el.bounding_box()
              if not box:
                  return False
              disp = await el.evaluate("el => getComputedStyle(el).display")
              vis = await el.evaluate("el => getComputedStyle(el).visibility")
              return disp != "none" and vis != "hidden"
          except Exception:
              return False

      async def do_flow() -> str | None:
          async with async_playwright() as pw:
              log(f"launching browser → {full_url}")
              browser = await pw.chromium.launch(headless=True, args=["--no-sandbox", "--disable-dev-shm-usage"])
              context = await browser.new_context(
                  user_agent=(
                      "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
                      "AppleWebKit/537.36 (KHTML, like Gecko) "
                      "Chrome/124.0 Safari/537.36"
                  ),
                  viewport={"width": 1280, "height": 2000},
                  locale="en-US",
                  extra_http_headers={
                      "Accept-Language": "en-US,en;q=0.9",
                      "Accept": "text/html,application/xhtml+xml",
                  },
              )
              page = await context.new_page()

              page.on("console", lambda m: log(f"page.console[{m.type}] {m.text}"))
              page.on("pageerror", lambda e: log(f"page.error {e}"))

              await page.goto(full_url, wait_until="networkidle", timeout=t_ms)
              await dump(page, "00_after_goto")

              # Quick modal presence check
              modal_present = False
              try:
                  await page.wait_for_selector(issue_panel_sel, state="visible", timeout=8000)
                  modal_present = True
                  log("modal detected: #pnlIssues is visible")
              except PWTimeout:
                  log("modal not immediately visible; will still search for Continue and markers")

              async def find_in_any_frame(selectors):
                  # returns (frame, element_handle, selector_used) or (None, None, None)
                  for fr in page.context.pages[0].frames:
                      try:
                          fr_url = fr.url
                      except Exception:
                          fr_url = "<unknown>"
                      for sel in selectors:
                          try:
                              el = await fr.query_selector(sel)
                              if el and await is_visible(el):
                                  return fr, el, sel
                              elif el:
                                  log(f"element found but not visible in frame {fr_url} via selector: {sel}")
                          except Exception as e:
                              log(f"query error in frame {fr_url} sel {sel}: {e}")
                  return None, None, None

              # Attempt click
              clicked = False
              fr, btn, used_sel = await find_in_any_frame(continue_selectors)

              if btn:
                  await dump(page, "01_before_click")
                  try:
                      await btn.click(timeout=min(5000, t_ms))
                      clicked = True
                  except Exception as e:
                      try:
                          await fr.evaluate("(el)=>el.click()", btn)
                          clicked = True
                      except Exception as e2:
                          log(f"JS click failed: {e2}")

              # Fallback: call ASP.NET postback
              if not clicked:
                  try:
                      has_postback = await page.evaluate("() => typeof window.__doPostBack === 'function'")
                      log(f"__doPostBack present? {has_postback}")
                  except Exception as e:
                      has_postback = False
                      log(f"error checking __doPostBack: {e}")
                  if has_postback:
                      try:
                          await page.evaluate("__doPostBack('btnContinue','')")
                          clicked = True
                          log("__doPostBack('btnContinue','') invoked")
                      except Exception as e:
                          log(f"__doPostBack failed: {e}")

              # Wait for modal to hide OR property markers to appear
              condition_hit = "none"
              if clicked or modal_present:
                  try:
                      await page.wait_for_selector(issue_panel_sel, state="hidden", timeout=min(15000, t_ms))
                      condition_hit = "modal_hidden"
                  except PWTimeout:
                      for sel in property_markers:
                          try:
                              await page.wait_for_selector(sel, state="visible", timeout=4000)
                              condition_hit = f"property_marker:{sel}"
                              break
                          except PWTimeout:
                              continue

              await dump(page, f"02_after_click_{condition_hit}")

              # Summary
              summary_bits = [f"url={page.url}", f"clicked={clicked}", f"condition={condition_hit}"]
              try:
                  pnl = await page.query_selector(issue_panel_sel)
                  exists = pnl is not None
                  summary_bits.append(f"modal_node_exists={exists}")
                  if pnl:
                      summary_bits.append(f"modal_visible={await is_visible(pnl)}")
              except Exception:
                  pass


              html = await page.content()
              await context.close()
              await browser.close()
              return html

      # Run the async flow safely whether or not a loop is already running
      try:
          loop = asyncio.get_event_loop()
          if loop.is_running():
              # Run in a dedicated thread with its own event loop
              q = queue.Queue()

              def runner():
                  new_loop = asyncio.new_event_loop()
                  asyncio.set_event_loop(new_loop)
                  try:
                      res = new_loop.run_until_complete(do_flow())
                      q.put(res)
                  except Exception as e:
                      print(f"[fallback] thread runner error: {e}")
                      q.put(None)
                  finally:
                      new_loop.close()

              t = threading.Thread(target=runner, daemon=True)
              t.start()
              t.join()
              return q.get()
          else:
              return loop.run_until_complete(do_flow())
      except RuntimeError:
          # No loop: create one
          return asyncio.run(do_flow())
      except Exception as e:
          return None





    # Helper to build an XPath for a found element (stable click wait)
    def _to_xpath(self, element) -> str:
        # Selenium doesn’t expose a direct “getXPath”, so we try a simple robust path
        # If it has a name attr, prefer that
        name = element.get_attribute("name")
        if name:
            return f"//*[@name={json.dumps(name)}]"
        value = element.get_attribute("value")
        if value:
            return f"//*[@value={json.dumps(value)}]"
        # Fallback: tag/index (not ideal but acceptable here)
        tag = element.tag_name.lower()
        return f"//{tag}"

    def _write_output_zip(self, workdir: Path) -> None:
        src_dir = Path(workdir) / "seed_output"
        tmp_path = self.cfg.output_zip.with_suffix(self.cfg.output_zip.suffix + ".tmp")

        with zipfile.ZipFile(tmp_path, "w", compression=zipfile.ZIP_DEFLATED) as zf:
            for p in sorted(src_dir.rglob("*")):
                if p.is_file():
                    arcname = p.relative_to(workdir).as_posix()
                    zf.write(p, arcname)

        shutil.move(tmp_path, self.cfg.output_zip)


# ---------- Example usage ----------
# Runs the packager when the cell/script executes as __main__: reads
# seed_output.zip and writes output.zip in the current working directory.
if __name__ == "__main__":
    cfg = PackagerConfig(
        input_zip=Path("seed_output.zip"),
        output_zip=Path("output.zip"),
        timeout_sec=30.0,
        enable_browser_fallback=True,  # set False to force pure-requests mode
    )
    packager = PropertyZipPackager(cfg)
    # run() returns the output ZIP path; it is not needed here.
    packager.run()


!unzip -q output.zip
!cd seed_output && zip -q ../prepared-county.zip ./*.html ./*.json && cd ..
!rm -rf seed_output/ output.zip


In [None]:
# @title Step 4: Upload your generated-scripts.zip

In [None]:
# @title Step 5: Transform
import subprocess
import sys

# Run the Elephant CLI "transform" command on the prepared archive.
# check=True makes a non-zero exit raise CalledProcessError; without it the
# except branch could never run and "Transform done" was printed even when the
# command failed. text=True decodes stderr so it can be stripped and printed.
try:
    subprocess.run(
        [
            "npx", "-y", "@elephant-xyz/cli@latest", "transform",
            "--input-zip", "prepared-county.zip",
            "--output-zip", "transformed-county.zip",
            "--scripts-zip", "generated-scripts.zip"
        ],
        stdout=subprocess.DEVNULL,
        stderr=subprocess.PIPE,
        check=True,
        text=True,
    )
    print("✅ Transform done\n")
except subprocess.CalledProcessError as e:
    print(f"Command failed (exit code {e.returncode}):", file=sys.stderr)
    print(e.stderr.strip(), file=sys.stderr)
    sys.exit(e.returncode)


✅ Transform done



In [None]:
# @title Step 6: Validate
!pip3 install python-dotenv -q

from dotenv import load_dotenv
load_dotenv()

import subprocess
import sys
import csv


def has_submit_errors(path="submit_errors.csv"):
    """Return True when the errors CSV holds at least one data row.

    A header-only (or empty) file means no errors. A missing file is also
    treated as "no errors" instead of raising FileNotFoundError.

    Args:
        path: Location of the errors CSV.
    """
    try:
        with open(path, newline='', encoding='utf-8') as csvfile:
            return next(csv.DictReader(csvfile), None) is not None
    except FileNotFoundError:
        return False


def run_validate():
    """Validate ``transformed-county.zip`` with the Elephant CLI and report the outcome.

    Failure is reported on stderr both when the command exits non-zero and when
    ``submit_errors.csv`` contains rows afterwards; nothing is raised for either.
    """
    failure_note = "❌ Validate failed, please check submit_errors.csv for details"
    command = ["npx", "-y", "@elephant-xyz/cli@latest", "validate", "transformed-county.zip"]

    print("Validation started")
    try:
        subprocess.run(
            command,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.PIPE,
            check=True,
            text=True,
        )
    except subprocess.CalledProcessError:
        print(failure_note, file=sys.stderr)
        return

    # The command can exit cleanly and still have recorded errors.
    if has_submit_errors():
        print(failure_note, file=sys.stderr)
        return

    print("\n✅ Validate done\n")


# Start validation as soon as this cell/script executes as __main__.
if __name__ == "__main__":
    run_validate()

Validation started


❌ Validate failed, please check submit_errors.csv for details


In [None]:
# @title Step 7: Hash

!pip3 install python-dotenv -q

from dotenv import load_dotenv
load_dotenv()

import subprocess
import sys
import csv


def has_submit_errors(path="submit_errors.csv"):
    """Return True when the errors CSV holds at least one data row.

    A header-only (or empty) file means no errors. A missing file is also
    treated as "no errors" instead of raising FileNotFoundError.

    Args:
        path: Location of the errors CSV.
    """
    try:
        with open(path, newline='', encoding='utf-8') as csvfile:
            return next(csv.DictReader(csvfile), None) is not None
    except FileNotFoundError:
        return False


def get_seed_cid(path="seed-results.csv"):
    """Return the ``propertyCid`` value from the first data row of the CSV.

    Raises:
        ValueError: If the CSV has no data rows.
        KeyError: If the first row has no ``propertyCid`` column.
    """
    with open(path, newline='', encoding='utf-8') as csvfile:
        for row in csv.DictReader(csvfile):
            # Only the first row matters.
            return row["propertyCid"]
    raise ValueError("CSV file is empty")


def run_hash():
    """Hash ``transformed-county.zip`` with the Elephant CLI.

    The property CID is taken from the first row of ``seed-results.csv``. The
    command writes ``hashed-data.zip`` and ``county-results.csv``. When the
    command exits non-zero, its stderr is printed and the process exits with
    the same code.
    """
    hash_command = [
        "npx", "-y", "@elephant-xyz/cli@latest",
        "hash", "transformed-county.zip",
        "--output-zip", "hashed-data.zip",
        "--output-csv", "county-results.csv",
        "--property-cid", get_seed_cid(),
    ]

    try:
        subprocess.run(
            hash_command,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.PIPE,
            check=True,
            text=True,
        )
    except subprocess.CalledProcessError as e:
        print(f"Command failed (exit code {e.returncode}):", file=sys.stderr)
        print(e.stderr.strip(), file=sys.stderr)
        sys.exit(e.returncode)

    # NOTE(review): the message below mentions validation although this is the
    # hash step - confirm whether that wording is intended.
    if has_submit_errors():
        print("❌ Validate failed, please check submit_errors.csv for details", file=sys.stderr)
        return

    print("✅ Hash done\n")


# Start hashing as soon as this cell/script executes as __main__.
if __name__ == "__main__":
    run_hash()



✅ Hash done



In [None]:
# @title Step 8: Upload
from dotenv import load_dotenv
load_dotenv()

import subprocess
import sys


def get_upload_results(path="county-results.csv"):
    """Return the first data row of the results CSV as a dict.

    Args:
        path: Location of the results CSV.

    Raises:
        ValueError: If the CSV has no data rows.
    """
    # This cell never imports csv at the top, so running it on a fresh kernel
    # used to fail with NameError; import it where it is needed.
    import csv

    with open(path, newline='', encoding='utf-8') as csvfile:
        first_row = next(csv.DictReader(csvfile), None)
    if first_row is None:
        raise ValueError("CSV file is empty")
    return first_row


def upload():
    """Upload ``hashed-data.zip`` with the Elephant CLI and print the resulting links.

    The CLI writes ``county-results.csv``; its first row supplies ``dataCid``
    and ``htmlLink``. When the command exits non-zero, its stderr is printed
    and the process exits with the same code.
    """
    # NOTE(review): this step pins the CLI to 1.28.4 while the other steps use
    # @latest - confirm the pin is intentional.
    upload_command = [
        "npx", "-y", "@elephant-xyz/cli@1.28.4",
        "upload", "hashed-data.zip",
        "--output-csv", "county-results.csv",
    ]

    try:
        subprocess.run(
            upload_command,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.PIPE,
            check=True,
            text=True,
        )
    except subprocess.CalledProcessError as e:
        print(f"Command failed (exit code {e.returncode}):", file=sys.stderr)
        print(e.stderr.strip(), file=sys.stderr)
        sys.exit(e.returncode)

    first_row = get_upload_results()
    data_cid = first_row["dataCid"]
    html_link = first_row["htmlLink"]

    print("✅ Upload done\n")
    print(f"HTML link: {html_link}\n")
    print(f"County group IPFS link: https://ipfs.io/ipfs/{data_cid}\n")


# Start the upload as soon as this cell/script executes as __main__.
if __name__ == "__main__":
    upload()


✅ Upload done

HTML link: https://ipfs.io/ipfs/bafybeid5h2d5fftqqilum37c44omikrxviadyyq2y7nj2hv2n2ir6jsh5i

County group IPFS link: https://ipfs.io/ipfs/bafkreide37afbaresmt4kreryo533p27cqg7v5ix5o3kdpstoedonxghqu



In [None]:
# @title Step 9 (Optional): Upload your own encrypted JSON Wallet (keystore)

import os

# Form fields: the trailing "# @param" markers are part of the notebook form
# definition and must stay on these lines.
keystore_filename = "keystore.json" # @param {"type":"string"}
password = "" # @param {"type":"string"}


# Fail fast: both a non-empty password and an existing keystore file are
# required before anything is exported.
if not password:
    raise Exception("password cannot be empty! Please, provide your password")
if not os.path.exists(keystore_filename):
    raise Exception(f"File {keystore_filename} not found! Please, upload your keystore file and provide it's name")


# Publish the choice through environment variables; the Submit step reads
# KEYSTORE_FILENAME / KEYSTORE_PASSWORD with os.getenv.
os.environ["KEYSTORE_FILENAME"] = keystore_filename
os.environ["KEYSTORE_PASSWORD"] = password

print("✅ Keystore uploaded")


In [None]:
# @title Step 10: Submit

! pip3 install python-dotenv -q

from dotenv import load_dotenv
load_dotenv()

import subprocess
import sys
import csv


def get_transaction_hash(path="transaction-status.csv"):
    """Return the ``transactionHash`` value from the first data row of the CSV.

    Raises:
        ValueError: If the CSV has no data rows.
        KeyError: If the first row has no ``transactionHash`` column.
    """
    with open(path, newline='', encoding='utf-8') as csvfile:
        for row in csv.DictReader(csvfile):
            # Only the first row matters.
            return row["transactionHash"]
    raise ValueError("CSV file is empty")


def has_submit_errors(path="submit_errors.csv"):
    """Return True when the errors CSV holds at least one data row.

    A header-only (or empty) file means no errors. A missing file is also
    treated as "no errors" instead of raising FileNotFoundError.

    Args:
        path: Location of the errors CSV.
    """
    try:
        with open(path, newline='', encoding='utf-8') as csvfile:
            return next(csv.DictReader(csvfile), None) is not None
    except FileNotFoundError:
        return False


def run_submit_to_contract():
    """Submit ``seed-results.csv`` to the contract through the Elephant CLI.

    Uses the keystore named by the ``KEYSTORE_FILENAME`` / ``KEYSTORE_PASSWORD``
    environment variables when ``KEYSTORE_FILENAME`` is set; otherwise uses the
    built-in oracle credentials. On success the transaction link is printed.
    When the command exits non-zero, its stderr is printed and the process
    exits with the same code.
    """
    # This cell does not import os itself; import it here so the step also
    # works when the optional keystore cell was skipped on a fresh kernel.
    import os

    base_command = [
        "npx", "-y", "@elephant-xyz/cli@latest", "submit-to-contract", "seed-results.csv",
    ]
    # The original if/else used mismatched indentation (2 vs 4 spaces), which
    # is an IndentationError; both branches are now aligned.
    if os.getenv("KEYSTORE_FILENAME"):
        submit_command = base_command + [
            "--keystore-json", os.getenv("KEYSTORE_FILENAME"),
            "--keystore-password", os.getenv("KEYSTORE_PASSWORD"),
        ]
    else:
        # NOTE(review): credentials are hard-coded in the notebook source.
        # Consider loading them from the .env file read by load_dotenv().
        submit_command = base_command + [
            "--from-address", "0xefAd08946612A15d5De8D4Db7fc03556b6424075",
            "--api-key", "f7e18cf6-5d07-4e4a-ae23-f27b812614e6",
            "--domain", "oracles-69c46050.staircaseapi.com",
            "--oracle-key-id", "7ad26e0b-67c9-4c2f-95a2-2792c7db5ac7",
        ]

    try:
        subprocess.run(
            submit_command,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.PIPE,
            check=True,
            text=True,
        )
        if has_submit_errors():
            print("❌ Submit failed, please check submit_errors.csv for details", file=sys.stderr)
            return

        transaction_hash = get_transaction_hash()
        transaction_link = f"https://polygonscan.com/tx/{transaction_hash}"

        print("✅ Submit done\n")
        print(f"Transaction link: {transaction_link}")

    except subprocess.CalledProcessError as e:
        print(f"Command failed (exit code {e.returncode}):", file=sys.stderr)
        print(e.stderr.strip(), file=sys.stderr)
        sys.exit(e.returncode)


# Start the submission as soon as this cell/script executes as __main__.
if __name__ == "__main__":
    run_submit_to_contract()


✅ Submit done

Transaction link: https://polygonscan.com/tx/0x26551cdfbf80ac026d20afc03295bb9f6c67b1ce964b518c3c92fcc00327240f


In [None]:
# @title Step 11: Download county-results.csv
import os
from google.colab import files

# Offer county-results.csv as a browser download when it exists.
if os.path.exists('county-results.csv'):
    files.download('county-results.csv')
    print("✅ File was downloaded successfully")
else:
    print("❌ File not found")


<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

✅ File was downloaded successfully
