In [1]:
%load_ext autoreload
%autoreload 2
%matplotlib inline

In [2]:
import numpy
import scipy
import matplotlib
import matplotlib.dates as mpd
import pylab as plt
import datetime as dtm
import pytz
import multiprocessing as mpp
import pickle
import os
import time
import numba
#
import pyspark
import h5py
#
# TODO: phase out unreferenced hpc_lib calls...
import hpc_lib

#
#data_file_name = 'data/mazama_usage_20200506_tool8.out'
#data_file_name = 'data/sacct_sherlock_out_serc2020_05_08.out'
data_file_name = 'data/serc_usage_20200914.out'
#
pkl_name = "{}.pkl".format(os.path.splitext(data_file_name)[0])
h5_name = "{}.h5".format(os.path.splitext(data_file_name)[0])
#

### PySpark implementation of HPC_analytics (prototype)
- Right now, a hybrid of vanilla `RDD` and `PySpark DataFrames`
- Mixing a bit of using `SQLContext` context methods and whatever the native (??) Spark context.

#### Brief sumary:
PySpark is the Python implementation of Spark, which is a distributed data processing infrastructure. Spark should parallelize across multiple nodes, and so should be a better multi-processing option than `python.multiprocessing`. The syntax is not Pythonic. At all, so it's basically like writing another language in Python, and the workflow strategies are very different as well, so if you're a Python person, be prepared to pivot a bit.

The biggest problem I'm still having is the final act of reading a very large data file into memory or transfering into a new (disk based) container. This, of course, should be simple, since it is really the fundamental and primary purpose of Spark, but alas... The problem arises when the distributed (Java) VMs exceed memory limitations. But we'll get there...

#### 1. A quick look at the inpuit data:

In [3]:
# Take a quick look at the input data:
#
with open(data_file_name, 'r') as fin:
    k=0
    for rw in fin:
        print('** ', rw)
        k+=1
        if k>10: break

**  User|Group|GID|JobName|JobID|JobIDRaw|Partition|State|Timelimit|NCPUS|NNodes|Submit|Eligible|Start|End|Elapsed|SystemCPU|UserCPU|TotalCPU|NTasks|CPUTimeRAW|Suspended|

**  saipb|oneillm|328022|hovmuller|62339523|62339523|serc|COMPLETED|4-00:00:00|16|1|2020-03-01T00:10:36|2020-03-01T00:10:36|2020-03-01T00:11:24|2020-03-01T00:12:05|00:00:41|00:02.369|00:17.315|00:19.685||656|00:00:00|

**  |||batch|62339523.batch|62339523.batch||COMPLETED||16|1|2020-03-01T00:11:24|2020-03-01T00:11:24|2020-03-01T00:11:24|2020-03-01T00:12:05|00:00:41|00:02.367|00:17.315|00:19.683|1|656|00:00:00|

**  |||extern|62339523.extern|62339523.extern||COMPLETED||16|1|2020-03-01T00:11:24|2020-03-01T00:11:24|2020-03-01T00:11:24|2020-03-01T00:12:05|00:00:41|00:00.001|00:00:00|00:00.001|1|656|00:00:00|

**  pjwomble|gorelick|26961|6dff71d6eaf0c|62339657_0|62339659|serc|COMPLETED|00:59:00|1|1|2020-03-01T00:17:07|2020-03-01T00:17:08|2020-03-01T00:17:14|2020-03-01T00:39:21|00:22:07|00:10.982|21:27.659|21:38.642||1327|

#### 2. Instantiate and configure some context handler objects.
- There are a few...
- The "spark" and "sql" variants seem to come from different branches of the project, or source projects, that have since merged, albeit perhaps not entirely gracefully.

### Build some working components:
- Spark context objects
- Type dictionary translator
- Other type, etc. translator maps


In [4]:
n_cpu = 8
#
# .config("spark.driver.memory", "15g")
#conf = pyspark.SparkConf('local[*]').set("spark.cores.max", "6").set("spark.executor.instances", "4").set("spark.executor.cores","2")
conf = pyspark.SparkConf('local[{}]'.format(n_cpu)).set("spark.driver.memory", "15g")
#
#conf = conf.set("spark.executor.memory", "4g").set("spark.executor.pyspark.memory", "3g")
sc   = pyspark.SparkContext(conf=conf)
#

