## 01-BU
INFOSYS722 - Iteration 4 - BDAS
Author: S. Schmidt<br>Date: 10/05/2024<br>
Desc: Primary question: Is the UN's target date of 2030 achievable for Goal 2, Zero Hunger?<br><br>
This Spark program reads in the Global Hunger Index (GHI) values collected for each country and calculates a mean value<br>
for each collected year. <br>These yearly means are then used to predict when "Zero Hunger" will be reached and, in turn, to provide insight into whether the 2030 target is achievable.


## 02-DU
Load up libraries and retrieve datasource


In [None]:
# Environment setup: point findspark at the local Spark install, start the Spark
# session, and import the pandas / numpy / scikit-learn helpers used below.
# findspark.init(...) is called before pyspark is imported; keep this order
# (presumably pyspark is only importable once findspark has set the paths - confirm).
# NOTE(review): the Spark home below is a hard-coded absolute path and must match
# the machine the notebook runs on.
import findspark
findspark.init('/home/ubuntu/spark-3.2.1-bin-hadoop2.7')
import pyspark
from pyspark.sql import SparkSession
import sys

#from pyspark.sql.SQLContext import sqlContext
# Create (or reuse) the Spark session that the cells below refer to as `spark`.
spark = SparkSession.builder.appName('predict_un_ghi_target_date').getOrCreate()
#from pyspark.ml.regression import LinearRegression

# Import VectorAssembler and Vectors
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler, StringIndexer

#from pyspark.pandas import pypandas
import pandas as pd
# Turn off pandas' chained-assignment warning; later cells assign into Series
# taken from DataFrame columns.
pd.options.mode.chained_assignment = None
import numpy as np
# The regressions in this notebook use scikit-learn's LinearRegression.
from sklearn.linear_model import LinearRegression

# Record the pandas version in the notebook output.
print(pd.__version__)


In [None]:
# Read the Global Hunger Index CSV into a Spark DataFrame. The first row supplies
# the column names and Spark infers each column's type from the data.
data = spark.read.csv(
    "global-hunger-index.csv",
    header=True,
    inferSchema=True,
)

In [None]:
# Print the schema of the DataFrame (column names and inferred types) to get a
# view of the data source structure.
data.printSchema()

## 03-DP


In [None]:
# Show the first 10 rows of the data source.
data.head(10)

In [None]:
# Convert the Spark DataFrame to pandas and show its summary statistics.
data.toPandas().describe()

## 04-DT
Data Transform - Process data source to produce Mean values for each year.

In [None]:
# Remove the columns the analysis does not use. Not necessary, but keeps the data clean.
data = data.drop('Code', '411773-annotations')
data.head(4)

In [None]:
# Average the GHI over all rows of each year and give the result column a short name.
df_raw = (
    data.groupBy("Year")
        .mean("Global Hunger Index (2021)")
        .withColumnRenamed("avg(Global Hunger Index (2021))", "MeanGHIraw")
)
df_raw.show()


In [None]:
# Show summary statistics of the raw yearly means.
df_raw.summary().show()

# Expose the raw means to Spark SQL, round them to 4 decimal places, and register
# the rounded result as df_Table, the view queried by the cells that follow.
df_raw.createOrReplaceTempView("df_Raw_Table")
df = spark.sql("select Year, round(MeanGHIraw, 4) as MeanGHI from df_Raw_Table")
df.createOrReplaceTempView("df_Table")


## 05-DMM
Data Mining - Fit linear-regression trend lines to the yearly mean GHI values and plot the actual and predicted values.

In [None]:
## Data Graphs - Actual MeanGHI
## Plot the mean GHI of every year held in df_Table, in year order.
df1 = spark.sql("Select * from df_Table order by Year")
df1.show()

pdf1 = df1.toPandas()
pdf1.plot(
    x='Year',
    y='MeanGHI',
    kind='line',
    linestyle="solid",
    marker="o",
    color="blue",
    title="Actual MeanGHI",
    xlabel='Year',
    ylabel='MeanGHI',
)

# Keep the complete set of yearly means; later cells read it as `actual`.
actual = pdf1

In [None]:
## Data Graphs - 1. UN - Original GHI prediction Values only
## Fit a trend line to the first three yearly means (the last reading, taken after
## COVID, is left out) and plot it next to those actual values.
df1b = spark.sql("Select * from df_Table order by Year limit 3")
pdf1 = df1b.toPandas()

