<a href="https://colab.research.google.com/github/puchkarev/topomap/blob/main/Topological_Map_Generator.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Author: Victor Puchkarev

Copyright: 2024

This colab pulls the borders and elevation data, and puts together a series of stl files for printing on a 3d printer.

**Note to pull elevation data please follow the steps here to enable the Google Maps elevation API and include the key into "secrets" section of your cloab
environment under "maps_api_key":**

https://developers.google.com/maps/documentation/elevation/start

The data here is mostly handled in meters, but printers assume units in gcl
are in mm, so consider that in scaling.

The pipeline proceeds as follows:

1. Dependency install (rerun every time you restart the colab)
2. Configuration And Function Definitions (rerun every time you restart the colab or change settings).
3. Fetch Border as Polygons & Generate Sample Points (needed once to configure the samples that will be used)
4. Fetching the elevation data for points from database (can be interrupted and restarted as needed)
5. Generate the stl file

In [None]:
# @title Dependency install (Required)
!pip install numpy-stl

In [10]:
# @title Configuration And Function Definition (Required)
from google.colab import drive, userdata
import numpy as np
import math
import matplotlib.pyplot as plt
import os
from pathlib import Path
import requests
from shapely.geometry import Point, Polygon
from scipy.interpolate import griddata
from scipy.spatial import KDTree
import sqlite3
import statistics
from stl import mesh
import sys
import time

# Parameters
borders = "California" # @param {type:"string"}
sampling_distance_meters = 2500 # @param {type:"number"}
conversion_longitude_degrees = -120 # @param {type:"number"}
conversion_latitude_degrees = 36 # @param {type:"number"}
project_path = "/content/drive/MyDrive/TopologicalMap/" # @param {type:"string"}

max_elements_per_elevation_query = 450 # @param {type:"number"}

model_x_scale = 0.0001 # @param {type:"number"}
model_y_scale = 0.0001 # @param {type:"number"}
model_z_scale = 0.001 # @param {type:"number"}

split_blocks_x = 1 # @param {type:"number"}
split_blocks_y = 1 # @param {type:"number"}

# Path Configuration
if project_path.startswith("/content/drive"):
  drive.mount('/content/drive')

base_path = os.path.join(project_path, borders, "sample_" + str(sampling_distance_meters) + "m_base_" + str(conversion_longitude_degrees) + "_" + str(conversion_latitude_degrees))
Path(base_path).mkdir(parents=True, exist_ok=True)

database_path = os.path.join(base_path, "elevation_samples.db")
print("database_path:", database_path)

mesh_folder = os.path.join(base_path, "stl")
Path(mesh_folder).mkdir(parents=True, exist_ok=True)
mesh_path = os.path.join(mesh_folder, "scale_" + str(model_x_scale) + "_" + str(model_y_scale) + "_" + str(model_z_scale))
print("mesh_path:", mesh_path)

earth_radius_meters = 6357000
empty_altitude = -1000000000

# Function definition

def PlotPolygon(polygon):
  x, y = polygon.exterior.xy
  plt.plot(x, y)

def PlotPoints(points):
  xs = [point.x for point in points]
  ys = [point.y for point in points]
  plt.scatter(xs, ys, s=1)

def DegToRad(deg):
  return deg * 2.0 * math.pi / 360.0

def RadToDeg(rad):
  return rad * 360.0 / (2.0 * math.pi)

def ConvertLatLngToMeters(pt):
  lon = pt.x
  lat = pt.y
  x_m = DegToRad(lon - conversion_longitude_degrees) * earth_radius_meters * math.cos(DegToRad(lat))
  y_m = DegToRad(lat - conversion_latitude_degrees) * earth_radius_meters
  return Point(x_m, y_m)

def ConvertMetersToLatLng(pt):
  x_m = pt.x
  y_m = pt.y
  lat = RadToDeg(y_m / earth_radius_meters) + conversion_latitude_degrees
  lon = RadToDeg(x_m / (earth_radius_meters * math.cos(DegToRad(lat)))) + conversion_longitude_degrees
  return Point(lon, lat)

def ConvertPolygonLatLngToMeters(polygon):
  xs_deg, ys_deg = polygon.exterior.coords.xy
  xs_m = []
  ys_m = []
  for (x_lat, y_lat) in zip(xs_deg, ys_deg):
     pt_m = ConvertLatLngToMeters(Point(x_lat, y_lat))
     xs_m.append(pt_m.x)
     ys_m.append(pt_m.y)
  return Polygon(zip(xs_m,ys_m))

def ConvertPolygonMetersToLatLng(polygon):
  xs_m, ys_m = polygon.exterior.coords.xy
  xs_deg = []
  ys_deg = []
  for (x_m, y_m) in zip(xs_m, ys_m):
     pt_deg = ConvertMetersToLatLng(Point(x_m, y_m))
     xs_deg.append(pt_deg.x)
     ys_deg.append(pt_deg.y)
  return Polygon(zip(xs_deg,ys_deg))

def ConvertElevationPointsToDistanceBased(elevation_points_deg):
  return [(pt[0], pt[1], ConvertLatLngToMeters(Point(pt[3], pt[2])).y, ConvertLatLngToMeters(Point(pt[3], pt[2])).x, pt[4]) for pt in elevation_points_deg]

