# Dynamic MapReduce jobs from Python

Data in different Cassandra partitions can be retrieved, changed or inserted efficiently in parallel. This can be done with a `DynMapReduce` job. In the job we specify a filter on the type whose instances serve as the Cassandra partition keys, and a batch size (`batchSize`, the number of key instances handled by each map task). The key instances selected by the filter are split into batches, and the `map` operation runs on each batch in parallel. The `reduce` operation then combines, for each output key emitted by the mappers, the values produced by all batches.

To build the job, we implement these two functions explicitly, convert them into C3 `Lambda`s with `c3.Lambda.fromPython`, and pass them to the `DynMapReduce` job spec.
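The map/reduce contract can be illustrated without a C3 cluster. The sketch below is a hypothetical, sequential stand-in for `DynMapReduce`: `local_map_reduce`, `toy_mapper` and `toy_reducer` are not C3 APIs. It assumes that the key instances are split into batches of a fixed size, that each mapper call returns a dict of output key to value, and that the reducer is called once per output key with the values from all batches.

```python
from collections import defaultdict

# Hypothetical, local stand-in for DynMapReduce (not a C3 API): runs sequentially.
def local_map_reduce(objs, mapper, reducer, batch_size):
    """Split objs into batches of batch_size, map each batch, then reduce per output key."""
    intermediate = defaultdict(list)  # output key -> one value per batch
    for start in range(0, len(objs), batch_size):
        batch = objs[start:start + batch_size]
        for key, value in mapper(batch).items():
            intermediate[key].append(value)
    # Reduce: one call per output key, receiving the values emitted by every batch
    return {key: reducer(key, values) for key, values in intermediate.items()}

# Toy mapper/reducer with the same shape as the C3 ones below:
# the mapper emits one list per batch under "result", the reducer concatenates them.
def toy_mapper(batch):
    return {"result": [f"record-for-{key}" for key in batch]}

def toy_reducer(key, inter_values):
    values = []
    for iv in inter_values:
        values.extend(iv)
    return values

keys = [f"gstp-{i}" for i in range(5)]
print(local_map_reduce(keys, toy_mapper, toy_reducer, batch_size=2))
```

With five keys and a batch size of 2, the mapper runs on three batches, and the single `"result"` key is reduced into one flat list of five records.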

## AOD Data

At the moment, we are testing upserting and retrieving data from the `aod-3hourly` container. The relevant types are:

- `GeoSurfaceTimePoint`: the type whose instances serve as the Cassandra partition keys
- `Simulation3HourlyAODOutput`: the Cassandra-hosted type that holds the AOD data for each `GeoSurfaceTimePoint` instance

### The mapper, reducer, and job

In [1]:
def cassandra_DynamicMapReduceFetch(gstpFilter, batchSize=200):
    """
    Constructs and starts a DynMapReduce job that fetches Cassandra data in parallel.
    The Cassandra partitions to read are selected by gstpFilter, a filter on
    GeoSurfaceTimePoint; batchSize is the number of GeoSurfaceTimePoint instances
    handled by each map batch.
    Returns the job instance, whose status can be checked until it completes.
    """

    def cassandra_mapper(batch, objs, job):
        """
        Fetches the Simulation3HourlyAODOutput records whose geoSurfaceTimePoint is one
        of the GeoSurfaceTimePoint instances in this batch (objs), and emits them under
        the key "result".
        """
        gstp_ids = [obj.id for obj in objs]
        gstp_filter = c3.Filter().intersects("geoSurfaceTimePoint.id", gstp_ids).value
        # Treat an empty fetch result as an empty list
        fetched = c3.Simulation3HourlyAODOutput.fetch({"filter": gstp_filter}).objs or []
        return {"result": fetched}

    def cassandra_reducer(key, interValues, job):
        """
        Concatenates the lists emitted by every batch for the given key into one list.
        """
        values = []
        for iv in interValues:
            values.extend(iv)
        return values

    # Transform mapper and reducer into C3 Lambdas
    map_lambda = c3.Lambda.fromPython(cassandra_mapper)
    reduce_lambda = c3.Lambda.fromPython(cassandra_reducer)

    # Schedule the job
    job = c3.DynMapReduce.startFromSpec(c3.DynMapReduceSpec(
        targetType="GeoSurfaceTimePoint",
        filter=gstpFilter,
        mapLambda=map_lambda,
        reduceLambda=reduce_lambda,
        batchSize=batchSize
        )
    )

    return job

### Retrieving a Pandas dataframe with the results

The status of the job returned by the previous function can be checked with `job.status()`. Once the job has completed, the function below collects its results into a Pandas dataframe and adds `latitude`, `longitude` and `time` columns parsed from each record `id`.

In [2]:
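def get_df_from_job_results(job):
    """
    Returns a DataFrame with the job results, plus latitude, longitude and time columns
    parsed from each record id ("<lat>_<lon>_<timestamp>#<suffix>").
    Returns None if the job has not completed yet.
    """
    import pandas as pd
    import datetime as dt

    if job.status().status != "completed":
        print("Job did not complete")
        return None

    df = pd.DataFrame(job.results()['result'])
    lats, lons, tims = [], [], []
    for record_id in df.get("id", []):
        # e.g. "-53.125_87.188_2017-07-21T03:20:00#IK3P"
        lat, lon, timestamp = record_id.split("_", 2)
        # Drop the "#XXXX" suffix before parsing, whatever its length
        timestamp = timestamp.split("#")[0]
        lats.append(float(lat))
        lons.append(float(lon))
        tims.append(dt.datetime.strptime(timestamp, "%Y-%m-%dT%H:%M:%S"))
    df["latitude"] = lats
    df["longitude"] = lons
    df["time"] = tims
    return df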
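The id parsing can also be factored into a small helper and checked on its own. This is a minimal sketch, assuming every id follows the `<lat>_<lon>_<timestamp>#<suffix>` layout seen in the sample output at the end of this notebook; `parse_record_id` is a hypothetical name. Splitting on `#` instead of slicing off a fixed number of characters keeps the parsing correct if the suffix length ever changes.

```python
import datetime as dt

def parse_record_id(record_id):
    """
    Split an id such as "-53.125_87.188_2017-07-21T03:20:00#IK3P" into
    (latitude, longitude, time). Assumes the "<lat>_<lon>_<timestamp>#<suffix>" layout.
    """
    # maxsplit=2 keeps the timestamp in one piece
    lat, lon, timestamp = record_id.split("_", 2)
    timestamp = timestamp.split("#", 1)[0]  # drop the "#XXXX" suffix
    return float(lat), float(lon), dt.datetime.strptime(timestamp, "%Y-%m-%dT%H:%M:%S")

print(parse_record_id("-53.125_87.188_2017-07-21T03:20:00#IK3P"))
```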

### Example

We only need to build a filter that selects the `GeoSurfaceTimePoint`s we want; here, all time points of a single day.

In [3]:
# Filter
target_date = "2017-07-21"
start_time = target_date + "T00:00:00.000"
stop_time = target_date + "T23:59:59.999"
gstpFilter = c3.Filter().ge("time", start_time).and_().le("time", stop_time)
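The start and stop strings above can be generated for any day, or for a range of days, instead of being typed by hand. A minimal sketch, assuming the same `YYYY-MM-DDTHH:MM:SS.mmm` format the filter already uses; `time_window` is a hypothetical helper, not part of C3.

```python
import datetime as dt

def time_window(first_day, last_day=None):
    """Return (start, stop) strings covering first_day 00:00:00.000 to last_day 23:59:59.999."""
    last_day = last_day or first_day
    start = dt.datetime.combine(dt.date.fromisoformat(first_day), dt.time.min)
    stop = dt.datetime.combine(dt.date.fromisoformat(last_day), dt.time.max)
    # timespec="milliseconds" truncates 23:59:59.999999 to 23:59:59.999
    return (start.isoformat(timespec="milliseconds"),
            stop.isoformat(timespec="milliseconds"))

start_time, stop_time = time_window("2017-07-21")
print(start_time, stop_time)
```

The returned pair can be passed to the same `ge`/`le` filter as in the cell above; passing two dates widens the window to several days.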

In [4]:
# Schedule the DynMapReduce job
job = cassandra_DynamicMapReduceFetch(gstpFilter)

In [10]:
# check status
job.status()

c3.MapReduceStatus(
 started=datetime.datetime(2022, 7, 2, 13, 48, 59, tzinfo=datetime.timezone.utc),
 startedby='babreu@illinois.edu',
 status='running',
 step='map')

In [8]:
# retrieve job results
df = get_df_from_job_results(job)

Job did not complete
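The "Job did not complete" output above comes from requesting the results while the job was still running. Rather than re-running cells by hand, the status can be polled until the job finishes. A minimal sketch, assuming only the `job.status().status` field used above; `wait_for_job` is a hypothetical helper, and the `"failed"` status name is an assumption about C3, not confirmed by this notebook.

```python
import time

def wait_for_job(job, poll_seconds=30, timeout_seconds=3600):
    """
    Poll job.status().status until it is "completed". Raises RuntimeError if the
    job reports "failed" (assumed status name) and TimeoutError once the timeout expires.
    """
    deadline = time.monotonic() + timeout_seconds
    while True:
        status = job.status().status
        if status == "completed":
            return status
        if status == "failed":  # assumption: C3's name for a failed job
            raise RuntimeError("DynMapReduce job failed")
        if time.monotonic() >= deadline:
            raise TimeoutError(f"job still '{status}' after {timeout_seconds} s")
        time.sleep(poll_seconds)
```

Usage: call `wait_for_job(job)` and then `df = get_df_from_job_results(job)`.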


In [31]:
df.sample(10)

|  | type | id | version | simulationSample | dust | solubleAitkenMode | solubleAccumulationMode | solubleCoarseMode | insolubleAitkenMode | geoSurfaceTimePoint | latitude | longitude | time |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 61238 | TestAODData | -53.125_87.188_2017-07-21T03:20:00#IK3P | 1 | {'type': 'SimulationSample', 'id': 'EnsNo_1_Si... | 6.7e-05 | 0.000629 | 0.060495 | 0.067233 | 0.00043 | {'type': 'TestGSTP', 'id': '-53.125_87.188_201... | -53.125 | 87.188 | 2017-07-21 03:20:00 |
| 91604 | TestAODData | -76.875_167.812_2017-07-21T00:20:00#HMK2 | 1 | {'type': 'SimulationSample', 'id': 'EnsNo_1_Si... | 3.8e-05 | 0.000511 | 0.021375 | 0.027053 | 0.000169 | {'type': 'TestGSTP', 'id': '-76.875_167.812_20... | -76.875 | 167.812 | 2017-07-21 00:20:00 |
| 205912 | TestAODData | 8.125_-132.188_2017-07-21T12:20:00#LFD2 | 1 | {'type': 'SimulationSample', 'id': 'EnsNo_1_Si... | 5.7e-05 | 0.005943 | 0.103223 | 0.040653 | 0.000178 | {'type': 'TestGSTP', 'id': '8.125_-132.188_201... | 8.125 | -132.188 | 2017-07-21 12:20:00 |
| 80608 | TestAODData | -68.125_102.188_2017-07-21T12:20:00#KZTX | 1 | {'type': 'SimulationSample', 'id': 'EnsNo_1_Si... | 4.7e-05 | 0.000437 | 0.02717 | 0.015961 | 0.000193 | {'type': 'TestGSTP', 'id': '-68.125_102.188_20... | -68.125 | 102.188 | 2017-07-21 12:20:00 |
| 177095 | TestAODData | 58.125_-173.438_2017-07-21T09:20:00#KRTE | 1 | {'type': 'SimulationSample', 'id': 'EnsNo_1_Si... | 0.001552 | 0.009892 | 0.17205 | 0.003912 | 0.021591 | {'type': 'TestGSTP', 'id': '58.125_-173.438_20... | 58.125 | -173.438 | 2017-07-21 09:20:00 |
| 167385 | TestAODData | 5.625_62.812_2017-07-21T03:20:00#ITVC | 1 | {'type': 'SimulationSample', 'id': 'EnsNo_1_Si... | 0.024306 | 0.022334 | 0.242162 | 0.085541 | 0.000161 | {'type': 'TestGSTP', 'id': '5.625_62.812_2017-... | 5.625 | 62.812 | 2017-07-21 03:20:00 |
| 150460 | TestAODData | 38.125_83.438_2017-07-21T06:20:00#JTRN | 1 | {'type': 'SimulationSample', 'id': 'EnsNo_1_Si... | 0.01409 | 0.0275 | 0.114356 | 0.000987 | 0.000531 | {'type': 'TestGSTP', 'id': '38.125_83.438_2017... | 38.125 | 83.438 | 2017-07-21 06:20:00 |
| 205187 | TestAODData | 79.375_111.562_2017-07-21T18:20:00#NFX4 | 1 | {'type': 'SimulationSample', 'id': 'EnsNo_1_Si... | 0.000494 | 0.008276 | 0.165359 | 0.001643 | 0.004234 | {'type': 'TestGSTP', 'id': '79.375_111.562_201... | 79.375 | 111.562 | 2017-07-21 18:20:00 |
| 81115 | TestAODData | -68.125_27.188_2017-07-21T03:20:00#IISP | 1 | {'type': 'SimulationSample', 'id': 'EnsNo_1_Si... | 9.7e-05 | 0.000583 | 0.024309 | 0.021947 | 0.000326 | {'type': 'TestGSTP', 'id': '-68.125_27.188_201... | -68.125 | 27.188 | 2017-07-21 03:20:00 |
| 1680 | TestAODData | -1.875_-145.312_2017-07-21T06:20:00#JNTT | 1 | {'type': 'SimulationSample', 'id': 'EnsNo_1_Si... | 7.2e-05 | 0.002027 | 0.040829 | 0.097889 | 0.000136 | {'type': 'TestGSTP', 'id': '-1.875_-145.312_20... | -1.875 | -145.312 | 2017-07-21 06:20:00 |
