```sh
bryan@ichabod mac_bucket % for file in *_well.json; do
  databricks fs cp "$file" dbfs:/Volumes/geodata/petra/well/
done
```

In [None]:
# Load the JSON files under the well volume directory into a single DataFrame.
well_json_dir = "/Volumes/geodata/petra/well/"
df = spark.read.format("json").load(well_json_dir)

# Render the loaded frame in the notebook output.
display(df)

In [None]:
from common.transforms import string_to_iso_date, generate_hash
import pyspark.sql.functions as F

# Flatten the nested fields of `df` into top-level columns.
# Each pair is (nested source path, output column name); the order of this
# table is the order of the columns in `df_flat` (after `repo_id`).
well_column_map = [
    # identifiers
    ("uwi.uwi", "uwi"),
    ("uwi.wsn", "wsn"),
    # surface / bottom-hole coordinates
    ("locat.lat", "surface_latitude"),
    ("locat.lon", "surface_longitude"),
    ("bhloc.lat", "bottom_latitude"),
    ("bhloc.lon", "bottom_longitude"),
    # well header attributes
    ("well.county", "county"),
    ("well.state", "state"),
    ("well.fieldname", "field_name"),
    ("well.histoper", "historical_operator"),
    ("well.label", "well_label"),
    ("well.leasename", "lease_name"),
    ("well.leasenumber", "lease_number"),
    ("well.operator", "operator"),
    ("well.prodfm", "producing_formation"),
    ("well.remarks", "remarks"),
    ("well.shortname", "short_name"),
    ("well.wellname", "well_name"),
    ("well.symcode", "symbol"),
    # zdata / zflddef values
    ("zdata.aband_date", "abandonment_date"),
    # NOTE(review): read from zflddef while active_datum_value comes from
    # zdata -- confirm this is the intended source.
    ("zflddef.active_datum", "active_datum"),
    ("zdata.active_datum_value", "active_datum_value"),
    ("zdata.comp_date", "completion_date"),
    ("zdata.cumgas", "cum_gas"),
    ("zdata.cumoil", "cum_oil"),
    ("zdata.cumwtr", "cum_water"),
    ("zdata.elev_df", "elev_df"),
    ("zdata.elev_gr", "elev_gr"),
    ("zdata.elev_kb", "elev_kb"),
    ("zdata.last_act_date", "last_activity_date"),
    ("zdata.permit_date", "permit_date"),
    ("zdata.rig_date", "rig_date"),
    ("zdata.report_date", "report_date"),
    ("zdata.spud_date", "spud_date"),
    ("zdata.td", "total_depth"),
    ("zdata.whipstock", "whipstock"),
    ("zdata.wtrdepth", "water_depth"),
    # row audit stamps
    ("well.adddate", "app_row_created"),
    ("well.chgdate", "app_row_changed"),
]

df_flat = df.select(
    # repo_id is already top-level, so it is passed through without an alias.
    F.col("repo_id"),
    *(
        F.col(source_path).alias(output_name)
        for source_path, output_name in well_column_map
    ),
)

# Names of the flattened columns that are run through string_to_iso_date
# further down in this cell.
date_columns = [
    # dates selected from zdata.*
    "abandonment_date",
    "completion_date",
    "last_activity_date",
    "permit_date",
    "report_date",
    "spud_date",
    "rig_date",
    # row audit stamps selected from well.adddate / well.chgdate
    "app_row_created",
    "app_row_changed",
]

# Start from the flattened frame and rewrite each date column under its own
# name (source and target column are the same).
# NOTE(review): string_to_iso_date comes from common.transforms; presumably it
# converts the string column to an ISO date/timestamp -- confirm there.
df_well = df_flat
for date_col in date_columns:
    df_well = string_to_iso_date(df_well, date_col, date_col)

# Add the "id" column via generate_hash, passing "well" and the identifying
# columns.
# NOTE(review): presumably "well" acts as a prefix/namespace for the hash --
# verify against common.transforms.generate_hash.
id_columns = ["repo_id", "uwi"]
df_well = generate_hash(df_well, "id", "well", *id_columns)

# Render the final frame in the notebook output.
display(df_well)