#### 1. Create the Spark Environment  

In [1]:
import os
import sys
os.environ["SPARK_HOME"] = "/usr/hdp/current/spark2-client"
os.environ["PYLIB"] = os.environ["SPARK_HOME"] + "/python/lib"
sys.path.insert(0, os.environ["PYLIB"] + "/py4j-0.10.6-src.zip")
sys.path.insert(0, os.environ["PYLIB"] + "/pyspark.zip")

#### 2. Load the required Libraries for Spark Context and Spark Session (2M)

In [2]:
from pyspark.conf import SparkConf
from pyspark import SparkContext
from pyspark.sql import SparkSession

#### 3. Create the Spark Context and Spark Session (2M)

In [3]:
conf = SparkConf().setAppName("CensusIncome").setMaster('local[*]').set("spark.executor.memory","40g")
sc = SparkContext(conf=conf)  # RDD sufficient
spark = SparkSession(sc)  #Dataframes

In [4]:
spark

#### 4. Load the libraries for schema definition in Pyspark (2M)

In [5]:
from pyspark.sql.types import *
from pyspark.sql.functions import *


#### Problem Statement:
* This data was extracted from the census bureau database.The task is to classify the records based on the income field.Incomes have been binned at the 50K level to present a binary classification problem.The instance_weight attribute should not be used in the classifier. All the other attributes and their description are givn below.

