In [0]:
import requests
from io import BytesIO
import datetime

# Run timestamp; names the DBFS landing file here and the ADLS export in a later cell.
time = datetime.datetime.now().strftime('%Y-%m-%d-%H-%M-%S')
map_key = dbutils.secrets.get(scope="poc5", key="map_key")

# FIRMS country endpoint: VIIRS S-NPP near-real-time detections for Poland (POL).
# NOTE(review): the trailing "/1" is presumably the FIRMS day range — confirm against the FIRMS API docs.
endpoint = "https://firms.modaps.eosdis.nasa.gov/api/country/csv/" + map_key + "/VIIRS_SNPP_NRT/POL/1"
# For the initial batch:
# endpoint = "https://firms.modaps.eosdis.nasa.gov/api/country/csv/" + map_key + "/VIIRS_SNPP_NRT/POL/10/2024-02-20"
# NOTE(review): original note says the daily endpoint "loads data from today-1 at the end" — confirm.
landing = "dbfs:/user/POC5/ingest/fires-poland-" + time + ".csv"

# Without a timeout, requests may wait indefinitely on a stalled connection.
REQUEST_TIMEOUT_S = 60
response = requests.get(endpoint, timeout=REQUEST_TIMEOUT_S)
response.raise_for_status()

# Persist the raw CSV payload to DBFS, then load it back as a DataFrame.
dbutils.fs.put(landing, response.text)
daily_load = (spark.read
      .option("header", True)
      .option("inferSchema", True)
      .csv(landing)
      )
# Temp view used by the %sql cells below (preview and MERGE).
daily_load.createOrReplaceTempView("daily_load")
display(daily_load)
# Number of rows in this batch (cell output).
daily_load.count()





Wrote 133 bytes.


country_id,latitude,longitude,bright_ti4,scan,track,acq_date,acq_time,satellite,instrument,confidence,version,bright_ti5,frp,daynight


0

In [0]:
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, DateType, IntegerType



# Declared schema for the `fires` table: 15 nullable columns, in the same order as the
# FIRMS CSV header returned by the ingest cell.
# To upsert df, it's best to use change data feed with an internal Databricks table.
schema = StructType([
    StructField(column_name, column_type, nullable=True)
    for column_name, column_type in (
        ("country_id", StringType()),
        ("latitude", DoubleType()),
        ("longitude", DoubleType()),
        ("bright_ti4", DoubleType()),
        ("scan", DoubleType()),
        ("track", DoubleType()),
        ("acq_date", DateType()),
        ("acq_time", IntegerType()),
        ("satellite", StringType()),
        ("instrument", StringType()),
        ("confidence", StringType()),
        ("version", StringType()),
        ("bright_ti5", DoubleType()),
        ("frp", DoubleType()),
        ("daynight", StringType()),
    )
])
# One-off initial table creation, kept for reference.
# NOTE(review): saveAsTable(name, format=None, mode=None, partitionBy=None, **options) has no
# `schema` parameter, so `schema=schema` ends up in **options and does not set the table
# schema — the table would take daily_load's inferred schema. Cast daily_load to `schema`
# first if the declared types are intended.
# (daily_load.write
#     .format("delta")
#     .mode("append")
#     .saveAsTable("fires", schema=schema, partitionBy=["acq_date", "acq_time"])
#     )
# %sql
# --delete  from fires

In [0]:
# hive_metastore.default.fires
# x= dbutils.fs.ls("/user/hive/warehouse/fires/acq_date=2024-03-08/acq_time=130")
# display(x)


In [0]:
%sql
-- Preview the batch registered as temp view `daily_load` by the ingest cell.
SELECT *
FROM daily_load
-- Destructive maintenance statements, intentionally left commented out:
-- DROP TABLE fires
-- DELETE FROM fires

country_id,latitude,longitude,bright_ti4,scan,track,acq_date,acq_time,satellite,instrument,confidence,version,bright_ti5,frp,daynight


In [0]:
%sql
-- Insert detections from `daily_load` that are not already present in `fires`.
-- A row counts as present when latitude, longitude, acq_date, acq_time and bright_ti4
-- all match; matched rows are left unchanged (there is no WHEN MATCHED clause).
-- NOTE(review): `=` is not null-safe — a source row with NULL in any key column never
-- matches and is inserted on every run. Consider `<=>` if such rows can occur.
MERGE INTO fires AS tgt
USING daily_load AS src
  ON  tgt.latitude   = src.latitude
  AND tgt.longitude  = src.longitude
  AND tgt.acq_date   = src.acq_date
  AND tgt.acq_time   = src.acq_time
  AND tgt.bright_ti4 = src.bright_ti4
WHEN NOT MATCHED THEN
  INSERT (country_id, latitude, longitude, bright_ti4, scan, track, acq_date, acq_time,
          satellite, instrument, confidence, version, bright_ti5, frp, daynight)
  VALUES (src.country_id, src.latitude, src.longitude, src.bright_ti4, src.scan, src.track,
          src.acq_date, src.acq_time, src.satellite, src.instrument, src.confidence,
          src.version, src.bright_ti5, src.frp, src.daynight)

num_affected_rows,num_updated_rows,num_deleted_rows,num_inserted_rows
13,0,0,13


In [0]:
%sql
--select * from fires

In [0]:
# DataFrame over the table `fires` that the MERGE cell populates.
fires = spark.read.table("fires")

In [0]:
from pyspark.sql.functions import current_date

