In [None]:
# --- Spark session via CML data connection
import cml.data_v1 as cmldata
import warnings
warnings.filterwarnings("ignore")

from pyspark.sql import SparkSession
import pyspark.sql.functions as F

# Name of the CML data connection used to obtain the Spark session.
CONNECTION_NAME = "pdnd-prod-dl-1"   # <-- adjust if your connection is named differently
# Look the connection up by name, then ask it for a Spark session.
# `spark` is the session handle the later cells use for configuration and SQL.
conn = cmldata.get_connection(CONNECTION_NAME)
spark = conn.get_spark_session()
print("Spark enabled")

In [None]:
# Keep Spark small to avoid hitting namespace quotas.
# Runtime SQL options are applied first and unconditionally, so they take
# effect even if the cluster rejects the best-effort settings further down.
spark.conf.set("spark.sql.shuffle.partitions", "32")

# Disable Arrow → avoids some driver/executor serialization issues in restricted clusters
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "false")

# Core / launch-time settings. On an already-running session Spark may refuse
# to change these (raising instead of ignoring them), so each one is attempted
# on its own and a rejection is reported rather than aborting the cell.
# NOTE(review): where a key is accepted, whether it still has an effect after the
# session has started depends on the cluster — confirm for this environment.
BEST_EFFORT_CONFS = {
    "spark.default.parallelism": "32",
    # If your environment allows these, they help keep executors low:
    "spark.dynamicAllocation.enabled": "true",
    "spark.dynamicAllocation.minExecutors": "1",
    "spark.dynamicAllocation.initialExecutors": "1",
    "spark.dynamicAllocation.maxExecutors": "4",   # stay below quota
    # Be more patient detecting missing pods (if allowed)
    "spark.kubernetes.executor.missingPodDetectDelta": "60s",
}


def _set_conf_if_allowed(key: str, value: str) -> bool:
    """Try to set one Spark conf on the live session.

    Returns True when the setting was accepted. When Spark rejects it, the key
    and the exception type are printed and False is returned, so the remaining
    settings are still attempted.
    """
    try:
        spark.conf.set(key, value)
    except Exception as exc:  # exception class differs across PySpark versions
        print(f"Skipped {key}={value} ({type(exc).__name__})")
        return False
    return True


for _conf_key, _conf_value in BEST_EFFORT_CONFS.items():
    _set_conf_if_allowed(_conf_key, _conf_value)

print("Spark tuned for notebook.")

In [None]:
# --- User-tunable knobs (edit here, then re-run the cells below)
DATE_FROM = "2025-01-01 00:00:00"   # lower bound on payment datetime
SAMPLE_FRAC = 0.02                  # 2% random sample at source (set smaller if you hit limits)
MAX_ROWS = 50_000                   # hard cap at source
ML_MAX_ROWS = 10_000                # rows collected to pandas (keep small)
SHOW_ROWS = 10                      # preview rows to show

# Echo the knobs that shape the extraction so the run is self-describing.
_knob_echo = {
    "DATE_FROM": DATE_FROM,
    "SAMPLE_FRAC": SAMPLE_FRAC,
    "MAX_ROWS": MAX_ROWS,
    "ML_MAX_ROWS": ML_MAX_ROWS,
}
print(", ".join(f"{knob}={setting}" for knob, setting in _knob_echo.items()))


In [None]:
# Fixed seed for the row-level sampling. `df` is lazy, so every Spark action on
# it (the preview below, the later collect to pandas) re-runs the query; with an
# unseeded rand() each action would draw a different sample.
# NOTE(review): a seeded rand() is repeatable only while the source data and its
# partitioning stay the same — confirm this holds for the table.
SAMPLE_SEED = 42

# Coerce the numeric knobs so only a float and an int are ever interpolated
# into the SQL text (a non-numeric value fails here instead of inside Spark).
_sample_frac = float(SAMPLE_FRAC)
_max_rows = int(MAX_ROWS)

# Inner query: explode the transfer list and keep transfers whose category
# contains '0101100IM', paid on/after DATE_FROM, with non-empty remittance text.
# Outer query: random sample of those rows, then a hard row cap.
query = f"""
SELECT pa, ragione_sociale, remittance
FROM (
  SELECT
    t.fiscalcodepa           AS pa,
    t.companyname            AS ragione_sociale,
    t.remittanceinformation  AS remittance
  FROM pagopa.silver_positive sp
  LATERAL VIEW EXPLODE(sp.transferlist) t_view AS t
  WHERE t.transfercategory LIKE '%0101100IM%'
    AND sp.paymentinfo.paymentdatetime >= CAST('{DATE_FROM}' AS TIMESTAMP)
    AND t.remittanceinformation IS NOT NULL
    AND t.remittanceinformation <> ''
) base
WHERE rand({SAMPLE_SEED}) <= {_sample_frac}
LIMIT {_max_rows}
"""

print("Running Spark SQL with sampling+limit pushdown…")
# Force all three output columns to string so the pandas side sees plain text.
df = spark.sql(query).select(
    *[
        F.col(col_name).cast("string").alias(col_name)
        for col_name in ("pa", "ragione_sociale", "remittance")
    ]
)

print("Schema:")
df.printSchema()
print("Preview:")
df.show(SHOW_ROWS, truncate=False)

In [None]:
# IMPORTANT: cap the rows before collecting — everything past this point lives
# in driver memory, so a small sample avoids driver OOM and quota pressure.
df_small = df.limit(ML_MAX_ROWS)

print("Collecting a small slice to pandas…")
pdf = df_small.toPandas()
n_collected = pdf.shape[0]
print(f"Pandas rows: {n_collected}")
pdf.head(3)

In [None]:
import re

