In [1]:
from pyspark.sql import SparkSession
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler, StringIndexer, OneHotEncoder
from pyspark.sql.functions import col
from pyspark.sql.types import FloatType

In [2]:
# Crear una sesión de Spark
spark = SparkSession.builder.appName("LifeExpectancyPrediction").getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/06/13 09:18:10 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/06/13 09:18:10 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [3]:
# Cargar los datos de entrenamiento en un DataFrame
data = spark.read.format("parquet").option("header", "true").load("data")

In [4]:
print(data.columns)

['Year', 'Country Code', 'Country Name', 'life_expatancy', 'life_expatancy_male', 'life_expatancy_female', 'ratio_mort_neonatal', 'ratio_mort_one_year', 'ratio_mort_five_year', 'ratio_maternal_mort', 'ratio_mort_in_year', 'ratio_poblation_0_14_year', 'ratio_poblation_15_64_year', 'ratio_poblation_more_64_year', 'ratio_population_growth', 'ratio_mortality_CVD_cancer_diabetes_CRD', 'ratio_infectious_diseases', 'porc_prev_malnutrition', 'doctor_1000_people', 'bed_1000_people', 'porc_water_service', 'porc_sanitation_services', 'porc_electricity', 'porc_internet', 'porc_secundary_school_compl', 'porc_literacy', 'porc_unemployment', 'porc_below_poverty_line', 'PIB_growth_per_capita']


In [5]:
data.printSchema

<bound method DataFrame.printSchema of DataFrame[Year: string, Country Code: string, Country Name: string, life_expatancy: string, life_expatancy_male: string, life_expatancy_female: string, ratio_mort_neonatal: string, ratio_mort_one_year: string, ratio_mort_five_year: string, ratio_maternal_mort: string, ratio_mort_in_year: string, ratio_poblation_0_14_year: string, ratio_poblation_15_64_year: string, ratio_poblation_more_64_year: string, ratio_population_growth: string, ratio_mortality_CVD_cancer_diabetes_CRD: string, ratio_infectious_diseases: string, porc_prev_malnutrition: string, doctor_1000_people: string, bed_1000_people: string, porc_water_service: string, porc_sanitation_services: string, porc_electricity: string, porc_internet: string, porc_secundary_school_compl: string, porc_literacy: string, porc_unemployment: string, porc_below_poverty_line: string, PIB_growth_per_capita: string]>

In [6]:
from pyspark.sql.functions import col
from pyspark.sql.types import FloatType

# Supongamos que tienes un DataFrame llamado "data" con las columnas "col1", "col2" y "col3" que son de tipo string
columns_to_convert = ["ratio_mort_neonatal", "ratio_mort_one_year", "ratio_mort_five_year", 'ratio_maternal_mort', 'ratio_mort_in_year']

for column in columns_to_convert:
    data = data.withColumn(column, col(column).cast(FloatType()))

In [7]:
data.printSchema

<bound method DataFrame.printSchema of DataFrame[Year: string, Country Code: string, Country Name: string, life_expatancy: string, life_expatancy_male: string, life_expatancy_female: string, ratio_mort_neonatal: float, ratio_mort_one_year: float, ratio_mort_five_year: float, ratio_maternal_mort: float, ratio_mort_in_year: float, ratio_poblation_0_14_year: string, ratio_poblation_15_64_year: string, ratio_poblation_more_64_year: string, ratio_population_growth: string, ratio_mortality_CVD_cancer_diabetes_CRD: string, ratio_infectious_diseases: string, porc_prev_malnutrition: string, doctor_1000_people: string, bed_1000_people: string, porc_water_service: string, porc_sanitation_services: string, porc_electricity: string, porc_internet: string, porc_secundary_school_compl: string, porc_literacy: string, porc_unemployment: string, porc_below_poverty_line: string, PIB_growth_per_capita: string]>

In [8]:
# Convertir las columnas seleccionadas a tipo Float
columns_to_convert = ["ratio_mort_neonatal", "ratio_mort_one_year", "ratio_mort_five_year", 'ratio_maternal_mort', 'ratio_mort_in_year']
for column in columns_to_convert:
    data = data.withColumn(column, col(column).cast(FloatType()))

In [9]:
# Convertir la variable categórica "país" a una representación numérica
indexer = StringIndexer(inputCol="Country Code", outputCol="country_index")
data = indexer.fit(data).transform(data)

In [10]:
# Crear el objeto OneHotEncoder
encoder = OneHotEncoder(inputCols=["country_index"], outputCols=["country_encoded"])

In [11]:
# Ajustar y transformar los datos
model = encoder.fit(data)
data = model.transform(data)

In [12]:
# Seleccionar las características y la variable objetivo
feature_columns = ["country_encoded", "ratio_mort_neonatal", "ratio_mort_one_year", "ratio_mort_five_year", 'ratio_maternal_mort', 'ratio_mort_in_year']
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features", handleInvalid="keep")
data = assembler.transform(data)

In [13]:
# Seleccionar las columnas finales
data = data.select("Country Name", "features", "life_expatancy").withColumnRenamed("life_expatancy", "label")

In [14]:
# Mostrar modelo machine learning
data.select("Country Name","label").show()

+--------------------+------+
|        Country Name| label|
+--------------------+------+
|               Haití|61.741|
|              Panamá|75.544|
|   Trinidad y Tobago|71.308|
|Micronesia (Estad...|66.859|
|             Jamaica|71.273|
|           Venezuela|72.161|
|            Kiribati|61.422|
|            Dominica|69.852|
|              Belice|69.887|
|            Suriname|72.242|
|      Islas Marshall|63.585|
|             Vanuatu|69.534|
|             Granada|74.765|
|               Chile|71.176|
|San Vicente y las...|73.516|
|              Tuvalu|61.531|
|             Bahamas|73.806|
|            Honduras|71.963|
|            Dominica|70.538|
|          Costa Rica|76.263|
+--------------------+------+
only showing top 20 rows

