# Tranfrom Data


* Load

In [35]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('Create Data Frame')\
.config("spark.jars", "C:/Program Files/PostgreSQL/16/postgresql-42.7.3.jar") \
.getOrCreate()

In [36]:
df1 = spark.read \
.format("csv").option("header", 'true')\
.option('inferSchema','true')\
.option('dateFormat', "yyyy-MM-dd")\
.load(r'C:\Users\chihi\OneDrive\Desktop\report\data\lazada_data.csv')

In [37]:
df1.show(5)

+--------------------+---------+--------------------+--------+------------+--------+--------+--------------------+
|               title|    price|           link_item|discount|        sold|evaluate|location|           timestamp|
+--------------------+---------+--------------------+--------+------------+--------+--------+--------------------+
|Thời trang mùa th...|112.100 ₫|https://www.lazad...| 17% Off|   36 Đã bán|    (17)|   China|2024-07-23 13:42:...|
|PSD nam màu xanh ...| 66.900 ₫|https://www.lazad...| 23% Off|  316 Đã bán|     (9)|    NULL|2024-07-23 13:42:...|
|Quần denim, quần ...| 89.800 ₫|https://www.lazad...| 23% Off|    6 Đã bán|    NULL|   China|2024-07-23 13:42:...|
|Quần jean nam chấ...|139.000 ₫|https://www.lazad...| 22% Off|14.1K Đã bán|  (3416)|    NULL|2024-07-23 13:42:...|
|Người Đàn Ông Mới...|199.200 ₫|https://www.lazad...| 23% Off|   42 Đã bán|    (23)|   China|2024-07-23 13:42:...|
+--------------------+---------+--------------------+--------+------------+-----

* thêm cột id

In [38]:
from pyspark.sql import Window
from pyspark.sql.functions import row_number

windowSpec = Window.orderBy("price")
df1 = df1.withColumn("id", row_number().over(windowSpec))
# chỉnh cột id về đầu
columns = ['id'] + [col for col in df1.columns if col != 'id']
df1 = df1.select(columns)


In [39]:
df1.show(5)

+---+--------------------+---------+--------------------+--------+-----------+--------+-----------+--------------------+
| id|               title|    price|           link_item|discount|       sold|evaluate|   location|           timestamp|
+---+--------------------+---------+--------------------+--------+-----------+--------+-----------+--------------------+
|  1|QUần Côn Nam Hàng...|100.000 ₫|https://www.lazad...|    NULL|  75 Đã bán|    (17)|       NULL|2024-07-23 13:42:...|
|  2|Quần Jeans nam ca...|100.000 ₫|https://www.lazad...| 68% Off|  44 Đã bán|    (23)|      China|2024-07-23 13:42:...|
|  3|Quần short denim ...|104.400 ₫|https://www.lazad...| 43% Off|  11 Đã bán|     (5)|Hồ Chí Minh|2024-07-23 13:42:...|
|  4|M-3XL Mùa Hè Hàn ...|104.600 ₫|https://www.lazad...| 23% Off|  64 Đã bán|    (17)|       NULL|2024-07-23 13:42:...|
|  5|HOÀN TIỀN 15% - Q...|105.000 ₫|https://www.lazad...| 50% Off|1.0K Đã bán|   (354)|       NULL|2024-07-23 13:42:...|
+---+--------------------+------

In [40]:
df1.printSchema()

root
 |-- id: integer (nullable = false)
 |-- title: string (nullable = true)
 |-- price: string (nullable = true)
 |-- link_item: string (nullable = true)
 |-- discount: string (nullable = true)
 |-- sold: string (nullable = true)
 |-- evaluate: string (nullable = true)
 |-- location: string (nullable = true)
 |-- timestamp: timestamp (nullable = true)



In [41]:
from pyspark.sql.functions import regexp_replace, col, when, split, concat_ws


# Xử lý cột 'evaluate': xóa dấu ngoặc đơn, thay thế giá trị null bằng 0, 
# và chuyển đổi thành kiểu int
df1 = df1.withColumn(
    'evaluate',
    when(col('evaluate').isNull(), '0')
    .otherwise(regexp_replace(col('evaluate'), '[()]', ''))
    .cast('int')
)

# Xử lý cột 'price': xóa dấu chấm và ký hiệu tiền tệ, thay thế giá trị null bằng 0, 
# và chuyển đổi thành kiểu int
df1 = df1.withColumn(
    'price',
    when(col('price').isNull(), '0')
    .otherwise(regexp_replace(col('price'), '[^0-9]', ''))
    .cast('int')
)

# Xử lý cột 'discount': lấy giá trị phần trăm, thay thế giá trị null bằng 0, 
# và chuyển đổi thành số thập phân
df1 = df1.withColumn(
    'discount',
    when(col('discount').isNull(), '0')
    .otherwise(regexp_replace(col('discount'), '[^0-9]', ''))
    .cast('int') / 100
)

