## One Billion Row Challenge

💪 The challenge

Your mission, should you choose to accept it, is to write a program that retrieves temperature measurement values from a text file and calculates the min, mean, and max temperature per weather station. There's just one caveat: the file has 1,000,000,000 rows! That's more than 10 GB of data! 😱



### 1-  Pure Python


In [12]:
import time
from tqdm import tqdm

def read_and_calculate_stats(file_name):
    city_stats = {}
    with open(file_name, 'r') as file:
        for line in tqdm(file, desc="Processing data"):
            city, temp = line.strip().split(';')
            temp = float(temp)

            if city in city_stats:
                stats = city_stats[city]
                stats['count'] += 1
                stats['total'] += temp
                if temp < stats['min']:
                    stats['min'] = temp
                if temp > stats['max']:
                    stats['max'] = temp
            else:
                city_stats[city] = {
                    'min': temp,
                    'max': temp,
                    'total': temp,
                    'count': 1
                }

    # Calculate mean from total and count
    for city, stats in city_stats.items():
        stats['mean'] = stats['total'] / stats['count']
        del stats['total'], stats['count']  # Optional: Remove these if no longer needed

    return city_stats

# Main execution
start_time = time.time()
city_stats = read_and_calculate_stats('data/measurements.txt')
end_time = time.time()

print('Helsinki',city_stats['Helsinki'])
print('Guatemala City,',city_stats['Guatemala City'],'\n')

print(f"Time elapsed: {end_time - start_time:.2f} seconds")


Processing data: 1000000000it [08:31, 1953846.14it/s]


ValueError: not enough values to unpack (expected 2, got 1)

## 2- Pandas

In [8]:
import pandas as pd
import time
from tqdm import tqdm

def process_data(file_path, chunk_size=1000000):
    start_time = time.time()  # Start timing

    # Initialize an empty DataFrame to accumulate results
    accumulated_results = pd.DataFrame()

    # Initialize reader object
    reader = pd.read_csv(file_path, sep=';', header=None, names=['city', 'temp'], chunksize=chunk_size)

    # Process each chunk
    for chunk in tqdm(reader, desc="Processing chunks"):
        # Group by 'city' and calculate min, max, and mean for the chunk
        results = chunk.groupby('city')['temp'].agg(['min', 'max', 'mean']).rename(columns={
            'min': 'temperature_min',
            'max': 'temperature_max',
            'mean': 'temperature_mean'
        })
        # Append chunk results to the accumulated results
        accumulated_results = pd.concat([accumulated_results, results])

    # Final aggregation to ensure city stats are correct across all chunks
    final_results = accumulated_results.groupby(accumulated_results.index).agg({
        'temperature_min': 'min',
        'temperature_max': 'max',
        'temperature_mean': 'mean'
    })

    end_time = time.time()  # End timing
    elapsed_time = end_time - start_time  # Calculate elapsed time

    print(f"Elapsed Time: {elapsed_time:.2f} seconds")  # Print the elapsed time
    return final_results

# Specify your file path
file_path = 'data/measurements.txt'
city_stats = process_data(file_path)
print(city_stats)


Processing chunks: 1001it [02:35,  6.44it/s]

Elapsed Time: 155.37 seconds
                          temperature_min  temperature_max  temperature_mean
city                                                                        
Abha                                -34.0             68.6         17.998179
Abidjan                             -26.2             76.6         25.999310
Abéché                              -21.0             81.0         29.409223
Accra                               -23.4             77.6         26.406375
Addis Ababa                         -34.8             66.2         16.010328
...                                   ...              ...               ...
Zanzibar City                       -25.3             78.3         26.003238
Zürich                              -42.4             59.6          9.296035
station_name,measurement              NaN              NaN               NaN
Ürümqi                              -49.9             54.5          7.398977
İzmir                               -30.4      




## 3- Polars

In [8]:
import polars as pl
import time
from tqdm import tqdm

def process_data_polars(file_path, chunk_size=1000000):
    start_time = time.time()

    # Create a lazy scan of the CSV file
    df = pl.scan_csv(file_path, separator=";", has_header=False, new_columns=["city", "temp"])

    # Define aggregations and group by 'city'
    results = (
        df.lazy()
        .group_by("city")
        .agg(
            [
                pl.col("temp").min().alias("temperature_min"),
                pl.col("temp").max().alias("temperature_max"),
                pl.col("temp").mean().alias("temperature_mean"),
            ]
        )
        .collect()
    )

    end_time = time.time()
    elapsed_time = end_time - start_time
    print(f"Elapsed Time: {elapsed_time:.2f} seconds")
    return results

# Specify your file path
file_path = "data/1million.txt"
city_stats = process_data_polars(file_path)
print(city_stats)


