In [1]:
import findspark

# Register the PostgreSQL JDBC driver jar with findspark; it is needed by the
# JDBC write at the end of the notebook.
findspark.add_jars('/app/postgresql-42.1.4.jar')
# Initialise findspark before pyspark is imported in the next cell.
findspark.init()

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import when,col
# Build (or reuse) the Spark session used by the rest of the notebook.
# Resources are deliberately small: 512 MB and one core for both the driver
# and the executors, and only 2 shuffle partitions.
spark = (
    SparkSession.builder
    .appName("Stocks:ETL")
    .config("spark.driver.memory", "512m")
    # Fixed key: it was "spa|rk.driver.cores", which Spark does not recognise,
    # so the driver-cores setting was never applied.
    .config("spark.driver.cores", "1")
    .config("spark.executor.memory", "512m")
    .config("spark.executor.cores", "1")
    .config("spark.sql.shuffle.partitions", "2")
    .getOrCreate()
)

In [3]:
# Show the version of the running Spark session.
spark.version

'2.4.5'

In [4]:
# Directory with the football CSV data that is loaded into `df` below.
futbol_dir = '/dataset/futbol'

In [5]:
import sys

from pyspark.sql import SparkSession

# UDF
from pyspark.sql.types import StringType
#
from pyspark.sql import functions as F
from pyspark.sql.window import Window

In [6]:
import logging
# Configure root logging at INFO level with two handlers: a file handler
# writing to /dataset/seleccion.log and a default StreamHandler.
logging.basicConfig(
    handlers=[
        logging.FileHandler('/dataset/seleccion.log'),
        logging.StreamHandler(),
    ],
    format='%(asctime)s-%(name)s-%(levelname)s: %(message)s',
    level=logging.INFO,
)
# Logger named after the current module.
logger = logging.getLogger(__name__)

In [7]:
# Load the CSV data found under futbol_dir, taking column names from the
# header row and letting Spark infer the column types.
df = spark.read.csv(futbol_dir, header=True, inferSchema=True)

In [8]:
# Print the inferred schema first and end the cell with the row count:
# only the last expression of a cell is rendered, so in the previous order the
# count was computed and then discarded.
df.printSchema()
df.count()

root
 |-- Rk: integer (nullable = true)
 |-- Player: string (nullable = true)
 |-- Nation: string (nullable = true)
 |-- Pos: string (nullable = true)
 |-- Squad: string (nullable = true)
 |-- Comp: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Born: integer (nullable = true)
 |-- MP: integer (nullable = true)
 |-- Starts: integer (nullable = true)
 |-- Min: integer (nullable = true)
 |-- 90s: double (nullable = true)
 |-- Goals: double (nullable = true)
 |-- Shots: double (nullable = true)
 |-- SoT: double (nullable = true)
 |-- SoT%: double (nullable = true)
 |-- G/Sh: double (nullable = true)
 |-- G/SoT: double (nullable = true)
 |-- ShoDist: double (nullable = true)
 |-- ShoFK: double (nullable = true)
 |-- ShoPK: double (nullable = true)
 |-- PKatt: double (nullable = true)
 |-- PasTotCmp: double (nullable = true)
 |-- PasTotAtt: double (nullable = true)
 |-- PasTotCmp%: double (nullable = true)
 |-- PasTotDist: double (nullable = true)
 |-- PasTotPrgDist: doub

In [9]:
# Build a DF with Argentina only (other countries can be added later to
# produce comparisons).
df_arg = df.where(df["Nation"] == "ARG")

In [10]:
# Number of Argentine players in the European dataset
df_arg.count()

64

In [11]:
# Distribution of Argentine players across the European leagues
(
    df_arg
    .groupBy('Comp')
    .count()
    .sort('Comp')
    .show()
)

+--------------+-----+
|          Comp|count|
+--------------+-----+
|    Bundesliga|    2|
|       La Liga|   31|
|       Ligue 1|    5|
|Premier League|    8|
|       Serie A|   18|
+--------------+-----+



In [12]:

# Argentine players ranked by the Goals column, highest first
(
    df_arg
    .select('Player', 'Goals')
    .sort(df_arg['Goals'].desc())
    .show()
)

