# Travis Exploration Notebook

## Steps:

1. Add any required modules to the imports
2. Set County specific variables
3. Set path to sample dataset(s)
4. Import the dataset(s) in the order that they will be merged.


In [24]:
# imports
import datetime
import json
import pyspark
from pyspark.sql import Window
import pyspark.sql.functions as F
from pyspark.sql.types import StructType, StructField, StringType, FloatType, DoubleType, IntegerType, DateType, TimestampType, BooleanType
import quinn

In [25]:
# set county vars
county = "travis"
state = "TX"
fips_code = "48453"

schema_check_db = "faxdb"
schema_check_coll = "parcels_validation_test"

county_vars = {
    "county": county,
    "state": state,
    "fips_code": fips_code,
    "pipeline_run_id": "987654321",
    "pipeline_run_time": datetime.datetime.now(),
    "dataset_date": datetime.datetime.now(),
}

In [4]:
# init spark with delta, mongo support and 16g of memory
spark = (pyspark.sql.SparkSession.builder.appName(county)
        .config("spark.jars.packages", "io.delta:delta-core_2.12:0.7.0")
        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
        .config(
            "spark.sql.catalog.spark_catalog",
            "org.apache.spark.sql.delta.catalog.DeltaCatalog",
        )
        .config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.12:3.0.0")
        .config("spark.mongodb.input.uri", f"mongodb://127.0.0.1/{county}.default")
        .config("spark.mongodb.output.uri", f"mongodb://127.0.0.1/{county}.default")
        .config("spark.driver.memory", "16g")
        .getOrCreate()
)

In [27]:
# sample dataset paths (version controlled)
sample_ade_info = "./samples/PROP.TXT"
sample_ade_land = "./samples/LAND_DET.TXT"
sample_ade_imp_info = "./samples/IMP_DET.TXT"
# full datasets paths (not version controlled)
full_ade_info = ""
full_ade_land = ""
full_ade_imp_info = ""
# set dataset paths
path_ade_info = sample_ade_info
path_ade_land = sample_ade_land
path_ade_imp_info = sample_ade_imp_info

In [28]:
# load schema path
schema_path = f"./schema_{state}_{county}.json"
with open(schema_path, "rb") as s:
    schema = StructType.fromJson(json.load(s))

In [29]:
# read dataset APPRAISAL_INFO.TXT
def _transform_ade_info():
    return (
        (spark.read.text(path_ade_info)
                .select(
                    F.trim(F.col("value").substr(1, 12)).cast("string").alias("parcel_id"),
                    F.trim(F.col("value").substr(547,50)).cast("string").alias("tax_id"),
                    F.trim(F.col("value").substr(13, 5)).alias("type"),
                    F.trim(F.col("value").substr(4460,15)).alias("street_number"),
                    F.trim(F.col("value").substr(1040,10)).alias("street_pre_direction"),
                    F.trim(F.col("value").substr(1050,50)).alias("street_name"),
                    F.trim(F.col("value").substr(1100,10)).alias("street_post_direction"),
                    F.trim(F.col("value").substr(4475,5)).alias("unit"),
                    F.trim(F.col("value").substr(1110,30)).alias("city"),
                    F.trim(F.col("value").substr(1140,10)).alias("zip"),
                    F.trim(F.col("value").substr(1150,255)).alias("legal_description"),
                    F.trim(F.col("value").substr(1676,10)).alias("legal_subdivision"),
                    F.trim(F.col("value").substr(2734,10)).alias("improvement_use_code"),
                    (F.trim(F.col("value").substr(2772,20)).cast("double") / 10000).alias("lot_size_acres"),
                    F.trim(F.col("value").substr(4214,14)).cast("integer").alias("market_value"),
                    F.trim(F.col("value").substr(1946,15)).cast("integer").alias("assessed_value"),
                    F.to_timestamp(F.col("value").substr(2034,25), "MM/dd/yyyy").alias("deed_transfer_date"),
                    F.trim(F.col("value").substr(4492,70)).alias("oor_1"),
                    F.trim(F.col("value").substr(4562,60)).alias("oor_2"),
                    F.trim(F.col("value").substr(4622,60)).alias("oor_3"),
                    F.trim(F.col("value").substr(4682,60)).alias("oor_4"),
                    F.trim(F.col("value").substr(4742,50)).alias("oor_5"),
                    F.trim(F.col("value").substr(4792,50)).alias("oor_6"),
                    F.trim(F.col("value").substr(4847,5)).alias("oor_7"),
                    F.trim(F.col("value").substr(4852,4)).alias("oor_8"),
                    F.trim(F.col("value").substr(4842,5)).alias("oor_9")         
                )
                .where((F.col("type") == "R") | (F.col("type") == "M"))
                .withColumn(
                    "parcel_id",
                    F.regexp_replace('parcel_id', r'^[0]*', '')
                )
                .withColumn(
                    "legal_description",
                    quinn.single_space(F.col("legal_description"))
                )
                .withColumn(
                    "street_address",
                    quinn.single_space(
                        F.regexp_replace(
                            F.concat_ws(
                                " ",
                                "street_number",
                                "street_pre_direction",
                                "street_name",
                                "street_post_direction",
                                "unit",
                            ),
                            '"',
                            "",  # regexp replacement values
                        )
                    ),
                )
                .withColumn(
                    "oor",
                    quinn.single_space(
                        F.regexp_replace(
                            F.concat_ws(
                                " ",
                                "oor_1",
                                "oor_2",
                                "oor_3",
                                "oor_4",
                                "oor_5",
                                "oor_6",
                                "oor_7",
                                "oor_9",
                            ),
                            '"',
                            "",  # regexp replacement values
                        )
                    ),
                )
        ).drop("oor_1", "oor_2", "oor_3", "oor_4", "oor_5", "oor_6", "oor_7", "oor_8", "oor_9")
    )


