# 特徵工程

在機器學習中，一個「特徵」指的是一個「被觀察的現象之可測量屬性」，而該觀測現象之所有特徵的集合又稱為該現象的「表徵」。
從原始資料中抽取特徵的過程稱之為「特徵工程」。特徵工程的目的，是為了從原始資料中，轉換出適合機器學習算法的表徵。

特徵工程通常需要一定程度的領域知識，並可分為下列階段 [[1]]($ref_1)：
1. 思考可能的特徵
2. 決定該抽取哪些特徵
3. 抽取特徵
4. 研究特徵如何影響模型的預測能力
5. 重複 1~4 直到模型可以被接受

In [1]:
import pyspark
from functools import reduce
from pyspark.sql import SparkSession, SQLContext, Window
import pyspark.sql.functions as F
from pyspark.storagelevel import StorageLevel
from os import path

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
1,application_1545316055964_0002,pyspark,idle,Link,Link,✔


SparkSession available as 'spark'.


請使用自己的 S3 存貯體：`s3a://<YOUR_BUCKET>`

In [2]:
data_root_dir = 's3a://pdm-sagemaker-dac43ec6047111e995b9d4619d3b01d6'
telemetry_dir = data_root_dir + '/static/telemetry'
input_logs_dir = data_root_dir + '/static/logs'

In [18]:
import pandas as pd
import matplotlib
matplotlib.use('agg',warn=False, force=True)
import matplotlib.pyplot as plt

matplotlib.style.use('ggplot')

## 讀取資料

In [4]:
sc = SparkSession.builder.getOrCreate()
sqlContext = SQLContext(sc)

telemetry_input_df = sqlContext.read.parquet(telemetry_dir)
logs_input_df = sqlContext.read.parquet(input_logs_dir)

telemetry_input_df.printSchema()
logs_input_df.printSchema()

telemetry_entries_count = telemetry_input_df.count()
log_entries_count = logs_input_df.count()

print('Total:\n\n{0:10d} telemetry entries\n{1:10d} log entries'.format(
    telemetry_entries_count, log_entries_count))

root
 |-- timestamp: timestamp (nullable = true)
 |-- ambient_pressure: double (nullable = true)
 |-- ambient_temperature: double (nullable = true)
 |-- machineID: string (nullable = true)
 |-- pressure: double (nullable = true)
 |-- speed: double (nullable = true)
 |-- speed_desired: long (nullable = true)
 |-- temperature: double (nullable = true)

root
 |-- timestamp: timestamp (nullable = true)
 |-- code: string (nullable = true)
 |-- level: string (nullable = true)
 |-- machineID: string (nullable = true)

Total:

  13043552 telemetry entries
       537 log entries

## 資料探索

讓我們先回答幾個關於資料集的基本問題

### Q: 有多少不同的機器產生了遙測資料？

In [19]:
distinct_machines_df = telemetry_input_df.select('machineID').distinct()
machine_count = distinct_machines_df.count()
print('{0} machine(s)'.format(machine_count))

1000 machine(s)

### Q: 遙測資料橫跨了多長的時間區段？

In [6]:
telemetry_input_df.select(
    F.min('timestamp').alias('start'), F.max('timestamp').alias('end')
).withColumn(
    'duration (days)',
    F.datediff(F.col('end'), F.col('start'))
).show()

+-------------------+-------------------+---------------+
|              start|                end|duration (days)|
+-------------------+-------------------+---------------+
|2017-05-15 00:02:00|2017-08-14 22:57:59|             91|
+-------------------+-------------------+---------------+

### Q: 共有哪些類型的日誌事件？

In [7]:
logs_input_df.select('level').groupBy('level').agg(
    F.count('level')).show()

+--------+------------+
|   level|count(level)|
+--------+------------+
|    INFO|         265|
|CRITICAL|         272|
+--------+------------+

日誌事件類型遵循下列規範：

* **INFO**: 
非錯誤的紀錄。除此之外，也包含關於排程/非排程的修復程序。
* **WARNING**: 
非嚴重，但可能有危害，或異常的狀況。
* **CRITICAL**:
不可回復的狀況，需要額外干預（修補）。

### Q: 什麼類型的 CRITICAL 事件（故障）被記錄在日誌中？

In [17]:
failures_df = logs_input_df.where(F.col('level') == 'CRITICAL')

failures_df.select('code').groupBy('code').agg(
    F.count(F.lit(1)).alias('count')
).toPandas()

  code  count
