In [8]:
!pip3 install kfp

Requirement already up-to-date: kfp in ./.local/lib/python3.6/site-packages (0.4.0)


In [1]:
import kfp
import kubernetes


In [2]:
# SparkApplication custom resource (sparkoperator.k8s.io/v1beta2) describing the
# Scala job that computes the basis vectors. The pipeline hands this dict to a
# ResourceOp, which creates it in the cluster.
container_manifest = {
    "apiVersion": "sparkoperator.k8s.io/v1beta2",
    "kind": "SparkApplication",
    "metadata": {
        # Kubernetes object names must be lowercase alphanumerics, '-' or '.';
        # spaces and capitals are rejected by the API server.
        "name": "calculate-basis-vectors",
        "namespace": "kubeflow",
    },
    "spec": {
        "type": "Scala",
        "mode": "cluster",
        "image": "docker.io/rawkintrevo/covid-basis-vectors:0.1.0",
        "imagePullPolicy": "Always",
        "mainClass": "org.rawkintrevo.covid.App",
        # Path of the jar inside the image -- see the Dockerfile.
        "mainApplicationFile": "local:///covid-0.1-jar-with-dependencies.jar",
        "sparkVersion": "2.4.5",
        "restartPolicy": {
            "type": "Never",
        },
        # Host directory /data exposed to the pods under the volume name "datapvc".
        "volumes": [
            {
                "name": "datapvc",
                "hostPath": {
                    "path": "/data",
                    "type": "Directory",
                },
            },
        ],
        "driver": {
            "cores": 1,
            "coreLimit": "1200m",
            "memory": "512m",
            "labels": {
                "version": "2.4.5",
            },
            # NOTE(review): this service account name depends on how the Spark
            # operator was installed -- confirm it exists in the target namespace.
            "serviceAccount": "spark-operatoroperator-sa",
            "volumeMounts": [
                {
                    "name": "datapvc",
                    "mountPath": "/data",
                },
            ],
        },
        "executor": {
            "cores": 1,
            "instances": 2,
            "memory": "512m",
            # Labels and volume mounts are per-pod settings; they previously sat
            # at the spec level (outside "executor"), so the executors were not
            # configured to mount /data.
            "labels": {
                "version": "2.4.5",
            },
            "volumeMounts": [
                {
                    "name": "datapvc",
                    "mountPath": "/data",
                },
            ],
        },
    },
}

In [6]:


@kfp.dsl.pipeline(
    name="Covid DICOM Pipe v2",
    description="Create Basis Vectors for Lung Images"
)
def covid_dicom_pipeline():
    """Pipeline definition: request a volume, prepare DICOMs, run the Spark job.

    Steps, in order:
      1. A VolumeOp requests a 10Gi PVC named "newpvc".
      2. A ContainerOp runs ``/run.sh`` in the covid-prep-dicom image with that
         volume mounted at ``/mnt/data``.
      3. A ResourceOp creates the SparkApplication described by the
         module-level ``container_manifest`` once step 2 has finished.
    """
    vop = kfp.dsl.VolumeOp(
        name="Requisition PVC",
        resource_name="newpvc",
        size="10Gi",
        modes=kfp.dsl.VOLUME_MODE_RWO
    )
    # Step 1 really should be downloading data locally...
    step1 = kfp.dsl.ContainerOp(
        name="Convert DICOMs to vectors",
        image="rawkintrevo/covid-prep-dicom:0.7.12.4",
        command=["/run.sh"],
        pvolumes={"/mnt/data": vop.volume}
    )

    # NOTE(review): step1 writes to the PVC at /mnt/data, while the Spark manifest
    # mounts a hostPath at /data -- confirm both refer to the same storage.
    rop = kfp.dsl.ResourceOp(
        name="Calculate Basis Vectors _Apache Spark-Mahout_",
        k8s_resource=container_manifest,
        action="create",
        success_condition="status.applicationState.state == COMPLETED",
        # Without a failure condition the step would keep waiting for COMPLETED
        # when the Spark application ends up FAILED; fail the step instead.
        failure_condition="status.applicationState.state == FAILED"
    ).after(step1)



In [7]:
# Compile the pipeline function into a package file on disk.
kfp.compiler.Compiler().compile(
    pipeline_func=covid_dicom_pipeline,
    package_path="dicom-pipeline-2.zip",
)

# Connect to the Kubeflow Pipelines API with the default client configuration.
client = kfp.Client()

# Create the experiment, then submit the compiled package as a run inside it.
my_experiment = client.create_experiment(name="my-experiments")
my_run = client.run_pipeline(
    experiment_id=my_experiment.id,
    job_name="my-run1",
    pipeline_package_path="dicom-pipeline-2.zip",
)

In [126]:

# #tag::step2[]
#     step2 = dsl.ContainerOp(
#         name="prepdata",
#         image="rawkintrevo/py-fmri-prep:0.2",
#         command=["python", "/pipelines/component/src/program.py"],
#         arguments=["/data/synthetic.nii.gz", "/data/s.csv"],
#         pvolumes={"/data": step1.pvolume}
#     )
# #end::step2[]
    
# #tag::step3[]
#     rop = dsl.ResourceOp(
#         name="spark-scala-mahout-fmri",
#         k8s_resource=container_manifest,
#         action="create",
#         success_condition="status.applicationState.state == COMPLETED"
#     ).after(step2)
# #end::step3[]
