# Data extraction of trips using Dask dataframe

# Purpose
As a first step, the time-series data is divided into trips as a form of data reduction. Energy consumption can then be calculated for each trip, together with other aggregated quantities such as mean values and standard deviations. This will be used to analyze how much the trips differ from each other over the year.

However, the file is larger than the available memory, so this solution uses a Dask DataFrame instead.

# Methodology
* Loop over the Dask DataFrame partitions, number the trips, and append each processed partition to a Parquet file in every iteration.

# Setup

In [None]:
#%load imports.py
%matplotlib inline
%load_ext autoreload
%autoreload 2

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import matplotlib.pyplot as plt
# Default matplotlib figure size as (width, height) in inches.
plt.rcParams.update({"figure.figsize": (20, 3)})

#import seaborn as sns
import os
from collections import OrderedDict

from IPython.display import display

# Display up to 999 rows before pandas truncates the output.
pd.options.display.max_rows = 999
# None removes the column limit, so every column is shown. (Setting it to 999
# first had no effect because this assignment overrides it.)
pd.set_option("display.max_columns", None)

import folium
import plotly.express as px
import plotly.graph_objects as go

import sys
import os
sys.path.append('../')
from src.visualization import visualize
from src.data import prepare_dataset
from src.data import trips
import scipy.integrate
import seaborn as sns

import pyarrow as pa
import pyarrow.parquet as pq


In [None]:
# Start a local Dask client with 4 workers, one thread each, and a 2GB memory
# limit per worker. The trailing bare `client` expression displays the client
# summary in the notebook.
# NOTE(review): importing TimeoutError shadows the builtin of the same name;
# neither it nor `progress` is used elsewhere in this notebook.
from dask.distributed import Client, TimeoutError, progress

client = Client(
    n_workers=4,
    threads_per_worker=1,
    memory_limit='2GB',
)
client

In [None]:
#df = prepare_dataset.get_dataset(name='tycho_short_parquet', n_rows=None)
#from azureml.core import Workspace, Dataset
#
#import sys
#sys.path.append("../src/models/pipelines/longterm/scripts/prepdata/trip")
#from src.models.pipelines.longterm.scripts.prepdata.trip import trip_id
#workspace = Workspace.from_config()
#dataset = Dataset.get_by_name(workspace, name='tycho_short_parquet')
#df = trip_id.get(dataset=dataset)

In [None]:
# Load the 'tycho_short_parquet' dataset as a Dask dataframe (partitioned, so
# the whole file never has to fit in memory at once).
df = prepare_dataset.get_dask(name='tycho_short_parquet')

In [None]:
# Preview the first 5 rows of the Dask dataframe.
df.head(n=5)

In [None]:
# Number of partitions; the export loop below computes and processes one
# partition at a time.
df.npartitions

In [None]:
# Process the Dask dataframe one partition at a time, so only one partition is
# materialized in memory. Each partition is prepared, its trips are numbered
# (continuing from the previous partition), and the result is appended to a
# single Parquet file.
current_trip_no = 0
parquet_schema = None
parquet_writer = None
parquet_file = 'tycho_short_id.parquet'

try:
    for partition in df.partitions:

        df_raw = partition.compute()

        df_ = prepare_dataset.prepare(df_raw=df_raw)
        if df_.empty:
            # Nothing to number or write; also avoids an IndexError on
            # df_.iloc[-1] below.
            continue

        trips.numbering(df=df_, start_number=current_trip_no)
        # The next partition continues numbering from the last trip seen here.
        # NOTE(review): whether a trip spanning a partition boundary keeps the
        # same number depends on how trips.numbering uses start_number --
        # confirm against src.data.trips.
        current_trip_no = df_.iloc[-1]['trip_no']

        # Keep the timestamps as an ordinary (string) column and drop the
        # index, which is not written to the file.
        df_['time'] = df_.index.astype(str)
        df_.reset_index(inplace=True, drop=True)

        # The first non-empty partition defines the schema for the whole file.
        # NOTE(review): later partitions must be castable to this schema,
        # otherwise from_pandas raises.
        if parquet_schema is None:
            parquet_schema = pa.Table.from_pandas(df=df_, preserve_index=False).schema
            parquet_writer = pq.ParquetWriter(parquet_file, parquet_schema, compression='snappy')

        # Write this partition to the parquet file.
        table = pa.Table.from_pandas(df_, schema=parquet_schema, preserve_index=False)
        parquet_writer.write_table(table)
