# ETL de datos de importación de productos

## Importación de librerías base

In [1]:
import os
import uuid

import pandas as pd
from sqlalchemy import create_engine

## Extraction

In [2]:
# Connection URL of the source PostgreSQL database. Set TRADES_DB_URL to keep
# credentials out of the notebook; the literal is kept only as the default so
# existing local setups keep working.
# NOTE(review): drop the literal fallback once TRADES_DB_URL is configured.
engine = create_engine(
    os.environ.get(
        'TRADES_DB_URL',
        'postgresql+psycopg2://postgres:mysecretpass@localhost/postgres',
    )
)
# Read the whole trades table into memory as the raw fact data.
df_trades = pd.read_sql("select * from trades", engine)

In [3]:
# Load the country reference data from the local JSON file.
df_countries = pd.read_json('src/country_data.json')

In [4]:
# Load the product code table from the local hs_codes.csv file.
df_codes = pd.read_csv('src/hs_codes.csv')

In [5]:
# Keep the Level 2 rows as an independent copy; clean_code looks up their
# Description (by Code_comm) to label each code with its parent.
df_parents = df_codes[df_codes['Level']==2].copy()

In [6]:
# Preview the first rows of the parent lookup table.
df_parents.head()

Unnamed: 0,Order,Level,Code,Parent,Code_comm,Parent.1,Description_complex,Description
2,1654557,2,10021000090,10011000000.0,1,I,CHAPTER 1 - LIVE ANIMALS,LIVE ANIMALS
52,1654607,2,20021000090,10011000000.0,2,I,CHAPTER 2 - MEAT AND EDIBLE MEAT OFFAL,MEAT AND EDIBLE MEAT OFFAL
140,1654695,2,30021000090,10011000000.0,3,I,"CHAPTER 3 - FISH AND CRUSTACEANS, MOLLUSCS AND...","FISH AND CRUSTACEANS, MOLLUSCS AND OTHER AQUAT..."
416,1654971,2,40021000090,10011000000.0,4,I,CHAPTER 4 - DAIRY PRODUCE; BIRDS' EGGS; NATURA...,DAIRY PRODUCE; BIRDS' EGGS; NATURAL HONEY; EDI...
463,1655018,2,50021000090,10011000000.0,5,I,"CHAPTER 5 - PRODUCTS OF ANIMAL ORIGIN, NOT ELS...","PRODUCTS OF ANIMAL ORIGIN, NOT ELSEWHERE SPECI..."


## Transform

### Clean codes

In [7]:
# Drop rows that have no Code_comm value. The explicit copy makes df_codes an
# independent frame, so the columns added to it later are not written to a
# slice of the raw CSV data (avoids pandas' SettingWithCopyWarning).
df_codes = df_codes[df_codes['Code_comm'].notnull()].copy()

In [8]:
def clean_code(text):
  """Split a raw product code into its short code and parent description.

  The value is converted to a string first. An 11-character value yields its
  first 5 characters as the code and its first character as the parent key;
  any other length yields the first 6 characters as the code and the first 2
  as the parent key.

  The parent key is matched against the ``Code_comm`` column of the
  module-level ``df_parents`` frame, and the ``Description`` of the first
  matching row is used.

  Args:
    text: Raw code value (any type accepted by ``str``).

  Returns:
    A ``(code, parent)`` tuple; ``parent`` is ``None`` when no row of
    ``df_parents`` matches the parent key.

  Raises:
    KeyError: If ``df_parents`` lacks the ``Code_comm`` or ``Description``
      column. This used to be silently reported as "no parent".
  """
  text = str(text)
  if len(text) == 11:
    code, parent_code = text[:5], text[:1]
  else:
    code, parent_code = text[:6], text[:2]
  # Only "no matching row" means "no parent"; any other failure should
  # surface instead of being hidden by a catch-all handler.
  matches = df_parents.loc[df_parents['Code_comm'] == parent_code, 'Description']
  parent = matches.iloc[0] if len(matches) else None
  return (code, parent)

In [9]:
# Run clean_code on the Code of every row; result_type='expand' spreads the
# returned (code, parent) tuple over the two new columns.
df_codes[['clean_code','parent_description']] = df_codes.apply(lambda x :clean_code(x['Code']), axis=1, result_type='expand')

