# Integration

* Check `parse.ipynb` first for instructions

* Joins the datasets using the minimum-distance criterion:
    1. For each temperature point, take its date and find all $CO_2$ points on that same date;
    2. Compute the distance between the temperature point and each of the $CO_2$ points selected in the previous step;
    3. Choose the one at minimum distance as the matching point between the *datasets*.
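The three steps above can be sketched in plain Python, outside Spark, as follows. This is a minimal illustration assuming each record is a tuple `(date, latitude, longitude, value)` and using the planar distance; the function name `match_nearest` and the sample points are hypothetical.

```python
import math
from collections import defaultdict

def match_nearest(tpr_points, co2_points):
    """For each temperature point, return the CO2 point on the same date with the
    smallest planar distance (None if that date has no CO2 point). Hypothetical helper."""
    # Step 1: group the CO2 points by date so each lookup only sees same-date points
    co2_by_date = defaultdict(list)
    for point in co2_points:
        co2_by_date[point[0]].append(point)

    matches = []
    for t_date, t_lat, t_lon, t_value in tpr_points:
        candidates = co2_by_date.get(t_date, [])
        # Steps 2 and 3: compute every distance and keep the minimum
        best = min(
            candidates,
            key=lambda c: math.hypot(t_lat - c[1], t_lon - c[2]),
            default=None,
        )
        matches.append(((t_date, t_lat, t_lon, t_value), best))
    return matches

# Made-up sample points
tpr = [("1850-01-01", 57.05, 10.33, -5.265)]
co2 = [
    ("1850-01-01", 57, 10, 285.51),
    ("1850-01-01", 58, 11, 284.90),
    ("1850-02-01", 57, 10, 286.45),
]
for t, c in match_nearest(tpr, co2):
    print(t, "->", c)
```

Grouping by date first mirrors step 1: distances are only computed against same-date candidates, which is what keeps the search from becoming a full cross product.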

<br />

* Distance calculation:
    * ✔️ **Approach 1:** planar, $d(P_1, P_2) = \sqrt{(x_1-x_2)^2 + (y_1-y_2)^2}$, with $x$ as latitude and $y$ as longitude, both in degrees
    * ❓ **Approach 2:** spherical. Does it make sense? Is it necessary? How would it be done?
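To make the comparison concrete, here is a sketch of both options. Approach 1 is the planar formula above applied directly to degrees. For Approach 2, one common choice (not taken from this notebook) is the haversine great-circle distance, shown here assuming a spherical Earth with a mean radius of 6371 km. The function names are hypothetical.

```python
import math

EARTH_RADIUS_KM = 6371.0  # assumed mean Earth radius (spherical model)

def planar_distance(lat1, lon1, lat2, lon2):
    """Approach 1: Euclidean distance computed directly on degrees."""
    return math.sqrt((lat1 - lat2) ** 2 + (lon1 - lon2) ** 2)

def haversine_distance(lat1, lon1, lat2, lon2):
    """Approach 2 (one option): great-circle distance in km via the haversine formula."""
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    dphi = math.radians(lat2 - lat1)
    dlmb = math.radians(lon2 - lon1)
    a = math.sin(dphi / 2) ** 2 + math.cos(phi1) * math.cos(phi2) * math.sin(dlmb / 2) ** 2
    return 2 * EARTH_RADIUS_KM * math.asin(math.sqrt(a))

# Århus (57.05, 10.33) against two neighbouring integer grid points
for lat, lon in [(57, 10), (57, 11)]:
    print(lat, lon,
          planar_distance(57.05, 10.33, lat, lon),
          haversine_distance(57.05, 10.33, lat, lon))
```

The planar formula treats a degree of longitude as equal to a degree of latitude, while on the sphere a degree of longitude shrinks roughly by $\cos(\text{latitude})$; whether that changes which point is nearest depends on how the $CO_2$ points are laid out.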

<br />

* About the implementation:
    * I did not find a direct, elegant way to do the integration using only the PySpark API, so I resorted to SQL 🙏;
    * As an exploration, an integration was done with `CROSS JOIN`, applying the method to the letter;
        * That alternative is infeasible, since it would generate records on the order of $10^{6 + 8} = 10^{14}$.
    * For usability, the integration was done with `LEFT JOIN` and `ROUND`, which produces the same matches as the previous alternative, provided the $CO_2$ points lie on an integer-degree grid: rounding each coordinate then selects the nearest grid point.
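The claim that `ROUND` reproduces the minimum-distance result can be checked numerically. Assuming the $CO_2$ points lie on a grid with integer latitudes and longitudes (which the sample outputs, e.g. `c_latitude = 57`, `c_longitude = 10`, suggest but do not prove), the squared planar distance splits into one term per coordinate, so each coordinate can be minimised, that is rounded, independently. The sketch below compares rounding against a brute-force search; the helper names are hypothetical, and `.5` ties use HALF_UP rounding (the mode Spark SQL documents for `round`), which may differ from how the `_rnd` columns were produced in `parse.ipynb`.

```python
import math
import random

def round_half_up(x):
    """Round to the nearest integer, sending .5 ties away from zero (HALF_UP)."""
    return int(math.copysign(math.floor(abs(x) + 0.5), x))

def nearest_grid_point_brute_force(lat, lon):
    """Minimum planar distance over the integer grid points surrounding (lat, lon)."""
    candidates = [
        (glat, glon)
        for glat in range(math.floor(lat) - 1, math.floor(lat) + 3)
        for glon in range(math.floor(lon) - 1, math.floor(lon) + 3)
    ]
    return min(candidates, key=lambda g: math.hypot(lat - g[0], lon - g[1]))

random.seed(0)
for _ in range(10_000):
    lat, lon = random.uniform(-90, 90), random.uniform(-180, 180)
    # Rounding each coordinate must agree with the exhaustive minimum-distance search
    assert nearest_grid_point_brute_force(lat, lon) == (round_half_up(lat), round_half_up(lon))
print("rounding matched the brute-force nearest grid point in every trial")
```

If the $CO_2$ grid were coarser or irregular, this equivalence would no longer hold and the rounding join would need to be revisited.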

# Libraries and Settings

In [1]:
from time import time

In [2]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.config(
        "spark.jars.packages",
        "io.xskipper:xskipper-core_2.12:1.3.0"
).getOrCreate()

In [3]:
from xskipper import Xskipper

In [4]:
metadata_path = "./tmp/metadata"

config = {
    "io.xskipper.parquet.mdlocation": metadata_path,
    "io.xskipper.parquet.mdlocation.type": "EXPLICIT_BASE_PATH_LOCATION",
}

Xskipper.setConf(spark, config)

# Reading the data

In [5]:
tpr_parsed_path = './data/tpr_data_parsed.csv'
co2_parsed_path = './data/co2_data_parsed.csv'

tpr_reader = spark.read.options(header='True').format("csv")
co2_reader = spark.read.options(header='True').format("csv")

tpr_data = tpr_reader.load(tpr_parsed_path)
co2_data = co2_reader.load(co2_parsed_path)

In [6]:
tpr_xskipper = Xskipper(spark, tpr_parsed_path)

# Rebuild the index from scratch if one already exists
if tpr_xskipper.isIndexed():
    tpr_xskipper.dropIndex()

(
    tpr_xskipper.indexBuilder()
    .addValueListIndex("t_date")
    .addValueListIndex("t_longitude")
    .addValueListIndex("t_latitude")
    .build(tpr_reader)
    .show(10, False)
)

+-------+-----------------+-------------------+
|status |new_entries_added|old_entries_removed|
+-------+-----------------+-------------------+
|SUCCESS|1                |0                  |
+-------+-----------------+-------------------+



In [7]:
co2_xskipper = Xskipper(spark, co2_parsed_path)

# Rebuild the index from scratch if one already exists
if co2_xskipper.isIndexed():
    co2_xskipper.dropIndex()

(
    co2_xskipper.indexBuilder()
    .addValueListIndex("c_date")
    .addValueListIndex("c_longitude")
    .addValueListIndex("c_latitude")
    .build(co2_reader)
    .show(10, False)
)

+-------+-----------------+-------------------+
|status |new_entries_added|old_entries_removed|
+-------+-----------------+-------------------+
|SUCCESS|1                |0                  |
+-------+-----------------+-------------------+
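Conceptually, a value-list index keeps, for each indexed object (file), the distinct values of a column; at query time, an object whose list cannot satisfy the predicate can be skipped without being read. The toy model below illustrates only that idea, with made-up file names and contents; it is not Xskipper's implementation or API. Note that `new_entries_added = 1` in both outputs above suggests each CSV was indexed as a single object, in which case there may be little or nothing to skip, which is worth keeping in mind when comparing the timings.

```python
def build_value_list_index(objects, column):
    """Map each object name to the set of distinct values it holds for `column`."""
    return {name: {row[column] for row in rows} for name, rows in objects.items()}

def objects_to_scan(index, value):
    """Keep only the objects whose value list contains the queried value."""
    return [name for name, values in index.items() if value in values]

# Hypothetical objects (files) split by year
objects = {
    "part-1850.csv": [{"c_date": "1850-01-01"}, {"c_date": "1850-02-01"}],
    "part-1851.csv": [{"c_date": "1851-01-01"}],
}
date_index = build_value_list_index(objects, "c_date")
print(objects_to_scan(date_index, "1850-02-01"))
```

With many small objects, a selective predicate such as a single date lets most of them be skipped; with one large object, the index can only skip all or nothing.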



# Performance tests

## Query ❌ | Skipper ❌

In [8]:
if Xskipper.isEnabled(spark): Xskipper.disable(spark)

In [9]:
tpr_data.limit(int(1e3)).createOrReplaceTempView("tpr_data")
co2_data.limit(int(1e5)).createOrReplaceTempView("co2_data")

In [10]:
# Raw integration query

cross_data = spark.sql(
    """
    SELECT *
    FROM (
        -- Rank the distances in ascending order
        -- within each (date, latitude, longitude) temperature point
        SELECT
            *,
            ROW_NUMBER() OVER (
                PARTITION BY
                    t_date,
                    t_latitude,
                    t_longitude
                ORDER BY distance ASC
            ) AS row_num
        FROM (
            -- CROSS JOIN to compute the distances between all
            -- points of both datasets on the same date
            SELECT
                *,
                SQRT(POW(t_latitude - c_latitude, 2) + POW(t_longitude - c_longitude, 2)) AS distance
            FROM tpr_data AS T
            CROSS JOIN co2_data AS C
            ON t_date = c_date
        )
    )
    WHERE row_num = 1; -- Keep only the records with the smallest distance
    """
)

In [11]:
t_start = time()
cross_data.show(10)
t_stop = time()
print('Elapsed time:', t_stop - t_start, 's')


+----------+-------------+-----------------+------+---------+----------+-----------+--------------+---------------+----------+----------+-----------+-----------------+-------------------+-------+
|    t_date|t_temperature|t_temperature_unc|t_city|t_country|t_latitude|t_longitude|t_latitude_rnd|t_longitude_rnd|    c_date|c_latitude|c_longitude|            c_co2|           distance|row_num|
+----------+-------------+-----------------+------+---------+----------+-----------+--------------+---------------+----------+----------+-----------+-----------------+-------------------+-------+
|1850-01-01|       -5.265|             1.82| Århus|  Denmark|     57.05|      10.33|            57|             10|1850-01-01|        57|         10|285.5076904296875|0.33376638536557235|      1|
|1850-02-01|        1.859|            1.641| Århus|  Denmark|     57.05|      10.33|            57|             10|1850-02-01|        57|         10|286.4536437988281|0.33376638536557235|      1|
+----------+------
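As a sanity check on the `distance` column, the first row pairs the Århus point $(57.05, 10.33)$ with the $CO_2$ point $(57, 10)$, and Approach 1 gives the value reported in the output:

$$
d = \sqrt{(57.05 - 57)^2 + (10.33 - 10)^2} = \sqrt{0.0025 + 0.1089} = \sqrt{0.1114} \approx 0.33377
$$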

## Query ✔️ | Skipper ❌

In [12]:
if Xskipper.isEnabled(spark): Xskipper.disable(spark)

In [13]:
tpr_data.limit(int(1e3)).createOrReplaceTempView("tpr_data")
co2_data.limit(int(1e5)).createOrReplaceTempView("co2_data")

In [14]:
# Raw integration query

cross_data = spark.sql(
    """
    SELECT *
    FROM tpr_data
    LEFT JOIN co2_data
    ON
        t_date = c_date AND
        t_latitude_rnd = c_latitude AND
        t_longitude_rnd = c_longitude;
    """
)
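The `LEFT JOIN` above behaves like a hash join on the key (date, rounded latitude, rounded longitude) that keeps every temperature row and fills the $CO_2$ columns with nulls when no point matches. A plain-Python sketch of those semantics, with a hypothetical function name and made-up rows:

```python
from collections import defaultdict

def left_join(tpr_rows, co2_rows):
    """Emulate `tpr LEFT JOIN co2 ON date and rounded coordinates match`."""
    # Build side: group the CO2 rows by their join key
    co2_by_key = defaultdict(list)
    for row in co2_rows:
        co2_by_key[(row["c_date"], row["c_latitude"], row["c_longitude"])].append(row)

    joined = []
    for t in tpr_rows:
        key = (t["t_date"], t["t_latitude_rnd"], t["t_longitude_rnd"])
        matches = co2_by_key.get(key)
        if matches:
            # One output row per matching CO2 row, as in SQL
            joined.extend({**t, **c} for c in matches)
        else:
            # No match: keep the temperature row and pad the CO2 columns with None
            joined.append({**t, "c_date": None, "c_latitude": None,
                           "c_longitude": None, "c_co2": None})
    return joined

tpr_rows = [
    {"t_date": "1850-01-01", "t_latitude_rnd": 57, "t_longitude_rnd": 10, "t_temperature": -5.265},
    {"t_date": "1850-01-01", "t_latitude_rnd": 0, "t_longitude_rnd": 0, "t_temperature": 25.0},
]
co2_rows = [{"c_date": "1850-01-01", "c_latitude": 57, "c_longitude": 10, "c_co2": 285.5076904296875}]

for row in left_join(tpr_rows, co2_rows):
    print(row)
```

Because the key is an exact equality, each temperature row needs only one hash lookup instead of a distance computation against every same-date $CO_2$ point.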

In [15]:
t_start = time()
cross_data.show(10)
t_stop = time()
print('Elapsed time:', t_stop - t_start, 's')


+----------+--------------------+------------------+------+---------+----------+-----------+--------------+---------------+----------+----------+-----------+-----------------+
|    t_date|       t_temperature| t_temperature_unc|t_city|t_country|t_latitude|t_longitude|t_latitude_rnd|t_longitude_rnd|    c_date|c_latitude|c_longitude|            c_co2|
+----------+--------------------+------------------+------+---------+----------+-----------+--------------+---------------+----------+----------+-----------+-----------------+
|1850-01-01|              -5.265|              1.82| Århus|  Denmark|     57.05|      10.33|            57|             10|1850-01-01|        57|         10|285.5076904296875|
|1850-02-01|               1.859|             1.641| Århus|  Denmark|     57.05|      10.33|            57|             10|1850-02-01|        57|         10|286.4536437988281|
|1850-03-01|0.031999999999999806|             3.167| Århus|  Denmark|     57.05|      10.33|            57|          

## Query ✔️ | Skipper ✔️

In [16]:
if not Xskipper.isEnabled(spark): Xskipper.enable(spark)

In [17]:
tpr_data.limit(int(1e3)).createOrReplaceTempView("tpr_data")
co2_data.limit(int(1e5)).createOrReplaceTempView("co2_data")

In [18]:
# Raw integration query

cross_data = spark.sql(
    """
    SELECT *
    FROM tpr_data
    LEFT JOIN co2_data
    ON
        t_date = c_date AND
        t_latitude_rnd = c_latitude AND
        t_longitude_rnd = c_longitude;
    """
)

In [19]:
t_start = time()
cross_data.show(10)
t_stop = time()
print('Elapsed time:', t_stop - t_start, 's')

+----------+--------------------+------------------+------+---------+----------+-----------+--------------+---------------+----------+----------+-----------+-----------------+
|    t_date|       t_temperature| t_temperature_unc|t_city|t_country|t_latitude|t_longitude|t_latitude_rnd|t_longitude_rnd|    c_date|c_latitude|c_longitude|            c_co2|
+----------+--------------------+------------------+------+---------+----------+-----------+--------------+---------------+----------+----------+-----------+-----------------+
|1850-01-01|              -5.265|              1.82| Århus|  Denmark|     57.05|      10.33|            57|             10|1850-01-01|        57|         10|285.5076904296875|
|1850-02-01|               1.859|             1.641| Århus|  Denmark|     57.05|      10.33|            57|             10|1850-02-01|        57|         10|286.4536437988281|
|1850-03-01|0.031999999999999806|             3.167| Århus|  Denmark|     57.05|      10.33|            57|          
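The timings in these cells come from a single pair of `time()` calls around `show(10)`, which may not need to compute the full result and may include one-off startup costs. A sketch of a somewhat more robust measurement, assuming it is acceptable to run the action several times: use `time.perf_counter`, a clock intended for measuring intervals, and report the median of a few runs. `benchmark` is a hypothetical helper; in the notebook, the callable could be something like `lambda: cross_data.show(10)`.

```python
import statistics
import time

def benchmark(action, repeats=5):
    """Run `action` several times and return (median, all timings) in seconds."""
    timings = []
    for _ in range(repeats):
        start = time.perf_counter()
        action()
        timings.append(time.perf_counter() - start)
    return statistics.median(timings), timings

# Stand-in workload; in the notebook this could be `lambda: cross_data.show(10)`
median_s, runs = benchmark(lambda: sum(i * i for i in range(100_000)))
print(f"median: {median_s:.4f} s over {len(runs)} runs")
```

The median is less sensitive than a single measurement to a slow first run or an occasional outlier.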

# Simulation

In [20]:
times = []

for i in [1, 10, 100, 1000]:
    if not Xskipper.isEnabled(spark): Xskipper.enable(spark)

    tpr_data.limit(i).createOrReplaceTempView("tpr_data")
    co2_data.createOrReplaceTempView("co2_data")

    cross_data = spark.sql(
        """
        SELECT
            t_date AS date,
            t_temperature AS temperature,
            t_temperature_unc AS temperature_unc,
            t_city AS city,
            t_country AS country,
            t_latitude AS latitude,
            t_longitude AS longitude,
            c_co2 AS co2
        FROM tpr_data
        LEFT JOIN co2_data
        ON
            t_date = c_date AND
            t_latitude_rnd = c_latitude AND
            t_longitude_rnd = c_longitude;
        """
    )

    t_start = time()
    cross_data.show(20)
    t_stop = time()
    times.append(t_stop - t_start)

    print(f'Simulation with {i} records finished.')
    print(times)
    print()

+----------+-----------+---------------+-----+-------+--------+---------+-----------------+
|      date|temperature|temperature_unc| city|country|latitude|longitude|              co2|
+----------+-----------+---------------+-----+-------+--------+---------+-----------------+
|1850-01-01|     -5.265|           1.82|Århus|Denmark|   57.05|    10.33|285.5076904296875|
+----------+-----------+---------------+-----+-------+--------+---------+-----------------+

Simulation with 1 records finished.
[159.47370672225952]

+----------+--------------------+------------------+-----+-------+--------+---------+------------------+
|      date|         temperature|   temperature_unc| city|country|latitude|longitude|               co2|
+----------+--------------------+------------------+-----+-------+--------+---------+------------------+
|1850-09-01|              11.837|             0.988|Århus|Denmark|   57.05|    10.33| 279.6823425292969|
|1850-02-01|               1.859|             1.641|År