redis2history è uno strumento per l'esecuzione di pipeline ETL (Extract, Transform, Load) utilizzando un approccio modulare e configurabile. Le pipeline sono costituite da stadi (stages), ognuno definito da un plugin modulare, che possono essere eseguiti in parallelo o sequenzialmente.
Questo progetto è stato sviluppato come parte della tesi di laurea triennale in Ingegneria Informatica ed Elettronica presso l'Università di Pavia, in collaborazione con l'azienda Siak Sistemi.
-
Installare le dipendenze:
pip install -r requirements.txt
-
Lanciare Redis e MongoDB:
Nel caso non si disponesse di istanze Redis e MongoDB esistenti, è possibile crearne di nuove utilizzando Docker Compose
docker compose up
Ispezionare il file
docker-compose.yaml
per verificare le porte dei container e la posizione del dump Redis -
Configurazione:
Crea un file di configurazione seguendo la struttura specificata in
config_example.json
-
Esegui la pipeline:
python main.py -c <config-file-path>
-c
,--config
: Percorso del file di configurazione (obbligatorio).-m
,--manual_order
: Esegui le pipeline nell'ordine definito nel file di configurazione.-e
,--export_run
: Salva i risultati di ogni stage della pipeline su file pickle.-dr
,--dry_run
: Verifica il file di configurazione e la struttura della pipeline senza eseguire le pipeline.
-
global
: Definisce variabili globali che possono essere referenziate dai pluginGli stage hanno la possibilità di richiedere parametri configurabili, questi vengono prima cercati nei parametri
args
, nel caso non venissero trovati qui, si fa fallback ai parametri specificati inglobal
-
redis
: Definisce i parametri di connessione a Redis,lock
specifica la chiave a cui applicare il lock, se non è presente il lock non viene impostatoInoltre,
lock
supporta valori dinamici del tipo:{{date:<formato>}}
: Data corrente nel formato specificato.{{env:<nome-variabile>}}
: Valore della variabile di ambiente.{{global:<nome-variabile>}}
: Variabile globale definita nella configurazione.
-
mongo
: Definisce i parametri di connessione a MongoDB -
pipelines
: Array di oggetti pipeline conid
: Identificatore univoco della pipeline.parallel
: Indica se gli stage della pipeline devono essere eseguiti in parallelo.source
: Identificativo della pipeline da cui prelevare i dati di ingresso (nella prima pipeline questo campo è nullo o assente).stages
Array di oggetti stage conid
: Identificatore univoco dello stage.script
: Nome dello script del plugin da eseguire.args
: Argomenti specifici dello stage
Le classi RedisClientProxy
e MongoClientProxy
sono classi proxy utilizzate per esporre i metodi delle rispettive classi client in un'interfaccia sicura per l'utilizzo con multiprocessing. Questo è necessario perché i metodi delle classi RedisClient
e MongoClient
potrebbero restituire tipi di dati non serializzabili (non pickable), come iteratori o generatori, che non possono essere passati direttamente tra i processi. Queste classi proxy consentono, se necessario, di trasformare l'oggetto ritornato in un tipo serializzabile sul main thread, prima di essere passati al processo worker
I plugin di downsampling riducono la frequenza dei dati temporali raggruppando i dati in intervalli di tempo e mantenendo solo il dato più rappresentativo per ogni intervallo. Utilizzando la funzione group_datetimes
per raggruppare i dati in intervalli di tempo specificati e quindi seleziona il dato più rappresentativo per ogni intervallo.
Calcolo del dato più rappresentativo
Per ogni intervallo...
- Se esistono data point "out of scope", viene selezionato l'ultimo di essi come rappresentante dell'intervallo.
- Viene calcolato il tempo totale di ciascuno stato all'interno dell'intervallo.
- Se esiste un "precedente stato", viene aggiunto il tempo di durata del primo dato dello stato all'interno dell'intervallo.
- Se nell'intervallo esistono altri data point con stato uguale al
prevStatus
del primo data point, allora anche questo contributo viene aggiunto allo stato - Viene selezionato il l'ultimo data point con lo stato a durata più lunga all'interno dell'intervallo.
Gli unit test si trovano nel file unittests.py
e possono essere eseguiti lanciando il comando
python -m unittest unittests