In [1]:
# Ask spark-submit to pull in the Kafka connector packages (Scala 2.12 builds
# for Spark 3.1.1). This has to be set before the Spark JVM is launched, i.e.
# before the SparkSession is created in a later cell.
import os

KAFKA_PACKAGES = [
    "org.apache.spark:spark-streaming-kafka-0-10_2.12:3.1.1",
    "org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.1",
]
os.environ['PYSPARK_SUBMIT_ARGS'] = "--packages " + ",".join(KAFKA_PACKAGES) + " pyspark-shell"

In [2]:
# Make the local Spark installation importable as `pyspark`. With no explicit
# spark_home, findspark locates Spark on its own (SPARK_HOME, per its docs).
# This must run before any `pyspark` import in the cells below.
import findspark

findspark.init(spark_home=None)

In [3]:
from pyspark.sql import SparkSession

# Create the Spark session, or reuse the one already running in this kernel.
# PYSPARK_SUBMIT_ARGS (set in the first cell) takes effect when this launches the JVM.
APP_NAME = "Streaming"
spark = (
    SparkSession.builder
    .appName(APP_NAME)
    .getOrCreate()
)

In [4]:
# read stream from kafka
df = spark\
      .readStream \
      .format("kafka") \
      .option("kafka.bootstrap.servers", "localhost:9092") \
      .option("subscribe", "UberTopic") \
      .option("startingOffsets", "earliest") \
      .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") # convert key-value from binary to string type
df.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



In [5]:
from pyspark.sql.functions import col, to_timestamp

# Parse the message value into a DataFrame. Each value is one comma-separated
# line "dt,lat,lon,base"; every field stays a string until the next cell.
CSV_FIELDS = ["dt", "lat", "lon", "base"]

df_uber = df.select(col("value").cast("string").alias("value"))
field_exprs = [
    f"split(value,',')[{position}] as {field}"
    for position, field in enumerate(CSV_FIELDS)
]
df_uber2 = df_uber.selectExpr(*field_exprs)
df_uber2.printSchema()

root
 |-- dt: string (nullable = true)
 |-- lat: string (nullable = true)
 |-- lon: string (nullable = true)
 |-- base: string (nullable = true)



In [6]:
# Convert the parsed string fields to typed columns.
# `to_timestamp` already returns a timestamp column, so no extra
# `.cast("timestamp")` is needed. `base` is kept as a string.
# NOTE(review): under Spark's default (non-ANSI) settings, unparseable values
# become null rather than raising — confirm that is acceptable downstream.
df_uber3 = (
    df_uber2
    .withColumn("dt", to_timestamp("dt"))
    .withColumn("lat", col("lat").cast("double"))
    .withColumn("lon", col("lon").cast("double"))
)
df_uber3.printSchema()

root
 |-- dt: timestamp (nullable = true)
 |-- lat: double (nullable = true)
 |-- lon: double (nullable = true)
 |-- base: string (nullable = true)



In [14]:
# Show df
# df_uber3.writeStream.queryName("Uber").format("memory").outputMode("append").start()

<pyspark.sql.streaming.StreamingQuery at 0x1e3b8b4e0d0>

In [17]:
# top5 = spark.sql("select * from Uber")
# top5.show(5)

+-------------------+-------+--------+------+
|                 dt|    lat|     lon|  base|
+-------------------+-------+--------+------+
|2014-08-01 00:00:00| 40.729|-73.9422|B02598|
|2014-08-01 00:00:00|40.7476|-73.9871|B02598|
|2014-08-01 00:00:00|40.7424|-74.0044|B02598|
|2014-08-01 00:00:00| 40.751|-73.9869|B02598|
|2014-08-01 00:00:00|40.7406|-73.9902|B02598|
+-------------------+-------+--------+------+
only showing top 5 rows



In [7]:
from pyspark.ml.feature import VectorAssembler

