# Establishing Multisource Data-Integration Framework for Transportation Data Analytics

> ### This notebook demonstrates the map conflation process described in the following paper

```
@article{cui2020establishing,
  title={Establishing Multisource Data-Integration Framework for Transportation Data Analytics},
  author={Cui, Zhiyong and Henrickson, Kristian and Biancardo, Salvatore Antonio and Pu, Ziyuan and Wang, Yinhai},
  journal={Journal of Transportation Engineering, Part A: Systems},
  volume={146},
  number={5},
  pages={04020024},
  year={2020},
  publisher={American Society of Civil Engineers}
}
```

In this notebook, two geographical layers are used the Washington 24K map layer and the HERE layer in 2016 quarter 2. These two layers are stored in PostgreSQL as two tables 
* `state_lines`: Washington 24K map layer used for loop detector data analytics
* `wash_lines_2016q2`: HERE Layer for HERE data
The two tables are backup as `.csv` files and `.sql` files. 

You can easily use `PgAdmin4` (version >= 4.23) to import the `.csv` files to your own PostgreSQL database, taking the `wash_lines_2016q2` table as an example:
1. Create a database for yourself to run this code.
2. Create a `wash_lines_2016q2` table in your database by executing the following code in the **Query Tool** of `PgAdmin4`:

~~~sql
-- Table: public.wash_lines_2016q2
-- DROP TABLE public.wash_lines_2016q2;
CREATE TABLE public.wash_lines_2016q2
(
    geom geometry(MultiLineString,4326),
    link_id bigint,
    st_name character varying(240) COLLATE pg_catalog."default",
    feat_id bigint,
    dir_travel character varying(1) COLLATE pg_catalog."default",
    frontage character varying(1) COLLATE pg_catalog."default",
    ramp character varying(1) COLLATE pg_catalog."default",
    contracc character varying(1) COLLATE pg_catalog."default",
    route_type character varying(1) COLLATE pg_catalog."default",
    iso_code character varying(3) COLLATE pg_catalog."default"
)
WITH (
    OIDS = FALSE
)
TABLESPACE pg_default;

ALTER TABLE public.wash_lines_2016q2
    OWNER to postgres;

GRANT ALL ON TABLE public.wash_lines_2016q2 TO postgres;

GRANT SELECT ON TABLE public.wash_lines_2016q2 TO PUBLIC;
-- Index: sidx_wash_lines_2016q2_geom

-- DROP INDEX public.sidx_wash_lines_2016q2_geom;

CREATE INDEX sidx_wash_lines_2016q2_geom
    ON public.wash_lines_2016q2 USING gist
    (geom)
    TABLESPACE pg_default;
~~~

2. Create the `postgis` Extension for your database (Just right click the **Entensions** menu in your database and **Create**) 
3. Use the **Import** function by right clicking the `wash_lines_2016q2` table to import the `wash_lines_2016q2.csv` file. (Please remeber to turn the **header** on and choose ',' as Delimiter)

In [1]:
import psycopg2

In [22]:
IP = '128.95.29.75'
PORT = 5432
DBNAME = 'postgres217'
USER = 'postgres'
PASSWORD = 'NordicNew4ge'

# Query for all state_lines segment

In [10]:
con = psycopg2.connect(host=IP, port=PORT, dbname=DBNAME, user=USER, password=PASSWORD)
cur = con.cursor()

ids = []
cur.execute("SELECT DISTINCT objectid FROM state_lines WHERE express is null and relroutequ is null ORDER BY objectid")
for rw in cur.fetchall():
    ids.append(rw[0])
    
con.close()

In [11]:
con = psycopg2.connect(host=IP, port=PORT, dbname=DBNAME, user=USER, password=PASSWORD)
cur = con.cursor()

conflation_table_name = 'conflation'
create_conflation_table_query = '''
CREATE TABLE IF NOT EXISTS public.{}
(
    point_id integer NOT NULL,
    here_id integer NOT NULL,
    geom geometry(Geometry,4326),
    angle_score double precision,
    distance_score double precision
)
'''.format(conflation_table_name)
cur.execute(create_conflation_table_query)
con.commit()
con.close()
print('Create conflation table')


Create conflation table


In [12]:
len(ids)

12251

# Conflating function

