<a href="https://colab.research.google.com/github/wfsilva-uea/luigi/blob/master/luigi.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install luigi

In [87]:
!npx degit wfsilva-uea/luigi/northwind.sqlite northwind.sqlite -f

[K[?25hnpx: installed 1 in 0.904s
[36m> cloned [1mwfsilva-uea/luigi[22m#[1mmaster[22m to northwind.sqlite[39m


In [25]:
import luigi
import sqlite3
import pandas as pd
import sqlalchemy as sqla

from datetime import datetime

In [14]:
# northwind connection
conn = sqlite3.connect('northwind.sqlite')

In [None]:
# dw connection
conn_dw = sqlite3.connect('dw.sqlite')

In [63]:
class CustomerTask(luigi.Task):
  
  def output(self):
    today_str = datetime.today().strftime('%d%m%Y')
    return luigi.LocalTarget('customers_%s.log' % today_str)

  def run(self):
    with self.output().open('w') as target:

      target.write('Iniciando processo ETL\n')

      sql = """
        select 
          customer_id,
          contact_name,
          contact_title,
          company_name
        from customers
        order by contact_name  
      """

      target.write('Lendo tabela de clientes\n')

      df = pd.read_sql_query(sql, conn)
      
      target.write('%d registros foram encontrados\n' % len(df))

      target.write('Gerando tabela clientes no DW\n')
      
      # storing in CSV
      df.to_csv('clientes.csv', index=False, encoding='utf-8')

      target.write('Finalizando processo ETL\n')

In [64]:
class ProductsTask(luigi.Task):
  
  def requires(self):
    return CustomerTask()

  def output(self):
    today_str = datetime.today().strftime('%d%m%Y')
    return luigi.LocalTarget('products_%s.log' % today_str)

  def run(self):
    with self.output().open('w') as target:

      target.write('Iniciando processo ETL\n')

      sql = """
        select 
          product_id,
          product_name,
          unit_price
        from products
        order by product_name
      """

      target.write('Lendo tabela de produtos\n')

      df = pd.read_sql_query(sql, conn)
      
      target.write('%d registros foram encontrados\n' % len(df))

      target.write('Gerando tabela produtos no DW\n')
      
      # storing in DW
      df.to_sql('produtos', index=False, con=conn_dw, if_exists='replace')

      target.write('Finalizando processo ETL\n')

In [88]:
class OrderTask(luigi.Task):
  
  def requires(self):
    return ProductsTask()

  def output(self):
    today_str = datetime.today().strftime('%d%m%Y')
    return luigi.LocalTarget('orders_%s.log' % today_str)

  def run(self):
    with self.output().open('w') as target:

      target.write('Iniciando processo ETL\n')

      sql = """
        select
          c.customer_id,
          p.product_id,
          sum((od.unit_price * od.quantity * (1 - od.discount) / 100) * 100) as subtotal
        from products p 
        inner join order_details od on p.product_id = od.product_id
        inner join orders o on od.order_id = o.order_id
        inner join customers c on o.customer_id = c.customer_id
        where o.order_date between '1996-01-01' and '1996-12-31'
        group by c.customer_id, p.product_id
      """

      # orders
      target.write('Lendo tabela de pedidos\n')

      df_orders = pd.read_sql_query(sql, conn)
      
      target.write('%d registros foram encontrados\n' % len(df_orders))

      
      # customers
      target.write('Lendo arquivo CSV de clientes\n')

      df_customers = pd.read_csv('clientes.csv', encoding='utf-8')

      target.write('%d registros foram encontrados\n' % len(df_customers))


      # products
      target.write('Lendo tabela de produtos\n')

      df_products = pd.read_sql_query('select * from produtos', conn_dw)

      target.write('%d registros foram encontrados\n' % len(df_products))


      # merging orders and customers
      target.write('Realizando merge das tabela de clientes, produtos e pedidos\n')

      df_merged = pd.merge(df_orders, df_customers, left_on=['customer_id'], right_on=['customer_id'])    
      df_merged = pd.merge(df_merged, df_products, left_on=['product_id'], right_on=['product_id'])
      
      # storing in DW
      target.write('Gerando tabela pedidos no DW\n')
      
      df_merged.to_sql('pedidos', index=False, con=conn_dw, if_exists='replace')

      target.write('Finalizando processo ETL\n')

In [73]:
# run luigi as local scheduler
luigi.build([OrderTask()], local_scheduler=True)

DEBUG: Checking if OrderTask() is complete
DEBUG: Checking if ProductsTask() is complete
INFO: Informed scheduler that task   OrderTask__99914b932b   has status   PENDING
INFO: Informed scheduler that task   ProductsTask__99914b932b   has status   DONE
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 55] Worker Worker(salt=996105064, workers=1, host=6090e67df8b8, username=root, pid=55) running   OrderTask()
INFO: [pid 55] Worker Worker(salt=996105064, workers=1, host=6090e67df8b8, username=root, pid=55) done      OrderTask()
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task   OrderTask__99914b932b   has status   DONE
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
INFO: Worker Worker(salt=996105064, workers=1, host=6090e67df8b8, username=root, pid=55) was stopped. Shutting down Keep-Alive thread
INFO: 
===== 

True

In [85]:
sql = """
  select
    p.contact_name as cliente_name,
    sum(p.subtotal)
  from pedidos p
  group by p.contact_name
  order by subtotal desc
"""
df = pd.read_sql_query(sql, conn_dw)
df

Unnamed: 0,cliente_name,sum(p.subtotal)
0,Georg Pipps,10033.280040
1,Horst Kloss,11950.080001
2,Karl Jablonski,2938.199998
3,Carlos Hernández,3242.820006
4,Anabela Domingues,1296.000062
...,...,...
62,Fran Wilson,712.000007
63,Maurizio Moroni,80.100001
64,Francisco Chang,100.799999
65,Jytte Petersen,352.599998
