# Setting up sparkContext in pyspark

Web UI
- __http://10.64.22.215:8080/__


Start a pySpark session including third party packages (SCALA version: 2.11) which will pull from Maven repository:
- __spark-root_2.11:0.1.16__ 
- __histogrammar-sparksql_2.11:1.0.4__
- __histbook-1.2.1__

Declares external dependency to the Spark application, in case we are using:

- __XrootD-Connector__
- __XrootD-Connector Grid Certificate__

The default cores (defaults at 4) for every spark application is bottlenecked by __SPARK_MASTER_OPTS__ predefined in spark-env.sh. In order to configure spark application beyond the defaults cores, set __spark.cores.max__ in SparkContext before setting:
- __spark.executor.instances__
- __spark.executor.cores__
- __spark.executor.memory__

In the example below, a total of __10 cores__ are allocated to Spark with __5 executors__ (__2 cores per executor__) and __2GB RAM__ per executor.


In [1]:
import pyspark.sql
session = pyspark.sql.SparkSession.builder \
    .master('spark://10.64.22.215:7077') \
    .appName('Zpeak_Nanoaod-SPARK') \
    .config('spark.jars.packages','org.diana-hep:spark-root_2.11:0.1.16,org.diana-hep:histogrammar-sparksql_2.11:1.0.4') \
    .config('spark.driver.extraClassPath','/opt/hadoop/share/hadoop/common/lib/EOSfs.jar') \
    .config('spark.executor.extraClassPath','/opt/hadoop/share/hadoop/common/lib/EOSfs.jar') \
    .config('spark.sql.caseSensitive','true') \
    .config('spark.serializer','org.apache.spark.serializer.KryoSerializer') \
    .config("spark.cores.max", "10") \
    .config('spark.executor.instances', '5') \
    .config('spark.executor.cores','2') \
    .config('spark.executor.memory','2g') \
    .getOrCreate()
    
sqlContext = session
print 'Spark version: ',sqlContext.version
print 'SparkSQL sesssion created'

Spark version:  2.3.1
SparkSQL sesssion created


# Ingesting Nanoaod root file into Dataframe

Dataframe is a RDD (Resilient Distributed Dataset) commonly used as an abstraction in Big Data. A Dataframe is an API to the RDD.

Root files in NANOAOD format serving from remote CERN public EOS area were read via XrootD-Connector and instantiate in Dataframe. A list of root files (dataset and monte carlo background samples) is defined externally in __samples.py__

In [2]:
from pyspark.sql.functions import lit
from samples import *

DFList = [] 

for s in samples:
    print 'Loading {0} sample from EOS file'.format(s) 
    dsPath = "root://eospublic.cern.ch//eos/opstest/cmspd-bigdata/"+samples[s]['filename']    
    tempDF = sqlContext.read \
                .format("org.dianahep.sparkroot") \
                .option("tree", "Events") \
                .load(dsPath)\
                .withColumn("pseudoweight", lit(samples[s]['weight'])) \
                .withColumn("sample", lit(s))                
    DFList.append(tempDF)

Loading ZH sample from EOS file
Loading TT sample from EOS file
Loading WW sample from EOS file
Loading SingleMuon sample from EOS file
Loading ZZ sample from EOS file
Loading DYJetsToLL sample from EOS file
Loading WZ sample from EOS file


# Access DataFrame content

Return a list of columns in one of the DataFrame, a column corresponds to branche in ROOT TTree.

# Data reduction

Subsets of interesting attributes can be selected via 'select' operations on the DataFrames (equivalent to "pruning" steps in ROOT-based frameworks).

All datasets can be joined into a single DataFrame (e.g. collecting data from various samples).

In [3]:
# Define interesting attributes to be selected
columns = [
    ### MUON
    'Muon_pt',
    'Muon_eta',
    'Muon_phi',
    'Muon_mass',
    'Muon_charge',
    'Muon_mediumId',
    'Muon_softId',
    'Muon_tightId',
    'nMuon',
    ### SAMPLE
    'sample',
    ### Jet
    'nJet',
    'Jet_pt',
    'Jet_eta',
    'Jet_phi',
    'Jet_mass',
    'Jet_bReg',
    ### Weight
    'pseudoweight',
]

