In [2]:
import pandas as pd

test_sheet = pd.read_csv('./test_IKD_air_quality.csv',
    names=['id', 'description', 'measure', 'value', 'timestamp'])
full_sheet = pd.read_csv('./IKD_air_quality.csv',
    names=['id', 'description', 'measure', 'value', 'timestamp'])
full_sheet_1_2 = pd.read_csv('./IKD_air_quality_1_2.csv',
    names=['id', 'description', 'measure', 'value', 'timestamp'])

def empty_sheet():
    return pd.DataFrame({
        'SDS_P1': [],
        'SDS_P2': [],
        'BME280_temperature': [],
        'BME280_humidity': [],
        'BME280_pressure': [],
        'CH2O': [],
        'NH3': [],
        'CO': [],
        'NO2': [],
        'NH3-raw': [],
        'CO-raw': [],
        'NO2-raw': [],
        'signal': [],
        'quality': []
    })

empty_sheet()

Unnamed: 0,SDS_P1,SDS_P2,BME280_temperature,BME280_humidity,BME280_pressure,CH2O,NH3,CO,NO2,NH3-raw,CO-raw,NO2-raw,signal,quality


In [3]:
len(full_sheet.timestamp.unique())

364872

In [4]:
lines_by_moment_groups = full_sheet.groupby(full_sheet.timestamp)
lines_by_moment = lines_by_moment_groups.size()
lines_by_moment

timestamp
2019-12-09 16:07:11    11
2019-12-12 15:55:20    14
2019-12-12 15:56:25    14
2019-12-12 15:57:29    14
2019-12-12 15:58:33    14
                       ..
2020-09-30 22:30:25    14
2020-09-30 22:31:31    14
2020-09-30 22:32:34    14
2020-09-30 22:33:41    14
2020-09-30 22:34:44    14
Length: 364872, dtype: int64

In [5]:
unique_amounts_of_lines_by_moment = lines_by_moment.unique()
unique_amounts_of_lines_by_moment

array([11, 14, 12,  9, 13])

In [6]:
moments_by_amount = lines_by_moment.groupby(lines_by_moment).count()
moments_by_amount

9         42
11        39
12    225272
13        14
14    139505
dtype: int64

In [7]:
moments_by_amount[14] / moments_by_amount.sum()

0.38233956017452697

In [8]:
moments_by_amount[12] / moments_by_amount.sum()

0.6174000745466903

In [3]:
def synchronous_values_to_single_row(values: pd.DataFrame) -> pd.DataFrame:
    '''
    Function converts a bunch of synchronously retrieved data to a single row,
    which is suitable for concatenation.
    It's expected that 'values' is a result of operation like:
        frame[frame.timestamp == 'timestamp_here']
    '''
    values = values[['id','value']].set_index('id')
    values.index.name = None
    values = values.transpose()
    return values

# example
several_rows = test_sheet[test_sheet.timestamp == '2019-12-09 16:07:11']
print(several_rows)
synchronous_values_to_single_row(several_rows)


                    id  description measure      value            timestamp
0               SDS_P1         PM10   ug/m3     49.760  2019-12-09 16:07:11
1               SDS_P2        PM2.5   ug/m3     30.250  2019-12-09 16:07:11
2   BME280_temperature  Temperature      °C     20.480  2019-12-09 16:07:11
3      BME280_humidity     Humidity       %     53.250  2019-12-09 16:07:11
4      BME280_pressure     Pressure      Pa  97850.840  2019-12-09 16:07:11
5                 CH2O         CH₂O     ppb     40.000  2019-12-09 16:07:11
6                  NH3          NH₃     ppm      0.287  2019-12-09 16:07:11
7                   CO           CO     ppm      0.380  2019-12-09 16:07:11
8                  NO2          NO₂     ppm      0.662  2019-12-09 16:07:11
9               signal   WiFiSignal      dB    -38.000  2019-12-09 16:07:11
10             quality  WiFiQvality       %    100.000  2019-12-09 16:07:11


Unnamed: 0,SDS_P1,SDS_P2,BME280_temperature,BME280_humidity,BME280_pressure,CH2O,NH3,CO,NO2,signal,quality
value,49.76,30.25,20.48,53.25,97850.84,40.0,0.287,0.38,0.662,-38.0,100.0


In [4]:
from multiprocessing import Pool, cpu_count
from functools import partial
from tqdm import tqdm

from itertools import zip_longest

def grouper(iterable, n, fillvalue=None):
    args = [iter(iterable)] * n
    return list(zip_longest(*args, fillvalue=fillvalue))


def proccess_moment_and_write(moment: str, sheet: pd.DataFrame):
    # print("Process %d working on %s..." % (os.getpid(), moment))
    sync_values = sheet.loc[sheet.timestamp == moment]
    next_row = synchronous_values_to_single_row(sync_values).rename(index={'value': moment})
    # return pd.concat([empty_sheet(), next_row]) #.to_csv('res.txt', mode='a')
    return next_row


# def process_chunk(chunk, sheet, out_file_path):
#     print("Process {} working on [{}-{}]...".format(os.getpid(), chunk[0], chunk[-1]))
#     res = pd.concat([proccess_moment_and_write(m, sheet) for m in tqdm(chunk)])
#     res = pd.concat([empty_sheet(), res])
#     res.to_csv(out_file_path, mode='a', header=False)
#     print("Chunk done.")

