In [17]:
import apache_beam as beam
from apache_beam.pvalue import AsDict
import datetime
import pickle
from sklearn.cluster import KMeans
import datetime

from math import radians, cos, sin, asin, sqrt, pi

In [18]:
# Dataflow job settings. The job name carries a launch-time suffix
# (yymmdd-HHMMSS) so every run of this cell produces a different name.
temp_location = 'gs://hackathon.jjkoh.com/tmp'
job_name = 'grab-beam-' + datetime.datetime.now().strftime('%y%m%d-%H%M%S')
project = 'jjkoh95'
runner = 'DataflowRunner' # the other value used with this notebook: 'DirectRunner'
region = 'us-east1'
# NOTE(review): template_location is not referenced by any cell visible here.
template_location = 'gs://hackathon.jjkoh.com/template/'
machine_type = 'n1-highmem-2'
num_workers = 3

# Input is a glob over CSV part files; output_file is the prefix handed to
# WriteToText, which is also given a '.csv' suffix in the pipeline cell.
input_file = 'gs://hackathon.jjkoh.com/pipeline-dataproc-3/part-*.csv'
output_file = 'gs://hackathon.jjkoh.com/dataflow/final-balance-playground-2/part'

In [19]:
# Keyword options later unpacked into PipelineOptions.
# staging_location is not set here.
options = dict(
    temp_location=temp_location,
    job_name=job_name,
    project=project,
    region=region,
    runner=runner,
    save_main_session=True,  # ship this notebook's globals to the workers
    machine_type=machine_type,
    num_workers=num_workers,
    autoscaling_algorithm='NONE',
)

In [20]:
# Pipeline options: the worker requirements file is passed as a command-line
# style flag, everything else comes from the `options` keywords.
opts = beam.pipeline.PipelineOptions(
    flags=['--requirements_file', './requirements.txt'],
    **options
)

In [21]:
def pause():
    """Halt execution at the call site by raising ``Exception('paused')``.

    Raises:
        Exception: always, with the message ``'paused'``.
    """
    reason = 'paused'
    raise Exception(reason)

In [22]:
# Field order of one input CSV record; passed to convert_to_dict in the
# pipeline cell to name the comma-separated values.
col_names = [
    'origin_lat', 'origin_lng', 'origin_timestamp',
    'dest_lat', 'dest_lng', 'dest_timestamp',
]

In [23]:
# Columns written to the output CSV, in this exact order. format_output reads
# each key from the row dict and the pipeline uses the same list as the header.
# The raw coordinates and dest_timestamp are not part of the output.
output_cols = [
    # carried over from the input record
    'origin_timestamp',
    # calendar features (populate_datetime)
    'origin_day', 'origin_hour', 'origin_day_sin', 'origin_day_cos', 'origin_hour_sin', 'origin_hour_cos',
    # cluster ids, distances and bearings per endpoint (populate_distance_meta)
    'origin_density_cluster', 'origin_speed_cluster',
    'origin_distance_from_density_center', 'origin_bearing_from_density_center',
    'origin_distance_from_speed_center', 'origin_bearing_from_speed_center',
    'dest_density_cluster', 'dest_speed_cluster',
    'dest_distance_from_density_center', 'dest_bearing_from_density_center',
    'dest_distance_from_speed_center', 'dest_bearing_from_speed_center',
    # origin-to-destination features (populate_distance_meta)
    'distance_origin_dest', 'bearing_origin_dest',
    'distance_origin_dest_density_cluster', 'distance_origin_dest_speed_cluster',
    # squared distances (square_and_bin)
    'origin_distance_from_density_center_squared', 'origin_distance_from_speed_center_squared',
    'dest_distance_from_density_center_squared', 'dest_distance_from_speed_center_squared',
    'distance_origin_dest_squared', 
    'distance_origin_dest_density_cluster_squared', 'distance_origin_dest_speed_cluster_squared',
    # crossed categorical features (square_and_bin)
    'day_hour_crossing', 
    'origin_dest_density_cluster_crossing', 'origin_dest_speed_cluster_crossing',
    # label (populate_label)
    'duration',
]

