Data Preparation Notebook
======

Hi Merideth!  This is a step-by-step guide to the data-preparation process I underwent.  I try to do two somewhat contrary things here: I want to show you how I actually completed the steps (which is often less than the ideal way), while also illustrating what would be best to run in order to _replicate_ the work.

---

Imports: I suggest using ana/miniconda for the environment.  The command `conda create -n NAME jupyter nb_conda dask pandas numpy scikit-learn fastparquet` will be enough to replicate everything here.  It is then activated with `source activate NAME`.

In [3]:
# Dask imports 
import dask
from dask import dataframe as dd
from dask.dataframe import to_parquet, read_parquet
from dask.diagnostics import ProgressBar
# Other data-manipulation imports
import pandas as pd
import numpy as np
from sklearn.cluster import DBSCAN
from sklearn.neighbors import DistanceMetric
from sklearn.metrics import pairwise_distances
# utilities
from multiprocessing import cpu_count
from datetime import datetime
from re import compile

### Cleaning/Paring [AS IT WAS DONE]

This next stretch of code is the process for turning the original download from `safecast.org` into the dataset to be used for clustering.  First, in the directory from which this notebook is launched, I assume you've run the following command(s):
```
wget https://api.safecast.org/system/measurements.tar.gz; tar -xvzf measurements.tar.gz
```
(This could take 10-15 minutes).  This should leave you with a 10+ GB `measurements-out.csv` file in the current directory.
At this stage for me, I was still operating on my laptop.  This would eventually become unwieldy and I would have to move to a cloud service.  For these steps, one would typically want a "compute-intensive" tier of machine.  The most economical version of this type of machine I saw was the 16-core compute-intensive resource from DigitalOcean at ~<span>\$</span>0.91/hr, but DigitalOcean did not have any of that type available at the specific hour I started, so I went to AWS (Amazon Web Services) instead.  While this next cell ran on my laptop, subsequent cells (until further notice) ran on a c4.8xlarge instance with 60 Gib of ram, 18 cpus of 2.9 Ghz (one thread each) at a rate of ~$1.49/hr.  I'm not sure it's worth imitating my resource use.  Even if all of these steps had executed in quick succession without hours of errors/aborted calls/memory failures, it likely was not maximally economical.  I used the Amazon Linux AMI, which I installed `git` and `miniconda` onto.  After which I created the environment with the command I recommended above.

---