In [10]:
# Keep the rows that have a clean_code and only the columns used downstream.
# A single .loc selection followed by .copy() yields an independent frame, so
# the id column added later is not written to a chained-indexing slice.
df_codes = df_codes.loc[
    df_codes['clean_code'].notnull(),
    ['clean_code', 'Description', 'parent_description'],
].copy()
df_codes.head()

Unnamed: 0,clean_code,Description,parent_description
1,10011,LIVE ANIMALS; ANIMAL PRODUCTS,LIVE ANIMALS
2,10021,LIVE ANIMALS,LIVE ANIMALS
3,10100,"Live horses, asses, mules and hinnies",LIVE ANIMALS
5,10121,Pure-bred breeding horses,LIVE ANIMALS
6,10129,Live horses (excl. pure-bred for breeding),LIVE ANIMALS


In [11]:
# Surrogate key derived from the row label (label + 1). The labels keep the
# gaps left by the earlier row filtering, so the ids may skip numbers.
df_codes['id_code'] = df_codes.index + 1

In [12]:
# Store the short code as a 64-bit integer.
# NOTE(review): presumably needed so it matches the dtype of trades.comm_code
# in the merge below — confirm.
df_codes['clean_code'] = df_codes['clean_code'].astype('int64')

### Clean Countries

In [13]:
# Narrow the country data to the attributes that are exported later. The
# result is assigned back to df_countries (the previous target name was a typo,
# so the selection was never used).
df_countries = df_countries[['alpha-3','country','region', 'sub-region']]

In [14]:
# Drop countries without an alpha-3 code. The explicit copy makes the result an
# independent frame, so adding id_country afterwards does not write to a slice
# (avoids pandas' SettingWithCopyWarning).
df_countries = df_countries[df_countries['alpha-3'].notnull()].copy()

In [15]:
# Surrogate key derived from the row label (label + 1).
df_countries['id_country'] = df_countries.index + 1

### Merge

In [16]:
# Attach id_code to every trade by matching comm_code against clean_code.
# The left join keeps trades whose code has no match.
# NOTE(review): if clean_code is not unique in df_codes, trade rows will be
# duplicated by this join — verify.
df_trades_clean = df_trades.merge(df_codes[['clean_code', 'id_code']], how='left', left_on='comm_code', right_on='clean_code')

In [17]:
# Attach id_country to every trade by matching country_code against alpha-3.
# The left join keeps trades whose country has no match.
df_trades_clean = df_trades_clean.merge(df_countries[['alpha-3', 'id_country']], how='left', left_on='country_code', right_on ='alpha-3')

### Clean trades

In [18]:
def create_dimension(data, id_name):
    """Build a lookup table that numbers each entry of ``data`` from 1.

    Args:
        data: Iterable of the values to enumerate.
        id_name: Name of the column that receives the 1-based ids.

    Returns:
        A DataFrame with the id column named ``id_name`` and a ``values``
        column holding ``data``.
    """
    surrogate_keys = [position for position, _ in enumerate(data, start=1)]
    return pd.DataFrame({id_name: surrogate_keys, 'values': data})

In [19]:
# Build one lookup table per categorical trade attribute from its distinct
# values: (source column, name of the generated id column).
df_quantity, df_flow, df_year = (
    create_dimension(df_trades_clean[source_column].unique(), id_column)
    for source_column, id_column in (
        ('quantity_name', 'id_quantity'),
        ('flow', 'id_flow'),
        ('year', 'id_year'),
    )
)


In [20]:
# Attach the id of each dimension to the trades. The dimension's generic
# 'values' column is renamed to the trade column it was built from, so the join
# key is shared and no extra 'values' / 'values_x' / 'values_y' columns are left
# behind (those suffixes would start to collide if more dimensions were added).
for dimension, source_column in (
    (df_quantity, 'quantity_name'),
    (df_flow, 'flow'),
    (df_year, 'year'),
):
    df_trades_clean = df_trades_clean.merge(
        dimension.rename(columns={'values': source_column}),
        how='left',
        on=source_column,
    )


In [21]:
# Surrogate key for the trade rows, derived from the row label (label + 1).
df_trades_clean['id_trades'] = df_trades_clean.index + 1

In [22]:
# Final trades table: its own id, the measures and the ids that reference the
# other tables; copied so it is independent from df_trades_clean.
df_trades_final = df_trades_clean[['id_trades','trade_usd','kg','quantity','id_code','id_country','id_quantity', 'id_flow', 'id_year']].copy()

