In [3]:
!pip3 install duckdb

Collecting duckdb
  Downloading duckdb-1.0.0-cp311-cp311-manylinux_2_24_aarch64.manylinux_2_28_aarch64.whl.metadata (762 bytes)
Downloading duckdb-1.0.0-cp311-cp311-manylinux_2_24_aarch64.manylinux_2_28_aarch64.whl (16.7 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m16.7/16.7 MB[0m [31m37.7 MB/s[0m eta [36m0:00:00[0m00:01[0m00:01[0m
[?25hInstalling collected packages: duckdb
Successfully installed duckdb-1.0.0


In [11]:
import duckdb

con = duckdb.connect(':memory:')

sql_script = """
CREATE TABLE gd_events AS
SELECT * FROM read_csv_auto('/home/jovyan/extracted_files/test_grandata_data_engineer/events.csv');

CREATE TABLE free_sms_destinations AS
SELECT id 
FROM read_csv_auto('/home/jovyan/extracted_files/test_grandata_data_engineer/free_sms_destinations.csv');

CREATE TABLE filtered_events AS
SELECT 
    id_source,
    id_destination,
    CAST(calls AS INTEGER) AS calls,
    CAST(seconds AS INTEGER) AS seconds,
    CAST(sms AS INTEGER) AS sms,
    CAST(region AS INTEGER) AS region
FROM 
    gd_events
WHERE 
    id_source IS NOT NULL AND id_destination IS NOT NULL;

WITH billing AS (
    SELECT 
        e.*,
        f.id AS free_sms_id,
        CASE
            WHEN f.id IS NULL THEN sms * 0.0
            WHEN region BETWEEN 1 AND 5 THEN sms * 1.5
            ELSE sms * 2.0
        END AS billing_amount
    FROM 
        filtered_events e
    LEFT JOIN 
        free_sms_destinations f 
    ON 
        e.id_destination = f.id
)
SELECT 
    SUM(billing_amount) AS total_billing_amount
FROM 
    billing;
"""

con.execute(sql_script)

result = con.execute("""
WITH billing AS (
    SELECT 
        e.*,
        f.id AS free_sms_id,
        CASE
            WHEN f.id IS NULL THEN sms * 0.0
            WHEN region BETWEEN 1 AND 5 THEN sms * 1.5
            ELSE sms * 2.0
        END AS billing_amount
    FROM 
        filtered_events e
    LEFT JOIN 
        free_sms_destinations f 
    ON 
        e.id_destination = f.id
)
SELECT 
    SUM(billing_amount) AS total_billing_amount
FROM 
    billing;
""").fetchall()

print(result)

[(Decimal('18998.0'),)]


In [14]:
import duckdb
from pyspark.sql.functions import md5
import pandas as pd

con = duckdb.connect(':memory:')

sql_script = """
CREATE TABLE gd_events AS
SELECT * FROM read_csv_auto('/home/jovyan/extracted_files/test_grandata_data_engineer/events.csv');

CREATE TABLE free_sms_destinations AS
SELECT id 
FROM read_csv_auto('/home/jovyan/extracted_files/test_grandata_data_engineer/free_sms_destinations.csv');

CREATE TABLE filtered_events AS
SELECT 
    id_source,
    id_destination,
    CAST(calls AS INTEGER) AS calls,
    CAST(seconds AS INTEGER) AS seconds,
    CAST(sms AS INTEGER) AS sms,
    CAST(region AS INTEGER) AS region
FROM 
    gd_events
WHERE 
    id_source IS NOT NULL AND id_destination IS NOT NULL;

CREATE TABLE billing_events AS
SELECT 
    e.*,
    f.id AS free_sms_id,
    CASE
        WHEN f.id IS NOT NULL THEN sms * 0.0
        WHEN region BETWEEN 1 AND 5 THEN sms * 1.5
        ELSE sms * 2.0
    END AS billing_amount
FROM 
    filtered_events e
LEFT JOIN 
    free_sms_destinations f 
ON 
    e.id_destination = f.id;

CREATE TABLE top_100_users_billing AS
SELECT 
    id_source,
    SUM(billing_amount) AS total_billing_amount
FROM 
    billing_events
GROUP BY 
    id_source
ORDER BY 
    total_billing_amount DESC
LIMIT 100;
"""

con.execute(sql_script)

top_100_users_billing = con.execute("SELECT *, md5(id_source) AS id_hashed FROM top_100_users_billing").fetchdf()

output_path = '/home/jovyan/extracted_files/top_100_users_billing.parquet'
top_100_users_billing.to_parquet(output_path, compression='gzip', index=False)

print("File saved as:", output_path)

File saved as: /home/jovyan/extracted_files/top_100_users_billing.parquet
