In [243]:
import csv
import io
import json
import os
import re
import shutil
import sys
import tempfile
from pathlib import Path
from typing import Iterator, Tuple
from urllib.parse import urlsplit, urljoin
from zipfile import ZipFile

import requests
from pyspark import SparkFiles
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.dataframe import DataFrame

https://pdata.hcad.org/download/ warehouses *all* of the Harris County Appraisal District's property tax data for Residential, Commercial and Business Personal property, with the exception of sales data. The data is partitioned by tax year and category, and is available going back to 2005.

Data ETL steps for pdata.hcad.org data

1. Download (mirror) the base source files from the remote. The original .txt files are stored in compressed .zip format, alongside metadata files in .txt format. The fastest and most thorough tool for this is `wget`; see `./bin/hcad-land.sh`.

    We'll keep a copy of the .zip archive containing the original data files, which are stored as `.txt` with `iso-8859-1` text encoding. Identifying the encoding required inspecting the headers of the HTTP responses from the pdata.hcad.org domain.
    
2. Extract all archive members to a temporary directory with zipfile.ZipFile
3. Cleanse the data with spark.read.csv
4. Save the cleansed data compressed in ./samples

In [337]:
# Reuse the active Spark session if one exists, otherwise start a new one.
spark = SparkSession.builder.getOrCreate()

In [338]:
# Base URL of the HCAD public data site; remote paths are joined onto it.
domain = "https://pdata.hcad.org"
# Server-side path of the archive this notebook processes.
remote = Path("/data/cama/2019/Hearing_files.zip")

# Fetch the layout description and fail fast on an HTTP error, so that an
# error page is never parsed as if it were the layout file.
layout_response = requests.get(urljoin(domain, "/Desc/Layout_and_Length.txt"))
layout_response.raise_for_status()
# Each match is a 3-tuple of strings (word, word, digits). The helper functions
# in this notebook read them as (table name, field name, field length).
dictionary = re.findall(r"(\w+)\s+(\w+)\s+(\d+)\s?", layout_response.text)

In [339]:
# Local root directory for the processed output.
samples = Path("samples/")
# Scratch directory that receives the extracted archive members.
tmp = Path(tempfile.mkdtemp())
# Output folder named after the archive: <samples>/<parent dir name>/<archive stem>.
dst = samples / remote.parent.name / remote.stem

In [370]:
def get_fields(table: str):
    """Return the field names listed in ``dictionary`` for ``table``.

    An entry is selected when its first element occurs as a substring of
    ``table`` (``entry[0] in table``), so the match is not an exact comparison.
    The names are returned in the order they appear in ``dictionary``.
    """
    names = []
    for entry in dictionary:
        if entry[0] in table:
            names.append(entry[1])
    return names


def get_max_columns(table: str) -> int:
    """Count the ``dictionary`` entries whose first element is a substring of ``table``."""
    return sum(1 for entry in dictionary if entry[0] in table)
    
    
def get_max_chars_per_column(table: str) -> int:
    """Return the largest length (last tuple element, as int) among ``table``'s entries.

    Entries are selected with the same substring test as the other helpers
    (``entry[0] in table``). Raises ``ValueError`` when no entry matches,
    because ``max`` is then called on an empty sequence.
    """
    widths = [int(entry[-1]) for entry in dictionary if entry[0] in table]
    return max(widths)

In [371]:
def extract() -> Iterator[Path]:
    """Download the remote archive, keep a copy under ``dst`` and unpack it into ``tmp``.

    The archive at ``domain`` + ``remote`` is fetched with ``requests`` and its
    response headers are printed as JSON. The raw bytes are written to
    ``dst / remote.name`` and every archive member is extracted into ``tmp``.

    This is a generator: nothing is downloaded until it is first iterated.

    Yields:
        Each ``*.txt`` path found directly inside ``tmp`` after extraction.

    Raises:
        requests.HTTPError: if the server answers with an error status.
    """
    print("Extracting %s" % remote)
    r = requests.get(urljoin(domain, remote.as_posix()))
    # Stop on an HTTP error instead of saving an error page as the ".zip" copy.
    r.raise_for_status()
    print(json.dumps(dict(r.headers), indent="    "))
    dst.mkdir(parents=True, exist_ok=True)
    archive = dst.joinpath(remote.name)
    archive.write_bytes(r.content)
    # The context manager closes the archive handle once extraction is done.
    with ZipFile(archive) as zf:
        zf.extractall(tmp)
    for f in tmp.glob("*.txt"):
        yield f
        print("Extracted %s" % f)

In [372]:
def transform(*args: Path) -> Iterator[Tuple[Path, DataFrame]]:
    """Read each extracted text file with Spark and name its columns from the layout.

    Every file is read as tab-separated, ``iso-8859-1`` encoded text without a
    header row. The positional columns are then renamed, in order, to the field
    names that ``get_fields`` returns for the file's stem. ``zip`` stops at the
    shorter sequence, so surplus columns keep their default names.

    This is a generator: files are only read as it is iterated.

    Args:
        *args: Paths of the extracted text files.

    Yields:
        ``(path, dataframe)`` pairs, one per input file.

    Raises:
        ValueError: if the layout has no entries for a file's stem.
    """
    print("Transforming")
    for f in args:
        print(f)
        fields = get_fields(f.stem)
        if not fields:
            # Without this guard the failure surfaces as an opaque
            # "max() arg is an empty sequence" ValueError from the helper below.
            raise ValueError("No layout entries found for table %r" % f.stem)
        max_columns = get_max_columns(f.stem)
        max_chars_per_column = get_max_chars_per_column(f.stem)
        df = spark.read.csv(
            f.as_posix(),
            encoding="iso-8859-1",
            sep="\t",
            header=False,
            maxColumns=max_columns,
            maxCharsPerColumn=max_chars_per_column
        )

        for i, name in zip(df.columns, fields):
            print(i, name)
            df = df.withColumnRenamed(i, name)
        yield f, df