# Detections whose acquisition date equals the current date of the Spark session.
# NOTE(review): current_date() follows the session time zone — confirm it matches the
# time zone in which acq_date is reported.
today_fires = fires.where(fires["acq_date"] == current_date())
display(today_fires)

country_id,latitude,longitude,bright_ti4,scan,track,acq_date,acq_time,satellite,instrument,confidence,version,bright_ti5,frp,daynight
POL,50.03705,18.48552,299.54,0.45,0.47,2024-03-08,132,N,VIIRS,n,2.0NRT,272.51,0.71,N
POL,50.33119,19.28671,318.43,0.47,0.48,2024-03-08,132,N,VIIRS,n,2.0NRT,270.93,2.76,N
POL,50.33535,19.28925,311.88,0.47,0.48,2024-03-08,132,N,VIIRS,n,2.0NRT,271.63,1.75,N
POL,50.33694,19.28292,296.84,0.47,0.48,2024-03-08,132,N,VIIRS,n,2.0NRT,273.17,1.69,N
POL,50.34246,19.33705,299.36,0.47,0.48,2024-03-08,132,N,VIIRS,n,2.0NRT,274.49,1.27,N
POL,50.34273,18.86546,298.46,0.45,0.47,2024-03-08,132,N,VIIRS,n,2.0NRT,269.97,1.43,N
POL,50.53555,17.97452,296.87,0.41,0.45,2024-03-08,132,N,VIIRS,n,2.0NRT,272.33,0.93,N
POL,50.93123,21.67787,300.79,0.56,0.52,2024-03-08,130,N,VIIRS,n,2.0NRT,270.06,1.32,N
POL,51.68808,15.97798,334.1,0.5,0.41,2024-03-08,130,N,VIIRS,n,2.0NRT,274.27,10.74,N
POL,54.4249,16.85785,316.68,0.47,0.4,2024-03-08,130,N,VIIRS,n,2.0NRT,269.74,2.05,N


Databricks visualization. Run in Databricks to view.

In [0]:
# Mirror the full `fires` table into the relational table `poc5_table`.
# `fires` accumulates every batch MERGEd into it, so the previous mode("append") re-inserted
# all earlier rows on each run and duplicated them in poc5_table. Overwrite makes the
# export idempotent. With truncate=true, Spark truncates the existing table instead of
# dropping and recreating it, so the table definition (e.g. indexes) is kept.
JDBC_USER = "woj_admi"  # NOTE(review): consider storing the user in the "poc5" secret scope as well.
(fires.write
 .format("jdbc")
 .option("url", dbutils.secrets.get(scope="poc5", key="jdbc_url"))
 .option("dbtable", "poc5_table")
 .option("user", JDBC_USER)
 .option("password", dbutils.secrets.get(scope="poc5", key="jdbc_pass"))
 .option("truncate", "true")
 .mode("overwrite")
 .save()
)

In [0]:
%sql
--delete   from fires
--drop table fires

In [0]:
# ADLS Gen2 setup: export target (container/account) and account-key authentication.
# The key comes from the "poc5" secret scope and is registered in the Spark session conf.
container = "poc5"
storage_account = "poc5wojtek"
# Export name reuses the run timestamp `time` from the ingest cell.
save_to_path = "fires-poland-" + time
storage_account_key = dbutils.secrets.get(scope="poc5", key="storage_account_key")
spark.conf.set(f"fs.azure.account.key.{storage_account}.dfs.core.windows.net", storage_account_key)


In [0]:
#dump to adls
(daily_load.write
  .format("csv")
  .option("header", True)
  .save("abfss://" + container + "@" + storage_account + ".dfs.core.windows.net/" + save_to_path + ".csv"))
display(daily_load)

country_id,latitude,longitude,bright_ti4,scan,track,acq_date,acq_time,satellite,instrument,confidence,version,bright_ti5,frp,daynight
POL,50.93123,21.67787,300.79,0.56,0.52,2024-03-08,130,N,VIIRS,n,2.0NRT,270.06,1.32,N
POL,51.68808,15.97798,334.1,0.5,0.41,2024-03-08,130,N,VIIRS,n,2.0NRT,274.27,10.74,N
POL,54.4249,16.85785,316.68,0.47,0.4,2024-03-08,130,N,VIIRS,n,2.0NRT,269.74,2.05,N
POL,50.03705,18.48552,299.54,0.45,0.47,2024-03-08,132,N,VIIRS,n,2.0NRT,272.51,0.71,N
POL,50.33119,19.28671,318.43,0.47,0.48,2024-03-08,132,N,VIIRS,n,2.0NRT,270.93,2.76,N
POL,50.33535,19.28925,311.88,0.47,0.48,2024-03-08,132,N,VIIRS,n,2.0NRT,271.63,1.75,N
POL,50.33694,19.28292,296.84,0.47,0.48,2024-03-08,132,N,VIIRS,n,2.0NRT,273.17,1.69,N
POL,50.34246,19.33705,299.36,0.47,0.48,2024-03-08,132,N,VIIRS,n,2.0NRT,274.49,1.27,N
POL,50.34273,18.86546,298.46,0.45,0.47,2024-03-08,132,N,VIIRS,n,2.0NRT,269.97,1.43,N
POL,50.53555,17.97452,296.87,0.41,0.45,2024-03-08,132,N,VIIRS,n,2.0NRT,272.33,0.93,N


In [0]:
#clear the storage container-- 
# dbutils.fs.rm("abfss://" + container + "@" + storage_account + ".dfs.core.windows.net/",True)