In [54]:
# Set up modes and dirs
import os
from datetime import datetime
from pyspark.sql import SparkSession, Window
from pyspark.sql import functions as f
from pyspark.sql.functions import col

In [None]:
from pyspark.sql.types import LongType
# Run-mode switches.
overwrite = True     # when True, check_file_exist() never reports an existing result
databricks = False   # selects the Databricks data root instead of the local one
is_yellow = False    # selects the dataset folder name below

# Dataset folder name used inside the path templates.
if is_yellow:
    yellow = "yellow"
else:
    yellow = "foil"

# Column-name prefixes for the two trip endpoints.
pick_up, drop_off = "pickup", "dropoff"

In [None]:
# Choose the data root for the current environment.
if databricks:
    data_dir = "/dbfs/mnt/group01"
else:
    data_dir = "/Users/kzmain/LSDE/data"
    # A Spark session is only created here, in the non-Databricks branch.
    spark = SparkSession.builder.getOrCreate()

# Path templates: the two remaining "{}" slots are filled later with
# .format(year, month). The *_file names are the ones checked with os.path.
clean_file = data_dir + "/{}".format(yellow) + "/cln/{}/{}.gz.parquet"
result_file = data_dir + "/{}".format(yellow) + "/feature/{}/{}.gz.parquet"

# The *_dbfs names are the ones handed to Spark readers/writers: identical to
# the *_file templates, except that "/dbfs" is stripped when on Databricks.
clean_dbfs = clean_file.replace("/dbfs", "") if databricks else clean_file
result_dbfs = result_file.replace("/dbfs", "") if databricks else result_file

In [None]:
# First and last (year, month) of the intended processing range.
# NOTE(review): none of the cells visible in this notebook read these values;
# the driver loop iterates over its own hard-coded year range.
fr_year, fr_month = 2009, 1
to_year, to_month = 2017, 12

# NOTE(review): not referenced by the visible cells; purpose to be confirmed.
cluster_radian = 0.1

In [None]:
def check_file_exist(_path):
    """Tell whether an existing output at ``_path`` should be kept.

    Returns True (and prints a notice) only when the path exists and the
    module-level ``overwrite`` switch is off; returns False otherwise.
    """
    if not os.path.exists(_path) or overwrite:
        return False
    print("[SYSTEM]: File exists: {}".format(_path))
    return True

In [None]:
def get_cln(_year, _month):
    """Read the cleaned parquet data for one month.

    The path comes from the ``clean_dbfs`` template; the result is
    repartitioned into 200 partitions keyed on ``pick_day``.
    """
    month_path = clean_dbfs.format(_year, _month)
    month_df = spark.read.parquet(month_path)
    return month_df.repartition(200, "pick_day")

In [None]:
def filter_duration(_in_df, _max_second=4 * 60 * 60):
    """Blank out implausibly long trip durations.

    Rows are never removed: ``duration_second`` is set to null wherever it
    exceeds ``_max_second`` and left untouched everywhere else (including rows
    where it is already null).

    :param _in_df: DataFrame with a ``duration_second`` column.
    :param _max_second: largest duration, in seconds, that is kept
        (default: four hours).
    :return: DataFrame with the same columns as ``_in_df``.
    """
    dur_col = "duration_second"
    # Replace with null, not a boolean placeholder: a boolean branch does not
    # share a type with the numeric duration, and a null is skipped by
    # aggregates such as mean instead of being counted as a value.
    capped = f.when(col(dur_col) > _max_second, f.lit(None)).otherwise(col(dur_col))
    return _in_df.withColumn(dur_col, capped)

In [None]:
def feature_is_weekend(_in_df):
    """Add a boolean ``is_weekend`` column derived from ``week_day``.

    The flag is True where ``week_day`` is greater than 5 and False in every
    other case (a null ``week_day`` therefore yields False, not null).
    NOTE(review): this assumes ``week_day`` is numbered so that 6 and 7 are
    Saturday/Sunday -- confirm against the step that produces the column.
    """
    weekend_flag = f.when(col("week_day") > 5, f.lit(True)).otherwise(f.lit(False))
    return _in_df.withColumn("is_weekend", weekend_flag)

