### importing the necessary libraries

In [1]:
import pyspark

In [161]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import when, lit, col
from pyspark.sql import functions as func
from pyspark.sql.functions import col, isnan, when, count
from functools import reduce
from pyspark.sql.types import StructType, StructField, StringType, LongType, DateType, IntegerType

### creating sparksession object

In [162]:
spark =SparkSession.builder.appName('DecemberCps').getOrCreate()

### reading the dataset into the environment 

In [163]:
df_cps = spark.read.csv("dec17pub.dat")

In [164]:
df_cps.show(10)

+--------------------+
|                 _c0|
+--------------------+
|00000479511071912...|
|00000479511071912...|
|00007169100494112...|
|00007169100494112...|
|00007169100494112...|
|00011017798798612...|
|00011017798798612...|
|00011020659338112...|
|00011028481568012...|
|00011032785646912...|
+--------------------+
only showing top 10 rows



### extracting the required columns from the dataset using the data dictionary file

In [165]:
df_cps1 = df_cps.select(func.concat_ws("",func.substring("_c0",1,15),func.substring("_c0", 71,5)).alias("householdId"),\
                                      func.concat_ws("/",func.substring("_c0",18,4),func.date_format(func.to_date(func.substring("_c0",16,2), "MM"),"MMM")).alias("interviewTime"),\
                                      func.substring("_c0", 24,3).alias("finalOutcome"),\
                                      func.substring("_c0", 31,2).alias("housingType"),\
                                      func.substring("_c0", 61,2).alias("householdType"),\
                                      func.substring("_c0", 33,2).alias("telephoneInHousehold"),\
                                      func.substring("_c0", 35,2).alias("accessToTelephoneElsewhere"),\
                                      func.substring("_c0", 37,2).alias("acceptableTelephoneInterview"),\
                                      func.substring("_c0", 65,2).alias("interviewType"),\
                                      func.substring("_c0", 39,2).alias("familyIncomeRange"),\
                                      func.substring("_c0", 91,1).alias("geoDivision"),\
                                      func.substring("_c0", 139,2).alias("race"))            

### showing ten(10) samples 

In [166]:
df_cps1.show(10)

+--------------------+-------------+------------+-----------+-------------+--------------------+--------------------------+----------------------------+-------------+-----------------+-----------+----+
|         householdId|interviewTime|finalOutcome|housingType|householdType|telephoneInHousehold|accessToTelephoneElsewhere|acceptableTelephoneInterview|interviewType|familyIncomeRange|geoDivision|race|
+--------------------+-------------+------------+-----------+-------------+--------------------+--------------------------+----------------------------+-------------+-----------------+-----------+----+
|00000479511071906011|     2017/Dec|         201|          1|            1|                   1|                        -1|                           1|            2|                9|          6|   1|
|00000479511071906011|     2017/Dec|         201|          1|            1|                   1|                        -1|                           1|            2|                9|        

### checking for number of rows in dataset

In [167]:
df_cps1.count()


146456

### printing schema of dataframe

In [168]:
df_cps1.printSchema()

root
 |-- householdId: string (nullable = false)
 |-- interviewTime: string (nullable = false)
 |-- finalOutcome: string (nullable = true)
 |-- housingType: string (nullable = true)
 |-- householdType: string (nullable = true)
 |-- telephoneInHousehold: string (nullable = true)
 |-- accessToTelephoneElsewhere: string (nullable = true)
 |-- acceptableTelephoneInterview: string (nullable = true)
 |-- interviewType: string (nullable = true)
 |-- familyIncomeRange: string (nullable = true)
 |-- geoDivision: string (nullable = true)
 |-- race: string (nullable = true)



### checking for null values 

In [169]:
sum(df_cps1.where(reduce(lambda x, y: x | y, (func.col(x).isNull() for x in df_cps1.columns))))

Column<'((((((((((((householdId + 0) + interviewTime) + finalOutcome) + housingType) + householdType) + telephoneInHousehold) + accessToTelephoneElsewhere) + acceptableTelephoneInterview) + interviewType) + familyIncomeRange) + geoDivision) + race)'>

### Checking for missing values

In [170]:
for x in df_cps1.columns:
    list_of_values = df_cps1.select([count(when(isnan(x) | col(x).isNull(), x))]).collect()
print(list_of_values)

[Stage 467:>                                                        (0 + 4) / 4]

[Row(count(CASE WHEN (isnan(race) OR (race IS NULL)) THEN race END)=0)]


                                                                                

## Decoding the required encoded columns

### decoding 'family income range'

