# TODO
- Load in new Redshift table.

## Spark setup

In [1]:
from os import environ
from pyspark.sql import SparkSession

In [2]:


spark = SparkSession.builder \
        .master("local") \
        .appName("entregable-2") \
        .getOrCreate()

     # .config("spark.jars", driver_path) \
        # .config("spark.executor.extraClassPath", driver_path) \

In [3]:
spark.sparkContext.getConf().getAll()

[('spark.master', 'local'),
 ('spark.driver.extraJavaOptions',
  '-XX:+IgnoreUnrecognizedVMOptions --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.lang.invoke=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/sun.nio.cs=ALL-UNNAMED --add-opens=java.base/sun.security.action=ALL-UNNAMED --add-opens=java.base/sun.util.calendar=ALL-UNNAMED --add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED'),
 ('spark.driver.port', '37787'),
 ('spark.executor.id', 'driver'),
 ('spark.app.initial.jar.urls',
  'spark://6ad3cba5f73b:37787/jars/RedshiftJDBC42-no-awssdk-1.2.36.1060.jar,spark://6ad3cba5f73b:37787/jar

## Reading the API data in JSON format

The data has the following structure:

```json
[
    {dato1},
    {dato2},
    ...,
    {datoN}
]
```

We enable the [multiline](https://sparkbyexamples.com/pyspark/pyspark-read-json-file-into-dataframe/#read-json-multiline) option so that Spark can build the DataFrame correctly.

In [4]:
df = spark.read.option("multiline", "true").json("api/motorcycles.json")
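
To illustrate why the `multiline` option matters, here is a minimal sketch using only the standard library. It assumes a pretty-printed array like the one described above; the sample payload and the helper `parse_line_by_line` are hypothetical and not part of the real API data. Without `multiline`, Spark's JSON reader expects one complete JSON document per line, which is what the helper mimics.

```python
import json

# Hypothetical sample mimicking the layout described above: a single JSON
# array pretty-printed across several lines (not the real API payload).
pretty_printed = """[
    {
        "make": "make_a",
        "year": "2020"
    },
    {
        "make": "make_b",
        "year": "2021"
    }
]"""


def parse_line_by_line(text):
    """Parse each line as its own JSON document, as a line-delimited reader would."""
    records, bad_lines = [], []
    for line in text.splitlines():
        try:
            records.append(json.loads(line))
        except json.JSONDecodeError:
            bad_lines.append(line)
    return records, bad_lines


# No single line is a complete JSON document, so nothing parses line by line.
line_records, bad_lines = parse_line_by_line(pretty_printed)

# Parsing the text as one document recovers both records.
whole_document = json.loads(pretty_printed)
```

Here `line_records` ends up empty while `whole_document` holds both records, which is roughly the difference between reading this file with and without `multiline`.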

## Exploring the data

One possible improvement noted in deliverable 1 was:

- The schema can vary by manufacturer. We could collect every feature available in the extracted data and define a schema with all possible columns for the Redshift table instead of prioritizing a subset of them.
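
The idea in the bullet above can be sketched in plain Python: take the union of the keys found across all records and fill the gaps with `None`. The records and the helper names `all_columns` and `normalize` are hypothetical and only illustrate the approach. Spark does this on its own when it infers the schema.

```python
# Hypothetical records: each manufacturer reports a different set of fields.
records = [
    {"make": "make_a", "model": "X1", "dry_weight": "99.0 kg (218.3 pounds)"},
    {"make": "make_b", "model": "Y2", "total_weight": "415.9 kg (917.0 pounds)", "clutch": "example"},
]


def all_columns(rows):
    """Return the sorted union of the keys found in any record."""
    columns = set()
    for row in rows:
        columns.update(row)
    return sorted(columns)


def normalize(rows, columns):
    """Give every record the full set of columns, filling gaps with None."""
    return [{name: row.get(name) for name in columns} for row in rows]


columns = all_columns(records)
normalized = normalize(records, columns)
```

Every record in `normalized` now has the same five keys, so it maps onto a single table definition.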

The table created in the previous deliverable has 24 columns:

```
1 make
2 model
3 year
4 type
5 displacement
6 engine
7 power
8 top_speed
9 compression
10 bore_stroke
11 cooling
12 fuel_consumption
13 emission
14 front_suspension
15 rear_suspension
16 front_tire
17 rear_tire
18 front_brakes
19 rear_brakes
20 dry_weight
21 total_height
22 total_length
23 total_width
24 starter
```

Let's see what Spark tells us.

In [5]:
df_cols = df.columns

print(f"# columns = {len(df_cols)}")

# columns = 41


Spark found 41 columns, 17 more than the 24 in the previous table, which tells us it was able to infer the full structure of the data at load time. The DataFrame has a more convenient method for viewing the complete schema:

In [6]:
df.printSchema()

root
 |-- bore_stroke: string (nullable = true)
 |-- clutch: string (nullable = true)
 |-- compression: string (nullable = true)
 |-- cooling: string (nullable = true)
 |-- displacement: string (nullable = true)
 |-- dry_weight: string (nullable = true)
 |-- emission: string (nullable = true)
 |-- engine: string (nullable = true)
 |-- frame: string (nullable = true)
 |-- front_brakes: string (nullable = true)
 |-- front_suspension: string (nullable = true)
 |-- front_tire: string (nullable = true)
 |-- front_wheel_travel: string (nullable = true)
 |-- fuel_capacity: string (nullable = true)
 |-- fuel_consumption: string (nullable = true)
 |-- fuel_control: string (nullable = true)
 |-- fuel_system: string (nullable = true)
 |-- gearbox: string (nullable = true)
 |-- ground_clearance: string (nullable = true)
 |-- ignition: string (nullable = true)
 |-- lubrication: string (nullable = true)
 |-- make: string (nullable = true)
 |-- model: string (nullable = true)
 |-- power: string (null

Great! Now let's look at some values.

In [7]:
df.show(1)

+--------------------+------+-----------+-------+--------------------+--------------------+--------------------+--------------------+-----+------------+----------------+----------+------------------+-------------+--------------------+------------+-----------+-------+----------------+--------+-----------+-------+-----------------+--------------------+--------------------+---------------+----------+-----------------+-----------+---------------+--------------------+------+--------------------+--------------------+------------+--------------------+------------+-----+-------------------+---------+----+
|         bore_stroke|clutch|compression|cooling|        displacement|          dry_weight|            emission|              engine|frame|front_brakes|front_suspension|front_tire|front_wheel_travel|fuel_capacity|    fuel_consumption|fuel_control|fuel_system|gearbox|ground_clearance|ignition|lubrication|   make|            model|               power|         rear_brakes|rear_suspension| rear_

Unreadable. This appears to be a problem with how Jupyter formats the table that Spark prints. Let's try some of the parameters of [show](https://spark.apache.org/docs/3.2.0/api/python/reference/api/pyspark.sql.DataFrame.show.html):

In [8]:
df.show(n=1, truncate=False, vertical=True)

-RECORD 0--------------------------------------------------------------
 bore_stroke         | 52.4 x 49.5 mm (2.1 x 1.9 inches)               
 clutch              | null                                            
 compression         | 8.8:1                                           
 cooling             | Air                                             
 displacement        | 110.0 ccm (6.71 cubic inches)                   
 dry_weight          | 99.0 kg (218.3 pounds)                          
 emission            | 48.7 CO2 g/km. (CO2 - Carbon dioxide emission)  
 engine              | Single cylinder, four-stroke                    
 frame               | null                                            
 front_brakes        | Single disc                                     
 front_suspension    | Telescopic fork                                 
 front_tire          | 130/60-13                                       
 front_wheel_travel  | null                                     

Much better. Some columns are `null`, which makes sense because some manufacturers include data that others do not.

Let's look for duplicate rows by subtracting the number of distinct rows from the total number of rows.

In [9]:
total_rows = df.select(df_cols).count()
print(f"trs = {total_rows}")

trs = 750


In [10]:
total_distinct_rows = df.select(df_cols).distinct().count()
print(f"tdrs = {total_distinct_rows}")

tdrs = 750


In [11]:
repeated_rows = total_rows - total_distinct_rows
print(f"Repeated rows: trs - tdrs = {repeated_rows}")

Repeated rows: trs - tdrs = 0


So there are no duplicate rows.
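
The check above only says how many rows are repeated. As a sketch of how one might also find out which ones, the same arithmetic can be combined with an occurrence count. The rows below are hypothetical and the example uses plain Python. In PySpark, something like `df.groupBy(df.columns).count().filter("count > 1")` should give a comparable result, although that call was not run in this notebook.

```python
from collections import Counter

# Hypothetical rows as (make, model, year) tuples; the second and fourth are identical.
rows = [
    ("make_a", "X1", 2020),
    ("make_b", "Y2", 2021),
    ("make_c", "Z3", 2022),
    ("make_b", "Y2", 2021),
]

# Same arithmetic as in the cells above: total rows minus distinct rows.
total_rows = len(rows)
total_distinct_rows = len(set(rows))
repeated_rows = total_rows - total_distinct_rows

# Counting occurrences also tells us which rows are repeated, not just how many.
duplicates = {row: n for row, n in Counter(rows).items() if n > 1}
```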

## Transforming the data

### Year

Going back to the DataFrame schema, the `year` column is of type `string`. For convenience, let's make it an `integer`. To do that we use PySpark's `col` function together with `cast`:

In [12]:
from pyspark.sql.functions import col

transformed_df = df.withColumn("year", col("year").cast("Integer"))
print(df.schema["year"].dataType)
print(transformed_df.schema["year"].dataType)

StringType()
IntegerType()


The new DataFrame `transformed_df` has the same schema as the original DataFrame except for the `year` column, which is now an `integer`.

### Dry and total weight

Going back to the schema, some manufacturers report `dry_weight` (the weight of the motorcycle without fluids such as fuel or coolant) while others report `total_weight`. Both are strings. Our goal is to add two new columns of type `float`, `dry_weight_kg` and `total_weight_kg`, to our DataFrame.

In [13]:
print(transformed_df.schema["dry_weight"].dataType)
print(transformed_df.schema["total_weight"].dataType)

StringType()
StringType()


Now let's look at the format of these strings.

In [14]:
# Since the dataset is small we can get away with collect.
rows = transformed_df.collect() 
rows[0].dry_weight

'99.0 kg (218.3 pounds)'

In [15]:
rows[749].total_weight

'415.9 kg (917.0 pounds)'

In [16]:
"99.0 kg (218.3 pounds)".split(" kg")[0]

'99.0'

We'll use PySpark [UDFs](https://sparkbyexamples.com/pyspark/pyspark-udf-user-defined-function/#pyspark-udf-withcolumn) to transform the columns. First we extract the value in kg from the string, then we convert it to `float`. If the value is `NULL` we return it unchanged.

In [17]:
import pyspark.sql.functions as F
from pyspark.sql.types import *

def weight_in_kg(value):
    if value:
        return float(value.split(" kg")[0])
    return None

udf_weight_in_kg = F.udf(weight_in_kg, FloatType())
for column in ["dry_weight", "total_weight"]:
    transformed_df = transformed_df.withColumn(f"{column}_kg",  udf_weight_in_kg(column))
transformed_df.printSchema()

root
 |-- bore_stroke: string (nullable = true)
 |-- clutch: string (nullable = true)
 |-- compression: string (nullable = true)
 |-- cooling: string (nullable = true)
 |-- displacement: string (nullable = true)
 |-- dry_weight: string (nullable = true)
 |-- emission: string (nullable = true)
 |-- engine: string (nullable = true)
 |-- frame: string (nullable = true)
 |-- front_brakes: string (nullable = true)
 |-- front_suspension: string (nullable = true)
 |-- front_tire: string (nullable = true)
 |-- front_wheel_travel: string (nullable = true)
 |-- fuel_capacity: string (nullable = true)
 |-- fuel_consumption: string (nullable = true)
 |-- fuel_control: string (nullable = true)
 |-- fuel_system: string (nullable = true)
 |-- gearbox: string (nullable = true)
 |-- ground_clearance: string (nullable = true)
 |-- ignition: string (nullable = true)
 |-- lubrication: string (nullable = true)
 |-- make: string (nullable = true)
 |-- model: string (nullable = true)
 |-- power: string (null

Let's abuse `collect` once more to explore the final result.

In [18]:
rows = transformed_df.collect()

In [19]:
print(f"dry_weight = {rows[0].dry_weight} => dry_weight_kg = {rows[0].dry_weight_kg}")
print(f"total_weight = {rows[1].total_weight} => total_weight_kg = {rows[1].total_weight_kg}")
print(f"dry_weight = {rows[555].dry_weight} => dry_weight_kg = {rows[555].dry_weight_kg}")
print(f"total_weight = {rows[555].total_weight} => total_weight_kg = {rows[555].total_weight_kg}")

dry_weight = 99.0 kg (218.3 pounds) => dry_weight_kg = 99.0
total_weight = None => total_weight_kg = None
dry_weight = 247.0 kg (544.5 pounds) => dry_weight_kg = 247.0
total_weight = 252.0 kg (555.6 pounds) => total_weight_kg = 252.0


Our transformation looks good!
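
The UDF above assumes that every non-null string starts with a number followed by ` kg`, which holds for the samples we inspected. As a sketch, a slightly more defensive parser could return `None` for malformed values instead of raising. The name `weight_in_kg_safe` and the regular expression are assumptions for illustration, not part of the original notebook.

```python
import re
from typing import Optional

# Matches a leading number followed by "kg", e.g. "99.0 kg (218.3 pounds)".
KG_PATTERN = re.compile(r"^\s*(\d+(?:\.\d+)?)\s*kg\b")


def weight_in_kg_safe(value: Optional[str]) -> Optional[float]:
    """Return the kilogram figure as a float, or None when it is missing or malformed."""
    if not value:
        return None
    match = KG_PATTERN.match(value)
    return float(match.group(1)) if match else None
```

It could be registered the same way as the original function, for example with `F.udf(weight_in_kg_safe, FloatType())`.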

## Loading into Redshift

We'll use `redshift-connector` to create a new table `motorcycles2` in Redshift, then load the data with Spark. First we install `redshift-connector`:

In [20]:
!pip install redshift-connector

Collecting redshift-connector
  Downloading redshift_connector-2.0.911-py3-none-any.whl (112 kB)
Collecting boto3<2.0.0,>=1.9.201
  Downloading boto3-1.26.152-py3-none-any.whl (135 kB)
Collecting lxml>=4.6.5
  Downloading lxml-4.9.2-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.manylinux_2_24_x86_64.whl (7.1 MB)
Collecting botocore<2.0.0,>=1.12.201
  Downloading botocore-1.29.152-py3-none-any.whl (10.9 MB)
Collecting scramp<1.5.0,>=1.2.0

In [21]:
import redshift_connector
from os import environ

connection = redshift_connector.connect(
    host=environ["REDSHIFT_CODER_HOST"],
    database=environ["REDSHIFT_CODER_DB"],
    port=int(environ["REDSHIFT_CODER_PORT"]),
    user=environ["REDSHIFT_CODER_USER"],
    password=environ["REDSHIFT_CODER_PASSWORD"])
connection.autocommit = True
cursor = connection.cursor()

table_name = "motorcycles2"

In [22]:
# I could have used transformed_df.schema.json() to create the table.
statement = f"""
CREATE TABLE IF NOT EXISTS {environ['REDSHIFT_CODER_SCHEMA']}.{table_name} (
    make varchar not null,
    model varchar not null,
    year integer not null,
    type varchar not null,
    bore_stroke varchar,
    clutch varchar,
    compression varchar,
    cooling varchar,
    displacement varchar,
    dry_weight varchar,
    emission varchar,
    engine varchar,
    frame varchar,
    front_brakes varchar,
    front_suspension varchar,
    front_tire varchar,
    front_wheel_travel varchar,
    fuel_capacity varchar,
    fuel_consumption varchar,
    fuel_control varchar,
    fuel_system varchar,
    gearbox varchar,
    ground_clearance varchar,
    ignition varchar,
    lubrication varchar,
    power varchar,
    rear_brakes varchar,
    rear_suspension varchar,
    rear_tire varchar,
    rear_wheel_travel varchar,
    seat_height varchar,
    starter varchar,
    top_speed varchar,
    torque varchar,
    total_height varchar,
    total_length varchar,
    total_weight varchar,
    total_width varchar,
    transmission varchar,
    valves_per_cylinder varchar,
    wheelbase varchar,
    dry_weight_kg float,
    total_weight_kg float
)
"""
cursor.execute(statement)

<redshift_connector.cursor.Cursor at 0x7fb436cf2c20>
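
The comment in the cell above mentions `transformed_df.schema.json()` as an alternative to writing the statement by hand. A minimal sketch of that idea follows, using a shortened, hypothetical stand-in for the JSON that Spark would produce. The type mapping `SQL_TYPES`, the helper `create_table_statement`, and the schema name `my_schema` are assumptions for illustration. Only flat columns of simple types are handled. Spark marks columns inferred from JSON as nullable, so the `not null` constraints in the sample would have to be set by hand in practice.

```python
import json

# Hypothetical, shortened stand-in for the output of transformed_df.schema.json().
schema_json = json.dumps({
    "type": "struct",
    "fields": [
        {"name": "make", "type": "string", "nullable": False, "metadata": {}},
        {"name": "year", "type": "integer", "nullable": False, "metadata": {}},
        {"name": "dry_weight_kg", "type": "float", "nullable": True, "metadata": {}},
    ],
})

# Assumed mapping from Spark type names to the SQL types used in the statement above.
SQL_TYPES = {"string": "varchar", "integer": "integer", "float": "float"}


def create_table_statement(table, schema_json):
    """Build a CREATE TABLE statement from a Spark schema serialized as JSON."""
    columns = []
    for field in json.loads(schema_json)["fields"]:
        sql_type = SQL_TYPES[field["type"]]
        suffix = "" if field["nullable"] else " not null"
        columns.append(f"    {field['name']} {sql_type}{suffix}")
    return f"CREATE TABLE IF NOT EXISTS {table} (\n" + ",\n".join(columns) + "\n)"


statement = create_table_statement("my_schema.motorcycles2", schema_json)
```

The resulting `statement` has the same shape as the handwritten one and could be passed to `cursor.execute` in the same way.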

With the table ready, we can move on to loading the data with Spark.

In [30]:
# Append to the table created above; "overwrite" would drop it and recreate it with Spark's own column types.
transformed_df.write \
    .format("jdbc") \
    .option("url", f"jdbc:postgresql://{environ['REDSHIFT_CODER_HOST']}:{environ['REDSHIFT_CODER_PORT']}/{environ['REDSHIFT_CODER_DB']}") \
    .option("dbtable", f"{environ['REDSHIFT_CODER_SCHEMA']}.{table_name}") \
    .option("user", environ['REDSHIFT_CODER_USER']) \
    .option("password", environ['REDSHIFT_CODER_PASSWORD']) \
    .option("driver", "org.postgresql.Driver") \
    .mode("append") \
    .save()


ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/usr/local/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/usr/local/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "/opt/conda/lib/python3.10/socket.py", line 705, in readinto
    return self._sock.recv_into(b)
KeyboardInterrupt


KeyboardInterrupt: 

In [28]:
!pip install psycopg2-binary sqlalchemy



In [32]:
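from sqlalchemy import create_engine
# The URL needs the credentials (user:password@host); without them the engine cannot authenticate.
conn = create_engine(f"postgresql://{environ['REDSHIFT_CODER_USER']}:{environ['REDSHIFT_CODER_PASSWORD']}@{environ['REDSHIFT_CODER_HOST']}:{environ['REDSHIFT_CODER_PORT']}/{environ['REDSHIFT_CODER_DB']}")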


In [25]:
import psycopg2

conn = psycopg2.connect(
    host=environ['REDSHIFT_CODER_HOST'],
    port=environ['REDSHIFT_CODER_PORT'],
    dbname=environ['REDSHIFT_CODER_DB'],
    user=environ['REDSHIFT_CODER_USER'],
    password=environ['REDSHIFT_CODER_PASSWORD']
)

In [33]:
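df_to_write = transformed_df.toPandas()
# Pass the schema separately: a dotted name would be treated as a single table name.
# if_exists='append' keeps the table definition created above instead of replacing it.
df_to_write.to_sql(table_name, conn, schema=environ['REDSHIFT_CODER_SCHEMA'], index=False, if_exists='append')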

ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/usr/local/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/usr/local/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "/opt/conda/lib/python3.10/socket.py", line 705, in readinto
    return self._sock.recv_into(b)
KeyboardInterrupt


KeyboardInterrupt: 