# One row per year, one column each: the 2-D layout LinearRegression is fitted on.
X = np.array(df1b.select("Year").collect())
Y = np.array(df1b.select("MeanGHI").collect())

reg1 = LinearRegression().fit(X, Y)
print('------------------------------------------------')
print('Model Run - 1. Original GHI prediction - Outputs')
print(f'R2 score: {reg1.score(X, Y)}')
print(f'Coefficients: {reg1.coef_}')
print(f'Intercept: {reg1.intercept_}')
print('------------------------------------------------')

# Years at which the fitted trend line is evaluated.
predict_years = [ [2000], [2006], [2012], [2021] ]
future = reg1.predict(np.array(predict_years))

# Round-trip the predictions through a Spark DataFrame and back into a pandas Series.
columns = ["Year", "MeanGHI"]
new_data = np.column_stack((predict_years, future))
new_row = spark.createDataFrame(new_data.tolist(), schema=columns)
plt_2nd = new_row.select("MeanGHI").toPandas().unstack()

# Work on a copy so that enlarging the series never touches pdf1 itself.
plt_1st = pdf1['MeanGHI'].copy()
plt_1st[3] = 0      # placeholder: no actual value is drawn for 2021
plt_2nd = plt_2nd['MeanGHI']
plt_2nd[0] = 0      # placeholders: the fitted values for 2000 and 2006 are not drawn
plt_2nd[1] = 0

df5 = pd.DataFrame({'Current': plt_1st.values,
                    'Future': plt_2nd.values},
                    index=[ 2000, 2006, 2012, 2021 ])

# Turn the 0 placeholders into missing values so they are left out of the plot.
# np.nan is used instead of None: with None, pandas releases before 1.4 ignored the
# value and forward-filled the zeros.
df5 = df5.replace(0, np.nan)

print(df5)

df5.plot(kind='line', title="1. UN - Original GHI prediction Values only", linestyle="solid", marker="o", xlabel='Year', ylabel='MeanGHI')
origProj_plt_1st = plt_1st
origProj_plt_2nd = plt_2nd

In [None]:
## Data Graphs - 2. UN - Actual v Planned Original GHI Values only (Figure 9)
## Fit a trend line to the first three yearly means (the last reading, taken after
## COVID, is left out of the fit), project it forward, and plot it against the
## actual yearly means.
df1b = spark.sql("Select * from df_Table order by Year limit 3")
pdf1 = df1b.toPandas()

# One row per year, one column each: the 2-D layout LinearRegression is fitted on.
X = np.array(df1b.select("Year").collect())
Y = np.array(df1b.select("MeanGHI").collect())

reg2 = LinearRegression().fit(X, Y)
print('------------------------------------------------')
print('Model Run - 2. GHI Original Tracked & Variation Point - Outputs')
print(f'R2 score: {reg2.score(X, Y)}')
print(f'Coefficients: {reg2.coef_}')
print(f'Intercept: {reg2.intercept_}')
print('------------------------------------------------')

# Kept adding a year on and rerunning until the value reached/past Zero.
predict_years = [ [2000], [2006], [2012], [2021], [2027], [2033], [2039] ]
future = reg2.predict(np.array(predict_years))

# Round-trip the predictions through a Spark DataFrame and back into a pandas Series.
columns = ["Year", "MeanGHI"]
new_data = np.column_stack((predict_years, future))
new_row = spark.createDataFrame(new_data.tolist(), schema=columns)
plt_2nd = new_row.select("MeanGHI").toPandas().unstack()

# Work on copies so that enlarging a series never touches pdf1 or `actual`.
plt_1st = pdf1['MeanGHI'].copy()
plt_1st[3] = 0
plt_2nd = plt_2nd['MeanGHI']
plt_2nd[0] = 0      # placeholders: the fitted values for 2000 and 2006 are not drawn
plt_2nd[1] = 0

# Pad the actual means with 0 placeholders until there is one entry per plotted year.
actual_data = actual['MeanGHI'].copy()
for pad_label in range(len(actual_data), len(predict_years)):
    actual_data.loc[pad_label] = 0

df5 = pd.DataFrame({'Original': plt_2nd.values,
                    'Actual': actual_data.values},
                    index=[ 2000, 2006, 2012, 2021, 2027, 2033, 2039 ])

# Turn the 0 placeholders into missing values so they are left out of the plot.
# np.nan is used instead of None: with None, pandas releases before 1.4 ignored the
# value and forward-filled the zeros.
df5 = df5.replace(0, np.nan)

