In [None]:
# Initialization: tell this kernel where Spark lives and which Python
# interpreter the driver and executors should use, then make the bundled
# pyspark/py4j archives importable.
import os
import sys

os.environ.update({
    "SPARK_HOME": "/home/talentum/spark",
    # Use /usr/bin/python2.7 for both interpreters below to run on Python 2.
    "PYSPARK_PYTHON": "/usr/bin/python3.6",
    "PYSPARK_DRIVER_PYTHON": "/usr/bin/python3.6",
})
os.environ["PYLIB"] = os.environ["SPARK_HOME"] + "/python/lib"

# Prepend both archives to the import path: pyspark.zip ends up first,
# immediately followed by py4j.
sys.path[:0] = [
    os.environ["PYLIB"] + "/pyspark.zip",
    os.environ["PYLIB"] + "/py4j-0.10.7-src.zip",
]

# Extra packages passed via --packages; list whichever ones this notebook needs.
# Single-package alternatives:
# os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.databricks:spark-xml_2.11:0.6.0 pyspark-shell'
# os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-avro_2.11:2.4.0 pyspark-shell'
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.databricks:spark-xml_2.11:0.6.0,org.apache.spark:spark-avro_2.11:2.4.3 pyspark-shell'

In [None]:
# Entry point (Spark 2.x): a Hive-enabled SparkSession for this kernel.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("WeatherPredictionPune")
    .enableHiveSupport()
    .getOrCreate()
)
# To run on YARN, add .master("yarn") to the builder chain, e.g.:
# spark = SparkSession.builder.appName("Spark SQL basic example").enableHiveSupport().master("yarn").getOrCreate()

# Lower-level RDD entry point taken from the session.
sc = spark.sparkContext

In [None]:
# Load the Pune weather CSV from the local filesystem (file:// URI, not HDFS).
# The first row holds column names, and column types are inferred from the data.
df_spark = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv("file:///home/talentum/Pune.csv")
)

# Preview the first rows.
df_spark.show()

In [None]:
import seaborn as sns
import matplotlib.pyplot as plt

# Bring the data to the driver as pandas so seaborn can plot it.
df = df_spark.toPandas()

# One histogram with a KDE overlay per feature, filled row by row on a 2x3 grid.
sns.set(style="darkgrid")
fig, axs = plt.subplots(2, 3, figsize=(10, 8))

hist_specs = [
    ("maxtempC", "red"),
    ("mintempC", "green"),
    ("pressure", "blue"),
    ("humidity", "orange"),
    ("HeatIndexC", "black"),
    ("uvIndex", "yellow"),
]
for panel, (feature_name, shade) in zip(axs.flat, hist_specs):
    sns.histplot(data=df, x=feature_name, kde=True, ax=panel, color=shade)

plt.tight_layout()  # keep neighbouring panels' labels from overlapping
plt.savefig('histogram_distribution.png')


In [None]:
# A Spark DataFrame has no .shape, so build (rows, columns) by hand.
# count() runs a Spark job over the whole dataset; columns is metadata only.
num_rows, num_columns = df_spark.count(), len(df_spark.columns)
print(f"Shape of the DataFrame: ({num_rows}, {num_columns})")

In [None]:
# (column name, type name) pairs for the DataFrame loaded with inferSchema=True.
df_spark.dtypes

In [None]:
# Quick data-quality overview: schema, column names, row count, summary
# statistics and per-column null counts.
#
# Spark's sum is used as F.sum. Importing it bare (`from ... import sum`)
# would shadow Python's builtin sum for the rest of the kernel session.
from pyspark.sql import functions as F
from pyspark.sql.functions import col

print("Schema:")
df_spark.printSchema()

print("\nColumns:")
print(df_spark.columns)

num_rows = df_spark.count()
print("\nNumber of rows:", num_rows)

print("\nBasic statistics:")
df_spark.describe().show()

# isNull() cast to int is 1 for a null cell and 0 otherwise; summing it per
# column gives that column's null count.
print("\nNull counts:")
null_counts = df_spark.select([F.sum(col(c).isNull().cast("int")).alias(c) for c in df_spark.columns])
null_counts.show()

In [None]:
# Summary statistics for every column (after the cell above, shown again for reference).
summary_stats = df_spark.describe()
summary_stats.show()