In [24]:
# Unpickle the two clustering models from local files. Downstream code looks
# them up under the keys 'density' and 'speed'.
cluster_models = {}

for _model_key, _model_path in (
    ('density', 'models/traffic-cluster-density-20-kmeans.pkl'),
    ('speed', 'models/traffic-cluster-speed-20-kmeans.pkl'),
):
    with open(_model_path, 'rb') as pkl:
        cluster_models[_model_key] = pickle.load(pkl)

In [25]:
def convert_to_dict(rowstring, col_names):
    """Parse one comma-separated line into a ``{column: float}`` dict.

    Args:
        rowstring: text line whose fields are separated by ``,``.
        col_names: column names, paired positionally with the fields.

    Returns:
        dict mapping each name to ``float(field)``. Pairing stops at the
        shorter of the two sequences.

    Raises:
        ValueError: if a paired field cannot be converted by ``float``.
    """
    parsed = {}
    for name, raw_value in zip(col_names, rowstring.split(',')):
        parsed[name] = float(raw_value)
    return parsed

def populate_datetime(row):
    """Add day-of-week and hour-of-day features derived from ``origin_timestamp``.

    The timestamp is interpreted as seconds since the epoch, in UTC.

    Args:
        row: dict containing ``origin_timestamp``.

    Returns:
        A new dict (shallow copy of ``row``) with these keys added:
        ``origin_day`` (0 = Monday .. 6 = Sunday), ``origin_hour`` (0..23) and
        their sine/cosine encodings ``origin_day_sin``, ``origin_day_cos``,
        ``origin_hour_sin``, ``origin_hour_cos``, each rounded to 4 decimals.
    """
    import datetime
    from math import pi, sin, cos

    days_per_week = 7
    hours_per_day = 24

    # Work on a copy: Beam requires that a transform never modifies the
    # element it was handed.
    out = dict(row)

    # NOTE: utcfromtimestamp is deprecated from Python 3.12 onwards; it is kept
    # here so the function still runs on older interpreters.
    dt = datetime.datetime.utcfromtimestamp(out['origin_timestamp'])
    day = int(dt.weekday())
    hour = int(dt.hour)

    # One full turn (2*pi) per period, so the last value of a cycle sits next
    # to the first (Sunday next to Monday, 23h next to 0h). Using pi/period
    # would cover only half a turn and place them at opposite ends.
    day_angle = 2 * pi * day / days_per_week
    hour_angle = 2 * pi * hour / hours_per_day

    out['origin_day'] = day
    out['origin_hour'] = hour
    out['origin_day_sin'] = round(sin(day_angle), 4)
    out['origin_day_cos'] = round(cos(day_angle), 4)
    out['origin_hour_sin'] = round(sin(hour_angle), 4)
    out['origin_hour_cos'] = round(cos(hour_angle), 4)
    return out

def populate_label(row):
    """Add the ``duration`` label: destination minus origin timestamp.

    Args:
        row: dict containing ``origin_timestamp`` and ``dest_timestamp``.

    Returns:
        A new dict (shallow copy of ``row``) with ``duration`` set to
        ``int(dest_timestamp - origin_timestamp)``; ``int`` truncates toward
        zero.
    """
    # Copy first: Beam requires that a transform never modifies the element
    # it was handed.
    labelled = dict(row)
    elapsed = labelled['dest_timestamp'] - labelled['origin_timestamp']
    labelled['duration'] = int(elapsed)
    return labelled
    
def filter_meta_origin_dest(row):
    """Predicate: keep rows whose ``duration`` is at least 900 (15 minutes)."""
    min_duration = 900
    return row['duration'] >= min_duration

