# Per iniziare clicca su *Cell* nella barra e poi clicca su *Run all below*

Inizializzazione del contesto Spark.

Nota:
> Il contesto si può lanciare una sola volta perché una volta caricato rimane online
> 
> Per lanciare le singole celle usa il tasto *Run*

In [500]:
from pyspark import SparkContext
from pyspark.sql import SQLContext

sc = SparkContext("local", "FantaStats")
spark = SQLContext(sc)

ValueError: Cannot run multiple SparkContexts at once; existing SparkContext(app=FantaStats, master=local) created by __init__ at <ipython-input-2-7ae76705822f>:4 

Nel seguente blocco ci vanno le funzioni:
- `loadCSV` si usa per caricare i file e per normalizzare i nomi dei calciatori.
- `prefix` si usa per aggiungere un prefisso ai nomi dei campi in modo da riconoscerli

In [501]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

def loadCSV(sdf, filename):
    df = sdf.load(filename)
    return df

def prefix(sdf, prefix):
      for c in sdf.columns:
          sdf = sdf.withColumnRenamed(c, '{}{}'.format(prefix, c))
      return sdf

def showValues(sdf, field):
    sdf.select("c_Nome",field + " Pond", "_2018_"+field, "_2017_"+field, "_2016_"+field, "_2015_"+field, field+"W2018", field+"W2017", field+"W2016", field+"W2015").show()

def drop_fields(finalRoster, field):
    columns_to_drop = ["_2017_"+field, "_2016_"+field, "_2015_"+field]
    finalRoster = finalRoster.drop(*columns_to_drop)
    columns_to_drop = [field+"W2018", field+"W2017", field+"W2016", field+"W2015"]
    finalRoster = finalRoster.drop(*columns_to_drop)
    return finalRoster

def weight_mean(finalRoster, field):
    weight_no = 0
    weight_2018 = 5
    weight_2017 = 3
    weight_2016 = 2
    weight_2015 = 1
    
    finalRoster = finalRoster.withColumn(field+"W2018", when(col("_2018_"+field).isNull() | col("_2018_"+field).eqNullSafe(0), lit(weight_no)).otherwise(lit(weight_2018)))
    finalRoster = finalRoster.withColumn("_2018_"+field, when(col("_2018_"+field).isNull(), lit(weight_no)).otherwise(col("_2018_"+field)))

    finalRoster = finalRoster.withColumn(field+"W2017", when(col("_2017_"+field).isNull() | col("_2017_"+field).eqNullSafe(0), lit(weight_no)).otherwise(lit(weight_2017)))
    finalRoster = finalRoster.withColumn("_2017_"+field, when(col("_2017_"+field).isNull(), lit(weight_no)).otherwise(col("_2017_"+field)))

    finalRoster = finalRoster.withColumn(field+"W2016", when(col("_2016_"+field).isNull() | col("_2016_"+field).eqNullSafe(0), lit(weight_no)).otherwise(lit(weight_2016)))
    finalRoster = finalRoster.withColumn("_2016_"+field, when(col("_2016_"+field).isNull(), lit(weight_no)).otherwise(col("_2016_"+field)))

    finalRoster = finalRoster.withColumn(field+"W2015", when(col("_2015_"+field).isNull() | col("_2015_"+field).eqNullSafe(0), lit(weight_no)).otherwise(lit(weight_2015)))
    finalRoster = finalRoster.withColumn("_2015_"+field, when(col("_2015_"+field).isNull(), lit(weight_no)).otherwise(col("_2015_"+field)))

    finalRoster = finalRoster.withColumn(field+" Pond", (col("_2018_"+field)*col(field+"W2018") + col("_2017_"+field)*col(field+"W2017") + col("_2016_"+field)*col(field+"W2016") + col("_2015_"+field)*col(field+"W2015"))/(col(field+"W2018")+col(field+"W2017")+col(field+"W2016")+col(field+"W2015")))
    finalRoster = finalRoster.withColumn(field+" Pond", when(col(field+" Pond").isNull(), lit(0)).otherwise(lit(col(field+" Pond"))))
                                                                                       
    return finalRoster    

def weight_fields(finalRoster, field, name):
    
    finalRoster = weight_mean(finalRoster, field)
    finalRoster = drop_fields(finalRoster, field)

    finalRoster = finalRoster.withColumnRenamed("_2018_"+field, name)

    return finalRoster

Qui si caricano i file in memoria.

la variable `current_players` ha le informazioni relativamente ai giocatori della stagione *2019-2020* con la loro quotazione.

In [507]:
csvContext = spark.read.format("csv").option("header", "true").option("delimiter", ";")

players_2015 = prefix(loadCSV(csvContext, "Statistiche_Fantacalcio_2015-16.csv"), "_2015_")
players_2016 = prefix(loadCSV(csvContext, "Statistiche_Fantacalcio_2016-17.csv"), "_2016_")
players_2017 = prefix(loadCSV(csvContext, "Statistiche_Fantacalcio_2017-18.csv"), "_2017_")
players_2018 = prefix(loadCSV(csvContext, "Statistiche_Fantacalcio_2018-19.csv"), "_2018_")
current_players = prefix(loadCSV(csvContext, "Quotazioni_Fantacalcio.csv"), "c_")

Qui mettiamo in join tutti i file. 

I file vengono messi in `left join` con i giocatori della stagione corrente. 

