#!/usr/bin/env python
# coding: utf-8
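"""Multi-threaded bulk loader for Snowflake.

Splits a local CSV/TXT/ZIP file into gzipped chunks, uploads the chunks to the
target table's stage with parallel PUT statements, then runs a single COPY INTO
to load the staged data. Connection settings come from snowflake_credentials.py.
"""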
import datetime as dt
import math
import os
import sys
import threading
import time
import zipfile as zf
import pandas as pd
import snowflake.connector
# CRED_FILE = "snowflake_credentials.py"
from snowflake_credentials import creds
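# Illustrative sketch of the creds dict this script expects; the keys are
# inferred from the lookups in SnowConnector.__init__ below, and the values
# are placeholders:
#
# creds = {
#     'user': 'ME@EXAMPLE.COM',            # required
#     'account': 'myorg-myaccount',        # required
#     'warehouse': 'MY_WH',                # required
#     'database': 'MY_DB',                 # required
#     'schema': 'PUBLIC',                  # required
#     'infile': 'data.csv',                # required: a .csv, .txt, or .zip file
#     'snow_table': '',                    # optional: derived from infile when empty
#     'create_new_if_table_exists': True,  # or set 'replace_if_table_exists'
#     'filechunks_exist': False,           # parsed but not otherwise used here
#     'staging_folder_for_filechunks': 'staging',
#     'rows_to_read_for_chunking': 1000000,
#     'field_delimiter': ',',
#     'record_delimiter': '\n',
#     'filename_in_zip': '',               # required when infile is a .zip
#     'delete_staging_folder_after_process': True,
#     'remove_local_staging_filechunks': True,
# }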
class SnowConnector:
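    """Parses the creds dict into upload settings and opens Snowflake connections."""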
def __init__(self, creds):
try:
self.user = creds['user']
self.account = creds['account']
self.warehouse = creds['warehouse']
self.database = creds['database']
self.schema = creds['schema']
self.infile = creds['infile']
self.filechunks_exist = False
self.staging_folder = 'staging'
self.rows_to_read = 1000000
            self.field_delimiter = ','
            self.record_delimiter = '\n'
self.filename_in_zip = ''
            self.rm_staging_folder = True
            self.rm_staging_files = True
if 'snow_table' in creds and creds['snow_table']:
self.snow_table = creds['snow_table']
elif 'infile' in creds and creds['infile']:
# create table name from infile name
remove_list = ['.txt', '.csv', '.zip']
temp_filename = creds['infile'].split('/')[-1]
for r in remove_list:
temp_filename = temp_filename.replace(r, '')
temp_filename = temp_filename.replace(' ', '_').replace('.', '_')
self.snow_table = ''.join(s for s in temp_filename if (s.isalnum() or s == '_'))
else:
now = str(dt.datetime.now())
self.snow_table = 'python_upload_' + now.replace(':', '-').replace('.', '-').replace(' ', '-')
            if self.infile.endswith('.zip'):
                self.filetype = 'zip'
            elif self.infile.endswith('.txt'):
                self.filetype = 'txt'
            elif self.infile.endswith('.csv'):
                self.filetype = 'csv'
            else:
                self.filetype = 'none'
if 'create_new_if_table_exists' in creds and creds['create_new_if_table_exists']:
self.if_table_exists = 'create_new'
elif 'replace_if_table_exists' in creds and creds['replace_if_table_exists']:
self.if_table_exists = 'replace'
else:
                print('No selection for how to handle pre-existing Snowflake tables with the same name.\n'
                      'Default is to create a new table')
self.if_table_exists = 'create_new'
if 'filechunks_exist' in creds and creds['filechunks_exist']:
self.filechunks_exist = True
if 'staging_folder_for_filechunks' in creds and creds['staging_folder_for_filechunks']:
self.staging_folder = str(creds['staging_folder_for_filechunks'])
if 'rows_to_read_for_chunking' in creds and creds['rows_to_read_for_chunking']:
self.rows_to_read = creds['rows_to_read_for_chunking']
if 'field_delimiter' in creds and creds['field_delimiter']:
self.field_delimiter = str(creds['field_delimiter'])
if 'record_delimiter' in creds and creds['record_delimiter']:
self.record_delimiter = str(creds['record_delimiter'])
if 'filename_in_zip' in creds and creds['filename_in_zip']:
self.filename_in_zip = creds['filename_in_zip']
if 'delete_staging_folder_after_process' in creds and creds['delete_staging_folder_after_process']:
self.rm_staging_folder = creds['delete_staging_folder_after_process']
if 'remove_local_staging_filechunks' in creds and creds['remove_local_staging_filechunks']:
self.rm_staging_files = creds['remove_local_staging_filechunks']
except Exception as e:
print(e, '\nPlease check the credential file for missing information')
def print_conn_info(self):
print()
print('USER:', self.user)
print('ACCOUNT:', self.account)
print('WAREHOUSE:', self.warehouse)
print('DATABASE:', self.database)
print('SCHEMA:', self.schema)
print('DATA INPUT:', self.infile)
print('INFILE TYPE:', self.filetype)
print('SNOWFLAKE TABLE:', self.snow_table)
print()
def get_filetype(self):
return self.filetype
def get_infile(self):
return self.infile
def get_table(self):
return self.snow_table
def get_rows_to_read(self):
return self.rows_to_read
def get_field_delimiter(self):
return self.field_delimiter
def get_record_delimiter(self):
return self.record_delimiter
def get_staging_folder(self):
return self.staging_folder
def get_file_within_zip(self):
return self.filename_in_zip
def get_rm_staging_folder(self):
return self.rm_staging_folder
def get_rm_staging_files(self):
return self.rm_staging_files
# TODO: error handling when connection fails after 5 attempts
def snowflake_cursor(self):
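        """Open a Snowflake connection via browser SSO, retrying up to 5 times.

        Despite the name, this returns a connection object, not a cursor.
        """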
e_counter = 0
conn = None
while e_counter < 5:
try:
conn = snowflake.connector.connect(
user=self.user,
authenticator='externalbrowser',
account=self.account,
warehouse=self.warehouse,
database=self.database,
schema=self.schema
)
break
except snowflake.connector.errors.DatabaseError as e:
print(e)
print('Connection to Snowflake refused, trying again...')
e_counter += 1
time.sleep(5)
if not conn:
print("""\n*****\n
Connection to Snowflake refused after 5 attempts.
Please check connection credentials and
connection to server/internet.
\n*****\n""")
            sys.exit(1)
print("Connected to Snowflake")
return conn
def table_exists(self, conn, table=None):
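        """Return True if the given table (default: self.snow_table) exists in the current schema."""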
if table:
table_name = table
else:
table_name = self.snow_table
check = f"SHOW TABLES LIKE '{table_name}'"
cur = conn.cursor()
cur.execute(check)
tables = cur.fetchall()
if tables:
return True
return False
def create_table(self, col_names_types):
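        """Create the target table from the column clause, suffixing the name with a
        counter or replacing the existing table when the name is already taken."""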
table_name = self.snow_table
conn = self.snowflake_cursor()
if self.table_exists(conn):
if self.if_table_exists == 'create_new':
old_table_name = table_name
loop_count = 2
while True:
table_name = f'{old_table_name}_{loop_count}'
exists = self.table_exists(conn, table_name)
if not exists:
break
loop_count += 1
self.snow_table = table_name
print(old_table_name, 'already exists. new table name is', table_name)
            else:
                # CREATE OR REPLACE below rebuilds the table, so the old rows
                # do not need to be cleared first
                print(table_name, 'already exists. table will be replaced')
        create = "CREATE OR REPLACE TABLE " + table_name + col_names_types
        conn.cursor().execute(create)
        conn.close()
        print(table_name, 'table created or replaced')
def file_col_names_types(filename, all_strings=True):
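    """Build a '(col1 type1, col2 type2, ...)' clause from the file's header row."""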
# set all col types to string
if all_strings:
infile = pd.read_csv(filename, nrows=1)
temp_cols = infile.columns
clean_cols = []
unnamed_count = 1
for col in temp_cols:
if 'Unnamed:' in col:
col = f'UNNAMED_COLUMN_{unnamed_count}'
unnamed_count += 1
clean_cols.append(col)
infile_sql_types = "(" + ", ".join([col.replace(' ', '_').strip() + ' string' for col in clean_cols]) + ")"
# attempt to guess col types based on first 1000 rows of infile
else:
infile = pd.read_csv(filename, nrows=1000)
        py_to_sql_types = {'int64': 'integer', 'int32': 'integer', 'object': 'string',
                           'float64': 'float', 'float32': 'float', 'bool': 'boolean',
                           'datetime64[ns]': 'timestamp'}
infile_types = infile.dtypes.apply(lambda x: x.name).to_dict()
infile_sql_types = "(" + ", ".join([key.replace(' ', '_').strip() + ' ' +
(py_to_sql_types[value] if value in py_to_sql_types else 'string')
for key, value in infile_types.items()]) + ")"
return infile_sql_types
class ZipToGzThread(threading.Thread):
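    """Thread that reads one slice of the source file and writes it to the staging
    folder as a gzipped CSV chunk."""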
def __init__(self, thread_id, skip, rows_to_read, filename, header, dtypes,
staging_folder, zipname=None):
threading.Thread.__init__(self)
self.threadID = thread_id
self.skip = skip
self.rows_to_read = rows_to_read
self.filename = filename
self.header = header.columns
self.dtypes = dtypes
self.staging_folder = staging_folder
self.zipname = zipname
def run(self):
chunk_file(self.skip, self.rows_to_read, self.filename, self.header,
self.dtypes, self.threadID, self.staging_folder, self.zipname)
def chunk_file(skip, rows_to_read, filename, header, dtypes, threadID, staging_filepath, zipname=None):
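    """Read rows_to_read rows starting at row skip and write them as one gzipped chunk."""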
if zipname:
temp = pd.read_csv(zipname, skiprows=skip, nrows=rows_to_read, dtype=dtypes,
names=header, header=None, engine='c')
else:
temp = pd.read_csv(filename, skiprows=skip, nrows=rows_to_read, dtype=dtypes,
names=header, header=None, engine='c')
print(f'Read {len(temp)} lines starting at row {skip}')
    # use the base name so an infile given as a path still lands inside the staging folder
    out_name = os.path.basename(filename)
    temp.to_csv(f'{staging_filepath}/{out_name}_{threadID}.gz', index=False, header=True,
                compression='gzip', chunksize=1000)
print(f'Sent thread ID {threadID} to local staging folder as GZIP')
class SfExecutionThread(threading.Thread):
# credit: https://interworks.com/blog/2020/03/04/zero-to-snowflake-multi-threaded-bulk-loading-with-python/
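    """Thread that runs a single SQL statement in Snowflake (used here for parallel PUTs)."""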
def __init__(self, thread_id, sql_query):
threading.Thread.__init__(self)
self.threadID = thread_id
self.sql_query = sql_query
def run(self):
print('Starting {0}: {1}'.format(self.threadID, self.sql_query))
execute_in_snowflake(self.sql_query)
print('Exiting {0}: {1}'.format(self.threadID, self.sql_query))
def execute_in_snowflake(sf_query):
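    """Run one SQL statement on its own short-lived Snowflake connection."""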
# connect to snowflake
temp_snow = SnowConnector(creds)
conn = temp_snow.snowflake_cursor()
# increase timeout
conn.cursor().execute("""ALTER SESSION SET
STATEMENT_TIMEOUT_IN_SECONDS = 86400""")
conn.cursor().execute(sf_query)
conn.close()
def main():
start_time = dt.datetime.now()
print('Starting main:', start_time)
snow = SnowConnector(creds)
snow.print_conn_info()
filetype = snow.get_filetype()
table_name = snow.get_table()
if filetype in ['zip', 'csv', 'txt']:
filename = snow.get_infile()
col_names_types = file_col_names_types(filename, all_strings=True)
print(dt.datetime.now(), '***', 'Creating table', table_name)
snow.create_table(col_names_types)
# create staging folder
staging_folder = snow.get_staging_folder()
path = os.path.abspath(os.getcwd())
staging_filepath = str(path)+'/'+staging_folder
        staging_exists = os.path.isdir(staging_filepath)
print(dt.datetime.now(), '***', 'Creating or clearing staging folder...')
if staging_exists:
# remove contents from staging folder
for root, dirs, files in os.walk(staging_filepath):
for f in files:
os.unlink(os.path.join(root, f))
else:
try:
os.mkdir(staging_filepath)
except OSError as error:
print(error)
print(dt.datetime.now(), '***', 'Staging folder ready')
print(dt.datetime.now(), '***', 'Counting rows in file and getting header info...')
if filetype == 'zip':
zipname = snow.get_infile()
filename = snow.get_file_within_zip()
with zf.ZipFile(zipname) as folder:
with folder.open(filename) as f:
row_count = 0
for _ in f:
row_count += 1
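            # note: pd.read_csv on a zip path works only when the archive holds a single file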
header = pd.read_csv(zipname, nrows=1)
else:
with open(filename) as f:
row_count = 0
for _ in f:
row_count += 1
header = pd.read_csv(filename, nrows=1)
print(dt.datetime.now(), '***', f'Header info gathered for your {str(row_count)} row file')
        # calculate the number of chunk iterations (1,000,000 rows per chunk is the recommended default)
        rows_to_read = snow.get_rows_to_read()
        iterations = math.ceil(row_count / rows_to_read)
        skip = 1  # start past the header row of the source file
# set all types to str/obj for faster read
dtypes = {k: str for k in header.columns}
file_chunk_threads = []
print(dt.datetime.now(), '***', 'Creating thread list for file chunking and GZIP formatting...')
        zip_arg = zipname if filetype == 'zip' else None
        for i in range(1, iterations + 1):
            file_chunk_threads.append(ZipToGzThread(i, skip, rows_to_read, filename, header,
                                                    dtypes, staging_filepath, zipname=zip_arg))
            skip += rows_to_read
print(dt.datetime.now(), '***', 'Thread list created for file chunking and formatting')
print(dt.datetime.now(), '***', 'Starting file chunking and formatting threads...')
for fc_thread in file_chunk_threads:
fc_thread.start()
for fc_thread in file_chunk_threads:
fc_thread.join()
print(dt.datetime.now(), '***', 'Files chunked and formatted in folder:', staging_folder)
# starting to stage files for transfer
staging_files = []
        for root, dirs, files in os.walk(staging_filepath):
for f in files:
staging_files.append(f)
print(dt.datetime.now(), '***', 'Total chunked files:', len(staging_files))
put_statements = []
print(dt.datetime.now(), '***', 'Creating PUT statements for file chunks...')
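        # @%<table> is the table's internal stage; AUTO_COMPRESS is off because
        # the chunks are already gzipped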
for staging_file in staging_files:
put_statements.append(f'''
PUT file://{staging_filepath}/{staging_file} @%{table_name}
SOURCE_COMPRESSION = GZIP
PARALLEL = 20
AUTO_COMPRESS = FALSE
''')
print(dt.datetime.now(), '***', 'PUT statements created')
put_threads = []
put_counter = 0
# create thread list
print(dt.datetime.now(), '***', 'Creating thread list for PUT statements...')
for statement in put_statements:
put_threads.append(SfExecutionThread(put_counter, statement))
put_counter += 1
# execute the threads
print(dt.datetime.now(), '***', 'Starting PUT query threads...')
for thread in put_threads:
thread.start()
for thread in put_threads:
thread.join()
print(dt.datetime.now(), '***', 'PUT threads complete. Data Staged')
field_delimiter = snow.get_field_delimiter()
record_delimiter = snow.get_record_delimiter()
        print(dt.datetime.now(), '***', 'Starting COPY INTO...')
        copy_into_sql = f"""COPY INTO {table_name}
        FROM @%{table_name}
        FILE_FORMAT = (REPLACE_INVALID_CHARACTERS = TRUE
        SKIP_HEADER = 1
        FIELD_DELIMITER = '{field_delimiter}'
        RECORD_DELIMITER = '{record_delimiter}')
        ON_ERROR = CONTINUE"""
        conn = snow.snowflake_cursor()
        conn.cursor().execute(copy_into_sql)
        print(dt.datetime.now(), '***', 'COPY INTO complete. Data in', table_name)
        # clear the table stage (the Snowflake-side copies of the chunk files)
        print(dt.datetime.now(), '***', 'Clearing table stage...')
        conn.cursor().execute(f"REMOVE @%{table_name}")
        conn.close()
        print(dt.datetime.now(), '***', 'Table stage cleared')
# clear local staging folder
if snow.get_rm_staging_files():
print(dt.datetime.now(), '***', 'Clearing local staging folder...')
for root, dirs, files in os.walk(staging_filepath):
for f in files:
os.unlink(os.path.join(root, f))
print(dt.datetime.now(), '***', 'Local staging folder cleared')
if snow.get_rm_staging_folder():
print(dt.datetime.now(), '***', 'Deleting staging folder...')
        os.rmdir(staging_filepath)  # succeeds only when the chunk files above were removed
print(dt.datetime.now(), '***', 'Local staging folder deleted')
    # placeholder branches: filetype is currently only ever 'zip', 'txt', 'csv', or 'none'
    elif filetype == 'sql':
        print('This connector is not equipped to handle SQL transfers at this time')
    elif filetype in ['hive', 'hadoop']:
        print('This connector is not equipped to handle Hive/Hadoop transfers at this time')
    else:
        print('Unrecognized input file type; nothing was uploaded')
end_time = dt.datetime.now()
print('Total file splitting, formatting, and upload process took', end_time - start_time)
if __name__ == '__main__':
main()
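# Usage (assuming snowflake_credentials.py defines a creds dict like the sketch
# near the imports above):
#     python pysnow-multi.py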