In [45]:
import csv
from datetime import datetime
import os
import re

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, DateType, IntegerType
from pyspark.sql.functions import current_date, col, to_date, when, current_timestamp, monotonically_increasing_id, lit, upper

In [57]:
# Start (or reuse) a local Spark session running on a single core
spark = (
    SparkSession.builder
    .master("local[1]")
    .appName("FlussoDipendenti")
    .getOrCreate()
)

# Every column is declared as a nullable string at this stage.
# StructType  = the set of columns (the table structure)
# StructField = one column: name, type and whether nulls are allowed
schema = StructType([
    StructField(column_name, StringType(), True)
    for column_name in ("CF", "NOME", "DN", "SALARIO")
])

# Read the CSV (its first line holds the column names, the schema supplies the
# column names/types) and append DINS with today's date.
# current_timestamp() could be used instead to get date and time.
df_raw = (
    spark.read
    .schema(schema)
    .csv("Flusso.csv", header=True)
    .withColumn("DINS", current_date())
)
df_raw.show()

+-----------------+-----------------+-----------+-------+----------+
|               CF|             NOME|         DN|SALARIO|      DINS|
+-----------------+-----------------+-----------+-------+----------+
| RSSMRA85M01H501Z|      Mario Rossi| 1985-03-15|   2500|2025-09-25|
| RSSMRA85M01H501Z|      Mario Rossi| 1985-03-15|  -2500|2025-09-25|
| BNCLGU90B22F205X|     Luca Bianchi| 1990-07-22|   3200|2025-09-25|
| VRDLCN80C15D612Y|    Claudia Verdi| 15/08/1980|   2800|2025-09-25|
| NGRMRT75D10E345W|     Martina Neri| 1975-12-10|   3100|2025-09-25|
| FBLGPP88A12H501V|    Filippo Baldi| 1988-01-12|   2700|2025-09-25|
| DMRCNZ92E18F205U|   Daniele Moroni|1992-06-188|   3000|2025-09-25|
|  SPRTMN7820G345T|Simone Sportiello| 1978-20-20|   -500|2025-09-25|
| GRLFRN83B25H501S|    Giorgia Ferri| 1983-02-25|   2900|2025-09-25|
| CNCLNZ87C30D612R|   Concetta Lanza| 30-06-1987|    fff|2025-09-25|
| PPLGRD91D05E345Q|   Paolo Pugliese| 1991-05-05|   3400|2025-09-25|
| LDDRCR99R14F205U|   Riccardo Lod

In [60]:
from pyspark.sql.functions import col

In [47]:
# Name of the CSV file used as the log table
log_file = 'tlog.csv'

# If the file does not exist yet, create it with the header row
if not os.path.exists(log_file):
    with open(log_file, mode='w', newline='') as f:
        csv.writer(f).writerow(['IDRUN', 'IDTLOG', 'CALLER', 'TESTO', 'DINS'])


def next_idtlog(path):
    """Return the next free IDTLOG value for the CSV log stored at ``path``.

    The result is one more than the highest integer found in the IDTLOG
    column. Rows whose IDTLOG is missing or not an integer are ignored.
    A missing file or a file holding only the header yields 1.
    """
    highest = 0
    try:
        # errors='replace' keeps an undecodable TESTO from aborting the scan;
        # only the numeric IDTLOG column matters here.
        with open(path, newline='', errors='replace') as fh:
            for record in csv.DictReader(fh):
                try:
                    highest = max(highest, int(record.get('IDTLOG') or 0))
                except (TypeError, ValueError):
                    continue
    except FileNotFoundError:
        pass
    return highest + 1


# Global IDTLOG counter. It is seeded from the existing log instead of being
# reset to 1, because the file is only ever appended to: restarting the kernel
# or re-running this cell would otherwise hand out IDTLOG values already used.
idtlog_counter = next_idtlog(log_file)

def plog(caller, testo, idrun):
    """Append one record to the CSV log and advance the global IDTLOG counter.

    Args:
        caller: name of the calling procedure; written upper-cased.
        testo: log message; written upper-cased.
        idrun: run identifier, written as given.

    The record is ``[idrun, idtlog_counter, CALLER, TESTO, timestamp]`` where
    the timestamp is the current local time as ``YYYY-MM-DD HH:MM:SS``. The
    target file is the module-level ``log_file``.
    """
    global idtlog_counter
    with open(log_file, mode='a', newline='') as log_handle:
        record = [
            idrun,
            idtlog_counter,
            caller.upper(),
            testo.upper(),
            datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
        ]
        csv.writer(log_handle).writerow(record)
    # Only reached when the row was written successfully
    idtlog_counter = idtlog_counter + 1

# Usage example: write three log records for run id 1.
# idrunn stays defined at notebook level; the later load_dip call passes it on.
idrunn = 1
for _testo in (
    "Inizio procedura",
    "Righe inserite in DIP00_OK: 25",
    "Fine procedura",
):
    plog("PLOAD_DIP", _testo, idrunn)


