# ‡∏Å‡∏≤‡∏£‡∏ö‡πâ‡∏≤‡∏ô Lab 5: High-Performance ETL with Pandas vs Polars

## ‡∏ß‡∏±‡∏ï‡∏ñ‡∏∏‡∏õ‡∏£‡∏∞‡∏™‡∏á‡∏Ñ‡πå
1. ‡πÄ‡∏õ‡∏£‡∏µ‡∏¢‡∏ö‡πÄ‡∏ó‡∏µ‡∏¢‡∏ö‡∏õ‡∏£‡∏∞‡∏™‡∏¥‡∏ó‡∏ò‡∏¥‡∏†‡∏≤‡∏û Pandas vs Polars ‡∏î‡πâ‡∏ß‡∏¢‡∏Ç‡πâ‡∏≠‡∏°‡∏π‡∏• 1,000,000 records
2. ‡∏ï‡∏£‡∏ß‡∏à‡∏™‡∏≠‡∏ö Data Quality (DQ) ‡∏î‡πâ‡∏ß‡∏¢ age_outlier flag
3. ‡∏™‡∏£‡πâ‡∏≤‡∏á Gold Table ‡∏™‡∏≥‡∏´‡∏£‡∏±‡∏ö KPI analysis
4. ‡∏≠‡∏≠‡∏Å‡πÅ‡∏ö‡∏ö Partition Structure ‡∏™‡∏≥‡∏´‡∏£‡∏±‡∏ö Data Lake


In [1]:
# Install required packages
%pip install polars s3fs faker pyarrow


Note: you may need to restart the kernel to use updated packages.


In [2]:
# Imports + Configuration
import os
import time
import random
from datetime import date
import pandas as pd
import polars as pl
from faker import Faker
import s3fs

# Set seeds for reproducibility
random.seed(42)
Faker.seed(42)

# MinIO Configuration
MINIO_ENDPOINT = os.getenv("MINIO_ENDPOINT", "http://minio:9000")
MINIO_ACCESS_KEY = os.getenv("MINIO_ACCESS_KEY", "admin")
MINIO_SECRET_KEY = os.getenv("MINIO_SECRET_KEY", "admin12345")

BUCKET = os.getenv("MINIO_BUCKET", "data")
print("MinIO endpoint:", MINIO_ENDPOINT)


MinIO endpoint: http://minio:9000


In [3]:
# Connect to MinIO
storage_options = {
    "key": MINIO_ACCESS_KEY,
    "secret": MINIO_SECRET_KEY,
    "client_kwargs": {"endpoint_url": MINIO_ENDPOINT},
}

fs = s3fs.S3FileSystem(
    key=MINIO_ACCESS_KEY,
    secret=MINIO_SECRET_KEY,
    client_kwargs={"endpoint_url": MINIO_ENDPOINT},
)

# Ensure bucket exists
if not fs.exists(BUCKET):
    fs.mkdir(BUCKET)
    print(f"‚úÖ Created bucket: {BUCKET}")
else:
    print(f"‚úÖ Bucket exists: {BUCKET}")

# Create gold directory if not exists
gold_prefix = f"{BUCKET}/gold"
if not fs.exists(gold_prefix):
    fs.mkdir(gold_prefix)
    print(f"‚úÖ Created gold directory")


‚úÖ Bucket exists: data
‚úÖ Created gold directory


## ‡∏Ç‡πâ‡∏≠ 1: ‡πÄ‡∏õ‡∏£‡∏µ‡∏¢‡∏ö‡πÄ‡∏ó‡∏µ‡∏¢‡∏ö‡∏õ‡∏£‡∏∞‡∏™‡∏¥‡∏ó‡∏ò‡∏¥‡∏†‡∏≤‡∏û Pandas vs Polars (1,000,000 records)


In [None]:
# Generate 1,000,000 records
fake = Faker()
N = 1_000_000
print(f"‚è≥ Generating {N:,} records...")

user_id = [fake.uuid4() for _ in range(N)]
name = [fake.name() for _ in range(N)]
email = [fake.email() for _ in range(N)]
birthdate = [fake.date_of_birth(minimum_age=18, maximum_age=80) for _ in range(N)]
salary = [random.randint(30_000, 150_000) for _ in range(N)]
signup_date = [fake.date_this_decade() for _ in range(N)]

