This notebook copies folders and files from one lakehouse to another based on a watermark.
It compares the watermark with the last-modified timestamp of each folder or file in the source.
If the last-modified timestamp is greater than or equal to the watermark, the folder/file is copied to the target.
If the last-modified timestamp is earlier than the watermark, a file is ignored, while a folder is not copied as a whole but is scanned recursively for newer content.
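
The comparison can be sketched in plain Python. This is a minimal illustration, assuming the modification time is expressed in milliseconds since the epoch (which the division by 1000 in the notebook code suggests) and that the watermark string is interpreted as UTC. The helper name `is_newer_than_watermark` is hypothetical and not part of the notebook.

```python
from datetime import datetime, timezone

def is_newer_than_watermark(modify_time_ms: int, watermark: str) -> bool:
    """Return True when the item was modified at or after the watermark."""
    # Assumption: the watermark string is in UTC
    dt = datetime.strptime(watermark, '%Y-%m-%d %H:%M:%S').replace(tzinfo=timezone.utc)
    # Modification time is in milliseconds, the watermark in seconds
    return modify_time_ms / 1000 >= dt.timestamp()

# 2024-01-01 00:00:00 UTC is 1704067200 seconds since the epoch
print(is_newer_than_watermark(1704067200000, '2024-01-01 00:00:00'))  # True
print(is_newer_than_watermark(1704067199000, '2024-01-01 00:00:00'))  # False
```

An item modified exactly at the watermark is copied, because the comparison is inclusive.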

<u>DO NOT USE IT TO COPY DELTA TABLES</u>

**Parameters**
workspace_source : String - Name of the source workspace </br>
workspace_target : String - Name of the target workspace </br>
lakehouse_source : String - Name of the source lakehouse </br>
lakehouse_target : String - Name of the target lakehouse </br>
source_root : String - Initial source folder to start from </br>
target_root : String - Initial target folder to start from </br>
overwritetarget : Boolean </br>
    - True: If the folder/file already exists in the target, it is overwritten </br>
    - False: If the folder/file already exists in the target, it is skipped </br>
datetime_str : String - Watermark in the format YYYY-MM-DD HH:MM:SS; folders/files modified at or after it are copied </br>
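
Before running the copy, it may help to sanity-check these parameters. The sketch below is only an illustration: `build_onelake_path` mirrors the `abfss://` pattern the notebook uses for `source`, while `validate_parameters` and the sample values (`SalesWS`, `Bronze`, and so on) are hypothetical.

```python
from datetime import datetime

def build_onelake_path(workspace: str, lakehouse: str, root: str = 'Files/') -> str:
    """Build an abfss path following the pattern used for `source` in the notebook."""
    return f'abfss://{workspace}@onelake.dfs.fabric.microsoft.com/{lakehouse}.Lakehouse/{root}'

def validate_parameters(params: dict) -> list:
    """Return a list of problems found in the parameters (empty list means OK)."""
    problems = []
    for name in ('workspace_source', 'workspace_target', 'lakehouse_source', 'lakehouse_target'):
        if not params.get(name):
            problems.append(f'{name} is empty')
    try:
        # The watermark must be parseable with the same format the notebook uses
        datetime.strptime(params.get('datetime_str', ''), '%Y-%m-%d %H:%M:%S')
    except ValueError:
        problems.append('datetime_str must match YYYY-MM-DD HH:MM:SS')
    return problems

# Hypothetical values, for illustration only
params = {
    'workspace_source': 'SalesWS', 'workspace_target': 'SalesWS_Backup',
    'lakehouse_source': 'Bronze', 'lakehouse_target': 'BronzeCopy',
    'datetime_str': '2024-01-01 00:00:00',
}
print(validate_parameters(params))
print(build_onelake_path(params['workspace_source'], params['lakehouse_source']))
```

Leaving the placeholder `'YYYY-MM-DD HH:MM:SS'` in `datetime_str` would be reported as a problem, since it cannot be parsed as a date.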


**Limitation**
Shortcuts in the source lakehouse are treated as regular folders, and their content is copied to the target lakehouse as a regular OneLake folder.

In [60]:
workspace_source = ''
workspace_target = ''
lakehouse_source = '' 
lakehouse_target = ''
source_root = 'Files/'
target_root = 'Files/'
overwritetarget = True
datetime_str = 'YYYY-MM-DD HH:MM:SS'

#do not change this
source = f'abfss://{workspace_source}@onelake.dfs.fabric.microsoft.com/{lakehouse_source}.Lakehouse/{source_root}'


**copydata**

This defines the routine that copies the folders/files. </br>

Parameters:</br>
sourcepath : String - Source path of the folder/file </br>
targetpath : String - Target path for the folder/file </br>
recursive : Boolean </br>
        1. True - Subfolders and files are copied</br>
        2. False - Subfolders and files are not considered</br>
overwrite : Boolean </br>
        1. True - An existing folder/file is overwritten when the process runs multiple times with the same watermark </br>
        2. False - An existing folder/file is skipped when the process runs multiple times with the same watermark </br>
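
The interplay between the existence of the target and the `overwrite` flag can be summarized as a small decision function. This is a sketch of the documented behavior only; `plan_copy` and its return values are hypothetical names, and no file system call is made.

```python
def plan_copy(target_exists: bool, overwrite: bool) -> str:
    """Return the action implied by the documented overwrite semantics."""
    if not target_exists:
        return 'create'   # nothing in the way, copy the item
    if overwrite:
        return 'replace'  # drop the existing target, then copy
    return 'skip'         # leave the existing target untouched

# Print the full decision table
for target_exists in (False, True):
    for overwrite in (False, True):
        print(target_exists, overwrite, plan_copy(target_exists, overwrite))
```

Only the combination of an existing target and `overwrite=False` results in no copy at all.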


In [61]:
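from notebookutils import mssparkutils

def copydata(sourcepath='.', targetpath='.', recursive=True, overwrite=False):
    """Copy a folder/file and return a human-readable status string."""
    # Check whether the target already exists (ls raises if it does not)
    try:
        mssparkutils.fs.ls(targetpath)
        target_exists = True
    except Exception:
        target_exists = False

    if not target_exists:
        exists = f'{targetpath} does not exist, sync will create it'
    elif overwrite:
        exists = f'overwrite={overwrite},{targetpath} already exists, sync will drop it'
    else:
        # Honor overwrite=False: leave the existing target untouched
        return f'overwrite={overwrite},{targetpath} already exists, sync will skip it - Skipped'

    try:
        if target_exists:
            # Remove the existing target before copying the new version
            mssparkutils.fs.rm(targetpath, recursive)
        mssparkutils.fs.cp(sourcepath, targetpath, recursive)
        status = 'Completed'
    except Exception as e:
        status = f'Error during the copy: {str(e)}'
    return f'{exists} - {status}'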


**SyncFilesFromFolder**

This defines the routine that identifies the folders and files that need to be copied. </br>
root : String - Initial path to start the scan from
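
The selection rule of the scan can be illustrated on an in-memory tree, without Spark or a lakehouse. This is a simplified sketch: the `Entry` class only imitates the `path`, `isDir` and `modifyTime` attributes of the listing results, and both `Entry` and `select_for_copy` are hypothetical names. A folder older than the watermark is descended into, while a newer folder is selected as a whole.

```python
from dataclasses import dataclass, field
from typing import List

@dataclass
class Entry:
    path: str
    isDir: bool
    modifyTime: int  # milliseconds since the epoch
    children: List['Entry'] = field(default_factory=list)

def select_for_copy(entries: List[Entry], watermark_s: int) -> List[str]:
    """Return the paths that would be copied for a watermark given in seconds."""
    selected = []
    for e in entries:
        if e.modifyTime / 1000 >= watermark_s:
            # New folder (copied with all its content) or new file
            selected.append(e.path)
        elif e.isDir:
            # Old folder: look inside for newer items
            selected.extend(select_for_copy(e.children, watermark_s))
    return selected

tree = [
    Entry('Files/old', True, 1000, [
        Entry('Files/old/a.csv', False, 5000),
        Entry('Files/old/b.csv', False, 1000),
    ]),
    Entry('Files/new', True, 6000, [Entry('Files/new/c.csv', False, 500)]),
    Entry('Files/x.txt', False, 2000),
]
print(select_for_copy(tree, watermark_s=3))  # ['Files/old/a.csv', 'Files/new']
```

Note that `Files/new/c.csv` is older than the watermark but still travels along, because its parent folder is copied recursively.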

In [62]:
from notebookutils import mssparkutils
from pyspark.sql.types import StructType, StructField, StringType, TimestampType
from datetime import datetime as D

def SyncFilesFromFolder(root = '.'):

    # Convert the watermark string to a Unix timestamp in seconds
    # (a naive datetime is interpreted in the local time zone of the session)
    dt = D.strptime(datetime_str, '%Y-%m-%d %H:%M:%S')
    unix_timestamp = int(dt.timestamp())

    files = mssparkutils.fs.ls(root)

    columns = StructType([
        StructField('executiontime', TimestampType(), True),
        StructField('file', StringType(), True),
        StructField('modifyTime', StringType(), True ),
        StructField('status', StringType(), True)
    ])

    file_details = []

    # Create an empty DataFrame with the specified schema
    new_files = spark.createDataFrame(file_details, schema=columns)
    for file in files:

        # Derive the target path by swapping the workspace and lakehouse names
        target = file.path.replace(workspace_source, workspace_target)
        target = target.replace(lakehouse_source, lakehouse_target)

        # modifyTime is in milliseconds, the watermark in seconds
        is_new = file.modifyTime/1000 >= unix_timestamp

        if file.isDir and not is_new:
            # Folder older than the watermark: scan its content recursively
            new_files = new_files.union(SyncFilesFromFolder(file.path))
        elif is_new:
            # New folder (copied with its content) or new file
            status = copydata(file.path, target, file.isDir, overwritetarget)
            file_details.append((D.now(), file.path, file.modifyTime, status))

    # Append the items copied at this level to the results of the subfolders
    df = spark.createDataFrame(file_details, columns)
    return new_files.union(df)


**Runs the process**
This cell runs the copy and saves the detailed results in a temporary view.

In [63]:
df = SyncFilesFromFolder(source)
df.createOrReplaceTempView('Results')


**Checking the outcome**


In [64]:
%%sql
SELECT *, cast(modifyTime/1000 as Timestamp) FROM Results


<Spark SQL result set with 1 rows and 5 fields>
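
Besides the SQL query above, the `status` column can be summarized in plain Python once the values are collected. The sketch below assumes the status strings end with `Completed` or contain `Error during the copy:`, as produced by the copy routine; the function name `summarize_statuses` and the sample paths are hypothetical.

```python
from collections import Counter

def summarize_statuses(statuses):
    """Count the outcomes found in a list of status strings."""
    counts = Counter()
    for s in statuses:
        if ' - Error during the copy: ' in s:
            counts['Error'] += 1
        elif s.endswith(' - Completed'):
            counts['Completed'] += 1
        else:
            counts['Other'] += 1
    return dict(counts)

# Hypothetical status values, for illustration only
sample = [
    'abfss://target/Files/a.csv does not exist, sync will create it - Completed',
    'overwrite=True,abfss://target/Files/b already exists, sync will drop it - Completed',
    'abfss://target/Files/c.csv does not exist, sync will create it - Error during the copy: access denied',
]
print(summarize_statuses(sample))  # {'Completed': 2, 'Error': 1}
```

In the notebook, the input list could be obtained with something like `[r.status for r in df.select('status').collect()]`, which is reasonable only while the number of copied items stays small.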