# ETL Process Tomography Data

Tomography data are available in two folders:
- data/L2 -> STD, SNR, sincos
- data/L3 -> temperature and current

In [1]:
import getpass
import glob
import os
import re
from datetime import datetime, timedelta

import numpy as np
import pandas as pd
import pymysql.cursors

from sql_queries import *

In [2]:
# Read connection settings from the environment so credentials are not stored in the
# notebook; the password is prompted for interactively when TOMO_DB_PASSWORD is unset.
conn = pymysql.connect(
    host=os.environ.get('TOMO_DB_HOST', 'localhost'),
    user=os.environ.get('TOMO_DB_USER', 'root'),
    password=os.environ.get('TOMO_DB_PASSWORD') or getpass.getpass('MySQL password: '),
    database=os.environ.get('TOMO_DB_NAME', 'tomo'),
)
cur = conn.cursor()

To display more rows of a DataFrame, adjust pandas' `display.max_rows` option as shown below.

In [3]:
# Uncomment to raise pandas' display row limit to 100; the second line restores the default.
# pd.set_option('display.max_rows', 100)
# pd.reset_option('display.max_rows')

# Stations

Station information is not included in the data folders, so it has to be assigned manually.

Stations are grouped by region, which is identified by the ID prefix; for example, GI-01 and GI-02 are both Gili Iyang stations. New stations must be assigned an ID of this form that has been agreed on by the team.

As mentioned above, the data is split into two folders, L2 and L3, so each station is also mapped to its named subfolder in each of them (`l2_folder` and `l3_folder`).

In [4]:
# Station metadata is maintained by hand in stations.csv (not derivable from the data folders).
stations_df = pd.read_csv('stations.csv', sep=',')
stations_df

Unnamed: 0,station_id,name,lat,lon,l2_folder,l3_folder
0,GI-01,Gili Iyang 1,,,GI_01,GI01
1,GI-02,Gili Iyang 2,,,GI_02,GI02
2,GI-03,Gili Iyang 3,,,GI_03,GI03
3,GI-04,Gili Iyang 4,,,GI_04,GI04


Insert into Stations table

In [5]:
# Replace np.nan with None for MySQL insert to work
# MySQL cannot store NaN; replace it with None so missing values (e.g. lat/lon) are sent as NULL.
stations_df_2 = stations_df.replace({np.nan: None})

# Insert every station in one batch and commit once, instead of committing after each row.
# itertuples yields plain Python scalars, which pymysql can escape.
station_rows = list(stations_df_2.itertuples(index=False, name=None))
cur.executemany(stations_table_insert, station_rows)
conn.commit()

Run `test.ipynb` to see the inserted stations data

# Process L2 Files

First, create regexes that match each of the CSV file name formats below:

- sincos_file_format = '1210619_04_30_00.csv'
- snr_file_format = '1210618_20_40_00_SNR.csv'
- stack_file_format = '1210619_04_10_00to1210619_00_00_00.csv'
- std_file_format = '1210619_04_20_00to1210619_00_00_00_std.csv'



In [6]:
# Date/time stamp embedded in L2 file names: YYMMDD_HH_MM_SS (e.g. 210619_04_30_00),
# preceded by a single-digit station prefix.
regex_date      = '([0-9]{6}_[0-9]{2}_[0-9]{2}_[0-9]{2})'
# Literal dots are escaped (an unescaped '.' matches any character) and each pattern is
# anchored with '$' so only the exact file-name shape matches.
regex_sincos    = re.compile('(?P<station>[0-9])' + regex_date + r'\.csv$')
regex_snr       = re.compile('(?P<station>[0-9])' + regex_date + r'_SNR\.csv$')
regex_stack     = re.compile('(?P<station>[0-9])' + regex_date + 'to' +
                             '(?P<station2>[0-9])' + regex_date + r'\.csv$')
regex_std       = re.compile('(?P<station>[0-9])' + regex_date + 'to' +
                             '(?P<station2>[0-9])' + regex_date + r'_std\.csv$')

In [7]:
def get_files(filepath, regex):
    """Recursively collect CSV files under ``filepath`` whose base name matches ``regex``.

    Args:
        filepath: Root directory to walk.
        regex: Compiled pattern applied with ``regex.match`` to each CSV file's base name.

    Returns:
        Sorted list of absolute paths of the matching files. A summary line with the
        number of matching vs. total CSV files is printed as a side effect.
    """
    all_files = []
    count_total = 0

    for root, _dirs, _files in os.walk(filepath):
        # Escape the directory part so characters such as '[' or '*' in folder names are
        # taken literally rather than as glob wildcards.
        for f in glob.glob(os.path.join(glob.escape(root), '*.csv')):
            count_total += 1
            if regex.match(os.path.basename(f)):
                all_files.append(os.path.abspath(f))

    print('Found {} matching files from total {} files'.format(len(all_files), count_total))
    # os.walk/glob order is filesystem-dependent; sort so positional lookups are reproducible.
    return sorted(all_files)