In [None]:
def get_weather():
    """Load the weather parquet table, without its ``time_stamp`` column."""
    # Columns on disk:
    # |         time_stamp|hour|day|high|low|baro|wind| wd|hum|weather|year|month|
    local_path = data_dir + "/nyc/weather/parquet"
    # Spark is given the path with any "/dbfs" segment removed.
    spark_path = local_path.replace("/dbfs", "")
    weather_df = spark.read.parquet(spark_path)
    return weather_df.drop("time_stamp")

In [None]:
def feature_weather(_in, _year):
    """Attach the weather observation for each trip's pickup hour.

    Weather rows are restricted to ``_year`` and left-outer joined on
    (pick_month, pick_day, pick_hour) == (month, day, hour), so trips without
    a matching observation are kept with null weather columns. All weather
    columns, including the join keys, are carried into the result.
    """
    weather = get_weather().filter(col("year") == _year)

    same_month = _in["pick_month"] == weather["month"]
    same_day = _in["pick_day"] == weather["day"]
    same_hour = _in["pick_hour"] == weather["hour"]

    return _in.join(weather, same_month & same_day & same_hour, how="left_outer")

In [None]:
# FOIL dataset only: the driver loop skips this feature when is_yellow is True.
def feature_trip_num_in_day(_in_df):
    """Add per-trip, per-day and per-month work features for each driver/cab pair.

    All windows are partitioned by (medallion, hack_license); "day" windows add
    (pick_month, pick_day) and "month" windows add pick_month.

    Columns added:
      * trip_prev_drop       -- drop-off time of the previous trip on the same day
      * trip_cruise_second   -- seconds between that drop-off and this pickup;
                                null for the first trip of a day and for gaps
                                longer than four hours
      * day_is_sleep_in_day  -- True if any gap that day exceeded four hours
      * day_sleep_second     -- the gap length on rows where it exceeded four
                                hours, otherwise 0
      * day_trip_count, day_start_stamp, day_end_stamp
      * day_work_second      -- day span minus that day's long gaps
      * day_mean_tip_per_trip, day_mean_travel_second_per_trip,
        day_mean_cruise_second_per_trip
      * month_income_total, month_work_seconds, month_income_per_second,
        month_work_on_weekend

    The input must already carry pickup_datetime, dropoff_datetime, tip_amount,
    total_amount, duration_second and is_weekend.

    :param _in_df: trip-level DataFrame.
    :return: the same rows with the columns above appended.
    """
    _sleep_gap = 4 * 60 * 60  # gaps longer than this count as a break, not cruising
    _day_keys = ("medallion", "hack_license", "pick_month", "pick_day")
    _w1 = Window.partitionBy(*_day_keys).orderBy("pickup_datetime")
    _w2 = Window.partitionBy(*_day_keys)
    _w_month = Window.partitionBy("medallion", "hack_license", "pick_month")
    _is_sleep_gap = col("trip_cruise_second") > _sleep_gap
    _first_of_day = "_day_first_trip"  # temporary helper column, dropped before returning

    # Gap between the previous drop-off and this pickup within the same day.
    _in_df = _in_df.withColumn("trip_prev_drop", f.lag(_in_df["dropoff_datetime"]).over(_w1)) \
        .withColumn("trip_cruise_second", f.unix_timestamp(col("pickup_datetime")) - f.unix_timestamp(col("trip_prev_drop")))

    # Long gaps are recorded as breaks (flag + length) ...
    _in_df = _in_df.withColumn("day_is_sleep_in_day", f.when(_is_sleep_gap, True).otherwise(False)) \
        .withColumn("day_is_sleep_in_day", f.max(col("day_is_sleep_in_day")).over(_w2)) \
        .withColumn("day_sleep_second", f.when(_is_sleep_gap, col("trip_cruise_second")).otherwise(f.lit(0)))
    # ... and then removed from the cruise time so they do not skew its mean.
    _in_df = _in_df.withColumn("trip_cruise_second", f.when(_is_sleep_gap, f.lit(None)).otherwise(col("trip_cruise_second")))

    # Per-day totals; every row of a day carries the same value.
    _in_df = _in_df.withColumn("day_trip_count", f.count(col("pickup_datetime")).over(_w2))
    _in_df = _in_df.withColumn("day_start_stamp", f.min(f.unix_timestamp(col("pickup_datetime"))).over(_w2))
    _in_df = _in_df.withColumn("day_end_stamp", f.max(f.unix_timestamp(col("dropoff_datetime"))).over(_w2))
    _in_df = _in_df.withColumn("day_work_second", col("day_end_stamp") - col("day_start_stamp") - f.sum(col("day_sleep_second")).over(_w2))

    # day_work_second is repeated on every trip row of a day, so summing it (or
    # the day span) over all rows of a month would count each day once per
    # trip. Flag exactly one row per day and sum only those rows.
    _in_df = _in_df.withColumn(_first_of_day, f.row_number().over(_w1) == 1)
    _day_work_once = f.when(col(_first_of_day), col("day_work_second")).otherwise(f.lit(0))

    _in_df = _in_df.withColumn("day_mean_tip_per_trip", f.mean("tip_amount").over(_w2)) \
        .withColumn("day_mean_travel_second_per_trip", f.mean("duration_second").over(_w2)) \
        .withColumn("day_mean_cruise_second_per_trip", f.mean("trip_cruise_second").over(_w2)) \
        .withColumn("month_income_total", f.sum("total_amount").over(_w_month)) \
        .withColumn("month_work_seconds", f.sum(_day_work_once).over(_w_month)) \
        .withColumn("month_income_per_second", f.col("month_income_total") / f.col("month_work_seconds")) \
        .withColumn("month_work_on_weekend", f.max("is_weekend").over(_w_month)) \
        .drop(_first_of_day)
    return _in_df

