# GMU DAEN 690 Spark Pipeline

## Setup
Load the `.env` file (if one exists) and the application's config file. **Do NOT** commit your `.env` file to version control, as it may contain sensitive information such as secrets. Use `.env.template` as the base for your personal `.env` file. As the name suggests, environment files should be specific to the environment that the application runs in.

`tomllib` (part of the standard library since Python 3.11) reads `application.toml`, which holds general configuration that typically doesn't change between environments and replaces values that would otherwise be hard-coded. More sophisticated libraries, like `pydantic` or `dynaconf`, can merge TOML configuration, `.env` variables, and the user's environment variables together.

Rerun this cell anytime these files change.

In [2]:
import os
from pathlib import Path
from dotenv import load_dotenv
load_dotenv()

import tomllib

with Path("application.toml").open("rb") as f:
    config = tomllib.load(f)

config

{'data': {'source': 'data/ISS_HAL_SOPs.csv',
  'language': 'en',
  'test_train_split': 0.9},
 'pipeline': {'word_embeddings': 'bert',
  'lemma_model': 'lemma_antbnc',
  'sentence_embeddings_pooling_strategy': 'AVERAGE',
  'classifier_epochs': 5,
  'bert': {'model': 'bert_base_cased'}}}
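The libraries mentioned above do this layering for you. Purely to illustrate the idea, the standard-library sketch below overlays environment variables onto a parsed config dict, coercing each string to the type already present in the TOML. The `APP__SECTION__KEY` naming convention and the `overlay_env` helper are hypothetical; this project does not define them.

```python
import copy
import os
from collections.abc import Mapping
from typing import Any

PREFIX = "APP__"  # hypothetical prefix; this project does not define one


def overlay_env(config: dict[str, Any], environ: Mapping[str, str] = os.environ) -> dict[str, Any]:
    """Return a copy of `config` with APP__SECTION__KEY variables applied on top."""
    merged = copy.deepcopy(config)
    for name, raw in environ.items():
        if not name.startswith(PREFIX):
            continue
        path = name[len(PREFIX):].lower().split("__")
        node: Any = merged
        # Walk down to the parent table of the target key.
        for key in path[:-1]:
            node = node.get(key) if isinstance(node, dict) else None
        # Ignore variables that don't name an existing key.
        if not isinstance(node, dict) or path[-1] not in node:
            continue
        current = node[path[-1]]
        # Coerce the environment string to the type already present in the TOML.
        if isinstance(current, bool):
            node[path[-1]] = raw.strip().lower() in {"1", "true", "yes"}
        elif isinstance(current, (int, float)):
            node[path[-1]] = type(current)(raw)
        elif isinstance(current, str):
            node[path[-1]] = raw
    return merged


# A slice of the parsed application.toml shown above.
base = {"data": {"test_train_split": 0.9}, "pipeline": {"classifier_epochs": 5, "word_embeddings": "bert"}}
print(overlay_env(base, {"APP__PIPELINE__CLASSIFIER_EPOCHS": "10"}))
```

Unknown keys are ignored rather than added, so a typo in a variable name cannot silently introduce new configuration.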

Import the dependencies as usual. This is done after loading the configuration above in case we want imports to be config-dependent.
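As an example of what a config-dependent import could look like, the sketch below resolves a `"package.module:attribute"` string with `importlib`. The string format and the `load_object` helper are hypothetical; a config value such as `"sparknlp.annotator:BertEmbeddings"` could be resolved the same way, but the demo uses stdlib modules so it runs anywhere.

```python
import importlib
from typing import Any


def load_object(spec: str) -> Any:
    """Resolve a hypothetical "package.module:attribute" string to the named object."""
    module_name, _, attr = spec.partition(":")
    module = importlib.import_module(module_name)
    # With no ":attribute" part, return the module itself.
    return getattr(module, attr) if attr else module


# Stand-in for a value read from application.toml (hypothetical key).
spec = "collections:Counter"
Counter = load_object(spec)
print(Counter("hal"))
```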

In [30]:
import httpx
import io
import os
import pandas as pd
from sklearn.metrics import classification_report, accuracy_score
import pyspark as ps
from pyspark import SparkContext, SparkFiles
from pyspark.sql import SparkSession
from pyspark.sql.functions import expr

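Create the Spark session. Set the app name and execution mode; `local[*]` uses all available cores on the local machine. Driver and executor memory fall back to 8G when `SPARK_DRIVER_MEMORY` or `SPARK_EXECUTOR_MEMORY` is unset or empty. In local mode the executor runs inside the driver JVM, so `spark.driver.memory` is the setting that actually takes effect, and only if it is set before the JVM starts (i.e. before the first session in this kernel). `spark.jars.packages` downloads Spark NLP 4.3.2 (built for Scala 2.12) and its dependencies through Ivy on first use.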

In [4]:
spark = (
    SparkSession
        .builder
        .appName("ISS Procedures")
        .master("local[*]")
        .config("spark.driver.memory", os.environ.get("SPARK_DRIVER_MEMORY") or "8G")
        .config("spark.executor.memory", os.environ.get("SPARK_EXECUTOR_MEMORY") or "8G")
        .config("spark.jars.packages", "com.johnsnowlabs.nlp:spark-nlp_2.12:4.3.2")
        .getOrCreate()
    )

"Apache Spark version:", spark.version

('Apache Spark version:', '3.3.2')
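The session cell falls back with `os.environ.get(name) or "8G"` rather than `os.environ.get(name, "8G")`. The difference matters if, say, a copied `.env.template` leaves `SPARK_DRIVER_MEMORY=` blank, so the variable may exist but be empty. A small stdlib check of both behaviours (the `env_or_default` helper is illustrative only):

```python
import os
from collections.abc import Mapping


def env_or_default(name: str, default: str, environ: Mapping[str, str] = os.environ) -> str:
    """Mirror the `os.environ.get(name) or default` pattern used for the Spark memory settings."""
    # `or` also falls back when the variable is set but empty,
    # which `environ.get(name, default)` would not do.
    return environ.get(name) or default


env = {"SPARK_DRIVER_MEMORY": "", "SPARK_EXECUTOR_MEMORY": "4G"}
print(env_or_default("SPARK_DRIVER_MEMORY", "8G", env))    # 8G
print(env.get("SPARK_DRIVER_MEMORY", "8G") == "")          # True
print(env_or_default("SPARK_EXECUTOR_MEMORY", "8G", env))  # 4G
```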

In [5]:
# Way overthought this, but ultimately, not really compatible with a notebook project.
# match config['data']['source'].split(":")[0]:
#     case "http"|"https":
#         with httpx.Client() as client:
#             r = client.get(config['data']['source'])
#             csv = io.BytesIO(r.content)
#     case "file":
#         with Path(config['data']['source']).open("rb") as f:
#             csv = io.BytesIO(f.read())
#     case _:
#         csv = config['data']['source']
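The commented-out cell splits the source on `":"`, which also treats a Windows drive letter such as `C:` as a scheme and leaves the `file://` prefix on local paths. A sketch of the same dispatch using `urllib.parse` instead; the `classify_source` name and its three categories are assumptions, not part of the notebook:

```python
from urllib.parse import urlparse
from urllib.request import url2pathname


def classify_source(source: str) -> tuple[str, str]:
    """Sort a data source string into ("http", url), ("file", local_path) or ("path", source)."""
    parsed = urlparse(source)
    scheme = parsed.scheme.lower()
    if scheme in {"http", "https"}:
        return "http", source
    if scheme == "file":
        # file:///tmp/x.csv -> /tmp/x.csv
        return "file", url2pathname(parsed.path)
    # Everything else (relative paths, a one-letter Windows drive parsed as a
    # "scheme", or URIs Spark reads natively such as hdfs://) passes through.
    return "path", source


print(classify_source("data/ISS_HAL_SOPs.csv"))
```

Spark's CSV reader takes a path rather than an in-memory `io.BytesIO`, so an `http` source still has to land on disk first. One option worth testing in `local[*]` mode is `spark.sparkContext.addFile(url)` followed by reading `SparkFiles.get(filename)`; `SparkFiles` is already imported above.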

## Data Wrangling

In [6]:
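source_df = spark.read.format("csv") \
    .options(header='True', inferSchema='True') \
    .load(config['data']['source'])

# Lower-case every column name so later references don't depend on the CSV's capitalization.
source_df = source_df.toDF(*[c.lower() for c in source_df.columns])

# Collect only the rows being displayed instead of the whole DataFrame.
source_df.limit(20).toPandas()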

| | procedure type | procedure name | procedure end goal | procedure file number | step number | actor | trigger (what) | trigger (how) | trigger (where) | decision (what) | ... | decision (where) | action (what) | action (how) | action (where) | waiting (what) | waiting (how) | waiting (where) | verification (what) | verification (how) | verification (where) |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | Manual Manipulation of Items | Reconfigure HAL for EVA | Configure the habitable airlock for EVA by rem... | HAL_1_0.pdf | 1.0 |  |  |  |  |  | ... |  | Stow monitors against the wall |  |  |  |  |  |  |  |  |
| 1 | Manual Manipulation of Items | Reconfigure HAL for EVA |  | HAL_1_0.pdf | 2.0 |  |  |  |  |  | ... |  | Stow the keyboards against the wall |  |  |  |  |  |  |  |  |
| 2 | Manual Manipulation of Items | Reconfigure HAL for EVA |  | HAL_1_0.pdf | 3.0 |  |  |  |  |  | ... |  | Remove the seat cushion |  |  |  |  |  |  |  |  |
| 3 | Manual Manipulation of Items | Reconfigure HAL for EVA |  | HAL_1_0.pdf | 4.0 |  |  |  |  |  | ... |  | Fold the chair backs forward |  |  |  |  |  |  |  |  |
| 4 | Manual Manipulation of Items | Reconfigure HAL for EVA |  | HAL_1_0.pdf | 5.0 |  |  |  |  |  | ... |  | Detach crew hygiene kit |  | from the aft transfer port hatches |  |  |  |  |  |  |
| 5 | Manual Manipulation of Items | Reconfigure HAL for EVA |  | HAL_1_0.pdf | 6.0 |  |  |  |  |  | ... |  | Stow the crew hygiene kits |  | in Lockers SA-1 and PA-1 |  |  |  |  |  |  |
| 6 | Manual Manipulation of Items | Reconfigure HAL for EVA |  | HAL_1_0.pdf | 7.0 |  |  |  |  |  | ... |  | Remove hatch cargo nets |  | from lockers SA-1 and PA-1 |  |  |  |  |  |  |
| 7 | Manual Manipulation of Items | Reconfigure HAL for EVA |  | HAL_1_0.pdf | 8.0 |  |  |  |  |  | ... |  | Secure hatch cargo nets | to 3 of the 4 D-rings | at the starboard and port hatch openings |  |  |  |  |  |  |
| 8 | Manual Manipulation of Items | Reconfigure HAL for EVA |  | HAL_1_0.pdf | 9.0 |  |  |  |  |  | ... |  | Remove IVA Common Tool Kit |  | from PM-5 |  |  |  |  |  |  |
| 9 | Manual Manipulation of Items | Reconfigure HAL for EVA |  | HAL_1_0.pdf | 10.0 |  |  |  |  |  | ... |  | Temp Stow IVA Common Tool Kit |  | behind the Port Hatch Opening |  |  |  |  |  |  |


In [7]:
df = source_df \
    .select(
        'procedure name',
        expr("""
            stack(
                16,
                'actor', actor,
                'trigger (what)', `trigger (what)`,
                'trigger (how)', `trigger (how)`,
                'trigger (where)', `trigger (where)`,
                'decision (what)', `decision (what)`,
                'decision (how)', `decision (how)`,
                'decision (where)', `decision (where)`,
                'action (what)', `action (what)`,
                'action (how)', `action (how)`,
                'action (where)', `action (where)`,
                'waiting (what)', `waiting (what)`,
                'waiting (how)', `waiting (how)`,
                'waiting (where)', `waiting (where)`,
                'verification (what)', `verification (what)`,
                'verification (how)', `verification (how)`,
                'verification (where)', `verification (where)`
            ) as (label, text)
        """)
    ) \
    .filter("text is not null")

df.limit(10).toPandas()


| | procedure name | label | text |
|---|---|---|---|
| 0 | Reconfigure HAL for EVA | action (what) | Stow monitors against the wall |
| 1 | Reconfigure HAL for EVA | action (what) | Stow the keyboards against the wall |
| 2 | Reconfigure HAL for EVA | action (what) | Remove the seat cushion |
| 3 | Reconfigure HAL for EVA | action (what) | Fold the chair backs forward |
| 4 | Reconfigure HAL for EVA | action (what) | Detach crew hygiene kit |
| 5 | Reconfigure HAL for EVA | action (where) | from the aft transfer port hatches |
| 6 | Reconfigure HAL for EVA | action (what) | Stow the crew hygiene kits |
| 7 | Reconfigure HAL for EVA | action (where) | in Lockers SA-1 and PA-1 |
| 8 | Reconfigure HAL for EVA | action (what) | Remove hatch cargo nets |
| 9 | Reconfigure HAL for EVA | action (where) | from lockers SA-1 and PA-1 |
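The `stack(16, ...)` call unpivots the 16 role columns into `(label, text)` rows, and the `filter` then drops the empty cells. Rather than maintaining the 16 pairs by hand, the expression string could be generated from the column list. A sketch, where `build_stack_expr` is a hypothetical helper and the role/facet lists simply reproduce the column names used above:

```python
# Value columns in the same order as the hand-written stack() call above.
roles = ["trigger", "decision", "action", "waiting", "verification"]
facets = ["what", "how", "where"]
value_columns = ["actor"] + [f"{role} ({facet})" for role in roles for facet in facets]


def build_stack_expr(columns: list[str], label: str = "label", value: str = "text") -> str:
    """Build a Spark SQL stack() expression that unpivots `columns` into (label, value) rows.

    Assumes column names contain no single quotes or backticks.
    """
    pairs = ",\n    ".join(f"'{c}', `{c}`" for c in columns)
    return f"stack({len(columns)},\n    {pairs}\n) as ({label}, {value})"


print(build_stack_expr(value_columns))

# In the notebook this would replace the literal string (untested sketch):
# df = source_df.select("procedure name", expr(build_stack_expr(value_columns))).filter("text is not null")
```

Spark 3.4 and later also provide `DataFrame.unpivot`; this notebook runs Spark 3.3.2, where `stack` is the usual approach.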


## Spark ML

In [8]:
import sparknlp
from sparknlp.base import *
from sparknlp.annotator import *
from pyspark.ml import Pipeline
import pandas as pd

"Spark NLP version", sparknlp.version()

('Spark NLP version', '4.3.2')

### Pipeline Steps

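Initial step: convert the raw `text` column into Spark NLP `document` annotations, which the tokenizer and the later annotators consume.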

In [20]:
document = DocumentAssembler() \
    .setInputCol("text") \
    .setOutputCol("document")

In [10]:
tokenizer = Tokenizer() \
    .setInputCols(["document"]) \
    .setOutputCol("token")

In [11]:
normalizer = Normalizer() \
    .setInputCols(["token"]) \
    .setOutputCol("normalized")

In [12]:
stopwords_cleaner = StopWordsCleaner() \
    .setInputCols(["normalized"]) \
    .setOutputCol("cleanTokens")

In [13]:
lemma = LemmatizerModel \
    .pretrained(config["pipeline"]["lemma_model"]) \
    .setInputCols(["normalized"])\
    .setOutputCol("lemma")

lemma_antbnc download started this may take some time.
Approximate size to download 907.6 KB
Download done! Loading the resource.
[OK!]


In [14]:
from typing import Any


def set_word_embeddings(word_embeddings_name: str, input_cols: list[str], output_col: str, lang: str = config["data"]["language"]) -> Any:
    # Note: the default `lang` is read from config once, when this cell runs.
    match word_embeddings_name:
        case "bert":
            transformer = BertEmbeddings.pretrained(config["pipeline"][word_embeddings_name]["model"], lang)
        case "elmo":
            transformer = ElmoEmbeddings.pretrained(config["pipeline"][word_embeddings_name]["model"], lang)
        case _:
            raise ValueError(f"Not an implemented word embedding: {word_embeddings_name}")

    return transformer \
        .setInputCols(input_cols) \
        .setOutputCol(output_col)

word_embeddings = set_word_embeddings(config["pipeline"]["word_embeddings"], ["document", "lemma"], "embeddings")

bert_base_cased download started this may take some time.
Approximate size to download 389.1 MB
Download done! Loading the resource.
[OK!]


In [15]:
sentence_embeddings = SentenceEmbeddings() \
    .setInputCols(["document", "embeddings"]) \
    .setOutputCol("sentence_embeddings") \
    .setPoolingStrategy(config["pipeline"]["sentence_embeddings_pooling_strategy"])
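With the `AVERAGE` pooling strategy configured in `application.toml`, the per-token vectors of each text are collapsed into a single fixed-length sentence vector by taking their element-wise mean (Spark NLP also accepts `SUM`). A plain-Python illustration of average pooling on toy vectors; it shows the arithmetic, not Spark NLP's implementation:

```python
def average_pool(token_vectors: list[list[float]]) -> list[float]:
    """Element-wise mean of token vectors: one fixed-length vector per text."""
    if not token_vectors:
        raise ValueError("need at least one token vector")
    dim = len(token_vectors[0])
    return [sum(vec[i] for vec in token_vectors) / len(token_vectors) for i in range(dim)]


# Toy 3-dimensional "embeddings" for a three-token text
# (bert_base_cased produces 768-dimensional vectors).
tokens = [[1.0, 0.0, 2.0], [3.0, 4.0, 0.0], [2.0, 2.0, 1.0]]
print(average_pool(tokens))  # [2.0, 2.0, 1.0]
```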

In [16]:
classifier = ClassifierDLApproach() \
    .setInputCols(["sentence_embeddings"]) \
    .setOutputCol("class") \
    .setLabelColumn("label") \
    .setMaxEpochs(config["pipeline"]["classifier_epochs"]) \
    .setEnableOutputLogs(True)

In [21]:
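# Note: stopwords_cleaner is defined above but is not one of these stages,
# so the lemmatizer still reads the "normalized" tokens (stop words included).
pipeline = Pipeline(
    stages=[
        document,
        tokenizer,
        normalizer,
        lemma,
        word_embeddings,
        sentence_embeddings,
        classifier
    ]
)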
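Each stage reads the columns named by `setInputCols` and writes the one named by `setOutputCol`, so a stage only works if an earlier stage (or the source DataFrame) produced its inputs. A small stdlib check of that wiring, using the column names from the cells above; `check_wiring` is an illustrative helper, not part of Spark ML:

```python
Stage = tuple[str, list[str], str]  # (stage name, input columns, output column)


def check_wiring(stages: list[Stage], source_columns: set[str]) -> tuple[list[str], set[str]]:
    """Return (missing inputs, outputs never consumed) for an ordered list of stages."""
    available = set(source_columns)
    consumed: set[str] = set()
    missing: list[str] = []
    outputs: set[str] = set()
    for name, inputs, output in stages:
        for col in inputs:
            if col not in available:
                missing.append(f"{name}: {col}")
            consumed.add(col)
        available.add(output)
        outputs.add(output)
    return missing, outputs - consumed


# Column wiring copied from the stages above.
notebook_stages: list[Stage] = [
    ("document", ["text"], "document"),
    ("tokenizer", ["document"], "token"),
    ("normalizer", ["token"], "normalized"),
    ("lemma", ["normalized"], "lemma"),
    ("word_embeddings", ["document", "lemma"], "embeddings"),
    ("sentence_embeddings", ["document", "embeddings"], "sentence_embeddings"),
    ("classifier", ["sentence_embeddings"], "class"),
]
print(check_wiring(notebook_stages, {"procedure name", "label", "text"}))
```

Adding `stopwords_cleaner` after `normalizer` without also pointing `lemma` at `cleanTokens` would leave `cleanTokens` unconsumed, which this check reports alongside the final `class` output.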

### Training

In [18]:
# test_train_split is the training fraction: 0.9 -> roughly 90% train, 10% test.
split_ratio = config["data"]["test_train_split"]

train_df, test_df = df.randomSplit([split_ratio, 1 - split_ratio], seed=17)
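`randomSplit` normalizes the weights and assigns rows to splits by random sampling, so the split sizes only approximate 90/10. The training log reports 1,072 training examples and the classification report totals 113 test rows, i.e. 1072 / 1185 ≈ 0.905 of the rows went to training. The sketch below mimics per-row weighted assignment in plain Python; it illustrates the idea and is not Spark's per-partition implementation.

```python
import random


def random_split(rows: list, weights: list[float], seed: int) -> list[list]:
    """Assign each row to a split independently, in proportion to the normalized weights."""
    total = sum(weights)
    bounds = []
    running = 0.0
    for w in weights:
        running += w / total
        bounds.append(running)
    rng = random.Random(seed)
    splits: list[list] = [[] for _ in weights]
    for row in rows:
        u = rng.random()
        # First split whose cumulative upper bound exceeds u; the last split catches rounding.
        index = next((i for i, b in enumerate(bounds) if u < b), len(bounds) - 1)
        splits[index].append(row)
    return splits


train, test = random_split(list(range(1185)), [0.9, 0.1], seed=17)
print(len(train), len(test))
```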

In [22]:
pipeline_fitted = pipeline.fit(train_df)

Training started - epochs: 5 - learning_rate: 0.005 - batch_size: 64 - training_examples: 1072 - classes: 15
Epoch 1/5 - 0.28s - loss: 43.82694 - acc: 0.18587239 - batches: 17
Epoch 2/5 - 0.08s - loss: 42.880547 - acc: 0.17415364 - batches: 17
Epoch 3/5 - 0.08s - loss: 42.880547 - acc: 0.17415364 - batches: 17
Epoch 4/5 - 0.08s - loss: 42.880547 - acc: 0.17415364 - batches: 17
Epoch 5/5 - 0.07s - loss: 42.880547 - acc: 0.1751302 - batches: 17
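The `batches: 17` in the log follows from the batch size: 1,072 examples in batches of 64 need 17 batches, the last one partial. Because the classifier cell only sets `setMaxEpochs`, the learning rate and batch size shown are presumably the annotator's defaults (`setLr` and `setBatchSize` are the setters for them). Note also that loss and accuracy barely change after the first epoch.

```python
import math

# Values copied from the training log above.
training_examples = 1072
batch_size = 64

batches = math.ceil(training_examples / batch_size)
last_batch = training_examples - (batches - 1) * batch_size
print(batches, last_batch)  # 17 48
```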


### Testing

In [28]:
results_df = pipeline_fitted.transform(test_df).select("text", "label", "class.result").toPandas()
# `class.result` holds a list of predicted labels per row; keep the first (only) one.
results_df['result'] = results_df['result'].apply(lambda x: x[0])

results_df

                                                                                

| | text | label | result |
|---|---|---|---|
| 0 | turns blue | trigger (how) | trigger (where) |
| 1 | Verify HAL IMV CLOSED | verification (what) | trigger (where) |
| 2 | PCS Display | verification (where) | trigger (where) |
| 3 | "Navigate to ""PCS"" in the Air Systems Display" | action (what) | trigger (where) |
| 4 | Seal Airlock SEAL | action (what) | trigger (where) |
| ... | ... | ... | ... |
| 108 | STOW Nose Bag 3 | action (what) | trigger (where) |
| 109 | Secure hatch cargo nets | action (what) | trigger (where) |
| 110 | from PN-4 | action (where) | trigger (where) |
| 111 | from SN-2 | action (where) | trigger (where) |


### Results

In [33]:
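# zero_division=0 reports 0.00 for labels that were never predicted, without a warning.
print(classification_report(results_df.label, results_df.result, zero_division=0))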

                      precision    recall  f1-score   support

        action (how)       0.00      0.00      0.00         1
       action (what)       0.00      0.00      0.00        35
      action (where)       0.00      0.00      0.00        12
       trigger (how)       0.00      0.00      0.00        20
     trigger (where)       0.20      1.00      0.34        23
  verification (how)       0.00      0.00      0.00         8
 verification (what)       0.00      0.00      0.00         5
verification (where)       0.00      0.00      0.00         4
       waiting (how)       0.00      0.00      0.00         4
      waiting (what)       0.00      0.00      0.00         1

            accuracy                           0.20       113
           macro avg       0.02      0.10      0.03       113
        weighted avg       0.04      0.20      0.07       113





In [34]:
print(accuracy_score(results_df.label, results_df.result))

0.20353982300884957
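The report implies the fitted model assigned every test row to `trigger (where)`: its recall is 1.00, and its precision of 0.20 matches 23 correct out of 113 predictions. A constant predictor's accuracy is simply that class's share of the test set, which the sketch below reproduces from the support column; it also shows that always predicting the most common test label would have scored higher.

```python
# Support counts copied from the classification report above.
support = {
    "action (how)": 1,
    "action (what)": 35,
    "action (where)": 12,
    "trigger (how)": 20,
    "trigger (where)": 23,
    "verification (how)": 8,
    "verification (what)": 5,
    "verification (where)": 4,
    "waiting (how)": 4,
    "waiting (what)": 1,
}
total = sum(support.values())  # 113


def constant_accuracy(label: str) -> float:
    """Accuracy of a classifier that predicts `label` for every row."""
    return support[label] / total


print(constant_accuracy("trigger (where)"))  # matches accuracy_score above
majority = max(support, key=support.get)
print(majority, round(constant_accuracy(majority), 3))  # action (what) 0.31
```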
