In [1]:
import pandas
import findspark
findspark.init()


In [2]:
import pandas
import findspark
findspark.init()



from pyspark.sql.functions import *
from pyspark.sql.types import * 
from pyspark.ml.feature import StringIndexer , StandardScaler , Imputer, OneHotEncoder, VectorAssembler, ChiSqSelector, VarianceThresholdSelector
from pyspark.ml.classification import *
from pyspark.ml.evaluation import *
from pyspark.sql import functions as F 
from pyspark.ml import Pipeline,  PipelineModel

import pickle
import pyspark
from pyspark.sql import SparkSession

from pyspark import SparkContext 

spark = SparkSession.builder.getOrCreate()
spark

import json
from pyspark.sql.column import Column

sc = SparkContext.getOrCreate()


ImportError: cannot import name 'VarianceThresholdSelector' from 'pyspark.ml.feature' (D:\spark\spark-3.0.1-bin-hadoop2.7\spark-3.0.1-bin-hadoop2.7\python\pyspark\ml\feature.py)

Part1 - load dataframe

In [4]:
import pyspark
pyspark.__version__

'3.0.1'

In [6]:
#From Data Lake

def get_data_spark(hive_statement):
    """Imports data from PV Cluster - contact Asia_Data_Lab@manulife.com for access
	
	Args:
	
	1. hive_statement = select query from tables in PV cluster
    
    Returns: Pandas Dataframe
    """
    
    df_spark = hive.executeQuery(hive_statement)

    return df_spark 

In [7]:
# From CSV

def get_data_csv(path, csv_file):
    """Imports data from CSV file
    
    
    Args:
    
    1. path = file folder
    2. csv_file = name of csv file
    
    Returns:
    
    Spark Dataframe

    
    """
    df_spark = spark.read.csv(path+csv_file, inferSchema=True, header=True)
    
    return df_spark

In [124]:
path = "C:/Users/admin/AppData/Roaming/Microsoft/Windows/Start Menu/Programs/udemy/Datasets/"

df_spark = get_data_csv(path = "C:/Users/admin/AppData/Roaming/Microsoft/Windows/Start Menu/Programs/udemy/Datasets/",
                       csv_file  = 'Toddler Autism dataset July 2018_practice.csv')

df_spark.show(5)

+-------+--------+--------------+----+--------------+--------+-------------------+----------------------+-----------------+
|Case_No|Age_Mons|Qchat-10-Score| Sex|     Ethnicity|Jaundice|Family_mem_with_ASD|Who completed the test|Class/ASD Traits |
+-------+--------+--------------+----+--------------+--------+-------------------+----------------------+-----------------+
|      1|    null|             3|null|middle eastern|     yes|                 no|         family member|               No|
|      5|    null|             9|null|White European|      no|                yes|         family member|              Yes|
|     13|    null|             0|null|middle eastern|     yes|                 no|         family member|               No|
|     14|    null|             7|null|middle eastern|     yes|                 no|         family member|              Yes|
|     18|    null|             8|null|middle eastern|     yes|                 no|         family member|              Yes|
+-------

Part 2 Data Preparation

In [10]:
def data_preparation(df, remove_list):
    """Removes uncesscary columms and creates feature dataframe
    
    Args:
    
    df : spark dataframe input_data
    remove_list = list object containing columns to drop
    
    Returns: new dataframe
    """
    
    df = df.drop(*remove_list)
    
    return df

In [11]:
df_spark.printSchema()

