## Importing Libraries

In [25]:
# !pip install -r '../requirements.txt'

In [12]:
import fiona
import pandas as pd
import geopandas as gpd
import rasterio
import rasterio.features
import zarr
import xarray as xr
import os
import numpy as np
from tqdm import tqdm
from datetime import datetime, date
import dask.array as da
import urllib.request
import zipfile
import dask
import shutil
import shapely
from shapely import intersects
from shapely.geometry import box
from rasterio.windows import Window
import matplotlib.pyplot as plt
from affine import Affine


# zipurl = 'https://s3.waw3-1.cloudferro.com/emodnet/emodnet_native/emodnet_human_activities/energy/wind_farms_points/EMODnet_HA_Energy_WindFarms_20240508.zip'
# geodatabase = 'EMODnet_HA_Energy_WindFarms_20240508.gdb'

zipurl = 'https://s3.waw3-1.cloudferro.com/emodnet/emodnet_native/emodnet_geology/seabed_substrate/multiscale_folk_5/EMODnet_GEO_Seabed_Substrate_All_Res.zip'
geodatabase = 'EMODnet_Seabed_Substrate_1M.gdb'

# zipurl = 'https://s3.waw3-1.cloudferro.com/emodnet/emodnet_native/emodnet_seabed_habitats/seabed_habitats_maps/habitat_types_in_euseamap_2023_eunis_2019/EUSeaMap_2023.zip'
# geodatabase = 'EUSeaMap_2023.gdb'



def download_extract(zipurl, geodatabase):
    zip_file = os.path.basename(zipurl)
    class TqdmUpTo(tqdm):
        def update_to(self, b=1, bsize=1, tsize=None):
            if tsize is not None:
                self.total = tsize
            self.update(b * bsize - self.n)
    with TqdmUpTo(unit='B', unit_scale=True, miniters=1, desc=zip_file) as t:
        urllib.request.urlretrieve(zipurl, filename=zip_file, reporthook=t.update_to)
        # Extract the geodatabase from the zip file
    with zipfile.ZipFile(zip_file, 'r') as zip_ref:
        zip_ref.extractall('extracted_files')
    for root, dirs, files in os.walk('extracted_files'):
        for dir in dirs:
            if dir.endswith('.gdb') and os.path.basename(dir) == geodatabase:
                gdb_path = os.path.join(root, dir)
                return gdb_path
                break

def attributes_update(dataset, title, resolution, zipurl):
        latitudeattrs = {'_CoordinateAxisType': 'Lat', 
                            'axis': 'Y', 
                            'long_name': 'latitude', 
                            'max': dataset.latitude.values.max(), 
                            'min': dataset.latitude.values.min(), 
                            'standard_name': 'latitude', 
                            'step': (dataset.latitude.values.max() - dataset.latitude.values.min()) / dataset.latitude.values.shape[0], 
                            'units': 'degrees_north'
            }
        longitudeattrs = {'_CoordinateAxisType': 'Lon', 
                        'axis': 'X', 
                        'long_name': 'longitude',
                        'max': dataset.longitude.values.max(),
                        'min': dataset.longitude.values.min(),
                        'standard_name': 'longitude', 
                        'step': (dataset.longitude.values.max() - dataset.longitude.values.min()) / dataset.longitude.values.shape[0], 
                        'units': 'degrees_east'
        }
        dataset.latitude.attrs.update(latitudeattrs)
        dataset.longitude.attrs.update(longitudeattrs)

        # Set the CRS as an attribute
        dataset.attrs['proj:epsg'] = 4326
        dataset.attrs['resolution'] = resolution
        dataset.attrs.update({
            'geospatial_lat_min': dataset['latitude'].min().item(),
            'geospatial_lat_max': dataset['latitude'].max().item(),
            'geospatial_lon_min': dataset['longitude'].min().item(),
            'geospatial_lon_max': dataset['longitude'].max().item()
        })
        dataset.attrs['resolution'] = resolution
        #include where the data comes and when its been converted
        dataset.attrs['History'] = f'Zarr dataset converted from {title}.gdb, downloaded from {zipurl}, on {date.today()}'
        
        #add any other attributes you think necessary to include in the metadata of your zarr dataset
        #dataset.attrs['sources'] = source
    

        return dataset


def rasterize_chunk(chunk, raster_shape, transform):
    raster_chunk = np.zeros(raster_shape, dtype=np.float32)

    valid_geoms = []
    for geom in chunk['geometries']:
        if geom.is_valid:
            valid_geoms.append(geom)
    if not valid_geoms:
        return raster_chunk
    shapes = ((geom, value) for geom, value in zip(valid_geoms, chunk['encoded_data']))
    rasterio.features.rasterize(
        shapes,
        out=raster_chunk,
        transform=transform,
        merge_alg=rasterio.enums.MergeAlg.replace,
        dtype=np.float32,
    )
    
    return raster_chunk