def SamplePolygon(polygon, x_step, y_step):
  xs, ys = polygon.exterior.coords.xy
  x_min = min(xs)
  x_max = max(xs)
  y_min = min(ys)
  y_max = max(ys)

  X,Y = np.mgrid[x_min:x_max:x_step, y_min:y_max:y_step]
  unfiltered_xy = np.vstack((X.flatten(), Y.flatten())).T
  print("generated: ", len(unfiltered_xy), "samples")
  unfiltered_points = [Point(xy) for xy in unfiltered_xy]
  print("converted")
  filtered = [pt for pt in unfiltered_points if pt.within(polygon)]
  print("filtered to: ", len(filtered), "samples")

  return filtered

def FetchStateBorders(name):
  # Data source for state borders
  boundaries_url = "https://public.opendatasoft.com/api/explore/v2.1/catalog/datasets/us-state-boundaries/records"
  boundaries_query_params = {
      "select": "st_asgeojson",
      "refine" : "name:\"" + name + "\""
      }

  # Fetch the borders
  boundaries_uri = boundaries_url + "?" + "&".join("%s=%s" % (k, v) for k, v in boundaries_query_params.items())
  boundaries_response = requests.get(boundaries_uri)

  # Verify that we got a valid response
  if boundaries_response.status_code != 200:
    raise ValueError("Got an error when fetching data" + str(boundaries_response.status_code))

  # Parse the json
  boundaries_json = boundaries_response.json()
  print("raw json data: ", boundaries_json)

  # Sanity check the data
  if not 'results' in boundaries_json.keys():
    raise ValueError("No results in response")

  if len(boundaries_json['results']) != 1:
    raise ValueError("Invalid number of reults in response")

  if not 'st_asgeojson' in boundaries_json['results'][0].keys():
    raise ValueError("st_asgeojson not in results")

  if not 'geometry' in boundaries_json['results'][0]['st_asgeojson'].keys():
    raise ValueError("geometry not in results")

  if not 'coordinates' in boundaries_json['results'][0]['st_asgeojson']['geometry'].keys():
    raise ValueError("coordinates not in results")

  boundary_lat_lng_polygons = boundaries_json['results'][0]['st_asgeojson']['geometry']['coordinates'];
  if len(boundary_lat_lng_polygons) <= 0:
    raise ValueError("no polygons in results")

  # Construct the polygons from the list
  return [Polygon([(lat_lng[0], lat_lng[1]) for lat_lng in boundary_lat_lng_polygon[0]]) for boundary_lat_lng_polygon in boundary_lat_lng_polygons]

def FetchCountryBorders(name):
  # Data source for country borders
  boundaries_url = "https://public.opendatasoft.com/api/explore/v2.1/catalog/datasets/georef-world-country/records"
  boundaries_query_params = {
      "select": "geo_shape",
      "refine" : "cntry_name_en:\"" + name + "\""
      }

  # Fetch the borders
  boundaries_uri = boundaries_url + "?" + "&".join("%s=%s" % (k, v) for k, v in boundaries_query_params.items())
  boundaries_response = requests.get(boundaries_uri)

  # Verify that we got a valid response
  if boundaries_response.status_code != 200:
    raise ValueError("Got an error when fetching data" + str(boundaries_response.status_code))

  # Parse the json
  boundaries_json = boundaries_response.json()
  print("raw json data: ", boundaries_json)

  # Sanity check the data
  if not 'results' in boundaries_json.keys():
    raise ValueError("No results in response")

  if len(boundaries_json['results']) != 1:
    raise ValueError("Invalid number of reults in response")

  if not 'geo_shape' in boundaries_json['results'][0].keys():
    raise ValueError("st_asgeojson not in results")

  if not 'geometry' in boundaries_json['results'][0]['geo_shape'].keys():
    raise ValueError("geometry not in results")

  if not 'coordinates' in boundaries_json['results'][0]['geo_shape']['geometry'].keys():
    raise ValueError("coordinates not in results")

  boundary_lat_lng_polygons = boundaries_json['results'][0]['geo_shape']['geometry']['coordinates'];
  if len(boundary_lat_lng_polygons) <= 0:
    raise ValueError("no polygons in results")

  # Construct the polygons from the list
  return [Polygon([(lat_lng[0], lat_lng[1]) for lat_lng in boundary_lat_lng_polygon[0]]) for boundary_lat_lng_polygon in boundary_lat_lng_polygons]


def FetchBorders(name):
  # Known special borders
  if name == 'corsica':
    return [
        Polygon([
            (8.347301685425535, 43.053819727534886),
            (9.665560361913943, 43.053819727534886),
            (9.665560361913943, 41.32156099491138),
            (8.347301685425535, 41.32156099491138)
        ])
    ]
  elif name == 'picos de europa':
    return [
        Polygon([
            (-5.020446, 43.149083),
            (-4.696474, 43.149083),
            (-4.696474, 43.259457),
            (-5.020446, 43.259457)
        ])
    ]
  elif name == 'home':
    return [
        Polygon([
            (-122.0472449091571, 37.39081257452758),
            (-122.04300782580036, 37.39081257452758),
            (-122.04300782580036, 37.38820289700541),
            (-122.0472449091571, 37.38820289700541)
        ])
    ]

  try:
    return FetchStateBorders(name)
  except ValueError:
    return FetchCountryBorders(name)

def CreateSamplesTable(path):
  # Connect to the database
  conn = sqlite3.connect(path)
  cursor = conn.cursor()

  # Create the table if it doesn't exist
  cursor.execute("""CREATE TABLE IF NOT EXISTS samples (
      id INTEGER PRIMARY KEY,
      owner TEXT,
      latitude REAL,
      longitude REAL,
      altitude REAL
  )""")

  # Commit the changes and close the connection
  conn.commit()
  conn.close()

