In [2]:
import os
from pathlib import Path

import pandas as pd
import json

import ibis as ib
from ibis import _

In [3]:
# Interactive mode: ibis expressions left as a cell's last expression are executed and rendered.
ib.options.interactive = True

In [4]:
# In-memory DuckDB connection; memory_limit and threads are forwarded as DuckDB settings.
conn = ib.connect("duckdb://", memory_limit='150GB', threads=30)

In [5]:
# List every DuckDB setting, e.g. to confirm the connection options were applied.
settings_query = """SELECT * FROM duckdb_settings();"""
conn.sql(settings_query).execute()

Unnamed: 0,name,value,description,input_type,scope
0,access_mode,automatic,"Access mode of the database (AUTOMATIC, READ_O...",VARCHAR,GLOBAL
1,allocator_background_threads,false,Whether to enable the allocator background thr...,BOOLEAN,GLOBAL
2,allocator_bulk_deallocation_flush_threshold,512.0 MiB,If a bulk deallocation larger than this occurs...,VARCHAR,GLOBAL
3,allocator_flush_threshold,128.0 MiB,Peak allocation threshold at which to flush th...,VARCHAR,GLOBAL
4,allow_community_extensions,true,Allow to load community built extensions,BOOLEAN,GLOBAL
...,...,...,...,...,...
124,python_scan_all_frames,false,"If set, restores the old behavior of scanning ...",BOOLEAN,GLOBAL
125,TimeZone,UTC,The current time zone,VARCHAR,LOCAL
126,prefetch_all_parquet_files,false,Use the prefetching mechanism for all types of...,BOOLEAN,GLOBAL
127,python_enable_replacements,false,Whether variables visible to the current stack...,BOOLEAN,LOCAL


In [6]:
# Read back the effective worker-thread count.
threads_query = """SELECT current_setting('threads') as threads;"""
conn.sql(threads_query).execute()

Unnamed: 0,threads
0,30


In [7]:
# Root of the project results tree, resolved relative to the current working directory.
base_path = Path.cwd().joinpath('..', '..', 'soge-home', 'projects', 'mistral', 'DAFNI_NIRD', 'results')
base_path

PosixPath('/hn01-home/cenv1007/../../soge-home/projects/mistral/DAFNI_NIRD/results')

In [8]:
def update_edge_speed(
    table,
    combined_label: str,
    total_flow: float,
    initial_flow_speed: float,
    min_flow_speed: float,
    breakpoint_flow: float,
    ):
    """Add an ``acc_speed`` column holding the flow-dependent link speed.

    Below the breakpoint a link runs at ``initial_flow_speed``. Once the
    hourly flow ``vp = total_flow / 24`` exceeds ``breakpoint_flow``, the
    speed drops linearly with the excess flow. The slope is 0.033 for ``M``
    and ``A_dual`` links and 0.05 for ``A_single`` and ``B`` links. The
    congested speed is floored at ``min_flow_speed``. Links with any other
    label, or whose comparison against ``breakpoint_flow`` is NULL, keep
    ``initial_flow_speed``.

    The scalar type hints are historical. In this notebook every argument
    after ``table`` is an ibis deferred column expression (e.g. ``_.acc_flow``).

    Parameters
    ----------
    table : ibis Table
        Road-link table to extend.
    combined_label
        Road-class label expression (``'M'``, ``'A_single'``, ``'A_dual'``, ``'B'``).
    total_flow
        Flow expression; divided by 24, presumably turning a daily total into
        an hourly average (TODO confirm units).
    initial_flow_speed
        Speed used when the link is not congested.
    min_flow_speed
        Lower bound for the congested speed.
    breakpoint_flow
        Hourly flow above which the speed starts to fall.

    Returns
    -------
    ibis Table
        ``table`` with an ``acc_speed`` column added (or replaced).
    """
    # Speed lost per unit of hourly flow above the breakpoint, per road class.
    slopes = {'M': 0.033, 'A_single': 0.05, 'A_dual': 0.033, 'B': 0.05}

    vp = total_flow / 24
    congested = vp > breakpoint_flow
    excess_flow = vp - breakpoint_flow

    branches = [
        (
            (combined_label == label) & congested,
            # Floor (not cap) the congested speed: speed may fall to, but never below,
            # min_flow_speed. ib.least here would force every speed <= min_flow_speed.
            ib.greatest(initial_flow_speed - slope * excess_flow, min_flow_speed),
        )
        for label, slope in slopes.items()
    ]
    return table.mutate(acc_speed=ib.cases(*branches, else_=initial_flow_speed))

