In [6]:
!pip install pyspark



In [2]:
!pip install psycopg2

Collecting psycopg2
  Downloading psycopg2-2.9.7-cp39-cp39-win_amd64.whl (1.2 MB)
     ---------------------------------------- 1.2/1.2 MB 775.2 kB/s eta 0:00:00
Installing collected packages: psycopg2
Successfully installed psycopg2-2.9.7


# Extract

In [3]:
import csv
import pandas as pd

data = pd.read_csv('data.csv',encoding= 'unicode_escape')

data.head()

Unnamed: 0,InvoiceNo,StockCode,Description,Quantity,InvoiceDate,UnitPrice,CustomerID,Country
0,536365,85123A,WHITE HANGING HEART T-LIGHT HOLDER,6,12/1/2010 8:26,2.55,17850.0,United Kingdom
1,536365,71053,WHITE METAL LANTERN,6,12/1/2010 8:26,3.39,17850.0,United Kingdom
2,536365,84406B,CREAM CUPID HEARTS COAT HANGER,8,12/1/2010 8:26,2.75,17850.0,United Kingdom
3,536365,84029G,KNITTED UNION FLAG HOT WATER BOTTLE,6,12/1/2010 8:26,3.39,17850.0,United Kingdom
4,536365,84029E,RED WOOLLY HOTTIE WHITE HEART.,6,12/1/2010 8:26,3.39,17850.0,United Kingdom


In [7]:
data.shape

(541909, 8)

In [8]:
data.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 541909 entries, 0 to 541908
Data columns (total 8 columns):
 #   Column       Non-Null Count   Dtype  
---  ------       --------------   -----  
 0   InvoiceNo    541909 non-null  object 
 1   StockCode    541909 non-null  object 
 2   Description  540455 non-null  object 
 3   Quantity     541909 non-null  int64  
 4   InvoiceDate  541909 non-null  object 
 5   UnitPrice    541909 non-null  float64
 6   CustomerID   406829 non-null  float64
 7   Country      541909 non-null  object 
dtypes: float64(2), int64(1), object(5)
memory usage: 33.1+ MB


# Clean

In [11]:
#Check Duplicates
data.duplicated().sum()

5268

In [12]:
for col in data.columns:
    pct_missing = data[col].isnull().mean()
    print(f'{col} - {pct_missing :.1%}')

InvoiceNo - 0.0%
StockCode - 0.0%
Description - 0.3%
Quantity - 0.0%
InvoiceDate - 0.0%
UnitPrice - 0.0%
CustomerID - 24.9%
Country - 0.0%


In [13]:
data['Description'] = data['Description'].fillna('Unknown')
data['CustomerID']  = data['CustomerID'].fillna(0)

In [14]:
data.isnull().sum()

InvoiceNo      0
StockCode      0
Description    0
Quantity       0
InvoiceDate    0
UnitPrice      0
CustomerID     0
Country        0
dtype: int64

# Transform 

In [15]:
data['InvoiceDate'] = pd.to_datetime(data.InvoiceDate, format='%m/%d/%Y %H:%M')

In [16]:
data['Description'] = data.Description.str.lower()

In [17]:
data['TotalAmount'] = data['Quantity'] * data['UnitPrice']

In [18]:
# change columns tyoe - String to Int type 
data['CustomerID'] = data['CustomerID'].astype('int64')

In [19]:
data['CustomerID'] = data['CustomerID'].astype(str)

In [20]:
data.head()

Unnamed: 0,InvoiceNo,StockCode,Description,Quantity,InvoiceDate,UnitPrice,CustomerID,Country,TotalAmount
0,536365,85123A,white hanging heart t-light holder,6,2010-12-01 08:26:00,2.55,17850,United Kingdom,15.3
1,536365,71053,white metal lantern,6,2010-12-01 08:26:00,3.39,17850,United Kingdom,20.34
2,536365,84406B,cream cupid hearts coat hanger,8,2010-12-01 08:26:00,2.75,17850,United Kingdom,22.0
3,536365,84029G,knitted union flag hot water bottle,6,2010-12-01 08:26:00,3.39,17850,United Kingdom,20.34
4,536365,84029E,red woolly hottie white heart.,6,2010-12-01 08:26:00,3.39,17850,United Kingdom,20.34


