In [2]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql import SparkSession
# Create (or reuse) the Spark session that every cell below reads data through.
spark = (
    SparkSession.builder
    .appName("pysparkdf")
    .getOrCreate()
)
from pyspark.sql.window import Window

In [3]:
# Load both World Happiness Report CSVs. Each file has a header row and the
# column types are inferred by Spark rather than declared.
dfHappiness2021 = (
    spark.read.format("csv")
    .options(header="true", inferSchema="true")
    .load("./world-happiness-report-2021.csv")
)
dfHappiness = (
    spark.read.format("csv")
    .options(header="true", inferSchema="true")
    .load("./world-happiness-report.csv")
)

In [4]:
# Preview the first five rows of the 2021 report.
dfHappiness2021.limit(5).show()

+------------+------------------+------------+------------------------------+------------+------------+---------------------+--------------+-----------------------+----------------------------+----------+-------------------------+------------------------+--------------------------------+----------------------------+-------------------------------------+------------------------------------------+------------------------+---------------------------------------+-------------------+
|Country name|Regional indicator|Ladder score|Standard error of ladder score|upperwhisker|lowerwhisker|Logged GDP per capita|Social support|Healthy life expectancy|Freedom to make life choices|Generosity|Perceptions of corruption|Ladder score in Dystopia|Explained by: Log GDP per capita|Explained by: Social support|Explained by: Healthy life expectancy|Explained by: Freedom to make life choices|Explained by: Generosity|Explained by: Perceptions of corruption|Dystopia + residual|
+------------+------------------

In [5]:
# Print the column names and inferred types of the 2021 report.
dfHappiness2021.printSchema()

root
 |-- Country name: string (nullable = true)
 |-- Regional indicator: string (nullable = true)
 |-- Ladder score: double (nullable = true)
 |-- Standard error of ladder score: double (nullable = true)
 |-- upperwhisker: double (nullable = true)
 |-- lowerwhisker: double (nullable = true)
 |-- Logged GDP per capita: double (nullable = true)
 |-- Social support: double (nullable = true)
 |-- Healthy life expectancy: double (nullable = true)
 |-- Freedom to make life choices: double (nullable = true)
 |-- Generosity: double (nullable = true)
 |-- Perceptions of corruption: double (nullable = true)
 |-- Ladder score in Dystopia: double (nullable = true)
 |-- Explained by: Log GDP per capita: double (nullable = true)
 |-- Explained by: Social support: double (nullable = true)
 |-- Explained by: Healthy life expectancy: double (nullable = true)
 |-- Explained by: Freedom to make life choices: double (nullable = true)
 |-- Explained by: Generosity: double (nullable = true)
 |-- Explained 

In [6]:
# Preview the first five rows of the multi-year report.
dfHappiness.limit(5).show()

+------------+----+-----------+------------------+--------------+--------------------------------+----------------------------+----------+-------------------------+---------------+---------------+
|Country name|year|Life Ladder|Log GDP per capita|Social support|Healthy life expectancy at birth|Freedom to make life choices|Generosity|Perceptions of corruption|Positive affect|Negative affect|
+------------+----+-----------+------------------+--------------+--------------------------------+----------------------------+----------+-------------------------+---------------+---------------+
| Afghanistan|2008|      3.724|              7.37|         0.451|                            50.8|                       0.718|     0.168|                    0.882|          0.518|          0.258|
| Afghanistan|2009|      4.402|              7.54|         0.552|                            51.2|                       0.679|      0.19|                     0.85|          0.584|          0.237|
| Afghanistan|2

In [7]:
# Print the column names and inferred types of the multi-year report.
dfHappiness.printSchema()

root
 |-- Country name: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- Life Ladder: double (nullable = true)
 |-- Log GDP per capita: double (nullable = true)
 |-- Social support: double (nullable = true)
 |-- Healthy life expectancy at birth: double (nullable = true)
 |-- Freedom to make life choices: double (nullable = true)
 |-- Generosity: double (nullable = true)
 |-- Perceptions of corruption: double (nullable = true)
 |-- Positive affect: double (nullable = true)
 |-- Negative affect: double (nullable = true)



Ejercicio 1

In [8]:
# Exercise 1: the country with the highest ladder score in the 2021 report.
result1 = (
    dfHappiness2021
    .select("Country name", "Ladder score")
    .orderBy(desc("Ladder score"))
    .limit(1)
)

