In [2]:
from sqlalchemy import create_engine
import psycopg2
import arcpy
import pandas as pd
import random
import numpy as np
import json

In [3]:
elevation_csv_path = r'C:\Users\tjjoh\OneDrive\Desktop\GIS 5572\Lab3\elevation.csv'

stem_path = r'C:\Users\tjjoh\Documents\GIS5572'
gdb_name = r'Lab3GDB'
gdb_path = stem_path + r'/' + gdb_name + '.gdb'

In [4]:
#create database connection
connection_string = f'postgresql://<user>:<password>@34.133.43.30:5432/lab2'
engine = create_engine(connection_string)

#retrieve the data from PostGIS
table_name = "mn_elevation"
query = f'SELECT *, ST_AsText(geometry) AS WKT_geom FROM {table_name};'

df = pd.read_sql(query, engine) #read data with pandas
df.to_csv(elevation_csv_path, index = False) #save as csv

In [5]:
#create a new file GDB if one doesn't exist
try:
    arcpy.management.CreateFileGDB(
        out_folder_path = stem_path,
        out_name = gdb_name,
        out_version = 'CURRENT'
    )
except:
    print('GDB already exists.')

In [6]:
#set workspace
arcpy.env.workspace = gdb_path

#create a new feature class
elevation = 'elevation'
spatial_ref = arcpy.SpatialReference(4326)
arcpy.management.CreateFeatureclass(gdb_path, elevation, 'POINT', spatial_reference = spatial_ref)

#add a field for elevation to the feature class
arcpy.management.AddField(elevation, 'elevation', 'DOUBLE')

#iterate through the dataframe to populate ferature class
cursor = arcpy.da.InsertCursor(elevation, ['SHAPE@', 'elevation'])
for index, row in df.iterrows():
    geometry = arcpy.FromWKT(row['wkt_geom'])  #convert WKT to geometry
    cursor.insertRow([geometry, row['elevation']])

del cursor

In [7]:
#clear selection
arcpy.management.SelectLayerByAttribute(elevation, 'CLEAR_SELECTION')

#create training and testing datasets
training = 'training'
testing = 'testing'

#count features
total_count = int(arcpy.GetCount_management(elevation)[0])

#calculate 5% sample size
sample_size = max(1, total_count // 20)  # Ensure at least one feature is selected

#get all OBJECTIDs
oids = [row[0] for row in arcpy.da.SearchCursor(elevation, ["OID@"])]

#randomly select 5% of features
random_oids = random.sample(oids, sample_size)

#create selection query
oid_query = f"OBJECTID IN ({','.join(map(str, random_oids))})"

#select and export training data
arcpy.management.SelectLayerByAttribute(elevation, "NEW_SELECTION", oid_query)
arcpy.management.CopyFeatures(elevation, training)

#create opposite selection query
oid_query = f"OBJECTID NOT IN ({','.join(map(str, random_oids))})"

#select and export testing data
arcpy.management.SelectLayerByAttribute(elevation, "NEW_SELECTION", oid_query)
arcpy.management.CopyFeatures(elevation, testing)

In [8]:
#conduct spline interpolation on training data
spline = arcpy.ddd.Spline(
    in_point_features = training,
    z_field = 'elevation',
    out_raster = gdb_path + r'/Spline',
)

In [9]:
#conduct IDW interpolation on training data
idw = arcpy.ddd.Idw(
    in_point_features = training,
    z_field = 'elevation',
    out_raster = gdb_path + r'/IDW',
    power = 2,
)

In [10]:
#conduct Ordinary Kriging on training data
okriging = arcpy.ddd.Kriging(
    in_point_features = training,
    z_field = 'elevation',
    out_surface_raster = gdb_path + r'/OKriging',
)

In [11]:
#extract the values of the interpolated rasters to the testing data points
arcpy.sa.ExtractMultiValuesToPoints(
    in_point_features = testing,
    in_rasters = 'Spline Spline;IDW IDW;OKriging OKriging',
)

In [12]:
methods = ['Spline', 'IDW', 'OKriging']

#calculate error for each testing point for each method
for method in methods:
    arcpy.management.CalculateField(
        in_table = testing,
        field = f'error_{method}',
        expression = f'!{method}! - !elevation!',
        expression_type = 'PYTHON3',
        field_type = 'FLOAT',
    )

In [13]:
#extract errors for each method as a list
error_spline = [row[0] for row in arcpy.da.SearchCursor(testing, ['error_Spline'])]
error_idw = [row[0] for row in arcpy.da.SearchCursor(testing, ['error_IDW'])]
error_okrig = [row[0] for row in arcpy.da.SearchCursor(testing, ['error_OKriging'])]

In [14]:
#calculate rmse, mae, and maxae for each method
rmse = []
mae = []
maxae = []
for error in [error_spline, error_idw, error_okrig]:
    absolute_error = [abs(x) for x in error]
    squared_error = [x ** 2 for x in error]
    
    rmse.append(np.sqrt(np.mean(squared_error)))
    mae.append(np.mean(absolute_error))
    maxae.append(np.max(absolute_error))

In [15]:
#create a pandas dataframe for accuracy assessment
accuracy_assessment = pd.DataFrame({'Interpolation Method': methods,
                                   'Root Mean Squared Error': rmse,
                                   'Mean Absolute Error': mae,
                                   'Maximum Absolute Error': maxae
                                   })

In [16]:
accuracy_assessment

Unnamed: 0,Interpolation Method,Root Mean Squared Error,Mean Absolute Error,Maximum Absolute Error
0,Spline,14.508639,7.55684,241.30275
1,IDW,12.350837,6.678294,214.172302
2,OKriging,14.107064,7.959049,221.433472


In [17]:
# Push the DataFrame to PostGIS
table_name = 'elevation_accuracy'
accuracy_assessment.to_sql(table_name, engine, if_exists="replace", index=False)

print(f"DataFrame successfully pushed to the PostGIS table '{table_name}'.")

DataFrame successfully pushed to the PostGIS table 'elevation_accuracy'.


In [18]:
testing_geojson = stem_path + r'\Lab3\testing_points.geojson'

arcpy.conversion.FeaturesToJSON(
    in_features = testing,
    out_json_file = testing_geojson,
    geoJSON = 'GEOJSON'
)

In [19]:
with open(testing_geojson, "r") as f:
    geojson_data = json.load(f)

In [20]:
geojson_data['features'][0]

{'type': 'Feature', 'id': 1, 'geometry': {'type': 'Point', 'coordinates': [-95.99715165999999, 43.997147933000065]}, 'properties': {'OBJECTID': 1, 'elevation': 523.8542, 'Spline': 534.580933, 'IDW': 531.203308, 'OKriging': 528.686829, 'error_Spline': 10.7267323, 'error_IDW': 7.34910822, 'error_OKriging': 4.83262873}}

In [21]:
table_name = 'elevation_testing'
conn = engine.connect()
conn.execute(f"DROP TABLE IF EXISTS {table_name};")
conn.execute(f"""
    CREATE TABLE {table_name} (
        id SERIAL PRIMARY KEY,
        geom GEOMETRY,
        attributes JSONB
    );
""")

i = 0
for feature in geojson_data["features"]:
    if i < 2000:
        geom = json.dumps(feature["geometry"])  # Convert geometry to JSON
        properties = feature["properties"]  # Extract attributes

        insert_query = f"""
            INSERT INTO {table_name} (geom, attributes)
            VALUES (ST_GeomFromGeoJSON('{geom}'), '{json.dumps(properties)}')
        """
        conn.execute(insert_query)
        
        i += 1
        
    else:
        break

