## Limpeza de dados Cartola ano 2016

In [3]:
from pyspark.sql import HiveContext
from pyspark.sql.types import IntegerType, StringType, FloatType
from pyspark.sql.functions import lower, col, lit, regexp_replace, trim, substring, when, expr, udf, count, sum, monotonically_increasing_id
import pandas as pd
import json
import requests

# Confirguração para não sobrescrever DF
spark.conf.set('spark.sql.sources.partitionOverwriteMode', 'dynamic')

## Funções Custonmizadas

In [4]:
def remove_after_hiphen(team_name):
    if team_name.startswith('atletico') or team_name.startswith('atl')  or team_name.startswith('Atl') or team_name.startswith('Ath'):
        return team_name
    else:
        return team_name.split('-', 1)[0]

In [5]:
#Remove dos valores das Strings o que estiver após os hiphen
remove_hiphen_udf = udf(remove_after_hiphen, StringType())
#partidas_2014_ct = partidas_2014_df.withColumn('away_team', remove_hiphen_udf(partidas_2014_df['away_team']))

## Jogadores

In [6]:
#Carrega arquivo CSV
jogadores_2016_df = spark.read.option("encoding", "UTF-8").csv("/cartola/data/2016/2016_jogadores.csv", header=True)
sorted_jogadores_2016_df = jogadores_2016_df.sort(jogadores_2016_df.ID.asc())


In [7]:
#Adiciona coluna ANO = 2016
jogadores_2016_ano = sorted_jogadores_2016_df.withColumn('year', lit(2016))
jogadores_2016_ano.toPandas()

Unnamed: 0,ID,Apelido,ClubeID,PosicaoID,year
0,36540,Juan,262,3,2016
1,36591,Leonardo,344,3,2016
2,36612,Zé Roberto,275,4,2016
3,36650,Magno Alves,266,5,2016
4,36856,Celso Roth,285,6,2016
...,...,...,...,...,...
996,97448,Thalisson Kelven,294,3,2016
997,97449,Carvalho,294,4,2016
998,97450,Gustavo Mosquito,294,5,2016
999,97451,Talysson Lalau,294,4,2016


In [8]:
jogadores_2016_ano.write.mode('overwrite').partitionBy('year').parquet('/cartola/clean/jogadores/')

In [9]:
#transforma arquivo em parquet
jogadores_2016_parquet = spark.read.option('basePath', '/cartola/clean/jogadores/').parquet('/cartola/clean/jogadores/*')

In [13]:
jogadores_2016_parquet.toPandas()

Unnamed: 0,ID,Apelido,ClubeID,PosicaoID,year
0,54797,Túlio De Melo,,5,2015
1,55519,Edmílson,315,5,2015
2,56102,João Paulo,264,4,2015
3,60752,Cristóvão Borges,,6,2015
4,60780,Vinícius Eutrópio,315,6,2015
...,...,...,...,...,...
3055,89342,Caju,277,2,2015
3056,89343,Júnior,284,2,2015
3057,89444,Hugo Ragelli,,5,2015
3058,89445,Yuri,315,5,2015


In [14]:
print(jogadores_2016_parquet.count())

3060


## Partidas_2015

In [15]:
#Carrega arquivo CSV
partidas_2016_df = spark.read.option("encoding", "UTF-8").csv("/cartola/data/2016/2016_partidas.csv", header=True)

In [16]:
partidas_2016_df.show(5)

+---+----+-----+------------------+--------------+-----+--------------+--------------------+----+
|_c0|game|round|              date|     home_team|score|     away_team|               arena|   X|
+---+----+-----+------------------+--------------+-----+--------------+--------------------+----+
|  1|   1|    1|14/05/2016 - 16:00|Palmeiras - SP|4 x 0| Atlético - PR|Allianz Parque - ...|null|
|  2|   2|    1|14/05/2016 - 16:00| Flamengo - RJ|1 x 0|    Sport - PE|Raulino de Olivei...|null|
|  3|   3|    1|14/05/2016 - 18:30| Atlético - MG|1 x 0|   Santos - SP|Independência - B...|null|
|  4|   4|    1|14/05/2016 - 21:00| Coritiba - PR|1 x 0| Cruzeiro - MG|Couto Pereira - C...|null|
|  5|   5|    1|15/05/2016 - 11:00| Botafogo - RJ|0 x 1|São Paulo - SP|Raulino de Olivei...|null|
+---+----+-----+------------------+--------------+-----+--------------+--------------------+----+
only showing top 5 rows



