In [None]:
# Imports

import zipfile as zp
from pathlib import Path
from os import scandir, makedirs, path
from io import BytesIO
from xml.etree import ElementTree

from datetime import datetime

In [None]:
def createFile(filePath, datasetName, sqlQuery):
    """Write one data set's SQL text to ``<filePath>/<datasetName>.sql``.

    The target directory is created when it does not exist yet, and an
    existing file with the same name is overwritten.

    Args:
        filePath: Directory that receives the ``.sql`` file.
        datasetName: Data set name; used as the base name of the file.
        sqlQuery: SQL text to write. ``None`` (which ElementTree reports as
            the ``text`` of an empty element) produces an empty file instead
            of raising ``TypeError``.
    """
    makedirs(filePath, exist_ok=True)
    # Explicit UTF-8 so non-ASCII characters in a query do not depend on the
    # platform's default encoding.
    with open(path.join(filePath, f"{datasetName}.sql"), "w", encoding="utf-8") as f:
        f.write(sqlQuery if sqlQuery is not None else "")

In [None]:
def getSQLQueries(zpf_filename, content):
    """Save the SQL of every data set found in a data model document.

    The ``<dataSets>...</dataSets>`` section is cut out of ``content`` as
    text and parsed on its own. For each ``<dataSet>`` child, the text of
    every ``<sql>`` child is written via ``createFile`` to
    ``./sql_queries/<archive name>/<data set name>.sql``.

    Args:
        zpf_filename: Archive entry name the document came from. Its last
            ``/``-separated component, with ``.xdmz`` removed, names the
            output folder.
        content: The data model document as a string.

    Returns:
        None. Documents without a complete ``<dataSets>`` section are
        reported and skipped rather than raising ``IndexError``.
    """
    filename = zpf_filename.split("/")[-1].replace(".xdmz", "")
    print(f"filename : {filename}")

    open_tag, close_tag = "<dataSets>", "</dataSets>"
    start = content.find(open_tag)
    end = content.find(close_tag, start + len(open_tag)) if start != -1 else -1
    if start == -1 or end == -1:
        # Nothing to extract; keep scanning the remaining archives.
        print(f"no <dataSets> section found in {filename}; skipping")
        return

    # NOTE(review): the section is parsed in isolation rather than parsing the
    # whole document - presumably to sidestep namespaces declared on the root
    # element; confirm before switching to a full-document parse.
    datasets = content[start + len(open_tag):end]
    datasetXML = ElementTree.fromstring(open_tag + datasets + close_tag)

    for dataSet in datasetXML.findall('dataSet'):
        dataSetName = dataSet.get('name')
        for sql in dataSet.findall("sql"):
            # Element.text is None for an empty <sql/> element.
            createFile(f"./sql_queries/{filename}", dataSetName, sql.text or "")

In [None]:
# Search parameters: table and column names of interest.
# (Neither list is referenced by the functions visible in this notebook.)

TABLES_TO_CHECK = ["PER_ALL_PEOPLE_F"]
COLUMNS_TO_CHECK = ["PERSON_NUMBER"]

# Constants: extensions of the zipped archives and of the documents inside them.

DATAMODEL_ZIP, DATAMODEL_EXTENSION = 'xdmz', 'xdm'
REPORT_ZIP, REPORT_EXTENSION = 'xdoz', 'xdo'

# Order matters: find_in_zip reads index 0 as the data model extension and
# index 1 as the report extension.
TARGET_EXT = [DATAMODEL_EXTENSION, REPORT_EXTENSION]
TARGET_EXT_ZIP = [DATAMODEL_ZIP, REPORT_ZIP]

In [None]:
# Logging

def log_messsage(message, override=False):
    """Write ``message`` to today's log file, ``log_<YYYYMMDD>.txt``.

    The file is created in the current working directory.

    Args:
        message: Value to log. It is formatted into a string and written
            verbatim, so callers supply their own trailing newline.
        override: When true, truncate the log before writing; otherwise the
            message is appended.
    """
    write_mode = 'w' if override else 'a'

    # The mode has to be passed explicitly: open() defaults to read-only, in
    # which case write() fails (and open() itself fails if the file is new).
    with open(f"log_{datetime.now().strftime('%Y%m%d')}.txt", write_mode, encoding="utf-8") as log_file:
        log_file.write(f"{message}")

In [None]:
def find_in_zip(path):
    """Extract SQL from every data model nested inside the zip file at ``path``.

    ``path`` is printed, and nothing else happens if it is not a zip file.
    Otherwise every entry whose name ends in one of ``TARGET_EXT_ZIP`` is
    read into memory and opened as a nested zip. Each file in that nested
    zip whose name ends with the data model extension (``TARGET_EXT[0]``)
    is decoded as UTF-8 and handed to ``getSQLQueries`` together with the
    name of the nested archive. Report files (``TARGET_EXT[1]``) are not
    processed.

    Args:
        path: Location of the candidate zip file.
    """
    print(path)
    if not zp.is_zipfile(path):
        return

    # Match on the extension, not on a substring anywhere in the entry path:
    # a folder or file that merely contains "xdmz"/"xdoz" is not an archive.
    archive_suffixes = tuple(f".{ext}" for ext in TARGET_EXT_ZIP)
    datamodel_ext = TARGET_EXT[0]

    with zp.ZipFile(path, mode='r') as zfile:
        for zpf in zfile.filelist:
            if not zpf.filename.endswith(archive_suffixes):
                continue

            zipdata = BytesIO(zfile.read(zpf.filename))
            if not zp.is_zipfile(zipdata):
                # Right extension but not a readable zip: skip it so one bad
                # entry does not abort the scan with BadZipFile.
                print(f"skipping {zpf.filename}: not a valid zip archive")
                continue

            with zp.ZipFile(zipdata) as inner_file:
                for zpff in inner_file.filelist:
                    # Only data model files carry SQL; everything else,
                    # including report files, is ignored.
                    if zpff.filename.endswith(datamodel_ext):
                        getSQLQueries(zpf.filename, inner_file.read(zpff.filename).decode(encoding='utf-8'))
                                


In [None]:
def traverse_folder(entry_point:str):
    """Walk ``entry_point`` recursively and run ``find_in_zip`` on every non-directory.

    A directory is listed with ``scandir``; each entry is printed and then
    visited recursively. Anything that is not a directory is passed to
    ``find_in_zip`` as a ``Path``.

    Args:
        entry_point: File or directory to start from (any path-like value).
    """
    location = Path(entry_point)

    # Leaf case first: not a directory, so treat it as a candidate archive.
    if not location.is_dir():
        find_in_zip(location)
        return

    for child in scandir(location):
        print(f"scanned_file: {child}")
        traverse_folder(child)

In [None]:
# Entry point: scan everything under ./Inputs (relative to the working directory).
traverse_folder('./Inputs')