In [0]:
%python
import pyspark.sql.functions as F
import os
from datetime import datetime, timedelta


In [0]:
# https://stackoverflow.com/questions/57537760/pyspark-how-to-generate-a-dataframe-composed-of-datetime-range 
import pyspark.sql.functions as F
def generate_dates(spark, range_list, interval=1, dt_col="date_time_ref"):  # NOTE: accepts a SparkSession or a sqlContext
    """
    Build a single-column Spark DataFrame of evenly spaced timestamps.

    The sequence starts at ``range_list[0]`` and ends just before midnight of
    the day after ``range_list[1]``. ``F.date_add`` drops the time part of the
    stop value, so the whole stop day is always covered. With the default
    ``interval=1``, the last value is 23:59:59 of the stop day.

    :param spark: SparkSession or sqlContext (anything exposing createDataFrame and range)
    :param range_list: two-element sequence [start, stop] of strings such as
        "2018-01-20" or "2018-01-20 00:00:00"
    :param interval: step between consecutive timestamps, in seconds
    :param dt_col: name given to the generated timestamp column

    :returns: DataFrame with a single timestamp column named ``dt_col``
    """
    lower, upper = range_list
    bounds = (
        spark.createDataFrame([(lower, upper)], ("start", "stop"))
        .select(*[F.col(name).cast("timestamp") for name in ("start", "stop")])
        # Move the stop to midnight of the following day, so that the exclusive
        # upper bound of spark.range() still includes the entire stop day.
        .withColumn("stop", F.date_add("stop", 1).cast("timestamp"))
        # Convert timestamps to epoch seconds, the unit spark.range() steps in.
        .select(*[F.col(name).cast("long") for name in ("start", "stop")])
    )
    first_second, end_second = bounds.first()
    seconds = spark.range(first_second, end_second, interval)
    return seconds.select(F.col("id").cast("timestamp").alias(dt_col))


In [0]:
# [start, stop] bounds of the reference time grid. generate_dates extends the
# grid to the end of the stop day (23:59:59 with its default 1 s interval).
date_range = ['2021-12-15 00:00:00', '2021-12-16 23:59:59']
generate_dates(spark, range_list=date_range).show(10)

In [0]:
# Input: Delta table, the dailyDate partitions to read, and the signal columns to keep
source_table = 'default.ohtlmdsignal'
source_days = ['2021-12-15', '2021-12-16']
required_signals = ['GrsVehWeight', 'RGndSpdKph']
# Output: root folder for the per-asset / per-day CSV files used for annotation
target_path = '/mnt/weuteststdatabrickspoc01/annotations/ohtsignals'

In [0]:
#.agg(F.avg(F.col('GrsVehWeight')).alias('GrsVehWeight'), F.avg(F.col('RGndSpdKph')).alias('RGndSpdKph'))

## Read from the Delta table

In [0]:

# Load the requested days from the source Delta table and collapse the raw
# samples to one row per (second, asset, day) by averaging each signal.
df = (
    spark.read.table(source_table)
    .filter(F.col('dailyDate').isin(source_days))
    .select(['assetIdExplore', 'dateTime', 'dailyDate'] + required_signals)
    # dateTime is treated as epoch seconds (assumption; the rendered output looks
    # like 2021 wall-clock times, TODO confirm). It is truncated to whole seconds
    # and rendered as a 'yyyy-MM-dd HH:mm:ss' string in the session time zone,
    # so the groupBy below averages all samples that fall within the same second.
    # Cast to "long" rather than "integer": a 32-bit int overflows for epoch
    # seconds after 2038-01-19.
    .withColumn('dateTime', F.from_unixtime(F.col('dateTime').cast("long"), format='yyyy-MM-dd HH:mm:ss'))
    .groupBy(['dateTime', 'assetIdExplore', 'dailyDate'])
    .agg(*[F.avg(F.col(column)).alias(column) for column in required_signals])
    # Plain copy of the asset id. It stays a regular column in the CSV files,
    # whereas assetIdExplore is turned into a directory level by partitionBy.
    .withColumn("assetId", F.col("assetIdExplore"))
    .sort(F.col("dateTime"))
)
df.show(10)

In [0]:
# Reference grid: one timestamp per second over date_range (default interval=1).
df_new = generate_dates(spark, range_list=date_range)
df_new.show(10)

In [0]:
# Render the grid timestamps as 'yyyy-MM-dd HH:mm:ss' strings (session time zone),
# the same format the load cell gives df.dateTime, so the two can be joined.
# date_format also accepts an already-formatted string. Re-running this cell
# therefore leaves df_new unchanged, whereas casting the string to integer
# would turn every value into null.
df_new = df_new.withColumn('date_time_ref', F.date_format(F.col('date_time_ref'), 'yyyy-MM-dd HH:mm:ss'))

df_new.show(10)

In [0]:
# Alias for the reference time grid (left side of the join below).
timedf = df_new

In [0]:
# Alias for the per-second averaged signal data (right side of the join below).
datadf = df

## Joining the two DataFrames to get continuous data
### Left outer join based on an explicit column expression
#### df1.join(df2, df1['customer_name'] == df2['account_name'], 'left_outer')

