### Objective: Extract Summarized Categorical features of Bosch assembly line

1. Reject nulls
2. View unique components per line
3. Get throughput per line
4. Get the count of unique components assembled per line

In [1]:
#Import Packages: 

from pyspark.sql import SparkSession 
from pyspark.sql.functions import *
from pyspark.sql.types import *
import zipfile
import pandas as pd
import numpy as np
import vaex
from pyspark.sql import functions as F
import re

In [2]:
# Create (or reuse) a local Spark session; `local[*]` lets Spark use every available core.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName('Spark Project')
    .getOrCreate()
)
spark

In [3]:
# Number of cores Spark can use. In local mode the default parallelism is the number of cores
# granted by `local[*]` (unless spark.default.parallelism is set explicitly). The previous
# approach, getExecutorMemoryStatus().size(), counted executors/block managers instead of
# cores, which is why it reported 1 in local mode.
cores = spark.sparkContext.defaultParallelism
print("You are working with", cores, "core(s)")

You are working with 1 core(s)


In [4]:
# Read the three training tables. Only header=True is passed (no schema inference),
# so every column, including numeric values and date stamps, is read as a string.
category = spark.read.csv(path='../data/train_categorical.csv', header=True)
date = spark.read.csv(path='../data/train_date.csv', header=True)
numeric = spark.read.csv(path='../data/train_numeric.csv', header=True)

In [5]:
#Create Temporary View to run SQl commands:
category.createOrReplaceTempView('category')
date.createOrReplaceTempView('date')
numeric.createOrReplaceTempView('numeric')

### Initial Exploration: 

In [6]:
#View category schema (every column is read as a string):
category.printSchema()

root
 |-- Id: string (nullable = true)
 |-- L0_S1_F25: string (nullable = true)
 |-- L0_S1_F27: string (nullable = true)
 |-- L0_S1_F29: string (nullable = true)
 |-- L0_S1_F31: string (nullable = true)
 |-- L0_S2_F33: string (nullable = true)
 |-- L0_S2_F35: string (nullable = true)
 |-- L0_S2_F37: string (nullable = true)
 |-- L0_S2_F39: string (nullable = true)
 |-- L0_S2_F41: string (nullable = true)
 |-- L0_S2_F43: string (nullable = true)
 |-- L0_S2_F45: string (nullable = true)
 |-- L0_S2_F47: string (nullable = true)
 |-- L0_S2_F49: string (nullable = true)
 |-- L0_S2_F51: string (nullable = true)
 |-- L0_S2_F53: string (nullable = true)
 |-- L0_S2_F55: string (nullable = true)
 |-- L0_S2_F57: string (nullable = true)
 |-- L0_S2_F59: string (nullable = true)
 |-- L0_S2_F61: string (nullable = true)
 |-- L0_S2_F63: string (nullable = true)
 |-- L0_S2_F65: string (nullable = true)
 |-- L0_S2_F67: string (nullable = true)
 |-- L0_S3_F69: string (nullable = true)
 |-- L0_S3_F71: st

In [7]:
# Frequency of each non-null value of categorical feature L0_S2_F35 (line 0, station 2, feature 35).
# (The previous comment described L0_S1_F25, which is not the column this query inspects.)
feature_col = 'L0_S2_F35'
(
    category.select(feature_col)
    .filter(category[feature_col].isNotNull())
    .groupBy(feature_col)
    .count()
    .show()
)

+---------+-----+
|L0_S2_F35|count|
+---------+-----+
|     T128|   20|
|      T64|   22|
|      T32|  178|
|      T96|   29|
|       T4|    1|
|      T48|   13|
+---------+-----+



In [8]:
#View date schema:
date.printSchema()
#Date stamps are read as strings too (no schema inference was requested when reading the CSV).

root
 |-- Id: string (nullable = true)
 |-- L0_S0_D1: string (nullable = true)
 |-- L0_S0_D3: string (nullable = true)
 |-- L0_S0_D5: string (nullable = true)
 |-- L0_S0_D7: string (nullable = true)
 |-- L0_S0_D9: string (nullable = true)
 |-- L0_S0_D11: string (nullable = true)
 |-- L0_S0_D13: string (nullable = true)
 |-- L0_S0_D15: string (nullable = true)
 |-- L0_S0_D17: string (nullable = true)
 |-- L0_S0_D19: string (nullable = true)
 |-- L0_S0_D21: string (nullable = true)
 |-- L0_S0_D23: string (nullable = true)
 |-- L0_S1_D26: string (nullable = true)
 |-- L0_S1_D30: string (nullable = true)
 |-- L0_S2_D34: string (nullable = true)
 |-- L0_S2_D38: string (nullable = true)
 |-- L0_S2_D42: string (nullable = true)
 |-- L0_S2_D46: string (nullable = true)
 |-- L0_S2_D50: string (nullable = true)
 |-- L0_S2_D54: string (nullable = true)
 |-- L0_S2_D58: string (nullable = true)
 |-- L0_S2_D62: string (nullable = true)
 |-- L0_S2_D66: string (nullable = true)
 |-- L0_S3_D70: string 

In [9]:
# Date stamp column assumed (by the naming convention used in this notebook) to belong to
# categorical feature L0_S1_F25: D26 sits on line 0, station 1.
# Show how often each non-null stamp value occurs.
(
    date.select('L0_S1_D26')
    .where(date['L0_S1_D26'].isNotNull())
    .groupBy('L0_S1_D26')
    .count()
    .show()
)

+---------+-----+
|L0_S1_D26|count|
+---------+-----+
|      691|   21|
|  1432.79|   24|
|     73.7|   15|
|   560.38|   18|
|   189.92|   16|
|   641.53|   17|
|  1612.75|   18|
|  1669.13|   16|
|  1542.16|   16|
|    829.4|   19|
|   958.75|   15|
|   909.14|   19|
|    37.21|   23|
|   368.62|   21|
|   961.48|    8|
|  1227.46|   23|
|   188.27|   15|
|  1239.79|   18|
|   611.05|   19|
|  1154.08|   19|
+---------+-----+
only showing top 20 rows



In [10]:
#View numeric schema:
numeric.printSchema()

#Numeric features are read as strings as well (no schema inference was requested).

root
 |-- Id: string (nullable = true)
 |-- L0_S0_F0: string (nullable = true)
 |-- L0_S0_F2: string (nullable = true)
 |-- L0_S0_F4: string (nullable = true)
 |-- L0_S0_F6: string (nullable = true)
 |-- L0_S0_F8: string (nullable = true)
 |-- L0_S0_F10: string (nullable = true)
 |-- L0_S0_F12: string (nullable = true)
 |-- L0_S0_F14: string (nullable = true)
 |-- L0_S0_F16: string (nullable = true)
 |-- L0_S0_F18: string (nullable = true)
 |-- L0_S0_F20: string (nullable = true)
 |-- L0_S0_F22: string (nullable = true)
 |-- L0_S1_F24: string (nullable = true)
 |-- L0_S1_F28: string (nullable = true)
 |-- L0_S2_F32: string (nullable = true)
 |-- L0_S2_F36: string (nullable = true)
 |-- L0_S2_F40: string (nullable = true)
 |-- L0_S2_F44: string (nullable = true)
 |-- L0_S2_F48: string (nullable = true)
 |-- L0_S2_F52: string (nullable = true)
 |-- L0_S2_F56: string (nullable = true)
 |-- L0_S2_F60: string (nullable = true)
 |-- L0_S2_F64: string (nullable = true)
 |-- L0_S3_F68: string 

In [11]:
# For numeric feature L0_S1_F24 (assumed to pair with categorical L0_S1_F25), count how often each
# observed value occurs among failed and passed parts.
# `Response` is read as a string: '1' = failed, '0' = passed.
# NOTE: the `_25` suffix of the result names is kept because later cells use these names.
f24_response_counts = (
    numeric.select('L0_S1_F24', 'Response')
    .filter(numeric['L0_S1_F24'].isNotNull())
    .groupBy('L0_S1_F24', 'Response')
    .count()
)

# Values of L0_S1_F24 ordered by how many failures they occur with.
fail_25 = (
    f24_response_counts
    .filter(f24_response_counts['Response'] == '1')
    .orderBy('count', ascending=False)
)

# Values of L0_S1_F24 ordered by how many passes they occur with. The Response filter is needed:
# ordering by count alone would rank pass rows and fail rows together.
pass_25 = (
    f24_response_counts
    .filter(f24_response_counts['Response'] == '0')
    .orderBy('count', ascending=False)
)

In [12]:
# Most frequent L0_S1_F24 value among failures, then among passes.
# Two statements rather than a tuple, so the cell does not also echo `(None, None)`.
fail_25.show(1)
pass_25.show(1)

+---------+--------+-----+
|L0_S1_F24|Response|count|
+---------+--------+-----+
|   -0.058|       1|   51|
+---------+--------+-----+
only showing top 1 row

+---------+--------+-----+
|L0_S1_F24|Response|count|
+---------+--------+-----+
|   -0.045|       0| 8449|
+---------+--------+-----+
only showing top 1 row



(None, None)

In [13]:
# Row-count query for each temp view (column counts come straight from the DataFrames).
query_cat = "SELECT COUNT(*) FROM category"
query_date = "SELECT COUNT(*) FROM date"
query_num = "SELECT COUNT(*) FROM numeric"

In [14]:
# Row count (via Spark SQL) followed by column count, for each table.
# A loop of plain statements replaces the `show(), print()` tuples, which also echoed `(None, None)`.
# NOTE: a later cell rebinds `query_num`; re-running this cell after it would show that query instead.
for count_query, spark_df in ((query_cat, category), (query_date, date), (query_num, numeric)):
    spark.sql(count_query).show()
    print(len(spark_df.columns))

+--------+
|count(1)|
+--------+
| 1183747|
+--------+

2141
+--------+
|count(1)|
+--------+
| 1183747|
+--------+

1157
+--------+
|count(1)|
+--------+
| 1183747|
+--------+

970


(None, None)

In [15]:
# Share of passing (Response = 0) and failing (Response = 1) parts.
# The denominator is the total of the per-group counts (a window over the aggregated rows)
# rather than the hard-coded row count 1183747, so the query stays correct for any input.
# (Rows with a null Response, if any, are excluded from both numerator and denominator.)
# NOTE: this rebinds `query_num`, replacing the row-count query defined earlier.
query_num = """
SELECT Response,
       COUNT(Response),
       ROUND(COUNT(Response) / SUM(COUNT(Response)) OVER (), 2) AS percent
FROM numeric
GROUP BY Response
"""

In [16]:
spark.sql(sqlQuery=query_num).show()  # pass/fail split of Response

+--------+---------------+-------+
|Response|count(Response)|percent|
+--------+---------------+-------+
|       0|        1176868|   0.99|
|       1|           6879|   0.01|
+--------+---------------+-------+



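#### Observation: 
So, what does this tell us so far?

1. Numeric feature L0_S1_F24 (the numeric counterpart assumed for categorical L0_S1_F25) with value -0.058 failed 51 times, the most failures of any of its values. (Worst condition)
2. The same feature with value -0.045 passed 8,449 times, the most passes of any of its values. (Best condition)
   These are raw counts, not failure rates, so a value can top a list simply because it occurs often.
3. Each dataframe has 1,183,747 rows, and many categories have no date or numeric values. There are therefore many missing values, which we will need to filter out.
4. To do this, we will need to combine the dataframes based on category, date and numeric values, and reject nulls.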

In [17]:
#Preview each table in pandas; only the first 5 rows are collected to the driver:
category.limit(num=5).toPandas()

Unnamed: 0,Id,L0_S1_F25,L0_S1_F27,L0_S1_F29,L0_S1_F31,L0_S2_F33,L0_S2_F35,L0_S2_F37,L0_S2_F39,L0_S2_F41,...,L3_S49_F4225,L3_S49_F4227,L3_S49_F4229,L3_S49_F4230,L3_S49_F4232,L3_S49_F4234,L3_S49_F4235,L3_S49_F4237,L3_S49_F4239,L3_S49_F4240
0,4,,,,,,,,,,...,,,,,,,,,,
1,6,,,,,,,,,,...,,,,,,,,,,
2,7,,,,,,,,,,...,,,,,,,,,,
3,9,,,,,,,,,,...,,,,,,,,,,
4,11,,,,,,,,,,...,,,,,,,,,,


In [18]:
numeric.limit(num=5).toPandas()  # first 5 rows of the numeric table

Unnamed: 0,Id,L0_S0_F0,L0_S0_F2,L0_S0_F4,L0_S0_F6,L0_S0_F8,L0_S0_F10,L0_S0_F12,L0_S0_F14,L0_S0_F16,...,L3_S50_F4245,L3_S50_F4247,L3_S50_F4249,L3_S50_F4251,L3_S50_F4253,L3_S51_F4256,L3_S51_F4258,L3_S51_F4260,L3_S51_F4262,Response
0,4,0.03,-0.034,-0.197,-0.179,0.118,0.116,-0.015,-0.032,0.02,...,,,,,,,,,,0
1,6,,,,,,,,,,...,,,,,,,,,,0
2,7,0.088,0.086,0.003,-0.052,0.161,0.025,-0.015,-0.072,-0.225,...,,,,,,,,,,0
3,9,-0.036,-0.064,0.294,0.33,0.074,0.161,0.022,0.128,-0.026,...,,,,,,,,,,0
4,11,-0.055,-0.086,0.294,0.33,0.118,0.025,0.03,0.168,-0.169,...,,,,,,,,,,0


In [19]:
date.limit(num=5).toPandas()  # first 5 rows of the date table

Unnamed: 0,Id,L0_S0_D1,L0_S0_D3,L0_S0_D5,L0_S0_D7,L0_S0_D9,L0_S0_D11,L0_S0_D13,L0_S0_D15,L0_S0_D17,...,L3_S50_D4246,L3_S50_D4248,L3_S50_D4250,L3_S50_D4252,L3_S50_D4254,L3_S51_D4255,L3_S51_D4257,L3_S51_D4259,L3_S51_D4261,L3_S51_D4263
0,4,82.24,82.24,82.24,82.24,82.24,82.24,82.24,82.24,82.24,...,,,,,,,,,,
1,6,,,,,,,,,,...,,,,,,,,,,
2,7,1618.7,1618.7,1618.7,1618.7,1618.7,1618.7,1618.7,1618.7,1618.7,...,,,,,,,,,,
3,9,1149.2,1149.2,1149.2,1149.2,1149.2,1149.2,1149.2,1149.2,1149.2,...,,,,,,,,,,
4,11,602.64,602.64,602.64,602.64,602.64,602.64,602.64,602.64,602.64,...,,,,,,,,,,


In [20]:
# Find number of assembly steps in each production line:
# bucket the categorical column names by their two-character line prefix.
# `Id` matches none of the prefixes and is left out.
# L0 = production line 1, L1 = line 2, L2 = line 3, L3 = line 4.
L0_cols, L1_cols, L2_cols, L3_cols = (
    [name for name in category.columns if name[:2] == prefix]
    for prefix in ("L0", "L1", "L2", "L3")
)

In [21]:
# Select each production line's categorical columns and report how many there are.
# (Line numbers are 1-based: df_l0 is line 1 ... df_l3 is line 4.)
df_l0, df_l1, df_l2, df_l3 = (
    category.select(line_cols) for line_cols in (L0_cols, L1_cols, L2_cols, L3_cols)
)

for label, line_df in (
    ("Columns in Line 1:", df_l0),
    ("Columns in Line 2:", df_l1),
    ("Columns in Line 3:", df_l2),
    ("Columns in Line 4:", df_l3),
):
    print(label, len(line_df.columns))

#So, line 2 seems to have the most combinations of stations and category features getting assembled.

Columns in Line 1: 323
Columns in Line 2: 1227
Columns in Line 3: 159
Columns in Line 4: 431


In [22]:
#Spark function:
#Yields each categorical column's distinct values and their counts, skipping columns that are entirely null.

def summarize_cat(df, limit=1000):
    """Yield a value-frequency table for every non-empty column of ``df`` except the first.

    Parameters
    ----------
    df : pyspark.sql.DataFrame
        Source table. Its first column is skipped (in this notebook that is ``Id``);
        every remaining column is summarized independently.
    limit : int, default 1000
        Maximum number of distinct values kept per column (the most frequent ones;
        the order among values with equal counts is not guaranteed).

    Yields
    ------
    pandas.DataFrame
        Two columns, ``<column name>`` and ``count``, sorted by ``count`` descending.
        Columns with no non-null values produce nothing.

    Notes
    -----
    One Spark job runs per column. The DataFrames in this notebook are not cached,
    so each job presumably re-reads the source CSV; that dominates the run time.
    """
    for col_name in df.schema.names[1:]:
        value_counts = (
            df.select(col_name)
            .dropna()
            .groupBy(col_name)
            .count()
            .orderBy('count', ascending=False)
            .limit(limit)
            .toPandas()
        )
        # toPandas already returns exactly [col_name, 'count']; no need to copy it into a new frame.
        if not value_counts.empty:
            yield value_counts
            

In [23]:
%%time
#Storing aggregated values as a list. 
#Collecting production line number (LSF)
#Components (T1, T2..Tn)
#Count of each sub-component (T = 25, T2 =35..Tn=z) 

production_line = []
components = []
for j in summarize_cat(category):
    production_line.append(j.columns[0])
    components.append(j.values[0:])



CPU times: user 19.6 s, sys: 3.58 s, total: 23.2 s
Wall time: 6h 47min 26s


In [24]:
#Build a frame with one row per non-empty categorical column: its name and its [value, count] array.
df_summary = pd.DataFrame(
    data={
        'production_line': production_line,
        'components': components,
    }
)

In [25]:
# One row per non-empty categorical column with its [value, count] pairs.
df_summary

Unnamed: 0,production_line,components
0,L0_S1_F25,"[[T1, 25]]"
1,L0_S1_F27,"[[T9, 25]]"
2,L0_S1_F29,"[[T1, 27]]"
3,L0_S1_F31,"[[T9, 25], [T24, 1], [T48, 1]]"
4,L0_S2_F33,"[[T1, 263]]"
...,...,...
1972,L3_S49_F4234,"[[T1, 33]]"
1973,L3_S49_F4235,"[[T16, 33], [T2, 7], [T4, 3]]"
1974,L3_S49_F4237,"[[T1, 43]]"
1975,L3_S49_F4239,"[[T1, 33]]"


In [139]:
#Number of distinct component values recorded for each categorical column
#(the number of [value, count] rows in its array):
df_summary['number_of_components'] = df_summary['components'].map(len)

In [140]:
# Summary frame now including number_of_components.
df_summary

Unnamed: 0,production_line,components,number_of_components
0,L0_S1_F25,"[[T1, 25]]",1
1,L0_S1_F27,"[[T9, 25]]",1
2,L0_S1_F29,"[[T1, 27]]",1
3,L0_S1_F31,"[[T9, 25], [T24, 1], [T48, 1]]",3
4,L0_S2_F33,"[[T1, 263]]",1
...,...,...,...
1972,L3_S49_F4234,"[[T1, 33]]",1
1973,L3_S49_F4235,"[[T16, 33], [T2, 7], [T4, 3]]",3
1974,L3_S49_F4237,"[[T1, 43]]",1
1975,L3_S49_F4239,"[[T1, 33]]",1


In [142]:
#Write the summary to CSV (the DataFrame index is written as an unnamed first column).
#NOTE(review): `components` holds NumPy arrays, which to_csv writes using their string repr,
#so that column will need parsing if the file is read back.
df_summary.to_csv(path_or_buf='components_summary.csv')

In [143]:
#Flatten the nested [value, count] arrays. The first explode yields one [value, count] row per
#component; the second splits that row, so values and counts alternate in `components`.
df_exp = (
    df_summary
    .set_index(['production_line', 'number_of_components'])
    .explode('components')
    .explode('components')
)

In [144]:
# Exploded frame: component values and their counts alternate row by row.
df_exp

Unnamed: 0_level_0,Unnamed: 1_level_0,components
production_line,number_of_components,Unnamed: 2_level_1
L0_S1_F25,1,T1
L0_S1_F25,1,25
L0_S1_F27,1,T9
L0_S1_F27,1,25
L0_S1_F29,1,T1
...,...,...
L3_S49_F4240,3,33
L3_S49_F4240,3,T2
L3_S49_F4240,3,7
L3_S49_F4240,3,T4


In [145]:
#Get components: keep the exploded entries that are component values (strings starting with 'T').
#The counts are not strings, so .str.startswith gives NaN for them; na=False treats those as non-matches.
#Assumes every categorical value starts with 'T' (true for the values shown above).
#NOTE: this rebinds `components` (the list built in In [23]); re-running In [24] after this cell
#would pick up this frame instead.
components = df_exp[df_exp['components'].str.startswith('T', na=False)]
components

Unnamed: 0_level_0,Unnamed: 1_level_0,components
production_line,number_of_components,Unnamed: 2_level_1
L0_S1_F25,1,T1
L0_S1_F27,1,T9
L0_S1_F29,1,T1
L0_S1_F31,3,T9
L0_S1_F31,3,T24
...,...,...
L3_S49_F4237,1,T1
L3_S49_F4239,1,T1
L3_S49_F4240,3,T16
L3_S49_F4240,3,T2


In [146]:
#Get times_component_used: the complementary entries (the counts), renamed accordingly.
#.astype(bool) guarantees a boolean mask, so `~` is a logical NOT.
times_component_used = (
    df_exp[~df_exp['components'].str.startswith('T', na=False).astype(bool)]
    .rename(columns={"components": "times_component_used"})
)
times_component_used

Unnamed: 0_level_0,Unnamed: 1_level_0,times_component_used
production_line,number_of_components,Unnamed: 2_level_1
L0_S1_F25,1,25
L0_S1_F27,1,25
L0_S1_F29,1,27
L0_S1_F31,3,25
L0_S1_F31,3,1
...,...,...
L3_S49_F4237,1,43
L3_S49_F4239,1,33
L3_S49_F4240,3,33
L3_S49_F4240,3,7


In [497]:
#Merge components and times_component_used side by side. Both come from the same exploded frame,
#where each value is immediately followed by its count, so their rows line up one-to-one.
#Their (production_line, number_of_components) index repeats for multi-component columns, so
#label-based alignment is ambiguous. Check that the indexes match, then align by position.
assert components.index.equals(times_component_used.index), "component and count rows are misaligned"
components_concat = pd.concat(
    [components.reset_index(), times_component_used.reset_index(drop=True)],
    axis=1,
)

In [498]:
components_concat

#Each row is one component value of one categorical column, with the number of parts that recorded it.
#Columns that are entirely null produced no summary, so they do not appear here
#(compare len(df_summary) with the 2140 feature columns of `category`, i.e. 2141 minus Id).
#Next, extract only the digits from production_line so they can be used to join with the numeric and date tables.

Unnamed: 0,production_line,number_of_components,components,times_component_used
0,L0_S1_F25,1,T1,25
1,L0_S1_F27,1,T9,25
2,L0_S1_F29,1,T1,27
3,L0_S1_F31,3,T9,25
4,L0_S1_F31,3,T24,1
...,...,...,...,...
4253,L3_S49_F4237,1,T1,43
4254,L3_S49_F4239,1,T1,33
4255,L3_S49_F4240,3,T16,33
4256,L3_S49_F4240,3,T2,7


In [499]:
#Category connection key: the digits of the column name, concatenated and stored as an integer,
#e.g. L0_S1_F25 -> '0125' -> 125 and L3_S49_F4237 -> '3494237' -> 3494237.
#Intended as a key to connect the category summary with the numeric and date tables.
#The previous pattern [^\d+] also kept '+' characters; \D+ removes every non-digit.
#NOTE(review): concatenation drops the leading 0 of line L0 and is not unique in general
#(L0_S1_F25 and a hypothetical L1_S2_F5 would both give 125). Confirm before joining on it.
components_concat['connection_key_category'] = (
    components_concat['production_line'].str.replace(r'\D+', '', regex=True).astype('int')
)

In [501]:
#Connection key with numeric: category F25 is assumed to pair with numeric F24, so subtract 1.
#NOTE(review): the schemas above show this pairing only holds for some features.
#Numeric L0_S1 has F24 and F28, so category L0_S1_F27 and L0_S1_F31 have no F-1 counterpart,
#and joins on this key will miss them.
components_concat['connection_key_numeric'] = components_concat['connection_key_category'].sub(1)

In [502]:
#Connection key with date: category F25 is assumed to have date stamp D26, so add 1.
#NOTE(review): date L0_S1 only has D26 and D30 in the schema above, so category L0_S1_F27 (D28)
#and L0_S1_F31 (D32) have no matching date column, and joins on this key will miss them.
components_concat['connection_key_date'] = components_concat['connection_key_category'].add(1)

In [503]:
components_concat

#connection_key_* columns are stored as integers and are intended as join keys
#for the summarized numeric and date tables.

Unnamed: 0,production_line,number_of_components,components,times_component_used,connection_key_category,connection_key_numeric,connection_key_date
0,L0_S1_F25,1,T1,25,125,124,126
1,L0_S1_F27,1,T9,25,127,126,128
2,L0_S1_F29,1,T1,27,129,128,130
3,L0_S1_F31,3,T9,25,131,130,132
4,L0_S1_F31,3,T24,1,131,130,132
...,...,...,...,...,...,...,...
4253,L3_S49_F4237,1,T1,43,3494237,3494236,3494238
4254,L3_S49_F4239,1,T1,33,3494239,3494238,3494240
4255,L3_S49_F4240,3,T16,33,3494240,3494239,3494241
4256,L3_S49_F4240,3,T2,7,3494240,3494239,3494241


In [504]:
#Save the component summary with its connection keys (the RangeIndex is written as the first, unnamed column):
components_concat.to_csv(path_or_buf='components_summary_with_keys.csv')

In [None]:
#Next, let's look at the numeric features in another notebook.
#Then we can come back here to build some visualizations.