In [18]:
import os
import gzip
import csv
import datetime as dt
import pandas as pd
import numpy as np
import pyarrow as pa
import pyarrow.parquet as pq

In [5]:
RAW_DATA_PATH = 'raw_data/d12_text_station_5min_2019_10_08.txt.gz'

COLS_TO_IMPORT = list(range(0, 12))
df = pd.read_csv(RAW_DATA_PATH, compression='gzip', delimiter=',', header=None, usecols=COLS_TO_IMPORT)
cols_name = ['timestamp', 'station', 'district', 'freeway_num', 'direction', \
     'lane_type', 'station_length', 'samples', 'pct_observed', 'total_flow', \
     'avg_occupancy', 'avg_speed']
df.columns = cols_name
df.head()

Unnamed: 0,timestamp,station,district,freeway_num,direction,lane_type,station_length,samples,pct_observed,total_flow,avg_occupancy,avg_speed
0,10/08/2019 00:00:00,1201044,12,133,S,OR,,0,0,,,
1,10/08/2019 00:00:00,1201052,12,133,S,FR,,10,100,0.0,0.0,
2,10/08/2019 00:00:00,1201054,12,133,S,ML,1.285,30,100,8.0,0.0038,68.8
3,10/08/2019 00:00:00,1201058,12,133,N,OR,,0,0,,,
4,10/08/2019 00:00:00,1201064,12,133,N,FR,,0,0,,,


In [7]:
df['timestamp'] = pd.to_datetime(df['timestamp'])

In [10]:
df['timestamp'] = df['timestamp'].dt.tz_localize('America/Los_Angeles')

In [11]:
df.dtypes

timestamp         datetime64[ns, America/Los_Angeles]
station                                         int64
district                                        int64
freeway_num                                     int64
direction                                      object
lane_type                                      object
station_length                                float64
samples                                         int64
pct_observed                                    int64
total_flow                                    float64
avg_occupancy                                 float64
avg_speed                                     float64
dtype: object

In [12]:
df.head()

Unnamed: 0,timestamp,station,district,freeway_num,direction,lane_type,station_length,samples,pct_observed,total_flow,avg_occupancy,avg_speed
0,2019-10-08 00:00:00-07:00,1201044,12,133,S,OR,,0,0,,,
1,2019-10-08 00:00:00-07:00,1201052,12,133,S,FR,,10,100,0.0,0.0,
2,2019-10-08 00:00:00-07:00,1201054,12,133,S,ML,1.285,30,100,8.0,0.0038,68.8
3,2019-10-08 00:00:00-07:00,1201058,12,133,N,OR,,0,0,,,
4,2019-10-08 00:00:00-07:00,1201064,12,133,N,FR,,0,0,,,


In [14]:
df['lane_type'].unique()

array(['OR', 'FR', 'ML', 'HV', 'FF', 'CD', 'CH'], dtype=object)

In [16]:
# keep lane_type = ML or HV
lane_type_keep = ['ML', 'HV']
df = df[df['lane_type'].isin(lane_type_keep)]
df.head()

Unnamed: 0,timestamp,station,district,freeway_num,direction,lane_type,station_length,samples,pct_observed,total_flow,avg_occupancy,avg_speed
2,2019-10-08 00:00:00-07:00,1201054,12,133,S,ML,1.285,30,100,8.0,0.0038,68.8
5,2019-10-08 00:00:00-07:00,1201066,12,133,N,ML,0.67,20,100,11.0,0.0044,65.7
7,2019-10-08 00:00:00-07:00,1201076,12,133,N,ML,0.185,0,0,44.0,0.0292,60.9
9,2019-10-08 00:00:00-07:00,1201085,12,405,S,HV,0.18,10,100,3.0,0.0022,65.0
10,2019-10-08 00:00:00-07:00,1201087,12,405,S,ML,0.18,50,100,67.0,0.0066,72.1


In [20]:
# show the unique and the counts of unique values of timestamp
# the total 5-minute interval should be 288
print(df['timestamp'].nunique())
df['timestamp'].unique()

288


<DatetimeArray>
['2019-10-08 00:00:00-07:00', '2019-10-08 00:05:00-07:00',
 '2019-10-08 00:10:00-07:00', '2019-10-08 00:15:00-07:00',
 '2019-10-08 00:20:00-07:00', '2019-10-08 00:25:00-07:00',
 '2019-10-08 00:30:00-07:00', '2019-10-08 00:35:00-07:00',
 '2019-10-08 00:40:00-07:00', '2019-10-08 00:45:00-07:00',
 ...
 '2019-10-08 23:10:00-07:00', '2019-10-08 23:15:00-07:00',
 '2019-10-08 23:20:00-07:00', '2019-10-08 23:25:00-07:00',
 '2019-10-08 23:30:00-07:00', '2019-10-08 23:35:00-07:00',
 '2019-10-08 23:40:00-07:00', '2019-10-08 23:45:00-07:00',
 '2019-10-08 23:50:00-07:00', '2019-10-08 23:55:00-07:00']