Elapsed Time: 0.04 seconds
shape: (413, 4)
┌────────────┬─────────────────┬─────────────────┬──────────────────┐
│ city       ┆ temperature_min ┆ temperature_max ┆ temperature_mean │
│ ---        ┆ ---             ┆ ---             ┆ ---              │
│ str        ┆ f64             ┆ f64             ┆ f64              │
╞════════════╪═════════════════╪═════════════════╪══════════════════╡
│ Medan      ┆ -8.7            ┆ 63.0            ┆ 26.352709        │
│ Belgrade   ┆ -21.5           ┆ 42.2            ┆ 12.470008        │
│ Mombasa    ┆ -7.3            ┆ 59.7            ┆ 26.455896        │
│ Almaty     ┆ -24.9           ┆ 40.7            ┆ 10.087885        │
│ Bujumbura  ┆ -10.5           ┆ 61.0            ┆ 23.834524        │
│ …          ┆ …               ┆ …               ┆ …                │
│ Moscow     ┆ -34.7           ┆ 39.9            ┆ 5.914118         │
│ Alexandria ┆ -11.2           ┆ 55.2            ┆ 20.053718        │
│ Hanga Roa  ┆ -15.3           ┆ 54.4          


The kernel crashed when we didn't enable streaming. However, for 1 million rows, Polars took only 0.04 seconds. 

Now calculating 1 billion rows using Polars with streaming enabled.


**Streaming Aggregation**: The `collect(streaming=True)` option instructs Polars to perform the aggregation in a streaming fashion. Instead of building the entire result in memory, Polars processes the data in smaller chunks and yields results as they become available. This approach is more memory-efficient and allows for processing larger datasets.

In [3]:
import polars as pl
import time


def process_data_polars_streaming(file_path):
    start_time = time.time()

    # Create a lazy scan of the CSV file
    df = pl.scan_csv(file_path, separator=";", has_header=False, new_columns=["city", "temp"])

    # Define aggregations and group by 'city' with streaming enabled
    results = (
        df.lazy()
        .group_by("city")
        .agg(
            [
                pl.col("temp").min().alias("temperature_min"),
                pl.col("temp").max().alias("temperature_max"),
                pl.col("temp").mean().alias("temperature_mean"),
            ]
        )
        .collect(streaming=True)  # Enable streaming
    )

    end_time = time.time()
    elapsed_time = end_time - start_time
    print(f"Elapsed Time: {elapsed_time:.2f} seconds")
    return results

# Specify your file path
file_path = "data/measurements.txt"
city_stats = process_data_polars_streaming(file_path)
print(city_stats)


Elapsed Time: 27.80 seconds
shape: (414, 4)
┌───────────────┬─────────────────┬─────────────────┬──────────────────┐
│ city          ┆ temperature_min ┆ temperature_max ┆ temperature_mean │
│ ---           ┆ ---             ┆ ---             ┆ ---              │
│ str           ┆ f64             ┆ f64             ┆ f64              │
╞═══════════════╪═════════════════╪═════════════════╪══════════════════╡
│ Perth         ┆ -32.9           ┆ 64.6            ┆ 18.696792        │
│ Skopje        ┆ -35.2           ┆ 61.3            ┆ 12.395606        │
│ Odesa         ┆ -38.5           ┆ 60.4            ┆ 10.701866        │
│ Zanzibar City ┆ -25.3           ┆ 78.3            ┆ 26.003186        │
│ Chicago       ┆ -41.2           ┆ 58.8            ┆ 9.791128         │
│ …             ┆ …               ┆ …               ┆ …                │
│ Minneapolis   ┆ -39.7           ┆ 56.9            ┆ 7.80093          │
│ Ghanzi        ┆ -27.3           ┆ 71.6            ┆ 21.396405        │
│ Chitt

With streaming = True, Polars took 27 s to compute 1 billion rows.


## 4- DuckDB Implementation

In [16]:
import duckdb
import pyarrow.parquet as pq
import pandas as pd
import time

con = duckdb.connect()

# Start time tracking
start_time = time.time()

parquet_file = pq.ParquetFile('data/measurements.parquet')

results = []

# Process in chunks
for i in range(parquet_file.num_row_groups):
    table = parquet_file.read_row_group(i)
    df = table.to_pandas()
    city_stats = df.groupby('city')['temp'].agg(['min', 'max', 'mean']).reset_index()
    results.append(city_stats)

final_results = pd.concat(results)
final_results.columns = ['city', 'min_temp', 'max_temp', 'avg_temp']

# End time tracking and calculate elapsed time
end_time = time.time()
elapsed_time = end_time - start_time

# Print the results and elapsed time
print(final_results)
print(f"\nTime elapsed: {elapsed_time:.2f} seconds")


                         city  min_temp  max_temp   avg_temp
0                        Abha     -13.5      55.0  17.685526
1                     Abidjan      -1.7      57.5  25.419055
2                      Abéché      -6.6      61.4  29.634817
3                       Accra      -4.3      54.7  26.078281
4                 Addis Ababa     -24.8      46.1  15.702610
..                        ...       ...       ...        ...
409             Zanzibar City      -4.4      54.4  25.501149
410                    Zürich     -20.5      32.3   9.370703
411  station_name,measurement       NaN       NaN        NaN
412                    Ürümqi     -20.5      37.3   7.140071
413                     İzmir      -8.9      48.1  18.216854

[1577661 rows x 4 columns]

Time elapsed: 74.60 seconds


## Dask Implementation

Dask is a flexible library for parallel computing in Python. It is a great tool for parallelizing data processing tasks, such as reading large files, filtering, and aggregating data. Dask provides a high-level API that is similar to Pandas, making it easy to use for data scientists and analysts familiar with Pandas.