#### Description of the Attributes
* **age**: continuous.
* **class of worker**: Not in universe, Federal government, Local government, Never worked, Private, Self-employed- incorporated, Self-employed-not incorporated, State government, Without pay.
* **detailed industry recode**: 0, 40, 44, 2, 43, 47, 48, 1, 11, 19, 24, 25, 32, 33, 34, 35, 36, 37, 38, 39, 4, 42, 45, 5, 15, 16, 22, 29, 31, 50, 14, 17, 18, 28, 3, 30, 41, 46, 51, 12, 13, 21, 23, 26, 6, 7, 9, 49, 27, 8, 10, 20.
* **detailed occupation recode** : 0, 12, 31, 44, 19, 32, 10, 23, 26, 28, 29, 42, 40, 34, 14, 36, 38, 2, 20, 25, 37, 41, 27, 24, 30, 43, 33, 16, 45, 17, 35, 22, 18, 39, 3, 15, 13, 46, 8, 21, 9, 4, 6, 5, 1, 11, 7.
* **education**: Children, 7th and 8th grade, 9th grade, 10th grade, High school graduate, 11th grade, 12th grade no diploma, 5th or 6th grade, Less than 1st grade, Bachelors degree(BA AB BS), 1st 2nd 3rd or 4th grade, Some college but no degree, Masters degree(MA MS MEng MEd MSW MBA), Associates degree-occup /vocational, Associates degree-academic program, Doctorate degree(PhD EdD), Prof school degree (MD DDS DVM LLB JD).
* **wage per hour**: continuous.
* **enroll in edu inst last wk**: Not in universe, High school, College or university.
* **marital stat**: Never married, Married-civilian spouse present, Married-spouse absent, Separated, Divorced, Widowed, Married-A F spouse present.
* **major industry code**: Not in universe or children, Entertainment, Social services, Agriculture, Education, Public administration, Manufacturing-durable goods, Manufacturing-nondurable goods, Wholesale trade, Retail trade, Finance insurance and real estate, Private household services, Business and repair services, Personal services except private HH, Construction, Medical except hospital, Other professional services, Transportation, Utilities and sanitary services, Mining, Communications, Hospital services, Forestry and fisheries, Armed Forces.
* **major occupation code**: Not in universe, Professional specialty, Other service, Farming forestry and fishing, Sales, Adm support including clerical, Protective services, Handlers equip cleaners etc , Precision production craft & repair, Technicians and related support, Machine operators assmblrs & inspctrs, Transportation and material moving, Executive admin and managerial, Private household services, Armed Forces.
* **race**: White, Black, Other, Amer Indian Aleut or Eskimo, Asian or Pacific Islander.
* **hispanic origin**: Mexican (Mexicano), Mexican-American, Puerto Rican, Central or South American, All other, Other Spanish, Chicano, Cuban, Do not know, NA.
* **sex**: Female, Male.
* **member of a labor union**: Not in universe, No, Yes.
* **reason for unemployment**: Not in universe, Re-entrant, Job loser - on layoff, New entrant, Job leaver, Other job loser.
* **Full or part time employment stat**: Children or Armed Forces, Full-time schedules, Unemployed part- time, Not in labor force, Unemployed full-time, PT for non-econ reasons usually FT, PT for econ reasons usually PT, PT for econ reasons usually FT.
* **capital gains**: continuous.
* **capital losses**: continuous.
* **dividends from stocks**: continuous.
* **tax filer stat**: Nonfiler, Joint one under 65 & one 65+, Joint both under 65, Single, Head of household, Joint both 65+.
* **region of previous residence**: Not in universe, South, Northeast, West, Midwest, Abroad.
* **state of previous residence**: Not in universe, Utah, Michigan, North Carolina, North Dakota, Virginia, Vermont, Wyoming, West Virginia, Pennsylvania, Abroad, Oregon, California, Iowa, Florida, Arkansas, Texas, South Carolina, Arizona, Indiana, Tennessee, Maine, Alaska, Ohio, Montana, Nebraska, Mississippi, District of Columbia, Minnesota, Illinois, Kentucky, Delaware, Colorado, Maryland, Wisconsin, New Hampshire, Nevada, New York, Georgia, Oklahoma, New Mexico, South Dakota, Missouri, Kansas, Connecticut, Louisiana, Alabama, Massachusetts, Idaho, New Jersey.
* **detailed household and family stat**: Child <18 never marr not in subfamily, Other Rel <18 never marr child of subfamily RP, Other Rel <18 never marr not in subfamily, Grandchild <18 never marr child of subfamily RP, Grandchild <18 never marr not in subfamily, Secondary individual, In group quarters, Child under 18 of RP of unrel subfamily, RP of unrelated subfamily, Spouse of householder, Householder, Other Rel <18 never married RP of subfamily, Grandchild <18 never marr RP of subfamily, Child <18 never marr RP of subfamily, Child <18 ever marr not in subfamily, Other Rel <18 ever marr RP of subfamily, Child <18 ever marr RP of subfamily, Nonfamily householder, Child <18 spouse of subfamily RP, Other Rel <18 spouse of subfamily RP, Other Rel <18 ever marr not in subfamily, Grandchild <18 ever marr not in subfamily, Child 18+ never marr Not in a subfamily, Grandchild 18+ never marr not in subfamily, Child 18+ ever marr RP of subfamily, Other Rel 18+ never marr not in subfamily, Child 18+ never marr RP of subfamily, Other Rel 18+ ever marr RP of subfamily, Other Rel 18+ never marr RP of subfamily, Other Rel 18+ spouse of subfamily RP, Other Rel 18+ ever marr not in subfamily, Child 18+ ever marr Not in a subfamily, Grandchild 18+ ever marr not in subfamily, Child 18+ spouse of subfamily RP, Spouse of RP of unrelated subfamily, Grandchild 18+ ever marr RP of subfamily, Grandchild 18+ never marr RP of subfamily, Grandchild 18+ spouse of subfamily RP.
* **detailed household summary in household**: Child under 18 never married, Other relative of householder, Nonrelative of householder, Spouse of householder, Householder, Child under 18 ever married, Group Quarters- Secondary individual, Child 18 or older.
| instance weight: ignore.
* **instance weight**: continuous.
* **migration code-change in msa**: Not in universe, Nonmover, MSA to MSA, NonMSA to nonMSA, MSA to nonMSA, NonMSA to MSA, Abroad to MSA, Not identifiable, Abroad to nonMSA.
* **migration code-change in reg**: Not in universe, Nonmover, Same county, Different county same state, Different state same division, Abroad, Different region, Different division same region.
* **migration code-move within reg**: Not in universe, Nonmover, Same county, Different county same state, Different state in West, Abroad, Different state in Midwest, Different state in South, Different state in Northeast.
* **live in this house 1 year ago**: Not in universe under 1 year old, Yes, No.
* **migration prev res in sunbelt**: Not in universe, Yes, No.
* **num persons worked for employer**: continuous.
* **family members under 18**: Both parents present, Neither parent present, Mother only present, Father only present, Not in universe.
* **country of birth father**: Mexico, United-States, Puerto-Rico, Dominican-Republic, Jamaica, Cuba, Portugal, Nicaragua, Peru, Ecuador, Guatemala, Philippines, Canada, Columbia, El-Salvador, Japan, England, Trinadad&Tobago, Honduras, Germany, Taiwan, Outlying-U S (Guam USVI etc), India, Vietnam, China, Hong Kong, Cambodia, France, Laos, Haiti, South Korea, Iran, Greece, Italy, Poland, Thailand, Yugoslavia, Holand-Netherlands, Ireland, Scotland, Hungary, Panama.
* **country of birth mother**: India, Mexico, United-States, Puerto-Rico, Dominican-Republic, England, Honduras, Peru, Guatemala, Columbia, El-Salvador, Philippines, France, Ecuador, Nicaragua, Cuba, Outlying-U S (Guam USVI etc), Jamaica, South Korea, China, Germany, Yugoslavia, Canada, Vietnam, Japan, Cambodia, Ireland, Laos, Haiti, Portugal, Taiwan, Holand-Netherlands, Greece, Italy, Poland, Thailand, Trinadad&Tobago, Hungary, Panama, Hong Kong, Scotland, Iran.
* **country of birth self**: United-States, Mexico, Puerto-Rico, Peru, Canada, South Korea, India, Japan, Haiti, El-Salvador, Dominican-Republic, Portugal, Columbia, England, Thailand, Cuba, Laos, Panama, China, Germany, Vietnam, Italy, Honduras, Outlying-U S (Guam USVI etc), Hungary, Philippines, Poland, Ecuador, Iran, Guatemala, Holand-Netherlands, Taiwan, Nicaragua, France, Jamaica, Scotland, Yugoslavia, Hong Kong, Trinadad&Tobago, Greece, Cambodia, Ireland.
* **citizenship**: Native- Born in the United States, Foreign born- Not a citizen of U S , Native- Born in Puerto Rico or U S Outlying, Native- Born abroad of American Parent(s), Foreign born- U S citizen by naturalization.
* **own business or self employed**: 0, 2, 1.
* **fill inc questionnaire for veteran's admin**: Not in universe, Yes, No.
* **veterans benefits**: 0, 2, 1.
* **weeks worked in year**: continuous.
* **year**: 94, 95.

