In [0]:
# Bronze -> Silver: flatten the raw earthquake GeoJSON into one row per
# feature and persist it as Parquet.
bronze_path = "abfss://rawdata@bigdatacuoiky.dfs.core.windows.net/earthquakes_combined_2020_2025.geojson"
silver_output_path = "abfss://silver@bigdatacuoiky.dfs.core.windows.net/earthquake_events_silver/"

from pyspark.sql.functions import col, explode, isnull, when
from pyspark.sql.types import TimestampType

# Load the JSON straight from the Bronze path. The file is read exactly once:
# without an explicit schema, every spark.read.json call makes its own pass
# over the input to infer one, so a second, unused read only costs time.
raw_df = spark.read.option("multiline", "true").json(bronze_path)

# Pre-processing: one row per element of the top-level "features" array.
df = raw_df.select(explode("features").alias("feature"))

# Extract the needed columns from feature.*
# Coordinates items 0 / 1 / 2 are exposed as longitude / latitude / elevation.
df = df.select(
    col("feature.id").alias("id"),
    col("feature.geometry.coordinates").getItem(0).alias("longitude"),
    col("feature.geometry.coordinates").getItem(1).alias("latitude"),
    col("feature.geometry.coordinates").getItem(2).alias("elevation"),
    col("feature.properties.title").alias("title"),
    col("feature.properties.place").alias("place_description"),
    col("feature.properties.sig").alias("sig"),
    col("feature.properties.mag").alias("mag"),
    col("feature.properties.magType").alias("magType"),
    col("feature.properties.time").alias("time"),
    col("feature.properties.updated").alias("updated")
)

# Assign defaults for nulls and convert to timestamp type.
# - longitude / latitude / time: null is replaced by 0.
# - time / updated: divided by 1000 before the timestamp cast
#   (presumably the source values are epoch milliseconds -- TODO confirm).
# - updated gets no default, so a null "updated" stays null.
# NOTE(review): a null "time" therefore ends up as the epoch start after the
# cast -- confirm that is the intended placeholder.
df = (
    df.withColumn('longitude', when(isnull(col('longitude')), 0).otherwise(col('longitude')))
      .withColumn('latitude', when(isnull(col('latitude')), 0).otherwise(col('latitude')))
      .withColumn('time', when(isnull(col('time')), 0).otherwise(col('time')))
      .withColumn('time', (col('time') / 1000).cast(TimestampType()))
      .withColumn('updated', (col('updated') / 1000).cast(TimestampType()))
)

# Write to the Silver layer (replaces any previous output at that path).
df.write.mode("overwrite").parquet(silver_output_path)


In [0]:
# Silver -> Gold: add a categorical "sig_class" column derived from "sig".
silver_output_path = "abfss://silver@bigdatacuoiky.dfs.core.windows.net/earthquake_events_silver/"
gold_output_path = "abfss://gold@bigdatacuoiky.dfs.core.windows.net/earthquake_events_gold/"

# NOTE(review): udf, StringType and rg are not referenced in this cell; they
# are left in place in case later cells rely on them.
from pyspark.sql.functions import col, when, udf
from pyspark.sql.types import StringType
import reverse_geocoder as rg

df = spark.read.parquet(silver_output_path)

# Bucket "sig": < 100 -> "Low", 100..499 -> "Moderate", >= 500 -> "High".
# Branches are evaluated in order, so the second one needs no lower bound.
# There is deliberately no .otherwise(): a NULL "sig" fails every comparison,
# and a catch-all branch would label those rows "High". Without it, unmatched
# rows get a NULL "sig_class" instead.
df = df.withColumn(
    "sig_class",
    when(col("sig") < 100, "Low")
    .when(col("sig") < 500, "Moderate")
    .when(col("sig") >= 500, "High"),
)

# Write to the Gold layer (replaces any previous output at that path).
df.write.mode("overwrite").parquet(gold_output_path)


In [0]:
# Reload the Gold output and inspect it: first 5 rows, the schema, and the
# describe() statistics.
df = spark.read.format("parquet").load("abfss://gold@bigdatacuoiky.dfs.core.windows.net/earthquake_events_gold/")
df.show(n=5)
df.printSchema()
df.describe().show()


+------------+------------+-----------+-----------+--------------------+--------------------+---+----+-------+--------------------+--------------------+---------+
|          id|   longitude|   latitude|  elevation|               title|   place_description|sig| mag|magType|                time|             updated|sig_class|
+------------+------------+-----------+-----------+--------------------+--------------------+---+----+-------+--------------------+--------------------+---------+
|ak0219pcfvx9|   -155.2341|     65.771|        4.0|M 0.9 - 54 km E o...|54 km E of Huslia...| 12| 0.9|     ml|2021-07-30 23:59:...|2021-08-13 03:19:...|      Low|
|  ok2021ovmz|-97.50583333|36.97083333|       3.28|M 1.9 - 9 km WSW ...|9 km WSW of Hunne...| 53|1.85|     ml|2021-07-30 23:54:...|2021-08-02 14:25:...|      Low|
|  nc73602856|-119.5573333|    38.4215|       5.14|M 1.6 - 26km ENE ...|26km ENE of Darda...| 39|1.59|     md|2021-07-30 23:54:...|2021-08-07 06:38:...|      Low|
|  tx2021ovmy|-98.6308