In [None]:
import logging  # this cell runs before the import cell, and log_level below needs it

# Specify the target url and if you want to do a delta load.
# url:   the data pool URL from the Celonis UI, optionally including a data
#        connection, i.e.
#        https://<team>.<realm>.celonis.cloud/.../ui/pools/<pool id>[/data-connections/<type>/<connection id>]
# delta: True creates DELTA (upsert) push jobs, False creates REPLACE jobs.
url = ''
delta = False


# this determines how detailed the log is, where INFO is the standard. the list below is ordered from most detailed (DEBUG) to least detailed (CRITICAL)
# logging.DEBUG
# logging.INFO
# logging.WARNING
# logging.ERROR
# logging.CRITICAL
log_level = logging.INFO

In [None]:
import logging
import datetime

# Log to a new timestamped file in the working directory,
# e.g. 2024-01-31_12-00-00_uploader.log
logname = ''.join([datetime.datetime.now().strftime('%Y-%m-%d_%H-%M-%S'),
                   '_uploader.log'])
FORMAT = '%(asctime)s %(levelname)s %(message)s'
formatter = logging.Formatter(FORMAT)
# Use the level chosen in the configuration cell (INFO if that cell has not
# run). Note: basicConfig does nothing when the root logger already has
# handlers, e.g. when this cell is re-run in the same kernel.
logging.basicConfig(format=FORMAT, filename=logname,
                    level=globals().get('log_level', logging.INFO))

try:
    import re
    import subprocess
    import requests
    from io import BytesIO
    import zipfile
    import pandas as pd
    from pycelonis import get_celonis
    # parquet engines for DataFrame.to_parquet; presumably imported here so a
    # missing engine fails early rather than mid-upload
    import fastparquet as fp
    import pyarrow as pa
    import time
    # NOTE: from here on `datetime` is the datetime class, not the module
    from datetime import datetime
    from os import listdir
    from os.path import isfile, join
except ModuleNotFoundError as e:
    logging.error(e)
    logging.error('please install missing packages to use this program.')
    print('shutting down')
    # Re-raise rather than quit(): in Jupyter quit() ends the kernel session,
    # while the traceback stops the run and names the missing package.
    raise

