In [107]:
import time
import requests
import datetime as dt
from multiprocessing import Process

import findspark
findspark.init()
from pathlib import Path
from pyspark import SparkContext, SparkConf, SQLContext
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, FloatType, LongType
from pyspark.sql.functions import col, regexp_replace, length, desc, row_number
from pyspark.sql import SparkSession, Window

In [2]:
# Spark bootstrap for the notebook.
# getOrCreate() keeps this cell idempotent: re-running it in a live kernel reuses
# the existing context instead of failing because a SparkContext already exists.
conf = SparkConf().setAppName("spark-streaming-micro-batch")
sc = SparkContext.getOrCreate(conf=conf)
spark = SparkSession.builder.config(conf=conf).getOrCreate()

In [3]:
def shape(df):
    """Return ``(row_count, column_count)`` for a DataFrame-like object.

    ``df`` must expose ``count()`` and a ``columns`` sequence.
    """
    n_rows = df.count()
    n_cols = len(df.columns)
    return n_rows, n_cols

In [78]:
# Create the HDFS directory that receives the per-batch CSV output (-p: no error if it already exists).
! hadoop fs -mkdir -p /user/root/onibus/sink2

In [79]:
# Create the HDFS directory where the raw API responses are landed as JSON.
! hadoop fs -mkdir -p /user/root/onibus/source2

In [80]:
# Create the HDFS directory used as the streaming query's checkpoint location.
! hadoop fs -mkdir -p /user/root/onibus/checkpoint2

In [81]:
# List the working directories to confirm they were created.
! hadoop fs -ls /user/root/onibus/

Found 6 items
drwxr-xr-x   - root supergroup          0 2023-08-25 00:06 /user/root/onibus/checkpoint
drwxr-xr-x   - root supergroup          0 2023-08-25 00:47 /user/root/onibus/checkpoint2
drwxr-xr-x   - root supergroup          0 2023-08-25 00:27 /user/root/onibus/sink
drwxr-xr-x   - root supergroup          0 2023-08-25 00:46 /user/root/onibus/sink2
drwxr-xr-x   - root supergroup          0 2023-08-25 00:23 /user/root/onibus/source
drwxr-xr-x   - root supergroup          0 2023-08-25 00:47 /user/root/onibus/source2


In [85]:
# URL template for the bus GPS endpoint; the four placeholders are filled with the
# start/end date and time of the requested window ("+" separates date and time).
BASE_URL = (
    "https://dados.mobilidade.rio/gps/sppo?"
    "dataInicial={dt_inicial}+{hora_inicial}&dataFinal={dt_final}+{hora_final}"
)

In [86]:
def call_bus_api(interval=60, timeout=30):
    """Fetch the most recent ``interval`` seconds of bus GPS data and land it on HDFS.

    The response body is parsed as JSON by Spark and written as JSON files under
    ``/user/root/onibus/source2/<YYYYmmdd_HHMMSS>``, where the timestamp is the
    end of the requested window.

    Args:
        interval: Length of the requested time window, in seconds.
        timeout: Seconds to wait for the HTTP request before giving up.

    Raises:
        requests.RequestException: On connection errors, timeouts, or a non-2xx
            response. Failing here keeps error pages out of the source directory.
    """
    # Timezone-aware "now" at a fixed UTC-3 offset. The original subtracted three
    # hours from the naive local clock, which is only right when the host runs in UTC.
    # NOTE(review): presumably the API expects Rio local time (UTC-3) — confirm.
    rio_tz = dt.timezone(dt.timedelta(hours=-3))
    dtf = dt.datetime.now(rio_tz)
    dti = dtf - dt.timedelta(seconds=interval)

    url = BASE_URL.format(
        dt_inicial=dti.strftime("%Y-%m-%d"),
        dt_final=dtf.strftime("%Y-%m-%d"),
        hora_inicial=dti.strftime("%H:%M:%S"),
        hora_final=dtf.strftime("%H:%M:%S"),
    )
    # Without a timeout, a stalled connection would block the polling loop forever.
    ret = requests.get(url, timeout=timeout)
    ret.raise_for_status()

    rdd = spark.sparkContext.parallelize([ret.text])
    df = spark.read.json(rdd)
    df.write.json(f"/user/root/onibus/source2/{dtf.strftime('%Y%m%d_%H%M%S')}")

In [108]:
# Per-vehicle window ("ordem" is the partition key), highest speed first.
window = Window.partitionBy("ordem").orderBy(col("velocidade").desc())

