Created Tue Mar 29 13:27:47 2022 ; Last updated 02/14/2024

@author: **Michelle M. Fink
         Colorado Natural Heritage Program, Colorado State University**

Purpose: Semi-automate Least-Cost Corridor analysis using ArcPro arcpy

----
License: GNU General Public License version 3.
 This script is free software: you can redistribute it and/or modify
 it under the terms of the GNU General Public License as published by
 the Free Software Foundation, either version 3 of the License, or
 (at your option) any later version.

 This program is distributed in the hope that it will be useful,
 but WITHOUT ANY WARRANTY; without even the implied warranty of
 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 GNU General Public License for more details.

 You should have received a copy of the GNU General Public License
 along with this program.  If not, see https://www.gnu.org/licenses/

# Set up

You either have to open and run this from within an ArcPro project, or use conda to clone the arcpro environment and run the notebook from there. This of course requires you to have Anaconda or Miniconda installed. 

If the latter, make a copy of C:\Program Files\ArcGIS\Pro\bin\Python\res\environment.yaml and edit the `name` and `prefix` lines. Then create a new conda environment with the file (e.g., `conda env create -n arcpro --file myenviron.yaml` )


In [None]:
import os
import numpy as np
from osgeo import osr, gdal
import arcpy
from arcpy import env
from arcpy import da
from arcpy import sa
arcpy.CheckOutExtension("Spatial")

## environment and variables

In [None]:
#Set environments
strWorkspace = r"H:\CPW_Statewide_Habt"
env.workspace = strWorkspace
env.scratchWorkspace = r"H:\CPW_Statewide_Habt\Default.gdb"
permRas = "Elk_permeability02122024.tif" #permeability raster
env.snapRaster = permRas
env.extent = permRas
env.overwriteOutput = True
env.cellSize = "30"
env.compression = "LZW"
env.pyramid = "NONE"
env.rasterStatistics = "STATISTICS 1 1"
env.parallelProcessingFactor = "100%"
env.resamplingMethod = "BILINEAR"

#Set parameters
usestones = False
costname = os.path.join(strWorkspace, "Elk_cost_Run3Feb24.tif") #output name
elevRas = r"D:\GIS\States\CO\NED\filled_ned_30m.tif" #elevation raster
vf = "VF.txt" #Vertical factor for least-cost analysis
spp = "Elk"
maxdist = 60000

Cores are expected to be polygon shapefiles with attributes "CoreID", "ToID", "Patch" (`"Patch" == 'core'`)
if `usestones = True` "Stepping-stones" need to be in the fromCores and/or toCores shapefile with attribute `"Patch" == 'step'`. Stepping-stones are treated as places of rest on the way to the destination, and as such are given 0 cost in the code below. Like so:

| CoreID | ToID | Patch
| ------ | ------ | ------ |
| 606 | 609 | core |
| 434 | 61 | core |
| 105 | 0 | step |

*However* for testing, or if you're not using Stepping-stones, it's a lot quicker to use a single point shapefile with attributes "Pt_ID", "FromPt", "ToPt".
If it is a "To" point, only CoreID should have a non-zero value. Like so:

| Pt_ID | FromPt | ToPt |
| ----- | ----- | ----- |
| 1 | 1 | 2 |
| 2 | 0 | 0 |
| 3 | 3 | 4 |
| 4 | 0 | 0 |
| 5 | 0 | 0 |

Run the cell below for points (changing file names as required *of course*):

In [None]:
# Single point shapefile
fromTopts = "Test_FromTo.shp"
ispoints = True

Run the cell below for polygons:

In [None]:
fromCores = "WinterFromPolys.shp"
toCores = "NotWinter_Cores_and_steps.shp"
ispoints = False

**A Note on maximum distance**  
A maximum distance (`maxdist`) of 60 cost-distance km is initially used, based on seasonal movement estimates for elk[^1]. It can always be increased if necessary. 

**Vertical Factor**  
Contents of the file, VF.txt:  

|   .    |   .    |
| ------ | ------ |
| -90 | -1 |
| -80 | 0.2 |
| -70 | 0.3 |
| -60 | 0.4 |
| -50 | 0.5 |
| -40 | 0.6 |
| -30 | 0.7 |
| -20 | 0.8 |
| -10 | 0.9 |
| 0 | 1 |
| 10 | 1 |
| 20 | 1 |
| 30 | 1 |
| 40 | 1 |
| 50 | 1 |
| 60 | 1 |
| 70 | 1 |
| 80 | 1 |
| 90 | -1 |

