In [56]:
%run utils.ipynb

In [67]:
# This function unpacks the time series according to the inner and global granularity attributes.
# It can take more than one file, and it works with parquet files.
def unpack(base_dir, paths, drop_all_filled_nulls=True):
    """Unpack packed time-series parquet files into one row per observation.

    Each input row is expected to carry a ``timestamp_first`` value, an
    ``inner_granularity`` string and one array column per measurement. Every
    array element becomes its own output row whose timestamp is
    ``timestamp_first + position_in_array * step_in_seconds``.

    Parameters
    ----------
    base_dir : str
        Passed to Spark as the parquet ``basePath`` option (with ``'/'`` appended).
    paths : iterable of str
        Parquet paths to read; the unpacked frames are unioned together.
    drop_all_filled_nulls : bool, default True
        When ``True``, rows whose first data column equals the placeholder
        ``'filled_null'`` are removed. Only the first data column is inspected.

    Returns
    -------
    DataFrame
        The data columns followed by a ``timestamp`` column (cast to long),
        sorted by ``timestamp``.

    Raises
    ------
    ValueError
        If ``paths`` is empty (there is nothing to union).
    """
    paths = list(paths)
    if not paths:
        raise ValueError('unpack() needs at least one parquet path')

    # Local import: posexplode is not referenced anywhere else in this notebook,
    # so do not rely on utils.ipynb having brought it into scope.
    from pyspark.sql.functions import posexplode

    dfs = []
    for path in paths:
        df = spark.read.option('basePath', base_dir + '/').parquet(path).sort('timestamp_interval')

        # Everything that is neither granularity metadata nor a timestamp is a packed data column.
        # (The loop variable is deliberately not called `col`, which is the Spark column function.)
        columns = [name for name in df.columns if 'granularity' not in name and 'timestamp' not in name]

        # Granularity string of the packed observations, taken from the first row, and
        # its length as returned by the helper (the timestamp arithmetic below treats it as seconds).
        step = df.select('inner_granularity').first()[0]
        inSec = calculate_step_from_string(step)

        # Zip the data arrays element-wise and explode them together with each element's
        # index inside its own array. monotonically_increasing_id() must NOT be used for
        # this offset: its values are not consecutive across partitions and they do not
        # restart for every packed row, which yields wrong timestamps.
        df1 = df.select('timestamp_first',
                        posexplode(arrays_zip(*columns)).alias('counter', 'new'))

        # Promote the struct fields of the exploded column back to top-level columns
        for c in columns:
            df1 = df1.withColumn(c, col('new.' + c))
        df1 = df1.drop('new')

        # Filtering happens after the position was assigned, so the remaining rows
        # keep their original offsets.
        if drop_all_filled_nulls is True:
            df1 = df1.filter(col(columns[0]) != 'filled_null')

        # Exact timestamp of each observation: first timestamp + offset * step
        df1 = df1.withColumn(
            'timestamp', (col('timestamp_first') + col('counter') * lit(inSec)).cast('long'))
        df1 = df1.drop('timestamp_first', 'counter')

        dfs.append(df1)

    # Combine the dataframes of the different files
    df = reduce(DataFrame.union, dfs)

    return df.sort('timestamp')

