In [1]:
import time
import requests
import datetime as dt
from multiprocessing import Process

import findspark
findspark.init()
from pathlib import Path
from pyspark import SparkContext, SparkConf, SQLContext
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, FloatType, LongType
from pyspark.sql.functions import col, regexp_replace
from pyspark.sql import SparkSession

In [2]:
# Application-level Spark configuration; only the application name is set here.
conf = SparkConf().setAppName("spark-streaming-micro-batch")
# getOrCreate() reuses a context/session that is already alive in this kernel, so
# re-running the cell no longer fails with "Cannot run multiple SparkContexts at once".
sc = SparkContext.getOrCreate(conf=conf)
spark = SparkSession.builder.config(conf=conf).getOrCreate()

In [26]:
def shape(df):
    """Return ``(number_of_rows, number_of_columns)`` for ``df``.

    ``df`` must expose a ``count()`` method and a ``columns`` sequence.
    """
    n_rows = df.count()
    n_cols = len(df.columns)
    return n_rows, n_cols

In [3]:
# Create the HDFS directory that handle_stream writes its CSV output to
# (-p: create missing parents and do not fail if the directory already exists).
! hadoop fs -mkdir -p /user/root/onibus/batch

23/06/15 00:27:54 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [4]:
# Create the HDFS directory that call_bus_api writes the raw JSON responses to
# (-p: create missing parents and do not fail if the directory already exists).
! hadoop fs -mkdir -p /user/root/onibus/stream

23/06/15 00:28:01 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [5]:
# List the parent directory to confirm that both landing directories exist.
! hadoop fs -ls /user/root/onibus/

23/06/15 00:28:27 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Found 2 items
drwxr-xr-x   - root supergroup          0 2023-06-15 00:27 /user/root/onibus/batch
drwxr-xr-x   - root supergroup          0 2023-06-15 00:28 /user/root/onibus/stream


In [6]:
# Query template for the GPS endpoint at dados.mobilidade.rio. The four placeholders are
# filled in with str.format; date and time are joined by "+" inside the query string.
BASE_URL = (
    "https://dados.mobilidade.rio/gps/sppo?"
    "dataInicial={dt_inicial}+{hora_inicial}&dataFinal={dt_final}+{hora_final}"
)

In [7]:
def call_bus_api(interval=60, timeout=30):
    """Fetch the last ``interval`` seconds of GPS records and land them on HDFS as JSON.

    The query window is ``[now - interval, now]`` expressed as UTC-3 wall-clock time
    (presumably the timezone the API expects; it matches the fixed 3 h shift this
    function has always applied). The response body is parsed by Spark and written to
    a new ``/user/root/onibus/stream/<YYYYmmdd_HHMMSS>`` directory named after the
    end of the window.

    Args:
        interval: Length of the query window, in seconds.
        timeout: Seconds to wait for the HTTP request before giving up.

    Raises:
        requests.RequestException: If the request fails, times out, or the server
            answers with an HTTP error status.
    """
    # Take "now" from an explicit UTC-3 clock instead of shifting the host's naive local
    # time, so the window is correct whatever timezone the host is configured with.
    offset = dt.timedelta(hours=3)
    dtf = dt.datetime.now(dt.timezone(-offset))
    dti = dtf - dt.timedelta(seconds=interval)

    url = BASE_URL.format(
        dt_inicial=dti.strftime("%Y-%m-%d"),
        dt_final=dtf.strftime("%Y-%m-%d"),
        hora_inicial=dti.strftime("%H:%M:%S"),
        hora_final=dtf.strftime("%H:%M:%S"),
    )

    # Without a timeout a single stalled connection would block the caller indefinitely.
    ret = requests.get(url, timeout=timeout)
    # Fail loudly on 4xx/5xx so an error page is never parsed and landed as data.
    ret.raise_for_status()

    # The whole response body is handed to Spark as a single JSON document.
    rdd = spark.sparkContext.parallelize([ret.text])
    df = spark.read.json(rdd)
    df.write.json(f"/user/root/onibus/stream/{dtf.strftime('%Y%m%d_%H%M%S')}")

