### Predisposizione del Cluster Spark

In [None]:
Innanzitutto ho predisposto un cluster di macchine virtuali mediante Oracle VirtualBox, così organizzato:

- master@192.168.56.101 ->  nodo master | Ram: 4 gb | CPU: 6 | HHD: 25 gb | Version: Ubuntu 16.04
- node1@192.168.56.102  ->  nodo worker | Ram: 4 gb | CPU: 6 | HHD: 25 gb | Version: Ubuntu 16.04
- node2@192.168.56.103  ->  nodo worker | Ram: 4 gb | CPU: 6 | HHD: 25 gb | Version: Ubuntu 16.04
Su queste macchine ho aperto le connessioni SSH ed ho provveduto ad installare Apache Spark 3.1.2

In [None]:
import findspark
findspark.init()

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql import functions as F

import plotly.express as px
import plotly.graph_objects as go
from plotly.subplots import make_subplots

import pandas as pd
import numpy as np

In [None]:
app_name = 'ANAC_Application'
master_ip = 'spark://192.168.56.101:7077'
local = 'local[*]'


sc = SparkSession.builder.appName(app_name).master(master_ip).\
        config("spark.executor.memory", '2g').\
        config("spark.driver.memory", '2g').getOrCreate()

In [None]:
cig_2021_01 = sc.read.csv(
    'file:///usr/local/spark/data/anac_anticorruzione/cig_csv_2021_01.csv', 
    header=True,sep=';').cache()
cig_2021_02 = sc.read.csv(
    'file:///usr/local/spark/data/anac_anticorruzione/cig_csv_2021_02.csv', 
    header=True,sep=';').cache()
cig_2021_03 = sc.read.csv(
    'file:///usr/local/spark/data/anac_anticorruzione/cig_csv_2021_03.csv', 
    header=True,sep=';').cache()
cig_2021_04 = sc.read.csv(
    'file:///usr/local/spark/data/anac_anticorruzione/cig_csv_2021_04.csv', 
    header=True,sep=';').cache()
cig_2021_05 = sc.read.csv(
    'file:///usr/local/spark/data/anac_anticorruzione/cig_csv_2021_05.csv', 
    header=True,sep=';').cache()
cig_2021_06 = sc.read.csv(
    'file:///usr/local/spark/data/anac_anticorruzione/cig_csv_2021_06.csv', 
    header=True,sep=';').cache()
cig_2021_07 = sc.read.csv(
    'file:///usr/local/spark/data/anac_anticorruzione/cig_csv_2021_07.csv', 
    header=True,sep=';').cache()
cig_2021_08 = sc.read.csv(
    'file:///usr/local/spark/data/anac_anticorruzione/cig_csv_2021_08.csv', 
    header=True,sep=';').cache()
cig_2021_09 = sc.read.csv(
    'file:///usr/local/spark/data/anac_anticorruzione/cig_csv_2021_09.csv', 
    header=True,sep=';').cache()
cig_2021_10 = sc.read.csv(
    'file:///usr/local/spark/data/anac_anticorruzione/cig_csv_2021_10.csv', 
    header=True,sep=';').cache()
cig_2021_11 = sc.read.csv(
    'file:///usr/local/spark/data/anac_anticorruzione/cig_csv_2021_11.csv', 
    header=True,sep=';').cache()

In [None]:
cig2021 = cig_2021_01.union(cig_2021_02).union(cig_2021_03).union(cig_2021_04).\
        union(cig_2021_05).union(cig_2021_06).union(cig_2021_07).\
        union(cig_2021_08).union(cig_2021_09).union(cig_2021_10).union(cig_2021_11)

In [None]:
sc.sql("set spark.sql.legacy.timeParserPolicy=LEGACY")
cig2021_wrk = cig2021.withColumn("mese_pubblicazione_des", date_format(to_date(cig2021.mese_pubblicazione, 'MM'), 'MMMM'))
cig2021_wrk = cig2021_wrk.withColumn("mese_pubblicazione_cod", col("mese_pubblicazione").cast("integer"))
cig2021_wrk = cig2021_wrk.withColumn("data_pubblicazione", to_date(cig2021.data_pubblicazione))

cig2021_wrk = cig2021_wrk.withColumn("importo_lotto", col('importo_lotto').cast(DoubleType()))
cig2021_wrk = cig2021_wrk.withColumn("importo_complessivo_gara", col('importo_complessivo_gara').cast(DoubleType()))
cig2021_wrk = cig2021_wrk.withColumn("n_lotti_componenti", col('n_lotti_componenti').cast('integer'))

