In [0]:
##########################################################################################################
# VERSION  	DESARROLLADOR             FECHA        DESCRIPCION
# -------------------------------------------------------------
#  1        Walter Albites Azarte     15/06/2020   Curso Basico de Databricks - 3 ETL Pysark
##########################################################################################################

# I Importar Librerias

## 1.1 ETL con PySpark

In [0]:
##########################################################################################################
# Importar Librerias Necesarias pyspark
##########################################################################################################
import pyspark.sql.types as T
import pyspark.sql.functions as F

In [0]:
%fs 
ls dbfs:/FileStore/

path,name,size
dbfs:/FileStore/RDV/,RDV/,0
dbfs:/FileStore/UDV/,UDV/,0
dbfs:/FileStore/import-stage/,import-stage/,0
dbfs:/FileStore/tables/,tables/,0


In [0]:
##########################################################################################################
# Crear el directorio RDV
# Mover el archivo csv cargado al directorio RDV
##########################################################################################################
#dbutils.fs.mkdirs("dbfs:/FileStore/RDV")
dbutils.fs.cp("dbfs:/FileStore/tables/ubigeo.csv", "dbfs:/FileStore/RDV/ubigeo.csv")

In [0]:
##########################################################################################################
# Comandos magicos para crear editar mover directorios y archivos
##########################################################################################################
#dbutils.fs.put("dbfs:/FileStore/RDV/leeme.txt", "Primer Curso de Databricks")
#dbutils.fs.head("dbfs:/FileStore/RDV/leeme.txt")
#dbutils.fs.rm("dbfs:/FileStore/RDV/leeme.txt")
#%fs
#put "/FileStore/RDV/leeme.txt" "Primer Curso de Databricks"
#rm /FileStore/RDV/leeme.txt
#ls /FileStore/RDV/
#head /FileStore/RDV/leeme.txt

In [0]:
#%fs
#ls /FileStore/UDV/ubigeo/

-sandbox
### Our ETL Pipeline

pipeline ETL que acabara de construir. En el resto de este curso, trabajará con versiones más complejas de este patrón general.

| Code | Stage |
|------|-------|
| `logDF = (spark`                                                                          | Extract |
| &nbsp;&nbsp;&nbsp;&nbsp;`.read`                                                           | Extract |
| &nbsp;&nbsp;&nbsp;&nbsp;`.option("header", True)`                                         | Extract |
| &nbsp;&nbsp;&nbsp;&nbsp;`.csv(<source>)`                                                  | Extract |
| `)`                                                                                       | Extract |
| `serverErrorDF = (logDF`                                                                  | Transform |
| &nbsp;&nbsp;&nbsp;&nbsp;`.filter((col("code") >= 500) & (col("code") < 600))`             | Transform |
| &nbsp;&nbsp;&nbsp;&nbsp;`.select("date", "time", "extention", "code")`                    | Transform |
| `)`                                                                                       | Transform |
| `(serverErrorDF.write`                                                                 | Load |
| &nbsp;&nbsp;&nbsp;&nbsp;`.parquet(<destination>))`                                      | Load |

<img alt="Side Note" title="Side Note" style="vertical-align: text-bottom; position: relative; height:1.75em; top:0.05em; transform:rotate(15deg)" src="https://files.training.databricks.com/static/images/icon-note.webp"/> Este es un trabajo distribuido, por lo que puede escalar fácilmente para adaptarse a las demandas de su conjunto de datos.

In [0]:
##########################################################################################################
# 1 Forma Leer en un Dataframe de Pyspark un CSV
##########################################################################################################
path = "/FileStore/RDV/ubigeo.csv"

df_ubigeo = (spark
  .read
  .options(header=True,encoding="utf-8")
  .csv(path)
  #.sample(withReplacement=False, fraction=0.3, seed=3) 
  # El parámetro fraction representa la fracción aproximada del conjunto de datos que se devolverá. Por ejemplo, si lo establece en 0.3 , se devolverá el 30% (3/10) de las filas
  # seed semilla indica un valor para indentificar la muestra aleatoria, para la siguiente lectura salgan los mismos valores 
)

display(df_ubigeo.limit(5))

UBIGEO,DISTRITO,PROVINCIA,DEPARTAMENTO,LATITUD,LONGITUD
60401,CHOTA,CHOTA,CAJAMARCA,-6.555281874,-78.64146956
10302,CHISQUILLA,BONGARA,AMAZONAS,-5.89320537,-77.78280858
120807,SANTA BARBARA DE CARHUACAYAN,YAULI,JUNÍN,-11.21381302,-76.42904358
140304,JAYANCA,LAMBAYEQUE,LAMBAYEQUE,-6.394345534,-79.81451536
200601,SULLANA,SULLANA,PIURA,-4.898786419,-80.63618267