the values on the left are slope, in degrees (and direction). On the right are the cost-distance modifiers. The idea being that it takes very little effort to fall off a cliff XD, and downhill slopes in general are easier to travel. Uphill should already be taken into account via the permeability layer. (-1 = Inf = barrier)

## Functions

In [None]:
def raster2array(ras_name):
    """
    Convert a geoTIFF to a 2D numpy array.
    You could also do this with arcpy.RasterToNumPyArray
    ras_name: string, path/name of raster
    """
    ras_tif = gdal.Open(ras_name)
    ras_band = ras_tif.GetRasterBand(1)
    ras_data = ras_band.ReadAsArray()
    ras_tif is None
    ras_ary = ras_data
    return ras_ary

def perm_to_cost(perm, stones=None):
    """
    Convert a 0.5-5 Permeability raster into a Cost raster
    perm: string, path/name of permeability raster
    stones: string, path/name of stepping-stone raster, if any
    """
    if stones is None:
        cost = (sa.Raster(perm) / 5) ** -3
    else:
        rvsdperm = sa.Con(sa.IsNull(stones), sa.Raster(perm), sa.Raster(perm) + 1)
        cost = (rvsdperm / 5) ** -3
    return cost

def tryslice(rasCorridor):
    """
    Equation to use to attempt to auto-delimit least-cost corridors
    rasCorridor: string, path/name of generated cost accumulation raster
    """
    ary = raster2array(rasCorridor)
    mskary = np.ma.masked_less_equal(ary, 0)
    arymin = np.ma.min(mskary)
    cor1 = (1.0195 * arymin) + 114.44
    cor0 = cor1 - 200
    cor2 = cor1 + 200

    print("Minimum: " + str(arymin))
    print([cor0, cor1, cor2])
    return [cor0, cor1, cor2]

def create_corridors(fromRas, toRas, species, maxdist, pth):
    """
    Get unique pairs of Cores and generate corridors for each pair
    fromRas: string, path/name of the originating Core areas raster
    toRas: string, path/name of the destination Core areas raster
    species: string, name to use as prefix of generated corridor files
    maxdist: integer, maximum distance to use, in cost-meters
    pth: string, path to save all this to
    """
    corID = "_to".join([os.path.basename(fromRas).strip(species+".tif"), 
                         os.path.basename(toRas).strip(species+".tif")])
    outName1 = species + "_Accum" + corID + ".tif"
    outName2 = species + "_Corridors" + corID + ".tif"
    if os.path.isfile(os.path.join(pth, outName1)):
        accumRas = sa.Raster(os.path.join(pth, outName1))
    else:
        accumRas = sa.Raster(fromRas) + sa.Raster(toRas)
        outRas1 = sa.Con(accumRas <= maxdist, accumRas, 0)
        outRas1.save(os.path.join(pth, outName1))
        
    print("    " + outName1 + " saved")
    thresh = tryslice(os.path.join(pth, outName1))
    try:
        outRas2 = sa.Con((accumRas > 0) & (accumRas <= thresh[0]), 0,
                         sa.Con((accumRas > thresh[0]) & (accumRas <= thresh[1]), 1,
                                sa.Con((accumRas > thresh[1]) & (accumRas <= thresh[2]), 2)))
        outRas2.save(os.path.join(pth, outName2))
        print("    " + outName2 + " saved")
        
    except TypeError:
        print("Cannot create " + outName2)
        
def fname(sp, fstr=None, tstr=None):
    """
    sp: string, species name/ID
    fstr: string, from core or point
    tstr: string, to core or point
    
    Returns output filename
    """
    if fstr is not None:
        outname = "_".join([sp, "PD", fstr])
        
    if tstr is not None:
        outname = "_".join([sp, "PD", tstr])
        
    return outname

def pDistance(sp, fstr, tstr, lyr, fldname, ispoints, **kwargs):
    msg = "Something went wrong"
    fromRas = os.path.join(kwargs["path"], fname(sp, fstr=fstr) + "PD.tif")
    toRas = os.path.join(kwargs["path"], fname(sp, tstr=tstr) + "PD.tif")
        
    if not os.path.isfile(fromRas):
        if ispoints:
            lyrname = lyr
        else:
            lyrname = "from_" + lyr
            
        frmCore = arcpy.SelectLayerByAttribute_management(lyrname, "NEW_SELECTION", fldname + " = " + fstr)
        frmDist = sa.PathDistance(frmCore, kwargs["cost"], kwargs["elev"], "#", "#", kwargs["elev"], 
                                  sa.VfTable(os.path.join(kwargs["path"], kwargs["vf"])))
        frmDist.save(fromRas)
        msg = "Created " + fromRas
    else:
        msg = fromRas + " already exists, skipping"
            
    if not os.path.isfile(toRas):
        if ispoints:
            lyrname = lyr
        else:
            lyrname = "to_" + lyr
        toCore = arcpy.SelectLayerByAttribute_management(lyrname, "NEW_SELECTION", fldname + " = " + tstr)
        toDist = sa.PathDistance(toCore, kwargs["cost"], kwargs["elev"], "#", "#", kwargs["elev"],
                                 sa.VfTable(os.path.join(kwargs["path"], kwargs["vf"])))
        toDist.save(toRas)
        msg = "Created " + toRas
    else:
        msg = toRas + " already exists, skipping"
    
    return [msg, fromRas, toRas]

