In [1]:
!pip install pyspark==3.5.1

Collecting pyspark==3.5.1
  Using cached pyspark-3.5.1-py2.py3-none-any.whl
Installing collected packages: pyspark
  Attempting uninstall: pyspark
    Found existing installation: pyspark 3.5.0
    Can't uninstall 'pyspark'. No files were found to uninstall.
Successfully installed pyspark-3.5.1


In [52]:
import os

import pandas as pd
import numpy as np
import yaml
import sys

import warnings
warnings.filterwarnings("ignore")

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, broadcast, lit
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DateType
from pyspark.sql.functions import udf

import pytz

os.environ['JAVA_HOME'] = '/usr/lib/jvm/java-11-openjdk-amd64'

In [8]:
import socket
# проверка доступности namenode
socket.gethostbyname("namenode")

'172.19.0.4'

In [9]:
pip show pyspark

Name: pyspark
Version: 3.5.0
Summary: Apache Spark Python API
Home-page: https://github.com/apache/spark/tree/master/python
Author: Spark Developers
Author-email: dev@spark.apache.org
License: http://www.apache.org/licenses/LICENSE-2.0
Location: /usr/local/spark/python
Requires: py4j
Required-by: 
Note: you may need to restart the kernel to use updated packages.


In [10]:
!python --version
!java -version

Python 3.11.6
openjdk version "17.0.8.1" 2023-08-24
OpenJDK Runtime Environment (build 17.0.8.1+1-Ubuntu-0ubuntu122.04)
OpenJDK 64-Bit Server VM (build 17.0.8.1+1-Ubuntu-0ubuntu122.04, mixed mode, sharing)


In [11]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("HDFS Project") \
    .getOrCreate()

spark

In [14]:
# считываем данные из файла-"паркета"
df = spark.read.parquet("hdfs://namenode:9000/data/covid_dataset/metadata/metadata.parquet")
df.count()

446

In [31]:
# создаем таблицу с партиционирование и бакетирование и сохраняем в нее данные
(
    df    
    .write
    .mode("overwrite")
    .format("parquet")
    .partitionBy("finding")
    .bucketBy(5, "sex","age")    
    .saveAsTable("tbl_covid")
)

In [33]:
# Либо можно было напрямую запросом создать таблицу
create_table_query = """
CREATE TABLE IF NOT EXISTS tbl_covid (
    patientid INT,
    offset INT,
    sex INT,
    age INT,
    RT_PCR_positive INT,
    survival INT,
    intubated INT,    
    intubation_present INT,
    went_icu INT,
    in_icu INT,
    needed_supplemental_O2 INT,
    temperature INT,
    pO2_saturation INT,
    leukocyte_count INT,
    neutrophil_count INT,
    lymphocyte_count INT,
    folder STRING,
    filename STRING,
    sick INT,
    age_group STRING
)
PARTITIONED BY (finding STRING)
CLUSTERED BY (age,sex) INTO 5 BUCKETS
STORED AS PARQUET
"""

# Выполняем запрос
#spark.sql(create_table_query)

In [32]:
#Проверяем, что данные есть в таблице
df_tbl = spark.table("tbl_covid")

df_tbl.filter("finding = 'covid'").count()

314

In [41]:
# обязательно включить запрос с оконными функциями;
# использовать сложные соединения таблиц;
# применить аналитические подзапросы.

# Набор данных (количество строк),да и самих столбцов не велик, поэтому запросы придуманы очень синтетические
# 1. Количество заболевших с группировкой по заболеванию
spark.sql("""
    SELECT 
        finding, count(*)
FROM tbl_covid
    GROUP BY finding
    ORDER BY count(*) desc
;
""").show()

+------------+--------+
|     finding|count(1)|
+------------+--------+
|       covid|     314|
|   pneumonia|     112|
|tuberculosis|      11|
|          no|       9|
+------------+--------+



In [44]:
# ранжирование пациентов по параметру lymphocyte_count в обратном порядке
spark.sql("""
    SELECT 
        patientid, age, sex, lymphocyte_count,
        RANK() OVER(ORDER BY lymphocyte_count desc) as lymphocyte_ranking
FROM tbl_covid
WHERE lymphocyte_count is not null
;
""").show()

+---------+----+---+----------------+------------------+
|patientid| age|sex|lymphocyte_count|lymphocyte_ranking|
+---------+----+---+----------------+------------------+
|      200|88.0|1.0|           131.0|                 1|
|      314|21.0|1.0|            10.1|                 2|
|      242|24.0|1.0|             2.9|                 3|
|      248|24.0|1.0|             2.8|                 4|
|      230|24.0|1.0|             2.7|                 5|
|      223|24.0|1.0|             1.8|                 6|
|      149|40.0|1.0|            1.73|                 7|
|      227|24.0|0.0|             1.6|                 8|
|      245|24.0|1.0|             1.4|                 9|
|      148|41.0|1.0|             1.3|                10|
|      228|24.0|0.0|             1.2|                11|
|       94|31.0|0.0|             1.2|                11|
|      240|24.0|1.0|             1.1|                13|
|      256|24.0|1.0|             1.1|                13|
|      246|24.0|1.0|           

