Author: Mirco Nanni

Geospatial Analytics, Master degree in Data Science and Business Informatics, University of Pisa

# Geospatial Analytics - Lesson 3: Spatial Data Analysis

Topics covered:
* [Density](#density) and [kernel density](#kernel)
* [ANN analysis](#ann) and [L-function](#l-function)
* [Spatial autocorrelation](#autocorrelation) (Moran's I and Geary's C)
* [Interpolation with IDW](#idw)
* [Kriging](#kriging)


|Task  | Method | Ad hoc library | Is in these exercises? |
| --- | --- | --- | --- |
|Density| Basic | *none* | Yes |
|| Kernel | *none* | Yes |
|Point patterns | Average Nearest Neighbor | *none* | Yes |
|| L-function | *none* | Yes |
|Autocorrelation| Moran's I | pysal | Yes |
|| Geary's C| pysal | Yes |
|Interpolation| Thiessen polygons| - | - |
||Inverse Distance Weighted| KNeighborsRegressor | Yes |
||Trend surface| - | - |
||Kriging|pykrige | Yes |
|Regression| Exogenous/Endogenous regressors| - | - |
|Associations|Co-location patterns| - | - |
|Trends|Spatial Trends detection| - | - |


<a id='density'></a>
# Density measures
Adapted from https://pygis.io/docs/e_summarize_vector.html

In [None]:
# Import modules
import geopandas as gpd
import geoplot as gplt
import math
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import rasterio
from rasterio.transform import Affine
from scipy import stats
from shapely.geometry import Polygon, box
from sklearn.datasets import fetch_species_distributions
from sklearn.neighbors import KernelDensity

In [None]:
# Load data

# source: ISTAT - https://www.istat.it/storage/cartografia/confini_amministrativi/generalizzati/2024/Limiti01012024_g.zip
regions = gpd.read_file("data/Limiti01012024_g/Reg01012024_g/Reg01012024_g_WGS84.shp")
tuscany = regions[regions['DEN_REG']=='Toscana'].to_crs('EPSG:4326')

df = pd.read_csv("data/EV_stations.csv")
EVS = gpd.GeoDataFrame(
    geometry=gpd.points_from_xy(df.lon, df.lat, crs="EPSG:4326"), data=df
)


In [None]:
EVS

In [None]:
tuscany.plot()
EVS.plot()

In [None]:
def create_grid(feature, side_length):
    # Get extent of buffered input feature
    min_x, min_y, max_x, max_y = feature.total_bounds
    print("Bbox: ", min_x, min_y, max_x, max_y)

    # Create empty list to hold individual cells that will make up the grid
    cells_list = []

    for x in np.arange(min_x - side_length, max_x + side_length, side_length):
        for y in np.arange(min_y - side_length, max_y + side_length, side_length):
            # Create a box with specified side length and append to list
            cells_list.append(box(x, y, x + side_length, y + side_length))
    # Create grid from list of cells
    grid = gpd.GeoDataFrame(cells_list, columns = ['geometry'], crs = "EPSG:4326")

    # Create a column that assigns each grid a number
    grid["Grid_ID"] = np.arange(len(grid))

    # Return grid
    return grid

In [None]:
grid = create_grid(tuscany, 0.1)

# Create subplots
fig, ax = plt.subplots(1, 1, figsize = (10, 10))

# Plot data
tuscany.plot(ax = ax, color = 'bisque', edgecolor = 'dimgray')
EVS.plot(ax = ax, marker = 'o', color = 'dodgerblue', markersize = 3)
grid.plot(ax = ax, color = 'none', edgecolor = 'lightseagreen', alpha = 0.55)

# Set title
ax.set_title('Tuscany - Boundaries, EV stations, and Grids', fontdict = {'fontsize': '15', 'fontweight' : '3'})


In [None]:
# Perform a INNER - INTERSECT spatial join
EVS_cells = gpd.sjoin(EVS, grid, how = "inner", op = "intersects")

# Remove duplicate counts
# With intersect, those that fall on a boundary will be allocated to all cells that share that boundary
EVS_cells = EVS_cells.drop_duplicates(subset = ['ID']).reset_index(drop = True)

# Set field name to hold count value
count_field = "Count"

# Add a field with constant value of 1
EVS_cells[count_field] = 1

# Group GeoDataFrame by cell while aggregating the Count values
EVS_cells = EVS_cells.groupby('Grid_ID').agg({count_field:'sum'})

# Merge the resulting grouped dataframe with the grid GeoDataFrame, using a left join to keep all cell polygons
grid = grid.merge(EVS_cells, on = 'Grid_ID', how = "left")

# Fill the NaN values (cells without any points) with 0
grid[count_field] = grid[count_field].fillna(0)

# Convert Count field to integer
grid[count_field] = grid[count_field].astype(int)

# Display grid attribute table
display(grid)

In [None]:
import numpy as np
grid['logCount'] = np.log(grid['Count']+1)  # to make log colormap


# Create subplots
fig, ax = plt.subplots(1, 1, figsize = (10, 10))

# Plot data
tuscany.plot(ax = ax, color = 'none', edgecolor = 'dimgray')
EVS.plot(ax = ax, marker = 'o', color = 'dimgray', markersize = 3)
grid.plot(ax = ax, column = "logCount", cmap = "RdPu", edgecolor = 'lightseagreen', linewidth = 0.5, alpha = 0.70, legend = True)

# Set title
ax.set_title('Tuscany - Binning EVS Points', fontdict = {'fontsize': '15', 'fontweight' : '3'})

<a id='kernel'></a>
# Simple Kernel Density
Adopt uniform weights and 8-cell neighborhood

In [None]:
# Join the two grids and aggregate counts
grid2 = gpd.GeoDataFrame(grid.to_crs('EPSG:3003').buffer(2000).to_crs('EPSG:4326'), geometry=0)
grid2["Grid2_ID"] = np.arange(len(grid2))
# Each cell in grid2 captures the 8-cell neighborhood of the original cell
grid2 = gpd.sjoin(grid2, grid, how = "left", predicate = "intersects")
# Sum up the densities of the neihborhood
grid2 = grid2.groupby('Grid2_ID').agg({'Count':'sum'}).reset_index().rename(columns={'Grid2_ID':'Grid_ID'})
grid2 = gpd.GeoDataFrame(grid2.merge(grid, on = 'Grid_ID', how = "left"))




In [None]:
grid2['logCount'] = np.log(grid2['Count_x']+1)

# Create subplots
fig, ax = plt.subplots(1, 1, figsize = (10, 10))

# Plot data
tuscany.plot(ax = ax, color = 'none', edgecolor = 'dimgray')
EVS.plot(ax = ax, marker = 'o', color = 'dimgray', markersize = 3)
grid2.plot(ax = ax, column = "logCount", cmap = "RdPu", edgecolor = 'lightseagreen', linewidth = 0.5, alpha = 0.70, legend = True)


<a id='ann'></a>
# Average Nearest Neighbor analysis

In [None]:
import geopandas as gpd
import numpy as np
from shapely.geometry import Point
from scipy.spatial import cKDTree

def average_nearest_neighbor_distance(gdf, proj=None):
    # Ensure the geometry column contains Points
    if not all(gdf.geometry.type == 'Point'):
        raise ValueError("The GeoDataFrame should only contain Point geometries.")
    
    # Convert WGS84 coordinates to a local projection (e.g., UTM) to get distances in meters
    if proj == None:
        proj = 3395  # World Mercator (EPSG:3395), or choose another local CRS for your region
    gdf_projected = gdf.to_crs(epsg=proj)  

    # Extract the x and y coordinates
    coordinates = np.array([[geom.x, geom.y] for geom in gdf_projected.geometry])

    # Use a KDTree for efficient nearest neighbor calculation
    kdtree = cKDTree(coordinates)

    # Query the nearest neighbor for each point (k=2 because the nearest neighbor of a point is itself)
    distances, indices = kdtree.query(coordinates, k=2)

    # The nearest neighbor distance for each point is the second column (ignoring self-distance)
    nearest_neighbor_distances = distances[:, 1]

    # Calculate the average of these nearest neighbor distances
    avg_nearest_neighbor_distance = nearest_neighbor_distances.mean()

    return avg_nearest_neighbor_distance


In [None]:
df = pd.read_csv("data/EV_stations.csv")
EVS = gpd.GeoDataFrame(
    geometry=gpd.points_from_xy(df.lon, df.lat, crs="EPSG:4326"), data=df
)


In [None]:
average_nearest_neighbor_distance(EVS, proj=3003)  # EPSG:3003 - Monte Mario / Italy zone 1 (https://epsg.io/3003)


<a id='l-function'></a>
# Ripley's L-function L(d)

In [None]:
import numpy as np
import geopandas as gpd
from scipy.spatial import cKDTree
import matplotlib.pyplot as plt

def ripley_L_function(gdf, distances, proj=None):
    # Ensure the geometry column contains Points
    if not all(gdf.geometry.type == 'Point'):
        raise ValueError("The GeoDataFrame should only contain Point geometries.")
    
    # Convert WGS84 coordinates to a local projection (e.g., UTM or World Mercator) to get distances in meters
    if proj == None:
        proj = 3395  # World Mercator (EPSG:3395), or choose another local CRS for your region
    gdf_projected = gdf.to_crs(epsg=proj)

    # Extract the coordinates of the points
    coordinates = np.array([[geom.x, geom.y] for geom in gdf_projected.geometry])

    # Get the area of the convex hull of the points (or another study area measure if provided)
    area = gdf_projected.unary_union.convex_hull.area

    # Total number of points
    num_points = len(gdf)

    # Use a KDTree for efficient distance calculation
    kdtree = cKDTree(coordinates)

    # Density of points
    point_density = num_points / area

    # Initialize an array to store L(d) values
    L_values = np.zeros(len(distances))

    # Iterate over each distance threshold
    for i, d in enumerate(distances):
        # Query for the number of neighbors within distance d for each point
        counts = kdtree.query_ball_point(coordinates, r=d)

        # Compute K(d) by summing the neighbors for each point, excluding the point itself
        K_d = sum(len(c) - 1 for c in counts) / num_points

        # Normalize by point density and area
        K_d /= point_density

        # Compute L(d)
        L_values[i] = np.sqrt(K_d / np.pi)

    return L_values


In [None]:
distances = np.linspace(0, 10000, 100)  # Define distance thresholds (0 to 1000 meters, 50 steps)
L_values = ripley_L_function(EVS, distances, proj=3003) # EPSG:3003 - Monte Mario / Italy zone 1 (https://epsg.io/3003)

# Plot the L-function
plt.plot(distances, L_values)
plt.xlabel('Distance (meters)')
plt.ylabel('L(d)')
plt.title('Ripley’s L-function')
plt.plot(distances, distances, color='r', linestyle='--') # CSR reference line
plt.show()


<a id='autocorrelation'></a>
# Spatial Autocorrelation

Source: https://geographicdata.science/book/notebooks/06_spatial_autocorrelation.html

In [None]:
# Graphics
import matplotlib.pyplot as plt
import seaborn
from pysal.viz import splot
from splot.esda import plot_moran
import contextily

# Analysis
import geopandas
import pandas
from pysal.explore import esda
from pysal.lib import weights
from numpy.random import seed

In [None]:
# Source: https://data.london.gov.uk/dataset/eu-referendum-results
brexit_data_path = "data/uk_brexit_referendum.csv"
ref = pandas.read_csv(brexit_data_path, index_col="Area_Code")
ref.info()

# Source: https://geoportal.statistics.gov.uk/datasets/ed4fc01f849541bca7bb9044db5008a2_0/explore
lads = geopandas.read_file(
    "data/Local_Authority_Districts.geojson"
).set_index("lad16cd")
lads.info()

In [None]:
# Join geometries and votation results
db = (
    geopandas.GeoDataFrame(
        lads.join(ref[["Pct_Leave"]]), crs=lads.crs
    )
    .to_crs(epsg=3857)[
        ["OBJECTID", "lad16nm", "Pct_Leave", "geometry"]
    ]
    .dropna()
)
db.info()

In [None]:
# Plot
f, ax = plt.subplots(1, figsize=(9, 9))
db.plot(
    column="Pct_Leave",
    cmap="viridis",
    scheme="quantiles",
    k=6,
    edgecolor="white",
    linewidth=0.0,
    alpha=0.75,
    legend=True,
    legend_kwds={"loc": 2},
    ax=ax,
)
contextily.add_basemap(
    ax,
    crs=db.crs,
    source=contextily.providers.Esri.WorldTerrain,
)
ax.set_axis_off()

In [None]:
# Generate W from the GeoDataFrame
w = weights.KNN.from_dataframe(db, k=8)
# Row-standardization
w.transform = "R"

In [None]:
w.weights

In [None]:
moran = esda.moran.Moran(db["Pct_Leave"], w)
geary = esda.geary.Geary(db["Pct_Leave"], w)

In [None]:
print("Moran's I:", moran.I)
print("Geary's C:", geary.C)

In [None]:
plot_moran(moran)

# Interpolation

<a id='idw'></a>
# IDW
Exploit KNeighborsRegressor in sklearn

In [None]:
import numpy as np
import pandas as pd
import geopandas as gpd
from sklearn.neighbors import KNeighborsRegressor
import matplotlib.pyplot as plt
from shapely.geometry import Point

def knn_interpolation(gdf, value_column, n_neighbors=5, grid_resolution=100, weight_func='distance'):
    # Ensure the geometry column contains Points
    if not all(gdf.geometry.type == 'Point'):
        raise ValueError("The GeoDataFrame should only contain Point geometries.")

    # 
    # Preprocessing
    #
    # Convert the GeoDataFrame to the appropriate projection for spatial analysis (meters)
    gdf_projected = gdf.to_crs(epsg=3003)  # Use Italy 1 Mercator
    # Extract coordinates (X, Y) and the values to interpolate
    X = np.array([[point.x, point.y] for point in gdf_projected.geometry])
    y = gdf_projected[value_column].values
    
    #
    # Fit KNeighborsRegressor
    #
    knn = KNeighborsRegressor(n_neighbors=n_neighbors, weights=weight_func)
    knn.fit(X, y)
    
    #
    # Plot
    #
    # Create a grid over the area of interest
    xmin, ymin, xmax, ymax = gdf_projected.total_bounds
    x_grid = np.linspace(xmin, xmax, grid_resolution)
    y_grid = np.linspace(ymin, ymax, grid_resolution)
    # Generate mesh grid of points for interpolation
    xv, yv = np.meshgrid(x_grid, y_grid)
    grid_points = np.column_stack([xv.ravel(), yv.ravel()])
    # Predict values on the grid
    z_pred = knn.predict(grid_points)    
    # Reshape the predicted values to match the grid shape
    z_pred = z_pred.reshape(grid_resolution, grid_resolution)    
    # Create a GeoDataFrame from the grid points and predictions for plotting
    interp_gdf = gpd.GeoDataFrame(
        {'value': z_pred.ravel()},
        geometry=[Point(x, y) for x, y in grid_points],
        crs=gdf_projected.crs
    )    
    # Plot the original points and the interpolated map
    fig, ax = plt.subplots(figsize=(18, 16))
    # Plot the interpolated values as a heatmap (use a colormap like viridis or coolwarm)
    plt.imshow(z_pred, extent=(xmin, xmax, ymin, ymax), origin='lower', cmap='coolwarm', alpha=0.6)
    # Overlay the original points, colored by their actual values
    sc = ax.scatter(
        X[:, 0], X[:, 1], c=y, cmap='coolwarm', edgecolor='k', s=50, label='Known Points', zorder=10
    )
    # Add title and labels
    plt.title('KNeighborsRegressor Interpolation')
    plt.xlabel('Longitude')
    plt.ylabel('Latitude')
    plt.legend()

    plt.show()

In [None]:
def IDW_weight(dist_list):
# replace each distance with its weight
    n=10
    return [ 1/(d**n) for d in dist_list ]

gdf = gpd.read_file("data/sf_bay_rainfall/sf_bay_rainfall.shp")
knn_interpolation(gdf, value_column='VALUE', n_neighbors=5, grid_resolution=200, weight_func=IDW_weight)


<a id='kriging'></a>
# Kriging
Exploit the pykrige library

In [None]:
import numpy as np
import geopandas as gpd
import matplotlib.pyplot as plt
from pykrige.ok import OrdinaryKriging
from shapely.geometry import Point

def kriging_interpolation(gdf, value_column, variogram_model='linear', grid_resolution=100):

    # Ensure the geometry column contains Points
    if not all(gdf.geometry.type == 'Point'):
        raise ValueError("The GeoDataFrame should only contain Point geometries.")

    # Convert the GeoDataFrame to the appropriate projection for spatial analysis (meters)
    gdf_projected = gdf.to_crs(epsg=3003)  # Use Italy 1 Mercator 
    # Extract coordinates (X, Y) and the values to interpolate
    X = np.array([[point.x, point.y] for point in gdf_projected.geometry])
    y = gdf_projected[value_column].values
    # Define the area for interpolation by getting the bounding box of the input points
    xmin, ymin, xmax, ymax = gdf_projected.total_bounds
    # Create a grid over the area of interest
    x_grid = np.linspace(xmin, xmax, grid_resolution)
    y_grid = np.linspace(ymin, ymax, grid_resolution)
    xv, yv = np.meshgrid(x_grid, y_grid)

    
    # Set up the Kriging model
    kriging_model = OrdinaryKriging(
        X[:, 0], X[:, 1], y, variogram_model=variogram_model, verbose=False, enable_plotting=False
    )
    # Perform the interpolation on the grid
    z_pred, ss = kriging_model.execute("grid", x_grid, y_grid)

    
    # Plot the original points and the interpolated map
    fig, ax = plt.subplots(figsize=(10, 8))
    
    # Plot the interpolated values as a heatmap (use a colormap like viridis or coolwarm)
    img = ax.imshow(z_pred, extent=(xmin, xmax, ymin, ymax), origin='lower', cmap='coolwarm', alpha=0.6)

    # Overlay the original points, colored by their actual values
    sc = ax.scatter(
        X[:, 0], X[:, 1], c=y, cmap='coolwarm', edgecolor='k', s=50, label='Known Points', zorder=10
    )

    # Add title and labels
    plt.title(f'Ordinary Kriging Interpolation ({variogram_model.capitalize()} Variogram)')
    plt.xlabel('Longitude')
    plt.ylabel('Latitude')
    plt.legend()

    plt.show()


In [None]:
gdf = gpd.read_file("data/sf_bay_rainfall/sf_bay_rainfall.shp")
kriging_interpolation(gdf, value_column='VALUE', variogram_model='linear', grid_resolution=100)