In [37]:
import findspark
findspark.init('/home/ubuntu/spark-3.2.1-bin-hadoop2.7')
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('stroke')\
    .config("spark.sql.debug.maxToStringFields", "100")\
    .getOrCreate()

In [38]:
# import data with header and auto-datatype
data = spark.read.csv('brain_stroke.csv', header=True, inferSchema=True)

In [39]:
# data glance
data.show(5)

data.columns

+------+----+------------+-------------+------------+-------------+--------------+-----------------+----+---------------+------+
|gender| age|hypertension|heart_disease|ever_married|    work_type|Residence_type|avg_glucose_level| bmi| smoking_status|stroke|
+------+----+------------+-------------+------------+-------------+--------------+-----------------+----+---------------+------+
|  Male|67.0|           0|            1|         Yes|      Private|         Urban|           228.69|36.6|formerly smoked|     1|
|  Male|80.0|           0|            1|         Yes|      Private|         Rural|           105.92|32.5|   never smoked|     1|
|Female|49.0|           0|            0|         Yes|      Private|         Urban|           171.23|34.4|         smokes|     1|
|Female|79.0|           1|            0|         Yes|Self-employed|         Rural|           174.12|24.0|   never smoked|     1|
|  Male|81.0|           0|            0|         Yes|      Private|         Urban|           186.

['gender',
 'age',
 'hypertension',
 'heart_disease',
 'ever_married',
 'work_type',
 'Residence_type',
 'avg_glucose_level',
 'bmi',
 'smoking_status',
 'stroke']

In [40]:
data.printSchema()

root
 |-- gender: string (nullable = true)
 |-- age: double (nullable = true)
 |-- hypertension: integer (nullable = true)
 |-- heart_disease: integer (nullable = true)
 |-- ever_married: string (nullable = true)
 |-- work_type: string (nullable = true)
 |-- Residence_type: string (nullable = true)
 |-- avg_glucose_level: double (nullable = true)
 |-- bmi: double (nullable = true)
 |-- smoking_status: string (nullable = true)
 |-- stroke: integer (nullable = true)



In [41]:
from pyspark.sql.functions import col, count, mean, stddev, abs
from pyspark.sql.types import FloatType, DoubleType, IntegerType, LongType, ShortType, StringType

# 总行数
total_rows = data.count()

# 计算每列的非空（non-null）值的比例
complete_rates = {column: (data.filter(col(column).isNotNull()).count() / total_rows) * 100 
                  for column in data.columns}

# 输出
print("Complete Rates:")
for column, rate in complete_rates.items():
    print(f"{column}: {rate:.2f}%")

# 计算每列的均值和标准差
from itertools import chain

# 计算每列的均值和标准差
mean_stddev = data.select(*chain(*[(mean(col(column)).alias(f"{column}_mean"), 
                                   stddev(col(column)).alias(f"{column}_stddev")) 
                                  for column in data.columns if data.schema[column].dataType in 
                                  [FloatType(), DoubleType(), IntegerType(), LongType(), ShortType()]]))


# 将结果从 Row 转换为字典以便使用
mean_stddev_values = {**mean_stddev.first().asDict()}

# 计算 Z 分数
# Z = (X-mean)/std
for column in data.columns:
    if data.schema[column].dataType in [FloatType(), DoubleType(), IntegerType(), LongType(), ShortType()]:
        data = data.withColumn(f"{column}_zscore", 
                           (col(column) - mean_stddev_values[f"{column}_mean"]) / 
                           mean_stddev_values[f"{column}_stddev"])

# 指定的阈值,设置为3.2*std
threshold = 3.2

# 计算每个列的离群值的比例
outlier_percentages = {column: (data.filter(col(f"{column}_zscore").isNotNull() & 
                                       (abs(col(f"{column}_zscore")) > threshold)).count() / total_rows) * 100 
                       for column in data.columns if f"{column}_zscore" in data.columns}

# 输出
print("\nOutlier Percentages:")
for column, percentage in outlier_percentages.items():
    print(f"{column}: {percentage:.2f}%")



Complete Rates:
gender: 100.00%
age: 100.00%
hypertension: 100.00%
heart_disease: 100.00%
ever_married: 100.00%
work_type: 100.00%
Residence_type: 100.00%
avg_glucose_level: 100.00%
bmi: 100.00%
smoking_status: 100.00%
stroke: 100.00%

Outlier Percentages:
age: 0.00%
hypertension: 0.00%
heart_disease: 5.52%
avg_glucose_level: 0.50%
bmi: 0.00%
stroke: 4.98%