def DropSamplesTample(path):
  # Connect to the database
  conn = sqlite3.connect(path)
  cursor = conn.cursor()

  # Drop the table
  cursor.execute("""DROP TABLE samples""")

  # Commit the changes and close the connection
  conn.commit()
  conn.close()

def StoreSamplesInDatabase(path, borders, samples_for_polygons_deg, threshold):
  # Connect to the database
  conn = sqlite3.connect(path)
  cursor = conn.cursor()

  # Grab any points that exist in the database
  cursor.execute("""SELECT * FROM samples""")
  existing_points_deg = cursor.fetchall()
  existing_points_m = ConvertElevationPointsToDistanceBased(existing_points_deg)
  kd_existing_m = ()
  if len(existing_points_m) > 0:
    kd_existing_m = KDTree([[pt[3], pt[2]] for pt in existing_points_m])

  # Loop through each list of sample points
  for polygon_ix, samples_for_polygon_deg in enumerate(samples_for_polygons_deg):
    handled = 0
    checked = 0
    inserted = 0

    total = len(samples_for_polygon_deg)
    print("processing polygon", polygon_ix, "with", total, "points")
    for sample_deg in samples_for_polygon_deg:
      handled = handled + 1
      if handled % 10000 == 0:
        print("polygon", polygon_ix, "handled", handled, "checked", checked, "inserted", inserted)

      # If there is already a point nearby within threshold then we skip this point.
      if len(existing_points_deg) > 0:
        sample_m = ConvertLatLngToMeters(sample_deg)
        count = kd_existing_m.count_neighbors(KDTree([[sample_m.x, sample_m.y]]), r=threshold * 0.9)

        checked = checked + 1
        if count > 0:
          continue

      cursor.execute("""INSERT INTO samples (owner, latitude, longitude, altitude)
                        VALUES (?, ?, ?, ?)""", (borders + "_" + str(polygon_ix), sample_deg.y, sample_deg.x, empty_altitude))
      inserted = inserted + 1

    print("done with polygon", polygon_ix, "inserted", inserted, "points")

  # Check total number of samples in database
  cursor.execute("""SELECT COUNT(*) FROM samples""")
  count = cursor.fetchone()[0]
  print("database contains", count, "samples")
  cursor.execute("""SELECT COUNT(*) FROM samples WHERE altitude <= %f""" % empty_altitude)
  count = cursor.fetchone()[0]
  print("database contains", count, "samples without altitude")

  # Commit the changes and close the connection
  conn.commit()
  conn.close()

def GetSamplePointsFromDatabaseWithElevation(path):
  # Connect to the database
  conn = sqlite3.connect(path)
  cursor = conn.cursor()

  # Fetch the sample points
  cursor.execute("""SELECT * FROM samples WHERE altitude > %f""" % empty_altitude)
  sample_points = cursor.fetchall()

  # Close the connection
  conn.close()

  return sample_points

def GetSamplePointsFromDatabaseWithoutElevation(path):
  # Connect to the database
  conn = sqlite3.connect(path)
  cursor = conn.cursor()

  # Fetch the sample points
  cursor.execute("""SELECT * FROM samples WHERE altitude <= %f""" % empty_altitude)
  sample_points = cursor.fetchall()

  # Close the connection
  conn.close()

  return sample_points

def OverwritePointsInDatabase(path, points):
  # Connect to the database
  conn = sqlite3.connect(path)
  cursor = conn.cursor()

  for point in points:
    qry = """UPDATE samples SET owner = '%s', latitude = %f, longitude = %f, altitude = %f WHERE id = %d;""" % (point[1], point[2], point[3], point[4], point[0])
    cursor.execute(qry)

  conn.commit()
  conn.close()

def FetchElevationData(points):
  if (len(points) > max_elements_per_elevation_query):
    raise ValueError("too many points requested")

  if (len(points) <= 0):
    raise ValueError("no points requested")

  if userdata.get('maps_api_key') == "":
    raise ValueError("you are missing the key, follow instructions above to get one")

  lat_lng = []
  for pt in points:
    lat_lng.append(str(pt[2]) + "," + str(pt[3]))

  # Data source for elevation data
  maps_elevation_url = "https://maps.googleapis.com/maps/api/elevation/json"
  maps_elevation_query_params = {
      "locations": "|".join("%.12f,%.12f" % (pt[2], pt[3]) for pt in points),
      "key" : userdata.get('maps_api_key')
      }

  # Fetch the elevation data
  maps_elevation_uri = maps_elevation_url + "?" + "&".join("%s=%s" % (k, v) for k, v in maps_elevation_query_params.items())
  maps_elevation_response = requests.get(maps_elevation_uri)

  # Verify that we got a valid response
  if maps_elevation_response.status_code != 200:
    raise ValueError("Got an error when fetching data" + str(maps_elevation_response.status_code) + str(maps_elevation_response))

  # Parse the json
  maps_elevation_json = maps_elevation_response.json()

  # Sanity check the data
  if not 'results' in maps_elevation_json.keys():
    raise ValueError("No results in response")

  returned_points = maps_elevation_json['results']
  if (len(returned_points) != len(points)):
    raise ValueError("wrong number of points returned")

  populated_points = []
  for returned_point in returned_points:
    # Find the closest point to the returned point
    min_dist_sq = 1e9
    min_ix = -1
    for ix, point in enumerate(points):
      dlat = (point[2] - returned_point['location']['lat'])
      dlon = (point[3] - returned_point['location']['lng'])
      dist_sq = dlat * dlat + dlon * dlon
      if dist_sq < min_dist_sq:
        min_dist_sq = dist_sq
        min_ix = ix

    if min_ix < 0:
      raise ValueError("Could not find a closest point to" + str(returned_point))

    point = points[min_ix]
    populated_points.append((point[0], point[1], point[2], point[3], returned_point['elevation']))

  if (len(populated_points) != len(points)):
    raise ValueError("wrong number of points populated")

  return populated_points

