-
Notifications
You must be signed in to change notification settings - Fork 0
/
tsv_file_analyzer.py
143 lines (107 loc) · 4.57 KB
/
tsv_file_analyzer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
import csv
import os
import sys
import click
import pathlib
import logging
import calendar
import time
import pathlib
from colorama import Fore, Style
from datetime import datetime
from datetime import date
# Fallback values main() applies when the matching flag arrives as None.
DEFAULT_COLUMNS_LOWERCASE = False
DEFAULT_COLUMNS_ONLY = False

# Default output directory: /tmp/<name of this file>/<timestamp>.
# The timestamp is taken once, when the module is loaded.
DEFAULT_OUTDIR = "/".join((
    "/tmp",
    os.path.basename(__file__),
    datetime.today().strftime('%Y-%m-%d-%H%M%S'),
))

# Logging configuration handed to logging.basicConfig() in main().
LOG_LEVEL = logging.INFO
LOGGING_FORMAT = "%(levelname)s : %(asctime)s : %(pathname)s : %(lineno)d : %(message)s"

# Maps a 0-based column position to its header name.
# Filled by main() from the first line of the input file and read by
# display_record().
position_to_header_lookup = dict()
def display_columns(parts, lowercase):
    """Print the column names of the header row, one per line.

    Args:
        parts: The header fields in file order, as obtained by splitting
            the first line of the input file on tabs.
        lowercase: When true, each name is printed lower-cased with no
            numbering and no colour. Otherwise each name is printed in
            blue, preceded by its 1-based position.
    """
    print(Fore.YELLOW + "\nHere are the columns\n")
    print(Style.RESET_ALL + '', end='')
    for i, field in enumerate(parts, 1):
        # Remove double quotes and surrounding whitespace. The last field
        # of a split line still carries the line terminator, which would
        # otherwise print as a stray blank line after the final column.
        field = field.replace('"', '').strip()
        if lowercase:
            print("{}".format(field.lower()))
        else:
            print("{}. ".format(i), end='')
            print(Fore.BLUE + "{}".format(field))
            print(Style.RESET_ALL + '', end='')
def display_record(parts, rec_ctr, line_ctr):
    """Print one data record as a list of ``header: value`` lines.

    Header names are read from the module-level
    ``position_to_header_lookup`` dictionary, keyed by 0-based column
    position.

    Args:
        parts: The field values of the record, in file order.
        rec_ctr: The record number shown in the banner line.
        line_ctr: The input-file line number shown in the banner line.
    """
    print(Fore.YELLOW + "\nHere record number '{}' (at line '{}')".format(rec_ctr, line_ctr))
    print(Style.RESET_ALL + '', end='')
    for i, field in enumerate(parts):
        # Drop double quotes and any line terminator left on the last
        # field so that no extra blank line is printed after the record.
        field = field.replace('"', '').rstrip('\r\n')
        # A ragged row may hold more fields than the header row declared;
        # label the surplus fields by position instead of raising KeyError.
        header = position_to_header_lookup.get(i, "<column {}>".format(i + 1))
        print(Fore.BLUE + "{}:".format(header), end='')
        print(Style.RESET_ALL + '', end='')
        print(" {}".format(field))
@click.command()
@click.option('--outdir', help="The output directory - the default is a timestamped directory under /tmp")
@click.option('--infile', help="The tab-delimited file to be analyzed")
@click.option('--logfile', help="The log file")
@click.option('--columns_only', is_flag=True, help="If specified, will only display the column names")
@click.option('--columns_lowercase', is_flag=True, help="If specified, will only display the column names in lowercase without numbers")
def main(outdir, infile, logfile, columns_only, columns_lowercase):
    """Analyze a tab-delimited file

    The first line of the file is treated as the header row. With
    --columns_only the column names are printed and the program stops.
    Otherwise the data records are printed one at a time, and from the
    second record onward the user is asked whether to continue.

    Exits with status 1 when --infile is missing or does not exist.
    """
    global position_to_header_lookup

    if infile is None:
        print(Fore.RED + "--infile was not specified")
        print(Style.RESET_ALL + '', end='')
        sys.exit(1)

    assert isinstance(infile, str)

    # Fail with a readable message instead of a traceback from open().
    if not os.path.exists(infile):
        print(Fore.RED + "--infile '{}' does not exist".format(infile))
        print(Style.RESET_ALL + '', end='')
        sys.exit(1)

    if outdir is None:
        outdir = DEFAULT_OUTDIR
        print(Fore.YELLOW + "--outdir was not specified and therefore was set to '{}'".format(outdir))
        print(Style.RESET_ALL + '', end='')

    assert isinstance(outdir, str)

    if not os.path.exists(outdir):
        pathlib.Path(outdir).mkdir(parents=True, exist_ok=True)
        print(Fore.YELLOW + "Created output directory '{}'".format(outdir))
        print(Style.RESET_ALL + '', end='')

    if logfile is None:
        logfile = os.path.join(outdir, os.path.basename(__file__) + '.log')
        print(Fore.YELLOW + "--logfile was not specified and therefore was set to '{}'".format(logfile))
        print(Style.RESET_ALL + '', end='')

    assert isinstance(logfile, str)

    # NOTE(review): click is expected to pass False, not None, for an unset
    # is_flag option, so these two fallbacks presumably only apply when the
    # function is invoked with explicit None values - confirm.
    if columns_only is None:
        columns_only = DEFAULT_COLUMNS_ONLY
        print(Fore.YELLOW + "--columns_only was not specified and therefore was set to '{}'".format(columns_only))
        print(Style.RESET_ALL + '', end='')

    if columns_only and columns_lowercase is None:
        columns_lowercase = DEFAULT_COLUMNS_LOWERCASE
        print(Fore.YELLOW + "--columns_lowercase was not specified and therefore was set to '{}'".format(columns_lowercase))
        print(Style.RESET_ALL + '', end='')

    logging.basicConfig(filename=logfile, format=LOGGING_FORMAT, level=LOG_LEVEL)

    position_to_header_lookup = {}
    rec_ctr = 0

    with open(infile, 'r') as f:
        for line_ctr, line in enumerate(f, 1):
            # Strip only the line terminator: the last field must not end
            # in a newline, but empty trailing fields have to be kept.
            parts = line.rstrip('\r\n').split("\t")

            if line_ctr == 1:
                # Header row: remember each column name by position.
                for i, header in enumerate(parts):
                    position_to_header_lookup[i] = header.replace('"', '').strip()
                if columns_only:
                    display_columns(parts, columns_lowercase)
                    break
                continue

            rec_ctr += 1
            if rec_ctr > 1:
                # The first record is shown unconditionally; ask before
                # showing each subsequent one.
                try:
                    yes_or_no = input("Would you like to see the next record? [Y/n] ")
                except EOFError:
                    # stdin is closed or exhausted, so nobody can answer.
                    break
                if yes_or_no.strip().lower() not in ('', 'y', 'yes'):
                    break

            display_record(parts, rec_ctr, line_ctr)
# Run the click command only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()