# Data Science problems in supply chains with Trase.earth

Nicolás Martín and Oskar Åsbrink

### Division of Work:
- Graphs in Neo4j:
  - Majority Nicolás
- Delta Live Tables & Pipeline:
  - roughly equally!
- Amazon Web Services, Databricks Unity Catalog, Delta-sharing
  - roughly equally!
- Delta Lake documentation, Delta Live Table module for ScaDaMaLe
  - Majority Oskar

# Neo4j
## Sample json data
The following is a sample of a single record from Brazil's animal transportation registry (called GTA - Guia do Transporte Animal). 2 million such records, covering 2013 - 2020 for the Brazilian state of Pará, were imported into Neo4j.

In [0]:
{
    "ID": "PA|K|102009",
    "INFO_STATUS": "EM TRANSITO",
    "ORIGIN_TAX_NUMBER": "56668880134",
    "ORIGIN_NAME": "FAZ SAO MATEUS II",
    "ORIGIN_FARMER": "CAIO GERONIMO DA SILVA",
    "ORIGIN_CITY": "SAO FELIX DO XINGU",
    "ORIGIN_STATE": "PA",
    "DESTINATION_TAX_NUMBER": "05566585230",
    "DESTINATION_NAME": "FAZENDA COLORADO DO RIO PARDO",
    "DESTINATION_FARMER": "MATHEUS HENRIQUE BORGES SILVA",
    "DESTINATION_CITY": "SAO FELIX DO XINGU",
    "DESTINATION_STATE": "PA",
    "TRANSPORT": "A PE",
    "TIMELINE_OBS": "Nº DO CAR: PA-1507300-B7F0E10F2F0141E7B8AE65BA3E097743 GTA EMITIDA EM SUBSTITUICAO A GTA Nº 066640-K. EMISSOR ANTERIOR: JANDER MARINHO MACHADO (92222447291). ESCRITORIO: SAO FELIX DO XINGU",
    "SPECIES_PURPOSE": "ENGORDA",
    "GTA_SOURCE": "S1 2020",
    "SPECIES_GROUP": "BOVIDEOS",
    "SPECIES_NAME": "BOVINO",
    "DESTINATION_CODE": "15073006602",
    "DESTINATION_GEOCODE": "1507300",
    "ORIGIN_CODE": "15073004972",
    "ORIGIN_GEOCODE": "1507300",
    "MISSING_INFO": "PRESENT",
    "SPECIES": "BOV",
    "TRANSPORT_DATE": "2020-01-07",
    "TRANSPORT_YEAR": 2020,
    "NUM_ANIMALS_ASSUMED": false,
    "SLAUGHTER_GTA": false,
    "ANIMALS": [
      {
        "AMOUNT_SENT": 5,
        "DESCRIPTION": "BOVINO,MACHO,13 A 24 MESES"
      },
      {
        "AMOUNT_SENT": 80,
        "DESCRIPTION": "BOVINO,FEMEA,13 A 24 MESES"
      },
      {
        "AMOUNT_SENT": 15,
        "DESCRIPTION": "BOVINO,MACHO,ACIMA DE 36 MESES"
      },
      {
        "AMOUNT_SENT": 45,
        "DESCRIPTION": "BOVINO,FEMEA,ACIMA DE 36 MESES"
      }
    ]
  }

