In [152]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from scipy import stats
from functools import reduce



In [32]:
df = pd.read_csv("project_data.csv")
# df.shape - (1584823, 3)
# df column values:
# localminute - Timestamp 
# dataid - MeterID 
# meter_value - meter reading

(1584823, 3)

# How many houses are included in the measurement study?

In [26]:
# Since dataid is unique for each meter, we can count the number of unique dataid numbers
df.dataid.value_counts().size

157

### This gives us 157 for the number of houses included in the study

# Are there any malfunctioning meters? If so, identify them and the time periods where they were malfunctioning.

## There are various ways to define a malfunctioning meter. Let us explore some of them

### Let us first check, if there are *meters that report a lower value at a later timestamp*. Since meter_value is cumulative consumption, meter_value should not be lower at a later timestamp for the same meter

In [80]:
grouped = df.groupby(['dataid'], sort=['localminute'])
def has_decreasing_values(df):
    current_value = 0
    for index, val in df.iteritems():
        if val < current_value:
            return True
        current_value = val
        
meters_with_decreasing = (grouped['meter_value']
                          .agg(has_decreasing_value)
                          .where(lambda x: x)
                          .dropna()
                          .keys())

In [103]:
print(len(meters_with_decreasing))
meters_with_decreasing

43


Int64Index([  35,   77,   94,  483,  484, 1042, 1086, 1185, 1507, 1556, 1718,
            1790, 1801, 2129, 2335, 2449, 3134, 3527, 3544, 3893, 4031, 4193,
            4514, 4998, 5129, 5131, 5193, 5403, 5810, 5814, 5892, 6836, 7017,
            7030, 7117, 7739, 7794, 7989, 8156, 8890, 9134, 9639, 9982],
           dtype='int64', name='dataid')

#### Wow, we have 43 meters that have a decreasing value! Let's zoom in and get the offending sections for each meter. (In other words, find the exact data points which show this descreasing value)

In [117]:
## Need to re-sort after filter since filter takes all rows and breaks original sorting
dec_meters = (grouped.filter(lambda x: int(x.name) in meters_with_decreasing)
              .groupby(['dataid'], sort=['localminute']))

## Iterate over values to find offending rows for each meter
## WARNING: RUNS VERY SLOWLY. TODO: OPTIMIZE
offending_values = {}
for group_id, rows in dec_meters.groups.items():
    offending_values[group_id] = []
    current_value = 0
    group_rows = dec_meters.get_group(group_id)
    group_rows_count = group_rows.shape[0]
    for i in range(group_rows_count):
        if group_rows.iloc[i]['meter_value'] < current_value:
            offending_values[group_id].append([group_rows.iloc[i-1], group_rows.iloc[i]])
        current_value = group_rows.iloc[i]['meter_value']
    

##### Number of 'broken data' instances for each meter

In [149]:
print("Meter ID |", "Number of broken readings")
for k, v in offending_values.items():
    print(str(k).ljust(20), len(v))
        

Meter ID | Number of broken readings
35                   1
77                   1
94                   6
483                  1
484                  9
1042                 1
1086                 1
1185                 135
1507                 2
1556                 12
1718                 4
1790                 1
1801                 1
2129                 3
2335                 5
2449                 93
3134                 18
3527                 1
3544                 18
3893                 2
4031                 16
4193                 1
4514                 141
4998                 1
5129                 76
5131                 1
5193                 4
5403                 156
5810                 10
5814                 1
5892                 1
6836                 51
7017                 1
7030                 90
7117                 123
7739                 1
7794                 1
7989                 2
8156                 151
8890                 44
9134                 11

##### Just knowing the number of broken readings may not be useful. 
##### Let's say that we want to know which meters should be fixed, since faulty meters result in us inaccurately measuring consumption, and possibly losing money.
##### There should be some measure of tolerance in deciding if a meter is broken. In this case, let's check the average decreasing amount, and the ratio of "broken" readings 

In [161]:
print("Meter ID |", "Number of broken readings |", "Average decrease across broken readings |", "Percentage of broken readings for meter")
for k, v in offending_values.items():
    print(str(k).ljust(20), 
          str(len(v)).ljust(20), 
          str(reduce(lambda x, y: x + abs(y[1]['meter_value'] - y[0]['meter_value']) / len(v), [0] + v )).ljust(40), 
          (len(v) / dec_meters.get_group(k).shape[0]) * 100)

Meter ID | Number of broken readings | Average decrease across broken readings | Percentage of broken readings for meter
35                   1                    2.0                                      0.008423180592991915
77                   1                    2.0                                      0.009360666479453337
94                   6                    9.0                                      0.016513003990642632
483                  1                    14.0                                     0.0036195164326046038
484                  9                    2.4444444444444446                       0.020438751873552256
1042                 1                    2.0                                      0.02610966057441253
1086                 1                    2.0                                      0.003330114222917846
1185                 135                  16267.674074074097                       0.7314694408322496
1507                 2                    2.0    

#### With the data laid out like this, it is pretty clear that most meters are not really broken if we allow a 2% error rate
#### However, in terms of error volume, some are pretty suspect. Let's use an average volume error of 100 Cubic foot (that's a lot!) as our threshold, and filter our results