# Select columns from dataframe
DF = DFList[0].select(columns)

# Merge all dataset into a single dataframe
for df_ in DFList[1:]:
    DF = DF.union(df_.select(columns))
    
print 'Partition allocated for Dataframe:',DF.rdd.getNumPartitions(), 'partition'
print 'Partition allocated for Dataframe reported from executors (JVM):',sqlContext._jsc.sc().getExecutorMemoryStatus().size(), 'partition'
print 'Default number of partition (defaultParallelism) = ',sqlContext._jsc.sc().defaultParallelism()

Partition allocated for Dataframe: 7 partition
Partition allocated for Dataframe reported from executors (JVM): 6 partition
Default number of partition (defaultParallelism) =  10


In [4]:
DF.filter(DF['sample'] == 'DYJetsToLL')\
  .select('sample','nMuon','Muon_pt','Muon_eta','Muon_phi','Muon_charge')\
  .show(5)

+----------+-----+-----------+------------+------------+-----------+
|    sample|nMuon|    Muon_pt|    Muon_eta|    Muon_phi|Muon_charge|
+----------+-----+-----------+------------+------------+-----------+
|DYJetsToLL|    1| [34.75507]|[-1.3212891]|[-1.0375977]|        [1]|
|DYJetsToLL|    0|         []|          []|          []|         []|
|DYJetsToLL|    1|[3.3921983]|[-1.5285645]|[-0.2514038]|       [-1]|
|DYJetsToLL|    0|         []|          []|          []|         []|
|DYJetsToLL|    0|         []|          []|          []|         []|
+----------+-----+-----------+------------+------------+-----------+
only showing top 5 rows



# Create derivate quantities and structures - 1

User defined functions can be used for transformations evalueted row by row to compute derived quantity, such as invaraint mass of two physics objects involving multiple column.
The return value is added as a new column in the output DataFrame.

Dimuon candidate structure is created as an example.

In [5]:
from pyspark.sql.types import *

dimuonSchema = StructType([
    StructField("pass", BooleanType(),False),   # True if filled / False if default(empty) 
    #
    StructField("mass", FloatType(),False),     # Dimuon mass
    StructField("pt", FloatType(),False),       # Dimuon pt
    StructField("eta", FloatType(),False),      # Dimuon eta
    StructField("phi", FloatType(),False),      # Dimuon phi
    StructField("dPhi", FloatType(),False),     # DeltaPhi(mu1,mu2)
    StructField("dR", FloatType(),False),       # DeltaR(mu1,mu2)
    StructField("dEta", FloatType(),False),     # DeltaEta(mu1,mu2)
    #
    StructField("mu1_pt", FloatType(),False),   # leading mu pT 
    StructField("mu2_pt", FloatType(),False),   # sub-leading mu pT 
    StructField("mu1_eta", FloatType(),False),  # leading mu eta
    StructField("mu2_eta", FloatType(),False),  # sub-leading mu eta
    StructField("mu1_phi", FloatType(),False),  # leading mu phi
    StructField("mu2_phi", FloatType(),False),  # sub-leading mu phi
])

dimuonSchemav2="struct<pass: boolean \
                      ,mass: float \
                      ,pt: float \
                      ,eta: float \
                      ,phi: float \
                      ,dPhi: float \
                      ,dR: float \
                      ,dEta: float \
                      ,mu1_pt: float \
                      ,mu2_pt: float \
                      ,mu1_eta: float \
                      ,mu2_eta: float \
                      ,mu1_phi: float \
                      ,mu2_phi: float \
                      >"

dimuonSchemav3="array<float>"

# Create derivate quantities and structures - 2

And a generic function filling the candidate structure can be defined.

In [6]:
import function
from pyspark.sql.types import *
from pyspark.sql.functions import pandas_udf, PandasUDFType