In [0]:
df_ubigeo.count()

In [0]:
##########################################################################################################
# 2 Forma Leer en un Dataframe de Pyspark un CSV, no lo lee bien falta un cero a proposito para aplicar una transformacion
##########################################################################################################
df_ubigeo = spark.read.csv("/FileStore/RDV/ubigeo.csv", inferSchema=True, header=True, encoding="utf-8")
display(df_ubigeo.limit(5))

UBIGEO,DISTRITO,PROVINCIA,DEPARTAMENTO,LATITUD,LONGITUD
60401,CHOTA,CHOTA,CAJAMARCA,-6.555281874,-78.64146956
10302,CHISQUILLA,BONGARA,AMAZONAS,-5.89320537,-77.78280858
120807,SANTA BARBARA DE CARHUACAYAN,YAULI,JUNÍN,-11.21381302,-76.42904358
140304,JAYANCA,LAMBAYEQUE,LAMBAYEQUE,-6.394345534,-79.81451536
200601,SULLANA,SULLANA,PIURA,-4.898786419,-80.63618267


In [0]:
##########################################################################################################
# UDF (User Defined Functions) son las funciones de usuario, y son sistemas para definir nuevos métodos SQL que operan sobre las columnas de un DataFrame.
# Crear un UDF función definida por el usuario para completar el cero adelante de una columna
##########################################################################################################
udf_completar_ubigeo = udf(lambda x: str(x).zfill(6), T.StringType())

In [0]:
##########################################################################################################
# Aplicar la transformacion de la columna aplicando UDF
##########################################################################################################
df_ubigeo = df_ubigeo.select(udf_completar_ubigeo(F.col("UBIGEO")).alias("UBIGEO"), F.col("DEPARTAMENTO").alias("DEP"), F.col("PROVINCIA").alias("PRO"), F.col("DISTRITO").alias("DIS"), F.col("LATITUD").alias("LAT"), F.col("LONGITUD").alias("LON"))

In [0]:
display(df_ubigeo.limit(5))

UBIGEO,DEP,PRO,DIS,LAT,LON
60401,CAJAMARCA,CHOTA,CHOTA,-6.555281874,-78.64146956
10302,AMAZONAS,BONGARA,CHISQUILLA,-5.89320537,-77.78280858
120807,JUNÍN,YAULI,SANTA BARBARA DE CARHUACAYAN,-11.21381302,-76.42904358
140304,LAMBAYEQUE,LAMBAYEQUE,JAYANCA,-6.394345534,-79.81451536
200601,PIURA,SULLANA,SULLANA,-4.898786419,-80.63618267


In [0]:
##########################################################################################################
# Crear el directorio UDV
# El objetivo es copiar la data que se necesita a UDV
##########################################################################################################
dbutils.fs.mkdirs("dbfs:/FileStore/UDV")

In [0]:
##########################################################################################################
# El objetivo es copiar solo los Ubigeos de Cajamarca en  las nuevas columnas
##########################################################################################################
from pyspark.sql.functions import col

df_ubigeo_lima = (df_ubigeo
  .filter(col("DEP") == "CAJAMARCA")
  .select(udf_completar_ubigeo("UBIGEO").alias("UBIGEO") , "DEP", "PRO", "DIS","LAT","LON")
)

display(df_ubigeo_lima.limit(5))

UBIGEO,DEP,PRO,DIS,LAT,LON
60401,CAJAMARCA,CHOTA,CHOTA,-6.555281874,-78.64146956
61303,CAJAMARCA,SANTA CRUZ,CATACHE,-6.735890508,-79.04859161
61307,CAJAMARCA,SANTA CRUZ,PULAN,-6.745548677,-78.92685928
60504,CAJAMARCA,CONTUMAZA,GUZMANGO,-7.367974043,-78.91690445
60404,CAJAMARCA,CHOTA,CHIGUIRIP,-6.409399128,-78.69964905


In [0]:
##########################################################################################################
# Escribir en UDV la nueva data 
##########################################################################################################
#df_ubigeo_lima.write.parquet("/FileStore/UDV/ubigeo", mode='overwrite', compression="snappy")

targetPath = "/FileStore/UDV/ubigeo"

(df_ubigeo_lima
  .write
  .mode("overwrite") # overwrites a file if it already exists
  .parquet(targetPath)
)


In [0]:
##########################################################################################################
# Crear la base de datos databricks
##########################################################################################################

In [0]:
%sql
DROP DATABASE IF EXISTS databricks;
CREATE DATABASE IF NOT EXISTS databricks;

