### Importing the data and quickly examining its structure

In [1]:
import duckdb

In [2]:
read_bus_fin = ''' create table if not exists bus_fin as select * from read_csv('input/business-financial-data-march-2024-csv.csv') '''
read_bus_emp = ''' create table if not exists bus_emp as select * from read_csv('input/machine-readable-business-employment-data-mar-2024-quarter.csv') '''

duckdb.sql(read_bus_fin)
duckdb.sql(read_bus_emp)

After importing, I first ran `describe` on both datasets to look at their column structure and data types.

In [3]:
duckdb.sql('describe bus_fin')

┌──────────────────┬─────────────┬─────────┬─────────┬─────────┬─────────┐
│   column_name    │ column_type │  null   │   key   │ default │  extra  │
│     varchar      │   varchar   │ varchar │ varchar │ varchar │ varchar │
├──────────────────┼─────────────┼─────────┼─────────┼─────────┼─────────┤
│ Series_reference │ VARCHAR     │ YES     │ NULL    │ NULL    │ NULL    │
│ Period           │ DOUBLE      │ YES     │ NULL    │ NULL    │ NULL    │
│ Data_value       │ DOUBLE      │ YES     │ NULL    │ NULL    │ NULL    │
│ Suppressed       │ VARCHAR     │ YES     │ NULL    │ NULL    │ NULL    │
│ STATUS           │ VARCHAR     │ YES     │ NULL    │ NULL    │ NULL    │
│ UNITS            │ VARCHAR     │ YES     │ NULL    │ NULL    │ NULL    │
│ Magnitude        │ BIGINT      │ YES     │ NULL    │ NULL    │ NULL    │
│ Subject          │ VARCHAR     │ YES     │ NULL    │ NULL    │ NULL    │
│ Group            │ VARCHAR     │ YES     │ NULL    │ NULL    │ NULL    │
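│ Series_title_1   │ VARCHAR     │ YES     │ NULL    │ NULL    │ NULL    │
│ Series_title_2   │ VARCHAR     │ YES     │ NULL    │ NULL    │ NULL    │
│ Series_title_3   │ VARCHAR     │ YES     │ NULL    │ NULL    │ NULL    │
│ Series_title_4   │ VARCHAR     │ YES     │ NULL    │ NULL    │ NULL    │
│ Series_title_5   │ VARCHAR     │ YES     │ NULL    │ NULL    │ NULL    │
└──────────────────┴─────────────┴─────────┴─────────┴─────────┴─────────┘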

In [4]:
duckdb.sql('describe bus_emp')

┌──────────────────┬─────────────┬─────────┬─────────┬─────────┬─────────┐
│   column_name    │ column_type │  null   │   key   │ default │  extra  │
│     varchar      │   varchar   │ varchar │ varchar │ varchar │ varchar │
├──────────────────┼─────────────┼─────────┼─────────┼─────────┼─────────┤
│ Series_reference │ VARCHAR     │ YES     │ NULL    │ NULL    │ NULL    │
│ Period           │ DOUBLE      │ YES     │ NULL    │ NULL    │ NULL    │
│ Data_value       │ DOUBLE      │ YES     │ NULL    │ NULL    │ NULL    │
│ Suppressed       │ VARCHAR     │ YES     │ NULL    │ NULL    │ NULL    │
│ STATUS           │ VARCHAR     │ YES     │ NULL    │ NULL    │ NULL    │
│ UNITS            │ VARCHAR     │ YES     │ NULL    │ NULL    │ NULL    │
│ Magnitude        │ BIGINT      │ YES     │ NULL    │ NULL    │ NULL    │
│ Subject          │ VARCHAR     │ YES     │ NULL    │ NULL    │ NULL    │
│ Group            │ VARCHAR     │ YES     │ NULL    │ NULL    │ NULL    │
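│ Series_title_1   │ VARCHAR     │ YES     │ NULL    │ NULL    │ NULL    │
│ Series_title_2   │ VARCHAR     │ YES     │ NULL    │ NULL    │ NULL    │
│ Series_title_3   │ VARCHAR     │ YES     │ NULL    │ NULL    │ NULL    │
│ Series_title_4   │ VARCHAR     │ YES     │ NULL    │ NULL    │ NULL    │
│ Series_title_5   │ VARCHAR     │ YES     │ NULL    │ NULL    │ NULL    │
└──────────────────┴─────────────┴─────────┴─────────┴─────────┴─────────┘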

The immediate issues I spotted with the raw imports are:
- inconsistent column-name casing: some names are in sentence case, others in all caps
- some column names collide with common SQL keywords such as `status` and `group`

Therefore, I built a quick cleaning function to iron out those wrinkles and give myself a better starting point for writing queries.

In [5]:
tables = ['bus_fin', 'bus_emp']
kw_replace_list = ['period', 'status', 'units', 'group', 'subject']

def clean_input(tables: list, kw_replace_list: list):
    '''Lower-case every column name and append `_col` to names that clash with the keyword list.'''
    for table in tables:
        for col in duckdb.sql(f'select * from {table}').columns:
            # compute the final name first, so every column gets exactly one rename
            # (the previous version reused a stale `new_col` for already lower-case columns)
            new_col = col.lower()
            if new_col in kw_replace_list:
                new_col = f'{new_col}_col'
            if new_col != col:
                # the double quotes keep keyword-like names such as "Group" valid in the statement
                duckdb.sql(f'alter table {table} rename "{col}" to "{new_col}"')
                print(f'{table}.{col} is renamed to {table}.{new_col}')

clean_input(tables=tables, kw_replace_list=kw_replace_list)

bus_fin.Series_reference is renamed to bus_fin.series_reference
bus_fin.Period is renamed to bus_fin.period_col
bus_fin.Data_value is renamed to bus_fin.data_value
bus_fin.Suppressed is renamed to bus_fin.suppressed
bus_fin.STATUS is renamed to bus_fin.status_col
bus_fin.UNITS is renamed to bus_fin.units_col
bus_fin.Magnitude is renamed to bus_fin.magnitude
bus_fin.Subject is renamed to bus_fin.subject_col
bus_fin.Group is renamed to bus_fin.group_col
bus_fin.Series_title_1 is renamed to bus_fin.series_title_1
bus_fin.Series_title_2 is renamed to bus_fin.series_title_2
bus_fin.Series_title_3 is renamed to bus_fin.series_title_3
bus_fin.Series_title_4 is renamed to bus_fin.series_title_4
bus_fin.Series_title_5 is renamed to bus_fin.series_title_5
bus_emp.Series_reference is renamed to bus_emp.series_reference
bus_emp.Period is renamed to bus_emp.period_col
bus_emp.Data_value is renamed to bus_emp.data_value
bus_emp.Suppressed is renamed to bus_emp.suppressed
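bus_emp.STATUS is renamed to bus_emp.status_col
bus_emp.UNITS is renamed to bus_emp.units_col
bus_emp.Magnitude is renamed to bus_emp.magnitude
bus_emp.Subject is renamed to bus_emp.subject_col
bus_emp.Group is renamed to bus_emp.group_col
bus_emp.Series_title_1 is renamed to bus_emp.series_title_1
bus_emp.Series_title_2 is renamed to bus_emp.series_title_2
bus_emp.Series_title_3 is renamed to bus_emp.series_title_3
bus_emp.Series_title_4 is renamed to bus_emp.series_title_4
bus_emp.Series_title_5 is renamed to bus_emp.series_title_5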
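The renaming rule in `clean_input()` can also be pulled out into a pure function, which makes it easy to unit-test without a database. This is a sketch only: `clean_column_name` and `rename_plan` are hypothetical helpers, not part of the original notebook, and the mapping they return would still need to be applied with `alter table ... rename` statements.

```python
def clean_column_name(col: str, keywords: set[str]) -> str:
    """Lower-case a column name and append '_col' when it collides with a keyword."""
    new_col = col.lower()
    if new_col in keywords:
        new_col = f"{new_col}_col"
    return new_col


def rename_plan(columns: list[str], keywords: set[str]) -> dict[str, str]:
    """Map each column that needs renaming to its cleaned name; unchanged columns are skipped."""
    plan = {}
    for col in columns:
        new_col = clean_column_name(col, keywords)
        if new_col != col:
            plan[col] = new_col
    return plan


keywords = {"period", "status", "units", "group", "subject"}
print(rename_plan(["Series_reference", "Period", "STATUS", "magnitude"], keywords))
# {'Series_reference': 'series_reference', 'Period': 'period_col', 'STATUS': 'status_col'}
```

Keeping the rule separate from the `alter table` calls means the same logic can be checked against a list of column names before any table is modified.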

In [6]:
duckdb.sql('describe bus_fin')

┌──────────────────┬─────────────┬─────────┬─────────┬─────────┬─────────┐
│   column_name    │ column_type │  null   │   key   │ default │  extra  │
│     varchar      │   varchar   │ varchar │ varchar │ varchar │ varchar │
├──────────────────┼─────────────┼─────────┼─────────┼─────────┼─────────┤
│ series_reference │ VARCHAR     │ YES     │ NULL    │ NULL    │ NULL    │
│ period_col       │ DOUBLE      │ YES     │ NULL    │ NULL    │ NULL    │
│ data_value       │ DOUBLE      │ YES     │ NULL    │ NULL    │ NULL    │
│ suppressed       │ VARCHAR     │ YES     │ NULL    │ NULL    │ NULL    │
│ status_col       │ VARCHAR     │ YES     │ NULL    │ NULL    │ NULL    │
│ units_col        │ VARCHAR     │ YES     │ NULL    │ NULL    │ NULL    │
│ magnitude        │ BIGINT      │ YES     │ NULL    │ NULL    │ NULL    │
│ subject_col      │ VARCHAR     │ YES     │ NULL    │ NULL    │ NULL    │
│ group_col        │ VARCHAR     │ YES     │ NULL    │ NULL    │ NULL    │
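│ series_title_1   │ VARCHAR     │ YES     │ NULL    │ NULL    │ NULL    │
│ series_title_2   │ VARCHAR     │ YES     │ NULL    │ NULL    │ NULL    │
│ series_title_3   │ VARCHAR     │ YES     │ NULL    │ NULL    │ NULL    │
│ series_title_4   │ VARCHAR     │ YES     │ NULL    │ NULL    │ NULL    │
│ series_title_5   │ VARCHAR     │ YES     │ NULL    │ NULL    │ NULL    │
└──────────────────┴─────────────┴─────────┴─────────┴─────────┴─────────┘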

In [7]:
duckdb.sql('describe bus_emp')

┌──────────────────┬─────────────┬─────────┬─────────┬─────────┬─────────┐
│   column_name    │ column_type │  null   │   key   │ default │  extra  │
│     varchar      │   varchar   │ varchar │ varchar │ varchar │ varchar │
├──────────────────┼─────────────┼─────────┼─────────┼─────────┼─────────┤
│ series_reference │ VARCHAR     │ YES     │ NULL    │ NULL    │ NULL    │
│ period_col       │ DOUBLE      │ YES     │ NULL    │ NULL    │ NULL    │
│ data_value       │ DOUBLE      │ YES     │ NULL    │ NULL    │ NULL    │
│ suppressed       │ VARCHAR     │ YES     │ NULL    │ NULL    │ NULL    │
│ status_col       │ VARCHAR     │ YES     │ NULL    │ NULL    │ NULL    │
│ units_col        │ VARCHAR     │ YES     │ NULL    │ NULL    │ NULL    │
│ magnitude        │ BIGINT      │ YES     │ NULL    │ NULL    │ NULL    │
│ subject_col      │ VARCHAR     │ YES     │ NULL    │ NULL    │ NULL    │
│ group_col        │ VARCHAR     │ YES     │ NULL    │ NULL    │ NULL    │
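│ series_title_1   │ VARCHAR     │ YES     │ NULL    │ NULL    │ NULL    │
│ series_title_2   │ VARCHAR     │ YES     │ NULL    │ NULL    │ NULL    │
│ series_title_3   │ VARCHAR     │ YES     │ NULL    │ NULL    │ NULL    │
│ series_title_4   │ VARCHAR     │ YES     │ NULL    │ NULL    │ NULL    │
│ series_title_5   │ VARCHAR     │ YES     │ NULL    │ NULL    │ NULL    │
└──────────────────┴─────────────┴─────────┴─────────┴─────────┴─────────┘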

In [8]:
duckdb.sql('select * from bus_fin order by random() limit 10').show()

┌──────────────────┬────────────┬────────────┬────────────┬────────────┬───────────┬───────────┬────────────────────────────────┬─────────────────────────────────────────────────┬─────────────────────────────────────┬─────────────────────────────────────────────────┬────────────────┬────────────────┬────────────────┐
│ series_reference │ period_col │ data_value │ suppressed │ status_col │ units_col │ magnitude │          subject_col           │                    group_col                    │           series_title_1            │                 series_title_2                  │ series_title_3 │ series_title_4 │ series_title_5 │
│     varchar      │   double   │   double   │  varchar   │  varchar   │  varchar  │   int64   │            varchar             │                     varchar                     │               varchar               │                     varchar                     │    varchar     │    varchar     │    varchar     │
├──────────────────┼────────────┼──────────

In [9]:
duckdb.sql('select * from bus_emp order by random() limit 10').show()

┌──────────────────┬────────────┬─────────────┬────────────┬────────────┬───────────┬───────────┬────────────────────────────────┬──────────────────────────────────────────────┬────────────────────────────────────────┬──────────────────────────────────┬────────────────┬────────────────┬────────────────┐
│ series_reference │ period_col │ data_value  │ suppressed │ status_col │ units_col │ magnitude │          subject_col           │                  group_col                   │             series_title_1             │          series_title_2          │ series_title_3 │ series_title_4 │ series_title_5 │
│     varchar      │   double   │   double    │  varchar   │  varchar   │  varchar  │   int64   │            varchar             │                   varchar                    │                varchar                 │             varchar              │    varchar     │    varchar     │    varchar     │
├──────────────────┼────────────┼─────────────┼────────────┼────────────┼───────────┼

The column names are now consistently lower-case and no longer clash with reserved keywords.

Since the data was imported from CSV files, there are no column or table comments, so some context about the data is missing.

However, because the data comes from Stats NZ and is publicly available, I checked their website for documentation on what the data covers.

Stats NZ publishes information-release pages for both the [business financial](https://www.stats.govt.nz/information-releases/business-financial-data-march-2024-quarter/) and [business employment](https://www.stats.govt.nz/information-releases/business-employment-data-march-2024-quarter/) data.

Based on the sample rows from both tables and the information on the Stats NZ website, the following columns appear to be the most important for the subsequent analysis:
- `period_col`: the time period the data refers to (quarterly)
- `data_value`: the value of the series defined by the filter columns below
- `group_col`: the type of breakdown, characterised by the combination of the filters `series_title_1`, `series_title_2`, etc.
- `series_title_1`, `series_title_2`, ...: filters that collectively define what `data_value` measures
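`period_col` is loaded as a DOUBLE (see the `describe` output), and the answers below read a value such as `2023.03` as March 2023. A minimal sketch for decoding it in Python, assuming every period follows a `YYYY.MM` pattern; `period_to_year_month` is a hypothetical helper. The periods seen in these tables are quarter-end months (`.03`, `.06`, `.09`, `.12`), so the October pitfall described in the docstring may never arise here, but the arithmetic route avoids it in general.

```python
def period_to_year_month(period: float) -> tuple[int, int]:
    """Split a YYYY.MM period stored as a float (e.g. 2023.03) into (year, month).

    Scaling by 100 avoids the string route, where a DOUBLE such as 2011.10
    prints as '2011.1' and could be misread as January.
    """
    # round() absorbs binary floating-point error before splitting off the month
    year, month = divmod(round(period * 100), 100)
    if not 1 <= month <= 12:
        raise ValueError(f"not a YYYY.MM period: {period!r}")
    return year, month


print(period_to_year_month(2023.03))  # (2023, 3)
print(period_to_year_month(2011.1))   # (2011, 10)
```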

### Question 1
Of the industries where salaries and wages data did NOT exist in 2016 and only appeared later, 
which industry had the highest average value for actual filled jobs across time and what was that value?

In [10]:
query_1 = '''
    select 
        series_title_2 as industry
        , round(avg(data_value), 1) as avg_actual_filled_jobs -- round to one decimal place
    from bus_emp be
    inner join (
        select series_title_2 as industry 
        from bus_fin
        -- lower() and trim() strip stray whitespace and make the comparisons case-insensitive
        where lower(trim(series_title_1)) = 'salaries and wages'
        group by 1 
        having min(period_col) >= 2017 -- earliest salaries and wages record is in 2017 or later
    ) x
        on lower(trim(be.series_title_2)) = lower(trim(x.industry))
    where lower(trim(series_title_1)) = 'filled jobs'
        and lower(trim(series_title_3)) = 'actual'
    group by 1 
    order by 2 desc limit 1 -- keep the single row with the highest avg_actual_filled_jobs
'''

duckdb.sql(query=query_1)

┌──────────────┬────────────────────────┐
│   industry   │ avg_actual_filled_jobs │
│   varchar    │         double         │
├──────────────┼────────────────────────┤
│ Retail Trade │               194053.7 │
└──────────────┴────────────────────────┘

Answer: among the industries whose salaries and wages data first appears after 2016, **"Retail Trade"** had the highest average value for actual filled jobs across time, at **194053.7** (rounded to one decimal place).
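The core of `query_1` is the `having min(period_col) >= 2017` filter: a series qualifies only if its earliest period falls in 2017 or later. The sketch below checks that logic on made-up rows, using Python's built-in `sqlite3` as a stand-in for DuckDB so it runs without extra packages; `series_first_seen_from` is a hypothetical helper. Note that this reading also excludes a series that existed before 2016, disappeared in 2016 and returned later; if such cases should count, the filter would have to test for absence in 2016 specifically.

```python
import sqlite3


def series_first_seen_from(rows: list[tuple[str, float]], first_year: int = 2017) -> list[str]:
    """Return series whose earliest period is in `first_year` or later (the HAVING min() idea of query_1)."""
    con = sqlite3.connect(":memory:")
    try:
        con.execute("create table bus_fin (series_title_2 text, period_col real)")
        con.executemany("insert into bus_fin values (?, ?)", rows)
        cur = con.execute(
            """
            select series_title_2
            from bus_fin
            group by series_title_2
            having min(period_col) >= ?
            order by series_title_2
            """,
            (first_year,),
        )
        return [name for (name,) in cur.fetchall()]
    finally:
        con.close()


toy_rows = [
    ("A", 2015.03), ("A", 2016.03), ("A", 2017.03),  # present in 2016 -> excluded
    ("B", 2017.06), ("B", 2018.03),                  # first seen in 2017 -> kept
    ("C", 2019.03),                                  # first seen in 2019 -> kept
]
print(series_first_seen_from(toy_rows))  # ['B', 'C']
```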

### Question 2
Provide the answer and write a DuckDB SQL query to show 
which year/month combination and industry 
had the second highest seasonally adjusted operating income sales 
across all the business industries in New Zealand that are categorised as NZSIOC level 2.

In [11]:
query_2 = '''
with summing_sales as (
    select 
        period_col as yearmonth
        , series_title_2 as industry
        , coalesce(sum(data_value), 0) as sum_sales -- null means data unavailable, hence coercing to zero
    from bus_fin
    where lower(trim(group_col)) = 'industry by financial variable (nzsioc level 2)'
        and lower(trim(series_title_1)) = 'sales (operating income)'
        and lower(trim(series_title_4)) = 'seasonally adjusted'
    group by 1, 2
)
, final as (
    select yearmonth, industry, sum_sales 
    from (
        select *, dense_rank() over(order by sum_sales desc) as rnk -- dense_rank keeps rnk = 2 available even when the top sales sums are tied
        from summing_sales
    ) x 
    where rnk = 2
)
select * from final
'''

duckdb.sql(query=query_2)

┌───────────┬─────────────────┬───────────┐
│ yearmonth │    industry     │ sum_sales │
│  double   │     varchar     │  double   │
├───────────┼─────────────────┼───────────┤
│   2023.03 │ Wholesale Trade │ 38810.022 │
└───────────┴─────────────────┴───────────┘

Answer: **"Wholesale Trade"** in **March 2023** (period `2023.03`, value 38810.022) had the second highest seasonally adjusted operating income sales across all business industries in New Zealand categorised as NZSIOC level 2.
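The `dense_rank()` choice in `query_2` is easiest to see on a tie: with two rows sharing the top value, `rank()` jumps from 1 to 3, so a `rnk = 2` filter returns nothing, while `dense_rank()` still returns the next distinct value. A small check on made-up values, again using Python's built-in `sqlite3` (window functions need SQLite 3.25 or later) as a stand-in for DuckDB; `rows_ranked_second` is a hypothetical helper.

```python
import sqlite3

RANK_FUNCTIONS = {"rank", "dense_rank", "row_number"}


def rows_ranked_second(rows: list[tuple[str, float]], rank_fn: str = "dense_rank") -> list[tuple[str, float]]:
    """Return the rows whose rank (by value, descending) equals 2 under the chosen ranking function."""
    if rank_fn not in RANK_FUNCTIONS:  # whitelist, since the name is formatted into the SQL text
        raise ValueError(f"unsupported ranking function: {rank_fn}")
    con = sqlite3.connect(":memory:")
    try:
        con.execute("create table sales (label text, value real)")
        con.executemany("insert into sales values (?, ?)", rows)
        query = f"""
            select label, value
            from (select label, value, {rank_fn}() over (order by value desc) as rnk from sales)
            where rnk = 2
            order by label
        """
        return con.execute(query).fetchall()
    finally:
        con.close()


tied = [("A", 10.0), ("B", 10.0), ("C", 8.0)]  # made-up values with a tie at the top
print(rows_ranked_second(tied, "dense_rank"))  # [('C', 8.0)]
print(rows_ranked_second(tied, "rank"))        # []
```

Whether a tied runner-up or the next distinct value counts as "second highest" is a judgement call; `dense_rank()` encodes the latter.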

### Question 3
Create a DuckDB SQL query to calculate the quarterly cumulative number of 
filled jobs over time for the territorial authority 
with the highest average value of filled jobs across time. 

You may not use window functions in your query.

In [12]:
query_3 = f'''
    with average_filled_jobs_ta as (
        select 
            series_title_2 as territorial_authority
            , coalesce(avg(data_value), 0) as average_filled_jobs
        from bus_emp
        where lower(trim(group_col)) = 'territorial authority by employment variable'
            and lower(trim(series_title_1)) = 'filled jobs'
        group by 1
    )
    , summing_filled_jobs as (
        -- total filled jobs per quarter for the TA with the highest average filled jobs ('Auckland' in this dataset)
        select 
            be.period_col
            , sum(be.data_value) as total_filled_jobs
        from bus_emp be
        where be.series_title_2 = ( select territorial_authority from average_filled_jobs_ta where average_filled_jobs = ( select max(average_filled_jobs) from average_filled_jobs_ta ) )
            and lower(trim(be.group_col)) = 'territorial authority by employment variable'
            and lower(trim(be.series_title_1)) = 'filled jobs'
        group by 1
    )
    , final as (
        select 
            s1.period_col
            , sum(s2.total_filled_jobs) as rolling_sum_total_filled_jobs
        from summing_filled_jobs s1
        inner join summing_filled_jobs s2 
            -- non-equi self-join: each period in s1 is paired with every period in s2 up to and including it, so summing s2 gives the cumulative total
            on s1.period_col >= s2.period_col
        group by 1 
    )
    select * from final 
    order by 1
'''

duckdb.sql(query=query_3).show(max_rows=100)

┌────────────┬───────────────────────────────┐
│ period_col │ rolling_sum_total_filled_jobs │
│   double   │            double             │
├────────────┼───────────────────────────────┤
│    2011.06 │                      558627.0 │
│    2011.09 │                     1124066.0 │
│    2011.12 │                     1697795.0 │
│    2012.03 │                     2258168.0 │
│    2012.06 │                     2831106.0 │
│    2012.09 │                     3401783.0 │
│    2012.12 │                     3981507.0 │
│    2013.03 │                     4556314.0 │
│    2013.06 │                     5145225.0 │
│    2013.09 │                     5731581.0 │
│    2013.12 │                     6330161.0 │
│    2014.03 │                     6922160.0 │
│    2014.06 │                     7526621.0 │
│    2014.09 │                     8132406.0 │
│    2014.12 │                     8752492.0 │
│    2015.03 │                     9367389.0 │
│    2015.06 │                     9997399.0 │
│    2015.09 

#### Extra thought on Question 3
If window functions were allowed, the `final` CTE could be simplified to:
```sql
select 
    period_col
    , sum(total_filled_jobs) over(order by period_col rows between unbounded preceding and current row) as rolling_sum_total_filled_jobs
from summing_filled_jobs
```
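A quick way to convince yourself that the two formulations agree is to run both on a tiny table. The sketch below uses made-up quarterly totals and Python's built-in `sqlite3` as a stand-in for DuckDB; `cumulative_sums` is a hypothetical helper.

```python
import sqlite3


def cumulative_sums(rows: list[tuple[float, float]]) -> tuple[list, list]:
    """Compute cumulative totals per period twice: via a non-equi self-join and via a window sum."""
    con = sqlite3.connect(":memory:")
    try:
        con.execute("create table summing_filled_jobs (period_col real, total_filled_jobs real)")
        con.executemany("insert into summing_filled_jobs values (?, ?)", rows)
        self_join = con.execute(
            """
            select s1.period_col, sum(s2.total_filled_jobs)
            from summing_filled_jobs s1
            inner join summing_filled_jobs s2 on s1.period_col >= s2.period_col
            group by s1.period_col
            order by s1.period_col
            """
        ).fetchall()
        window = con.execute(
            """
            select period_col,
                   sum(total_filled_jobs) over (order by period_col rows between unbounded preceding and current row)
            from summing_filled_jobs
            order by period_col
            """
        ).fetchall()
        return self_join, window
    finally:
        con.close()


toy = [(2011.06, 100.0), (2011.09, 50.0), (2011.12, 25.0)]  # made-up totals
self_join, window = cumulative_sums(toy)
print(self_join)            # [(2011.06, 100.0), (2011.09, 150.0), (2011.12, 175.0)]
print(self_join == window)  # True
```

The two only agree because `period_col` is unique per row, which the `group by` in `summing_filled_jobs` guarantees. The self-join also builds n² row pairs, which is harmless for a few dozen quarters but would be worth avoiding on long series.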

### Question 4
Assume these datasets are used in part of a pipeline where the file that arrives 
may contain unwanted duplicates, incorrect datatypes, missing dates or other data quality aberrations. 

What things could be done programmatically to make sure the input data is of adequate quality and improve the pipeline?

### Answer - Q4
When dealing with data quality issues, it is important to first agree with stakeholders on which issues are likely to occur in the data and how each should be handled when it does.

For the sake of discussion, let's use these three issues as examples:
- unwanted duplicates
- incorrect data types
- missing data

If duplicates are 'unwanted', we must already have established with the stakeholders that duplicates are not expected in the dataset.

Likewise, incorrect data types will prevent the database engine from appending incoming data to an existing target table.

Hence, for these two types of issue, the best approach is "quarantine, notify, and inspect", which means the pipeline should:
1. identify any duplicated rows and any rows with values of the wrong data type
1. if problematic rows are found, quarantine them in a separate table for later inspection
1. log every issue encountered and, preferably, alert the support engineer automatically by email or message
1. once all problematic rows are isolated, append the remainder of the dataset to the target
1. once notified, engineers can inspect the quarantined rows and decide how to handle them manually, for example by dropping the rows or correcting their data types

Missing data can be handled differently. If stakeholders agree, missing values can be imputed or inferred where possible, for example with pandas' `DataFrame.fillna()` method. Alternatively, the same five-step procedure above can be applied to missing data.
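One simple imputation choice for a quarterly series is forward-filling: carrying the last observed value into later gaps. A plain-Python sketch under that assumption; `forward_fill` is a hypothetical helper, and pandas offers the same behaviour through `Series.ffill()`. Whether carrying values forward is acceptable for a given metric is exactly the kind of decision to agree with stakeholders.

```python
from typing import Optional


def forward_fill(values: list[Optional[float]]) -> list[Optional[float]]:
    """Replace each None with the most recent non-None value; leading Nones stay None."""
    filled: list[Optional[float]] = []
    last: Optional[float] = None
    for v in values:
        if v is not None:
            last = v  # remember the latest observed value
        filled.append(last)
    return filled


print(forward_fill([None, 1.0, None, None, 3.0, None]))
# [None, 1.0, 1.0, 1.0, 3.0, 3.0]
```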

There could also be more fundamental issues, such as an inconsistent schema (incoming data having fewer or more columns than the target). Checks for these should run before all other checks, so that the pipeline halts early for manual intervention when they occur instead of wasting resources on further checks.
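A minimal sketch of such an early schema gate, comparing column names only (types are left to the later data type check). Because `quality_check_dataset()` below pairs raw and target columns by position, the sketch also flags a change in column order. `schema_problems` and `assert_schema` are hypothetical helpers; in this notebook the column lists could come from a relation's `.columns` attribute, as the function below already does.

```python
def schema_problems(incoming_cols: list[str], target_cols: list[str]) -> list[str]:
    """Describe schema mismatches between an incoming dataset and its target table."""
    problems = []
    missing = [c for c in target_cols if c not in incoming_cols]
    extra = [c for c in incoming_cols if c not in target_cols]
    if missing:
        problems.append(f"missing columns: {missing}")
    if extra:
        problems.append(f"unexpected columns: {extra}")
    # same set of names but a different order would break positional pairing
    if not problems and incoming_cols != target_cols:
        problems.append(f"column order differs: {incoming_cols} vs {target_cols}")
    return problems


def assert_schema(incoming_cols: list[str], target_cols: list[str]) -> None:
    """Halt the pipeline (raise) before any further checks if the schema does not match."""
    problems = schema_problems(incoming_cols, target_cols)
    if problems:
        raise ValueError("Schema check failed, halting pipeline: " + "; ".join(problems))


print(schema_problems(["c1_int", "c2_str"], ["c1_int", "c2_str", "c3_tstamp"]))
# ["missing columns: ['c3_tstamp']"]
```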

To illustrate the approach above, I put together the `quality_check_dataset()` function below, which implements these ideas programmatically.

In [13]:
from datetime import datetime
import logging
logging.basicConfig(format='%(asctime)s %(levelname)-8s %(message)s', level=logging.INFO, datefmt='%Y-%m-%d %H:%M:%S')
l = logging.getLogger('inspection_logger')



def quality_check_dataset(raw: str, load_target: str, expect_complete: list[str]):
    '''
    This function validates the raw data quality in three aspects: duplicated records, missing data, incorrect data types.
    
    Parameters taken as follows:
    1. `raw`: string name of the incoming raw dataset
    2. `load_target`: string name of the target table that `raw` will be inserted to
    3. `expect_complete`: list of string column names in `raw` where no null values are to be expected
    '''

    inpsection_tstamp = datetime.now().strftime('%Y%m%d_%H%M%S')

    raw_cols = duckdb.sql(f'select * from {raw}').columns
    target_dtypes = duckdb.sql(f'select * from {load_target}').dtypes
    
    try_casts = ', '.join([f'try_cast({raw_cols[i]} as {target_dtypes[i]}) as __try_cast_{raw_cols[i]}' for i in range(len(raw_cols))])
    case_when_bad_casts = ' or '.join([f'( {raw_col} is not null and __try_cast_{raw_col} is null )' for raw_col in raw_cols])

    completeness_assertion = ' or '.join([f'{i} is null' for i in expect_complete]) or 'false'  # 'false' keeps the WHERE clause valid if expect_complete is empty
    

    # duplicates
    quarantine_dups_destination = f'{raw}_dups_{inpsection_tstamp}'
    quartine_dups = f'''
        create table {quarantine_dups_destination} as 
            select max(__dup_rownum) as __dup_cnt, * exclude (__row_hash, __dup_rownum)
            from (
                select *, md5(raw_tbl::TEXT) as __row_hash, row_number() over(partition by md5(raw_tbl::TEXT) order by random()) as __dup_rownum
                from {raw} raw_tbl
            ) x
            group by all
            having max(__dup_rownum) > 1
    '''
    print(quartine_dups) # print out the quarantine query for demo purposes, not needed in real world
    duckdb.sql(quartine_dups)
    dup_cnt = duckdb.sql(f'select sum(__dup_cnt) as dup_cnt from {quarantine_dups_destination}').fetchall()[0][0] or 0  # sum() over no rows is NULL (None), so default to 0

    if dup_cnt:
        l.warning(f'Duplicated rows found, total={dup_cnt}, quarantined in {quarantine_dups_destination}')
    else:
        l.info('No duplicated data rows found')

    reconstruct_raw = f'''
        create or replace table {raw} as 
            select * from {raw}
            except 
            select * exclude(__dup_cnt) from {quarantine_dups_destination}
    '''
    duckdb.sql(reconstruct_raw)
    l.info(f'{raw} reconstructed, less {dup_cnt} rows due to removal of duplicated rows')
    l.info(f'{raw} has {duckdb.table(raw).shape[0]} rows remaining\n')

    # missing_data
    quartine_missing_data_destination = f'{raw}_missing_data_{inpsection_tstamp}'
    quartine_missing_data = f'''
        create table {quartine_missing_data_destination} as 
            select * from {raw} where {completeness_assertion}
    '''
    print(quartine_missing_data)
    duckdb.sql(quartine_missing_data)
    missing_data_cnt = duckdb.sql(f'select * from {quartine_missing_data_destination}').shape[0]
    
    if missing_data_cnt:
        l.warning(f'Missing data rows found, total={missing_data_cnt}, quarantined in {quartine_missing_data_destination}')
    else:
        l.info('No missing data rows found')
    

    reconstruct_raw = f'''
        create or replace table {raw} as 
            select * from {raw}
            except all
            select * from {quartine_missing_data_destination}
   '''
    duckdb.sql(reconstruct_raw)
    l.info(f'{raw} reconstructed, less {missing_data_cnt} rows due to removal of rows with missing data')
    l.info(f'{raw} has {duckdb.table(raw).shape[0]} rows remaining\n')


    # datatypes
    quartine_bad_dtypes_destination = f'{raw}_bad_dtypes_{inpsection_tstamp}'
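    # columns('^[^_][^_].+') keeps the original columns and drops the '__'-prefixed helper columns;
    # caveat: it would also drop names shorter than 3 characters or with '_' as the 2nd character (e.g. 'id', 'a_b')
    quartine_bad_dtypes = f'''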
        create table {quartine_bad_dtypes_destination} as
            select columns('^[^_][^_].+') from (
                select *, case when {case_when_bad_casts} then 1 else 0 end as __has_bad_dtype
                from ( 
                    select *, {try_casts} from {raw}
                ) x
            ) x
            where __has_bad_dtype = 1
    '''
    print(quartine_bad_dtypes)
    duckdb.sql(quartine_bad_dtypes)
    bad_dtypes_cnt = duckdb.sql(f'select * from {quartine_bad_dtypes_destination}').shape[0]

    if bad_dtypes_cnt:
        l.warning(f'Rows with bad data types found, total={bad_dtypes_cnt}, quarantined in {quartine_bad_dtypes_destination}')
    else:
        l.info('No rows with bad data types found')

    reconstruct_raw = f'''
        create or replace table {raw} as 
            select * from {raw}
            except all
            select * from {quartine_bad_dtypes_destination}
    '''
    duckdb.sql(reconstruct_raw)
    l.info(f'{raw} reconstructed, less {bad_dtypes_cnt} rows due to removal of rows with bad data types')
    l.info(f'{raw} has {duckdb.table(raw).shape[0]} rows remaining\n')

    l.info(f'Now performing insert into {load_target}')
    duckdb.sql(f'insert into {load_target} select * from {raw}')
    l.info(f'{load_target} updated with {duckdb.table(raw).shape[0]} rows')
    l.info(f'{load_target} now has {duckdb.table(load_target).shape[0]} rows')



I also prepared simple `test_raw` and `test_target` tables for demonstration.

The `test_raw` table has 3 columns and 5 rows, all stored as strings. Both `c1_int` and `c3_tstamp` are expected to have no null values, and each column's intended data type is given by its name suffix.

`quality_check_dataset()` correctly identified 2 duplicated rows (rows 1 and 2), 1 row with missing data (row 3), and 1 row with an incorrect data type (row 4).

The remaining row is then inserted into `test_target`.

Each step is logged so that an engineer can inspect the results later.

In [14]:
duckdb.sql(
    '''
    create or replace table test_raw as 
        select '123' as c1_int,   'abc' as c2_str,   '2025-01-01 00:00:00' as c3_tstamp   union all
        select '123' as c1_int,   'abc' as c2_str,   '2025-01-01 00:00:00' as c3_tstamp   union all
        select null  as c1_int,   'abc' as c2_str,   null                  as c3_tstamp   union all
        select 'xxx' as c1_int,   'abc' as c2_str,   '2025-01-01 00:00:00' as c3_tstamp   union all
        select '456' as c1_int,   'abc' as c2_str,   '2025-01-01 00:00:00' as c3_tstamp 
    '''
)

duckdb.sql(
    '''
    create or replace table test_target as 
        select cast('888' as int) as c1_int,   cast('xyz' as string) as c2_str,   cast(now() as timestamp) as c3_tstamp
    '''
)

quality_check_dataset(raw='test_raw', load_target='test_target', expect_complete=['c1_int', 'c3_tstamp'])

2025-06-15 12:04:43 INFO     test_raw reconstructed, less 2 rows due to removal of duplicated rows
2025-06-15 12:04:43 INFO     test_raw has 3 rows remaining

2025-06-15 12:04:43 INFO     test_raw reconstructed, less 1 rows due to removal of rows with missing data
2025-06-15 12:04:43 INFO     test_raw has 2 rows remaining

2025-06-15 12:04:43 INFO     test_raw reconstructed, less 1 rows due to removal of rows with bad data types
2025-06-15 12:04:43 INFO     test_raw has 1 rows remaining

2025-06-15 12:04:43 INFO     Now performing insert into test_target
2025-06-15 12:04:43 INFO     test_target updated with 1 rows
2025-06-15 12:04:43 INFO     test_target now has 2 rows



        create table test_raw_dups_20250615_120443 as 
            select max(__dup_rownum) as __dup_cnt, * exclude (__row_hash, __dup_rownum)
            from (
                select *, md5(raw_tbl::TEXT) as __row_hash, row_number() over(partition by md5(raw_tbl::TEXT) order by random()) as __dup_rownum
                from test_raw raw_tbl
            ) x
            group by all
            having max(__dup_rownum) > 1
    

        create table test_raw_missing_data_20250615_120443 as 
            select * from test_raw where c1_int is null or c3_tstamp is null
    

        create table test_raw_bad_dtypes_20250615_120443 as
            select columns('^[^_][^_].+') from (
                select *, case when ( c1_int is not null and __try_cast_c1_int is null ) or ( c2_str is not null and __try_cast_c2_str is null ) or ( c3_tstamp is not null and __try_cast_c3_tstamp is null ) then 1 else 0 end as __has_bad_dtype
                from ( 
                    select *, try_cast(c1_i

In [15]:
duckdb.table('test_raw')

┌─────────┬─────────┬─────────────────────┐
│ c1_int  │ c2_str  │      c3_tstamp      │
│ varchar │ varchar │       varchar       │
├─────────┼─────────┼─────────────────────┤
│ 456     │ abc     │ 2025-01-01 00:00:00 │
└─────────┴─────────┴─────────────────────┘

In [16]:
duckdb.table('test_target')

┌────────┬─────────┬─────────────────────────┐
│ c1_int │ c2_str  │        c3_tstamp        │
│ int32  │ varchar │        timestamp        │
├────────┼─────────┼─────────────────────────┤
│    888 │ xyz     │ 2025-06-15 12:04:43.546 │
│    456 │ abc     │ 2025-01-01 00:00:00     │
└────────┴─────────┴─────────────────────────┘

### Question 5
Create summary statistics and perform a statistical analysis or create a model using the provided datasets. We are interested in your justification for your choices and reasoning. 

You may join the data to other datasets from https://www.stats.govt.nz/large-datasets/csv-files-fordownload/ if you wish

### Answer - Q5

First, I inspected both datasets and computed summary statistics with DuckDB's `summarize` statement via `duckdb.sql()`.

After inspecting both datasets and their summaries, I was interested in how the change rates of the following metrics correlate with each other within each industry:
- profits
- expenditure
- salaries
- filled jobs

I chose change rates because they normalise the data: the four metrics are measured in different units. Also, profits, expenditure and salaries come from the business financial dataset, while filled jobs come from the business employment dataset. Both are recorded quarterly (e.g. periods `2011.06`, `2011.09` in the Question 3 output), and aggregating to yearly change rates puts the metrics from the two sources on a common annual basis.

I first built a SQL query to extract the yearly change rates for these metrics, then used `plotly` to visualise the results and identify correlations.

In [17]:
duckdb.sql('summarize bus_fin')

┌──────────────────┬─────────────┬─────────────────────────────────────────────────┬─────────────────────────────────────────────────┬───────────────┬────────────────────┬───────────────────┬────────────────────┬────────────────────┬────────────────────┬───────┬─────────────────┐
│   column_name    │ column_type │                       min                       │                       max                       │ approx_unique │        avg         │        std        │        q25         │        q50         │        q75         │ count │ null_percentage │
│     varchar      │   varchar   │                     varchar                     │                     varchar                     │     int64     │      varchar       │      varchar      │      varchar       │      varchar       │      varchar       │ int64 │  decimal(9,2)   │
├──────────────────┼─────────────┼─────────────────────────────────────────────────┼─────────────────────────────────────────────────┼───────────────┼───────

In [18]:
duckdb.sql('summarize bus_emp')

┌──────────────────┬─────────────┬────────────────────────────────┬──────────────────────────────────────────────┬───────────────┬───────────────────┬────────────────────┬────────────────────┬────────────────────┬───────────────────┬───────┬─────────────────┐
│   column_name    │ column_type │              min               │                     max                      │ approx_unique │        avg        │        std         │        q25         │        q50         │        q75        │ count │ null_percentage │
│     varchar      │   varchar   │            varchar             │                   varchar                    │     int64     │      varchar      │      varchar       │      varchar       │      varchar       │      varchar      │ int64 │  decimal(9,2)   │
├──────────────────┼─────────────┼────────────────────────────────┼──────────────────────────────────────────────┼───────────────┼───────────────────┼────────────────────┼────────────────────┼────────────────────┼───────

In [19]:
df_bus_metric_corr = duckdb.sql(
    f'''
    --beginsql
        with profits_expenditure_salaries_changes as (
            select *
                , round(( sum_profits_in_year - lag(sum_profits_in_year) over(partition by industry order by year_time) ) / ( lag(sum_profits_in_year) over(partition by industry order by year_time) ), 3) as profits_yearly_change
                , round(( sum_expenditure_in_year - lag(sum_expenditure_in_year) over(partition by industry order by year_time) ) / ( lag(sum_expenditure_in_year) over(partition by industry order by year_time) ), 3) as expenditure_yearly_change
                , round(( sum_salaries_in_year - lag(sum_salaries_in_year) over(partition by industry order by year_time) ) / ( lag(sum_salaries_in_year) over(partition by industry order by year_time) ), 3) as salaries_yearly_change
            from (
                select 
                    date_trunc('year', strptime(cast(period_col as string), '%Y.%m')) as year_time
                    , series_title_2 as industry
                    , sum(case when lower(trim(series_title_1)) = 'operating profit' then data_value else 0 end) as sum_profits_in_year
                    , sum(case when lower(trim(series_title_1)) = 'purchases and operating expenditure' then data_value else 0 end) as sum_expenditure_in_year
                    , sum(case when lower(trim(series_title_1)) = 'salaries and wages' then data_value else 0 end) as sum_salaries_in_year
                from bus_fin 
                where lower(trim(series_title_1)) in ( 'operating profit', 'purchases and operating expenditure', 'salaries and wages' )
                    and lower(trim(group_col)) = 'industry by financial variable (nzsioc level 1)'
                group by 1, 2
                having count(case when lower(trim(series_title_1)) = 'operating profit' then data_value else null end) = 4 -- keep only years with all 4 quarters of profit data
                    and count(case when lower(trim(series_title_1)) = 'purchases and operating expenditure' then data_value else null end) = 4 -- keep only years with all 4 quarters of expenditure data
                    and count(case when lower(trim(series_title_1)) = 'salaries and wages' then data_value else null end) = 4 -- keep only years with all 4 quarters of salaries data
            ) x
        )
        , filled_jobs_changes as (
            select *
                , round(( sum_filled_jobs_in_year - lag(sum_filled_jobs_in_year) over(partition by industry order by year_time) ) / ( lag(sum_filled_jobs_in_year) over(partition by industry order by year_time) ), 3) as filled_jobs_yearly_change
            from (
                select 
                    date_trunc('year', strptime(cast(period_col as string), '%Y.%m')) as year_time
                    , series_title_2 as industry
                    , sum(data_value) as sum_filled_jobs_in_year
                from bus_emp 
                where lower(trim(group_col)) = 'industry by employment variable'
                    and lower(trim(series_title_1)) = 'filled jobs'
                group by 1, 2
                having count(data_value) = 12 -- keep only years with all 12 months of filled-jobs data
            ) x
        )
        , combined as (
            select 
                pesc.year_time
                , pesc.industry
                , pesc.profits_yearly_change 
                , pesc.expenditure_yearly_change
                , pesc.salaries_yearly_change
                , fjc.filled_jobs_yearly_change
            from profits_expenditure_salaries_changes pesc
            inner join filled_jobs_changes fjc on lower(trim(pesc.industry)) = lower(trim(fjc.industry)) and pesc.year_time = fjc.year_time
        )
        select * from combined
        where ( profits_yearly_change + expenditure_yearly_change + salaries_yearly_change + filled_jobs_yearly_change ) is not null -- the sum is null if any change is null (e.g. each industry's first year, which has no lag)
    --endsql
    '''
).to_df()

df_bus_metric_corr.head(20)

| | year_time | industry | profits_yearly_change | expenditure_yearly_change | salaries_yearly_change | filled_jobs_yearly_change |
|---|---|---|---|---|---|---|
| 0 | 2023-01-01 | Health Care and Social Assistance | 0.116 | 0.055 | 0.107 | 0.038 |
| 1 | 2018-01-01 | Wholesale Trade | 0.009 | 0.074 | 0.056 | 0.031 |
| 2 | 2019-01-01 | Wholesale Trade | 0.067 | 0.026 | 0.047 | 0.007 |
| 3 | 2020-01-01 | Wholesale Trade | 0.272 | -0.025 | 0.006 | -0.007 |
| 4 | 2021-01-01 | Wholesale Trade | -0.025 | 0.161 | 0.09 | 0.014 |
| 5 | 2022-01-01 | Wholesale Trade | 0.045 | 0.206 | 0.098 | 0.03 |
| 6 | 2023-01-01 | Wholesale Trade | 0.026 | 0.017 | 0.055 | 0.017 |
| 7 | 2020-01-01 | Mining | -0.284 | -0.154 | -0.092 | 0.005 |
| 8 | 2021-01-01 | Mining | 0.284 | 0.09 | 0.083 | 0.016 |
| 9 | 2022-01-01 | Mining | 0.267 | 0.185 | 0.135 | 0.029 |
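
For each industry, the query above does three things. It keeps only years with a complete set of periods (`having count(...) = 4`). It computes each year's change against the previous year with `lag(...)`, i.e. (current - previous) / previous. Finally, it drops rows where any change is null. Here is a minimal pandas sketch of the same logic for a single metric. It assumes a long-format frame with hypothetical columns `industry`, `year`, `quarter` and `value`, and uses toy numbers rather than the Stats NZ data:

```python
import pandas as pd

# Toy quarterly data (hypothetical values): industry "A" has complete 2021-2023,
# industry "B" is missing the last quarter of 2022.
rows = []
for year, base in [(2021, 100.0), (2022, 110.0), (2023, 121.0)]:
    for q in range(1, 5):
        rows.append(("A", year, q, base / 4))
        if not (year == 2022 and q == 4):
            rows.append(("B", year, q, base / 2))
quarterly = pd.DataFrame(rows, columns=["industry", "year", "quarter", "value"])

# Aggregate to yearly totals and count how many quarters contributed.
yearly = (
    quarterly.groupby(["industry", "year"], as_index=False)
    .agg(total=("value", "sum"), n_quarters=("value", "count"))
)

# Equivalent of HAVING count(...) = 4: keep only complete years.
yearly = yearly[yearly["n_quarters"] == 4].sort_values(["industry", "year"])

# Equivalent of (x - lag(x)) / lag(x) OVER (PARTITION BY industry ORDER BY year).
prev = yearly.groupby("industry")["total"].shift(1)
yearly["yearly_change"] = ((yearly["total"] - prev) / prev).round(3)

# Equivalent of the final "... IS NOT NULL" filter: drops each industry's first year.
result = yearly.dropna(subset=["yearly_change"])
print(result[["industry", "year", "yearly_change"]])
```

The completeness filter runs before the lag. As a result, an industry with a missing year has its next change measured against the last complete year. In the toy data, industry B compares 2023 with 2021. If that matters for the real data, one option is to also require that the previous row's year is exactly one year earlier.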


In [20]:
import plotly.graph_objects as go
from plotly.subplots import make_subplots
import numpy as np
import pandas as pd

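# reusable plotting helper: one row of three subplots per group (e.g. per industry)
def explore_correlations(df: pd.DataFrame, grouping_var: str, time_series: str, time_interval: str, left_var: str, left_var_desc: str, right_var: str, right_var_desc: str):
    """For each value of `grouping_var`, plot `left_var` and `right_var` over `time_series` as bars,
    plus a `left_var` vs `right_var` scatter with a degree-1 least-squares trend line."""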

    industries = sorted(df[grouping_var].unique().tolist())
    fig = make_subplots(rows=len(industries), cols=3, subplot_titles=(f'{left_var_desc} {time_interval}-on-{time_interval}', f'{right_var_desc} {time_interval}-on-{time_interval}', f'{left_var_desc}-{right_var_desc} correlation'))

    layout_dict = {}

    for idx, var in enumerate(industries):
        grouping_df = df[df[grouping_var] == var].sort_values(by=time_series)
        fig.add_trace(
            go.Bar(x=grouping_df[time_series], y=grouping_df[left_var], name=f'{left_var_desc} {time_interval}-on-{time_interval}', marker=dict(color='#002F6B'), zorder=1),
            row = idx+1, 
            col = 1
        )
        fig.add_trace(
            go.Bar(x=grouping_df[time_series], y=grouping_df[right_var], name=f'{right_var_desc} {time_interval}-on-{time_interval}', marker=dict(color='#002F6B'), zorder=1),
            row = idx+1,
            col = 2
        )
        fig.add_trace(
            go.Scatter(x=grouping_df[left_var], y=grouping_df[right_var], name=f'{left_var_desc}-{right_var_desc} correlation', mode='markers', marker=dict(color='#002F6B'), zorder=1),
            row = idx+1, 
            col = 3
        )

        slope, intercept = np.polyfit(grouping_df[left_var], grouping_df[right_var], 1)
        y_fit = slope * grouping_df[left_var] + intercept

        fig.add_trace(
            go.Scatter(x=grouping_df[left_var], y=y_fit, mode='lines', name=f'{var} Fit', line=dict(dash='solid', color='#FAA61A'), showlegend=False),
            row = idx+1, 
            col = 3
        )

        layout_dict.update(
            {
                f'xaxis{3*(idx+1)-2}': dict(title='year'),
                f'xaxis{3*(idx+1)-1}': dict(title='year'),
                f'xaxis{3*(idx+1)}': dict(title=f'{left_var_desc} {time_interval}-on-{time_interval}'),
                f'yaxis{3*(idx+1)-2}': dict(title=var),
                f'yaxis{3*(idx+1)}': dict(title=f'{right_var_desc} {time_interval}-on-{time_interval}'),
            }
        )

    layout_dict.update(
        {
            'height': 5000,
            'showlegend': False,
        }
    )

    fig.update_layout(**layout_dict)
    fig.update_yaxes(title_font=dict(size=12))

    fig.show()
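
`explore_correlations` shows each relationship visually, as a scatter with a degree-1 `np.polyfit` trend line, but it does not report how strong the relationship is. The minimal sketch below summarises each industry with a Pearson correlation coefficient and the fitted slope. It assumes a frame shaped like `df_bus_metric_corr`; the helper name `correlation_table` and the toy values are hypothetical:

```python
import numpy as np
import pandas as pd

def correlation_table(df: pd.DataFrame, grouping_var: str, left_var: str, right_var: str) -> pd.DataFrame:
    """Per-group Pearson r and least-squares slope of right_var on left_var (hypothetical helper)."""
    columns = [grouping_var, "n_years", "pearson_r", "slope"]
    records = []
    for group, g in df.groupby(grouping_var):
        g = g.dropna(subset=[left_var, right_var])
        if len(g) < 3:
            # Too few yearly points for a meaningful correlation; skip this group.
            continue
        r = g[left_var].corr(g[right_var])  # Series.corr defaults to Pearson
        slope, _intercept = np.polyfit(g[left_var], g[right_var], 1)  # same fit as the trend lines
        records.append({grouping_var: group, "n_years": len(g), "pearson_r": round(r, 3), "slope": round(slope, 3)})
    if not records:
        return pd.DataFrame(columns=columns)
    return pd.DataFrame(records, columns=columns).sort_values("pearson_r", ascending=False, ignore_index=True)

# Toy data (hypothetical numbers, not the Stats NZ results).
toy = pd.DataFrame({
    "industry": ["X"] * 4 + ["Y"] * 4,
    "profits_yearly_change": [0.1, 0.2, 0.3, 0.4, 0.1, 0.2, 0.3, 0.4],
    "expenditure_yearly_change": [0.05, 0.10, 0.15, 0.20, 0.4, 0.3, 0.2, 0.1],
})
print(correlation_table(toy, "industry", "profits_yearly_change", "expenditure_yearly_change"))
```

With the real data, the call would be something like `correlation_table(df_bus_metric_corr, 'industry', 'profits_yearly_change', 'expenditure_yearly_change')`. Each industry has only a handful of yearly points, so r is sensitive to single years such as 2020. It is best read alongside the plots rather than as a significance test.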

#### business profits - business expenditure correlation

In [21]:
explore_correlations(df_bus_metric_corr, 'industry', 'year_time', 'year', 'profits_yearly_change', 'business profits change', 'expenditure_yearly_change', 'business expenditure change')

![profits-expenditure](./assets/profits_expenditure.png)

#### business profits - business salaries correlation

In [22]:
explore_correlations(df_bus_metric_corr, 'industry', 'year_time', 'year', 'profits_yearly_change', 'business profits change', 'salaries_yearly_change', 'business salaries change')

![profits-salaries](./assets/profits_salaries.png)

#### business profits - filled jobs correlation

In [23]:
explore_correlations(df_bus_metric_corr, 'industry', 'year_time', 'year', 'profits_yearly_change', 'business profits change', 'filled_jobs_yearly_change', 'business filled jobs change')

![profits-jobs](./assets/profits_jobs.png)

#### filled jobs - salaries correlation

In [24]:
explore_correlations(df_bus_metric_corr, 'industry', 'year_time', 'year', 'salaries_yearly_change', 'business salaries change', 'filled_jobs_yearly_change', 'business filled jobs change')

![jobs-salaries](./assets/jobs_salaries.png)

### Conclusion for Q5:

Here are the key takeaways from the plots:
- **Agriculture, forestry and fishing**, **Mining**, and **Transport, postal and warehousing** show a strong positive correlation between profit and expenditure, which could indicate a strong inclination to reinvest profits in upgrading equipment and infrastructure.
- **Wholesale trade** shows a negative correlation between profit and expenditure. This is especially pronounced during the COVID period, indicating the surge in the cost of goods and transport at that time.
- Interestingly, the rate of change in business profits does not necessarily correlate with the rate of change in filled jobs, especially in the **Health care** and **Manufacturing** sectors. For health care, one reason could be that COVID drove high demand for professional and support staff despite shrinking profits. For manufacturing, it could be that increased productivity and automation allow more profit to be generated with the same number of employees.
- **Agriculture, forestry and fishing** appears to be the only industry where filled jobs and salaries correlate negatively. **Wholesale trade** and **Transport, postal and warehousing** show a strong positive correlation between filled jobs and salaries, indicating that these industries tend to be labour-intensive.