# Clean Joined Data As Parquet

## Load Libraries

In [33]:
import os
import glob
import pandas as pd
import dask.dataframe as dd
from dask.diagnostics import ProgressBar
import numpy as np
import seaborn as sns
import sidetable as stb
import pprint
import yaml

## Read config

In [25]:
with open('../params.yaml') as conf_file:
    config = yaml.safe_load(conf_file)

In [26]:
pprint.pprint(config["data_load"]["evi_dataset_csv"])

'data/raw/EVI_DATA_15TH_DEC_2022.csv'


## Convert from csv to parquet

In [27]:
# Get a list of CSV files to import
csv_files = glob.glob('C:/Users/User/Downloads/EVI/*.csv')

In [28]:
# define the column data types
dtype = {'ID': 'int64', 'REG_NO':'object', 'VEHICLE_CLASS':'int64', 'VEHICLE_COLOR': 'object', \
         'VEHICLE_MODEL': 'object', 'IP_ADDRESS':'object', 'ANTENNA':'int64', 'DISCOVER_TIME':'object'}

In [29]:
# read the CSV file with specified column data types
df = dd.read_csv(csv_files, dtype=dtype)

In [30]:
# Get the number of rows in the Dask DataFrame
with ProgressBar():
    num_rows = len(df)

[########################################] | 100% Completed | 28.30 s


In [31]:
# Print the number of rows
print("Number of rows:", num_rows)

Number of rows: 14771592


In [32]:
# Write the result to a parquet file
with ProgressBar():
    df.to_parquet('C:/Users/User/Downloads/EVI/output.parquet', compression='gzip', engine='pyarrow')

[########################################] | 100% Completed | 41.58 s


## Load parquet in DuckDB

In [4]:
import duckdb  
conn = duckdb.connect(':memory:')

In [5]:
#read all files with a name ending in ".csv" in the folder "dir"
evi_dataset = "C:/Users/User/Downloads/EVI/output.parquet/*.parquet"
conn.sql('CREATE TABLE evi_data AS SELECT * FROM read_parquet(\'' + evi_dataset + '\')')
#SELECT * FROM 'dir/*.csv';

FloatProgress(value=0.0, layout=Layout(width='100%'), style=ProgressStyle(bar_color='black'))

In [6]:
conn.execute('PRAGMA table_info(evi_data)').df()

Unnamed: 0,cid,name,type,notnull,dflt_value,pk
0,0,ID,BIGINT,False,,False
1,1,REG_NO,VARCHAR,False,,False
2,2,VEHICLE_CLASS,BIGINT,False,,False
3,3,VEHICLE_COLOR,VARCHAR,False,,False
4,4,VEHICLE_MODEL,VARCHAR,False,,False
5,5,IP_ADDRESS,VARCHAR,False,,False
6,6,ANTENNA,BIGINT,False,,False
7,7,DISCOVER_TIME,VARCHAR,False,,False
8,8,__null_dask_index__,BIGINT,False,,False


In [7]:
conn.execute('''  
SELECT 
    COUNT(*)
FROM
   evi_data
    
''').df()

Unnamed: 0,count_star()
0,14771592


In [8]:
evi_reader = '../' + config["data_load"]["reader_dataset_csv"]
conn.sql('CREATE TABLE reader AS SELECT * FROM read_csv_auto(\'' + evi_reader + '\')')

In [9]:
conn.execute('PRAGMA table_info(reader)').df()

Unnamed: 0,cid,name,type,notnull,dflt_value,pk
0,0,NAME,VARCHAR,False,,False
1,1,IP,VARCHAR,False,,False
2,2,LONGITUDE,DOUBLE,False,,False
3,3,LATITUDE,DOUBLE,False,,False
4,4,ID,BIGINT,False,,False


In [10]:
conn.execute('''  
SELECT 
    COUNT(*)
FROM
   reader
''').df()

Unnamed: 0,count_star()
0,40


In [11]:
evi_reader_pair = '../' + config["data_load"]["reader_pair_dataset_csv"]
conn.sql('CREATE VIEW reader_pair AS SELECT * FROM read_csv_auto(\'' + evi_reader_pair + '\')')

In [12]:
conn.execute('PRAGMA table_info(reader_pair)').df()

Unnamed: 0,cid,name,type,notnull,dflt_value,pk
0,0,ID,BIGINT,False,,False
1,1,READER1,BIGINT,False,,False
2,2,READER2,BIGINT,False,,False
3,3,PAIR_NAME,VARCHAR,False,,False


In [13]:
conn.execute('''  
SELECT 
    COUNT(*)
FROM
   reader_pair
''').df()

Unnamed: 0,count_star()
0,12


In [14]:
display(conn.execute('SHOW TABLES').df())

Unnamed: 0,name
0,evi_data
1,reader
2,reader_pair


## IP Address Checking

In [15]:
conn.sql('''  
CREATE TABLE evi_ip AS
SELECT DISTINCT
    evi_data.IP_ADDRESS
FROM
   evi_data

''')

In [None]:
conn.execute('''  
SELECT 
    IP_ADDRESS
FROM
   evi_ip
WHERE IP_ADDRESS NOT IN (SELECT IP from reader)

''').df() 

## Reader ID Check

In [None]:
conn.execute('''  
SELECT 
    *
FROM
   reader
ORDER BY ID

''').df() 

In [None]:
conn.execute('''  
SELECT 
    *
FROM
   reader_pair
ORDER BY ID

''').df() 

In [None]:
conn.execute('''  
SELECT 
    READER1
FROM
   reader_pair
UNION 
SELECT READER2 from reader_pair

''').df() 

In [None]:
conn.execute('''  
SELECT 
    ID
FROM
   reader
WHERE ID NOT IN (SELECT READER1 from reader_pair UNION SELECT READER2 from reader_pair)

''').df() 

## String to Timestamp conversion

In [21]:
conn.execute('''  
SELECT 
    SUBSTR(DISCOVER_TIME, 1, 9),
    SUBSTR(DISCOVER_TIME, 10, 16),
    SUBSTR(DISCOVER_TIME, 10, 6),
    SUBSTR(DISCOVER_TIME, 29, 3),
    
    STRPTIME(SUBSTR(DISCOVER_TIME, 1, 9) || SUBSTR(DISCOVER_TIME, 10, 16) || 
    SUBSTR(DISCOVER_TIME, 29, 3), '%d-%b-%y %H.%M.%S.%f %p') as DISCOVER_TIME,
    
    STRPTIME(SUBSTR(DISCOVER_TIME, 1, 9) || SUBSTR(DISCOVER_TIME, 10, 6) || 
    SUBSTR(DISCOVER_TIME, 29, 3), '%d-%b-%y %H.%M %p') as DISCOVER_TIMESTAMP 
FROM
    evi_data
ORDER BY discover_time DESC
LIMIT 5
''').df()

FloatProgress(value=0.0, layout=Layout(width='100%'), style=ProgressStyle(bar_color='black'))

Unnamed: 0,"substr(""DISCOVER_TIME"", 1, 9)","substr(""DISCOVER_TIME"", 10, 16)","substr(""DISCOVER_TIME"", 10, 6)","substr(""DISCOVER_TIME"", 29, 3)",DISCOVER_TIME,DISCOVER_TIMESTAMP
0,31-MAR-22,11.59.59.962000,11.59,PM,2022-03-31 23:59:59.962,2022-03-31 23:59:00
1,31-MAR-22,11.59.59.911000,11.59,PM,2022-03-31 23:59:59.911,2022-03-31 23:59:00
2,31-MAR-22,11.59.58.538000,11.59,PM,2022-03-31 23:59:58.538,2022-03-31 23:59:00
3,31-MAR-22,11.59.58.210000,11.59,PM,2022-03-31 23:59:58.210,2022-03-31 23:59:00
4,31-MAR-22,11.59.57.621000,11.59,PM,2022-03-31 23:59:57.621,2022-03-31 23:59:00


## Inner join among evi_data, reader, and reader_pair table

In [None]:
conn.execute('''  
SELECT 
    evi_data.reg_no,
    evi_data.antenna,
    STRPTIME(SUBSTR(DISCOVER_TIME, 1, 9) || SUBSTR(DISCOVER_TIME, 10, 16) || 
    SUBSTR(DISCOVER_TIME, 29, 3), '%d-%b-%y %H.%M.%S.%f %p') as DISCOVER_TIME,  
    STRPTIME(SUBSTR(DISCOVER_TIME, 1, 9) || SUBSTR(DISCOVER_TIME, 10, 6) || 
    SUBSTR(DISCOVER_TIME, 29, 3), '%d-%b-%y %H.%M %p') as DISCOVER_TIMESTAMP,
    evi_data.IP_ADDRESS,
    reader.id,
    reader.longitude,
    reader.latitude,
    reader.name,
FROM
   evi_data, reader,
   (SELECT
    reader_pair.reader1 reader1,
    reader_pair.reader2 reader2,
    reader_pair.pair_name
    FROM
    reader_pair) pair
WHERE 
    TRIM(evi_data.ip_address) = TRIM(reader.ip)
AND
    (pair.reader1 = reader.id 
    OR pair.reader2 = reader.id)
ORDER BY discover_time DESC
LIMIT 5
    
''').df()

In [None]:
conn.execute('''  
SELECT 
    COUNT(REG_NO)
FROM
   evi_data
    
''').df()

In [None]:
conn.sql('''  
CREATE TABLE reader_table AS
SELECT 
    reader.id,
    reader.ip,
    reader.longitude,
    reader.latitude,
    pair.reader1,
    pair.reader2,
    pair.pair_name,
    reader.name,
FROM
   reader,
   (SELECT
    reader_pair.reader1 reader1,
    reader_pair.reader2 reader2,
    reader_pair.pair_name
    FROM
    reader_pair) pair
WHERE 
    (pair.reader1 = reader.id 
    OR pair.reader2 = reader.id)    
''')

In [None]:
conn.execute('''  
SELECT 
    count(*)
FROM
   reader_table
    
''').df()

In [None]:
conn.execute('PRAGMA table_info(reader_table)').df()

In [None]:
conn.sql('''  
CREATE TABLE data_table AS
SELECT 
    evi_data.reg_no,
    evi_data.antenna,
    STRPTIME(SUBSTR(DISCOVER_TIME, 1, 9) || SUBSTR(DISCOVER_TIME, 10, 16) || 
    SUBSTR(DISCOVER_TIME, 29, 3), '%d-%b-%y %H.%M.%S.%f %p') as DISCOVER_TIME,  
    STRPTIME(SUBSTR(DISCOVER_TIME, 1, 9) || SUBSTR(DISCOVER_TIME, 10, 9) || 
    SUBSTR(DISCOVER_TIME, 29, 3), '%d-%b-%y %H.%M.%S %p') as DISCOVER_TIMESTAMP,
    evi_data.IP_ADDRESS,
    reader_table.id,
    reader_table.longitude,
    reader_table.latitude,
    reader_table.pair_name,
    reader_table.name,
    reader_table.reader1,
    reader_table.reader2
FROM evi_data
INNER JOIN reader_table
ON evi_data.IP_ADDRESS = reader_table.IP
''')

In [None]:
conn.execute('''  
SELECT 
    COUNT(*)
FROM
   data_table
    
''').df()

In [None]:
conn.execute('PRAGMA table_info(data_table)').df()

## Categorical Value Check

In [None]:
conn.execute('''  
SELECT 
    NAME,
    COUNT(NAME) As Values
FROM
   data_table
GROUP BY NAME
ORDER BY NAME
''').df()

In [None]:
conn.sql('UPDATE data_table SET NAME = \'To Notun Baazar New\' WHERE NAME=\'To Natun Bazar New\'')

In [None]:
conn.execute('''  
SELECT 
    NAME,
    COUNT(NAME) As Values
FROM
   data_table
GROUP BY NAME
    
''').df()

In [None]:
conn.execute('''  
SELECT 
    PAIR_NAME,
    COUNT(PAIR_NAME) As Values
FROM
   data_table
GROUP BY PAIR_NAME
ORDER BY PAIR_NAME
''').df()

## New Features (Year, Month, Day, etc.)

In [None]:
conn.sql('''
CREATE TABLE clean_join_data AS
SELECT 
    ID,
    REG_NO,
    ANTENNA,
    DISCOVER_TIME,
    DISCOVER_TIMESTAMP,
    YEAR(DISCOVER_TIMESTAMP) as DISCOVER_YEAR, 
    MONTH(DISCOVER_TIMESTAMP) as DISCOVER_MONTH,
    DAY(DISCOVER_TIMESTAMP) as DISCOVER_DAY, 
    HOUR(DISCOVER_TIMESTAMP) as DISCOVER_HOUR,
    MINUTE(DISCOVER_TIMESTAMP) as DISCOVER_MINUTE,
    IP_ADDRESS,
    LATITUDE,
    LONGITUDE,
    PAIR_NAME,
    NAME
FROM
   data_table
''')

In [None]:
conn.execute('''  
SELECT 
    COUNT(*)
FROM
   clean_join_data
    
''').df()

## Save Clean Joined Data as parquet format

In [23]:
parquet_file = '../' + config["featurize"]["clean_joined_data"]
print(parquet_file)

../data/processed/clean_join_data_parquet


In [None]:
conn.sql('COPY clean_join_data TO \'' + parquet_file + '\'(FORMAT PARQUET)')