def PopulateElevationData(path):
  query_index = 0
  while 1==1:
    query_start_time = time.time()
    no_elevation_points = GetSamplePointsFromDatabaseWithoutElevation(path)
    query_end_time = time.time()

    had_elements = len(no_elevation_points)
    print("Database has", had_elements, " points without elevation query took", query_end_time - query_start_time, "seconds")

    if had_elements <= 0:
      print("database has no points to query")
      break

    run_start_time = time.time()
    fetching_time = 0.0
    writing_time = 0.0
    for start in range (0, had_elements, max_elements_per_elevation_query):
      stop = min(start + max_elements_per_elevation_query, had_elements)

      query_points = no_elevation_points[start:stop]
      if len(query_points) <= 0:
        print("empty query")
        break

      query_index = query_index + 1

      fetch_start_time = time.time()
      returned_points = FetchElevationData(query_points)
      fetch_complete_time = time.time()

      OverwritePointsInDatabase(path, returned_points)

      write_complete_time = time.time()

      fetching_time = fetching_time + fetch_complete_time - fetch_start_time
      writing_time = writing_time + write_complete_time - fetch_complete_time

      print("Points Left=", had_elements - start,
            "Queries Left=", round((had_elements - start) / max_elements_per_elevation_query),
            "Query=", query_index,
            "P/Q=", len(query_points),
            "Fetched=", len(returned_points),
            "Qps=", round(query_index / (time.time() - run_start_time), 2),
            "fetch_time=", round(fetching_time / query_index, 3),
            "write_time=", round(writing_time / query_index, 3),
            "left_time=", round((had_elements - start) * (time.time() - run_start_time) / (query_index * max_elements_per_elevation_query)))

def PlotElevationPoints(elevation_points):
  # Create a dictionary to store the points for each owner
  owner_points = {}
  for point in elevation_points:
    owner = point[1]
    if owner not in owner_points:
      owner_points[owner] = []
    owner_points[owner].append(point)

  # Create a plot for each owner
  for owner, points in owner_points.items():
    xs = [point[3] for point in points]
    ys = [point[2] for point in points]
    zs = [point[4] for point in points]

    fig = plt.figure()
    ax = fig.add_subplot(111, projection='3d')
    ax.scatter(xs, ys, zs, 'z', s=1, c=zs, cmap='viridis')
    ax.set_xlabel('x')
    ax.set_ylabel('y')
    ax.set_zlabel('z')
    ax.set_title(owner)
    plt.show()

def GenerateGridFromElevationPoints(x_values, y_values, z_values, x_samples, y_samples):
  # Create a grid of points over the area covered by the values
  x_grid, y_grid = np.meshgrid(np.linspace(min(x_values), max(x_values), x_samples), np.linspace(min(y_values), max(y_values), y_samples))

  # Use the griddata function to interpolate the z values of data onto the grid
  z_grid = griddata((x_values, y_values), z_values, (x_grid, y_grid), method='linear')

  return (x_grid, y_grid, z_grid)

# Whether the given point should be considered
def ConsiderPoint(pt):
  return not math.isnan(pt[2]) and not math.isnan(pt[1]) and not math.isnan(pt[0])

# Generates a mesh from a 2d grid of points provided by x, y and z values
# Points need to be on a grid and grid will be filled as follows with
# triangles:
# 1 -- 2 -- 3
# |   /|   /|
# |  / |  / |
# | /  | /  |
# 4 -- 5 -- 6
def GenerateMeshFromGrid(x_values, y_values, z_values, low_z_value):
  x_index_range = len(x_values)
  y_index_range = len(x_values[0])

  # populate the vectors for the triangles
  max_number_of_squares = (x_index_range - 1) * (y_index_range - 1) * 2
  max_number_of_triangles = max_number_of_squares * 2
  max_number_of_vectors = max_number_of_triangles * 3

  # side triangles
  side_triangles = (x_index_range - 1) * 2 * 2 + (y_index_range - 1) * 2 * 2
  side_vectors = side_triangles * 3

  v_i = 0
  vectors = np.zeros((max_number_of_vectors + side_vectors, 3, 3))
  for x_i in range(0, x_index_range - 1):
    for y_i in range(0, y_index_range - 1):
      # define the points we plan to use
      pts = []
      pts.append(np.array([x_values[x_i][y_i], y_values[x_i][y_i], z_values[x_i][y_i]]))
      pts.append(np.array([x_values[x_i][y_i + 1], y_values[x_i][y_i + 1], z_values[x_i][y_i + 1]]))
      pts.append(np.array([x_values[x_i + 1][y_i], y_values[x_i + 1][y_i], z_values[x_i + 1][y_i]]))
      pts.append(np.array([x_values[x_i + 1][y_i + 1], y_values[x_i + 1][y_i + 1], z_values[x_i + 1][y_i + 1]]))

      # define same points but with z set at low z value
      lowpts = [np.array([pt[0], pt[1], low_z_value]) for pt in pts]

      # check if these points are usable
      usept = [ConsiderPoint(pt) for pt in pts]

      # build the triangles
      for (v1, v2, v3) in [(0, 1, 2), (2, 1, 3)]:
        if usept[v1] and usept[v2] and usept[v3]:
          # build the top triangles
          vectors[v_i] = np.array([pts[v1], pts[v3], pts[v2]])
          v_i  = v_i + 1

          # build the bottom triangles if z value is set
          if not math.isnan(low_z_value):
            vectors[v_i] = np.array([lowpts[v1], lowpts[v2], lowpts[v3]])
            v_i  = v_i + 1

      # build the triangles along all the edges
      if not math.isnan(low_z_value):
        for (c, f, i1, i2) in [(0, 0, 0, 1), (1, 1, 2, 3), (2, 1, 0, 2), (3, 0, 1, 3)]:
          c0 = (c == 0 and x_i == 0)
          c1 = (c == 1 and x_i == x_index_range - 2)
          c2 = (c == 2 and y_i == 0)
          c3 = (c == 3 and y_i == y_index_range - 2)
          if c0 or c1 or c2 or c3:
            if usept[i1] and usept[i2]:
              if f == 0:
                vectors[v_i] = np.array([pts[i1], pts[i2], lowpts[i1]])
              else:
                vectors[v_i] = np.array([pts[i2], pts[i1], lowpts[i1]])
              v_i  = v_i + 1

              if f == 0:
                vectors[v_i] = np.array([pts[i2], lowpts[i2], lowpts[i1]])
              else:
                vectors[v_i] = np.array([pts[i2], lowpts[i1], lowpts[i2]])
              v_i  = v_i + 1

  # resize the array to the number of triangles we actually used
  vectors = np.resize(vectors, (v_i, 3, 3))

  # convert the data into the mesh numpy format
  data = np.zeros(v_i, dtype=mesh.Mesh.dtype)
  data['vectors'] = vectors

  # return a mesh
  return mesh.Mesh(data, remove_empty_areas=True)