In [None]:
import osmnx as ox
# Directory holding the street-network file, located under the configured data_dir.
map_dir = os.path.join(data_dir, "nyc/map")
# Load the street graph once from its GraphML file; get_node() looks up nodes in it.
nyc_map = ox.load_graphml(os.path.join(map_dir, "NYC.mph"))

def get_node(lat, lon):
    """Return the id of the ``nyc_map`` node nearest to a coordinate.

    :param lat: latitude, or None.
    :param lon: longitude, or None.
    :return: the node id as an int, or None when either coordinate is missing.
    """
    # When this runs as a Spark UDF, SQL NULLs arrive as None; answer with a
    # null id instead of passing None to the nearest-node search.
    if lat is None or lon is None:
        return None
    osmnx_id = ox.get_nearest_node(nyc_map, (lat, lon))
    return int(osmnx_id)

# Spark UDF wrapper around get_node; the resulting column is declared as LongType.
get_node_udf = f.udf(get_node, LongType())

In [None]:
def get_osmid(_in_df, _mode):
    """Add a ``<_mode>_osmid`` column with the nearest map node per coordinate.

    ``_mode`` is the column prefix (the notebook passes "pickup"). The UDF is
    evaluated once per distinct (latitude, longitude) pair and the ids are
    joined back onto the trips. The join is the default (inner) join on the
    two coordinate columns.
    """
    lat_col = "{}_latitude".format(_mode)
    lon_col = "{}_longitude".format(_mode)
    osmid_col = '{}_osmid'.format(_mode)

    # Resolve each distinct coordinate pair only once.
    unique_coords = _in_df.select([lat_col, lon_col]).dropDuplicates()
    coord_to_node = unique_coords.withColumn(osmid_col, get_node_udf(lat_col, lon_col))

    return _in_df.join(coord_to_node, [lat_col, lon_col])

In [None]:
# Build the feature set month by month and write one parquet dataset per month.
for year in range(2009, 2020):
    for month in range(1, 13):
        # Months without cleaned input are skipped silently.
        if not os.path.exists(clean_file.format(year, month)):
            continue
        # Keep an existing result unless the overwrite switch is on.
        if check_file_exist(result_file.format(year, month)):
            print("HAVE : {}-{}".format(year, month))
            continue
        print("Start: {}-{}".format(year, month))
        print("clean")
        cln = get_cln(year, month)
        print("[System]: Feature duration")
        cln = filter_duration(cln)
        print("[System]: Feature is weekend")
        cln = feature_is_weekend(cln)
        print("[System]: Feature weather")
        cln = feature_weather(cln, year)
        # This step resolves pickup coordinates to map node ids, so announce it
        # as such rather than repeating the weather label.
        print("[System]: Feature osmid")
        cln = get_osmid(cln, pick_up)

        # The per-driver features are only computed for the non-yellow dataset.
        if not is_yellow:
            cln = feature_trip_num_in_day(cln)

        cln.write.mode("overwrite").option("compression", "gzip")\
            .partitionBy("pick_day").parquet(result_dbfs.format(year, month))