In [8]:
def handle_stream(bdf, ctrl):
    """``foreachBatch`` callback: cast the string columns and write the batch as CSV.

    Args:
        bdf: Micro-batch DataFrame containing the columns ``latitude``, ``longitude``,
            ``datahora``, ``velocidade``, ``datahoraenvio`` and ``datahoraservidor``.
        ctrl: Second positional argument supplied by ``foreachBatch`` (the batch
            identifier); it is not used here.
    """
    # Take "now" from an explicit UTC-3 clock instead of shifting the host's naive local
    # time, so directory names do not depend on the host's timezone setting.
    offset = dt.timedelta(hours=3)
    dtt = dt.datetime.now(dt.timezone(-offset))

    df = bdf
    # Coordinates use a decimal comma; swap it for a dot so the float cast succeeds.
    for name in ("latitude", "longitude"):
        df = df.withColumn(name, regexp_replace(col(name), ",", ".").cast(FloatType()))

    # Remaining numeric columns only need a plain cast.
    for name, spark_type in (
        ("datahora", LongType()),
        ("velocidade", IntegerType()),
        ("datahoraenvio", LongType()),
        ("datahoraservidor", LongType()),
    ):
        df = df.withColumn(name, col(name).cast(spark_type))

    # NOTE(review): the directory name comes from the wall clock rather than from `ctrl`,
    # so a replayed batch would be written again under a new name — confirm this is acceptable.
    df.write.csv(f"/user/root/onibus/batch/{dtt.strftime('%Y%m%d_%H%M%S')}")

In [9]:
# Every field is declared as a nullable string; numeric casting happens in handle_stream.
schema = StructType([
    StructField(field_name, StringType(), True)
    for field_name in (
        "ordem",
        "latitude",
        "longitude",
        "datahora",
        "velocidade",
        "linha",
        "datahoraenvio",
        "datahoraservidor",
    )
])

In [10]:
def loop(max_iter=15, interval=60):
    """Call ``call_bus_api`` repeatedly, starting one call every ``interval`` seconds.

    Args:
        max_iter: The loop stops right after the call whose 1-based index equals this
            value. A value below 1 never matches, so the loop runs until interrupted.
        interval: Seconds between the start of consecutive calls. The same value is
            passed to ``call_bus_api`` as the query-window length so that consecutive
            windows line up.
    """
    idx = 1
    next_start = time.monotonic()
    while True:
        print(f"{idx} - fetching API")
        call_bus_api(interval=interval)
        if idx == max_iter:
            break
        # Sleep until the next scheduled start instead of a flat `interval`: a flat sleep
        # adds the duration of the call itself to every cycle, which leaves that much
        # time uncovered between two consecutive query windows.
        next_start += interval
        delay = next_start - time.monotonic()
        if delay > 0:
            time.sleep(delay)
        else:
            # The call overran the interval; restart the schedule from now.
            next_start = time.monotonic()
        idx += 1

In [11]:
# Streaming file source over every file one level below the stream landing directory,
# parsed as JSON with the all-string schema.
sdf = (
    spark.readStream
    .schema(schema)
    .json("/user/root/onibus/stream/*/*")
)

In [12]:
# Start the streaming query: on a 3-minute trigger, newly arrived rows are handed to
# handle_stream as one micro-batch.
stream_handle = (
    sdf.writeStream
    .outputMode("append")
    .foreachBatch(handle_stream)
    .trigger(processingTime="3 minutes")
    .start()
)

In [13]:
# True while the streaming query is still running.
stream_handle.isActive

True

In [14]:
# Current state of the query: status message, data availability and trigger activity.
stream_handle.status

{'message': 'Waiting for next trigger',
 'isDataAvailable': False,
 'isTriggerActive': False}

In [33]:
# Metrics of the most recent progress update (batch id, input row count, rates, durations).
stream_handle.lastProgress

{'id': '5de5b0d9-1ffa-4ebe-8624-e7a1e48e2686',
 'runId': '18dd276b-3f10-4f46-bd3e-61522ad32651',
 'name': None,
 'timestamp': '2023-06-15T00:57:00.009Z',
 'batchId': 5,
 'numInputRows': 19306,
 'inputRowsPerSecond': 107.25019304590327,
 'processedRowsPerSecond': 23805.17879161529,
 'durationMs': {'addBatch': 273,
  'getBatch': 17,
  'getOffset': 480,
  'queryPlanning': 4,
  'triggerExecution': 811,
  'walCommit': 22},
 'stateOperators': [],
 'sources': [{'description': 'FileStreamSource[hdfs://node-master:9000/user/root/onibus/stream/*/*]',
   'startOffset': {'logOffset': 4},
   'endOffset': {'logOffset': 5},
   'numInputRows': 19306,
   'inputRowsPerSecond': 107.25019304590327,
   'processedRowsPerSecond': 23805.17879161529}],
 'sink': {'description': 'ForeachBatchSink'}}