* **Income** : -50000 and 50000

#### 5. Define the schema from the description above (4M)

In [6]:
incomeDataSchema = StructType([
    StructField("age", IntegerType(), True),
    StructField("class_of_worker", StringType(), True),
    StructField("industry_code",StringType(), True),
    StructField("occupation_code", StringType(), True),
    StructField("education", StringType(), True),
    StructField("wage_per_hour", IntegerType(), True),
    StructField("enrol_eduinlat_wk", StringType(), True),
    StructField("marital_satus", StringType(), True),        
    StructField("major_industry_code", StringType(), True),
    StructField("major_occ_code", StringType(), True),
    StructField("race", StringType(), True),
    StructField("hispanic_origin", StringType(), True),
    StructField("sex", StringType(), True),
    StructField("mem_of_lab_union", StringType(), True),
    StructField("reason_for_unemployment", StringType(), True),
    StructField("FT_or_PT", StringType(), True),
    StructField("CAPGAIN", IntegerType(), True),
    StructField("CAPLOSS", IntegerType(), True),
    StructField("dividend_from_stocks", IntegerType(), True),
    StructField("tax_filer_stat", StringType(), True),
    StructField("region_previous_residence", StringType(), True),
    StructField("state_previous_residence", StringType(), True),
    StructField("houshold_family_stat", StringType(), True),
    StructField("household_summary_in_household", StringType(), True),
    StructField("instance_weight", FloatType(), True),
    StructField("migration_codechange_msa", StringType(), True),
    StructField("migration_codechange_reg", StringType(), True),
    StructField("migration_codemove_reg", StringType(), True),
    StructField("live_1yr_ago", StringType(), True),
    StructField("migration_prev_res_sunbelt", StringType(), True),
    StructField("num_persons_worked_for_employer",IntegerType(), True),
    StructField("members_under_18", StringType(), True),
    StructField("country_of_birth_father", StringType(), True),
    StructField("country_of_birth_mother", StringType(), True),
    StructField("country_of_birth_self", StringType(), True),
    StructField("citizenship", StringType(), True),
    StructField("ownbusinees_selfemp", StringType(), True),
    StructField("veterans_admin", StringType(), True),
    StructField("veterans_benefit", StringType(), True),
    StructField("weeks_worked_in_year",IntegerType(), True),
    StructField("year", StringType(), True),
    StructField("income", StringType(), True)
    ])

