# ServiceX Example

In [49]:
import requests
from minio import Minio
import tempfile
import pyarrow.parquet as pq
import pyarrow as pa
import awkward
from confluent_kafka import Consumer, KafkaException



## Submit the Transform Request
We will create a REST request that specifies a DID along with a list of columns we want extracted.

We also tell ServiceX that we want the resulting columns to be published to a Kafka broker, where they can be read back from a topic named after the request ID.

In [32]:
    # Base URL of the ServiceX REST API used by this cell and the status check.
    servicex_endpoint = "http://localhost:5000/servicex"

    # Submit the transformation: the DID to read, the columns to extract, the
    # transformer image to run, and Kafka (with its broker address) as the
    # destination for the results.
    response = requests.post(servicex_endpoint+"/transformation", json={
        "did": "mc15_13TeV:mc15_13TeV.361106.PowhegPythia8EvtGen_AZNLOCTEQ6L1_Zee.merge.DAOD_STDM3.e3601_s2576_s2132_r6630_r6264_p2363_tid05630052_00",
        "columns": "Electrons.pt(), Electrons.eta(), Electrons.phi(), Electrons.e(), Muons.pt(), Muons.eta(), Muons.phi(), Muons.e()",
        "image": "sslhep/servicex-transformer:latest",
        "result-destination": "kafka",
        "kafka":{
            "broker": "servicex-kafka-1.slateci.net:19092"
        },
        "chunk-size": 500,
        "workers": 1
    })

    # Stop here with the HTTP status if the request was rejected, instead of
    # failing later with a KeyError on the missing "request_id" field.
    response.raise_for_status()

    # Parse the reply once and reuse it for both the display and the lookup.
    submission = response.json()
    print(submission)

    # The request ID identifies this transformation; it is used to build the
    # status URL here and as the Kafka topic name when reading the results.
    request_id = submission["request_id"]
    status_endpoint = servicex_endpoint+"/transformation/{}/status".format(request_id)


{'request_id': '7a68f3d5-30d0-44b3-94bf-1d25f7988449'}


## Wait for the Transform to Complete

In [33]:
# Fetch a single snapshot of the transformation's progress. This does not
# poll: re-run the cell to refresh the numbers.
status_response = requests.get(status_endpoint)

# Report an HTTP failure directly rather than as a KeyError on the
# 'files-processed' / 'files-remaining' lookups below.
status_response.raise_for_status()

status = status_response.json()
print("We have processed {} files there are {} remainng".format(status['files-processed'], status['files-remaining']))


We have processed 0 files there are 1 remainng


In [None]:
# Consumer configuration: the broker the transform request was told to publish
# to, a consumer group name, and an offset-reset policy of 'smallest' so that
# a group with no stored position starts from the earliest available message.
conf = {'bootstrap.servers': "servicex-kafka-1.slateci.net:19092",
        'group.id': "foo2",
        'default.topic.config': {'auto.offset.reset': 'smallest'}}

c = Consumer(conf)
timeout = 10.0 # Need a long timeout to allow for partition assignment

try:
    # Show the broker's metadata for the topic named after the request ID.
    # This raises KeyError if the topic is not listed.
    print(c.list_topics().topics[request_id])

    c.subscribe([request_id])
    while True:
        msg = c.poll(timeout=timeout)
        if msg is None:
            # Nothing arrived within the timeout: treat the topic as drained.
            break
        if msg.error():
            raise KafkaException(msg.error())

        # Proper message
        print('%% %s [%d] at offset %d with key %s:\n' %
                         (msg.topic(), msg.partition(), msg.offset(),
                          str(msg.key())))

        # Each message payload is read as an Arrow IPC stream; convert every
        # record batch in it to awkward arrays and display them.
        reader = pa.ipc.open_stream(msg.value())
        for batch in reader:
            arrays = awkward.fromarrow(batch)
            print(arrays)
finally:
    # Always close the consumer, including on errors, so its connection and
    # group membership are not leaked by the notebook kernel.
    c.close()