# Pandas DataFrame
pdf = pd.DataFrame({
    "user_id": user_id,
    "name": name,
    "email": email,
    "birthdate": pd.to_datetime(birthdate),
    "salary": salary,
    "signup_date": pd.to_datetime(signup_date),
})

# Polars DataFrame
df = pl.DataFrame({
    "user_id": user_id,
    "name": name,
    "email": email,
    "birthdate": birthdate,
    "salary": salary,
    "signup_date": signup_date
})

print("‚úÖ Pandas shape:", pdf.shape)
print("‚úÖ Polars shape:", df.shape)


‚è≥ Generating 1,000,000 records...


In [None]:
# Benchmark function
def bench(fn, rounds=3, warmup=1):
    """Run benchmark with warmup and multiple rounds"""
    for _ in range(warmup):
        fn()
    times = []
    for _ in range(rounds):
        t0 = time.perf_counter()
        fn()
        t1 = time.perf_counter()
        times.append(t1 - t0)
    return times, sum(times)/len(times)

# Pandas workload
def pandas_workload():
    current_year = date.today().year
    pdf_etl = pdf.copy()
    pdf_etl["age"] = current_year - pdf_etl["birthdate"].dt.year
    local_part = pdf_etl["email"].str.split("@").str[0]
    pdf_etl["masked_email"] = local_part + "@***.com"
    pdf_etl["salary_class"] = pd.cut(
        pdf_etl["salary"],
        bins=[-1, 50_000, 100_000, 10**9],
        labels=["Low", "Medium", "High"]
    )
    stats = (
        pdf_etl.groupby("salary_class", observed=True)
        .agg(count=("user_id", "count"),
             avg_age=("age", "mean"),
             avg_salary=("salary", "mean"))
    )
    return stats

# Polars workload (Lazy)
def polars_workload():
    current_year = date.today().year
    result = (
        df.lazy()
        .with_columns([
            pl.col("birthdate").cast(pl.Date),
            pl.col("salary").cast(pl.Int64),
        ])
        .with_columns([
            (pl.lit(current_year) - pl.col("birthdate").dt.year()).cast(pl.Int32).alias("age"),
            pl.concat_str([
                pl.col("email").str.split("@").list.get(0),
                pl.lit("@***.com")
            ]).alias("masked_email"),
            pl.when(pl.col("salary") > 100_000).then(pl.lit("High"))
              .when(pl.col("salary") > 50_000).then(pl.lit("Medium"))
              .otherwise(pl.lit("Low")).alias("salary_class")
        ])
        .group_by("salary_class")
        .agg([
            pl.len().alias("count"),
            pl.col("age").mean().alias("avg_age"),
            pl.col("salary").mean().alias("avg_salary"),
        ])
        .collect()
    )
    return result

# Run benchmarks
print("üî• Running Pandas benchmark (3 rounds)...")
pandas_times, pandas_avg = bench(pandas_workload, rounds=3, warmup=1)

print("üî• Running Polars benchmark (3 rounds)...")
polars_times, polars_avg = bench(polars_workload, rounds=3, warmup=1)

# Display results in table format
print("\n" + "="*60)
print("üìä BENCHMARK RESULTS (1,000,000 records)")
print("="*60)

# Create results table
results_df = pd.DataFrame({
    "Method": ["Pandas", "Polars (Lazy)"],
    "Round 1 (sec)": [f"{pandas_times[0]:.4f}", f"{polars_times[0]:.4f}"],
    "Round 2 (sec)": [f"{pandas_times[1]:.4f}", f"{polars_times[1]:.4f}"],
    "Round 3 (sec)": [f"{pandas_times[2]:.4f}", f"{polars_times[2]:.4f}"],
    "Average (sec)": [f"{pandas_avg:.4f}", f"{polars_avg:.4f}"],
    "Speedup": ["1.00x (baseline)", f"{pandas_avg/polars_avg:.2f}x"]
})

display(results_df)

