In [1]:
import findspark
findspark.init('/usr/hdp/current/spark2-client')

import pyspark
from pyspark.sql.functions import lit, col, instr, expr, pow, round, bround, corr, count, mean, stddev_pop, min, max
from pyspark.sql.functions import monotonically_increasing_id, initcap, lower, upper, ltrim, rtrim, rpad, lpad, trim
from pyspark.sql.functions import regexp_replace, translate, regexp_extract, current_date, current_timestamp, struct
from pyspark.sql.functions import date_add, date_sub, datediff, months_between, to_date, to_timestamp, coalesce, split, size
from pyspark.sql.functions import array_contains, explode, udf
from pyspark.sql import HiveContext
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import col, when

from pyspark.sql.types import StructField, StructType, StringType, IntegerType, DoubleType, FloatType, LongType

In [2]:
def get_Spark(app_name="name"):
    """Create (or reuse) a Hive-enabled SparkSession that runs on YARN.

    Args:
        app_name: Application name shown in the YARN / Spark UIs. Defaults to
            "name", the value this notebook has always used.

    Returns:
        pyspark.sql.SparkSession. ``getOrCreate()`` returns the already running
        session if there is one, in which case settings that can only be applied
        at JVM start-up (executor sizing, deploy mode) are not changed.
    """
    conf = pyspark.SparkConf().setAll([
        ('spark.submit.deployMode', 'client'),  # 'client' (driver runs in this notebook process) or 'cluster'
        ('spark.executor.memory', '8g'),        # memory allocated for each executor
        ('spark.executor.cores', '3'),          # number of cores for each executor
        ('spark.executor.instances', '10'),     # number of executors requested from YARN
        # Memory for the YARN Application Master. In client mode the AM is a separate
        # process from the driver; driver memory is controlled by spark.driver.memory.
        ('spark.yarn.am.memory', '10g')])
    spark = (SparkSession.builder
             .master("yarn")
             .appName(app_name)
             .enableHiveSupport()
             .config(conf=conf)
             .getOrCreate())

    return spark

spark = get_Spark()
spark_context = spark.sparkContext
hc = HiveContext(spark_context)

In [3]:
# Spark datetime pattern of the raw 14-digit `normaltime` strings; used with
# to_timestamp() to turn `normaltime` into a timestamp column.
normaltimeFormat = "yyyyMMddHHmmss"

In [4]:
def build_schema(model, schema_dir='schemas'):
    """Build a Spark ``StructType`` from a column-description CSV file.

    The file read is ``<schema_dir>/<model>_schema.csv``; the model name
    ``"conv"`` is mapped to the ``conv_1s`` schema file. Every non-blank line is
    split on commas: field 2 is the column name and field 3 is a type label
    looked up in ``types`` below (any further fields are ignored).

    Args:
        model: Schema/model name used to locate the CSV file.
        schema_dir: Directory containing the schema files. Defaults to
            ``'schemas'`` (relative to the working directory), as before.

    Returns:
        A ``StructType`` with one ``StructField`` per line, in file order.

    Raises:
        IOError/OSError: if the schema file cannot be opened.
        KeyError: if a line's type label is not in ``types``.
        IndexError: if a non-blank line has fewer than three fields.
    """
    types = {
        "string": StringType(),
        "long": LongType(),
        "date": StringType(),  # dates are kept as strings
        "categorical - integer": IntegerType(),
        "double": DoubleType(),
        # NOTE(review): "integer" maps to DoubleType while "int" maps to IntegerType;
        # kept unchanged because downstream code may rely on it -- confirm intent.
        "integer": DoubleType(),
        "int": IntegerType(),
        "float": FloatType()
    }

    file_stem = model + "_1s" if model == "conv" else model
    columns = []
    with open('{}/{}_schema.csv'.format(schema_dir, file_stem), 'r') as lines:
        for line in lines:
            if not line.strip():
                continue  # tolerate blank lines, e.g. an empty last line
            # Strip whitespace -- notably the trailing newline, which would otherwise
            # stay attached to the type label when it is the last field on the line.
            fields = [field.strip() for field in line.split(",")]
            columns.append(StructField(fields[1], types[fields[2]]))
    return StructType(columns)