In [171]:
new_df = df_cps1.withColumn("new_FamilyIncomeRange",
                                when(col("familyIncomeRange")=="1", lit("0-5000")).
                                when(col("familyIncomeRange")=="2", lit("5000-7499")).
                                when(col("familyIncomeRange")=="3", lit("7500-9999")).
                                when(col("familyIncomeRange")=="4", lit("10000-12499")).
                                when(col("familyIncomeRange")=="5", lit("12500-14499")).
                                when(col("familyIncomeRange")=="6", lit("15000-19999")).
                                when(col("familyIncomeRange")=="7", lit("20000-24999")).
                                when(col("familyIncomeRange")=="8", lit("25000-29999")).
                                when(col("familyIncomeRange")=="9", lit("30000-34999")).
                                when(col("familyIncomeRange")=="10",lit("35000-39999")).
                                when(col("familyIncomeRange")=="11",lit("40000-49999")).
                                when(col("familyIncomeRange")=="12",lit("50000-59999")).
                                when(col("familyIncomeRange")=="13",lit("60000-74999")).
                                when(col("familyIncomeRange")=="14",lit("75000-99999")).
                                when(col("familyIncomeRange")=="15",lit("100000-149999")).
                                when(col("familyIncomeRange")=="16",lit("150000...")).
                                otherwise(lit("not_applicable")))


In [172]:
new_df.show(5)

+--------------------+-------------+------------+-----------+-------------+--------------------+--------------------------+----------------------------+-------------+-----------------+-----------+----+---------------------+
|         householdId|interviewTime|finalOutcome|housingType|householdType|telephoneInHousehold|accessToTelephoneElsewhere|acceptableTelephoneInterview|interviewType|familyIncomeRange|geoDivision|race|new_FamilyIncomeRange|
+--------------------+-------------+------------+-----------+-------------+--------------------+--------------------------+----------------------------+-------------+-----------------+-----------+----+---------------------+
|00000479511071906011|     2017/Dec|         201|          1|            1|                   1|                        -1|                           1|            2|                9|          6|   1|       not_applicable|
|00000479511071906011|     2017/Dec|         201|          1|            1|                   1|        

### decoding 'geographical division'

In [173]:
new_df = new_df.withColumn("new_geoDivision", 
                            when(col("geoDivision") == "1", lit("New England")).
                            when(col("geoDivision") == "2", lit("Middle Atlantic")).
                            when(col("geoDivision") == "3", lit("East North Central")).
                            when(col("geoDivision") == "4", lit("West North Central")).
                            when(col("geoDivision") == "5", lit("South Atlantic")).
                            when(col("geoDivision") == "6", lit("East South Central")).
                            when(col("geoDivision") == "7", lit("West South Central")).
                            when(col("geoDivision") == "8", lit("Mountain")).
                            when(col("geoDivision") == "9", lit("Pacific")).
                            otherwise(lit("not_applicable")))

In [174]:
new_df.show(5)

+--------------------+-------------+------------+-----------+-------------+--------------------+--------------------------+----------------------------+-------------+-----------------+-----------+----+---------------------+------------------+
|         householdId|interviewTime|finalOutcome|housingType|householdType|telephoneInHousehold|accessToTelephoneElsewhere|acceptableTelephoneInterview|interviewType|familyIncomeRange|geoDivision|race|new_FamilyIncomeRange|   new_geoDivision|
+--------------------+-------------+------------+-----------+-------------+--------------------+--------------------------+----------------------------+-------------+-----------------+-----------+----+---------------------+------------------+
|00000479511071906011|     2017/Dec|         201|          1|            1|                   1|                        -1|                           1|            2|                9|          6|   1|       not_applicable|East South Central|
|00000479511071906011|     2

### decoding 'race'

