<a href="https://colab.research.google.com/github/mseclen/AutomationWithAnsible/blob/master/pyspark_demo.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Install a headless Java 8 JDK quietly (JAVA_HOME is pointed at it in the next cell).
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
# Spark 3.3.0 is a superseded release: the dlcdn.apache.org mirror only carries
# current releases, so the tarball is fetched from the permanent Apache archive.
!wget -q https://archive.apache.org/dist/spark/spark-3.3.0/spark-3.3.0-bin-hadoop3.tgz
!tar xf spark-3.3.0-bin-hadoop3.tgz
# Pin PySpark to the version of the Spark distribution unpacked above so the
# Python bindings and the JVM side stay in step; %pip targets the kernel's environment.
%pip install -q pyspark==3.3.0

In [None]:
import os

# Process-wide environment for this notebook: the Java and Spark install
# locations plus the Google Cloud project id, all set in a single update.
os.environ.update({
    "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64",
    "SPARK_HOME": "/content/spark-3.3.0-bin-hadoop3",
    "GOOGLE_CLOUD_PROJECT": "data-intelligence-prepro",
})

In [None]:
# Colab-only step: start Colab's user authentication flow.
# NOTE(review): presumably this supplies the credentials used by the gcloud and
# BigQuery calls further down — confirm.
from google.colab import auth
auth.authenticate_user()

In [None]:
# Project id handed to gcloud as the active core/project.
# NOTE(review): the same id is also written to GOOGLE_CLOUD_PROJECT in the
# environment cell; keep the two values in sync.
project_id = 'data-intelligence-prepro'
# IPython expands {project_id} before passing the line to the shell.
!gcloud config set project {project_id}

Updated property [core/project].


In [None]:
from google.cloud import storage
from pyspark.sql import SparkSession

In [None]:
# For this demo Spark runs against a local master.
# NOTE(review): the original note about which master to use in a Dataproc
# environment was left unfinished — fill in when known.
spark = (
    SparkSession.builder
    # Pull the BigQuery connector (Scala 2.12 build) in as a Spark package.
    .config(
        "spark.jars.packages",
        "com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.25.2",
    )
    .master('local')
    .appName('spark-bigquery-demo')
    .getOrCreate()
)

In [None]:
# Register the GCS bucket name under the connector's `temporaryGcsBucket` setting.
temp_bucket = "sky_lakehouse_temps_dev"
spark.conf.set('temporaryGcsBucket', temp_bucket)

# Load tb_enh_payment through the BigQuery data source; the ID_DATE_BQ
# predicate is passed to the source as its `filter` option.
payments = (
    spark.read.format('bigquery')
    .option('table', 'data-intelligence-350116:ODS_RADIXX.tb_enh_payment')
    .option("filter", " ID_DATE_BQ='2022-07-05' ")
    .load()
)

# Exploración y cuadratura de pagos

In [None]:
# Expose the DataFrame to spark.sql() under the view name 'payments'
# (replaces any existing temp view with that name).
payments.createOrReplaceTempView('payments')

In [None]:
# Print the first rows of `payments` as a text table
# (show() defaults to 20 rows with long values truncated).
payments.show()

+----------+-------+--------+--------------------+----------+--------+--------------+----------+----------------+-------------+----------------------+-----------+--------------------+--------------+-----------------------+------------+--------------------+--------------+------------------------+-------------------+--------------------------+--------------+----------------+--------+--------------+----------------+-------------+----------+-------------+----------------+---------+---------------+-----------+--------------------+------+-------------+--------------------+-----------------+-----------+-----------+--------------------+-------------------+-----------------------+-----------+--------------+--------------+--------------+--------------+--------------+--------------+----------------+--------------------+--------------------+------------------+---+-----------+--------+
|ID_DATE_BQ|VIGENTE| ID_DATE|              ID_KEY|PAYMENT_ID|PAYER_ID|RESERVATION_ID|SERIES_NUM|CONFIRMATION_NUM|D

In [None]:
# Distinct PAYMENT_ID count over the 'payments' temp view, restricted to rows
# whose TRAN_STATUS_ID equals 1. The result is a one-row DataFrame.
payments_count = spark.sql('SELECT COUNT(DISTINCT(PAYMENT_ID)) FROM payments WHERE TRAN_STATUS_ID=1')

In [None]:
# Run the count query and print its single-row result.
payments_count.show()

