In [1]:
import csv
from typing import Iterator, List, Dict, Tuple, Any
import itertools
import pandas as pd

In [2]:
def read_csv_in_batches(file_path: str, batch_size: int = 10000):
    """Lazily read a CSV file and yield its rows in lists of up to ``batch_size``.

    The file is opened as UTF-8 text and parsed with ``csv.DictReader``, so each
    row is a dict keyed by the header line. Because this is a generator, the
    file is only opened once iteration starts.

    Args:
        file_path: Path of the CSV file to read.
        batch_size: Number of accumulated rows at which a batch is yielded.

    Yields:
        Lists of row dicts. For ``batch_size >= 1`` every list except possibly
        the last holds exactly ``batch_size`` rows; a file without data rows
        yields nothing.
    """
    with open(file_path, mode='r', encoding='utf-8', newline='') as source:
        pending = []
        for record in csv.DictReader(source):
            pending.append(record)
            if len(pending) < batch_size:
                continue
            # Batch is full: hand it over and start a fresh list so the
            # yielded one is never appended to afterwards.
            yield pending
            pending = []
        # Flush the trailing partial batch, if any rows are left over.
        if pending:
            yield pending

In [3]:
def map_function(batch):
    """Map step: turn a batch of rows into ``(carrier, arrival_delay)`` pairs.

    A row contributes a pair only when its ``ARR_DELAY`` field is non-empty and
    parses to a float that is >= 0; rows with a missing delay value or an early
    arrival (negative delay) are left out. Rows that raise ``KeyError``,
    ``ValueError`` or ``AttributeError`` while being processed are skipped
    silently.

    Args:
        batch: Iterable of rows indexable by ``'ARR_DELAY'`` and ``'OP_CARRIER'``.

    Returns:
        List of ``(row['OP_CARRIER'], float(row['ARR_DELAY']))`` tuples, in
        input order.
    """
    pairs = []
    for record in batch:
        try:
            raw_delay = record['ARR_DELAY']
            # No delay value recorded: nothing to contribute for this row.
            if not raw_delay:
                continue
            delay = float(raw_delay)
            # Early arrivals (negative delays) are not considered.
            if delay >= 0:
                pairs.append((record['OP_CARRIER'], delay))
        except (KeyError, ValueError, AttributeError):
            # Malformed row: ignore it and carry on with the rest.
            continue
    return pairs

def shuffle_sort(intermediate):
    """Shuffle step: group ``(key, value)`` pairs by key, ordered by key.

    The input is not modified: a sorted copy is grouped instead of sorting the
    caller's list in place. The sort is stable, so the values inside each group
    keep their original relative order.

    Args:
        intermediate: Iterable of ``(key, value)`` 2-tuples whose keys are
            mutually comparable.

    Returns:
        List of ``(key, [values...])`` tuples in ascending key order; an empty
        list when ``intermediate`` is empty or ``None``.
    """
    if not intermediate:
        return []

    def pair_key(pair):
        return pair[0]

    # sorted() rather than list.sort(): the caller's list must stay untouched.
    ordered = sorted(intermediate, key=pair_key)
    # groupby merges runs of consecutive equal keys, which after sorting are
    # exactly the per-key groups.
    return [
        (key, [value for _, value in members])
        for key, members in itertools.groupby(ordered, key=pair_key)
    ]

def reduce_function(key: str, values: List[float]) -> Tuple[str, float]:
    """Reduce step: pair ``key`` with the arithmetic mean of ``values``.

    Args:
        key: Group identifier, returned unchanged.
        values: Numbers to average.

    Returns:
        ``(key, mean_of_values)``.

    Raises:
        ZeroDivisionError: If ``values`` is empty.
    """
    total = sum(values)
    count = len(values)
    return key, total / count

In [4]:
def map_reduce(file_path: str, batch_size: int = 50000):
    """Run the map -> shuffle/sort -> reduce pipeline over a CSV file.

    The file is read batch by batch with ``read_csv_in_batches``; every batch is
    passed through ``map_function`` and the resulting pairs are collected in a
    single list. That list is then grouped by ``shuffle_sort`` and each group is
    collapsed with ``reduce_function``. A progress line is printed per batch.

    Args:
        file_path: Path of the CSV file to process.
        batch_size: Batch size forwarded to ``read_csv_in_batches``.

    Returns:
        List with one ``reduce_function`` result per group, in the order the
        groups are returned by ``shuffle_sort``.
    """
    collected_pairs = []
    batches = read_csv_in_batches(file_path, batch_size)
    for batch_num, rows in enumerate(batches):
        print(f"  Обработка батча {batch_num+1}")
        collected_pairs.extend(map_function(rows))

    # Note: all mapped pairs are held in memory before grouping.
    return [
        reduce_function(group_key, group_values)
        for group_key, group_values in shuffle_sort(collected_pairs)
    ]

In [5]:
%%time
# Run the full pipeline on the airline CSV, reading it in batches of 500,000
# rows; `results` receives the list returned by map_reduce.
results = map_reduce('data_airlines.csv', batch_size=500000)

  Обработка батча 1
  Обработка батча 2
  Обработка батча 3
  Обработка батча 4
  Обработка батча 5
  Обработка батча 6
  Обработка батча 7
  Обработка батча 8
  Обработка батча 9
  Обработка батча 10
  Обработка батча 11
  Обработка батча 12
  Обработка батча 13
  Обработка батча 14
  Обработка батча 15
  Обработка батча 16
  Обработка батча 17
CPU times: total: 47.8 s
Wall time: 1min 21s


In [6]:
# Tabulate the pipeline results as (company, del_avg) rows and display them
# sorted by the del_avg column.
a = pd.DataFrame(results, columns=['company', 'del_avg'])
a.sort_values(by=['del_avg'])

Unnamed: 0,company,del_avg
9,HA,12.683495
2,AS,21.591269
7,F9,22.281948
16,WN,23.227791
11,NW,24.376537
15,US,24.414119
13,OO,25.773116
0,9E,26.401539
4,CO,26.912731
5,DL,27.637558
