# Proyecto: Big Data Processing

Tenemos el dataset que contiene el reporte de países y su índice de felicidad en el transcurso de los años **(world-happiness-report-2021.csv y world-happiness-report.csv)** se les pide a los alumnos desarrollar y responder las siguiente preguntas, utilizando las herramientas vistas a lo largo del módulo.

In [1]:
import pyspark
#Creamos una sesion de spark
from pyspark.sql import SparkSession
#Creamos un objeto de spark session
spark = SparkSession.builder.appName("pysparkdf").getOrCreate()

In [2]:
paths = ["C:/Users/pardo/Documents/BigData_AI_MachineLearning_FullStackBootcamp_2023/8.Big Data Processing/big-data-processing-main/PROYECTO FINAL/Datasets/world-happiness-report.csv", "C:/Users/pardo/Documents/BigData_AI_MachineLearning_FullStackBootcamp_2023/8.Big Data Processing/big-data-processing-main/PROYECTO FINAL/Datasets/world-happiness-report-2021.csv"]
# Load all distinct CSV files
df1 = spark.read.option("header", "true").csv(paths[0])
df2 = spark.read.option("header", "true").csv(paths[1])

In [117]:
# Schema Tabla world-happiness-report.csv
df1.printSchema()
# Schema Tabla world-happiness-report-2021.csv
df2.printSchema()

root
 |-- Country name: string (nullable = true)
 |-- Region: string (nullable = true)
 |-- year: string (nullable = true)
 |-- Life Ladder: string (nullable = true)
 |-- Log GDP per capita: string (nullable = true)
 |-- Social support: string (nullable = true)
 |-- Healthy life expectancy at birth: string (nullable = true)
 |-- Freedom to make life choices: string (nullable = true)
 |-- Generosity: string (nullable = true)
 |-- Perceptions of corruption: string (nullable = true)
 |-- Negative affect: string (nullable = true)

root
 |-- Country name: string (nullable = true)
 |-- Regional indicator: string (nullable = true)
 |-- Region: string (nullable = true)
 |-- Ladder score: string (nullable = true)
 |-- Standard error of ladder score: string (nullable = true)
 |-- upperwhisker: string (nullable = true)
 |-- lowerwhisker: string (nullable = true)
 |-- Logged GDP per capita: string (nullable = true)
 |-- Social support: string (nullable = true)
 |-- Healthy life expectancy: string 

In [118]:
# Preparacion de los datos
from pyspark.sql.functions import lit, col
# Preparacion Tabla world-happiness-report.csv
df1_mod = df1.withColumn("Regional indicator", lit(None))
df1_fnl = df1_mod.select(col("Country name").alias("Country name"), col("year").alias("year").cast("Int"),"Region","Regional indicator", col("Life Ladder").alias("Ladder score").cast("float"), col("Log GDP per capita").alias("Logged GDP per capita").cast("float"), col("Social support").alias("Social support"), col("Healthy life expectancy at birth").alias("Healthy life expectancy").cast("float"), col("Freedom to make life choices").alias("Freedom to make life choices"), col("Generosity").alias("Generosity"), col("Perceptions of corruption").alias("Perceptions of corruption"))
df1_fnl.show(3)

# Preparacion Tabla world-happiness-report-2021.csv
df2_mod = df2.withColumn("year", lit(2021))
df2_fnl = df2_mod.select("Country name",col("year").cast("Int"),"Region","Regional indicator", col("Ladder score").cast("float"),col("Logged GDP per capita").cast("float"),"Social support", col("Healthy life expectancy").cast("float"),"Freedom to make life choices","Generosity","Perceptions of corruption" )
df2_fnl.show(3)

# Tabla maestra (Tabla unificada)
df3 = df1_fnl.unionByName(df2_fnl)
df3.orderBy(col("Country name").asc()).show(20)