+--------------------+-----+
|              Player|Goals|
+--------------------+-----+
|    Giovanni Simeone| 1.33|
|      Juli�n �lvarez| 0.83|
|        Paulo Dybala| 0.78|
|        �ngel Correa| 0.77|
|      Joaqu�n Correa| 0.63|
|    Lautaro Mart�nez| 0.59|
|        Lionel Messi| 0.56|
|    Nicol�s Gonz�lez| 0.48|
|      �ngel Di Mar�a| 0.48|
|      Ezequiel �vila| 0.45|
|      Ezequiel Ponce| 0.43|
|        Lucas Alario| 0.42|
|         �rik Lamela| 0.39|
| Alexis Mac Allister| 0.36|
|   Exequiel Palacios| 0.26|
|Valent�n Castellanos|  0.2|
|     Rodrigo De Paul| 0.19|
|          Lucas Boy�| 0.18|
|         Emi Buend�a| 0.18|
|    Giovani Lo Celso| 0.18|
+--------------------+-----+
only showing top 20 rows



In [13]:
# Argentine players in the dataset, counted by playing position
(
    df_arg
    .groupBy('Pos')
    .count()
    .sort('Pos')
    .show()
)


+----+-----+
| Pos|count|
+----+-----+
|  DF|   19|
|DFFW|    1|
|DFMF|    2|
|  FW|    9|
|FWDF|    1|
|FWMF|    4|
|  GK|    6|
|  MF|   14|
|MFFW|    8|
+----+-----+



In [14]:
# Build a DF with the forwards only: positions FW, FWMF and MFFW
df_arg_del = df_arg.filter(df_arg['Pos'].isin('FW', 'FWMF', 'MFFW'))

In [15]:
# Argentine forwards ordered by goals per match, highest first
(
    df_arg_del
    .select('Player', 'Pos', 'Goals')
    .sort(df_arg_del['Goals'].desc())
    .show()
)
# Number of Argentine players kept as forwards
df_arg_del.count()

+--------------------+----+-----+
|              Player| Pos|Goals|
+--------------------+----+-----+
|    Giovanni Simeone|  FW| 1.33|
|      Juli�n �lvarez|  FW| 0.83|
|        Paulo Dybala|MFFW| 0.78|
|        �ngel Correa|FWMF| 0.77|
|      Joaqu�n Correa|  FW| 0.63|
|    Lautaro Mart�nez|  FW| 0.59|
|        Lionel Messi|MFFW| 0.56|
|    Nicol�s Gonz�lez|FWMF| 0.48|
|      �ngel Di Mar�a|FWMF| 0.48|
|      Ezequiel �vila|FWMF| 0.45|
|      Ezequiel Ponce|  FW| 0.43|
|        Lucas Alario|  FW| 0.42|
|         �rik Lamela|  FW| 0.39|
|Valent�n Castellanos|  FW|  0.2|
|          Lucas Boy�|  FW| 0.18|
|         Emi Buend�a|MFFW| 0.18|
|    Giovani Lo Celso|MFFW| 0.18|
|   Nicol�s Dom�nguez|MFFW| 0.13|
|         �scar Trejo|MFFW| 0.12|
|          Papu G�mez|MFFW|  0.0|
+--------------------+----+-----+
only showing top 20 rows



21

In [16]:
# Spelling correction: player names whose accented letters show up garbled
# (as '�') are replaced with plain-ASCII spellings; any other name is kept.
# Pairs are (name as loaded, corrected name) and are applied in this order.
name_fixes = [
    ('�ngel Correa', 'Angel Correa'),
    ('Juli�n �lvarez', 'Julian Alvarez'),
    ('Joaqu�n Correa', 'Joaquin Correa'),
    ('Lautaro Mart�nez', 'Lautaro Martinez'),
    ('�ngel Di Mar�a', 'Angel Di Maria'),
    ('Nicol�s Gonz�lez', 'Nicolas Gonzalez'),
    ('Ezequiel �vila', 'Ezequiel Avila'),
    ('�rik Lamela', 'Erik Lamela'),
    ('Valent�n Castellanos', 'Valentin Castellanos'),
    ('Lucas Boy�', 'Lucas Boye'),
    ('Nicol�s Dom�nguez', 'Nicolas Dominguez'),
    ('�scar Trejo', 'Oscar Trejo'),
    ('Papu G�mez', 'Papu Gomez'),
    ('Emi Buend�a', 'Emiliano Buendia'),
]
player = df_arg_del['Player']
first_bad, first_good = name_fixes[0]
corrected = when(player == first_bad, first_good)
for bad, good in name_fixes[1:]:
    corrected = corrected.when(player == bad, good)
df_arg_del = df_arg_del.withColumn('Player', corrected.otherwise(player))

