In [4]:
import os
import math
import numpy as np
import gdal
from copy import deepcopy


# Helper function
def read_raster(file):
    dataSource = gdal.Open(file)
    band = dataSource.GetRasterBand(1)
    band = band.ReadAsArray()
    return(dataSource, band)


def identicalList(inList):
    global logical
    inList = np.array(inList)
    logical = inList == inList[0]
    if sum(logical) == len(inList):
        return True
    else:
        return False


class LSI:
    def __init__(self, image):
        self.ls1_ds, self.ls1_arr = readraster(image)
        self.perform_checks()

    def perform_checks(self):
        self.row, self.col = self.ls1_ds.RasterYSize, self.ls1_ds.RasterXSize


class Growth_Factors:
    def __init__(self, *args):
        self.gf = dict()
        self.gf_ds = dict()
        self.nFactors = len(args)
        n = 1
        for file in args:
            self.gf_ds[n], self.gf[n] = readraster(file)
            n += 1
        self.perform_checks()

    def perform_checks(self):
        print("\nChecking the size of input growth factors...")
        rows = []
        cols = []
        for n in range(1, self.nFactors+1):
            rows.append(self.gf_ds[n].RasterYSize)
            cols.append(self.gf_ds[n].RasterXSize)
        if (identicalList(rows) == True) and ((identicalList(cols) == True)):
            print("Input factors have same row and column value.")
            self.row = self.gf_ds[n].RasterYSize
            self.col = self.gf_ds[n].RasterXSize
        else:
            print("Input factors have different row and column value.")

In [None]:
class Cellular_Automata():

    def __init__(self, landslide, growth_factors):
        self.factors = growth_factors
        self.perform_checks()
        self.kernelSize = 3

    def perform_checks(self):
        self.row = self.factors.row
        self.col = self.factors.col

    def predict(self):
        self.predicted = deepcopy(self.landslide.arr_lc1)
        sideMargin = math.ceil(self.kernelSize/2)
        for y in range(sideMargin,self.row-(sideMargin-1)):
            for x in range(sideMargin,self.col-(sideMargin-1)):
                kernel = self.landslide.arr_lc1[y-(sideMargin-1):y+(sideMargin), x-(sideMargin-1):x+(sideMargin)]
                builtupCount = sum(sum(kernel==1))
                #If the number of built-up cells greater than equal to assigned threshold
                #if (builtupCount >= self.builtupThreshold):
                if (builtupCount >= self.builtupThreshold) and (self.factors.gf[5][y,x] != 1):  # Adding exception for the restricted areas
                    for factor in range(1,self.factors.nFactors+1):
                #If the assigned thresholds are less than zero, then the smaller than rule applies, else greater than
                        if self.threshold[factor-1] < 0:
                            if (self.factors.gf[factor][y,x] <= abs(self.threshold[factor-1])):
                                self.predicted[y, x] = 1
                            else:
                                pass
                        elif self.threshold[factor-1] > 0:
                            if (self.factors.gf[factor][y,x] >= self.threshold[factor-1]):
                                self.predicted[y, x] = 1
                            else:
                                pass
                if (y%500==0) and (x%500==0):
                    print("Row: %d, Col: %d, Builtup cells count: %d\n" % (y, x, builtupCount), end="\r", flush=True)
                    
    def exportPredicted(self, outFileName):
        driver = gdal.GetDriverByName("GTiff")
        outdata = driver.Create(outFileName, self.col, self.row, 1, gdal.GDT_UInt16) # option: GDT_UInt16, GDT_Float32
        outdata.SetGeoTransform(self.landcovers.ds_lc1.GetGeoTransform())
        outdata.SetProjection(self.landcovers.ds_lc1.GetProjection())
        outdata.GetRasterBand(1).WriteArray(self.predicted)
        outdata.GetRasterBand(1).SetNoDataValue(0)        
        outdata.FlushCache() 
        outdata = None

