In [0]:
# ===== Notebook parameters (filled in by ADF at run time) =====
# Register each text widget with its fallback value; the fallback is used when
# the caller does not pass the parameter.
dbutils.widgets.text("p_date", "2025-08-29")
dbutils.widgets.text("p_source", "reuters")

# Read both widget values back, in the same order they were declared.
p_date, p_source = (dbutils.widgets.get(name) for name in ("p_date", "p_source"))

# ===== Storage & SAS =====
storage_acct = "stnewspulsedev"            # replace with your storage account
container    = "raw"

# SECURITY: the SAS token is read from a Databricks secret scope instead of being
# pasted into the notebook. Any token that was ever stored in notebook source
# must be treated as leaked and revoked/rotated in Azure.
sas_secret_scope = "newspulse"             # replace with your secret scope
sas_secret_key   = "raw-container-sas"     # replace with the key holding the SAS token

# Accept the secret with or without the leading "?" that the Azure portal shows,
# and ignore accidental surrounding whitespace.
sas_token = dbutils.secrets.get(scope=sas_secret_scope, key=sas_secret_key).strip().lstrip("?")
if not sas_token:
    raise ValueError(
        f"Secret '{sas_secret_key}' in scope '{sas_secret_scope}' is empty; expected a SAS token"
    )

# Register the SAS token in the Spark config (the key format must match exactly)
spark.conf.set(f"fs.azure.sas.{container}.{storage_acct}.blob.core.windows.net", sas_token)

# WASBS paths (Blob endpoint). If you move to ADLS Gen2 + OAuth, switch to abfss://
base_url = f"wasbs://{container}@{storage_acct}.blob.core.windows.net"
partition = f"ingest_date={p_date}/source={p_source}"

in_path  = f"{base_url}/{partition}/*.csv"     # raw input files for this date/source
tmp_path = f"{base_url}/__tmp/{partition}"     # intermediate filtered text
out_path = f"{base_url}/clean/{partition}"     # final clean CSV
print("IN:", in_path, "\nTMP:", tmp_path, "\nOUT:", out_path)


In [0]:
from pyspark.sql import functions as F

# 1) Read every input line as plain TEXT (single column named 'value')
raw_lines = spark.read.text(in_path)

# 2) Keep only the lines that can belong to the CSV payload
line = F.col("value")
looks_like_csv = (
    (~line.rlike(r"^-{3,}"))            # drop boundary lines (start with ---)
    & (~line.rlike(r"^Content-"))       # drop multipart header lines
    & (F.length(F.trim(line)) > 0)      # drop blank lines
)
clean_lines = raw_lines.where(looks_like_csv)

# 3) Write the filtered lines as text (one line per record) so they can be
#    re-parsed as CSV afterwards
clean_lines.coalesce(1).write.mode("overwrite").text(tmp_path)

# Report the row count AFTER filtering (the label says "after filter", so the
# filtered frame is the one that must be counted, not the raw input)
print("Jumlah baris setelah filter:", clean_lines.count())

# Preview the first 10 raw lines
raw_lines.show(10, truncate=False)

# Same preview using the Databricks rich display
display(raw_lines.limit(30))

# Preview the filtered lines
clean_lines.show(50, truncate=False)
display(clean_lines.limit(30))


In [0]:
# Baca kembali sebagai CSV valid
from pyspark.sql import functions as F

# Configure the CSV reader: the first line of the tmp output is the header,
# double quotes both quote and escape (so commas inside titles survive), and
# quoted fields are allowed to span several lines.
csv_reader = (
    spark.read
         .option("header", True)
         .option("quote", '"')
         .option("escape", '"')
         .option("multiLine", True)
)
parsed_df = csv_reader.csv(tmp_path)

# Keep only the two columns of interest and add ingest_date so the data is
# convenient to query from SQL.
clean_df = (
    parsed_df
    .select("Title", "Url")
    .withColumn("ingest_date", F.to_date(F.lit(p_date)))
)

# Write the final result into the 'clean/...' folder as a single part file
single_part_writer = clean_df.coalesce(1).write.mode("overwrite").option("header", True)
single_part_writer.csv(out_path)

print("Done. Clean CSV at:", out_path)


In [0]:
import os

# 1. Write the result as a single CSV part file. "overwrite" clears out_path
#    first, so re-running this cell starts from a clean folder.
(clean_df
    .coalesce(1)
    .write.mode("overwrite")
    .option("header", True)
    .csv(out_path))

# 2. Locate the part-*.csv file Spark produced
files = dbutils.fs.ls(out_path)
part_files = [
    f.path for f in files
    if f.name.startswith("part-") and f.name.endswith(".csv")
]
if not part_files:
    # Fail with a clear message instead of a bare IndexError
    raise FileNotFoundError(f"No part-*.csv file found in {out_path}")
csv_file = part_files[0]

# 3. Build the target name from the source parameter (not a hard-coded source),
#    joining with "/" because out_path is a URI, not a local filesystem path
target_path = f"{out_path}/clean_{p_source}.csv"

# 4. Rename
dbutils.fs.mv(csv_file, target_path)

print("Clean CSV renamed to:", target_path)