In [30]:
df_ade_info = _transform_ade_info()

In [8]:
# load APPRAISAL_INFO.TXT to db
df_ade_info.write.format("mongo").mode("overwrite").option("collection", "info").save()


NameError: name 'df_ade_info' is not defined

In [19]:
# read APPRAISAL_LAND_DETAIL.TXT
def _transform_land():
    return (
        (spark.read.text(path_ade_land)
            .select(
                F.trim(F.col("value").substr(1, 12)).cast("string").alias("parcel_id"),
                #   Incorrect values, see readme
                #   F.trim(F.col("value").substr(29,10)).cast("string").alias("land_use_code"),
                #   F.trim(F.col("value").substr(39,25)).cast("string").alias("land_use_desc"),
                F.trim(F.col("value").substr(84,14)).cast("integer").alias("lot_size_sqft"),
                F.trim(F.col("value").substr(112,14)).cast("integer").alias("lot_depth_ft"),
                F.trim(F.col("value").substr(98,14)).cast("integer").alias("lot_frontage_ft")
            )
            .withColumn(
                    "parcel_id",
                    F.regexp_replace('parcel_id', r'^[0]*', '')
                )
        )
    )

In [31]:
df_ade_land = _transform_land()

In [10]:
# load APPRAISAL_LAND_DETAIL.TXT to db
df_ade_land.write.format("mongo").mode("overwrite").option("collection", "info").save()


NameError: name 'df_ade_land' is not defined

In [22]:
# read APPRAISAL_IMPROVEMENT_DETAIL.TXT
def _transfrom_imp_detail():
    w = Window.partitionBy("parcel_id")
    return(
    
        (spark.read.text(path_ade_imp_info)
            .select(
                F.trim(F.col("value").substr(1,12)).alias("parcel_id"),
                F.trim(F.col("value").substr(86,4)).cast("integer").alias("year"),
                F.trim(F.col("value").substr(94,15)).cast("integer").alias("sqft")
            )
            .withColumn(
                    "parcel_id",
                    F.regexp_replace('parcel_id', r'^[0]*', '')
            )
            .withColumn("structure_total_sqft", F.sum("sqft").over(w))
            .withColumn("year_built", F.sum("year").over(w))
        ).drop("sqft", "year").drop_duplicates(["parcel_id"])
    )



In [32]:
df_ade_imp_info = _transfrom_imp_detail()

In [12]:
# load APPRAISAL_IMPROVEMENT_DETAIL.TXT to db
df_ade_imp_info.write.format("mongo").mode("overwrite").option("collection", "info").save()


NameError: name 'df_ade_imp_info' is not defined

In [33]:
# merge datasets (if necassary)

# Example assuming more than one dataset
df_county = df_ade_info.join(df_ade_land, "parcel_id", "left").join(df_ade_imp_info, "parcel_id", "left")

In [34]:
# add required columns
county_parcels = df_county \
    .withColumn("county", F.lit(county_vars["county"])) \
        .withColumn(
        "fingerprint", F.base64(F.concat(F.lit(county_vars["county"]), "parcel_id"))
        ) \
        .withColumn("state", F.lit(county_vars["state"])) \
        .withColumn("fips_code", F.lit(county_vars["fips_code"])) \
        .withColumn("pipeline_run_id", F.lit(county_vars["pipeline_run_id"])) \
        .withColumn("pipeline_run_time", F.lit(county_vars["pipeline_run_time"])) \
        .withColumn("dataset_date", F.lit(county_vars["dataset_date"])) \
        .withColumn("record_hash", F.sha2(F.concat_ws("||", *df_county.columns), 256)) \
    .drop_duplicates(["fingerprint"])

In [None]:
# validate schema

In [None]:
# load final dataset into a mongo collection with schema validation
county_parcels.write.format("mongo").mode("append").option("database", schema_check_db).option("collection", schema_check_coll).save()

In [35]:
# get missing fields and document why they're missing on county readme.md
missing_fields = [f for f in schema.fieldNames() if f not in df_county.schema.fieldNames()]
print(missing_fields)

['fingerprint', 'pipeline_run_id', 'pipeline_run_time', 'dataset_date', 'record_hash', 'county', 'fips_code', 'state', 'unit_type', 'unit_count', 'unit_number', 'zip_four', 'zoning', 'land_use_code', 'land_use_desc', 'state_use_code', 'municipal_use_code', 'rooms', 'bedrooms', 'whole_bath', 'half_bath', 'bathrooms', 'land_market_value', 'improvement_market_value', 'foundation', 'roof_material', 'roof_style', 'heating_type', 'heating_fuel', 'cooling_type', 'style', 'condition', 'flag']


In [36]:
# check for extra fields
extra_fields = [f for f in county_parcels.schema.fieldNames() if f not in schema.fieldNames()]
print(extra_fields)

[]
