# Simple End-to-End Data Engineering

Materi ini adalah bagian dari Free Live Course oleh **Sekolah Data Engineering by PACMANN**

#### Background

- Anda adalah data engineer di perusahaan importir handphone yang melakukan manajemen invoice pemesanan dengan sistem informasi jadul berbasis web.

- Tim Business Intelligence mengalami kesulitan untuk menganalisis performa perusahaan karena pengambilan data secara manual.

- Anda akan membuat data pipeline end-to-end untuk menyelesaikan masalah tersebut.

### Day 1: Data Pipeline & Web Scraping
#### Goal
- Ingest semua invoice beserta detailnya menggunakan web scraping
- Transform format data sesuai desain yang diberikan
- Store data ke dalam SQLite

# Ingesting Data

In [None]:
import requests
from bs4 import BeautifulSoup as bs

## Get invoice detail

In [None]:
def get_invoice_detail(invoice_id, timeout=30):
    """Scrape the line items listed on a single invoice's detail page.

    Args:
        invoice_id: Identifier interpolated into the detail-page URL.
        timeout: Seconds handed to ``requests.get`` as its timeout.

    Returns:
        A list with one dict per ``<tr>`` of the page's ``<tbody>``. Each
        dict holds ``brand`` and ``type`` (cell text), ``price`` (int parsed
        from the text before the first ``.``) and ``quantity`` (cell text,
        left unconverted).

    Raises:
        requests.RequestException: If the request fails, times out, or the
            server answers with an HTTP error status.
        AttributeError: If the page has no ``<tbody>`` or a row lacks one of
            the expected cells.
    """
    url = f'https://invoice-scraping.demo.pacmann.ai/invoice/{invoice_id}'
    # Without a timeout a stalled connection would hang the whole crawl.
    response = requests.get(url, timeout=timeout)
    # Fail loudly on 4xx/5xx instead of trying to parse an error page.
    response.raise_for_status()
    # Naming the parser keeps results independent of which optional parsers
    # happen to be installed and silences bs4's guessed-parser warning.
    soup = bs(response.text, 'html.parser')

    result = []
    for row in soup.find('tbody').find_all('tr'):
        brand = row.find('td', class_='brand').getText()
        type_ = row.find('td', class_='type').getText()
        price = row.find('td', class_='price').getText()
        quantity = row.find('td', class_='quantity').getText()
        result.append({
            'brand': brand,
            'type': type_,
            # Keep only the part before the first '.' of the price text.
            'price': int(price.split('.')[0]),
            'quantity': quantity,
        })
    return result

## Get invoice overview 

In [None]:
def get_invoice_overview(row):
    """Build the full record for one row of the invoice listing table.

    Args:
        row: A parsed ``<tr>`` element containing an ``<a>`` link to the
            invoice and ``<td>`` cells tagged with the classes read below.

    Returns:
        A dict with the invoice id (int), the link's ``href``, the text of
        the date/origin/seller/area cells, the total price (int parsed from
        the text before the first ``.``) and, under ``detail``, the list
        returned by ``get_invoice_detail`` for this invoice.
    """
    anchor = row.find('a')
    raw_id = anchor.getText()
    detail_url = anchor['href']

    # The overview cells carry a CSS class equal to the output key.
    columns = (
        'invoice_date',
        'country_of_origin',
        'seller',
        'distribution_area',
        'total_price',
    )
    cells = {
        column: row.find('td', class_=column).getText()
        for column in columns
    }

    # The detail page is fetched before any numeric conversion happens.
    line_items = get_invoice_detail(raw_id)

    return {
        'invoice_id': int(raw_id),
        'invoice_detail_url': detail_url,
        'invoice_date': cells['invoice_date'],
        'country_of_origin': cells['country_of_origin'],
        'seller': cells['seller'],
        'distribution_area': cells['distribution_area'],
        'total_price': int(cells['total_price'].split('.')[0]),
        'detail': line_items,
    }

## Extract all invoices on a page

In [None]:
def get_all_invoice(url, timeout=30):
    """Scrape every invoice listed on one page of the invoice index.

    Args:
        url: Address of a listing page whose ``<tbody>`` has one ``<tr>``
            per invoice.
        timeout: Seconds handed to ``requests.get`` as its timeout.

    Returns:
        A list of the dicts produced by ``get_invoice_overview``, one per
        table row, in document order.

    Raises:
        requests.RequestException: If the request fails, times out, or the
            server answers with an HTTP error status.
        AttributeError: If the page has no ``<tbody>`` element.
    """
    # Without a timeout a stalled connection would hang the whole crawl.
    response = requests.get(url, timeout=timeout)
    # Fail loudly on 4xx/5xx instead of trying to parse an error page.
    response.raise_for_status()
    # Naming the parser keeps results independent of which optional parsers
    # happen to be installed and silences bs4's guessed-parser warning.
    soup = bs(response.text, 'html.parser')
    rows = soup.find('tbody').find_all('tr')
    return [get_invoice_overview(row) for row in rows]