In [26]:
if __name__ == "__main__":
    
    flow_dir_ls1 = r"D:\ms gme\thesis\final parameters\ca\data\flow_dir_ls1.tif"
    lsi_ls1 = r"D:\ms gme\thesis\final parameters\ca\data\MGD_lsi2_ls1_clip.tif"
    landslide_1 = r"D:\ms gme\thesis\final parameters\ca\data\ls1_raster.tif"
    
    ls1 = LSI(landslide_1)
    
    growth_factors = Growth_Factors(lsi_ls1, flow_dir_ls1)
    
    model = Cellular_Automata(ls1, growth_factors)
    
    model.predict()
    
    model.exportPredicted

In [237]:
import os
import gdal
import numpy as np

def read_raster(file):
    dataSource = gdal.Open(file)
    band = dataSource.GetRasterBand(1).ReadAsArray()
    return (dataSource, band)
    
class Factors:
    def __init__(self, **params):   # keys are flow_dir, lsi, and initial_cell
        self.flow_dir_ds, self.flow_dir_arr = read_raster(params["flow_dir"])
        self.lsi_ds, self.lsi_arr = read_raster(params["lsi"])
        self.initial_ls = read_raster(params["initial_cell"])[1]  # get only the array
        self.check_shape()
        
    def check_shape(self):
        print("Checking shapes of input growth factors...")
        if self.flow_dir_arr.shape == self.lsi_arr.shape == self.initial_ls.shape:
            self.row = self.flow_dir_arr.shape[0]
            self.col = self.flow_dir_arr.shape[1]
            print("All factors have the same shape.")
        else:
            print("Shape of factors does not match.")
        

class Cellular_Automata:
    def __init__(self, factors, neigh_size):
        self.factors = factors
        self.neigh_size = neigh_size
        
    def simulate(self):
        print("\nSimulating landslide...")
        # landslide base
        self.landslide_predicted = np.zeros_like(self.factors.initial_ls)
        
        col = self.neigh_size - 1
        center =  int((self.neigh_size - 2) - (self.neigh_size - 3)/2)
        
        for i in range(1, self.factors.row - 1):
            # skip no data values
            if self.factors.initial_ls[i].mean() == 0:
                continue
            for j in range(1, len(self.factors.initial_ls[i]) - 1):
                try:
                    # flow dir kernel
                    kernel_fd = flow_dir[i-1:i+col, j-1:j+col]
                    # LSI kernel
                    kernel_LSI = LSI[i-1:i+col, j-1:j+col]

                    # kernel where means will be computed
                    kernel_no_center_fd = kernel_fd.copy()
                    kernel_no_center_LSI = kernel_LSI.copy()
                    kernel_no_center_fd[center, center] = 0
                    kernel_no_center_LSI[center, center] = 0

                    mean_fd = kernel_no_center_fd.mean()
                    mean_LSI = kernel_no_center_LSI.mean()
                    # Rule for landslide transition
                    if mean_fd <= 5 and mean_fd >= 3.5 and mean_LSI <= 5 and mean_LSI >= 3.5:
                        self.landslide_predicted[i][j] = 1

                # if raser dimension is not divisible by the kernel size
                except IndexError:
                    continue    

        return self.landslide_predicted
    
    def export_predicted(self, out_fn):
        print("\nExporting array to GTiff...")
        driver = gdal.GetDriverByName("GTiff")
        out_data = driver.Create(out_fn, self.factors.col, self.factors.row, 1, gdal.GDT_UInt16) # option: GDT_UInt16, GDT_Float32
        out_data.SetGeoTransform(self.factors.lsi_ds.GetGeoTransform())
        out_data.SetProjection(self.factors.lsi_ds.GetProjection())
        out_data.GetRasterBand(1).WriteArray(self.landslide_predicted)
        out_data.GetRasterBand(1).SetNoDataValue(-99999)        
        out_data.FlushCache() 
        out_data = None

        # check if exported 
        if os.path.isfile(out_fn):
            print("Raster exported.")
        else:
            print("Failed to export raster.")
    

