# Geopack & rechunk ET cells

In [1]:
import os
os.environ['USE_PYGEOS'] = '1'

import dask
import pandas
import geopandas
import dask_geopandas
import dask.dataframe as ddf
from dask.distributed import LocalCluster, Client

#dask.config.set({'temporary_directory': '/home/jovyan/data/tmp/dask_tmp/'})

We set up a local cluster to run the rechunking on Dask:

In [2]:
# Start a local Dask cluster with 8 worker processes and attach a client to
# it so the computations below are scheduled on those workers.
local_cluster = LocalCluster(n_workers=8)
client = Client(local_cluster)

2023-08-24 17:25:58,524 - distributed.diskutils - INFO - Found stale lock file and directory '/tmp/dask-worker-space/worker-at6vga28', purging
2023-08-24 17:25:58,612 - distributed.diskutils - INFO - Found stale lock file and directory '/tmp/dask-worker-space/worker-g9ig2f11', purging
2023-08-24 17:25:58,823 - distributed.diskutils - INFO - Found stale lock file and directory '/tmp/dask-worker-space/worker-uc8sfktx', purging
2023-08-24 17:25:59,043 - distributed.diskutils - INFO - Found stale lock file and directory '/tmp/dask-worker-space/worker-4s67oanb', purging
2023-08-24 17:25:59,136 - distributed.diskutils - INFO - Found stale lock file and directory '/tmp/dask-worker-space/worker-djyu5os2', purging


In [5]:
# Shut down the Dask client created for this notebook.
# NOTE(review): judging by the execution counts, this was run after the
# processing cells rather than right after start-up - confirm before
# re-running the notebook top to bottom.
client.shutdown()

Connect to the ET cells (under `form`) and other data:

In [3]:
# Lazily open the three input tables. `form` is read through dask_geopandas
# because it carries the cell geometries; the other two are plain tables.
form = dask_geopandas.read_parquet('form/')
function = ddf.read_parquet('function/')
ss = ddf.read_parquet('signature_type/')

# Store the signature type as a categorical with *known* categories;
# `as_known()` computes the categories if they are not known yet.
ss = ss.assign(
    signature_type=ss['signature_type'].astype('category').cat.as_known()
)

Join all tables, repartition to only 100 partitions, apply a spatial shuffle, and write directly to disk:

---

**NOTE** - `form` and `function` are aligned, so we can save some computation by concatenating them instead of joining by index. Before doing so, we check that this is the case:

In [23]:
%%time
# Give each table's `hindex` a distinct name so both copies survive the
# column-wise concatenation and can be compared row by row.
form_ids = form.rename(columns={'hindex': 'hindex_form'})
function_ids = function.rename(columns={'hindex': 'hindex_function'})
ff_check = ddf.concat([form_ids, function_ids], axis='columns')

# Count the rows whose identifiers disagree; 0 means the tables are aligned.
id_mismatch = ff_check['hindex_form'] != ff_check['hindex_function']
id_mismatch.sum().compute()

We're assuming that the indices of each dataframes are 
 aligned. This assumption is not generally safe.


CPU times: user 16.6 s, sys: 1.29 s, total: 17.9 s
Wall time: 41.7 s


0

---

## FF

Now we can proceed with the full operation:

In [4]:
%%time
# Column-wise concatenation of `form` and `function`; this relies on the two
# tables being row-aligned. Each `hindex` gets its own name to avoid a clash.
ff_concat = ddf.concat(
    [
        form.rename(columns={'hindex': 'hindex_form'}),
        function.rename(columns={'hindex': 'hindex_function'}),
    ],
    axis='columns',
)

# Keep a single identifier column, coarsen to 100 partitions, then rename the
# geometry column to 'geometry' and reorder the rows along a Hilbert curve so
# that each partition holds spatially close cells.
# NOTE(review): the recorded run of this cell took several minutes, so the
# spatial shuffle presumably triggers computation rather than staying lazy.
gp_all = (
    ff_concat
    .drop(columns=['hindex_form'])
    .repartition(npartitions=100)
    .rename(columns={'hindex_function': 'hindex'})
    .rename_geometry('geometry')
    .spatial_shuffle(by='hilbert')
)

We're assuming that the indices of each dataframes are 
 aligned. This assumption is not generally safe.


CPU times: user 2min 35s, sys: 12.4 s, total: 2min 47s
Wall time: 8min 36s


## Write geoms

Write ET cells to disk:

In [None]:
%%time
# Persist only the geometry and its identifier as the spatial table.
et_cells = gp_all[['geometry', 'hindex']]
et_cells.to_parquet('../et_cells_geopack/et_cells')



## Write form

Write form to disk (as non-spatial table):

In [None]:
%%time
# Every column of the original `form` table except its 'tessellation'
# column, so the output is written without the geometry.
form_columns = form.drop(columns='tessellation').columns
gp_all[form_columns].to_parquet('../et_cells_geopack/form')

## Write function

Write function to disk (as non-spatial table):

In [None]:
%%time
import pyarrow as pa

# Explicit Arrow schema for the output: every `function` column is written
# as float64, except the `hindex` identifier, which is written as a string.
schema = {
    **dict.fromkeys(function.columns, pa.float64()),
    'hindex': pa.string(),
}

function_table = gp_all[function.columns]
function_table.to_parquet('../et_cells_geopack/function', schema=schema)

## Write SS

In [None]:
# Re-read only the `hindex` column of the ET cells written above; it is the
# left-hand side of the signature-type join in the next cell.
ids = ddf.read_parquet('../et_cells_geopack/et_cells/', columns=['hindex'])

Write signature types to disk:

In [None]:
%%time

# Remove repeated rows and index the signature types by `hindex` so they can
# be looked up from the identifier column of `ids`.
# NOTE(review): drop_duplicates() compares whole rows, so a `hindex` that
# appears with two different signature types would survive here and produce
# duplicated rows in the join - confirm `hindex` is unique after this step.
ss_deduplicated = ss.drop_duplicates()
ss_unique = ss_deduplicated.set_index('hindex')

# Attach the signature type to every written ET cell and store the result.
ss_joined = ids.join(ss_unique, on='hindex')
ss_joined.to_parquet('../et_cells_geopack/signature_type')

In [None]:
%%time

# Read back the identifiers of the written `form` and `signature_type`
# tables; the form copy is renamed so both survive the concatenation.
written_form_ids = ddf.read_parquet(
    '../et_cells_geopack/form/', columns=['hindex']
).rename(columns={'hindex': 'hindex_form'})
written_ss_ids = ddf.read_parquet(
    '../et_cells_geopack/signature_type/', columns=['hindex']
)
ss_check = ddf.concat([written_form_ids, written_ss_ids], axis='columns')

# Count the rows whose identifiers disagree; 0 means the two written tables
# are row-aligned.
ss_mismatch = ss_check['hindex_form'] != ss_check['hindex']
ss_mismatch.sum().compute()