Length: 288, dtype: datetime64[ns, America/Los_Angeles]

In [42]:
STATION_DATA_PATH = 'raw_data/d12_text_meta_2019_11_05.txt'
scols_to_keep = ['ID', 'Fwy', 'Dir', 'District', 'County', 'Abs_PM', 'Latitude', 'Longitude']
sdf =  pd.read_csv(STATION_DATA_PATH, delimiter='\t', usecols=scols_to_keep)

In [43]:
sdf.shape

(2420, 8)

In [44]:
sdf.nunique()

ID           2420
Fwy            13
Dir             4
District        1
County          1
Abs_PM        858
Latitude     1093
Longitude    1097
dtype: int64

In [45]:
sdf.head()

Unnamed: 0,ID,Fwy,Dir,District,County,Abs_PM,Latitude,Longitude
0,1201044,133,S,12,59,8.991,33.66184,-117.7553
1,1201052,133,S,12,59,8.991,33.66184,-117.7553
2,1201054,133,S,12,59,8.991,33.66184,-117.7553
3,1201058,133,N,12,59,8.857,33.659542,-117.756294
4,1201064,133,N,12,59,8.723,33.657392,-117.757636


In [47]:
sdf.dtypes

ID             int64
Fwy            int64
Dir           object
District       int64
County         int64
Abs_PM       float64
Latitude     float64
Longitude    float64
dtype: object

In [48]:
sdf_col_names = ['station', 'freeway_num', 'direction', 'district', 'county', 'abs_pm', 'latitude', 'longitude']
sdf.columns = sdf_col_names

In [49]:
sdf.head()

Unnamed: 0,station,freeway_num,direction,district,county,abs_pm,latitude,longitude
0,1201044,133,S,12,59,8.991,33.66184,-117.7553
1,1201052,133,S,12,59,8.991,33.66184,-117.7553
2,1201054,133,S,12,59,8.991,33.66184,-117.7553
3,1201058,133,N,12,59,8.857,33.659542,-117.756294
4,1201064,133,N,12,59,8.723,33.657392,-117.757636


In [50]:
df.shape

(443808, 12)

In [51]:
# merge df with sdf
merged_df = pd.merge(df, sdf[['station', 'abs_pm', 'latitude', 'longitude']], on='station', how='left')
print(merged_df.shape)

(443808, 15)


(443808, 15)

In [52]:
merged_df.head()

Unnamed: 0,timestamp,station,district,freeway_num,direction,lane_type,station_length,samples,pct_observed,total_flow,avg_occupancy,avg_speed,abs_pm,latitude,longitude
0,2019-10-08 00:00:00-07:00,1201054,12,133,S,ML,1.285,30,100,8.0,0.0038,68.8,8.991,33.66184,-117.7553
1,2019-10-08 00:00:00-07:00,1201066,12,133,N,ML,0.67,20,100,11.0,0.0044,65.7,8.991,33.661697,-117.754967
2,2019-10-08 00:00:00-07:00,1201076,12,133,N,ML,0.185,0,0,44.0,0.0292,60.9,9.091,33.663305,-117.753976
3,2019-10-08 00:00:00-07:00,1201085,12,405,S,HV,0.18,10,100,3.0,0.0022,65.0,0.54,33.647015,-117.744091
4,2019-10-08 00:00:00-07:00,1201087,12,405,S,ML,0.18,50,100,67.0,0.0066,72.1,0.54,33.647015,-117.744091


In [53]:
df_405N_ML = merged_df[(merged_df['freeway_num']==405) & (merged_df['direction']=='N') & (merged_df['lane_type']=='ML')]
print(df_405N_ML.shape)

(16704, 15)


In [57]:
df_405N_ML.head()