In [1]:
def read_and_store(input_path, output_path, inferSchema=False, header=True, timestampFormat='yyyy-MM-dd HH:mm:ss',
                   timestampColumnName='timestamp', inner_granularity='1 minute', 
                   fill_skipped=True, fill_mode='null', global_granularity='1 month'):
    """Read a CSV time series, pack it per global interval and store it as parquet.

    Steps performed:
      1. Read the CSV, rename ``timestampColumnName`` to ``timestamp`` and parse it
         with ``timestampFormat``; sanitize the column names via ``change_column_names``.
      2. Merge every column whose name starts with ``lat`` or ``lon`` into a single
         ``coordinates`` column and drop rows with duplicate timestamps.
      3. If ``fill_skipped`` is ``True`` and ``fill_mode == 'null'``, left-join onto a
         regular grid of timestamps spaced ``inner_granularity`` apart and replace the
         resulting nulls with the placeholder ``'filled_null'``.
      4. Group the rows by ``floor(timestamp / global_step) * global_step`` and collect
         every data column into an array ordered by timestamp.
      5. Write the result to ``output_path`` partitioned by ``timestamp_interval``
         (mode ``overwrite``).

    Parameters
    ----------
    input_path : str
        CSV location handed to ``spark.read.csv``.
    output_path : str
        Destination of the parquet output.
    inferSchema, header, timestampFormat
        Forwarded to ``spark.read.csv``; ``timestampFormat`` is also used to parse
        the timestamp column.
    timestampColumnName : str
        Name of the timestamp column in the CSV.
    inner_granularity : str
        Spacing between consecutive observations, e.g. ``'1 minute'``.
    fill_skipped : bool
        Whether missing grid points are inserted. Only ``fill_mode='null'`` is
        implemented; any other ``fill_mode`` leaves the data unfilled.
    fill_mode : str
        Filling strategy, see ``fill_skipped``.
    global_granularity : str
        Size of the interval that is packed into one output row, e.g. ``'1 month'``.
    """
    # Local import: these two functions are not referenced anywhere else in this
    # notebook, so do not rely on utils.ipynb having brought them into scope.
    from pyspark.sql.functions import sort_array, struct

    # Read the file from the system
    df = spark.read.csv(input_path, inferSchema=inferSchema, header=header, timestampFormat=timestampFormat)

    if timestampColumnName != 'timestamp':
        df = df.withColumnRenamed(timestampColumnName, 'timestamp')
    df = df.withColumn('timestamp', to_timestamp('timestamp', timestampFormat))
    # Checking and changing column names because Spark does not take columns with dot in them
    df = df.toDF(*change_column_names(df.columns))

    # Change the timestamp to epoch seconds
    df = df.withColumn("timestamp_epoch", df.timestamp.cast("long")).sort('timestamp_epoch')

    # UDF that merges the lat and lon columns.
    # NOTE(review): `concat` is expected to be the plain-Python helper from utils.ipynb,
    # not pyspark.sql.functions.concat -- confirm.
    concat_udf = udf(concat, StringType())

    cols = [c for c in df.columns if not c.startswith("timestamp") and not c.startswith(
        "lat") and not c.startswith("lon")]
    coords = [c for c in df.columns if c.startswith("lat") or c.startswith("lon")]
    df1 = df.select('timestamp_epoch', *cols, concat_udf(*coords).alias('coordinates'))

    # Dropping the rows with the same timestamp
    df1 = df1.drop_duplicates(['timestamp_epoch']).sort('timestamp_epoch')

    # Getting the inner granularity in seconds
    step = calculate_step_from_string(inner_granularity)

    # Optimally, the attribute fill_skipped should be True. Only filling skipped
    # observations with nulls is supported for now.
    df_joined = df1.withColumn('timestamp', df1.timestamp_epoch).drop('timestamp_epoch')
    if fill_skipped is True:
        # `min` / `max` here are the Spark aggregate functions, not the Python builtins.
        minp, maxp = df1.select(min("timestamp_epoch"), max("timestamp_epoch")).first()
        # Floor division keeps the grid aligned to whole multiples of `step`. True
        # division would produce floats that are merely truncated later, leaving the
        # grid unaligned and exposed to float rounding.
        reference = spark.range(
            (minp // step) * step, ((maxp // step) + 1) * step, step).select(
            col("id").cast("timestamp").cast("long").alias("timestamp"))

        if fill_mode == 'null':
            # Fill the skipped values with nulls, then with the string placeholder
            # (na.fill with a string value only affects string columns).
            df_joined = reference.join(df1, reference.timestamp == df1.timestamp_epoch, "leftouter").drop(
                'timestamp_epoch')
            df_joined = df_joined.na.fill('filled_null')

    # Adding the global timestamp to group by it
    global_step = calculate_step_from_string(global_granularity)
    tsGroup = (floor(df_joined.timestamp / lit(global_step)) * lit(global_step)).alias('timestamp_interval')

    columns = cols + ['coordinates']

    # collect_list gives no ordering guarantee after a shuffle, while the reader
    # reconstructs timestamps from the position inside each array. Collect one struct
    # per observation and sort by its leading field (the timestamp) so that every
    # array is chronological and all arrays stay aligned with each other, including
    # observations that contain nulls.
    packed = df_joined.groupBy(tsGroup).agg(
        sort_array(collect_list(struct('timestamp', *columns))).alias('packed'))
    packed_col = col('packed')

    # Split the sorted struct array back into one array per data column.
    # timestamp_first is the earliest timestamp of the interval, kept as a string
    # column like the StringType UDF that previously produced it.
    df2 = packed.select(
        'timestamp_interval',
        *[packed_col.getField(c).alias(c) for c in columns],
        packed_col.getItem(0).getField('timestamp').cast('string').alias('timestamp_first')
    ).orderBy('timestamp_first')

    # Add the granularities meta-data
    df2 = df2.withColumn('global_granularity', lit(global_granularity))
    df2 = df2.withColumn('inner_granularity', lit(inner_granularity))

    # Partition by the global granularity and store the data as parquet
    df2.write.mode('overwrite').partitionBy('timestamp_interval').parquet(output_path)