This repository has been archived by the owner on Apr 27, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 10
/
utils.py
139 lines (113 loc) · 4.74 KB
/
utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
# Copyright 2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import functools
import time
from typing import List, Optional
import torch
from torch.fx.experimental.proxy_tensor import make_fx
from functorch._src.compile_utils import strip_overloads
import torch_mlir
import iree_torch
DEVICE_TO_IREE_BACKEND = { "cpu" : "llvm-cpu",
"cuda" : "cuda" }
def timeit(*, append_time_to: Optional[List] = None):
def decorator(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
start_time = time.time_ns()
result = func(*args, **kwargs)
end_time = time.time_ns()
if append_time_to is not None:
append_time_to.append(end_time - start_time)
return result
return wrapper
return decorator
def _returns_nothing(fx_g: torch.fx.GraphModule) -> bool:
for node in fx_g.graph.nodes:
if node.op == "output":
assert len(node.args) == 1, "Output node must have a single argument"
node_arg = node.args[0]
if isinstance(node_arg, tuple):
return len(node_arg) == 0
return False
def _unwrap_single_tuple_return(fx_g: torch.fx.GraphModule) -> bool:
"""
Replace tuple with tuple element in functions that return one-element tuples.
Returns true if an unwrapping took place, and false otherwise.
"""
unwrapped_tuple = False
for node in fx_g.graph.nodes:
if node.op == "output":
assert len(node.args) == 1, "Output node must have a single argument"
node_arg = node.args[0]
if isinstance(node_arg, tuple):
if len(node_arg) == 1:
node.args = (node_arg[0],)
unwrapped_tuple = True
break
if unwrapped_tuple:
fx_g.graph.lint()
fx_g.recompile()
return unwrapped_tuple
def make_torch_mlir_compiler(use_tracing: bool, device: str, verbose=False):
def compiler(fx_graph: torch.fx.GraphModule,
example_inputs: List[torch.Tensor]):
"""Compile GraphModule using torch-mlir + IREE."""
if verbose:
print("Compiling graph...")
if _returns_nothing(fx_graph):
return fx_graph
was_unwrapped = _unwrap_single_tuple_return(fx_graph)
fx_graph = make_fx(fx_graph)(*example_inputs)
strip_overloads(fx_graph)
if verbose:
print("torch.fx graph:")
print(fx_graph.graph)
ts_compiler = torch.jit.trace if use_tracing else torch.jit.script
ts_graph = ts_compiler(fx_graph, example_inputs)
if verbose:
torch_mlir_module = torch_mlir.compile(
ts_graph, example_inputs,
output_type=torch_mlir.OutputType.TORCH)
print("\n\ntorch-mlir backend contract graph:")
print(torch_mlir_module)
linalg_module = torch_mlir.compile(
ts_graph, example_inputs,
output_type=torch_mlir.OutputType.LINALG_ON_TENSORS)
backend = DEVICE_TO_IREE_BACKEND[device]
arch = "sm_80" if device == "cuda" else None
compiled_module = iree_torch.compile_to_vmfb(linalg_module, backend, arch)
loaded_module = iree_torch.load_vmfb(compiled_module, backend)
def forward(*inputs):
result = loaded_module.forward(*inputs)
result = tuple() if result is None else result
return (result,) if was_unwrapped else result
return forward
return compiler
def check_results(compiled_results, eager_results):
for compiled_result, eager_result in zip(compiled_results, eager_results):
if not torch.allclose(compiled_result.to("cpu"),
eager_result.to("cpu"), atol=1e-5):
print("Compiled result does not match eager result")
return
print("Compiled result matches eager result!")
def print_time_stats(times):
times_tensor = torch.tensor(times)
def quantile_ms(q):
return torch.quantile(times_tensor.to(float), q).item() / 1e6
print(f"Median: {quantile_ms(0.5)} ms")
print(f"10%ile: {quantile_ms(0.1)} ms")
print(f"90%ile: {quantile_ms(0.9)} ms")
print(f"Total: {torch.sum(times_tensor) / 1e6} ms")
print()