Use uv venv. Need to uv pip install polars-lts-cpu on older machines.

In [13]:
import polars as pl
import duckdb
import requests
import zipfile
import os
import hvplot.polars

%opts magic unavailable (pyparsing cannot be imported)
%compositor magic unavailable (pyparsing cannot be imported)


Can use awk -F';' 'NR==1 || ($5 >= -3 && $5 <= -2)' data/2023-12_sds011.csv > data/output.csv

but it doesn't seem any faster than using polars


In [2]:
years = range(2020, 2024, 1)
month_nums = range(1, 13, 1)
month_strs = [f'0{str(mon)}' if mon < 10 else str(mon) for mon in month_nums]
yr_mon_list = []
for year in years:
    for m in month_strs:
        yr_mon_list.append(f'{year}-{m}')
    

In [3]:
month_strs

['01', '02', '03', '04', '05', '06', '07', '08', '09', '10', '11', '12']

In [4]:
# type(yr_mon_list[0])
yr_mon_list[0:2]

['2020-01', '2020-02']

In [5]:

def unz_del(yr_mon: str, sensor_type: str = 'sds011'):
    '''
    Download and unzip a csv file for the year \ month and sensor type specified.
    Delete the original zip file after extraction.

    '''
       
    # URL of the ZIP file to download
    zip_url = f'https://archive.sensor.community/csv_per_month/{yr_mon}/{yr_mon}_{sensor_type}.zip'

    # Path where the ZIP file will be saved temporarily
    # data folder
    datfolder = 'data'
    zip_path = f'{datfolder}/temp.zip'

    try:
        # Download the ZIP file
        print(f"Downloading ZIP file from {zip_url}...")
        response = requests.get(zip_url)
        response.raise_for_status()  # Raises an HTTPError if the response was an HTTP error

        with open(zip_path, 'wb') as f:
            f.write(response.content)
        print("Download complete.")
        extracted_files = []
        # Unzip the archive
        print("Extracting ZIP file...")
        with zipfile.ZipFile(zip_path, 'r') as zip_ref:
            zip_ref.extractall('data')
        print("Extraction complete.")
        extracted_files = zip_ref.namelist()[0]

        if os.path.exists(zip_path):
            os.remove(zip_path)
            print("Original ZIP file deleted.")
            return extracted_files

    except requests.exceptions.RequestException as e:
        print(f"An error occurred during the download: {e}")
    except zipfile.BadZipFile:
        print("The downloaded file is not a valid ZIP file.")
    except Exception as e:
        print(f"An error occurred: {e}")

    

In [42]:
unz_del('2024-02', 'bme280')

Downloading ZIP file from https://archive.sensor.community/csv_per_month/2024-02/2024-02_bme280.zip...
Download complete.
Extracting ZIP file...
Extraction complete.
Original ZIP file deleted.


'2024-02_bme280.csv'

In [6]:
def make_bme_sensor_data_fact_df(csv_file: str) -> pl.DataFrame:
    '''
    Read a bme280 csv file, filter for lep area, calculate hourly mean values
    for pressure, temperature, humidity
    clean it and write it to a new csv file
    '''
    path = f'data/{csv_file}'
    try:
        raw_df = pl.scan_csv(path,
            has_header=True,
            separator=';',
            null_values = [' ', 'unavailable', 'unknown', 'b'],
            schema = {'sensor_id': pl.Int64,
                      'sensor_type': pl.Utf8,
                      'location': pl.Int64,
                      'lat': pl.Float64,
                      'lon': pl.Float64,
                      'timestamp': pl.Datetime,
                      'pressure': pl.Float64,
                      'altitude': pl.Float64,
                      'pressure_sealevel': pl.Float64,
                      'temperature': pl.Float64,
                      'humidity': pl.Float64},
            ignore_errors=True)
        
        sensor_data_fact_df = (
            raw_df
             .filter([pl.col('lon').is_between(-3, -2.18), pl.col('lat').is_between(51.2, 51.6)])
             .select(['sensor_id', 'lat', 'lon', 'timestamp', 'pressure', 'temperature', 'humidity'])
             .sort(by = [pl.col('sensor_id'), pl.col('timestamp')])
             .group_by_dynamic('timestamp', every = "1h", group_by = 'sensor_id')
             .agg([pl.col('pressure').mean(),
                pl.col('temperature').mean(),
                pl.col('humidity').mean(),
                pl.col('lat').first().alias("lat"),
                pl.col('lon').first().alias("lon")
                    ])
             ).collect()

        thedate = (sensor_data_fact_df
        .head(1)
        .select(pl.col('timestamp'))
        .to_series(0)
        )[0]
        clean_file_out_path = f'data/cleaned/{str(thedate.year)}-{str(thedate.month)}_bme280_clean.csv'
        sensor_data_fact_df.write_csv(clean_file_out_path)
        os.remove(path)
        
        return sensor_data_fact_df
    
    except Exception as e:
        print(f"An error occurred: {e}")

        