@pandas_udf('double', PandasUDFType.SCALAR)
def dimuonCandidate(pt,eta,phi,mass,charge,mediumid,index):
#def dimuonCandidate(pt,eta,phi,mass,charge):
    #add new column with dimuon information
    # default class implementation   
    default_ = (0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0)
    #default_ = [0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0]
    
    #convert rdd to pdf
    #rows = (row_.asDict() for row_ in df_)
    #pdf = pd.DataFrame(rows)
    
    #if len(df) > 0:
    if len(pt) > 0:
        #pt = df['Muon_pt']
        #eta = df['Muon_eta']
        #phi = df['Muon_phi']
        #mass = df['Muon_mass']
        #charge = df['Muon_charge']
        #mediumid = df['Muon_mediumId']
        """
        Z->mm candidate from arbitrary muon selection:
        N(mu) >= 2
        pT > 30, 10
        abs(eta) < 2.4, 2.4
        mediumId muon
        opposite charge
        """
    
        if len(pt) < 2:
            return default_
    
        #Identify muon candidate
        leadingIdx = None
        trailingIdx = None
 
        for idx in range(len(pt)):
            if leadingIdx == None:
                if pt[idx] > 30 and abs(eta[idx]) < 2.4: #and mediumid[idx]:
                    leadingIdx = idx
            elif trailingIdx == None:
                if pt[idx] > 10 and abs(eta[idx]) < 2.4: #and mediumid[idx]:
                    trailingIdx = idx
            else:
                if pt[idx] > 10 and abs(eta[idx]) < 2.4: #and mediumid[idx]:
                    return default_

        if leadingIdx != None and trailingIdx != None and charge[leadingIdx] != charge[leadingIdx]:            
            # Candidate found
            #dimuon_   = (1.0,) + \
            dimuon_    = \
                        [invMass(pt[leadingIdx], pt[trailingIdx],
                        eta[leadingIdx], eta[trailingIdx],
                        phi[leadingIdx], phi[trailingIdx],
                        mass[leadingIdx], mass[trailingIdx]) + \
                        (pt[leadingIdx], pt[trailingIdx],
                         eta[leadingIdx], eta[trailingIdx],
                         phi[leadingIdx], phi[trailingIdx])]
    return dimuon_[index]

@pandas_udf('double', PandasUDFType.SCALAR)
def pandas_plus_one(m,j):
    return m+j

def Compute(df_):
    #@pandas_udf('double', PandasUDFType.SCALAR)
    #def pandas_plus_one(m,j):
    #    return m+j
    #@pandas_udf('double', PandasUDFType.SCALAR)
    #def pandas_plus_two(m,j,k):
    #    return m+j+k
    
    
    NEWDF=DF.withColumn("mu1_pt",dimuonCandidate(
                                col("Muon_pt"),
                                col("Muon_eta"),
                                col("Muon_phi"),
                                col("Muon_mass"),
                                col("Muon_charge"),
                                col("Muon_mediumId"),
                                lit(1)
                              ))
    return NEWDF
### ArrowNotImplementedError: Not implemented type for list in DataFrameBlock: bool: ARROWWWW

In [7]:
##SQL Querying
#selection="SELECT nJet,nMuon,Muon_pt from Events \
#WHERE nJet==1 AND nMuon>1 AND Muon_pt[0]>50"
'''
selection="\
    SELECT Muon_pt,Muon_eta from Events \
    WHERE nMuon>2 AND Muon_pt[0]>30 AND Muon_pt[1]>10 \
    AND Muon_eta[0] BETWEEN -2.4 AND 2.4 \
    AND Muon_eta[1] BETWEEN -2.4 AND 2.4 \
    AND Muon_mediumId[0] = '1' AND Muon_mediumId[1] = '1' \
"

DF.registerTempTable("Events")
sqlContext.sql("%s" %selection).show(5)
'''

'\nselection="    SELECT Muon_pt,Muon_eta from Events     WHERE nMuon>2 AND Muon_pt[0]>30 AND Muon_pt[1]>10     AND Muon_eta[0] BETWEEN -2.4 AND 2.4     AND Muon_eta[1] BETWEEN -2.4 AND 2.4     AND Muon_mediumId[0] = \'1\' AND Muon_mediumId[1] = \'1\' "\n\nDF.registerTempTable("Events")\nsqlContext.sql("%s" %selection).show(5)\n'

