# Phase I
In this phase we import the raw data and perform essential data exploration and preprocessing

Fix the cell size to maximize visable code per line

In [1]:
%%html
<style>
    .container { width:100% !important; }
</style>

Let's import the libraries we will need for this initial analysis

In [2]:
%run relevant_libraries_phase_1.ipynb

3.9.7 (default, Sep 16 2021, 16:59:28) [MSC v.1916 64 bit (AMD64)]
C:\Users\mmccoy\Anaconda3\python.exe
C:\Users\mmccoy\AppData\Local\Microsoft\WindowsApps\python.exe


In [3]:
import os
os.environ['PYSPARK_PYTHON'] = 'C:/Users/mmccoy/AppData/Local/Microsoft/WindowsApps/python.exe'
os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/Users/mmccoy/AppData/Local/Microsoft/WindowsApps/python.exe'

In [4]:
spark = SparkSession.builder.appName("World Happiness Report").getOrCreate()

## Preprocessing/Cleaning
____________________________________________________________________________________________

Import the raw data

In [5]:
df = spark.read.csv("../2022.csv", header = True, inferSchema = True)

____________________________________________________________________________________________
Check the data types to see if any adjustments need to be made.

In [6]:
#Let's take a look at the columns and their types
df.dtypes

[('RANK', 'int'),
 ('Country', 'string'),
 ('Happiness score', 'string'),
 ('Whisker-high', 'string'),
 ('Whisker-low', 'string'),
 ('Dystopia (1.83) + residual', 'string'),
 ('Explained by: GDP per capita', 'string'),
 ('Explained by: Social support', 'string'),
 ('Explained by: Healthy life expectancy', 'string'),
 ('Explained by: Freedom to make life choices', 'string'),
 ('Explained by: Generosity', 'string'),
 ('Explained by: Perceptions of corruption', 'string')]

Notice that we need to convert some string types to float types for machine learning purposes. Let's first rename our columns so they're easier to work with and we don't have to worry about special characters.

In [7]:
df = (df.withColumnRenamed("RANK", "rank")
      .withColumnRenamed("Country", "country")
      .withColumnRenamed("Happiness score", "happiness_score")
      .withColumnRenamed("Whisker-high", "whisker_high")
      .withColumnRenamed("Whisker-low", "whisker_low")
      .withColumnRenamed("Dystopia (1.83) + residual", "dystopia_183_residual")
      .withColumnRenamed("Explained by: GDP per capita", "gdp_per_capita")
      .withColumnRenamed("Explained by: Social support", "social_support")
      .withColumnRenamed("Explained by: Healthy life expectancy", "healthy_life_expectancy")
      .withColumnRenamed("Explained by: Freedom to make life choices", "freedom_to_make_life_choices")
      .withColumnRenamed("Explained by: Generosity", "generosity")
      .withColumnRenamed("Explained by: Perceptions of corruption", "perception_of_corruption")
      .withColumn('country', when(col('country') == 'Taiwan Province of China', 'Taiwan').otherwise(col('country')))
      .withColumn('country', when(col('country') == 'Hong Kong S.A.R. of China', 'Hong Kong').otherwise(col('country')))
     )

In [8]:
df.dtypes

[('rank', 'int'),
 ('country', 'string'),
 ('happiness_score', 'string'),
 ('whisker_high', 'string'),
 ('whisker_low', 'string'),
 ('dystopia_183_residual', 'string'),
 ('gdp_per_capita', 'string'),
 ('social_support', 'string'),
 ('healthy_life_expectancy', 'string'),
 ('freedom_to_make_life_choices', 'string'),
 ('generosity', 'string'),
 ('perception_of_corruption', 'string')]

In [9]:
#It looks like we need to remove commas to get our data types correct
numeric_type = DoubleType()
for col_name in df.columns:
    # Replace the "," with "."
    df = df.withColumn(col_name, regexp_replace(col_name, ",", "."))
    
for col_name in df.columns[2:]:
    df = df.withColumn(col_name, col(col_name).cast(numeric_type))

In [10]:
df.dtypes

[('rank', 'string'),
 ('country', 'string'),
 ('happiness_score', 'double'),
 ('whisker_high', 'double'),
 ('whisker_low', 'double'),
 ('dystopia_183_residual', 'double'),
 ('gdp_per_capita', 'double'),
 ('social_support', 'double'),
 ('healthy_life_expectancy', 'double'),
 ('freedom_to_make_life_choices', 'double'),
 ('generosity', 'double'),
 ('perception_of_corruption', 'double')]

____________________________________________________________________________________________
Investigate NA values relative to ``happiness_score``

In [11]:
# Count the number of NA values in each column
na_counts = df.select([sum(when(isnan(c), 1)).alias(c) for c in df.columns])

# Display the NA counts for each column
na_counts.show()

# Filter the rows where happiness_score is null and group by country
null_scores_by_country = df.filter(df.happiness_score.isNull()) \
                          .groupBy("country") \
                          .count()