# also build a SQL context? We never ended up using this, though I think you can use
#. it to do SQL querries on the data sets.
sc_sql = pyspark.SQLContext(sc)
#
spark = pyspark.sql.SparkSession.builder.appName('HPC_loader').master('local[{}]'.format(n_cpu)).config("spark.driver.memory", "15g").getOrCreate()
#spark = pyspark.sql.SparkSession.builder.appName('HPC_loader').config(conf).getOrCreate()
#
#sc.stop()
#spark.stop()

Pull some data structures, handler functions, etc. from relevant modules (ie `hpc_lib`). Note that eventually, we'll want to consolicate the `process_row()` function.

In [5]:
delim = '|'
types_dict = hpc_lib.SACCT_data_handler.default_types_dict
print('** typex_dict: ', types_dict)
#
# some numpy. (and other?) data types are not (well) supported by Spark, most pointedly some of the 
#. numpy.float{k} variants, so write a dictionary/map to re-type them. NOTE: this might need to include
#. some numpy.int{k} types as well.
numpy_re_typer={numpy.float64:float, numpy.float128:float, numpy.int64:int }
#
# python functions to use on grouped/aggregated columns:
group_py_functions = {'End':numpy.nanmax, 'Start':numpy.nanmin, 'NCPUS':numpy.nanmax, 'NNodes':numpy.nanmax}
#
# Define one or more row processing functions.
def f_rw(rw, header_names, RH_index, types_dict=types_dict, delim='|'):
    rws = rw[:-1].split(delim)
    #
    #if not len(rws)==0:
    #    return rws
    return [None if s=='' else types_dict.get(h,str)(s) for h,s in zip(header_names,rws)] + [rws[RH_index['JobID']].split('.')[0] ]
    #return [str(s) for h,s in zip(header_names,rws)]
#
# this will be used to reduce() the raw data to summary data:
def f_reduce_row(r1, r2):
     return tuple([group_py_functions.get(hdr, lambda x: x[0] )([x1, x2]) 
             for k, (hdr,x1,x2) in enumerate(zip(header_names,
                                *sorted([r1, r2], key=lambda X:X[RH_index['Submit']]))) ] )

#
def spark_types_to_numpy(spark_df):
    # row1 = df1.agg({"x": "max"}).collect()[0]
    #
    # peel off the dtypes so it will pickle properly:
    spark_dtypes = spark_df.dtypes
    #
    lens = spark_df.rdd.map(lambda rw: [len(s) if spark_dtypes[k][1]=='string' else None for k,s in enumerate(rw) ] ).reduce(lambda x1,x2: max(x1,x2))

    dtypes_out = []
    for (nm,tp), s_len in zip(spark_dtypes, lens):
        if tp in ('int', 'bigint'):
            tp_n = '>i8'
            #tp_n = '>f8'
            # do we need a casting function?
            #nm_f = float
        elif tp in ('float', 'double'):
            tp_n = '>f8'
        elif tp in ('string'):
            tp_n = 'S{}'.format(s_len)
        #
        dtypes_out += [(nm, tp_n)]
    #
    return dtypes_out



** typex_dict:  {'User': <class 'str'>, 'JobID': <class 'str'>, 'JobName': <class 'str'>, 'Partition': <class 'str'>, 'State': <class 'str'>, 'JobID_parent': <class 'str'>, 'Timelimit': <function elapsed_time_2_day at 0x7fa40335b820>, 'Start': <function str2date_num at 0x7fa40335b700>, 'End': <function str2date_num at 0x7fa40335b700>, 'Submit': <function str2date_num at 0x7fa40335b700>, 'Eligible': <function str2date_num at 0x7fa40335b700>, 'Elapsed': <function elapsed_time_2_day at 0x7fa40335b820>, 'MaxRSS': <class 'str'>, 'MaxVMSize': <class 'str'>, 'NNodes': <class 'int'>, 'NCPUS': <class 'int'>, 'MinCPU': <class 'str'>, 'SystemCPU': <function elapsed_time_2_day at 0x7fa40335b820>, 'UserCPU': <function elapsed_time_2_day at 0x7fa40335b820>, 'TotalCPU': <function elapsed_time_2_day at 0x7fa40335b820>, 'NTasks': <class 'int'>}


In [6]:
# Preliminarily, read file into line-strings. Extract header row so that we
#. can type() each column when we split() it up.
#
lines = sc.textFile(data_file_name)
header_names = (lines.take(1)[0])[:-1].split(delim)
RH_index = {s:k for k,s in enumerate(header_names) }
#
print('** Headers: ', header_names)

