In [1]:
import os
# 1. Imposta il percorso di Java 11 (usa l'output del comando sopra)
java_home_path = "/Library/Java/JavaVirtualMachines/openjdk-11.jdk/Contents/Home"
os.environ["JAVA_HOME"] = java_home_path

# 2. Aggiungi Java al PATH
os.environ["PATH"] = f"{java_home_path}/bin:{os.environ['PATH']}"

# 3. Verifica (opzionale)
!java -version



java version "21.0.2" 2024-01-16 LTS
Java(TM) SE Runtime Environment (build 21.0.2+13-LTS-58)
Java HotSpot(TM) 64-Bit Server VM (build 21.0.2+13-LTS-58, mixed mode, sharing)


In [None]:
###JAVA MAC
import os
from pyspark.sql import SparkSession

# 1. Configurazione ambiente Java e Spark
os.environ["JAVA_HOME"] = "/Library/Java/JavaVirtualMachines/openjdk-11.jdk/Contents/Home"
os.environ["PYSPARK_SUBMIT_ARGS"] = "--packages org.mongodb.spark:mongo-spark-connector_2.12:10.5.0 pyspark-shell"

# 2. Inizializzazione Spark con configurazione esplicita
spark = SparkSession.builder \
    .appName("ClinicalDataAnalysis") \
    .config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.12:10.5.0") \
    .config("spark.mongodb.read.connection.uri", "mongodb://localhost:27017/cartella_clinica_db.interventi") \
    .getOrCreate()

# 3. Lettura dati con sintassi corretta
df = spark.read \
  .format("mongodb") \
  .option("uri", "mongodb://localhost:27017") \
  .option("database", "cartella_clinica_db") \
  .option("collection", "interventi") \
  .load()

# Verifica
df.show(10)

In [None]:
### JAVA WINDOWS

import os
# 1. Imposta il percorso di Java 11 (usa l'output del comando sopra)
java_home_path = r"C:\Program Files\Java\jdk-11"
os.environ["HADOOP_HOME"] = r"C:\Program Files\hadoop"
os.environ["JAVA_HOME"] = java_home_path

# 2. Aggiungi Java al PATH
os.environ["PATH"] = f"{java_home_path}\\bin;{os.environ['PATH']}"

# 3. Verifica (opzionale)
!java -version

java version "11.0.27" 2025-04-15 LTS
Java(TM) SE Runtime Environment 18.9 (build 11.0.27+8-LTS-232)
Java HotSpot(TM) 64-Bit Server VM 18.9 (build 11.0.27+8-LTS-232, mixed mode)


In [10]:
#### SPARK SARA
import os
from pyspark.sql import SparkSession

# 1. Configurazione ambiente Spark
os.environ["PYSPARK_SUBMIT_ARGS"] = "--packages org.mongodb.spark:mongo-spark-connector_2.12:10.1.1 pyspark-shell"

# 2. Inizializzazione Spark con configurazione esplicita
spark = SparkSession.builder \
    .appName("ClinicalDataAnalysis") \
    .config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.12:10.5.0") \
    .config("spark.mongodb.read.connection.uri", "mongodb://localhost:27017/cartella_clinica_db.interventi") \
    .getOrCreate()

# 3. Lettura dati con sintassi corretta
df = spark.read \
  .format("mongodb") \
  .option("uri", "mongodb://localhost:27017") \
  .option("database", "cartella_clinica_db") \
  .option("collection", "interventi") \
  .load()

# Verifica
df.show(10)



+--------------------+--------------------+-----------+-----+-------+---------------+----------------+--------------+-------------+-----------------+---------------------+--------------------+----------+-------------+-----------------+------------------+-------------+---------------+---------------------+------------------+---------------+--------------------------+----------------+--------------------+-------------------+-------------+---------------+---------------------+--------------+-------------------+-------------------+-------------------+--------------------+-------------+-----------------+-------------+--------------------+------------+----------------------+----------------------+----------------+---------------------+--------------------+--------------------+-------------------+----------------------+--------------------------+-----------------------+--------------------+---------+-----------------+-------------------+----------------------+--------------------+------------

# Tempo di servizio ambulanza

In [35]:
from pyspark.sql.functions import unix_timestamp, col, round

# Converte gli orari in timestamp e calcola la differenza in minuti
df_tempo = df.withColumn("partenza_ts", unix_timestamp(col("ora_partenza_ambulanza"), "HH:mm")) \
             .withColumn("arrivo_ts", unix_timestamp(col("ora_arrivo_ps"), "HH:mm")) \
             .withColumn("chiamata_ts", unix_timestamp(col("ora_chiamata"), "HH:mm")) \
             .withColumn("ora_intervento", unix_timestamp(col("ora_arrivo_sul_posto"), "HH:mm")) \
             .withColumn("durata_minuti", round((col("arrivo_ts") - col("partenza_ts")) / 60)) \
             .withColumn("tempo_di_intervento", round((col("ora_intervento") - col("chiamata_ts")) / 60))

