In [18]:
import concurrent.futures
from concurrent.futures import ThreadPoolExecutor
import queue
import logging
import time
import logging
import json
import datetime
import binascii
from typing import MutableSequence
import gzip
import shutil
import os

import pandas as pd
from dask.utils import natural_sort_key
import numpy as np
import dask.dataframe as dd

from airsecoma import azure_utils

In [2]:
# Project helper from airsecoma.azure_utils; judging by its name it loads settings from a
# .env file into the environment (presumably the Azure credentials the filesystems need) — confirm.
azure_utils.init_dotenv()

In [3]:
# Project logging setup, then: root logger at INFO, the 'azure' logger hierarchy at WARNING.
azure_utils.init_logging()
logging.root.setLevel(logging.INFO)
logging.getLogger('azure').setLevel(logging.WARNING)

# Notebook logger + smoke test: the DEBUG line is below the root level set above,
# so only the INFO line is expected in the output.
log = logging.getLogger("diff.ipnb")
log.info("Logging test INFO")
log.debug("Logging test DEBUG")

2023-07-20 10:10:02 - INFO - diff.ipnb - Logging test INFO


In [4]:
# Azure tenant shared by every storage account below.
TENANT_ID = "46b696da-8827-46db-8cc7-b5020d3f4056"


def _fs_conf(account):
    """Return the connection settings of one storage account (linked service named after the account)."""
    return {"account": account, "linked_service": account, "tenant_id": TENANT_ID}


# Source filesystems, keyed by environment name.
FS_CONFS = {
    "dev": _fs_conf("skyguidecapmandatadev"),
    "uat": _fs_conf("skyguidecapmandatauat"),
    "prd": _fs_conf("skyguidecapmandataprd"),
}

# Data and metadata currently point at the same backup account, but are kept as two
# distinct settings dicts so either can be redirected independently.
FS_CONF_BACKUP_DATA = _fs_conf("skyguidecapmanbackup")
FS_CONF_BACKUP_METADATA = _fs_conf("skyguidecapmanbackup")

In [13]:
# Each environment's filesystem is backed up from the same root folder.
LOCATIONS_TO_BACKUP = [
    {"fs_name": env_name, "root": "skyguide-data"}
    for env_name in ("dev", "uat", "prd")
]

In [12]:
# Top-level folders in the backup account: gzipped file contents vs. parquet snapshots.
BACKUP_DATA_FOLDER, BACKUP_METADATA_FOLDER = "data", "metadata"

In [5]:
# One filesystem handle per source environment...
fs_by_name = dict(
    (env_name, azure_utils.init_azure_fs(**env_conf))
    for env_name, env_conf in FS_CONFS.items()
)
# ...and one per backup target (data first, then metadata).
fs_backup_data, fs_backup_metadata = (
    azure_utils.init_azure_fs(**backup_conf)
    for backup_conf in (FS_CONF_BACKUP_DATA, FS_CONF_BACKUP_METADATA)
)

In [6]:
NoneType = type(None)
def normalize_details(v):
    """Recursively convert a file-listing detail value into plain, JSON-friendly Python data.

    - str / int / float / bool / None are returned unchanged;
    - ``datetime.datetime`` / ``datetime.date`` become ISO-8601 strings;
    - ``bytes`` / ``bytearray`` become lowercase hex strings (e.g. an MD5 digest);
    - mapping-like objects (callable ``items`` and ``__getitem__``) become dicts;
    - any other iterable becomes a list.

    Raises:
        ValueError: if ``v`` is of an unsupported, non-iterable type.
    """
    if isinstance(v, (str, int, float, bool, NoneType)):
        return v
    if isinstance(v, (datetime.datetime, datetime.date)):
        return v.isoformat()
    if isinstance(v, (bytes, bytearray)):
        # `bytes` is iterable: without this branch it would become a list of ints.
        return binascii.hexlify(v).decode('ascii')
    if callable(getattr(v, "items", None)) and callable(getattr(v, "__getitem__", None)):
        return {key: normalize_details(value) for key, value in v.items()}
    if isinstance(v, MutableSequence) or callable(getattr(v, "__iter__", None)):
        return [normalize_details(item) for item in v]
    raise ValueError(f"Unknown type '{type(v).__module__}.{type(v).__name__}' for instance {v}")