class cloud:
    """Minimal client for the Celonis Data Push API of one team.

    URLs are built as ``https://<tenant>.<realm>.celonis.cloud/...`` and every
    request carries an ``AppKey`` authorization header. HTTP errors are not
    raised here: methods return the ``requests`` response (or its decoded
    JSON) and callers decide how to react.
    """

    def get_api(self, path):
        """Return the absolute URL of ``path`` on this team's Celonis host."""
        return f"https://{self.tenant}.{self.realm}.celonis.cloud/{path}"

    def __init__(self, tenant, realm, api_key):
        """Remember the team subdomain, realm and API key used for every call."""
        self.tenant = tenant
        self.realm = realm
        self.api_key = api_key

    def get_jobs_api(self, pool_id):
        """Return the push-jobs endpoint of data pool ``pool_id`` (ends in '/')."""
        return self.get_api(f"integration/api/v1/data-push/{pool_id}/jobs/")

    def get_auth(self):
        """Return the request headers that authenticate with the API key."""
        return {'authorization': f"AppKey {self.api_key}"}

    def list_jobs(self, pool_id):
        """Return the decoded JSON job listing of data pool ``pool_id``."""
        api = self.get_jobs_api(pool_id)
        return requests.get(api, headers=self.get_auth()).json()

    def delete_job(self, pool_id, job_id):
        """Delete push job ``job_id`` of ``pool_id``; returns the raw response."""
        api = self.get_jobs_api(pool_id) + f"/{job_id}"
        return requests.delete(api, headers=self.get_auth())

    def create_job(self, pool_id, targetName, data_connection_id,
                   upsert=False):
        """Create a push job that loads into table ``targetName``.

        Args:
            pool_id: data pool id.
            targetName: name of the target table.
            data_connection_id: data connection id; left out of the request
                when falsy.
            upsert: True creates a ``DELTA`` job, False a ``REPLACE`` job.

        Returns:
            The decoded JSON response (on success the created job, whose
            ``id`` the other methods take as ``job_id``).
        """
        payload = {'targetName': targetName,
                   'type': "DELTA" if upsert else "REPLACE",
                   'dataPoolId': pool_id}
        if data_connection_id:
            payload['connectionId'] = data_connection_id
        r = requests.post(self.get_jobs_api(pool_id), headers=self.get_auth(),
                          json=payload)
        return r.json()

    def push_new_dir(self, pool_id, job_id, dir_path):
        """Upload every ``*.parquet`` file directly inside ``dir_path``.

        Each file becomes one chunk of job ``job_id`` (sorted by file name so
        the upload order is deterministic). Returns the list of responses.
        """
        responses = []
        for name in sorted(listdir(dir_path)):
            file_path = join(dir_path, name)
            if name.endswith(".parquet") and isfile(file_path):
                logging.debug(f"Uploading chunk {file_path}")
                responses.append(self.push_new_chunk(pool_id, job_id, file_path))
        return responses

    def push_new_chunk(self, pool_id, job_id, file_path):
        """Upload one parquet chunk to push job ``job_id``.

        Args:
            file_path: either the parquet content itself (``bytes`` or
                ``bytearray``), sent as-is, or a path to a parquet file, whose
                contents are read and sent. A path must not be sent as text:
                the server would receive the path string instead of data.

        Returns:
            The raw ``requests`` response.
        """
        api = self.get_jobs_api(pool_id) + f"/{job_id}/chunks/upserted"
        if isinstance(file_path, (bytes, bytearray)):
            return requests.post(api, files={"file": file_path},
                                 headers=self.get_auth())
        with open(file_path, "rb") as fh:
            return requests.post(api, files={"file": fh},
                                 headers=self.get_auth())

    def submit_job(self, pool_id, job_id):
        """Submit push job ``job_id`` for execution; returns the raw response."""
        # f-string: the job id must be interpolated, not sent as '{job_id}'
        api = self.get_jobs_api(pool_id) + f"/{job_id}/"
        return requests.post(api, headers=self.get_auth())

In [None]:
import os  # used below to read the API token from the environment

# Split the configured url into team (tenant), realm, data pool id and the
# optional data connection id:
# https://<team>.<realm>.celonis.cloud/.../ui/pools/<pool id>[/data-connections/<type>/<connection id>]
# Raw strings keep `\.` a regex escape instead of an invalid string escape.
parts = []
connectionflag = 1
try:
    parts.append(re.search(r'https://([a-z0-9-]+)\.', url).groups()[0])
    parts.append(re.search(r'\.([a-z0-9-]+)\.celonis', url).groups()[0])
    parts.append(re.search(r'ui/pools/([a-z0-9-]+)', url).groups()[0])
    try:
        parts.append(re.search(r'data-connections/[a-z-]+/([a-z0-9-]+)', url)
                     .groups()[0])
    except AttributeError:
        # re.search returned None: the url names no data connection
        connectionflag = 0
except AttributeError:
    logging.error(f'{url} this is an unvalid url.')
    # Stop here; continuing would only fail later with an IndexError on `parts`.
    raise ValueError(f'invalid Celonis data pool url: {url!r}') from None
logging.info(connectionflag)

team = parts[0]
realm = parts[1]
poolid = parts[2]
connectionid = parts[3] if connectionflag == 1 else None

# Read the API token straight from the environment. Parsing
# `printenv | grep` output and splitting on '=' truncated tokens that contain
# '=' and picked up any other line mentioning CELONIS_API_TOKEN.
appkey = os.environ.get('CELONIS_API_TOKEN', '').strip()
if not appkey:
    raise RuntimeError('environment variable CELONIS_API_TOKEN is not set or empty.')
apikey = appkey

