# Data Engineer - Technical Assessment

In this section of the interview at Beyond Finance, you will be assessed on your ability to perform several Data Engineering tasks. To perform well, you will need to demonstrate competence in the following areas:

* preprocessing data to prepare for a database load
* understanding entity relationships in a database
* merging data from different tables
* filtering data to relevant subsets
* calculating aggregations and descriptive statistics

It will be difficult to complete all questions in the allotted time. Your goal is not to speed through the answers but to come up with answers that demonstrate your knowledge. Your thought process and logic matter more than getting the right answer or writing polished code.


## Getting Started

This exercise is broken into 2 parts:
1. Data Processing
2. Data Wrangling

### Data Processing
In this section you will take files from the ./raw_data/ subfolders, combine them into a single newline-delimited `json.gz` file per subfolder, and place that file in a ./processed_data/ directory. You may have to do some light investigation into the data files to understand their file formats and delimiters.

**Example**

Files
- ./raw_data/tracks/tracks_0.csv
- ./raw_data/tracks/tracks_1.json
- ./raw_data/tracks/tracks_2.csv
- etc... 

should be combined into a single file ./processed_data/tracks.json.gz
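As a point of reference, a newline-delimited `json.gz` file is one JSON object per line inside a gzip stream, so it can be written and read back incrementally with the standard library alone. A minimal sketch, using a temporary directory and made-up track records (the fields are illustrative, not taken from the real files):

```python
import gzip
import json
import tempfile
from pathlib import Path

# hypothetical sample records; real rows come from ./raw_data/<subfolder>/
records = [
    {"TrackId": 1, "Name": "Track A"},
    {"TrackId": 2, "Name": "Track B"},
    {"TrackId": 3, "Name": "Track C"},
]

out_dir = Path(tempfile.mkdtemp())
out_path = out_dir / "tracks.json.gz"

# write: one JSON object per line, text mode on top of the gzip stream
with gzip.open(out_path, "wt", encoding="utf-8") as gz:
    for rec in records:
        gz.write(json.dumps(rec) + "\n")

# read back line by line, so only one record is held in memory at a time
row_count = 0
with gzip.open(out_path, "rt", encoding="utf-8") as gz:
    for line in gz:
        rec = json.loads(line)
        row_count += 1

print(row_count)  # 3
```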

**What we look for**

- Can you handle all subfolders in a single pass over the raw data files?
    
    Yes, I designed the script so that each sub-folder is touched exactly once and every raw file inside it is streamed only once.

    1. `run_all()` loops over the dataset folders a single time. Inside `process()`, the files in each folder are sorted and read one after another; the CSV reader uses `chunksize` and the JSON reader goes line by line.

    2. While reading, I write each record directly to the gzip output (or append to it when resuming from a checkpoint), so there is no second pass.

- What if the file sizes are in GigaBytes? Can your code (if run on a standard laptop) load the files without going out of memory? (hint `chunksize`)

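    Yes. I read CSVs in chunks and JSON files line by line, writing each record straight to the gzip output, so only one chunk (or one line) of raw data sits in memory at a time. The one exception is the in-memory set of row hashes used for duplicate detection, which grows with the number of rows hashed. Apart from that, multi-GB source files should run on a standard laptop without exhausting RAM.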

- Can you identify edge cases? What scenarios could break your code?

    For simplicity, the data is currently written as-is. Without stronger checks, the pipeline could break (or pass bad data downstream) in the following scenarios:
    
    1. Duplicate data:
     In the current data I observed that the contents of entire files are duplicated. Since the real inputs could be GBs/TBs, I only flag the first exact duplicate via a whole-row SHA-1 hash and keep loading; removing duplicates in Python would cost more RAM/CPU than it saves. If the files were small I'd dedupe in memory, but for GB-scale loads I assume true deduplication happens later in the warehouse (e.g. silver-layer processing).

    2. Schema drift:
     Whenever a column name first appears, I log it and pass the data through unchanged, so nothing is lost (the table simply grows horizontally when new columns appear). This can be handled in downstream layers with schema merging (e.g. `mergeSchema`) or column renames.

    3. Invalid data-types:
     A price that arrives as a string, or a date in a different format, is written as-is. The assumption is that stronger type checks happen in the curated layers.

    4. Malformed JSON:
     Any line that fails `json.loads` is skipped and logged. This keeps the load alive for now, but the skipped records are effectively lost, so they still need proper handling.

    5. Mid-run failures:
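     Data lands in `*.json.gz.part`, and a checkpoint records the last raw file that was completed. If the job stops for any reason, the partial file and the checkpoint remain, so we can either rerun (resuming after the last completed file) or delete both and reload from scratch. The final `*.json.gz` is only created after every file has been processed, so partial output is never mistaken for finished data. One caveat: rows from a file that was interrupted mid-way may already be in the `.part` file, so resuming writes them again; deleting and reloading avoids that.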


- Please directly respond to the above questions in your submission.
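The whole-row SHA-1 used for duplicate detection only works if logically identical rows serialize identically, which is why keys are sorted before hashing. A small sketch of that idea, reusing the notebook's `row_hash` logic on made-up rows:

```python
import json
from hashlib import sha1

def row_hash(rec):
    # canonical form: sorted keys, so key order in the source file does not matter
    s = json.dumps(rec, sort_keys=True)
    return sha1(s.encode()).digest()

# hypothetical rows: the second is the first with its keys in a different order
rows = [
    {"OrderId": 10, "TrackId": 3},
    {"TrackId": 3, "OrderId": 10},
    {"OrderId": 11, "TrackId": 3},
]

seen = set()
first_dup_index = None
for i, rec in enumerate(rows):
    h = row_hash(rec)
    if h in seen:
        first_dup_index = i  # flag the first exact duplicate and stop hashing
        break
    seen.add(h)

print(first_dup_index)  # 1
```

This only catches exact duplicates: a value read as the integer `1` from one file and as the string `"1"` from another serializes differently, so type drift between sources can hide duplicates.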

### Data Wrangling
For this section, we'll pretend you loaded the raw data plus additional tables into a small SQLite database containing roughly a dozen tables. **We've provided this database for you, so don't worry about loading it yourself.** If you are not familiar with SQLite, it uses a fairly complete and standard SQL syntax, though it does not have many advanced analytics functions. Consider it just a remote datastore for storing and retrieving data.

![](db-diagram.png)

## Data Processing

In [2]:
import pandas as pd 

#!pip install memory_profiler
%load_ext memory_profiler

In [None]:
from pathlib import Path
import json, gzip, logging, os, pandas as pd
from hashlib import sha1
from datetime import datetime

RAW_DIR = Path("raw_data")
OUT_DIR = Path("processed_data")
LOG_DIR = Path("logs")

OUT_DIR.mkdir(exist_ok=True)
LOG_DIR.mkdir(exist_ok=True)

log_path = LOG_DIR / f"run_{datetime.now():%Y%m%d_%H%M%S}.log"
root = logging.getLogger()
root.setLevel(logging.INFO)
root.handlers.clear()
fh = logging.FileHandler(log_path, mode="w", encoding="utf-8")
fh.setFormatter(logging.Formatter("%(asctime)s [%(levelname)s] %(message)s"))
root.addHandler(fh)
root.info("job started")

# utility functions
def clean_cols(rec):
    # keep only the last dotted segment of each key, e.g. "track.name" -> "name";
    # keys that share a suffix (such as "a.id" and "b.id") collide and the last one wins
    out = {}
    for k, v in rec.items():
        out[k.split(".")[-1]] = v
    return out

def row_hash(rec):
    # SHA-1 of a key-sorted JSON dump, used to spot exact duplicate rows
    s = json.dumps(rec, sort_keys=True)
    return sha1(s.encode()).digest()

def csv_rows(path):
    # stream the CSV in 50k-row chunks so a multi-GB file is never fully in memory
    for chunk in pd.read_csv(path, chunksize=50_000):
        for r in chunk.to_dict(orient="records"):
            # missing cells come back as NaN, which json.dumps would write as the
            # non-standard token NaN; map them to None so they serialize as null
            r = {k: (None if pd.isna(v) else v) for k, v in r.items()}
            yield clean_cols(r)

def json_rows(path, stats):
    # stream newline-delimited JSON one line at a time
    with path.open(encoding="utf-8") as src_file:
        for line in src_file:
            line = line.strip()
            if not line:
                continue
            try:
                r = json.loads(line)
            except json.JSONDecodeError:
                r = None
            if not isinstance(r, dict):
                # unparseable lines and non-object values are counted, logged and skipped
                stats["bad_json"] += 1
                logging.warning("bad json in %s", path)
                continue
            yield clean_cols(r)

def load_ckpt(cp):
    # checkpoint file: line 1 = last fully processed raw file, line 2 = rows written so far
    if cp.exists():
        with cp.open() as f:
            name = f.readline().strip()
            rows = int(f.readline().strip() or 0)
            return name, rows
    return None, 0

def save_ckpt(cp, filename, rows):
    with cp.open("w") as f:
        f.write(f"{filename}\n{rows}\n")

# process data
def process(ds):
    folder = RAW_DIR / ds
    if not folder.is_dir():
        print("missing:", ds)
        return

    ckpt = OUT_DIR / f"{ds}.ckpt"
    last_done, rows_done = load_ckpt(ckpt)

    part = OUT_DIR / f"{ds}.json.gz.part"
    final = OUT_DIR / f"{ds}.json.gz"
    mode = "at" if rows_done else "wt"  # append when resuming from a checkpoint

    stats = {"rows": rows_done, "bad_json": 0, "dup_seen": False}
    cols_seen, hashes = set(), set()
    gz = None  # defined before try so the except block can always close it

    try:
        gz = gzip.open(part, mode, encoding="utf-8")

        for src in sorted(folder.iterdir()):
            # skip raw files already completed in a previous run
            if last_done and src.name <= last_done:
                continue

            ext = src.suffix.lower()
            if ext == ".csv":
                reader = csv_rows(src)
            elif ext == ".json":
                reader = json_rows(src, stats)
            else:
                logging.info("skipped %s", src)
                continue

            for rec in reader:
                # hash rows only until the first exact duplicate is found, then drop the set
                if not stats["dup_seen"]:
                    h = row_hash(rec)
                    if h in hashes:
                        stats["dup_seen"] = True
                        hashes.clear()
                        logging.info("duplicate rows detected in %s", ds)
                    else:
                        hashes.add(h)

                # track columns here so the first appearance of each new column is logged
                new_cols = [col for col in rec if col not in cols_seen]
                if new_cols:
                    cols_seen.update(new_cols)
                    logging.info("new columns in %s: %s", ds, ", ".join(new_cols))

                gz.write(json.dumps(rec) + "\n")
                stats["rows"] += 1

            save_ckpt(ckpt, src.name, stats["rows"])

        gz.close()
        os.replace(part, final)  # publish the final file only after every raw file is done
        ckpt.unlink(missing_ok=True)
        print(f"{ds}: rows={stats['rows']}  bad_json={stats['bad_json']}  "
              f"dup_seen={'yes' if stats['dup_seen'] else 'no'}  columns={len(cols_seen)}")

    except Exception as e:
        if gz is not None:
            gz.close()
        logging.exception("pipeline stopped in %s: %s", ds, e)
        print("error processing", ds, "-", e)

def run_all():
    for d in ("orders", "playlist_track", "track_facts", "tracks"):
        process(d)
    logging.shutdown()
    print("log file:", log_path)


In [28]:
%%memit
run_all()

orders: rows=1120000  bad_json=0  dup_seen=yes  columns=6
playlist_track: rows=4357500  bad_json=0  dup_seen=yes  columns=2
track_facts: rows=1751500  bad_json=0  dup_seen=yes  columns=3
tracks: rows=1751500  bad_json=0  dup_seen=yes  columns=9
log file: logs/run_20250426_160855.log
peak memory: 190.25 MiB, increment: 96.33 MiB


In [29]:
!pip install ipython-sql

Collecting ipython-sql
  Downloading ipython_sql-0.5.0-py3-none-any.whl.metadata (17 kB)
Collecting prettytable (from ipython-sql)
  Downloading prettytable-3.16.0-py3-none-any.whl.metadata (33 kB)
Collecting sqlalchemy>=2.0 (from ipython-sql)
  Downloading sqlalchemy-2.0.40-cp310-cp310-macosx_11_0_arm64.whl.metadata (9.6 kB)
Collecting sqlparse (from ipython-sql)
  Downloading sqlparse-0.5.3-py3-none-any.whl.metadata (3.9 kB)
Collecting ipython-genutils (from ipython-sql)
  Downloading ipython_genutils-0.2.0-py2.py3-none-any.whl.metadata (755 bytes)
Downloading ipython_sql-0.5.0-py3-none-any.whl (20 kB)
Downloading sqlalchemy-2.0.40-cp310-cp310-macosx_11_0_arm64.whl (2.1 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m2.1/2.1 MB[0m [31m24.3 MB/s[0m eta [36m0:00:00[0ma [36m0:00:01[0m
[?25hDownloading ipython_genutils-0.2.0-py2.py3-none-any.whl (26 kB)
Downloading prettytable-3.16.0-py3-none-any.whl (33 kB)
Downloading sqlparse-0.5.3-py3-none-any.whl (44 kB)
[

## Data Wrangling

In [30]:
%load_ext sql 
%sql sqlite:///db/sqlite/chinook.db

In [32]:
import sqlite3

con = sqlite3.connect("db/sqlite/chinook.db")

In [33]:
cursor = con.cursor()

In [35]:
cursor.execute("SELECT name FROM sqlite_master WHERE type='table';")
tables = cursor.fetchall()

print("List of tables:", [table for table in tables])

List of tables: [('albums',), ('sqlite_sequence',), ('artists',), ('customers',), ('employees',), ('genres',), ('invoices',), ('invoice_items',), ('media_types',), ('playlists',), ('playlist_track',), ('tracks',), ('sqlite_stat1',)]
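Since the assessment also looks at entity relationships, the foreign keys declared in a SQLite schema can be listed with `PRAGMA foreign_key_list`. A minimal sketch against a hypothetical in-memory mini-schema (the names mimic the diagram, but the schema is made up, and it uses `demo_con` so it does not overwrite the notebook's `con`). The pragma only reports relationships that were declared with `REFERENCES`:

```python
import sqlite3

# hypothetical mini-schema; not the real chinook.db
demo_con = sqlite3.connect(":memory:")
demo_con.executescript("""
    CREATE TABLE artists (ArtistId INTEGER PRIMARY KEY, Name TEXT);
    CREATE TABLE albums (
        AlbumId INTEGER PRIMARY KEY,
        Title TEXT,
        ArtistId INTEGER REFERENCES artists (ArtistId)
    );
    CREATE TABLE tracks (
        TrackId INTEGER PRIMARY KEY,
        Name TEXT,
        AlbumId INTEGER REFERENCES albums (AlbumId)
    );
""")

demo_tables = [row[0] for row in demo_con.execute(
    "SELECT name FROM sqlite_master WHERE type = 'table' ORDER BY name")]

# PRAGMA foreign_key_list rows: (id, seq, table, from, to, on_update, on_delete, match)
fk_edges = []
for table in demo_tables:
    for fk in demo_con.execute(f"PRAGMA foreign_key_list({table})"):
        fk_edges.append((table, fk[3], fk[2], fk[4]))

for child, col, parent, parent_col in fk_edges:
    print(f"{child}.{col} -> {parent}.{parent_col}")
```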


In [39]:
df = pd.read_sql("SELECT * FROM customers LIMIT 5;", con)
print(df)

   CustomerId  FirstName     LastName  \
0           1       Luís    Gonçalves   
1           2     Leonie       Köhler   
2           3   François     Tremblay   
3           4      Bjørn       Hansen   
4           5  František  Wichterlová   

                                            Company  \
0  Embraer - Empresa Brasileira de Aeronáutica S.A.   
1                                              None   
2                                              None   
3                                              None   
4                                  JetBrains s.r.o.   

                           Address                 City State         Country  \
0  Av. Brigadeiro Faria Lima, 2170  São José dos Campos    SP          Brazil   
1          Theodor-Heuss-Straße 34            Stuttgart  None         Germany   
2                1498 rue Bélanger             Montréal    QC          Canada   
3                 Ullevålsveien 14                 Oslo  None          Norway   
4                

### 1. How many different customers are there?

In [42]:
cursor.execute("SELECT COUNT(DISTINCT CustomerId) FROM customers")
count_row = cursor.fetchone()[0]
print("Distinct customers:", count_row)

Distinct customers: 59


### 2. How long is the longest track in minutes?

In [52]:
query = """
    SELECT Name, Milliseconds, ROUND(Milliseconds /(1000*60.0), 2) AS length_minutes
    FROM tracks
    ORDER BY 2 DESC
    LIMIT 5;
"""
df = pd.read_sql(query, con)
print(df)

                          Name  Milliseconds  length_minutes
0       Occupation / Precipice       5286953           88.12
1      Through a Looking Glass       5088838           84.81
2  Greetings from Earth, Pt. 1       2960293           49.34
3      The Man With Nine Lives       2956998           49.28
4  Battlestar Galactica, Pt. 2       2956081           49.27


In [55]:
cursor.execute(
    """
    SELECT Name, ROUND(Milliseconds /(1000*60.0), 2) AS length_minutes
    FROM tracks
    ORDER BY 2 DESC
    LIMIT 1;
    """
)

max_minutes = cursor.fetchone()
print("Longest track length (minutes):", max_minutes)

Longest track length (minutes): ('Occupation / Precipice', 88.12)
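`ORDER BY ... LIMIT 1` silently keeps one row when several tracks tie. For this question only the length matters, so a tie would not change the answer, but when the track name is reported too, comparing against `MAX()` returns every tied row. A sketch on a hypothetical toy table (`demo_con` and the data are made up):

```python
import sqlite3

# hypothetical toy table with two tracks tied for the longest duration
demo_con = sqlite3.connect(":memory:")
demo_con.executescript("""
    CREATE TABLE tracks (TrackId INTEGER PRIMARY KEY, Name TEXT, Milliseconds INTEGER);
    INSERT INTO tracks (Name, Milliseconds) VALUES
        ('Short', 180000), ('Long A', 300000), ('Long B', 300000);
""")

# every track whose length equals the maximum, not just the first one sorted
longest = demo_con.execute("""
    SELECT Name, ROUND(Milliseconds / (1000 * 60.0), 2) AS length_minutes
    FROM tracks
    WHERE Milliseconds = (SELECT MAX(Milliseconds) FROM tracks)
    ORDER BY Name
""").fetchall()

print(longest)  # [('Long A', 5.0), ('Long B', 5.0)]
```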


### 3. Which genre has the shortest average track length?

In [58]:
query = """
    select g.Name, ROUND(AVG(t.Milliseconds) / (1000*60.0), 2) AS avg_length_minutes
    from tracks t, genres g
    where t.GenreId = g.GenreId
    group by 1
    order by 2
    limit 5;
"""
df = pd.read_sql(query, con)
print(df)

             Name  avg_length_minutes
0   Rock And Roll                2.24
1           Opera                2.91
2     Hip Hop/Rap                2.97
3  Easy Listening                3.15
4      Bossa Nova                3.66


In [59]:
cursor.execute(
    """
    select g.Name, ROUND(AVG(t.Milliseconds) / (1000*60.0), 2) AS avg_length_minutes
    from tracks t, genres g
    where t.GenreId = g.GenreId
    group by 1
    order by 2
    limit 1;
    """
)

shortest_avg = cursor.fetchone()
print("genre with shortest avg length", shortest_avg)

genre with shortest avg length ('Rock And Roll', 2.24)


### 4. Which artist shows up in the most playlists?

In [70]:
query = """
select distinct PlaylistId from playlist_track
    limit 5;
"""
df = pd.read_sql(query, con)
print(df)

   PlaylistId
0           1
1           3
2           5
3           8
4           9


In [71]:
query = """
    select a.Name, count(distinct pt.PlaylistId)
    from artists a, albums al, tracks t, playlist_track pt
    where a.ArtistId = al.ArtistId
    and al.AlbumId = t.AlbumId
    and t.TrackId = pt.TrackId
    group by 1
    order by 2 desc
    limit 5;
"""
df = pd.read_sql(query, con)
print(df)

                                                Name  \
0                                     Eugene Ormandy   
1                                 The King's Singers   
2                   English Concert & Trevor Pinnock   
3      Berliner Philharmoniker & Herbert Von Karajan   
4  Academy of St. Martin in the Fields & Sir Nevi...   

   count(distinct pt.PlaylistId)  
0                              7  
1                              6  
2                              6  
3                              6  
4                              6  


In [73]:
cursor.execute(
    """
    select a.Name, count(distinct pt.PlaylistId)
    from artists a, albums al, tracks t, playlist_track pt
    where a.ArtistId = al.ArtistId
    and al.AlbumId = t.AlbumId
    and t.TrackId = pt.TrackId
    group by 1
    order by 2 desc
    limit 1;
    """
)

artist = cursor.fetchone()
print("artist in most playlists", artist)

artist in most playlists ('Eugene Ormandy', 7)
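`LIMIT 1` returns a single row even when several artists share the top count. In this data the top five show Eugene Ormandy alone at 7, so the answer stands, but a ranking query makes ties visible. A sketch on hypothetical toy tables (`demo_con` and all data are made up), assuming SQLite 3.25 or newer for window functions:

```python
import sqlite3

# hypothetical toy data: "Artist One" and "Artist Two" are tied at 2 playlists each
demo_con = sqlite3.connect(":memory:")
demo_con.executescript("""
    CREATE TABLE artists (ArtistId INTEGER PRIMARY KEY, Name TEXT);
    CREATE TABLE albums (AlbumId INTEGER PRIMARY KEY, ArtistId INTEGER);
    CREATE TABLE tracks (TrackId INTEGER PRIMARY KEY, AlbumId INTEGER);
    CREATE TABLE playlist_track (PlaylistId INTEGER, TrackId INTEGER);
    INSERT INTO artists VALUES (1, 'Artist One'), (2, 'Artist Two'), (3, 'Artist Three');
    INSERT INTO albums VALUES (10, 1), (20, 2), (30, 3);
    INSERT INTO tracks VALUES (100, 10), (101, 10), (200, 20), (300, 30);
    INSERT INTO playlist_track VALUES (1, 100), (1, 101), (2, 101), (3, 200), (4, 200), (3, 300);
""")

# RANK() gives tied artists the same rank, so none of them is dropped
top_artists = demo_con.execute("""
    WITH counts AS (
        SELECT a.ArtistId, a.Name, COUNT(DISTINCT pt.PlaylistId) AS playlist_count
        FROM artists a
        JOIN albums al ON al.ArtistId = a.ArtistId
        JOIN tracks t ON t.AlbumId = al.AlbumId
        JOIN playlist_track pt ON pt.TrackId = t.TrackId
        GROUP BY a.ArtistId, a.Name
    ),
    ranked AS (
        SELECT Name, playlist_count,
               RANK() OVER (ORDER BY playlist_count DESC) AS rnk
        FROM counts
    )
    SELECT Name, playlist_count FROM ranked WHERE rnk = 1 ORDER BY Name
""").fetchall()

print(top_artists)  # [('Artist One', 2), ('Artist Two', 2)]
```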


### 5. What album had the most purchases?

In [76]:
query = """
    select al.Title, i.InvoiceLineid, i.InvoiceId, i.UnitPrice, i.Quantity
    from albums al, tracks t, invoice_items i
    where al.AlbumId = t.AlbumId
    and t.TrackId = i.TrackId
    order by 1,2 
    limit 5;
"""
df = pd.read_sql(query, con)
print(df)

                    Title  InvoiceLineId  InvoiceId  UnitPrice  Quantity
0  ...And Justice For All            317         60       0.99         1
1  ...And Justice For All            890        165       0.99         1
2  ...And Justice For All            891        165       0.99         1
3  ...And Justice For All           1464        270       0.99         1
4  ...And Justice For All           1465        270       0.99         1


In [77]:
cursor.execute(
    """
    select al.Title, count(distinct i.InvoiceLineid)
    from albums al, tracks t, invoice_items i
    where al.AlbumId = t.AlbumId
    and t.TrackId = i.TrackId
    group by al.AlbumId, al.Title  -- group by id so albums sharing a title stay separate
    order by 2 desc
    limit 1;
    """
)

album = cursor.fetchone()
print("album with most purchases", album)

album with most purchases ('Minha Historia', 27)
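If albums are grouped by `Title` alone, different albums that share a title are merged into one group, which can change the winner. A sketch with hypothetical toy tables (two different albums both titled 'Greatest Hits'; `demo_con`, `top_album` and all data are made up) comparing the two groupings:

```python
import sqlite3

# hypothetical toy data: albums 1 and 2 are different albums with the same title
demo_con = sqlite3.connect(":memory:")
demo_con.executescript("""
    CREATE TABLE albums (AlbumId INTEGER PRIMARY KEY, Title TEXT);
    CREATE TABLE tracks (TrackId INTEGER PRIMARY KEY, AlbumId INTEGER);
    CREATE TABLE invoice_items (InvoiceLineId INTEGER PRIMARY KEY, TrackId INTEGER);
    INSERT INTO albums VALUES (1, 'Greatest Hits'), (2, 'Greatest Hits'), (3, 'Solo Album');
    INSERT INTO tracks VALUES (10, 1), (20, 2), (30, 3);
    INSERT INTO invoice_items (TrackId) VALUES (10), (10), (20), (20), (30), (30), (30);
""")

def top_album(group_by):
    # group_by is only ever one of the two fixed strings used below
    return demo_con.execute(f"""
        SELECT al.Title, COUNT(ii.InvoiceLineId) AS purchases
        FROM albums al
        JOIN tracks t ON t.AlbumId = al.AlbumId
        JOIN invoice_items ii ON ii.TrackId = t.TrackId
        GROUP BY {group_by}
        ORDER BY purchases DESC
        LIMIT 1
    """).fetchone()

print(top_album("al.Title"))              # ('Greatest Hits', 4): two albums merged
print(top_album("al.AlbumId, al.Title"))  # ('Solo Album', 3)
```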


### 6. Which customer has the highest number of sales in terms of dollars?

In [80]:
cursor.execute(
    """
    select c.FirstName || ' ' || c.LastName as customer_name
    from customers c, invoices i
    where c.CustomerId = i.CustomerId
    group by c.CustomerId  -- group by id so customers with the same name are not merged
    order by sum(i.Total) desc
    limit 1;
    """
)

cust = cursor.fetchone()[0]
print("customer with highest sales: ", cust)

customer with highest sales:  Helena Holý
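SQLite only added the `concat()` function in version 3.44.0; the `||` operator builds the same string on older builds too. A sketch on hypothetical toy tables that also returns the total and groups by `CustomerId`, so two customers with the same name stay separate (`demo_con` and all data are made up):

```python
import sqlite3

# hypothetical toy data
demo_con = sqlite3.connect(":memory:")
demo_con.executescript("""
    CREATE TABLE customers (CustomerId INTEGER PRIMARY KEY, FirstName TEXT, LastName TEXT);
    CREATE TABLE invoices (InvoiceId INTEGER PRIMARY KEY, CustomerId INTEGER, Total REAL);
    INSERT INTO customers VALUES (1, 'Ana', 'Silva'), (2, 'Bo', 'Lee');
    INSERT INTO invoices (CustomerId, Total) VALUES (1, 10.5), (1, 5.0), (2, 12.0);
""")

top_customer = demo_con.execute("""
    SELECT c.FirstName || ' ' || c.LastName AS customer_name,
           ROUND(SUM(i.Total), 2) AS total_spent
    FROM customers c
    JOIN invoices i ON i.CustomerId = c.CustomerId
    GROUP BY c.CustomerId
    ORDER BY total_spent DESC
    LIMIT 1
""").fetchone()

print(top_customer)  # ('Ana Silva', 15.5)
```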


### 7. Count of customers who have dollar sales more than $40?

In [81]:
cursor.execute(
    """
    with base as (select customerId, sum(Total) from invoices
    group by 1
    having sum(Total) > 40)
    select count(customerId) from base
    """
)

greaterthan40 = cursor.fetchone()[0]
print("customers with sales > $40:", greaterthan40)

customers with sales > $40: 14
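`HAVING SUM(Total) > 40` is a strict comparison, so a customer who spent exactly $40.00 is not counted. A sketch on a hypothetical toy `invoices` table (`demo_con` and the data are made up) showing the same count written as a subquery instead of a CTE:

```python
import sqlite3

# hypothetical toy data: customer 1 totals exactly 40.00, customers 2 and 3 exceed it
demo_con = sqlite3.connect(":memory:")
demo_con.executescript("""
    CREATE TABLE invoices (InvoiceId INTEGER PRIMARY KEY, CustomerId INTEGER, Total REAL);
    INSERT INTO invoices (CustomerId, Total) VALUES
        (1, 20.0), (1, 20.0), (2, 40.01), (3, 25.0), (3, 25.0);
""")

over_40 = demo_con.execute("""
    SELECT COUNT(*) FROM (
        SELECT CustomerId
        FROM invoices
        GROUP BY CustomerId
        HAVING SUM(Total) > 40
    )
""").fetchone()[0]

print(over_40)  # 2
```

Because `Total` is a floating-point value here, sums that land very close to the threshold can carry rounding error; comparing `ROUND(SUM(Total), 2)` instead avoids that.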