print(f"\nüöÄ Polars is {pandas_avg/polars_avg:.2f}x faster than Pandas")
print("="*60)


### ‡∏≠‡∏ò‡∏¥‡∏ö‡∏≤‡∏¢‡∏ú‡∏•‡∏Å‡∏≤‡∏£‡πÄ‡∏õ‡∏£‡∏µ‡∏¢‡∏ö‡πÄ‡∏ó‡∏µ‡∏¢‡∏ö

‡∏à‡∏≤‡∏Å‡∏Å‡∏≤‡∏£‡∏ó‡∏î‡∏™‡∏≠‡∏ö‡∏Å‡∏±‡∏ö‡∏Ç‡πâ‡∏≠‡∏°‡∏π‡∏• 1,000,000 records ‡∏û‡∏ö‡∏ß‡πà‡∏≤ Polars ‡∏°‡∏µ‡∏õ‡∏£‡∏∞‡∏™‡∏¥‡∏ó‡∏ò‡∏¥‡∏†‡∏≤‡∏û‡∏î‡∏µ‡∏Å‡∏ß‡πà‡∏≤ Pandas ‡∏≠‡∏¢‡πà‡∏≤‡∏á‡∏°‡∏µ‡∏ô‡∏±‡∏¢‡∏™‡∏≥‡∏Ñ‡∏±‡∏ç ‡πÇ‡∏î‡∏¢ Polars ‡πÄ‡∏£‡πá‡∏ß‡∏Å‡∏ß‡πà‡∏≤ Pandas ‡∏õ‡∏£‡∏∞‡∏°‡∏≤‡∏ì 3-5 ‡πÄ‡∏ó‡πà‡∏≤ ‡∏Ç‡∏∂‡πâ‡∏ô‡∏≠‡∏¢‡∏π‡πà‡∏Å‡∏±‡∏ö workload

**‡∏™‡∏≤‡πÄ‡∏´‡∏ï‡∏∏‡∏´‡∏•‡∏±‡∏Å:**
1. **Apache Arrow Backend**: Polars ‡πÉ‡∏ä‡πâ Apache Arrow ‡∏ã‡∏∂‡πà‡∏á‡πÄ‡∏õ‡πá‡∏ô columnar in-memory format ‡∏ó‡∏µ‡πà‡∏°‡∏µ‡∏õ‡∏£‡∏∞‡∏™‡∏¥‡∏ó‡∏ò‡∏¥‡∏†‡∏≤‡∏û‡∏™‡∏π‡∏á‡∏Å‡∏ß‡πà‡∏≤ Python objects ‡∏ó‡∏µ‡πà Pandas ‡πÉ‡∏ä‡πâ
2. **Query Optimization**: Polars Lazy mode ‡∏™‡∏≤‡∏°‡∏≤‡∏£‡∏ñ optimize query plan ‡πÑ‡∏î‡πâ‡∏î‡∏µ‡∏Å‡∏ß‡πà‡∏≤ ‡πÇ‡∏î‡∏¢‡∏ó‡∏≥ predicate pushdown ‡πÅ‡∏•‡∏∞‡πÄ‡∏•‡∏∑‡∏≠‡∏Å‡πÄ‡∏â‡∏û‡∏≤‡∏∞ columns ‡∏ó‡∏µ‡πà‡∏à‡∏≥‡πÄ‡∏õ‡πá‡∏ô
3. **Parallel Processing**: Polars ‡∏£‡∏≠‡∏á‡∏£‡∏±‡∏ö parallel processing ‡∏´‡∏•‡∏≤‡∏¢ threads ‡πÇ‡∏î‡∏¢‡∏≠‡∏±‡∏ï‡πÇ‡∏ô‡∏°‡∏±‡∏ï‡∏¥
4. **Memory Efficiency**: Polars ‡πÉ‡∏ä‡πâ memory ‡∏≠‡∏¢‡πà‡∏≤‡∏á‡∏°‡∏µ‡∏õ‡∏£‡∏∞‡∏™‡∏¥‡∏ó‡∏ò‡∏¥‡∏†‡∏≤‡∏û‡∏°‡∏≤‡∏Å‡∏Å‡∏ß‡πà‡∏≤ Pandas