## Extract all pages

In [2]:
# Ids fed to the "nextpage" endpoint: the first ten multiples of 39.
# NOTE(review): 39 is presumably the number of invoices per page - confirm.
step = 39
last_ids = [step * page for page in range(1, 11)]
last_ids

[39, 78, 117, 156, 195, 234, 273, 312, 351, 390]

In [None]:
# Crawl the landing page first, then each "nextpage" URL in last_ids order,
# printing every address before it is fetched so progress is visible.
url = 'https://invoice-scraping.demo.pacmann.ai/'
page_urls = [url]
page_urls += [
    f'https://invoice-scraping.demo.pacmann.ai/nextpage/{last_id}'
    for last_id in last_ids
]

invoice_data = []
for page_url in page_urls:
    print(page_url)
    invoice_data.extend(get_all_invoice(page_url))

# Storing Data

In [None]:
# Connect to the SQLite database file "invoice.db" (relative path) and grab
# a cursor. `con` and `cur` are reused by the cells below for every
# statement and commit.
import sqlite3
con = sqlite3.connect("invoice.db")
cur = con.cursor()

## create invoice master table

In [None]:
# Create the invoice master table (one row per scraped invoice).
# IF NOT EXISTS lets this cell be re-run without raising an error.
# NOTE: the table declares no primary key, so nothing in the schema stops
# the same invoice_id from being inserted more than once.
cur.execute("""
CREATE TABLE IF NOT EXISTS invoice(
    invoice_id			int,
    invoice_date			text,
    country_of_origin	text,
    seller				text,
    distribution_area 	text,
    total_price			int
);
""")

### transform data into specified structure

In [None]:
# Project each scraped invoice onto a tuple whose field order matches the
# column order of the `invoice` table; the nested `detail` list is left out.
master_fields = (
    'invoice_id',
    'invoice_date',
    'country_of_origin',
    'seller',
    'distribution_area',
    'total_price',
)
invoice_master_data = [
    tuple(record[field] for field in master_fields)
    for record in invoice_data
]

In [None]:
# Bulk-insert the master rows through a parameterized statement, then
# commit so the rows are written to the database.
insert_invoice_sql = "INSERT INTO invoice VALUES(?, ?, ?, ?, ?, ?)"
cur.executemany(insert_invoice_sql, invoice_master_data)
con.commit()

In [None]:
# Sanity check: print the two invoices with the highest total price.
top_invoices_sql = """
    SELECT *
    FROM invoice
    ORDER BY total_price DESC
    LIMIT 2
"""
cur.execute(top_invoices_sql)
data = cur.fetchall()
for record in data:
    print(record)

## create table invoice_product

In [None]:
# Create the invoice_product table: one row per product line of an invoice,
# carrying the invoice_id it belongs to.
# IF NOT EXISTS lets this cell be re-run without raising an error.
cur.execute("""
CREATE TABLE IF NOT EXISTS invoice_product (
	invoice_id 	int,
	brand 		text,
	type 		text,
	price 		int,
	quantity 	int
);
""")

### transform data into specified structure

In [None]:
# Flatten every invoice's line items into tuples ordered like the columns of
# the `invoice_product` table: (invoice_id, brand, type, price, quantity).
invoice_detail_data = [
    (
        row['invoice_id'],
        d['brand'],
        d['type'],
        d['price'],
        # quantity is scraped as cell text; the column is declared int.
        int(d['quantity']),
    )
    for row in invoice_data
    for d in row['detail']
]
# Preview the first flattened row.
invoice_detail_data[0]

In [None]:
# Bulk-insert the product rows through a parameterized statement, then
# commit so the rows are written to the database.
insert_product_sql = "INSERT INTO invoice_product VALUES(?, ?, ?, ?, ?)"
cur.executemany(insert_product_sql, invoice_detail_data)
con.commit()

In [None]:
# Sanity check: print the two brands with the largest total quantity.
top_brands_sql = """
    SELECT brand, sum(quantity) total_quantity
    FROM invoice_product
    GROUP BY brand
    ORDER BY total_quantity DESC
    LIMIT 2
"""
cur.execute(top_brands_sql)
data = cur.fetchall()
for record in data:
    print(record)

*All rights reserved*