# Iteration 4

In [1]:
# Must be included at the beginning of each new notebook. Remember to change the app name.
import findspark
findspark.init('/home/ubuntu/spark-3.2.1-bin-hadoop2.7')
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import avg, stddev, col, randn, when, lit,sum, mean,row_number, rand, monotonically_increasing_id

from pyspark.sql.window import Window
spark = SparkSession.builder.appName('Iteration4').getOrCreate()

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/05/22 13:28:12 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
# load the data
df = spark.read.csv('./suicide/suicide_rates_1990-2022.csv', inferSchema=True, header=True)

                                                                                

# 2.Data Understanding

## 2.3 Data Exploration

In [None]:
# describe data
df.printSchema()
print('Index:',df.count())
print('Data columns:',len(df.columns))

In [3]:
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd

In [None]:
# caculate the sum of SuicideCount with same AgeGroup
gdp = df.groupBy("GDPPerCapita").agg(sum("SuicideCount").alias("Total_SuicideCount"))

# trans to Pandas DataFrame
pandas_df = gdp.toPandas()
pandas_df = pandas_df.sort_values(by="GDPPerCapita")

plt.figure(figsize=(10, 6))
plt.plot(pandas_df["GDPPerCapita"], pandas_df["Total_SuicideCount"], color='skyblue')
plt.xlabel('GDPPerCapita')
plt.ylabel('Total SuicideCount')
plt.title('Total SuicideCount by GDPPerCapita')
plt.show()

In [None]:
age_group_counts = df.groupBy("AgeGroup").agg(count("*").alias("count"))

# trans to Pandas DataFrame
pandas_age_group_counts = age_group_counts.toPandas()
pandas_age_group_counts = pandas_age_group_counts.sort_values(by="AgeGroup")

# plot histogram
plt.figure(figsize=(12, 6))
plt.bar(pandas_age_group_counts["AgeGroup"], pandas_age_group_counts["count"], color='skyblue')
plt.xlabel('Age Group')
plt.ylabel('Count')
plt.title('Age Group Distribution')
plt.xticks(rotation=45)
plt.show()


In [None]:
# caculate the sum of SuicideCount with same AgeGroup
aggregated_df = df.groupBy("AgeGroup").agg(sum("SuicideCount").alias("Total_SuicideCount"))

# trans to Pandas DataFrame
pandas_df = aggregated_df.toPandas()
pandas_df = pandas_df.sort_values(by="AgeGroup")

plt.figure(figsize=(10, 6))
plt.bar(pandas_df["AgeGroup"], pandas_df["Total_SuicideCount"], color='skyblue')
plt.xlabel('AgeGroup')
plt.ylabel('Total SuicideCount')
plt.title('Total SuicideCount by AgeGroup')
plt.show()

In [None]:
# caculate the sum of SuicideCount with same AgeGroup
year = df.groupBy("Year").agg(sum("SuicideCount").alias("Total_SuicideCount"))

# trans to Pandas DataFrame
pandas_df = year.toPandas()
pandas_df = pandas_df.sort_values(by="Year")

plt.figure(figsize=(10, 6))
plt.plot(pandas_df["Year"], pandas_df["Total_SuicideCount"], color='skyblue')
plt.grid(True, which='both', linestyle='--', linewidth=0.5)
plt.xlabel('Year')
plt.ylabel('Total SuicideCount')
plt.title('Total SuicideCount by AgeGroup')
plt.show()

## 2.4 Verifying Data Quality

In [None]:
df.show(10)

In [4]:
# missing value
null_counts = df.select([sum(col(column).isNull().cast("int")).alias(column) for column in df.columns])
null_counts.show()



+----------+----------+-----------+-----------+----+---+--------+----------+------------+----------------------------+----------------+----------+----+------------+-------------------+------------+-------------+-------------------------+
|RegionCode|RegionName|CountryCode|CountryName|Year|Sex|AgeGroup|Generation|SuicideCount|CauseSpecificDeathPercentage|DeathRatePer100K|Population| GDP|GDPPerCapita|GrossNationalIncome|GNIPerCapita|InflationRate|EmploymentPopulationRatio|
+----------+----------+-----------+-----------+----+---+--------+----------+------------+----------------------------+----------------+----------+----+------------+-------------------+------------+-------------+-------------------------+
|         0|         0|          0|          0|   0|  0|       0|         0|         464|                        4289|           10664|      5920|7240|        7240|               9960|       10760|        14460|                    11120|
+----------+----------+-----------+-----------+-

                                                                                