In [5]:
from datetime import date, timedelta
# Inclusive range of partition dates to process, formatted as YYYYMMDD strings.
d1 = date(2019, 1, 25)
d2 = date(2019, 1, 26)
delta = d2 - d1
days = [(d1 + timedelta(days=offset)).strftime("%Y%m%d")
        for offset in range(delta.days + 1)]

In [6]:
import subprocess
def run_cmd(args_list):
    """Run a command and return its exit status.

    stdout and stderr are captured and discarded; they are read to completion so
    the child cannot block on a full pipe before it exits.
    """
    child = subprocess.Popen(args_list,
                             stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE)
    _stdout, _stderr = child.communicate()
    exit_status = child.returncode
    return exit_status

In [7]:
def load1sDataPerDay(day, dfSchema):
    """Load one ``dt`` partition of the 1-second ORC table with an explicit schema.

    Args:
        day: Partition value appended after ``dt=`` (e.g. ``"20190126"``).
        dfSchema: ``StructType`` applied when reading the ORC files.

    Returns:
        A DataFrame, or None (after printing a message) when the partition path
        does not exist on HDFS according to ``hdfs dfs -test -e``.
    """
    partition_path = ("hdfs://namenode:8020/apps/hive/warehouse/conv_tsp_tbls.db/a7m_1s_orc/dt="
                      + str(day))
    if run_cmd(['hdfs', 'dfs', '-test', '-e', partition_path]) != 0:
        print('{} does not exist, skipping ..'.format(partition_path))
        return None
    return spark.read.format("orc").schema(dfSchema).load(partition_path)

In [8]:
def load5sDataPerDay(day):
    """Load the 5-second ORC data of one ``dt`` partition, keeping three columns.

    Returns:
        A DataFrame with columns ``vin``, ``normaltime`` and ``icm_totalodometer``
        (taken from the positional ORC columns ``_col0``, ``_col1`` and ``_col19``),
        or None (after printing a message) when the file does not exist on HDFS.

    NOTE(review): only the single part file ``000000_0`` is checked and read;
    any other part files in the partition are ignored -- confirm that is intended.
    """
    orc_file = ("hdfs://namenode:8020/apps/hive/warehouse/conv_tsp_tbls.db/a7m_5s_orc/dt="
                + str(day) + "/000000_0")
    if run_cmd(['hdfs', 'dfs', '-test', '-e', orc_file]) != 0:
        print('{} does not exist, skipping ..'.format(orc_file))
        return None
    raw = spark.read.format("orc").load(orc_file)
    return raw.select(col("_col0").alias("vin"),
                      col("_col1").alias("normaltime"),
                      col("_col19").alias("icm_totalodometer"))

In [9]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.sql.functions import desc

from functools import reduce

# A row is kept only when the gap to its previous or next row (same vin) is at
# least this many seconds, i.e. it can be the first or last row of a segment.
ROW_GAP_SECONDS = 60
# Gap assigned when a row has no previous/next row for its vin. It exceeds every
# threshold used in this notebook, so such rows always count as boundaries.
NO_NEIGHBOUR_GAP = 1000

my_window = Window.partitionBy("vin").orderBy("normaltime")
# Kept for other cells that may reference it; next_normaltime below uses F.lead
# over my_window instead, which gives the same neighbour with a single sort.
my_next_window = Window.partitionBy("vin").orderBy(desc("normaltime"))

dfSchema = build_schema("conv")

# 1) Load and join every available day. Windowing is deliberately NOT done per day:
#    doing so split trips that cross midnight into two trips at the day boundary.
daily_frames = []
for day in days:
    df_1s = load1sDataPerDay(day, dfSchema)
    df_5s = load5sDataPerDay(day)
    if df_1s is not None and df_5s is not None:
        daily_frames.append(df_1s.join(df_5s, ["vin", "normaltime"], "inner"))
    print('{} processing ..'.format(day))

