# Ejemplo Entregable 1

Pasos del ejemplo:
* Bajar datos de una API en formato JSON
* Cargar datos en la tabla de Redshift

Esto lo vamos a llevar a cabo usando `requests`, `Spark` y un driver de conexión de `Postgres`

![Imagen](./entregable_arquitectura.png)

## 1) Bajar datos de una API en formato JSON

Para este ejemplo vamos a usar la API de [Datos Argentina](https://www.datos.gob.ar/)

Y nos vamos a traer los datos de: Exportaciones de cereales. En millones de dólares FOB

Para probar la API ir a: [API de Series de Tiempo AR: Generador de URLs](https://datosgobar.github.io/series-tiempo-ar-call-generator/)

In [1]:
import requests
import urllib.parse

def get_api_call(ids, **kwargs):
    API_BASE_URL = "https://apis.datos.gob.ar/series/api/"
    kwargs["ids"] = ",".join(ids)
    return "{}{}?{}".format(API_BASE_URL, "series", urllib.parse.urlencode(kwargs))

In [2]:
# Ejemplo: https://apis.datos.gob.ar/series/api/series?ids=75.3_IEC_0_M_26&start_date=2020-01-01
api_call = get_api_call(["75.3_IEC_0_M_26"], start_date="2020-01-01")
print(api_call)

https://apis.datos.gob.ar/series/api/series?start_date=2020-01-01&ids=75.3_IEC_0_M_26


In [3]:
result = requests.get(api_call).json()
print(result)

{'data': [['2020-01-01', 1158.0], ['2020-02-01', 762.0], ['2020-03-01', 947.0], ['2020-04-01', 943.0], ['2020-05-01', 788.0], ['2020-06-01', 845.0], ['2020-07-01', 806.0], ['2020-08-01', 862.0], ['2020-09-01', 605.0], ['2020-10-01', 529.0], ['2020-11-01', 358.0], ['2020-12-01', 404.0], ['2021-01-01', 773.0], ['2021-02-01', 682.0], ['2021-03-01', 1041.0], ['2021-04-01', 1211.0], ['2021-05-01', 1074.0], ['2021-06-01', 1303.0], ['2021-07-01', 1447.0], ['2021-08-01', 1411.0], ['2021-09-01', 1312.0], ['2021-10-01', 1194.0], ['2021-11-01', 880.0], ['2021-12-01', 1329.0], ['2022-01-01', 1528.0], ['2022-02-01', 1415.0], ['2022-03-01', 1642.0], ['2022-04-01', 1837.0], ['2022-05-01', 1584.0], ['2022-06-01', 1403.0], ['2022-07-01', 1712.0], ['2022-08-01', 1330.0], ['2022-09-01', 957.0], ['2022-10-01', 727.0], ['2022-11-01', 650.0], ['2022-12-01', 887.0], ['2023-01-01', 740.0], ['2023-02-01', 712.0], ['2023-03-01', 952.0], ['2023-04-01', 678.0]], 'count': 40, 'meta': [{'frequency': 'month', 'start

## 2) Cargar datos en la tabla de Redshift

La tabla debe estar creada en el schema que esté usando. El create table es el siguiente:

```SQL
create table if not exists lucas_trubiano_coderhouse.exportaciones_cereales (
    date_from VARCHAR(10) distkey,
    millones_dolares decimal(10,2),
    frequency varchar(12)
) sortkey(date_from);
```

In [4]:
!pip install psycopg2-binary



In [5]:
# Crear sesion de Spark
import os
import psycopg2

from pyspark.sql import SparkSession
from pyspark.sql.functions import when, lit, col

# Postgres and Redshift JDBCs
driver_path = "/home/coder/working_dir/driver_jdbc/postgresql-42.2.27.jre7.jar"

os.environ['PYSPARK_SUBMIT_ARGS'] = f'--driver-class-path {driver_path} --jars {driver_path} pyspark-shell'
os.environ['SPARK_CLASSPATH'] = driver_path

# Create SparkSession 
spark = SparkSession.builder \
        .master("local") \
        .appName("Conexion entre Pyspark y Redshift") \
        .config("spark.jars", driver_path) \
        .config("spark.executor.extraClassPath", driver_path) \
        .getOrCreate()

In [6]:
env = os.environ

Revisar documentación:
* [AWS Redshift + Spark documentation](https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-spark-redshift.html)
* [Spark + Redshift connector](https://github.com/spark-redshift-community/spark-redshift#readme)

In [7]:
# Connect to Redshift using psycopg2
conn = psycopg2.connect(
    host=env['AWS_REDSHIFT_HOST'],
    port=env['AWS_REDSHIFT_PORT'],
    dbname=env['AWS_REDSHIFT_DBNAME'],
    user=env['AWS_REDSHIFT_USER'],
    password=env['AWS_REDSHIFT_PASSWORD']
)

In [8]:
cursor = conn.cursor()
cursor.execute(f"""
create table if not exists {env['AWS_REDSHIFT_SCHEMA']}.exportaciones_cereales (
    date_from VARCHAR(10) distkey,
    millones_dolares decimal(10,2),
    frequency varchar(12)
) sortkey(date_from);
""")
conn.commit()
cursor.close()
print("Table created!")

Table created!


In [9]:
cursor = conn.cursor()
cursor.execute(f"""
SELECT
  distinct tablename
FROM
  PG_TABLE_DEF
WHERE
  schemaname = '{env['AWS_REDSHIFT_SCHEMA']}';
""")
# resultado = cursor.fetchall()
print(", ".join(map(lambda x: x[0], cursor.fetchall())))
cursor.close()

agents, calls, customers, exportaciones_cereales, ventas


In [10]:
# Create the DataFrame with the specified column names
df = spark.createDataFrame(result['data'], ["date_from", "millones_dolares"])

In [11]:
df.printSchema()
df.show()

root
 |-- date_from: string (nullable = true)
 |-- millones_dolares: double (nullable = true)

+----------+----------------+
| date_from|millones_dolares|
+----------+----------------+
|2020-01-01|          1158.0|
|2020-02-01|           762.0|
|2020-03-01|           947.0|
|2020-04-01|           943.0|
|2020-05-01|           788.0|
|2020-06-01|           845.0|
|2020-07-01|           806.0|
|2020-08-01|           862.0|
|2020-09-01|           605.0|
|2020-10-01|           529.0|
|2020-11-01|           358.0|
|2020-12-01|           404.0|
|2021-01-01|           773.0|
|2021-02-01|           682.0|
|2021-03-01|          1041.0|
|2021-04-01|          1211.0|
|2021-05-01|          1074.0|
|2021-06-01|          1303.0|
|2021-07-01|          1447.0|
|2021-08-01|          1411.0|
+----------+----------------+
only showing top 20 rows



In [12]:
df_to_write = df.withColumn('frequency', lit('Mensual'))
df_to_write.printSchema()
df_to_write.show()

root
 |-- date_from: string (nullable = true)
 |-- millones_dolares: double (nullable = true)
 |-- frequency: string (nullable = false)

+----------+----------------+---------+
| date_from|millones_dolares|frequency|
+----------+----------------+---------+
|2020-01-01|          1158.0|  Mensual|
|2020-02-01|           762.0|  Mensual|
|2020-03-01|           947.0|  Mensual|
|2020-04-01|           943.0|  Mensual|
|2020-05-01|           788.0|  Mensual|
|2020-06-01|           845.0|  Mensual|
|2020-07-01|           806.0|  Mensual|
|2020-08-01|           862.0|  Mensual|
|2020-09-01|           605.0|  Mensual|
|2020-10-01|           529.0|  Mensual|
|2020-11-01|           358.0|  Mensual|
|2020-12-01|           404.0|  Mensual|
|2021-01-01|           773.0|  Mensual|
|2021-02-01|           682.0|  Mensual|
|2021-03-01|          1041.0|  Mensual|
|2021-04-01|          1211.0|  Mensual|
|2021-05-01|          1074.0|  Mensual|
|2021-06-01|          1303.0|  Mensual|
|2021-07-01|          1

In [13]:
df_to_write.write \
    .format("jdbc") \
    .option("url", f"jdbc:postgresql://{env['AWS_REDSHIFT_HOST']}:{env['AWS_REDSHIFT_PORT']}/{env['AWS_REDSHIFT_DBNAME']}") \
    .option("dbtable", f"{env['AWS_REDSHIFT_SCHEMA']}.exportaciones_cereales") \
    .option("user", env['AWS_REDSHIFT_USER']) \
    .option("password", env['AWS_REDSHIFT_PASSWORD']) \
    .option("driver", "org.postgresql.Driver") \
    .mode("overwrite") \
    .save()

In [14]:
# Query Redshift using Spark SQL
query = f"select * from {env['AWS_REDSHIFT_SCHEMA']}.exportaciones_cereales"
data = spark.read \
    .format("jdbc") \
    .option("url", f"jdbc:postgresql://{env['AWS_REDSHIFT_HOST']}:{env['AWS_REDSHIFT_PORT']}/{env['AWS_REDSHIFT_DBNAME']}") \
    .option("dbtable", f"({query}) as tmp_table") \
    .option("user", env['AWS_REDSHIFT_USER']) \
    .option("password", env['AWS_REDSHIFT_PASSWORD']) \
    .option("driver", "org.postgresql.Driver") \
    .load()

In [15]:
data.printSchema()
data.show()

root
 |-- date_from: string (nullable = true)
 |-- millones_dolares: double (nullable = true)
 |-- frequency: string (nullable = true)

+----------+----------------+---------+
| date_from|millones_dolares|frequency|
+----------+----------------+---------+
|2020-01-01|          1158.0|  Mensual|
|2020-02-01|           762.0|  Mensual|
|2020-03-01|           947.0|  Mensual|
|2020-04-01|           943.0|  Mensual|
|2020-05-01|           788.0|  Mensual|
|2020-06-01|           845.0|  Mensual|
|2020-07-01|           806.0|  Mensual|
|2020-08-01|           862.0|  Mensual|
|2020-09-01|           605.0|  Mensual|
|2020-10-01|           529.0|  Mensual|
|2020-11-01|           358.0|  Mensual|
|2020-12-01|           404.0|  Mensual|
|2021-01-01|           773.0|  Mensual|
|2021-02-01|           682.0|  Mensual|
|2021-03-01|          1041.0|  Mensual|
|2021-04-01|          1211.0|  Mensual|
|2021-05-01|          1074.0|  Mensual|
|2021-06-01|          1303.0|  Mensual|
|2021-07-01|          14

In [16]:
conn.close()