0   F2     77
1   F1    195

### Q: 每部機台發生多少次故障？

In [20]:
machine_failures_df = distinct_machines_df.join(
    failures_df, 'machineID', 'left_outer'
).groupBy('machineID').agg(F.count('code').alias('count'))

mf_pandas = machine_failures_df.toPandas()
mf_pandas.columns = ['# Of Machines', 'Failure Count']
mf_pandas.groupby('Failure Count').count()

               # Of Machines
Failure Count               
0                        734
1                        260
2                          6

### Q: 單一機台的原始資料流有哪些特性？
#### 敘述統計

In [21]:
# get ID of one of the most "faulty" machines
faultyMachineID = machine_failures_df.orderBy(F.desc('count')).first().machineID

faulty_machine_sequence_df = telemetry_input_df.where(
    telemetry_input_df.machineID == faultyMachineID
).orderBy(telemetry_input_df.timestamp)

faulty_machine_sequence_pdf = faulty_machine_sequence_df.toPandas()
faulty_machine_sequence_pdf.set_index('timestamp', inplace=True)
print('Descriptive statistics of "faulty" machine {}'.format(faultyMachineID))
faulty_machine_sequence_pdf.describe()

Descriptive statistics of "faulty" machine M_0776
       ambient_pressure      ...        temperature
count      13420.000000      ...       13420.000000
mean         100.999055      ...         141.870811
std            0.058138      ...           6.758752
min          100.900000      ...          24.520000
25%          100.950000      ...         135.720000
50%          101.000000      ...         138.190000
75%          101.050000      ...         147.800000
max          101.100000      ...         154.400000

[8 rows x 6 columns]

為了比較，這裡列出 **健康** 機台的敘述統計

In [22]:
# get ID of one of the "healthy" machines
healthyMachineID = machine_failures_df.orderBy(F.asc('count')).first().machineID

healthy_machine_sequence_df = telemetry_input_df.where(
    telemetry_input_df.machineID == healthyMachineID
).orderBy(telemetry_input_df.timestamp)

healthy_machine_sequence_pdf = healthy_machine_sequence_df.toPandas()
healthy_machine_sequence_pdf.set_index('timestamp', inplace=True)
print('Descriptive statistics of "healthy" machine {}'.format(healthyMachineID))
healthy_machine_sequence_pdf.describe()

Descriptive statistics of "healthy" machine M_0830
       ambient_pressure      ...        temperature
count      12349.000000      ...       12349.000000
mean         101.000020      ...         137.494886
std            0.058062      ...           2.709765
min          100.900000      ...          23.060000
25%          100.950000      ...         136.980000
50%          101.000000      ...         137.520000
75%          101.050000      ...         138.140000
max          101.100000      ...         138.940000

[8 rows x 6 columns]

#### 時間特性

In [23]:
# Data sample
print('Data sample of "faulty" machine {}'.format(faultyMachineID))
faulty_machine_sequence_pdf.head()

Data sample of "faulty" machine M_0776
                     ambient_pressure     ...       temperature
timestamp                                 ...                  
2017-05-15 10:10:00            100.93     ...             24.52
2017-05-15 10:10:01            100.97     ...             30.35
2017-05-15 10:10:02            100.97     ...             37.67
2017-05-15 10:10:03            101.10     ...             46.72
2017-05-15 10:10:04            101.00     ...             57.49

[5 rows x 7 columns]

In [25]:
# Occurence of entries in the "faulty" telemetry stream over time

# faulty_machine_sequence_pdf.apply(lambda x: 1, axis=1).plot(
#     title='Telemetry stream of "faulty" machine {0}'.format(faultyMachineID), style='.', yticks=[])
# plt.show()

faulty_machine_sequence_pdf.apply(lambda x: 1, axis=1)