def findClosestIndexSorted(list, value):
  #return np.argmin(np.abs(np.array(list)-value))
  below_i = 0
  above_i = len(list) - 1
  if list[below_i] >= value:
    return below_i
  if list[above_i] <= value:
    return above_i
  while 1==1:
    mid_i = round((above_i + below_i) * 0.5)
    if mid_i == above_i or mid_i == below_i:
      if abs(list[below_i] - value) < abs(list[above_i] - value):
        return below_i
      return above_i
    if list[mid_i] == value:
      return mid_i
    if list[mid_i] > value:
      above_i = mid_i
    else:
      below_i = mid_i

def filterPoints(x_grid, y_grid, z_grid, polygons):
  removed = 0
  for i in range(0, len(x_grid)):
    for j in range(0, len(x_grid[0])):
      if math.isnan(x_grid[i][j]):
        continue
      if math.isnan(y_grid[i][j]):
        continue
      if math.isnan(z_grid[i][j]):
        continue
      if not Point(x_grid[i][j], y_grid[i][j]).within(polygons).any():
        z_grid[i][j] = float("nan")
        removed = removed + 1
  print("removed", removed, "points")

def GenerateListWithUniqueValues(values, step):
  unique_values = []
  for v in values:
    if len(unique_values) == 0 or abs(unique_values[findClosestIndexSorted(unique_values, v)] - v) > step * 0.9:
      unique_values.append(v)
      unique_values.sort()
  return unique_values

def ArrangeElevationPointsIntoGrid(x_values, y_values, z_values, x_step, y_step, default_elevation):
  unique_x_values = GenerateListWithUniqueValues(x_values, x_step)
  unique_y_values = GenerateListWithUniqueValues(y_values, y_step)

  x_grid = [ [unique_x_values[i]] * len(unique_y_values) for i in range(len(unique_x_values)) ]
  y_grid = [ unique_y_values for i in range(len(unique_x_values)) ]
  z_grid = [ [default_elevation] * len(unique_y_values) for i in range(len(unique_x_values)) ]
  for (x, y, z) in zip(x_values, y_values, z_values):
    x_i = findClosestIndexSorted(unique_x_values, x)
    y_i = findClosestIndexSorted(unique_y_values, y)
    x_grid[x_i][y_i] = x
    y_grid[x_i][y_i] = y
    z_grid[x_i][y_i] = z

  return (x_grid, y_grid, z_grid)

def SplitGridIntoBlocks(x_grid, y_grid, z_grid, start_x, end_x, start_y, end_y):
  x_values_0 = [ x_vals[0] for x_vals in x_grid]
  y_values_0 = y_grid[0]

  x_start_i = findClosestIndexSorted(x_values_0, start_x)
  if x_grid[x_start_i][0] < start_x:
    x_start_i = x_start_i + 1
  x_end_i = findClosestIndexSorted(x_values_0, end_x)
  if x_grid[x_end_i][0] > end_x:
    x_end_i = x_end_i - 1

  y_start_i = findClosestIndexSorted(y_values_0, start_y)
  if y_grid[0][y_start_i] < start_y:
    y_start_i = y_start_i + 1
  y_end_i = findClosestIndexSorted(y_values_0, end_y)
  if y_grid[0][y_end_i] < end_y:
    y_end_i = y_end_i + 1

  x_end_i = x_end_i - 1
  y_end_i = y_end_i - 1

  print("block indices x", x_start_i, x_end_i, "y", y_start_i, y_end_i)

  x_block = []
  y_block = []
  z_block = []
  for x_i in range(x_start_i - 1, x_end_i + 1):
    x_block.append([])
    y_block.append([])
    z_block.append([])
    for y_i in range(y_start_i - 1, y_end_i + 1):
      # populate the x values
      if x_i < x_start_i:
        x_block[-1].append(start_x)
      elif x_i > x_end_i:
        x_block[-1].append(end_x)
      else:
        x_block[-1].append(min(max(x_grid[x_i][y_i], start_x), end_x))

      # populate the y values
      if y_i < y_start_i:
        y_block[-1].append(start_y)
      elif y_i > y_end_i:
        y_block[-1].append(end_y)
      else:
        y_block[-1].append(min(max(y_grid[x_i][y_i], start_y), end_y))

      # populate the z values
      z_block[-1].append(z_grid[min(max(x_i, x_start_i), x_end_i)][min(max(y_i, y_start_i), y_end_i)])

  return (x_block, y_block, z_block)