+------------+----+------+------------------+------------+---------------------+--------------+-----------------------+----------------------------+----------+-------------------------+
|Country name|year|Region|Regional indicator|Ladder score|Logged GDP per capita|Social support|Healthy life expectancy|Freedom to make life choices|Generosity|Perceptions of corruption|
+------------+----+------+------------------+------------+---------------------+--------------+-----------------------+----------------------------+----------+-------------------------+
| Afghanistan|2008|  Asia|              NULL|      3724.0|               7370.0|         0.451|                50800.0|                       0.718|     0.168|                    0.882|
| Afghanistan|2009|  Asia|              NULL|      4402.0|               7540.0|         0.552|                51200.0|                       0.679|     0.190|                    0.850|
| Afghanistan|2010|  Asia|              NULL|      4758.0|            

1. ¿Cuál es el país más “feliz” del 2021 según la data? **(considerar que la columna “Ladder score” mayor número más feliz es el país)**

In [77]:
from pyspark.sql.functions import col, max

# Calcula el valor máximo de 'Ladder score'
max_ladder_score = df2.agg(max("Ladder score")).collect()[0][0]

# Filtra el DataFrame usando el valor máximo calculado
result = df3.filter((col("year") == 2021) & (col("Ladder score") == max_ladder_score))

result.select("Country name", "year", "Region", "Regional indicator", "Ladder score").show()


+------------+----+------+------------------+------------+
|Country name|year|Region|Regional indicator|Ladder score|
+------------+----+------+------------------+------------+
|     Finland|2021|Europe|    Western Europe|      7842.0|
+------------+----+------+------------------+------------+



2. ¿Cuál es el país más “feliz” del 2021 por region según la data?

In [78]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, max, row_number
from pyspark.sql.window import Window

# Filtrar por el año 2021 y calcular el máximo "Ladder score" por región
max_ladder_por_region = df3.groupBy("Country name", "year", "Region", "Regional indicator").agg(max("Ladder score").alias("Ladder score"))
felicidad_2021 = max_ladder_por_region.filter(col("year") == 2021)

# Definir la ventana por región y ordenar por "Ladder score" en orden descendente
window_spec = Window.partitionBy("Region").orderBy(col("Ladder score").desc())


# Asignar un número de fila a cada registro dentro de cada región
max_ladder_por_region_with_rank = felicidad_2021.withColumn("rank", row_number().over(window_spec))


# Filtrar solo los registros con rank igual a 1 (el máximo "Ladder score" por región)
max_ladder_por_region_with_rank.filter(col("rank") == 1).show()


+------------+----+-------------+--------------------+------------+----+
|Country name|year|       Region|  Regional indicator|Ladder score|rank|
+------------+----+-------------+--------------------+------------+----+
|     Bahrain|2021|       Africa|Middle East and N...|      6647.0|   1|
|      Israel|2021|         Asia|Middle East and N...|      7157.0|   1|
|     Finland|2021|       Europe|      Western Europe|      7842.0|   1|
|      Canada|2021|North America|North America and...|      7103.0|   1|
| New Zealand|2021|      Oceania|North America and...|      7277.0|   1|
|  Costa Rica|2021|South America|Latin America and...|      7069.0|   1|
+------------+----+-------------+--------------------+------------+----+



3. ¿Cuál es el país que más veces ocupó el primer lugar en todos los años?

In [79]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, max, row_number
from pyspark.sql.window import Window

primer_lugar = df3.groupBy("Country name", "year", "Region").agg(max("Ladder score").alias("Ladder score"))

# Definir la ventana por región y ordenar por "Ladder score" en orden descendente
window_spec1 = Window.partitionBy("year").orderBy(col("Ladder score").desc())


# Asignar un número de fila a cada registro dentro de cada región
primer_lugar_with_rank = primer_lugar.withColumn("rank", row_number().over(window_spec1))

contar_primer_lugar = primer_lugar_with_rank.filter(col("rank") == 1)

# Paises que ocupan el primer puesto por año de Ladder score
contar_primer_lugar.show()

# Tabla acumulada de paises que ocupan el primer lugar en todos los años
contar_primer_lugar.groupBy("Country name").sum("rank").show()