In [15]:
# Uncomment and run to stop the streaming query once ingestion is finished:
# stream_handle.stop()

In [16]:
# Run the polling loop in a separate OS process; start() returns immediately, so the
# notebook can keep executing cells while the API is being polled.
# NOTE(review): the child process uses the `spark` session created in this process —
# confirm that sharing it across processes is reliable on the target platform.
p = Process(target=loop)
p.start()

1 - fetching API
2 - fetching API
3 - fetching API
4 - fetching API
5 - fetching API
6 - fetching API
7 - fetching API
8 - fetching API
9 - fetching API
10 - fetching API
11 - fetching API
12 - fetching API
13 - fetching API
14 - fetching API
15 - fetching API


In [23]:
# Glob patterns matching every file one level below each landing directory.
dir_batch = "/user/root/onibus/batch/*/*"
dir_stream = "/user/root/onibus/stream/*/*"

In [34]:
# Batch (non-streaming) read of all raw JSON files landed so far, with the all-string schema.
dfc = spark.read.schema(schema).json(dir_stream)

In [35]:
# Preview the raw rows; coordinates still use a decimal comma at this stage.
dfc.show()

+------+---------+---------+-------------+----------+-------+-------------+----------------+
| ordem| latitude|longitude|     datahora|velocidade|  linha|datahoraenvio|datahoraservidor|
+------+---------+---------+-------------+----------+-------+-------------+----------------+
|B28569|-22,90281|-43,18435|1686790497000|        50|    326|1686790507000|   1686790542000|
|B28626|-22,80857|-43,20777|1686790490000|        36|    324|1686790507000|   1686790512000|
|B28540|-22,81547|-43,18855|1686790500000|         0|    324|1686790507000|   1686790512000|
|B28569|-22,90281|-43,18435|1686790497000|        50|    326|1686790507000|   1686790512000|
|B28538|-22,83392|-43,24198|1686790494000|        42|    326|1686790507000|   1686790512000|
|B58128|-22,83722|-43,28496|1686790492000|         0|GARAGEM|1686790507000|   1686790533000|
|B58096|-22,83729|-43,28544|1686790496000|         0|GARAGEM|1686790507000|   1686790533000|
|B58044|-22,92608|-43,24574|1686790486000|        25|    622|168679050

In [36]:
# Row and column count of the raw JSON data.
shape(dfc)

(94758, 8)

In [37]:
# Read back the CSV files written by handle_stream. No schema or header option is given,
# so the columns are auto-named _c0.._c7 and come back as strings.
dfc2 = spark.read.csv(dir_batch)

In [38]:
# Preview the processed rows; coordinates now use a decimal point.
dfc2.show()

+------+---------+---------+-------------+---+-------+-------------+-------------+
|   _c0|      _c1|      _c2|          _c3|_c4|    _c5|          _c6|          _c7|
+------+---------+---------+-------------+---+-------+-------------+-------------+
|B28569|-22.90281|-43.18435|1686790497000| 50|    326|1686790507000|1686790542000|
|B28626|-22.80857|-43.20777|1686790490000| 36|    324|1686790507000|1686790512000|
|B28540|-22.81547|-43.18855|1686790500000|  0|    324|1686790507000|1686790512000|
|B28569|-22.90281|-43.18435|1686790497000| 50|    326|1686790507000|1686790512000|
|B28538|-22.83392|-43.24198|1686790494000| 42|    326|1686790507000|1686790512000|
|B58128|-22.83722|-43.28496|1686790492000|  0|GARAGEM|1686790507000|1686790533000|
|B58096|-22.83729|-43.28544|1686790496000|  0|GARAGEM|1686790507000|1686790533000|
|B58044|-22.92608|-43.24574|1686790486000| 25|    622|1686790507000|1686790533000|
|B58075|-22.83717|-43.28489|1686790495000|  0|GARAGEM|1686790507000|1686790533000|
|B58

In [39]:
# Row and column count of the CSV output, for comparison with the raw JSON data.
shape(dfc2)

(94758, 8)

In [40]:
# Show the column names and types Spark assigned when reading the CSV files.
dfc2.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: string (nullable = true)
 |-- _c3: string (nullable = true)
 |-- _c4: string (nullable = true)
 |-- _c5: string (nullable = true)
 |-- _c6: string (nullable = true)
 |-- _c7: string (nullable = true)