# Create derivate quantities and structures - 3

Finally, a dimuon candidate structure (an array of defined quantities) can be appended to the DataFrame as an additional column

In [8]:
#from pyspark.sql.functions import udf
from pyspark.sql.functions import col
#dimuonUDF = udf(dimuonCandidate, dimuonSchema)
#scalar 
#seriesDimuonUDF = pandas_udf(dimuonCandidate, returnType=dimuonSchema)
#func_udf = pandas_udf(dimuonCandidate, FloatType(),PandasUDFType.SCALAR)

test=Compute(DF)
#test.select("addedTHIS").show()
test.select("mu1_pt").show()

Py4JJavaError: An error occurred while calling o400.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 3.0 failed 4 times, most recent failure: Lost task 0.3 in stage 3.0 (TID 10, 10.64.22.217, executor 0): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/opt/spark-2.3.1-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 230, in main
    process()
  File "/opt/spark-2.3.1-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 225, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/opt/spark-2.3.1-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 150, in <lambda>
    func = lambda _, it: map(mapper, it)
  File "/opt/spark-2.3.1-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/serializers.py", line 280, in load_stream
    pdf = batch.to_pandas()
  File "table.pxi", line 727, in pyarrow.lib.RecordBatch.to_pandas
  File "table.pxi", line 1120, in pyarrow.lib.Table.to_pandas
  File "/usr/lib64/python2.7/site-packages/pyarrow/pandas_compat.py", line 569, in table_to_blockmanager
    categories)
  File "/usr/lib64/python2.7/site-packages/pyarrow/pandas_compat.py", line 733, in _table_to_blocks
    memory_pool, categories)
  File "table.pxi", line 809, in pyarrow.lib.table_to_blocks
  File "error.pxi", line 85, in pyarrow.lib.check_status
ArrowNotImplementedError: Not implemented type for list in DataFrameBlock: bool

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:298)
	at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.read(ArrowPythonRunner.scala:171)
	at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.read(ArrowPythonRunner.scala:121)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:252)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at org.apache.spark.sql.execution.python.ArrowEvalPythonExec$$anon$2.<init>(ArrowEvalPythonExec.scala:90)
	at org.apache.spark.sql.execution.python.ArrowEvalPythonExec.evaluate(ArrowEvalPythonExec.scala:88)
	at org.apache.spark.sql.execution.python.EvalPythonExec$$anonfun$doExecute$1.apply(EvalPythonExec.scala:131)
	at org.apache.spark.sql.execution.python.EvalPythonExec$$anonfun$doExecute$1.apply(EvalPythonExec.scala:93)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:800)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:800)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:105)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:109)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1602)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1590)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1589)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1589)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1823)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1772)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1761)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2034)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2055)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2074)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:363)
	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
	at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3273)
	at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2484)
	at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2484)
	at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3254)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3253)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:2484)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:2698)
	at org.apache.spark.sql.Dataset.showString(Dataset.scala:254)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/opt/spark-2.3.1-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 230, in main
    process()
  File "/opt/spark-2.3.1-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 225, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/opt/spark-2.3.1-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 150, in <lambda>
    func = lambda _, it: map(mapper, it)
  File "/opt/spark-2.3.1-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/serializers.py", line 280, in load_stream
    pdf = batch.to_pandas()
  File "table.pxi", line 727, in pyarrow.lib.RecordBatch.to_pandas
  File "table.pxi", line 1120, in pyarrow.lib.Table.to_pandas
  File "/usr/lib64/python2.7/site-packages/pyarrow/pandas_compat.py", line 569, in table_to_blockmanager
    categories)
  File "/usr/lib64/python2.7/site-packages/pyarrow/pandas_compat.py", line 733, in _table_to_blocks
    memory_pool, categories)
  File "table.pxi", line 809, in pyarrow.lib.table_to_blocks
  File "error.pxi", line 85, in pyarrow.lib.check_status