# cache() has to be called; a bare `result1.cache` only looks the method up
# and never marks the DataFrame for caching.
result1.cache()
result1.show()

+------------+------------+
|Country name|Ladder score|
+------------+------------+
|     Finland|       7.842|
+------------+------------+



Ejercicio 2

In [9]:
# Lookup table that assigns each report region to a continent.
# NOTE(review): "North America and ANZ" maps to America and "Middle East and
# North Africa" maps to Africa, so countries such as New Zealand or Israel are
# reported under those continents downstream -- confirm this is intended.
dfContinents = spark.createDataFrame(
    [
        ("Western Europe", "Europe"),
        ("North America and ANZ", "America"),
        ("Middle East and North Africa", "Africa"),
        ("Latin America and Caribbean", "America"),
        ("Central and Eastern Europe", "Europe"),
        ("East Asia", "Asia"),
        ("Southeast Asia", "Asia"),
        ("Commonwealth of Independent States", "Asia"),
        ("Sub-Saharan Africa", "Africa"),
        ("South Asia", "Asia"),
    ],
    ["Regional Indicator", "Continent"],
)

dfContinents.show(truncate=False)

+----------------------------------+---------+
|Regional Indicator                |Continent|
+----------------------------------+---------+
|Western Europe                    |Europe   |
|North America and ANZ             |America  |
|Middle East and North Africa      |Africa   |
|Latin America and Caribbean       |America  |
|Central and Eastern Europe        |Europe   |
|East Asia                         |Asia     |
|Southeast Asia                    |Asia     |
|Commonwealth of Independent States|Asia     |
|Sub-Saharan Africa                |Africa   |
|South Asia                        |Asia     |
+----------------------------------+---------+



In [10]:
# Attach the continent to every 2021 row via an inner equi-join on the region.
# The key is spelled "Regional Indicator" here while the report column is
# "Regional indicator"; the join resolves it, presumably because Spark matches
# column names case-insensitively by default -- verify if that setting changes.
dfJoin2021 = dfHappiness2021.join(
    dfContinents,
    on=["Regional Indicator"],
    how="inner",
)
dfJoin2021.limit(5).show()

+------------------+------------+------------+------------------------------+------------+------------+---------------------+--------------+-----------------------+----------------------------+----------+-------------------------+------------------------+--------------------------------+----------------------------+-------------------------------------+------------------------------------------+------------------------+---------------------------------------+-------------------+---------+
|Regional indicator|Country name|Ladder score|Standard error of ladder score|upperwhisker|lowerwhisker|Logged GDP per capita|Social support|Healthy life expectancy|Freedom to make life choices|Generosity|Perceptions of corruption|Ladder score in Dystopia|Explained by: Log GDP per capita|Explained by: Social support|Explained by: Healthy life expectancy|Explained by: Freedom to make life choices|Explained by: Generosity|Explained by: Perceptions of corruption|Dystopia + residual|Continent|
+-----------

In [11]:
# Sanity check: compare row counts before and after the join to see whether
# any country was dropped for lacking a matching region.
print("Antes del join: ", dfHappiness2021.count())
print("Después del join: ", dfJoin2021.count())

Antes del join:  149
Después del join:  149


In [12]:
# Exercise 2: the happiest country of each continent in 2021.
# Rank countries inside every continent from highest to lowest ladder score.
windowByContinent2021 = Window.partitionBy("Continent").orderBy(desc("Ladder score"))

result2 = (
    dfJoin2021
    .withColumn("Rank", rank().over(windowByContinent2021))
    .filter(col("Rank") == 1)  # rank() gives every tied leader rank 1
    .select("Continent", "Country name", "Ladder score")
)

# cache() has to be called; a bare `result2.cache` only looks the method up
# and caches nothing.
result2.cache()
result2.show(truncate=False)

+---------+------------------------+------------+
|Continent|Country name            |Ladder score|
+---------+------------------------+------------+
|Africa   |Israel                  |7.157       |
|America  |New Zealand             |7.277       |
|Asia     |Taiwan Province of China|6.584       |
|Europe   |Finland                 |7.842       |
+---------+------------------------+------------+



Ejercicio 3

