# Prerequisites

In [None]:
import csv
import glob
import os
import shutil
import zipfile

import matplotlib.pyplot as plt
import numpy as np
import tables as tb
from tqdm.notebook import tqdm as tqdm
plt.rcParams["figure.figsize"] = [16,9]

# Expose the Drive experiments folder as /experiments (symlink created once per runtime).
if not os.path.exists('/experiments'):
  !ln -s /content/drive/My\ Drive/experiments /experiments

# Work from the project code directory; relative paths below are resolved from here.
% cd /experiments/code/cense_storage/
# NOTE(review): pyproj is presumably required by the project `utils` module — confirm.
% pip install pyproj

# Project helpers; presumably provides `sensor_list` used below.
from utils import *

dataFileSize = 10000  # row count of the preallocated buffers in the generation cell
nbFrequencyBands = 29  # third-octave bands stored per spectrum frame

# Global (shared) and local experiment directories, both under /experiments.
inputPath = '/experiments/data/global/censeDomainAdaptation/'
localPath = '/experiments/data/local/censeDomainAdaptation/'

# Local scratch directory for the dataset.
dataPath = '/tmp/censeDa/'
if not os.path.exists(dataPath):
  os.makedirs(dataPath)
  
# NOTE(review): this overrides the /tmp scratch directory above, so every later
# cell reads and writes directly under inputPath. Revert to keep working on /tmp.
dataPath = inputPath



# Sensor list file, read relative to the working directory set by %cd above.
# The generation cell reads the 'sID', 'latGPS' and 'lonGPS' entries of each item.
sListFile = 'sensor_list.csv'
sensorInfo = sensor_list(sListFile)

datasetFileName = 'censeDa.h5'

/content/drive/My Drive/experiments/code/cense_storage
Found 74 sensors.


# Generate dataset


