In [1]:
!sudo apt update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
#Check this site for the latest download link https://www.apache.org/dyn/closer.lua/spark/spark-3.2.1/spark-3.2.1-bin-hadoop3.2.tgz
!wget -q https://dlcdn.apache.org/spark/spark-3.2.1/spark-3.2.1-bin-hadoop3.2.tgz
!tar xf spark-3.2.1-bin-hadoop3.2.tgz
!pip install -q findspark
!pip install pyspark
!pip install py4j

import os
import sys
# os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
# os.environ["SPARK_HOME"] = "/content/spark-3.2.1-bin-hadoop3.2"


import findspark
findspark.init()
findspark.find()

import pyspark

from pyspark.sql import DataFrame, SparkSession
from typing import List
import pyspark.sql.types as T
import pyspark.sql.functions as F

spark= SparkSession \
       .builder \
       .appName("Our First Spark Example") \
       .getOrCreate()

spark

[33m0% [Working][0m            Get:1 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ InRelease [3,626 B]
[33m0% [Waiting for headers] [Waiting for headers] [1 InRelease 0 B/3,626 B 0%] [Co[0m[33m0% [Waiting for headers] [Waiting for headers] [Connecting to ppa.launchpadcont[0m                                                                               Hit:2 http://archive.ubuntu.com/ubuntu jammy InRelease
[33m0% [Waiting for headers] [Waiting for headers] [Connected to ppa.launchpadconte[0m                                                                               Get:3 http://security.ubuntu.com/ubuntu jammy-security InRelease [110 kB]
Hit:4 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  InRelease
Get:5 http://archive.ubuntu.com/ubuntu jammy-updates InRelease [119 kB]
Hit:6 https://ppa.launchpadcontent.net/c2d4u.team/c2d4u4.0+/ubuntu jammy InRelease
Hit:7 https://ppa.launchpadcontent.net/deadsnakes/ppa/ubuntu jammy InRelea

In [13]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler, PolynomialExpansion, StandardScaler
from pyspark.ml.regression import LinearRegression, RandomForestRegressor, GBTRegressor
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql.functions import log

spark_session = SparkSession.builder.appName('bda').getOrCreate()

data = spark_session.read.csv("CleanData.csv", header=True, inferSchema=True)



In [14]:
data.show()

+----+---------+---------+---------+-------+----------+-------------+---------------+------------+------------+---------------+-----------+
|Year|    Total|     Male|   Female|  Ratio|Median Age|Increase Rate|Life Expectancy|   Inflation|Unemployment|            GDP|  Homicides|
+----+---------+---------+---------+-------+----------+-------------+---------------+------------+------------+---------------+-----------+
|1960|45954.226|24795.178|21159.049|117.185|    18.429|        2.485|         43.355| 6.947368421|         0.4|  3.749265015E9|3.651251851|
|1961|47060.915|25363.721|21697.194|116.899|     18.36|        2.277|          44.18| 1.640419948|         0.4|  4.118647627E9|3.651251851|
|1962|48161.841|25930.189|22231.652|116.636|    18.271|        2.347|         45.009|-0.516462234|         0.4|  4.310163797E9|3.651251851|
|1963| 49325.05|26526.519| 22798.53|116.352|    18.183|        2.425|         46.318| 1.456488448|         0.4|  4.630827383E9|3.651251851|
|1964|50552.592|2715

In [15]:
# Filter data for relevant years
data = data.filter(data['Year'] >= 1990)


In [16]:

# Prepare data for population model
data_population = data.select('Year', 'Total')
data_population = data_population.withColumnRenamed('Total', 'label')

In [17]:
assembler = VectorAssembler(inputCols=['Year'], outputCol='features')
data_population = assembler.transform(data_population)


In [29]:
data_population.show()

+----+----------+--------+
|Year|     label|features|
+----+----------+--------+
|1990|115414.069|[1990.0]|
|1991|119203.569|[1991.0]|
|1992|122375.179|[1992.0]|
|1993|125546.615|[1993.0]|
|1994|129245.139|[1994.0]|
|1995|133117.476|[1995.0]|
|1996| 137234.81|[1996.0]|
|1997|141330.267|[1997.0]|
|1998|145476.106|[1998.0]|
|1999|149694.462|[1999.0]|
|2000|154369.924|[2000.0]|
|2001|159217.727|[2001.0]|
|2002|163262.807|[2002.0]|
|2003| 166876.68|[2003.0]|
|2004| 170648.62|[2004.0]|
|2005|174372.098|[2005.0]|
|2006|178069.984|[2006.0]|
|2007|181924.521|[2007.0]|
|2008|185931.955|[2008.0]|
|2009|190123.222|[2009.0]|
+----+----------+--------+
only showing top 20 rows



In [18]:
# Split data into training and testing sets
train_population, test_population = data_population.randomSplit([0.8, 0.2], seed=42)

In [19]:
# Train Linear Regression model for population
lr = LinearRegression(featuresCol='features', labelCol='label')
population_model = lr.fit(train_population)

In [20]:
# Evaluate the model
predictions = population_model.transform(test_population)
evaluator = RegressionEvaluator(labelCol='label', predictionCol='prediction', metricName='r2')
accuracy = evaluator.evaluate(predictions)
print("Population Model Accuracy: ", round(accuracy * 100, 2), "%")

Population Model Accuracy:  99.73 %


In [36]:
# Prepare data for Unemployment model
data_unemployment = data.withColumn("label", log(data['Unemployment']))
data_unemployment = assembler.transform(data_unemployment)

In [37]:

# Split data into training and testing sets
train_unemployment, test_unemployment = data_unemployment.randomSplit([0.8, 0.2], seed=42)


In [42]:
# Polynomial features for Unemployment
train_unemployment = poly_expansion.transform(train_unemployment)
test_unemployment = poly_expansion.transform(test_unemployment)

In [45]:
# Train Linear Regression model for Unemployment
lr = LinearRegression(featuresCol='polyFeatures', labelCol='label')
unemployment_model = lr.fit(train_unemployment)


In [46]:

# Evaluate the model
predictions = unemployment_model.transform(test_unemployment)
accuracy = evaluator.evaluate(predictions)
print("Unemployment Model Accuracy: ", round(accuracy * 100, 2), "%")


Unemployment Model Accuracy:  85.04 %


In [48]:

# Predictions for a specific year
year = 2025
features_for_prediction = spark.createDataFrame([(year,)], ["Year"])
features_for_prediction = assembler.transform(features_for_prediction)

# Predict population
predicted_population = population_model.transform(features_for_prediction).select('prediction').collect()[0][0]
print(f"Predicted Population in {year}: {int(predicted_population)}")



Predicted Population in 2025: 248870


In [50]:
import numpy as np
# Predict unemployment
predicted_unemployment = unemployment_model.transform(poly_expansion.transform(features_for_prediction)).select('prediction').collect()[0][0]
predicted_unemployment = np.exp(predicted_unemployment)
print(f"Predicted Unemployment in {year}: {int(predicted_unemployment)}")

Predicted Unemployment in 2025: 21