In [238]:
if __name__ == "__main__":
    flow_dir_ls1 = r"D:\ms gme\thesis\final parameters\ca\data\flow_dir_ls1.tif"
    ls1_raster = r"D:\ms gme\thesis\final parameters\ca\data\ls1_raster.tif"
    lsi_ls1 = r"D:\ms gme\thesis\final parameters\ca\data\MGD_lsi2_ls1_clip.tif"

    # Initialize factors
    growth_factors = Factors(flow_dir=flow_dir_ls1, lsi=lsi_ls1, initial_cell=ls1_raster)

    # Initialize CA
    kernel = 3
    ca_model = Cellular_Automata(growth_factors, kernel)

    ca_model.simulate()

    # Export 
    out_fn = f"D:/ms gme/thesis/final parameters/ca/data/output/ls1_3x3_new.tif"
    ca_model.export_predicted(out_fn)


Checking shapes of input growth factors...
All factors have the same shape

Simulating landslide...

Exporting array to GTiff...
Raster exported.


In [136]:
flow_dir_ls1 = r"D:\ms gme\thesis\final parameters\ca\data\flow_dir_ls1.tif"
ls1_raster = r"D:\ms gme\thesis\final parameters\ca\data\ls1_raster.tif"
lsi_ls1 = r"D:\ms gme\thesis\final parameters\ca\data\MGD_lsi2_ls1_clip.tif"

# read ds
flow_dir_ds = read_raster(flow_dir_ls1)
ls1_ds = read_raster(ls1_raster)
lsi_ds = read_raster(lsi_ls1)

# variables
ks = 7
ls1 = ls1_ds[1].astype(int)
flow_dir = flow_dir_ds[1].astype(int)
LSI = lsi_ds[1].astype(int)
kernel = np.zeros((ks, ks))

# landslide extent
ls1_final = np.zeros_like(ls1)

# perform ca based on flow_dir and LSI
col = ks - 1
center =  int((ks - 2) - (ks - 3)/2)
for i in range(1, len(ls1) - 1):
    # skip no data values
    if ls1[i].mean() == 0:
        continue
    for j in range(1, len(ls1[i]) - 1):
        try:
            # flow dir kernel
            kernel_fd = flow_dir[i-1:i+col, j-1:j+col]
            # LSI kernel
            kernel_LSI = LSI[i-1:i+col, j-1:j+col]

            # kernel where means will be computed
            kernel_no_center_fd = kernel_fd.copy()
            kernel_no_center_LSI = kernel_LSI.copy()
            kernel_no_center_fd[center, center] = 0
            kernel_no_center_LSI[center, center] = 0

            mean_fd = kernel_no_center_fd.mean()
            mean_LSI = kernel_no_center_LSI.mean()
            # Rule for landslide transition
            if mean_fd <= 5 and mean_fd >= 3.5 and mean_LSI <= 5 and mean_LSI >= 3.5:
                ls1_final[i][j] = 1
        
        # if raser dimension is not divisible by the kernel size
        except IndexError:
            continue

In [137]:
def export_predicted(out_fn, array):
    ref = gdal.Open(r"D:\ms gme\thesis\final parameters\ca\data\ls1_raster.tif")
    driver = gdal.GetDriverByName("GTiff")
    outdata = driver.Create(out_fn, 286, 226, 1, gdal.GDT_UInt16) # option: GDT_UInt16, GDT_Float32
    outdata.SetGeoTransform(ref.GetGeoTransform())
    outdata.SetProjection(ref.GetProjection())
    outdata.GetRasterBand(1).WriteArray(array)
    outdata.GetRasterBand(1).SetNoDataValue(-99999)        
    outdata.FlushCache() 
    outdata = None

In [138]:
fn = f"D:/ms gme/thesis/final parameters/ca/data/output/ls1_{ks}x{ks}.tif"
export_predicted(fn, ls1_final)
if os.path.isfile(fn):
    print("Raster exported.")
else:
    print("Failed to export raster.")

Raster exported.


In [None]:
# masked data in numpy
import numpy as np

image = gdal.Open(fn)
band = image.GetRasterBand(1)
rasterArray = band.ReadAsArray()

# Get nodata value from the GDAL band object
nodata = band.GetNoDataValue()

#Create a masked array for making calculations without nodata values
rasterArray = np.ma.masked_equal(rasterArray, nodata)
print(type(rasterArray))

# Check again array statistics
print(rasterArray.min())