# Process a raster into a large collection of points, then process those points using Dask to clip and spatial join key information from the study's focus area

This notebook exports each raster pixel to a csv file that contains its coordinates and population count, as well as the administrative units within which it lies.
This process can take some time, so go and relax and enjoy a nice cup of coffee - you've earned it.

In [None]:
import os, sys
os.environ['USE_PYGEOS'] = '0'

import re
import numpy as np

import rasterio
from rasterio import features, transform
from rasterio.mask import mask
from rasterio.transform import Affine
from rasterio.warp import calculate_default_transform, reproject, Resampling

import pandas as pd
import geopandas as gpd

import shapely
from shapely.geometry import shape, box, Polygon, Point

import json

### Setup

In [None]:
# Root folder that holds the project configuration file.
data_root = 'D:\\github_test\\'

##################################################################
##################################################################
# location of the project input parameters
data_file = data_root + 'project_data.json'

##################################################################
##################################################################
# read project variables so that we have our parameters and file locations
# (the `with` block closes the file on exit, so no explicit close() is needed)
with open(data_file, 'rb') as f:
    data_loaded = json.load(f)

##################################################################
##################################################################
# read information from the project setup file that's relevant to this section of code
# imports
local_boundaries_folder = data_loaded['local_boundaries_folder']
local_population_folder = data_loaded['local_population_folder']
dest_crs = data_loaded['dest_crs']
dest_crs_id = data_loaded['dest_crs_id']
buffer_m = data_loaded['buffer_m']
level = data_loaded['level']
max_level = data_loaded['max_level']
# The administrative filter settings are only read when the level is not 'custom'.
if level != 'custom':
    shapefile_adm_field = data_loaded['shapefile_adm_field']
    adm_name = data_loaded['adm_name']

### Set up Dask cluster (if this is/will be a lot of points)

In [None]:
import dask
import coiled
from dask.distributed import Client, LocalCluster, Lock
from dask.utils import SerializableLock
import dask.dataframe as dd

from dask_control import *

In [None]:
# Create the Dask client used by the distributed steps further down.
# NOTE(review): get_dask_client comes from the star-import of dask_control, so its exact
# behaviour is not visible here - presumably a local cluster with 4 worker processes
# of 4 threads each; confirm against dask_control.
client=get_dask_client(cluster_type='local',n_workers=4,processes=True,threads_per_worker=4)

### Load and process raster to points

Load the population raster we are using so that we can convert its pixels into points

In [None]:
# Folder that holds the population raster(s) for this project
pop_pth = local_population_folder

# Collect every .tif file in the folder; sorting makes the pick below deterministic
pop_file = sorted(
    os.path.join(pop_pth, file)
    for file in os.listdir(pop_pth)
    if file.endswith(".tif")
)

# Fail with a clear message instead of a bare IndexError when the folder has no raster
if not pop_file:
    raise FileNotFoundError('No .tif population raster found in ' + str(pop_pth))

# Only the first raster (in sorted order) is processed
pop_file = pop_file[0]

In [None]:
# Read the population raster file
with rasterio.open(pop_file) as src:
    # Read the raster data as float32
    raster_array = src.read(1).astype(np.float32)  # Assuming a single band raster
    # NOTE: this rebinds the name `transform` imported from rasterio at the top of the
    # notebook; the conversion cell uses this variable together with the fully
    # qualified rasterio.transform module.
    transform = src.transform
    crs = src.crs

    # Count the cells that will become points: positive and different from nodata.
    # The count is taken from the in-memory float32 array with the same predicate the
    # conversion step applies, so the pre-sized lists below always match the number of
    # pixels that are written into them (and the file is not read a second time).
    nonzero_cell_count = int(
        np.count_nonzero((raster_array > 0) & (raster_array != src.nodata))
    )

# Get the height and width of the raster
height, width = raster_array.shape

# Initialize lists to store points and values (one slot per populated cell)
points_lon = [None] * nonzero_cell_count
points_lat = [None] * nonzero_cell_count
values = [None] * nonzero_cell_count

In [None]:
# Convert every populated pixel to a point in one vectorised pass.
# A per-pixel Python loop over the whole raster is far slower than letting NumPy
# select the cells, so the selection and the coordinate conversion are done in bulk.

# A pixel is kept when its value is positive and differs from the raster's nodata value
valid_mask = (raster_array > 0) & (raster_array != src.nodata)

# np.nonzero returns the indices in row-major order, i.e. the same order in which a
# nested "for row / for col" loop would visit the cells
rows, cols = np.nonzero(valid_mask)
del valid_mask

# Pixel values of the kept cells (float32, like the raster array)
values = raster_array[rows, cols]

