In [1]:
#--------------------------------------import-------------------------------------
import os, sys, gzip, random, json, datetime, re, io
import pandas as pd
from pathlib import Path
from scipy import stats as st
#-----------------------------------environment------------------------------------
# Project root, read once from the environment; the paths below derive from it.
baseDir = os.environ['LAV_DIR']
projDir = baseDir + "/rem/src/feature_exp/"
# Make every entry under $LAV_DIR/src importable.
# The entries are appended to the existing sys.path (skipping ones already present)
# instead of rebuilding it through set(): a set has no defined order, so the import
# lookup precedence could differ from one kernel start to the next.
dL = os.listdir(baseDir + '/src/')
sys.path += [p for p in (baseDir + '/src/' + x for x in sorted(dL)) if p not in sys.path]
#------------------------------------local-import----------------------------------
from sawmill import aws_utils as a_u
from sawmill import proc_spark as p_s
from albio import series_stream as s_s
import importlib

In [32]:
#-----------------------------pyspark-import----------------------
# Point the JVM launcher at the Java 8 install before pyspark is imported below.
# NOTE(review): machine-specific absolute path - confirm it exists on the target host.
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-1.8.0-openjdk-amd64"
import pyspark
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext
from pyspark.sql.types import *
from pyspark.sql.functions import udf
from pyspark.sql.functions import to_utc_timestamp, from_utc_timestamp
from pyspark.sql.functions import date_format
from pyspark.sql import functions as func
from pyspark.sql.functions import col
from pyspark.sql.window import Window

# Spark configuration, assembled in a single chain.
# NOTE(review): "yarn-client" is the legacy master URL and "spark.deploy-mode" does not
# look like a documented property (the documented key is spark.submit.deployMode);
# both values are kept as they were - confirm against the cluster's Spark version.
conf = (SparkConf()
    .setMaster("yarn-client")
    .setAppName("proc library")
    .set("spark.deploy-mode", "cluster")
    .set("spark.executor.memory", "10g")
    .set("spark.executor.cores", "10")
    .set("spark.executor.instances", "2")
    .set("spark.driver.maxResultSize", "10g")
    .set("spark.driver.memory", "10g")
    .set("spark.sql.crossJoin.enabled", "true"))

# Hand the configuration to the context: without conf=... every setting above is
# silently ignored.  If a context is already running in this kernel, getOrCreate
# returns it and the configuration has no effect until the kernel is restarted.
sc = SparkContext.getOrCreate(conf=conf)
sqlContext = SQLContext(sc)
sc.setLogLevel("ERROR")

In [1]:
from pyspark.sql import SparkSession

In [74]:
# Timestamp pattern with millisecond precision (year-month-day hour:minute:second.millis).
timeFmt = 'yyyy-MM-dd HH:mm:ss.SSS'

In [123]:
#----------------------load-data-frame---------------------------
# Read the raw telemetry parquet and sort it by session, then by event time.
df = (
    sqlContext.read
    .parquet(baseDir + "/rem/log/" + 'telemetry')
    .orderBy(['session_id', 'timestamp_ms'])
)
df.printSchema()

root
 |-- session_id: string (nullable = true)
 |-- date: string (nullable = true)
 |-- timestamp_ms: timestamp (nullable = true)
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- control_room_id: string (nullable = true)
 |-- actuators_enabled: boolean (nullable = true)
 |-- control_room_jitter: struct (nullable = true)
 |    |-- jitter_ms: float (nullable = true)
 |-- vehicle_jitter: struct (nullable = true)
 |    |-- jitter_ms: float (nullable = true)
 |-- control_room_ping: struct (nullable = true)
 |    |-- ping_ms: float (nullable = true)
 |-- vehicle_ping: struct (nullable = true)
 |    |-- ping_ms: float (nullable = true)
 |-- e2e_latency: struct (nullable = true)
 |    |-- camera_index: integer (nullable = true)
 |    |-- latency_ms: long (nullable = true)
 |-- fault_detected: struct (nullable = true)
 |    |-- fault_source: string (nullable = true)
 |    |-- is_timeout: boolean (nullable = true)
 |-- latency_camera: struct (nullable = true)

