In [1]:
"""
####
####    This script expects a file name parameter containing
####    full paths of local Parquet files to 
####    convert to delimited records according
####    to the monolith function below
####
####    The design assumes several of these jobs to 
####    be submitted to a standalone Spark cluster
####    for parallel processing, each getting a
####    different workload file.
####
""";

In [2]:
from sys import path, argv, exit
from os import environ, getenv
from glob import glob

# Used to take epoch Unix timestamp
# and make a filename string from it
from time import gmtime, strftime

# Used for compressing TSVs on write
import gzip

In [3]:
####
####    BEGIN parameter section
####

# Spark master
# Note: this needs to match what
# `hostname` returns on the master
sparkMasterHostname = "Kyle-Dunn-MacBook-Pro.local"
sparkMasterPort = "7077"

# Resources to use
numExecutors = 1
coresPerExecutor = 1
memoryPerExecutor = "1500m"

# File to look in for list of Parquet files
# to be converted
# Note: these should be the *full* path
# Fail with a usage message (same style as the SPARK_HOME check) instead of
# a bare IndexError when the workload file argument is missing.
if len(argv) < 2:
    print( "Usage: {script} <file listing full Parquet file paths>".format(script=argv[0]) )
    exit(2)
inputFilename = argv[1]

# Location of the Parquet/Avro schema file
schemaFile = '/Users/kdunn/Desktop/TWC/schema_34.avsc'

# Directory to place output files under
outputFilesRootDir = '/Users/kdunn/Desktop/TWC/'

# Output column delimiter
columnDelimiter = "\t"

# Null string
nullMarker = 'Null'

# Outer-most level delimiter
outerDelim = "|^|"

# Nested level 1 delimiter
levelOneDelim = "="

# Nested level 2 delimiter
levelTwoDelim = "%^%"

####
####    END parameter section
####

In [4]:
# Locate the Spark installation; nothing below can work without it.
SPARK_HOME = getenv('SPARK_HOME', None)
if SPARK_HOME:
    # Make the Spark launcher scripts reachable for child processes
    environ['PATH'] = "%s:%s/bin" % (getenv("PATH", ""), SPARK_HOME)
else:
    print( "SPARK_HOME needs to be set (eg. export SPARK_HOME=/opt/spark)" )
    exit(3)

# Put the bundled PySpark sources first on the module search path
path.insert(0, '/'.join([SPARK_HOME, 'python']))

# sys.path entries are used literally (wildcards are never expanded), so the
# versioned py4j archive has to be resolved with glob before being added.
path.extend(glob('/'.join([SPARK_HOME, 'python/lib/py4j-*-src.zip'])))

# Can only import these if SPARK_HOME and PYTHONPATH are set
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext

In [5]:
# Set Spark context parameters
config = SparkConf().setMaster('spark://{host}:{port}'.format(host=sparkMasterHostname, port=sparkMasterPort))
config.setAppName("Parquet file subset {s}".format(s=inputFilename))

# On a standalone master the executor count is not requested directly: each
# executor gets `spark.executor.cores` cores and the application as a whole is
# capped at `spark.cores.max`, so the cap is (executors x cores per executor).
# ("spark.num.executors", set previously, is not a documented Spark property,
# and "spark.cores.max" was being given the per-executor figure.)
config.set("spark.executor.cores", coresPerExecutor)
config.set("spark.cores.max", numExecutors * coresPerExecutor)
config.set("spark.executor.memory", memoryPerExecutor)

# Get a Spark context from the configuration
sc = SparkContext(conf=config)

# Create a SparkSQL context object from the SparkContext object
sqlCtx = SQLContext(sc)

In [None]:
# Load the Parquet schema file
schemaRdd = sc.textFile(schemaFile, 1).collect()

# Merge the list of lines into a readable string
mergedString = ' \n'.join(schemaRdd)

# Convert the string into a Python dictionary, replacing keywords with Python equivalents
# NOTE(review): eval() executes whatever the schema file contains, and the blind
# replace() calls also rewrite 'null'/'true'/'false' inside field names or doc
# strings. Only point this at a trusted schema file; a JSON parser would be safer.
schemaDict = eval(mergedString.replace('null', 'None').replace('false', 'False').replace('true', 'True'))

# Take the name of every top-level field and put in a nice list
topLevelFields = [k['name'] for k in schemaDict["fields"]]

# Create a formattable template string (this also defines the db table columns)
# Result looks like "%(field1)s<delim>%(field2)s<delim>...": one named
# placeholder per top-level field, so `templateString % recordDict` emits the
# columns in schema order.
templateString = columnDelimiter.join(["%({f})s".format(f=f) for f in topLevelFields])

# Initialize every top-level field in the schema with a 'Null' string
# (a concrete list of pairs, so it can be turned into a dict once per record)
fieldsAndNulls = [(f, nullMarker) for f in topLevelFields]