In [10]:
bme_df = make_bme_sensor_data_fact_df('2024-02_bme280.csv')

In [21]:
plot_df = (bme_df
 .filter(pl.col('temperature').is_between(-20, 45))
 )

In [23]:
plot_df.hvplot.line(x='timestamp', y=['temperature'], by = 'sensor_id', width = 800, height = 400)

In [7]:
def make_sensor_data_fact_df(csv_file: str) -> pl.DataFrame:
    path = f'data/{csv_file}'
    try:
        raw_df = pl.scan_csv(path,
            null_values = [' ', 'unavailable', 'unknown', 'b'],
            # columns = ['sensor_id', 'location', 'lat', 'lon', 'timestamp', 'P1', 'P2'],
            separator=';',
            ignore_errors = True,
            schema = {
                            'sensor_id': pl.Int64,
                            'sensor_type': pl.Utf8,
                            'location': pl.Utf8,
                            'lat':pl.Float64,
                            'lon':pl.Float64,
                            'timestamp':pl.Datetime,
                            'P1':pl.Float64,
                            'durP1': pl.Float64,
                            'ratioP1': pl.Float64,
                            'P2':pl.Float64,
                            'durP2': pl.Float64,
                            'ratioP2': pl.Float64,
                        },
            try_parse_dates = True#,
            # n_rows=1000
            )
        
        sensor_data_fact_df = (
            raw_df
            .filter([pl.col('lon').is_between(-3, -2.18),
                     pl.col('lat').is_between(51.2, 51.6)])
                     .select(['sensor_id', 'lat', 'lon', 'timestamp', 'P1', 'P2'])
                    .sort(by = [pl.col('sensor_id'), pl.col('timestamp')])
                    .group_by_dynamic('timestamp', every = "1h", by = 'sensor_id')
                    .agg([
                        pl.col('P1').mean().alias("pm10"),
                        pl.col('P2').mean().alias("pm2.5"),
                        pl.col('lat').first().alias("lat"),
                        pl.col('lon').first().alias("lon")
                    ])                    
            ).collect()

        thedate = (sensor_data_fact_df
        .head(1)
        .select(pl.col('timestamp'))
        .to_series(0)
        )[0]
        clean_file_out_path = f'data/cleaned/{str(thedate.year)}-{str(thedate.month)}_SDS011_clean.csv'
        sensor_data_fact_df.write_csv(clean_file_out_path)
        os.remove(path)
        
        return sensor_data_fact_df
    
    except Exception as e:
        print(f"An error occurred: {e}")

        

In [7]:
def compose(g, f):
    def h(x):
        return g(f(x))
    return h

In [8]:
comp = compose(make_sensor_data_fact_df, unz_del)

In [9]:
raw_df = comp('2023-01')

Downloading ZIP file from https://archive.sensor.community/csv_per_month/2023-01/2023-01_sds011.zip...
An error occurred during the download: ('Connection broken: IncompleteRead(70942720 bytes read, 4126126148 more expected)', IncompleteRead(70942720 bytes read, 4126126148 more expected))
An error occurred: No such file or directory (os error 2): data/None


In [11]:
misslist = ['2020-02', '2021-02', '2021-03']

In [12]:
df_miss = [comp(yr_mon) for yr_mon in misslist]

