[Download JSON file from a URL](https://docs.python.org/2/howto/urllib2.html)

[Write JSON data to a file](http://stackoverflow.com/questions/12309269/how-do-i-write-json-data-to-a-file-in-python)

[Converting string into datetime](http://stackoverflow.com/questions/466345/converting-string-into-datetime)

[strftime format mask](https://docs.python.org/2/library/datetime.html#strftime-and-strptime-behavior)

[converting string to boolean](http://stackoverflow.com/questions/715417/converting-from-a-string-to-boolean-in-python)

[Set application memory size from pyspark shell](http://stackoverflow.com/questions/21609173/set-application-memory-size-from-pyspark-shell)

In [1]:
from datetime import datetime
import os
import json
import numpy as np
import re
import urllib2

In [24]:
def clean_fieldname(raw_fieldname):
    return re.sub("[:_]", "", raw_fieldname)

def clean_record(raw_data):

    record = format_record(raw_data)

    split_mailingaddresslocation(record)

    split_mailingaddresscitystatezip(record)

    format_timestamps(record)

    format_booleans(record)

    format_ints(record)

    format_floats(record)

    format_telephone_numbers(record)

    format_dates(record)
        
    return record

def format_record(raw_data):
    
    record = {}

    for idx in range(0, len(fieldnames_br.value)):
        record[fieldnames_br.value[idx]] = raw_data[idx]
        
    return record

def split_mailingaddresslocation(record):
    raw_field_name = u'mailing_address_location'

    match_idx = np.argwhere(map(lambda elem: elem['fieldName'] == raw_field_name,\
                                columns_br.value))[0,0]

    matching_column = columns_br.value[match_idx]

    sub_column_types = map(clean_fieldname,
                           matching_column['subColumnTypes'])

    base_field_name = fieldnames_br.value[match_idx]

    mailing_address_location = record[base_field_name]

    human_address = json.loads(mailing_address_location[0])

    for key in human_address.keys():
        record[base_field_name + 'humanaddress' + key] = human_address[key]

    for idx in range(1,len(sub_column_types)):
        record[unicode(base_field_name + sub_column_types[idx])] =\
            mailing_address_location[idx]

    record.pop(base_field_name)
    
    return record
    
def split_mailingaddresscitystatezip(record):

    base_field_name = 'mailingaddresscitystatezip'

    keys = ['city','state','zip']
    
    if record[base_field_name] == None:
        matchobj = None
        values = [None, None, None]
    else:
        patternobj = re.compile("^([A-Z\\s]+)\\s([A-Z]+)\\s([0-9]+)")

        matchobj = patternobj.match(record[u'mailingaddresscitystatezip'])
    
        if matchobj == None:
            values = [None,None,None]
        else:
            values = list(matchobj.groups())

    mailingaddresscitystatezip = dict(zip(keys,values))
    
    if matchobj != None:
        mailingaddresscitystatezip['zip'] = int(mailingaddresscitystatezip['zip'])

    for key in mailingaddresscitystatezip:
        record[unicode('mailingaddress' + key)] = mailingaddresscitystatezip[key]
    
    record.pop(base_field_name)
    
def format_timestamps(record):
    timestamp_keys = [u'createdat', u'updatedat']

    for key in timestamp_keys:
        if record[key] != None:
            try:
                record[key] = datetime.fromtimestamp(record[key])
            except ValueError:
                record[key] = None
        
def format_booleans(record):
    
    boolean_keys = [u'continuingeducationflag']
    
    for key in boolean_keys:
        if record[key] != None:
            try:
                record[key] = record[key] in (u"Y")
            except ValueError:
                record[key] = None
        
def format_ints(record):
    
    int_keys = [u'businesscitystatezip',
                u'createdmeta',
                u'licensenumber',
                u'mailingaddresscountycode',
                u'mailingaddresslocationhumanaddresszip',
                u'mailingaddresszip',
                u'updatedmeta']
    
    for key in int_keys:
        if record[key] != None:
            try:
                record[key] = int(record[key])
            except:
                record[key] = None

def format_floats(record):
    
    float_keys = [u'mailingaddresslocationlatitude',
                  u'mailingaddresslocationlongitude']

    for key in float_keys:
        if record[key] != None:
            try:
                record[key] = float(record[key])
            except:
                record[key] = None
                
def format_telephone_numbers(record):
    
    telephone_number_keys = [u'businesstelephone',
                             u'ownertelephone']

    patternobj = re.compile('^([0-9]{3})([0-9]{3})([0-9]{4})$')

    for key in telephone_number_keys:
        areacode_key = key + 'areacode'
        record[areacode_key] = None
        
        if record[key] != None:
            matchobj = patternobj.match(record[key])
        
            if matchobj != None:
                record[areacode_key] = int(matchobj.group(1))
            
                record[key] = matchobj.group(1) + '-' +\
                              matchobj.group(2) + '-' +\
                              matchobj.group(3)
                        
def format_dates(record):
    date_keys = [u'licenseexpirationdatemmddccyy']
    
    for key in date_keys:
        if record[key] != None:
            try:
                record[key] = datetime.strptime(record[key], "%m%d%Y")
            except ValueError:
                record[key] = None

In [3]:
data_path = "./Data"
data_file = 'tdlrAllLicenses.json'
datafile_fullpath = os.path.join(data_path, data_file)

if not os.path.exists(data_path):
    os.mkdir(data_path)
    
    download_url = "https://data.texas.gov/api/views/7358-krk7/rows.json?" +\
                   "accessType=DOWNLOAD"
        
    response = json.loads(urllib2.urlopen(download_url).read())
    
    with open(datafile_fullpath, "w") as outfile:
        json.dump(response, outfile)
else:
    with open(datafile_fullpath, "r") as infile:
        tdlr_all_licenses = json.load(infile)

In [28]:
fieldnames_br = sc.broadcast(map(clean_fieldname,
                                 [elem['fieldName'] for elem in\
                                  tdlr_all_licenses['meta']['view'][u'columns']]))

columns_br = sc.broadcast(tdlr_all_licenses['meta']['view'][u'columns'])

raw_data_rdd = sc.parallelize([tdlr_all_licenses['data'][:250]])
                           
formatted_data_rdd = raw_data_rdd.map(lambda elem: clean_record(elem)).collect()

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 7 in stage 9.0 failed 1 times, most recent failure: Lost task 7.0 in stage 9.0 (TID 79, localhost): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/opt/spark/python/lib/pyspark.zip/pyspark/worker.py", line 111, in main
    process()
  File "/opt/spark/python/lib/pyspark.zip/pyspark/worker.py", line 106, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/opt/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 263, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "<ipython-input-28-bffbcd50ae74>", line 9, in <lambda>
  File "<ipython-input-24-1a3c0807e6f0>", line 8, in clean_record
  File "<ipython-input-24-1a3c0807e6f0>", line 49, in split_mailingaddresslocation
  File "/usr/lib/python2.7/json/__init__.py", line 338, in loads
    return _default_decoder.decode(s)
  File "/usr/lib/python2.7/json/decoder.py", line 366, in decode
    obj, end = self.raw_decode(s, idx=_w(s, 0).end())
TypeError: expected string or buffer

	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166)
	at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:207)
	at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:125)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
	at org.apache.spark.scheduler.Task.run(Task.scala:88)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
	at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1283)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1271)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1270)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1270)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
	at scala.Option.foreach(Option.scala:236)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:697)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1496)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1458)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1447)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:567)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1824)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1837)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1850)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1921)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:909)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:310)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:908)
	at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:405)
	at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:606)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
	at py4j.Gateway.invoke(Gateway.java:259)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:207)
	at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/opt/spark/python/lib/pyspark.zip/pyspark/worker.py", line 111, in main
    process()
  File "/opt/spark/python/lib/pyspark.zip/pyspark/worker.py", line 106, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/opt/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 263, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "<ipython-input-28-bffbcd50ae74>", line 9, in <lambda>
  File "<ipython-input-24-1a3c0807e6f0>", line 8, in clean_record
  File "<ipython-input-24-1a3c0807e6f0>", line 49, in split_mailingaddresslocation
  File "/usr/lib/python2.7/json/__init__.py", line 338, in loads
    return _default_decoder.decode(s)
  File "/usr/lib/python2.7/json/decoder.py", line 366, in decode
    obj, end = self.raw_decode(s, idx=_w(s, 0).end())
