In [1]:
from datetime import datetime

from IPython.core.display import display, HTML
from IPython.display import clear_output

from keras.callbacks import History 
from keras.layers import Dense, Dropout
from keras.metrics import top_k_categorical_accuracy
from keras.models import Sequential,load_model
from keras.optimizers import SGD, Adam
from keras.utils import to_categorical

import matplotlib.pyplot as plt
import numpy as np
import os
import pandas as pd

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import Row

import re
from sklearn import preprocessing
from sklearn.utils import class_weight
import sys
import tensorflow

# Local Spark session used by every cell below.
# spark.local.dir is Spark's on-disk scratch space. The default below is a
# machine-specific absolute path, so it can be overridden with the
# INSIGHTS_SPARK_LOCAL_DIR environment variable instead of editing the code.
# NOTE(review): per the Spark docs, master "local" runs a single worker thread
# ("local[*]" uses all cores). With a local master the executor also lives in
# the driver JVM, so spark.executor.memory likely has no effect — confirm.
SPARK_LOCAL_DIR = os.environ.get("INSIGHTS_SPARK_LOCAL_DIR", "/Volumes/Rob Backup/spark")

spark = (
    SparkSession.builder
    .master("local")
    .config("spark.executor.memory", '32g')
    .config("spark.driver.memory", '32g')
    .config("spark.local.dir", SPARK_LOCAL_DIR)
    .getOrCreate()
)

  from ._conv import register_converters as _register_converters
Using TensorFlow backend.


In [2]:
# Notebook presentation settings: larger pandas tables, wider numpy
# printouts, and full-width notebook cells.
pd.set_option('display.max_rows', 100)
pd.set_option('display.max_columns', 500)
# NOTE(review): this silences pandas' SettingWithCopyWarning globally, which
# can hide genuine chained-assignment bugs.
pd.set_option('mode.chained_assignment', None)
np.set_printoptions(linewidth=150)
display(HTML("<style>.container { width:100% !important; }</style>"))

# Load Data

In [3]:
def allParquets(data_path, num_partitions=128, session=None):
    """Load every parquet dataset stored directly under ``data_path``.

    Each directory entry is read with ``<session>.read.parquet`` and
    repartitioned. An entry named ``mappings`` is treated as a sub-directory:
    its contents are loaded recursively and appended in place. The ``reports``
    entry and hidden entries (names starting with ``.``, e.g. ``.DS_Store``,
    macOS ``._*`` files, ``.ipynb_checkpoints``) are skipped.

    Args:
        data_path: directory to scan; a trailing slash is optional.
        num_partitions: partitions each loaded DataFrame is repartitioned
            into (128 by default, the previously hard-coded value).
        session: SparkSession to read with; defaults to the notebook-global
            ``spark``.

    Returns:
        list of ``{'file': <entry name>, 'data': <DataFrame>}`` dicts, ordered
        by sorted entry name.
    """
    spark_session = session if session is not None else spark
    dfs = []
    # Sort so the dataset order (and therefore the column order of the final
    # joined output) is deterministic; os.listdir order is arbitrary.
    for name in sorted(os.listdir(data_path)):
        path = os.path.join(data_path, name)
        if name == "mappings":
            dfs += allParquets(path, num_partitions=num_partitions, session=session)
            continue
        if name == "reports" or name.startswith("."):
            continue
        frame = spark_session.read.parquet(path).repartition(num_partitions)
        dfs.append({'file': name, 'data': frame})
    return dfs

# Group by Unique Incidents
Together, `account`, `system_id`, and `upload_time` uniquely identify each incident in the data, so we group every row that shares those three values into a single record per incident.

In [4]:
def groupIDs(dfs, key_cols=('account', 'system_id', 'upload_time')):
    """Collapse every loaded dataset to one row per incident.

    For each ``{'file', 'data'}`` entry, rows are grouped by ``key_cols`` and
    every remaining column is aggregated with ``F.collect_list``. Each group
    becomes a row ``(identity, data)``:

    * ``identity`` is the key values formatted with ``'{}'`` and joined by
      ``_`` (``account_systemid_uploadtime`` for the default keys);
    * ``data`` is the grouped row as a dict: the key columns plus Spark's
      default aggregate column names (e.g. ``collect_list(<col>)``).

    Entries whose file name contains ``mapping`` or ``yumlog_yumlog`` are
    skipped (the reason for skipping yumlog is not recorded here). A 1-based
    progress line is printed for every entry, including skipped ones.

    Args:
        dfs: list of ``{'file': str, 'data': DataFrame}`` dicts.
        key_cols: columns that together identify one incident; defaults to
            ``account``/``system_id``/``upload_time``.

    Returns:
        list of ``{'file': str, 'data': DataFrame[identity, data]}`` dicts.
    """
    datasets = []
    numDFs = len(dfs)
    for i, entry in enumerate(dfs, start=1):
        filename = entry['file']
        print("{}/{}: {}".format(i, numDFs, filename))
        if 'mapping' in filename or 'yumlog_yumlog' in filename:
            continue
        names = [name for name in entry['data'].schema.names if name not in key_cols]
        # NOTE(review): GroupedData.agg() rejects an empty expression list, so
        # a dataset containing only the key columns would fail here.
        grouped = entry['data'].groupBy(*key_cols).agg(*[F.collect_list(name) for name in names])
        # NOTE(review): joining keys with '_' can collide if a key value itself
        # contains '_' — verify account/system_id formats.
        mapped = grouped.rdd.map(
            lambda row: ['_'.join('{}'.format(row[k]) for k in key_cols), row.asDict()]
        )
        # NOTE(review): toDF infers the `data` column from the first row; a dict
        # mixing scalar keys and collected lists may be inferred as a MapType
        # with a single value type — verify the written schema keeps the lists.
        datasets.append({'file': filename, 'data': mapped.toDF(['identity', 'data'])})
    return datasets

