**Silver Notebook - Processing the data from bronze1**

In [None]:
import json

# Parameters cell: `bronze1_output` is expected to be overridden at run time
# (presumably by the Data Factory / Synapse pipeline) with the bronze1 step's
# output as a JSON string. The next cell reads the keys "start_date",
# "silver_adls" and "bronze1_adls" from it. With this empty default, a manual
# run fails when the payload is parsed.
bronze1_output = ""

In [None]:
# Parse the JSON payload passed in through the `bronze1_output` parameter.
# Fail fast with a readable error instead of letting problems surface later:
#   - an empty payload (e.g. a manual run that left the parameter at its
#     default) would otherwise raise a cryptic JSONDecodeError, and
#   - a missing key would silently become None and only fail at read/write
#     time against a bogus "None..." storage path.
if not (bronze1_output or "").strip():
    raise ValueError(
        "bronze1_output is empty: expected the bronze1 step's JSON output "
        "with keys 'start_date', 'silver_adls' and 'bronze1_adls'."
    )

output_data = json.loads(bronze1_output)
if not isinstance(output_data, dict):
    raise ValueError(
        f"bronze1_output must be a JSON object, got {type(output_data).__name__}."
    )

missing_keys = [
    key for key in ("start_date", "silver_adls", "bronze1_adls")
    if not output_data.get(key)
]
if missing_keys:
    raise ValueError(f"bronze1_output is missing or has empty required key(s): {missing_keys}")

# Access individual variables (all validated as present and non-empty above)
start_date = output_data["start_date"]
silver_adls = output_data["silver_adls"]
bronze1_adls = output_data["bronze1_adls"]

print(f"Start Date: {start_date}")
print(f"Silver ADLS: {silver_adls}")
print(f"bronze1 ADLS: {bronze1_adls}")

In [None]:
# Local-development overrides, kept as comments so they stay inert.
# (A bare string literal here would be echoed as the cell's output.)
# In pipeline runs, start_date / bronze1_adls / silver_adls come from the
# parsed bronze1_output payload. To run this notebook by hand instead, skip
# the bronze1_output parsing cell and uncomment the lines below.
#
# from datetime import date, timedelta
#
# start_date = date.today() - timedelta(1)  # yesterday
#
# bronze1_adls = "abfss://bronze1@storage999.dfs.core.windows.net/"
# silver_adls = "abfss://silver@storage999.dfs.core.windows.net/"


In [None]:
from pyspark.sql.functions import col, isnull, when
from pyspark.sql.types import TimestampType
from datetime import date, timedelta

In [None]:
# Read this run's bronze1 file, named "<start_date>_earthquake_data.json"
# under the bronze1 container. multiLine lets a single JSON document span
# several lines (reader option keys are case-insensitive).
df = (
    spark.read
    .option("multiLine", True)
    .json(f"{bronze1_adls}{start_date}_earthquake_data.json")
)

In [None]:
# Inspect the schema Spark inferred from the raw JSON. printSchema() renders the
# nested `geometry` / `properties` structs (the fields picked apart in the
# reshape cell) as a tree, which is much easier to scan than the one-line
# DataFrame repr.
df.printSchema()

In [None]:
# Peek at the first raw record; DataFrame.first() is head() with no argument
# (a single Row, or None when the DataFrame is empty).
df.first()

In [None]:
# Flatten each nested record into one row per event:
#   - geometry.coordinates[0], [1], [2] -> longitude, latitude, elevation
#   - selected properties.* fields      -> top-level columns
#     (`place` is renamed to `place_description`; the rest keep their names)
# Output column order: id, the three coordinates, then the property columns.
df = df.select(
    'id',
    *(
        col('geometry.coordinates').getItem(position).alias(name)
        for position, name in enumerate(('longitude', 'latitude', 'elevation'))
    ),
    *(
        col(f'properties.{source}').alias(target)
        for source, target in (
            ('title', 'title'),
            ('place', 'place_description'),
            ('sig', 'sig'),
            ('mag', 'mag'),
            ('magType', 'magType'),
            ('time', 'time'),
            ('updated', 'updated'),
        )
    ),
)

In [None]:
# Confirm the flattened schema (column names, types and nullability) after
# the reshape; printSchema() lists one column per line instead of the
# one-line DataFrame repr.
df.printSchema()

In [None]:
# First flattened row, to eyeball the extracted coordinates and properties.
df.first()

In [None]:
# Interactive tabular preview of the reshaped rows.
# NOTE(review): display() presumably executes the DataFrame on every run,
# pipeline runs included; consider removing or limiting it for production.
display(df)

In [None]:
# Replace nulls in longitude, latitude and time with 0. Rows are neither
# dropped nor rejected; every other column is passed through unchanged and
# column order is preserved.
# NOTE(review): (0, 0) is a real coordinate, and time = 0 becomes 1970-01-01
# once converted to a timestamp in the next cell. Consider keeping nulls or
# flagging these rows instead of using in-domain sentinels.
df = df.select(*[
    when(isnull(col(name)), 0).otherwise(col(name)).alias(name)
    if name in ('longitude', 'latitude', 'time')
    else col(name)
    for name in df.columns
])

In [None]:
# Turn the Unix-time columns `time` and `updated` into Spark timestamps.
# The division by 1000 treats the raw values as epoch milliseconds; the
# resulting fractional seconds are cast to TimestampType, keeping sub-second
# precision. All other columns pass through in their original order.
df = df.select(*[
    (col(name) / 1000).cast(TimestampType()).alias(name)
    if name in ('time', 'updated')
    else col(name)
    for name in df.columns
])

In [None]:
# Spot-check one fully transformed row (timestamps now converted).
df.first()

In [None]:
# Destination folder for the silver earthquake events. This cell only builds
# the path; the write happens in the next cell.
silver_data = "{}earthquake_events_silver/".format(silver_adls)

In [21]:
# Write the transformed events as Parquet, adding to whatever already exists
# under the silver folder.
# NOTE(review): append mode with no partitioning or de-duplication means a
# re-run for the same start_date writes the same events again.
df.write.parquet(silver_data, mode='append')

StatementMeta(hourpool, 12, 21, Finished, Available, Finished)

In [22]:
# Stop the notebook and return the silver output path as its exit value
# (shown as ExitValue in the run output); presumably consumed by the next
# pipeline activity.
mssparkutils.notebook.exit(silver_data)

StatementMeta(hourpool, 12, 22, Finished, Available, Finished)

ExitValue: abfss://silver@storage999.dfs.core.windows.net/earthquake_events_silver/