In [51]:
# Automagically turn nested Parquet into TSV with *very* wide text columns
def parquetToTSV(data):
    """Flatten one nested record and append it to an hourly gzip'd TSV file.

    `data` must provide `asDict()` (it is called on every element handed over
    by `<RDD>.foreach`). Records are skipped (the function returns None without
    writing) when they have no 'customeraccount_divisionid' key, when the
    division is set and is not 'STX', or when the city is set and is not
    'austin' (both compared case-insensitively).

    For records that pass the filter, the nested 'annotations', 'content',
    'upcomingContent', 'contentrestriction_timeshiftformats' and 'demographics'
    fields are flattened to delimited strings, missing values become
    `nullMarker`, and the row is rendered through `templateString` and appended
    to "<outputFilesRootDir>/<YYYY-MM-DD-HH>.tsv.gz", named after the UTC hour
    of 'timestamp_received' (epoch milliseconds).

    Relies on the module-level names fieldsAndNulls, templateString,
    nullMarker, outerDelim, levelOneDelim and levelTwoDelim.

    NOTE(review): several tasks may append to the same hourly file at once and
    nothing here coordinates them -- confirm that is acceptable.
    """
    # <RDD>.foreach(myFunction) doesn't allow passing additional arguments
    # into the function, so the output root is fixed here. Note that this
    # local deliberately takes precedence over the module-level
    # outputFilesRootDir from the parameter section.
    outputFilesRootDir = "/mapr/user/pivotal"

    # Convert the PySpark RDD object into a Python Dict
    tempDict = data.asDict()

    #####
    #####
    ##### BEGIN: customer data-specific section
    #####
    #####

    # Pre-filter conditions for Austin area only
    if 'customeraccount_divisionid' not in tempDict:
        return

    division = tempDict['customeraccount_divisionid']
    if division is not None and division.upper() != 'STX':
        return

    # The city must be read from its own column (the division id was being
    # re-read here, which rejected every record with division 'STX').
    # No defined city in the record means we can't explicitly exclude it.
    city = tempDict.get('customeraccount_city')
    if city is not None and city.lower() != "austin":
        return

    # Create a dictionary from the key-value pairs
    # for building the record dictionary
    finalDict = dict( fieldsAndNulls )

    # Update the dictionary with this record's
    # data for every column
    finalDict.update(tempDict)

    # Flatten the key/val[,val] in the annotations section
    annotationsSection = list()
    annotations = tempDict.get('annotations')
    if annotations is not None:
        for key, val in annotations.items():
            # An annotation without a value has nothing to flatten
            if val is None:
                continue
            for param, op in val.asDict().items():
                try:
                    annotationsSection.append("{k}:".format(k=key) + levelOneDelim.join([param, op]))
                except TypeError:
                    # join() rejects non-string values (e.g. None)
                    annotationsSection.append("{k}:".format(k=key) + levelOneDelim.join([param, nullMarker]))

    annotationsSectionString = nullMarker
    if len(annotationsSection) > 0:
        annotationsSectionString = outerDelim.join(annotationsSection)

    # Flatten content group fields (including the nested genre array)
    contentSection = list()
    content = tempDict.get('content')
    if content is not None:
        for (k, v) in content.asDict().iteritems():
            theVal = v
            # Flatten the nested array
            if k == 'contentmetadata_genres':
                if v is None:
                    theVal = nullMarker
                elif isinstance(v, list):
                    theVal = levelTwoDelim.join(v)

            try:
                contentSection.append("{k}{d}{v}".format(k=k, d=levelOneDelim, v=theVal.encode('utf-8', 'ignore')))
            except AttributeError:
                # Anything without .encode() (None, numbers, ...) is written as Null
                contentSection.append("{k}{d}{v}".format(k=k, d=levelOneDelim, v=nullMarker))

    contentSectionString = nullMarker
    if len(contentSection) > 0:
        contentSectionString = outerDelim.join(contentSection)

    # Flatten the array of upcomingContent group fields (including the nested genre array)
    upcomingContentSection = list()
    for contentEntry in (tempDict.get('upcomingContent') or []):
        thisContent = list()
        if contentEntry is not None:
            for (k, v) in contentEntry.asDict().iteritems():
                theVal = v
                # Flatten the nested array; a missing or empty array is a Null
                if k == 'contentmetadata_genres':
                    if v is not None and len(v) > 0:
                        theVal = levelTwoDelim.join(v)
                    else:
                        theVal = nullMarker

                try:
                    thisContent.append("{k}{d}{v}".format(k=k, d=levelOneDelim, v=theVal.encode('utf-8', 'ignore')))
                except AttributeError:
                    thisContent.append("{k}{d}{v}".format(k=k, d=levelOneDelim, v=nullMarker))

        if len(thisContent) > 0:
            upcomingContentSection.append(levelTwoDelim.join(thisContent))
        else:
            # Keep the entry's position but as a string, so the join below
            # cannot fail on a list element
            upcomingContentSection.append(nullMarker)

    upcomingContentSectionString = nullMarker
    if len(upcomingContentSection) > 0:
        upcomingContentSectionString = outerDelim.join(upcomingContentSection)

    timeshiftformatsString = nullMarker
    timeshiftformats = tempDict.get('contentrestriction_timeshiftformats')
    if timeshiftformats is not None and len(timeshiftformats) > 0:
        timeshiftformatsString = outerDelim.join(timeshiftformats)

    demographicsString = nullMarker
    demographics = tempDict.get('demographics')
    if demographics is not None and len(demographics) > 0:
        demographicsString = outerDelim.join(
            [levelOneDelim.join([key, nullMarker if val is None else val])
             for key, val in demographics.items()])

    # Replace the record's nested fields with the flattened versions generated above
    finalDict['annotations'] = annotationsSectionString
    finalDict['content'] = contentSectionString
    finalDict['upcomingContent'] = upcomingContentSectionString
    finalDict['contentrestriction_timeshiftformats'] = timeshiftformatsString
    finalDict['demographics'] = demographicsString

    # 'timestamp_received' is in epoch milliseconds; gmtime() wants seconds
    receivedTime = gmtime(finalDict['timestamp_received']/1000)

    # Make a YEAR-MONTH-DAY-HOUR string from the timestamp to be used for a filename
    hourString = strftime('%Y-%m-%d-%H', receivedTime)

    # Convert millisecond epoch to UTC string timestamp
    finalDict['timestamp_received'] = strftime('%Y-%m-%d %H:%M:%S', receivedTime)

    # Initialize the path to write the output to
    targetFlatFile = outputFilesRootDir + "/{time}.tsv.gz"
    targetFlatFile = targetFlatFile.format(time=hourString)

    #####
    #####
    ##### END customer data-specific section
    #####
    #####

    # Make a final pass on the record,
    # force the 'Null' key where necessary
    # and ensure encoding is correct
    for (k, v) in finalDict.items():
        if v is None:
            finalDict[k] = nullMarker
        elif isinstance(v, unicode):
            continue
        elif isinstance(v, str):
            finalDict[k] = v.decode('utf-8', 'ignore')

    # Place the values into consistent columns, tab delimited
    theRow = templateString % finalDict

    # gzip compress the output file; always release the handle, even when a
    # write fails part-way through
    theFile = gzip.GzipFile(targetFlatFile, 'a+')
    try:
        theFile.write(theRow.encode('utf-8', 'ignore'))
        theFile.write("\n")
    finally:
        theFile.close()


