### Case Study

Sample Kafka message:
```json
{
  "country": "PH",
  "app_version": "113.0.0",
  "city": "PH_MNL",
  "$os": "Android",
  "language": "en_ph",
  "$wifi": "true",
  "$network_type": "WIFI",
  "$ip": "111.222.333.444",
  "$screen_height": "1600",
  "$referrer_title": "Lalamove",
  "trigger_time": "2025-04-11T06:27:38Z",
  "user_type": "personal_user",
  "$device_id": "123412341234",
  "platform_type": "Android",
  "aaid": "11111111-2222-3333-4444-f0f355df5b6b",
  "$province": "马尼拉都会区",
  "$app_id": "hk.xxxxxx.app.client",
  "$lib_method": "code",
  "$os_version": "14",
  "$is_first_day": "false",
  "$city": "帕西市",
  "$model": "SM-A047F",
  "$screen_width": "720",
  "global_source": "android",
  "data_center": "SIN",
  "$brand": "SAMSUNG",
  "env": "prd",
  "$app_version": "113.0.0",
  "$lib": "Android",
  "arg3": "onLocationChanged",
  "arg2": "8",
  "$is_channel_callback_event": "false",
  "$country": "菲律宾",
  "$app_name": "Lalamove",
  "arg1": "b_poi_select",
  "$lib_version": "6.3.5.5",
  "$timezone_offset": "-480",
  "sdkVer": "guapp-1.0.1.39",
  "$manufacturer": "SAMSUNG",
  "country_id": "50000",
  "$is_login_id": "true",
  "city_id": "51001",
  "$track_signup_original_id": "0755bbcc"
}
```

The payload above was rewritten from a real Kafka message in our production environment; PII-related information has been anonymized. Based on the content of this payload, please do your best to answer the following questions:

1. What is the purpose of this Kafka message, and how might it be triggered and captured?
2. From your understanding, are any important attributes missing?
3. Do you have any suggestions to enrich the attributes or improve the schema design of this message?
4. Select one popular database product (MySQL, PostgreSQL, Hive, etc.), describe its methods for processing JSON strings, and explain the pros and cons of the different solutions.
5. Assume you are a data engineer and the analytics team asks you to preprocess this Kafka topic for standardization. Design a flexible and efficient schema for this request.
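
As a starting point for the preprocessing discussion in question 5, here is a minimal Python sketch of one possible normalization step. It is not a reference answer. It assumes that `$`-prefixed keys are SDK preset properties and that the keys listed in `BOOL_KEYS` and `INT_KEYS` (both hypothetical names) should be cast from strings; neither assumption is confirmed by the payload itself. The `raw` string is a trimmed copy of the sample message.

```python
import json
from datetime import datetime, timezone

# Trimmed copy of the sample payload above; only a few keys are kept for brevity.
raw = """{
  "country": "PH",
  "$wifi": "true",
  "$screen_height": "1600",
  "trigger_time": "2025-04-11T06:27:38Z",
  "$timezone_offset": "-480",
  "arg1": "b_poi_select"
}"""

# Hypothetical casting rules: which keys are booleans or integers is an assumption.
BOOL_KEYS = {"$wifi", "$is_first_day", "$is_login_id"}
INT_KEYS = {"$screen_height", "$screen_width", "$timezone_offset"}


def normalize(message: str) -> dict:
    """Cast string-encoded values and split keys into preset vs. custom buckets."""
    event = json.loads(message)
    out = {"preset": {}, "custom": {}}
    for key, value in event.items():
        if key in BOOL_KEYS:
            value = value == "true"
        elif key in INT_KEYS:
            value = int(value)
        elif key == "trigger_time":
            # The trailing "Z" marks UTC, so attach the UTC timezone explicitly.
            value = datetime.strptime(value, "%Y-%m-%dT%H:%M:%SZ").replace(tzinfo=timezone.utc)
        # Assumption: a leading "$" marks an SDK preset property.
        bucket = "preset" if key.startswith("$") else "custom"
        out[bucket][key.lstrip("$")] = value
    return out


record = normalize(raw)
```

Splitting the keys this way keeps a small, typed set of common columns stable while the free-form `arg1`..`arg3` style fields can stay in a semi-structured column; whether that trade-off fits depends on how the analytics team queries the topic.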

### SQL
#### Config

In [1]:
import pandas as pd
import duckdb
con = duckdb.connect(':memory:')

tables = [
    'day_summary', ## pk: ['month','market_id','day_type']
    'driver_wallet_transaction', ## pk: ['txn_id']
    'market', ## pk: ['market_id']
    'order_info', ## pk: ['order_id']
    'user_summary', ## pk: ['month', 'market_id']
]

for t in tables:
    # Load each CSV file into an in-memory table of the same name
    con.execute(f'''
        CREATE TABLE IF NOT EXISTS {t} AS
        SELECT * FROM read_csv_auto('data/{t}.csv');
    ''')

#### Tables

In [2]:
con.sql('select * from driver_wallet_transaction').fetchdf()
# txn_id: auto-increment id (PK)
# txn_type: ORDER/CASHOUT/OTHER_REWARD
# driver_id: identifier of the driver
# txn_tms: timestamp of the transaction
# txn_amt_usd: transaction amount in USD
# balance_before: wallet balance before the transaction
# balance_after: wallet balance after the transaction
# related_order_id: related order id when txn_type == 'ORDER'; may be duplicated
# market_id: market where the driver is located

Unnamed: 0,txn_id,txn_type,driver_id,txn_tms,txn_amt_usd,balance_before,balance_after,related_order_id,market_id
0,1,ORDER,123,2025-01-01 00:12:00,10,0,10,345.0,1
1,2,ORDER,123,2025-01-01 00:12:00,20,10,30,345.0,1
2,3,CASHOUT,123,2025-01-01 00:24:00,-30,30,0,,1
3,4,ORDER,123,2025-01-01 00:35:00,40,0,40,200.0,1
4,5,ORDER,123,2025-01-01 11:12:00,10,40,50,300.0,1
5,6,ORDER,123,2025-02-01 00:12:00,20,50,70,400.0,1
6,7,ORDER,123,2025-03-01 00:12:00,30,70,100,500.0,1
7,8,ORDER,234,2025-02-01 00:12:00,20,0,20,600.0,1
8,9,ORDER,234,2025-02-01 00:12:00,30,20,50,700.0,1
9,10,ORDER,234,2025-02-02 00:12:00,40,50,90,800.0,1


In [3]:
con.sql('select * from order_info').fetchdf()
## item: 3 possible values - FOOD/DOCUMENT/GROCERY only
## Every order in order_info has at least 1 related record
    # in the driver_wallet_transaction table

Unnamed: 0,order_id,gmv_usd,item
0,345,1000,FOOD
1,200,1000,FOOD
2,300,1000,DOCUMENT
3,400,1000,FOOD
4,500,1000,DOCUMENT
5,600,1000,GROCERY
6,700,1000,GROCERY
7,800,1000,GROCERY
8,900,1000,GROCERY
9,1000,1200,GROCERY


In [4]:
con.sql('select * from day_summary').fetchdf()
# month: yyyy-mm-01 (may include future data for forecasting purposes)
# day_type: WORKING_DAY/PUBLIC_HOLIDAY

Unnamed: 0,month,market_id,day_type,day_cnt
0,2025-01-01,1,WORKING_DAY,20
1,2025-01-01,1,PUBLIC_HOLIDAY,10


In [5]:
con.sql('select * from market').fetchdf()

Unnamed: 0,market_id,market_code
0,1,MY
1,2,SG
2,3,PH


In [6]:
con.sql('select * from user_summary').fetchdf()
## summarised online users (APP+WEB)

Unnamed: 0,month,market_id,active_online_user
0,2025-01-01,1,312305
1,2025-02-01,1,2139993
2,2025-01-01,2,29128
3,2025-02-01,2,1239


####  Questions
1. **The CSV data provided are just sample data.**
 * Please assume the transactional data you are handling is at **million-level** scale, covering **10+ markets** and **10+ years**.
 
2. Solve the questions with DuckDB queries. Doc: https://duckdb.org/
 * We evaluate your code on correctness, readability, and maintainability.

3. We do not expect perfect answers. Try your best.
 * If a question cannot be answered or does not make sense, please explain why, and feel free to adjust the requirements or state your assumptions.

##### Q1. Create a table `driver_periodic_summary` (PK: time_period_type, time_period, driver_id) with the following attributes in SQL.

* Summarise the driver wallet transactions at daily, monthly, and quarterly levels.
* Attributes needed
    1. time_period: start date of the period
    2. time_period_type: day/month/quarter (quarters start in Jan/Apr/Jul/Oct)
    3. driver_id
    4. starting_balance: balance at the beginning of the period
    5. txn_amt_usd: total transacted amount in the period
    6. ending_balance: balance at the end of the period
    7. order_cnt: total related orders
    8. cashout_cnt: cashout transaction count
    9. other_txn_cnt: other wallet transaction count
    10. commission_paid_usd: gmv_usd minus txn_amt_usd
    11. first_txn_id: first wallet transaction id in the period
    12. last_txn_id: last wallet transaction id in the period
    13. first_related_order_id: first related_order_id in the period
    14. last_related_order_id: last related_order_id in the period

* Bonus if solved in ~50 lines of human-readable code.
* A sample output, `example_output/q1.csv`, is provided to help you understand the requirements.

In [7]:
pd.read_csv('example_output/q1.csv')

Unnamed: 0,time_period,time_period_type,driver_id,starting_balance,txn_amt_usd,ending_balance,order_cnt,cashout_cnt,other_txn_cnt,first_txn_id,last_txn_id,commission_paid_usd,first_related_order_id,last_related_order_id
0,2025-01-01,quarter,234,0,320.0,320,11,0.0,0.0,8,18,12080.0,600,1600
1,2025-01-01,quarter,345,0,810.0,810,11,0.0,1.0,19,30,12500.0,1700,2800
2,2025-01-01,quarter,123,0,100.0,100,5,1.0,0.0,1,7,4870.0,345,500
3,2025-02-01,month,234,0,320.0,320,11,0.0,0.0,8,18,12080.0,600,1600
4,2025-02-01,month,345,0,790.0,810,10,0.0,1.0,19,30,11320.0,1700,2800
5,2025-03-01,month,345,40,20.0,60,1,0.0,0.0,20,20,1180.0,1800,1800
6,2025-01-01,month,123,0,50.0,50,3,1.0,0.0,1,5,2920.0,345,300
7,2025-03-01,month,123,70,30.0,100,1,0.0,0.0,7,7,970.0,500,500
8,2025-02-01,month,123,50,20.0,70,1,0.0,0.0,6,6,980.0,400,400
9,2025-02-03,day,234,180,140.0,320,5,0.0,0.0,14,18,5860.0,1200,1600


In [8]:
q1_sql = '''
select ''
'''
con.sql(q1_sql).fetchdf()

Unnamed: 0,''
0,


##### Q2. Data Patch
In August 2024, driver_wallet_transaction.**balance_before** and **balance_after** were missing (all null in the production db).
Write a SQL query to recalculate these two attributes from the other attributes in the table.

In [9]:
q2_sql = '''
select ''
'''
con.sql(q2_sql).fetchdf()

Unnamed: 0,''
0,


##### Q3. An operations manager needs a monthly report segmented by market and item category (document, food, grocery) containing:
1. month
2. market
3. item
4. total working days in month
5. active online user
6. total gmv

* Hint: some tables may include future dates.

In [10]:
q3_sql = '''
select ''
'''
con.sql(q3_sql).fetchdf()

Unnamed: 0,''
0,