In [373]:
def load(*args: Tuple[Path, DataFrame]) -> None:
    """Write each DataFrame as gzip-compressed CSV and clean up the scratch files.

    For every ``(path, dataframe)`` pair the frame is shown, then written with a
    header row to ``samples/<remote parent name>/<remote stem>/<file stem>.csv``
    in overwrite mode. The source text file is deleted after its frame has been
    written. Once all pairs are processed the scratch directory ``tmp`` is
    removed together with anything still inside it.

    Args:
        *args: ``(source path, DataFrame)`` pairs as yielded by ``transform``.
    """
    for f, df in args:
        out = (
            samples
            .joinpath(remote.parent.name)
            .joinpath(remote.stem)
            .joinpath(f.name)
            .with_suffix(".csv")
        )
        print(f)
        df.show()
        df.write.mode("overwrite").csv(
            out.as_posix(),
            header=True,
            compression="gzip"
        )
        # Report the path that was actually written (the .csv, not the .txt name).
        print("Loaded %s" % out)
        os.unlink(f)
    # rmtree removes only `tmp` and its contents. os.removedirs would fail if a
    # non-.txt archive member were left behind, and it also tries to prune the
    # parent directories of `tmp`.
    shutil.rmtree(tmp)

In [374]:
# Run the pipeline: extract() feeds transform(), whose (path, DataFrame) pairs feed load().
# Each `*` unpacking consumes its generator completely before the outer call starts.
load(*transform(*extract()))
# [os.unlink(i) for i in tmp.iterdir()]
# os.removedirs(tmp)

Extracting /data/cama/2019/Hearing_files.zip
{
    "Server": "nginx/1.14.0 (Ubuntu)",
    "Date": "Sat, 21 Dec 2019 19:24:39 GMT",
    "Content-Type": "application/x-zip-compressed",
    "Content-Length": "13699272",
    "Connection": "keep-alive",
    "Last-Modified": "Mon, 09 Dec 2019 02:37:07 GMT",
    "Accept-Ranges": "bytes",
    "ETag": "\"8fdd9d8c39aed51:0\"",
    "X-Powered-By": "ASP.NET"
}
Extracted /tmp/tmpz66izq0x/arb_hearings_pp.txt
Extracted /tmp/tmpz66izq0x/arb_protest_pp.txt
Extracted /tmp/tmpz66izq0x/arb_hearings_real.txt
Extracted /tmp/tmpz66izq0x/arb_protest_real.txt
Transforming
/tmp/tmpz66izq0x/arb_hearings_pp.txt
_c0 acct
_c1 Tax_Year
_c2 Personal
_c3 Hearing_Type
_c4 State_Class_Code
_c5 Owner_Name
_c6 Scheduled_for_Date
_c7 Actual_Hearing_Date
_c8 Release_Date
_c9 Letter_Type
_c10 Agent_Code
_c11 Initial_Value
_c12 Final_Value
/tmp/tmpz66izq0x/arb_protest_pp.txt
_c0 acct
_c1 protested_by
_c2 protested_dt
/tmp/tmpz66izq0x/arb_hearings_real.txt
_c0 acct
_c1 Tax_Yea

In [349]:
# Location of one of the CSV outputs under the samples directory.
file = samples / "2019" / "Hearing_files" / "arb_hearings_real.csv"

In [367]:
# Read the CSV back, taking column names from its header row.
# NOTE(review): no schema or inferSchema is given, so columns are presumably read
# as strings and dateFormat would have no effect. Confirm.
df = spark.read.csv(
    file.as_posix(),
    header=True,
    dateFormat="MM/dd/yyyy",
)

In [369]:
# NOTE(review): IPython help lookup left over from interactive use; it only prints the
# signature and docstring of df.write.csv. Consider removing it before sharing.
df.write.csv?

[0;31mSignature:[0m
[0mdf[0m[0;34m.[0m[0mwrite[0m[0;34m.[0m[0mcsv[0m[0;34m([0m[0;34m[0m
[0;34m[0m    [0mpath[0m[0;34m,[0m[0;34m[0m
[0;34m[0m    [0mmode[0m[0;34m=[0m[0;32mNone[0m[0;34m,[0m[0;34m[0m
[0;34m[0m    [0mcompression[0m[0;34m=[0m[0;32mNone[0m[0;34m,[0m[0;34m[0m
[0;34m[0m    [0msep[0m[0;34m=[0m[0;32mNone[0m[0;34m,[0m[0;34m[0m
[0;34m[0m    [0mquote[0m[0;34m=[0m[0;32mNone[0m[0;34m,[0m[0;34m[0m
[0;34m[0m    [0mescape[0m[0;34m=[0m[0;32mNone[0m[0;34m,[0m[0;34m[0m
[0;34m[0m    [0mheader[0m[0;34m=[0m[0;32mNone[0m[0;34m,[0m[0;34m[0m
[0;34m[0m    [0mnullValue[0m[0;34m=[0m[0;32mNone[0m[0;34m,[0m[0;34m[0m
[0;34m[0m    [0mescapeQuotes[0m[0;34m=[0m[0;32mNone[0m[0;34m,[0m[0;34m[0m
[0;34m[0m    [0mquoteAll[0m[0;34m=[0m[0;32mNone[0m[0;34m,[0m[0;34m[0m
[0;34m[0m    [0mdateFormat[0m[0;34m=[0m[0;32mNone[0m[0;34m,[0m[0;34m[0m
[0;34m[0m    [0mtimestampFor