In [17]:
# Criar nome do time com a string antes do Hífen
partidas_2016_ct = partidas_2016_df.withColumn('away_team', remove_hiphen_udf(partidas_2016_df['away_team']))

partidas_2016_ct = partidas_2016_ct.withColumn('away_team', lower(col('away_team')))

partidas_2016_ct = partidas_2016_ct.withColumn('away_team', trim(col('away_team')))

partidas_2016_ct = partidas_2016_ct.withColumn('home_team', remove_hiphen_udf(partidas_2016_ct['home_team']))

partidas_2016_ct = partidas_2016_ct.withColumn('home_team', lower(col('home_team')))

partidas_2016_ct = partidas_2016_ct.withColumn('home_team', trim(col('home_team')))

partidas_2016_ct = partidas_2016_ct.withColumn('home_score', substring('score', 1,1))\

partidas_2016_ct = partidas_2016_ct.withColumn('away_score', substring('score', 5, 5))\

partidas_2016_ct = partidas_2016_ct.withColumn('home_score', partidas_2016_ct['home_score'].cast(IntegerType()))

partidas_2016_ct = partidas_2016_ct.withColumn('away_score', partidas_2016_ct['away_score'].cast(IntegerType()))

partidas_2016_ct = partidas_2016_ct.withColumn('total_gols', partidas_2016_ct['away_score'] + partidas_2016_ct['home_score'] )

partidas_2016_ct = partidas_2016_ct.withColumn('year', lit(2016))

time_ganhador = expr(
    """IF(home_score > away_score, home_team, IF(home_score = away_score, 'empate', away_team))"""
)

partidas_2016_ct = partidas_2016_ct.withColumn('result', time_ganhador)

partidas_2016_ct.toPandas()

Unnamed: 0,_c0,game,round,date,home_team,score,away_team,arena,X,home_score,away_score,total_gols,year,result
0,1,1,1,14/05/2016 - 16:00,palmeiras,4 x 0,atlético - pr,Allianz Parque - Sao Paulo - SP,,4.0,0.0,4.0,2016,palmeiras
1,2,2,1,14/05/2016 - 16:00,flamengo,1 x 0,sport,Raulino de Oliveira - Volta Redonda - RJ,,1.0,0.0,1.0,2016,flamengo
2,3,3,1,14/05/2016 - 18:30,atlético - mg,1 x 0,santos,Independência - Belo Horizonte - MG,,1.0,0.0,1.0,2016,atlético - mg
3,4,4,1,14/05/2016 - 21:00,coritiba,1 x 0,cruzeiro,Couto Pereira - Curitiba - PR,,1.0,0.0,1.0,2016,coritiba
4,5,5,1,15/05/2016 - 11:00,botafogo,0 x 1,são paulo,Raulino de Oliveira - Volta Redonda - RJ,,0.0,1.0,1.0,2016,são paulo
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
375,376,376,38,11/12/2016 - 17:00,grêmio,0 x 1,botafogo,Arena do Grêmio - Porto Alegre - RS,,0.0,1.0,1.0,2016,botafogo
376,377,377,38,11/12/2016 - 17:00,atlético - pr,0 x 0,flamengo,Arena da Baixada - Curitiba - PR,,0.0,0.0,0.0,2016,empate
377,378,378,38,11/12/2016 - 17:00,chapecoense,x,atlético - mg,Arena Condá - Chapeco - SC,(W.O. Duplo),,,,2016,atlético - mg
378,379,379,38,11/12/2016 - 17:00,ponte preta,2 x 0,coritiba,Moisés Lucarelli - Campinas - SP,,2.0,0.0,2.0,2016,ponte preta


In [18]:
# Removendo colunas
partidas_2016_ct = partidas_2016_ct.drop('_c0', 'X', 'game_id')
# Criando ID para DF
# partidas_2016_ct = partidas_2016_ct.withColumn('game_id', monotonically_increasing_id())
partidas_2016_ct.toPandas()