In [0]:
# Keep every grid second. Seconds without a matching signal row get nulls in
# all datadf columns. The join key is time only, so rows of different assets
# that share a second end up aligned on the same grid timestamp.
df_final = timedf.join(
    datadf,
    on=timedf["date_time_ref"] == datadf["dateTime"],
    how='left_outer',
)

#### *Below we can see that, for two assets, the timestamps are merged (a common timestamp). We need to change it as follows:*
- For each asset and each day, we need to add continuous timestamps.
- We need to check how the original data is structured and how we sort the timestamps.

In [0]:
# Show how rows are laid out in the joined frame for a window on 2021-12-15
import pandas as pd
window_start = pd.to_datetime('2021-12-15 09:16:00')
window_end = pd.to_datetime('2021-12-15 23:59:59')
in_window = F.col("date_time_ref").between(window_start, window_end)
df_final.filter(in_window).show(35)

In [0]:
# Experiment:
# Grid seconds without data have a null assetId after the left join. Only one
# assetId is in use for now, so those nulls could be filled with it. List the
# distinct values to check.
df_final.select(F.col('assetId')).distinct().collect()

## Write data to storage dedicated to annotation source

In [0]:
# One CSV file per assetIdExplore/dailyDate directory under target_path.
# NOTE(review): grid rows with no matching signal row have null assetIdExplore
# and dailyDate after the left join. Spark writes such rows to a default (null)
# partition directory. Confirm whether they should be kept.
(
    df_final
    .repartition(1)
    .write
    .format('csv')
    .option("header", 'true')
    .mode('overwrite')
    .partitionBy('assetIdExplore', 'dailyDate')
    .save(target_path)
)

In [0]:
# experiment:


## List daily files available for labelling

In [0]:
# List the entries directly under target_path (the assetIdExplore=... partition folders written above).
dbutils.fs.ls(target_path)

In [0]:
# describe() only builds a summary DataFrame; show() is needed to actually print the statistics.
df.describe().show()

In [0]:
%fs ls /mnt/weuteststdatabrickspoc01/annotations/ohtsignals/assetIdExplore=T264-5330104/dailyDate=2021-12-15

path,name,size
dbfs:/mnt/weuteststdatabrickspoc01/annotations/ohtsignals/assetIdExplore=T264-5330104/dailyDate=2021-12-15/_committed_2823186501687886309,_committed_2823186501687886309,198
dbfs:/mnt/weuteststdatabrickspoc01/annotations/ohtsignals/assetIdExplore=T264-5330104/dailyDate=2021-12-15/_committed_585081807249630048,_committed_585081807249630048,199
dbfs:/mnt/weuteststdatabrickspoc01/annotations/ohtsignals/assetIdExplore=T264-5330104/dailyDate=2021-12-15/_committed_vacuum3236805355272141955,_committed_vacuum3236805355272141955,95
dbfs:/mnt/weuteststdatabrickspoc01/annotations/ohtsignals/assetIdExplore=T264-5330104/dailyDate=2021-12-15/_started_2823186501687886309,_started_2823186501687886309,0
dbfs:/mnt/weuteststdatabrickspoc01/annotations/ohtsignals/assetIdExplore=T264-5330104/dailyDate=2021-12-15/part-00000-tid-2823186501687886309-d51f910d-5b27-4992-b137-a2ec9a3f6f17-98-8.c000.csv,part-00000-tid-2823186501687886309-d51f910d-5b27-4992-b137-a2ec9a3f6f17-98-8.c000.csv,17889


In [0]:
# Read back one written partition for inspection. The cell points at the
# partition directory instead of a specific part-*.csv file: the part file name
# embeds a per-write id, so a hard-coded name breaks once the write cell is
# re-run. Spark skips the underscore-prefixed _started_/_committed_ marker files
# in the directory.
spark.read.csv(
    f'{target_path}/assetIdExplore=T264-5330104/dailyDate=2021-12-15',
    header=True,
).display()

date_time_ref,dateTime,GrsVehWeight,RGndSpdKph,assetId
2021-12-15T03:13:35.000Z,2021-12-15 03:13:35,174.8651,0.0,T264-5330104
2021-12-15T03:13:36.000Z,2021-12-15 03:13:36,175.00359799999984,0.0,T264-5330104
2021-12-15T03:13:37.000Z,2021-12-15 03:13:37,175.10460200000006,0.0,T264-5330104
2021-12-15T03:13:38.000Z,2021-12-15 03:13:38,175.25933599999982,0.0,T264-5330104
2021-12-15T03:13:39.000Z,2021-12-15 03:13:39,175.29358999999988,0.0,T264-5330104
2021-12-15T03:13:40.000Z,2021-12-15 03:13:40,175.31924600000002,0.0,T264-5330104
2021-12-15T03:13:41.000Z,2021-12-15 03:13:41,175.39858200000023,0.0,T264-5330104
2021-12-15T03:13:42.000Z,2021-12-15 03:13:42,175.4299699999999,0.0,T264-5330104
2021-12-15T03:13:43.000Z,2021-12-15 03:13:43,175.41165199999995,0.0,T264-5330104
2021-12-15T03:13:44.000Z,2021-12-15 03:13:44,175.4288759999998,0.0,T264-5330104