# Define the feature vector for the k-means model: the (lat, lon) coordinates
# are packed into a single vector column named `features`.
# NOTE(review): VectorAssembler's default handleInvalid="error" fails the query
# on a row with null lat/lon; consider handleInvalid="skip" if malformed
# messages can reach this topic.
featureCols = ['lat', 'lon']
assembler = VectorAssembler(outputCol='features', inputCols=featureCols)

df_uber4 = assembler.transform(df_uber3)

In [8]:
# Load the k-means model previously saved to MODEL_PATH.
from pyspark.ml.clustering import KMeansModel

MODEL_PATH = "./model/uber_location"
model = KMeansModel.load(MODEL_PATH)

In [9]:
# Assign each streamed row to a cluster. Per the sample output, the model
# writes the cluster index to a `cid` column next to `features`.
df_predicted = model.transform(dataset=df_uber4)

In [12]:
# stream = df_predicted.writeStream.queryName("uber").format("memory").outputMode("append").start()
# display_trained = spark.sql("select * from uber")
# display_trained.show(5)

+-------------------+-------+--------+------+------------------+---+
|                 dt|    lat|     lon|  base|          features|cid|
+-------------------+-------+--------+------+------------------+---+
|2014-08-01 00:00:00| 40.729|-73.9422|B02598| [40.729,-73.9422]|  2|
|2014-08-01 00:00:00|40.7476|-73.9871|B02598|[40.7476,-73.9871]|  0|
|2014-08-01 00:00:00|40.7424|-74.0044|B02598|[40.7424,-74.0044]|  0|
|2014-08-01 00:00:00| 40.751|-73.9869|B02598| [40.751,-73.9869]| 13|
|2014-08-01 00:00:00|40.7406|-73.9902|B02598|[40.7406,-73.9902]|  0|
+-------------------+-------+--------+------+------------------+---+
only showing top 5 rows



In [10]:
from pyspark.sql.functions import split, concat_ws, concat

# Add an `id` column of the form "<cid>_<lat decimals><lon decimals>".
# For example, cid=2, lat=40.729, lon=-73.9422 gives "2_7299422".
# Only the digits after the decimal point are used; the integer parts and the
# sign are discarded.
# NOTE(review): these ids are not guaranteed unique (e.g. repeated coordinates
# in the same cluster) — confirm consumers do not rely on uniqueness.
DECIMAL_POINT = r"\."  # regex for a literal '.'; raw string avoids an invalid-escape warning
split_lon = split(df_predicted.lon.cast("string"), DECIMAL_POINT).getItem(1)
split_lat = split(df_predicted.lat.cast("string"), DECIMAL_POINT).getItem(1)
# Named `id_suffix` rather than `id` so the Python built-in `id()` is not shadowed.
id_suffix = concat(split_lat, split_lon)  # concatenate the two strings
df_uber_id = df_predicted.withColumn("id", concat_ws("_", col("cid"), id_suffix))  # add column "id"

In [11]:
# The assembled `features` vector was only needed for prediction, so drop it
# before writing. Presumably this is also because the CSV sink cannot
# serialize vector columns.
df_uber_locates = df_uber_id.drop("features")

In [12]:
# Continuously append each micro-batch of rows to HDFS as CSV files.
# The checkpoint directory records query progress, so a restarted query
# resumes from where it stopped.
# start() returns a StreamingQuery handle immediately; the query keeps
# running in the background.
HDFS_ROOT = "hdfs://localhost:9000/Uber_Warehouse"
WAREHOUSE_PATH = HDFS_ROOT + "/data_warehouse"
CHECKPOINT_PATH = HDFS_ROOT + "/checkpoints"

warehouse = (
    df_uber_locates.writeStream
    .format("csv")
    .option("path", WAREHOUSE_PATH)
    .option("checkpointLocation", CHECKPOINT_PATH)
    .outputMode("append")
    .start()
)

In [None]:
#  producer = KafkaProducer(bootstrap_servers='mykafka-broker')
#     with open('/home/antonis/repos/testfile.csv') as file:
#         reader = csv.DictReader(file, delimiter=";")
#         for row in reader:
#             producer.send(topic='stable_topic', value=row)
#             producer.flush()