In [15]:
class Collector:
    """Recursively list every file below a set of folders, using a pool of worker threads.

    Usage: put one or more root folders into ``folders_to_process``, then call
    ``parallel_collect()``. Each file's detail dict (as returned by ``fs.ls(..., detail=True)``)
    is put into ``collected_files``; a listing failure is put into ``collected_errors``.

    Args:
        fs: filesystem object exposing ``ls(path, detail=True)`` returning dicts with at
            least ``'name'`` and ``'type'`` keys (``'directory'`` entries are recursed into).
        poll_interval: seconds a worker blocks on an empty queue, and the supervisor sleeps,
            between checks for cancellation / completion.
    """

    def __init__(self, fs, poll_interval=1):
        self.fs = fs
        self.poll_interval = poll_interval
        self.is_cancelled = False
        self.folders_to_process = queue.Queue()
        self.collected_files = queue.Queue()
        self.collected_errors = queue.Queue()

    def collect(self):
        """Worker loop: list queued folders until cancelled or until a listing fails.

        Sub-folders are queued *before* their parent is marked done, and a listing error is
        queued *before* the failing folder is marked done. Hence ``unfinished_tasks`` only
        reaches zero once every folder is listed and every error is visible to the supervisor.
        """
        while not self.is_cancelled:
            try:
                folder = self.folders_to_process.get(block=True, timeout=self.poll_interval)
            except queue.Empty:
                continue

            try:
                for d in self.fs.ls(folder, detail=True):
                    if d['type'] == 'directory':
                        self.folders_to_process.put(d['name'])
                    else:
                        self.collected_files.put(d)
            except Exception as e:
                self.collected_errors.put(e)
                return  # the supervisor aborts the whole collection on the first error
            finally:
                self.folders_to_process.task_done()

    def _raise_first_error(self):
        """Re-raise (wrapped) the first error reported by a worker, if any."""
        try:
            error = self.collected_errors.get_nowait()
        except queue.Empty:
            return
        raise Exception(f"Error while collecting files: {error}") from error

    def parallel_collect(self, parallel=20, log_every_sec=10):
        """Run ``parallel`` workers until every queued folder has been listed.

        Raises:
            Exception: wrapping the first listing error reported by a worker.
        """
        # a previous run leaves the flag set; reset it so the new workers actually run
        self.is_cancelled = False
        with concurrent.futures.ThreadPoolExecutor(max_workers=parallel) as executor:
            # start the workers
            for _ in range(parallel):
                executor.submit(self.collect)

            # wait for the workers to finish
            try:
                last_log_time = time.monotonic()
                while True:
                    # Read completion BEFORE checking errors: workers queue their error before
                    # calling task_done(), so once this is True every error is already queued.
                    finished = self.folders_to_process.unfinished_tasks == 0
                    self._raise_first_error()
                    if finished:
                        log.info(f"collect_all_files_recursive: Finished collecting files")
                        break

                    now = time.monotonic()
                    if now - last_log_time > log_every_sec:
                        last_log_time = now
                        log.info(f"collect_all_files_recursive: collected_files={self.collected_files.qsize()}, folders_to_process={self.folders_to_process.qsize()}")
                    time.sleep(self.poll_interval)
            finally:
                # tells idle workers to exit; the executor then waits for them
                self.is_cancelled = True
                log.info(f"END OF collect_all_files_recursive")

In [16]:
import re


