In [1]:
from pyspark.sql.types import StructType, StructField, StringType
from pyspark.sql import SparkSession, Row, functions as F, types as T
from pyspark.sql.functions import udf, col
from pyspark.sql.types import StringType, DoubleType
import math
from pyspark.ml.clustering import KMeans
from pyspark.ml.feature import VectorAssembler, StringIndexer

In [None]:

# Create (or reuse) a Hive-enabled SparkSession on the standalone cluster.
# The SQL warehouse directory is on HDFS and table metadata is served by the
# remote Hive metastore at thrift://hive-metastore:9083.
# Settings are applied in the order listed; the temporary helper names are
# deleted afterwards so they do not leak into the notebook namespace.
_session_settings = {
    "spark.sql.catalogImplementation": "hive",
    "spark.sql.warehouse.dir": "hdfs://hadoop-namenode:9000/user/hive/warehouse",
    "spark.local.dir": "/tmp/spark",
    "spark.sql.hive.metastore.version": "4.0.1",
    "spark.sql.hive.metastore.jars": "/opt/hive/lib/*",
    "spark.hadoop.hive.metastore.uris": "thrift://hive-metastore:9083",
}

_builder = (
    SparkSession.builder
    .master("spark://spark-master:7077")
    .appName("Covid_Hive_HDFS")
)
for _key, _value in _session_settings.items():
    _builder = _builder.config(_key, _value)

spark = _builder.enableHiveSupport().getOrCreate()

del _builder, _key, _value, _session_settings

In [39]:
# Load the cleaned metadata CSV from HDFS with an explicit schema (so Spark
# does not run a schema-inference pass). Every column is declared as a
# nullable string, including numeric-looking ones such as `age` and `offset`;
# later cells cast where they need numbers.
csv_path = "hdfs://hadoop-namenode:9000/user/hadoop/metadata/metadata_cleaned.csv"

_csv_columns = (
    "patientid", "offset", "sex", "age", "finding", "view", "modality",
    "date", "location", "folder", "filename", "url", "license",
    "clinical_notes", "age_group", "has_covid",
)
schema = StructType(
    [StructField(column_name, StringType(), True) for column_name in _csv_columns]
)

# multiLine plus '"' as both quote and escape character allows quoted fields
# to span several physical lines and to contain doubled quotes.
_csv_options = {
    "header": True,
    "multiLine": True,
    "quote": '"',
    "escape": '"',
}
df = spark.read.options(**_csv_options).schema(schema).csv(csv_path)

In [40]:
# Preview the first 20 rows; long cell values are truncated for display.
df.show(n=20, truncate=True)

+---------+------+---+----+---------+----+--------+----------------+--------------------+------+--------------------+--------------------+--------+--------------------+-----------+---------+
|patientid|offset|sex| age|  finding|view|modality|            date|            location|folder|            filename|                 url| license|      clinical_notes|  age_group|has_covid|
+---------+------+---+----+---------+----+--------+----------------+--------------------+------+--------------------+--------------------+--------+--------------------+-----------+---------+
|        2|   0.0|  M|65.0| COVID-19|  PA|   X-ray|January 22, 2020|Cho Ray Hospital,...|images|auntminnie-a-2020...|https://www.nejm....|    None|On January 22, 20...|     Senior|     True|
|        2|   3.0|  M|65.0| COVID-19|  PA|   X-ray|January 25, 2020|Cho Ray Hospital,...|images|auntminnie-b-2020...|https://www.nejm....|    None|On January 22, 20...|     Senior|     True|
|        2|   5.0|  M|65.0| COVID-19|  PA|   

In [None]:
# Write the metadata as a Hive-managed Parquet table, partitioned by
# (finding, age_group), bucketed into 8 buckets on (sex, view), and sorted by
# age within each bucket.
# `age` is read as StringType, so sorting it directly would be lexicographic
# ("100.0" < "65.0" < "8.0"). Cast it to double first, so that sortBy orders
# ages numerically and downstream SQL (e.g. AVG(age)) works on a real numeric
# column instead of relying on implicit string->double coercion.
covid_metadata_typed = df.withColumn("age", F.col("age").cast(DoubleType()))

covid_metadata_typed.write \
    .mode("overwrite") \
    .partitionBy("finding", "age_group") \
    .bucketBy(8, "sex", "view") \
    .sortBy("age") \
    .format("parquet") \
    .saveAsTable("covid_metadata_partitioned")

                                                                                

In [None]:
# SQL analysis: attach the per-diagnosis average age to every patient row.
# This uses a window function (not GROUP BY), so rows are not collapsed.
# Rows with a NULL age are excluded.
query1_sql = """
    SELECT patientid,
           finding,
           age,
           AVG(age) OVER (PARTITION BY finding) AS avg_age_by_finding
    FROM covid_metadata_partitioned
    WHERE age IS NOT NULL
"""
query1 = spark.sql(query1_sql)
query1.show()

