/
dirobject.py
137 lines (117 loc) · 4.54 KB
/
dirobject.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
import os
from topaz.error import error_for_oserror
from topaz.module import ClassDef
from topaz.modules.enumerable import Enumerable
from topaz.objects.objectobject import W_Object
from topaz.objects.regexpobject import RegexpCache
from topaz.utils.glob import Glob
from topaz.utils.ll_dir import opendir, readdir, closedir
class W_DirObject(W_Object):
classdef = ClassDef("Dir", W_Object.classdef, filepath=__file__)
classdef.include_module(Enumerable)
def __init__(self, space, klass=None):
W_Object.__init__(self, space, klass)
self.open = False
self.path = None
def __del__(self):
if self.open:
closedir(self.dirp)
def ensure_open(self, space):
if not self.open:
raise space.error(space.w_IOError, "closed directory")
@classdef.method("initialize", path="str")
def method_initialize(self, space, path):
self.path = path
try:
self.dirp = opendir(path)
except OSError as e:
raise error_for_oserror(space, e)
self.open = True
@classdef.singleton_method("allocate")
def method_allocate(self, space, args_w):
return W_DirObject(space)
@classdef.singleton_method("pwd")
@classdef.singleton_method("getwd")
def method_pwd(self, space):
return space.newstr_fromstr(os.getcwd())
@classdef.singleton_method("chdir", path="path")
def method_chdir(self, space, path=None, block=None):
if path is None:
path = os.environ["HOME"]
current_dir = os.getcwd()
os.chdir(path)
if block is not None:
try:
return space.invoke_block(block, [space.newstr_fromstr(path)])
finally:
os.chdir(current_dir)
else:
return space.newint(0)
@classdef.singleton_method("delete", path="path")
@classdef.singleton_method("rmdir", path="path")
@classdef.singleton_method("unlink", path="path")
def method_delete(self, space, path):
try:
os.rmdir(path if path else "")
except OSError as e:
raise error_for_oserror(space, e)
return space.newint(0)
@classdef.singleton_method("[]")
def method_subscript(self, space, args_w):
if len(args_w) == 1:
return space.send(self, space.newsymbol("glob"), args_w)
else:
return space.send(self, space.newsymbol("glob"), [space.newarray(args_w)])
@classdef.singleton_method("glob", flags="int")
def method_glob(self, space, w_pattern, flags=0, block=None):
if space.is_kind_of(w_pattern, space.w_array):
patterns_w = space.listview(w_pattern)
else:
patterns_w = [w_pattern]
glob = Glob(space.fromcache(RegexpCache))
for w_pat in patterns_w:
w_pat2 = space.convert_type(w_pat, space.w_string, "to_path", raise_error=False)
if w_pat2 is space.w_nil:
pattern = space.convert_type(w_pat, space.w_string, "to_str")
pattern = space.str_w(w_pat2)
if len(patterns_w) == 1:
for pat in pattern.split("\0"):
glob.glob(pat, flags)
else:
glob.glob(pattern.split("\0")[0], flags)
if block:
for match in glob.matches():
space.invoke_block(block, [space.newstr_fromstr(match)])
return space.w_nil
else:
return space.newarray([space.newstr_fromstr(s) for s in glob.matches()])
@classdef.method("read")
def method_read(self, space, args_w):
self.ensure_open(space)
try:
filename = readdir(self.dirp)
except OSError as e:
raise error_for_oserror(space, e)
if filename is None:
return space.w_nil
else:
return space.newstr_fromstr(filename)
@classdef.method("close")
def method_close(self, space):
self.ensure_open(space)
closedir(self.dirp)
self.open = False
return space.w_nil
@classdef.singleton_method("mkdir", path="path", mode="int")
def method_mkdir(self, space, path, mode=0777):
try:
os.mkdir(path, mode)
except OSError as e:
raise error_for_oserror(space, e)
return space.newint(0)
@classdef.singleton_method("entries", dirname="path")
def method_entries(self, space, dirname):
try:
return space.newarray([space.newstr_fromstr(d) for d in os.listdir(dirname)])
except OSError as e:
raise error_for_oserror(space, e)