In [None]:
import collections
import json
import os
import pandas as pd
from shutil import copyfile

Goal: take the acurite export files from v2 and create easily reviewed summaries and easily loaded monthly files.  

Output is split into directories for each property listed in `properties`. 
This decouples the files from the properties they record, allowing analysis of each property on its own. 
In the property directory, the output is in a subdirectory for the `major` version of the output. 

# Collect files for cleaning (generic process)

Cleaning considers all files in the `inputDir` directory and compares them against `cleanedFiles`, which records every file that has been completely processed together with the version last used to process it. 

Depending on the settings of the current versions and the booleans `minorRerun` and `allRerun`, the system will not reprocess files that have been successfully cleaned. 

The versioning method is informed by [semantic versioning](https://en.wikipedia.org/wiki/Software_versioning#Degree_of_compatibility). When the major version of this cleaning notebook changes, the next run should load all the files and reprocess them, placing the output in a new subdirectory for that property. Future development may use the minor number for changes that are compatible with existing consumers of the cleaned files (more data in the report, additional export files). The patch version may be used for keeping track of development. 

## Merging details

The process loads each file into a dataframe, renames the columns to `inputCols` and drops all the columns in `dropCols`. Duplicate data -- all columns match -- is dropped.

### Assumptions

- That the file suffix `fileType` is an exact match (case sensitive)
- That the files are comma separated value files that can be directly read into a data frame.
- That the files have headers.


In [None]:
# ---------------------------------------------------------------------#
# Collect files for cleaning: initialization
# ---------------------------------------------------------------------#
# Version of this cleaning notebook:
#   minor - downstream notebooks can use previous minor versions.
#   patch - only internal processing changes that should be tracked.
major, minor, patch = 0, 0, 0

# Should files be reprocessed if the minor version is different?
minorRerun = True

# Should all files be reprocessed no matter what versions?
allRerun = True

#inputDir = '../Acurite.v2'
inputDir = '../DeleteMe/'
fileType = '.csv'
cleanedFilesRecord = './acuritev2Cleaning.json'

# Names applied, in order, to the columns of every input file.
inputCols = [
    "Name", "X00", "Timestamp", "Temperature_F", "Humidity_pct",
    "Dew_Point_F", "Heat_Index_F", "X01", "X02", "Pressure_inHg",
    "X03", "X04", "X05", "X06", "X07", "X08", "X09", "X10", "X11", "X12",
    "X13", "X14", "X15",
]
# The placeholder columns X00 .. X15 are discarded after loading.
dropCols = ["X{:02d}".format(i) for i in range(16)]

# Timestamp of this run, shared by every status recorded below.
now = pd.Timestamp.utcnow().isoformat()
Status = collections.namedtuple('Status', 'major minor patch time')
currentStatus = Status(major=major, minor=minor, patch=patch, time=now)
def statusStmt(myStatus):
    """Return "major.minor.patch time" for a status record.

    Parameters
    ----------
    myStatus : sequence of four items
        Ordered as major, minor, patch, time. A Status namedtuple works,
        as does a plain list or tuple in the same order.

    Returns
    -------
    str
        The dotted version, a space, then the time stored in the record.
    """
    # Report the time stored in the record itself. Using the module-level
    # `now` here would stamp every previously cleaned file with the current
    # run time instead of the time it was actually processed.
    return "{}.{}.{} {}".format(myStatus[0], myStatus[1], myStatus[2], myStatus[3])

# Start the run report with the version and time of this run.
report = "Status: {}.\n\n".format(statusStmt(currentStatus))


# Cleaning the files


- Persist important details
  - first and last observations of sensors


In [None]:
# ---------------------------------------------------------------------#
# Cleaning the files: initialization
# ---------------------------------------------------------------------#
# Column that identifies the sensor in each row.
sensorCol = "Name"
# JSON file where the per-sensor history is persisted.
sensorsRecord = '../sensorHistory.json'
# One entry per sensor: first and last timestamps seen, plus the Status of
# the run that last changed the entry.
History = collections.namedtuple('History', 'earliest latest status')

# Persisting the cleaned data 

- Create output directories, if needed, labeled with the major version.
- Convert the timestamp to a standard.
- Separate on the `timestampCol` column into `period` blocks.
- A block is considered complete if there is a record in the first and last `subPeriod` in the period.
- Output with an ISO standard timestamp `isoTimestampCol`
- Generate a run report.

In [None]:
# ---------------------------------------------------------------------#
# Persisting the cleaned data: initialization
# ---------------------------------------------------------------------#
# Set to True by an earlier cell when there is nothing to process.
stop = False
outputPath = "../"

# These are the general output directories.
properties = ['Temperature', 'Humidity', 'Pressure']

# Columns required in all output (other than timestamp)
keyCols = ["Name"]

# Dictionary of the columns appropriate for each property
propertyCols = dict(
    Temperature=["Temperature_F", "Heat_Index_F"],
    Humidity=["Humidity_pct", "Dew_Point_F"],
    Pressure=["Pressure_inHg"],
)

# Existing timestamp column, and the new column that will be added.
timestampCol, isoTimestampCol = 'Timestamp', 'ISO_Timestamp'

# Size of an output block, and the sub-block used to judge completeness.
period, subPeriod = "month", "day"

In [None]:
# ---------------------------------------------------------------------#
# Collect files for cleaning: execution
# ---------------------------------------------------------------------#

# newFiles lists all the available input files, initially: every entry in
# inputDir whose name ends with fileType (case-sensitive suffix match).
# The listing is sorted because os.listdir returns entries in arbitrary
# order, and a stable order keeps the report and the merge reproducible.
# The builtin name `input` is deliberately not reused as a variable.
newFiles = sorted(name for name in os.listdir(inputDir) if name.endswith(fileType))

report = report+str(len(newFiles))+" "+fileType+" files in the "+inputDir+" directory.\n\n"

# cleanedFiles maps each input file name to the Status (major, minor,
# patch, execution timestamp) of the run that last cleaned it. It is read
# from cleanedFilesRecord; a missing record means nothing has been cleaned.
try:
    with open(cleanedFilesRecord) as recordFile:
        cleanedFiles = json.load(recordFile)
        report += "{} files already processed:\n".format(len(cleanedFiles))
        # Snapshot the items so values can be replaced while looping.
        for fileName, rawStatus in list(cleanedFiles.items()):
            fileStatus = Status(*rawStatus)
            cleanedFiles[fileName] = fileStatus
            report += "{}: \n\t{}\n".format(fileName, statusStmt(fileStatus))

except FileNotFoundError:
    cleanedFiles = {}
    report += "No cleaned files.\n"
except:
    # Anything else is announced and then propagated unchanged.
    print("Unexpected error")
    raise
    
# Describe the rerun policy in the report. A file already cleaned by an
# acceptable version of this notebook is skipped unless a rerun is forced.
if allRerun:
    reportRR = '\n\nNonetheless, all files will be rerun.\n\n'
else:
    # The cutoff is "major.minor" when minor changes trigger a rerun,
    # otherwise just "major".
    versionCutoff = str(major)+"."+str(minor) if minorRerun else str(major)
    reportRR = '\nAll files processed with a version earlier than '+versionCutoff+' will be rerun.\n'

report += reportRR

# Choose the files to clean in this run. A file is included when it has
# never been cleaned, when every file is being rerun, when it was cleaned by
# an older major version, or (if minorRerun) by an older minor version.
processFiles = []
for candidate in newFiles:
    needsCleaning = (
        candidate not in cleanedFiles
        or allRerun
        or cleanedFiles[candidate].major < major
        or (minorRerun and cleanedFiles[candidate].minor < minor)
    )
    if needsCleaning:
        processFiles.append(candidate)

# One tab-indented line per selected file.
processFilesReport = "".join("\t"+name+"\n" for name in processFiles)

if processFiles:
    report += "The following "+str(len(processFiles))+" files are included in the cleaning:\n"
    report += processFilesReport
else:
    report += "No files to be cleaned.\n"

# Import each new file into a dataframe, relabel the columns, remove the
# undesired columns and add the result to a list.
processedDF = []

for f in processFiles:
    # os.path.join works whether or not inputDir ends with a separator;
    # plain string concatenation only works when it does.
    df = pd.read_csv(os.path.join(inputDir, f))
    # Assigning the names raises ValueError if the file does not have
    # exactly len(inputCols) columns.
    df.columns = inputCols
    df = df.drop(columns=dropCols)
    # Mark the file as cleaned by the current version of this notebook.
    cleanedFiles[f] = currentStatus
    processedDF.append(df)
    
# Concatenate the list of dataframes into one frame, removing dupes.
# pd.concat raises ValueError on an empty list, so the "nothing to do" case
# is checked explicitly instead of being inferred from that exception.
if processedDF:
    inputDF = pd.concat(processedDF, ignore_index=True)
    report = report+"\nThere are "+str(len(inputDF))+" rows in the sum of all the files. "
    # Rows are duplicates only when every column matches.
    inputDF = inputDF.drop_duplicates()
    report = report+"After removing duplicates, there are "+str(len(inputDF))+" rows.\n"
else:
    print("No input files to process")
    # %m is the month; %M is the minute and only belongs in the time part.
    with open('run'+pd.Timestamp.utcnow().strftime('%Y%m%dT%H%M')+'.txt', 'w') as f:
        f.write(report)
    # Tell the later cells to skip their work.
    stop = True


# inputDF is the concatenation of all dataframes. 
# print(report)


In [None]:
# ---------------------------------------------------------------------#
# Cleaning the files: processing
# ---------------------------------------------------------------------#
if not stop:
    # Record when sensors were seen.
    # Load the persisted history: a JSON object keyed by sensor name whose
    # values are rebuilt here as History tuples.
    try:
        with open(sensorsRecord) as sf:
            sensors = json.load(sf)
            for s in sensors:
                sensors[s] = History(*sensors[s])

    except FileNotFoundError:
        # No history has been recorded yet.
        sensors = {}
    except json.JSONDecodeError:
        # The exception class lives in the json module; the bare name
        # JSONDecodeError was never imported, so it raised NameError.
        # Keep a dated copy of the unreadable record before starting over
        # (%m is the month; %M would be the minute).
        sensors = {}
        copyfile(sensorsRecord,
                 sensorsRecord+pd.Timestamp.utcnow().strftime('%Y%m%dT%H%M')+'.txt')
    except:
        print("Unexpected error")
        raise

    # Parse the timestamps before taking min/max so the comparison is
    # chronological and the results support .isoformat(). The conversion is
    # applied to a copy; inputDF itself is left untouched by this cell.
    sensorGroupDF = inputDF.assign(
        **{timestampCol: pd.to_datetime(inputDF[timestampCol])}
    ).groupby(sensorCol)
    firstSeries = sensorGroupDF[timestampCol].min()
    lastSeries = sensorGroupDF[timestampCol].max()
    sensorList = lastSeries.index

    for s in sensorList:
        if s in sensors:
            print("Was:",sensors[s])
            # namedtuples are immutable, so build an updated copy with
            # _replace instead of assigning to the fields.
            history = sensors[s]
            if firstSeries[s] <= pd.to_datetime(history.earliest):
                history = history._replace(earliest=firstSeries[s].isoformat(),
                                           status=currentStatus)

            if lastSeries[s] >= pd.to_datetime(history.latest):
                history = history._replace(latest=lastSeries[s].isoformat(),
                                           status=currentStatus)
            sensors[s] = history
            print("Now:",sensors[s])

        else:
            # First time this sensor has been seen.
            sensors[s] = History(firstSeries[s].isoformat(),lastSeries[s].isoformat(),currentStatus)

    # namedtuples are written to JSON as plain lists.
    with open(sensorsRecord, 'w') as outfile:
        json.dump(sensors, outfile)

    report = report+"\n\nDocumented the sensor history in "+sensorsRecord+".\n"

In [None]:
# ---------------------------------------------------------------------#
# Persisting the cleaned data: execution
# ---------------------------------------------------------------------#
if not stop:
    # By convention, the output is stored one directory up from this
    # cleaning notebook in a directory named for the property measured,
    # inside a subdirectory labeled with the major version.
    outputDir = {}
    versionDir = 'cleaned.v'+str(major)
    for p in properties:
        # https://docs.python.org/3/library/os.html?highlight=os%20makedirs#os.makedirs
        # Absolute path because documentation indicates risk with using "../"
        pp = os.path.abspath(os.path.join(outputPath, p))
        outputDir[p] = os.path.join(pp, versionDir)
        os.makedirs(outputDir[p], exist_ok=True)

    # The run report is accumulated in `report`; the name `record` used
    # here previously was never defined.
    report = report + "\nSplitting the input into files by "+period+".\n"

    # strftime pattern that labels the block each row belongs to.
    periodFormats = {'year': '%Y', 'month': '%Y-%m'}
    if period not in periodFormats:
        raise ValueError("Unsupported period '"+str(period)+"'; expected one of "
                         +", ".join(sorted(periodFormats)))
    periodFmt = periodFormats[period]

    # Parse the timestamps so strftime is available; this is a no-op when
    # the column already holds datetimes.
    inputDF['Period'] = pd.to_datetime(inputDF[timestampCol]).dt.strftime(periodFmt)
    periodList = inputDF['Period'].unique()
    PeriodDF = inputDF.groupby('Period')
    countSeries = PeriodDF[timestampCol].count()
    r = "\nTime period\tCount of records\n"
    # Series.iteritems() was removed in pandas 2.0; items() is equivalent.
    for s,c in countSeries.items():
        r=r+s+"\t\t"+str(c)+"\n"

    report = report + r
    
# https://pandas.pydata.org/pandas-docs/stable/user_guide/groupby.html#selecting-a-group

# TODO How to verify period is complete OR load in previous months to merge?



In [None]:
# ---------------------------------------------------------------------#
# Collect files for cleaning: persist success
# ---------------------------------------------------------------------#
# LAST CELL

# Write the run report to a file named for the current UTC time.
# %m is the month; %M is the minute and only belongs in the time part.
with open('run'+pd.Timestamp.utcnow().strftime('%Y%m%dT%H%M')+'.txt', 'w') as f:
    f.write(report)
    
# Record the completed files
#with open(cleanedFilesRecord, 'w') as outfile:
    #json.dump(cleanedFiles, outfile)

In [None]:
# Preview the number of records in each period.
# inputDF and its 'Period' column exist only when there was input to
# process, so the preview is skipped after an empty run.
if not stop:
    PeriodDF = inputDF.groupby('Period')
    countSeries = PeriodDF[timestampCol].count()
    r = "\nTime period\tCount of records\n"
    # Series.iteritems() was removed in pandas 2.0; items() is equivalent.
    for s,c in countSeries.items():
        r=r+s+"\t\t"+str(c)+"\n"

    print(r)