# ETL de datos de importación de productos

## Importación de librerías base

In [None]:
import pandas as pd
import sqlalchemy
# from sqlalchemy import create_engine 
from sqlalchemy import text, create_engine
import psycopg2
import uuid

## Extraction

In [None]:
# Connect to the local PostgreSQL instance and load the whole `trades` table.
# NOTE(review): the connection URL embeds the database password in plain
# text; consider supplying it through an environment variable instead.
engine = create_engine('postgresql+psycopg2://postgres:mysecretpass@localhost:5432/postgres')
with engine.begin() as conn:
    query = text("""SELECT * FROM trades""")
    # Run the query on the connection opened by the context manager rather
    # than on the engine, so only one connection is checked out and it is
    # released when the block exits.
    df_trades = pd.read_sql_query(query, conn)

#engine = create_engine('postgresql+psycopg2://postgres:mysecretpass@localhost:5432/postgres')
#df_trades = pd.read_sql("select * from trades", engine)
#conn = engine.connect()
#query = text('select *  from trades')
#pd.read_sql(text, conn)

In [None]:
# Load the code catalogue (CSV) and the country reference data (JSON).
df_codes = pd.read_csv('./hs_codes_862da485-f4db-473c-95ce-47b85fe08791.csv')
df_countries = pd.read_json('./country_data.json')
# Keep an independent copy of the Level == 2 rows of the code catalogue.
df_parents = df_codes.loc[df_codes['Level'] == 2].copy()


## Transform

### Clean codes

In [None]:
# Discard catalogue rows whose Code_comm is missing.
df_codes = df_codes.dropna(subset=['Code_comm'])

In [None]:
def clean_code(text, parents=None):
    """Split a raw code into its cleaned code and its parent description.

    Args:
        text: Raw code value; it is converted with ``str()`` before slicing.
        parents: Optional DataFrame with ``Code_comm`` and ``Description``
            columns used for the parent lookup. When omitted, the
            module-level ``df_parents`` is used.

    Returns:
        A ``(code, parent)`` tuple. ``code`` is the first 5 characters of an
        11-character input and the first 6 characters otherwise. ``parent``
        is the ``Description`` of the first row of ``parents`` whose
        ``Code_comm`` equals the 1-character prefix (11-character input) or
        the 2-character prefix (any other length), or ``None`` when no row
        matches.
    """
    if parents is None:
        parents = df_parents

    text = str(text)
    if len(text) == 11:
        code, parent_code = text[:5], text[:1]
    else:
        code, parent_code = text[:6], text[:2]

    # Test explicitly for "no matching parent" instead of wrapping the lookup
    # in a bare `except`, which would also hide real problems such as a
    # missing column.
    matches = parents.loc[parents['Code_comm'] == parent_code, 'Description']
    parent = matches.iloc[0] if not matches.empty else None
    return (code, parent)


In [None]:
# Run clean_code on the Code value of every row; result_type='expand'
# spreads the returned (code, parent) tuple over the two target columns.
df_codes[['clean_code', 'parent_description']] = df_codes.apply(
    lambda row: clean_code(row['Code']),
    axis=1,
    result_type='expand',
)

In [None]:
# Keep rows that have a clean_code and only the three columns used downstream.
df_codes = df_codes.loc[
    df_codes['clean_code'].notnull(),
    ['clean_code', 'Description', 'parent_description'],
]

In [None]:
# Surrogate key derived from the row index label (index + 1); it is only
# consecutive where the index itself is.
df_codes = df_codes.assign(id_code=df_codes.index + 1)

In [None]:
# Store clean_code as a 64-bit integer.
df_codes = df_codes.astype({'clean_code': 'int64'})

### Clean Countries

In [None]:
# Keep only the country attributes used by the pipeline.
df_countries = df_countries.loc[:, ['alpha-3', 'country', 'region', 'sub-region']]

In [None]:
# Discard countries whose alpha-3 code is missing.
df_countries = df_countries.dropna(subset=['alpha-3'])

In [None]:
# Surrogate key derived from the row index label (index + 1).
df_countries = df_countries.assign(id_country=df_countries.index + 1)

### Merge

In [None]:
# Left-join the trades with the code table to attach id_code
# (trades.comm_code matched against codes.clean_code).
df_trades_clean = pd.merge(
    df_trades,
    df_codes[['clean_code', 'id_code']],
    how='left',
    left_on='comm_code',
    right_on='clean_code',
)

In [None]:
# Left-join with the country table to attach id_country
# (trades.country_code matched against countries.alpha-3).
df_trades_clean = pd.merge(
    df_trades_clean,
    df_countries[['alpha-3', 'id_country']],
    how='left',
    left_on='country_code',
    right_on='alpha-3',
)

### Clean trades

