-
Notifications
You must be signed in to change notification settings - Fork 80
/
so6_compute_occupancy.py
134 lines (110 loc) · 3.78 KB
/
so6_compute_occupancy.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
import argparse
from concurrent.futures import ProcessPoolExecutor, as_completed
from datetime import timedelta
from pathlib import Path
from typing import Dict, List, Optional
import pandas as pd
from shapely.ops import cascaded_union
from tqdm import tqdm
from traffic.core.time import to_datetime
from traffic.data import SO6, nm_airspaces
def occupancy(data, configuration):
    """Return the size of the intersection of ``data`` with ``configuration``.

    ``data`` must expose an ``intersects`` method; the length of whatever
    that method returns for ``configuration`` is the result (presumably the
    number of flights crossing the airspace -- confirm against the SO6 API).
    """
    crossing = data.intersects(configuration)
    return len(crossing)
def compute_stats(
    input_file: Path,
    output_file: Optional[Path],
    sector_list: List[str],
    max_workers: int,
    interval: int,
    starting_from: Optional[str],
    ending_at: Optional[str],
) -> pd.DataFrame:
    """Compute sector occupancy over successive time windows of a SO6 file.

    The SO6 data is clipped to the requested time range and to the union of
    the requested airspaces, then cut into windows of ``interval`` minutes.
    For every window, ``occupancy`` is evaluated for each airspace in a pool
    of worker processes.

    Args:
        input_file: SO6 file to load.
        output_file: Where to store the result. The suffixes ``.pkl``,
            ``.csv`` and ``.xlsx`` are supported; with ``None`` the table is
            printed instead.
        sector_list: Airspace names looked up in ``nm_airspaces``. Names
            resolving to ``None`` are ignored.
        max_workers: Number of worker processes in the pool.
        interval: Width of a time window, in minutes. Must be positive.
        starting_from: Optional lower time bound; the later of this value
            and the earliest ``time1`` in the data is used.
        ending_at: Optional upper time bound; the earlier of this value and
            the latest ``time2`` in the data is used.

    Returns:
        A DataFrame indexed by window start time with one column per
        airspace name. A cell is left missing when the corresponding task
        raised an exception (the exception is printed).

    Raises:
        ValueError: If ``interval`` is not positive, or if the resulting end
            time is before the start time.
        RuntimeError: If the SO6 file could not be loaded.
    """
    # Fail fast: a zero interval would divide by zero below and a negative
    # one would silently produce an empty result.
    if interval <= 0:
        raise ValueError(
            f"interval must be a positive number of minutes, got {interval}"
        )

    so6 = SO6.from_file(input_file.as_posix())
    if so6 is None:
        raise RuntimeError(f"Unable to load SO6 data from {input_file}")

    total: List[Dict[str, int]] = []

    if starting_from is None:
        start_time = so6.data.time1.min()
    else:
        start_time = max(to_datetime(starting_from), so6.data.time1.min())
    if ending_at is None:
        end_time = so6.data.time2.max()
    else:
        end_time = min(to_datetime(ending_at), so6.data.time2.max())

    if end_time < start_time:
        msg = f"End time {end_time} is anterior to start time {start_time}"
        raise ValueError(msg)

    # First clip: restrict the data to the time range of interest.
    so6 = so6.between(start_time, end_time)

    delta = timedelta(minutes=interval)
    size_range = int((end_time - start_time) / delta) + 1
    time_list = [start_time + i * delta for i in range(size_range)]

    # Resolve the airspaces once and drop the unknown ones here, so the
    # filter is not repeated for every time window.
    sectors = [
        sector
        for sector in (nm_airspaces[airspace] for airspace in sector_list)
        if sector is not None
    ]

    # Second clip: restrict the data to the union of all requested airspaces.
    # ``unary_union`` supersedes the deprecated ``cascaded_union``; it is
    # imported locally so this function does not rely on a new module-level
    # import.
    from shapely.ops import unary_union

    so6 = so6.inside_bbox(unary_union([s.flatten() for s in sectors]))

    # The pool does not depend on the time window: create it once instead of
    # spawning and joining worker processes at every iteration.
    with ProcessPoolExecutor(max_workers=max_workers) as executor:
        for start_ in tqdm(time_list):
            # subset serializes well as it is much smaller than so6
            # => no multiprocess on so6!!
            subset = so6.between(start_, delta)
            tasks = {
                executor.submit(occupancy, subset, sector): sector.name
                for sector in sectors
            }
            counts: Dict[str, int] = {}
            for future in as_completed(tasks):
                conf = tasks[future]
                try:
                    counts[conf] = future.result()
                except Exception as e:
                    # Best effort: report the failure and keep going, the
                    # cell simply stays missing in the final table.
                    print(f"Exception {e} raised on {conf}")
            total.append(counts)

    stats = pd.DataFrame.from_dict(total)
    stats.index = time_list

    if output_file is None:
        print(stats)
    else:
        writers = {
            ".pkl": stats.to_pickle,
            ".csv": stats.to_csv,
            ".xlsx": stats.to_excel,
        }
        writer = writers.get(output_file.suffix)
        if writer is None:
            # Do not silently drop the results of a long computation.
            print(
                f"Unsupported output suffix {output_file.suffix!r} "
                f"(expected one of {', '.join(writers)}): "
                f"results not written to {output_file}"
            )
        else:
            writer(output_file.as_posix())

    return stats
if __name__ == "__main__":
    # Command-line entry point. Every ``dest`` below matches a parameter name
    # of ``compute_stats`` so the parsed namespace can be forwarded as keyword
    # arguments.
    parser = argparse.ArgumentParser(
        description="Statistics of occupancy on a SO6 file"
    )

    parser.add_argument(
        "-i",
        dest="interval",
        default=10,
        type=int,
        help="number of minutes for a time window",
    )

    parser.add_argument(
        "-o", dest="output_file", type=Path, help="output file for results"
    )

    parser.add_argument(
        "-t",
        dest="max_workers",
        default=4,
        type=int,
        help="number of parallel processes",
    )

    # The time bounds are documented in ISO 8601 form (dashes in the date).
    parser.add_argument(
        "-f", dest="starting_from", help="start time (yyyy-mm-ddThh:mm:ssZ)"
    )

    parser.add_argument(
        "-u", dest="ending_at", help="end time (yyyy-mm-ddThh:mm:ssZ)"
    )

    parser.add_argument("input_file", type=Path, help="SO6 file to parse")

    parser.add_argument(
        "sector_list",
        nargs="+",
        help="list of airspaces to pick in AIRAC files",
    )

    args = parser.parse_args()
    res = compute_stats(**vars(args))