In [1]:
/*
* Author: Mike Urciuoli
* email: urciuolim@gmail.com
* description: The purpose of this notebook is to show how to clean the Census Business Patterns (CBP) dataset for
*              usage in a county prediction model. The overall goal of the project is to predict some feature
*              of a county based on other input features. Each row of the CBP dataset is a count of how many businesses exist in
*              a specific county in a given year (year indicated by the file that row is part of) according to a specifc NAICS
*              code. At this point we don't know which NAICS codes the user is interested in, and if a specific county has no
*              businesses that fall into a NAICS code category the record is not present in the dataset. So grouping all records
*              by county/year will allow us to sense when records are missing for any given county.
* 
*              NAICS codes: https://www.naics.com/naics-code-description/
*/

import org.apache.spark.sql.functions.{udf, input_file_name}
import org.apache.spark.sql.Row
import spark.implicits._

// Custom UDF that extracts the year from the filename of each Census CBP input file
val extractYear = udf((filename: String) => "20" + filename.split("/").last.substring(3,5))
// Load raw dataset, with added year column, dropping unneeded columns and renaming/casting those remaining
val cbpRaw = spark.read.option("header", "true").
    csv("s3://agimodeltrainer/DATA/CBP/").
    withColumn("Year", extractYear(input_file_name())).
    drop(List("empflag","emp_nf","emp","qp1_nf","qp1","ap_nf","ap", "censtate", "cencty", "n1000"):_*).
    select(
        $"fipstate".as("State FIPS"),
        $"fipscty".as("County FIPS"),
        $"Year".cast("int"),
        regexp_replace($"naics", "[-/]", "").as("NAICS Code").cast("Int"),
        $"est".as("Total").cast("int"),
        $"n1_4".as("1_4").cast("int"),
        $"n5_9".as("5_9").cast("int"),
        $"n10_19".as("10_19").cast("int"),
        $"n20_49".as("20_49").cast("int"),
        $"n50_99".as("50_99").cast("int"),
        $"n100_249".as("100_249").cast("int"),
        $"n250_499".as("250_499").cast("int"),
        $"n500_999".as("500_999").cast("int"),
        $"n1000_1".as("1000_1499").cast("int"),
        $"n1000_2".as("1500_2499").cast("int"),
        $"n1000_3".as("2500_4999").cast("int"),
        $"n1000_4".as("5000plus").cast("int")
    ).na.fill(0)
cbpRaw.show

VBox()

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
1,application_1600108946459_0002,spark,idle,Link,Link,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

import org.apache.spark.sql.functions.{udf, input_file_name}
import org.apache.spark.sql.Row
import spark.implicits._
extractYear: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,StringType,Some(List(StringType)))
cbpRaw: org.apache.spark.sql.DataFrame = [State FIPS: string, County FIPS: string ... 15 more fields]
+----------+-----------+----+----------+-----+---+---+-----+-----+-----+-------+-------+-------+---------+---------+---------+--------+
|State FIPS|County FIPS|Year|NAICS Code|Total|1_4|5_9|10_19|20_49|50_99|100_249|250_499|500_999|1000_1499|1500_2499|2500_4999|5000plus|
+----------+-----------+----+----------+-----+---+---+-----+-----+-----+-------+-------+-------+---------+---------+---------+--------+
|        01|        001|2011|         0|  835|412|178|  122|   78|   33|     10|      1|      1|        0|        0|        0|       0|
|        01|        001|2011|        11|    6|  5|  0|    1|    0|    0|      0|      0|      0|     

In [2]:
cbpRaw.select("NAICS Code").filter($"NAICS Code" < 100).distinct.orderBy("NAICS Code").show(21)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----------+
|NAICS Code|
+----------+
|         0|
|        11|
|        21|
|        22|
|        23|
|        31|
|        42|
|        44|
|        48|
|        51|
|        52|
|        53|
|        54|
|        55|
|        56|
|        61|
|        62|
|        71|
|        72|
|        81|
|        99|
+----------+



In [3]:
val cols = Array("Total", "1_4", "5_9", "10_19", "20_49", "50_99", "100_249", "250_499", "500_999", "1000_1499", "1500_2499", "2500_4999", "5000plus")
// Description of each of the above columns. A few points:
// 1. There is at least one business in each county, as indicated by the min value in the Total column.
// 2. The average number of businesses for each category of number of employees (1-4 => 1-4 Employees)
//    shrinks as number of employees increases.
// These numbers are based on all NAICS codes, and we could get more specific descriptions for a specifc code if we wanted to.