# Display the countries with null happiness scores and the number of occurrences
null_scores_by_country.show()


+----+-------+---------------+------------+-----------+---------------------+--------------+--------------+-----------------------+----------------------------+----------+------------------------+
|rank|country|happiness_score|whisker_high|whisker_low|dystopia_183_residual|gdp_per_capita|social_support|healthy_life_expectancy|freedom_to_make_life_choices|generosity|perception_of_corruption|
+----+-------+---------------+------------+-----------+---------------------+--------------+--------------+-----------------------+----------------------------+----------+------------------------+
|null|   null|           null|        null|       null|                 null|          null|          null|                   null|                        null|      null|                    null|
+----+-------+---------------+------------+-----------+---------------------+--------------+--------------+-----------------------+----------------------------+----------+------------------------+

+-------+-----

Drop NA values

In [12]:
# Drop all rows with any null values
df = df.na.drop()

____________________________________________________________________________________________
Let's see if we have any duplicate rows

In [13]:
duplicate_row = df.groupBy(df.columns).count().where(col("count") > 1).show()

+----+-------+---------------+------------+-----------+---------------------+--------------+--------------+-----------------------+----------------------------+----------+------------------------+-----+
|rank|country|happiness_score|whisker_high|whisker_low|dystopia_183_residual|gdp_per_capita|social_support|healthy_life_expectancy|freedom_to_make_life_choices|generosity|perception_of_corruption|count|
+----+-------+---------------+------------+-----------+---------------------+--------------+--------------+-----------------------+----------------------------+----------+------------------------+-----+
+----+-------+---------------+------------+-----------+---------------------+--------------+--------------+-----------------------+----------------------------+----------+------------------------+-----+



We can drop the duplicates, but we don't see any for this data set. 

In [14]:
df = df.dropDuplicates()

____________________________________________________________________________________________
Let's encode the categorical variables. We will use one-hot encoding.

In [15]:
# Index the string column
indexer = StringIndexer(inputCol="country", outputCol="country_index")
indexed = indexer.fit(df).transform(df)

# One-hot encode the indexed column
encoder = OneHotEncoder(inputCols=["country_index"],
                                 outputCols=["country_onehot"])
encoded = encoder.fit(indexed).transform(indexed)

# Drop the original categorical column and the index column
encoded = encoded.drop("country", "country_index")


Next, we will standardize df. Recall that df is a Spark ``pyspark.sql.dataframe.DataFrame``. We will do this in the following steps:
1. Select the columns we want to normalize 
2. Assemble the selected columns into a vector column
3. Scale and normalize the vector column
4. Drop the original columns and keep only the normalized features

In [16]:
df.printSchema()

root
 |-- rank: string (nullable = true)
 |-- country: string (nullable = true)
 |-- happiness_score: double (nullable = true)
 |-- whisker_high: double (nullable = true)
 |-- whisker_low: double (nullable = true)
 |-- dystopia_183_residual: double (nullable = true)
 |-- gdp_per_capita: double (nullable = true)
 |-- social_support: double (nullable = true)
 |-- healthy_life_expectancy: double (nullable = true)
 |-- freedom_to_make_life_choices: double (nullable = true)
 |-- generosity: double (nullable = true)
 |-- perception_of_corruption: double (nullable = true)



In [17]:
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.sql.functions import udf, struct, col
from pyspark.sql.types import StructType, StructField, DoubleType

#select the columns to normalize
num_cols = df.columns[2:]

# vectorize the features
assembler = VectorAssembler(inputCols=num_cols, outputCol="features")
df_vector = assembler.transform(df)

# standardize the features
scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures")
scaler_model = scaler.fit(df_vector)
df_scaled = scaler_model.transform(df_vector)

# convert Vector column to StructType column
vector_to_struct = udf(lambda v: struct([float(x) for x in v]), StructType([StructField(col, DoubleType()) for col in num_cols]))
df_scaled = df_scaled.withColumn("scaledFeatures", vector_to_struct(col("scaledFeatures")))

# # select the necessary columns
# selected_cols = num_cols
# for i in range(len(selected_cols)):
#     df_scaled = df_scaled.withColumn(selected_cols[i], col(selected_cols[i]))

# # drop the original columns and the scaled features column
# df_scaled = df_scaled.drop(*num_cols).drop("features").drop("scaledFeatures")


In [18]:
new_df = df_scaled.toPandas()

PythonException: 
  An exception was thrown from the Python worker. Please see the stack trace below.
Traceback (most recent call last):
  File "C:\Users\mmccoy\Anaconda3\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\worker.py", line 540, in main
RuntimeError: Python in worker has different version 3.10 than that in driver 3.9, PySpark cannot run with different minor versions. Please check environment variables PYSPARK_PYTHON and PYSPARK_DRIVER_PYTHON are correctly set.


# Exploratory Analysis
____________________________________________________________________________________________

Let's grab some descriptive statistics for scores.

In [None]:

df.select('happiness_score').describe().show()

