#### Cargaremos la información de MySQL a las distintas fuentes que utilizaremos

In [2]:
import sqlalchemy as db
from sqlalchemy import text
import pandas as pd

In [3]:
# Build the SQLAlchemy engine for the `retail_db` MySQL database and open the
# connection that the table reads below pass as `con=conn`.
# NOTE(review): user and password are embedded in the URL in plain text.
engine = db.create_engine(
    "mysql://root:root@192.168.1.12:3310/retail_db"
)
conn = engine.connect()

In [5]:
# Read every row of the `customers` table into a DataFrame over `conn`.
customers_df = pd.read_sql_query(sql=text("SELECT * FROM customers"), con=conn)

In [8]:
# Read every row of the `orders` table into a DataFrame over `conn`.
orders_df = pd.read_sql_query(sql=text("SELECT * FROM orders"), con=conn)

In [9]:
# Read every row of the `order_items` table into a DataFrame over `conn`.
order_items_df = pd.read_sql_query(sql=text("SELECT * FROM order_items"), con=conn)

In [10]:
# Read every row of the `products` table into a DataFrame over `conn`.
products_df = pd.read_sql_query(sql=text("SELECT * FROM products"), con=conn)

In [11]:
# Read every row of the `categories` table into a DataFrame over `conn`.
categories_df = pd.read_sql_query(sql=text("SELECT * FROM categories"), con=conn)

In [12]:
# Read every row of the `departments` table into a DataFrame over `conn`.
departments_df = pd.read_sql_query(sql=text("SELECT * FROM departments"), con=conn)

In [6]:
# Preview the first five customer rows to sanity-check the load.
customers_df.head(n=5)

Unnamed: 0,customer_id,customer_fname,customer_lname,customer_email,customer_password,customer_street,customer_city,customer_state,customer_zipcode
0,1,Richard,Hernandez,XXXXXXXXX,XXXXXXXXX,6303 Heather Plaza,Brownsville,TX,78521
1,2,Mary,Barrett,XXXXXXXXX,XXXXXXXXX,9526 Noble Embers Ridge,Littleton,CO,80126
2,3,Ann,Smith,XXXXXXXXX,XXXXXXXXX,3422 Blue Pioneer Bend,Caguas,PR,725
3,4,Mary,Jones,XXXXXXXXX,XXXXXXXXX,8324 Little Common,San Marcos,CA,92069
4,5,Robert,Hudson,XXXXXXXXX,XXXXXXXXX,10 Crystal River Mall,Caguas,PR,725


In [14]:
# Preview the first five category rows to sanity-check the load.
categories_df.head(n=5)

Unnamed: 0,category_id,category_department_id,category_name
0,1,2,Football
1,2,2,Soccer
2,3,2,Baseball & Softball
3,4,2,Basketball
4,5,2,Lacrosse


#### Cargamos los datos a Azure Data Lake

In [15]:
from azure.storage.blob import ContainerClient
import io

In [19]:
# Connection string for the `adlsdatapath` storage account, authenticated with
# a shared access signature (SAS). The literal is split across lines only for
# readability; the concatenated value is unchanged.
# NOTE(review): the SAS token is committed in plain text and its `se` field is
# 2023-06-07T09:05:36Z - presumably expired by now; confirm and rotate.
conn_str = (
    "BlobEndpoint=https://adlsdatapath.blob.core.windows.net/;"
    "QueueEndpoint=https://adlsdatapath.queue.core.windows.net/;"
    "FileEndpoint=https://adlsdatapath.file.core.windows.net/;"
    "TableEndpoint=https://adlsdatapath.table.core.windows.net/;"
    "SharedAccessSignature=sv=2022-11-02&ss=bfqt&srt=co&sp=rwdlacupyx"
    "&se=2023-06-07T09:05:36Z&st=2023-06-07T01:05:36Z&spr=https,http"
    "&sig=RjAPfpvS6QLpsp5gGKqFyGqi4hW7BJ5C5rPC1n7RwVE%3D"
)
container = "source"

# Client bound to the `source` container; the upload cells below reuse it.
container_client = ContainerClient.from_connection_string(
    conn_str=conn_str,
    container_name=container,
)