In [None]:
round(df.dropna().count() / df.count() * 100, 2)

# 3.Data Preparation
## 3.1 Data selection

In [5]:
df_drop = df.drop('RegionCode', 'CountryCode', 'Generation', 'CauseSpecificDeathPercentage')

df_drop.show(10)

+----------+-----------+----+----+-----------+------------+----------------+----------+-------------+------------+-------------------+------------+-------------+-------------------------+
|RegionName|CountryName|Year| Sex|   AgeGroup|SuicideCount|DeathRatePer100K|Population|          GDP|GDPPerCapita|GrossNationalIncome|GNIPerCapita|InflationRate|EmploymentPopulationRatio|
+----------+-----------+----+----+-----------+------------+----------------+----------+-------------+------------+-------------------+------------+-------------+-------------------------+
|    Europe|    Albania|1992|Male| 0-14 years|           0|             0.0|   3247039|6.521749908E8| 200.8522198|      9.061842123E8|        1740|  226.0054213|                   45.315|
|    Europe|    Albania|1992|Male| 0-14 years|           0|             0.0|   3247039|6.521749908E8| 200.8522198|      9.061842123E8|        1740|  226.0054213|                   45.315|
|    Europe|    Albania|1992|Male| 0-14 years|           0| 

In [None]:
df_drop.printSchema()

## 3.2 Clean Data

### Null value

In [None]:
null_counts = df_drop.select([sum(col(column).isNull().cast("int")).alias(column) for column in df_drop.columns])
null_counts.show()

In [6]:
# fill null value for SuicideCount and Population
mean_values = df_drop.select(mean("SuicideCount").alias("mean_SuicideCount"), mean("Population").alias("mean_Population")).collect()[0]

df_drop = df_drop.na.fill({"SuicideCount": mean_values[0], "Population": mean_values[1]})

                                                                                

In [7]:
# use the normal distribution to randomly fill
# caculate mean and stand dev
stats = df_drop.select(
    [avg(c).alias(f"mean_{c}") for c in df_drop.columns] +
    [stddev(c).alias(f"stddev_{c}") for c in df_drop.columns]
).collect()[0]


mean_stddev_dict = {col: (stats[f"mean_{col}"], stats[f"stddev_{col}"]) for col in df_drop.columns}

# fill null value in each column
for col_name in df_drop.columns:
    mean = mean_stddev_dict[col_name][0]
    stddev = mean_stddev_dict[col_name][1]
    
    # normal distribution
    random_col = randn() * stddev + mean
    df_drop = df_drop.withColumn(
        col_name,
        when(col(col_name).isNull(), random_col).otherwise(col(col_name))
    )



24/05/22 13:29:19 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

In [8]:
null_counts = df_drop.select([sum(col(column).isNull().cast("int")).alias(column) for column in df_drop.columns])
null_counts.show()



+----------+-----------+----+---+--------+------------+----------------+----------+---+------------+-------------------+------------+-------------+-------------------------+
|RegionName|CountryName|Year|Sex|AgeGroup|SuicideCount|DeathRatePer100K|Population|GDP|GDPPerCapita|GrossNationalIncome|GNIPerCapita|InflationRate|EmploymentPopulationRatio|
+----------+-----------+----+---+--------+------------+----------------+----------+---+------------+-------------------+------------+-------------+-------------------------+
|         0|          0|   0|  0|       0|           0|               0|         0|  0|           0|                  0|           0|            0|                        0|
+----------+-----------+----+---+--------+------------+----------------+----------+---+------------+-------------------+------------+-------------+-------------------------+



                                                                                

### Extrem value

In [None]:
df_drop.select('GDP').describe().show()

