# MOJ ETL Pipeline Example

An example of how to build a data ETL pipeline on the Analytical Platform, transform the data using PySpark Glue jobs and expose it to analytical tools such as Athena, along with some notes on best practices.

## Data Engineering Structure

![alt text](images/data_engineering_structure.png "DE structure overview")

The image above lays out the processing structure. We'll give an overview of why we try to adhere to this structure, then use the problem set further down to show how we apply it.

The first thing to note is our tech stack. We use S3 for data storage. Our transformation logic, which moves data from A to B, is normally written in Python or Spark (SQL), though it isn't restricted to these. These scripts are versioned on GitHub.

The scripts need to be orchestrated. This can be achieved using [Airflow](https://user-guidance.services.alpha.mojanalytics.xyz/tools/airflow/) or using [AWS Step Functions](https://aws.amazon.com/step-functions/). We will not discuss orchestration further in this tutorial.

### Landing Area

All the data that we process is written to this landing area first. We either receive data from others or extract it ourselves and write it to the landing area. Some key notes on this S3 bucket:

- Data is written to this bucket using a naming convention for each dataset, e.g. `s3://mojap-land/open-data/parliment-questions/`. Whoever (or whatever) writes data to that location should have write access to that S3 path only.
- Data in this bucket is transient: once it's processed, it is deleted from its folder.

### Raw History

When data lands in `s3://mojap-land` we want a script (`F1(x)`) to check that the data is what we expect: for example, that it has the correct extension, contains the expected type of data, has the expected volume, etc. If the data passes the checks, it should be moved from `s3://mojap-land` to `s3://mojap-raw-hist` (and optionally to `s3://mojap-raw`, which we'll come to shortly). This function should not transform the data; it should only check its validity. We want to ensure that the data in `mojap-raw-hist` is an exact replica of the data as it landed in `mojap-land`. **This is vital for reproducibility.**

Treat any data that arrives in `mojap-raw-hist` as an immutable block. If you need to make changes or corrections to the data, do so later in the pipeline; never change the original raw data. When data is moved from `land` to `raw-hist`, partition or name it so that you can trace back the date and time it was moved over. In most cases you control how data is uploaded to `mojap-land`, so ideally upload it using the following folder structure:

```
└── s3://mojap-land
│   └── my-data-folder
│       └── file_land_timestamp=1534978800
│           └── data_1534978800.csv
```

We use `file_land_timestamp` as a file partition recording when the data was uploaded to the land bucket in S3. We use hive partitioning ([for more details](https://resources.zaloni.com/blog/partitioning-in-hive)) because it allows you to point Athena or Spark directly at the data folder `my-data-folder/` and treat it as a single dataframe partitioned by a column named `file_land_timestamp`. When your function tests the data and moves it from land to raw-hist, keep the same folder structure and change only the bucket name (this makes it easy to copy the data from one bucket to the other once it passes the testing function). Below is an example of what your `raw-hist` bucket should look like:

```
└── s3://mojap-raw-hist
│   └── my-data-folder/
│       └── file_land_timestamp=1534968800
│           └── data_1534968800.csv
│       └── file_land_timestamp=1534978800
│           └── data_1534978800.csv
```

This structure shows that two datasets have been loaded from `land` into `raw-hist` at different times. Following the same naming conventions makes it easy to track your data through each process. Where possible, it is also good practice to suffix the data file itself with the timestamp value: if someone accidentally moves the data, you still have a chance of working out when it was uploaded into your pipeline.
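
A minimal sketch of why this pays off: the hypothetical helpers below (they are not part of `gluejobutils` or `etl_manager`) read the `file_land_timestamp` partition back out of an S3 path, in the same way a hive-partition-aware reader treats `key=value` folders as columns, and fall back to the timestamp suffix in the file name if a file has been moved out of its partition folder.

```python
import re
from typing import Dict, Optional


def parse_hive_partitions(s3_path: str) -> Dict[str, str]:
    """Return the key=value folder segments of a hive-style S3 path as a dict."""
    # Strip the "s3://" scheme, then drop the bucket (first segment) and file name (last segment).
    segments = s3_path.split("://", 1)[-1].split("/")[1:-1]
    return dict(seg.split("=", 1) for seg in segments if "=" in seg)


def land_timestamp(s3_path: str) -> Optional[int]:
    """Recover when a file landed: use the partition folder if present, else the file-name suffix."""
    partitions = parse_hive_partitions(s3_path)
    if "file_land_timestamp" in partitions:
        return int(partitions["file_land_timestamp"])
    # Fallback for a file moved out of its partition, e.g. .../data_1534978800.csv
    match = re.search(r"_(\d+)\.[^/]*$", s3_path)
    return int(match.group(1)) if match else None


path = "s3://mojap-raw-hist/my-data-folder/file_land_timestamp=1534978800/data_1534978800.csv"
print(parse_hive_partitions(path))  # {'file_land_timestamp': '1534978800'}
print(land_timestamp("s3://mojap-raw-hist/misplaced/data_1534968800.csv"))  # 1534968800
```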

### Raw

This area is optional. If your data is large you may want to process only the deltas and add them to your existing database, rather than recreating everything from scratch from `raw-hist`. However, if the computational overhead of recalculating everything from scratch is minor (compared to running more complex code that updates your existing data with a delta), you should opt for recalculating everything from scratch. This makes your transformation scripts simpler, idempotent and less dependent on the state of the current database ([see here for an example of this](https://github.com/moj-analytical-services/learning_data_engineering/blob/master/functional_progamming_etl.ipynb)) _(unfortunately this repo is only available to members of the org)_.

:point_up: _Note that this example is out of date, so take it with a pinch of salt._

However, if you do need to update your database with deltas, or have a preprocessing stage (see below), you might want to use the `raw` bucket. Like `land`, it should be transient: once you are done with the data, delete it (an exact copy remains in `raw-hist`). Continuing with our example, the folder structure should mimic that of `raw-hist`/`land`, just with a different bucket name, e.g.

```
└── s3://mojap-raw
│   └── my-data-folder
│       └── file_land_timestamp=1534978800
│           └── data_1534978800.csv
```

### Processed

This area is again optional. If your data in raw is messy or complex then you may want to add this stage. For example, we often receive corrupted CSVs from external data suppliers and have had to write our own CSV parser to fix the dataset. This is a costly process, so we only want to apply it once, as the data comes in. In most cases you will take data from the `raw` bucket, transform it into a more usable dataset and write it to a `processed` bucket; this is the role of the `F2(x)` function in the diagram. The normal convention is to delete the data from `raw` once it has been written to the `processed` bucket (this is what we mean by these buckets being transient: once data has moved from `A -> B`, get rid of it from `A`). Again, it is good practice to keep the same file partitioning and naming convention as the `raw` or `raw-hist` bucket, which makes it easy to track data across our processes. The processed bucket would therefore look something like:

```
└── s3://alpha-mojap-processed-my-data
│   └── my-data-folder
│       └── file_land_timestamp=1534978800
│           └── data_1534978800.jsonl.gz

### OR ###

└── s3://alpha-mojap-processed-my-data
│   └── my-data-folder
│       └── file_land_timestamp=1534968800
│           └── data_1534968800.jsonl.gz
│       └── file_land_timestamp=1534978800
│           └── data_1534978800.jsonl.gz
```

Some extra things to note about the processed bucket:

- Which of the two examples above applies depends on your ETL pipeline. If your pipeline only updates your deployed database with deltas, you would have the first option, where the processed bucket stores only the deltas used to update your DB. Once the deployed DB is updated you would delete the data in processed (in this instance the processed bucket is transient, like raw). If you recreate the database from scratch each time, you would probably want to keep all processed datasets, to save reprocessing all the data in `raw-hist` every time you update your DB. In this case you treat the `processed` bucket more like `raw-hist`. Note, though, that it is not immutable like `raw-hist` (where you never delete data): if you update the processing code, there may come a point where you want to overwrite everything in `processed`.

- Unlike the buckets discussed so far, the processed data bucket is prefixed with `alpha`. You can think of `alpha` buckets as being in the platform users' namespace. This means the processed bucket is created via the [analytical platform control panel (AP-CP)](https://cpanel-master.services.alpha.mojanalytics.xyz/), and users can be given access to it via the AP-CP **(all access to data engineering buckets should be read-only)**.

- The name after `mojap-processed` (here `my-data`) identifies the dataset. This naming convention makes it easier to scan for data engineering processed buckets.

- Note that the file format has changed from `.csv` to `.jsonl.gz`. This is just to show that the data is genuinely different from the data in `raw` or `raw-hist`. For recommended formats see the [Data Formats](#data-formats) section.

- All our processes (i.e. `F(x)`) are scripts running in a versioned Docker build on our Kubernetes cluster. The version tag of the Docker build corresponds to the git tag. It's good practice to add a column to your data equal to the tag that processed it: if you ever update the processing script, you will know which version produced each row (this is done in the example in this repo). The normal convention is to set the tag of your Docker image as an environment variable in your DAG (and pass it as an env variable to your container) so that your script can read the tag from the environment.
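
As a sketch of the tagging convention, assuming the DAG passes the tag in an environment variable named `GITHUB_TAG` and that the new column is called `processing_tag` (both names are hypothetical; in this repo's Glue job the tag arrives as the `--github_tag` job argument instead):

```python
import os
from typing import Dict, List


def add_processing_tag(records: List[Dict], env_var: str = "GITHUB_TAG") -> List[Dict]:
    """Return copies of the records with a column holding the tag of the code that processed them."""
    # Hypothetical env var name: use whatever your DAG sets on the container.
    tag = os.environ.get(env_var, "unknown")
    return [{**record, "processing_tag": tag} for record in records]


os.environ["GITHUB_TAG"] = "v0.0.1"  # normally set by the DAG, not by the script
print(add_processing_tag([{"postcode": "SN9 6DZ"}]))
# [{'postcode': 'SN9 6DZ', 'processing_tag': 'v0.0.1'}]
```

Falling back to `"unknown"` rather than raising is a design choice; a stricter pipeline might prefer to fail when the tag is missing.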

### Curated

As discussed above this data is created from either the `raw`, `raw-hist` or `processed` bucket depending on your etl process. The data will usually be converted to parquet files (see [Data Formats](#data-formats)) to speed up subsequent queries based on this data. This stage will most likely be written in spark. If possible it's best to write your spark scripts fully in spark-SQL. This makes it easier for analysts to understand the data transforms (rather than having to understand the spark dataframe API). The curated bucket will hold the data that you expose as your Athena Database. This data does not have to follow the partitioning of the previous buckets (as you might want to partition it to make it more efficient to query or not partition it at all). An example of a curated folder structure on S3 would be:

```
└── s3://alpha-mojap-curated-my-data
│   └── my-database/
│       └── table1/
│           └── data-part-0.parquet
│           └── data-part-1.parquet
│           └── data-part-2.parquet
│           └── data-part-3.parquet
│       └── table2/
│           └── partition=0/
│                └── data-part-0.parquet
│                └── data-part-1.parquet
│           └── partition=1/
│                └── data-part-0.parquet
│                └── data-part-1.parquet
```

As stated before, this bucket doesn't have to follow the previous structures; how to structure it is largely up to the data engineer. Note that the data for each table is in a folder whose name matches the table name (matching the names just makes things easier to read), and all the tables sit in the same directory (again, just good practice for making your data structure easier to understand). One thing you are restricted to (if using `etl_manager`) is that all the tables in your database must exist in the same bucket.

Once your curated tables are produced you will want to overlay your Glue schema (the metadata database schema that allows Athena to query the data it points to) over the data to make it accessible to analysts. Because Athena is "schema on read", you can define, delete or update the schema without affecting the underlying data. The schemas that define our data as databases are simply overlaid on the data: they tell Athena how to parse and interpret the datasets in S3 so they can be queried using SQL. This means you can delete and recreate your entire database schema at the beginning or end of your ETL job (normally this is done at the end, but you might want to do it at particular points in your pipeline if it needs to read data via the Glue schemas).

## Best Practice

### Write your spark jobs to move data from path A to B

When you write spark jobs always try to process the data moving from one location to another e.g.

```python
df = spark.read.parquet("s3://bucket-a/")
df = do_stuff_to(df)
df.write.mode('overwrite').format('parquet').save("s3://bucket-b/") # code runs fine
```

If we were to change this code to write out the data back to `bucket-a` it will break e.g.

```python
df = spark.read.parquet("s3://bucket-a/")
df = do_stuff_to(df)
df.write.mode('overwrite').format('parquet').save("s3://bucket-a/") # Code will fail here
```

The reason this fails is that Spark uses lazy evaluation _(note that the following description is simplified to make it easy to read, so it deviates a little from what actually "happens", but the overall way of thinking about it is correct)_. This means it only evaluates code when it needs to. Line 1 says read in the data, but Spark won't actually read the data at that point; it just notes that this is its first task. Line 2 says apply some transforms to the data, and again Spark notes that it needs to do this after the previous task. Line 3 tells Spark to write the data to S3. Writing data somewhere is referred to as an action (actions trigger Spark to actually do something; `df.count()` and `df.show()` are other examples). At this point Spark looks back at all the tasks it has been asked to complete and starts running them in order. Remember that Spark runs processes in parallel across multiple workers (for this example, say we have two). Worker 1 reads in half the data in `bucket-a` (line 1), does stuff (line 2) and then writes it back out to `bucket-a` (line 3). Worker 2 does the same thing with the other half of the data in `bucket-a`. If worker 1 reaches line 3 before worker 2 reaches line 1, you have a race condition: worker 1 has overwritten the data before worker 2 could read it, hence the error.

:point_up: This is a long explanation of a fairly minor point, but it leads to a more important way of thinking about your ETL process. Spark + S3 is not a database where you can read a single row, update it and overwrite that same row as you would in a standard SQL database. This means it is often more performant to recalculate your table from scratch and overwrite the existing one every time you want to update your database with a delta, so always do the easiest thing first (and move to a more complex update process only when that stops being performant). When your data does become too big, start using file partitioning and overwrite only the partitions that match your delta. [An example of this can be found here](https://github.com/isichei/spark_testing/blob/master/dynamic_paritions.ipynb) _(annoyingly this was written using dynamic partitioning, which is not currently possible with Glue jobs on AWS's Spark cluster, but you'll get the idea: it is possible to do it without hive partitioning, it just requires more bespoke code rather than SQL)_.
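
The lazy-evaluation failure described above can be mimicked with Python generators. This is a plain-Python analogy, not Spark code: the dictionary stands in for S3 and the generators stand in for Spark's deferred tasks. Nothing is read until the write (the action) pulls records through, and by then overwrite mode has already cleared the source. In this analogy the symptom is silently losing the data rather than an error.

```python
from typing import Dict, Iterator, List

Store = Dict[str, List[int]]  # path -> records, standing in for S3


def lazy_read(store: Store, path: str) -> Iterator[int]:
    """Yield records from store[path] one at a time, only when something asks for them."""
    i = 0
    while i < len(store[path]):
        yield store[path][i]
        i += 1


def do_stuff_to(records: Iterator[int]) -> Iterator[int]:
    """A lazy transform: nothing runs until the output is consumed."""
    return (r * 10 for r in records)


def overwrite(store: Store, path: str, records: Iterator[int]) -> None:
    """The 'action': like mode('overwrite'), clear the target, then pull records through the pipeline."""
    store[path] = []
    for r in records:
        store[path].append(r)


store = {"bucket-a": [1, 2, 3]}

# Read from bucket-a, write to bucket-b: works.
overwrite(store, "bucket-b", do_stuff_to(lazy_read(store, "bucket-a")))
print(store["bucket-b"])  # [10, 20, 30]

# Read from bucket-a, write back to bucket-a: the source is cleared before it is ever read.
overwrite(store, "bucket-a", do_stuff_to(lazy_read(store, "bucket-a")))
print(store["bucket-a"])  # []
```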

### Avoid ISO timestamps as S3 file partitions

Use unix timestamps instead. S3 paths behave like URLs, so special characters get percent-encoded: using colons in a file partition, e.g. `partition=2018-01-01T00:00:00`, evaluates to `partition=2018-01-01T00%3A00%3A00`, so it's best to avoid them.
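
To illustrate, the snippet below uses Python's `urllib.parse.quote` purely to show the percent-encoding (an analogy for what happens to the path, not S3's own encoding logic), keeping `=` unescaped so only the timestamp part changes:

```python
from datetime import datetime, timezone
from urllib.parse import quote

run_time = datetime(2018, 1, 1, tzinfo=timezone.utc)

# ISO timestamp partition: the colons get percent-encoded.
iso_partition = quote(f"partition={run_time.strftime('%Y-%m-%dT%H:%M:%S')}", safe="=")

# Unix timestamp partition: digits only, so nothing needs encoding.
unix_partition = quote(f"partition={int(run_time.timestamp())}", safe="=")

print(iso_partition)   # partition=2018-01-01T00%3A00%3A00
print(unix_partition)  # partition=1514764800
```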

### Data Formats
Standard dataset formats we use ranked in order of preference:

1. **parquet:** Columnar format, normally preferred for analytical work. Works best for SQL aggregations on our data (the most common queries from our analysts). Compressed by default, and also stores the metadata of your columns. Supports schema merging in Spark.

2. **jsonl:** [Newline-delimited JSON](http://jsonlines.org/). Not columnar, but useful for data in a hierarchical format, since both Spark and Athena can query nested JSON. It doesn't fully hold metadata, but JSON is strongly defined enough to distinguish strings from numbers. Ideally these files should be compressed with gzip (`.jsonl.gz`). Supports schema merging in Spark.

3. **csv:** Avoid if possible: there is no built-in metadata and Spark doesn't support schema merging for it. Note that the header row needs to be removed to work with Athena (or add a specification to the CSV SerDe to skip the header row, [see etl_manager for more info](https://github.com/moj-analytical-services/etl_manager)). Ideally these files should be compressed with gzip (`.csv.gz`). CSVs might sometimes be preferable if your pipeline needs to send data to people who use something like `Excel`, but for our databases try to avoid them. For a more general explanation, see [why CSVs are not great](https://chriswarrick.com/blog/2017/04/07/csv-is-not-a-standard/).

### Data Sizes

We don't have a hard rule for this; again, it depends heavily on what your pipeline is doing. A good rule is to chunk your data into reasonable sizes (probably no bigger than 1GB each). Athena and Spark treat all the data in the same folder as one table, so if you have 3GB of data it's best to split it into 3 files in the same folder. This also makes it far easier for tools like Spark and Athena to parallelise your work, as each worker can parse a separate chunk. It's also worth adding an integer suffix noting which chunk each file is (e.g. `data_0_1534978800.jsonl.gz`, `data_1_1534978800.jsonl.gz`, etc.).
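
A minimal sketch of the chunking and naming convention, assuming a list of dicts destined for gzipped JSON lines. It caps chunks by record count as a simple stand-in for the size limit; a real pipeline would more likely measure bytes. `chunk_records` is a hypothetical helper, not part of any library used here.

```python
import gzip
import json
from typing import Dict, List, Tuple


def chunk_records(records: List[Dict], max_records: int, timestamp: int,
                  prefix: str = "data") -> List[Tuple[str, bytes]]:
    """Split records into gzipped JSON-lines chunks named <prefix>_<chunk>_<timestamp>.jsonl.gz."""
    chunks = []
    for chunk_index, start in enumerate(range(0, len(records), max_records)):
        lines = "\n".join(json.dumps(r) for r in records[start:start + max_records])
        name = f"{prefix}_{chunk_index}_{timestamp}.jsonl.gz"
        chunks.append((name, gzip.compress(lines.encode("utf-8"))))
    return chunks


records = [{"i": i} for i in range(5)]
for name, _ in chunk_records(records, max_records=2, timestamp=1534978800):
    print(name)
# data_0_1534978800.jsonl.gz
# data_1_1534978800.jsonl.gz
# data_2_1534978800.jsonl.gz
```

Each `(name, bytes)` pair would then be uploaded into the same partition folder, so Athena and Spark still see one table.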

## Problem Set

1. Get an extract of data from [this API](https://postcodes.io/) and write it to a landing area
2. Validate the data extract 
3. Add the extract to the existing database (if it exists) and create a new aggregated table in this database
4. Apply the athena schema to the data
5. Review the results

### Step 1: Get that data

External data suppliers often send us data in all manner of messy formats. In this example we control the data we pull ourselves. We are going to write a simple script that pulls `n` random postcodes (10 in this example) from the (amazing) [postcodes.io](https://postcodes.io/) API and writes them to a folder for us. Because we control the extraction, we get to apply best practices here and write the data to a correctly partitioned location in S3.

We are just going to call the `random/postcodes` endpoint of the API, which returns a single random postcode. Its output looks like this:

```json
{
    "status": 200,
    "result": {
        "postcode": "SN9 6DZ",
        "quality": 1,
        "eastings": 413198,
        "northings": 155014,
        "country": "England",
        "nhs_ha": "South West",
        "longitude": -1.812104,
        "latitude": 51.294105,
        "european_electoral_region": "South West",
        "primary_care_trust": "Wiltshire",
        "region": "South West",
        "lsoa": "Wiltshire 038C",
        "msoa": "Wiltshire 038",
        "incode": "6DZ",
        "outcode": "SN9",
        "parliamentary_constituency": "Devizes",
        "admin_district": "Wiltshire",
        "parish": "Upavon",
        "admin_county": null,
        "admin_ward": "Pewsey Vale",
        "ced": null,
        "ccg": "NHS Wiltshire",
        "nuts": "Wiltshire",
        "codes": {
            "admin_district": "E06000054",
            "admin_county": "E99999999",
            "admin_ward": "E05008382",
            "parish": "E04011843",
            "parliamentary_constituency": "E14000665",
            "ccg": "E38000206",
            "ced": "E99999999",
            "nuts": "UKK15"
        }
    }
}
```
Our script calls this API `n` times and puts each response into a tabular format (using the `unpack_data` function), so our data is a list of dictionaries.

```python
land_base_path = 's3://mojap-land/open_data/postcodes_example/'
raw_hist_base_path = 's3://mojap-raw-hist/open_data/postcodes_example/'
api_get = "https://api.postcodes.io/random/postcodes"
job_bucket = "alpha-curated-postcodes-example"
n = 10
```

```python
import json
from datetime import datetime
from urllib.request import urlopen

# Lil' function to flatten the API response into a single tabular row
def unpack_data(data):
    new_dict = {}
    row = data['result']
    for c in row:
        if c != 'codes':
            new_dict[c] = row[c]
    # Nested codes are flattened into columns prefixed with "codes_"
    for c in row["codes"]:
        new_dict["codes_" + c] = row["codes"][c]
    return new_dict

# Get the run timestamp of this script to use as the file partition (int() drops the fractional seconds)
run_timestamp = int(datetime.now().timestamp())

# Request the API n times
data = []
for i in range(n):
    with urlopen(api_get) as f:
        row = json.load(f)

    new_row = unpack_data(row)
    new_row['index'] = i
    data.append(new_row)

print(data[0])
```

```
{'postcode': 'NN2 8EG', 'quality': 1, 'eastings': 474654, 'northings': 264667, 'country': 'England', 'nhs_ha': 'East Midlands', 'longitude': -0.907267, 'latitude': 52.275095, 'european_electoral_region': 'East Midlands', 'primary_care_trust': 'Northamptonshire Teaching', 'region': 'East Midlands', 'lsoa': 'Northampton 003C', 'msoa': 'Northampton 003', 'incode': '8EG', 'outcode': 'NN2', 'parliamentary_constituency': 'Northampton North', 'admin_district': 'West Northamptonshire', 'parish': 'Kingsthorpe', 'admin_county': None, 'admin_ward': 'Kingsthorpe North', 'ced': None, 'ccg': 'NHS Northamptonshire', 'nuts': 'West Northamptonshire', 'codes_admin_district': 'E06000062', 'codes_admin_county': 'E99999999', 'codes_admin_ward': 'E05013257', 'codes_parish': 'E04013032', 'codes_parliamentary_constituency': 'E14000861', 'codes_ccg': 'E38000242', 'codes_ccg_id': '78H', 'codes_ced': 'E99999999', 'codes_nuts': 'TLF24', 'codes_lsoa': 'E01027146', 'codes_msoa': 'E02005652', 'codes_lau2': 'E0600006
```

Finally, we want to write the data to S3 using the file partitioning structure explained above. We can use packages like `gluejobutils` to do this. As the data is a list of objects, we'll export it as newline-delimited JSON.

```python
import gzip
import os

from gluejobutils.s3 import (
    s3_path_to_bucket_key,
    s3_resource
)

# Serialise a list of dicts as gzipped newline-delimited JSON and upload it to s3_path
def write_dicts_to_jsonl_gz(data, s3_path):
    file_as_string = '\n'.join(json.dumps(d) for d in data)
    b, k = s3_path_to_bucket_key(s3_path)
    compressed_out = gzip.compress(bytes(file_as_string, 'utf-8'))
    s3_resource.Object(b, k).put(Body=compressed_out)

# Write into the file_land_timestamp partition and suffix the file name with the same timestamp
s3_out = os.path.join(
    land_base_path,
    'random_postcodes',
    f'file_land_timestamp={run_timestamp}',
    f'random_postcodes_{run_timestamp}.jsonl.gz'
)
write_dicts_to_jsonl_gz(data, s3_out)
```
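
The `read_jsonl_from_s3` helper used in Step 2 comes from this repo's `scripts/utils.py`, which isn't shown here. Assuming it simply reverses `write_dicts_to_jsonl_gz`, the hypothetical functions below let you check the file format and the landing-path convention locally, without touching S3:

```python
import gzip
import json
from typing import Dict, List


def to_jsonl_gz(data: List[Dict]) -> bytes:
    """Serialise dicts as gzipped newline-delimited JSON (the format write_dicts_to_jsonl_gz uploads)."""
    return gzip.compress("\n".join(json.dumps(d) for d in data).encode("utf-8"))


def from_jsonl_gz(blob: bytes) -> List[Dict]:
    """Parse gzipped newline-delimited JSON back into a list of dicts, skipping blank lines."""
    text = gzip.decompress(blob).decode("utf-8")
    return [json.loads(line) for line in text.splitlines() if line.strip()]


def land_path(base_path: str, table: str, timestamp: int) -> str:
    """Build <base>/<table>/file_land_timestamp=<ts>/<table>_<ts>.jsonl.gz, as used above."""
    return f"{base_path.rstrip('/')}/{table}/file_land_timestamp={timestamp}/{table}_{timestamp}.jsonl.gz"


rows = [{"postcode": "SN9 6DZ", "index": 0}, {"postcode": "NN2 8EG", "index": 1}]
assert from_jsonl_gz(to_jsonl_gz(rows)) == rows  # the round trip is lossless
print(land_path("s3://mojap-land/open_data/postcodes_example/", "random_postcodes", 1665425864))
```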

### Step 2: Validate the data

Now that the data has been written to the land bucket, we want to run a script that checks the data and then writes it to `raw` or `raw-hist` if it passes the checks. We are going to do something really simple here. Our script doesn't care if there is more than one `file_land_timestamp` partition; it just runs through each data chunk (assuming there will only ever be one dataset per partition, which we can assume because we control the code that writes the data into land). The first test checks whether the data we've pulled contains the expected number of records (`n`):

```python
len_data = len(data)
if len_data < n:
    error = True
    print(f"TEST DATA SIZE: FAILED (size {len_data})")
else:
    print(f"TEST DATA SIZE: PASSED (size {len_data})")
```

```
TEST DATA SIZE: PASSED (size 10)
```


Good practice is to test your input data against your metadata, to make sure it conforms to the declared schema. In this example we simply check for a column mismatch, but in a proper pipeline you would probably test the metadata schema fully (i.e. data types, enums, regex patterns and nullables). You can see the metadata we test the data against [here](meta_data/raw/random_postcodes.json). To better understand the structure of our metadata schemas, read the README of [etl_manager](https://github.com/moj-analytical-services/etl_manager).
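
A fuller check might look like the sketch below. The column entries are loosely modelled on etl_manager-style metadata (`name` and `type`, as used in the Step 2 code), with `nullable`, `enum` and `pattern` fields added as assumptions; the example columns, the illustrative enum and the type mapping are hypothetical, not the contents of `meta_data/raw/random_postcodes.json`. Like the Step 2 code, it collects every problem rather than stopping at the first.

```python
import re
from typing import Dict, List

# Hypothetical, simplified column metadata for illustration.
META_COLUMNS = [
    {"name": "postcode", "type": "character", "nullable": False,
     "pattern": r"^[A-Z]{1,2}[0-9][A-Z0-9]? [0-9][A-Z]{2}$"},
    {"name": "country", "type": "character", "nullable": False,
     "enum": ["England", "Scotland", "Wales", "Northern Ireland"]},
    {"name": "eastings", "type": "int", "nullable": True},
    {"name": "latitude", "type": "double", "nullable": True},
]

# Assumed mapping from metadata type names to the Python types json.loads produces.
PY_TYPES = {"character": str, "int": int, "double": (int, float)}


def validate_row(row: Dict, columns: List[Dict]) -> List[str]:
    """Return every problem found in one row; an empty list means the row passed."""
    problems = []
    mismatch = set(row) ^ {c["name"] for c in columns}
    if mismatch:
        problems.append(f"col mismatch: {', '.join(sorted(mismatch))}")
    for col in columns:
        value = row.get(col["name"])
        if value is None:
            if not col.get("nullable", True):
                problems.append(f"{col['name']}: null in non-nullable column")
            continue
        # bool is a subclass of int in Python, so reject it explicitly
        if isinstance(value, bool) or not isinstance(value, PY_TYPES[col["type"]]):
            problems.append(f"{col['name']}: expected {col['type']}, got {type(value).__name__}")
            continue
        if "enum" in col and value not in col["enum"]:
            problems.append(f"{col['name']}: {value!r} not in enum")
        if "pattern" in col and not re.match(col["pattern"], value):
            problems.append(f"{col['name']}: {value!r} does not match pattern")
    return problems


good = {"postcode": "SN9 6DZ", "country": "England", "eastings": 413198, "latitude": 51.294105}
bad = {"postcode": "sn9", "country": "Atlantis", "eastings": "413198"}
print(validate_row(good, META_COLUMNS))  # []
for problem in validate_row(bad, META_COLUMNS):
    print(problem)
```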

Finally, if the code doesn't error, move the data from `land` to `raw-hist` (and `raw` if you are using that bucket, which I'm not in this example) and then delete it from `land`. Because we keep the same `file_land_timestamp` and a strict naming convention, our file paths are easy to change (only the bucket changes). When moving data from `land` to `raw-hist`, don't write out the data you have loaded in; instead copy the original object within S3, so `raw-hist` holds an exact replica of what landed.

```python
import json
import os

from gluejobutils import s3
from scripts.utils import (
    read_jsonl_from_s3
)

table_land_path = os.path.join(land_base_path, 'random_postcodes/')

error = False
# Load the raw metadata schema and pull out the expected column names
with open('meta_data/raw/random_postcodes.json') as f:
    meta = json.load(f)
colnames = [c['name'] for c in meta['columns']]

# Get all partitions then test each one
all_data_paths = s3.get_filepaths_from_s3_folder(table_land_path)
if len(all_data_paths) == 0:
    raise ValueError(f"Was expecting data in land but nothing was found in the folder: {table_land_path}")

for data_path in all_data_paths:
    print(f'TESTING {data_path}')
    data = read_jsonl_from_s3(data_path, compressed=True)

    # We expect n records
    len_data = len(data)
    if len_data < n:
        error = True
        print(f"TEST DATA SIZE: FAILED (size {len_data})")
    else:
        print(f"TEST DATA SIZE: PASSED (size {len_data})")

    # Check the columns of each row against our metadata (we expect all the columns to exist).
    # If there is an error, keep testing the rest of the data so you can see which other rows fail before raising
    error_str = ''
    for i, row in enumerate(data):
        col_mismatch = list(set(row.keys()).symmetric_difference(set(colnames)))
        if len(col_mismatch) > 0:
            error_str += f"row {i}: col mismatch: {', '.join(col_mismatch)}\n"
            error = True

    if error_str != '':
        print(error_str)

    if error:
        raise ValueError("Raising error due to one of the tests not passing. See log.")
    else:
        print("All tests passed!")
        print("Now writing to raw and deleting from land...")
        # Same path, different bucket: copy the original object rather than rewriting the data
        raw_hist_out = data_path.replace('s3://mojap-land/', 's3://mojap-raw-hist/')
        s3.copy_s3_object(data_path, raw_hist_out)
        s3.delete_s3_object(data_path)
        print("Done.")
```

```
TESTING s3://mojap-land/open_data/postcodes_example/random_postcodes/file_land_timestamp=1665425864/random_postcodes_1665425864.jsonl.gz
TEST DATA SIZE: PASSED (size 10)
All tests passed!
Now writing to raw and deleting from land...
Done.
```


### Step 3: Add the data (using spark)

In this step we are going to use Spark to create our curated database (reading data from `raw-hist` and writing to `curated`). This requires `etl_manager`. Its [README](https://github.com/moj-analytical-services/etl_manager/blob/master/README.md) is fairly in-depth, so I'm not going to repeat that information here.

Our Spark jobs (aka Glue jobs) are PySpark scripts that run on an AWS Spark server. They are used for bigger data transformations (run in parallel) that are better kept off our cluster (which currently gives you the ability to run R/Python on a single CPU). When writing Spark scripts, use Spark-SQL as much as possible to make them easier for others to understand. There will be times when a single line using the Spark dataframe API achieves the same result as 10 lines of SQL, so it's up to the data engineer to judge when to deviate from Spark-SQL syntax. Because we use PySpark, always use the dataframe API or Spark-SQL, as these are optimised; avoid Resilient Distributed Datasets (RDDs), which are very slow in PySpark, as summarised [here](https://databricks.com/glossary/what-is-rdd).

Our spark code is going to do two things:

1. Read **all** the data from `raw-hist` (not just new data) and overwrite the table in curated. This is the laziest approach to updating your table (i.e. recreate it from scratch and delete the old version). This is also the cleanest approach as the overwrite means you don't often have to clean up half written files or clear out a temp folder of data you are using to update your table. In addition it can be quicker than trying to do a clever update process in spark - mainly because this often requires writing more than once to S3 which is time consuming.
2. Create a calculated table (a simple group by statement) and write it to a new partition (based on the date ran) in curated.
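
In plain Python, with dictionaries standing in for S3 folders, the two steps look roughly like the sketch below. It is not the logic of `job.py`: grouping by `region` and the count column `n` are assumptions. Note how the rebuilt table picks up `file_land_timestamp` as a column, as Spark does when it reads a hive-partitioned folder.

```python
from collections import Counter
from typing import Dict, List


def rebuild_table(raw_hist: Dict[int, List[Dict]]) -> List[Dict]:
    """Step 1: recreate the whole table from every file_land_timestamp partition (no delta logic)."""
    # Each row gains its partition value as a column, like Spark's hive partition discovery.
    return [
        {**row, "file_land_timestamp": ts}
        for ts in sorted(raw_hist)
        for row in raw_hist[ts]
    ]


def calculated_partition(table: List[Dict], snapshot_date: str, group_by: str) -> Dict[str, List[Dict]]:
    """Step 2: a simple group-by count, keyed by the snapshot_date partition it would be written to."""
    counts = Counter(row[group_by] for row in table)
    rows = [{group_by: key, "n": n} for key, n in sorted(counts.items())]
    return {f"snapshot_date={snapshot_date}": rows}


raw_hist = {
    1534978800: [{"region": "South West"}, {"region": "London"}],
    1534968800: [{"region": "London"}],
}
table = rebuild_table(raw_hist)  # overwrites the curated table in full on every run
print(calculated_partition(table, "2022-10-10", group_by="region"))
```

Rebuilding everything keeps the logic idempotent: running it twice on the same `raw-hist` gives the same table.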

When you run a Glue job, your `meta_data` folder is uploaded to S3 so the job can reference it; the metadata makes it easier to ensure your transforms output data in the correct structure. So we read in the metadata JSONs within the Glue job and make sure the data conforms to them using the [`align_df_to_meta`](https://github.com/moj-analytical-services/gluejobutils/blob/master/gluejobutils/datatypes.py#L52-L76) function from gluejobutils. It is highly recommended to use this function before writing your data out to S3, to ensure it aligns with the metadata schemas. Otherwise you might end up with mismatches between your data in S3 and what is declared in the Glue schema (meaning Athena will error when it tries to parse your data via the Glue schema).
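
The idea behind aligning data to metadata can be sketched in plain Python: keep exactly the declared columns, in the declared order, cast to the declared types. This is not gluejobutils' implementation (`align_df_to_meta` works on Spark dataframes), and the type names and casts below are assumptions:

```python
from typing import Any, Callable, Dict, List

# Assumed mapping from metadata type names to Python casts (illustrative only).
CASTS: Dict[str, Callable[[Any], Any]] = {"character": str, "int": int, "double": float}


def align_record_to_meta(record: Dict, columns: List[Dict]) -> Dict:
    """Keep exactly the metadata columns, in metadata order, cast to their declared types."""
    aligned = {}
    for col in columns:
        value = record.get(col["name"])
        # Missing or null values stay null; everything else is cast to the declared type.
        aligned[col["name"]] = None if value is None else CASTS[col["type"]](value)
    return aligned


columns = [
    {"name": "postcode", "type": "character"},
    {"name": "eastings", "type": "int"},
    {"name": "latitude", "type": "double"},
]
record = {"latitude": "51.294105", "eastings": "413198", "postcode": "SN9 6DZ", "extra": 1}
print(align_record_to_meta(record, columns))
# {'postcode': 'SN9 6DZ', 'eastings': 413198, 'latitude': 51.294105}
```

Extra columns (here `extra`) are dropped, so what lands in S3 matches what the Glue schema declares.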

When the glue job runs on AWS it requires a role to run it. This role needs permissions to access S3, use Athena, Glue, run spark jobs, etc... The role "airflow-postcodes-example-role" has been created with all the relevant permissions. See [iam_builder](https://github.com/moj-analytical-services/iam_builder) for instructions on how to generate an IAM policy based on a yaml or json configuration.

The full PySpark script can be found at [`etl_pipeline_example_job/job.py`](etl_pipeline_example_job/job.py). This is the script that runs on AWS's Spark cluster. To trigger it, we need a separate Python script that is run locally:

```python
import datetime

from etl_manager.etl import GlueJob

job_bucket = "alpha-curated-postcodes-example"
iam_role = "airflow-postcodes-example-role"
github_tag = "v0.0.1"
snapshot_date = datetime.datetime.now().strftime("%Y-%m-%d")

# Get job parameters for specific glue job
job_args = {"--github_tag": github_tag, "--snapshot_date": snapshot_date}
job = GlueJob("etl_pipeline_example_job/", bucket=job_bucket, job_role=iam_role, job_arguments=job_args)

print(f'Starting job "{job.job_name}"...')
job.run_job()
job.wait_for_completion(verbose=True)
```

```
Starting job "etl_pipeline_example_job"...
2022-10-10 19:18:33: Job State: RUNNING | Execution Time: 3 (s) | Error: n/a
2022-10-10 19:18:43: Job State: RUNNING | Execution Time: 13 (s) | Error: n/a
2022-10-10 19:18:53: Job State: RUNNING | Execution Time: 23 (s) | Error: n/a
2022-10-10 19:19:04: Job State: RUNNING | Execution Time: 33 (s) | Error: n/a
2022-10-10 19:19:14: Job State: RUNNING | Execution Time: 44 (s) | Error: n/a
2022-10-10 19:19:24: Job State: RUNNING | Execution Time: 54 (s) | Error: n/a
2022-10-10 19:19:34: Job State: RUNNING | Execution Time: 64 (s) | Error: n/a
2022-10-10 19:19:44: Job State: SUCCEEDED | Execution Time: 72 (s) | Error: n/a
```


You can follow the progress of the Glue job in the Glue console. It will be called `etl_pipeline_example_job` and should take around a minute to complete. The metadata will be saved to `s3://alpha-curated-postcodes-example/_GlueJobs_/etl_pipeline_example_job/`.

### Step 4: Update the metadata

Now that the data has been processed with Spark and output to the curated bucket, all you need to do is declare your Glue schema over the top of it so people can access the data as a database via Athena. You can do this using our agnostic metadata schemas. Thanks to etl_manager, the script for this final step is relatively painless.

Remember that this is schema on read: deleting the database doesn't touch the data the Glue schema points to, it only deletes the Glue schema itself. Creating or deleting a Glue database is a minor operation, so we opt to delete and recreate it every time we do an update (this makes it easier if you have changed your metadata schemas).

If your database has tables with file partitions (like our calculated table), you also need to refresh the table partitions (here via `db.refresh_all_table_partitions()`). Otherwise Athena will return an empty table when you query a partitioned table. Under the hood this just runs the Athena query `MSCK REPAIR TABLE db.table` for each table in the database.

```python
from etl_manager.meta import read_database_folder

db = read_database_folder('meta_data/curated/')
db.delete_glue_database()
db.create_glue_database()
db.refresh_all_table_partitions()
```

### Step 5: Review the results

The script below grabs the database and table name from the `meta_data` directory and then runs an Athena query using the `pydbtools` library.

The count of rows returned from `random_postcodes` should equal the total number of rows saved in `raw-hist`. This includes data from historic runs, not just the most recent one.

```python
import json

import pydbtools as pydb

with open('meta_data/curated/database.json') as f:
    database = json.load(f)
database_name = database['name']
with open('meta_data/curated/random_postcodes.json') as f:
    table = json.load(f)
table_name = table['name']
print(database_name, table_name)

print(pydb.read_sql_query(f"SELECT COUNT(*) AS Postcodes FROM {database_name}.{table_name}").values)
```

```
example_postcodes_db random_postcodes
[[130]]
```