# Try NLTK stopwords if available; fall back to a small built-in list otherwise.
# The fallback is taken when NLTK is not installed or when the stopwords corpus
# has not been downloaded (the `nltk.data.find` probe fails in that case).
try:
    import nltk
    from nltk.corpus import stopwords
    nltk.data.find('corpora/stopwords')
    it_stop = set(stopwords.words('italian'))
    print("Using NLTK Italian stopwords.")
except Exception:
    # Deliberately broad: any problem with NLTK should degrade to the built-in list.
    it_stop = {
        "a", "ad", "al", "alla", "alle", "allo", "ai", "agli", "all", "con", "col", "coi", "come",
        "da", "dal", "dalla", "dalle", "dallo", "dei", "del", "della", "delle", "dello",
        "di", "e", "ed", "gli", "i", "il", "in", "la", "le", "lo", "l", "ma", "mi", "ne", "nei", "nella",
        "nelle", "nello", "no", "non", "o", "ogni", "per",
        # Both spellings: the text normalizer in this notebook keeps accented
        # letters, so the accented form is the one that appears in tokens.
        "piu", "più",
        "poi", "qua", "quale", "quali", "quello",
        "quella", "quelle", "quelli", "se", "sia", "siano", "si", "sono", "sul", "sulla", "sulle", "sullo",
        "su", "tra", "un", "una", "uno", "voi", "noi", "è", "e'", "d", "s", "c", "t",
    }
    print("Using fallback Italian stopwords (compact list).")

# Anything that is not a lowercase (optionally accented) letter or whitespace.
_NON_LETTERS = re.compile(r"[^a-zàèéìòóùç\s]")
# One or more consecutive whitespace characters.
_WHITESPACE_RUN = re.compile(r"\s+")


def normalize_text(s: str) -> str:
    """Lowercase `s` and reduce it to letters separated by single spaces.

    Every character other than a-z, the accented letters àèéìòóùç and
    whitespace is replaced by a space; whitespace runs are then collapsed to
    one space and the result is stripped at both ends.
    """
    letters_only = _NON_LETTERS.sub(" ", s.lower())
    return _WHITESPACE_RUN.sub(" ", letters_only).strip()

def preprocess(s: str) -> str:
    """Normalize `s`, then drop stopwords and tokens of two characters or fewer.

    Returns the surviving tokens joined by single spaces (an empty string when
    nothing survives).
    """
    kept = []
    for word in normalize_text(s).split():
        # Skip very short tokens and anything in the stopword set.
        if len(word) <= 2 or word in it_stop:
            continue
        kept.append(word)
    return " ".join(kept)

print("Preprocessing sample:")
# Clean every remittance string (coerced to str first) into a new column.
remit_as_text = pdf["remittance"].astype(str)
pdf["remit_clean"] = remit_as_text.apply(preprocess)
# Show raw vs. cleaned text side by side for a quick sanity check.
pdf[["remittance", "remit_clean"]].head(5)

In [None]:
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.cluster import KMeans

# TF-IDF with modest document-frequency bounds to keep the matrix small
# (tweak min_df / max_df if your sample is very small or very large).
vectorizer = TfidfVectorizer(min_df=5, max_df=0.8)
X = vectorizer.fit_transform(pdf["remit_clean"])
print("TF-IDF shape:", X.shape)

# Start with a small number of clusters; iterate on K later.
K = 8
print(f"Training KMeans with k={K}…")
# Fixed random_state keeps the clustering repeatable for the same input matrix.
kmeans = KMeans(n_clusters=K, n_init="auto", random_state=42).fit(X)

# Attach each row's cluster label and report how many rows landed in each cluster.
pdf["cluster"] = kmeans.labels_
print("Cluster distribution:")
cluster_sizes = pdf["cluster"].value_counts().sort_index()
print(cluster_sizes)

In [None]:
import numpy as np

# Feature names from the fitted vectorizer, indexed like the centroid columns.
terms = vectorizer.get_feature_names_out()
# Per cluster: feature indices ordered by centroid weight, largest first.
order_centroids = np.argsort(kmeans.cluster_centers_, axis=1)[:, ::-1]

TOP_N = 10
SAMPLES_PER_CLUSTER = 3

for cluster_id in range(K):
    in_cluster = pdf["cluster"] == cluster_id

    print("\n" + "-"*60)
    print(f"Cluster {cluster_id} | size={in_cluster.sum()}")

    ranked = order_centroids[cluster_id, :TOP_N]
    top_words = [terms[idx] for idx in ranked]
    print("Top words:", ", ".join(top_words))

    # A few raw remittance strings, truncated and flattened to one line each.
    examples = pdf.loc[in_cluster, "remittance"].head(SAMPLES_PER_CLUSTER).tolist()
    for sample_text in examples:
        print("  •", sample_text[:200].replace("\n"," "))

In [None]:
import pandas as pd

# Save cluster assignments and top words in project workspace (local).
# Both files are written relative to the current working directory.
assign_path = "remittance_clusters_sample.csv"
top_words_path = "cluster_top_words.csv"

pdf[["pa", "ragione_sociale", "remittance", "cluster"]].to_csv(
    assign_path, index=False, encoding="utf-8"
)
print(f"Wrote: {assign_path}")

# Top words table: one row per cluster. Uses TOP_N (the same constant as the
# on-screen summary) so the saved table matches what was printed.
rows = [
    {
        "cluster": i,
        "top_words": ", ".join(terms[idx] for idx in order_centroids[i, :TOP_N]),
    }
    for i in range(K)
]

pd.DataFrame(rows, columns=["cluster", "top_words"]).to_csv(
    top_words_path, index=False, encoding="utf-8"
)
print(f"Wrote: {top_words_path}")