def cleaner(data):
    if isinstance(data, str):
        if data == '0' or data.strip() == '' or pd.isna(data):
            data = 'None'
    return data

def encode_categorical(data, category_mapping):
    if isinstance(data[0], str):
        data = pd.Series(data).fillna('None')
        data[data == ' '] = 'None'
        data[data == ''] = 'None'
        data[data == '0'] = 'None'
        data = data.values 
        unique_categories = np.unique(data)
        for category in unique_categories:
            if category not in category_mapping:
                category_mapping[category] = len(category_mapping) + 1
        # category_mapping = {cat: i + 1 for i, cat in enumerate(unique_categories)}
        encoded_data = np.array([category_mapping[item] for item in data])
    else:
        # Convert to float32
        data = data.astype(np.float32)
        # If there are any NaN values, replace them with None
        if np.isnan(data).any():
           data[np.isnan(data)] = 0
        encoded_data = data
        
    return encoded_data, category_mapping


def plot_raster(raster):
    # Compute the raster
    raster_computed = raster.compute()

    # Plot the raster
    plt.imshow(raster_computed, cmap='viridis')
    plt.colorbar()
    plt.title('Raster')
    plt.xlabel('Longitude')
    plt.ylabel('Latitude')
    plt.show()


def gdf2zarrconverter(file_path, native_var, title, layer, arco_asset_tmp_path, metadata_dict):
    with fiona.open(file_path, 'r', layer=layer) as src:
        crs = src.crs
        total_bounds = src.bounds
        lon_min, lat_min, lon_max, lat_max = total_bounds
        resolution = 0.001  # Adjust resolution as needed
        width = int(np.ceil((lon_max - lon_min) / resolution))
        height = int(np.ceil((lat_max - lat_min) / resolution))
        raster_transform = rasterio.transform.from_bounds(lon_min, lat_min, lon_max, lat_max, width, height)

        chunk_width = 2000
        chunk_height = 2000
        category_mapping = {}
        lazy_rasters = []
        dataset = xr.Dataset()
        # with tqdm(total=((height + chunk_height - 1) // chunk_height) * ((width + chunk_width - 1) // chunk_width), desc=f"Processing windows of {layer} - {native_var}") as pbar_window:
        row_num= 0
        for i in range(0, height, chunk_height):
            remainder_y = height - i
            window_height = min(chunk_height, remainder_y)
            row_rasters = []
            for j in range(0, width, chunk_width):
                print(f"Processing window at i={i}, from width{width}, j={j}, from height{height}")
                remainder_x = width - j
                window_width = min(chunk_width, remainder_x)
                window = Window(j, i, window_width, window_height)
                
                chunk_geoms = []
                chunk_data = []
                window_geom = box(
                    *rasterio.windows.bounds(window, transform=raster_transform)
                )
                with tqdm(total=len(src), desc=f"Processing features of {layer} - {native_var}") as pbar:
                    for feature in src:
                        geom = feature['geometry']
                        if isinstance(geom, fiona.model.Geometry):
                            geom = shapely.geometry.shape(geom)
                        if geom.intersects(window_geom):
                            chunk_geoms.append(geom)
                            # Only call the cleaner function if the data type is a string
                            if isinstance(feature['properties'][native_var], str):
                                chunk_data.append(cleaner(feature['properties'][native_var]))
                            else:
                                chunk_data.append(feature['properties'][native_var])
                        pbar.update(1)

                if not chunk_geoms:
                    # Create an empty raster
                    empty_raster = np.zeros((window_height, window_width))
                    lazy_raster = da.from_array(empty_raster, chunks=(chunk_height, chunk_width))
                    
                else:
                    chunk_data = np.array(chunk_data)
                    encoded_data, category_mapping = encode_categorical(chunk_data, category_mapping) 

                    # Calculate the top-left corner of the chunk
                    chunk_x = raster_transform.c + window.col_off * raster_transform.a
                    chunk_y = raster_transform.f + window.row_off * raster_transform.e

                    # Create a new transform for the chunk
                    chunk_transform = Affine(raster_transform.a, raster_transform.b, chunk_x,
                                            raster_transform.d, raster_transform.e, chunk_y)

                    # Use the chunk_transform in the delayed function
                    lazy_raster = dask.delayed(rasterize_chunk)(
                        {'geometries': chunk_geoms, 'encoded_data': encoded_data},
                        (window_height, window_width),
                        chunk_transform
                    )
                    lazy_raster = da.from_delayed(lazy_raster, shape=(window_height, window_width), dtype=np.float32)
                                        
                row_rasters.append(lazy_raster)

            # Instead of appending to row_rasters, create a DataArray and add it to the Dataset
            if row_rasters:
                row_raster = da.concatenate(row_rasters, axis=1)
                row_raster_computed = row_raster.compute()  # Compute the raster to check values
                print(np.max(row_raster_computed))
                data_array = xr.DataArray(
                    row_raster,
                    dims=['latitude', 'longitude'],
                    name=native_var
                ).chunk({'latitude': 500, 'longitude': 500})
            
            # Create the Dataset with native_var as a key, and the data and coordinates associated with this key
            dataset = xr.Dataset(
                {native_var: (['latitude', 'longitude'], data_array.data)},
                coords={'latitude': data_array.latitude, 'longitude': data_array.longitude}
            )

            data_array.to_zarr(f'converted_zarr_files/rows/row_{row_num}.zarr', mode='w')
            row_num += 1
            print(f"Finished processing window at i={i}")

        # Load the zarr files and concatenate them
        datasets = [xr.open_zarr(f'converted_zarr_files/rows/row_{i}.zarr') for i in range(row_num)]
        dataset = xr.concat(datasets, dim='latitude')

        latitudes = np.round(np.linspace(lat_max, lat_min, height, dtype=float), decimals=4)
        longitudes = np.round(np.linspace(lon_min, lon_max, width, dtype=float), decimals=4)

        dataset = dataset.assign_coords({'latitude': latitudes, 'longitude': longitudes})
        dataset = dataset.sortby('latitude')
        dataset = dataset.chunk({'latitude': 'auto', 'longitude': 'auto'})

        if category_mapping:
            dataset[native_var].attrs['categorical_encoding'] = category_mapping
            dataset.attrs['categorical_encoding'] = {native_var: category_mapping}
        dataset = attributes_update(dataset, title, resolution, metadata_dict)
            
        zarr_var_path = f"{arco_asset_tmp_path}/{title}_{native_var}.zarr"
        dataset.to_zarr(zarr_var_path, mode='w', consolidated=True)
        return zarr_var_path