In [None]:
df_drop.filter(df_drop['GDP']<0).count()

In [9]:
df_drop = df_drop.filter(df_drop['GDP']>=0)

# check again
df_drop.select('GDP').describe().show()

+-------+--------------------+
|summary|                 GDP|
+-------+--------------------+
|  count|              115763|
|   mean|5.278274121838045E11|
| stddev|1.646984305618207...|
|    min|        2.19762963E8|
|    max|          2.33151E13|
+-------+--------------------+



## 3.3 Constructing/Deriving a New Feature and Data Integration

In [10]:
aggregated_df = df_drop.groupBy("RegionName", "Year", "Sex", "AgeGroup", "Population", 
                           "GDP", "GDPPerCapita", "GrossNationalIncome", "GNIPerCapita", 
                           "InflationRate", "EmploymentPopulationRatio").agg(
    sum("SuicideCount").alias("SuicideCount"),
    avg("DeathRatePer100K").alias("DeathRatePer100K")
)
aggregated_df.show(10)



+--------------------+------+------+-----------+-----------+---------------+------------+-------------------+------------+-------------------+-------------------------+------------+------------------+
|          RegionName|  Year|   Sex|   AgeGroup| Population|            GDP|GDPPerCapita|GrossNationalIncome|GNIPerCapita|      InflationRate|EmploymentPopulationRatio|SuicideCount|  DeathRatePer100K|
+--------------------+------+------+-----------+-----------+---------------+------------+-------------------+------------+-------------------+-------------------------+------------+------------------+
|              Europe|2008.0|  Male|    Unknown|  2947314.0|1.2881352894E10| 4370.539716|    1.1892990139E10|      8270.0|        3.320870904|                   46.243|         0.0|17.547048197532455|
|Central and South...|1992.0|Female|25-34 years|3.3568285E7|     2.28779E11|  6815.32933|         2.03568E11|      8540.0| 100.20486338183765|                   56.867|        36.0|        3.03592

                                                                                

In [None]:
print('Index:',aggregated_df.count())
null_counts = aggregated_df.select([sum(col(column).isNull().cast("int")).alias(column) for column in aggregated_df.columns])
null_counts.show()

## 3.4.Data Integration

In [11]:
df1, df2 = aggregated_df.randomSplit([0.5, 0.5])

In [None]:
print('Index of df1:',df1.count())
print('Data columns of df1:',len(df1.columns))

print('Index of df2:',df2.count())
print('Data columns of df2:',len(df2.columns))

In [12]:
merged_df = df1.union(df2)

merged_df.describe().show()

null_counts = merged_df.select([sum(col(column).isNull().cast("int")).alias(column) for column in merged_df.columns])
null_counts.show()

                                                                                

+-------+----------+------------------+-------+----------+-------------------+--------------------+------------------+--------------------+-------------------+------------------+-------------------------+------------------+------------------+
|summary|RegionName|              Year|    Sex|  AgeGroup|         Population|                 GDP|      GDPPerCapita| GrossNationalIncome|       GNIPerCapita|     InflationRate|EmploymentPopulationRatio|      SuicideCount|  DeathRatePer100K|
+-------+----------+------------------+-------+----------+-------------------+--------------------+------------------+--------------------+-------------------+------------------+-------------------------+------------------+------------------+
|  count|     53740|             53740|  53740|     53740|              53740|               53740|             53740|               53740|              53740|             53740|                    53740|             53740|             53740|
|   mean|      null|2004.911



+----------+----+---+--------+----------+---+------------+-------------------+------------+-------------+-------------------------+------------+----------------+
|RegionName|Year|Sex|AgeGroup|Population|GDP|GDPPerCapita|GrossNationalIncome|GNIPerCapita|InflationRate|EmploymentPopulationRatio|SuicideCount|DeathRatePer100K|
+----------+----+---+--------+----------+---+------------+-------------------+------------+-------------+-------------------------+------------+----------------+
|         0|   0|  0|       0|         0|  0|           0|                  0|           0|            0|                        0|           0|               0|
+----------+----+---+--------+----------+---+------------+-------------------+------------+-------------+-------------------------+------------+----------------+





