<div style="text-align:center"><h1>AWAKE Data Reduction</h1></div>
<div style="text-align:center"><h2><i>Large Scale Data Reduction of AWAKE experiment data (HDF5 files) with Apache Spark</i></h2></div>
<hr style="border-top-width: 4px; border-top-color: #34609b;">

This notebook demonstrates accessing the AWAKE CSV compressed database and visualizing the extracted datasets  
***Author: Aman Singh Thakur***  
***Contact: Prasanth Kothuri***  
  
To run this notebook we used the following configuration:
* *Software stack*: LCG_96 Python3
* *Platform*: centos8-gcct
* *Spark Cluster*: cloud containers

This notebook was developed in the context of the CERN-HSF Google Summer of Code programme.
<img align="left" src="https://developers.google.com/open-source/gsoc/resources/downloads/GSoC-logo-horizontal-200.png" alt="Google Summer of Code logo">

In [5]:
import os
import h5py, io
import shutil
import csv
from csv import reader
from pyspark.sql.readwriter import DataFrameWriter
from pyspark.sql.functions import *
import re

# Report the h5py version available to the kernel running this notebook.
print (h5py.__version__)

# XRootD URL prefix put in front of the Spark output directory when saving.
prefix_domain_user = "root://eosuser.cern.ch/"
# XRootD URL prefix put in front of every discovered input file path.
prefix_domain = "root://eospublic.cern.ch/"
# Filesystem directory for driver-side CSV output; created on demand.
dir_csv_home = "/eos/user/p/pkothuri/CSVFiles/SparkFinal/"
# Directory that receives the Spark text output; removed before a new run.
dir_spark_files = "/eos/user/p/pkothuri/AWAKE/CSVFiles/SparkFinal/"
# Root of the event data tree; its length is used when stripping path prefixes.
dir_home = "/eos/experiment/awake/event_data/"
# Directory scanned for input HDF5 files.
dir_h5_filepath = "/eos/experiment/awake/event_data/2016/"
# Module-level dict intended to hold extraction output.
final_pair = {}

def get_all_files(dir_filepath, file_type):
    """Return XRootD URLs of the files in ``dir_filepath`` with a given extension.

    Args:
        dir_filepath: Directory to list. It is expected to end with "/"
            because each file name is appended to it verbatim.
        file_type: Extension to keep, without the leading dot (e.g. "h5").

    Returns:
        A list of ``prefix_domain + dir_filepath + name`` strings, one per
        matching entry, in ``os.listdir`` order.

    Raises:
        OSError: If ``dir_filepath`` cannot be listed.
    """
    wanted_suffix = "." + str(file_type)
    all_files = []
    for file in os.listdir(dir_filepath):
        # Match on the real (last) extension. Comparing the second
        # dot-separated field would miss "run.01.h5" and accept "x.h5.bak".
        if os.path.splitext(file)[1] == wanted_suffix:
            all_files.append(prefix_domain + dir_filepath + str(file))
    return all_files

def get_all_files_home_util(dir_h5_filepath, depth):
    """Collect HDF5 file URLs found exactly ``depth`` directory levels below a root.

    Args:
        dir_h5_filepath: Directory to start from; must end with "/".
        depth: Number of sub-directory levels to descend before listing
            ``.h5`` files. ``0`` lists the files directly in ``dir_h5_filepath``.

    Returns:
        A flat list of file URLs as produced by ``get_all_files``.

    Raises:
        OSError: If a directory on the way cannot be listed.
    """
    if depth == 0:
        return get_all_files(dir_h5_filepath, "h5")

    all_H5_files = []
    for val in os.listdir(dir_h5_filepath):
        child_dir = dir_h5_filepath + val + "/"
        # A stray regular file at an intermediate level cannot be descended
        # into; skip it instead of letting os.listdir raise NotADirectoryError.
        if not os.path.isdir(child_dir):
            continue
        all_H5_files += get_all_files_home_util(child_dir, depth - 1)
    return all_H5_files

