# S-57 Converter Class

In [1]:
from osgeo import ogr, osr
import os

class S57:
    def __init__(self):
        # Set S57 options
        os.environ['OGR_S57_OPTIONS'] = '''RETURN_PRIMITIVES=OFF, 
                                          SPLIT_MULTIPOINT=ON,
                                          ADD_SOUNDG_DEPTH=ON,
                                          RETURN_LINKAGES=ON, 
                                          RECODE_BY_DSSI=ON'''
        
        # Register drivers
        ogr.RegisterAll()
        self.s57_driver = ogr.GetDriverByName('S57')
        self.s57_ds = None
        
    def _open_s57(self,s57_file_path):
        """Open S57 datasource"""
        self.s57_ds = self.s57_driver.Open(s57_file_path)
        if self.s57_ds is None:
            raise RuntimeError(f"Could not open S-57 file: {s57_file_path}")
        else:
            print(f"Successfully opened S-57 file: {s57_file_path}")
        return self.s57_ds
    
    def _process_layer(self, input_layer, output_ds, output_layer_name, dsid_dsnm=None, schema=None):
        """Process individual layer conversion"""
        # Determine if we're working with PostGIS
        is_postgis = output_ds.GetDriver().GetName() == 'PostgreSQL'
        
        # Create output layer
        if is_postgis and schema:
            full_layer_name = f"{schema}.{output_layer_name}"
            output_layer_name = output_layer_name.lower()
        else:
            full_layer_name = output_layer_name
        
        out_layer = output_ds.GetLayerByName(full_layer_name)
        

        # Delete existing features for this ENC if dsid_dsnm is provided
        if dsid_dsnm and output_layer_name.lower() != 'dsid' and is_postgis:
            output_ds.ExecuteSQL(f'DELETE FROM "{schema}"."{output_layer_name}" WHERE dsid_dsnm = \'{dsid_dsnm}\'')
            print(f"Deleting features for {dsid_dsnm} in {schema} layer {output_layer_name}...")
            

        # Configure layer options based on driver type
        layer_options = []
        if is_postgis and schema:
            layer_options.append(f'SCHEMA={schema}') # Explicitly set schema for PostGIS

        if out_layer is None:
            # Create new layer if it doesn't exist
            out_layer = output_ds.CreateLayer(
                output_layer_name,
                input_layer.GetSpatialRef(),
                input_layer.GetGeomType(),
                options=layer_options 
            )
            
            # Copy field definitions for new layer
            layer_defn = input_layer.GetLayerDefn()
            for i in range(layer_defn.GetFieldCount()):
                out_layer.CreateField(layer_defn.GetFieldDefn(i))

            # Add dsid_dsnm field for non-DSID layers
            if output_layer_name.lower() != 'dsid':
                dsnm_field = ogr.FieldDefn('dsid_dsnm', ogr.OFTString)
                out_layer.CreateField(dsnm_field)

        if out_layer is not None:
            print(f"Processing layer: {output_layer_name}")
        
        
        # Copy features
        input_layer.ResetReading()
        feature = input_layer.GetNextFeature()
        while feature:
            out_feature = ogr.Feature(out_layer.GetLayerDefn())
            
            # Set geometry
            geom = feature.GetGeometryRef()
            if geom:
                out_feature.SetGeometry(geom.Clone())
                
            # Set attributes
            for i in range(feature.GetFieldCount()):
                field_defn = feature.GetFieldDefnRef(i)
                field_type = field_defn.GetType()
                field_value = feature.GetField(i)
                
                if field_value is not None:
                    if field_type == ogr.OFTString:
                        out_feature.SetField(i, str(field_value))
                    elif field_type == ogr.OFTInteger:
                        out_feature.SetField(i, int(field_value))
                    elif field_type == ogr.OFTReal:
                        out_feature.SetField(i, float(field_value))
                    elif field_type in [ogr.OFTStringList, ogr.OFTIntegerList, ogr.OFTRealList]:
                        # Handle list types
                        if isinstance(field_value, (list, tuple)):
                            out_feature.SetField(i, ','.join(map(str, field_value)))

            # Set dsid_dsnm for non-DSID layers
            if output_layer_name.lower() != 'dsid' and dsid_dsnm:
                out_feature.SetField('dsid_dsnm', dsid_dsnm)

            out_layer.CreateFeature(out_feature)
            feature = input_layer.GetNextFeature()
            
        if out_layer.GetFeatureCount() > 0:
            print(f"Successfully processed {out_layer.GetFeatureCount()} features for layer: {output_layer_name}")    
        return out_layer
    
    @staticmethod
    def _postgis_connect(pg_params: dict, schema: str = 'public'):
        """Connect to PostGIS database,
            Define and check Schema,
            Check user permissions"""
        pg_driver = ogr.GetDriverByName('PostgreSQL')
        
        conn_string = (f"PG: host={pg_params['host']} "
                    f"port={pg_params['port']} "
                    f"dbname={pg_params['dbname']} "
                    f"user={pg_params['user']} "
                    f"password={pg_params['password']}")
        
        pg_ds = pg_driver.Open(conn_string, update=1)
        # Check if connection is successful
        if pg_ds is None:
            raise RuntimeError(f"Could not connect to {pg_params['dbname']} database")
        else:
            print(f"Successfully connected to {pg_params['dbname']} database")
        
        if schema != 'public':
            quoted_schema = f'"{schema}"'
            # Create schema if it doesn't exist
            create_schema_sql = f'CREATE SCHEMA IF NOT EXISTS {quoted_schema}'
            print(f"Executing SQL: {create_schema_sql}")
            pg_ds.ExecuteSQL(create_schema_sql)
            
            # Verify schema creation
            check_schema_sql = f"""
                SELECT EXISTS(
                    SELECT 1 
                    FROM information_schema.schemata 
                    WHERE schema_name = '{schema.lower()}'
                ) AS schema_exists; """
            result = pg_ds.ExecuteSQL(check_schema_sql)
            schema_exists = result.GetNextFeature() is not None
            pg_ds.ReleaseResultSet(result)
            
            print(f"Schema '{schema}' {'exists' if schema_exists else 'creation failed'}")
            if not schema_exists:
                raise RuntimeError(f"Schema '{schema}' was not created successfully.")

        # Check user permissions
            perm_check = pg_ds.ExecuteSQL("SELECT current_user, current_database()")
            if perm_check:
                feature = perm_check.GetNextFeature()
                print(f"Connected as: {feature.GetField(0)} to database: {feature.GetField(1)}")
        return pg_ds



    def to_gpkg(self, input_file_path, output_folder=None):
        # If no export folder specified, use the input folder
        if output_folder is None:
            output_folder = os.path.dirname(input_file_path)

        # Extract the base name of the S57 file (without extension)
        s57_base_name = os.path.splitext(os.path.basename(input_file_path))[0]
        
        # Create the full path for the output GeoPackage directly in the export folder
        output_folder_path = os.path.join(output_folder, f"{s57_base_name}.gpkg")

        # Ensure the export folder exists
        os.makedirs(output_folder, exist_ok=True)

        """Convert S57 to GeoPackage"""
        s57_ds = self._open_s57(input_file_path)
        gpkg_driver = ogr.GetDriverByName('GPKG')
        gpkg_ds = gpkg_driver.CreateDataSource(output_folder_path)
        
        for i in range(s57_ds.GetLayerCount()):
            layer = s57_ds.GetLayer(i)
            self._process_layer(layer, gpkg_ds, layer.GetName())

        print(f"Successfully converted {s57_ds.GetLayerCount()} layers to GeoPackage: {output_folder_path}")
        gpkg_ds = None
        self.s57_ds = None
        
    def to_postgis(self, input_path, pg_params: dict, schema: str = 'public', is_directory: bool = False, pg_ds=None, overwrite: bool=False):
        """Convert S57 to PostGIS"""
        s57_ds = self._open_s57(input_path)

        if is_directory == False:
            pg_ds = self._postgis_connect(pg_params, schema)
        
        # Process DSID layer first
        dsid_layer = s57_ds.GetLayerByName('DSID')
        if dsid_layer is None:
            raise ValueError("DSID layer not found in S57 file")
        
        
        dsid_feature = dsid_layer.GetNextFeature()
        new_dsnm = dsid_feature.GetField('dsid_dsnm')
        new_edtn = dsid_feature.GetField('dsid_edtn')
        new_updn = dsid_feature.GetField('dsid_updn')
        new_uadt = dsid_feature.GetField('dsid_uadt')

        # Check if DSID table exists first
        tables_list = pg_ds.ExecuteSQL(f"SELECT table_name FROM information_schema.tables WHERE table_schema = '{schema}'")
        dsid_exists = False
        
        if tables_list:
            for i in range(tables_list.GetFeatureCount()):
                feature = tables_list.GetNextFeature()
                if feature.GetField(0).lower() == 'dsid':
                    dsid_exists = True
                    break
            pg_ds.ReleaseResultSet(tables_list)

        if dsid_exists:
        # Check for existing entries
            sql = f'SELECT * FROM "{schema}"."dsid" WHERE dsid_dsnm = \'{new_dsnm}\''
            result = pg_ds.ExecuteSQL(sql)

            if result and result.GetFeatureCount() > 0:
                existing = result.GetNextFeature()
                print(f"ENC {new_dsnm} already exists in database:")
                print(f"Existing version: Edition {existing.GetField('dsid_edtn')}, Update {existing.GetField('dsid_updn')}, Update date: {existing.GetField('dsid_uadt')}")
                print(f"New version:      Edition {new_edtn}, Update {new_updn}, Update date: {new_uadt}")
                
                if overwrite == False:
                    
                    user_choice = input("Do you want to Update with new data? (Y/N): ")

                    # Release the SQL query result
                    pg_ds.ReleaseResultSet(result)

                    if user_choice.lower() != 'y':
                        print("Keeping existing data, import cancelled")
                        return 

                    # Delete specific row instead of dropping entire table
                    pg_ds.ExecuteSQL(f'DELETE FROM "{schema}"."dsid" WHERE dsid_dsnm = \'{new_dsnm}\'')

                elif overwrite:
                    pg_ds.ExecuteSQL(f'DELETE FROM "{schema}"."dsid" WHERE dsid_dsnm = \'{new_dsnm}\'')
                
            print(f"Updating {new_dsnm}...") 
       
        self._process_layer(dsid_layer, pg_ds, 'dsid', schema=schema)

        # Process remaining layers (similar row-level update logic can be applied here)
        for i in range(s57_ds.GetLayerCount()):
            layer = s57_ds.GetLayer(i)
            if layer.GetName().lower() != 'dsid':
                self._process_layer(layer, pg_ds, layer.GetName(), new_dsnm, schema=schema)
            
            
        pg_ds = None
        self.s57_ds = None
    
    def postgis_delete(self, pg_params: dict, schema: str, 
                       delete_schema: bool=False, 
                       delete_tables_all: bool=False,  
                       delete_enc_tables: bool=False, 
                       drop_tables: list=None,
                       delete_enc_rows: list=None):
        
        pg_driver = ogr.GetDriverByName('PostgreSQL')
        
        conn_string = (f"PG: host={pg_params['host']} "
                      f"port={pg_params['port']} "
                      f"dbname={pg_params['dbname']} "
                      f"user={pg_params['user']} "
                      f"password={pg_params['password']}")
        
        pg_ds = pg_driver.Open(conn_string, update=1)
        # Check if connection is successful
        if pg_ds is None:
            raise RuntimeError(f"Could not connect to {pg_params['dbname']} database")
        else:
            print(f"Successfully connected to {pg_params['dbname']} database")
        
        if delete_schema:
            sql = f'DROP SCHEMA IF EXISTS "{schema}" CASCADE'
            print(f"Dropping schema: {schema}")
            pg_ds.ExecuteSQL(sql)

        if delete_tables_all:
            sql = f'''
                DO $$ 
                DECLARE 
                    r RECORD;
                BEGIN
                    FOR r IN SELECT tablename FROM pg_tables WHERE schemaname = '{schema}'
                    LOOP
                        EXECUTE 'DROP TABLE IF EXISTS ' || quote_ident('{schema}') || '.' || quote_ident(r.tablename) || ' CASCADE';
                    END LOOP;
                END $$;
            '''
            print(f"Deleting all tables in {schema} Schema")
            pg_ds.ExecuteSQL(sql)

        if delete_enc_tables:
            sql = f'''
                DO $$ 
                DECLARE 
                    r RECORD;
                BEGIN
                    FOR r IN 
                        SELECT table_name 
                        FROM information_schema.columns 
                        WHERE column_name = 'dsid_dsnm' 
                        AND table_schema = '{schema}'
                    LOOP
                        EXECUTE 'DROP TABLE IF EXISTS ' || quote_ident('{schema}') || '.' || quote_ident(r.table_name) || ' CASCADE';
                    END LOOP;
                END $$;
            '''
            print(f"Deleting all tables in {schema} Schema with ENC data")
            pg_ds.ExecuteSQL(sql)

        if drop_tables is not None:
            # Convert single value to list if needed
            if isinstance(drop_tables, str):
                enc_names = [drop_tables]

            tables_list = ', '.join(f"'{table}'" for table in drop_tables)
            sql = f'''
                DO $$
                BEGIN
                    EXECUTE 'DROP TABLE IF EXISTS ' || 
                    string_agg(quote_ident('{schema}') || '.' || quote_ident(table_name), ', ')
                    FROM information_schema.tables 
                    WHERE table_schema = '{schema}'
                    AND table_name IN ({tables_list});
                END $$;
            '''
            print(f"Dropping tables: {drop_tables}")
            pg_ds.ExecuteSQL(sql) 

        if delete_enc_rows is not None:
        # Delete ENC Rows can handle only ine ENC at a time (fix in progress)
        # TODO: Fix delete_enc_rows to handle multiple ENCs at a time from a list
        # Convert single value to list if needed
            if isinstance(delete_enc_rows, str):
                delete_enc_rows = [delete_enc_rows]
                
            # Create formatted string of ENC names for SQL IN clause
            enc_list = ', '.join(f"'{enc}'" for enc in delete_enc_rows)

            
            
            sql = f'''
                DO $$ 
                DECLARE 
                    r RECORD;
                BEGIN
                    FOR r IN 
                        SELECT table_name 
                        FROM information_schema.columns 
                        WHERE column_name = 'dsid_dsnm' 
                        AND table_schema = '{schema}'
                    LOOP
                        EXECUTE 'DELETE FROM ' || quote_ident('{schema}') || '.' || quote_ident(r.table_name) || 
                            ' WHERE dsid_dsnm IN(' || '\'{enc_list}\'' || ')';
                    END LOOP;
                END $$;
            '''

            print(f"Deleting data for following ENCs: {delete_enc_rows}")
            pg_ds.ExecuteSQL(sql)

            

        # Get list of ENCs from DSID table
            sql = f'SELECT dsid_dsnm FROM "{schema}"."dsid"'
            result = pg_ds.ExecuteSQL(sql)

            enc_list = []
            if result:
                feature = result.GetNextFeature()
                while feature:
                    enc_list.append(feature.GetField('dsid_dsnm'))
                    feature = result.GetNextFeature()
                pg_ds.ReleaseResultSet(result)
                print(f"Found ENCs: {enc_list}")

        pg_ds = None
    
    @classmethod
    # Browse the ENC_ROOT folder and process all S-57 files
    def process_folder(cls, input_folder_path, output_folder_path=None, driver: str=None , pg_params=None, schema=None, ovrwrite=False):
        # Create instance of S57 class
        s57_processor = cls()

        # If no export folder specified, use the input folder
        if output_folder_path is None:
            output_folder_path = input_folder_path

        s57_files = []
        for root, dirs, files in os.walk(input_folder_path):
            for file in files:
            # Check if file ends with .000 or has a 3-digit extension
                if file.endswith('.000'):
                    full_path = os.path.normpath(os.path.join(root, file))
                    s57_files.append(full_path)
        print(f"Found {len(s57_files)} S-57 files in folder: {input_folder_path}")        

        if driver is None:
            print("No driver selected. Please specify a driver. \nDrivers available: GPKG, PostGIS")
            return

        if driver == 'GPKG':
            # Create a GPKG file for each S-57 file
            for s57_file in s57_files:
                print(f"\nProcessing S-57 file: {s57_file}")
                output_file = os.path.join(output_folder_path, os.path.splitext(os.path.basename(s57_file))[0] + '.gpkg')
                s57_processor.to_gpkg(s57_file, output_folder_path)
                if os.path.exists(output_file) and not ovrwrite:
                    print(f"Output file already exists: {output_file}")
                    continue
        
        if driver == 'PostGIS':
            
            if pg_params is None:
                raise ValueError("PG parameters not provided")
                
            pg_ds = s57_processor._postgis_connect(pg_params, schema)

            # Create a PostGIS database for each S-57 file

            for s57_file in s57_files:
                print(f"\nProcessing S-57 file: {s57_file}")
                s57_processor.to_postgis(s57_file, pg_params, schema, is_directory=True, pg_ds=pg_ds, overwrite=ovrwrite)

                