In [0]:
##########################################################################################################
# Crear la tabla ubigeo en la base de datos databricks
##########################################################################################################

In [0]:
%sql
DROP TABLE IF EXISTS databricks.ubigeo;
CREATE TABLE IF NOT EXISTS databricks.ubigeo
(
UBIGEO STRING,
DEP STRING,
PRO STRING,
DIS STRING,
LAT DOUBLE,
LON DOUBLE
)
USING PARQUET
OPTIONS ('compression'='snappy')
LOCATION 'dbfs:/FileStore/UDV/ubigeo';

In [0]:
##########################################################################################################
# Leer las tablas con SQL
##########################################################################################################

In [0]:
%sql
REFRESH TABLE databricks.ubigeo;
SELECT * FROM databricks.ubigeo limit 5

UBIGEO,DEP,PRO,DIS,LAT,LON
60401,CAJAMARCA,CHOTA,CHOTA,-6.555281874,-78.64146956
61303,CAJAMARCA,SANTA CRUZ,CATACHE,-6.735890508,-79.04859161
61307,CAJAMARCA,SANTA CRUZ,PULAN,-6.745548677,-78.92685928
60504,CAJAMARCA,CONTUMAZA,GUZMANGO,-7.367974043,-78.91690445
60404,CAJAMARCA,CHOTA,CHIGUIRIP,-6.409399128,-78.69964905


## 1.2 ETL con Pandas

In [0]:
import pandas as pd

In [0]:
df_ubigeo_udv_pandas=df_ubigeo.toPandas()


In [0]:
df_ubigeo_udv_pandas["UBIGEO"]=df_ubigeo_udv_pandas.apply(lambda x: str(x["UBIGEO"]).zfill(6),axis=1)

In [0]:
df_ubigeo_udv_pandas.sort_values(['UBIGEO'], inplace=True)

In [0]:
df_ubigeo_udv_pandas.head()

Unnamed: 0,UBIGEO,DEP,PRO,DIS,LAT,LON
489,10101,AMAZONAS,CHACHAPOYAS,CHACHAPOYAS,-6.251183,-77.881751
1083,10102,AMAZONAS,CHACHAPOYAS,ASUNCION,-6.03189,-77.71228
1123,10103,AMAZONAS,CHACHAPOYAS,BALSAS,-6.84023,-78.023975
982,10104,AMAZONAS,CHACHAPOYAS,CHETO,-6.255578,-77.700378
535,10105,AMAZONAS,CHACHAPOYAS,CHILIQUIN,-6.07347,-77.778773


In [0]:
%sql
DROP TABLE IF EXISTS databricks.ubigeo;
CREATE TABLE IF NOT EXISTS databricks.ubigeo
(
UBIGEO STRING,
DEP STRING,
PRO STRING,
DIS STRING,
LAT DOUBLE,
LON DOUBLE
)
USING PARQUET
OPTIONS ('compression'='snappy')
LOCATION 'dbfs:/FileStore/UDV/ubigeo';

In [0]:
%sql
REFRESH TABLE databricks.ubigeo;
SELECT * FROM databricks.ubigeo limit 5

UBIGEO,DEP,PRO,DIS,LAT,LON
60401,CAJAMARCA,CHOTA,CHOTA,-6.555281874,-78.64146956
61303,CAJAMARCA,SANTA CRUZ,CATACHE,-6.735890508,-79.04859161
61307,CAJAMARCA,SANTA CRUZ,PULAN,-6.745548677,-78.92685928
60504,CAJAMARCA,CONTUMAZA,GUZMANGO,-7.367974043,-78.91690445
60404,CAJAMARCA,CHOTA,CHIGUIRIP,-6.409399128,-78.69964905


In [0]:
df_ubigeo_udv_tabla=spark.createDataFrame(df_ubigeo_udv_pandas)

In [0]:
df_ubigeo_udv_tabla.write.parquet("/FileStore/UDV/ubigeo", mode='overwrite', compression="snappy")

In [0]:
%sql
REFRESH TABLE databricks.ubigeo;
SELECT * FROM databricks.ubigeo limit 5

UBIGEO,DEP,PRO,DIS,LAT,LON
60401,CAJAMARCA,CHOTA,CHOTA,-6.555281874,-78.64146956
61303,CAJAMARCA,SANTA CRUZ,CATACHE,-6.735890508,-79.04859161
61307,CAJAMARCA,SANTA CRUZ,PULAN,-6.745548677,-78.92685928
60504,CAJAMARCA,CONTUMAZA,GUZMANGO,-7.367974043,-78.91690445
60404,CAJAMARCA,CHOTA,CHIGUIRIP,-6.409399128,-78.69964905
