#### load libraries and define db_connection 

In [3]:
import os
import json
import time
from datetime import datetime, timezone
import psycopg2
from psycopg2.extras import execute_values
import aiosql
import multiprocessing as mp
from dotenv import load_dotenv
from operator import itemgetter

from pgnetworks_processing.python.utilities import Config, create_run_id

query dml.create_function_public_ghh_decode_id_to_hash at /home/matthiasdaues/Documents/datenschoenheit/pgNetworks/pgnetworks_processing/pgnetworks_processing/sql/dml/ghh_functions.sql:1 may not be a select, consider adding an operator, eg '!'
query dml.drop_function_public_ghh_decode_id_to_hash at /home/matthiasdaues/Documents/datenschoenheit/pgNetworks/pgnetworks_processing/pgnetworks_processing/sql/dml/ghh_functions.sql:35 may not be a select, consider adding an operator, eg '!'
query dml.create_function_public_ghh_decode_hash_to_wkt at /home/matthiasdaues/Documents/datenschoenheit/pgNetworks/pgnetworks_processing/pgnetworks_processing/sql/dml/ghh_functions.sql:41 may not be a select, consider adding an operator, eg '!'
query dml.drop_function_public_ghh_decode_hash_to_wkt at /home/matthiasdaues/Documents/datenschoenheit/pgNetworks/pgnetworks_processing/pgnetworks_processing/sql/dml/ghh_functions.sql:60 may not be a select, consider adding an operator, eg '!'
query dml.create_functi

In [4]:
# processing parameters for this run, all read from the shared Config object

# database connection string
connect_db = Config.connect_db

# object containing all SQL needed for processing
queries = Config.queries

# number of parallel processes
CONCURRENCY = Config.CONCURRENCY

# batch sizes:
#   CHUNK_SIZE                    - batch size for each partial process
#   EDGE_PROCESSING_CHUNK_SIZE    - enhancement and segmentation process
#   FAR_NET_PROCESSING_CHUNK_SIZE - segmentization of the remaining road_network
CHUNK_SIZE = Config.CHUNK_SIZE
EDGE_PROCESSING_CHUNK_SIZE = Config.EDGE_PROCESSING_CHUNK_SIZE
FAR_NET_PROCESSING_CHUNK_SIZE = Config.FAR_NET_PROCESSING_CHUNK_SIZE

In [5]:
# set run_id and initiate workstep_idx

run_id = create_run_id()

# The helper functions and processing cells further down refer to the run id
# as RUN_ID. Define that alias here so the notebook does not depend on a
# variable left over from an earlier kernel session.
RUN_ID = run_id

# running counter of executed work steps; each processing cell increments it
workstep_idx = 0

print(run_id)

1741858011


In [6]:
from pgnetworks_processing.python import functions

# build the ID-range parameter list with the packaged helper and keep
# only the first chunk for inspection
workstep_idx += 1
chunk_bound_query_name = 'find_bounds_in_poi_table'
workstep_query_name = 'join_vertex_2_edge'
params_list = functions.create_range_bound_params_list(
    chunk_bound_query_name,
    workstep_query_name,
    workstep_idx,
    run_id,
)[:1]
params_list

[('join_vertex_2_edge', 2595910006465660600, 2595959187947121400, 1741858011)]

In [None]:
# advance the work step counter, then call the packaged selector-grid helper
# with the new index and the run id; whatever it returns is kept in params_list
workstep_idx += 1
params_list = functions.calculate_selector_grid(workstep_idx, run_id)
# debugging aid: uncomment to keep only the first 12 entries
# params_list = params_list[:12]

In [8]:
# build the spatial parameter list for the road network segmentization
# with the packaged helper, then show its size and content
workstep_idx += 1
spatial_bound_query_name = 'select_selector_grid'
spatial_workstep_query_name = 'segmentize_road_network'
params_list = functions.create_spatial_workstep_params_list(
    spatial_bound_query_name,
    spatial_workstep_query_name,
    workstep_idx,
    run_id,
)
# params_list = params_list[:12]
print(len(params_list))
params_list