In [None]:
# Remove columns that are not used as model inputs: the timestamp, the
# sun/moon rise and set times, and FeelsLikeC.
columns_to_drop = [
    'date_time',
    'moonrise', 'moonset',
    'sunrise', 'sunset',
    'FeelsLikeC',
]
df_spark = df_spark.drop(*columns_to_drop)

# Confirm the remaining columns and their types.
df_spark.printSchema()

In [None]:
from pyspark.sql import functions as F

# F.when without .otherwise yields a value only where the cell is null, and
# F.count counts non-null values, so each aggregate is that column's null count.
null_exprs = [
    F.count(F.when(F.col(name).isNull(), name)).alias(name)
    for name in df_spark.columns
]
null_counts = df_spark.select(null_exprs)
null_counts.show()


In [None]:
# Mean-impute missing values in every numeric column, then verify no nulls
# remain in those columns.
from pyspark.sql import functions as F
from pyspark.sql.functions import mean, col

# Spark's dtype names for numeric columns. Checking only 'int' and 'double'
# silently skipped e.g. bigint or float columns.
NUMERIC_DTYPES = ('tinyint', 'smallint', 'int', 'bigint', 'float', 'double')
numerical_columns = [c for c, dtype in df_spark.dtypes if dtype in NUMERIC_DTYPES]

# One row holding the mean of each numeric column, keyed by column name.
# An empty DataFrame gives no row, so fall back to an empty dict.
mean_row = df_spark.select([mean(col(c)).alias(c) for c in numerical_columns]).first()
mean_values = mean_row.asDict() if mean_row else {}

# An all-null column has a null mean, and fillna cannot fill with None, so
# leave such columns untouched.
mean_values = {c: m for c, m in mean_values.items() if m is not None}

filled_df = df_spark.fillna(mean_values)

# Nulls remaining per column (non-numeric columns are not imputed).
null_counts = filled_df.select([F.sum(col(c).isNull().cast("int")).alias(c) for c in filled_df.columns])
null_counts.show()


In [None]:
# Separate the target column 'tempC' (Y) from all remaining feature columns (X).
Y = filled_df.select('tempC')
X = filled_df.drop('tempC')

# Preview features first, then the target.
X.show()
Y.show()

In [None]:
# 80/20 train/test split with a fixed seed so the split can be repeated.
train_df, test_df = filled_df.randomSplit([0.8, 0.2], seed=69)

# Base HDFS location for the persisted datasets (ends with a trailing slash).
namenode = "hdfs://localhost"
port = "9000"
hdfs_path = f"{namenode}:{port}/user/talentum/"

# Features / target views of each split.
X_train, Y_train = train_df.drop('tempC'), train_df.select('tempC')
X_test, Y_test = test_df.drop('tempC'), test_df.select('tempC')

# Persist every dataset as Parquet under hdfs_path, replacing earlier runs.
# NOTE(review): X_* and Y_* are written as separate datasets; reading them back
# separately may not keep rows aligned, so downstream cells use train_df/test_df.
datasets_to_save = (
    ('train_df', train_df),
    ('test_df', test_df),
    ('X_train', X_train),
    ('Y_train', Y_train),
    ('X_test', X_test),
    ('Y_test', Y_test),
)
for dataset_name, dataset in datasets_to_save:
    dataset.write.mode('overwrite').parquet(f'{hdfs_path}/{dataset_name}')


In [None]:
# HDFS location used by the modelling cells below. It is re-declared here so
# this section can run without re-running the split cell; it keeps the trailing slash.
namenode, port = "hdfs://localhost", "9000"
hdfs_path = "{}:{}/user/talentum/".format(namenode, port)

In [None]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import RegressionEvaluator
# Imported explicitly: this cell previously relied on `col` from an earlier cell.
from pyspark.sql.functions import col

# Tune a linear regression for tempC with 3-fold CV, report test RMSE and save
# the best model to HDFS.

# Join paths with exactly one separator, whether or not hdfs_path ends in "/".
hdfs_base = hdfs_path.rstrip('/')

# Load the splits written by the split cell.
train_df = spark.read.parquet(f'{hdfs_base}/train_df')
test_df = spark.read.parquet(f'{hdfs_base}/test_df')

