Skip to content

Commit

Permalink
move to server-based workflow
Browse files Browse the repository at this point in the history
  • Loading branch information
kpwebb committed Feb 8, 2019
1 parent 70c849c commit 7063381
Show file tree
Hide file tree
Showing 13 changed files with 269 additions and 110 deletions.
4 changes: 3 additions & 1 deletion .gitignore
@@ -1 +1,3 @@
data/
/data/
node_modules/

2 changes: 1 addition & 1 deletion LICENSE
@@ -1,6 +1,6 @@
MIT License

Copyright (c) 2018 SharedStreets
Copyright (c) 2018-2019 SharedStreets

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
Expand Down
6 changes: 2 additions & 4 deletions README.md
Expand Up @@ -34,9 +34,7 @@ This project provides a pluggable architecture for adding new MDS provider data

1. ```yarn install ```

2. Run ```process_status_changes.ts``` as a cron job.

3. Run ```express.ts``` server api end point.
2. Run ```server.ts``` to run data collector and API end point.


### Technical Details
Expand All @@ -56,4 +54,4 @@ The SharedStreets tools provide an additional layer of protection by encrypting
**Encrypting state data:** most recent cached event is encrypted
![MDS events to device states](docs/images/event_process4.png)
**Decrypting state data:** cached events can only be decrypted using data contained in a future event from the same vehicle
![MDS events to device states](docs/images/event_process5.png)
![MDS events to device states](docs/images/event_process5.png)
31 changes: 0 additions & 31 deletions express.ts

This file was deleted.

35 changes: 21 additions & 14 deletions process_status_changes.ts → mds_processor.ts
Expand Up @@ -11,34 +11,41 @@ const h3 = require("h3-js");

async function getStatusChanges() {

const mdsProvider = new UberMDSProvider();
const mdsProvider = new BirdMDSProvider();

// last two hours for testing...
var endTime = Math.round(Date.now() / 1000);
var startTime = endTime - (60 * 60 * 2);
var now = Math.round(Date.now() / 1000);
var startTime = now - (60 * 60 * (72 * 20));
var endTime = startTime + (60 * 60 * 10)


var mdsQuery = new MDSStatusChangeQuery(mdsProvider, startTime, endTime);
var statusEventMap = new DiskBackedMDSStatusMap();

var mdsQuery = new MDSStatusChangeQuery(mdsProvider, startTime, endTime);


await mdsQuery.run(); // loads data pages for time range query

var h3AvailabilityAggregator = new H3AvailabilityStatusAggregator();

// event generator loop
var statusEvents = statusEventMap.processStatusEvents(mdsQuery);
for await(var event of statusEvents) {
if(event.error) {
console.log("out of order...");

// TODO QA logging for MDS data
}
else {
var statusMetric = new StatusMetric(event)
h3AvailabilityAggregator.addData(statusMetric);
}

if(event.error) {
console.log("out of order...");

// TODO QA logging for MDS data
}
else {
var statusMetric = new StatusMetric(event)
h3AvailabilityAggregator.addData(statusMetric);
}

h3AvailabilityAggregator.save();
}

h3AvailabilityAggregator.save();

}

getStatusChanges();
1 change: 1 addition & 0 deletions package.json
Expand Up @@ -4,6 +4,7 @@
"@turf/invariant": "^6.1.2",
"@types/levelup": "^3.1.0",
"@types/node": "^10.12.0",
"cors": "^2.8.5",
"express": "^4.16.4",
"h3-js": "^3.2.0",
"leveldown": "^4.0.1",
Expand Down
38 changes: 38 additions & 0 deletions server.ts
@@ -0,0 +1,38 @@
import * as fs from "fs";
import { H3AvailabilityStatusAggregator } from "./src/metrics/status";

const { fork } = require('child_process');
const { join } = require('path');

var cors = require('cors')
const express = require('express');
const app = express();

app.use(cors())

// boot the mds data processor
const childProcess = fork('mds_processor.ts');

var port = '8082';

app.get('/metric/:metric/:week/:period', async (req, res) =>
{
if(req.params.week && req.params.period) {
var weeks = [req.params.week];
var period = req.params.period;

if(req.params.metric === 'h3_availability') {

var availabilityStatus = new H3AvailabilityStatusAggregator();
try {
var geoJson = availabilityStatus.getGeoJson(weeks,period);
res.send(geoJson);
}
catch(e) {
console.log(e);
}
}
}
});