Unnamed: 0,game,round,date,home_team,score,away_team,arena,home_score,away_score,total_gols,year,result
0,1,1,14/05/2016 - 16:00,palmeiras,4 x 0,atlético - pr,Allianz Parque - Sao Paulo - SP,4.0,0.0,4.0,2016,palmeiras
1,2,1,14/05/2016 - 16:00,flamengo,1 x 0,sport,Raulino de Oliveira - Volta Redonda - RJ,1.0,0.0,1.0,2016,flamengo
2,3,1,14/05/2016 - 18:30,atlético - mg,1 x 0,santos,Independência - Belo Horizonte - MG,1.0,0.0,1.0,2016,atlético - mg
3,4,1,14/05/2016 - 21:00,coritiba,1 x 0,cruzeiro,Couto Pereira - Curitiba - PR,1.0,0.0,1.0,2016,coritiba
4,5,1,15/05/2016 - 11:00,botafogo,0 x 1,são paulo,Raulino de Oliveira - Volta Redonda - RJ,0.0,1.0,1.0,2016,são paulo
...,...,...,...,...,...,...,...,...,...,...,...,...
375,376,38,11/12/2016 - 17:00,grêmio,0 x 1,botafogo,Arena do Grêmio - Porto Alegre - RS,0.0,1.0,1.0,2016,botafogo
376,377,38,11/12/2016 - 17:00,atlético - pr,0 x 0,flamengo,Arena da Baixada - Curitiba - PR,0.0,0.0,0.0,2016,empate
377,378,38,11/12/2016 - 17:00,chapecoense,x,atlético - mg,Arena Condá - Chapeco - SC,,,,2016,atlético - mg
378,379,38,11/12/2016 - 17:00,ponte preta,2 x 0,coritiba,Moisés Lucarelli - Campinas - SP,2.0,0.0,2.0,2016,ponte preta


In [19]:
partidas_2016_ct.write.mode('overwrite').partitionBy('year').parquet('/cartola/clean/partidas/')

In [49]:
#transforma arquivo em parquet
partidas_2016_parquet = spark.read.option('basePath', '/cartola/clean/partidas/').parquet('/cartola/clean/partidas/*')

In [50]:
partidas_2016_parquet.toPandas()


Unnamed: 0,game,round,date,home_team,score,away_team,arena,home_score,away_score,total_gols,result,year
0,1,1,20/04/2014 - 18:30,flamengo,0 x 0,goiás,Mané Garrincha - Brasilia - DF,0.0,0.0,0.0,empate,2014
1,2,1,19/04/2014 - 18:30,fluminense,3 x 0,figueirense,Maracanã - Rio de Janeiro - RJ,3.0,0.0,3.0,fluminense,2014
2,3,1,20/04/2014 - 16:00,são paulo,3 x 0,botafogo,Morumbi - Sao Paulo - SP,3.0,0.0,3.0,são paulo,2014
3,4,1,20/04/2014 - 18:30,santos,1 x 1,sport,Vila Belmiro - Santos - SP,1.0,1.0,2.0,empate,2014
4,5,1,20/04/2014 - 16:00,atletico - pr,1 x 0,grêmio,Orlando Scarpelli - Florianopolis - SC,1.0,0.0,1.0,atletico - pr,2014
...,...,...,...,...,...,...,...,...,...,...,...,...
1135,376,38,06/12/2015 - 17:00,atlético - mg,3 x 0,chapecoense,Mineirão - Belo Horizonte - MG,3.0,0.0,3.0,atlético - mg,2015
1136,377,38,06/12/2015 - 17:00,figueirense,1 x 0,fluminense,Orlando Scarpelli - Florianopolis - SC,1.0,0.0,1.0,figueirense,2015
1137,378,38,06/12/2015 - 17:00,coritiba,0 x 0,vasco da gama,Couto Pereira - Curitiba - PR,0.0,0.0,0.0,empate,2015
1138,379,38,06/12/2015 - 17:00,goiás,0 x 1,são paulo,Serra Dourada - Goiania - GO,0.0,1.0,1.0,são paulo,2015


## Times

In [23]:
times_2016_df = spark.read.option("encoding", "UTF-8").csv("/cartola/data/2016/2016_times.csv", header=True)
sorted_times_2016_df = times_2016_df.sort(times_2016_df.ID.asc())

#Chegar se o arquivo está no hdfs

times_2016_ano = times_2016_df.withColumn('year', lit(2016))
times_2016_ano.toPandas()

