
Slow extract #298 (Closed)
mmann1123 opened this issue Jan 30, 2024 · 2 comments
Labels: question (Further information is requested)

Comments

@mmann1123 (Collaborator)

@jgrss I am trying to figure out the best way to extract image values to polygons. At the moment I am just using the normal extract function, but I'm finding it to be slow - 12 hrs and nothing returned yet from the first image stack. Does running it with a dask LocalCluster improve performance? (A rough sketch of what I mean is below the snippet.) Looking at the code, it's not clear to me where the slowdown is coming from.

 
Array: 6.22 GiB, shape (29, 4815, 5976), float64 numpy.ndarray
Chunks: 2.00 MiB each, shape (1, 512, 512), 3480 chunks in 59 graph layers

 ==============

import numpy as np
import geowombat as gw
from glob import glob

# polys: list of polygon files defined earlier
for poly, poly_label in zip(polys, ['data_34S_19E_258N', 'data_34S_19E_259N']):
    for band_name in ["B12", "B11", "B2", "B6", "EVI", "hue"]:
        print(f'working on {band_name} {poly_label}')
        file_glob = f"*{band_name}*.tif"
        f_list = sorted(glob(file_glob))

        with gw.open(f_list,
                     band_names=[i.split('.')[0] for i in f_list],
                     stack_dim='band',
                     nodata=np.nan) as src:
            display(src)  # IPython/Jupyter display of the lazy dask-backed array
            df = gw.extract(src, poly, all_touched=True)
            print(df.head())
            df.to_parquet(f'./{band_name}_{poly_label}.parquet',
                          engine='auto',
                          compression='snappy')
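
For reference, this is roughly what I meant by LocalCluster - just a sketch, assuming gw.extract would pick up an active dask distributed client (I have not verified that it does), and the n_workers value is a placeholder:

from dask.distributed import Client, LocalCluster

# Sketch only: start a local dask cluster before extracting.
# Assumption (unverified): the dask computation behind gw.extract uses the active client.
with LocalCluster(n_workers=8, threads_per_worker=1) as cluster, Client(cluster) as client:
    with gw.open(f_list,
                 band_names=[i.split('.')[0] for i in f_list],
                 stack_dim='band',
                 nodata=np.nan) as src:
        df = gw.extract(src, poly, all_touched=True)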
mmann1123 added the question label on Jan 30, 2024
@jgrss (Owner) commented Jan 31, 2024

Hmm, what do your array chunks look like after opening the file list?
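
For example, something like this would print them (just a sketch; src.data is the underlying dask array):

with gw.open(f_list, band_names=band_names, stack_dim='band') as src:
    print(src.shape)             # full (band, y, x) shape
    print(src.data.chunksize)    # shape of a single chunk
    print(src.data.npartitions)  # total number of chunks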

@mmann1123 (Collaborator, Author)

Actually I rediscovered your excellent ray solution!


import geopandas as gpd
import geowombat as gw
import pandas as pd
import rasterio as rio
import ray
from glob import glob
from geowombat.core.parallel import ParallelTask
from ray.util import ActorPool


@ray.remote
class Actor(object):
    def __init__(self, aoi_id=None, id_column=None, band_names=None):
        self.aoi_id = aoi_id
        self.id_column = id_column
        self.band_names = band_names

    # While the names can differ, these three arguments are required.
    # For ``ParallelTask``, the callable function within an ``Actor`` must be named exec_task.
    def exec_task(self, data_block_id, data_slice, window_id):
        data_block = data_block_id[data_slice]
        left, bottom, right, top = data_block.gw.bounds
        aoi_sub = self.aoi_id.cx[left:right, bottom:top]

        if aoi_sub.empty:
            return aoi_sub

        # Return a GeoDataFrame for each actor
        return gw.extract(data_block,
                          aoi_sub,
                          id_column=self.id_column,
                          band_names=self.band_names)


for band_name in ["B12", "B11", "B2", "B6", "EVI", "hue"]:
    for poly_i, poly_label in zip([0, 1], ['34S_19E_258N', '34S_19E_259N']):
        # (Re)initialize ray for each iteration, since it is shut down at the end of the loop
        ray.init(num_cpus=8)

        with rio.Env(GDAL_CACHEMAX=256 * 1e6) as env:
            file_glob = f"*{band_name}*.tif"
            f_list = sorted(glob(file_glob))
            df_id = ray.put(gpd.read_file(polys[poly_i]).to_crs('EPSG:4326'))

            band_names = [i.split('.ti')[0] for i in f_list]

            # Since we are iterating over the image block by block, a small chunk size is fine here.
            with gw.open(f_list,
                         band_names=band_names,
                         stack_dim='band',
                         chunks=16) as src:

                # Setup the pool of actors, one for each resource available to ``ray``.
                actor_pool = ActorPool([Actor.remote(aoi_id=df_id, id_column='id', band_names=band_names)
                                        for n in range(0, int(ray.cluster_resources()['CPU']))])

                # Setup the task object
                pt = ParallelTask(src, row_chunks=4096, col_chunks=4096, scheduler='ray', n_chunks=1000)
                results = pt.map(actor_pool)

        del df_id, actor_pool
        ray.shutdown()

        # Keep only non-empty results, concatenate, and drop the geometry column before saving
        results2 = [df.reset_index(drop=True) for df in results if len(df) > 0]
        result = pd.concat(results2, ignore_index=True, axis=0)
        result = pd.DataFrame(result.drop(columns='geometry'))
        result.to_parquet(f'./{band_name}_{poly_label}.parquet',
                          engine='auto',
                          compression='snappy')

There was one issue with the example: for gw.open, the example online had chunks=None, and I needed to replace that with a valid value (I used chunks=16 above).

ray is extremely fast...
