In [None]:
!apt-get install openjdk-8-jdk-headless -q9 > /dev/null
#Install latest Version of pyspark
!wget -q https://dlcdn.apache.org/spark/spark-3.5.0/spark-3.5.0-bin-hadoop3.tgz
!tar xf spark-3.5.0-bin-hadoop3.tgz
#Set environment variable for colab
import os
os. environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.5.0-bin-hadoop3"
!pip install -q findspark
import findspark
findspark.init()

In [None]:
pip install pyspark



In [None]:
import os
import sys

# Run both the PySpark driver and its Python worker processes with the same
# interpreter that is executing this notebook kernel.
os.environ.update(dict.fromkeys(("PYSPARK_PYTHON", "PYSPARK_DRIVER_PYTHON"), sys.executable))

If you're running this on an EMR cluster, copy the code below into a Python file on the cluster (for example, from the cluster's terminal). The only change you need to make is the `csv_file_path` variable: set it to the S3 URI of the dataset you uploaded (for example `s3://<your-bucket>/<path>.csv`). Also make sure the cluster has the permissions it needs to read from that S3 bucket. Then run the script with `spark-submit file_name.py` to see the output.

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.sql.functions import col,isnan, when, count
from pyspark.ml.feature import StringIndexer, StandardScaler
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler, StandardScaler
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, IndexToString
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml import Pipeline
from pyspark.ml.regression import GBTRegressor
from pyspark.ml.evaluation import RegressionEvaluator

csv_file_path = "/mys3bucket/location.csv"

spark = SparkSession.builder.appName("CovidAnalysis").getOrCreate()

df = spark.read.csv(csv_file_path, header=True, inferSchema=True)

columns_to_drop = ['Data As Of', 'Start Date', 'End Date', 'ICD10_codes']
df = df.drop(*columns_to_drop)

numeric_cols = ['Year', 'Month', 'COVID-19 Deaths', 'Number of Mentions']
categorical_cols = ['Group', 'State', 'Condition Group', 'Condition', 'Age Group']

# Fill missing values in numeric columns with mean
for col_name in numeric_cols:
    mean_value = df.agg({col_name: 'mean'}).collect()[0][0]
    df = df.na.fill(mean_value, [col_name])


# Fill missing values in categorical columns with mode
for col_name in categorical_cols:
    mode_value = df.groupBy(col_name).count().orderBy(col("count").desc()).first()[0]
    df = df.na.fill(mode_value, [col_name])

categorical_cols = ['Group', 'State', 'Condition Group', 'Condition', 'Age Group']

# Indexing categorical columns
indexers = [StringIndexer(inputCol=col, outputCol=col+"_index", handleInvalid="skip") for col in categorical_cols]

# Creating a Pipeline for indexing categorical columns
indexer_pipeline = Pipeline(stages=indexers)
df_indexed = indexer_pipeline.fit(df).transform(df)

# Assembling indexed categorical columns into a single vector
cat_cols_indexed = [col+"_index" for col in categorical_cols]
assembler = VectorAssembler(inputCols=cat_cols_indexed, outputCol="cat_features_raw")
df_assembled = assembler.transform(df_indexed)

# Scaling categorical features
scaler = StandardScaler(inputCol="cat_features_raw", outputCol="cat_features", withMean=True, withStd=True)
scaler_model = scaler.fit(df_assembled)
df = scaler_model.transform(df_assembled)

# Selecting only relevant columns
selected_cols = cat_cols_indexed + ["cat_features"]
df_result = df.select(*selected_cols)

# Scale numerical features
numerical_cols = ['Year', 'Month',  'Number of Mentions']
assembler = VectorAssembler(inputCols=numerical_cols, outputCol="num_features_raw")
scaler = StandardScaler(inputCol="num_features_raw", outputCol="num_features", withStd=True, withMean=True)


pipeline = Pipeline(stages=[assembler, scaler])
df = pipeline.fit(df).transform(df)

cat_features = ['cat_features']
num_features = ['num_features']

# Assembling categorical and numeric columns into a single feature vector column
assembler = VectorAssembler(inputCols=cat_features + num_features, outputCol="features")
df = assembler.transform(df)

# Selecting the relevant columns for the model
selected_cols = ['features', 'COVID-19 Deaths']
df_selected = df.select(*selected_cols)

train_data, test_data = df_selected.randomSplit([0.8, 0.2], seed=42)

gbt = GBTRegressor(labelCol='COVID-19 Deaths', featuresCol='features', maxDepth=5, maxIter=20, seed=42)

pipeline = Pipeline(stages=[gbt])

model = pipeline.fit(train_data)

Train_predictions = model.transform(train_data)

predictions = model.transform(test_data)

# Evaluate the model
evaluator = RegressionEvaluator(labelCol='COVID-19 Deaths', predictionCol='prediction', metricName='rmse')
rmse = evaluator.evaluate(predictions)
print(f'Root Mean Squared Error on Test Set: {rmse}')

predictions.select("COVID-19 Deaths", "prediction").show()

Root Mean Squared Error on Test Set: 3126.045940173496
+---------------+-------------------+
|COVID-19 Deaths|         prediction|
+---------------+-------------------+
|              0|0.04616758426014178|
|            120| 119.67256404225512|
|              0|0.04616758426014178|
|            120| 119.67256404225512|
|            120| 119.67256404225512|
|            120| 119.67256404225512|
|              0|0.04616758426014178|
|              0|0.04616758426014178|
|              0|0.04616758426014178|
|              0|0.04616758426014178|
|              0|0.04616758426014178|
|            120| 119.67256404225512|
|              0|0.04616758426014178|
|            120| 119.67256404225512|
|              0|0.04616758426014178|
|            120| 119.67256404225512|
|              0|0.04616758426014178|
|            120| 119.67256404225512|
|            120| 119.67256404225512|
|            120| 119.67256404225512|
+---------------+-------------------+
only showing top 20 rows