def apply_recovery(
    table,
    recovery_rates_dict : dict,
    day,
    ):
    """Join the day's recovery rates onto ``table`` and compute ``acc_capacity``.

    ``recovery_rates_dict[day]`` is an ibis table with ``damage_level`` and
    ``recovery_rate`` columns. Its row whose ``damage_level`` is ``'event_day'``
    flags, via a non-zero rate, that ``day`` is the event day.

    Capacity rules (first match wins):
      1. On the event day, links whose ``damage_level_max`` is ``'extensive'``
         or ``'severe'`` get capacity 0.
      2. Links with ``damage_level_max == 'no'`` keep ``current_capacity``.
      3. All others get ``current_capacity * recovery_rate``. This is NULL when
         the left join finds no rate row for the link's ``damage_level``.

    ``table`` must provide ``damage_level`` (the join key), ``damage_level_max``
    and ``current_capacity``. The scenario loop builds ``damage_level`` as
    ``road_label + '_' + damage_level_max``. The event-day rule therefore
    tests ``damage_level_max``; comparing ``damage_level`` against the bare
    ``'extensive'``/``'severe'`` values could never match.

    Raises
    ------
    ValueError
        If the day's recovery table has no ``'event_day'`` row.
    """
    rates = recovery_rates_dict[day]

    event_day_rate = (
        rates
        .filter(_.damage_level == 'event_day')
        .recovery_rate
        .execute()
    )
    if event_day_rate.empty:
        raise ValueError(f"recovery table for day {day!r} has no 'event_day' row")
    # Positional access: do not depend on the label of the returned index.
    event_day = bool(int(event_day_rate.iloc[0]))

    return (
        table
        .join(
            rates,
            predicates=[_.damage_level == rates.damage_level],
            how='left',
        )
        .mutate(
            acc_capacity=ib.cases(
                (_.damage_level_max.isin(['extensive', 'severe']) & event_day, 0),
                (_.damage_level_max == 'no', _.current_capacity),
                else_=_.current_capacity * _.recovery_rate,
            )
        )
    )



In [9]:
# Baseline OD table (paths and flows) that every scenario is evaluated against.
od_source = base_path / "base_scenario" / "revision" / "odpfc_0.5_grouped.pq"
od = conn.read_parquet(od_source)

In [10]:
recovery_rates = pd.read_csv(base_path.parent / 'processed_data' / 'tables' / 'recovery design_updated.csv')

# One DuckDB table per CSV row, keyed by the CSV's row index (used as `day` by
# apply_recovery). Each table holds one (damage_level, recovery_rate) row per CSV
# column. Columns are named explicitly rather than relying on ibis to auto-name
# non-string column labels as col0/col1 and renaming afterwards.
recovery_rates_dict = {}
for key, row in recovery_rates.to_dict(orient='index').items():
    recovery_rates_dict[key] = conn.create_table(
        f'bridge_recovery_dict_{key}',
        obj=pd.DataFrame(
            {
                'damage_level': list(row.keys()),
                'recovery_rate': list(row.values()),
            }
        ),
        overwrite=True,
    )



In [11]:
import itertools
import time

In [12]:
# Scenario grid: flood depth x event id x day after the event.
DEPTHS = [60]
EVENTS = [7, 8, 9, 10, 12, 13, 14, 15, 16, 18, 19]
DAYS = range(0, 6)

# Materialise as a list: an itertools.product iterator is exhausted after one pass,
# so re-running the scenario loop without re-running this cell would do nothing.
scenarios = list(itertools.product(DEPTHS, EVENTS, DAYS))

In [None]:
# For every (depth, event, day) scenario:
#   1. apply recovery rates to the road-link capacities;
#   2. find OD pairs whose path crosses a disrupted link;
#   3. remove their excess flow, recompute link flows and speeds, and export both tables.
#
# Create the export folder up front so the writes do not depend on it already existing.
results_dir = base_path / 'disruption_analysis' / 'revision' / 'ibis_results'
results_dir.mkdir(parents=True, exist_ok=True)

