In [1]:
import pyspark
from pyspark import SparkContext
from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import *
from pyspark.sql.types import *



In [2]:
spark = SparkSession.builder.master("local[1]").appName('Bundesliga').getOrCreate()

In [3]:
def cargar_dataframe(filename):
    df = spark.read.format('csv').options(header='True', delimiter=',').load(filename)
    return df

df_matches = cargar_dataframe('./Proyecto_Bundesliga/Matches.csv')
df_matches.printSchema()
df_matches.limit(5).show()

root
 |-- Match_ID: string (nullable = true)
 |-- Div: string (nullable = true)
 |-- Season: string (nullable = true)
 |-- Date: string (nullable = true)
 |-- HomeTeam: string (nullable = true)
 |-- AwayTeam: string (nullable = true)
 |-- FTHG: string (nullable = true)
 |-- FTAG: string (nullable = true)
 |-- FTR: string (nullable = true)

+--------+---+------+----------+-------------+--------------+----+----+---+
|Match_ID|Div|Season|      Date|     HomeTeam|      AwayTeam|FTHG|FTAG|FTR|
+--------+---+------+----------+-------------+--------------+----+----+---+
|       1| D2|  2009|2010-04-04|   Oberhausen|Kaiserslautern|   2|   1|  H|
|       2| D2|  2009|2009-11-01|  Munich 1860|Kaiserslautern|   0|   1|  A|
|       3| D2|  2009|2009-10-04|Frankfurt FSV|Kaiserslautern|   1|   1|  D|
|       4| D2|  2009|2010-02-21|Frankfurt FSV|     Karlsruhe|   2|   1|  H|
|       5| D2|  2009|2009-12-06|        Ahlen|     Karlsruhe|   1|   3|  A|
+--------+---+------+----------+-------------+----

### renombro columnas para una mayor comprension del dataset

In [4]:
df_matches = df_matches.withColumnRenamed("Div","Division") \
            .withColumnRenamed("Season","Temporada") \
            .withColumnRenamed("Date","Fecha") \
            .withColumnRenamed("HomeTeam","Local") \
            .withColumnRenamed("AwayTeam","Visitante") \
            .withColumnRenamed("FTHG","GolesLocal") \
            .withColumnRenamed("FTAG","GolesVistante") \
            .withColumnRenamed("FTR","Resultado")

df_matches.limit(5).toPandas()

Unnamed: 0,Match_ID,Division,Temporada,Fecha,Local,Visitante,GolesLocal,GolesVistante,Resultado
0,1,D2,2009,2010-04-04,Oberhausen,Kaiserslautern,2,1,H
1,2,D2,2009,2009-11-01,Munich 1860,Kaiserslautern,0,1,A
2,3,D2,2009,2009-10-04,Frankfurt FSV,Kaiserslautern,1,1,D
3,4,D2,2009,2010-02-21,Frankfurt FSV,Karlsruhe,2,1,H
4,5,D2,2009,2009-12-06,Ahlen,Karlsruhe,1,3,A


# Ganadores de la Bundesliga Primera Division entre el año 2000 y 2010

### Comienzo filtrando por el año a buscar y primera division para reducir el tamaño del dataset con el que voy a trabajar

In [5]:
df_matches_2 = df_matches.filter( (col("Temporada") >= 2000) & (col("Temporada") <= 2010) & (col("Division") == "D1") )
df_matches_2.show(5)


+--------+--------+---------+----------+-------------+----------+----------+-------------+---------+
|Match_ID|Division|Temporada|     Fecha|        Local| Visitante|GolesLocal|GolesVistante|Resultado|
+--------+--------+---------+----------+-------------+----------+----------+-------------+---------+
|      21|      D1|     2009|2010-02-06|       Bochum|Leverkusen|         1|            1|        D|
|      22|      D1|     2009|2009-11-22|Bayern Munich|Leverkusen|         1|            1|        D|
|      23|      D1|     2009|2010-05-08|   M'gladbach|Leverkusen|         1|            1|        D|
|      24|      D1|     2009|2009-08-08|        Mainz|Leverkusen|         2|            2|        D|
|      25|      D1|     2009|2009-10-17|      Hamburg|Leverkusen|         0|            0|        D|
+--------+--------+---------+----------+-------------+----------+----------+-------------+---------+
only showing top 5 rows



### Calculo el puntaje en cada partido por equipo

In [6]:
df_matches_2 = df_matches_2.withColumn("PuntosLocal", when(col("Resultado") == "H",3).when(col("Resultado") == "D",1).when(col("Resultado") == "A",0) ) \
            .withColumn("PuntosVisitante", when(col("Resultado") == "H",0).when(col("Resultado") == "D",1).when(col("Resultado") == "A",3) )

df_matches_2.show(5)

