Notes:
1. The files for 20160522, 20160523, and 20160527 have mixed line lengths.
2. The files for 20150522, 20150523, 20150524, 20160528, and 20170905 do not exist.

## start pyspark session and import

In [1]:
from vehicle import *
import findspark
# findspark.init('/home/stang/myenv/lib/python2.7/site-packages/pyspark')
findspark.init("/usr/hdp/current/spark2-client")
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext

In [2]:
# Spark settings for this run, passed to SparkConf as (key, value) string pairs.
conf = pyspark.SparkConf().setAll([('spark.app.name', 'daily_stats_run'), # application name
                                 ('spark.master', 'yarn'),              # cluster manager: submit to YARN
                                 ('spark.submit.deployMode', 'client'), # deploy mode: client (driver runs in this process)
                                 ('spark.executor.memory', '4g'),       # memory allocated for each executor
                                 ('spark.executor.cores', '3'),         # number of cores for each executor
                                 ('spark.executor.instances', '80'),    # number of executors in total
                                 ('spark.yarn.am.memory','20g')])       # memory for the YARN Application Master in client mode (not the driver heap)
# sc.stop()  # uncomment to stop an existing context before creating a new one
sc = pyspark.SparkContext(conf=conf)
# Register veh.zip as a Python dependency for tasks run on this context
# (presumably it packages the `vehicle` module used by compute_stats -- confirm).
sc.addPyFile('/home/stang/user-profile/stats-spark/veh.zip')
# NOTE: this value is not displayed because it is not the last expression of the cell.
sc.getConf().getAll()
sc

## Examine new data sources

## get daily result

In [3]:
# Zero-based CSV column index -> name of the field kept for the stats computation.
col_dict = {1: 'TDATE', 15: 'CCS_CHARGEVOLT', 59: 'BCS_VEHSPD', 60: 'ICM_TOTALODOMETER'}
def transform_to_tuple(line):
    """Turn one comma-separated record into a ``(vin, fields)`` pair.

    The first column is used as the key; the columns listed in ``col_dict``
    are copied, as raw strings, into a dict keyed by their field name.

    Raises:
        IndexError: if the line has fewer columns than ``col_dict`` requires.
    """
    parts = line.split(",")
    selected = {name: parts[int(idx)] for idx, name in col_dict.items()}
    return parts[0], selected

def compute_stats(x):
    """Compute hourly and daily stats for one vehicle's grouped records.

    Args:
        x: a ``(vin, records)`` pair, where ``records`` is an iterable of
            field dicts as produced by ``transform_to_tuple``.

    Returns:
        ``(vin, stats)`` where ``vin`` is read back from the ``Vehicle``
        attributes and ``stats`` is whatever ``Vehicle.get_stats`` returns.
    """
    vin_key, records = x[0], x[1]
    frame = pd.DataFrame(list(records))
    frame['VIN'] = vin_key

    wanted_stats = ['average_speed', 'miles_driven', 'charging_hours']
    wanted_freqs = ['hourly', 'daily']

    vehicle = Vehicle(frame, list(frame.columns))
    vehicle_attributes = vehicle.get_attributes()
    # Results are only returned to the caller; nothing is written from the executors.
    result = vehicle.get_stats(
        wanted_stats,
        wanted_freqs,
        save_to_csv=False,
        save_to_json=False,
    )
    return vehicle_attributes['vin'], result

In [5]:
import subprocess
def run_cmd(args_list):
    """Run a system command and wait for it to finish.

    Args:
        args_list: the command and its arguments as a list of strings.

    Returns:
        ``(return_code, stdout, stderr)`` with both streams fully captured.
    """
    process = subprocess.Popen(
        args_list,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )
    captured_out, captured_err = process.communicate()
    return process.returncode, captured_out, captured_err

# List the per-day CSV files on HDFS. Fail loudly when the listing command
# itself fails, instead of silently continuing with an empty file list.
(ret, out, err) = run_cmd(['hdfs', 'dfs', '-ls', '/data/ag/by-day/'])
if ret != 0:
    raise RuntimeError('hdfs dfs -ls failed with exit code {0}: {1}'.format(ret, err))
lines = out.split('\n')

In [6]:
# Drop the first and last elements of the listing (presumably the
# "Found N items" header and the empty string after the trailing newline --
# confirm), then turn the last column of each row into a full HDFS URI.
listing_rows = lines[1:-1]
files = ['hdfs://namenode:8020' + row.split()[-1] for row in listing_rows]
(len(files), files[:2])