ArrowNotImplementedError: Not implemented type for list in DataFrameBlock: bool

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:298)
	at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.read(ArrowPythonRunner.scala:171)
	at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.read(ArrowPythonRunner.scala:121)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:252)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at org.apache.spark.sql.execution.python.ArrowEvalPythonExec$$anon$2.<init>(ArrowEvalPythonExec.scala:90)
	at org.apache.spark.sql.execution.python.ArrowEvalPythonExec.evaluate(ArrowEvalPythonExec.scala:88)
	at org.apache.spark.sql.execution.python.EvalPythonExec$$anonfun$doExecute$1.apply(EvalPythonExec.scala:131)
	at org.apache.spark.sql.execution.python.EvalPythonExec$$anonfun$doExecute$1.apply(EvalPythonExec.scala:93)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:800)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:800)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:105)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:109)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more


In [None]:
beta.show()

# Return statistical information about the data

Exploit pySparkSql functions to get statistical insights on the DataFrame

In [None]:
from pyspark.sql.functions import *

print 'Number of events, pre-selection level'

DF.groupBy("sample").count().show()

print 'Number of events, Dimuon invariant mass in [70-110] GeV'

DF.where( (col("Dimuon.mass") > 70) & (col("Dimuon.mass") < 110) ).groupBy("sample").count().show()

print 'Mean of Dimuon mass, evaluated in [70-110] GeV range'

DF.where( (col("Dimuon.mass") > 70) & (col("Dimuon.mass") < 110) ).groupBy('sample').mean('Dimuon.mass').show()

print 'Description of Dimuon mass features for SingleMuon dataset only, evaluated in [70-110] GeV range'

DF.where( (col("Dimuon.mass") > 70) & (col("Dimuon.mass") < 110) & (DF["sample"] == "SingleMuon") ).describe('Dimuon.mass').show()


# Plotting the Zpeak mass

Finally, the interesting variables are plotted using accumulator from Histogrammar package and the graphic object is handled by backend matplotlib.

In [None]:
'''
# Load libraries, and append histogrammar functionalities to dataframe

#%matplotlib inline
import matplotlib
matplotlib.use('Agg')
import matplotlib.pyplot as plt
import histogrammar as hg
import histogrammar.sparksql
import numpy as np

DF = DF.where( (col("Dimuon.mass") > 70) & (col("Dimuon.mass") < 110) )

hg.sparksql.addMethods(DF)

plots = hg.UntypedLabel(
    # 1d histograms
    LeadPt       = hg.Bin(50, 30, 180,   DF['Dimuon.mu1_pt'],hg.Sum(DF['pseudoweight'])),
    LeadPtEta    = hg.Bin(48, -2.4, 2.4, DF['Dimuon.mu1_eta'],hg.Sum(DF['pseudoweight'])),
    SubLeadPt    = hg.Bin(100, 0, 200,   DF['Dimuon.mu2_pt'],hg.Sum(DF['pseudoweight'])),
    SubLeadPtEta = hg.Bin(48, -2.4, 2.4, DF['Dimuon.mu2_eta'],hg.Sum(DF['pseudoweight'])),
    InvMass      = hg.Bin(80, 70, 110,   DF['Dimuon.mass'],hg.Sum(DF['pseudoweight'])),
    DeltaR       = hg.Bin(50, 0, 5,      DF['Dimuon.dPhi'],hg.Sum(DF['pseudoweight'])),
    DeltaPhi     = hg.Bin(64, -3.2, 3.2, DF['Dimuon.dR'],hg.Sum(DF['pseudoweight'])),
)

# Make a set of histograms, categorized per-sample
bulkHisto = hg.Categorize(quantity = DF['sample'], value = plots)

# Fill from spark
bulkHisto.fillsparksql(df=DF)
print 'Filling histogrammar done'
'''

