forked from OpenAPC/openapc-de
-
Notifications
You must be signed in to change notification settings - Fork 0
/
csv_occurence_count.py
executable file
·73 lines (60 loc) · 2.55 KB
/
csv_occurence_count.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
#!/usr/bin/env python3
# -*- coding: UTF-8 -*-
import argparse
import codecs
from collections import OrderedDict
import sys
import openapc_toolkit as oat
# Help texts shown by argparse, keyed by the argument's destination name.
ARG_HELP_STRINGS = dict(
    source_file="The source csv file",
    count_column=(
        "The numerical index of the column where values "
        "should be counted"
    ),
    encoding=(
        "The encoding of the CSV file. Setting this argument will "
        "disable automatic guessing of encoding."
    ),
    sort="sort results by occurence count",
)
def main():
    """Count how often each value occurs in one column of a CSV file.

    Command-line arguments (read from ``sys.argv`` via argparse):
        source_file: path of the CSV file to analyse.
        count_column: integer index of the column whose values are counted.
        -e / --encoding: codec name handed to the CSV reader. It is checked
            with ``codecs.lookup`` first; an unknown name aborts the script.
        -s / --sort: print the results ordered by descending count instead
            of in order of first appearance.

    Output:
        One ``value: count`` line per distinct value on stdout. Rows that
        have no cell at ``count_column`` are reported and skipped.

    Exits:
        With status 1 if the requested encoding is not a known codec.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("source_file", help=ARG_HELP_STRINGS["source_file"])
    parser.add_argument("count_column", type=int, help=ARG_HELP_STRINGS["count_column"])
    parser.add_argument("-e", "--encoding", help=ARG_HELP_STRINGS["encoding"])
    parser.add_argument("-s", "--sort", action="store_true", help=ARG_HELP_STRINGS["sort"])

    args = parser.parse_args()

    # None is passed through to the reader when no encoding was requested
    # (per the help text, that leaves encoding guessing enabled).
    enc = None
    if args.encoding:
        try:
            codec = codecs.lookup(args.encoding)
        except LookupError:
            oat.print_r("Error: '" + args.encoding + "' not found in Python's " +
                        "codec collection. Either look for a valid name here " +
                        "(https://docs.python.org/3/library/codecs.html#standard-" +
                        "encodings) or omit this argument to enable automated " +
                        "guessing.")
            # Non-zero status so that calling shells/scripts see the failure.
            sys.exit(1)
        msg = "Encoding '{}' found in Python's codec collection as '{}'"
        print(msg.format(args.encoding, codec.name))
        enc = args.encoding

    header, content = oat.get_csv_file_content(args.source_file, enc)

    # Fall back to a generic label when the file has no header row.
    column_name = "column " + str(args.count_column)
    if header:
        header_line = header[0]
        column_name = header_line[args.count_column]

    oat.print_g("Performing occurence count in column '" + column_name + "'")

    # OrderedDict keeps values in order of first appearance for unsorted output.
    occurence_dict = OrderedDict()
    for line in content:
        try:
            value = line[args.count_column]
        except IndexError as ie:
            # Python 3 exceptions have no `.message` attribute; format the
            # exception object itself to obtain its text.
            oat.print_y("IndexError ({}) at line {}, skipping...".format(ie, line))
            continue
        occurence_dict[value] = occurence_dict.get(value, 0) + 1

    results = occurence_dict.items()
    if args.sort:
        # sorted() is stable, so values with equal counts keep their
        # first-appearance order.
        results = sorted(results, key=lambda x: x[1], reverse=True)

    for value, count in results:
        print("{}: {}".format(value, count))
# Run the occurrence count only when executed as a script, not on import.
if __name__ == "__main__":
    main()