print(df5)

df5.plot(kind='line', title="2. UN - GHI Original Tracked & Variation Point", linestyle="solid", marker="o", xlabel='Year', ylabel='MeanGHI')
origProj_plt_1st = plt_1st
origProj_plt_2nd = plt_2nd

In [None]:
## Data Graphs - 3. UN - Actual v Planned Trend line GHI Prediction (Figure ?)
## Fit a trend line to the first four yearly means (the query uses "limit 4", so
## the latest reading is part of the fit) and plot it against the actual means.
df1b = spark.sql("Select * from df_Table order by Year limit 4")
pdf1 = df1b.toPandas()

# One row per year, one column each: the 2-D layout LinearRegression is fitted on.
X = np.array(df1b.select("Year").collect())
Y = np.array(df1b.select("MeanGHI").collect())

reg3 = LinearRegression().fit(X, Y)
print('------------------------------------------------')
print('Model Run - 3. GHI Actual/Original vs Original Prediction - Outputs')
print(f'R2 score: {reg3.score(X, Y)}')
print(f'Coefficients: {reg3.coef_}')
print(f'Intercept: {reg3.intercept_}')
print('------------------------------------------------')

# Years at which the fitted trend line is evaluated.
predict_years = [ [2000], [2006], [2012], [2021] ]
future = reg3.predict(np.array(predict_years))

# Round-trip the predictions through a Spark DataFrame and back into a pandas Series.
columns = ["Year", "MeanGHI"]
new_data = np.column_stack((predict_years, future))
new_row = spark.createDataFrame(new_data.tolist(), schema=columns)
plt_2nd = new_row.select("MeanGHI").toPandas().unstack()

plt_1st = pdf1['MeanGHI']
plt_2nd = plt_2nd['MeanGHI']
actual_data = actual['MeanGHI']

df5 = pd.DataFrame({'Original': plt_2nd.values,
                    'Actual': actual_data.values},
                    index=[ 2000, 2006, 2012, 2021 ])

# Treat exact zeros as missing values so they are left out of the plot.
# np.nan is used instead of None: with None, pandas releases before 1.4 ignored the
# value and forward-filled the zeros.
df5 = df5.replace(0, np.nan)

print(df5)

df5.plot(kind='line', title="3. UN - GHI Actual/Original vs Original Prediction", linestyle="solid", marker="o", xlabel='Year', ylabel='MeanGHI')
origProj_plt_1st = plt_1st
origProj_plt_2nd = plt_2nd

In [None]:
## Data Graphs - 4. UN - Actual v Planned Trend line GHI Prediction (Figure ?)
## Fit a trend line to the first four yearly means (the query uses "limit 4", so
## the latest reading is part of the fit), project it forward, and plot it against
## the actual means.
df1b = spark.sql("Select * from df_Table order by Year limit 4")
pdf1 = df1b.toPandas()

# One row per year, one column each: the 2-D layout LinearRegression is fitted on.
X = np.array(df1b.select("Year").collect())
Y = np.array(df1b.select("MeanGHI").collect())

reg4 = LinearRegression().fit(X, Y)
print('------------------------------------------------')
print('Model Run - 4. Actual v New Prediction - Outputs')
print(f'R2 score: {reg4.score(X, Y)}')
print(f'Coefficients: {reg4.coef_}')
print(f'Intercept: {reg4.intercept_}')
print('------------------------------------------------')

# Kept adding a year on and rerunning until the value reached/past Zero.
predict_years = [ [2000], [2006], [2012], [2021], [2027], [2033], [2039], [2043], [2049], [2055], [2061] ]
future = reg4.predict(np.array(predict_years))

# Round-trip the predictions through a Spark DataFrame and back into a pandas Series.
columns = ["Year", "MeanGHI"]
new_data = np.column_stack((predict_years, future))
new_row = spark.createDataFrame(new_data.tolist(), schema=columns)
plt_2nd = new_row.select("MeanGHI").toPandas().unstack()

# Work on copies so that enlarging a series never touches pdf1 or `actual`.
plt_1st = pdf1['MeanGHI'].copy()
# Append 0 placeholders under labels 7..10 (this series is not plotted in this cell).
for pad_label in range(7, 11):
    plt_1st.loc[pad_label] = 0

plt_2nd = plt_2nd['MeanGHI']

# Pad the actual means with 0 placeholders until there is one entry per plotted year.
actual_data = actual['MeanGHI'].copy()
for pad_label in range(len(actual_data), len(predict_years)):
    actual_data.loc[pad_label] = 0

