# Luigi Classification Pipeline

To get started, we will build a small Luigi pipeline that classifies images as either *lemons* or *bananas*.

Write 3 tasks:

1. Check for daily data
1. Preprocess images (convert to grayscale, resize to (100, 100))
1. Classify the image and write the result to a JSON file

## Hints and Tricks for OpenCV

Read an image from disk:
```python
img = cv2.imread("path", cv2.IMREAD_COLOR)
```

Resize an image:
```python
img = cv2.resize(img, (X_SIZE,Y_SIZE))
```

Convert image to grayscale:
```python
img = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
```

Write an image to disk:
```python
cv2.imwrite("path", img)
```

Find circles to identify lemons:
```python
circles = cv2.HoughCircles(img, 
                           cv2.HOUGH_GRADIENT,
                           dp=2, 
                           minDist=15, 
                           param1=100, 
                           param2=70)
```

## Imports

In [3]:
import json
from datetime import date
import luigi
from luigi.parameter import DateParameter
from luigi import LocalTarget, Task, WrapperTask
from luigi.tools.range import RangeDailyBase
import cv2

## Task 1: Check for daily data

In [89]:
class CheckDailyData(Task):
    date = DateParameter(default=date.today())
    
    def output(self):
        return LocalTarget("exercise-dataset/daily/" + str(self.date.strftime("%m-%d-%Y")) + "/image.jpg")

In [90]:
luigi.build([CheckDailyData(date(2018,2,19))], local_scheduler=True, no_lock=True)

DEBUG: Checking if CheckDailyData(date=2018-02-19) is complete
INFO: Informed scheduler that task   CheckDailyData_2018_02_19_999079b9db   has status   DONE
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
INFO: Worker Worker(salt=425649058, workers=1, host=a9569b3a5016, username=root, pid=466) was stopped. Shutting down Keep-Alive thread
INFO: 
===== Luigi Execution Summary =====

Scheduled 1 tasks of which:
* 1 complete ones were encountered:
    - 1 CheckDailyData(date=2018-02-19)

Did not run any tasks
This progress looks :) because there were no failed tasks or missing dependencies

===== Luigi Execution Summary =====



True

## Task 2: Preprocess input image

In [125]:
class Preprocess(Task):
    date = date(2018,2,19)
    dailyImage = CheckDailyData(date)
    preprocessedImage = "exercise-dataset/daily/" + str(date.strftime("%m-%d-%Y")) + "/preprocessed.jpg"
    
    def requires(self):
        return self.dailyImage

    def output(self):
        return LocalTarget(self.preprocessedImage)

    def run(self):
        img = cv2.imread(self.input().path, cv2.IMREAD_COLOR)
        img = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
        img = cv2.resize(img, (100,100))
        cv2.imwrite(self.output().path, img)

In [126]:
luigi.build([Preprocess()], local_scheduler=True, no_lock=True)

DEBUG: Checking if Preprocess() is complete
DEBUG: Checking if CheckDailyData(date=2018-02-19) is complete
INFO: Informed scheduler that task   Preprocess__99914b932b   has status   PENDING
INFO: Informed scheduler that task   CheckDailyData_2018_02_19_999079b9db   has status   DONE
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 466] Worker Worker(salt=188124165, workers=1, host=a9569b3a5016, username=root, pid=466) running   Preprocess()
INFO: [pid 466] Worker Worker(salt=188124165, workers=1, host=a9569b3a5016, username=root, pid=466) done      Preprocess()
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task   Preprocess__99914b932b   has status   DONE
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
INFO: Worker Worker(salt=188124165, workers=1, host=a9569b3a5016, username=root, pid=466) was stopped. Shutti

True

## Task 3: Classify image

In [132]:
class Classify(Task):
    
    def requires(self):
        return Preprocess()
    
    def run(self):
        img = cv2.imread(self.input().path, cv2.IMREAD_COLOR)
        circles = cv2.HoughCircles(img, 
                           cv2.HOUGH_GRADIENT,
                           dp=2, 
                           minDist=15, 
                           param1=100, 
                           param2=70)
        if circles is None:
            fruit="banana"
        else:
            fruit="lemon"
            
        json.dump
    
    def output(self):
        return self.circles

## Run the pipeline

In [133]:
luigi.build([Classify()], local_scheduler=True, no_lock=True)

DEBUG: Checking if Classify() is complete
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/luigi/worker.py", line 401, in check_complete
    is_complete = task.complete()
  File "/usr/local/lib/python3.6/site-packages/luigi/task.py", line 565, in complete
    outputs = flatten(self.output())
  File "<ipython-input-132-f2f2d90058e0>", line 16, in output
    return self.circles
AttributeError: 'Classify' object has no attribute 'circles'

INFO: Informed scheduler that task   Classify__99914b932b   has status   UNKNOWN
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
INFO: Worker Worker(salt=094353003, workers=1, host=a9569b3a5016, username=root, pid=466) was stopped. Shutting down Keep-Alive thread
INFO: 
===== Luigi Execution Summary =====

Scheduled 1 tasks of which:
* 1 failed scheduling:
    - 1 Classify()

Did not run any tasks
This pro

False

## Daily jobs and backfillings 

Now we can classify a single image, identified by its save date. Luigi becomes even handier for backfilling: with the *RangeDailyBase* wrapper task we can run the pipeline we already built on all 3 images.

```python
RangeDailyBase(of=TASK, start=START_DATE, stop=END_DATE, days_back=ALLOWED_DAYS_INTO_PAST)
```

In [None]:
luigi.build([], local_scheduler=True, no_lock=True)