#### Initialize S57 Class

In [None]:
# Import the S57 class
from s57_converter import S57

# Create S57 instance
s57 = S57()

# Export to PostGIS
pg_params = {
    'host': 'localhost',
    'port': '5433',
    'dbname': 'My_db',
    'user': 'postgres',
    'password': 'password'
}

#### Delete ENC inside PostGIS

In [None]:
# Delete PostGIS data (Works with multiple ENCs in List or Tuple format. Only outside of function)
# Loop only for "delete_enc_rows", others work straight from function
ens_list = ('US2WC05M.000', 'US2WC11M.000', 'US2WC06M.000')
for enc in ens_list:
    s57.postgis_delete(pg_params, schema='public', 
                            delete_schema =False, 
                            delete_tables_all =False,  
                            delete_enc_tables =True, 
                            drop_tables = None,
                            delete_enc_rows = enc)

### Convert S57 to Geopackage (as ENC.gpkg)

In [None]:
export_dir = "G:/Python Projects/GIS/Packt_Books/GIS_files/export"

S57.to_gpkg(input_file_path = "G:/GIS/GIS_files/11CGD_ENCs/ENC_ROOT/US2WC06M/US2WC06M.000", output_folder = export_dir)
S57.process_folder(input_folder_path = "G:/GIS/GIS_files/11CGD_ENCs", driver= "GPKG")

### Convert S57 to PostGIS tables (as Layer table)

In [None]:
s57.to_postgis("G:/GIS/GIS_files/11CGD_ENCs/ENC_ROOT/US2WC06M/US2WC06M.000",pg_params, schema='public')
S57.process_folder(input_folder_path = "G:/GIS/GIS_files/All_ENCs", driver= "PostGIS", pg_params=pg_params, schema='s57_US', ovrwrite=True)