In [48]:


# Schema of the log file: the two ids are integers, the rest is kept as text
# (DINS could be declared TimestampType() to convert it at read time).
schema_log = StructType([
    StructField(column_name, column_type, True)
    for column_name, column_type in (
        ("IDRUN", IntegerType()),
        ("IDTLOG", IntegerType()),
        ("CALLER", StringType()),
        ("TESTO", StringType()),
        ("DINS", StringType()),
    )
])

# Load the log written by plog and display it without truncating long messages
df_log = spark.read.schema(schema_log).csv("tlog.csv", header=True)
df_log.show(truncate=False)

+-----+------+---------+------------------------------------+-------------------+
|IDRUN|IDTLOG|CALLER   |TESTO                               |DINS               |
+-----+------+---------+------------------------------------+-------------------+
|1    |1     |PLOAD_DIP|INIZIO PROCEDURA                    |2025-09-25 14:09:21|
|1    |2     |PLOAD_DIP|RIGHE INSERITE IN DIP00_OK: 25      |2025-09-25 14:09:21|
|1    |3     |PLOAD_DIP|FINE PROCEDURA                      |2025-09-25 14:09:21|
|1    |4     |PLOAD_DIP|INIZIO PROCEDURA                    |2025-09-25 14:16:29|
|1    |5     |PLOAD_DIP|TABELLE DIP00_OK E DIP00_KO TRONCATE|2025-09-25 14:16:29|
|1    |6     |PLOAD_DIP|RIGHE INSERITE IN DIP00_OK: 8       |2025-09-25 14:16:29|
|1    |7     |PLOAD_DIP|RIGHE INSERITE IN DIP00_KO: 14      |2025-09-25 14:16:29|
|1    |8     |PLOAD_DIP|FINE PROCEDURA                      |2025-09-25 14:16:29|
|1    |9     |PLOAD_DIP|INIZIO PROCEDURA                    |2025-09-25 14:24:05|
|1    |10    |PL

In [49]:


def is_valid_row(row):
    """Return True when an employee record passes every validation rule.

    Args:
        row: mapping with the keys ``'SALARIO'``, ``'DN'`` and ``'NOME'``
            (string values or None).

    Rules:
        * SALARIO parses as a finite number strictly greater than zero.
        * DN is exactly ``YYYY-MM-DD`` and names a real calendar date.
        * NOME is not empty/blank and contains no digit.

    Returns:
        bool: True if all rules hold, False otherwise.
    """
    # SALARIO: numeric, positive and finite. The chained comparison is False
    # for NaN and for +inf, both of which float() happily parses.
    try:
        salario = float(row['SALARIO'])
    except (ValueError, TypeError):
        return False
    if not (0 < salario < float('inf')):
        return False

    # DN: strict YYYY-MM-DD shape. fullmatch (unlike match + '$') also rejects
    # a trailing newline.
    dn = row['DN']
    if not dn or not re.fullmatch(r'\d{4}-\d{2}-\d{2}', dn):
        return False
    # ...and a date that really exists: a plain 1..12 / 1..31 range check
    # would let values such as 1990-02-30 through.
    try:
        datetime.strptime(dn, '%Y-%m-%d')
    except ValueError:
        return False

    # NOME: present, not just whitespace, and without digits
    nome = row['NOME']
    if not nome or not nome.strip() or re.search(r'\d', nome):
        return False

    return True