#### 6. Read the Data from the CSV File (3M)

In [7]:
data = spark.read.csv(path = "file:///home/mahidharv/CensusIncome/Cutedataset/traindata/*",sep=",",ignoreLeadingWhiteSpace=True,ignoreTrailingWhiteSpace=True,schema=incomeDataSchema,nullValue='?')
                     

#### 7.  Read/View Four rows (2M)

In [8]:
data.show(4,truncate=False)

+---+------------------------------+-------------+---------------+--------------------------+-------------+-----------------+-------------------------------+---------------------------------+-------------------------------------+-----+---------------+------+----------------+-----------------------+------------------------+-------+-------+--------------------+-------------------+-------------------------+------------------------+---------------------+------------------------------+---------------+------------------------+------------------------+----------------------+--------------------------------+--------------------------+-------------------------------+----------------+-----------------------+-----------------------+---------------------+---------------------------------+-------------------+---------------+----------------+--------------------+----+--------+
|age|class_of_worker               |industry_code|occupation_code|education                 |wage_per_hour|enrol_eduinlat_wk

#### 8. Inspect the data types of the Columns (2M)

In [9]:
data.printSchema()

root
 |-- age: integer (nullable = true)
 |-- class_of_worker: string (nullable = true)
 |-- industry_code: string (nullable = true)
 |-- occupation_code: string (nullable = true)
 |-- education: string (nullable = true)
 |-- wage_per_hour: integer (nullable = true)
 |-- enrol_eduinlat_wk: string (nullable = true)
 |-- marital_satus: string (nullable = true)
 |-- major_industry_code: string (nullable = true)
 |-- major_occ_code: string (nullable = true)
 |-- race: string (nullable = true)
 |-- hispanic_origin: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- mem_of_lab_union: string (nullable = true)
 |-- reason_for_unemployment: string (nullable = true)
 |-- FT_or_PT: string (nullable = true)
 |-- CAPGAIN: integer (nullable = true)
 |-- CAPLOSS: integer (nullable = true)
 |-- dividend_from_stocks: integer (nullable = true)
 |-- tax_filer_stat: string (nullable = true)
 |-- region_previous_residence: string (nullable = true)
 |-- state_previous_residence: string (nullab

#### 9. Find the Total rows and Columns in the Dataset(2M)

In [10]:
print("No. of Columns = {}".format(len(data.columns)))

print('No. of Records = {}'.format(data.count()))

No. of Columns = 42
No. of Records = 99579


#### 10. Find the summary Statistics for the numerical attributes (2M)

In [11]:
data.select('age','wage_per_hour','CAPGAIN',"CAPLOSS").describe().show()

+-------+------------------+-----------------+------------------+------------------+
|summary|               age|    wage_per_hour|           CAPGAIN|           CAPLOSS|
+-------+------------------+-----------------+------------------+------------------+
|  count|             99579|            99579|             99579|             99579|
|   mean| 34.56644473232308|55.66638548288294|443.29279265708635|37.530402996615756|
| stddev|22.332816121498283|274.8698275934865| 4758.023857997538| 272.5512452587624|
|    min|                 0|                0|                 0|                 0|
|    max|                90|             9800|             99999|              4608|
+-------+------------------+-----------------+------------------+------------------+



#### 11. Find the missing Values in each Column (2M)

In [12]:
from pyspark.sql.functions import *
coulmns_with_na_percentage = data.select([round(((count(when(isnan(c) | col(c).isNull(), c))/data.count())*100),2).alias(c) for c in data.columns])

In [13]:
coulmns_with_na_percentage.show()

+---+---------------+-------------+---------------+---------+-------------+-----------------+-------------+-------------------+--------------+----+---------------+---+----------------+-----------------------+--------+-------+-------+--------------------+--------------+-------------------------+------------------------+--------------------+------------------------------+---------------+------------------------+------------------------+----------------------+------------+--------------------------+-------------------------------+----------------+-----------------------+-----------------------+---------------------+-----------+-------------------+--------------+----------------+--------------------+----+------+
|age|class_of_worker|industry_code|occupation_code|education|wage_per_hour|enrol_eduinlat_wk|marital_satus|major_industry_code|major_occ_code|race|hispanic_origin|sex|mem_of_lab_union|reason_for_unemployment|FT_or_PT|CAPGAIN|CAPLOSS|dividend_from_stocks|tax_filer_stat|region_previo

#### 12. Drop the Columns that have missing values more than 20% (3M)

In [14]:
data = data.drop("migration_codechange_msa","migration_codechange_reg","migration_codemove_reg","migration_prev_res_sunbelt")

In [15]:
data.columns

['age',
 'class_of_worker',
 'industry_code',
 'occupation_code',
 'education',
 'wage_per_hour',
 'enrol_eduinlat_wk',
 'marital_satus',
 'major_industry_code',
 'major_occ_code',
 'race',
 'hispanic_origin',
 'sex',
 'mem_of_lab_union',
 'reason_for_unemployment',
 'FT_or_PT',
 'CAPGAIN',
 'CAPLOSS',
 'dividend_from_stocks',
 'tax_filer_stat',
 'region_previous_residence',
 'state_previous_residence',
 'houshold_family_stat',
 'household_summary_in_household',
 'instance_weight',
 'live_1yr_ago',
 'num_persons_worked_for_employer',
 'members_under_18',
 'country_of_birth_father',
 'country_of_birth_mother',
 'country_of_birth_self',
 'citizenship',
 'ownbusinees_selfemp',
 'veterans_admin',
 'veterans_benefit',
 'weeks_worked_in_year',
 'year',
 'income']

#### 13. Drop the rows with NA values and work on the remaining dataset(2M)

In [16]:
data = data.na.drop()

In [17]:
data.count()

95193

#### 14. The distribution of income class on education(2M)

In [18]:
data.crosstab(col1='income',col2='education').show()

+----------------+----------+----------+---------------------+------------------------+----------------+-----------------+---------+----------------------------------+-----------------------------------+--------------------------+--------+-------------------------+--------------------+-------------------+--------------------------------------+--------------------------------------+--------------------------+
|income_education|10th grade|11th grade|12th grade no diploma|1st 2nd 3rd or 4th grade|5th or 6th grade|7th and 8th grade|9th grade|Associates degree-academic program|Associates degree-occup /vocational|Bachelors degree(BA AB BS)|Children|Doctorate degree(PhD EdD)|High school graduate|Less than 1st grade|Masters degree(MA MS MEng MEd MSW MBA)|Prof school degree (MD DDS DVM LLB JD)|Some college but no degree|
+----------------+----------+----------+---------------------+------------------------+----------------+-----------------+---------+----------------------------------+---------

#### 15. Find the Correlation between  Columns (2M)

In [19]:
from pyspark.sql.functions import corr
data.select(corr('weeks_worked_in_year','num_persons_worked_for_employer')).show()


+-----------------------------------------------------------+
|corr(weeks_worked_in_year, num_persons_worked_for_employer)|
+-----------------------------------------------------------+
|                                         0.7495145872631154|
+-----------------------------------------------------------+



#### 16. Define the schema dict from data type of the Data Frame (2M)

In [20]:
schema_dict = dict(data.dtypes)

#### 17. Write code to Seperate the columns in Categorical and Numerical attributes (Not Manual)(2M)

In [21]:
num_Var_Names=[]
cat_Var_Names=[]
schema_dict = dict(data.dtypes)
iterator = [key for key in schema_dict.keys() if key!='income' ]
for col in iterator:
    if schema_dict[col]=='int':
        
            num_Var_Names.append(col)
    else:
        cat_Var_Names.append(col)

In [22]:
num_Var_Names

['dividend_from_stocks',
 'weeks_worked_in_year',
 'num_persons_worked_for_employer',
 'CAPLOSS',
 'CAPGAIN',
 'wage_per_hour',
 'age']

In [23]:
cat_Var_Names

['country_of_birth_self',
 'tax_filer_stat',
 'marital_satus',
 'instance_weight',
 'sex',
 'country_of_birth_father',
 'year',
 'citizenship',
 'education',
 'country_of_birth_mother',
 'enrol_eduinlat_wk',
 'members_under_18',
 'mem_of_lab_union',
 'veterans_admin',
 'occupation_code',
 'industry_code',
 'region_previous_residence',
 'hispanic_origin',
 'state_previous_residence',
 'FT_or_PT',
 'ownbusinees_selfemp',
 'reason_for_unemployment',
 'live_1yr_ago',
 'major_occ_code',
 'major_industry_code',
 'houshold_family_stat',
 'veterans_benefit',
 'class_of_worker',
 'race',
 'household_summary_in_household']

#### 18. Split the data into train and test (2M)

In [24]:
(trainingData, testData) = data.randomSplit([0.7, 0.3])

#### 19. Cache the train and validation data sets and unpersist data (2M)¶

In [25]:
trainingData.cache()
testData.cache()
data.unpersist()

DataFrame[age: int, class_of_worker: string, industry_code: string, occupation_code: string, education: string, wage_per_hour: int, enrol_eduinlat_wk: string, marital_satus: string, major_industry_code: string, major_occ_code: string, race: string, hispanic_origin: string, sex: string, mem_of_lab_union: string, reason_for_unemployment: string, FT_or_PT: string, CAPGAIN: int, CAPLOSS: int, dividend_from_stocks: int, tax_filer_stat: string, region_previous_residence: string, state_previous_residence: string, houshold_family_stat: string, household_summary_in_household: string, instance_weight: float, live_1yr_ago: string, num_persons_worked_for_employer: int, members_under_18: string, country_of_birth_father: string, country_of_birth_mother: string, country_of_birth_self: string, citizenship: string, ownbusinees_selfemp: string, veterans_admin: string, veterans_benefit: string, weeks_worked_in_year: int, year: string, income: string]

#### 20. Check for the Class balance in the train and test data set (2M)

In [26]:
trainingData.groupby('income').count().show()
testData.groupby('income').count().show()

+--------+-----+
|  income|count|
+--------+-----+
| 50000+.| 4065|
|- 50000.|62529|
+--------+-----+

+--------+-----+
|  income|count|
+--------+-----+
| 50000+.| 1721|
|- 50000.|26878|
+--------+-----+



#### 21.  Perform the required feature Preprocessing (10M)

In [27]:
from pyspark.ml.feature import VectorAssembler

assembler_Num = VectorAssembler(inputCols=num_Var_Names, outputCol="num_features")

In [28]:
from pyspark.ml.feature import MinMaxScaler

min_Max_Scalar = MinMaxScaler(inputCol="num_features", outputCol="scaled_num_features")

In [29]:
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler

indexers_Cat = [StringIndexer(inputCol=cat_Var_Name, outputCol="{0}_index".format(cat_Var_Name),handleInvalid='skip') for cat_Var_Name in cat_Var_Names ]
encoders_Cat = [OneHotEncoder(inputCol=indexer.getOutputCol(), outputCol="{0}_vec".format(indexer.getInputCol())) for indexer in indexers_Cat]
assembler_Cat = VectorAssembler(inputCols=[encoder.getOutputCol() for encoder in encoders_Cat], outputCol="cat_features")

assembler = VectorAssembler(inputCols=["scaled_num_features","cat_features"], outputCol="features")

In [30]:
indexer_Label = StringIndexer(inputCol="income", outputCol="label")

In [31]:
preprocessiong_Stages = [assembler_Num]+[min_Max_Scalar]+indexers_Cat+encoders_Cat+[assembler_Cat]+[assembler]+[indexer_Label]

#### 22. Create the Logistic regression Model(5M)

In [32]:
from pyspark.ml.classification import LogisticRegression

lr = LogisticRegression(maxIter=10, labelCol="label", featuresCol="features")

In [33]:
import gc
gc.collect()

247

In [34]:
from pyspark.ml import Pipeline

lr_Pipeline = Pipeline(stages=preprocessiong_Stages+[lr]) 

lr_Pipeline_model = lr_Pipeline.fit(trainingData)

In [35]:
lr_Summary = lr_Pipeline_model.stages[-1].summary
objectiveHistory = lr_Summary.objectiveHistory
print("objectiveHistory:")
for objective in objectiveHistory:
    print(objective)

objectiveHistory:
0.229823778247
0.15131133058
0.0967792860514
0.0846682297285
0.0761826914562
0.0549582595968
0.0372787516928
0.0314933561042
0.0285251447918
0.0264393914284
0.0251569738356


In [36]:
train_predictions_lr = lr_Pipeline_model.transform(trainingData)
test_predictions_lr = lr_Pipeline_model.transform(testData)

In [37]:
test_predictions_lr.show(2)

+---+---------------+-------------+---------------+---------+-------------+-----------------+-------------+--------------------+---------------+--------------------+---------------+----+----------------+-----------------------+--------------------+-------+-------+--------------------+--------------+-------------------------+------------------------+--------------------+------------------------------+---------------+--------------------+-------------------------------+--------------------+-----------------------+-----------------------+---------------------+--------------------+-------------------+---------------+----------------+--------------------+----+--------+------------+--------------------+---------------------------+--------------------+-------------------+---------------------+---------+-----------------------------+----------+-----------------+---------------+-----------------------------+-----------------------+----------------------+----------------------+------------------

#### 23. What are train and validation(test) accuracies(2M)

In [38]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")

predictionAndLabels_train_lr = train_predictions_lr.select("prediction", "label")
train_accuracy_lr = evaluator.evaluate(predictionAndLabels_train_lr)

print("Train accuracy  = " + str(train_accuracy_lr))

Train accuracy  = 0.990119229961


In [39]:
predictionAndLabels_test_lr = test_predictions_lr.select("prediction", "label")

In [40]:
test_accuracy_lr = evaluator.evaluate(predictionAndLabels_test_lr)

In [41]:
print("Test accuracy = " + str(test_accuracy_lr))

Test accuracy = 0.8987675374


In [42]:
gc.collect()

505

#### 24. Perform the necessary tuning methods(2M)

In [43]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

paramGrid = ParamGridBuilder()\
    .addGrid(lr.regParam, [0.1]) \
    .addGrid(lr.elasticNetParam, [0.2])\
    .build()
    
lr_crossval = CrossValidator(estimator=lr_Pipeline,
                             estimatorParamMaps=paramGrid,
                             evaluator=evaluator,
                             numFolds=2)     

In [44]:
lr_crossval_Model = lr_crossval.fit(trainingData)

In [45]:
train_predictions_lrcv = lr_crossval_Model.transform(trainingData)
test_predictions_lrcv = lr_crossval_Model.transform(testData)

In [46]:
predictionAndLabels_train_lrcv = train_predictions_lrcv.select("prediction", "label")
train_accuracycv = evaluator.evaluate(predictionAndLabels_train_lrcv)
print("Train set accuracy  = " + str(train_accuracycv))

predictionAndLabels_test_lrcv = test_predictions_lrcv.select("prediction", "label")
test_accuracycv = evaluator.evaluate(predictionAndLabels_test_lrcv)
print("Test set accuracy = " + str(test_accuracycv))

Train set accuracy  = 0.940580232453
Test set accuracy = 0.919541120843
