giờ chúng ta sẽ thực hiện fake dữ liệu cho lĩnh vực logistics
bao gồm 2 bảng: 1. phiếu gửi: chứa thông tin đơn, 2. hành trình: chứa thông tin vận chuyển, hành trình của đơn
# Nghiệp vụ: 
- mỗi phiếu gửi sẽ xúât hiện duy nhất một lần trong bảng phiếu gửi với mã phiếu gửi là duy nhất
- mỗi phiếu gửi sẽ có một hoặc nhiều hành trình trong bảng hành trình
## Thông tin các bảng:
### Phiếu gửi:
- MA_PHIEUGUI: string, duy nhất
- MA_BUUCUC_GOC: string
- MA_BUUCUC_PHAT: string
- NGAY_GUI_BP: datetime
- TONG_CUOC_VND: float
- NGAY_NHAP_MAY: datetime
- TRONG_LUONG: float
- THU_HO: float
- MA_LOAI_HANGHOA: string

### Hành trình:
 - MA_VANDON: string
 - TRANG_THAI: int
 - THOI_GIAN: datetime
 - MA_BUUCUC: string
 - NHANVIENPHAT: string

# Yêu cầu
- một số điều kiện: 
  - mã phiếu gửi: prefix 'VTP' và còn lại là 12 characters of random string
  - với các mã bưu cục, lấy từ file csv được cung cấp sẵn, lấy mã random từ file đó. (số random này sẽ hợp lý nếu các bưu cục tại HN và HCM mỗi cái chiếm tầm 15% trên tổng số bill)
  - ngày gửi bưu phẩm: có 2 lựa chọn. một là random ngẫu nhiên một ngày bất kỳ, họăc gen ngày từ thời điểm hiện tại
  - ngày nhập máy = ngày gửi bưu phẩm (cho 1% số lượng bill có ngày nhập máy > ngày gửi bưu phẩm (tối đa 3 phút))
  - mã loại hàng hóa: TH hoặc HH, nếu là TH -> trọng lượng: 10 -> 100g; nếu là HH: > 100g, max 10kg, nhiều nhất vẫn loanh quanh tầm 500g - 1kg
  - tổng cước, thu hộ: random từ 10k -> 1000k

- với hành trình:
  - trạng thái sẽ bắt đầu từ 10x: khách đem gửi hàng, 200: nhập máy, 300: tạo bàn giao, 400 nhận bàn giao, 500: bắt đầu giao, 501: phát thành công, 503: đơn hủy, 504: đơn hoàn
    - nếu ngày nhập máy = ngày gửi bưu phẩm thì đơn sẽ bắt đầu hành trình từ 200, ngược lại sẽ bắt đầu từ 100
    - phần lớn hành trình sẽ trong giai đoạn 300, 400
    - nếu trạng thái là 503, 504 thì có thể sẽ phải tiếp tục hành trình lại từ 300, 400
    - nếu trạng thái 503, 504 thì không được lên lại 503, 504 một lần nữa
  - thời gian dựa vào ngày gửi bưu phẩm, thời gian sẽ random từ 1 -> 10 ngày sau ngày gửi bưu phẩm, thời gian này cũng cần phải hợp lý với trạng thái của đơn
  - mã nhân viên phát: random từ 100000 -> 999999

In [1]:
import random
import string
import pandas as pd
from datetime import datetime, timedelta
import os

def random_order_id():
    def random_string(length):
        return ''.join(random.choices(string.ascii_uppercase + string.digits, k=length))
    return 'VTP' + random_string(12)

def random_weight(loai_hanghoa):
    if loai_hanghoa == 'TH':
        return random.uniform(10, 100)
    return random.uniform(100, 10000)

def random_fee():
    return random.uniform(10000, 1000000)

def random_loai_hanghoa():
    return random.choice(['TH', 'HH'])

def random_nhanvienphat():
    return random.randint(100000, 999999)