+--------------------------+
|count(DISTINCT PAYMENT_ID)|
+--------------------------+
|                     23100|
+--------------------------+



In [None]:
# Load tb_enh_payment_map through the BigQuery data source, passing the
# ID_DATE_BQ predicate as the source's `filter` option.
payments_map = (
    spark.read.format('bigquery')
    .option('table', 'data-intelligence-350116:ODS_RADIXX.tb_enh_payment_map')
    .option("filter", "  ID_DATE_BQ='2022-07-05' ")
    .load()
)

In [None]:
# Expose the DataFrame to spark.sql() under the view name 'payments_map'.
payments_map.createOrReplaceTempView('payments_map')

In [None]:
# Load tb_enh_charge through the BigQuery data source, passing the
# ID_DATE_BQ predicate as the source's `filter` option.
charge = (
    spark.read.format('bigquery')
    .option('table', 'data-intelligence-350116:ODS_RADIXX.tb_enh_charge')
    .option("filter", "  ID_DATE_BQ='2022-07-05' ")
    .load()
)

# Make the frame addressable from spark.sql() as 'charge'.
charge.createOrReplaceTempView('charge')

In [None]:
# Load tb_enh_segment through the BigQuery data source, passing the
# ID_DATE_BQ predicate as the source's `filter` option.
segment = (
    spark.read.format('bigquery')
    .option('table', 'data-intelligence-350116:ODS_RADIXX.tb_enh_segment')
    .option("filter", "  ID_DATE_BQ='2022-07-05' ")
    .load()
)

# Make the frame addressable from spark.sql() as 'segment'.
segment.createOrReplaceTempView('segment')

In [None]:
# Reconciliation ("cuadratura") step 1: one row per payment from the 'payments'
# view, keeping rows with tran_status_id=1, a positive curr_paid_amount and a
# payment_method_code other than TKNE/TCKT. llave_1 / llave_2 are composite keys
# built from the paid date, confirmation number and series number (llave_1 also
# appends the payment id).
# NOTE(review): llave_registro is numbered ORDER BY pag.ID_DATE_BQ while the
# WHERE clause pins ID_DATE_BQ to a single date, so the order among those tied
# rows is not defined by the query — confirm whether it needs to be stable.
cuadratura_sql_1 = spark.sql(
    """
    SELECT
    ROW_NUMBER() OVER (ORDER BY pag.ID_DATE_BQ) AS llave_registro,
    CONCAT(TO_DATE(CAST(UNIX_TIMESTAMP(pag.date_paid, 'dd-MMM-yyyy HH:mm:ss') AS TIMESTAMP)),'_',pag.confirmation_num,'_',pag.series_num,'_',pag.payment_id) AS llave_1,
    CONCAT(TO_DATE(CAST(UNIX_TIMESTAMP(pag.date_paid, 'dd-MMM-yyyy HH:mm:ss') AS TIMESTAMP)),'_',pag.confirmation_num,'_',pag.series_num) AS llave_2,
    TO_DATE(CAST(UNIX_TIMESTAMP(pag.date_paid, 'dd-MMM-yyyy HH:mm:ss') AS TIMESTAMP)) AS fecha_pago,
    pag.confirmation_num AS reserva_radixx,
    pag.series_num AS series_num,
    pag.payment_id AS id_pago,
    pag.payment_method_code AS codigo_fop,
    pag.curr_paid_code AS moneda_pag_real,
    pag.curr_paid_amount AS monto_pag_real,
    pag.res_curr_code AS moneda_pag_res,
    pag.res_amount AS monto_pag_res,
    pag.rpt_curr_code AS moneda_pag_rpt,
    pag.rpt_curr_amount AS monto_pag_rpt,
    pag.payer_id AS id_pagador
    FROM payments AS pag
    WHERE
    ID_DATE_BQ>='2022-07-05' AND
    ID_DATE_BQ<='2022-07-05' AND
    pag.payment_method_code NOT IN ('TKNE','TCKT') AND
    pag.tran_status_id=1 AND
    pag.curr_paid_amount>0
    """
)
# show() and createOrReplaceTempView() both return None, so their results are
# deliberately not bound to a Python name: the DataFrame stays available as
# `cuadratura_sql_1`, and SQL reaches it through the 'cuadratura_result_1' view.
cuadratura_sql_1.show()
cuadratura_sql_1.createOrReplaceTempView('cuadratura_result_1')