+--------+--------+---------+----------+-------------+----------+----------+-------------+---------+-----------+---------------+
|Match_ID|Division|Temporada|     Fecha|        Local| Visitante|GolesLocal|GolesVistante|Resultado|PuntosLocal|PuntosVisitante|
+--------+--------+---------+----------+-------------+----------+----------+-------------+---------+-----------+---------------+
|      21|      D1|     2009|2010-02-06|       Bochum|Leverkusen|         1|            1|        D|          1|              1|
|      22|      D1|     2009|2009-11-22|Bayern Munich|Leverkusen|         1|            1|        D|          1|              1|
|      23|      D1|     2009|2010-05-08|   M'gladbach|Leverkusen|         1|            1|        D|          1|              1|
|      24|      D1|     2009|2009-08-08|        Mainz|Leverkusen|         2|            2|        D|          1|              1|
|      25|      D1|     2009|2009-10-17|      Hamburg|Leverkusen|         0|            0|       

### Acumulo los puntajes por equipo en cada temporada tanto de local como visitante para luego sumarlos entre si

In [7]:
df_local_matches = df_matches_2.groupBy("Temporada","Local").agg(sum("PuntosLocal").alias("puntajeAcumulado")).sort(asc("Temporada"))
df_local_matches.show(5)

df_local_visitante = df_matches_2.groupBy("Temporada","Visitante").agg(sum("PuntosVisitante").alias("puntajeAcumulado")).sort(asc("Temporada"))
df_local_visitante.show(5)

+---------+-------------+----------------+
|Temporada|        Local|puntajeAcumulado|
+---------+-------------+----------------+
|     2000|  Munich 1860|              27|
|     2000|Ein Frankfurt|              27|
|     2000|      Hamburg|              30|
|     2000|Hansa Rostock|              29|
|     2000|       Bochum|              19|
+---------+-------------+----------------+
only showing top 5 rows

+---------+-------------+----------------+
|Temporada|    Visitante|puntajeAcumulado|
+---------+-------------+----------------+
|     2000|  Munich 1860|              17|
|     2000|Ein Frankfurt|               8|
|     2000|      Hamburg|              11|
|     2000|Hansa Rostock|              14|
|     2000|       Bochum|               8|
+---------+-------------+----------------+
only showing top 5 rows



### obtengo el acumulado de puntos por equipo en cada temporada

In [8]:
df_join = df_local_matches.join(df_local_visitante, 
                      (df_local_matches.Local == df_local_visitante.Visitante) & 
                      (df_local_matches.Temporada == df_local_visitante.Temporada), "inner") \
                    .withColumn("puntajeTotal",df_local_matches.puntajeAcumulado+df_local_visitante.puntajeAcumulado) \
                    .select(df_local_matches["Temporada"],"Local","puntajeTotal")

df_join.show(5)

+---------+--------------+------------+
|Temporada|         Local|puntajeTotal|
+---------+--------------+------------+
|     2005|Kaiserslautern|          33|
|     2006|       Cottbus|          41|
|     2001|      St Pauli|          22|
|     2005|         Mainz|          38|
|     2006|       Hamburg|          45|
+---------+--------------+------------+
only showing top 5 rows



### particiono por temporada y le establezo un secuencia, para luego quedarme con el puntaje mas alto de cada temporada, es decir, el campeon

In [9]:
windowSpec  = Window.partitionBy("Temporada").orderBy(col("puntajeTotal").desc())

df_winners = df_join.withColumn("row_number",row_number().over(windowSpec))

df_winners.show(5)

+---------+-------------+------------+----------+
|Temporada|        Local|puntajeTotal|row_number|
+---------+-------------+------------+----------+
|     2000|Bayern Munich|          63|         1|
|     2000|   Schalke 04|          62|         2|
|     2000|     Dortmund|          58|         3|
|     2000|   Leverkusen|          57|         4|
|     2000|       Hertha|          56|         5|
+---------+-------------+------------+----------+
only showing top 5 rows



### Obtengo el Campeon de cada temporada

In [16]:
df_final = df_winners.filter(df_winners["row_number"]==1).drop("row_number")
df_final.show()

+---------+-------------+------------+
|Temporada|        Local|puntajeTotal|
+---------+-------------+------------+
|     2000|Bayern Munich|          63|
|     2001|     Dortmund|          70|
|     2002|Bayern Munich|          75|
|     2003|Werder Bremen|          74|
|     2004|Bayern Munich|          77|
|     2005|Bayern Munich|          75|
|     2006|    Stuttgart|          70|
|     2007|Bayern Munich|          76|
|     2008|    Wolfsburg|          69|
|     2009|Bayern Munich|          70|
|     2010|     Dortmund|          75|
+---------+-------------+------------+



### escribo csv con el resultado

In [19]:
df_final.toPandas().to_csv("./Proyecto_Bundesliga/campeones_2000-2010.csv",index=False)