df5 = pd.DataFrame({'New Predict': plt_2nd.values,
                    'Actual': actual_data.values},
                    index=[ 2000, 2006, 2012, 2021, 2027, 2033, 2039, 2043, 2049, 2055, 2061 ])

# Turn the 0 placeholders into missing values so they are left out of the plot.
# np.nan is used instead of None: with None, pandas releases before 1.4 ignored the
# value and forward-filled the zeros.
df5 = df5.replace(0, np.nan)

print(df5)

df5.plot(kind='line', title="4. UN - Actual v New Prediction", linestyle="solid", marker="o", xlabel='Year', ylabel='MeanGHI')
origFullProj_plt_2nd = plt_2nd

In [None]:
## Data Graphs - 5. Current Future prediction
## Fit a trend line to the first four yearly means (including the reading after
## COVID) and plot the actual values followed by the projected future values.

df2 = spark.sql("Select Year, MeanGHI from df_Table order by Year limit 4")

# One row per year, one column each: the 2-D layout LinearRegression is fitted on.
X = np.array(df2.select("Year").collect())
Y = np.array(df2.select("MeanGHI").collect())

reg5 = LinearRegression().fit(X, Y)
print('------------------------------------------------')
print('Model Run - 5. Current Future prediction - Outputs')
print(f'R2 score: {reg5.score(X, Y)}')
print(f'Coefficients: {reg5.coef_}')
print(f'Intercept: {reg5.intercept_}')
print('------------------------------------------------')

# Kept adding a year on and rerunning until the value reached/past Zero.
predict_years = [ [2027], [2033], [2039], [2043], [2049], [2055], [2061] ]
future = reg5.predict(np.array(predict_years))

plotYears = X
plotMeanGHI = Y

columns = ["Year", "MeanGHI"]
plt_future = df2

# Actual values occupy positions 0..3; pad positions 4..10 with 0 placeholders.
plt_1st = plt_future.select("MeanGHI").toPandas().unstack()
for pos in range(4, 11):
    plt_1st.loc['MeanGHI', pos] = 0

# Round-trip the predictions through a Spark DataFrame and back into a pandas Series.
new_data = np.column_stack((predict_years, future))
new_row = spark.createDataFrame(new_data.tolist(), schema=columns)
plt_2nd = new_row.select("MeanGHI").toPandas().unstack()

# The seven predictions occupy positions 0..6; extend the series to 11 entries.
for pos in range(7, 11):
    plt_2nd.loc['MeanGHI', pos] = 0

# Shift the predictions four places to the right so they line up with 2027..2061.
# Walking from the end backwards means every source value is read before it is
# overwritten; positions 0..3 (the years with actual data) become 0 placeholders.
for pos in range(10, -1, -1):
    if pos < 4:
        plt_2nd.loc['MeanGHI', pos] = 0
    else:
        plt_2nd.loc['MeanGHI', pos] = plt_2nd.loc['MeanGHI', (pos - 4)]

# Cross over point: copy the first predicted value (2027) into the actual series.
plt_1st.loc['MeanGHI', 4] = plt_2nd.loc['MeanGHI', 4]

# Actual rows followed by predicted rows, as Spark and pandas frames.
plt_future = plt_future.union(new_row)
plt_df2 = plt_future.toPandas()

#  Convert to series
plt_1st = plt_1st['MeanGHI']
plt_2nd = plt_2nd['MeanGHI']

df5 = pd.DataFrame({'Actual': plt_1st.values,
                    'Future': plt_2nd.values},
                    index=[ 2000, 2006, 2012, 2021, 2027, 2033, 2039, 2043, 2049, 2055, 2061 ])

# Turn the 0 placeholders into missing values so they are left out of the plot.
# np.nan is used instead of None: with None, pandas releases before 1.4 ignored the
# value and forward-filled the zeros.
df5 = df5.replace(0, np.nan)

print(df5)

df5.plot(kind='line', title="5. Current Actual/Predicted Timeline", linestyle="solid", marker="o", xlabel='Year', ylabel='MeanGHI')
# Kept for graph 6, which plots these two series again.
curpredict_1st = plt_1st
curpredict_2nd = plt_2nd

In [None]:
## Data Graphs - 6. Original/Actual/NewPredict
## Fit the original trend line to the first three yearly means and plot it together
## with the actual and future series produced by graph 5 (curpredict_1st / curpredict_2nd).

