In [31]:
pip install luigi



In [32]:
import luigi
import pandas as pd
luigi.__version__

data = pd.read_csv("marketing_data.csv")

data


Unnamed: 0,CustomerID,Genre,Age,Annual_Income_(k$),Spending_Score
0,1,Male,19,15,39
1,2,Male,21,15,81
2,3,Female,20,16,6
3,4,Female,23,16,77
4,5,Female,31,17,40
...,...,...,...,...,...
195,196,Female,35,120,79
196,197,Female,45,126,28
197,198,Male,32,126,74
198,199,Male,32,137,18


In [33]:

data.shape

(200, 5)

In [34]:

# %
data.info

In [35]:
#membuat pipeline untuk extract data

class ExtractData(luigi.Task):

  def requires(self):
    pass

  def run(self):
    #read data
    marketing_data = pd.read_csv("marketing_data.csv")

    marketing_data.to_csv(self.output().path, index=False)

  def output(self):
    return luigi.LocalTarget("/content/extract_data.csv")

#/Users/rindangcahyaning/Documents/Bootcamp/PacmannDE


In [36]:
luigi.build([ExtractData()], local_scheduler = True)

DEBUG: Checking if ExtractData() is complete
DEBUG:luigi-interface:Checking if ExtractData() is complete
INFO: Informed scheduler that task   ExtractData__99914b932b   has status   PENDING
INFO:luigi-interface:Informed scheduler that task   ExtractData__99914b932b   has status   PENDING
INFO: Done scheduling tasks
INFO:luigi-interface:Done scheduling tasks
INFO: Running Worker with 1 processes
INFO:luigi-interface:Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG:luigi-interface:Asking scheduler for work...
DEBUG: Pending tasks: 1
DEBUG:luigi-interface:Pending tasks: 1
INFO: [pid 301] Worker Worker(salt=5473844753, workers=1, host=9e64d2dfa607, username=root, pid=301) running   ExtractData()
INFO:luigi-interface:[pid 301] Worker Worker(salt=5473844753, workers=1, host=9e64d2dfa607, username=root, pid=301) running   ExtractData()
INFO: [pid 301] Worker Worker(salt=5473844753, workers=1, host=9e64d2dfa607, username=root, pid=301) done      ExtractData()
INFO:luigi

True

In [21]:
#transform phase

#rename columns

RENAME_COLS = {

        "CustomerID" : "customer_id",
        "Genre" : "gender",
        "Age" : "age",
        "Annual_Income_(k$)" : "annual_income",
        "Spending_Score" : "spending_score"

}

In [22]:
data = data.rename(columns = RENAME_COLS)

data.head()

Unnamed: 0,customer_id,gender,age,annual_income,spending_score
0,1,Male,19,15,39
1,2,Male,21,15,81
2,3,Female,20,16,6
3,4,Female,23,16,77
4,5,Female,31,17,40


In [23]:
#filter data terhadap kolom spending score

data = data[data["spending_score"] >= 50 ]

data.head()

Unnamed: 0,customer_id,gender,age,annual_income,spending_score
1,2,Male,21,15,81
3,4,Female,23,16,77
5,6,Female,22,17,76
7,8,Female,23,18,94
9,10,Female,30,19,72


In [37]:
#setelah berhasil maka selanjutnya kita masukkan ke dalam class luigi transform

class TransformData(luigi.Task):

  def requires(self):
    return ExtractData()

  def run(self):
    #read data from previous process

    extract_data = pd.read_csv(self.input().path)

    #initialize dictionary fro re-name column

    RENAME_COLS = {

        "CustomerID" : "customer_id",
        "Genre" : "gender",
        "Age" : "age",
        "Annual_Income_(k$)" : "annual_income",
        "Spending_Score" : "spending_score"

    }

    extract_data = extract_data.rename(columns = RENAME_COLS)

    #filter data

    extract_data = extract_data[extract_data["spending_score"] >= 50 ]

    extract_data.to_csv(self.output().path, index=False)

  def output(self):
    return luigi.LocalTarget("/content/transforms_data.csv")



In [25]:
luigi.build([ExtractData()], local_scheduler = True)

DEBUG: Checking if ExtractData() is complete
DEBUG:luigi-interface:Checking if ExtractData() is complete
INFO: Informed scheduler that task   ExtractData__99914b932b   has status   DONE
INFO:luigi-interface:Informed scheduler that task   ExtractData__99914b932b   has status   DONE
INFO: Done scheduling tasks
INFO:luigi-interface:Done scheduling tasks
INFO: Running Worker with 1 processes
INFO:luigi-interface:Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG:luigi-interface:Asking scheduler for work...
DEBUG: Done
DEBUG:luigi-interface:Done
DEBUG: There are no more tasks to run at this time
DEBUG:luigi-interface:There are no more tasks to run at this time
INFO: Worker Worker(salt=3576230994, workers=1, host=9e64d2dfa607, username=root, pid=301) was stopped. Shutting down Keep-Alive thread
INFO:luigi-interface:Worker Worker(salt=3576230994, workers=1, host=9e64d2dfa607, username=root, pid=301) was stopped. Shutting down Keep-Alive thread
INFO: 
===== Luigi Executi

True

In [38]:
pip install sqlalchemy




In [55]:
#membuat helper function untuk koneksi antara pandas dengan postgresql

from sqlalchemy import create_engine

def postgres_engine():

  engine = create_engine("postgresql://postgres@localhost:5432/pacmann")


  return engine

engine = postgres_engine()

In [56]:
try:
    engine = postgres_engine()
    # Debug: Print engine to confirm it's created correctly
    print("Engine created successfully:", engine)
except Exception as e:
    print("Error creating engine:", e)

Engine created successfully: Engine(postgresql://postgres@localhost:5432/pacmann)


In [57]:
#class load data

class LoadData(luigi.Task):

  def requires(self):
    return TransformData()

  def run(self):

    #read data from transformed data
    transforms_data = pd.read_csv(self.input().path)

    #create engine
    engine = postgres_engine()

    #insert to database
    transforms_data.to_sql(name = "mall_customer",
                            con = engine,
                            if_exists = "append",
                            index = False)


  def output(self):
    pass



In [54]:
luigi.build([ExtractData(),
             TransformData(),
             LoadData()], local_scheduler = True)

DEBUG: Checking if ExtractData() is complete
DEBUG:luigi-interface:Checking if ExtractData() is complete
INFO: Informed scheduler that task   ExtractData__99914b932b   has status   DONE
INFO:luigi-interface:Informed scheduler that task   ExtractData__99914b932b   has status   DONE
DEBUG: Checking if TransformData() is complete
DEBUG:luigi-interface:Checking if TransformData() is complete
INFO: Informed scheduler that task   TransformData__99914b932b   has status   DONE
INFO:luigi-interface:Informed scheduler that task   TransformData__99914b932b   has status   DONE
DEBUG: Checking if LoadData() is complete
DEBUG:luigi-interface:Checking if LoadData() is complete
  is_complete = task.complete()
DEBUG: Checking if TransformData() is complete
DEBUG:luigi-interface:Checking if TransformData() is complete
INFO: Informed scheduler that task   LoadData__99914b932b   has status   PENDING
INFO:luigi-interface:Informed scheduler that task   LoadData__99914b932b   has status   PENDING
INFO: Infor

False

In [50]:
print(engine)


Engine(postgresql://postgres@localhost:5432/pacmann)