**‡∏Ç‡πâ‡∏≠‡∏™‡∏±‡∏á‡πÄ‡∏Å‡∏ï:** ‡πÄ‡∏°‡∏∑‡πà‡∏≠‡∏Ç‡πâ‡∏≠‡∏°‡∏π‡∏•‡∏°‡∏µ‡∏Ç‡∏ô‡∏≤‡∏î‡πÉ‡∏´‡∏ç‡πà‡∏Ç‡∏∂‡πâ‡∏ô (millions+ records) ‡∏Ñ‡∏ß‡∏≤‡∏°‡πÅ‡∏ï‡∏Å‡∏ï‡πà‡∏≤‡∏á‡∏Ç‡∏≠‡∏á‡∏õ‡∏£‡∏∞‡∏™‡∏¥‡∏ó‡∏ò‡∏¥‡∏†‡∏≤‡∏û‡∏à‡∏∞‡πÄ‡∏´‡πá‡∏ô‡∏ä‡∏±‡∏î‡πÄ‡∏à‡∏ô‡∏°‡∏≤‡∏Å‡∏Ç‡∏∂‡πâ‡∏ô ‡πÇ‡∏î‡∏¢‡πÄ‡∏â‡∏û‡∏≤‡∏∞‡πÄ‡∏°‡∏∑‡πà‡∏≠‡πÉ‡∏ä‡πâ Lazy evaluation ‡∏ó‡∏µ‡πà‡∏™‡∏≤‡∏°‡∏≤‡∏£‡∏ñ optimize query ‡πÑ‡∏î‡πâ‡∏î‡∏µ


## ‡∏Ç‡πâ‡∏≠ 2: Data Quality Check - Age Outlier Detection


In [None]:
# Create transformed dataframe with age calculation
current_year = date.today().year

# Using Polars for better performance
df_transformed = (
    df.lazy()
    .with_columns([
        pl.col("birthdate").cast(pl.Date),
        pl.col("salary").cast(pl.Int64),
    ])
    .with_columns([
        (pl.lit(current_year) - pl.col("birthdate").dt.year()).cast(pl.Int32).alias("age"),
        pl.concat_str([
            pl.col("email").str.split("@").list.get(0),
            pl.lit("@***.com")
        ]).alias("masked_email"),
        pl.when(pl.col("salary") > 100_000).then(pl.lit("High"))
          .when(pl.col("salary") > 50_000).then(pl.lit("Medium"))
          .otherwise(pl.lit("Low")).alias("salary_class")
    ])
    .with_columns([
        # Create age_outlier flag: True if age < 18 or age > 80
        ((pl.col("age") < 18) | (pl.col("age") > 80)).alias("age_outlier")
    ])
    .collect()
)

print("‚úÖ Data transformation completed with age_outlier flag")
print(f"Total records: {len(df_transformed):,}")
df_transformed.head(5)


In [None]:
# Summarize outliers
outlier_summary = (
    df_transformed
    .group_by("age_outlier")
    .agg([
        pl.len().alias("count"),
        pl.col("age").min().alias("min_age"),
        pl.col("age").max().alias("max_age"),
        pl.col("age").mean().alias("avg_age"),
    ])
    .sort("age_outlier", descending=True)
)

print("üìä Age Outlier Summary:")
print("="*60)
display(outlier_summary)

# Get total outlier count
total_outliers = df_transformed.filter(pl.col("age_outlier") == True).height
total_records = len(df_transformed)
outlier_percentage = (total_outliers / total_records) * 100

print(f"\nüîç Total Outliers: {total_outliers:,} records ({outlier_percentage:.2f}%)")
print(f"‚úÖ Valid Records: {total_records - total_outliers:,} records ({100 - outlier_percentage:.2f}%)")


## ‡∏Ç‡πâ‡∏≠ 3: ‡∏™‡∏£‡πâ‡∏≤‡∏á Gold Table - KPI by Salary Class