In [46]:
# Найдем пациентов с одним диагнозом, получившим аналогичные вмешательства (например, интубацию и кислородотерапию),
# но один выжил, а другой нет, и выяснить, как эти факторы повлияли на выживаемость.
spark.sql("""
SELECT
    T1.patientid AS Patient_A,
    T2.patientid AS Patient_B,
    T1.age,
    T1.sex,
    T1.in_icu,
    T1.intubated,
    T1.needed_supplemental_O2,
    CASE
        WHEN T1.survival = 1 THEN 'Survived'
        ELSE 'Deceased'
    END AS Survival_Status_A,
    CASE
        WHEN T2.survival = 1 THEN 'Survived'
        ELSE 'Deceased'
    END AS Survival_Status_B
FROM
    tbl_covid T1
JOIN
    tbl_covid T2
ON
    T1.finding = T2.finding  -- одинаковые диагнозы
    AND T1.patientid <> T2.patientid  -- исключаем совмещение пациента с самим собой
    AND T1.intubated = T2.intubated  -- интубация проведена у обоих пациентов
    AND T1.needed_supplemental_O2 = T2.needed_supplemental_O2  -- нуждалась ли терапия кислородом
WHERE
    T1.survival <> T2.survival  -- пациенты с разной судьбой (один выжил, другой погиб)
ORDER BY
    T1.age;
;
""").show()

+---------+---------+----+---+------+---------+----------------------+-----------------+-----------------+
|Patient_A|Patient_B| age|sex|in_icu|intubated|needed_supplemental_O2|Survival_Status_A|Survival_Status_B|
+---------+---------+----+---+------+---------+----------------------+-----------------+-----------------+
|       13|     326b|35.0|1.0|   0.0|      0.0|                   1.0|         Survived|         Deceased|
|      336|     326b|49.0|1.0|   1.0|      0.0|                   1.0|         Survived|         Deceased|
|       19|     326b|55.0|0.0|   0.0|      0.0|                   1.0|         Survived|         Deceased|
|      349|     326b|57.0|1.0|   0.0|      0.0|                   1.0|         Survived|         Deceased|
|        2|     326b|65.0|1.0|   0.0|      0.0|                   1.0|         Survived|         Deceased|
|     326b|      349|94.0|1.0|  NULL|      0.0|                   1.0|         Deceased|         Survived|
|     326b|      336|94.0|1.0|  NULL|

In [49]:
# 4. Обработка в PySpark.
# Фильтрация снимков с определенным диагнозом и сохранение результатов в Parquet:
# выбрать подмножество данных по определенному критерию (например, только пациенты с диагнозом COVID-19);
# сохранить результат в оптимизированном формате (Parquet/ORC) в HDFS.

df_spark = spark.sql("SELECT * FROM tbl_covid")
# Фильтруем пациентов с положительным результатом на COVID-19
filtered_df = df_spark.filter(df_spark.finding == 'covid')\
                .select("patientid", "sex", "age", "finding", "survival", "temperature", "leukocyte_count", "lymphocyte_count")

# Сохраняем результат в HDFS в формате Parquet
filtered_df.write.parquet("output/spark_filter1.parquet", mode="overwrite")

In [53]:
# Использование UDF (пользовательских функций):
def categorize_age(age):
    if age < 30:
        return "young"
    elif age <= 60:
        return "middle"
    else:
        return "old"

# Регистрация UDF
categorize_age_udf = udf(categorize_age, StringType())

In [55]:
# Функция унификации диагноза
# !!! На предыдущих шагах сделал уже унификацию, поэтому формально функцию реализовываю
def unify_finding(finding):
    if finding.startswith("COVID-19"):
        return "COVID-19"
    return finding

# Регистрация UDF
unify_finding_udf = udf(unify_finding, StringType())

In [63]:
# Применяем UDF
processed_df = df_spark \
                        .withColumn("age_category", categorize_age_udf(df_spark.age)) \
                        .withColumn("finding_unified", unify_finding_udf(df_spark.finding)) 
                 

In [64]:
processed_df.show()

+---------+------+----+----+---------------+--------+---------+------------------+--------+------+----------------------+-----------+--------------+---------------+----------------+----------------+------------+--------------------+----+---------+-----------------+-------+------------+---------------+
|patientid|offset| sex| age|RT_PCR_positive|survival|intubated|intubation_present|went_icu|in_icu|needed_supplemental_O2|temperature|pO2_saturation|leukocyte_count|neutrophil_count|lymphocyte_count|      folder|            filename|sick|age_group|__index_level_0__|finding|age_category|finding_unified|
+---------+------+----+----+---------------+--------+---------+------------------+--------+------+----------------------+-----------+--------------+---------------+----------------+----------------+------------+--------------------+----+---------+-----------------+-------+------------+---------------+
|       19|  27.0| 0.0|55.0|            1.0|     1.0|      0.0|               0.0|     0.0|

In [64]:
spark.stop()