def _build_backup_path(md5):
    return f"{BACKUP_DATA_FOLDER}/{md5[:2]}/{md5[2:4]}/{md5[4:]}/{md5}.gz"

class Migrator:
    """Back up filesystem folders into a content-addressed store, and restore them.

    * File contents are stored once per distinct MD5, gzip-compressed, under
      ``backup_data_folder`` on ``fs_backup_data`` (layout: see ``_backup_path``).
    * Each backup run also writes the folder's snapshot (``name``, ``size``, ``md5``) to
      ``<backup_metadata_folder>/<fs_name>/<folder>/<version>.parquet`` on ``fs_backup_metadata``.

    Args:
        fs_by_name: mapping of source filesystem name -> filesystem object.
        fs_backup_data: filesystem holding the gzipped file contents.
        backup_data_folder: root folder of the contents on ``fs_backup_data``.
        fs_backup_metadata: filesystem holding the parquet snapshots.
        backup_metadata_folder: root folder of the snapshots on ``fs_backup_metadata``.
        parallel: number of worker threads used for listing and copying.
        log_every_sec: progress-log period while listing folders.
    """

    # Column dtypes of every snapshot DataFrame produced by get_snapshot.
    SNAPSHOT_DTYPES = {'name': 'string', 'size': 'int64', 'md5': 'string'}

    def __init__(
        self,
        fs_by_name,
        fs_backup_data,
        backup_data_folder,
        fs_backup_metadata,
        backup_metadata_folder,
        parallel=20,
        log_every_sec=10,
    ):
        self.fs_by_name             = fs_by_name
        self.fs_backup_data         = fs_backup_data
        self.backup_data_folder     = backup_data_folder
        self.fs_backup_metadata     = fs_backup_metadata
        self.backup_metadata_folder = backup_metadata_folder
        self.parallel               = parallel
        self.log_every_sec          = log_every_sec
        self._backup_data_snapshot  = None

    @property
    def backup_data_snapshot(self):
        """DataFrame with an ``md5`` column listing every content already in the backup store.

        Listed from ``fs_backup_data`` on first access; ``do_backup`` then appends the md5s
        it uploads, so the store is not re-listed between backups.
        """
        if self._backup_data_snapshot is None:
            # Logger.log() takes a level as first argument; info() is what was meant.
            log.info("backup_data_snapshot: loading backup data snapshot")
            self._backup_data_snapshot = self._get_backup_data_snapshot()
        return self._backup_data_snapshot

    # ---- paths -------------------------------------------------------------------

    def _backup_path(self, md5):
        """Path of the gzipped content for ``md5``: same layout as the module-level
        ``_build_backup_path``, but rooted at this instance's ``backup_data_folder``."""
        return f"{self.backup_data_folder}/{md5[:2]}/{md5[2:4]}/{md5[4:]}/{md5}.gz"

    def _metadata_path(self, fs_name, folder, version):
        """Path of the parquet snapshot of ``folder`` on ``fs_name`` for backup ``version``."""
        return f"{self.backup_metadata_folder}/{fs_name}/{folder}/{version}.parquet"

    # ---- snapshots ---------------------------------------------------------------

    @classmethod
    def _empty_snapshot(cls):
        """Empty snapshot DataFrame with the standard columns and dtypes."""
        return pd.DataFrame({col: pd.Series(dtype=dtype) for col, dtype in cls.SNAPSHOT_DTYPES.items()})

    def _collect_files(self, fs, folder):
        """Recursively list ``folder`` on ``fs`` and return the file details as plain data."""
        collector = Collector(fs)
        collector.folders_to_process.put(folder)
        collector.parallel_collect(parallel=self.parallel, log_every_sec=self.log_every_sec)
        return normalize_details(collector.collected_files.queue)

    def _get_backup_data_snapshot(self):
        """List the backup store and return its md5 inventory (one row per stored ``<md5>.gz``)."""
        # NOTE(review): relies on the fsspec-style `exists`; the folder is absent before the first backup
        if not self.fs_backup_data.exists(self.backup_data_folder):
            md5s = []
        else:
            md5s = [
                d['name'].rsplit('/', 1)[-1][:-len('.gz')]
                for d in self._collect_files(self.fs_backup_data, self.backup_data_folder)
                if d['name'].endswith('.gz')
            ]
        return pd.DataFrame({'md5': pd.Series(md5s, dtype='string')})

    def get_snapshot(self, fs_name, folder):
        """Get the current snapshot of a folder in a filesystem

        File names are made relative to ``folder`` (the ``folder/`` prefix is stripped).

        Returns:
            DataFrame: DataFrame with columns name, size, md5 (dtypes: string, int64, string);
            md5 is <NA> for files whose listing carries no content MD5.
        """
        files = self._collect_files(self.fs_by_name[fs_name], folder)
        if len(files) == 0:
            return self._empty_snapshot()

        df = pd.json_normalize(files)
        df = df.rename(columns={'content_settings.content_md5': 'md5'})
        if 'md5' not in df.columns:
            # none of the listed files carried a content MD5
            df['md5'] = None
        df = df[['name', 'size', 'md5']].copy()

        # remove folder prefix
        df['name'] = df['name'].str.replace(rf"^{re.escape(folder)}/", "", regex=True)

        # coerce dtypes
        return df.astype(self.SNAPSHOT_DTYPES)

    def get_backup_snapshot_versions(self, fs_name, folder):
        """Return the backup version names stored for ``folder`` on ``fs_name``, in natural
        sort order (chronological for timestamp-named versions)."""
        paths = self.fs_backup_metadata.glob(self._metadata_path(fs_name, folder, '*'))
        versions = (os.path.splitext(os.path.basename(p))[0] for p in paths)
        return sorted(versions, key=natural_sort_key)

    def get_backup_snapshot(self, fs_name, folder, version):
        """Load the snapshot written by backup ``version`` of ``folder`` on ``fs_name``."""
        with self.fs_backup_metadata.open(self._metadata_path(fs_name, folder, version), 'rb') as f:
            return pd.read_parquet(f)

    def get_snapshot_diff(self, snapshot_src, snapshot_dst):
        """Perform a diff between two snapshots

        Rows present on both sides are ``equal`` only when both md5s are known and identical;
        otherwise (including a missing md5 on either side) they are ``different``.

        Returns:
            DataFrame: DataFrame with columns name, size_src, size_dst, md5_src, md5_dst, diff (equal, different, only_src, only_dst)
        """
        df_diff = pd.merge(snapshot_src, snapshot_dst, how='outer', suffixes=('_src', '_dst'), on='name', indicator=True)
        merge_state = df_diff.pop('_merge')
        same_md5 = (df_diff['md5_src'] == df_diff['md5_dst']).fillna(False).astype(bool)
        # np.select picks the first matching condition, so one-sided rows win over same_md5.
        # Labels are built in a plain column: writing new labels into the Categorical
        # `_merge` column is rejected or upcast depending on the pandas version.
        df_diff['diff'] = np.select(
            [merge_state == 'left_only', merge_state == 'right_only', same_md5],
            ['only_src', 'only_dst', 'equal'],
            default='different',
        )
        return df_diff

    # ---- backup / restore --------------------------------------------------------

    def _select_files_to_backup(self, snapshot):
        """Rows of ``snapshot`` whose content must be uploaded: md5 known, not yet in the
        backup store, and only the first row per md5 (identical contents are stored once)."""
        md5 = snapshot['md5']
        is_new = md5.notna() & ~md5.isin(self.backup_data_snapshot['md5'])
        return snapshot[is_new].drop_duplicates(subset=['md5'], keep='first')

    def _run_parallel(self, func, *iterables):
        """Call ``func`` on the zipped ``iterables`` in a thread pool; re-raise the first failure."""
        with ThreadPoolExecutor(max_workers=self.parallel) as executor:
            # draining the result iterator re-raises any exception raised in a worker
            for _ in executor.map(func, *iterables):
                pass

    def do_backup(self, fs_name, folder, backup_version, snapshot=None):
        """Back up ``folder`` of filesystem ``fs_name`` as version ``backup_version``.

        Uploads (gzipped) only the contents whose md5 is not yet in the backup store, then
        writes the snapshot to the metadata store. Files without md5 are logged and skipped.

        Args:
            snapshot: pre-computed ``get_snapshot(fs_name, folder)``; listed when omitted.

        Returns:
            DataFrame: the rows of ``snapshot`` whose content was uploaded.
        """
        log.info(f"do_backup: starting backup #'{backup_version}' of {fs_name=}, {folder=}")

        if snapshot is None:
            snapshot = self.get_snapshot(fs_name, folder)

        n_without_md5 = int(snapshot['md5'].isna().sum())
        if n_without_md5:
            log.warning(f"do_backup: skipping {n_without_md5} files without md5 (cannot be stored by content)")

        files_to_backup = self._select_files_to_backup(snapshot)
        log.info(f"do_backup: found {len(files_to_backup)} new files to backup")

        # backup data; snapshot names are relative to `folder`, so re-add the prefix to open them
        self._run_parallel(
            lambda name, md5: self._backup_single_file(fs_name, f"{folder}/{name}", md5),
            files_to_backup['name'],
            files_to_backup['md5'],
        )

        # backup metadata
        with self.fs_backup_metadata.open(self._metadata_path(fs_name, folder, backup_version), 'wb') as f:
            snapshot.to_parquet(f)

        # update the in-memory inventory so later backups do not re-upload these contents
        self._backup_data_snapshot = pd.concat(
            [self.backup_data_snapshot, files_to_backup[['md5']]], ignore_index=True
        )
        return files_to_backup

    def restore_backup(self, dst_fs_name, df_diff, dst_folder=None):
        """Restore from the backup store every file that is missing or changed on ``dst_fs_name``.

        ``df_diff`` is expected to be ``get_snapshot_diff(current_snapshot, backup_snapshot)``:
        ``only_dst`` (missing now) and ``different`` (changed) rows are restored with the
        backup's md5 (``md5_dst``). Rows without a backed-up md5 are logged and skipped.

        Args:
            dst_fs_name: key of ``fs_by_name`` of the filesystem to write to.
            df_diff: snapshot diff as described above.
            dst_folder: folder the snapshot names are relative to (the prefix stripped by
                ``get_snapshot``); when None, names are used as full paths.
        """
        to_restore = df_diff[df_diff['diff'].isin(['only_dst', 'different'])]
        no_backup = to_restore['md5_dst'].isna()
        if no_backup.any():
            log.warning(f"restore_backup: skipping {int(no_backup.sum())} files without a backed-up md5")
            to_restore = to_restore[~no_backup]

        log.info(f"restore_backup: restoring {len(to_restore)} missing or changed files")
        prefix = "" if dst_folder is None else f"{dst_folder}/"
        self._run_parallel(
            lambda name, md5: self._restore_file(dst_fs_name, prefix + name, md5),
            to_restore['name'],
            to_restore['md5_dst'],
        )

    def _backup_single_file(self, fs_name, name, md5):
        """Copy file ``name`` of ``fs_name`` gzip-compressed to its content-addressed path."""
        fs = self.fs_by_name[fs_name]
        backup_file_path = self._backup_path(md5)
        log.info(f"do_backup: backing up file '{name}' to '{backup_file_path}'")
        # the GzipFile is closed first (writing the gzip trailer), then the destination handle
        with fs.open(name, 'rb') as f_read, \
                self.fs_backup_data.open(backup_file_path, 'wb') as f_write, \
                gzip.GzipFile(fileobj=f_write, mode='wb') as gzip_file:
            shutil.copyfileobj(f_read, gzip_file)

    def _restore_file(self, fs_name, name, md5):
        """Decompress the backed-up content ``md5`` into file ``name`` of ``fs_name``."""
        fs = self.fs_by_name[fs_name]
        backup_file_path = self._backup_path(md5)
        log.info(f"restore_backup: restoring file '{name}' from '{backup_file_path}'")
        with self.fs_backup_data.open(backup_file_path, 'rb') as f_read, \
                gzip.GzipFile(fileobj=f_read, mode='rb') as gzip_file, \
                fs.open(name, 'wb') as f_write:
            shutil.copyfileobj(gzip_file, f_write)