256


[('segmentize_road_network',
  'POLYGON((10.0268134 53.578148600000006,10.0268134 53.904454,10.591303450000002 53.904454,10.591303450000002 53.578148600000006,10.0268134 53.578148600000006))',
  1741858011),
 ('segmentize_road_network',
  'POLYGON((10.165005 48.3412702,10.165005 48.5685086,10.635642749999999 48.5685086,10.635642749999999 48.3412702,10.165005 48.3412702))',
  1741858011),
 ('segmentize_road_network',
  'POLYGON((10.165005 48.5685086,10.165005 48.8773712,10.635642749999999 48.8773712,10.635642749999999 48.5685086,10.165005 48.5685086))',
  1741858011),
 ('segmentize_road_network',
  'POLYGON((10.1905456 49.4181821,10.1905456 49.9011793,10.8367874 49.9011793,10.8367874 49.4181821,10.1905456 49.4181821))',
  1741858011),
 ('segmentize_road_network',
  'POLYGON((10.201786649999999 48.0354725,10.201786649999999 48.3412702,10.635642749999999 48.3412702,10.635642749999999 48.0354725,10.201786649999999 48.0354725))',
  1741858011),
 ('segmentize_road_network',
  'POLYGON((10.24

#### tear down and rebuild database assets

In [7]:
# stored procedures that are dropped and recreated, in processing order
procedure_names = (
    'join_vertex_2_edge',
    'process_junctions_and_edges',
    'calculate_selector_grid',
    'segmentize_road_network',
    'count_node_degree',
)

# Currently switched off (kept here for reference). To rebuild tables, types
# and functions as well, call these with `conn` at the marked positions:
#
#   before the procedure drops:
#       queries.ddl.drop_table_vertex_2_edge
#       queries.ddl.drop_table_junctioned_edges
#       queries.ddl.drop_table_segments
#       queries.ddl.drop_type_segment_processing
#       queries.ddl.drop_table_nodes
#       queries.ddl.drop_type_edge_processing
#       queries.ddl.drop_type_edge_processing_2
#       queries.ddl.drop_table_selector_grid
#       queries.ddl.drop_table_log
#       queries.dml.drop_function_public_ghh_decode_id_to_wkt
#       queries.dml.drop_function_public_ghh_encode_xy_to_id
#
#   between the procedure drops and the procedure creates:
#       queries.ddl.create_table_vertex_2_edge
#       queries.ddl.create_table_junctioned_edges
#       queries.ddl.create_type_segment_processing
#       queries.ddl.create_table_segments
#       queries.ddl.create_table_nodes
#       queries.ddl.create_type_edge_processing
#       queries.ddl.create_type_edge_processing_2
#       queries.ddl.create_table_selector_grid
#       queries.ddl.create_table_log

with psycopg2.connect(connect_db) as conn:
    try:
        # drop all procedures first ...
        for procedure_name in procedure_names:
            getattr(queries.dml, 'drop_procedure_' + procedure_name)(conn)

        # ... then create them again
        for procedure_name in procedure_names:
            getattr(queries.dml, 'create_procedure_' + procedure_name)(conn)

        conn.commit()

    except psycopg2.Error as e:
        # database errors are only printed; the cell itself does not fail
        print(e)

#### download sources and copy data to DB

#### preprocess data in the DB

##### *define global variables for the run*

##### *define the wrapper functions for multiprocessing*

In [4]:
# create parameter list for a parallel processing work step based on chunk size setting

def create_workstep_params_list(chunk_bound_query_name: str, chunk_size: int, workstep_query_name: str, workstep_idx: int, RUN_ID: int) -> list:
    """
    Create the params_list for the next ID-range-bounded work step
    so that the step can be executed in parallel.

    The query named `chunk_bound_query_name` is looked up on `queries.dml`
    and called with `chunk_size`; the first column of every returned row is
    used as a bound. Consecutive bounds form (lower, upper) pairs and one
    extra entry covers the last bound with `last + 1` as its upper bound.
    The call is recorded through `queries.dml.write_to_log`.

    Args:
        chunk_bound_query_name: name of the bound query on `queries.dml`.
        chunk_size: batch size passed to the bound query and to each entry.
        workstep_query_name: name of the work step the entries are built for.
        workstep_idx: index of this step, written to the log message.
        RUN_ID: id of the current run, written to the log and to each entry.

    Returns:
        A list of tuples
        (workstep_query_name, lower_bound, upper_bound, chunk_size, RUN_ID).
        The list is empty when the bound query returns no rows.
    """
    start_date = datetime.now(timezone.utc).isoformat()

    # get chunk bounds based on chunk_size
    conn = psycopg2.connect(connect_db)
    try:
        with conn:  # ends the transaction: commit on success, rollback on error
            chunk_bound_query = getattr(queries.dml, chunk_bound_query_name)
            bounds_list = [row[0] for row in chunk_bound_query(conn, chunk_size=chunk_size)]
    finally:
        # psycopg2's connection context manager does not close the connection
        conn.close()

    # pair every bound with its successor ...
    params_list = [
        (workstep_query_name, lower, upper, chunk_size, RUN_ID)
        for lower, upper in zip(bounds_list, bounds_list[1:])
    ]
    # ... and close the range after the last bound (skipped when there are no bounds)
    if bounds_list:
        last_bound = bounds_list[-1]
        params_list.append((workstep_query_name, last_bound, last_bound + 1, chunk_size, RUN_ID))

    end_date = datetime.now(timezone.utc).isoformat()

    # collect the log info
    message = json.dumps({"idx": workstep_idx})
    log_level = "INFO"

    # write to log
    conn = psycopg2.connect(connect_db)
    try:
        with conn:
            queries.dml.write_to_log(conn,log_level=log_level,run_id=RUN_ID,start_date=start_date,end_date=end_date,work_step=chunk_bound_query_name,chunk_size=chunk_size,item_count=None,message=message)
    finally:
        conn.close()

    return params_list
 

In [5]:
# create a grid of cells that each contain a maximum of elements for further processing

def calculate_selector_grid(max_elements: int, workstep_idx: int = None, RUN_ID: int = None):
    """
    Execute the 'calculate_selector_grid' statement and log the call.

    The SQL text of `queries.dml.calculate_selector_grid` is executed with
    `max_elements` as its only parameter and committed. A log entry with
    work step 'create_selector_grid' is written afterwards.

    Args:
        max_elements: parameter handed to the grid statement; also logged as
            chunk_size.
        workstep_idx: index of this step for the log message. When omitted,
            the notebook-level global `workstep_idx` is used.
        RUN_ID: id of the current run for the log. When omitted, the
            notebook-level global `RUN_ID` is used.

    Raises:
        KeyError: if an omitted argument has no notebook-level global to
            fall back to.
    """
    # Resolve the fallbacks first so a missing global fails before any DB work.
    if workstep_idx is None:
        workstep_idx = globals()["workstep_idx"]
    if RUN_ID is None:
        RUN_ID = globals()["RUN_ID"]

    params = (max_elements,)
    print(params)
    create_grid_statement_name = 'calculate_selector_grid'
    create_grid_statement = getattr(queries.dml, create_grid_statement_name).sql

    conn = psycopg2.connect(connect_db)
    try:
        with conn:
            # timing covers statement execution and commit, not the connect
            start_date = datetime.now(timezone.utc).isoformat()
            with conn.cursor() as cur:
                cur.execute(create_grid_statement, params)
            conn.commit()
            end_date = datetime.now(timezone.utc).isoformat()
    finally:
        # psycopg2's connection context manager does not close the connection
        conn.close()

    # collect the log info
    message = json.dumps({"idx": workstep_idx})
    log_level = "INFO"

    # write to log
    conn = psycopg2.connect(connect_db)
    try:
        with conn:
            queries.dml.write_to_log(conn,log_level=log_level,run_id=RUN_ID,start_date=start_date,end_date=end_date,work_step='create_selector_grid',chunk_size=max_elements,item_count=None,message=message)
    finally:
        conn.close()

In [6]:
# create parameter list for a parallel processing work step based on a spatial selector grid

def create_spatial_workstep_params_list(spatial_bound_query_name: str, chunk_size: int, workstep_query_name: str, workstep_idx: int, RUN_ID: int) -> list:
    """
    Create the params_list for the next spatially bounded work step
    so that the step can be executed in parallel.

    The query named `spatial_bound_query_name` is looked up on `queries.dml`
    and called with `chunk_size`; the first column of every returned row
    becomes the selector of one entry. The call is recorded through
    `queries.dml.write_to_log`.

    Args:
        spatial_bound_query_name: name of the selector query on `queries.dml`.
        chunk_size: passed to the selector query and copied into each entry.
        workstep_query_name: name of the work step the entries are built for.
        workstep_idx: index of this step, written to the log message.
        RUN_ID: id of the current run, written to the log and to each entry.

    Returns:
        A list of tuples (workstep_query_name, selector, chunk_size, RUN_ID),
        one per row returned by the selector query.
    """
    start_date = datetime.now(timezone.utc).isoformat()

    # retrieve all selector grids
    conn = psycopg2.connect(connect_db)
    try:
        with conn:  # ends the transaction: commit on success, rollback on error
            spatial_bound_query = getattr(queries.dml, spatial_bound_query_name)
            bounds_list = [row[0] for row in spatial_bound_query(conn, chunk_size=chunk_size)]
    finally:
        # psycopg2's connection context manager does not close the connection
        conn.close()

    # one entry per selector
    params_list = [(workstep_query_name, bound, chunk_size, RUN_ID) for bound in bounds_list]

    end_date = datetime.now(timezone.utc).isoformat()

    # collect the log info
    message = json.dumps({"idx": workstep_idx})
    log_level = "INFO"

    # write to log
    conn = psycopg2.connect(connect_db)
    try:
        with conn:
            queries.dml.write_to_log(conn,log_level=log_level,run_id=RUN_ID,start_date=start_date,end_date=end_date,work_step=spatial_bound_query_name,chunk_size=chunk_size,item_count=None,message=message)
    finally:
        conn.close()

    return params_list

In [7]:
# Start a parallel processing workstep bounded by ID range

def call_workstep(workstep_query_name: str, lower_bound: int, upper_bound: int, chunk_size: int, RUN_ID: int):
    """
    Run one ID-range-bounded work step and log it.

    Executes the SQL text of `queries.dml.<workstep_query_name>` with
    (lower_bound, upper_bound) as parameters, takes the first column of the
    first result row as the item count and writes a log entry on the same
    connection before committing.

    The log message also contains the notebook-level globals `workstep_idx`
    and `CONCURRENCY`; they are read as globals, not passed in.

    Args:
        workstep_query_name: name of the work step query on `queries.dml`.
        lower_bound: first statement parameter.
        upper_bound: second statement parameter.
        chunk_size: only written to the log.
        RUN_ID: id of the current run, written to the log.
    """
    params = (lower_bound, upper_bound)
    workstep_query = getattr(queries.dml, workstep_query_name).sql

    conn = psycopg2.connect(connect_db)
    try:
        with conn:  # commit on success, rollback on error
            with conn.cursor() as cur:
                start_date = datetime.now(timezone.utc).isoformat()
                cur.execute(workstep_query, params)
                end_date = datetime.now(timezone.utc).isoformat()
                result_row = cur.fetchone()

            # an empty result must not prevent the log entry for work already done
            item_count = result_row[0] if result_row is not None else None

            # collect the log info
            message = json.dumps({"idx":workstep_idx,
                                  "concurrency": CONCURRENCY,
                                  "chunk_size": chunk_size
                                  })
            log_level = "INFO"

            # write to log
            queries.dml.write_to_log(conn,log_level=log_level,run_id=RUN_ID,start_date=start_date,end_date=end_date,work_step=workstep_query_name,chunk_size=chunk_size,item_count=item_count,message=message)
    finally:
        # psycopg2's connection context manager does not close the connection
        conn.close()


def call_parallel_workstep(params_list, CONCURRENCY: int, chunk_size: int, workstep_query_name: str, workstep_idx: int, RUN_ID: int):
    """
    Run an ID-range-bounded work step in parallel and log the overall call.

    Every tuple in `params_list` is unpacked into one `call_workstep` call;
    the calls are distributed over a process pool with `CONCURRENCY` workers.
    After the pool has finished, one log entry for the whole step is written
    through `queries.dml.write_to_log`.

    Args:
        params_list: iterable of argument tuples for `call_workstep`.
        CONCURRENCY: number of worker processes in the pool.
        chunk_size: only written to the log.
        workstep_query_name: logged as the work step name.
        workstep_idx: index of this step, written to the log message.
        RUN_ID: id of the current run, written to the log.
    """
    start_date = datetime.now(timezone.utc).isoformat()

    # starmap blocks until every chunk has been processed
    with mp.Pool(processes=CONCURRENCY) as pool:
        pool.starmap(call_workstep, params_list)

    end_date = datetime.now(timezone.utc).isoformat()

    # collect the log info
    message = json.dumps({"idx":workstep_idx,
                          "concurrency": CONCURRENCY,
                          "chunk_size": chunk_size
                          })
    log_level = "INFO"

    # write to log
    conn = psycopg2.connect(connect_db)
    try:
        with conn:  # commit on success, rollback on error
            queries.dml.write_to_log(conn,log_level=log_level,run_id=RUN_ID,start_date=start_date,end_date=end_date,work_step=workstep_query_name,chunk_size=chunk_size,item_count=None,message=message)
    finally:
        # psycopg2's connection context manager does not close the connection
        conn.close()

In [8]:
# Start a parallel processing workstep bounded by geometry

def call_spatial_workstep(spatial_workstep_query_name: str, selector_geometry: str, chunk_size: int, RUN_ID: int):
    """
    Run one geometry-bounded work step and log it.

    Executes the SQL text of `queries.dml.<spatial_workstep_query_name>` with
    `selector_geometry` as its only parameter, takes the first column of the
    first result row as the item count and writes a log entry on the same
    connection before committing.

    The log message also contains the notebook-level globals `workstep_idx`
    and `CONCURRENCY`; they are read as globals, not passed in.

    Args:
        spatial_workstep_query_name: name of the work step query on
            `queries.dml`.
        selector_geometry: statement parameter selecting the area to process.
        chunk_size: only written to the log.
        RUN_ID: id of the current run, written to the log.
    """
    params = (selector_geometry,)
    spatial_workstep_query = getattr(queries.dml, spatial_workstep_query_name).sql

    conn = psycopg2.connect(connect_db)
    try:
        with conn:  # commit on success, rollback on error
            with conn.cursor() as cur:
                start_date = datetime.now(timezone.utc).isoformat()
                cur.execute(spatial_workstep_query, params)
                end_date = datetime.now(timezone.utc).isoformat()
                result_row = cur.fetchone()

            # an empty result must not prevent the log entry for work already done
            item_count = result_row[0] if result_row is not None else None

            # collect the log info
            message = json.dumps({"idx":workstep_idx,
                                  "concurrency": CONCURRENCY,
                                  "chunk_size": chunk_size
                                  })
            log_level = "INFO"

            # write to log
            queries.dml.write_to_log(conn,log_level=log_level,run_id=RUN_ID,start_date=start_date,end_date=end_date,work_step=spatial_workstep_query_name,chunk_size=chunk_size,item_count=item_count,message=message)
    finally:
        # psycopg2's connection context manager does not close the connection
        conn.close()


def call_parallel_spatial_workstep(params_list, CONCURRENCY: int, chunk_size: int, workstep_query_name: str, workstep_idx: int, RUN_ID: int):
    """
    Run a geometry-bounded work step in parallel and log the overall call.

    Every tuple in `params_list` is unpacked into one `call_spatial_workstep`
    call; the calls are distributed over a process pool with `CONCURRENCY`
    workers. After the pool has finished, one log entry for the whole step is
    written through `queries.dml.write_to_log`.

    Args:
        params_list: iterable of argument tuples for `call_spatial_workstep`.
        CONCURRENCY: number of worker processes in the pool.
        chunk_size: only written to the log.
        workstep_query_name: logged as the work step name.
        workstep_idx: index of this step, written to the log message.
        RUN_ID: id of the current run, written to the log.
    """
    start_date = datetime.now(timezone.utc).isoformat()

    # starmap blocks until every selector has been processed
    with mp.Pool(processes=CONCURRENCY) as pool:
        pool.starmap(call_spatial_workstep, params_list)

    end_date = datetime.now(timezone.utc).isoformat()

    # collect the log info
    message = json.dumps({"idx":workstep_idx,
                          "concurrency": CONCURRENCY,
                          "chunk_size": chunk_size
                          })
    log_level = "INFO"

    # write to log
    conn = psycopg2.connect(connect_db)
    try:
        with conn:  # commit on success, rollback on error
            queries.dml.write_to_log(conn,log_level=log_level,run_id=RUN_ID,start_date=start_date,end_date=end_date,work_step=workstep_query_name,chunk_size=chunk_size,item_count=None,message=message)
    finally:
        # psycopg2's connection context manager does not close the connection
        conn.close()

In [9]:
# Index a table for further processing

def create_index(index_statement_name: str, workstep_idx: int, RUN_ID: int):
    """
    Execute an index creation statement by query name and log the call.

    The SQL text of `queries.ddl.<index_statement_name>` is executed without
    parameters and committed. A log entry named after the statement is
    written through `queries.dml.write_to_log`.

    Args:
        index_statement_name: name of the statement on `queries.ddl`; also
            logged as the work step name.
        workstep_idx: index of this step, written to the log message.
        RUN_ID: id of the current run, written to the log.
    """
    index_statement = getattr(queries.ddl, index_statement_name).sql

    # timing starts before the connection is opened
    start_date = datetime.now(timezone.utc).isoformat()
    conn = psycopg2.connect(connect_db)
    try:
        with conn:  # commit on success, rollback on error
            with conn.cursor() as cur:
                cur.execute(index_statement)
            conn.commit()
        end_date = datetime.now(timezone.utc).isoformat()
    finally:
        # psycopg2's connection context manager does not close the connection
        conn.close()

    # collect the log info
    message = json.dumps({"idx": workstep_idx})
    log_level = "INFO"

    # write to log
    conn = psycopg2.connect(connect_db)
    try:
        with conn:
            queries.dml.write_to_log(conn,log_level=log_level,run_id=RUN_ID,start_date=start_date,end_date=end_date,work_step=index_statement_name,chunk_size=None,item_count=None,message=message)
    finally:
        conn.close()

##### *perform processing*

In [12]:
# find bounds in the poi table

workstep_idx += 1
chunk_bound_query_name = 'find_bounds_in_poi_table'
workstep_query_name = 'join_vertex_2_edge'
params_list = create_workstep_params_list(
    chunk_bound_query_name,
    CHUNK_SIZE,
    workstep_query_name,
    workstep_idx,
    RUN_ID,
)
# debugging aids: limit the run to the first chunk / inspect the list
#params_list = params_list[:1]
#params_list


In [13]:
# join vertices to the nearest edges

# run the work step named by workstep_query_name over params_list
# (both set in the preceding cell) with CONCURRENCY worker processes
workstep_idx += 1
call_parallel_workstep(params_list,CONCURRENCY, CHUNK_SIZE, workstep_query_name, workstep_idx, RUN_ID)
# then run the index statement as its own logged work step
workstep_idx += 1
create_index('create_index_vertex_2_edge_edge_id_idx', workstep_idx, RUN_ID)

In [14]:
# find bounds in the vertex junction table

workstep_idx += 1
chunk_bound_query_name = 'find_bounds_in_vertex_2_edge'
workstep_query_name = 'process_junctions_and_edges'
params_list = create_workstep_params_list(
    chunk_bound_query_name,
    EDGE_PROCESSING_CHUNK_SIZE,
    workstep_query_name,
    workstep_idx,
    RUN_ID,
)
#params_list

In [15]:
# process the junctions and edges (segmentize the near_net edges)

# NOTE(review): the increment below is disabled, so this step is logged with
# the same workstep_idx as the preceding bounds cell, unlike the other
# processing cells -- confirm whether that is intended.
#workstep_idx += 1
call_parallel_workstep(params_list,CONCURRENCY, EDGE_PROCESSING_CHUNK_SIZE, workstep_query_name, workstep_idx, RUN_ID)

In [16]:
# create spatial selector grid over the remaining road network edges
# (calculate_selector_grid is called with the chunk size only; it takes
# workstep_idx and RUN_ID from the notebook-level globals)
workstep_idx += 1
calculate_selector_grid(FAR_NET_PROCESSING_CHUNK_SIZE)

(20000,)


In [None]:
# prepare the spatial bound parameter list for far_net edge processing

workstep_idx += 1
spatial_bound_query_name = 'select_selector_grid'
spatial_workstep_query_name = 'segmentize_road_network'
params_list = create_spatial_workstep_params_list(
    spatial_bound_query_name,
    FAR_NET_PROCESSING_CHUNK_SIZE,
    spatial_workstep_query_name,
    workstep_idx,
    RUN_ID,
)
# params_list = params_list[:12]
print(len(params_list))

256


In [18]:
# process the remaining road network edges (process the far_net edges)

# NOTE(review): the pool size is hard-coded to 6 here instead of CONCURRENCY,
# while call_spatial_workstep logs the global CONCURRENCY in its per-chunk
# messages -- confirm the value is deliberate.
workstep_idx += 1
call_parallel_spatial_workstep(params_list,6, FAR_NET_PROCESSING_CHUNK_SIZE, spatial_workstep_query_name, workstep_idx, RUN_ID)

In [20]:
# index the segments table

# each index statement runs as its own logged work step
for index_statement_name in (
    'create_index_segments_geom_idx',
    'create_index_segments_node_1_idx',
    'create_index_segments_node_2_idx',
    'create_index_segments_edge_id_idx',
):
    workstep_idx += 1
    create_index(index_statement_name, workstep_idx, RUN_ID)

In [10]:
# prepare the spatial bound parameter list for node degree calculation

workstep_idx += 1
spatial_bound_query_name = 'select_selector_grid'
spatial_workstep_query_name = 'count_node_degree'
params_list = create_spatial_workstep_params_list(
    spatial_bound_query_name,
    FAR_NET_PROCESSING_CHUNK_SIZE,
    spatial_workstep_query_name,
    workstep_idx,
    RUN_ID,
)
# debugging aids: limit / inspect the parameter list
# params_list = params_list[:12]
# print(len(params_list))
# print(params_list)

In [None]:
# calculate the node degree from the segments table

# NOTE(review): pool size is the literal 6, not CONCURRENCY -- confirm
workstep_idx += 1
call_parallel_spatial_workstep(
    params_list,
    6,
    FAR_NET_PROCESSING_CHUNK_SIZE,
    spatial_workstep_query_name,
    workstep_idx,
    RUN_ID,
)

# run the two nodes index statements, each as its own logged work step
for index_statement_name in (
    'create_index_nodes_node_id_idx',
    'create_index_nodes_selector_grid_hash_id_idx',
):
    workstep_idx += 1
    create_index(index_statement_name, workstep_idx, RUN_ID)