# Spark Data Engineering Solution

This notebook is a complete solution to the data engineering parts of the hack that are best done in Python/Spark.

**Note** The XML forecast data provided by the BOM is far easier to parse and persist with a Dataflow Gen2 than in code, so it is not included here.

Students can start from scratch and land data from the source, or they can start with the raw, slightly cleaned (Bronze), or merged (Silver) data contained in ``resources.zip``.

The final objective is to have an enriched 'Silver' zone table of shipwrecks, with their correct marine zones added.

Be aware that Parquet does not yet support the ``geometry`` type. Instead of mucking about with WKT, we're using geojson for our Bronze zone.

**Exploring the Data**

A few ideas on how students might want to explore the data along the way can be found in the notebook ``Data Exploration``. Students will probably need to take at least a cursory glance at the data before writing to Delta, and this notebook has some pretty charts you can use to demo the exploratory process.


## Common Code
This section contains some code common to both Challenge 2 and Challenge 3.

### Install Dependencies

geopandas is needed to work with geojson files.

In [None]:
%pip install geopandas

### Setup Imports & Folder/File Paths

In [None]:
import geopandas as gp
import pyspark.sql.functions as f

# Standard zones (gold can be considered the semantic (power bi dataset) model)
rawFilesFolder = "/lakehouse/default/Files/Raw/"
bronzeFilesFolder = "/lakehouse/default/Files/Bronze/"
silverFilesFolder = "/lakehouse/default/Files/Silver/"


# These files are created by createStudentResources.py 
shipwrecksBronzeFile = f"{bronzeFilesFolder}shipwrecks.geojson"
marineZonesBronzeFile = f"{bronzeFilesFolder}marinezones.geojson"
shipwrecksSilverFile = f"{silverFilesFolder}shipwrecks.json"


### Challenge 2 - Land Ho! - Some Handy Functions to download and unzip files

In [None]:
import os
import urllib.request  # the request submodule must be imported explicitly for urlretrieve
import zipfile
from re import sub

def downloadFile(url, saveFolder):
    filename = os.path.basename(url)
    filepath = os.path.join(saveFolder, filename)
    os.makedirs(saveFolder, exist_ok=True)
    print(f"Downloading file from URL: {url} to {filepath}")
    urllib.request.urlretrieve(url, filepath)
    return filepath

def unzipFile(zipfilePath, extractPath):
    print(f"Extracting {zipfilePath} to {extractPath}")
    with zipfile.ZipFile(zipfilePath, 'r') as zip_ref:
        zip_ref.extractall(extractPath)
    os.remove(zipfilePath)

def toPascalCase(s):
    # Turn separators into spaces, title-case each word, then strip spaces and asterisks
    return sub(r"(_|-)+", " ", s).title().replace(" ", "").replace("*", "")
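
As a quick check of what ``toPascalCase`` does to column names, the sketch below repeats the helper and applies it to a few samples. The sample names are assumptions chosen to resemble the shapefile-style fields renamed later in this notebook, not a listing of the actual source schema.

```python
from re import sub

def toPascalCase(s):
    # Same logic as the helper above: separators become spaces, each word is
    # title-cased, then spaces and asterisks are removed.
    return sub(r"(_|-)+", " ", s).title().replace(" ", "").replace("*", "")

# Hypothetical sample column names, for illustration only
sampleColumns = ["type_of_si", "STATE_CODE", "dist-name", "geometry"]
renamed = {name: toPascalCase(name) for name in sampleColumns}
print(renamed)
```

Note that ``geometry`` becomes ``Geometry``, which is why the transform cells call ``set_geometry("Geometry")`` after renaming.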


## Challenge 2 Solutions

Challenge 2 solutions are arranged per dataset - Shipwrecks, Marine Zones and Forecasts

### Shipwrecks Landing From Source - Preferred Github version

A copy of ``Shipwrecks_WAM_002_WA_GDA94_Public.geojson`` is contained in the hack repo to allow easier access if students wish to attempt to download data themselves.

In [None]:
shipwrecksFileUrl = "https://raw.githubusercontent.com/liesel-h/WhatTheHack/xxx-FabricLakehouse/067-FabricLakehouse/Student/Resources/Shipwrecks_WAM_002_WA_GDA94_Public.geojson"
# Raw landing folder; matches the path read in the Raw to Bronze step
shipwrecksRawFolder = f"{rawFilesFolder}WAM"
shipwrecksRawFile = downloadFile(shipwrecksFileUrl, shipwrecksRawFolder)

### Shipwrecks Landing From Source - SLIP Version (Only for the advanced student)

This is an example of importing ``Shipwrecks_WAM_002_WA_GDA94_Public_GeoJSON.zip`` from SLIP, provided for completeness. As this requires a SLIP login, a copy of this data is also provided in GitHub, and students should probably use that mirror instead.

In [None]:
import json
import requests

def downloadSLIPFile(slipPath, slipFile, saveFolder, userId, password):
    """
    Downloads a file from the West Australian government Shared Location Information Platform (SLIP)
    and returns the path of the extracted geojson file (None if the download fails).

    Code based on https://toolkit.data.wa.gov.au/hc/en-gb/articles/115000962734 
    """

    saveFile = f"{saveFolder}/{slipFile}"
    # Create the target folder, not a directory named after the zip file
    os.makedirs(saveFolder, exist_ok=True)

    dataDownloadRequestUrl = "https://direct-download.slip.wa.gov.au/datadownload/{0}/{1}".format(slipPath, slipFile)

    # Request an OAuth token using the SLIP username and password
    tokenRequestUrl = "https://sso.slip.wa.gov.au/as/token.oauth2"
    tokenRequestHeaders = {'Authorization': 'Basic ZGlyZWN0LWRvd25sb2Fk'}
    tokenRequestForm = {"grant_type": "password", "username": userId, "password": password}
    tokenResponse = requests.post(tokenRequestUrl, data=tokenRequestForm, headers=tokenRequestHeaders)

    if tokenResponse.status_code == 200:
        # Only parse the token once the request is known to have succeeded
        accessToken = json.loads(tokenResponse.text)["access_token"]
        print(f"Downloading file from URL: {dataDownloadRequestUrl} to {saveFolder}")
        dataDownloadRequestHeaders = {'Authorization': 'Bearer ' + accessToken}
        dataDownloadResponse = requests.get(dataDownloadRequestUrl, headers=dataDownloadRequestHeaders)
        if dataDownloadResponse.status_code == 200:
            # zipOut avoids shadowing the pyspark.sql.functions alias f
            with open(saveFile, 'wb') as zipOut:
                zipOut.write(dataDownloadResponse.content)

            with zipfile.ZipFile(saveFile, 'r') as zipref:
                geojsonfile = [filename for filename in zipref.namelist() if filename.endswith('.geojson')][0]
                zipref.extractall(saveFolder)
                return f"{saveFolder}/{geojsonfile}"
        else:
            print("Error downloading file: " + str(dataDownloadResponse.status_code) + "-" + dataDownloadResponse.text)
    else:
        print("Error getting token: " + str(tokenResponse.status_code) + "-" + tokenResponse.text)


# SLIP username and password
# Storing creds in a notebook is never a good idea, use Key Vault
# mssparkutils.credentials.getSecret('https://SomeKeyVault.vault.azure.net/','SomeSecret')
#
# https://learn.microsoft.com/en-us/azure/synapse-analytics/spark/microsoft-spark-utilities?pivots=programming-language-python

SLIPUsername = ""
SLIPPassword = ""

#The WAM-002 Shipwrecks data https://direct-download.slip.wa.gov.au/datadownload/People_and_Society/Shipwrecks_WAM_002_WA_GDA94_Public_GeoJSON.zip
SLIPFolder="People_and_Society"
SLIPFile="Shipwrecks_WAM_002_WA_GDA94_Public_GeoJSON.zip"
WAMsaveFolder = f"{rawFilesFolder}WAM"

shipwrecksRawFile = downloadSLIPFile(SLIPFolder, SLIPFile, WAMsaveFolder, SLIPUsername, SLIPPassword)
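
One way to keep the SLIP credentials out of the notebook, as the comments in the cell suggest, is sketched below. It assumes the ``mssparkutils`` helper is importable from ``notebookutils`` inside the Spark session and falls back to environment variables elsewhere. The secret names and environment variable names are hypothetical, and the vault URL is the placeholder from the comment, so substitute your own.

```python
import os

def getSlipCredentials():
    """Return (username, password) without hardcoding them in the notebook."""
    try:
        # Available inside Synapse/Fabric Spark sessions (assumption for this sketch)
        from notebookutils import mssparkutils
        vault = "https://SomeKeyVault.vault.azure.net/"  # placeholder vault URL
        return (
            mssparkutils.credentials.getSecret(vault, "SlipUsername"),  # hypothetical secret name
            mssparkutils.credentials.getSecret(vault, "SlipPassword"),  # hypothetical secret name
        )
    except ImportError:
        # Local fallback: hypothetical environment variable names
        return (os.environ.get("SLIP_USERNAME", ""), os.environ.get("SLIP_PASSWORD", ""))
```

The returned pair can then be passed to ``downloadSLIPFile`` in place of the empty ``SLIPUsername`` and ``SLIPPassword`` strings.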

### Shipwrecks Transforming Raw to Bronze

Regardless of origin, the steps to transform the raw geojson to the Bronze zone are the same. Here we update the CRS (see Data Exploration for why), PascalCase the column names, drop/rename some columns, then write to our Bronze zone.

As mentioned in the coach notes, parquet does not support ``geometry`` type, so we're saving a geojson as we'll need the geometry later.

In [None]:
shipwrecksRawFile = '/lakehouse/default/Files/Raw/WAM/Shipwrecks_WAM_002_WA_GDA94_Public.geojson'

df_shipwrecks = gp.read_file(shipwrecksRawFile)

# to_crs returns a new GeoDataFrame, so the result must be assigned back
df_shipwrecks = df_shipwrecks.to_crs('epsg:3857')

df_shipwrecks.rename(columns=lambda x: toPascalCase(x), inplace=True)
df_shipwrecks.drop(columns={'DateDepth','TimeDepth','MaxDepth','MinDepth','BearingTo','LengthOf','ObjectId','UniqueNum'}, inplace=True)


df_shipwrecks.rename(columns={'TypeOfSi': 'Type', 'DateInspe': 'DateInspected', 'Long':'Lon','CountryBu':'CountryBuilt', 'Constructi': 'Construction','PortRegis':'PortRegistered', 'FileNumbe': 'FileNumber','OfficialN':'OfficialNumber','Aac': 'AAC'}, inplace=True)
df_shipwrecks.set_geometry("Geometry", inplace=True)
df_shipwrecks.to_file(shipwrecksBronzeFile, driver='GeoJSON') 
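
To give a feel for what reprojecting to EPSG:3857 does to the coordinates, here is a minimal sketch of the spherical Web Mercator formula. It is illustrative only: it treats the input longitude/latitude as WGS84 degrees and ignores any datum shift from GDA94, which ``to_crs`` handles for you. The function name and the sample coordinates are assumptions.

```python
import math

EARTH_RADIUS_M = 6378137.0  # sphere radius used by EPSG:3857

def toWebMercator(lon, lat):
    """Project longitude/latitude in degrees to EPSG:3857 x/y."""
    x = EARTH_RADIUS_M * math.radians(lon)
    y = EARTH_RADIUS_M * math.log(math.tan(math.pi / 4 + math.radians(lat) / 2))
    return x, y

# Approximate coordinates off the WA coast (illustrative values only)
x, y = toWebMercator(115.75, -32.05)
print(f"x={x:.0f}, y={y:.0f}")
```

The projected units are nominally metres, but Web Mercator stretches distances away from the equator, so the ``distance < 5000`` filter used in Challenge 3 is a cutoff in projected units rather than exact ground metres.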

### Marine Zones Landing From Source

The BOM provides marine zone data as a zipped shapefile via FTP. Again, students can either download this from the source, or preferably start with the raw files in resources.zip.

In [None]:
# WA Marine Forecast Zones IDM00003.zip
bomFtpServer = "ftp://anonymous@ftp.bom.gov.au/"

coastalWatersRawFolder = f"{rawFilesFolder}BOM"
coastalWatersFile = f"{bomFtpServer}anon/gen/fwo/IDW11160.xml"

marineZonesRawFolder = f"{rawFilesFolder}BOM/IDM00003"
marineZonesRawFile = f"{marineZonesRawFolder}/IDM00003.shp"
marineZonesBronzeFile = f"{bronzeFilesFolder}marinezones.geojson"
marineZonesZipFile = f"{bomFtpServer}anon/home/adfd/spatial/IDM00003.zip"

Run to download marine zones (or use the resources.zip version instead)

In [None]:
print("Downloading marine zones data....")
marineZonesDownloaded = downloadFile(marineZonesZipFile, marineZonesRawFolder)
unzipFile(marineZonesDownloaded, marineZonesRawFolder)
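
A shapefile is really a set of files that share a base name, and reading the ``.shp`` generally needs at least the ``.shx`` and ``.dbf`` parts alongside it. The sketch below checks for them after unzipping. The helper name is hypothetical, and it assumes the archive extracts its parts directly into the target folder.

```python
import os
import tempfile

REQUIRED_EXTENSIONS = (".shp", ".shx", ".dbf")  # mandatory shapefile parts

def missingShapefileParts(folder, baseName):
    """Return the required extensions that are missing for baseName in folder."""
    present = {name.lower() for name in os.listdir(folder)}
    return [ext for ext in REQUIRED_EXTENSIONS
            if f"{baseName}{ext}".lower() not in present]

# Demonstration against a temporary folder that deliberately lacks the .shx part
with tempfile.TemporaryDirectory() as demoFolder:
    for ext in (".shp", ".dbf"):
        open(os.path.join(demoFolder, f"IDM00003{ext}"), "w").close()
    print(missingShapefileParts(demoFolder, "IDM00003"))
```

In the notebook this could be called as ``missingShapefileParts(marineZonesRawFolder, "IDM00003")`` before reading ``marineZonesRawFile``.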

### Marine Zones Transforming Raw to Bronze

Similar to shipwrecks, here we update the CRS, PascalCase the column names, remove/rename columns, and filter to WA coastal waters, then write to the Bronze zone as geojson.

In [None]:
df_marineZones = gp.read_file(marineZonesRawFile)

# to_crs returns a new GeoDataFrame, so the result must be assigned back
df_marineZones = df_marineZones.to_crs('epsg:3857')

df_marineZones = df_marineZones[df_marineZones.STATE_CODE == "WA"]
df_marineZones = df_marineZones.where(df_marineZones.notna(), None)
df_marineZones.rename(columns=lambda x: toPascalCase(x), inplace=True)
df_marineZones.rename(columns={'DistName': 'DistrictName'}, inplace=True) 
df_marineZones.drop(columns={'DistNo','StateCode','Type', 'Pt1Name','Pt2Name'}, inplace=True)
df_marineZones.set_geometry("Geometry", inplace=True)
df_marineZones.to_file(marineZonesBronzeFile, driver='GeoJSON')

### Local Water Forecast Landing From Source

The BOM provides forecast data as XML via FTP. Again, students can either download this from the source, or preferably start with the raw files in resources.zip. This file is not processed by Spark; use a Dataflow Gen2 instead, it's easier.

In [None]:
# Download the coastal waters forecast data for later use by a dataflow
# or use the version from resources.zip
# this file isn't used by this notebook
coastalWatersRawFile = downloadFile(coastalWatersFile, coastalWatersRawFolder)

## Challenge 3 - Enriching Shipwrecks with Marine Zone data
Here we use geopandas to spatially join shipwrecks to marine zones and clean some more before writing to a Delta table. For an in-depth explanation see ``Data Exploration``.

**Note** ensure the geometry column is dropped when writing to Delta!

In [None]:
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "false")
spark.conf.set("spark.sql.parquet.vorder.enabled", "true") # Enable VOrder write
spark.conf.set("spark.microsoft.delta.optimizeWrite.enabled", "true") # Enable automatic delta optimized write

df_marineZones = gp.read_file(marineZonesBronzeFile)
df_shipwrecks = gp.read_file(shipwrecksBronzeFile)

df_joined = df_shipwrecks.sjoin_nearest(df_marineZones, how="left", distance_col="distance").query("distance < 5000")

#Drop the geometry, it's not supported by parquet
df_joined.drop(columns={'index_right', 'geometry', 'distance'}, inplace=True)
df_joined = df_joined.where(df_joined.notna(), None)

#Save a copy in Silver (not strictly necessary)
df_joined.to_json(shipwrecksSilverFile, orient='records', lines=True)

#Save to delta
saveTable = "Shipwrecks"
spark.createDataFrame(df_joined).write.mode("overwrite").option("overwriteSchema", "true").format("delta").save(f"Tables/{saveTable}")
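
The join above keeps, for each shipwreck, the nearest marine zone and then discards matches beyond the distance cutoff. The sketch below shows the same idea in plain Python as a simplified stand-in: zones are reduced to single points, whereas ``sjoin_nearest`` measures distance to the zone geometry itself. All names and coordinates are hypothetical.

```python
import math

def nearestWithin(points, zones, maxDistance):
    """For each point, attach the nearest zone name if it lies within maxDistance."""
    results = []
    for pointName, px, py in points:
        # Find the closest zone by straight-line distance
        zoneName, distance = min(
            ((name, math.hypot(px - zx, py - zy)) for name, zx, zy in zones),
            key=lambda pair: pair[1],
        )
        # Keep the match only when it is inside the cutoff
        if distance < maxDistance:
            results.append((pointName, zoneName, round(distance, 1)))
    return results

# Hypothetical projected coordinates
wrecks = [("Wreck A", 0.0, 0.0), ("Wreck B", 10000.0, 0.0), ("Wreck C", 50000.0, 50000.0)]
zones = [("Zone North", 0.0, 3000.0), ("Zone East", 12000.0, 0.0)]
print(nearestWithin(wrecks, zones, 5000))
```

Here "Wreck C" has no zone within the cutoff, so it drops out of the result, just as rows failing ``distance < 5000`` drop out of ``df_joined``.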

Alternatively, if starting from the pre-enriched ``Silver/shipwrecks.json`` in resources.zip, load it straight into the Delta table with the cell below
(or right click the file, load data...)

In [None]:
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "false")
spark.conf.set("spark.sql.parquet.vorder.enabled", "true") # Enable VOrder write
spark.conf.set("spark.microsoft.delta.optimizeWrite.enabled", "true") # Enable automatic delta optimized write

saveTable = "Shipwrecks"
df = spark.read.json(shipwrecksSilverFile)
df.write.mode("overwrite").option("overwriteSchema", "true").format("delta").save(f"Tables/{saveTable}")
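
The Silver file written with ``orient='records', lines=True`` is in JSON Lines form, one JSON object per line, which is the layout ``spark.read.json`` expects by default. The sketch below shows that layout with the standard library only. The records are made-up placeholders, not real shipwreck data.

```python
import io
import json

# Hypothetical records standing in for enriched shipwreck rows
records = [
    {"Name": "Example Wreck", "Type": "Ship", "DistrictName": None},
    {"Name": "Another Wreck", "Type": "Ship", "DistrictName": "Example Coast"},
]

# Write one JSON object per line (JSON Lines)
buffer = io.StringIO()
for record in records:
    buffer.write(json.dumps(record) + "\n")

# Read the lines back into Python dictionaries
roundTripped = [json.loads(line) for line in buffer.getvalue().splitlines()]
print(roundTripped == records)
```

Missing values are written as ``null``, which is why the notebook replaces NaN with ``None`` before saving.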


## Done!
You should now have geojson raw and bronze files, and a silver zone delta table of enriched shipwrecks. 

Next, **go write a dataflow to import forecast data** (straight to silver perhaps?)

----
