### NOT_IMPLEMENTED Using custom code using PySpark RDDs is not allowed on serverless compute. We suggest using mapInPandas or mapInArrow for the most common use cases. For more details on compatibility and limitations, check: https://docs.databricks.com/release-notes/serverless.html limitations

In [0]:
# Read metadata from Unity Catalog table
df_metadata = spark.table("my_unity_catalog_metastore.seq_exe_convert.file_matadata_1")

# Convert DataFrame to RDD of rows
metadata_rdd = df_metadata.rdd

# Function to perform conversion per row
def convert_to_delta(row):
    file_name = row["File Name"]
    source_path = row["Source Path"]
    delta_path = row["delta1_path"]

    try:
        df = spark.read.format("csv") \
            .option("header", "true") \
            .option("inferSchema", "true") \
            .load(source_path)

        df.write.format("delta").mode("overwrite").save(delta_path)

        print(f"{file_name} successfully converted.")
    except Exception as e:
        print(f"Error converting {file_name}: {str(e)}")

# Run conversion in parallel
metadata_rdd.foreach(convert_to_delta)


In [0]:

from concurrent.futures import ThreadPoolExecutor

# Load metadata
df_meta = spark.table("my_unity_catalog_metastore.seq_exe_convert.file_matadata_1")

# Function to convert one file
def convert_file(row):
    try:
        file_name = row["File Name"]
        source_path = row["Source Path"]
        # parquet_path = f"{row['Parquet Path']}.{file_name}_parquet"
        delta1_path = f"{row['delta1_path']}.{file_name}_delta"

        # Read source Delta table
        df = spark.table(source_path)

        # # Save as Parquet table
        # df.write.format("parquet").mode("overwrite").saveAsTable(parquet_path)

        # Save as Delta table
        df.write.format("delta").mode("overwrite").saveAsTable(delta1_path)

        print(f" Converted: {file_name}")
    except Exception as e:
        print(f" Error in {row['File Name']}: {e}")

# Run in parallel
with ThreadPoolExecutor() as executor:
    executor.map(convert_file, df_meta.collect())


In [0]:
df1 = spark.table("my_unity_catalog_metastore.seq_exe_convert.file_matadata_1")
display(df1)

In [0]:
%sql
UPDATE my_unity_catalog_metastore.seq_exe_convert.file_matadata_1
SET `Parquet Path` = 'workspace.default'
WHERE `File Name` IN ('reviews', 'penguins', 'product_details');


In [0]:
%sql
select * from my_unity_catalog_metastore.seq_exe_convert.file_matadata_1

In [0]:
%sql
CREATE TABLE my_unity_catalog_metastore.seq_exe_convert.execution_log(
  file_name  String,
  process_type  string,
  start_time  timestamp,
  end_time  timestamp ,
  status  string,
  error_message string 
) using delta;

In [0]:
%sql
select * from my_unity_catalog_metastore.seq_exe_convert.execution_log as lo 


In [0]:
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime
from pyspark.sql import Row

# Load metadata into list of Rows
df_meta = spark.table("my_unity_catalog_metastore.seq_exe_convert.file_matadata_1").collect()

# Function to convert and log
def convert_file(row):
    file_name = row["File Name"]
    source_path = row["Source Path"]
    delta1_path = f"{row['delta1_path']}.{file_name}_delta"

    start_time = datetime.now()
    status = "SUCCESS"
    error_message = ""

    try:
        df = spark.table(source_path)
        df.write.format("delta").mode("overwrite").saveAsTable(delta1_path)
        print(f" Converted: {file_name}")
    except Exception as e:
        status = "FAILED"
        error_message = str(e)
        print(f" Error in {file_name}: {e}")

    end_time = datetime.now()

    # Build log Row
    log_row = Row(
        file_name=file_name,
        process_type="delta",
        start_time=start_time,
        end_time=end_time,
        status=status,
        error_message=error_message
    )

    # Save log to Delta log table
    spark.createDataFrame([log_row]).write.mode("append").saveAsTable("my_unity_catalog_metastore.seq_exe_convert.execution_log")

# Run parallel execution
with ThreadPoolExecutor() as executor:
    executor.map(convert_file, df_meta)


In [0]:
%sql
select * from my_unity_catalog_metastore.seq_exe_convert.execution_log 