In [None]:
from pyspark.sql.functions import *

# Read the raw phone accelerometer CSV (columns arrive as strings because no schema
# is inferred) and derive typed columns:
#   event_ts     - Arrival_Time / 1000 cast to timestamp, i.e. Arrival_Time is
#                  treated as epoch milliseconds
#   x, y, z      - accelerometer axes cast to double
#   event_ts_dbl - event_ts expressed as fractional epoch seconds
phone_accel_df = (
    spark.read.format("csv")
    .option("header", "true")
    .load("dbfs:/home/tempo/Phones_accelerometer")
    .withColumn("event_ts", (col("Arrival_Time").cast("double") / 1000).cast("timestamp"))
    .withColumn("x", col("x").cast("double"))
    .withColumn("y", col("y").cast("double"))
    .withColumn("z", col("z").cast("double"))
    .withColumn("event_ts_dbl", col("event_ts").cast("double"))
)

from tempo import *

# Wrap as a tempo TSDF: event_ts is the ordering column, one series per User.
phone_accel_tsdf = TSDF(phone_accel_df, ts_col="event_ts", partition_cols=["User"])
display(phone_accel_tsdf)

In [None]:
# Downsample each User's phone series into 1-minute buckets (freq='min', func='floor'),
# then collect a single day (2015-02-23) into pandas for plotting.
# NOTE(review): per the tempo docs, func='floor' keeps the earliest record of each
# bucket — confirm for the installed tempo version.
# NOTE(review): the timestamp -> date cast uses the Spark session time zone.
resampled_sdf = phone_accel_tsdf.resample(freq='min', func='floor')
resampled_pdf = resampled_sdf.df.filter(col('event_ts').cast("date") == "2015-02-23").toPandas()

import plotly.graph_objs as go
import plotly.express as px
import pandas as pd

# Plotly figure 1: z-axis acceleration over time, one line per User.
fig = px.line(resampled_pdf, x='event_ts', y='z',
              color="User",
              line_group="User", hover_name="User")
fig.update_layout(
    title='Phone Accelerometer Usage',
    xaxis_title='Event time (1-minute buckets)',
    yaxis_title='z acceleration',
    showlegend=False,
)

fig.show()

In [None]:
from pyspark.sql.functions import *

# Watch accelerometer data, loaded and typed with the same recipe as the phone data:
# Arrival_Time (epoch ms) -> event_ts timestamp, x/y/z -> double, plus event_ts_dbl.
watch_accel_df = (
    spark.read.format("csv")
    .option("header", "true")
    .load("dbfs:/home/tempo/Watch_accelerometer")
    .withColumn("event_ts", (col("Arrival_Time").cast("double") / 1000).cast("timestamp"))
    .withColumn("x", col("x").cast("double"))
    .withColumn("y", col("y").cast("double"))
    .withColumn("z", col("z").cast("double"))
    .withColumn("event_ts_dbl", col("event_ts").cast("double"))
)

watch_accel_tsdf = TSDF(watch_accel_df, ts_col="event_ts", partition_cols=["User"])

# AS OF join: watch rows (left) are enriched with the latest phone record as of each
# watch event_ts; columns coming from the phone TSDF are prefixed with "phone_accel".
joined_df = watch_accel_tsdf.asofJoin(phone_accel_tsdf, right_prefix="phone_accel")

display(joined_df)  # plain-text alternative: joined_df.show(10, False)

In [None]:
# Skew-optimized AS OF join of watch (left) and phone (right) readings.
# tsPartitionVal / fraction enable tempo's bucketed join; per the tempo docs,
# tsPartitionVal is the time-bracket size used to split each partition and fraction
# is the overlap between brackets — confirm for the installed tempo version.
# right_prefix labels the columns of the right-hand TSDF, which is the *phone* data,
# so it must be "phone_accel" (as in the plain AS OF join); "watch_accel" would tag
# phone readings as watch readings.
joined_df = watch_accel_tsdf.asofJoin(
    phone_accel_tsdf,
    right_prefix="phone_accel",
    tsPartitionVal=10,
    fraction=0.1,
)
display(joined_df)  # plain-text alternative: joined_df.show(10, False)