Downloading ZIP file from https://archive.sensor.community/csv_per_month/2020-02/2020-02_sds011.zip...
Download complete.
Extracting ZIP file...
Extraction complete.
Original ZIP file deleted.
Downloading ZIP file from https://archive.sensor.community/csv_per_month/2021-02/2021-02_sds011.zip...
Download complete.
Extracting ZIP file...
Extraction complete.
Original ZIP file deleted.
Downloading ZIP file from https://archive.sensor.community/csv_per_month/2021-03/2021-03_sds011.zip...
Download complete.
Extracting ZIP file...
Extraction complete.
Original ZIP file deleted.


In [14]:
files = os.listdir('data/cleaned')
lazy_df_list = []
for file in files:
    lazy_df = pl.read_csv(f'data/cleaned/{file}',   try_parse_dates=True)
    lazy_df_list.append(lazy_df)

complete_df = (
    pl.concat(lazy_df_list)
)

complete_df.shape

(1525789, 6)

In [15]:
complete_df.glimpse()

Rows: 1525789
Columns: 6
$ sensor_id          <i64> 7675, 7675, 7675, 7675, 7675, 7675, 7675, 7675, 7675, 7675
$ timestamp <datetime[μs]> 2022-07-01 00:00:00, 2022-07-01 01:00:00, 2022-07-01 02:00:00, 2022-07-01 03:00:00, 2022-07-01 04:00:00, 2022-07-01 05:00:00, 2022-07-01 06:00:00, 2022-07-01 07:00:00, 2022-07-01 08:00:00, 2022-07-01 09:00:00
$ pm10               <f64> 7.556000000000001, 7.929999999999999, 8.131250000000001, 7.630799999999999, 12.955416666666665, 8.764999999999999, 6.102916666666668, 7.845999999999999, 7.503750000000001, 5.574583333333333
$ pm2.5              <f64> 4.199999999999999, 3.9791666666666665, 4.03875, 4.3224, 6.626666666666665, 3.822083333333333, 2.6337499999999996, 2.67, 2.285833333333333, 1.7249999999999999
$ lat                <f64> 51.464, 51.464, 51.464, 51.464, 51.464, 51.464, 51.464, 51.464, 51.464, 51.464
$ lon                <f64> -2.566, -2.566, -2.566, -2.566, -2.566, -2.566, -2.566, -2.566, -2.566, -2.566



In [16]:
complete_df.write_csv('data/sensor_df_complete.csv')


In [23]:
sensor_df = pl.read_csv('data/sensor_df_complete.csv', try_parse_dates=True)

In [35]:
dim_sensor_df = (
    sensor_df
    .group_by('sensor_id')
    .agg([pl.col('lat').first().alias('latitude'),
          pl.col('lon').first().alias('longitude')])
    .filter([pl.col('longitude').is_between(-3, -2.18),
             pl.col('latitude').is_between(51.2, 51.6)])
)
lep_sensors = (dim_sensor_df
               .select('sensor_id')
               .to_series())
lep_sensors_list = list(lep_sensors)
lep_sensors_list

[59165,
 56923,
 54309,
 67665,
 47083,
 17459,
 22979,
 56405,
 3040,
 47271,
 23646,
 49233,
 57227,
 5193,
 34954,
 47265,
 65090,
 49227,
 7685,
 49480,
 53949,
 65084,
 66966,
 65379,
 66987,
 20842,
 17314,
 51770,
 39950,
 53890,
 72367,
 70369,
 31188,
 66963,
 12711,
 66972,
 56379,
 68589,
 26922,
 23644,
 65049,
 39254,
 45804,
 57043,
 78947,
 47093,
 65073,
 8582,
 67568,
 33862,
 54813,
 24147,
 11068,
 14787,
 47263,
 17502,
 50038,
 26044,
 51720,
 54757,
 56469,
 48987,
 35068,
 71552,
 34288,
 49231,
 65088,
 48859,
 21989,
 56341,
 32984,
 66970,
 65047,
 10491,
 62915,
 65407,
 47085,
 56255,
 67655,
 54019,
 17318,
 36581,
 51768,
 39570,
 38525,
 66979,
 70153,
 65086,
 40571,
 50765,
 70326,
 69775,
 70424,
 69513,
 54466,
 7675,
 51072,
 65080,
 65077,
 17506,
 60492,
 38362,
 59364,
 47836,
 29880,
 10179,
 35382,
 51748,
 39836,
 65405,
 59605,
 50036,
 48881,
 8741,
 59617,
 66974]