### Get daily Max SNR/STD files from all stations

Get all std files from all stations

In [None]:
# Collect all L2 std files from every station folder.
# (Older location, if needed: get_files('data/historical_data/L2', regex_std))
std_files = get_files(filepath='data/historical_all_stations', regex=regex_std)

std_files[0:5]

Check the std files headers

In [None]:
# Inspect the header of the first std file (space-separated).
std_df = pd.read_csv(std_files[0], delimiter=' ')
std_df.columns

In [None]:
# Inspect a later file, presumably one with the other std header layout.
# NOTE(review): index 134 was picked by hand and depends on the order get_files returns.
std_df_2 = pd.read_csv(std_files[134], delimiter=' ')
std_df_2.columns

There are apparently two different types of std CSV file, as checked further below.
The ones we use are those with the `Max_SNR` and `day_in_decimal` headers.

Next, build a DataFrame describing the CSV files so we can easily query which files to ingest.

In [None]:
def get_std_station_folder(filepath):
    """Return the name of the directory that directly contains ``filepath`` (its station folder)."""
    parent_dir = os.path.dirname(filepath)
    return os.path.basename(parent_dir)
def get_std_datetime(filename):
    """Parse the first timestamp encoded in an L2 std file name.

    Characters 1..15 (after the single-digit station prefix) hold ``YYMMDD_HH_MM_SS``.
    """
    stamp = filename[1:16]
    return datetime.strptime(stamp, '%y%m%d_%H_%M_%S')

In [None]:
# Build one metadata record per std file, then create the DataFrame once.
# (Growing a frame with DataFrame.append is quadratic, and append was removed in pandas 2.0.)
header_records = []
for file in std_files:
    std_df = pd.read_csv(file, sep=' ')
    filename = os.path.basename(file)
    file_datetime = get_std_datetime(filename)
    header_records.append({
        'file': filename,
        # Header names joined without a separator (e.g. 'Max_SNRday_in_decimal');
        # used below to tell the std file layouts apart.
        'columns': ''.join(map(str, std_df.columns)),
        'nrows': len(std_df.index),
        'datetime': file_datetime,
        'date': file_datetime.date(),
        'stfolder': get_std_station_folder(file),
        'filepath': file,
    })

# 'station' is not filled here, so it stays empty (NaN).
header_df = pd.DataFrame(
    header_records,
    columns=['station', 'stfolder', 'date', 'datetime', 'file', 'columns', 'nrows', 'filepath'],
)

header_df['columns'].unique()


In [None]:
# Keep only std files with the Max_SNR / day_in_decimal layout (joined header 'Max_SNRday_in_decimal').
is_max_snr_layout = header_df['columns'].eq('Max_SNRday_in_decimal')
std_files_df = header_df[is_max_snr_layout].copy()
std_files_df

For each station folder and day, find the file(s) with the largest number of rows; these hold the main **daily aggregated data**.

In [None]:
# Boolean mask: True for the file(s) with the largest row count per station folder and day.
# Use the 'max' string alias; passing the builtin max triggers a pandas FutureWarning.
idx = std_files_df.groupby(['stfolder', 'date'])['nrows'].transform('max') == std_files_df['nrows']
std_files_df[idx]

In [None]:
# For each station folder and day, keep the complete row of the latest file (by the
# timestamp in its name). A plain groupby().max() takes the max of every column
# independently, so 'file'/'filepath' and 'nrows' could come from different files.
std_files_df = (
    std_files_df
    .sort_values(['stfolder', 'date', 'datetime', 'file'])
    .drop_duplicates(subset=['stfolder', 'date'], keep='last')
    .reset_index(drop=True)
)
std_files_df

In [None]:
# Debug helper: uncomment to show the std_files_df row(s) (including nrows) for one
# station folder and date; edit the 'GI_01' / '2021-06-22' values as needed.
# std_files_df.loc[(std_files_df['stfolder']=='GI_01') & (std_files_df['date']==pd.to_datetime('2021-06-22').date())]

### Process std file

Test with one of the files as an example.

In [None]:
# Pick one file (row 3) as a worked example.
example_file = std_files_df.iloc[3]
std_file = example_file.filepath
std_date = example_file.datetime

# Map the L2 station folder to its station id (query params passed as a one-element tuple).
cur.execute(get_station_id_l2_sql, (example_file.stfolder,))
station_result = cur.fetchone()
if station_result is None:
    raise ValueError(f'Unknown L2 station folder {example_file.stfolder!r}, please check the stations table')