def main(zipurl, geodatabase):
    gdb_path = download_extract(zipurl, geodatabase)
    temp_zarr_path = 'converted_zarr_files'
    os.makedirs(temp_zarr_path, exist_ok=True)
    title = os.path.splitext(os.path.basename(geodatabase))[0]

    # Get the layers from the geodatabase
    layers = fiona.listlayers(gdb_path)

    # Create an empty xarray dataset to hold the combined data
    combined_dataset = xr.Dataset()

     # Process each layer and each variable using gdf2zarr
    for layer in layers:
        # Get the variables from the layer

        # if layer == 'EMODnet_HA_Energy_WindFarms_pg_20240508':

        variables = fiona.open(gdb_path, layer=layer).meta['schema']['properties'].keys()
        
        zarr_vars_paths = [] # replace with your column names
        for variable in variables:

            # if variable == 'POWER_MW':
            try:
                print(f"Processing {layer} - {variable}")
                zarr_var_path = gdf2zarrconverter(gdb_path, variable, title, layer, temp_zarr_path, zipurl)
                zarr_vars_paths.append(zarr_var_path)
            except Exception as e:
                print(f"Failed to process {layer} - {variable}: {e}")
                continue

        with dask.config.set(scheduler='single-threaded'):
            for path in zarr_vars_paths:
                try:
                    dataset = xr.open_dataset(path, chunks={})  # Use Dask to lazily load the dataset
                    dataset = dataset.chunk({'latitude': 'auto', 'longitude': 'auto'}) 
                    combined_dataset = xr.merge([combined_dataset, dataset], compat='override', join='outer')
                except Exception as e:
                    print(f"Failed to combine zarr dataset {path}: {e}")
                    continue

        # add applicable categorical encodings
        categorical_encodings_dict = {}
        for var in combined_dataset.variables:
            if 'categorical_encoding' in combined_dataset[var].attrs:
                categorical_encodings_dict[var] = combined_dataset[var].attrs['categorical_encoding']

        combined_dataset.attrs['categorical_encoding'] = categorical_encodings_dict

        with dask.config.set(scheduler='single-threaded'):
            try:    
                final_dataset = combined_dataset.chunk({'latitude': 'auto', 'longitude': 'auto'})  # for var in dataset.variables:
                zarr_path = f"{layer}.zarr"
                final_dataset.to_zarr(zarr_path, mode = 'w')
                shutil.rmtree(temp_zarr_path)
            except Exception as e:
                print(f"final zarr dataset did not save {layer}: {e}")
                continue

    # Print the combined dataset
    print(combined_dataset)


main(zipurl, geodatabase)

EMODnet_GEO_Seabed_Substrate_All_Res.zip:  54%|█████▍    | 423M/777M [00:04<00:03, 96.1MB/s]   


KeyboardInterrupt: 