# Efficient determination of zero-crossings in noisy real-life time series
## Advanced Data Science Capstone Project
### Extract, Transform, Load.
In this notebook, the data is taken from an internal or external dataset and transformed into the required format. The functions presented here are called during the simulation every time new data has to be passed to the models.

In [13]:
#Here, the path to the file [Zero_crossings_in_time_series]_import_libraries_python.ipynb should be indicated.


Collecting pyspark
  Downloading https://files.pythonhosted.org/packages/45/b0/9d6860891ab14a39d4bddf80ba26ce51c2f9dc4805e5c6978ac0472c120a/pyspark-3.1.1.tar.gz (212.3MB)
Collecting py4j==0.10.9
  Downloading https://files.pythonhosted.org/packages/9e/b6/6a4fb90cd235dc8e265a6a2067f2a2c99f0d91787f06aca4bcf7c23f3f80/py4j-0.10.9-py2.py3-none-any.whl (198kB)
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.1.1-py2.py3-none-any.whl size=212767604 sha256=d0b5491c5545384dffd96fb4dd7a51be00fbd517aba77fe551f0baaa1e8b1225
  Stored in directory: /root/.cache/pip/wheels/0b/90/c0/01de724414ef122bd05f056541fb6a0ecf47c7ca655f8b3c0f
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9 pyspark-3.1.1


First, $N$ time steps $t_1, t_2, \dots, t_N$ are generated starting from the initial time $t_0$. The simplest way is to generate them with a static or dynamic step size $h$ (also written $\Delta t$): $t_k = t_{k-1} + h$. In real-life applications, however, we often cannot choose when to estimate the system's state; in this case, the observation times $t_k$ are determined outside the simulated system. For example, they depend on the specific sensors and/or on the resources required to acquire and load new data: sending data from a starship consumes resources, so it cannot be done every second.
For simplicity, no external dataset is connected at the moment, but one can easily be plugged into the following function.

In [6]:
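import numpy as np

def provide_new_time_steps(t0,N=1,h=None):
  # Return the N time steps t0+h, t0+2h, ..., t0+N*h (static step size h)
  if h is None:
    #External dataset should be connected here
    #Data is read from the external dataset
    print('Database is not found')
    return None
  else:
    # The stop value is reduced by h/100 so that floating-point rounding
    # cannot make np.arange return an (N+1)-th point
    return np.arange(t0+h,t0+h+N*h-h/100,h)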
#############################################
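The recurrence $t_k = t_{k-1} + h$ can be sketched for both a static and a dynamic step size. The helper names `static_time_steps` and `dynamic_time_steps` are hypothetical and not part of the project, and the step sizes in the usage lines are made-up illustrative values. Computing the static grid as $t_0 + kh$ sidesteps the floating-point endpoint issue of `np.arange`, which is presumably why `provide_new_time_steps` subtracts `h/100` from the stop value.

```python
import numpy as np

def static_time_steps(t0, N, h):
    # Static step: t_k = t_{k-1} + h, i.e. t_k = t0 + k*h for k = 1..N
    return t0 + h * np.arange(1, N + 1)

def dynamic_time_steps(t0, steps):
    # Dynamic step: t_k = t_{k-1} + h_k, where steps = [h_1, ..., h_N]
    # (e.g. irregular sensor readings); the cumulative sum applies the recurrence
    return t0 + np.cumsum(steps)

t_static = static_time_steps(0.0, N=5, h=0.1)
t_dynamic = dynamic_time_steps(0.0, [0.1, 0.25, 0.05, 0.4, 0.2])
print(t_static)   # five equally spaced observation times
print(t_dynamic)  # five irregularly spaced observation times
```

With a constant step both variants describe the same grid; the dynamic one only needs the sequence of step sizes, which in a real application would be dictated by the data source.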


Second, new data is received from an external dataset or generated internally. Here, `time_steps` is the array of observation times $t_1, t_2, \dots, t_N$ at which we want to obtain the values of the objective function, either from an external dataset or by generating them internally with the function $f(\cdot)$. Random Gaussian noise with standard deviation $\sigma$ is added only when the values are generated internally; external data may already contain noise (e.g., sensor noise). The `model_context` argument specifies the format in which the data is kept: if it equals 0 or 2, standard NumPy arrays are used; if `model_context=1`, a Spark DataFrame is used.

Finally, only numerical data is returned: if, e.g., a value was not read correctly and was returned as NaN ("Not a Number"), it is not stored. In other words, this step also checks that the data has the correct format.
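Written as a formula, the internal generation step restates what the code below does, with $\varepsilon_k$ denoting the standard Gaussian draw (`random.gauss(0,1)`):

$$
y_k = f(t_k) + \sigma\,\varepsilon_k, \qquad \varepsilon_k \sim \mathcal{N}(0, 1), \qquad k = 1, \dots, N,
$$

and a pair $(t_k, y_k)$ is stored only if $y_k$ is not NaN.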

In [7]:
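import random
import numpy as np

def provide_new_data(time_steps,f=None,sigma=0,model_context=0):
  # `spark`, `sc` and `Vectors` (used for model_context=1) are assumed to be
  # defined in the imports notebook
  t = []
  y = []
  for i in range(time_steps.shape[0]):
    if f is None:
      #External dataset should be connected here
      #Data is read from the external dataset
      print('Database is not found')
    try:
      # Evaluate the objective function and add Gaussian noise with std sigma
      y_temp = f(time_steps[i])
      y_new = y_temp+sigma*random.gauss(0,1)
    except Exception:
      print("Error: database is not connected.")
      raise
    # Store only numerical values: NaN readings are skipped
    if not np.isnan(y_new):
      if model_context==1:
        # t is wrapped in a dense vector, presumably because Spark ML models
        # expect vector-typed feature columns
        t.append(Vectors.dense(time_steps[i]))
      else:
        t.append(time_steps[i])
      y.append(float(y_new))
  if model_context==1:
    # Spark DataFrame with columns "t" and "x"; no separate y array is returned
    return spark.createDataFrame(sc.parallelize(zip(t,y)),["t","x"]),None
  else:
    return np.array(t),np.array(y)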
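For the NumPy contexts (`model_context` equal to 0 or 2), the same generate-then-filter logic can be sketched without a Python loop, assuming that $f$ accepts a whole NumPy array (as the polynomial in the example below does). The function name `provide_new_data_vectorized` and the test function `f_with_gap` are hypothetical; the sketch swaps `random.gauss` for NumPy's `default_rng` generator, and the NaN check becomes a boolean mask.

```python
import numpy as np

def provide_new_data_vectorized(time_steps, f, sigma=0.0, rng=None):
    # Hypothetical loop-free variant of provide_new_data for NumPy arrays;
    # assumes f is vectorized, i.e. accepts and returns NumPy arrays
    rng = np.random.default_rng() if rng is None else rng
    time_steps = np.asarray(time_steps, dtype=float)
    # y_k = f(t_k) + sigma * eps_k with eps_k ~ N(0, 1)
    y = f(time_steps) + sigma * rng.standard_normal(time_steps.shape[0])
    # Keep only numerical values: drop every pair whose y is NaN
    mask = ~np.isnan(y)
    return time_steps[mask], y[mask]

# Usage: a test function that returns NaN at t = 0.2 to mimic a corrupted reading
f_with_gap = lambda x: np.where(np.isclose(x, 0.2), np.nan, x**2 - 4.0*x + 4)
t_clean, y_clean = provide_new_data_vectorized(np.linspace(0.0, 1.0, 11), f_with_gap, sigma=0.0)
print(t_clean)
print(y_clean)
```

The mask keeps `t` and `y` aligned by construction, since both are indexed with the same boolean array.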

A short example applying the steps above:

In [14]:
print("Just an example of ETL steps.")
f = lambda x: x**2-4.0*x+4  # objective function, equal to (x-2)^2
t0 = 0
t = provide_new_time_steps(t0,N=10,h=0.1)  # t_1, ..., t_10
t = np.append(t0,t)  # prepend the initial time t_0
print(t)
t, y_arrays = provide_new_data(t,f=f,sigma=0.01,model_context=0)  # NumPy arrays
print(y_arrays)
y_spark,temp = provide_new_data(t,f=f,sigma=0.01,model_context=1)  # Spark DataFrame, noise drawn again
y_spark.show(20)

Just an example of ETL steps.
[0.  0.1 0.2 0.3 0.4 0.5 0.6 0.7 0.8 0.9 1. ]
[4.00708449 3.61691693 3.25130688 2.88846642 2.55638148 2.25977529
 1.96120969 1.68663897 1.42891945 1.21765965 0.99943552]
+--------------------+------------------+
|                   t|                 x|
+--------------------+------------------+
|               [0.0]| 4.008201203643661|
|               [0.1]|3.5905099317204656|
|               [0.2]|3.2399810138404868|
|[0.30000000000000...| 2.888485040952631|
|               [0.4]| 2.550800429047671|
|               [0.5]| 2.248550474394978|
|               [0.6]| 1.968275687250734|
|[0.7000000000000001]|1.6912594765938327|
|               [0.8]|1.4457836903118393|
|               [0.9]|   1.2112731810473|
|               [1.0]|0.9932533850743942|
+--------------------+------------------+
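Since $f(x) = x^2 - 4x + 4 = (x - 2)^2$, the noiseless values on this grid are easy to compute, and the printed NumPy output can be checked against the noise model: each deviation should be of the order of $\sigma = 0.01$. The snippet copies the printed `y_arrays` values by hand; the $3\sigma$ bound is a common rule of thumb, not a criterion used in the project. The Spark values differ from the NumPy ones because `provide_new_data` draws new noise in the second call.

```python
import numpy as np

# Time grid and noisy values as printed by the example above (sigma = 0.01)
t = np.linspace(0.0, 1.0, 11)
y_noisy = np.array([4.00708449, 3.61691693, 3.25130688, 2.88846642, 2.55638148, 2.25977529,
                    1.96120969, 1.68663897, 1.42891945, 1.21765965, 0.99943552])

# Noiseless objective f(x) = x**2 - 4x + 4 = (x - 2)**2
y_true = (t - 2.0) ** 2
residuals = y_noisy - y_true

sigma = 0.01
print(np.round(residuals, 4))
print("max |residual| =", np.abs(residuals).max())
print("all within 3*sigma:", bool(np.all(np.abs(residuals) < 3 * sigma)))
```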