+------------+----+-------------+------------+----+
|Country name|year|       Region|Ladder score|rank|
+------------+----+-------------+------------+----+
|     Denmark|2005|       Europe|      8019.0|   1|
|     Finland|2006|       Europe|      7672.0|   1|
|     Denmark|2007|       Europe|      7834.0|   1|
|     Denmark|2008|       Europe|      7971.0|   1|
|     Denmark|2009|       Europe|      7683.0|   1|
|     Denmark|2010|       Europe|      7771.0|   1|
|     Denmark|2011|       Europe|      7788.0|   1|
| Switzerland|2012|       Europe|      7776.0|   1|
|      Canada|2013|North America|      7594.0|   1|
|     Denmark|2014|       Europe|      7508.0|   1|
|      Norway|2015|       Europe|      7603.0|   1|
|     Finland|2016|       Europe|      7660.0|   1|
|     Finland|2017|       Europe|      7788.0|   1|
|     Finland|2018|       Europe|      7858.0|   1|
|     Finland|2019|       Europe|      7780.0|   1|
|     Finland|2020|       Europe|      7889.0|   1|
|     Finlan

4. ¿Qué puesto de Felicidad tiene el país con mayor GDP del 2020?

In [97]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, max, row_number
from pyspark.sql.window import Window

# Filtrar por el año 2021 y calcular el máximo "Ladder score" por región
Logged_GDP_per_capita = df3.groupBy("Country name", "year", "Region","Ladder score").agg(max("Logged GDP per capita").alias("Logged GDP per capita"))
Logged_GDP_per_capita_filter = Logged_GDP_per_capita.filter(col("year") == 2020)

Max_Logged_GDP_per_capita =Logged_GDP_per_capita_filter.orderBy(col("Logged GDP per capita").desc())
# Definir la ventana por región y ordenar por "Ladder score" en orden descendente
window_spec3 = Window.partitionBy().orderBy(col("Logged GDP per capita").desc())


# Asignar un número de fila a cada registro dentro de cada región
max_ladder_por_region_with_rank = Max_Logged_GDP_per_capita.withColumn("rank", row_number().over(window_spec3))

max_ladder_por_region_with_rank.show()


+--------------------+----+-------------+------------+---------------------+----+
|        Country name|year|       Region|Ladder score|Logged GDP per capita|rank|
+--------------------+----+-------------+------------+---------------------+----+
|             Ireland|2020|       Europe|      7035.0|              11323.0|   1|
|         Switzerland|2020|       Europe|      7508.0|              11081.0|   2|
|United Arab Emirates|2020|         Asia|      6458.0|              11053.0|   3|
|              Norway|2020|       Europe|      7290.0|              11042.0|   4|
|       United States|2020|North America|      7028.0|              11001.0|   5|
|             Denmark|2020|       Europe|      7515.0|              10910.0|   6|
|         Netherlands|2020|       Europe|      7504.0|              10901.0|   7|
|             Austria|2020|       Europe|      7213.0|              10851.0|   8|
|              Sweden|2020|       Europe|      7314.0|              10838.0|   9|
|             Ge

In [109]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, max, row_number, monotonically_increasing_id
from pyspark.sql.window import Window

# Filtrar por el año 2021 y calcular el máximo "Ladder score" por región
Logged_GDP_per_capita_filter = Logged_GDP_per_capita.filter(col("year") == 2020)

# Asignar un identificador único a cada fila
Logged_GDP_per_capita_with_id = Logged_GDP_per_capita_filter.withColumn("id", monotonically_increasing_id())

# Ordenar por "Logged GDP per capita" en orden descendente
Max_Logged_GDP_per_capita = Logged_GDP_per_capita_with_id.orderBy(col("Logged GDP per capita").desc()).show(1)

+------------+----+------+------------+---------------------+---+
|Country name|year|Region|Ladder score|Logged GDP per capita| id|
+------------+----+------+------------+---------------------+---+
|     Ireland|2020|Europe|      7035.0|              11323.0| 68|
+------------+----+------+------------+---------------------+---+
only showing top 1 row