# 2) Compute the gaps to the neighbouring rows once, over all days together.
df = None
if daily_frames:
    joined = reduce(lambda left, right: left.union(right), daily_frames)
    joined = joined.withColumn("normaltime", to_timestamp(col("normaltime"), normaltimeFormat))
    epoch_seconds = col("normaltime").cast("long")
    df = (joined
          .withColumn("next_normaltime", F.lead("normaltime").over(my_window))
          .withColumn("prev_normaltime", F.lag("normaltime").over(my_window))
          # Missing neighbour (or unparsable time) -> NULL difference -> sentinel.
          .withColumn("prev_diff", F.coalesce(epoch_seconds - col("prev_normaltime").cast("long"),
                                              F.lit(NO_NEIGHBOUR_GAP)))
          .withColumn("next_diff", F.coalesce(col("next_normaltime").cast("long") - epoch_seconds,
                                              F.lit(NO_NEIGHBOUR_GAP)))
          .where((col("prev_diff") >= ROW_GAP_SECONDS) | (col("next_diff") >= ROW_GAP_SECONDS))
          .cache())  # reused by count(), show() and the later toPandas() conversion
    # Counted once here instead of re-evaluating the growing union on every loop pass.
    print('{} rows loaded ..'.format(df.count()))
    df.show(100)

hdfs://namenode:8020/apps/hive/warehouse/conv_tsp_tbls.db/a7m_1s_orc/dt=20190125 does not exist, skipping ..
hdfs://namenode:8020/apps/hive/warehouse/conv_tsp_tbls.db/a7m_5s_orc/dt=20190125/000000_0 does not exist, skipping ..
20190125 processing ..
241278 starting/ending rows  ..
20190126 processing ..
241278 rows loaded ..
+-----------------+-------------------+---------+-------------+------------------+--------------------+----------------------+---------------------+------------------------+------------------+----------+--------------------+-------------------+------------------+------------------+------------------+-----------------+-------------------+--------------------+-----------------+------------------+-------------------+------------+------------+--------------------+----------------+-----------------+--------------------+-------------------+--------------+-------------+-------------+----------------+---------------+---------------+------------------+----------------+-----

In [10]:
# Accumulator for trip attributes, one list per output column.
res = dict((key, []) for key in ['vin', 'start_time', 'end_time', 'start_lat',
                                 'start_lon', 'end_lat', 'end_lon', 'distance'])

# vin descending, then normaltime ascending: each vehicle's boundary rows become
# contiguous and chronological once collected to the driver.
df = df.orderBy(desc("vin"), col("normaltime").asc())
pdf = df.toPandas()

pdf.head(100)

Unnamed: 0,vin,normaltime,bcm_keyst,bcs_yawratest,bcs_vehspd,bcs_actvehlongaccel,bcs_vehlongacceloffset,bcs_actvehlaltrlaccel,bcs_vehlaltrlacceloffset,ems_engtorq,...,tel_altitudetype,tel_gpsdirection,tel_gpsspeed,tel_gpsaltitude,dt,icm_totalodometer,next_normaltime,prev_normaltime,prev_diff,next_diff
0,LMGMS1G8XK1036329,2019-01-26 20:32:29,3,1,0.00000,0.162642,-0.000118,-0.000118,-0.000118,30.968,...,0,0.0,0.000000,4.0,,6.0,2019-01-26 20:32:34,NaT,1000,5
1,LMGMS1G8XK1036329,2019-01-26 20:35:57,2,1,0.95625,-3.255326,-0.027245,-0.190005,-0.000118,13.328,...,0,0.0,0.000000,0.0,,7.0,NaT,2019-01-26 20:35:52,5,1000
2,LMGMS1G8XK1035990,2019-01-26 14:23:22,2,1,0.00000,-0.352766,-0.054372,-0.000118,-0.000118,28.616,...,0,0.0,0.000000,4.0,,5.0,2019-01-26 14:23:27,NaT,1000,5
3,LMGMS1G8XK1035990,2019-01-26 14:27:00,2,1,12.54375,0.027009,-0.027245,-0.325639,-0.000118,15.288,...,0,5.2,66.000000,6.0,,6.0,2019-01-26 14:41:07,2019-01-26 14:26:55,5,847
4,LMGMS1G8XK1035990,2019-01-26 14:41:07,3,1,0.00000,0.027009,-0.054372,-0.135752,-0.000118,26.656,...,0,0.0,0.000000,10.0,,6.0,2019-01-26 14:41:12,2019-01-26 14:27:00,847,5
5,LMGMS1G8XK1035990,2019-01-26 14:41:42,2,1,0.00000,-0.054372,-0.054372,-0.244259,-0.000118,20.384,...,0,0.0,0.000000,12.0,,6.0,NaT,2019-01-26 14:41:37,5,1000
6,LMGMS1G8XK1035553,2019-01-26 11:18:11,3,1,0.00000,-0.027245,-0.000118,-0.000118,-0.027245,18.032,...,0,0.0,0.000000,1058.0,,8.0,2019-01-26 11:18:16,NaT,1000,5
7,LMGMS1G8XK1035553,2019-01-26 11:42:53,2,1,4.44375,-0.000118,-0.000118,-0.298512,-0.027245,13.720,...,0,3.8,10.000000,1061.0,,8.0,2019-01-26 11:45:53,2019-01-26 11:42:48,5,180
8,LMGMS1G8XK1035553,2019-01-26 11:45:53,2,1,0.00000,-0.135752,-0.000118,-0.000118,-0.027245,5.880,...,0,0.0,0.000000,1056.0,,8.0,2019-01-26 11:45:58,2019-01-26 11:42:53,180,5
9,LMGMS1G8XK1035553,2019-01-26 11:48:40,2,1,0.00000,-0.027245,-0.000118,-0.000118,-0.027245,7.840,...,0,0.0,0.000000,1046.0,,9.0,2019-01-26 11:49:42,2019-01-26 11:48:35,5,62