timestamp
2017-05-15 10:10:00    1
2017-05-15 10:10:01    1
2017-05-15 10:10:02    1
2017-05-15 10:10:03    1
2017-05-15 10:10:04    1
2017-05-15 10:10:05    1
2017-05-15 10:10:06    1
2017-05-15 10:10:07    1
2017-05-15 10:10:08    1
2017-05-15 10:10:09    1
2017-05-15 10:10:10    1
2017-05-15 10:10:11    1
2017-05-15 10:10:12    1
2017-05-15 10:10:13    1
2017-05-15 10:10:14    1
2017-05-15 10:10:15    1
2017-05-15 10:10:16    1
2017-05-15 10:10:17    1
2017-05-15 10:10:18    1
2017-05-15 10:10:19    1
2017-05-15 10:10:20    1
2017-05-15 10:10:21    1
2017-05-15 10:10:22    1
2017-05-15 10:10:23    1
2017-05-15 10:10:24    1
2017-05-15 10:10:25    1
2017-05-15 10:10:26    1
2017-05-15 10:10:27    1
2017-05-15 10:10:28    1
2017-05-15 10:10:29    1
                      ..
2017-08-14 14:27:30    1
2017-08-14 14:27:31    1
2017-08-14 14:27:32    1
2017-08-14 14:27:33    1
2017-08-14 14:27:34    1
2017-08-14 14:27:35    1
2017-08-14 14:27:36    1
2017-08-14 14:27:37    1
2017-08-14 14:2

In [26]:
# Telemetry frequency
# ('diffs' are computed by subtracting the previous timestamp
# from the current one for each telemetry entry)

diffs = faulty_machine_sequence_pdf.index.to_series().diff()
mode_interval = diffs.mode()[0] # mode of the time intervals
print('Telemetry frequency: {}'.format(mode_interval))

Telemetry frequency: 0 days 00:00:01

In [27]:
# Other, less common, intervals between telemetry entries (minutes)

# gaps = diffs[diffs != mode_interval].dt.seconds/60
# gaps.hist()
# plt.title('telemetry frequency of "faulty" machine {}'.format(faultyMachineID))
# plt.xlabel('Time [s]')
# plt.ylabel('Count')
# plt.show()

diffs[diffs != mode_interval].dt.seconds/60

timestamp
2017-05-15 10:10:00            NaN
2017-05-19 03:42:00    1048.016667
2017-05-22 13:47:00     601.016667
2017-05-23 01:01:00     670.016667
2017-05-24 03:17:00     134.050000
2017-05-25 04:40:00      81.016667
2017-05-25 21:04:00     979.016667
2017-05-26 21:20:00      14.016667
2017-05-29 14:23:00    1022.016667
2017-05-30 20:29:00     362.016667
2017-05-31 11:25:00     895.016667
2017-05-31 17:48:00     379.016667
2017-06-01 20:56:00     183.016667
2017-06-02 04:19:00     439.016667
2017-06-02 06:50:00     149.016667
2017-06-02 16:29:00     578.016667
2017-06-02 17:16:00      44.016667
2017-06-06 06:22:00     782.016667
2017-06-06 21:06:00     879.016667
2017-06-07 10:57:00     826.016667
2017-06-08 22:56:00     718.016667
2017-06-10 21:40:00    1361.016667
2017-06-11 15:36:00    1071.016667
2017-06-13 06:09:00     868.016667
2017-06-15 01:59:00    1189.016667
2017-06-15 10:17:00     497.016667
2017-06-15 15:28:00     310.016667
2017-06-16 14:45:00    1394.016667
2017-06-16

這表示機台循環運作後，有停滯的週期

### 了解機台活動的循環／模式

許多真實世界的機器以循環運作，一個循環可看作為時間上的週期，這可用來描述某個機台的運作狀態。例如：商用客機的渦輪風扇引擎，能被簡要的用循環描述：引擎運作（飛行時）或引擎停止（飛機著陸）。在此案例中，引擎被眼單的描述為引擎開關狀態，雖然非得這樣描述。在接下來的說明，我們將解釋為何將資料分為不同循環狀態，能有助於預測機台的情境

## 建立特徵

### 循環層級的資料聚合

原始的遙測資料量，雖然對於實時監測和故障偵測至關重要，卻無直接被用來建立預測模型。因為通常假設在一個運作循環中，突發的衰退的可能性不高。通常的做法，是在每個運作時間單位內，產生一個聚合的測量值來描述資產的健康特性。運作時間單位通常是以小時或循環數表示。