if rows.size:
    # Convert pixel indices to the coordinates of the pixel centres
    xs, ys = rasterio.transform.xy(transform, rows, cols)
    points_lon = np.asarray(xs, dtype=np.float64)
    points_lat = np.asarray(ys, dtype=np.float64)
    del xs, ys
else:
    # No populated pixel at all: keep empty coordinate arrays
    points_lon = np.empty(0, dtype=np.float64)
    points_lat = np.empty(0, dtype=np.float64)

# Number of points produced
list_count = len(values)

print('pixels conversion complete')

del raster_array, rows, cols

In [None]:
# Column-oriented table: coordinates (columns named after the project CRS id) and pixel value
data = {'lon_' + dest_crs_id : points_lon,
        'lat_' + dest_crs_id: points_lat,
        'VALUE': values}

# Convert data to a GeoDataFrame with point geometries.
# points_from_xy builds all geometries in one call (the same helper the clip and
# spatial-join functions use) instead of creating one Point object per row in Python.
geometry = gpd.points_from_xy(data['lon_' + dest_crs_id], data['lat_' + dest_crs_id])
pts = gpd.GeoDataFrame(data, geometry=geometry, crs=dest_crs)

In [None]:
# Defensive filter: keep only rows whose pixel value is present and strictly positive.
# The conversion step already skips other pixels, so this is just a cheap safety net
# in case the raster has some unusual formatting.
has_population = (pts.VALUE > 0) & pts.VALUE.notnull()
pts = pts[has_population]

In [None]:
# Tag the points with the project CRS (set_crs returns a new frame, coordinates untouched)
pts = pts.set_crs(dest_crs_id)

# Reproject only the geometry to EPSG:4326 to obtain the lon/lat columns.
# The geometry column is dropped by the column selection below, so reprojecting the
# whole frame to 4326 and then back to the project CRS would be wasted work.
geom_4326 = pts.geometry.to_crs(4326)
pts['lon_4326'] = geom_4326.x
pts['lat_4326'] = geom_4326.y
del geom_4326

# Keep the tabular columns only: value, EPSG:4326 coordinates, project-CRS coordinates
pts = pts[['VALUE','lon_4326','lat_4326','lon_' + dest_crs_id,'lat_' + dest_crs_id]]

In [None]:
import dask.dataframe as dd

# Split the point table into Dask partitions of 100000 rows each
pts_dd = dd.from_pandas(pts, chunksize=100000)
# pts_dd.to_csv(local_population_folder+'population_tabular_raw.csv', header=True, index=True, single_file=True)

# Downcast every float64 column to float32, partition by partition
float64_cols = pts_dd.select_dtypes(include='float64').columns
downcast_map = {col: 'float32' for col in float64_cols}


def _downcast_partition(partition):
    """Return the partition with its float64 columns cast to float32."""
    return partition.astype(downcast_map)


pts_dd = pts_dd.map_partitions(_downcast_partition)

In [None]:
# Discard every row in which any column holds a single-space string (' ')
has_blank_cell = pts_dd.isin([' ']).any(axis=1)
pts_dd = pts_dd[~has_blank_cell]

#### Clip to desired extent

Load in AOI as clipping object

In [None]:
# Folder containing the boundary shapefile of the deepest administrative level.
# os.path.join copes with local_boundaries_folder given with or without a trailing
# separator; the empty last component keeps the trailing separator on aoi_path.
aoi_path = os.path.join(local_boundaries_folder, max_level, '')

# Collect every shapefile in the folder; sorting makes the pick below deterministic
aoi_file = sorted(
    os.path.join(aoi_path, file)
    for file in os.listdir(aoi_path)
    if file.endswith(".shp")
)

# Fail with a clear message instead of a bare IndexError when no shapefile is present
if not aoi_file:
    raise FileNotFoundError('No .shp boundary file found in ' + aoi_path)

aoi_file = aoi_file[0]

aoi = gpd.read_file(aoi_file)

# Keep only the polygons of the administrative unit under study.
# NOTE(review): shapefile_adm_field and adm_name are only defined by the setup cell
# when level != 'custom', so this line raises NameError for a custom level - confirm
# how custom areas are meant to be handled.
aoi = aoi[aoi[shapefile_adm_field] == adm_name]

# The clip and spatial join work in EPSG:4326. A shapefile without CRS information is
# labelled as EPSG:4326 (as before); one that declares another CRS is reprojected,
# because set_crs refuses to overwrite a different existing CRS.
if aoi.crs is None:
    aoi = aoi.set_crs("EPSG:4326")
else:
    aoi = aoi.to_crs("EPSG:4326")