# NOTE(review): despite the file name this is not a "top patients" list; the
# path is presumably kept as-is for downstream consumers — confirm.
query1.write.parquet(
    "hdfs://hadoop-namenode:9000/user/hadoop/covid_dataset/processed/query1_top_patients.parquet",
    mode="overwrite",
)

+---------+--------+----+------------------+
|patientid| finding| age|avg_age_by_finding|
+---------+--------+----+------------------+
|       70|COVID-19|65.0| 55.07534246575342|
|      142|COVID-19|65.0| 55.07534246575342|
|      142|COVID-19|65.0| 55.07534246575342|
|      142|COVID-19|65.0| 55.07534246575342|
|      142|COVID-19|65.0| 55.07534246575342|
|      143|COVID-19|65.0| 55.07534246575342|
|      199|COVID-19|65.0| 55.07534246575342|
|      351|COVID-19|65.0| 55.07534246575342|
|      446|COVID-19|65.0| 55.07534246575342|
|      446|COVID-19|65.0| 55.07534246575342|
|      446|COVID-19|65.0| 55.07534246575342|
|      446|COVID-19|65.0| 55.07534246575342|
|      305|COVID-19|66.0| 55.07534246575342|
|      305|COVID-19|66.0| 55.07534246575342|
|      307|COVID-19|66.0| 55.07534246575342|
|      151|COVID-19|67.0| 55.07534246575342|
|      155|COVID-19|67.0| 55.07534246575342|
|      320|COVID-19|67.0| 55.07534246575342|
|      320|COVID-19|67.0| 55.07534246575342|
|      165

In [None]:
# Age-group classification via a Python UDF.
# `df_with_udf` starts as an alias of `df` (the same DataFrame object); the
# UDF result column is added to it later in this cell.
df_with_udf = df

def age_group(age):
    """Map a raw age value to a coarse age-band label.

    Parameters
    ----------
    age : object
        Anything ``float()`` accepts (e.g. ``"65.0"`` or ``65``). In this
        notebook it is called on the string-typed ``age`` column.

    Returns
    -------
    str
        'Infant' (<1), 'Toddler' (<5), 'Child' (<12), 'Teen' (<18),
        'Young Adult' (<35), 'Adult' (<50), 'Middle-aged' (<65),
        'Senior' (<80), 'Elderly' (>=80), or 'Unknown' when the value is
        missing, unparsable, NaN, infinite or negative.
    """
    try:
        age = float(age)
    except (TypeError, ValueError):
        return 'Unknown'
    # NaN, +/-inf and negative numbers are not valid ages. Without this guard,
    # negatives would fall through to 'Infant' and +inf to 'Elderly'.
    if not math.isfinite(age) or age < 0:
        return 'Unknown'
    if age < 1:
        return 'Infant'
    elif age < 5:
        return 'Toddler'
    elif age < 12:
        return 'Child'
    elif age < 18:
        return 'Teen'
    elif age < 35:
        return 'Young Adult'
    elif age < 50:
        return 'Adult'
    elif age < 65:
        return 'Middle-aged'
    elif age < 80:
        return 'Senior'
    else:
        return 'Elderly'

# Wrap the plain-Python classifier as a Spark UDF that returns strings, and
# store its result in a new `age_group_2` column. The CSV already has an
# `age_group` column, so the UDF output goes into a separate one.
# NOTE(review): a native F.when(...) chain would avoid Python UDF
# serialization overhead, if demonstrating a UDF is not the point here.
age_group_udf = F.udf(age_group, StringType())

df_with_udf = df_with_udf.withColumn("age_group_2", age_group_udf(col("age")))
df_with_udf.show(n=5)

df_with_udf.write.parquet(
    "hdfs://hadoop-namenode:9000/user/hadoop/covid_dataset/processed/with_udf.parquet",
    mode="overwrite",
)

+---------+------+---+----+--------+----+--------+----------------+--------------------+------+--------------------+--------------------+-------+--------------------+-----------+---------+-----------+
|patientid|offset|sex| age| finding|view|modality|            date|            location|folder|            filename|                 url|license|      clinical_notes|  age_group|has_covid|age_group_2|
+---------+------+---+----+--------+----+--------+----------------+--------------------+------+--------------------+--------------------+-------+--------------------+-----------+---------+-----------+
|        2|   0.0|  M|65.0|COVID-19|  PA|   X-ray|January 22, 2020|Cho Ray Hospital,...|images|auntminnie-a-2020...|https://www.nejm....|   None|On January 22, 20...|     Senior|     True|     Senior|
|        2|   3.0|  M|65.0|COVID-19|  PA|   X-ray|January 25, 2020|Cho Ray Hospital,...|images|auntminnie-b-2020...|https://www.nejm....|   None|On January 22, 20...|     Senior|     True|     Sen

                                                                                