# Xử lý cột 'sold': xóa chữ không mong muốn và chuyển đổi thành kiểu int
df1 = df1.withColumn(
    'sold',
    when(col('sold').isNull(), '0')
    .otherwise(regexp_replace(col('sold'), '[^\d]', ''))
    .cast('int')
)

# Tính toán giá trị giảm giá và giá sau khi giảm
df1 = df1.withColumn(
    'discount_value',
    col('price') * col('discount')
).withColumn(
    'final_price',
    col('price') - col('discount_value')
)
# Tách cột 'title' thành danh sách các từ
split_col = split(col('title'), ' ')

# Tạo cột 'Name' với 4 từ đầu tiên
df1 = df1.withColumn('Name', concat_ws(' ', split_col.getItem(0), 
                                       split_col.getItem(1), split_col.getItem(2), 
                                       split_col.getItem(3)))

# Di chuyển cột 'id' và 'Name' lên đầu
columns = df1.columns
new_column_order = ['id', 'Name'] + [col for col in columns if col not in ['id', 'Name']]

df1 = df1.select(new_column_order)



In [42]:
df1.show(5)
df1.printSchema()

+---+--------------------+--------------------+------+--------------------+--------+----+--------+-----------+--------------------+--------------+-----------+
| id|                Name|               title| price|           link_item|discount|sold|evaluate|   location|           timestamp|discount_value|final_price|
+---+--------------------+--------------------+------+--------------------+--------+----+--------+-----------+--------------------+--------------+-----------+
|  1|   QUần Côn Nam Hàng|QUần Côn Nam Hàng...|100000|https://www.lazad...|     0.0|  75|      17|       NULL|2024-07-23 13:42:...|           0.0|   100000.0|
|  2|  Quần Jeans nam cao|Quần Jeans nam ca...|100000|https://www.lazad...|    0.68|  44|      23|      China|2024-07-23 13:42:...|       68000.0|    32000.0|
|  3|Quần short denim ...|Quần short denim ...|104400|https://www.lazad...|    0.43|  11|       5|Hồ Chí Minh|2024-07-23 13:42:...|       44892.0|    59508.0|
|  4|    M-3XL Mùa Hè Hàn|M-3XL Mùa Hè Hàn ...

In [43]:
import pandas as pd


In [44]:
# Lưu DataFrame dưới dạng CSV với đường dẫn lưu trữ cụ thể
output_file_path = r'C:\Users\chihi\OneDrive\Desktop\report\data\lazada_data_cleaned.csv'
df1.toPandas().to_csv(output_file_path, index=False)

# Lưu về csdl với bảng Product

In [45]:
import pandas as pd
from sqlalchemy import create_engine

# Thông tin kết nối
username = 'postgres'
password = '010701'
host = 'localhost'
port = '5433'
database = 'lazada'

# Tạo chuỗi kết nối
engine = create_engine(f'postgresql+psycopg2://{username}:{password}@{host}:{port}/{database}')

# Kiểm tra kết nối
try:
    with engine.connect() as connection:
        print("Connected to PostgreSQL database!")
except Exception as e:
    print(f"Error: {e}")


Connected to PostgreSQL database!


* copy dữ liệu vào table prodct

In [46]:
import pandas as pd
import psycopg2
from io import StringIO

# Kết nối tới PostgreSQL
conn = psycopg2.connect(
    dbname='lazada',
    user='postgres',
    password='010701',
    host='localhost',
    port='5433'
)
cur = conn.cursor()

# Đọc file CSV
df2 = pd.read_csv(r'C:\Users\chihi\OneDrive\Desktop\report\data\lazada_data_cleaned.csv')

# Tạo bảng nếu chưa tồn tại
create_table_query = '''
CREATE TABLE IF NOT EXISTS product (
    id INTEGER NOT NULL,
    Name TEXT NOT NULL,
    title TEXT,
    price INTEGER,
    link_item TEXT,
    discount DOUBLE PRECISION,
    sold INTEGER,
    evaluate INTEGER,
    location TEXT,
    timestamp TIMESTAMP,
    discount_value DOUBLE PRECISION,
    final_price DOUBLE PRECISION
);
'''
cur.execute(create_table_query)
conn.commit()

# Chuyển DataFrame thành định dạng CSV trong bộ nhớ
output = StringIO()
df2.to_csv(output, sep='\t', header=False, index=False)
output.seek(0)

# Sử dụng copy_expert để ghi dữ liệu vào PostgreSQL
copy_query = """
COPY product (id, Name, title, price, link_item, discount, sold, evaluate, location, timestamp, discount_value, final_price)
FROM STDIN WITH CSV DELIMITER '\t'
"""
cur.copy_expert(copy_query, output)

conn.commit()
cur.close()
conn.close()

print("Data successfully written to PostgreSQL")


Data successfully written to PostgreSQL


In [47]:
# Đóng SparkSession
spark.stop()