root
 |-- Case_No: integer (nullable = true)
 |-- Age_Mons: integer (nullable = true)
 |-- Qchat-10-Score: integer (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Ethnicity: string (nullable = true)
 |-- Jaundice: string (nullable = true)
 |-- Family_mem_with_ASD: string (nullable = true)
 |-- Who completed the test: string (nullable = true)
 |-- Class/ASD Traits : string (nullable = true)



In [12]:
df_spark = data_preparation(df_spark, remove_list = ['Ethnicity', 'Who completed the test'])

df_spark.printSchema()

root
 |-- Case_No: integer (nullable = true)
 |-- Age_Mons: integer (nullable = true)
 |-- Qchat-10-Score: integer (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Jaundice: string (nullable = true)
 |-- Family_mem_with_ASD: string (nullable = true)
 |-- Class/ASD Traits : string (nullable = true)



Part 3 : Train Tesy split. Pyspark does not have stratified train and test split. Use this custom function instead

In [13]:


def stratified_split_train_test(df, frac, label, join_on, seed=123):
    """
    Creates  a stratified train test split for the dataset. Stratification is based on % of Y
    
    Args:
    
    1. df = input spark dataframe
    2. frac = fraction of training set to be used
    3. label = target variable
    4. join_on =   unique key to ensure uniqueness of train and test data
    
    Returns 
    
    df_train = df based on fraction
    df_test =  % remaining from fraction
    
    """
    
    fractions = df.select(label).distinct().withColumn("fraction", F.lit(frac)).rdd.collectAsMap()
    df_train = df.stat.sampleBy(label, fractions, seed)
    df_test = df.join(df_train, on=join_on, how="left_anti")
    
    
    return df_train, df_test

In [14]:
df_train, df_test = stratified_split_train_test(df_spark, frac= 0.7, label = 'Class/ASD Traits ' , join_on = 'Case_No')

In [15]:
df_spark.groupby('Class/ASD Traits ').count().show()

+-----------------+-----+
|Class/ASD Traits |count|
+-----------------+-----+
|               No|  326|
|              Yes|  728|
+-----------------+-----+



In [16]:
df_train.groupby('Class/ASD Traits ').count().show()

+-----------------+-----+
|Class/ASD Traits |count|
+-----------------+-----+
|               No|  213|
|              Yes|  514|
+-----------------+-----+



In [17]:
df_test.groupby('Class/ASD Traits ').count().show()

+-----------------+-----+
|Class/ASD Traits |count|
+-----------------+-----+
|               No|  113|
|              Yes|  214|
+-----------------+-----+



Part 4  : Feature Engineering

In [158]:
def add_meta(col, metadata):
            meta = sc._jvm.org.apache.spark.sql.types.Metadata.fromJson(json.dumps(metadata))
            return Column(getattr(col._jc, "as")('', meta))

def feature_engineering(x_train, x_test, unique_key, target_var , use_scaling = True):
    """
    Create Feature Engineering Pipeline 
    
    Apply encoding to categorical variables
    Imputation of null to both numeric and categorical variables
    
    
    Args : 
    
    1. x_train, x_test  = spark dataframe
    2. unique_key = unique key of dataset
    3. target_var = target variable
    4. use_scaling = if True, Standardize, otherwise use original scale.
    
    
    Returns:
    
    x_train,
    x_test,
    preprocessing_pipe
    
    """
    
    categorical_columns = [item[0] for item in x_train.dtypes if item[1].startswith('string')  and item[0] != unique_key and item[0] != target_var]
    numerical_columns = [item[0] for item in x_train.dtypes if not item[1].startswith('string') and item[0] != unique_key and item[0] != target_var]
 

    x_train = x_train.fillna(0, subset=numerical_columns)
    x_train = x_train.fillna("Missing" ,subset = categorical_columns)
    x_test  = x_test.fillna(0, subset=numerical_columns)
    x_test = x_test.fillna("Missing" ,subset = categorical_columns)
    
    
    #Define Steps in Pipelines: 
    
    indexer = StringIndexer(inputCols=categorical_columns, outputCols=[c+"_index" for c in categorical_columns])

    onehot  = OneHotEncoder(dropLast=True,
                        inputCols=[c for c in indexer.getOutputCols()],
                        outputCols=[c+"_onehot" for c in indexer.getOutputCols()])

    catvec  = VectorAssembler(inputCols=onehot.getOutputCols(), outputCol='cat_features')

    numvec   = VectorAssembler(inputCols=numerical_columns, outputCol='num_features')
    
    Standard = StandardScaler(inputCol="num_features", outputCol="num_features_scaled")

    
    #feature_vector_scaled = VectorAssembler(inputCols=["cat_features", "num_features_m"], outputCol="features")
    #feature_vector = VectorAssembler(inputCols=["cat_features", "num_features"], outputCol="features")
    
    
    #Define Pipelines 
    
    categorical_pipeline = Pipeline(stages=[indexer, onehot, catvec])
    

    if use_scaling:
        
            #Function to Get Metadata from StandardScaler

      
        numerical_pipeline = Pipeline(stages =[numvec, Standard])
        preprocessing_pipe = Pipeline(stages=[categorical_pipeline, numerical_pipeline])
        
        
        pipeline_preprocess = preprocessing_pipe.fit(x_train)
        
        #pipeline_preprocess.save("C:/Users")
        

        x_train = preprocessing_pipe.fit(x_train).transform(x_train)
        
    
        x_test = pipeline_preprocess.transform(x_test)
    
        #pickle.dump(preprocessing_pipe, open(trans_pipe, 'wb'))
        
        x_train =  x_train.withColumn('num_features_mt',
                                add_meta(x_train.num_features_scaled, 
                                metadata=x_train.schema["num_features"].metadata))
        x_test =   x_test.withColumn('num_features_mt',
                                add_meta(x_test.num_features_scaled, 
                                metadata=x_test.schema["num_features"].metadata))
        
        return x_train, x_test ,pipeline_preprocess
        
    else:
        
        numerical_pipeline = Pipeline(stages =[numvec])
        
        preprocessing_pipe = Pipeline(stages=[categorical_pipeline, numerical_pipeline])
    
    
        pipeline_preprocess = preprocessing_pipe.fit(x_train)
        
       # pipeline_preprocess.save("C:/Users")
    
        x_train = preprocessing_pipe.fit(x_train).transform(x_train)
    
        x_test = pipeline_preprocess.transform(x_test)
    
        #pickle.dump(preprocessing_pipe, open(trans_pipe, 'wb'))
    
        return x_train, x_test ,pipeline_preprocess

In [159]:
x_train, x_test, pipeline_preprocess =feature_engineering(df_train, df_test, unique_key = 'Case_No', target_var =  'Class/ASD Traits ', use_scaling = False)

In [160]:
x_train.groupby('Class/ASD Traits ').count().show()

+-----------------+-----+
|Class/ASD Traits |count|
+-----------------+-----+
|               No|  213|
|              Yes|  514|
+-----------------+-----+



In [161]:
x_test.groupby('Class/ASD Traits ').count().show()

+-----------------+-----+
|Class/ASD Traits |count|
+-----------------+-----+
|               No|  113|
|              Yes|  214|
+-----------------+-----+



In [162]:
#Final VectorAssembler 

def feature_vector(df_train, df_test,inputCols, outputCol):
    """VectorAssembler
    
    
    Args:
    
    df_train
    df_test
    inputCols = list of columns
    outputCols = new feature names
    
    Returns
    
    df_train, df_test 
    """
    
    feature_vector = VectorAssembler(inputCols=inputCols, outputCol=outputCol)
        
    x_train = feature_vector.transform(df_train)
    x_test = feature_vector.transform(df_test)
    
    
    return x_train, x_test
    

In [167]:
x_train, x_test = feature_vector(x_train, x_test, inputCols = ["cat_features", "num_features"], outputCol="features")

In [168]:
x_test.limit(1).toPandas()

Unnamed: 0,Case_No,Age_Mons,Qchat-10-Score,Sex,Jaundice,Family_mem_with_ASD,Class/ASD Traits,Sex_index,Jaundice_index,Family_mem_with_ASD_index,Sex_index_onehot,Jaundice_index_onehot,Family_mem_with_ASD_index_onehot,cat_features,num_features,features
0,13,0,0,Missing,yes,no,No,2.0,1.0,0.0,"(0.0, 0.0)",(0.0),(1.0),"(0.0, 0.0, 0.0, 1.0)","(0.0, 0.0)","(0.0, 0.0, 0.0, 1.0, 0.0, 0.0)"


In [169]:
x_train.limit(1).toPandas()

Unnamed: 0,Case_No,Age_Mons,Qchat-10-Score,Sex,Jaundice,Family_mem_with_ASD,Class/ASD Traits,Sex_index,Jaundice_index,Family_mem_with_ASD_index,Sex_index_onehot,Jaundice_index_onehot,Family_mem_with_ASD_index_onehot,cat_features,num_features,features
0,1,0,3,Missing,yes,no,No,2.0,1.0,0.0,"(0.0, 0.0)",(0.0),(1.0),"(0.0, 0.0, 0.0, 1.0)","[0.0, 3.0]","(0.0, 0.0, 0.0, 1.0, 0.0, 3.0)"


In [170]:
x_train.schema['features'].metadata

{'ml_attr': {'attrs': {'numeric': [{'idx': 4, 'name': 'num_features_Age_Mons'},
    {'idx': 5, 'name': 'num_features_Qchat-10-Score'}],
   'binary': [{'idx': 0, 'name': 'cat_features_Sex_index_onehot_m'},
    {'idx': 1, 'name': 'cat_features_Sex_index_onehot_f'},
    {'idx': 2, 'name': 'cat_features_Jaundice_index_onehot_no'},
    {'idx': 3, 'name': 'cat_features_Family_mem_with_ASD_index_onehot_no'}]},
  'num_attrs': 6}}

In [171]:
x_test.schema['features'].metadata

{'ml_attr': {'attrs': {'numeric': [{'idx': 4, 'name': 'num_features_Age_Mons'},
    {'idx': 5, 'name': 'num_features_Qchat-10-Score'}],
   'binary': [{'idx': 0, 'name': 'cat_features_Sex_index_onehot_m'},
    {'idx': 1, 'name': 'cat_features_Sex_index_onehot_f'},
    {'idx': 2, 'name': 'cat_features_Jaundice_index_onehot_no'},
    {'idx': 3, 'name': 'cat_features_Family_mem_with_ASD_index_onehot_no'}]},
  'num_attrs': 6}}

Part 5 : Feature Selection

Variance Treshold and ChiSq

In [172]:
selector = VarianceThresholdSelector(varianceThreshold=8.0, outputCol="selectedFeatures")

result = selector.fit(x_train).transform(x_train)

NameError: name 'VarianceThresholdSelector' is not defined