# Going big by loading several files

All the steps for pre-processing data from a single forceplate sensor have already been explained in [Tutorial One](3_FP_single.ipynb). 

Here we will try to go "big" by batch processing several data files at once!

If you still have [Tutorial One](3_FP_single.ipynb) open, make sure that you have
stopped its Spark engine by running the last command of that notebook.

In [None]:
import pixiedust
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import DoubleType, IntegerType, StringType
from pyspark.sql.functions import *

Make sure that Spark works properly.

In [None]:
spark

First we declare the schema of the files, as follows:

In [None]:
forceplate_schema = StructType([
    StructField("Time", IntegerType()),
    StructField("Channel1", DoubleType()),
    StructField("Channel2", DoubleType()),
    StructField("Channel3", DoubleType()),
    StructField("Channel4", DoubleType()),
    StructField("Channel5", DoubleType()),
    StructField("Channel6", DoubleType()),
    StructField("Channel7", DoubleType()),
    StructField("Channel8", DoubleType())
])

## Loading all files

You may recall from the previous tutorial that, to load a single CSV file with
Spark, we used a statement like this one:

`    channelsDF = spark.read.csv('work/forceplate/18936.csv', header=True, schema=forceplate_schema)`


Now we need to load several files that are stored in a folder.

The only change needed is to replace the filename with the folder location; all
the CSV files in that folder will then be read into a single dataframe.

In [None]:
channelsDFall = spark.read.csv('work/forceplate/original', header=True, schema=forceplate_schema)

Check whether all files were loaded, for example by counting the rows of the dataframe.

In [None]:
channelsDFall.count()
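
To know which row count to expect, the data rows can also be counted directly in the CSV files, outside Spark. The sketch below is a plain-Python helper (the name `count_csv_rows` is hypothetical and not part of the tutorial code), assuming that every file in the folder starts with exactly one header line.

```python
import csv
from pathlib import Path


def count_csv_rows(folder):
    """Return the total number of data rows in all *.csv files of a folder.

    Assumes each file starts with exactly one header line.
    Blank lines are ignored.
    """
    total = 0
    for path in sorted(Path(folder).glob("*.csv")):
        with open(path, newline="") as handle:
            reader = csv.reader(handle)
            next(reader, None)  # skip the header line
            total += sum(1 for row in reader if row)
    return total
```

For example, `count_csv_rows('work/forceplate/original')` would normally be expected to equal `channelsDFall.count()`; a smaller Spark count suggests that some files were not read.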

So the code loads all the data. Note that this is a massively parallelizable
operation that would have worked the same with really big data files stored in a
remote location. However, this is a task for another tutorial.

You have probably also noticed that the files do not include an identifier of
the turkey they correspond to. This makes our dataframe unsuitable for the kind
of exercise we want to perform, as we aim to extract features per turkey. Do not
worry, though: this is a common problem in ETL tasks, so Spark has some
functionality to solve it in a smart way. For each Row in the dataframe, Spark
can recall from which file it was loaded, and we can extract the turkey ID from
its path!

We can do this by using the `withColumn` function together with the
`input_file_name` function.

The `input_file_name` function returns the full path of the file from which the
row has been read, or an empty string if it is not available. In our case, this
path contains the turkey identifier!

Check the code below, where we append a new column named *input* that contains
the path of the file from which each row has been read, and inspect the results
of the transformation.

In [None]:
channelsDFall = channelsDFall.withColumn("input", input_file_name())

channelsDFall.select("input").show(4)

### Splitting
The next challenge is to extract the turkey identifier and save it as a separate attribute.

We need to split the input string (file:///home/jovy...) by `/` and keep the part with the turkey identifier.  
This can be done with the *split* function. From the resulting array we pick the ninth element (index 8, as array indices start at 0), which contains the
turkey ID.

In [None]:
channelsDFall = channelsDFall.withColumn('ID', split(channelsDFall['input'], '/')[8])
 # Check output #

Surely you have noticed that the file extension (.csv) is still there.

To remove it, we keep only the first five characters of the split element by extracting a substring with the `substr` function.

Try the code below and check the output in your ID column.

In [None]:
channelsDFall = channelsDFall.withColumn('ID', split(channelsDFall['input'], '/')[8].substr(1,5))
channelsDFall

 # Check output #
 # e.g. by channelsDFall.show(3)
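
The two string operations can be tried in plain Python first. The sketch below uses a hypothetical path (the real one depends on where the notebook runs) in which the file name sits at index 8. Note that Spark's `substr(1, 5)` counts positions from 1, whereas Python slices count from 0, so the equivalent slice is `[0:5]`.

```python
# Hypothetical path for illustration only; the real path on your machine differs.
path = "file:///a/b/c/d/e/18936.csv"

parts = path.split("/")
# 'file:' plus the two empty strings produced by '//' occupy indices 0-2,
# so with five folders the file name lands at index 8.
file_name = parts[8]

# Same characters as Spark's substr(1, 5): start at the first character, take five.
turkey_id = file_name[0:5]

# Taking the last element does not depend on the folder depth.
assert parts[-1] == file_name
print(file_name, turkey_id)
```

The fixed index 8 only works as long as the files stay at the same folder depth. Picking the last element avoids that dependency; in Spark 2.4 and later this could presumably be written with `element_at(split(...), -1)`.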

## Extracting features

Great! You have extracted all necessary information as attributes of your dataframe. 

The next step is to transform them into meaningful attributes for your application!

Let's assume that we want to calculate two attributes for each turkey: its weight and the time (duration) it was on the forceplate.

### Feature 1: Turkey weight

To estimate the weight we will need only the vertical force ($F_z$), which can be calculated as follows (see also previous tutorial).

In [None]:
 # This is a constant of the sensor: the amplifier scale for z ("versterker-schaal z")
iz = 5000

df = channelsDFall.withColumn(
    "Fz",
    ((channelsDFall.Channel5 + channelsDFall.Channel6
      + channelsDFall.Channel7 + channelsDFall.Channel8) * iz) / 38.65
)


And then inspect what we have achieved so far.

In [None]:
df.show(5)
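
Written as a formula, the `Fz` column computed above is given below, where $C_5, \dots, C_8$ denote the values of `Channel5` to `Channel8` (symbols introduced here only for illustration) and $i_z = 5000$ is the sensor constant from the code:

$$F_z = \frac{(C_5 + C_6 + C_7 + C_8)\, i_z}{38.65}$$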

The first feature is the turkey weight, which for simplicity we estimate as the maximum value of `Fz` divided by the standard acceleration of gravity ($g = 9.80665\ \mathrm{m/s^2}$).

We need to calculate the max per turkey id, thus we apply the following aggregation grouped by `ID`.

In [None]:
weights = df.groupBy(df.ID).agg(max(df.Fz).alias('maxFz'))\
                           .withColumn("weight", round(col('maxFz')/ 9.80665,2))

weights.show()
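
The aggregation logic can be checked by hand on a few numbers. The plain-Python sketch below uses made-up `Fz` readings (not taken from the data files) and a hypothetical helper `weight_per_id`. It mirrors the Spark cell: take the maximum `Fz` per ID, divide by 9.80665 and round to two decimals.

```python
G = 9.80665  # standard acceleration of gravity in m/s^2


def weight_per_id(rows):
    """Map each ID to max(Fz) / G, rounded to two decimals.

    `rows` is an iterable of (ID, Fz) pairs.
    """
    max_fz = {}
    for turkey_id, fz in rows:
        if turkey_id not in max_fz or fz > max_fz[turkey_id]:
            max_fz[turkey_id] = fz
    return {tid: round(value / G, 2) for tid, value in max_fz.items()}


# Made-up readings for illustration only.
rows = [("A", 50.0), ("A", 98.0665), ("B", 196.133), ("B", 120.0)]
weights = weight_per_id(rows)
print(weights)
```

Because only the single largest reading is used, one noisy spike in `Fz` directly changes the estimated weight, which is worth keeping in mind when inspecting the results.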

### Feature 2: Time turkey walked on the forceplate


Next we are going to extract the second feature: how long (in seconds) the turkey stood on the forceplate.

Let's assume that all values below 100 are noise, and that the turkey is on the plate only while $F_z > 100$.  

First, let's filter the dataframe to keep only the rows with values above 100 in the $F_z$ column.

In [None]:
df2 = df.filter(df.Fz>100)

To estimate the time each turkey spent on the force plate, we first select the column *Time* and calculate its `min` and `max` values, grouped by the turkey ID.

In Spark, we can compute them as:

In [None]:
df2.select(df2.ID, df2.Time).groupBy('ID').min().show()

In [None]:
df2.select(df2.ID, df2.Time).groupBy('ID').max().show()

The next step is to put these two aggregates side by side, subtract them, and multiply the difference by 0.01 to convert it to seconds, as the sampling frequency of the sensor is 100 Hz.

You can do this with the following code.

In [None]:
 # min and max here are the Spark functions from pyspark.sql.functions, not the Python built-ins
df2.groupBy('ID').agg(min('Time').alias("from"), max('Time').alias("to"))\
                 .withColumn("timeOnPlate", 0.01*(col('to')-col('from')))\
                 .show()
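
The same computation in plain Python, on made-up samples, may help to see what each step does. `time_on_plate` is a hypothetical helper, and the sketch assumes, like the text above, that `Time` is a sample counter and that the sensor records 100 samples per second.

```python
SAMPLE_PERIOD = 0.01  # seconds per sample at 100 Hz
THRESHOLD = 100       # Fz values at or below this are treated as noise


def time_on_plate(rows):
    """Return {ID: seconds between first and last sample with Fz > THRESHOLD}.

    `rows` is an iterable of (ID, Time, Fz) tuples.
    """
    first, last = {}, {}
    for turkey_id, time, fz in rows:
        if fz <= THRESHOLD:
            continue  # same effect as df.filter(df.Fz > 100)
        first[turkey_id] = min(time, first.get(turkey_id, time))
        last[turkey_id] = max(time, last.get(turkey_id, time))
    return {tid: SAMPLE_PERIOD * (last[tid] - first[tid]) for tid in first}


# Made-up samples for illustration only.
rows = [("A", 0, 5.0), ("A", 10, 150.0), ("A", 60, 180.0), ("A", 70, 20.0)]
durations = time_on_plate(rows)
print(durations)
```

Note that this min/max approach measures from the first to the last sample above the threshold, so if a turkey steps off the plate and back on, the gap in between is included in the duration.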


## Finishing up

And now let's combine all we have learned to extract both features in one go and store them in a file, ready for follow-up work.

In [None]:
df3 = df2.groupBy('ID').agg(min('Time').alias("from"), max('Time').alias("to"), max('Fz').alias("maxFz"))\
                  .withColumn("timeOnPlate", 0.01*(col('to')-col('from')))\
                  .withColumn("weight", round(col('maxFz')/ 9.80665, 2))

df3.show()


 # save the extracted features to file
df3.select('ID','timeOnPlate', 'weight').write.csv("fp_features.csv", header=True, mode='overwrite')

Check your local drive to verify that the output has been stored. Spark writes it as a folder named `fp_features.csv` that contains one or more part files.
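
To inspect the stored features outside Spark, the part files in the output folder can be read back with the standard library. The sketch below assumes Spark's default `part-*.csv` file naming and a header line in each part file; `read_spark_csv_dir` is a hypothetical helper name.

```python
import csv
from pathlib import Path


def read_spark_csv_dir(folder):
    """Read all part-*.csv files in a Spark output folder into a list of dicts."""
    records = []
    for part in sorted(Path(folder).glob("part-*.csv")):
        with open(part, newline="") as handle:
            records.extend(csv.DictReader(handle))
    return records
```

Calling `read_spark_csv_dir("fp_features.csv")` should then return one dictionary per turkey, with all values as strings.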

When you are finished with this notebook, please stop the Spark engine before
closing the notebook.

In [None]:
spark.stop()