Data was collected using my https://github.com/rbhughes/purr_petra FastAPI service, which admittedly performs a lot of opinionated "Silver"-level data reconciliation before the results are serialized to JSON. I will likely fork purr_petra into a simple CLI. To import the JSON files into a Databricks volume:

```
bryan@ichabod mac_bucket % for file in *_well.json; do
  databricks fs cp "$file" dbfs:/Volumes/geodata/petra/well_raw/
done
```


In [None]:
import sys

# Put the parent directory first on the module search path so that the
# `shared` package imported by later cells can be found. The entry is a
# relative path, so it is resolved against the current working directory.
# NOTE(review): assumes the notebook runs with notebooks/ as its cwd — confirm.
sys.path.insert(0, "../")  # Go up one directory from notebooks/ to src/

In [None]:
# Load every raw well JSON document found in the volume directory into a
# single DataFrame and render it for a quick visual check.
df = spark.read.json("/Volumes/geodata/petra/well_raw/")

display(df)

In [None]:
from shared.transforms import string_to_iso_date, generate_hash, replace_10e30_with_null
import pyspark.sql.functions as F

# Flatten the nested well document into one wide row per well.
# Each pair is (nested source path, flat output column name); the order of
# this list is the order of the output columns after the leading repo_id.
nested_to_flat = [
    ("uwi.uwi", "uwi"),
    ("uwi.wsn", "wsn"),
    ("uwi.label", "uwi_label"),
    ("uwi.sortname", "sortname"),
    ("bhloc.lat", "bottom_lat"),
    ("bhloc.lon", "bottom_lon"),
    ("locat.congress", "congress"),
    ("locat.lat", "surface_lat"),
    ("locat.lon", "surface_lon"),
    ("locat.x", "surface_x"),
    ("locat.y", "surface_y"),
    ("well.adddate", "app_row_created"),
    ("well.chgdate", "app_row_changed"),
    ("well.county", "county"),
    ("well.elev_fid", "elev_fid"),
    ("well.elev_zid", "elev_zid"),
    ("well.fieldname", "fieldname"),
    ("well.fmattd", "fmattd"),
    ("well.histoper", "histoper"),
    ("well.label", "well_label"),
    ("well.leasename", "leasename"),
    ("well.leasenumber", "leasenumber"),
    ("well.operator", "operator"),
    ("well.prodfm", "prodfm"),
    ("well.remarks", "remarks"),
    ("well.shortname", "shortname"),
    ("well.state", "state"),
    ("well.symbol", "symbol"),
    ("well.symcode", "symcode"),
    ("well.wellname", "wellname"),
    ("zdata.aband_date", "aband_date"),
    ("zdata.active_datum_value", "active_datum_value"),
    ("zdata.comp_date", "comp_date"),
    ("zdata.cumgas", "cumgas"),
    ("zdata.cumoil", "cumoil"),
    ("zdata.cumwtr", "cumwtr"),
    ("zdata.elev_df", "elev_df"),
    ("zdata.elev_gr", "elev_gr"),
    ("zdata.elev_kb", "elev_kb"),
    ("zdata.elev_seis", "elev_seis"),
    ("zdata.last_act_date", "last_act_date"),
    ("zdata.permit_date", "permit_date"),
    ("zdata.rig_date", "rig_date"),
    ("zdata.report_date", "report_date"),
    ("zdata.spud_date", "spud_date"),
    ("zdata.td", "td"),
    ("zdata.whipstock", "whipstock"),
    ("zdata.wrs_date", "wrs_date"),
    ("zdata.wtrdepth", "water_depth"),
    ("zflddef.active_datum", "active_datum"),
]

# repo_id is already top-level, so it is selected as-is with no alias.
df_flat = df.select(
    F.col("repo_id"),
    *(F.col(source).alias(target) for source, target in nested_to_flat),
)

# Working copy that the clean-up steps below reassign step by step.
df_well = df_flat


# Enforce a proper date/timestamp type on every date-like column by running
# it through string_to_iso_date, overwriting the column in place (the input
# and output column names are the same).
#
# "last_act_date" is included so that every *_date field selected from zdata
# is normalized the same way; it was previously the only date column left as
# the raw source value.
for col_name in [
    "aband_date",
    "comp_date",
    "last_act_date",
    "permit_date",
    "report_date",
    "spud_date",
    "rig_date",
    "wrs_date",
    "app_row_created",
    "app_row_changed",
]:
    df_well = string_to_iso_date(df_well, col_name, col_name)


# Columns passed through replace_10e30_with_null so that placeholder values
# become real nulls; each column is overwritten in place (same input and
# output name).
# NOTE(review): "water_depth" (from zdata.wtrdepth) is not in this list —
# confirm whether it can carry the same placeholder value.
sentinel_columns = (
    "bottom_lat",
    "bottom_lon",
    "surface_lat",
    "surface_lon",
    "surface_x",
    "surface_y",
    "active_datum_value",
    "cumgas",
    "cumoil",
    "cumwtr",
    "elev_df",
    "elev_gr",
    "elev_kb",
    "elev_seis",
    "td",
    "whipstock",
)
for col_name in sentinel_columns:
    df_well = replace_10e30_with_null(df_well, col_name, col_name)


# add id hash
# The hash is built from the repo identifier plus the well's uwi, passed to
# generate_hash as trailing positional arguments.
# NOTE(review): presumably "id" is the output column name and "well" a
# prefix/namespace for the hash — confirm against shared.transforms.
id_columns = ["repo_id", "uwi"]
df_well = generate_hash(df_well, "id", "well", *id_columns)


# Render the fully cleaned DataFrame for inspection before it is written.
display(df_well)

In [None]:
%sql
-- Preview surface coordinates from the bronze well table alongside a point
-- geometry built from them (longitude is passed as x, latitude as y).
SELECT
    surface_lat,
    surface_lon,
    ST_Point(surface_lon, surface_lat) AS geometry_point
FROM geodata.petra.well_bronze

In [None]:
%sql
-- CAUTION: there is no WHERE clause, so this removes every row from the
-- bronze well table. The table itself is not dropped.
DELETE FROM geodata.petra.well_bronze;

In [None]:
from shared.transforms import upsert_dataframe_to_table

# Write the flattened, cleaned well rows into the bronze table through the
# shared upsert helper, then display whatever the helper returns.
# NOTE(review): presumably the helper merges on the "id" hash column —
# confirm in shared.transforms.
result = upsert_dataframe_to_table(df_well, "geodata.petra.well_bronze")
display(result)

In [None]:
# Running this from VS Code might be possible by installing Spark and Sedona
# locally, but a local setup may never be as compatible as running on the
# cluster itself. The best plan is to run spatial work on Databricks.


# from sedona.spark import SedonaContext
# from sedona.sql.st_constructors import ST_Point

# sedona = SedonaContext.create(spark)

# df = spark.table("geodata.petra.well_bronze")
# df_with_point = df.withColumn("point_geom", ST_Point("surface_lon", "surface_lat"))

# df_with_point.write.format("delta").mode("overwrite").option(
#     "overwriteSchema", "true"
# ).option("mergeSchema", "true").saveAsTable("geodata.petra.well_bronze")