In this block I read the csv in, providing dtype specifications for the `Surface` and `Radiation` columns.  They have mixed types so they need to be allowed to be read in as `object`.  I rename the `Time` column, drop the irrelevant (and mostly empty in some cases) list of columns in line 6.  I then write the repartitioned (with 24 partitions because of my 8-core machine) dataframe to parquet files (these are vastly superior, I can't recommend them enough).  The output is long (double click to reveal it) due to an error but shows that this process took ~10 minutes on a laptop.

In [9]:
# Making file more manageable by dropping undesired columns, renaming, repartitioning then saving in better file format.
# Only needed to be executed once; preserved for posterity.

mmnts = dd.read_csv('./measurements-out.csv', dtype={'Surface':'object', 'Radiation':'object'})
mmnts = mmnts.rename(columns={'Captured Time': 'Time'})
mmnts = mmnts.drop(['Device ID', 'MD5Sum', 'Height', 'Surface', 'Radiation', 'Uploaded Time', 'Loader ID'], axis=1)
ncores = cpu_count()
with ProgressBar():
    to_parquet( mmnts.repartition(npartitions=(3*ncores)), 
                '/mnt/c/users/eliw5/src/academic/Phys18Conf_1/', 
                engine='fastparquet', 
                compute=True,
    )

[                                        ] | 0% Completed | 11.1s

  args2 = [_execute_task(a, cache) for a in args]


[##                                      ] | 5% Completed | 31.1s

  args2 = [_execute_task(a, cache) for a in args]


[##                                      ] | 6% Completed | 44.8s

  args2 = [_execute_task(a, cache) for a in args]


[###                                     ] | 7% Completed | 48.6s

  args2 = [_execute_task(a, cache) for a in args]


[####                                    ] | 10% Completed |  1min  5.0s

  args2 = [_execute_task(a, cache) for a in args]


[####                                    ] | 12% Completed |  1min 12.3s

  args2 = [_execute_task(a, cache) for a in args]


[####                                    ] | 12% Completed |  1min 13.9s

  args2 = [_execute_task(a, cache) for a in args]


[######                                  ] | 16% Completed |  1min 38.9s

  args2 = [_execute_task(a, cache) for a in args]


[#######                                 ] | 18% Completed |  1min 51.9s

  args2 = [_execute_task(a, cache) for a in args]


[########                                ] | 22% Completed |  2min 13.5s

  args2 = [_execute_task(a, cache) for a in args]


[##########                              ] | 25% Completed |  2min 27.8s

  args2 = [_execute_task(a, cache) for a in args]


[##########                              ] | 26% Completed |  2min 35.1s

  args2 = [_execute_task(a, cache) for a in args]


[###########                             ] | 29% Completed |  2min 54.7s

  args2 = [_execute_task(a, cache) for a in args]


[###########                             ] | 29% Completed |  2min 57.1s

  args2 = [_execute_task(a, cache) for a in args]


[#############                           ] | 32% Completed |  3min 16.5s

  args2 = [_execute_task(a, cache) for a in args]


[#############                           ] | 33% Completed |  3min 19.7s

  args2 = [_execute_task(a, cache) for a in args]


[###############                         ] | 37% Completed |  3min 42.5s

  args2 = [_execute_task(a, cache) for a in args]


[################                        ] | 41% Completed |  4min 11.3s

  args2 = [_execute_task(a, cache) for a in args]


[##################                      ] | 45% Completed |  4min 34.3s

  args2 = [_execute_task(a, cache) for a in args]


[##################                      ] | 47% Completed |  4min 43.4s

  args2 = [_execute_task(a, cache) for a in args]


[###################                     ] | 49% Completed |  5min  0.8s

  args2 = [_execute_task(a, cache) for a in args]


[####################                    ] | 51% Completed |  5min  6.6s

  args2 = [_execute_task(a, cache) for a in args]


[######################                  ] | 55% Completed |  5min 35.4s

  args2 = [_execute_task(a, cache) for a in args]


[########################                ] | 62% Completed |  6min 18.5s

  args2 = [_execute_task(a, cache) for a in args]


[#########################               ] | 63% Completed |  6min 33.4s

  args2 = [_execute_task(a, cache) for a in args]


[############################            ] | 70% Completed |  7min  8.0s

  args2 = [_execute_task(a, cache) for a in args]


[############################            ] | 70% Completed |  7min  9.2s

  args2 = [_execute_task(a, cache) for a in args]


[############################            ] | 72% Completed |  7min 19.8s

  args2 = [_execute_task(a, cache) for a in args]


[#############################           ] | 74% Completed |  7min 34.3s

  args2 = [_execute_task(a, cache) for a in args]


[###############################         ] | 78% Completed |  7min 57.4s

  args2 = [_execute_task(a, cache) for a in args]


[###############################         ] | 79% Completed |  8min  3.8s

  args2 = [_execute_task(a, cache) for a in args]


[#################################       ] | 83% Completed |  8min 23.6s

  args2 = [_execute_task(a, cache) for a in args]


[##################################      ] | 85% Completed |  8min 37.8s

  args2 = [_execute_task(a, cache) for a in args]


[###################################     ] | 87% Completed |  8min 48.5s

  args2 = [_execute_task(a, cache) for a in args]


[###################################     ] | 88% Completed |  8min 58.1s

  args2 = [_execute_task(a, cache) for a in args]


[########################################] | 100% Completed | 10min  2.3s


Inelegantly, it took until I was over to the server to decide that I truly did not want to keep track of `Location Name`.  It seemed like a good idea, but it was sparse enough not to be worth the hassle in the end.  In this cell I drop that column and rewrite with more partitions (108 on my 18 core AWS instance).

In [5]:
mmnts = read_parquet('./data_files/first_trim/', engine='fastparquet')
mmnts = mmnts.drop(['Location Name'], axis='columns')
ncores = cpu_count()
mmnts = mmnts.repartition(npartitions=6*ncores)
with ProgressBar(), dask.config.set(scheduler='processes'):
    to_parquet( mmnts,
                './data_files/second_trim/',
                engine='fastparquet',
                compute=True
    )

[########################################] | 100% Completed |  1hr  3min  1.6s


Now lacking the sparse `Location Name` column, it was easy to `dropna` across all columns, since any sample lacking a `Time`, `Latitude`, `Longitude`, `Value`, or `Unit` was useless.  This step was surprisingly quick.

In [8]:
mmnts = read_parquet('./data_files/second_trim/', engine='fastparquet')
mmnts = mmnts.dropna()
with ProgressBar(), dask.config.set(scheduler='processes', optimize_graph=True):
    to_parquet( mmnts,
                './data_files/third_trim/',
                engine='fastparquet',
                compute=True
    )

[########################################] | 100% Completed | 49.6s


In this cell, I made the `Unit` column have the `category` dtype.  Dask dataframes have a built in `.categorize` method.  Initially, I resisted using this just as an animal spirit, and instead separated the `Unit` column, called `.astype('category').to_frame()`, and attempted to re-`merge` them.  This was a disastrous idea.  If it all possible, it is crucial to avoid `merge` on anything of size using Dask (and this dataset had 110 million entries when *I* downloaded it).  The best answer I saw in my troubleshooting (I didn't investigate the source) was that Dask inefficiently tries to build the entire merged dataframe in memory.  This would result in fully duplicating the whole 110 million entries, which could be significantly bigger in-memory than they appear as parquet files (where they total 5.6 GB).  Just this `merge` was capable of consuming more than 60 Gib (64 GB) of RAM.  Additionally, if Dask does not remember that, despite being separated, the dataframes have all the same partitions, it could cause a linear-time search for each sample across every partition of the merging dataframe.  I'm sure there are more detailed reasons for the failure.  The several, hour-long eventual failed steps have _not_ been kept here for posterity.

In [3]:
mmnts = read_parquet('./data_files/third_trim/', engine='fastparquet')
with ProgressBar(), dask.config.set(scheduler='processes', optimize_graph=True):
    to_parquet( mmnts.categorize(columns=['Unit'], index=False),
                '/home/ec2-user/Phys18Conf_1/data_files/fourth_trim/',
                engine='fastparquet',
                compute=True,
    )

[########################################] | 100% Completed | 20.3s
[########################################] | 100% Completed | 18.5s


This cell contains the most substantive transformation so far: changing the Python object time-stamp strings into numbers of seconds since 2011 (a month or two before the first datum).  This will later be necessary so that DBSCAN can do its temporal clustering with a readable value.  Once again motivated by avoiding `dask.DataFrame.merge` at all costs, we find a way to, in parallel and without changing/splitting the dataframe, apply the desired transformation only to the `Time` column entries.  We do this by mapping to each partition (`.map_partitions`) a `lambda`, that calls `pandas.DataFrame.apply`, _column-wise_ with the function `filt`.  `filt` checks if the column has `dtype == 'object` (as only the `Time` column will).  If so, it calls `pandas.Series.apply` on the _column_ (always the `Time` column in practice) with `tconv`, which reads the string into a Python `datetime` object; takes its difference from 2011 as a `time_delta`; then converts that to seconds as an numpy `int64`.  There are three levels of `apply` going on here!

In [9]:
start_dt = datetime.strptime('2011-01-01 00:00:00', '%Y-%m-%d %H:%M:%S')
microseconds = compile('\.\d*')

def filt(col):
    if col.dtype != 'object':
        return col
    else:
        return col.apply(tconv)
    
def tconv(s):
    dt = datetime.strptime(microseconds.sub('', str(s)), '%Y-%m-%d %H:%M:%S')
    delta = dt - start_dt
    result = np.array(delta.total_seconds())
    return result.astype(np.int64)

mmnts = read_parquet('./data_files/fourth_trim/', engine='fastparquet')
_meta = {'Time': 'int64', 'Latitude': 'float64', 'Longitude': 'float64', 'Value': 'float64', 'Unit': 'category'}
mmnts = mmnts.map_partitions(lambda df: df.apply(filt, axis=0), meta=_meta)
with ProgressBar(), dask.config.set(scheduler='processes', optimize_graph=True):
    to_parquet( mmnts,
               '/home/ec2-user/Phys18Conf_1/data_files/fifth_trim',
               engine='fastparquet',
               compute=True
    )

[########################################] | 100% Completed |  1min 56.6s


For comparison, the dataframe has just about 110 million rows before we drop the inappropriate units.  After, it would have about 102 million.

In [13]:
len(mmnts)

109799687

Here we see the distribution of values across the categories of `Unit`.  The vast majority are CPMs.  We would maintain almost all the data by only taking the rows of category `cpm`.  A number of units are totally uninterpretable/unusable.  `status` is not any kind of unit as far as I can tell. (Perhaps it is a default that people forget to change when submitting their observations?).  Perhaps there is some way to measure radiation as a temperature, but I couldn't find any evidence of `celcius` as a unit of radiation.  `1`, `0`, `DeviceType1`, `DeviceType2` are not units at all.  From brief investigation, it appears that `PM1`, `PM2.5`, `PM10`, `pm2.5` are a type of air quality measure.  This is probably because SafeCast is trying to break into other kinds of environmental monitoring, including air quality.  This also appears to be the source of `NOXppm` and `HUMD%`.  `RSSI` is a measure of radio signal strength.  `TEMPC`, perhaps another temperature measurement.  The only reputable alternative measurement present is the Sievert.  I looked into converting the sievert measurements to CPMs, but it appears that _how_ to do this conversion depends on what atomic substance the Geiger counter was calibrated with: CPMs translate differently to Sieverts calibrated with Cobalt isotope than with Caesium isotope.  This information was not present even in the full table, so this ruled out the conversion.  Unfortunately, the largest chunks left on the table were not the Sieverts, but the `status` and the `celcius`, which were uninterpretable.  

In [20]:
mmnts = read_parquet('/home/ec2-user/Phys18Conf_1/data_files/fifth_trim/', engine='fastparquet')
with ProgressBar():
    out = mmnts['Unit'].value_counts().compute(scheduler='processes', optimize_graph=True)

[########################################] | 100% Completed |  0.7s


In [21]:
out

cpm             102927743
status            3765116
celcius           2775932
usv                279888
microsievert        30096
 cpm                10043
1                    8892
DeviceType2          1363
CPM                   289
Cpm                    55
DeviceType1            45
usv/hr                 43
uSv/h                  43
PM1                    28
HUMD%                  20
PM2.5                  17
0                      15
uSv                    14
PM10                   13
NOXppm                 12
TEMPC                  11
pm2.5                   4
uSv/hr                  3
RSSI                    2
Name: Unit, dtype: int64

Nonetheless, we could retain the ~11000 measurements separated only by the alternate spellings of CPM.  Now that all units were CPM, we could discard the `Unit` column entirely.  `out2[:-2]` merely gets rid of two rows with bad time stamps (instead of 2011 they read 201).  This wasn't done earlier because `dask.DataFrame.drop` does not support dropping rows.  

In [23]:
with ProgressBar():
    out2 = mmnts[mmnts['Unit'].isin(['cpm', ' cpm', 'CPM', 'Cpm'])].compute(scheduler='processes', optimize_graph=True)

[########################################] | 100% Completed | 36.4s


In [42]:
out2.drop('Unit', axis=1, inplace=True)
out3 = dd.from_pandas(out2[:-2], npartitions=108)
with ProgressBar(), dask.config.set(scheduler='processes', optimize_graph=True):
    to_parquet( out3,
                '/home/ec2-user/Phys18Conf_1/data_files/sixth_trim/',
                engine='fastparquet',
                compute=True,
    )

[########################################] | 100% Completed | 16.4s


### Cleaning/Paring [AS IT SHOULD BE DONE]

---

This cell, as long as the initial csv was read validly from a file system, and written back as parquet files validly to a file system, would complete all the above transformations in a single step. 

Using the `usecols` keyword argument, only the five columns of interest are read in from the original csv.  Renaming, repartitioning, dropping NaNs, categorizing, and numerifying of the `Time` column are all done without calling `.compute`.  The categories which are typographical variations on CPM are identified and those rows agglomerated before `Unit` is dropped.  This is computed in-memory to a pandas dataframe, allowing us to drop the last two skewed time-stamp rows before converting back to Dask and writing the final parquet files to disk.  Though it might appear wasteful to compute the pandas dataframe only to convert it back to dask, this likely costs little as long as it is immediately written; since one cannot `.concat` a dask dataframe with a pandas dataframe, there is no way to fancily only compute the partition which needs to be dropped, and allow the other partitions to be computed as they are written.  Concatenating them back together would require the top 107 partitions to be computed anyway, and then we are in the same situation as computing the whole thing.    

In [None]:
mmnts = dd.read_csv('measurements-out.csv', usecols=['Captured Time', 'Latitude', 'Longitude', 'Value', 'Unit'])
mmnts = mmnts.rename(columns={'Captured Time': 'Time'})
mmnts = mmnts.repartition(npartitions=(6*cpu_count()))
mmnts = mmnts.dropna()
with ProgressBar(), dask.config.set(scheduler='processes'):
    mmnts = mmnts.categorize(columns=['Unit'], index=False)

start_dt = datetime.strptime('2011-01-01 00:00:00', '%Y-%m-%d %H:%M:%S')
microseconds = compile('\.\d*')

def filt(col):
    if col.dtype != 'object':
        return col
    else:
        return col.apply(tconv)
    
def tconv(s):
    dt = datetime.strptime(microseconds.sub('', str(s)), '%Y-%m-%d %H:%M:%S')
    delta = dt - start_dt
    result = np.array(delta.total_seconds())
    return result.astype(np.int64)

_meta = {'Time': 'int64', 'Latitude': 'float64', 'Longitude': 'float64', 'Value': 'float64', 'Unit': 'category'}
mmnts = mmnts.map_partitions(lambda df: df.apply(filt, axis=0), meta=_meta)

cpms = []
for category in mmnts['Unit'].cat.categories:
    if category.lower().strip() == 'cpm':
        cpms.append(category)

with ProgressBar():
    mmnts = mmnts[mmnts['Unit'].isin(cpms)].compute(scheduler='processes', optimize_graph=True)

mmnts = dd.from_pandas(mmnts[:-2], npartitions=(6*cpu_count()))
with ProgressBar(), dask.config.set(scheduler='processes', optimize_graph=True):
    to_parquet( mmnts,
                '/path/to/your/storage/location/',
                engine='fastparquet',
                compute=True,
    )

In [43]:
out2

Unnamed: 0,Time,Latitude,Longitude,Value
0,286909200,37.507552,139.941170,72.0
1,286887600,37.507250,139.940000,55.0
2,286887600,37.505445,0.016667,68.0
3,286686000,34.066487,-118.895217,50.0
4,286678800,37.673233,140.066667,48.0
5,286678800,37.674782,140.079895,52.0
6,285015600,37.517902,139.925478,59.0
7,285015600,37.754973,140.687660,384.0
8,285015600,37.526595,139.931667,59.0
9,285015600,37.737157,140.726832,441.0