In [150]:
#----------------------group-by-deci-seconds---------------------
# Bucket key: the timestamp cast to double, scaled by ten and truncated to a long,
# i.e. one bucket per tenth of a second.
df = df.withColumn(
    'deci',
    (func.col('timestamp_ms').cast('double') * 10.).cast('long'),
)
# One mean per (session, decisecond) bucket; each pair is (source field, output column).
ddf = df.groupBy(["session_id", "deci"]).agg(*[
    func.mean(source).alias(name)
    for source, name in [
        ('control_room_jitter.jitter_ms', 'room_jitter'),
        ('vehicle_jitter.jitter_ms', 'vehicle_jitter'),
        ('jitter.jitter_ms', 'camera_jitter'),
        ('vehicle_ping.ping_ms', 'vehicle_ping'),
        ('joystick_hz.joystick_hz', 'joystick_freq'),
        ('wheel_speed.mean_km_per_hour', 'wheel_speed'),
        ('radar_track.object_relative_speed_m_per_sec', 'radar_speed'),
        ('vehicle_physics.lateral_force_m_per_sec_squared', 'force_lateral'),
        ('vehicle_physics.longitudinal_force_m_per_sec_squared', 'force_longitudinal'),
        ('vehicle_physics.yaw_rate_deg_per_sec', 'yaw_rate'),
        ('vehicle_ram_usage.ram_usage_percent', 'ram_usage'),
        ('vehicle_cpu_usage.cpu_usage_percent', 'cpu_usage'),
        ('bitrate_adjusted.bitrate_bps', 'bit_rate'),
        ('vehicle_temperature_readings.b_cpu_temperature', 'cpu_temperature'),
        ('control_room_performance.camera_interframe_interval_average_ms', 'camera_interframe'),
        ('control_room_performance.native_cycle_duration_average_ms', 'native_cycle'),
        ('latency_camera.latency_ms', 'camera_latency'),
        ('joystick_latency.joystick_latency_ns', 'joystick_latency'),
        ('e2e_latency.latency_ms', 'e2e_latency'),
    ]
])
ddf.show(1)

+--------------------+-----------+-----------+--------------+-----------------+------------------+------------------+-----------+-----------+--------------------+------------------+--------------------+---------+---------+--------+---------------+-----------------+------------+--------------+--------------------+-----------+
|          session_id|       deci|room_jitter|vehicle_jitter|    camera_jitter|      vehicle_ping|     joystick_freq|wheel_speed|radar_speed|       force_lateral|force_longitudinal|            yaw_rate|ram_usage|cpu_usage|bit_rate|cpu_temperature|camera_interframe|native_cycle|camera_latency|    joystick_latency|e2e_latency|
+--------------------+-----------+-----------+--------------+-----------------+------------------+------------------+-----------+-----------+--------------------+------------------+--------------------+---------+---------+--------+---------------+-----------------+------------+--------------+--------------------+-----------+
|7ac225a7-57e8-4f6.

In [151]:
# Centred moving average per session: each value is replaced by the mean over its own
# row plus up to three rows before and three rows after it.
# The frame is row-based, so the rows must be ordered by `deci`; an unordered window
# would average over whatever rows happen to be adjacent and give results that are
# not reproducible between runs.
window = Window.partitionBy('session_id').orderBy('deci').rowsBetween(-3, 3)
# Columns to smooth.
# NOTE(review): 'vehicle_jitter' and 'vehicle_ping' are not in this list - confirm
# that leaving them unsmoothed is intended.
tL = ['room_jitter','camera_jitter','joystick_freq','wheel_speed','radar_speed','force_lateral','force_longitudinal',
      'yaw_rate','ram_usage','cpu_usage','bit_rate','cpu_temperature','camera_interframe','native_cycle','camera_latency',
      'joystick_latency','e2e_latency']
# Each column is overwritten in place, so re-running this cell smooths the data again.
for t in tL:
    ddf = ddf.withColumn(t, func.mean(ddf[t]).over(window))
ddf.show(1)

+--------------------+-----------+-----------------+------------------+------------------+------------------+------------------+-----------+-----------+--------------------+-------------------+--------------------+-----------------+---------+--------+---------------+-----------------+------------+------------------+--------------------+------------------+
|          session_id|       deci|      room_jitter|    vehicle_jitter|     camera_jitter|      vehicle_ping|     joystick_freq|wheel_speed|radar_speed|       force_lateral| force_longitudinal|            yaw_rate|        ram_usage|cpu_usage|bit_rate|cpu_temperature|camera_interframe|native_cycle|    camera_latency|    joystick_latency|       e2e_latency|
+--------------------+-----------+-----------------+------------------+------------------+------------------+------------------+-----------+-----------+--------------------+-------------------+--------------------+-----------------+---------+--------+---------------+-----------------

In [156]:
# Persist the aggregated frame as parquet, replacing any previous export.
ddf.write.mode("overwrite").parquet(baseDir + "/rem/log/prediction/")

In [158]:
# Collect the Spark frame into a pandas DataFrame on the driver ...
tel = ddf.toPandas()
# ... and dump it as a gzip-compressed CSV without the index column.
tel.to_csv(
    baseDir + "/rem/raw/prediction/telemetry.csv.gz",
    index=False,
    compression="gzip",
)

In [157]:
# Read the exported parquet back and print its first rows.
# NOTE(review): this rebinds `df`, which the load cell used for the raw telemetry;
# any later cell that expects raw columns on `df` will no longer find them.
df = (
    sqlContext.read
    .parquet(baseDir + "/rem/log/" + 'prediction')
)
df.show()