In [37]:
fact_sensor_hr_df = (
    sensor_df
    .drop(['lat', 'lon'])
    .sort(['sensor_id', 'timestamp'])
    .filter(pl.col('sensor_id').is_in(lep_sensors_list))
)

In [39]:
fact_sensor_hr_df.glimpse()

Rows: 1052092
Columns: 4
$ sensor_id          <i64> 3040, 3040, 3040, 3040, 3040, 3040, 3040, 3040, 3040, 3040
$ timestamp <datetime[μs]> 2020-01-01 00:00:00, 2020-01-01 01:00:00, 2020-01-01 02:00:00, 2020-01-01 03:00:00, 2020-01-01 04:00:00, 2020-01-01 05:00:00, 2020-01-01 06:00:00, 2020-01-01 07:00:00, 2020-01-01 08:00:00, 2020-01-01 09:00:00
$ pm10               <f64> 5.729583333333333, 5.000416666666666, 2.8571999999999993, 2.11625, 1.8737500000000002, 1.5132000000000003, 1.3462500000000004, 2.592, 0.8758333333333334, 1.3576000000000001
$ pm2.5              <f64> 4.085, 3.2174999999999994, 1.9967999999999995, 1.1295833333333334, 1.3020833333333333, 1.1440000000000001, 1.1170833333333332, 1.962, 0.75, 0.9531999999999998



In [43]:
fact_sensor_day_df = (fact_sensor_hr_df
                      .group_by_dynamic('timestamp', every='1d', by = 'sensor_id', closed='left')
                      .agg([pl.col('pm10').mean(),
                            pl.col('pm2.5').mean()])
                        .filter(pl.col('pm10').is_not_nan(),
                                pl.col('pm2.5').is_not_nan())
                      )

In [45]:
fact_sensor_day_df.glimpse()

Rows: 44983
Columns: 4
$ sensor_id          <i64> 3040, 3040, 3040, 3040, 3040, 3040, 3040, 3040, 3040, 3040
$ timestamp <datetime[μs]> 2020-01-01 00:00:00, 2020-01-02 00:00:00, 2020-01-03 00:00:00, 2020-01-04 00:00:00, 2020-01-05 00:00:00, 2020-01-06 00:00:00, 2020-01-07 00:00:00, 2020-01-08 00:00:00, 2020-01-19 00:00:00, 2020-01-20 00:00:00
$ pm10               <f64> 1.777557216183575, 1.2170368620037806, 1.0376101751207731, 0.8214650664251207, 0.8532078947368423, 1.5582531400966184, 0.5171283089042601, 1.3677803402646505, 1.1649741715399609, 0.7742836940836941
$ pm2.5              <f64> 1.2739747886473427, 0.7568890044108381, 0.565838979468599, 0.5262321859903384, 0.5102237639553429, 0.7873081521739129, 0.3906180816315327, 0.7129304032766228, 1.0033465886939572, 0.7441821067821067



In [46]:
dim_sensor_df.write_csv('data/dim_sensor_df.csv')
fact_sensor_hr_df.write_csv('data/fact_sensor_hr_df.csv')
fact_sensor_day_df.write_csv('data/fact_sensor_day_df.csv')


In [46]:
dim_sensor_df.glimpse()

Rows: 149
Columns: 3
$ sensor_id <i64> 59165, 54309, 56923, 3040, 45386, 68597, 17459, 56405, 67665, 22979
$ latitude  <f64> 51.5, 51.5, 51.5, 51.40000000000466, 51.89999999999797, 51.0, 51.5, 51.40000000001312, 51.5, 51.40000000000085
$ longitude <f64> -2.599999999999943, -2.5, -2.8000000000009573, -2.5999999999999828, -2.8000000000001135, -2.7999999999999807, -2.5999999999999526, -2.5999999999993295, -2.5999999999993535, -2.599999999999956