In [20]:
def ConflateSegments(ids, direction_subquery, distance, angle = 0.25, conflation_table = 'conflation', RECONFLATE=False):
    con = psycopg2.connect(host=IP, port=PORT, dbname=DBNAME, user=USER, password=PASSWORD)
    cur = con.cursor()
    
    if RECONFLATE:
        for i in ids:
            sql = '''DELETE FROM {} WHERE point_id = '''.format(conflation_table) + str(i)
            print (sql)
            cur.execute(sql)
            print(cur.rowcount)
            con.commit()
    
    c = 0
    for i in ids:
        sql = '''
    WITH base_points AS (
                        SELECT 
                            objectid AS base_id,
                            ST_DumpPoints( ST_Segmentize( ST_Transform(wkb_geometry,9110), 30 ) ) AS dp,
                            geom AS base_geom,
                            rt_typea AS base_type,
                            direction AS base_dir
                        FROM state_lines
                        WHERE objectid = {}
        )
        , base_pairs AS (
                        SELECT 
                            base_id,
                            ST_Transform((dp).geom,4326 ) AS pt_geom,
                            LEAD(ST_Transform((dp).geom,4326 )) OVER (PARTITION BY base_id ORDER BY (dp).path[1]) As lead_geom,
                            ST_Centroid(ST_MakeLine(ST_Transform((dp).geom,4326 ), LEAD(ST_Transform((dp).geom,4326 )) 
                                OVER (PARTITION BY base_id ORDER BY (dp).path[1]))) AS base_line_centroid,
                            base_type,
                            base_dir,
                            (dp).path[1] AS base_path
                        FROM base_points
        )
        , here_near_points AS (
                        SELECT 
                            here.link_id AS here_id,
                            here.geom AS here_geom,
                            ST_DumpPoints( ST_Segmentize( ST_Transform(here.geom, 9110), 30 ) ) AS here_dp,
                            here.ramp AS here_type,
                            here.dir_travel AS here_dir
                        FROM state_lines AS sl JOIN wash_lines_2016q2 here
                            -- ON ST_Distance(sl.geom, here.geom) < 0.0001
                            ON ST_DWithin(ST_Transform(sl.wkb_geometry, 4326), here.geom, {})
                        WHERE sl.objectid = {}
                        AND here.ramp = 'N'
                        {}
        )
        , here_pairs AS (
                        SELECT 
                            here_id,
                            ST_Transform((here_dp).geom,4326 ) AS here_pt_geom,
                            LEAD(ST_Transform((here_dp).geom,4326 )) OVER (PARTITION BY here_id ORDER BY (here_dp).path[2]) As here_lead_geom,
                            ST_Centroid(ST_MakeLine(ST_Transform((here_dp).geom,4326 ), LEAD(ST_Transform((here_dp).geom,4326 )) 
                                OVER (PARTITION BY here_id ORDER BY (here_dp).path[2]))) AS here_line_centroid,
                            here_type,
                            here_dir,
                            (here_dp).path[2] AS here_path
                        FROM here_near_points
        )
        , points AS (
                        SELECT 	
                            base_id,
                            ST_MakeLine(pt_geom, lead_geom) AS base_line,
                            ST_Azimuth(pt_geom, lead_geom ) AS base_az,
                            ST_Distance(pt_geom, lead_geom) AS base_length,
                            base_line_centroid,
                            base_type,
                            base_dir,
                            base_path,
                            here_id,
                            ST_MakeLine(here_pt_geom, here_lead_geom) AS here_line,
                            --ST_Azimuth(here_pt_geom, here_lead_geom ) AS here_az,
                            CASE        			
                                WHEN here_dir = 'T' THEN 
                                    ST_Azimuth(here_lead_geom, here_pt_geom) 
                                ELSE
                                    ST_Azimuth(here_pt_geom, here_lead_geom) 
                            END  AS here_az,
                            ST_Distance(here_pt_geom, here_lead_geom) AS here_length,
                            here_type,
                            here_dir,
                            here_path,
                            ST_Distance(base_line_centroid, here_line_centroid ) AS distance
                    FROM base_pairs JOIN here_pairs ON 
                        --ST_Distance(wkt_geom,(here_geom).geom ) < 0.001)
                        ST_DWithin(base_line_centroid, here_line_centroid, {})
                    ORDER BY base_id, base_path, distance, here_id, here_path
        )
        , points_here AS (
                    SELECT  base_id,
                        base_path,
                        base_line,
                        base_az,
                        base_dir,
                        here_id,
                        here_path,
                        here_az,
                        here_dir,
                        distance,
                        ACOS(COS( base_az ) * COS(here_az ) + SIN( base_az ) * SIN(here_az ))  AS angle
                    FROM points
                    WHERE ABS( SIN( base_az - here_az ) ) < {}
        )
        ,final AS (
                    SELECT 
                        base_id,
                        base_path,
                        base_line,
                        base_az,
                        base_dir,
                        here_id,
                        here_path,
                        here_az,
                        here_dir,
                        distance,
                        angle,
                        ROW_NUMBER() OVER ( PARTITION BY base_id, base_path ORDER BY distance ) AS rw
                    FROM points_here 
        )
        INSERT INTO {}
        SELECT  
                base_id,
                here_id,
                ST_MakeLine( base_line ORDER BY base_path ) AS geom,
                MAX(angle) AS angle,
                MAX(distance) AS distance
        FROM final 
        WHERE rw = 1
        GROUP BY 
                base_id,
                here_id 
        ON CONFLICT DO NOTHING;
        '''.format(i, distance, i, direction_subquery, distance, angle, conflation_table)
        cur.execute(sql)
        print( '{} rows inserted on id {}'.format(cur.rowcount,i))
        con.commit()
        c += 1
    con.close()