** Headers:  ['User', 'Group', 'GID', 'JobName', 'JobID', 'JobIDRaw', 'Partition', 'State', 'Timelimit', 'NCPUS', 'NNodes', 'Submit', 'Eligible', 'Start', 'End', 'Elapsed', 'SystemCPU', 'UserCPU', 'TotalCPU', 'NTasks', 'CPUTimeRAW', 'Suspended']


### Excluding header row from data collection:
- This is surprisingly harder than it looks, and searching for solutions seems to be elusive.
-  I think the most direct and common approach is to read text data via `textFile()`, then use filter() to exclude rows that look like the first row. This sounds kinda dumb for a single file, but it is a good option when reading multiple files.
- *(spark) dataframes:* Use a syntactical variant of `spark.read.format('CSV')` method below to read the data into a dataframe. This provides options to exclude and catch the header row.


#### Dataframes:
- using the sql context, we can read the data into a dataframe
- Nominally fast and easy, but I think really for well behaved data.
- Getting the header row is not too tough, but I'm not so sure about excluding a false terminal column, resulting from a terminal delimeter (row string ending in a delimiter).
- In fact, we seem to get some weird behavior from this
- ... to the point that I would probably just err on the side of having more control and maybe burning some cycles on the filter() option (which i expect is pretty well optimized on the back end).
- HOWEVER: Preliminary assessments just doing a `.count()` suggests that DF might be much, much faster than the standard RDD methods... Though that may also be because the `DataFramds` methods are using a context or session that is not CPU limited -- which would make sense.


In [7]:
# # Another way to read the file with headers. This will give an effective array of (val,ky) tuples.
# df_rows1 = spark.read.format('CSV').option('header', 'true').option('sep', '|').load(data_file_name)
# #
# # another syntax:
# df_rows = spark.read.csv(data_file_name, header=True, sep='|')
# #

# for rw in df_rows1.take(10):
#     print('** ', rw[:])

# # Another syntax to load directly into a spark dataframe (via .sql):
# #
# #
# print('** type: ', type(rows_2))
# print('** type: ', type(df_rows))
# print('** dypes: ', df_rows.dtypes)
# print('** header: {}'.format( df_rows.schema.names ) )
# #
# print('\n*** *** ')
# for rw in df_rows.take(5):
#     print('** ', rw[:])
#     #print('* * ', rw.head)
#
# print('*** ', df_rows.schema)
# #
# print('*** ', df_rows.dtypes)
# #print('** ', set())

In [8]:
#
# identifying the header row(s) and filtering bogus terminal characters on each row:
n_terminal = 0
header_string = lines.take(1)[0]
#
while header_string[-1] in ('\n', delim):
    header_string=header_string[:-1]
    n_terminal += 1
#
# if necessary, trim off row-terminal characters:
print('** n_terminal: ', n_terminal)
if n_terminal>0:
    lines = lines.map(lambda ln: ln[:-n_terminal])
#
header_names = header_string.split(delim) + ['JobID_parent']
RH_index = {s:k for k,s in enumerate(header_names) }
print('** Headers[{}]: '.format(len(header_names), header_names) )
#
# for c in (lines.take(2)[1]):
#     print('{}: [{}]'.format(c, ord(c)))

** n_terminal:  1
** Headers[23]: 


In [14]:
# use this to filter header rows:
n_startswith = 15
header_start = header_string[0:n_startswith]
#
rows = lines.filter(lambda s: not s.startswith(header_start) ).map(lambda x: f_rw(x, header_names,
                                            types_dict=types_dict, RH_index=RH_index) )
rows = rows.map(lambda rw: [None if (x is None or x=='') else numpy_re_typer.get(type(x), type(x))(x) for x in rw])
#

In [15]:
#
# we can either group and reduce using the RH{} index, or convert to a DF first:
#rows_df = spark.createDataFrame( rows, header_names ).sort('JobID')
#
# print('** rows_df schema[{}]: {}'.format(len(rows_df.schema), rows_df.schema))
# print('** dytpes[{}]: {}'.format(len(rows_df.dtypes), rows_df.dtypes))
# print('** header_names[{}]: {}'.format(len(header_names), header_names))

