# Data Preprocessing Pipeline

This pipeline generates a DataFrame for machine learning modeling of FATP return boards.

## Preprocessing steps
- Step 1. Download the necessary data with Hive:
    - FCT test logs (1 week)
    - GateKeeper data from Bobcat (2 weeks)
    - RPC data (4 weeks)
- Step 2. Transform the downloaded data into DataFrames
- Step 3. Join and filter the data (X)
- Step 4. Filter the RPC data (y)
- Step 5. Extract the FCT test values (the `items` column) and store them as a separate DataFrame
- Step 6. Handle missing values before data scaling
- Step 7. Scale the data (normalization, min-max scaling)
- Step 8. Handle missing values after data scaling

### Step 1. Download the necessary data

#### a. Log in to the server **(10.195.223.53)** over SSH and use Hive to download the test log data for a user-specified date and period, e.g. station = **FCT**, date = 2015-10-26, period = 6 days (one week)

In [None]:
!ssh mlb@10.195.223.53 "hive -e \"use cpk; select * from mlb_test_log_detail \
                        where station = 'FCT'\
                        and model = 'N71'\
                        and hour between '2015-10-26_00' and '2015-10-31_23';\"" \
                        > Data/FCT_20151026.log

#### b. Download **Bobcat** data from the same start date, with period = 2 weeks

In [None]:
!ssh mlb@10.195.223.53 "hive -e \"use cpk; select * from mlb_bobcat_raw \
                        where model = 'Agera'\
                        and day between '2015-10-26' and '2015-11-07';\"" \
                        > Data/Bobcat_20151026.log

#### c. Download RPC data from the same start date, with period = 4 weeks

In [None]:
!ssh mlb@zz2 "hive -e \"use cpk; select * from rpc_file\
                        where day between '2015-10-26' and '2015-11-21';\"" \
                        > Data/rpc_20151026.log
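
The three downloads differ only in the table, the filter, and the window length. As a minimal sketch (the helpers `date_window` and `hive_query` are hypothetical, not part of the pipeline), the query strings can be derived from one start date, assuming each period is an inclusive day count: 6 days for FCT, 13 for Bobcat and 27 for RPC. Under that assumption the windows end on 2015-10-31, 2015-11-07 and 2015-11-21, the end dates used above.

```python
from datetime import date, timedelta


def date_window(start, n_days):
    """Return (first_day, last_day) as ISO date strings for an inclusive window."""
    last = start + timedelta(days=n_days - 1)
    return start.isoformat(), last.isoformat()


def hive_query(table, start, n_days, where="", hourly=False):
    """Build the query string passed to `hive -e` (hypothetical helper).

    hourly=True is for the FCT table, which is filtered on an 'hour' column
    formatted as 'YYYY-MM-DD_HH'; the Bobcat and RPC tables use 'day'.
    """
    first, last = date_window(start, n_days)
    if hourly:
        window = "hour between '{0}_00' and '{1}_23'".format(first, last)
    else:
        window = "day between '{0}' and '{1}'".format(first, last)
    conditions = " and ".join(c for c in (where, window) if c)
    return "use cpk; select * from {0} where {1};".format(table, conditions)


start = date(2015, 10, 26)
print(hive_query("mlb_test_log_detail", start, 6,
                 where="station = 'FCT' and model = 'N71'", hourly=True))
print(hive_query("mlb_bobcat_raw", start, 13, where="model = 'Agera'"))
print(hive_query("rpc_file", start, 27))
```

The `ssh` call and the output redirection would stay as they are; only the quoted Hive statement is generated.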

### Step 2. Transform the downloaded data into DataFrames

In the following steps, we use Spark SQL DataFrames to preprocess the data.

In [None]:
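# Import the necessary libraries
import findspark
findspark.init('/Users/hadoop1/srv/spark')  # make the local Spark installation importable

from pyspark import SparkContext
from pyspark.sql import HiveContext, Row
import pandas as pd

sc = SparkContext()    # entry point for RDD operations such as sc.textFile
hc = HiveContext(sc)   # SQL context used to create DataFrames and query temp tables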



**a. FCT**

In [None]:
fctLines = sc.textFile("Data/FCT_20151026.log")

In [None]:
fctParts = fctLines.map(lambda l: l.split("\t"))

In [None]:
fctBoard = fctParts.map(lambda p: Row(serial_number=p[0], test_result=p[1], fct_test_time=p[2],\
                                     version=p[3],line=p[4],machine=p[5],\
                                     slot=p[7],items=p[8],model=p[10],station=p[11]))

In [None]:
fctBoardDf = hc.createDataFrame(fctBoard)

**b. Bobcat**

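Hive schema of `mlb_bobcat_raw`:

| # | Name | Type | Comment |
|---|------|------|---------|
| 0 | wip_no | string | |
| 1 | test_time | string | |
| 2 | test_hour | string | |
| 3 | is_test_fail | string | |
| 4 | symptom_code | string | |
| 5 | symptom_code_first | string | |
| 6 | factory | string | |
| 7 | station | string | |
| 8 | station_code | string | |
| 9 | line | string | |
| 10 | machine | string | |
| 11 | line_type | string | |
| 12 | test_times | int | 3 |
| 13 | rankno | int | |
| 14 | fail_count | int | |
| 15 | test_result | string | |
| 16 | symptom | string | |
| 17 | day | string | |
| 18 | model | string | |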

In [None]:
bobcatLines = sc.textFile('Data/Bobcat_20151026.log')

In [None]:
bobcatParts = bobcatLines.map(lambda l: l.split('\t'))

In [None]:
bobcatRows = bobcatParts.map(lambda p: Row(serial_num=p[0], test_time=p[1], symptom_code=p[4],
                                           station=p[7], test_result=p[15]))

In [None]:
bobcatDf = hc.createDataFrame(bobcatRows)
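
The `Row` constructions in this step pick columns by position from the tab-separated Hive output. A minimal plain-Python sketch of the same idea, with the positions written as a name-to-index dictionary so they can be checked against the schema table (`BOBCAT_FIELDS` and `parse_line` are hypothetical names; the indices are the ones used in `bobcatRows`, and the sample line is made up):

```python
# Column positions from the mlb_bobcat_raw schema (0-based), as used in bobcatRows.
BOBCAT_FIELDS = {
    "serial_num": 0,     # wip_no
    "test_time": 1,
    "symptom_code": 4,
    "station": 7,
    "test_result": 15,
}


def parse_line(line, fields):
    """Split one tab-separated Hive output line and keep only the named columns.

    Returns None for lines with too few columns instead of raising IndexError.
    """
    parts = line.rstrip("\n").split("\t")
    if len(parts) <= max(fields.values()):
        return None
    return {name: parts[idx] for name, idx in fields.items()}


# Made-up line with 19 columns, c0 ... c18.
sample = "\t".join("c%d" % i for i in range(19))
print(parse_line(sample, BOBCAT_FIELDS))
```

Returning `None` instead of raising lets malformed lines be dropped with an RDD `filter` before `Row` objects are built.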

**c. RPC Data**


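Hive schema of `rpc_file`:

| # | Name | Type | Comment |
|---|------|------|---------|
| 0 | namec | string | |
| 1 | station_code | string | |
| 2 | serial_num | string | |
| 3 | add_date | string | |
| 4 | emp | string | |
| 5 | station_type | string | |
| 6 | fail_location | string | |
| 7 | code | string | |
| 8 | desce | string | |
| 9 | other | string | |
| 10 | day | string | |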

In [None]:
rpcLines = sc.textFile('Data/rpc_20151026.log')

In [None]:
rpcParts = rpcLines.map(lambda l: l.split('\t'))

In [None]:
rpcRows = rpcParts.map(lambda p: Row(namec=p[0], serial_num=p[2], add_date=p[3], emp=p[4],
                                     fail_location=p[6], code=p[7], desce=p[8], day=p[10]))

In [None]:
rpcDf = hc.createDataFrame(rpcRows)

### Step 3. Join and filter data (X)
We now have three DataFrames: `fctBoardDf`, `bobcatDf`, and `rpcDf`.

In [None]:
fctBoardDf

In [None]:
bobcatDf

In [None]:
rpcDf

Register each of them as a temporary table so it can be queried with SQL:

In [None]:
fctBoardDf.registerTempTable("fctBoardDfTemp")

In [None]:
bobcatDf.registerTempTable("bobcatDfTemp")

In [None]:
rpcDf.registerTempTable("rpcDfTemp")

#### a. Join FCT records with `test_result == 'PASS'` and Bobcat GATEKEEPER records with `test_result` of 'First Pass' or 'Retest Pass' on the serial number

First, filter `fctBoardDf` to keep only the PASS results (the FAIL results are kept separately in `fctBoardFailDf`), and verify the record counts.

In [None]:
fctBoardPassDf = fctBoardDf.filter(fctBoardDf.test_result == 'PASS')

In [None]:
fctBoardFailDf = fctBoardDf.filter(fctBoardDf.test_result == 'FAIL')

Filter `bobcatDf` to keep the serial numbers that passed the 'GATEKEEPER-PREBURN' station ('First Pass' or 'Retest Pass'). The resulting `bobcatGkPassDf` then acts as a mask for `fctBoardPassDf`, ensuring that every remaining serial number also passed the last station.

In [None]:
bobcatGkPassDf = bobcatDf.filter((bobcatDf.station == 'GATEKEEPER-PREBURN'))\
                         .filter((bobcatDf.test_result == 'First Pass') | 
                                   (bobcatDf.test_result == 'Retest Pass'))

In [None]:
fctBoardPassDf.registerTempTable("fctBoardPassDfTemp")

In [None]:
bobcatGkPassDf.registerTempTable("bobcatGKPassDfTemp")

In [None]:
fctGateKeeper = fctBoardPassDf.join(bobcatGkPassDf, fctBoardPassDf.serial_number == bobcatGkPassDf.serial_num)

`fctGateKeeper` contains the records that passed both the FCT and GateKeeper test stations.
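
The filtering in step a amounts to: keep an FCT record only if it passed FCT and its serial number appears among the GATEKEEPER-PREBURN 'First Pass' or 'Retest Pass' records. A plain-Python sketch on hypothetical dict records (`gatekeeper_mask` is an illustrative name; field names follow the DataFrames above). One difference to keep in mind: the Spark inner join emits one row per matching GateKeeper record, whereas this set-based mask keeps each FCT record once, like a left semi join.

```python
GK_PASS_RESULTS = {"First Pass", "Retest Pass"}


def gatekeeper_mask(fct_records, bobcat_records):
    """Return FCT PASS records whose serial number passed GATEKEEPER-PREBURN."""
    # Serial numbers that passed the last station, on the first try or on retest.
    gk_pass_serials = {
        b["serial_num"]
        for b in bobcat_records
        if b["station"] == "GATEKEEPER-PREBURN" and b["test_result"] in GK_PASS_RESULTS
    }
    # Keep FCT PASS records whose serial number is in that set.
    return [
        f for f in fct_records
        if f["test_result"] == "PASS" and f["serial_number"] in gk_pass_serials
    ]
```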

#### b. Join `fctGateKeeper` with the Bobcat FCT First Pass records on the serial number and test time columns

First, filter `bobcatDf` to its FCT 'First Pass' records, then join them with `fctGateKeeper` to attach the pass information, so that only records that passed FCT on the first attempt are kept.

In [None]:
bobcatFctFirstPassDf = bobcatDf.filter(bobcatDf.station == 'FCT')\
                           .filter(bobcatDf.test_result == 'First Pass')

In [None]:
fctGateKeeper.registerTempTable("fctGateKeeperTemp")

In [None]:
bobcatFctFirstPassDf.registerTempTable("bobcatFctFirstPassDfTemp")

In [None]:
fctGateKeeperFirstPassDf = hc.sql("select F.serial_number, F.line, F.machine,\
                                      F.model, F.slot, F.fct_test_time, F.items, B.test_result\
                                      from fctGateKeeperTemp F\
                                      inner join bobcatFctFirstPassDfTemp B\
                                      on F.serial_number =\
                                      B.serial_num and F.fct_test_time = B.test_time")
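
The SQL above is an inner join on the composite key (serial number, test time). As far as the code shows, matching on the test time as well as the serial number keeps only the FCT run that Bobcat recorded as a first pass, so boards that passed only on retest drop out. The same logic as a plain-Python hash join (`first_pass_join` and the records are hypothetical):

```python
def first_pass_join(fct_records, bobcat_fct_first_pass):
    """Inner-join FCT records with Bobcat FCT First Pass records on (serial, time)."""
    # Index the Bobcat side by the composite key; a list handles duplicate keys.
    index = {}
    for b in bobcat_fct_first_pass:
        index.setdefault((b["serial_num"], b["test_time"]), []).append(b)

    joined = []
    for f in fct_records:
        for b in index.get((f["serial_number"], f["fct_test_time"]), []):
            # Keep the FCT columns and attach Bobcat's test_result, as the SQL does.
            row = dict(f)
            row["test_result"] = b["test_result"]
            joined.append(row)
    return joined
```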

In [None]:
fctGateKeeperFirstPassDf.registerTempTable("fctGateKeeperFirstPassDfTemp")

#### c. Add a column with the number of test items

Some records contain fewer than 799 test items; the reason is still unknown. To remove these incomplete records, we add a column holding the number of test items in each record and keep only the records that meet the item-count criterion (exactly 799 items).

Import `functions` from `pyspark.sql`; `functions.udf` can then be used to define user-defined functions (UDFs).

In [None]:
from pyspark.sql import functions

A UDF returns strings (`StringType`) by default, so the return type must be specified explicitly to get an integer column; this is why `IntegerType` is also imported.

In [None]:
from pyspark.sql.types import IntegerType

In [None]:
import ast

In [None]:
def itemToNum(items):
    dic = ast.literal_eval(items)
    return len(dic)

In [None]:
sparkItemToNum = functions.udf(itemToNum, IntegerType())  # wrap itemToNum as a UDF returning an integer

In [None]:
fctGateKeeperFirstPassItemnumDf = fctGateKeeperFirstPassDf.withColumn('item_num',\
                                  sparkItemToNum(fctGateKeeperFirstPassDf.items))

In [None]:
fctGateKeeperFirstPass799Df = fctGateKeeperFirstPassItemnumDf.filter\
                        (fctGateKeeperFirstPassItemnumDf.item_num == 799)
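
A plain-Python sketch of the item-count check, assuming (as the use of `ast.literal_eval` in `itemToNum` implies) that `items` holds a Python dict literal mapping test item names to values. `EXPECTED_ITEM_COUNT` and `has_expected_items` are hypothetical names, and the sample string is made up. `ast.literal_eval` only accepts Python literals, so a malformed string raises an exception instead of being executed.

```python
import ast

EXPECTED_ITEM_COUNT = 799  # records with any other count are treated as incomplete


def item_count(items):
    """Number of test items in one 'items' string (same logic as itemToNum)."""
    return len(ast.literal_eval(items))


def has_expected_items(items, expected=EXPECTED_ITEM_COUNT):
    """True when the record contains exactly `expected` test items."""
    return item_count(items) == expected


sample = "{'ITEM_A': 1.5, 'ITEM_B': 0.02}"    # made-up record with 2 items
print(item_count(sample))                      # 2
print(has_expected_items(sample))              # False
print(has_expected_items(sample, expected=2))  # True
```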

In [None]:
fctGateKeeperFirstPass799Df.registerTempTable("fctGateKeeperFirstPass799DfTemp")

In [None]:
fctGatekeeperAllpass799 = hc.sql("select * from fctGateKeeperFirstPass799DfTemp F\
                               left outer join rpcDfTemp R\
                               on F.serial_number = R.serial_num\
                               where R.serial_num is null")

In [None]:
fctGatekeeperCRBFail799 = hc.sql("select * from fctGateKeeperFirstPass799DfTemp F\
                            left semi join rpcDfTemp R\
                            on F.serial_number = R.serial_num\
                            and R.namec = 'CRB Check In'")
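
The two queries above split the 799-item records by their RPC history. The left outer join with `R.serial_num is null` acts as an anti join (boards with no RPC record at all), and the left semi join keeps boards with at least one 'CRB Check In' record, each listed once. Boards whose only RPC records are of another kind fall into neither group. A set-based plain-Python sketch (`split_by_rpc` and the records are hypothetical):

```python
def split_by_rpc(records, rpc_records):
    """Split FCT records into (all_pass, crb_fail) by their RPC history."""
    any_rpc = {r["serial_num"] for r in rpc_records}
    crb = {r["serial_num"] for r in rpc_records if r["namec"] == "CRB Check In"}

    # Anti join: no RPC record of any kind for this serial number.
    all_pass = [f for f in records if f["serial_number"] not in any_rpc]
    # Semi join: at least one 'CRB Check In' record; each FCT record kept once.
    crb_fail = [f for f in records if f["serial_number"] in crb]
    return all_pass, crb_fail
```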

### Step 4. Filter RPC data (y) (pending)

#### a. Filter the RPC DataFrame on the `namec` column, separating `namec == 'TFB Check In'` from `namec == 'CRB Check In'`. Only the 'CRB Check In' records are needed.

#### b. Filter RPC records by 'NTF' and 'Replaced'.

#### c. Filter 'Replaced' records by 'AP' and 'RF'

Note: AP = Application, RF = Radio Frequency. FCT test items mainly cover AP.

#### d. Filter the RPC DataFrame with `['serial_num'].isin(above DataFrame['serial_num'])`

#### e. Join above DataFrame with RPC DataFrame to identify FATP return records. 

### Step 5. Extract FCT test values (FCT['items']) and store it as a separate DataFrame

#### a. The FCT test values are stored in the `items` column as a map (a dictionary literal of test item names and values). Extract these values and store them as a separate DataFrame. Rename the columns of the original DataFrame and store it as a metadata DataFrame.

**At this point, the joins have produced four DataFrames:**
- `fctGatekeeperAllpass` (223041)
- `fctGatekeeperCRBFail` (3126)
- `fctGatekeeperAllpass799` (222836)
- `fctGatekeeperCRBFail799` (3117)

Each of these DataFrames has an `items` column containing the FCT test log values, which will be used to build the anomaly detection model and for anomaly cluster identification. We first extract the FCT test log values from the two 799-item DataFrames and store them as separate DataFrames.

In [None]:
from pyspark.sql import functions as F

In [None]:
import ast

In [None]:
def dic_to_row(record):
    # Turn a {test_item: value} dict into a Row whose field names are the test items
    return Row(**{str(key): value for key, value in record.items()})

In [None]:
def toCSVLine(data):
    return ','.join(str(d) for d in data)
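
Writing the extracted values as CSV only works if every line lists the test items in the same order. `Row(**kwargs)` sorts its fields by name in Spark versions before 3.0, but it is simpler to make the order explicit. A plain-Python sketch (`items_to_csv` is a hypothetical name) that fixes one sorted column order, writes a header, and appends the `fatp_return` label last, like the `withColumn('fatp_return', ...)` calls in this step:

```python
import csv
import io


def items_to_csv(records, label):
    """Render a list of {test_item: value} dicts as CSV text with a fatp_return column.

    Columns are the sorted union of all item names, so every line has the same
    layout; items missing from a record are left empty.
    """
    columns = sorted({key for record in records for key in record})
    buf = io.StringIO()
    writer = csv.writer(buf, lineterminator="\n")
    writer.writerow(columns + ["fatp_return"])
    for record in records:
        writer.writerow([record.get(col, "") for col in columns] + [label])
    return buf.getvalue()


print(items_to_csv([{"B": 2.0, "A": 1.0}, {"A": 3.0, "B": 4.0}], label=1), end="")
# A,B,fatp_return
# 1.0,2.0,1
# 3.0,4.0,1
```

The `csv` module also takes care of quoting, which a plain `','.join` does not.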

In [None]:
itemsCRBFailRow = fctGatekeeperCRBFail799.map(lambda row: row.items)\
                                .map(lambda s: ast.literal_eval(s))\
                                .map(lambda d: dic_to_row(d))

In [None]:
itemsCRBFailDf = hc.createDataFrame(itemsCRBFailRow)

In [None]:
itemsCRBFailDf = itemsCRBFailDf.withColumn('fatp_return', F.lit(1))

In [None]:
itemsCRBFailDf.take(1)

In [None]:
itemsCRBFailValues = itemsCRBFailDf.map(lambda r: r.asDict())\
                                   .map(lambda d: list(d.values()))  # a list can be pickled back to the driver

In [None]:
itemsCRBFailValues.take(1)

In [None]:
# A Row is a tuple in column order, so pass it to toCSVLine directly;
# on Python versions before 3.7, asDict().values() does not preserve that order
itemsCRBFailValues = itemsCRBFailDf.map(toCSVLine)

In [None]:
test = itemsCRBFailValues.collect()

In [None]:
len(test)

In [None]:
itemsAllpassRow = fctGatekeeperAllpass799.map(lambda row: row.items)\
                                      .map(lambda items: ast.literal_eval(items))\
                                      .map(lambda dic: dic_to_row(dic))

In [None]:
itemsAllpassDf = hc.createDataFrame(itemsAllpassRow)

In [None]:
itemsAllpassDf = itemsAllpassDf.withColumn('fatp_return', F.lit(0))

In [None]:
# A Row is a tuple in column order, so pass it to toCSVLine directly;
# on Python versions before 3.7, asDict().values() does not preserve that order
itemsAllpassValues = itemsAllpassDf.map(toCSVLine)

In [None]:
allPass = itemsAllpassValues.collect()

In [None]:
itemsAllpassDf.take(1)

In [None]:
itemsAllpassPdf = itemsAllpassDf.toPandas()

**At this point, the test logs have been separated into their own DataFrames, so we now have four DataFrames:**

Without excluding S/Ns that have fewer than 799 items:

1. `fctGatekeeperAllpass` (223041)
2. `itemsAllpassDf` (223041)
3. `fctGatekeeperCRBFail` (3126)
4. `itemsCRBFailDf` (3126)

After excluding S/Ns that have fewer than 799 items:

1. `fctGatekeeperAllpass799` (222836)
2. `itemsAllpassDf` (222836)
3. `fctGatekeeperCRBFail799` (3117)
4. `itemsCRBFailDf` (3117)
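
The record counts above also show how much the 799-item filter removes and how imbalanced the two classes are. A quick check using only the counts listed in this step:

```python
# Counts as listed above (before / after excluding S/Ns with fewer than 799 items).
allpass_before, allpass_after = 223041, 222836
crb_before, crb_after = 3126, 3117

print(allpass_before - allpass_after)   # 205 all-pass records removed
print(crb_before - crb_after)           # 9 CRB-fail records removed

# Share of FATP returns (fatp_return == 1) in the 799-item data set.
return_rate = crb_after / (crb_after + allpass_after)
print("{:.2%}".format(return_rate))     # 1.38%
```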

### Step 6. Missing value handling before data scaling

#### a. Examine missing values.
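
A minimal plain-Python sketch for counting missing values per column, assuming a value counts as missing when it is `None`, NaN, or cannot be parsed as a float (for example a malformed scientific-notation string). `is_missing` and `missing_counts` are hypothetical names.

```python
import math


def is_missing(value):
    """Treat None, NaN and anything float() cannot parse as missing (an assumption)."""
    if value is None:
        return True
    try:
        return math.isnan(float(value))
    except (TypeError, ValueError):
        return True


def missing_counts(records, columns):
    """Count missing values per column over a list of dict records."""
    return {col: sum(is_missing(r.get(col)) for r in records) for col in columns}
```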

### Step 7. Data Scaling (Normalization, Min-Max Scaling)

#### a. Rescale the FCT test log DataFrame with normalization and min-max scaling.
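
A plain-Python sketch of the two rescalings for one column of numbers. It assumes "normalization" means z-score standardization (subtract the mean, divide by the population standard deviation); the notebook does not say which variant is used. Min-max scaling maps each value to `(x - min) / (max - min)`, which lies in [0, 1]. Both functions need a column without missing values, which is presumably why Step 6 comes first.

```python
import statistics


def min_max_scale(values):
    """Rescale to [0, 1] with (x - min) / (max - min); a constant column maps to 0."""
    lo, hi = min(values), max(values)
    if hi == lo:
        return [0.0 for _ in values]
    return [(x - lo) / (hi - lo) for x in values]


def z_score(values):
    """Standardize to mean 0 and (population) standard deviation 1."""
    mean = statistics.mean(values)
    std = statistics.pstdev(values)
    if std == 0:
        return [0.0 for _ in values]
    return [(x - mean) / std for x in values]


print(min_max_scale([2.0, 4.0, 6.0]))   # [0.0, 0.5, 1.0]
print(z_score([2.0, 4.0, 6.0]))
```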

### Step 8. Missing value handling after data scaling

#### a. Replace missing values with `column.min()`

Note: Most of the observed missing values were caused by incorrect scientific notation. When a number is very small, its scientific notation is displayed incorrectly and the value cannot be read as a number. Therefore, we fill missing values with the `min()` of each column.
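
A plain-Python sketch of this rule for one column: parse each raw value, treat anything that fails to parse (such as a malformed scientific-notation string) or is NaN as missing, and fill the gaps with the minimum of the values that did parse. `parse_value` and `fill_with_column_min` are hypothetical names, and the sample strings are made up. Filling with the minimum follows the note's reasoning that the malformed entries are very small numbers.

```python
import math


def parse_value(raw):
    """Parse a raw test value; return None when it is not a valid number."""
    try:
        x = float(raw)
    except (TypeError, ValueError):
        return None
    return None if math.isnan(x) else x


def fill_with_column_min(raw_column):
    """Replace unparseable entries with the minimum of the parseable ones."""
    parsed = [parse_value(v) for v in raw_column]
    valid = [v for v in parsed if v is not None]
    if not valid:
        return parsed  # nothing to take a minimum from; leave the column as-is
    col_min = min(valid)
    return [col_min if v is None else v for v in parsed]


print(fill_with_column_min(["0.5", "1.2e-9", "3.4e", None]))
# [0.5, 1.2e-09, 1.2e-09, 1.2e-09]
```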