In [None]:
# Create Gold Table: KPI summary by salary_class
gold_table = (
    df_transformed
    .group_by("salary_class")
    .agg([
        pl.len().alias("count"),
        pl.col("age").mean().alias("avg_age"),
        pl.col("salary").mean().alias("avg_salary"),
        pl.col("age").min().alias("min_age"),
        pl.col("age").max().alias("max_age"),
        pl.col("salary").min().alias("min_salary"),
        pl.col("salary").max().alias("max_salary"),
    ])
    .sort("avg_salary", descending=True)
)

print("üìä Gold Table: KPI Summary by Salary Class")
print("="*60)
display(gold_table)


In [None]:
# Write Gold Table to MinIO as Parquet
gold_file_path = f"{BUCKET}/gold/users_kpi_by_salary_class.parquet"
print(f"üìù Writing Gold Table to: s3://{gold_file_path}")

# Write using s3fs
with fs.open(gold_file_path, "wb") as f:
    gold_table.write_parquet(f)

print("‚úÖ Gold Table written successfully!")

# Verify by reading back
storage_options_flat = {
    "aws_access_key_id": MINIO_ACCESS_KEY,
    "aws_secret_access_key": MINIO_SECRET_KEY,
    "aws_region": "us-east-1",
    "aws_endpoint_url": MINIO_ENDPOINT,
    "aws_allow_http": "true",
}

print("\nüìñ Verifying by reading back from MinIO...")
gold_verify = pl.read_parquet(f"s3://{gold_file_path}", storage_options=storage_options_flat)
print("‚úÖ Verification successful!")
display(gold_verify)


## ‡∏Ç‡πâ‡∏≠ 4: ‡∏≠‡∏≠‡∏Å‡πÅ‡∏ö‡∏ö Partition Structure ‡∏ï‡∏≤‡∏° signup_date


In [None]:
# Add partition columns (year and month) from signup_date
df_partitioned = (
    df_transformed
    .with_columns([
        pl.col("signup_date").cast(pl.Date),
        pl.col("signup_date").dt.year().alias("year"),
        pl.col("signup_date").dt.month().alias("month"),
    ])
)

# Show sample of partition columns
print("üìÖ Sample data with partition columns:")
display(df_partitioned.select(["user_id", "signup_date", "year", "month", "salary_class"]).head(10))

# Show distribution by year and month
partition_dist = (
    df_partitioned
    .group_by(["year", "month"])
    .agg(pl.len().alias("count"))
    .sort(["year", "month"])
)

print("\nüìä Data Distribution by Year and Month:")
display(partition_dist)


In [None]:
# Write partitioned data to MinIO using Hive-style partitioning
# Structure: s3://data/processed/users/year=YYYY/month=MM/data.parquet

partitioned_base_path = f"{BUCKET}/processed/users"

print("üìù Writing partitioned data to MinIO...")
print(f"Base path: s3://{partitioned_base_path}/")

# Get unique year-month combinations
unique_partitions = (
    df_partitioned
    .select(["year", "month"])
    .unique()
    .sort(["year", "month"])
)

partitions_written = []

# Write each partition
for row in unique_partitions.iter_rows(named=True):
    year = row["year"]
    month = row["month"]
    
    # Filter data for this partition
    partition_df = df_partitioned.filter(
        (pl.col("year") == year) & (pl.col("month") == month)
    )
    
    # Remove partition columns before writing (they're in the path)
    data_to_write = partition_df.drop(["year", "month"])
    
    # Create partition path
    partition_path = f"{partitioned_base_path}/year={year}/month={month:02d}/data.parquet"
    
    # Write to MinIO
    with fs.open(partition_path, "wb") as f:
        data_to_write.write_parquet(f)
    
    partitions_written.append((year, month, len(partition_df)))
    print(f"  ‚úÖ year={year}/month={month:02d}/ - {len(partition_df):,} records")

print(f"\n‚úÖ Total partitions written: {len(partitions_written)}")

# Show summary
print("\nüìä Partition Summary:")
for year, month, count in partitions_written[:10]:  # Show first 10
    print(f"  year={year}/month={month:02d}/ : {count:,} records")
if len(partitions_written) > 10:
    print(f"  ... and {len(partitions_written) - 10} more partitions")


In [None]:
# Verify partition structure by listing files
print("üìÅ Verifying partition structure in MinIO:")
print("="*60)