In [175]:
new_df=new_df.withColumn("new_Race",
                           when(col("race") == " 1", lit("White Only")).
                           when(col("race") == " 2", lit("Black Only")).
                           when(col("race") == " 3", lit("American Indian,Alaskan Native Only")).
                           when(col("race") == " 4", lit("Asian Only")).
                           when(col("race") == " 5", lit("Hawaiian/Pacific Islander Only")).
                           when(col("race") == " 6", lit("White-Black")).
                           when(col("race") == " 7", lit("White-AI")).
                           when(col("race") == " 8", lit("White-Asian")).
                           when(col("race") == " 9", lit("White-HP")).
                           when(col("race") == "10", lit("Black-AI")).
                           when(col("race") == "11", lit("Black-Asian")).
                           when(col("race") == "12", lit("Black-HP")).
                           when(col("race") == "13", lit("AI-Asian")).
                           when(col("race") == "14", lit("AI-HP")).
                           when(col("race") == "15", lit("Asian-HP")).
                           when(col("race") == "16", lit("W-B-AI")).
                           when(col("race") == "17", lit("W-B-A")).
                           when(col("race") == "18", lit("W-B-HP")).
                           when(col("race") == "19", lit("W-AI-A")).
                           when(col("race") == "20", lit("W-AI-HP")).
                           when(col("race") == "21", lit("W-A-HP")).
                           when(col("race") == "22", lit("B-AI-A")).
                           when(col("race") == "23", lit("W-B-AI-A")).
                           when(col("race") == "24", lit("W-AI-A-HP")).
                           when(col("race") == "25", lit("Other 3 Race Combinations")).
                           when(col("race") == "26", lit("Other 4 and 5 Race Combinations")).
                           otherwise(lit("not_applicable")))

In [176]:
new_df.show(5)

+--------------------+-------------+------------+-----------+-------------+--------------------+--------------------------+----------------------------+-------------+-----------------+-----------+----+---------------------+------------------+----------+
|         householdId|interviewTime|finalOutcome|housingType|householdType|telephoneInHousehold|accessToTelephoneElsewhere|acceptableTelephoneInterview|interviewType|familyIncomeRange|geoDivision|race|new_FamilyIncomeRange|   new_geoDivision|  new_Race|
+--------------------+-------------+------------+-----------+-------------+--------------------+--------------------------+----------------------------+-------------+-----------------+-----------+----+---------------------+------------------+----------+
|00000479511071906011|     2017/Dec|         201|          1|            1|                   1|                        -1|                           1|            2|                9|          6|   1|       not_applicable|East South Cent

### decoding 'household has a telephone'

In [177]:
new_df = new_df.withColumn("new_telephoneInHousehold",
                          when(col("telephoneInHousehold") == " 1", lit("Yes")).
                          when(col("telephoneInHousehold") == " 2", lit("No")).
                          otherwise(lit("not_applicable")))
                           

In [178]:
new_df.show(5)

+--------------------+-------------+------------+-----------+-------------+--------------------+--------------------------+----------------------------+-------------+-----------------+-----------+----+---------------------+------------------+----------+------------------------+
|         householdId|interviewTime|finalOutcome|housingType|householdType|telephoneInHousehold|accessToTelephoneElsewhere|acceptableTelephoneInterview|interviewType|familyIncomeRange|geoDivision|race|new_FamilyIncomeRange|   new_geoDivision|  new_Race|new_telephoneInHousehold|
+--------------------+-------------+------------+-----------+-------------+--------------------+--------------------------+----------------------------+-------------+-----------------+-----------+----+---------------------+------------------+----------+------------------------+
|00000479511071906011|     2017/Dec|         201|          1|            1|                   1|                        -1|                           1|           

### decoding 'household can access a telephone elsewhere'

In [179]:
new_df = new_df.withColumn("new_AccessToTelephoneElsewhere",
                          when(col("accessToTelephoneElsewhere") == " 1", lit("Yes")).
                          when(col("accessToTelephoneElsewhere") == " 2", lit("No")).
                          otherwise(lit("not_applicable")))

In [180]:
new_df.show(5)

+--------------------+-------------+------------+-----------+-------------+--------------------+--------------------------+----------------------------+-------------+-----------------+-----------+----+---------------------+------------------+----------+------------------------+------------------------------+
|         householdId|interviewTime|finalOutcome|housingType|householdType|telephoneInHousehold|accessToTelephoneElsewhere|acceptableTelephoneInterview|interviewType|familyIncomeRange|geoDivision|race|new_FamilyIncomeRange|   new_geoDivision|  new_Race|new_telephoneInHousehold|new_AccessToTelephoneElsewhere|
+--------------------+-------------+------------+-----------+-------------+--------------------+--------------------------+----------------------------+-------------+-----------------+-----------+----+---------------------+------------------+----------+------------------------+------------------------------+
|00000479511071906011|     2017/Dec|         201|          1|         

### decoding 'Is telephone interview acceptable'

In [181]:
new_df = new_df.withColumn("new_acceptableTelephoneInterview",
                          when(col("acceptableTelephoneInterview") == " 1", lit("Yes")).
                          when(col("acceptableTelephoneInterview") == " 1", lit("No"))) 

In [182]:
new_df.show(5)