In [14]:




def backup_all_locations(migrator=None, locations=None):
    """Back up every configured location under one shared, timestamp-named backup version.

    Args:
        migrator: object providing ``do_backup(fs_name, folder, backup_version)``; defaults
            to a ``Migrator`` over the filesystems and folders configured in this notebook.
            One instance is used for all locations so its backup-store inventory is listed once.
        locations: list of ``{"fs_name": ..., "root": ...}`` dicts; defaults to
            ``LOCATIONS_TO_BACKUP``.

    Returns:
        str: the backup version used (local time, ``%Y-%m-%d_%H-%M-%S``).
    """
    if migrator is None:
        migrator = Migrator(
            fs_by_name,
            fs_backup_data,
            BACKUP_DATA_FOLDER,
            fs_backup_metadata,
            BACKUP_METADATA_FOLDER,
        )
    if locations is None:
        locations = LOCATIONS_TO_BACKUP

    backup_version = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
    for loc in locations:
        migrator.do_backup(loc['fs_name'], loc['root'], backup_version)
    return backup_version
        
    
    


In [None]:
# Most recent backed-up snapshot (last version in natural sort order) of the first configured location.
prev_location = LOCATIONS_TO_BACKUP[0]
prev_migrator = Migrator(fs_by_name, fs_backup_data, BACKUP_DATA_FOLDER, fs_backup_metadata, BACKUP_METADATA_FOLDER)
prev_versions = prev_migrator.get_backup_snapshot_versions(prev_location['fs_name'], prev_location['root'])
if not prev_versions:
    raise ValueError(f"No backup found for {prev_location}")