In [28]:
def aggregate_cycles(df, min_gap = 30):
    """
    Detects cycles and compresses them into 
    aggregate measurements.    
    
    :param DataFrame df: input Spark DataFrame
    :param int min_gap: seconds between cycles (mininum)
    """
    w = Window.partitionBy('machineID').orderBy('timestamp')

    # Difference from the previous record or 0 if this is the first one
    diff = F.coalesce(
        F.unix_timestamp('timestamp') - F.unix_timestamp(
            F.lag('timestamp', 1).over(w)
        ), F.lit(0))

    # 0 if diff <= 30, 1 otherwise
    indicator = (diff > min_gap).cast('integer')

    subgroup = F.sum(indicator).over(w).alias('cycle')

    return df.select("*", subgroup).groupBy('machineID', 'cycle').agg(
        F.max('speed_desired').alias('speed_desired_max'),
        F.avg('speed').alias('speed_avg'),
        F.avg('temperature').alias('temperature_avg'),
        F.max('temperature').alias('temperature_max'),
        F.avg('pressure').alias('pressure_avg'),
        F.max('pressure').alias('pressure_max'),
        F.min('timestamp').alias('cycle_start'),
        F.max('timestamp').alias('cycle_end')
    ).orderBy('cycle')

#### 在單一機台的資料集上測試

In [29]:
machine_cycles_df = aggregate_cycles(faulty_machine_sequence_df)
# machine_cycles_df[['temperature_avg',
#              'temperature_max',
#              'pressure_avg',
#              'pressure_max']].toPandas().plot(subplots=True,
#                                               title='Cycle-level measurements of "faulty" machine {}'.format(faultyMachineID))
# plt.xlabel('Cycles')
# plt.show()

machine_cycles_df[['temperature_avg',
             'temperature_max',
             'pressure_avg',
             'pressure_max']].toPandas()

    temperature_avg  temperature_max  pressure_avg  pressure_max
0        142.232000           145.42   1321.448083       1452.26
1        145.428167           145.63   1317.345417       1451.22
2        145.642417           145.84   1316.304542       1449.43
3        145.810593           145.95   1223.996525       1446.07
4        145.910833           146.06   1202.359583       1446.14
5        146.109400           146.34   1335.937167       1445.74
6        146.320250           146.47   1199.993167       1443.10
7        146.400833           146.53    975.036000       1442.22
8        146.555250           146.77   1309.325458       1442.44
9        146.719167           146.82    975.355167       1440.55
10       146.872333           147.09   1307.604250       1440.00
11       147.178067           147.42   1327.656600       1438.49
12       147.490792           147.71   1305.361792       1436.75
13       147.709500           147.87   1190.611250       1434.73
14       147.819333      

為了比較，這裡展示 **健康** 機器的循環級別測量

In [30]:
machine_cycles_df = aggregate_cycles(healthy_machine_sequence_df)
# machine_cycles_df[['temperature_avg',
#              'temperature_max',
#              'pressure_avg',
#              'pressure_max']].toPandas().plot(subplots=True,
#                                               title='Cycle-level measurements of "healthy" machine {}'.format(healthyMachineID))
# plt.xlabel('Cycles')
# plt.show()

machine_cycles_df[['temperature_avg',
             'temperature_max',
             'pressure_avg',
             'pressure_max']].toPandas()

    temperature_avg  temperature_max  pressure_avg  pressure_max
0        120.626833           136.61   1065.785833       1525.05
1        136.512167           136.61   1063.700833       1520.28
2        136.548458           136.65   1397.339292       1527.57
3        136.586667           136.69   1398.044208       1526.54
4        136.611867           136.73   1418.521333       1525.91
5        136.660444           136.76   1357.716111       1524.16
6        136.686944           136.78   1357.656167       1523.18
7        136.687500           136.80   1062.720333       1519.62
8        136.718500           136.81   1061.098500       1521.45
9        136.722000           136.82   1282.484333       1522.50
10       136.757933           136.87   1414.681000       1521.92
11       136.793833           136.90   1352.931278       1520.63
12       136.821944           136.93   1352.881778       1519.99
13       136.849611           136.95   1352.821667       1519.08
14       136.875750      

#### 對整個資料及進行轉換

In [31]:
cycles_df = aggregate_cycles(telemetry_input_df)
print('Total: {0} cycle(s)'.format(cycles_df.count()))

Total: 72563 cycle(s)

### 標註

在日誌中，故障事件前的機台循環能根據在序列的位置，而被視為「剩餘有用生命 (RUL) 」標註。例如，在一個故障前的循環能被設為 RUL=1，且其之前的循環可設為 RUL=2，依此類推

RUL 能以如下方式轉換為多類別標註：