In [21]:
"""
Tạo dữ liệu giả lập
num_orders: số lượng vận đơn cần tạo
mode: 'recent' - tạo dữ liệu trong khoảng ngày gần đây, 'current' - tạo dữ liệu phiếu gửi trong ngày hiện tại
"""
def fake_data(num_orders=1000, mode='current'):
    # Đọc dữ liệu bưu cục
    dm_buucuc = pd.read_csv('data/mapping_cn.csv')
    dm_buucuc = dm_buucuc[dm_buucuc['ma_quanhuyen'].notnull()]
    random_buucuc = lambda: random.choice(dm_buucuc['ma_buucuc'].values)
    # Danh sách trạng thái
    transition_rules = {
        100: [200],
        200: [300],
        300: [400],
        400: [500],
        500: [501, 503, 504],
        503: [300],
        504: [300],
    }
    orders = []
    orderS_journey = []
    # Tạo session id dưới dạng timestamp gmt
    sessionId = datetime.now().timestamp()*1000
    for _ in range(num_orders):
        order_id = random_order_id()  # Mã vận đơn
        if mode == 'recent': # trả về các ngày gần đây
            start_date_parse = datetime.now() - timedelta(days=random.randint(0,15), hours=random.randint(0,12), minutes=random.randint(0,30)) # Ngày gửi bưu phẩm
        elif mode == 'current': # trả về random thời điểm trong ngày
            now = datetime.now()
            start_date_parse = datetime(now.year, now.month, now.day, 0, 0, 0)  + timedelta(hours=random.randint(0,23), minutes=random.randint(0,59), seconds=random.randint(0,59)) # Ngày gửi bưu phẩm
        start_date = start_date_parse.timestamp()*1000
        ma_buucuc_goc = random_buucuc()  # Bưu cục gốc
        ma_buucuc_phat = random_buucuc()  # Bưu cục phát
        tong_cuoc_vnd = random_fee()  # Tổng cước
        ngay_nhap_may_parse = start_date_parse + timedelta(minutes=random.randint(0, 3), seconds=random.randint(0,30))  # Ngày nhập máy
        ngay_nhap_may = ngay_nhap_may_parse.timestamp()*1000
        trong_luong = random_weight(random_loai_hanghoa())  # Trọng lượng
        thu_ho = random_fee()  # Thu hộ
        ma_loai_hanghoa = random_loai_hanghoa()  # Mã loại hàng hóa
        # Thêm dữ liệu phiếu gửi vào danh sách
        orders.append({
            "ma_phieugui": order_id,
            "ma_buucuc_goc": ma_buucuc_goc,
            "ma_buucuc_phat": ma_buucuc_phat,
            "ngay_gui_bp_parsed": start_date_parse,
            "ngay_gui_bp": start_date,
            "tong_cuoc_vnd": tong_cuoc_vnd,
            "ngay_nhap_may_parsed": ngay_nhap_may_parse,
            "ngay_nhap_may": ngay_nhap_may,
            "trong_luong": trong_luong,
            "thu_ho": thu_ho,
            "ma_loai_hanghoa": ma_loai_hanghoa,
            "partition": start_date_parse.strftime("%y%m%d"),
            "sessionId": sessionId
        })
        current_date = start_date_parse
        # Random trạng thái bắt đầu
        current_status = 200 if random.random() > 0.5 else 100  # Bắt đầu từ 200 hoặc 100
        # Tạo hành trình
        journey = []
        has_503_504 = False
        while current_status:
            station = random_buucuc()
            if current_status >= 400:
                if has_503_504:
                    station = ma_buucuc_phat
                # xác suất 80% sẽ rơi vào ma_buuc_phat
                if random.random() > 0.2:
                    station = ma_buucuc_phat
            if current_status >= 500:
                shiper = random_nhanvienphat()  # Nhân viên phát
            else:
                shiper = None
            journey.append((current_date, current_status, shiper, station))
            # Kết thúc hành trình nếu trạng thái là 501 (phát thành công)
            if current_status == 501:
                break
            if current_status in (503, 504):
                has_503_504 = True
            # Lấy trạng thái tiếp theo theo quy tắc
            next_states = transition_rules.get(current_status, [])
            # Loại bỏ 503 nếu đã có 504 và ngược lại
            if has_503_504:
                next_states = [s for s in next_states if s not in (503, 504)]
            if not next_states:
                break
            current_status = random.choice(next_states)
            # Tăng thời gian hợp lý dựa trên trạng thái
            current_date += timedelta(days=random.randint(0, 1), hours=random.randint(0, 6), minutes=random.randint(0, 30))
        # Thêm dữ liệu vào danh sách
        orderS_journey.append({
            "order_id": order_id,
            "journey": journey,
        })
    # Chuyển đổi sang DataFrame
    journey_data = []
    for order_journey in orderS_journey:
        for date_parse, status, shiper, station in order_journey["journey"]:
            journey_data.append({
                "ma_vandon": order_journey["order_id"],
                "thoi_gian_parse": date_parse,
                "thoi_gian": date_parse.timestamp() * 1000,
                "trang_thai": status,
                "nhanvienphat": shiper,
                "ma_buucuc": station,
                "partition": date_parse.strftime("%y%m%d"),
                "sessionId": sessionId
            })
    return orders, journey_data

