In [4]:
import requests
import pandas as pd
import mysql.connector
import os
from pandarallel import pandarallel
from utils import image
from airflow import DAG
from airflow.operators.python import PythonOperator

In [5]:
# Initialise

root_path = '/Users/nitinnandansingh/Documents/workspace/e-commerce_data_pipeline'
images_dir = os.path.join(root_path, 'assets/product_images')
print(images_dir)

# Initialize pandarallel
pandarallel.initialize(progress_bar=True)

/Users/nitinnandansingh/Documents/workspace/e-commerce_data_pipeline/assets/product_images
INFO: Pandarallel will run on 8 workers.
INFO: Pandarallel will use standard multiprocessing data transfer (pipe) to transfer data between the main process and workers.


In [6]:
def fetch_products_data():
    response = requests.get("https://fakestoreapi.com/products")
    data = response.json()
    return pd.DataFrame(data)

In [7]:
def fetch_users_data():
    response = requests.get("https://fakestoreapi.com/users")
    data = response.json()
    return pd.DataFrame(data)

In [8]:
products_df = fetch_products_data()
users_df = fetch_users_data()

## Explore products data

In [9]:
products_df.head(3)

Unnamed: 0,id,title,price,description,category,image,rating
0,1,"Fjallraven - Foldsack No. 1 Backpack, Fits 15 ...",109.95,Your perfect pack for everyday use and walks i...,men's clothing,https://fakestoreapi.com/img/81fPKd-2AYL._AC_S...,"{'rate': 3.9, 'count': 120}"
1,2,Mens Casual Premium Slim Fit T-Shirts,22.3,"Slim-fitting style, contrast raglan long sleev...",men's clothing,https://fakestoreapi.com/img/71-3HjGNDUL._AC_S...,"{'rate': 4.1, 'count': 259}"
2,3,Mens Cotton Jacket,55.99,great outerwear jackets for Spring/Autumn/Wint...,men's clothing,https://fakestoreapi.com/img/71li-ujtlUL._AC_U...,"{'rate': 4.7, 'count': 500}"


In [10]:
products_df.columns.to_list()

['id', 'title', 'price', 'description', 'category', 'image', 'rating']

In [11]:
products_df.describe(include='all').fillna("")

Unnamed: 0,id,title,price,description,category,image,rating
count,20.0,20,20.0,20,20,20,20
unique,,20,,20,4,20,20
top,,"Fjallraven - Foldsack No. 1 Backpack, Fits 15 ...",,Your perfect pack for everyday use and walks i...,electronics,https://fakestoreapi.com/img/81fPKd-2AYL._AC_S...,"{'rate': 3.9, 'count': 120}"
freq,,1,,1,6,1,1
mean,10.5,,162.046,,,,
std,5.91608,,272.220532,,,,
min,1.0,,7.95,,,,
25%,5.75,,15.24,,,,
50%,10.5,,56.49,,,,
75%,15.25,,110.9625,,,,


In [12]:
products_df['image'][0]

'https://fakestoreapi.com/img/81fPKd-2AYL._AC_SL1500_.jpg'

### Download images

In [13]:
def download_image(url, images_dir):
    try:
        response = requests.get(url, stream=True)
        response.raise_for_status()
        filename = os.path.join(images_dir, url.split('/')[-1])
        with open(filename, 'wb') as file:
            for chunk in response.iter_content(1024):
                file.write(chunk)
        return filename
    except Exception as e:
        print(f"Error downloading {url}: {e}")
        return None

def parallel_download(url):
    save_dir = images_dir
    return download_image(url, save_dir)

products_df['images_dir'] = products_df['image'].parallel_apply(parallel_download)