def ScaleElevationPoints(elevation_points, x_scale, y_scale, z_scale):
  return [(point[0], point[1], point[2] * y_scale, point[3] * x_scale, point[4] * z_scale) for point in elevation_points]

def MeasureModelSize(elevation_points):
  # Extract the point values
  x_values = [point[3] for point in elevation_points]
  y_values = [point[2] for point in elevation_points]
  z_values = [point[4] for point in elevation_points]

  return (min(x_values), max(x_values), min(y_values), max(y_values), min(z_values), max(z_values))

def FillPlane(x_grid, y_grid, z_grid, start_x, end_x, start_y, end_y, z):
  x_values_0 = [ x_vals[0] for x_vals in x_grid]
  y_values_0 = y_grid[0]

  x_start_i = findClosestIndexSorted(x_values_0, start_x)
  x_end_i = findClosestIndexSorted(x_values_0, end_x)
  y_start_i = findClosestIndexSorted(y_values_0, start_y)
  y_end_i = findClosestIndexSorted(y_values_0, end_y)

  for x_i in range(x_start_i, x_end_i + 1):
    for y_i in range(y_start_i, y_end_i + 1):
      z_grid[x_i][y_i] = z

def MeasurePlaneZ(x_grid, y_grid, z_grid, start_x, end_x, start_y, end_y):
  x_values_0 = [ x_vals[0] for x_vals in x_grid]
  y_values_0 = y_grid[0]

  x_start_i = findClosestIndexSorted(x_values_0, start_x)
  x_end_i = findClosestIndexSorted(x_values_0, end_x)
  y_start_i = findClosestIndexSorted(y_values_0, start_y)
  y_end_i = findClosestIndexSorted(y_values_0, end_y)

  z_min = z_grid[x_start_i][y_start_i]
  z_max = z_grid[x_start_i][y_start_i]
  for x_i in range(x_start_i, x_end_i + 1):
    for y_i in range(y_start_i, y_end_i + 1):
      z_min = min(z_grid[x_i][y_i], z_min)
      z_max = max(z_grid[x_i][y_i], z_max)
  return (z_min, z_max)

def MeasureFeatureSize(x_grid, y_grid, z_grid):
  x_values_0 = [ x_vals[0] for x_vals in x_grid]
  y_values_0 = y_grid[0]
  x_diffs = [x_values_0[i+1] - x_values_0[i] for i in range(0, len(x_values_0) - 1)]
  y_diffs = [y_values_0[i+1] - y_values_0[i] for i in range(0, len(y_values_0) - 1)]
  return (statistics.median(x_diffs), statistics.median(y_diffs))

def AddLegend(x_grid, y_grid, z_grid, min_x, max_x, min_y, max_y, min_z, max_z, model_x_scale, model_y_scale, model_z_scale):
  original_x_size = (max_x - min_x) / model_x_scale
  original_scale_line_size = 10.0 ** (math.ceil(math.log10(original_x_size)) - 2)
  scaled_scale_line_size = original_scale_line_size * model_x_scale
  scaled_imperial_line_size = scaled_scale_line_size * 1.60934
  print("original_size:", original_scale_line_size, "scaled_size:", scaled_scale_line_size)

  # Boundary
  x_range = max_x - min_x
  y_range = max_y - min_y
  z_range = max_z - min_z

  (x_feature, y_feature) = MeasureFeatureSize(x_grid, y_grid, z_grid)

  x_boundary = min(x_feature * 2, 2)
  y_boundary = min(y_feature * 2, 2)
  z_step = z_range * 0.02

  plane_min_x = max_x - 3 * x_boundary - max(scaled_imperial_line_size, scaled_scale_line_size)
  plane_max_x = max_x - x_boundary,
  plane_min_y = min_y + y_boundary
  plane_max_y = min_y + 5 * y_boundary

  (plane_min_z, plane_max_z) = MeasurePlaneZ(x_grid, y_grid, z_grid, plane_min_x, plane_max_x, plane_min_y, plane_max_y)
  print("measured", plane_min_z, plane_max_z)
  legend_base_z = plane_max_z + z_step * 0.5

  # Create a place where we will make the legend
  print("adding legend block at", plane_min_x, plane_max_x, plane_min_y, plane_max_y)
  FillPlane(x_grid, y_grid, z_grid,
            plane_min_x, plane_max_x,
            plane_min_y, plane_max_y,
            legend_base_z)

  # This one is presumably for imperial (so km to miles)
  FillPlane(x_grid, y_grid, z_grid,
            plane_min_x + x_boundary, plane_min_x + x_boundary + scaled_imperial_line_size,
            plane_min_y + 1 * y_boundary, plane_min_y + 2 * y_boundary,
            legend_base_z + z_step)

  # This one is presumably for metric
  FillPlane(x_grid, y_grid, z_grid,
            plane_min_x + x_boundary, plane_min_x + x_boundary + scaled_scale_line_size,
            plane_min_y + 3 * y_boundary, plane_min_y + 4 * y_boundary,
            legend_base_z + z_step)