In [134]:
def handle_stream(bdf, ctrl):
    """``foreachBatch`` handler: keep the fastest reading per vehicle and write it as CSV.

    Steps:
      1. Cast the string columns to numeric types (decimal commas in
         latitude/longitude are replaced with dots first).
      2. Keep only rows whose ``linha`` has exactly three characters.
      3. For each ``ordem``, keep the row ranked first by descending ``velocidade``.
      4. Write ``ordem, linha, velocidade`` to
         ``/user/root/onibus/sink2/<YYYYmmdd_HHMMSS>``.

    Args:
        bdf: The micro-batch DataFrame handed over by ``foreachBatch``.
        ctrl: Second argument supplied by ``foreachBatch`` (the batch id); unused.
    """
    # Timezone-aware "now" at a fixed UTC-3 offset. The original subtracted three
    # hours from the naive local clock, which is only right when the host runs in UTC.
    rio_tz = dt.timezone(dt.timedelta(hours=-3))
    dtt = dt.datetime.now(rio_tz)

    # Type conversion only. The velocidade cast matters below: without it the
    # window would order speeds as strings.
    typed = (
        bdf
        .withColumn("latitude", regexp_replace(col("latitude"), ",", ".").cast(FloatType()))
        .withColumn("longitude", regexp_replace(col("longitude"), ",", ".").cast(FloatType()))
        .withColumn("datahora", col("datahora").cast(LongType()))
        .withColumn("velocidade", col("velocidade").cast(IntegerType()))
        .withColumn("datahoraenvio", col("datahoraenvio").cast(LongType()))
        .withColumn("datahoraservidor", col("datahoraservidor").cast(LongType()))
    )

    # Transformation:
    #   ordem  => vehicle body id (window partition key)
    #   linha  => regular lines have a three-character code (filter)
    # NOTE(review): the output path depends on wall-clock time, not on the batch id,
    # so a batch replayed after a restart would be written again under a new
    # directory — confirm whether de-duplication is needed.
    (
        typed
        .filter(length(col("linha")) == 3)
        .withColumn("idx", row_number().over(window))
        .filter(col("idx") == 1)  # top-1 speed of each vehicle within this micro-batch
        .select("ordem", "linha", "velocidade")
        .write.csv(f"/user/root/onibus/sink2/{dtt.strftime('%Y%m%d_%H%M%S')}")
    )

In [89]:
# Schema for the raw JSON source: every field is declared as a nullable string.
schema = StructType([
    StructField(field_name, StringType(), True)
    for field_name in (
        "ordem",
        "latitude",
        "longitude",
        "datahora",
        "velocidade",
        "linha",
        "datahoraenvio",
        "datahoraservidor",
    )
])

In [135]:
def loop(max_iter=6):
    idx = 1
    while True:
        print(f"{idx} - fetching API")
        call_bus_api()
        if idx == max_iter:
            break
        time.sleep(60)
        idx += 1

In [111]:
# Streaming reader over every file inside each timestamped source directory.
sdf = (
    spark.readStream
    .schema(schema)
    .json("/user/root/onibus/source2/*/*")
)

In [112]:
# Start the streaming query: every 3 minutes the new rows are handed to
# handle_stream; progress is tracked in the checkpoint directory.
stream_handle = (
    sdf.writeStream
    .outputMode("append")
    .foreachBatch(handle_stream)
    .trigger(processingTime="3 minutes")
    .option("checkpointLocation", "/user/root/onibus/checkpoint2")
    .start()
)

In [139]:
# Check whether the streaming query is still running.
stream_handle.isActive

True

In [138]:
# Current state of the query (message, data availability, trigger activity).
stream_handle.status

{'message': 'Waiting for next trigger',
 'isDataAvailable': False,
 'isTriggerActive': False}

#### Batch control

- **batchID** `0` => `perdemos - antes da interrupção - 6_090`
- **batchID** `1` => `7_474`
- **batchID** `2` => `25_059`
- **batchID** `3` => `17_384`
- **batchID** `4` => previsto: `15_482` / real: `15_482`
- **batchID** `5` => `0`
- **total** `49_917 / 56_007`

In [149]:
# Metrics of the most recent micro-batch (batch id, input rows, durations).
stream_handle.lastProgress

{'id': 'aaa8d33d-2584-4d1f-a3b7-e3c707e0552c',
 'runId': '33371245-cde2-4d37-9e90-9f308cd8792e',
 'name': None,
 'timestamp': '2023-08-25T01:12:00.001Z',
 'batchId': 5,
 'numInputRows': 16072,
 'inputRowsPerSecond': 89.28888888888889,
 'processedRowsPerSecond': 7566.854990583804,
 'durationMs': {'addBatch': 1179,
  'getBatch': 52,
  'getOffset': 756,
  'queryPlanning': 10,
  'triggerExecution': 2124,
  'walCommit': 72},
 'stateOperators': [],
 'sources': [{'description': 'FileStreamSource[hdfs://node-master:9000/user/root/onibus/source2/*/*]',
   'startOffset': {'logOffset': 4},
   'endOffset': {'logOffset': 5},
   'numInputRows': 16072,
   'inputRowsPerSecond': 89.28888888888889,
   'processedRowsPerSecond': 7566.854990583804}],
 'sink': {'description': 'ForeachBatchSink'}}

