In [64]:
import arcpy
import requests
import os
import psycopg2
import random
from pathlib import Path

In [65]:
# Set workspace
os.chdir(r'E:\ArcGIS_2\Lab3')
wksp = os.getcwd()

## 1. Data preparation

In [66]:
# Retrieve elevation data from PostGIS database
arcpy.management.MakeQueryLayer(
    input_database=os.path.join(wksp, ""),
    out_layer_name="elevation",
    query="SELECT * FROM elevation;",
    oid_fields="pointid",
    shape_type="POINT",
    srid="4326",
    spatial_reference='GEOGCS["GCS_WGS_1984",DATUM["D_WGS_1984",SPHEROID["WGS_1984",6378137.0,298.257223563]],PRIMEM["Greenwich",0.0],UNIT["Degree",0.0174532925199433]];-400 -400 1000000000;-100000 10000;-100000 10000;8.98315284119521E-09;0.001;0.001;IsHighPrecision',
    spatial_properties="DEFINE_SPATIAL_PROPERTIES",
    m_values="DO_NOT_INCLUDE_M_VALUES",
    z_values="DO_NOT_INCLUDE_Z_VALUES",
    extent="0 0 0 0"
)

In [67]:
# Create a copy of the elevation as a shapefile in the workspace
arcpy.management.CopyFeatures(
    in_features="elevation",
    out_feature_class=os.path.join(wksp, "elevation.shp"),
    config_keyword="",
    spatial_grid_1=None,
    spatial_grid_2=None,
    spatial_grid_3=None
)

In [68]:
# Input point dataset to be sampled
elev_data = "elevation.shp"

# Output point layers for elevation and reference samples
elev_sample = "elev_sample.shp"
ref_sample = "ref_sample.shp"

The percentage of data to sample is set to 1. Given the density of available elevation points, 1 percent of data will give enough data to interpolate but avoid overfitting the data. It will also shorten processing time for accuracy assessment.

In [69]:
# Sample percentage 
sample_pct = 1

# Get the number of features in the input shapefile
num_features = int(arcpy.GetCount_management(elev_data).getOutput(0))
ref_num_features = int(arcpy.GetCount_management(elev_data).getOutput(0))  # reference sample

# Calculate the number of features to sample
num_sample = int((sample_pct / 100) * num_features)
ref_num_sample = int((sample_pct / 100) * ref_num_features)

In [70]:
# Create a list of random feature IDs to select for elevation sample
id_list = random.sample(range(1, num_features+1), num_sample)

# Create a list of random feature IDs to select for reference sample
ref_id_list = random.sample(range(1, ref_num_features+1), ref_num_sample)

# Convert the lists to sets and find the common points. 
common_elements = set(id_list) & set(ref_id_list)

# Remove common points from reference sample list
ref_id_list = [item for item in ref_id_list if item not in common_elements]


In [71]:
# Convert the lists to comma-separated strings
id_str = ",".join(str(x) for x in id_list)
ref_id_str = ",".join(str(x) for x in ref_id_list)

# Create a SQL query to select the randomly sampled features for elevation sample
sql = '"FID" IN ({})'.format(id_str)

# Create a SQL query to select the randomly sampled features for reference sample
refsql = '"FID" IN ({})'.format(ref_id_str)

# Use the Select_analysis tool to select the features and write them to new shapefiles
arcpy.Select_analysis(elev_data, elev_sample, sql)
arcpy.Select_analysis(elev_data, ref_sample, refsql)

## 2. Interpolation

In [72]:
# Interpolate the elevation using 3 methods: IDW, Kriging, and GPI

arcpy.ddd.Idw(
    in_point_features=elev_sample,
    z_field="grid_code",
    out_raster=os.path.join(wksp, "IDW_DEM.tif"),
    cell_size=0.1,
    power=2,
    search_radius="VARIABLE 12",
    in_barrier_polyline_features=None
)

arcpy.ddd.Kriging(
    in_point_features=elev_sample,
    z_field="grid_code",
    out_surface_raster=os.path.join(wksp, "Kriging_DEM.tif"),
    semiVariogram_props="Spherical 0.021245 # # #",
    cell_size=0.1,
    search_radius="VARIABLE 12",
    out_variance_prediction_raster=None
)

arcpy.ga.GlobalPolynomialInterpolation(
    in_features=elev_sample,
    z_field="grid_code",
    out_ga_layer=None,
    out_raster=os.path.join(wksp, "GPI_DEM.tif"),
    cell_size=0.1,
    power=1,
    weight_field=None
)

## 3. Accuracy assessment

