In [1]:
from pymongo import MongoClient
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lag
from pyspark.sql.window import Window
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.linalg import Vectors
from pyspark.sql.functions import col

In [2]:
ss = SparkSession.builder.getOrCreate()
ss.sparkContext.setLogLevel("ERROR")

25/03/02 19:31:44 WARN Utils: Your hostname, MacBook-Pro-584.local resolves to a loopback address: 127.0.0.1; using 10.0.0.170 instead (on interface en0)
25/03/02 19:31:44 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/03/02 19:31:45 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


#### Read data from MongoDB

In [3]:
client = MongoClient("mongodb+srv://climateindicators:climatemongo@usf.6ejgq.mongodb.net/")
print(client.list_database_names())

['ClimateIndicators', 'admin', 'climate_db', 'config', 'local', 'mongodbVSCodePlaygroundDB', 'msds', 'msds697', 'test']


In [4]:
client.climate_db.list_collection_names()

['climateindicators-dds_icesheet',
 'climateindicators-dds_lpi',
 'climateindicators-dds_co2',
 'climateindicators-dds_globaltemp',
 'climateindicators-dds_sealevel']

In [5]:
db = client["climate_db"]
collection = db["climateindicators-dds_globaltemp"]
data = list(collection.find({},  {"_id": 0}))
df = ss.createDataFrame(pd.DataFrame(data))
df.show()

                                                                                

+----+------------+---------+
|year|No_Smoothing|Lowess(5)|
+----+------------+---------+
|1880|       -0.17|     -0.1|
|1881|       -0.09|    -0.13|
|1882|       -0.11|    -0.17|
|1883|       -0.17|     -0.2|
|1884|       -0.28|    -0.24|
|1885|       -0.33|    -0.26|
|1886|       -0.31|    -0.27|
|1887|       -0.36|    -0.27|
|1888|       -0.17|    -0.26|
|1889|       -0.11|    -0.26|
|1890|       -0.35|    -0.25|
|1891|       -0.22|    -0.26|
|1892|       -0.27|    -0.26|
|1893|       -0.31|    -0.26|
|1894|        -0.3|    -0.24|
|1895|       -0.23|    -0.22|
|1896|       -0.11|    -0.21|
|1897|       -0.11|    -0.18|
|1898|       -0.27|    -0.17|
|1899|       -0.18|    -0.18|
+----+------------+---------+
only showing top 20 rows



#### Feature engineering

In [6]:
window_spec = Window.orderBy("year")

df_lags = df.withColumn("lag_1_no_smooth", lag(col("No_Smoothing"), 1).over(window_spec))
df_lags = df_lags.withColumn("lag_2_no_smooth", lag(col("No_Smoothing"), 2).over(window_spec))
df_lags = df_lags.withColumn("lag_3_no_smooth", lag(col("No_Smoothing"), 3).over(window_spec))
df_lags = df_lags.withColumn("lag_4_no_smooth", lag(col("No_Smoothing"), 4).over(window_spec))
df_lags = df_lags.withColumn("lag_5_no_smooth", lag(col("No_Smoothing"), 5).over(window_spec))

df_lags = df_lags.dropna()

df_lags.show()

+----+------------+---------+---------------+---------------+---------------+---------------+---------------+
|year|No_Smoothing|Lowess(5)|lag_1_no_smooth|lag_2_no_smooth|lag_3_no_smooth|lag_4_no_smooth|lag_5_no_smooth|
+----+------------+---------+---------------+---------------+---------------+---------------+---------------+
|1885|       -0.33|    -0.26|          -0.28|          -0.17|          -0.11|          -0.09|          -0.17|
|1886|       -0.31|    -0.27|          -0.33|          -0.28|          -0.17|          -0.11|          -0.09|
|1887|       -0.36|    -0.27|          -0.31|          -0.33|          -0.28|          -0.17|          -0.11|
|1888|       -0.17|    -0.26|          -0.36|          -0.31|          -0.33|          -0.28|          -0.17|
|1889|       -0.11|    -0.26|          -0.17|          -0.36|          -0.31|          -0.33|          -0.28|
|1890|       -0.35|    -0.25|          -0.11|          -0.17|          -0.36|          -0.31|          -0.33|
|1891|    

In [7]:
features = ["lag_1_no_smooth", "lag_2_no_smooth", "lag_3_no_smooth", "lag_4_no_smooth", "lag_5_no_smooth"]

assembler_no_smooth = VectorAssembler(inputCols=features, outputCol="features")

df_feat = assembler_no_smooth.transform(df_lags).select("year", "features", col("No_Smoothing").alias("label"))

df_feat.show()