In [30]:
def fake_to_postgres(num_orders=1000, mode='current'):
    # cách khác, sử dụng psycopg2
    import psycopg2

    # Create an engine instance
    POSTGRES_ADDRESS = 'localhost'
    POSTGRES_PORT = '5432'
    POSTGRES_USERNAME = 'postgres'
    POSTGRES_PASSWORD = 'postgres'
    POSTGRES_DBNAME = 'postgres'
    postgres_str = f"postgresql://{POSTGRES_USERNAME}:{POSTGRES_PASSWORD}@{POSTGRES_ADDRESS}:{POSTGRES_PORT}/{POSTGRES_DBNAME}"

    # Connect to the database
    conn = psycopg2.connect(
        host=POSTGRES_ADDRESS,
        database=POSTGRES_DBNAME,
        user=POSTGRES_USERNAME,
        password=POSTGRES_PASSWORD
    )

    # Create a cursor object
    cur = conn.cursor()

    # Xử lý một chút
    phieu_gui, hanh_trinh = fake_data(num_orders, mode)
    df_phieu_gui = pd.DataFrame(phieu_gui)
    df_phieu_gui.drop(columns=['ngay_gui_bp_parsed', 'ngay_nhap_may_parsed', 'partition', 'sessionId'], inplace=True)
    df_phieu_gui['ngay_gui_bp'] = pd.to_datetime(df_phieu_gui['ngay_gui_bp'], unit='ms')
    df_phieu_gui['ngay_nhap_may'] = pd.to_datetime(df_phieu_gui['ngay_nhap_may'], unit='ms')
    df_hanh_trinh = pd.DataFrame(hanh_trinh)
    df_hanh_trinh.drop(columns=['thoi_gian_parse', 'partition', 'sessionId'], inplace=True)
    df_hanh_trinh['thoi_gian'] = pd.to_datetime(df_hanh_trinh['thoi_gian'], unit='ms')

    # Insert query phieu_gui
    query = """
    INSERT INTO phieu_gui (ma_phieugui, ma_buucuc_goc, ma_buucuc_phat, ngay_gui_bp, tong_cuoc_vnd, ngay_nhap_may, trong_luong, thu_ho, ma_loai_hanghoa)
    VALUES %s
    -- ON CONFLICT DO NOTHING;
    """
    # Insert the data phieu_gui
    psycopg2.extras.execute_values(cur, query, df_phieu_gui.values)
    # Insert query hanh_trinh
    query = """
    INSERT INTO hanh_trinh (ma_vandon, thoi_gian, trang_thai, nhanvienphat, ma_buucuc)
    VALUES %s
    -- ON CONFLICT DO NOTHING;
    """
    # Insert the data hanh_trinh
    psycopg2.extras.execute_values(cur, query, df_hanh_trinh.values)
    # Commit the transaction
    conn.commit()
    # Close the cursor and the connection
    cur.close()
    conn.close()

In [34]:
def fake_to_hdfs(num_orders, mode):
  phieu_gui, hanh_trinh = fake_data(num_orders, mode)

  df_phieu_gui = pd.DataFrame(phieu_gui)
  df_hanh_trinh = pd.DataFrame(hanh_trinh)

  # ép kiểu đưa tất cả sang stringType
  df_phieu_gui = df_phieu_gui.astype(str)
  df_hanh_trinh = df_hanh_trinh.astype(str)

  # write to hdfs parquet with partition
  os.system('rm -rf .tmp/hdfs')

  df_phieu_gui.to_parquet('tmp/hdfs/phieu_gui', partition_cols=['partition'])
  df_hanh_trinh.to_parquet('tmp/hdfs/hanh_trinh', partition_cols=['partition'])

  from hdfs import InsecureClient

  # Connect to HDFS
  client = InsecureClient('http://localhost:9870', user='hung')

  client.delete('/HIVE', True)
  client.upload('/HIVE/PHIEU_GUI', 'tmp/phieu_gui/', overwrite=True)
  client.upload( '/HIVE/HANH_TRINH', 'tmp/hanh_trinh/', overwrite=True)

In [None]:
fake_to_postgres()