# Conflate all Segments

In [None]:
ConflateSegments(ids, direction_subquery='', distance=0.0001, angle=0.25, conflation_table = 'conflation', RECONFLATE=False)

## For specific segments, if the conflation results are not good enough, we can reconflate by manually adjusting parameters as follows or using iteratively adjusting as the paper mentioned.

# SR-520

In [21]:
ids = [17815]
direction_subquery = ''' AND dir_travel = 'F' '''
distance = 0.001

ConflateSegments(ids, direction_subquery, distance, conflation_table = 'conflation', RECONFLATE=True)

DELETE FROM conflation WHERE point_id = 17815
3
3 rows inserted on id 17815


# I-90

In [103]:
ids=[2439, 15111, 15112]
direction_subquery = ''' AND here.dir_travel != 'B' '''
# dir_travel = 'F'
distance = 0.001

ConflateSegments(ids, direction_subquery, distance, conflation_table = 'conflation', RECONFLATE=True)

DELETE FROM temp_007 WHERE point_id = 2439
8
DELETE FROM temp_007 WHERE point_id = 15111
5
DELETE FROM temp_007 WHERE point_id = 15112
1
4 rows inserted on id 2439
3 rows inserted on id 15111
1 rows inserted on id 15112


# I405

In [75]:
ids=[2157, 2156, 15327]
direction_query = ''' AND here.dir_travel != 'B' '''
# dir_travel = 'F'
distance = 0.001

ConflateSegments(ids, direction_subquery, distance, conflation_table = 'conflation', RECONFLATE=True)

DELETE FROM temp_007 WHERE point_id = 2157
5
DELETE FROM temp_007 WHERE point_id = 2156
8
DELETE FROM temp_007 WHERE point_id = 15327
5
5 rows inserted on id 2157
8 rows inserted on id 2156
5 rows inserted on id 15327


# I-5

In [76]:
ids=[13524]
direction_query = '''  '''
distance = 0.001

ConflateSegments(ids, direction_subquery, distance, conflation_table = 'conflation', RECONFLATE=True)

DELETE FROM temp_007 WHERE point_id = 13524
5
5 rows inserted on id 13524


## North Bound

In [87]:
ids=[13524, 3126, 15483, 3127]
direction_query = ''' AND here.dir_travel = 'F' '''
distance = 0.001
angle = 0.35

ReConflateSegments(ids, direction_subquery, distance, angle, conflation_table = 'conflation', RECONFLATE=True)

DELETE FROM temp_007 WHERE point_id = 13524
2
DELETE FROM temp_007 WHERE point_id = 3126
6
DELETE FROM temp_007 WHERE point_id = 15483
6
DELETE FROM temp_007 WHERE point_id = 3127
5
2 rows inserted on id 13524
6 rows inserted on id 3126
6 rows inserted on id 15483
5 rows inserted on id 3127


## South Bound

In [88]:
ids=[9307, 15484, 9274]
direction_query = ''' AND here.dir_travel = 'T' '''
distance = 0.001
angle = 0.35

ConflateSegments(ids, direction_subquery, distance, angle, conflation_table = 'conflation', RECONFLATE=True)

DELETE FROM temp_007 WHERE point_id = 9307
6
DELETE FROM temp_007 WHERE point_id = 15484
0
DELETE FROM temp_007 WHERE point_id = 9274
5
7 rows inserted on id 9307
5 rows inserted on id 15484
4 rows inserted on id 9274


## ---------------------------- Finished ----------------------------------