def get_all_files_home(dir_h5_filepath, depth):
    """Return the URLs of all HDF5 files ``depth`` levels below ``dir_h5_filepath``.

    Thin entry point around ``get_all_files_home_util``. The helper lists
    ``dir_h5_filepath`` itself as its first step, so a missing or unreadable
    root still raises ``OSError`` without a separate listing here.

    Args:
        dir_h5_filepath: Root directory to scan; must end with "/".
        depth: Number of sub-directory levels between the root and the files.

    Returns:
        A flat list of file URLs.
    """
    return get_all_files_home_util(dir_h5_filepath, depth)

def mkdir_structure(filename):
    """Recreate a file's directory components underneath ``dir_csv_home``.

    The leading ``len(prefix_domain + dir_home) - 1`` characters of
    ``filename`` are dropped; every "/"-separated component of the remainder
    except the last one is created as a nested directory below
    ``dir_csv_home`` (which is itself created when missing).

    Args:
        filename: Full path of an input file.

    Returns:
        A tuple ``(year, month, day, stem)`` where the first three items are
        the first, second and third directory components of the stripped path
        and ``stem`` is the stripped path up to its first ".".

    Raises:
        UnboundLocalError: If the stripped path has fewer than three
            directory components.

    NOTE(review): because of the ``- 1`` the slice begins on the last
    character of the stripped prefix. For a ``filename`` that starts with
    exactly ``prefix_domain + dir_home`` that character is "/", which makes
    the first component the empty string - confirm the expected input format.
    """
    if not os.path.exists(dir_csv_home):
        os.mkdir(dir_csv_home)

    strip_len = int(len(prefix_domain + dir_home) - 1)
    relative = filename[strip_len:]
    stem_parts = relative.split(".")
    path_parts = relative.split("/")

    current_dir = dir_csv_home
    # Walk every component but the last (the file name), creating as we go.
    for level, component in enumerate(path_parts[:-1]):
        current_dir += component + "/"
        if not os.path.exists(current_dir):
            os.mkdir(current_dir)
        if level == 0:
            year = component
        elif level == 1:
            month = component
        elif level == 2:
            day = component
    return year, month, day, stem_parts[0]

def read_input(filepath):
    """Load a comma-separated list of file paths from a text file.

    Args:
        filepath: Path of the text file to read.

    Returns:
        The non-empty comma-separated entries, in file order. Entries are
        returned verbatim; no whitespace is stripped.
    """
    with open(filepath, 'r') as handle:
        entries = handle.read().split(",")
    # A trailing separator (or doubled commas) yields empty entries; drop them.
    return [entry for entry in entries if entry != ""]

def write_input(all_h5_files, filename):
    """Write a list of file paths to ``dir_csv_home + filename``.

    Each entry is followed by a comma, so the file ends with a trailing
    separator. ``dir_csv_home`` is created when it does not exist yet.

    Args:
        all_h5_files: Iterable of path strings to store.
        filename: Name of the output file, relative to ``dir_csv_home``.
    """
    if not os.path.exists(dir_csv_home):
        os.mkdir(dir_csv_home)
    target = dir_csv_home + filename
    with open(target, 'w') as handle:
        handle.writelines(row + "," for row in all_h5_files)

# Discover every .h5 file two directory levels below dir_h5_filepath
# (presumably month/day below the year directory - confirm) and report how
# many input files were found.
all_h5_files = get_all_files_home(dir_h5_filepath, 2)
print(len(all_h5_files))

2.9.0
215769


In [6]:
def clean_database(dir_filepath):
    """Recursively delete ``dir_filepath`` and everything beneath it.

    Raises whatever ``shutil.rmtree`` raises, e.g. when the path is missing.
    """
    shutil.rmtree(dir_filepath)

#clean_database(dir_csv_home)
# GETTING VALUE AS PER DATATYPE
def get_dataset_value(size, file):
    """Return the value of a single-element dataset, or "" for any other size.

    Args:
        size: Number of elements in the dataset (int or numeric string).
        file: Dataset-like object exposing ``dtype`` and integer indexing.

    Returns:
        ``""`` when ``size`` is 0 or greater than 1. Otherwise the first
        element, converted according to the dtype name: the integer code
        point for ``"|S1"``, the UTF-8 decoded text for other dtypes whose
        name contains "S", and ``str(element)`` for everything else.
    """
    element_count = int(size)
    if element_count == 0 or element_count > 1:
        return ""

    dtype_name = str(file.dtype)
    if dtype_name == "|S1":
        # Single-byte strings are reported as their numeric code.
        return ord(file[0])
    if "S" in dtype_name:
        return str(file[0].decode('utf-8'))
    return str(file[0])