In [68]:
# JOIN the metadata table with a small view-code lookup.
# The lookup is built in memory and registered as temp view `view_lookup`.
# The LEFT JOIN keeps every scan; view codes missing from the lookup get a
# NULL view_desc.
# NOTE(review): matching is exact string equality (case-sensitive); confirm
# the data's view codes match "PA" / "AP" / "LATERAL".
_view_descriptions = {
    "PA": "Posteroanterior view",
    "AP": "Anteroposterior view",
    "LATERAL": "Lateral view",
}
views = [
    Row(view=code, view_desc=description)
    for code, description in _view_descriptions.items()
]
views_df = spark.createDataFrame(views)
views_df.createOrReplaceTempView("view_lookup")

query2_sql = """
    SELECT m.patientid, m.finding, m.view, v.view_desc
    FROM covid_metadata_partitioned AS m
    LEFT JOIN view_lookup AS v
      ON m.view = v.view
"""
query2 = spark.sql(query2_sql)

query2.write.parquet(
    "hdfs://hadoop-namenode:9000/user/hadoop/covid_dataset/processed/query2_view_lookup_join.parquet",
    mode="overwrite",
)

query2.show(n=5)

+---------+--------+----+--------------------+
|patientid| finding|view|           view_desc|
+---------+--------+----+--------------------+
|       70|COVID-19|  AP|Anteroposterior view|
|      142|COVID-19|  AP|Anteroposterior view|
|      142|COVID-19|  AP|Anteroposterior view|
|      142|COVID-19|  AP|Anteroposterior view|
|      142|COVID-19|  AP|Anteroposterior view|
+---------+--------+----+--------------------+
only showing top 5 rows


In [70]:
# Subquery computing the percentage of COVID-19 findings.
# The inner query counts COVID-19 rows and all rows in a single scan; the
# outer query turns them into a percentage rounded to 2 decimals.
# Empty-table guards:
# - NULLIF makes the percentage NULL instead of dividing by zero, which raises
#   an error when ANSI SQL mode is enabled.
# - COALESCE reports covid_count as 0, because SUM over zero rows is NULL.
query3 = spark.sql("""
    SELECT
        'COVID-19' AS diagnosis,
        covid_count,
        total_count,
        ROUND(covid_count / NULLIF(total_count, 0) * 100, 2) AS covid_percent
    FROM (
        SELECT
            COALESCE(SUM(CASE WHEN finding = 'COVID-19' THEN 1 ELSE 0 END), 0) AS covid_count,
            COUNT(*) AS total_count
        FROM covid_metadata_partitioned
    ) AS counts
""")

query3.show()

query3.write.mode("overwrite").parquet(
     "hdfs://hadoop-namenode:9000/user/hadoop/covid_dataset/processed/query3_covid_stats.parquet"
)

+---------+-----------+-----------+-------------+
|diagnosis|covid_count|total_count|covid_percent|
+---------+-----------+-----------+-------------+
| COVID-19|        584|        950|        61.47|
+---------+-----------+-----------+-------------+



In [84]:
# ------------------------------------------------------------
# ML — KMeans clustering on (age, sex)
# ------------------------------------------------------------
# Encode `sex` as a numeric index. handleInvalid="keep" puts invalid labels
# (unseen values; per the Spark docs also NULLs) into an extra index instead
# of failing.
indexer = StringIndexer(inputCol="sex", outputCol="sex_index", handleInvalid="keep")
df_ml = indexer.fit(df).transform(df)

# `age` is read as a string; cast it so the assembler receives a numeric
# column. NULL ages stay NULL.
df_ml = df_ml.withColumn("age", col("age").cast(DoubleType()))

# VectorAssembler's default handleInvalid="error" throws on NULL/NaN inputs
# when the vector is built, so a later .na.drop() on `features` never gets a
# chance to run. Let the assembler skip such rows itself.
assembler = VectorAssembler(
    inputCols=["age", "sex_index"],
    outputCol="features",
    handleInvalid="skip",
)
df_ml = assembler.transform(df_ml)

# Clustering (fixed seed for reproducible centroids).
# NOTE(review): features are not scaled. Age spans tens of units while
# sex_index is a small integer, so age dominates the Euclidean distance;
# consider StandardScaler if sex should influence the clusters.
kmeans = KMeans(k=3, seed=1, featuresCol="features")
model = kmeans.fit(df_ml)
predictions = model.transform(df_ml)

predictions.select("patientid", "age", "sex", "prediction").show(5)

predictions.write.mode("overwrite").parquet(
    "hdfs://hadoop-namenode:9000/user/hadoop/covid_dataset/processed/kmeans_clusters.parquet"
)

25/07/17 20:06:44 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS


+---------+----+---+----------+
|patientid| age|sex|prediction|
+---------+----+---+----------+
|        2|65.0|  M|         0|
|        2|65.0|  M|         0|
|        2|65.0|  M|         0|
|        2|65.0|  M|         0|
|        4|52.0|  F|         1|
+---------+----+---+----------+
only showing top 5 rows


In [None]:
# Shut down the SparkSession (and its underlying SparkContext) to release
# cluster resources. Re-run the session-creation cell to start a new one.
spark.stop()