cig2021 = cig2021_wrk.select("cig"
    , "cf_amministrazione_appaltante"
    , "data_pubblicazione"
    , "mese_pubblicazione_cod"
    , "mese_pubblicazione_des"
    , "oggetto_lotto"
    , "oggetto_gara"
    , "settore"
    , "sezione_regionale"
    , "modalita_realizzazione"
    , "tipo_scelta_contraente"
    , "importo_lotto"
    , "n_lotti_componenti"
    , "importo_complessivo_gara")

cig2021.createOrReplaceTempView("F_BANDI_CIG")
cig2021.printSchema()

In [None]:
print('Distribuzione modalità di realizzazione\n')
for i in cig2021.groupby('modalita_realizzazione').count().sort('count', ascending=False).collect():
    print(f"{i['count']}\t {i['modalita_realizzazione']}")

In [None]:
print('Distribuzione tipo scelta contraente\n')

for i in cig2021.groupby('tipo_scelta_contraente').count().sort('count', ascending=False).collect():
    print(f"{i['count']}\t {i['tipo_scelta_contraente']}")

In [None]:
cig2021 = cig2021.withColumn("tipo_scelta_contraente_rid", when(cig2021.tipo_scelta_contraente.like('AFFIDAMENTO DIRETTO%'), 'AFFIDAMENTO DIRETTO').\
                when(cig2021.tipo_scelta_contraente.like('PROCEDURA NEGOZIATA%'), 'PROCEDURA NEGOZIATA').\
                when(cig2021.tipo_scelta_contraente.like('PROCEDURA APERTA%'), 'PROCEDURA APERTA').otherwise('ALTRO'))

In [None]:
for i in cig2021.groupby('tipo_scelta_contraente_rid').count().sort('count', ascending=False).collect():
    print(f"{i['count']}\t {i['tipo_scelta_contraente_rid']}")

In [None]:
cig2021.filter(
    col('importo_complessivo_gara') < col('importo_lotto')).\
        select('cig','importo_lotto', 'importo_complessivo_gara', 'oggetto_lotto', 'oggetto_gara').show()

n_bandi = cig2021.count()
n_row_warning_lotto =  cig2021.filter(col('importo_complessivo_gara') < col('importo_lotto')).count()

print('''
Su {} bandi ce ne sono {} che hanno un importo importo lotto non congruente rispetto all'importo complessivo della gara
'''.format(n_bandi, n_row_warning_lotto))

In [None]:
stazioni_appaltanti = sc.read.csv(
    'file:///usr/local/spark/data/anac_anticorruzione/stazioni-appaltanti_csv_0.csv', 
    header=True, sep=';').filter(col('citta_codice')!='').cache()

stazioni_appaltanti = stazioni_appaltanti.select("cap"
, "citta_codice"
, "citta_nome"
, "codice_fiscale"
, "denominazione"
, "natura_giuridica_descrizione"
, "partita_iva"
, "provincia_codice"
, "provincia_nome"
, "stato")
stazioni_appaltanti.createOrReplaceTempView("D_STAZIONI_APPALTANTI")
stazioni_appaltanti.show(5)

In [None]:
print("Prime 10 stazioni appaltanti per numero bandi \n")

for i in cig2021.groupby(
            'cf_amministrazione_appaltante').count().sort(
                col('count').desc()).limit(10).join(
                    stazioni_appaltanti, 
                    stazioni_appaltanti.codice_fiscale == cig2021.cf_amministrazione_appaltante, 
                    'inner').select('citta_nome', 'denominazione', 'count').collect():
    print("{} bandi --> |{}  --> |{}".format(i['count'], i['citta_nome'], i['denominazione']))

In [None]:
aggiudicazioni = sc.read.csv(
    'file:///usr/local/spark/data/anac_anticorruzione/aggiudicazioni_csv_0.csv', 
                             header=True, sep=';').cache()