Unnamed: 0,ID,Nome,Abreviacao,Slug,year
0,262,Flamengo,FLA,Flamengo,2016
1,263,Botafogo,BOT,Botafogo,2016
2,264,Corinthians,COR,Corinthians,2016
3,266,Fluminense,FLU,Fluminense,2016
4,275,Palmeiras,PAL,Palmeiras,2016
5,276,São Paulo,SAO,Sao-Paulo,2016
6,277,Santos,SAN,Santos,2016
7,282,Atlético-MG,CAM,Atletico-MG,2016
8,283,Cruzeiro,CRU,Cruzeiro,2016
9,284,Grêmio,GRE,Gremio,2016


In [26]:
times_2016_ano.write.mode('overwrite').partitionBy('year').parquet('/cartola/clean/times/')
#transforma arquivo em parquet
times_2016_parquet = spark.read.option('basePath', '/cartola/clean/times/').parquet('/cartola/clean/times/*')

In [27]:
times_2016_parquet.toPandas()

Unnamed: 0,ID,Nome,Abreviacao,Slug,year
0,262,Flamengo,FLA,Flamengo,2016
1,263,Botafogo,BOT,Botafogo,2016
2,264,Corinthians,COR,Corinthians,2016
3,266,Fluminense,FLU,Fluminense,2016
4,275,Palmeiras,PAL,Palmeiras,2016
5,276,São Paulo,SAO,Sao-Paulo,2016
6,277,Santos,SAN,Santos,2016
7,282,Atlético-MG,CAM,Atletico-MG,2016
8,283,Cruzeiro,CRU,Cruzeiro,2016
9,284,Grêmio,GRE,Gremio,2016


## scouts_raw

In [37]:
# Analisando o arquivo 2014_lances.csv vimos que não é necessário processar esse arquivo, pois as informações relevantes estão em scouts_raw.

In [38]:
scouts_raw_2016_df = spark.read.option("encoding", "UTF-8").csv("/cartola/data/2016/2016_scouts_raw.csv", header=True)

In [39]:
scouts_raw_2016_ano = scouts_raw_2016_df.withColumn('year', lit(2016))

In [40]:
scouts_raw_2016_ano = scouts_raw_2016_ano.withColumn("Pontos", scouts_raw_2016_ano["Pontos"].cast(FloatType()))

In [41]:
scouts_raw_2016_ano.write.mode('overwrite').partitionBy('year').parquet('/cartola/clean/scouts/')
#transforma arquivo em parquet
scouts_raw_2016_parquet = spark.read.option('basePath', '/cartola/clean/scouts/').parquet('/cartola/clean/scouts/*')

In [42]:
scouts_raw_2016_parquet.toPandas()

Unnamed: 0,AtletaID,Rodada,ClubeID,Participou,Posicao,Jogos,Pontos,PontosMedia,Preco,PrecoVariacao,...,RB,FC,GC,CA,CV,SG,DD,DP,GS,year
0,81219,0,262,,,0,0.0,0,1,0,...,0,0,0,0,0,0,0,0,0,2015
1,88072,0,262,,,0,0.0,0,1,0,...,0,0,0,0,0,0,0,0,0,2015
2,89258,0,262,,,0,0.0,0,1,0,...,0,0,0,0,0,0,0,0,0,2015
3,91263,0,262,,,0,0.0,0,1,0,...,0,0,0,0,0,0,0,0,0,2015
4,74103,0,262,,,0,0.0,0,2,0,...,0,0,0,0,0,0,0,0,0,2015
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
93240,97451,38,294,FALSE,,,0.0,0,1,0,...,0,0,0,0,0,0,0,0,0,2016
93241,97450,38,294,FALSE,,,0.0,0,1,0,...,0,0,0,0,0,0,0,0,0,2016
93242,97460,38,344,TRUE,,,2.0,2,1.78,0.78,...,1,1,0,0,0,0,0,0,0,2016
93243,82626,38,285,FALSE,,,0.0,0,1,0,...,0,0,0,0,0,0,0,0,0,2016


In [47]:
# -        Quantas partidas resultaram em empate?
pontos_por_atleta = scouts_raw_2016_ano[scouts_raw_2016_ano['year'] == 2016]

jogadores_2016 = jogadores_2016_parquet[jogadores_2016_parquet['year'] == 2016]

pontos_por_atleta = pontos_por_atleta.groupBy("AtletaID").agg(sum("Pontos").alias("SomaPontos"))

#count_result = count_result.withColumn('total_wins', count_result['count(1)'])

pontos_por_atleta = pontos_por_atleta.sort(pontos_por_atleta.SomaPontos.desc())

