Dataset was pulled from https://www.kaggle.com/datasets/hmavrodiev/sofia-air-quality-dataset/code

In [27]:
import os
import time
import pandas as pd
import glob

In [28]:
test_path = 'data/2017-07_bme280sof.csv'

### Read in the data with Dask

In [29]:
from dask import dataframe as dd
start = time.time()
dask_df = dd.read_csv(test_path)
end = time.time()
print("Read JSON with dask: ",(end-start),"sec")

Read JSON with dask:  0.005956888198852539 sec


### Read in the data with Pandas

In [30]:
import pandas as pd
start = time.time()
df = pd.read_csv(test_path)
end = time.time()
print("Read CSV with pandas: ",(end-start),"sec")

Read CSV with pandas:  0.4173860549926758 sec


### Here Dask is better than Pandas, Modin and Ray, with the least reading time of 0.012 sec

In [31]:
from dask import dataframe as dd
df = dd.read_csv(test_path,delimiter=',')

In [33]:
df.columns

Index(['Unnamed: 0', 'sensor_id', 'location', 'lat', 'lon', 'timestamp',
       'pressure', 'temperature', 'humidity'],
      dtype='object')

In [34]:
df = df.drop(df.columns[[0]], axis=1)  # df.columns is zero-based pd.Index

***DROP THE FIRST COLUMN***

In [35]:
#No. of Rows
len(df.index)

701548

In [36]:
#No, of Columns
len(df.columns)

8

In [37]:
# remove special character
df.columns=df.columns.str.replace('[#,@,&]','')

  df.columns=df.columns.str.replace('[#,@,&]','')


In [38]:
#To remove white space from columns
df.columns = df.columns.str.replace(' ', '')

In [39]:
data=df.columns
data

Index(['sensor_id', 'location', 'lat', 'lon', 'timestamp', 'pressure',
       'temperature', 'humidity'],
      dtype='object')

### Validation

In [40]:
import logging
import os
import subprocess
import yaml
import pandas as pd
import datetime 
import gc
import re

In [5]:
%%writefile utility.py
import logging
import os
import subprocess
import yaml
import pandas as pd
import datetime 
import gc
import re

def read_config_file(filepath):
    with open(filepath, 'r') as stream:
        try:
            return yaml.load(stream, Loader=yaml.Loader)
        except yaml.YAMLError as exc:
            logging.error(exc)

def col_header_val(df,table_config):
    df = df.drop(df.columns[[0]], axis=1)  # df.columns is zero-based pd.Index
    df.columns = df.columns.str.lower()
    df.columns = df.columns.str.replace('[^\w]','_',regex=True)
    df.columns = list(map(lambda x: x.strip('_'), list(df.columns)))
    expected_col = list(map(lambda x: x.lower(),  table_config['columns']))
    received_col = list(map(lambda x: x.lower(), list(df.columns)))
    
    if len(received_col) == len(expected_col) and set(expected_col)  == set(received_col):
        logging.info("column name and column length validation passed")
        return 1
    else:
        logging.info("column name and column length validation failed")
        mismatched_columns_file = list(set(df.columns).difference(expected_col))
        logging.info("Following File columns are not in the YAML file",mismatched_columns_file)
        missing_YAML_file = list(set(expected_col).difference(df.columns))
        logging.info("Following YAML columns are not in the file uploaded",missing_YAML_file)
        logging.info(f'df columns: {df.columns}')
        logging.info(f'expected columns: {expected_col}')
        return 0

Overwriting utility.py


In [7]:
%%writefile schema.yaml
file_type: csv
dataset_name: file
file_name: 2017-07_bme280sof
table_name: sensorData
inbound_delimiter: ","
outbound_delimiter: "|"
skip_leading_rows: 1
columns: 
    - unnamed__0
    - sensor_id
    - location
    - lat
    - lon
    - timestamp
    - pressure
    - temperature
    - humidity

Writing schema.yaml


