In [0]:
from pyspark.sql.functions import from_json, col, current_timestamp
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, BooleanType, ArrayType

# ---------------------------------------------------------
# 1. Azure Event Hubs connection settings
# ---------------------------------------------------------
import os

# Namespace and event hub (used as the Kafka topic) names are not secret.
eh_namespace = "hospital-analytics-namespace5"
eh_topic = "hospital-analytics-eh"

# SECURITY: the connection string embeds a SharedAccessKey and must never be
# committed to source code. It is read from the environment instead (on
# Databricks a cluster environment variable can be backed by a secret, e.g.
# EVENTHUB_CONNECTION_STRING={{secrets/<scope>/<key>}}).
# A key that was ever stored here in plain text should be treated as
# compromised and rotated in the Azure portal.
_EH_CONN_ENV_VAR = "EVENTHUB_CONNECTION_STRING"
eh_conn_string = os.environ.get(_EH_CONN_ENV_VAR, "").strip()
if not eh_conn_string:
    # Fail fast with an actionable message instead of letting the Kafka client
    # fail later with an opaque authentication error.
    raise RuntimeError(
        f"Environment variable {_EH_CONN_ENV_VAR} is not set; it must contain the "
        "Event Hubs namespace connection string "
        "(Endpoint=sb://...;SharedAccessKeyName=...;SharedAccessKey=...)."
    )

# Spark Kafka-source options for the Event Hubs Kafka endpoint:
# port 9093 with SASL_SSL + PLAIN, username "$ConnectionString" and the full
# connection string as the password.
# The login module is referenced by its "kafkashaded." class name to avoid the
# LoginException seen on Databricks with the unshaded class name.
kafka_options = {
    "kafka.bootstrap.servers": f"{eh_namespace}.servicebus.windows.net:9093",
    "subscribe": eh_topic,
    "kafka.sasl.mechanism": "PLAIN",
    "kafka.security.protocol": "SASL_SSL",
    "kafka.sasl.jaas.config": f'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{eh_conn_string}";',
    # Read only newly arriving events; per the Spark Kafka guide this applies
    # only when a query starts without an existing checkpoint.
    "startingOffsets": "latest",
    # Do not fail the query when requested offsets are no longer available
    # (such data is skipped rather than raising).
    "failOnDataLoss": "false",
}

# ---------------------------------------------------------
# 2. Schema (StructType) used to parse the incoming JSON
# ---------------------------------------------------------
# Intended to mirror the payload sent by the producer (producer not shown here).
# Every field is declared nullable (third argument True).

# One entry of a patient's "records" array.
record_schema = (
    StructType()
    .add("record_id", StringType(), True)
    .add("admission_time", StringType(), True)
    .add("discharge_time", StringType(), True)
    .add("hospital_id", StringType(), True)
    .add("department", StringType(), True)
    .add("ICU_admission", BooleanType(), True)
    .add("diagnosis", StringType(), True)
    .add("treatment", StringType(), True)
    .add("arrival_mode", StringType(), True)
    .add("doctor_id", StringType(), True)
    .add("medicine_taken", StringType(), True)
    .add("severity_level", StringType(), True)
)

# Top-level event: patient attributes plus a nested array of record_schema.
patient_schema = (
    StructType()
    .add("patient_id", StringType(), True)
    .add("gender", StringType(), True)
    .add("first_name", StringType(), True)
    .add("last_name", StringType(), True)
    .add("full_name", StringType(), True)
    .add("city", StringType(), True)
    .add("governorate", StringType(), True)
    .add("address", StringType(), True)
    .add("age", IntegerType(), True)
    .add("contact_number", StringType(), True)
    .add("records", ArrayType(record_schema), True)
)

# ---------------------------------------------------------
# 3. Read the stream
# ---------------------------------------------------------
print("🚀 Starting Stream from Event Hubs...")

# Raw Kafka rows from Event Hubs; the event body arrives in the binary `value` column.
df_stream = spark.readStream.options(**kafka_options).format("kafka").load()

# Binary -> string -> struct (patient_schema), then flatten the struct into
# top-level columns and stamp each row with the time it was processed.
# NOTE(review): with from_json's default mode, payloads that fail to parse
# presumably yield all-null columns rather than an error — confirm this is acceptable.
df_parsed = (
    df_stream
    .select(col("value").cast("string").alias("json_string"))
    .select(from_json(col("json_string"), patient_schema).alias("data"))
    .select("data.*")
    .withColumn("ingestion_time", current_timestamp())
)

# ---------------------------------------------------------
# 4. Write stream to the Bronze layer
# ---------------------------------------------------------
# NOTE(review): "hospitalstorge" looks like it may be a misspelling of
# "hospitalstorage"; it must match the real ADLS account name — verify.
storage_account_name = "hospitalstorge"
_bronze_root = f"abfss://bronze@{storage_account_name}.dfs.core.windows.net"
base_path = f"{_bronze_root}/Realtime"
checkpoint_path = f"{_bronze_root}/Realtime_Checkpoints"

# Append each micro-batch to a path-based Delta table (Delta is the preferred
# format on Databricks); the checkpoint directory records the stream's progress.
query = (
    df_parsed.writeStream
    .outputMode("append")
    .format("delta")
    .options(checkpointLocation=checkpoint_path, path=base_path)
    .start()
)

print(f"✅ Streaming started! Writing to: {base_path}")
print(f"📂 Checkpoints at: {checkpoint_path}")

# Block until the streaming query terminates so the script/cell does not exit.
query.awaitTermination()