In [1]:
from pyspark.sql import SparkSession,DataFrame
import os
import numpy as np
from pyspark.sql import functions as f
from pyspark.sql.types import *
import io
import time
from pyspark.sql import Row

In [2]:
local=True
# spark.rpc.message.maxSize if for write large csv file. The default value is 128, here we set it to 1024
if local:
    spark = SparkSession \
    .builder.master("local[4]") \
    .appName("SparkKafkaWriter") \
    .config('spark.jars.packages', 'org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.1') \
    .getOrCreate()
else: 
    spark = SparkSession \
    .builder.master("k8s://https://kubernetes.default.svc:443") \
    .appName("SparkKafkaWriter") \
    .config("spark.kubernetes.container.image", "inseefrlab/jupyter-datascience:master") \
    .config("spark.kubernetes.authenticate.driver.serviceAccountName", os.environ['KUBERNETES_SERVICE_ACCOUNT']) \
    .config("spark.executor.instances", "4") \
    .config("spark.executor.memory","8g") \
    .config("spark.kubernetes.namespace", os.environ['KUBERNETES_NAMESPACE']) \
    .config('spark.jars.packages', 'org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.1') \
    .getOrCreate()

In [3]:
spark.sparkContext.stop()

In [26]:
print(spark.sparkContext.version)

3.1.1


In [3]:
! kubectl get pods

I0929 13:17:59.603960    6347 request.go:655] Throttling request took 1.178614547s, request: GET:https://kubernetes.default/apis/acme.cert-manager.io/v1?timeout=32s
NAME                               READY   STATUS    RESTARTS   AGE
flume-test-agent-df8c5b944-vtjbx   1/1     Running   0          9d
jupyter-266220-5bf4b859f8-wfkz6    1/1     Running   0          29h
jupyter-327820-74559b896c-phsxz    1/1     Running   0          3h49m
kafka-client                       1/1     Running   0          4m46s
kafka-server-0                     1/1     Running   0          10d
kafka-server-1                     1/1     Running   0          9d
kafka-server-2                     1/1     Running   0          10d
kafka-server-zookeeper-0           1/1     Running   0          9d
rstudio-381639-65f5fbb79d-qr9fj    1/1     Running   0          3h12m
rstudio-625080-8d7b9fbfb-wdmlw     1/1     Running   0          25h


In [28]:
! kubectl get pods | grep sparkarrow | awk '{print $1}' | xargs kubectl delete pods

I0928 13:59:34.345375    1999 request.go:655] Throttling request took 1.166803404s, request: GET:https://kubernetes.default/apis/acme.cert-manager.io/v1alpha3?timeout=32s
I0928 13:59:38.077155    2034 request.go:655] Throttling request took 1.181243355s, request: GET:https://kubernetes.default/apis/acme.cert-manager.io/v1beta1?timeout=32s
pod "sparkarrowcompression-77efdb7c26eb6e0b-exec-5" deleted
pod "sparkarrowcompression-77efdb7c26eb6e0b-exec-6" deleted
pod "sparkarrowcompression-77efdb7c26eb6e0b-exec-7" deleted
pod "sparkarrowcompression-77efdb7c26eb6e0b-exec-8" deleted


In [4]:
# Subscribe to 1 topic, with headers
df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "kafka-server.user-pengfei.svc.cluster.local:9092") \
  .option("subscribe", "test-topic") \
  .option("includeHeaders", "true") \
  .load()

In [6]:
df.writeStream \
.outputMode("append") \
.option("checkpointLocation","./kafka-sample") \
.format("memory").queryName("message") \
.trigger(processingTime='10 seconds') \
.start()

<pyspark.sql.streaming.StreamingQuery at 0x7fd4c79ce4d0>

In [7]:
df.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)
 |-- headers: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- key: string (nullable = true)
 |    |    |-- value: binary (nullable = true)



In [11]:
spark.sql("select count(*) from message limit 10").show()

+--------+
|count(1)|
+--------+
|       0|
+--------+



In [15]:
df2 = df.selectExpr("CAST(key AS STRING)", 
             "CAST(value AS STRING)","topic","partition","offset","timestamp","timestampType")
df2.show(truncate=False)

AnalysisException: Queries with streaming sources must be executed with writeStream.start();
kafka