# Libraries

For mapping trained models are used. Image segmentation is performed after the same prepatation, as was made for data for algorithm training:
1) getting additional information from bands -- spectral indicies;
2) data normalisation

In [1]:
#libraries

import os
import pandas as pd
import numpy as np
import rasterio as rio
from itertools import product
from rasterio import windows
import matplotlib.pyplot as plt
from sklearn import preprocessing

from osgeo import gdal
from osgeo.gdalconst import GDT_Int16

import joblib

In [2]:
import warnings
warnings.filterwarnings('ignore')

# Helpers

In [3]:
col_names = ['B1',
 'B2',
 'B3',
 'B4',
 'B5',
 'B6',
 'B7',
 'B8',
 'B8A',
 'B9',
 'B11',
 'B12']

col_names_full = ['B1',
 'B2',
 'B3',
 'B4',
 'B5',
 'B6',
 'B7',
 'B8',
 'B8A',
 'B9',
 'B11',
 'B12',
 'ndvi',
 'evi',
 'savi',
 'msi',
 'bsi']

In [4]:
def NDVI(red: pd.Series, nir: pd.Series):
    ndvi = (nir - red) / ((nir + red).apply(lambda x: 0.000001 if x == 0 else x))
    return ndvi

def EVI(red: pd.Series, nir: pd.Series, blue: pd.Series):
    evi = (2.5 * ((nir - red) / (nir + 6 * red - 7.5 * blue + 1)).apply(lambda x: 0.000001 if x == 0 else x))
    return evi
    

def SAVI(red: pd.Series, nir: pd.Series):  
    savi = ((nir - red) / 1.428*(nir + red + 0.428).apply(lambda x: 0.000001 if x == 0 else x))
    return savi
    

def MSI(nir: pd.Series, thermal2: pd.Series): 
    msi = ((thermal2/nir).apply(lambda x: 0.000001 if x == 0 else x))
    return msi
    

def BSI(red: pd.Series, nir: pd.Series, thermal2: pd.Series, blue: pd.Series):  
    bsi = (((thermal2+red)-(nir+blue))/((thermal2+red)+(nir+blue)).apply(lambda x: 0.000001 if x == 0 else x))
    return bsi
    

def get_spectral_indices(df: pd.DataFrame) -> pd.DataFrame:
    blue = df['B2']
    #green = df['B3']
    red = df['B4']
    nir = df['B8']
    thermal2 = df['B11']
    
    df.loc[:, "NDVI"] = NDVI(red=red, nir=nir)
    df.loc[:, "EVI"] = EVI(red=red, nir=nir, blue=blue)
    df.loc[:, "SAVI"] = SAVI(red=red, nir=nir)
    df.loc[:, "MSI"] = MSI(nir=nir, thermal2=thermal2)
    df.loc[:, "BSI"] = BSI(thermal2=thermal2, red=red, nir=nir, blue=blue)
    return df


In [5]:
def to_2d_array(x: np.ndarray)->np.ndarray: 
    return x.reshape(x.shape[0], x.shape[1] * x.shape[2])

In [6]:
def save_tif(raster_input:str, raster_output:str, values:np.array):
    in_data, out_data = None, None
    in_data = gdal.Open(raster_input)
    if in_data is None:
        print ('Unable to open %s' % raster_input)
    band1 = in_data.GetRasterBand(1)
    rows = in_data.RasterYSize
    cols = in_data.RasterXSize
    driver = in_data.GetDriver()
    out_data = driver.Create(raster_output, cols, rows, 1, GDT_Int16)
    dem_data = np.array(values)
    out_band = out_data.GetRasterBand(1)
    out_band.WriteArray(dem_data)
    out_band.FlushCache()
    out_band.SetNoDataValue(-1)

    out_data.SetGeoTransform(in_data.GetGeoTransform())
    out_data.SetProjection(in_data.GetProjection())
    del out_data
    return 'Done'

In [8]:
def get_dataset(x: np.ndarray)->pd.DataFrame:
    bands = x[:12, ...]
    bands = to_2d_array(x[:12, ...]) 
    raw_data = pd.DataFrame(bands.T, columns=col_names)
    df_ = get_spectral_indices(raw_data)
    df_.replace([np.inf, -np.inf], np.nan, inplace=True)
    min_max_scaler = preprocessing.MinMaxScaler()
    x_scaled = min_max_scaler.fit_transform(df_.values)
    df = pd.DataFrame(x_scaled, columns = col_names_full)
    return df

# Mapping

## Models read

In [9]:
svc_model = joblib.load('svc_best_model.joblib')
rf_model = joblib.load('rf_best_model.joblib')
knn_model = joblib.load('knn_best_model.joblib')

In [10]:
svc_model_w = joblib.load('svc_worst_model.joblib')
rf_model_w = joblib.load('rf_worst_model.joblib')
knn_model_w = joblib.load('knn_worst_model.joblib')

## Getting classified raster

In [11]:
fname = 'data/2020_kola_median_composite.tif' #Sentinel-2 image

In [12]:
#function for pixels classification
def simple_classifier(df: pd.DataFrame)->np.ndarray:
    null_sample = df[df.isnull().any(axis=1)]
    predict_sample = df[~df.isnull().any(axis=1)]
    predict_sample['class'] = knn_model_w.predict(predict_sample)
    null_sample['class'] = 0
    fin_sample = pd.concat([predict_sample, null_sample], sort=False).sort_index()
    mask = fin_sample['class']
    return mask.values #return np array

In [14]:
with rio.open(fname, 'r+') as src:
    x = src.read() #raster read
    df = get_dataset(x) #raster to dataframe
    predictions = simple_classifier(df) #dataframe classification
    cover_tile = predictions.reshape((x.shape[1], x.shape[2])) #reshaping array to the shape of the raster
    output_mask=cover_tile

raster_output = 'knn_model_w_map.tif' #output file name
status = save_tif(raster_input=fname, raster_output=raster_output, values=output_mask)
print(status)

Done


In [15]:
# getting tif file for forest mask if the size of the file is big

#def get_tiles(ds, width=256, height=256):
#    nols, nrows = ds.meta['width'], ds.meta['height']
#    offsets = product(range(0, nols, width), range(0, nrows, height))
#    big_window = windows.Window(col_off=0, row_off=0, width=nols, height=nrows)
#    for col_off, row_off in  offsets:
#        window =windows.Window(col_off=col_off, 
#                               row_off=row_off, 
#                               width=width, 
#                               height=height).intersection(big_window)
#        transform = windows.transform(window, ds.transform)
#        yield window, transform


#tile_width, tile_height = 512, 512

#with rio.open(fname, 'r+') as src:
#    meta = src.meta.copy()
#    output_mask = np.zeros(shape = (meta['height'], meta['width']))
#    for window, transform in get_tiles(src, 
#                                       width=tile_width,
#                                       height=tile_height):
#        meta['transform'] = transform
#        meta['width'], meta['height'] = window.width, window.height
        
#        x = src.read(window=window)
#        df = get_dataset(x)
        
#        predictions = simple_classifier(df)
#        cover_tile = predictions.reshape(meta['height'], meta['width'])
#        ranges = window.toranges()
#        output_mask[ranges[0][0]:ranges[0][1],ranges[1][0]:ranges[1][1]] = cover_tile

#raster_output = 'prediction_forest_knn_worst.tif'
#status = save_tif(raster_input=fname, raster_output=raster_output, values=output_mask)
#print(status)