# def proccess_sheet(old_sheet: pd.DataFrame, out_file_path) -> None:
#     unique_moments = old_sheet.timestamp.unique()
#     empty_sheet().to_csv(out_file_path, header=True)
#     # for chunk in grouper(unique_moments, 10000):
#     with Pool(processes=cpu_count()) as executor:
#         func = partial(process_chunk, sheet=old_sheet, out_file_path=out_file_path)
#         executor.map(func, grouper(unique_moments, 10000))
#         # res.to_csv(out_file_path, mode='a', header=True)
#         print("Done.")
#         executor.close()
#         executor.join()

def proccess_sheet(old_sheet: pd.DataFrame, out_file_path) -> None:
    unique_moments = old_sheet.timestamp.unique()
    empty_sheet().to_csv(out_file_path, header=True)

    cnt = 0
    chunks = grouper(unique_moments, 10000)
    for chunk in chunks:
        with Pool(processes=cpu_count()) as executor:
            func = partial(proccess_moment_and_write, sheet=old_sheet)
            res = pd.concat(executor.map(func, tqdm(chunk)))
            res = pd.concat([empty_sheet(), res])
            res.to_csv(out_file_path, mode='a', header=False)
            cnt += 1
            print("{}/{} done.".format(cnt, len(chunks)))
            executor.close()
            executor.join()
    
    print("FAT DONE.")
    

# proccess_sheet(test_sheet, 'out-test.csv')
# 7proccess_sheet(full_sheet, 'ikd-out.csv')
#proccess_sheet(full_sheet_1_2, 'ikd-out-1-2.csv')

In [5]:
full_sheet_2_3 = pd.read_csv('./IKD_air_quality_2_3.csv',
    names=['id', 'description', 'measure', 'value', 'timestamp'])
full_sheet_3_3 = pd.read_csv('./IKD_air_quality_3_3.csv',
    names=['id', 'description', 'measure', 'value', 'timestamp'])

proccess_sheet(full_sheet_2_3, 'ikd-out-2-3.csv')
proccess_sheet(full_sheet_3_3, 'ikd-out-3-3.csv')

100%|██████████| 10000/10000 [02:22<00:00, 70.16it/s]
1/13 done.
100%|██████████| 10000/10000 [02:31<00:00, 66.00it/s]
2/13 done.
100%|██████████| 10000/10000 [02:36<00:00, 64.05it/s]
3/13 done.
100%|██████████| 10000/10000 [02:45<00:00, 60.30it/s]
4/13 done.
100%|██████████| 10000/10000 [02:39<00:00, 62.75it/s]
5/13 done.
100%|██████████| 10000/10000 [02:33<00:00, 65.10it/s]
6/13 done.
100%|██████████| 10000/10000 [02:34<00:00, 64.83it/s]
7/13 done.
100%|██████████| 10000/10000 [02:34<00:00, 64.54it/s]
8/13 done.
100%|██████████| 10000/10000 [02:34<00:00, 64.59it/s]
9/13 done.
100%|██████████| 10000/10000 [02:35<00:00, 64.40it/s]
10/13 done.
100%|██████████| 10000/10000 [02:35<00:00, 64.37it/s]
11/13 done.
100%|██████████| 10000/10000 [02:36<00:00, 63.94it/s]
12/13 done.
100%|██████████| 10000/10000 [00:29<00:00, 333.92it/s]
13/13 done.
FAT DONE.
100%|██████████| 10000/10000 [02:30<00:00, 66.41it/s]
1/11 done.
100%|██████████| 10000/10000 [02:30<00:00, 66.61it/s]
2/11 done.
100%|█████

In [1]:
# toooo long operation
# res = proccess_sheet(full_sheet)
# res

import pandas as pd

pd.read_csv('ikd-out.csv')

Unnamed: 0.1,Unnamed: 0,SDS_P1,SDS_P2,BME280_temperature,BME280_humidity,BME280_pressure,CH2O,NH3,CO,NO2,NH3-raw,CO-raw,NO2-raw,signal,quality
0,2019-12-09 16:07:11,49.76,30.25,20.48,53.25,97850.84,40.0,0.287,0.380,0.662,,,,-38.0,100.0
1,2019-12-12 15:55:20,51.20,23.58,4.56,71.87,98181.98,15.0,0.002,0.305,7.781,817.0,576.0,892.0,-89.0,22.0
2,2019-12-12 15:56:25,30.90,18.02,4.57,71.68,98186.29,16.0,0.003,0.328,7.537,808.0,555.0,892.0,-88.0,24.0
3,2019-12-12 15:57:29,26.93,15.71,4.61,71.01,98187.86,15.0,0.003,0.322,7.551,801.0,576.0,891.0,-87.0,26.0
4,2019-12-12 15:58:33,23.50,14.18,4.66,70.85,98180.66,16.0,0.002,0.276,7.764,836.0,605.0,895.0,-88.0,24.0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
364867,2020-09-30 22:30:25,2.28,1.02,11.33,100.00,98428.20,0.0,0.035,0.590,0.165,662.0,499.0,996.0,-82.0,36.0
364868,2020-09-30 22:31:31,3.63,0.79,11.34,100.00,98424.58,0.0,0.034,0.580,0.205,666.0,505.0,998.0,-82.0,36.0
364869,2020-09-30 22:32:34,1.32,1.09,11.37,100.00,98428.83,0.0,0.033,0.569,0.201,657.0,491.0,997.0,-82.0,36.0
364870,2020-09-30 22:33:41,11.49,1.73,11.34,100.00,98418.45,0.0,0.035,0.574,0.189,667.0,502.0,1001.0,-82.0,36.0