+--------------------+-------------+------------+-----------+-------------+--------------------+--------------------------+----------------------------+-------------+-----------------+-----------+----+---------------------+------------------+----------+------------------------+------------------------------+--------------------------------+
|         householdId|interviewTime|finalOutcome|housingType|householdType|telephoneInHousehold|accessToTelephoneElsewhere|acceptableTelephoneInterview|interviewType|familyIncomeRange|geoDivision|race|new_FamilyIncomeRange|   new_geoDivision|  new_Race|new_telephoneInHousehold|new_AccessToTelephoneElsewhere|new_acceptableTelephoneInterview|
+--------------------+-------------+------------+-----------+-------------+--------------------+--------------------------+----------------------------+-------------+-----------------+-----------+----+---------------------+------------------+----------+------------------------+------------------------------+-----

### dropping the encoded columns

In [183]:
new_ddf = new_df.drop("familyIncomeRange","geoDivision","race","telephoneInHousehold","accessToTelephoneElsewhere","acceptableTelephoneInterview")

In [184]:
new_ddf.show(5)

+--------------------+-------------+------------+-----------+-------------+-------------+---------------------+------------------+----------+------------------------+------------------------------+--------------------------------+
|         householdId|interviewTime|finalOutcome|housingType|householdType|interviewType|new_FamilyIncomeRange|   new_geoDivision|  new_Race|new_telephoneInHousehold|new_AccessToTelephoneElsewhere|new_acceptableTelephoneInterview|
+--------------------+-------------+------------+-----------+-------------+-------------+---------------------+------------------+----------+------------------------+------------------------------+--------------------------------+
|00000479511071906011|     2017/Dec|         201|          1|            1|            2|       not_applicable|East South Central|White Only|                     Yes|                not_applicable|                             Yes|
|00000479511071906011|     2017/Dec|         201|          1|            1| 

## Question1


### creating a view in Spark's SQL engine 

In [156]:
new_ddf.createOrReplaceTempView('us_cps')


### Selecting columns that are important for the analysis

In [157]:
spark.sql("""
    SELECT new_FamilyIncomeRange, COUNT(*) as count
    FROM us_cps
    WHERE new_FamilyIncomeRange IS NOT NULL
    GROUP BY new_FamilyIncomeRange
    ORDER BY count DESC
    LIMIT 10
""").show()



+---------------------+-----+
|new_FamilyIncomeRange|count|
+---------------------+-----+
|       not_applicable|56580|
|        100000-149999|17794|
|          75000-99999|16557|
|            150000...|15704|
|          60000-74999|13442|
|          50000-59999| 9971|
|          40000-49999| 9788|
|          35000-39999| 6620|
+---------------------+-----+



                                                                                

### Question 2

### Getting the count of responders per geographical division as well as  race

In [158]:
spark.sql("""
    SELECT new_geoDivision, new_Race, COUNT(*) as count
    FROM us_cps
    WHERE new_Race IS NOT NULL
    GROUP BY new_geoDivision, new_Race
    ORDER BY count DESC
    LIMIT 10
""").show()

[Stage 419:>                                                        (0 + 4) / 4]

+------------------+----------+-----+
|   new_geoDivision|  new_Race|count|
+------------------+----------+-----+
|    South Atlantic|White Only|16999|
|          Mountain|White Only|14343|
|           Pacific|White Only|13214|
|East North Central|White Only|11325|
|West South Central|White Only|11248|
|West North Central|White Only| 9884|
|   Middle Atlantic|White Only| 8487|
|       New England|White Only| 8410|
|East South Central|White Only| 6580|
|    South Atlantic|Black Only| 4899|
+------------------+----------+-----+





## Question 3
### How many responders do not have telephone in their house, but can access a telephone elsewhere and telephone interview is accepted

In [159]:
spark.sql("""
    SELECT COUNT(*) as count
    FROM us_cps
    WHERE new_telephoneInHousehold= "No"
    AND new_AccessToTelephoneElsewhere = "Yes"
    AND new_acceptableTelephoneInterview = "Yes"
""").show()



+-----+
|count|
+-----+
|  633|
+-----+



                                                                                

## Question 4
### Getting the number of responders that have access to a telephone, but telephone interview is not accepted 

In [185]:
spark.sql("""
    SELECT COUNT(*) as count
    FROM us_cps
    WHERE new_telephoneInHousehold = "Yes"
    AND new_AccessToTelephoneElsewhere = "Yes"
    AND new_acceptableTelephoneInterview = "No"
""").show()

[Stage 477:>                                                        (0 + 4) / 4]

+-----+
|count|
+-----+
|    0|
+-----+



                                                                                

***                                             THE END                                                         ***