In [75]:
def accuracy_assessment (raster, validation_data):
    """
    Calculate the RMSE of the interpolations by comparing the values to the validation data
    
    Input:
    - raster: interpolation method
    - validation_data: ground truth 
    
    """
        
    # Output name and path of the shapefile comparing ground truth vs classified
    output_acc = 'Acc_' + Path(raster).stem + '.shp'
    acc_table = os.path.join(wksp, output_acc)
    
    # Output name and path of the table that saves the RMSE for each interpolation
    output_stat = 'Acc_' + Path(raster).stem + '_stat.dbf'
    stat_table = os.path.join(wksp, output_stat)
    
    # Extract the predicted values and save them to the validation data's attribute table
    arcpy.sa.ExtractValuesToPoints(
        in_point_features=validation_data,
        in_raster=raster,
        out_point_features=acc_table,
        interpolate_values="NONE",
        add_attributes="ALL"
    )
    
    # Rename the default fields 
    arcpy.management.CalculateField(
        in_table=acc_table,
        field="GrndTruth",
        expression="!grid_code!",
        expression_type="PYTHON3",
        code_block="",
        field_type="FLOAT",
        enforce_domains="NO_ENFORCE_DOMAINS"
    )
    arcpy.management.CalculateField(
        in_table=acc_table,
        field="Classified",
        expression="!RASTERVALU!",
        expression_type="PYTHON3",
        code_block="",
        field_type="FLOAT",
        enforce_domains="NO_ENFORCE_DOMAINS"
    )
    arcpy.management.DeleteField(
        in_table=acc_table,
        drop_field="grid_code;RASTERVALU",
        method="DELETE_FIELDS"
    )
    
    # Calculate the squared error
    arcpy.management.CalculateField(
        in_table=acc_table,
        field="Sq_error",
        expression="math.pow(!GrndTruth! - !Classified!, 2)",
        expression_type="PYTHON3",
        code_block="",
        field_type="FLOAT",
        enforce_domains="NO_ENFORCE_DOMAINS"
    )
    
    # Create a new statistic table and calculate RMSE
    arcpy.analysis.Statistics(
        in_table=acc_table,
        out_table=stat_table,
        statistics_fields="Sq_error SUM",
        case_field=None,
        concatenation_separator=""
    )    
    arcpy.management.CalculateField(
        in_table=stat_table,
        field="RMSE",
        expression="math.sqrt(!SUM_Sq_err! / !FREQUENCY!)",
        expression_type="PYTHON3",
        code_block="",
        field_type="FLOAT",
        enforce_domains="NO_ENFORCE_DOMAINS"
    )

In [76]:
# Create lists with the raster names of the interpolations with and without extension
interpolations = ['IDW_DEM.tif', 'Kriging_DEM.tif', 'GPI_DEM.tif']
interpolators  = ['IDW_DEM', 'Kriging_DEM', 'GPI_DEM']

# Run the accuracy assesment for each interpolation
for i in range(len(interpolations)):
    accuracy_assessment(interpolations[i], elev_sample)

# Merge the accuracy tables     
arcpy.management.Merge(
    inputs="Acc_IDW_DEM_stat.dbf;Acc_Kriging_DEM_stat.dbf;Acc_GPI_DEM_stat.dbf",
    output="Accuracy_assessment_DEM.dbf",
    field_mappings='Interpolat "Interpolat" true true false 255 Text 0 0,First,#;FREQUENCY "FREQUENCY" true true false 10 Long 0 10,First,#,Acc_IDW_stat,FREQUENCY,-1,-1,Acc_Kriging_stat,FREQUENCY,-1,-1,Acc_GPI_stat,FREQUENCY,-1,-1;SUM_Sq_err "SUM_Sq_err" true true false 19 Double 0 0,First,#,Acc_IDW_stat,SUM_Sq_err,-1,-1,Acc_Kriging_stat,SUM_Sq_err,-1,-1,Acc_GPI_stat,SUM_Sq_err,-1,-1;RMSE "RMSE" true true false 13 Float 0 0,First,#,Acc_IDW_stat,RMSE,-1,-1,Acc_Kriging_stat,RMSE,-1,-1,Acc_GPI_stat,RMSE,-1,-1',
    add_source="NO_SOURCE_INFO"
)

# Update the merged table with the name of each interpolator
with arcpy.da.UpdateCursor("Accuracy_assessment_DEM.dbf", ['Interpolat']) as cursor:
    for i, row in enumerate(cursor):
        if i < len(interpolators):
            row[0] = interpolators[i]
        else:
            break
        cursor.updateRow(row)

# Delete the cursor to release locks on the data
del cursor