In [16]:
sensor_data_fact_df = make_sensor_data_fact_df(raw_df)

In [17]:
sensor_data_fact_df

sensor_id,timestamp,pm10,pm2.5
i64,datetime[μs],f64,f64
7675,2023-01-01 00:00:00,8.8,3.0
7675,2023-01-01 01:00:00,14.5,3.7
7675,2023-01-01 02:00:00,16.6,4.2
7675,2023-01-01 03:00:00,18.7,4.1
7675,2023-01-01 04:00:00,15.1,3.5
7675,2023-01-01 05:00:00,13.3,3.1
7675,2023-01-01 06:00:00,17.8,3.7
7675,2023-01-01 07:00:00,20.1,4.3
7675,2023-01-01 08:00:00,20.6,4.5
7675,2023-01-01 09:00:00,26.5,5.5


In [5]:
def make_sensor_data_dim_df(raw_df: pl.DataFrame) -> pl.DataFrame:
    sensor_data_dim_df = (raw_df
                    .filter([pl.col('lon').is_between(-3, -2),
                        pl.col('lat').is_between(50, 52)])
                        .select(['sensor_id', 'lat', 'lon'])
                        .group_by('sensor_id')
                        .agg([pl.col('lat').mean(),
                            pl.col('lon').mean()])                
                    ).collect()
    

    return sensor_data_dim_df

In [8]:
sensor_data_dim.glimpse()

Rows: 36
Columns: 3
$ sensor_id <i64> 33862, 67665, 66970, 34457, 7685, 70369, 78947, 17741, 48881, 69775
$ lat       <f64> 51.436000000001265, 51.47199999999817, 51.46600000000033, 51.8505059400016, 51.474000000001205, 51.35599999999979, 51.472510399996764, 51.82200000000011, 51.4060000000018, 51.44999999999808
$ lon       <f64> -2.742000000000117, -2.573999999999921, -2.587999999999987, -2.2450679499998865, -2.5760000000000205, -2.905999999999955, -2.5991219799998637, -2.26800000000002, -2.402000000000038, -2.51599999999994



In [6]:
con = duckdb.connect('data/new.db')

In [14]:
con.execute("""
DROP TABLE IF EXISTS raw_tbl;

""")

<duckdb.duckdb.DuckDBPyConnection at 0x7fce7c50ae30>

In [15]:
con.sql("""
COPY raw_tbl FROM 'data/2023-12_sds011.csv'

""")

CatalogException: Catalog Error: Table with name raw_tbl does not exist!
Did you mean "temp.information_schema.tables"?

In [3]:
t = duckdb.read_csv('data/2023-12_sds011.csv', 
                header=True,
                sep=';',
                dtype={
                      'sensor_id': int,
                      'sensor_type': str,
                      'location': str,
                      'lat': float,
                      'lon': float,
                      'timestamp': str,
                      'P1': str,
                      'durP1': str,
                      'ratioP1': str,
                      'P2': str,
                      'durP2': str,
                      'ratioP2': str,
                  }, parallel=True).pl()

: 

In [21]:
type(t)

duckdb.duckdb.DuckDBPyRelation

In [22]:
tp = pl.DataFrame(t)

TypeError: DataFrame constructor called with unsupported type 'DuckDBPyRelation' for the `data` parameter

In [10]:
con.execute(
"""
CREATE TABLE raw_tbl(sensor_id INTEGER,
location INTEGER,
timestamp VARCHAR,
lat FLOAT,
lon FLOAT,
P1 FLOAT,
durP1 FLOAT,
ratioP1 FLOAT,
P2 FLOAT,
durP2 FLOAT,
ratioP2 FLOAT
);
COPY raw_tbl FROM 'data/2023-12_sds011.csv' (AUTO_DETECT false);

"""

)

