In [1]:
pip install pyspark

Collecting pyspark
[?25l  Downloading https://files.pythonhosted.org/packages/27/67/5158f846202d7f012d1c9ca21c3549a58fd3c6707ae8ee823adcaca6473c/pyspark-3.0.2.tar.gz (204.8MB)
[K     |████████████████████████████████| 204.8MB 71kB/s 
[?25hCollecting py4j==0.10.9
[?25l  Downloading https://files.pythonhosted.org/packages/9e/b6/6a4fb90cd235dc8e265a6a2067f2a2c99f0d91787f06aca4bcf7c23f3f80/py4j-0.10.9-py2.py3-none-any.whl (198kB)
[K     |████████████████████████████████| 204kB 19.6MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.0.2-py2.py3-none-any.whl size=205186687 sha256=2844cce557bc286073595e7720414ad44da25ef407efbc8999d38b2c52d54975
  Stored in directory: /root/.cache/pip/wheels/8b/09/da/c1f2859bcc86375dc972c5b6af4881b3603269bcc4c9be5d16
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9 pyspark-3.0.2


In [2]:
# Imports
import pandas as pd
import numpy as np
import pyspark 
from pyspark import SparkConf, SparkContext, SQLContext
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import pyspark.sql.functions as F
from datetime import datetime
import os
import shutil

spark = SparkSession.builder.master("local").getOrCreate()
#spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

sc = SparkContext.getOrCreate()
sqlContext = SQLContext(sc)

In [3]:
# Objeto con todas las contantes
class Constants_c:
  imc_overweight = 25
  const_unit_weight = "LBS"
  const_empty= ""
  f_lsbs_kg = 2.2046
  f_uno = 1
  f_pies_cms = 0.032808
  f_pulgadas_cms = 0.39370
  f_pie_simb = '\''
  f_pulgada_simb = '\"'
  f_cuad = 2
  f_100 = 100
  f_output = 'output_1/'
  f_output2 = 'output_2/'
  f_output_cbarra = '/'

const = Constants_c()

In [4]:
#Funciones de conversion

#Convertir el peso al Sistema internaciona de unidades
@F.udf("float")
def weight_eng_to_siu(weight_p):
  num_weight = int(weight_p.upper().replace(const.const_unit_weight,const.const_empty))
  kg_value = num_weight/const.f_lsbs_kg

  return kg_value

#Convertir la altura al Sistema internaciona de unidades
@F.udf("float")
def height_eng_to_siu(height_p):
  pos_pie = height_p.index(const.f_pie_simb)
  pos_pulgada = height_p.index(const.f_pulgada_simb)

  val_pie = int(height_p[:pos_pie]) / const.f_pies_cms
  val_pulgada = int(height_p[pos_pie + const.f_uno:pos_pulgada]) / const.f_pulgadas_cms

  cms_value = val_pie + val_pulgada

  return cms_value

#Se reemplazan los caracteres especiales para crear las carpetas con letras inglesas a-z
#tambien se borraron el &amp;
@F.udf("string")
def clean_strings(str_p):
  str_cleaned = str_p.replace("í",'i').replace('é','e').replace('ã','a').replace(' ','_').replace('&amp;','')
  str_cleaned = str_cleaned.replace('__','_')
  return str_cleaned

#Calculo del IMs con el SIU
@F.udf("float")
def calc_imc(weight_p,height_p):
  imc = weight_p / pow((height_p/const.f_100),const.f_cuad)

  return imc

#Se convierte de cadena a array, para el caso de las posiciones de los jugares
# AA BB CC ->["AA","BB","CC"] para elegir al mejor por posicion
def get_array(str_values):
  r_array = str_values.split()
  
  return r_array
get_array_udf = F.udf(lambda y: get_array(y), ArrayType(StringType()))



In [5]:
# se obtiene informacion del archivo fuente de datos
pd_df = pd.read_csv('Fifa_data/fifa21_male2.csv')
pd_df_selected = pd_df[["ID","Name", "Age", "Nationality", "Club","Position", "Height", "Weight","Sprint Speed" ,"Total Stats"]]

  interactivity=interactivity, compiler=compiler, result=result)


In [6]:
#Crear dataframe spark seleccionando solo las columnas necesarias para el ejercicio.
field = [
      StructField("ID", IntegerType(), True),
      StructField("Name", StringType(), True),
      StructField("Age", IntegerType(), True),
      StructField("Nationality", StringType(), True),
      StructField("Club", StringType(), True),
      StructField("Position", StringType(), True),
      StructField("Height", StringType(), True),
      StructField("Weight", StringType(), True),
      StructField("Sprint Speed", IntegerType(), True),
      StructField("Total Stats", IntegerType(), True)
]
schema = StructType(field)
df_players = spark.createDataFrame(pd_df_selected,schema)

In [7]:
#se agregan las columnas con la info nueva generada
df_players = df_players.withColumn("Nationality_clean", clean_strings(F.col("Nationality")))\
                        .withColumn("Weight_SIU_kg", weight_eng_to_siu(F.col("Weight")))\
                        .withColumn("Height_SIU_cms", height_eng_to_siu(F.col("Height")))\
                        .withColumn("IMC", calc_imc(F.col("Weight_SIU_kg"),F.col("Height_SIU_cms")))\
                        .withColumn("array_position",  get_array_udf(F.col("Position")))

df_fifa_players = df_players.select(F.col("ID"),F.col("Name"),F.col("Age"),F.col("Nationality"),F.col("Nationality_clean"),F.col("Club"),F.col("Position"),F.col("Height"),F.col("Weight"),F.col("Sprint Speed"),F.col("Total Stats"),F.col("Weight_SIU_kg"),F.col("Height_SIU_cms"),F.col("IMC"),F.explode(df_players.array_position).alias("Position_unit"))

In [8]:
df_fifa_players.show()

+---+-------------+---+--------------------+-----------------+-----------------+------------+------+------+------------+-----------+-------------+--------------+---------+-------------+
| ID|         Name|Age|         Nationality|Nationality_clean|             Club|    Position|Height|Weight|Sprint Speed|Total Stats|Weight_SIU_kg|Height_SIU_cms|      IMC|Position_unit|
+---+-------------+---+--------------------+-----------------+-----------------+------------+------+------+------------+-----------+-------------+--------------+---------+-------------+
|  2|  G. Pasquale| 33|               Italy|            Italy|          Udinese|          LM|  6'0"|181lbs|          74|       1929|     82.10106|     182.88222|24.547417|           LM|
| 16|  Luis García| 37|               Spain|            Spain|        KAS Eupen|  CM CAM CDM| 5'10"|143lbs|          50|       1906|     64.86437|     177.80191|20.517921|           CM|
| 16|  Luis García| 37|               Spain|            Spain|        

In [9]:
#We need write one folder for each Nationality, each folder should contains the players with that Nationality
df_nationalities=df_fifa_players.select(F.col("Nationality_clean")).distinct().orderBy("Nationality_clean")

lst_letras = []

field = [
      StructField("ID", IntegerType(), True),
      StructField("Name", StringType(), True),
      StructField("Age", IntegerType(), True),
      StructField("Nationality", StringType(), True),
      StructField("Nationality_clean", StringType(), True),
      StructField("Position", StringType(), True),
      StructField("Height", StringType(), True),
      StructField("Weight", StringType(), True),
      StructField("Total Stats", IntegerType(), True),
      StructField("Weight_SIU_kg", FloatType(), True),
      StructField("Height_SIU_cms", FloatType(), True),
      StructField("IMC", FloatType(), True)
]
schema = StructType(field)
df_player = spark.createDataFrame([],schema)

lst_nationalities = df_nationalities.select('Nationality_clean').collect()
# se itera por nacionalidad
for e  in lst_nationalities:
  print(e.Nationality_clean)
  df_player = df_fifa_players.select("ID","Name","Age","Nationality","Nationality_clean","Position","Height","Weight","Total Stats","Weight_SIU_kg","Height_SIU_cms","IMC")\
    .filter(df_fifa_players["Nationality_clean"] == e.Nationality_clean).distinct().orderBy("ID")
  
  #se guarda en una ruta temporal
  temppath = 'temp'
  df_player.write.format("orc").option("header", "true").mode("overwrite").save(temppath)

  
  #En el filesistem de colab cuando se enlistan con listdir al parecer de duplican los archivos aunque no existan como tal
  #por ejemplo File.orc y tambien aparece -> .File.orc.orc, 
  #aqui obtengo solo los arhivos correctos
  lst_files = []
  for entry in os.listdir(temppath):
    if entry[0:1] != '.' and entry[0:1] != '_':
      file_str = "temp/" + entry
      lst_files.append(file_str)

  #Se crea el folder, si existe lo borra y lo vuelve a crear
  str_path_save = const.f_output + e.Nationality_clean   
  try:
    os.makedirs(str_path_save) 
  except OSError:
    shutil.rmtree(str_path_save)
    os.makedirs(str_path_save) 
    
  #Mover los archivos del folder temp al folder final del pais correspondiente con un nombre unico
  for file_ in lst_files:
    now = datetime.now()
    dt_str = now.strftime("%Y_%m_%d_%H_%M_%S_%f")
    
    str_name_file_save = "Players_" + e.Nationality_clean + '_' + dt_str + '.orc'
    str_path_file_save = str_path_save +'/' +str_name_file_save
    shutil.copyfile(file_, str_path_file_save)    


Afghanistan
Albania
Algeria
Andorra
Angola
Antigua_Barbuda
Argentina
Armenia
Aruba
Australia
Austria
Azerbaijan
Barbados
Belarus
Belgium
Belize
Benin
Bermuda
Bolivia
Bosnia_Herzegovina
Brazil
Brunei_Darussalam
Bulgaria
Burkina_Faso
Burundi
Cameroon
Canada
Cape_Verde
Central_African_Republic
Chad
Chile
China_PR
Chinese_Taipei
Colombia
Comoros
Congo
Costa_Rica
Croatia
Cuba
Curacao
Cyprus
Czech_Republic
DR_Congo
Denmark
Dominican_Republic
Ecuador
Egypt
El_Salvador
England
Equatorial_Guinea
Eritrea
Estonia
Faroe_Islands
Finland
France
Gabon
Gambia
Georgia
Germany
Ghana
Gibraltar
Greece
Grenada
Guam
Guinea
Guinea_Bissau
Guyana
Haiti
Honduras
Hong_Kong
Hungary
Iceland
India
Indonesia
Iran
Iraq
Israel
Italy
Ivory_Coast
Jamaica
Japan
Jordan
Kazakhstan
Kenya
Korea_DPR
Korea_Republic
Kosovo
Latvia
Lebanon
Liberia
Libya
Liechtenstein
Lithuania
Luxembourg
Macau
Madagascar
Malawi
Malaysia
Mali
Malta
Mauritania
Mexico
Moldova
Montenegro
Montserrat
Morocco
Mozambique
Namibia
Netherlands
New_Caledonia

In [10]:
#We need to know who are the 10 top players for each position, you should
#write this data too in another output folder
df_positions=df_fifa_players.select(F.col("Position_unit")).distinct().orderBy("Position_unit")

lst_letras = []

field = [
      StructField("ID", IntegerType(), True),
      StructField("Name", StringType(), True),
      StructField("Age", IntegerType(), True),
      StructField("Nationality", StringType(), True),
      StructField("Nationality_clean", StringType(), True),
      StructField("Position", StringType(), True),
      StructField("Height", StringType(), True),
      StructField("Weight", StringType(), True),
      StructField("Total Stats", IntegerType(), True),
      StructField("Weight_SIU_kg", FloatType(), True),
      StructField("Height_SIU_cms", FloatType(), True),
      StructField("IMC", FloatType(), True),
      StructField("Position_unit", StringType(), True)
]
schema = StructType(field)
df_player = spark.createDataFrame([],schema)

lst_positions = df_positions.select('Position_unit').collect()
# se itera por posicion
for e  in lst_positions:
  print(e.Position_unit)
  df_player = df_fifa_players.select("ID","Name","Age","Nationality","Nationality_clean","Position","Height","Weight","Total Stats","Weight_SIU_kg","Height_SIU_cms","IMC","Position_unit")\
    .filter(F.col("Position_unit") == e.Position_unit).sort(F.col("Total Stats").desc()).limit(10)

  #se guarda en una ruta temporal
  temppath = 'temp2'
  df_player.repartition(1).write.format("orc").option("header", "true").mode("overwrite").save(temppath)

  
  #En el filesistem de colab cuando se enlistan los archivos con listdir al parecer de duplican aunque no existan como tal
  #por ejemplo File.orc y tambien aparece otro con el nombre -> .File.orc.orc, aunque no exista, 
  #aqui obtengo solo los arhivos
  lst_files = []
  for entry in os.listdir(temppath):
    if entry[0:1] != '.' and entry[0:1] != '_':
      file_str = "temp2/" + entry
      lst_files.append(file_str)

  #Se crea el folder, si existe lo borra y lo vuelve a crear
  str_path_save = const.f_output2 + e.Position_unit   
  try:
    os.makedirs(str_path_save) 
  except OSError:
    shutil.rmtree(str_path_save)
    os.makedirs(str_path_save) 
    
  #Mover los archivos del folder temp al folder final del pais correspondiente con un nombre unico
  for file_ in lst_files:
    now = datetime.now()
    dt_str = now.strftime("%Y_%m_%d_%H_%M_%S_%f")
    
    str_name_file_save = "Players_" + e.Position_unit + '_' + dt_str + '.orc'
    str_path_file_save = str_path_save +'/' +str_name_file_save
    shutil.copyfile(file_, str_path_file_save)    

ACB
CAM
CB
CDM
CF
CM
GK
IILB
IIRM
IVST
LB
LM
LW
LWB
ND
NaN
RB
RM
RW
RWB
ST


In [11]:
#We need how many players for each position have each Club.
#Este solo muestra la informacion, no se pidio que se guardara
df_player_position = df_fifa_players.groupBy("Club","Position_unit").count().orderBy("Club","Position_unit")
df_player_position.show(25)

+--------------------+-------------+-----+
|                Club|Position_unit|count|
+--------------------+-------------+-----+
|1. FC Heidenheim ...|          CAM|    1|
|1. FC Heidenheim ...|           CB|    3|
|1. FC Heidenheim ...|          CDM|    4|
|1. FC Heidenheim ...|           CF|    1|
|1. FC Heidenheim ...|           CM|    5|
|1. FC Heidenheim ...|           GK|    3|
|1. FC Heidenheim ...|           LB|    4|
|1. FC Heidenheim ...|           LM|    5|
|1. FC Heidenheim ...|           RB|    3|
|1. FC Heidenheim ...|           RM|    4|
|1. FC Heidenheim ...|           ST|    5|
|1. FC Kaiserslautern|          CAM|    7|
|1. FC Kaiserslautern|           CB|    7|
|1. FC Kaiserslautern|          CDM|    5|
|1. FC Kaiserslautern|           CM|   10|
|1. FC Kaiserslautern|           GK|    2|
|1. FC Kaiserslautern|           LB|    3|
|1. FC Kaiserslautern|           LM|    4|
|1. FC Kaiserslautern|           RB|    2|
|1. FC Kaiserslautern|           RM|    6|
|1. FC Kais

In [12]:
#We need to know the top 10 clubs for sprint speed average.
#Este solo muestra la informacion, no se pidio que se guardara
df_club_speend_avg = df_fifa_players.groupBy("Club").avg("Sprint Speed")\
  .withColumnRenamed("avg(Sprint Speed)", "Sprint Speed avg").sort(F.col("Sprint Speed avg").desc()).limit(10)
df_club_speend_avg.show()

+------------------+----------------+
|              Club|Sprint Speed avg|
+------------------+----------------+
|     United States|            91.0|
|  Alemannia Aachen|            91.0|
|    UCAM Murcia CF|            90.0|
|     GIF Sundsvall|            89.0|
|      Terek Grozny|            89.0|
|        Córdoba CF|            88.0|
|         Dundee FC|            86.0|
|Dorados de Sinaloa|            85.0|
|       Netherlands|            84.0|
|           Palermo|            83.0|
+------------------+----------------+



In [13]:
#Calculate the IMC for each player, we need to know all players with overweight (IMC>25).
df_player_imc = df_fifa_players.select("ID","Name","Club","Height","Weight","IMC")\
    .filter(F.col("IMC") > const.imc_overweight).distinct().sort("IMC")
df_player_imc.show()

+------+--------------+--------------------+------+------+---------+
|    ID|          Name|                Club|Height|Weight|      IMC|
+------+--------------+--------------------+------+------+---------+
|232832|      L. Gómez|Club Atlético Ban...|  5'4"|146lbs|25.060265|
|183671|  E. Terranova|        US Cremonese|  6'1"|190lbs|25.066875|
|152603|     O. Occéan|        Mjøndalen IF|  6'1"|190lbs|25.066875|
|175141|      D. Ovono|            Paris FC|  6'1"|190lbs|25.066875|
|223823| T. Königsmann| SV Waldhof Mannheim|  6'1"|190lbs|25.066875|
|111661|     C. Kameni|            Cameroon|  6'1"|190lbs|25.066875|
|190461|B. Sigurðarson|             Iceland|  6'1"|190lbs|25.066875|
|192476|        Fontàs|Sporting Kansas City|  6'1"|190lbs|25.066875|
|251498|      J. Rojas|             Bolivia|  6'1"|190lbs|25.066875|
|222123|       A. Long|  New York Red Bulls|  6'1"|190lbs|25.066875|
|213901|       A. Madu|            Al Nassr|  6'1"|190lbs|25.066875|
|219931|    D. Mendoza|Club Atléti