In [None]:
def create_dimension(data, id_name):
    """Build a dimension table pairing each value with a 1-based key.

    Args:
        data: Iterable of values; it is iterated once to number the items
            and is also stored as the ``values`` column.
        id_name: Name of the key column.

    Returns:
        A DataFrame with two columns: ``id_name`` holding 1, 2, 3, ... in
        iteration order, and ``values`` holding ``data``.
    """
    surrogate_keys = [position for position, _ in enumerate(data, start=1)]
    return pd.DataFrame({id_name: surrogate_keys, 'values': data})


In [None]:
# One dimension table per categorical trade attribute, built from the
# distinct values found in the merged trades.
df_quantity = create_dimension(pd.unique(df_trades_clean['quantity_name']), 'id_quantity')
df_flow = create_dimension(pd.unique(df_trades_clean['flow']), 'id_flow')
df_year = create_dimension(pd.unique(df_trades_clean['year']), 'id_year')

In [None]:
# Attach the quantity, flow and year keys by left-joining each dimension
# table on its 'values' column, in that order.
df_trades_clean = (
    df_trades_clean
    .merge(df_quantity, how='left', left_on='quantity_name', right_on='values')
    .merge(df_flow, how='left', left_on='flow', right_on='values')
    .merge(df_year, how='left', left_on='year', right_on='values')
)

In [None]:
# Surrogate key for each trade row, derived from the row index label (index + 1).
df_trades_clean = df_trades_clean.assign(id_trades=df_trades_clean.index + 1)

In [98]:
# Fact table: the trade key, the three measures and the five dimension keys,
# taken as an independent copy of the merged frame.
df_trades_final = df_trades_clean.loc[
    :,
    [
        'id_trades',
        'trade_usd',
        'kg',
        'quantity',
        'id_code',
        'id_country',
        'id_quantity',
        'id_flow',
        'id_year',
    ],
].copy()

In [None]:
# Put the surrogate key first in the country table.
df_countries = df_countries.loc[
    :, ['id_country', 'alpha-3', 'country', 'region', 'sub-region']
]

In [None]:
# Put the surrogate key first in the code table.
df_codes = df_codes.loc[
    :, ['id_code', 'clean_code', 'Description', 'parent_description']
]

## Load

In [99]:
import os

# Make sure the output directory exists before writing into it.
os.makedirs('target', exist_ok=True)

# Write every table as a pipe-delimited CSV without the index column,
# one file per table: target/<table name>.csv
for table_name, table in {
    'trades': df_trades_final,
    'countries': df_countries,
    'codes': df_codes,
    'quantity': df_quantity,
    'flow': df_flow,
    'year': df_year,
}.items():
    table.to_csv('target/{}.csv'.format(table_name), index=False, sep='|')


In [None]:
# import os
# import boto3
# import redshift_connector 


# client = boto3.client(
#     's3',
#     aws_access_key_id = os.environ.get('aws_access_key_id'),
#     aws_secret_access_key = os.environ.get('aws_secret_access_key'),
# )

# conn = redshift_connector.connect(
#     host = os.environ.get('redshift_host'),
#     database=  os.environ.get('redshift_database'),
#     port= 5439,
#     user= os.environ.get('redshift_user'),
#     password=  os.environ.get('redshift_password')
# )

# cursor = conn.cursor()




In [None]:
# def load_file(file_name) :
#     table_name = file_name.split('.')[0]
#     client.upload_file(
#         Filename='target/{}'.format(file_name),
#         Bucket='platzi-etl',
#         Key='course_etl_target/{}'.format(file_name),
#     )
#     sentence = '''copy public.{} from 's3://curso-pentaho-platzi/{}' credentials 'aws_access_key_id={};aws_secret_access_key={}' csv delimiter '|' region 'us-west-2' ignoreheader 1'''.format(table_name, file_name, os.environ.get('aws_access_key_id'), os.environ.get('aws_secret_access_key'))
#     try:
#         cursor.execute
#         print('tabla cargada correctamente'+table_name)

#     except:
#         print('error en la tabla'+table_name)

In [None]:
import os
import boto3

# Read the AWS credentials from environment variables (same variable names
# as the commented-out client setup earlier in this notebook).
# os.environ.get returns None when a variable is not set.
aws_access_key_id = os.environ.get('aws_access_key_id')
aws_secret_access_key = os.environ.get('aws_secret_access_key')

# Create a boto3 session using those credentials.
session = boto3.Session(
    aws_access_key_id=aws_access_key_id,
    aws_secret_access_key=aws_secret_access_key,
)

# Create an S3 resource from the session created above.
s3 = session.resource('s3')

# Handle to the target bucket, then upload the trades file to it under the
# key 'trades.csv'.
bucket = s3.Bucket('curso-pentaho-platzi')
s3.meta.client.upload_file('./target/trades.csv', 'curso-pentaho-platzi', 'trades.csv')


In [97]:
# Print the key of every object in the bucket, one per line.
bucket = s3.Bucket('curso-pentaho-platzi')
for key in (obj.key for obj in bucket.objects.all()):
    print(key)

target/codes.csv
target/countries.csv
target/flow.csv
target/quantity.csv
target/trades.csv
target/year.csv