# Every column except the target goes into the feature vector. The loop
# variable is `c` so it cannot be confused with the `col` function.
feature_cols = [c for c in train_df.columns if c != 'tempC']
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")

# (features, label) frames expected by Spark ML.
train_data = assembler.transform(train_df).select("features", col("tempC").alias("label"))
test_data = assembler.transform(test_df).select("features", col("tempC").alias("label"))

lr = LinearRegression(featuresCol="features", labelCol="label", maxIter=50)

paramGrid = (ParamGridBuilder()
             .addGrid(lr.regParam, [0.01, 0.1, 0.5])         # regularization strength
             .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])   # 0 = L2 penalty, 1 = L1 penalty
             .build())

evaluator = RegressionEvaluator(metricName="rmse", labelCol="label", predictionCol="prediction")

# Explicit seed (same as the randomSplit seed) so fold assignment is reproducible.
crossval = CrossValidator(estimator=lr,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator,
                          numFolds=3,      # 3-fold cross-validation
                          parallelism=4,   # up to 4 models fitted concurrently
                          seed=69)

cv_model = crossval.fit(train_data)
best_model = cv_model.bestModel

test_predictions = best_model.transform(test_data)
rmse = evaluator.evaluate(test_predictions)
print(f"Best Model's RMSE on test data: {rmse}")

model_path = f"{namenode}:{port}/user/talentum/lrg_best_model"

# A save failure is reported but does not stop the notebook. Later cells that
# load this path will then fail.
try:
    best_model.write().overwrite().save(model_path)
    print(f"Best model saved to: {model_path}")
except Exception as e:
    print(f"Error saving the model: {e}")


In [None]:
from pyspark.ml.evaluation import RegressionEvaluator

# Mean Absolute Error of the tuned linear model on the held-out test set.
predictions = best_model.transform(test_data)
evaluator = RegressionEvaluator(metricName="mae",
                                predictionCol="prediction",
                                labelCol="label")
mae = evaluator.evaluate(predictions)
print(f"Mean Absolute Error (MAE): {mae}")


In [None]:
from pyspark.ml.evaluation import RegressionEvaluator

# Mean Squared Error of the tuned linear model on the held-out test set.
predictions = best_model.transform(test_data)
evaluator = RegressionEvaluator(metricName="mse",
                                predictionCol="prediction",
                                labelCol="label")
mse = evaluator.evaluate(predictions)
print(f"Mean Squared Error (MSE): {mse}")


In [None]:
from pyspark.ml.evaluation import RegressionEvaluator

# R-squared of the tuned linear model on the held-out test set.
predictions = best_model.transform(test_data)
evaluator = RegressionEvaluator(metricName="r2",
                                predictionCol="prediction",
                                labelCol="label")
r2 = evaluator.evaluate(predictions)
print(f"R-squared (R²) score: {r2}")


In [None]:
# Score the test set and bring (label, prediction) pairs to the driver for plotting.
predictions = best_model.transform(test_data)

# Collect both columns in ONE Spark action. Two separate collects recompute the
# transform twice and rely on both jobs returning rows in the same order to keep
# y_true[i] paired with y_pred[i].
pairs = predictions.select("label", "prediction").collect()
y_true = [row["label"] for row in pairs]
y_pred = [row["prediction"] for row in pairs]

print(f"First few true values: {y_true[:5]}")
print(f"First few predicted values: {y_pred[:5]}")


In [None]:
import matplotlib.pyplot as plt
import numpy as np

# Actual vs. predicted values for the linear model. Points on the dashed
# y = x line are perfect predictions.
fig, ax = plt.subplots(figsize=(12, 8))  # width 12 in, height 8 in

ax.scatter(y_true, y_pred, c='blue', label='Actual vs. Predicted')

# The diagonal is the ideal-fit reference, not the model's prediction, so it
# is labelled as such. Skip it when there is nothing to span (min/max of an
# empty list would raise).
if len(y_true) > 0:
    lo, hi = min(y_true), max(y_true)
    ax.plot([lo, hi], [lo, hi], linestyle='--', color='red', label='Ideal fit (y = x)')

ax.set_xlabel('Actual Values (y_true)')
ax.set_ylabel('Predicted Values (y_pred)')
ax.set_title('Scatter Plot of Actual vs. Predicted Values')
ax.legend()

plt.show()


In [None]:
import pandas as pd
import numpy as np
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegressionModel

# Score one hand-made observation with the linear model saved to HDFS.

model_path = "hdfs://localhost:9000/user/talentum/lrg_best_model"  # Replace with your HDFS path

# One row of feature values.
# NOTE(review): the values must follow the training feature order
# (train_df.columns without 'tempC') — confirm.
input_data = np.array([[31, 17, 11, 6, 20, 20714, 18, 18, 18, 5, 1, 63, 0, 1014, 10, 82]])
n_input_features = input_data.shape[1]

model = LinearRegressionModel.load(model_path)

# Fail early with a clear message rather than an opaque JVM error at
# transform time. numFeatures is -1 when the model does not know it.
if model.numFeatures >= 0 and model.numFeatures != n_input_features:
    raise ValueError(
        f"Model expects {model.numFeatures} features but input_data has {n_input_features}"
    )

# Placeholder column names are fine: only the assembled vector reaches the model.
feature_columns = [f"feature{i+1}" for i in range(n_input_features)]
input_df = spark.createDataFrame(pd.DataFrame(input_data, columns=feature_columns))

assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
input_data_transformed = assembler.transform(input_df)

predictions = model.transform(input_data_transformed)
predictions.show()



In [None]:
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Tune a decision tree on standardized features, evaluate it, and save both the
# tree and the fitted scaler to HDFS.

# Base HDFS location. It ends with "/", and every path below is built as
# f'{hdfs_path}<name>' so none contains a doubled slash.
hdfs_path = "hdfs://localhost:9000/user/talentum/"  # Replace with your actual HDFS path

train_df = spark.read.parquet(f'{hdfs_path}train_df')
test_df = spark.read.parquet(f'{hdfs_path}test_df')

# Every column except the target goes into the feature vector. The loop
# variable is `c`, so it cannot be confused with the imported `col` function.
feature_cols = [c for c in train_df.columns if c != 'tempC']
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")

train_data_assembled = assembler.transform(train_df)
test_data_assembled = assembler.transform(test_df)

# Standardize with statistics fitted on the training split only.
scaler = StandardScaler(inputCol="features", outputCol="scaled_features", withMean=True, withStd=True)
scaler_model = scaler.fit(train_data_assembled)
train_data_scaled = scaler_model.transform(train_data_assembled)
test_data_scaled = scaler_model.transform(test_data_assembled)

# Final (features, label) frames; the scaled vector is renamed to "features".
# The Random Forest cells reuse these frames.
train_data_final = train_data_scaled.select(col("scaled_features").alias("features"), col("tempC").alias("label"))
test_data_final = test_data_scaled.select(col("scaled_features").alias("features"), col("tempC").alias("label"))

dt = DecisionTreeRegressor(featuresCol="features", labelCol="label")

paramGrid = (ParamGridBuilder()
             .addGrid(dt.maxDepth, [5, 10, 15])     # tree depths to try
             .addGrid(dt.minInfoGain, [0.0, 0.01])  # minimum information gain per split
             .build())

evaluator = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="rmse")

# Explicit seed (same as the randomSplit seed) so fold assignment is reproducible.
crossval = CrossValidator(estimator=dt,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator,
                          numFolds=3,      # 3-fold cross-validation
                          parallelism=4,   # up to 4 models fitted concurrently
                          seed=69)

cv_model = crossval.fit(train_data_final)
best_model = cv_model.bestModel

predictions = best_model.transform(test_data_final)
rmse = evaluator.evaluate(predictions)
print(f"Best Model's RMSE on test data: {rmse}")

model_path_hdfs = f'{hdfs_path}dt_model'
scaler_model_path_hdfs = f'{hdfs_path}scaler_model'

best_model.write().overwrite().save(model_path_hdfs)
print(f"Decision Tree model saved to: {model_path_hdfs}")

# The scaler is saved too: new inputs must be standardized with the same
# statistics before being passed to the tree.
scaler_model.write().overwrite().save(scaler_model_path_hdfs)
print(f"Scaler model saved to: {scaler_model_path_hdfs}")

evaluator_mae = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="mae")
mae = evaluator_mae.evaluate(predictions)
print(f"Mean Absolute Error (MAE) on test data = {mae}")

predictions.select("prediction", "label", "features").show(5)


