# 1. Query check

In [1]:
from core_pro import DataPipeLine


# SQL under test: distinct users per grass_date, from 2022-01-01 onwards.
query = """
    select grass_date
    , count(distinct user_id) total_a1
    from mp_user.dws_user_login_1d__vn_s0_live
    where grass_date >= date'2022-01-01'
    group by 1
"""

# Show the SQL text held by the pipeline object before running anything.
print(
    DataPipeLine(query).query
)


    select grass_date
    , count(distinct user_id) total_a1
    from mp_user.dws_user_login_1d__vn_s0_live
    where grass_date >= date'2022-01-01'
    group by 1



In [2]:
# Run the SQL defined above and keep the result as a DataFrame.
df = (
    DataPipeLine(query)
    .run_presto_to_df()
)

🤖 JDBC Status: RUNNING 77.69%, Memory 209GB: : 61it [05:05,  5.00s/it]        

[94m🐔 JDBC:[39m Data Shape: (622, 2)





# 2. Query data with a thread pool

How to download data for several dates in parallel using multiple threads.

In [3]:
import polars as pl
from datetime import date, timedelta
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor



def download(date_input):
    """Download one day's distinct-user count and cache it as a parquet file.

    The result is stored as ``<date_input>.parquet`` under
    ``~/VSCodeProjects/how-to-use/``. If that file already exists the query is
    skipped, so the function can be re-run without downloading again.

    Args:
        date_input: Day to query. Its string form is interpolated into the SQL
            ``date'...'`` literal and into the file name, so it should render
            as an ISO date such as ``2023-09-08`` (a ``datetime.date`` does).

    Returns:
        None. The data is written to disk.
    """
    # setup file name
    path = Path.home() / 'VSCodeProjects/how-to-use/'
    file_name = path / f'{date_input}.parquet'

    # check if file exist (don't have to download file again)
    if file_name.exists():
        return

    # Create the target folder up front; without it the write below fails on
    # a machine where the folder has never been created.
    path.mkdir(parents=True, exist_ok=True)

    # query
    query = f"""
    select grass_date
    , count(distinct user_id) total_a1
    from mp_user.dws_user_login_1d__vn_s0_live
    where grass_date = date'{date_input}'
    group by 1
    """
    df = DataPipeLine(query).run_presto_to_df()

    # Write under a temporary name and rename afterwards, so an interrupted
    # write never leaves a partial file that the exists() check above would
    # mistake for a finished download.
    tmp_name = file_name.with_name(file_name.name + '.tmp')
    df.write_parquet(tmp_name, use_pyarrow=True)
    tmp_name.replace(file_name)

In [4]:
# Days to fetch: every second day starting 2023-09-08, ending no later than yesterday.
yesterday = date.today() + timedelta(days=-1)
dates = pl.date_range(
    start=date(2023, 9, 8),
    end=yesterday,
    interval='2d',
    eager=True,
)
run = [*dates]
run

[datetime.date(2023, 9, 8),
 datetime.date(2023, 9, 10),
 datetime.date(2023, 9, 12),
 datetime.date(2023, 9, 14)]

In [5]:
# Number of downloads running at the same time. The threads spend most of
# their time waiting on the remote query, so this is presumably limited by
# what the query cluster allows rather than by local CPU count - tune as needed.
max_workers = 4
with ThreadPoolExecutor(max_workers) as executor:
    # executor.map() returns a lazy iterator of results; an exception raised
    # inside download() is only re-raised when that iterator is consumed.
    # Wrapping it in list() makes failed downloads visible instead of being
    # silently dropped.
    list(executor.map(download, run))

🤖 JDBC Status: WAITING_FOR_RESOURCES 0%, Memory 0GB: : 0it [00:00, ?it/s]
[A
[A

[A[A

🤖 JDBC Status: WAITING_FOR_RESOURCES 0%, Memory 0GB: : 2it [00:05,  2.50s/it]
[A
[A

[A[A

🤖 JDBC Status: WAITING_FOR_RESOURCES 0%, Memory 0GB: : 3it [00:10,  3.54s/it]
[A
[A

[A[A

🤖 JDBC Status: WAITING_FOR_RESOURCES 0%, Memory 0GB: : 4it [00:15,  4.09s/it]
[A
[A

[A[A

🤖 JDBC Status: WAITING_FOR_RESOURCES 0%, Memory 0GB: : 5it [00:20,  4.40s/it]
[A
[A

[A[A

🤖 JDBC Status: WAITING_FOR_RESOURCES 0%, Memory 0GB: : 6it [00:25,  4.60s/it]
[A
[A

[A[A

🤖 JDBC Status: WAITING_FOR_RESOURCES 0%, Memory 0GB: : 7it [00:30,  4.73s/it]
[A
[A

[A[A

🤖 JDBC Status: WAITING_FOR_RESOURCES 0%, Memory 0GB: : 8it [00:35,  4.82s/it]
[A
[A

[A[A

🤖 JDBC Status: WAITING_FOR_RESOURCES 0%, Memory 0GB: : 9it [00:40,  4.87s/it]
[A
[A

[A[A

🤖 JDBC Status: WAITING_FOR_RESOURCES 0%, Memory 0GB: : 10it [00:45,  4.91s/it]
[A
[A

[A[A

🤖 JDBC Status: WAITING_FOR_RESOURCES 0%, Memory 0GB: 