aggiudicazioni_wrk = aggiudicazioni.withColumn("numero_offerte_ammesse", col('numero_offerte_ammesse').cast('integer'))
aggiudicazioni_wrk = aggiudicazioni_wrk.withColumn("numero_offerte_escluse", col('numero_offerte_escluse').cast('integer'))
aggiudicazioni_wrk = aggiudicazioni_wrk.withColumn("num_imprese_offerenti", col('num_imprese_offerenti').cast('integer'))
aggiudicazioni_wrk = aggiudicazioni_wrk.withColumn("num_imprese_invitate", col('num_imprese_invitate').cast('integer'))
aggiudicazioni_wrk = aggiudicazioni_wrk.withColumn("num_imprese_richiedenti", col('num_imprese_richiedenti').cast('integer'))

aggiudicazioni_wrk = aggiudicazioni_wrk.withColumn("importo_aggiudicazione", col('importo_aggiudicazione').cast(DoubleType()))
aggiudicazioni_wrk = aggiudicazioni_wrk.withColumn("massimo_ribasso", col('massimo_ribasso').cast(DoubleType()))
aggiudicazioni_wrk = aggiudicazioni_wrk.withColumn("minimo_ribasso", col('minimo_ribasso').cast(DoubleType()))
aggiudicazioni_wrk = aggiudicazioni_wrk.withColumn("ribasso_aggiudicazione", col('ribasso_aggiudicazione').cast(DoubleType()))

aggiudicazioni_wrk = aggiudicazioni_wrk.withColumn("data_aggiudicazione_definitiva", to_date(aggiudicazioni_wrk.data_aggiudicazione_definitiva))
aggiudicazioni_wrk = aggiudicazioni_wrk.withColumn("data_comunicazione_esito", to_date(aggiudicazioni_wrk.data_comunicazione_esito))
aggiudicazioni_wrk = aggiudicazioni_wrk.withColumn('mese_aggiudicazione_des', date_format(col('data_aggiudicazione_definitiva'),'MMMMM'))

aggiudicazioni = aggiudicazioni_wrk.select("asta_elettronica"
    , "cig"
    , "criterio_aggiudicazione"
    , "data_comunicazione_esito"
    , "data_aggiudicazione_definitiva"
    , "esito"
    , "flag_subappalto"
    , "id_aggiudicazione"
    , "importo_aggiudicazione"
    , "num_imprese_invitate"
    , "num_imprese_offerenti"
    , "num_imprese_richiedenti"
    , "numero_offerte_ammesse"
    , "numero_offerte_escluse"
    , "massimo_ribasso"
    , "minimo_ribasso"
    , "ribasso_aggiudicazione"
    , "mese_aggiudicazione_des")
aggiudicazioni.createOrReplaceTempView("F_AGGIUDICAZIONI")

In [None]:
aggiudicatari = sc.read.csv(
    'file:///usr/local/spark/data/anac_anticorruzione/aggiudicatari_csv_0.csv', 
                             header=True, sep=';').cache()
aggiudicatari = aggiudicatari.withColumn('tipo_soggetto_rid', split(col('tipo_soggetto'),' ')[0])
#aggiudicatari = aggiudicatari.withColumn('denominazione', regexp_replace(col('denominazione'), ".", ""))

# tagliare il contenuto di tipo_soggetto (rimuovi le parentesi)
aggiudicatari.createOrReplaceTempView("F_AGGIUDICATARI")
aggiudicatari.dtypes

In [None]:
query = '''
select a.tipo_soggetto_rid
    , count(distinct a.cig) as n_bandi
from  F_AGGIUDICATARI a
inner join f_bandi_cig b
on a.cig = b.cig
group by a.tipo_soggetto_rid
order by 2 desc

'''

sc.sql(query).show()

for i in aggiudicatari.select('tipo_soggetto').distinct().collect():
    print(i.tipo_soggetto)


In [None]:
stato_avanzamento = sc.read.csv(
    'file:///usr/local/spark/data/anac_anticorruzione/stati-avanzamento_csv_0.csv', 
                             header=True, sep=';').cache()

stato_avanzamento_wrk = stato_avanzamento.withColumn("progressivo_sal", col('progressivo_sal').cast('integer'))
stato_avanzamento = stato_avanzamento.withColumn("importo_sal", col('importo_sal').cast(DoubleType()))
stato_avanzamento.createOrReplaceTempView("F_STATO_AVANZAMENTO")
stato_avanzamento.dtypes

## Flag sospetto

Uno degli obiettivi che mi sono posto è quello di individuare un set di bandi "sospetti", ovvero caratterizzati da incongruenze sui dati (come dati mancanti) oppure da situazioni anomale

