In [0]:
# Notebook parameters, supplied as text widgets: the SQL to run and the
# database to run it in. Blank values fall back to the defaults below.
for widget_name in ("sql_query", "database"):
  dbutils.widgets.text(widget_name, "")
sql_query, database = (dbutils.widgets.get(name) for name in ("sql_query", "database"))

# Switch to the requested database (or the default) before running anything.
database = database or "ifrs17_cur"
spark.sql(f"USE {database}")

# Without an explicit query, preview the first rows of the default table.
sql_query = sql_query or "SELECT * FROM data_ifrs17 LIMIT 10"

In [0]:
# Run the (possibly defaulted) query; later cells consume the resulting `df`.
df = spark.sql(sqlQuery=sql_query)

In [0]:
import hashlib
import time
import uuid

# Per-run output directory suffix: 6 random hex characters. The suffix used to
# be derived from a 0.1 ms timestamp, so two runs starting in the same tick got
# the same directory and, because the write uses mode("overwrite"), one run
# clobbered the other's files. A random uuid4 removes that timing correlation.
short_hash = uuid.uuid4().hex[:6]
# Path for the output CSV files
base_csv_path = f"dbfs:/tmp/api_responses/result_{short_hash}"

In [0]:
# Split the query result `df` into CSV files of at most `records_per_file` rows.
total_records = df.count()
records_per_file = 1000
# Ceiling division, clamped to at least 1: an empty result would otherwise
# yield 0 partitions, and repartition(0) raises.
num_partitions = max(1, -(-total_records // records_per_file))

# Spread rows over the partitions so the files are written in parallel.
df_repartitioned = df.repartition(num_partitions)

# repartition() only balances rows approximately, so a partition can exceed
# records_per_file. maxRecordsPerFile makes the cap a hard guarantee by rolling
# over to a new file once the limit is reached.
(
  df_repartitioned.write.mode("overwrite")
  .option("header", "true")
  .option("maxRecordsPerFile", records_per_file)
  .csv(base_csv_path)
)

# List the written data files; non-.csv entries (e.g. _SUCCESS markers) are skipped.
files = dbutils.fs.ls(base_csv_path)
csv_files = [file.path for file in files if file.path.endswith(".csv")]

In [None]:
# Create a summary CSV listing each output file and its record count, then
# return the summary's path as this notebook's exit value.
# Spark writes a directory at this path; the actual CSV is a part file inside it.
summary_path = f"{base_csv_path}_summary.csv"


def _count_csv_records(path):
  """Return the number of data rows (header excluded) in one written CSV file."""
  return (
    spark.read.option("header", "true")
    # Values containing newlines are quoted on write; multiLine parses them
    # back as one record so such rows are not counted twice.
    .option("multiLine", "true")
    .csv(path)
    .count()
  )


# One small Spark job per output file.
summary_data = [(path, _count_csv_records(path)) for path in csv_files]
# Explicit schema: createDataFrame cannot infer column types from an empty list.
summary_df = spark.createDataFrame(summary_data, "file_path string, record_count long")

# Write the summary as a single CSV part file
summary_df.coalesce(1).write.mode("overwrite").option("header", "true").csv(summary_path)

# Retrieve the path of the part file inside the summary directory
summary_file = [file.path for file in dbutils.fs.ls(summary_path) if file.path.endswith(".csv")][0]

# Exit the notebook and return the summary file path
dbutils.notebook.exit(summary_file)