std_station, = station_result

std_data_df = pd.read_csv(std_file, sep=' ')
std_data_df.head()

NOTE: `day_in_decimal` is a UTC time encoded as `x.y`, where `x` is the day of the month (the month comes from the filename) and `.y` is the fraction of the day elapsed.
For example, for file '1210623_23_40_00to1210623_00_00_00_std.csv' and `day_in_decimal` 23.979167,
the data date is 23 June 2021 and the time is $0.979167 \times 24 = 23.500008$ hours, i.e. 23:30:00.0288.

However, in some early files `x` differs from the date in the filename, so the day is always taken from the filename.

In [8]:
def day_decimal_to_timestamp(datetime, day_decimal_ser):
    """Convert fractional-day values to timestamps on the date of ``datetime``.

    Only the fractional part of each value is used (``x.y`` -> ``0.y`` days after midnight).
    The day itself is taken from ``datetime`` because the integer part ``x`` does not match
    the file-name date in some early files.

    Args:
        datetime: Reference date/time (e.g. parsed from the file name); only its date is used.
            NOTE: this parameter name shadows the ``datetime`` class inside the function.
        day_decimal_ser: pandas Series of day-in-decimal values.

    Returns:
        pandas Series of naive timestamps (UTC per the notebook notes), with the input's index.
    """
    day_start = pd.Timestamp(datetime.date())
    # Build the offset as a timedelta instead of adding floats to epoch nanoseconds. This
    # avoids float64 rounding at ~1e18 ns and does not rely on to_datetime's `format`
    # argument, which only applies to string input.
    return day_start + pd.to_timedelta(day_decimal_ser % 1, unit='D')

In [None]:
# Attach the computed timestamp and the station id to every row of the example file.
std_data_df = std_data_df.assign(
    timestamp=day_decimal_to_timestamp(std_date, std_data_df['day_in_decimal']),
    station=std_station,
)
std_data_df

Insert into Max SNR/STD table

In [None]:
# Insert all rows of the example file in one batch and commit once, instead of
# committing after every row.
max_snr_rows = list(std_data_df[['station', 'timestamp', 'Max_SNR']].itertuples(index=False, name=None))
cur.executemany(max_snr_table_insert, max_snr_rows)
conn.commit()

If everything looks correct, repeat for all files.

In [None]:
# Ingest every selected std file: resolve its station id, convert timestamps, then insert
# all rows of the file in one batch with a single commit per file.
for _, file in std_files_df.iterrows():
    std_file = file.filepath
    std_date = file.datetime

    # Same station lookup as the single-file example above.
    cur.execute(get_station_id_l2_sql, (file.stfolder,))
    station_result = cur.fetchone()
    if station_result is None:
        raise ValueError(f'Unknown L2 station folder {file.stfolder!r}, please check the stations table')
    std_station, = station_result

    std_data_df = pd.read_csv(std_file, sep=' ')
    std_data_df['timestamp'] = day_decimal_to_timestamp(std_date, std_data_df['day_in_decimal'])
    std_data_df['station'] = std_station

    max_snr_rows = list(std_data_df[['station', 'timestamp', 'Max_SNR']].itertuples(index=False, name=None))
    cur.executemany(max_snr_table_insert, max_snr_rows)
    conn.commit()

# Process L3 Files

Next, create regexes for the L3 CSV file name formats:

- current_file_format = `Curr_G101-GI02.csv`
- temperature_file_format = `Temp_G101-GI03.csv`

In [9]:
# L3 file names: '<Curr|Temp>_<src>-<dest>.csv' with 4-character station codes (e.g. GI01).
# Dots are escaped and patterns anchored so only exact '.csv' names match.
regex_current = re.compile(r'Curr_(?P<src_station>[0-9A-Za-z]{4})-(?P<dest_station>[0-9A-Za-z]{4})\.csv$')
regex_temp    = re.compile(r'Temp_(?P<src_station>[0-9A-Za-z]{4})-(?P<dest_station>[0-9A-Za-z]{4})\.csv$')
regex_test    = re.compile(r'.*\.csv$')

### Get All Temperature files

In [10]:
# Collect every L3 temperature file (Temp_<src>-<dest>.csv) below the L3 folder.
temp_files = get_files(filepath='data/historical_data/L3', regex=regex_temp)
temp_files

Found 4 matching files from total 39 files


