-
Notifications
You must be signed in to change notification settings - Fork 0
/
seed_data.py
346 lines (272 loc) · 11.8 KB
/
seed_data.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
import csv
import datetime
import time
import os
import re
import sqlite3
import db_setup as setup
#-- CSV READERS
def read_csv_generator(filename):
'''a generator function that reads through the CSV file,
yielding a single row as needed'''
print('Generator Initiated')
with open(filename, 'r') as raw_data:
data_reader = csv.reader(raw_data)
next(data_reader)
for row in data_reader:
if row[1][6:] == '2016':
yield row
else:
continue
print('\nGenerator Complete')
#-- FIELD FORMATTING
def format_date_field(field):
if field:
output_date_str = datetime.datetime.strptime(field, '%m/%d/%Y').strftime('%Y-%m-%d')
return output_date_str
else:
return None
def format_int_field(field):
if field:
return int(field)
else:
return None
def format_lat_long(field):
'''this function takes an entry. It regex matches lattitude and longitude values
inside parens and comma seperated and returns a tuple for lat, long'''
lat, long = (None, None)
if field:
pattern = re.compile(r'^.*\((?P<lat>-?\d+\.\d+), (?P<long>-?\d+\.\d+).*$')
match = pattern.match(field.replace('\n', ' '))
try:
lat = float(match.group('lat'))
long = float(match.group('long'))
except:
pass
return (lat, long)
def format_liter_to_ml(field):
if field:
field = float(field) * 1000
ml = int(field)
return ml
else:
return None
def format_money_field(field):
if field:
if field[0] == '$':
pennies = field[1:].replace('.', '')
else:
pennies = field.replace('.', '')
return int(pennies)
else:
return None
def format_text_field(field):
'''Cleans up txt fields (address, store name, city) to each word capitalized and striped of apostraphies'''
if field:
each_word_upper_str = ' '.join([word.capitalize() for word in field.strip().split(' ')])
return each_word_upper_str
else:
return None
def format_zip_code(field):
if len(field) == 5:
try:
field = int(field)
return field
except ValueError:
pass
else:
return None
#-- ROW PARSING
def parse_a_row(row):
'''parse an entire row from the raw csv'''
#---- sale fields (1)
row[0] = format_text_field(row[0]) # invoice num
row[1] = format_date_field(row[1]) # date
#---- store fields
row[2] = format_int_field(row[2]) # store number
row[3] = format_text_field(row[3]) # store name
row[4] = format_text_field(row[4]) # address
row[5] = format_text_field(row[5]) # city
row[6] = format_zip_code(row[6]) # zip code
row[7] = format_text_field(row[7]) # lat and long
lat, long = format_lat_long(row[7])
row[7] = lat # lat
row.insert(8, long) # long
row[9] = format_int_field(row[9]) # county number
# county name is pre-seeded during table creation
#---- cateogry fields
row[11] = format_int_field(row[11]) # category number
row[12] = format_text_field(row[12]) # category name
#---- vendor fields
row[13] = format_int_field(row[13]) # vendor number
row[14] = format_text_field(row[14]) # vendor name
#---- item fields
row[15] = format_int_field(row[15]) # item number
row[16] = format_text_field(row[16]) # item description
row[17] = format_int_field(row[17]) # pack qty
row[18] = format_int_field(row[18]) # bottle volume ml
row[19] = format_money_field(row[19]) # state wholesale cost
row[20] = format_money_field(row[20]) # state retail cost
#---- sale fields (2)
row[21] = format_int_field(row[21]) # quantity sold
row[22] = format_money_field(row[22]) # sale ammount
row[23] = format_liter_to_ml(row[23]) # sale volume in ml
return row
def parse_a_selective_row(row, categories, items, sales, stores, vendors):
'''parse an entire row from the raw csv'''
row.insert(8, None)
temp_sales = []
temp_stores = []
temp_categories = []
temp_vendors = []
temp_items = []
#---- sale fields (1)
if not row[0] in sales:
row[0] = format_text_field(row[0]) # invoice num
row[1] = format_date_field(row[1]) # date
row[2] = format_int_field(row[2]) # store number FK
row[15] = format_int_field(row[15]) # item number FK
#---- sale fields (2)
row[21] = format_int_field(row[21]) # quantity sold
row[22] = format_money_field(row[22]) # sale ammount
row[23] = format_liter_to_ml(row[23]) # sale volume in ml
temp_sales = [row[0], (row[0], row[1], row[2], row[15], row[21], row[22], row[23])]
#---- store fields
if not row[2] in stores:
# row[2] = format_int_field(row[2]) # store number
row[3] = format_text_field(row[3]) # store name
row[4] = format_text_field(row[4]) # address
row[5] = format_text_field(row[5]) # city
row[6] = format_zip_code(row[6]) # zip code
row[7] = format_text_field(row[7]) # lat and long
lat, long = format_lat_long(row[7])
row[7] = lat # lat
row[8] = long # long
row[9] = format_int_field(row[9]) # county number
temp_stores = [row[2], (row[2], row[3], row[4], row[5], row[6], row[7], row[8], row[9])]
#---- cateogry fields
if not row[11] in categories:
row[11] = format_int_field(row[11]) # category number
row[12] = format_text_field(row[12]) # category name
temp_categories = [row[11], (row[11], row[12])]
#---- vendor fields
if not row[13] in vendors:
row[13] = format_int_field(row[13]) # vendor number
row[14] = format_text_field(row[14]) # vendor name
temp_vendors = [row[13], (row[13], row[14])]
#---- item fields
if not row[15] in items:
# row[15] = format_int_field(row[15]) # item number
row[11] = format_int_field(row[11]) # category number FK
row[13] = format_int_field(row[13]) # vendor number FK
row[16] = format_text_field(row[16]) # item description
row[17] = format_int_field(row[17]) # pack qty
row[18] = format_int_field(row[18]) # bottle volume ml
row[19] = format_money_field(row[19]) # state wholesale cost
row[20] = format_money_field(row[20]) # state retail cost
temp_items = [row[15], (row[15], row[11], row[13], row[16], row[17], row[18], row[19], row[20])]
return (temp_sales, temp_stores, temp_categories, temp_vendors, temp_items)
def create_all_tables(db):
print('Creating Tables')
with setup.db_connect(db) as database:
county_table = setup.CountySchema()
database.execute(county_table.create_counties_table())
county_table.insert_counties(database)
stores_table = setup.StoreSchema()
database.execute(stores_table.create_stores_table())
categories_table = setup.CategorieSchema()
database.execute(categories_table.create_categories_table())
vendors_table = setup.VendorSchema()
database.execute(vendors_table.create_vendors_table())
items_table = setup.ItemSchema()
database.execute(items_table.create_items_table())
sales_table = setup.SaleSchema()
database.execute(sales_table.create_sales_table())
def build_virtual_db(raw_row_generator):
categories = {}
items = {}
sales = {}
stores = {}
vendors = {}
# bytes_read = 0
# count = 0
for row in raw_row_generator:
# count += 1
new_sales, new_stores, new_categories, new_vendors, new_items = parse_a_selective_row(row, categories, items, sales, stores, vendors)
if new_sales:
sales[new_sales[0]] = new_sales[1]
if new_stores:
stores[new_stores[0]] = new_stores[1]
if new_categories:
categories[new_categories[0]] = new_categories[1]
if new_vendors:
vendors[new_vendors[0]] = new_vendors[1]
if new_items:
items[new_items[0]] = new_items[1]
# if count == 9000000:
# break
return (categories, items, sales, stores, vendors)
#-- TABLE INSERTS
def insert_sales(db, dict_of_sales):
insert_statement = '''INSERT INTO sales(sale_id, sale_date, store_id, item_id, bottles_sold, sale_value, sale_vol_ml)
VALUES(?, ?, ?, ?, ?, ?, ?)'''
with setup.db_connect(db) as database:
cur = database.cursor()
cur.executemany(insert_statement, dict_of_sales.values())
def insert_stores(db, dict_of_stores):
insert_statment = '''INSERT INTO stores(store_id, store_name, address, city, zip_code, store_lat, store_long, county_id)
VALUES(?, ?, ?, ?, ?, ?, ?, ?)'''
with setup.db_connect(db) as database:
cur = database.cursor()
cur.executemany(insert_statment, dict_of_stores.values())
def insert_categories(db, dict_of_categories):
insert_statement = '''INSERT INTO categories (category_id, category_name)
VALUES(?, ?)'''
with setup.db_connect(db) as database:
cur = database.cursor()
cur.executemany(insert_statement, dict_of_categories.values())
def insert_vendors(db, dict_of_vendors):
insert_statement = '''INSERT INTO vendors (vendor_id, vendor_name)
VALUES(?, ?)'''
with setup.db_connect(db) as database:
cur = database.cursor()
cur.executemany(insert_statement, dict_of_vendors.values())
def insert_items(db, dict_of_items):
insert_statement = '''INSERT INTO items (item_id, category_id, vendor_id, item_description, pack_qty, bottle_volume_ml, state_wholesale, state_retail)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)'''
with setup.db_connect(db) as database:
cur = database.cursor()
cur.executemany(insert_statement, dict_of_items.values())
if __name__ == '__main__':
# sets start time for efficiency comparisons
start = time.time()
# name of target DB to be created or connected to in the directory set in the following step
db_name = 'sales_new_seed.db'
# path to the target db
target_db = os.path.join('output', db_name)
# creates all tables required for the project IF NOT EXISTS
create_all_tables(target_db)
# path to the input CSV
rel_path_to_data = os.path.join('input', 'iowa-liquor-sales', 'Iowa_Liquor_Sales.csv')
# CSV reader creates a generator for line-by-line parsing of the input data
raw_data_generator = read_csv_generator(rel_path_to_data)
print('Parsing data')
# builds a dictionary for each table, with a value eaual to each row's PK and a value of a tuple for each row of parsed data
categories, items, sales, stores, vendors = build_virtual_db(raw_data_generator)
print('Parsing complete')
# parsing results
print('\nReady to insert:\n{} Categories\n{} items\n{} sales\n{} stores\n{} vendors\n'.format(len(categories), len(items), len(sales), len(stores), len(vendors)))
# inserts to each table in turn
print('inserting sales')
insert_sales(target_db, sales)
print('inserting stores')
insert_stores(target_db, stores)
print('inserting categories')
insert_categories(target_db, categories)
print('inserting vendors')
insert_vendors(target_db, vendors)
print('inserting items')
insert_items(target_db, items)
stop = time.time()
# final results
print('\ndatabase seeding took {} seconds'.format(stop - start))