In [20]:
# Serialize the customers table to CSV text (index column omitted) and upload
# it to the container, replacing any blob already stored at this path.
# `to_csv` without a target returns the CSV as a string, so no StringIO buffer
# is needed.
output = customers_df.to_csv(encoding="utf-8", index=False)
container_client.upload_blob("retail/karla/customers", output, overwrite=True, encoding='utf-8')

<azure.storage.blob._blob_client.BlobClient at 0x7fcdf40b78b0>

In [21]:
# Serialize the orders table to CSV text (index column omitted) and upload it
# to the container, replacing any blob already stored at this path.
# `to_csv` without a target returns the CSV as a string, so no StringIO buffer
# is needed.
output = orders_df.to_csv(encoding="utf-8", index=False)
container_client.upload_blob("retail/karla/orders", output, overwrite=True, encoding='utf-8')

<azure.storage.blob._blob_client.BlobClient at 0x7fcdf5135eb0>

In [22]:
# Serialize the order_items table to CSV text (index column omitted) and
# upload it to the container, replacing any blob already stored at this path.
# `to_csv` without a target returns the CSV as a string, so no StringIO buffer
# is needed.
output = order_items_df.to_csv(encoding="utf-8", index=False)
container_client.upload_blob("retail/karla/order_items", output, overwrite=True, encoding='utf-8')

<azure.storage.blob._blob_client.BlobClient at 0x7fcdf52c0e50>

In [23]:
# Serialize the products table to CSV text (index column omitted) and upload
# it to the container, replacing any blob already stored at this path.
# `to_csv` without a target returns the CSV as a string, so no StringIO buffer
# is needed.
output = products_df.to_csv(encoding="utf-8", index=False)
container_client.upload_blob("retail/karla/products", output, overwrite=True, encoding='utf-8')

<azure.storage.blob._blob_client.BlobClient at 0x7fcdf569bac0>

In [24]:
# Serialize the categories table to CSV text (index column omitted) and upload
# it to the container, replacing any blob already stored at this path.
# `to_csv` without a target returns the CSV as a string, so no StringIO buffer
# is needed.
output = categories_df.to_csv(encoding="utf-8", index=False)
container_client.upload_blob("retail/karla/categories", output, overwrite=True, encoding='utf-8')

<azure.storage.blob._blob_client.BlobClient at 0x7fcdf52cb070>

In [25]:
# Serialize the departments table to CSV text (index column omitted) and
# upload it to the container, replacing any blob already stored at this path.
# `to_csv` without a target returns the CSV as a string, so no StringIO buffer
# is needed.
output = departments_df.to_csv(encoding="utf-8", index=False)
container_client.upload_blob("retail/karla/departments", output, overwrite=True, encoding='utf-8')

<azure.storage.blob._blob_client.BlobClient at 0x7fcdefd04100>

#### Cargamos los datos a Cloud Storage

In [26]:
import os
from google.cloud.storage import Client

# Set the service-account key file path in the process environment before the
# Cloud Storage client is created. The path literal is wrapped for
# readability; its value is unchanged.
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = (
    "/user/app/ProyectoEndToEndPython/Clases/1.Data Ingest/"
    "dep12-386900-aa972d12d985.json"
)


In [27]:
# Create the Cloud Storage client and fetch the target bucket once; the upload
# cells below write their objects through `bucket`.
client = Client()
bucket = client.get_bucket("karladep12")


In [28]:
# Export the customers table as CSV text (no index column) and store it in the
# bucket as object `retail/customers` with content type text/csv.
bucket.blob("retail/customers").upload_from_string(
    customers_df.to_csv(encoding="utf-8", index=False),
    content_type="text/csv",
)

In [29]:
# Export the orders table as CSV text (no index column) and store it in the
# bucket as object `retail/orders` with content type text/csv.
bucket.blob("retail/orders").upload_from_string(
    orders_df.to_csv(encoding="utf-8", index=False),
    content_type="text/csv",
)

In [30]:
# Export the order_items table as CSV text (no index column) and store it in
# the bucket as object `retail/order_items` with content type text/csv.
bucket.blob("retail/order_items").upload_from_string(
    order_items_df.to_csv(encoding="utf-8", index=False),
    content_type="text/csv",
)

In [31]:
# Export the products table as CSV text (no index column) and store it in the
# bucket as object `retail/products` with content type text/csv.
bucket.blob("retail/products").upload_from_string(
    products_df.to_csv(encoding="utf-8", index=False),
    content_type="text/csv",
)

