In [1]:
import pyspark
from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

from utils.columns_enum import ValidColumns
from utils import data_dictionary

In [2]:
spark = SparkSession \
    .builder \
    .appName("Testing Spark") \
    .getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/01/19 11:25:42 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
df = spark.read.text('/home/ubuntu/dec17pub.dat')

#### Extracting information

In [4]:
# Extracting Full household identifier.
extract_column = udf(lambda row: row[0:15] + row[70:75], StringType())
df = df.withColumn(ValidColumns.FULL_HOUSEHOLD_IDENTIFIER.value, extract_column("value"))

In [5]:
# Extracting Time of interview in YYYY/MMM format.
extract_column = udf(lambda row: row[17:21] + '/' + data_dictionary.MONTHS.get(int(row[15:17])), StringType())
df = df.withColumn(ValidColumns.TIME_OF_INTERVIEW.value, extract_column("value"))

In [6]:
# Extracting Final outcome of the survey.
extract_column = udf(lambda row: data_dictionary.VALID_FINAL_OUTCOME.get(row[23:26].zfill(3)), StringType())
df = df.withColumn(ValidColumns.FINAL_OUTCOME_OF_SURVEY.value, extract_column("value"))

In [7]:
# Extracting Type of housing unit.
extract_column = udf(lambda row: data_dictionary.VALID_TYPE_OF_HOUSING_UNIT.get(int(row[30:32])), StringType())
df = df.withColumn(ValidColumns.TYPE_OF_HOUSING_UNIT.value, extract_column("value"))

In [8]:
# Extracting Household type.
extract_column = udf(lambda row: data_dictionary.VALID_HOUSEHOLD_TYPE.get(int(row[60:62])), StringType())
df = df.withColumn(ValidColumns.HOUSEHOLD_TYPE.value, extract_column("value"))

In [9]:
# Extracting Apartment/Household has a telephone.
extract_column = udf(lambda row: data_dictionary.VALID_YES_NO.get(int(row[32:34])), StringType())
df = df.withColumn(ValidColumns.HOUSEHOLD_HAS_TELEPHONE.value, extract_column("value"))

In [10]:
# Extracting Apartment/Household can access a telephone elsewhere.
extract_column = udf(lambda row: data_dictionary.VALID_YES_NO.get(int(row[34:36])), StringType())
df = df.withColumn(ValidColumns.HOUSEHOLD_CAN_ACCESS_TELEPHONE_ELSEWHERE.value, extract_column("value"))

In [11]:
# Extracting Is telephone interview acceptable for the responder.
extract_column = udf(lambda row: data_dictionary.VALID_YES_NO.get(int(row[36:38])), StringType())
df = df.withColumn(ValidColumns.IS_TELEPHONE_INTERVIEW_ACCEPTABLE_FOR_THE_RESPONDER.value, extract_column("value"))

In [12]:
# Extracting Type of interview.
extract_column = udf(lambda row: data_dictionary.VALID_TYPE_OF_INTERVIEW.get(int(row[64:66])), StringType())
df = df.withColumn(ValidColumns.TYPE_OF_INTERVIEW.value, extract_column("value"))

In [13]:
# Extracting Family income range.
extract_column = udf(lambda row: data_dictionary.VALID_FAMILY_INCOME_RANGE.get(int(row[38:40])), StringType())
df = df.withColumn(ValidColumns.FAMILY_INCOME.value, extract_column("value"))

In [14]:
# Extracting Geographical division/location.
extract_column = udf(lambda row: data_dictionary.VALID_DIVISIONS.get(int(row[90:91])) + '/' + data_dictionary.VALID_REGIONS.get(int(row[88:90])), StringType())
df = df.withColumn(ValidColumns.GEOGRAPHICAL_DIVISION_LOCATION.value, extract_column("value"))