def GenerateStlFilesFromData(filepath, x_grid, y_grid, z_grid, x_min, x_max, y_min, y_max, low_z_value, x_blocks, y_blocks):
  for x_block_i in range(0, x_blocks):
    for y_block_i in range(0, y_blocks):
      print("generating block", x_block_i, y_block_i)
      x_start = x_min + (x_max - x_min) * x_block_i / x_blocks
      x_end = x_min + (x_max - x_min) * (x_block_i + 1) / x_blocks
      y_start = y_min + (y_max - y_min) * y_block_i / y_blocks
      y_end = y_min + (y_max - y_min) * (y_block_i + 1) / y_blocks

      print("getting block", round(x_start, 2), round(x_end, 2), round(y_start, 2), round(y_end, 2))
      (x_block, y_block, z_block) = SplitGridIntoBlocks(x_grid, y_grid, z_grid, x_start, x_end, y_start, y_end)

      print("generating mesh from points", len(x_block), len(x_block[0]))
      my_mesh = GenerateMeshFromGrid(x_block, y_block, z_block, low_z_value)

      path = filepath + "_" + str(x_block_i) + "_" + str(y_block_i) + "_.stl"
      print("storing mesh", path)
      my_mesh.save(path)

CreateSamplesTable(database_path)

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
database_path: /content/drive/MyDrive/TopologicalMap/California/sample_2500m_base_-120_36/elevation_samples.db
mesh_path: /content/drive/MyDrive/TopologicalMap/California/sample_2500m_base_-120_36/stl/scale_0.0001_0.0001_0.001


In [None]:
# @title Testing (optional). This will produce a simple test database and simple stl file. API will not be used. If everything completes then code should be ok.

# Make sure we can create and delete the table in the test database
test_table = "test_table.db"
CreateSamplesTable(test_table)
DropSamplesTample(test_table)
CreateSamplesTable(test_table)

# Polygon specifiedin lon, lat
test_polygons_deg = [
    Polygon([
        [-122.06789052124051, 37.41075847402667],
        [-122.03171597386043, 37.41075847402667],
        [-122.03171597386043, 37.3858709893142],
        [-122.06789052124051, 37.3858709893142]
    ])
]

# Plot the original polygon
PlotPolygon(test_polygons_deg[0])
plt.show()

# Convert the polygon to meters
test_polygons_m = [ConvertPolygonLatLngToMeters(p) for p in test_polygons_deg]

# Plot the modified polygon
PlotPolygon(test_polygons_m[0])
plt.show()

# Sample the polygons
test_sample_size1 = 1000
test_samples_for_polygons_1_m = [SamplePolygon(test_polygon_m, test_sample_size1, test_sample_size1) for test_polygon_m in test_polygons_m]

# Plot the samples
PlotPoints(test_samples_for_polygons_1_m[0])
plt.show()

# Convert the data.
test_samples_for_polygons_1_deg = []
for test_samples_for_polygon_1_m in test_samples_for_polygons_1_m:
  test_samples_for_polygons_1_deg.append([ConvertMetersToLatLng(pt_m) for pt_m in test_samples_for_polygon_1_m])

# Plot the samples
PlotPoints(test_samples_for_polygons_1_deg[0])
plt.show()

# Store the samples in the database
StoreSamplesInDatabase(test_table, "test", test_samples_for_polygons_1_deg, threshold=test_sample_size1)

# Sample the polygons
test_sample_size2 = 500
test_samples_for_polygons_2_m = [SamplePolygon(test_polygon_m, test_sample_size2, test_sample_size2) for test_polygon_m in test_polygons_m]

# Plot the samples
PlotPoints(test_samples_for_polygons_2_m[0])
plt.show()

# Convert the data.
test_samples_for_polygons_2_deg = []
for test_samples_for_polygon_2_m in test_samples_for_polygons_2_m:
  test_samples_for_polygons_2_deg.append([ConvertMetersToLatLng(pt_m) for pt_m in test_samples_for_polygon_2_m])

# Plot the samples
PlotPoints(test_samples_for_polygons_2_deg[0])
plt.show()

# Store the samples in the database
StoreSamplesInDatabase(test_table, "test", test_samples_for_polygons_2_deg, threshold=test_sample_size2)

# Get the samples back out of the database
test_fetched_samples = GetSamplePointsFromDatabaseWithoutElevation(test_table)

# Plot the fetched Samples
PlotElevationPoints(test_fetched_samples)
plt.show()

# Populate the elevation data
test_updated_samples = [[sample[0], sample[1], sample[2], sample[3], 1.0] for sample in test_fetched_samples]

# Write results to the database
OverwritePointsInDatabase(test_table, test_updated_samples)

# Get the samples back out of the database
test_elevation_points_deg = GetSamplePointsFromDatabaseWithElevation(test_table)

# Convert elevation points to meters
test_elevation_points_meters = ConvertElevationPointsToDistanceBased(test_elevation_points_deg)

# scale the model
test_model_x_scale = 0.001
test_model_y_scale = 0.001
test_model_z_scale = 0.001
test_scaled_elevation_points = ScaleElevationPoints(test_elevation_points_meters, test_model_x_scale, test_model_y_scale, test_model_z_scale)
print("Scaled from", MeasureModelSize(test_elevation_points_meters), "to", MeasureModelSize(test_scaled_elevation_points))

