# MoJ AP tools demo

This notebook demonstrates some of the Python tools that data engineers have developed to make building analytical pipelines simpler for data scientists and analysts.

It covers three tasks: reading a dataset that is too large to fit in memory, converting it to another format while applying metadata to keep data types consistent, and creating a database with tables built from files or dataframes.

First, import the necessary libraries. [pydbtools](https://github.com/moj-analytical-services/pydbtools), [arrow_pd_parser](https://github.com/moj-analytical-services/mojap-arrow-pd-parser) and [mojap_metadata](https://github.com/moj-analytical-services/mojap-metadata) are libraries created and maintained by the Data Modelling and Engineering Team.

In [None]:
import pydbtools as pydb
from arrow_pd_parser import reader, writer
from mojap_metadata import Metadata
import pandas as pd
import awswrangler as wr
import itertools

pd.options.display.max_columns = None

Create a new database, cleaning up any tables and data beforehand in case it already exists.

In [None]:
db = "dmet_example"

pydb.delete_database_and_data(db)
pydb.create_database(db)

We have a dataset that consists of a number of very large CSV files. How can we load it without running out of memory and crashing our session?

In [None]:
big_path = "s3://alpha-everyone/s3_data_packer_test/land/big/"

# Don't run this! It tries to load every file into memory at once.
# df = wr.s3.read_csv(big_path)

`arrow_pd_parser` can read files in chunks, returning an iterator of dataframes. To preview the table, set `chunksize` to the number of rows to load and take the first chunk from the iterator.

In [None]:
df = next(reader.read(big_path, file_format="csv", chunksize=10, index_col=0))
df
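
To make the idea of chunked reading concrete, here is a minimal standard-library sketch. It is not how `arrow_pd_parser` is implemented; the helper `read_in_chunks` and the sample data are hypothetical. The helper yields lists of at most `chunksize` rows, so only one chunk needs to be held in memory at a time.

```python
import csv
import io
from itertools import islice


def read_in_chunks(lines, chunksize):
    """Yield lists of at most `chunksize` parsed CSV rows (hypothetical helper)."""
    rows = csv.DictReader(lines)
    while True:
        chunk = list(islice(rows, chunksize))
        if not chunk:
            return
        yield chunk


# Made-up sample data standing in for a large file.
sample = io.StringIO("name,price\na,1\nb,2\nc,3\nd,4\ne,5\n")

chunks = read_in_chunks(sample, chunksize=2)
first_chunk = next(chunks)  # only the first two data rows have been parsed
print(first_chunk)
```

Calling `next` once, as in the preview cell, pulls a single chunk and leaves the rest of the input unread.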

Checking the data types, we can see that `date_time` is a string, but we would like it to be a timestamp.

In [None]:
df.dtypes

Create metadata to fix this using `mojap_metadata.Metadata`. Note that `arrow` rather than `pandas` types are used, and these will be enforced across formats.

In [None]:
metadata = Metadata.from_dict(
    {
        "name": "big_table",
        "columns": [
            { "name": n, "type": t }
            for n, t in [
                ("name", "string"),
                ("email", "string"),
                ("address", "string"),
                ("city", "string"),
                ("state", "string"),
                ("date_time", "timestamp(ms)"),
                ("price", "int64")
            ]
        ]
    }
)
metadata.columns
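
For readers less familiar with comprehensions, the following plain-Python sketch shows what the `columns` entry in the cell above expands to before it is passed to `Metadata.from_dict`. It needs no MoJ libraries; the variable names `column_types` and `columns` are only for illustration.

```python
# The (name, type) pairs from the metadata cell, using arrow type names.
column_types = [
    ("name", "string"),
    ("email", "string"),
    ("address", "string"),
    ("city", "string"),
    ("state", "string"),
    ("date_time", "timestamp(ms)"),
    ("price", "int64"),
]

# Each pair becomes one column dictionary.
columns = [{"name": n, "type": t} for n, t in column_types]

for column in columns:
    print(column)
```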

Now try previewing the data again with metadata enforced.

In [None]:
df = next(reader.read(
    big_path, 
    file_format="csv", 
    chunksize=10,
    index_col=0, 
    metadata=metadata
))
df

Note that `date_time` now holds `datetime.datetime` objects rather than a `pandas` date/time type, because the `pandas` date/time types cover too narrow a range of dates.

In [None]:
df.dtypes
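
The range limitation can be estimated with the standard library alone. The sketch below assumes timestamps stored as a signed 64-bit count of nanoseconds since 1970-01-01, which is the default `pandas` resolution, and compares the resulting bounds with the range of `datetime.datetime`. The names `EPOCH`, `INT64_MAX_NS`, `lower` and `upper` are illustrative.

```python
from datetime import MAXYEAR, MINYEAR, datetime, timedelta

EPOCH = datetime(1970, 1, 1)
INT64_MAX_NS = 2**63 - 1  # largest value of a signed 64-bit integer

# timedelta resolves to microseconds, so drop the last three digits.
limit = timedelta(microseconds=INT64_MAX_NS // 1000)

lower = EPOCH - limit  # earliest representable instant (approximately)
upper = EPOCH + limit  # latest representable instant (approximately)

print("int64 nanoseconds:", lower.year, "to", upper.year)
print("datetime.datetime:", MINYEAR, "to", MAXYEAR)
```

Dates outside the nanosecond range, such as placeholder values far in the future, cannot be held at that resolution, whereas `datetime.datetime` objects can represent them.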

For this demo, take only the first 5 chunks from an iterator over the whole dataset rather than processing all of it.

In [None]:
r = itertools.islice(
    reader.read(
        big_path, 
        file_format="csv", 
        chunksize="100MB", 
        index_col=0, 
        metadata=metadata
    ),
    5
)
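
`itertools.islice` is lazy: it pulls chunks from the underlying iterator only when they are requested, and stops after the limit. The sketch below illustrates this with a hypothetical generator, `fake_chunks`, standing in for the reader; it records which chunks were actually produced.

```python
from itertools import islice

produced = []  # indices of the chunks the generator has actually built


def fake_chunks(total):
    """Stand-in for a chunked reader (hypothetical)."""
    for i in range(total):
        produced.append(i)
        yield f"chunk-{i}"


first_five = islice(fake_chunks(1000), 5)
print(produced)  # nothing has been produced yet

taken = list(first_five)
print(taken)
print(produced)  # only five of the 1000 chunks were ever built

leftover = list(first_five)  # the slice is now exhausted
print(leftover)
```

Because an iterator can be consumed only once, a slice like `r` would need to be recreated before being passed to a second consumer.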

We can then convert between formats, in this case to Parquet, while preserving the metadata.

In [None]:
new_path = "s3://alpha-everyone/dmet_st/big_table.parquet"
wr.s3.delete_objects(new_path)
writer.write(r, new_path, metadata=metadata)

Big datasets in S3 can be used to create a queryable table.

In [None]:
pydb.file_to_table(
    new_path,
    database=db,
    table="big_table",
    location="s3://alpha-everyone/dmet_st/dmet_example/big_table",
    chunksize="100MB",
    metadata=metadata
)
pydb.read_sql_query(f"select * from {db}.big_table limit 5")

Create a new table in the database from an SQL statement.

In [None]:
pydb.delete_table_and_data(database=db, table="state_revenues")

pydb.create_table(
    f"""
    select state, sum(price) as revenue
    from {db}.big_table
    group by state
    """,
    database=db,
    table="state_revenues",
    location="s3://alpha-everyone/dmet_st/dmet_example/state_revenues"
)

sr = pydb.read_sql_query(f"select * from {db}.state_revenues")
sr
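
To see what the aggregation produces without access to the platform, the same `select ... group by` can be tried against an in-memory SQLite database. This is only an illustration: the rows are made up, and SQLite's dialect differs from the one `pydbtools` queries run against, although this particular statement is standard SQL.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("create table big_table (state text, price integer)")

# Made-up rows standing in for the real data.
conn.executemany(
    "insert into big_table values (?, ?)",
    [("NY", 10), ("NY", 20), ("CA", 5), ("NM", 7)],
)

# Same shape as the query above: one revenue total per state.
conn.execute(
    """
    create table state_revenues as
    select state, sum(price) as revenue
    from big_table
    group by state
    """
)

rows = conn.execute(
    "select state, revenue from state_revenues order by state"
).fetchall()
print(rows)
conn.close()
```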

What if we want to do some manipulation with pandas?

In [None]:
# na=False treats rows with a missing state as non-matching
starts_with_n = sr[sr["state"].str.startswith("N", na=False)]
starts_with_n

We can then create another table in the database from the manipulated dataframe. This allows us to create a hybrid pipeline of SQL and pandas operations.

In [None]:
pydb.dataframe_to_table(
    starts_with_n,
    database=db,
    table="starts_with_n",
    location="s3://alpha-everyone/dmet_st/dmet_example/starts_with_n"
)

pydb.read_sql_query(f"select * from {db}.starts_with_n")