forked from actgov/treatment-validation
-
Notifications
You must be signed in to change notification settings - Fork 0
/
DynaCollector_v2.py
148 lines (128 loc) · 5.83 KB
/
DynaCollector_v2.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
import os
import re
import shutil
import csv
class StartHere:
    '''Entry point: run the whole collation for one directory.

    DynaCollator
    PURPOSE: scan a given directory (not recursively) for DynaLog files and
    collate them into per-patient folders, each with a sub-folder per log
    date.
    This could be run once a day as a cron job (scheduled job).
    Intended to facilitate the use of the FluDo program.
    AUTHOR: B Cooper
    DATE: 2016-05-13
    Version: 0.5
    STATUS: development - functional
    '''

    def __init__(self, choice=None):
        '''Digest the directory ``choice`` and copy its DynaLog files into
        patient folders created inside that same directory.

        All of the work happens as a side effect of constructing the two
        helper objects; nothing is kept on the instance.
        '''
        # First pass: read every DynaLog file name/header found in `choice`.
        digest = DigestPath(choice)
        # Second pass: build the folder tree and copy the files into it.
        CreateFolderStructure(digest.pat_log_records, choice)
class DigestPath:
    '''Scan one directory (not recursively) for DynaLog files and group the
    meta data of each file by patient.

    After construction ``pat_log_records`` maps each Aria patient number
    (the string taken from the file name) to a list of ``pat_record``
    objects, one per DynaLog file found directly inside ``input_dir``.

    Raises AssertionError if ``input_dir`` is missing or not a directory.
    '''

    def __init__(self, input_dir=None):
        # Raised explicitly (rather than via an ``assert`` statement) so the
        # check still runs under ``python -O``; the exception type and the
        # message are the same as before.
        if input_dir is None or not os.path.isdir(input_dir):
            raise AssertionError(
                "check your directory %s exists ... I can't find it" % input_dir)
        self.entries = os.listdir(input_dir)
        # pat_log_records dictionary has Aria patient number as
        # 'key' and a list of objects (pat_record) as the 'value'
        self.pat_log_records = dict()
        self.input_dir = input_dir
        self.do_work(self.input_dir, self.pat_log_records, self.entries)

    class pat_record:
        '''A class to act as a "struct" data container for
        meta data collected from the patient log files
        '''

        def __init__(self, last_name=None, first_name=None, pat_num=None,
                     uid=None, seq=None, log_date=None, log_time=None,
                     filename=None):
            self.pat_num = pat_num        # patient number from the file name
            self.log_date = log_date      # YYYYMMDD from the file name
            self.log_time = log_time      # HHMMSS from the file name
            self.filename = filename      # full path of the DynaLog file
            self.last_name = last_name    # sanitised, from line 2 of the file
            self.first_name = first_name  # sanitised, from line 2 of the file
            self.pat_num_in = None        # patient number from line 2 of the file
            self.uid = uid                # first field of line 3 of the file
            self.seq = seq                # second field of line 3 of the file

    def do_work(self, input_dir, pats, entries):
        '''Fill ``pats`` from the regular files named in ``entries`` and
        print a summary of what was collected.

        input_dir -- directory the entries live in
        pats      -- dict to populate (patient number -> list of pat_record)
        entries   -- names (not paths) of the directory entries to examine
        '''
        # use regular expression to pull out data from dynalog filename.
        # grp1: A or B carriage in MLC, grp2 is year, grp3 is month etc
        # grps 5, 6, 7 are hours, mins, seconds, grp 8 (after _ ) is Aria patient number
        # Compiled once here instead of once per directory entry.
        ptrn = re.compile(r'(A|B)(\d\d\d\d)(\d\d)(\d\d)(\d\d)(\d\d)(\d\d)_(\d+)\.dlg')
        for entry in entries:
            if os.path.isfile(os.path.join(input_dir, entry)):
                self.get_pat_log_record(input_dir, entry, pats, ptrn)
        for key, value in pats.items():
            print("Patient number: %s" % key)
            for entry in value:
                print('\t\tFilename is %s' % entry.filename)
                print('\t\t\t Logtime is %s' % entry.log_time)

    def get_pat_log_record(self, root_path, fname, pats, ptrn):
        '''Build a pat_record for one file and append it to ``pats``.

        root_path -- directory containing the file
        fname     -- file name (no directory part)
        pats      -- dict to update (patient number -> list of pat_record)
        ptrn      -- compiled pattern describing a DynaLog file name

        Files whose name is not a complete match for ``ptrn``, or whose
        header lacks the expected fields, are ignored and ``pats`` is left
        untouched.
        '''
        # fullmatch, not match: only continue if we get a complete match with
        # the dynalog filename, so e.g. "....dlg.bak" is not collected.
        m = ptrn.fullmatch(fname)
        if m is None:
            return
        my_file = os.path.join(root_path, fname)
        # need to peek into file to get other fields of pat_record;
        # just read first 3 lines of dynalog file (readline gives '' at EOF,
        # so a short file simply yields empty lines here).
        with open(my_file, "r") as filehandle:
            header = [filehandle.readline() for _ in range(3)]
        name_fields = header[1].strip().split(',')
        id_fields = header[2].strip().split(',')
        if len(name_fields) < 3 or len(id_fields) < 2:
            # A truncated/malformed file must not abort the whole run.
            print('skipping %s: incomplete DynaLog header' % my_file)
            return
        # populate the pat_record struct from matched groups and header
        my_pat_rec = self.pat_record(
            pat_num=m.group(8),
            log_date=m.group(2) + m.group(3) + m.group(4),
            log_time=m.group(5) + m.group(6) + m.group(7),
            filename=my_file,
            # replace non-alphanumeric chars with _
            last_name=re.sub(r'[^A-Za-z0-9_]+', '_', name_fields[0]),
            first_name=re.sub(r'[^A-Za-z0-9_]+', '_', name_fields[1]),
            uid=id_fields[0],
            seq=id_fields[1],
        )
        my_pat_rec.pat_num_in = name_fields[2]
        pats.setdefault(my_pat_rec.pat_num, []).append(my_pat_rec)