In [None]:
merged_df.show(10)

In [None]:
merged_df.printSchema()

merged_df = merged_df.withColumn("SuicideCount", col("SuicideCount").cast("int"))
# after modify
merged_df.printSchema()

In [None]:
merged_df.groupBy("Sex").count().show()

In [13]:
df_with_id = merged_df.withColumn("ID", monotonically_increasing_id())

unknown_count = df_with_id.filter(df_with_id["Sex"] == "Unknown").count()

window_spec = Window.orderBy("ID")
df_with_index = df_with_id.withColumn("index", row_number().over(window_spec))

half_count = unknown_count // 2
merged_df = df_with_index.withColumn(
    "Sex",
    when(
        (col("Sex") == "Unknown") & (col("index") <= half_count),
        "Male"
    ).when(
        (col("Sex") == "Unknown") & (col("index") > half_count),
        "Female"
    ).otherwise(col("Sex"))
).drop("ID").drop("index")


                                                                                

In [None]:
merged_df.groupBy("Sex").count().show()

In [None]:
merged_df.printSchema()

In [29]:
#####################################################################################################################################


# 4. Data Transformation


In [14]:
from pyspark.sql.functions import log1p
import seaborn as sns
import pandas as pd

In [None]:
# calculate SuicideCount correlation
colum = ['Year', 'Population', 
    'GDP', 'GDPPerCapita', 'GrossNationalIncome', 'GNIPerCapita', 
    'InflationRate', 'EmploymentPopulationRatio']

target_column = 'SuicideCount'
correlations = {column: merged_df.stat.corr(column, target_column) for column in colum}


sorted_correlations = pd.Series(correlations).sort_values(ascending=False)

# result
print(sorted_correlations)

## 4.2 Data Projection

In [None]:
population_data = merged_df.select("Population").rdd.flatMap(lambda x: x).collect()

# Population hist plot
sns.histplot(population_data, bins=10, kde=True, color='green')
plt.xlabel('Population')

In [None]:
sns.histplot(merged_df.select("GDP").rdd.flatMap(lambda x: x).collect(), bins=10, kde=True, color='green')
plt.xlabel('GDP')

In [None]:
sns.histplot(merged_df.select("SuicideCount").rdd.flatMap(lambda x: x).collect(), bins=10, kde=True, color='green')
plt.xlabel('SuicideCount')

In [None]:
sns.histplot(merged_df.select("GNIPerCapita").rdd.flatMap(lambda x: x).collect(), bins=10, kde=True, color='green')
plt.xlabel('GNIPerCapita')

In [None]:
sns.histplot(merged_df.select("GrossNationalIncome").rdd.flatMap(lambda x: x).collect(), bins=10, kde=True, color='green')
plt.xlabel('GrossNationalIncome')

In [None]:
sns.histplot(merged_df.select("EmploymentPopulationRatio").rdd.flatMap(lambda x: x).collect(), bins=10, kde=True, color='green')
plt.xlabel('EmploymentPopulationRatio')

In [None]:
sns.histplot(merged_df.select("DeathRatePer100K").rdd.flatMap(lambda x: x).collect(), bins=10, kde=True, color='green')
plt.xlabel('DeathRatePer100K')

In [None]:
sns.histplot(merged_df.select("GDPPerCapita").rdd.flatMap(lambda x: x).collect(), bins=10, kde=True, color='green')
plt.xlabel('GDPPerCapita')

In [None]:
sns.histplot(merged_df.select("InflationRate").rdd.flatMap(lambda x: x).collect(), bins=10, kde=True, color='green')
plt.xlabel('InflationRate')

In [15]:
columns_to_transform = ['Population', 'SuicideCount', 'GDP', 'DeathRatePer100K',
                        'GrossNationalIncome', 'GNIPerCapita', 'EmploymentPopulationRatio', 'GDPPerCapita','InflationRate']

for column in columns_to_transform:
    merged_df = merged_df.withColumn(column, log1p(col(column)))



In [None]:
for column in columns_to_transform:
    plt.figure()
    column_data = merged_df.select(column).rdd.flatMap(lambda x: x).collect()
    # plot histogram
    sns.histplot(column_data, bins=10, kde=True, color='green')
    plt.title(f'Histogram of {column}')
    plt.xlabel(f'log_{column}')
    plt.show()

In [None]:
merged_df.printSchema()

# 6.Data-Mining Algorithm(s) Selection
## 6.3. Build/Select Model with Algorithm/Model Parameter(s)

In [28]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import StringIndexer, OneHotEncoder
from pyspark.ml import Pipeline

In [26]:
null_counts = merged_df.select([sum(col(column).isNull().cast("int")).alias(column) for column in merged_df.columns])
null_counts.show()


24/05/22 13:47:06 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/05/22 13:47:08 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/05/22 13:47:08 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/05/22 13:47:09 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


+----------+----+---+--------+----------+---+------------+-------------------+------------+-------------+-------------------------+------------+----------------+
|RegionName|Year|Sex|AgeGroup|Population|GDP|GDPPerCapita|GrossNationalIncome|GNIPerCapita|InflationRate|EmploymentPopulationRatio|SuicideCount|DeathRatePer100K|
+----------+----+---+--------+----------+---+------------+-------------------+------------+-------------+-------------------------+------------+----------------+
|         0|   0|  0|       0|         0|  0|           0|                  0|           0|            0|                        0|           0|               0|
+----------+----+---+--------+----------+---+------------+-------------------+------------+-------------+-------------------------+------------+----------------+



                                                                                

In [24]:
merged_df = merged_df.na.drop()
null_counts = merged_df.select([sum(col(column).isNull().cast("int")).alias(column) for column in merged_df.columns])
null_counts.show()

24/05/22 13:46:57 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/05/22 13:46:58 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/05/22 13:46:58 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/05/22 13:46:59 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
                                                                                

42580

In [30]:
string_columns = ['RegionName', 'Sex', 'AgeGroup']
indexers = [StringIndexer(inputCol=col, outputCol=f"{col}_indexed") for col in string_columns]

feature_columns = ['Year', 'Population', 'GDP', 'GDPPerCapita', 'GrossNationalIncome', 
                   'GNIPerCapita', 'InflationRate', 'EmploymentPopulationRatio'] + \
                   [f"{col}_indexed" for col in string_columns]
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")

# use Pipeline
pipeline = Pipeline(stages=indexers + [assembler])
df_prepared = pipeline.fit(merged_df).transform(merged_df)

24/05/22 13:48:13 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/05/22 13:48:15 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/05/22 13:48:15 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/05/22 13:48:16 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/05/22 13:48:17 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/05/22 13:48:18 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/05/22 1

In [31]:
train_df, test_df = df_prepared.randomSplit([0.7, 0.3], seed=42)

In [32]:
from pyspark.ml.regression import RandomForestRegressor

In [33]:
rf = RandomForestRegressor(labelCol="SuicideCount", featuresCol="features", numTrees=100)

# training model
rf_model = rf.fit(train_df)


24/05/22 13:48:36 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/05/22 13:48:36 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/05/22 13:48:37 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/05/22 13:48:37 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/05/22 13:48:37 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
                                                                                

In [35]:
from pyspark.ml.evaluation import RegressionEvaluator

# predict on test set
predictions = rf_model.transform(test_df)

# evaluate
evaluator = RegressionEvaluator(labelCol="SuicideCount", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
evaluator_r2 = RegressionEvaluator(labelCol="SuicideCount", predictionCol="prediction", metricName="r2")
r2 = evaluator_r2.evaluate(predictions)
print(f"Root Mean Squared Error (RMSE) on test data = {rmse}")
print(f"R-squared on test data = {r2}")

24/05/22 13:54:25 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/05/22 13:54:25 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/05/22 13:54:27 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/05/22 13:54:27 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/05/22 13:54:27 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/05/22 13:54:29 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/05/22 1

Root Mean Squared Error (RMSE) on test data = 1.1831729125669672
R-squared on test data = 0.7306460709610217


                                                                                