<a href="https://colab.research.google.com/github/kokejohh/PMU_Module_2/blob/main/data_pipeline_example.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Data pipeline example

In [None]:
import numpy as np
import pandas as pd
import os
import shutil

In [None]:
!git clone https://github.com/Gal1leo2/Data-Pipeline-in-Python---Module-2---Workshop-- data_pipeline

Cloning into 'data_pipeline'...
remote: Enumerating objects: 9, done.[K
remote: Counting objects: 100% (9/9), done.[K
remote: Compressing objects: 100% (7/7), done.[K
remote: Total 9 (delta 0), reused 9 (delta 0), pack-reused 0 (from 0)[K
Receiving objects: 100% (9/9), 2.67 MiB | 17.73 MiB/s, done.


In [5]:
#ingest
def ingest(src_dir='./data_pipeline/data/raw',
           dest_dir='./data_pipeline/data/staged',
           files=('products.csv', 'transactions.csv', 'users.csv')):
  """Copy the raw input files into the staging directory.

  Args:
    src_dir: Directory that holds the raw files.
    dest_dir: Staging directory; created (with parents) if it is missing.
    files: Names of the files, relative to ``src_dir``, to copy.

  Raises:
    FileNotFoundError: If a listed file does not exist in ``src_dir``
      (propagated from ``shutil.copy``). Files copied before the failure
      are left in place.
  """
  os.makedirs(dest_dir, exist_ok=True)

  for file in files:
    # shutil.copy overwrites an existing destination, so re-running is safe.
    shutil.copy(os.path.join(src_dir, file), os.path.join(dest_dir, file))
    print(f'copy {file} successfully')

ingest()

copy products.csv successfully
copy transactions.csv successfully
copy users.csv successfully


In [38]:
#stage
def stage(staged_dir='./data_pipeline/data/staged'):
  """Clean the staged CSV files and write ``*_clean.csv`` copies beside them.

  Reads ``products.csv``, ``transactions.csv`` and ``users.csv`` from
  ``staged_dir``, converts the epoch-millisecond columns to datetimes,
  removes rows with a repeated primary key, and writes
  ``products_clean.csv``, ``transactions_clean.csv`` and ``users_clean.csv``
  back into the same directory.

  Args:
    staged_dir: Directory containing the staged input files; also receives
      the cleaned output files.
  """
  products = pd.read_csv(os.path.join(staged_dir, 'products.csv'))
  transactions = pd.read_csv(os.path.join(staged_dir, 'transactions.csv'))
  users = pd.read_csv(os.path.join(staged_dir, 'users.csv'))

  # Report the shapes as read, i.e. before any cleaning.
  for name, df in [('products', products), ('transactions', transactions), ('users', users)]:
    print(f'[STAGE]{name}:{df.shape[0]} rows, {df.shape[1]} columns')

  # Convert Type: both columns are interpreted as milliseconds since the epoch.
  transactions['transaction_date'] = pd.to_datetime(transactions['transaction_date'], unit='ms')
  users['birthdate'] = pd.to_datetime(users['birthdate'], unit='ms')

  # Drop Duplicate: keep the first row for each key. products must be unique
  # on product_id as well, otherwise the join on product_id in transform()
  # would repeat transactions and inflate the aggregated totals.
  products = products.drop_duplicates(subset=['product_id'])
  transactions = transactions.drop_duplicates(subset=['transaction_id'])
  users = users.drop_duplicates(subset=['user_id'])

  # Export
  products.to_csv(os.path.join(staged_dir, 'products_clean.csv'), index=False)
  transactions.to_csv(os.path.join(staged_dir, 'transactions_clean.csv'), index=False)
  users.to_csv(os.path.join(staged_dir, 'users_clean.csv'), index=False)

  print('stage successfully')
stage()

[STAGE]products:9000 rows, 4 columns
[STAGE]transactions:30000 rows, 5 columns
[STAGE]users:7000 rows, 5 columns
stage successfully


In [44]:
#transform
def transform(staged_dir='./data_pipeline/data/staged',
              output_dir='./data_pipeline/output'):
  """Join the cleaned tables and write a per-user spending summary.

  Reads ``products_clean.csv``, ``transactions_clean.csv`` and
  ``users_clean.csv`` from ``staged_dir``, joins them, aggregates per
  ``(user_id, name, city)`` and writes ``summary.csv`` into ``output_dir``.

  Output columns: ``user_id``, ``name``, ``city``, ``total_spent``,
  ``avg_spent``, ``transaction_count``, ``unique_categorie``.

  Args:
    staged_dir: Directory containing the ``*_clean.csv`` files.
    output_dir: Directory that receives ``summary.csv``; created if missing.
  """
  products = pd.read_csv(os.path.join(staged_dir, 'products_clean.csv'))
  transactions = pd.read_csv(os.path.join(staged_dir, 'transactions_clean.csv'))
  users = pd.read_csv(os.path.join(staged_dir, 'users_clean.csv'))

  # Join: merge() defaults to an inner join, so transactions that reference
  # an unknown user_id or product_id are left out of the summary.
  merged = transactions.merge(users, on='user_id').merge(products, on='product_id')

  # Aggregate: groupby() drops rows whose key columns contain NaN by default,
  # so users with a missing name or city do not appear in the result.
  # 'unique_categorie' (sic) is kept as spelled because it is the column name
  # written to summary.csv.
  summary = merged.groupby(['user_id', 'name', 'city']).agg(
      total_spent = ('amount', 'sum'),
      avg_spent = ('amount', 'mean'),
      transaction_count = ('transaction_id', 'count'),
      unique_categorie = ('category', 'nunique')
  ).reset_index()

  os.makedirs(output_dir, exist_ok=True)
  summary.to_csv(os.path.join(output_dir, 'summary.csv'), index=False)

  print('transform successfully')

transform()

transform successfully


In [45]:
#load

import sqlite3

# .csv -> .db

def load(csv_path='./data_pipeline/output/summary.csv',
         db_path='./data_pipeline/output/retail.db',
         table='summary'):
  """Load a CSV file into a table of a SQLite database.

  Args:
    csv_path: CSV file to read.
    db_path: SQLite database file; created by sqlite3 if it does not exist.
    table: Name of the destination table. An existing table with this name
      is replaced (``if_exists='replace'``).
  """
  df = pd.read_csv(csv_path)

  conn = sqlite3.connect(db_path)
  try:
    df.to_sql(table, conn, if_exists='replace', index=False)
    # Harmless if pandas has already committed the write.
    conn.commit()
  finally:
    # Always release the handle; "with conn:" would only manage the
    # transaction and leave the connection (and the file) open.
    conn.close()

  print('load successfully')

load()

load successfully