+----+--------------------+-----+
|year|            features|label|
+----+--------------------+-----+
|1885|[-0.28,-0.17,-0.1...|-0.33|
|1886|[-0.33,-0.28,-0.1...|-0.31|
|1887|[-0.31,-0.33,-0.2...|-0.36|
|1888|[-0.36,-0.31,-0.3...|-0.17|
|1889|[-0.17,-0.36,-0.3...|-0.11|
|1890|[-0.11,-0.17,-0.3...|-0.35|
|1891|[-0.35,-0.11,-0.1...|-0.22|
|1892|[-0.22,-0.35,-0.1...|-0.27|
|1893|[-0.27,-0.22,-0.3...|-0.31|
|1894|[-0.31,-0.27,-0.2...| -0.3|
|1895|[-0.3,-0.31,-0.27...|-0.23|
|1896|[-0.23,-0.3,-0.31...|-0.11|
|1897|[-0.11,-0.23,-0.3...|-0.11|
|1898|[-0.11,-0.11,-0.2...|-0.27|
|1899|[-0.27,-0.11,-0.1...|-0.18|
|1900|[-0.18,-0.27,-0.1...|-0.09|
|1901|[-0.09,-0.18,-0.2...|-0.16|
|1902|[-0.16,-0.09,-0.1...|-0.28|
|1903|[-0.28,-0.16,-0.0...|-0.37|
|1904|[-0.37,-0.28,-0.1...|-0.47|
+----+--------------------+-----+
only showing top 20 rows



#### Split into train, test set

In [8]:
df_ordered = df_feat.orderBy("year")
split_year = df_ordered.approxQuantile("year", [0.8], 0.01)[0]

train = df_feat.filter(col("year") <= split_year)
test = df_feat.filter(col("year") > split_year)

print(f"train size: {train.count()}, test size: {test.count()}")
print(f"Split year: {split_year}")

train size: 111, test size: 29
Split year: 1995.0


In [9]:
train_pd = train.select("year", "label").toPandas()
test_pd = test.select("year", "label").toPandas()
print(f"Train years: {min(train_pd['year'])} to {max(train_pd['year'])}")
print(f"Test years: {min(test_pd['year'])} to {max(test_pd['year'])}")

Train years: 1885 to 1995
Test years: 1996 to 2024


#### Fit the model

In [10]:
rf = RandomForestRegressor(featuresCol="features", 
                          labelCol="label", 
                          numTrees=100,     # Reduced from 200
                          maxDepth=5,       # Reduced from 10
                          minInstancesPerNode=5, # Increased from 2
                          maxBins=32)       # Added parameter
model = rf.fit(train)

predictions = model.transform(test)
predictions.select("year", "label", "prediction").orderBy("year").show()



+----+-----+-------------------+
|year|label|         prediction|
+----+-----+-------------------+
|1996| 0.33|0.30772384199134206|
|1997| 0.46| 0.3065821753246754|
|1998| 0.61|0.30796443722943734|
|1999| 0.38|0.30864884199134207|
|2000| 0.39| 0.3062844372294374|
|2001| 0.53| 0.3075469372294373|
|2002| 0.63| 0.3075469372294373|
|2003| 0.61| 0.3075469372294373|
|2004| 0.53| 0.3075469372294373|
|2005| 0.68| 0.3075469372294373|
|2006| 0.63| 0.3075469372294373|
|2007| 0.66| 0.3075469372294373|
|2008| 0.54| 0.3075469372294373|
|2009| 0.65| 0.3075469372294373|
|2010| 0.72| 0.3075469372294373|
|2011| 0.61| 0.3075469372294373|
|2012| 0.64| 0.3075469372294373|
|2013| 0.67| 0.3075469372294373|
|2014| 0.75| 0.3075469372294373|
|2015| 0.89| 0.3075469372294373|
+----+-----+-------------------+
only showing top 20 rows



#### Evaluate the model

In [11]:
evaluator = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print(f"Root Mean Squared Error (RMSE) on test data = {rmse}")

Root Mean Squared Error (RMSE) on test data = 0.46999072962281657


#### Forecast next 100 years

In [12]:
# train model on all available data first
model = rf.fit(df_feat)

# extract the last 5 observations
last5 = (
    df_feat.orderBy(col("year").desc())
           .limit(5)
           .orderBy(col("year").asc())
           .select("label")
           .rdd.flatMap(lambda x: x)
           .collect()
)

print("Last 5 observed values:", last5)

# forecast the next 100 time steps (years)
forecast_years = 100
forecast = []
current_lags = last5[:]  # make a copy of the last 5 values

for i in range(forecast_years):
    # DenseVector for current lag features
    features_vector = Vectors.dense(current_lags)
    
    # create a single-row df for prediction
    pred_df = ss.createDataFrame([(features_vector,)], ["features"])
    
    pred_value = model.transform(pred_df).select("prediction").collect()[0][0]
    forecast.append(pred_value)
    
    # update current_lags: drop the oldest lag and append the new prediction
    current_lags = current_lags[1:] + [pred_value]

print("Forecast for the next 100 years:")
for i, val in enumerate(forecast, 1):
    print(f"Year +{i}: {val}")

Last 5 observed values: [1.01, 0.84, 0.89, 1.17, 1.28]
Forecast for the next 100 years:
Year +1: 0.9815144533244533
Year +2: 0.9882835009435008
Year +3: 0.984066477133977
Year +4: 0.9833455247530247
Year +5: 0.9833455247530247
Year +6: 0.9833455247530247
Year +7: 0.9833455247530247
Year +8: 0.9833455247530247
Year +9: 0.9833455247530247
Year +10: 0.9833455247530247
Year +11: 0.9833455247530247
Year +12: 0.9833455247530247
Year +13: 0.9833455247530247
Year +14: 0.9833455247530247
Year +15: 0.9833455247530247
Year +16: 0.9833455247530247
Year +17: 0.9833455247530247
Year +18: 0.9833455247530247
Year +19: 0.9833455247530247
Year +20: 0.9833455247530247
Year +21: 0.9833455247530247
Year +22: 0.9833455247530247
Year +23: 0.9833455247530247
Year +24: 0.9833455247530247
Year +25: 0.9833455247530247
Year +26: 0.9833455247530247
Year +27: 0.9833455247530247
Year +28: 0.9833455247530247
Year +29: 0.9833455247530247
Year +30: 0.9833455247530247
Year +31: 0.9833455247530247
Year +32: 0.98334552475

Time series prediction in PySpark has limitations because it does not natively support models like ARIMA/SARIMA, LSTMs, or Prophet. Though achieving highly accurate forecasts may be challenging, this is a good practice to apply ML techniques in PySpark and explore feature engineering with lagged variables.

In [24]:
# ss.stop()