In [50]:
def load_dip(df_raw, idrun, log_func):
    """Split the raw employee rows into the DIP00_OK / DIP00_KO CSV files.

    Both files are recreated from scratch (header only), then every row of
    ``df_raw`` is stamped with the current local time in ``DINS`` and written
    to ``DIP00_OK.csv`` when ``is_valid_row`` accepts it, otherwise to
    ``DIP00_KO.csv``. Progress is reported through ``log_func``.

    Args:
        df_raw: object exposing ``collect()``, which returns rows offering
            ``asDict()`` with the keys CF, NOME, DN and SALARIO.
        idrun: run identifier written in the first column of every output row
            and passed to each ``log_func`` call.
        log_func: callable invoked as ``log_func(caller, testo, idrun)``.

    Returns:
        tuple: ``(dip_ok_file, dip_ko_file)``, the two file names written.
    """
    dip_ok_file = 'DIP00_OK.csv'
    dip_ko_file = 'DIP00_KO.csv'
    header = ['IDRUN', 'CF', 'NOME', 'DN', 'SALARIO', 'DINS']

    count_ok = 0
    count_ko = 0

    # Each file is opened once in 'w' mode: this truncates it and keeps it open
    # for the whole load instead of reopening it for every row. UTF-8 is set
    # explicitly so names with accented letters do not depend on the platform
    # default encoding when the files are read back.
    with open(dip_ok_file, 'w', newline='', encoding='utf-8') as f_ok, \
            open(dip_ko_file, 'w', newline='', encoding='utf-8') as f_ko:
        writer_ok = csv.writer(f_ok)
        writer_ko = csv.writer(f_ko)
        writer_ok.writerow(header)
        writer_ko.writerow(header)

        log_func('PLOAD_DIP', 'Tabelle DIP00_OK e DIP00_KO troncate', idrun)

        for row in df_raw.collect():
            row_dict = row.asDict()
            row_dict['DINS'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
            # Use the idrun argument, not a notebook-level variable, so the
            # function writes the run id it was actually called with.
            record = [
                idrun,
                row_dict['CF'],
                row_dict['NOME'],
                row_dict['DN'],
                row_dict['SALARIO'],
                row_dict['DINS'],
            ]
            # Route the row according to the validation outcome
            if is_valid_row(row_dict):
                writer_ok.writerow(record)
                count_ok += 1
            else:
                writer_ko.writerow(record)
                count_ko += 1

    log_func('PLOAD_DIP', f'Righe inserite in DIP00_OK: {count_ok}', idrun)
    log_func('PLOAD_DIP', f'Righe inserite in DIP00_KO: {count_ko}', idrun)
    log_func('PLOAD_DIP', 'Fine procedura', idrun)

    return dip_ok_file, dip_ko_file


In [62]:
# Run the load on the raw frame, logging through plog; keep the two output file names
dip_ok_file, dip_ko_file = load_dip(df_raw=df_raw, idrun=idrunn, log_func=plog)

In [64]:
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, IntegerType, DateType, TimestampType

# Schema for the DIP00_OK / DIP00_KO files
schema_result = StructType([
    StructField("IDRUN", IntegerType(), True),
    StructField("CF", StringType(), True),
    StructField("NOME", StringType(), True),
    StructField("DN", StringType(), True),             # kept as a string; can be converted later
    # NOTE(review): read directly as an integer; values that are not integer
    # literals presumably come back as null - confirm against the KO file.
    StructField("SALARIO", IntegerType(), True),
    StructField("DINS", StringType(), True)            # or TimestampType if preferred
])


# Read the accepted rows. The file names use the exact casing written by
# load_dip ('DIP00_OK.csv' / 'DIP00_KO.csv'): the lowercase spelling only
# resolves on case-insensitive filesystems.
df_ok = spark.read.csv(
    "DIP00_OK.csv",
    header=True,
    schema=schema_result
)

# Read the rejected rows
df_ko = spark.read.csv(
    "DIP00_KO.csv",
    header=True,
    schema=schema_result
)

# Show the two "tables"
print("=== DIP_OK ===")
df_ok.show(truncate=False)

print("=== DIP_KO ===")
df_ko.show(truncate=False)


=== DIP_OK ===
+-----+----------------+--------------+----------+-------+-------------------+
|IDRUN|CF              |NOME          |DN        |SALARIO|DINS               |
+-----+----------------+--------------+----------+-------+-------------------+
|1    |RSSMRA85M01H501Z|Mario Rossi   |1985-03-15|2500   |2025-09-25 15:00:20|
|1    |BNCLGU90B22F205X|Luca Bianchi  |1990-07-22|3200   |2025-09-25 15:00:20|
|1    |NGRMRT75D10E345W|Martina Neri  |1975-12-10|3100   |2025-09-25 15:00:20|
|1    |FBLGPP88A12H501V|Filippo Baldi |1988-01-12|2700   |2025-09-25 15:00:20|
|1    |GRLFRN83B25H501S|Giorgia Ferri |1983-02-25|2900   |2025-09-25 15:00:20|
|1    |PPLGRD91D05E345Q|Paolo Pugliese|1991-05-05|3400   |2025-09-25 15:00:20|
|1    |NGRMRT75D10E345W|Maria Santa   |1975-10-10|3100   |2025-09-25 15:00:20|
+-----+----------------+--------------+----------+-------+-------------------+

=== DIP_KO ===
+-----+---------------------------------+-----------------+-----------+-------+-------------------+


In [66]:
# Keep only the accepted rows whose SALARIO is strictly greater than 3000
df_filtrato = df_ok.where(df_ok["SALARIO"] > 3000)
df_filtrato.show()

+-----+----------------+--------------+----------+-------+-------------------+
|IDRUN|              CF|          NOME|        DN|SALARIO|               DINS|
+-----+----------------+--------------+----------+-------+-------------------+
|    1|BNCLGU90B22F205X|  Luca Bianchi|1990-07-22|   3200|2025-09-25 15:00:20|
|    1|NGRMRT75D10E345W|  Martina Neri|1975-12-10|   3100|2025-09-25 15:00:20|
|    1|PPLGRD91D05E345Q|Paolo Pugliese|1991-05-05|   3400|2025-09-25 15:00:20|
|    1|NGRMRT75D10E345W|   Maria Santa|1975-10-10|   3100|2025-09-25 15:00:20|
+-----+----------------+--------------+----------+-------+-------------------+