# measure the model size
(test_min_x, test_max_x, test_min_y, test_max_y, test_min_z, test_max_z) = MeasureModelSize(test_scaled_elevation_points)

# add a base to the model
test_base_top = test_min_z
test_base_bottom = test_base_top - max((test_max_z - test_min_z) * 0.1, 0.001)
print("adding base", test_base_bottom, test_base_top)

# convert the model into a grid
test_x_values = [point[3] for point in test_scaled_elevation_points]
test_y_values = [point[2] for point in test_scaled_elevation_points]
test_z_values = [point[4] for point in test_scaled_elevation_points]
(text_x_grid, test_y_grid, test_z_grid) = ArrangeElevationPointsIntoGrid(test_x_values, test_y_values, test_z_values, test_model_x_scale * model_x_scale, test_sample_size2 * test_model_y_scale, test_base_top)
print("grid has", len(text_x_grid), len(text_x_grid[0]))

# Generate the stl files
GenerateStlFilesFromData("test_stl_file", text_x_grid, test_y_grid, test_z_grid, test_min_x, test_max_x, test_min_y, test_max_y, test_base_bottom, x_blocks=1, y_blocks=1)



In [None]:
# @title Fetch Borders as Polygons (required for sample generation)

# Generate the data.
polygons_deg = FetchBorders(borders)
# Convert the maps.
polygons_m = [ConvertPolygonLatLngToMeters(polygon_deg) for polygon_deg in polygons_deg]

# Print and display the results
print("polygons_deg=", polygons_deg)
print("polygons_m=", polygons_m)

for polygon_deg in polygons_deg:
  PlotPolygon(polygon_deg)
plt.show()

for polygon_m in polygons_m:
  PlotPolygon(polygon_m)
plt.show()


In [None]:
# @title Generate Sample Points (this may be slow due to checking for points being inside polygons). This is stored in a database. (Required once to init database.)

# Generate the data.
samples_for_polygons_m = [SamplePolygon(polygon_m, sampling_distance_meters, sampling_distance_meters) for polygon_m in polygons_m]

# Convert the data.
samples_for_polygons_deg = []
for samples_for_polygon_m in samples_for_polygons_m:
  samples_for_polygons_deg.append([ConvertMetersToLatLng(pt_m) for pt_m in samples_for_polygon_m])

# Print and display the results.
print("samples_for_polygons_m=", len(samples_for_polygons_m), samples_for_polygons_m)
print("samples_for_polygons_deg=", len(samples_for_polygons_deg), samples_for_polygons_deg)

# Storing to database
StoreSamplesInDatabase(database_path, borders, samples_for_polygons_deg, sampling_distance_meters*0.75)

# Plotting
for (samples_for_polygon_m, polygon_m) in zip(samples_for_polygons_m, polygons_m):
  PlotPolygon(polygon_m)
  PlotPoints(samples_for_polygon_m)
plt.show()

for (samples_for_polygon_deg, polygon_deg) in zip(samples_for_polygons_deg, polygons_deg):
  PlotPolygon(polygon_deg)
  PlotPoints(samples_for_polygon_deg)
plt.show()

In [None]:
# @title Fetching the elevation data for points from database (seems to run at about 1-2 QPS). Cost per query $0.005 (credit $200) (Required once to populate database elevations)

PopulateElevationData(database_path)

In [None]:
# @title Plot the populated elevation data (optional)

elevation_points_deg = GetSamplePointsFromDatabaseWithElevation(database_path)
PlotElevationPoints(elevation_points_deg)

print(len(elevation_points_deg))

elevation_points_meters = ConvertElevationPointsToDistanceBased(elevation_points_deg)
PlotElevationPoints(elevation_points_meters)


In [None]:
# @title Generate the stl file (outputs stl files)

# Get the elevation points from database
print("fetching points from database")
elevation_points_deg = GetSamplePointsFromDatabaseWithElevation(database_path)

# Convert elevation points to meters
print("converting to meters")
elevation_points_meters = ConvertElevationPointsToDistanceBased(elevation_points_deg)

# scale the model
print("scaling")
scaled_elevation_points = ScaleElevationPoints(elevation_points_meters, model_x_scale, model_y_scale, model_z_scale)
print("Scaled from", MeasureModelSize(elevation_points_meters), "to", MeasureModelSize(scaled_elevation_points))

# measure the model size
print("Measuring the model")
(min_x, max_x, min_y, max_y, min_z, max_z) = MeasureModelSize(scaled_elevation_points)

# add a base to the model
print("Adding base")
base_top = min_z
base_bottom = base_top - (max_z - min_z) * 0.1
print("base", base_bottom, base_top)

# convert the model into a grid
print("Converting to grid")
x_values = [point[3] for point in scaled_elevation_points]
y_values = [point[2] for point in scaled_elevation_points]
z_values = [point[4] for point in scaled_elevation_points]
(x_grid, y_grid, z_grid) = ArrangeElevationPointsIntoGrid(x_values, y_values, z_values, sampling_distance_meters * model_x_scale, sampling_distance_meters * model_y_scale, base_top)
print("grid has", len(x_grid), len(x_grid[0]))

print("Adding a legend to the grid")
AddLegend(x_grid, y_grid, z_grid, min_x, max_x, min_y, max_y, min_z, max_z, model_x_scale, model_y_scale, model_z_scale)

# Generate the stl files
print("generating stl files")
GenerateStlFilesFromData(mesh_path, x_grid, y_grid, z_grid, min_x, max_x, min_y, max_y, base_bottom, x_blocks=split_blocks_x, y_blocks=split_blocks_y)

print("done")