In [None]:
# Decision Tree: Mean Squared Error on the test-set predictions from the previous cell.
evaluator_mse = RegressionEvaluator(metricName="mse",
                                    predictionCol="prediction",
                                    labelCol="label")
mse = evaluator_mse.evaluate(predictions)
print(f"Mean Squared Error (MSE) on test data = {mse}")

In [None]:
# Decision Tree: R-squared on the test-set predictions from the previous cells.
evaluator_r2 = RegressionEvaluator(metricName="r2",
                                   predictionCol="prediction",
                                   labelCol="label")
r2 = evaluator_r2.evaluate(predictions)
print(f"R² score on test data = {r2}")

In [None]:
import numpy as np  # imported here; the cell previously relied on an earlier import
from pyspark.ml.feature import VectorAssembler, StandardScalerModel
from pyspark.ml.regression import DecisionTreeRegressionModel
from pyspark.ml.linalg import Vectors
from pyspark.sql import SparkSession, Row
from pyspark.sql.functions import col

# Score one hand-made observation with the saved decision tree, applying the
# same standardization that was used in training.

hdfs_model_path = "hdfs://localhost:9000/user/talentum/dt_model"
hdfs_scaler_path = "hdfs://localhost:9000/user/talentum/scaler_model"

dt_model = DecisionTreeRegressionModel.load(hdfs_model_path)
scaler_model = StandardScalerModel.load(hdfs_scaler_path)

# Raw (unscaled) feature values.
# NOTE(review): they must follow the training feature order — confirm.
input_data = np.array([[31, 17, 11, 6, 20, 20714, 18, 18, 18, 5, 1, 63, 0, 1014, 10, 82]])
n_input_features = input_data.shape[1]

# Fail early on a width mismatch; numFeatures is -1 when unknown.
if dt_model.numFeatures >= 0 and dt_model.numFeatures != n_input_features:
    raise ValueError(
        f"Model expects {dt_model.numFeatures} features but input_data has {n_input_features}"
    )

input_df = spark.createDataFrame([Row(features=Vectors.dense(input_data[0]))])

# The scaler writes its output to "scaled_features" (its training outputCol),
# but the tree reads "features". Without this select the tree would score the
# RAW vector. Mirror training by renaming the scaled vector to "features".
input_data_scaled = (scaler_model.transform(input_df)
                     .select(col("scaled_features").alias("features")))

predictions = dt_model.transform(input_data_scaled)
predictions.select("prediction").show()




In [None]:
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Tune a Random Forest on the scaled (features, label) frames built in the
# Decision Tree cell (train_data_final / test_data_final), then save and evaluate it.

# Explicit seed (same as the randomSplit seed) so tree bootstrapping is reproducible.
rf = RandomForestRegressor(featuresCol="features", labelCol="label", seed=69)

# 3 x 3 x 3 = 27 combinations x 3 folds = 81 fits during cross-validation.
paramGrid = (ParamGridBuilder()
             .addGrid(rf.numTrees, [10, 20, 50])
             .addGrid(rf.maxDepth, [5, 10, 15])
             .addGrid(rf.minInfoGain, [0.0, 0.01, 0.1])
             .build())

evaluator = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="rmse")

crossval = CrossValidator(estimator=rf,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator,
                          numFolds=3,      # 3-fold cross-validation
                          parallelism=4,   # up to 4 models fitted concurrently
                          seed=69)         # reproducible fold assignment

cv_model = crossval.fit(train_data_final)
best_model = cv_model.bestModel

# Exactly one separator, whether or not hdfs_path ends with "/".
model_path_hdfs = f"{hdfs_path.rstrip('/')}/rf_model"
best_model.write().overwrite().save(model_path_hdfs)
print(f"Random Forest model saved to: {model_path_hdfs}")

predictions = best_model.transform(test_data_final)

# Reuse the RMSE evaluator already configured for cross-validation.
rmse = evaluator.evaluate(predictions)
print(f"Best Model's RMSE on test data: {rmse}")

evaluator_mae = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="mae")
mae = evaluator_mae.evaluate(predictions)
print(f"Mean Absolute Error (MAE) on test data = {mae}")

evaluator_r2 = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="r2")
r2 = evaluator_r2.evaluate(predictions)
print(f"R-squared (R2) on test data = {r2}")

predictions.select("prediction", "label", "features").show(5)


