In [1]:
from cStringIO import StringIO
import gzip
import os
import glob
import re
import pdb

import gcp
from gcp import storage
from gcp import bigquery as bq
import pandas as pd

# Import zip file from Google Cloud Storage
project = gcp.Context.default().project_id
bucket_name = project + '-datalab'
bucket_path = 'gs://' + bucket_name + '/data'
print 'Bucket: ' + bucket_path
compressed_filename = 'citydata'
season_name = 'season_2'
subdirectory_name = 'test_set_2'

Bucket: gs://datalab-projects-1331-datalab/data


In [None]:
gzip_filename = '{}.tar.gz'.format(compressed_filename)
tar_filename = '{}.tar'.format(compressed_filename)
compressed_file = None
datadir = season_name
# If data has not been extracted, extract it.
if not os.path.isdir(datadir):
  # If citydata.tar has not been downloaded, download it
  if not os.path.isfile(tar_filename):
    bucket_object = '{}/{}'.format(bucket_path, gzip_filename)

In [None]:
%%storage read --object $bucket_object --variable compressed_file

In [None]:
if compressed_file:
  gzip_file = gzip.GzipFile(fileobj=StringIO(compressed_file))
  del compressed_file

  import shutil
  with open(tar_filename, 'wb') as f_out:
    shutil.copyfileobj(gzip_file, f_out)
    
  import tarfile
  tar = tarfile.open(tar_filename, "r")
  tar.extractall()
  tar.close()
  os.remove(tar_filename)

In [None]:
# Downloading and extracting the files from cloud storage.

def process_datafile(localpath, storagepath, table, mode='create', overwrite=False):
  # Upload extracted file into GCS
  storagepath_r = storagepath.split('/')
  bucketname = storagepath_r[2]
  itempath = '/'.join(storagepath_r[3:])
  item = storage.Item(bucketname, itempath)
  if not item.exists() or overwrite:    
    with open(localpath, 'rb') as f:
      item.write_to(f.read(), 'text/plain')
  # Load data into Google BigQuery
  table.load(storagepath, mode=mode, csv_options=bq.CSVOptions(delimiter='\t'))

# Process Training Data

## Districts

In [None]:
table = bq.Table('datalab-projects-1331:xjk_algo_comp_test.districts_preprocessed')
if not table.exists():
  schema = bq.Schema([
      {'name': 'district_hash', 'type': 'STRING'},
      {'name': 'district_id', 'type': 'INTEGER'}
    ])
  table.create(schema)
  
localpath = '{}/{}/cluster_map/cluster_map'.format(season_name, subdirectory_name)
storagepath = os.path.join(bucket_path,localpath)
process_datafile(localpath, storagepath, table, mode='append')

In [None]:
%%bigquery execute -t datalab-projects-1331:xjk_algo_comp_test.districts -m overwrite
SELECT LAST(district_hash) AS district_hash, district_id
FROM [datalab-projects-1331:xjk_algo_comp_test.districts_preprocessed]
GROUP BY district_id

## Weather

In [None]:
table = bq.Table('datalab-projects-1331:xjk_algo_comp_test.weather_preprocessed')
if not table.exists():
  schema = bq.Schema([
      {'name': 'time', 'type': 'STRING'},
      {'name': 'weather', 'type': 'INTEGER'},
      {'name': 'temperature', 'type': 'FLOAT'},
      {'name': 'pm25', 'type': 'FLOAT'}
    ])
  table.create(schema)
  
wildpath = '{}/{}/weather_data/*'.format(season_name, subdirectory_name)
for localpath in glob.glob(wildpath):
  print 'process {}'.format(storagepath)
  storagepath = os.path.join(bucket_path,localpath)
  process_datafile(localpath, storagepath, table, mode='append')

In [None]:
%%bigquery udf --module transform_weather_time

/**
 * Pad with 0 or given string.
 *
 * @param int n Number to add padding to.
 * @param int width Width of number + padding.
 * @param string z (Optional) Other string to replace '0' as padding.
 */
function pad(n, width, z) {
  z = z || '0';
  n = n + '';
  return n.length >= width ? n : new Array(width - n.length + 1).join(z) + n;
}