+--------------+--------------------+--------------------+----------+--------------+----------+--------+----------+---------------+----------------+--------------+-------------+--------------+----------------+----------+
|llave_registro|             llave_1|             llave_2|fecha_pago|reserva_radixx|series_num| id_pago|codigo_fop|moneda_pag_real|  monto_pag_real|moneda_pag_res|monto_pag_res|moneda_pag_rpt|   monto_pag_rpt|id_pagador|
+--------------+--------------------+--------------------+----------+--------------+----------+--------+----------+---------------+----------------+--------------+-------------+--------------+----------------+----------+
|             1|2022-07-05_XOOWDS...|2022-07-05_XOOWDS...|2022-07-05|        XOOWDS|       299|31904064|      CASH|            USD|    75.240000000|           USD|        75.24|           CLP| 70199.000000000|  62940250|
|             2|2022-07-05_RBPDO3...|2022-07-05_RBPDO3...|2022-07-05|        RBPDO3|       299|31897057|      VCHR| 

In [None]:
# Reconciliation ("cuadratura") step 2: joins the step-1 view
# 'cuadratura_result_1' to payments_map, charge and segment (payments_map and
# charge restricted to vigente='S') and numbers the joined rows by llave_1.
# The amounts and ids are selected per row; the SUM/MAX variants are left
# commented out inside the SQL text.
cuadratura_sql_2 = spark.sql(
    """
    SELECT 
    ROW_NUMBER() OVER (ORDER BY neteo.llave_1) AS llave_registro,
    neteo.*
    FROM (
      SELECT
      t01.llave_1,
      t01.llave_2,
      map.res_curr_code AS moneda_map_res,
          --SUM(map.res_curr_amount) AS monto_map_res,
          (map.res_curr_amount) AS monto_map_res,
      map.rpt_curr_code AS moneda_map_rpt,
          --SUM(map.rpt_curr_amount) AS monto_map_rpt,
          (map.rpt_curr_amount) AS monto_map_rpt,
      car.code_type AS codigo_cargo,
      COALESCE(car.tax_code,'') As codigo_impuesto,
          --MAX(car.charge_id) AS id_cargo,
          (car.charge_id) AS id_cargo,
          --MAX(car.segment_id) AS id_segmento,
          (car.segment_id) AS id_segmento,
      seg.passenger_id AS id_pasajero,
      seg.from_airport AS origen
      FROM cuadratura_result_1 t01
          INNER JOIN payments_map AS map ON map.payment_id=t01.id_pago AND map.vigente='S'
          INNER JOIN charge AS car ON car.charge_id=map.charge_id AND car.vigente='S'
          INNER JOIN segment seg ON seg.segment_id=car.segment_id
      WHERE 
        t01.fecha_pago>='2022-07-05' AND
        t01.fecha_pago<='2022-07-05'
    ) AS neteo;
    """
)
# show() and createOrReplaceTempView() both return None, so their results are
# deliberately not bound to a Python name: the DataFrame stays available as
# `cuadratura_sql_2`, and SQL reaches it through the 'cuadratura_result_2' view.
cuadratura_sql_2.show()
cuadratura_sql_2.createOrReplaceTempView('cuadratura_result_2')

+--------------+--------------------+--------------------+--------------+---------------+--------------+----------------+------------+---------------+---------+-----------+-----------+------+
|llave_registro|             llave_1|             llave_2|moneda_map_res|  monto_map_res|moneda_map_rpt|   monto_map_rpt|codigo_cargo|codigo_impuesto| id_cargo|id_segmento|id_pasajero|origen|
+--------------+--------------------+--------------------+--------------+---------------+--------------+----------------+------------+---------------+---------+-----------+-----------+------+
|             1|2022-07-05_0041MT...|2022-07-05_0041MT...|           CLP|20990.000000000|           CLP| 20990.000000000|         AIR|               |467209373| 0041MT2991|   62892232|   ZCO|
|             2|2022-07-05_0041MT...|2022-07-05_0041MT...|           CLP| 6374.000000000|           CLP|  6374.000000000|         TAX|             CL|467209374| 0041MT2991|   62892232|   ZCO|
|             3|2022-07-05_0041MT...|202

In [None]:
# Display the second reconciliation result. The original line printed
# cuadratura_sql_1 while assigning to a name ending in _2 (a copy-paste slip),
# and show() returns None, so nothing useful was ever stored in that name.
cuadratura_sql_2.show()