In [None]:
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Faster Random Forest search: a smaller grid, 2 folds, and half of the
# training data. NOTE: this saves to the same rf_model path as the previous
# cell and overwrites that model.

# One seed for every random step (sampling, bootstrapping, folds) so reruns
# give the same model and metrics; it matches the randomSplit seed.
SEED = 69

rf = RandomForestRegressor(featuresCol="features", labelCol="label", seed=SEED)

# 2 x 2 x 1 = 4 combinations.
paramGrid = (ParamGridBuilder()
             .addGrid(rf.numTrees, [10, 20])
             .addGrid(rf.maxDepth, [5, 10])
             .addGrid(rf.minInfoGain, [0.0])
             .build())

evaluator = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="rmse")

crossval = CrossValidator(estimator=rf,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator,
                          numFolds=2,      # fewer folds for speed
                          parallelism=8,   # up to 8 models fitted concurrently
                          seed=SEED)

# Train on ~50% of the training rows. Without a seed the subset, and so the
# resulting model, changed on every run.
sample_fraction = 0.5
train_data_sample = train_data_final.sample(withReplacement=False, fraction=sample_fraction, seed=SEED)

cv_model = crossval.fit(train_data_sample)
best_model = cv_model.bestModel

# Exactly one separator, whether or not hdfs_path ends with "/".
model_path_hdfs = f"{hdfs_path.rstrip('/')}/rf_model"
best_model.write().overwrite().save(model_path_hdfs)
print(f"Random Forest model saved to: {model_path_hdfs}")

# Evaluate on the full test split.
predictions = best_model.transform(test_data_final)

rmse = evaluator.evaluate(predictions)
print(f"Best Model's RMSE on test data: {rmse}")

mae = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="mae").evaluate(predictions)
r2 = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="r2").evaluate(predictions)
print(f"Mean Absolute Error (MAE) on test data = {mae}")
print(f"R-squared (R2) on test data = {r2}")

predictions.select("prediction", "label", "features").show(5)


In [None]:
import numpy as np
import pandas as pd
from sklearn.model_selection import GridSearchCV
from sklearn.preprocessing import StandardScaler
from xgboost import XGBRegressor
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
from sklearn.model_selection import train_test_split

# Baseline outside Spark: XGBoost with a small grid search on the same CSV.

data = pd.read_csv("file:///home/talentum/Pune.csv")

# Drop the same non-feature columns as the Spark pipeline.
data = data.drop(columns=['date_time', 'moonrise', 'moonset', 'sunrise', 'sunset', 'FeelsLikeC'])

X = data.drop(columns=['tempC'])  # features
y = data['tempC']                 # target

# NOTE(review): this 80/20 split (random_state=123, pandas) is not the split
# used by the Spark models, so their metrics are not directly comparable.
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=123)

# Standardize with statistics fitted on the training split only.
scaler = StandardScaler()
X_train = scaler.fit_transform(X_train)
X_test = scaler.transform(X_test)

xgb = XGBRegressor(objective='reg:squarederror', n_estimators=100)

# 2 x 2 x 2 = 8 combinations.
param_grid = {
    'max_depth': [3, 5],
    'learning_rate': [0.05, 0.1],
    'gamma': [0, 0.1]
}

grid_search = GridSearchCV(
    estimator=xgb,
    param_grid=param_grid,
    scoring='neg_mean_squared_error',
    cv=3,              # 3-fold cross-validation
    verbose=1,         # print progress
    n_jobs=-1          # use all available CPU cores
)

grid_search.fit(X_train, y_train)
best_xgb = grid_search.best_estimator_

y_pred = best_xgb.predict(X_test)

# RMSE as sqrt(MSE). The `squared=False` argument of mean_squared_error was
# deprecated in scikit-learn 1.4 and removed in 1.6.
rmse = np.sqrt(mean_squared_error(y_test, y_pred))
mae = mean_absolute_error(y_test, y_pred)
r2 = r2_score(y_test, y_pred)

print(f"Root Mean Squared Error (RMSE) on test data: {rmse}")
print(f"Mean Absolute Error (MAE) on test data: {mae}")
print(f"R-squared (R2) on test data: {r2}")

results = pd.DataFrame({'Actual': y_test, 'Predicted': y_pred})
print(results.head())