In [None]:
# Exponential moving average of the watch x-axis reading.
# NOTE(review): window=50 is, per the tempo docs, the number of lagged rows used — confirm.
# NOTE(review): `ema_trades` is a leftover name; this holds accelerometer data, not trades.
ema_trades = watch_accel_tsdf.EMA("x", window=50)

display(ema_trades)  # plain-text alternative: ema_trades.show(10, False)

In [None]:
# Rolling statistics of the watch y axis over the preceding 600 seconds.
# tempo's signature is withRangeStats(type='range', colsToSummarize=[], rangeBackWindowSecs=...):
# passing "y" positionally binds it to `type`, not to the column list (and every numeric
# column then gets summarized), so the column is passed via colsToSummarize.
moving_avg = watch_accel_tsdf.withRangeStats(colsToSummarize=["y"], rangeBackWindowSecs=600)

# TSDF.select requires the ts and partition columns to be selected, so project on the
# underlying Spark DataFrame; User is kept so each row stays attributable to its series.
moving_avg.df.select("User", "event_ts", "x", "y", "z", "mean_y").show(10, False)

In [None]:
# Fourier transform of the watch x-axis signal. It runs on a TSDF that exists in this
# notebook (watch_accel_tsdf) and on a column that exists in its schema ("x").
# NOTE(review): timestep=1 is the sample spacing handed to tempo — confirm its unit
# against the real accelerometer sampling interval.
ft_df = watch_accel_tsdf.fourier_transform(timestep=1, valueCol="x")
display(ft_df)

In [None]:
# Create instance of the TSDF class.
# NOTE(review): `input_df` is a placeholder that is not defined anywhere in this notebook.
# It presumably needs columns event_ts, partition_a, partition_b, columnA and columnB
# (plus partition_c and other_event_ts for the override examples) — define it first.
input_tsdf = TSDF(
    input_df,
    partition_cols=["partition_a", "partition_b"],
    ts_col="event_ts",
)


# What the following chain of operations does:
# 1. Aggregate all valid numeric columns using mean into 30-second intervals
# 2. Interpolate any missing intervals or null values using linear fill
# Note: when chaining interpolate after a resample, there is no need to provide a freq
# or func parameter. Only method is required.
interpolated_tsdf = input_tsdf.resample(freq="30 seconds", func="mean").interpolate(
    method="linear"
)

# What the following interpolation call does:
# 1. Aggregate columnA and columnB using mean into 30-second intervals
# 2. Interpolate any missing intervals or null values using linear fill
interpolated_tsdf = input_tsdf.interpolate(
    freq="30 seconds",
    func="mean",
    target_cols=["columnA", "columnB"],
    method="linear",
)

# Alternatively, the default TSDF parameters (e.g. partition_cols and ts_col) can be
# overridden per call.
interpolated_tsdf = input_tsdf.interpolate(
    partition_cols=["partition_c"],
    ts_col="other_event_ts",
    freq="30 seconds",
    func="mean",
    target_cols=["columnA", "columnB"],
    method="linear",
)

# Setting show_interpolated=True adds boolean columns that flag, for each row,
# whether a column's value has been interpolated.
interpolated_tsdf = input_tsdf.interpolate(
    partition_cols=["partition_c"],
    ts_col="other_event_ts",
    freq="30 seconds",
    func="mean",
    method="linear",
    target_cols=["columnA", "columnB"],
    show_interpolated=True,
)

In [None]:
# Summary statistics of the watch y axis aggregated into 1-minute time buckets
# (grouped by the TSDF's partition column, User, per the tempo docs).
grouped_stats = watch_accel_tsdf.withGroupedStats(
    metricCols=["y"],
    freq="1 minute",
)

display(grouped_stats)