In [5]:
import random
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType

# Initialize Spark session
spark = SparkSession.builder \
    .appName("LargeDatasetExample") \
    .getOrCreate()

# Function to generate random data
def generate_data(num_rows):
    data = []
    for _ in range(num_rows):
        age = random.randint(18, 65)
        income = random.uniform(20000, 120000)
        gender = random.choice(['M', 'F'])
        marital_status = random.choice(['Single', 'Married'])
        label = random.choice([0, 1])
        data.append((age, income, gender, marital_status, label))
    return data

# Define schema
schema = StructType([
    StructField("age", IntegerType(), True),
    StructField("income", DoubleType(), True),
    StructField("gender", StringType(), True),
    StructField("marital_status", StringType(), True),
    StructField("label", IntegerType(), True)
])

# Generate and convert data to PySpark DataFrame
num_rows = 1000
data = generate_data(num_rows)
df = spark.createDataFrame(data, schema)

# Show the first few rows of the DataFrame
df.show(5)


+---+------------------+------+--------------+-----+
|age|            income|gender|marital_status|label|
+---+------------------+------+--------------+-----+
| 51|23703.570699519765|     F|        Single|    0|
| 43|118994.01530315925|     M|       Married|    1|
| 49| 23292.26600156681|     M|       Married|    0|
| 32|22782.329451253994|     F|       Married|    0|
| 31|101042.92559140343|     M|       Married|    0|
+---+------------------+------+--------------+-----+
only showing top 5 rows



24/09/14 14:12:28 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [6]:
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml import Pipeline
from pyspark.sql.functions import col, when

# For demonstration, handling missing values (even though the data has no missing values)
# Placeholder step
from pyspark.ml.feature import Imputer
imputer = Imputer(inputCols=['age', 'income'], outputCols=['age', 'income'])
df = imputer.fit(df).transform(df)

# Convert categorical columns to numeric indices
indexers = [
    StringIndexer(inputCol="gender", outputCol="gender_index"),
    StringIndexer(inputCol="marital_status", outputCol="marital_status_index")
]
pipeline = Pipeline(stages=indexers)
df = pipeline.fit(df).transform(df)

# Drop the original categorical columns
df = df.drop("gender", "marital_status")

# Assemble features into a single feature vector
assembler = VectorAssembler(inputCols=['age', 'income', 'gender_index', 'marital_status_index'], outputCol='features')
df = assembler.transform(df)

# Show the transformed DataFrame
df.show(5)


+---+------------------+-----+------------+--------------------+--------------------+
|age|            income|label|gender_index|marital_status_index|            features|
+---+------------------+-----+------------+--------------------+--------------------+
| 51|23703.570699519765|    0|         0.0|                 1.0|[51.0,23703.57069...|
| 43|118994.01530315925|    1|         1.0|                 0.0|[43.0,118994.0153...|
| 49| 23292.26600156681|    0|         1.0|                 0.0|[49.0,23292.26600...|
| 32|22782.329451253994|    0|         0.0|                 0.0|[32.0,22782.32945...|
| 31|101042.92559140343|    0|         1.0|                 0.0|[31.0,101042.9255...|
+---+------------------+-----+------------+--------------------+--------------------+
only showing top 5 rows



In [7]:
from pyspark.ml.classification import DecisionTreeClassifier

# Split data into training and testing sets
(trainingData, testData) = df.randomSplit([0.8, 0.2], seed=1234)

# Initialize and train the Decision Tree model
dt = DecisionTreeClassifier(labelCol='label', featuresCol='features')
model = dt.fit(trainingData)

# Make predictions on the test data
predictions = model.transform(testData)

# Show predictions
predictions.select('label', 'prediction').show(10)


+-----+----------+
|label|prediction|
+-----+----------+
|    0|       0.0|
|    0|       0.0|
|    1|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       1.0|
|    0|       0.0|
|    1|       1.0|
|    0|       1.0|
|    0|       1.0|
+-----+----------+
only showing top 10 rows



In [8]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Initialize evaluators for accuracy, precision, and recall
evaluator_accuracy = MulticlassClassificationEvaluator(
    labelCol='label', predictionCol='prediction', metricName='accuracy')

evaluator_precision = MulticlassClassificationEvaluator(
    labelCol='label', predictionCol='prediction', metricName='precisionByLabel')

evaluator_recall = MulticlassClassificationEvaluator(
    labelCol='label', predictionCol='prediction', metricName='recallByLabel')

# Calculate metrics
accuracy = evaluator_accuracy.evaluate(predictions)
precision = evaluator_precision.evaluate(predictions)
recall = evaluator_recall.evaluate(predictions)

print(f"Accuracy = {accuracy}")
print(f"Precision = {precision}")
print(f"Recall = {recall}")


Accuracy = 0.4744897959183674
Precision = 0.5058823529411764
Recall = 0.41346153846153844