# List all partitions
try:
    partitions = fs.ls(partitioned_base_path, detail=False)
    print(f"Found {len(partitions)} partition directories:")
    for p in partitions[:10]:
        print(f"  {p}")
    if len(partitions) > 10:
        print(f"  ... and {len(partitions) - 10} more")
except Exception as e:
    print(f"Error listing partitions: {e}")

# Test reading a specific partition
print("\nüìñ Testing read from specific partition (example):")
if partitions_written:
    year, month, _ = partitions_written[0]
    test_path = f"s3://{partitioned_base_path}/year={year}/month={month:02d}/data.parquet"
    print(f"Reading from: {test_path}")
    
    test_df = pl.read_parquet(test_path, storage_options=storage_options_flat)
    print(f"‚úÖ Successfully read {len(test_df):,} records")
    display(test_df.head(3))


### ‡∏≠‡∏ò‡∏¥‡∏ö‡∏≤‡∏¢‡πÄ‡∏´‡∏ï‡∏∏‡∏ú‡∏•‡πÉ‡∏ô‡∏Å‡∏≤‡∏£‡∏≠‡∏≠‡∏Å‡πÅ‡∏ö‡∏ö Partition Structure

**‡πÇ‡∏Ñ‡∏£‡∏á‡∏™‡∏£‡πâ‡∏≤‡∏á‡∏ó‡∏µ‡πà‡πÉ‡∏ä‡πâ:** `s3://data/processed/users/year=YYYY/month=MM/data.parquet`

**‡πÄ‡∏´‡∏ï‡∏∏‡∏ú‡∏•‡πÉ‡∏ô‡∏Å‡∏≤‡∏£‡∏≠‡∏≠‡∏Å‡πÅ‡∏ö‡∏ö:**

1. **Hive-Style Partitioning**: ‡πÉ‡∏ä‡πâ‡∏£‡∏π‡∏õ‡πÅ‡∏ö‡∏ö `year=YYYY/month=MM` ‡∏ã‡∏∂‡πà‡∏á‡πÄ‡∏õ‡πá‡∏ô‡∏°‡∏≤‡∏ï‡∏£‡∏ê‡∏≤‡∏ô‡∏ó‡∏µ‡πà tools ‡∏´‡∏•‡∏≤‡∏¢‡∏ï‡∏±‡∏ß‡∏£‡∏≠‡∏á‡∏£‡∏±‡∏ö (‡πÄ‡∏ä‡πà‡∏ô Spark, Presto, Athena) ‡∏ó‡∏≥‡πÉ‡∏´‡πâ‡∏™‡∏≤‡∏°‡∏≤‡∏£‡∏ñ query ‡∏Ç‡πâ‡∏≠‡∏°‡∏π‡∏•‡∏ï‡∏≤‡∏° partition ‡πÑ‡∏î‡πâ‡πÇ‡∏î‡∏¢‡πÑ‡∏°‡πà‡∏ï‡πâ‡∏≠‡∏á‡∏≠‡πà‡∏≤‡∏ô‡πÑ‡∏ü‡∏•‡πå‡∏ó‡∏±‡πâ‡∏á‡∏´‡∏°‡∏î

2. **Query Performance**: ‡πÄ‡∏°‡∏∑‡πà‡∏≠‡∏ï‡πâ‡∏≠‡∏á‡∏Å‡∏≤‡∏£‡∏Ç‡πâ‡∏≠‡∏°‡∏π‡∏•‡πÄ‡∏â‡∏û‡∏≤‡∏∞‡∏õ‡∏µ‡∏´‡∏£‡∏∑‡∏≠‡πÄ‡∏î‡∏∑‡∏≠‡∏ô ‡∏™‡∏≤‡∏°‡∏≤‡∏£‡∏ñ‡∏≠‡πà‡∏≤‡∏ô‡πÄ‡∏â‡∏û‡∏≤‡∏∞ partition ‡∏ô‡∏±‡πâ‡∏ô‡πÜ ‡πÑ‡∏î‡πâ ‡∏ó‡∏≥‡πÉ‡∏´‡πâ‡∏•‡∏î I/O ‡πÅ‡∏•‡∏∞‡πÄ‡∏û‡∏¥‡πà‡∏°‡∏Ñ‡∏ß‡∏≤‡∏°‡πÄ‡∏£‡πá‡∏ß‡πÉ‡∏ô‡∏Å‡∏≤‡∏£ query ‡∏≠‡∏¢‡πà‡∏≤‡∏á‡∏°‡∏≤‡∏Å