In [13]:
# List, in ascending order, the years covered by the multi-year report.
(
    dfHappiness
    .select("year")
    .distinct()
    .orderBy("year")
    .show()
)

+----+
|year|
+----+
|2005|
|2006|
|2007|
|2008|
|2009|
|2010|
|2011|
|2012|
|2013|
|2014|
|2015|
|2016|
|2017|
|2018|
|2019|
|2020|
+----+



In [14]:
# Keep only the three columns needed to rank countries per year.
dfHappinessYears = dfHappiness.select(
    col("Country name"),
    col("year"),
    col("Life Ladder"),
)
dfHappinessYears.show(n=5)

+------------+----+-----------+
|Country name|year|Life Ladder|
+------------+----+-----------+
| Afghanistan|2008|      3.724|
| Afghanistan|2009|      4.402|
| Afghanistan|2010|      4.758|
| Afghanistan|2011|      3.832|
| Afghanistan|2012|      3.783|
+------------+----+-----------+
only showing top 5 rows



In [15]:
# Reshape the 2021 report to the (Country name, year, Life Ladder) layout of
# the multi-year data: the year is a constant and "Ladder score" is renamed.
dfHappinessYear2021 = dfHappiness2021.select(
    col("Country name"),
    lit(2021).alias("year"),
    col("Ladder score").alias("Life Ladder"),
)
dfHappinessYear2021.show(n=5)

+------------+----+-----------+
|Country name|year|Life Ladder|
+------------+----+-----------+
|     Finland|2021|      7.842|
|     Denmark|2021|       7.62|
| Switzerland|2021|      7.571|
|     Iceland|2021|      7.554|
| Netherlands|2021|      7.464|
+------------+----+-----------+
only showing top 5 rows



In [16]:
# Stack the historical rows and the 2021 rows; union() pairs columns by position.
dfUnionHappiness = dfHappinessYears.union(dfHappinessYear2021)

# Sanity check: the union's row count should equal the sum of both inputs.
print(
    "Suma de los registros: "
    + str(dfHappinessYears.count() + dfHappinessYear2021.count())
)
print(
    "Registros de la unión: "
    + str(dfUnionHappiness.count())
)

Suma de los registros: 2098
Registros de la unión: 2098


In [17]:
# Exercise 3: the countries that topped the yearly ranking most often.
# Rank countries inside every year from highest to lowest Life Ladder.
windowByYearHappiness = Window.partitionBy("year").orderBy(desc("Life Ladder"))

# Keep each year's leader(s), count wins per country, and take the top two.
# NOTE(review): limit(2) picks arbitrarily if more than two countries tie on
# "Times first" -- consider a secondary sort key if that matters.
result3 = (
    dfUnionHappiness
    .withColumn("Rank", rank().over(windowByYearHappiness))
    .filter(col("Rank") == 1)
    .groupBy("Country name").count()
    .withColumnRenamed("count", "Times first")
    .orderBy(desc("Times first"))
    .limit(2)
)

# cache() has to be called; a bare `result3.cache` only looks the method up
# and caches nothing.
result3.cache()
result3.show()

+------------+-----------+
|Country name|Times first|
+------------+-----------+
|     Finland|          7|
|     Denmark|          7|
+------------+-----------+



Ejercicio 4

In [18]:
# Preview the multi-year report again before working on Exercise 4.
dfHappiness.limit(5).show()

+------------+----+-----------+------------------+--------------+--------------------------------+----------------------------+----------+-------------------------+---------------+---------------+
|Country name|year|Life Ladder|Log GDP per capita|Social support|Healthy life expectancy at birth|Freedom to make life choices|Generosity|Perceptions of corruption|Positive affect|Negative affect|
+------------+----+-----------+------------------+--------------+--------------------------------+----------------------------+----------+-------------------------+---------------+---------------+
| Afghanistan|2008|      3.724|              7.37|         0.451|                            50.8|                       0.718|     0.168|                    0.882|          0.518|          0.258|
| Afghanistan|2009|      4.402|              7.54|         0.552|                            51.2|                       0.679|      0.19|                     0.85|          0.584|          0.237|
| Afghanistan|2

In [19]:
# Exercise 4: happiness rank of the country with the highest GDP in 2020.
# Rank countries inside every year from highest to lowest log GDP per capita.
windowByYearGDP = Window.partitionBy("year").orderBy(desc("Log GDP per capita"))