prev_snapshot = prev_migrator.get_backup_snapshot(prev_location['fs_name'], prev_location['root'], prev_versions[-1])

In [None]:
# Diff the current state of the first configured location against `prev_snapshot`
# (previous cell): src = current files, dst = backup.
diff_location = LOCATIONS_TO_BACKUP[0]
diff_migrator = Migrator(fs_by_name, fs_backup_data, BACKUP_DATA_FOLDER, fs_backup_metadata, BACKUP_METADATA_FOLDER)
curr_snapshot = diff_migrator.get_snapshot(diff_location['fs_name'], diff_location['root'])
diff = diff_migrator.get_snapshot_diff(curr_snapshot, prev_snapshot)

In [None]:
# Per diff category: number of non-null values in every column.
diff_stats = diff.groupby(by='diff').count()
diff_stats

In [None]:
#-------------------

In [None]:
# Raw listing of the backup data folder: one row per stored file, details flattened.
backup_collector = Collector(fs_backup_data)
backup_collector.folders_to_process.put(BACKUP_DATA_FOLDER)
backup_collector.parallel_collect()
backup_files = pd.json_normalize(normalize_details(backup_collector.collected_files.queue))

In [None]:
def create_backup_path(md5):
    """Return the backup storage path of the content with hex digest ``md5``.

    Public alias of ``_build_backup_path`` so both names always give the same layout.
    """
    return _build_backup_path(md5)

In [None]:
# Peek at the backup data listing (`df_files` was never defined; the listing is `backup_files`).
backup_files.head()