['C:\\workspace\\tomo\\tomo-etl\\data\\historical_data\\L3\\st00\\Temp_GI01-GI02.csv',
 'C:\\workspace\\tomo\\tomo-etl\\data\\historical_data\\L3\\st00\\Temp_GI01-GI03.csv',
 'C:\\workspace\\tomo\\tomo-etl\\data\\historical_data\\L3\\st00\\Temp_GI01-GI04.csv',
 'C:\\workspace\\tomo\\tomo-etl\\data\\historical_data\\L3\\st02\\Temp_GI03-GI04.csv']

Take a look at one of the files.

In [22]:
# Map columns by name: usecols keeps the file's own column order, so renaming positionally
# could mislabel them. The output column order is then fixed explicitly.
temp_df = (
    pd.read_csv(temp_files[0], usecols=['Date_In_Decimal', 'degree_(C)'])
    .rename(columns={'Date_In_Decimal': 'day_in_decimal', 'degree_(C)': 'temperature'})
    .loc[:, ['day_in_decimal', 'temperature']]
)
temp_df

Unnamed: 0,day_in_decimal,temperature
0,24.208333,29.238612
1,24.201389,29.219702
2,24.145833,29.224371
3,24.138889,29.223039


### Process Temperature files

In [12]:
def parse_l3_file(file, regex):
    """Extract the source and destination station codes from an L3 file path.

    Args:
        file: Path to an L3 CSV file; only its base name is matched.
        regex: Compiled pattern with ``src_station`` and ``dest_station`` named groups
            (e.g. ``regex_temp`` or ``regex_current``).

    Returns:
        Tuple ``(src_station, dest_station)`` of strings.

    Raises:
        ValueError: If the file name does not match ``regex``.
    """
    filename = os.path.basename(file)
    m = regex.match(filename)
    if m is None:
        # Fail with a clear message instead of an AttributeError on None.group().
        raise ValueError(f'L3 file name {filename!r} does not match the expected pattern')
    return m.group('src_station'), m.group('dest_station')

In [13]:
# day_decimal_to_timestamp is already defined in the "Process L2 Files" section; it is not
# redefined here, so a second copy cannot silently shadow (and diverge from) the first.

In [58]:
# Reference date that the temperature files' day_in_decimal values are counted from.
# TODO confirm how to get the day in decimal day (currently a fixed base date).
TEMP_BASE_DATE = pd.Timestamp('2021-05-31')

total_count = len(temp_files)
files_count = 0
for files_count, file in enumerate(temp_files, start=1):
    print(f'Processing temperature files: {files_count}/{total_count} files')
    src_station, dest_station = parse_l3_file(file, regex_temp)

    # Query station ids from the parsed file name; this also checks file naming consistency.
    # Query parameters are passed as one-element tuples.
    cur.execute(get_station_id_l3_sql, (src_station,))
    src_station_result = cur.fetchone()
    cur.execute(get_station_id_l3_sql, (dest_station,))
    dest_station_result = cur.fetchone()

    if src_station_result is None or dest_station_result is None:
        raise ValueError('Unknown station, please check your L3 file naming')
    src_station_id, = src_station_result
    dest_station_id, = dest_station_result

    # Get or create the station link. The query receives both (src, dest) and (dest, src),
    # presumably so a link stored in either direction is found.
    link_params = (src_station_id, dest_station_id, dest_station_id, src_station_id)
    cur.execute(get_station_link_sql, link_params)
    if cur.fetchone() is None:
        link_name = f'{src_station_id}_{dest_station_id}'
        cur.execute(station_link_id_insert, (link_name, src_station_id, dest_station_id))
        conn.commit()
        print(f'Created new station link {link_name}')

    cur.execute(get_station_link_sql, link_params)
    link_id, link_name, src_id, dest_id = cur.fetchone()

    # Read the temperature data, mapping columns by name (usecols keeps the file's column order).
    temp_df = (
        pd.read_csv(file, usecols=['Date_In_Decimal', 'degree_(C)'])
        .rename(columns={'Date_In_Decimal': 'day_in_decimal', 'degree_(C)': 'temperature'})
    )
    temp_df['link_id'] = link_id
    # day_in_decimal is treated as a fractional number of days after TEMP_BASE_DATE.
    temp_df['timestamp'] = TEMP_BASE_DATE + pd.to_timedelta(temp_df['day_in_decimal'], unit='D')

    # Insert all rows of this file in one batch and commit once per file.
    temp_rows = list(temp_df[['link_id', 'timestamp', 'temperature']].itertuples(index=False, name=None))
    cur.executemany(temp_table_insert, temp_rows)
    conn.commit()

Processing temperature files: 1/4 files
Processing temperature files: 2/4 files
Processing temperature files: 3/4 files
Processing temperature files: 4/4 files


# Close connection to db

In [None]:
# Release the cursor first, then the connection it was created from.
for db_handle in (cur, conn):
    db_handle.close()