## Creation of the graph
Based on json files with the previous structure, we created the network graph with the following Cypher query:
```
// Builds the GTA graph from a JSON file of transport records.
// apoc.periodic.iterate takes two statements: the first loads every JSON record as `v`,
// the second is applied to each loaded record. Per record, the second statement:
//   - MERGEs the lookup nodes (City, State, Transport, Purpose, GTA_Source, Species, Code,
//     Geocode), using '' whenever the source field is missing;
//   - MERGEs the GTA node on gta_id and, only when it is created, copies the record fields
//     onto it and starts total_animals at 0;
//   - for each entry of ANIMALS, MERGEs the Animal_Type node, CREATEs an ANIMAL_TYPE_SENT
//     relationship and adds AMOUNT_SENT to gta.total_animals;
//   - MERGEs the origin Farm, and the destination either as a Farm (slaughter_gta = false)
//     or as a Slaughterhouse (slaughter_gta = true);
//   - accumulates the animals sent in SENDS_TO.total_sent (all years) and in
//     YEAR_SENDS_STATS.total_sent (one relationship per transport_year).
// The FOREACH (... CASE WHEN ... THEN [1] ELSE [] END | ...) form is used as a conditional:
// the body runs once when the condition holds and not at all otherwise.
// NOTE(review): relationships starting or ending at the GTA node (except GTA_HAS_DESTINATION)
// use CREATE, and total_animals is only reset ON CREATE, so importing the same record twice
// would duplicate relationships and double-count totals — confirm each file is loaded once.
// NOTE(review): City nodes are keyed by name only, so same-named cities in different states
// would share one node — confirm this is acceptable for the data loaded.
CALL apoc.periodic.iterate("CALL apoc.load.json('file:///file_path/filename.json') YIELD value AS v",
"MERGE (o_city:City {name:COALESCE(v.ORIGIN_CITY, '')})
MERGE (d_city:City {name:COALESCE(v.DESTINATION_CITY, '')})
MERGE (o_state:State {name:COALESCE(v.ORIGIN_STATE, '')})
MERGE (d_state:State {name:COALESCE(v.DESTINATION_STATE, '')})
MERGE (o_city)-[:IS_LOCATED_IN]->(o_state)
MERGE (d_city)-[:IS_LOCATED_IN]->(d_state)
MERGE (transp:Transport {transport:COALESCE(v.TRANSPORT, '')})
MERGE (purp:Purpose {purpose:COALESCE(v.SPECIES_PURPOSE, '')})
MERGE (gta_src:GTA_Source {gta_source:COALESCE(v.GTA_SOURCE, '')})
MERGE (sp:Species {species:COALESCE(v.SPECIES, '')})
MERGE (o_code:Code {code:COALESCE(v.ORIGIN_CODE, '')})
MERGE (o_geocode:Geocode {geocode:COALESCE(v.ORIGIN_GEOCODE, '')})
MERGE (d_code:Code {code:COALESCE(v.DESTINATION_CODE, '')})
MERGE (d_geocode:Geocode {geocode:COALESCE(v.DESTINATION_GEOCODE, '')})
MERGE (o_code)-[:HAS_GEOCODE]->(o_geocode)
MERGE (d_code)-[:HAS_GEOCODE]->(d_geocode)
MERGE (o_city)-[:HAS_GEOCODE]->(o_geocode)
MERGE (d_city)-[:HAS_GEOCODE]->(d_geocode)
MERGE (gta:GTA {gta_id:COALESCE(v.ID, '')}) 
  ON CREATE SET gta.info_status = v.INFO_STATUS,
                gta.timeline_obs = v.TIMELINE_OBS,
                gta.missing_info = v.MISSING_INFO,
                gta.transport_date = v.TRANSPORT_DATE,
                gta.transport_year = v.TRANSPORT_YEAR,
                gta.num_animals_assumed = v.NUM_ANIMALS_ASSUMED,
                gta.slaughter_gta = v.SLAUGHTER_GTA,
                gta.total_animals = 0
FOREACH (an_sent IN v.ANIMALS | 
    MERGE (an_type:Animal_Type {animal_type:COALESCE(an_sent.DESCRIPTION, '')})
    CREATE (gta)-[:ANIMAL_TYPE_SENT {amount:COALESCE(an_sent.AMOUNT_SENT, '')}]->(an_type)
    SET gta.total_animals = gta.total_animals + an_sent.AMOUNT_SENT
)
CREATE (gta)-[:IS_TRANSPORTED_BY]->(transp)
CREATE (gta)-[:HAS_PURPOSE]->(purp)
CREATE (gta)-[:FROM_GTA_SOURCE]->(gta_src)
CREATE (gta)-[:SPECIES_TRANSPORTED]->(sp)
MERGE (o_farm:Farm {tax_number:COALESCE(v.ORIGIN_TAX_NUMBER, ''), name:COALESCE(v.ORIGIN_NAME, '')}) 
MERGE (o_farm)-[:HAS_CODE]->(o_code)
CREATE (o_farm)-[:IS_ORIGIN_OF_GTA]->(gta)
MERGE (o_farm)-[:IS_LOCATED_IN]->(o_city)
FOREACH (i IN CASE WHEN NOT gta.slaughter_gta THEN [1] ELSE [] END | 
    MERGE (d_farm:Farm {tax_number:COALESCE(v.DESTINATION_TAX_NUMBER,''), name:COALESCE(v.DESTINATION_NAME,'')}) 
    MERGE (d_farm)-[:HAS_CODE]->(d_code)
    MERGE (o_farm)-[sends:SENDS_TO]->(d_farm)
        ON CREATE SET sends.total_sent = gta.total_animals
        ON MATCH SET sends.total_sent = sends.total_sent + gta.total_animals
    MERGE (o_farm)-[y_stats:YEAR_SENDS_STATS {year:gta.transport_year}]->(d_farm)
        ON CREATE SET y_stats.total_sent = gta.total_animals
        ON MATCH SET y_stats.total_sent = y_stats.total_sent + gta.total_animals        
    MERGE (gta)-[:GTA_HAS_DESTINATION]->(d_farm)
    MERGE (d_farm)-[:IS_LOCATED_IN]->(d_city)
)
FOREACH( i IN CASE WHEN gta.slaughter_gta THEN [1] ELSE [] END | 
    MERGE (slaughter:Slaughterhouse {tax_number:COALESCE(v.DESTINATION_TAX_NUMBER,''), name:COALESCE(v.DESTINATION_NAME,'')}) 
    MERGE (slaughter)-[:HAS_CODE]->(d_code)
    MERGE (o_farm)-[sends:SENDS_TO]->(slaughter)
        ON CREATE SET sends.total_sent = gta.total_animals
        ON MATCH SET sends.total_sent = sends.total_sent + gta.total_animals
    MERGE (o_farm)-[y_stats:YEAR_SENDS_STATS {year:gta.transport_year}]->(slaughter)
        ON CREATE SET y_stats.total_sent = gta.total_animals
        ON MATCH SET y_stats.total_sent = y_stats.total_sent + gta.total_animals          
    MERGE (gta)-[:GTA_HAS_DESTINATION]->(slaughter)
    MERGE (slaughter)-[:IS_LOCATED_IN]->(d_city)
)
", {})

```