# Both ranks are computed per year; `windowByYearHappiness` (per-year order by
# Life Ladder) comes from the Exercise 3 cell, which must have run first.
result4 = (
    dfHappiness
    .withColumn("GDP Rank", rank().over(windowByYearGDP))
    .withColumn("Life Ladder Rank", rank().over(windowByYearHappiness))
    .filter((col("year") == 2020) & (col("GDP Rank") == 1))
    .select("Country name", "Life Ladder Rank", "Log GDP per capita")
)

# cache() has to be called; a bare `result4.cache` only looks the method up
# and caches nothing.
result4.cache()
result4.show()

+------------+----------------+------------------+
|Country name|Life Ladder Rank|Log GDP per capita|
+------------+----------------+------------------+
|     Ireland|              13|            11.323|
+------------+----------------+------------------+



Ejercicio 5

In [20]:
# Display the 2021 report (show() with no arguments, so Spark's default row limit applies).
dfHappiness2021.show()

+--------------+--------------------+------------+------------------------------+------------+------------+---------------------+--------------+-----------------------+----------------------------+----------+-------------------------+------------------------+--------------------------------+----------------------------+-------------------------------------+------------------------------------------+------------------------+---------------------------------------+-------------------+
|  Country name|  Regional indicator|Ladder score|Standard error of ladder score|upperwhisker|lowerwhisker|Logged GDP per capita|Social support|Healthy life expectancy|Freedom to make life choices|Generosity|Perceptions of corruption|Ladder score in Dystopia|Explained by: Log GDP per capita|Explained by: Social support|Explained by: Healthy life expectancy|Explained by: Freedom to make life choices|Explained by: Generosity|Explained by: Perceptions of corruption|Dystopia + residual|
+--------------+--------

In [21]:
# Average log GDP per capita over the 2020 rows, as a one-row DataFrame.
# The aggregate is named directly with alias() rather than renaming the
# auto-generated "avg(...)" column afterwards.
dfAvgGDP2020 = (
    dfHappiness
    .filter(col("year") == 2020)
    .agg(avg(col("Log GDP per capita")).alias("Avg GDP 2020"))
)
dfAvgGDP2020.show()

+-----------------+
|     Avg GDP 2020|
+-----------------+
|9.751329545454546|
+-----------------+



In [22]:
# Average logged GDP per capita over the 2021 report, as a one-row DataFrame.
# The aggregate is named directly with alias() rather than renaming the
# auto-generated "avg(...)" column afterwards.
dfAvgGDP2021 = dfHappiness2021.agg(
    avg(col("Logged GDP per capita")).alias("Avg GDP 2021")
)
dfAvgGDP2021.show()

+-----------------+
|     Avg GDP 2021|
+-----------------+
|9.432208053691273|
+-----------------+



In [23]:
# Exercise 5: percentage variation of the average GDP between 2020 and 2021.
# Both inputs are one-row DataFrames, so the Cartesian product is a single row.
# crossJoin() states that intent explicitly; a join() with no condition is an
# implicit Cartesian product, which some Spark configurations reject.
#
# NOTE(review): the percentage is (2020 - 2021) / 2021, i.e. 2020 measured
# relative to 2021, so "Increased" means the 2020 average is the larger one.
# Confirm this is the direction the exercise asks for.
result5 = (
    dfAvgGDP2020.crossJoin(dfAvgGDP2021)
    .withColumn(
        "Difference percentage",
        ((col("Avg GDP 2020") - col("Avg GDP 2021")) / col("Avg GDP 2021")) * 100,
    )
    .withColumn(
        "Type of change",
        when(col("Difference percentage") > 0, "Increased")
        .when(col("Difference percentage") == 0, "Unchanged")
        .otherwise("Decreased"),
    )
)

# cache() has to be called; a bare `result5.cache` only looks the method up
# and caches nothing.
result5.cache()
result5.show()

+-----------------+-----------------+---------------------+--------------+
|     Avg GDP 2020|     Avg GDP 2021|Difference percentage|Type of change|
+-----------------+-----------------+---------------------+--------------+
|9.751329545454546|9.432208053691273|    3.383316927984697|     Increased|
+-----------------+-----------------+---------------------+--------------+