(1097,
 ['hdfs://namenode:8020/data/ag/by-day/ag_20150118.csv',
  'hdfs://namenode:8020/data/ag/by-day/ag_20150119.csv'])

In [7]:
def get_daily_stats(data_file, save_to_json=True):
    """Compute per-vehicle stats for one daily CSV file.

    Only rows with exactly 86 comma-separated fields are used. Rows are
    grouped by VIN, each group is reduced with ``compute_stats``, and the
    results are collected to the driver.

    Args:
        data_file: path/URI of the daily CSV; its name is expected to end in
            ``_<date>.csv``.
        save_to_json: when true, write the result to ``ag_stats_<date>.json``
            in the current working directory.
    """
    per_vehicle = (
        sc.textFile(data_file)
        .filter(lambda row: len(row.split(',')) == 86)
        .map(transform_to_tuple)
        .groupByKey()
        .map(compute_stats)
        .collect()
    )
    res = OrderedDict(per_vehicle)
    if save_to_json:
        # e.g. '.../ag_20170728.csv' -> '20170728'
        file_name = data_file.split('/')[-1]
        curr_date = file_name.split('_')[-1].split('.')[0]
        with open('ag_stats_{}.json'.format(curr_date), 'w') as jsonf:
            json.dump(res, jsonf)
    print(data_file + ' done.')

In [8]:
%%time
# Timing check on a single day; per the original note this input file is about 2.9 GB.
get_daily_stats('hdfs://namenode:8020/data/ag/by-day/ag_20170728.csv')
# Optional row count of the local copy of the same file (left disabled):
# print(run_cmd(['wc', '-l', '/gaei/gacrnd/data/ag_by_day/ag_20170728.csv']))

hdfs://namenode:8020/data/ag/by-day/ag_20170728.csv done.
CPU times: user 2.57 s, sys: 62.3 ms, total: 2.64 s
Wall time: 1min 18s


## merge daily result

In [9]:
def generate_stat_json_for_first_day(in_file='ag_stats_20150118.json',
                                     out_file='res_ag_stats_20150118.json',
                                     time_label='2015-1'):
    """Seed the cumulative stats file from the first day's stats.

    For every vehicle, each ``*_monthly`` series is initialised with a single
    entry whose value is the last value of the matching ``*_daily`` series.
    All other series are written through unchanged.

    The defaults reproduce the original hard-coded first day, so calling the
    function with no arguments behaves as before.

    Args:
        in_file: JSON file with the first day's per-vehicle stats.
        out_file: JSON file to write the seeded cumulative result to.
        time_label: label stored with the initial monthly entries.
    """
    monthly_stats = ['average_speed_monthly', 'miles_driven_monthly', 'charging_hours_monthly']
    with open(in_file, 'r') as inf:
        x = OrderedDict(json.load(inf))
    for x_stats in x.values():
        for stat in monthly_stats:
            daily_stat = stat.replace('monthly', 'daily')
            # On the first day the month-to-date value equals the day's value.
            x_stats[stat] = [(time_label, x_stats[daily_stat][-1][1])]
    with open(out_file, 'w') as outf:
        json.dump(x, outf)
# Write the initial cumulative result file (with monthly series) from the first day's stats.
generate_stat_json_for_first_day()

