# Esercitazione 8

## Esercizio 3
###Analisi dati con Spark RDD
Il dataset `prodotti` contiene i dati dei prodotti in vendita:
- un identificativo univoco (`item_id`)
- la categoria del prodotto (`category`)
- il prezzo del singolo prodotto (`price`)

Il dataset `venditori` contiene i dati dei venditori:
- un identificativo univoco (`seller_id`)
- il tipo del venditore (`seller_type`), che può essere First-Party o Third-Party

Il dataset `transazioni` contiene i dati delle vendite effettuate:
- id univoco della transazione (`transaction_id`)
- id univoco del prodotto venduto (`item_id`)
- id univoco del venditore (`seller_id`)
- anno, mese e giorno della transazione (`year`, `month` e `day`)
- quantità di prodotti venduti (`num_items`)

In [None]:
import os

!apt-get upgrade -y -qq > /dev/null

# Install JDK 11
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
os.environ["JAVA_HOME"]  = "/usr/lib/jvm/java-11-openjdk-amd64"
# Install Spark 3.4.0
!curl -O https://archive.apache.org/dist/spark/spark-3.4.0/spark-3.4.0-bin-hadoop3.tgz
!tar xf spark-3.4.0-bin-hadoop3.tgz
!ln -s spark-3.4.0-bin-hadoop3 spark
!rm -f *.tgz
os.environ["SPARK_HOME"] = "/content/spark"
#Install findspark using pip to make pyspark importable as regular library
!pip -q install findspark
import findspark
findspark.init()

#importing pyspark
import pyspark
#importing sparksession
from pyspark.sql import SparkSession

#creating a sparksession object and providing appName
spark=SparkSession.builder.appName("local").getOrCreate()
sc = spark.sparkContext

In [None]:
transazioni = spark.read.csv('/content/drive/MyDrive/data/vendite/transazioni.csv', header=True, inferSchema=True).rdd.cache()
prodotti = spark.read.csv('/content/drive/MyDrive/data/vendite/prodotti.csv', header=True, inferSchema=True).rdd.cache()
venditori = spark.read.csv('/content/drive/MyDrive/data/vendite/venditori.csv', header=True, inferSchema=True).rdd.cache()

In [None]:
transazioni.take(3)

In [None]:
prodotti.take(3)

In [None]:
venditori.take(3)

1. Per ogni mese, restituire la quantità venduta (`num_items`) massima in una singola transazione.

In [None]:
month_num_items = transazioni.map(lambda row: (row.month, row.num_items))
month_num_items.reduceByKey(max).sortByKey().collect()


2. Calcolare il ricavo ottenuto da ogni transazione, moltiplicando il prezzo del prodotto e la quantità venduta.

In [None]:
# Creiamo i Pair RDD con item_id come chiave
item_id_prodotti = prodotti.map(lambda row: (row.item_id, row))
item_id_transazioni = transazioni.map(lambda row: (row.item_id, row))
# Effettuiamo il join e manteniamo solo i valori
prodotti_transazioni = item_id_prodotti.join(item_id_transazioni).values().cache()
# Calcoliamo il ricavo per ogni transazione
def calc_ricavi(prod_tr_row):
    prodotti_row, transazioni_row = prod_tr_row
    ricavo = prodotti_row.price * transazioni_row.num_items

    value = {'prodotto': prodotti_row, 'transazione': transazioni_row, 'ricavo': ricavo}
    return value
prod_tr_ricavi = prodotti_transazioni.map(calc_ricavi)
prod_tr_ricavi.take(3)


3. Calcolare il ricavo massimo ottenuto da ciascun venditore.

In [None]:
# Creiamo un Pair RDD del tipo (seller_id, ricavo)
seller_id_ricavi = prod_tr_ricavi.map(lambda x: (x['transazione'].seller_id, x['ricavo']))
# Calcoliamo il ricavo massimo per ciascun venditore
seller_id_ricavi.reduceByKey(max).sortByKey().collect()


4. Calcolare il ricavo totale ottenuto da ciascun venditore.

In [None]:
# Calcoliamo il ricavo totale per ciascun venditore
ricavi_totali_venditori = seller_id_ricavi.reduceByKey(lambda x, y: x + y)
# Ordiniamo le chiavi e arrotondiamo i valori
ricavi_totali_venditori.mapValues(lambda x: round(x, 2)).sortByKey().collect()


5. Calcolare il ricavo medio ottenuto da ciascun venditore.

In [None]:
# Trasformazioni per ordinare un generico Pair RDD: mapValues + reduceByKey + mapValues
def avg_by_key(pair_rdd):
    return pair_rdd.mapValues(lambda x: (x, 1)).reduceByKey(sum_values_counters).mapValues(calc_avg)

def sum_values_counters(x, y):
    value1, count1 = x
    value2, count2 = y
    return value1 + value2, count1 + count2

def calc_avg(sum_values_counter):
    sum_values, counter = sum_values_counter
    return sum_values / counter

ricavi_medi = avg_by_key(seller_id_ricavi).mapValues(lambda x: round(x, 2))
ricavi_medi.sortByKey().collect()


6. Ordinare i risultati ottenuti nell'esercizio precedente in base al ricavo medio, in modo decrescente.

In [None]:
ricavi_medi.sortBy(lambda x: x[1], ascending=False).collect()

7. Calcolare i 10 venditori che hanno ottenuto la somma di ricavi maggiore nel mese di Maggio del 2009.


In [None]:
# Filtriamo l'RDD per ottenere solo le transazioni di Maggio 2009
# Creiamo un Pair RDD del tipo (seller_id, ricavo)
# Calcoliamo la somma dei ricavi per ogni chiave (cioè per ogni venditore)
def get_maggio2019(x):
    return x['transazione'].month == 5 and x['transazione'].year == 2009

seller_ricavi_maggio = (prod_tr_ricavi
                            .filter(get_maggio2019)
                            .map(lambda x: (x['transazione'].seller_id, x['ricavo']))
                            .reduceByKey(lambda x, y: x + y))

# Utilizziamo top per ottenere i 10 venditori con la somma di ricavi maggiore
seller_ricavi_maggio.top(10, key=lambda x: x[1])

8. Calcolare il ricavo medio ottenuto dai venditori "First-Party" per ogni categoria di prodotto.

In [None]:
# Le informazioni che ci servono sono le seguenti:
# `seller_type`, che si trova nell'RDD `venditori`
# `ricavo`, che si trova nell'RDD `prod_tr_ricavi`
# `category`, che si trova nell'RDD `prod_tr_ricavi`

# Filtriamo i venditori mantenendo solo quelli First-Party
venditori_first = venditori.filter(lambda row: row.seller_type == 'First-Party')
# Facciamo il join con `prod_tr_ricavi` in base al `seller_id`
seller_id_venditori_first = venditori_first.map(lambda row: (row.seller_id, row))
seller_id_prod_tr_ricavi = prod_tr_ricavi.map(lambda x: (x['transazione'].seller_id, x))
seller_prod_tr_ricavi = seller_id_venditori_first.join(seller_id_prod_tr_ricavi).values().cache()
# Creiamo il pair rdd `(categoria, ricavo)`
categoria_ricavo = seller_prod_tr_ricavi.values().map(lambda x: (x['prodotto'].category, x['ricavo']))
# Calcoliamo la media per ogni categoria
(avg_by_key(categoria_ricavo)               # calcoliamo la media
     .mapValues(lambda x: round(x, 2))      # arrotondiamo i valori
     .sortByKey()                           # ordiniamo in base alla chiave
     .collectAsMap())                       # otteniamo i risultati come dizionario