In [17]:
import os
import json
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import from_json, col

In [18]:
# AWS key
with open(r'C:\\Users\\yousr\\Documents\\healthstream\\utils\\config.json') as config_file:
    config = json.load(config_file)
    aws_access_key_id = config['aws_access_key_id']
    aws_secret_access_key = config['aws_secret_access_key']

In [19]:
# init spark session
spark = SparkSession.builder \
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.0") \
    .config('spark.hadoop.fs.s3a.aws.credentials.provider', 'org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider') \
    .appName("HealthData") \
    .config("spark.network.timeout", "5000s") \
    .config("spark.hadoop.fs.s3a.access.key", aws_access_key_id) \
    .config("spark.hadoop.fs.s3a.secret.key", aws_secret_access_key) \
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
    .getOrCreate()

In [20]:
# def data schema
schema = StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True),
    StructField("disease", StringType(), True),
    StructField("measurement_type", StringType(), True),
    StructField("measurement_value", FloatType(), True),
    StructField("timestamp", StringType(), True)
])

In [21]:
# reading parquet files from S3 bucket
data = spark.read.parquet("s3a://datareco/parquets/")
data = data.withColumn("value", from_json(col("value"), schema))
data = data.select("value.*")
data.printSchema()

root
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- disease: string (nullable = true)
 |-- measurement_type: string (nullable = true)
 |-- measurement_value: float (nullable = true)
 |-- timestamp: string (nullable = true)



In [22]:
data.show(10)

+-------------------+---+------------+----------------+-----------------+-------------------+
|               name|age|     disease|measurement_type|measurement_value|          timestamp|
+-------------------+---+------------+----------------+-----------------+-------------------+
|Dr. Rebekah Brennan| 80|Hypertension|  Blood Pressure|        123.79914|2024-03-07 12:05:57|
|   Elizabeth Fuller| 66|Hypertension|     Temperature|       111.713684|2024-03-07 12:00:24|
|       David Robles| 47|  Depression|  Blood Pressure|        166.61244|2024-03-07 12:15:27|
|      Allison Smith| 20|  Depression|  Blood Pressure|         52.54117|2024-03-07 12:14:25|
|     Jennifer Owens| 58|  Depression|         Glucose|         96.77128|2024-03-07 12:11:59|
|   Christopher Cole| 42|Hypertension|         Glucose|       106.533005|2024-03-07 12:16:25|
|        James Smith| 21|  Depression|     Cholesterol|        173.39534|2024-03-07 12:08:52|
|    Megan Alexander| 74|    Diabetes|  Blood Pressure|     

In [23]:
# filtering data for patients with diabetes
diabetes_data = data.filter(data.disease == "Diabetes")
print("number of patients with diabetes:", diabetes_data.count())

number of patients with diabetes: 35


In [24]:
# calculation avg measurement values for patients with diabetes
avg_measurement = diabetes_data.select("measurement_value").groupBy().avg().collect()[0][0]
print("average measurement values for patients with diabetes:", avg_measurement)

average measurement values for patients with diabetes: 120.6552855355399


In [25]:
# stop spark session
spark.stop()