In [15]:
from pyspark.sql import SparkSession

# Create a Spark session
spark = SparkSession.builder \
    .appName("HadoopReadExample") \
    .config("spark.eventLog.enabled", "true") \
    .config("spark.eventLog.dir", "file:/home/jovyan/work/logs") \
    .getOrCreate()

# Hadoop namenode address

# hadoop_namenode_address = "172.18.0.6"
hadoop_namenode_address = "namenode.docker-hadoop-default"
csv_path = f"hdfs://{hadoop_namenode_address}:9000/user/root/student_data"
# HDFS path to your CSV file


# Read the CSV file into a PySpark DataFrame
df_from_hdfs = spark.read.csv(csv_path, header=True, inferSchema=True)

# Show the DataFrame
df_from_hdfs.show()

+--------------+----------------+-----------------+------+--------------------------+----------------------+-----------+----------------------+----------------------+-------------------+-------------------+---------+-------------------------+------+-----------------------+------+------------------+-----------------+-------------+-----------------------------------+-----------------------------------+--------------------------------------+-----------------------------------+--------------------------------+----------------------------------------------+-----------------------------------+-----------------------------------+--------------------------------------+-----------------------------------+--------------------------------+----------------------------------------------+-----------------+--------------+-----+--------+
|Marital status|Application mode|Application order|Course|Daytime/evening attendance|Previous qualification|Nacionality|Mother's qualification|Father's qualification|

In [16]:
from pyspark.sql.functions import col

# Drop rows with null or NaN values in the target column
df_from_hdfs = df_from_hdfs.dropna(subset=["Target"])

# Drop rows with null or NaN values in any feature column
for feature_column in feature_columns:
    df_from_hdfs = df_from_hdfs.dropna(subset=[feature_column])

# Debug print to check the count of the dataset after preprocessing
print("Count of rows in the dataset after preprocessing:", df_from_hdfs.count())


Count of rows in the dataset after preprocessing: 4424


In [18]:
from pyspark.ml.feature import VectorAssembler, StringIndexer
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Define feature columns (excluding the target column)
feature_columns = df_from_hdfs.columns[:-1]  # Assuming the last column is the target column
target_column = "Target"  # Replace "Target" with your actual target column name

# Assemble features into a vector
vector_assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")

# Convert string labels to numeric labels
label_indexer = StringIndexer(inputCol=target_column, outputCol="label")

# Create a Random Forest classifier
rf_classifier = RandomForestClassifier(labelCol="label", featuresCol="features", numTrees=10)

# Create a pipeline
pipeline = Pipeline(stages=[vector_assembler, label_indexer, rf_classifier])

# Split the data into training and testing sets
(training_data, testing_data) = df_from_hdfs.randomSplit([0.8, 0.2], seed=1234)

# Train the Random Forest model
model = pipeline.fit(training_data)

# Make predictions on the testing data
predictions = model.transform(testing_data)

# Evaluate the model
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)

# Print the accuracy
print("Accuracy:", accuracy)


Accuracy: 0.7617924528301887