Per questo motivo alcuni dei giocatori potrebbero non avere statistiche perché non hanno mai giocato nel campionato italiano.

In [508]:
current_2015 = current_players.join(players_2015, current_players.c_Nome == players_2015._2015_Nome, how = 'left')
current_2015_2016 = current_2015.join(players_2016, current_2015.c_Nome == players_2016._2016_Nome, how = 'left')
current_2015_2016_2017 = current_2015_2016.join(players_2017, current_2015_2016.c_Nome == players_2017._2017_Nome, how = 'left')
current_2015_2016_2017_2018 = current_2015_2016_2017.join(players_2018, current_2015_2016_2017.c_Nome == players_2018._2018_Nome, how = 'left')
current_2015_2016_2017_2018 = current_2015_2016_2017_2018.withColumn("NEW", when(col("_2018_Squadra").isNull(), lit("Y")).otherwise(lit("N")))

columns_to_drop = ["_2018_Nome", "_2017_Nome", "_2016_Nome", "_2015_Nome",
                   "_2018_R", "_2017_R", "_2016_R", "_2015_R",
                   "_2018_Id", "_2017_Id", "_2016_Id", "_2015_Id",
                   "_2018_Squadra", "_2017_Squadra", "_2016_Squadra", "_2015_Squadra",
                   "c_Id", "c_Qt. I", "c_Diff."]

finalRoster = current_2015_2016_2017_2018.drop(*columns_to_drop)

Qui è possible fare check intermedi

In [509]:
weight = weight_mean(finalRoster, "Au")
showValues(weight, "Au")

+-----------------+-------+--------+--------+--------+--------+-------+-------+-------+-------+
|           c_Nome|Au Pond|_2018_Au|_2017_Au|_2016_Au|_2015_Au|AuW2018|AuW2017|AuW2016|AuW2015|
+-----------------+-------+--------+--------+--------+--------+-------+-------+-------+-------+
|CRISTIANO RONALDO|    0.0|       0|       0|       0|       0|      0|      0|      0|      0|
|           PIATEK|    0.0|       0|       0|       0|       0|      0|      0|      0|      0|
|         IMMOBILE|    0.0|       0|       0|       0|       0|      0|      0|      0|      0|
|         LUKAKU R|    0.0|       0|       0|       0|       0|      0|      0|      0|      0|
|         ZAPATA D|    0.0|       0|       0|       0|       0|      0|      0|      0|      0|
|     QUAGLIARELLA|    0.0|       0|       0|       0|       0|      0|      0|      0|      0|
|           ICARDI|    1.0|       0|       0|       1|       0|      0|      0|      2|      0|
|            DZEKO|    0.0|       0|    

Una volta fatti i join, i dati vengono associati insieme a partire dal *2018-2019* al *2015-2016*.

In [510]:
finalRoster = weight_fields(finalRoster, "Mv", "Media Voto")
finalRoster = weight_fields(finalRoster, "Mf", "Media Fantavoto")
finalRoster = weight_fields(finalRoster, "Asf", "Assist da fermo")
finalRoster = weight_fields(finalRoster, "Ass", "Assist")
finalRoster = weight_fields(finalRoster, "Amm", "Amm")
finalRoster = weight_fields(finalRoster, "Au", "Autogoal")
finalRoster = weight_fields(finalRoster, "Esp", "Espulsioni")
finalRoster = weight_fields(finalRoster, "R+", "Rigori Segnati")
finalRoster = weight_fields(finalRoster, "R-", "Rigori Sbagliati")
finalRoster = weight_fields(finalRoster, "Pg", "Partite Giocate")
finalRoster = weight_fields(finalRoster, "Gf", "Goal Fatti")
finalRoster = weight_fields(finalRoster, "Gs", "Goal Subiti")
finalRoster = weight_fields(finalRoster, "Rp", "Rigori Parati")
finalRoster = weight_fields(finalRoster, "Rc", "Rigori Calciati")

finalRoster = finalRoster.withColumnRenamed("c_R", "Ruolo")
finalRoster = finalRoster.withColumnRenamed("c_Squadra", "Squadra")
finalRoster = finalRoster.withColumnRenamed("c_Nome", "Nome")
finalRoster = finalRoster.withColumnRenamed("c_Qt. A", "Quotazione")
finalRoster = finalRoster.withColumnRenamed("MF", "_2018_Mf")



Ogni ruolo ha dei campi che non interessano, per esempio gli assist per i portieri. Ricorda che devi sempre assegnare la variabile al risultato.

Qui filtriamo per ruolo ed eliminiamo i campi non interessanti.

I metodi disponibili li puoi trovare qui. Generalmente puoi usare *sql like* queries dentro i metodi, guarda per esempio `filter`: https://spark.apache.org/docs/2.1.0/api/python/pyspark.sql.html#pyspark.sql.DataFrame

In [None]:
finalRoster.write.format('csv').option("header", "true").save('fantacalcio-stats.csv')

In [None]:
columns_to_drop_keepers = ["Assist", "Rigori Sbagliati", "Ruolo", "Goal Fatti"]
keepers = finalRoster.filter("Ruolo == 'P'").drop(*columns_to_drop_keepers)

Per mostrare il risultato finale basta usare il metodo `display` e chiamare `.toPandas()` sul dataframe.

In [None]:
display(keepers.select("Nome", "New").toPandas())
#keepers.select("Au Pond", "Autogoal").show()