In [17]:
# Drop the columns that are outside the scope of the analysis
unused_columns = [
    'Nation', 'Squad', 'Born', 'Starts', '90s',
    'PasTotAtt', 'PasShoAtt', 'PasMedAtt', 'PasLonAtt',
    'CkIn', 'CkOut', 'CkStr',
    'TI', 'PaswOther', 'AerLost',
    'RecProg', 'Rec', 'RecTarg',
    'DriAtt', 'TouLive', 'Int',
]
df_arg_del_reducido = df_arg_del.drop(*unused_columns)

In [18]:
# Load the CSV with the pre-selected Argentine players into a new DF
# (semicolon-separated, first row is the header).
df_selecArg = (
    spark.read
    .option("header", True)
    .option("sep", ";")
    .csv('/dataset/jugadoresseleccionArgentina.csv')
)
# Second DF holding only the forwards (rows where Delanteros equals 1).
df_selecArg_del = df_selecArg.filter(df_selecArg['Delanteros'] == 1)

In [19]:
# Remove the columns that are no longer needed
other_positions = ('Arqueros', 'Defensores', 'Mediocampistas')
df_selecArg_del_red = df_selecArg_del.drop(*other_positions)
df_selecArg_del_red.show()

# 12 players pre-selected as forwards
df_selecArg_del_red.count()

+------------------+----------+
|            Player|Delanteros|
+------------------+----------+
|      Lionel Messi|         1|
|  Lautaro Martinez|         1|
|    Angel Di Maria|         1|
|    Julian Alvarez|         1|
|      Paulo Dybala|         1|
|  Nicolas Gonzalez|         1|
|      Angel Correa|         1|
|    Joaquin Correa|         1|
|  Giovanni Simeone|         1|
|  Emiliano Buendia|         1|
|      Lucas Alario|         1|
|Alejandro Garnacho|         1|
+------------------+----------+



12

In [20]:
from pyspark.sql.functions import desc

In [21]:
# Build a single DF with the players' statistics, matched by player name
# ("Player"). A left semi join keeps only the statistics rows whose Player
# appears among the pre-selected forwards.
same_player = df_arg_del_reducido['Player'] == df_selecArg_del_red['Player']
df_selecArg_del_data = df_arg_del_reducido.join(
    df_selecArg_del_red,
    on=same_player,
    how='leftsemi',
)

# Eleven players have match statistics in the European leagues; one player is
# left without statistics.
df_selecArg_del_data.count()

11

In [22]:
# Print the columns that remain after dropping and joining.
df_selecArg_del_data.printSchema()

root
 |-- Rk: integer (nullable = true)
 |-- Player: string (nullable = true)
 |-- Pos: string (nullable = true)
 |-- Comp: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- MP: integer (nullable = true)
 |-- Min: integer (nullable = true)
 |-- Goals: double (nullable = true)
 |-- Shots: double (nullable = true)
 |-- SoT: double (nullable = true)
 |-- SoT%: double (nullable = true)
 |-- G/Sh: double (nullable = true)
 |-- G/SoT: double (nullable = true)
 |-- ShoDist: double (nullable = true)
 |-- ShoFK: double (nullable = true)
 |-- ShoPK: double (nullable = true)
 |-- PKatt: double (nullable = true)
 |-- PasTotCmp: double (nullable = true)
 |-- PasTotCmp%: double (nullable = true)
 |-- PasTotDist: double (nullable = true)
 |-- PasTotPrgDist: double (nullable = true)
 |-- PasShoCmp: double (nullable = true)
 |-- PasShoCmp%: double (nullable = true)
 |-- PasMedCmp: double (nullable = true)
 |-- PasMedCmp%: double (nullable = true)
 |-- PasLonCmp: double (nullable = true)

In [23]:
# Second column selection into a new DF; the previous columns stay in their own
# DF for future analysis.
# NOTE(review): 0.914 looks like a yards-to-metres factor applied to
# PasTotPrgDist - confirm the unit of that column.
progressive_dist = F.round(df_selecArg_del_data['PasTotPrgDist'] * 0.914, 0)
df_selecArg_metricas = df_selecArg_del_data.select(
    'Player', 'Age', 'Goals', 'SoT', 'PasTotCmp', 'PasAss',
    'SCA', 'DriSucc', 'GcaDrib', 'DriPast', 'GCA', 'AerWon',
    'PasPress', 'ShoFK', 'G/SoT', 'TB', 'ShoDist',
    progressive_dist.alias("DistPasesLargos"),
)