In [120]:
# Metrics of the recent micro-batches, one entry per batch.
stream_handle.recentProgress

[{'id': 'aaa8d33d-2584-4d1f-a3b7-e3c707e0552c',
  'runId': '33371245-cde2-4d37-9e90-9f308cd8792e',
  'name': None,
  'timestamp': '2023-08-25T00:53:31.092Z',
  'batchId': 0,
  'numInputRows': 5846,
  'processedRowsPerSecond': 2391.002044989775,
  'durationMs': {'addBatch': 2111,
   'getBatch': 42,
   'queryPlanning': 13,
   'triggerExecution': 2445},
  'stateOperators': [],
  'sources': [{'description': 'FileStreamSource[hdfs://node-master:9000/user/root/onibus/source2/*/*]',
    'startOffset': None,
    'endOffset': {'logOffset': 0},
    'numInputRows': 5846,
    'processedRowsPerSecond': 2391.002044989775}],
  'sink': {'description': 'ForeachBatchSink'}},
 {'id': 'aaa8d33d-2584-4d1f-a3b7-e3c707e0552c',
  'runId': '33371245-cde2-4d37-9e90-9f308cd8792e',
  'name': None,
  'timestamp': '2023-08-25T00:54:00.001Z',
  'batchId': 1,
  'numInputRows': 16660,
  'inputRowsPerSecond': 576.2911204123284,
  'processedRowsPerSecond': 3360.90377244301,
  'durationMs': {'addBatch': 3569,
   'getBatc

In [97]:
# stream_handle.stop()

> Interrupt a running cell without stopping the kernel: press `Esc`, then `I` twice.<br/>
> by Pedro Nora & ChatGPT

In [136]:
# Run the polling loop in a separate process; start() returns right away, so the
# notebook stays usable while the API is being fetched.
# NOTE(review): the child process uses the `spark` session created in this kernel
# (through call_bus_api) — confirm that sharing it across processes is safe here.
p = Process(target=loop)
p.start()

1 - fetching API
2 - fetching API
3 - fetching API
4 - fetching API
5 - fetching API
6 - fetching API


In [117]:
# List the timestamped directories written by the API poller.
! hadoop fs -ls /user/root/onibus/source2/

Found 4 items
drwxr-xr-x   - root supergroup          0 2023-08-25 00:50 /user/root/onibus/source2/20230824_214956
drwxr-xr-x   - root supergroup          0 2023-08-25 00:51 /user/root/onibus/source2/20230824_215108
drwxr-xr-x   - root supergroup          0 2023-08-25 00:52 /user/root/onibus/source2/20230824_215211
drwxr-xr-x   - root supergroup          0 2023-08-25 00:53 /user/root/onibus/source2/20230824_215316


In [118]:
# List the timestamped directories written by the streaming handler.
! hadoop fs -ls /user/root/onibus/sink2/

Found 2 items
drwxr-xr-x   - root supergroup          0 2023-08-25 00:53 /user/root/onibus/sink2/20230824_215331
drwxr-xr-x   - root supergroup          0 2023-08-25 00:54 /user/root/onibus/sink2/20230824_215401


In [141]:
# Glob patterns matching every file inside each timestamped directory.
dir_source = "/user/root/onibus/source2/*/*"
dir_sink = "/user/root/onibus/sink2/*/*"

In [142]:
# Schema of the CSV files written by the streaming handler (all fields nullable).
schema_sink = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in (
        ("ordem", StringType()),
        ("linha", StringType()),
        ("velocidade", IntegerType()),
    )
])

In [143]:
# Batch-read everything the stream has written to the sink so far.
df_sink = (
    spark.read
    .schema(schema_sink)
    .csv(dir_sink)
)

In [144]:
# (rows, columns) of the sink data.
shape(df_sink)

(4550, 3)

In [145]:
# Preview the first 20 sink rows.
df_sink.show()

+-----+-----+----------+
|ordem|linha|velocidade|
+-----+-----+----------+
| null| null|      null|
| null| null|      null|
| null| null|      null|
| null| null|      null|
| null| null|      null|
| null| null|      null|
| null| null|      null|
| null| null|      null|
| null| null|      null|
| null| null|      null|
| null| null|      null|
| null| null|      null|
| null| null|      null|
| null| null|      null|
| null| null|      null|
| null| null|      null|
| null| null|      null|
| null| null|      null|
| null| null|      null|
| null| null|      null|
+-----+-----+----------+
only showing top 20 rows



In [146]:
# Batch-read all raw JSON landed by the poller, using the streaming source schema.
df_source = (
    spark.read
    .schema(schema)
    .json(dir_source)
)

In [147]:
# (rows, columns) of the raw source data.
shape(df_source)

(71685, 8)