Ejercicio 6

In [24]:
# Exercise 6: build one (year, Country name, Healthy life expectancy at birth)
# table covering the multi-year report plus the 2021 report.
dfExpectancy = dfHappiness.select(
    col("year"),
    col("Country name"),
    col("Healthy life expectancy at birth"),
)

# The literal year is aliased so the 2021 frame has a real "year" column.
# Without the alias the column is named "2021" and the union only lines up
# because union() pairs columns by position.
dfExpectancy2021 = dfHappiness2021.select(
    lit(2021).alias("year"),
    col("Country name"),
    col("Healthy life expectancy").alias("Healthy life expectancy at birth"),
)

dfUnionExpectancy = dfExpectancy.union(dfExpectancy2021)
dfUnionExpectancy.show(5)

+----+------------+--------------------------------+
|year|Country name|Healthy life expectancy at birth|
+----+------------+--------------------------------+
|2008| Afghanistan|                            50.8|
|2009| Afghanistan|                            51.2|
|2010| Afghanistan|                            51.6|
|2011| Afghanistan|                           51.92|
|2012| Afghanistan|                           52.24|
+----+------------+--------------------------------+
only showing top 5 rows



In [25]:
# In the last year

# The global aggregate yields exactly one row; read its single value.
# `max` here is the Spark SQL function brought in by the wildcard import,
# not Python's builtin.
lastYear = dfUnionExpectancy.agg(max(col("year"))).first()[0]
print(lastYear)

2021


In [26]:
# Rank countries inside every year from highest to lowest healthy life expectancy.
windowByYearExpectancy = Window.partitionBy("year").orderBy(desc("Healthy life expectancy at birth"))

# Ranked table, also used by the later 2019 lookup.
dfRankedByYearExpectancy = dfUnionExpectancy.withColumn("Rank", rank().over(windowByYearExpectancy))

# Exercise 6a: the country (or tied countries) ranked first in the last year.
# Built from the ranked table instead of recomputing the same window column.
result6a = (
    dfRankedByYearExpectancy
    .filter((col("Rank") == 1) & (col("year") == lastYear))
    .drop("Rank")
)

# cache() has to be called; a bare `result6a.cache` only looks the method up
# and caches nothing.
result6a.cache()
result6a.show()

+----+------------+--------------------------------+
|year|Country name|Healthy life expectancy at birth|
+----+------------+--------------------------------+
|2021|   Singapore|                          76.953|
+----+------------+--------------------------------+



In [27]:
# In the last 5 years

# Exercise 6b: the country with the highest average healthy life expectancy
# over the five-year span [lastYear - 4, lastYear]; between() is inclusive.
# The average uses whatever rows a country has in that span, so a country
# missing a year is averaged over fewer values.
result6b = (
    dfUnionExpectancy
    .filter(col("year").between(lastYear - 4, lastYear))
    .groupBy("Country name").avg("Healthy life expectancy at birth")
    .withColumnRenamed("avg(Healthy life expectancy at birth)", "Average life expectancy")
    .orderBy(desc("Average life expectancy"))
    .limit(1)
)

# cache() has to be called; a bare `result6b.cache` only looks the method up
# and caches nothing.
result6b.cache()
result6b.show()

+------------+-----------------------+
|Country name|Average life expectancy|
+------------+-----------------------+
|   Singapore|               76.83825|
+------------+-----------------------+



In [28]:
# In 2019

# Exercise 6c: the 2019 healthy life expectancy of the country that leads in
# the last year. The leader's name (result6a) is joined against the 2019 rows.
result6c = (
    result6a.select("Country name")
    .join(dfRankedByYearExpectancy.filter(col("year") == 2019), ["Country name"])
    .drop("Rank")
    .drop("year")
)

# cache() and unpersist() are methods and must be called; the bare attribute
# references looked them up without doing anything. unpersist() is harmless
# if the ranked table was never cached.
result6c.cache()
dfRankedByYearExpectancy.unpersist()
result6c.show()

+------------+--------------------------------+
|Country name|Healthy life expectancy at birth|
+------------+--------------------------------+
|   Singapore|                            77.1|
+------------+--------------------------------+



ENTREGA ARTÍCULO MEDIUM

https://medium.com/@gonzalo.robles92/mi-experiencia-388797776763