def decode_attr(file):
    """Return an attribute value as text.

    Args:
        file: Attribute value; typically bytes, but any object is accepted.

    Returns:
        The UTF-8 decoded text when ``file`` supports ``decode`` and the bytes
        are valid UTF-8, otherwise ``str(file)``.
    """
    try:
        return file.decode('utf-8')
    except Exception:
        # Values without .decode() (str, numbers, arrays) or with invalid
        # UTF-8 fall back to their string form. Only Exception is caught so
        # that KeyboardInterrupt / SystemExit still propagate.
        return str(file)
    
def get_dataset_attr(file):
    """Extract the comment, acquisition stamp and exception flag from attributes.

    Attribute names are matched by substring, so a single name may feed more
    than one field, and when several names match a field the last one wins.

    Args:
        file: Mapping of attribute names to values (supports ``keys()`` and
            item lookup).

    Returns:
        ``[comment, acqStamp, exception]`` as strings; the defaults are
        ``""``, ``"nan"`` and ``"False"`` when no matching attribute exists.
    """
    attrs = ["", "nan", "False"]
    for name in file.keys():
        if "comment" in name:
            attrs[0] = decode_attr(file[name])
        if "acqStamp" in name:
            attrs[1] = str(file[name])
        if "exception" in name:
            attrs[2] = str(file[name])
    return attrs

def write_into_CSV(file, values, comment_hash, dataset_hash, group_attrs):
    """Walk an HDF5 group recursively and collect one row per dataset.

    Args:
        file: h5py object to walk. Anything that is not an ``h5py.Group`` is
            ignored and the accumulators are returned unchanged.
        values: List extended in place with one row per dataset:
            ``[dataset name, group name, acqStamp, exception, value,
            quoted shape, dtype]``.
        comment_hash: Dict updated in place; maps each non-empty comment
            attribute to the name of the dataset it was recorded for.
        dataset_hash: Set updated in place with the name of every dataset seen.
        group_attrs: Overwritten from ``file.attrs`` before it is first read,
            so the value passed in has no effect; kept so existing callers
            keep working.

    Returns:
        The same ``values``, ``comment_hash`` and ``dataset_hash`` objects.
    """
    if(isinstance(file, h5py.Group)):
        for sub in file.keys():
            # Start every child from the attributes of the enclosing group.
            try:
                group_attrs = get_dataset_attr(file.attrs)
            except Exception:
                group_attrs = ["", "nan", "False"]
            if(isinstance(file[sub], h5py.Dataset)):
                # Each property is read best-effort: an unreadable dataset
                # still produces a row, with empty fields. Only Exception is
                # caught so KeyboardInterrupt / SystemExit are not swallowed.
                try:
                    size = str(file[sub].size)
                except Exception:
                    size = "0"
                try:
                    dataset_value = str(get_dataset_value(size, file[sub])).strip()
                except Exception:
                    dataset_value = ""
                try:
                    datatype = str(file[sub].dtype).strip()
                except Exception:
                    datatype = ""
                # When the group carries none of the three attributes, fall
                # back to the dataset's own attributes.
                if (group_attrs[0]=="" and group_attrs[1]=="nan" and group_attrs[2]=="False"):
                    group_attrs = get_dataset_attr(file[sub].attrs)
                dataset_hash.add(file[sub].name)
                if group_attrs[0] != "":
                    comment_hash[group_attrs[0]] = file[sub].name
                values.append([file[sub].name, file.name, group_attrs[1], group_attrs[2],
                                      dataset_value, "\""+str(file[sub].shape)+"\"", str(datatype)])
            elif (isinstance(file[sub], h5py.Group)):
                write_into_CSV(file[sub], values, comment_hash, dataset_hash, group_attrs)
    return values, comment_hash, dataset_hash