In [13]:
# Pair consecutive boundary rows of `pdf` into trips. The result is rebuilt from
# scratch on every run of this cell: the previous version appended to a `res`
# dict created in an earlier cell, so re-running it accumulated stale trips.
import pandas as pd

TRIP_GAP_SECONDS = 600   # a stop at least this long separates two trips
MIN_TRIP_DISTANCE = 1    # minimum odometer increase for a pair to count as a trip


def _dms_columns(axis):
    """Return the degree/minute/second column names for 'latitude' or 'longitude'."""
    return ['tel_{}deg'.format(axis), 'tel_{}min'.format(axis), 'tel_{}sec'.format(axis)]


def _has_zero_position(record):
    """True if the latitude or the longitude deg/min/sec fields are all zero."""
    return any(all(record[name] == 0 for name in _dms_columns(axis))
               for axis in ('latitude', 'longitude'))


def _decimal_degrees(record, axis):
    """Combine the deg/min/sec fields of `record` into decimal degrees.

    Float divisors keep the division fractional even for integer columns on Python 2.
    """
    deg, minutes, seconds = [record[name] for name in _dms_columns(axis)]
    return deg + minutes / 60.0 + seconds / 3600.0


# Rows with a long gap before (trip start) or after (trip end), in the order of `pdf`.
is_boundary = (pdf['prev_diff'] >= TRIP_GAP_SECONDS) | (pdf['next_diff'] >= TRIP_GAP_SECONDS)
boundaries = pdf.loc[is_boundary].to_dict('records')

trips = []
for start, end in zip(boundaries, boundaries[1:]):
    # Checked first so the message is printed on every vehicle change; previously it
    # was only reached when all other checks happened to pass.
    if start['vin'] != end['vin']:
        print('{} finished'.format(start['vin']))
        continue
    if _has_zero_position(start) or _has_zero_position(end):
        continue  # no GPS position at one end of the pair
    if start['prev_diff'] < TRIP_GAP_SECONDS or end['next_diff'] < TRIP_GAP_SECONDS:
        continue  # the pair spans a stop rather than a drive
    distance = end['icm_totalodometer'] - start['icm_totalodometer']
    if distance < MIN_TRIP_DISTANCE:
        continue
    trips.append({
        'vin': start['vin'],
        'start_time': start['normaltime'],
        'start_lat': _decimal_degrees(start, 'latitude'),
        'start_lon': _decimal_degrees(start, 'longitude'),
        'end_time': end['normaltime'],
        'end_lat': _decimal_degrees(end, 'latitude'),
        'end_lon': _decimal_degrees(end, 'longitude'),
        'distance': distance,
    })