5. ¿En que porcentaje a variado a nivel mundial el GDP promedio del 2020 respecto al 2021? ¿Aumentó o disminuyó?

In [115]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, avg

table_2020 = df3.filter(col("year") == 2020)
table_2021 = df3.filter(col("year") == 2021)

# table_2020.avg("Logged GDP per capita").show()
# table_2021.avg("Logged GDP per capita").show()

# Filtrar por el año 2020 y calcular el promedio de "Logged GDP per capita"
table_2020 = df3.filter(col("year") == 2020)
avg_2020 = table_2020.agg(avg("Logged GDP per capita").alias("avg_logged_gdp_2020"))
avg_2020.show()

# Filtrar por el año 2021 y calcular el promedio de "Logged GDP per capita"
table_2021 = df3.filter(col("year") == 2021)
avg_2021 = table_2021.agg(avg("Logged GDP per capita").alias("avg_logged_gdp_2021"))
avg_2021.show()

# Obtener los valores numéricos para realizar la operación de crecimiento porcentual
avg_2020_value = avg_2020.collect()[0]["avg_logged_gdp_2020"]
avg_2021_value = avg_2021.collect()[0]["avg_logged_gdp_2021"]

# Calcular el crecimiento porcentual
crecimiento_avg = (avg_2021_value - avg_2020_value) / avg_2020_value * 100

print(f"Crecimiento porcentual: {crecimiento_avg}%")

+-------------------+
|avg_logged_gdp_2020|
+-------------------+
|  9751.329545454546|
+-------------------+

+-------------------+
|avg_logged_gdp_2021|
+-------------------+
|  9432.208053691274|
+-------------------+

Crecimiento porcentual: -3.272594678251088%


6. ¿Cuál es el país con mayor expectativa de vide **(“Healthy life expectancy at birth”)**? Y ¿Cuánto tenia en ese indicador en el 2019?

In [132]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
Healthy_life_expectancy_birth = df3

# Ordenar por "Healthy life expectancy" en orden descendente
Result = Healthy_life_expectancy_birth.orderBy(col("Healthy life expectancy").desc())

# Seleccionar la primera fila
print(f"Tabla por año de los paises con mayor expectativa de vida")
Result.select("Country name", "year", "Region", "Ladder score", "Healthy life expectancy").show(10)

Tabla por año de los paises con mayor expectativa de vida
+--------------------+----+------+------------+-----------------------+
|        Country name|year|Region|Ladder score|Healthy life expectancy|
+--------------------+----+------+------------+-----------------------+
|           Singapore|2019|  Asia|      6378.0|                77100.0|
|           Singapore|2021|  Asia|      6377.0|                76953.0|
|Hong Kong S.A.R. ...|2021|  Asia|      5477.0|                76820.0|
|           Singapore|2018|  Asia|      6375.0|                76800.0|
|           Singapore|2017|  Asia|      6378.0|                76500.0|
|           Singapore|2016|  Asia|      6033.0|                76200.0|
|           Singapore|2015|  Asia|      6620.0|                75900.0|
|           Singapore|2014|  Asia|      7062.0|                75680.0|
|           Singapore|2013|  Asia|      6533.0|                75460.0|
|               Japan|2020|  Asia|      6118.0|                75200.0|
+-----

In [133]:
from pyspark.sql import SparkSession

Healthy_life_expectancy_birth_2019 = df3.filter(col("year") == 2019)

print(f"Expectativa de vide en el año 2019")
Result = Healthy_life_expectancy_birth_2019.orderBy(col("Healthy life expectancy").desc())
Result.select("Country name", "year", "Region", "Ladder score","Healthy life expectancy").show(1)

Expectativa de vide en el año 2019
+------------+----+------+------------+-----------------------+
|Country name|year|Region|Ladder score|Healthy life expectancy|
+------------+----+------+------------+-----------------------+
|   Singapore|2019|  Asia|      6378.0|                77100.0|
+------------+----+------+------------+-----------------------+
only showing top 1 row