In [7]:
def extractHDF5(hdf5file):
    """Convert one in-memory HDF5 file into CSV text.

    Args:
        hdf5file: ``(path, content)`` pair, where ``content`` holds the raw
            bytes of the HDF5 file (the record shape of ``sc.binaryFiles``).

    Returns:
        A new dict with three entries for this file only:
          * ``path``: CSV table with a header line and one line per dataset,
          * ``"comment_" + path``: lines of ``"<comment>",<dataset name>``,
          * ``"dataset_" + path``: all dataset names, each followed by ",".

    The result is built in a fresh dict rather than in the module-level
    ``final_pair``: a shared dict keeps growing inside a reused worker
    process, so every returned record would also carry the output of all
    files processed earlier by that worker.
    """
    prefix = hdf5file[0]
    content = hdf5file[1]

    # Everything collected below is plain Python strings, so the HDF5 handle
    # can be released as soon as the walk is finished.
    with h5py.File(io.BytesIO(content)) as f:
        values, comment_hash, dataset_hash = write_into_CSV(
            f, [], {}, set(), ["", "nan", "False"])

    header = "DatasetName,GroupName,AcqStamp,Exception,DatasetValue,Shape,Datatype\n"
    result = header + "".join(
        ",".join(str(cell) for cell in row) + "\n" for row in values)
    comment_result = "".join(
        "\"" + key + "\"" + "," + dataset_name + "\n"
        for key, dataset_name in comment_hash.items())
    dataset_result = "".join(val + "," for val in dataset_hash)

    return {
        prefix: result,
        "comment_" + prefix: comment_result,
        "dataset_" + prefix: dataset_result,
    }

In [9]:
# Describe the job: read every input file as one record and turn each record
# into its CSV text with extractHDF5.
inputData = sc.binaryFiles(','.join(all_h5_files))
hdf5_reduced_collection = inputData.map(extractHDF5)

# Make sure the driver-side output directory exists.
if not os.path.exists(dir_csv_home):
    os.mkdir(dir_csv_home)

# Remove the output of a previous run, if present.
if os.path.exists(dir_spark_files):
    shutil.rmtree(dir_spark_files)

In [None]:
# Run the extraction and write the per-file results as text under
# dir_spark_files, spread over 10000 partitions.
try:
    hdf5_reduced_collection.repartition(10000).saveAsTextFile(prefix_domain_user+dir_spark_files)
except Exception as e:
    # There is a bug in xrootd-connector and we can ignore it
    if "ch.cern.eos.XRootDFileSystem.delete" not in str(e):
        # Re-raise the original exception unchanged so its type and traceback
        # are preserved (wrapping it in a new Exception would lose both).
        raise

In [None]:
# Write the collected dataset names and comment -> dataset mapping as CSV
# files under <dir_csv_home>/Central/.
#
# NOTE(review): `dataset_hash` and `comment_hash` are not assigned at notebook
# scope in any cell shown here (they only exist as locals inside
# extractHDF5), so this cell raises NameError on a fresh kernel unless they
# are defined elsewhere - confirm where they are meant to come from.
central_dir = dir_csv_home + "Central/"
# The directory is not created anywhere else; open() would fail without it.
os.makedirs(central_dir, exist_ok=True)

# newline='' lets the csv module control line endings itself, as the csv
# documentation requires for files passed to csv.writer. The `with` block
# closes each file, so no explicit close() is needed.
with open(central_dir + "dataset_hash.csv", 'w', newline='') as csvfile:
    writer = csv.writer(csvfile, delimiter=',', quotechar='"', quoting=csv.QUOTE_MINIMAL)
    for val in dataset_hash:
        writer.writerow([val])

with open(central_dir + "comment_hash.csv", 'w', newline='') as csvfile:
    writer = csv.writer(csvfile, delimiter=',', quotechar='"', quoting=csv.QUOTE_MINIMAL)
    for key in comment_hash.keys():
        writer.writerow([key, comment_hash[key]])