In [1]:
# Shell escape: print the directory the notebook kernel is running in.
!pwd

/home/2305B47/BigData_cute


#### 1. Create the Spark Environment  (2M)

In [2]:
import glob
import os
import sys

# Point the kernel at the cluster's Spark 2 client install and its bundled
# Python libraries so that `import pyspark` works from a plain Python kernel.
os.environ["SPARK_HOME"] = "/usr/hdp/current/spark2-client"
os.environ["PYLIB"] = os.environ["SPARK_HOME"] + "/python/lib"

# The py4j bundle's file name carries its version, which changes with every
# Spark upgrade. Discover whichever bundle is actually installed and only fall
# back to the previously hard-coded 0.10.4 name when none is found.
py4j_candidates = sorted(glob.glob(os.environ["PYLIB"] + "/py4j-*-src.zip"))
py4j_zip = (py4j_candidates[-1] if py4j_candidates
            else os.environ["PYLIB"] + "/py4j-0.10.4-src.zip")

# Insert at the front of sys.path so these archives win over any other copy.
sys.path.insert(0, py4j_zip)
sys.path.insert(0, os.environ["PYLIB"] + "/pyspark.zip")

#### 2. Load the required Libraries for Spark Context and Spark Session (2M)

In [3]:
from pyspark.conf import SparkConf
from pyspark import SparkContext
from pyspark.sql import SparkSession

#### 3. Create the Spark Context and Spark Session (2M)

In [4]:
# Build (or reuse) a Hive-enabled session running in local mode on all
# available cores, with the SQL warehouse directory placed on HDFS.
spark = (
    SparkSession.builder
    .appName("SparkML")
    .master('local[*]')
    .enableHiveSupport()
    .config('spark.sql.warehouse.dir', 'hdfs://bigdata:8020/user/chaithanyas/spark-warehouse')
    .getOrCreate()
)

# Handle to the session's underlying SparkContext.
sc = spark.sparkContext

#### 4. Load the libraries for schema definition in Pyspark (2M)

In [5]:
from pyspark.sql.types import *
from pyspark.sql.functions import *

from pyspark.sql.functions import isnan, when, count, col, countDistinct

#### Problem Statement:
* This data was extracted from the census bureau database. The task is to classify the records based on the income field. Incomes have been binned at the 50K level to present a binary classification problem. The instance_weight attribute should not be used in the classifier. All the other attributes and their descriptions are given below.

#### Description of the Attributes
* **age**: continuous.
* **class of worker**: Not in universe, Federal government, Local government, Never worked, Private, Self-employed- incorporated, Self-employed-not incorporated, State government, Without pay.
* **detailed industry recode**: 0, 40, 44, 2, 43, 47, 48, 1, 11, 19, 24, 25, 32, 33, 34, 35, 36, 37, 38, 39, 4, 42, 45, 5, 15, 16, 22, 29, 31, 50, 14, 17, 18, 28, 3, 30, 41, 46, 51, 12, 13, 21, 23, 26, 6, 7, 9, 49, 27, 8, 10, 20.
* **detailed occupation recode** : 0, 12, 31, 44, 19, 32, 10, 23, 26, 28, 29, 42, 40, 34, 14, 36, 38, 2, 20, 25, 37, 41, 27, 24, 30, 43, 33, 16, 45, 17, 35, 22, 18, 39, 3, 15, 13, 46, 8, 21, 9, 4, 6, 5, 1, 11, 7.
* **education**: Children, 7th and 8th grade, 9th grade, 10th grade, High school graduate, 11th grade, 12th grade no diploma, 5th or 6th grade, Less than 1st grade, Bachelors degree(BA AB BS), 1st 2nd 3rd or 4th grade, Some college but no degree, Masters degree(MA MS MEng MEd MSW MBA), Associates degree-occup /vocational, Associates degree-academic program, Doctorate degree(PhD EdD), Prof school degree (MD DDS DVM LLB JD).
* **wage per hour**: continuous.
* **enroll in edu inst last wk**: Not in universe, High school, College or university.
* **marital stat**: Never married, Married-civilian spouse present, Married-spouse absent, Separated, Divorced, Widowed, Married-A F spouse present.
* **major industry code**: Not in universe or children, Entertainment, Social services, Agriculture, Education, Public administration, Manufacturing-durable goods, Manufacturing-nondurable goods, Wholesale trade, Retail trade, Finance insurance and real estate, Private household services, Business and repair services, Personal services except private HH, Construction, Medical except hospital, Other professional services, Transportation, Utilities and sanitary services, Mining, Communications, Hospital services, Forestry and fisheries, Armed Forces.
* **major occupation code**: Not in universe, Professional specialty, Other service, Farming forestry and fishing, Sales, Adm support including clerical, Protective services, Handlers equip cleaners etc , Precision production craft & repair, Technicians and related support, Machine operators assmblrs & inspctrs, Transportation and material moving, Executive admin and managerial, Private household services, Armed Forces.
* **race**: White, Black, Other, Amer Indian Aleut or Eskimo, Asian or Pacific Islander.
* **hispanic origin**: Mexican (Mexicano), Mexican-American, Puerto Rican, Central or South American, All other, Other Spanish, Chicano, Cuban, Do not know, NA.
* **sex**: Female, Male.
* **member of a labor union**: Not in universe, No, Yes.
* **reason for unemployment**: Not in universe, Re-entrant, Job loser - on layoff, New entrant, Job leaver, Other job loser.
* **Full or part time employment stat**: Children or Armed Forces, Full-time schedules, Unemployed part- time, Not in labor force, Unemployed full-time, PT for non-econ reasons usually FT, PT for econ reasons usually PT, PT for econ reasons usually FT.
* **capital gains**: continuous.
* **capital losses**: continuous.
* **dividends from stocks**: continuous.
* **tax filer stat**: Nonfiler, Joint one under 65 & one 65+, Joint both under 65, Single, Head of household, Joint both 65+.
* **region of previous residence**: Not in universe, South, Northeast, West, Midwest, Abroad.
* **state of previous residence**: Not in universe, Utah, Michigan, North Carolina, North Dakota, Virginia, Vermont, Wyoming, West Virginia, Pennsylvania, Abroad, Oregon, California, Iowa, Florida, Arkansas, Texas, South Carolina, Arizona, Indiana, Tennessee, Maine, Alaska, Ohio, Montana, Nebraska, Mississippi, District of Columbia, Minnesota, Illinois, Kentucky, Delaware, Colorado, Maryland, Wisconsin, New Hampshire, Nevada, New York, Georgia, Oklahoma, New Mexico, South Dakota, Missouri, Kansas, Connecticut, Louisiana, Alabama, Massachusetts, Idaho, New Jersey.
* **detailed household and family stat**: Child <18 never marr not in subfamily, Other Rel <18 never marr child of subfamily RP, Other Rel <18 never marr not in subfamily, Grandchild <18 never marr child of subfamily RP, Grandchild <18 never marr not in subfamily, Secondary individual, In group quarters, Child under 18 of RP of unrel subfamily, RP of unrelated subfamily, Spouse of householder, Householder, Other Rel <18 never married RP of subfamily, Grandchild <18 never marr RP of subfamily, Child <18 never marr RP of subfamily, Child <18 ever marr not in subfamily, Other Rel <18 ever marr RP of subfamily, Child <18 ever marr RP of subfamily, Nonfamily householder, Child <18 spouse of subfamily RP, Other Rel <18 spouse of subfamily RP, Other Rel <18 ever marr not in subfamily, Grandchild <18 ever marr not in subfamily, Child 18+ never marr Not in a subfamily, Grandchild 18+ never marr not in subfamily, Child 18+ ever marr RP of subfamily, Other Rel 18+ never marr not in subfamily, Child 18+ never marr RP of subfamily, Other Rel 18+ ever marr RP of subfamily, Other Rel 18+ never marr RP of subfamily, Other Rel 18+ spouse of subfamily RP, Other Rel 18+ ever marr not in subfamily, Child 18+ ever marr Not in a subfamily, Grandchild 18+ ever marr not in subfamily, Child 18+ spouse of subfamily RP, Spouse of RP of unrelated subfamily, Grandchild 18+ ever marr RP of subfamily, Grandchild 18+ never marr RP of subfamily, Grandchild 18+ spouse of subfamily RP.
* **detailed household summary in household**: Child under 18 never married, Other relative of householder, Nonrelative of householder, Spouse of householder, Householder, Child under 18 ever married, Group Quarters- Secondary individual, Child 18 or older.
* *Note: the instance weight attribute listed next must be ignored when building the classifier.*
* **instance weight**: continuous.
* **migration code-change in msa**: Not in universe, Nonmover, MSA to MSA, NonMSA to nonMSA, MSA to nonMSA, NonMSA to MSA, Abroad to MSA, Not identifiable, Abroad to nonMSA.
* **migration code-change in reg**: Not in universe, Nonmover, Same county, Different county same state, Different state same division, Abroad, Different region, Different division same region.
* **migration code-move within reg**: Not in universe, Nonmover, Same county, Different county same state, Different state in West, Abroad, Different state in Midwest, Different state in South, Different state in Northeast.
* **live in this house 1 year ago**: Not in universe under 1 year old, Yes, No.
* **migration prev res in sunbelt**: Not in universe, Yes, No.
* **num persons worked for employer**: continuous.
* **family members under 18**: Both parents present, Neither parent present, Mother only present, Father only present, Not in universe.
* **country of birth father**: Mexico, United-States, Puerto-Rico, Dominican-Republic, Jamaica, Cuba, Portugal, Nicaragua, Peru, Ecuador, Guatemala, Philippines, Canada, Columbia, El-Salvador, Japan, England, Trinadad&Tobago, Honduras, Germany, Taiwan, Outlying-U S (Guam USVI etc), India, Vietnam, China, Hong Kong, Cambodia, France, Laos, Haiti, South Korea, Iran, Greece, Italy, Poland, Thailand, Yugoslavia, Holand-Netherlands, Ireland, Scotland, Hungary, Panama.
* **country of birth mother**: India, Mexico, United-States, Puerto-Rico, Dominican-Republic, England, Honduras, Peru, Guatemala, Columbia, El-Salvador, Philippines, France, Ecuador, Nicaragua, Cuba, Outlying-U S (Guam USVI etc), Jamaica, South Korea, China, Germany, Yugoslavia, Canada, Vietnam, Japan, Cambodia, Ireland, Laos, Haiti, Portugal, Taiwan, Holand-Netherlands, Greece, Italy, Poland, Thailand, Trinadad&Tobago, Hungary, Panama, Hong Kong, Scotland, Iran.
* **country of birth self**: United-States, Mexico, Puerto-Rico, Peru, Canada, South Korea, India, Japan, Haiti, El-Salvador, Dominican-Republic, Portugal, Columbia, England, Thailand, Cuba, Laos, Panama, China, Germany, Vietnam, Italy, Honduras, Outlying-U S (Guam USVI etc), Hungary, Philippines, Poland, Ecuador, Iran, Guatemala, Holand-Netherlands, Taiwan, Nicaragua, France, Jamaica, Scotland, Yugoslavia, Hong Kong, Trinadad&Tobago, Greece, Cambodia, Ireland.
* **citizenship**: Native- Born in the United States, Foreign born- Not a citizen of U S , Native- Born in Puerto Rico or U S Outlying, Native- Born abroad of American Parent(s), Foreign born- U S citizen by naturalization.
* **own business or self employed**: 0, 2, 1.
* **fill inc questionnaire for veteran's admin**: Not in universe, Yes, No.
* **veterans benefits**: 0, 2, 1.
* **weeks worked in year**: continuous.
* **year**: 94, 95.

* **Income** (target): `- 50000.` (income below 50K) and `50000+.` (income of 50K or more).

#### 5. Define the schema from the description above (4M)

In [6]:
# Explicit schema for the headerless census CSV. Fields are listed in file
# order; every column is declared nullable. Column names are shortened,
# underscore-separated versions of the attribute names described above.
censusDataSchema = (
    StructType()
    .add("age", IntegerType(), True)
    .add("class_of_worker", StringType(), True)
    .add("detailed_industry_recode", IntegerType(), True)
    .add("detailed_occupation_recode", IntegerType(), True)
    .add("education", StringType(), True)
    .add("wage_per_hour", DoubleType(), True)
    .add("enroll_inst", StringType(), True)
    .add("marital_stat", StringType(), True)
    .add("industry_code", StringType(), True)
    .add("occupation_code", StringType(), True)
    .add("race", StringType(), True)
    .add("hispanic_origin", StringType(), True)
    .add("sex", StringType(), True)
    .add("labor_union", StringType(), True)
    .add("reason_unemployment", StringType(), True)
    .add("employment_stat", StringType(), True)
    .add("capital_gains", DoubleType(), True)
    .add("capital_losses", DoubleType(), True)
    .add("dividends", DoubleType(), True)
    .add("tax_filer_stat", StringType(), True)
    .add("region_previous_residence", StringType(), True)
    .add("state_previous_residence", StringType(), True)
    .add("detailed_family_stat", StringType(), True)
    .add("detailed_house_summary", StringType(), True)
    .add("instance_weight", DoubleType(), True)
    .add("migration_code_msa", StringType(), True)
    .add("migration_code_in_reg", StringType(), True)
    .add("migration_code_within_reg", StringType(), True)
    .add("1_year_ago", StringType(), True)
    .add("prev_res_sunbelt", StringType(), True)
    .add("num_persons_worked", IntegerType(), True)
    .add("family_under_18", StringType(), True)
    .add("country_birth_father", StringType(), True)
    .add("country_birth_mother", StringType(), True)
    .add("country_birth_self", StringType(), True)
    .add("citizenship", StringType(), True)
    # The next three are coded values (0/1/2 or yes/no) kept as strings so
    # they are treated as categorical rather than numeric.
    .add("business_type", StringType(), True)
    .add("fill_questionnaire", StringType(), True)
    .add("veterans_benefits", StringType(), True)
    .add("weeks_worked", IntegerType(), True)
    .add("year", StringType(), True)
    # Target column.
    .add("Income", StringType(), True)
)

#### 6. Read the Data from the CSV File (3M)

In [7]:
# Load the training CSV with the explicit census schema; the file has no
# header row, so column names come from the schema alone.
data = (
    spark.read
    .schema(censusDataSchema)
    .csv('/user/2305B47/BigDataCute/trainData.csv', header=False)
)

#### 7.  Read/View Four rows (2M)

In [8]:
# Preview the first four rows as a formatted table.
# The earlier `data.head(4)` call was removed: it was not the cell's last
# expression, so its result was never displayed and it only triggered an
# extra Spark job.
data.show(4)

+---+--------------------+------------------------+--------------------------+--------------------+-------------+---------------+--------------------+--------------------+--------------------+-----+---------------+------+---------------+-------------------+--------------------+-------------+--------------+---------+-------------------+-------------------------+------------------------+--------------------+----------------------+---------------+------------------+---------------------+-------------------------+--------------------+----------------+------------------+---------------+--------------------+--------------------+------------------+--------------------+-------------+------------------+-----------------+------------+----+--------+
|age|     class_of_worker|detailed_industry_recode|detailed_occupation_recode|           education|wage_per_hour|    enroll_inst|        marital_stat|       industry_code|     occupation_code| race|hispanic_origin|   sex|    labor_union|reason_unemplo

#### 8. Inspect the data types of the Columns (2M)

In [9]:
# Display the (column name, Spark type name) pair for every column.
data.dtypes

[('age', 'int'),
 ('class_of_worker', 'string'),
 ('detailed_industry_recode', 'int'),
 ('detailed_occupation_recode', 'int'),
 ('education', 'string'),
 ('wage_per_hour', 'double'),
 ('enroll_inst', 'string'),
 ('marital_stat', 'string'),
 ('industry_code', 'string'),
 ('occupation_code', 'string'),
 ('race', 'string'),
 ('hispanic_origin', 'string'),
 ('sex', 'string'),
 ('labor_union', 'string'),
 ('reason_unemployment', 'string'),
 ('employment_stat', 'string'),
 ('capital_gains', 'double'),
 ('capital_losses', 'double'),
 ('dividends', 'double'),
 ('tax_filer_stat', 'string'),
 ('region_previous_residence', 'string'),
 ('state_previous_residence', 'string'),
 ('detailed_family_stat', 'string'),
 ('detailed_house_summary', 'string'),
 ('instance_weight', 'double'),
 ('migration_code_msa', 'string'),
 ('migration_code_in_reg', 'string'),
 ('migration_code_within_reg', 'string'),
 ('1_year_ago', 'string'),
 ('prev_res_sunbelt', 'string'),
 ('num_persons_worked', 'int'),
 ('family_und

#### 9. Find the Total rows and Columns in the Dataset(2M)

In [10]:
# Report the dataset's dimensions: the row total needs a Spark count action,
# the column total is read straight from the schema.
print('No. of Rows = %s' % data.count())
print("No. of Columns = %s" % len(data.columns))

No. of Rows = 99579
No. of Columns = 42


#### 10. Find the summary Statistics for the numerical attributes (2M)

In [11]:
def variableSplitting(data):
    """Split a DataFrame's columns into numerical and categorical names.

    A column is treated as categorical when its dtype string is 'string';
    every other dtype is treated as numerical.

    Parameters
    ----------
    data : object exposing ``dtypes`` as a list of (column name, dtype) pairs,
        e.g. a Spark DataFrame.

    Returns
    -------
    (NumVar, CatVar) : tuple of two lists of column names, each in the order
        the columns appear in ``data.dtypes``.
    """
    # Fresh lists on every call: the previous version appended to module-level
    # lists, so calling the function twice duplicated every column name.
    NumVar = []
    CatVar = []
    for column_name, dtype_name in data.dtypes:
        if dtype_name == 'string':
            CatVar.append(column_name)
        else:
            NumVar.append(column_name)

    return NumVar, CatVar
# Classify the loaded columns, then print the numerical names followed by
# the categorical names.
NumVar, CatVar = variableSplitting(data)
for column_group in (NumVar, CatVar):
    print(column_group)

['age', 'detailed_industry_recode', 'detailed_occupation_recode', 'wage_per_hour', 'capital_gains', 'capital_losses', 'dividends', 'instance_weight', 'num_persons_worked', 'weeks_worked']
['class_of_worker', 'education', 'enroll_inst', 'marital_stat', 'industry_code', 'occupation_code', 'race', 'hispanic_origin', 'sex', 'labor_union', 'reason_unemployment', 'employment_stat', 'tax_filer_stat', 'region_previous_residence', 'state_previous_residence', 'detailed_family_stat', 'detailed_house_summary', 'migration_code_msa', 'migration_code_in_reg', 'migration_code_within_reg', '1_year_ago', 'prev_res_sunbelt', 'family_under_18', 'country_birth_father', 'country_birth_mother', 'country_birth_self', 'citizenship', 'business_type', 'fill_questionnaire', 'veterans_benefits', 'year', 'Income']


In [12]:
# Summary statistics restricted to the numerical columns.
numeric_summary = data.select(NumVar).describe()
numeric_summary.show()

+-------+------------------+------------------------+--------------------------+-----------------+------------------+------------------+------------------+------------------+------------------+------------------+
|summary|               age|detailed_industry_recode|detailed_occupation_recode|    wage_per_hour|     capital_gains|    capital_losses|         dividends|   instance_weight|num_persons_worked|      weeks_worked|
+-------+------------------+------------------------+--------------------------+-----------------+------------------+------------------+------------------+------------------+------------------+------------------+
|  count|             99579|                   99579|                     99579|            99579|             99579|             99579|             99579|             99579|             99579|             99579|
|   mean| 34.56644473232308|      15.372909950893261|        11.260958635856959|55.66638548288294|443.29279265708635|37.530402996615756| 197.0633065

#### 11. Find the missing Values in each Column (2M)

In [13]:
# For each column, count the rows whose value is NaN or null. Each aggregate
# is aliased back to the column's own name so the result is a single row of
# per-column missing counts.
missing_count_exprs = []
for column_name in data.columns:
    is_missing = isnan(column_name) | col(column_name).isNull()
    missing_count_exprs.append(count(when(is_missing, column_name)).alias(column_name))

data.select(missing_count_exprs).show()

+---+---------------+------------------------+--------------------------+---------+-------------+-----------+------------+-------------+---------------+----+---------------+---+-----------+-------------------+---------------+-------------+--------------+---------+--------------+-------------------------+------------------------+--------------------+----------------------+---------------+------------------+---------------------+-------------------------+----------+----------------+------------------+---------------+--------------------+--------------------+------------------+-----------+-------------+------------------+-----------------+------------+----+------+
|age|class_of_worker|detailed_industry_recode|detailed_occupation_recode|education|wage_per_hour|enroll_inst|marital_stat|industry_code|occupation_code|race|hispanic_origin|sex|labor_union|reason_unemployment|employment_stat|capital_gains|capital_losses|dividends|tax_filer_stat|region_previous_residence|state_previous_residence|

#### 12. Drop the Columns that have missing values more than 20% (3M)

In [14]:
from pyspark.sql.functions import col, count, when

# Drop every column in which more than 20% of the rows are null.
# The row total and all per-column null counts are computed once each (one
# count action plus a single aggregation) instead of re-running
# `data.count()` and a separate filter job for every column.
missing_threshold = 0.20
total_rows = data.count()

null_counts = data.select(
    [count(when(col(c).isNull(), c)).alias(c) for c in data.columns]
).first().asDict()

# Same rule as before: strictly more than 20% nulls means the column goes.
sparse_columns = [c for c in data.columns
                  if null_counts[c] > total_rows * missing_threshold]

for c in sparse_columns:
    data = data.drop(c)

#### 13. Drop the rows with NA values and work on the remaining dataset(2M)

In [15]:
# Drop every row that still contains an NA value in any column, then repeat
# the per-column missing-value tally to confirm that none remain.
data = data.dropna()

remaining_missing_exprs = [
    count(when(isnan(name) | col(name).isNull(), name)).alias(name)
    for name in data.columns
]
data.select(remaining_missing_exprs).show()

+---+---------------+------------------------+--------------------------+---------+-------------+-----------+------------+-------------+---------------+----+---------------+---+-----------+-------------------+---------------+-------------+--------------+---------+--------------+-------------------------+------------------------+--------------------+----------------------+---------------+----------+------------------+---------------+--------------------+--------------------+------------------+-----------+-------------+------------------+-----------------+------------+----+------+
|age|class_of_worker|detailed_industry_recode|detailed_occupation_recode|education|wage_per_hour|enroll_inst|marital_stat|industry_code|occupation_code|race|hispanic_origin|sex|labor_union|reason_unemployment|employment_stat|capital_gains|capital_losses|dividends|tax_filer_stat|region_previous_residence|state_previous_residence|detailed_family_stat|detailed_house_summary|instance_weight|1_year_ago|num_persons_w

In [16]:
# The problem statement says instance_weight must not be used by the
# classifier, so remove it before any feature engineering.
data = data.drop("instance_weight")

#### 14. The distribution of income class on education(2M)

In [17]:
# Count records for every (education, Income) pair and list them sorted by
# education level.
income_by_education = data.groupBy("education", "Income").count()
income_by_education.orderBy("education").show()

+--------------------+--------+-----+
|           education|  Income|count|
+--------------------+--------+-----+
|          10th grade|- 50000.| 3554|
|          10th grade| 50000+.|   33|
|          11th grade| 50000+.|   28|
|          11th grade|- 50000.| 3299|
|12th grade no dip...|- 50000.|  961|
|12th grade no dip...| 50000+.|   16|
|1st 2nd 3rd or 4t...| 50000+.|    5|
|1st 2nd 3rd or 4t...|- 50000.|  864|
|    5th or 6th grade|- 50000.| 1528|
|    5th or 6th grade| 50000+.|    9|
|   7th and 8th grade| 50000+.|   36|
|   7th and 8th grade|- 50000.| 3710|
|           9th grade| 50000+.|   17|
|           9th grade|- 50000.| 2997|
|Associates degree...| 50000+.|  211|
|Associates degree...|- 50000.| 1887|
|Associates degree...|- 50000.| 2379|
|Associates degree...| 50000+.|  205|
|Bachelors degree(...| 50000+.| 1849|
|Bachelors degree(...|- 50000.| 7440|
+--------------------+--------+-----+
only showing top 20 rows



#### 15. Find the Correlation between  Columns : "weeks worked in year" and "no. of persons worked for employer" (2M)

In [18]:
# Correlation coefficient between weeks worked in the year and the number of
# persons who worked for the employer (the cell's displayed result).
data.stat.corr(col1='weeks_worked', col2='num_persons_worked')

0.7495145872631152

#### 16. Define the schema dict from data type of the Data Frame (2M)

In [19]:
# Map each remaining column name to its Spark type name.
data_dict = {column_name: dtype_name for column_name, dtype_name in data.dtypes}
print(data_dict)



{'employment_stat': 'string', 'tax_filer_stat': 'string', 'capital_gains': 'double', 'country_birth_father': 'string', 'detailed_house_summary': 'string', 'Income': 'string', 'sex': 'string', 'year': 'string', 'education': 'string', 'labor_union': 'string', 'citizenship': 'string', 'marital_stat': 'string', 'weeks_worked': 'int', '1_year_ago': 'string', 'occupation_code': 'string', 'industry_code': 'string', 'region_previous_residence': 'string', 'num_persons_worked': 'int', 'detailed_occupation_recode': 'int', 'country_birth_mother': 'string', 'fill_questionnaire': 'string', 'detailed_industry_recode': 'int', 'hispanic_origin': 'string', 'state_previous_residence': 'string', 'capital_losses': 'double', 'dividends': 'double', 'veterans_benefits': 'string', 'country_birth_self': 'string', 'business_type': 'string', 'enroll_inst': 'string', 'wage_per_hour': 'double', 'age': 'int', 'class_of_worker': 'string', 'race': 'string', 'reason_unemployment': 'string', 'detailed_family_stat': 'str

#### 17. Write code to separate the columns into categorical and numerical attributes using the datatypes dictionary (not manual) (2M)

In [20]:
# Partition the column names by dtype using the schema dictionary:
# 'string' columns are categorical, everything else is numerical.
# `dict.items()` is used instead of `iteritems()` so the cell runs on both
# Python 2 and Python 3 kernels (`iteritems` exists only on Python 2).
Cat_Var = []
Num_Var = []
for key, value in data_dict.items():
    if value == 'string':
        Cat_Var.append(key)
    else:
        Num_Var.append(key)

#### 18. Split the data into train and test (2M)

In [21]:
# 70/30 train/test split. A fixed seed makes the split repeatable, so that a
# "Restart & Run All" reproduces the same row counts and accuracies; without
# it every run produced a different partition.
(trainingData, testData) = data.randomSplit([0.7, 0.3], seed=42)
print(trainingData.count())
print(testData.count())

66536
28657


#### 19. Cache the train and validation data sets and unpersist data (2M)

In [22]:
# Mark both splits for caching: they are reused by every fit/transform below.
trainingData.cache()
testData.cache()
# Release any cached copy of the full dataset. NOTE(review): no earlier visible
# cell caches `data`, so this is presumably a no-op here; as the last
# expression it also makes the cell display the DataFrame's schema repr.
data.unpersist()

DataFrame[age: int, class_of_worker: string, detailed_industry_recode: int, detailed_occupation_recode: int, education: string, wage_per_hour: double, enroll_inst: string, marital_stat: string, industry_code: string, occupation_code: string, race: string, hispanic_origin: string, sex: string, labor_union: string, reason_unemployment: string, employment_stat: string, capital_gains: double, capital_losses: double, dividends: double, tax_filer_stat: string, region_previous_residence: string, state_previous_residence: string, detailed_family_stat: string, detailed_house_summary: string, 1_year_ago: string, num_persons_worked: int, family_under_18: string, country_birth_father: string, country_birth_mother: string, country_birth_self: string, citizenship: string, business_type: string, fill_questionnaire: string, veterans_benefits: string, weeks_worked: int, year: string, Income: string]

#### 20. Check for the Class balance in the train and test data set (2M)

In [23]:
# Class balance of the target in each split. Columns are referenced by name:
# the previous version grouped `testData` by `trainingData.Income`, i.e. a
# column object belonging to the other DataFrame.
trainingData.groupBy("Income").count().show()

testData.groupBy("Income").count().show()

+--------+-----+
|  Income|count|
+--------+-----+
| 50000+.| 4081|
|- 50000.|62455|
+--------+-----+

+--------+-----+
|  Income|count|
+--------+-----+
| 50000+.| 1705|
|- 50000.|26952|
+--------+-----+



#### 21.  Perform the required feature Preprocessing  and create the pipeline for the flow: (10M)

In [24]:
# "Income" is the label, not a feature, so keep it out of the categorical
# feature list. Filtering (rather than `list.remove`) keeps the cell
# idempotent: re-running it no longer raises ValueError once the name is gone.
Cat_Var = [name for name in Cat_Var if name != "Income"]

In [25]:
from pyspark.ml.feature import MinMaxScaler, VectorAssembler

# Numerical branch of the preprocessing: pack all numerical columns into one
# vector column, then rescale that vector with a min-max scaler.
assembler_Num = VectorAssembler(outputCol="num_features", inputCols=Num_Var)
min_Max_Scalar = MinMaxScaler(outputCol="scaled_num_features", inputCol="num_features")

In [26]:
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler

# Categorical branch: for every categorical column build a StringIndexer
# (<col> -> <col>_index, with handleInvalid='keep') and a OneHotEncoder
# (<col>_index -> <col>_vec).
indexers_Cat = []
encoders_Cat = []
for cat_Var_Name in Cat_Var:
    index_column = "{0}_index".format(cat_Var_Name)
    vector_column = "{0}_vec".format(cat_Var_Name)
    indexers_Cat.append(
        StringIndexer(inputCol=cat_Var_Name, handleInvalid='keep', outputCol=index_column)
    )
    encoders_Cat.append(OneHotEncoder(inputCol=index_column, outputCol=vector_column))

# Concatenate all one-hot vectors into a single categorical feature vector.
one_hot_columns = [encoder.getOutputCol() for encoder in encoders_Cat]
assembler_Cat = VectorAssembler(inputCols=one_hot_columns, outputCol="cat_features")

# Final feature vector: scaled numerical features followed by categorical ones.
assembler = VectorAssembler(outputCol="features",
                            inputCols=["scaled_num_features", "cat_features"])

In [27]:
# Encode the string target "Income" as the numeric "label" column the
# classifier is configured to read.
indexer_Label = StringIndexer(outputCol="label", inputCol="Income")

In [28]:
# Ordered preprocessing stages: numerical assemble + scale, categorical
# index + encode + assemble, final feature assembly, then label indexing.
preprocessiong_Stages = (
    [assembler_Num, min_Max_Scalar]
    + indexers_Cat
    + encoders_Cat
    + [assembler_Cat, assembler, indexer_Label]
)

In [29]:
# Force a garbage-collection pass in the Python driver process before model
# fitting; the displayed value is the number returned by gc.collect().
import gc
gc.collect()

32

#### 22. Create the Logistic regression Model(5M)

In [30]:
from pyspark.ml.classification import LogisticRegression

# Logistic regression over the assembled "features" vector, predicting
# "label", capped at 10 optimizer iterations.
lr = LogisticRegression(featuresCol="features", labelCol="label", maxIter=10)

In [31]:
from pyspark.ml import Pipeline

# Full pipeline = all preprocessing stages followed by the classifier.
lr_stage_list = preprocessiong_Stages + [lr]
lr_Pipeline = Pipeline(stages=lr_stage_list)

# Fit every stage, in order, on the training split only.
lr_Pipeline_model = lr_Pipeline.fit(trainingData)

* Print the objective history.

In [32]:
# The fitted LogisticRegression model is the last stage of the pipeline
# model; its training summary exposes the objective value per iteration.
lr_Summary = lr_Pipeline_model.stages[-1].summary
objectiveHistory = lr_Summary.objectiveHistory

print("objectiveHistory:")
for objective_value in objectiveHistory:
    print(objective_value)

objectiveHistory:
0.230625722838
0.19104693132
0.150907815599
0.145387620438
0.140426120397
0.137673920945
0.135326729591
0.132719428186
0.12892305362
0.126120743824
0.125326579264


#### 23. Answer the following questions (2M)

* What is the train accuracy ?

In [33]:
# Score both splits with the fitted pipeline (preprocessing + classifier).
train_predictions_lr, test_predictions_lr = [
    lr_Pipeline_model.transform(split) for split in (trainingData, testData)
]

In [34]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Score the training split with the fitted pipeline.
train_predictions_lr = lr_Pipeline_model.transform(trainingData)

# Accuracy evaluator comparing "prediction" against "label"; later cells
# reuse this same `evaluator` object.
evaluator = MulticlassClassificationEvaluator(metricName="accuracy",
                                              labelCol="label",
                                              predictionCol="prediction")

predictionAndLabels_train_lr = train_predictions_lr.select("prediction", "label")
train_accuracy_lr = evaluator.evaluate(predictionAndLabels_train_lr)

print("Train accuracy  = {}".format(train_accuracy_lr))

Train accuracy  = 0.951800529037


* What is test accuracy?

In [35]:
# Score the held-out split and measure accuracy with the shared evaluator.
test_predictions_lr = lr_Pipeline_model.transform(testData)
predictionAndLabels_test_lr = test_predictions_lr.select("prediction", "label")
test_accuracy_lr = evaluator.evaluate(predictionAndLabels_test_lr)

print("Test accuracy = {}".format(test_accuracy_lr))

Test accuracy = 0.951983808494


* What are your observations? Write your observations in markdown format.

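There were some NA values in the data, which were removed during preprocessing (columns with more than 20% missing values were dropped in step 12, and the remaining rows containing NA values in step 13). Train accuracy (0.9518) and test accuracy (0.9520) are almost identical, so the model shows no sign of overfitting. However, the classes are heavily imbalanced: about 94% of the training records belong to the `- 50000.` class (62455 of 66536), so this accuracy is only slightly above what always predicting the majority class would achieve and should be interpreted with care.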

In [36]:
# Another driver-side garbage-collection pass before cross-validation.
gc.collect()

774

#### 24. Perform the necessary tuning methods(2M)

In [37]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
paramGrid = ParamGridBuilder()\
    .addGrid(lr.regParam, [0.1]) \
    .addGrid(lr.elasticNetParam, [0.5])\
    .build()
    
lr_crossval = CrossValidator(estimator=lr_Pipeline,
                             estimatorParamMaps=paramGrid,
                             evaluator=evaluator,
                             numFolds=2)   

In [38]:
# Run cross-validation, and choose the best set of parameters.
# Fit the cross-validator on the training split only; the test split is not
# passed to this call.
lr_crossval_Model = lr_crossval.fit(trainingData)

In [39]:
# Score both splits with the cross-validated model.
train_predictions_lrcv, test_predictions_lrcv = [
    lr_crossval_Model.transform(split) for split in (trainingData, testData)
]

In [40]:
# Accuracy of the cross-validated model on the training split ...
predictionAndLabels_train_lrcv = train_predictions_lrcv.select("prediction", "label")
train_accuracycv = evaluator.evaluate(predictionAndLabels_train_lrcv)
print("Train set accuracy  = {}".format(train_accuracycv))

# ... and on the held-out test split.
predictionAndLabels_test_lrcv = test_predictions_lrcv.select("prediction", "label")
test_accuracycv = evaluator.evaluate(predictionAndLabels_test_lrcv)
print("Test set accuracy = {}".format(test_accuracycv))

Train set accuracy  = 0.938664782975
Test set accuracy = 0.940503192937


#### 25. Save the Crossvalidated model and load the model and pass the test data (7M)

In [42]:
# Persist the best pipeline model found by cross-validation.
# `write().overwrite()` replaces any model already stored at the path, so the
# cell can be re-run; a plain `save()` fails once the target path exists.
lr_crossval_Model.bestModel.write().overwrite().save("/user/2305B47/BigDataCute/Models/logi_model3")

In [43]:
from pyspark.ml import PipelineModel

# Reload the persisted pipeline model from the path it was saved to.
saved_model_path = "/user/2305B47/BigDataCute/Models/logi_model3"
saved_model = PipelineModel.load(saved_model_path)

In [44]:
# Score the test split with the reloaded model and report its accuracy.
test_predictions = saved_model.transform(testData)
predictionAndLabels_test_lrcv = test_predictions.select("prediction", "label")
test_accuracycv = evaluator.evaluate(predictionAndLabels_test_lrcv)
print("Test set accuracy = {}".format(test_accuracycv))

Test set accuracy = 0.940503192937
