# BIG DATA PROJECT - eCommerce behavior data analysis

Il dataset sottoposto ad esame può essere recuperato al [seguente link](https://www.kaggle.com/datasets/mkechinov/ecommerce-behavior-data-from-multi-category-store?resource=download). Esso contiene dati comportamentali relativi ad eventi generati dagli utenti all’interno di un portale di e-commerce. Il dataset è relativo al mese di Novembre 2019, contiene più di 1 milione di record e ha una dimensione di 8.38GB. 
I record di cui il dataset si compone possono essere di diverse tipologie: 

- ```view```: un utente ha visualizzato un prodotto
- ```cart```: un utente ha inserito un prodotto nel carrello 
- ```purchase```: un utente ha acquistato un prdotto
- ```remove_from_cart```: un utente ha rimosso dal carrello un prodotto

Come anticipato, ogni riga del file identifica un evento. Tutti gli eventi fanno riferimento ad un interazione tra un utente e un prodotto disponibile sull'e-commerce.
I dati sono stati collezionati da [Open CDP](https://rees46.com/en/open-cdp) e sono stati utilizzati in conformità con la licenza *Data files © Original Authors*.

## Semantica dei dati e Struttura del file

Per poter comprendere la semantica che descrive i record memorizzati all'intero del dataset è possibile seguire il seguente schema: 

*L'utente ```user_id``` durante la sessione ```user_session``` ha aggiunto al carrello (```event_type``` di tipo ```cart```) il prodotto ```product_id``` del brand ```brand``` appartenente alla categoria ```category_code```  con prezzo 
```price```. L'evento è stato generato nel momento ```event_time```*.

Inoltre, esplorando il **file CSV** relativo al dataset è possibile identificare la seguente struttura:

- ```event_time```: momento in cui l'evento è avvenuto (UTC)
- ```event_type```: tipo di evento avvenuto (view, cart, purchase)
- ```product_id```: Id del prodotto interessato
- ```category_id```: Id della categoria a cui appartiene il prodotto
- ```category_code```: categoria di appartenenza del prodotto
- ```brand```: brand di apparteneza del prodotto
- ```price```: prezzo del prodotto
- ```user_id```: Id dell'utente che ha generato l'evento
- ```user_session```: Id della sessione utente. Questo ID di sessione rimane lo stesso per tutte le sessioni di quel particolare utente. Tuttavia, quando l'utente visita nuovamente l'e-commerce dopo una lunga pausa, l'ID di sessione viene modificato.

&Egrave; importante notare che una singola *user_session* possa includere più eventi, in quanto potrebbero essere stati acquistati più prodotti durante la sessione. Questa informazione sarà utile in fase di analisi.

## Finalità dell'analisi

L'obiettivo di progetto è quello di analizzare il dataset in modalità **batch**, al fine di estrarre informazioni utili sui dati raccolti, anche a fini statistici.
A seguito di un’analisi preliminare sui dati, sono emersi diversi interrogativi a cui si è cercato di dare risposta:

- qual è il numero di dei prodotti visualizzati (view)?
- qual è il numero dei prodotti aggiunti al carrello (cart)?
- qual è il numero dei prodotti acquistati (purchase)?
- qual è il numero degli ordini distinti?
- qual è il numero di acquisti effettuati per utente?
- quale, tra i prodotti venduti, è il meno costoso? quale il più costoso? qual è il range dei prezzi?
- quanti acquisti effettua ogni utente in media?
- qual è il prezzo medio relativo ai prodotti venduti?
- qual è il costo medio di un acquisto sapendo che un ordine può comprendere più prodotti?
- sulla base delle vendite effettuate, quante sono le vendite per ogni brand? 
- qual è l’incasso ottenuto da ogni brand?
- sulla base degli incassi dalle vendite, quali sono le categorie prodotto più fruttuose in termini di denaro e in termini di vendite totali (quindi le più popolari)? 
- sulla base dei prodotti, quanti di questi sono stati visti, aggiunti al carrello e acquistati? 
- per ogni categoria “principale”, quali sono le sottocategorie ad esse associate?
- come sono classificati i brand sulla base del ricavato totale?
- qual è la relazione tra le categorie di prodotti e la distribuzione dei brand, avendo applicato un criterio di classificazione sui brand usando come metrica la media dei prezzi dei prodotti venduti da ciascun brand?

L'approccio utilizzato seguirà quello evidenziato durante le lezioni del corso: in primo luogo si prenderanno in considerazione le tematiche relative alle diverse possibili configurazioni del cluster, richiamando le formule teoriche per definire la configurazione del cluster ottimale. Si approfondirà quindi il tuning in termini di CPU e memoria. Si entrerà poi nel merito del caching di un RDD e delle possibili ottimizzazioni. Nella fase successiva si effettuerà l'analisi vera e propria del dataset attraverso **query esplorative** e **query** più **complesse**. Per finire, i risultati ottenuti verranno caricati su **Tableau Desktop** al fine di visualizzarli e manipolarli graficamente.

## Configurazione del Cluster

Il servizio ```EMR (Elastic MapReduce)``` di **AWS** è un servizio di cloud computing che consente di eseguire e gestire in modo semplice e scalabile framework di elaborazione di dati come *Apache Hadoop* e *Apache Spark*. EMR semplifica la configurazione, la distribuzione e la gestione di **cluster** di grandi dimensioni per l'elaborazione e l'analisi di grandi quantità di dati. Il servizio gestisce automaticamente il monitoraggio, la scalabilità e la tolleranza ai guasti del cluster. 

EMR offre integrazioni con altri servizi AWS, ad esempio con ```Amazon S3``` per l'archiviazione dei dati, e con ```Amazon EC2``` per le istanze di calcolo. In particolare, **EC2 (Elastic Compute Cloud)** è un servizio di computing scalabile e flessibile in stretta relazione con EMR, che consente di creare e gestire istanze virtuali nel cloud AWS. Queste istanze sono macchine virtuali che eseguono un'ampia varietà di sistemi operativi, consentendo di eseguire applicazioni e servizi in un ambiente cloud altamente affidabile e scalabile.

### Cluster Nodes

Considerando la configurazione dei cluster utilizzata durante le lezioni, si deciso di riutilizzare il medesimo setup. Tramite EMR si è quindi definita una configurazione che si compone di ```un'istanza Master``` [m5.xlarge](https://aws.amazon.com/it/ec2/instance-types/m5/) con 4 vCore e 16GB di memoria RAM, e di ```due istanze Slave``` [m5.xlarge](https://aws.amazon.com/it/ec2/instance-types/m5/), entrambe con 4 vCore e 16GB di memoria RAM.

La versione di EMR utilizzata è la *6.10.0*. Le istanza virtuali del cluster sono state configurate con i seguenti pacchetti: 

- Hadoop 3.3.3
- Hive 3.1.3
- JupyterEnterpriseGateway 2.6.0
- Spark 3.3.1
- Livy 0.7.1

### Tuning numero di Core

In un cluster Spark, vengono eseguiti sulle diverse macchine disponibili degli ```executor```, processi responsabili dell'esecuzione delle attività di elaborazione dei dati. Essi lavorano insieme per elaborare i dati in modo parallelo e distribuito. Ogni executor esegue i compiti assegnati loro dal **driver** di Spark, che coordina l'elaborazione dell'applicazione Spark. Gli executor possono essere visti come i lavoratori del sistema, responsabili dell'esecuzione effettiva delle operazioni di trasformazione e azione sui dati.
Dal punto di vista teorico, si è osservato che una configurazione preferita per definire il *numero di executor* da allocare sulle macchine del cluster consiste nell'avere un numero limitato di executor con un elevato numero di core assegnati a ciascuno di essi. Questo approccio è vantaggioso perché, all'interno di ogni executor, la presenza di numerosi core consente l'esecuzione simultanea di più task.

Le linee guida per una configurazione efficiente di un cluster suggeriscono che il numero di core assegnati a ciascun executor dovrebbe essere compreso tra 3 e 5, lasciando almeno un core libero nelle macchine per il funzionamento del sistema operativo e di altri servizi.

Fatta questa premessa è possibile arrivare ad indicare quanti executor è possibile allocare all'interno del cluster: 
 - (core disponibili per nodo - un core per gli altri servizi)/ numero di core per executor => (4-1)/3 = **1 executor per nodo**
 
### Tuning Memoria RAM
Nel contesto della gestione della memoria, è importante considerare diversi aspetti. Quando si allocano grandi quantità di memoria agli executor, possono verificarsi problemi legati alla Garbage Collection. Pertanto, è consigliabile non superare i 64 GB di RAM per ogni executor, anche se questa considerazione non si applica alla configurazione attuale. Inoltre, è necessario assicurarsi di lasciare abbastanza memoria disponibile per eseguire altri processi sul sistema.

Per di più, è importante notare che la memoria richiesta non corrisponde esattamente alla quantità di memoria allocata da Spark: la parte di memoria richiesta dall'utente viene utilizzata per caricare gli RDD, gestire la cache, gestire gli shuffle ecc... Spark però alloca una porzione di memoria off-heap per gestire i suoi overhead per la gestione dei container. Questo surplus di memoria aggiuntiva varia da un minimo di **384 MB** fino a un massimo del **10% della memoria richiesta**. 

A questo punto, è possibile determinare la quantità di memoria RAM da allocare per ciascun executor. Considerando l'ipotesi di riservare il 25% delle risorse per altri servizi e il 10% per il surplus richiesto da Spark, possiamo dividere le risorse rimanenti per il numero di executor presenti in ciascun nodo:
- {(RAM disponibile per nodo * risorse disponibili) * (off-heap)}/numero di executor per nodo => {(16 * 0.75) * (1-0.1)}/1 = **10.8GB di RAM per executor**

Nonostante il calcolo iniziale indicasse un'allocazione di 11 GB di memoria RAM per ogni executor, tale quantità supera il limite massimo predefinito consentito nel cluster EMR di YARN. Pertanto, è stata presa la decisione di limitare l'allocazione di memoria a 8 GB per ogni executor al fine di rimanere all'interno dei limiti consentiti dal cluster. 

A fronte di ciò, la configurazione utilizzata è la seguente:

- 2 executor (uno per nodo) con 3 core ciascuno 
- 8GB di memoria per executor

In [1]:
%%configure -f
{"executorMemory":"8G", "numExecutors":2, "executorCores":3, "conf": {"spark.dynamicAllocation.enabled": "false"}}

In [1]:
val bucketname = "unibo-bd2223-mmongardiproject"
val path_events_ecommerce = "s3a://"+bucketname+"/datasets/project/2019-Nov.csv"

"SPARK UI: Enable forwarding of port 20888 and connect to http://localhost:20888/proxy/" + sc.applicationId + "/"

VBox()

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
0,application_1686559935583_0001,spark,idle,Link,Link,,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

bucketname: String = unibo-bd2223-mmongardiproject
path_events_ecommerce: String = s3a://unibo-bd2223-mmongardiproject/datasets/project/2019-Nov.csv
res2: String = SPARK UI: Enable forwarding of port 20888 and connect to http://localhost:20888/proxy/application_1686559935583_0001/


### Inizializzazione degli oggetti per strutturare l'RDD degli eventi

In [7]:
import org.apache.spark.sql.SaveMode

object EcommerceEventParser {

  case class Event(time: String,
                   mode: String,
                   productId: Long,
                   categoryId: Long,
                   categoryCode: String,
                   brand: String,
                   price: Double,
                   userId: Long,
                   userSession: String)

  def parse(line: String): Option[Event] = {
    try {
      val input = line.split(',')
      Some(
        Event(
          time = input(0),
          mode = input(1),
          productId = input(2).toLong,
          categoryId = input(3).toLong,
          categoryCode = input(4),
          brand = input(5),
          price = input(6).toDouble,
          userId = input(7).toLong,
          userSession = input(8)
        )
      )
    } catch {
      case _: Throwable => None
    }
  }
}

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

import org.apache.spark.sql.SaveMode
defined object EcommerceEventParser


### Definizione del numero ottimale di partizioni

In [3]:
val rddEvents = sc.textFile(path_events_ecommerce).flatMap(EcommerceEventParser.parse)
val numPartitions = rddEvents.getNumPartitions
println("Numero di partizioni: "+numPartitions)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

rddEvents: org.apache.spark.rdd.RDD[EcommerceEventParser.Event] = MapPartitionsRDD[2] at flatMap at <console>:26
numPartitions: Int = 269
Numero di partizioni: 269


Le partizioni che sono state create da S3 sono **269**. Questo perchè il dataset ha una dimensione di 8.38GB e ogni partizione di S3 contiene 32MB di dati, quindi, 8602/32 = **269 partizioni**. 

Le partizioni hanno due scopi principali:

- parallelizzare la computazione tra gli executor
- minimizzare il traffico di rete

Se non specificato diversamente, Spark setta il numero di partizioni autonomamente. Tarare in modo errato il numero delle partizioni può portare a dei grossi problemi di performance: 

- se si definiscono *poche partizioni* allora non verrà sfruttata la concorrenza in maniera adeguata
- se si definiscono *troppe partizioni* si avrà un overhead nello scheduling di molti task di piccole dimensioni

In generale, le buone pratiche dicono che sarebbe opportuno avere almeno due/tre partizioni per core (```lower bound```) e una durata di esecuzione per task di almeno 100-200 mS (```upper bound```).

Come si è potuto constatare, vengono create 269 partizioni, quindi 269 task. Nella nostra cluster configuation abbiamo 2 executor ognuno con 3 core. Questo significa che abbiamo 6 core su cui eseguire in parallelo i tasks. Possiamo quindi pensare di avere 269/6=45 partizioni per core (45 task per core). 

Si è scelto di ridefinire il numero di partizioni in modo tale che ogni core potesse analizzare 10 partizioni. A tale scopo, si è utilizzata la funzione ```coalesce```, specificando un numero diverso di partizioni, precisamente ```60```.

È importante sottolineare che la funzione coalesce non implementa un criterio di partizionamento, pertanto non ridistribuisce attivamente i dati tra le partizioni. Essa effettua semplicemente un'accorpamento delle partizioni nel caso in cui ne venga specificato un numero inferiore rispetto a quello predefinito.

Nonostante si sia proceduto a ridurre in modo significativo il numero delle partizioni, non si è riscontrato un miglioramento sostanziale delle prestazioni ottenute dall'esecuzione di un semplice job, come si evince dai risultati sottostanti.

In [5]:
val rddEventsRepartitioned = rddEvents.coalesce(60)
rddEventsRepartitioned.getNumPartitions

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

rddEventsRepartitioned: org.apache.spark.rdd.RDD[EcommerceEventParser.Event] = CoalescedRDD[3] at coalesce at <console>:24
res5: Int = 60



![partitions.PNG](images/partitions.PNG)

### Memory Occupation

Come noto, un RDD di default viene ricalcolato ogni volta che **un'azione** richiede il suo utilizzo. Tuttavia, in determinati casi, può essere vantaggioso persistere un RDD se si prevede che verrà riutilizzato più volte. Per rendere un RDD persistente, è possibile utilizzare due azioni principali:

- ```persist```
- ```cache```

La differenza tra le due azioni consiste nel fatto che ```persist``` consente di specificare il livello di persistenza dei dati, mentre ```cache``` salva i dati direttamente nella memoria centrale, utilizzando il livello di persistenza predefinito ```(MEMORY_ONLY)```. L'azione persist consente di specificare diversi livelli di persistenza, tra cui:

- *MEMORY_ONLY*: memorizza i dati nell'heap di memoria del nodo.
- *MEMORY_AND_DISK*: memorizza i dati nell'heap di memoria del nodo e, se necessario, li salva anche su disco.
- *MEMORY_ONLY_SER*: memorizza i dati nell'heap di memoria del nodo, ma li serializza per ridurre lo spazio occupato.
- *MEMORY_AND_DISK_SER*: memorizza i dati nell'heap di memoria del nodo e, se necessario, li salva anche su disco, utilizzando la serializzazione per ridurre lo spazio occupato.
- *DISK_ONLY*: salva i dati direttamente su disco, senza memorizzarli in memoria.

Durante questa fase esplorativa del dataset, si è deciso di approfondire la tematica relativa all'occupazione di memoria. Come evidenziato durante le lezioni, è stata testata la funzionalità di *caching* per determinare la quantità di spazio occupato dal dataset in memoria RAM. Inizialmente, è stata eseguita una semplice operazione di *conteggio* sull'RDD in cache. Nonostante non si siano trattati a fondo dataset di questa portata, è stata riscontrata la necessità di effettuare un'analisi più dettagliata a livello di caching. Si ricorda che il dataset originale ha una dimensione di 8.38GB. 

In [6]:
val rddEventsRepartitionedCached = rddEventsRepartitioned.cache
rddEventsRepartitionedCached.count

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

rddEventsRepartitionedCached: org.apache.spark.rdd.RDD[EcommerceEventParser.Event] = CoalescedRDD[4] at coalesce at <console>:25
res12: Long = 67501969


Accedendo alla sezione ```Storage``` della *Spark UI* tramite tunnel SSH (*porta 20888*), è stata osservata un'occupazione in memoria dell'RDD pari a 7.7 GB. Tuttavia, non si potuto fare a meno di notare anche le indicazioni **Cached Partitions** e **Fraction_Cached**, corrispondenti a 17 e a 28%. 
![size_cached.PNG](images/size_cached.PNG)

Ciò ha spinto a un'ulteriore esplorazione di questi aspetti. Esaminando le singole unità di memorizzazione, è stato notato che, delle 60 partizioni che compongono l'RDD, soltanto 17 erano state correttamente salvate in memoria. Ciò ha portato a pensare che l'RDD generato avesse una dimensione molto maggiore rispetto al dataset originale. A tal proposito, si è analizzata nuovamente la configurazione del cluster precedentemente definta: a ciascun executor erano stati assegnati 8 GB di memoria RAM per gestire correttamente i dati. Avendo a disposizione due executor, la RAM totale usufruibile sarebbe dovuta essere di 16 GB. Nonostante i vari test sull'occupazione di memoria, non sono mai stati superati gli 8 GB di storage utilizzato. Ciò ha sollevato un ulteriore interrogativo, che ha trovato risposta dopo svariate ricerche sul web: la memoria disponibile per lo storage dei dati all'interno degli executor corrisponde approssimativamente alla metà di quella definita nella configurazione del cluster. Ciò avviene perché essa viene divisa in due sottosezioni, la ```Storage Memory``` e l'```Execution Memory```. 

Per ulteriori delucidazioni in merito:

- [Memory Management in Spark 3](https://www.educative.io/answers/memory-management-in-spark-3)
- [Apache Spark Memory Management](https://medium.com/analytics-vidhya/apache-spark-memory-management-49682ded3d42)


![info_caching.PNG](images/info_caching.PNG)

## New Cluster

Date le considerazioni precedenti e considerando la fase iniziale di esplorazione del cluster, si è deciso di adottare una configurazione alternativa al fine di testare un po' più a fondo le potenzialità offerte da AWS, in particolare in termini di scalabilità, e per agevolare le operazioni successive sull'RDD, soprattutto per quanto riguarda la persistenza nella memoria RAM.

In particolare, si è deciso di creare una nuovo cluster così composto: 
- 1 [m5.xlarge](https://aws.amazon.com/it/ec2/instance-types/m5/) 4 vCore, 16GB RAM
- 2 [m5.4xlarge](https://aws.amazon.com/it/ec2/instance-types/m5/) 16 vCore, 64GB di memoria RAM.

La versione di EMR utilizzata è sempre la stessa, ovvero la *6.10.0*. Le istanza virtuali del cluster sono state configurate con i pacchetti precedentemente citati.

### New Cluster Configuration

In conformità alle linee guida precedentemente menzionate riguardanti il *tuning dei core* e della *memoria RAM*, si è presa la decisione di definire la nuova configurazione del cluster nel modo seguente:

- (core disponibili per nodo - un core per gli altri servizi)/ numero di core per executor => (16-1)/3 = **5 executor per nodo**
- {(RAM disponibile per nodo * risorse disponibili) * (off-heap)}/numero di executor per nodo => {(64 * 0.75) * (1-0.1)}/5 = **8.6GB di RAM per executor**

A fronte di ciò, la configurazione utilizzata è la seguente:

- 10 executor (5 per nodo) con 3 core ciascuno 
- 8GB di memoria per executor (per rientrare nei limiti predefiniti imposti da EMR)

In [2]:
%%configure -f
{"executorMemory":"8G", "numExecutors":10, "executorCores":3, "conf": {"spark.dynamicAllocation.enabled": "false"}}

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
1,application_1686559935583_0002,spark,idle,Link,Link,,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
1,application_1686559935583_0002,spark,idle,Link,Link,,✔


In [3]:
val bucketname = "unibo-bd2223-mmongardiproject"
val path_events_ecommerce = "s3a://"+bucketname+"/datasets/project/2019-Nov.csv"

"SPARK UI: Enable forwarding of port 20888 and connect to http://localhost:20888/proxy/" + sc.applicationId + "/"

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

bucketname: String = unibo-bd2223-mmongardiproject
path_events_ecommerce: String = s3a://unibo-bd2223-mmongardiproject/datasets/project/2019-Nov.csv
res2: String = SPARK UI: Enable forwarding of port 20888 and connect to http://localhost:20888/proxy/application_1686559935583_0002/


Sono inoltre state prese nuovamente in consideraizone le buone pratiche per stabilire un numero di partizioni ottimale da allocare ad ogni core. Come si è potuto constatare, vengono create 269 partizioni, quindi 269 task. Nella nuova cluster-configuation si dispone di 10 executor, ognuno con 3 core. Questo significa che si può usufuire di 30 core su cui eseguire in parallelo i tasks. Nella configurazione di defult, è possibile pensare che ogni CPU gestisca 9 partizioni => *(269/30=9)*. 

Si è scelto di ridefinire il numero di partizioni in modo tale che ogni core potesse analizzare 2 partizioni. A tale scopo, si è utilizzata nuovamente la funzione ```coalesce```, specificando come numero di partizioni ```60``` => *(60/30=2)*. Questa attività preliminare, con questa specifica configurazione del cluster, può risultare superflua visti i risultati ottenuti di seguito, ma per essere conforme alle buone pratiche di progettazione si è deciso di attuarla ugualemente.

#### *N.B*: prima di eseguire il codice seguente, ricordare di lanciare il codice relativo al *parser* dichiarato ad inizio file.

In [5]:
val rddEvents = sc.textFile(path_events_ecommerce).flatMap(EcommerceEventParser.parse)
val numPartitions = rddEvents.getNumPartitions
println("Numero di partizioni: "+numPartitions)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

rddEvents: org.apache.spark.rdd.RDD[EcommerceEventParser.Event] = MapPartitionsRDD[2] at flatMap at <console>:26
numPartitions: Int = 269
Numero di partizioni: 269


## Testing the new Cluster

### Testing not-partitioned

In [7]:
rddEvents.count

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

res5: Long = 67501969



![not_partitioned_count.PNG](images/not_partitioned_count.PNG)

### Testing partitioned

In [6]:
val rddEventsPartitioned = rddEvents.coalesce(60)
rddEventsPartitioned.count

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

rddEventsPartitioned: org.apache.spark.rdd.RDD[EcommerceEventParser.Event] = CoalescedRDD[6] at coalesce at <console>:24
res9: Long = 67501969


![partitioned_count.PNG](images/partitioned_count.PNG)


### Testing not-partitioned cached 

In [19]:
val rddEventsCached = rddEvents.cache
rddEventsCached.count

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

rddEventsCached: rddEvents.type = MapPartitionsRDD[5] at flatMap at <console>:26
res24: Long = 67501969
res25: Long = 67501969



![job_count_not_partitioned_count_cache.PNG](images/job_count_not_partitioned_count_cache.PNG)
![not_partitioned_count_cached.PNG](images/not_partitioned_count_cached.PNG)
![notPartitioned_cached_count.PNG](images/notPartitioned_cached_count.PNG)

In [12]:
sc.getPersistentRDDs.foreach{case (k,v) => v.unpersist()}

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### Testing partitioned cached

In [None]:
val rddEventsPartitionedCached = rddEventsPartitioned.cache
rddEventsPartitionedCached.count

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

rddEventsPartitionedCached: rddEventsPartitioned.type = CoalescedRDD[16] at coalesce at <console>:24
res28: Long = 67501969



![job_count_partitioned_count_cache.PNG](images/job_count_partitioned_count_cache.PNG)
![partitioned_count_cached.PNG](images/partitioned_count_cached.PNG)
![partitioned_cached_count.PNG](images/partitioned_cached_count.PNG)

In [29]:
sc.getPersistentRDDs.foreach{case (k,v) => v.unpersist()}

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Come accennato in precedenza, l'operazione di aggregazione delle partizioni utilizzando il metodo ```coalesce``` non comporta un notevole miglioramento delle prestazioni, come originariamente previsto. Tuttavia, un'analisi più dettagliata dei tempi di esecuzione dei singoli task nelle due configurazioni di partizioni rivela che quella con *60* partizioni riporta tempi di esecuzione più in linea con i limiti indicati dalle linee guida. Questo sarà ancora più evidente durante la fase di **analisi** dei record.


![single_task_duration_notpartitioned_notcached.PNG](images/single_task_duration_notpartitioned_notcached.PNG)
![single_task_duration_partitioned_notcached.PNG](images/single_task_duration_partitioned_notcached.PNG)

# Prestazioni del cluster: confronto tra modalità su RDD di eventi specifici

Dopo aver condotto dei semplici test sull'intero insieme di record presenti nell'RDD di origine, si è proceduto ad effettuare ulteriori test di piccola entità su una porzione selezionata di questo RDD. In particolare, si sono voluto valutare le prestazioni del cluster in modalità "*partizionata*" tramite ```coalesce```, e "*non partizionato*", utilizzando anche il caching per entrambe le modalità. Per condurre tali valutazioni, è stato deciso di creare tre nuovi RDD, ciascuno contenente esclusivamente gli eventi di tipo ```view```, ```cart``` e ```purchase```, rispettivamente.

## Test not-partitioned: *view*, *cart*, *purchase*

In [5]:
val rddViews = rddEvents.filter(x => x.mode == "view")
val rddCarts = rddEvents.filter(x => x.mode == "cart")
val rddPurchases = rddEvents.filter(x => x.mode == "purchase")

rddViews.count()
rddCarts.count()
rddPurchases.count()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

rddViews: org.apache.spark.rdd.RDD[EcommerceEventParser.Event] = MapPartitionsRDD[3] at filter at <console>:24
rddCarts: org.apache.spark.rdd.RDD[EcommerceEventParser.Event] = MapPartitionsRDD[4] at filter at <console>:24
rddPurchases: org.apache.spark.rdd.RDD[EcommerceEventParser.Event] = MapPartitionsRDD[5] at filter at <console>:24
res6: Long = 63556110
res7: Long = 3028920
res8: Long = 916939



![count_vcp_notpart_notcache.PNG](images/count_vcp_notpart_notcache.PNG)

## Test partitioned: *view*, *cart*, *purchase*

In [6]:
val rddEventsPart = rddEvents.coalesce(60)
rddEventsPart.count

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

rddEventsPart: org.apache.spark.rdd.RDD[EcommerceEventParser.Event] = CoalescedRDD[3] at coalesce at <console>:24
res5: Long = 67501969


In [8]:
val rddPartitionedViews = rddEventsPart.filter(x => x.mode == "view")
val rddPartitionedCarts = rddEventsPart.filter(x => x.mode == "cart")
val rddPartitionedPurchases = rddEventsPart.filter(x => x.mode == "purchase")

rddPartitionedViews.count()
rddPartitionedCarts.count()
rddPartitionedPurchases.count()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

rddPartitionedViews: org.apache.spark.rdd.RDD[EcommerceEventParser.Event] = MapPartitionsRDD[7] at filter at <console>:24
rddPartitionedCarts: org.apache.spark.rdd.RDD[EcommerceEventParser.Event] = MapPartitionsRDD[8] at filter at <console>:24
rddPartitionedPurchases: org.apache.spark.rdd.RDD[EcommerceEventParser.Event] = MapPartitionsRDD[9] at filter at <console>:24
res11: Long = 63556110
res12: Long = 3028920
res13: Long = 916939



![count_vcp_part_notcache.PNG](images/count_vcp_part_notcache.PNG)

## Test not-partitioned cached: *view*, *cart*, *purchase*

In [9]:
sc.getPersistentRDDs.foreach{case (k,v) => v.unpersist()}
val rddEventsC = rddEvents.cache
rddEventsC.count

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

rddEventsC: rddEvents.type = MapPartitionsRDD[2] at flatMap at <console>:26
res14: Long = 67501969


In [10]:
val rddViewsCache = rddEventsC.filter(x => x.mode == "view")
val rddCartsCache = rddEventsC.filter(x => x.mode == "cart")
val rddPurchasesCache = rddEventsC.filter(x => x.mode == "purchase")

rddViewsCache.count()
rddCartsCache.count()
rddPurchasesCache.count()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

rddViewsCache: org.apache.spark.rdd.RDD[EcommerceEventParser.Event] = MapPartitionsRDD[10] at filter at <console>:24
rddCartsCache: org.apache.spark.rdd.RDD[EcommerceEventParser.Event] = MapPartitionsRDD[11] at filter at <console>:24
rddPurchasesCache: org.apache.spark.rdd.RDD[EcommerceEventParser.Event] = MapPartitionsRDD[12] at filter at <console>:24
res16: Long = 63556110
res17: Long = 3028920
res18: Long = 916939



![count_vcp_notpart_cache.PNG](images/count_vcp_notpart_cache.PNG)

## Test partitioned cached: *view*, *cart*, *purchase*

In [12]:
sc.getPersistentRDDs.foreach{case (k,v) => v.unpersist()}
val rddPartEventsCached = rddEventsPart.cache
rddPartEventsCached.count

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

rddPartEventsCached: rddEventsPart.type = CoalescedRDD[3] at coalesce at <console>:24
res22: Long = 67501969


In [13]:
val rddPartViewsCached = rddPartEventsCached.filter(x => x.mode == "view")
val rddPartCartsCached = rddPartEventsCached.filter(x => x.mode == "cart")
val rddPartPurchasesCached = rddPartEventsCached.filter(x => x.mode == "purchase")

rddPartViewsCached.count()
rddPartCartsCached.count()
rddPartPurchasesCached.count()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

rddPartViewsCached: org.apache.spark.rdd.RDD[EcommerceEventParser.Event] = MapPartitionsRDD[13] at filter at <console>:24
rddPartCartsCached: org.apache.spark.rdd.RDD[EcommerceEventParser.Event] = MapPartitionsRDD[14] at filter at <console>:24
rddPartPurchasesCached: org.apache.spark.rdd.RDD[EcommerceEventParser.Event] = MapPartitionsRDD[15] at filter at <console>:24
res24: Long = 63556110
res25: Long = 3028920
res26: Long = 916939


![count_vcp_part_cache.PNG](images/count_vcp_part_cache.PNG)

## Test not-partitioned: *view* cached, *cart* cached, *purchase* cached

In [18]:
sc.getPersistentRDDs.foreach{case (k,v) => v.unpersist()}
val rddViewsC = rddEvents.filter(x => x.mode == "view").cache
val rddCartsC = rddEvents.filter(x => x.mode == "cart").cache
val rddPurchasesC = rddEvents.filter(x => x.mode == "purchase").cache

rddViewsC.count()
rddCartsC.count()
rddPurchasesC.count()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

rddViewsC: org.apache.spark.rdd.RDD[EcommerceEventParser.Event] = MapPartitionsRDD[22] at filter at <console>:24
rddCartsC: org.apache.spark.rdd.RDD[EcommerceEventParser.Event] = MapPartitionsRDD[23] at filter at <console>:24
rddPurchasesC: org.apache.spark.rdd.RDD[EcommerceEventParser.Event] = MapPartitionsRDD[24] at filter at <console>:24
res43: Long = 63556110
res44: Long = 3028920
res45: Long = 916939



![vcpCached_notpart.PNG](images/vcpCached_notpart.PNG)

In [19]:
rddViewsC.count()
rddCartsC.count()
rddPurchasesC.count()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

res46: Long = 63556110
res47: Long = 3028920
res48: Long = 916939



![count_vcpCached_notpart.PNG](images/count_vcpCached_notpart.PNG)

## Test partitioned: *view* cached, *cart* cached, *purchase* cached

In [17]:
sc.getPersistentRDDs.foreach{case (k,v) => v.unpersist()}
val rddPartitionedViews = rddEventsPart.filter(x => x.mode == "view").cache
val rddPartitionedCarts = rddEventsPart.filter(x => x.mode == "cart").cache
val rddPartitionedPurchases = rddEventsPart.filter(x => x.mode == "purchase").cache

rddPartitionedViews.count()
rddPartitionedCarts.count()
rddPartitionedPurchases.count()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

rddPartitionedViews: org.apache.spark.rdd.RDD[EcommerceEventParser.Event] = MapPartitionsRDD[19] at filter at <console>:24
rddPartitionedCarts: org.apache.spark.rdd.RDD[EcommerceEventParser.Event] = MapPartitionsRDD[20] at filter at <console>:24
rddPartitionedPurchases: org.apache.spark.rdd.RDD[EcommerceEventParser.Event] = MapPartitionsRDD[21] at filter at <console>:24
res38: Long = 63556110
res39: Long = 3028920
res40: Long = 916939



![vcpCached_part.PNG](images/vcpCached_part.PNG)

In [16]:
rddPartitionedViews.count()
rddPartitionedCarts.count()
rddPartitionedPurchases.count()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

res33: Long = 63556110
res34: Long = 3028920
res35: Long = 916939



![count_vcpCached_part.PNG](images/count_vcpCached_part.PNG)

Come previsto, i risultati dei test precedenti confermano che, con la configurazione attuale del cluster, non è strettamente necessario ridefinire il numero di partizioni tramite l'utilizzo della funzionalità ```coalesce```. Nonostante ciò, è stata presa la decisione di mantenere questa modalità poiché, seppur in modo marginale, contribuisce al miglioramento delle prestazioni e ad una distribuzione più efficiente del carico di lavoro tra i core.

## Test serializzazione

Al fine di agevolare l'analisi dei dati, è stato implementato un secondo parser che ha comportato una modifica degli elementi del dataset originale all'interno dell'RDD risultante: non è stato preso in consiedazione l'```event_time```, in quanto considerato superfluo in questo contesto. 
Successivamente, si è voluto approfondire l'aspetto della **serializzazione** dei dati, riconosciuto come fondamentale anche durante le sessioni di laboratorio, soprattutto quando si ha a che fare con grandi quantità di dati come nel caso seguente. 

La serializzazione dei dati di un RDD prima di caricarli in memoria RAM ha lo scopo di ottimizzare l'efficienza e la velocità delle operazioni sull'RDD stesso, a fronte di un ragionevole costo in termini computazionali. I principali vantaggi della serializzazione riguadano la **riduzione dello spazio di memoria richiesto per memorizzare i dati** e **prestazioni di trasferimento dati migliorare**. Si può dire che la serializzazione dei dati migliori l'efficienza complessiva del sistema, consentendo un'elaborazione più veloce e ottimizzata di questi ultimi.

In [36]:
sc.getPersistentRDDs.foreach{case (k,v) => v.unpersist()}

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [3]:
//(mode, productId, categoryId, categoryCode, brand, price, userId, userSession)

def parser(line:String) = {
    try{
      val input = line.split(',')
      Some(input(1),
           input(2).toLong,
           input(3).toLong,
           input(4),
           input(5),
           input(6).toDouble,
           input(7).toLong,
           input(8)
        )
    } catch {
      case _: Throwable => None
    }
}

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

parser: (line: String)Option[(String, Long, Long, String, String, Double, Long, String)]


In [35]:
import org.apache.spark.storage.StorageLevel

val rddESerialized = sc.textFile(path_events_ecommerce).
    flatMap(x => parser(x)).
    coalesce(60).
    persist(StorageLevel.MEMORY_ONLY_SER)

rddESerialized.count()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

import org.apache.spark.storage.StorageLevel
rddESerialized: org.apache.spark.rdd.RDD[(String, Long, Long, String, String, Double, Long, String)] = CoalescedRDD[48] at coalesce at <console>:42
res72: Long = 67501969


Dai riusltati sottostanti si può notare come la serializzazione dei dati abbia ridotto enormemente la dimensione dell'RDD in memoria. Si è passati da 34.2GB a 9.1GB, con un fattore di riduzione del *73.4%*.

![serialized_.PNG](images/serialized_.PNG)
![serialized_job.PNG](images/serialized_job.PNG)

# Analisi sui dati

La serializzazione dei dati, sebbene presenti vantaggi come la riduzione della dimensione e la possibilità di caching efficiente, presenta uno svantaggio principale: limita l'accesso dettagliato al contenuto dei record e richiede l'uso di un **serializzatore** appositamente configurato. *KryoSerializer* è il serializzatore suggerito da Spark che supporta questo tipo di operazioni, ma si è lasciato l'approfondimento di tale strumento per un eventuale sviluppo futuro. 


Al fine di eseguire operazioni complesse come la pre-aggregazione, risulta più pratico lavorare direttamente sull'RDD non serializzato. Per tali ragioni, l'analisi sui dati è stata effettuata a partire dall'RDD d'origine.


#### *N.B*: prima di eseguire il codice seguente, ricordare di lanciare il codice relativo al *parser* dichiarato ad inizio file.

In [8]:
sc.getPersistentRDDs.foreach{case (k,v) => v.unpersist()}

val rddE_Partitioned =  sc.textFile(path_events_ecommerce).flatMap(EcommerceEventParser.parse).coalesce(60)
val rddE_View_Cache = rddE_Partitioned.map(x => (x.mode, x.productId, x.userId)).filter(x => x._1 == "view").cache
print("Record totali: "+rddE_View_Cache.count)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

rddE_Partitioned: org.apache.spark.rdd.RDD[EcommerceEventParser.Event] = CoalescedRDD[3] at coalesce at <console>:27
rddE_View_Cache: org.apache.spark.rdd.RDD[(String, Long, Long)] = MapPartitionsRDD[5] at filter at <console>:24
Record totali: 63556110

## Query Esplorative

## Numero di prodotti visualizzati - ```view```

In [9]:
val productVieed = rddE_View_Cache.count
println("Numero di prodotti visualizzati: "+productVieed)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

productVieed: Long = 63556110
Numero di prodotti visualizzati: 63556110


## Numero di prodotti distinti visualizzati

In [10]:
//productId
val rddE_View_Dist = rddE_View_Cache.map(x => x._2).distinct
println("Numero di prodotti distinti visualizzati: "+rddE_View_Dist.count)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

rddE_View_Dist: org.apache.spark.rdd.RDD[Long] = MapPartitionsRDD[9] at distinct at <console>:25
Numero di prodotti distinti visualizzati: 190662


## Numero di prodotti distinti visualizzati dagli utenti

In [11]:
//userId, productId
val rddE_User_View_Dist = rddE_View_Cache.map(x => (x._3, x._2)).distinct
println("Numero di prodotti distinti visualizzati dagli utenti: "+rddE_User_View_Dist.count)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

rddE_User_View_Dist: org.apache.spark.rdd.RDD[(Long, Long)] = MapPartitionsRDD[13] at distinct at <console>:25
Numero di prodotti distinti visualizzati dagli utenti: 34437947


## Numero di prodotti aggiunti al carrello - ```cart```

In [12]:
sc.getPersistentRDDs.foreach{case (k,v) => v.unpersist()}
val rddE_Cart_Cache = rddE_Partitioned.map(x => (x.mode, x.productId, x.userSession)).filter(x => x._1 == "cart").cache

val rddE_Cart = rddE_Cart_Cache.count
println("Numero di prodotti aggiunti al carrello: "+rddE_Cart)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

rddE_Cart_Cache: org.apache.spark.rdd.RDD[(String, Long, String)] = MapPartitionsRDD[15] at filter at <console>:24
rddE_Cart: Long = 3028920
Numero di prodotti aggiunti al carrello: 3028920


## Numero di prodotti nel carrello per sessione

In questa analisi si è deciso di conteggiare *lo stesso prodotto* aggiunto al carrello più volte *nella stessa sessione*. 

In [13]:
//userSession, productId
val rddE_CartPerSession = rddE_Cart_Cache.map(x => (x._3, x._2)).aggregateByKey(0)((a,v) => a+1, (a1, a2)=> a1+a2)
rddE_CartPerSession.sortBy(_._2, false).collect().take(10).foreach(println(_))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

rddE_CartPerSession: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[17] at aggregateByKey at <console>:25
(8c322efd-e368-42ae-9b71-37351e74ec55,709)
(42e83712-4c9b-c40b-7cc5-9beb15fe4e9b,414)
(0b1c58be-29a4-4bb1-909f-07211e51ed55,389)
(c491177d-d680-4104-8b08-bc6952e910d9,299)
(7789e256-97cd-4481-9f0f-cffd9a2e6003,237)
(1cbaa67f-a1c1-4b3c-9559-100a39a4c1c0,207)
(f2a7fef1-632e-420e-a165-702229fa7582,203)
(84f2e900-9da5-c970-7a9e-6c573b03406c,193)
(9bcd9a7b-1180-4dfc-aaa0-2897d25fd3cd,192)
(da41d673-1b99-4feb-a4e4-fa8dd4c53ebd,186)


## Numero di prodotti acquistati - ```purchase```

In [14]:
sc.getPersistentRDDs.foreach{case (k,v) => v.unpersist()}
val rddE_Purchase_Cache = rddE_Partitioned.map(x => (x.mode, x.productId, x.categoryCode, x.brand, x.price, x.userId, x.userSession)).filter(x => x._1 == "purchase").cache

println("Numero di prodotti acquistati: "+rddE_Purchase_Cache.count)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

rddE_Purchase_Cache: org.apache.spark.rdd.RDD[(String, Long, String, String, Double, Long, String)] = MapPartitionsRDD[24] at filter at <console>:24
Numero di prodotti acquistati: 916939


## Numero di ordini distinti

In [15]:
//userSession
val rddE_Distinct_Purchase = rddE_Purchase_Cache.map(x => x._7).distinct()
println("Numero di ordini distinti: "+rddE_Distinct_Purchase.count)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

rddE_Distinct_Purchase: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[28] at distinct at <console>:25
Numero di ordini distinti: 773214


## Numero di utenti distinti che hanno effettuato almeno 1 acquisto

In [16]:
//userId
val rddE_PurchasedByUser = rddE_Purchase_Cache.map(x => x._6).distinct()
println("Numero di utenti distinti che hanno effettuato almeno 1 acquisto: "+rddE_PurchasedByUser.count)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

rddE_PurchasedByUser: org.apache.spark.rdd.RDD[Long] = MapPartitionsRDD[32] at distinct at <console>:25
Numero di utenti distinti che hanno effettuato almeno 1 acquisto: 441638


## Numero di acquisti effettuati per utente

In [17]:
//userId
val rddE_Purchase_X_User = rddE_Purchase_Cache.map(x => (x._6, 1)).aggregateByKey(0)((a,v) => a+1, (a1, a2)=> a1+a2)
rddE_Purchase_X_User.sortBy(_._2, false).collect().take(10).foreach(println(_))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

rddE_Purchase_X_User: org.apache.spark.rdd.RDD[(Long, Int)] = ShuffledRDD[34] at aggregateByKey at <console>:25
(564068124,519)
(512386086,268)
(549109608,222)
(518514099,202)
(549030056,190)
(566448225,175)
(538473314,163)
(513230794,156)
(543128872,156)
(569181579,142)


## Acquisti medi per utente

In [18]:
//userId
val numUser = rddE_Purchase_Cache.map(x => x._6).distinct.count
val numSales = rddE_Purchase_Cache.count

println("Acquisti medi per utente: "+numSales.toDouble/numUser)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

numUser: Long = 441638
numSales: Long = 916939
Acquisti medi per utente: 2.076223060515626


## Prezzi - *min*, *max* e *range* e relativa *categoria*

In [19]:
//categoryCode, price
val rddE_Price = rddE_Purchase_Cache.map(x => (x._3, x._5)).filter({case(categoryCode, price) => categoryCode.nonEmpty && price > 0})

val min_Product = rddE_Price.min()(Ordering.by(_._2))
val max_Product = rddE_Price.max()(Ordering.by(_._2))

val min_Price = min_Product._2.toDouble
val category_min = min_Product._1

val max_Price = max_Product._2.toDouble
val category_max = max_Product._1

val range = max_Price - min_Price

println("Prezzo minimo: " + min_Price + ", Categoria: " + category_min)
println("Prezzo massimo: " + max_Price + ", Categoria: " + category_max)
println("Range di prezzo: " + range)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

rddE_Price: org.apache.spark.rdd.RDD[(String, Double)] = MapPartitionsRDD[45] at filter at <console>:25
min_Product: (String, Double) = (electronics.audio.headphone,0.87)
max_Product: (String, Double) = (electronics.clocks,2574.07)
min_Price: Double = 0.87
category_min: String = electronics.audio.headphone
max_Price: Double = 2574.07
category_max: String = electronics.clocks
range: Double = 2573.2000000000003
Prezzo minimo: 0.87, Categoria: electronics.audio.headphone
Prezzo massimo: 2574.07, Categoria: electronics.clocks
Range di prezzo: 2573.2000000000003


## Prezzo medio dei prodotti venduti

In [20]:
//price
val itemSold = rddE_Purchase_Cache.map(x => x._5).aggregate((0.0,0))((acc, price)=>(acc._1+price,acc._2+1),(acc1, acc2)=>(acc1._1+acc2._1,acc1._2+acc2._2))
val avgPrice = itemSold._1 / itemSold._2.toDouble
println("Il prezzo medio dei prodotti venduti e': "+avgPrice)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

itemSold: (Double, Int) = (2.751948905000006E8,916939)
avgPrice: Double = 300.1234438714032
Il prezzo medio dei prodotti venduti e': 300.1234438714032


## Spese media per le sessioni in cui si sono acquistati prodotti 

In [21]:
//userSession, price
val rdd_avgSessionExpense = rddE_Purchase_Cache.map(x => (x._7, x._5)).aggregateByKey((0.0,0))((a,v)=>(a._1+v, a._2+1),(a1,a2)=>(a1._1+a2._1,a1._2+a2._2)).map(x => (x._1, (x._2._1/x._2._2)))
//val avgSessionExpense = sessionExpenseTotal.mapValues({case (total, count) => total/count})

rdd_avgSessionExpense.sortBy(_._2, false).collect().take(10).foreach(println(_))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

rdd_avgSessionExpense: org.apache.spark.rdd.RDD[(String, Double)] = MapPartitionsRDD[49] at map at <console>:25
(ce9bfd7c-619b-4761-a8e0-13895d85c4e0,2574.07)
(a2336ead-8ed4-49a3-9cb8-64870c320c4e,2574.07)
(a19f78fd-04ef-42dd-ba67-1d89315b6f2e,2574.07)
(2a2cb0d4-0c17-44c1-b91d-0194d3f5a373,2574.07)
(4cc4a3b8-9b87-4563-a09d-84e7d7688722,2574.04)
(9b3d6b9e-91df-41e7-9b6c-e19dd20a5217,2574.04)
(f3b2be78-853e-4627-9540-8a1a02ff6bdd,2574.04)
(84a22b50-43a5-418b-8f7b-4d10d0971960,2574.04)
(666a88f5-8962-4acc-a702-f6e9624044a7,2574.04)
(aadbf6ce-d278-42c6-920f-09c5f2eedbe7,2574.04)


## Vendite per brand

In [22]:
//brand
val rdd_Brand_Sales = rddE_Purchase_Cache.map(x=>(x._4, 1)).filter(x => x._1.nonEmpty).aggregateByKey(0)((a,v) => a+1, (a1, a2)=> a1+a2)
rdd_Brand_Sales.sortBy(_._2, false).collect().take(10).foreach(println(_))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

rdd_Brand_Sales: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[57] at aggregateByKey at <console>:25
(samsung,200027)
(apple,166064)
(xiaomi,68292)
(huawei,23703)
(cordiant,16983)
(oppo,15080)
(lucente,14559)
(lg,12879)
(sony,10309)
(artel,9267)


## Incasso per Brand con relativo numero di vendite

In [23]:
//brand, price
val rdd_Revenue = rddE_Purchase_Cache.map(x=>(x._4, x._5)).filter(x => x._1.nonEmpty).aggregateByKey((0.0,0))((a,v)=>(a._1+v,a._2+1), (a1,a2)=>(a1._1+a2._1,a1._2+a2._2))
rdd_Revenue.map(x => (x._1, x._2._1, x._2._2)).sortBy(_._2, false).collect().take(10).foreach(println(_))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

rdd_Revenue: org.apache.spark.rdd.RDD[(String, (Double, Int))] = ShuffledRDD[65] at aggregateByKey at <console>:25
(apple,1.2751252488000032E8,166064)
(samsung,5.4869880869999856E7,200027)
(xiaomi,1.1259865960000014E7,68292)
(lg,5239018.76,12879)
(huawei,4780682.349999998,23703)
(sony,3862886.3000000017,10309)
(lucente,3527545.5700000003,14559)
(oppo,3488540.7599999965,15080)
(acer,3347306.5300000007,6402)
(lenovo,2698106.3,6547)


## Query Complesse

Al fine di garantire una migliore tracciabilità e analisi dei risultati ottenuti, si è deciso di adottare una strategia di salvataggio dei risultati delle query complesse in formato `CSV`. A tal fine, verranno create specifiche directory per ospitare i file CSV, secondo le definizioni che seguono.

In [24]:
val path_VCP_Products = "s3a://"+bucketname+"/spark/project/VCP_Products"
val path_BrandClassification = "s3a://"+bucketname+"/spark/project/BrandClassification"
val path_ComplexQuery01_01 = "s3a://"+bucketname+"/spark/project/path_ComplexQuery/01/01"
val path_ComplexQuery01_02 = "s3a://"+bucketname+"/spark/project/path_ComplexQuery/01/02"
val path_ComplexQuery02_01= "s3a://"+bucketname+"/spark/project/path_ComplexQuery/02/01"
val path_ComplexQuery02_02 = "s3a://"+bucketname+"/spark/project/path_ComplexQuery/02/02"
val path_ComplexQuery03_01= "s3a://"+bucketname+"/spark/project/path_ComplexQuery/03/01"
val path_ComplexQuery03_02 = "s3a://"+bucketname+"/spark/project/path_ComplexQuery/03/02"

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

path_VCP_Products: String = s3a://unibo-bd2223-mmongardi/spark/project/VCP_Products
path_BrandClassification: String = s3a://unibo-bd2223-mmongardi/spark/project/BrandClassification
path_ComplexQuery01_01: String = s3a://unibo-bd2223-mmongardi/spark/project/path_ComplexQuery/01/01
path_ComplexQuery01_02: String = s3a://unibo-bd2223-mmongardi/spark/project/path_ComplexQuery/01/02
path_ComplexQuery02_01: String = s3a://unibo-bd2223-mmongardi/spark/project/path_ComplexQuery/02/01
path_ComplexQuery02_02: String = s3a://unibo-bd2223-mmongardi/spark/project/path_ComplexQuery/02/02
path_ComplexQuery03_01: String = s3a://unibo-bd2223-mmongardi/spark/project/path_ComplexQuery/03/01
path_ComplexQuery03_02: String = s3a://unibo-bd2223-mmongardi/spark/project/path_ComplexQuery/03/02


### Eventi `view`, `cart`, `purchase` per prodotto

In [25]:
sc.getPersistentRDDs.foreach{case (k,v) => v.unpersist()}
val rddE_Partitioned =  sc.textFile(path_events_ecommerce).flatMap(EcommerceEventParser.parse).coalesce(60)
val rddEvents = rddE_Partitioned.map(x => (x.productId, x.mode, x.categoryCode, x.brand, x.price)).cache
rddEvents.count

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

rddE_Partitioned: org.apache.spark.rdd.RDD[EcommerceEventParser.Event] = CoalescedRDD[75] at coalesce at <console>:26
rddEvents: org.apache.spark.rdd.RDD[(Long, String, String, String, Double)] = MapPartitionsRDD[76] at map at <console>:24
res51: Long = 67501969


In [28]:
//productId, mode
val metrics = rddEvents.map(x => (x._1, x._2 match {
              case "view" => (1, 0, 0)
              case "cart" => (0, 1, 0)
              case "purchase" => (0, 0, 1)
            })).reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2, x._3 + y._3)).
            map(x => (x._1, x._2._1, x._2._2, x._2._3)).
            sortBy(productId => productId).collect().take(10).foreach(println(_))
            //coalesce(1).toDF().write.format("csv").mode(SaveMode.Overwrite).save(path_VCP_Products)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

(1000365,1,0,0)
(1000978,7260,60,20)
(1001588,1715,16,6)
(1001606,15,0,0)
(1001618,1499,69,36)
(1001619,121,1,1)
(1001894,1,0,0)
(1002042,66,0,0)
(1002062,3235,46,6)
(1002098,6598,91,26)
metrics: Unit = ()


### "Valore" totale per Brand

In questa *query* si sta cercando di trovare la somma dei prezzi relativa ad ogni brand, considerando tutte le interazioni degli utenti sull'ecommerce, indipendentemente dal tipo di interazione (view, cart o purchase). Si sta quindi conducendo un'analisi sul valore totale dei prodotti interagiti per brand. Questa metrica rappresenta una misura complessiva del valore monetario associato al brand e alle interazioni degli utenti con i prodotti del brand sull'ecommerce. È importante sottolineare che questa metrica non rappresenta il ricavato che ogni brand ha incassato, poiché non tiene conto solo delle vendite effettive, ma considera anche le visualizzazioni e gli inserimenti dei prodotti nel carrello. Tuttavia, fornisce comunque un'indicazione del coinvolgimento e dell'interesse degli utenti verso i prodotti di un determinato brand.

In [27]:
//brand, price
val totalValue = rddEvents.map(x=>(x._4, x._5)).filter({case(brand, price)=>brand.nonEmpty && price > 0}).reduceByKey(_+_).sortBy(_._2, false).collect().take(10).foreach(println(_))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

(apple,5.171550206519732E9)
(samsung,2.8094897419798064E9)
(xiaomi,9.450350431300377E8)
(lg,5.1614301536000896E8)
(acer,4.38923375619997E8)
(lenovo,4.0750582587000674E8)
(huawei,3.7222788205000347E8)
(sony,3.5280309000999075E8)
(asus,2.915951945999978E8)
(lucente,2.864294579200025E8)
totalValue: Unit = ()


#### Top 10 Brand con maggior valore

### Classificazione dei Brand sulla base del loro Valore

Una volta individuato il valore totale dei prodotti interagiti per brand, è stato sviluppato un criterio di classificazione basato sul valore per assegnare una categoria a ciascun brand. Nello specifico, è stato deciso di utilizzare la media dei prezzi dei prodotti interagiti come base per la classificazione. Di seguito si riportano i dettagli della logica adottata:

- I brand con una media dei prezzi superiore a 700 sono stati classificati come `High-end brad`, rappresentando marchi di fascia alta.
- I brand con una media dei prezzi compresa tra 700 e 500 sono stati classificati come `Mid-range brad`, corrispondendo a marchi di fascia media.
- I brand con una media dei prezzi compresa tra 500 e 300 sono stati classificati come `Affordable brad`, rientrando nella fascia medio-bassa.
- I brand con una media dei prezzi inferiore a 300 sono stati classificati come `Budget brand`, appartenendo alla fascia bassa del mercato.

Questa classifica deriva dall'ideologia consolidata e conosciuta che riguarda la percezione del marchio e il suo posizionamento all'interno delle diverse fasce di mercato.

- High-end brand: Questi brand rappresentano il segmento di fascia alta, offrendo prodotti di lusso e alta qualità che spesso sono associati a prezzi elevati.
- Mid-range brand: Questi marchi si collocano a metà strada tra i brand di fascia alta e quelli più accessibili. Offrono prodotti di qualità decente a prezzi più accessibili rispetto ai brand di fascia alta.
- Affordable brand: Questi brand si concentrano sull'offerta di prodotti di buona qualità a prezzi ragionevoli. Sono più accessibili rispetto ai brand di fascia alta e mid-range.
- Budget brand: Questi marchi si concentrano principalmente sull'offerta di prodotti a basso costo. L'obiettivo principale è fornire prodotti economici, anche se a volte la qualità potrebbe essere inferiore rispetto ai marchi di fascia alta, mid-range e affordable.

In [29]:
val brandClassification = rddEvents.map(x => (x._4, x._5)).filter({case (brand, price) => brand.nonEmpty && price > 0}).
    aggregateByKey((0.0,0))((acc, price)=>(acc._1+price,acc._2+1),(acc1, acc2)=>(acc1._1+acc2._1,acc1._2+acc2._2)).
    map(x => (x._1, x._2._1/x._2._2)).
    map(x => (x._1, x._2 match {
    case p if p >= 500 => "High-end brand"
    case p if p >= 300 && p < 500 => "Mid-range brand"
    case p if p >= 100 && p < 300 =>"Affordable brand"
    case _ => "Budget brand"
  }))

//brandClassification.filter(x=>x._2=="High-end brand").count()
//brandClassification.filter(x=>x._2=="Mid-range brand").count()
//brandClassification.filter(x=>x._2=="Affordable brand").count()
//brandClassification.filter(x=>x._2=="Budget brand").count()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

brandClassification: org.apache.spark.rdd.RDD[(String, String)] = MapPartitionsRDD[103] at map at <console>:27
res58: Long = 370
res59: Long = 336
res60: Long = 979
res61: Long = 2516


In [11]:
brandClassification.coalesce(1).toDF().write.format("csv").mode(SaveMode.Overwrite).save(path_BrandClassification)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Alla luce delle analisi effettuate, si intende acquisire informazioni rilevanti riguardanti le relazioni tra le **categorie di prodotti** e la **distribuzione dei brand**, basandosi sulla precedente classificazione.
Si crede che la ricerca di queste informazioni sia un'analisi di grande rilevanza e interesse per diverse motivazioni. Esplorare la distribuzione dei brand all'interno delle diverse categorie di prodotti permette di identificare importanti tendenze di mercato. Ad esempio, potrebbe emergere che alcuni brand sono particolarmente popolari in determinate categorie, mentre altri hanno un appeal maggiore in altre. Un altro aspetto interessante di questa analisi è la valutazione dell'efficacia delle strategie di branding adottate dai brand. Esaminando la relazione tra le categorie di prodotti e la distribuzione dei brand, è possibile valutare se le strategie di branding messe in atto hanno avuto successo nel creare una forte associazione tra il brand e una specifica categoria di prodotti. A tal fine, si è deciso di testare diversi flussi di lavoro per condurre l'analisi sopracitata. In particolare, si vogliono recuperare le informazioni sul valore totale generato dai prodotti di una determinata categoria *(categoryCode, brandClassification)* e brand, insieme al numero di eventi su cui è basato il calcolo. Può essere utile per comprendere la performance economica di una specifica combinazione di categoria e brand, nonché l'intensità dell'interazione degli utenti con i prodotti correlati.

### Primo flusso di lavoro - Join tra i due RDD

In [30]:
//brand, categoryCode, price
val rddValueByBrand = rddEvents.map(x => (x._4, (x._3, x._5))).
                         join(brandClassification).// (brand,((categoryCode, price),(category)))
                         flatMap(x=> x._2._1._1.split('.'). // (brand,((categoryCode_main, categoryCode.sub, price),(category)))
                         map(y => ((y, x._2._2, x._1), x._2._1._2))). //((categoryCode, category, brand), price)
                         aggregateByKey((0.0,0))((agg, v) => (agg._1 + v, agg._2 + 1),((agg1,agg2) => (agg1._1 + agg2._1, agg1._2 + agg2._2))). //((categoryCode, category, brand), value, count)
                         map(x => (x._1._1, x._1._2, x._1._3, x._2._1, x._2._2)). //categoryCode, category, brand, value, count
                         collect()
                         //coalesce(1).toDF().write.format("csv").mode(SaveMode.Overwrite).save(path_ComplexQuery01_01)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

rddValueByBrand: Array[(String, String, String, Double, Int)] = Array((alarm,Budget brand,cenmax,684583.3700000213,9695), (headphone,Affordable brand,honor,88525.25999999983,649), (player,Budget brand,mystery,28561.699999999695,634), (tv,Affordable brand,kivi,2.317747378001211E7,84220), ("",Affordable brand,karcher,1.1596822289999379E7,38140), (carriage,Affordable brand,skillmax,268549.98000000237,995), ("",Budget brand,suda,25.259999999999998,3), (electronics,Affordable brand,hifiman,4447.990000000001,25), (microphone,Budget brand,hyperx,4627.819999999987,202), (kids,Budget brand,mefferts,117.64000000000001,17), (apparel,Budget brand,catimini,4167.22,67), (music_tools,Budget brand,akai,342400.67000000144,3615), (pillow,Affordable brand,vegas,227393.3600000001,4438), (components,Budget ...


### Tempi di esecuzione

![job_1workflow.PNG](images/job_1workflow.PNG)
![job_1workflow_dag.PNG](images/job_1workflow_dag.PNG)

Dai risultati osservati, emerge che il presente flusso di lavoro non manifesta prestazioni ottimali. Ciò può essere attribuito alla scelta di eseguire un'operazione di Join come prima operazione, la quale ha provocato un aumento del volume complessivo dei dati da elaborare. In questo contesto, sarebbe stato più ragionevole eseguire l'operazione di Join successivamente ad un'operazione di *aggregazione*, che consente di **ridurre** il numero di dati da manipolare. Anche una semplice operazione di filter, effettuata prima dell'operazione di Join, può migliorare significativamente le prestazioni. In generale, è consigliabile posticipare il più possibile l'operazione di Join per ottenere prestazioni ottimizzate, in modo da ridurre i dati che devono essere ridistribuiti tra le varie partizioni a seguito di questa operazione. Il **tempo di esecuzione medio** per questo job corrisponde a **37 secondi**.  

### Secondo flusso di lavoro - Utilizzo di una variabile Broadcast

In Apache Spark, una variabile broadcast è un meccanismo per condividere in modo efficiente dati immutabili tra i nodi di un cluster, in particolare essa viene condivisa in **sola lettura** da tutti gli executor. Essa consente di distribuire in modo ottimizzato un'intera variabile a tutti i nodi senza doverla serializzare e trasferire ripetutamente su ogni nodo, evitando quindi di effettuare l'operazione di Join che, come è noto, è molto costosa.
La variabile broadcast è utile quando si ha a che fare con una grande mole di dati in quanto consente di evitare il trasferimento ripetuto dei dati sulla rete e di ridurre l'overhead di comunicazione tra i nodi, migliorando le prestazioni complessive dell'applicazione. Per tali ragioni, il seguente flusso di lavoro rappresenta un'alternativa migliore rispetto a quello precedente.

In [31]:
val brandClass = sc.broadcast(brandClassification.collectAsMap())

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

brandClass: org.apache.spark.broadcast.Broadcast[scala.collection.Map[String,String]] = Broadcast(61)


In [32]:
//brand, categoryCode, price
val rddValueWithBroad = rddEvents.map(x => (x._4, (x._3, x._5))).
                         map(x => (x._1, (x._2, brandClass.value.get(x._1)))). // (brand,((categoryCode, price),(category)))
                         flatMap(x=> x._2._1._1.split('.'). // (brand,((categoryCode_main, categoryCode.sub, price),(category)))
                         map(y => ((y, x._2._2, x._1), x._2._1._2))). //((categoryCode, category, brand), price)
                         aggregateByKey((0.0,0))((agg, v) => (agg._1 + v, agg._2 + 1),((agg1,agg2) => (agg1._1 + agg2._1, agg1._2 + agg2._2))). // ((categoryCode, category, brand), value, count)
                         map(x => (x._1._1, x._1._2, x._1._3, x._2._1, x._2._2)). //categoryCode, category, brand, value, count
                         collect()
                         //coalesce(1).toDF().write.format("csv").mode(SaveMode.Overwrite).save(path_ComplexQuery01_02)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

rddValueWithBroad: Unit = ()


### Tempi di esecuzione

![job2_broad_comp.PNG](images/job2_broad_comp.PNG)
![job_2workflow.PNG](images/job_2workflow.PNG)
![job_2workflow_dag.PNG](images/job_2workflow_dag.PNG)

L'uso di una variabile di broadcast migliora le performance nell'esecuzione dei task, come evidenziato dai risultati ottenuti. Ciò è dovuto al fatto che l'impiego di questa variabile consente di trasferire una quantità inferiore di dati sulla rete, riducendo così anche gli stage che compongono la computazione (due al posto di 4). Di conseguenza, si ottiene un miglioramento delle prestazioni complessive, grazie alla riduzione del carico di trasferimento dei dati e alla maggiore efficienza nell'esecuzione delle operazioni, nonostante la quantità di dati in input sia la stessa. Il **tempo di esecuzione medio** si è abbassato a **11 secondi**.

Nonostante il cluster utilizzato sia composto di macchine molto performanti, si può notare un significativo guadagno in termini di performance nei due test appena condotti. Questa situazione evidenzia l'importanza di ottimizzare le performance e l'utilizzo delle risorse in ambito lavorativo, dove i costi associati all'uso di una determinata configurazione cluster possono essere significativi: a differenza dell'ambito accademico o di testing, dove non ci sono conseguenze finanziarie dirette, è fondamentale implementare miglioramenti a livello di codice per massimizzare l'efficienza computazionale e minimizzare le risorse necessarie per eseguire determinate operazioni. Questo può comportare un risparmio in termini di tempo e di denaro, migliorando l'efficacia complessiva del sistema.

### Incassi totali generati dalle vendite per ogni combinazione di Classe e Brand

In questa analisi alternativa sono state applicate in maniera **congiunta** le pratiche migliori, citate precedentemente, per la definizione di un job computazionalmente pesante. Si è adottato un approccio in cui le operazioni di aggregazione sono state eseguite preventivamente, e si è fatto uso di una variabile broadcast al posto dell'operazione di Join. Questa scelta è stata motivata, come abbiamo già visto, dall'obiettivo di ridurre il traffico di rete e migliorare ulteriormente le performance complessive del processo. Si sono messi in paragone le due modalità di analisi sui dati per valutare il guadagno in termini di performance.

### Join

In [33]:
// (brand, ((categoryCode, mode, price), category))
val cashInTot = rddEvents.map(x => (x._4, (x._3, x._2, x._5))).join(brandClassification).filter(x => x._2._1._2 == "purchase"). 
                          map(x => ((x._2._2, x._1), x._2._1._3)). //((category, brand), price)
                          aggregateByKey((0.0,0))((agg, v) => (agg._1 + v, agg._2 + 1),((agg1,agg2) => (agg1._1 + agg2._1, agg1._2 + agg2._2))). // ((category, brand), cash-in, count)
                          map(x => (x._1._1, x._1._2, x._2._1, x._2._2)). //category, brand, cash-in, count
                          collect()
                          //coalesce(1).toDF().write.format("csv").mode(SaveMode.Overwrite).save(path_ComplexQuery02_01)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

cashInTot: Unit = ()


**Tempo di esecuzione medio** di **24 secondi**

![job_alt_1.PNG](images/job_alt_1.PNG)

### Broadcast

In [34]:
//(brand, categoryCode), price
val cashInTotBroad = rddEvents.filter(x => x._2 == "purchase").
    map(x => ((x._4, x._3), (x._5, 1))). //(brand, categoryCode)(price, count)
    reduceByKey((x,y) => (x._1 + y._1, x._2 + y._2)). // (brand, categoryCode),(price, totCount))
    map(x => ((brandClass.value.get(x._1._1), x._1._1), x._2)). //(category, brand),(price, count)
    reduceByKey((x,y) => (x._1 + y._1, x._2 + y._2)). // ((category, brand),(cash-in, count))
    map(x => (x._1._1, x._1._2, x._2._1, x._2._2)). // category, brandClass, cash-in, count
    collect()
    //coalesce(1).toDF().write.format("csv").mode(SaveMode.Overwrite).save(path_ComplexQuery02_02)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

cashInTotBroad: Unit = ()


**Tempo di esecuzione medio** di **2 secondi**

![job_alt_2.PNG](images/job_alt_2.PNG)

In questo esempio si può apprezzare appieno il potere dell'utilizzo combinato delle aggregazioni/operazioni di filtraggio e delle variabili di broadcast. Infatti, viene eseguita un'aggregazione sui dati prima di utilizzare la variabile di broadcast per recuperare la classe di appartenenza di ogni brand. &Egrave; evidente come, rispetto all'esempio precedente con il join, in cui era necessario accedere a 20GB di dati (definiti da più di 67 milioni di record), ora si deve accedere solo a 2.5MB di dati, che corrispondono a circa il 99.9% di volume in meno.

### Raggruppamento dei Brand sulla base della CategoryCode e della Classe
Come ultima query complessa si è deciso di effettuare un'analisi che fornisca informazioni utili sulle combinazioni di categoryCode e class associate a un **insieme di brand**, consentendo di comprendere quali brand sono presenti in diverse categorie e sottocategorie di prodotti. Complessivamente, l'analisi fornisce un quadro sulla relazione tra brand, categoryCode e brandClass, consentendo di comprendere meglio la distribuzione dei brand nelle diverse combinazioni di categorie e di individuare pattern o associazioni significative tra di loro. Le informazioni sulla distribuzione dei brand possono essere utilizzate per effettuare ulteriori analisi di aggregazione o filtraggio, ad esempio si può valutare la distribuzione dei brand all'interno delle diverse combinazioni di categoryCode e category, identificando quelle con un numero elevato di brand e quelle meno rappresentate.

### Join

In [35]:
//brand, categoryCode
val brandByCatClass = rddEvents.map(x => (x._4, x._3)).
                        join(brandClassification). // (brand,(categoryCode, category)
                        flatMap(x => x._2._1.split('.'). // (brand,((categoryCode_main, categoryCode.sub),(category)))
                        map(y => ((y, x._2._2), x._1))). //((categoryCode, category), brand)
                        distinct.    
                        groupByKey().
                        map(x => (x._1._1, x._1._2, x._2.toList.mkString(","))).
                        coalesce(1).toDF().write.format("csv").mode(SaveMode.Overwrite).save(path_ComplexQuery03_01)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

brandByCatClass: Unit = ()


**Tempo di esecuzione medio** di **25 secondi**

![job_alt2_1.PNG](images/job_alt2_1.PNG)

### Broadcast

In [36]:
//brand, categoryCode
val brandByCatClassBroad = rddEvents.map(x => (x._4, x._3)).
                             map(x => (x._1, (x._2, brandClass.value.get(x._1)))). // (brand,((categoryCode, category)))
                             flatMap(x=> x._2._1.split('.'). // (brand,((categoryCode_main, categoryCode.sub, price),(category)))
                             map(y => ((y, x._2._2), x._1))). //((categoryCode, category), brand)
                             distinct.
                             groupByKey().
                             map(x => (x._1._1, x._1._2, x._2.toList.mkString(","))).
                             coalesce(1).toDF().write.format("csv").mode(SaveMode.Overwrite).save(path_ComplexQuery03_02)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

brandByCatClassBroad: Unit = ()


**Tempo di esecuzione medio** di **10 secondi**

![job_alt2_2.PNG](images/job_alt2_2.PNG)

## Tableau Desktop

Tableau Desktop è un software di visualizzazione dei dati che consente di creare grafici interattivi, dashboard e report per analizzare e presentare i dati in modo efficace. È uno strumento ampiamente utilizzato nel campo della Business Intelligence e dell'analisi dei dati. 
Nell'immagine sottostante è possibile osservare la rappresentazione grafica dei dati tramite una heatmap. Questa visualizzazione è stata generata manipolando le informazioni ottenute dal precedente processo di esecuzione denominato **"Raggruppamento dei Brand sulla base della CategoryCode e della Classe"**.
Una heatmap, come si può ben capire, è una visualizzazione grafica che rappresenta i dati in forma di matrice colorata, dove i colori sono utilizzati per evidenziare le differenze nella distribuzione dei dati.

Per ottenere tale risultato è stato necessario creare un `Campo Calcoalto`, chiamato `BrandXCategoryClass`, che consente di contare il numero di elementi separati da virgole nelle stringhe della colonna *"Brand"*.

La formula è definita come `IF ISNULL([Brand]) THEN 0 ELSE LEN([Brand]) - LEN(REPLACE([Brand], ',', '')) + 1 END`: 

- IF ISNULL([Brand]) THEN 0 ELSE ... END: verifica se il valore nella colonna è nullo. Se lo è, restituisce 0 come risultato. Altrimenti, passa alla parte successiva della formula.
- LEN([Brand]) - LEN(REPLACE([Brand], ',', '')): calcola la differenza tra la lunghezza totale della stringa e la lunghezza della stessa senza le virgole. In questo modo è possibile calcolare il numero di virgole presenti nella stringa.
    - E&grave; necessario aggiunge 1 al risultato ottenuto perché il numero totale di elementi separati da virgole in una stringa è uguale al numero di virgole più uno.

Dai risultati ottenuti, si può osservare una distribuzione dei brand basata sulla relazione tra le categorie di appartenenza e il loro criterio di classificazione. In particolare, concentriamoci sull'esempio degli "accessori". La distribuzione dei brand che vendono prodotti appartenenti alla categoria "accessori" mostra una netta prevalenza di prodotti a basso costo. Solo alcuni di questi brand sono classificati come fornitori di accessori "High-end brand", con un prezzo più elevato. Questa distribuzione riflette la situazione attuale del mercato degli accessori, in cui la maggioranza dei brand offre prodotti a prezzi contenuti. Se consideriamo invece la categoria "bicycle", notiamo una distribuzione abbastanza equilibrata tra i brand che offrono biciclette. Questo ci permette di comprendere la distribuzione dei brand, sia di nicchia che non, che forniscono prodotti specifici in una determinata categoria e a quale fascia di prezzo appartengono tali prodotti.


![tableau.PNG](images/tableau.PNG)