In [None]:
# Number of administrative levels carried along: two for 'adm2', otherwise three
admin_depth = 2 if max_level == 'adm2' else 3
admin_levels = range(1, admin_depth + 1)

# Name and code columns of every carried level, e.g. ADM1_EN / ADM1_PCODE
name_cols = [f'ADM{n}_EN' for n in admin_levels]
code_cols = [f'ADM{n}_PCODE' for n in admin_levels]

# Keep the geometry plus the name and code columns, then rename ADMn_PCODE -> Admn_Code
aoi = aoi[['geometry', *name_cols, *code_cols]]
aoi = aoi.rename(columns={f'ADM{n}_PCODE': f'Adm{n}_Code' for n in admin_levels})

In [None]:
def clip_pts(df, polys):
    """Clip one partition of points to the given polygons.

    Parameters
    ----------
    df : DataFrame-like
        Partition of points; must provide ``lon_4326`` and ``lat_4326`` columns.
    polys : geopandas.GeoDataFrame or future-like
        Clipping polygons. Anything that is not a GeoDataFrame is resolved through
        its ``.result()`` method first (e.g. a broadcast object).

    Returns
    -------
    pandas.DataFrame
        The rows kept by ``gpd.clip``, without the geometry column.
    """
    # Resolve the broadcast polygons before clipping
    if not isinstance(polys, gpd.GeoDataFrame):
        polys = polys.result()

    # Build point geometries from the EPSG:4326 coordinate columns
    point_geoms = gpd.points_from_xy(df.lon_4326, df.lat_4326)
    points_gdf = gpd.GeoDataFrame(df, geometry=point_geoms).set_crs("EPSG:4326")

    # Clip by extent
    clipped = gpd.clip(points_gdf, polys)

    # Hand back a plain DataFrame: Dask does not understand geometry metadata and the
    # geometry is not needed downstream
    return pd.DataFrame(clipped.drop(columns='geometry'))
    

In [None]:
# Broadcast the AOI polygons (adm2 or adm3 columns, depending on max_level) to the
# cluster so that the per-partition functions can use them
aoi_dist = client.scatter(aoi, broadcast=True)

In [None]:
# Distributed clip: apply clip_pts to every partition of the point table
# NOTE(review): aoi_pts is not referenced again in the visible part of this notebook -
# confirm whether the spatial join was meant to run on it.
aoi_pts = pts_dd.map_partitions(clip_pts, aoi_dist)

#### Spatial join information from a large collection of polygons to a large collection of points

In [None]:
def get_sj(df, polys):
    """Spatially join one partition of points to the given polygons.

    Parameters
    ----------
    df : DataFrame-like
        Partition of points; must provide ``lon_4326`` and ``lat_4326`` columns.
    polys : geopandas.GeoDataFrame or future-like
        Polygons whose attributes are joined onto the points. Anything that is not a
        GeoDataFrame is resolved through its ``.result()`` method first.

    Returns
    -------
    pandas.DataFrame
        Every input point (left join) with the columns of the polygon it lies
        within, without the geometry column.
    """
    # Resolve the broadcast polygons before joining
    if not isinstance(polys, gpd.GeoDataFrame):
        polys = polys.result()

    # Join using 4326: build point geometries from the EPSG:4326 coordinate columns
    gdf = gpd.GeoDataFrame(
        df,
        geometry=gpd.points_from_xy(df.lon_4326, df.lat_4326)
    ).set_crs("EPSG:4326")

    # `predicate` replaces the `op` keyword, which is deprecated and no longer
    # accepted by newer geopandas releases
    gdf = gpd.sjoin(gdf, polys, how='left', predicate='within')

    # Hand back a plain DataFrame without the geometry column
    df = pd.DataFrame(gdf.drop('geometry', axis=1))

    return df
    

In [None]:
# Distributed spatial join: apply get_sj to every partition of the point table
# NOTE(review): this joins the unclipped pts_dd rather than the clipped aoi_pts built
# in the clip step - confirm this is intended.
aoi_pts_adm = pts_dd.map_partitions(get_sj, aoi_dist)

#### Export

Export dask outputs

In [None]:
# Leave out helper columns: any column whose name contains 'Unnamed' or 'index'
unwanted_markers = ('Unnamed', 'index')
keep = [
    col for col in aoi_pts_adm.columns
    if not any(marker in col for marker in unwanted_markers)
]

# Select only the columns to keep and discard rows with any missing value
aoi_pts_adm = aoi_pts_adm[keep].dropna()

# Write everything to one CSV file without the index column
# (header=None is passed through unchanged; presumably it suppresses the header row)
output_csv = local_population_folder+'population_tabular_final.csv'
aoi_pts_adm.to_csv(output_csv, header=None, index=False, single_file=True)