forked from MulanRevive/mulan-rework
-
Notifications
You must be signed in to change notification settings - Fork 0
/
环境.py
284 lines (243 loc) · 8.77 KB
/
环境.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
import imp
import math
import os
import sys
import time
import threading
from datetime import datetime
from pathlib import Path
from 木兰.分析器.语法分析器 import 语法分析器
# 参考:https://docs.python.org/3.7/library/threading.html#thread-objects
class 线程(threading.Thread):
"""
跟踪线程包装器。
"""
def __init__(自身, *args, **kw):
threading.Thread.__init__(自身, *args, **kw)
自身.已亡 = False
def start(自身):
自身._线程__运行_backup = 自身.run
自身.run = 自身._线程__运行
threading.Thread.start(自身)
def __运行(自身):
sys.settrace(自身.全局跟踪)
自身._线程__运行_backup()
自身.run = 自身._线程__运行_backup
def 全局跟踪(自身, 栈, 事件, 参数):
if 事件 == 'call':
return 自身.本地跟踪
return
def 本地跟踪(自身, 栈, 事件, 参数):
if 自身.已亡:
if 事件 == 'line':
raise SystemExit
return 自身.本地跟踪
def 杀死(自身):
自身.已亡 = True
def 分析并编译(源码文件名):
with open(源码文件名, 'r', encoding='utf-8') as 文件:
源码 = 文件.read()
分析器 = 语法分析器()
节点 = 分析器.分析(源码=源码, 源码文件=源码文件名)
return compile(节点, 源码文件名, 'exec')
def 加载木兰模块(名称, 全局, 源自=(), 目录相对层次=0):
木兰源码路径 = str(Path(*(名称.split(".")))) + '.ul'
可执行码 = 分析并编译(木兰源码路径)
if 可执行码 is None:
raise ModuleNotFoundError(名称)
# TODO: 研究何用
所有模块 = []
后段 = 名称
# 即便尚没有模块,第一个模块名也不应为空
模块名 = lambda 模块名称: 所有模块[-1].__name__ + '.' + 模块名称 if 所有模块 else 模块名称
点位 = 0
while 点位 != -1:
点位 = 后段.find('.')
前段, 后段 = 后段[:点位], 后段[点位 + 1:]
if 点位 == -1:
前段 = 后段
模块 = imp.new_module(模块名(后段))
模块.__dict__.update(创建全局变量(argv=(全局['ARGV'])))
模块.__dict__['__file__'] = os.path.abspath(木兰源码路径)
exec(可执行码, 模块.__dict__)
else:
模块 = imp.new_module(模块名(前段))
if 所有模块:
所有模块[-1].__dict__[前段] = 模块
所有模块.append(模块)
顶层 = 所有模块[0]
if len(所有模块) > 1:
if 源自 is not None:
for sym in 源自:
if sym == '*':
for k in 模块.__dict__:
if k not in 全局:
顶层.__dict__[k] = 模块.__dict__[k]
break
else:
顶层.__dict__[sym] = 模块.__dict__[sym]
return 顶层
def 内置扩展(内置项):
from inspect import isclass
for k, v in __builtins__.items():
if isclass(v) and issubclass(v, BaseException):
内置项[k] = v
return 内置项
def __内置_除(a, b):
if isinstance(a, int):
if isinstance(b, int):
return math.floor(a / b)
return a / b
def __内置_求余(a, b):
# TODO: 不解为何要在 a b 为整数时特殊处理
return a % b
def 创建全局变量(argv=[], 文件名=''):
def 转字符串(x):
def 容器转为字符串(容器, 始='', 末='', 格式=None):
字符串 = 始
for 序号, 该项 in enumerate(容器):
if 序号:
字符串 += ', '
if 格式 is None:
字符串 += 转字符串(该项)
else:
字符串 += 格式(容器, 该项)
字符串 += 末
return 字符串
if x is None:
return 'nil'
if isinstance(x, bool):
return 'true' if x else 'false'
if isinstance(x, list):
return 容器转为字符串(x, '[', ']')
if isinstance(x, tuple):
return 容器转为字符串(x)
if isinstance(x, dict):
return 容器转为字符串(x, '{', '}', lambda 容器, 键: '%s: %s' % (键, 容器[键]))
if isinstance(x, set):
return 容器转为字符串(x, '{', '}')
return str(x)
def 自定义输出(*各物件, 分隔符=' ', 终止符='', 文件=sys.stdout, flush=False):
"""打印内容到流, 默认到标准输出."""
for 物件 in 各物件:
文件.write(转字符串(物件))
if 物件 != 各物件[(-1)]:
文件.write(分隔符)
文件.write(终止符)
if flush:
文件.flush()
def 自定义导入(名称, 全局=None, 本地=None, 源自=(), 目录相对层次=0):
"""导入一个木兰模块, 如果没找到, 则导入 Python 模块
"""
try:
return 加载木兰模块(名称, 全局, 源自, 目录相对层次)
except FileNotFoundError:
return __import__(名称, 全局, 本地, 源自, 目录相对层次)
except Exception as 例外:
raise 例外
def 本地断言(表达式, 反馈=None):
assert 表达式, 反馈
def 内置自身():
""" 当前任务 ID """
return threading.currentThread()
def 生成新任务(任务名, *参数列表):
# 待研究:各参数作用
私有线程 = 线程(target=任务名, args=参数列表, daemon=True)
私有线程.start()
return 私有线程
def 杀死任务(任务线程):
if isinstance(任务线程, 线程):
if 任务线程 == threading.currentThread():
sys.exit()
elif 任务线程.is_alive():
任务线程.杀死()
def 用pip安装(*包, 命令='install'):
import pip._internal
return pip._internal.main([命令, *包])
def eval_print(expr):
if expr is None:
return
try:
expr()
except Exception:
自定义输出(expr, 终止符='\n')
当前目录 = os.getcwd()
if 当前目录 not in sys.path:
sys.path.append(当前目录)
return {
'print': 自定义输出,
'println': lambda *各物件: 自定义输出(*各物件, **{'终止符': '\n'}),
'assert': 本地断言,
'len': len,
'enumerate': enumerate,
'all': all,
'any': any,
'range': range,
'round': round,
'input': input,
'reverse': reversed,
'super': super,
'locals': lambda: locals(),
'bool': bool,
'float': float,
'int': int,
'str': str,
'list': list,
'dict': dict,
'set': set,
'tuple': lambda *各实参: 各实参,
'char': chr,
'ord': ord,
'bytes': lambda 文本, 编码='ascii': bytes(文本, encoding=编码),
'typeof': lambda 对象: 对象.__class__.__name__,
'isa': lambda x, 类型: isinstance(x, 类型),
'max': max,
'min': min,
'map': map,
'filter': filter,
'zip': zip,
'staticmethod': staticmethod,
'property': property,
'ceil': math.ceil,
'floor': math.floor,
'fabs': math.fabs,
'sqrt': math.sqrt,
'log': math.log,
'log10': math.log10,
'exp': math.exp,
'pow': math.pow,
'sin': math.sin,
'cos': math.cos,
'tan': math.tan,
'asin': math.asin,
'acos': math.acos,
'atan': math.atan,
'spawn': 生成新任务,
'kill': 杀死任务,
'self': 内置自身,
'再会': sys.exit,
'quit': sys.exit,
'open': open,
'install': 用pip安装,
'time': time.time,
'year': lambda: datetime.now().year,
'month': lambda: datetime.now().month,
'day': lambda: datetime.now().day,
'hour': lambda: datetime.now().hour,
'minute': lambda: datetime.now().minute,
'second': lambda: datetime.now().second,
'microsecond': lambda: datetime.now().microsecond,
'sleep': time.sleep,
'delay': lambda 毫秒数: time.sleep(毫秒数 / 1000),
'delayMicroseconds': lambda 微秒数: time.sleep(微秒数 / 1000000),
'PI': math.pi,
'ARGV': argv,
'__builtins__': 内置扩展({
'__import__': 自定义导入,
'__build_class__': __build_class__,
'__name__': '__main__',
'__file__': 文件名,
'__print__': eval_print,
'__div__': __内置_除,
'__rem__': __内置_求余})
}