In [15]:
# Extracting Race
extract_column = udf(lambda row: data_dictionary.VALID_RACE.get(row[38:40].zfill(2)), StringType())
df = df.withColumn(ValidColumns.RACE.value, extract_column("value"))

In [16]:
column_list = [col.value for col in ValidColumns]
df.select(column_list).show(10)

[Stage 0:>                                                          (0 + 1) / 1]

+-------------------------+-----------------+-----------------------+--------------------+--------------------+-----------------------+----------------------------------------+---------------------------------------------------+-----------------+-------------------+------------------------------+-----------+
|full_household_identifier|time_of_interview|final_outcome_of_survey|type_of_housing_unit|      household_type|household_has_telephone|household_can_access_telephone_elsewhere|is_telephone_interview_acceptable_for_the_responder|type_of_interview|family_income_range|geographical_division_location|       race|
+-------------------------+-----------------+-----------------------+--------------------+--------------------+-----------------------+----------------------------------------+---------------------------------------------------+-----------------+-------------------+------------------------------+-----------+
|     00000479511071906011|         2017/DEC|          CAPI COMPLETE|H

                                                                                

#### Analysis

In [17]:
# What is the count of responders per family income range (show top 10)?
df.groupBy('family_income_range').count().sort("count", ascending=False).show(10)

[Stage 1:>                                                          (0 + 2) / 2]

+-------------------+-----+
|family_income_range|count|
+-------------------+-----+
|               null|20391|
| 100,000 TO 149,999|17794|
|   75,000 TO 99,999|16557|
|    150,000 OR MORE|15704|
|   60,000 TO 74,999|13442|
|   50,000 TO 59,999| 9971|
|   40,000 TO 49,999| 9788|
|   30,000 TO 34,999| 6743|
|   35,000 TO 39,999| 6620|
|   20,000 TO 24,999| 6312|
+-------------------+-----+
only showing top 10 rows



                                                                                

In [18]:
# What is the count of responders per geographical division/location and race (show top 10)?
df.groupBy(['geographical_division_location', 'race']).count().sort("count", ascending=False).show(10)

[Stage 4:>                                                          (0 + 2) / 2]

+------------------------------+--------+-----+
|geographical_division_location|    race|count|
+------------------------------+--------+-----+
|          SOUTH ATLANTIC/SOUTH|    null|11436|
|          WEST SOUTH CENTRA...|    null| 7133|
|                 MOUNTAIN/WEST|    null| 7084|
|                  PACIFIC/WEST|    null| 6840|
|          EAST NORTH CENTRA...|    null| 5600|
|          EAST SOUTH CENTRA...|    null| 4826|
|          MIDDLE ATLANTIC/N...|    null| 4797|
|          WEST NORTH CENTRA...|    null| 4785|
|          NEW ENGLAND/NORTH...|    null| 4079|
|          SOUTH ATLANTIC/SOUTH|Asian-HP| 3162|
+------------------------------+--------+-----+
only showing top 10 rows





In [19]:
# How many responders do not have telephone in their house, but can access a telephone elsewhere and telephone interview is accepted (show top 10)?
df.select(['household_has_telephone', 'household_can_access_telephone_elsewhere', 'is_telephone_interview_acceptable_for_the_responder'])\
        .filter("household_has_telephone = 'NO' and household_can_access_telephone_elsewhere = 'YES' and is_telephone_interview_acceptable_for_the_responder = 'YES'").count()

                                                                                

633

In [20]:
# How many responders can access to a telephone, but telephone interview is not accepted (show top 10)?
df.select(['household_has_telephone', 'is_telephone_interview_acceptable_for_the_responder'])\
        .filter("household_has_telephone = 'YES' and is_telephone_interview_acceptable_for_the_responder = 'NO'").count()

                                                                                

0

#### Download cleaned data to csv

In [22]:
column_list = [col.value for col in ValidColumns]
df.select(column_list).write.csv("dec17pub.csv", mode="overwrite")

                                                                                