/**
 * Transform timestamps of weather table into timeslots in weather table.
 *
 * @param {{time: string, weather: integer, temperature: float, pm25: float}} r
 * @param function({{time: string, weather: integer, temperature: float, pm25: float,
                     timeslot: string, timeofday_slot: integer, day_in_week: integer,
                     date: string}}) emitFn
 */
function(r, emitFn) {
  var t = r.time.split(/[ :\-]/);
  var slot = Math.floor((parseInt(t[3]) * 60 + parseInt(t[4])) / 10) + 1;
  r.timeslot = t[0] + '-' + pad(t[1], 2) +
               '-' + pad(t[2], 2) + '-' + slot;
  r.timeofday_slot = slot;
  r.date = t[0] + '-' + pad(t[1], 2) + '-' + pad(t[2], 2);
  r.day_in_week = new Date(parseInt(t[0]), parseInt(t[1])-1, parseInt(t[2])).getDay();
  emitFn(r);
}

In [None]:
%%bigquery execute -t datalab-projects-1331:xjk_algo_comp_test.weather -m overwrite
SELECT LAST(time) AS time, LAST(weather) AS weather,
  LAST(temperature) AS temperature, LAST(pm25) AS pm25, timeslot
FROM transform_weather_time([datalab-projects-1331:xjk_algo_comp_test.weather_preprocessed])
GROUP BY timeslot

## Traffic

In [None]:
table = bq.Table('datalab-projects-1331:xjk_algo_comp_test.traffic_preprocessed')
if not table.exists():
  schema = bq.Schema([
      {'name': 'district_hash', 'type': 'STRING'},
      {'name': 'tj_level1', 'type': 'INTEGER'},
      {'name': 'tj_level2', 'type': 'INTEGER'},
      {'name': 'tj_level3', 'type': 'INTEGER'},
      {'name': 'tj_level4', 'type': 'INTEGER'},
      {'name': 'tj_time', 'type': 'STRING'}
    ])
  table.create(schema)
  
wildpath = '{}/{}/traffic_data/*'.format(season_name, subdirectory_name)
for localpath in glob.glob(wildpath):
  with open(localpath, 'rb') as f:
    text = f.read()
  with open(localpath, 'wb') as f:
    f.write(re.sub(r'\b\t[0-9]:\b', '\t', text))
  storagepath = os.path.join(bucket_path,localpath)
  print 'process {}'.format(storagepath)
  process_datafile(localpath, storagepath, table, mode='append', overwrite=True)

In [2]:
%%bigquery udf --module transform_traffic_time
  
/**
 * Pad with 0 or given string.
 *
 * @param int n Number to add padding to.
 * @param int width Width of number + padding.
 * @param string z (Optional) Other string to replace '0' as padding.
 */
function pad(n, width, z) {
  z = z || '0';
  n = n + '';
  return n.length >= width ? n : new Array(width - n.length + 1).join(z) + n;
}

/**
 * Transform timestamps of weather table into timeslots in traffic table.
 *
 * @param {{district_hash: string, tj_level1: integer, tj_level2: integer, tj_level3: integer, 
            tj_level4: integer, tj_time: string}} r
 * @param function({{district_hash: string, tj_level1: integer, tj_level2: integer, tj_level3: integer, 
                     tj_level4: integer, tj_time: string, timeslot: string, timeofday_slot: integer,
                     day_in_week: integer, date: string}}) emitFn
 */
function(r, emitFn) {
  var t = r.tj_time.split(/[ :\-]/);
  var slot = Math.floor((parseInt(t[3]) * 60 + parseInt(t[4])) / 10) + 1;
  r.timeslot = t[0] + '-' + pad(t[1], 2) +
               '-' + pad(t[2], 2) + '-' + slot;
  r.timeofday_slot = slot;
  r.date = t[0] + '-' + pad(t[1], 2) + '-' + pad(t[2], 2);
  r.day_in_week = new Date(parseInt(t[0]), parseInt(t[1])-1, parseInt(t[2])).getDay();
  emitFn(r);
}

In [3]:
%%bigquery execute -t datalab-projects-1331:xjk_algo_comp_test.traffic -m overwrite
  