+--------------------+-----------+------------------+------------------+------------------+------------------+------------------+-----------+-----------+--------------------+-------------------+--------------------+-----------------+-----------------+--------+------------------+-----------------+------------+------------------+--------------------+------------------+
|          session_id|       deci|       room_jitter|    vehicle_jitter|     camera_jitter|      vehicle_ping|     joystick_freq|wheel_speed|radar_speed|       force_lateral| force_longitudinal|            yaw_rate|        ram_usage|        cpu_usage|bit_rate|   cpu_temperature|camera_interframe|native_cycle|    camera_latency|    joystick_latency|       e2e_latency|
+--------------------+-----------+------------------+------------------+------------------+------------------+------------------+-----------+-----------+--------------------+-------------------+--------------------+-----------------+-----------------+--------+

In [None]:
def replace_null(orig, ma):
    """Build a column expression that falls back to `ma` wherever `orig` is null.

    orig: column whose non-null values are kept as they are.
    ma:   column supplying the value for rows where `orig` is null.
    Returns the resulting column expression.
    """
    is_missing = orig.isNull()
    return func.when(is_missing, ma).otherwise(orig)
# Fill gaps in the camera latency with the moving average, then show the three columns.
# NOTE(review): expects `ddf` to carry 'latency_camera' and 'moving_avg' columns;
# no cell shown here creates 'moving_avg' - confirm which frame this was meant for.
ddf = ddf.withColumn(
    'stream_camera',
    replace_null(
        col('latency_camera.latency_ms'),
        col('moving_avg'),
    ),
)
ddf.select(['latency_camera', 'moving_avg', 'stream_camera']).show()

In [133]:
# Keep only rows whose camera latency is not zero and peek at the first one.
ddf1 = df.filter(df["latency_camera"]["latency_ms"] != 0)
ddf1.select("latency_camera").take(1)

[Row(latency_camera=Row(camera_index=3, latency_ms=116))]

In [54]:
def sec_delta(i):
    """Express `i` seconds in the units of the window ordering key (whole seconds)."""
    return i * 1

# Trailing window: rows of the same session whose timestamp (cast to long) lies within
# the previous second up to the current row.
# Partitioning by session keeps one session's readings out of another's average and
# avoids moving the whole dataset into a single partition.
w = (Window.partitionBy('session_id')
     .orderBy(func.col("timestamp_ms").cast('long'))
     .rangeBetween(-sec_delta(1), 0))
ddf = ddf.withColumn('avg', func.avg("latency_camera.latency_ms").over(w))
ddf = ddf.withColumn('jitter', func.avg("control_room_jitter.jitter_ms").over(w))
ddf.select(['timestamp_ms','latency_camera','avg']).show(5,False)

+-----------------------+--------------+------------------+
|timestamp_ms           |latency_camera|avg               |
+-----------------------+--------------+------------------+
|2020-07-02 08:03:15.483|[3, 126]      |136.28571428571428|
|2020-07-02 08:03:15.483|[0, 165]      |136.28571428571428|
|2020-07-02 08:03:15.483|[1, 121]      |136.28571428571428|
|2020-07-02 08:03:15.483|[2, 113]      |136.28571428571428|
|2020-07-02 08:03:15.274|[3, 116]      |136.28571428571428|
+-----------------------+--------------+------------------+
only showing top 5 rows



In [125]:
# Smallest decisecond bucket present in the frame.
ddf.agg(func.min('deci')).show()

+-----------+
|  min(deci)|
+-----------+
|15936697952|
+-----------+



In [16]:
def streamList(col, n_buf=5):
    """stream a list - spark

    Smooth a list with a trailing moving average over the last `n_buf` valid values.

    The buffer starts filled with zeros, so the first outputs are pulled towards zero
    until `n_buf` valid values have been seen.  NaN and None entries do not enter the
    buffer; the output at their position is the mean of the buffer as it stands.

    col:   iterable of numbers (may contain NaN/None), or None.
    n_buf: buffer length, at least 1 (default 5).
    Returns a list of floats with one entry per input element, or None when `col`
    is None.
    Raises ValueError when `n_buf` is smaller than 1.
    """
    if n_buf < 1:
        raise ValueError("n_buf must be at least 1")
    if col is None:
        return None
    buf = [0.] * n_buf
    smoothed = []
    for c in col:
        # c == c is False only for NaN, which must not enter the buffer
        if c is not None and c == c:
            buf = buf[1:] + [c]
        smoothed.append(sum(buf) / n_buf)
    return smoothed
# streamList returns a list, so the UDF result is declared as an array of floats;
# a scalar FloatType does not match what the function hands back.
udf_stream = udf(streamList, ArrayType(FloatType()))