TypeError: expected string or buffer

	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166)
	at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:207)
	at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:125)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
	at org.apache.spark.scheduler.Task.run(Task.scala:88)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
	... 1 more


In [None]:
np.argwhere([elem == None for elem in formatted_data])

In [27]:
clean_record(tdlr_all_licenses['data'][245])

{u'businessaddressline1': None,
 u'businessaddressline2': None,
 u'businesscitystatezip': None,
 u'businesscounty': u'OUT OF STATE',
 u'businessname': u'WEAVER, WILLIAM F JR',
 u'businesstelephone': None,
 u'businesstelephoneareacode': None,
 u'continuingeducationflag': False,
 u'createdat': datetime.datetime(2015, 3, 11, 11, 40, 55),
 u'createdmeta': 859798,
 u'id': u'765FD01D-8A97-4FE5-807C-2D83697512CD',
 u'licenseexpirationdatemmddccyy': datetime.datetime(2016, 2, 25, 0, 0),
 u'licensenumber': 22016,
 u'licensesubtype': u'AE',
 u'licensetype': u'A/C Contractor',
 u'mailingaddresscity': None,
 u'mailingaddresscounty': None,
 u'mailingaddresscountycode': None,
 u'mailingaddressline1': None,
 u'mailingaddressline2': None,
 u'mailingaddresslocationhumanaddressaddress': u'',
 u'mailingaddresslocationhumanaddresscity': u'',
 u'mailingaddresslocationhumanaddressstate': u'',
 u'mailingaddresslocationhumanaddresszip': 0,
 u'mailingaddresslocationlatitude': None,
 u'mailingaddresslocationlon