class CreateFolderStructure:
    '''Copy collected DynaLog files into a per-patient folder tree.

    pat_records    -- dict mapping patient number to a list of record
                      objects; each record must provide ``filename``,
                      ``last_name``, ``first_name`` and ``log_date``
    container_path -- existing directory under which the folders are made

    The copying is done as a side effect of construction.
    Raises AssertionError if ``container_path`` is not an existing directory.
    '''

    def __init__(self, pat_records, container_path):
        # Raised explicitly (rather than via an ``assert`` statement) so the
        # check still runs under ``python -O``; the exception type and the
        # message are the same as before.
        if container_path is None or not os.path.isdir(container_path):
            raise AssertionError(
                "Container path must exist, can't find %s" % container_path)
        self.pat_records = pat_records
        self.container_path = container_path
        self.do_work()

    def do_work(self):
        '''Copy every record's file to
        <container>/<patnum>_<last>_<first>/<log_date>/, creating the
        folders as needed. Files whose folder cannot be created are skipped.
        '''
        for k, v in self.pat_records.items():
            # sorted by file name so the copies are reported in a stable order
            for pat_rec in sorted(v, key=lambda x: x.filename):
                sub_folder = str(k) + "_" + pat_rec.last_name + "_" + pat_rec.first_name
                new_path = os.path.join(self.container_path, sub_folder, pat_rec.log_date)
                if self.make_folders(new_path) != 0:
                    continue
                # copy2 keeps the file's timestamps/metadata where possible
                result = shutil.copy2(pat_rec.filename, new_path)
                print('file %s copied to %s' % (pat_rec.filename, result))

    def make_folders(self, pth):
        '''try and create folders in pth, including subfolders
        if they do not already exist.
        returns 0 on success or if the folder already exists
        returns -1 if the folder could not be created (for example when
        pth already exists but is a regular file)
        '''
        if os.path.isdir(pth):
            return 0
        try:
            # exist_ok: no error if the folder appears between the check
            # above and this call.
            os.makedirs(pth, exist_ok=True)
        except OSError:
            # Only filesystem errors are treated as "not created"; anything
            # else (e.g. KeyboardInterrupt) is allowed to propagate.
            print("%s not created" % pth)
            return -1
        print("created %s" % pth)
        return 0