In [None]:
import pandas as pd

# Build the example frame from plain Python lists, then pin every column's
# dtype in a single astype call so the schema is visible in one place.
df = pd.DataFrame(
    {
        "id": [1, 2, 3],
        "name": ["Alice", "Bob", "Carol"],
        "age": [30, 25, 27],
        "salary": [50000.0, 60000.0, 75000.0],
    }
).astype({"id": "int64", "name": "string", "age": "int32", "salary": "float64"})

# Persist the frame as a single Parquet file using the pyarrow engine
df.to_parquet("output.parquet", engine="pyarrow")

In [None]:
# Read output.parquet back and print the column dtypes, to check that the
# dtypes declared when the frame was built survived the Parquet round trip.
df2 = pd.read_parquet("output.parquet")
print(df2.dtypes)

id                 int64
name      string[python]
age                int32
salary           float64
dtype: object


In [None]:
import pandas as pd
import dask.dataframe as dd

# Step 1: Build a small typed frame holding two sessions (ids 1 and 2)
df = pd.DataFrame(
    {
        "session_id": [1, 2, 1, 2, 2],
        "name": ["Alice", "Bob", "Carol", "Max", "John"],
        "age": [30, 25, 27, 55, 60],
        "salary": [50000.0, 60000.0, 75000.0, 89000, 90000],
    }
).astype(
    {"session_id": "int64", "name": "string", "age": "int32", "salary": "float64"}
)

# Shuffle so that rows sharing a session_id land in the same partition, asking
# for as many partitions as there are distinct sessions.
# (Alternative that was tried: set_index("session_id", npartitions=...).)
ddf = dd.from_pandas(df).shuffle(
    "session_id", npartitions=df["session_id"].nunique()
)
print(ddf.npartitions)

# One Parquet file is written per partition
ddf.to_parquet("input_dir/", engine="pyarrow")

# Step 2: Load the directory back with Dask and show each partition in turn
ddf = dd.read_parquet("input_dir/", engine="pyarrow")

for part_idx in range(ddf.npartitions):
    partition = ddf.get_partition(part_idx)
    print(partition.compute())


# Step 3: Fake windowing function
def create_windows(df):
    rows = []
    for i in range(2):
        copy = df.copy()
        # create unique window id per window and session
        copy["window_id"] = copy["session_id"] * 10 + i
        # copy["partition_key"] = copy["id"] * 10 + i  # Unique per window
        rows.append(copy)
    return pd.concat(rows)


# Step 4: Apply to partitions
# Expand every partition into its windows, then regroup rows by window_id.
windowed = ddf.map_partitions(create_windows)

# Counting the distinct window ids needs an eager compute before the shuffle.
# NOTE(review): asking for one partition per distinct key presumably does not
# guarantee exactly one key per partition — confirm against the Dask docs.
n_windows = windowed["window_id"].nunique().compute()
windowed = windowed.shuffle("window_id", npartitions=n_windows)

print(windowed.npartitions)

# The index is left out of the written files (write_index=False)
windowed.to_parquet("output_windows/", engine="pyarrow", write_index=False)

for part_idx in range(windowed.npartitions):
    partition = windowed.get_partition(part_idx)
    print(partition.compute())


# Read back a single partition file from disk: part.2.parquet is the file
# for partition index 2, i.e. the third partition.
windowed = dd.read_parquet("output_windows/part.2.parquet", engine="pyarrow")
print(windowed.compute())

2
   session_id  name  age   salary
1           2   Bob   25  60000.0
3           2   Max   55  89000.0
4           2  John   60  90000.0
   session_id   name  age   salary
0           1  Alice   30  50000.0
2           1  Carol   27  75000.0
4
   session_id  name  age   salary  window_id
1           2   Bob   25  60000.0         20
3           2   Max   55  89000.0         20
4           2  John   60  90000.0         20
   session_id   name  age   salary  window_id
0           1  Alice   30  50000.0         10
2           1  Carol   27  75000.0         10
   session_id  name  age   salary  window_id
1           2   Bob   25  60000.0         21
3           2   Max   55  89000.0         21
4           2  John   60  90000.0         21
   session_id   name  age   salary  window_id
0           1  Alice   30  50000.0         11
2           1  Carol   27  75000.0         11
   session_id  name  age   salary  window_id
0           2   Bob   25  60000.0         21
1           2   Max   55  890

In [None]:
import pandas as pd
import dask.dataframe as dd
import dask

# Step 1: Build the same two-session example frame with explicit dtypes
df = pd.DataFrame(
    {
        "session_id": [1, 2, 1, 2, 2],
        "name": ["Alice", "Bob", "Carol", "Max", "John"],
        "age": [30, 25, 27, 55, 60],
        "salary": [50000.0, 60000.0, 75000.0, 89000, 90000],
    }
).astype(
    {"session_id": "int64", "name": "string", "age": "int32", "salary": "float64"}
)

# Wrap the pandas frame as a Dask DataFrame
ddf = dd.from_pandas(df)


# Define a function to save a single group as Parquet
@dask.delayed
def save_group(session_id):
    """Write all rows of one session to ``session_<session_id>.parquet``.

    Reads the notebook-level Dask DataFrame ``ddf``, keeps the rows whose
    ``session_id`` column equals the ``session_id`` argument, materialises
    them as a pandas DataFrame and writes that frame to a Parquet file in
    the current working directory.

    Because of the ``dask.delayed`` decorator, calling this function only
    builds a task; the body runs when the task is computed.

    Args:
        session_id: Value to match against the ``session_id`` column.

    Returns:
        The name of the Parquet file that was written.
    """
    filename = f"session_{session_id}.parquet"
    # Filter on the argument, not on a notebook-level loop variable: the body
    # runs lazily, after any driver loop has finished, so a global would hold
    # only the last id and every file would receive the same rows.
    group_df = ddf[ddf["session_id"] == session_id].compute()
    group_df.to_parquet(filename)
    return filename


# Compute the unique session_ids
# (eager: the distinct values are needed up front to know how many tasks to build)
session_ids = ddf["session_id"].unique().compute()

# Collect delayed tasks for each session_id group
tasks = []
for sid in session_ids:
    # save_group is wrapped in dask.delayed, so this call only records a task;
    # no filtering or file writing happens yet
    tasks.append(save_group(sid))

# Trigger the actual saving in parallel
# dask.compute runs all collected tasks and returns their results as a tuple,
# here the file names returned by save_group
saved_files = dask.compute(*tasks)

print("Saved files:", saved_files)

Saved files: ('session_1.parquet', 'session_2.parquet')
