# Data Engineer Take Home Exercise

## Question
### Data Prep

Write a script to transform input CSV to desired output CSV and Parquet. 

You will find a CSV file in the files folder under `data.csv`. There are three steps to this part of the test. Each step concerns manipulating the values for a single field according to the step's requirements. The steps are as follows:

**String cleaning** - The bio field contains text with arbitrary padding, spacing and line breaks. Normalize these values to a space-delimited string.

**Code swap** - There is a supplementary CSV in the files folder under `state_abbreviations`. This "data dictionary" contains state abbreviations alongside state names. For the state field of the input CSV, replace each state abbreviation with its associated state name from the data dictionary.

**Date offset** - The start_date field contains data in a variety of formats. These may include e.g., "June 23, 1912" or "5/11/1930" (month, day, year). But not all values are valid dates. Invalid dates may include e.g., "June 2018", "3/06" (incomplete dates) or even arbitrary natural language. Add a start_date_description field adjacent to the start_date column to enable filtering invalid date values for analysts. Normalize all valid date values in start_date to ISO 8601 (i.e., YYYY-MM-DD).

Your script should take `data.csv` as input and produce a cleansed `enriched.csv` and `enriched.snappy.parquet` files according to the step requirements above.

## Submission Guidelines
We ask that your solutions be implemented in Python (3.8 or newer) or PySpark (3.3 or newer). If you would like to demonstrate skills in both approaches, feel free to prepare two separate Jupyter notebooks. Assume that the code will run monthly to process the data and store it in an AWS S3-based data lake. With that assumption, please be prepared to discuss how this code can be scheduled and how the outputs should be stored in the S3 bucket.

### Assessment Criteria
Our goal is not to fool you. On the contrary, we would like to see you in your best light! We value clean, DRY and documented code; and in the interest of full disclosure, our assessment criteria are outlined below (in order of significance):

1. Your ability to effectively solve the problems posed.
1. Your ability to solve these problems in a clear and logical manner, with tasteful design.
1. Your ability to appropriately document and comment your code.



## 1. Reading a file line by line using csv.reader
The simplest solution. `csv.reader` streams the file record by record, so the entire file never has to be loaded into memory at once. It works well for small to medium-sized files.

#### String cleaning

In [1]:
import csv

try:
    # Backslash continuation keeps the cell runnable on Python 3.8, as the task requires
    with open("./data/data.csv", "r", encoding="utf-8", newline="") as infile, \
         open("./output/data_1.csv", "w", encoding="utf-8", newline="") as outfile:

        reader = csv.reader(infile, delimiter=",")
        writer = csv.writer(outfile, delimiter=",")

        header = next(reader)
        writer.writerow(header)

        # Locate the bio column by name instead of relying on a fixed position
        bio_idx = header.index("bio")

        for row in reader:
            if len(row) > bio_idx and row[bio_idx]:  # Skip short rows and empty values
                row[bio_idx] = " ".join(row[bio_idx].split())  # Collapse padding, repeated spaces and line breaks
            writer.writerow(row)

except FileNotFoundError as e:
    print(f"Error: {e}")

except Exception as e:
    print(f"An unexpected error occurred: {e}")
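
To illustrate what the normalization inside the loop does, here is a minimal sketch on an invented sample value (the sample text is hypothetical, not taken from `data.csv`). `str.split()` with no arguments splits on any run of whitespace, including tabs and line breaks, and drops leading and trailing padding. Joining the pieces with a single space yields the space-delimited string the task asks for.

```python
def normalize_whitespace(value: str) -> str:
    """Collapse padding, repeated spaces, tabs and line breaks into single spaces."""
    return " ".join(value.split())


# Hypothetical bio value, invented for illustration only
sample_bio = "   Senior  engineer\n\twith a passion\r\n for   data.  "
cleaned_bio = normalize_whitespace(sample_bio)
print(cleaned_bio)  # Senior engineer with a passion for data.
```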


#### Code swap

In [2]:
import csv

# Load state abbreviations into a dictionary
states = {}
with open("./data/state_abbreviations.csv", encoding="utf-8", newline="") as f:
    reader = csv.DictReader(f)
    for row in reader:
        code = row["state_abbr"].strip()
        name = row["state_name"].strip()
        states[code] = name


# Process main data file
with open("./output/data_1.csv", "r", encoding="utf-8", newline="") as infile, \
     open("./output/data_2.csv", "w", encoding="utf-8", newline="") as outfile:

    reader = csv.DictReader(infile)
    writer = csv.DictWriter(outfile, fieldnames=reader.fieldnames)

    writer.writeheader()

    missing_states = set()

    for row in reader:
        state_code = row.get("state")

        if state_code:
            state_code = state_code.strip()
            if state_code in states:
                row["state"] = states[state_code]
            else:
                missing_states.add(state_code)

        writer.writerow(row)

    if missing_states:
        print("Missing state codes:", ", ".join(sorted(missing_states)))



#### Date offset

In [3]:
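import csv
from datetime import datetime
from dateutil import parser

# Two defaults that differ in year, month and day. dateutil fills missing
# components from the default, so an incomplete date parses differently with each.
_DEFAULT_A = datetime(2000, 1, 1)
_DEFAULT_B = datetime(2001, 2, 2)

def date_validate(date_str):
    """Validate a date string and convert it to ISO 8601 (YYYY-MM-DD)."""
    try:
        # Ambiguous numeric dates are read as month/day/year (dateutil's default)
        first = parser.parse(date_str, default=_DEFAULT_A)
        second = parser.parse(date_str, default=_DEFAULT_B)
    except (ValueError, TypeError, OverflowError):
        # Not parseable as a date at all
        return "", "Invalid date"

    if first.date() != second.date():
        # Year, month or day was missing, e.g. "June 2018" or "3/06"
        return "", "Invalid date"

    # return date in format YYYY-MM-DD
    return first.strftime("%Y-%m-%d"), "Valid date"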

input_path = "./output/data_2.csv"
output_path = "./output/data_3.csv"

with open(input_path, "r", encoding="utf-8", newline="") as infile, \
     open(output_path, "w", encoding="utf-8", newline="") as outfile:

    reader = csv.DictReader(infile)

    # insert the new column directly after start_date, as the task requires
    fieldnames = list(reader.fieldnames)
    fieldnames.insert(fieldnames.index("start_date") + 1, "start_date_description")
    writer = csv.DictWriter(outfile, fieldnames=fieldnames)

    writer.writeheader()

    for row in reader:
        date_str = row.get("start_date")  # change if column name differs

        formatted_date, description = date_validate(date_str)
        row["start_date"] = formatted_date
        row["start_date_description"] = description

        writer.writerow(row)
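
As a stricter alternative to fuzzy parsing, the sketch below accepts only the two layouts named in the task ("June 23, 1912" and "5/11/1930") via `datetime.strptime`. The format list and the function name `classify_start_date` are assumptions made for illustration; real data may well need more formats. Note that `%B` matches month names in the current locale, so this assumes an English locale.

```python
from datetime import datetime

# Assumed set of accepted layouts; extend it if the data contains others
ACCEPTED_FORMATS = ("%B %d, %Y", "%m/%d/%Y")


def classify_start_date(value):
    """Return (iso_date, description); iso_date is "" when the value is not a full date."""
    text = (value or "").strip()
    if not text:
        return "", "Missing date"
    for fmt in ACCEPTED_FORMATS:
        try:
            return datetime.strptime(text, fmt).strftime("%Y-%m-%d"), "Valid date"
        except ValueError:
            continue  # try the next layout
    return "", "Invalid date"


for sample in ["June 23, 1912", "5/11/1930", "June 2018", "3/06", "sometime last year"]:
    print(repr(sample), "->", classify_start_date(sample))
```

The trade-off is explicitness over coverage: every accepted layout is listed, so an incomplete value such as "June 2018" can never be silently completed with a default day.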


#### Create enriched.csv and enriched.snappy.parquet

In [4]:
!pip3 install -U pandas pyarrow

Defaulting to user installation because normal site-packages is not writeable


In [5]:
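import csv
from datetime import datetime
from dateutil import parser
import pandas as pd


# -------------------- Configuration --------------------

STATE_ABBREVIATIONS_PATH = "./data/state_abbreviations.csv"
INPUT_PATH = "./data/data.csv"
OUTPUT_CSV_PATH = "./output/enriched.csv"
OUTPUT_PARQUET_PATH = "./output/enriched.snappy.parquet"

STATE_COLUMN = "state"
START_DATE_COLUMN = "start_date"
START_DATE_DESC_COLUMN = "start_date_description"

CLEAN_TEXT_COLUMN = "bio"

# -------------------- Helpers --------------------

# Two defaults that differ in year, month and day. dateutil fills missing
# components from the default, so an incomplete date parses differently with each.
_DEFAULT_A = datetime(2000, 1, 1)
_DEFAULT_B = datetime(2001, 2, 2)


def date_validate(date_str):
    """Validate a date string and return ISO 8601 format + description."""
    if not date_str:
        return "", "Missing date"

    try:
        # Ambiguous numeric dates are read as month/day/year (dateutil's default)
        first = parser.parse(date_str, default=_DEFAULT_A)
        second = parser.parse(date_str, default=_DEFAULT_B)
    except (ValueError, TypeError, OverflowError):
        return "", "Invalid date"

    if first.date() != second.date():
        # Year, month or day was missing, e.g. "June 2018" or "3/06"
        return "", "Invalid date"

    return first.strftime("%Y-%m-%d"), "Valid date"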


def load_state_lookup(path):
    """Load state abbreviations into a dictionary."""
    states = {}
    with open(path, encoding="utf-8", newline="") as f:
        reader = csv.DictReader(f)
        for row in reader:
            code = row["state_abbr"].strip()
            name = row["state_name"].strip()
            states[code] = name
    return states


def normalize_whitespace(value):
    """Normalize internal whitespace in a string."""
    return " ".join(value.split())


# -------------------- Main Processing --------------------

def enrich_csv(input_path, output_csv_path, output_parquet_path, state_lookup):
    missing_states = set()
    enriched_rows = []

    with open(input_path, encoding="utf-8", newline="") as infile, \
         open(output_csv_path, "w", encoding="utf-8", newline="") as outfile:

        reader = csv.DictReader(infile)
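        # Place the description column directly after start_date, as the task requires
        fieldnames = list(reader.fieldnames)
        fieldnames.insert(fieldnames.index(START_DATE_COLUMN) + 1, START_DATE_DESC_COLUMN)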
        writer = csv.DictWriter(outfile, fieldnames=fieldnames)
        writer.writeheader()

        for row in reader:
            # Optional string cleaning
            if CLEAN_TEXT_COLUMN and row.get(CLEAN_TEXT_COLUMN):
                row[CLEAN_TEXT_COLUMN] = normalize_whitespace(row[CLEAN_TEXT_COLUMN])

            # State code → full name
            state_code = row.get(STATE_COLUMN)
            if state_code:
                state_code = state_code.strip()
                if state_code in state_lookup:
                    row[STATE_COLUMN] = state_lookup[state_code]
                else:
                    missing_states.add(state_code)

            # Date validation / normalization
            formatted_date, description = date_validate(row.get(START_DATE_COLUMN))
            row[START_DATE_COLUMN] = formatted_date
            row[START_DATE_DESC_COLUMN] = description

            writer.writerow(row)
            enriched_rows.append(row)  # store for parquet


    # Create Parquet with Snappy compression.
    # The standard csv module cannot write Parquet, so the rows are handed to pandas (pyarrow engine).
    if enriched_rows:
        df = pd.DataFrame(enriched_rows, columns=fieldnames)  # same column order as the CSV
        df.to_parquet(output_parquet_path, engine="pyarrow", compression="snappy", index=False)

    if missing_states:
        print("Missing state codes:", ", ".join(sorted(missing_states)))


# -------------------- Entry Point --------------------

def main():
    try:
        state_lookup = load_state_lookup(STATE_ABBREVIATIONS_PATH)
        enrich_csv(INPUT_PATH, OUTPUT_CSV_PATH, OUTPUT_PARQUET_PATH, state_lookup)
        print("Enrichment complete! CSV and Parquet files created.")
    except FileNotFoundError as e:
        print(f"File not found: {e}")
    except Exception as e:
        print(f"Unexpected error: {e}")


if __name__ == "__main__":
    main()


Enrichment complete! CSV and Parquet files created.



#### Using PANDAS

Here’s a chunked pandas version. It applies the same logic as above but is optimized for memory efficiency, which matters for large files:
- Instead of collecting all rows into one list, rows are buffered in chunks of 1,000 and each chunk is written to Parquet as we go.
- Memory usage stays bounded because only one chunk is held in memory at a time.

In [6]:
#!pip install -U pandas pyarrow
import csv
from datetime import datetime
from dateutil import parser
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq

# -------------------- Configuration --------------------

STATE_ABBREVIATIONS_PATH = "./data/state_abbreviations.csv"
INPUT_PATH = "./data/data.csv"
OUTPUT_CSV_PATH = "./output/enriched_1.csv"
OUTPUT_PARQUET_PATH = "./output/enriched_1.snappy.parquet"

STATE_COLUMN = "state"
START_DATE_COLUMN = "start_date"
START_DATE_DESC_COLUMN = "start_date_description"

CLEAN_TEXT_COLUMN = "bio"

CHUNK_SIZE = 1000  # rows buffered before each Parquet write

# -------------------- Helpers --------------------

# Two defaults that differ in year, month and day. dateutil fills missing
# components from the default, so an incomplete date parses differently with each.
_DEFAULT_A = datetime(2000, 1, 1)
_DEFAULT_B = datetime(2001, 2, 2)


def date_validate(date_str):
    """Validate a date string and return ISO 8601 format + description."""
    if not date_str:
        return "", "Missing date"

    try:
        # Ambiguous numeric dates are read as month/day/year (dateutil's default)
        first = parser.parse(date_str, default=_DEFAULT_A)
        second = parser.parse(date_str, default=_DEFAULT_B)
    except (ValueError, TypeError, OverflowError):
        return "", "Invalid date"

    if first.date() != second.date():
        # Year, month or day was missing, e.g. "June 2018" or "3/06"
        return "", "Invalid date"

    return first.strftime("%Y-%m-%d"), "Valid date"


def load_state_lookup(path):
    """Load state abbreviations into a dictionary."""
    states = {}
    with open(path, encoding="utf-8", newline="") as f:
        reader = csv.DictReader(f)
        for row in reader:
            code = row["state_abbr"].strip()
            name = row["state_name"].strip()
            states[code] = name
    return states


def normalize_whitespace(value):
    """Normalize internal whitespace in a string."""
    return " ".join(value.split())


def write_parquet_chunk(parquet_writer, rows, schema):
    """Append one chunk of rows to the open Parquet file as a row group."""
    df_chunk = pd.DataFrame(rows, columns=schema.names)
    table = pa.Table.from_pandas(df_chunk, schema=schema, preserve_index=False)
    parquet_writer.write_table(table)


# -------------------- Main Processing --------------------

def enrich_csv(input_path, output_csv_path, output_parquet_path, state_lookup):
    missing_states = set()

    # Open CSV for reading and writing
    with open(input_path, encoding="utf-8", newline="") as infile, \
         open(output_csv_path, "w", encoding="utf-8", newline="") as outfile:

        reader = csv.DictReader(infile)
        # Place the description column directly after start_date, as the task requires
        fieldnames = list(reader.fieldnames)
        fieldnames.insert(fieldnames.index(START_DATE_COLUMN) + 1, START_DATE_DESC_COLUMN)
        writer = csv.DictWriter(outfile, fieldnames=fieldnames)
        writer.writeheader()

        # Every column is text, so a fixed string schema keeps all chunks compatible
        schema = pa.schema([(name, pa.string()) for name in fieldnames])

        # A single ParquetWriter appends each chunk as a row group. Calling
        # DataFrame.to_parquet once per chunk would overwrite the file each time,
        # leaving only the last chunk in the output.
        with pq.ParquetWriter(output_parquet_path, schema, compression="snappy") as parquet_writer:
            chunk_rows = []

            # Process each row
            for row in reader:
                # Optional string cleaning
                if CLEAN_TEXT_COLUMN and row.get(CLEAN_TEXT_COLUMN):
                    row[CLEAN_TEXT_COLUMN] = normalize_whitespace(row[CLEAN_TEXT_COLUMN])

                # State code → full name
                state_code = row.get(STATE_COLUMN)
                if state_code:
                    state_code = state_code.strip()
                    if state_code in state_lookup:
                        row[STATE_COLUMN] = state_lookup[state_code]
                    else:
                        missing_states.add(state_code)

                # Date validation / normalization
                formatted_date, description = date_validate(row.get(START_DATE_COLUMN))
                row[START_DATE_COLUMN] = formatted_date
                row[START_DATE_DESC_COLUMN] = description

                # Write to CSV
                writer.writerow(row)

                # Collect row for Parquet
                chunk_rows.append(row)

                # Every CHUNK_SIZE rows, append the chunk to the Parquet file
                if len(chunk_rows) >= CHUNK_SIZE:
                    write_parquet_chunk(parquet_writer, chunk_rows, schema)
                    chunk_rows = []  # Clear the list after writing chunk

            # Write any remaining rows to Parquet
            if chunk_rows:
                write_parquet_chunk(parquet_writer, chunk_rows, schema)

    if missing_states:
        print("Missing state codes:", ", ".join(sorted(missing_states)))


# -------------------- Entry Point --------------------

def main():
    try:
        state_lookup = load_state_lookup(STATE_ABBREVIATIONS_PATH)
        enrich_csv(INPUT_PATH, OUTPUT_CSV_PATH, OUTPUT_PARQUET_PATH, state_lookup)
        print("Enrichment complete! CSV and Parquet files created.")
    except FileNotFoundError as e:
        print(f"File not found: {e}")
    except Exception as e:
        print(f"Unexpected error: {e}")


if __name__ == "__main__":
    main()


Enrichment complete! CSV and Parquet files created.


In [7]:
print("enriched_1.snappy.parquet - schema\n")
!parquet-tools schema ./output/enriched_1.snappy.parquet
print("\n\n")

print("enriched_1.snappy.parquet - row count\n")
!parquet-tools row-count ./output/enriched_1.snappy.parquet
print("\n\n")

print("enriched.snappy.parquet - row count\n")
!parquet-tools row-count ./output/enriched.snappy.parquet
print("\n\n")

print("enriched.csv - row count\n")
!cat ./output/enriched.csv | wc -l
print("\n\n")

print("enriched.csv - head -3\n")
!head -3 ./output/enriched.csv
print("\n\n")

print("data.csv - head -3\n")
!head -3 ./data/data.csv
print("\n\n")


enriched_1.snappy.parquet - schema

{"Tag":"name=schema, inname=Schema","Fields":[{"Tag":"name=name, inname=Name, type=BYTE_ARRAY, convertedtype=UTF8, logicaltype=STRING, repetitiontype=OPTIONAL, encoding=RLE_DICTIONARY"},{"Tag":"name=gender, inname=Gender, type=BYTE_ARRAY, convertedtype=UTF8, logicaltype=STRING, repetitiontype=OPTIONAL, encoding=RLE_DICTIONARY"},{"Tag":"name=birthdate, inname=Birthdate, type=BYTE_ARRAY, convertedtype=UTF8, logicaltype=STRING, repetitiontype=OPTIONAL, encoding=RLE_DICTIONARY"},{"Tag":"name=address, inname=Address, type=BYTE_ARRAY, convertedtype=UTF8, logicaltype=STRING, repetitiontype=OPTIONAL, encoding=RLE_DICTIONARY"},{"Tag":"name=city, inname=City, type=BYTE_ARRAY, convertedtype=UTF8, logicaltype=STRING, repetitiontype=OPTIONAL, encoding=RLE_DICTIONARY"},{"Tag":"name=state, inname=State, type=BYTE_ARRAY, convertedtype=UTF8, logicaltype=STRING, repetitiontype=OPTIONAL, encoding=RLE_DICTIONARY"},{"Tag":"name=zipcode, inname=Zipcode, type=BYTE_ARRAY, c

#### Open points for discussion:
1. AWS S3 Security and Access Control

Implement robust IAM roles and policies to enforce least-privilege access to S3 buckets.

Access should be clearly separated by environment, data layer, and user/service responsibilities.

2. Data Partitioning Strategy

Partition data based on ingestion (lake arrival) timestamp, typically at a daily granularity.

Follow Hive-compatible directory conventions, which encode each partition as a `key=value` folder: `<bucket>/<layer>/year=YYYY/month=MM/day=DD/`

This enables efficient query pruning and scalable data organization.
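
A minimal sketch of building such an object key, assuming Hive-style `year=YYYY/month=MM/day=DD` partition folders. The layer and dataset names (`curated`, `enriched`) and the helper `build_partitioned_key` are hypothetical, chosen only for illustration.

```python
from datetime import date


def build_partitioned_key(layer: str, dataset: str, ingestion_date: date, filename: str) -> str:
    """Build an S3 object key with Hive-style year/month/day partition folders."""
    return (
        f"{layer}/{dataset}/"
        f"year={ingestion_date:%Y}/month={ingestion_date:%m}/day={ingestion_date:%d}/"
        f"{filename}"
    )


# Hypothetical layer and dataset names, for illustration only
key = build_partitioned_key("curated", "enriched", date(2024, 3, 5), "enriched.snappy.parquet")
print(key)  # curated/enriched/year=2024/month=03/day=05/enriched.snappy.parquet
```

Zero-padding the month and day keeps the folders in chronological order when listed lexicographically.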

3. Data Layering

Adopt a multi-layer data lake architecture:
* Raw layer: immutable source data as ingested.
* Curated layer: transformed and standardized data.

Curated datasets should retain the same file naming and structural alignment as raw data where feasible to support traceability and simplify debugging and issue resolution.

4. Monitoring and Error Handling

Implement end-to-end monitoring to validate that all expected files are processed successfully.

Define clear error-handling mechanisms for failed processing, including logging, retry strategies, quarantine locations, and alerting.
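
One possible shape for the logging and retry part is sketched below. The helper `run_with_retries`, the logger name and the exponential backoff policy are assumptions for illustration; quarantine and alerting are left to the caller, which sees the exception once all attempts are exhausted.

```python
import logging
import time

logger = logging.getLogger("enrichment")  # hypothetical logger name


def run_with_retries(task, max_attempts=3, base_delay_seconds=1.0, sleep=time.sleep):
    """Call task() until it succeeds or max_attempts is reached, backing off exponentially."""
    for attempt in range(1, max_attempts + 1):
        try:
            return task()
        except Exception as exc:
            logger.warning("Attempt %d/%d failed: %s", attempt, max_attempts, exc)
            if attempt == max_attempts:
                # Out of retries: re-raise so the caller can quarantine the file and alert
                raise
            sleep(base_delay_seconds * 2 ** (attempt - 1))


# Usage with a hypothetical task that fails twice before succeeding
attempts = []


def flaky_task():
    attempts.append(1)
    if len(attempts) < 3:
        raise RuntimeError("transient failure")
    return "processed"


result = run_with_retries(flaky_task, base_delay_seconds=0.01)
print(result, "after", len(attempts), "attempts")
```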

5. Data Reconciliation

Perform reconciliation checks between source and target datasets, including file counts and row-level comparisons, to ensure data completeness and consistency.
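
A minimal sketch of a row-count reconciliation between the input and output CSV files. It counts records with the `csv` module, not physical lines, because quoted fields such as `bio` in the raw file can contain line breaks. The function names are hypothetical.

```python
import csv


def count_data_rows(path):
    """Count CSV records excluding the header; a quoted multi-line field counts as one record."""
    with open(path, encoding="utf-8", newline="") as f:
        reader = csv.reader(f)
        next(reader, None)  # skip the header row
        return sum(1 for _ in reader)


def reconcile_row_counts(source_path, target_path):
    """Return (source_rows, target_rows, counts_match) for two CSV files."""
    source_rows = count_data_rows(source_path)
    target_rows = count_data_rows(target_path)
    return source_rows, target_rows, source_rows == target_rows
```

For this exercise the call would look like `reconcile_row_counts("./data/data.csv", "./output/enriched.csv")`. A mismatch would indicate rows dropped or duplicated during enrichment.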

6. Processing Models: Batch vs. Streaming

Select processing approaches based on latency, scalability, and operational requirements:
* Scheduled batch processing via notebooks or jobs (e.g., Athena or SageMaker), triggered using cron-based schedules.
* Time-based container execution using ECS tasks on AWS Fargate (serverless compute) or equivalent platforms such as Google Cloud Run to avoid Kubernetes overhead.
* Event-driven processing using AWS Lambda triggered by S3 object creation events (via EventBridge) to support near-real-time ingestion.
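
A minimal sketch of the event-driven option, assuming the Lambda function receives the S3 "Object Created" event through EventBridge, where the bucket name and object key sit under `detail.bucket.name` and `detail.object.key`. The `raw/` prefix filter and the returned dictionary are hypothetical, and the actual enrichment call is left as a placeholder.

```python
RAW_PREFIX = "raw/"  # hypothetical prefix of the raw layer


def lambda_handler(event, context):
    """Extract the new object's location from the event and decide whether to process it."""
    detail = event.get("detail", {})
    bucket = detail.get("bucket", {}).get("name")
    key = detail.get("object", {}).get("key")

    if not bucket or not key:
        return {"status": "ignored", "reason": "event has no bucket/key"}
    if not key.startswith(RAW_PREFIX) or not key.endswith(".csv"):
        return {"status": "ignored", "reason": "not a raw CSV object"}

    # Placeholder: download the object, run the enrichment, upload the outputs
    return {"status": "accepted", "bucket": bucket, "key": key}


# Hypothetical event, trimmed to the fields the handler reads
sample_event = {
    "detail-type": "Object Created",
    "source": "aws.s3",
    "detail": {
        "bucket": {"name": "example-data-lake"},
        "object": {"key": "raw/people/data.csv"},
    },
}
print(lambda_handler(sample_event, None))
```

Filtering on the prefix matters if the curated outputs land in the same bucket; otherwise each upload could trigger the function again.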

7. Consider using PySpark instead of pandas.
8. Evaluate a more suitable file format than CSV for the stored outputs, most likely Parquet.