ConversionException: Conversion Error: CSV Error on Line: 1
Error when converting column "sensor_id".
Could not convert string "sensor_id;sensor_type;location;lat;lon;timestamp;P1;durP1;ratioP1;P2;durP2;ratioP2" to 'INTEGER'

  file=data/2023-12_sds011.csv
  delimiter = , (Auto-Detected)
  quote = " (Auto-Detected)
  escape = " (Auto-Detected)
  new_line = \n (Auto-Detected)
  header = false (Auto-Detected)
  skip_rows = 0 (Auto-Detected)
  date_format =  (Auto-Detected)
  timestamp_format =  (Auto-Detected)
  null_padding=0
  sample_size=20480
  ignore_errors=0
  all_varchar=0


In [5]:
con.close()

In [10]:
con.execute(
"""
CREATE TABLE all_tbl(sensor_id INTEGER, location INTEGER, timestamp VARCHAR, lat FLOAT, lon FLOAT, P1 FLOAT, P2 FLOAT);
COPY raw_tbl FROM 'data/sds011_pldf.csv' (AUTO_DETECT true);

"""

)

)

: 

Using the read_csv constructor within the sql function is very slow, possibly due to type conversion during read

In [4]:
df = con.sql(
"""

SELECT sensor_id, location, timestamp, lat, lon, P1, P2 
FROM 'data/sds011_pldf.csv'
WHERE (lat > 51 AND lat < 52) AND (lon < -2 AND lon > -3)


""")

Convert to polars DF

In [5]:
pldf = df.pl()

In [6]:
pldf.glimpse()

Rows: 574057
Columns: 7
$ sensor_id          <i64> 24147, 10491, 48881, 7685, 78664, 77022, 66667, 78947, 56255, 34457
$ location           <i64> 12309, 5293, 57063, 3885, 67981, 65952, 57205, 72966, 57229, 20702
$ timestamp <datetime[μs]> 2023-12-01 00:00:05, 2023-12-01 00:00:07, 2023-12-01 00:00:10, 2023-12-01 00:00:14, 2023-12-01 00:00:14, 2023-12-01 00:00:17, 2023-12-01 00:00:27, 2023-12-01 00:00:33, 2023-12-01 00:00:41, 2023-12-01 00:00:56
$ lat                <f64> 51.45, 51.464, 51.406, 51.474, 51.022, 51.44, 51.438, 51.4725104, 51.432, 51.85050594
$ lon                <f64> -2.624, -2.566, -2.402, -2.576, -2.87, -2.054, -2.006, -2.59912198, -2.598, -2.24506795
$ P1                 <f64> 25.03, 18.03, 14.25, 0.8, 3.78, 24.17, 17.5, 2.4, 35.58, 12.95
$ P2                 <f64> 9.87, 11.57, 7.82, 0.8, 3.03, 9.63, 10.73, 1.6, 20.0, 10.57



Measurands are imported as strings, probably due to anomalous data in these fields. Convert to hourly mean data using group by dynamic after sorting first.

In [14]:
(
    pldf
    .with_columns(pl.col(pl.Utf8).cast(pl.Float64))
    .sort('sensor_id', 'timestamp')
    .group_by_dynamic(pl.col('timestamp').alias('date'), every= '1h', by= ['sensor_id', 'lat', 'lon'])
    .agg(pl.col('P1').mean().alias('PM10_mean'),
         pl.col('P2').mean().alias('PM2.5_mean'))
)

sensor_id,lat,lon,date,PM10_mean,PM2.5_mean
i64,f64,f64,datetime[μs],f64,f64
7685,51.474,-2.576,2023-12-01 00:00:00,3.8996,1.6172
7685,51.474,-2.576,2023-12-01 01:00:00,0.605833,0.554583
7685,51.474,-2.576,2023-12-01 02:00:00,0.53,0.488333
7685,51.474,-2.576,2023-12-01 03:00:00,0.3364,0.3364
7685,51.474,-2.576,2023-12-01 04:00:00,0.348333,0.348333
7685,51.474,-2.576,2023-12-01 05:00:00,0.516667,0.359583
7685,51.474,-2.576,2023-12-01 06:00:00,0.404,0.396
7685,51.474,-2.576,2023-12-01 07:00:00,1.856667,0.9075
7685,51.474,-2.576,2023-12-01 08:00:00,1.062083,0.846667
7685,51.474,-2.576,2023-12-01 09:00:00,1.4428,0.728


