# Data Engineer - Technical Assessment

In this section of the interview at Beyond Finance, you will be assessed on your ability to perform several Data Engineering tasks. To perform well, you will need to demonstrate competence in the following areas:

* preprocessing data to prepare for a database load
* understanding entity relationships in a database
* merging data from different tables
* filtering data to relevant subsets
* calculating aggregations and descriptive statistics

It will be pretty difficult to complete all questions in the allotted time. Your goal is not to speed through the answers, but to come up with answers that demonstrate your knowledge. It is more about your thought process and logic than about getting the right answer or writing perfect code.


## Getting Started

This exercise is broken into two parts:
1. Data Processing
2. Data Wrangling

### Data Processing
In this section you will take files from the ./raw_data/ subfolders, combine them into a single newline-delimited `json.gz` file per subfolder, and place that file in a ./processed_data/ directory. You may have to do some light investigation into the data files to understand their file formats and delimiters.

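Because the raw CSV files may not all use the same delimiter, one option is to sniff it before parsing. The sketch below uses the standard library's `csv.Sniffer`; the helper name `detect_delimiter`, the candidate delimiters, and the sample size are assumptions for illustration, not part of the original solution.

```python
import csv
from pathlib import Path


def detect_delimiter(file_path, candidates=",;|\t", sample_chars=64 * 1024):
    """Guess a CSV file's delimiter from the first `sample_chars` characters.

    Only a small sample is read, so this stays cheap even for very large files.
    `detect_delimiter` is a hypothetical helper, not part of the notebook code.
    """
    with open(Path(file_path), newline="", encoding="utf-8") as f:
        sample = f.read(sample_chars)
    try:
        return csv.Sniffer().sniff(sample, delimiters=candidates).delimiter
    except csv.Error:
        # The sniffer could not decide; fall back to the CSV default.
        return ","
```

The result could be passed as `delimiter=` to `csv.DictReader` or as `sep=` to `pd.read_csv`. Sniffing is a heuristic, so an unusual file can still be misdetected.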
**Example**

Files
- ./raw_data/tracks/tracks_0.csv
- ./raw_data/tracks/tracks_1.json
- ./raw_data/tracks/tracks_2.csv
- etc... 

should be combined into a single file ./processed_data/tracks.json.gz
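To sanity-check a combined file such as `tracks.json.gz`, it can be streamed back one line at a time without loading it whole. This is a minimal sketch; `iter_ndjson_gz` and `count_ndjson_gz` are hypothetical helper names.

```python
import gzip
import json


def iter_ndjson_gz(path):
    """Stream records from a gzip-compressed, newline-delimited JSON file."""
    with gzip.open(path, "rt", encoding="utf-8") as f:
        for line in f:
            if line.strip():  # tolerate blank lines
                yield json.loads(line)


def count_ndjson_gz(path):
    """Count records while holding only one line in memory at a time."""
    return sum(1 for _ in iter_ndjson_gz(path))
```

Comparing `count_ndjson_gz(PROCESSED_ZONE / "tracks.json.gz")` with the row counts of the raw files is a cheap completeness check.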

**What we look for**

- Can you handle all subfolders in a single pass over the raw data files? 
  - [Murali Parimi] My process handles all subfolders in a single pass, one subfolder at a time.
- What if the file sizes are in gigabytes? Can your code (if run on a standard laptop) load the files without running out of memory? (hint `chunksize`)
  - [Murali Parimi] I use generators, so records are loaded into memory one row at a time. I have also provided a pandas version that reads CSVs with `chunksize`, selected with `main("pandas")`.
     - Both versions may run out of memory if a JSON file arrives as a single array of JSON objects, because `json.load` reads the whole array at once.
- Can you identify edge cases? What scenarios could break your code?
  - [Murali Parimi]
    - There is no deduplication logic, so duplicate records may bloat the output.
    - There are no data quality checks to ensure that dollar amounts make sense, names are really names, dates are dates, etc. This will not break the code, though.
    - If files in a different format (or compressed files) land, they will either cause failures or, at best, be logged and skipped.
    - A single line too large to fit in memory would still fail, even though we read one line at a time.

- Please directly respond to the above questions in your submission.
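The first edge case (no deduplication) could be handled with a streaming filter. This sketch assumes "duplicate" means an identical record; `dedupe_records` is a hypothetical helper. Only fixed-size digests are kept, but that set still grows with the number of unique records, so it reduces memory per record rather than bounding it.

```python
import hashlib
import json


def dedupe_records(records):
    """Yield each record the first time it is seen, dropping exact duplicates.

    Records are compared via a SHA-256 digest of their JSON form with sorted
    keys, so key order does not matter.
    """
    seen = set()
    for record in records:
        key = json.dumps(record, sort_keys=True, default=str)
        digest = hashlib.sha256(key.encode("utf-8")).digest()
        if digest not in seen:
            seen.add(digest)
            yield record
```

It could wrap the existing reader, for example `for record in dedupe_records(read_records(file)):`. Note that the same row read from a CSV (all values are strings) and from a JSON file (typed values) will not compare equal.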

### Data Wrangling
For this section, we'll pretend you loaded the raw data plus additional tables into a small SQLite database containing roughly a dozen tables. **We've provided this database for you so don't worry about loading it yourself**. If you are not familiar with SQLite, it uses a fairly complete and standard SQL syntax, though it lacks many advanced analytics functions. Treat it simply as a remote datastore for storing and retrieving data.

![](db-diagram.png)

## Data Processing

In [None]:
import pandas as pd

# %pip installs into the environment of the running kernel (!pip may not)
%pip install memory_profiler
%pip install ijson
%pip install ipython-sql
%load_ext memory_profiler

### Python File Processor

In [None]:

import os
import json
import gzip
import csv
import pandas as pd
from pathlib import Path
from datetime import datetime

LANDING_ZONE = Path("./raw_data")
PROCESSED_ZONE = Path("./processed_data")
PROCESSED_ZONE.mkdir(exist_ok=True)
LOG_ZONE = Path("./logs")
LOG_ZONE.mkdir(exist_ok=True)  # the log file is appended to later, so the folder must exist

LOG_FILE = LOG_ZONE / f"unsupported_files_{datetime.now().strftime('%Y%m%d_%H%M%S')}.log"

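def read_pd_csv(file_path, chunksize=10000):
    # Read the CSV in chunks so only `chunksize` rows are held in memory at a time
    for chunk in pd.read_csv(file_path, chunksize=chunksize):
        # Missing values arrive as NaN, which json.dump writes as invalid JSON;
        # cast to object first so they can be replaced with None (written as null)
        chunk = chunk.astype(object).where(chunk.notna(), None)
        yield from chunk.to_dict(orient="records")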

def read_pd_json(file_path):
    try:
        with open(file_path, "r", encoding="utf-8") as f:
            first_char = f.read(1)
            f.seek(0)
            if first_char == "[":
                data = json.load(f)
                for row in data:
                    yield row
            else:
                for line in f:
                    if line.strip():
                        try:
                            yield json.loads(line)
                        except json.JSONDecodeError:
                            continue
    except Exception as e:
        print(f"Error reading {file_path}: {e}")

def pd_process_folder(subfolder_path):
    output_path = PROCESSED_ZONE / f"{subfolder_path.name}.json.gz"
    with gzip.open(output_path, "wt", encoding="utf-8") as out_file:
        for file in subfolder_path.iterdir():
            if not file.is_file():
                continue

            ext = file.suffix.lower()
            if ext == ".csv":
                for record in read_pd_csv(file):
                    json.dump(record, out_file)
                    out_file.write("\n")
            elif ext == ".json":
                for record in read_pd_json(file):
                    json.dump(record, out_file)
                    out_file.write("\n")
            else:
                print(f"Unsupported file format: {file}")

def read_csv(file_path):
    with open(file_path, newline='', encoding='utf-8') as f:
        reader = csv.DictReader(f)
        for row in reader:
            yield row

def read_json(file_path):
    with open(file_path, 'r', encoding='utf-8') as f:
        first_char = f.read(1)
        f.seek(0)
        if first_char == '[':
            # Entire list-style JSON
            data = json.load(f)
            for item in data:
                yield item
        else:
            for line in f:
                line = line.strip()
                if line:
                    try:
                        yield json.loads(line)
                    except json.JSONDecodeError:
                        continue

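def read_with_ijson(file_path):
    """
    Stream records from a JSON file whose top level is an array, one item at a time.
    """
    import ijson
    # ijson works best on binary file objects; use_float=True returns floats
    # instead of Decimal, which json.dump cannot serialize
    with open(file_path, 'rb') as f:
        for record in ijson.items(f, 'item', use_float=True):
            yield record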

def read_records(file_path):
    ext = file_path.suffix.lower()
    if ext == '.csv':
        yield from read_csv(file_path)
    elif ext == '.json':
        yield from read_json(file_path)
    else:
        # Unsupported formats are skipped but recorded, one entry per line
        with open(LOG_FILE, "a") as log_file:
            log_file.write(f"[SKIPPED] unsupported file format: {file_path}\n")


def process_folder(subfolder):
    output_path = PROCESSED_ZONE / f"{subfolder.name}.json.gz"
    with gzip.open(output_path, 'wt', encoding='utf-8') as out_file:
        for file in subfolder.iterdir():
            # No suffix filter here: read_records logs and skips unsupported formats
            if file.is_file():
                for record in read_records(file):
                    json.dump(record, out_file)
                    out_file.write('\n')

def main(option):
    for folder in LANDING_ZONE.iterdir():
        if folder.is_dir():
            if option == 'pandas':
                print(f"processing with pandas: {folder.name}")
                pd_process_folder(folder)
            else:
                print(f"processing with python generators: {folder.name}")
                process_folder(folder)

In [None]:
%%memit
main("pandas")

## Data Wrangling

In [None]:
%pip install prettytable==2.5.0

In [None]:
import prettytable
prettytable.DEFAULT_STYLE = prettytable.PLAIN_COLUMNS

In [None]:
%load_ext sql 
%sql sqlite:///db/sqlite/chinook.db

In [None]:
import sqlite3

con = sqlite3.connect("db/sqlite/chinook.db")
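The `con` connection above is not used by the `%%sql` cells, but it can run the same queries from Python. A minimal sketch, assuming pandas is available (it is imported earlier in the notebook); `query_df` is a hypothetical helper name. `pd.read_sql_query` accepts a `sqlite3` connection directly, and `?` placeholders keep parameter values out of the SQL text.

```python
import sqlite3

import pandas as pd


def query_df(con: sqlite3.Connection, sql: str, params=()) -> pd.DataFrame:
    """Run a query on an open sqlite3 connection and return a DataFrame.

    Values in `params` are bound to '?' placeholders by sqlite3 rather than
    formatted into the SQL string.
    """
    return pd.read_sql_query(sql, con, params=params)


# Usage against a throwaway in-memory database with made-up rows
# (named demo_con so it does not overwrite the notebook's `con`).
demo_con = sqlite3.connect(":memory:")
demo_con.execute("create table customers (customerid integer primary key, country text)")
demo_con.executemany(
    "insert into customers (country) values (?)",
    [("USA",), ("USA",), ("Brazil",)],
)
usa = query_df(demo_con, "select count(*) as n from customers where country = ?", ("USA",))
```

In the notebook this would be, for example, `query_df(con, "select count(distinct customerid) as n from customers")`.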

### 1. How many different customers are there? 
**Answer** 
- 59

In [None]:
%%sql
select count(distinct customerid) from customers

### 2. How long is the longest track in minutes?

**Answer**
- 88 minutes

In [None]:
%%sql
select round(max(Milliseconds) / 60000.0) as max_minutes from tracks

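SQLite performs integer division when both operands are integers, so `max(Milliseconds) / (1000 * 60)` is truncated before `round()` ever sees it. This standalone sketch uses an in-memory database and a made-up value of 150000 ms to show the difference.

```python
import sqlite3

demo_con = sqlite3.connect(":memory:")

# Integer / integer truncates toward zero in SQLite.
int_minutes = demo_con.execute("select 150000 / (1000 * 60)").fetchone()[0]

# Making one operand REAL keeps the fractional minutes.
real_minutes = demo_con.execute("select 150000 / 60000.0").fetchone()[0]

print(int_minutes, real_minutes)  # 2 2.5
```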
### 3. Which genre has the shortest average track length?

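**Answer**
- Sci Fi & Fantasy (about 49 minutes on average) is the genre with the *longest* average track length, which is what the original descending sort returned. Sorting ascending, as below, returns the genre with the shortest average.

In [None]:
%%sql
select g.name, round(avg(t.milliseconds) / (1000 * 60), 2) as avg_minutes
from tracks t join genres g on t.genreid = g.genreid
group by g.genreid, g.name
order by avg(t.milliseconds) asc
limit 1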

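`ORDER BY ... DESC LIMIT 1` returns the maximum, not the minimum. A quick way to check both ends is a pandas groupby; the rows below are made up for illustration, and in the notebook they could instead be loaded from `con` with `pd.read_sql_query`.

```python
import pandas as pd

# Made-up sample rows standing in for tracks joined to genres.
tracks = pd.DataFrame({
    "genre": ["Rock", "Rock", "Jazz", "Drama"],
    "milliseconds": [240000, 300000, 180000, 2900000],
})

# Average track length per genre, in minutes (true division, no truncation).
avg_minutes = tracks.groupby("genre")["milliseconds"].mean() / 60000

shortest = avg_minutes.idxmin()  # what ORDER BY avg ASC LIMIT 1 returns
longest = avg_minutes.idxmax()   # what ORDER BY avg DESC LIMIT 1 returns
print(shortest, longest)  # Jazz Drama
```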
### 4. Which artist shows up in the most playlists?

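**Answer**
- Grouping by `composer` (the first attempt) returned NULL ("None/NA"), with tracks in 12 playlists. A composer is not an artist, and NULL only means the composer is missing.
- Additional information: NULL is also the most frequent composer value (2262 tracks), followed by "Steve Harris" with 193 tracks.
- Artists are linked to tracks through `albums.artistid`, so the query below joins `tracks`, `albums`, and `artists` and counts distinct playlists per artist.

In [None]:
%%sql
select ar.name, count(distinct p.playlistid) as num_playlists
from playlist_track p
join tracks t on p.trackid = t.trackid
join albums al on t.albumid = al.albumid
join artists ar on al.artistid = ar.artistid
group by ar.artistid, ar.name
order by num_playlists desc
limit 1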

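Grouping by a nullable column such as `composer` collapses every NULL into a single group, which is how a missing composer can top the ranking. This in-memory sketch uses made-up rows to show the effect and the `IS NOT NULL` filter.

```python
import sqlite3

demo_con = sqlite3.connect(":memory:")
demo_con.execute("create table tracks (trackid integer primary key, composer text)")
demo_con.executemany(
    "insert into tracks (composer) values (?)",
    [(None,), (None,), (None,), ("Composer A",), ("Composer A",), ("Composer B",)],
)

# GROUP BY treats all NULL composers as one group, so "no composer" can win.
top = demo_con.execute(
    "select composer, count(*) as n from tracks "
    "group by composer order by n desc limit 1"
).fetchone()

# Filtering out NULLs ranks only real composer values.
top_named = demo_con.execute(
    "select composer, count(*) as n from tracks where composer is not null "
    "group by composer order by n desc limit 1"
).fetchone()

print(top, top_named)  # (None, 3) ('Composer A', 2)
```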



### 5. What album had the most purchases?

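**Answer**

- The figures first reported ("Greatest Hits", $232.81 by dollar amount and 16 different invoices) came from summing the full `invoices.total` of every invoice containing a track from the album, grouped by album title. That also counts the other tracks on those invoices, and album titles are not guaranteed to be unique, so albums sharing a title would be merged.
- The query below sums only the album's own line items (`unitprice * quantity`), groups by `albumid`, and reports both dollar sales and the number of distinct invoices.

In [None]:
%%sql
select a.albumid, a.title,
       round(sum(ii.unitprice * ii.quantity), 2) as total_sales,
       count(distinct ii.invoiceid) as num_invoices
from invoice_items ii
join tracks t on ii.trackid = t.trackid
join albums a on t.albumid = a.albumid
group by a.albumid, a.title
order by total_sales desc
limit 1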

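Joining line items to invoice totals fans out: each album on an invoice is credited with the whole invoice total. The sketch below uses a simplified, made-up schema (here `invoice_items` carries `albumid` directly, whereas in the Chinook schema it is reached through `tracks`) to compare the two approaches.

```python
import sqlite3

demo_con = sqlite3.connect(":memory:")
demo_con.executescript("""
create table invoices (invoiceid integer primary key, total real);
create table invoice_items (invoiceid integer, albumid integer,
                            unitprice real, quantity integer);
-- One hypothetical invoice: two tracks from album 1, one from album 2.
insert into invoices values (1, 2.97);
insert into invoice_items values (1, 1, 0.99, 1), (1, 1, 0.99, 1), (1, 2, 0.99, 1);
""")

# Summing invoice totals credits each album with the entire invoice.
by_invoice_total = demo_con.execute("""
    select ii.albumid, sum(i.total)
    from (select distinct invoiceid, albumid from invoice_items) ii
    join invoices i on ii.invoiceid = i.invoiceid
    group by ii.albumid order by ii.albumid
""").fetchall()

# Summing line items credits each album with only its own tracks.
by_line_items = demo_con.execute("""
    select albumid, round(sum(unitprice * quantity), 2)
    from invoice_items group by albumid order by albumid
""").fetchall()
```

Album 1 is credited with the full invoice in the first query but only with its two tracks in the second.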





### 6. Which customer has the highest total sales in dollars?

**Answer**
- Helena Holý

In [None]:
%%sql
select c.firstname || ' ' || c.lastname as name, round(sum(i.total), 2) as total_sales
from invoices i join customers c on i.customerid = c.customerid
group by c.customerid
order by total_sales desc
limit 1

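`LIMIT 1` returns a single row even when two customers tie for the top total. If ties matter, comparing against the maximum keeps all of them; this is a sketch on made-up rows using a CTE (`WITH`), which SQLite supports.

```python
import sqlite3

demo_con = sqlite3.connect(":memory:")
demo_con.executescript("""
create table invoices (invoiceid integer primary key, customerid integer, total real);
insert into invoices (customerid, total) values (1, 10.0), (1, 5.0), (2, 15.0), (3, 4.0);
""")

# Keep every customer whose total equals the maximum total.
top_customers = demo_con.execute("""
    with totals as (
        select customerid, sum(total) as total_sales
        from invoices group by customerid
    )
    select customerid, total_sales from totals
    where total_sales = (select max(total_sales) from totals)
    order by customerid
""").fetchall()

print(top_customers)  # [(1, 15.0), (2, 15.0)]
```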
### 7. How many customers have total sales of more than $40?

**Answer**

- 14 customers

In [None]:
%%sql
select count(*) from (select customerid, sum(total) total_dollars from invoices group by customerid having total_dollars > 40) a
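The same threshold count can be cross-checked in pandas. The rows below are made up; in the notebook they could be loaded from `con` with `pd.read_sql_query`. Note that `> 40` excludes a customer whose total is exactly $40, matching the `HAVING` clause above.

```python
import pandas as pd

# Made-up invoice rows: customer 1 totals 45, customer 2 totals 39, customer 3 totals 40.
invoices = pd.DataFrame({
    "customerid": [1, 1, 2, 3],
    "total": [25.0, 20.0, 39.0, 40.0],
})

totals = invoices.groupby("customerid")["total"].sum()
# Strictly greater than $40, like HAVING total_dollars > 40.
num_over_40 = int((totals > 40).sum())
print(num_over_40)  # 1
```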