In [32]:
# Export the categories table as CSV text (no index column) and store it in
# the bucket as object `retail/categories` with content type text/csv.
bucket.blob("retail/categories").upload_from_string(
    categories_df.to_csv(encoding="utf-8", index=False),
    content_type="text/csv",
)

In [33]:
# Export the departments table as CSV text (no index column) and store it in
# the bucket as object `retail/departments` with content type text/csv.
bucket.blob("retail/departments").upload_from_string(
    departments_df.to_csv(encoding="utf-8", index=False),
    content_type="text/csv",
)

#### Cargamos los datos a MongoDB

In [40]:
from pymongo import MongoClient
# Clients already opened, keyed by connection string, so repeated
# get_database() calls share one MongoClient instead of opening a new one
# (and a new connection pool) on every call.
_MONGO_CLIENTS = {}


def get_database(db_name="retail_db", connection_string=None):
    """Return a handle to a MongoDB database.

    Args:
        db_name: Name of the database to return; defaults to ``retail_db``.
        connection_string: MongoDB URI to connect with. When omitted, the
            cluster URI hard-coded below is used.

    Returns:
        The ``db_name`` database of the cached ``MongoClient`` for that URI.
    """
    if connection_string is None:
        # NOTE(review): user and password are embedded in this default URI;
        # prefer passing ``connection_string`` from a secret store.
        connection_string = "mongodb+srv://dep12karla:987654321@clusterbase.pnm8idk.mongodb.net/?retryWrites=true&w=majority"

    client = _MONGO_CLIENTS.get(connection_string)
    if client is None:
        client = MongoClient(connection_string)
        _MONGO_CLIENTS[connection_string] = client

    return client[db_name]

In [41]:
# Turn the customers table into one dict per row and bulk-insert the rows into
# the "customers" collection. The former `reset_index(inplace=False)` call was
# dropped: its result was discarded, so it never changed `df`.
df = customers_df.copy()
df_to_dict = df.to_dict("records")
dbname = get_database()
dbname["customers"].insert_many(df_to_dict)

<pymongo.results.InsertManyResult at 0x7fcd9d8a89d0>

In [42]:
# Turn the orders table into one dict per row and bulk-insert the rows into
# the "orders" collection. The former `reset_index(inplace=False)` call was
# dropped: its result was discarded, so it never changed `df`.
df = orders_df.copy()
df_to_dict = df.to_dict("records")
dbname = get_database()
dbname["orders"].insert_many(df_to_dict)

<pymongo.results.InsertManyResult at 0x7fcd9d8a84c0>

In [43]:
# Turn the order_items table into one dict per row and bulk-insert the rows
# into the "order_items" collection. The former `reset_index(inplace=False)`
# call was dropped: its result was discarded, so it never changed `df`.
df = order_items_df.copy()
df_to_dict = df.to_dict("records")
dbname = get_database()
dbname["order_items"].insert_many(df_to_dict)

<pymongo.results.InsertManyResult at 0x7fcddde6ceb0>

In [44]:
# Turn the products table into one dict per row and bulk-insert the rows into
# the "products" collection. The former `reset_index(inplace=False)` call was
# dropped: its result was discarded, so it never changed `df`.
df = products_df.copy()
df_to_dict = df.to_dict("records")
dbname = get_database()
dbname["products"].insert_many(df_to_dict)

<pymongo.results.InsertManyResult at 0x7fcdef5124f0>

In [45]:
# Turn the categories table into one dict per row and bulk-insert the rows
# into the "categories" collection. The former `reset_index(inplace=False)`
# call was dropped: its result was discarded, so it never changed `df`.
df = categories_df.copy()
df_to_dict = df.to_dict("records")
dbname = get_database()
dbname["categories"].insert_many(df_to_dict)

<pymongo.results.InsertManyResult at 0x7fcd9d8a5fd0>

In [46]:
# Turn the departments table into one dict per row and bulk-insert the rows
# into the "departments" collection. The former `reset_index(inplace=False)`
# call was dropped: its result was discarded, so it never changed `df`.
df = departments_df.copy()
df_to_dict = df.to_dict("records")
dbname = get_database()
dbname["departments"].insert_many(df_to_dict)

<pymongo.results.InsertManyResult at 0x7fcddecae4c0>