In [11]:
pldf.write_csv('data/sds011_pldf.csv')
type(pldf)

polars.dataframe.frame.DataFrame

In [13]:
sds_bristol = pl.read_csv('data/sds011_pldf.csv')

In [19]:
type(sds_bristol)

polars.dataframe.frame.DataFrame

In [21]:
(sds_bristol
 .with_columns(pl.col(pl.Utf8).cast(pl.Datetime))
 
 )

sensor_id,location,timestamp,lat,lon,P1,P2
i64,i64,datetime[μs],f64,f64,f64,f64
24147,12309,2023-12-01 00:00:05,51.45,-2.624,25.03,9.87
10491,5293,2023-12-01 00:00:07,51.464,-2.566,18.03,11.57
48881,57063,2023-12-01 00:00:10,51.406,-2.402,14.25,7.82
7685,3885,2023-12-01 00:00:14,51.474,-2.576,0.8,0.8
78664,67981,2023-12-01 00:00:14,51.022,-2.87,3.78,3.03
77022,65952,2023-12-01 00:00:17,51.44,-2.054,24.17,9.63
66667,57205,2023-12-01 00:00:27,51.438,-2.006,17.5,10.73
78947,72966,2023-12-01 00:00:33,51.47251,-2.599122,2.4,1.6
56255,57229,2023-12-01 00:00:41,51.432,-2.598,35.58,20.0
34457,20702,2023-12-01 00:00:56,51.850506,-2.245068,12.95,10.57


In [None]:
final_df = (
    pldf
    .with_columns
)

In [None]:
    # columns = {
    # 'sensor_id': 'BIGINT',
    # 'location': 'BIGINT',
    # 'timestamp': 'TIMESTAMP',
    # 'lat': 'REAL',
    # 'lon': 'REAL',
    # 'P1':'REAL',
    # 'P2':'REAL'
    # }

In [5]:
pldf.glimpse()

Rows: 574057
Columns: 7
$ sensor_id          <i64> 24147, 10491, 48881, 7685, 78664, 77022, 66667, 78947, 56255, 34457
$ location           <i64> 12309, 5293, 57063, 3885, 67981, 65952, 57205, 72966, 57229, 20702
$ timestamp <datetime[μs]> 2023-12-01 00:00:05, 2023-12-01 00:00:07, 2023-12-01 00:00:10, 2023-12-01 00:00:14, 2023-12-01 00:00:14, 2023-12-01 00:00:17, 2023-12-01 00:00:27, 2023-12-01 00:00:33, 2023-12-01 00:00:41, 2023-12-01 00:00:56
$ lat                <f64> 51.45, 51.464, 51.406, 51.474, 51.022, 51.44, 51.438, 51.4725104, 51.432, 51.85050594
$ lon                <f64> -2.624, -2.566, -2.402, -2.576, -2.87, -2.054, -2.006, -2.59912198, -2.598, -2.24506795
$ P1                 <str> '25.03', '18.03', '14.25', '0.80', '3.78', '24.17', '17.50', '2.40', '35.58', '12.95'
$ P2                 <str> '9.87', '11.57', '7.82', '0.80', '3.03', '9.63', '10.73', '1.60', '20.00', '10.57'



In [3]:
lazy_qry = (pl.scan_csv(source='data/2023-12_sds011.csv',
                  n_rows= 100000000,
                  null_values = [' ', 'unavailable', 'unknown', 'b'],
                  separator=';')
                  .select(pl.col(['sensor_id', 'location', 'lat', 'lon', 'timestamp', 'P1', 'P2']))
                #   .with_columns(pl.col(['lat', 'lon']).cast(pl.Float64),
                #                 pl.col('timestamp').str.to_datetime(format = '%Y-%m-%dT%H:%M:%S'))
                #    .filter([pl.col('lat').is_between(51, 52), pl.col('lon').is_between(-3, -1.9)])
                #    .rename({
                #        'P1':'PM10',
                #        'P2':'PM2.5',
                #        'timestamp':'datetime'
                #    })
                
                  )

