# Import

In [2]:
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
import json
import logging
import pickle
import time
import datetime

In [3]:
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("consumer")


config = json.load(open("config.json"))
print(json.dumps(config, indent=2))

{
  "global": {
    "kafka_bootstrap_servers": "kafka:9092",
    "kafka_topic": "test-structured-streaming",
    "kafka_consumer_group": "ss_job",
    "max_records_per_batch": 20
  }
}


# Spark Packages

In [4]:
kafka_packages = [
    f'org.apache.spark:spark-sql-kafka-0-10_{"2.12"}:{"3.3.0"}',
    'org.apache.kafka:kafka-clients:2.8.0',
    "org.apache.hadoop:hadoop-aws:3.3.0",
    "com.google.guava:guava:21.0",
    "org.apache.httpcomponents:httpcore:4.4.8",
    "org.mongodb.spark:mongo-spark-connector_2.12:3.0.1"
]

In [5]:
conf = SparkConf().setAppName("StreamJob")
conf = (conf.setMaster('spark://spark:7077')
        .set('spark.jars.packages', ','.join(kafka_packages))
       )
sc = SparkContext(conf=conf)
logger.info(f"Spark Driver memory: {sc._conf.get('spark.driver.memory')}")
logger.info(f"Spark Executor memory: {sc._conf.get('spark.executor.memory')}")
logger.info(
    f'Loaded jars:\n{json.dumps((sc._jsc.sc().listJars().toList().toString().replace("List(", "").replace(")", "").split(", ")), indent=2)}')
sc.setLogLevel("ERROR")
spark = SparkSession(sc)

:: loading settings :: url = jar:file:/opt/bitnami/spark/jars/ivy-2.5.0.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
org.apache.kafka#kafka-clients added as a dependency
org.apache.hadoop#hadoop-aws added as a dependency
com.google.guava#guava added as a dependency
org.apache.httpcomponents#httpcore added as a dependency
org.mongodb.spark#mongo-spark-connector_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-9347076a-3a1d-4725-9093-cdb8e3174dfc;1.0
	confs: [default]
	found org.apache.spark#spark-sql-kafka-0-10_2.12;3.3.0 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.3.0 in central
	found org.apache.kafka#kafka-clients;2.8.1 in central
	found org.lz4#lz4-java;1.8.0 in central
	found org.xerial.snappy#snappy-java;1.1.8.4 in central
	found org.slf4j#slf4j-api;1.7.32 in central
	found org.apache.hadoop#hadoop-client-runtime;3.3.2 in central
	found org.spark-project.spark#un