In [5]:
def joinSelect(dfDict, num=""):
    """Project a grouped dataset to ``identity<num>`` plus a column named after its file.

    Args:
        dfDict: ``{'file': name, 'data': DataFrame}`` whose DataFrame has
            ``identity`` and ``data`` columns (the shape ``groupIDs`` produces).
        num: suffix appended to the ``identity`` column name; ``""`` keeps it
            as plain ``identity``.

    Returns:
        DataFrame with columns ``identity<num>`` and ``<file>``.
    """
    # Backtick-quote the alias (doubling any embedded backticks) so file names
    # that are not plain SQL identifiers, e.g. containing '-' or '.', still
    # parse in selectExpr.
    alias = dfDict['file'].replace('`', '``')
    return dfDict['data'].selectExpr(
        "identity as identity{}".format(num),
        "data as `{}`".format(alias),
    )

def joinAll(datasets):
    """Full-outer-join all grouped datasets into one wide frame keyed by ``identity``.

    Each dataset contributes one column named after its file, holding that
    incident's grouped data, or NULL when the incident is absent from that
    dataset.

    Args:
        datasets: non-empty list of ``{'file': str, 'data': DataFrame}`` dicts
            with ``identity``/``data`` columns.

    Returns:
        DataFrame with an ``identity`` column plus one column per dataset.

    Raises:
        ValueError: if ``datasets`` is empty.
    """
    if not datasets:
        raise ValueError("joinAll needs at least one dataset")
    df = joinSelect(datasets[0])
    for dataset in datasets[1:]:
        # Join on the column *name*: for a full outer join Spark then emits a
        # single `identity` column that coalesces both sides. An explicit
        # `identity == identity2` condition followed by dropping `identity2`
        # would leave incidents that exist only on the right with a NULL key.
        df = df.join(joinSelect(dataset), on='identity', how='outer')
    return df

In [16]:
# Export date to process; its parquet datasets live under insights_data/<date>/.
date = '2018-08-09'
data_path = "insights_data/%s/" % date

# load raw datasets -> one row per incident -> single wide joined frame
raw_frames = allParquets(data_path)
incident_frames = groupIDs(raw_frames)
df = joinAll(incident_frames)

=== insights_parsers_uname_uname ===
=== insights_parsers_hostname_hostname ===
=== rule_data ===
=== insights_parsers_ps_psauxcww ===
=== insights_parsers_redhat_release_redhatrelease ===
=== insights_parsers_meminfo_meminfo ===
=== exec_times ===
=== insights_parsers_docker_list_dockerlistimages ===
=== insights_combiners_virt_what_virtwhat ===
=== insights_parsers_installed_rpms_installedrpms ===
=== insights_parsers_dmidecode_dmidecode ===
=== insights_parsers_ps_psaux ===
=== insights_parsers_cpuinfo_cpuinfo ===
=== insights_parsers_yumlog_yumlog ===
=== account_mapping ===
=== system_id_account_mapping ===
=== system_id_mapping ===
=== insights_combiners_services_services ===
=== errors ===
=== insights_parsers_lsmod_lsmod ===
=== insights_parsers_yum_yumrepolist ===


In [17]:
# Shuffle the joined frame into a fixed number of partitions before writing;
# Spark writes one part file per non-empty partition.
NUM_OUTPUT_PARTITIONS = 50
df = df.repartition(NUM_OUTPUT_PARTITIONS)

In [18]:
# Persist the joined frame as <date>-joined. Spark's default save mode raises
# if the path already exists, which breaks Restart & Run All; the output is
# fully derived from the inputs loaded above, so overwriting it is safe.
df.write.mode("overwrite").parquet("{}-{}".format(date, 'joined'))