In [24]:
# Rename columns: (current name, new name), applied in this order
column_renames = [
    ('SoT', 'Shot2Goal'),
    ('PasTotCmp', 'PassesCompleted'),
    ('PasAss', 'AsistenciasxEjecucion'),
    ('SCA', 'ShotCreationActions'),
    ('DriSucc', 'DribblesSuccess'),
    ('GCA', 'GoalCreatingActions'),
    ('GcaDrib', 'DribblesLeadGoal'),
    ('DriPast', 'PlayersDribbled'),
    ('ShoFK', 'ShotsFreeKick'),
    ('G/SoT', 'GoalsXShot'),
    ('TB', 'PassesthroughDefenders'),
]
for old_name, new_name in column_renames:
    df_selecArg_metricas = df_selecArg_metricas.withColumnRenamed(old_name, new_name)
df_selecArg_metricas.printSchema()

root
 |-- Player: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Goals: double (nullable = true)
 |-- Shot2Goal: double (nullable = true)
 |-- PassesCompleted: double (nullable = true)
 |-- AsistenciasxEjecucion: double (nullable = true)
 |-- ShotCreationActions: double (nullable = true)
 |-- DribblesSuccess: double (nullable = true)
 |-- DribblesLeadGoal: double (nullable = true)
 |-- PlayersDribbled: double (nullable = true)
 |-- GoalCreatingActions: double (nullable = true)
 |-- AerWon: double (nullable = true)
 |-- PasPress: double (nullable = true)
 |-- ShotsFreeKick: double (nullable = true)
 |-- GoalsXShot: double (nullable = true)
 |-- PassesthroughDefenders: double (nullable = true)
 |-- ShoDist: double (nullable = true)
 |-- DistPasesLargos: double (nullable = true)



In [25]:
from pyspark.sql.window import Window

from pyspark.sql.functions import monotonically_increasing_id,row_number
# Add a non-repeating order number for each player: row_number() over a
# window ordered by monotonically_increasing_id().
order_window = Window.orderBy(monotonically_increasing_id())
df_selecArg_metricas = df_selecArg_metricas.withColumn(
    "NumOrden",
    row_number().over(order_window),
)

In [26]:
# Display the final metrics DF, including the NumOrden column.
df_selecArg_metricas.show()

+----------------+---+-----+---------+---------------+---------------------+-------------------+---------------+----------------+---------------+-------------------+------+--------+-------------+----------+----------------------+-------+---------------+--------+
|          Player|Age|Goals|Shot2Goal|PassesCompleted|AsistenciasxEjecucion|ShotCreationActions|DribblesSuccess|DribblesLeadGoal|PlayersDribbled|GoalCreatingActions|AerWon|PasPress|ShotsFreeKick|GoalsXShot|PassesthroughDefenders|ShoDist|DistPasesLargos|NumOrden|
+----------------+---+-----+---------+---------------+---------------------+-------------------+---------------+----------------+---------------+-------------------+------+--------+-------------+----------+----------------------+-------+---------------+--------+
|    Lucas Alario| 30| 0.42|     0.83|           15.8|                 0.42|               2.08|           0.83|             0.0|           0.83|               0.42|  3.75|    4.58|          0.0|       0.5|     

In [27]:
# Average age of the forwards and their average goals per match
from pyspark.sql.functions import avg
(
    df_selecArg_metricas
    .select(
        avg('Age').alias('edadpromedio'),
        avg('Goals').alias('promediogoles'),
    )
    .show()
)

+------------------+-----------------+
|      edadpromedio|    promediogoles|
+------------------+-----------------+
|27.727272727272727|0.640909090909091|
+------------------+-----------------+



In [28]:
# Write to Postgres
# Append the metrics DF to the seleccion.argentina table over JDBC.
# The password is no longer hardcoded in the notebook: it is read from the
# SELECCION_DB_PASSWORD environment variable, and asked for interactively
# when that variable is not set.
import os
from getpass import getpass

db_password = os.environ.get("SELECCION_DB_PASSWORD") or getpass(
    "Password for Postgres user 'seleccion': "
)

(
    df_selecArg_metricas.write
    .format("jdbc")
    .option("url", "jdbc:postgresql://postgres/seleccion")
    .option("dbtable", "seleccion.argentina")
    .option("user", "seleccion")
    .option("password", db_password)
    .option("driver", "org.postgresql.Driver")
    # NOTE: append mode adds the same rows again each time this cell is re-run.
    .mode('append')
    .save()
)

In [37]:
# Stop the Spark session now that the data has been written.
spark.stop()