In [16]:
# Here, let's try a RDD_pair, then reduce_by_key() function:
# (this appears to work, but needs to be validated )
row_pairs = rows.map(lambda x: (x[RH_index['JobID_parent']], list(x[:])))
#
summary_rdd = row_pairs.reduceByKey(f_reduce_row)
#

# for rw in summary_rdd.take(10):
#     print('** ', rw)

In [17]:
# N=100
# rows_df = spark.createDataFrame( rows.takeSample(withReplacement=True, num=N), header_names )

In [18]:
#
# make a sampling DataFrame() to automagically determine the schema (this breaks when we do just 
#. the summary... which suggests maybe a problem -- NTasks is not populating correctly maybe?)
# otherwise, we need to write some more translator dictionaries to map numpy types to Spark types
#
for n in range(2,7):
    N=int(10**n)
    try:
        rows_df = spark.createDataFrame( rows.take(N), header_names )
        break
    except:
        # try a bigger sample.
        print('broke for [{}]. trying a bigger sample.'.format(N))
    #
        
# rows_df = spark.createDataFrame( rows, header_names ).sort('JobID')
summary_df = spark.createDataFrame(summary_rdd.values().map(lambda rw: 
                            [None if (x is None or x=='') else numpy_re_typer.get(type(x),
                                lambda a:a)(x) for x in rw]),
                                   rows_df.schema)

In [None]:
# for rw in summary_df.take(10):
#     print('** ', rw[:])

# dt_dict = dict(summary_df.dtypes)
# sdf_dtypes = summary_df.dtypes
# #
# print('** ', sdf_dtypes)
# #lens = summary_df.rdd.map(lambda rw: [len(s) for k,s in enumerate(rw) if sdf_dtypes[k][1]=='string'] ).reduce(lambda x1,x2: max(x1,x2))
# lens = summary_df.rdd.map(lambda rw: [len(s) if sdf_dtypes[k][1]=='string' else None for k,s in enumerate(rw) ] ).reduce(lambda x1,x2: max(x1,x2))
# print('** lens: ', lens)



# # for rw in lens.take(10):
# #     print('** ', rw)
# print('** ')
# for (cl,n),l in zip(summary_df.dtypes, lens):
#     print('** ', cl,n,l)

In [21]:
print('** ', summary_df.dtypes)
#spark_to_numpy_types('double':float, 'float':float, 'bigint':int, 'int':int, 'string':str)
#
print('** ')
#
numpy_dtypes = spark_types_to_numpy(summary_df)
array_len = summary_df.count()
#
print('** output_dfs: ', numpy_dtypes)

**  [('User', 'string'), ('Group', 'string'), ('GID', 'string'), ('JobName', 'string'), ('JobID', 'string'), ('JobIDRaw', 'string'), ('Partition', 'string'), ('State', 'string'), ('Timelimit', 'double'), ('NCPUS', 'bigint'), ('NNodes', 'bigint'), ('Submit', 'double'), ('Eligible', 'double'), ('Start', 'double'), ('End', 'double'), ('Elapsed', 'double'), ('SystemCPU', 'double'), ('UserCPU', 'double'), ('TotalCPU', 'double'), ('NTasks', 'bigint'), ('CPUTimeRAW', 'string'), ('Suspended', 'string'), ('JobID_parent', 'string')]
** 
** output_dfs:  [('User', 'S8'), ('Group', 'S8'), ('GID', 'S5'), ('JobName', 'S24'), ('JobID', 'S8'), ('JobIDRaw', 'S8'), ('Partition', 'S4'), ('State', 'S18'), ('Timelimit', '>f8'), ('NCPUS', '>i8'), ('NNodes', '>i8'), ('Submit', '>f8'), ('Eligible', '>f8'), ('Start', '>f8'), ('End', '>f8'), ('Elapsed', '>f8'), ('SystemCPU', '>f8'), ('UserCPU', '>f8'), ('TotalCPU', '>f8'), ('NTasks', '>i8'), ('CPUTimeRAW', 'S5'), ('Suspended', 'S7'), ('JobID_parent', 'S8')]


In [23]:

my_types = {'i8':'>f8', 'i16':'>f16'}
foutname = 'data/serc_spark_summary.h5'
#
#os.remove(foutname)
with h5py.File(foutname, 'w') as fout:
    ds = fout.create_dataset('summary', (array_len, ),
                            dtype=numpy_dtypes)
    #
    #ds[...] = numpy.array(summary_df.collect())[:]
    for cl,tp in numpy_dtypes:
        print('** col: {} :: {}'.format(cl, tp))
        #if not cl=='NTasks': continue