Unnamed: 0,timestamp,station,district,freeway_num,direction,lane_type,station_length,samples,pct_observed,total_flow,avg_occupancy,avg_speed,abs_pm,latitude,longitude
5,2019-10-08 00:00:00-07:00,1201100,12,405,N,ML,0.255,50,100,37.0,0.0045,72.6,0.7,33.64809,-117.746598
9,2019-10-08 00:00:00-07:00,1201125,12,405,N,ML,0.32,60,67,44.0,0.0037,69.6,0.88,33.648958,-117.74955
15,2019-10-08 00:00:00-07:00,1201159,12,405,N,ML,0.21,50,100,43.0,0.0038,73.3,2.81,33.659163,-117.780107
18,2019-10-08 00:00:00-07:00,1201185,12,405,N,ML,0.36,39,100,79.0,0.0156,68.0,3.63,33.663379,-117.793258
21,2019-10-08 00:00:00-07:00,1201211,12,405,N,ML,0.575,39,100,77.0,0.0145,69.1,3.8,33.663798,-117.796176


In [62]:
df_405N_ML.sort_values(['abs_pm', 'timestamp']).reset_index(drop=True)

Unnamed: 0,timestamp,station,district,freeway_num,direction,lane_type,station_length,samples,pct_observed,total_flow,avg_occupancy,avg_speed,abs_pm,latitude,longitude
0,2019-10-08 00:00:00-07:00,1211066,12,405,N,ML,0.535,13,100,14.0,0.0024,67.7,0.370,33.646486,-117.741156
1,2019-10-08 00:05:00-07:00,1211066,12,405,N,ML,0.535,14,100,0.0,0.0000,68.1,0.370,33.646486,-117.741156
2,2019-10-08 00:10:00-07:00,1211066,12,405,N,ML,0.535,12,100,0.0,0.0000,68.1,0.370,33.646486,-117.741156
3,2019-10-08 00:15:00-07:00,1211066,12,405,N,ML,0.535,9,100,0.0,0.0000,68.1,0.370,33.646486,-117.741156
4,2019-10-08 00:20:00-07:00,1211066,12,405,N,ML,0.535,13,100,0.0,0.0000,68.1,0.370,33.646486,-117.741156
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
16699,2019-10-08 23:35:00-07:00,1221562,12,405,N,ML,0.445,40,100,87.0,0.0168,69.4,23.699,33.782868,-118.090872
16700,2019-10-08 23:40:00-07:00,1221562,12,405,N,ML,0.445,40,100,98.0,0.0194,69.2,23.699,33.782868,-118.090872
16701,2019-10-08 23:45:00-07:00,1221562,12,405,N,ML,0.445,40,100,88.0,0.0173,69.3,23.699,33.782868,-118.090872
16702,2019-10-08 23:50:00-07:00,1221562,12,405,N,ML,0.445,40,100,94.0,0.0187,69.4,23.699,33.782868,-118.090872


In [65]:
# replace the flow with Null is the pct is lower than 80
pct_observed_mask = df_405N_ML['pct_observed'] < 80
df_405N_ML.loc[pct_observed_mask, 'total_flow'] = np.nan
df_405N_ML.head()

Unnamed: 0,timestamp,station,district,freeway_num,direction,lane_type,station_length,samples,pct_observed,total_flow,avg_occupancy,avg_speed,abs_pm,latitude,longitude
5,2019-10-08 00:00:00-07:00,1201100,12,405,N,ML,0.255,50,100,37.0,0.0045,72.6,0.7,33.64809,-117.746598
9,2019-10-08 00:00:00-07:00,1201125,12,405,N,ML,0.32,60,67,,0.0037,69.6,0.88,33.648958,-117.74955
15,2019-10-08 00:00:00-07:00,1201159,12,405,N,ML,0.21,50,100,43.0,0.0038,73.3,2.81,33.659163,-117.780107
18,2019-10-08 00:00:00-07:00,1201185,12,405,N,ML,0.36,39,100,79.0,0.0156,68.0,3.63,33.663379,-117.793258
21,2019-10-08 00:00:00-07:00,1201211,12,405,N,ML,0.575,39,100,77.0,0.0145,69.1,3.8,33.663798,-117.796176


In [76]:
df_405N_ML_pivot = df_405N_ML.pivot(index='station', columns='timestamp', values='total_flow').reset_index()
df_405N_ML_pivot.head()