**A Note on the tryslice equation**  
This equation is based on research by the author of the relationship between the minimum value present in the least-cost accumulation raster and the value threshold needed to create a truly least-cost corridor. No single set of parameters works in all situations, unfortunately, and frankly I need a better workflow here.  

# Main

Do note that I have zero error checking here, and there are probably quite a few opportunities for code efficiencies (I don't use python nearly as frequently as R, and I'm always writing code in a hurry).

In [None]:
if ispoints:
    arcpy.MakeFeatureLayer_management(fromTopts, "allpts")
    stone = None
else:
    arcpy.MakeFeatureLayer_management(fromCores, "from_core")
    arcpy.MakeFeatureLayer_management(toCores, "to_core")
    if usestones:
        if not os.path.isfile(os.path.join(strWorkspace, "stoneRas.tif")):
            print("Make Stepping Stone raster")
            fromsteps = arcpy.SelectLayerByAttribute_management("from_core", "NEW_SELECTION", "Patch = 'step'")
            tosteps = arcpy.SelectLayerByAttribute_management("to_core", "NEW_SELECTION", "Patch = 'step'")
            arcpy.Merge_management([fromsteps, tosteps], "tempstone")
            arcpy.conversion.FeatureToRaster("tempstone", "Patch", "stoneRas.tif")
        stone = "stoneRas.tif"
    else:
        stone = None

if os.path.isfile(costname):
    print("Found cost layer, using " + costname)
    costRas = costname
else:
    print("Converting Permeability to Cost")
    costRas = perm_to_cost(permRas, stones=stone)
    costRas.save(costname)

**NOTE:** The next 2 cells take a lot of time and computer resources. A Path Distance raster must be created for *each* From and To point/core, and then an Accumulated Distance raster for each From-To pair. Each output will be as large (disk size wise) as the input permeability layer (~ 1.3 GB for statewide). On my machine, each Path Distance takes around 5 minutes to create @ 100% CPU.

In [None]:
#Iterate through core areas and get corridors for each
print("On to all the Path Distances")

kwargs = {"cost":costRas, "elev":elevRas, "path":strWorkspace, "vf":vf}

if ispoints:
    arcpy.MakeTableView_management(fromTopts, "pt_tbl")
    
    with da.SearchCursor("pt_tbl", ["FromPt", "ToPt"]) as curSrch:
        for row in curSrch:
            if row[0] == 0:
                pass
            else:
                valstr = str(row[0])
                constr = str(row[1])
                theresult = pDistance(spp, valstr, constr, 
                                      "allpts", "Pt_ID", ispoints, **kwargs)
                print(theresult[0])
                create_corridors(theresult[1], theresult[2], spp, maxdist, strWorkspace)
    
else:
    arcpy.MakeTableView_management(fromCores, "from_tbl")

    with da.SearchCursor("from_tbl", ["CoreID", "ToID"]) as curSrch:
        for row in curSrch:
            #make a Path Distance raster from each Core
            valstr = str(row[0])
            constr = str(row[1])
            theresult = pDistance(spp, valstr, constr, 
                                  "core", "CoreID", ispoints, **kwargs)
            print(theresult[0])
            create_corridors(theresult[1], theresult[2], spp, maxdist, strWorkspace)


If the corridor function **above** does not create a contiguous least-cost path From and To, you will need to use the saved Accumulated Distance raster to manually identify the least-cost corridor. Pay attention to the printed output of the minimum cost-distance value and attempted corridor cost-distance values and use those as starting points.

This is the point where this process *really* needs help because boy is it tedious to manually identify the corridors. I have wanted to add a process based on least-cost *path* (see Arcpro help) to ensure a contiguous corridor, but have never had the time to work it out.

## Clean-up

In [None]:
del curSrch

arcpy.CheckInExtension("Spatial")

[^1]: Armstrong, D.M., J.P. Fitzgerald, and C.A. Meaney. 2011. Mammals of Colorado, 2nd edition. Denver Museum of Nature and Science and University Press of Colorado, Boulder, CO.