app.listen(port, () => console.log(`app listening on port ${port}!`));
12 changes: 10 additions & 2 deletions src/data/periodicity.ts
Expand Up @@ -25,10 +25,18 @@ export class Week {
day:number;

toString():string {
return this.year + '-' + this.month + '-' + this.day;
const zeroPad = (n) => {
if ( n < 10 ) {
return ( '0' + n.toString () );
}
return n;
};
return this.year + '-' + zeroPad(this.month) + '-' + zeroPad(this.day);
}

isEqual(week):boolean {


if(this.year === week.year && this.month === week.month && this.day === week.day)
return true;
else
Expand Down Expand Up @@ -140,7 +148,7 @@ export function getPeriodsForTimeRange(startTime:number, endTime:number):Periodi
// calculate the factional range for data that begins and ends within the same period
var newPeriod = new PeriodicTimestamp();
Object.assign(newPeriod, startPeriod);
newPeriod.fraction = endPeriod.fraction - (1 - startPeriod.fraction);
newPeriod.fraction = startPeriod.fraction - (1 - endPeriod.fraction);
periods.push(newPeriod);
}
}
Expand Down
13 changes: 13 additions & 0 deletions src/data/sharedstreets.ts
Expand Up @@ -14,7 +14,19 @@ const SHST_API_SEARCH_RADIUS = 50; // meters

export class SharedStreetsLocationRef {
referenceId:string;
referenceLength:number
location:number;

getBin(targetBinSize:number):number {
if(targetBinSize > 0) {
var numberOfBins = Math.floor(this.referenceLength / targetBinSize);
var averageBinLength = this.referenceLength / numberOfBins;
var bin = Math.floor(this.location / averageBinLength) + 1;
return bin;
}
else
return 1;
}
}

async function pointToShStLocationRef(point:Feature<Point>):Promise<SharedStreetsLocationRef> {
Expand All @@ -37,6 +49,7 @@ async function pointToShStLocationRef(point:Feature<Point>):Promise<SharedStreet

if( data.features.length > 0) {
locationRef.referenceId = data.features[0].properties.referenceId;
locationRef.referenceLength = data.features[0].properties.referenceLength;
locationRef.location = data.features[0].properties.location;

return locationRef;
Expand Down
88 changes: 82 additions & 6 deletions src/metrics/generic_aggregator.ts
@@ -1,7 +1,9 @@
import { Week } from "../data/periodicity";
import { Week, PeriodicTimestamp } from "../data/periodicity";
import { SharedStreetsLocationRef } from "../data/sharedstreets";


import * as fs from "fs";
import { join } from 'path';
import { FeatureCollection } from "@turf/helpers/lib/geojson";

const DEFAULT_DATA_DIRECTORY = './data/metrics/';

Expand All @@ -24,23 +26,97 @@ export abstract class GenericMetric {

}

export class Count extends GenericMetric {
count:number = 0;
increment() {
this.count = this.count + 1;
}
}

export class FractionalCount extends GenericMetric {
count:number = 0;
fractionalCount:number = 0;

add(fractionalValue) {
this.count = this.count + 1;
this.fractionalCount = this.fractionalCount + fractionalValue;
}
}

export class Sum extends GenericMetric {
count:number = 0;
sum:number = 0;
add(value) {
this.count = this.count + 1;
this.sum = this.sum + value;
}

avg():number {
if(this.count > 0)
return this.sum / this.count;
else
return 0;
}
}


export abstract class GenericPeriodicMetric {

abstract getPeriodicCounts(metricLabel:string):PeriodicValue[];
}

export abstract class GenericMetricAggregator<T extends GenericMetric> {

abstract metricGroupName:string;
export abstract class GenericMetricAggregator<T extends GenericMetric, V> {

data = {};

constructor(directory=DEFAULT_DATA_DIRECTORY) {
fs.mkdirSync(this.getPath(), {recursive:true});

for(var week of fs.readdirSync(this.getPath())) {
var content = fs.readFileSync(join(this.getPath(), week));
this.data[week] = JSON.parse(content.toString());
}
}

abstract getMetricName():string;

getPath():string {
return DEFAULT_DATA_DIRECTORY + this.metricGroupName;
return DEFAULT_DATA_DIRECTORY + this.getMetricName();
}

abstract defaultValue():V

getBin(period:PeriodicTimestamp, binIndex:string ):V {
var week = period.week.toString();
if(!this.data[week]){
var periodMap = {};
this.data[week] = periodMap;
}

if(!this.data[week][period.period]){
var dataMap = {};
this.data[week][period.period] = dataMap;
}

if(!this.data[week][period.period][binIndex]){
this.data[week][period.period][binIndex] = this.defaultValue();
}

return this.data[week][period.period][binIndex];
}

abstract addData(data:T);
abstract save(data:T);

save() {

for(var week of Object.keys(this.data)){
var weekObject = this.data[week];
var jsonContent = JSON.stringify(weekObject);
fs.writeFileSync(join(this.getPath(), week), jsonContent);
}

}

}

0 comments on commit 7063381

Please sign in to comment.