In [None]:
'''
# variable for plotting
VARIABLE = 'InvMass'

fig = plt.figure(num=None, figsize=(8, 6), dpi=80, facecolor='w', edgecolor='k')

aHisto   = bulkHisto("SingleMuon")(VARIABLE)
nBins    = len(aHisto.values)
edges    = np.linspace(aHisto.low, aHisto.high, nBins + 1)
width    = (aHisto.high - aHisto.low) / nBins

plotVals = {}
for k in ['DYJetsToLL','ZZ','TT','WW','WZ']:
    #plotVals[k] = [x.toJson()['data']*0.19 for x in bulkHisto(k)(VARIABLE).values]
    plotVals[k] = [x.sum for x in bulkHisto(k)(VARIABLE).values]
    plt.bar(edges[:-1], plotVals[k], width=width, label=k, color=samples[k]['color'], edgecolor=samples[k]['color'], fill=True, log=True)

xdata   = np.linspace(aHisto.low+0.5*width, aHisto.high+0.5*width, nBins)    
#ydata   = [x.toJson()['data'] for x in bulkHisto('SingleMuon')(VARIABLE).values]
ydata   = [x.sum for x in bulkHisto('SingleMuon')(VARIABLE).values]
yerror  = [x**0.5 for x in ydata]

plt.errorbar(xdata, ydata, fmt='ko', label="Data", xerr=width/2, yerr=yerror, ecolor='black')

plt.xlabel('Dimuon invariant mass m($\mu\mu$) (GeV)')
plt.ylabel('Events / 0.5 GeV')
#plt.yscale('log')
plt.legend(loc='upper right', fontsize='x-large', )
'''

In [None]:
from histbook import * 
#import vega
from vega import VegaLite as canvas

PROC={}
PROCLIST=["SingleMuon","WZ","WW","TT","ZZ","DYJetsToLL"]
#cut='cut("((col("Dimuon.mass") > 70) & (col("Dimuon.mass") < 110))")'

#Need a flat dataframe
dimuon=DF.where( (col("Dimuon.mass") > 70) & (col("Dimuon.mass") < 110) ).select('Dimuon.mu1_pt','Dimuon.mu2_pt','Dimuon.mu1_eta','Dimuon.mu2_eta','Dimuon.mass',
                 'Dimuon.dPhi','Dimuon.dR','pseudoweight','sample').persist()

#Group filling
#Hists = Book(
#    Hist(bin("mu1_pt", 50, 30, 180,), weight="pseudoweight"),
#    Hist(bin("mu1_eta", 48, -2.4, 2.4), weight="pseudoweight"),
#    Hist(bin("mu2_pt", 100, 0, 200), weight="pseudoweight"),
#    Hist(bin("mu2_eta", 48, -2.4, 2.4), weight="pseudoweight"),
#    Hist(bin("mass", 80, 70, 110), weight="pseudoweight"),
#    Hist(bin("dPhi", 50, 0, 5), weight="pseudoweight"),
#    Hist(bin("dR", 64, -3.2, 3.2), weight="pseudoweight")
#)

#fill for each processes
for proc in PROCLIST:
    PROC[proc]=Book(
        Hist(bin("mu1_pt", 50, 30, 180,), weight="pseudoweight"),
        Hist(bin("mu1_eta", 48, -2.4, 2.4), weight="pseudoweight"),
        Hist(bin("mu2_pt", 100, 0, 200), weight="pseudoweight"),
        Hist(bin("mu2_eta", 48, -2.4, 2.4), weight="pseudoweight"),
        Hist(bin("mass", 80, 70, 110), weight="pseudoweight"),
        Hist(bin("dPhi", 50, 0, 5), weight="pseudoweight"),
        Hist(bin("dR", 64, -3.2, 3.2), weight="pseudoweight")
    )
    PROC[proc].fill(dimuon.where(col('sample') == '%s' %proc ))

print "Fill Done"

In [None]:
h1=Hist.group(
    ZZ=PROC['ZZ'].allvalues()[4],
    TT=PROC['TT'].allvalues()[4],
    WW=PROC['WW'].allvalues()[4],
    WZ=PROC['WZ'].allvalues()[4],
    DYJetsToLL=PROC['DYJetsToLL'].allvalues()[4],
    #SingleMuon=PROC['SingleMuon'].allvalues()[4]
)

h1.stack("source",order=PROCLIST).area("mass").to(canvas)