// Description show in two halves for formatting reasons
cbpRaw.describe(cols.slice(0, cols.size/2):_*).show
cbpRaw.describe(cols.slice(cols.size/2, cols.size):_*).show

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

cols: Array[String] = Array(Total, 1_4, 5_9, 10_19, 20_49, 50_99, 100_249, 250_499, 500_999, 1000_1499, 1500_2499, 2500_4999, 5000plus)
+-------+------------------+------------------+------------------+------------------+-----------------+------------------+
|summary|             Total|               1_4|               5_9|             10_19|            20_49|             50_99|
+-------+------------------+------------------+------------------+------------------+-----------------+------------------+
|  count|          13876272|          13876272|          13876272|          13876272|         13876272|          13876272|
|   mean|22.843167386744796|12.420304531361161| 4.203605334343403|2.8697289156626504|1.986593517336645|0.6628389815362512|
| stddev|374.57349854462854|217.24034394202823|63.772179406914425|45.116066955231354|33.75968424617814|11.924911741797375|
|    min|                 1|                 0|                 0|                 0|                0|                 0|
|  

In [4]:
// Condense the above columns into one map record, where the key is the NAICS code of that row
// and the value is an array of int counts, where each count pertains to the number of businesses
// for that county/year/NAICS code of a specific employee size
def condenseRow(r:Row) = {
    val statefips = r.getString(r.fieldIndex("State FIPS"))
    val countyfips = r.getString(r.fieldIndex("County FIPS"))
    val year = r.getInt(r.fieldIndex("Year"))
    val naics = r.getInt(r.fieldIndex("NAICS Code"))
    val startIndex = r.fieldIndex("Total")
    val endIndex = r.fieldIndex("5000plus")
    val record = r.toSeq.slice(startIndex, endIndex+1).map(_.toString.toInt)
    (
        statefips+countyfips,
        year, 
        Map(naics->record)
    )
}
// Apply the above function to each raw row, see below for example output
val cbpMaps = cbpRaw.map(condenseRow(_)).select(
    $"_1".as("FIPS"),
    $"_2".as("Year"),
    $"_3".as("Record")
)
cbpMaps.printSchema
cbpMaps.show(false)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

condenseRow: (r: org.apache.spark.sql.Row)(String, Int, scala.collection.immutable.Map[Int,Seq[Int]])
cbpMaps: org.apache.spark.sql.DataFrame = [FIPS: string, Year: int ... 1 more field]
root
 |-- FIPS: string (nullable = true)
 |-- Year: integer (nullable = false)
 |-- Record: map (nullable = true)
 |    |-- key: integer
 |    |-- value: array (valueContainsNull = true)
 |    |    |-- element: integer (containsNull = false)

+-----+----+---------------------------------------------------------+
|FIPS |Year|Record                                                   |
+-----+----+---------------------------------------------------------+
|01001|2011|[0 -> [835, 412, 178, 122, 78, 33, 10, 1, 1, 0, 0, 0, 0]]|
|01001|2011|[11 -> [6, 5, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0]]          |
|01001|2011|[113 -> [5, 4, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0]]         |
|01001|2011|[1133 -> [5, 4, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0]]        |
|01001|2011|[11331 -> [5, 4, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0]]       |
|0

In [5]:
// I couldn't find a built-in way to join maps in a spark reduce-like function, so a way to do this
// would be to group rows by FIPS and Year, then aggregate each individual map into a list via
// the collect_list aggregate function. We can then use a custom UDF to turn these lists into
// maps as intended. A more straightfoward way to do this is to define a custom UDAF, but that
// would require writing more code than this one.
val seqToMap = udf((seq: Seq[Map[Int, Seq[Int]]]) => seq.flatten.toMap)
val cbpMapsAgg = cbpMaps.groupBy("FIPS", "Year").
    agg(collect_list("Record").alias("Records")).
    withColumn("Maps", seqToMap($"Records")).
    drop("Records")
cbpMapsAgg.printSchema
cbpMapsAgg.show

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

seqToMap: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,MapType(IntegerType,ArrayType(IntegerType,false),true),Some(List(ArrayType(MapType(IntegerType,ArrayType(IntegerType,false),true),true))))
cbpMapsAgg: org.apache.spark.sql.DataFrame = [FIPS: string, Year: int ... 1 more field]
root
 |-- FIPS: string (nullable = true)
 |-- Year: integer (nullable = false)
 |-- Maps: map (nullable = true)
 |    |-- key: integer
 |    |-- value: array (valueContainsNull = true)
 |    |    |-- element: integer (containsNull = false)