In [2]:
reader = (pl.read_csv_batched(
    'data/2023-12_sds011.csv',
    null_values = [' ', 'unavailable', 'unknown', 'b'],
    columns = ['sensor_id', 'location', 'lat', 'lon', 'timestamp', 'P1', 'P2'],
    separator=';',
    ignore_errors = True,
    # dtypes = {
    #                   'sensor_id': pl.Int64,
    #                   'location': pl.Utf8,
    #                   'lat':pl.Float64,
    #                   'lon':pl.Float64,
    #                   'timestamp':pl.Datetime,
    #                   'P1':pl.Float64,
    #                   'P2':pl.Float64
    #               }
    # try_parse_dates = True
    )
 
)

In [3]:
batches = reader.next_batches(100)

In [5]:
len(batches)

100

In [13]:
df_list = []
while batches:
    df_current_batches = pl.concat(batches)
    df_list.append(df_current_batches)
    batches = reader.next_batches(100)
    
    

In [14]:
df_list

[]

In [10]:
df.glimpse()

Rows: 50327703
Columns: 7
$ sensor_id <i64> 33756, 35558, 53080, 64777, 75993, 39874, 8887, 30876, 23578, 20020
$ location  <i64> 62507, 21661, 51117, 51212, 64680, 73346, 4480, 17582, 11967, 10171
$ lat       <f64> 51.7285344, 50.794, 52.710001, 42.4855289, 53.8, 52.53073404, 52.37, 48.15869432, 50.852, 42.694
$ lon       <f64> 9.01158874, -1.066, 6.851716, 23.40718567, 9.988, 13.17634106, 9.748, 11.55006838, 4.4, 23.318
$ timestamp <str> '2023-12-04T19:05:16', '2023-12-04T19:05:16', '2023-12-04T19:05:16', '2023-12-04T19:05:16', '2023-12-04T19:05:16', '2023-12-04T19:05:16', '2023-12-04T19:05:16', '2023-12-04T19:05:16', '2023-12-04T19:05:16', '2023-12-04T19:05:16'
$ P1        <f64> 0.55, 0.6, 10.15, 10.5, 10.63, 10.73, 10.8, 11.3, 11.88, 1195.72
$ P2        <f64> 0.32, 0.6, 4.25, 4.82, 4.43, 4.63, 7.68, 5.47, 6.45, 999.9



In [4]:
df = lazy_qry.collect() #crashes kernel

: 

In [21]:
df.glimpse()

Rows: 27954
Columns: 7
$ sensor_id          <i64> 24147, 10491, 48881, 7685, 78664, 77022, 66667, 78947, 56255, 34457
$ location           <i64> 12309, 5293, 57063, 3885, 67981, 65952, 57205, 72966, 57229, 20702
$ lat                <f64> 51.45, 51.464, 51.406, 51.474, 51.022, 51.44, 51.438, 51.4725104, 51.432, 51.85050594
$ lon                <f64> -2.624, -2.566, -2.402, -2.576, -2.87, -2.054, -2.006, -2.59912198, -2.598, -2.24506795
$ datetime  <datetime[μs]> 2023-12-01 00:00:05, 2023-12-01 00:00:07, 2023-12-01 00:00:10, 2023-12-01 00:00:14, 2023-12-01 00:00:14, 2023-12-01 00:00:17, 2023-12-01 00:00:27, 2023-12-01 00:00:33, 2023-12-01 00:00:41, 2023-12-01 00:00:56
$ PM10               <f64> 25.03, 18.03, 14.25, 0.8, 3.78, 24.17, 17.5, 2.4, 35.58, 12.95
$ PM2.5              <f64> 9.87, 11.57, 7.82, 0.8, 3.03, 9.63, 10.73, 1.6, 20.0, 10.57



In [None]:
 schema = {
                      'sensor_id': pl.Int64,
                      'location': pl.Utf8,
                      'lat':pl.Float64,
                      'lon':pl.Float64,
                      'timestamp':pl.Datetime,
                      'P1':pl.Float64,
                      'P2':pl.Float64
                  }