# Mostra i tempi di occupazione
df_tempo.select("ora_partenza_ambulanza", "ora_arrivo_ps", "durata_minuti", "ora_chiamata", "ora_arrivo_sul_posto", "tempo_di_intervento") \
        .orderBy(col("durata_minuti").desc()) \
        .show(10)

# Salva in CSV
df_tempo.select("ora_partenza_ambulanza", "ora_arrivo_ps", "durata_minuti", "ora_chiamata", "ora_arrivo_sul_posto", "tempo_di_intervento") \
        .toPandas() \
        .to_csv("output/tempi_occupazione_ambulanza.csv", index=False)



+----------------------+-------------+-------------+------------+--------------------+-------------------+
|ora_partenza_ambulanza|ora_arrivo_ps|durata_minuti|ora_chiamata|ora_arrivo_sul_posto|tempo_di_intervento|
+----------------------+-------------+-------------+------------+--------------------+-------------------+
|                 14:25|        15:05|         40.0|       14:22|               14:33|               11.0|
|                 08:45|        09:24|         39.0|       08:42|               08:53|               11.0|
|                 09:23|        10:00|         37.0|       09:20|               09:30|               10.0|
|                 09:48|        10:25|         37.0|       09:45|               09:55|               10.0|
|                 14:33|        15:10|         37.0|       14:30|               14:40|               10.0|
|                 10:18|        10:49|         31.0|       10:15|               10:24|                9.0|
|                 14:15|        14:44

# Tempo di servizio ambulanza per città (per vedere se ci sono città più lente o più veloci di altre)

In [37]:
from pyspark.sql.functions import unix_timestamp, col, round, when, count, sum

# Calcola la durata in minuti
df = df.withColumn("chiamata_ts", unix_timestamp(col("ora_chiamata"), "HH:mm")) \
       .withColumn("arrivo_ts", unix_timestamp(col("ora_arrivo_sul_posto"), "HH:mm")) \
       .withColumn("durata_minuti", round((col("arrivo_ts") - col("chiamata_ts")) / 60))

# Definisci soglie per rapido e lento
soglia_rapido = 8
soglia_lento = 20

# Aggiungi colonne per classificare
df = df.withColumn("is_rapido", when(col("durata_minuti") <= soglia_rapido, 1).otherwise(0)) \
       .withColumn("is_lento", when(col("durata_minuti") >= soglia_lento, 1).otherwise(0))

# Raggruppa per città e calcola totali e percentuali
df_riepilogo = df.groupBy("citta").agg(
    count("*").alias("totale_interventi"),
    sum("is_rapido").alias("interventi_rapidi"),
    sum("is_lento").alias("interventi_lenti")
)

df_percentuali = df_riepilogo.withColumn(
    "percentuale_rapidi",
    round((col("interventi_rapidi") / col("totale_interventi")) * 100, 2)
).withColumn(
    "percentuale_lenti",
    round((col("interventi_lenti") / col("totale_interventi")) * 100, 2)
)

# Filtra le città senza interventi rapidi né lenti
df_filtrato = df_percentuali.filter(
    (col("interventi_rapidi") > 0) | (col("interventi_lenti") > 0)
)

# Mostra il risultato
df_filtrato.orderBy("citta").show(truncate=False)

# Salva solo le città con dati rilevanti
df_filtrato.toPandas().to_csv("output/distribuzione_ambulanze_estreme.csv", index=False)




+-------+-----------------+-----------------+----------------+------------------+-----------------+
|citta  |totale_interventi|interventi_rapidi|interventi_lenti|percentuale_rapidi|percentuale_lenti|
+-------+-----------------+-----------------+----------------+------------------+-----------------+
|Arezzo |2                |2                |0               |100.0             |0.0              |
|Bologna|2                |1                |0               |50.0              |0.0              |
|Empoli |1                |1                |0               |100.0             |0.0              |
|Firenze|3                |1                |0               |33.33             |0.0              |
|Milano |2                |1                |0               |50.0              |0.0              |
|Parma  |3                |1                |0               |33.33             |0.0              |
+-------+-----------------+-----------------+----------------+------------------+-----------------+


# Analitica Area Geografica

In [27]:
from pyspark.sql.functions import col
import os

# Assicurati che la cartella di output esista
os.makedirs("output", exist_ok=True)

# Calcola top 10 città con più interventi
top_cities = df.groupBy("citta") \
    .count() \
    .orderBy(col("count").desc()) \
    .limit(10)

spark.conf.set("spark.hadoop.io.native.lib.available", "false")

# Trasformo in pandas
top_cities_pd = top_cities.toPandas()

# Salvo in CSV
top_cities_pd.to_csv("output/top_citta_interventi.csv", index=False)