# List the team's SFTP buckets. A separate name keeps the configured `url`
# intact, so this cell can be re-run.
buckets_url = f'https://{team}.{realm}.celonis.cloud/storage-manager/api/buckets?feature=SFTP'
header_json = {'Authorization': f'AppKey {appkey}', 'Accept': 'application/json'}
file_response = requests.get(buckets_url, headers=header_json)
file_response.raise_for_status()
buckets = [i['id'] for i in file_response.json()]
logging.info('buckets found:')
for bucket_id in buckets:
    logging.info(bucket_id)

In [None]:
# set bucket_id; if you leave this blank the last bucket listed by the
# storage manager is chosen
bucket_id = ''

if bucket_id == '':
    if not buckets:
        raise RuntimeError('no SFTP buckets found for this team; set bucket_id explicitly.')
    bucket_id = buckets[-1]
logging.info(f'bucket chosen: {bucket_id}')

# set folder to scan (default '' is the root folder). For a sub folder add a
# trailing '/', e.g. 'exports/': file names are appended to it as-is.
path_to_file = ''
# encoding of the CSV files; None uses pandas' default
encoding = None

In [None]:
# List the files and sub folders of `path_to_file` in the chosen bucket.
# File names are stored prefixed with `path_to_file`. A separate name for the
# URL keeps the configured `url` intact.
files_url = (f'https://{team}.{realm}.celonis.cloud/storage-manager/api/buckets/'
             f'{bucket_id}/files?path=/' + path_to_file)
header_json = {'Authorization': f'AppKey {appkey}', 'Accept': 'application/json'}
file_response = requests.get(files_url, headers=header_json)
files = []
folders = []
try:
    listing = file_response.json()
    logging.info(listing)
    for entry in listing['children']:
        if entry['type'] == 'FILE':
            files.append(path_to_file + entry['filename'])
        elif entry['type'] == 'DIRECTORY':
            folders.append(entry['filename'])
except (ValueError, KeyError, TypeError) as e:
    # best effort: keep whatever was collected and record why listing failed
    logging.error(f'could not read file listing ({e!r}): {file_response.text}')
    logging.error(file_response)

In [None]:
# Upload every table found in the folder: one push job per table, one parquet
# chunk per data file, then wait until all jobs have finished.
#
# Assumed naming (inferred from the matching below; confirm for your export):
#   header: <path_to_file><TABLE>_HEADER<suffix>.csv, space-separated lines of
#           column name, type, length, decimal length
#   data:   names starting with <path_to_file><TABLE>_ and containing <suffix>;
#           ';'-separated CSV, or a .zip whose first member is that CSV


def _latest_headers(file_names):
    """Return the newest header file per table, in reverse name order.

    A header is any name containing '_HEADER'; its table key is everything
    before 'HEADER'. "Newest" assumes the suffix sorts chronologically
    (e.g. a timestamp), since names are compared in reverse order.
    """
    latest = {}
    for name in sorted((f for f in file_names if '_HEADER' in f), reverse=True):
        # exact key match: a substring test would drop table 'A_' once a
        # header such as 'BA_HEADER...' had been kept
        latest.setdefault(name.split('HEADER')[0], name)
    return list(latest.values())


def _download(path):
    """Download `path` from the chosen bucket and return its raw bytes."""
    file_url = (f'https://{team}.{realm}.celonis.cloud/storage-manager/api/buckets/'
                f'{bucket_id}/files?path=/' + path)
    headers = {'Authorization': 'AppKey {}'.format(appkey),
               'Accept': 'application/octet-stream'}
    with requests.get(file_url, headers=headers, stream=False) as r:
        r.raise_for_status()
        return r.content


def _read_columns(header_name):
    """Return the column names listed in header file `header_name`."""
    described = pd.read_csv(BytesIO(_download(header_name)), header=None, dtype=str,
                            sep=' ', names=['names', 'type', 'length', 'declength'],
                            encoding=encoding)
    return list(described['names'])