SELECT district_hash, AVG(tj_level1) AS tj_level1,
  AVG(tj_level2) AS tj_level2, AVG(tj_level3) AS tj_level3, AVG(tj_level4) AS tj_level4,
  LAST(tj_time) AS tj_time, timeslot, timeofday_slot, day_in_week, date
FROM transform_traffic_time([datalab-projects-1331:xjk_algo_comp_test.traffic_preprocessed])
GROUP BY district_hash, timeslot

(L3:50): Expression 'timeofday_slot' is not present in the GROUP BY list


## POIs

In [None]:
table = bq.Table('datalab-projects-1331:xjk_algo_comp_test.pois_preprocessed')
  
localpath = '{}/{}/poi_data/poi_data'.format(season_name, subdirectory_name)

pois = []
pois_schema = [{'name': 'district_hash', 'type': 'STRING'}]
with open(localpath, 'rb') as f:
  for line in f:
    line_pois = map(lambda x: ['f{}'.format(x.split(':')[0].replace('#', '_')), x.split(':')[1]],
                    line.split('\t')[1:])
  for poi in line_pois:
    if poi[0] not in pois:
      pois.append(poi[0])
      pois_schema.append({'name': poi[0], 'type': 'INTEGER'})
pois.sort()
pois_schema = sorted(pois_schema, key=lambda k: k['name']) 

if not table.exists():
  schema = bq.Schema.from_data(pois_schema)
  table.create(schema)
  
  pois_data = pd.DataFrame(columns=['district_hash'] + pois)
  with open(localpath, 'rb') as f:
    for line in f:
      hash_pois = {}
      for poi_line in line.split('\t')[1:]:
        hash_pois['f{}'.format(poi_line.split(':')[0].replace('#', '_'))] = poi_line.split(':')[1]
      poi_data = [line.split('\t')[0]]
      # hash_pois = {f1_1: 15, ...}
      # pois = ['f1_1', ...]
      for poi in pois:
        value = '0'
        if poi in hash_pois:
          value = hash_pois[poi].strip()
        poi_data.append(value)
      pois_data.loc[len(pois_data)] = poi_data
  for poi in pois:
    pois_data[poi] = pd.to_numeric(pois_data[poi])
  table.insert_data(pois_data)

Following code can be used to print out feature fields to be used when selecting from tables.

In [None]:
final_text = ''
for counter, poi_text in enumerate(map(lambda x: 'LAST(pois.{}) AS {}'.format(x,x), pois)):
  if counter%3 == 0:
    final_text = '{}\n'.format(final_text)
  final_text = '{}{}, '.format(final_text, poi_text)
print final_text[1:(len(final_text)-2)]

In [None]:
%%bigquery execute -t datalab-projects-1331:xjk_algo_comp_test.pois -m overwrite
  