22/12/16 21:55:21 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
INFO:consumer:Spark Driver memory: None
INFO:consumer:Spark Executor memory: None
INFO:consumer:Loaded jars:
[
  "spark://0a1193a2c030:43947/jars/org.apache.hadoop_hadoop-aws-3.3.0.jar",
  "spark://0a1193a2c030:43947/jars/org.xerial.snappy_snappy-java-1.1.8.4.jar",
  "spark://0a1193a2c030:43947/jars/org.apache.spark_spark-token-provider-kafka-0-10_2.12-3.3.0.jar",
  "spark://0a1193a2c030:43947/jars/com.google.code.findbugs_jsr305-3.0.0.jar",
  "spark://0a1193a2c030:43947/jars/org.wildfly.openssl_wildfly-openssl-1.0.7.Final.jar",
  "spark://0a1193a2c030:43947/jars/org.mongodb_mongodb-driver-sync-4.0.5.jar",
  "spark://0a1193a2c030:43947/jars/org.apache.kafka_kafka-clients-2.8.1.jar",
  "spark://0a1193a2c030:43947/jars/org.mongodb_mongodb-driver-core-4.0.5.jar",
  "spark://0a1193a2c030:43947/jars/com.google.guava_guava-21.0.jar",
  "spark://0a1193a2c030:43947/

# Test Kafka topic and connection

In [6]:
from confluent_kafka.admin import AdminClient

def test_kafka_connection(broker_conf:dict) -> None:
    """
    Function to test kafka connection
    :param broker_conf: Broker configuration
    :returns: None
    """
    client = AdminClient(broker_conf)
    topics = client.list_topics().topics
    if not topics:
        raise RuntimeError()
    print("Kafka Connection successful!")


broker_conf = {
    'bootstrap.servers': config["global"]["kafka_bootstrap_servers"]
}

# Test kafka connection
test_kafka_connection(broker_conf)


Kafka Connection successful!


# Configure Spark-Kafka consumer options and Subscribe to Kafka Topic

In [7]:
from pyspark.sql.functions import from_json
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType, LongType, ArrayType,FloatType,DoubleType
schema = StructType().add("VendorID", StringType(), True)\
                    .add("total_amount",StringType(),True)\
                    .add("trip_distance",StringType(),True)

In [8]:
schema

StructType([StructField('VendorID', StringType(), True), StructField('total_amount', StringType(), True), StructField('trip_distance', StringType(), True)])

In [9]:
spark.sparkContext.setLogLevel("ERROR")

# Configure spark kafka client options
spark_kafka_options = {
    "kafka.bootstrap.servers": config["global"]["kafka_bootstrap_servers"],
    "subscribe": config["global"]["kafka_topic"],
    "kafka.group.id": config["global"]["kafka_consumer_group"],
    "maxOffsetsPerTrigger": 10,
    "startingOffsets": "earliest",
}

# Enable spark read stream
df = spark.readStream.format("kafka").options(**spark_kafka_options).load()

In [10]:
def process_batch(batch_df, batch_idx):
    print(f"{batch_idx} | {batch_df.count()}")
    batch_df = batch_df.selectExpr("CAST(value AS STRING)").select(from_json("value", schema).alias("data"))
    # batch_df = batch_df.selectExpr("CAST(value AS STRING)").select(from_json("value", schema).alias("data"))
    batch_df = batch_df.selectExpr("data.VendorID as VendorID","CAST(data.total_amount AS FLOAT) as total_amount","CAST(data.trip_distance AS FLOAT) as trip_distance")    
    # "CAST(data.birth AS DATE) as birth"
    mongoURL = "mongodb://mongo:27017/demo.transactions"
    batch_df.write.format("mongo").mode("append").option("uri",mongoURL).save()
    
    # batch_df.write.mode('append').format("console")
    batch_df.collect()
    batch_df.show()
    return batch_df

# Structred streaming query
# query = df.writeStream.foreachBatch(process_batch).start()
query = df.writeStream.trigger(processingTime='10 seconds').foreachBatch(process_batch).start()

INFO:py4j.java_gateway:Callback Server Starting
INFO:py4j.java_gateway:Socket listening on ('127.0.0.1', 33707)


In [11]:
import pymongo
from pymongo import MongoClient
client = MongoClient()
client = MongoClient("mongodb://mongo:27017/")

In [12]:
for db in client.list_databases():
    print(db)

{'name': 'admin', 'sizeOnDisk': 65536.0, 'empty': False}
{'name': 'app', 'sizeOnDisk': 1019904.0, 'empty': False}
{'name': 'auth', 'sizeOnDisk': 2322432.0, 'empty': False}
{'name': 'config', 'sizeOnDisk': 176128.0, 'empty': False}
{'name': 'demo', 'sizeOnDisk': 73728.0, 'empty': False}
{'name': 'events', 'sizeOnDisk': 61440.0, 'empty': False}
{'name': 'hosting', 'sizeOnDisk': 28672.0, 'empty': False}
{'name': 'local', 'sizeOnDisk': 182562816.0, 'empty': False}
{'name': 'log', 'sizeOnDisk': 24150016.0, 'empty': False}
{'name': 'metadata', 'sizeOnDisk': 323584.0, 'empty': False}
{'name': 'test', 'sizeOnDisk': 32768.0, 'empty': False}
{'name': 'transactions', 'sizeOnDisk': 65536.0, 'empty': False}


In [13]:
collection = client.demo["transactions"]
for doc in collection.find({}):
    print(doc)

{'_id': ObjectId('639ccd0517a7ac237ada612a'), 'VendorID': '2', 'total_amount': 17.049999237060547, 'trip_distance': 1.590000033378601}
{'_id': ObjectId('639ccd0517a7ac237ada612b'), 'VendorID': '1', 'total_amount': 17.799999237060547, 'trip_distance': 3.299999952316284}
{'_id': ObjectId('639ccd0517a7ac237ada612c'), 'VendorID': '1', 'total_amount': 10.800000190734863, 'trip_distance': 1.7999999523162842}
{'_id': ObjectId('639ccd0517a7ac237ada612d'), 'VendorID': '1', 'total_amount': 4.800000190734863, 'trip_distance': 0.5}
{'_id': ObjectId('639ccd0517a7ac237ada612e'), 'VendorID': '1', 'total_amount': 16.299999237060547, 'trip_distance': 3.0}
{'_id': ObjectId('639ccd0717a7ac237ada612f'), 'VendorID': '1', 'total_amount': 40.33000183105469, 'trip_distance': 9.0}
{'_id': ObjectId('639ccd0717a7ac237ada6130'), 'VendorID': '1', 'total_amount': 15.300000190734863, 'trip_distance': 2.200000047683716}
{'_id': ObjectId('639ccd0717a7ac237ada6131'), 'VendorID': '1', 'total_amount': 9.960000038146973, 

In [14]:
db = client["demo"]
col = db["transactions"]
col.count_documents({})

37

In [15]:
client.close()

INFO:py4j.clientserver:Python Server ready to receive messages
INFO:py4j.clientserver:Received command c on object id p0
                                                                                

0 | 10


INFO:py4j.clientserver:Received command c on object id p0                       


+--------+------------+-------------+
|VendorID|total_amount|trip_distance|
+--------+------------+-------------+
|       2|       17.05|         1.59|
|       1|        17.8|          3.3|
|       1|        10.8|          1.8|
|       1|         4.8|          0.5|
|       1|        16.3|          3.0|
|       1|       40.33|          9.0|
|       1|        15.3|          2.2|
|       1|        9.96|          0.8|
|       1|       58.13|         18.2|
|       1|        9.35|          0.9|
+--------+------------+-------------+

1 | 9
+--------+------------+-------------+
|VendorID|total_amount|trip_distance|
+--------+------------+-------------+
|       1|        9.96|          0.9|
|       1|         9.8|          1.1|
|       1|         4.3|          0.3|
|       1|        23.3|          3.1|
|       1|         7.3|          1.1|
|       2|       22.68|         2.38|
|       2|        14.3|         2.83|
|       2|       41.21|         8.33|
|       2|        13.3|         2.37|
+----