In [8]:
%%time

# Convert the listed Parquet files one at a time: each file is loaded through
# the SQL context and every record in it is passed to parquetToTSV via foreach.
# NOTE(review): `goodFiles` is not defined in any cell visible in this notebook
# (only `parquetFiles` is, further down) -- presumably it came from a cell that
# was since removed; confirm before relying on a fresh-kernel run.
numFiles = len(goodFiles)
for i, f in enumerate(goodFiles):
    # Load the Parquet data
    moreData = sqlCtx.parquetFile(f)

    # Do the thing
    moreData.foreach(parquetToTSV)
    
    # Progress: zero-based index of the file just finished, then the total
    print "File", i, "", numFiles

CPU times: user 26.3 ms, sys: 10.7 ms, total: 37 ms
Wall time: 720 ms


In [38]:
# Load the Parquet files of interest
# from a pre-computed list of full
# file paths (one path per line)
with open(inputFilename) as theFile:
    # The with-block closes the handle on exit, so no explicit close() is needed.
    # Blank lines (e.g. a trailing newline at the end of the list) are dropped
    # so an empty string is never passed on as a Parquet path.
    parquetFiles = [l.strip() for l in theFile if l.strip()]

In [9]:
import fnmatch
from os import walk
# `path` in this notebook is sys.path (a list), which has no join();
# path components have to be joined with os.path.join instead.
from os.path import join as joinPath

# Recursively glob the Parquet files 
# of interest within a root directory
parquetFiles = []
for root, dirnames, filenames in walk('/Users/kdunn/Desktop/TWC'):
    for filename in fnmatch.filter(filenames, '*.parquet'):
        parquetFiles.append(joinPath(root, filename))

#print parquetFiles

In [None]:
# All files in a single directory (preferred)

# Loads every file within path into a DataFrame object
pFiles = sqlCtx.load(source="parquet",
                     path="/Users/kdunn/Desktop/TWC/data/")

inParallel = sc.parallelize(pFiles)

%time inParallel.foreach(parquetToTSV)

In [None]:
# Files spread across many directories

# Create a Python dictionary representation of the Schema
# to be used in creation of the DataFrame object containing
# schema, initialized with Nulls (below)
nullSchemaRdd = dict( fieldsAndNulls )

"""
http://mail-archives.apache.org/mod_mbox/spark-user/201503.mbox/%3C14be2870b20.27c8.af1f4bf02661e0e7caf994ede2c15203@prismalytics.io%3E
""";

# Initialize a Null RDD containing the master schema to ensure unions succeed
parquetCollection = sqlContext.createDataFrame(nullSchemaRdd, schemaDict)
#sqlCtx.load(source="parquet", path='/'.join(goodFiles[0].split('/')[:-1])).rdd

for rootDir in goodFiles:
    thisFile = sqlCtx.load(source="parquet", path='/'.join(rootDir.split('/')[:-1])).rdd
    parquetCollection = parquetCollection.union(thisFile)

    
%time parquetCollection.foreach(parquetToTSV)