SELECT district_hash,
LAST(pois.f1) AS f1, LAST(pois.f11) AS f11, LAST(pois.f11_1) AS f11_1, 
LAST(pois.f11_2) AS f11_2, LAST(pois.f11_3) AS f11_3, LAST(pois.f11_4) AS f11_4, 
LAST(pois.f11_5) AS f11_5, LAST(pois.f11_6) AS f11_6, LAST(pois.f11_7) AS f11_7, 
LAST(pois.f11_8) AS f11_8, LAST(pois.f13_4) AS f13_4, LAST(pois.f13_8) AS f13_8, 
LAST(pois.f14) AS f14, LAST(pois.f14_1) AS f14_1, LAST(pois.f14_10) AS f14_10, 
LAST(pois.f14_2) AS f14_2, LAST(pois.f14_3) AS f14_3, LAST(pois.f14_6) AS f14_6, 
LAST(pois.f14_8) AS f14_8, LAST(pois.f15) AS f15, LAST(pois.f15_1) AS f15_1, 
LAST(pois.f15_2) AS f15_2, LAST(pois.f15_3) AS f15_3, LAST(pois.f15_4) AS f15_4, 
LAST(pois.f15_6) AS f15_6, LAST(pois.f15_7) AS f15_7, LAST(pois.f15_8) AS f15_8, 
LAST(pois.f16) AS f16, LAST(pois.f16_1) AS f16_1, LAST(pois.f16_10) AS f16_10, 
LAST(pois.f16_11) AS f16_11, LAST(pois.f16_12) AS f16_12, LAST(pois.f16_3) AS f16_3, 
LAST(pois.f16_4) AS f16_4, LAST(pois.f16_6) AS f16_6, LAST(pois.f17) AS f17, 
LAST(pois.f17_2) AS f17_2, LAST(pois.f17_3) AS f17_3, LAST(pois.f17_4) AS f17_4, 
LAST(pois.f17_5) AS f17_5, LAST(pois.f19) AS f19, LAST(pois.f19_1) AS f19_1, 
LAST(pois.f19_2) AS f19_2, LAST(pois.f19_3) AS f19_3, LAST(pois.f19_4) AS f19_4, 
LAST(pois.f1_1) AS f1_1, LAST(pois.f1_10) AS f1_10, LAST(pois.f1_11) AS f1_11, 
LAST(pois.f1_2) AS f1_2, LAST(pois.f1_3) AS f1_3, LAST(pois.f1_4) AS f1_4, 
LAST(pois.f1_5) AS f1_5, LAST(pois.f1_6) AS f1_6, LAST(pois.f1_7) AS f1_7, 
LAST(pois.f1_8) AS f1_8, LAST(pois.f20) AS f20, LAST(pois.f20_1) AS f20_1, 
LAST(pois.f20_2) AS f20_2, LAST(pois.f20_4) AS f20_4, LAST(pois.f20_5) AS f20_5, 
LAST(pois.f20_6) AS f20_6, LAST(pois.f20_7) AS f20_7, LAST(pois.f20_8) AS f20_8, 
LAST(pois.f20_9) AS f20_9, LAST(pois.f21_1) AS f21_1, LAST(pois.f21_2) AS f21_2, 
LAST(pois.f22) AS f22, LAST(pois.f22_1) AS f22_1, LAST(pois.f22_2) AS f22_2, 
LAST(pois.f22_3) AS f22_3, LAST(pois.f22_4) AS f22_4, LAST(pois.f22_5) AS f22_5, 
LAST(pois.f23) AS f23, LAST(pois.f23_1) AS f23_1, LAST(pois.f23_2) AS f23_2, 
LAST(pois.f23_3) AS f23_3, LAST(pois.f23_4) AS f23_4, LAST(pois.f23_5) AS f23_5, 
LAST(pois.f23_6) AS f23_6, LAST(pois.f24) AS f24, LAST(pois.f24_1) AS f24_1, 
LAST(pois.f24_2) AS f24_2, LAST(pois.f24_3) AS f24_3, LAST(pois.f25) AS f25, 
LAST(pois.f25_1) AS f25_1, LAST(pois.f25_3) AS f25_3, LAST(pois.f25_7) AS f25_7, 
LAST(pois.f25_8) AS f25_8, LAST(pois.f25_9) AS f25_9, LAST(pois.f2_1) AS f2_1, 
LAST(pois.f2_10) AS f2_10, LAST(pois.f2_11) AS f2_11, LAST(pois.f2_12) AS f2_12, 
LAST(pois.f2_13) AS f2_13, LAST(pois.f2_2) AS f2_2, LAST(pois.f2_4) AS f2_4, 
LAST(pois.f2_5) AS f2_5, LAST(pois.f2_6) AS f2_6, LAST(pois.f2_7) AS f2_7, 
LAST(pois.f2_8) AS f2_8, LAST(pois.f3_1) AS f3_1, LAST(pois.f3_2) AS f3_2, 
LAST(pois.f3_3) AS f3_3, LAST(pois.f4) AS f4, LAST(pois.f4_1) AS f4_1, 
LAST(pois.f4_10) AS f4_10, LAST(pois.f4_11) AS f4_11, LAST(pois.f4_13) AS f4_13, 
LAST(pois.f4_14) AS f4_14, LAST(pois.f4_16) AS f4_16, LAST(pois.f4_17) AS f4_17, 
LAST(pois.f4_18) AS f4_18, LAST(pois.f4_2) AS f4_2, LAST(pois.f4_3) AS f4_3, 
LAST(pois.f4_5) AS f4_5, LAST(pois.f4_6) AS f4_6, LAST(pois.f4_7) AS f4_7, 
LAST(pois.f4_8) AS f4_8, LAST(pois.f4_9) AS f4_9, LAST(pois.f5) AS f5, 
LAST(pois.f5_1) AS f5_1, LAST(pois.f5_3) AS f5_3, LAST(pois.f5_4) AS f5_4, 
LAST(pois.f6) AS f6, LAST(pois.f6_1) AS f6_1, LAST(pois.f6_2) AS f6_2, 
LAST(pois.f6_4) AS f6_4, LAST(pois.f7) AS f7, LAST(pois.f8) AS f8, 
LAST(pois.f8_1) AS f8_1, LAST(pois.f8_2) AS f8_2, LAST(pois.f8_3) AS f8_3, 
LAST(pois.f8_4) AS f8_4, LAST(pois.f8_5) AS f8_5
FROM [datalab-projects-1331:xjk_algo_comp_test.pois_preprocessed] AS pois
GROUP BY district_hash