df2 = spark.sql("Select Year, MeanGHI from df_Table order by Year limit 3")

# One row per year, one column each: the 2-D layout LinearRegression is fitted on.
X = np.array(df2.select("Year").collect())
Y = np.array(df2.select("MeanGHI").collect())

reg6 = LinearRegression().fit(X, Y)
print('------------------------------------------------')
print('Model Run - 6. Original/Actual/NewPredict - Outputs')
print(f'R2 score: {reg6.score(X, Y)}')
print(f'Coefficients: {reg6.coef_}')
print(f'Intercept: {reg6.intercept_}')
print('------------------------------------------------')

# Kept adding a year on and rerunning until the value reached/past Zero.
predict_years = [ [2012], [2021], [2027], [2033], [2039], [2043], [2049], [2055], [2061] ]
future = reg6.predict(np.array(predict_years))

plotYears = X
plotMeanGHI = Y

# Round-trip the predictions through a Spark DataFrame and back into a pandas Series.
columns = ["Year", "MeanGHI"]
new_data = np.column_stack((predict_years, future))
new_row = spark.createDataFrame(new_data.tolist(), schema=columns)
plt_1st = new_row.select("MeanGHI").toPandas().unstack()

# The predictions occupy positions 0..len(predict_years)-1; extend the series to
# 11 entries. Padding starts after the last prediction so that no predicted value
# is overwritten (starting at 7 would zero the 2055 and 2061 predictions).
for pos in range(len(predict_years), 11):
    plt_1st.loc['MeanGHI', pos] = 0

# Shift the predictions two places to the right so they line up with 2012..2061.
# Walking from the end backwards means every source value is read before it is
# overwritten. Positions 0..1 (2000 and 2006) and any prediction that is not above
# zero become 0 placeholders.
for pos in range(10, -1, -1):
    if pos >= 2 and plt_1st.loc['MeanGHI', (pos - 2)] > 0:
        plt_1st.loc['MeanGHI', pos] = plt_1st.loc['MeanGHI', (pos - 2)]
    else:
        plt_1st.loc['MeanGHI', pos] = 0

df5 = pd.DataFrame({'Actual':   curpredict_1st.values,
                    'Future':   curpredict_2nd.values,
                    'Original': plt_1st.values},
                    index=[ 2000, 2006, 2012, 2021, 2027, 2033, 2039, 2043, 2049, 2055, 2061 ])

# Turn the 0 placeholders into missing values so they are left out of the plot.
# np.nan is used instead of None: with None, pandas releases before 1.4 ignored the
# value and forward-filled the zeros.
df5 = df5.replace(0, np.nan)

print(df5)

df5.plot(kind='line', title="6. Original/Actual/NewPrediction Timelines", linestyle="solid", marker="o", xlabel='Year', ylabel='MeanGHI')

In [None]:
# Summary of all six model runs.
# Each model's R2 is scored against the rows it was fitted on: models 1, 2 and 6
# were fitted on the first three yearly means, models 3, 4 and 5 on the first four.
# Scoring every model against whatever X / Y the last executed cell left behind
# would report R2 on the wrong data for half of the models.
train_sets = {}
for n_rows in (3, 4):
    subset = spark.sql(
        "Select Year, MeanGHI from df_Table order by Year limit %d" % n_rows
    ).toPandas()
    # Column vectors of shape (n_rows, 1), the same layout the models were fitted on.
    train_sets[n_rows] = (subset[["Year"]].to_numpy(), subset[["MeanGHI"]].to_numpy())

# (report title, fitted model, number of yearly means the model was fitted on)
model_runs = [
    ('1. Original GHI prediction', reg1, 3),
    ('2. GHI Original Tracked & Variation Point', reg2, 3),
    ('3. GHI Actual/Original vs Original Prediction', reg3, 4),
    ('4. Actual v New Prediction', reg4, 4),
    ('5. Current Future prediction', reg5, 4),
    ('6. Original/Actual/NewPredict', reg6, 3),
]

for run_title, model, n_rows in model_runs:
    X_fit, Y_fit = train_sets[n_rows]
    print('------------------------------------------------')
    print(f'Model Run - {run_title} - Outputs')
    print(f'R2 score: {model.score(X_fit, Y_fit)}')
    print(f'Coefficients: {model.coef_}')
    print(f'Intercept: {model.intercept_}')
    print('------------------------------------------------')
print("Run Completed.")