def populate_distance_meta(row, c):
    """Attach cluster, distance and bearing features for both trip endpoints.

    Args:
        row: dict with ``origin_lat``, ``origin_lng``, ``dest_lat`` and
            ``dest_lng`` in decimal degrees.
        c: dict with the keys ``'density'`` and ``'speed'``. Each value must
            provide ``predict([[lat, lng]])`` returning a sequence whose first
            item is a cluster id, and ``cluster_centers_`` indexable by that
            id to give a ``(lat, lng)`` pair.

    Returns:
        A new dict (shallow copy of ``row``) with these keys added, where
        ``<end>`` is ``origin``/``dest`` and ``<model>`` is ``density``/``speed``:

        * ``<end>_<model>_cluster`` - predicted cluster id (int)
        * ``<end>_distance_from_<model>_center`` - haversine distance in km
        * ``<end>_bearing_from_<model>_center`` - ``azi1`` of the WGS84
          inverse geodesic from the endpoint to the cluster centre
        * ``distance_origin_dest`` / ``bearing_origin_dest`` - same measures
          from origin to destination
        * ``distance_origin_dest_<model>_cluster`` - haversine distance
          between the origin's and the destination's cluster centres

        Distances and bearings are rounded to 4 decimals.
    """
    # Imports and helpers stay local to the function, as before
    # (presumably so it is self-contained when shipped to workers - confirm).
    from math import radians, cos, sin, asin, sqrt
    from geographiclib.geodesic import Geodesic

    earth_radius_km = 6371  # use 3956 for miles

    def haversine(lat1, lon1, lat2, lon2):
        """Great-circle distance in km between two points in decimal degrees."""
        lon1, lat1, lon2, lat2 = map(radians, [lon1, lat1, lon2, lat2])
        dlon = lon2 - lon1
        dlat = lat2 - lat1
        a = sin(dlat / 2) ** 2 + cos(lat1) * cos(lat2) * sin(dlon / 2) ** 2
        # Rounding error can push sqrt(a) marginally above 1 for
        # near-antipodal points, which would make asin raise; clamp it.
        central_angle = 2 * asin(min(1.0, sqrt(a)))
        return round(central_angle * earth_radius_km, 4)

    def get_bearing(lat1, long1, lat2, long2):
        """Initial azimuth of the WGS84 geodesic from point 1 to point 2."""
        azimuth = Geodesic.WGS84.Inverse(lat1, long1, lat2, long2)['azi1']
        return round(azimuth, 4)

    model_kinds = ('density', 'speed')

    # Copy first: Beam requires that a transform never modifies the element
    # it was handed.
    out = dict(row)

    # (endpoint, model kind) -> cluster centre, reused for centre-to-centre
    # distances below.
    centers = {}
    for endpoint in ('origin', 'dest'):
        lat = out[endpoint + '_lat']
        lng = out[endpoint + '_lng']
        for kind in model_kinds:
            model = c[kind]
            cluster = int(model.predict([[lat, lng]])[0])
            center = model.cluster_centers_[cluster]
            centers[(endpoint, kind)] = center

            out['{}_{}_cluster'.format(endpoint, kind)] = cluster
            out['{}_distance_from_{}_center'.format(endpoint, kind)] = haversine(
                lat, lng, center[0], center[1])
            out['{}_bearing_from_{}_center'.format(endpoint, kind)] = get_bearing(
                lat, lng, center[0], center[1])

    # Direct origin -> destination features.
    out['distance_origin_dest'] = haversine(
        out['origin_lat'], out['origin_lng'], out['dest_lat'], out['dest_lng'])
    out['bearing_origin_dest'] = get_bearing(
        out['origin_lat'], out['origin_lng'], out['dest_lat'], out['dest_lng'])

    # Distance between the two endpoints' cluster centres, per model.
    for kind in model_kinds:
        origin_center = centers[('origin', kind)]
        dest_center = centers[('dest', kind)]
        out['distance_origin_dest_{}_cluster'.format(kind)] = haversine(
            origin_center[0], origin_center[1], dest_center[0], dest_center[1])

    return out

