# Convert ROOT files to Parquet

This notebook demonstrates how to convert ROOT files to the Parquet data format.
Before running it, connect to the `k8s` Spark cluster and set the following options:
- `spark.dynamicAllocation.maxExecutors`: `100`
- `spark.driver.memory`: `6g`
- `spark.jars`: `laurelin-1.0.0.jar,log4j-api-2.13.0.jar,log4j-core-2.13.0.jar`
- `spark.kubernetes.container.image`: `gitlab-registry.cern.ch/db/spark-service/docker-registry/swan:laurelin`
- `spark.kubernetes.container.image.pullPolicy`: `Always`
- `spark.driver.extraClassPath`: `./laurelin-1.0.0.jar:./log4j-api-2.13.0.jar:./log4j-core-2.13.0.jar`
- Select "Include MemoryIntensive options"
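
For record keeping, the options above can also be collected in one place. The following is a minimal sketch, not part of the original notebook: `JARS`, `SPARK_OPTIONS`, and `as_conf_args` are hypothetical names, and the "Include MemoryIntensive options" bundle is not represented because it is selected in the connection dialog. The sketch assumes a Linux driver, where Java classpath entries are separated by `:` while `spark.jars` takes a comma-separated list.

```python
# Hypothetical names throughout; the values are copied from the list above.
JARS = ["laurelin-1.0.0.jar", "log4j-api-2.13.0.jar", "log4j-core-2.13.0.jar"]

SPARK_OPTIONS = {
    "spark.dynamicAllocation.maxExecutors": "100",
    "spark.driver.memory": "6g",
    # spark.jars expects a comma-separated list of jar files
    "spark.jars": ",".join(JARS),
    "spark.kubernetes.container.image":
        "gitlab-registry.cern.ch/db/spark-service/docker-registry/swan:laurelin",
    "spark.kubernetes.container.image.pullPolicy": "Always",
    # a Java classpath on Linux is colon-separated (assumption: Linux driver)
    "spark.driver.extraClassPath": ":".join("./" + jar for jar in JARS),
}


def as_conf_args(options):
    """Render the options as a flat list of `--conf key=value` arguments."""
    args = []
    for key, value in sorted(options.items()):
        args.extend(["--conf", f"{key}={value}"])
    return args


for arg in as_conf_args(SPARK_OPTIONS):
    print(arg)
```

Keeping the jar names in a single list makes it harder for `spark.jars` and `spark.driver.extraClassPath` to drift apart when a version changes.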

After running this notebook, the output is written to an `/hdfs` directory that you have permission to write to.
For example, I wrote to my user directory on the `analytix` cluster (`/hdfs/analytix.cern.ch/dntaylor`).
After the conversion, you can use the `analytix` cluster to run your Spark jobs on the Parquet files,
which is faster than running over the ROOT files on the `k8s` cluster.

Before initializing the Spark instance, run the following cell to download the required jars.

In [1]:
# spark.jars.packages doesn't work with Spark 2.4 on Kubernetes
!wget -N https://repo1.maven.org/maven2/edu/vanderbilt/accre/laurelin/1.0.0/laurelin-1.0.0.jar && \
wget -N https://repo1.maven.org/maven2/org/apache/logging/log4j/log4j-api/2.13.0/log4j-api-2.13.0.jar && \
wget -N https://repo1.maven.org/maven2/org/apache/logging/log4j/log4j-core/2.13.0/log4j-core-2.13.0.jar

--2020-02-26 18:04:55--  https://repo1.maven.org/maven2/edu/vanderbilt/accre/laurelin/1.0.0/laurelin-1.0.0.jar
Resolving repo1.maven.org (repo1.maven.org)... 151.101.112.209
Connecting to repo1.maven.org (repo1.maven.org)|151.101.112.209|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 1708408 (1.6M) [application/java-archive]
Server file no newer than local file ‘laurelin-1.0.0.jar’ -- not retrieving.

--2020-02-26 18:04:55--  https://repo1.maven.org/maven2/org/apache/logging/log4j/log4j-api/2.13.0/log4j-api-2.13.0.jar
Resolving repo1.maven.org (repo1.maven.org)... 151.101.112.209
Connecting to repo1.maven.org (repo1.maven.org)|151.101.112.209|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 286261 (280K) [application/java-archive]
Server file no newer than local file ‘log4j-api-2.13.0.jar’ -- not retrieving.

--2020-02-26 18:04:55--  https://repo1.maven.org/maven2/org/apache/logging/log4j/log4j-core/2.13.0/log4j-core-2.13.0.jar
Resolv

In [1]:
import os
import glob

In [3]:
baseDir_Run2017_UL = '/eos/cms/store/group/phys_muon/TagAndProbe/ULRereco/2017/102X'
fnamesMap = {
    'Z': {
        'Run2017_UL': {
            'Run2017B': [f for f in glob.glob(os.path.join(baseDir_Run2017_UL, 'Run2017B/tnpZ*.root')) if 'hadd' not in f],
            'Run2017C': [f for f in glob.glob(os.path.join(baseDir_Run2017_UL, 'Run2017C/tnpZ*.root')) if 'hadd' not in f],
            'Run2017D': [f for f in glob.glob(os.path.join(baseDir_Run2017_UL, 'Run2017D/tnpZ*.root')) if 'hadd' not in f],
            'Run2017E': [f for f in glob.glob(os.path.join(baseDir_Run2017_UL, 'Run2017E_99Percent/tnpZ*.root')) if 'hadd' not in f],
            'Run2017F': [f for f in glob.glob(os.path.join(baseDir_Run2017_UL, 'Run2017F_99Percent/tnpZ*.root')) if 'hadd' not in f],
            'DY_madgraph': [f for f in glob.glob(os.path.join(baseDir_Run2017_UL, 'DY_M50_pdfwgt/tnpZ*.root')) if 'hadd' not in f],
        },
    },
    'JPsi': {
    },
}
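
The per-sub-era lists above all follow the same pattern: glob a directory and drop any file whose path contains `hadd`. A minimal sketch of that pattern as a helper is shown here; `list_tnp_files`, `build_era_map`, and `SUBDIRS_RUN2017_UL` are hypothetical names that do not appear in the original notebook, and the directory names are copied from the cell above. Unlike the cell above, the sketch sorts the result so the file order is reproducible.

```python
import glob
import os


def list_tnp_files(base_dir, sub_dir, pattern="tnpZ*.root"):
    """Return the ROOT files in base_dir/sub_dir, skipping hadd-merged files."""
    paths = glob.glob(os.path.join(base_dir, sub_dir, pattern))
    # same filter as the notebook: drop any path containing 'hadd'
    return sorted(p for p in paths if "hadd" not in p)


def build_era_map(base_dir, subdirs):
    """Map each sub-era label to its list of input files."""
    return {label: list_tnp_files(base_dir, sub_dir)
            for label, sub_dir in subdirs.items()}


# sub-era label -> directory name under the base directory
SUBDIRS_RUN2017_UL = {
    "Run2017B": "Run2017B",
    "Run2017C": "Run2017C",
    "Run2017D": "Run2017D",
    "Run2017E": "Run2017E_99Percent",
    "Run2017F": "Run2017F_99Percent",
    "DY_madgraph": "DY_M50_pdfwgt",
}
```

With this helper, the `'Run2017_UL'` entry could be written as `build_era_map(baseDir_Run2017_UL, SUBDIRS_RUN2017_UL)`.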

In [7]:
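def convert(resonance, era, subEra):
    """Convert the ROOT files of one resonance/era/subEra into a Parquet dataset."""
    # prefix each EOS path so that the files are read over the xrootd protocol
    fnames = ['root://eosuser'+f for f in fnamesMap.get(resonance,{}).get(era,{}).get(subEra,[])]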

    outDir = os.path.join('parquet',resonance,era,subEra)
    outname = os.path.join(outDir,'tnp.parquet')

    treename = 'tpTree/fitter_tree'
    
    # process 1000 files at a time
    # this is about the limit that can be handled when writing
    batchsize = 1000
    new = True
    while fnames:
        current = fnames[:batchsize]
        fnames = fnames[batchsize:]
        
        rootfiles = spark.read.format("root").option('tree', treename).load(current)
        # Optionally merge the input files: coalescing would turn the 8-32 MB inputs
        # into Parquet files of at most 1 GB (the Parquet recommendation), see
        # https://parquet.apache.org/documentation/latest/
        # .coalesce(int(len(current)/32)) \
        # This is too slow for now; maybe try again later.
        if new:
            rootfiles.write.parquet(outname)
            new = False
        else:
            rootfiles.write.mode('append').parquet(outname)
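
The batching logic in `convert` can be checked without a Spark session. The following is a minimal sketch under that assumption: `batches` and `write_plan` are hypothetical helpers, not part of the notebook. The plan pairs each batch with the write mode `convert` effectively uses: the first write relies on Spark's default mode (`errorifexists`, which fails if the output path already exists) and every later batch uses `append`.

```python
def batches(items, size):
    """Yield consecutive slices of `items` with at most `size` elements each."""
    if size < 1:
        raise ValueError("size must be positive")
    for start in range(0, len(items), size):
        yield items[start:start + size]


def write_plan(fnames, size):
    """Pair each batch with the write mode used for it.

    The first batch gets 'errorifexists' (the DataFrameWriter default),
    all following batches get 'append'.
    """
    return [("errorifexists" if index == 0 else "append", batch)
            for index, batch in enumerate(batches(fnames, size))]


# Example with made-up file names: 2500 inputs in batches of 1000
plan = write_plan([f"file_{i}.root" for i in range(2500)], 1000)
for mode, batch in plan:
    print(mode, len(batch))
```

Because the first write does not overwrite, rerunning a conversion requires removing the existing output directory first.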
    

In [10]:
resonance = 'Z'
era = 'Run2017_UL'
subEra = 'Run2017F'
convert(resonance, era, subEra)
# to convert every subEra of this era instead of a single one, uncomment the loop below
subEras = fnamesMap.get(resonance,{}).get(era,{}).keys()
#for subEra in subEras:
#    print('converting',resonance,era,subEra)
#    convert(resonance, era, subEra)