finally:
    # Close the writer even when an exception interrupts the loop, so the file
    # is finalized. It is still None if no partition produced any rows.
    if parquet_writer is not None:
        parquet_writer.close()

In [None]:
# Column dtypes of the last partition handled by the export loop (df_ keeps
# the loop's final value).
df_.dtypes

## Save trips
Save a new dataset with *trip_no*, *trip_time*, corrected column names, etc.

In [None]:
# Load the Azure ML workspace via Workspace.from_config() and get its default
# datastore, which the following cells upload to and read from.
from azureml.core import Dataset, Datastore, Workspace

workspace = Workspace.from_config()
datastore = workspace.get_default_datastore()


In [None]:
# Upload the Parquet file written by the export loop to the datastore folder
# 'tycho_short_id' (overwrite=True replaces an existing file of the same name).
datastore.upload_files([parquet_file], target_path='tycho_short_id', overwrite=True)

# Load trips

In [None]:
# Build a tabular dataset from the uploaded Parquet folder and register it in
# the workspace as 'tycho_test'. The register call is the last expression, so
# its result is displayed.
# NOTE(review): re-running this cell with an existing 'tycho_test' may need
# create_new_version=True -- confirm against the azureml docs.
datastore_paths = [(datastore, 'tycho_short_id')]
ds = Dataset.Tabular.from_parquet_files(datastore_paths)
ds.register(
    workspace=workspace,
    name='tycho_test',
    description='testing...',
)


In [None]:
# Read the registered dataset back into pandas and rebuild the timestamp index
# from the string 'time' column written by the export loop.
# Rows keep the order returned by the dataset; they are not re-sorted by time.
df2 = ds.to_pandas_dataframe()
df2['time'] = pd.to_datetime(df2['time'])
df2 = df2.set_index('time')

In [None]:
# First 5 rows of the reloaded data.
df2.head(5)

In [None]:
# Last 5 rows of the reloaded data.
df2.tail(5)

In [None]:
# Add the elapsed time since the start of each trip (trip_time) to the raw
# data, then down-sample every trip to 60-second means for plotting.
groups = df2.groupby(by='trip_no')
df2['trip_time'] = groups['trip_no'].transform(lambda x : x.index - x.index[0])
# Regroup so the groups also carry the new trip_time column (used by the
# per-trip plots further down).
groups = df2.groupby(by='trip_no')

# Result is indexed by (trip_no, time). Bins without samples yield NaN rows,
# which are dropped. Lower-case 's' is used because the upper-case 'S' alias
# is deprecated in recent pandas.
df_3 = groups.resample('60s').mean()
df_3.dropna(inplace=True)

# trip_time for the resampled rows = bin start minus the first remaining bin of
# the same trip. This is computed from df_3 itself: transforming the raw-data
# groups would give a Series indexed by raw timestamps, which does not align
# with the 60 s bins and leaves most rows NaN.
bin_start = pd.Series(df_3.index.get_level_values(1), index=df_3.index)
df_3['trip_time'] = bin_start - bin_start.groupby(level=0).transform('first')

# Keep trip_no as an ordinary column (the plot colours by it), whether or not
# this pandas version kept the grouping column, then index by time only.
df_3['trip_no'] = df_3.index.get_level_values(0)
df_3.index = df_3.index.get_level_values(1)




In [None]:
# Resampled 'sog' against elapsed trip time, one coloured line per trip.
fig = px.line(
    df_3,
    x='trip_time',
    y='sog',
    color='trip_no',
    template="plotly_dark",
    width=1500,
    height=400,
)
fig.show()

In [None]:
# Raw (not resampled) 'cog' and 'heading' columns over elapsed time for trip 12.
trip = groups.get_group(12)
fig = px.line(
    trip,
    x='trip_time',
    y=['cog', 'heading'],
    template="plotly_dark",
    width=1500,
    height=400,
)
fig.show()

In [None]:
# Raw (not resampled) 'cog' and 'heading' columns over elapsed time for trip 13.
trip = groups.get_group(13)
fig = px.line(
    trip,
    x='trip_time',
    y=['cog', 'heading'],
    template="plotly_dark",
    width=1500,
    height=400,
)
fig.show()