def _read_data_file(name, content, column_names):
    """Parse one ';'-separated data file (plain or zipped) into string columns."""
    options = dict(header=None, dtype='string', sep=';', names=column_names,
                   quotechar='"', encoding=encoding)
    if name.split('.')[-1] == 'zip':
        # only the first member of the archive is read
        with zipfile.ZipFile(BytesIO(content)) as archive:
            with archive.open(archive.infolist()[0]) as fh:
                return pd.read_csv(fh, **options)
    return pd.read_csv(BytesIO(content), **options)


def _wait_for_jobs(client, pool_id, status_by_job, poll_seconds=15):
    """Poll the pool's jobs until none in `status_by_job` is QUEUED or RUNNING.

    Finished jobs are set to True in `status_by_job` (in place). Returns the
    last job listing.
    """
    while True:
        listing = client.list_jobs(pool_id)
        if isinstance(listing, list):
            for job in listing:
                if (isinstance(job, dict) and job.get('id') in status_by_job
                        and job.get('status') not in (None, 'QUEUED', 'RUNNING')):
                    status_by_job[job['id']] = True
        else:
            logging.error(f'unexpected job listing: {listing}')
        if all(status_by_job.values()):
            return listing
        time.sleep(poll_seconds)


header = _latest_headers(files)
logging.info(header)
jobstatus = {}
uppie = cloud(tenant=team, realm=realm, api_key=apikey)
# Unclaimed data files; a copy, so `files` stays intact and the cell can be re-run.
remaining = [f for f in files if 'HEADER' not in f]

for header_name in header:
    pieces = header_name.split('HEADER')
    prefix = pieces[0]
    suffix = pieces[1].replace('.csv', '')
    # '<path_to_file>TABLE_' -> 'TABLE'
    targetname = prefix[:-1].replace(path_to_file, '')

    # Claim this table's data files. Prefix match, so 'TABLE_' does not also
    # grab 'XTABLE_...'. Uploaded in reverse listing order.
    table_files = [f for f in remaining if f.startswith(prefix) and suffix in f][::-1]
    claimed = set(table_files)
    remaining = [f for f in remaining if f not in claimed]

    # Read the columns before creating the job, so a broken header does not
    # leave an orphaned job behind.
    try:
        column_names = _read_columns(header_name)
    except Exception as e:
        logging.error(f'{header_name} failed with error: {e}')
        continue

    jobhandle = uppie.create_job(pool_id=poolid,
                                 data_connection_id=connectionid,
                                 targetName=targetname,
                                 upsert=delta)
    logging.debug(jobhandle)
    job_id = jobhandle.get('id') if isinstance(jobhandle, dict) else None
    if job_id is None:
        logging.error(f'{targetname}: could not create push job: {jobhandle}')
        continue
    jobstatus[job_id] = False

    for file in table_files:
        try:
            df_up = _read_data_file(file, _download(file), column_names)
            buffer = BytesIO()
            df_up.to_parquet(buffer, index=False, compression='snappy')
            uppie.push_new_chunk(pool_id=poolid, job_id=job_id,
                                 file_path=buffer.getvalue()).raise_for_status()
        except Exception as e:
            logging.error(f'{file} failed with error: {e}')

    submitted = uppie.submit_job(pool_id=poolid, job_id=job_id)
    if not submitted.ok:
        logging.error(f'{targetname}: submitting job {job_id} failed: '
                      f'{submitted.status_code} {submitted.text}')
        # don't wait for a job that was never submitted; its final status is
        # still reported below
        jobstatus[job_id] = True
logging.info('upload done.')

if jobstatus:
    jobs = _wait_for_jobs(uppie, poolid, jobstatus)
else:
    logging.warning('no push jobs were created; nothing to wait for.')
    jobs = []
for job in jobs if isinstance(jobs, list) else []:
    if isinstance(job, dict) and job.get('id') in jobstatus:
        if job.get('status') == 'DONE':
            logging.info(f"{job['targetName']} was successfully installed in the database")
        else:
            logging.error(f"{job.get('targetName')} failed with: {job}")