# Data Engineer - Technical Assessment

In this section of the interview at Beyond Finance, you will be assessed on your ability to perform several data engineering tasks. To do well, you will need to demonstrate competence in the following areas:

* preprocessing data to prepare for a database load
* understanding entity relationships in a database
* merging data from different tables
* filtering data to relevant subsets
* calculating aggregations and descriptive statistics

It will be difficult to complete every question in the allotted time. Your goal is not to speed through the answers, but to give answers that demonstrate your knowledge. Your thought process and logic matter more than getting the right answer or the code itself.


## Getting Started

This exercise is broken into 2 parts:
1. Data Processing
2. Data Wrangling

### Data Processing
In this section you will take files from the ./raw_data/ subfolders, combine them into a single newline-delimited `json.gz` file per subfolder, and place that file in a ./processed_data/ directory. You may have to do some light investigation into the data files to understand their file formats and delimiters.

**Example**

Files
- ./raw_data/tracks/tracks_0.csv
- ./raw_data/tracks/tracks_1.json
- ./raw_data/tracks/tracks_2.csv
- etc.

should be combined into a single file ./processed_data/tracks.json.gz

**What we look for**

- Can you handle all subfolders in a single pass over the raw data files?
- How can you limit memory consumption? (hint: `chunksize`)
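
The `chunksize` hint can be illustrated with a minimal sketch. The CSV text and its column names below are hypothetical stand-ins for a raw data file; the point is only that `pd.read_csv` with `chunksize` set yields small DataFrames one at a time instead of loading the whole file.

```python
from io import StringIO

import pandas as pd

# Hypothetical five-row CSV held in memory; it stands in for a raw data file.
csv_text = (
    "TrackId,Name,Milliseconds\n"
    "1,a,1000\n"
    "2,b,2000\n"
    "3,c,3000\n"
    "4,d,4000\n"
    "5,e,5000\n"
)

# With chunksize set, read_csv returns an iterator of DataFrames,
# so only one chunk of rows is held in memory at a time.
chunk_lengths = []
for chunk in pd.read_csv(StringIO(csv_text), chunksize=2):
    chunk_lengths.append(len(chunk))

print(chunk_lengths)
```

Peak memory then depends on the chunk size rather than on the file size, at the cost of more iterations.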

### Data Wrangling
For this section, we'll pretend you loaded the raw data plus additional tables into a small SQLite database containing roughly a dozen tables. **We've provided this database for you, so don't worry about loading it yourself**. If you are not familiar with SQLite, it uses a fairly complete and standard SQL syntax, though it does not support many advanced analytics functions. Consider it just a remote datastore for storing and retrieving data.
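
One way to get oriented in an unfamiliar SQLite file is to list its tables from the built-in `sqlite_master` catalog. The sketch below runs against an in-memory database with two hypothetical tables rather than the provided file, so the table definitions are assumptions made for illustration.

```python
import sqlite3

# In-memory database with two hypothetical tables; it stands in for the provided file.
con = sqlite3.connect(":memory:")
con.execute("CREATE TABLE artists (ArtistId INTEGER PRIMARY KEY, Name TEXT)")
con.execute(
    "CREATE TABLE albums (AlbumId INTEGER PRIMARY KEY, Title TEXT, ArtistId INTEGER)"
)

# sqlite_master is SQLite's schema catalog; each row describes one schema object.
table_names = [
    row[0]
    for row in con.execute(
        "SELECT name FROM sqlite_master WHERE type = 'table' ORDER BY name"
    )
]
print(table_names)
con.close()
```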

![](db-diagram.png)

## Data Processing (40 minutes)

In [1]:
!pip install memory_profiler
import pandas as pd 
import os
import shutil
from io import StringIO
import gzip


%load_ext memory_profiler





In [2]:
%%memit

# Some setup stuff to ensure idempotence of each run
outdir = "processed_data"
    
if os.path.exists(outdir):
    shutil.rmtree(outdir)

os.makedirs(outdir)


for root, dirs, files in os.walk("./raw_data"):
    if dirs:
        # The first iteration is the root dir, which holds only the subdirectories; skip it.
        continue
    
    outfile_name = os.path.join(outdir, f"{os.path.basename(root)}.json.gz")
    print(f"Writing aggregated results out to {outfile_name}")
    
    # Open the gzip file once in append-bytes mode and keep writing bytes to it.
    # Compression happens as the bytes are written to the file, so gzip
    # compresses one continuous stream rather than each chunk in isolation,
    # which should help the compression ratio.
    with gzip.open(outfile_name, "ab") as of:
        for f in files:
            print(f"Processing file {f}")
            _, extension = os.path.splitext(f)
            if extension == '.json':
                # Read the file as an iterator of DataFrames, using the chunksize option to read 1000 lines at a time.
                # Tried 10000, 5000, 1000, and 500.
                # 1000 seems to be the sweet spot, usually having a
                # max usage of 95MiB. Oddly, it did better than 500, which was also slower.
                df_iterator = pd.read_json(os.path.join(root, f), lines=True, chunksize=1000)
                for df in df_iterator:
                    # Instantiate a brand new buffer each loop. Using the same buffer and truncating each time
                    # led to very slow iteration.
                    buffer = StringIO()
                    df.to_json(buffer, orient="records", lines=True)

                    # Seek to the beginning of the buffer to prepare to read
                    buffer.seek(0)

                    # Write this chunk out to the gzip file
                    of.write(buffer.read().encode())

            elif extension == '.csv':
                # Same chunked approach as above, applied to the CSV inputs
                df_iterator = pd.read_csv(os.path.join(root, f), chunksize=1000)
                for df in df_iterator:
                    buffer = StringIO()
                    df.to_json(buffer, orient="records", lines=True)

                    buffer.seek(0)
                    of.write(buffer.read().encode())
                    


Writing aggregated results out to processed_data/tracks.json.gz
Processing file tracks_1.json
Processing file tracks_0.csv
Processing file tracks_2.csv
Processing file tracks_4.csv
Processing file tracks_3.json
Writing aggregated results out to processed_data/playlist_track.json.gz
Processing file playlist_track_0.csv
Processing file playlist_track_2.csv
Processing file playlist_track_4.csv
Processing file playlist_track_1.json
Processing file playlist_track_3.json
Writing aggregated results out to processed_data/orders.json.gz
Processing file orders_3.json
Processing file orders_2.csv
Processing file orders_0.csv
Processing file orders_4.csv
Processing file orders_1.json
Writing aggregated results out to processed_data/track_facts.json.gz
Processing file track_facts_0.csv
Processing file track_facts_2.csv
Processing file track_facts_3.json
Processing file track_facts_4.csv
Processing file track_facts_1.json
peak memory: 95.29 MiB, increment: 19.59 MiB


In [3]:
# Test to ensure data was properly written to gzipped json files
for root, _, files in os.walk("./processed_data"):
    for f in files:
        pd.read_json(os.path.join(root, f), orient="records", lines=True, compression="gzip")
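
The check above only confirms that each file parses. A stricter variant would also compare the number of records written with the number read back. The following is a minimal sketch of that idea using only the standard library and an in-memory buffer; the records are hypothetical and do not come from the raw data.

```python
import gzip
import io
import json

# Hypothetical records standing in for two chunks of one processed file.
chunks = [[{"TrackId": 1}, {"TrackId": 2}], [{"TrackId": 3}]]

# Write every chunk into a single gzip stream, one JSON document per line.
raw = io.BytesIO()
with gzip.GzipFile(fileobj=raw, mode="wb") as gz:
    for chunk in chunks:
        payload = "".join(json.dumps(record) + "\n" for record in chunk)
        gz.write(payload.encode())

# Read the stream back line by line and parse each line as JSON.
raw.seek(0)
with gzip.open(raw, "rt") as gz:
    records = [json.loads(line) for line in gz]

written = sum(len(chunk) for chunk in chunks)
print(written, len(records))
```

If the two counts differ, a chunk boundary has probably merged or dropped a line.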

## Data Wrangling (20 minutes)

In [4]:
!pip install ipython-sql
%load_ext sql 
%sql sqlite:///db/sqlite/chinook.db



In [5]:
import sqlite3

con = sqlite3.connect("db/sqlite/chinook.db")
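
A `sqlite3` connection like this one can also be queried directly from Python, without the `%%sql` magic. The sketch below assumes an in-memory database and a hypothetical three-row `customers` table, so that it runs without the provided file.

```python
import sqlite3

# In-memory stand-in for the provided database; the rows are hypothetical.
con = sqlite3.connect(":memory:")
con.execute("CREATE TABLE customers (CustomerId INTEGER PRIMARY KEY, FirstName TEXT)")
con.executemany(
    "INSERT INTO customers (FirstName) VALUES (?)",
    [("Ana",), ("Ben",), ("Cy",)],
)

# Placeholders (?) keep values out of the SQL string itself.
row = con.execute(
    "SELECT COUNT(*) FROM customers WHERE CustomerId <= ?", (2,)
).fetchone()
count = row[0]
print(count)
con.close()
```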

### 1. How many different customers are there?

In [6]:
%%sql
-- CustomerId is primary key so no need for distinct
SELECT COUNT(CustomerId) FROM customers;

 * sqlite:///db/sqlite/chinook.db
Done.


COUNT(CustomerId)
59


### 2. How long is the longest track in minutes?

In [7]:
%%sql
SELECT (CAST(MAX(Milliseconds) AS DOUBLE PRECISION) / 1000) / 60 AS minutes FROM tracks;

 * sqlite:///db/sqlite/chinook.db
Done.


minutes
88.11588333333334
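
The cast in the query above matters because SQLite performs integer division when both operands are integers. A small sketch with a hypothetical duration of 90000 ms shows the difference:

```python
import sqlite3

con = sqlite3.connect(":memory:")
ms = 90000  # hypothetical duration in milliseconds, i.e. 1.5 minutes

# Without a cast both operands are integers, so the result is truncated.
# Casting to REAL first keeps the fractional part.
int_minutes, real_minutes = con.execute(
    "SELECT (? / 1000) / 60, (CAST(? AS REAL) / 1000) / 60",
    (ms, ms),
).fetchone()

print(int_minutes, real_minutes)
con.close()
```

Dividing by a floating-point literal such as `60000.0` would be another way to force real-valued division.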


### 3. Which genre has the shortest average track length?

In [8]:
%%sql
-- Select the top record of the list of genres sorted by their average track length
SELECT g.name
FROM tracks t
JOIN genres g
ON t.GenreId = g.GenreId
GROUP BY t.GenreId
ORDER BY AVG(Milliseconds) ASC
LIMIT 1;

 * sqlite:///db/sqlite/chinook.db
Done.


Name
Rock And Roll


### 4. Which artist shows up in the most playlists?

In [9]:
%%sql

-- Join playlists to artists, group by artist name, order by the number of distinct playlists, and take the top row

SELECT ar.Name, COUNT(DISTINCT p.PlaylistId) AS playlists
FROM playlists p
JOIN playlist_track pt
    ON p.PlaylistId = pt.PlaylistId
JOIN tracks t
    ON t.TrackId = pt.TrackId
JOIN albums ab
    ON ab.AlbumId = t.AlbumId
JOIN artists ar
    ON ar.ArtistId = ab.ArtistId
GROUP BY ar.Name
ORDER BY COUNT(DISTINCT p.PlaylistId) DESC
LIMIT 1;

 * sqlite:///db/sqlite/chinook.db
Done.


Name,playlists
Eugene Ormandy,7
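
The `DISTINCT` inside the count is what makes this answer count playlists rather than joined rows: an artist with several tracks in the same playlist would otherwise be counted once per track. A minimal sketch on hypothetical toy tables (simplified so that the artist name sits directly on the track) shows the two counts diverging:

```python
import sqlite3

con = sqlite3.connect(":memory:")
# Hypothetical toy data: artist A has two tracks in playlist 1,
# artist B has one track that appears in playlists 2 and 3.
con.executescript(
    """
    CREATE TABLE tracks (TrackId INTEGER PRIMARY KEY, Artist TEXT);
    CREATE TABLE playlist_track (PlaylistId INTEGER, TrackId INTEGER);
    INSERT INTO tracks VALUES (1, 'A'), (2, 'A'), (3, 'B');
    INSERT INTO playlist_track VALUES (1, 1), (1, 2), (2, 3), (3, 3);
    """
)

rows = con.execute(
    """
    SELECT t.Artist,
           COUNT(pt.PlaylistId) AS rows_joined,
           COUNT(DISTINCT pt.PlaylistId) AS playlists
    FROM playlist_track pt
    JOIN tracks t ON t.TrackId = pt.TrackId
    GROUP BY t.Artist
    ORDER BY t.Artist
    """
).fetchall()

print(rows)
con.close()
```

Artist A produces two joined rows but appears in only one playlist, which is the case the plain count would get wrong.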


### 5. What was the most popular album among these customers?

In [10]:
%%sql

-- Join invoices to albums, group by album title, and order by the number of invoice lines for each album. Take the top row.

SELECT a.Title, COUNT(DISTINCT ii.InvoiceLineId) AS tracks_from_album_sold
FROM invoices i
JOIN invoice_items ii
    ON i.InvoiceId = ii.InvoiceId
JOIN tracks t
    ON ii.TrackId = t.TrackId
JOIN albums a
    ON a.AlbumId = t.AlbumId
GROUP BY a.Title
ORDER BY COUNT(DISTINCT ii.InvoiceLineId) DESC
LIMIT 1;

 * sqlite:///db/sqlite/chinook.db
Done.


Title,tracks_from_album_sold
Minha Historia,27