# Explicit columns keep the frame well-formed even when no trip was found.
res_df = pd.DataFrame(trips, columns=['vin', 'start_time', 'start_lat', 'start_lon',
                                      'end_time', 'end_lat', 'end_lon', 'distance'])

# Duration in hours (timestamps have whole-second resolution) and average speed.
elapsed = pd.to_datetime(res_df['end_time']) - pd.to_datetime(res_df['start_time'])
res_df['duration'] = elapsed.dt.total_seconds() / 3600.0
res_df['speed'] = (res_df['distance'] / res_df['duration']).astype(float)

res_df.head(100)

Unnamed: 0,distance,end_lat,end_lon,end_time,start_lat,start_lon,start_time,vin,duration,speed
0,1.0,0.000000,0.000000,2019-01-26 20:35:57,22.014081,113.010978,2019-01-26 20:32:29,LMGMS1G8XK1036329,0.057778,17.307692
1,1.0,23.008769,113.014492,2019-01-26 14:27:00,23.009139,113.015211,2019-01-26 14:23:22,LMGMS1G8XK1035990,0.060556,16.513761
2,17.0,40.000592,113.015761,2019-01-26 12:17:15,40.004100,113.015844,2019-01-26 11:18:11,LMGMS1G8XK1035553,0.984444,17.268623
3,1.0,27.014053,120.004492,2019-01-26 14:49:40,27.015164,120.008914,2019-01-26 14:46:52,LMGMS1G8XJ1035017,0.046667,21.428571
4,1.0,27.015289,120.008717,2019-01-26 16:30:43,27.014156,120.004478,2019-01-26 16:19:33,LMGMS1G8XJ1035017,0.186111,5.373134
5,295.0,44.003583,124.011147,2019-01-26 13:58:14,0.000000,0.000000,2019-01-26 09:19:20,LMGMS1G8XJ1034790,4.648333,63.463607
6,186.0,43.000022,123.003586,2019-01-26 16:05:39,44.000131,124.014958,2019-01-26 14:12:52,LMGMS1G8XJ1034790,1.879722,98.950791
7,74.0,42.014375,122.013347,2019-01-26 17:00:55,42.011758,123.005350,2019-01-26 16:19:23,LMGMS1G8XJ1034790,0.692222,106.902087
8,352.0,40.012783,120.003261,2019-01-26 21:37:35,42.009106,122.004178,2019-01-26 17:45:26,LMGMS1G8XJ1034790,3.869167,90.975662
9,16.0,23.005747,113.003139,2019-01-26 13:40:20,0.000000,0.000000,2019-01-26 13:08:07,LMGMS1G8XJ1034773,0.536944,29.798241


In [18]:
# Schema of ubi.conv_trips. The INSERT below uses `SELECT *`, which maps columns by
# position, so the field order here must match the column order of the Hive table.
mySchema = StructType([StructField("vin", StringType(), True),
                       StructField("start_time", StringType(), True),
                       StructField("start_lat", DoubleType(), True),
                       StructField("start_lon", DoubleType(), True),
                       StructField("end_time", StringType(), True),
                       StructField("end_lat", DoubleType(), True),
                       StructField("end_lon", DoubleType(), True),
                       StructField("distance", DoubleType(), True),
                       StructField("duration", DoubleType(), True),
                       StructField("speed", DoubleType(), True)])
# Derived from the schema so the projection and the schema cannot drift apart.
my_col = [field.name for field in mySchema.fields]

spark_df = hc.createDataFrame(res_df[my_col], mySchema)
# Turn placeholder values into real NULLs: when() without otherwise() yields NULL
# for values equal to one of the sentinels below (existing NULLs stay NULL).
cols = [when(~col(x).isin("NULL", "NA", "NaN", ""), col(x)).alias(x) for x in spark_df.columns]
spark_df = spark_df.select(*cols)
# createOrReplaceTempView replaces registerTempTable, deprecated since Spark 2.0.
spark_df.createOrReplaceTempView('update_dataframe')

sql_cmd = """INSERT OVERWRITE TABLE ubi.conv_trips SELECT * from update_dataframe"""
print(sql_cmd)
hc.sql(sql_cmd)
print('Table address creation done.')

INSERT OVERWRITE TABLE ubi.conv_trips SELECT * from update_dataframe
Table address creation done.
