# preprocess.py
import argparse
import ast
import asttokens
import os
import re
import shutil
import tokenize
from ast_conversion import get_dfs, convert
from io import BytesIO
from processor_ast import Preprocess
from subprocess import DEVNULL, STDOUT, run
from typing import Tuple, List
import json
import parse_python3
# Build a set of punctuation tokens to exclude
import string
EXCLUDE_TOKENS = set(string.punctuation)
SPACE_STOPWORDS = [' ', '\t', '\r', '\n', '\v', '\f']
TOKENS_STOPWORDS = SPACE_STOPWORDS + ["utf-8"]
DOCSTRING_PREFIX = "###DCSTR### "
class CommentProcessor:
"""
Stores every comment from a given code file
self.comments: lineno -> comment string
self.code_without: code without comments
"""
def __init__(self):
self.comments = {}
self.code_without = ""
def parse_comments(self, code: str):
"""
        Parses comments and saves them into the self.comments dict.
        ---
        code: str, a string of Python source code
"""
for lineno, line in enumerate(code.split("\n")):
code_line = line
if '#' in line:
comment = line[line.find('#') + 1:]
line = line[:line.find('#')]
quotes1 = line.count('"')
quotes2 = line.count("'")
# comment = tiny_filter(comment)
                if len(comment) and quotes1 % 2 == 0 and quotes2 % 2 == 0:
                    self.comments[lineno] = comment
                    # `line` is already truncated at the '#', so keep it as is
                    code_line = line
if len(code_line) > 0:
self.code_without += f"{code_line}\n"
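# Illustrative sketch of parse_comments (the input lines below are made up):
# for
#     x = 1  # set x
#     s = "text # not a comment"
# it should store {0: " set x"} in self.comments (line numbers are
# zero-based) and keep both code lines in self.code_without, with the
# trailing comment stripped from the first one. The second '#' is ignored
# because the quote count before it is odd, i.e. it sits inside a string.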
# def tiny_filter(code: str) -> str:
# """
# Filter string with regular expressions.
# """
# code = code.strip()
# code = re.sub('\t+', ' ', code)
# code = re.sub(' +', ' ', code)
# return code
def read_file_to_string(filename: str) -> str:
    """
    Reads the file contents into a string.
    """
    with open(filename, 'rt') as f:
        return f.read()
def get_tokens(code: str) -> Tuple[str, List[str], List[str], str, int, bool]:
    """
    code: str,
        A piece of Python source code as a string.
    ---
    Returns:
        code: str or None
            The code with the docstring and comments removed
            (None if tokenization failed).
        tokens: List[str]
            List of tokens in the current function.
        comments: List[str]
            List of retrieved comments.
        docstring: str or None
            Found docstring or None.
        stopwords_count: int
            The number of stopword tokens skipped in the current piece of code.
        is_tokenizable: bool
            False if the code could not be tokenized.
    """
global TOKENS_STOPWORDS
tokens = []
# comment_ind = 0
double_format = True
ds_begin = code.find('"""')
if ds_begin == -1:
double_format = False
ds_begin = code.find("'''")
ds_end = code.find('"""', ds_begin + 3)
if not double_format:
ds_end = code.find("'''", ds_begin + 3)
    docstring = None
    if ds_begin != -1 and ds_end != -1:
        docstring = code[ds_begin + 3:ds_end].strip()
        # Erase the docstring from the code
        code = code[:ds_begin] + code[ds_end + 3:]
    # Handle comments with a CommentProcessor instance
    processor = CommentProcessor()
    processor.parse_comments(code)
    code = processor.code_without
    # Collect the returned comments from the processor state
    comments = list(processor.comments.values())
stopwords_count = 0
is_tokenizable = True
try:
for idx, token in enumerate(
tokenize.tokenize(BytesIO(code.encode('utf-8')).readline)):
# Form indices and tokens
if token.string not in TOKENS_STOPWORDS:
# print(f"idx: {idx}, token: {token.string}")
tokens.append(token.string)
else:
stopwords_count += 1
except:
is_tokenizable = False
return None, None, comments, docstring, stopwords_count, is_tokenizable
return code, tokens, comments, docstring, stopwords_count, is_tokenizable
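# Illustrative sketch of get_tokens (the snippet below is made up): for
#     def add(a, b):
#         """Add two numbers."""
#         return a + b  # simple sum
# it is expected to return the code with the docstring and comment removed,
# a token list starting roughly with
#     ['def', 'add', '(', 'a', ',', 'b', ')', ':', ...]
# (indentation and end-of-stream tokens may also appear), the comments
# [' simple sum'], the docstring 'Add two numbers.', the count of skipped
# whitespace tokens, and is_tokenizable == True.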
def get_previous_comments(fun: ast.FunctionDef, code_lines: List[str]) -> str:
"""
Returns a comment on the line above the function definition.
---
fun: ast.FunctionDef,
Function object.
code_lines: str,
Special index to be put in node.id ?
"""
fun_line_first = fun.first_token.start[0] - 1
comment_line = code_lines[fun_line_first - 1].strip()
zero_line = code_lines[fun_line_first - 2].strip()
precomment = ""
if (len(comment_line) > 0) and (comment_line[0] == "#"):
if (fun_line_first >= 2 and len(comment_line) >= 1
and zero_line == "") or (fun_line_first == 1
and len(comment_line) >= 1):
precomment = code_lines[fun_line_first - 1].strip()
return precomment
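# Illustrative sketch of get_previous_comments (the snippet below is made up):
# given
#
#     # Computes the mean value.
#     def mean(values):
#         ...
#
# it returns "# Computes the mean value." because the comment sits directly
# above the `def` line and is preceded by a blank line (or starts the file);
# otherwise an empty string is returned.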
error_counter = 0
def collect_data(filename: str,
                 args: argparse.Namespace) -> Tuple[List[List[str]], bool]:
    """
    Reads two unaligned corpora from a file: functions and docstrings.
    ---
    Returns:
        data: List[List[str]]
            Summarized data from the functions.
        is_appropriate: bool
            A flag indicating that the file is appropriate
            (large enough scopes and no parsing errors).
    """
global error_counter
# Convert Python 2 to Python 3
# os.system(f"~/anaconda3/envs/scs/bin/2to3 {filename} -w -n")
run(["/home/masaidov/.conda/envs/scs/bin/2to3", filename, "-w", "-n"],
stdout=DEVNULL, stderr=STDOUT)
print("Building AST tree from a filename:", filename)
try:
code = read_file_to_string(filename)
except:
print("File with bad encoding:", filename)
error_counter += 1
is_appropriate = False
return None, is_appropriate
    # Replace tabs with four spaces
code = re.sub('\t', ' ' * 4, code)
code_lines = code.splitlines()
try:
atok = asttokens.ASTTokens(code, parse=True)
astree = atok.tree
except:
print("Files with an error:", error_counter)
error_counter += 1
is_appropriate = False
return None, is_appropriate
data = []
# Global loop: iterating over functions from file
for fun_ind, fun in enumerate(ast.walk(astree)):
if isinstance(fun, ast.FunctionDef) and len(fun.body) > 0:
fun_begin = fun.first_token.startpos
fun_end = fun.last_token.endpos
prev_comment = get_previous_comments(fun, code_lines)
docstring = ast.get_docstring(fun)
if not docstring:
docstring = ""
else:
docstring = DOCSTRING_PREFIX + docstring + "\n"
# Forming scope -- set of node ids (variables)
scope = [arg.arg for arg in fun.args.args]
for node in ast.walk(fun):
if isinstance(node, ast.Name) and \
isinstance(node.ctx, ast.Store):
scope.append(node.id)
scope = set(scope)
if len(scope) < 2:
# print(f"Note: Function with fun.name = {fun.name} has too "
# "small scope.")
continue
function_code = code[fun_begin:fun_end]
            # If the function is decorated (e.g. @classmethod),
            # start the extracted snippet at the `def` keyword itself
start_def = function_code.find("def")
function_code = function_code[start_def:]
function_code, tokens, comments, docstring, stopwords_count, \
is_tokenizable = get_tokens(function_code)
if not is_tokenizable:
error_counter += 1
function_code = ""
tokens = []
# print(f"In filename = {filename}, fun_ind = {fun_ind}")
# print(f"Found {stopwords_count} stopwords.")
if len(prev_comment) > 0:
comments = [prev_comment] + comments
data.append([filename, function_code, tokens, comments, docstring])
is_appropriate = len(data) > 0
return data, is_appropriate
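# Illustrative note on the return value: each row of `data` has the form
#     [filename, function_code, tokens, comments, docstring]
# where `function_code` and `docstring` are whatever get_tokens() produced
# (the DOCSTRING_PREFIX variant built above is overwritten by that call),
# `comments` also includes the comment found directly above the `def`, and
# functions whose scope (arguments plus assigned names) has fewer than two
# entries are skipped.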
def retrieve_functions_docstrings(
        data: List, args: argparse.Namespace
) -> Tuple[List[str], List[str], List[str], List, List[str]]:
    """
    Cleans and tokenizes the comments, docstrings and function code
    collected by collect_data.
    ---
    Returns:
        comments: List[str],
            Cleaned function comments.
        docstrings: List[str],
            Cleaned docstring corpus.
        functions: List[str],
            Raw function code corpus.
        ord_nodes: List,
            Reserved for (node.id, token ind, remember ind)
            objects for further masking and processing
            (currently returned empty).
        tokens: List[str],
            Space-joined token sequences of the functions.
    """
preprocess_code = Preprocess("code")
preprocess_comment = Preprocess("anno")
preprocess_docstring = Preprocess("docs")
comments = []
docstrings = []
functions = []
ord_nodes = []
tokens = []
for filename, code, fun_tokens, fun_comments, docstring in data:
# Add asserts for debugging
assert type(code) == str, "code variable is not a string"
assert type(fun_tokens) == list, \
"fun_tokens variable is not a list"
assert type(fun_comments) == list, \
"fun_comments variable is not a list"
# Let's preprocess every found comment
for comment in fun_comments:
if len(comment) <= 3:
continue
            comment = preprocess_comment.clean(comment)
            comment = comment.replace("'", " ").replace('"', " ")
            # Keep only the first sentence (text before the first period)
            if "." in comment:
                comment = comment[:comment.find(".")]
            comment = ''.join(ch for ch in comment if ch not in EXCLUDE_TOKENS)
            comment = ' '.join(comment.split()) + " ."
comments.append(comment)
# Let's preprocess function tokens
fun_tokens_string = ' '.join(fun_tokens)
fun_tokens_string = preprocess_code.clean(fun_tokens_string)
functions.append(code)
tokens.append(fun_tokens_string)
if docstring is not None:
docstring = preprocess_docstring.clean(docstring).strip()
if len(docstring) > 0:
docstrings.append(docstring)
return comments, docstrings, functions, ord_nodes, tokens
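# Illustrative sketch of the comment cleaning above (assuming the external
# Preprocess("anno").clean call leaves this text unchanged): a raw comment
#     " Computes the mean value. Uses numpy."
# becomes "Computes the mean value .": only the text before the first period
# is kept, punctuation is stripped, whitespace is collapsed and a trailing
# " ." is appended. Comments of three characters or fewer are dropped
# entirely.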
def convert_tokens_to_ast(functions):
global error_counter
ast_tokens = []
for function in functions:
if len(function) == 0:
            # The function is empty after a TokenError during tokenization
            # and should be skipped
            continue
        # Parsing below may also fail with a syntax or type error
try:
ast_fun_tokens = json.loads(parse_python3.parse_file(function, "code"))
ast_fun_sequential = get_dfs(convert(ast_fun_tokens))
except:
print("Met syntax or type error.")
error_counter += 1
continue
ast_tokens.append(ast_fun_sequential)
return ast_tokens
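# Each element appended to ast_tokens is assumed to be a sequence of AST node
# tokens in depth-first order (whatever ast_conversion.get_dfs yields for the
# parse produced by parse_python3.parse_file); main() later joins each
# sequence with spaces into one output line per function.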
def set_script_arguments(parser):
    # Main arguments
    main_args = parser.add_argument_group("Main")
    main_args.add_argument("--dirname",
                           type=str,
                           default="examples",
                           help="A directory with .py files to be processed.")
    main_args.add_argument("--sequence_file",
                           type=str,
                           default="python150k_sequence.txt",
                           help="Output file for function token sequences.")
    main_args.add_argument("--ast_file",
                           type=str,
                           default="python150k_ast.txt",
                           help="Output file for AST token sequences.")
    main_args.add_argument("--comments_file",
                           type=str,
                           default="python150k_comments.txt",
                           help="Output file for extracted comments.")
    main_args.add_argument("--docstrings_file",
                           type=str,
                           default="python150k_docstrings.txt",
                           help="Output file for extracted docstrings.")
    main_args.add_argument("--output_dir",
                           type=str,
                           default="parsed",
                           help="Directory where the parsed data is written.")
    return
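# Example invocation (paths are illustrative):
#     python preprocess.py --dirname examples --output_dir parsed
# This walks `examples/` recursively, preprocesses every *.py file and writes
# the token-sequence, AST, comment and docstring corpora into `parsed/`.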
def main(args):
global error_counter
    # Clear the conversion directory
if os.path.exists("converted"):
shutil.rmtree("converted")
os.mkdir("converted")
# Clear the output directory
directory = args.output_dir
if os.path.exists(directory):
shutil.rmtree(directory)
os.mkdir(directory)
print("Created directory:", directory)
sequence_file = open(os.path.join(directory, args.sequence_file), "a")
comments_file = open(os.path.join(directory, args.comments_file), "a")
ast_file = open(os.path.join(directory, args.ast_file), "a")
dcs_file = open(os.path.join(directory, args.docstrings_file), "a")
print("Opened output files...")
dcs_cnt, comments_cnt, seq_cnt, ast_cnt, file_cnt = 0, 0, 0, 0, 0
for root, _, fnames in sorted(os.walk(args.dirname)):
# print("ROOT:", root)
for fname in fnames:
if fname.endswith(".py"):
print("~" * 50)
print(f"Handling {fname}")
filename = os.path.join(root, fname)
data, is_appropriate = collect_data(filename, args)
if not is_appropriate:
continue
comments, docstrings, functions, ord_nodes, tokens = \
retrieve_functions_docstrings(data, args)
                # Post-process the function code into AST token sequences
ast_tokens = convert_tokens_to_ast(functions)
                # Write out comments
for comment in comments:
comments_file.write(f"{comment}\n")
comments_cnt += 1
# Write out docstrings
for docstring in docstrings:
if docstring is not None:
dcs_file.write(f"{docstring}\n")
dcs_cnt += 1
                # Write out token sequences
for token in tokens:
sequence_file.write(f"{token}\n")
seq_cnt += 1
# Write out AST
for function_ast_tokens in ast_tokens:
ast_string = ' '.join(function_ast_tokens)
ast_file.write(f"{ast_string}\n")
ast_cnt += 1
file_cnt += 1
print("Updated docstrings count:", dcs_cnt)
print("Updated comment count:", comments_cnt)
print("Updated sequential count:", seq_cnt)
print("Updated AST count:", ast_cnt)
print(
"Processed/Canceled/Total files:",
f"{file_cnt}/{error_counter}/{file_cnt + error_counter}")
print("~" * 50)
    sequence_file.close()
    comments_file.close()
    ast_file.close()
    dcs_file.close()
return
if __name__ == '__main__':
parser = argparse.ArgumentParser(
'Python150k preprocess script',
formatter_class=argparse.ArgumentDefaultsHelpFormatter)
set_script_arguments(parser)
args, unknown = parser.parse_known_args()
print(args)
main(args)