# Deliverable 1 Example

Steps in this example:
* Download data from an API in JSON format
* Load the data into a Redshift table

We will do this using `requests`, `Spark`, and a `Postgres` connection driver.

![Image](./entregable_arquitectura.png)

## 1) Download data from an API in JSON format

For this example we will use the [Datos Argentina](https://www.datos.gob.ar/) API.

We will fetch the series "Patentamiento automotor. En miles de unidades." (motor vehicle registrations, in thousands of units).

To try out the API, go to: [API de Series de Tiempo AR: Generador de URLs](https://datosgobar.github.io/series-tiempo-ar-call-generator/)

In [1]:
import requests
import urllib.parse

def get_api_call(ids, **kwargs):
    API_BASE_URL = "https://apis.datos.gob.ar/series/api/"
    kwargs["ids"] = ",".join(ids)
    return "{}{}?{}".format(API_BASE_URL, "series", urllib.parse.urlencode(kwargs))

In [2]:
#https://apis.datos.gob.ar/series/api/series?ids=Automotriz_patentamiento_Izd1M1&collapse=year&start_date=2010-01-01&end_date=2022-05-31&format=json
# Example: https://apis.datos.gob.ar/series/api/series?ids=75.3_IEC_0_M_26&start_date=2020-01-01
api_call = get_api_call(["Automotriz_patentamiento_Izd1M1"], start_date="2010-01-01")
print(api_call)

https://apis.datos.gob.ar/series/api/series?start_date=2010-01-01&ids=Automotriz_patentamiento_Izd1M1
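The helper above relies on `urllib.parse.urlencode` to build the query string. As a minimal sketch (the name `build_series_url` is hypothetical, and the `end_date`, `collapse`, and `format` parameters are taken from the commented URL in the cell above), the same idea can be extended with extra parameters and checked by parsing the URL back:

```python
from urllib.parse import parse_qs, urlencode, urlsplit

API_BASE_URL = "https://apis.datos.gob.ar/series/api/"


def build_series_url(ids, **params):
    """Hypothetical variant of get_api_call; not part of the original notebook."""
    query = dict(params)            # copy the keyword arguments into a plain dict
    query["ids"] = ",".join(ids)    # several series IDs travel in one comma-separated value
    # safe="," keeps the comma readable instead of percent-encoding it as %2C
    return f"{API_BASE_URL}series?{urlencode(query, safe=',')}"


url = build_series_url(
    ["Automotriz_patentamiento_Izd1M1"],
    start_date="2010-01-01",
    end_date="2022-05-31",
    collapse="year",
    format="json",
)
parsed = parse_qs(urlsplit(url).query)  # round-trip the query string to inspect it
print(url)
print(parsed["ids"], parsed["collapse"])
```

Parsing the generated URL with `parse_qs` is a cheap way to confirm that each parameter ends up where expected before any request is sent.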


In [3]:
result = requests.get(api_call).json()
print(result)

{'data': [['2010-01-01', 801.0], ['2010-02-01', 519.0], ['2010-03-01', 931.0], ['2010-04-01', 898.0], ['2010-05-01', 777.0], ['2010-06-01', 844.0], ['2010-07-01', 810.0], ['2010-08-01', 782.0], ['2010-09-01', 878.0], ['2010-10-01', 834.0], ['2010-11-01', 904.0], ['2010-12-01', 658.0], ['2011-01-01', 1045.0], ['2011-02-01', 988.0], ['2011-03-01', 995.0], ['2011-04-01', 1031.0], ['2011-05-01', 1100.0], ['2011-06-01', 1086.0], ['2011-07-01', 1103.0], ['2011-08-01', 1140.0], ['2011-09-01', 1192.0], ['2011-10-01', 1071.0], ['2011-11-01', 1083.0], ['2011-12-01', 638.0], ['2012-01-01', 1162.0], ['2012-02-01', 811.0], ['2012-03-01', 1230.0], ['2012-04-01', 815.0], ['2012-05-01', 1003.0], ['2012-06-01', 842.0], ['2012-07-01', 1051.0], ['2012-08-01', 1325.0], ['2012-09-01', 929.0], ['2012-10-01', 974.0], ['2012-11-01', 1052.0], ['2012-12-01', 693.0], ['2013-01-01', 1006.0], ['2013-02-01', 851.0], ['2013-03-01', 1155.0], ['2013-04-01', 1017.0], ['2013-05-01', 1159.0], ['2013-06-01', 964.0], ['201
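The response is a dictionary whose `data` key holds `[date, value]` pairs. A minimal sketch of checking that shape before loading it, using a three-row sample copied from the output above (the function name `rows_from_payload` is hypothetical):

```python
from datetime import date


def rows_from_payload(payload):
    """Return (date_from, value) tuples, failing early on an unexpected shape."""
    if "data" not in payload:
        raise ValueError("payload has no 'data' key")
    rows = []
    for date_from, value in payload["data"]:
        date.fromisoformat(date_from)  # raises ValueError if the string is not a valid ISO date
        rows.append((date_from, float(value)))
    return rows


# Sample copied from the first three entries of the printed result.
sample = {"data": [["2010-01-01", 801.0], ["2010-02-01", 519.0], ["2010-03-01", 931.0]]}
rows = rows_from_payload(sample)
print(rows)
```

In a live call, `requests.get(api_call, timeout=30)` followed by `response.raise_for_status()` would surface HTTP errors before the JSON is parsed; the timeout value here is an arbitrary choice.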

## 2) Load the data into the Redshift table

The table must already exist in the schema you are using. The `create table` statement is as follows:

```SQL
create table if not exists luciano_dinaso_coderhouse.patentamiento_automotor (
    date_from VARCHAR(10) distkey,
    miles_unidades decimal(10,2),
    frequency varchar(12)
) sortkey(date_from);
```

In [4]:
!pip install psycopg2-binary

Collecting psycopg2-binary
  Downloading psycopg2_binary-2.9.6-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (3.0 MB)
Installing collected packages: psycopg2-binary
Successfully installed psycopg2-binary-2.9.6


In [5]:
# Create the Spark session
import os
import psycopg2

from pyspark.sql import SparkSession
from pyspark.sql.functions import when, lit, col

# Postgres and Redshift JDBCs
driver_path = "/home/coder/working_dir/driver_jdbc/postgresql-42.2.27.jre7.jar"

os.environ['PYSPARK_SUBMIT_ARGS'] = f'--driver-class-path {driver_path} --jars {driver_path} pyspark-shell'
os.environ['SPARK_CLASSPATH'] = driver_path

# Create SparkSession 
spark = SparkSession.builder \
        .master("local") \
        .appName("Conexion entre Pyspark y Redshift") \
        .config("spark.jars", driver_path) \
        .config("spark.executor.extraClassPath", driver_path) \
        .getOrCreate()

In [6]:
env = os.environ

See the documentation:
* [AWS Redshift + Spark documentation](https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-spark-redshift.html)
* [Spark + Redshift connector](https://github.com/spark-redshift-community/spark-redshift#readme)

In [7]:
# Connect to Redshift using psycopg2
conn = psycopg2.connect(
    host=env['AWS_REDSHIFT_HOST'],
    port=env['AWS_REDSHIFT_PORT'],
    dbname=env['AWS_REDSHIFT_DBNAME'],
    user=env['AWS_REDSHIFT_USER'],
    password=env['AWS_REDSHIFT_PASSWORD']
)
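The connection settings all come from environment variables, and indexing `env[...]` stops with a `KeyError` at the first one that is missing. A minimal sketch that reports every missing variable at once; the variable names match the ones used in this notebook, while the function name `redshift_settings` and the example values are hypothetical:

```python
import os

REQUIRED_VARS = (
    "AWS_REDSHIFT_HOST",
    "AWS_REDSHIFT_PORT",
    "AWS_REDSHIFT_DBNAME",
    "AWS_REDSHIFT_USER",
    "AWS_REDSHIFT_PASSWORD",
    "AWS_REDSHIFT_SCHEMA",
)


def redshift_settings(environ=os.environ):
    """Collect the required settings, listing all missing or empty variables together."""
    missing = [name for name in REQUIRED_VARS if not environ.get(name)]
    if missing:
        raise RuntimeError("Missing environment variables: " + ", ".join(missing))
    return {name: environ[name] for name in REQUIRED_VARS}


# Fake values for illustration only; a real run would read os.environ.
example_env = {
    "AWS_REDSHIFT_HOST": "example-host",
    "AWS_REDSHIFT_PORT": "5439",
    "AWS_REDSHIFT_DBNAME": "dev",
    "AWS_REDSHIFT_USER": "user",
    "AWS_REDSHIFT_PASSWORD": "secret",
    "AWS_REDSHIFT_SCHEMA": "my_schema",
}
settings = redshift_settings(example_env)
print(sorted(settings))
```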

In [8]:
cursor = conn.cursor()
cursor.execute(f"""
create table if not exists {env['AWS_REDSHIFT_SCHEMA']}.patentamiento_automotor (
    date_from VARCHAR(10) distkey,
    miles_unidades decimal(10,2),
    frequency varchar(12)
) sortkey(date_from);
""")
conn.commit()
cursor.close()
print("Table created!")

Table created!
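The schema name is interpolated into the SQL text with an f-string, because identifiers cannot be bound as ordinary query parameters. A minimal sketch of validating the names before interpolating them; `qualified_table` and the example schema `my_schema` are hypothetical:

```python
import re

# Plain unquoted identifier: a letter or underscore, then letters, digits, or underscores.
_IDENTIFIER = re.compile(r"[A-Za-z_][A-Za-z0-9_]*")


def qualified_table(schema, table):
    """Return 'schema.table' only if both parts are plain identifiers."""
    for name in (schema, table):
        if not _IDENTIFIER.fullmatch(name):
            raise ValueError(f"unsafe SQL identifier: {name!r}")
    return f"{schema}.{table}"


create_sql = f"""
create table if not exists {qualified_table("my_schema", "patentamiento_automotor")} (
    date_from VARCHAR(10) distkey,
    miles_unidades decimal(10,2),
    frequency varchar(12)
) sortkey(date_from);
"""
print(create_sql)
```

The `psycopg2.sql` module offers `Identifier` for composing identifiers safely, which may be preferable to a hand-written check when psycopg2 is already in use.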


In [9]:
cursor = conn.cursor()
cursor.execute(f"""
SELECT
  distinct tablename
FROM
  PG_TABLE_DEF
WHERE
  schemaname = '{env['AWS_REDSHIFT_SCHEMA']}';
""")
# Each fetched row is a one-element tuple holding the table name
print(", ".join(row[0] for row in cursor.fetchall()))
cursor.close()

patentamiento_automotor


In [10]:
# Create the DataFrame with the specified column names
df = spark.createDataFrame(result['data'], ["date_from", "miles_unidades"])

In [11]:
df.printSchema()
df.show()

root
 |-- date_from: string (nullable = true)
 |-- miles_unidades: double (nullable = true)

+----------+--------------+
| date_from|miles_unidades|
+----------+--------------+
|2010-01-01|         801.0|
|2010-02-01|         519.0|
|2010-03-01|         931.0|
|2010-04-01|         898.0|
|2010-05-01|         777.0|
|2010-06-01|         844.0|
|2010-07-01|         810.0|
|2010-08-01|         782.0|
|2010-09-01|         878.0|
|2010-10-01|         834.0|
|2010-11-01|         904.0|
|2010-12-01|         658.0|
|2011-01-01|        1045.0|
|2011-02-01|         988.0|
|2011-03-01|         995.0|
|2011-04-01|        1031.0|
|2011-05-01|        1100.0|
|2011-06-01|        1086.0|
|2011-07-01|        1103.0|
|2011-08-01|        1140.0|
+----------+--------------+
only showing top 20 rows
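Spark infers `miles_unidades` as `double`, while the table declares it as `decimal(10,2)`. A minimal sketch, in plain Python and with a hypothetical helper name, of checking that a value fits that declaration (two decimal places, at most eight integer digits):

```python
from decimal import Decimal

SCALE = Decimal("0.01")    # two decimal places, as in decimal(10,2)
LIMIT = Decimal(10) ** 8   # 10 total digits minus 2 decimals leaves 8 integer digits


def to_decimal_10_2(value):
    """Round to two decimals and reject values that exceed the declared precision."""
    quantized = Decimal(str(value)).quantize(SCALE)
    if abs(quantized) >= LIMIT:
        raise ValueError(f"{value} does not fit decimal(10,2)")
    return quantized


print(to_decimal_10_2(801.0))
```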



In [14]:
df_to_write = df.withColumn('frequency', lit('Mensual'))
df_to_write.printSchema()
df_to_write.show()

root
 |-- date_from: string (nullable = true)
 |-- miles_unidades: double (nullable = true)
 |-- frequency: string (nullable = false)

+----------+--------------+---------+
| date_from|miles_unidades|frequency|
+----------+--------------+---------+
|2010-01-01|         801.0|  Mensual|
|2010-02-01|         519.0|  Mensual|
|2010-03-01|         931.0|  Mensual|
|2010-04-01|         898.0|  Mensual|
|2010-05-01|         777.0|  Mensual|
|2010-06-01|         844.0|  Mensual|
|2010-07-01|         810.0|  Mensual|
|2010-08-01|         782.0|  Mensual|
|2010-09-01|         878.0|  Mensual|
|2010-10-01|         834.0|  Mensual|
|2010-11-01|         904.0|  Mensual|
|2010-12-01|         658.0|  Mensual|
|2011-01-01|        1045.0|  Mensual|
|2011-02-01|         988.0|  Mensual|
|2011-03-01|         995.0|  Mensual|
|2011-04-01|        1031.0|  Mensual|
|2011-05-01|        1100.0|  Mensual|
|2011-06-01|        1086.0|  Mensual|
|2011-07-01|        1103.0|  Mensual|
|2011-08-01|        1140.0|  Mensual|
+----------+--------------+---------+
only showing top 20 rows

In [15]:
df_to_write.write \
    .format("jdbc") \
    .option("url", f"jdbc:postgresql://{env['AWS_REDSHIFT_HOST']}:{env['AWS_REDSHIFT_PORT']}/{env['AWS_REDSHIFT_DBNAME']}") \
    .option("dbtable", f"{env['AWS_REDSHIFT_SCHEMA']}.patentamiento_automotor") \
    .option("user", env['AWS_REDSHIFT_USER']) \
    .option("password", env['AWS_REDSHIFT_PASSWORD']) \
    .option("driver", "org.postgresql.Driver") \
    .mode("overwrite") \
    .save()
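The write here and the read that follows repeat the same connection options. One possible way to build them once, sketched as a plain function (the name `jdbc_options` and the example values are hypothetical); the resulting dictionary could then be passed with `.options(**opts)`, which both `DataFrameWriter` and `DataFrameReader` accept:

```python
def jdbc_options(env, table):
    """Build the JDBC options shared by the Spark write and read calls."""
    url = (
        f"jdbc:postgresql://{env['AWS_REDSHIFT_HOST']}:"
        f"{env['AWS_REDSHIFT_PORT']}/{env['AWS_REDSHIFT_DBNAME']}"
    )
    return {
        "url": url,
        "dbtable": f"{env['AWS_REDSHIFT_SCHEMA']}.{table}",
        "user": env["AWS_REDSHIFT_USER"],
        "password": env["AWS_REDSHIFT_PASSWORD"],
        "driver": "org.postgresql.Driver",
    }


# Fake values for illustration only; a real run would pass os.environ.
example_env = {
    "AWS_REDSHIFT_HOST": "example-host",
    "AWS_REDSHIFT_PORT": "5439",
    "AWS_REDSHIFT_DBNAME": "dev",
    "AWS_REDSHIFT_USER": "user",
    "AWS_REDSHIFT_PASSWORD": "secret",
    "AWS_REDSHIFT_SCHEMA": "my_schema",
}
opts = jdbc_options(example_env, "patentamiento_automotor")
print(opts["url"])

# With a live Spark session, the options could be applied as:
#   df_to_write.write.format("jdbc").options(**opts).mode("overwrite").save()
```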

In [16]:
# Read the table back from Redshift through Spark's JDBC data source
query = f"select * from {env['AWS_REDSHIFT_SCHEMA']}.patentamiento_automotor"
data = spark.read \
    .format("jdbc") \
    .option("url", f"jdbc:postgresql://{env['AWS_REDSHIFT_HOST']}:{env['AWS_REDSHIFT_PORT']}/{env['AWS_REDSHIFT_DBNAME']}") \
    .option("dbtable", f"({query}) as tmp_table") \
    .option("user", env['AWS_REDSHIFT_USER']) \
    .option("password", env['AWS_REDSHIFT_PASSWORD']) \
    .option("driver", "org.postgresql.Driver") \
    .load()

In [19]:
data.printSchema()
data.show()

root
 |-- date_from: string (nullable = true)
 |-- miles_unidades: double (nullable = true)
 |-- frequency: string (nullable = true)

+----------+--------------+---------+
| date_from|miles_unidades|frequency|
+----------+--------------+---------+
|2010-01-01|         801.0|  Mensual|
|2010-02-01|         519.0|  Mensual|
|2010-03-01|         931.0|  Mensual|
|2010-04-01|         898.0|  Mensual|
|2010-05-01|         777.0|  Mensual|
|2010-06-01|         844.0|  Mensual|
|2010-07-01|         810.0|  Mensual|
|2010-08-01|         782.0|  Mensual|
|2010-09-01|         878.0|  Mensual|
|2010-10-01|         834.0|  Mensual|
|2010-11-01|         904.0|  Mensual|
|2010-12-01|         658.0|  Mensual|
|2011-01-01|        1045.0|  Mensual|
|2011-02-01|         988.0|  Mensual|
|2011-03-01|         995.0|  Mensual|
|2011-04-01|        1031.0|  Mensual|
|2011-05-01|        1100.0|  Mensual|
|2011-06-01|        1086.0|  Mensual|
|2011-07-01|        1103.0|  Mensual|
|2011-08-01|        1140.0|  Mensual|
+----------+--------------+---------+
only showing top 20 rows
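To confirm that the rows read back match what was written, the two sets of rows can be compared without relying on order, since a `select` without `order by` does not guarantee one. A minimal sketch using plain tuples as stand-ins for the rows that `df_to_write.collect()` and `data.collect()` would return (the helper name `same_rows` is hypothetical):

```python
from collections import Counter


def same_rows(written, read_back):
    """Compare two row collections as multisets, ignoring order."""
    return Counter(map(tuple, written)) == Counter(map(tuple, read_back))


# Stand-in rows; with Spark these would come from .collect() on each DataFrame.
written = [("2010-01-01", 801.0, "Mensual"), ("2010-02-01", 519.0, "Mensual")]
read_back = [("2010-02-01", 519.0, "Mensual"), ("2010-01-01", 801.0, "Mensual")]
print(same_rows(written, read_back))
```

A `Counter` is used rather than a `set` so that duplicated rows are also detected.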

In [18]:
conn.close()
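Closing the connection explicitly works when every cell succeeds. A minimal sketch of guaranteeing the close even when an error occurs, using `contextlib.closing`; `FakeConnection` is a hypothetical stand-in so the example runs without a database:

```python
from contextlib import closing


class FakeConnection:
    """Stand-in for a database connection; only tracks whether close() was called."""

    def __init__(self):
        self.closed = False

    def close(self):
        self.closed = True


fake_conn = FakeConnection()
try:
    with closing(fake_conn):
        # Simulate a failure in the middle of the load
        raise RuntimeError("simulated failure while loading")
except RuntimeError:
    pass

print(fake_conn.closed)
```

With psycopg2, `with conn:` on its own ends the transaction (commit or rollback) but does not close the connection, which is why `closing` is used in this sketch.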