In [164]:
print("Meter ID |", "Number of broken readings |", "Average decrease across broken readings |", "Percentage of broken readings for meter")
for k, v in offending_values.items():
    measure = str(reduce(lambda x, y: x + abs(y[1]['meter_value'] - y[0]['meter_value']) / len(v), [0] + v )).ljust(40)
    if float(measure) > 10:
        print(str(k).ljust(20), 
              str(len(v)).ljust(20), 
              measure, 
              (len(v) / dec_meters.get_group(k).shape[0]) * 100)

Meter ID | Number of broken readings | Average decrease across broken readings | Percentage of broken readings for meter
483                  1                    14.0                                     0.0036195164326046038
1185                 135                  16267.674074074097                       0.7314694408322496
1556                 12                   13574.0                                  0.3252032520325203
2335                 5                    22942.0                                  0.05611672278338946
2449                 93                   15472.860215053784                       1.7067351807671134
3134                 18                   12877.222222222224                       0.4480955937266617
3544                 18                   8488.222222222223                        0.8104457451598379
4514                 141                  23971.8156028369                         0.7392261717521234
4998                 1                    14.0             

In [193]:
"""Use this function to validate/check each bad meter given our heuristic"""
def print_bad_meter_readings(meterID):
    meter_readings = offending_values[meterID]
    print("time apart |".ljust(20), "meter value initial |", "meter value after |", "difference")
    for readings_pair in meter_readings:

        print(str(pd.to_datetime(readings_pair[1]['localminute']) - pd.to_datetime(readings_pair[0]['localminute'])).ljust(25), 
              str(readings_pair[0]['meter_value']).ljust(20), 
              str(readings_pair[1]['meter_value']).ljust(20), 
              readings_pair[1]['meter_value'] - readings_pair[0]['meter_value'])
print_bad_meter_readings(1556)
        

time apart |         meter value initial | meter value after | difference
0 days 00:00:13           203154               203152               -2
0 days 00:06:59           203580               203578               -2
0 days 01:52:10.446150    223266               206892               -16374
0 days 00:15:09.023424    223266               206892               -16374
0 days 00:01:10.125932    223268               206910               -16358
0 days 02:14:09.533295    223282               206986               -16296
0 days 00:21:08.597335    223304               207046               -16258
0 days 01:39:08.641617    223306               207048               -16258
0 days 01:41:09.602248    223318               207066               -16252
0 days 00:46:07.782113    223320               207080               -16240
0 days 01:13:08.190968    223332               207094               -16238
0 days 00:14:06.456241    223340               207104               -16236


### Another requirement of the meters is that they push their readings to be saved if the meter values differ by at least 2 cubic foot
#### Let us verify that every pair of readings for a meter differ by at least 2 cubic foot. To not double count, we will only check for readings where the differing value is 0 >= x > 2 so that we don't get the same error readings where the readings decrease
#### Since the value is smaller, we will not focus on the difference in values, but on the percentage of total readings for the meter only

In [197]:
grouped2 = df.groupby(['dataid'], sort=['localminute'])
def has_stagnant_values(df):
    current_value = 0
    for index, val in df.iteritems():
        if index == 0:
            current_value = val
            continue
        
        if val < current_value + 2 and val >= current_value:
            return True
        current_value = val
        
meters_with_stagnant = (grouped['meter_value']
                          .agg(has_stagnant_values)
                          .where(lambda x: x)
                          .dropna()
                          .keys())

print("Number of meters with stagnant values: ", len(meters_with_stagnant))

Number of meters with stagnant values:  155


#### The following segment should have calculated the percentage of stagnant values for each meter, but it takes too long (tried for 5 minutes and gave up). At least we know number of meters that reported stagnant values?

In [None]:
# ## Need to re-sort after filter since filter takes all rows and breaks original sorting
# stagnant_meters = (grouped2.filter(lambda x: int(x.name) in meters_with_stagnant)
#               .groupby(['dataid'], sort=['localminute']))

# ## Iterate over values to count offending occurences. Not counting value
# ## WARNING: RUNS VERY SLOWLY. TODO: OPTIMIZE
# offending_values2 = {}
# for group_id, rows in stagnant_meters.groups.items():
#     offending_values2[group_id] = 0
#     group_rows = stagnant_meters.get_group(group_id)
#     group_rows_count = group_rows.shape[0]
#     # Set current_value so we do not trigger first reading
#     current_value = group_rows.iloc[0]['meter_value'] - 5
#     for i in range(group_rows_count):
#         if group_rows.iloc[i]['meter_value'] < current_value + 2 and group_rows.iloc[i]['meter_value'] >= current_value:
#             offending_values2[group_id] += 1
#         current_value = group_rows.iloc[i]['meter_value']

        
# print("Meter ID |", "Number of broken readings |", "Percentage of broken readings for meter")
# for k, v in offending_values2.items():
#     print(str(k).ljust(20), 
#           str(v).ljust(20),  
#           (v / stagnant_meters.get_group(k).shape[0]) * 100)


In [216]:
stagnant_meters = (grouped2.filter(lambda x: int(x.name) in meters_with_stagnant)
              .groupby(['dataid'], sort=['localminute']))

print("Total number of readings to iterate, which is why it is slow: ", 
      reduce(lambda x, y: x + len(y), [0] + list(stagnant_meters.groups.values())))

Total number of readings to iterate, which is why it is slow:  1584818


### Lastly, another requirement of the meters is that they take a reading every 15 seconds before deciding whether to push the results to a collecting server
#### Let us now check for pairs where the time between readings is less than 15 seconds, or for consistent patterns that are more than 15 seconds