Individuato il bando sospetto, si potrà passare all'osservazione più dettagliata dello stesso. 

Un lavoro di monitoraggio di questo tipo può portare ad avere una maggiore qualità dei dati pubblici, in conformità alla dottrina dell'Open Government che si basa sul principio per il quale tutte le attività dei Governi e delle Pubbliche Amministrazioni devono essere trasparenti, aperte e disponibili


Assunzioni per la valorizzazione del **Flag Sospetto**:
- **Numero di offerenti non indicati**: un numero non congruente di offerte rispetto ad un bando può determinare il fatto che ci sia poca competizione in una determinata gara. Non avere l'informazione non ci consente di giudicare possibili casistiche del genere

- **Prima gara vinta**: Indicatore di "novità" nel sistema. Purtroppo non abbiamo ulteriori informazioni, 

- **Elevata differenza percentuale tra l'importo di aggiudicazione e l'importo complessivo della gara**


In [None]:
# Numero offerenti non indicati
query = '''
select distinct cig as cig 
from (select b.cig
            , first(num_imprese_invitate) as num_imprese_invitate
            , first(num_imprese_richiedenti)  as num_imprese_richiedenti
            , first(num_imprese_offerenti) as num_imprese_offerenti
            , count(*) as n_row
        from  F_aggiudicazioni a
        inner join f_bandi_cig b
        on a.cig = b.cig
        group by b.cig)
where num_imprese_offerenti = 0

'''

offerenti_non_indicati = sc.sql(query)
offerenti_non_indicati.createOrReplaceTempView("WRN_OFFERENTI")

In [None]:
# Prima gara vinta
query = '''

select distinct c.cig as cig
from f_aggiudicatari a
inner join (select codice_fiscale
        , count(*) as n_gare_vinte
    from f_aggiudicatari 
    group by codice_fiscale
    having n_gare_vinte = 1) b
on a.codice_fiscale = b.codice_fiscale
inner join f_bandi_cig c
on a.cig = c.cig
'''

prima_gara_vinta = sc.sql(query)
prima_gara_vinta.createOrReplaceTempView("WRN_PRIMA_GARA")

In [None]:
# Elevata differenza percentuale tra l'importo di aggiudicazione e l'importo complessivo della gara
query = '''
select distinct cig as cig
from (select c.cig
        , a.denominazione as nome_aggiudicatario
        , b.importo_aggiudicazione
        , d.denominazione as nome_stazione_appaltante
        , c.oggetto_gara
        , c.importo_complessivo_gara
        , b.importo_aggiudicazione/c.importo_complessivo_gara as diff_perc
    from  F_aggiudicatari a
    inner join f_aggiudicazioni b
    on a.id_aggiudicazione = b.id_aggiudicazione
    inner join f_bandi_cig c
    on a.cig = c.cig
    inner join D_STAZIONI_APPALTANTI d
    on d.codice_fiscale = c.cf_amministrazione_appaltante)
where diff_perc > 1
'''


diff_perc_aggiudicaz_bando = sc.sql(query)
diff_perc_aggiudicaz_bando.createOrReplaceTempView("WRN_IMPORTO_AGGIUDICAZ")

In [None]:
warnings = offerenti_non_indicati.union(prima_gara_vinta).union(diff_perc_aggiudicaz_bando).distinct()
warnings.createOrReplaceTempView("WRN_TOTALI")

In [None]:
query = '''

select t1.*
    , case when t2.cig is not null then 'SOSPETTO' else 'OK' end as flag_sospetto
from (select c.cig
        , a.denominazione as nome_aggiudicatario
        , b.importo_aggiudicazione
        , d.denominazione as nome_stazione_appaltante
        , d.citta_nome
        , c.oggetto_gara
        , c.importo_complessivo_gara
        , c.mese_pubblicazione_cod
        , c.mese_pubblicazione_des
        , c.data_pubblicazione
        , b.num_imprese_invitate
        , b.num_imprese_offerenti
        
    from  F_aggiudicatari a
    inner join f_aggiudicazioni b
    on a.id_aggiudicazione = b.id_aggiudicazione
    inner join f_bandi_cig c
    on a.cig = c.cig
    inner join D_STAZIONI_APPALTANTI d
    on d.codice_fiscale = c.cf_amministrazione_appaltante) t1
left join WRN_TOTALI t2
on t1.cig = t2.cig

'''

percorso_del_bando = sc.sql(query)