In [None]:
df.select(percentile_approx("happiness_score", 0.25).alias("25th_percentile"),
                                percentile_approx("happiness_score", 0.5).alias("50th_percentile"),
                                percentile_approx("happiness_score", 0.75).alias("75th_percentile")).show()

In [None]:
def percentile_val(df,col,perc):
    df = df.select(percentile_approx(col,perc).alias("percentile_val"))
    return df

In [None]:
(percentile_val(df, 'happiness_score', 0.5)).show()

In [None]:

# happiness_df = df.select('country','happiness_score').where(col('happiness_score') > 5.559).toPandas()

grouped_df = df.groupBy('country').agg(max('happiness_score').alias("desc_score"))
top_10_df = grouped_df.sort('desc_score', ascending=False).limit(10)
top_10_df = top_10_df.toPandas()


In [None]:
fig, ax = plt.subplots(figsize=(15, 6))
ax.bar(top_10_df['country'], top_10_df['desc_score'], color='b')
ax.set_title('Top 10 Countries by Happiness Score')
ax.set_xlabel('Country')
ax.set_ylabel('Happiness Score')
plt.show()

In [None]:
num_df = df.drop('rank').drop('country')

In [None]:
num_df=num_df.toPandas()

In [None]:
corr_df = num_df.corr()

In [None]:
fig, ax = plt.subplots(figsize=(15, 6))
sns.heatmap(num_df.corr())

In [None]:
corr_df_interesting = corr_df.where(abs(corr_df) > 0.7)
display(corr_df_interesting)

In [None]:
test_df = df.select('country','happiness_score').toPandas()

In [None]:
type(test_df)

In [None]:
import geopandas as gpd
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np

# Load the world map shapefile
world = gpd.read_file(gpd.datasets.get_path('naturalearth_lowres'))

# Create a list of countries
# countries = ['United States', 'Canada', 'Mexico', 'Brazil', 'Argentina']

# Generate artificial happiness scores for each country
# scores = pd.DataFrame({'Country': countries, 'Happiness Score': np.random.rand(len(countries))})

scores = test_df = df.select('country','happiness_score').toPandas()

# Merge the world map with the happiness scores data
world = world.merge(scores, left_on='name', right_on='country')

# Define the color map for the happiness scores
cmap = 'Reds'

# Plot the map with happiness scores as colors
fig, ax = plt.subplots(figsize=(15,25))
ax.set_aspect('equal')
world.plot(
    ax=ax,
    column='happiness_score',
    cmap=cmap
)
# Create a separate axis for the colorbar
cax = fig.add_axes([1, 0.35, 0.05, 0.3])
sm = plt.cm.ScalarMappable(cmap=cmap, norm=plt.Normalize(vmin=world['happiness_score'].min(), vmax=world['happiness_score'].max()))
sm._A = []
fig.colorbar(sm, cax=cax)
plt.show()


Notice this map isn't super useful. Let's take a look an alternative map. 

In [None]:
#convert country column to a list
list_ex = test_df['country'].tolist()
list_ex = [c.replace('*','') for c in list_ex]

In [None]:
# List of all countries
all_countries = list_ex

# List of countries to remove - these are countries that don't match the country dictionary
countries_to_remove = ['Kosovo', 'North Cyprus', 'Palestinian Territories','Eswatini. Kingdom of']

# Create a new list that contains only the countries that are not in the list of countries to remove
countries = [country for country in all_countries if country not in countries_to_remove]


In [None]:
from geopy.geocoders import Nominatim

# countries = list_ex
geolocator = Nominatim(user_agent="my_app",timeout = 100)

#Get the locations
locations = []
for country in countries:
    location = geolocator.geocode(country)
    code = country_name_to_country_alpha2(country, cn_name_format="default")
    cn_continent = country_alpha2_to_continent_code(code)
    if location is not None:
        locations.append((country, location.latitude, location.longitude, code, cn_continent))

In [None]:
locations

In [None]:
#Create location dataframe
locations_df = pd.DataFrame(locations, columns = ['country', 'latitude', 'longitude', 'code', 'continent'])
locations_df

In [None]:
#merge on country
new_df = pd.merge(test_df, locations_df, on ='country', how ='inner')

In [None]:
new_df

In [None]:
import folium
from folium.plugins import MarkerCluster

In [None]:
world_map = folium.Map(tile = "cartodbpositron")

In [None]:
marker_cluster = MarkerCluster().add_to(world_map)

In [None]:
#for each coordinate, create circlemarker of user percent
for i in range(len(new_df)):
        lat = new_df.iloc[i]['latitude']
        long = new_df.iloc[i]['longitude']
        radius=5
        popup_text = """Country : {}<br>
                    Happiness Score : {}<br>"""
        popup_text = popup_text.format(new_df.iloc[i]['country'],
                                   new_df.iloc[i]['happiness_score']
                                   )
        folium.CircleMarker(location = [lat, long], radius=radius, popup= popup_text, fill =True).add_to(marker_cluster)
#show the map
world_map.save('map.html')