In [19]:
%%pyspark
import pandas as pd

# Root of the Synapse workspace warehouse on the Azure Data Lake (abfss URI).
WAREHOUSE_PATH = "abfss://latticefs@latticedatalake.dfs.core.windows.net/synapse/workspaces/latticeworkspace/warehouse"

# Folder names under WAREHOUSE_PATH: raw drops are read from LANDING_FOLDER
# and converted files are written to BRONZE_FOLDER.
LANDING_FOLDER = "landing"
BRONZE_FOLDER = "bronze"
RAW_FILE_FOLDER = f"{WAREHOUSE_PATH}/{LANDING_FOLDER}/"

# File extensions, given without the leading dot.
RAW_FILE_EXT = "out"
PROCESSED_FILE_EXT = "csv"

In [20]:
import os
import glob
from pandas.errors import EmptyDataError
from notebookutils import mssparkutils


def get_processed_path(raw_path: str,
                       source_folder: "str | None" = None,
                       target_folder: "str | None" = None,
                       source_ext: "str | None" = None,
                       target_ext: "str | None" = None) -> str:
    """Build the processed-file destination path for a raw file path.

    Only the last ``/<source_folder>/`` directory segment and the trailing
    ``.<source_ext>`` extension are rewritten. A plain ``str.replace`` would
    also rewrite any other occurrence of those substrings. For example, a raw
    file named ``turbine_outage.out`` would become ``turbine_csvage.csv``.

    Args:
        raw_path: Full path of the raw file, e.g. ``.../warehouse/landing/x.out``.
        source_folder: Folder segment to replace; defaults to ``LANDING_FOLDER``.
        target_folder: Replacement folder segment; defaults to ``BRONZE_FOLDER``.
        source_ext: Extension (without dot) to replace; defaults to ``RAW_FILE_EXT``.
        target_ext: Replacement extension (without dot); defaults to
            ``PROCESSED_FILE_EXT``.

    Returns:
        The processed-file path.

    Raises:
        ValueError: If neither the folder segment nor the extension was found,
            i.e. the destination would be the raw file itself.
    """
    # Resolve defaults at call time so the notebook-level constants remain
    # the single source of configuration.
    if source_folder is None:
        source_folder = LANDING_FOLDER
    if target_folder is None:
        target_folder = BRONZE_FOLDER
    if source_ext is None:
        source_ext = RAW_FILE_EXT
    if target_ext is None:
        target_ext = PROCESSED_FILE_EXT

    processed_path = raw_path

    # Swap only the trailing extension, not every occurrence of its letters.
    raw_suffix = f".{source_ext}"
    if processed_path.endswith(raw_suffix):
        processed_path = processed_path[:-len(raw_suffix)] + f".{target_ext}"

    # Swap only the last matching directory segment (the file's parent folder),
    # so names like "landing_zone" elsewhere in the path are untouched.
    source_segment = f"/{source_folder}/"
    folder_pos = processed_path.rfind(source_segment)
    if folder_pos != -1:
        processed_path = (
            processed_path[:folder_pos]
            + f"/{target_folder}/"
            + processed_path[folder_pos + len(source_segment):]
        )

    # Explicit check instead of `assert`, which is skipped under `python -O`.
    if processed_path == raw_path:
        raise ValueError(
            f"Cannot derive a processed path from {raw_path!r}: "
            f"neither '{source_segment}' nor '{raw_suffix}' found"
        )
    return processed_path

def file_already_exists(path: str) -> bool:
    """Return the result of ``mssparkutils.fs.exists`` for ``path`` on the Data Lake."""
    exists_on_lake = mssparkutils.fs.exists
    return exists_on_lake(path)

def process_data(bronze_file_path: str) -> pd.DataFrame:
    """Load one tab-separated raw export and flatten its two-row header.

    Despite its name, ``bronze_file_path`` is the file being *read*; in this
    notebook the caller passes the raw landing-file path.

    Returns:
        A DataFrame whose columns are the first header row, preceded by an
        ``index`` column holding the original row positions. If pandas reports
        the file as empty, a message is printed and an empty DataFrame is
        returned.
    """
    try:
        raw_frame = pd.read_csv(
            bronze_file_path,
            sep="\t",
            skiprows=6,     # preamble lines before the header — TODO confirm their meaning
            header=[0, 1],  # two header rows -> MultiIndex columns
        )
    except EmptyDataError:
        print("Empty file: ", bronze_file_path)
        return pd.DataFrame()

    # Keep only the top header level so the column names are flat strings
    # (SQL-friendly). NOTE(review): the second header row is discarded,
    # presumably units — confirm.
    raw_frame.columns = [top_level for top_level, *_ in raw_frame.columns]
    return raw_frame.reset_index()

def store_data(data: pd.DataFrame, file_path: str) -> None:
    """Write ``data`` to ``file_path`` as comma-separated text.

    pandas' default ``index=True`` is kept, so the DataFrame index is written
    as the first, unnamed column.
    """
    data.to_csv(path_or_buf=file_path, sep=",")

def retrieve_files_in_folders(folder_path: str, file_ext: str) -> list:
    """Return the paths listed by ``mssparkutils.fs.ls(folder_path)`` that end in ``.<file_ext>``.

    Args:
        folder_path: Data Lake folder to list.
        file_ext: Extension to keep, with or without a leading dot
            (``"out"`` and ``".out"`` are equivalent). An empty string keeps
            every entry.

    Returns:
        List of full paths (``FileInfo.path``) of the matching entries.
    """
    # Match the whole ".<ext>" suffix: a bare endswith("out") would also keep
    # names such as "layout" or "checkout".
    if file_ext:
        suffix = "." + file_ext.lstrip(".")
    else:
        suffix = ""

    return [
        file_info.path
        for file_info in mssparkutils.fs.ls(folder_path)
        if file_info.path.endswith(suffix)
    ]


def apply_data_processing(raw_data_path: str):
    """Convert every not-yet-processed raw file found in ``raw_data_path``.

    For each file with extension ``RAW_FILE_EXT``, the destination is derived
    with ``get_processed_path``. Files whose destination already exists are
    skipped. Files that load as an empty DataFrame are not written, so they
    are read again on every run.

    Returns:
        List of destination paths written during this call.
    """
    written_paths = []
    for source_path in retrieve_files_in_folders(raw_data_path, file_ext=RAW_FILE_EXT):
        target_path = get_processed_path(source_path)
        if file_already_exists(target_path):
            continue  # converted on an earlier run
        frame = process_data(source_path)
        if frame.empty:
            continue
        store_data(frame, target_path)
        written_paths.append(target_path)
    return written_paths



In [17]:
# Convert every raw file in the landing folder that has no processed
# counterpart yet, then report the paths written by this run.
print("Processing files in: ", RAW_FILE_FOLDER)
new_files = apply_data_processing(RAW_FILE_FOLDER)
print("New files created: ", new_files)

In [21]:
# test_file_path = "abfss://latticefs@latticedatalake.dfs.core.windows.net/synapse/workspaces/latticeworkspace/warehouse/landing/1.5MW-1682790356.out"
# df = process_data(test_file_path)

# df.head()
