In [0]:
# tiers = ["bronze","silver","gold"]
# adl_paths = {tier : f'abfss://{tier}@myfirststorageaccount271.dfs.core.windows.net/' for tier in tiers}

# bronze_adls = adl_paths['bronze']
# silver_adls = adl_paths['silver']
# gold_adls = adl_paths['gold']

# dbutils.fs.ls(bronze_adls)
# dbutils.fs.ls(silver_adls)
# dbutils.fs.ls(gold_adls)


# import requests
# import json
# from datetime import datetime
# from datetime import date, timedelta

# start_date = date.today() - timedelta(1)
# end_date = date.today()

#### Fetch the dates and storage paths passed from the Bronze task.

In [0]:
# Pull the payload that the upstream 'Bronze' task published as a task value.
# NOTE(review): when this notebook is run interactively (outside a job run),
# taskValues.get may need a `debugValue` argument to succeed — verify.
bronze_output = dbutils.jobs.taskValues.get(taskKey='Bronze', key='bronze_output')

# Date range published by the Bronze task.
start_date, end_date = (bronze_output[k] for k in ('start_date', 'end_date'))

# Storage roots for the source (bronze) and destination (silver) tiers.
bronze_adls, silver_adls = (bronze_output[k] for k in ('bronze_adls', 'silver_adls'))

print(f'Start Date {start_date}, End Date {end_date}')

In [0]:
from pyspark.sql.functions import col, isnull, when
from pyspark.sql.types import TimestampType
from datetime import date, timedelta


In [0]:
# Load the raw JSON file the Bronze task wrote for `start_date`. The `multiline`
# option lets a single JSON record span several lines of the file.
# rstrip('/') + an explicit '/' keeps the join well-formed whether or not
# `bronze_adls` already ends with a slash (avoids producing '...net//file.json').
bronze_file_path = f"{bronze_adls.rstrip('/')}/{start_date}_earthquake_data.json"
df = spark.read.option("multiline", "true").json(bronze_file_path)

#### Flatten nested fields and rename columns for readability

In [0]:
# Flatten the nested input: take longitude/latitude/elevation from the
# `geometry.coordinates` array by position, and lift selected `properties.*`
# fields to top-level columns (renaming magType and place along the way).
# Output column order: id, longitude, latitude, elevation, title, mag, sig,
# magnitude_type, place_description, time, updated.
df = df.select(
    'id',
    *[
        col('geometry.coordinates').getItem(position).alias(name)
        for position, name in enumerate(('longitude', 'latitude', 'elevation'))
    ],
    *[
        col(f'properties.{source}').alias(target)
        for source, target in (
            ('title', 'title'),
            ('mag', 'mag'),
            ('sig', 'sig'),
            ('magType', 'magnitude_type'),
            ('place', 'place_description'),
            ('time', 'time'),
            ('updated', 'updated'),
        )
    ],
)

In [0]:
# Replace nulls with 0 in longitude, latitude and time; every other column is
# left as-is. Non-null values pass through unchanged.
# NOTE(review): 0 acts as a sentinel here — (0, 0) is a real coordinate, and a
# time of 0 becomes 1970-01-01 once converted to a timestamp. Confirm intended.
for null_filled_col in ('longitude', 'latitude', 'time'):
    df = df.withColumn(
        null_filled_col,
        when(isnull(col(null_filled_col)), 0).otherwise(col(null_filled_col)),
    )

In [0]:
# Convert the raw numeric `time` / `updated` values to Spark timestamps.
# Assumes the raw values are epoch milliseconds (TODO confirm): dividing by 1000
# yields (fractional) seconds, which a numeric -> timestamp cast interprets as
# seconds since the epoch.
# NOTE: not idempotent — re-running this cell on the already-converted `df`
# would apply the division to timestamp columns a second time.
for epoch_ms_col in ('time', 'updated'):
    df = df.withColumn(epoch_ms_col, (col(epoch_ms_col) / 1000).cast(TimestampType()))

#### Write the data as Parquet to the Silver container and pass its path to the Gold task.

In [0]:
import json
# Destination folder for the Silver dataset. rstrip('/') + an explicit '/' keeps
# the path valid whether or not `silver_adls` ends with a slash; when it does,
# the resulting path is identical to the previous f"{silver_adls}earthquake_events_silver/".
silver_data_path = f"{silver_adls.rstrip('/')}/earthquake_events_silver/"
# NOTE(review): 'append' is not idempotent — re-running the job for the same
# date range writes the same events again. Consider overwriting by a date
# partition or a MERGE if reruns are expected.
df.write.mode('append').parquet(silver_data_path)

In [0]:
# Publish the Silver dataset location as a task value for the downstream Gold task.
dbutils.jobs.taskValues.set(value=silver_data_path, key="silver_output")