### Relevant graph queries
1. Structure of the graph. Generated using `call apoc.meta.graph()`
![Graph structure](https://github.com/oskarasbrink/ECLAT-DataMining-Project/blob/main/temp_images/2022_12_15_graph_structure.png?raw=true)

2. Example of a simple relationship between farms, slaughterhouses, GTA and City. Generated using `MATCH p=(n:Farm)--(g:GTA)--(s:Slaughterhouse)--(c:City) WHERE s.name<>'' RETURN n,g,s,c LIMIT 5`
![Relationships example](https://github.com/oskarasbrink/ECLAT-DataMining-Project/blob/main/temp_images/2022_12_15_Graph_example_of_relationship.png?raw=true)

3. Top aggregated exchanges from farms to other farms or slaughterhouses. Generated using 
```
MATCH (n:Farm)-[s:SENDS_TO]->(m) RETURN n.name AS Origin_farm, m.name AS Destination_farm, 
s.total_sent AS Sent_animals, labels(m) AS Destination_type ORDER BY s.total_sent DESC LIMIT 100
```
![Top aggregated exchanges](https://github.com/oskarasbrink/ECLAT-DataMining-Project/blob/main/temp_images/2022_12_15_Top_aggregated_exchanges.png?raw=true)

4. Example of farms which are indirect suppliers (at 2 degrees of separation) to a Slaughterhouse. Generated using
```MATCH p=(s:Slaughterhouse)<-[st:SENDS_TO*2..2]-(f:Farm) RETURN p LIMIT 20``` 
![Indirect suppliers example](https://github.com/oskarasbrink/ECLAT-DataMining-Project/blob/main/temp_images/2022_12_15_Indirect_suppliers_example.png?raw=true)

# Databricks Unity Catalog and Delta Sharing

## Setting up AWS IAM roles and policies for Unity Catalog and S3 buckets

A role with a custom trust policy was created, giving it a trust relationship with the Databricks Unity Catalog service:
```
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {
                "AWS": "arn:aws:iam::414351767826:role/unity-catalog-prod-UCMasterRole-14S5ZJVKOTYTL"
            },
            "Action": "sts:AssumeRole",
            "Condition": {
                "StringEquals": {
                    "sts:ExternalId": "<databricks_account_id>"
                }
            }
        }
    ]
}
```

Policy for letting Unity Catalog use the S3 bucket for the metastore linked with Unity Catalog
```
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Action": [
                "s3:GetObject",
                "s3:PutObject",
                "s3:DeleteObject",
                "s3:ListBucket",
                "s3:GetBucketLocation",
                "s3:GetLifecycleConfiguration",
                "s3:PutLifecycleConfiguration"
            ],
            "Resource": [
                "arn:aws:s3:::<bucket_name_for_metastore>/*",
                "arn:aws:s3:::<bucket_name_for_metastore>"
            ],
            "Effect": "Allow"
        },
        {
            "Action": [
                "sts:AssumeRole"
            ],
            "Resource": [
                "arn:aws:iam::<aws id>:role/<name of role created above>"
            ],
            "Effect": "Allow"
        }
    ]
}
```

## Create a Delta Table from a CSV file on S3
The S3 object is a 10GB CSV file with relevant look-up information, which is loaded into a Delta table within Unity Catalog. It is later made externally available as a Delta Share, so that external users can load it, query it, or connect it to a Business Intelligence tool.

In [0]:
# Enable Delta schema auto-merge for this Spark session, so Databricks resolves
# schema differences when merging instead of failing on them
spark.sql("SET spark.databricks.delta.schema.autoMerge.enabled = true")

## Reading the csv into a spark dataframe
# File location and type (placeholder path - replace with the real S3 object)
file_location = "s3a://path/file.csv"
file_type = "csv"

# CSV options: let Spark infer column types, treat the first row as the header,
# and split fields on ';'
infer_schema = "true"
first_row_is_header = "true"
delimiter = ";"

# Create the spark dataframe from the CSV using the options above
df = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(file_location)

# Register the dataframe as a temporary view named 'df_table' so it can be
# referenced from the SQL statement below
df.createOrReplaceTempView('df_table')

# Create an empty Delta table with the schema of 'df_table':
# 'WHERE 1=2' never matches, so the SELECT contributes the column definitions but no rows.
# <catalog>, <db>, <table_name> and <col_name> are placeholders to be replaced before running.
spark.sql("CREATE TABLE IF NOT EXISTS <catalog>.<db>.<table_name> \
    USING DELTA \
    PARTITIONED BY (<col_name>) \
    SELECT * from df_table \
    WHERE 1=2")

# Write the dataframe into the Delta table created above.
# NOTE(review): the write mode is "append", so re-running this cell adds the same rows again.
df.write.format("delta").mode("append").insertInto("<catalog>.<db>.<table_name>")

# Show some records to review it has been loaded properly
display(spark.table("<catalog>.<db>.<table_name>"))

Screenshots of the results of running the previous scripts on a 10GB CSV file on a spark cluster made of 1 driver and 2 workers, each with 4 cores and 8GB memory. 
![Reading the csvs](https://github.com/oskarasbrink/ECLAT-DataMining-Project/blob/main/temp_images/2022_12_15_Reading_the_csv.png?raw=true)
![Creating an empty table with the csv schema](https://github.com/oskarasbrink/ECLAT-DataMining-Project/blob/main/temp_images/2022_12_15_Create_delta_table.png?raw=true)
![Display Unity Catalog table](https://github.com/oskarasbrink/ECLAT-DataMining-Project/blob/main/temp_images/2022_12_15_Display_table.png?raw=true)

## Reading a shared Delta Table from an external (non-Databricks) location

In [0]:
# Install the delta sharing library
%pip install delta-sharing
import delta_sharing

# Open a client using credentials (shared by the provider of the data)
profile_path = f"/path_to_share_credentials/config.share"
client = delta_sharing.SharingClient(profile_path)

# See all tables being shared
client.list_all_tables()

share_name = f"shared_resource"
schema_name = f"brazil"
# Load a table as a pandas dataframe
# .. also possible through spark, hive, or connectors from 3rd parties (Power BI, Tableu, etc..)
candy_df = delta_sharing.load_as_pandas(f"{profile_path}#{share_name}.{schema_name}.{table_name}")

Screenshot of reading an example share called 'candyland' through pandas, from a Jupyter Notebook in SNIC cloud
![Delta Sharing example using pandas](files/tables/trase/2022_12_16_Delta_sharing.png)

# Delta Live Tables

__See below for examples of the changes made to the scripts, and see `shrimp_delta_poc_rewrite_submission.ipynb` and `ec_shrimp_area_production_per_parish_rewrite.ipynb` for examples of how a script was transformed so that it can run in a Pipeline.__

### Example of changes to Trase data preprocessing-scripts

The main task for this part of the project is to implement and use Delta Live Tables (DLT) to help with the preprocessing pipeline. To make this easy for Trase, the main approach was to rewrite their load and save functions, and mount an existing S3 bucket to Databricks, to integrate DLT code instead of AWS-cli code.

In [0]:
### Original function, without modifications ###

def get_df(
    key,
    sep=";",
    bucket_name="trase-uppsala",
    type=None,
    skiprows=None,
    encoding="latin1",
):
    """Load a CSV object from S3 into a pandas DataFrame with stripped text columns.

    Args:
        key: Object key, passed to `read_s3_csv`.
        sep: Field delimiter, forwarded to `pd.read_csv`.
        bucket_name: Bucket name, passed to `read_s3_csv`.
        type: Forwarded to `pd.read_csv` as `dtype` (the parameter name shadows
            the built-in `type` inside this function).
        skiprows: Forwarded to `pd.read_csv` only when truthy.
        encoding: Text encoding, forwarded to `pd.read_csv`.

    Returns:
        The parsed DataFrame after `remove_whitespace` has been applied to it.
    """
    # `read_s3_csv` is defined outside this cell; presumably it returns the raw
    # bytes of the object, since the result is wrapped in io.BytesIO below.
    data = read_s3_csv(key, bucket_name=bucket_name)
    # The two branches differ only in whether `skiprows` is passed to read_csv.
    if skiprows:
        return remove_whitespace(
            pd.read_csv(
                io.BytesIO(data),
                sep=sep,
                encoding=encoding,
                dtype=type,
                skiprows=skiprows,
            )
        )
    return remove_whitespace(
        pd.read_csv(io.BytesIO(data), sep=sep, encoding=encoding, dtype=type)
    )

def remove_whitespace(df):
    """Strip leading and trailing whitespace from the text columns of `df`.

    The columns are reassigned on `df` itself, and the same object is returned.
    """
    for col in df.columns:
        # NOTE(review): the `np.object` / `np.str` aliases were deprecated in
        # NumPy 1.20 and removed in 1.24, where this line raises AttributeError.
        if df[col].dtype in (np.object, np.str):
            df[col] = df[col].str.strip()
    return df

In [0]:
### Modified function, loading existing Delta Table instead of fetching from S3 ###
def get_df(table_name):
    """Return the Delta Live Table `table_name` as a pandas DataFrame.

    The table is read through `dlt.read`, wrapped in a `ps.DataFrame`
    (`ps` is presumably `pyspark.pandas` - confirm against the notebook's
    imports) and finally converted with `to_pandas()`.
    """
    live_table = dlt.read(table_name)
    on_spark_frame = ps.DataFrame(live_table)
    return on_spark_frame.to_pandas()


### used when creating delta tables ###
def get_df_clean(key,sep=";",encoding="latin1",bucket_name="s3a://uutrase/data/trase-uppsala/",type=None,skiprows=None):
    """Read a CSV with Spark and return it as a Spark DataFrame with stripped text columns.

    The file at `bucket_name + key` is read with a header row and multiline
    support, converted to pandas so that `remove_whitespace` can be applied,
    and converted back to a Spark DataFrame.

    Args:
        key: Path of the CSV relative to `bucket_name` (the two are concatenated).
        sep: Field delimiter.
        encoding: Accepted but currently unused - the reader is fixed to "UTF-8".
        bucket_name: Prefix prepended to `key`.
        type: Accepted but currently unused.
        skiprows: Accepted but currently unused.
    """
    csv_reader = (
        spark.read.option("delimiter", sep)
        .option("encoding","UTF-8")
        .option("header", "true")
        .option("multiline","true")
    )
    # The whole file is pulled into pandas for the whitespace clean-up
    raw_pdf = csv_reader.csv(bucket_name + key).toPandas()
    stripped_pdf = remove_whitespace(raw_pdf)
    return spark.createDataFrame(stripped_pdf)
def remove_whitespace(df):
    """Strip leading and trailing whitespace from every text column of `df`.

    Columns with object dtype or a pandas string dtype are stripped with
    `Series.str.strip()`; all other columns are left untouched. The columns
    are reassigned on `df` itself, and the same object is returned.

    Args:
        df: pandas DataFrame to clean.

    Returns:
        The same DataFrame instance, with its text columns stripped.
    """
    # Local import keeps this cell self-contained. These predicates replace the
    # `np.object` / `np.str` aliases, which were removed in NumPy 1.24.
    from pandas.api.types import is_object_dtype, is_string_dtype

    for col in df.columns:
        col_dtype = df[col].dtype
        if is_object_dtype(col_dtype) or is_string_dtype(col_dtype):
            df[col] = df[col].str.strip()

    return df


In [0]:
# creating a delta live table
@dlt.table(name="city_df")
def get_city_df():
    """Delta Live Table `city_df`: the cleaned city CSV as a Spark DataFrame."""
    # get_df_clean loads `bucket_name + key`, so the bucket prefix and the key
    # are passed separately; passing a full URI as the key would append it to
    # the default bucket prefix and produce an invalid path.
    return get_df_clean(
        "path_to_data/data.csv",
        sep=",",
        encoding="utf8",
        bucket_name="s3://bucket/",
    )

### Resulting pipeline in Databricks
<img src="https://github.com/oskarasbrink/ECLAT-DataMining-Project/blob/main/temp_images/pipeline.png?raw=true" width="500"/>