In [6]:
#%%writefile process.py

import datetime
import csv
import gzip
from dask import dataframe as dd
import utility as util
import glob
import pandas as pd

logging = False

if logging:
    import sys
    old_stdout = sys.stdout

    log_file = open("message.log","w")
    sys.stdout = log_file


def append_df(df, destination):
    # Write csv in gz format in pipe separated text file (|)
    df.to_csv(destination,
            sep='|',
            header=True,
            index=False,
            quoting=csv.QUOTE_ALL,
            compression='gzip',
            quotechar='"',
            doublequote=True,
            lineterminator='\n')

def get_files_from_warehouse(path):
    return glob.glob(os.path.join(path, "*.csv"))

def process_data(csv_files, config_data, dest):
    
    for filename in csv_files:
        
        df = dd.read_csv(filename,delimiter=config_data['inbound_delimiter'])

        #validating the header of the file
        if util.col_header_val(df,config_data):

            # append data frame to CSV file
            append_df(df, dest)

            print(f'SUCCESS: {filename}')

        else:
            print(f'FAILED: {filename}')   

config_data = util.read_config_file("schema.yaml")
csv_files = get_files_from_warehouse('data/')
dest = 'process.csv.gz'

process_data(csv_files, config_data, dest)


if logging:
    sys.stdout = old_stdout
    log_file.close()



FAILED: data/2017-12_sds011sof.csv
SUCCESS: data/2018-09_bme280sof.csv
FAILED: data/2018-02_sds011sof.csv
FAILED: data/2018-06_sds011sof.csv
FAILED: data/2018-11_sds011sof.csv
SUCCESS: data/2019-04_bme280sof.csv
FAILED: data/2018-01_sds011sof.csv
SUCCESS: data/2019-03_bme280sof.csv
FAILED: data/2018-12_sds011sof.csv
FAILED: data/2018-05_sds011sof.csv
SUCCESS: data/2019-07_bme280sof.csv
FAILED: data/2017-11_sds011sof.csv
SUCCESS: data/2017-09_bme280sof.csv
FAILED: data/2017-08_sds011sof.csv
SUCCESS: data/2017-07_bme280sof.csv
SUCCESS: data/2017-10_bme280sof.csv
SUCCESS: data/2018-04_bme280sof.csv
FAILED: data/2019-06_sds011sof.csv
FAILED: data/2019-02_sds011sof.csv
SUCCESS: data/2018-10_bme280sof.csv
SUCCESS: data/2018-07_bme280sof.csv
FAILED: data/2019-05_sds011sof.csv
SUCCESS: data/2018-03_bme280sof.csv
FAILED: data/2018-08_sds011sof.csv
FAILED: data/2019-01_sds011sof.csv
SUCCESS: data/2018-02_bme280sof.csv
FAILED: data/2018-09_sds011sof.csv
FAILED: data/2019-04_sds011sof.csv
SUCCESS:

All of the *sds* files have additional columns, `p1` and `p2` which invalidates them

In [15]:
import pandas as pd
import dask.dataframe as dd
from dask.delayed import delayed
import os

target = 'process.csv.gz/'

filenames = os.listdir(target)

for filename in filenames:

    dfs = delayed(pd.read_csv(target+filename, compression = 'gzip', sep='|'))

df = dd.from_delayed(dfs) # df is a dask dataframe

df.columns

Index(['unnamed__0', 'sensor_id', 'location', 'lat', 'lon', 'timestamp',
       'pressure', 'temperature', 'humidity'],
      dtype='object')

Columns check out

In [17]:
print(f' Total rows = {len(df)}')

 Total rows = 905440


In [13]:
#compare file sizes

total = 0 
for file in csv_files:
    if 'bme' in file:
        total += os.path.getsize(file)

print(f'Original size: {total}')


compressed = os.path.getsize('process.csv.gz')
print(f'Original size: {compressed}')

Original size: 7811040806
Original size: 256