In [21]:
# rearrange all the columns for easy reference
data = data[['CustomerID','InvoiceNo','InvoiceDate','Description','StockCode','Quantity','UnitPrice','TotalAmount','Country']]

In [22]:
data.head()

Unnamed: 0,CustomerID,InvoiceNo,InvoiceDate,Description,StockCode,Quantity,UnitPrice,TotalAmount,Country
0,17850,536365,2010-12-01 08:26:00,white hanging heart t-light holder,85123A,6,2.55,15.3,United Kingdom
1,17850,536365,2010-12-01 08:26:00,white metal lantern,71053,6,3.39,20.34,United Kingdom
2,17850,536365,2010-12-01 08:26:00,cream cupid hearts coat hanger,84406B,8,2.75,22.0,United Kingdom
3,17850,536365,2010-12-01 08:26:00,knitted union flag hot water bottle,84029G,6,3.39,20.34,United Kingdom
4,17850,536365,2010-12-01 08:26:00,red woolly hottie white heart.,84029E,6,3.39,20.34,United Kingdom


In [24]:
data.tail()

Unnamed: 0,CustomerID,InvoiceNo,InvoiceDate,Description,StockCode,Quantity,UnitPrice,TotalAmount,Country
541904,12680,581587,2011-12-09 12:50:00,pack of 20 spaceboy napkins,22613,12,0.85,10.2,France
541905,12680,581587,2011-12-09 12:50:00,children's apron dolly girl,22899,6,2.1,12.6,France
541906,12680,581587,2011-12-09 12:50:00,childrens cutlery dolly girl,23254,4,4.15,16.6,France
541907,12680,581587,2011-12-09 12:50:00,childrens cutlery circus parade,23255,4,4.15,16.6,France
541908,12680,581587,2011-12-09 12:50:00,baking set 9 piece retrospot,22138,3,4.95,14.85,France


# Load

In [36]:
import psycopg2
import csv
import psycopg2.extras
import psycopg2
from psycopg2 import sql

In [37]:
conn = psycopg2.connect(
    host="localhost",
    database="ECommerce",
    user="postgres",
    password="Mundackal123"
)


In [38]:
# Create a cursor object to interact with the database
cur = conn.cursor()

# Define the table name and CSV file path
table_name = 'Product'

df_columns = list(data)
columns = ",".join(df_columns)

header = columns.split(',') # Get the header row
types = ['VARCHAR' for _ in header] 
try:
# Construct the CREATE TABLE statement
    create_table_sql = f"CREATE TABLE IF NOT EXISTS {table_name} ({', '.join(f'{col} {typ}' for col, typ in zip(header, types))})"

# Execute the CREATE TABLE statement
    cur.execute(create_table_sql)
    conn.commit()

# Close the database connection
    cur.close()
    conn.close()
except BaseException as e:
       conn.rollback()
       

In [39]:
cur = conn.cursor()
table_name = 'Product'
try:
    data.columns = data.columns.str.lower()

    # Loop through the rows of the DataFrame
    for index, row in data.iterrows():
        # Construct the INSERT query
        insert_query = sql.SQL(
            "INSERT INTO {} ({}) VALUES ({})"
        ).format(
            sql.Identifier(table_name),
            sql.SQL(", ").join(map(sql.Identifier, row.index),
            sql.SQL(", ").join([sql.Placeholder()] * len(row))  # Use len(row) instead of len(row.index)
        )

        # Execute the INSERT query
        cur.execute(insert_query, [AsIs(val) for val in row])

    # Commit the transaction
    conn.commit()

except Exception as e:
    # Roll back the transaction in case of any error
    conn.rollback()
    print(f"Error: {e}")

finally:
    # Close the cursor and the connection
    cur.close()
    conn.close()


SyntaxError: invalid syntax (3123978236.py, line 18)