## Orders

In [None]:
table = bq.Table('datalab-projects-1331:xjk_algo_comp_test.orders_preprocessed')
if not table.exists():
  schema = bq.Schema([{'name': 'order_id', 'type': 'STRING'},
                      {'name': 'driver_id', 'type': 'STRING'},
                      {'name': 'passenger_id', 'type': 'STRING'},
                      {'name': 'start_district_hash', 'type': 'STRING'},
                      {'name': 'dest_district_hash', 'type': 'STRING'},
                      {'name': 'price', 'type': 'FLOAT'},
                      {'name': 'time', 'type': 'STRING'}])
  table.create(schema)
  
wildpath = '{}/{}/order_data/*'.format(season_name, subdirectory_name)
for localpath in glob.glob(wildpath):
  print 'loading {}'.format(localpath)
  storagepath = os.path.join(bucket_path,localpath)
  process_datafile(localpath, storagepath, table, mode='append')

In [None]:
%%bigquery execute -t datalab-projects-1331:xjk_algo_comp_test.orders_preprocessed2 -m overwrite

SELECT order_id, LAST(driver_id) AS driver_id, LAST(passenger_id) AS passenger_id,
  LAST(start_district_hash) AS start_district_hash, LAST(dest_district_hash) AS dest_district_hash,
  LAST(price) AS price, LAST(time) AS time
FROM [datalab-projects-1331:xjk_algo_comp_test.orders_preprocessed]
GROUP BY order_id

In [None]:
%%bigquery udf --module orders_create_additional_fields

/**
 * Pad with 0 or given string.
 *
 * @param int n Number to add padding to.
 * @param int width Width of number + padding.
 * @param string z (Optional) Other string to replace '0' as padding.
 */
function pad(n, width, z) {
  z = z || '0';
  n = n + '';
  return n.length >= width ? n : new Array(width - n.length + 1).join(z) + n;
}

/**
 * Create additional fields on orders table for gaps table creation.
 *
 * @param {{order_id: string, driver_id: string, passenger_id: string,
            start_district_hash: string, dest_district_hash: string, price: float,
            time: string}} r
 * @param function({{order_id: string, driver_id: string, passenger_id: string,
                     start_district_hash: string, dest_district_hash: string, price: float,
                     time: string, timeslot: string, timeofday_slot: integer, day_in_week: integer,
                     date: string}}) emitFn
 */
function(r, emitFn) {
  var t = r.time.split(/[ :\-]/);
  var slot = Math.floor((parseInt(t[3]) * 60 + parseInt(t[4])) / 10) + 1;
  r.timeslot = t[0] + '-' + pad(t[1], 2) +
               '-' + pad(t[2], 2) + '-' + slot;
  r.timeofday_slot = slot;
  r.date = t[0] + '-' + pad(t[1], 2) + '-' + pad(t[2], 2);
  r.day_in_week = new Date(parseInt(t[0]), parseInt(t[1])-1, parseInt(t[2])).getDay();
  emitFn(r);
}

In [None]:
%%bigquery execute -t datalab-projects-1331:xjk_algo_comp_test.orders -m overwrite
SELECT order_id, driver_id, passenger_id,
  start_district_hash, dest_district_hash,
  price, time, timeslot, timeofday_slot, day_in_week, date
FROM orders_create_additional_fields([datalab-projects-1331:xjk_algo_comp_test.orders_preprocessed2])