+-----+----+--------------------+
| FIPS|Year|                Maps|
+-----+----+--------------------+
|01003|2016|[532490 -> [5, 1,...|
|01023|2013|[484110 -> [3, 3,...|
|01025|2017|[62161 -> [7, 0, ...|
|01027|2016|[484110 -> [2, 1,...|
|01037|2017|[0 -> [96, 71, 9,...|
|01049|2011|[484110 -> [2, 1,...|
|01049|2014|[484110 -> [1, 0,...|
|01055|2011|[532490 -> [1, 1,...|
|01059|2015|[484110 -> [2, 1,...|
|01067|2016|[484110 -> [2, 

In [6]:
// Quick check to make sure the number of rows is expected. There are about 3,141 counties in the US, plus some
// U.S. territory equivalents
cbpMapsAgg.count
cbpMapsAgg.select("FIPS").distinct.count
cbpMapsAgg.select("Year").distinct.count

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

res32: Long = 22338
res33: Long = 3198
res34: Long = 7


In [7]:
// Store our final table in S3 for usage later in the application.
cbpMapsAgg.write.parquet("s3://agimodeltrainer/Clean_Data/Census_Business_Patterns.parquet")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

org.apache.spark.sql.AnalysisException: path s3://agimodeltrainer/Clean_Data/Census_Business_Patterns.parquet already exists.;
  at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:119)
  at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
  at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)
  at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:122)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:173)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:169)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:197)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.sql.execution.SparkPlan.executeQuery(Sp

In [8]:
// Quick test to show that data was written successfully. Notice that schema is maintained after write/read due to parquet file.
val test = spark.read.parquet("s3://agimodeltrainer/Clean_Data/Census_Business_Patterns.parquet")
test.printSchema
test.show
test.count
test.select("FIPS").distinct.count
test.select("Year").distinct.count

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

test: org.apache.spark.sql.DataFrame = [FIPS: string, Year: int ... 1 more field]
root
 |-- FIPS: string (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Maps: map (nullable = true)
 |    |-- key: integer
 |    |-- value: array (valueContainsNull = true)
 |    |    |-- element: integer (containsNull = true)

+-----+----+--------------------+
| FIPS|Year|                Maps|
+-----+----+--------------------+
|01001|2016|[532490 -> [1, 1,...|
|01005|2012|[484110 -> [1, 0,...|
|01007|2016|[484110 -> [5, 2,...|
|01019|2015|[484110 -> [1, 0,...|
|01025|2014|[484110 -> [5, 2,...|
|01033|2017|[484110 -> [3, 0,...|
|01071|2011|[62161 -> [4, 1, ...|
|01075|2014|[532490 -> [1, 1,...|
|01083|2013|[484110 -> [5, 4,...|
|01085|2014|[484110 -> [1, 1,...|
|01101|2017|[532490 -> [3, 0,...|
|01105|2015|[532490 -> [1, 1,...|
|01105|2017|[0 -> [121, 70, 2...|
|01129|2011|[484110 -> [2, 2,...|
|02070|2017|[0 -> [98, 59, 20...|
|02195|2016|[484110 -> [1, 1,...|
|02240|2017|[0 -> [177, 103, ...|

In [9]:
val step1 = test.filter($"Year" === 2016).filter($"FIPS" === "01001").collect

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

step1: Array[org.apache.spark.sql.Row] = Array([01001,2016,Map(532490 -> WrappedArray(1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0), 484110 -> WrappedArray(1, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0), 62161 -> WrappedArray(2, 0, 0, 0, 2, 0, 0, 0, 0, 0, 0, 0, 0), 53139 -> WrappedArray(1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0), 6231 -> WrappedArray(2, 0, 0, 0, 0, 1, 1, 0, 0, 0, 0, 0, 0), 3399 -> WrappedArray(2, 1, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0), 8114 -> WrappedArray(5, 5, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0), 333 -> WrappedArray(1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0), 722514 -> WrappedArray(3, 1, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0), 518 -> WrappedArray(1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0), 448140 -> WrappedArray(6, 2, 0, 1, 3, 0, 0, 0, 0, 0, 0, 0, 0), 52213 -> WrappedArray(5, 1, 3, 1, 0, 0, 0, 0, 0, 0, 0, ...

In [10]:
step1(0).getAs[Map[Int, Seq[Int]]](2)(0)
println("-----------------------------------")
step1(0).getAs[Map[Int, Seq[Int]]](2)(11)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

res43: Seq[Int] = WrappedArray(851, 404, 186, 126, 97, 27, 8, 2, 1, 0, 0, 0, 0)
-----------------------------------
res45: Seq[Int] = WrappedArray(9, 6, 2, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0)