def filter_distance_origin_dest(row):
    """Predicate: keep rows whose ``distance_origin_dest`` is at least 1."""
    min_distance = 1
    return row['distance_origin_dest'] >= min_distance

def square_and_bin(row, n_clusters=20):
    """Add squared distance features and crossed categorical features.

    Args:
        row: dict holding the seven distance columns listed below, plus
            ``origin_day``, ``origin_hour`` and the ``<end>_<model>_cluster``
            ids for ``origin``/``dest`` and ``density``/``speed``.
        n_clusters: multiplier used to combine an origin and a destination
            cluster id into a single crossing id. Defaults to 20, the value
            that was previously hard-coded; the crossing id is only unique
            while every cluster id is below this number.

    Returns:
        A new dict (shallow copy of ``row``) with these keys added:

        * ``<distance column>_squared`` - the square, rounded to 4 decimals
        * ``day_hour_crossing`` - ``origin_day * 24 + origin_hour``
        * ``origin_dest_<model>_cluster_crossing`` -
          ``origin cluster * n_clusters + dest cluster``
    """
    hours_per_day = 24
    distance_cols = (
        'origin_distance_from_density_center',
        'origin_distance_from_speed_center',
        'dest_distance_from_density_center',
        'dest_distance_from_speed_center',
        'distance_origin_dest',
        'distance_origin_dest_density_cluster',
        'distance_origin_dest_speed_cluster',
    )

    # Copy first: Beam requires that a transform never modifies the element
    # it was handed.
    out = dict(row)

    # Square every distance variable to make it more explicit.
    for col in distance_cols:
        out[col + '_squared'] = round(out[col] ** 2, 4)

    # Feature crossing with categorical binning.
    out['day_hour_crossing'] = int(out['origin_day'] * hours_per_day + out['origin_hour'])
    for kind in ('density', 'speed'):
        origin_cluster = out['origin_{}_cluster'.format(kind)]
        dest_cluster = out['dest_{}_cluster'.format(kind)]
        out['origin_dest_{}_cluster_crossing'.format(kind)] = int(
            origin_cluster * n_clusters + dest_cluster)

    return out
    
def format_output(row):
    """Render ``row`` as one comma-separated line in ``output_cols`` order.

    Each value is converted with ``str``; a key missing from ``row`` raises
    ``KeyError``.
    """
    fields = []
    for col in output_cols:
        fields.append(str(row[col]))
    return ','.join(fields)

In [26]:
# Build the pipeline inside the context manager; per the Beam documentation
# the pipeline is run when the `with` block exits.
with beam.Pipeline(options=opts) as p:
    (
        # read every matching CSV part file, skipping each file's header line
        p | 'input_file' >> beam.io.ReadFromText(input_file, skip_header_lines=1)
          # text line -> {column: float}
          | 'convert_raw_text_to_dictionary' >> beam.Map(convert_to_dict, col_names)
          | 'populate_datetime' >> beam.Map(populate_datetime)
          | 'populate_label' >> beam.Map(populate_label)
          # drop trips with duration below 900
          | 'filter_meta_origin_dest' >> beam.Filter(filter_meta_origin_dest)
          # cluster_models is passed as an extra argument to every call
          | 'populate_distance_meta' >> beam.Map(populate_distance_meta, cluster_models)
          | 'square_and_bin' >> beam.Map(square_and_bin)
          # drop trips with distance_origin_dest below 1
          # NOTE(review): this filter runs after square_and_bin, so rows it
          # discards have already had their squared features computed.
          | 'filter_distance_origin_dest' >> beam.Filter(filter_distance_origin_dest)
          # row dict -> CSV line in output_cols order
          | 'format_output' >> beam.Map(format_output)
          | 'output_storage' >> beam.io.WriteToText(output_file, file_name_suffix='.csv', header=','.join(output_cols))
    ) 
# printed once the `with` block has exited
print('done')



done
