-
Notifications
You must be signed in to change notification settings - Fork 0
/
school_scores.py
150 lines (118 loc) · 5.63 KB
/
school_scores.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
'''
Hello student. Thank you for downloading a CORGIS library. However, you do not need to open this library. Instead you should use the following:
import school_scores
If you opened the file because you are curious how this library works, then well done! We hope that you find it a useful learning experience. However, you should know that this code is meant to solve somewhat esoteric pedagogical problems, so it is often not best practices.
'''
import sys as _sys
import os as _os
import json as _json
import sqlite3 as _sql
import difflib as _difflib
class _Constants(object):
'''
Global singleton object to hide some of the constants; some IDEs reveal internal module details very aggressively, and there's no other way to hide stuff.
'''
_HEADER = {'User-Agent':
'CORGIS School Scores library for educational purposes'}
_PYTHON_3 = _sys.version_info >= (3, 0)
_TEST = False
_HARDWARE = 1000
if _Constants._PYTHON_3:
import urllib.request as _request
from urllib.parse import quote_plus as _quote_plus
from urllib.error import HTTPError as _HTTPError
else:
import urllib2 as _urllib2
from urllib import quote_plus as _quote_plus
from urllib2 import HTTPError as _HTTPError
class DatasetException(Exception):
''' Thrown when there is an error loading the dataset for some reason.'''
pass
_Constants._DATABASE_NAME = "school_scores.db"
if not _os.access(_Constants._DATABASE_NAME, _os.F_OK):
raise DatasetException("Error! Could not find a \"{0}\" file. Make sure that there is a \"{0}\" in the same directory as \"{1}.py\"! Spelling is very important here.".format(_Constants._DATABASE_NAME, __name__))
elif not _os.access(_Constants._DATABASE_NAME, _os.R_OK):
raise DatasetException("Error! Could not read the \"{0}\" file. Make sure that it readable by changing its permissions. You may need to get help from your instructor.".format(_Constants._DATABASE_NAME, __name__))
elif not _os.access(_Constants._DATABASE_NAME, _os.W_OK):
_sys.stderr.write('The local cache (\" \") will not be updated. Make sure that it is writable by changing its permissions. You may need to get help from your instructor.\n'.format(_Constants._DATABASE_NAME))
_sys.stderr.flush()
_Constants._DATABASE = _sql.connect(_Constants._DATABASE_NAME)
class _Auxiliary(object):
@staticmethod
def _parse_type(value, type_func):
"""
Attempt to cast *value* into *type_func*, returning *default* if it fails.
"""
default = type_func(0)
if value is None:
return default
try:
return type_func(value)
except ValueError:
return default
@staticmethod
def _byteify(input):
"""
Force the given input to only use `str` instead of `bytes` or `unicode`.
This works even if the input is a dict, list,
"""
if isinstance(input, dict):
return {_Auxiliary._byteify(key): _Auxiliary._byteify(value) for key, value in input.items()}
elif isinstance(input, list):
return [_Auxiliary._byteify(element) for element in input]
elif _Constants._PYTHON_3 and isinstance(input, str):
return str(input.encode('ascii', 'replace').decode('ascii'))
elif not _Constants._PYTHON_3 and isinstance(input, unicode):
return str(input.encode('ascii', 'replace').decode('ascii'))
else:
return input
@staticmethod
def _guess_schema(input):
if isinstance(input, dict):
return {str(key.encode('ascii', 'replace').decode('ascii')):
_Auxiliary._guess_schema(value) for key, value in input.items()}
elif isinstance(input, list):
return [_Auxiliary._guess_schema(input[0])] if input else []
else:
return type(input)
################################################################################
# Domain Objects
################################################################################
################################################################################
# Interfaces
################################################################################
def get_all():
"""
Returns all of the data for every state into a list.
"""
if False:
# If there was a Test version of this method, it would go here. But alas.
pass
else:
rows = _Constants._DATABASE.execute("SELECT data FROM school_scores".format(
hardware=_Constants._HARDWARE))
data = [r[0] for r in rows]
data = [_Auxiliary._byteify(_json.loads(r)) for r in data]
return _Auxiliary._byteify(data)
################################################################################
# Internalized testing code
################################################################################
def _test_interfaces():
from pprint import pprint as _pprint
from timeit import default_timer as _default_timer
# Production test
print("Production get_all")
start_time = _default_timer()
result = get_all()
print("{} entries found.".format(len(result)))
_pprint(_Auxiliary._guess_schema(result))
print("Time taken: {}".format(_default_timer() - start_time))
if __name__ == '__main__':
from optparse import OptionParser as _OptionParser
_parser = _OptionParser()
_parser.add_option("-t", "--test", action="store_true",
default=False,
help="Execute the interfaces to test them.")
(_options, _args) = _parser.parse_args()
if _options.test:
_test_interfaces()