for depth, event, day in scenarios:
    start = time.perf_counter()
    road_links = conn.read_parquet(base_path / 'disruption_analysis' / 'revision' / f'{depth}' / 'links' / f'road_links_{event}.gpq')

    # Hourly breakpoint flow per road class. Labels not listed have no else_ value
    # (NULL), so update_edge_speed leaves them at their initial speed.
    road_links = (
        road_links
        .mutate(
            breakpoint_flows = _.combined_label.cases(
                ('M', 1200),
                ('A_dual', 1080),
                ('A_single', 1200),
                ('B', 1200)
            )
        )
    )

    # Links with max_speed < current_speed or extensive/severe damage.
    disrupted_links = (
        road_links
        .filter(
            ib.or_(
                _.max_speed < _.current_speed,
                _.damage_level_max.isin(['extensive', 'severe'])
            )
        )
        .select('e_id')
        .distinct()
        .execute()['e_id']
        .tolist()
    )

    ## Applying recovery (update road link capacities)
    cols = road_links.columns
    road_links = (
        road_links
        .mutate(acc_capacity = _.current_capacity)
        .mutate(damage_level = _.road_label + '_' + _.damage_level_max)
        .pipe(apply_recovery, recovery_rates_dict = recovery_rates_dict, day = day)
        .select(*cols, 'acc_capacity')
    )

    recovery_links = road_links.select('e_id', 'acc_capacity')

    # OD pairs whose path crosses a disrupted link, with those links and the smallest
    # post-recovery capacity among them, joined back onto the full OD rows.
    od_disrupted = (
        od
        .select('path','origin_node','destination_node')
        .mutate(
            od_id = _.origin_node.concat(":",_.destination_node)
        )
        .unnest('path')
        .group_by('path')
        .agg(
            od_id = _.od_id.collect()
        )
        .join(
            recovery_links,
            predicates = [_.path == recovery_links.e_id],
            how='left',
        )
        .filter(_.path.isin(disrupted_links))
        .unnest('od_id')
        .group_by('od_id')
        .agg(
            disrupted_links=_.path.collect(),
            min_link_capacity=_.acc_capacity.min()
        )
        .mutate(
            origin_node = _.od_id.re_split(":")[0],
            destination_node = _.od_id.re_split(":")[1],
        )
        .select('origin_node','destination_node','disrupted_links', 'min_link_capacity')
        .join(
            od,
            predicates=['origin_node','destination_node'],
            how='inner',
        )
    ).cache()

    try:
        ## Initialise key variables for flow simulation
        links = (
            od_disrupted
            .mutate(disrupted_flows = ib.greatest(_.flow - _.min_link_capacity, 0))
            .unnest('path')
            .group_by('path')
            .agg(
                disrupted_flows=_.disrupted_flows.sum()
            )
            .join(
                road_links,
                predicates = [_.path == road_links.e_id],
                how = 'right'
            )
            .mutate(
                disrupted_flows = ib.cases(
                    (_.disrupted_flows.isnull(), 0), # if null, link is not disrupted!
                    else_ = _.disrupted_flows
                )
            )
            .mutate(
                acc_capacity = _.acc_capacity + _.disrupted_flows,
                acc_flow = _.current_flow - _.disrupted_flows
            )
            .pipe(
                update_edge_speed,
                combined_label = _.combined_label,
                total_flow = _.acc_flow,
                initial_flow_speed = _.initial_flow_speeds,
                min_flow_speed = _.min_flow_speeds,
                breakpoint_flow = _.breakpoint_flows
            )
            .select(*road_links.columns, "acc_flow", "acc_speed")
        )

        ## Export to parquet files
        od_disrupted.to_parquet(results_dir / f'disrupted_od_depth{depth}_event{event}_day{day}.pq', compression = "ZSTD")
        links.to_parquet(results_dir / f'road_links_depth{depth}_event{event}_day{day}.gpq', compression = "ZSTD")
    finally:
        # Free this scenario's cached temp table now, rather than when the next
        # iteration rebinds `od_disrupted`, so two caches never coexist. Note that
        # `od_disrupted`/`links` left over after the loop refer to a released cache.
        od_disrupted.release()

    print(f"time(sec): {(time.perf_counter() - start)}")

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

time(sec): 264.01131677627563


FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

time(sec): 288.6630802154541


FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

time(sec): 316.6005425453186


FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

time(sec): 304.3411138057709


FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

time(sec): 328.2886230945587
