In [1]:
import pandas as pd

In [2]:
# Local Windows directory of the shopsmart project (presumably the DAG root -- TODO confirm).
# Written as a regular string: with the previous r'' prefix every '\\' stayed as TWO
# backslashes in the value. A raw string cannot end in a backslash, so escapes are used.
dag_path = 'd:\\Coding\\DataEngineering\\shopsmart\\'
# Directory containing the raw input files, given relative to the working directory.
rawdata_path = '../raw_data'

In [3]:
# Build the input file locations first, then load each raw dataset:
# the product catalog is a CSV file, the customer transactions are JSON.
product_csv = f'{rawdata_path}/product_catalog[1].csv'
transaction_json = f'{rawdata_path}/customer_transactions[3].json'

product = pd.read_csv(product_csv)
transaction = pd.read_json(transaction_json)

In [4]:
# Expected dtype of each product catalog column.
# Keys must equal the column names exactly: the previous 'product_id ' key (trailing
# space) matched no column, so it could never apply to `product_id`.
schema = {
    'product_id': 'str',
    'product_name': 'str',
    'category': 'str',
    'price': 'float64',
}

In [5]:
# Show the raw product frame together with its inferred column dtypes (before cleaning).
product, product.dtypes

(  product_id product_name    category          price
 0       P001    Product 1  Category A         -100.0
 1       P002    Product 2  Category B          200.0
 2       P003    Product 3  Category A          150.0
 3       P004          NaN  Category C          300.0
 4       P005    Product 5  Category C  invalid_price
 5       P001    Product 1  Category A          100.0
 6       P006    Product 6  Category A          800.0,
 product_id      object
 product_name    object
 category        object
 price           object
 dtype: object)

In [6]:
# clean data
# Remove rows with any missing value, parse `price` as a number (values that cannot
# be parsed become NaN), then remove the rows whose price failed to parse.
product = (
    product
    .dropna(axis=0)
    .assign(price=lambda frame: pd.to_numeric(frame['price'], errors='coerce'))
    .dropna(subset=['price'])
)
# A price must not be negative: keep only rows with price >= 0.
product = product.loc[lambda frame: frame['price'] >= 0]
print(product.dtypes)
product

product_id       object
product_name     object
category         object
price           float64
dtype: object


Unnamed: 0,product_id,product_name,category,price
1,P002,Product 2,Category B,200.0
2,P003,Product 3,Category A,150.0
5,P001,Product 1,Category A,100.0
6,P006,Product 6,Category A,800.0


In [7]:
# Expected dtype of each customer transaction column.
# Keys must equal the column names exactly: the previous 'transaction_id ' key
# (trailing space) matched no column, so it could never apply to `transaction_id`.
schema = {
    'transaction_id': 'str',
    'customer_id': 'str',
    'product_id': 'str',
    'quantity': 'int64',
    'price': 'float64',
    'timestamp': 'datetime64[ns]',
}
# Not executed: alternative load that applies the schema at read time.
# pd.read_json(f'{rawdata_path}/customer_transactions[3].json', dtype=schema)

In [8]:
# clean data
# 1. Remove rows with any missing value, then parse the typed columns. With
#    errors='coerce', values that cannot be parsed become NaN / NaT instead of raising.
transaction = transaction.dropna(axis=0).assign(
    quantity=lambda frame: pd.to_numeric(frame['quantity'], errors='coerce'),
    price=lambda frame: pd.to_numeric(frame['price'], errors='coerce'),
    timestamp=lambda frame: pd.to_datetime(frame['timestamp'], errors='coerce'),
)
# 2. Remove rows that failed to parse BEFORE casting. int64 cannot hold NaN, so casting
#    first raised on the first bad quantity. Rows with an unparseable timestamp (NaT)
#    are removed too, matching the "no missing values" rule applied in step 1.
transaction = transaction.dropna(subset=['quantity', 'price', 'timestamp'])
transaction = transaction[transaction['price'] >= 0]  # price must not be negative
# 3. Enforce the final dtypes in a single astype call. astype returns a new frame, so
#    nothing is assigned into the filtered slice above (no SettingWithCopyWarning).
transaction = transaction.astype({
    'transaction_id': 'str',
    'customer_id': 'str',
    'product_id': 'str',
    'quantity': 'int64',
    'price': 'float64',
    'timestamp': 'datetime64[ns]',
})
print(transaction.dtypes)
transaction

transaction_id            object
customer_id               object
product_id                object
quantity                   int64
price                    float64
timestamp         datetime64[ns]
dtype: object