VBox(children=(HBox(children=(IntProgress(value=0, description='0.00%', max=3), Label(value='0 / 3'))), HBox(c…

In [14]:
products_df.head(5)

Unnamed: 0,id,title,price,description,category,image,rating,images_dir
0,1,"Fjallraven - Foldsack No. 1 Backpack, Fits 15 ...",109.95,Your perfect pack for everyday use and walks i...,men's clothing,https://fakestoreapi.com/img/81fPKd-2AYL._AC_S...,"{'rate': 3.9, 'count': 120}",/Users/nitinnandansingh/Documents/workspace/e-...
1,2,Mens Casual Premium Slim Fit T-Shirts,22.3,"Slim-fitting style, contrast raglan long sleev...",men's clothing,https://fakestoreapi.com/img/71-3HjGNDUL._AC_S...,"{'rate': 4.1, 'count': 259}",/Users/nitinnandansingh/Documents/workspace/e-...
2,3,Mens Cotton Jacket,55.99,great outerwear jackets for Spring/Autumn/Wint...,men's clothing,https://fakestoreapi.com/img/71li-ujtlUL._AC_U...,"{'rate': 4.7, 'count': 500}",/Users/nitinnandansingh/Documents/workspace/e-...
3,4,Mens Casual Slim Fit,15.99,The color could be slightly different between ...,men's clothing,https://fakestoreapi.com/img/71YXzeOuslL._AC_U...,"{'rate': 2.1, 'count': 430}",/Users/nitinnandansingh/Documents/workspace/e-...
4,5,John Hardy Women's Legends Naga Gold & Silver ...,695.0,"From our Legends Collection, the Naga was insp...",jewelery,https://fakestoreapi.com/img/71pWzhdJNwL._AC_U...,"{'rate': 4.6, 'count': 400}",/Users/nitinnandansingh/Documents/workspace/e-...


In [15]:
products_df.drop_duplicates(subset=['id'], inplace=True)
# products_df.dropna(inplace=True)

In [16]:
products_df['rating'][0]

{'rate': 3.9, 'count': 120}

In [17]:
def get_count_of_ratings(dict_val):
    return dict_val['count']

def get_ratings(dict_val):
    return dict_val['rate']

In [18]:
products_df['count_of_ratings'] = products_df['rating'].parallel_apply(get_count_of_ratings)

# products_df['count_of_ratings'] = products_df['rating']['count']

VBox(children=(HBox(children=(IntProgress(value=0, description='0.00%', max=3), Label(value='0 / 3'))), HBox(c…

In [19]:
products_df['rating'] = products_df['rating'].parallel_apply(get_ratings)


VBox(children=(HBox(children=(IntProgress(value=0, description='0.00%', max=3), Label(value='0 / 3'))), HBox(c…

In [20]:
products_df.head(3)

Unnamed: 0,id,title,price,description,category,image,rating,images_dir,count_of_ratings
0,1,"Fjallraven - Foldsack No. 1 Backpack, Fits 15 ...",109.95,Your perfect pack for everyday use and walks i...,men's clothing,https://fakestoreapi.com/img/81fPKd-2AYL._AC_S...,3.9,/Users/nitinnandansingh/Documents/workspace/e-...,120
1,2,Mens Casual Premium Slim Fit T-Shirts,22.3,"Slim-fitting style, contrast raglan long sleev...",men's clothing,https://fakestoreapi.com/img/71-3HjGNDUL._AC_S...,4.1,/Users/nitinnandansingh/Documents/workspace/e-...,259
2,3,Mens Cotton Jacket,55.99,great outerwear jackets for Spring/Autumn/Wint...,men's clothing,https://fakestoreapi.com/img/71li-ujtlUL._AC_U...,4.7,/Users/nitinnandansingh/Documents/workspace/e-...,500


In [21]:
# validate images
products_df['image_valid'] = products_df['images_dir'].parallel_apply(image.is_valid_image)

VBox(children=(HBox(children=(IntProgress(value=0, description='0.00%', max=3), Label(value='0 / 3'))), HBox(c…

In [22]:
products_df[products_df['image_valid']==False]

Unnamed: 0,id,title,price,description,category,image,rating,images_dir,count_of_ratings,image_valid


In [23]:
products_df.drop(columns=['image_valid'], axis=1, inplace=True)
products_df.head(3)

Unnamed: 0,id,title,price,description,category,image,rating,images_dir,count_of_ratings
0,1,"Fjallraven - Foldsack No. 1 Backpack, Fits 15 ...",109.95,Your perfect pack for everyday use and walks i...,men's clothing,https://fakestoreapi.com/img/81fPKd-2AYL._AC_S...,3.9,/Users/nitinnandansingh/Documents/workspace/e-...,120
1,2,Mens Casual Premium Slim Fit T-Shirts,22.3,"Slim-fitting style, contrast raglan long sleev...",men's clothing,https://fakestoreapi.com/img/71-3HjGNDUL._AC_S...,4.1,/Users/nitinnandansingh/Documents/workspace/e-...,259
2,3,Mens Cotton Jacket,55.99,great outerwear jackets for Spring/Autumn/Wint...,men's clothing,https://fakestoreapi.com/img/71li-ujtlUL._AC_U...,4.7,/Users/nitinnandansingh/Documents/workspace/e-...,500


In [24]:
products_df.columns.to_list()

['id',
 'title',
 'price',
 'description',
 'category',
 'image',
 'rating',
 'images_dir',
 'count_of_ratings']

In [27]:
def check_data_types(df):
    # Get the data types of each column
    data_types = df.dtypes

    # Print the data type of each column
    for column, dtype in data_types.items():
        print(f"Column '{column}' has data type '{dtype}'.")

# Call the function with your DataFrame
check_data_types(products_df)

Column 'id' has data type 'int64'.
Column 'title' has data type 'object'.
Column 'price' has data type 'float64'.
Column 'description' has data type 'object'.
Column 'category' has data type 'object'.
Column 'image' has data type 'object'.
Column 'rating' has data type 'float64'.
Column 'images_dir' has data type 'object'.
Column 'count_of_ratings' has data type 'int64'.


## Explore users data

In [28]:
users_df.head(3)

Unnamed: 0,address,id,email,username,password,name,phone,__v
0,"{'geolocation': {'lat': '-37.3159', 'long': '8...",1,john@gmail.com,johnd,m38rmF$,"{'firstname': 'john', 'lastname': 'doe'}",1-570-236-7033,0
1,"{'geolocation': {'lat': '-37.3159', 'long': '8...",2,morrison@gmail.com,mor_2314,83r5^_,"{'firstname': 'david', 'lastname': 'morrison'}",1-570-236-7033,0
2,"{'geolocation': {'lat': '40.3467', 'long': '-3...",3,kevin@gmail.com,kevinryan,kev02937@,"{'firstname': 'kevin', 'lastname': 'ryan'}",1-567-094-1345,0


In [29]:
users_df.columns.to_list()

['address', 'id', 'email', 'username', 'password', 'name', 'phone', '__v']

In [30]:
set(users_df['__v'].to_list())

{0}

In [31]:
users_df.describe(include='all').fillna('')

Unnamed: 0,address,id,email,username,password,name,phone,__v
count,10,10.0,10,10,10,10,10,10.0
unique,10,,10,10,10,10,9,
top,"{'geolocation': {'lat': '-37.3159', 'long': '8...",,john@gmail.com,johnd,m38rmF$,"{'firstname': 'john', 'lastname': 'doe'}",1-570-236-7033,
freq,1,,1,1,1,1,2,
mean,,5.5,,,,,,0.0
std,,3.02765,,,,,,0.0
min,,1.0,,,,,,0.0
25%,,3.25,,,,,,0.0
50%,,5.5,,,,,,0.0
75%,,7.75,,,,,,0.0


In [32]:
users_df.drop(columns=['__v'], axis=1, inplace=True)


In [33]:
def check_data_types(df):
    # Get the data types of each column
    data_types = df.dtypes

    # Print the data type of each column
    for column, dtype in data_types.items():
        print(f"Column '{column}' has data type '{dtype}'.")

# Call the function with your DataFrame
check_data_types(users_df)


Column 'address' has data type 'object'.
Column 'id' has data type 'int64'.
Column 'email' has data type 'object'.
Column 'username' has data type 'object'.
Column 'password' has data type 'object'.
Column 'name' has data type 'object'.
Column 'phone' has data type 'object'.


In [40]:
type(users_df['address'][0]['number'])

int

In [41]:
address_df = users_df['address'].apply(pd.Series)
address_df

Unnamed: 0,geolocation,city,street,number,zipcode
0,"{'lat': '-37.3159', 'long': '81.1496'}",kilcoole,new road,7682,12926-3874
1,"{'lat': '-37.3159', 'long': '81.1496'}",kilcoole,Lovers Ln,7267,12926-3874
2,"{'lat': '40.3467', 'long': '-30.1310'}",Cullman,Frances Ct,86,29567-1452
3,"{'lat': '50.3467', 'long': '-20.1310'}",San Antonio,Hunters Creek Dr,6454,98234-1734
4,"{'lat': '40.3467', 'long': '-40.1310'}",san Antonio,adams St,245,80796-1234
5,"{'lat': '20.1677', 'long': '-10.6789'}",el paso,prospect st,124,12346-0456
6,"{'lat': '10.3456', 'long': '20.6419'}",fresno,saddle st,1342,96378-0245
7,"{'lat': '50.3456', 'long': '10.6419'}",mesa,vally view ln,1342,96378-0245
8,"{'lat': '40.12456', 'long': '20.5419'}",miami,avondale ave,345,96378-0245
9,"{'lat': '30.24788', 'long': '-20.545419'}",fort wayne,oak lawn ave,526,10256-4532


In [44]:
users_df = pd.concat([users_df, address_df], axis=1).drop('address', axis=1)
users_df

Unnamed: 0,id,email,username,password,name,phone,geolocation,city,street,number,zipcode
0,1,john@gmail.com,johnd,m38rmF$,"{'firstname': 'john', 'lastname': 'doe'}",1-570-236-7033,"{'lat': '-37.3159', 'long': '81.1496'}",kilcoole,new road,7682,12926-3874
1,2,morrison@gmail.com,mor_2314,83r5^_,"{'firstname': 'david', 'lastname': 'morrison'}",1-570-236-7033,"{'lat': '-37.3159', 'long': '81.1496'}",kilcoole,Lovers Ln,7267,12926-3874
2,3,kevin@gmail.com,kevinryan,kev02937@,"{'firstname': 'kevin', 'lastname': 'ryan'}",1-567-094-1345,"{'lat': '40.3467', 'long': '-30.1310'}",Cullman,Frances Ct,86,29567-1452
3,4,don@gmail.com,donero,ewedon,"{'firstname': 'don', 'lastname': 'romer'}",1-765-789-6734,"{'lat': '50.3467', 'long': '-20.1310'}",San Antonio,Hunters Creek Dr,6454,98234-1734
4,5,derek@gmail.com,derek,jklg*_56,"{'firstname': 'derek', 'lastname': 'powell'}",1-956-001-1945,"{'lat': '40.3467', 'long': '-40.1310'}",san Antonio,adams St,245,80796-1234
5,6,david_r@gmail.com,david_r,3478*#54,"{'firstname': 'david', 'lastname': 'russell'}",1-678-345-9856,"{'lat': '20.1677', 'long': '-10.6789'}",el paso,prospect st,124,12346-0456
6,7,miriam@gmail.com,snyder,f238&@*$,"{'firstname': 'miriam', 'lastname': 'snyder'}",1-123-943-0563,"{'lat': '10.3456', 'long': '20.6419'}",fresno,saddle st,1342,96378-0245
7,8,william@gmail.com,hopkins,William56$hj,"{'firstname': 'william', 'lastname': 'hopkins'}",1-478-001-0890,"{'lat': '50.3456', 'long': '10.6419'}",mesa,vally view ln,1342,96378-0245
8,9,kate@gmail.com,kate_h,kfejk@*_,"{'firstname': 'kate', 'lastname': 'hale'}",1-678-456-1934,"{'lat': '40.12456', 'long': '20.5419'}",miami,avondale ave,345,96378-0245
9,10,jimmie@gmail.com,jimmie_k,klein*#%*,"{'firstname': 'jimmie', 'lastname': 'klein'}",1-104-001-4567,"{'lat': '30.24788', 'long': '-20.545419'}",fort wayne,oak lawn ave,526,10256-4532


In [45]:
users_df['lat'] = users_df['geolocation'].apply(lambda x: x['lat'])
users_df['long'] = users_df['geolocation'].apply(lambda x: x['long'])
users_df.drop(['geolocation'], axis=1, inplace=True)

In [46]:
users_df

Unnamed: 0,id,email,username,password,name,phone,city,street,number,zipcode,lat,long
0,1,john@gmail.com,johnd,m38rmF$,"{'firstname': 'john', 'lastname': 'doe'}",1-570-236-7033,kilcoole,new road,7682,12926-3874,-37.3159,81.1496
1,2,morrison@gmail.com,mor_2314,83r5^_,"{'firstname': 'david', 'lastname': 'morrison'}",1-570-236-7033,kilcoole,Lovers Ln,7267,12926-3874,-37.3159,81.1496
2,3,kevin@gmail.com,kevinryan,kev02937@,"{'firstname': 'kevin', 'lastname': 'ryan'}",1-567-094-1345,Cullman,Frances Ct,86,29567-1452,40.3467,-30.131
3,4,don@gmail.com,donero,ewedon,"{'firstname': 'don', 'lastname': 'romer'}",1-765-789-6734,San Antonio,Hunters Creek Dr,6454,98234-1734,50.3467,-20.131
4,5,derek@gmail.com,derek,jklg*_56,"{'firstname': 'derek', 'lastname': 'powell'}",1-956-001-1945,san Antonio,adams St,245,80796-1234,40.3467,-40.131
5,6,david_r@gmail.com,david_r,3478*#54,"{'firstname': 'david', 'lastname': 'russell'}",1-678-345-9856,el paso,prospect st,124,12346-0456,20.1677,-10.6789
6,7,miriam@gmail.com,snyder,f238&@*$,"{'firstname': 'miriam', 'lastname': 'snyder'}",1-123-943-0563,fresno,saddle st,1342,96378-0245,10.3456,20.6419
7,8,william@gmail.com,hopkins,William56$hj,"{'firstname': 'william', 'lastname': 'hopkins'}",1-478-001-0890,mesa,vally view ln,1342,96378-0245,50.3456,10.6419
8,9,kate@gmail.com,kate_h,kfejk@*_,"{'firstname': 'kate', 'lastname': 'hale'}",1-678-456-1934,miami,avondale ave,345,96378-0245,40.12456,20.5419
9,10,jimmie@gmail.com,jimmie_k,klein*#%*,"{'firstname': 'jimmie', 'lastname': 'klein'}",1-104-001-4567,fort wayne,oak lawn ave,526,10256-4532,30.24788,-20.545419


In [47]:
check_data_types(users_df)

Column 'id' has data type 'int64'.
Column 'email' has data type 'object'.
Column 'username' has data type 'object'.
Column 'password' has data type 'object'.
Column 'name' has data type 'object'.
Column 'phone' has data type 'object'.
Column 'city' has data type 'object'.
Column 'street' has data type 'object'.
Column 'number' has data type 'int64'.
Column 'zipcode' has data type 'object'.
Column 'lat' has data type 'object'.
Column 'long' has data type 'object'.


In [53]:
products_df.to_excel("products.xlsx", index=False)
users_df.to_excel("users.xlsx", index=False)

In [55]:
products_df.columns.to_list()

['id',
 'title',
 'price',
 'description',
 'category',
 'image',
 'rating',
 'images_dir',
 'count_of_ratings']

In [2]:
conn = mysql.connector.connect(
        host="localhost",
        user="ecommerce_user",
        password="password",
        database="ecommerce_data"
    )

In [3]:
cursor = conn.cursor()
res = cursor.execute("select * from ecommerce_data;")
res

ProgrammingError: 1146 (42S02): Table 'ecommerce_data.ecommerce_data' doesn't exist

In [None]:
def store_data(df, table_name):
    conn = mysql.connector.connect(
        host="localhost",
        user="ecommerce_user",
        password="password",
        database="ecommerce_data"
    )
    cursor = conn.cursor()
    for _, row in df.iterrows():
        if table_name == "products":
            cursor.execute("""
                INSERT INTO products (id, title, price, description, category, image)
                VALUES (%s, %s, %s, %s, %s, %s)
                ON DUPLICATE KEY UPDATE
                title = VALUES(title),
                price = VALUES(price),
                description = VALUES(description),
                category = VALUES(category),
                image = VALUES(image);
            """,tuple(row))
        elif table_name == "users":
            cursor.execute("""
                INSERT INTO users (id, email, username, password, name, address, phone)
                VALUES (%s, %s, %s, %s, %s, %s, %s)
                ON DUPLICATE KEY UPDATE
                email = VALUES(email),
                username = VALUES(username),
                password = VALUES(password),
                name = VALUES(name),
                address = VALUES(address),
                phone = VALUES(phone);
            """, tuple(row))
            conn.commit()
            cursor.close()
            conn.close()
            

In [None]:
def run_pipeline():
    products_df = fetch_products_data()
    store_data(products_df, "products")

    users_df = fetch_users_data()
    store_data(users_df, "users")
    