$$
L_i = \left\{
\begin{array}{ll}
      F_i & rul_i\leq w \\
      \emptyset & \text{otherwise} \\
\end{array} 
\right.
$$

其中 $F_i$ 是某類結束運作循環序列的故障；$rul_i$ 是 RUL 值，$w$ 是未來的水平

In [32]:
# Calculating failure intervals

w = Window.partitionBy('machineID', 'level').orderBy('timestamp')
# the period between consequtive failures,
# or "since the beginning of time" if no previous failure record is on file
diff = F.coalesce(F.lag('timestamp', 1).over(w), F.to_timestamp(F.lit('2000-01-01 00:00:00')))

failure_intervals_df = (failures_df
                .withColumn('last_failure_timestamp', diff)
                .withColumnRenamed('timestamp', 'failure_timestamp')
                .withColumnRenamed('code', 'upcoming_failure')
                .drop('level'))

#### 所選機台的故障間隔

In [33]:
machine_failure_intervals_df = failure_intervals_df.where(
    F.col('machineID') == faultyMachineID)
machine_failure_intervals_df.toPandas()

    failure_timestamp          ...           last_failure_timestamp
0 2017-06-22 03:56:09          ...              2000-01-01 00:00:00
1 2017-08-03 12:28:39          ...              2017-06-22 03:56:09

[2 rows x 4 columns]

#### 給所有循環標註

此外，為每個運作直到故障的循環序列，以及沒有發生故障的循環序列，都生成一個獨一的 ID

In [34]:
w = 7 # future horizon (cycles)

labeled_cycles_df = (
    cycles_df.join(failure_intervals_df,
                   (cycles_df.machineID == failure_intervals_df.machineID) &
                   (cycles_df.cycle_start >= failure_intervals_df.last_failure_timestamp) &
                   (cycles_df.cycle_end <= failure_intervals_df.failure_timestamp),
                   'left_outer')
    .drop(failure_intervals_df.machineID)
    .drop(cycles_df.cycle_end)
    .drop(failure_intervals_df.last_failure_timestamp)
    .withColumnRenamed('cycle_start', 'timestamp')
    .withColumn(
     'rul',                         
      F.when(F.col('upcoming_failure').isNull(), None).otherwise(
         F.row_number().over(Window.partitionBy('machineID', 'failure_timestamp')
                         .orderBy(F.desc('cycle')))))
    .withColumn(
     'sequenceID',
     F.dense_rank().over(Window.partitionBy('machineID')
                   .orderBy(F.desc('upcoming_failure'), 'failure_timestamp')))
    .drop(failure_intervals_df.failure_timestamp)
    .withColumn('immediate_failure',
                F.when(F.col('rul').isNotNull() & (F.col('rul') < w),
                       F.col('upcoming_failure'))))

#### 所選機台的故障間隔與序列

In [35]:
machine_labeled_cycles_df = labeled_cycles_df.where(
    F.col('machineID') == faultyMachineID
).select('timestamp', 'sequenceID', 'upcoming_failure')

max_sequence_id = machine_labeled_cycles_df.select(
    F.max('sequenceID')).collect()[0][0]
# machine_labeled_cycles_df.toPandas().set_index('timestamp').plot(
#     style='.', yticks=range(1, max_sequence_id + 1))
# plt.ylabel('Sequence')
# plt.show()

machine_labeled_cycles_df.toPandas().set_index('timestamp')

                     sequenceID upcoming_failure
timestamp                                       
2017-05-15 10:10:00           1               F2
2017-05-22 13:47:00           1               F2
2017-05-25 21:04:00           1               F2
2017-05-26 21:20:00           1               F2
2017-05-31 11:25:00           1               F2
2017-05-31 17:48:00           1               F2
2017-06-02 04:19:00           1               F2
2017-06-02 06:50:00           1               F2
2017-06-06 06:22:00           1               F2
2017-06-06 21:06:00           1               F2
2017-06-10 21:40:00           1               F2
2017-06-11 15:36:00           1               F2
2017-06-15 10:17:00           1               F2
2017-06-15 15:28:00           1               F2
2017-06-18 23:26:00           1               F2
2017-06-21 00:46:00           1               F2
2017-06-21 22:23:00           1               F2
2017-06-22 03:56:00           1               F2
2017-05-19 03:42:00 

In [36]:
# Labeled data set
labeled_cycles_df.printSchema()

root
 |-- machineID: string (nullable = true)
 |-- cycle: long (nullable = true)
 |-- speed_desired_max: long (nullable = true)
 |-- speed_avg: double (nullable = true)
 |-- temperature_avg: double (nullable = true)
 |-- temperature_max: double (nullable = true)
 |-- pressure_avg: double (nullable = true)
 |-- pressure_max: double (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- upcoming_failure: string (nullable = true)
 |-- rul: integer (nullable = true)
 |-- sequenceID: integer (nullable = true)
 |-- immediate_failure: string (nullable = true)

### 用時序上延遲的特徵擴增資料

為了幫助描繪每台機器短期的趨勢，通常會用新的特徵擴增原有的資料集 [[3]](#ref_3)，這些特徵是通過滑動時間窗的方式計算的。

In [37]:
# TODO: compute more rolling features
# (e.g., linear regression, standard deviation)

lookback = 5
w = Window.partitionBy(
    'machineID', 'sequenceID'
).rowsBetween(-lookback, Window.currentRow).orderBy('cycle')

# computing rolling averages for these cycle-level features
input_columns = ['temperature_avg', 'temperature_max', 'pressure_avg', 'pressure_max']

# adding rolling features and eliminating the entries in the beginning of each
# sequence which weren't computed using a sufficient number of previous values
augmented_labeled_cycles_df = (
    reduce(lambda _df, ic: _df.withColumn(
        '{0}_avg'.format(ic), F.avg(ic).over(w)), input_columns, labeled_cycles_df)
    .withColumn('is_valid', F.count(F.lit(1)).over(w) > lookback)
    .where(F.col('is_valid'))
    .drop('is_valid'))

augmented_labeled_cycles_df.printSchema()

root
 |-- machineID: string (nullable = true)
 |-- cycle: long (nullable = true)
 |-- speed_desired_max: long (nullable = true)
 |-- speed_avg: double (nullable = true)
 |-- temperature_avg: double (nullable = true)
 |-- temperature_max: double (nullable = true)
 |-- pressure_avg: double (nullable = true)
 |-- pressure_max: double (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- upcoming_failure: string (nullable = true)
 |-- rul: integer (nullable = true)
 |-- sequenceID: integer (nullable = true)
 |-- immediate_failure: string (nullable = true)
 |-- temperature_avg_avg: double (nullable = true)
 |-- temperature_max_avg: double (nullable = true)
 |-- pressure_avg_avg: double (nullable = true)
 |-- pressure_max_avg: double (nullable = true)

## 保留未來使用的資料集


考慮到一般性，所有特定類別的特徵名稱會模糊化，且明確的時間戳記會被替換為時間 ID 以表示相對的時間順序

In [38]:
feature_columns = [
    c for c in augmented_labeled_cycles_df.columns if c not in (
        'machineID', 'cycle', 'sequenceID', 'rul', 'upcoming_failure', 'immediate_failure', 'timestamp')]
obfuscate_columns = zip(feature_columns, range(1, len(feature_columns) + 1))

feature_df = reduce(lambda _df, ic: _df.withColumnRenamed(ic[0], 's{0}'.format(ic[1])),
    obfuscate_columns, augmented_labeled_cycles_df).withColumn(
    'entryID', F.row_number().over(Window().orderBy('timestamp'))
).drop('upcoming_failure', 'timestamp')

feature_df.printSchema()

root
 |-- machineID: string (nullable = true)
 |-- cycle: long (nullable = true)
 |-- s1: long (nullable = true)
 |-- s2: double (nullable = true)
 |-- s3: double (nullable = true)
 |-- s4: double (nullable = true)
 |-- s5: double (nullable = true)
 |-- s6: double (nullable = true)
 |-- rul: integer (nullable = true)
 |-- sequenceID: integer (nullable = true)
 |-- immediate_failure: string (nullable = true)
 |-- s7: double (nullable = true)
 |-- s8: double (nullable = true)
 |-- s9: double (nullable = true)
 |-- s10: double (nullable = true)
 |-- entryID: integer (nullable = true)

In [39]:
chunks = 5 # split the data into N chunks
feature_df.coalesce(chunks).write.csv(data_root_dir + '/model/features',mode="overwrite", header=True)

## References
https://www.youtube.com/watch?v=drUToKxEAUA

http://waset.org/publications/10006640/building-a-scalable-telemetry-based-multiclass-predictive-maintenance-model-in-r

http://citeseerx.ist.psu.edu/viewdoc/download?doi=10.1.1.386.8108&rep=rep1&type=pdf