Unnamed: 0,transaction_id,customer_id,product_id,quantity,price,timestamp
0,b1182d23-5bf8-4676-9f12-a0e912011caa,C001,P003,6,150.0,2024-02-04 13:06:35
1,3b019fcb-f96d-4403-948c-93f3028d042b,C003,P001,10,100.0,2024-06-22 11:42:33
2,346d2b7e-00d3-41af-b53e-0c8624aeba79,C001,P001,5,100.0,2024-03-12 16:16:16
3,9ad7558f-eecb-425b-864f-768db080e003,C002,P002,3,200.0,2024-03-29 04:01:02
4,b51d70d8-582d-4972-ba08-9ca81cc5a8a0,C003,P002,1,200.0,2024-02-10 09:54:36
5,b1182d23-5bf8-4676-9f12-a0e912011caa,C001,P003,6,150.0,2024-02-04 13:06:35
6,3b019fcb-f96d-4403-948c-93f3028d042b,C003,P001,10,100.0,2024-06-22 11:42:33
7,346d2b7e-00d3-41af-b53e-0c8624aeba79,C001,P001,5,100.0,2024-03-12 16:16:16
8,9ad7558f-eecb-425b-864f-768db080e003,C002,P002,3,200.0,2024-03-29 04:01:02
9,b51d70d8-582d-4972-ba08-9ca81cc5a8a0,C003,P002,1,200.0,2024-02-10 09:54:36


In [9]:
# Merge
# Inner-join transactions with the product catalog on product_id. On the name clash,
# the transaction-side price gets the '_left' suffix and is dropped, so the remaining
# `price` column is the catalog one. Fully identical rows are then removed.
merged_df = (
    transaction
    .merge(product, on='product_id', how='inner', suffixes=('_left', ''))
    .drop(columns=['price_left'])
    .drop_duplicates()
)
merged_df


Unnamed: 0,transaction_id,customer_id,product_id,quantity,timestamp,product_name,category,price
0,b1182d23-5bf8-4676-9f12-a0e912011caa,C001,P003,6,2024-02-04 13:06:35,Product 3,Category A,150.0
4,3b019fcb-f96d-4403-948c-93f3028d042b,C003,P001,10,2024-06-22 11:42:33,Product 1,Category A,100.0
5,346d2b7e-00d3-41af-b53e-0c8624aeba79,C001,P001,5,2024-03-12 16:16:16,Product 1,Category A,100.0
12,9ad7558f-eecb-425b-864f-768db080e003,C002,P002,3,2024-03-29 04:01:02,Product 2,Category B,200.0
13,b51d70d8-582d-4972-ba08-9ca81cc5a8a0,C003,P002,1,2024-02-10 09:54:36,Product 2,Category B,200.0


In [10]:
# Spot-check a single transaction id in the merged, de-duplicated frame.
is_target = merged_df['transaction_id'].eq('b1182d23-5bf8-4676-9f12-a0e912011caa')
merged_df.loc[is_target]

Unnamed: 0,transaction_id,customer_id,product_id,quantity,timestamp,product_name,category,price
0,b1182d23-5bf8-4676-9f12-a0e912011caa,C001,P003,6,2024-02-04 13:06:35,Product 3,Category A,150.0


In [11]:
# List the column names of the merged frame.
merged_df.columns

Index(['transaction_id', 'customer_id', 'product_id', 'quantity', 'timestamp',
       'product_name', 'category', 'price'],
      dtype='object')

## Check the PostgreSQL query

In [22]:
import os

import psycopg2
import matplotlib.pyplot as plt

# Database connection parameters.
# Each value is read from the standard PG* environment variable when it is set, so
# credentials do not have to be edited into the notebook. The fallbacks are the values
# this cell used before (NOTE(review): presumably local development settings).
db_params = {
    'dbname': os.environ.get('PGDATABASE', 'shopsmart'),
    'user': os.environ.get('PGUSER', 'postgres'),
    'password': os.environ.get('PGPASSWORD', 'secret'),
    'host': os.environ.get('PGHOST', 'localhost'),
    'port': int(os.environ.get('PGPORT', 5434)),
}

# Query to fetch data from fact_sales
query = '''
SELECT transaction_id, customer_id, product_id, quantity, price, timestamp
FROM fact_sales
'''

# Connect to the PostgreSQL database and fetch the data into a DataFrame.
# try/finally guarantees the connection is closed even when the query fails.
conn = psycopg2.connect(**db_params)
try:
    df = pd.read_sql(query, conn)
finally:
    conn.close()

# Plotting: left panel is the time series, right panel is a table preview.
fig, ax = plt.subplots(1, 2, figsize=(15, 5))

# Plot: Sales quantity over time (sum of quantity per calendar day)
df['timestamp'] = pd.to_datetime(df['timestamp'])
df = df.set_index('timestamp')
df.resample('D')['quantity'].sum().plot(ax=ax[0], title='Daily Sales Quantity')

# Table: Display the first 10 rows of the DataFrame as a table.
# `timestamp` is the index at this point, so it is not among the table columns.
ax[1].axis('off')
table = ax[1].table(cellText=df.head(10).values, colLabels=df.columns, cellLoc='center', loc='center')
table.auto_set_font_size(False)
table.set_fontsize(10)
table.scale(1.2, 1.2)

# Show plot
plt.tight_layout()
plt.show()


OperationalError: connection to server at "localhost" (::1), port 5434 failed: Connection refused (0x0000274D/10061)
	Is the server running on that host and accepting TCP/IP connections?
connection to server at "localhost" (127.0.0.1), port 5434 failed: Connection refused (0x0000274D/10061)
	Is the server running on that host and accepting TCP/IP connections?
