# Creación de datasets 

**Nota:** Por favor leer el archivo vaepReadMe.md para obtener más información incluyendo el orden en que se deben ejecutar los scripts

El siguiente script toma los csv originales, convierte a parquet los que usaran en los análisis en las jupyter notebooks y guarda en postgres los que se analizaran con superset.

In [1]:
import findspark

findspark.add_jars('/app/postgresql-42.1.4.jar')
findspark.init()

In [2]:
from pyspark.sql import SparkSession
spark = (
    SparkSession.builder
    .appName("Vaep-dataset-creation")
    .config("spark.driver.memory", "512m")
    .config("spark.driver.cores", "1")
    .config("spark.executor.memory", "512m")
    .config("spark.executor.cores", "1")
    .config("spark.sql.shuffle.partitions", "2")
    .getOrCreate()
)

In [3]:
spark.version

'2.4.5'

# Creación de base de datos en archivos parquet

### Partidos jugados liga española, inglesa y francesa

In [4]:
#Define el directorio de origen (donde están los csv) y de destino incluyendo el nombre del archivo parquet

source = '/dataset/vaep/matches/'
output = '/dataset/vaep/parquet/matches.parquet'

In [5]:
esp = spark.read.csv(source + 'matches_Spain' +'.csv', header=True)
eng = spark.read.csv(source + 'matches_England' +'.csv', header=True)
ita = spark.read.csv(source + 'matches_Italy' +'.csv', header=True)

In [6]:
matches=esp.union(eng.union(ita))

In [7]:
matches.show(2)

+------+-------+--------+--------------------+--------+-------------------+------+--------------------+-------+--------------------+--------------------+--------------------+--------+-------------+-------------+-------------+----------+------------+-----------+------------+------------------+--------------------+-------------+---------------------+----------------------+-----------------------------+-------------+-------------+----------+------------+-----------+------------+------------------+--------------------+-------------+---------------------+----------------------+-----------------------------+
|status|roundId|gameweek|           teamsData|seasonId|            dateutc|winner|               venue|   wyId|               label|                date|            referees|duration|competitionId|team1.scoreET|team1.coachId|team1.side|team1.teamId|team1.score|team1.scoreP|team1.hasFormation|     team1.formation|team1.scoreHT|team1.formation.bench|team1.formation.lineup|team1.formation.s

In [8]:
matches.printSchema()

