OPN-57: Warehouse Feature #1054

Open · wants to merge 8 commits into master
155 changes: 155 additions & 0 deletions bin/warehouse/csv_diff.py
@@ -0,0 +1,155 @@
#!/usr/bin/env python3
import csv
import os
import sys

import requests
from requests.packages.urllib3.exceptions import InsecureRequestWarning

requests.packages.urllib3.disable_warnings(InsecureRequestWarning)

def get_file_keys(csv_file, primary_keys):
    """Return a dict keyed by the '-'-joined primary key values of each row in csv_file."""
    pk_list = {}
    with open(csv_file, encoding='utf-8-sig') as f:
        hashcsv = csv.DictReader(f, delimiter=",")
        for row in hashcsv:
            primary_fields = [str(row[t]) for t in primary_keys]
            uid = '-'.join(primary_fields)
            pk_list[uid] = ''
    return pk_list

def find_removed_and_created_keys(prev, curr):
    """
    :param prev: keys of the older CSV
    :param curr: keys of the newer CSV
    :return: the keys that were removed/added between the two CSVs, and the keys
             that MAY have been modified (present in both)
    """
    removed_keys = {}
    added_keys = {}
    mod_keys = {}

    # A key only in the older CSV was removed; a key only in the newer CSV was
    # added; a key present in both is a candidate for modification.
    for key in prev:
        if key not in curr:
            removed_keys[key] = ''
    for key in curr:
        if key not in prev:
            added_keys[key] = ''
        else:
            mod_keys[key] = ''

    return removed_keys, added_keys, mod_keys
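
# A minimal sketch of the contract above, with hypothetical uids:
#   prev = {'1-orgA': ''}              curr = {'1-orgA': '', '2-orgB': ''}
#   => removed = {}, added = {'2-orgB': ''}, mod_keys = {'1-orgA': ''}
# '1-orgA' is only a *candidate* until get_mod_keys() compares the full rows.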

def get_mod_keys(prev, curr, modkeys, primary_keys):
    """
    Get modified keys
    :param prev: the older CSV used for comparison
    :param curr: the newer CSV used for comparison
    :param modkeys: the keys which may indicate a modified row
    :param primary_keys: primary keys of the PD type
    :return: the keys of rows which are modified between the old and new CSVs
    """
    res = {}
    # First pass: cache the full row of every candidate key from the newer CSV.
    with open(curr, encoding='utf-8-sig') as cf:
        currcsv = csv.DictReader(cf, delimiter=",")
        for row in currcsv:
            primary_fields = [str(row[t]) for t in primary_keys]
            uid = '-'.join(primary_fields)
            if uid in modkeys:
                modkeys[uid] = row
    # Second pass: stream the older CSV and flag any uid whose fields differ.
    with open(prev, encoding='utf-8-sig') as pf:
        prevcsv = csv.DictReader(pf, delimiter=",")
        for row in prevcsv:
            primary_fields = [str(row[t]) for t in primary_keys]
            uid = '-'.join(primary_fields)
            if uid in modkeys:
                for field in row:
                    if row[field] != modkeys[uid][field]:
                        res[uid] = ''
                        break
    return res
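
# e.g. (hypothetical rows): if the 'amount' field of uid '1-orgA' changed from
# 100 to 250 between prev and curr, get_mod_keys() returns {'1-orgA': ''}.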

def write_warehouse(out, csvfile, primary_keys, write_keys, exists_flag, fields, log_activity, date):
    """
    Write to the warehouse CSV file without loading entire CSV files into memory.
    :param out: warehouse file to write to
    :param csvfile: the CSV file used for reading data into the warehouse
    :param primary_keys: primary keys of the PD type
    :param write_keys: the keys to be written (added, removed, or modified keys)
    :param exists_flag: True if the warehouse file already exists (the header is skipped)
    :param fields: fields to be written to the warehouse
    :param log_activity: C = created, D = deleted, M = modified
    :param date: date of comparison
    :return: None
    """
    with open(out, 'a') as f:
        warehouse = csv.DictWriter(f, fieldnames=fields, delimiter=',', restval='')
        if not exists_flag:
            warehouse.writeheader()
        with open(csvfile, encoding='utf-8-sig') as cf:
            hashcsv = csv.DictReader(cf, delimiter=",")
            for row in hashcsv:
                primary_fields = [str(row[t]) for t in primary_keys]
                uid = '-'.join(primary_fields)
                if uid in write_keys:
                    res = row
                    res["log_date"] = date
                    res["log_activity"] = log_activity
                    # Drop any columns that are not part of the warehouse schema.
                    for key in list(res.keys()):
                        if key not in fields:
                            del res[key]
                    warehouse.writerow(res)
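
# A sketch of the expected call pattern (hypothetical key/field names; the real
# ones come from the recombinant schema endpoint below):
#   write_warehouse('warehouse.csv', 'new.csv', ['ref_number', 'owner_org'],
#                   added_keys, os.path.isfile('warehouse.csv'), fields, "C", "2020-01-08")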

# prev_csv    = older CSV being compared
# current_csv = newer CSV being compared
# endpoint    = HTTP address of the fields and primary keys of the current PD type
# datestamp   = current date of comparison
# outfile     = warehouse file for output
prev_csv, current_csv, endpoint, datestamp, outfile = sys.argv[1], sys.argv[2], sys.argv[3], sys.argv[4], sys.argv[5]

field_info = requests.get(endpoint, timeout=100, verify=False).json()

# Grab the primary key fields from the datatype reference endpoint
if 'nil' in current_csv:
    pk_fields = [f['primary_key'] for f in field_info['resources'] if 'nil' in f['resource_name']]
    fields = [f['fields'] for f in field_info['resources'] if 'nil' in f['resource_name']]
else:
    pk_fields = [f['primary_key'] for f in field_info['resources'] if 'nil' not in f['resource_name']]
    fields = [f['fields'] for f in field_info['resources'] if 'nil' not in f['resource_name']]

pk_fields.append('owner_org')

# Get primary keys for all rows in prev and curr CSV
old_csv_keys = get_file_keys(prev_csv, pk_fields)
new_csv_keys = get_file_keys(current_csv, pk_fields)

# Compare keys to find removed and created primary keys, plus candidate modified keys
removed_keys, added_keys, mod_keys = find_removed_and_created_keys(old_csv_keys, new_csv_keys)

# Compare rows of mod_keys to confirm that rows were modified
mod_keys = get_mod_keys(prev_csv, current_csv, mod_keys, pk_fields)

if len(added_keys) == 0 and len(removed_keys) == 0 and len(mod_keys) == 0:
    print("No changes detected between files")
else:
    print("Changes detected between files, writing to warehouse")
    exists_flag = os.path.isfile(outfile)
    # write created keys to warehouse file
    write_warehouse(outfile, current_csv, pk_fields, added_keys, exists_flag, fields, "C", datestamp)
    exists_flag = True
    # write modified keys to warehouse file
    write_warehouse(outfile, current_csv, pk_fields, mod_keys, exists_flag, fields, "M", datestamp)
    # write deleted keys to warehouse file
    write_warehouse(outfile, prev_csv, pk_fields, removed_keys, exists_flag, fields, "D", datestamp)
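
# The resulting warehouse CSV is just the source rows plus two bookkeeping
# columns, e.g. (hypothetical data):
#   ref_number,owner_org,...,log_date,log_activity
#   C-2020-001,dept-abc,...,2020-01-08,C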

108 changes: 108 additions & 0 deletions bin/warehouse/generate_warehouse.py
@@ -0,0 +1,108 @@
#!/usr/bin/env python3
"""
Creates Warehouse for all PD-Types found in archived backups.

Arguments:
fname - directory of archived backups
operation - '-d' to compare last 2 backups (default), '-a' to compare all backups.
"""
import tarfile
import sys
import os
import subprocess
import shutil
from datetime import datetime

fname = sys.argv[1]

if len(sys.argv) == 3:
    operation = sys.argv[2]
    if operation not in ('-a', '-d'):
        sys.exit("Error: invalid operation value (sys.argv[2]), -d or -a expected")
else:
    operation = "-d"

tar_array = sorted(os.listdir(fname))
if operation == "-d":
    tar_array = tar_array[-2:]
print(tar_array)
prev = ''
curr = ''

def get_base(tfile):
    """Strip both extensions (e.g. '.tar.gz') from a backup name to get the PD name."""
    base = os.path.basename(tfile)
    pd_name = os.path.splitext(os.path.splitext(base)[0])[0]
    return pd_name

def extract(tfile, dest):
    """Extract the backup tarball tfile into ./dest and return that path."""
    fpath = './' + dest
    tar = tarfile.open(os.path.join(fname, tfile))
    tar.extractall(path=fpath)
    tar.close()
    return fpath

def run_migrations(fpath):
    """Run migrate_all.py on every CSV in fpath, writing migrated copies into temp/."""
    if not os.path.exists('temp'):
        os.mkdir('temp')
    for csvfile in os.listdir(fpath):
        print("Migrating {0} from directory {1}".format(csvfile, fpath))
        proc = subprocess.Popen(['python', 'migrate_all.py', fpath + '/' + csvfile, 'temp/' + fpath + 'm_' + csvfile])
        proc.wait()

def csv_diff(prev_csv, curr_csv, endpoint, outfile):
    """Diff two migrated CSVs with csv_diff.py and append the changes to outfile."""
    now = datetime.now()
    dt_string = now.strftime("%Y-%m-%d")

    print("Getting difference between {0} and {1}".format(prev_csv, curr_csv))
    proc = subprocess.Popen(['python', 'csv_diff.py', 'temp/' + prev_csv, 'temp/' + curr_csv, endpoint,
                             dt_string, outfile])
    proc.wait()


if not os.path.exists('warehouse_reports'):
    os.mkdir('warehouse_reports')

# NOTE: assumes at least two backups exist in fname.
while tar_array:
    # Pop the first two backups on the first pass, then slide the window by one.
    if prev == '':
        prev = tar_array.pop(0)
        curr = tar_array.pop(0)
    else:
        prev = curr
        curr = tar_array.pop(0)

    prev_base = get_base(prev)
    curr_base = get_base(curr)

    # Extract zipped backups
    prev_path = extract(prev, prev_base)
    curr_path = extract(curr, curr_base)

    # Migrate all CSVs
    run_migrations(prev_path)
    run_migrations(curr_path)

    # Delete extracted directories
    shutil.rmtree(prev_path)
    shutil.rmtree(curr_path)

    # Match migrated CSVs
    csv_array = sorted(os.listdir('temp'))
    prev_array = [a for a in csv_array if prev_base in a]
    curr_array = [a for a in csv_array if curr_base in a]

    for i in range(len(prev_array)):
        now = datetime.now()
        dt_string = now.strftime("%H:%M:%S")
        print(dt_string, '\n')
        pdtype = prev_array[i].split('_')[1].split('.')[0]
        schema = pdtype
        if 'nil' in pdtype:
            schema = schema.split('-')[0]

        csv_diff(prev_array[i], curr_array[i],
                 'http://open.canada.ca/data/en/recombinant-schema/{0}.json'.format(schema),
                 'warehouse_reports/{0}_warehouse_test.csv'.format(pdtype))

    shutil.rmtree('temp')
13 changes: 4 additions & 9 deletions bin/warehouse/migrate_all.py
@@ -1,5 +1,7 @@
#!/usr/bin/env python

"""
This script takes proactive disclosure data in the form of a csv file and runs it against the corresponding migration scripts
"""

import sys
import codecs
@@ -9,14 +11,7 @@
import subprocess
import shutil


"""
This script takes proactive disclosure data in the form of a csv file and runs it against the corresponding migration scripts
"""


def run_scripts(infile, outfile, matching_files):

    # Remove any dead procedures from previous calls to this method
    if proc_array:
        proc_array[:] = []
@@ -27,7 +22,7 @@ def run_scripts(infile, outfile, matching_files):

    else:
        for matching_file in matching_files:
            print("Starting process: {0} with {1}".format(matching_files.index(matching_file),matching_file))
            print("Starting process: {0} with {1}".format(matching_files.index(matching_file), matching_file))
            if len(proc_array) == 0:
                proc_array.append(subprocess.Popen(['python', matching_file, 'warehouse'], stdin=subprocess.PIPE, stdout=subprocess.PIPE))
            elif matching_file == matching_files[-1]: