In [1]:
# 02 data understanding
from pyspark.sql import SparkSession
from pyspark.sql.functions import round as spark_round, col, count, when, isnan, sum as spark_sum, mean, stddev
from pyspark.sql.types import IntegerType, FloatType, DoubleType
import matplotlib.pyplot as plt


In [2]:


# Initialize Spark session
spark = SparkSession.builder.appName("DataUnderstanding").getOrCreate()

# Load datasets
poverty_data = spark.read.csv('multidimensional_poverty.csv', header=True, inferSchema=True)
#education_data = spark.read.csv('Inequality in Education.csv', header=True, inferSchema=True)
income_data = spark.read.csv('Inequality in Income.csv', header=True, inferSchema=True)
gender_ineq_data = spark.read.csv('gender_inequality.csv', header=True, inferSchema=True)

# Renaming columns to avoid issues with special characters and spaces
poverty_data = poverty_data.withColumnRenamed('Multidimensional Poverty Index (MPI, HDRO)', 'MPI_HDRO') \
    .withColumnRenamed('Population Below $1.25 per Day', 'Population_Below_1_25_per_Day') \
    .withColumnRenamed('Year and Survey', 'Year_and_Survey') \
    .withColumnRenamed('MPI HDRO Percent', 'MPI_HDRO_Percent') \
    .withColumnRenamed('Multidimensional Poverty Index (MPI, 2010)', 'MPI_2010') \
    .withColumnRenamed('MPI 2010 Percent', 'MPI_2010_Percent') \
    .withColumnRenamed('Population in Multidimensional Poverty', 'Population_in_Multidimensional_Poverty') \
    .withColumnRenamed('Intensity of Deprivation', 'Intensity_of_Deprivation') \
    .withColumnRenamed('Education Deprivation', 'Education_Deprivation') \
    .withColumnRenamed('Health Deprivation', 'Health_Deprivation') \
    .withColumnRenamed('Living Standards', 'Living_Standards') \
    .withColumnRenamed('Population Below National Poverty Line', 'Population_Below_National_Poverty_Line')

# DataFrame references
p_df = poverty_data
i_df = income_data
g_df = gender_ineq_data


In [3]:

# Show DataFrame information
# p_df.show()
p_df.printSchema()
#p_df.describe().show()
# get cols and rows
p_rows = p_df.count()
p_columns = len(p_df.columns)
# show cols and rows
print(f"poverty rows: {p_rows}, poverty cols: {p_columns}")



#i_df.show()
i_df.printSchema()
#i_df.describe().show()
# get cols and rows
i_rows = i_df.count()
i_columns = len(i_df.columns)
# show cols and rows
print(f"income rows: {i_rows},income cols: {i_columns}")

#g_df.show()
g_df.printSchema()
#g_df.describe().show()
# get cols and rows
g_rows = g_df.count()
g_columns = len(g_df.columns)
# show cols and rows
print(f"gender rows: {g_rows},gender cols: {g_columns}")


root
 |-- Country: string (nullable = true)
 |-- Year_and_Survey: string (nullable = true)
 |-- MPI_HDRO: double (nullable = true)
 |-- MPI_HDRO_Percent: double (nullable = true)
 |-- MPI_2010: double (nullable = true)
 |-- MPI_2010_Percent: double (nullable = true)
 |-- Population_in_Multidimensional_Poverty: string (nullable = true)
 |-- Intensity_of_Deprivation: double (nullable = true)
 |-- Education_Deprivation: double (nullable = true)
 |-- Health_Deprivation: double (nullable = true)
 |-- Living_Standards: double (nullable = true)
 |-- Population_Below_National_Poverty_Line: string (nullable = true)
 |-- Population_Below_1_25_per_Day: string (nullable = true)

poverty rows: 101, poverty cols: 13
root
 |-- ISO3: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Continent: string (nullable = true)
 |-- Hemisphere: string (nullable = true)
 |-- Human Development Groups: string (nullable = true)
 |-- UNDP Developing Regions: string (nullable = true)
 |-- HDI Rank 

In [None]:

# Explore Data
p_df.describe().show()
p_df.printSchema()
p_df_desc = p_df.describe()
p_df_desc = p_df_desc.select([spark_round(col(c), 2).alias(c) for c in p_df_desc.columns])
p_df_desc.show()

p_df.select('Health_Deprivation', 'Population_in_Multidimensional_Poverty','Population_Below_National_Poverty_Line', 'Education_Deprivation', 'Living_Standards').describe().show()
p_df.groupBy('Country').count().show()



In [None]:

numeric_cols = [field.name for field in p_df.schema.fields if isinstance(field.dataType, (IntegerType, FloatType, DoubleType))]
numeric_df = p_df.select(numeric_cols)

# Compute correlations
for col1 in numeric_cols:
   for col2 in numeric_cols:
        corr_val = p_df.stat.corr(col1, col2)
        print(f"Correlation between {col1} and {col2}: {corr_val:.2f}")

selected_cols = ['Health_Deprivation', 'Population_in_Multidimensional_Poverty','Population_Below_National_Poverty_Line', 'Education_Deprivation', 'Living_Standards']






pandas_df = p_df.select('Population_Below_National_Poverty_Line', 'Education_Deprivation').toPandas()


plt.figure(figsize=(10, 6))
plt.plot(pandas_df.index, pandas_df['Population_Below_National_Poverty_Line'], label='Population_Below_National_Poverty_Line')
plt.plot(pandas_df.index, pandas_df['Education_Deprivation'], label='Education_Deprivation')
plt.xlabel('Population_Below_National_Poverty_Line')
plt.ylabel('Education_Deprivation')
plt.title('Population Below National Poverty Line and Education Deprivation')
plt.legend()
plt.show()


In [None]:

#correlations income
i_df.printSchema()
selected_cols = ['HDI Rank (2021)', 'Inequality in income (2010)', 'Inequality in income (2011)', 'Inequality in income (2012)','Inequality in income (2013)','Inequality in income (2014)','Inequality in income (2015)','Inequality in income (2016)','Inequality in income (2017)','Inequality in income (2018)','Inequality in income (2019)','Inequality in income (2020)','Inequality in income (2021)']
i_df_corr = i_df.select(selected_cols)
for col1 in selected_cols:
    for col2 in selected_cols:
        corr_val = i_df_corr.stat.corr(col1, col2)
        print(f"Correlation between {col1} and {col2}: {corr_val:.2f}")




g_df=g_df.withColumn('Gender Inequality Index (GII)', col('Gender Inequality Index (GII)').cast(DoubleType()))
g_df=g_df.withColumn('Adolescent Birth Rate', col('Adolescent Birth Rate').cast(DoubleType()))
g_df=g_df.withColumn('Maternal Mortality Ratio', col('Maternal Mortality Ratio').cast(DoubleType()))
g_df=g_df.withColumn('Percent Representation in Parliament', col('Percent Representation in Parliament').cast(DoubleType()))
g_df=g_df.withColumn('Population with Secondary Education (Female)', col('Population with Secondary Education (Female)').cast(DoubleType()))
g_df=g_df.withColumn('Population with Secondary Education (Male)', col('Population with Secondary Education (Male)').cast(DoubleType()))
g_df=g_df.withColumn('Labour Force Participation Rate (Female)', col('Labour Force Participation Rate (Female)').cast(DoubleType()))
g_df=g_df.withColumn('Labour Force Participation Rate (Male)', col('Labour Force Participation Rate (Male)').cast(DoubleType()))
g_df.printSchema()



#correlations gender
selected_cols = ['Gender Inequality Index (GII)','Maternal Mortality Ratio','Adolescent Birth Rate','Percent Representation in Parliament','Population with Secondary Education (Female)','Population with Secondary Education (Male)','Labour Force Participation Rate (Female)','Labour Force Participation Rate (Male)']
g_df_corr = g_df.select(selected_cols)
for col1 in selected_cols:
    for col2 in selected_cols:
        corr_val = g_df_corr.stat.corr(col1, col2)
        print(f"Correlation between {col1} and {col2}: {corr_val:.2f}")    

In [None]:
# Add Visualizations
import seaborn as sns

# Convert to Pandas DataFrame for visualization
pandas_df = p_df.toPandas()

# Plotting
plt.figure(figsize=(10, 6))
pandas_df['MPI_HDRO'].plot.hist(title='Histogram of MPI_HDRO')
plt.show()

plt.figure(figsize=(10, 6))
pandas_df['MPI_HDRO'].plot.density(title='Density Plot of MPI_HDRO')
plt.show()

plt.figure(figsize=(10, 6))
pandas_df['MPI_HDRO'].plot.box(title='Box Plot of MPI_HDRO')
plt.show()

plt.figure(figsize=(10, 6))
pandas_df['Education_Deprivation'].value_counts().plot.bar(title='Bar Plot of Education Deprivation')
plt.show()

plt.figure(figsize=(10, 6))
pandas_df.plot.scatter(x='MPI_HDRO', y='Education_Deprivation', title='Scatter Plot of MPI_HDRO vs. Education Deprivation')
plt.show()

plt.figure(figsize=(10, 6))
pandas_df.plot.scatter(x='MPI_HDRO', y='Health_Deprivation', title='Scatter Plot of MPI_HDRO vs. Health Deprivation')
plt.show()

plt.figure(figsize=(10, 6))
pandas_df.plot.scatter(x='MPI_HDRO', y='Living_Standards', title='Scatter Plot of MPI_HDRO vs. Living Standards')
plt.show()

# Seaborn plots
plt.figure(figsize=(10, 6))
sns.histplot(pandas_df['Education_Deprivation'], kde=True).set_title('Seaborn Histogram and KDE of Education Deprivation')
plt.show()

plt.figure(figsize=(10, 6))
sns.violinplot(x='MPI_HDRO', y='Education_Deprivation', data=pandas_df).set_title('Violin Plot of MPI_HDRO vs. Education Deprivation')
plt.show()

# FacetGrid with seaborn
sns.set_theme(style="white", rc={"axes.facecolor": (0, 0, 0, 0)})
g = sns.FacetGrid(pandas_df, row='Education_Deprivation', hue='Education_Deprivation', aspect=3)
g.map_dataframe(sns.kdeplot, x='MPI_HDRO', fill=True, alpha=0.5)
g.set(yticks=[], ylabel='')
g.figure.subplots_adjust(hspace=-0.9)
plt.show()
spark.stop()

In [None]:
# Check for null values/Missing values


#poverty
missing_values = p_df.select([count(when(col(c).isNull(), c)).alias(c) for c in p_df.columns])
missing_values.show()


# income
missing_values_i = income_data.select([count(when(col(c).isNull() | isnan(col(c)), c)).alias(c) for c in income_data.columns])
missing_values_i.show()


# gender
missing_values_g = gender_ineq_data.select([count(when(col(c).isNull() | isnan(col(c)), c)).alias(c) for c in gender_ineq_data.columns])
missing_values_g.show()


#Check for any duplicate 
#poverty
p_rowsduplicate_rows = p_df.count()- p_df.dropDuplicates().count()
print(f"\n p_rowsduplicate_rows:{p_rowsduplicate_rows}" )
#income
i_rowsduplicate_rows = i_df.count()- i_df.dropDuplicates().count()
print(f"\n i_rowsduplicate_rows:{i_rowsduplicate_rows}" )
#gender
g_rowsduplicate_rows = g_df.count()- g_df.dropDuplicates().count()
print(f"\n g_rowsduplicate_rows:{g_rowsduplicate_rows}" )


In [4]:
#3.1 filtering unnecessary cols


#poverty
columns_to_keep_p = ['Country', 'Education_Deprivation', 'Health_Deprivation', 'Living_Standards', 'Population_Below_National_Poverty_Line', 'Population_Below_1_25_per_Day']
p_df_s = poverty_data.select(columns_to_keep_p)
p_df_s.write.csv('Poverty_dataset_selected.csv', header=True, mode='overwrite')


selected_columns_i = ['Country', 'Human Development Groups', 'Inequality in income (2010)', 'Inequality in income (2011)', 'Inequality in income (2012)', 'Inequality in income (2013)', 'Inequality in income (2014)', 'Inequality in income (2015)', 'Inequality in income (2016)', 'Inequality in income (2017)', 'Inequality in income (2018)', 'Inequality in income (2019)', 'Inequality in income (2020)', 'Inequality in income (2021)']
i_df_s = income_data.select(selected_columns_i)
i_df_s.write.csv('Income_dataset_selected.csv', header=True, mode='overwrite')


selected_columns_g = ['Country', 'GII Rank', 'Gender Inequality Index (GII)', 'Population with Secondary Education (Female)', 'Population with Secondary Education (Male)']
g_df_s = gender_ineq_data.select(selected_columns_g)
g_df_s.write.csv('Gender_dataset_selected.csv', header=True, mode='overwrite')



Py4JJavaError: An error occurred while calling o72.csv.
: java.lang.UnsatisfiedLinkError: 'boolean org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(java.lang.String, int)'
	at org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Native Method)
	at org.apache.hadoop.io.nativeio.NativeIO$Windows.access(NativeIO.java:793)
	at org.apache.hadoop.fs.FileUtil.canRead(FileUtil.java:1249)
	at org.apache.hadoop.fs.FileUtil.list(FileUtil.java:1454)
	at org.apache.hadoop.fs.RawLocalFileSystem.listStatus(RawLocalFileSystem.java:601)
	at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1972)
	at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:2014)
	at org.apache.hadoop.fs.ChecksumFileSystem.listStatus(ChecksumFileSystem.java:761)
	at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1972)
	at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:2014)
	at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.getAllCommittedTaskPaths(FileOutputCommitter.java:334)
	at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJobInternal(FileOutputCommitter.java:404)
	at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJob(FileOutputCommitter.java:377)
	at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.commitJob(HadoopMapReduceCommitProtocol.scala:192)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$writeAndCommit$3(FileFormatWriter.scala:275)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.util.Utils$.timeTakenMs(Utils.scala:552)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.writeAndCommit(FileFormatWriter.scala:275)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeWrite(FileFormatWriter.scala:304)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:190)
	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:190)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:113)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:111)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.executeCollect(commands.scala:125)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:107)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:107)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:461)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:461)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:437)
	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:85)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:83)
	at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:142)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:859)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:388)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:361)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:240)
	at org.apache.spark.sql.DataFrameWriter.csv(DataFrameWriter.scala:850)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:829)