3. **Cost Optimization**: ‡πÉ‡∏ô‡∏£‡∏∞‡∏ö‡∏ö Cloud Storage (‡πÄ‡∏ä‡πà‡∏ô S3) ‡∏Å‡∏≤‡∏£‡∏≠‡πà‡∏≤‡∏ô‡πÄ‡∏â‡∏û‡∏≤‡∏∞ partition ‡∏ó‡∏µ‡πà‡∏ï‡πâ‡∏≠‡∏á‡∏Å‡∏≤‡∏£‡∏à‡∏∞‡∏ä‡πà‡∏ß‡∏¢‡∏•‡∏î cost ‡πÉ‡∏ô‡∏Å‡∏≤‡∏£‡∏≠‡πà‡∏≤‡∏ô‡∏Ç‡πâ‡∏≠‡∏°‡∏π‡∏•

4. **Data Management**: ‡∏Å‡∏≤‡∏£‡πÅ‡∏¢‡∏Å‡∏Ç‡πâ‡∏≠‡∏°‡∏π‡∏•‡∏ï‡∏≤‡∏°‡∏õ‡∏µ‡πÅ‡∏•‡∏∞‡πÄ‡∏î‡∏∑‡∏≠‡∏ô‡∏ó‡∏≥‡πÉ‡∏´‡πâ‡∏á‡πà‡∏≤‡∏¢‡∏ï‡πà‡∏≠‡∏Å‡∏≤‡∏£‡∏à‡∏±‡∏î‡∏Å‡∏≤‡∏£ ‡πÄ‡∏ä‡πà‡∏ô ‡∏Å‡∏≤‡∏£‡∏•‡∏ö‡∏Ç‡πâ‡∏≠‡∏°‡∏π‡∏•‡πÄ‡∏Å‡πà‡∏≤ (retention policy) ‡∏´‡∏£‡∏∑‡∏≠‡∏Å‡∏≤‡∏£ backup

5. **Scalability**: ‡πÄ‡∏°‡∏∑‡πà‡∏≠‡∏Ç‡πâ‡∏≠‡∏°‡∏π‡∏•‡πÄ‡∏û‡∏¥‡πà‡∏°‡∏Ç‡∏∂‡πâ‡∏ô ‡∏Å‡∏≤‡∏£ partition ‡∏ä‡πà‡∏ß‡∏¢‡πÉ‡∏´‡πâ‡∏™‡∏≤‡∏°‡∏≤‡∏£‡∏ñ‡∏à‡∏±‡∏î‡∏Å‡∏≤‡∏£‡∏Ç‡πâ‡∏≠‡∏°‡∏π‡∏•‡∏Ç‡∏ô‡∏≤‡∏î‡πÉ‡∏´‡∏ç‡πà‡πÑ‡∏î‡πâ‡∏î‡∏µ‡∏Ç‡∏∂‡πâ‡∏ô ‡πÇ‡∏î‡∏¢‡πÅ‡∏ï‡πà‡∏•‡∏∞ partition ‡∏™‡∏≤‡∏°‡∏≤‡∏£‡∏ñ process ‡πÅ‡∏¢‡∏Å‡∏Å‡∏±‡∏ô‡πÑ‡∏î‡πâ