In [42]:
count_all = data.groupBy(data.columns)\
    .count()\
    .where("count > 1")

print(count_all)
# no duplicates

# 与原始数据集合并，只保留重复的行
duplicates = data.join(count_all, on=data.columns, how="inner")

# 显示重复行
duplicates.show(5)

DataFrame[gender: string, age: double, hypertension: int, heart_disease: int, ever_married: string, work_type: string, Residence_type: string, avg_glucose_level: double, bmi: double, smoking_status: string, stroke: int, age_zscore: double, hypertension_zscore: double, heart_disease_zscore: double, avg_glucose_level_zscore: double, bmi_zscore: double, stroke_zscore: double, count: bigint]
+------+---+------------+-------------+------------+---------+--------------+-----------------+---+--------------+------+----------+-------------------+--------------------+------------------------+----------+-------------+-----+
|gender|age|hypertension|heart_disease|ever_married|work_type|Residence_type|avg_glucose_level|bmi|smoking_status|stroke|age_zscore|hypertension_zscore|heart_disease_zscore|avg_glucose_level_zscore|bmi_zscore|stroke_zscore|count|
+------+---+------------+-------------+------------+---------+--------------+-----------------+---+--------------+------+----------+-----------------

In [43]:
from pyspark.sql.functions import when, col, udf

# 将 gender 和 ever_married 列映射到整数
data = data.withColumn("gender", when(col("gender") == "Male", 1).otherwise(0))
data = data.withColumn("ever_married", when(col("ever_married") == "Yes", 1).otherwise(0))

# 定义用于创建新特征的 UDFs
def map_bmi(bmi):
    if bmi < 18.5:
        return 'underweight'
    elif bmi < 23.9:
        return 'normal weight'
    elif bmi < 27.9:
        return 'overweight'
    else:
        return 'obese'

def map_avg_glucose_level(avg_glucose_level):
    if avg_glucose_level < 70:
        return 'low blood sugar'
    elif avg_glucose_level < 100:
        return 'normal'
    else:
        return 'diabetes'

udf_map_bmi = udf(map_bmi, StringType())
udf_map_avg_glucose_level = udf(map_avg_glucose_level, StringType())

# 应用 UDFs 创建新列
data = data.withColumn("bmi_", udf_map_bmi("bmi"))
data = data.withColumn("avg_glucose_level_", udf_map_avg_glucose_level("avg_glucose_level"))

# 将年龄分箱
data = data.withColumn("age_", when(col("age") <= 20, "0-20")\
    .when((col("age") > 20) & (col("age") <= 40), "21-40")\
    .when((col("age") > 40) & (col("age") <= 60), "41-60")\
    .when((col("age") > 60) & (col("age") <= 80), "61-80")\
    .otherwise("80+"))

# 描述性统计和数据信息
data.printSchema()


root
 |-- gender: integer (nullable = false)
 |-- age: double (nullable = true)
 |-- hypertension: integer (nullable = true)
 |-- heart_disease: integer (nullable = true)
 |-- ever_married: integer (nullable = false)
 |-- work_type: string (nullable = true)
 |-- Residence_type: string (nullable = true)
 |-- avg_glucose_level: double (nullable = true)
 |-- bmi: double (nullable = true)
 |-- smoking_status: string (nullable = true)
 |-- stroke: integer (nullable = true)
 |-- age_zscore: double (nullable = true)
 |-- hypertension_zscore: double (nullable = true)
 |-- heart_disease_zscore: double (nullable = true)
 |-- avg_glucose_level_zscore: double (nullable = true)
 |-- bmi_zscore: double (nullable = true)
 |-- stroke_zscore: double (nullable = true)
 |-- bmi_: string (nullable = true)
 |-- avg_glucose_level_: string (nullable = true)
 |-- age_: string (nullable = false)



In [46]:
data.show(2, vertical=True)

-RECORD 0----------------------------------------
 gender                   | 1                    
 age                      | 67.0                 
 hypertension             | 0                    
 heart_disease            | 1                    
 ever_married             | 1                    
 work_type                | Private              
 Residence_type           | Urban                
 avg_glucose_level        | 228.69               
 bmi                      | 36.6                 
 smoking_status           | formerly smoked      
 stroke                   | 1                    
 age_zscore               | 1.0404798735788086   
 hypertension_zscore      | -0.32615302576787664 
 heart_disease_zscore     | 4.136337954276694    
 avg_glucose_level_zscore | 2.7231375098735904   
 bmi_zscore               | 1.1931183772210536   
 stroke_zscore            | 4.3681627630148325   
 bmi_                     | obese                
 avg_glucose_level_       | diabetes             