In [23]:
# Fix the column set and order of the countries table for export.
df_countries = df_countries[['id_country','alpha-3','country','region','sub-region']]

In [24]:
# Fix the column set and order of the codes table for export.
df_codes =  df_codes[['id_code','clean_code','Description','parent_description']]

## Load

In [25]:
# Write every table as a pipe-delimited CSV without the index column.
# The file names must match the ones passed to load_file, which also derives
# the target table name from them.
df_trades_final.to_csv('target/trades.csv', index=False, sep='|')
df_countries.to_csv('target/countries.csv', index=False, sep='|')
df_codes.to_csv('target/codes.csv', index=False, sep='|')
df_quantity.to_csv('target/quantity.csv', index=False, sep='|')
df_flow.to_csv('target/flow.csv', index=False, sep='|')
# The year dimension is df_year (there is no df_years) and is loaded later as
# 'years.csv', so it is written under that name.
df_year.to_csv('target/years.csv', index=False, sep='|')

In [55]:
import os
import boto3
import redshift_connector

# S3 client used to stage the exported files; credentials come from the
# environment.
client = boto3.client(
  's3',
  aws_access_key_id = os.environ.get('AWS_ACCESS_KEY_ID'),
  aws_secret_access_key = os.environ.get('AWS_SECRET_ACCESS_KEY')
)

# Redshift connection settings are read from the environment. The host key
# 'redshift-host' (hyphen) is kept as the first choice for existing setups;
# 'redshift_host' is accepted as well because it follows the naming of the
# other keys and hyphenated names cannot be exported from most shells.
conn = redshift_connector.connect(
    host= os.environ.get('redshift-host') or os.environ.get('redshift_host'),
    database= os.environ.get('redshift_database'),
    port= 5439,
    user= os.environ.get('redshift_user'),
    password= os.environ.get('redshift_password'),
)

# Single cursor shared by every load_file call.
cursor = conn.cursor()


In [56]:
def load_file(file_name):
    """Upload one exported file to S3 and COPY it into its Redshift table.

    The file is read from ``target/<file_name>``, uploaded under the
    ``course_etl_target/`` prefix of the bucket, and then loaded with a COPY
    statement into ``public.<table>``, where ``<table>`` is the part of
    ``file_name`` before the first dot.

    Each table is loaded in its own transaction: the COPY is committed when it
    succeeds and rolled back when it fails. Without the rollback a failed COPY
    would leave the connection's transaction aborted and every following load
    on the same connection would fail as well.

    Args:
        file_name: Name of the file inside ``target/`` (e.g. ``'codes.csv'``).

    Returns:
        None. The outcome is reported with ``print``; database errors are
        caught and printed, not raised. Errors from the S3 upload propagate.
    """
    table_name = file_name.split('.')[0]
    client.upload_file(
        Filename = 'target/{}'.format(file_name),
        Bucket='demo-platzi-curso-etl-sochoag',
        Key='course_etl_target/{}'.format(file_name),
    )

    # NOTE(review): the AWS keys are embedded in the SQL text; consider an
    # IAM role for COPY so the secret never appears in a statement.
    sentence = '''copy public.{} from 's3://demo-platzi-curso-etl-sochoag/course_etl_target/{}' credentials 'aws_access_key_id={};aws_secret_access_key={}' csv delimiter '|' region 'us-east-2' ignoreheader 1'''.format(
        table_name, file_name, os.environ.get('AWS_ACCESS_KEY_ID'), os.environ.get('AWS_SECRET_ACCESS_KEY'))
    try:
        cursor.execute(sentence)
        conn.commit()
        print('Ok en la tabla '+ table_name)
    except Exception as e:
        # Reset the aborted transaction so the next table can still be loaded.
        conn.rollback()
        print(e)
        print('Error en la tabla '+ table_name)

In [59]:
# Load each exported file into its Redshift table, one at a time.
for staged_file in (
    'codes.csv',
    'countries.csv',
    'flow.csv',
    'quantity.csv',
    'trades.csv',
    'years.csv',
):
    load_file(staged_file)

Ok en la tabla codes
Ok en la tabla countries
Ok en la tabla flow
Ok en la tabla quantity
Ok en la tabla trades
Ok en la tabla years


In [None]:
# Commit any pending work, then always release the cursor and the connection,
# even if the commit itself fails.
try:
    conn.commit()
finally:
    cursor.close()
    conn.close()