In [77]:
# Find the interpolator with the lowest RMSE
methods = {}
fields = ["Interpolat", "RMSE"]
with arcpy.da.SearchCursor('Accuracy_assessment_DEM.dbf', fields) as cursor:
    for row in cursor:
        methods[row[0]] = row[1]

best_interpolator = min(methods, key=methods.get)

In [78]:
# Clip interpolation to MN borders 
output_clip = os.path.join(wksp, best_interpolator + '_mn.tif')
out_raster = arcpy.sa.ExtractByMask(
    in_raster=best_interpolator+'.tif',
    in_mask_data="minnesota.shp",
    extraction_area="INSIDE",
    analysis_extent='-97.239102895829 43.499445217943 -89.6516983029999 49.0583312990001 GEOGCS["GCS_WGS_1984",DATUM["D_WGS_1984",SPHEROID["WGS_1984",6378137.0,298.257223563]],PRIMEM["Greenwich",0.0],UNIT["Degree",0.0174532925199433]]'
)
out_raster.save(output_clip)

# Convert raster to point shapefile
output_point_shp = os.path.join(wksp, best_interpolator + '.shp')
arcpy.conversion.RasterToPoint(
    in_raster=output_clip,
    out_point_features=output_point_shp,
    raster_field="Value"
)

## 4. Save to PostGIS database

In [79]:
# Connect to PostGIS database
connection = psycopg2.connect(host = '',
                              port = '',
                              database = '',
                              user = '',
                              password = '',
                             )

### 4.1. Accuracy assessment table

In [80]:
# Path and fields of the data to load to the database
data = os.path.join(wksp, "Accuracy_assessment_DEM.dbf")
fields = ["OID", "Interpolat", "FREQUENCY", "SUM_Sq_err", "RMSE"]

# Create SQL table
cursor = connection.cursor()
cursor.execute("DROP TABLE IF EXISTS accuracy_assessment_dem")
cursor.execute("""
    CREATE TABLE accuracy_assessment_dem (
        OID INT,
        Interpolat VARCHAR,
        FREQUENCY INT,
        SUM_Sq_err DOUBLE PRECISION,
        RMSE DOUBLE PRECISION)
""")

# Populate table
with arcpy.da.SearchCursor(data, fields) as da_cursor:
    for row in da_cursor:
        cursor.execute("INSERT INTO accuracy_assessment_dem (OID, Interpolat, FREQUENCY, SUM_Sq_err, RMSE) VALUES (%s, %s, %s, %s, %s)", (row[0], row[1], row[2], row[3], row[4]))

connection.commit()

### 4.2. Best interpolation

In [81]:
# Create table name with the name of the best interpolator
point_table = best_interpolator.lower()

fields = ["pointid", "grid_code", "Shape@WKT"]

# Create SQL table
cursor = connection.cursor()
cursor.execute(f"DROP TABLE IF EXISTS {point_table}")
cursor.execute(f"""
    CREATE TABLE {point_table} (
        pointid INT,
        grid_code DOUBLE PRECISION)
""")

cursor.execute(f"""
    SELECT AddGeometryColumn('{point_table}', 'geom', 4326, 'POINT', 2)
""")

# Populate table
with arcpy.da.SearchCursor(output_point_shp, fields) as da_cursor:
    for row in da_cursor:
        wkt = row[2]
        cursor.execute(f"INSERT INTO {point_table} (pointid, grid_code, geom) VALUES (%s, %s, ST_GeomFromText(%s, 4326))", (row[0], row[1], wkt))

connection.commit()

### 4.3. Error table for best interpolation

In [82]:
# Path of the shapefile
data = os.path.join(wksp, 'Acc_' + best_interpolator + '.shp')

# Create the table name to use in PostGIS database
table_name = best_interpolator.lower() + '_error_estimation'

fields = ["GrndTruth", "Classified", "Sq_error", "Shape@WKT"]

# Create SQL table
cursor = connection.cursor()
cursor.execute(f"DROP TABLE IF EXISTS {table_name}")
cursor.execute(f"""
    CREATE TABLE {table_name} (
        GrndTruth DOUBLE PRECISION,
        Classified DOUBLE PRECISION,
        Sq_error DOUBLE PRECISION)
""")

cursor.execute(f"""
    SELECT AddGeometryColumn('{table_name}', 'geom', 4326, 'POINT', 2)
""")

# Populate table
with arcpy.da.SearchCursor(data, fields) as da_cursor:
    for row in da_cursor:
        wkt = row[3]
        cursor.execute(f"INSERT INTO {table_name} (GrndTruth, Classified, Sq_error, geom) VALUES (%s, %s, %s, ST_GeomFromText(%s, 4326))", (row[0], row[1], row[2], wkt))

connection.commit()

In [83]:
# Close database connection
connection.close()