#count_result = count_result.drop('count(1)')

#count_result.show(5)


pontos_por_atleta.toPandas()

Unnamed: 0,AtletaID,SomaPontos
0,68952,210.999998
1,62121,189.299999
2,87863,183.700001
3,88065,176.700000
4,87552,171.800000
...,...,...
996,37917,-6.500000
997,86527,-6.800000
998,71724,-7.600000
999,84847,-7.900000


In [48]:
scouts_atletas = pontos_por_atleta.join(jogadores_2016, pontos_por_atleta.AtletaID == jogadores_2016.ID)
scouts_atletas = scouts_atletas.sort(scouts_atletas.SomaPontos.desc())
scouts_atletas.toPandas()

Unnamed: 0,AtletaID,SomaPontos,ID,Apelido,ClubeID,PosicaoID,year
0,68952,210.999998,68952,Marinho,287,5,2016
1,62121,189.299999,62121,Vanderlei,277,1,2016
2,87863,183.700001,87863,Arrascaeta,283,4,2016
3,88065,176.700000,88065,Jorge,262,2,2016
4,87552,171.800000,87552,Vitor Bueno,277,4,2016
...,...,...,...,...,...,...,...
996,37917,-6.500000,37917,Elano,277,4,2016
997,86527,-6.800000,86527,Wellington Cézar,344,4,2016
998,71724,-7.600000,71724,Agenor,292,1,2016
999,84847,-7.900000,84847,Luan Peres,344,3,2016


## DEMONSTRAÇÃO

In [None]:
partidas_2014_df = spark.read.csv("/cartola/data/2014/2014_jogadores.csv", header=True)
partidas_2014_ct = partidas_2014_df.withColumn('time', regexp_replace('home_team', ' - RJ', ''))
final_partidas = partidas_2014_ct.withColumn('time_low', lower(col('time'))).show(truncate=False)

In [None]:
with_ano_partidas = partidas_2014_ct.withColumn('ano', lit(2014)).show(truncate=False)
with_ano_partidas.show()

In [None]:
Comando para sobrescrever arquivo caso já existente.
Agrupar scouts por ID e contar (caso tenha duplicidade)

In [85]:
inner_join = partidas_ids_2014_ano.join(times_2014_ano, partidas_ids_2014_ano.Casa == times_2014_ano.ID)
inner_join.toPandas()

Unnamed: 0,ID,Rodada,Casa,Visitante,PlacarCasa,PlacarVisitante,Resultado,ano,ID.1,Nome,Abreviacao,Slug,ano.1
0,179872,1,262,290,0,0,Empate,2014,262,flamengo,FLA,flamengo,2014
1,179873,1,266,316,3,0,Casa,2014,266,fluminense,FLU,fluminense,2014
2,179874,1,276,263,3,0,Casa,2014,276,são paulo,SAO,sao-paulo,2014
3,179875,1,277,292,1,1,Empate,2014,277,santos,SAN,santos,2014
4,179876,1,293,284,1,0,Casa,2014,293,atlético-pr,CAP,atletico-pr,2014
...,...,...,...,...,...,...,...,...,...,...,...,...,...
375,180250,29,282,315,1,0,Casa,2014,282,atlético-mg,CAM,atletico-mg,2014
376,180251,29,287,283,0,1,Visitante,2014,287,vitória,VIT,vitoria,2014
377,180252,29,285,264,1,2,Visitante,2014,285,internacional,INT,internacional,2014
378,180253,29,316,294,4,0,Casa,2014,316,figueirense,FIG,figueirense,2014


In [None]:
scouts_atletas = pontos_por_atleta.join(jogadores_2014_parquet, pontos_por_atleta.Atleta == jogadores_2014_parquet.ID)
scouts_atletas = scouts_atletas.sort(scouts_atletas.SomaPontos.desc())
scouts_atletas.toPandas()

In [None]:
# -        Quantas partidas resultaram em empate?
pontos_por_atleta = scouts_raw_2014_ano.groupBy("Atleta").agg(sum("Pontos").alias("SomaPontos"))

#count_result = count_result.withColumn('total_wins', count_result['count(1)'])

pontos_por_atleta = pontos_por_atleta.sort(pontos_por_atleta.SomaPontos.desc())

#count_result = count_result.drop('count(1)')

#count_result.show(5)


pontos_por_atleta.toPandas()