# 3. Data Preprocessing

* 싸이그래머: Scala ML - Scala for Machine Learning [1]
* 김무성

# Contents

* Time series
* Moving averages
* Fourier analysis
* The Kalman filter
* Alternative preprocessing techniques
* Summary

#### Data preprocessing consists of 

* cleaning, 
* filtering, 
* transforming, and 
* normalizing raw observations 

using statistics in order to correlate features or groups of features, identify trends and models, and filter out noise. 

#### The purpose of cleansing raw data is twofold:

* Extract some basic knowledge from raw datasets
* Evaluate the quality of data and generate clean datasets for unsupervised or supervised learning

# Time series

* Each library has its own container type to manipulate datasets. 
* The challenge is to define all possible conversions between types from different libraries needed to implement a large variety of machine learning models. 
* Such a strategy may result in a combinatorial explosion of <font color="red">implicit conversions</font>. 
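
One way to keep the number of conversions manageable is to require a single view `T => Double` per element type and let one generic method do the rest. The sketch below illustrates that idea under stated assumptions: `Series`, `Quote` and `Reading` are hypothetical names invented for this example and are not the book's classes.

```scala
object ImplicitViewSketch {
  type DblVector = Array[Double]

  // Hypothetical element types, used only for this illustration
  final case class Quote(close: Double)
  final case class Reading(celsius: Double)

  // Hypothetical minimal series wrapper (a stand-in for the book's XTSeries)
  final class Series[T](val values: Vector[T]) {
    // One method serves every element type T for which a view T => Double is in scope
    def toDblVector(implicit f: T => Double): DblVector = values.map(f).toArray
  }

  // One view per element type, instead of one conversion per pair of containers
  implicit val quote2Double: Quote => Double = _.close
  implicit val reading2Double: Reading => Double = _.celsius

  val closes: DblVector = new Series(Vector(Quote(10.0), Quote(11.5))).toDblVector
  val temps: DblVector = new Series(Vector(Reading(21.0))).toDblVector
}
```

Adding a new element type then costs one implicit value rather than a new conversion for every container.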

<img src="figures/tip3.1.png" width=600 /> 

* The generic data transformation, DT, can be used to transform any XTSeries time series:

In [None]:
abstract class DT[T,U] extends PipeOperator[XTSeries[T], XTSeries[U]] { 
    override def |> : PartialFunction[XTSeries[T], XTSeries[U]]
}

<img src="figures/fig3.1.png" width=600 /> 

#### Let's create the XTSeries class

In [None]:
class XTSeries[@specialized(Double) T](label: String, arr: Array[T]) { // 1 
    def apply(n: Int): T = arr.apply(n)
    
    @implicitNotFound("Undefined conversion to DblVector") // 2
    def toDblVector(implicit f: T=>Double):DblVector =arr.map(f(_))

    @implicitNotFound("Undefined conversion to DblMatrix") // 2
    def toDblMatrix(implicit fv: T => DblVector): DblMatrix = arr.map( fv( _ ) )
    
    def + (n: Int, t: T)(implicit f: (T,T) => T): T = f(arr(n), t)

    def head: T = arr.head //3

    def drop(n: Int):XTSeries[T] = XTSeries(label,arr.drop(n))

    def map[U: ClassTag](f: T => U): XTSeries[U] = XTSeries[U](label, arr.map( x =>f(x)))

    def foreach( f: T => Unit) = arr.foreach(f) //3

    def sortWith(lt: (T,T)=>Boolean):XTSeries[T] = XTSeries[T](label, arr.sortWith(lt))

    def max(implicit cmp: Ordering[T]): T = arr.max //4
   
    def min(implicit cmp: Ordering[T]): T = arr.min
   ...
}

* line 1
    - The annotation @specialized (line 1) instructs the compiler to generate two versions of the class:
        - A generic XTSeries[T] class that exploits all the implicit conversions required to perform operations on time series of a generic type
        - An optimized XTSeries[Double] class that bypasses the conversion and offers client code a faster implementation
* line 2 
    - The conversion to DblVector (resp. DblMatrix) relies on the implicit conversion of elements to type Double (resp. DblVector) (line 2). 
* line 3
    - This code snippet exposes a subset of the Scala higher-order collections methods (line 3) applied to the time series. 
* line 4 
    - The computation of the minimum and maximum values in the time series requires that the cmp ordering/compare method be defined for the elements of the type T (line 4).

* The @implicitNotFound annotation instructs the compiler to emit an error if no implicit conversion is detected. 
* The conversion methods are used to implement the implicit conversion introduced in the previous section. 
* These methods are defined in the singleton org.scalaml.core.Types.CommonMath. 
* The following code shows the implementation of the conversion methods:

In [None]:
object Types {
    object CommonMath {
        implicit def series2DblVector[T](xt: XTSeries[T])(implicit f: T=>Double):DblVector = xt.toDblVector(f)
        implicit def series2DblMatrix[T](xt: XTSeries[T])(implicit f: T=>DblVector): DblMatrix = xt.toDblMatrix(f)
        ...
    }
}

# Moving averages

* The simple moving average
* The weighted moving average
* The exponential moving average

<img src="http://s.hankyung.com/pdsdata/bbs3/_column_281_1/thumb/6e5b68ecb3e1515359d6ac6d0cada248" width=300 />

<img src="https://i-msdn.sec.s-msft.com/dynimg/IC394321.gif" />

<img src="figures/tip3.2.png" width=600 />

## The simple moving average

<img src="figures/tip3.3.png" width=600 />

In [None]:
abstract class MovingAverage[T <% Double] extends
PipeOperator[XTSeries[T], XTSeries[Double]]

In [None]:
class SimpleMovingAverage[@specialized(Double) T <% Double](val period: Int)(implicit num: Numeric[T]) 
extends MovingAverage[T] {
    
    def |> : PartialFunction[XTSeries[T], XTSeries[Double]] = {
        case xt: XTSeries[T] if(xt != null && xt.size > 0) => {
            val data = xt.toArray
            // Pairs (x(i), x(i+period)): the value leaving and the value entering the window
            val slider = data.take(data.size - period)
                             .zip(data.drop(period)) //1
            val a0 = data.take(period).sum/period //2
            var a: Double = a0
            val z = Array[Array[Double]](
                Array.fill(period - 1)(0.0), // no average until the first window is full
                Array(a0),
                slider.map(x => {
                    a += (x._2 - x._1)/period
                    a })
            ).flatten //3
            XTSeries[Double](z)
        }
    }
}

* The class has a type parameter T and is specialized for the Double type for faster processing. The implicitly defined num: Numeric[T] is required by the arithmetic operators sum and / (line 2).
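
The sliding-window update can also be tried outside the `PipeOperator` framework. The following is a minimal standalone sketch on a plain `Array[Double]`; the object and method names are assumptions made for this example, and the convention of filling the first `period - 1` entries with `0.0` is a choice of this sketch.

```scala
object SimpleMovingAverageSketch {
  /** Simple moving average; entries before the first full window are 0.0. */
  def sma(xs: Array[Double], period: Int): Array[Double] = {
    require(period > 0 && period <= xs.length, "period must be in [1, xs.length]")
    val a0 = xs.take(period).sum / period      // average of the first full window
    var a = a0
    // Each pair holds the observation leaving and the one entering the window
    val rest = xs.zip(xs.drop(period)).map { case (leaving, entering) =>
      a += (entering - leaving) / period       // incremental update, O(1) per step
      a
    }
    Array.fill(period - 1)(0.0) ++ (a0 +: rest)
  }
}
```

For example, `SimpleMovingAverageSketch.sma(Array(1.0, 2.0, 3.0, 4.0, 5.0), 2)` yields `0.0, 1.5, 2.5, 3.5, 4.5`, the same length as the input.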

<img src="figures/fig3.2.png" width=600 />   

## The weighted moving average

<img src="figures/tip3.4.png" width=600 />

In [None]:
class WeightedMovingAverage[@specialized(Double) T <% Double](val weights: DblVector) 
extends MovingAverage[T] {
    def |> : PartialFunction[XTSeries[T], XTSeries[Double]] = {
        case xt: XTSeries[T] if(xt != null && xt.size > 1) => {
            val smoothed = Range(weights.size, xt.size).map(i => { 
                xt.toArray.slice(i- weights.size , i)
                          .zip(weights)
                          .foldLeft(0.0)((s, x) => s + x._1*x._2) }) //1 
            XTSeries[Double](Array.fill(weights.size)(0.0) ++ smoothed) //2
        } 
    }
}
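
As a rough standalone counterpart of the class above, the sketch below computes a weighted moving average on a plain `Array[Double]`. It keeps the same indexing (the value at index `i` is computed from the `weights.length` observations preceding `i`, and the first `weights.length` entries are `0.0`). The names `wma` and `normalize` are assumptions of this example.

```scala
object WeightedMovingAverageSketch {
  /** Weighted moving average; the first weights.length entries are 0.0. */
  def wma(xs: Array[Double], weights: Array[Double]): Array[Double] = {
    require(weights.nonEmpty && weights.length <= xs.length,
      "weights must be non-empty and no longer than the series")
    val w = weights.length
    val smoothed = (w until xs.length).map { i =>
      // Dot product of the window xs(i-w) .. xs(i-1) with the weights
      xs.slice(i - w, i).zip(weights).map { case (x, wt) => x * wt }.sum
    }
    Array.fill(w)(0.0) ++ smoothed
  }

  /** Rescales the weights so that they sum to 1. */
  def normalize(weights: Array[Double]): Array[Double] = {
    val total = weights.sum
    weights.map(_ / total)
  }
}
```

With normalized weights the output stays on the same scale as the input, which is why weights are usually divided by their sum before use.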

## The exponential moving average

<img src="figures/tip3.5.png" width=600 />

In [None]:
class ExpMovingAverage[@specialized(Double) T <% Double](val alpha: Double) 
extends MovingAverage[T] {
    def |> : PartialFunction[XTSeries[T], XTSeries[Double]] = {
        case xt: XTSeries[T] if(xt != null && xt.size > 1) => {
            val alpha_1 = 1-alpha
            var y: Double = xt(0)
            xt.map( x => {
                val z = x*alpha + y*alpha_1; y=z; z })
       }
    } 
}

In [None]:
// 2.0 forces floating-point division; 2/(nyquist + 1) would truncate to 0
def apply[T <% Double](nyquist: Int): ExpMovingAverage[T] = 
    new ExpMovingAverage[T](2.0/(nyquist + 1))
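
The recursion and the period-to-alpha conversion can be checked on a plain array. This is a minimal sketch, assuming the same initialization as the class above (the first smoothed value equals the first observation); `ema` and `alphaFor` are names chosen for this example.

```scala
object ExpMovingAverageSketch {
  /** Exponential moving average: y(t) = alpha*x(t) + (1 - alpha)*y(t-1), with y(0) = x(0). */
  def ema(xs: Array[Double], alpha: Double): Array[Double] = {
    require(alpha > 0.0 && alpha <= 1.0, "alpha must be in (0, 1]")
    if (xs.isEmpty) Array.empty[Double]
    else xs.tail.scanLeft(xs.head)((prev, x) => alpha * x + (1.0 - alpha) * prev)
  }

  /** Smoothing factor for a given period, using floating-point division. */
  def alphaFor(period: Int): Double = 2.0 / (period + 1)
}
```

For instance, `ExpMovingAverageSketch.ema(Array(1.0, 2.0, 3.0), 0.5)` gives `1.0, 1.5, 2.25`, and `alphaFor(3)` is `0.5`.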

In [None]:
val p_2 = p >>1
val w = Array.tabulate(p)(n => if(n == p_2) 1.0 else 1.0/(Math.abs(n - p_2) + 1)) //1
val weights = w map { _ / w.sum } //2
val src = DataSource("resources/data/chap3/BAC.csv", false) //3
val price = src |> YahooFinancials.adjClose //4
val sMvAve = SimpleMovingAverage(p)
val wMvAve = WeightedMovingAverage(weights)
val eMvAve = ExpMovingAverage(p)

val results = price :: sMvAve.|>(price) :: wMvAve.|>(price) :: eMvAve.|>(price) :: List[XTSeries[Double]]() //5
val outFile = "output/chap3/mvaverage" + p.toString + ".csv" 
DataSink[Double]( outFile) |> results //6

<img src="figures/fig3.3.png" width=600 /> 

<img src="figures/fig3.4.png" width=600 />

<img src="figures/fig3.5.png" width=600 />

# Fourier analysis

* Discrete Fourier transform (DFT)
* DFT-based filtering
* Detection of market cycles

<img src="figures/tip3.6.png" width=600 /> 

## Discrete Fourier transform (DFT)

<img src="figures/tip3.7.png" width=600 />

<img src="figures/tip3.8.png" width=600 />

In [None]:
trait DTransform[T] 
extends PipeOperator[XTSeries[T], XTSeries[Double]] {
    def padSize(xtSz: Int, even: Boolean=true): Int = {
        val sz = if( even ) xtSz else xtSz-1
        if( (sz & (sz-1)) == 0) 0
        else {
           var bitPos = 0
           do {
           bitPos += 1
           } while( (sz >> bitPos) > 0)
           (if(even) (1<<bitPos) else (1<<bitPos)+1) - xtSz
        } 
    }

    def pad(xt: XTSeries[T], even: Boolean=true) 
        (implicit f: T => Double): DblVector = {
        val newSize = padSize(xt.size, even)
        val arr: DblVector = xt
        if( newSize > 0) arr ++ Array.fill(newSize)(0.0) else arr
    } 
}
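
The padding step can be illustrated in isolation. The sketch below covers only the power-of-two case (the `even = true` branch of `padSize`); the function names are assumptions of this example, and the bit-shift loop is a simplified variant rather than the code above.

```scala
object PaddingSketch {
  /** Smallest power of two greater than or equal to n. */
  def nextPowerOfTwo(n: Int): Int = {
    require(n >= 1 && n <= (1 << 30), "n must be in [1, 2^30]")
    var p = 1
    while (p < n) p <<= 1
    p
  }

  /** Appends zeros until the length is a power of two. */
  def padToPowerOfTwo(xs: Array[Double]): Array[Double] = {
    require(xs.nonEmpty, "cannot pad an empty series")
    xs ++ Array.fill(nextPowerOfTwo(xs.length) - xs.length)(0.0)
  }
}
```

A series of 5 observations is padded to 8 values, while a series of 8 observations is returned unchanged.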

<img src="figures/tip3.9.png" width=600 /> 

In [None]:
class DFT[@specialized(Double) T <% Double] extends DTransform[T] {
    def |> : PartialFunction[XTSeries[T], XTSeries[Double]] = {
        case xt: XTSeries[T] if(xt != null && xt.length > 0) =>
            XTSeries[Double](fwrd(xt)._2)
    }

    def fwrd(xt: XTSeries[T]): (RealTransformer, DblVector) = {
        val rdt = if(Math.abs(xt.head) < DFT_EPS)
            new FastSineTransformer(DstNormalization.STANDARD_DST_I)
        else
            new FastCosineTransformer(DctNormalization.STANDARD_DCT_I)
        (rdt, rdt.transform(pad(xt, xt.head == 0.0), TransformType.FORWARD))
    }
}

<img src="figures/tip3.10.png" width=600 />

In [None]:
val _T= 1.0/1024
val h = (x:Double) =>2.0*Math.cos(2.0*Math.PI*_T*x) +
Math.cos(5.0*Math.PI*_T*x) + Math.cos(15.0*Math.PI*_T*x)/3
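
To see what a frequency spectrum of such a sum of cosines looks like without any external library, a naive discrete Fourier transform is enough. The sketch below is an O(n²) textbook DFT swapped in for illustration; it is not the fast sine/cosine transform used by the `DFT` class, and all names in it are assumptions of this example.

```scala
object NaiveDftSketch {
  /** Magnitude of the k-th DFT coefficient of a real-valued signal. */
  def magnitude(xs: Array[Double], k: Int): Double = {
    val n = xs.length
    var re = 0.0
    var im = 0.0
    for (t <- 0 until n) {
      val angle = 2.0 * math.Pi * k * t / n
      re += xs(t) * math.cos(angle)
      im -= xs(t) * math.sin(angle)
    }
    math.hypot(re, im)
  }

  /** Magnitudes for the non-redundant bins 0 .. n/2 of a real-valued signal. */
  def spectrum(xs: Array[Double]): Array[Double] =
    Array.tabulate(xs.length / 2 + 1)(k => magnitude(xs, k))

  /** Index of the bin with the largest magnitude. */
  def dominantBin(xs: Array[Double]): Int = {
    val s = spectrum(xs)
    s.indexOf(s.max)
  }
}
```

Sampling `2*cos(2*Pi*4*t/64) + cos(2*Pi*10*t/64)` at `t = 0 .. 63` produces two peaks, at bins 4 and 10, and `dominantBin` returns 4 because that component has the larger amplitude.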

<img src="figures/fig3.6.png" width=600 />

In [None]:
val rawOut = "output/chap3/raw.csv"
val smoothedOut = "output/chap3/smoothed.csv" 
val values = Array.tabulate(1025)(x =>h(x/1025)) 
DataSink[Double](rawOut) |> values //1

val smoothed = DFT[Double] |> XTSeries[Double](values) //2
DataSink[Double](smoothedOut) |> smoothed

<img src="figures/tip3.11.png" width=600 /> 

<img src="figures/tip3.12.png" width=600 /> 

<img src="figures/tip3.13.png" width=600 /> 

## DFT-based filtering

<img src="figures/tip3.14.png" width=600 />

<img src="figures/fig3.7.png" width=600 />

In [None]:
def sinc(f: Double, fC: Double): Double = if(Math.abs(f) < fC) 1.0 else 0.0
def sinc2(f: Double, fC: Double): Double = if(f*f < fC) 1.0 else 0.0

In [None]:
class DFTFir[T <% Double](val g: (Double, Double) => Double, val fC: Double) 
extends DFT[T]

In [None]:
def |> : PartialFunction[XTSeries[T], XTSeries[Double]] = {
    case xt: XTSeries[T] if(xt != null && xt.size > 2) => {
        val spectrum = fwrd(xt) //1
        val cutOff = fC*spectrum._2.size
        val filtered = spectrum._2.zipWithIndex.map(x => x._1*g(x._2, cutOff)) //2
        XTSeries[Double](spectrum._1.transform(filtered, TransformType.INVERSE)) //3
    }
}

In [None]:
val price = src |> YahooFinancials.adjClose
val filter = new DFTFir[Double](sinc, 4.0)
val filteredPrice = filter |> price

<img src="figures/fig3.8.png" width=600 /> 

## Detection of market cycles

<img src="figures/fig3.9.png" width=600 />

<img src="figures/fig3.10.png" width=600 /> 

In [None]:
def sinc(f: Double, w: (Double, Double)): Double = 
    if(Math.abs(f) > w._1 && Math.abs(f) < w._2) 1.0 else 0.0
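
The effect of the low-pass and band-pass windows is easiest to see on a small array of spectrum coefficients. The sketch below applies each window to the coefficient index, as the filter code does with `zipWithIndex`. The coefficient values are made up for illustration, and `applyWindow`, `lowPass` and `bandPass` are names assumed for this example.

```scala
object SpectrumWindowSketch {
  /** Low-pass window: keeps frequencies strictly below the cutoff fC. */
  def lowPass(f: Double, fC: Double): Double =
    if (math.abs(f) < fC) 1.0 else 0.0

  /** Band-pass window: keeps frequencies strictly inside the interval w. */
  def bandPass(f: Double, w: (Double, Double)): Double =
    if (math.abs(f) > w._1 && math.abs(f) < w._2) 1.0 else 0.0

  /** Multiplies each coefficient by the window evaluated at its index. */
  def applyWindow(spectrum: Array[Double])(g: Double => Double): Array[Double] =
    spectrum.zipWithIndex.map { case (c, i) => c * g(i.toDouble) }
}
```

On the coefficients `5, 4, 3, 2, 1`, a low-pass window with cutoff 2 keeps `5, 4` and zeroes the rest, while a band-pass window over `(0.5, 3.5)` keeps only `4, 3, 2`. Applying the inverse transform to the windowed coefficients would then give the filtered series.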

<img src="figures/fig3.11.png" width=600 />

<img src="figures/fig3.12.png" width=600 />  

<img src="figures/fig3.13.png" width=600 />  

# The Kalman filter

* The state space estimation
    - The transition equation
    - The measurement equation
* The recursive algorithm
    - Prediction
    - Correction
    - Kalman smoothing
    - Experimentation

<img src="figures/tip3.15.png" width=600 />    

<img src="figures/tip3.16.png" width=600 />

## The state space estimation

### The transition equation

<img src="figures/eq3.1.png" width=600 />  

<img src="figures/tip3.17.png" width=600 />

### The measurement equation

<img src="figures/eq3.2.png" width=600 />  

## The recursive algorithm

<img src="figures/fig3.14.png" width=600 />

<img src="figures/tip3.18.png" width=600 />

In [None]:
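import scala.language.implicitConversions
import org.apache.commons.math3.linear.{Array2DRowRealMatrix, ArrayRealVector, RealMatrix, RealVector}

type DblMatrix = Array[Array[Double]] 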

type DblVector = Array[Double]

implicit def double2RealMatrix(x: DblMatrix): RealMatrix = 
    new Array2DRowRealMatrix(x)

implicit def double2RealRow(x: DblVector): RealMatrix = 
    new Array2DRowRealMatrix(x)

implicit def double2RealVector(x: DblVector): RealVector = 
    new ArrayRealVector(x)

In [None]:
class QRNoise(val qr: XY, white: Double => Double) { // qr is a val so that DKalman can read it
    def q = white(qr._1)
    def r = white(qr._2)
    def noisyQ = Array[Double](q,q)
    def noisyR = Array[Double](r,r)
}

In [None]:
class DKalman(A:DblMatrix, B:DblMatrix, H:DblMatrix, P:DblMatrix) 
(implicit val qrNoise: QRNoise) extends PipeOperator[XY,XY] {

    val Q = new DblMatrix(A.size).map(_ => Array.fill(A.size)(qrNoise.qr._1))
    
    var x: RealVector = _

    var filter: KalmanFilter =_ 
}

### Prediction

<img src="figures/eq3.3.png" width=600 />  

<img src="figures/tip3.19.png" width=600 /> 

<img src="figures/tip3.20.png" width=600 /> 

In [None]:
x = A.operate(x).add(qrNoise.noisyQ)

### Correction

<img src="figures/eq3.4.png" width=600 />

<img src="figures/tip3.21.png" width=600 /> 

### Kalman smoothing

In [None]:
def |> : PartialFunction[XTSeries[XY], XTSeries[XY]] = {
    case xt: XTSeries[XY] if(xt.size> 0) => xt.map( y => {
        initialize(Array[Double](y._1, y._2)) //1
        val nState = newState  //2
        (nState(0), nState(1)) }) //3
    ...

<img src="figures/tip3.22.png" width=600 />

<img src="figures/eq3.5.png" width=600 />

In [None]:
val PROCESS_NOISE_Q = 0.03 
val PROCESS_NOISE_R = 0.1 
val MEASUREMENT_NOISE = 0.4

def newState: DblVector = {
    Range(0, maxIters) foreach( _ => {
       filter.predict  //1
       val w = qrNoise.create(PROCESS_NOISE_Q, PROCESS_NOISE_R)
       x = A.operate(x).add(qrNoise.noisyQ) //2
       val v = qrNoise.create(MEASUREMENT_NOISE)
       val z = H.operate(x).add(qrNoise.noisyR) //3
       filter.correct(z) // 4
    })
    filter.getStateEstimation
}
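
The predict/correct cycle is easier to follow in one dimension, where every matrix reduces to a scalar. The following is a minimal sketch of a scalar Kalman filter for the model x(t+1) = a*x(t) + w, z(t) = h*x(t) + v, with process noise variance `q` and measurement noise variance `r`. It does not use the Apache Commons Math `KalmanFilter`, and all names in it are assumptions of this example.

```scala
object ScalarKalmanSketch {
  /** State estimate x and the variance p of its estimation error. */
  final case class State(x: Double, p: Double)

  /** One predict/correct cycle given a new measurement z. */
  def step(s: State, z: Double, a: Double, h: Double, q: Double, r: Double): State = {
    // Prediction: project the state and its error variance forward
    val xPred = a * s.x
    val pPred = a * s.p * a + q
    // Correction: blend prediction and measurement using the Kalman gain
    val k = pPred * h / (h * pPred * h + r)
    State(xPred + k * (z - h * xPred), (1.0 - k * h) * pPred)
  }

  /** Runs the filter over a sequence of measurements, one estimate per measurement. */
  def filter(zs: Seq[Double], init: State, a: Double, h: Double, q: Double, r: Double): Seq[State] =
    zs.scanLeft(init)((s, z) => step(s, z, a, h, q, r)).tail
}
```

Starting from `State(0.0, 1.0)` with `a = h = 1`, `q = 0` and `r = 1`, a first measurement of 10.0 gives a gain of 0.5 and the estimate `State(5.0, 0.5)`. Repeating the same measurement moves the estimate toward 10.0 while the error variance shrinks.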

<img src="figures/tip3.23.png" width=600 /> 

### Experimentation

In [None]:
implicit val qrNoise = 
    QRNoise((0.2, 0.4), 
            (m: Double) => m * (new Random(System.currentTimeMillis)).nextGaussian
           ) //1

val A: DblMatrix = ((0.9, 0.0), (0.0, 0.1))
val B: DblMatrix = (0.0, 0.0)
val H: DblMatrix  = (1.0, 1.0)
val P0: DblMatrix = ((0.4, 0.5), (0.4, 0.5))
val x0: DblVector = (175.0, 175.0)

val dKalman = new DKalman(A, B, H, P0) //2
val output = "output/chap3/kalman.csv"
val zt_1 = zSeries.drop(1)
val zt = zSeries.take(zSeries.size-1)
val filtered = dKalman |> XTSeries[(Double, Double)](zt_1.zip(zt)) //3 
DataSink[Double](output) |> filtered.map(_._1) //4

<img src="figures/tip3.24.png" width=600 />

<img src="figures/fig3.15.png" width=600 />

<img src="figures/fig3.16.png" width=600 /> 

<img src="figures/fig3.17.png" width=600 />    

# Alternative preprocessing techniques

# Summary

# References

* [1] Scala for Machine Learning - https://www.packtpub.com/big-data-and-business-intelligence/scala-machine-learning
* [2] IntelliJ download - https://www.jetbrains.com/idea/download/
* [3] Setting up an IntelliJ IDEA + Scala development environment on Windows - http://webnautes.tistory.com/456
* [4] Author's source code - https://github.com/prnicolas/ScalaMl.git