In [10]:
def merge(old_file, new_file, out_file):
    """Fold one day's per-vehicle stats into the cumulative stats file.

    Both inputs are JSON objects keyed by VIN. Each VIN maps stat names of the
    form ``<stat>_<freq>`` to a list of ``[time_label, value]`` entries; for
    ``average_speed`` stats the value is an ``[average, count]`` pair.

    Per vehicle:
      * only in the old file: every series is padded with null entries for
        the new day (24 hourly entries, one daily entry, and one monthly
        entry when the new day is the first of a month);
      * only in the new file: the day's stats are copied and the monthly
        series are seeded from the daily values;
      * in both: hourly/daily series are extended, and the monthly entry is
        either started (first day of a month) or updated in place.

    Args:
        old_file: path of the cumulative result so far.
        new_file: path of the new day's stats; the date is the text after the
            last underscore of its file name (e.g. ``ag_stats_20150119.json``).
        out_file: path the merged result is written to.

    Returns:
        The merged mapping (also dumped to ``out_file``).
    """
    monthly_stats = ['average_speed_monthly', 'miles_driven_monthly', 'charging_hours_monthly']

    def weighted_average(a, b):
        """Combine two ``(average, count)`` pairs without modifying either."""
        if np.isnan(a[0]) and np.isnan(b[0]):
            return (np.nan, 0)
        # A zero average is weighted as one observation. Local weights are
        # used because `a` and `b` are the same list objects that live in the
        # stored series, and must not be altered as a side effect.
        a_count = 1 if a[0] == 0 else a[1]
        b_count = 1 if b[0] == 0 else b[1]

        count = a_count + b_count
        avg = np.nansum([a[0] * a_count, b[0] * b_count]) / count
        return (avg, count)

    def sum_two_num(a, b):
        """Add two numbers, treating a single NaN as zero."""
        if np.isnan(a) and np.isnan(b):
            return np.nan
        return np.nansum([a, b])  # treat nan as zero

    with open(old_file, 'r') as oldf, open(new_file, 'r') as newf:
        x = OrderedDict(json.load(oldf))
        y = OrderedDict(json.load(newf))
    # Existing vehicles first, then vehicles seen for the first time today, so
    # the output order is deterministic.
    allvins = list(x.keys()) + [vin for vin in y.keys() if vin not in x]
    # Strip any directory part first so dots in directory names cannot break the parse.
    date = new_file.split('/')[-1].split('.')[0].split('_')[-1]
    month_label = convert_time_label_to_str(pd.to_datetime(date).tz_localize('Asia/Hong_Kong'), 'monthly')
    first_day_of_month = date[-2:] == '01'
    for vin in allvins:
        if vin in x and vin not in y:
            # append null result
            x_stats = x[vin]
            for stat, val in x_stats.items():
                labels_to_append = []
                freq = stat.split('_')[-1]
                last_timestamp = pd.to_datetime(val[-1][0]).tz_localize('Asia/Hong_Kong')
                if (freq == 'daily') or (freq == 'monthly' and first_day_of_month):
                    tmp = next_time_label(last_timestamp, freq)  # output format: timestamp
                    labels_to_append.append(convert_time_label_to_str(tmp, freq))
                elif freq == 'hourly':
                    for _ in range(24):
                        tmp = next_time_label(last_timestamp, freq)
                        labels_to_append.append(convert_time_label_to_str(tmp, freq))
                        last_timestamp = tmp
                # The loop variable must not be the month label: later
                # vehicles still need the month label computed above.
                for label in labels_to_append:
                    val_to_append = (np.nan, 0) if 'average_speed' in stat else np.nan
                    val.append((label, val_to_append))
        elif vin not in x and vin in y:
            # copy daily result
            x[vin] = y[vin]
            # add monthly stat, monthly stats will be the same with daily stats
            x_stats = x[vin]
            for stat in monthly_stats:
                daily_stat = stat.replace('monthly', 'daily')
                x_stats[stat] = [(month_label, x_stats[daily_stat][-1][1])]
        else:  # in both x and y
            x_stats = x[vin]
            y_stats = y[vin]
            for stat, val in x_stats.items():
                # extend daily stats and hourly stats
                if 'monthly' not in stat:
                    val.extend(y_stats[stat])
                    continue
                # for monthly, if this is first day of month, monthly stats will be the same with daily stats
                daily_stat = stat.replace('monthly', 'daily')
                if first_day_of_month:
                    val.append((month_label, y_stats[daily_stat][-1][1]))
                    continue
                # if not first day of month, then recalculate monthly stats
                if 'average_speed' not in stat:
                    val[-1][1] = sum_two_num(val[-1][1], y_stats[daily_stat][-1][1])
                else:
                    val[-1][1] = weighted_average(val[-1][1], y_stats[daily_stat][-1][1])

    with open(out_file, 'w') as jsonf:
        json.dump(x, jsonf)
    return x

In [None]:
%%time
# Fold every per-day stats file, in date order, into the running result.
# Each merge reads the previous cumulative file and writes a new one.
old_file = 'res_ag_stats_20150118.json'
# Select only the raw per-day files. A substring test ('ag_stats_' in f) would
# also match the 'res_ag_stats_*.json' outputs and merge them in again as if
# they were new days.
stats_files = sorted(f for f in os.listdir('.')
                     if f.startswith('ag_stats_') and f.endswith('.json'))
# The first file (the first day) is already covered by the seed file above.
for f in stats_files[1:]:
    date = f.split('.')[0].split('_')[-1]
    out_file = 'res_ag_stats_' + date + '.json'
    merge(old_file, f, out_file)
    old_file = out_file