In [2]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.2.tar.gz (317.3 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.3/317.3 MB[0m [31m4.7 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.2-py2.py3-none-any.whl size=317812365 sha256=4a857cae66610e981f044184d8ae7f4751b43bbbce650534b6bcb6a8df705478
  Stored in directory: /root/.cache/pip/wheels/34/34/bd/03944534c44b677cd5859f248090daa9fb27b3c8f8e5f49574
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.2


In [None]:
# Gaussian Naïve Bayes Classifier dự đoán Angelina Jolie có bệnh tim không
import random
from pyspark.sql import SparkSession
from pyspark.mllib.classification import NaiveBayes
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.regression import LabeledPoint

# Initialize Spark session
spark = SparkSession.builder.appName("GNBClassifier").getOrCreate()

# Function to generate random LabeledPoint data (replace with real heart disease data)
def generate_data(num_samples=50, num_features=13):  # Adjust num_features as needed
    data = []
    for _ in range(num_samples):
        label = random.choice([0.0, 1.0])
        features = [random.uniform(1.0, 10.0) for _ in range(num_features)]  # Adjust range based on actual data
        data.append(LabeledPoint(label, Vectors.dense(features)))
    return data

# Generate data for training (replace with real data for actual use)
data = generate_data(50, num_features=13)

# Create RDD
rdd = spark.sparkContext.parallelize(data)

# Split the data into training (60%) and test (40%)
training, test = rdd.randomSplit([0.6, 0.4], seed=42)

# Train a Gaussian Naive Bayes model
model = NaiveBayes.train(training, 1.0)

# Make prediction and test accuracy
predictions_and_labels = test.map(lambda p: (model.predict(p.features), p.label))
predictions_and_labels = predictions_and_labels.collect()

# Model accuracy
correct = sum(1 for x in predictions_and_labels if x[0] == x[1])
accuracy = correct / len(predictions_and_labels)
print(f"Model accuracy: {accuracy * 100:.2f}%")

# Predict heart disease
# Data for Angelina Jolie
new_data = [
    [53, 1, 0, 140, 203, 1, 0, 155, 1, 3.1, 0, 0, 3]  # Age, sex, chest pain type, blood pressure, cholesterol, fasting blood sugar, ECG results, heart rate, exercise angina, oldpeak, ST slope, vessels colored, thalassemia
]

# Name corresponding to new data
names = ["Angelina Jolie"]

# Convert new data to RDD of Vectors (no labels needed here)
new_rdd = spark.sparkContext.parallelize(new_data).map(lambda x: Vectors.dense(x))

# Make predictions
try:
    predictions = new_rdd.map(lambda features: model.predict(features))
    predictions_result = predictions.collect()

    # Print predictions with names
    for name, features, prediction in zip(names, new_data, predictions_result):
        print(f"{name} (Features: {features}) - Prediction: {'Heart Disease' if prediction == 1.0 else 'No Heart Disease'}")
except Exception as e:
    print(f"An error occurred: {e}")

# Stop the Spark session
spark.stop()



Model accuracy: 35.29%
Angelina Jolie (Features: [53, 1, 0, 140, 203, 1, 0, 155, 1, 3.1, 0, 0, 3]) - Prediction: No Heart Disease


In [None]:
# K Nearest Neighbor Classifier dự đoán Angelina Jolie có bệnh tim không
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
import numpy as np
from pyspark.sql.types import StructType, StructField, FloatType

# Khởi tạo Spark session
spark = SparkSession.builder.appName("KNNClassifier").getOrCreate()

# Hàm sinh dữ liệu giả lập với 13 đặc trưng
def generate_data(num_samples):
    data = []
    for _ in range(num_samples):
        label = float(np.random.choice([0.0, 1.0]))  # Nhãn bệnh tim hoặc không bệnh tim
        features = [float(np.random.uniform(1.0, 20.0)) for _ in range(13)]  # 13 đặc trưng
        data.append((label, *features))
    return data

# Sinh 50 dòng dữ liệu
data = generate_data(50)

# Định nghĩa schema cho DataFrame với 13 đặc trưng
schema = StructType([
    StructField("label", FloatType(), False),
    *[StructField(f"feature{i}", FloatType(), False) for i in range(1, 14)]  # 13 đặc trưng
])

# Tạo DataFrame từ dữ liệu
df = spark.createDataFrame(data, schema)

# Chuyển đổi các đặc trưng thành vector
assembler = VectorAssembler(inputCols=[f"feature{i}" for i in range(1, 14)], outputCol="features")
df_transformed = assembler.transform(df).select("label", "features")

# Chia dữ liệu thành tập huấn luyện và tập kiểm tra
train_data, test_data = df_transformed.randomSplit([0.7, 0.3])

# Convert DataFrames to RDDs
train_rdd = train_data.rdd.map(lambda row: (row.label, np.array(row.features.toArray())))
test_rdd = test_data.rdd.map(lambda row: (row.label, np.array(row.features.toArray())))

# Collect training data to the driver
train_data_list = train_rdd.collect()

# Định nghĩa hàm phân loại bằng KNN
def knn_predict(test_point, train_data_list, k=3):
    distances = [(label, np.linalg.norm(test_point - features)) for label, features in train_data_list]
    nearest_neighbors = sorted(distances, key=lambda x: x[1])[:k]  # Lấy k hàng xóm gần nhất
    labels = [label for label, _ in nearest_neighbors]
    return max(set(labels), key=labels.count)  # Trả về nhãn phổ biến nhất

# Dự đoán nhãn cho dữ liệu kiểm tra
def predict_label(test_point):
    return knn_predict(test_point[1], train_data_list)

predictions = test_rdd.map(lambda test_point: (test_point[0], predict_label(test_point)))

# Tính toán độ chính xác
correct_predictions = predictions.filter(lambda x: x[0] == x[1]).count()
total_predictions = predictions.count()
accuracy = correct_predictions / total_predictions if total_predictions > 0 else 0

print(f"Độ chính xác của mô hình: {accuracy * 100:.2f}%")

# Dự đoán bệnh tim cho Angelina Jolie
# Giả sử Angelina Jolie có 13 đặc trưng như sau
angelina_jolie_features = np.array([53, 1, 0, 140, 203, 1, 0, 155, 1, 3.1, 0, 0, 3])

# Dự đoán bệnh tim cho Angelina Jolie
angelina_prediction = knn_predict(angelina_jolie_features, train_data_list)
print(f"Angelina Jolie - Prediction: {'Heart Disease' if angelina_prediction == 1.0 else 'No Heart Disease'}")

# Dừng Spark session
spark.stop()




Độ chính xác của mô hình: 58.33%
Angelina Jolie - Prediction: No Heart Disease


In [None]:
# Extremely Randomized Trees dự đoán Angelina Jolie có bệnh tim không
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
import numpy as np
from pyspark.sql.types import StructType, StructField, FloatType
from pyspark.ml.linalg import Vectors

# Khởi tạo Spark session
spark = SparkSession.builder.appName("ExtraTreesClassifier").getOrCreate()

# Hàm tạo dữ liệu ngẫu nhiên
def generate_data(num_samples):
    data = []
    for _ in range(num_samples):
        label = float(np.random.choice([0.0, 1.0]))  # Nhãn bệnh tim hoặc không bệnh tim
        features = [float(np.random.uniform(1.0, 20.0)) for _ in range(13)]  # 13 đặc trưng
        data.append((label, *features))
    return data

# Sinh 50 dòng dữ liệu
data = generate_data(50)

# Định nghĩa schema cho DataFrame
schema = StructType([
    StructField("label", FloatType(), False),
    StructField("feature1", FloatType(), False),
    StructField("feature2", FloatType(), False),
    StructField("feature3", FloatType(), False),
    StructField("feature4", FloatType(), False),
    StructField("feature5", FloatType(), False),
    StructField("feature6", FloatType(), False),
    StructField("feature7", FloatType(), False),
    StructField("feature8", FloatType(), False),
    StructField("feature9", FloatType(), False),
    StructField("feature10", FloatType(), False),
    StructField("feature11", FloatType(), False),
    StructField("feature12", FloatType(), False),
    StructField("feature13", FloatType(), False)
])

# Tạo DataFrame từ dữ liệu
df = spark.createDataFrame(data, schema)

# Chuyển đổi các cột đặc trưng thành một vector
assembler = VectorAssembler(
    inputCols=[f"feature{i}" for i in range(1, 14)],
    outputCol="features"
)

df = assembler.transform(df)
df = df.select("label", "features")

# Chia dữ liệu thành tập huấn luyện và tập kiểm tra
(training_data, test_data) = df.randomSplit([0.6, 0.4])

# Tạo mô hình RandomForestClassifier (Extra Trees là một biến thể của RandomForest)
rf = RandomForestClassifier(labelCol="label", featuresCol="features", numTrees=100)

# Huấn luyện mô hình
model = rf.fit(training_data)

# Dự đoán trên tập kiểm tra
predictions = model.transform(test_data)

# Đánh giá mô hình
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print(f"Model accuracy: {accuracy * 100:.2f}%")

# Dự đoán bệnh tim cho Angelina Jolie
# Cần thay đổi thông tin cụ thể của Angelina Jolie thành 13 đặc trưng tương ứng
angelina_jolie_features = np.array([53, 1, 0, 140, 203, 1, 0, 155, 1, 3.1, 0, 0, 3])

# Chuyển đổi dữ liệu của Angelina Jolie thành DataFrame (với kiểu Vector cho cột 'features')
def create_features_vector(features):
    return Vectors.dense(features)

# Chuyển dữ liệu của Angelina Jolie thành DataFrame
angelina_df = spark.createDataFrame([(create_features_vector(angelina_jolie_features.tolist()),)], ["features"])

# Dự đoán
angelina_prediction = model.transform(angelina_df).select("prediction").collect()[0][0]
print(f"Angelina Jolie - Prediction: {'Heart Disease' if angelina_prediction == 1.0 else 'No Heart Disease'}")

# Dừng Spark session
spark.stop()


Model accuracy: 60.00%
Angelina Jolie - Prediction: Heart Disease


In [3]:
# Neural Network - Multi-Layer Perceptron dự đoán Angelina Jolie có bệnh tim không
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, FloatType
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import MultilayerPerceptronClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.linalg import Vectors
import numpy as np

# Khởi tạo Spark session
spark = SparkSession.builder.appName("MLPClassifier").getOrCreate()

# Hàm tạo dữ liệu ngẫu nhiên
def generate_data(num_samples):
    data = []
    for _ in range(num_samples):
        label = float(np.random.choice([0.0, 1.0]))  # Nhãn bệnh tim hoặc không bệnh tim
        features = [float(np.random.uniform(1.0, 20.0)) for _ in range(13)]  # 13 đặc trưng
        data.append((label, *features))
    return data

# Sinh 50 dòng dữ liệu
data = generate_data(50)

# Định nghĩa schema cho DataFrame
schema = StructType([
    StructField("label", FloatType(), False),
    *[StructField(f"feature{i}", FloatType(), False) for i in range(1, 14)]
])

# Tạo DataFrame từ dữ liệu
df = spark.createDataFrame(data, schema)

# Chuyển đổi các cột đặc trưng thành một vector
assembler = VectorAssembler(
    inputCols=[f"feature{i}" for i in range(1, 14)],
    outputCol="features"
)

df = assembler.transform(df)
df = df.select("label", "features")

# Định nghĩa cấu trúc mạng nơ-ron
layers = [13, 8, 6, 2]  # 13 đặc trưng đầu vào, 8 nơ-ron lớp ẩn đầu tiên, 6 nơ-ron lớp ẩn thứ hai, 2 lớp đầu ra

# Tạo mô hình MultiLayerPerceptronClassifier
mlp = MultilayerPerceptronClassifier(
    featuresCol="features",
    labelCol="label",
    maxIter=100,
    layers=layers,
    blockSize=128,
    seed=1234
)

# Chia dữ liệu thành tập huấn luyện và tập kiểm tra
(training_data, test_data) = df.randomSplit([0.7, 0.3])

# Huấn luyện mô hình
model = mlp.fit(training_data)

# Dự đoán trên tập kiểm tra
predictions = model.transform(test_data)

# Đánh giá mô hình
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print(f"Model accuracy: {accuracy * 100:.2f}%")

# Dự đoán bệnh tim cho Angelina Jolie
# Cần thay đổi thông tin cụ thể của Angelina Jolie thành 13 đặc trưng tương ứng
angelina_jolie_features = np.array([53, 1, 0, 140, 203, 1, 0, 155, 1, 3.1, 0, 0, 3])

# Chuyển đổi dữ liệu của Angelina Jolie thành DataFrame (với kiểu Vector cho cột 'features')
def create_features_vector(features):
    return Vectors.dense(features)

# Chuyển dữ liệu của Angelina Jolie thành DataFrame
angelina_df = spark.createDataFrame([(create_features_vector(angelina_jolie_features.tolist()),)], ["features"])

# Dự đoán
angelina_prediction = model.transform(angelina_df).select("prediction").collect()[0][0]
print(f"Angelina Jolie - Prediction: {'Heart Disease' if angelina_prediction == 1.0 else 'No Heart Disease'}")

# Dừng Spark session
spark.stop()


Model accuracy: 52.17%
Angelina Jolie - Prediction: No Heart Disease


In [7]:
# Decision Tree Classifier dự đoán bệnh tim cho Angelina Jolie
import numpy as np
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, FloatType
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Khởi tạo Spark session
spark = SparkSession.builder.appName("DecisionTreeClassifier").getOrCreate()

# Hàm tạo dữ liệu ngẫu nhiên với 13 đặc trưng
def generate_data(num_samples):
    data = []
    for _ in range(num_samples):
        label = float(np.random.choice([0.0, 1.0]))  # Chuyển đổi thành float
        features = [float(np.random.uniform(1.0, 20.0)) for _ in range(13)]
        data.append((label,) + tuple(features))
    return data

# Sinh 50 dòng dữ liệu
data = generate_data(50)

# Định nghĩa schema cho DataFrame
schema = StructType([
    StructField("label", FloatType(), False),
    *[StructField(f"feature{i}", FloatType(), False) for i in range(1, 14)]  # 13 đặc trưng
])

# Tạo DataFrame từ dữ liệu
df = spark.createDataFrame(data, schema)

# Chuyển đổi các cột đặc trưng thành một vector
assembler = VectorAssembler(
    inputCols=[f"feature{i}" for i in range(1, 14)],
    outputCol="features"
)

df = assembler.transform(df)
df = df.select("label", "features")

# Tạo mô hình DecisionTreeClassifier
dt = DecisionTreeClassifier(labelCol="label", featuresCol="features")

# Chia dữ liệu thành tập huấn luyện và tập kiểm tra
(training_data, test_data) = df.randomSplit([0.7, 0.3])

# Huấn luyện mô hình
model = dt.fit(training_data)

# Dự đoán trên tập kiểm tra
predictions = model.transform(test_data)

# Đánh giá mô hình
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print(f"Model accuracy: {accuracy * 100:.2f}%")

# Dự đoán cho một bệnh nhân cụ thể
# Ví dụ về đặc trưng của bệnh nhân với 13 đặc trưng
angelina_features = [53.0, 1.0, 0.0, 140.0, 203.0, 1.0, 0.0, 155.0, 1.0, 3.1, 0.0, 0.0, 3.0]

# Chuyển đổi đặc trưng thành DataFrame
angelina_df = spark.createDataFrame(
    [tuple(angelina_features)],
    schema=StructType([StructField(f"feature{i}", FloatType(), False) for i in range(1, 14)])
)

# Chuyển đổi các cột đặc trưng thành một vector
angelina_df = assembler.transform(angelina_df)

# Dự đoán
angelina_prediction = model.transform(angelina_df).select("prediction").collect()[0][0]
print(f"Angelina Jolie - Prediction: {'Heart Disease' if angelina_prediction == 1.0 else 'No Heart Disease'}")

# Dừng Spark session
spark.stop()


Model accuracy: 62.50%
Angelina Jolie - Prediction: No Heart Disease
