In [1]:
import pandas as pd
import numpy as np
from shapely.geometry import Point
from shapely.geometry.polygon import Polygon
from glob import glob
import os
from pathlib import Path
import rasterio
import matplotlib.pyplot as plt
from collections import defaultdict
import geopandas as gpd
import shapely.speedups
from shapely.geometry import MultiPolygon
import time
import yaml

import dask.dataframe as dd
from dask.dataframe import from_pandas, concat
from dask.distributed import Client, LocalCluster

import math
from rasterio.merge import merge
import shutil
from rasterio.windows import Window
from rasterio.transform import Affine
from rasterio.plot import show
from shapely.geometry import box
from time import time_ns
import heapq
import sys

import rioxarray as rxr
import xarray as xr

In [2]:
def raster_to_df_dask(file_name):
    # read the raster file and convert to a dask dataframe
    raster = rasterio.open(file_name)
    xmin, ymin, xmax, ymax = raster.bounds
    img = raster.read(1)
    cnt += 1
    loc_names = []
    values = []
    for i in range(len(img)):
        for j in range(len(img[0])):
            if img[i][j] > 0:
                loc_names.append(str([int(-ymax+complete_bb)//10 + i, int(xmin-complete_lb)//10 + j]))
                values.append(img[i][j])
    raster.close()
    tmp_df = pd.DataFrame({'loc': loc_names, 'intensity': values})
    cur_df = from_pandas(tmp_df, chunksize=10000)
    return cur_df

In [2]:
def raster_to_df_rioxarray(file_name):
    # read the raster file and convert to an xarray dataframe
    raster  = rxr.open_rasterio(file_name, chunk = 100) # 100 is the chunk size in the x and y direction
    raster = raster.rename('intensity')
    # print(raster['band'])
    df = raster.to_dask_dataframe()
    raster.close()
    # print(df.head())
    df = df.where(df['intensity'] > 0)
    # print(df.head())
    df = df.dropna()
    df['loc'] = df['x'].astype(str) + ',' + df['y'].astype(str)
    df = df.drop(columns = ['x', 'y'])
    df = df.reset_index()
    # print(df.head())
    return df

In [3]:

def update_all_rasters_heap(df, file_names):
    # update the heap with the intensity values of the raster files
    exception_file_list = []
    cnt = 0
    print('In total file num:', len(file_names))

    df_concat_list = []
    
    for file_name in file_names:
        cur_df = raster_to_df_rioxarray(file_name)

        df_concat_list.append(cur_df)
        cnt += 1
        if cnt % 1000 == 0:
            print('finish', cnt, 'file')
        # if cnt == 10:
        #     break
    # print(type(df_concat_list))
    # return df_concat_list
    # df = concat(df_concat_list)
    # print(type(df))
    # return df
    print("here")
    # calculate the percentile of the intensity values and return
    return dd.concat(df_concat_list).groupby('loc')['intensity'].apply(lambda group: np.percentile(group, 95), meta=('intensity', 'f8')).compute()
    # percentile_df = df.groupby('loc')['intensity'].apply(lambda group: np.percentile(group, 95), meta=('intensity', 'f8'))
    # percentile_df = percentile_df.compute()
    # return percentile_df

In [4]:
# load the paths to files from yaml file
config_file_name = 'config.yaml'
with open(config_file_name, 'r') as f:
    config = yaml.safe_load(f)
    rx_burn_units_path = config['rx_burn_units_path']
    results_dir = config['results_dir']
    intensity_dir = config['intensity_dir']
    budget = config['budget']
# print(rx_burn_units_path, results_dir, intensity_dir, budget)
# exit(1)
intensity_file_names = glob(os.path.join(intensity_dir, '*.tif'))
total_files = len(intensity_file_names)
print('total simulated burns =', total_files)
# exit(1)
complete_lb, complete_rb, complete_tb, complete_bb = [753003.071258364, 839057.6399108863, 4133476.546731642, 4230634.714689108]
transform = Affine(10, 0.0, complete_lb, 
                0.0, -10, complete_bb)

heat_array_area = np.zeros([int(math.ceil(complete_bb - complete_tb))//10,int(math.ceil(complete_rb-complete_lb))//10])

print('heat array init done')

total simulated burns = 46804
heat array init done


In [5]:
heat_array_intensity = np.zeros_like(heat_array_area)
heat_array_intensity = heat_array_intensity.astype(int)

In [6]:
cluster = LocalCluster(n_workers=2,
                    threads_per_worker=4,
                    memory_target_fraction=0.75,
                    memory_limit='6GB')
client = Client(cluster)

Perhaps you already have a cluster running?
Hosting the HTTP server on port 34867 instead


In [7]:
%%time
df_heat_array = None
# new_df_concat_list = update_all_rasters_heap(df_heat_array, intensity_file_names)
percentile_df = update_all_rasters_heap(df_heat_array, intensity_file_names)

In total file num: 46804
finish 1000 file
finish 2000 file
finish 3000 file
finish 4000 file
finish 5000 file
finish 6000 file
finish 7000 file
finish 8000 file
finish 9000 file
finish 10000 file
finish 11000 file
finish 12000 file
finish 13000 file
finish 14000 file
finish 15000 file
finish 16000 file
finish 17000 file
finish 18000 file
finish 19000 file
finish 20000 file
finish 21000 file
finish 22000 file
finish 23000 file
finish 24000 file
finish 25000 file
finish 26000 file
finish 27000 file
finish 28000 file
finish 29000 file
finish 30000 file
finish 31000 file
finish 32000 file
finish 33000 file
finish 34000 file


: 

In [None]:
pdf = dd.concat(new_df_concat_list).groupby('loc')['intensity'].apply(lambda group: np.percentile(group, 95), meta=('intensity', 'f8')).compute()
pdf

In [9]:
type(new_df_concat_list)

list

In [10]:
%%time
nd = dd.concat(new_df_concat_list)

CPU times: user 705 ms, sys: 46.8 ms, total: 752 ms
Wall time: 695 ms


In [12]:
nd, sys.getsizeof(nd)

(Dask DataFrame Structure:
                  index   band spatial_ref intensity     loc
 npartitions=100                                            
                  int64  int64       int64   float32  object
                    ...    ...         ...       ...     ...
 ...                ...    ...         ...       ...     ...
                    ...    ...         ...       ...     ...
                    ...    ...         ...       ...     ...
 Dask Name: concat, 1401 graph layers,
 48)

In [13]:
percentile_df = nd.groupby('loc')['intensity'].apply(lambda group: np.percentile(group, 95), meta=('intensity', 'f8'))
percentile_df = percentile_df.compute()

In [17]:
sys.getsizeof(percentile_df)/(1024*1024)

329.0154638290405

In [10]:
f = open('df_list.txt', 'w')
f.writelines(str(new_df_concat_list))
f.close()

In [7]:
l = None
with open('df_list.txt') as f1:
    l = f1.readlines()
    f1.close()
type(l)

list

In [18]:
for i in range(len(heat_array_intensity)):
    for j in range(len(heat_array_intensity[0])):
        try:
            heat_array_intensity[i][j] = percentile_df.loc[str([i, j])]['intensity'] 
        except:
            heat_array_intensity[i][j] = 0


In [24]:
raster = rasterio.open(intensity_file_names[1])
with rasterio.open(os.path.join(results_dir, 'summed_raster_heatmap_intensity.tif'),
                        'w',
                        driver='GTiff',
                        height = heat_array_area.shape[0],
                        width = heat_array_area.shape[1],
                        count=1,
                        dtype=heat_array_area.dtype,
                        crs=raster.crs,
                        transform = transform) as dst:
            dst.write(heat_array_intensity, 1)
            dst.close()
raster.close()