root
 |-- status: string (nullable = true)
 |-- roundId: string (nullable = true)
 |-- gameweek: string (nullable = true)
 |-- teamsData: string (nullable = true)
 |-- seasonId: string (nullable = true)
 |-- dateutc: string (nullable = true)
 |-- winner: string (nullable = true)
 |-- venue: string (nullable = true)
 |-- wyId: string (nullable = true)
 |-- label: string (nullable = true)
 |-- date: string (nullable = true)
 |-- referees: string (nullable = true)
 |-- duration: string (nullable = true)
 |-- competitionId: string (nullable = true)
 |-- team1.scoreET: string (nullable = true)
 |-- team1.coachId: string (nullable = true)
 |-- team1.side: string (nullable = true)
 |-- team1.teamId: string (nullable = true)
 |-- team1.score: string (nullable = true)
 |-- team1.scoreP: string (nullable = true)
 |-- team1.hasFormation: string (nullable = true)
 |-- team1.formation: string (nullable = true)
 |-- team1.scoreHT: string (nullable = true)
 |-- team1.formation.bench: string (nullable

In [9]:
#Define una función para renombrar las columnas del dataset

def rename_cols(df, names):
  for row in names: 
     df = df.withColumnRenamed(row[0],row[1])
  return df

In [10]:
#Crea una lista de listas con el nombre original y el nombre nuevo de la columna

names=[
['team1.scoreET','team1_scoreET'],
['team1.coachId','team1_coachId'],
['team1.side','team1_side'],
['team1.teamId','team1_teamId'],
['team1.score','team1_score'],
['team1.scoreP','team1_scoreP'],
['team1.hasFormation','team1_hasFormation'],
['team1.formation','team1_formation'],
['team1.scoreHT','team1_scoreHT'],
['team1.formation.bench','team1_formation_bench'],
['team1.formation.lineup','team1_formation_lineup'],
['team1.formation.substitutions','team1_formation_substitutions'],
['team2.scoreET','team2_scoreET'],
['team2.coachId','team2_coachId'],
['team2.side','team2_side'],
['team2.teamId','team2_teamId'],
['team2.score','team2_score'],
['team2.scoreP','team2_scoreP'],
['team2.hasFormation','team2_hasFormation'],
['team2.formation','team2_formation'],
['team2.scoreHT','team2_scoreHT'],
['team2.formation.bench','team2_formation_bench'],
['team2.formation.lineup','team2_formation_lineup'],
['team2.formation.substitutions','team2_formation_substitutions']    
]

In [11]:
# Cambia los nombres

matches=rename_cols(matches, names)

In [12]:
matches.printSchema()

root
 |-- status: string (nullable = true)
 |-- roundId: string (nullable = true)
 |-- gameweek: string (nullable = true)
 |-- teamsData: string (nullable = true)
 |-- seasonId: string (nullable = true)
 |-- dateutc: string (nullable = true)
 |-- winner: string (nullable = true)
 |-- venue: string (nullable = true)
 |-- wyId: string (nullable = true)
 |-- label: string (nullable = true)
 |-- date: string (nullable = true)
 |-- referees: string (nullable = true)
 |-- duration: string (nullable = true)
 |-- competitionId: string (nullable = true)
 |-- team1_scoreET: string (nullable = true)
 |-- team1_coachId: string (nullable = true)
 |-- team1_side: string (nullable = true)
 |-- team1_teamId: string (nullable = true)
 |-- team1_score: string (nullable = true)
 |-- team1_scoreP: string (nullable = true)
 |-- team1_hasFormation: string (nullable = true)
 |-- team1_formation: string (nullable = true)
 |-- team1_scoreHT: string (nullable = true)
 |-- team1_formation_bench: string (nullable

In [13]:
from pyspark.sql.types import DoubleType, IntegerType , StringType, TimestampType, DateType, FloatType

In [14]:
#Crea una función para definir el tipo de dato en la variable

def cast_cols(df, cols, new_type):
  for col in cols: 
     df = df.withColumn(col, df[col].cast(new_type()))
  return df

In [15]:
#Crea una lista por tipo de variable con el nombre de las columnas que corresponden a dicho tipo

stringCols=['status','teamsData','venue','label','referees','duration',
            'team1_side','team1_formation','team1_formation_bench','team1_formation_lineup','team1_formation_substitutions',
           'team2_side','team2_formation','team2_formation_bench','team2_formation_lineup','team2_formation_substitutions']

dateCols=['date']

timeStampCols=['dateutc']

integerCols=['roundId','gameweek','seasonId','winner','wyId','competitionId','team1_scoreET','team1_coachId','team1_teamId',
            'team1_score','team1_scoreP','team1_hasFormation','team1_scoreHT','team2_scoreET','team2_coachId','team2_teamId',
            'team2_score','team2_scoreP','team2_hasFormation','team2_scoreHT']





In [16]:
# Aplica la función para cambiar el tipo de datos

matches = cast_cols(matches, stringCols, StringType)
matches = cast_cols(matches, dateCols, DateType)
matches = cast_cols(matches, timeStampCols, TimestampType)
matches = cast_cols(matches, integerCols, IntegerType)

In [17]:
matches.printSchema()

root
 |-- status: string (nullable = true)
 |-- roundId: integer (nullable = true)
 |-- gameweek: integer (nullable = true)
 |-- teamsData: string (nullable = true)
 |-- seasonId: integer (nullable = true)
 |-- dateutc: timestamp (nullable = true)
 |-- winner: integer (nullable = true)
 |-- venue: string (nullable = true)
 |-- wyId: integer (nullable = true)
 |-- label: string (nullable = true)
 |-- date: date (nullable = true)
 |-- referees: string (nullable = true)
 |-- duration: string (nullable = true)
 |-- competitionId: integer (nullable = true)
 |-- team1_scoreET: integer (nullable = true)
 |-- team1_coachId: integer (nullable = true)
 |-- team1_side: string (nullable = true)
 |-- team1_teamId: integer (nullable = true)
 |-- team1_score: integer (nullable = true)
 |-- team1_scoreP: integer (nullable = true)
 |-- team1_hasFormation: integer (nullable = true)
 |-- team1_formation: string (nullable = true)
 |-- team1_scoreHT: integer (nullable = true)
 |-- team1_formation_bench: st

In [18]:
matches.show(2)

+------+-------+--------+--------------------+--------+-------------------+------+--------------------+-------+--------------------+----+--------------------+--------+-------------+-------------+-------------+----------+------------+-----------+------------+------------------+--------------------+-------------+---------------------+----------------------+-----------------------------+-------------+-------------+----------+------------+-----------+------------+------------------+--------------------+-------------+---------------------+----------------------+-----------------------------+
|status|roundId|gameweek|           teamsData|seasonId|            dateutc|winner|               venue|   wyId|               label|date|            referees|duration|competitionId|team1_scoreET|team1_coachId|team1_side|team1_teamId|team1_score|team1_scoreP|team1_hasFormation|     team1_formation|team1_scoreHT|team1_formation_bench|team1_formation_lineup|team1_formation_substitutions|team2_scoreET|team2

In [19]:
matches.count()

1140

In [20]:
# Crea la base de datos parquet particionada por el id de la competición y el id del partido

matches.write.mode('overwrite').partitionBy("competitionId", "wyId").parquet(output)

In [21]:
#Las siguientes sentencias simplemente revisan que la base se pueda leer correctamente

df_parquet = spark.read.parquet(output)

In [22]:
df_parquet.count()

1140

In [23]:
df_parquet.createOrReplaceTempView("aux")

In [24]:
query='''
SELECT * 
from aux
limit 3
'''

In [25]:
result = spark.sql(query)
result.show()

+------+-------+--------+--------------------+--------+-------------------+------+--------------------+--------------------+----+--------------------+--------+-------------+-------------+----------+------------+-----------+------------+------------------+--------------------+-------------+---------------------+----------------------+-----------------------------+-------------+-------------+----------+------------+-----------+------------+------------------+--------------------+-------------+---------------------+----------------------+-----------------------------+-------------+-------+
|status|roundId|gameweek|           teamsData|seasonId|            dateutc|winner|               venue|               label|date|            referees|duration|team1_scoreET|team1_coachId|team1_side|team1_teamId|team1_score|team1_scoreP|team1_hasFormation|     team1_formation|team1_scoreHT|team1_formation_bench|team1_formation_lineup|team1_formation_substitutions|team2_scoreET|team2_coachId|team2_side|te

### Eventos del partido evaluados 

In [26]:
#Define el directorio de origen (donde están los csv) y de destino incluyendo el nombre del archivo parquet

source = '/dataset/vaep/analysisOutcome/'
output = '/dataset/vaep/parquet/match_events.parquet'

In [27]:
esp = spark.read.csv(source + 'eventsPerMatchEsp' +'.csv', header=True)
eng = spark.read.csv(source + 'eventsPerMatchEng' +'.csv', header=True)
ita = spark.read.csv(source + 'eventsPerMatchIta' +'.csv', header=True)

In [28]:
events=esp.union(eng.union(ita))

In [29]:
events.printSchema()

root
 |-- action_id: string (nullable = true)
 |-- game_id: string (nullable = true)
 |-- period_id: string (nullable = true)
 |-- time_seconds: string (nullable = true)
 |-- team_id: string (nullable = true)
 |-- player_id: string (nullable = true)
 |-- start_x: string (nullable = true)
 |-- start_y: string (nullable = true)
 |-- end_x: string (nullable = true)
 |-- end_y: string (nullable = true)
 |-- bodypart_id: string (nullable = true)
 |-- type_id: string (nullable = true)
 |-- result_id: string (nullable = true)
 |-- type_name: string (nullable = true)
 |-- result_name: string (nullable = true)
 |-- bodypart_name: string (nullable = true)
 |-- short_name: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- birth_date: string (nullable = true)
 |-- short_team_name: string (nullable = true)
 |-- team_name: string (nullable = true)
 |-- scores: string (nullable = true)
 |-- concedes: string (nullable = true)
 |-- offensiv

In [30]:
#Crea una lista por tipo de variable con el nombre de las columnas que corresponden a dicho tipo

stringCols=['type_name','result_name','bodypart_name','short_name','first_name','last_name','short_team_name','team_name']

dateCols=['birth_date']

floatCols=['time_seconds','start_x','start_y','end_x','end_y','scores','concedes','offensive_value','defensive_value','vaep_value']

integerCols=['action_id','game_id','period_id','team_id','player_id','bodypart_id','type_id','result_id']


In [31]:
# Aplica la función para cambiar el tipo de datos

events = cast_cols(events, stringCols, StringType)
events = cast_cols(events, dateCols, DateType)
events = cast_cols(events, floatCols, FloatType)
events = cast_cols(events, integerCols, IntegerType)

In [32]:
events.printSchema()

root
 |-- action_id: integer (nullable = true)
 |-- game_id: integer (nullable = true)
 |-- period_id: integer (nullable = true)
 |-- time_seconds: float (nullable = true)
 |-- team_id: integer (nullable = true)
 |-- player_id: integer (nullable = true)
 |-- start_x: float (nullable = true)
 |-- start_y: float (nullable = true)
 |-- end_x: float (nullable = true)
 |-- end_y: float (nullable = true)
 |-- bodypart_id: integer (nullable = true)
 |-- type_id: integer (nullable = true)
 |-- result_id: integer (nullable = true)
 |-- type_name: string (nullable = true)
 |-- result_name: string (nullable = true)
 |-- bodypart_name: string (nullable = true)
 |-- short_name: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- birth_date: date (nullable = true)
 |-- short_team_name: string (nullable = true)
 |-- team_name: string (nullable = true)
 |-- scores: float (nullable = true)
 |-- concedes: float (nullable = true)
 |-- offensive

In [33]:
events.show(2)

+---------+-------+---------+------------+-------+---------+-------+-------+-----+-----+-----------+-------+---------+---------+-----------+-------------+----------+----------+---------------+----------+---------------+--------------------+------------+------------+---------------+---------------+------------+
|action_id|game_id|period_id|time_seconds|team_id|player_id|start_x|start_y|end_x|end_y|bodypart_id|type_id|result_id|type_name|result_name|bodypart_name|short_name|first_name|      last_name|birth_date|short_team_name|           team_name|      scores|    concedes|offensive_value|defensive_value|  vaep_value|
+---------+-------+---------+------------+-------+---------+-------+-------+-----+-----+-----------+-------+---------+---------+-----------+-------------+----------+----------+---------------+----------+---------------+--------------------+------------+------------+---------------+---------------+------------+
|        0|2565548|        1|    2.994582|    682|     3542|  66

In [34]:
events.count()

1454382

In [35]:
# Crea la base de datos parquet particionada por el id del partido

events.write.mode('overwrite').partitionBy("game_id").parquet(output)

In [36]:
#Las siguientes sentencias simplemente revisan que la base se pueda leer correctamente

df_parquet = spark.read.parquet(output)

In [37]:
df_parquet.count()

1454382

In [38]:
df_parquet.createOrReplaceTempView("aux")

In [39]:
query='''
SELECT * 
from aux
limit 3
'''

In [40]:
result = spark.sql(query)
result.show()

+---------+---------+------------+-------+---------+-------+-------+-----+-----+-----------+-------+---------+---------+-----------+-------------+--------------+----------+--------------------+----------+---------------+--------------------+------------+------------+---------------+---------------+-----------+-------+
|action_id|period_id|time_seconds|team_id|player_id|start_x|start_y|end_x|end_y|bodypart_id|type_id|result_id|type_name|result_name|bodypart_name|    short_name|first_name|           last_name|birth_date|short_team_name|           team_name|      scores|    concedes|offensive_value|defensive_value| vaep_value|game_id|
+---------+---------+------------+-------+---------+-------+-------+-----+-----+-----------+-------+---------+---------+-----------+-------------+--------------+----------+--------------------+----------+---------------+--------------------+------------+------------+---------------+---------------+-----------+-------+
|        0|        1|    1.984215|   316

# Creación de base de datos en postgres

### Resultado del análisis VAEP por jugador por acción

In [41]:
#Define el directorio de origen (donde están los csv)

source = '/dataset/vaep/analysisOutcome/'

In [42]:
esp = spark.read.csv(source + 'actionRatingAnalysisEsp' +'.csv', header=True)
eng = spark.read.csv(source + 'actionRatingAnalysisEng' +'.csv', header=True)
ita = spark.read.csv(source + 'actionRatingAnalysisIta' +'.csv', header=True)

In [43]:
action_analysis_per_player=esp.union(eng.union(ita))

In [44]:
action_analysis_per_player.show(2)

+---------+------------+--------------------+--------------------+--------------+------------+-----+-------------------+----+----------------+--------------------+--------------------+--------------------+--------------------+--------------------+----+-------------+------------+------+-------+--------+--------------------+------------------+
|player_id|  short_name|           team_name|           clearance|corner_crossed|corner_short|cross|            dribble|foul|freekick_crossed|      freekick_short|            goalkick|        interception|         keeper_save|                pass|shot|shot_freekick|shot_penalty|tackle|take_on|throw_in|               total|    minutes_played|
+---------+------------+--------------------+--------------------+--------------+------------+-----+-------------------+----+----------------+--------------------+--------------------+--------------------+--------------------+--------------------+----+-------------+------------+------+-------+--------+---------

In [45]:
action_analysis_per_player.printSchema()

root
 |-- player_id: string (nullable = true)
 |-- short_name: string (nullable = true)
 |-- team_name: string (nullable = true)
 |-- clearance: string (nullable = true)
 |-- corner_crossed: string (nullable = true)
 |-- corner_short: string (nullable = true)
 |-- cross: string (nullable = true)
 |-- dribble: string (nullable = true)
 |-- foul: string (nullable = true)
 |-- freekick_crossed: string (nullable = true)
 |-- freekick_short: string (nullable = true)
 |-- goalkick: string (nullable = true)
 |-- interception: string (nullable = true)
 |-- keeper_save: string (nullable = true)
 |-- pass: string (nullable = true)
 |-- shot: string (nullable = true)
 |-- shot_freekick: string (nullable = true)
 |-- shot_penalty: string (nullable = true)
 |-- tackle: string (nullable = true)
 |-- take_on: string (nullable = true)
 |-- throw_in: string (nullable = true)
 |-- total: string (nullable = true)
 |-- minutes_played: string (nullable = true)



In [46]:
#Crea una lista por tipo de variable con el nombre de las columnas que corresponden a dicho tipo

stringCols=['short_name','team_name']

floatCols=['clearance','corner_crossed','corner_short','cross','dribble','foul','freekick_crossed','freekick_short','goalkick',
          'interception','keeper_save','pass','shot','shot_freekick','shot_penalty','tackle','take_on','throw_in',
           'total','minutes_played']

integerCols=['player_id']


In [47]:
# Aplica la función para cambiar el tipo de datos

action_analysis_per_player = cast_cols(action_analysis_per_player, stringCols, StringType)
action_analysis_per_player = cast_cols(action_analysis_per_player, floatCols, FloatType)
action_analysis_per_player = cast_cols(action_analysis_per_player, integerCols, IntegerType)

In [48]:
action_analysis_per_player.printSchema()

root
 |-- player_id: integer (nullable = true)
 |-- short_name: string (nullable = true)
 |-- team_name: string (nullable = true)
 |-- clearance: float (nullable = true)
 |-- corner_crossed: float (nullable = true)
 |-- corner_short: float (nullable = true)
 |-- cross: float (nullable = true)
 |-- dribble: float (nullable = true)
 |-- foul: float (nullable = true)
 |-- freekick_crossed: float (nullable = true)
 |-- freekick_short: float (nullable = true)
 |-- goalkick: float (nullable = true)
 |-- interception: float (nullable = true)
 |-- keeper_save: float (nullable = true)
 |-- pass: float (nullable = true)
 |-- shot: float (nullable = true)
 |-- shot_freekick: float (nullable = true)
 |-- shot_penalty: float (nullable = true)
 |-- tackle: float (nullable = true)
 |-- take_on: float (nullable = true)
 |-- throw_in: float (nullable = true)
 |-- total: float (nullable = true)
 |-- minutes_played: float (nullable = true)



In [49]:
action_analysis_per_player.show(2)

+---------+------------+--------------------+-----------+--------------+------------+-----+----------+----+----------------+--------------+------------+-------------+------------+-----------+----+-------------+------------+------+-------+--------+-----------+--------------+
|player_id|  short_name|           team_name|  clearance|corner_crossed|corner_short|cross|   dribble|foul|freekick_crossed|freekick_short|    goalkick| interception| keeper_save|       pass|shot|shot_freekick|shot_penalty|tackle|take_on|throw_in|      total|minutes_played|
+---------+------------+--------------------+-----------+--------------+------------+-----+----------+----+----------------+--------------+------------+-------------+------------+-----------+----+-------------+------------+------+-------+--------+-----------+--------------+
|       33|J. Cillessen|        FC Barcelona|0.011196077|           0.0|         0.0|  0.0|0.08924883| 0.0|             0.0|           0.0| 0.022523472| 4.0703794E-4|-0.022269

In [50]:
action_analysis_per_player.count()

1650

In [51]:
# Crea la base de datos en postgres

action_analysis_per_player \
    .write \
    .format("jdbc") \
    .option("url", "jdbc:postgresql://postgres/workshop") \
    .option("dbtable", "workshop.action_analysis_per_player") \
    .option("user", "workshop") \
    .option("password", "w0rkzh0p") \
    .option("driver", "org.postgresql.Driver") \
    .mode('overwrite') \
    .save()


### Resultado del análisis VAEP por jugador (Total y normalizado)

In [52]:
#Define el directorio de origen (donde están los csv)

source = '/dataset/vaep/analysisOutcome/'

In [53]:
esp = spark.read.csv(source + 'rankingAnalysisEsp' +'.csv', header=True)
eng = spark.read.csv(source + 'rankingAnalysisEng' +'.csv', header=True)
ita = spark.read.csv(source + 'rankingAnalysisIta' +'.csv', header=True)

In [54]:
vaep_rating_per_player=esp.union(eng.union(ita))

In [55]:
vaep_rating_per_player.show(2)

+---------+----------+-------+--------------------+----------+--------------------+------------------+------------------+--------------------+------------------+-------------------+------------------+------------------+-----------------+
|player_id|short_name|team_id|           team_name|vaep_count|           vaep_mean|          vaep_sum|vaep_sum_Offensive| vaep_mean_Offensive|vaep_sum_defensive|vaep_mean_defensive|    minutes_played|       vaep_rating|      actions_p90|
+---------+----------+-------+--------------------+----------+--------------------+------------------+------------------+--------------------+------------------+-------------------+------------------+------------------+-----------------+
|   3359.0|  L. Messi|  676.0|        FC Barcelona|      2753|0.013302911959650067|36.622916624916634| 38.90842350529312|0.014133099711330593|         -2.285507|     -0.00083018775|3486.5457777666666|0.9453661894420371|71.06460542695383|
|   3840.0|Iago Aspas|  692.0|Real Club Celta d.

In [56]:
vaep_rating_per_player.printSchema()

root
 |-- player_id: string (nullable = true)
 |-- short_name: string (nullable = true)
 |-- team_id: string (nullable = true)
 |-- team_name: string (nullable = true)
 |-- vaep_count: string (nullable = true)
 |-- vaep_mean: string (nullable = true)
 |-- vaep_sum: string (nullable = true)
 |-- vaep_sum_Offensive: string (nullable = true)
 |-- vaep_mean_Offensive: string (nullable = true)
 |-- vaep_sum_defensive: string (nullable = true)
 |-- vaep_mean_defensive: string (nullable = true)
 |-- minutes_played: string (nullable = true)
 |-- vaep_rating: string (nullable = true)
 |-- actions_p90: string (nullable = true)



In [57]:
#Crea una lista por tipo de variable con el nombre de las columnas que corresponden a dicho tipo

stringCols=['short_name','team_name']

floatCols=['vaep_mean','vaep_sum','vaep_sum_Offensive','vaep_mean_Offensive','vaep_sum_defensive','minutes_played',
          'vaep_rating','actions_p90','vaep_mean_defensive']

integerCols=['player_id','team_id','vaep_count']


In [58]:
# Aplica la función para cambiar el tipo de datos

vaep_rating_per_player = cast_cols(vaep_rating_per_player, stringCols, StringType)
vaep_rating_per_player = cast_cols(vaep_rating_per_player, floatCols, FloatType)
vaep_rating_per_player = cast_cols(vaep_rating_per_player, integerCols, IntegerType)

In [59]:
vaep_rating_per_player.printSchema()

root
 |-- player_id: integer (nullable = true)
 |-- short_name: string (nullable = true)
 |-- team_id: integer (nullable = true)
 |-- team_name: string (nullable = true)
 |-- vaep_count: integer (nullable = true)
 |-- vaep_mean: float (nullable = true)
 |-- vaep_sum: float (nullable = true)
 |-- vaep_sum_Offensive: float (nullable = true)
 |-- vaep_mean_Offensive: float (nullable = true)
 |-- vaep_sum_defensive: float (nullable = true)
 |-- vaep_mean_defensive: float (nullable = true)
 |-- minutes_played: float (nullable = true)
 |-- vaep_rating: float (nullable = true)
 |-- actions_p90: float (nullable = true)



In [60]:
vaep_rating_per_player.show(2)

+---------+----------+-------+--------------------+----------+-----------+---------+------------------+-------------------+------------------+-------------------+--------------+-----------+-----------+
|player_id|short_name|team_id|           team_name|vaep_count|  vaep_mean| vaep_sum|vaep_sum_Offensive|vaep_mean_Offensive|vaep_sum_defensive|vaep_mean_defensive|minutes_played|vaep_rating|actions_p90|
+---------+----------+-------+--------------------+----------+-----------+---------+------------------+-------------------+------------------+-------------------+--------------+-----------+-----------+
|     3359|  L. Messi|    676|        FC Barcelona|      2753|0.013302912|36.622917|         38.908424|       0.0141330995|         -2.285507|      -8.3018775E-4|      3486.546|  0.9453662|  71.064606|
|     3840|Iago Aspas|    692|Real Club Celta d...|      1771|0.011590924|20.527527|         20.759287|        0.011721788|       -0.23176043|      -1.3086417E-4|     3093.3923|  0.5972335|   

In [61]:
vaep_rating_per_player.count()

1650

In [62]:
# Crea la base de datos en postgres

vaep_rating_per_player \
    .write \
    .format("jdbc") \
    .option("url", "jdbc:postgresql://postgres/workshop") \
    .option("dbtable", "workshop.vaep_rating_per_player") \
    .option("user", "workshop") \
    .option("password", "w0rkzh0p") \
    .option("driver", "org.postgresql.Driver") \
    .mode('overwrite') \
    .save()

### Resultado del análisis VAEP por jugador (toma de riesgo)

In [63]:
#Define el directorio de origen (donde están los csv)

source = '/dataset/vaep/analysisOutcome/'

In [64]:
esp = spark.read.csv(source + 'riskAnalysisEsp' +'.csv', header=True)
eng = spark.read.csv(source + 'riskAnalysisEng' +'.csv', header=True)
ita = spark.read.csv(source + 'riskAnalysisIta' +'.csv', header=True)

In [65]:
vaep_risk_per_player=esp.union(eng.union(ita))

In [66]:
vaep_risk_per_player.show(2)

+---------+------------+--------------------+--------------------+--------------------+------------------+--------------------+--------------------+--------------------+
|player_id|  short_name|           team_name|             success|                fail|    minutes_played|         success_sum|            fail_sum|    vaep_risk_rating|
+---------+------------+--------------------+--------------------+--------------------+------------------+--------------------+--------------------+--------------------+
|     33.0|J. Cillessen|        FC Barcelona|0.013786335304679273| -0.1408459140443113|         92.903192|0.014231050619855523|-0.14538927772082388|-0.12705957873963203|
|     99.0|    P. Tytoń|Real Club Deporti...|-0.34220138178785153|-0.00364457954232...|103.09706778333333| -0.3919995450414717|-0.00417494960129261| -0.3458459613301717|
+---------+------------+--------------------+--------------------+--------------------+------------------+--------------------+--------------------+--

In [67]:
vaep_risk_per_player.printSchema()

root
 |-- player_id: string (nullable = true)
 |-- short_name: string (nullable = true)
 |-- team_name: string (nullable = true)
 |-- success: string (nullable = true)
 |-- fail: string (nullable = true)
 |-- minutes_played: string (nullable = true)
 |-- success_sum: string (nullable = true)
 |-- fail_sum: string (nullable = true)
 |-- vaep_risk_rating: string (nullable = true)



In [68]:
#Crea una lista por tipo de variable con el nombre de las columnas que corresponden a dicho tipo

stringCols=['short_name','team_name']

floatCols=['success','fail','minutes_played','success_sum','fail_sum','vaep_risk_rating']

integerCols=['player_id']

In [69]:
# Aplica la función para cambiar el tipo de datos

vaep_risk_per_player = cast_cols(vaep_risk_per_player, stringCols, StringType)
vaep_risk_per_player = cast_cols(vaep_risk_per_player, floatCols, FloatType)
vaep_risk_per_player = cast_cols(vaep_risk_per_player, integerCols, IntegerType)

In [70]:
vaep_risk_per_player.printSchema()

root
 |-- player_id: integer (nullable = true)
 |-- short_name: string (nullable = true)
 |-- team_name: string (nullable = true)
 |-- success: float (nullable = true)
 |-- fail: float (nullable = true)
 |-- minutes_played: float (nullable = true)
 |-- success_sum: float (nullable = true)
 |-- fail_sum: float (nullable = true)
 |-- vaep_risk_rating: float (nullable = true)



In [71]:
vaep_risk_per_player.show(2)

+---------+------------+--------------------+------------+-------------+--------------+-----------+-------------+----------------+
|player_id|  short_name|           team_name|     success|         fail|minutes_played|success_sum|     fail_sum|vaep_risk_rating|
+---------+------------+--------------------+------------+-------------+--------------+-----------+-------------+----------------+
|       33|J. Cillessen|        FC Barcelona|0.0137863355|  -0.14084591|      92.90319| 0.01423105|  -0.14538927|     -0.12705958|
|       99|    P. Tytoń|Real Club Deporti...| -0.34220138|-0.0036445796|     103.09707|-0.39199954|-0.0041749496|     -0.34584597|
+---------+------------+--------------------+------------+-------------+--------------+-----------+-------------+----------------+
only showing top 2 rows



In [72]:
vaep_risk_per_player.count()

1647

In [73]:
# Crea la base de datos en postgres

vaep_risk_per_player \
    .write \
    .format("jdbc") \
    .option("url", "jdbc:postgresql://postgres/workshop") \
    .option("dbtable", "workshop.vaep_risk_per_player") \
    .option("user", "workshop") \
    .option("password", "w0rkzh0p") \
    .option("driver", "org.postgresql.Driver") \
    .mode('overwrite') \
    .save()

In [74]:
spark.stop()