In [None]:
if not os.path.exists(dataPath+'fetch'):
  ! rsync -r --info=progress2 {inputPath+'fetch.zip'} {dataPath}
  ! unzip -qqo -d  {dataPath}  {dataPath+'fetch.zip'}
  ! rm {dataPath+'fetch.zip'}

  3,846,118,429 100%   32.57MB/s    0:01:52 (xfr#1, to-chk=0/1)


In [None]:
class Sensor(tb.IsDescription):
  """Row layout of the /sensor table: one row per sensor."""
  number = tb.Int32Col()  # N in the sensor's /time/sensorN and /spectrum/sensorN node names
  id = tb.StringCol(100)  # sensor identifier ("sID" in the sensor list); also its folder name under fetch/
  lat = tb.Float64Col()  # "latGPS" value from the sensor list
  lon = tb.Float64Col()  # "lonGPS" value from the sensor list

In [None]:
# Build the HDF5 dataset from the hourly CSV archives under dataPath/fetch:
#   /sensor            table with one Sensor row per entry of sensorInfo
#   /time/sensorN      1-D extendable array filled with CSV column 0
#   /spectrum/sensorN  (frames, nbFrequencyBands) extendable array, CSV columns 3..31
tmpFileName = dataPath+'tmpDa'  # NOTE(review): no longer used by this cell

# NOTE(review): these preallocated buffers are not used by this cell; `data` is rebound below.
data = np.zeros((dataFileSize, nbFrequencyBands))
dataTime = np.zeros((dataFileSize, 1))

nbCsvColumns = 32  # values per CSV row: time (col 0), 2 columns not stored, nbFrequencyBands bands

dryRun = False
reDo = True


def _listSensorZips(sensorId):
  """Return the existing archives fetch/<id>/<year>/<month>/<day>/<hour>.zip of one sensor.

  Paths are returned in ascending (year, month, day, hour) order. The ranges are
  deliberately generous; paths that do not exist are simply skipped.
  """
  zipPaths = []
  for year in [2019, 2020]:
    for month in range(13):
      for day in range(33):
        for hour in range(25):
          zipPath = dataPath+'fetch/'+sensorId+'/'+str(year)+'/'+str(month)+'/'+str(day)+'/'+str(hour)+'.zip'
          if os.path.exists(zipPath):
            zipPaths.append(zipPath)
  return zipPaths


def _readCsvFromZip(zipPath):
  """Parse the CSV stored in one hourly archive into a float array.

  The archive '<hour>.zip' is expected to hold '<hour>.csv' at its root. The
  member is read in memory with zipfile, instead of copying the archive and
  extracting it into /tmp with the `unzip` shell command. Blank lines are skipped.

  Returns:
    np.ndarray of shape (rows, nbCsvColumns); (0, nbCsvColumns) for an empty CSV.

  Raises:
    KeyError: the CSV member is missing from the archive.
    ValueError: a row does not hold exactly nbCsvColumns numeric values.
  """
  csvName = os.path.basename(zipPath).replace('zip', 'csv')
  with zipfile.ZipFile(zipPath) as archive:
    lines = archive.read(csvName).decode('utf-8').splitlines()
  rows = [[float(v) for v in row] for row in csv.reader(lines, delimiter=',') if row]
  if not rows:
    return np.zeros((0, nbCsvColumns))
  values = np.array(rows, dtype=np.float64)
  if values.ndim != 2 or values.shape[1] != nbCsvColumns:
    raise ValueError(zipPath+': expected '+str(nbCsvColumns)+' values per CSV row')
  return values


# Close a dataset left open by an earlier run of this cell. The name `f` may also
# hold other objects (e.g. a matplotlib Figure, which has no close()), so check its type.
if isinstance(globals().get('f'), tb.File) and f.isopen:
  f.close()

if reDo:
  fileName = dataPath+datasetFileName
  if dryRun:
    fileName = fileName.replace('.h5', 'DryRun.h5')
  # the context manager closes the file even if an archive fails to parse
  with tb.open_file(fileName, mode='w') as f:
    f.create_table('/', 'sensor', Sensor, 'Sensor information')
    time = f.create_group('/', 'time', 'time expressed in epoch')
    spectrum = f.create_group('/', 'spectrum', 'spectral data third octave bands fast (125ms)')
    for sCount, s in tqdm(enumerate(sensorInfo), total=len(sensorInfo)):
      sensor = f.root.sensor.row
      sensor['id'] = s["sID"]
      sensor['lat'] = s["latGPS"]
      sensor['lon'] = s["lonGPS"]
      sensor['number'] = sCount
      sensor.append()
      arrayTime = f.create_earray(time, 'sensor'+str(sCount), tb.Float64Atom(), (0,))
      arraySpec = f.create_earray(spectrum, 'sensor'+str(sCount), tb.Float64Atom(), (0, nbFrequencyBands))
      if dryRun:
        # randint(2) returns 0 or 1, so about half of the sensors get 10 dummy
        # frames and the others stay empty (randint(1) would always return 0).
        if np.random.randint(2):
          data = np.zeros((10, nbCsvColumns))
          arrayTime.append(data[:, 0])
          arraySpec.append(data[:, 3:])
      else:
        for zipPath in _listSensorZips(s["sID"]):
          data = _readCsvFromZip(zipPath)
          arrayTime.append(data[:, 0])
          arraySpec.append(data[:, 3:])
    f.root.sensor.flush()

# sync to drive

In [None]:
# Copy the dataset from the working directory (dataPath) to the local experiment directory.
! rsync  --info=progress2 {dataPath+datasetFileName} {localPath}

  1,152,088,072 100%  152.15MB/s    0:00:07 (xfr#1, to-chk=0/1)


# sync from drive

In [None]:
# Fetch the dataset from the global directory unless dataPath already holds a copy.
# NOTE(review): when dataPath equals inputPath (see the config cell) the file is
# already there and this cell does nothing.
if not os.path.exists(dataPath+datasetFileName):
  ! rsync  --info=progress2 {inputPath+datasetFileName} {dataPath}
  #! rsync  --info=progress2 {dataPath+datasetFileName} {dataPath+datasetFileName.replace('.h5', 'Bck.h5')}

 12,657,956,872 100%   29.53MB/s    0:06:48 (xfr#1, to-chk=0/1)


# check dataset integrity

  

In [None]:
# Compare the number of sensors in the CSV list with what the HDF5 file holds:
# rows of the /sensor table, then children of the /time and /spectrum groups.
print(len(sensorInfo))
with tb.open_file(dataPath+datasetFileName, mode='r') as f:
  for count in (len(f.root.sensor), f.root.time._v_nchildren, f.root.spectrum._v_nchildren):
    print(count)

74
16
16
16


# Check disk usage

In [None]:
# Total size of the working directory (raw archives + HDF5 dataset).
! du -sh {dataPath}

17G	/tmp/censeDa/


# Remove sensors with too few spectral frames

In [None]:
# Drop every sensor whose spectrum holds fewer frames than this threshold:
# its /sensor table row and its /time and /spectrum arrays are removed.
minNbFrames = 2000000

with tb.open_file(dataPath+datasetFileName, mode='a') as f:
  # snapshot the children first: nodes are removed from the group inside the loop
  for d in list(f.root.spectrum):
    if d.shape[0] < minNbFrames:
      name = d._v_name
      number = int(name.replace('sensor', ''))
      # Locate the matching table rows before touching the table, rather than
      # removing rows while iterating over it; remove from the end so the
      # remaining indices stay valid.
      matches = np.flatnonzero(f.root.sensor.col('number') == number)
      for rowIndex in matches[::-1]:
        f.root.sensor.remove_row(int(rowIndex))
      f.root.time._f_get_child(name)._f_remove()  # remove time data
      d._f_remove()  # remove spectral data
  f.root.sensor.flush()

# Truncate all sensors to a common length

In [None]:
# Cap on the common length (small dataset); set to None to keep the full length
# of the shortest sensor.
smallDatasetSize = 300000


def _truncateLeaf(h5, group, name, size):
  """Replace leaf `group/name` by a plain Array holding its first `size` rows; return the new node."""
  group._f_get_child(name).rename('tmp')
  h5.create_array(group, name, group._f_get_child('tmp')[:size])
  group._f_get_child('tmp')._f_remove()
  return group._f_get_child(name)


with tb.open_file(dataPath+datasetFileName, mode='a') as f:
  minSize = min((x.shape[0] for x in f.root.spectrum), default=0)
  print(minSize)
  if smallDatasetSize is not None:
    # never exceed the shortest sensor, otherwise lengths would differ again
    minSize = min(minSize, smallDatasetSize)
  for s in f.root.sensor: # loop over sensors
    name = 'sensor'+str(s['number'])
    print(_truncateLeaf(f, f.root.spectrum, name, minSize).shape)
    print(_truncateLeaf(f, f.root.time, name, minSize).shape)

3296320
(300000, 29)
(300000,)
(300000, 29)
(300000,)
(300000, 29)
(300000,)
(300000, 29)
(300000,)
(300000, 29)
(300000,)
(300000, 29)
(300000,)
(300000, 29)
(300000,)
(300000, 29)
(300000,)
(300000, 29)
(300000,)
(300000, 29)
(300000,)
(300000, 29)
(300000,)
(300000, 29)
(300000,)
(300000, 29)
(300000,)
(300000, 29)
(300000,)
(300000, 29)
(300000,)
(300000, 29)
(300000,)


# Sanity check

Compute the min, max, mean and standard deviation of the spectral data
per sensor and frequency band, plus the number of frames per sensor.

In [None]:
def sanityCheck(datasetPath, dryRun=False):
  """Compute per-sensor, per-band summary statistics of the spectral data.

  Sensor arrays are processed in increasing order of the N in their 'sensorN'
  node name. Iterating the /spectrum group directly yields alphanumeric order
  (sensor0, sensor1, sensor10, sensor11, ..., sensor2), which would scramble
  the sensor axis of the result.

  Args:
    datasetPath: path of the HDF5 dataset (with a /sensor table and a /spectrum group).
    dryRun: if True, only the first 100 frames of each sensor are used.

  Returns:
    np.ndarray of shape (number of /sensor rows, number of bands, 5) whose last
    axis holds min, max, mean, std and frame count. Sensors without frames
    keep all-zero statistics.
  """
  with tb.open_file(datasetPath, mode='r') as f:
    nbSensors = len(f.root.sensor)
    nodes = sorted(f.root.spectrum, key=lambda node: int(node._v_name.replace('sensor', '')))
    # take the band count from any sensor array: sensor0 may have been removed
    nbBands = nodes[0].shape[1] if nodes else 0
    stats = np.zeros((nbSensors, nbBands, 5))

    for dCount, node in tqdm(enumerate(nodes), total=nbSensors):
      if node.shape[0] == 0:
        continue
      # read from disk once; the numpy reductions below then work in memory
      data = node[0:100, :] if dryRun else node[:]
      stats[dCount, :, 0] = np.amin(data, axis=0)
      stats[dCount, :, 1] = np.amax(data, axis=0)
      stats[dCount, :, 2] = np.mean(data, axis=0)
      stats[dCount, :, 3] = np.std(data, axis=0)
      stats[dCount, :, 4] = data.shape[0]

  return stats

stats = sanityCheck(dataPath+datasetFileName)

HBox(children=(FloatProgress(value=0.0, max=16.0), HTML(value='')))




# Display statistics

In [None]:
plt.rcParams["figure.figsize"] = [16,9]

# One heat map per statistic computed by sanityCheck (rows: sensors, columns: bands).
# The figure is bound to `fig`, not `f`: `f` is the name the other cells use for
# the HDF5 file (and some of them close it), so it must not hold a Figure.
fig, axes = plt.subplots(2, 2)
statNames = ['Min', 'Max', 'Mean', 'Std']
axes = axes.flatten()
for aCount, ax in enumerate(axes):
  ax.set_title(statNames[aCount])
  image = ax.imshow(stats[:, :, aCount])
  fig.colorbar(image, ax=ax)
  ax.set_xlabel('Frequency bands')
  ax.set_ylabel('Sensor id')

# create the output directory if needed, save, then display
figureDir = localPath+'/figures/'
os.makedirs(figureDir, exist_ok=True)
fig.savefig(figureDir+'stats.png')
plt.show()

# check for discontinuity

In [None]:
# A gap larger than this between consecutive timestamps counts as a discontinuity.
# Same unit as the stored epoch times — NOTE(review): presumably milliseconds, confirm.
tolerance = 250

with tb.open_file(dataPath+datasetFileName, mode='r') as f:
  nbSensors = len(f.root.sensor)
  nbDisc = np.zeros((nbSensors, 1))
  nbVec = np.zeros((nbSensors, 1))
  # order by sensor number: iterating the group directly yields alphanumeric
  # order (sensor0, sensor1, sensor10, ...), which would mislabel the plots
  timeNodes = sorted(f.root.time, key=lambda node: int(node._v_name.replace('sensor', '')))
  for tCount, t in tqdm(enumerate(timeNodes), total=nbSensors):
    times = t[:]
    nbVec[tCount] = len(times)
    # vectorised comparison of every timestamp with its predecessor;
    # fewer than two samples gives an empty diff, i.e. 0 discontinuities
    nbDisc[tCount] = np.count_nonzero(np.diff(times) > tolerance)

HBox(children=(FloatProgress(value=0.0, max=16.0), HTML(value='')))




# display discontinuity per sensor

In [None]:
fig, axes = plt.subplots(2, 1)

# Per-sensor ratio, matching the axis label; sensors without vectors get 0
# instead of a division by zero.
discRatio = np.divide(nbDisc, nbVec, out=np.zeros_like(nbDisc, dtype=float), where=nbVec > 0)
axes[0].plot(discRatio)
axes[0].set_xlabel('Sensor id')
axes[0].set_ylabel('# discontinuities / # vectors')
axes[1].plot(nbVec)
axes[1].set_xlabel('Sensor id')
axes[1].set_ylabel('# vectors')

# create the output directory if needed, save, then display
figureDir = localPath+'/figures/'
os.makedirs(figureDir, exist_ok=True)
fig.savefig(figureDir+'discontinuities.png')
plt.show()

# Check the time continuity of one sensor

In [None]:
# Plot the raw timestamps of one sensor.
# NOTE(review): sensor13 was presumably picked by hand; change as needed.
sensorToPlot = 'sensor13'

# Read the data and close the file right away; a handle left open here would
# make later cells fail when they reopen the dataset with mode='a'.
with tb.open_file(dataPath+datasetFileName, mode='r') as f:
  sensorTime = f.root.time._f_get_child(sensorToPlot)[:]
print(len(sensorTime))

fig, ax = plt.subplots()
ax.plot(sensorTime)
ax.set_title(sensorToPlot)
ax.set_xlabel('Vector index')
ax.set_ylabel('Time (epoch)')
plt.show()

# Renumber sensors contiguously (0..n-1)

In [None]:
with tb.open_file(dataPath+datasetFileName, mode='a') as f:
  # Pass 1: renumber the table rows 0..n-1 and move the matching arrays to
  # temporary 'sensorTmp<new number>' names, so no new name can collide with
  # an old 'sensor<old number>' name that is still to be processed.
  for dCount, d in enumerate(f.root.sensor): # loop over sensor table
    oldName = 'sensor'+str(d['number'])
    print(d['number'])
    f.root.time._f_get_child(oldName).rename('sensorTmp'+str(dCount))
    f.root.spectrum._f_get_child(oldName).rename('sensorTmp'+str(dCount))
    d['number'] = dCount
    d.update()

  # Pass 2: drop the 'Tmp' marker. Only nodes renamed in pass 1 are touched, and
  # the child list is snapshotted because renaming modifies the group.
  for group in (f.root.spectrum, f.root.time):
    for node in list(group):
      if node._v_name.startswith('sensorTmp'):
        node.rename(node._v_name.replace('Tmp', ''))

# Debug

# close all tables files
  

In [None]:
# Debug helper: close every PyTables file still open in this kernel (e.g. a handle
# left open by a cell that stopped before its f.close()), so that later cells can
# reopen the dataset. NOTE(review): relies on the private tb.file._open_files registry.
tb.file._open_files.close_all()

Closing remaining open files:/tmp/censeDa/censeDa.h5...done


# revert backup

In [None]:
# Restore the dataset from its 'Bck.h5' copy, overwriting the current file.
# NOTE(review): the rsync that creates this backup (in the "sync from drive" cell)
# is commented out, so the backup may be missing or stale.
! rsync  --info=progress2 {dataPath+datasetFileName.replace('.h5', 'Bck.h5')} {dataPath+datasetFileName}

In [None]:
# Inspect the copy of the dataset in the global directory: sensor numbers, then
# the full object tree (printed while the file is still open).
with tb.open_file(inputPath+datasetFileName, mode='r') as f:
  sensorNumbers = f.root.sensor[:]['number']
  print(sensorNumbers)
  print(f)

[ 0  1  2  3  4  5  6  7  8  9 10 11 12 13 14 15]
/experiments/data/global/censeDa/censeDa.h5 (File) ''
Last modif.: 'Fri Jun  5 18:31:11 2020'
Object Tree: 
/ (RootGroup) ''
/sensor (Table(16,)) 'Sensor information'
/spectrum (Group) 'spectral data third octave bands fast (125ms)'
/spectrum/sensor0 (Array(3296320, 29)) ''
/spectrum/sensor1 (Array(3296320, 29)) ''
/spectrum/sensor10 (Array(3296320, 29)) ''
/spectrum/sensor11 (Array(3296320, 29)) ''
/spectrum/sensor12 (Array(3296320, 29)) ''
/spectrum/sensor13 (Array(3296320, 29)) ''
/spectrum/sensor14 (Array(3296320, 29)) ''
/spectrum/sensor15 (Array(3296320, 29)) ''
/spectrum/sensor2 (Array(3296320, 29)) ''
/spectrum/sensor3 (Array(3296320, 29)) ''
/spectrum/sensor4 (Array(3296320, 29)) ''
/spectrum/sensor5 (Array(3296320, 29)) ''
/spectrum/sensor6 (Array(3296320, 29)) ''
/spectrum/sensor7 (Array(3296320, 29)) ''
/spectrum/sensor8 (Array(3296320, 29)) ''
/spectrum/sensor9 (Array(3296320, 29)) ''
/time (Group) 'time expressed in epoch'

In [None]:
# Rewrite the dataset with ptrepack into a temporary file, then replace the original.
# NOTE(review): presumably done to reclaim space left by the node removals and
# truncations above — confirm.
! ptrepack {dataPath+datasetFileName} {dataPath+'tmp.h5'}
! mv {dataPath+'tmp.h5'} {dataPath+datasetFileName}

In [None]:
# Display the statistics array returned by sanityCheck, shaped (sensors, bands, 5).
# NOTE(review): the recorded output below shows only the shape; it was presumably
# produced by `stats.shape`.
stats


(16, 29, 5)

In [None]:
# Save the per-sensor statistics in the global data directory, reusing inputPath
# rather than repeating it; np.save appends the '.npy' extension.
# NOTE(review): the file is named 'models' although it stores `stats` — rename if unintended.
np.save(inputPath+'models', stats)