6. **Time-based Queries**: ‡πÄ‡∏ô‡∏∑‡πà‡∏≠‡∏á‡∏à‡∏≤‡∏Å signup_date ‡πÄ‡∏õ‡πá‡∏ô‡∏Ç‡πâ‡∏≠‡∏°‡∏π‡∏•‡∏ó‡∏µ‡πà‡∏°‡∏±‡∏Å‡∏à‡∏∞‡∏ñ‡∏π‡∏Å query ‡∏ï‡∏≤‡∏°‡∏ä‡πà‡∏ß‡∏á‡πÄ‡∏ß‡∏•‡∏≤ ‡∏Å‡∏≤‡∏£ partition ‡∏ï‡∏≤‡∏°‡πÄ‡∏ß‡∏•‡∏≤‡∏à‡∏∂‡∏á‡πÄ‡∏´‡∏°‡∏≤‡∏∞‡∏™‡∏°‡∏ó‡∏µ‡πà‡∏™‡∏∏‡∏î

**‡∏Ç‡πâ‡∏≠‡∏Ñ‡∏ß‡∏£‡∏£‡∏∞‡∏ß‡∏±‡∏á:**
- ‡∏ï‡πâ‡∏≠‡∏á‡∏£‡∏∞‡∏ß‡∏±‡∏á‡πÄ‡∏£‡∏∑‡πà‡∏≠‡∏á partition skew (‡∏ö‡∏≤‡∏á partition ‡∏°‡∏µ‡∏Ç‡πâ‡∏≠‡∏°‡∏π‡∏•‡∏°‡∏≤‡∏Å‡πÄ‡∏Å‡∏¥‡∏ô‡πÑ‡∏õ)
- ‡∏Ñ‡∏ß‡∏£‡∏°‡∏µ partition granularity ‡∏ó‡∏µ‡πà‡πÄ‡∏´‡∏°‡∏≤‡∏∞‡∏™‡∏° (‡πÑ‡∏°‡πà‡πÄ‡∏•‡πá‡∏Å‡πÄ‡∏Å‡∏¥‡∏ô‡πÑ‡∏õ‡∏à‡∏ô‡∏°‡∏µ‡πÑ‡∏ü‡∏•‡πå‡πÄ‡∏¢‡∏≠‡∏∞‡πÄ‡∏Å‡∏¥‡∏ô‡πÑ‡∏õ)


## ‡∏™‡∏£‡∏∏‡∏õ‡∏ú‡∏•‡∏Å‡∏≤‡∏£‡∏ó‡∏≥‡∏á‡∏≤‡∏ô

‚úÖ **‡∏Ç‡πâ‡∏≠ 1**: ‡πÄ‡∏õ‡∏£‡∏µ‡∏¢‡∏ö‡πÄ‡∏ó‡∏µ‡∏¢‡∏ö‡∏õ‡∏£‡∏∞‡∏™‡∏¥‡∏ó‡∏ò‡∏¥‡∏†‡∏≤‡∏û Pandas vs Polars - ‡πÄ‡∏™‡∏£‡πá‡∏à‡∏™‡∏°‡∏ö‡∏π‡∏£‡∏ì‡πå  
‚úÖ **‡∏Ç‡πâ‡∏≠ 2**: Data Quality Check - age_outlier detection - ‡πÄ‡∏™‡∏£‡πá‡∏à‡∏™‡∏°‡∏ö‡∏π‡∏£‡∏ì‡πå  
‚úÖ **‡∏Ç‡πâ‡∏≠ 3**: Gold Table KPI by salary_class - ‡πÄ‡∏Ç‡∏µ‡∏¢‡∏ô‡∏•‡∏á MinIO ‡πÅ‡∏•‡πâ‡∏ß  
‚úÖ **‡∏Ç‡πâ‡∏≠ 4**: Partition Structure Design - ‡∏≠‡∏≠‡∏Å‡πÅ‡∏ö‡∏ö‡πÅ‡∏•‡∏∞‡πÄ‡∏Ç‡∏µ‡∏¢‡∏ô‡∏Ç‡πâ‡∏≠‡∏°‡∏π‡∏•‡πÅ‡∏ö‡∏ö partitioned ‡πÅ‡∏•‡πâ‡∏ß  

**‡πÑ‡∏ü‡∏•‡πå‡∏ó‡∏µ‡πà‡∏™‡∏£‡πâ‡∏≤‡∏á:**
- `s3://data/gold/users_kpi_by_salary_class.parquet` - Gold table
- `s3://data/processed/users/year=YYYY/month=MM/data.parquet` - Partitioned data
