In [None]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 317.0/317.0 MB 2.6 MB/s eta 0:00:00
  Preparing metadata (setup.py) ... done
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=583849bfe89e618c96e4615900b9b513de3c13efe10c482be49a2caf254a362c
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


In [None]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler, StringIndexer, StandardScaler
from pyspark.ml.classification import RandomForestClassifier, LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import ChiSqSelector
import matplotlib.pyplot as plt
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, StringIndexer, StandardScaler, ChiSqSelector, OneHotEncoder

# Create the SparkSession for this notebook, or reuse one that already exists
spark = (
    SparkSession.builder
    .appName("Obesity_Level_Classification")
    .getOrCreate()
)


# Read the CSV (first line is the header, column types are inferred), discard
# every row that contains a null, then cap the result at 600 rows.
df = (
    spark.read
    .csv("ObesityDataSet.csv", header=True, inferSchema=True)
    .na.drop()
    .limit(600)
)

# Collect only the NObeyesdad column to the driver as a pandas DataFrame
nobeysdad_data = df.select("NObeyesdad").toPandas()

# NObeyesdad is a categorical label, so count the rows per class and draw one
# bar per class. A histogram with a fixed number of bins places its bars in
# bins spread across the category positions, which do not line up with the
# class ticks.
class_counts = nobeysdad_data["NObeyesdad"].value_counts().sort_index()

# Plot the distribution using matplotlib
fig, ax = plt.subplots(figsize=(8, 6))
ax.bar(class_counts.index, class_counts.values, color='skyblue', alpha=0.7)
ax.set_title('Distribution of NObeyesdad Column')
ax.set_xlabel('Obesity Level')
ax.set_ylabel('Frequency')
ax.tick_params(axis='x', labelrotation=45)  # Rotate class names so they do not overlap
ax.grid(axis='y', linestyle='--', alpha=0.7)
plt.show()


# Keep only the columns used downstream: the predictor columns plus the
# NObeyesdad column, listed last.
selected_columns = [
    'Gender', 'Age', 'Height', 'Weight', 'family_history_with_overweight',
    'FAVC', 'FCVC', 'NCP', 'CAEC', 'CH2O', 'SCC', 'FAF', 'TUE', 'CALC', 'MTRANS',
    'NObeyesdad',
]
df = df.select(*selected_columns)

# Encode categorical variables.
# handleInvalid="keep" sends a category that was not present when the indexer
# was fitted to an extra index instead of raising at transform time; this
# matters here because the indexers are fitted on the training split only.
categorical_columns = ['Gender', 'family_history_with_overweight', 'MTRANS',
                       'FAVC', 'CAEC', 'SCC', 'CALC']
indexers = [
    StringIndexer(inputCol=column, outputCol=column + "_index", handleInvalid="keep")
    for column in categorical_columns
]

# Assemble features: numeric columns as-is, categorical columns via their index
feature_columns = ['Gender_index', 'Age', 'Height', 'Weight', 'family_history_with_overweight_index',
                   'FAVC_index', 'FCVC', 'NCP', 'CAEC_index', 'CH2O', 'SCC_index', 'FAF', 'TUE', 'CALC_index', 'MTRANS_index']
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")

# Scale features
scaler = StandardScaler(inputCol="features", outputCol="scaled_features")

# Encode target variable
target_indexer = StringIndexer(inputCol="NObeyesdad", outputCol="NObeyesdad_index", handleInvalid="keep")

# Select relevant features using ChiSqSelector
selector = ChiSqSelector(numTopFeatures=10, featuresCol="scaled_features", outputCol="selected_features",
                         labelCol="NObeyesdad_index")

# Split the data into training and testing sets BEFORE fitting anything.
# Every stage below learns from the data it is fitted on (category orderings,
# scaling statistics, chi-squared feature ranking); fitting on the full dataset
# would let information from the test rows leak into preprocessing and make
# the reported accuracy optimistic.
(train_raw, test_raw) = df.randomSplit([0.8, 0.2], seed=42)

# Fit all preprocessing stages on the training split only, then apply the
# fitted stages unchanged to both splits.
preprocessing = Pipeline(stages=indexers + [assembler, scaler, target_indexer, selector])
preprocessing_model = preprocessing.fit(train_raw)
training_data = preprocessing_model.transform(train_raw)
testing_data = preprocessing_model.transform(test_raw)

# Initialize machine learning models
rf = RandomForestClassifier(labelCol="NObeyesdad_index", featuresCol="selected_features")
lr = LogisticRegression(labelCol="NObeyesdad_index", featuresCol="selected_features")

# Train models
rf_model = rf.fit(training_data)
lr_model = lr.fit(training_data)


# Score both fitted models on the held-out split using plain accuracy
evaluator = MulticlassClassificationEvaluator(labelCol="NObeyesdad_index", metricName="accuracy")

# Predictions for each model on the same test rows
rf_predictions, lr_predictions = (
    fitted.transform(testing_data) for fitted in (rf_model, lr_model)
)

# Accuracy for each set of predictions
rf_accuracy, lr_accuracy = (
    evaluator.evaluate(predicted) for predicted in (rf_predictions, lr_predictions)
)

print("Random Forest Accuracy:", rf_accuracy)
print("Logistic Regression Accuracy:", lr_accuracy)


# Stop the SparkSession now that both models have been trained and evaluated
spark.stop()