#        ds[cl] = numpy.array([numpy.nan if x is None else x for x in numpy.reshape(summary_df.select(cl).collect(),
#                                                                     (array_len,))])
        # None types for integer types are badly handled by numpy and HDF5; we have to convert them \
        #   basically manually
        if tp in ('>i8', '>i16'):
            ds[cl] = numpy.array([numpy.nan if x is None else x for x in numpy.reshape(summary_df.select(cl).collect(),
                                                                     (array_len,))])[:]
        else:
            ds[cl] = numpy.reshape(summary_df.select(cl).collect(), (array_len,)).astype(tp)[:] 
            #ds[cl] = numpy.array(numpy.reshape(summary_df.select(cl).collect(), (array_len,)), dtype=tp)


#


** col: User :: S8
** col: Group :: S8
** col: GID :: S5
** col: JobName :: S24
** col: JobID :: S8
** col: JobIDRaw :: S8
** col: Partition :: S4
** col: State :: S18
** col: Timelimit :: >f8
** col: NCPUS :: >i8
** col: NNodes :: >i8
** col: Submit :: >f8
** col: Eligible :: >f8
** col: Start :: >f8
** col: End :: >f8
** col: Elapsed :: >f8
** col: SystemCPU :: >f8
** col: UserCPU :: >f8
** col: TotalCPU :: >f8
** col: NTasks :: >i8
** col: CPUTimeRAW :: S5
** col: Suspended :: S7
** col: JobID_parent :: S8


In [24]:
with h5py.File(foutname, 'r') as fin:
    #print('NTasks:')
    for cl in ['NTasks', 'JobID', 'NCPUS', 'Elapsed']:
        print('** {}'.format(cl))
        print(fin['summary'][cl][0:20])
        

** NTasks
[-9223372036854775808 -9223372036854775808 -9223372036854775808
 -9223372036854775808 -9223372036854775808 -9223372036854775808
 -9223372036854775808 -9223372036854775808 -9223372036854775808
 -9223372036854775808 -9223372036854775808 -9223372036854775808
 -9223372036854775808 -9223372036854775808 -9223372036854775808
 -9223372036854775808 -9223372036854775808 -9223372036854775808
 -9223372036854775808 -9223372036854775808]
** JobID
[b'62339657' b'62339657' b'62339657' b'62339657' b'62339657' b'62339657'
 b'62344473' b'62344473' b'62344473' b'62344473' b'62344473' b'62344473'
 b'62344473' b'62346892' b'62346892' b'62346892' b'62346892' b'62346892'
 b'62350507' b'62350507']
** NCPUS
[1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1]
** Elapsed
[1.53009259e-02 1.52430556e-02 1.53703704e-02 1.50810185e-02
 1.51388889e-02 1.51620370e-02 1.53703704e-02 1.42592593e-02
 9.25925926e-05 1.54513889e-02 1.54745370e-02 1.41898148e-02
 1.42013889e-02 1.54513889e-02 1.51851852e-02 1.43171296e-02
 1

In [None]:
# #col = 'NCPUS'
# col = 'NTasks'
# #
# with h5py.File('summary.h5', 'a') as fin:
#     print('** ', fin['summary'][col][0:10])
#     #
#     print(fin['summary']['NTasks'][0], fin['summary']['NTasks'][0]+1)
#     #fin['summary']['NTasks'][0:5] = numpy.array([7 for _ in range(5)][:])
#     #fin['summary']['NTasks'][0:5] = (numpy.ones(5)*7)[:]
#     #fin['summary']['NTasks'][0]=42.
#     fin['summary'][col,0:5] = numpy.ones(5)*43
#     fin['summary'][col,5:10] = numpy.array([numpy.nan for _ in range(5)])[:]
#     fin['summary'][col,5:10] = numpy.array([7 for _ in range(5)])[:]
#     #
#     # Not sure write_direct() will work with named columns.
#     #fin['summary'].write_direct(numpy.ones(5)*42., source_sel=numpy.s_[0:5], dest_sel=numpy.s_[5:10, RH_index[col]])
#     #
#     #fin['summary'].write_direct(numpy.ones(5, dtype='>i8')[:]*42, source_sel=numpy.s_[0:5], 
#     #                            dest_sel=numpy.s_[0:5] )
    
#     #
#     print('** ', fin['summary'][col][0:15])