timestamp,station,2019-10-08 00:00:00-07:00,2019-10-08 00:05:00-07:00,2019-10-08 00:10:00-07:00,2019-10-08 00:15:00-07:00,2019-10-08 00:20:00-07:00,2019-10-08 00:25:00-07:00,2019-10-08 00:30:00-07:00,2019-10-08 00:35:00-07:00,2019-10-08 00:40:00-07:00,...,2019-10-08 23:10:00-07:00,2019-10-08 23:15:00-07:00,2019-10-08 23:20:00-07:00,2019-10-08 23:25:00-07:00,2019-10-08 23:30:00-07:00,2019-10-08 23:35:00-07:00,2019-10-08 23:40:00-07:00,2019-10-08 23:45:00-07:00,2019-10-08 23:50:00-07:00,2019-10-08 23:55:00-07:00
0,1201100,37.0,43.0,45.0,56.0,42.0,37.0,41.0,32.0,48.0,...,97.0,75.0,66.0,55.0,56.0,67.0,73.0,72.0,51.0,37.0
1,1201125,,,,,,,,,,...,,,,,,,,,,
2,1201159,43.0,41.0,44.0,51.0,46.0,46.0,32.0,37.0,42.0,...,97.0,84.0,74.0,64.0,57.0,65.0,71.0,66.0,65.0,41.0
3,1201185,79.0,59.0,67.0,66.0,71.0,66.0,46.0,57.0,51.0,...,141.0,116.0,106.0,87.0,92.0,89.0,100.0,100.0,87.0,58.0
4,1201211,77.0,62.0,65.0,69.0,76.0,64.0,46.0,57.0,54.0,...,140.0,119.0,106.0,86.0,93.0,90.0,100.0,99.0,58.0,39.0


In [108]:
df_405N_ML_merged = pd.merge(df_405N_ML_pivot, sdf[['station', 'abs_pm', 'latitude', 'longitude']], on='station', how='left')

In [109]:
df_405N_ML_merged.iloc[:, 1:].interpolate()

Unnamed: 0,2019-10-08 00:00:00-07:00,2019-10-08 00:05:00-07:00,2019-10-08 00:10:00-07:00,2019-10-08 00:15:00-07:00,2019-10-08 00:20:00-07:00,2019-10-08 00:25:00-07:00,2019-10-08 00:30:00-07:00,2019-10-08 00:35:00-07:00,2019-10-08 00:40:00-07:00,2019-10-08 00:45:00-07:00,...,2019-10-08 23:25:00-07:00,2019-10-08 23:30:00-07:00,2019-10-08 23:35:00-07:00,2019-10-08 23:40:00-07:00,2019-10-08 23:45:00-07:00,2019-10-08 23:50:00-07:00,2019-10-08 23:55:00-07:00,abs_pm,latitude,longitude
0,37.0,43.0,45.0,56.0,42.0,37.0,41.0,32.0,48.0,38.0,...,55.0,56.0,67.0,73.0,72.0,51.0,37.0,0.7,33.64809,-117.746598
1,40.0,42.0,44.5,53.5,44.0,41.5,36.5,34.5,45.0,35.5,...,59.5,56.5,66.0,72.0,69.0,58.0,39.0,0.88,33.648958,-117.74955
2,43.0,41.0,44.0,51.0,46.0,46.0,32.0,37.0,42.0,33.0,...,64.0,57.0,65.0,71.0,66.0,65.0,41.0,2.81,33.659163,-117.780107
3,79.0,59.0,67.0,66.0,71.0,66.0,46.0,57.0,51.0,48.0,...,87.0,92.0,89.0,100.0,100.0,87.0,58.0,3.63,33.663379,-117.793258
4,77.0,62.0,65.0,69.0,76.0,64.0,46.0,57.0,54.0,53.0,...,86.0,93.0,90.0,100.0,99.0,58.0,39.0,3.8,33.663798,-117.796176
5,60.0,58.0,0.0,40.0,62.0,18.0,42.0,42.0,0.0,91.0,...,97.0,94.0,98.0,110.0,114.0,94.0,68.0,4.78,33.666407,-117.812902
6,155.0,136.0,141.0,153.0,146.0,148.0,135.0,126.0,114.0,129.0,...,187.0,153.0,154.0,150.0,173.0,166.0,130.0,5.32,33.669218,-117.821657
7,83.0,64.0,71.0,70.0,73.0,66.0,64.0,59.0,52.0,63.0,...,109.0,101.0,106.0,104.0,111.0,108.0,64.0,5.51,33.670291,-117.824699
8,63.0,10.0,9.0,36.0,0.0,40.0,0.0,42.0,22.0,9.0,...,123.0,115.0,103.0,122.0,117.0,107.0,84.0,5.98,33.672851,-117.832271
9,79.0,61.0,75.0,65.0,78.0,59.0,63.0,61.0,53.0,49.0,...,117.0,102.0,97.0,119.0,104.0,103.0,77.0,6.62,33.676465,-117.842488


In [110]:
# rename all the columns
pivot_col_names = [f"{hour:02d}{minute:02d}" for hour in range(0, 24) for minute in range(0, 60, 5)]
pivot_col_names.insert(0, 'station')
pivot_col_names.extend(['abs_pm', 'latitude', 'longitude'])

In [112]:
df_405N_ML_merged.shape[1] == len(pivot_col_names)

Unnamed: 0,station,2019-10-08 00:00:00-07:00,2019-10-08 00:05:00-07:00,2019-10-08 00:10:00-07:00,2019-10-08 00:15:00-07:00,2019-10-08 00:20:00-07:00,2019-10-08 00:25:00-07:00,2019-10-08 00:30:00-07:00,2019-10-08 00:35:00-07:00,2019-10-08 00:40:00-07:00,...,2019-10-08 23:25:00-07:00,2019-10-08 23:30:00-07:00,2019-10-08 23:35:00-07:00,2019-10-08 23:40:00-07:00,2019-10-08 23:45:00-07:00,2019-10-08 23:50:00-07:00,2019-10-08 23:55:00-07:00,abs_pm,latitude,longitude
0,False,False,False,False,False,False,False,False,False,False,...,False,False,False,False,False,False,False,False,False,False
1,False,False,False,False,False,False,False,False,False,False,...,False,False,False,False,False,False,False,False,False,False
2,False,False,False,False,False,False,False,False,False,False,...,False,False,False,False,False,False,False,False,False,False
3,False,False,False,False,False,False,False,False,False,False,...,False,False,False,False,False,False,False,False,False,False
4,False,False,False,False,False,False,False,False,False,False,...,False,False,False,False,False,False,False,False,False,False
5,False,False,False,False,False,False,False,False,False,False,...,False,False,False,False,False,False,False,False,False,False
6,False,False,False,False,False,False,False,False,False,False,...,False,False,False,False,False,False,False,False,False,False
7,False,False,False,False,False,False,False,False,False,False,...,False,False,False,False,False,False,False,False,False,False
8,False,False,False,False,False,False,False,False,False,False,...,False,False,False,False,False,False,False,False,False,False
9,False,False,False,False,False,False,False,False,False,False,...,False,False,False,False,False,False,False,False,False,False


In [None]:
directory_path = 'data/'
file_list = []
for file in os.listdir(directory_path):
    # os.fsdecode decode a file system name from the file system's encoding to Unicode
    filename = os.fsdecode(file)
    if filename.endswith('.gz'):
        file_list.append(filename)
        # print(f'Processing file: {filename}')

file_list

## Notes

### Filter the data:

- only lane type = `ML` or `HV` will be collected
- `total_flow` and `avg_speed` will be interpolated if `pct_observed >= 70`

In [3]:
directory_path = 'raw_data/'
pemsdata_columns = \
    ['timestamp', 'station', 'district', 'freeway_num', 'direction', \
     'lane_type', 'station_length', 'samples', 'pct_observed', 'total_flow', \
     'avg_occupancy', 'avg_speed']

raw_df = pd.read_csv(directory_path+'d12_text_station_5min_2019_10_08.txt.gz', header=None, usecols=range(0, 12))
raw_df.columns = pemsdata_columns
raw_df['timestamp'] = pd.to_datetime(raw_df['timestamp'], format="%m/%d/%Y %H:%M:%S")
raw_df = raw_df[(raw_df['lane_type'] == 'HV') | (raw_df['lane_type'] == 'ML')]
# raw_df.to_parquet(directory_path + "d12_5min_" + f"{raw_df['timestamp'][0].strftime('%y%m%d')}" + ".parquet")


In [4]:
# if pct_observed is larger than 70, keep it as it is; otherwise, change to interpolate()
raw_df['total_flow'].where(raw_df['pct_observed']>=70, raw_df['total_flow'].interpolate())

2          8.0
5         11.0
7         44.0
9          3.0
10        67.0
          ... 
695512    11.0
695513     0.0
695514     4.0
695518    93.0
695519     1.0
Name: total_flow, Length: 443808, dtype: float64

In [5]:
# assign np.nan value to total_flow if the pct_observed is less than 70
# group by each route, fill np.ana value with interpolate values
# interpolate is designed to be applied on missing values
raw_df['total_flow'].where(raw_df['pct_observed']>=70, np.nan)
raw_df.groupby('freeway_num')['total_flow'].apply(lambda x: x.interpolate())

2          8.0
5         11.0
7         44.0
9          3.0
10        67.0
          ... 
695512    11.0
695513     0.0
695514     4.0
695518    93.0
695519     1.0
Name: total_flow, Length: 443808, dtype: float64

In [6]:
(raw_df['total_flow'].interpolate().fillna(0) == raw_df['total_flow'].where(raw_df['pct_observed']>=70, raw_df['total_flow'].interpolate())).sum()

443808